In [1]:
from dotenv import load_dotenv
load_dotenv()

True

In [2]:
from typing import List, Dict, Any
from pydantic import BaseModel, Field

class SearchResult(BaseModel):
    content: str
    metadata: Dict = None

class ExpandedQueries(BaseModel):
    queries: List[str] = Field(..., description="List of queries")

class Response(BaseModel):
    response: str = Field(..., description="Summary from relevant docs")

In [3]:
query_expansion_promt = """
Given the following user question, generate three different versions of the query 
that could help retrieve relevant information. Make them diverse but related to the 
original question.

Original question: {question}

Return the result in the following format:
{format_instructions}
"""

final_prompt = """
Answer the question based on the following context and chat history. Generate the summary using relevant context to answer the question.
If you cannot answer the question based on the context, say so.

Context: {context}

Question: {question}

Return the result in the following format:
{format_instructions}
"""

In [4]:
from langchain.vectorstores import FAISS
from langchain.schema import Document
from langchain.prompts import PromptTemplate
from langchain.output_parsers import PydanticOutputParser
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.retrievers import BM25Retriever, EnsembleRetriever
from langchain_google_genai import ChatGoogleGenerativeAI, GoogleGenerativeAIEmbeddings

class RAGPipeline:
    def __init__(self):
        self.llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash")
        self.embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
        self.setup_components()

    def setup_components(self):
        """Initialize all RAG components"""
        # Text splitter for document processing
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )
        
        # Initialize empty vector store
        self.vector_store = None
        self.bm25_retriever = None
        self.ensemble_retriever = None
        
        # Setup prompt templates
        self.query_parser = PydanticOutputParser(pydantic_object=ExpandedQueries)
        self.query_expansion_prompt = PromptTemplate.from_template(
            template=query_expansion_promt,
            input=["question"],
            partial_variables={'format_instructions': self.query_parser.get_format_instructions()}
        )
        
        self.final_response_parser = PydanticOutputParser(pydantic_object=Response)
        self.final_prompt = PromptTemplate.from_template(
            template=final_prompt,
            input=["context", "question"],
            partial_variables={'format_instructions': self.final_response_parser.get_format_instructions()}
        )

    def ingest_documents(self, documents: List[str], metadata_list: List[Dict] = None):
        """Process and store documents in vector store and BM25"""
        if metadata_list is None:
            metadata_list = [{}] * len(documents)
            
        # Process documents
        docs = [
            Document(page_content=doc, metadata=meta)
            for doc, meta in zip(documents, metadata_list)
        ]
        splits = self.text_splitter.split_documents(docs)
        
        # Create ensemble retriever
        self.vector_store = FAISS.from_documents(splits, self.embeddings)
        self.bm25_retriever = BM25Retriever.from_documents(splits)
        self.ensemble_retriever = EnsembleRetriever(
            retrievers=[
                self.vector_store.as_retriever(search_kwargs={"k": 10}),
                self.bm25_retriever
            ],
            weights=[0.7, 0.3]
        )

    def expand_query(self, question: str) -> List[str]:
        """Generate multiple versions of the query using LLM"""
        chain = self.query_expansion_prompt | self.llm | self.query_parser
        result = chain.invoke({
            "question": question
        }).model_dump()

        return result["queries"]

    def retrieve_and_fuse(self, question: str, n: int) -> List[Document]:
        """Retrieve documents using multiple queries and fuse results"""
        if not self.ensemble_retriever:
            raise ValueError("No documents ingested yet. Please call ingest_documents first.")
            
        # Expand the query
        self.expanded_queries = self.expand_query(question)
        all_docs = []
        
        # Retrieve documents for each query
        for query in [question] + self.expanded_queries:
            docs = self.ensemble_retriever.invoke(query)
            all_docs.extend(docs)
            
        # Remove duplicates and sort by relevance
        seen = set()
        unique_docs = []
        for doc in all_docs:
            if doc.page_content not in seen:
                seen.add(doc.page_content)
                unique_docs.append(doc)
                
        return unique_docs[:n]

    def process_query(self, question: str, n: int) -> Dict[str, Any]:
        """Main method to process a query and generate response"""
        # Retrieve relevant documents
        relevant_docs = self.retrieve_and_fuse(question, n)
        docs = []
        for doc in relevant_docs:
            docs.append(
                SearchResult(
                    content=doc.page_content,
                    source=doc.metadata.get('source', 'unknown'),
                    metadata=doc.metadata
                ).model_dump()
            )
        
        # Format context from documents
        context = "\n\n".join(doc.page_content for doc in relevant_docs)
        
        # Create response chain and invoke it
        chain = self.final_prompt | self.llm | self.final_response_parser
        response = chain.invoke({
            "context": context,
            "question": question
        }).model_dump()
        
        return {
            "response": response["response"],
            "relevant_docs": docs,
            "expanded_queries": self.expanded_queries
        }

  from .autonotebook import tqdm as notebook_tqdm


