### RAG Pipelines - Data Ingestion to Vector DB ###

In [None]:
import os, certifi
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from pathlib import Path

# Export the path of certifi's CA bundle through the SSL_CERT_FILE and
# REQUESTS_CA_BUNDLE environment variables (presumably so HTTPS requests made
# later in the notebook verify certificates against it — confirm).
os.environ["SSL_CERT_FILE"] = certifi.where()
os.environ["REQUESTS_CA_BUNDLE"] = certifi.where()


# Load settings from a .env file into the process environment via python-dotenv.
from dotenv import load_dotenv
load_dotenv()


# CHROMA_API_KEY = os.getenv("CHROMA_API_KEY")
# CHROMA_TENANT = os.getenv("CHROMA_TENANT")
# CHROMA_DB=os.getenv("RAG_DB")



### Converting Raw PDF Files to the LangChain Document Structure

In [None]:
### Read all the PDFs inside the directory ###

# Module-level accumulator: every call to process_documents() appends here,
# and later cells read from it.
all_documents = []


def process_documents(pdf_directory):
    """Load every PDF found under ``pdf_directory`` into LangChain documents.

    The directory is searched recursively (``**/*.pdf``). Each loaded document
    gets two extra metadata keys: ``source_file`` (the PDF's file name) and
    ``file_type`` (always ``'pdf'``). A PDF that fails to load is reported and
    skipped so that one bad file does not abort the whole run.

    Args:
        pdf_directory: Directory to scan (string or ``Path``).

    Returns:
        The list of documents loaded by this call, across all PDFs (empty when
        nothing was found or nothing could be loaded). The same documents are
        also appended to the module-level ``all_documents`` list.
    """
    pdf_dir = Path(pdf_directory)

    pdf_files = list(pdf_dir.glob("**/*.pdf"))
    print(f"Found {len(pdf_files)} PDF's to process.")
    print(f"\n\nPDF FILES = {pdf_files}\n\n")

    loaded_documents = []

    for pdf_file in pdf_files:
        print(f"Processing {pdf_file.name} pdf\n")

        try:
            documents = PyPDFLoader(str(pdf_file)).load()
        except Exception as e:
            # Best-effort ingestion: say which file failed and why, then move
            # on to the next PDF instead of aborting.
            print(f"❌Error in loading documents from {pdf_file.name}: {e}")
            continue

        ## Adding information to metadata
        for doc in documents:
            doc.metadata['source_file'] = pdf_file.name
            doc.metadata['file_type'] = 'pdf'

        loaded_documents.extend(documents)
        print(f"✅Loaded {len(documents)} documents")

    # Publish to the shared accumulator only after the whole directory is done.
    all_documents.extend(loaded_documents)

    print(f"\nTotal documents loaded: {len(loaded_documents)}")
    return loaded_documents


# Run the loader on ../data/pdf_files/; loaded documents are appended to `all_documents`.
process_documents("../data/pdf_files/")


In [None]:
# Inspect everything accumulated in `all_documents` so far.
print(all_documents)

### Splitting the Documents into Smaller Chunks

In [None]:
# Splitting Text into Chunks

def split_documents(documents,chunk_size=1000,chunk_overlap=200):
    """Break documents into smaller chunks with a RecursiveCharacterTextSplitter.

    Chunk length is measured with ``len`` and the splitter is given the
    separator list ``["\\n\\n", "\\n", " ", ""]``. A short summary and, when at
    least one chunk was produced, a preview of the first chunk are printed.

    Args:
        documents: Documents to split (passed to ``split_documents`` of the splitter).
        chunk_size: Value forwarded as the splitter's ``chunk_size``.
        chunk_overlap: Value forwarded as the splitter's ``chunk_overlap``.

    Returns:
        The chunks returned by the splitter.
    """
    splitter_options = {
        "chunk_size": chunk_size,
        "chunk_overlap": chunk_overlap,
        "length_function": len,
        "separators": ["\n\n", "\n", " ", ""],
    }
    pieces = RecursiveCharacterTextSplitter(**splitter_options).split_documents(documents)

    print(f"\nSplit {len(documents)} documents into {len(pieces)} documents successfully")

    # Nothing to preview when the splitter produced no chunks.
    if not pieces:
        return pieces

    first_piece = pieces[0]
    print("\nExample chunk: ")
    print(f"Content: {first_piece.page_content[:200]}...")
    print(f"Metadata: {first_piece.metadata}")

    return pieces

In [None]:
# Chunk every loaded document using the default chunk_size / chunk_overlap, then inspect the result.
chunks = split_documents(all_documents)
print(chunks)

