### Data Ingestion


In [1]:
### document structure

from langchain_core.documents import Document

In [2]:
# Metadata travels with the text so later stages can trace a chunk back to its origin
doc_metadata = {
    "source": "user_manual.pdf",
    "pages": 1,
    "author": "John Doe",
}
doc = Document(
    page_content="This is the content of the document.",
    metadata=doc_metadata,
)

doc

Document(metadata={'source': 'user_manual.pdf', 'pages': 1, 'author': 'John Doe'}, page_content='This is the content of the document.')

In [3]:
## create the directory that will hold the sample text files
## (exist_ok=True makes the call a no-op when the directory already exists)

import os
os.makedirs("../data/text_files", exist_ok=True)

In [4]:
# Maps each target file path to the sample text that is written into it
sample_texts={
    "../data/text_files/doc1.txt": """RAG Intoduction
    Retrieval-Augmented Generation (RAG) is a technique used to enhance large language models by combining text generation with information retrieval. Instead of relying only on the knowledge stored inside the model during training, RAG systems dynamically fetch relevant information from external data sources before generating a response.

In a typical RAG pipeline, documents such as PDFs, text files, or web pages are first loaded and converted into a structured format. These documents are then split into smaller chunks to ensure efficient processing. Each chunk is transformed into a numerical representation called an embedding, which captures the semantic meaning of the text.

When a user asks a question, the query is also converted into an embedding and compared against the stored document embeddings using a vector similarity search. The most relevant chunks are retrieved and passed to a language model as additional context. This allows the model to produce more accurate, up-to-date, and context-aware answers.

RAG is especially useful in applications like chatbots, question-answering systems, and enterprise knowledge assistants, where factual accuracy and domain-specific information are critical. By grounding model responses in retrieved documents, RAG helps reduce hallucinations and improves trustworthiness."""

}

# Write every sample text to its path as UTF-8; mode "w" replaces any existing file content
for file_path, content in sample_texts.items():
    with open(file_path, "w", encoding="utf-8") as f:
        f.write(content)

In [5]:
##TextLoader
from langchain_community.document_loaders import TextLoader

# Read the single sample file into a list of Document objects
text_file_path = "../data/text_files/doc1.txt"
loader = TextLoader(text_file_path, encoding="utf-8")
documents = loader.load()
documents


  from .autonotebook import tqdm as notebook_tqdm


