## Data Ingestion

In [None]:
import os
from langchain_community.document_loaders import PyMuPDFLoader, PyPDFLoader
from langchain_classic.text_splitter import RecursiveCharacterTextSplitter
from pathlib import Path

In [None]:
# Read all pdfs inside the directory
def process_all_pdfs(pdf_directory):
    """Load every PDF found under ``pdf_directory`` into a flat document list.

    The directory is searched recursively, so PDFs placed directly in
    ``pdf_directory`` and PDFs nested at any depth are all picked up.  Each
    file is read with ``PyPDFLoader`` and every document it returns is tagged
    with ``source_file`` (the PDF's file name) and ``file_type`` (``'pdf'``).

    Args:
        pdf_directory: Directory to scan (``str`` or ``Path``).

    Returns:
        A list with the documents of every PDF that loaded successfully.
        A file that fails to load is reported on stdout and skipped.
    """
    all_documents = []
    pdf_dir = Path(pdf_directory)

    # rglob matches at any depth.  The previous "*/*.pdf" pattern only matched
    # PDFs exactly one sub-directory down, silently ignoring everything else.
    # Sorting keeps the processing order stable between runs.
    pdf_files = sorted(pdf_dir.rglob("*.pdf"))

    print(f"Found {len(pdf_files)} PDF files to process")

    for pdf_file in pdf_files:
        print(f"\n Processing : {pdf_file.name}")
        try:
            loader = PyPDFLoader(str(pdf_file))
            documents = loader.load()

            # Record provenance on every document so chunks can be traced
            # back to the file they came from.
            for doc in documents:
                doc.metadata['source_file'] = pdf_file.name
                doc.metadata['file_type'] = 'pdf'

            all_documents.extend(documents)
            print(f" ✓ Loaded {len(documents)} pages")

        except Exception as e:
            # Best effort: one unreadable PDF must not abort the whole batch.
            print(f" ✗ Error : {e}")
    print(f"\nTotal documents loaded : {len(all_documents)}")
    return all_documents

# Load the PDFs found under "data/" (a relative path, so it is resolved
# against the current working directory).  The resulting list is the input
# to the chunking step.
all_pdf_documents = process_all_pdfs("data/")

In [None]:
# Text Splitting into Chunks

def split_documents(documents, chunk_size=1000, chunk_overlap=200):
    """Cut documents into overlapping chunks and preview the first one.

    Args:
        documents: Documents to split.
        chunk_size: Maximum chunk length, measured with ``len``.
        chunk_overlap: Overlap between neighbouring chunks, measured with
            ``len``.

    Returns:
        The list of chunk documents produced by the splitter.
    """
    splitter_options = {
        "chunk_size": chunk_size,
        "chunk_overlap": chunk_overlap,
        "length_function": len,
        # Paragraph break, line break, space, then the empty string.
        "separators": ["\n\n", "\n", " ", ""],
    }
    splitter = RecursiveCharacterTextSplitter(**splitter_options)

    chunked = splitter.split_documents(documents)
    print(f"Split {len(documents)} documents into {len(chunked)} chunks")

    # Nothing to preview when the input produced no chunks.
    if not chunked:
        return chunked

    first_chunk = chunked[0]
    print("\nExample chunk:")
    print(f"Content: {first_chunk.page_content[:200]}...")
    print(f"Metadata : {first_chunk.metadata}")
    return chunked

# Chunk the loaded PDF documents with the default size (1000) and overlap (200).
chunks = split_documents(all_pdf_documents)

In [None]:
from langchain_core.documents import Document

In [None]:
# Hand-built Document showing the two fields every loader fills in:
# the text itself (page_content) and a free-form metadata mapping.
doc = Document(
    page_content="Let this be the main content I am using to create RAG",
    metadata=dict(
        source="example.txt",
        pages=1,
        author="shireesha",
        date_created="2025-12-23",
    ),
)
# Last expression of the cell, so the notebook displays the document.
doc

In [None]:
## Create the directory that will hold the sample text files
import os

