<a href="https://colab.research.google.com/github/jimimased/multimodal_rag_pipeline/blob/main/notebooks/multimodal_rag_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Multimodal RAG Pipeline - Google Colab Implementation

This notebook demonstrates how to use the Multimodal RAG Pipeline in Google Colab with GPU acceleration. It covers:

1. Setting up the environment
2. Cloning the repository
3. Installing dependencies
4. Processing multimodal documents
5. Generating embeddings
6. Indexing and retrieval
7. Evaluating performance

The notebook is designed to work with the GPU runtime in Google Colab.

## 1. Check GPU Availability

First, let's check if a GPU is available. If not, go to Runtime > Change runtime type and select GPU.

In [None]:
!nvidia-smi

import torch
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"CUDA device: {torch.cuda.get_device_name(0)}")
    print(f"CUDA version: {torch.version.cuda}")

## 2. Clone the Repository

Clone the multimodal RAG pipeline repository from GitHub.

In [None]:
!git clone https://github.com/jimimased/multimodal_rag_pipeline.git
%cd multimodal_rag_pipeline

## 3. Install Dependencies

Install the required packages for the multimodal RAG pipeline.

In [None]:
!pip install -r requirements.txt

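# Install additional dependencies for Colab.
# google-colab is preinstalled on Colab runtimes, so it is not reinstalled here.
!pip install pydrive
# GPU build of FAISS; fall back to the CPU build if no matching wheel is available
!pip install faiss-gpu || pip install faiss-cpu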

# Install spaCy model
!python -m spacy download en_core_web_trf

## 4. Set Up Environment Variables

Set up environment variables for API keys and other configuration.

In [None]:
import os
from google.colab import userdata

# Copy API keys from Colab secrets into environment variables.
# userdata.get raises if a secret is missing or notebook access is not granted,
# and os.environ rejects None, so unavailable keys are skipped.
for key in ("OPENAI_API_KEY", "HUGGINGFACEHUB_API_TOKEN", "ANTHROPIC_API_KEY", "PINECONE_API_KEY"):
    try:
        value = userdata.get(key)
    except Exception as e:  # e.g. secret not defined or access not granted
        value = None
        print(f"Secret {key} unavailable: {e}")
    if value:
        os.environ[key] = value

## 5. Import Required Modules

Import the necessary modules from the multimodal RAG pipeline.

In [None]:
import sys
sys.path.append('.')

from multimodal_rag_pipeline.document_processing.document_loaders import load_documents
from multimodal_rag_pipeline.content_processing.text_processing import process_text
from multimodal_rag_pipeline.content_processing.image_analysis import analyze_images
from multimodal_rag_pipeline.content_processing.multimodal_fusion import fuse_modalities
from multimodal_rag_pipeline.embedding_indexing.text_embeddings import generate_text_embeddings
from multimodal_rag_pipeline.embedding_indexing.image_embeddings import generate_image_embeddings
from multimodal_rag_pipeline.embedding_indexing.vector_db import index_embeddings
from multimodal_rag_pipeline.retrieval_generation.query_understanding import process_query
from multimodal_rag_pipeline.retrieval_generation.llm_integration import generate_response
from multimodal_rag_pipeline.utils.config_loader import load_config

## 6. Load Configuration

Load the configuration file and modify it for the Colab environment.

In [None]:
# Load the default configuration
config = load_config('config/config.yaml')

# Modify configuration for Colab environment
config['general']['use_gpu'] = torch.cuda.is_available()
config['general']['batch_size'] = 32 if torch.cuda.is_available() else 8
config['vector_db']['provider'] = 'faiss'  # Use FAISS for in-memory vector storage

# Print the modified configuration
import yaml
print(yaml.dump(config, default_flow_style=False))

## 7. Load Documents

You can either upload documents directly or load them from Google Drive.

In [None]:
from google.colab import files, drive
import os
import glob

# Create input directory
input_dir = 'input_documents'
os.makedirs(input_dir, exist_ok=True)

# Choose data source
use_gdrive = True  # Set to False to upload files directly instead

if use_gdrive:
    # Option 1: Load from Google Drive
    print("Loading documents from Google Drive...")
    
    # Mount Google Drive if not already mounted
    if not os.path.exists('/content/drive'):
        print("Mounting Google Drive...")
        drive.mount('/content/drive')
    
    # Path to your Google Drive folder containing documents
    gdrive_path = "/content/drive/MyDrive/SUMBA"  # Change to your folder path
    
    # Check if the path exists
    if not os.path.exists(gdrive_path):
        print(f"Google Drive path not found: {gdrive_path}")
        print("Falling back to file upload...")
        use_gdrive = False
    else:
        print(f"Found Google Drive folder: {gdrive_path}")
        
        # Find all document files in the directory and its subfolders
        doc_extensions = ['*.pdf', '*.docx', '*.doc', '*.txt', '*.html', '*.htm']
        doc_files = set()

        for ext in doc_extensions:
            # With recursive=True, '**' also matches the top-level folder itself,
            # so a single pattern covers both levels without listing files twice
            doc_files.update(glob.glob(os.path.join(gdrive_path, '**', ext), recursive=True))
        doc_files = sorted(doc_files)

        # Link files into the input directory
        if doc_files:
            print(f"Found {len(doc_files)} documents in Google Drive")
            for doc_path in doc_files:
                filename = os.path.basename(doc_path)
                dest_path = os.path.join(input_dir, filename)
                # Create a symbolic link instead of copying to save space;
                # skip names that already exist (cell re-runs, duplicate basenames)
                if os.path.lexists(dest_path):
                    print(f"Skipped {filename}: already present in {input_dir}")
                    continue
                os.symlink(doc_path, dest_path)
                print(f"Linked {filename} to {input_dir}")
        else:
            print("No document files found in Google Drive folder")
            print("Falling back to file upload...")
            use_gdrive = False

if not use_gdrive:
    # Option 2: Upload files directly
    print("\nUploading files directly...")
    uploaded = files.upload()
    
    # Save uploaded files to input directory
    for filename, content in uploaded.items():
        with open(os.path.join(input_dir, filename), 'wb') as f:
            f.write(content)
        print(f'Saved {filename} to {input_dir}')
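
The cell above links every file into one flat `input_documents` folder by basename, so two files with the same name in different Drive subfolders would collide. Below is a minimal sketch of one way around that, assuming it is acceptable to encode the relative path in the link name. `unique_dest_name` and `link_documents` are hypothetical helpers for illustration, not part of the repository.

```python
import os


def unique_dest_name(src_path, root):
    """Flatten the path of src_path relative to root into one filename.

    Hypothetical helper: 'a/report.txt' under root becomes 'a__report.txt'.
    """
    rel = os.path.relpath(src_path, root)
    return rel.replace(os.sep, "__")


def link_documents(paths, root, dest_dir):
    """Symlink each path into dest_dir, skipping links that already exist.

    Returns the destination paths in sorted source order.
    """
    os.makedirs(dest_dir, exist_ok=True)
    linked = []
    for src in sorted(set(paths)):  # drop duplicate source paths
        dest = os.path.join(dest_dir, unique_dest_name(src, root))
        if not os.path.lexists(dest):  # safe to re-run the cell
            os.symlink(src, dest)
        linked.append(dest)
    return linked
```

In the notebook this could be called as `link_documents(doc_files, gdrive_path, input_dir)` in place of the inner linking loop.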

## 8. Process Documents

Process the documents in the input directory (uploaded directly or linked from Google Drive) using the multimodal RAG pipeline.

In [None]:
# Document processing
print("Starting document processing...")
documents = load_documents(input_dir, config["document_processing"])
print(f"Processed {len(documents)} documents")

# Display document information
for i, doc in enumerate(documents[:5]):  # Show first 5 documents
    print(f"\nDocument {i+1}:")
    print(f"  Type: {doc['type']}")
    print(f"  Path: {doc['path']}")
    print(f"  Metadata: {doc['metadata']}")
    print(f"  Text length: {len(doc.get('text', ''))} characters")
    print(f"  Number of images: {len(doc.get('images', []))}")
    print(f"  Number of tables: {len(doc.get('tables', []))}")

## 9. Content Processing

Process the content of the documents, including text processing, image analysis, and multimodal fusion.

In [None]:
# Text processing
print("\nStarting text processing...")
processed_text = process_text(documents, config["content_processing"]["text_processing"])
print(f"Created {len(processed_text)} text chunks")

# Display sample text chunks
for i, chunk in enumerate(processed_text[:3]):  # Show first 3 chunks
    print(f"\nText Chunk {i+1}:")
    print(f"  ID: {chunk['id']}")
    print(f"  Source: {chunk['source']}")
    print(f"  Text: {chunk['text'][:200]}...")
    print(f"  Entities: {chunk.get('entities', [])}")
    print(f"  Classification: {chunk.get('classification', '')}")
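
The notebook does not show how `process_text` splits documents into chunks. For orientation only, here is a minimal sketch of one common approach, a fixed-size word window with overlap. `chunk_words`, `chunk_size`, and `overlap` are assumptions made for this example and may not match the repository's implementation.

```python
def chunk_words(text, chunk_size=200, overlap=40):
    """Split text into chunks of up to chunk_size words.

    Consecutive chunks share `overlap` words so that sentences cut at a
    boundary still appear whole in at least one chunk.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

    words = text.split()
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        if start + chunk_size >= len(words):
            break  # the last window already reached the end of the text
    return chunks
```

Larger overlaps preserve more context across boundaries at the cost of more chunks to embed and index.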

In [None]:
# Image analysis
print("\nStarting image analysis...")

# Check if we need to load additional images from Google Drive
load_additional_images = True  # Set to False if you don't want to load additional images

if load_additional_images and os.path.exists('/content/drive'):
    print("Loading additional images from Google Drive for analysis...")
    
    # Path to your Google Drive folder containing images
    images_gdrive_path = "/content/drive/MyDrive/SUMBA/images"  # Change to your folder path
    
    if os.path.exists(images_gdrive_path):
        # Find all image files in the directory
        image_extensions = ['*.jpg', '*.jpeg', '*.png', '*.gif', '*.bmp', '*.tiff']
        image_files = []
        
        for ext in image_extensions:
            # With recursive=True, '**' also matches the top-level folder itself,
            # so a single pattern avoids listing top-level images twice
            image_files.extend(glob.glob(os.path.join(images_gdrive_path, '**', ext), recursive=True))
        
        if image_files:
            print(f"Found {len(image_files)} images in Google Drive")
            
            # Add images to the first document for processing
            if not documents:
                documents = [{'type': 'image_collection', 'path': 'gdrive_images', 'images': []}]
            # The first document may have no 'images' list yet
            documents[0].setdefault('images', [])
            
            for img_path in image_files[:10]:  # Limit to 10 images for demo purposes
                try:
                    with open(img_path, 'rb') as f:
                        img_data = f.read()
                    
                    filename = os.path.basename(img_path)
                    documents[0]['images'].append({
                        'id': f"gdrive_img_{len(documents[0]['images'])+1}",
                        'filename': filename,
                        'image_data': img_data
                    })
                    print(f"Added image: {filename}")
                except Exception as e:
                    print(f"Error loading image {img_path}: {e}")
        else:
            print("No image files found in Google Drive folder")

# Process images
processed_images = analyze_images(documents, config["content_processing"]["image_analysis"])
print(f"Processed {len(processed_images)} images")

# Display sample processed images
import matplotlib.pyplot as plt
from PIL import Image
import numpy as np
import io

for i, img_data in enumerate(processed_images[:3]):  # Show first 3 images
    print(f"\nImage {i+1}:")
    print(f"  ID: {img_data['id']}")
    print(f"  Source: {img_data['source']}")
    print(f"  Caption: {img_data.get('caption', 'No caption')}")
    print(f"  OCR Text: {img_data.get('ocr_text', 'No OCR text')[:100]}..." if img_data.get('ocr_text') else "  OCR Text: None")
    print(f"  Objects: {img_data.get('objects', [])}")
    
    # Display the image if available
    if 'image_data' in img_data and img_data['image_data'] is not None:
        plt.figure(figsize=(8, 6))
        plt.imshow(Image.open(io.BytesIO(img_data['image_data'])))
        plt.title(img_data.get('caption', 'Image'))
        plt.axis('off')
        plt.show()

In [None]:
# Multimodal fusion
print("\nStarting multimodal fusion...")
fused_content = fuse_modalities(processed_text, processed_images, config["content_processing"]["multimodal_fusion"])
print(f"Created {len(fused_content)} fused content items")

# Display sample fused content
for i, item in enumerate(fused_content[:3]):  # Show first 3 fused items
    print(f"\nFused Content {i+1}:")
    print(f"  ID: {item['id']}")
    print(f"  Source: {item['source']}")
    print(f"  Text: {item['text'][:200]}...")
    print(f"  Related Images: {[img['id'] for img in item.get('related_images', [])]}")
    print(f"  Context: {item.get('context', '')[:100]}..." if item.get('context') else "  Context: None")

## 10. Embedding Generation

Generate embeddings for text and images using GPU acceleration.

In [None]:
# Text embeddings
print("\nGenerating text embeddings...")
text_embeddings = generate_text_embeddings(processed_text, config["embedding"]["text"])
print(f"Generated embeddings for {len(text_embeddings)} text chunks")
print(f"Embedding dimension: {text_embeddings[0]['embedding'].shape if len(text_embeddings) > 0 else 'N/A'}")

In [None]:
# Image embeddings
print("\nGenerating image embeddings...")
image_embeddings = generate_image_embeddings(processed_images, config["embedding"]["image"])
print(f"Generated embeddings for {len(image_embeddings)} images")
print(f"Embedding dimension: {image_embeddings[0]['embedding'].shape if len(image_embeddings) > 0 else 'N/A'}")

## 11. Vector Database Indexing

Index the generated embeddings in a vector database for efficient retrieval.

In [None]:
# Index embeddings
print("\nIndexing embeddings...")
index_info = index_embeddings(text_embeddings, image_embeddings, fused_content, config["vector_db"])
print(f"Indexed {index_info['total_vectors']} vectors in {index_info['index_name']}")

## 12. Query and Response Generation

Process a query and generate a response using the multimodal RAG pipeline.

In [None]:
# Process query
query = input("Enter your query: ")
print(f"\nProcessing query: {query}")
processed_query = process_query(query, config["retrieval"]["query"])

# Generate response
print("\nGenerating response...")
response = generate_response(processed_query, config["retrieval"]["llm"])

print("\nResponse:")
print(response)
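
The retrieval step itself happens inside the pipeline and is not visible in the cell above. As a rough illustration of what a vector search does, the sketch below ranks items by cosine similarity to a query vector in pure Python. `cosine_similarity` and `top_k` are hypothetical names, and the assumption that each item carries both an `id` and an `embedding` key is made for this example; a real index such as FAISS would replace the linear scan.

```python
import math


def cosine_similarity(a, b):
    """Cosine similarity of two equal-length vectors; 0.0 if either is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k(query_vec, items, k=5):
    """Return up to k (score, id) pairs, highest similarity first.

    Each item is assumed to be a dict with 'id' and 'embedding' keys.
    """
    scored = [(cosine_similarity(query_vec, item["embedding"]), item["id"])
              for item in items]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return scored[:k]
```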

## 13. Evaluation

Evaluate the performance of the multimodal RAG pipeline.

In [None]:
# This is a placeholder for evaluation code.
# In a real implementation, you would load a test dataset and evaluate the pipeline.
# The values below are hardcoded examples, not measurements of this pipeline.

print("\nEvaluation metrics (placeholder values, not measured):")
print("  Retrieval precision@5: 0.85")
print("  Retrieval recall@5: 0.78")
print("  NDCG@5: 0.82")
print("  ROUGE-L: 0.76")
print("  BERTScore: 0.89")
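
To replace the placeholder with measured numbers, the retrieval metrics named above can be computed from a ranked list of retrieved IDs and a set of relevant IDs. The following is a minimal sketch under two assumptions: relevance is binary, and retrieved IDs are unique. The function names are chosen for this example; ROUGE-L and BERTScore need dedicated libraries and are not covered.

```python
import math


def precision_at_k(retrieved, relevant, k):
    """Fraction of the top-k retrieved IDs that are relevant."""
    if k <= 0:
        raise ValueError("k must be positive")
    hits = sum(1 for doc_id in retrieved[:k] if doc_id in relevant)
    return hits / k


def recall_at_k(retrieved, relevant, k):
    """Fraction of all relevant IDs that appear in the top k."""
    if not relevant:
        return 0.0
    hits = sum(1 for doc_id in retrieved[:k] if doc_id in relevant)
    return hits / len(relevant)


def ndcg_at_k(retrieved, relevant, k):
    """Binary-relevance NDCG: DCG of the ranking divided by the ideal DCG."""
    dcg = sum(1.0 / math.log2(rank + 2)
              for rank, doc_id in enumerate(retrieved[:k])
              if doc_id in relevant)
    ideal_hits = min(len(relevant), k)
    idcg = sum(1.0 / math.log2(rank + 2) for rank in range(ideal_hits))
    return dcg / idcg if idcg else 0.0
```

Averaging each metric over all queries in a labeled test set gives figures comparable to the ones printed above.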

## 14. Save and Export

Save the processed data and models for later use.

In [None]:
# Create output directory
output_dir = 'output'
os.makedirs(output_dir, exist_ok=True)

# Save processed data (placeholder: nothing is written yet)
print("\nSaving processed data...")
print(f"Output directory ready: {output_dir} (no files are written by this placeholder)")

# Download output files
from google.colab import files
# files.download(f"{output_dir}/results.json")  # Uncomment to enable download
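
The commented-out download expects a `results.json` file that the placeholder never writes. Here is a minimal sketch of how such a file could be produced, assuming the items are flat dicts like the chunks printed earlier. `to_serializable` and `save_results` are hypothetical helpers. Raw bytes such as `image_data` are dropped and array-like values are converted to lists, but only at the top level of each dict.

```python
import json
import os


def to_serializable(item):
    """Return a JSON-friendly shallow copy of a dict.

    Raw bytes are skipped; values with a tolist() method (for example
    NumPy arrays) are converted to plain lists.
    """
    out = {}
    for key, value in item.items():
        if isinstance(value, (bytes, bytearray)):
            continue
        if hasattr(value, "tolist"):
            value = value.tolist()
        out[key] = value
    return out


def save_results(items, output_dir, filename="results.json"):
    """Write items to output_dir/filename as JSON and return the file path."""
    os.makedirs(output_dir, exist_ok=True)
    path = os.path.join(output_dir, filename)
    with open(path, "w", encoding="utf-8") as f:
        json.dump([to_serializable(item) for item in items], f,
                  ensure_ascii=False, indent=2)
    return path
```

For example, `save_results(processed_text, output_dir)` would create the file that `files.download` refers to.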

## 15. Conclusion

This notebook demonstrated how to use the multimodal RAG pipeline in Google Colab with GPU acceleration. It walked through processing documents, extracting text and images, generating embeddings, indexing the content, and generating responses to queries. The evaluation and export cells are placeholders to replace with your own code.

Next steps:
1. Customize the pipeline for your specific use case
2. Add more document types and modalities
3. Experiment with different embedding models
4. Optimize performance for your specific hardware
5. Integrate with your existing systems