In [6]:
import asyncio
import os
import sys
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain.chains.summarize.chain import load_summarize_chain
from langchain.docstore.document import Document
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), '..'))) # Add the parent directory to the path sicnce we work with notebooks
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from pydantic import BaseModel, Field
from langchain_core.prompts import PromptTemplate
from helper_functions import encode_pdf, encode_from_string
from langchain.chat_models import ChatOpenAI
from langchain.prompts import PromptTemplate


In [58]:
from dotenv import load_dotenv
import os

# Load environment variables from .env file
load_dotenv()

# Retrieve the API key from the environment
api_key = os.getenv("OPENAI_API_KEY")
if api_key is None:
    raise ValueError("OPENAI_API_KEY is not set in your environment.")
else:
    print("API key loaded successfully.")

API key loaded successfully.


In [59]:
os.environ["OPENAI_API_KEY"] = os.getenv('OPENAI_API_KEY')

In [68]:
path = "Aka Book.pdf"

In [73]:
async def retry_with_exponential_backoff(coroutine, max_retries=5):
    """
    Retries a coroutine using exponential backoff upon encountering a RateLimitError.
    
    Args:
        coroutine: The coroutine to be executed.
        max_retries: The maximum number of retry attempts.
        
    Returns:
        The result of the coroutine if successful.
        
    Raises:
        The last encountered exception if all retry attempts fail.
    """
    for attempt in range(max_retries):
        try:
            # Attempt to execute the coroutine
            return await coroutine
        except RateLimitError as e:
            # If the last attempt also fails, raise the exception
            if attempt == max_retries - 1:
                raise e

            # Wait for an exponential backoff period before retrying
            await exponential_backoff(attempt)

    # If max retries are reached without success, raise an exception
    raise Exception("Max retries reached")


In [76]:
async def encode_pdf_hierarchical(path, chunk_size=1000, chunk_overlap=200, is_string=False):
    """
    Asynchronously encodes a PDF book into a hierarchical vector store using OpenAI embeddings.
    Includes rate limit handling with exponential backoff.
    
    Args:
        path: The path to the PDF file.
        chunk_size: The desired size of each text chunk.
        chunk_overlap: The amount of overlap between consecutive chunks.
        
    Returns:
        A tuple containing two FAISS vector stores:
        1. Document-level summaries
        2. Detailed chunks
    """
    
    # Load PDF documents
    if not is_string:
        loader = PyPDFLoader(path)
        documents = await asyncio.to_thread(loader.load)
    else:
        text_splitter = RecursiveCharacterTextSplitter(
            # Set a really small chunk size, just to show.
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            is_separator_regex=False,
        )
        documents = text_splitter.create_documents([path])


    # Create document-level summaries
    summary_llm = ChatOpenAI(temperature=0, model_name="gpt-4o-mini", max_tokens=4000)
    summary_chain = load_summarize_chain(summary_llm, chain_type="map_reduce")
    
    async def summarize_doc(doc):
        """
        Summarizes a single document with rate limit handling.
        
        Args:
            doc: The document to be summarized.
            
        Returns:
            A summarized Document object.
        """
        # Retry the summarization with exponential backoff
        summary_output = await retry_with_exponential_backoff(summary_chain.ainvoke([doc]))
        summary = summary_output['output_text']
        return Document(
            page_content=summary,
            metadata={"source": path, "page": doc.metadata["page"], "summary": True}
        )

    # Process documents in smaller batches to avoid rate limits
    batch_size = 5  # Adjust this based on your rate limits
    summaries = []
    for i in range(0, len(documents), batch_size):
        batch = documents[i:i+batch_size]
        batch_summaries = await asyncio.gather(*[summarize_doc(doc) for doc in batch])
        summaries.extend(batch_summaries)
        await asyncio.sleep(1)  # Short pause between batches

    # Split documents into detailed chunks
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size, chunk_overlap=chunk_overlap, length_function=len
    )
    detailed_chunks = await asyncio.to_thread(text_splitter.split_documents, documents)

    # Update metadata for detailed chunks
    for i, chunk in enumerate(detailed_chunks):
        chunk.metadata.update({
            "chunk_id": i,
            "summary": False,
            "page": int(chunk.metadata.get("page", 0))
        })

    # Create embeddings
    embeddings = OpenAIEmbeddings()

    # Create vector stores asynchronously with rate limit handling
    async def create_vectorstore(docs):
        """
        Creates a vector store from a list of documents with rate limit handling.
        
        Args:
            docs: The list of documents to be embedded.
            
        Returns:
            A FAISS vector store containing the embedded documents.
        """
        return await retry_with_exponential_backoff(
            asyncio.to_thread(FAISS.from_documents, docs, embeddings)
        )

    # Generate vector stores for summaries and detailed chunks concurrently
    summary_vectorstore, detailed_vectorstore = await asyncio.gather(
        create_vectorstore(summaries),
        create_vectorstore(detailed_chunks)
    )

    return summary_vectorstore, detailed_vectorstore

