### RAG Pipeline - Data Ingestion to Vector DB

In [1]:
import os
from langchain_core.documents import Document
from langchain_community.document_loaders import PyPDFLoader, PyMuPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from pathlib import Path

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Read all the pdf's inside the directory
def process_all_pdfs(pdf_directory: str) -> list[Document]:
    """Load every PDF found under a directory into LangChain documents.

    The directory is searched recursively. Files are selected by a
    case-insensitive ``.pdf`` suffix and processed in sorted path order, so
    repeated runs over the same directory yield documents in the same order.

    Args:
        pdf_directory: Directory to search for PDF files.

    Returns:
        The documents returned by ``PyPDFLoader.load()`` for every file that
        loaded, each tagged with ``source_file`` and ``file_type`` metadata.
        A file that raises while loading is reported and skipped.
    """
    all_documents = []
    pdf_dir = Path(pdf_directory)

    # Find all PDF files recursively. Comparing the lower-cased suffix also
    # picks up "*.PDF" on case-sensitive filesystems, and sorting removes the
    # filesystem-dependent ordering of the directory walk.
    pdf_files = sorted(
        path
        for path in pdf_dir.rglob("*")
        if path.is_file() and path.suffix.lower() == ".pdf"
    )
    print(f"Found {len(pdf_files)} PDF files to process")

    for pdf_file in pdf_files:
        print(f"\nProcessing: {pdf_file.name}")
        try:
            loader = PyPDFLoader(str(pdf_file))
            documents = loader.load()

            # Add source information to metadata
            for doc in documents:
                doc.metadata["source_file"] = pdf_file.name
                doc.metadata["file_type"] = "pdf"

            all_documents.extend(documents)
            print(f"Loaded {len(documents)} pages")

        except Exception as e:
            # Best effort: one unreadable file must not abort the whole batch.
            print(f"Error: {e}")

    print(f"\nTotal documents loaded: {len(all_documents)}")
    return all_documents

# Load every PDF found under ../data (resolved against the kernel's current working directory).
all_pdf_documents = process_all_pdfs("../data")

Found 4 PDF files to process

Processing: A Dataset and Model for Realistic License Plate Deblurring.pdf
Loaded 9 pages

Processing: Adaptive_Lightweight_License_Plate_Image_Recovery_Using_Deep_Learning_Based_on_Generative_Adversarial_Network.pdf
Loaded 19 pages

Processing: Arcega_CV.pdf
Loaded 1 pages

Processing: Jansen_Cruz_CV.pdf
Loaded 1 pages

Total documents loaded: 30


In [3]:
# Display the loaded documents. NOTE(review): this renders the full list; consider slicing for large corpora.
all_pdf_documents