In [5]:
import os
import pytz
from datetime import datetime

def load_content(filename) -> str:
	root = "../assets/documents"
	filepath = os.path.join(root, filename)
	with open(filepath, "r", encoding='utf-8') as file:
		content = file.read()
	return content

def get_file_details(filename) -> Dict[str, str]:
	root = "../assets/documents"
	filepath = os.path.join(root, filename)

	ist = pytz.timezone("Asia/Kolkata")
	creation_time_utc = datetime.fromtimestamp(os.path.getctime(filepath))
	creation_time_ist = creation_time_utc.astimezone(ist).strftime("%Y-%m-%d %H:%M:%S %Z")

	return {
		"filename": os.path.basename(filepath),
		"creation_time": creation_time_ist,
		"size_kb": f"{round(os.path.getsize(filepath) / 1024, 2)} KB"
	}

# Example usage
def main(query: str, n: int = 5) -> Dict[str, Any]:
	# Initialize the pipeline
	rag = RAGPipeline()

	# Sample documents
	documents = [load_content(filepath) for filepath in os.listdir("../assets/documents")]
	metadata = [get_file_details(filepath) for filepath in os.listdir("../assets/documents")]

	# Ingest documents
	rag.ingest_documents(
		documents,
		metadata_list=metadata
	)

	result = rag.process_query(query, n)

	return result

In [6]:
query = "What is RAG? What is the importance of vector db in RAG implementation?"
result = main(query, 10)

In [7]:
result['expanded_queries']

['Explain Retrieval Augmented Generation (RAG) and the role of vector databases.',
 'How do vector databases contribute to the effectiveness of RAG systems?',
 'RAG architecture: benefits of using vector embeddings and similarity search.']

In [8]:
result['relevant_docs']

[{'content': '### Challenges and Considerations\n\n- **Latency**: Retrieving documents and generating responses can introduce delays.\n- **Knowledge Base Maintenance**: The quality of responses depends on the reliability and freshness of the external knowledge source.\n- **Computational Costs**: Running both retrieval and generation components can be resource-intensive.\n\n### Future of RAG\n\nWith ongoing advancements in AI, RAG is expected to become more efficient and widely adopted. Future improvements may include better retriever architectures, optimized response generation, and lower latency techniques to ensure seamless real-time applications.',
  'metadata': {'filename': 'rag.md',
   'creation_time': '2025-02-07 15:19:43 IST',
   'size_kb': '2.75 KB'}},
 {'content': '#### Working Mechanism\n\n1. **Query Processing**: The user provides an input query.\n2. **Document Retrieval**: The retriever fetches the top-k relevant documents from an external knowledge base.\n3. **Response Gen

In [10]:
result['response']

'Retrieval-Augmented Generation (RAG) is an advanced technique that combines retrieval-based models and generative models to improve the quality and relevance of generated text by incorporating external knowledge. Vector databases are specialized data storage solutions designed to efficiently handle high-dimensional vector data and support fast nearest neighbor search, making them essential for the retriever component in RAG to fetch relevant information from a knowledge base based on an input query.'