# RAG pipelines and Data Ingestion to Vector db pipelines

In [1]:
import os
from pathlib import Path
from langchain_community.document_loaders import PyMuPDFLoader , PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
def process_all_pdf(pdf_directory):
    
    all_documents = []
    pdf_dir = Path(pdf_directory)
    
    pdf_files = list(pdf_dir.glob('**/*.pdf'))
    
    print(f"found {len(pdf_files)} PDF files to Process")
    
    for pdf_files in pdf_files:
        print(f"\npreprocessing : {pdf_files.name}")
        try:
            loader = PyPDFLoader(str(pdf_files))
            documents = loader.load()
            
            for doc in documents:
                doc.metadata['source_file'] = pdf_files.name
                doc.metadata['file_type'] = 'pdf'
                
            all_documents.extend(documents)
            print(f" Loaded {len(documents)} pages")
        except Exception as e:
            print(f" Error : {e}")
            
    print(f"\n Total documents loaded : {len(all_documents)}")
    return all_documents    

all_pdf_files = process_all_pdf('./data')

found 3 PDF files to Process

preprocessing : Reasearch2.pdf
 Loaded 25 pages

preprocessing : Reasearch3.pdf
 Loaded 10 pages

preprocessing : Reaserch1.pdf
 Loaded 118 pages

 Total documents loaded : 153


In [3]:
all_pdf_files