### Embedding & Vector Store DB

In [None]:
import numpy as np
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
import uuid
from typing import List, Dict, Any , Tuple
from sklearn.metrics.pairwise import cosine_similarity



In [None]:
class EmbeddingsManager:
    """Wraps a Sentence Transformers model and turns text into embedding vectors."""

    def __init__(self, model_name: str = "all-MiniLM-L6-V2"):
        """Store the model name and load the model right away.

        Args:
            model_name: Name handed to ``SentenceTransformer`` when loading.
        """
        self.model_name = model_name
        # Placeholder until _load_model() replaces it with the loaded model.
        self.model = None
        self._load_model()

    def _load_model(self):
        """Instantiate the model named by ``self.model_name`` and keep it on ``self.model``.

        Any exception raised while loading is printed and then re-raised.
        """
        try:
            print(f"Loading embedding model: {self.model_name}")
            self.model = SentenceTransformer(self.model_name)
            dimension = self.model.get_sentence_embedding_dimension()
            print(f"Model loaded successfully: Embedding dimension: {dimension}")
        except Exception as load_error:
            print(f"Error while loading {self.model_name}: {load_error}")
            raise

    def _require_model(self, message: str):
        """Return the loaded model, or raise ``ValueError(message)`` when it is missing."""
        if not self.model:
            raise ValueError(message)
        return self.model

    def generate_embeddings(self,texts: List[str]) -> np.ndarray:
        """Encode a list of texts with the loaded model.

        Args:
            texts: List of text strings.

        Returns:
            Whatever the model's ``encode`` returns for ``texts``; annotated as
            a numpy array (one row per text).

        Raises:
            ValueError: If no model is loaded.
        """
        model = self._require_model("Model not loaded")

        print(f"Generate embeddings for {len(texts)} documents")
        vectors = model.encode(texts, show_progress_bar=True)
        print(f"Generated embeddings with shape: {vectors.shape}")
        return vectors

    def get_embeddings_dimensions(self):
        """Return the sentence-embedding dimension reported by the loaded model.

        Raises:
            ValueError: If no model is loaded.
        """
        model = self._require_model("Model not loaded correctly")
        return model.get_sentence_embedding_dimension()




In [None]:
# Use the default model name; the constructor loads the model immediately.
embedding_manager = EmbeddingsManager()


### Initializing Vector Store

In [None]:
import os
import numpy as np
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
import uuid
from typing import List, Dict, Any , Tuple
from sklearn.metrics.pairwise import cosine_similarity