[Document(metadata={'producer': 'pdfTeX-1.40.25', 'creator': 'LaTeX with hyperref', 'creationdate': '2024-04-23T00:45:45+00:00', 'author': '', 'keywords': '', 'moddate': '2024-04-23T00:45:45+00:00', 'ptex.fullbanner': 'This is pdfTeX, Version 3.141592653-2.6-1.40.25 (TeX Live 2023) kpathsea version 6.3.5', 'subject': '', 'templateversion': 'IJCAI.2024.0', 'title': '', 'trapped': '/False', 'source': '..\\data\\pdf_files\\A Dataset and Model for Realistic License Plate Deblurring.pdf', 'total_pages': 9, 'page': 0, 'page_label': '1', 'source_file': 'A Dataset and Model for Realistic License Plate Deblurring.pdf', 'file_type': 'pdf'}, page_content='A Dataset and Model for Realistic License Plate Deblurring\nHaoyan Gong, Yuzheng Feng, Zhenrong Zhang, Xianxu Hou, Jingxin Liu, Siqi\nHuang and Hongbin Liu∗\nSchool of AI and Advanced Computing, Xi’an Jiaotong-Liverpool University\n{haoyan.gong21, yuzheng.feng21, zhenrong.zhang21}@student.xjtlu.edu.cn, {xianxu.hou, jingxin.liu,\nsiqi.huang, hong

### Text splitting into chunks

In [38]:
def split_documents(documents: list[Document], chunk_size=2000, chunk_overlap=500) -> list[Document]:
    """Break documents into overlapping chunks for retrieval.

    Args:
        documents: Documents to split.
        chunk_size: ``chunk_size`` handed to the splitter (lengths are measured with ``len``).
        chunk_overlap: ``chunk_overlap`` handed to the splitter.

    Returns:
        The chunk documents produced by ``RecursiveCharacterTextSplitter``.
    """
    splitter_options = {
        "chunk_size": chunk_size,
        "chunk_overlap": chunk_overlap,
        "length_function": len,
        # Separators given to the splitter: blank line, newline, space, empty string.
        "separators": ["\n\n", "\n", " ", ""],
    }
    chunked = RecursiveCharacterTextSplitter(**splitter_options).split_documents(documents)
    print(f"Split {len(documents)} documents into {len(chunked)} chunks")

    # Nothing to preview when the splitter produced no chunks.
    if not chunked:
        return chunked

    # Preview the first chunk: first 200 characters plus its metadata.
    first_chunk = chunked[0]
    print(f"\nExample chunk:")
    print(f"Content: {first_chunk.page_content[:200]}...")
    print(f"Metadata: {first_chunk.metadata}")
    return chunked

In [39]:
# Chunk the loaded documents using split_documents' default chunk_size / chunk_overlap.
chunks = split_documents(all_pdf_documents)

Split 30 documents into 89 chunks

Example chunk:
Content: A Dataset and Model for Realistic License Plate Deblurring
Haoyan Gong, Yuzheng Feng, Zhenrong Zhang, Xianxu Hou, Jingxin Liu, Siqi
Huang and Hongbin Liu∗
School of AI and Advanced Computing, Xi’an Ji...
Metadata: {'producer': 'pdfTeX-1.40.25', 'creator': 'LaTeX with hyperref', 'creationdate': '2024-04-23T00:45:45+00:00', 'author': '', 'keywords': '', 'moddate': '2024-04-23T00:45:45+00:00', 'ptex.fullbanner': 'This is pdfTeX, Version 3.141592653-2.6-1.40.25 (TeX Live 2023) kpathsea version 6.3.5', 'subject': '', 'templateversion': 'IJCAI.2024.0', 'title': '', 'trapped': '/False', 'source': '..\\data\\pdf_files\\A Dataset and Model for Realistic License Plate Deblurring.pdf', 'total_pages': 9, 'page': 0, 'page_label': '1', 'source_file': 'A Dataset and Model for Realistic License Plate Deblurring.pdf', 'file_type': 'pdf'}


In [40]:
# Display the chunks. NOTE(review): this renders every chunk; consider slicing for large corpora.
chunks

[Document(metadata={'producer': 'pdfTeX-1.40.25', 'creator': 'LaTeX with hyperref', 'creationdate': '2024-04-23T00:45:45+00:00', 'author': '', 'keywords': '', 'moddate': '2024-04-23T00:45:45+00:00', 'ptex.fullbanner': 'This is pdfTeX, Version 3.141592653-2.6-1.40.25 (TeX Live 2023) kpathsea version 6.3.5', 'subject': '', 'templateversion': 'IJCAI.2024.0', 'title': '', 'trapped': '/False', 'source': '..\\data\\pdf_files\\A Dataset and Model for Realistic License Plate Deblurring.pdf', 'total_pages': 9, 'page': 0, 'page_label': '1', 'source_file': 'A Dataset and Model for Realistic License Plate Deblurring.pdf', 'file_type': 'pdf'}, page_content='A Dataset and Model for Realistic License Plate Deblurring\nHaoyan Gong, Yuzheng Feng, Zhenrong Zhang, Xianxu Hou, Jingxin Liu, Siqi\nHuang and Hongbin Liu∗\nSchool of AI and Advanced Computing, Xi’an Jiaotong-Liverpool University\n{haoyan.gong21, yuzheng.feng21, zhenrong.zhang21}@student.xjtlu.edu.cn, {xianxu.hou, jingxin.liu,\nsiqi.huang, hong

### Embedding and VectorStoreDB

In [41]:
import numpy as np
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.config import Settings
import uuid
from typing import List, Dict, Any, Tuple
from sklearn.metrics.pairwise import cosine_similarity

In [42]:
class EmbeddingManager:
    """Generates document embeddings with a SentenceTransformer model."""

    def __init__(self, model_name: str="all-MiniLM-L6-v2"):
        """Remember the model name and load the model immediately.

        Args:
            model_name: Model name passed to ``SentenceTransformer``.
        """
        self.model = None
        self.model_name = model_name
        self._load_model()

    def _load_model(self):
        """Instantiate the model named by ``self.model_name``.

        A failure is printed and then re-raised unchanged.
        """
        name = self.model_name
        try:
            print(f"Loading embedding model: {name}")
            self.model = SentenceTransformer(name)
            dimension = self.model.get_sentence_embedding_dimension()
            print(f"Model loaded successfully. Embedding dimension: {dimension}")
        except Exception as load_error:
            print(f"Error loading model {name}: {load_error}")
            raise

    def generate_embeddings(self, texts: List[str]) -> np.ndarray:
        """Encode a list of texts with the loaded model.

        Args:
            texts: Text strings to embed.

        Returns:
            Whatever ``self.model.encode`` returns for ``texts``; the code
            reads its ``.shape`` for the progress message.

        Raises:
            ValueError: If no model is loaded.
        """
        if not self.model:
            raise ValueError("Model not loaded")

        text_count = len(texts)
        print(f"Generating embeddings for {text_count} texts...")
        vectors = self.model.encode(texts, show_progress_bar=True)
        print(f"Generated embeddings with shape: {vectors.shape}")
        return vectors

# Build the manager with the default model name; the constructor loads the model immediately.
embedding_manager = EmbeddingManager()
# Bare expression: displays the instance repr as the cell output.
embedding_manager

Loading embedding model: all-MiniLM-L6-v2
Model loaded successfully. Embedding dimension: 384


<__main__.EmbeddingManager at 0x26ddb23a750>

### VectorStore

In [43]:
class VectorStore:
    """Manages document embeddings in a persistent ChromaDB collection."""

    def __init__(self, collection_name: str="pdf_documents", persist_directory: str="../data/vector_store"):
        """Initialize the vector store
        Args:
            collection_name: Name of the ChromaDB collection
            persist_directory: Directory to persist the vector store
        """
        self.collection_name=collection_name
        self.persist_directory=persist_directory
        self.client=None
        self.collection=None
        self._initialize_store()

    def _initialize_store(self):
        """Create the persistent ChromaDB client and get or create the collection.

        Errors are printed and re-raised.
        """
        try:
            # Create persistent ChromaDB client
            os.makedirs(self.persist_directory, exist_ok=True)
            self.client = chromadb.PersistentClient(path=self.persist_directory)

            # Get or create collection.
            # NOTE: no "hnsw:space" is set here, so the collection uses
            # ChromaDB's default distance function.
            self.collection = self.client.get_or_create_collection(
                name=self.collection_name,
                metadata={"description": "PDF document embeddings for RAG"}
            )
            print(f"Vectore store initialized. Collection: {self.collection_name}")
            print(f"Existing documents in collection: {self.collection.count()}")

        except Exception as e:
            print(f"Error initializing vector store: {e}")
            raise

    @staticmethod
    def _content_key(doc: Document) -> str:
        """Return the string that identifies a chunk: its source, page and text."""
        source = doc.metadata.get("source", "")
        page = doc.metadata.get("page", "")
        return f"{source}|{page}|{doc.page_content}"

    def add_documents(self, documents: List[Document], embeddings: np.ndarray, batch_size: int=1000):
        """Upsert documents and their embeddings into the vector store.

        IDs are derived deterministically from each document's source, page
        and text, and records are written with ``upsert``. Re-running the
        ingestion on the same input therefore overwrites the existing records
        instead of appending duplicates. Records written earlier under other
        IDs are left untouched.

        Args:
            documents: List of LangChain documents
            embeddings: Corresponding embeddings for the documents
            batch_size: Maximum number of records sent per ``upsert`` call

        Raises:
            ValueError: If the number of documents and embeddings differ, or
                if ``batch_size`` is smaller than 1.
        """
        if len(documents) != len(embeddings):
            raise ValueError("Number of documents must match the number of embeddings")
        if batch_size < 1:
            raise ValueError("batch_size must be a positive integer")

        # Nothing to write; avoid sending empty lists to the collection.
        if len(documents) == 0:
            print("No documents to add to vector store")
            return

        print(f"Adding {len(documents)} documents to vector store...")

        # Prepare data for ChromaDB
        ids = []
        metadatas = []
        documents_text = []
        embeddings_list = []
        occurrences = {}  # digest -> how many times it has been seen in this call

        for i, (doc, embedding) in enumerate(zip(documents, embeddings)):
            # Deterministic ID: identical chunks map to the same digest; the
            # occurrence counter keeps IDs unique within this call.
            digest = uuid.uuid5(uuid.NAMESPACE_URL, self._content_key(doc)).hex
            occurrence = occurrences.get(digest, 0)
            occurrences[digest] = occurrence + 1
            ids.append(f"doc_{digest}_{occurrence}")

            # Prepare metadata (copy so the caller's document is not mutated)
            metadata = dict(doc.metadata)
            metadata['doc_index'] = i
            metadata['content_length'] = len(doc.page_content)
            metadatas.append(metadata)

            # Document content
            documents_text.append(doc.page_content)

            # Embedding as a plain list
            embeddings_list.append(np.asarray(embedding).tolist())

        # Write to the collection in bounded batches
        try:
            for start in range(0, len(ids), batch_size):
                stop = start + batch_size
                self.collection.upsert(
                    ids=ids[start:stop],
                    embeddings=embeddings_list[start:stop],
                    metadatas=metadatas[start:stop],
                    documents=documents_text[start:stop]
                )
            print(f"Successfully loaded {len(documents)} documents to vector store")
            print(f"Total documents in collection: {self.collection.count()}")

        except Exception as e:
            print(f"Error adding documents to vector store: {e}")
            raise

# Open (or create) the collection using the default collection name and persist directory.
vector_store = VectorStore()
# Bare expression: displays the instance repr as the cell output.
vector_store

Vectore store initialized. Collection: pdf_documents
Existing documents in collection: 501


<__main__.VectorStore at 0x26dd3ace240>

In [44]:
 # Convert the text to embeddings
texts = [doc.page_content for doc in chunks]  # one string per chunk, in the same order as `chunks`

# Generate the embeddings (one per text, in the same order)
embeddings = embedding_manager.generate_embeddings(texts)

# Store in the vector database; add_documents requires len(chunks) == len(embeddings)
vector_store.add_documents(documents=chunks, embeddings=embeddings)

Generating embeddings for 89 texts...


Batches: 100%|███████████████████████████████████████████████████████████████████████████| 3/3 [00:01<00:00,  2.04it/s]


Generated embeddings with shape: (89, 384)
Adding 89 documents to vector store...
Successfully loaded 89 documents to vector store
Total documents in collection: 590


### Retriever Pipeline from VectorStore

In [45]:
class RAGRetriever:
    """Handles query-based retrieval from the vector store"""

    def __init__(self, vector_store: VectorStore, embedding_manager: EmbeddingManager):
        """Initialize the retriever
        Args:
            vector_store: Vector store containing document embeddings
            embedding_manager: Manager for generating query embeddings
        """
        self.vector_store=vector_store
        self.embedding_manager=embedding_manager

    @staticmethod
    def _cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
        """Return the cosine similarity of two vectors (0.0 if either has zero norm)."""
        denominator = float(np.linalg.norm(a) * np.linalg.norm(b))
        if denominator == 0.0:
            return 0.0
        return float(np.dot(a, b) / denominator)

    def retrieve(self, query: str, top_k: int=5, score_threshold: float=0.0) -> List[Dict[str, Any]]:
        """Retrieve relevant documents for a query

        The similarity score is the cosine similarity between the query
        embedding and each returned document embedding, computed here rather
        than derived from the reported distance. ``1 - distance`` is a valid
        similarity only for a cosine-space collection, and ChromaDB uses
        squared L2 unless ``hnsw:space`` is configured.

        Args:
            query: The search query
            top_k: Number of top results to return
            score_threshold: Minimum similarity score threshold
        Returns:
            List of dictionaries with keys ``id``, ``content``, ``metadata``,
            ``similarity_score``, ``distance`` (as reported by the collection)
            and ``rank`` (1-based position in the collection's result order).
            An empty list is returned if the query itself raises.
        """
        print(f"Retrieving documents for query: '{query}'")
        print(f"Top K: {top_k}, Score threshold: {score_threshold}")

        # Generate query embedding
        query_embedding = self.embedding_manager.generate_embeddings([query])[0]
        query_vector = np.asarray(query_embedding, dtype=float)

        # Search in vector store
        try:
            results = self.vector_store.collection.query(
                query_embeddings=[query_embedding.tolist()],
                n_results=top_k,
                include=["documents", "metadatas", "distances", "embeddings"]
            )

            # Process results
            retrieved_docs = []

            if results['documents'] and results['documents'][0]:
                documents = results['documents'][0]
                metadatas = results['metadatas'][0]
                distances = results['distances'][0]
                ids = results['ids'][0]

                # Stored vectors for the hits; fall back to placeholders if the
                # backend did not return them.
                raw_embeddings = results.get('embeddings')
                if raw_embeddings is not None and len(raw_embeddings) > 0:
                    stored_embeddings = raw_embeddings[0]
                else:
                    stored_embeddings = [None] * len(ids)

                for i, (doc_id, document, metadata, distance, stored) in enumerate(
                    zip(ids, documents, metadatas, distances, stored_embeddings)
                ):
                    if stored is None:
                        # No vector available: keep the distance-based estimate.
                        similarity_score = 1 - distance
                    else:
                        similarity_score = self._cosine_similarity(
                            query_vector, np.asarray(stored, dtype=float)
                        )

                    if similarity_score >= score_threshold:
                        retrieved_docs.append({
                            'id': doc_id,
                            'content': document,
                            'metadata': metadata,
                            'similarity_score': similarity_score,
                            'distance': distance,
                            'rank': i + 1
                        })

                print(f'Retrieved {len(retrieved_docs)} documents (after filtering)')
            else:
                print('No documents found')

            return retrieved_docs

        except Exception as e:
            # Best effort: report the failure and return no results.
            print(f'Error during retrieval: {e}')
            return []

# Wire the retriever to the vector store and embedding manager created in earlier cells.
rag_retriever = RAGRetriever(vector_store=vector_store, embedding_manager=embedding_manager)

In [54]:
# Try a sample query with the default top_k and score_threshold; the returned list is displayed as the cell output.
rag_retriever.retrieve('What is License Plate AI Deblurring')

Retrieving documents for query: 'What is License Plate AI Deblurring'
Top K: 5, Score threshold: 0.0
Generating embeddings for 1 texts...


Batches: 100%|██████████████████████████████████████████████████████████████████████████| 1/1 [00:00<00:00, 143.87it/s]

Generated embeddings with shape: (1, 384)
Retrieved 5 documents (after filtering)





[{'id': 'doc_ba646422_5',
  'content': 'pacted the image deblurring field[Ramakrishnan et al., 2017;\nKupyn et al., 2018; Kupyn et al., 2019; Zhao et al., 2022].\nDespite these advancements, deblurring license plate images\nremains a significant challenge, primarily due to the lack of\nlarge-scale, tailored datasets. The complexity of license plate\nblurring, with its more severe degradation compared to stan-\ndard motion blur, poses an additional challenge. To better jus-\ntify the performance of existing image deblurring algorithms\non real-world blurred license plate images, we evaluate the\nperformance of several state-of-the-art deblurring algorithms\narXiv:2404.13677v1  [cs.CV]  21 Apr 2024',
  'metadata': {'file_type': 'pdf',
   'subject': '',
   'creationdate': '2024-04-23T00:45:45+00:00',
   'page_label': '1',
   'source': '..\\data\\pdf_files\\A Dataset and Model for Realistic License Plate Deblurring.pdf',
   'moddate': '2024-04-23T00:45:45+00:00',
   'ptex.fullbanner': 'Thi

### Integrating the VectorDB Context Pipeline with LLM Output

In [47]:
# RAG pipeline with Groq LLM
from langchain_groq import ChatGroq
import os
from dotenv import load_dotenv
load_dotenv()

# Initialize Groq
# The API key is read from the environment (load_dotenv() is called above);
# os.getenv returns None when GROQ_API_KEY is not set.
groq_api_key = os.getenv('GROQ_API_KEY')

# Chat model used by the RAG function below.
llm = ChatGroq(
    groq_api_key=groq_api_key,
    model_name="llama-3.1-8b-instant",
    temperature=0.1,
    max_tokens=1024
)

# RAG function: retrieve context + generate response
def rag_simple(query, retriever, llm, top_k=3):
    """Answer a question from retrieved context using the given LLM.

    Args:
        query: The user's question.
        retriever: Object whose ``retrieve(query, top_k=...)`` returns a list
            of dicts that each have a ``'content'`` entry.
        llm: Object whose ``invoke(...)`` returns a response with ``.content``.
        top_k: Number of results requested from the retriever.

    Returns:
        The LLM response content, or a fixed message when no context was retrieved.
    """
    results = retriever.retrieve(query, top_k=top_k)
    context = '\n\n'.join([doc['content'] for doc in results]) if results else ''

    if not context:
        return 'No relevant context found to answer the question'

    # Generate the answer using Groq LLM.
    # The f-string below already interpolates context and query. It must not
    # be passed through str.format() again: retrieved text or a query that
    # contains "{" or "}" would be parsed as format fields and raise.
    prompt = f"""Use the following context to answer the question concisely.
        Context: 
        {context}
    
        Question: {query}
    
        Answer:"""

    response = llm.invoke([prompt])
    return response.content
    

In [55]:
# Run the full pipeline once: retrieve context (rag_simple's default top_k) and ask the LLM.
answer = rag_simple("What is License Plate Deblurring?", rag_retriever, llm)
print(answer)

Retrieving documents for query: 'What is License Plate Deblurring?'
Top K: 3, Score threshold: 0.0
Generating embeddings for 1 texts...


Batches: 100%|██████████████████████████████████████████████████████████████████████████| 1/1 [00:00<00:00, 166.49it/s]

Generated embeddings with shape: (1, 384)
Retrieved 3 documents (after filtering)





Efficient recognition of vehicle license plates is crucial for intelligent traffic management systems, however real-world scenarios often pose a significant challenge due to motion blur.
