# Multi-modal RAG 2025: Beyond Text - Images, PDFs, Audio & Video

Explore how to build RAG systems that work with images, documents, audio, and video content using modern LangChain capabilities.

## 🎯 What You'll Learn

- **Multi-modal Understanding** - Processing different content types
- **Image RAG** - Extract and search visual information
- **Document RAG** - Advanced PDF processing with layout awareness
- **Audio/Video RAG** - Transcription and content extraction
- **Unified Search** - Cross-modal retrieval and generation
- **Vision-Language Models** - Modern multimodal AI capabilities

## 📋 Prerequisites

- Completed previous RAG notebooks
- OpenAI API key (used for the vision model, embeddings, and Whisper transcription)
- Additional API keys for specialized services (optional)

## 🔧 Enhanced Setup for Multi-modal Processing

In [None]:
# Install packages for multi-modal RAG
%pip install --upgrade --quiet langchain-core langchain-community langchain-openai langgraph chromadb python-dotenv
%pip install --upgrade --quiet unstructured[all-docs] pillow pytesseract pdf2image
%pip install --upgrade --quiet opencv-python-headless openai python-magic-bin  # python-magic-bin targets Windows; on Linux/macOS install python-magic instead

In [None]:
import os
from dotenv import load_dotenv
import tempfile
import requests
from pathlib import Path
from typing import List, Dict, Any, Optional, Union
import base64
from io import BytesIO

# Core LangChain
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field  # langchain_core.pydantic_v1 is deprecated in recent langchain-core releases
from langchain_core.documents import Document

# Multi-modal processing
from langchain_community.document_loaders import (
    UnstructuredPDFLoader,
    UnstructuredImageLoader,
    UnstructuredFileLoader
)

# Image processing
from PIL import Image
import cv2
import numpy as np

# Load environment
load_dotenv()

if not os.getenv("OPENAI_API_KEY"):
    raise ValueError("Please set OPENAI_API_KEY in your .env file")

print("✅ Multi-modal setup complete!")
print("🎨 Image processing: PIL, OpenCV")
print("📄 Document processing: Unstructured")
print("🧠 Vision model: GPT-4 Vision")

## 🖼️ Image RAG: Visual Content Processing

Let's start with processing and understanding images.

### 📸 Image Analysis and Description

In [None]:
# Initialize vision-capable model (gpt-4-vision-preview has been retired; gpt-4o accepts image input)
vision_llm = ChatOpenAI(model="gpt-4o", max_tokens=1000)

def encode_image_to_base64(image_path_or_url: str) -> str:
    """Convert image to base64 for API consumption."""
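    if image_path_or_url.startswith(('http://', 'https://')):
        # URL image: use a timeout and fail on HTTP errors so an error page is never encoded
        response = requests.get(image_path_or_url, timeout=30)
        response.raise_for_status()
        return base64.b64encode(response.content).decode('utf-8')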
    else:
        # Local file
        with open(image_path_or_url, "rb") as image_file:
            return base64.b64encode(image_file.read()).decode('utf-8')

class ImageAnalyzer:
    """Analyze images using vision models."""
    
    def __init__(self, llm=None):
        self.llm = llm or vision_llm
        
        # Analysis prompt
        self.analysis_prompt = ChatPromptTemplate.from_messages([
            ("system", """
You are an expert at analyzing images and extracting detailed information.
Analyze the provided image and describe:

1. **Main Subject**: What is the primary focus of the image?
2. **Visual Elements**: Colors, composition, style, objects present
3. **Text Content**: Any text visible in the image (OCR)
4. **Context & Setting**: Where/when might this be from?
5. **Technical Details**: Image quality, type, notable features
6. **Searchable Keywords**: Key terms for semantic search

Provide a comprehensive description that would be useful for:
- Semantic search and retrieval
- Answering questions about the image content
- Understanding the visual context
"""),
            ("human", [
                {
                    "type": "text",
                    "text": "Please analyze this image in detail:"
                },
                {
                    "type": "image_url",
                    "image_url": {
                        "url": "data:image/jpeg;base64,{image_data}"
                    }
                }
            ])
        ])
    
    def analyze_image(self, image_path_or_url: str, metadata: Dict[str, Any] = None) -> Document:
        """Analyze an image and return a document with analysis."""
        try:
            # Encode image
            image_base64 = encode_image_to_base64(image_path_or_url)
            
            # Create the message with the image
            messages = self.analysis_prompt.format_messages(image_data=image_base64)
            
            # Get analysis
            response = self.llm.invoke(messages)
            
            # Create document
            doc_metadata = {
                "source": image_path_or_url,
                "type": "image",
                "analysis_model": self.llm.model_name,
                **(metadata or {})
            }
            
            return Document(
                page_content=response.content,
                metadata=doc_metadata
            )
            
        except Exception as e:
            print(f"❌ Image analysis failed: {e}")
            return Document(
                page_content=f"Failed to analyze image: {str(e)}",
                metadata={"source": image_path_or_url, "type": "image", "error": str(e)}
            )
    
    def analyze_multiple_images(self, image_paths: List[str]) -> List[Document]:
        """Analyze multiple images."""
        documents = []
        
        for i, image_path in enumerate(image_paths):
            print(f"🖼️ Analyzing image {i+1}/{len(image_paths)}: {Path(image_path).name}")
            doc = self.analyze_image(image_path, {"image_index": i})
            documents.append(doc)
        
        return documents

# Create image analyzer
image_analyzer = ImageAnalyzer()
print("📸 Image analyzer ready")
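
The prompt template above labels every image as `image/jpeg`, although the sample images used in this notebook are PNG files. Below is a minimal sketch of building the data URL from the file name instead, using only the standard library. `to_data_url` is a hypothetical helper, not part of LangChain, and falling back to `image/jpeg` for unknown extensions is an assumption.

```python
import base64
import mimetypes


def to_data_url(image_bytes: bytes, name: str, default_mime: str = "image/jpeg") -> str:
    """Build a base64 data URL whose MIME type is guessed from the file name (hypothetical helper)."""
    mime, _ = mimetypes.guess_type(name)
    # Fall back to the default when the extension is unknown or not an image type
    if mime is None or not mime.startswith("image/"):
        mime = default_mime
    encoded = base64.b64encode(image_bytes).decode("utf-8")
    return f"data:{mime};base64,{encoded}"


# Only the prefix is printed; the payload here is a 4-byte placeholder, not a real image
print(to_data_url(b"\x89PNG", "diagram.png")[:22])
```

The resulting string could be passed wherever the notebook currently interpolates `{image_data}` behind a fixed `data:image/jpeg` prefix.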

### 🧪 Test Image Analysis

In [None]:
# Let's test with some sample images (using URLs for demo)
sample_images = [
    "https://upload.wikimedia.org/wikipedia/commons/thumb/4/47/PNG_transparency_demonstration_1.png/280px-PNG_transparency_demonstration_1.png",
    "https://upload.wikimedia.org/wikipedia/commons/thumb/5/50/Vd-Orig.svg/256px-Vd-Orig.svg.png"
]

print("🧪 Testing image analysis with sample images...")

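try:
    # Analyze first image
    test_doc = image_analyzer.analyze_image(sample_images[0])
    
    # analyze_image() reports failures through metadata rather than raising,
    # so surface them here to reach the mock fallback below
    if "error" in test_doc.metadata:
        raise RuntimeError(test_doc.metadata["error"])
    
    print(f"\n✅ Image Analysis Result:")
    print(f"Source: {test_doc.metadata['source']}")
    print(f"Analysis: {test_doc.page_content[:300]}...")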
    
except Exception as e:
    print(f"⚠️ Image analysis test failed: {e}")
    print("💡 This might be due to API limitations or network issues.")
    print("💡 In production, ensure you have access to vision models.")
    
    # Create mock document for demonstration
    test_doc = Document(
        page_content="This is a demonstration image showing PNG transparency with a checkered pattern background. The image contains geometric shapes and demonstrates alpha channel transparency capabilities.",
        metadata={"source": sample_images[0], "type": "image", "mock": True}
    )
    print("\n🔄 Using mock analysis for demo purposes")

## 📄 Advanced Document Processing

Go beyond simple text extraction: capture document structure, tables, and visual elements.

### 🗂️ Intelligent PDF Processing

In [None]:
class AdvancedDocumentProcessor:
    """Advanced document processing with layout awareness."""
    
    def __init__(self):
        self.supported_types = {
            '.pdf': self._process_pdf,
            '.docx': self._process_docx,
            '.pptx': self._process_pptx,
            '.txt': self._process_text,
            '.md': self._process_text
        }
    
    def process_document(self, file_path: str) -> List[Document]:
        """Process a document with structure awareness."""
        file_path = Path(file_path)
        extension = file_path.suffix.lower()
        
        if extension not in self.supported_types:
            raise ValueError(f"Unsupported file type: {extension}")
        
        processor = self.supported_types[extension]
        return processor(str(file_path))
    
    def _process_pdf(self, file_path: str) -> List[Document]:
        """Process PDF with advanced structure extraction."""
        try:
            # Use Unstructured for advanced PDF processing
            loader = UnstructuredPDFLoader(
                file_path,
                mode="elements",  # Extract individual elements
                strategy="hi_res"  # High resolution processing
            )
            
            elements = loader.load()
            
            # Group elements by type and enhance metadata
            documents = []
            for element in elements:
                # Enhance metadata with element information
                enhanced_metadata = {
                    "source": file_path,
                    "type": "pdf",
                    "element_type": element.metadata.get("category", "unknown"),
                    "page_number": element.metadata.get("page_number", 1),
                    **element.metadata
                }
                
                doc = Document(
                    page_content=element.page_content,
                    metadata=enhanced_metadata
                )
                documents.append(doc)
            
            return documents
            
        except Exception as e:
            print(f"⚠️ Advanced PDF processing failed: {e}")
            # Fallback to simple processing
            return self._simple_pdf_fallback(file_path)
    
    def _simple_pdf_fallback(self, file_path: str) -> List[Document]:
        """Simple PDF fallback processing."""
        try:
            loader = UnstructuredPDFLoader(file_path)
            docs = loader.load()
            
            # Add basic metadata
            for doc in docs:
                doc.metadata.update({
                    "type": "pdf",
                    "processing": "fallback"
                })
            
            return docs
        except Exception as e:
            print(f"❌ PDF processing completely failed: {e}")
            return [Document(
                page_content=f"Failed to process PDF: {file_path}",
                metadata={"source": file_path, "type": "pdf", "error": str(e)}
            )]
    
    def _process_docx(self, file_path: str) -> List[Document]:
        """Process Word documents."""
        try:
            loader = UnstructuredFileLoader(file_path)
            docs = loader.load()
            
            for doc in docs:
                doc.metadata.update({"type": "docx"})
            
            return docs
        except Exception as e:
            return [Document(
                page_content=f"Failed to process DOCX: {str(e)}",
                metadata={"source": file_path, "type": "docx", "error": str(e)}
            )]
    
    def _process_pptx(self, file_path: str) -> List[Document]:
        """Process PowerPoint presentations."""
        try:
            loader = UnstructuredFileLoader(file_path)
            docs = loader.load()
            
            for doc in docs:
                doc.metadata.update({"type": "pptx"})
            
            return docs
        except Exception as e:
            return [Document(
                page_content=f"Failed to process PPTX: {str(e)}",
                metadata={"source": file_path, "type": "pptx", "error": str(e)}
            )]
    
    def _process_text(self, file_path: str) -> List[Document]:
        """Process plain text files."""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            
            return [Document(
                page_content=content,
                metadata={
                    "source": file_path,
                    "type": Path(file_path).suffix[1:],  # Remove the dot
                    "length": len(content)
                }
            )]
        except Exception as e:
            return [Document(
                page_content=f"Failed to process text file: {str(e)}",
                metadata={"source": file_path, "type": "text", "error": str(e)}
            )]

# Create document processor
doc_processor = AdvancedDocumentProcessor()
print("📄 Advanced document processor ready")
print(f"   Supported types: {list(doc_processor.supported_types.keys())}")
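
With `mode="elements"`, each document returned by the PDF processor carries an `element_type` and a `page_number` in its metadata. Below is a minimal sketch of using that metadata to reassemble page-level text while keeping tables separate. Plain dicts stand in for LangChain `Document` objects so the example runs on its own, the sample rows are made up for illustration, and the `"Table"` category label is treated as an assumption about what the loader reports.

```python
from collections import defaultdict
from typing import Any, Dict, List


def group_elements_by_page(elements: List[Dict[str, Any]]) -> Dict[int, Dict[str, List[str]]]:
    """Split each page's elements into running text and tables (stand-in dicts, not Documents)."""
    pages: Dict[int, Dict[str, List[str]]] = defaultdict(lambda: {"text": [], "tables": []})
    for element in elements:
        meta = element["metadata"]
        page = meta.get("page_number", 1)
        # Assumption: table elements are labelled "Table" in element_type
        bucket = "tables" if meta.get("element_type") == "Table" else "text"
        pages[page][bucket].append(element["page_content"])
    return dict(pages)


# Made-up sample elements, shaped like the metadata produced by _process_pdf
sample = [
    {"page_content": "Quarterly Report", "metadata": {"element_type": "Title", "page_number": 1}},
    {"page_content": "Revenue grew.", "metadata": {"element_type": "NarrativeText", "page_number": 1}},
    {"page_content": "Q1 10 | Q2 12", "metadata": {"element_type": "Table", "page_number": 2}},
]
grouped = group_elements_by_page(sample)
print(grouped[1]["text"])
print(grouped[2]["tables"])
```

Keeping tables in their own bucket makes it possible to index them as separate chunks, so a table is not split in the middle by the text splitter.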

## 🎧 Audio and Video Processing

Extract and process content from audio and video files.

In [None]:
class AudioVideoProcessor:
    """Process audio and video files for RAG."""
    
    def __init__(self, openai_api_key: str = None):
        self.openai_api_key = openai_api_key or os.getenv("OPENAI_API_KEY")
    
    def process_audio(self, audio_path: str, metadata: Dict[str, Any] = None) -> Document:
        """Process audio file using Whisper API."""
        try:
            import openai
            
            # Initialize OpenAI client
            client = openai.OpenAI(api_key=self.openai_api_key)
            
            # Transcribe audio
            with open(audio_path, "rb") as audio_file:
                transcript = client.audio.transcriptions.create(
                    model="whisper-1", 
                    file=audio_file,
                    response_format="verbose_json"
                )
            
            # Create document with enhanced metadata
            doc_metadata = {
                "source": audio_path,
                "type": "audio",
                "duration": transcript.duration if hasattr(transcript, 'duration') else None,
                "language": transcript.language if hasattr(transcript, 'language') else None,
                "transcription_model": "whisper-1",
                **(metadata or {})
            }
            
            return Document(
                page_content=transcript.text,
                metadata=doc_metadata
            )
            
        except Exception as e:
            print(f"❌ Audio processing failed: {e}")
            return Document(
                page_content=f"Failed to process audio: {str(e)}",
                metadata={"source": audio_path, "type": "audio", "error": str(e)}
            )
    
    def process_video(self, video_path: str, extract_audio: bool = True, extract_frames: bool = False) -> List[Document]:
        """Process video file - extract audio transcription and optionally analyze frames."""
        documents = []
        
        if extract_audio:
            # Extract audio and transcribe
            import subprocess
            
            # Build the output path from the file stem so it can never equal the
            # input path, whatever the container extension (.mp4, .mkv, .webm, ...)
            video_file = Path(video_path)
            audio_path = str(video_file.with_name(f"{video_file.stem}_audio.wav"))
            
            try:
                # Extract audio using ffmpeg (requires ffmpeg installation)
                subprocess.run([
                    'ffmpeg', '-i', video_path, '-q:a', '0', '-map', 'a', audio_path, '-y'
                ], check=True, capture_output=True)
                
                # Process the extracted audio
                audio_doc = self.process_audio(
                    audio_path,
                    {"extracted_from_video": True, "source_video": video_path}
                )
                documents.append(audio_doc)
                
            except Exception as e:
                print(f"⚠️ Video audio extraction failed: {e}")
                documents.append(Document(
                    page_content=f"Failed to extract audio from video: {str(e)}",
                    metadata={"source": video_path, "type": "video_audio", "error": str(e)}
                ))
            finally:
                # Clean up the temporary audio file even if extraction or transcription failed
                if os.path.exists(audio_path):
                    os.remove(audio_path)
        
        if extract_frames:
            # Extract and analyze key frames
            try:
                frame_docs = self._extract_and_analyze_frames(video_path)
                documents.extend(frame_docs)
            except Exception as e:
                print(f"⚠️ Video frame extraction failed: {e}")
        
        return documents
    
    def _extract_and_analyze_frames(self, video_path: str, num_frames: int = 5) -> List[Document]:
        """Extract key frames from video and analyze them."""
        # Use OpenCV to extract frames
        cap = cv2.VideoCapture(video_path)
        frame_documents = []
        
        try:
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            if not cap.isOpened() or total_frames <= 0:
                raise ValueError(f"Could not read frames from {video_path}")
            
            # Extract frames at regular intervals
            for i in range(num_frames):
                frame_number = int((i + 1) * total_frames / (num_frames + 1))
                cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
                
                ret, frame = cap.read()
                if ret:
                    # Save frame temporarily in the system temp directory
                    temp_frame_path = os.path.join(tempfile.gettempdir(), f"temp_frame_{i}.jpg")
                    cv2.imwrite(temp_frame_path, frame)
                    
                    # Analyze frame with vision model
                    frame_doc = image_analyzer.analyze_image(
                        temp_frame_path,
                        {
                            "source_video": video_path,
                            "frame_number": frame_number,
                            "frame_index": i
                        }
                    )
                    
                    frame_documents.append(frame_doc)
                    
                    # Clean up temporary frame
                    os.remove(temp_frame_path)
            
            return frame_documents
            
        except Exception as e:
            print(f"❌ Frame extraction failed: {e}")
            return []
        finally:
            # Release the capture handle on every path, including failures
            cap.release()

# Create audio/video processor
av_processor = AudioVideoProcessor()
print("🎧 Audio/Video processor ready")
print("   📝 Supports: Audio transcription, Video frame extraction")
print("   ⚠️  Note: Requires ffmpeg for video processing")
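
The frame extractor samples frames at evenly spaced interior positions, `(i + 1) * total_frames / (num_frames + 1)`, which skips the very first and very last frame of the clip. Here is the same arithmetic in isolation; `sample_frame_indices` is a hypothetical helper name, and removing duplicate indices for very short clips is an added assumption.

```python
from typing import List


def sample_frame_indices(total_frames: int, num_frames: int = 5) -> List[int]:
    """Return evenly spaced interior frame indices (hypothetical helper)."""
    if total_frames <= 0 or num_frames <= 0:
        return []
    indices = [int((i + 1) * total_frames / (num_frames + 1)) for i in range(num_frames)]
    # Very short clips can map several samples to one frame; keep each index once
    return sorted(set(indices))


print(sample_frame_indices(300, 5))  # → [50, 100, 150, 200, 250]
```

Computing the indices up front also makes it easy to cap the number of vision-model calls per video before any frame is decoded.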

## 🌐 Unified Multi-modal RAG System

Combine all content types into a unified RAG system.

In [None]:
from langchain_text_splitters import RecursiveCharacterTextSplitter

class MultimodalRAGSystem:
    """Unified RAG system handling multiple content types."""
    
    def __init__(self):
        self.embeddings = OpenAIEmbeddings()
        self.llm = ChatOpenAI(model="gpt-4o", max_tokens=1000)
        
        # Processors
        self.image_analyzer = ImageAnalyzer()
        self.doc_processor = AdvancedDocumentProcessor()
        self.av_processor = AudioVideoProcessor()
        
        # Text splitter for long documents
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            add_start_index=True
        )
        
        # Vector store
        self.vectorstore = None
        self.documents = []
    
    def add_content(self, content_path: str, content_type: str = None) -> int:
        """Add content of various types to the knowledge base."""
        content_path = Path(content_path)
        
        if content_type is None:
            # Auto-detect content type
            content_type = self._detect_content_type(content_path)
        
        print(f"🔄 Processing {content_type} content: {content_path.name}")
        
        try:
            if content_type == "image":
                docs = [self.image_analyzer.analyze_image(str(content_path))]
            elif content_type == "document":
                docs = self.doc_processor.process_document(str(content_path))
            elif content_type == "audio":
                docs = [self.av_processor.process_audio(str(content_path))]
            elif content_type == "video":
                docs = self.av_processor.process_video(str(content_path))
            else:
                raise ValueError(f"Unsupported content type: {content_type}")
            
            # Split long documents
            split_docs = []
            for doc in docs:
                if len(doc.page_content) > 1000:
                    chunks = self.text_splitter.split_documents([doc])
                    split_docs.extend(chunks)
                else:
                    split_docs.append(doc)
            
            self.documents.extend(split_docs)
            print(f"✅ Added {len(split_docs)} document chunks")
            
            return len(split_docs)
            
        except Exception as e:
            print(f"❌ Failed to process {content_path}: {e}")
            return 0
    
    def _detect_content_type(self, content_path: Path) -> str:
        """Auto-detect content type from file extension."""
        extension = content_path.suffix.lower()
        
        # Raster formats only: SVG is vector markup and is not sent to the vision model
        image_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp'}
        document_extensions = {'.pdf', '.docx', '.pptx', '.txt', '.md'}
        audio_extensions = {'.mp3', '.wav', '.m4a', '.flac', '.ogg'}
        video_extensions = {'.mp4', '.mov', '.avi', '.mkv', '.webm'}
        
        if extension in image_extensions:
            return "image"
        elif extension in document_extensions:
            return "document"
        elif extension in audio_extensions:
            return "audio"
        elif extension in video_extensions:
            return "video"
        else:
            return "unknown"
    
    def build_index(self):
        """Build the vector index from all documents."""
        if not self.documents:
            raise ValueError("No documents to index. Add content first.")
        
        print(f"🏗️ Building index from {len(self.documents)} documents...")
        
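        # Chroma stores only scalar metadata values, while Unstructured elements can
        # carry lists and dicts (for example coordinates), so drop those before indexing
        from langchain_community.vectorstores.utils import filter_complex_metadata
        
        # Create vector store
        self.vectorstore = Chroma.from_documents(
            documents=filter_complex_metadata(self.documents),
            embedding=self.embeddings,
            persist_directory=tempfile.mkdtemp(prefix="multimodal_rag_")
        )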
        
        print("✅ Multimodal index built successfully")
    
    def search(self, query: str, k: int = 5, filter_by_type: str = None) -> List[Document]:
        """Search across all content types."""
        if not self.vectorstore:
            raise ValueError("Index not built. Call build_index() first.")
        
        search_kwargs = {"k": k}
        
        # Add type filter if specified
        if filter_by_type:
            search_kwargs["filter"] = {"type": filter_by_type}
        
        return self.vectorstore.similarity_search(query, **search_kwargs)
    
    def query(self, question: str, include_sources: bool = True) -> Dict[str, Any]:
        """Query the multimodal RAG system."""
        if not self.vectorstore:
            raise ValueError("Index not built. Call build_index() first.")
        
        # Retrieve relevant documents
        relevant_docs = self.search(question)
        
        # Prepare context
        context_parts = []
        sources = []
        
        for doc in relevant_docs:
            content_type = doc.metadata.get("type", "unknown")
            source = doc.metadata.get("source", "Unknown")
            
            context_parts.append(
                f"[{content_type.upper()} CONTENT from {Path(source).name}]\n{doc.page_content}"
            )
            
            if include_sources:
                sources.append({
                    "type": content_type,
                    "source": source,
                    "preview": doc.page_content[:150] + "..."
                })
        
        context = "\n\n".join(context_parts)
        
        # Generate response
        prompt = ChatPromptTemplate.from_template(
            """
You are an expert assistant with access to multimodal content including text, images, audio, and video.

Answer the user's question based on the provided context from various content types.
Be specific about which type of content your answer comes from when relevant.

Context:
{context}

Question: {question}

Answer:
"""
        )
        
        try:
            response = self.llm.invoke(prompt.format(context=context, question=question))
            
            return {
                "answer": response.content,
                "sources": sources if include_sources else [],
                "context_types": list(set(doc.metadata.get("type", "unknown") for doc in relevant_docs))
            }
            
        except Exception as e:
            return {
                "error": f"Failed to generate response: {e}",
                "sources": sources if include_sources else []
            }
    
    def get_statistics(self) -> Dict[str, Any]:
        """Get statistics about the knowledge base."""
        if not self.documents:
            return {"total_documents": 0}
        
        type_counts = {}
        total_content_length = 0
        
        for doc in self.documents:
            content_type = doc.metadata.get("type", "unknown")
            type_counts[content_type] = type_counts.get(content_type, 0) + 1
            total_content_length += len(doc.page_content)
        
        return {
            "total_documents": len(self.documents),
            "content_types": type_counts,
            "total_content_length": total_content_length,
            "indexed": self.vectorstore is not None
        }

# Create multimodal RAG system
multimodal_rag = MultimodalRAGSystem()
print("🌐 Multimodal RAG system initialized")
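
Everything the system indexes ends up as a `Document` whose metadata is written to the vector store. Stores such as Chroma typically accept only scalar metadata values, while layout-aware loaders can emit lists and nested dicts. Below is a minimal sketch of a pre-indexing cleanup step on plain dicts. `scalar_metadata` is a hypothetical helper, the sample values are made up, and the set of allowed types is an assumption to adjust for the store in use.

```python
from typing import Any, Dict

ALLOWED_TYPES = (str, int, float, bool)  # assumption: scalar-only metadata


def scalar_metadata(metadata: Dict[str, Any]) -> Dict[str, Any]:
    """Keep only metadata entries whose values are simple scalars (hypothetical helper)."""
    return {key: value for key, value in metadata.items() if isinstance(value, ALLOWED_TYPES)}


raw = {
    "source": "report.pdf",
    "page_number": 3,
    "languages": ["eng"],            # list: dropped
    "coordinates": {"points": []},   # nested dict: dropped
    "duration": None,                # None: dropped
}
print(scalar_metadata(raw))
```

LangChain ships a comparable utility, `filter_complex_metadata` in `langchain_community.vectorstores.utils`, which applies the same idea to a list of `Document` objects.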

## 🧪 Multimodal RAG Demo

Let's demonstrate the system with different content types:

In [None]:
# Demo: Add sample content (using mock data for demonstration)
print("🎮 Multimodal RAG Demo")
print("=" * 50)

# Add sample text document
sample_text = """
# AI Agents Architecture

Modern AI agents consist of several key components:

## Core Components
1. **Reasoning Engine**: Handles logical inference and decision-making
2. **Memory System**: Stores both short-term and long-term information
3. **Tool Interface**: Allows interaction with external systems
4. **Planning Module**: Breaks down complex tasks into manageable steps

## Task Decomposition Methods
- Chain of Thought (CoT): Sequential reasoning steps
- Tree of Thoughts (ToT): Exploring multiple reasoning paths
- Hierarchical Task Networks (HTN): Multi-level planning

These components work together to create intelligent, autonomous systems capable of complex problem-solving.
"""

# Create a temporary text file
temp_text_file = "temp_ai_agents.md"
with open(temp_text_file, "w", encoding="utf-8") as f:  # match the UTF-8 read in _process_text
    f.write(sample_text)

# Add to multimodal RAG
try:
    added_docs = multimodal_rag.add_content(temp_text_file, "document")
    print(f"📝 Added text document: {added_docs} chunks")
except Exception as e:
    print(f"⚠️ Text processing failed: {e}")

# Add sample image analysis (mock)
try:
    # Create a mock image document since we might not have actual image processing
    mock_image_doc = Document(
        page_content="""This image shows a diagram of an AI agent architecture. 
        The diagram contains several connected boxes representing different components:
        - A central 'Reasoning Engine' in blue
        - Connected modules for 'Memory', 'Planning', and 'Tools'
        - Arrows showing data flow between components
        - The overall layout suggests a modular, interconnected system
        - Text labels are in Arial font, with a clean technical diagram style
        - Color scheme uses blues and grays for a professional appearance""",
        metadata={"source": "ai_agent_diagram.png", "type": "image", "mock": True}
    )
    multimodal_rag.documents.append(mock_image_doc)
    print("🖼️ Added image analysis (mock)")
except Exception as e:
    print(f"⚠️ Image processing failed: {e}")

# Add mock audio transcription
try:
    mock_audio_doc = Document(
        page_content="""Welcome to this presentation on AI agents. In this session, we'll explore 
        the fundamental components that make up modern artificial intelligence agents. 
        First, let's discuss the reasoning engine, which serves as the brain of the agent. 
        The reasoning engine processes information, makes decisions, and determines the best 
        course of action. Next, we have the memory system, which stores both immediate context 
        and long-term knowledge. This is crucial for maintaining coherent conversations and 
        learning from past experiences. The planning module is responsible for breaking down 
        complex tasks into manageable steps, using techniques like hierarchical task networks.""",
        metadata={"source": "ai_agents_lecture.mp3", "type": "audio", "mock": True, "duration": 180}
    )
    multimodal_rag.documents.append(mock_audio_doc)
    print("🎧 Added audio transcription (mock)")
except Exception as e:
    print(f"⚠️ Audio processing failed: {e}")

# Show statistics
stats = multimodal_rag.get_statistics()
print(f"\n📊 Knowledge Base Statistics:")
print(f"   Total Documents: {stats['total_documents']}")
print(f"   Content Types: {stats['content_types']}")
print(f"   Total Content Length: {stats['total_content_length']:,} characters")

# Clean up
os.remove(temp_text_file)

In [None]:
# Build the multimodal index
print("🏗️ Building multimodal search index...")
try:
    multimodal_rag.build_index()
    print("✅ Index built successfully")
except Exception as e:
    print(f"❌ Index building failed: {e}")
    print("💡 This might be due to lack of documents or API issues")

In [None]:
# Test multimodal queries
if multimodal_rag.vectorstore:
    print("🔍 Testing Multimodal Queries\n")
    
    test_queries = [
        "What are the main components of an AI agent?",
        "Can you describe what the diagram shows?",
        "What did the speaker say about memory systems?",
        "How do reasoning engines work according to the available content?"
    ]
    
    for i, query in enumerate(test_queries, 1):
        print(f"❓ Query {i}: {query}")
        
        try:
            result = multimodal_rag.query(query)
            
            if "error" in result:
                print(f"   ❌ Error: {result['error']}")
            else:
                print(f"   🤖 Answer: {result['answer'][:200]}...")
                print(f"   📚 Sources: {result['context_types']}")
                
                if result['sources']:
                    print(f"   🔗 Source Details:")
                    for source in result['sources'][:2]:  # Show first 2 sources
                        print(f"      - {source['type']}: {source['source']}")
        
        except Exception as e:
            print(f"   ❌ Query failed: {e}")
        
        print()
else:
    print("⚠️ Skipping queries - index not built")

## 🔧 Advanced Multimodal Techniques

### 🎯 Cross-Modal Retrieval
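
The retriever in this section over-fetches candidates, groups them by the `type` stored in each document's metadata, and then gives every modality that produced hits a share of the `k` result slots. Below is a sketch of that balancing step on plain dicts. `balance_by_type` is a hypothetical name, the sample hits are made up, and guaranteeing at least one result per modality is an added assumption.

```python
from collections import defaultdict
from typing import Any, Dict, List


def balance_by_type(hits: List[Dict[str, Any]], k: int) -> Dict[str, List[Dict[str, Any]]]:
    """Give each content type present in `hits` an equal share of k (hypothetical helper)."""
    grouped: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for hit in hits:  # hits are assumed to arrive ranked best-first
        grouped[hit.get("type", "document")].append(hit)
    if not grouped:
        return {}
    per_type = max(1, k // len(grouped))  # at least one slot per modality
    return {content_type: docs[:per_type] for content_type, docs in grouped.items()}


hits = [
    {"id": 1, "type": "md"}, {"id": 2, "type": "md"}, {"id": 3, "type": "image"},
    {"id": 4, "type": "md"}, {"id": 5, "type": "audio"}, {"id": 6, "type": "md"},
]
balanced = balance_by_type(hits, k=6)
print({t: [h["id"] for h in docs] for t, docs in balanced.items()})
```

Without a step like this, a corpus dominated by one modality (here, text chunks) can crowd image and audio hits out of the top results.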

In [None]:
class CrossModalRetriever:
    """Advanced retriever for cross-modal queries."""
    
    def __init__(self, vectorstore, llm):
        self.vectorstore = vectorstore
        self.llm = llm
    
    def retrieve_cross_modal(self, query: str, k: int = 8) -> Dict[str, List[Document]]:
        """Retrieve documents across different modalities."""
        if not self.vectorstore:
            return {"error": "Vector store not initialized"}
        
        # Get all relevant documents
        all_docs = self.vectorstore.similarity_search(query, k=k*2)
        
        # Group by content type
        grouped_docs = {
            "text": [],
            "image": [],
            "audio": [],
            "video": [],
            "document": []
        }
        
        for doc in all_docs:
            content_type = doc.metadata.get("type", "document")
            if content_type in grouped_docs:
                grouped_docs[content_type].append(doc)
            else:
                grouped_docs["document"].append(doc)
        
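        # Balance the results across modalities: every modality that returned
        # hits gets an equal share of k, and at least one document
        active_types = [t for t, docs in grouped_docs.items() if docs]
        per_type = max(1, k // len(active_types)) if active_types else 0
        balanced_results = {t: grouped_docs[t][:per_type] for t in active_types}
        
        return balanced_results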
    
    def synthesize_cross_modal_response(self, query: str, grouped_docs: Dict[str, List[Document]]) -> str:
        """Generate response synthesizing information from multiple modalities."""
        
        # Prepare context from different modalities
        context_parts = []
        
        for content_type, docs in grouped_docs.items():
            if docs:
                type_content = f"\n=== {content_type.upper()} SOURCES ===\n"
                for doc in docs:
                    type_content += f"Source: {doc.metadata.get('source', 'Unknown')}\n"
                    type_content += f"Content: {doc.page_content[:300]}...\n\n"
                
                context_parts.append(type_content)
        
        context = "\n".join(context_parts)
        
        # Generate synthesized response
        prompt = ChatPromptTemplate.from_template(
            """
You are an expert at synthesizing information from multiple types of content.

Given content from text documents, images, audio, and video sources, provide a comprehensive 
answer that draws insights from all available modalities.

Specifically mention which types of content contributed to your answer and how they complement each other.

Available Content:
{context}

Question: {query}

Comprehensive Answer:
"""
        )
        
        try:
            # format_messages keeps the prompt as chat messages instead of a flattened string
            response = self.llm.invoke(prompt.format_messages(context=context, query=query))
            return response.content
        except Exception as e:
            return f"Error generating cross-modal response: {e}"

# Create cross-modal retriever
if multimodal_rag.vectorstore:
    cross_modal_retriever = CrossModalRetriever(multimodal_rag.vectorstore, multimodal_rag.llm)
    print("🎯 Cross-modal retriever initialized")
    
    # Test cross-modal retrieval
    test_query = "Explain AI agent architecture using all available information"
    
    print(f"\n🔍 Cross-Modal Query: {test_query}")
    
    grouped_results = cross_modal_retriever.retrieve_cross_modal(test_query)
    
    print("📊 Cross-Modal Results:")
    for content_type, docs in grouped_results.items():
        if docs:
            print(f"   {content_type}: {len(docs)} documents")
    
    # Generate synthesized response
    synthesized_response = cross_modal_retriever.synthesize_cross_modal_response(test_query, grouped_results)
    print(f"\n🧠 Synthesized Response:")
    print(f"{synthesized_response[:500]}...")
else:
    print("⚠️ Cross-modal retriever not available - vector store not built")

## 🚀 Production Considerations for Multimodal RAG

In [None]:
class ProductionMultimodalRAG:
    """Production-ready multimodal RAG with optimizations."""
    
    def __init__(self):
        self.content_cache = {}  # Cache processed content
        self.processing_stats = {
            "images_processed": 0,
            "documents_processed": 0,
            "audio_processed": 0,
            "video_processed": 0,
            "total_processing_time": 0
        }
        
        # Cost tracking (approximate)
        self.cost_estimates = {
            "vision_api_calls": 0,
            "whisper_api_calls": 0,
            "embedding_tokens": 0
        }
    
    def process_content_batch(self, content_paths: List[str], batch_size: int = 5) -> Dict[str, Any]:
        """Process multiple content items efficiently."""
        import time
        
        results = {
            "processed": 0,
            "failed": 0,
            "skipped_cache": 0,
            "processing_time": 0
        }
        
        start_time = time.time()
        
        # Process in batches to avoid overwhelming APIs
        for i in range(0, len(content_paths), batch_size):
            batch = content_paths[i:i+batch_size]
            
            print(f"🔄 Processing batch {i//batch_size + 1}/{(len(content_paths) + batch_size - 1)//batch_size}")
            
            for content_path in batch:
                try:
                    # Check cache first
                    cache_key = str(content_path)
                    if cache_key in self.content_cache:
                        results["skipped_cache"] += 1
                        continue
                    
                    # Process content
                    content_type = self._detect_content_type(Path(content_path))
                    
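                    if content_type == "image":
                        self.cost_estimates["vision_api_calls"] += 1
                        self.processing_stats["images_processed"] += 1
                    elif content_type == "audio":
                        self.cost_estimates["whisper_api_calls"] += 1
                        self.processing_stats["audio_processed"] += 1
                    elif content_type == "document":
                        self.processing_stats["documents_processed"] += 1
                    elif content_type == "video":
                        self.processing_stats["video_processed"] += 1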
                    
                    # Cache result (in production, use persistent cache)
                    self.content_cache[cache_key] = {
                        "processed_at": time.time(),
                        "content_type": content_type
                    }
                    
                    results["processed"] += 1
                    
                except Exception as e:
                    print(f"⚠️ Failed to process {content_path}: {e}")
                    results["failed"] += 1
            
            # Add delay between batches to respect rate limits
            if i + batch_size < len(content_paths):
                time.sleep(1)  # 1 second delay
        
        results["processing_time"] = time.time() - start_time
        self.processing_stats["total_processing_time"] += results["processing_time"]
        
        return results
    
    def _detect_content_type(self, content_path: Path) -> str:
        """Detect content type from extension."""
        extension = content_path.suffix.lower()
        
        type_mappings = {
            'image': {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.svg', '.webp'},
            'document': {'.pdf', '.docx', '.pptx', '.txt', '.md'},
            'audio': {'.mp3', '.wav', '.m4a', '.flac', '.ogg'},
            'video': {'.mp4', '.mov', '.avi', '.mkv', '.webm'}
        }
        
        for content_type, extensions in type_mappings.items():
            if extension in extensions:
                return content_type
        
        return 'unknown'
    
    def estimate_costs(self) -> Dict[str, float]:
        """Estimate API costs (approximate)."""
        # Rough, illustrative unit costs (as of 2025); check current provider pricing
        cost_per_call = {
            "vision_api_calls": 0.01,  # ~$0.01 per image
            "whisper_api_calls": 0.006,  # $0.006 per audio minute; assumes ~1 minute per call
            "embedding_tokens": 0.0001 / 1000  # $0.0001 per 1K tokens, expressed per token
        }
        
        estimated_costs = {}
        total_cost = 0
        
        for service, count in self.cost_estimates.items():
            cost = count * cost_per_call[service]
            estimated_costs[service] = cost
            total_cost += cost
        
        estimated_costs["total"] = total_cost
        return estimated_costs
    
    def get_performance_report(self) -> Dict[str, Any]:
        """Get comprehensive performance report."""
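        return {
            "processing_stats": self.processing_stats,
            "cost_estimates": self.estimate_costs(),
            "cache_stats": {
                "items_cached": len(self.content_cache),
                # Placeholder metric: this is the share of non-empty cache entries,
                # not a true hit ratio. A real ratio needs hit and lookup counters.
                "cache_hit_ratio": sum(1 for v in self.content_cache.values() if v) / max(1, len(self.content_cache))
            }
        }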

# Create production system
prod_multimodal = ProductionMultimodalRAG()

# Demo with mock data
mock_content_paths = [
    "image1.jpg", "image2.png", "document1.pdf", 
    "audio1.mp3", "video1.mp4"
]

print("🏭 Production Multimodal RAG System")
print("=" * 40)

# Test batch processing
batch_results = prod_multimodal.process_content_batch(mock_content_paths)
print(f"📊 Batch Processing Results:")
for key, value in batch_results.items():
    print(f"   {key}: {value}")

# Get performance report
report = prod_multimodal.get_performance_report()
print(f"\n📈 Performance Report:")
print(f"   Processing Stats: {report['processing_stats']}")
print(f"   Estimated Costs: ${report['cost_estimates']['total']:.4f}")
print(f"   Cache Stats: {report['cache_stats']}")

## 🎓 Key Takeaways: Multimodal RAG

### ✅ What You've Built

1. **Image Analysis System** - Extract and understand visual content
2. **Advanced Document Processing** - Structure-aware PDF/document handling
3. **Audio/Video Processing** - Transcription and frame analysis
4. **Unified Multimodal Search** - Cross-modal retrieval and synthesis
5. **Production Optimizations** - Batch processing, caching, cost estimation

### 🧠 Multimodal RAG Best Practices

**Content Processing:**
- Use high-resolution processing for documents when layout matters
- Extract key frames from videos rather than processing every frame
- Cache expensive operations (vision API, transcription)
- Implement fallback processing for when advanced methods fail
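
The key-frame advice above can be sketched as simple index sampling. This is a minimal illustration, not the notebook's own extractor: `keyframe_indices` and its `every_seconds` parameter are hypothetical names, and fixed-interval sampling is only one possible strategy (scene-change detection is another).

```python
from typing import List


def keyframe_indices(total_frames: int, fps: float, every_seconds: float = 5.0) -> List[int]:
    """Return the frame indices to analyze, one per `every_seconds` of video."""
    if total_frames <= 0 or fps <= 0 or every_seconds <= 0:
        return []
    # Number of frames between two sampled frames (at least 1).
    step = max(1, int(round(fps * every_seconds)))
    return list(range(0, total_frames, step))


# A 60-second clip at 30 fps has 1800 frames; sampling every 5 seconds
# selects 12 frames instead of 1800.
indices = keyframe_indices(total_frames=1800, fps=30.0, every_seconds=5.0)
print(len(indices), indices[:3])  # 12 [0, 150, 300]
```

With OpenCV, each index could then be read by seeking with `cv2.VideoCapture.set(cv2.CAP_PROP_POS_FRAMES, index)` before calling `read()`, so only the sampled frames are sent to the vision model.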

**Search & Retrieval:**
- Balance results across different modalities
- Use content-type aware metadata for filtering
- Support cross-modal retrieval, such as a text query that returns image results
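
One way to balance results across modalities is round-robin interleaving, which takes the top hit from each modality before taking anyone's second hit. The sketch below is an assumption-laden illustration: `round_robin_balance` is a hypothetical helper, and plain strings stand in for `Document` objects.

```python
from itertools import zip_longest
from typing import Dict, List, Tuple


def round_robin_balance(grouped: Dict[str, List[str]], k: int) -> List[Tuple[str, str]]:
    """Interleave ranked results from each modality and keep the first k."""
    # Tag each item with its modality, preserving per-modality rank order.
    tagged = [
        [(modality, item) for item in items]
        for modality, items in grouped.items()
        if items
    ]
    merged: List[Tuple[str, str]] = []
    # zip_longest walks rank 0 of every modality, then rank 1, and so on.
    for rank_slice in zip_longest(*tagged):
        merged.extend(entry for entry in rank_slice if entry is not None)
    return merged[:k]


grouped = {
    "document": ["doc_a", "doc_b", "doc_c"],
    "image": ["img_a"],
    "audio": [],
    "video": ["vid_a", "vid_b"],
}
top = round_robin_balance(grouped, k=4)
print(top)
```

Compared with giving every modality a fixed quota, this approach still fills all `k` slots when one modality has few or no matches.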

**Cost Management:**
- Monitor API usage across vision, transcription, and embedding services
- Key caches on a content hash so unchanged files are never sent to a paid API twice
- Use batch processing to optimize throughput
- Consider using local models for cost-sensitive applications
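
A content-addressed cache is one simple way to act on the caching advice above. This is a minimal in-memory sketch: `ContentHashCache` and `fake_describe` are hypothetical names, and a production system would likely back the store with disk or a database.

```python
import hashlib
from typing import Callable, Dict, List


class ContentHashCache:
    """In-memory cache keyed by the SHA-256 of the raw bytes (hypothetical helper)."""

    def __init__(self) -> None:
        self._store: Dict[str, str] = {}
        self.hits = 0
        self.misses = 0

    def get_or_compute(self, data: bytes, compute: Callable[[bytes], str]) -> str:
        key = hashlib.sha256(data).hexdigest()
        if key in self._store:
            self.hits += 1
            return self._store[key]
        self.misses += 1
        result = compute(data)  # the expensive call, e.g. a vision or transcription API
        self._store[key] = result
        return result

    @property
    def hit_ratio(self) -> float:
        lookups = self.hits + self.misses
        return self.hits / lookups if lookups else 0.0


calls: List[bytes] = []


def fake_describe(data: bytes) -> str:
    # Stand-in for a paid API call; records each invocation.
    calls.append(data)
    return f"description of {len(data)} bytes"


cache = ContentHashCache()
cache.get_or_compute(b"same image bytes", fake_describe)
cache.get_or_compute(b"same image bytes", fake_describe)  # served from cache
cache.get_or_compute(b"another image", fake_describe)
print(len(calls), round(cache.hit_ratio, 2))  # 2 0.33
```

Hashing the bytes rather than the file path means a renamed or re-uploaded copy of the same file still hits the cache.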

### 🔮 Advanced Techniques

**1. Multimodal Embedding Models**
- Use models like CLIP that understand both text and images
- Enable direct image-text similarity without separate processing
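
The idea behind a shared embedding space can be shown without loading a model. In the sketch below, the three-dimensional vectors are invented purely for illustration; in practice they would come from a multimodal encoder such as CLIP, and `rank_images` is a hypothetical helper.

```python
import math
from typing import Dict, List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def rank_images(
    query_vec: Sequence[float], image_vecs: Dict[str, Sequence[float]]
) -> List[Tuple[str, float]]:
    """Score every image embedding against the text embedding, best first."""
    scored = [(name, cosine_similarity(query_vec, vec)) for name, vec in image_vecs.items()]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)


# Toy vectors, invented for illustration only (not real model output).
text_query_vec = [0.9, 0.1, 0.0]
image_vecs = {
    "architecture_diagram.png": [0.8, 0.2, 0.1],
    "team_photo.jpg": [0.0, 0.1, 0.9],
}
ranking = rank_images(text_query_vec, image_vecs)
print(ranking[0][0])  # architecture_diagram.png
```

Because text and images live in the same vector space, no captioning step is needed before retrieval.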

**2. Layout-Aware Document Processing**
- Preserve table structures and formatting
- Extract and analyze charts, graphs, and diagrams
- Understand document hierarchies and sections
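
One lightweight way to preserve table structure is to serialize each extracted table as Markdown before chunking and embedding it. The sketch assumes the header and rows have already been extracted by a layout-aware parser; `table_to_markdown` is a hypothetical helper.

```python
from typing import List


def table_to_markdown(header: List[str], rows: List[List[str]]) -> str:
    """Render a table as Markdown so row and column structure survives chunking."""

    def fmt(cells: List[str]) -> str:
        # Escape pipes so cell text cannot break the column layout.
        return "| " + " | ".join(c.replace("|", "\\|").strip() for c in cells) + " |"

    lines = [fmt(header), "| " + " | ".join("---" for _ in header) + " |"]
    lines.extend(fmt(row) for row in rows)
    return "\n".join(lines)


markdown = table_to_markdown(
    ["Model", "Modality"],
    [["CLIP", "image + text"], ["Whisper", "audio"]],
)
print(markdown)
```

Keeping the whole table in a single chunk, rather than splitting it by character count, helps the LLM relate each cell to its column header.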

**3. Real-time Multimodal Processing**
- Stream processing for live audio/video
- Incremental indexing for new content
- Hot/cold storage for different access patterns
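
Incremental indexing can be sketched as a diff between what is already indexed and what exists now. The fingerprints here are placeholder strings; in practice they might be content hashes or modification times. `plan_incremental_update` is a hypothetical helper.

```python
from typing import Dict, List, Tuple


def plan_incremental_update(
    indexed: Dict[str, str], current: Dict[str, str]
) -> Tuple[List[str], List[str]]:
    """Compare fingerprints and return (paths to index or re-index, paths to delete)."""
    # New paths and paths whose fingerprint changed both need (re)indexing.
    to_index = sorted(p for p, fp in current.items() if indexed.get(p) != fp)
    # Paths that disappeared should be removed from the vector store.
    to_delete = sorted(p for p in indexed if p not in current)
    return to_index, to_delete


indexed = {"intro.pdf": "v1", "demo.mp4": "v1", "old_notes.txt": "v1"}
current = {"intro.pdf": "v1", "demo.mp4": "v2", "new_diagram.png": "v1"}
to_index, to_delete = plan_incremental_update(indexed, current)
print(to_index, to_delete)  # ['demo.mp4', 'new_diagram.png'] ['old_notes.txt']
```

Only the changed and new files go through the expensive vision or transcription steps; untouched files keep their existing embeddings.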

### 🚀 Production Deployment

**Infrastructure:**
- Separate processing pipelines for different content types
- GPU instances for local vision/audio models
- CDN for serving processed content

**Monitoring:**
- Track processing success rates by content type
- Monitor API costs and usage patterns
- Alert on processing failures or quality degradation
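
Per-type success tracking needs very little code. The sketch below is a hypothetical in-process monitor (`ProcessingMonitor` and its `alert_below` threshold are assumptions); a real deployment would more likely export these counts to a metrics system.

```python
from collections import defaultdict
from typing import Dict, List


class ProcessingMonitor:
    """Track success and failure counts per content type (hypothetical helper)."""

    def __init__(self, alert_below: float = 0.9) -> None:
        self.alert_below = alert_below
        self._counts = defaultdict(lambda: {"ok": 0, "failed": 0})

    def record(self, content_type: str, success: bool) -> None:
        self._counts[content_type]["ok" if success else "failed"] += 1

    def success_rates(self) -> Dict[str, float]:
        # Every tracked type has at least one record, so the divisor is never zero.
        return {t: c["ok"] / (c["ok"] + c["failed"]) for t, c in self._counts.items()}

    def alerts(self) -> List[str]:
        """Content types whose success rate fell below the threshold."""
        return sorted(t for t, rate in self.success_rates().items() if rate < self.alert_below)


monitor = ProcessingMonitor(alert_below=0.9)
for ok in (True, True, True, True):
    monitor.record("image", ok)
for ok in (True, False):
    monitor.record("audio", ok)
print(monitor.success_rates(), monitor.alerts())
```

Splitting the rates by content type matters because a failing audio pipeline can hide behind a healthy overall average.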

**Scalability:**
- Queue-based processing for large content volumes
- Distributed vector stores for large-scale deployment
- Content deduplication to avoid redundant processing
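
Queue-based processing can be prototyped with the standard library before moving to a distributed task queue. This is a minimal single-machine sketch: `process_with_workers` and `fake_handler` are hypothetical names, and the handler stands in for the real per-file processing.

```python
import queue
import threading
from typing import Callable, List


def process_with_workers(
    paths: List[str], handler: Callable[[str], str], num_workers: int = 3
) -> List[str]:
    """Drain a work queue with a fixed pool of threads and collect the outcomes."""
    tasks: "queue.Queue[str]" = queue.Queue()
    results: List[str] = []
    lock = threading.Lock()

    def worker() -> None:
        while True:
            try:
                path = tasks.get_nowait()
            except queue.Empty:
                return  # queue drained, worker exits
            try:
                outcome = handler(path)
            except Exception as exc:
                # One bad file should not stop the rest of the queue.
                outcome = f"failed:{path}:{exc}"
            with lock:
                results.append(outcome)

    # All tasks are enqueued before the workers start, so an empty queue means done.
    for path in paths:
        tasks.put(path)
    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


def fake_handler(path: str) -> str:
    # Stand-in for real processing (vision call, transcription, parsing).
    return f"processed:{path}"


done = process_with_workers(["a.jpg", "b.pdf", "c.mp3", "d.mp4"], fake_handler)
print(sorted(done))
```

Results arrive in completion order, not submission order, so anything that depends on ordering should sort or key the outcomes by path.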

You now have the foundation for building sophisticated multimodal RAG systems! 🎉