class VectorStore:
    """Manages vector embeddings in a ChromaDB vector store.

    ``self.client`` is a local persistent Chroma client and ``self.collection``
    is the collection that documents are written to; both are set by
    ``initialize_store()``.
    """

    def __init__(self, collection_name: str ="pdf_files", persistDirectory: str = "../data/vetor_store"):
        """Create the persistent client and open (or create) the collection.

        Args:
            collection_name: Name of the Chroma collection to use.
            persistDirectory: Directory where Chroma persists its data; it is
                created if it does not exist.
        """
        self.collection_name = collection_name
        self.persistDirectory = persistDirectory
        self.client = None
        self.collection = None
        # initialize_store() sets both self.client and self.collection, so no
        # second get_or_create_collection() call is needed here.
        self.initialize_store()

    def initialize_store(self):
        """Initialize the ChromaDB client and collection.

        Any exception raised during setup is printed and then re-raised.
        """
        try:
            os.makedirs(self.persistDirectory, exist_ok=True)
            # Use local persistent client instead of cloud client
            self.client = chromadb.PersistentClient(path=self.persistDirectory)

            # Get or create a collection
            self.collection = self.client.get_or_create_collection(
                name=self.collection_name,
                metadata={"description":"PFDF document embeddings for RAG"}
            )

            print(f"Vector store initialized. Collection: {self.collection_name}")
            print(f"Existing documents in collection: {self.collection.count()}")

        except Exception as e:
            print(f"Error while initializing ChromaDB client: {e}")
            raise

    def _sanitize_metadata(self, metadata: Any) -> Dict[str, Any]:
        """Return a flat copy of ``metadata`` holding only str/int/float/bool values.

        Keys whose value is ``None`` are dropped, any other value type is
        stringified, and a non-dict input yields an empty dict.
        """
        if not isinstance(metadata, dict):
            return {}
        clean: Dict[str, Any] = {}
        for k, v in metadata.items():
            if v is None:
                continue
            if isinstance(v, (str, int, float, bool)):
                clean[k] = v
            else:
                try:
                    clean[k] = str(v)
                except Exception:
                    # skip if completely unserializable
                    pass
        return clean

    @staticmethod
    def _extract_content_and_metadata(doc: Any) -> Tuple[str, Any]:
        """Pull (text, raw metadata) out of a document-like object, a dict, or any other value."""
        if hasattr(doc, "page_content"):
            content = getattr(doc, "page_content", "")
            raw_metadata = getattr(doc, "metadata", {})
        elif isinstance(doc, dict):
            content = doc.get("page_content") or doc.get("text") or str(doc)
            raw_metadata = doc.get("metadata", {})
        else:
            # Plain strings and any other object: the text is its str() form.
            content = str(doc)
            raw_metadata = {}

        if not isinstance(content, str):
            content = str(content)
        return content, raw_metadata

    def add_documents(self, documents: List[Any], embeddings: np.ndarray, batch_size: int = 1000):
        """Add documents and their embeddings to the vector store.

        Each document gets a generated id of the form ``doc_<8 hex chars>_<index>``
        and its sanitized metadata is extended with ``doc_index`` and
        ``content_length``. Records are written in slices of ``batch_size`` so
        that a large ingestion is not sent as one oversized ``add`` call; if a
        later slice fails, earlier slices have already been written.

        Args:
            documents: Document-like objects (with ``page_content``/``metadata``),
                dicts (``page_content`` or ``text``, optional ``metadata``),
                strings, or anything convertible with ``str``.
            embeddings: One embedding per document, in the same order.
            batch_size: Maximum number of records per ``collection.add`` call.

        Raises:
            ValueError: If the two inputs differ in length or ``batch_size`` is
                not positive. Any error from Chroma is printed and re-raised.
        """
        try:
            if len(documents) != len(embeddings):
                raise ValueError("Documents and embeddings must have the same length")
            if batch_size < 1:
                raise ValueError("batch_size must be a positive integer")

            if len(documents) == 0:
                # Nothing to store: skip the call rather than send an empty batch.
                print("No documents to add to the vector store.")
                return

            print(f"Adding {len(documents)} documents to the Vector STore...")

            # Prepare data for ChromaDB
            ids = []
            metadatas = []
            documents_text = []
            embeddings_list = []

            for i, (doc, embedding) in enumerate(zip(documents, embeddings)):
                content, raw_metadata = self._extract_content_and_metadata(doc)

                # Sanitize metadata and add our fields
                metadata = self._sanitize_metadata(raw_metadata)
                metadata['doc_index'] = i
                metadata['content_length'] = len(content)

                ids.append(f"doc_{uuid.uuid4().hex[:8]}_{i}")
                metadatas.append(metadata)
                documents_text.append(content)
                # Accept numpy rows as well as plain Python sequences.
                embeddings_list.append(
                    embedding.tolist() if hasattr(embedding, "tolist") else list(embedding)
                )

            for start in range(0, len(ids), batch_size):
                end = start + batch_size
                self.collection.add(
                    ids=ids[start:end],
                    documents=documents_text[start:end],
                    embeddings=embeddings_list[start:end],
                    metadatas=metadatas[start:end],
                )

            print(f"Successfully added {len(documents)} documents to ChromDB collection.")
            print(f"Total documents in collection: {self.collection.count()}")

        except Exception as e:
            print(f"Error while adding documents/embeddings to ChromaDB collection: {e}")
            raise



# Create the store with its defaults (collection "pdf_files", persisted under ../data/vetor_store).
vector_store = VectorStore()
vector_store

In [None]:
# First, pull the text out of each chunk (falling back to str(doc) when a chunk has no page_content).

texts = [getattr(doc, "page_content", str(doc)) for doc in chunks]

# Generate one embedding per chunk text.

embeddings = embedding_manager.generate_embeddings(texts)

# Store the embeddings in ChromaDB (pass the original chunks so their metadata is kept).

vector_store.add_documents(chunks, embeddings)

### Retriever Pipeline from the Vector Store (Pipeline 2)