In [77]:
import asyncio  ### don't run this cell

# Assume your PDF path is stored in the variable "path"
path = "Aka Book.pdf"

# Use asyncio.run to execute your asynchronous function
summary_store, detailed_store = await encode_pdf_hierarchical(path)


  return float.__new__(cls, value)
  embeddings = OpenAIEmbeddings()


Vector stores have been created and saved successfully.


In [3]:
import os ### this one saves the embeddings results, no need to run

# Ensure the current working directory is correct
save_path = os.path.join(os.getcwd(), "vector_stores")  # This saves in the current directory

# Create the directory if it does not exist
if not os.path.exists(save_path):
    os.makedirs(save_path)

# Save the vector stores
summary_store.save_local(os.path.join(save_path, "summary_store"))
detailed_store.save_local(os.path.join(save_path, "detailed_store"))

print(f"Vector stores have been saved in: {save_path}")


NameError: name 'summary_store' is not defined

In [5]:
def retrieve_hierarchical(query, summary_vectorstore, detailed_vectorstore, k_summaries=3, k_chunks=5):
    """
    Performs a hierarchical retrieval using the query.

    Args:
        query: The search query.
        summary_vectorstore: The vector store containing document summaries.
        detailed_vectorstore: The vector store containing detailed chunks.
        k_summaries: The number of top summaries to retrieve.
        k_chunks: The number of detailed chunks to retrieve per summary.

    Returns:
        A list of relevant detailed chunks.
    """
    
    # Retrieve top summaries
    top_summaries = summary_vectorstore.similarity_search(query, k=k_summaries)
    
    relevant_chunks = []
    for summary in top_summaries:
        # For each summary, retrieve relevant detailed chunks
        page_number = summary.metadata["page"]
        page_filter = lambda metadata: metadata["page"] == page_number
        page_chunks = detailed_vectorstore.similarity_search(
            query, 
            k=k_chunks, 
            filter=page_filter
        )
        relevant_chunks.extend(page_chunks)
    
    return relevant_chunks

In [None]:
### this one reads the embeddings' results, you need to run this

vector_store_path = os.path.join(os.getcwd(), "vector_stores")

# Paths to the saved FAISS vector stores
summary_store_path = os.path.join(vector_store_path, "summary_store")
detailed_store_path = os.path.join(vector_store_path, "detailed_store")

# Initialize OpenAI embeddings (ensure your API key is set)
embeddings = OpenAIEmbeddings()

# Load the stored vector databases
summary_store = FAISS.load_local(summary_store_path, embeddings, allow_dangerous_deserialization=True)
detailed_store = FAISS.load_local(detailed_store_path, embeddings, allow_dangerous_deserialization=True)


  embeddings = OpenAIEmbeddings()


In [8]:
query = "describe the process differences between production of regular chocolate, milk chocolate and white chocolate?"
results = retrieve_hierarchical(query, summary_store, detailed_store)

# Print results
for chunk in results:
    print(f"Page: {chunk.metadata['page']}")
    print(f"Content: {chunk.page_content}...")  # Print first 100 characters
    print("---")

