RAG Pipelines- Data Ingestion to Vecto DB Pipeline

In [3]:
import os
from langchain_community.document_loaders import PyPDFLoader, PyMuPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from pathlib import Path

In [4]:
### Read all the pdf's inside the directory
def process_all_pdfs(pdf_directory):
    """Process all PDF files in a directory"""
    all_documents = []
    pdf_dir = Path(pdf_directory)
    
    # Find all PDF files recursively
    pdf_files = list(pdf_dir.glob("**/*.pdf"))
    
    print(f"Found {len(pdf_files)} PDF files to process")
    
    for pdf_file in pdf_files:
        print(f"\nProcessing: {pdf_file.name}")
        try:
            loader = PyPDFLoader(str(pdf_file))
            documents = loader.load()
            
            # Add source information to metadata
            for doc in documents:
                doc.metadata['source_file'] = pdf_file.name
                doc.metadata['file_type'] = 'pdf'
            
            all_documents.extend(documents)
            print(f"  ✓ Loaded {len(documents)} pages")
            
        except Exception as e:
            print(f"  ✗ Error: {e}")
    
    print(f"\nTotal documents loaded: {len(all_documents)}")
    return all_documents

# Process all PDFs in the data directory
all_pdf_documents = process_all_pdfs("../data")

Found 3 PDF files to process

Processing: exam2-Solution-Annotated.pdf
  ✓ Loaded 6 pages

Processing: Kashish Jethmalani HW Assignment 2.pdf
  ✓ Loaded 5 pages

Processing: Kashish Jethmalani_034302188_Project Scheduling.pdf
  ✓ Loaded 11 pages

Total documents loaded: 22


In [5]:
all_pdf_documents

