### RAG Pipelines - Data ingestion to Vector DB Pipeline

In [3]:
import os
from langchain_community.document_loaders import PyMuPDFLoader, PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from pathlib import Path

In [4]:
### Read all the PDFs inside the directory

def process_all_pdfs(pdf_directory):
    all_documents = []
    pdf_dir = Path(pdf_directory)

    pdf_files = list(pdf_dir.glob("**/*.pdf"))

    print(f"Found {len(pdf_files)} PDF files")

    for pdf_file in pdf_files:
        print(f"Processing {pdf_file.name}")

        try:
            loader = PyPDFLoader(str(pdf_file))
            documents = loader.load()

            for doc in documents:
                doc.metadata["source_file"] = pdf_file.name
                doc.metadata["file_type"] = "pdf"

            all_documents.extend(documents)
            print(f"Loaded {len(documents)} pages")
        except Exception as e:
            print(f"Errors: {e}")

    print(f"Total documents loaded: {all_documents}")

    return all_documents

In [5]:
process_all_pdfs("../data/pdf")

Found 2 PDF files
Processing Pawneet_Singh_Staff_Engineer_AI_Architect_Resume.pdf
Loaded 2 pages
Processing Pawneet Singh Resume Main.pdf
Loaded 2 pages
Total documents loaded: [Document(metadata={'producer': 'Skia/PDF m141', 'creator': 'FlowCV - https://flowcv.com', 'creationdate': '2025-10-23T10:39:36+00:00', 'moddate': '2025-10-23T10:39:36+00:00', 'keywords': 'FlowCV – Online Resume Builder – https://flowcv.com', 'source': '../data/pdf/Pawneet_Singh_Staff_Engineer_AI_Architect_Resume.pdf', 'total_pages': 2, 'page': 0, 'page_label': '1', 'source_file': 'Pawneet_Singh_Staff_Engineer_AI_Architect_Resume.pdf', 'file_type': 'pdf'}, page_content='PAWNEET SINGH\nStaff Engineer 2 & AI-Enabled Software Architect\npawneet.dev@gmail.com\n \n+91-9990997885\n \nBangalore, India\n \nlinkedin.com/in/pawneetdev\n \ngithub.com/pawneetdev\n \nSUMMARY\nStaff Engineer and Software Architect with close to 10 years of experience in software engineering, distributed \nsystems, and AI-driven development. S

[Document(metadata={'producer': 'Skia/PDF m141', 'creator': 'FlowCV - https://flowcv.com', 'creationdate': '2025-10-23T10:39:36+00:00', 'moddate': '2025-10-23T10:39:36+00:00', 'keywords': 'FlowCV – Online Resume Builder – https://flowcv.com', 'source': '../data/pdf/Pawneet_Singh_Staff_Engineer_AI_Architect_Resume.pdf', 'total_pages': 2, 'page': 0, 'page_label': '1', 'source_file': 'Pawneet_Singh_Staff_Engineer_AI_Architect_Resume.pdf', 'file_type': 'pdf'}, page_content='PAWNEET SINGH\nStaff Engineer 2 & AI-Enabled Software Architect\npawneet.dev@gmail.com\n \n+91-9990997885\n \nBangalore, India\n \nlinkedin.com/in/pawneetdev\n \ngithub.com/pawneetdev\n \nSUMMARY\nStaff Engineer and Software Architect with close to 10 years of experience in software engineering, distributed \nsystems, and AI-driven development. Specialized in building scalable microservices, modernizing legacy \nplatforms, and integrating LLM-based automation and AIOps to improve reliability and engineering velocity. \n

In [10]:
## Text splitting into chunks (for better RAG performance)

def split_documents(documents, chunk_size=1000, chunk_overlap=200):
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        separators=["\n\n", "\n", " ", ""]
    )

    split_docs = text_splitter.split_documents(documents)
    print(f"Split {len(documents)} documents into {len(split_docs)} chunks")

    if split_docs:
        print(f"\n Example chunk")
        print(f"Content: {split_docs[0].page_content[:200]}...")
        print(f"Metadat: {split_docs[0].metadata}")

    return split_docs

In [11]:
all_pdf_documents = process_all_pdfs("../data/pdf")
chunks = split_documents(all_pdf_documents)

print(chunks)

Found 2 PDF files
Processing Pawneet_Singh_Staff_Engineer_AI_Architect_Resume.pdf
Loaded 2 pages
Processing Pawneet Singh Resume Main.pdf
Loaded 2 pages
Total documents loaded: [Document(metadata={'producer': 'Skia/PDF m141', 'creator': 'FlowCV - https://flowcv.com', 'creationdate': '2025-10-23T10:39:36+00:00', 'moddate': '2025-10-23T10:39:36+00:00', 'keywords': 'FlowCV – Online Resume Builder – https://flowcv.com', 'source': '../data/pdf/Pawneet_Singh_Staff_Engineer_AI_Architect_Resume.pdf', 'total_pages': 2, 'page': 0, 'page_label': '1', 'source_file': 'Pawneet_Singh_Staff_Engineer_AI_Architect_Resume.pdf', 'file_type': 'pdf'}, page_content='PAWNEET SINGH\nStaff Engineer 2 & AI-Enabled Software Architect\npawneet.dev@gmail.com\n \n+91-9990997885\n \nBangalore, India\n \nlinkedin.com/in/pawneetdev\n \ngithub.com/pawneetdev\n \nSUMMARY\nStaff Engineer and Software Architect with close to 10 years of experience in software engineering, distributed \nsystems, and AI-driven development. S

### Embedding and VectorStore DB

In [13]:
import numpy as np
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb import Settings
import uuid
from typing import List, Dict, Any, Tuple
from sklearn.metrics.pairwise import cosine_similarity

In [None]:
class EmbeddingManager:
    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        # https://huggingface.co/sentence-transformers
        self.model_name = model_name
        self.model = None
        self._load_model()
    
    def _load_model(self): # protected function starts with underscore
        try:
            print(f"Loading embedding model: {self.model_name}")
            self.model = SentenceTransformer(self.model_name)
            print(f"Model loaded successfully. Embedding dimensions: {self.model.get_sentence_embedding_dimension()}") # embedding dimesions
        except Exception as e:
            print(f"Error loading model {self.model_name}: {e}")
            raise

    def generate_embeddings(self, texts: List[str]) -> np.ndarray:
        if not self.model:
            raise ValueError("Model not loaded")

        print(f"Generating embeddings for {len(texts)} texts....")
        embeddings = self.model.encode(texts, show_progress_bar=True)
        print(f"Generated embeddings with shape: {embeddings.shape}")
        return embeddings

In [26]:
em = EmbeddingManager()
em

Loading embedding model: all-MiniLM-L6-v2
Model loaded successfully. Embedding dimensions: <bound method SentenceTransformer.get_sentence_embedding_dimension of SentenceTransformer(
  (0): Transformer({'max_seq_length': 256, 'do_lower_case': False, 'architecture': 'BertModel'})
  (1): Pooling({'word_embedding_dimension': 384, 'pooling_mode_cls_token': False, 'pooling_mode_mean_tokens': True, 'pooling_mode_max_tokens': False, 'pooling_mode_mean_sqrt_len_tokens': False, 'pooling_mode_weightedmean_tokens': False, 'pooling_mode_lasttoken': False, 'include_prompt': True})
  (2): Normalize()
)>


<__main__.EmbeddingManager at 0x123eb8180>

### Vector Store

In [None]:
class VectorStore:
    def __init__(self, collection_name: str = "pdf_documents", persist_directory: str = "../data/vector_store"):
        self.collection_name = collection_name
        self.persist_directory = persist_directory
        self.client = None
        self.collection = None
        self._initialize_store()

    def _initialize_store(self):
        try:
            os.makedirs(self.persist_directory, exist_ok=True)
            
            # https://www.trychroma.com/
            self.client = chromadb.PersistentClient(path=self.persist_directory)

            self.collection = self.client.get_or_create_collection(
                name=self.collection_name,
                metadata={"description": "PDF document embeddings for RAG"}
            )

            print(f"Vector store initialized. Collection: {self.collection_name}")
            print(f"Existing documents in collection: {self.collection.count()}")
        except Exception as e:
            print(f"Error initializing vector store: {e}")
            raise
    
    def add_documents(self, documents: List[Any], embeddings: np.ndarray):
        if len(documents) != len(embeddings):
            raise ValueError("Number of documents must match number of embeddings")

        print(f"Adding {len(documents)} documents to vector store...")

        ids = []
        metadatas = []
        documents_text = []
        embeddings_list = []

        for i, (doc, embedding) in enumerate(zip(documents, embeddings)):
            doc_id = f"doc_{uuid.uuid4().hex[:8]}_{i}"
            ids.append(doc_id)

            metadata = dict(doc.metadata)
            metadata["doc_index"] = i
            metadata["content_length"] = len(doc.page_content)
            metadatas.append(metadata)

            documents_text.append(doc.page_content)

            embeddings_list.append(embedding.to_list())
        
        try:
            self.collection.add(
                ids=ids,
                embeddings=embeddings_list,
                metadatas=metadatas,
                documents=documents_text
            )

            print(f"Successfully added {len(documents)} documents to vector store")
            print(f"Total documents in collection: {self.collection.count()}")
        except Exception as e:
            print(f"Error adding documents to vector store: {e}")
            raise


vector_store = VectorStore()
