# Multimodal Fusion Retrieval: Combining Multimodal Vector-store and Keyword Search

In this notebook, we implement a fusion retrieval system that combines the strengths of both:

**A) Semantic vector search, enhanced with multimodal context retrieval (retrieval based on both text and images)**

**B) Keyword-based [BM25](https://en.wikipedia.org/wiki/Okapi_BM25) retrieval**

This approach improves retrieval quality by capturing both conceptual similarity and exact keyword matches.

## Why Fusion Retrieval Matters

Traditional [RAG](https://en.wikipedia.org/wiki/Retrieval-augmented_generation) systems typically rely on vector search alone, mostly over text, but this has limitations:

- Text-only vector search may miss relevant information contained in images
- Vector search excels at semantic similarity but may miss exact keyword matches
- Keyword search is great for specific terms but lacks semantic understanding
- Different queries perform better with different retrieval methods: some depend on text, some on images, and some on exact keyword matches

Multimodal fusion retrieval gives us the best of both worlds by:

- Performing both vector-based and keyword-based retrieval
- Augmenting the vector index with image captions to improve retrieval quality
- Normalizing the scores from each approach
- Combining them with a weighted formula
- Ranking documents based on the combined score
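
Written out, the normalization and weighting steps above take the following form. This is a sketch that assumes min-max normalization and a single weight $\alpha \in [0, 1]$, as used by the `fusion_retrieval` function in this notebook. Here $v_i$ is the vector similarity of item $i$, $b_i$ is its BM25 score, and $\varepsilon$ is a small constant that guards against division by zero.

$$
\hat{v}_i = \frac{v_i - \min_j v_j}{\max_j v_j - \min_j v_j + \varepsilon},
\qquad
\hat{b}_i = \frac{b_i - \min_j b_j}{\max_j b_j - \min_j b_j + \varepsilon},
\qquad
\text{combined}_i = \alpha \, \hat{v}_i + (1 - \alpha) \, \hat{b}_i
$$

With $\alpha = 1$ the ranking is purely semantic, and with $\alpha = 0$ it is purely keyword-based.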

## Setting Up the Environment
We begin by importing necessary libraries.

In [None]:
import os
import numpy as np
from rank_bm25 import BM25Okapi
import fitz
import io
from PIL import Image
from openai import OpenAI
import re
import json
import time
import shutil
import tempfile
from sklearn.metrics.pairwise import cosine_similarity
import base64

## Setting Up the OpenAI API Client
We initialize the OpenAI client to generate embeddings and responses.

- You need an API key from https://platform.openai.com/, exposed as the `OPENAI_API_KEY` environment variable

In [None]:
# Initialize the OpenAI client with the base URL and API key
client = OpenAI(
    base_url="https://api.openai.com/v1/",
    api_key=os.getenv("OPENAI_API_KEY")  # Retrieve the API key from environment variables
)

## Document Processing Functions

- Basic cleaning of text


In [None]:
def clean_text(text):
    
    """
    Clean text by removing extra whitespace and special characters.
    
    Args:
        text (str): Input text
        
    Returns:
        str: Cleaned text
    """
    
    # Replace multiple whitespace characters (including newlines and tabs) with a single space
    text = re.sub(r'\s+', ' ', text)
    
    # Replace literal backslash sequences (the two characters '\' + 't' or 'n') that some extractions leave behind
    text = text.replace('\\t', ' ')
    text = text.replace('\\n', ' ')
    
    # Remove any leading or trailing whitespace and ensure single spaces between words
    text = ' '.join(text.split())
    
    return text

#### Multimodal content extraction pipeline

- Content is extracted page by page
- Image extraction is optional (controlled by `use_image`)
- Text and image data are kept separate

In [None]:
def extract_content_from_pdf(pdf_path, use_image = True, output_dir=None):
    
    """
    Extract both text and images from a PDF file.
    
    Args:
        pdf_path (str): Path to the PDF file
        use_image (bool): Whether to extract images in addition to text
        output_dir (str, optional): Directory to save extracted images
        
    Returns:
        Tuple[List[Dict], List[Dict]]: Text data and image data
    """
    
    # Create a temporary directory for images if not provided
    temp_dir = None
    if output_dir is None:
        temp_dir = tempfile.mkdtemp()
        output_dir = temp_dir
    else:
        os.makedirs(output_dir, exist_ok=True)
        
    text_data = []  # List to store extracted text data
    image_paths = []  # List to store paths of extracted images
    # text_only = ""
    print(f"Extracting content from {pdf_path}...")
    
    try:
        with fitz.open(pdf_path) as pdf_file:
            # Loop through every page in the PDF
            for page_number in range(len(pdf_file)):
                page = pdf_file[page_number]
                
                # Extract text from the page
                text = page.get_text().strip()
                # text_only += text
                if text:
                    text_data.append({
                        "content": text,
                        "metadata": {
                            "source": pdf_path,
                            "page": page_number + 1,
                            "type": "text"
                        }
                    })
                
                # Extract images from the page
                if not use_image:
                    continue
                image_list = page.get_images(full=True)
                for img_index, img in enumerate(image_list):
                    xref = img[0]  # XREF of the image
                    base_image = pdf_file.extract_image(xref)
                    
                    if base_image:
                        image_bytes = base_image["image"]
                        image_ext = base_image["ext"]
                        
                        # Save the image to the output directory
                        img_filename = f"page_{page_number+1}_img_{img_index+1}.{image_ext}"
                        img_path = os.path.join(output_dir, img_filename)
                        
                        with open(img_path, "wb") as img_file:
                            img_file.write(image_bytes)
                        
                        image_paths.append({
                            "path": img_path,
                            "metadata": {
                                "source": pdf_path,
                                "page": page_number + 1,
                                "image_index": img_index + 1,
                                "type": "image"
                            }
                        })
  
        print(f"Extracted {len(text_data)} text segments and {len(image_paths)} images")
        return text_data, image_paths
    
    except Exception as e:
        print(f"Error extracting content: {e}")
        if temp_dir and os.path.exists(temp_dir):
            shutil.rmtree(temp_dir)
        raise

## Chunking the Extracted Text
Once we have the extracted text, we divide it into smaller, overlapping chunks. Smaller pieces improve retrieval accuracy and reduce computational overhead, and the overlap keeps content that straddles a chunk boundary retrievable.

In [None]:
def chunk_text(text_data, chunk_size=1000, overlap=200):
    
    """
    Split text data into overlapping chunks.
    
    Args:
        text_data (List[Dict]): Text data extracted from PDF
        chunk_size (int): Size of each chunk in characters
        overlap (int): Overlap between chunks in characters
        
    Returns:
        List[Dict]: Chunked text data
    """
    
    chunked_data = []  # Initialize an empty list to store chunked data
    
    for item in text_data:
        text = item["content"]  # Extract the text content
        text = clean_text(text)
        metadata = item["metadata"]  # Extract the metadata
        
        # Keep short texts as a single chunk instead of splitting them
        if len(text) < chunk_size / 2:
            chunked_data.append({
                "content": text,
                "metadata": metadata
            })
            continue
        
        # Create chunks with overlap
        chunks = []
        for i in range(0, len(text), chunk_size - overlap):
            chunk = text[i:i + chunk_size]  # Extract a chunk of the specified size
            if chunk:  # Ensure we don't add empty chunks
                chunks.append(chunk)
        
        # Add each chunk with updated metadata
        for i, chunk in enumerate(chunks):
            chunk_metadata = metadata.copy()  # Copy the original metadata
            chunk_metadata["chunk_index"] = i  # Add chunk index to metadata
            chunk_metadata["chunk_count"] = len(chunks)  # Add total chunk count to metadata
            
            chunked_data.append({
                "content": chunk,  # The chunk text
                "metadata": chunk_metadata  # The updated metadata
            })
    
    print(f"Created {len(chunked_data)} text chunks")  # Print the number of created chunks
    return chunked_data  # Return the list of chunked data
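
To see which character ranges the sliding window above produces, the offsets can be computed on their own. This is a minimal sketch; `chunk_spans` is a hypothetical helper that is not part of the notebook, and the guard against `overlap >= chunk_size` is an added assumption (the original loop would fail or misbehave with a non-positive step).

```python
def chunk_spans(text_length, chunk_size=1000, overlap=200):
    """Return (start, end) character offsets for overlapping chunks."""
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    step = chunk_size - overlap  # distance between consecutive chunk starts
    return [
        (start, min(start + chunk_size, text_length))
        for start in range(0, text_length, step)
    ]


# A 2,500-character page with the notebook's default settings
spans = chunk_spans(2500)
for start, end in spans:
    print(f"chunk covers characters {start}..{end} ({end - start} chars)")
```

Note that the last span is a short tail that lies entirely inside the previous chunk. It is harmless for retrieval, but it does add one near-duplicate item to both the vector store and the BM25 index.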

## Image Processing Functions

- Encoding

In [None]:
def encode_image(image_path):
    
    """
    Encode an image file as base64.
    
    Args:
        image_path (str): Path to the image file
        
    Returns:
        str: Base64 encoded image
    """
    
    # Open the image file in binary read mode
    with open(image_path, "rb") as image_file:
        # Read the image file and encode it to base64
        encoded_image = base64.b64encode(image_file.read())
        # Decode the base64 bytes to a string and return
        return encoded_image.decode('utf-8')

#### Image caption generation

- We use the general-purpose gpt-4o model here; specialized vision models such as **llava-1.5-7b** are also worth trying

In [None]:
def generate_image_caption(image_path):
    
    """
    Generate a caption for an image using OpenAI's vision capabilities.
    
    Args:
        image_path (str): Path to the image file
        
    Returns:
        str: Generated caption
    """
    
    # Check that the file exists
    if not os.path.exists(image_path):
        return "Error: Image file not found"
    
    try:
        # Open and validate the image
        Image.open(image_path)
        
        # Encode the image to base64
        base64_image = encode_image(image_path)
        
        # Create the API request to generate the caption
        response = client.chat.completions.create(
            model="gpt-4o",  # General-purpose vision-capable model
            messages=[
                {
                    "role": "system",
                    "content": "You are an assistant specialized in describing images from academic papers. "
                    "Provide detailed captions for the image that capture key information. "
                    "If the image contains charts, tables, or diagrams, describe their content and purpose clearly. "
                    "Your caption should be optimized for future retrieval when people ask questions about this content."
                },
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": "Describe this image in detail, focusing on its academic content:"},
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{base64_image}"
                            }
                        }
                    ]
                }
            ],
            max_tokens=300
        )
        
        # Extract the caption from the response
        caption = response.choices[0].message.content
        return caption
    
    except Exception as e:
        # Return an error message if an exception occurs
        return f"Error generating caption: {str(e)}"
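
The request above labels every image as `image/jpeg`, while images extracted from a PDF may come in other formats such as PNG. One possible refinement is to derive the MIME type from the file extension. The sketch below uses only the standard library; `to_data_url` is a hypothetical helper, not part of the notebook, and the fallback type is an assumption.

```python
import base64
import mimetypes


def to_data_url(image_path, image_bytes):
    """Build a base64 data URL whose MIME type follows the file extension."""
    mime_type, _ = mimetypes.guess_type(image_path)
    if mime_type is None:
        # Assumption: fall back to a generic binary type for unknown extensions
        mime_type = "application/octet-stream"
    encoded = base64.b64encode(image_bytes).decode("utf-8")
    return f"data:{mime_type};base64,{encoded}"


# The bytes here are a placeholder, not a complete image file
example_url = to_data_url("page_1_img_1.png", b"\x89PNG")
print(example_url.split(",", 1)[0])
```

Whether a given vision model accepts every format that extraction can produce is something to check against that model's documentation.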

- Image data creation with metadata (source, page number, etc. of the image)

In [None]:
def process_images(image_paths):
    
    """
    Process all images and generate captions.
    
    Args:
        image_paths (List[Dict]): Paths to extracted images
        
    Returns:
        List[Dict]: Image data with captions
    """
    
    image_data = []  # Initialize an empty list to store image data with captions
    
    print(f"Generating captions for {len(image_paths)} images...")  # Print the number of images to process
    for i, img_item in enumerate(image_paths):
        print(f"Processing image {i+1}/{len(image_paths)}...")  # Print the current image being processed
        img_path = img_item["path"]  # Get the image path
        metadata = img_item["metadata"]  # Get the image metadata
        
        # Generate caption for the image
        caption = generate_image_caption(img_path)
        
        # Add the image data with caption to the list
        image_data.append({
            "content": caption,  # The generated caption
            "metadata": metadata,  # The image metadata
            "image_path": img_path  # The path to the image
        })
    
    return image_data  # Return the list of image data with captions

## Creating Embeddings for Text Chunks
Embeddings transform text into numerical vectors, which allow for efficient similarity search.

In [None]:
def create_embeddings(texts, model="text-embedding-3-large"):
    
    """
    Create embeddings for the given texts.
    
    Args:
        texts (str or List[str]): Input text(s)
        model (str): Embedding model name
        
    Returns:
        List[List[float]]: Embedding vectors
    """
    
    # Handle both string and list inputs
    input_texts = texts if isinstance(texts, list) else [texts]
    
    # Process in batches if needed (OpenAI API limits)
    batch_size = 100
    all_embeddings = []
    
    # Iterate over the input texts in batches
    for i in range(0, len(input_texts), batch_size):
        batch = input_texts[i:i + batch_size]  # Get the current batch of texts
        
        # Create embeddings for the current batch
        response = client.embeddings.create(
            model=model,
            input=batch
        )
        
        # Extract embeddings from the response
        batch_embeddings = [item.embedding for item in response.data]
        all_embeddings.extend(batch_embeddings)  # Add the batch embeddings to the list
    
    # If input was a string, return just the first embedding
    if isinstance(texts, str):
        return all_embeddings[0]
    
    # Otherwise return all embeddings
    return all_embeddings

## Creating Multimodal Vector Store

- A simplified, in-memory vector store implementation. A production application may require a persistent vector database holding the embeddings and lookup tables of the entire knowledge base for faster, more efficient retrieval

- The `MultiModalVectorStore` object consists of
   - chunk embeddings
   - contents/texts
   - metadata of elements


- Its `similarity_search` method finds the items most similar to a query embedding based on **cosine similarity**

In [None]:
class MultiModalVectorStore:
    
    """
    A simple vector store implementation for multi-modal content.
    """
    
    def __init__(self):
        # Initialize lists to store vectors, contents, and metadata
        self.vectors = []
        self.contents = []
        self.metadata = []
    
    def add_item(self, content, embedding, metadata=None):
       
        """
        Add an item to the vector store.
        
        Args:
            content (str): The content (text or image caption)
            embedding (List[float]): The embedding vector
            metadata (Dict, optional): Additional metadata
        """
        
        # Append the embedding vector, content, and metadata to their respective lists
        self.vectors.append(np.array(embedding))
        self.contents.append(content)
        self.metadata.append(metadata or {})
    
    def add_items(self, items, embeddings):
        
        """
        Add multiple items to the vector store.
        
        Args:
            items (List[Dict]): List of content items
            embeddings (List[List[float]]): List of embedding vectors
        """
        
        # Loop through items and embeddings and add each to the vector store
        for i, (item, embedding) in enumerate(zip(items, embeddings)):
            self.add_item(
                # text=item["content"],  # Extract text from item
                content=item["content"],
                embedding=embedding,
                metadata={**item.get("metadata", {}), "index": i}
            )
    
    def similarity_search(self, query_embedding, k=5):
        
        """
        Find the most similar items to a query embedding.
        
        Args:
            query_embedding (List[float]): Query embedding vector
            k (int): Number of results to return
            
        Returns:
            List[Dict]: Top k most similar items
        """
        
        # Return an empty list if there are no vectors in the store
        if not self.vectors:
            return []
        
        # Convert query embedding to numpy array
        query_vector = np.array(query_embedding)
        
        # Calculate similarities using cosine similarity
        similarities = []
        for i, vector in enumerate(self.vectors):
            similarity = np.dot(query_vector, vector) / (np.linalg.norm(query_vector) * np.linalg.norm(vector))
            similarities.append((i, similarity))
        
        # Sort by similarity (descending)
        similarities.sort(key=lambda x: x[1], reverse=True)
        
        # Return top k results
        results = []
        for i in range(min(k, len(similarities))):
            idx, score = similarities[i]
            results.append({
                # "text": self.contents[idx],  # Retrieve text by index
                "content": self.contents[idx],
                "metadata": self.metadata[idx],
                "similarity": float(score)  # Convert to float for JSON serialization
            })
        
        return results
    
    def get_all_documents(self):
        
        """
        Get all documents in the store.
        
        Returns:
            List[Dict]: All documents
        """
        
        return [{"text": content, "metadata": meta} for content, meta in zip(self.contents, self.metadata)]  # Combine texts and metadata
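
The cosine similarity used by `similarity_search` can be checked by hand on tiny vectors. The sketch below is a pure-Python illustration rather than the notebook's NumPy code; `cosine_similarity_py` is a hypothetical name, and the zero-vector guard is an added assumption (the class above would divide by zero in that case).

```python
import math


def cosine_similarity_py(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # assumption: treat a zero vector as unrelated to everything
    return dot / (norm_a * norm_b)


query = [1.0, 0.0]
docs = {
    "same direction": [2.0, 0.0],   # length differs, direction matches
    "orthogonal": [0.0, 3.0],
    "opposite": [-1.0, 0.0],
}

# Rank documents from most to least similar, as similarity_search does
ranked = sorted(docs, key=lambda name: cosine_similarity_py(query, docs[name]), reverse=True)
for name in ranked:
    print(name, cosine_similarity_py(query, docs[name]))
```

Because the measure depends only on direction, the vector of length 2 scores the same as a unit vector pointing the same way.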

## BM25 Implementation

- BM25, or Best Match 25, also known as Okapi BM25, is a ranking algorithm for information retrieval and search engines that determines a document's relevance to a given query and ranks documents based on their relevance scores.

In [None]:
def create_bm25_index(chunks):
    
    """
    Create a BM25 index from the given chunks.
    
    Args:
        chunks (List[Dict]): List of text chunks
        
    Returns:
        BM25Okapi: A BM25 index
    """
    
    # Extract text from each chunk
    texts = [chunk["content"] for chunk in chunks]
    
    # Tokenize each document by splitting on whitespace
    tokenized_docs = [text.split() for text in texts]
    
    # Create the BM25 index using the tokenized documents
    bm25 = BM25Okapi(tokenized_docs)
    
    # Print the number of documents in the BM25 index
    print(f"Created BM25 index with {len(texts)} documents")
    
    return bm25
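
To make the scoring behind the index less of a black box, here is a sketch of the textbook BM25 formula in plain Python. It is an illustration under stated assumptions, not the `rank_bm25` implementation: `bm25_scores` is a hypothetical function, the defaults `k1=1.5` and `b=0.75` are common choices, and the IDF uses a non-negative variant, so exact values may differ from what `BM25Okapi` returns.

```python
import math
from collections import Counter


def bm25_scores(query_tokens, tokenized_docs, k1=1.5, b=0.75):
    """Score every document against the query with textbook BM25."""
    n_docs = len(tokenized_docs)
    avg_len = sum(len(doc) for doc in tokenized_docs) / n_docs

    # Document frequency: in how many documents does each term occur?
    doc_freq = Counter()
    for doc in tokenized_docs:
        doc_freq.update(set(doc))

    scores = []
    for doc in tokenized_docs:
        term_freq = Counter(doc)
        score = 0.0
        for term in query_tokens:
            tf = term_freq.get(term, 0)
            if tf == 0:
                continue
            n_t = doc_freq[term]
            # Rare terms get a larger weight than common ones
            idf = math.log((n_docs - n_t + 0.5) / (n_t + 0.5) + 1.0)
            # Term frequency saturates (k1) and is length-normalized (b)
            length_norm = 1 - b + b * len(doc) / avg_len
            score += idf * tf * (k1 + 1) / (tf + k1 * length_norm)
        scores.append(score)
    return scores


docs = [
    "the transformer uses attention".split(),
    "convolution layers extract features".split(),
    "attention attention everywhere".split(),
]
scores = bm25_scores(["attention"], docs)
print(scores)
```

The document without the query term scores zero, and the shorter document that repeats the term ranks above the one that mentions it once.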

- Given the BM25 index, the chunked texts, and a search query, this function retrieves the top k context documents

In [None]:
def bm25_search(bm25, chunks, query, k=5):
    
    """
    Search the BM25 index with a query.
    
    Args:
        bm25 (BM25Okapi): BM25 index
        chunks (List[Dict]): List of text chunks
        query (str): Query string
        k (int): Number of results to return
        
    Returns:
        List[Dict]: Top k results with scores
    """
    
    # Tokenize the query by splitting it into individual words
    query_tokens = query.split()
    
    # Get BM25 scores for the query tokens against the indexed documents
    scores = bm25.get_scores(query_tokens)
    
    # Initialize an empty list to store results with their scores
    results = []
    
    # Iterate over the scores and corresponding chunks
    for i, score in enumerate(scores):
        # Create a copy of the metadata to avoid modifying the original
        metadata = chunks[i].get("metadata", {}).copy()
        # Add index to metadata
        metadata["index"] = i
        
        results.append({
            "text": chunks[i]["content"],
            "metadata": metadata,  # Add metadata with index
            "bm25_score": float(score)
        })
    
    # Sort the results by BM25 score in descending order
    results.sort(key=lambda x: x["bm25_score"], reverse=True)
    
    # Return the top k results
    return results[:k]
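
Both the index and the query above are tokenized with a plain `split()`, which keeps case and punctuation, so a query token such as `BM25?` will not match `BM25` in a document. A possible alternative is sketched below; `simple_tokenize` is a hypothetical helper, and its ASCII-only pattern is a simplifying assumption. Whatever tokenizer is chosen has to be applied to both the documents and the query.

```python
import re


def simple_tokenize(text):
    """Lowercase the text and keep runs of ASCII letters and digits."""
    return re.findall(r"[a-z0-9]+", text.lower())


question = "What is BM25?"
print(question.split())           # whitespace split keeps case and punctuation
print(simple_tokenize(question))  # normalized tokens
```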

## Fusion Retrieval Function

- Finally, we combine the merits of the multimodal vector-store results and the BM25 search results
- `combined_score` is computed using `alpha`, the weight split between the vector-store score and the BM25 score

In [None]:
def fusion_retrieval(query, chunks, vector_store, bm25_index, k=5, alpha=0.5):
    
    """
    Perform fusion retrieval combining multimodal vector-based and BM25 search.
    
    Args:
        query (str): Query string
        chunks (List[Dict]): Original text chunks
        vector_store (MultiModalVectorStore): Vector store
        bm25_index (BM25Okapi): BM25 index
        k (int): Number of results to return
        alpha (float): Weight for vector scores (0-1), where 1-alpha is BM25 weight
        
    Returns:
        List[Dict]: Top k results based on combined scores
    """
    
    print(f"Performing fusion retrieval for query: {query}")
    
    # Define small epsilon to avoid division by zero
    epsilon = 1e-8
    
    # Get vector search results
    query_embedding = create_embeddings(query)  # Create embedding for the query
    vector_results = vector_store.similarity_search(query_embedding, k=len(vector_store.contents))  # Score every stored item, including image captions
    
    # Separate text and image results
    text_results = [r for r in vector_results if r["metadata"].get("type") == "text"]
    image_results = [r for r in vector_results if r["metadata"].get("type") == "image"]
    
    print(f"Retrieved {len(vector_results)} relevant items ({len(text_results)} text, {len(image_results)} image captions)")

    # Get BM25 search results
    bm25_results = bm25_search(bm25_index, chunks, query, k=len(chunks))  # Perform BM25 search
    
    # Create dictionaries to map document index to score
    vector_scores_dict = {result["metadata"]["index"]: result["similarity"] for result in vector_results}
    bm25_scores_dict = {result["metadata"]["index"]: result["bm25_score"] for result in bm25_results}
    
    # Ensure all documents have scores for both methods
    all_docs = vector_store.get_all_documents()
    combined_results = []
    
    for i, doc in enumerate(all_docs):
        vector_score = vector_scores_dict.get(i, 0.0)  # Get vector score or 0 if not found
        bm25_score = bm25_scores_dict.get(i, 0.0)  # Get BM25 score or 0 if not found
        combined_results.append({
            "text": doc["text"],
            "metadata": doc["metadata"],
            "vector_score": vector_score,
            "bm25_score": bm25_score,
            "index": i
        })
    
    # Extract scores as arrays
    vector_scores = np.array([doc["vector_score"] for doc in combined_results])
    bm25_scores = np.array([doc["bm25_score"] for doc in combined_results])
    
    # Normalize scores
    norm_vector_scores = (vector_scores - np.min(vector_scores)) / (np.max(vector_scores) - np.min(vector_scores) + epsilon)
    norm_bm25_scores = (bm25_scores - np.min(bm25_scores)) / (np.max(bm25_scores) - np.min(bm25_scores) + epsilon)
    
    # Compute combined scores
    combined_scores = alpha * norm_vector_scores + (1 - alpha) * norm_bm25_scores
    
    # Add combined scores to results
    for i, score in enumerate(combined_scores):
        combined_results[i]["combined_score"] = float(score)
    
    # Sort by combined score (descending)
    combined_results.sort(key=lambda x: x["combined_score"], reverse=True)
    
    # Return top k results
    top_results = combined_results[:k]
    
    print(f"Retrieved {len(top_results)} documents with fusion retrieval")
    return top_results
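
The effect of `alpha` is easiest to see on a toy example. The sketch below re-implements the normalization and weighting in plain Python with made-up scores; `min_max` and `fuse` are hypothetical helpers and the numbers are illustrative only, not results from any document.

```python
def min_max(scores, epsilon=1e-8):
    """Rescale scores to roughly [0, 1]; epsilon avoids division by zero."""
    lo, hi = min(scores), max(scores)
    return [(s - lo) / (hi - lo + epsilon) for s in scores]


def fuse(vector_scores, bm25_scores, alpha):
    """Weighted sum of min-max normalized vector and BM25 scores."""
    v = min_max(vector_scores)
    b = min_max(bm25_scores)
    return [alpha * vi + (1 - alpha) * bi for vi, bi in zip(v, b)]


# Made-up scores for three items: item 0 wins on semantics, item 1 on keywords
vector_scores = [0.82, 0.80, 0.40]
bm25_scores = [0.0, 7.5, 1.2]

best_by_alpha = {}
for alpha in (0.0, 0.5, 1.0):
    combined = fuse(vector_scores, bm25_scores, alpha)
    best_by_alpha[alpha] = max(range(len(combined)), key=combined.__getitem__)
    print(f"alpha={alpha}: best item = {best_by_alpha[alpha]}")
```

One consequence of min-max normalization is that the gap between items 0 and 1 on the vector side (0.82 vs 0.80) stays small after rescaling, while an item with no keyword match drops to zero on the BM25 side. Image captions are in that position in this notebook, since the BM25 index is built from text chunks only.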

## Document Processing Pipeline

- Image information can optionally be included (controlled by `use_image`)

In [None]:
def process_document(pdf_path, use_image = True, chunk_size=1000, chunk_overlap=200):
    
    """
    Process a document for fusion retrieval.
    
    Args:
        pdf_path (str): Path to the PDF file
        use_image (bool): Whether to extract and caption images
        chunk_size (int): Size of each chunk in characters
        chunk_overlap (int): Overlap between chunks in characters
        
    Returns:
        Tuple[List[Dict], MultiModalVectorStore, BM25Okapi, Dict]: Chunks, vector store, BM25 index, and item counts
    """
    
    image_dir = "extracted_images"
    os.makedirs(image_dir, exist_ok=True)
    # Extract text and images from the PDF
    if use_image:
        text, image_paths = extract_content_from_pdf(pdf_path, use_image=True, output_dir=image_dir)
    
    else:
        text, image_paths = extract_content_from_pdf(pdf_path, use_image=False)

    # process the extracted images to generate captions
    image_data = process_images(image_paths)        
    
    # Split the cleaned text into overlapping chunks
    chunked_text = chunk_text(text, chunk_size, chunk_overlap)
    
    # Combine all content items (text chunks and image captions)
    all_items = chunked_text + image_data
    
    # Extract content for embedding
    contents = [item["content"] for item in all_items]
    
    # Create embeddings for all content
    print("Creating embeddings for all content...")
    embeddings = create_embeddings(contents)
    
    # Build the vector store and add items with their embeddings
    vector_store = MultiModalVectorStore()
    vector_store.add_items(all_items, embeddings)

    # Prepare document info with counts of text chunks and image captions
    doc_info = {
        "text_count": len(chunked_text),
        "image_count": len(image_data),
        "total_items": len(all_items),
    }
    
    # Print summary of added items
    print(f"Added {len(all_items)} items to vector store ({len(chunked_text)} text chunks, {len(image_data)} image captions)")
    
    # Create a BM25 index from the chunks
    bm25_index = create_bm25_index(chunked_text)
    
    # Return the chunks, vector store, BM25 index, and document info
    return chunked_text, vector_store, bm25_index, doc_info

## Response Generation

- Generate the final answer from the query, using the retrieved results as context.
- gpt-4o is used as the LLM; _Meta’s Llama family models_ are also worth exploring.
- Temperature also plays a crucial role in response generation: it controls the balance between exploration (randomness) and exploitation (determinism). A low temperature of 0.1 is used here.

In [None]:
def generate_response(query, results):
    
    """
    Generate a response based on the query and retrieved results.
    
    Args:
        query (str): User query
        results (List[Dict]): Retrieved content
        
    Returns:
        str: Generated response
    """
    
    # Format the context from the retrieved results
    context = ""
    
    for i, result in enumerate(results):
        print(result)
        # Determine the type of content (text or image caption)
        content_type = "Text" if result["metadata"].get("type") == "text" else "Image caption"
        # Get the page number from the metadata
        page_num = result["metadata"].get("page", "unknown")
        
        # Append the content type and page number to the context
        context += f"[{content_type} from page {page_num}]\n"
        # Append the actual content to the context
        context += result["text"]
        context += "\n\n"
    
    # System message to guide the AI assistant
    system_message = """You are an AI assistant specializing in answering questions about documents 
    that contain both text and images. You have been given relevant text passages and image captions 
    from the document. Use this information to provide a comprehensive, accurate response to the query.
    If information comes from an image or chart, mention this in your answer.
    If the retrieved information doesn't fully answer the query, acknowledge the limitations."""

    # User message containing the query and the formatted context
    user_message = f"""Query: {query}

    Retrieved content:
    {context}

    Please answer the query based on the retrieved content.
    """
    
    # Generate the response using the OpenAI API
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": system_message},
            {"role": "user", "content": user_message}
        ],
        temperature=0.1
    )
    
    # Return the generated response
    return response.choices[0].message.content

## Main Retrieval Function

- Combines the multimodal vector store and BM25-based search, and returns the final response along with the retrieved documents for user review.

## Comparing Retrieval Methods

### I compared 4 retrieval methods to assess the pros and cons of each

- `build_text_only_store` - text-only vector store and retrieval, without BM25

- `build_text_image_store` - text-and-image multimodal vector store and retrieval, without BM25

- `bm25_only_rag` - retrieval based only on BM25 search results

- `answer_with_multimodal_fusion_rag` - fusion-based RAG, combining multimodal vector-store and BM25 search and scoring
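
A side-by-side comparison can be driven by a small harness that runs each method on the same query. The sketch below is one possible shape, not the notebook's own evaluation code: `compare_methods` and `fake_method` are hypothetical, and the stand-in callables only mimic the `{"query", "retrieved_documents", "response"}` dictionaries that the RAG functions in this notebook return.

```python
def compare_methods(query, methods):
    """Run each retrieval method on the same query and summarize its output."""
    comparison = {}
    for name, run in methods.items():
        result = run(query)
        comparison[name] = {
            "response": result["response"],
            # Page numbers make it easy to see where each method looked
            "pages": [doc["metadata"].get("page") for doc in result["retrieved_documents"]],
        }
    return comparison


def fake_method(page):
    """Stand-in for a real RAG function; returns a fixed result for any query."""
    def run(query):
        return {
            "query": query,
            "retrieved_documents": [{"metadata": {"page": page}}],
            "response": f"answer based on page {page}",
        }
    return run


report = compare_methods(
    "What is BM25?",
    {"bm25_only": fake_method(3), "fusion": fake_method(7)},
)
for name, summary in report.items():
    print(name, summary)
```

With the real functions, each entry would wrap a call such as `lambda q: bm25_only_rag(q, chunks, bm25_index)`, so that every method receives the same query.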

In [None]:
def build_text_only_store(pdf_path, chunk_size=1000, chunk_overlap=200):
    
    """
    Build a text-only vector store for comparison.
    
    Args:
        pdf_path (str): Path to the PDF file
        chunk_size (int): Size of each chunk in characters
        chunk_overlap (int): Overlap between chunks in characters
        
    Returns:
        MultiModalVectorStore: Text-only vector store
    """
    
    # Extract text from PDF (reuse function but ignore images)
    text_data, _ = extract_content_from_pdf(pdf_path, False, None)
    
    # Chunk text
    chunked_text = chunk_text(text_data, chunk_size, chunk_overlap)
    
    # Extract content for embedding
    contents = [item["content"] for item in chunked_text]
    
    # Create embeddings
    print("Creating embeddings for text-only content...")
    embeddings = create_embeddings(contents)
    
    # Build vector store
    vector_store = MultiModalVectorStore()
    vector_store.add_items(chunked_text, embeddings)
    
    print(f"Added {len(chunked_text)} text items to text-only vector store")
    return vector_store

In [None]:
def build_text_image_store(pdf_path, chunk_size=1000, chunk_overlap=200):
    
    """
    Build a text-and-image (multimodal) vector store for comparison.
    
    Args:
        pdf_path (str): Path to the PDF file
        chunk_size (int): Size of each chunk in characters
        chunk_overlap (int): Overlap between chunks in characters
        
    Returns:
        MultiModalVectorStore: Vector store holding text chunks and image captions
    """
    
    # Extract text and images from the PDF (images are saved to a temporary directory)
    text_data, image_paths = extract_content_from_pdf(pdf_path, use_image=True)
    
    # Generate a caption for each extracted image
    image_data = process_images(image_paths)
    
    # Chunk text
    chunked_text = chunk_text(text_data, chunk_size, chunk_overlap)
    
    # Combine text chunks and image captions
    all_items = chunked_text + image_data
    
    # Extract content for embedding
    contents = [item["content"] for item in all_items]
    
    # Create embeddings
    print("Creating embeddings for text and image-caption content...")
    embeddings = create_embeddings(contents)
    
    # Build vector store
    vector_store = MultiModalVectorStore()
    vector_store.add_items(all_items, embeddings)
    
    print(f"Added {len(chunked_text)} text chunks and {len(image_data)} image captions to text-image vector store")
    return vector_store

In [None]:
def bm25_only_rag(query, chunks, bm25_index, k=5):
    
    """
    Answer a query using only BM25-based RAG.
    
    Args:
        query (str): User query
        chunks (List[Dict]): Text chunks
        bm25_index (BM25Okapi): BM25 index
        k (int): Number of documents to retrieve
        
    Returns:
        Dict: Query results
    """
    
    # Retrieve documents using BM25 search
    retrieved_docs = bm25_search(bm25_index, chunks, query, k=k)
    
    # Format the context from the retrieved documents by joining their text with separators
    # context = "\n\n---\n\n".join([doc["text"] for doc in retrieved_docs])
    
    # Generate a response based on the query and the formatted context
    response = generate_response(query, retrieved_docs)
    
    # Return the query, retrieved documents, and the generated response
    return {
        "query": query,
        "retrieved_documents": retrieved_docs,
        "response": response
    }

In [None]:
def answer_with_multimodal_fusion_rag(query, chunks, vector_store, bm25_index, k=5, alpha=0.5):
    
    """
    Answer a query using fusion RAG.
    
    Args:
        query (str): User query
        chunks (List[Dict]): Text chunks
        vector_store (SimpleVectorStore): Vector store
        bm25_index (BM25Okapi): BM25 index
        k (int): Number of documents to retrieve
        alpha (float): Weight for vector scores
        
    Returns:
        Dict: Query results including retrieved documents and response
    """
    
    # Retrieve documents using fusion retrieval method
    retrieved_docs = fusion_retrieval(query, chunks, vector_store, bm25_index, k=k, alpha=alpha)
    
    # Generate a response based on the query and the formatted context
    response = generate_response(query, retrieved_docs)
    
    # Return the query, retrieved documents, and the generated response
    return {
        "query": query,
        "retrieved_documents": retrieved_docs,
        "response": response
    }
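
As a rough illustration of how `alpha` weights the two retrievers, the sketch below assumes min-max normalization of each score list followed by a weighted sum. The helper names `min_max` and `fuse_scores` and the score values are hypothetical; the notebook's own `fusion_retrieval` may differ in detail.

```python
def min_max(scores, eps=1e-8):
    """Scale scores to roughly [0, 1]; eps avoids division by zero (assumed scheme)."""
    lo, hi = min(scores), max(scores)
    return [(s - lo) / (hi - lo + eps) for s in scores]


def fuse_scores(vector_scores, bm25_scores, alpha=0.5):
    """Weighted sum of normalized scores: alpha for vector, (1 - alpha) for BM25."""
    v = min_max(vector_scores)
    b = min_max(bm25_scores)
    return [alpha * vs + (1 - alpha) * bs for vs, bs in zip(v, b)]


# Hypothetical scores for three chunks, in the same chunk order for both retrievers
vector_scores = [0.82, 0.75, 0.40]
bm25_scores = [1.2, 7.5, 0.0]

combined = fuse_scores(vector_scores, bm25_scores, alpha=0.5)

# Chunk indices sorted from best to worst combined score
ranking = sorted(range(len(combined)), key=lambda i: combined[i], reverse=True)
print(ranking)
```

With `alpha=1.0` the ranking reduces to the vector ranking, and with `alpha=0.0` to the BM25 ranking, which is a quick sanity check when tuning the weight.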

 - Vector-only RAG: given a query and a vector store, retrieve the top `k` documents and generate a response from them.

In [None]:
def vector_only_rag(query, vector_store, k=5):
    
    """
    Answer a query using only vector-based RAG.
    
    Args:
        query (str): User query
        vector_store (SimpleVectorStore): Vector store
        k (int): Number of documents to retrieve
        
    Returns:
        Dict: Query results
    """
    
    # Create query embedding
    query_embedding = create_embeddings(query)
    
    # Retrieve documents using vector-based similarity search
    retrieved_docs = vector_store.similarity_search(query_embedding, k=k)
    print(f"Retrieved {len(retrieved_docs)} documents using vector search")
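    # Rename 'content' to 'text' so the documents carry a 'text' key like the BM25 results
    for doc in retrieved_docs:
        if "content" in doc:
            doc["text"] = doc.pop("content")
    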
    # Generate a response based on the query and the formatted context
    response = generate_response(query, retrieved_docs)
    
    # Return the query, retrieved documents, and the generated response
    return {
        "query": query,
        "retrieved_documents": retrieved_docs,
        "response": response
    }


## Evaluation Functions

### Evaluating the merits of the four retrieval strategies above

- There are several standard automatic metrics in NLP, such as BLEU, ROUGE, MRR, and BERTScore. For LLM-generated answers, however, a qualitative evaluation is often more relevant.
- Manually reviewing every generated answer and retrieved document would be too time-consuming and error-prone.
- I therefore created the following framework for qualitative evaluation:
  
  -  __LLM as Generator__: Instead of writing validation data by hand, an LLM generates ___n___ question-answer pairs from the PDF document using a structured prompt.
  
  (Note: the generator may produce a different set of question-answer pairs on each run. This is intentional: a randomized validation set guards against possible memorization of a fixed set of questions.)
  
  -  __LLM as Evaluator__: Instead of a human reviewer, an LLM evaluates the generated answers using a carefully designed prompt. The resulting evaluation describes the strengths and weaknesses of the answers to each validation query along dimensions such as __Relevance__, __Factual correctness__, __Completeness__, and __Clarity and coherence__, with respect to the corresponding reference answer.

In [None]:
def generate_validation_questions_anaswers(pdf_path, num_question_answers=10):
    
    """
    Generate question-answer pairs by analyzing the content of a PDF document.
    
    Args:
        pdf_path (str): path to the PDF file
        num_question_answers (int): Number of question-answer pairs to generate
        
    Returns:
        List[Dict]: List of question-answer pairs
    """
    
    system_prompt = """You are an expert on Retrieval-Augmented Generation (RAG) systems. Based on the provided contents from the document,
    generate a set of question-answer pairs. The questions should be relevant to the content and cover various aspects including the main idea, reasoning, and image interpretation.
    The questions should be strictly based on the content of the document.

    Generate 10 question-answer pairs:
    - 3 factual questions (one about hallucination in LLMs),
    - 3 analytical questions (critical thinking),
    - 4 image-based questions.
    Also categorize each question-answer pair as "factual", "analytical", or "image-based".
    Answers should be brief and to the point, and image questions should refer to the figures explicitly.

    Use exactly this Markdown layout so the output can be parsed, with one
    "### <Category> Questions" heading per category (Factual, Analytical, Image-Based)
    and numbered pairs beneath it:
    ### Factual Questions
    1. **Question:** <question text>
       - **Answer:** <answer text>"""

    # Extract text and captions
    doc = fitz.open(pdf_path)
    full_text = ""
    image_captions = []

    for page in doc:
        full_text += page.get_text()
        for block in page.get_text("dict")["blocks"]:
            if block["type"] == 0:
                for line in block["lines"]:
                    line_text = " ".join([span["text"] for span in line["spans"]])
                    if line_text.lower().startswith("figure"):
                        image_captions.append(line_text)

    doc.close()

    # Truncate the document text to the first 4000 characters to keep the prompt short
    truncated_text = full_text[:4000]

    user_prompt = f"""Document Text:
    {truncated_text}

    Image Captions:
    {chr(10).join(image_captions)}

    Generate {num_question_answers} question-answer pairs based on the above content.
    """
   
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ],
        temperature=0.1
    )
    
    # Extract the content from the response
    content = response.choices[0].message.content
    question_type = None
    qa_list = []

    sections = re.split(r"### (Factual|Analytical|Image-Based) Questions", content)

    # Skip the initial empty or heading text
    for i in range(1, len(sections), 2):
        question_type = sections[i].strip()
        section_text = sections[i + 1]

        # Find question-answer pairs
        qa_pairs = re.findall(
            r"\*\*Question:\*\*\s*(.*?)\n\s*-\s*\*\*Answer:\*\*\s*(.*?)(?=\n\d+\.|\Z)",
            section_text,
            flags=re.DOTALL
        )

        for question, answer in qa_pairs:
            qa_list.append({
                "question": question.strip(),
                "answer": answer.strip(),
                "type": question_type
            })
        
    # Save the question-answer pairs to a JSON file
    with open("data/val.json", "w") as f:
        json.dump(qa_list, f, indent=4)
    return qa_list
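
The parsing step above only works when the model returns a specific Markdown layout. The following is a minimal sketch of the same regular expressions run against a hand-written sample; the sample text and the helper name `parse_qa_sections` are illustrative assumptions, not output from the notebook's model.

```python
import re

# Hypothetical sample in the layout the parser expects
sample = """### Factual Questions
1. **Question:** What problem does RAG address?
   - **Answer:** Hallucination in LLM outputs.
2. **Question:** What does BM25 rank by?
   - **Answer:** Keyword overlap between query and chunk.

### Image-Based Questions
1. **Question:** What does Figure 1 show?
   - **Answer:** The RAG technology tree.
"""


def parse_qa_sections(content):
    """Split the text on category headings, then extract question-answer pairs."""
    sections = re.split(r"### (Factual|Analytical|Image-Based) Questions", content)
    qa_list = []
    # re.split with one capture group yields [preamble, type, body, type, body, ...]
    for i in range(1, len(sections), 2):
        pairs = re.findall(
            r"\*\*Question:\*\*\s*(.*?)\n\s*-\s*\*\*Answer:\*\*\s*(.*?)(?=\n\d+\.|\Z)",
            sections[i + 1],
            flags=re.DOTALL,
        )
        for question, answer in pairs:
            qa_list.append({
                "question": question.strip(),
                "answer": answer.strip(),
                "type": sections[i].strip(),
            })
    return qa_list


parsed = parse_qa_sections(sample)
for item in parsed:
    print(item["type"], "|", item["question"], "|", item["answer"])

# Text without the expected headings produces no pairs at all
unparsed = parse_qa_sections("Q: What is RAG?\nA: Retrieval-augmented generation.")
```

Because a response in any other layout yields an empty list, it is worth checking that the parsed list is non-empty before running the evaluation.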

In [None]:
def evaluate_responses(query, vector_response, vector_response_with_images, bm25_response, fusion_response, reference_answer=None):
    
    """
    Evaluate the responses from different retrieval methods.
    
    Args:
        query (str): User query
        vector_response (str): Response from vector-only RAG based on only text
        vector_response_with_images (str): Response from vector-only RAG based on text and image captions
        bm25_response (str): Response from BM25-only RAG
        fusion_response (str): Response from fusion RAG
        reference_answer (str, optional): Reference answer
        
    Returns:
        str: Evaluation of responses
    """
    
    # System prompt for the evaluator to guide the evaluation process
    system_prompt = """You are an expert evaluator of RAG systems. Compare responses from four different retrieval approaches:
    1. Vector-based retrieval (text only): Uses semantic similarity for document retrieval
    2. Vector-based retrieval with images: Uses semantic similarity over text and image captions
    3. BM25 keyword retrieval: Uses keyword matching for document retrieval
    4. Fusion retrieval: Combines both vector and keyword approaches

    Evaluate the responses based on:
    - Relevance to the query
    - Factual correctness
    - Comprehensiveness
    - Clarity and coherence
    """

    # User prompt containing the query and responses
    user_prompt = f"""Query: {query}

    Vector-based response:
    {vector_response}

    Vector-based response with images:
    {vector_response_with_images}

    BM25 keyword response:
    {bm25_response}

    Fusion response:
    {fusion_response}
    """

    # Add reference answer to the prompt if provided
    if reference_answer:
        user_prompt += f"""
            Reference answer:
            {reference_answer}
        """

    # Add instructions for detailed comparison to the user prompt
    user_prompt += """
    Please provide a detailed comparison of these four responses. Which approach performed best for this query and why?
    Be specific about the strengths and weaknesses of each approach for this particular query.
    At the end, state which retrieval approach is the best for the given query.
    """

    # Generate the evaluation using gpt-4o
    response = client.chat.completions.create(
        model="gpt-4o",  # Specify the model to use
        messages=[
            {"role": "system", "content": system_prompt},  # System message to guide the evaluator
            {"role": "user", "content": user_prompt}  # User message with query and responses
        ],
        temperature=0  # Set the temperature for response generation
    )
    
    # Return the generated evaluation content
    return response.choices[0].message.content
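
The evaluator prompt has to list every compared approach consistently, and hand-written numbering drifts easily when an approach is added. One way to keep the count and numbering in sync is to build the prompt from a mapping of labels to responses. This is a sketch only; the helper name `build_evaluation_prompt` and the labels are hypothetical and not part of the notebook.

```python
def build_evaluation_prompt(query, responses, reference_answer=None):
    """Build a user prompt from a dict mapping a method label to its answer.

    Dict insertion order determines the numbering of the responses.
    """
    lines = [f"Query: {query}", ""]
    for idx, (label, answer) in enumerate(responses.items(), start=1):
        lines += [f"{idx}. {label} response:", answer, ""]
    if reference_answer:
        lines += ["Reference answer:", reference_answer, ""]
    lines.append(
        f"Compare these {len(responses)} responses and state which approach "
        "performed best for this query and why."
    )
    return "\n".join(lines)


prompt = build_evaluation_prompt(
    "What problem does RAG address?",
    {
        "Vector (text only)": "answer A",
        "Vector (text + image captions)": "answer B",
        "BM25": "answer C",
        "Fusion": "answer D",
    },
    reference_answer="Hallucination.",
)
print(prompt)
```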

In [None]:
def compare_retrieval_methods(query, vector_store_with_text_only, vector_store_with_image_captions, chunks, bm25_index, k=5, alpha=0.5, reference_answer=None):
    
    """
    Compare different retrieval methods for a query.
    
    Args:
        query (str): User query
        vector_store_with_text_only: Vector store built from text only
        vector_store_with_image_captions: Multimodal vector store built from text and image data
        chunks (List[Dict]): Text chunks
        bm25_index (BM25Okapi): BM25 index
        k (int): Number of documents to retrieve
        alpha (float): Weight for vector scores in fusion retrieval
        reference_answer (str, optional): Reference answer for comparison
        
    Returns:
        Dict: Comparison results of different retrieval methods
    """
    
    print(f"\n=== Comparing retrieval methods for query: {query} ===\n")
    
    # Run vector-only RAG - only for text
    print("\nRunning vector-only RAG with text only...")
    vector_result = vector_only_rag(query, vector_store_with_text_only, k)

    # Run vector-only RAG - both text and image captions
    print("\nRunning vector-only RAG with text and image captions...")
    vector_result_with_images = vector_only_rag(query, vector_store_with_image_captions, k)
    
    # Run BM25-only RAG
    print("\nRunning BM25-only RAG...")
    bm25_result = bm25_only_rag(query, chunks, bm25_index, k)
    
    # Run fusion RAG
    print("\nRunning fusion RAG with text and image...")
    fusion_result = answer_with_multimodal_fusion_rag(query, chunks, vector_store_with_image_captions, bm25_index, k, alpha)
    
    # Compare responses from different retrieval methods
    print("\nComparing responses...")
    comparison = evaluate_responses(
        query, 
        vector_result["response"], 
        vector_result_with_images["response"],
        bm25_result["response"], 
        fusion_result["response"],
        reference_answer
    )
    
    # Return the comparison results
    return {
        "query": query,
        "vector_result": vector_result,
        "vector_result_with_images": vector_result_with_images,
        "bm25_result": bm25_result,
        "fusion_result": fusion_result,
        "comparison": comparison
    }

## Complete Evaluation Pipeline

- For every test query, we generate structured, comparative evaluation results for the four retrieval strategies.

In [None]:
def evaluate_fusion_retrieval(pdf_path, test_queries, reference_answers=None, k=5, alpha=0.5):
    
    """
    Evaluate fusion retrieval compared to other methods.
    
    Args:
        pdf_path (str): Path to the PDF file
        test_queries (List[str]): List of test queries
        reference_answers (List[str], optional): Reference answers
        k (int): Number of documents to retrieve
        alpha (float): Weight for vector scores in fusion retrieval
        
    Returns:
        Dict: Evaluation results
    """
    
    print("=== EVALUATING FUSION RETRIEVAL ===\n")
    
    # Process the document without images; only the text-only vector store is kept from this pass
    _, vector_store_wo_image, _, _ = process_document(pdf_path, False)

    # Process the document with images: extract content, create chunks, and build the vector and BM25 indices
    chunks, vector_store_with_image, bm25_index, _ = process_document(pdf_path)
    
    # Initialize a list to store results for each query
    results = []
    
    # Iterate over each test query
    for i, query in enumerate(test_queries):
        print(f"\n\n=== Evaluating Query {i+1}/{len(test_queries)} ===")
        print(f"Query: {query}")
        
        # Get the reference answer if available
        reference = None
        if reference_answers and i < len(reference_answers):
            reference = reference_answers[i]
        
        # Compare retrieval methods for the current query
        comparison = compare_retrieval_methods(
            query, 
            vector_store_wo_image, 
            vector_store_with_image,
            chunks,
            bm25_index, 
            k=k, 
            alpha=alpha,
            reference_answer=reference
        )
        
        # Append the comparison results to the results list
        results.append(comparison)
        
        # Print the responses from the different retrieval methods
        print("\n=== Vector-based Response without images ===")
        print(comparison["vector_result"]["response"])
        
        print("\n=== Vector-based Response with images ===")
        print(comparison["vector_result_with_images"]["response"])

        print("\n=== BM25 Response ===")
        print(comparison["bm25_result"]["response"])
        
        print("\n=== Fusion Response ===")
        print(comparison["fusion_result"]["response"])
        
        print("\n=== Comparison ===")
        print(comparison["comparison"])
    
    # Generate an overall analysis of the fusion retrieval performance
    overall_analysis = generate_overall_analysis(results)
    
    # Return the results and overall analysis
    return {
        "results": results,
        "overall_analysis": overall_analysis
    }

In [None]:
def generate_overall_analysis(results):
    
    """
    Generate an overall analysis of retrieval strategies.
    
    Args:
        results (List[Dict]): Results from evaluating queries
        
    Returns:
        str: Overall analysis
    """
    
    # System prompt to guide the evaluation process
    system_prompt = """You are an expert at evaluating information retrieval systems. 
    Based on multiple test queries, provide an overall analysis comparing four retrieval approaches:
    1. Vector-based retrieval over text only: every retrieved result has 'type' set to 'text' (semantic similarity)
    2. Vector-based retrieval over text and images: retrieved results may have 'type' set to 'image' or 'text' (semantic similarity)
    3. BM25 keyword retrieval (keyword matching)
    4. Fusion retrieval (combination of both)

    Focus on:
    1. Types of queries where each approach performs best
    2. Overall strengths and weaknesses of each approach
    3. How fusion retrieval balances the trade-offs
    4. How fusion retrieval with image-based answers provides advantages over the individual methods
    5. Recommendations for when to use each approach
    
    At the end, summarize the analysis in a table showing which retrieval approach performs best for most of the questions.
    """

    # Create a summary of evaluations for each query
    evaluations_summary = ""
    for i, result in enumerate(results):
        evaluations_summary += f"Query {i+1}: {result['query']}\n"
        evaluations_summary += f"Comparison Summary: {result['comparison'][:200]}...\n\n"

    # User prompt containing the evaluations summary
    user_prompt = f"""Based on the following evaluations of different retrieval methods across {len(results)} queries, 
    provide an overall analysis comparing these four approaches:

    {evaluations_summary}

    Please provide a comprehensive analysis of the text-only vector-based, text-and-image vector-based, BM25, and fusion retrieval approaches,
    highlighting when and why fusion retrieval provides advantages over the individual methods."""

    # Generate the overall analysis using gpt-4o
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ],
        temperature=0
    )
    
    # Return the generated analysis content
    return response.choices[0].message.content
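
`generate_overall_analysis` passes only the first 200 characters of each comparison to the model, while the evaluator prompt asks for the verdict at the end of its answer. One possible alternative, sketched here as an assumption rather than the notebook's method, keeps both the head and the tail of each comparison. The helper name `condense` and its default lengths are hypothetical.

```python
def condense(text, head=200, tail=200, marker=" ... "):
    """Keep the first `head` and last `tail` characters of a long text."""
    if len(text) <= head + tail:
        return text  # Short enough already, return unchanged
    return text[:head] + marker + text[-tail:]


# Hypothetical long evaluation: opening remarks, filler, and a final verdict
long_comparison = "Intro. " + ("detail " * 200) + "Verdict: fusion retrieval is best."
short_comparison = "Fusion retrieval is best."

condensed = condense(long_comparison, head=50, tail=50)
print(condensed)
```

Whether the tail really contains the verdict depends on the evaluator following its instructions, so the lengths may need tuning against real outputs.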

## Evaluating Fusion Retrieval

- Run the evaluator on all reference queries and their reference answers.

In [None]:
# Path to the PDF document to be evaluated
pdf_path = "data/RAG_white_papers_articles.pdf"
# Path to the validation file with the generated questions and answers
val_path = "data/val.json"

# Generate the validation set (written to data/val.json), then load it
generate_validation_questions_anaswers(pdf_path, num_question_answers=10)
with open(val_path, "r") as f:
    validation_doc = json.load(f)

test_queries = [item['question'] for item in validation_doc]

## Quick test queries
# test_queries = [
#     "Why might Modular RAG offer a significant advantage over Naive or Advanced RAG in real-world applications?",
#     "Based on Figure 1 (RAG Technology Tree), how has RAG research evolved over time?",  # AI-specific query
#     "What is a primary challenge that LLMs face which RAG is designed to solve?"
# ]

# Optional reference answer

reference_answers = [item['answer'] for item in validation_doc]

# reference_answers = [
#     "Modular RAG provides flexibility by allowing modules to be rearranged or replaced, enabling better adaptation to diverse tasks, reducing redundancy, and supporting more dynamic interaction flows",
#     "Initially focused on inference through retrieval, RAG research has expanded into pre-training and fine-tuning stages, incorporating more complex and adaptable architectures",
#     "Hallucination—producing content not grounded in factual sources—is a key challenge RAG addresses by grounding generation in retrieved external knowledge"

# ]

# Set parameters
k = 5  # Number of documents to retrieve
alpha = 0.5  # Weight for vector scores (0.5 means equal weight between vector and BM25)

# Run evaluation
evaluation_results = evaluate_fusion_retrieval(
    pdf_path=pdf_path,
    test_queries=test_queries,
    reference_answers=reference_answers,
    k=k,
    alpha=alpha
)

# Print overall analysis
print("\n\n=== OVERALL ANALYSIS ===\n")
print(evaluation_results["overall_analysis"])