## 📚 Setup & Installation

1. Installs/updates core libraries for your stack:
*   `llama-cpp-python` (CUDA wheel), `gradio`, `gradio_pdf`, `pymupdf`, `PyPDF2`, `pillow`, `pytesseract` <br>
*   `sentence-transformers` for embeddings, `faiss-cpu` for vector search

2. Prints CUDA + model backend info to confirm GPU acceleration is active.

In [1]:
# Clean any CPU-only build first (uncomment when a llama-cpp-python build is
# already installed and should be replaced)
# !pip -q uninstall -y llama-cpp-python

# Upgrade pip
!pip -q install --upgrade pip

# Install CUDA-enabled wheel (pick ONE of these)
# NOTE(review): the cuXYZ suffix of the index URL presumably has to match the
# CUDA version of the runtime -- confirm before switching.
!pip -q install llama-cpp-python --extra-index-url https://abetlen.github.io/llama-cpp-python/whl/cu124
# or
# !pip -q install llama-cpp-python --extra-index-url https://abetlen.github.io/llama-cpp-python/whl/cu125

[?25l   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m0.0/1.8 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m[91m‚ï∏[0m [32m1.8/1.8 MB[0m [31m121.9 MB/s[0m eta [36m0:00:01[0m[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m1.8/1.8 MB[0m [31m47.5 MB/s[0m eta [36m0:00:00[0m
[?25h

In [2]:
# Print llama.cpp's system-info string to confirm which backend the installed
# wheel was built with (the captured output below lists a "CUDA :" section).
# NOTE: `sys` is imported here but not used in this cell.
import llama_cpp, sys
print(llama_cpp.llama_print_system_info().decode("utf-8"))

ggml_cuda_init: GGML_CUDA_FORCE_MMQ:    yes
ggml_cuda_init: GGML_CUDA_FORCE_CUBLAS: no
ggml_cuda_init: found 1 CUDA devices:
  Device 0: Tesla T4, compute capability 7.5, VMM: yes


CUDA : ARCHS = 500,520,530,600,610,620,700,720,750,800,860,870,890,900 | FORCE_MMQ = 1 | USE_GRAPHS = 1 | PEER_MAX_BATCH_SIZE = 128 | CPU : SSE3 = 1 | SSSE3 = 1 | AVX = 1 | AVX2 = 1 | F16C = 1 | FMA = 1 | BMI2 = 1 | LLAMAFILE = 1 | OPENMP = 1 | REPACK = 1 | 


In [3]:
# Installing UI / PDF / OCR deps
!pip -q install gradio gradio_pdf
!pip -q install pymupdf PyPDF2
!pip -q install pillow pytesseract

# Text embeddings (runs fine on either CPU or GPU via PyTorch)
!pip -q install sentence-transformers

# Vector store — faiss-cpu is the most reliable on Colab CUDA 12.x
!pip -q install faiss-cpu

# IMPORTANT: don't reinstall llama-cpp-python here.
# If you ever need to, always use the CUDA wheel again:
# !pip -q install llama-cpp-python --extra-index-url https://abetlen.github.io/llama-cpp-python/whl/cu124

In [4]:
# Sanity check after installing the remaining dependencies: PyTorch should
# still see the GPU and llama.cpp should still report its CUDA backend.
import torch, llama_cpp
print("torch cuda:", torch.cuda.is_available())
print(llama_cpp.llama_print_system_info().decode())

torch cuda: True
CUDA : ARCHS = 500,520,530,600,610,620,700,720,750,800,860,870,890,900 | FORCE_MMQ = 1 | USE_GRAPHS = 1 | PEER_MAX_BATCH_SIZE = 128 | CPU : SSE3 = 1 | SSSE3 = 1 | AVX = 1 | AVX2 = 1 | F16C = 1 | FMA = 1 | BMI2 = 1 | LLAMAFILE = 1 | OPENMP = 1 | REPACK = 1 | 


## ⬇️ Download Model (Mistral-7B-Instruct GGUF)

*   Prepares a `/content/models` directory and downloads the open-source Mistral-7B-Instruct model (GGUF, Q4_K_M quantization)
*   By doing this, you get a local path for fast, private inference with `llama.cpp`.

In [5]:
# Create the local model directory and download the GGUF file once.
from pathlib import Path
MODELS_DIR = Path("/content/models")
MODELS_DIR.mkdir(parents=True, exist_ok=True)

MODEL_PATH = MODELS_DIR / "mistral-7b-instruct.Q4_K_M.gguf"
# Skip the download when the file is already present.
# NOTE: only existence is checked, so an interrupted download that left a
# partial file behind is not retried -- delete the file to force a re-download.
# The wget target path below is hard-coded and must stay in sync with MODEL_PATH.
if not MODEL_PATH.exists():
    !wget -O /content/models/mistral-7b-instruct.Q4_K_M.gguf \
      "https://huggingface.co/TheBloke/Mistral-7B-Instruct-v0.2-GGUF/resolve/main/mistral-7b-instruct-v0.2.Q4_K_M.gguf?download=1"

# List the directory so the downloaded file size can be checked by eye.
!ls -lh /content/models

--2026-01-23 00:52:56--  https://huggingface.co/TheBloke/Mistral-7B-Instruct-v0.2-GGUF/resolve/main/mistral-7b-instruct-v0.2.Q4_K_M.gguf?download=1
Resolving huggingface.co (huggingface.co)... 13.35.202.34, 13.35.202.121, 13.35.202.97, ...
Connecting to huggingface.co (huggingface.co)|13.35.202.34|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://us.gcp.cdn.hf.co/xet-bridge-us/65778ac662d3ac1817cc9201/865f5e4682dddb29c2e20270b2471a7590c83a414bbf1d72cf4c08fdff2eeca4?response-content-disposition=attachment%3B+filename*%3DUTF-8%27%27mistral-7b-instruct-v0.2.Q4_K_M.gguf%3B+filename%3D%22mistral-7b-instruct-v0.2.Q4_K_M.gguf%22%3B&Expires=1769133176&Policy=eyJTdGF0ZW1lbnQiOlt7IkNvbmRpdGlvbiI6eyJEYXRlTGVzc1RoYW4iOnsiRXBvY2hUaW1lIjoxNzY5MTMzMTc2fX0sIlJlc291cmNlIjoiaHR0cHM6Ly91cy5nY3AuY2RuLmhmLmNvL3hldC1icmlkZ2UtdXMvNjU3NzhhYzY2MmQzYWMxODE3Y2M5MjAxLzg2NWY1ZTQ2ODJkZGRiMjljMmUyMDI3MGIyNDcxYTc1OTBjODNhNDE0YmJmMWQ3MmNmNGMwOGZkZmYyZWVjYTRcXD9yZXNwb25zZS1jb250ZW50

## ⚙️ Imports & Runtime Config

1. Centralizes constants (context window, max tokens, temperature, stop tokens, GPU offload).
2. Loads the `SentenceTransformer` (embeddings on GPU) and the Llama model (mistral GGUF) for generation.
3. Provides `llm_generate()` as a thin wrapper function around `llama.cpp`.

In [6]:
import os, io, json
from dataclasses import dataclass
from datetime import datetime
from typing import List, Dict, Tuple, Optional

import numpy as np
import fitz  # PyMuPDF
from PyPDF2 import PdfReader

import gradio as gr
from gradio_pdf import PDF

import faiss
from sentence_transformers import SentenceTransformer
from PIL import Image
import pytesseract

from llama_cpp import Llama

# -----------------------------
# Model paths & knobs
# -----------------------------
MISTRAL_GGUF_PATH = "/content/models/mistral-7b-instruct.Q4_K_M.gguf"
LLM_CTX         = 4096  # context window handed to Llama(n_ctx=...)
LLM_MAX_TOKENS  = 600  # default max_tokens for llm_generate()
LLM_TEMP        = 0.1  # default sampling temperature for llm_generate()
LLM_STOP        = ["</s>"]  # default stop sequences for llm_generate()

# Use -1 / 999 to offload all layers to GPU (when possible)
# N_GPU_LAYERS    = -1
N_GPU_LAYERS    = 999

# -----------------------------
# Embeddings on GPU
# -----------------------------
EMBEDDING_MODEL_ID = "all-MiniLM-L6-v2"
# device="cuda" is hard-coded, so this cell requires a GPU runtime.
embed_model = SentenceTransformer(EMBEDDING_MODEL_ID, device="cuda")

# -----------------------------
# Load local Mistral (GPU offload)
# -----------------------------
# Fail early with a readable message when the download cell was skipped.
# NOTE: `assert` is stripped under `python -O`; fine for a notebook.
assert os.path.exists(MISTRAL_GGUF_PATH), f"Missing GGUF at {MISTRAL_GGUF_PATH}"
llm = Llama(
    model_path=MISTRAL_GGUF_PATH,
    n_ctx=LLM_CTX,
    n_gpu_layers=N_GPU_LAYERS,
    verbose=False  # set True once to see CUDA offload details
)

def llm_generate(prompt: str, max_tokens: int = LLM_MAX_TOKENS, temperature: float = LLM_TEMP,
                 stop: Optional[List[str]] = None) -> str:
    """Run one completion on the module-level ``llm`` and return its text.

    Args:
        prompt: Full prompt string sent to the model.
        max_tokens: Upper bound on generated tokens.
        temperature: Sampling temperature.
        stop: Stop sequences; a falsy value (``None`` or ``[]``) falls back
            to ``LLM_STOP``.

    Returns:
        The text of the first returned choice with surrounding whitespace
        removed, or ``""`` when that choice carries no text.
    """
    stop_sequences = stop if stop else LLM_STOP
    completion = llm(
        prompt=prompt,
        max_tokens=max_tokens,
        temperature=temperature,
        stop=stop_sequences,
        echo=False
    )
    first_choice = completion.get("choices", [{}])[0]
    generated = first_choice.get("text")
    if not generated:
        return ""
    return generated.strip()

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

llama_context: n_ctx_per_seq (4096) < n_ctx_train (32768) -- the full capacity of the model will not be utilized


## 🧱 Data Structures

Lightweight `dataclass` models used throughout:
*   `PageInfo` (raw page text + doc hints)
*   `LogicalDocument` (grouped pages = one logical doc)
*   `ChunkMetadata` (retrieval units with page range + embeddings)

In [7]:
@dataclass
class PageInfo:
    """Text of one physical PDF page plus the labels assigned during segmentation."""
    page_num: int  # 0-based page index within the PDF (set from enumerate())
    text: str  # extracted (or OCR'd) page text; may be empty
    doc_type: Optional[str] = None  # label of the logical document this page was grouped into
    page_in_doc: int = 0  # 0-based position of the page inside its logical document

@dataclass
class LogicalDocument:
    """A run of consecutive pages treated as one document of a single type."""
    doc_id: str  # "doc_<n>", numbered in order of appearance
    doc_type: str  # label produced by the upload-time classifier
    page_start: int  # 0-based index of the first page
    page_end: int  # 0-based index of the last page (inclusive)
    text: str  # page texts joined with blank lines
    # Filled in by process_all_documents() with the document's ChunkMetadata
    # items; None until then.
    chunks: Optional[List["ChunkMetadata"]] = None

@dataclass
class ChunkMetadata:
    """One retrieval unit: a window of a logical document's text plus provenance."""
    chunk_id: str  # "<doc_id>_chunk_<index>"
    doc_id: str  # id of the owning LogicalDocument
    doc_type: str  # copied from the owning LogicalDocument
    chunk_index: int  # 0-based position of the chunk within its document
    page_start: int  # estimated 0-based first page of the chunk
    page_end: int  # estimated 0-based last page of the chunk
    text: str  # chunk text
    embedding: Optional[np.ndarray] = None  # set by IntelligentRetriever.build_indices()

## 🏷️ Rule-Based Doc Typing Keywords

1. Keyword tags (literal classification) for common document types (Pay Slip, Contract, Lender Fee Sheet, Invoice, etc.).
2. Enables fast, LLM-free classification during document ingestion.

In [8]:
import re

# ---- 1) Ultra-simple keyword bags (lowercase) ----
# Maps a document-type label to the phrases that count as one "hit" each.
# - Matching is a plain substring test against the lower-cased page text, so a
#   short keyword also matches inside longer words (e.g. "form" inside
#   "information", "note" inside "denoted").
# - Dict order matters: on a tie the label listed first wins.
# - A few keywords appear under more than one label ("escrow", "withholding",
#   "expiration date") and therefore score for each of them.
DOC_KEYWORDS = {
    "Resume": [
        "experience","education","skills","projects","summary","objective",
        "work history","employment history"
        ],

    "Contract": [
        "agreement","terms","parties","obligations","hereby","whereas",
        "governing law","termination","confidentiality"
        ],

    "Mortgage Contract": [
        "mortgage","deed of trust","note","borrower","lender",
        "property address","interest rate","escrow","principal and interest"
        ],

    "Invoice": [
        "invoice","invoice no","invoice number","bill to","ship to",
        "unit price","qty","subtotal","balance due","payment terms"
        ],

    "Pay Slip": [
        "salary","wages","pay period","gross pay","gross income","net pay",
        "earnings","deductions","withholding","ytd","pay date","employee"
        ],

    "Lender Fee Sheet": [
        "fee worksheet","lender fee","origination fees","origination charges",
        "closing costs","underwriting fee","processing fee","appraisal fee",
        "credit report","escrow","title"
        ],

    "Land Deed": [
        "deed","warranty deed","quitclaim deed","grantor","grantee",
        "legal description","parcel","notary","recorded","county"
        ],

    "Bank Statement": [
        "statement","account number","account ending in","ending balance",
        "available balance","transactions","deposits","withdrawals",
        "statement period"
        ],

    "Tax Document": [
        "tax","withholding","refund","filing status","form w-2","w-2",
        "1099","1040","internal revenue service","department of the treasury"
        ],

    "Insurance": [
        "policy","coverage","premium","claim","policy number",
        "effective date","expiration date","deductible","limits"
        ],

    "Report": [
        "abstract","executive summary","methodology","analysis","findings",
        "results","conclusion","discussion","recommendations"
        ],

    # The trailing space in "dear " is part of the keyword.
    "Letter": [
        "dear ","sincerely","regards","to whom it may concern","subject:"
        ],

    "Form": [
        "form","application","applicant","fields","signature","instructions",
        "please print","submit"
        ],

    "ID Document": [
        "date of birth","issue date","expiration date","driver's license",
        "id number","passport","national id"
        ],

    "Medical": [
        "prescription","dosage","diagnosis","patient","provider",
        "medical record","mrn","icd-10","cpt","visit date","discharge"
        ],
}

In [9]:
# ---- 2) Core helpers ----
def _norm_text(text: str) -> str:
    return (text or "").lower()

def classify_simple(text: str) -> str:
    """Pick the document type whose keyword bag matches *text* most often.

    Each keyword contributes at most one hit (substring test on the
    lower-cased text). The first label in ``DOC_KEYWORDS`` order wins a tie;
    ``"Other"`` is returned when nothing matches.
    """
    haystack = _norm_text(text)
    winner = "Other"
    top_score = 0
    for doc_label, keywords in DOC_KEYWORDS.items():
        score = len([kw for kw in keywords if kw in haystack])
        # Strict ">" keeps the earliest label when scores are equal.
        if score > top_score:
            winner = doc_label
            top_score = score
    if top_score == 0:
        return "Other"
    return winner

# Matches footers such as "Page 3 of 12" (case-insensitive); group 1 is the
# page number, group 2 the page total.
PAGE_OF_RE = re.compile(r"\bpage\s+(\d+)\s+of\s+(\d+)\b", re.I)

def has_continuous_page_number(prev_text: str, curr_text: str) -> bool:
    """Return True if the two texts carry consecutive "Page i of N" markers.

    Only the first "Page i of N" occurrence in each text is considered.

    Args:
        prev_text: Text of the earlier page (``None``/empty is allowed).
        curr_text: Text of the following page (``None``/empty is allowed).

    Returns:
        True when ``prev_text`` says "Page i of N" and ``curr_text`` says
        "Page i+1 of N" with the same total N; False otherwise, including
        when either text has no such marker.
    """
    # The pattern is compiled with re.I, so no lower-casing is needed here.
    prev_match = PAGE_OF_RE.search(prev_text or "")
    curr_match = PAGE_OF_RE.search(curr_text or "")
    if prev_match is None or curr_match is None:
        return False
    try:
        prev_page, prev_total = (int(g) for g in prev_match.groups())
        curr_page, curr_total = (int(g) for g in curr_match.groups())
    except ValueError:
        # int() can still reject absurdly long digit runs; treat as "no marker".
        return False
    # Totals are compared numerically so "of 03" and "of 3" agree.
    return curr_page == prev_page + 1 and curr_total == prev_total

def same_doc(prev_label: str, curr_label: str) -> bool:
    """Boundary heuristic: equal predicted labels ⇒ same logical document.

    A falsy label (``None`` or ``""``) is treated as ``"Other"``.
    """
    previous = prev_label if prev_label else "Other"
    current = curr_label if curr_label else "Other"
    return previous == current

## 🔀 Boundary/Type Detection Switches

Feature flags:
- `USE_RULES_FOR_UPLOAD`: use keyword rules at upload time (skip LLM).
- `USE_SIMPLE_QUERY_ROUTER`: optional keyword router at query time.

In [10]:
# ---- 3) Switches: use rules during upload; keep LLM for answers ----
USE_RULES_FOR_UPLOAD = True          # <— keep True to avoid LLM during ingestion
USE_SIMPLE_QUERY_ROUTER = False      # <— set True to avoid LLM at query-routing time

## 🧭 Routing Helper Functions (Upload & Query)

1. `classify_document_type()` → applies the keyword-based rules (or falls back to an LLM classifier if you flip the flag).
2. `detect_document_boundary()` → keeps multi-page docs together (page-number continuity + label consistency).
3. Optional `predict_query_document_type()` if you enable the simple router.

In [11]:
# ---- 4) Patch the names your ingestion loop already calls ----

def classify_document_type(text: str) -> str:
    """Upload-time document classifier.

    With ``USE_RULES_FOR_UPLOAD`` enabled the fast keyword rules are used.
    Otherwise an optional module-level ``llm_classify_document_type(text)`` is
    called when one has been defined; if none exists the keyword rules are
    used as a fallback.

    Args:
        text: Page text to classify.

    Returns:
        The predicted document-type label.
    """
    if USE_RULES_FOR_UPLOAD:
        return classify_simple(text)
    # Look the optional LLM implementation up explicitly instead of wrapping
    # the call in `except NameError`: that also swallowed NameErrors raised
    # *inside* the LLM classifier and silently fell back to the rules.
    llm_impl = globals().get("llm_classify_document_type")
    if callable(llm_impl):
        return llm_impl(text)
    return classify_simple(text)

def detect_document_boundary(prev_text: str, curr_text: str, current_doc_type: str = None) -> bool:
    """Decide whether ``curr_text`` continues the document in progress.

    Despite the name, the return value is True for *continuation* and False
    for a *boundary* (a new logical document starts at ``curr_text``).

    Args:
        prev_text: Text of the preceding page.
        curr_text: Text of the page being examined.
        current_doc_type: Label of the document in progress; a falsy value is
            treated as ``"Other"``.

    Returns:
        True when the page numbers are continuous ("Page i of N" followed by
        "Page i+1 of N"), or else when the current page's label equals the
        label of the document in progress.
    """
    # A consecutive page footer outranks whatever the classifier says.
    if has_continuous_page_number(prev_text, curr_text):
        return True

    if USE_RULES_FOR_UPLOAD:
        label_of_current_page = classify_simple(curr_text)
    else:
        label_of_current_page = classify_document_type(curr_text)
    label_of_running_doc = current_doc_type if current_doc_type else "Other"
    return same_doc(label_of_running_doc, label_of_current_page)

In [12]:
# ---- 5) Optional: lightweight query-time router to avoid LLM routing ----
def predict_query_document_type(query: str):
    """Guess which document type a query is about, without calling the LLM.

    Reuses the ``DOC_KEYWORDS`` bags. When ``USE_SIMPLE_QUERY_ROUTER`` is
    off, ``("Other", 0.0)`` is returned, which effectively disables routing.

    Returns:
        ``(label, confidence)`` where confidence is 0.9 for two or more
        keyword hits, 0.6 for exactly one, and 0.0 for none.
    """
    if not USE_SIMPLE_QUERY_ROUTER:
        return "Other", 0.0

    normalized_query = _norm_text(query)
    top_label = "Other"
    top_hits = 0
    for label, keywords in DOC_KEYWORDS.items():
        matched = len([kw for kw in keywords if kw in normalized_query])
        # Strict ">" keeps the first label on a tie.
        if matched > top_hits:
            top_label = label
            top_hits = matched

    if top_hits >= 2:
        confidence = 0.9
    elif top_hits == 1:
        confidence = 0.6
    else:
        confidence = 0.0
    return top_label, confidence

## 📑 PDF Extraction + OCR + Logical Segmentation

1. Opens PDF via PyMuPDF, extracts text; if empty, runs Tesseract OCR on page image.
2. Groups pages into logical documents using the rule-based boundary detector.
3. Returns (`pages_info`, `logical_docs`) for downstream chunking.

In [13]:
# --- PDF extraction + OCR + logical segmentation (no LLM) ---
def _open_pdf(pdf_file):
    """Open *pdf_file* with PyMuPDF from a ``{"content": bytes}`` dict, a file-like object, or a path."""
    if isinstance(pdf_file, dict) and "content" in pdf_file:
        return fitz.open(stream=pdf_file["content"], filetype="pdf")
    if hasattr(pdf_file, "read"):
        return fitz.open(stream=pdf_file.read(), filetype="pdf")
    return fitz.open(pdf_file)


def _page_text_with_ocr(page, page_index: int) -> str:
    """Return the page's embedded text, falling back to Tesseract OCR when it is blank."""
    text = page.get_text() or ""
    if text.strip():
        return text
    try:
        pix = page.get_pixmap()
        img = Image.open(io.BytesIO(pix.tobytes("png")))
        return pytesseract.image_to_string(img) or ""
    except Exception as exc:
        # OCR stays best-effort (the page is kept with empty text), but the
        # failure is logged so that e.g. a missing tesseract binary is visible.
        import logging
        logging.getLogger(__name__).warning("OCR failed on page %d: %s", page_index, exc)
        return ""


def _make_logical_doc(index: int, doc_type, pages) -> LogicalDocument:
    """Build the LogicalDocument covering the non-empty page group *pages*."""
    return LogicalDocument(
        doc_id=f"doc_{index}",
        doc_type=doc_type,
        page_start=pages[0].page_num,
        page_end=pages[-1].page_num,
        text="\n\n".join(p.text for p in pages)
    )


def extract_and_analyze_pdf(pdf_file):
    """Extract page text (with OCR fallback) and group pages into logical documents.

    Args:
        pdf_file: A ``{"content": bytes}`` dict, an object with ``read()``,
            or anything ``fitz.open`` accepts as a path.

    Returns:
        ``(pages_info, logical_docs)``: one ``PageInfo`` per PDF page (with
        ``doc_type`` and ``page_in_doc`` filled in) and the ``LogicalDocument``
        list in page order.

    Raises:
        ValueError: If the PDF contains no pages.
    """
    doc = _open_pdf(pdf_file)
    try:
        pages_info = [
            PageInfo(page_num=i, text=_page_text_with_ocr(page, i))
            for i, page in enumerate(doc)
        ]
    finally:
        # Release the PyMuPDF handle even when extraction raises.
        doc.close()

    if not pages_info:
        raise ValueError("No text extracted from PDF")

    logical_docs, current_doc_pages = [], []
    current_doc_type = None

    for page_info in pages_info:
        # The first page always opens a document; afterwards the boundary
        # detector decides (True means "same document continues").
        continues = bool(current_doc_pages) and detect_document_boundary(
            current_doc_pages[-1].text, page_info.text, current_doc_type
        )
        if not continues:
            if current_doc_pages:
                logical_docs.append(
                    _make_logical_doc(len(logical_docs), current_doc_type, current_doc_pages)
                )
            current_doc_type = classify_document_type(page_info.text)
            current_doc_pages = []
        page_info.doc_type = current_doc_type
        page_info.page_in_doc = len(current_doc_pages)
        current_doc_pages.append(page_info)

    # Flush the last group (never empty because pages_info is non-empty).
    logical_docs.append(
        _make_logical_doc(len(logical_docs), current_doc_type, current_doc_pages)
    )

    return pages_info, logical_docs

## ✂️ Chunking with Page Ranges

1. `chunk_document_with_metadata()` splits each logical document into overlapping text windows of 500 whitespace-separated words, with a 100-word overlap between consecutive windows.
      - Every chunk carries doc_type + estimated page_start/end for later source citing.
2. `process_all_documents()` applies chunking across the packet.

In [14]:
# --- Chunking with metadata (no LLM) ---
def chunk_document_with_metadata(logical_doc: LogicalDocument, chunk_size: int = 500, overlap: int = 100):
    """Split a logical document into overlapping word windows.

    Args:
        logical_doc: Document to split; its text is tokenised on whitespace.
        chunk_size: Maximum number of words per chunk.
        overlap: Number of words shared by consecutive chunks.

    Returns:
        A list of ``ChunkMetadata``. A document that fits into one window
        yields a single chunk with the original text and the document's full
        page range. Otherwise each chunk's page range is an estimate:
        the start page is interpolated from the chunk's word offset and the
        end page is the following page, clamped to the document's last page.
    """
    tokens = logical_doc.text.split()
    total = len(tokens)

    if total <= chunk_size:
        whole = ChunkMetadata(
            chunk_id=f"{logical_doc.doc_id}_chunk_0",
            doc_id=logical_doc.doc_id,
            doc_type=logical_doc.doc_type,
            chunk_index=0,
            page_start=logical_doc.page_start,
            page_end=logical_doc.page_end,
            text=logical_doc.text
        )
        return [whole]

    step = max(1, chunk_size - overlap)
    n_pages = max(1, logical_doc.page_end - logical_doc.page_start + 1)
    pieces = []
    start = 0
    while start < total:
        stop = min(start + chunk_size, total)
        position = len(pieces)
        # Relative word offset of the window start, mapped onto the page span.
        fraction = start / max(1, total)
        first_page = logical_doc.page_start + int(fraction * n_pages)
        last_page = min(logical_doc.page_end, first_page + 1)
        pieces.append(ChunkMetadata(
            chunk_id=f"{logical_doc.doc_id}_chunk_{position}",
            doc_id=logical_doc.doc_id,
            doc_type=logical_doc.doc_type,
            chunk_index=position,
            page_start=first_page,
            page_end=last_page,
            text=" ".join(tokens[start:stop])
        ))
        # The window reached the end of the text: no trailing partial chunk.
        if stop >= total:
            break
        start += step
    return pieces

def process_all_documents(logical_docs: List[LogicalDocument]) -> List[ChunkMetadata]:
    """Chunk every logical document and return all chunks in document order.

    Side effects: each document's ``chunks`` attribute is set to its own
    chunk list, and every chunk's ``doc_type`` is (re)assigned from its
    document.
    """
    collected = []
    for document in logical_docs:
        document_chunks = chunk_document_with_metadata(document, chunk_size=500, overlap=100)
        # Make sure the document's label is carried by each of its chunks.
        for chunk in document_chunks:
            chunk.doc_type = document.doc_type
        document.chunks = document_chunks
        collected += document_chunks
    return collected

## 🔎 Retriever (FAISS) + Per-Type Indices

1. Builds a global FAISS index over all chunk embeddings.
2. Also builds per-doc-type sub-indices for filtered retrieval (e.g., only “Lender Fee Sheet”).
3. `retrieve()` returns top-k chunks with normalized relevance scores.

In [15]:
# --- Retriever & FAISS indices (no LLM router) ---
class IntelligentRetriever:
    """Embeds chunks and serves nearest-neighbour lookups over FAISS L2 indices.

    One global index covers every chunk; one sub-index per document type
    allows filtered retrieval. Embeddings come from the module-level
    ``embed_model``.
    """

    def __init__(self):
        self.index = None
        self.chunks: List[ChunkMetadata] = []
        # doc_type -> {"index": faiss index, "map": positions into self.chunks}
        self.doc_type_indices: Dict[str, Dict] = {}

    def build_indices(self, chunks: List[ChunkMetadata]):
        """(Re)build the global and per-type indices for *chunks*.

        Stores each embedding on its chunk. With an empty chunk list the
        retriever is reset to the "no index" state instead of failing.
        """
        self.chunks = chunks
        self.index = None
        self.doc_type_indices = {}
        if not chunks:
            return

        texts = [c.text for c in chunks]
        embs = embed_model.encode(texts, show_progress_bar=True, convert_to_numpy=True)
        # FAISS expects a C-contiguous float32 matrix.
        embs = np.ascontiguousarray(embs, dtype=np.float32)
        for chunk, emb in zip(chunks, embs):
            chunk.embedding = emb

        dim = embs.shape[1]
        self.index = faiss.IndexFlatL2(dim)
        self.index.add(embs)

        # Group chunk positions by type in one pass.
        positions_by_type: Dict[str, List[int]] = {}
        for pos, chunk in enumerate(chunks):
            positions_by_type.setdefault(chunk.doc_type, []).append(pos)
        for dt in sorted(positions_by_type):
            idxs = positions_by_type[dt]
            sub = faiss.IndexFlatL2(dim)
            sub.add(embs[idxs])
            self.doc_type_indices[dt] = {"index": sub, "map": idxs}

    def retrieve(self, query: str, k: int = 4, filter_doc_type: Optional[str] = None, auto_route: bool = False):
        """Return up to *k* ``(chunk, score)`` pairs, nearest first.

        Args:
            query: Free-text query to embed.
            k: Maximum number of results; coerced to ``int``.
            filter_doc_type: Restrict the search to this document type when a
                sub-index for it exists; otherwise the global index is used.
            auto_route: Accepted for interface compatibility; unused here.

        Returns:
            A list of ``(ChunkMetadata, score)``. Scores are relative to the
            returned set: ``(max_dist - dist) / max_dist``, so the farthest
            returned hit scores 0.0 (this includes a lone hit with non-zero
            distance). When every distance is 0 all hits score 1.0. An empty
            list is returned when nothing is indexed or ``k <= 0``.
        """
        if self.index is None or not self.chunks:
            return []
        k = int(k)
        if k <= 0:
            return []

        q = np.ascontiguousarray(
            embed_model.encode([query], convert_to_numpy=True), dtype=np.float32
        )

        typed = self.doc_type_indices.get(filter_doc_type) if filter_doc_type else None
        if typed is not None:
            index, id_map = typed["index"], typed["map"]
        else:
            index, id_map = self.index, None

        # Never ask for more neighbours than the index holds: FAISS pads the
        # result with label -1, which as a Python index would silently select
        # the *last* chunk.
        k = min(k, index.ntotal)
        D, I = index.search(q, k)

        hits = []
        for dist, pos in zip(D[0], I[0]):
            if pos < 0:
                continue
            chunk_pos = id_map[pos] if id_map is not None else int(pos)
            hits.append((chunk_pos, float(dist)))
        if not hits:
            return []

        maxd = max(d for _, d in hits)
        if maxd <= 0:
            # All hits are exact matches; avoid dividing by zero.
            return [(self.chunks[i], 1.0) for i, _ in hits]
        return [(self.chunks[i], (maxd - d) / maxd) for i, d in hits]

## 🧠 Grounded Answer Assembly (LLM)

1. `answer_with_sources`: End-to-end coordinator for building a safe, compact context and generating an answer strictly from retrieved chunks.
2. `summarize_documents_llm`: LLM summary across logical documents (Pay Slip, Contract, Lender Fee Sheet, etc.), grouped for quick scanning.

In [16]:
# --- Replacement: grounded answer with token budget + de-dupe ---

# If you already set these elsewhere, keep them; otherwise these defaults are safe.
# NOTE: these run unconditionally, so LLM_CTX is overwritten here even when it
# was set earlier; keep it equal to the n_ctx the model was loaded with.
LLM_CTX = 4096                 # Mistral-7B context window
# Output budget for answers: the earlier LLM_MAX_TOKENS capped at 512, or 512
# when that name is not defined.
LLM_MAX_OUT = min(LLM_MAX_TOKENS, 512) if "LLM_MAX_TOKENS" in globals() else 512
# Keeps an existing LLM_TEMP; 0.2 is only used when none was defined.
LLM_TEMP = globals().get("LLM_TEMP", 0.2)

def _approx_token_count(s: str) -> int:
    # ~4 chars per token for English; good enough to avoid overflows.
    return max(1, len(s) // 4)

def _shrink_to_budget(blocks: list[str], max_ctx_tokens: int) -> str:
    """Concatenate blocks until ~max_ctx_tokens; trim last block if needed."""
    out, used = [], 0
    for b in blocks:
        t = _approx_token_count(b)
        if used + t > max_ctx_tokens:
            need = max_ctx_tokens - used
            out.append(b[:need * 4])  # approx chars
            break
        out.append(b)
        used += t
    return "\n".join(out)

def answer_with_sources(question: str, retrieved: List[Tuple[ChunkMetadata, float]]) -> Dict:
    """Answer *question* with the LLM using only the retrieved chunks.

    Builds a de-duplicated, labelled context that fits the model window,
    prompts the LLM, and reports the chunks as sources.

    Args:
        question: The user's question.
        retrieved: ``(chunk, score)`` pairs, best first.

    Returns:
        ``{"answer": str, "sources": list[dict], "confidence": float}``.
        ``confidence`` is the mean retrieval score of the unique chunks, or
        0.0 when nothing was retrieved or generation failed. Page numbers in
        the context headers and in ``sources`` are 1-based.
    """
    if not retrieved:
        return {"answer": "I couldn't find enough context in the document(s).",
                "sources": [], "confidence": 0.0}

    # 1) De-dupe by chunk_id (preserve order); objects without a chunk_id are
    #    keyed by identity, using the same key for the lookup and the insert.
    unique, seen = [], set()
    for cm, sc in retrieved:
        key = getattr(cm, "chunk_id", None)
        if key is None:
            key = id(cm)
        if key in seen:
            continue
        seen.add(key)
        unique.append((cm, sc))

    # 2) Build labeled blocks + sources. Chunk pages are stored 0-based; show
    #    them 1-based like structure() and the summary do.
    blocks, sources = [], []
    for cm, sc in unique:
        pages = f"{cm.page_start + 1}-{cm.page_end + 1}"
        header = f"[From {cm.doc_type}, Pages {pages}, score {sc:.2f}]"
        blocks.append(header + "\n" + cm.text)
        sources.append({
            "doc_type": cm.doc_type,
            "pages": pages,
            "relevance": f"{sc:.2%}",
            "preview": (cm.text[:160] + "...") if len(cm.text) > 160 else cm.text
        })

    # 3) Respect the model's context window: leave headroom for the prompt + reply
    #    (prompt scaffolding ~200–300 tokens; adjust margin if you change formatting)
    PROMPT_MARGIN = 256
    ctx_budget = max(256, LLM_CTX - LLM_MAX_OUT - PROMPT_MARGIN)
    context = _shrink_to_budget(blocks, max_ctx_tokens=ctx_budget)

    prompt = f"""Use ONLY the context to answer. If the context is insufficient, say you don't know.
Cite doc types and pages briefly.

Context:
{context}

Question: {question}
Answer:"""

    try:
        txt = llm_generate(prompt, max_tokens=LLM_MAX_OUT, temperature=LLM_TEMP)
        avg_conf = float(sum(sc for _, sc in unique) / max(1, len(unique)))
        return {"answer": (txt or "").strip(), "sources": sources, "confidence": avg_conf}
    except Exception as e:
        # UI boundary: report the failure in the answer instead of raising.
        return {"answer": f"Error generating answer: {e}", "sources": sources, "confidence": 0.0}

In [17]:
# ---------- Shared helper functions ----------
def _approx_token_count(s: str) -> int:
    return max(1, len(s) // 4)

def _shrink_to_budget(blocks: list[str], max_ctx_tokens: int) -> str:
    out, used = [], 0
    for b in blocks:
        t = _approx_token_count(b)
        if used + t > max_ctx_tokens:
            need = max_ctx_tokens - used
            out.append(b[:need * 4])
            break
        out.append(b)
        used += t
    return "\n\n".join(out)

# ---------- Summary (LLM) ----------
def summarize_documents_llm(docs: List[LogicalDocument], max_ctx_tokens: Optional[int] = None) -> str:
    """Ask the LLM for a grouped summary of the loaded logical documents.

    Args:
        docs: Logical documents to summarise.
        max_ctx_tokens: Optional cap on the (approximate) context size. It is
            honoured only up to the budget that still leaves room for the
            instructions and the reply inside ``LLM_CTX``.

    Returns:
        The model's summary text, or ``"No documents loaded."`` for an empty
        *docs* list.
    """
    if not docs:
        return "No documents loaded."
    # Build lightweight context: first ~1200 chars of each doc with labels & pages
    blocks = []
    for d in docs:
        hdr = f"[{d.doc_type} | Pages {d.page_start+1}-{d.page_end+1}]"
        txt = (d.text[:1200] + "...") if len(d.text) > 1200 else d.text
        blocks.append(hdr + "\n" + txt)

    # Respect the model window: context + instructions + reply must fit in
    # LLM_CTX. Reserving a flat 512 tokens was less than the reply budget
    # requested below, so a full context could overflow the window.
    max_out = min(LLM_MAX_TOKENS, 600)
    PROMPT_MARGIN = 256
    safe_budget = max(256, LLM_CTX - max_out - PROMPT_MARGIN)
    ctx_budget = min(max_ctx_tokens, safe_budget) if max_ctx_tokens else safe_budget
    context = _shrink_to_budget(blocks, ctx_budget)

    prompt = f"""Summarize the following multi-document packet for a non-expert.
Group bullets by *document type* (Pay Slip, Contract, Lender Fee Sheet, etc.).
Keep it concise and strictly grounded in the context.

Context:
{context}

Write:
- A 3‚Äì6 bullet overview overall
- Then 2‚Äì4 bullets per document type found
- If any amounts appear, reference them briefly with the page range
"""
    return llm_generate(prompt, max_tokens=max_out, temperature=LLM_TEMP)

## 🗂️ LocalDocStore Orchestrator

End-to-end coordinator for:
- `process_pdf()`: extract → segment → chunk → embed → index
- `query()`: retrieve → generate grounded answer
- `summarize()`: LLM summary across logical docs
- `structure()`: compact view for the UI (doc type, page range, chunks)

In [18]:
class LocalDocStore:
    """Holds one processed PDF and exposes ingestion, Q&A, summary and structure views."""

    def __init__(self):
        self.pages: List[PageInfo] = []
        self.docs: List[LogicalDocument] = []
        self.chunks: List[ChunkMetadata] = []
        self.retriever = IntelligentRetriever()
        self.ready = False
        self.stats: Dict = {}
        self.filename: Optional[str] = None

    def process_pdf(self, pdf_file, filename="document.pdf"):
        """Ingest a PDF: extract, segment, chunk, embed and index it.

        ``ready`` is False while processing and only set to True at the end,
        so a failure part-way leaves the store marked as not ready.

        Returns:
            ``(True, stats)`` where ``stats`` holds the filename, page/doc/chunk
            counts, the sorted distinct document types and the elapsed time.
        """
        self.ready = False
        self.filename = filename

        started = datetime.now()
        self.pages, self.docs = extract_and_analyze_pdf(pdf_file)
        self.chunks = process_all_documents(self.docs)
        self.retriever.build_indices(self.chunks)
        elapsed = (datetime.now() - started).total_seconds()

        distinct_types = {d.doc_type for d in self.docs}
        self.stats = {
            "filename": filename,
            "pages": len(self.pages),
            "docs": len(self.docs),
            "chunks": len(self.chunks),
            "types": sorted(distinct_types),
            "time": f"{elapsed:.1f}s"
        }
        self.ready = True
        return True, self.stats

    def query(self, q: str, k: int = 4, filter_type: Optional[str] = None, auto_route: bool = True):
        """Retrieve the top-*k* chunks for *q* and return the grounded answer dict."""
        if self.ready:
            hits = self.retriever.retrieve(q, k=k, filter_doc_type=filter_type, auto_route=auto_route)
            return answer_with_sources(q, hits)
        return {"answer": "Please upload and process a PDF first.", "sources": [], "confidence": 0.0}

    def structure(self) -> List[Dict]:
        """Return one summary dict per logical document (pages shown 1-based)."""
        rows = []
        for d in self.docs:
            first_page = d.page_start + 1
            last_page = d.page_end + 1
            rows.append({
                "id": d.doc_id,
                "type": d.doc_type,
                "pages": f"{first_page}-{last_page}",
                "chunks": len(d.chunks) if d.chunks else 0
            })
        return rows

    def summarize(self) -> Dict:
        """Return an LLM summary of all documents plus one source entry per document."""
        if not self.ready:
            return {"answer": "Please upload and process a PDF first.", "sources": [], "confidence": None}

        summary_text = summarize_documents_llm(self.docs, max_ctx_tokens=LLM_CTX - 512)

        # Lightweight sources: one entry per logical document.
        sources = []
        for d in self.docs:
            preview = d.text
            if len(preview) > 160:
                preview = preview[:160] + "..."
            sources.append({
                "doc_type": d.doc_type,
                "pages": f"{d.page_start+1}-{d.page_end+1}",
                "relevance": "",
                "preview": preview
            })
        return {"answer": summary_text.strip(), "sources": sources, "confidence": None}

## 🧩 UI Helpers (Sources + Suggestions)
1. `_render_sources()` → pretty “Sources” block with doc type + pages.
2. `_suggest_for_types()` → smart Suggested Questions list based on detected doc types.
3. `summary_handler()` → injects a user “What’s the summary?” bubble and appends the LLM summary.

In [19]:
# ---------- helpers (no style forcing) ----------
def _render_sources(sources):
    if not sources:
        return ""
    lines = ["\n\nüìç **Sources:**"]
    for s in sources:
        pages = s.get("pages", "")
        lines.append(f"‚Ä¢ {s.get('doc_type','')} (Pages {pages})")
    return "\n".join(lines)

def summary_handler(history, doc_filter):
    """Append a "summary" exchange to the chat history and return the new history.

    Adds a user bubble asking for the summary together with the store's
    summary (plus its sources block). When no PDF has been processed, a
    single bot-only reminder is appended instead. ``doc_filter`` is accepted
    but not used.
    """
    if not store.ready:
        reminder = [None, "üìö Please upload and process a PDF first."]
        return history + [reminder]

    resp = store.summarize()
    answer = (resp.get("answer","") or "").strip()
    reply = answer + _render_sources(resp.get("sources", []))
    return history + [["üìù What's the summary?", reply]]

def _suggest_for_types(types: list[str]) -> list[str]:
    if not types: return []
    s = []
    if "Pay Slip" in types:
        s += ["What is the net pay?", "How many working days are shown?", "List earnings and deductions."]
    if "Contract" in types:
        s += ["Summarize the key obligations.", "What are the termination conditions?", "Who are the parties and dates?"]
    if "Lender Fee Sheet" in types:
        s += ["What are the total lender fees?", "List origination and underwriting fees.", "What is the APR and term?"]
    if "Invoice" in types:
        s += ["What is the total amount due?", "List line items and amounts."]

    # Removed generic "brief summary" since the button already covers it
    s += ["What pages are most relevant to payment amounts?"]

    seen, out = set(), []
    for q in s:
        if q not in seen:
            seen.add(q); out.append(q)
    return out[:10]

## 🔧 Handlers (Process / Chat / Reset)
1. `process_pdf_handler()` → runs ingestion, fills “Document Info” and “Suggested Questions.”
2. `chat_handler()` → calls `store.query()` (note the correct kwarg `filter_type`), prints sources + confidence, and shows friendly errors in-chat if something fails.
3. `clear_all()` → resets app state and UI controls.

In [20]:
# ---------- store ----------
# Assumes LocalDocStore() class is defined above (with .process_pdf, .query, .summarize, .structure)
# Single module-level store shared by the handlers defined below it in this cell.
store = LocalDocStore()

def process_pdf_handler(pdf_file):
    """Ingest the uploaded PDF and build the values for the info widgets.

    Args:
        pdf_file: Value of the PDF input; ``None`` when nothing is uploaded.
            Otherwise either a filesystem path (``str`` / ``os.PathLike``) or
            an object exposing ``.name``.
            NOTE(review): the PDF component is believed to pass a path string
            - confirm against the installed gradio_pdf version.

    Returns:
        A 4-tuple ``(status_markdown, structure_markdown, doc_filter_update,
        suggestions_update)`` matching the outputs wired to this handler.
    """
    import os  # local import: keeps the cell self-contained

    def _reset(status):
        # Every non-success path leaves the widgets in the same empty state.
        return (
            status,
            "",
            gr.update(choices=["All"], value="All"),
            gr.update(choices=[], value=None),
        )

    if pdf_file is None:
        return _reset("‚ö†Ô∏è Please upload a PDF.")

    # A plain path has no ``.name`` attribute, so getattr() alone would always
    # fall back to "document.pdf" and the real file name would be lost.
    if isinstance(pdf_file, (str, os.PathLike)):
        filename = os.path.basename(os.fspath(pdf_file)) or "document.pdf"
    else:
        filename = getattr(pdf_file, "name", "document.pdf")

    ok, st = store.process_pdf(pdf_file, filename=filename)
    if not ok:
        return _reset("‚ùå Error")

    status = (
        f"‚úÖ **Processed**\n"
        f"- üìÑ File: {st['filename']}\n"
        f"- üìë Pages: {st['pages']}\n"
        f"- üìö Docs: {st['docs']}\n"
        f"- üß© Chunks: {st['chunks']}\n"
        f"- üè∑ Types: {', '.join(st['types'])}\n"
        f"- ‚è± Time: {st['time']}\n"
    )
    struct = "\n".join(
        f"‚Ä¢ **{d['type']}** (Pages {d['pages']}) ‚Äî {d['chunks']} chunks"
        for d in store.structure()
    )
    types = ["All"] + st["types"]
    suggs = _suggest_for_types(st["types"])
    return (
        status,
        struct,
        gr.update(choices=types, value="All"),
        gr.update(choices=suggs, value=(suggs[0] if suggs else None)),
    )

def chat_handler(message, history, doc_filter, auto_route, num_chunks):
    """Answer one chat message against the processed document.

    The message is passed to ``store.query`` unchanged (no formatting hints).
    Failures while answering are reported inside the chat.

    Args:
        message: Text typed by the user. Blank or ``None`` is a no-op.
        history: Chat history as a list of ``[user, bot]`` pairs; ``None`` is
            treated as an empty history.
        doc_filter: Selected doc type, or ``"All"`` for no filtering.
        auto_route: Whether to let the store route the query; only honoured
            when no explicit doc type is selected.
        num_chunks: Number of chunks to retrieve, forwarded as ``k``.

    Returns:
        The history with one ``[message, reply]`` pair appended, or the
        history unchanged when the message is blank.
    """
    history = history or []

    # Nothing to ask: don't spend a model call on an empty prompt.
    if not (message or "").strip():
        return history

    if not store.ready:
        return history + [[message, "üìö Please upload and process a PDF first."]]

    filt = None if doc_filter == "All" else doc_filter

    try:
        # LocalDocStore.query expects the kwarg name ``filter_type``.
        res = store.query(
            message,
            k=num_chunks,
            filter_type=filt,
            auto_route=bool(auto_route and filt is None),
        )

        # ``or ""`` guards against an explicit None answer, as summary_handler does.
        reply = (res.get("answer", "") or "").strip()

        sources = res.get("sources")
        if sources:
            reply += "\n\nüìç **Sources:**\n" + "\n".join(
                f"‚Ä¢ {s['doc_type']} (Pages {s['pages']}) ‚Äî Relevance {s['relevance']}"
                for s in sources
            )

        conf = res.get("confidence")
        if conf is not None:
            reply += f"\n\n*Confidence: {conf:.1%} | Backend: Mistral-7B (open-source, local)*"
        else:
            reply += "\n\n*Backend: Mistral-7B (open-source, local)*"
        return history + [[message, reply]]

    except Exception as e:
        # Show a helpful error in the chat instead of a red 'Error' badge.
        return history + [[message, f"‚ö†Ô∏è Error answering: `{type(e).__name__}: {e}`"]]

def clear_all():
    """Reset the document store and return initial values for the UI.

    Rebinds the module-level ``store`` to a fresh ``LocalDocStore`` and
    returns one value per output wired to the Reset button, in this order:
    PDF input, status markdown, structure markdown, doc-type filter,
    chat history, message box, auto-route checkbox, suggestions dropdown.
    """
    global store
    store = LocalDocStore()

    return (
        None,                                      # PDF input: nothing loaded
        "‚è≥ Waiting for PDF upload...",           # status markdown
        "",                                        # structure markdown
        gr.update(choices=["All"], value="All"),   # doc-type filter
        [],                                        # chat history
        "",                                        # message box
        # The auto-route checkbox is created with value=False, so a reset
        # restores that initial state rather than switching it on.
        False,
        gr.update(choices=[], value=None),         # suggested questions
    )

## 🎨 Custom CSS (Handlee Everywhere)
1. Imports the Google Font “Handlee” and applies it app-wide (chat, headers, inputs, buttons).
2. Defines a soft card look, a readable chat panel, warm orange primary buttons, and a dark header band with a white title and subtitle.

In [21]:
# ---------- polished, personalized UI (Display Name + Suggestions) ----------
# APP_TITLE is the Blocks window title and the header text shown when the
# Display Name box is empty; APP_SUBTITLE is the line under the header title.
APP_TITLE = "Sriram‚Äôs Document Intelligence Chatbot"
APP_SUBTITLE = "Private, Fast, and Explainable PDF Q&A"

# Stylesheet passed to gr.Blocks(css=...). The selectors target the
# "card", "header-wrap" and "footer" classes and the "chatbox" element id
# that the layout cell assigns to its components.
custom_css = """
/* Load the handwriting font and apply it APP-WIDE */
@import url('https://fonts.googleapis.com/css2?family=Handlee&display=swap');

/* ---------- Make Handlee the default everywhere ---------- */
:root, body, .gradio-container,
.prose, .prose *, .gr-markdown, .gr-markdown *,
#chatbox, #chatbox *, .wrap, .message,
label, .label, legend, summary,
input, textarea, select, option,
.gr-textbox textarea, .gr-text-input input,
.gr-dropdown, .gr-dropdown *, .gr-select, .gr-select *,
.gr-checkbox, .gr-checkbox *, .gr-radio, .gr-radio *,
.gr-button, .gr-button *, button, .btn, .btn * {
  font-family: "Handlee", cursive !important;
  color:#0f172a !important; /* readable dark text */
}

/* ---------- Pleasant card look + readable chat ---------- */
.card{
  background:#f8fafc;
  border-radius:12px;
  padding:12px 16px;
  box-shadow:0 2px 6px rgba(0,0,0,.05);
}
#chatbox{
  border-radius:12px;
  border:1px solid #e2e8f0;
  background:#ffffff;
  color:#0f172a;
  line-height:1.6;
  letter-spacing:.1px;
  box-shadow:0 4px 12px rgba(2,6,23,.04);
}

/* ---------- Inputs / text areas / dropdowns ---------- */
input[type="text"], input[type="search"], textarea,
.gr-textbox textarea, .gr-text-input input{
  background:#ffffff !important;
  color:#0f172a !important;
  border:1px solid #e2e8f0 !important;
  border-radius:10px !important;
}
input::placeholder, textarea::placeholder{ color:#64748b !important; }

.gr-dropdown [role="combobox"], .gr-dropdown input,
.gr-dropdown .container, .gr-select .container{
  background:#ffffff !important;
  color:#0f172a !important;
  border:1px solid #e2e8f0 !important;
  border-radius:10px !important;
}
.gr-dropdown [role="listbox"], .gr-dropdown .menu{
  background:#ffffff !important;
  color:#0f172a !important;
  border:1px solid #e2e8f0 !important;
}

/* ---------- Buttons (warm orange primary) ---------- */
.gr-button.primary, .gr-button--primary, button.primary{
  background:#fb923c !important;         /* orange-400 */
  border:1px solid #f97316 !important;    /* orange-500 */
  color:#1f2937 !important;               /* slate-700 */
  border-radius:12px !important;
}
.gr-button.primary:hover, .gr-button--primary:hover, button.primary:hover{
  background:#f97316 !important;
}

/* ---------- Header band (white text) ---------- */
.header-wrap{
  text-align:center;
  padding:12px;
  background:#1e293b;
  border-radius:12px;
  margin-bottom:8px;
}
.header-wrap h1, .header-wrap p, .header-wrap, .header-wrap *{
  color:#ffffff !important;
}

/* ---------- Footer ---------- */
.footer{
  text-align:center;
  color:#64748b;
  font-size:13px;
  padding:8px 0 0 0;
}
"""

## 🧱 Gradio Layout (Blocks)
Three-column layout:
- Left: Display Name + PDF viewer + Process/Reset
- Middle: Document Info, Retrieval Settings, Suggested Questions
- Right: Chatbot, Textbox + Send, Generate Summary button above Clear Chat

## ⚡ Wiring the Events
1. Live header rename when Display Name changes.
2. Button / event linkage:
- Process button / PDF change → `process_pdf_handler`
- Send / Enter → `chat_handler`
- Generate Summary → `summary_handler`
- Ask Selected → routes through `chat_handler`
- Reset → `clear_all`; Clear Chat → empties the chat history

## 🚀 Launch
Starts the Gradio app with the custom CSS and layout; shows a public share link in Colab.

In [22]:
# Builds the three-column UI and wires every event to its handler.
with gr.Blocks(title=APP_TITLE, css=custom_css) as demo:

    def _header_markup(title: str) -> str:
        """Return the header band HTML for an already-escaped title."""
        # Single source for the header so the initial render and the live
        # rename produce the same markup (emoji on both sides of the title).
        return f"""
    <div class="header-wrap">
      <h1>üß† {title} üß†</h1>
      <p>{APP_SUBTITLE}</p>
    </div>
    """

    # --- Header ---
    header_html = gr.HTML(_header_markup(APP_TITLE))

    with gr.Row():
        # ========== Left: PDF + controls ==========
        with gr.Column(scale=2):
            with gr.Row():
                user_name = gr.Textbox(value="Sriram", label="ü™™ Display Name", scale=2)
            pdf_in = PDF(label="üìÑ PDF Viewer", interactive=True, height=560, elem_classes=["card"])
            with gr.Row():
                process_btn = gr.Button("‚öôÔ∏è Process PDF", variant="primary")
                clear_btn   = gr.Button("üßπ Reset", variant="secondary")

        # ========== Middle: info + retrieval settings ==========
        with gr.Column(scale=1):
            gr.Markdown("### üìä Document Info", elem_classes=["card"])
            status_md    = gr.Markdown("‚è≥ Waiting for PDF upload...", elem_classes=["card"])
            structure_md = gr.Markdown("", elem_classes=["card"])

            gr.Markdown("### üîç Retrieval Settings", elem_classes=["card"])
            doc_filter   = gr.Dropdown(choices=["All"], value="All", label="üè∑ Doc Type Filter", elem_classes=["card"])
            auto_route   = gr.Checkbox(value=False, label="üéØ Auto-Route Queries to Likely Doc Type", elem_classes=["card"])
            num_chunks   = gr.Slider(1, 10, value=4, step=1, label="üìä Chunks to Retrieve", elem_classes=["card"])

            sugg_dd      = gr.Dropdown(choices=[], value=None, label="üí° Suggested Questions", elem_classes=["card"])
            ask_sugg     = gr.Button("‚û°Ô∏è Ask Selected", size="sm")

        # ========== Right: chat ==========
        with gr.Column(scale=2):
            gr.Markdown("### üí¨ Ask Questions", elem_classes=["card"])
            bot = gr.Chatbot(height=560, show_label=False, elem_id="chatbox")
            with gr.Row():
                msg      = gr.Textbox(placeholder="e.g., What are the payment terms?")
                send_btn = gr.Button("üì§ Send", variant="primary")

            # Summary button ABOVE Clear Chat
            summary_btn = gr.Button("üìù Generate Summary", size="sm")

            # Clear chat row (stays below)
            with gr.Row():
                clear_chat = gr.Button("üßº Clear Chat", size="sm")

    # --- Footer ---
    gr.HTML("""
    <div class="footer">
      Built with ‚ù§Ô∏è using <b>Mistral-7B</b> + <b>Gradio</b> ‚Äî runs locally for privacy
    </div>
    """)

    # ---------- Handlers (defined inside Blocks so we can bind right away) ----------
    def _update_header(name: str):
        """Re-render the header band for the current Display Name."""
        from html import escape  # local import: keeps the cell self-contained

        shown = (name or "").strip()
        # The name is free text typed by the user and ends up inside raw
        # HTML, so it must be escaped before interpolation.
        title = f"{escape(shown)}'s Document Intelligence Chatbot" if shown else APP_TITLE
        return gr.update(value=_header_markup(title))

    def _ask_selected(q, history, _doc_filter, _auto_route, _num_chunks):
        """Send the selected suggestion through the normal chat path."""
        if not q:
            return (history or []) + [[None, "‚ÑπÔ∏è Pick a suggestion from the left dropdown."]]
        # Reuse the main chat handler
        return chat_handler(q, history, _doc_filter, _auto_route, _num_chunks)

    # ---------- Event wiring (ALL inside the Blocks context) ----------
    user_name.change(_update_header, inputs=user_name, outputs=header_html)

    # NOTE(review): ingestion is bound to both the button click and the PDF
    # component's change event, so pressing Process after an upload may run
    # it a second time on the same file - confirm this is intended.
    info_outputs = [status_md, structure_md, doc_filter, sugg_dd]
    process_btn.click(process_pdf_handler, inputs=pdf_in, outputs=info_outputs)
    pdf_in.change(process_pdf_handler, inputs=pdf_in, outputs=info_outputs)

    clear_btn.click(clear_all,
                    outputs=[pdf_in, status_md, structure_md, doc_filter, bot, msg, auto_route, sugg_dd])
    clear_chat.click(lambda: [], outputs=bot)

    # Enter and the Send button share the same inputs; both clear the
    # textbox once the handler has returned.
    chat_inputs = [msg, bot, doc_filter, auto_route, num_chunks]
    msg.submit(chat_handler, inputs=chat_inputs, outputs=bot) \
       .then(lambda: "", None, msg)
    send_btn.click(chat_handler, inputs=chat_inputs, outputs=bot) \
            .then(lambda: "", None, msg)

    summary_btn.click(summary_handler, inputs=[bot, doc_filter], outputs=bot)

    ask_sugg.click(_ask_selected, inputs=[sugg_dd, bot, doc_filter, auto_route, num_chunks], outputs=bot)

# Launch outside the Blocks context
# share=True requests a public share URL in addition to the local server;
# debug=False means handler errors are not shown in the Colab cell output.
demo.launch(share=True, debug=False)

  with gr.Blocks(title=APP_TITLE, css=custom_css) as demo:


Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://f9e753a4336356148e.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