Page: 143
Content: Whole milk
Separation
Cream
Butter making
Addition
of sugar
Addition of
sugar and
cocoa mass
Buttermilk
Cheese manufacture
Whey
Demineralize
Dry Dry Dry Dry Dry
Yoghurt
Fermentation
Skim milkCasein
manufacture
Dry Dry Dry Dry Dry
Lactose
Crystallize
Demineralized
whey powder
Whey
powder
Skim milk
(non-fat)
powder
Cream
(high-fat)
powder
AMF/
fractions
Buttermilk
powder
Whole milk
powder
White
crumb
Chocolate
crumb
Yoghurt
powder
Ultra/f_iltration
Figure 5.1 Flow chart of dairy processes and products used in milk chocolate....
---
Page: 536
Content: Recipes   497
The milk chocolate recipes could also be made using “chocolate crumb” to 
replace some or all of the cocoa mass, milk powder and sugar (see Chapter 6). If 
a softer milk chocolate is required, the full cream milk powder can be replaced 
with skimmed milk powder and milk fat (butter oil).
Table 20.3 contains recipes for different types of white chocolate that can be 
made into tablets.
When making white chocol

In [None]:


# Define a new class that uses your hierarchical retrieval results
class HierarchicalRAG:
    def __init__(self, summary_store, detailed_store):
        # Save your stores (these are your preloaded FAISS vector stores)
        self.summary_store = summary_store
        self.detailed_store = detailed_store
        
        # Initialize the LLM
        self.llm = ChatOpenAI(temperature=0, model_name="gpt-3.5-turbo", max_tokens=4000)
        
        # Create a prompt template similar to AdaptiveRAG
        prompt_template = """Use the following pieces of context to answer the question at the end. 
If you don't know the answer, just say that you don't know—don't try to make up an answer.

{context}

Question: {question}
Answer:"""
        self.prompt = PromptTemplate(template=prompt_template, input_variables=["context", "question"])
        self.llm_chain = self.prompt | self.llm

    def answer(self, query: str) -> str:
        # Retrieve hierarchical results (this function should combine the summary and detailed stores)
        results = retrieve_hierarchical(query, self.summary_store, self.detailed_store)
        
        # (Optional) Print the retrieved chunks for debugging/inspection
        for chunk in results:
            print(f"Page: {chunk.metadata['page']}")
            print(f"Content: {chunk.page_content[:100]}...")  # First 100 characters
            print("---")
        
        # Combine all the retrieved page content into one context string.
        # If no relevant context is found, let the LLM know by providing a fallback message.
        if results:
            context = "\n".join([chunk.page_content for chunk in results])
        else:
            context = "No relevant context found."

        # Prepare input data for the prompt
        input_data = {"context": context, "question": query}
        
        # Invoke the chain to get the answer.
        answer = self.llm_chain.invoke(input_data)
        return answer.content




In [None]:
query = "Describe the processing steps from cocoa beans to cocoa butter"
rag_system = HierarchicalRAG(summary_store, detailed_store)
answer = rag_system.answer(query)
print(f"Answer: {answer}")

In [None]:
import ollama
from langchain.prompts import PromptTemplate

class HierarchicalRAG_mistral:
    def __init__(self, summary_store, detailed_store):
        self.summary_store = summary_store
        self.detailed_store = detailed_store

        # Use Ollama's locally installed Mistral model
        self.model_name = "mistral"  # Ensures it uses the 7B version

        # Create a prompt template similar to AdaptiveRAG
        self.prompt = PromptTemplate(
            template="""Use the following pieces of context to answer the question at the end.
            If you don't know the answer, just say that you don't know—don't try to make up an answer.

            Context:
            {context}

            Question: {question}
            Answer:""",
            input_variables=["context", "question"]
        )

    def answer(self, query: str) -> str:
        """Retrieve hierarchical results and generate an answer using Mistral via Ollama."""
        # Retrieve hierarchical results.
        results = retrieve_hierarchical(query, self.summary_store, self.detailed_store)

        # (Optional) Debug: Print retrieved chunks.
        for chunk in results:
            print(f"Page: {chunk.metadata['page']}")
            print(f"Content: {chunk.page_content[:100]}...")
            print("---")

        # Combine retrieved page contents into one context string.
        context = "\n".join([chunk.page_content for chunk in results]) if results else "No relevant context found."

        # Prepare the input data for the prompt.
        input_data = self.prompt.format(context=context, question=query)

        # Use Ollama's Mistral model to generate a response
        response = ollama.chat(model=self.model_name, messages=[{"role": "user", "content": input_data}])

        # Return the generated answer
        return response['message']['content']



In [None]:
# Example usage:
query = "Describe the differences between regular chocolate, milk chocolate, and white chocolate."
rag_system = HierarchicalRAG_mistral(summary_store, detailed_store)
answer = rag_system.answer(query)
print(f"Answer: {answer}")