# The sample files are written to, and later loaded from, "data/text_files"
# relative to the working directory.  Creating "../data/text_files" instead
# left that directory missing and made the write step fail with
# FileNotFoundError on a fresh checkout.
os.makedirs("data/text_files", exist_ok=True)

In [None]:
# Maps each output path to the text that is written into that file.
sample_texts = {
    "data/text_files/python_intro.txt": """Python is a high-level, interpreted programming language known for its clear syntax and code readability, which was created in 1991. It is popular for beginners due to its gentle learning curve and is widely used in web development, data science, artificial intelligence, and automation. 

Key features include:
Dynamic Typing: Variable types are automatically determined at runtime, which simplifies coding.
Multiple Paradigms: It supports object-oriented, functional, and procedural programming styles.
Extensive Libraries: A vast ecosystem of libraries and the Python Package Index (PyPI) offer ready-to-use code for various tasks.""",
    "data/text_files/machine_learning.txt": """Machine Learning (ML) is a type of Artificial Intelligence (AI) that lets computers learn from data to find patterns, make decisions, and predict outcomes, without being explicitly programmed for every task, using algorithms trained on vast datasets for tasks like image recognition, recommendations, and translation. Key types include Supervised Learning (labeled data, e.g., spam filters), Unsupervised Learning (unlabeled data, e.g., customer grouping), and Reinforcement Learning (learning via rewards/penalties for decision-making). """,
}

# Write (or overwrite) each sample file as UTF-8 text.
for target_path, body in sample_texts.items():
    Path(target_path).write_text(body, encoding='utf-8')

print("Sample text files created!")

In [None]:
### Read a single text file with LangChain's TextLoader
from langchain_community.document_loaders import TextLoader

# Explicit UTF-8 matches the encoding the sample files were written with.
loader = TextLoader("data/text_files/python_intro.txt", encoding="utf-8")

# Last expression of the cell, so the notebook displays what load() returns.
loader.load()

In [None]:
## Directory Loader: load every .txt file under data/text_files
from langchain_community.document_loaders import DirectoryLoader

# glob="**/*.txt" selects .txt files; each match is handed to TextLoader.
dir_loader=DirectoryLoader(
    "data/text_files",
    glob="**/*.txt",
    loader_cls=TextLoader,
    show_progress=True
)

text_docs=dir_loader.load()
# Last expression of the cell, so the notebook displays the loaded documents.
text_docs

In [None]:
from langchain_community.document_loaders import PyMuPDFLoader, PyPDFLoader

# Load every PDF under data/pdf_files, parsing each one with PyMuPDFLoader.
# NOTE: this rebinds `dir_loader`, replacing the text-file loader created in
# the previous cell; DirectoryLoader itself is imported there.
dir_loader=DirectoryLoader(
    "data/pdf_files",
    glob='**/*.pdf',
    loader_cls=PyMuPDFLoader,
    show_progress=True
    )

pdf_docs=dir_loader.load()
# Last expression of the cell, so the notebook displays the loaded documents.
pdf_docs


# Embedding and VectorStoreDB

In [None]:
import numpy as np
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
import uuid
from typing import List,Dict,Any, Tuple
from sklearn.metrics.pairwise import cosine_similarity

In [None]:
class EmbeddingManager:
    """Own a SentenceTransformer model and turn texts into embeddings.

    The model is loaded eagerly in ``__init__``, so a bad model name fails at
    construction time rather than on the first ``generate_embeddings`` call.
    """

    def __init__(self, model_name: str='all-MiniLM-L6-V2'):
        """Store the model name and load the model.

        Args:
            model_name: Name passed to ``SentenceTransformer``.
        """
        self.model_name = model_name
        self.model = None
        self._load_model()

    def _load_model(self) -> None:
        """Instantiate the model; report and re-raise any loading failure."""
        try:
            print(f"Loading embedding model: {self.model_name}")
            self.model = SentenceTransformer(self.model_name)
            print(f"Model Loaded successfully. Embedding dimension: {self.model.get_sentence_embedding_dimension()}")
        except Exception as e:
            print(f"Error loading model{self.model_name}:{e}")
            raise

    def generate_embeddings(self, texts:List[str])->np.ndarray:
        """Encode ``texts`` with the loaded model.

        Args:
            texts: Strings to embed.

        Returns:
            Whatever ``model.encode`` returns for ``texts``; the code relies
            on it exposing ``.shape``.

        Raises:
            ValueError: If no model has been loaded.
        """
        # Compare with None explicitly.  Truthiness of a model object can
        # depend on __len__ (container-style objects are falsy when empty),
        # which would misreport a loaded model as missing.
        if self.model is None:
            raise ValueError("Model not loaded")

        print(f"Generating embeddings for {len(texts)} texts...")
        embeddings = self.model.encode(texts,show_progress_bar=True)
        print(f"Generated embeddings with shape: {embeddings.shape}")
        return embeddings
        

