### RAG Pipeline: Data Ingestion to Vector DB

In [35]:
import os
from langchain_community.document_loaders import PyPDFLoader, PyMuPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import DirectoryLoader, PyMuPDFLoader
from pathlib import Path

In [36]:
def process_all_pdfs(pdf_directory : str):
    """Load every PDF found under ``pdf_directory`` and tag each loaded document.

    A ``DirectoryLoader`` is created with the glob ``**/*.pdf``,
    ``PyMuPDFLoader`` as the per-file loader class and its progress display
    switched off. Every document it returns receives two extra metadata
    entries: ``source_file`` (the last path component of its ``source``
    metadata) and ``file_type`` (always ``'pdf'``).

    Args:
        pdf_directory: Directory handed to ``DirectoryLoader`` as the search root.

    Returns:
        The documents returned by the loader, with their metadata updated in place.
    """
    loader_options = {
        "glob": "**/*.pdf",
        "loader_cls": PyMuPDFLoader,
        "show_progress": False,
    }
    documents = DirectoryLoader(pdf_directory, **loader_options).load()

    for loaded in documents:
        # Keep only the file name so downstream consumers need not parse the full path.
        loaded.metadata.update(
            source_file=Path(loaded.metadata['source']).name,
            file_type='pdf',
        )

    print(f"\n✅ Loaded {len(documents)} documents from {pdf_directory}")
    return documents

# Load every PDF found under ../data (a path relative to the notebook's working directory).
all_pdf_documents = process_all_pdfs("../data")


✅ Loaded 129 documents from ../data


In [37]:
# Inspect one loaded document (index 5) to check its metadata and page text.
all_pdf_documents[5]

Document(metadata={'producer': 'Microsoft® Word for Microsoft 365', 'creator': 'Microsoft® Word for Microsoft 365', 'creationdate': '2022-12-17T08:32:38+00:00', 'source': '../data/pdf_files/unit2.pdf', 'file_path': '../data/pdf_files/unit2.pdf', 'total_pages': 23, 'format': 'PDF 1.7', 'title': '', 'author': '', 'subject': '', 'keywords': '', 'moddate': '2022-12-17T08:32:38+00:00', 'trapped': '', 'modDate': "D:20221217083238+00'00'", 'creationDate': "D:20221217083238+00'00'", 'page': 5, 'source_file': 'unit2.pdf', 'file_type': 'pdf'}, page_content='6 \n \n \ncomputer for processing, which increases the response time. \n➢ Lower Communication Cost  \n• In distributed database systems, if data is located locally where it is mostly used, then \nthe communication costs for data manipulation can be minimized.  \n• This is not feasible in centralized systems. \n \nWhy Distributed Databases \n➢ Organizational and economic reasons \n➢ Interconnection of existing databases \n➢ Incremental growth 

In [38]:
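# ### Superseded alternative: load each PDF individually with PyPDFLoader (kept commented out for reference; the active process_all_pdfs is defined earlier in this notebook)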

# def process_all_pdfs(pdf_directory):
#     ## process all pdf in the directory 
#     all_documents = []
#     pdf_dir = Path(pdf_directory)

#     ##find all pdf files recursively
#     pdf_files = list(pdf_dir.glob("**/*.pdf"))

#     print(f"found {len(pdf_files)} PDF files to process")

#     for pdf_file in pdf_files:
#         print(f"\nProcessing: {pdf_file.name}")
#         try:
#             loader = PyPDFLoader(str(pdf_file))
#             documents = loader.load()

#             ## add source info to metadata
#             for doc in documents:
#                 doc.metadata['source_file'] = pdf_file.name
#                 doc.metadata['file_type'] = 'pdf'
            
#             all_documents.extend(documents)
#             print(f" Loaded {len(documents)} pages")
        
#         except Exception as e:
#             print(f" Error: {e}")
            
#     print(f"\nTotal documents loaded: {len(all_documents)}")
#     return all_documents

# # process all documents in data directory
# all_pdf_documents = process_all_pdfs("../data")


In [39]:
# Print the metadata of the first five loaded documents as a quick sanity check.
for doc in all_pdf_documents[:5]:  # first 5 docs
    print(doc.metadata)


{'producer': 'Microsoft® Word for Microsoft 365', 'creator': 'Microsoft® Word for Microsoft 365', 'creationdate': '2022-12-17T08:32:38+00:00', 'source': '../data/pdf_files/unit2.pdf', 'file_path': '../data/pdf_files/unit2.pdf', 'total_pages': 23, 'format': 'PDF 1.7', 'title': '', 'author': '', 'subject': '', 'keywords': '', 'moddate': '2022-12-17T08:32:38+00:00', 'trapped': '', 'modDate': "D:20221217083238+00'00'", 'creationDate': "D:20221217083238+00'00'", 'page': 0, 'source_file': 'unit2.pdf', 'file_type': 'pdf'}
{'producer': 'Microsoft® Word for Microsoft 365', 'creator': 'Microsoft® Word for Microsoft 365', 'creationdate': '2022-12-17T08:32:38+00:00', 'source': '../data/pdf_files/unit2.pdf', 'file_path': '../data/pdf_files/unit2.pdf', 'total_pages': 23, 'format': 'PDF 1.7', 'title': '', 'author': '', 'subject': '', 'keywords': '', 'moddate': '2022-12-17T08:32:38+00:00', 'trapped': '', 'modDate': "D:20221217083238+00'00'", 'creationDate': "D:20221217083238+00'00'", 'page': 1, 'sourc

In [40]:
### text splitting into chunks

def split_documents(documents, chunk_size=1000, chunk_overlap=200):
    """Split documents into smaller chunks and print a short summary.

    The work is delegated to a ``RecursiveCharacterTextSplitter`` configured
    with ``len`` as its length function and the separators
    ``["\\n\\n", "\\n", " ", ""]``.

    Args:
        documents: Documents to split; passed unchanged to the splitter.
        chunk_size: Value forwarded as the splitter's ``chunk_size``.
        chunk_overlap: Value forwarded as the splitter's ``chunk_overlap``.

    Returns:
        Whatever the splitter's ``split_documents`` returns for ``documents``.
    """
    splitter_config = dict(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        separators=["\n\n", "\n", " ", ""],
    )
    chunked = RecursiveCharacterTextSplitter(**splitter_config).split_documents(documents)
    print(f"Split {len(documents)} documents into {len(chunked)} chunks")

    # Nothing to preview when the splitter produced no chunks.
    if not chunked:
        return chunked

    # Show the first chunk (content truncated to 200 characters) as an example.
    print("\nExample chunk:")
    print(f"Content: {chunked[0].page_content[:200]}...")
    print(f"Metadata: {chunked[0].metadata}")

    return chunked

In [41]:
# Chunk the loaded documents using split_documents' defaults (chunk_size=1000, chunk_overlap=200).
chunks = split_documents(all_pdf_documents)


Split 129 documents into 319 chunks

Example chunk:
Content: 1 
 
 
 
 
 
 
SCHOOL OF COMPUTING 
 
DEPARTMENT OF COMPUTER SCIENCE AND 
ENGINEERING 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
     
 
    UNIT – I  - DISTRIBUTED DATABASE AND INFORMATION SY...
Metadata: {'producer': 'Microsoft® Word for Microsoft 365', 'creator': 'Microsoft® Word for Microsoft 365', 'creationdate': '2022-12-17T08:32:38+00:00', 'source': '../data/pdf_files/unit2.pdf', 'file_path': '../data/pdf_files/unit2.pdf', 'total_pages': 23, 'format': 'PDF 1.7', 'title': '', 'author': '', 'subject': '', 'keywords': '', 'moddate': '2022-12-17T08:32:38+00:00', 'trapped': '', 'modDate': "D:20221217083238+00'00'", 'creationDate': "D:20221217083238+00'00'", 'page': 0, 'source_file': 'unit2.pdf', 'file_type': 'pdf'}


### Embedding and VectorStoreDB

In [42]:
import numpy as np
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
import uuid
from typing import List, Dict, Tuple, Any
from sklearn.metrics.pairwise import cosine_similarity



In [43]:
class EmbeddingManager:
    """Generate text embeddings with a SentenceTransformer model."""

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        """Remember the model name and load the model immediately.

        Args:
            model_name: Name passed to ``SentenceTransformer`` when loading.

        Raises:
            Exception: Any error raised while loading the model is re-raised.
        """
        self.model_name = model_name
        self.model = None  # set by _load_model()
        self._load_model()

    def _load_model(self):
        """Instantiate the SentenceTransformer model named by ``self.model_name``.

        Prints progress messages; on failure prints the error and re-raises it.
        """
        try:
            print(f"Loading embedded model: {self.model_name}")
            self.model = SentenceTransformer(self.model_name)
            print(f"Model loaded successfully. Embedding dimensions: {self.model.get_sentence_embedding_dimension()}")
        except Exception as e:
            print(f"Error loading the model {self.model_name} : {e}")
            raise

    def generate_embedding(self, texts : List[str]) -> np.ndarray:
        """Embed a list of texts.

        Args:
            texts: Text strings to embed. A single ``str`` is treated as a
                one-element list instead of a sequence of characters.

        Returns:
            Array of embeddings with shape ``(len(texts), embedding_dim)``.
            For empty input a ``(0, embedding_dim)`` array is returned without
            calling the model.

        Raises:
            ValueError: If no model has been loaded.
        """
        # Compare against None explicitly: a model object that defines __len__
        # could be falsy even though it is loaded.
        if self.model is None:
            raise ValueError("Model not Loaded")

        # A bare string would otherwise be counted (and reported) character by character.
        texts = [texts] if isinstance(texts, str) else list(texts)

        print(f"generate embeddings for {len(texts)} texts...")
        if texts:
            embeddings = self.model.encode(texts, show_progress_bar=True)
        else:
            # Keep the documented 2-D shape for empty input; fall back to width 0
            # if the model does not report a dimension.
            dim = self.model.get_sentence_embedding_dimension() or 0
            embeddings = np.empty((0, dim), dtype=np.float32)
        print(f"generated embeddings with shape: {embeddings.shape}")

        return embeddings
    
## initialise the embedding manager (uses the default model name, "all-MiniLM-L6-v2")

embedding_manager = EmbeddingManager()
embedding_manager


Loading embedded model: all-MiniLM-L6-v2
Model loaded successfully. Embedding dimensions: 384


<__main__.EmbeddingManager at 0x130d76900>

### VectorStore

In [47]:
class VectorStore:
    """Store document texts, metadata and embeddings in a persistent ChromaDB collection."""

    def __init__(self, collection_name: str = "pdf_documents", persist_directory: str = "../data/vector_store"):
        """Record the configuration and open (or create) the collection.

        Args:
            collection_name: Name of the ChromaDB collection.
            persist_directory: Directory in which the vector store is persisted.

        Raises:
            Exception: Any error raised while initialising the store is re-raised.
        """
        self.collection_name = collection_name
        self.persist_directory = persist_directory
        self.client = None      # set by _initialize_store()
        self.collection = None  # set by _initialize_store()
        self._initialize_store()

    def _initialize_store(self):
        """Create the persist directory, the persistent client and the collection.

        Prints the collection name and its current document count; on failure
        prints the error and re-raises it.
        """
        try:
            # create persistent chromaDB client
            os.makedirs(self.persist_directory, exist_ok=True)
            self.client = chromadb.PersistentClient(path = self.persist_directory)

            # get or create collection
            self.collection = self.client.get_or_create_collection(
                name = self.collection_name,
                metadata={"description" : "PDF documents for RAG"}
            )
            print(f"Vector store initialized. Collection : {self.collection_name}")
            print(f"Existing documents in collection: {self.collection.count()}")

        except Exception as e:
            print(f"Error initializing vector store: {e}")
            raise

    @staticmethod
    def _sanitize_metadata(metadata) -> Dict[str, Any]:
        """Return a copy of ``metadata`` holding only str/int/float/bool values.

        ``None`` values are dropped and values of any other type are converted
        with ``str()``. NOTE(review): this targets Chroma's metadata validation,
        which rejects unsupported value types and would fail the whole batch;
        confirm the accepted types for the installed chromadb version.
        """
        clean = {}
        for key, value in dict(metadata or {}).items():
            if value is None:
                continue
            clean[key] = value if isinstance(value, (str, int, float, bool)) else str(value)
        return clean

    def add_documents(self, documents : List[Any], embeddings: np.ndarray):
        """Add documents and their embeddings to the collection.

        Each call generates fresh random ids, so adding the same documents
        twice stores them twice.

        Args:
            documents: Objects exposing ``page_content`` and ``metadata``
                (e.g. LangChain documents).
            embeddings: One embedding per document; a 2-D array or any
                array-like of equal-length rows.

        Raises:
            ValueError: If the number of documents and embeddings differ.
            Exception: Any error raised by the collection's ``add`` is re-raised.
        """
        if len(documents) != len(embeddings):
            raise ValueError("number of documents must match number of embeddings")

        # Nothing to store: skip the database call instead of sending empty lists.
        if len(documents) == 0:
            print("No documents to add to vector store")
            return

        print(f"Adding {len(documents)} documents to vector store...")

        # prepare data for chromaDB
        ids = []
        metadatas = []
        documents_text = []

        for i, doc in enumerate(documents):
            # random 8-hex-digit prefix plus the position within this batch
            ids.append(f"doc_{uuid.uuid4().hex[:8]}_{i}")

            # copy and sanitise so the caller's metadata dict is never mutated
            metadata = self._sanitize_metadata(doc.metadata)
            metadata['doc_index'] = i
            metadata['content_length'] = len(doc.page_content)
            metadatas.append(metadata)

            documents_text.append(doc.page_content)

        # Works for ndarrays as well as plain lists of rows.
        embeddings_list = np.asarray(embeddings).tolist()

        # add to collection
        try:
            self.collection.add(
                ids = ids,
                embeddings = embeddings_list,
                metadatas = metadatas,
                documents = documents_text
            )
            print(f"Successfully added {len(documents)} documents to Vector Store")
            print(f"Total documents in collection: {self.collection.count()}")

        except Exception as e:
            print(f"Error adding  documents to vector store: {e}")
            raise

# Open (or create) the default "pdf_documents" collection persisted under ../data/vector_store.
vectorstore = VectorStore()
vectorstore



Vector store initialized. Collection : pdf_documents
Existing documents in collection: 0


<__main__.VectorStore at 0x130d76e40>

In [45]:
# Peek at the first five chunks.
chunks[:5]

[Document(metadata={'producer': 'Microsoft® Word for Microsoft 365', 'creator': 'Microsoft® Word for Microsoft 365', 'creationdate': '2022-12-17T08:32:38+00:00', 'source': '../data/pdf_files/unit2.pdf', 'file_path': '../data/pdf_files/unit2.pdf', 'total_pages': 23, 'format': 'PDF 1.7', 'title': '', 'author': '', 'subject': '', 'keywords': '', 'moddate': '2022-12-17T08:32:38+00:00', 'trapped': '', 'modDate': "D:20221217083238+00'00'", 'creationDate': "D:20221217083238+00'00'", 'page': 0, 'source_file': 'unit2.pdf', 'file_type': 'pdf'}, page_content='1 \n \n \n \n \n \n \nSCHOOL OF COMPUTING \n \nDEPARTMENT OF COMPUTER SCIENCE AND \nENGINEERING \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n     \n \n    UNIT – I  - DISTRIBUTED DATABASE AND INFORMATION SYSTEMS- SCSA3008'),
 Document(metadata={'producer': 'Microsoft® Word for Microsoft 365', 'creator': 'Microsoft® Word for Microsoft 365', 'creationdate': '2022-12-17T08:32:38+00:00', 'source': '../data/pdf_fil

In [48]:
# collect the raw text of every chunk
texts = [doc.page_content for doc in chunks]

# generate one embedding per chunk text
embeddings = embedding_manager.generate_embedding(texts)

# store the chunks and their embeddings in the vector database
# NOTE: add_documents generates random ids on every call, so re-running this
# cell adds the same chunks again rather than replacing them.
vectorstore.add_documents(chunks, embeddings)

generate embeddings for 319 texts...


Batches: 100%|██████████| 10/10 [00:02<00:00,  3.74it/s]


generated embeddings with shape: (319, 384)
Adding 319 documents to vector store...
Successfully added 319 documents to Vector Store
Total documents in collection: 319
