## 📚 Setup and Installation
First, install the required packages:

In [None]:
# Quote the requirement: unquoted, the shell treats ">=0.16" as an output redirect
!pip install -q "jedi>=0.16"
# Install required packages
!pip install -q gradio
!pip install -q gradio_pdf
!pip install -q pypdf PyPDF2 pymupdf
!pip install -q sentence-transformers transformers
!pip install -q faiss-cpu
# !pip install -q google-generativeai
!pip install -q numpy pandas

# Install LlamaIndex packages for enhanced document processing
!pip install -q llama-index
!pip install -q llama-index-readers-file
!pip install -q llama-index-embeddings-huggingface
!pip install -q llama-index-vector-stores-faiss
# !pip install -q llama-index-llms-gemini
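
Because the installs run with `-q`, a failed package can go unnoticed until a later import breaks. The sketch below is one way to check up front, using only the standard library. The names in `REQUIRED` are assumed import names for the packages above (for example, `pymupdf` is imported as `fitz` later in this notebook); adjust the list for your environment.

```python
from importlib.util import find_spec

# Import names (not pip names) assumed for the packages installed above.
REQUIRED = ["gradio", "fitz", "sentence_transformers", "faiss", "llama_index"]

def missing_modules(names):
    """Return the top-level module names that cannot be found on the Python path."""
    return [name for name in names if find_spec(name) is None]

missing = missing_modules(REQUIRED)
print("Missing modules:", missing or "none")
```

An empty result only means the modules can be located; it does not confirm that GPU support or native libraries load correctly.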

In [None]:
!pip install -q llama-cpp-python==0.2.90 --extra-index-url https://abetlen.github.io/llama-cpp-python/whl/cu123

In [None]:
!mkdir -p models
!wget -O models/mistral-7b-instruct-v0.2.Q4_K_M.gguf \
  https://huggingface.co/TheBloke/Mistral-7B-Instruct-v0.2-GGUF/resolve/main/mistral-7b-instruct-v0.2.Q4_K_M.gguf

--2025-12-29 09:44:16--  https://huggingface.co/TheBloke/Mistral-7B-Instruct-v0.2-GGUF/resolve/main/mistral-7b-instruct-v0.2.Q4_K_M.gguf
Resolving huggingface.co (huggingface.co)... 13.35.202.40, 13.35.202.34, 13.35.202.97, ...
Connecting to huggingface.co (huggingface.co)|13.35.202.40|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://cas-bridge.xethub.hf.co/xet-bridge-us/65778ac662d3ac1817cc9201/865f5e4682dddb29c2e20270b2471a7590c83a414bbf1d72cf4c08fdff2eeca4?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Content-Sha256=UNSIGNED-PAYLOAD&X-Amz-Credential=cas%2F20251229%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20251229T094416Z&X-Amz-Expires=3600&X-Amz-Signature=cbe9630fdc25fa676b7ee4ebbb7f7a617ca0ef8544a956011fe6fcfc5a1e0645&X-Amz-SignedHeaders=host&X-Xet-Cas-Uid=public&response-content-disposition=inline%3B+filename*%3DUTF-8%27%27mistral-7b-instruct-v0.2.Q4_K_M.gguf%3B+filename%3D%22mistral-7b-instruct-v0.2.Q4_K_M.gguf%22%3B&x-id=GetObject&Expires=1767005

In [None]:
# OCR dependencies (for scanned PDFs)
!apt-get -qq update
!apt-get -qq install -y tesseract-ocr
!pip install -q pytesseract pillow

W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)


## 🔧 Core Imports and Configuration

In [None]:

import gradio as gr
from gradio_pdf import PDF
import fitz  # PyMuPDF
from PyPDF2 import PdfReader
import numpy as np
from sentence_transformers import SentenceTransformer
import faiss
from typing import List, Dict, Tuple, Optional
from dataclasses import dataclass
import json
from datetime import datetime
import hashlib
from llama_cpp import Llama, LlamaGrammar


# LlamaIndex imports
from llama_index.core import Document, VectorStoreIndex, StorageContext
from llama_index.core.schema import TextNode
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.vector_stores import MetadataFilters, MetadataFilter, FilterOperator

import re


mistral_llm = Llama(
    model_path="models/mistral-7b-instruct-v0.2.Q4_K_M.gguf",
    n_ctx=4096,
    n_threads=8,
    n_gpu_layers=20,
    chat_format="mistral-instruct",   # <-- key fix
    verbose=False,
)

def llm_generate(prompt: str, max_tokens=256, temperature=0.2) -> str:
    resp = mistral_llm.create_chat_completion(
        messages=[
            {"role": "system", "content": "You must follow instructions exactly. Output must be strictly formatted."},
            {"role": "user", "content": prompt},
        ],
        max_tokens=max_tokens,
        temperature=temperature,
        top_p=0.9,
        stop=["</s>", "\n\n\n"],
    )
    return resp["choices"][0]["message"]["content"].strip()



# Initialize embedding models (both for compatibility)
embed_model = SentenceTransformer("all-MiniLM-L6-v2")
llama_embed_model = HuggingFaceEmbedding(model_name="sentence-transformers/all-MiniLM-L6-v2")

In [None]:
print(llm_generate("Answer ONLY with the word YES."))

I understand your instruction. However, as a text-based AI, I don't have the ability to answer with just a single word like "YES." I can only type out text responses. But if we assume that "answering with the word YES" includes typing out the word YES as a text response, then the answer is: YES.
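
The reply above shows the model wrapping a one-word answer in prose, so downstream code cannot rely on exact string equality. One way to cope is to scan the reply for the earliest valid label. The helper below is a hypothetical sketch (`extract_label` is not part of this notebook or of `llama_cpp`), and it assumes the first label mentioned is the intended one.

```python
import re

def extract_label(reply, valid_labels, default="Other"):
    """Return the valid label that appears earliest in a free-form model reply."""
    best = None
    for label in valid_labels:
        # Whole-word, case-insensitive match so "Form" does not fire on "format".
        m = re.search(rf"\b{re.escape(label)}\b", reply, flags=re.IGNORECASE)
        if m is None:
            continue
        # Earliest match wins; on a tie, the longer label wins.
        key = (m.start(), -len(label))
        if best is None or key < best[0]:
            best = (key, label)
    return best[1] if best else default

reply = 'I can only type text, but the answer is: "YES."'
print(extract_label(reply, ["YES", "NO"], default="UNKNOWN"))  # YES
```

This heuristic can still be fooled by replies that mention a label only to reject it, so it complements strict prompting rather than replacing it.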


## 📄 Data Structures for Enhanced Document Management
Let's define our data structures to handle complex document metadata:

In [None]:
@dataclass
class PageInfo:
    """Stores information about a single page"""
    page_num: int
    text: str
    doc_type: Optional[str] = None
    page_in_doc: int = 0

@dataclass
class LogicalDocument:
    """Represents a logical document within a PDF"""
    doc_id: str
    doc_type: str
    page_start: int
    page_end: int
    text: str
    chunks: Optional[List["ChunkMetadata"]] = None  # filled in by process_all_documents

@dataclass
class ChunkMetadata:
    """Rich metadata for each chunk"""
    chunk_id: str
    doc_id: str
    doc_type: str
    chunk_index: int
    page_start: int
    page_end: int
    text: str
    filename: Optional[str] = None   # source PDF filename, attached after chunking
    embedding: Optional[np.ndarray] = None
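
`LogicalDocument.chunks` defaults to `None`, so callers need a `None` check before iterating. One alternative is `dataclasses.field(default_factory=list)`, which gives each instance its own empty list. The class below is a trimmed, hypothetical stand-in (`LogicalDocumentSketch`) with made-up sample values; it is not a replacement for the definitions above.

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class LogicalDocumentSketch:
    """Hypothetical, reduced version of LogicalDocument for illustration only."""
    doc_id: str
    doc_type: str
    page_start: int
    page_end: int
    text: str
    # default_factory creates a fresh list per instance (a shared [] default is not allowed)
    chunks: List[dict] = field(default_factory=list)

    @property
    def page_count(self) -> int:
        # page_start and page_end are inclusive, 0-indexed page numbers
        return self.page_end - self.page_start + 1

a = LogicalDocumentSketch("doc_0", "Invoice", 0, 1, "sample invoice text")
b = LogicalDocumentSketch("doc_1", "Resume", 2, 4, "sample resume text")
a.chunks.append({"chunk_id": "doc_0_chunk_0"})
print(len(a.chunks), len(b.chunks), b.page_count)  # 1 0 3
```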

## 🧠 Document Intelligence Functions
These functions handle document classification and boundary detection:

In [None]:
def classify_document_type(text: str, max_length: int = 1500) -> str:
    """
    Classify the document type based on its content using Mistral (open-source).
    """
    text_sample = text[:max_length] if len(text) > max_length else text

    valid_types = [
        'Resume', 'Contract', 'Mortgage Contract', 'Invoice', 'Pay Slip',
        'Lender Fee Sheet', 'Land Deed', 'Bank Statement', 'Tax Document',
        'Insurance', 'Report', 'Letter', 'Form', 'ID Document',
        'Medical', 'Other'
    ]

    prompt = f"""
You are a document classifier.

Choose EXACTLY ONE category from this list:
{", ".join(valid_types)}

Rules:
- Output ONLY the category name
- No explanation
- If uncertain, output "Other"

Document sample:
{text_sample}

Category:
""".strip()

    try:
        raw = llm_generate(prompt, max_tokens=16, temperature=0.0)
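        # Keep letters only and collapse whitespace. Compare the whole reply rather
        # than its first word, so multi-word categories such as "Mortgage Contract"
        # or "Pay Slip" can match.
        doc_type = " ".join(re.sub(r"[^A-Za-z ]", " ", raw).split()).lower()

        # Normalize strictly: exact match, or the reply starts with the category name
        for t in valid_types:
            if doc_type == t.lower() or doc_type.startswith(t.lower() + " "):
                return t

        return "Other"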

    except Exception as e:
        print(f"Classification error: {e}")
        return "Other"


def detect_document_boundary(
    prev_text: str,
    curr_text: str,
    current_doc_type: Optional[str] = None
) -> bool:
    """
    Detect if two consecutive pages belong to the same document using Mistral.
    Returns True if they are from the same document.
    """
    if not prev_text or not curr_text:
        return False

    prev_sample = prev_text[-500:] if len(prev_text) > 500 else prev_text
    curr_sample = curr_text[:500] if len(curr_text) > 500 else curr_text

    prompt = f"""
Determine whether these two pages belong to the SAME document.

Consider:
- Continuity of content
- Formatting consistency
- Topic coherence
- Page numbers or headers

Return JSON ONLY in this exact format:
{{
  "same_document": true or false,
  "confidence": number between 0 and 1
}}

Current document type: {current_doc_type or "Unknown"}

End of previous page:
{prev_sample}

Start of current page:
{curr_sample}
""".strip()

    try:

        raw = llm_generate(prompt, max_tokens=120, temperature=0.0)

        match = re.search(r"\{.*\}", raw, re.DOTALL)
        if not match:
            # If the model failed to follow the format, be conservative
            return True

        data = json.loads(match.group(0))

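        same_document = data.get("same_document", True)
        if isinstance(same_document, str):
            # bool("false") is True, so handle string booleans explicitly
            same_document = same_document.strip().lower() != "false"
        confidence = float(data.get("confidence", 0.0))

        # Hybrid decision policy: trust the model only when it is confident
        if confidence >= 0.85:
            return bool(same_document)

        # Low confidence: keep pages together
        return True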

    except Exception as e:
        print(f"Boundary detection error: {e}")
        # safer default: keep pages together
        return True
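
The decision policy described in the comments of `detect_document_boundary` (trust the model only when it is confident, otherwise keep pages together) can be isolated from the LLM call and tested on its own. The function below is a hypothetical sketch: `parse_boundary_reply` is an illustrative name, and the 0.85 threshold is copied from the code above.

```python
import json
import re

def parse_boundary_reply(raw, threshold=0.85):
    """Return True (same document) unless the model confidently reports a boundary."""
    match = re.search(r"\{.*\}", raw, re.DOTALL)
    if not match:
        return True  # no JSON in the reply: keep pages together
    try:
        data = json.loads(match.group(0))
        confidence = float(data.get("confidence", 0.0))
    except (json.JSONDecodeError, TypeError, ValueError):
        return True  # malformed JSON or non-numeric confidence
    same = data.get("same_document", True)
    if isinstance(same, str):
        # Models sometimes quote booleans; bool("false") would be True
        same = same.strip().lower() != "false"
    if confidence >= threshold:
        return bool(same)
    return True  # low confidence: keep pages together

print(parse_boundary_reply('{"same_document": false, "confidence": 0.95}'))  # False
print(parse_boundary_reply('{"same_document": false, "confidence": 0.40}'))  # True
```

Keeping the parsing separate from the model call makes it possible to unit-test the policy with canned replies.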

## 📑 Advanced PDF Processing Pipeline
Now let's build the enhanced PDF processing pipeline:

In [None]:
def extract_and_analyze_pdf(pdf_file) -> Tuple[List[PageInfo], List[LogicalDocument]]:
    """
    Extract text from PDF and perform intelligent document analysis.
    Returns both page-level info and logical document groupings.
    Supports various file types including scanned PDFs with OCR.
    """
    print("📖 Starting PDF extraction and analysis...")

    # Extract text from each page
    if isinstance(pdf_file, dict) and "content" in pdf_file:
        doc = fitz.open(stream=pdf_file["content"], filetype="pdf")
    elif hasattr(pdf_file, "read"):
        doc = fitz.open(stream=pdf_file.read(), filetype="pdf")
    else:
        doc = fitz.open(pdf_file)

    pages_info = []
    for i, page in enumerate(doc):
        text = page.get_text()

        # If no text found, try OCR (for scanned documents)
        if not text.strip():
            print(f"  Page {i}: No text found, attempting OCR...")
            try:
                # Convert page to image and perform OCR
                pix = page.get_pixmap(dpi=300)
                img_data = pix.tobytes("png")
                from PIL import Image
                import pytesseract
                import io

                img = Image.open(io.BytesIO(img_data))
                text = pytesseract.image_to_string(img)
                print(f"  Page {i}: OCR extracted {len(text)} characters")
            except Exception as e:
                print(f"  Page {i}: OCR failed - {e}")
                text = ""

        pages_info.append(PageInfo(page_num=i, text=text))

    doc.close()

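    # pages_info has one entry per page even when extraction and OCR both fail,
    # so check for actual text rather than for a non-empty list
    if not any(p.text.strip() for p in pages_info):
        raise ValueError("No text could be extracted from PDF")

    print(f"✅ Extracted {len(pages_info)} pages")

    # Perform document classification and boundary detection
    print("🧠 Analyzing document structure...")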
    logical_docs = []
    current_doc_type = None
    current_doc_pages = []
    doc_counter = 0

    for i, page_info in enumerate(pages_info):
        if i == 0:
            # First page - classify document type
            current_doc_type = classify_document_type(page_info.text)
            page_info.doc_type = current_doc_type
            page_info.page_in_doc = 0
            current_doc_pages = [page_info]
            print(f"  Page {i}: New document detected - {current_doc_type}")
        else:
            # Check if this page continues the previous document
            prev_text = pages_info[i-1].text
            is_same = detect_document_boundary(prev_text, page_info.text, current_doc_type)

            if is_same:
                # Continue current document
                page_info.doc_type = current_doc_type
                page_info.page_in_doc = len(current_doc_pages)
                current_doc_pages.append(page_info)
            else:
                # New document detected - save previous and start new
                logical_doc = LogicalDocument(
                    doc_id=f"doc_{doc_counter}",
                    doc_type=current_doc_type,
                    page_start=current_doc_pages[0].page_num,
                    page_end=current_doc_pages[-1].page_num,
                    text="\n\n".join([p.text for p in current_doc_pages])
                )
                logical_docs.append(logical_doc)
                doc_counter += 1

                # Start new document
                current_doc_type = classify_document_type(page_info.text)
                page_info.doc_type = current_doc_type
                page_info.page_in_doc = 0
                current_doc_pages = [page_info]
                print(f"  Page {i}: New document detected - {current_doc_type}")

    # Don't forget the last document
    if current_doc_pages:
        logical_doc = LogicalDocument(
            doc_id=f"doc_{doc_counter}",
            doc_type=current_doc_type,
            page_start=current_doc_pages[0].page_num,
            page_end=current_doc_pages[-1].page_num,
            text="\n\n".join([p.text for p in current_doc_pages])
        )
        logical_docs.append(logical_doc)

    print(f"✅ Identified {len(logical_docs)} logical documents")
    for ld in logical_docs:
        print(f"   - {ld.doc_type}: Pages {ld.page_start}-{ld.page_end}")

    return pages_info, logical_docs
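
Stripped of classification and printing, the grouping loop above reduces to splitting a page sequence wherever a pairwise "same document?" predicate says no. The sketch below shows that core step with a toy predicate standing in for the LLM call; `group_pages`, `shares_first_word`, and the sample pages are hypothetical.

```python
from typing import Callable, List, Tuple

def group_pages(pages: List[str],
                same_doc: Callable[[str, str], bool]) -> List[Tuple[int, int]]:
    """Return inclusive (start, end) page ranges, one per run of pages judged to be one document."""
    if not pages:
        return []
    ranges = []
    start = 0
    for i in range(1, len(pages)):
        if not same_doc(pages[i - 1], pages[i]):
            ranges.append((start, i - 1))  # close the current document
            start = i                      # page i opens a new one
    ranges.append((start, len(pages) - 1))  # close the final document
    return ranges

# Toy predicate: pages belong together when they share their first word.
def shares_first_word(prev: str, curr: str) -> bool:
    return prev.split()[:1] == curr.split()[:1]

pages = ["Invoice page 1", "Invoice page 2", "Resume of Jane", "Deed record"]
print(group_pages(pages, shares_first_word))  # [(0, 1), (2, 2), (3, 3)]
```

Because each decision only compares adjacent pages, one wrong answer splits or merges documents for every page that follows, which is why the conservative default matters.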

## ✂️ Intelligent Chunking with Metadata Preservation
We'll provide two chunking approaches: our custom implementation and LlamaIndex's built-in capabilities.

In [None]:
def chunk_document_with_metadata(logical_doc: LogicalDocument,
                                chunk_size: int = 500,
                                overlap: int = 100) -> List[ChunkMetadata]:
    """
    Chunk a logical document while preserving rich metadata.
    Uses sliding window with overlap for better context.
    """
    chunks_metadata = []
    words = logical_doc.text.split()

    if len(words) <= chunk_size:
        # Document is small enough to be a single chunk
        chunk_meta = ChunkMetadata(
            chunk_id=f"{logical_doc.doc_id}_chunk_0",
            doc_id=logical_doc.doc_id,
            doc_type=logical_doc.doc_type,
            chunk_index=0,
            page_start=logical_doc.page_start,
            page_end=logical_doc.page_end,
            text=logical_doc.text
        )
        chunks_metadata.append(chunk_meta)
    else:
        # Create overlapping chunks
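        # A stride of zero or less would make range() fail or never advance
        if not 0 <= overlap < chunk_size:
            raise ValueError("overlap must be non-negative and smaller than chunk_size")
        stride = chunk_size - overlap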
        for i, start_idx in enumerate(range(0, len(words), stride)):
            end_idx = min(start_idx + chunk_size, len(words))
            chunk_text = ' '.join(words[start_idx:end_idx])

            # Calculate which pages this chunk spans
            # (simplified - in production, track more precisely)
            chunk_position = start_idx / len(words)
            page_range = logical_doc.page_end - logical_doc.page_start
            relative_page = int(chunk_position * page_range)
            chunk_page_start = logical_doc.page_start + relative_page
            chunk_page_end = min(chunk_page_start + 1, logical_doc.page_end)

            chunk_meta = ChunkMetadata(
                chunk_id=f"{logical_doc.doc_id}_chunk_{i}",
                doc_id=logical_doc.doc_id,
                doc_type=logical_doc.doc_type,
                chunk_index=i,
                page_start=chunk_page_start,
                page_end=chunk_page_end,
                text=chunk_text
            )
            chunks_metadata.append(chunk_meta)

            if end_idx >= len(words):
                break

    return chunks_metadata

def chunk_with_llama_index(logical_doc: LogicalDocument,
                           chunk_size: int = 500,
                           chunk_overlap: int = 100) -> List[ChunkMetadata]:
    """
    Alternative: Use LlamaIndex's advanced chunking with metadata.
    """
    # Create LlamaIndex document with metadata
    doc = Document(
        text=logical_doc.text,
        metadata={
            "doc_id": logical_doc.doc_id,
            "doc_type": logical_doc.doc_type,
            "page_start": logical_doc.page_start,
            "page_end": logical_doc.page_end,
            "source": f"{logical_doc.doc_type}_document"
        }
    )

    # Use LlamaIndex's sentence splitter for better chunking
    splitter = SentenceSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        paragraph_separator="\n\n",
        separator=" ",
    )

    # Create nodes (chunks) from document
    nodes = splitter.get_nodes_from_documents([doc])

    # Convert to our ChunkMetadata format for consistency
    chunks_metadata = []
    for i, node in enumerate(nodes):
        chunk_meta = ChunkMetadata(
            chunk_id=f"{logical_doc.doc_id}_chunk_{i}",
            doc_id=logical_doc.doc_id,
            doc_type=logical_doc.doc_type,
            chunk_index=i,
            page_start=node.metadata.get("page_start", logical_doc.page_start),
            page_end=node.metadata.get("page_end", logical_doc.page_end),
            text=node.text
        )
        chunks_metadata.append(chunk_meta)

    return chunks_metadata

def process_all_documents(logical_docs: List[LogicalDocument],
                         use_llama_index: bool = False) -> List[ChunkMetadata]:
    """
    Process all logical documents into chunks with metadata.
    Can use either custom or LlamaIndex chunking.
    """
    all_chunks = []

    for logical_doc in logical_docs:
        if use_llama_index:
            chunks = chunk_with_llama_index(logical_doc)
        else:
            chunks = chunk_document_with_metadata(logical_doc)

        logical_doc.chunks = chunks  # Store reference
        all_chunks.extend(chunks)
        print(f"📄 {logical_doc.doc_type}: Created {len(chunks)} chunks")

    return all_chunks
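
To see what the sliding window in `chunk_document_with_metadata` produces, it helps to compute only the word offsets. The sketch below mirrors that loop (stride = `chunk_size - overlap`, stop once a window reaches the end); `window_bounds` is a hypothetical helper, and the overlap check is an added safeguard.

```python
def window_bounds(n_words, chunk_size=500, overlap=100):
    """Return (start, end) word offsets for overlapping chunks; end is exclusive."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be non-negative and smaller than chunk_size")
    stride = chunk_size - overlap
    bounds = []
    for start in range(0, n_words, stride):
        end = min(start + chunk_size, n_words)
        bounds.append((start, end))
        if end >= n_words:
            break  # the last window already covers the tail
    return bounds

print(window_bounds(1200))  # [(0, 500), (400, 900), (800, 1200)]
```

With the defaults, consecutive chunks share 100 words, and the final chunk may be shorter than `chunk_size`.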

## 🎯 Query Routing and Intelligent Retrieval

In [None]:
def predict_query_document_type(query: str) -> Tuple[str, float]:
    valid_types = [
        'Resume', 'Contract', 'Mortgage Contract', 'Invoice', 'Pay Slip',
        'Lender Fee Sheet', 'Land Deed', 'Bank Statement', 'Tax Document',
        'Insurance', 'Report', 'Letter', 'Form', 'ID Document',
        'Medical', 'Other'
    ]

    prompt = f"""
Pick the MOST LIKELY document type that contains the answer.

Valid types:
{", ".join(valid_types)}

Query: "{query}"

Return JSON ONLY in this exact format:
{{"type":"<one valid type>","confidence":<number between 0 and 1>}}
""".strip()

    try:
        raw = llm_generate(prompt, max_tokens=80, temperature=0.0)

        # Extract JSON object even if the model adds text
        m = re.search(r'\{.*\}', raw, flags=re.DOTALL)
        if not m:
            return "Other", 0.0

        obj = json.loads(m.group(0))
        t = obj.get("type", "Other")
        c = float(obj.get("confidence", 0.5))

        # normalize
        t_norm = "Other"
        for vt in valid_types:
            if str(t).lower() == vt.lower():
                t_norm = vt
                break
        c = max(0.0, min(1.0, c))
        return t_norm, c

    except Exception as e:
        print(f"Query routing error: {e}")
        return "Other", 0.0


class IntelligentRetriever:
    """
    Advanced retrieval system with metadata filtering and query routing.
    """

    def __init__(self):
        self.index = None
        self.chunks_metadata = []
        self.doc_type_indices = {}  # Separate indices per doc type

    def build_indices(self, chunks_metadata: List[ChunkMetadata]):
        """
        Build FAISS indices with document type segregation.
        """
        print("🔨 Building vector indices...")
        self.chunks_metadata = chunks_metadata

        # Create embeddings for all chunks
        texts = [chunk.text for chunk in chunks_metadata]
        embeddings = embed_model.encode(texts, show_progress_bar=True)

        # Store embeddings in metadata
        for i, chunk in enumerate(chunks_metadata):
            chunk.embedding = embeddings[i]

        # Build main index
        dim = embeddings.shape[1]
        self.index = faiss.IndexFlatL2(dim)
        self.index.add(embeddings)

        # Build separate indices for each document type
        doc_types = set(chunk.doc_type for chunk in chunks_metadata)
        for doc_type in doc_types:
            type_indices = [i for i, chunk in enumerate(chunks_metadata)
                          if chunk.doc_type == doc_type]
            if type_indices:
                type_embeddings = embeddings[type_indices]
                type_index = faiss.IndexFlatL2(dim)
                type_index.add(type_embeddings)
                self.doc_type_indices[doc_type] = {
                    'index': type_index,
                    'mapping': type_indices  # Maps back to original chunks
                }

        print(f"✅ Indexed {len(chunks_metadata)} chunks across {len(doc_types)} document types")

    def retrieve(self, query: str, k: int = 4,
                filter_doc_type: Optional[str] = None,
                auto_route: bool = True) -> List[Tuple[ChunkMetadata, float]]:
        """
        Retrieve relevant chunks with optional filtering and routing.
        Returns chunks with relevance scores.
        """
        query_embedding = embed_model.encode([query])

        def search(index, mapping=None):
            """Search one index, drop FAISS's -1 padding, and map to global chunk positions."""
            D, I = index.search(query_embedding, k)
            # FAISS pads with -1 when the index holds fewer than k vectors
            hits = [(int(i), float(d)) for i, d in zip(I[0], D[0]) if i >= 0]
            if mapping is not None:
                hits = [(mapping[i], d) for i, d in hits]
            return hits

        # Determine which index to search
        if filter_doc_type and filter_doc_type in self.doc_type_indices:
            # Use filtered index
            type_data = self.doc_type_indices[filter_doc_type]
            hits = search(type_data['index'], type_data['mapping'])
        elif auto_route:
            # Predict best document type
            predicted_type, confidence = predict_query_document_type(query)
            print(f"🎯 Query routed to: {predicted_type} (confidence: {confidence:.2f})")

            if confidence > 0.7 and predicted_type in self.doc_type_indices:
                # High confidence - use specific index
                type_data = self.doc_type_indices[predicted_type]
                hits = search(type_data['index'], type_data['mapping'])
            else:
                # Low confidence - search all
                hits = search(self.index)
        else:
            # Search all chunks
            hits = search(self.index)

        # Convert distances to relative similarity scores (the farthest hit scores 0)
        max_dist = max((d for _, d in hits), default=0.0)
        results = [
            (self.chunks_metadata[i], (max_dist - d) / max_dist if max_dist > 0 else 1.0)
            for i, d in hits
        ]

        return results
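
The score conversion in `retrieve` is relative to the worst hit in the result set, which has consequences worth seeing with numbers. The sketch below reproduces the `(max_dist - d) / max_dist` formula in plain Python (the zero-distance guard is an addition) and, for comparison, shows an absolute alternative. That alternative assumes the embeddings are unit length, in which case the squared L2 distance returned by `IndexFlatL2` relates to cosine similarity as `d = 2 - 2*cos`. Both helper names are hypothetical.

```python
def relative_scores(distances):
    """Scores relative to the farthest hit: (max - d) / max."""
    if not distances:
        return []
    max_dist = max(distances)
    if max_dist == 0:
        return [1.0] * len(distances)  # all hits are exact matches
    return [(max_dist - d) / max_dist for d in distances]

def cosine_from_squared_l2(d):
    """For unit-length vectors, ||a - b||^2 = 2 - 2*cos, so cos = 1 - d / 2."""
    return 1.0 - d / 2.0

print(relative_scores([0.5, 1.0, 2.0]))  # [0.75, 0.5, 0.0]
print(relative_scores([0.5]))            # [0.0]
print(cosine_from_squared_l2(0.5))       # 0.75
```

Under the relative formula the farthest hit always scores 0, and a single result scores 0 no matter how close it is, so any later threshold on these scores will always discard at least one chunk.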

## 💬 Enhanced Answer Generation with Source Attribution

In [None]:
def extract_query_signals(query: str) -> List[str]:
    prompt = f"""
Extract 3–5 key semantic signals from this question.
Signals should be nouns or short phrases likely to appear verbatim in documents.

Return JSON list ONLY.

Question:
{query}

Example output:
["total", "monthly", "payment"]
"""
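    raw = llm_generate(prompt, max_tokens=80, temperature=0.0)
    match = re.search(r"\[.*\]", raw, re.DOTALL)
    if not match:
        return []  # no list in the reply: skip boosting rather than crash the query
    try:
        signals = json.loads(match.group(0))
    except json.JSONDecodeError:
        return []
    # Keep only strings, since semantic_boost calls .lower() on each signal
    return [s for s in signals if isinstance(s, str)]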

In [None]:
def semantic_boost(retrieved_chunks, query_signals):
    boosted = []
    for chunk, score in retrieved_chunks:
        text = chunk.text.lower()
        overlap = sum(1 for s in query_signals if s.lower() in text)

        if overlap > 0:
            score = min(score + 0.05 * overlap, 1.0)

        boosted.append((chunk, score))
    return boosted
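
A small worked example makes the boost arithmetic concrete: each signal found in a chunk adds 0.05 to its score, capped at 1.0. The sketch below applies the same rule to plain `(text, score)` pairs instead of `ChunkMetadata` objects; the `boost` helper and the sample texts are made up for illustration.

```python
def boost(scored_texts, signals, step=0.05):
    """Add `step` per signal found in the text (case-insensitive substring), capped at 1.0."""
    boosted = []
    for text, score in scored_texts:
        lowered = text.lower()
        hits = sum(1 for s in signals if s.lower() in lowered)
        boosted.append((text, min(score + step * hits, 1.0)))
    return boosted

sample = [
    ("Total monthly payment due", 0.5),   # contains all three signals
    ("Borrower signature page", 0.25),    # contains none
    ("Subtotal of fees", 0.98),           # "total" matches inside "Subtotal"
]
for text, score in boost(sample, ["total", "monthly", "payment"]):
    print(f"{score:.2f}  {text}")
```

Because matching is by substring, "total" also fires on "Subtotal"; whole-word matching would be stricter if that proves noisy.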

In [None]:
def generate_answer_with_sources(
    query: str,
    retrieved_chunks: List[Tuple[ChunkMetadata, float]]
) -> Dict:
    """
    Generate answer with detailed source attribution using Mistral.
    """

    # 1️⃣ Extract semantic intent from the query (document-agnostic)
    query_signals = extract_query_signals(query)

    # 2️⃣ Apply semantic boosting BEFORE filtering
    retrieved_chunks = semantic_boost(retrieved_chunks, query_signals)

    # 3️⃣ Remove very low-relevance chunks (noise)
    retrieved_chunks = [
        (chunk, score)
        for chunk, score in retrieved_chunks
        if score > 0.2
    ]

    # Re-check after filtering
    if not retrieved_chunks:
        return {
            'answer': "I couldn't find relevant information to answer your question.",
            'sources': [],
            'confidence': 0.0,
            'chunks_used': 0
        }

    # 4️⃣ Sort chunks by relevance (best evidence first)
    retrieved_chunks = sorted(
        retrieved_chunks,
        key=lambda x: x[1],
        reverse=True
    )

    # Build context and source list
    context_parts = []
    sources = []

    for chunk_meta, score in retrieved_chunks:
        context_parts.append(
            f"[SOURCE | {chunk_meta.doc_type} | Pages {chunk_meta.page_start}-{chunk_meta.page_end} | Chunk {chunk_meta.chunk_index}]"
        )
        context_parts.append(chunk_meta.text)
        context_parts.append("")


        sources.append({
            'doc_type': chunk_meta.doc_type,
            'filename': chunk_meta.filename,   # source PDF filename
            'pages': f"{chunk_meta.page_start}-{chunk_meta.page_end}",
            'chunk': chunk_meta.chunk_index,
            'relevance': f"{score:.2%}",
            'preview': chunk_meta.text[:120].replace("\n", " ") + "..."
        })


    context = "\n".join(context_parts)

    prompt = f"""
You are a document question-answering assistant.

STRICT RULES:
1. Use ONLY the information in the SOURCES.
2. If the answer is not present, say:
   "I don't have enough information in the provided documents to answer that."
3. Cite facts inline using (DocumentType pX–Y).

ANSWER RULES:
- If multiple numbers exist, choose the value explicitly labeled as TOTAL
- Ignore line items, fees, prepaid amounts, or partial components
- Prefer headings like "TOTAL ESTIMATED MONTHLY PAYMENT"

SOURCES:
{context}

QUESTION:
{query}

ANSWER (with citations):
""".strip()

    try:
        answer = llm_generate(
            prompt,
            max_tokens=350,
            temperature=0.2
        )

        # avg_score = sum(score for _, score in retrieved_chunks) / len(retrieved_chunks)
        confidence = max(score for _, score in retrieved_chunks)


        return {
            'answer': answer,
            'sources': sources,
            'confidence': float(confidence),
            'chunks_used': len(retrieved_chunks)
        }

    except Exception as e:
        print(f"Answer generation error: {e}")
        return {
            'answer': f"Error generating answer: {str(e)}",
            'sources': sources,
            'confidence': 0.0,
            'chunks_used': len(retrieved_chunks)
        }

## 🏗️ Enhanced Document Store

In [None]:
class EnhancedDocumentStore:
    """
    Manages the complete document processing and retrieval pipeline.
    """

    def __init__(self):
        self.pages_info = []
        self.logical_docs = []
        self.chunks_metadata = []
        self.retriever = IntelligentRetriever()
        self.is_ready = False
        self.processing_stats = {}
        self.filename = None

    def process_pdf(self, pdf_file, filename: str = "document.pdf"):
        """
        Complete PDF processing pipeline.
        """
        self.filename = filename
        self.is_ready = False
        start_time = datetime.now()

        try:
            # Extract and analyze PDF
            self.pages_info, self.logical_docs = extract_and_analyze_pdf(pdf_file)

            # Chunk documents with metadata
            self.chunks_metadata = process_all_documents(self.logical_docs)

            # Attach filename to every chunk
            for chunk in self.chunks_metadata:
                chunk.filename = self.filename

            # Build retrieval indices
            self.retriever.build_indices(self.chunks_metadata)

            # Calculate processing statistics
            process_time = (datetime.now() - start_time).total_seconds()
            self.processing_stats = {
                'filename': filename,
                'total_pages': len(self.pages_info),
                'documents_found': len(self.logical_docs),
                'total_chunks': len(self.chunks_metadata),
                'document_types': list(set(doc.doc_type for doc in self.logical_docs)),
                'processing_time': f"{process_time:.1f}s"
            }

            self.is_ready = True
            return True, self.processing_stats

        except Exception as e:
            return False, {'error': str(e)}

    def query(self, question: str, filter_type: Optional[str] = None,
             auto_route: bool = True, k: int = 4) -> Dict:
        """
        Query the document store.
        """
        if not self.is_ready:
            return {
                'answer': "Please upload and process a PDF first.",
                'sources': [],
                'confidence': 0.0
            }

        # Retrieve relevant chunks
        retrieved = self.retriever.retrieve(
            question, k=k,
            filter_doc_type=filter_type,
            auto_route=auto_route
        )

        # Generate answer with sources
        result = generate_answer_with_sources(question, retrieved)
        result['filter_used'] = filter_type or ('auto' if auto_route else 'none')

        return result

    def get_document_structure(self) -> List[Dict]:
        """
        Get the document structure for UI display.
        """
        if not self.logical_docs:
            return []

        structure = []
        for doc in self.logical_docs:
            structure.append({
                'id': doc.doc_id,
                'type': doc.doc_type,
                'pages': f"{doc.page_start + 1}-{doc.page_end + 1}",  # 1-indexed for UI
                'chunks': len(doc.chunks) if doc.chunks else 0,
                'preview': doc.text[:200] + "..." if len(doc.text) > 200 else doc.text
            })

        return structure

## 🎨 Gradio Interface with Enhanced Features
Now let's create the sophisticated Gradio interface:

In [None]:
# Global store instance
doc_store = EnhancedDocumentStore()

def process_pdf_handler(pdf_file):
    """Handle PDF upload and processing."""
    if pdf_file is None:
        return "⚠️ Please upload a PDF file", None, gr.update(choices=["All"])

    # Process the PDF
    success, stats = doc_store.process_pdf(pdf_file,
                                          filename=getattr(pdf_file, 'name', 'document.pdf'))

    if success:
        # Prepare status message
        status_msg = f"""
        ✅ **Successfully Processed:**
        - 📄 File: {stats['filename']}
        - 📑 Pages: {stats['total_pages']}
        - 📚 Documents Found: {stats['documents_found']}
        - 🧩 Chunks Created: {stats['total_chunks']}
        - 🏷️ Types: {', '.join(stats['document_types'])}
        - ⏱️ Time: {stats['processing_time']}
        """

        # Get document structure for display
        structure = doc_store.get_document_structure()
        structure_display = "\n".join([
            f"• **{doc['type']}** (Pages {doc['pages']}): {doc['chunks']} chunks"
            for doc in structure
        ])

        # Update filter choices
        doc_types = ["All"] + stats['document_types']

        return status_msg, structure_display, gr.update(choices=doc_types, value="All")

    else:
        return f"❌ Error: {stats.get('error', 'Unknown error')}", "", gr.update(choices=["All"])

def chat_handler(message, history, doc_filter, auto_route, num_chunks):
    """Handle chat interactions."""
    if not doc_store.is_ready:
        response = "📚 Please upload and process a PDF document first."
        return history + [
            {"role": "user", "content": message},
            {"role": "assistant", "content": response}
        ]


    # Query the document store
    filter_type = None if doc_filter == "All" else doc_filter
    result = doc_store.query(
        message,
        filter_type=filter_type,
        auto_route=auto_route and filter_type is None,
        k=num_chunks
    )

    # Format response with sources
    response = f"{result['answer']}\n\n"

    if result['sources']:
        response += "📍 **Sources:**\n"
        for src in result['sources']:
            response += f"• {src['doc_type']} (Pages {src['pages']}) - Relevance: {src['relevance']}\n"

    response += (
        f"\n*Confidence: {result['confidence']:.1%} "
        f"(based on retrieval similarity across {len(result['sources'])} chunks) "
        f"| Filter: {result['filter_used']}*"
    )

    return history + [
    {"role": "user", "content": message},
    {"role": "assistant", "content": response}
    ]


def create_interface():
    """Create the enhanced Gradio interface with unified single-tab layout."""

    with gr.Blocks(title="Enhanced Document Q&A", theme=gr.themes.Soft()) as demo:
        gr.Markdown("""
        # Enhanced Document Q&A System
        ### Intelligent Multi-Document Analysis with Advanced RAG Pipeline
        """)

        with gr.Row():
            # Left side - PDF preview and upload
            with gr.Column(scale=2):
                pdf_input = gr.File(
                    label="📄 Upload PDF",
                    file_types=[".pdf"]
                )

                pdf_preview = gr.Markdown()

                with gr.Row():
                    process_btn = gr.Button(
                        "🔄 Process Document",
                        variant="primary",
                        size="lg",
                        scale=2
                    )
                    clear_all_btn = gr.Button(
                        "🗑️ Clear All",
                        variant="secondary",
                        size="lg",
                        scale=1
                    )

            # Middle - Document info and settings
            with gr.Column(scale=1):
                gr.Markdown("### 📊 Document Info")
                status_output = gr.Markdown(
                    value="⏳ Waiting for PDF upload..."
                )

                structure_output = gr.Markdown(
                    value="",
                    label="Document Structure"
                )

                gr.Markdown("### ⚙️ Settings")

                doc_filter = gr.Dropdown(
                    choices=["All"],
                    value="All",
                    label="🏷️ Document Type Filter",
                    info="Filter search to specific document type"
                )

                auto_route = gr.Checkbox(
                    value=True,
                    label="🎯 Auto-Route Queries",
                    info="Automatically detect relevant document type"
                )

                num_chunks = gr.Slider(
                    minimum=1,
                    maximum=10,
                    value=4,
                    step=1,
                    label="📊 Chunks to Retrieve"
                )

            # Right side - Chat interface
            with gr.Column(scale=2):
                gr.Markdown("### 💬 Ask Questions")
                chatbot = gr.Chatbot(
                    label="Conversation",
                    height=500,
                    elem_id="chatbot",
                    show_label=False
                )

                with gr.Row():
                    msg_input = gr.Textbox(
                        label="Ask a question",
                        placeholder="e.g., What are the payment terms? What is the total amount?",
                        scale=4,
                        show_label=False
                    )
                    send_btn = gr.Button("📤 Send", scale=1, variant="primary")

                with gr.Row():
                    clear_chat_btn = gr.Button("🗑️ Clear Chat", size="sm", scale=1)
                    example_btn1 = gr.Button("📝 What's the summary?", size="sm", scale=1)
                    example_btn2 = gr.Button("💰 Find amounts", size="sm", scale=1)

        # Status bar at the bottom
        with gr.Row():
            status_bar = gr.Markdown(
                value="**Status:** Ready | **Documents:** 0 | **Chunks:** 0 | **Cache Hits:** 0/0",
                elem_id="status_bar"
            )

        # Event handlers
        def update_status_bar():
            """Update the status bar with current statistics."""
            if doc_store.is_ready:
                stats = doc_store.processing_stats
                cache_rate = 0
                if hasattr(doc_store.retriever, 'total_queries') and doc_store.retriever.total_queries > 0:
                    cache_rate = (doc_store.retriever.cache_hits / doc_store.retriever.total_queries) * 100

                return (
                    f"**Status:** ✅ Ready | **Documents:** {stats.get('documents_found', 0)} "
                    f"| **Chunks:** {stats.get('total_chunks', 0)} | **Cache Rate:** {cache_rate:.0f}%"
                )
            return "**Status:** Ready | **Documents:** 0 | **Chunks:** 0 | **Cache Hits:** 0/0"

        def clear_all():
            """Clear everything and reset the interface."""
            global doc_store
            doc_store = EnhancedDocumentStore()
            return (
                None,  # pdf_input
                "⏳ Waiting for PDF upload...",  # status_output
                "",  # structure_output
                gr.update(choices=["All"], value="All"),  # doc_filter
                [],  # chatbot
                "",  # msg_input
                update_status_bar()  # status_bar
            )

        # Process PDF handler with status bar update
        def process_pdf_with_status(pdf_file):
            status, structure, filter_update = process_pdf_handler(pdf_file)
            status_bar_text = update_status_bar()
            return status, structure, filter_update, status_bar_text

        # Chat handler with status bar update
        def chat_with_status(message, history, doc_filter, auto_route, num_chunks):
            new_history = chat_handler(message, history, doc_filter, auto_route, num_chunks)
            status_bar_text = update_status_bar()
            return new_history, status_bar_text

        # Example question handlers. The current settings are passed in as event
        # inputs: reading component.value inside a handler only returns the
        # initial value set when the interface was built, not the live UI state.
        def ask_summary(history, filter_value, route_value, chunk_count):
            return chat_handler(
                "Can you provide a summary of the main points in this document?",
                history or [],
                filter_value,
                route_value,
                chunk_count
            )

        def ask_amounts(history, filter_value, route_value, chunk_count):
            return chat_handler(
                "What are all the monetary amounts or financial figures mentioned?",
                history or [],
                filter_value,
                route_value,
                chunk_count
            )

        # Wire up all the events
        process_btn.click(
            fn=process_pdf_with_status,
            inputs=[pdf_input],
            outputs=[status_output, structure_output, doc_filter, status_bar]
        )

        clear_all_btn.click(
            fn=clear_all,
            outputs=[pdf_input, status_output, structure_output, doc_filter,
                    chatbot, msg_input, status_bar]
        )

        # Chat interactions
        msg_input.submit(
            fn=chat_with_status,
            inputs=[msg_input, chatbot, doc_filter, auto_route, num_chunks],
            outputs=[chatbot, status_bar]
        ).then(
            lambda: "",
            outputs=[msg_input]
        )

        send_btn.click(
            fn=chat_with_status,
            inputs=[msg_input, chatbot, doc_filter, auto_route, num_chunks],
            outputs=[chatbot, status_bar]
        ).then(
            lambda: "",
            outputs=[msg_input]
        )

        clear_chat_btn.click(
            lambda: [],
            outputs=[chatbot]
        )

        example_btn1.click(
            fn=ask_summary,
            inputs=[chatbot, doc_filter, auto_route, num_chunks],
            outputs=[chatbot]
        ).then(
            fn=update_status_bar,
            outputs=[status_bar]
        )

        example_btn2.click(
            fn=ask_amounts,
            inputs=[chatbot, doc_filter, auto_route, num_chunks],
            outputs=[chatbot]
        ).then(
            fn=update_status_bar,
            outputs=[status_bar]
        )

        # Auto-process when PDF is uploaded
        pdf_input.change(
            fn=process_pdf_with_status,
            inputs=[pdf_input],
            outputs=[status_output, structure_output, doc_filter, status_bar]
        )


    return demo

In [None]:
demo = create_interface()
demo.launch(share=True, debug=True)


Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://87126d8d3e2d9e2226.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


📖 Starting PDF extraction and analysis...
✅ Extracted 4 pages
🧠 Analyzing document structure...
  Page 0: New document detected - Resume
  Page 1: New document detected - Other
  Page 2: New document detected - Other
  Page 3: New document detected - Other
✅ Identified 4 logical documents
   - Resume: Pages 0-0
   - Other: Pages 1-1
   - Other: Pages 2-2
   - Other: Pages 3-3
📄 Resume: Created 1 chunks
📄 Other: Created 1 chunks
📄 Other: Created 1 chunks
📄 Other: Created 1 chunks
🔨 Building vector indices...


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

✅ Indexed 4 chunks across 2 document types
Query routing error: Extra data: line 2 column 1 (char 46)
🎯 Query routed to: Other (confidence: 0.00)
📖 Starting PDF extraction and analysis...
✅ Extracted 2 pages
🧠 Analyzing document structure...
  Page 0: New document detected - Contract
✅ Identified 1 logical documents
   - Contract: Pages 0-1
📄 Contract: Created 1 chunks
🔨 Building vector indices...


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

✅ Indexed 1 chunks across 1 document types
🎯 Query routed to: Bank Statement (confidence: 0.80)
Keyboard interruption in main thread... closing server.
Killing tunnel 127.0.0.1:7860 <> https://87126d8d3e2d9e2226.gradio.live
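
The log above shows two routing problems: a `Query routing error: Extra data` message, which is the error `json.loads` raises when text follows the first JSON value, and a query routed to `Bank Statement` when only a `Contract` document was indexed. The sketch below shows one possible way to guard against both. It assumes the router returns a JSON object with `doc_type` and `confidence` fields; those field names, the `parse_route` helper, and the `0.5` threshold are hypothetical, since the router's actual output format is not shown in this section.

```python
import json


def parse_route(raw, available_types, min_confidence=0.5):
    """Parse a routing reply and return (doc_type, confidence).

    doc_type is None when the reply is unusable, names a type that was
    not indexed, or falls below min_confidence. The field names
    "doc_type" and "confidence" are assumptions for illustration.
    """
    start = raw.find("{")
    if start == -1:
        return None, 0.0

    try:
        # raw_decode parses a single JSON value starting at `start` and
        # ignores any trailing text, unlike json.loads, which raises
        # "Extra data" when something follows the first value.
        payload, _end = json.JSONDecoder().raw_decode(raw, start)
    except json.JSONDecodeError:
        return None, 0.0

    if not isinstance(payload, dict):
        return None, 0.0

    try:
        confidence = float(payload.get("confidence", 0.0))
    except (TypeError, ValueError):
        confidence = 0.0

    doc_type = payload.get("doc_type")
    if not isinstance(doc_type, str) or doc_type not in available_types:
        return None, confidence
    if confidence < min_confidence:
        return None, confidence
    return doc_type, confidence


# A reply with trailing text after the JSON object
reply = '{"doc_type": "Bank Statement", "confidence": 0.8}\nReasoning: mentions amounts'

print(parse_route(reply, ["Contract"]))                    # (None, 0.8)
print(parse_route(reply, ["Contract", "Bank Statement"]))  # ('Bank Statement', 0.8)
```

When the helper returns `None` for the type, the caller could fall back to searching all indexed documents instead of filtering on a type that does not exist in the store.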




# Google Slides link

https://docs.google.com/presentation/d/1mW1zGwmhKCAVbLKlylmGg19f-iTFcHWphiOrNgqym58/edit?usp=sharing