In [None]:
# Initialize the embedding manager with its default model name.
# Construction loads the model immediately (see EmbeddingManager.__init__).

embedding_manager = EmbeddingManager()
# Last expression of the cell, so the notebook displays the object.
embedding_manager

# VectorStore

In [None]:
class VectorStore:
    """Persist document chunks and their embeddings in a ChromaDB collection."""

    def __init__(self, collection_name: str = "pdf_docs", persist_directory:str='data/vector_store'):
        """Remember the settings and open (or create) the collection.

        Args:
            collection_name: Name of the ChromaDB collection to use.
            persist_directory: Directory handed to ``chromadb.PersistentClient``.
        """
        self.collection_name = collection_name
        self.persist_directory = persist_directory
        self.client=None
        self.collection = None
        self._initialize_store()

    def _initialize_store(self) -> None:
        """Create the persist directory, the client and the collection.

        Any failure is reported on stdout and re-raised.
        """
        try:
            os.makedirs(self.persist_directory, exist_ok=True)
            self.client = chromadb.PersistentClient(path=self.persist_directory)

            self.collection = self.client.get_or_create_collection(
                name=self.collection_name,
                metadata={
                    "description": "PDF document embeddings for RAG",
                    # Ask for cosine distance so that a `1 - distance`
                    # conversion at query time is a real cosine similarity;
                    # without this key Chroma falls back to its default
                    # metric (squared L2 per the Chroma docs).
                    # NOTE(review): a collection that already exists on disk
                    # presumably keeps the metric it was created with;
                    # delete and rebuild it to switch.
                    "hnsw:space": "cosine",
                },
            )
            print(f"Vector store initialized. Collection : {self.collection_name}")
            print(f"Existing documents in collection: {self.collection.count()}")
        except Exception as e:
            print(f"Error initializing vector store: {e}")
            raise

    def add_documents(self, documents:List[Any], embeddings:np.ndarray):
        """Add documents and their embeddings to the collection.

        Args:
            documents: Objects exposing ``page_content`` and ``metadata``.
            embeddings: One embedding row per document, in the same order.

        Raises:
            ValueError: If the two inputs differ in length.
            Exception: Whatever ``collection.add`` raises, after reporting it.
        """
        if len(documents) != len(embeddings):
            raise ValueError("Number of documents must match number of embeddings")

        # An empty batch is a no-op; skip the call to the collection entirely.
        if len(documents) == 0:
            print("No documents to add; skipping")
            return

        print(f"Adding {len(documents)} documents to vector store...")

        # Prepare data for ChromaDB
        ids = []
        metadatas = []
        documents_text = []

        for i, doc in enumerate(documents):
            # Random prefix + position keeps ids unique within and across
            # calls.  NOTE: ids are not derived from content, so adding the
            # same chunks again stores them a second time.
            ids.append(f"doc_{uuid.uuid4().hex[:8]}_{i}")

            # Copy the metadata so the caller's document is not mutated.
            metadatas.append({
                **doc.metadata,
                'doc_index': i,
                'content_length': len(doc.page_content),
            })

            documents_text.append(doc.page_content)

        # One conversion for the whole batch instead of one per row.
        embeddings_list = np.asarray(embeddings).tolist()

        # Add to collection
        try:
            self.collection.add(
                ids=ids,
                embeddings=embeddings_list,
                metadatas=metadatas,
                documents=documents_text,
            )
            print(f"Successfully added {len(documents)} documents to vector store")
            print(f"Total documents in collection : {self.collection.count()}")

        except Exception as e:
            print(f"Error adding documents to vector store: {e}")
            raise