In [None]:
class RAGRetriever:
    """Retrieves the stored documents that best match a user's query.

    The query is embedded with the embeddings manager and the resulting vector
    is used to query the vector store's collection.
    """

    def __init__(self,vector_store: "VectorStore", embeddings_manager: "EmbeddingsManager"):
        """Keep references to the two collaborators.

        Args:
            vector_store: Object exposing a ``collection`` with a ``query`` method.
            embeddings_manager: Object exposing ``generate_embeddings(texts)``.
        """
        self.vector_store = vector_store
        self.embeddings_manager = embeddings_manager

    def retrieve(self, query: str, top_k: int = 5, score_threshold: float = 0.0)-> List[Dict[str,Any]]:
        """Return the stored documents most similar to ``query``.

        Args:
            query: The user's question.
            top_k: Number of results requested from the collection.
            score_threshold: Minimum ``similarity_score`` a hit needs to be kept.

        Returns:
            A list of dicts with the keys ``id``, ``content``, ``metadata``,
            ``similarity_score``, ``distance`` and ``rank`` (1-based position in
            the collection's result order, before threshold filtering). The
            list is empty when nothing matches or when the lookup fails; a
            failure is printed rather than raised.
        """
        print(f"Retrieving documents for query: {query}")
        print(f"Top K : {top_k}, Score Threshold: {score_threshold}")

        # Embed the query so it can be compared against the stored document vectors.
        queryEmbeddings = self.embeddings_manager.generate_embeddings([query])[0]

        retrieved_docs = []

        try:
            # Search the collection for the top_k closest vectors.
            results = self.vector_store.collection.query(
                query_embeddings=[queryEmbeddings.tolist()],
                n_results=top_k
            )

            if results['documents'] and results['documents'][0]:
                # Results are nested per query; only one query was sent.
                documents = results['documents'][0]
                metadatas = results['metadatas'][0]
                distances = results['distances'][0]
                ids = results['ids'][0]

                for i, (doc_id, document, metadata, distance) in enumerate(zip(ids, documents, metadatas, distances)):
                    # NOTE(review): 1 - distance is a cosine similarity only if
                    # the collection uses cosine distance; with another metric
                    # the score can go negative and the default threshold of
                    # 0.0 drops those hits — confirm the collection's metric.
                    similarity_score = 1 - distance

                    if similarity_score >= score_threshold:
                        retrieved_docs.append({
                            'id': doc_id,
                            'content': document,
                            'metadata': metadata,
                            'similarity_score': similarity_score,
                            'distance': distance,
                            'rank': i + 1
                        })

            print(f"Retrieved Docs: {len(retrieved_docs)}\n")
            return retrieved_docs

        except Exception as e:
            print(f"An error ocurred while performing Retrieval of documents: {e}")
            # Honour the declared return type instead of implicitly returning None.
            return []
            





In [None]:
# Wire the retriever to the vector store and embeddings manager created in the earlier cells.
rag_retriever = RAGRetriever(vector_store=vector_store, embeddings_manager=embedding_manager)


### Integrating the Vector DB context with LLM Output (Augmented Generation)

In [None]:
from langchain_groq import ChatGroq
import os
from dotenv import load_dotenv

load_dotenv()

# Read the Groq API key from the environment.
# NOTE(review): os.getenv returns None when GROQ_API_KEY is unset; nothing here checks for that.
groq_api_key = os.getenv("GROQ_API_KEY")


### Initializing the Groq LLM
llm = ChatGroq(api_key=groq_api_key, model_name="llama-3.1-8b-instant", temperature=0.1, max_tokens=1024)


### simple RAG function.

def simple_rag_function(query,retriever = rag_retriever,llm = llm,top_k=3):
    """Answer ``query`` with the LLM, grounded in context fetched by the retriever.

    Args:
        query: The user's question.
        retriever: Object with ``retrieve(query=..., top_k=...)`` returning a
            list of dicts that carry a ``'content'`` key. The default is bound
            to ``rag_retriever`` when this function is defined.
        llm: Chat model with an ``invoke`` method whose result has ``.content``.
            The default is bound to ``llm`` when this function is defined.
        top_k: Number of chunks to request from the retriever.

    Returns:
        The model's answer text, or a fixed fallback message when the
        retriever returned no usable context.
    """
    ### Retrieving the context (through the retriever that was passed in)
    retrieved_context = retriever.retrieve(query=query, top_k=top_k)
    context = "\n\n".join(doc['content'] for doc in retrieved_context) if retrieved_context else ""

    if not context:
        return "No Relevant context found to the query."

    ### Generating the Final Answer using Groq LLM.

    # The f-string is already fully interpolated. It must not be passed through
    # str.format() again: braces inside the retrieved text would be parsed as
    # replacement fields and raise.
    prompt = f"""Use the following context to answer the question concisely.

        Context:
        {context}

        Question: {query}

        Answer:"""

    llm_response = llm.invoke(prompt)

    return llm_response.content




In [None]:
# NOTE(review): the query is an empty string — presumably a placeholder; put a real question here.
answer = simple_rag_function(query="")
print(answer)