## RAG Pipelines - Data Ingestion to Vector DB Pipeline

In [None]:

# Importing the necessary libraries
from msilib.schema import Class
import os
from langchain_community.document_loaders import PyPDFLoader, PyMuPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from pathlib import Path
import torch

In [2]:
print("Current working directory:", os.getcwd())

Current working directory: D:\Projects\GitHub Repository_AI-Projects_Destination\AI-Projects\notebooks


In [3]:
# Read all the files inside the directory and sub-directories   
def process_all_pdfs(directory_path):
    """Process all PDF files in the given directory and return a list of documents."""
    all_documents = []
    pdf_files = list(Path(directory_path).rglob("**/*.pdf"))  # Recursively find all PDF files

    print(f"Found {len(pdf_files)} PDF files.")

#Batch PDF Document Loading and Metadata Enrichment using PyMuPDFLoader
    for pdf_file in pdf_files:
        print(f"Processing file: {pdf_file}")
        try:
            loader = PyMuPDFLoader(str(pdf_file))
            documents = loader.load()

            #Add source information also to the document metadata
            for doc in documents:
                doc.metadata["source"] = str(pdf_file)
                doc.metadata["file_type"] = "pdf"

            all_documents.extend(documents)
            print(f"Loaded {len(documents)} pages from {pdf_file}")

        except Exception as e:
            print(f"Following error occurred while loading {pdf_file} with PyMuPDFLoader: {e}")

    print(f"\nTotal documents loaded: {len(all_documents)}")

    return all_documents

all_pdf_documents = process_all_pdfs("../data/pdf/")
#print(all_pdf_documents) # Print the first document to verify

Found 2 PDF files.
Processing file: ..\data\pdf\aao5646-kroodsma-sm.pdf
Loaded 40 pages from ..\data\pdf\aao5646-kroodsma-sm.pdf
Processing file: ..\data\pdf\AI_ML_DS_Interview_Roadmap.pdf
Loaded 2 pages from ..\data\pdf\AI_ML_DS_Interview_Roadmap.pdf

Total documents loaded: 42


In [4]:
# Text Splitting documents into chunks

def text_splitting(documents, chunk_size=1000, chunk_overlap=100):
    """Split documents into smaller chunk for better RAG Performance"""
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        separators=["\n\n", "\n", " ", ""]
    )
    try:
        splits = text_splitter.split_documents(documents)
        print(f"Split {len(documents)} documents into {len(splits)} chunks.")
    except Exception as e:
        print(f"Error splitting document {documents.metadata.get('source', 'unknown')}: {e}")

    if splits:
        print(f"\nFirst Chunk as Example Chunk:")
        print(f"\nFirst chunk preview: {splits[0].page_content[:200]}...")  # Print first 200 characters of the first chunk
        print(f"First chunk metadata: {splits[0].metadata}")
        print(f"First Chunk Content: {splits[0].page_content}")
    return splits

split_chunks = text_splitting(all_pdf_documents, chunk_size=1000, chunk_overlap=100)
print(split_chunks[0]) # Print the first chunk to verify


Split 42 documents into 112 chunks.

First Chunk as Example Chunk:

First chunk preview: www.sciencemag.org/content/359/6378/904/suppl/DC1 
 
 
 
 
 
 
 
Supplementary Materials for 
 
Tracking the global footprint of fisheries 
 
David A. Kroodsma,* Juan Mayorga, Timothy Hochberg, Nathan...
First chunk metadata: {'producer': 'Adobe PDF Library 11.0', 'creator': 'Acrobat PDFMaker 11 for Word', 'creationdate': '2018-02-14T15:19:51-05:00', 'source': '..\\data\\pdf\\aao5646-kroodsma-sm.pdf', 'file_path': '..\\data\\pdf\\aao5646-kroodsma-sm.pdf', 'total_pages': 40, 'format': 'PDF 1.6', 'title': '', 'author': 'David Kroodsma', 'subject': '', 'keywords': '', 'moddate': '2018-02-14T15:24:29-05:00', 'trapped': '', 'modDate': "D:20180214152429-05'00'", 'creationDate': "D:20180214151951-05'00'", 'page': 0, 'file_type': 'pdf'}
First Chunk Content: www.sciencemag.org/content/359/6378/904/suppl/DC1 
 
 
 
 
 
 
 
Supplementary Materials for 
 
Tracking the global footprint of fisheries 
 
David A. 

In [5]:
# Embedding & VectorStore DB

import numpy as np
from sentence_transformers import SentenceTransformer ## Embedding Model
#from langchain.embeddings import HuggingFaceEmbeddings
import chromadb
from chromadb.config import Settings
import uuid
from typing import List, Dict, Any, Tuple
from sklearn.metrics.pairwise import cosine_similarity


class EmbeddingManager:
    """The class manages embedding generation using a SentenceTransformer model."""

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        """Initialize the Sentence Transformer Embedding model.

        Args:
            model_name (str): HuggingFace model name for sentence embeddings.
        """
        self.model = None
        self.model_name = model_name
        self._load_model()

    def _load_model(self):
        """Load the Sentence Transformer embedding model. """
        try:
            print(f"Loading model: {self.model_name}")
            self.model = SentenceTransformer(self.model_name)
            print(f"Loaded embedding model: {self.model_name} successfully. Embedding Dimension: {self.model.get_sentence_embedding_dimension()}")

        except Exception as e:
            print(f"Error loading model {self.model_name}: {e}")
            raise

    def generate_embedding(self, chunk_text: str) -> np.ndarray:
        """Generate embedding for list of chunk texts.

        Args:
            chunk_text : list of texts to generate embeddings for.

        Returns :
        numpy array of embeddings with shape (len(chunk_text), embedding_dimension)
        """

        if self.model is None:
            raise ValueError("Embedding model is not loaded.")
        
        print(f"Generating embeddings for {len(chunk_text)} number of chunk texts...")
        #embeddings = self.model.encode(chunk_text, show_progress_bar=True, batch_size=16, convert_to_numpy=True, device="cuda" if torch.cuda.is_available() else "cpu")
        embeddings = self.model.encode(chunk_text, show_progress_bar=True)

        print(f"embeddings created")
        print(f"Generated embeddings with shape: {embeddings.shape}")

        return embeddings

    """
    def embedding_dimension(self) -> int:
        # Return the dimension of the embeddings
        if self.model is None:
            raise ValueError("Embedding model is not loaded.")
        return self.model.get_sentence_embedding_dimension() # Not necessary to have a separate function created to check the embedding dimension and print it out. As you can see its already included under _load_model() function.
    """