[Document(metadata={'source': '../data/text_files/doc1.txt'}, page_content='RAG Intoduction\n    Retrieval-Augmented Generation (RAG) is a technique used to enhance large language models by combining text generation with information retrieval. Instead of relying only on the knowledge stored inside the model during training, RAG systems dynamically fetch relevant information from external data sources before generating a response.\n\nIn a typical RAG pipeline, documents such as PDFs, text files, or web pages are first loaded and converted into a structured format. These documents are then split into smaller chunks to ensure efficient processing. Each chunk is transformed into a numerical representation called an embedding, which captures the semantic meaning of the text.\n\nWhen a user asks a question, the query is also converted into an embedding and compared against the stored document embeddings using a vector similarity search. The most relevant chunks are retrieved and passed to a 

In [6]:
## directory loader
from langchain_community.document_loaders import DirectoryLoader

# Options for loading every .txt file in the folder through TextLoader
text_loader_options = {
    "glob": "*.txt",                          # pattern to match files
    "loader_cls": TextLoader,                 # loader class used per file
    "loader_kwargs": {"encoding": "utf-8"},   # arguments forwarded to the loader
    "show_progress": False,
}
dir_loader = DirectoryLoader("../data/text_files", **text_loader_options)

documents = dir_loader.load()
documents

[Document(metadata={'source': '..\\data\\text_files\\doc1.txt'}, page_content='RAG Intoduction\n    Retrieval-Augmented Generation (RAG) is a technique used to enhance large language models by combining text generation with information retrieval. Instead of relying only on the knowledge stored inside the model during training, RAG systems dynamically fetch relevant information from external data sources before generating a response.\n\nIn a typical RAG pipeline, documents such as PDFs, text files, or web pages are first loaded and converted into a structured format. These documents are then split into smaller chunks to ensure efficient processing. Each chunk is transformed into a numerical representation called an embedding, which captures the semantic meaning of the text.\n\nWhen a user asks a question, the query is also converted into an embedding and compared against the stored document embeddings using a vector similarity search. The most relevant chunks are retrieved and passed to

In [7]:
from reportlab.lib.pagesizes import LETTER
from reportlab.pdfgen import canvas
import os

# Make sure the output directory is present before any PDF is written
os.makedirs("../data/pdf_files", exist_ok=True)

# ---------- PDF 1 ----------
pdf1_path = "../data/pdf_files/rag_intro.pdf"
c1 = canvas.Canvas(pdf1_path, pagesize=LETTER)

# (y position, text) pairs; every line starts at x=72
pdf1_lines = [
    (750, "Introduction to Retrieval-Augmented Generation (RAG)"),
    (720, "RAG combines information retrieval with text generation."),
    (690, "It improves factual accuracy by grounding LLMs in documents."),
]
for y_position, line_text in pdf1_lines:
    c1.drawString(72, y_position, line_text)

c1.save()

# ---------- PDF 2 ----------
pdf2_path = "../data/pdf_files/rag_pipeline.pdf"
c2 = canvas.Canvas(pdf2_path, pagesize=LETTER)

pdf2_lines = [
    (750, "RAG Pipeline Overview"),
    (720, "1. Load documents (PDFs, text files, etc.)"),
    (690, "2. Chunk documents"),
    (660, "3. Generate embeddings"),
    (630, "4. Retrieve relevant chunks for a query"),
]
for y_position, line_text in pdf2_lines:
    c2.drawString(72, y_position, line_text)

c2.save()

print("PDF files created successfully.")


PDF files created successfully.


In [8]:
from langchain_community.document_loaders import PyPDFLoader,PyMuPDFLoader

# Options for loading every .pdf file in the folder.
# PyMuPDFLoader is the loader in use; PyPDFLoader (also imported) is the alternative.
pdf_loader_options = {
    "glob": "*.pdf",               # pattern to match files
    "loader_cls": PyMuPDFLoader,   # loader class used per file
    "show_progress": False,
}
dir_loader = DirectoryLoader("../data/pdf_files", **pdf_loader_options)

pdf_documents = dir_loader.load()
pdf_documents

[Document(metadata={'producer': 'ReportLab PDF Library - (opensource)', 'creator': 'anonymous', 'creationdate': '2026-01-31T14:59:56+05:00', 'source': '..\\data\\pdf_files\\rag_intro.pdf', 'file_path': '..\\data\\pdf_files\\rag_intro.pdf', 'total_pages': 1, 'format': 'PDF 1.3', 'title': 'untitled', 'author': 'anonymous', 'subject': 'unspecified', 'keywords': '', 'moddate': '2026-01-31T14:59:56+05:00', 'trapped': '', 'modDate': "D:20260131145956+05'00'", 'creationDate': "D:20260131145956+05'00'", 'page': 0}, page_content='Introduction to Retrieval-Augmented Generation (RAG)\nRAG combines information retrieval with text generation.\nIt improves factual accuracy by grounding LLMs in documents.'),
 Document(metadata={'producer': 'ReportLab PDF Library - (opensource)', 'creator': 'anonymous', 'creationdate': '2026-01-31T14:59:56+05:00', 'source': '..\\data\\pdf_files\\rag_pipeline.pdf', 'file_path': '..\\data\\pdf_files\\rag_pipeline.pdf', 'total_pages': 1, 'format': 'PDF 1.3', 'title': 'un

In [9]:
# Show the container type first, then the type of its first element
container_type = type(pdf_documents)
print(container_type)
element_type = type(pdf_documents[0])
print(element_type)

<class 'list'>
<class 'langchain_core.documents.base.Document'>


### Embeddings and Vector Database

In [10]:
import numpy as np
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
import uuid
from typing import List, Dict, Any,Tuple
from sklearn.metrics.pairwise import cosine_similarity

In [11]:
pip install sentence-transformers


Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.0 -> 26.0
[notice] To update, run: python.exe -m pip install --upgrade pip


In [12]:
import numpy as np
from typing import List, Any
from sentence_transformers import SentenceTransformer
from langchain_text_splitters import RecursiveCharacterTextSplitter



class EmbeddingManager:
    """Chunk documents and turn the chunks into SentenceTransformer embeddings."""

    def __init__(
        self,
        model_name: str = "all-MiniLM-L6-v2",
        chunk_size: int = 1000,
        chunk_overlap: int = 200
    ):
        """
        Store the chunking settings and load the embedding model right away.

        Args:
            model_name (str): Name passed to SentenceTransformer.
            chunk_size (int): Chunk size handed to the text splitter.
            chunk_overlap (int): Overlap handed to the text splitter.
        """
        self.model_name = model_name
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.model = None
        self._load_model()

    def _load_model(self):
        """Instantiate the SentenceTransformer; log and re-raise any failure."""
        try:
            print(f"Loading embedding model: {self.model_name}")
            self.model = SentenceTransformer(self.model_name)
            dimension = self.model.get_sentence_embedding_dimension()
            print(f"Model loaded successfully. Embedding dimension: {dimension}")
        except Exception as load_error:
            print(f"Error loading model: {load_error}")
            raise load_error

    def chunk_documents(self, documents: List[Any]) -> List[Any]:
        """
        Split documents with a RecursiveCharacterTextSplitter.

        Args:
            documents (List[Any]): Documents to split.

        Returns:
            List[Any]: The chunks produced by the splitter.
        """
        splitter_options = {
            "chunk_size": self.chunk_size,
            "chunk_overlap": self.chunk_overlap,
            "length_function": len,
            # Tried in order: paragraph break, line break, space, then single characters
            "separators": ["\n\n", "\n", " ", ""],
        }
        splitter = RecursiveCharacterTextSplitter(**splitter_options)
        pieces = splitter.split_documents(documents)

        print(f"[INFO] Split {len(documents)} documents into {len(pieces)} chunks.")
        return pieces

    def embed_chunks(self, chunks: List[Any]) -> np.ndarray:
        """
        Encode the page_content of every chunk with the loaded model.

        Args:
            chunks (List[Any]): Objects exposing a ``page_content`` attribute.

        Returns:
            np.ndarray: Whatever ``self.model.encode`` returns for the texts.

        Raises:
            ValueError: If no model is loaded.
        """
        if not self.model:
            raise ValueError("Embedding model is not loaded.")

        chunk_texts = []
        for piece in chunks:
            chunk_texts.append(piece.page_content)

        print(f"[INFO] Generating embeddings for {len(chunk_texts)} chunks...")
        vectors = self.model.encode(chunk_texts, show_progress_bar=True)
        print(f"[INFO] Embeddings shape: {vectors.shape}")
        return vectors


### VectorStore

In [20]:
import os

class VectorStore:
    """Manages storage and retrieval of document embeddings using ChromaDB."""

    def __init__(self, collection_name: str = "pdf_documents", persist_directory: str = "../data/vector_store"):
        """
        Initialize the vector store and open (or create) its collection.

        Args:
            collection_name (str): Name of the ChromaDB collection.
            persist_directory (str): Directory to persist vector store.
        """
        self.collection_name = collection_name
        self.persist_directory = persist_directory
        self.client = None
        self.collection = None
        self._initialize_store()

    def _initialize_store(self):
        """
        Create the persistence directory, the ChromaDB client and the collection.

        Raises:
            Exception: Any error raised during setup is logged and re-raised unchanged.
        """
        try:
            # Create persistent ChromaDB client
            os.makedirs(self.persist_directory, exist_ok=True)
            print("Initializing ChromaDB client...")
            self.client = chromadb.PersistentClient(path=self.persist_directory)

            self.collection = self.client.get_or_create_collection(
                name=self.collection_name,
                metadata={"description": "PDF Document embeddings for RAG"},
            )
            print(f"ChromaDB collection '{self.collection_name}' is ready.")
            print(f"Existing documents in collection: {self.collection.count()}")
        except Exception as e:
            print(f"Error initializing ChromaDB: {e}")
            # Bare raise keeps the original traceback intact
            raise

    def add_documents(self, documents: List[Any], embeddings: np.ndarray):
        """
        Add documents and their embeddings to the vector store.

        Every call generates fresh random ids, so adding the same documents
        twice stores them twice.

        Args:
            documents (List[Any]): List of Langchain Documents (objects exposing
                ``page_content`` and ``metadata``).
            embeddings (np.ndarray): Corresponding embeddings for the documents,
                one row per document.

        Raises:
            ValueError: If the number of documents and embeddings differ.
            Exception: Any error raised by the collection is logged and re-raised.
        """
        if len(documents) != len(embeddings):
            raise ValueError("Number of documents and embeddings must match.")

        # Nothing to store: skip the collection call instead of forwarding an
        # empty batch (ChromaDB is expected to reject empty id lists).
        if len(documents) == 0:
            print("No documents to add to the vector store.")
            return

        print(f"Adding {len(documents)} documents to the vector store...")

        # Prepare data for ChromaDB
        ids = []
        metadatas = []
        documents_text = []
        embeddings_list = []

        for i, (doc, embedding) in enumerate(zip(documents, embeddings)):
            # Random 8-hex-digit prefix plus the position within this batch
            ids.append(f"doc_{uuid.uuid4().hex[:8]}_{i}")

            # Copy the metadata so the caller's document is not mutated
            metadata = dict(doc.metadata)
            metadata['chunk_index'] = i
            metadata['content_length'] = len(doc.page_content)
            metadata['doc_type'] = "text"
            metadatas.append(metadata)

            # Document content and embedding (converted to a plain list)
            documents_text.append(doc.page_content)
            embeddings_list.append(embedding.tolist())

        # Add to ChromaDB collection
        try:
            self.collection.add(
                ids=ids,
                metadatas=metadatas,
                documents=documents_text,
                embeddings=embeddings_list,
            )
            print(f"Successfully added {len(documents)} documents to the vector store.")
            print(f"Total documents in collection: {self.collection.count()}")
        except Exception as e:
            print(f"Error adding documents to vector store: {e}")
            raise
    

### Open the vector store, spelling out the constructor's default arguments
vector_store = VectorStore(
    collection_name="pdf_documents",
    persist_directory="../data/vector_store",
)
vector_store


Initializing ChromaDB client...
ChromaDB collection 'pdf_documents' is ready.
Existing documents in collection: 4


<__main__.VectorStore at 0x1f5bf95c8d0>

In [21]:
# Build the embedding manager and split the loaded documents into chunks
embedding_manager = EmbeddingManager()
chunks = embedding_manager.chunk_documents(documents)

# Report how many chunks were produced, then show the first one with its metadata
print("Total chunks:", len(chunks))
print("\n--- First chunk ---\n")
first_chunk = chunks[0]
print(first_chunk.page_content)
print("\nMetadata:", first_chunk.metadata)


Loading embedding model: all-MiniLM-L6-v2
Model loaded successfully. Embedding dimension: 384
[INFO] Split 1 documents into 2 chunks.
Total chunks: 2

--- First chunk ---

RAG Intoduction
    Retrieval-Augmented Generation (RAG) is a technique used to enhance large language models by combining text generation with information retrieval. Instead of relying only on the knowledge stored inside the model during training, RAG systems dynamically fetch relevant information from external data sources before generating a response.

In a typical RAG pipeline, documents such as PDFs, text files, or web pages are first loaded and converted into a structured format. These documents are then split into smaller chunks to ensure efficient processing. Each chunk is transformed into a numerical representation called an embedding, which captures the semantic meaning of the text.

Metadata: {'source': '..\\data\\text_files\\doc1.txt'}


In [15]:
# Character length of (at most) the first five chunks, numbered from 1
for position, chunk in enumerate(chunks[:5], start=1):
    print(f"Chunk {position} length:", len(chunk.page_content))


Chunk 1 length: 701
Chunk 2 length: 646


In [16]:
# Open the persistent vector store (the embedding manager comes from an earlier cell)
vector_store = VectorStore()

# Step 1: split the loaded documents into chunks
chunks = embedding_manager.chunk_documents(documents)

# Peek at the first chunk; its text is capped at 500 characters
print("\n--- SAMPLE CHUNK ---\n")
sample_chunk = chunks[0]
print(sample_chunk.page_content[:500])
print("\nMetadata:", sample_chunk.metadata)

# Step 2: generate one embedding per chunk
embeddings = embedding_manager.embed_chunks(chunks)

# Step 3: store the chunks together with their embeddings in ChromaDB
vector_store.add_documents(chunks, embeddings)


Initializing ChromaDB client...
ChromaDB collection 'pdf_documents' is ready.
Existing documents in collection: 2
[INFO] Split 1 documents into 2 chunks.

--- SAMPLE CHUNK ---

RAG Intoduction
    Retrieval-Augmented Generation (RAG) is a technique used to enhance large language models by combining text generation with information retrieval. Instead of relying only on the knowledge stored inside the model during training, RAG systems dynamically fetch relevant information from external data sources before generating a response.

In a typical RAG pipeline, documents such as PDFs, text files, or web pages are first loaded and converted into a structured format. These doc

Metadata: {'source': '..\\data\\text_files\\doc1.txt'}
[INFO] Generating embeddings for 2 chunks...


Batches: 100%|██████████| 1/1 [00:00<00:00,  1.95it/s]

[INFO] Embeddings shape: (2, 384)
Adding 2 documents to the vector store...
Successfully added 2 documents to the vector store.
Total documents in collection: 4





### Retriever Pipeline from VectorStore

In [17]:
class RAGRetriever:
    """Handles retrieval of relevant document chunks for a given query."""

    # Annotations are written as strings so that defining this class does not
    # require VectorStore / EmbeddingManager to be defined first.
    def __init__(self, vector_store: "VectorStore", embedding_manager: "EmbeddingManager"):
        """
        Initialize the retriever with vector store and embedding manager.

        Args:
            vector_store (VectorStore): The vector store instance; its
                ``collection`` attribute is queried.
            embedding_manager (EmbeddingManager): The embedding manager
                instance; its ``model`` attribute encodes the query.
        """
        self.vector_store = vector_store
        self.embedding_manager = embedding_manager

    def retrieve(self, query: str, top_k: int = 5, score_threshold: float = 0.0) -> List[Dict[str, Any]]:
        """
        Retrieve the top-k stored chunks for the query.

        Args:
            query (str): The user query.
            top_k (int): Number of chunks to request from the collection.
            score_threshold (float): Minimum similarity score threshold.
                NOTE: accepted and logged, but not applied to the results. The
                distance metric is a property of the collection, so converting
                a distance into a similarity score is left to the caller, who
                can use the ``distance`` value returned with every hit.

        Returns:
            List[Dict[str, Any]]: One dict per hit with the keys ``id``,
            ``content``, ``metadata`` and ``distance`` (``None`` when the
            query result carries no distances), in the order returned by the
            collection.
        """
        print(f"Retrieving documents for query: '{query}'")
        print(f"Top K: {top_k}, Score Threshold: {score_threshold}")

        # Generate embedding for the query (encode takes a batch; keep the single row)
        query_embedding = self.embedding_manager.model.encode([query])[0]

        # Perform similarity search in the vector store.
        # Uses the top_k argument: the instance has no top_k attribute.
        results = self.vector_store.collection.query(
            query_embeddings=[query_embedding.tolist()],
            n_results=top_k,
        )

        # Results are batched per query embedding; only one query was sent
        hit_ids = results['ids'][0]
        hit_documents = results['documents'][0]
        hit_metadatas = results['metadatas'][0]
        distance_batches = results.get('distances')
        hit_distances = distance_batches[0] if distance_batches else [None] * len(hit_ids)

        # Format results
        return [
            {
                "id": doc_id,
                "content": doc_content,
                "metadata": metadata,
                "distance": distance,
            }
            for doc_id, doc_content, metadata, distance in zip(
                hit_ids, hit_documents, hit_metadatas, hit_distances
            )
        ]