# Open (or create) the default "pdf_docs" collection under data/vector_store.
vector_store = VectorStore()
# Last expression of the cell, so the notebook displays the object.
vector_store

## Extract the chunk text to be embedded

In [None]:
# Keep only the raw text of each chunk, in chunk order.
texts = [doc.page_content for doc in chunks]

## Generate the Embeddings

In [None]:
# One embedding per entry of `texts`, in the same order.
embeddings = embedding_manager.generate_embeddings(texts)

## Store everything in the vector database

In [None]:
# `chunks` and `embeddings` must have the same length (add_documents raises
# ValueError otherwise).
vector_store.add_documents(chunks,embeddings)

## Retriever Pipeline from Vector Store

In [None]:
class RAGRetriever:
    """Embed a query and fetch the closest stored chunks from the vector store."""

    def __init__(self, vector_store: "VectorStore", embedding_manager: "EmbeddingManager"):
        """Keep references to the collaborators used at query time.

        Args:
            vector_store: Object whose ``collection`` attribute supports
                ``query(query_embeddings=..., n_results=...)``.
            embedding_manager: Object providing ``generate_embeddings``.
        """
        self.vector_store = vector_store
        self.embedding_manager = embedding_manager

    def retrieve(self, query:str, top_k :int=5, score_threshold:float=0.0) -> List[Dict[str, Any]]:
        """Return the stored chunks most similar to ``query``.

        Args:
            query: Text to search for.
            top_k: Maximum number of results requested from the collection.
            score_threshold: Results whose similarity score is below this
                value are dropped.

        Returns:
            A list of dicts with the keys ``id``, ``content``, ``metadata``,
            ``similarity_score``, ``distance`` and ``rank``.  ``rank`` is the
            1-based position in the raw result list, before threshold
            filtering.  An empty list is returned when nothing matches or
            when the lookup fails; the error is printed, not raised.
        """
        print(f"Retrieving documents for query : {query}")
        print(f"Top K: {top_k}, Score threshold: {score_threshold}")

        # Embedding errors are deliberately not caught here: only the vector
        # store lookup below is best-effort.
        query_embedding = self.embedding_manager.generate_embeddings([query])[0]

        # Search in Vector Store
        try:
            results = self.vector_store.collection.query(
                query_embeddings=[query_embedding.tolist()],
                n_results=top_k,
            )

            retrieved_docs = []

            # Results are nested one list per query; a single query was
            # sent, so index 0 holds its hits.
            if results['documents'] and results['documents'][0]:
                documents = results['documents'][0]
                metadatas = results['metadatas'][0]
                distances = results['distances'][0]
                ids = results['ids'][0]

                for i, (doc_id, document, metadata, distance) in enumerate(zip(ids, documents, metadatas, distances)):
                    # Assumes the collection reports cosine distance, so that
                    # 1 - distance is a cosine similarity.
                    # TODO confirm the collection's metric: Chroma uses a
                    # different default unless "hnsw:space" is "cosine".
                    similarity_score = 1 - distance

                    if similarity_score >= score_threshold:
                        retrieved_docs.append({
                            'id': doc_id,
                            'content': document,
                            'metadata': metadata,
                            'similarity_score': similarity_score,
                            'distance': distance,
                            'rank': i + 1,
                        })

                print(f"Retrieved {len(retrieved_docs)} documents (after filtering)")
            else:
                print("No documents found")

            # Previously the method fell off the end here and returned None,
            # discarding every hit it had just collected.
            return retrieved_docs

        except Exception as e:
            print(f"Error during retrieval: {e}")
            return []


# Wire the retriever to the vector store and embedding manager created above.
rag_retriever = RAGRetriever(vector_store, embedding_manager)


In [None]:
# Display the retriever object to confirm it was constructed.
rag_retriever