# RAG Pipleline for Document proccessing
This colab notebook walks us through the entire implementation of RAG pipline for intelligent document processing and smart querying.

# Package installation for pipeline

In [None]:
# Install required packages
%pip install -q gradio
%pip install -q gradio_pdf
%pip install -q pypdf PyPDF2 pymupdf
%pip install -q sentence-transformers transformers
%pip install -q faiss-cpu
%pip install -q google-generativeai
%pip install -q numpy pandas

# Install LlamaIndex packages for enhanced document processing
%pip install -q llama-index
%pip install -q llama-index-readers-file
%pip install -q llama-index-embeddings-huggingface
%pip install -q llama-index-vector-stores-faiss
%pip install -q llama-index-llms-gemini

[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m4.5/4.5 MB[0m [31m54.1 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h

# Now to import the core packages installed, and to configure the API key

In [None]:
import os
from getpass import getpass

import gradio as gr
from gradio_pdf import PDF
import fitz  # PyMuPDF
from PyPDF2 import PdfReader
import numpy as np
from sentence_transformers import SentenceTransformer
import faiss
import google.generativeai as genai

from llama_index.core import Document, VectorStoreIndex, StorageContext
from llama_index.core.schema import TextNode
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.vector_stores import MetadataFilters, MetadataFilter, FilterOperator

# GEMINI API key
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") or getpass("Gemini API key (hidden): ")
if GEMINI_API_KEY:
    os.environ["GEMINI_API_KEY"] = GEMINI_API_KEY
    genai.configure(api_key=GEMINI_API_KEY)
    gemini_model = genai.GenerativeModel("models/gemini-2.5-flash-lite")
else:
    print("No GEMINI_API_KEY set ‚Äî set env or enter when prompted.")

# Embeddings
embed_model = SentenceTransformer("all-MiniLM-L6-v2")
llama_embed_model = HuggingFaceEmbedding(model_name="sentence-transformers/all-MiniLM-L6-v2")


Error while fetching `HF_TOKEN` secret value from your vault: 'Requesting secret HF_TOKEN timed out. Secrets can only be fetched when running from the Colab UI.'.
You are not authenticated with the Hugging Face Hub in this notebook.
If the error persists, please let us know by opening an issue on GitHub (https://github.com/huggingface/huggingface_hub/issues/new).


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

# Now to configure the various data classes for the pipeline
These are data structures to handle complex document metadata

In [19]:
@dataclass
class PageInfo:
    """Stores information about a single page"""
    page_num: int
    text: str
    doc_type: Optional[str] = None
    page_in_doc: int = 0

@dataclass
class LogicalDocument:
    """Represents a logical document within a PDF"""
    doc_id: str
    doc_type: str
    page_start: int
    page_end: int
    text: str
    chunks: List[Dict] = None

@dataclass
class ChunkMetadata:
    """Rich metadata for each chunk"""
    chunk_id: str
    doc_id: str
    doc_type: str
    chunk_index: int
    page_start: int
    page_end: int
    text: str
    embedding: Optional[np.ndarray] = None

# Document classification functions and Boundary detection

In [20]:
def classify_document_type(text: str, max_length: int = 1500) -> str:
    """
    Classify the document type based on its content.
    Uses LLM to intelligently identify document category.
    """
    # Truncate text if too long to avoid token limits
    text_sample = text[:max_length] if len(text) > max_length else text

    prompt = f"""
    Analyze this document and classify it into ONE of these categories:
    - Resume: CV, professional profile, work history
    - Contract: Legal agreement, terms and conditions, service agreement
    - Mortgage Contract: Home loan agreement, mortgage terms, property financing
    - Invoice: Bill, payment request, financial statement
    - Pay Slip: Salary statement, wage slip, earnings statement
    - Lender Fee Sheet: Loan fees, lender charges, closing costs
    - Land Deed: Property deed, title document, ownership certificate
    - Bank Statement: Account statement, transaction history
    - Tax Document: W2, 1099, tax return, tax form
    - Insurance: Insurance policy, coverage document
    - Report: Analysis, research document, findings
    - Letter: Correspondence, memo, communication
    - Form: Application, questionnaire, data entry form
    - ID Document: Driver's license, passport, identification
    - Medical: Medical report, prescription, health record
    - Other: Doesn't fit other categories

    Document sample:
    {text_sample}

    Respond with ONLY the category name, nothing else.
    """

    try:
        response = gemini_model.generate_content(prompt)
        doc_type = response.text.strip()

        # Normalize the response
        valid_types = [
            'Resume', 'Contract', 'Mortgage Contract', 'Invoice', 'Pay Slip',
            'Lender Fee Sheet', 'Land Deed', 'Bank Statement', 'Tax Document',
            'Insurance', 'Report', 'Letter', 'Form', 'ID Document',
            'Medical', 'Other'
        ]

        # Find best match (case-insensitive)
        for valid_type in valid_types:
            if doc_type.lower() == valid_type.lower():
                return valid_type

        return 'Other'
    except Exception as e:
        print(f"Classification error: {e}")
        return 'Other'

def detect_document_boundary(prev_text: str, curr_text: str,
                            current_doc_type: str = None) -> bool:
    """
    Detect if two consecutive pages belong to the same document.
    Returns True if they're from the same document.
    """
    # Quick heuristic checks first
    if not prev_text or not curr_text:
        return False

    # Sample the texts for LLM analysis
    prev_sample = prev_text[-500:] if len(prev_text) > 500 else prev_text
    curr_sample = curr_text[:500] if len(curr_text) > 500 else curr_text

    prompt = f"""
    Determine if these two pages are from the SAME document.

    Current document type: {current_doc_type or 'Unknown'}

    End of Previous Page:
    ...{prev_sample}

    Start of Current Page:
    {curr_sample}...

    Consider:
    - Continuity of content
    - Formatting consistency
    - Topic coherence
    - Page numbers or headers

    Answer ONLY 'Yes' if same document or 'No' if different document.
    """

    try:
        response = gemini_model.generate_content(prompt)
        return response.text.strip().lower().startswith('yes')
    except Exception as e:
        print(f"Boundary detection error: {e}")
        # Default to keeping pages together if uncertain
        return True

# PDF processing pipeline

In [21]:
def extract_and_analyze_pdf(pdf_file) -> Tuple[List[PageInfo], List[LogicalDocument]]:
    """
    Extract text from PDF and perform intelligent document analysis.
    Returns both page-level info and logical document groupings.
    Supports various file types including scanned PDFs with OCR.
    """
    print("üìñ Starting PDF extraction and analysis...")

    # Extract text from each page
    if isinstance(pdf_file, dict) and "content" in pdf_file:
        doc = fitz.open(stream=pdf_file["content"], filetype="pdf")
    elif hasattr(pdf_file, "read"):
        doc = fitz.open(stream=pdf_file.read(), filetype="pdf")
    else:
        doc = fitz.open(pdf_file)

    pages_info = []
    for i, page in enumerate(doc):
        text = page.get_text()

        # If no text is found,  switch to OCR in case of scanned documents
        if not text.strip():
            print(f"  Page {i}: No text found, attempting OCR...")
            try:
                # Convert page to image and perform OCR
                pix = page.get_pixmap()
                img_data = pix.tobytes("png")
                from PIL import Image
                import pytesseract
                import io

                img = Image.open(io.BytesIO(img_data))
                text = pytesseract.image_to_string(img)
                print(f"  Page {i}: OCR extracted {len(text)} characters")
            except Exception as e:
                print(f"  Page {i}: OCR failed - {e}")
                text = ""

        pages_info.append(PageInfo(page_num=i, text=text))

    doc.close()

    if not pages_info:
        raise ValueError("No text could be extracted from PDF")

    print(f"‚úÖ Extracted {len(pages_info)} pages")

    # Perform document classification and boundary detection
    print("üß† Analyzing document structure...")
    logical_docs = []
    current_doc_type = None
    current_doc_pages = []
    doc_counter = 0

    for i, page_info in enumerate(pages_info):
        if i == 0:
            # First page - classify document type
            current_doc_type = classify_document_type(page_info.text)
            page_info.doc_type = current_doc_type
            page_info.page_in_doc = 0
            current_doc_pages = [page_info]
            print(f"  Page {i}: New document detected - {current_doc_type}")
        else:
            # Check if this page continues the previous document
            prev_text = pages_info[i-1].text
            is_same = detect_document_boundary(prev_text, page_info.text, current_doc_type)

            if is_same:
                # Continue current document
                page_info.doc_type = current_doc_type
                page_info.page_in_doc = len(current_doc_pages)
                current_doc_pages.append(page_info)
            else:
                # New document detected - save previous and start new
                logical_doc = LogicalDocument(
                    doc_id=f"doc_{doc_counter}",
                    doc_type=current_doc_type,
                    page_start=current_doc_pages[0].page_num,
                    page_end=current_doc_pages[-1].page_num,
                    text="\n\n".join([p.text for p in current_doc_pages])
                )
                logical_docs.append(logical_doc)
                doc_counter += 1

                # Start new document
                current_doc_type = classify_document_type(page_info.text)
                page_info.doc_type = current_doc_type
                page_info.page_in_doc = 0
                current_doc_pages = [page_info]
                print(f"  Page {i}: New document detected - {current_doc_type}")

    # For the last document
    if current_doc_pages:
        logical_doc = LogicalDocument(
            doc_id=f"doc_{doc_counter}",
            doc_type=current_doc_type,
            page_start=current_doc_pages[0].page_num,
            page_end=current_doc_pages[-1].page_num,
            text="\n\n".join([p.text for p in current_doc_pages])
        )
        logical_docs.append(logical_doc)

    print(f"‚úÖ Identified {len(logical_docs)} logical documents")
    for ld in logical_docs:
        print(f"   - {ld.doc_type}: Pages {ld.page_start}-{ld.page_end}")

    return pages_info, logical_docs

# Now to implement chunking
Using overlapping chunking with a sliding window approach

In [22]:
def chunk_document_with_metadata(logical_doc: LogicalDocument,
                                chunk_size: int = 500,
                                overlap: int = 100) -> List[ChunkMetadata]:
    """
    Chunk a logical document while preserving rich metadata.
    Uses sliding window with overlap for better context.
    """
    chunks_metadata = []
    words = logical_doc.text.split()

    if len(words) <= chunk_size:
        # Document is small enough to be a single chunk
        chunk_meta = ChunkMetadata(
            chunk_id=f"{logical_doc.doc_id}_chunk_0",
            doc_id=logical_doc.doc_id,
            doc_type=logical_doc.doc_type,
            chunk_index=0,
            page_start=logical_doc.page_start,
            page_end=logical_doc.page_end,
            text=logical_doc.text
        )
        chunks_metadata.append(chunk_meta)
    else:
        # Create overlapping chunks
        stride = chunk_size - overlap
        for i, start_idx in enumerate(range(0, len(words), stride)):
            end_idx = min(start_idx + chunk_size, len(words))
            chunk_text = ' '.join(words[start_idx:end_idx])

            # Calculate which pages this chunk spans
            chunk_position = start_idx / len(words)
            page_range = logical_doc.page_end - logical_doc.page_start
            relative_page = int(chunk_position * page_range)
            chunk_page_start = logical_doc.page_start + relative_page
            chunk_page_end = min(chunk_page_start + 1, logical_doc.page_end)

            chunk_meta = ChunkMetadata(
                chunk_id=f"{logical_doc.doc_id}_chunk_{i}",
                doc_id=logical_doc.doc_id,
                doc_type=logical_doc.doc_type,
                chunk_index=i,
                page_start=chunk_page_start,
                page_end=chunk_page_end,
                text=chunk_text
            )
            chunks_metadata.append(chunk_meta)

            if end_idx >= len(words):
                break

    return chunks_metadata

def chunk_with_llama_index(logical_doc: LogicalDocument,
                           chunk_size: int = 500,
                           chunk_overlap: int = 100) -> List[Document]:
    """
    Alternative: Use LlamaIndex's advanced chunking with metadata.
    """
    # Create LlamaIndex document with metadata
    doc = Document(
        text=logical_doc.text,
        metadata={
            "doc_id": logical_doc.doc_id,
            "doc_type": logical_doc.doc_type,
            "page_start": logical_doc.page_start,
            "page_end": logical_doc.page_end,
            "source": f"{logical_doc.doc_type}_document"
        }
    )

    # Using LlamaIndex's sentence splitter for better chunking
    splitter = SentenceSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        paragraph_separator="\n\n",
        separator=" ",
    )

    # Creating nodes (chunks) from document
    nodes = splitter.get_nodes_from_documents([doc])

    # Converting to the ChunkMetadata format for consistency
    chunks_metadata = []
    for i, node in enumerate(nodes):
        chunk_meta = ChunkMetadata(
            chunk_id=f"{logical_doc.doc_id}_chunk_{i}",
            doc_id=logical_doc.doc_id,
            doc_type=logical_doc.doc_type,
            chunk_index=i,
            page_start=node.metadata.get("page_start", logical_doc.page_start),
            page_end=node.metadata.get("page_end", logical_doc.page_end),
            text=node.text
        )
        chunks_metadata.append(chunk_meta)

    return chunks_metadata

def process_all_documents(logical_docs: List[LogicalDocument],
                         use_llama_index: bool = False) -> List[ChunkMetadata]:
    """
    Process all logical documents into chunks with metadata.
    Can use either custom or LlamaIndex chunking.
    """
    all_chunks = []

    for logical_doc in logical_docs:
        if use_llama_index:
            chunks = chunk_with_llama_index(logical_doc)
        else:
            chunks = chunk_document_with_metadata(logical_doc)

        logical_doc.chunks = chunks  # Store reference
        all_chunks.extend(chunks)
        print(f"üìÑ {logical_doc.doc_type}: Created {len(chunks)} chunks")

    return all_chunks

# Query Routing 