[Document(metadata={'producer': 'BCL easyPDF 7.00 (0353)', 'creator': 'easyPDF SDK 7 7.0', 'creationdate': '2016-11-28T09:34:30-08:00', 'moddate': '2016-11-28T09:34:30-08:00', 'source': 'data\\pdf\\Reasearch2.pdf', 'total_pages': 25, 'page': 0, 'page_label': '1', 'source_file': 'Reasearch2.pdf', 'file_type': 'pdf'}, page_content='Research Methodology: An Introduction\nMeaning Of Research\nResearch may be very broadly defined as systematic gathering of data and \ninformation and its analysis for advancement of knowledge in any subject. \nResearch attempts to find answer intellectual and practical questions through \napplication of systematic methods. Webster’s Collegiate Dictionary defines \nresearch as "studious inquiry or examination; esp: investigation or experimentation \naimed at the discovery and interpretation of facts, revision of accepted theories or \nlaws in the light of new facts, or practical application of such new or revised \ntheories or laws". Some people consider resea

In [4]:
def split_documents(documents, chunk_size=1000, chunk_overlap=200):
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        separators=["\n\n", "\n", " ", ""]
    )

    # Filter out invalid or empty documents before splitting
    valid_docs = [
        d
        for d in documents
        if hasattr(d, "page_content")
        and isinstance(d.page_content, str)
        and d.page_content.strip()
        and d.page_content.lower() != "nan"
    ]

    split_docs = text_splitter.split_documents(valid_docs)
    print(f"split: {len(valid_docs)} documents into {len(split_docs)} chunks")

    if split_docs:
        print("\nExample Chunk:")
        print(f"content: {split_docs[0].page_content[:200]} ...")
        print(f"Metadata: {split_docs[0].metadata}")

    return split_docs


In [5]:
chunks = split_documents(all_pdf_files)

chunks

split: 153 documents into 357 chunks

Example Chunk:
content: Research Methodology: An Introduction
Meaning Of Research
Research may be very broadly defined as systematic gathering of data and 
information and its analysis for advancement of knowledge in any sub ...
Metadata: {'producer': 'BCL easyPDF 7.00 (0353)', 'creator': 'easyPDF SDK 7 7.0', 'creationdate': '2016-11-28T09:34:30-08:00', 'moddate': '2016-11-28T09:34:30-08:00', 'source': 'data\\pdf\\Reasearch2.pdf', 'total_pages': 25, 'page': 0, 'page_label': '1', 'source_file': 'Reasearch2.pdf', 'file_type': 'pdf'}


[Document(metadata={'producer': 'BCL easyPDF 7.00 (0353)', 'creator': 'easyPDF SDK 7 7.0', 'creationdate': '2016-11-28T09:34:30-08:00', 'moddate': '2016-11-28T09:34:30-08:00', 'source': 'data\\pdf\\Reasearch2.pdf', 'total_pages': 25, 'page': 0, 'page_label': '1', 'source_file': 'Reasearch2.pdf', 'file_type': 'pdf'}, page_content='Research Methodology: An Introduction\nMeaning Of Research\nResearch may be very broadly defined as systematic gathering of data and \ninformation and its analysis for advancement of knowledge in any subject. \nResearch attempts to find answer intellectual and practical questions through \napplication of systematic methods. Webster’s Collegiate Dictionary defines \nresearch as "studious inquiry or examination; esp: investigation or experimentation \naimed at the discovery and interpretation of facts, revision of accepted theories or \nlaws in the light of new facts, or practical application of such new or revised \ntheories or laws". Some people consider resea

### Embedding and VectorDb

In [6]:
import numpy as np
from sentence_transformers import SentenceTransformer
import chromadb
import uuid
from chromadb.config import Settings
from typing import List, Dict, Any, Tuple
from sklearn.metrics.pairwise import cosine_similarity

In [7]:
class EmbeddingManager:
    
    def __init__(self, model_name : str = "all-MiniLM-L6-v2"):
        
        self.model_name = model_name
        self.model = None
        self._load_model()
        
    def _load_model(self):
        try:
            print(f"Loading embedding model : {self.model_name}")
            self.model = SentenceTransformer(self.model_name)
            print(f"Model Loaded Successfully. Embedded dimensions : {self.model.get_sentence_embedding_dimension()}")
        except Exception as e :
            print(f"Error loading model {self.model_name} : {e}")
            raise
    
    def generate_embeddings(self, texts: List[str]) -> np.ndarray :
        
        if not self.model:
            raise ValueError("Model not loaded!")

        clean_texts = []
        for t in texts:
            if isinstance(t, str) and t.strip() and t.lower() != "nan":
                clean_texts.append(t)
            else:
                print(f"⚠️ Skipping invalid text: {repr(t)[:60]}")

        if not clean_texts:
            raise ValueError("No valid texts for embedding!")

        print(f" Generating embeddings for {len(texts)} texts...")
        embeddings = self.model.encode(texts, show_progress_bar=True)
        print(f"Generating embedding with shape: {embeddings.shape}")
        
        return embeddings     
           

In [8]:
embeddingManager = EmbeddingManager()
embeddingManager

Loading embedding model : all-MiniLM-L6-v2


Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


Model Loaded Successfully. Embedded dimensions : 384


<__main__.EmbeddingManager at 0x22186517380>

### Vector Store

In [9]:
class VectorStore:
    def __init__(self, collection_name: str = "pdf_documents", persist_directory: str = "./data/vector_store"):
        self.collection_name = collection_name
        self.persist_directory = persist_directory
        self.client = None
        self.collection = None
        self._initialize_store()

    def _initialize_store(self):
        try:
            os.makedirs(self.persist_directory, exist_ok=True)
            self.client = chromadb.PersistentClient(path=self.persist_directory)

            self.collection = self.client.get_or_create_collection(
                name=self.collection_name,
                metadata={"description": "PDF document embeddings for RAG"}
            )

            print(f"✅ Vector store initialized. Collection: {self.collection_name}")
            print(f"📄 Existing documents in collection: {self.collection.count()}")

        except Exception as e:
            print(f"❌ Error initializing vector store: {e}")
            raise

    def add_documents(self, documents: List[Any], embeddings: np.ndarray):
        if len(documents) != len(embeddings):
            raise ValueError("Number of documents must match number of embeddings.")

        print(f"🧩 Adding {len(documents)} documents to vector store...")

        ids = []
        metadatas = []
        document_texts = []
        embeddings_lists = []

        for i, (doc, embedding) in enumerate(zip(documents, embeddings)):
            # Generate unique ID
            doc_id = f"doc_{uuid.uuid4().hex[:8]}_{i}"
            ids.append(doc_id)

            # Collect metadata
            metadata = dict(getattr(doc, "metadata", {}))
            metadata["doc_index"] = i
            metadata["content_length"] = len(getattr(doc, "page_content", ""))
            metadatas.append(metadata)

            # Collect document text
            document_texts.append(getattr(doc, "page_content", ""))

            # Convert embeddings to list
            embeddings_lists.append(embedding.tolist())

        try:
            self.collection.add(
                ids=ids,
                embeddings=embeddings_lists,
                metadatas=metadatas,
                documents=document_texts
            )

            print(f"✅ Successfully added {len(documents)} documents to vector store.")
            print(f"📈 Total documents in collection: {self.collection.count()}")

        except Exception as e:
            print(f"❌ Error adding documents to vector store: {e}")
            raise
    
vectorstore = VectorStore()

vectorstore
        

✅ Vector store initialized. Collection: pdf_documents
📄 Existing documents in collection: 0


<__main__.VectorStore at 0x22187047770>

In [10]:
chunks

[Document(metadata={'producer': 'BCL easyPDF 7.00 (0353)', 'creator': 'easyPDF SDK 7 7.0', 'creationdate': '2016-11-28T09:34:30-08:00', 'moddate': '2016-11-28T09:34:30-08:00', 'source': 'data\\pdf\\Reasearch2.pdf', 'total_pages': 25, 'page': 0, 'page_label': '1', 'source_file': 'Reasearch2.pdf', 'file_type': 'pdf'}, page_content='Research Methodology: An Introduction\nMeaning Of Research\nResearch may be very broadly defined as systematic gathering of data and \ninformation and its analysis for advancement of knowledge in any subject. \nResearch attempts to find answer intellectual and practical questions through \napplication of systematic methods. Webster’s Collegiate Dictionary defines \nresearch as "studious inquiry or examination; esp: investigation or experimentation \naimed at the discovery and interpretation of facts, revision of accepted theories or \nlaws in the light of new facts, or practical application of such new or revised \ntheories or laws". Some people consider resea

In [11]:
texts = [doc.page_content for doc in chunks if isinstance(doc.page_content, str) and doc.page_content.strip()]


embeddings = embeddingManager.generate_embeddings(texts)

vectorstore.add_documents(chunks, embeddings)

 Generating embeddings for 357 texts...


Batches: 100%|██████████| 12/12 [00:01<00:00,  7.94it/s]


Generating embedding with shape: (357, 384)
🧩 Adding 357 documents to vector store...
✅ Successfully added 357 documents to vector store.
📈 Total documents in collection: 357


### Retriever Pipeline for vector store

In [12]:
class RAGRetriever :
    
    def __init__(self, vector_store: VectorStore, embedding_manager : EmbeddingManager) :
        self.vector_store = vector_store
        self.embedding_manager = embedding_manager
        
    def retrieve(self, query : str, top_k : int = 5, score_threshold : float = 0.0) -> List[Dict[str, Any]]:
        print(f"Retrieving the documents for query ")
        print(f"Top_k : {top_k}, score Threshold : {score_threshold} ")
        
        query_embedding = self.embedding_manager.generate_embeddings([query])[0]
        
        try :
            results = self.vector_store.collection.query(
                query_embeddings=[query_embedding.tolist()],
                n_results= top_k
            )
            
            retrieved_docs = []
            
            if results['documents'] and results['documents'][0]:
                documents = results['documents'][0]
                metadatas = results['metadatas'][0]
                distances = results['distances'][0]
                ids = results['ids'][0]
                
                for i, (doc_id, document, metadata, distance) in enumerate(zip(ids, documents, metadatas, distances)):
                    similarity_score = 1 - distance
                    
                    if similarity_score >= score_threshold:
                        retrieved_docs.append({
                            'id': doc_id,
                            'content': document,
                            'metadata': metadata,
                            'similarity_score': similarity_score,
                            'distance': distance,
                            'rank': i + 1
                        })
                
                print(f"Retrieved {len(retrieved_docs)} documents (after filtering)")
            else:
                print("No documents found")
            
            return retrieved_docs
            
        except Exception as e:
            print(f"Error during retrieval: {e}")
            return []

rag_retriever=RAGRetriever(vectorstore,embeddingManager)

In [13]:
rag_retriever

<__main__.RAGRetriever at 0x2218da75940>

In [14]:
rag_retriever.retrieve("What is research")

Retrieving the documents for query 
Top_k : 5, score Threshold : 0.0 
 Generating embeddings for 1 texts...


Batches: 100%|██████████| 1/1 [00:00<00:00, 36.48it/s]

Generating embedding with shape: (1, 384)
Retrieved 5 documents (after filtering)





[{'id': 'doc_5a6bd658_140',
  'content': 'Word ‘Research’ is comprises of two words  = Re+Search . It means to \nsearch again. So research means a systematic investigation or activity to \ngain new knowledge of the already existing facts. \n Research is an intellectual activity. It is responsible for bringing \nto light new knowledge. It is also responsible for correcting the present \nmistakes, removing existing misconceptions and adding new learning to \nthe existing fund of knowledge. Researches are  considered as a \ncombination of those activities whic h are removed from day to day life \n1',
  'metadata': {'creationdate': '2015-03-18T13:27:55+02:00',
   'page_label': '7',
   'source_file': 'Reaserch1.pdf',
   'total_pages': 118,
   'creator': 'Microsoft® Word 2010',
   'source': 'data\\pdf\\Reaserch1.pdf',
   'content_length': 552,
   'file_type': 'pdf',
   'moddate': '2015-03-18T13:27:55+02:00',
   'author': 'Smart',
   'producer': 'Microsoft® Word 2010',
   'page': 6,
   'doc_i

In [15]:
from langchain_groq import ChatGroq
import os
from dotenv import load_dotenv
load_dotenv()


groq_api_key= os.getenv("GROQ_API_KEY")

### RAG pipeline to LLM Output Generation

In [16]:
from langchain_groq import ChatGroq
from langchain_core.prompts import PromptTemplate
from langchain_core.messages import HumanMessage, SystemMessage

In [17]:
from langchain_groq import ChatGroq
import os
from dotenv import load_dotenv
load_dotenv()

# Initialize groq llm 
groq_api_key= os.getenv("GROQ_API_KEY")

llm= ChatGroq(groq_api_key=groq_api_key, model_name="llama-3.1-8b-instant", temperature=0.1, max_tokens=1024)

# Simple RAG function : retrieve context + generate response
def rag_simple(query, retriever, llm, top_k=3):
    results= retriever.retrieve(query, top_k=top_k)
    context="\n\n".join([doc['content'] for doc in results]) if results else ""
    if not context:
        return "No relevant context found."
    
    prompt = f"""Use the following context to answer the question concisly.
        Context:
        {context}
        Question: {query}
        Answer:"""
    response= llm.invoke([prompt.format(context=context, query=query)])
    return response.content

In [18]:
answer = rag_simple("What is Cross-sectional Survey?", rag_retriever, llm)
print(answer)

Retrieving the documents for query 
Top_k : 3, score Threshold : 0.0 
 Generating embeddings for 1 texts...


Batches: 100%|██████████| 1/1 [00:00<00:00, 61.46it/s]

Generating embedding with shape: (1, 384)
Retrieved 3 documents (after filtering)





A Cross-sectional Survey is an observational survey that collects data from a sample of the target group at a specific time, allowing a quantitative researcher to assess several variables at that time.


Enhanced RAG pipiline Features

In [19]:
def rag_advanced(query, retriever, llm, top_k=5, min_score=0.2, return_context=False):

    results= retriever.retrieve(query, top_k=top_k, score_threshold=min_score)
    if not results:
        return { 'answer': "No relevant context found.",'sources':[], 'confidence': 0.0, 'context': '' }
    
    context = "\n\n".join([doc['content'] for doc in results])
    sources = [{
        'source': doc['metadata'].get('source_file', doc['metadata'].get('source', 'unknown')),
        'page': doc['metadata'].get('page', 'unknown'),
        'score': doc['similarity_score'],
        'preview':doc['content'][:300] + '...'
    } for doc in results]
    confidence= max([doc['similarity_score'] for doc in results])

    # Generate answer
    prompt = f"""Use the following context to answer the question concisely.\nContext:\n{context}\n\nQuestion: {query}\n\nAnswer:"""
    response= llm.invoke([prompt.format(context=context, query=query)])

    output={
        'answer': response.content,
        'sources': sources,
        'confidence': confidence
    }
    if return_context:
        output['context'] = context
    return output


result = rag_advanced("What is Longitudinal survey?", rag_retriever, llm, top_k=5, min_score=0.2, return_context=True)
print("Answer:", result['answer'])
print("sources:", result['sources'])
print("confidence:", result['confidence'])
print("context:", result['context'][:300])



Retrieving the documents for query 
Top_k : 5, score Threshold : 0.2 
 Generating embeddings for 1 texts...


Batches: 100%|██████████| 1/1 [00:00<00:00, 37.74it/s]

Generating embedding with shape: (1, 384)
Retrieved 2 documents (after filtering)





Answer: A Longitudinal survey is an observational survey where researchers evaluate the same individuals to discover any changes that may occur over time, often used in medicine, applied sciences, and market trends to analyze customer satisfaction or gather feedback on products or services.
sources: [{'source': 'Reasearch3.pdf', 'page': 3, 'score': 0.4966356158256531, 'preview': 'collection is from individuals who show similarity in all variables except those chosen for study. Multiple samples can be analyzed \nand compared by conducting a cross -sectional survey. Longitudinal surveys are the inv erse of cross -sectional studies. In cross -\nsectional studies data collection w...'}, {'source': 'Reasearch3.pdf', 'page': 3, 'score': 0.27099019289016724, 'preview': "the sample then searches to communalize the results to the  population. One of the advantages of using a survey is that the \nresearcher can collect data from a sample of respondents from a large population. Analysis of large 