# Initialize Embedding Manager
embedding_manager = EmbeddingManager()
embedding_manager

Loading model: all-MiniLM-L6-v2
Loaded embedding model: all-MiniLM-L6-v2 successfully. Embedding Dimension: 384


<__main__.EmbeddingManager at 0x20f490c5970>

In [6]:
# VECTOR STORE - ChromaDB

class VectorStoreManager:
    """Manages ChromaDB vector store operations including adding, querying, and deleting documents."""

    def __init__(self, collection_name: str = "pdf_documents", persist_directory: str = "../data/VectorStore/chroma_db"):
        """
        Initialize ChromaDB client and collection.

        Args:
            collection_name (str): Name of the ChromaDB collection.
            persist_directory (str): Directory to persist the vector store or ChromaDB data.
        """
        self.client = None
        self.collection = None
        self.collection_name = collection_name
        self.persist_directory = persist_directory
        print(1)
        self._initialize_store()  # Connect to or create the index / storage layer where your RAG data lives.
    
    def _initialize_store(self):
        """Initialize the ChromaDB Client and Collection."""
        try:
            # Create persistent ChromaDB Client
            os.makedirs(self.persist_directory, exist_ok=True)  # Create directory if it doesn't exist
            print(2)

            try:
                self.client = chromadb.PersistentClient(path=self.persist_directory,)
                print(3)
            except Exception as e:
                print(f" Error creating Chromadb Client :{e}")

            # Get or create collection
            self.collection = self.client.get_or_create_collection(
                name=self.collection_name,
                metadata={"description": "PDF Documents Embeddings For RAG"}
            )
            print(4)

            print(f"✅ Initialized ChromaDB vector store: {self.collection_name}")
            print(f"📦 Existing documents in collection: {self.collection.count()}")

        except Exception as e:
            print(f"❌ Error initializing ChromaDB vector store: {e}")
            raise

    def add_documents(self, chunks: List[Any], embeddings: np.ndarray):
        """
        Add chunks document and embeddings to the ChromaDB collection.

        Args:
            chunks (List[Any]): List of chunks with 'page_content' and 'metadata'.
            embeddings (np.ndarray): Corresponding embeddings for list of LangChain chunks.
        """
        if not chunks:
            print("⚠️ No chunks to add as documents.")
            return
        if len(chunks) != len(embeddings):
            raise ValueError("Number of chunks and embeddings must match.")
        
        print(f"🧩 Adding {len(chunks)} chunks as documents to the vector store...")

        # Prepare data for ChromaDB
        ids = []
        metadatas = []
        chunks_text_doc = []
        embeddings_list = []

        print(3)

        for i, (doc, emb) in enumerate(zip(chunks, embeddings)):
            print(4)
            doc_id = str(uuid.uuid4())  # Generate a unique ID for each chunk document. # Generate a unique ID for each chunk document # can try out another unique ID creation as per Krish Naik's RG LangChain Video also required
            ids.append(doc_id)
            print(5)

            # Prepare metadata
            metadata = dict(doc.metadata) if hasattr(doc, "metadata") else {}
            print(6)

            metadata["chunk_index"] = i
            metadata["content_length"] = len(getattr(doc, "page_content", ""))
            print(7)

            metadatas.append(metadata)
            print(8)


            # Prepare document text
            chunks_text_doc.append(getattr(doc, "page_content", ""))
            print(9)


            # Prepare embeddings
            embeddings_list.append(emb.tolist())  # Convert numpy array to list
            print(10)
            
            print (f"count: {i}")

        
        # Add to ChromaDB collection
        try:
            self.collection.add(
                ids=ids,
                embeddings=embeddings_list,
                metadatas=metadatas,
                documents=chunks_text_doc
            )
            
            print(f"✅ Added {len(documents)} chunk documents to the vector store.")
            print(f"📈 Total chunk documents in collection now: {self.collection.count()}")

        except Exception as e:
            print(f"❌ Error adding chunk documents to vector store: {e}")
            raise

# Instantiate and verify initialization
vector_store_manager = VectorStoreManager()
vector_store_manager

1
2
3
4
✅ Initialized ChromaDB vector store: pdf_documents
📦 Existing documents in collection: 0


<__main__.VectorStoreManager at 0x20f4930e780>

In [None]:
# Convert Text to Embeddings

chunk_text = [doc.page_content for doc in split_chunks]
print(1)
print(f"{max(len(t) for t in chunk_text)}")
embeddings = embedding_manager.generate_embedding(chunk_text[:5])
print(2)
print(f"Embeddings shape: {embeddings.shape}")

# Store the embeddings in the chromadb collections
vector_store_manager.add_documents(split_chunks[:5], embeddings)
print(11)

1
998
Generating embeddings for 5 number of chunk texts...


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

## RAG Pipelines - Retriever Pipeline From Vector Store

In [None]:
Class RAGRetriever