In [None]:
!pip install gradio Pillow PyMuPDF requests chromadb langchain langchain-google-genai beautifulsoup4 google-generativeai

Collecting PyMuPDF
  Downloading pymupdf-1.26.0-cp39-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (3.4 kB)
Collecting google-ai-generativelanguage<0.7.0,>=0.6.18 (from langchain-google-genai)
  Using cached google_ai_generativelanguage-0.6.18-py3-none-any.whl.metadata (9.8 kB)
INFO: pip is looking at multiple versions of google-generativeai to determine which version is compatible with other requirements. This could take a while.
Collecting google-generativeai
  Downloading google_generativeai-0.8.4-py3-none-any.whl.metadata (4.2 kB)
  Downloading google_generativeai-0.8.3-py3-none-any.whl.metadata (3.9 kB)
  Downloading google_generativeai-0.8.2-py3-none-any.whl.metadata (3.9 kB)
  Downloading google_generativeai-0.8.1-py3-none-any.whl.metadata (3.9 kB)
  Downloading google_generativeai-0.8.0-py3-none-any.whl.metadata (3.9 kB)
  Downloading google_generativeai-0.7.2-py3-none-any.whl.metadata (4.0 kB)
  Downloading google_generativeai-0.7.1-py3-none-any.whl.metadata (3.

In [None]:
# `export VAR=...` is shell syntax and raises SyntaxError in a Python cell.
# Never hard-code credentials in a notebook: read the key from the environment
# or prompt for it without echoing. Any key that was previously committed in
# this cell must be treated as compromised and rotated.
import os
from getpass import getpass

if not os.environ.get("GEMINI_API_KEY"):
    os.environ["GEMINI_API_KEY"] = getpass("Enter your Gemini API key: ")

SyntaxError: invalid syntax (<ipython-input-5-8ae5bc3d880f>, line 1)

In [None]:
# -*- coding: utf-8 -*-
"""Enhanced Multimodal RAG System with URL and PDF Processing - FIXED VERSION
Includes interactive Gradio chatbot with Gemini 2.0 Flash
"""
import os
import uuid
import base64
import requests
from typing import List, Dict, Any, Optional
import gradio as gr
from io import BytesIO
from PIL import Image
import tempfile
import logging
import fitz  # PyMuPDF for better PDF handling
import re
from pathlib import Path

# Core libraries
import chromadb
from langchain.vectorstores import Chroma
from langchain.storage import InMemoryStore
from langchain.schema.document import Document
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain.retrievers.multi_vector import MultiVectorRetriever
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
from langchain_core.messages import HumanMessage

# For URL processing
import requests
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup

# Gemini imports
import google.generativeai as genai
from langchain_google_genai import ChatGoogleGenerativeAI

# Configure root logging at INFO level and create the module-level logger
# that the class and helper functions defined below write to.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class MultimodalRAGSystem:
    def __init__(self,
                 gemini_api_key: str = None,
                 persist_directory: str = "./chroma_db"):
        """
        Initialize the Multimodal RAG System

        Args:
            gemini_api_key: Gemini API key for embeddings and generation
            persist_directory: Directory to persist ChromaDB
        """
        # Set API key
        if gemini_api_key and gemini_api_key != "your-gemini-api-key-here":
            os.environ["GOOGLE_API_KEY"] = gemini_api_key
            genai.configure(api_key=gemini_api_key)
        else:
            raise ValueError("Please provide a valid GEMINI_API_KEY.")

        # Initialize vector store with GoogleGenerativeAIEmbeddings
        try:
            self.embedding_function = GoogleGenerativeAIEmbeddings(
                model="models/embedding-001",
                google_api_key=gemini_api_key
            )
            self.vectorstore = Chroma(
                collection_name="multimodal_rag",
                embedding_function=self.embedding_function,
                persist_directory=persist_directory
            )
        except Exception as e:
            logger.error(f"Error initializing embeddings: {e}")
            raise

        # Initialize storage and retriever
        self.store = InMemoryStore()
        self.id_key = "doc_id"
        self.retriever = MultiVectorRetriever(
            vectorstore=self.vectorstore,
            docstore=self.store,
            id_key=self.id_key,
        )

        # Initialize Gemini models
        try:
            self.llm = ChatGoogleGenerativeAI(
                model="gemini-2.0-flash",  # Using stable model
                temperature=0.3,
                google_api_key=gemini_api_key,
                convert_system_message_to_human=True
            )

            self.summarize_model = ChatGoogleGenerativeAI(
                model="gemini-2.0-flash",
                temperature=0.3,
                google_api_key=gemini_api_key
            )
        except Exception as e:
            logger.error(f"Error initializing Gemini models: {e}")
            raise

        # Storage for processed documents
        self.processed_docs = []

        logger.info("MultimodalRAGSystem initialized successfully")

    def extract_from_pdf(self, pdf_path: str, max_images: int = 20) -> Dict[str, List]:
        """
        Extract text, tables, and images from a PDF using PyMuPDF.

        Args:
            pdf_path: Path to the PDF file.
            max_images: Upper bound on the number of images collected from the
                whole document (not per page).

        Returns:
            Dictionary with the keys ``texts`` and ``tables`` (lists of
            ``Document``), ``images`` (base64-encoded PNG strings), ``source``
            (file name) and ``type`` (``"pdf"``). If the document cannot be
            processed, the three lists are empty.
        """
        source_name = os.path.basename(pdf_path)
        texts = []
        images = []
        tables = []

        try:
            logger.info(f"Processing PDF: {pdf_path}")

            # The context manager closes the document even when a page fails.
            with fitz.open(pdf_path) as doc:
                for page_index in range(len(doc)):
                    page = doc.load_page(page_index)
                    page_number = page_index + 1  # 1-based, as stored in metadata

                    # Extract text
                    text_content = page.get_text()
                    if text_content.strip():
                        for chunk in self._split_text_into_chunks(text_content, max_chars=2000):
                            texts.append(Document(
                                page_content=chunk,
                                metadata={"page": page_number, "source": source_name}
                            ))

                    # Extract images; the cap is checked before each extraction
                    # so it holds across pages, not only within a page.
                    for img_index, img in enumerate(page.get_images()):
                        if len(images) >= max_images:
                            break
                        pix = None
                        try:
                            pix = fitz.Pixmap(doc, img[0])  # img[0] is the xref
                            if pix.n - pix.alpha >= 4:
                                # CMYK cannot be written as PNG: convert to RGB first.
                                pix = fitz.Pixmap(fitz.csRGB, pix)
                            img_data = pix.tobytes("png")
                            images.append(base64.b64encode(img_data).decode())
                        except Exception as e:
                            logger.warning(f"Failed to extract image {img_index} from page {page_number}: {e}")
                        finally:
                            pix = None  # Free memory

                    # Extract tables (basic table detection). A failure of the
                    # detector only costs this page's tables, not the document.
                    try:
                        tables_on_page = page.find_tables().tables
                    except Exception as e:
                        logger.warning(f"Failed to detect tables on page {page_number}: {e}")
                        tables_on_page = []

                    for table in tables_on_page:
                        try:
                            table_data = table.extract()
                            if table_data:
                                tables.append(Document(
                                    page_content=self._table_to_text(table_data),
                                    metadata={"page": page_number, "source": source_name, "type": "table"}
                                ))
                        except Exception as e:
                            logger.warning(f"Failed to extract table from page {page_number}: {e}")

            logger.info(f"Extracted {len(texts)} text chunks, {len(tables)} tables, {len(images)} images from PDF")

            return {
                "texts": texts,
                "tables": tables,
                "images": images,
                "source": source_name,
                "type": "pdf"
            }

        except Exception as e:
            logger.error(f"Error processing PDF {pdf_path}: {str(e)}")
            return {"texts": [], "tables": [], "images": [], "source": source_name, "type": "pdf"}

    def _split_text_into_chunks(self, text: str, max_chars: int = 2000) -> List[str]:
        """Split text into manageable chunks"""
        if len(text) <= max_chars:
            return [text]

        chunks = []
        sentences = re.split(r'[.!?]+', text)
        current_chunk = ""

        for sentence in sentences:
            if len(current_chunk + sentence) <= max_chars:
                current_chunk += sentence + ". "
            else:
                if current_chunk:
                    chunks.append(current_chunk.strip())
                current_chunk = sentence + ". "

        if current_chunk:
            chunks.append(current_chunk.strip())

        return chunks

    def _table_to_text(self, table_data: List[List]) -> str:
        """Convert table data to text representation"""
        if not table_data:
            return ""

        text_rows = []
        for row in table_data:
            text_row = " | ".join([str(cell) if cell else "" for cell in row])
            text_rows.append(text_row)

        return "\n".join(text_rows)

    def extract_from_url(self, url: str) -> Dict[str, List]:
        """
        Extract text chunks and images from a web page.

        Args:
            url: URL to extract content from.

        Returns:
            Dictionary with the keys ``texts`` (list of ``Document``),
            ``tables`` (always empty for URLs), ``images`` (base64 strings),
            ``source`` (the URL) and ``type`` (``"url"``). On any error the
            lists are empty.
        """
        try:
            logger.info(f"Processing URL: {url}")

            # Fetch webpage content
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }
            response = requests.get(url, headers=headers, timeout=30)
            response.raise_for_status()

            # Parse the raw bytes so BeautifulSoup can detect the encoding from
            # the document itself; `response.text` falls back to ISO-8859-1
            # when the server sends no charset and garbles UTF-8 pages.
            soup = BeautifulSoup(response.content, 'html.parser')

            # Remove elements that carry no article content
            for tag in soup(["script", "style", "nav", "header", "footer"]):
                tag.decompose()

            # A separator keeps adjacent blocks (e.g. <p>a</p><p>b</p>) from
            # being fused into one word; the whitespace is normalized below.
            text_content = soup.get_text(separator="\n")

            # Clean up text: trim lines, break on double spaces, drop blanks
            lines = (line.strip() for line in text_content.splitlines())
            phrases = (phrase.strip() for line in lines for phrase in line.split("  "))
            clean_text = ' '.join(phrase for phrase in phrases if phrase)

            # Split into manageable chunks, ignoring fragments of 100 chars or fewer
            text_chunks = self._split_text_into_chunks(clean_text, max_chars=2000)
            texts = [Document(
                page_content=chunk,
                metadata={"source": url, "type": "web_text"}
            ) for chunk in text_chunks if len(chunk.strip()) > 100]

            # Extract images from HTML
            images = self._extract_images_from_html(soup, url)

            logger.info(f"Extracted {len(texts)} text chunks, {len(images)} images from URL")

            return {
                "texts": texts,
                "tables": [],
                "images": images,
                "source": url,
                "type": "url"
            }

        except Exception as e:
            logger.error(f"Error processing URL {url}: {str(e)}")
            return {"texts": [], "tables": [], "images": [], "source": url, "type": "url"}

    def _extract_images_from_html(self, soup: BeautifulSoup, base_url: str, max_images: int = 10) -> List[str]:
        """
        Download the images referenced by ``<img>`` tags and return them as base64.

        Only ``http``/``https`` URLs are fetched (``data:`` URIs and other
        schemes are skipped), each URL is fetched at most once, and a payload
        is kept only if the server labels it ``image/*`` and Pillow can decode
        it, so every returned string can later be opened with ``Image.open``.

        Args:
            soup: Parsed HTML document.
            base_url: URL of the page, used to resolve relative ``src`` values.
            max_images: Maximum number of images to return.

        Returns:
            List of base64-encoded image payloads; empty on unexpected errors.
        """
        # Same browser-like User-Agent as the page fetch.
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }

        try:
            images = []
            seen_urls = set()

            for img_tag in soup.find_all('img'):
                if len(images) >= max_images:  # Limit number of images
                    break

                # Lazy-loading pages often keep the real address in data-src.
                raw_src = img_tag.get('src') or img_tag.get('data-src')
                if not raw_src:
                    continue

                # urljoin leaves absolute URLs untouched and resolves relative ones.
                img_url = urljoin(base_url, raw_src.strip())
                if urlparse(img_url).scheme not in ('http', 'https'):
                    continue
                if img_url in seen_urls:
                    continue
                seen_urls.add(img_url)

                try:
                    # Download image
                    img_response = requests.get(img_url, headers=headers, timeout=10)
                    img_response.raise_for_status()

                    # Check if it's actually an image
                    if not img_response.headers.get('content-type', '').startswith('image/'):
                        continue

                    payload = img_response.content
                except Exception as e:
                    logger.warning(f"Failed to download image {img_url}: {str(e)}")
                    continue

                try:
                    # Reject payloads Pillow cannot decode (e.g. SVG).
                    Image.open(BytesIO(payload)).verify()
                except Exception as e:
                    logger.warning(f"Skipping undecodable image {img_url}: {str(e)}")
                    continue

                images.append(base64.b64encode(payload).decode('utf-8'))

            return images

        except Exception as e:
            logger.error(f"Error extracting images from HTML: {str(e)}")
            return []

    def summarize_elements(self, elements: Dict[str, List]) -> Dict[str, List[str]]:
        """
        Produce one summary per extracted text chunk, table and image.

        Texts and tables go through the same prompt/model chain; when a call
        fails, the first 500 characters of the content are used in place of
        the summary. Images are described by ``_summarize_image_with_gemini``.

        Args:
            elements: Dictionary with ``texts``, ``tables`` and ``images``.

        Returns:
            Dictionary with ``text_summaries``, ``table_summaries`` and
            ``image_summaries``; all three are empty if summarization as a
            whole fails.
        """
        try:
            logger.info("Generating summaries...")

            # Prompt shared by text and table summarization
            text_prompt = ChatPromptTemplate.from_template("""
            You are an assistant tasked with summarizing content for a RAG system.
            Provide a concise but comprehensive summary of the following content.
            Focus on key information, main points, and important details.

            Content: {element}

            Summary:
            """)

            text_chain = text_prompt | self.summarize_model | StrOutputParser()

            def summarize_group(items, label):
                """Summarize each item, falling back to truncated content on failure."""
                results = []
                for item in items:
                    body = item.page_content if hasattr(item, 'page_content') else str(item)
                    try:
                        results.append(text_chain.invoke({"element": body}))
                    except Exception as e:
                        logger.warning(f"Failed to summarize {label}: {str(e)}")
                        results.append(body[:500] + "..." if len(body) > 500 else body)
                return results

            text_summaries = summarize_group(elements["texts"], "text") if elements["texts"] else []
            table_summaries = summarize_group(elements["tables"], "table") if elements["tables"] else []

            # Images are described with Gemini Vision
            image_summaries = []
            for encoded_image in (elements["images"] or ()):
                try:
                    image_summaries.append(self._summarize_image_with_gemini(encoded_image))
                except Exception as e:
                    logger.warning(f"Failed to summarize image: {str(e)}")
                    image_summaries.append("Image content could not be analyzed")

            logger.info(f"Generated {len(text_summaries)} text summaries, {len(table_summaries)} table summaries, {len(image_summaries)} image summaries")

            return {
                "text_summaries": text_summaries,
                "table_summaries": table_summaries,
                "image_summaries": image_summaries
            }

        except Exception as e:
            logger.error(f"Error generating summaries: {str(e)}")
            return {"text_summaries": [], "table_summaries": [], "image_summaries": []}

    def _summarize_image_with_gemini(self, image_base64: str) -> str:
        """
        Describe a base64-encoded image with Gemini.

        Args:
            image_base64: Base64-encoded image bytes.

        Returns:
            The model's description, or ``"Image analysis failed"`` if decoding
            or generation raises.
        """
        try:
            raw_bytes = base64.b64decode(image_base64)
            vision_model = genai.GenerativeModel('gemini-2.0-flash')
            picture = Image.open(BytesIO(raw_bytes))

            instructions = """Analyze this image in detail. Describe what you see, including:
            - Main objects, people, or elements
            - Text content if any
            - Charts, graphs, or diagrams
            - Colors, layout, and composition
            - Any relevant context or meaning

            Provide a comprehensive description suitable for retrieval and question answering."""

            return vision_model.generate_content([instructions, picture]).text

        except Exception as e:
            logger.error(f"Error summarizing image with Gemini: {str(e)}")
            return "Image analysis failed"

    def add_to_vectorstore(self, elements: Dict[str, List], summaries: Dict[str, List[str]]):
        """
        Index summaries in the vector store and keep the originals in the docstore.

        For texts, tables and images alike, each summary is embedded as a
        ``Document`` whose ``doc_id`` metadata points at the original element
        stored in the retriever's docstore.

        Args:
            elements: Original elements (``texts``, ``tables``, ``images``,
                ``source``, ``type``).
            summaries: Generated summaries (``text_summaries``,
                ``table_summaries``, ``image_summaries``).

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            logger.info("Adding content to vector store...")

            groups = (
                ("texts", "text_summaries", "text"),
                ("tables", "table_summaries", "table"),
                ("images", "image_summaries", "image"),
            )
            for element_key, summary_key, kind in groups:
                self._index_summaries(
                    elements.get(element_key) or [],
                    summaries.get(summary_key) or [],
                    kind,
                    elements["source"],
                    elements["type"],
                )

            # Newer Chroma wrappers persist automatically and no longer expose
            # persist(); only call it where it still exists.
            persist = getattr(self.vectorstore, "persist", None)
            if callable(persist):
                persist()

            logger.info("Content successfully added to vector store")

        except Exception as e:
            logger.error(f"Error adding to vector store: {str(e)}")
            raise

    def _index_summaries(self, originals: List, summaries: List[str], kind: str,
                         source: str, source_type: str) -> int:
        """Index one group of (original, summary) pairs; returns how many were stored."""
        # zip() pairs by position and stops at the shorter list, so every
        # indexed summary is guaranteed a matching docstore entry.
        pairs = list(zip(originals, summaries))
        if len(originals) != len(summaries):
            logger.warning(
                f"Mismatched {kind} counts: {len(originals)} elements vs "
                f"{len(summaries)} summaries; indexing {len(pairs)}"
            )
        if not pairs:
            return 0

        doc_ids = [str(uuid.uuid4()) for _ in pairs]
        summary_docs = [
            Document(
                page_content=summary,
                metadata={
                    self.id_key: doc_id,
                    "source": source,
                    "type": kind,
                    "source_type": source_type
                }
            )
            for doc_id, (_, summary) in zip(doc_ids, pairs)
        ]
        self.retriever.vectorstore.add_documents(summary_docs)
        self.retriever.docstore.mset(
            [(doc_id, original) for doc_id, (original, _) in zip(doc_ids, pairs)]
        )
        return len(pairs)

    def process_source(self, source: str) -> str:
        """
        Process a source (PDF file path or URL)

        Args:
            source: Path to PDF file or URL

        Returns:
            Status message
        """
        try:
            # Determine if source is URL or file path
            if source.startswith(('http://', 'https://')):
                elements = self.extract_from_url(source)
            else:
                if not os.path.exists(source):
                    return f"File not found: {source}"
                elements = self.extract_from_pdf(source)

            if not any([elements["texts"], elements["tables"], elements["images"]]):
                return f"No content extracted from {source}"

            # Generate summaries
            summaries = self.summarize_elements(elements)

            # Add to vector store
            self.add_to_vectorstore(elements, summaries)

            # Store processed document info
            self.processed_docs.append({
                "source": elements["source"],
                "type": elements["type"],
                "text_count": len(elements["texts"]),
                "table_count": len(elements["tables"]),
                "image_count": len(elements["images"])
            })

            return f"✅ Successfully processed {elements['source']}\n📄 Texts: {len(elements['texts'])}\n📊 Tables: {len(elements['tables'])}\n🖼️ Images: {len(elements['images'])}"

        except Exception as e:
            logger.error(f"Error processing source {source}: {str(e)}")
            return f"❌ Error processing {source}: {str(e)}"

    def parse_retrieved_docs(self, docs: List) -> Dict[str, List]:
        """
        Separate retrieved items into base64 images and text passages.

        A string counts as an image when it is valid base64 and either decodes
        to a recognized image header (PNG, JPEG, GIF, WebP) or is longer than
        1000 characters. The header check catches small images that the length
        heuristic alone would send into the text context.

        Args:
            docs: Items returned by the retriever (``Document``-like objects
                or raw strings).

        Returns:
            ``{"images": [...], "texts": [...]}``; both lists are empty if
            parsing fails.
        """
        image_signatures = (b"\x89PNG\r\n\x1a\n", b"\xff\xd8\xff", b"GIF87a", b"GIF89a")

        try:
            images = []
            texts = []

            for doc in docs:
                if isinstance(doc, str):
                    try:
                        decoded = base64.b64decode(doc, validate=True)
                    except ValueError:
                        # Not base64 (binascii.Error is a ValueError): plain text.
                        decoded = None

                    if decoded is not None:
                        has_image_header = decoded.startswith(image_signatures) or (
                            decoded[:4] == b"RIFF" and decoded[8:12] == b"WEBP"
                        )
                        if has_image_header or len(doc) > 1000:
                            images.append(doc)
                            continue

                # Extract text content
                if hasattr(doc, 'page_content'):
                    texts.append(doc.page_content)
                elif hasattr(doc, 'text'):
                    texts.append(doc.text)
                else:
                    texts.append(str(doc))

            return {"images": images, "texts": texts}

        except Exception as e:
            logger.error(f"Error parsing retrieved docs: {str(e)}")
            return {"images": [], "texts": []}

    def query(self, question: str, k: int = 5) -> Dict[str, Any]:
        """
        Answer a question from the indexed content.

        Retrieved items are split into text passages and images. When at least
        one image is retrieved, the first one is sent to Gemini along with up
        to three passages; if that call fails, or no image was retrieved, the
        text-only chat model is used.

        Args:
            question: User question.
            k: Number of summary documents to retrieve from the vector store.

        Returns:
            Dictionary with ``response`` (answer text), ``context``
            (``{"images": [...], "texts": [...]}``) and ``retrieved_docs``.
            Errors are reported inside ``response`` rather than raised.
        """
        def build_prompt(intro: str, context_block: str) -> str:
            return (
                f"{intro}\n\n"
                f"Context:\n{context_block}\n\n"
                f"Question: {question}\n\n"
                "Answer:"
            )

        text_intro = "Based on the following context, answer the question accurately and comprehensively."
        image_intro = "Based on the following context and images, answer the question accurately and comprehensively."

        try:
            logger.info(f"Processing query: {question}")

            # Extra keyword arguments to retriever.invoke() are not forwarded to
            # the vector search; the result count is controlled by search_kwargs.
            self.retriever.search_kwargs = {**(self.retriever.search_kwargs or {}), "k": k}
            retrieved_docs = self.retriever.invoke(question)
            context = self.parse_retrieved_docs(retrieved_docs)

            # Build context text from at most three passages
            context_text = "\n\n---\n\n".join(context["texts"][:3])

            answer = None
            if context["images"]:
                # Multimodal prompt with the first retrieved image
                try:
                    model = genai.GenerativeModel('gemini-2.0-flash')
                    prompt_parts = [build_prompt(image_intro, context_text)]

                    try:
                        image_bytes = base64.b64decode(context["images"][0])
                        prompt_parts.append(Image.open(BytesIO(image_bytes)))
                    except Exception as e:
                        logger.warning(f"Failed to add image to prompt: {e}")

                    answer = model.generate_content(prompt_parts).text

                except Exception as e:
                    logger.warning(f"Multimodal generation failed, falling back to text-only: {e}")

            if answer is None:
                # Text-only prompt (also the fallback for a failed multimodal call)
                prompt = build_prompt(text_intro, context_text or "No specific context available.")
                response = self.llm.invoke(prompt)
                answer = response.content if hasattr(response, 'content') else str(response)

            return {
                "response": answer,
                "context": context,
                "retrieved_docs": retrieved_docs
            }

        except Exception as e:
            logger.error(f"Error processing query: {str(e)}")
            return {
                "response": f"I apologize, but I encountered an error while processing your question: {str(e)}",
                "context": {"images": [], "texts": []},
                "retrieved_docs": []
            }

    def get_processed_docs_info(self) -> str:
        """Get information about processed documents"""
        if not self.processed_docs:
            return "No documents processed yet."

        info = "📚 **Processed Documents:**\n\n"
        for i, doc in enumerate(self.processed_docs, 1):
            info += f"{i}. **{doc['source']}** ({doc['type']})\n"
            info += f"   - 📄 Texts: {doc['text_count']} | 📊 Tables: {doc['table_count']} | 🖼️ Images: {doc['image_count']}\n\n"

        return info

def create_gradio_interface(rag_system: MultimodalRAGSystem):
    """Build the Gradio Blocks app that fronts the RAG system.

    The UI has three tabs: document ingestion (PDF uploads and URLs), a chat
    tab that answers questions through ``rag_system.query``, and a context
    viewer that shows the retrieved text and images for a question.

    Args:
        rag_system: Object providing ``process_source(source)``,
            ``query(question)`` (returning a dict with ``"response"``,
            ``"context"`` and ``"retrieved_docs"``) and
            ``get_processed_docs_info()``.

    Returns:
        The configured ``gr.Blocks`` instance; the caller is responsible for
        launching it.
    """

    # Most recent successful chat query, stored under "entry" as a single
    # (question, result) tuple. It lets the "View Context" tab show the context
    # of the last question without paying for a second retrieval + LLM call
    # (each call counts against the per-minute API quota).
    last_query = {}

    def process_sources_ui(files, urls):
        """Process multiple sources through UI.

        Args:
            files: Uploaded files from the ``gr.File`` component (may be None).
            urls: Newline-separated URLs from the textbox (may be None/empty).

        Returns:
            The per-source status messages joined by blank lines, or a warning
            when no usable source was supplied.
        """
        sources = []
        for file in files or []:
            # Accept both objects exposing `.name` and plain path strings.
            sources.append(getattr(file, "name", file))
        for url in (urls or "").splitlines():
            if url.strip():
                sources.append(url.strip())

        # Also covers the case where the URL box only held whitespace.
        if not sources:
            return "⚠️ Please upload at least one PDF or provide a URL."

        # The knowledge base is about to change, so any remembered retrieval
        # result may no longer reflect what a fresh query would return.
        last_query.clear()

        statuses = []
        for source in sources:
            # Isolate failures so one bad source does not discard the status
            # of every other source in the batch.
            try:
                statuses.append(rag_system.process_source(source))
            except Exception as e:
                logger.error(f"Failed to process source {source}: {e}")
                statuses.append(f"❌ Error processing {source}: {str(e)}")

        return "\n\n".join(statuses)


    def query_ui(question, chat_history):
        """Process query through UI and display images.

        Args:
            question: Text typed by the user.
            chat_history: Current chatbot history as a list of
                ``(user, bot)`` tuples (treated as empty when None).

        Returns:
            ``(updated_history, "")`` — the empty string clears the input box.
        """
        if chat_history is None:
            chat_history = []

        if not question or not question.strip():
            return chat_history, ""

        cleaned = question.strip()
        try:
            result = rag_system.query(cleaned)
            # Single assignment so a concurrent reader never sees a question
            # paired with another request's result.
            last_query["entry"] = (cleaned, result)

            response_text = result["response"]
            context_images = result["context"]["images"]

            # Add text response to chat
            chat_history.append((question, response_text))

            # If there's a relevant image, display it
            if context_images:
                try:
                    img_b64 = context_images[0]
                    img_bytes = base64.b64decode(img_b64)

                    # delete=False keeps the file on disk after this handler
                    # returns, since only its path is handed to the chat UI.
                    with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as temp_file:
                        temp_file.write(img_bytes)
                        temp_file_path = temp_file.name

                    # Add image to chat history
                    chat_history.append((None, (temp_file_path,)))

                except Exception as e:
                    # The image is a bonus; never fail the answer because of it.
                    logger.warning(f"Failed to display image in chat: {e}")

            return chat_history, ""

        except Exception as e:
            error_msg = f"❌ Error: {str(e)}"
            chat_history.append((question, error_msg))
            return chat_history, ""

    def show_context_ui(question):
        """Show the retrieved context for a question.

        Reuses the result of the most recent chat query when the question
        matches and documents were retrieved; otherwise runs a fresh query.

        Args:
            question: The question whose context should be displayed.

        Returns:
            ``(markdown_text, images)`` where ``images`` is a list of PIL
            images decoded from the base64 context images.
        """
        if not question or not question.strip():
            return "Please provide a question", []

        cleaned = question.strip()
        try:
            cached = last_query.get("entry")
            if cached is not None and cached[0] == cleaned and cached[1].get("retrieved_docs"):
                result = cached[1]
            else:
                result = rag_system.query(cleaned)
            context = result["context"]

            # Prepare context text
            context_text = "### 🔍 Retrieved Context:\n\n"
            for i, doc in enumerate(result["retrieved_docs"], 1):
                if hasattr(doc, 'page_content'):
                    context_text += f"**Document {i}:**\n"
                    context_text += f"```\n{doc.page_content[:500]}...\n```\n"
                    if hasattr(doc, 'metadata'):
                        context_text += f"   *📍 Source: {doc.metadata.get('source', 'Unknown')}*\n"
                        context_text += f"   *🏷️ Type: {doc.metadata.get('type', 'Unknown')}*\n"
                context_text += "\n---\n"

            # Prepare images for display
            images_to_display = []
            for img_b64 in context["images"]:
                try:
                    img_bytes = base64.b64decode(img_b64)
                    img = Image.open(BytesIO(img_bytes))
                    images_to_display.append(img)
                except Exception as e:
                    # Skip undecodable images rather than hiding the text context.
                    logger.warning(f"Failed to decode image for display: {str(e)}")

            return context_text, images_to_display

        except Exception as e:
            return f"❌ Error retrieving context: {str(e)}", []

    def get_docs_info():
        """Get processed documents information"""
        return rag_system.get_processed_docs_info()

    # Create Gradio interface
    with gr.Blocks(title="Multimodal RAG System", theme=gr.themes.Default(primary_hue="blue", secondary_hue="blue")) as demo:
        gr.Markdown("""
        # 🤖 Multimodal RAG System with Gemini

        This system processes PDFs and websites, extracts text, tables, and images,
        and answers questions using the processed content with Gemini AI.
        """)

        with gr.Tab("📄 Document Processing"):
            gr.Markdown("### Add Documents to the Knowledge Base")

            with gr.Row():
                pdf_upload = gr.File(
                    label="Upload PDFs",
                    file_count="multiple",
                    file_types=[".pdf"],
                    scale=1
                )
                url_input = gr.Textbox(
                    label="Enter URLs (one per line)",
                    placeholder="https://example.com\nhttps://another-example.com",
                    lines=4,
                    scale=2
                )

            process_btn = gr.Button("🔄 Process All Sources", variant="primary")
            process_output = gr.Textbox(label="Processing Status", interactive=False, lines=6)

            docs_info_btn = gr.Button("📋 Show All Processed Documents", variant="secondary")
            docs_info_output = gr.Markdown(label="Processed Document List")

            process_btn.click(
                fn=process_sources_ui,
                inputs=[pdf_upload, url_input],
                outputs=[process_output]
            )

            docs_info_btn.click(
                fn=get_docs_info,
                outputs=[docs_info_output]
            )

        with gr.Tab("💬 Chat Interface"):
            gr.Markdown("### Ask Questions About Your Documents")

            # NOTE(review): query_ui feeds this component (user, bot) tuples.
            # Gradio appears to emit a warning for this Chatbot constructor;
            # confirm whether it is the tuple-format deprecation before
            # migrating the history entries to another message format.
            chatbot = gr.Chatbot(
                label="Conversation",
                height=550,
                show_label=False,
                avatar_images=(None, "https://www.google.com/images/branding/googlelogo/1x/googlelogo_color_272x92dp.png")
            )

            with gr.Row():
                question_input = gr.Textbox(
                    label="Your Question",
                    placeholder="Ask a question about your documents...",
                    scale=4,
                    show_label=False
                )
                ask_btn = gr.Button("🚀 Ask", variant="primary", scale=1)

            # Pressing Enter and clicking the button trigger the same handler.
            question_input.submit(
                fn=query_ui,
                inputs=[question_input, chatbot],
                outputs=[chatbot, question_input]
            )

            ask_btn.click(
                fn=query_ui,
                inputs=[question_input, chatbot],
                outputs=[chatbot, question_input]
            )

        with gr.Tab("🔍 View Context"):
            gr.Markdown("### See the Context Used for Your Last Question")
            gr.Markdown("Enter the same question you asked in the chat to see the retrieved text and images that were used to generate the answer.")

            with gr.Row():
                context_question_input = gr.Textbox(
                    label="Enter your question again to see the context",
                    placeholder="Enter the question here...",
                    scale=3
                )
                context_btn = gr.Button("👁️ Show Context", variant="secondary", scale=1)

            context_text_output = gr.Markdown(
                label="Retrieved Text Context"
            )
            context_images_output = gr.Gallery(
                label="Retrieved Images",
                show_label=True,
                elem_id="gallery",
                columns=[5],
                object_fit="contain",
                height="auto"
            )

            context_btn.click(
                fn=show_context_ui,
                inputs=[context_question_input],
                outputs=[context_text_output, context_images_output]
            )
    return demo


if __name__ == "__main__":
    # Read the key from the environment rather than hardcoding it, so the
    # secret is never saved, committed or shared together with the notebook.
    # In a notebook, set it in a cell that is not saved with its input, e.g.:
    #   import getpass, os
    #   os.environ["GEMINI_API_KEY"] = getpass.getpass("Gemini API key: ")
    api_key = os.environ.get("GEMINI_API_KEY", "").strip()

    # The placeholder check guards against a template value being exported as-is.
    if not api_key or api_key == "YOUR_GEMINI_API_KEY_HERE":
        print("🔴 FATAL ERROR: The 'GEMINI_API_KEY' environment variable is not set.")
        print("Please get your API key from Google AI Studio (https://aistudio.google.com/) and set it as the GEMINI_API_KEY environment variable.")
        # Create a simple Gradio interface to display the error and exit
        with gr.Blocks() as error_demo:
            gr.Markdown(
                """
                # 🔴 ERROR: `GEMINI_API_KEY` is not set.
                You must provide your API key through the `GEMINI_API_KEY` environment variable to run the application.
                1. Get your API key from [Google AI Studio](https://aistudio.google.com/).
                2. Set the `GEMINI_API_KEY` environment variable to your actual key (do not paste it into the script).
                3. Run the script again.
                """
            )
        error_demo.launch()
    else:
        try:
            # Create a temporary directory for the ChromaDB
            temp_dir = tempfile.mkdtemp()
            logger.info(f"ChromaDB will be persisted to temporary directory: {temp_dir}")

            # Initialize the RAG system
            rag_system = MultimodalRAGSystem(
                gemini_api_key=api_key,
                persist_directory=temp_dir
            )

            # Create and launch the Gradio interface
            app = create_gradio_interface(rag_system)
            app.launch(share=True, debug=True)

        except Exception as e:
            logger.error(f"An error occurred during application startup: {e}", exc_info=True)
            # Create a Gradio interface to display the startup error
            with gr.Blocks() as error_demo:
                gr.Markdown(
                    f"""
                    # 💥 Application Failed to Start
                    An unexpected error occurred:
                    `{e}`
                    Please check the console logs for more detailed information.
                    """
                )
            error_demo.launch()


  chatbot = gr.Chatbot(


Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://65038bfdacc73add93.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


  quota_metric: "generativelanguage.googleapis.com/generate_content_free_tier_requests"
  quota_id: "GenerateRequestsPerMinutePerProjectPerModel-FreeTier"
  quota_dimensions {
    key: "model"
    value: "gemini-2.0-flash"
  }
  quota_dimensions {
    key: "location"
    value: "global"
  }
  quota_value: 15
}
, links {
  description: "Learn more about Gemini API quotas"
  url: "https://ai.google.dev/gemini-api/docs/rate-limits"
}
, retry_delay {
  seconds: 4
}
].
  quota_metric: "generativelanguage.googleapis.com/generate_content_free_tier_requests"
  quota_id: "GenerateRequestsPerMinutePerProjectPerModel-FreeTier"
  quota_dimensions {
    key: "model"
    value: "gemini-2.0-flash"
  }
  quota_dimensions {
    key: "location"
    value: "global"
  }
  quota_value: 15
}
, links {
  description: "Learn more about Gemini API quotas"
  url: "https://ai.google.dev/gemini-api/docs/rate-limits"
}
, retry_delay {
  seconds: 2
}
]
  quota_metric: "generativelanguage.googleapis.com/generate_con

In [None]:
# `import fitz` is provided by the PyMuPDF distribution. The PyPI project that
# is literally named `fitz` is an unrelated package and must not be installed
# alongside it.
%pip install -q PyMuPDF



In [None]:
# Install every dependency in ONE resolver pass. Installing them one command
# at a time let a later `pip install google-generativeai` downgrade
# google-ai-generativelanguage to 0.6.15, which langchain-google-genai 2.1.5
# rejects (it requires >=0.6.18,<0.7.0). `%pip` targets the running kernel's
# environment, and the extras specifier is quoted so the shell cannot glob it.
%pip install -q gradio langchain langchain-community langchain-google-genai chromadb "unstructured[all-docs]" google-generativeai requests beautifulsoup4 pillow

Collecting langchain-community
  Downloading langchain_community-0.3.24-py3-none-any.whl.metadata (2.5 kB)
Collecting dataclasses-json<0.7,>=0.5.7 (from langchain-community)
  Downloading dataclasses_json-0.6.7-py3-none-any.whl.metadata (25 kB)
Collecting pydantic-settings<3.0.0,>=2.4.0 (from langchain-community)
  Downloading pydantic_settings-2.9.1-py3-none-any.whl.metadata (3.8 kB)
Collecting httpx-sse<1.0.0,>=0.4.0 (from langchain-community)
  Downloading httpx_sse-0.4.0-py3-none-any.whl.metadata (9.0 kB)
Collecting marshmallow<4.0.0,>=3.18.0 (from dataclasses-json<0.7,>=0.5.7->langchain-community)
  Downloading marshmallow-3.26.1-py3-none-any.whl.metadata (7.3 kB)
Collecting typing-inspect<1,>=0.4.0 (from dataclasses-json<0.7,>=0.5.7->langchain-community)
  Downloading typing_inspect-0.9.0-py3-none-any.whl.metadata (1.5 kB)
Collecting python-dotenv>=0.21.0 (from pydantic-settings<3.0.0,>=2.4.0->langchain-community)
  Downloading python_dotenv-1.1.0-py3-none-any.whl.metadata (24 kB

Collecting chromadb
  Downloading chromadb-1.0.12-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.9 kB)
Collecting fastapi==0.115.9 (from chromadb)
  Downloading fastapi-0.115.9-py3-none-any.whl.metadata (27 kB)
Collecting posthog>=2.4.0 (from chromadb)
  Downloading posthog-4.4.0-py2.py3-none-any.whl.metadata (5.5 kB)
Collecting onnxruntime>=1.14.1 (from chromadb)
  Downloading onnxruntime-1.22.0-cp311-cp311-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (4.5 kB)
Collecting opentelemetry-api>=1.2.0 (from chromadb)
  Downloading opentelemetry_api-1.34.0-py3-none-any.whl.metadata (1.5 kB)
Collecting opentelemetry-exporter-otlp-proto-grpc>=1.2.0 (from chromadb)
  Downloading opentelemetry_exporter_otlp_proto_grpc-1.34.0-py3-none-any.whl.metadata (2.4 kB)
Collecting opentelemetry-instrumentation-fastapi>=0.41b0 (from chromadb)
  Downloading opentelemetry_instrumentation_fastapi-0.55b0-py3-none-any.whl.metadata (2.2 kB)
Collecting opentelemetry-sdk>=1.2.0 (fr

Collecting google-ai-generativelanguage==0.6.15 (from google-generativeai)
  Downloading google_ai_generativelanguage-0.6.15-py3-none-any.whl.metadata (5.7 kB)
Downloading google_ai_generativelanguage-0.6.15-py3-none-any.whl (1.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [31m13.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: google-ai-generativelanguage
  Attempting uninstall: google-ai-generativelanguage
    Found existing installation: google-ai-generativelanguage 0.6.18
    Uninstalling google-ai-generativelanguage-0.6.18:
      Successfully uninstalled google-ai-generativelanguage-0.6.18
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
langchain-google-genai 2.1.5 requires google-ai-generativelanguage<0.7.0,>=0.6.18, but you have google-ai-generativelanguage 0.6.15 which is incompatible.[0m[