[Document(metadata={'producer': 'MiKTeX pdfTeX-1.40.25', 'creator': 'TeX', 'creationdate': '2025-11-03T06:54:25-08:00', 'moddate': '2025-11-04T19:22:22-08:00', 'ptex.fullbanner': 'This is MiKTeX-pdfTeX 4.15.0 (1.40.25)', 'trapped': '/False', 'source': '..\\data\\pdf\\exam2-Solution-Annotated.pdf', 'total_pages': 6, 'page': 0, 'page_label': '1', 'source_file': 'exam2-Solution-Annotated.pdf', 'file_type': 'pdf'}, page_content='CECS 528, Exam 2 Solutions, Fall 2025, Dr. Ebert\nIMPORTANT: READ THE FOLLOWING DIRECTIONS. Directions,\n\x88 For each problem, write your solution using ONE SHEET OF PAPER ONLY (BOTH\nFRONT AND BACK). Write NAME and PROBLEM NUMBERon each sheet.\n\x88 Write solutions to different problems on SEPARATE SHEETSof paper.\n\x88 It is OK to use the same sheet for parts of different make up problems.\nUnit 2 LO Problems\nLO5. Do the following.\n(a) The dynamic-programming algorithm that solves the Runaway Traveling Salesperson\noptimization problem defines a recurrence for

In [6]:
### Text splitting get into chunks

def split_documents(documents,chunk_size=1000,chunk_overlap=200):
    """Split documents into smaller chunks for better RAG performance"""
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        separators=["\n\n", "\n", " ", ""]
    )
    split_docs = text_splitter.split_documents(documents)
    print(f"Split {len(documents)} documents into {len(split_docs)} chunks")
    
    # Show example of a chunk
    if split_docs:
        print(f"\nExample chunk:")
        print(f"Content: {split_docs[0].page_content[:200]}...")
        print(f"Metadata: {split_docs[0].metadata}")
    
    return split_docs

In [7]:
chunks=split_documents(all_pdf_documents)
chunks

Split 22 documents into 28 chunks

Example chunk:
Content: CECS 528, Exam 2 Solutions, Fall 2025, Dr. Ebert
IMPORTANT: READ THE FOLLOWING DIRECTIONS. Directions,
 For each problem, write your solution using ONE SHEET OF PAPER ONLY (BOTH
FRONT AND BACK). Writ...
Metadata: {'producer': 'MiKTeX pdfTeX-1.40.25', 'creator': 'TeX', 'creationdate': '2025-11-03T06:54:25-08:00', 'moddate': '2025-11-04T19:22:22-08:00', 'ptex.fullbanner': 'This is MiKTeX-pdfTeX 4.15.0 (1.40.25)', 'trapped': '/False', 'source': '..\\data\\pdf\\exam2-Solution-Annotated.pdf', 'total_pages': 6, 'page': 0, 'page_label': '1', 'source_file': 'exam2-Solution-Annotated.pdf', 'file_type': 'pdf'}


[Document(metadata={'producer': 'MiKTeX pdfTeX-1.40.25', 'creator': 'TeX', 'creationdate': '2025-11-03T06:54:25-08:00', 'moddate': '2025-11-04T19:22:22-08:00', 'ptex.fullbanner': 'This is MiKTeX-pdfTeX 4.15.0 (1.40.25)', 'trapped': '/False', 'source': '..\\data\\pdf\\exam2-Solution-Annotated.pdf', 'total_pages': 6, 'page': 0, 'page_label': '1', 'source_file': 'exam2-Solution-Annotated.pdf', 'file_type': 'pdf'}, page_content='CECS 528, Exam 2 Solutions, Fall 2025, Dr. Ebert\nIMPORTANT: READ THE FOLLOWING DIRECTIONS. Directions,\n\x88 For each problem, write your solution using ONE SHEET OF PAPER ONLY (BOTH\nFRONT AND BACK). Write NAME and PROBLEM NUMBERon each sheet.\n\x88 Write solutions to different problems on SEPARATE SHEETSof paper.\n\x88 It is OK to use the same sheet for parts of different make up problems.\nUnit 2 LO Problems\nLO5. Do the following.\n(a) The dynamic-programming algorithm that solves the Runaway Traveling Salesperson\noptimization problem defines a recurrence for

### embedding and vectorStoreDB

In [8]:
import numpy as np
from sentence_transformers import SentenceTransformer #embedding model will be loaded from here
import chromadb
from chromadb.config import Settings
import uuid #evry embedding will have a unique id
from typing import List, Dict, Any, Tuple   
from sklearn.metrics.pairwise import cosine_similarity  #to compute similarity between embeddings


In [9]:
class EmbeddingManager:
    """handles document embedding generation using sentence transformers"""
    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):


        """
        all-MiniLM-L6-v2: converts text into vectos around 384 dimensions
        initialize the embedding manager
        
        args: model_name: huggingface model name for sentence embeddings"""
        self.model_name = model_name
        self.model=None
        self._load_model() #this wll load all_MiniLM-L6-v2 model


    def _load_model(self):
        """Load the sentenceTransformer model"""
        try:
            print(f"Loading embedding model: {self.model_name}")
            self.model=SentenceTransformer(self.model_name)
            print(f"Model loaded successfully. Embedding dimension: {self.model.get_sentence_embedding_dimension()}")
        except Exception as e:
            print(f"Error loading model {self.model_name}: {e}")    
            raise

    def generate_embeddings(self, texts: List[str]) -> np.ndarray:
        """Generate embeddings for a list of texts
        
        args: texts: list of text strings to be embedded
        
        returns: numpy array of embeddings"""
        if not self.model:
            raise ValueError("Embedding model is not loaded.")
        
        print(f"Generating embeddings for {len(texts)} texts...")
        embeddings=self.model.encode(texts, show_progress_bar=True)
        print(f"Embeddings generated successfully: shape {embeddings.shape}")
        return embeddings
        

    def get_embedding_dimension(self) -> int:
        """Get the dimension of the embeddings generated by the model
        
        returns: embedding dimension as integer"""
        if not self.model:
            raise ValueError("Embedding model is not loaded.")
        
        return self.model.get_sentence_embedding_dimension()


#now we will initialize the embedding manager
embedding_manager=EmbeddingManager()
embedding_manager


Loading embedding model: all-MiniLM-L6-v2
Model loaded successfully. Embedding dimension: 384


<__main__.EmbeddingManager at 0x1ceaa0783e0>

vectorStore

In [10]:
import os

In [11]:
class VectorStore:
    """manages document embeddings in a ChromaDB vector store"""


    def __init__(self, collection_name: str = "pdf_documents", persistent_directory: str = "../data/vhjector_store"):
        """initialize the vector store
        
        args:
            collection_name: name of the ChromaDB collection
            persistent_directory: directory to persist the vector store data"""
        
        self.collection_name=collection_name
        self.persistent_directory=persistent_directory
        self.client=None
        self.collection=None
        self._initialize_store()

    def _initialize_store(self):
        """Initialize ChromaDB client and collection"""
        try:
            #create persistent ChromaDB client
            os.makedirs(self.persistent_directory, exist_ok=True)
            self.client=chromadb.PersistentClient(path=self.persistent_directory)
            
            #get or create collection
            self.collection=self.client.get_or_create_collection(
                name=self.collection_name,
                metadata={"description": "PDF document embeddings for RAG"}
            )
            print(f"Vector store initialized. Collection:{self.collection_name}")
            print(f"Existing documents in collection: {self.collection.count()}")

        except Exception as e:
            print(f"Error initializing vector store: {e}")
            raise


    def add_documents(self, documents: List[Any], embeddings: np.ndarray):
        """Add documents and their embeddings to the vector store
        
        args:
            documents: list of langchain documents
            embeddings: corresponding embeddings for the documents"""
        
        if len(documents) != embeddings.shape[0]:
            raise ValueError("Number of documents and embeddings must match.")
        
        print(f"Adding {len(documents)} documents to vector store...")
        

        #print data for ChromaDB
        ids=[]
        metadatas=[]
        documents_text=[]
        embeddings_list=[]


        for i, (doc, embedding) in enumerate(zip(documents, embeddings)):
            #Generate unique id for each document
            dc_id=f"doc_{uuid.uuid4().hex[:8]}_{i}"
            ids.append(dc_id)

            #prepare metadata
            metadata = dict(doc.metadata)  #copy existing metadata
            metadata['doc_index']=i  #add document index
            metadata['content_length']=len(doc.page_content)  #add content length
            metadatas.append(metadata)


            #document content
            documents_text.append(doc.page_content)


            #embedding
            embeddings_list.append(embedding.tolist())

        #add to collection
        try:
            self.collection.add(
                ids=ids,
                embeddings=embeddings_list,
                metadatas=metadatas,
                documents=documents_text
            )

            print(f"Successfully added {len(documents)} documents to vector store.")
            print(f"Total documents in collection now: {self.collection.count()}")


        except Exception as e:
            print(f"Error adding documents to vector store: {e}")
            raise

VectorStore=VectorStore()
VectorStore




Vector store initialized. Collection:pdf_documents
Existing documents in collection: 112


<__main__.VectorStore at 0x1cea9bea090>

In [12]:
chunks

[Document(metadata={'producer': 'MiKTeX pdfTeX-1.40.25', 'creator': 'TeX', 'creationdate': '2025-11-03T06:54:25-08:00', 'moddate': '2025-11-04T19:22:22-08:00', 'ptex.fullbanner': 'This is MiKTeX-pdfTeX 4.15.0 (1.40.25)', 'trapped': '/False', 'source': '..\\data\\pdf\\exam2-Solution-Annotated.pdf', 'total_pages': 6, 'page': 0, 'page_label': '1', 'source_file': 'exam2-Solution-Annotated.pdf', 'file_type': 'pdf'}, page_content='CECS 528, Exam 2 Solutions, Fall 2025, Dr. Ebert\nIMPORTANT: READ THE FOLLOWING DIRECTIONS. Directions,\n\x88 For each problem, write your solution using ONE SHEET OF PAPER ONLY (BOTH\nFRONT AND BACK). Write NAME and PROBLEM NUMBERon each sheet.\n\x88 Write solutions to different problems on SEPARATE SHEETSof paper.\n\x88 It is OK to use the same sheet for parts of different make up problems.\nUnit 2 LO Problems\nLO5. Do the following.\n(a) The dynamic-programming algorithm that solves the Runaway Traveling Salesperson\noptimization problem defines a recurrence for

In [13]:
###convert the text to embeddings
texts=[doc.page_content for doc in chunks]

###generate embeddings

embeddings=embedding_manager.generate_embeddings(texts)

###store in vector database
VectorStore.add_documents(chunks,embeddings)



Generating embeddings for 28 texts...


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Embeddings generated successfully: shape (28, 384)
Adding 28 documents to vector store...
Successfully added 28 documents to vector store.
Total documents in collection now: 140


Retriverer pipeline from VectorStore

In [14]:
class RAGRetriever:
    """Handles query-based retrieval from the vector store"""
    
    def __init__(self, vector_store: VectorStore, embedding_manager: EmbeddingManager):
        """
        Initialize the retriever
        
        Args:
            vector_store: Vector store containing document embeddings
            embedding_manager: Manager for generating query embeddings
        """
        self.vector_store = vector_store
        self.embedding_manager = embedding_manager

    def retrieve(self, query: str, top_k: int = 5, score_threshold: float = 0.0) -> List[Dict[str, Any]]:
        """
        Retrieve relevant documents for a query
        
        Args:
            query: The search query
            top_k: Number of top results to return
            score_threshold: Minimum similarity score threshold
            
        Returns:
            List of dictionaries containing retrieved documents and metadata
        """
        print(f"Retrieving documents for query: '{query}'")
        print(f"Top K: {top_k}, Score threshold: {score_threshold}")
        
        # Generate query embedding
        query_embedding = self.embedding_manager.generate_embeddings([query])[0]
        
        # Search in vector store
        try:
            results = self.vector_store.collection.query(
                query_embeddings=[query_embedding.tolist()],
                n_results=top_k
            )
            
            # Process results
            retrieved_docs = []
            
            if results['documents'] and results['documents'][0]:
                documents = results['documents'][0]
                metadatas = results['metadatas'][0]
                distances = results['distances'][0]
                ids = results['ids'][0]
                
                for i, (doc_id, document, metadata, distance) in enumerate(zip(ids, documents, metadatas, distances)):
                    # Convert distance to similarity score (ChromaDB uses cosine distance)
                    similarity_score = 1 - distance
                    
                    if similarity_score >= score_threshold:
                        retrieved_docs.append({
                            'id': doc_id,
                            'content': document,
                            'metadata': metadata,
                            'similarity_score': similarity_score,
                            'distance': distance,
                            'rank': i + 1
                        })
                
                print(f"Retrieved {len(retrieved_docs)} documents (after filtering)")
            else:
                print("No documents found")
            
            return retrieved_docs
            
        except Exception as e:
            print(f"Error during retrieval: {e}")
            return []

rag_retriever=RAGRetriever(VectorStore,embedding_manager)



In [15]:
rag_retriever

<__main__.RAGRetriever at 0x1ceaa375370>

In [16]:

rag_retriever.retrieve("Which is the best software for scheduling?")

Retrieving documents for query: 'Which is the best software for scheduling?'
Top K: 5, Score threshold: 0.0
Generating embeddings for 1 texts...


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Embeddings generated successfully: shape (1, 384)
Retrieved 5 documents (after filtering)


[{'id': 'doc_7217c511_27',
  'content': '8. Software Used \nThis project schedule was designed using Zoho Projects, including the creation of task \ndependencies, duration charts, and critical path identification.',
  'metadata': {'moddate': '2025-11-21T15:05:23-08:00',
   'page_label': '11',
   'page': 10,
   'creationdate': '2025-11-21T15:05:23-08:00',
   'source_file': 'Kashish Jethmalani_034302188_Project Scheduling.pdf',
   'author': 'Kashish Hiranand Jethmalani',
   'content_length': 170,
   'file_type': 'pdf',
   'total_pages': 11,
   'producer': 'Microsoft® Word for Microsoft 365',
   'source': '..\\data\\pdf\\Kashish Jethmalani_034302188_Project Scheduling.pdf',
   'doc_index': 27,
   'creator': 'Microsoft® Word for Microsoft 365'},
  'similarity_score': 0.08139872550964355,
  'distance': 0.9186012744903564,
  'rank': 1},
 {'id': 'doc_dcd55c89_27',
  'content': '8. Software Used \nThis project schedule was designed using Zoho Projects, including the creation of task \ndependen

next integration is llm and output, means retrieval pipeline, till now we have created data ingestion pipeline with query retrieval pipeline

Integration Vectordb Context pipeline with LLM output

we will implement augmentation and generation

In [17]:
###Simple RAG pipeline with GROQ LLM
from langchain_groq import ChatGroq
import os
from dotenv import load_dotenv
load_dotenv()

###Initialize GROQ LLM(need to set GROQ_API_KEY in .env file)
groq_api_key = os.getenv("GROQ_API_KEY")

if not groq_api_key:
    raise ValueError("GROQ_API_KEY not found in environment variables. Please set it in your .env file.")

llm=ChatGroq(groq_api_key=groq_api_key, model_name="llama-3.1-8b-instant", temperature=0.1, max_tokens=1024)

##2. Simple RAG function: retrieve relevant documents and generate answer
def rag_simple(query, retreiver,llm,top_k=3):
    ##retrieve the context

    results=retreiver.retrieve(query,top_k=top_k)
    context="\n\n".join([doc['content'] for doc in results]) if results else "No relevant documents found."


    if not context:
        return "No relevant documents found."
    
    ##generate answer using Graq LLM

    prompt = """Use the following context to answer the question concisely.

        Context:
        {context}

        Question:
        {query}

        Answer:
        """

    response = llm.invoke(prompt.format(context=context, query=query))
    return response.content


In [18]:
answer=rag_simple("best scheduling tool?", rag_retriever,llm)

Retrieving documents for query: 'best scheduling tool?'
Top K: 3, Score threshold: 0.0
Generating embeddings for 1 texts...


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Embeddings generated successfully: shape (1, 384)
Retrieved 0 documents (after filtering)


Enhanced RAG Pipeline Features

In [None]:
#--Enhanced RAG pipelone features

def rag_advacned(query, retriever, llm, top_k=5, min_score=0.2, return_context=False):
    """
    RAG pipeline with extra features:
        -returns answer, sources, confidence score, and optionally full context.
    """
    results=retriever.retrieve(query, top_k=top_k, score_threshold=min_score)
    if not results:
        return {'answer': 'No relevant context found.', 'sources': [], 'confidence_score': 0.0, 'context': ''}
    
    #prepare context and sources
    context="\n\n".join([doc['content'] for doc in results])
    sources=[{
        'source': doc['metadata'].get('source_file', doc['metadata'].get('source', 'unknown')),
        'page': doc['metadata'].get('page', 'unknown'),
        'score': doc['similarity_score'],
        'preview': doc['content'][:300] + '....' #first 300 characters

    } for doc in results]
    confidence = max([doc['similarity_score'] for doc in results])

    #generate answer
    prompt = f"""Use the following context to answer the question concisely. \nContext:\n{context}\n\nQuestion:\n{query}\n\nAnswer:"""
    response = llm.invoke([prompt.format(context=context, query=query)])
    
    output={
        'answer': response.content,
        'sources': sources,
        'confidence_score': confidence
    }
    if return_context:
        output['context']=context
    return output

#Example Usage
result=rag_advacned("what is scheduling tool?", rag_retriever,llm, top_k=3, min_score=0.1, return_context=True)
print("Answer:", result['answer'])
print("Sources:", result['sources'])
print("Confidence:", result['confidence_score'])
print("Context:", result['context'][:300])
    

Retrieving documents for query: 'what is scheduling tool?'
Top K: 3, Score threshold: 0.1
Generating embeddings for 1 texts...


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Embeddings generated successfully: shape (1, 384)
Retrieved 3 documents (after filtering)
Answer: A scheduling tool is a software used to plan, organize, and manage tasks and timelines, such as Zoho Projects.
Sources: [{'source': 'Kashish Jethmalani_034302188_Project Scheduling.pdf', 'page': 10, 'score': 0.13738775253295898, 'preview': '8. Software Used \nThis project schedule was designed using Zoho Projects, including the creation of task \ndependencies, duration charts, and critical path identification.....'}, {'source': 'Kashish Jethmalani_034302188_Project Scheduling.pdf', 'page': 10, 'score': 0.13738775253295898, 'preview': '8. Software Used \nThis project schedule was designed using Zoho Projects, including the creation of task \ndependencies, duration charts, and critical path identification.....'}, {'source': 'Kashish Jethmalani_034302188_Project Scheduling.pdf', 'page': 10, 'score': 0.13738775253295898, 'preview': '8. Software Used \nThis project schedule was designed using Z

In [22]:
# --- Advanced RAG Pipeline: Streaming, Citations, History, Summarization ---
from typing import List, Dict, Any
import time

class AdvancedRAGPipeline:
    def __init__(self, retriever, llm):
        self.retriever = retriever
        self.llm = llm
        self.history = []  # Store query history

    def query(self, question: str, top_k: int = 5, min_score: float = 0.2, stream: bool = False, summarize: bool = False) -> Dict[str, Any]:
        # Retrieve relevant documents
        results = self.retriever.retrieve(question, top_k=top_k, score_threshold=min_score)
        if not results:
            answer = "No relevant context found."
            sources = []
            context = ""
        else:
            context = "\n\n".join([doc['content'] for doc in results])
            sources = [{
                'source': doc['metadata'].get('source_file', doc['metadata'].get('source', 'unknown')),
                'page': doc['metadata'].get('page', 'unknown'),
                'score': doc['similarity_score'],
                'preview': doc['content'][:120] + '...'
            } for doc in results]
            # Streaming answer simulation
            prompt = f"""Use the following context to answer the question concisely.\nContext:\n{context}\n\nQuestion: {question}\n\nAnswer:"""
            if stream:
                print("Streaming answer:")
                for i in range(0, len(prompt), 80):
                    print(prompt[i:i+80], end='', flush=True)
                    time.sleep(0.05)
                print()
            response = self.llm.invoke([prompt.format(context=context, question=question)])
            answer = response.content

        # Add citations to answer
        citations = [f"[{i+1}] {src['source']} (page {src['page']})" for i, src in enumerate(sources)]
        answer_with_citations = answer + "\n\nCitations:\n" + "\n".join(citations) if citations else answer

        # Optionally summarize answer
        summary = None
        if summarize and answer:
            summary_prompt = f"Summarize the following answer in 2 sentences:\n{answer}"
            summary_resp = self.llm.invoke([summary_prompt])
            summary = summary_resp.content

        # Store query history
        self.history.append({
            'question': question,
            'answer': answer,
            'sources': sources,
            'summary': summary
        })

        return {
            'question': question,
            'answer': answer_with_citations,
            'sources': sources,
            'summary': summary,
            'history': self.history
        }

# Example usage:
adv_rag = AdvancedRAGPipeline(rag_retriever, llm)
result = adv_rag.query("what is scheduling tool", top_k=3, min_score=0.1, stream=True, summarize=True)
print("\nFinal Answer:", result['answer'])
print("Summary:", result['summary'])
print("History:", result['history'][-1])

Retrieving documents for query: 'what is scheduling tool'
Top K: 3, Score threshold: 0.1
Generating embeddings for 1 texts...


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Embeddings generated successfully: shape (1, 384)
Retrieved 3 documents (after filtering)
Streaming answer:
Use the following context to answer the question concisely.
Context:
8. Software Used 
This project schedule was designed using Zoho Projects, including the creation of task 
dependencies, duration charts, and critical path identification.

8. Software Used 
This project schedule was designed using Zoho Projects, including the creation of task 
dependencies, duration charts, and critical path identification.

8. Software Used 
This project schedule was designed using Zoho Projects, including the creation of task 
dependencies, duration charts, and critical path identification.

Question: what is scheduling tool

Answer:

Final Answer: A scheduling tool.

Citations:
[1] Kashish Jethmalani_034302188_Project Scheduling.pdf (page 10)
[2] Kashish Jethmalani_034302188_Project Scheduling.pdf (page 10)
[3] Kashish Jethmalani_034302188_Project Scheduling.pdf (page 10)
Summary: A schedulin