In [1]:
!sudo apt install tesseract-ocr
!pip install pytesseract PyMuPDF pillow pdfplumber

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
tesseract-ocr is already the newest version (4.1.1-2.1build1).
0 upgraded, 0 newly installed, 0 to remove and 34 not upgraded.
Collecting pytesseract
  Downloading pytesseract-0.3.13-py3-none-any.whl.metadata (11 kB)
Collecting PyMuPDF
  Downloading pymupdf-1.25.5-cp39-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (3.4 kB)
Collecting pdfplumber
  Downloading pdfplumber-0.11.6-py3-none-any.whl.metadata (42 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m42.8/42.8 kB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
Collecting pdfminer.six==20250327 (from pdfplumber)
  Downloading pdfminer_six-20250327-py3-none-any.whl.metadata (4.1 kB)
Collecting pypdfium2>=4.18.0 (from pdfplumber)
  Downloading pypdfium2-4.30.1-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (48 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m48.2/48.2 kB[0m 

In [2]:
import fitz  # PyMuPDF
import pytesseract
from PIL import Image
import os
import io
import csv
import pdfplumber
import numpy as np
import cv2
from google.colab import drive

# Mount Google Drive so files under /content/drive become readable.
drive.mount('/content/drive')

# Directory layout used by the rest of the notebook:
#   pdf_dir    - base directory that process_pdfs() joins each PDF name onto
#   output_dir - where metadata.csv (and, in later cells, the FAISS files) are written
#   image_dir  - sub-directory of output_dir intended for images extracted from the PDFs
pdf_dir = '/content/drive/MyDrive/data'  # Update with your path
output_dir = '/content/output'
image_dir = os.path.join(output_dir, 'images')
# Creating image_dir also creates output_dir (it is the parent); the second
# call is therefore a harmless no-op thanks to exist_ok=True.
os.makedirs(image_dir, exist_ok=True)
os.makedirs(output_dir, exist_ok=True)

# Point pytesseract at the Tesseract binary used for OCR.
pytesseract.pytesseract.tesseract_cmd = '/usr/bin/tesseract'

def process_pdfs(pdf_files):
    """Extract text, table and image chunks from PDFs and save them to CSV.

    For every page of every PDF the function collects:
      * overlapping text chunks (``chunk_text`` / ``create_text_chunks``),
      * tables rendered as tab-separated text (``process_tables``),
      * embedded images with their OCR text (``process_images``).
    All records are written to ``<output_dir>/metadata.csv``.

    Args:
        pdf_files: iterable of PDF file names. Each name is joined onto the
            module-level ``pdf_dir``; note that ``os.path.join`` ignores
            ``pdf_dir`` when the name is already an absolute path.

    Returns:
        None. The result is the CSV file (and the image files written by
        ``process_images``).
    """
    # Function-scope import keeps this cell self-contained.
    from contextlib import closing

    all_chunks = []

    for pdf_name in pdf_files:
        pdf_path = os.path.join(pdf_dir, pdf_name)

        # The same file is opened with both libraries: PyMuPDF for text and
        # images, pdfplumber for tables. closing() guarantees that both
        # handles are released even when a page fails to process.
        with closing(fitz.open(pdf_path)) as doc, \
                closing(pdfplumber.open(pdf_path)) as pdf_plumber:
            for page_num in range(len(doc)):
                # Text extraction and chunking
                text = doc.load_page(page_num).get_text()
                text_chunks = chunk_text(text)

                # Table extraction
                tables = pdf_plumber.pages[page_num].extract_tables()
                table_chunks = process_tables(tables, pdf_name, page_num)

                # Image extraction + OCR
                img_chunks = process_images(doc, page_num, pdf_name, image_dir)

                # Keep the per-page order: text, then tables, then images
                all_chunks.extend(create_text_chunks(text_chunks, pdf_name, page_num))
                all_chunks.extend(table_chunks)
                all_chunks.extend(img_chunks)

    # Save all chunks to CSV
    save_chunks_to_csv(all_chunks, os.path.join(output_dir, 'metadata.csv'))

def chunk_text(text, chunk_size=1000, overlap=200):
    """Split text into overlapping chunks.

    Consecutive chunks share ``overlap`` characters. The last chunk ends at
    the end of the text; no extra tail chunk is produced once the text has
    been fully covered.

    Args:
        text: the string to split.
        chunk_size: maximum number of characters per chunk (must be > 0).
        overlap: number of characters shared by neighbouring chunks
            (must satisfy ``0 <= overlap < chunk_size``).

    Returns:
        List of string chunks; empty list for empty text.

    Raises:
        ValueError: if ``chunk_size`` or ``overlap`` is out of range. With
            ``overlap >= chunk_size`` the window would never advance.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

    chunks = []
    step = chunk_size - overlap
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunks.append(text[start:end])
        if end >= len(text):
            # The text is fully covered; another window would only repeat
            # the overlap region of the chunk just emitted.
            break
        start += step
    return chunks

def process_tables(tables, pdf_name, page_num):
    """Convert extracted tables to tab-separated text records.

    Args:
        tables: list of tables, each a list of rows, each row a list of cell
            values. Cells may be ``None`` (empty cell).
        pdf_name: name of the source PDF, stored in the ``source`` field.
        page_num: zero-based page index; stored one-based in ``page``.

    Returns:
        List of metadata dicts with ``type == 'table'``. Rows are joined with
        newlines and cells with tabs. Tables without any text are skipped.
    """
    def cell_text(cell):
        # Empty cells become empty strings instead of the literal "None";
        # line breaks inside a cell are flattened so one row stays one line.
        if cell is None:
            return ""
        return str(cell).replace("\r", " ").replace("\n", " ")

    chunks = []
    for table in tables or []:
        content = "\n".join(
            "\t".join(cell_text(cell) for cell in row) for row in table
        )
        if not content.strip():
            continue  # nothing retrievable in an all-empty table
        chunks.append({
            'source': pdf_name,
            'page': page_num + 1,
            'type': 'table',
            'content': content,
            'image_path': None
        })
    return chunks

def process_images(doc, page_num, pdf_name, image_dir):
    """Extract the images embedded in one PDF page, save them and OCR them.

    Args:
        doc: open PyMuPDF document.
        page_num: zero-based page index.
        pdf_name: name (or path) of the source PDF; stored unchanged in the
            ``source`` field of every record.
        image_dir: directory the extracted images are written to.

    Returns:
        List of metadata dicts with ``type == 'image'``; ``content`` holds the
        stripped OCR text and ``image_path`` the saved file location.
    """
    chunks = []
    page = doc.load_page(page_num)

    # Only the file name of the PDF goes into the image name. If pdf_name is
    # a path (for example '/content/report.pdf') the separators would make
    # os.path.join() drop image_dir and write outside the output directory.
    pdf_label = os.path.basename(pdf_name)

    for img_index, img in enumerate(page.get_images(full=True)):
        # Extract image bytes by cross-reference number
        xref = img[0]
        base_image = doc.extract_image(xref)
        image_bytes = base_image['image']

        # Save image
        img_name = f"{pdf_label}_p{page_num+1}_i{img_index+1}.{base_image['ext']}"
        img_path = os.path.join(image_dir, img_name)
        with open(img_path, 'wb') as f:
            f.write(image_bytes)

        # Perform OCR on the in-memory bytes
        pil_image = Image.open(io.BytesIO(image_bytes))
        ocr_text = pytesseract.image_to_string(pil_image)

        chunks.append({
            'source': pdf_name,
            'page': page_num + 1,
            'type': 'image',
            'content': ocr_text.strip(),
            'image_path': img_path
        })

    return chunks

def create_text_chunks(chunks, pdf_name, page_num):
    """Wrap plain text chunks of one page in metadata records.

    Each record carries the source PDF name, the one-based page number,
    ``type == 'text'``, the chunk itself and no image path.
    """
    page_label = page_num + 1
    records = []
    for piece in chunks:
        records.append({
            'source': pdf_name,
            'page': page_label,
            'type': 'text',
            'content': piece,
            'image_path': None
        })
    return records

def save_chunks_to_csv(chunks, output_path):
    """Write the chunk records to ``output_path`` as CSV.

    The column names are taken from the first record; with no records the
    header is written from an empty field list.
    """
    if chunks:
        fieldnames = chunks[0].keys()
    else:
        fieldnames = []

    with open(output_path, 'w', newline='') as handle:
        dict_writer = csv.DictWriter(handle, fieldnames=fieldnames)
        dict_writer.writeheader()
        for record in chunks:
            dict_writer.writerow(record)

# List of PDF files to process
# PDFs to process. These entries are absolute paths, so when process_pdfs()
# joins them onto pdf_dir with os.path.join() the pdf_dir prefix is ignored
# and the files are read from /content directly.
# NOTE(review): confirm whether the files are meant to come from /content or
# from the Drive folder configured in pdf_dir.
pdf_files = [
    '/content/1. Annual Report 2023-24.pdf',
    '/content/2. financials.pdf',
    '/content/3. FYP-Handbook-2023.pdf'
]

# Run the extraction; writes metadata.csv into output_dir.
process_pdfs(pdf_files)

print("Processing complete. Check output directory for results.")



Mounted at /content/drive




Processing complete. Check output directory for results.


In [3]:
!pip install sentence-transformers faiss-cpu transformers torch pillow

import faiss
import numpy as np
import csv
from PIL import Image
from sentence_transformers import SentenceTransformer
from transformers import CLIPProcessor, CLIPModel
import torch

# Load the embedding models once for this cell:
#   text_encoder   - sentence-transformers model used for text and table chunks
#   clip_model /
#   clip_processor - CLIP model and its preprocessor used for extracted images
text_encoder = SentenceTransformer('all-mpnet-base-v2')
clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

# Input and output locations. metadata_path is the CSV written by the
# extraction cell; the other three files are produced by process_embeddings().
# Note: `os` is not imported in this cell; it comes from the earlier cell.
metadata_path = '/content/output/metadata.csv'
output_dir = '/content/output'
text_index_path = os.path.join(output_dir, 'text_faiss.index')
image_index_path = os.path.join(output_dir, 'image_faiss.index')
metadata_map_path = os.path.join(output_dir, 'metadata_mapping.npz')

def create_faiss_index(dimension):
    """Return a new, empty flat L2 FAISS index for vectors of size ``dimension``."""
    flat_l2_index = faiss.IndexFlatL2(dimension)
    return flat_l2_index

# Module-level FAISS indices that process_embeddings() fills and saves.
# The dimensions must match the embedding size of the model feeding each index.
text_index = create_faiss_index(768)  # Sentence-BERT dimension
image_index = create_faiss_index(512)  # CLIP dimension

def process_embeddings():
    """Embed every chunk listed in metadata.csv and persist the FAISS indices.

    Text and table rows are embedded with ``text_encoder``; image rows are
    embedded with CLIP from the saved image file. The vectors are stored in
    the module-level ``text_index`` / ``image_index``, which are written to
    ``text_index_path`` / ``image_index_path``. The metadata of each vector is
    saved, in the same order as the vectors, to ``metadata_map_path``.

    Images that cannot be read or embedded are reported and skipped, so that
    one bad image does not abort the whole run.

    Returns:
        None.
    """
    text_metadata = []
    image_metadata = []
    text_contents = []
    image_embeddings = []

    # newline='' is what the csv module expects; it keeps line breaks that
    # are embedded in quoted fields (chunk text) intact.
    with open(metadata_path, 'r', newline='') as f:
        reader = csv.DictReader(f)
        for row in reader:
            record = {
                'source': row['source'],
                'page': row['page'],
                'type': row['type'],
                'content': row['content'],  # OCR text for image rows
                'image_path': row['image_path']
            }

            # Text content (text chunks and tables) is embedded in one batch
            # after the file has been read.
            if row['type'] in ['text', 'table']:
                text_contents.append(row['content'])
                text_metadata.append(record)

            # Image content
            elif row['type'] == 'image':
                try:
                    # The context manager closes the image file; convert()
                    # loads the pixel data before that happens.
                    with Image.open(row['image_path']) as image:
                        inputs = clip_processor(images=image.convert('RGB'),
                                                return_tensors="pt")
                    with torch.no_grad():
                        features = clip_model.get_image_features(**inputs)
                    embedding = features.cpu().numpy()[0].astype('float32')
                    image_embeddings.append(embedding)
                    image_metadata.append(record)
                except Exception as e:
                    print(f"Error processing image {row['image_path']}: {str(e)}")

    # Start from empty indices so that re-running this function does not add
    # the same vectors twice and shift ids away from the saved metadata.
    text_index.reset()
    image_index.reset()

    # Add embeddings to FAISS indices
    if text_contents:
        text_embeddings = text_encoder.encode(text_contents)
        text_index.add(np.asarray(text_embeddings, dtype='float32'))
    if image_embeddings:
        image_index.add(np.array(image_embeddings, dtype='float32'))

    # Save indices and metadata
    faiss.write_index(text_index, text_index_path)
    faiss.write_index(image_index, image_index_path)

    # Save metadata mappings (lists of dicts are stored as pickled objects)
    np.savez_compressed(metadata_map_path,
                        text_metadata=text_metadata,
                        image_metadata=image_metadata)

def load_indices():
    """Read the saved text index, image index and metadata archive from disk.

    Returns:
        Tuple ``(text_index, image_index, metadata)`` loaded from
        ``text_index_path``, ``image_index_path`` and ``metadata_map_path``.
    """
    # NOTE: allow_pickle=True unpickles stored objects; only load archives
    # that were written by this notebook.
    return (
        faiss.read_index(text_index_path),
        faiss.read_index(image_index_path),
        np.load(metadata_map_path, allow_pickle=True),
    )

# Build the embeddings and write both FAISS indices plus the metadata
# mapping to disk. Later cells read these files back.
process_embeddings()

print("Embedding and indexing complete. FAISS indices and metadata saved in:", output_dir)

Collecting faiss-cpu
  Downloading faiss_cpu-1.11.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (4.8 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch)
  Downloading nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_x86_64.whl.meta

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md:   0%|          | 0.00/10.4k [00:00<?, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/571 [00:00<?, ?B/s]

Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


model.safetensors:   0%|          | 0.00/438M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/363 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/239 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/4.19k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/605M [00:00<?, ?B/s]

Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.


preprocessor_config.json:   0%|          | 0.00/316 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/605M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/592 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/862k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/525k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/2.22M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/389 [00:00<?, ?B/s]

The channel dimension is ambiguous. Got image shape (3, 2, 3). Assuming channels are the first dimension.
The channel dimension is ambiguous. Got image shape (3, 2, 3). Assuming channels are the first dimension.
The channel dimension is ambiguous. Got image shape (3, 2, 3). Assuming channels are the first dimension.
The channel dimension is ambiguous. Got image shape (3, 2, 3). Assuming channels are the first dimension.
The channel dimension is ambiguous. Got image shape (3, 2, 3). Assuming channels are the first dimension.
The channel dimension is ambiguous. Got image shape (3, 2, 3). Assuming channels are the first dimension.
The channel dimension is ambiguous. Got image shape (3, 2, 3). Assuming channels are the first dimension.
The channel dimension is ambiguous. Got image shape (3, 2, 3). Assuming channels are the first dimension.
The channel dimension is ambiguous. Got image shape (3, 2, 3). Assuming channels are the first dimension.
The channel dimension is ambiguous. Got image 

Embedding and indexing complete. FAISS indices and metadata saved in: /content/output


In [4]:
!pip install gradio

import gradio as gr
import numpy as np
import faiss
from PIL import Image
import pytesseract

class MultimodalRetriever:
    """Nearest-neighbour search over the saved text and image FAISS indices.

    The indices and the metadata mapping are read from ``/content/output``.
    Both searches return a list of result dicts with the keys ``type``,
    ``content``, ``source``, ``page``, ``score`` and ``preview``. ``score`` is
    the FAISS distance formatted as a string (smaller means closer) and
    ``preview`` is an opened PIL image for image hits, ``None`` otherwise.
    """

    def __init__(self):
        # Load indices and metadata
        self.text_index = faiss.read_index('/content/output/text_faiss.index')
        self.image_index = faiss.read_index('/content/output/image_faiss.index')
        # allow_pickle is required because the metadata is stored as pickled
        # Python dicts; only load archives written by this notebook.
        metadata = np.load('/content/output/metadata_mapping.npz', allow_pickle=True)
        self.text_metadata = metadata['text_metadata'].tolist()
        self.image_metadata = metadata['image_metadata'].tolist()

        # Load models
        self.text_encoder = SentenceTransformer('all-mpnet-base-v2')
        self.clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
        self.clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

    def search_text(self, query, top_k=3):
        """Search both indices with a text query.

        Args:
            query: query string.
            top_k: number of neighbours requested from each index.

        Returns:
            Formatted results (see class docstring).
        """
        # Text-to-text search
        text_embed = self.text_encoder.encode([query])
        text_scores, text_indices = self.text_index.search(text_embed.astype('float32'), top_k)

        # Text-to-image search. truncation=True keeps long queries within the
        # maximum input length of the CLIP text model.
        inputs = self.clip_processor(text=query, return_tensors="pt",
                                     padding=True, truncation=True)
        with torch.no_grad():
            text_features = self.clip_model.get_text_features(**inputs)
        image_embed = text_features.cpu().numpy().astype('float32')
        image_scores, image_indices = self.image_index.search(image_embed, top_k)

        return self._format_results(text_indices[0], image_indices[0],
                                    text_scores[0], image_scores[0])

    def search_image(self, image_path, top_k=3):
        """Search both indices with an image query.

        The image is embedded with CLIP for the image index, and its OCR text
        is embedded for the text index.

        Args:
            image_path: path of the query image.
            top_k: number of neighbours requested from each index.

        Returns:
            Formatted results (see class docstring).
        """
        with Image.open(image_path) as image:
            # Image-to-image search
            inputs = self.clip_processor(images=image, return_tensors="pt")
            with torch.no_grad():
                image_features = self.clip_model.get_image_features(**inputs)
            image_embed = image_features.cpu().numpy().astype('float32')
            image_scores, image_indices = self.image_index.search(image_embed, top_k)

            # Image-to-text search via OCR
            ocr_text = pytesseract.image_to_string(image)

        text_embed = self.text_encoder.encode([ocr_text])
        text_scores, text_indices = self.text_index.search(text_embed.astype('float32'), top_k)

        return self._format_results(text_indices[0], image_indices[0],
                                    text_scores[0], image_scores[0])

    @staticmethod
    def _truncate(text, limit=500):
        """Shorten text to ``limit`` characters, marking the cut with '...'."""
        if len(text) <= limit:
            return text
        return text[:limit] + "..."

    @staticmethod
    def _score_sort_key(score):
        """Numeric sort key for a score string; unknown scores sort last."""
        try:
            return float(score)
        except (TypeError, ValueError):
            return float('inf')

    def _collect_hits(self, indices, scores, metadata, with_preview):
        """Turn one index's search output into result dicts."""
        hits = []
        for rank, idx in enumerate(indices):
            idx = int(idx)
            # FAISS pads missing neighbours with -1; a negative value must not
            # be used as a Python index (it would select the last entry).
            if not 0 <= idx < len(metadata):
                continue
            meta = metadata[idx]
            if scores is None:
                score = f"{meta.get('score', 'N/A')}"
            else:
                score = f"{float(scores[rank])}"
            hits.append({
                'type': meta['type'],
                'content': self._truncate(meta['content']),
                'source': meta['source'],
                'page': meta['page'],
                'score': score,
                'preview': Image.open(meta['image_path']) if with_preview else None
            })
        return hits

    def _format_results(self, text_indices, image_indices,
                        text_scores=None, image_scores=None):
        """Format results with source references.

        Args:
            text_indices: row ids returned by the text index.
            image_indices: row ids returned by the image index.
            text_scores: distances matching ``text_indices`` (optional).
            image_scores: distances matching ``image_indices`` (optional).

        Returns:
            Result dicts sorted by type and then by ascending score. Without
            scores the ``score`` field falls back to the metadata value or
            ``'N/A'``, and such entries sort after scored ones.
        """
        results = self._collect_hits(text_indices, text_scores,
                                     self.text_metadata, with_preview=False)
        results += self._collect_hits(image_indices, image_scores,
                                      self.image_metadata, with_preview=True)

        # Sort by type and score
        return sorted(
            results,
            key=lambda x: (x['type'], self._score_sort_key(x['score']))
        )

def search_interface(query, image=None):
    """Gradio callback: run a text or image search and return Markdown.

    Args:
        query: text query; used when no image is supplied.
        image: uploaded file, either an object with a ``name`` attribute
            holding the path or the path itself. Takes precedence over
            ``query``.

    Returns:
        Markdown string with one entry per result, or a hint when neither a
        query nor an image was given.
    """
    # Validate the inputs before loading anything expensive.
    if image is None and not query:
        return "Please enter text or upload an image"

    # Loading the indices and models is slow, so the retriever is created on
    # first use and kept on the function object for later calls.
    retriever = getattr(search_interface, "_retriever", None)
    if retriever is None:
        retriever = MultimodalRetriever()
        search_interface._retriever = retriever

    if image is not None:
        results = retriever.search_image(getattr(image, "name", image))
    else:
        results = retriever.search_text(query)

    # Entries are built without leading indentation: Markdown renders lines
    # that start with four or more spaces as a code block.
    output = []
    for result in results:
        parts = [
            f"**Type**: {result['type'].upper()} | **Source**: {result['source']} (Page {result['page']})",
            f"**Content**: {result['content']}",
        ]
        if result['preview']:
            parts.append(f"![Preview]({result['preview'].filename})")
        output.append("\n\n".join(parts))

    return "\n\n".join(output)

# Gradio UI for the retrieval-only search: a text box and a file upload feed
# search_interface(), whose return value is shown as Markdown.
interface = gr.Interface(
    fn=search_interface,
    inputs=[gr.Text(label="Text Query"), gr.File(label="Image Query")],
    outputs=gr.Markdown(),
    title="Multimodal Document Search",
    description="Search financial documents using text or images"
)

# Launch the interface. share=True requests a public share link in addition
# to the local server.
interface.launch(share=True)

Collecting gradio
  Downloading gradio-5.28.0-py3-none-any.whl.metadata (16 kB)
Collecting aiofiles<25.0,>=22.0 (from gradio)
  Downloading aiofiles-24.1.0-py3-none-any.whl.metadata (10 kB)
Collecting fastapi<1.0,>=0.115.2 (from gradio)
  Downloading fastapi-0.115.12-py3-none-any.whl.metadata (27 kB)
Collecting ffmpy (from gradio)
  Downloading ffmpy-0.5.0-py3-none-any.whl.metadata (3.0 kB)
Collecting gradio-client==1.10.0 (from gradio)
  Downloading gradio_client-1.10.0-py3-none-any.whl.metadata (7.1 kB)
Collecting groovy~=0.1 (from gradio)
  Downloading groovy-0.1.2-py3-none-any.whl.metadata (6.1 kB)
Collecting pydub (from gradio)
  Downloading pydub-0.25.1-py2.py3-none-any.whl.metadata (1.4 kB)
Collecting python-multipart>=0.0.18 (from gradio)
  Downloading python_multipart-0.0.20-py3-none-any.whl.metadata (1.8 kB)
Collecting ruff>=0.9.3 (from gradio)
  Downloading ruff-0.11.7-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (25 kB)
Collecting safehttpx<0.2.0,>=0.1.6



In [5]:
!pip install transformers torch accelerate bitsandbytes

from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import openai
import textwrap

class ResponseGenerator:
    """Retrieval-augmented answer generator backed by Mistral or OpenAI.

    ``generate_answer`` retrieves context with a ``MultimodalRetriever``,
    builds a prompt for the chosen prompting strategy and asks the selected
    model for an answer.
    """

    def __init__(self, model_choice="mistral"):
        """Load the retriever and prepare the selected model.

        Args:
            model_choice: ``"mistral"`` (local 4-bit model) or ``"openai"``
                (API key read from the ``OPENAI_API_KEY`` environment
                variable).
        """
        self.model_choice = model_choice
        self.retriever = MultimodalRetriever()

        if model_choice == "mistral":
            self.tokenizer = AutoTokenizer.from_pretrained("mistralai/Mistral-7B-Instruct-v0.3")
            self.model = AutoModelForCausalLM.from_pretrained(
                "mistralai/Mistral-7B-Instruct-v0.3",
                device_map="auto",
                load_in_4bit=True,
                torch_dtype=torch.float16
            )
        elif model_choice == "openai":
            openai.api_key = os.getenv('OPENAI_API_KEY')

        self.prompt_template = """{strategy}
Context from documents:
{context}

Question: {question}
{examples}
Answer:"""

    def generate_answer(self, query, image=None, strategy="cot"):
        """Retrieve context and generate an answer.

        Args:
            query: the user's question.
            image: optional uploaded image (object with ``name`` or a path);
                when given, retrieval is driven by the image.
            strategy: ``"cot"``, ``"fewshot"`` or ``"zero"``; any other value
                produces a prompt without strategy text.

        Returns:
            Tuple ``(answer, results)`` with the generated text and the
            retrieval results.

        Raises:
            ValueError: if ``model_choice`` is not a supported model.
        """
        if self.model_choice not in ("mistral", "openai"):
            raise ValueError(f"Unsupported model_choice: {self.model_choice!r}")

        # Retrieve relevant information
        if image:
            results = self.retriever.search_image(getattr(image, "name", image), top_k=5)
        else:
            results = self.retriever.search_text(query, top_k=5)

        # Prepare context
        context = self._prepare_context(results)

        # Create prompt with selected strategy
        prompt = self._create_prompt(query, context, strategy)

        # Generate response
        if self.model_choice == "mistral":
            return self._generate_local(prompt), results
        return self._generate_openai(prompt), results

    def _prepare_context(self, results):
        """Join the first three results into a context string for the prompt."""
        context = []
        for res in results[:3]:  # Use top 3 results
            if res['type'] == 'image':
                context.append(f"Image from {res['source']} page {res['page']}: {res['content']}")
            else:
                context.append(f"Text from {res['source']} page {res['page']}: {res['content']}")
        return "\n\n".join(context)

    def _create_prompt(self, query, context, strategy):
        """Fill the prompt template for the given strategy.

        Instruction-style strategies go into the ``{strategy}`` slot at the
        top of the prompt; the few-shot examples go into the ``{examples}``
        slot after the question. Each text is inserted exactly once.
        """
        instructions = {
            "cot": "Let's think step by step. Break down the question and use the context to build the answer.",
            "zero": "Answer the question using only the provided context."
        }
        few_shot_examples = "Examples of good answers:\n1. Q: What was 2023 revenue? A: From the annual report, 2023 revenue was $1.2B\n2. Q: Explain the chart about growth? A: The chart shows 15% YoY growth\n"

        return textwrap.dedent(self.prompt_template).format(
            strategy=instructions.get(strategy, ""),
            context=context,
            question=query,
            examples=few_shot_examples if strategy == "fewshot" else ""
        )

    def _generate_local(self, prompt):
        """Generate with the local model and return only the new text."""
        # Follow the device the model was placed on instead of assuming CUDA.
        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)
        outputs = self.model.generate(**inputs, max_new_tokens=1000)
        # generate() returns prompt + continuation; drop the prompt tokens so
        # the answer does not repeat the whole prompt.
        prompt_length = inputs["input_ids"].shape[1]
        return self.tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True).strip()

    def _generate_openai(self, prompt):
        """Generate with the OpenAI chat completions API."""
        request = dict(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3
        )
        if hasattr(openai, "OpenAI"):
            # openai>=1.0 removed openai.ChatCompletion.
            response = openai.chat.completions.create(**request)
        else:
            response = openai.ChatCompletion.create(**request)
        return response.choices[0].message.content

def full_interface(query, image=None, strategy="cot"):
    """Gradio callback: answer a question and list the sources as Markdown.

    Args:
        query: the user's question.
        image: optional uploaded image.
        strategy: prompt strategy name; ``None`` (nothing selected in the
            dropdown) falls back to ``"cot"``.

    Returns:
        Markdown string with the generated answer and its source references.
    """
    # Building a ResponseGenerator loads the retriever and the language
    # model, so it is created once and kept on the function object.
    # NOTE: the name ResponseGenerator is looked up when this function runs;
    # a later cell that defines another class with the same name changes
    # which class is used here.
    generator = getattr(full_interface, "_generator", None)
    if generator is None:
        generator = ResponseGenerator()
        full_interface._generator = generator

    answer, sources = generator.generate_answer(query, image, strategy or "cot")

    # No leading indentation: Markdown treats lines indented by four or more
    # spaces as a code block.
    formatted_response = "\n\n".join([
        "**Generated Answer**:",
        str(answer),
        "**Source References**:",
        format_sources(sources),
    ])

    return formatted_response

def format_sources(sources):
    """Render at most the first three results as a Markdown bullet list."""
    bullets = []
    for entry in sources[:3]:
        kind = entry['type'].title()
        bullets.append(f"- {kind} from {entry['source']} (Page {entry['page']})")
    return "\n".join(bullets)

# Rebind `interface` to a UI for the generation pipeline: query text, image
# upload and a prompt-strategy dropdown feed full_interface().
# The dropdown choices match the strategy names handled by
# ResponseGenerator._create_prompt ("cot", "fewshot", "zero").
interface = gr.Interface(
    fn=full_interface,
    inputs=[
        gr.Text(label="Query"),
        gr.File(label="Image Upload"),
        gr.Dropdown(["cot", "fewshot", "zero"], label="Prompt Strategy")
    ],
    outputs=gr.Markdown(),
    title="Multimodal Financial Assistant",
    description="Ask questions about financial documents using text or images"
)

# share=True requests a public share link in addition to the local server.
interface.launch(share=True)

Collecting bitsandbytes
  Downloading bitsandbytes-0.45.5-py3-none-manylinux_2_24_x86_64.whl.metadata (5.0 kB)
Downloading bitsandbytes-0.45.5-py3-none-manylinux_2_24_x86_64.whl (76.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m76.1/76.1 MB[0m [31m11.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: bitsandbytes
Successfully installed bitsandbytes-0.45.5
Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://de4b238c6900172c12.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)




In [6]:
!pip install transformers torch accelerate bitsandbytes langchain openai

import torch
from transformers import pipeline, AutoTokenizer
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
import openai

class ResponseGenerator:
    """Answer generator with selectable model and prompt template.

    Note: this definition replaces any earlier class of the same name in the
    notebook namespace.
    """

    def __init__(self, model_choice="openai"):
        """Prepare the model and the prompt templates.

        Args:
            model_choice: ``"openai"`` or ``"mistral"``.
        """
        self.model_choice = model_choice
        self._load_model()
        self._create_prompt_templates()

    def _load_model(self):
        """Load selected LLM"""
        if self.model_choice == "mistral":
            self.tokenizer = AutoTokenizer.from_pretrained("mistralai/Mistral-7B-Instruct-v0.1")
            self.model = pipeline(
                "text-generation",
                model="mistralai/Mistral-7B-Instruct-v0.1",
                device_map="auto",
                torch_dtype=torch.bfloat16
            )
        elif self.model_choice == "openai":
            # Read the key from the environment instead of keeping a
            # credential (or a placeholder that can never work) in the code.
            openai.api_key = os.getenv('OPENAI_API_KEY')

    def _create_prompt_templates(self):
        """Create different prompt templates"""
        self.prompts = {
            'default': PromptTemplate(
                input_variables=["context", "question"],
                template="""Use the following context to answer the question.
                Context: {context}
                Question: {question}
                Answer:"""
            ),
            'chain_of_thought': PromptTemplate(
                input_variables=["context", "question"],
                template="""Analyze the question step by step using the context.
                Context: {context}
                Question: {question}
                Let's think step by step:"""
            ),
            'financial_analyst': PromptTemplate(
                input_variables=["context", "question"],
                template="""You are a financial analyst. Use these documents to answer:
                {context}
                Examples of good responses:
                - For revenue questions, compare quarterly figures
                - For chart questions, describe trends
                Question: {question}
                Professional Answer:"""
            )
        }

    def generate_response(self, context, question, strategy='default', sources=None):
        """Generate answer using specified prompting strategy

        Args:
            context: retrieved context text; only the first 15000 characters
                are used.
            question: the user's question.
            strategy: key of ``self.prompts``; ``None`` or an empty value
                selects ``'default'``.
            sources: optional iterable of source names or result dicts (with
                ``source`` and optionally ``page``) to cite below the answer.

        Returns:
            The generated answer, followed by a source line when ``sources``
            is given.

        Raises:
            KeyError: if ``strategy`` names an unknown template.
            ValueError: if ``model_choice`` is not a supported model.
        """
        formatted_prompt = self.prompts[strategy or 'default'].format(
            context=context[:15000],  # Truncate to model context window
            question=question
        )

        if self.model_choice == "mistral":
            response = self.model(
                formatted_prompt,
                max_new_tokens=500,
                do_sample=True,
                temperature=0.7,
                # Without this the pipeline returns prompt + completion.
                return_full_text=False
            )[0]['generated_text']
        elif self.model_choice == "openai":
            request = dict(
                model="gpt-4",
                messages=[{"role": "user", "content": formatted_prompt}],
                temperature=0.5
            )
            if hasattr(openai, "OpenAI"):
                # openai>=1.0 removed openai.ChatCompletion.
                response = openai.chat.completions.create(**request).choices[0].message.content
            else:
                response = openai.ChatCompletion.create(**request).choices[0].message.content
        else:
            raise ValueError(f"Unsupported model_choice: {self.model_choice!r}")

        return self._postprocess_response(response, sources)

    def _postprocess_response(self, response, sources=None):
        """Add source citations and clean output

        Only sources that were actually passed in are cited; no citation is
        invented when none are available.
        """
        cleaned = (response or "").strip()
        if not sources:
            return cleaned

        labels = []
        for item in sources:
            if isinstance(item, dict):
                label = str(item.get('source', ''))
                if item.get('page') not in (None, ''):
                    label += f" (page {item['page']})"
            else:
                label = str(item)
            if label and label not in labels:
                labels.append(label)

        if not labels:
            return cleaned
        return cleaned + "\n\nSources: [" + ", ".join(labels) + "]"

class EnhancedRetriever(MultimodalRetriever):
    """Retriever that also generates an answer from the retrieved context."""

    def __init__(self):
        super().__init__()
        self.response_gen = ResponseGenerator()

    def augmented_search(self, query, image=None, strategy='chain_of_thought'):
        """Retrieve results and generate an answer from them.

        Args:
            query: the user's question; also used for retrieval when no image
                is given.
            image: optional query image, either an upload object with a
                ``name`` attribute or a plain file path.
            strategy: prompt template name; ``None`` (no dropdown selection)
                falls back to ``'chain_of_thought'``.

        Returns:
            Tuple ``(answer, results)``.
        """
        # Get raw results from previous implementation
        if image:
            results = self.search_image(getattr(image, "name", image))
        else:
            results = self.search_text(query)

        # Prepare context for LLM
        context = "\n\n".join([
            f"{r['type']} from {r['source']} (page {r['page']}): {r['content']}"
            for r in results
        ])

        # Generate enhanced response
        answer = self.response_gen.generate_response(
            context=context,
            question=query,
            strategy=strategy or 'chain_of_thought'
        )

        return answer, results

def enhanced_interface(query, image=None, strategy='chain_of_thought'):
    """Gradio callback: answer a query and list references as Markdown.

    Args:
        query: the user's question.
        image: optional uploaded image.
        strategy: prompt template name; ``None`` falls back to
            ``'chain_of_thought'``.

    Returns:
        Markdown with the generated answer, up to three references and image
        previews, or a hint when neither a query nor an image was given.
    """
    # Validate the inputs before loading anything expensive.
    if image is None and not query:
        return "Please enter text or upload an image"

    # The retriever loads indices and models; build it once and reuse it.
    retriever = getattr(enhanced_interface, "_retriever", None)
    if retriever is None:
        retriever = EnhancedRetriever()
        enhanced_interface._retriever = retriever

    answer, results = retriever.augmented_search(
        query, image, strategy or 'chain_of_thought')

    output = f"**Generated Answer**:\n{answer}\n\n**References**:\n"
    for result in results[:3]:  # Show top 3 references
        output += f"- {result['type']} from {result['source']} (page {result['page']})\n"

    if any(r['preview'] for r in results):
        output += "\n**Preview**:\n" + "\n".join(
            f"![Image]({r['preview'].filename})"
            for r in results if r['preview']
        )

    return output

# Rebind `interface` to the enhanced UI. The dropdown choices are the keys of
# ResponseGenerator.prompts, which enhanced_interface() passes on as the
# prompting strategy.
interface = gr.Interface(
    fn=enhanced_interface,
    inputs=[
        gr.Text(label="Text Query"),
        gr.File(label="Image Query"),
        gr.Dropdown(choices=['chain_of_thought', 'financial_analyst', 'default'],
                   label="Prompting Strategy")
    ],
    outputs=gr.Markdown(),
    title="Enhanced Multimodal RAG System",
    description="Query documents with AI-powered analysis using different strategies"
)

# share=True requests a public share link in addition to the local server.
interface.launch(share=True)

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://da2643ee4e8065a45a.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)




In [7]:
!pip install scikit-learn matplotlib plotly nltk rouge-score

import time
import numpy as np
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE
from sklearn.decomposition import PCA
from nltk.translate.bleu_score import sentence_bleu
from rouge_score import rouge_scorer
import plotly.express as px
import pandas as pd

class RAGEvaluator:
    """Collects timing and answer-quality metrics for a retriever.

    The retriever must provide ``augmented_search``, ``text_encoder``,
    ``text_index`` / ``image_index`` and ``text_metadata`` /
    ``image_metadata``.
    """

    def __init__(self, retriever):
        self.retriever = retriever
        # One list per metric; entries are appended by evaluate_query().
        self.metrics = {
            'query_times': [],
            'similarity_scores': [],
            'rouge1_scores': [],
            'bleu_scores': [],
            'manual_ratings': []
        }

    @staticmethod
    def _index_vectors(index):
        """Return all vectors stored in a FAISS index as an (ntotal, d) array."""
        if index.ntotal == 0:
            return np.zeros((0, index.d), dtype='float32')
        return index.reconstruct_n(0, index.ntotal)

    @staticmethod
    def _project_3d(vectors):
        """Project vectors to three PCA components, zero-padding if needed."""
        projected = np.zeros((len(vectors), 3))
        if len(vectors) >= 2:
            n_components = min(3, vectors.shape[0], vectors.shape[1])
            projected[:, :n_components] = PCA(n_components=n_components).fit_transform(vectors)
        return projected

    def visualize_embeddings(self):
        """Create a 3D scatter plot of the stored text and image embeddings.

        The two indices hold vectors of different sizes, so they cannot be
        stacked and reduced together. Each set is reduced on its own and the
        results are drawn in one figure; positions are therefore comparable
        within a type only.
        """
        text_embeds = self._index_vectors(self.retriever.text_index)
        image_embeds = self._index_vectors(self.retriever.image_index)

        if len(text_embeds) + len(image_embeds) == 0:
            print("No embeddings to visualize.")
            return

        # Reduce dimensionality per modality
        reduced = np.vstack([
            self._project_3d(text_embeds),
            self._project_3d(image_embeds)
        ])

        # Create visualization
        types = ['text']*len(text_embeds) + ['image']*len(image_embeds)
        df = pd.DataFrame({
            'x': reduced[:,0],
            'y': reduced[:,1],
            'z': reduced[:,2],
            'type': types,
            'source': [m['source'] for m in self.retriever.text_metadata] +
                     [m['source'] for m in self.retriever.image_metadata]
        })

        fig = px.scatter_3d(df, x='x', y='y', z='z', color='type',
                          symbol='source', title="Embedding Space")
        fig.show()

    def evaluate_query(self, query, ground_truth=None, image=None):
        """Run full evaluation for a single query

        Args:
            query: the question to ask.
            ground_truth: optional reference answer; enables the similarity,
                ROUGE and BLEU metrics.
            image: optional query image.

        Returns:
            Tuple ``(eval_results, response, results)``. ``eval_results``
            always contains ``response_time``; with a ground truth it also
            contains ``cosine_sim``, ``rouge1``, ``rougeL`` and ``bleu``.
            The values are also recorded in ``self.metrics``.
        """
        start_time = time.time()

        # Run query
        if image:
            response, results = self.retriever.augmented_search(query, image)
        else:
            response, results = self.retriever.augmented_search(query)

        query_time = time.time() - start_time
        self.metrics['query_times'].append(query_time)

        # Calculate metrics
        eval_results = {'response_time': query_time}

        if ground_truth:
            # Semantic similarity: normalise explicitly so the value is a
            # cosine similarity whether or not the encoder normalises.
            response_embed = np.asarray(self.retriever.text_encoder.encode([response]))[0]
            gt_embed = np.asarray(self.retriever.text_encoder.encode([ground_truth]))[0]
            norm = np.linalg.norm(response_embed) * np.linalg.norm(gt_embed)
            cosine = float(np.dot(response_embed, gt_embed) / norm) if norm else 0.0
            eval_results['cosine_sim'] = cosine

            # Text generation metrics
            scorer = rouge_scorer.RougeScorer(['rouge1', 'rougeL'], use_stemmer=True)
            rouge = scorer.score(ground_truth, response)
            eval_results.update(rouge)

            eval_results['bleu'] = sentence_bleu(
                [ground_truth.split()],
                response.split(),
                weights=(0.5, 0.3, 0.2, 0)
            )

            # Record the values that generate_report() aggregates.
            self.metrics['similarity_scores'].append(cosine)
            self.metrics['rouge1_scores'].append(rouge['rouge1'].fmeasure)
            self.metrics['bleu_scores'].append(eval_results['bleu'])

        return eval_results, response, results

    def visualize_results(self, results):
        """Create interactive visualization of search results"""
        df = pd.DataFrame(results)
        if df.empty or 'score' not in df.columns:
            print("No retrieval results to visualize.")
            return

        # Scores arrive as strings and may be non-numeric (e.g. 'N/A').
        df['score'] = pd.to_numeric(df['score'], errors='coerce')
        df = df.dropna(subset=['score'])
        if df.empty:
            print("No numeric scores to visualize.")
            return

        fig = px.bar(df, x='score', y='source', color='type',
                    title="Retrieval Results Ranking",
                    hover_data=['content'])
        fig.show()

    def generate_report(self):
        """Create comprehensive performance report

        Returns:
            DataFrame with the columns ``Metric`` and ``Value``. Timing values
            are NaN while no query has been evaluated; the quality averages
            are included once at least one ground truth was evaluated.
        """
        times = self.metrics['query_times']
        report = {
            'avg_response_time': float(np.mean(times)) if times else float('nan'),
            'max_response_time': float(np.max(times)) if times else float('nan'),
            'min_response_time': float(np.min(times)) if times else float('nan'),
            'total_queries': len(times)
        }

        if self.metrics['similarity_scores']:
            report['avg_cosine_sim'] = float(np.mean(self.metrics['similarity_scores']))
        if self.metrics.get('rouge1_scores'):
            report['avg_rouge1'] = float(np.mean(self.metrics['rouge1_scores']))
        if self.metrics.get('bleu_scores'):
            report['avg_bleu'] = float(np.mean(self.metrics['bleu_scores']))

        return pd.DataFrame(report.items(), columns=['Metric', 'Value'])

def evaluation_interface(query, image=None, ground_truth=None):
    """Gradio callback: evaluate one query and return a Markdown report.

    Args:
        query: Text query, passed as the first argument to
            ``RAGEvaluator.evaluate_query``.
        image: Optional image input, passed through unchanged as the third
            argument.
        ground_truth: Optional reference answer, passed as the second argument.

    Returns:
        str: Markdown containing the response, a metrics table and a table of
        the retrieval results.
    """
    # NOTE(review): the evaluator is local to this call, so whatever it records
    # in its metrics is discarded on return; constructing EnhancedRetriever per
    # request is presumably also costly - consider sharing one instance.
    evaluator = RAGEvaluator(EnhancedRetriever())

    # Run evaluation
    metrics, response, results = evaluator.evaluate_query(query, ground_truth, image)

    # Create visualizations
    evaluator.visualize_embeddings()
    evaluator.visualize_results(results)

    metrics_table = pd.DataFrame(metrics.items(), columns=['Metric', 'Value']).to_markdown()
    results_table = pd.DataFrame(results).to_markdown()

    # Build the report flush-left. Markdown treats lines indented by four
    # spaces as a code block, so the previously indented template showed the
    # "##" headings as literal text, and only the first row of each multi-line
    # table carried the indent.
    report_lines = [
        "## Response:",
        "",
        str(response),
        "",
        "## Metrics:",
        "",
        metrics_table,
        "",
        "## Retrieval Results:",
        "",
        results_table,
        "",
    ]
    return "\n".join(report_lines)

# Evaluation tab: a text query, an optional image upload and an optional
# reference answer go in; evaluation_interface's Markdown report comes out.
eval_interface = gr.Interface(
    title="RAG System Evaluation",
    description="Test the system with queries and view performance metrics",
    fn=evaluation_interface,
    inputs=[
        gr.Text(label="Query"),
        gr.File(label="Image Query"),
        gr.Text(label="Ground Truth (optional)"),
    ],
    outputs=gr.Markdown(),
)

# Serve the main UI and the evaluation UI as two tabs of one app;
# share=True asks Gradio for a public link.
gr.TabbedInterface(
    [interface, eval_interface],
    tab_names=["Main Interface", "Evaluation"],
).launch(share=True)

Collecting rouge-score
  Downloading rouge_score-0.1.2.tar.gz (17 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: rouge-score
  Building wheel for rouge-score (setup.py) ... [?25l[?25hdone
  Created wheel for rouge-score: filename=rouge_score-0.1.2-py3-none-any.whl size=24934 sha256=19ff48e3809b696222240112e6d32f0bd1c4bc9241095de33d75afa38b42bdc5
  Stored in directory: /root/.cache/pip/wheels/1e/19/43/8a442dc83660ca25e163e1bd1f89919284ab0d0c1475475148
Successfully built rouge-score
Installing collected packages: rouge-score
Successfully installed rouge-score-0.1.2
Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://234d5e5073b8535540.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)




In [1]:
# Multimodal query handling in Gradio interface
def search_interface(query, image=None):
    """Dispatch a query to image search or text search and return the hits.

    Args:
        query: Text query; used only when no image is supplied.
        image: Optional upload whose ``.name`` attribute is handed to
            ``retriever.search_image`` (presumably the temp-file object a
            ``gr.File`` input provides - confirm).

    Returns:
        Whatever ``retriever.search_image`` / ``retriever.search_text`` returns.
        Previously the result was computed and then dropped, so the function
        always returned None.
    """
    if image:
        return retriever.search_image(image.name)
    return retriever.search_text(query)

In [2]:
# Registers a 'financial_analyst' prompt template: two example answers followed
# by the {context} and {question} placeholders.
# NOTE(review): run as a standalone cell this fails - the recorded output is
# "NameError: name 'PromptTemplate' is not defined", and `self` is not bound at
# module level either. Presumably the statement belongs inside a method of the
# class that owns `self.prompts`, with PromptTemplate imported first - confirm.
self.prompts['financial_analyst'] = PromptTemplate(
    input_variables=["context", "question"],
    template="""Examples:
    - EBITDA analysis: "Margins improved 2% despite rising costs"
    - Cash flow: "Operating CF increased by $1.5M YoY"
    Context: {context}
    Question: {question}"""
)

NameError: name 'PromptTemplate' is not defined

In [3]:
def visualize_results(results):
    """Display a bar chart of ``results``: score on x, source on y, coloured by type."""
    chart = px.bar(
        results,
        color='type',
        y='source',
        x='score',
    )
    chart.show()

In [4]:
# Main search UI: a text box and a file upload in, Markdown out.
# gr.Interface requires the callback as `fn`; it was missing here, so the call
# could not succeed even with gradio imported. Each example row must also
# supply one value per input component, in order (text, file).
# NOTE(review): the recorded NameError shows this cell ran before
# `import gradio as gr`; run the import cell first.
interface = gr.Interface(
    fn=search_interface,
    inputs=[gr.Text(), gr.File()],
    outputs=gr.Markdown(),
    examples=[
        ["What was the net profit margin in 2023?", None],
        ["", "./examples/chart_query.png"]
    ]
)

NameError: name 'gr' is not defined

In [5]:
# Start the Gradio app held in `interface`. This needs the cell that builds
# `interface` to have succeeded; the recorded NameError shows it had not.
interface.launch(share=True)  # Generates public URL

NameError: name 'interface' is not defined