# **🌿 EcoSight: Gemini Vision Multi-Agent Waste Sorting System**

#### **AI-powered agents detect, classify, and automate waste sorting for optimized recycling efficiency.**

EcoSight addresses waste contamination and low recycling accuracy through an intelligent multi-agent architecture. Leveraging ***Google's Gemini Vision API***, the system accurately identifies waste items and classifies them into recyclable, compostable, or landfill categories with location-aware disposal rules. Supporting both individual and batch processing, it generates comprehensive PDF reports with environmental impact analytics to enhance recycling efficiency and support data-driven sustainability initiatives.



# 🗺️ System Architecture Visualization

**Interactive diagram showing the multi-agent waste sorting workflow**

This cell generates the complete EcoSight architecture diagram as a Mermaid.js flowchart:

* 🔄 Sequential Flow – Single images move through Vision → Classification → Reporting agents
* ⚡ Parallel Flow – Multiple images processed simultaneously for efficiency
* 🛠️ Tool Integration – Each agent connects to specialized utilities (Gemini Vision, WasteDB, etc.)
* 💾 Memory System – Two-way communication with MemoryBank for personalized insights
* 🎯 Orchestration – Central controller manages both sequential and parallel workflows
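
The difference between the sequential and parallel flows can be sketched with plain `asyncio`. This is a minimal illustration, not the EcoSight orchestrator: the three agent functions below are hypothetical stand-ins that return fixed values, and the sleep stands in for a real vision API call.

```python
import asyncio
from typing import Dict, List


async def vision_agent(image_name: str) -> Dict[str, str]:
    """Hypothetical stand-in for the Vision agent (no real API call)."""
    await asyncio.sleep(0.01)  # simulates network latency
    return {"image": image_name, "item": "plastic bottle"}


async def classification_agent(detection: Dict[str, str]) -> Dict[str, str]:
    """Hypothetical stand-in for the Classification agent."""
    return {**detection, "category": "recyclable"}


async def reporting_agent(classified: Dict[str, str]) -> str:
    """Hypothetical stand-in for the Reporting agent."""
    return f"{classified['image']}: {classified['item']} -> {classified['category']}"


async def run_sequential(image_name: str) -> str:
    # Sequential flow: each stage waits for the previous one
    detection = await vision_agent(image_name)
    classified = await classification_agent(detection)
    return await reporting_agent(classified)


async def run_parallel(image_names: List[str]) -> List[str]:
    # Parallel flow: one sequential pipeline per image, run concurrently
    return await asyncio.gather(*(run_sequential(name) for name in image_names))


reports = asyncio.run(run_parallel(["a.jpg", "b.jpg"]))
print(reports)
```

Inside a notebook an event loop is usually already running, so `await run_parallel([...])` would replace the `asyncio.run(...)` call there.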

In [1]:
from IPython.display import display, Markdown

architecture_diagram = """
```mermaid
flowchart TD
    %% ========== INPUT LAYER ==========
    USER[User Input<br/>Waste Images] --> ORCH{Orchestrator<br/>Workflow Controller}
    
    %% ========== SEQUENTIAL FLOW ==========
    subgraph SG_SEQ [Sequential Pipeline]
        direction LR
        VISION[Vision Agent<br/>Object Detection] --> CLASS[Classification Agent<br/>Waste Categorization] --> REPORT[Reporting Agent<br/>PDF Generation]
    end
    
    ORCH --> SG_SEQ
    
    %% ========== PARALLEL FLOW ==========
    subgraph SG_PAR [Parallel Processing]
        direction TB
        MULTI[Multiple Images] --> VISION_P1[Vision Agent] --> CLASS_P1[Classification Agent] --> REPORT_P1[Reporting Agent]
        MULTI --> VISION_P2[Vision Agent] --> CLASS_P2[Classification Agent] --> REPORT_P2[Reporting Agent]
    end
    
    ORCH --> SG_PAR
    
    %% ========== TOOL INTEGRATION ==========
    VISION --> V_TOOL([Gemini Vision API])
    CLASS --> C_TOOL1([WasteDB<br/>Material Rules])
    CLASS --> C_TOOL2([LocationFinder<br/>Regional Guidelines])
    REPORT --> R_TOOL([PDF Generator<br/>Professional Reports])
    
    R_TOOL --> OUTPUT[PDF Report<br/>Environmental Impact Analysis]
    
    %% ========== MEMORY & MONITORING ==========
    REPORT <--> MEMORY{{MemoryBank<br/>User History & Patterns}}
    
    %% ========== CONTINUOUS MONITORING ==========
    MONITOR[Loop Agent<br/>Continuous Monitoring] -.->|Triggers New Analysis| ORCH
    
    %% ========== HIGH-CONTRAST STYLING ==========
    classDef agent fill:#e1f5fe,stroke:#01579b,color:#000000
    classDef tool fill:#fff3e0,stroke:#e65100,color:#000000
    classDef storage fill:#e8f5e8,stroke:#1b5e20,color:#000000
    classDef control fill:#fce4ec,stroke:#c2185b,color:#000000
    classDef input fill:#f3e5f5,stroke:#7b1fa2,color:#000000
    classDef output fill:#e8f5e8,stroke:#1b5e20,color:#000000
    classDef parallel fill:#f1f8e9,stroke:#33691e,color:#000000
    
    class VISION,CLASS,REPORT,VISION_P1,VISION_P2,CLASS_P1,CLASS_P2,REPORT_P1,REPORT_P2 agent
    class V_TOOL,C_TOOL1,C_TOOL2,R_TOOL tool
    class MEMORY storage
    class ORCH control
    class USER input
    class OUTPUT output
    class SG_PAR parallel
```
"""

display(Markdown(architecture_diagram))
print("✅ EcoSight Multi-Agent Architecture Diagram")



✅ EcoSight Multi-Agent Architecture Diagram



# 🚀 Project Structure Setup
Creates the organized folder system that supports the multi-agent workflow:

* agents/ – AI agent implementations
* tools/ – utility modules and databases
* memory/ – user history and session storage
* orchestration/ – workflow coordination
* utils/ – helper functions
* data/ – datasets and processed outputs
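
For environments where shell commands are not available, the same layout could be created in pure Python. The sketch below is an assumption-labeled alternative to the `!mkdir` call: `create_project_structure` is a hypothetical helper, and it writes into a temporary directory so it does not touch the working folder.

```python
import tempfile
from pathlib import Path

# Folder names taken from the list above
SUBFOLDERS = ["agents", "tools", "memory", "orchestration", "utils", "data"]


def create_project_structure(base_dir: Path, project_name: str = "EcoSight") -> list:
    """Create the project root and its subfolders; safe to call repeatedly."""
    root = base_dir / project_name
    for name in SUBFOLDERS:
        (root / name).mkdir(parents=True, exist_ok=True)
    # Return the created folder names in sorted order for a quick check
    return sorted(p.name for p in root.iterdir() if p.is_dir())


created = create_project_structure(Path(tempfile.mkdtemp()))
print(created)
```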

In [2]:
# EcoSight: Gemini Vision Multi-Agent Waste Sorting
import os
print("🚀 Setting up EcoSight project structure...")

# Create main project directory and all subfolders
!mkdir -p EcoSight/tools EcoSight/agents EcoSight/memory EcoSight/orchestration EcoSight/utils EcoSight/data

# Verify folder structure
print("📁 Project structure created:")
!find EcoSight -type d | sort
print("✅ EcoSight folder structure ready!")

🚀 Setting up EcoSight project structure...
📁 Project structure created:
EcoSight
EcoSight/agents
EcoSight/data
EcoSight/memory
EcoSight/orchestration
EcoSight/tools
EcoSight/utils
✅ EcoSight folder structure ready!



# 📊 TrashNet Dataset Setup

**Loading and validating the research-quality waste image dataset**

This cell initializes the TrashNet dataset – a Stanford research dataset containing 2,527 labeled waste images across 6 categories (cardboard, glass, metal, paper, plastic, trash). The code searches several candidate Kaggle paths, copies the data into the project structure, and lists the folder hierarchy. If the dataset isn't found, printed instructions guide users through attaching it via Kaggle's data interface.

**Dataset Details:**
* 📸 2,527 images – Spread across the six waste categories (class sizes are uneven; trash is the smallest class)
* 🏷️ 6 material types – Cardboard, glass, metal, paper, plastic, general trash
* 📏 Standardized format – 512×384 pixel resolution
* 🎯 Research-grade – Used for academic computer vision studies
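
Once the dataset is attached, the per-category image counts could be checked with a small helper like the one below. This is a sketch under assumptions: `count_images_per_category` is a hypothetical function, the accepted file suffixes are a guess, and the demo runs on a tiny fake folder tree rather than the real dataset.

```python
import tempfile
from collections import Counter
from pathlib import Path

IMAGE_SUFFIXES = {".jpg", ".jpeg", ".png"}  # assumed image formats


def count_images_per_category(dataset_root: Path) -> Counter:
    """Count image files, keyed by the name of the folder that holds them."""
    counts = Counter()
    for path in dataset_root.rglob("*"):
        if path.is_file() and path.suffix.lower() in IMAGE_SUFFIXES:
            counts[path.parent.name] += 1
    return counts


# Demo on a throwaway tree that mimics a split/category layout
demo_root = Path(tempfile.mkdtemp())
for split, category, n_files in [("val", "glass", 2), ("test", "glass", 1), ("val", "paper", 1)]:
    folder = demo_root / split / category
    folder.mkdir(parents=True, exist_ok=True)
    for i in range(n_files):
        (folder / f"{category}_{i}.jpg").touch()

counts = count_images_per_category(demo_root)
print(dict(counts))
```

Because the counter is keyed by the parent folder name, images from different splits of the same category are summed together.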

In [3]:
# TrashNet Dataset Setup for Kaggle
print("📥 Setting up TrashNet Dataset...")

# Candidate locations for the TrashNet dataset on Kaggle, checked in order

dataset_paths = [
    "/kaggle/input/trashnet/data/dataset",  
    "/kaggle/input/trashnet/data",
    "/kaggle/input/trashnet/dataset",
    "/kaggle/input/trashnet",
    "/kaggle/input/garbage-classification/Garbage classification/Garbage classification", 
]

found_path = None
for path in dataset_paths:
    if os.path.exists(path):
        found_path = path
        print(f"✅ Dataset found at: {path}")
        break

if found_path:
    # Copy to our project structure
    !mkdir -p EcoSight/data/trashnet
    !cp -r {found_path}/* EcoSight/data/trashnet/ 2>/dev/null || echo "Using direct access"
    
    # Verify the copy worked: the folder always exists after mkdir,
    # so check that it actually contains files
    if os.listdir("EcoSight/data/trashnet"):
        print("📁 Dataset successfully copied to project structure")
    else:
        print("📁 Using dataset from original location")
else:
    print("❌ TrashNet dataset not found in expected locations")
    print("""
📝 HOW TO ADD THE CORRECT TRASHNET DATASET:

1. Click '+ Add Data' button in Kaggle notebook
2. Search for: "trashnet"
3. Select the dataset: "trashnet" by yangyang111
4. Click 'Add' to attach it to your notebook
5. Wait for dataset to load (green checkmark)
6. Restart session: Session → Restart Session
7. Re-run all cells

The dataset should then be available at:
/kaggle/input/trashnet/data/dataset/

📊 Original TrashNet Dataset Info:
- 2,527 images across 6 categories
- Categories: cardboard, glass, metal, paper, plastic, trash
- Image sizes: 512x384 pixels
- Research-quality dataset from Stanford
""")

# Verify what we can access
print("\n🔍 Checking dataset structure...")
if found_path:
    !find {found_path} -type d 2>/dev/null | head -15
else:
    # Check our project structure
    !find EcoSight/data/trashnet -type d 2>/dev/null | head -10 || echo "No dataset found"

print("\n✅ TrashNet setup complete!")

📥 Setting up TrashNet Dataset...
✅ Dataset found at: /kaggle/input/trashnet
📁 Dataset successfully copied to project structure

🔍 Checking dataset structure...
/kaggle/input/trashnet
/kaggle/input/trashnet/trashnet
/kaggle/input/trashnet/trashnet/val
/kaggle/input/trashnet/trashnet/val/metal
/kaggle/input/trashnet/trashnet/val/glass
/kaggle/input/trashnet/trashnet/val/paper
/kaggle/input/trashnet/trashnet/val/trash
/kaggle/input/trashnet/trashnet/val/cardboard
/kaggle/input/trashnet/trashnet/val/plastic
/kaggle/input/trashnet/trashnet/test
/kaggle/input/trashnet/trashnet/test/metal
/kaggle/input/trashnet/trashnet/test/glass
/kaggle/input/trashnet/trashnet/test/paper
/kaggle/input/trashnet/trashnet/test/trash
/kaggle/input/trashnet/trashnet/test/cardboard

✅ TrashNet setup complete!


# 🔧 Python Path Configuration

**Setting up module imports for the multi-agent system**

This cell ensures Python can locate all EcoSight modules by adding the project root directory to the system path. This enables clean imports between agents, tools, and orchestration components without complex relative import statements.

**Path Configuration:**

* 📍 /kaggle/working/EcoSight – Added to sys.path
* 📦 Module accessibility – All project folders become importable
* 🔄 Clean imports – Enables from EcoSight.agents.vision_agent import VisionAnalysisAgent
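
How an import resolves depends on which directory sits on `sys.path`. The sketch below builds a throwaway project tree to show this; `DemoProject`, `ecosight_demo_agents`, and `add_to_path` are hypothetical names chosen for the illustration. With the project root itself on the path, a sub-folder is imported by its own name, while a package-qualified import such as `DemoProject.ecosight_demo_agents` would need the parent directory on the path instead.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Build <tmp>/DemoProject/ecosight_demo_agents/demo_agent.py
workspace = Path(tempfile.mkdtemp())
project_root = workspace / "DemoProject"
package_dir = project_root / "ecosight_demo_agents"
package_dir.mkdir(parents=True)
(package_dir / "demo_agent.py").write_text("NAME = 'demo'\n")


def add_to_path(directory: Path) -> None:
    """Append a directory to sys.path only if it is not already present."""
    entry = str(directory)
    if entry not in sys.path:
        sys.path.append(entry)


add_to_path(project_root)
add_to_path(project_root)  # second call is a no-op, so re-running a cell is harmless

importlib.invalidate_caches()  # make sure the freshly created files are seen
demo_agent = importlib.import_module("ecosight_demo_agents.demo_agent")
print(demo_agent.NAME)
```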

In [4]:
# Configure Python path to include our project
import sys

PROJECT_ROOT = '/kaggle/working/EcoSight'

# Add EcoSight to Python path (skipped on re-runs so the entry is not duplicated)
if PROJECT_ROOT not in sys.path:
    sys.path.append(PROJECT_ROOT)

print("✅ Python path configured for EcoSight/ project structure")
print(f"Project root: {PROJECT_ROOT}")

✅ Python path configured for EcoSight/ project structure
Project root: /kaggle/working/EcoSight



# 🔐 Gemini API Configuration
**Secure integration with Google's Gemini Vision AI**

This cell handles secure API key management using Kaggle Secrets, enabling real AI-powered waste detection while protecting sensitive credentials.
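
The validate-or-fall-back logic can be isolated into small helpers that are easy to test without Kaggle Secrets. This is a sketch, not the notebook's code: `resolve_api_key`, `is_mock_key`, `mask_key`, and the `MOCK_` prefix convention are assumptions made for the illustration, and the key used below is a made-up string.

```python
from typing import Optional

MOCK_PREFIX = "MOCK_"  # assumption: every placeholder key starts with this prefix


def resolve_api_key(raw_key: Optional[str]) -> str:
    """Return the key if it looks like a Google API key, else a mock placeholder."""
    if raw_key and raw_key.startswith("AIza"):
        return raw_key
    return MOCK_PREFIX + "KEY_NO_VALID_KEY"


def is_mock_key(key: Optional[str]) -> bool:
    """True when no usable key is configured."""
    return not key or key.startswith(MOCK_PREFIX)


def mask_key(key: str, visible: int = 4) -> str:
    """Show only the first few characters when logging a key."""
    return key[:visible] + "*" * max(len(key) - visible, 0)


real = resolve_api_key("AIzaEXAMPLE-not-a-real-key")
missing = resolve_api_key(None)
print(is_mock_key(real), is_mock_key(missing))
print(mask_key(real))
```

Checking for a shared prefix rather than one exact placeholder string means any component that reads the environment variable can recognize mock mode, whichever placeholder was stored.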

In [5]:
# =============================================================================
# 🔑 SECURE GEMINI API CONFIGURATION (Using Kaggle Secrets)
# =============================================================================

import os
from kaggle_secrets import UserSecretsClient

print("🔑 Configuring Gemini AI Integration Securely...")

try:
    # Get Gemini API key from Kaggle Secrets
    GEMINI_API_KEY = UserSecretsClient().get_secret("GEMINI_API_KEY")
    
    if GEMINI_API_KEY and GEMINI_API_KEY.startswith("AIza"):
        os.environ["GEMINI_API_KEY"] = GEMINI_API_KEY
        print("✅ Gemini API key loaded securely from Kaggle Secrets!")
        print("🚀 Real object detection will be used")
        print(f"🔐 Key preview: {GEMINI_API_KEY[:10]}...")
    else:
        print("❌ Invalid Gemini API key format in Kaggle Secrets")
        print("💡 Please ensure your key starts with 'AIza'")
        os.environ["GEMINI_API_KEY"] = "MOCK_KEY_NO_VALID_KEY"
        print("🎭 Using mock mode - no valid Gemini API key detected")
        
except Exception as e:
    print(f"🔑 Kaggle Secrets Error: {e}")
    print("""
📝 HOW TO ADD YOUR GEMINI API KEY SECURELY:

1. Click the 'Settings' tab in your Kaggle notebook
2. Scroll down to 'Secrets' section
3. Click 'Add new secret'
4. Set:
   - Name: GEMINI_API_KEY
   - Value: Your actual Gemini API key (starts with AIza...)
5. Click 'Save'
6. Restart your notebook session
7. Re-run this cell

🔒 Security Benefits:
   - API key never appears in your code
   - Key is encrypted and secure
   - Easy to rotate/update without code changes
    """)
    os.environ["GEMINI_API_KEY"] = "MOCK_KEY_SECRETS_ERROR"
    print("🎭 Using mock mode until valid API key is configured")

🔑 Configuring Gemini AI Integration Securely...
✅ Gemini API key loaded securely from Kaggle Secrets!
🚀 Real object detection will be used
🔐 Key preview: AIzaSyB_tQ...


# 🧱 Agent Base Module
**Foundation for the multi-agent system with context management and logging**

This cell creates the core agent_base.py module that provides essential infrastructure for all EcoSight agents. The AgentContext class manages workflow state across the pipeline, while logging utilities enable performance monitoring and observability.

**Core Components:**

* 📋 AgentContext – Session management with user/location metadata
* 📝 create_agent_logger() – Structured logging for each agent
* 📊 log_metric() – Observability metrics for performance tracking
* 🧩 Modular design – Clean separation of concerns without complex inheritance
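
One way a shared context and metric logging could work together across pipeline stages is sketched below. `DemoContext` and `timed_stage` are hypothetical stand-ins written for this illustration, and the stage functions are trivial lambdas rather than real agents.

```python
import logging
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Optional


@dataclass
class DemoContext:
    """Hypothetical stand-in for a per-session workflow context."""
    session_id: str
    location: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)


def timed_stage(logger: logging.Logger, ctx: DemoContext, stage: str, work: Callable[[], Any]) -> Any:
    """Run one stage, store its duration in the context, and log it as a metric."""
    start = time.perf_counter()
    result = work()
    elapsed_ms = (time.perf_counter() - start) * 1000
    ctx.metadata[f"{stage}_ms"] = elapsed_ms
    logger.info("METRIC:%s_ms=%.2f session=%s", stage, elapsed_ms, ctx.session_id)
    return result


logging.basicConfig(level=logging.INFO)  # INFO records are hidden by default
logger = logging.getLogger("agent.demo")
ctx = DemoContext(session_id="session-001", location="Berlin")

items = timed_stage(logger, ctx, "vision", lambda: ["plastic bottle"])
labels = timed_stage(logger, ctx, "classification", lambda: {item: "recyclable" for item in items})
print(labels, sorted(ctx.metadata))
```

Without a call such as `logging.basicConfig(level=logging.INFO)`, the root logger's default WARNING level would suppress the metric lines.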

In [6]:
%%writefile EcoSight/agents/agent_base.py
# Simplified agent foundation without complex inheritance
import logging
from typing import Any, Dict, Optional
from dataclasses import dataclass

@dataclass
class AgentContext:
    """Workflow state shared by the agents within one session"""
    session_id: str
    user_id: Optional[str] = None
    location: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None
    
    def __post_init__(self):
        # Give each context its own dict instead of sharing a mutable default
        if self.metadata is None:
            self.metadata = {}

def create_agent_logger(name: str) -> logging.Logger:
    """Create a logger for any agent"""
    logger = logging.getLogger(f"agent.{name}")
    return logger

def log_metric(logger, metric_name: str, value: float, tags: Optional[Dict[str, str]] = None):
    """Log observability metrics for any agent"""
    # Copy the tags so the caller's dict is not modified
    tags = {**(tags or {}), 'agent': logger.name}
    logger.info(f"METRIC:{metric_name}={value} {tags}")

Writing EcoSight/agents/agent_base.py



# 👁️ Vision Provider Module
**AI-powered image analysis for waste detection**

This cell creates the vision_provider.py module with dual-mode image analysis capabilities. It implements both real Gemini Vision API integration and a mock provider for testing, with intelligent response parsing and duplicate filtering.

**Key Features:**

* 🔮 Dual providers – GeminiVisionProvider (real AI) and MockVisionProvider (testing)
* 📝 Enhanced parsing – Smart categorization into 7 waste types with confidence scoring
* 🔄 Automatic fallback – Seamless switch to mock mode if API fails
* 🧹 Smart filtering – Duplicate removal and quality-based item selection
* 📍 Model discovery – Automatic detection of available Gemini models
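
The automatic-fallback idea can be shown in isolation. In this sketch both provider classes are hypothetical: one always raises to simulate an unreachable API, the other returns a fixed mock result. The module itself places the fallback inside the provider's own `analyze_image`, whereas here it sits in a separate helper to keep the pattern visible.

```python
import asyncio
from typing import Any, Dict


class DemoUnavailableProvider:
    """Hypothetical provider whose remote call always fails."""

    async def analyze_image(self, image_b64: str) -> Dict[str, Any]:
        raise ConnectionError("vision API unreachable")


class DemoMockProvider:
    """Hypothetical provider that returns a fixed detection."""

    async def analyze_image(self, image_b64: str) -> Dict[str, Any]:
        return {"items": [{"name": "plastic item", "confidence": 0.9}], "provider": "mock"}


async def analyze_with_fallback(primary, fallback, image_b64: str) -> Dict[str, Any]:
    """Try the primary provider; on any error, use the fallback instead."""
    try:
        return await primary.analyze_image(image_b64)
    except Exception as error:
        print(f"Primary provider failed ({error}); using fallback")
        return await fallback.analyze_image(image_b64)


result = asyncio.run(
    analyze_with_fallback(DemoUnavailableProvider(), DemoMockProvider(), "aGVsbG8=")
)
print(result["provider"])
```

The `provider` field in the result lets downstream code tell real detections from mock ones, which matters when results feed into reports.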

In [7]:
%%writefile EcoSight/tools/vision_provider.py
import random
from abc import ABC, abstractmethod
from typing import Dict, Any
import base64
import os
import asyncio

class VisionProvider(ABC):
    @abstractmethod
    async def analyze_image(self, image_b64: str) -> Dict[str, Any]:
        pass

class MockVisionProvider(VisionProvider):
    """Mock vision provider for testing - uses TrashNet categories"""
    
    async def analyze_image(self, image_b64: str) -> Dict[str, Any]:
        await asyncio.sleep(0.1)
        
        # Mock detection based on TrashNet categories
        trashnet_categories = ['cardboard', 'glass', 'metal', 'paper', 'plastic', 'trash']
        detected_category = random.choice(trashnet_categories)
        
        mock_items = [
            {"name": f"{detected_category} item", "confidence": random.uniform(0.85, 0.98), "bbox": [100, 100, 50, 50]},
        ]
        
        # Sometimes detect multiple items (1-4 total)
        additional_items = random.randint(0, 3)
        for _ in range(additional_items):
            second_category = random.choice([c for c in trashnet_categories if c != detected_category])
            mock_items.append({
                "name": f"{second_category} item", 
                "confidence": random.uniform(0.75, 0.90), 
                "bbox": [random.randint(50, 300), random.randint(50, 200), 40, 40]
            })
        
        return {
            "items": mock_items,
            "analysis_confidence": random.uniform(0.8, 0.95),
            "model_version": "mock-trashnet-v1.0",
            "provider": "mock"
        }

class GeminiVisionProvider(VisionProvider):
    """Real Google Gemini Vision implementation"""
    
    def __init__(self):
        self.api_key = os.getenv("GEMINI_API_KEY")
        # The configuration cell stores placeholders starting with "MOCK_" when no valid key exists
        if not self.api_key or self.api_key.startswith("MOCK_"):
            raise ValueError("Gemini API key not found. Please set GEMINI_API_KEY in Kaggle Secrets")
        
        # Initialize Gemini client
        try:
            import google.generativeai as genai
            self.genai = genai
            genai.configure(api_key=self.api_key)
            
            # Probe candidate models in order, preferring the newer Gemini 2.5 family
            self.model, self.model_name = self._get_working_model()
            print(f"✅ Gemini Vision initialized with model: {self.model_name}")
            
        except ImportError:
            raise ImportError("Google GenAI library not installed. Run: pip install google-generativeai")
    
    def _get_working_model(self):
        """Return the first Gemini model that answers a test prompt, preferring Gemini 2.5"""
        try:
            # Candidates are tried in order; older models are kept as fallbacks
            model_candidates = [
                "gemini-2.5-flash",
                "gemini-2.5-flash-exp",
                "gemini-2.5-pro-exp",
                "gemini-2.0-flash",
                "gemini-1.5-flash",  # Keep for backward compatibility but likely won't work
                "gemini-1.5-pro",    # Keep for backward compatibility
            ]
            
            for model_name in model_candidates:
                try:
                    model = self.genai.GenerativeModel(model_name)
                    # Simple test to check if model works
                    response = model.generate_content("Say 'Hello' in one word")
                    if response and response.text:
                        print(f"✅ Model {model_name} is working")
                        return model, model_name
                except Exception as e:
                    error_msg = str(e)
                    # Don't print errors for models we know might not exist
                    if not ("404" in error_msg and "not found" in error_msg):
                        print(f"⚠️ Model {model_name} failed: {error_msg[:100]}...")
                    continue
            
            # If specific names don't work, try to list available models
            print("🔍 Looking for available models...")
            available_models = []
            try:
                available_models = list(self.genai.list_models())
            except Exception as e:
                print(f"⚠️ Could not list models: {e}")
            
            # Filter for models that support generateContent
            content_gen_models = []
            for model_info in available_models:
                model_name = model_info.name
                if 'generateContent' in model_info.supported_generation_methods:
                    content_gen_models.append(model_info)
                    print(f"📋 Available: {model_name}")
            
            # Try each available model
            for model_info in content_gen_models:
                model_full_name = model_info.name
                try:
                    # Extract simple model name
                    simple_name = model_full_name.replace("models/", "")
                    model = self.genai.GenerativeModel(simple_name)
                    response = model.generate_content("Test")
                    if response and response.text:
                        print(f"✅ Successfully using model: {simple_name}")
                        return model, simple_name
                except Exception as e:
                    # Try with full model name
                    try:
                        model = self.genai.GenerativeModel(model_full_name)
                        response = model.generate_content("Test")
                        if response and response.text:
                            print(f"✅ Successfully using model with full name: {model_full_name}")
                            return model, model_full_name
                    except Exception:
                        continue
            
            # If nothing works, raise appropriate error
            if available_models:
                available_names = [m.name for m in available_models]
                raise Exception(f"No working Gemini model found. Available models: {available_names}")
            else:
                raise Exception("No Gemini models available. Check API key and permissions.")
            
        except Exception as e:
            print(f"❌ Gemini model initialization failed: {e}")
            raise
    
    async def analyze_image(self, image_b64: str) -> Dict[str, Any]:
        """Analyze image using real Gemini Vision API"""
        try:
            print(f"🔮 Calling Gemini Vision API with model: {self.model_name}...")
            
            # Decode base64 image
            image_data = base64.b64decode(image_b64)
            
            # Enhanced prompt for better waste detection
            prompt = """
            Analyze this image and identify ALL waste items visible. Be thorough and comprehensive.
            
            For EACH distinct waste item found, provide:
            1. Specific type (e.g., "plastic water bottle", "glass jar", "metal can", "cardboard box", "paper cup")
            2. Material composition 
            3. Approximate size/condition
            4. Whether it appears to be recyclable, compostable, or landfill
            
            IMPORTANT: 
            - List EACH item separately even if they are the same material
            - Be specific about the item type rather than generic
            - Include ALL items you can identify
            - Focus on household waste items commonly found in recycling/trash
            
            Return a clear list of all detected waste items.
            """
            
            # Call Gemini API
            response = await asyncio.get_running_loop().run_in_executor(
                None,
                lambda: self.model.generate_content([prompt, {"mime_type": "image/jpeg", "data": image_data}])
            )
            
            # Parse the response
            detected_items = self._parse_gemini_response(response.text)
            
            return {
                "items": detected_items,
                "analysis_confidence": 0.92,
                "model_version": self.model_name,
                "provider": "gemini",
                "raw_response": response.text[:300] + "..." if len(response.text) > 300 else response.text,
                "total_detected": len(detected_items)
            }
            
        except Exception as e:
            print(f"❌ Gemini Vision API error: {e}")
            # Fallback to mock provider
            print("🔄 Falling back to mock provider...")
            mock_provider = MockVisionProvider()
            return await mock_provider.analyze_image(image_b64)
    
    def _parse_gemini_response(self, response_text: str) -> list:
        """Parse Gemini response with enhanced filtering and better item naming"""
        items = []
        
        # Enhanced waste keywords with more specific item types
        waste_categories = {
            'plastic': {
                'keywords': ['plastic', 'bottle', 'container', 'bag', 'wrapper', 'packaging', 'cup', 'utensil', 'straw'],
                'items': ['water bottle', 'soda bottle', 'food container', 'shopping bag', 'chip bag', 'yogurt cup']
            },
            'paper': {
                'keywords': ['paper', 'cardboard', 'newspaper', 'magazine', 'box', 'carton', 'tissue', 'napkin'],
                'items': ['cardboard box', 'newspaper', 'magazine', 'paper bag', 'egg carton']
            },
            'glass': {
                'keywords': ['glass', 'bottle', 'jar', 'container', 'window'],
                'items': ['glass bottle', 'jar', 'wine bottle', 'beer bottle']
            },
            'metal': {
                'keywords': ['metal', 'can', 'aluminum', 'tin', 'container', 'foil'],
                'items': ['soda can', 'food can', 'aluminum foil', 'metal container']
            },
            'cardboard': {
                'keywords': ['cardboard', 'box', 'packaging', 'carton'],
                'items': ['cardboard box', 'shipping box', 'cereal box', 'pizza box']
            },
            'organic': {
                'keywords': ['food', 'organic', 'fruit', 'vegetable', 'compost', 'banana', 'apple', 'peel'],
                'items': ['fruit peel', 'vegetable scraps', 'food waste', 'coffee grounds']
            },
            'trash': {
                'keywords': ['trash', 'garbage', 'waste', 'debris', 'litter', 'general waste'],
                'items': ['general waste', 'mixed trash', 'non-recyclable item']
            }
        }
        
        lines = response_text.split('\n')
        current_item = {}
        
        for line in lines:
            line = line.strip()
            if not line or len(line) < 10:  # Skip very short lines
                continue
                
            # Skip lines that are likely headers or section titles
            if any(skip in line.lower() for skip in ['analysis:', 'items:', 'detected:', 'list:', 'waste:']):
                continue
                
            # Skip numbered items if they're just numbers
            if line.replace('.', '').strip().isdigit():
                continue
                
            # Remove common prefixes like "1.", "-", "•", "*"
            clean_line = self._clean_line(line)
            if not clean_line:
                continue
                
            # Determine category and create specific item name
            detected_category, item_name, confidence = self._categorize_item(clean_line, waste_categories)
            
            # Placeholder bounding box: the text response carries no coordinates, so values are random
            bbox = [
                random.randint(50, 400),
                random.randint(50, 300),
                random.randint(40, 120),
                random.randint(40, 120)
            ]
            
            items.append({
                "name": item_name,
                "confidence": confidence,
                "bbox": bbox,
                "description": clean_line[:100],
                "category_hint": detected_category
            })
        
        # Apply smart filtering (confidence threshold, duplicate removal, item limit)
        filtered_items = self._apply_smart_filtering(items)
        
        # Add the placeholder after filtering: its 0.5 confidence is below the
        # 0.55 threshold, so adding it beforehand would filter it out again
        if not filtered_items:
            filtered_items.append({
                "name": "unidentified waste item",
                "confidence": 0.5,
                "bbox": [100, 100, 50, 50],
                "description": "Waste item detected but not specifically identified",
                "category_hint": "trash"
            })
        
        print(f"📊 Detected {len(items)} raw items → {len(filtered_items)} after enhanced filtering")
        
        return filtered_items
    
    def _clean_line(self, line: str) -> str:
        """Clean and normalize a line of text"""
        import re
        
        # Remove a leading list marker: "-", "•", "*" or a number such as "1." or "12."
        line = re.sub(r'^(?:[-•*]|\d+\.)\s+', '', line)
        
        # Remove content in parentheses and brackets
        line = re.sub(r'\([^)]*\)', '', line)  # Remove (content)
        line = re.sub(r'\[[^\]]*\]', '', line)  # Remove [content]
        
        return line.strip()
    
    def _categorize_item(self, text: str, waste_categories: dict) -> tuple:
        """Categorize item and generate specific name"""
        text_lower = text.lower()
        detected_category = 'trash'
        confidence = 0.7
        item_name = "waste item"
        
        # Find the best matching category
        best_match_score = 0
        best_category = 'trash'
        
        for category, data in waste_categories.items():
            keyword_matches = [kw for kw in data['keywords'] if kw in text_lower]
            match_score = len(keyword_matches)
            
            if match_score > best_match_score:
                best_match_score = match_score
                best_category = category
        
        detected_category = best_category
        
        # Boost confidence based on match quality
        confidence = min(0.95, 0.7 + (best_match_score * 0.08))
        
        # Generate specific item name
        if detected_category != 'trash':
            # Try to find a specific item type from our list
            category_items = waste_categories[detected_category]['items']
            for specific_item in category_items:
                if any(word in text_lower for word in specific_item.split()):
                    item_name = specific_item
                    confidence = min(0.98, confidence + 0.1)  # Boost for specific match
                    break
            else:
                # Use generic category name
                item_name = f"{detected_category} item"
        else:
            # For trash, use the first two words of the description as the name
            words = text_lower.split()
            if len(words) >= 2:
                item_name = f"{words[0]} {words[1]}"
            else:
                item_name = "general waste"
        
        return detected_category, item_name, round(confidence, 2)
    
    def _apply_smart_filtering(self, items: list) -> list:
        """Apply enhanced filtering to remove duplicates and low-quality items"""
        # 1. Remove very low confidence items
        filtered_items = [item for item in items if item['confidence'] > 0.55]
        
        # 2. Enhanced duplicate removal
        unique_items = self._remove_duplicates_enhanced(filtered_items)
        
        # 3. Sort by confidence (highest first)
        sorted_items = sorted(unique_items, key=lambda x: x['confidence'], reverse=True)
        
        # 4. Adaptive limit based on quality of detections
        high_confidence_count = len([item for item in sorted_items if item['confidence'] > 0.8])
        
        if high_confidence_count >= 8:
            limit = min(20, len(sorted_items))  # Many good detections - show more
        elif high_confidence_count >= 4:
            limit = min(15, len(sorted_items))  # Moderate good detections
        else:
            limit = min(10, len(sorted_items))  # Few good detections - be conservative
        
        return sorted_items[:limit]
    
    def _remove_duplicates_enhanced(self, items: list) -> list:
        """Enhanced duplicate removal with similarity scoring"""
        unique_items = []
        seen_signatures = set()
        
        for item in items:
            # Create a signature based on name and key description words
            name_words = set(item['name'].lower().split())
            desc_words = set(item['description'].lower().split()[:5])  # First 5 words
            
            # Combine for uniqueness check
            signature_parts = list(name_words | desc_words)
            signature_parts.sort()
            signature = '_'.join(signature_parts[:4])  # Use first 4 combined words
            
            if signature not in seen_signatures:
                seen_signatures.add(signature)
                unique_items.append(item)
            else:
                # If duplicate found, keep the one with higher confidence
                existing_idx = None
                for i, existing in enumerate(unique_items):
                    existing_sig_parts = list(set(existing['name'].lower().split()) | set(existing['description'].lower().split()[:5]))
                    existing_sig_parts.sort()
                    existing_sig = '_'.join(existing_sig_parts[:4])
                    
                    if existing_sig == signature and item['confidence'] > existing['confidence']:
                        existing_idx = i
                        break
                
                if existing_idx is not None:
                    unique_items[existing_idx] = item  # Replace with higher confidence item
        
        return unique_items

Writing EcoSight/tools/vision_provider.py
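
The duplicate-removal step in the cell above can be sketched more compactly with a dictionary keyed by signature. This is an illustrative alternative, not the module's code: `signature` and `dedupe_keep_best` are hypothetical names, and the sample detections are made up. It assumes each item carries `name`, `description`, and `confidence` keys, as the filtering code does.

```python
from typing import Dict, List


def signature(item: dict) -> str:
    """Build a key from the name words plus the first five description words.

    The words are sorted and only the first four are kept, mirroring the
    signature used by _remove_duplicates_enhanced.
    """
    words = set(item["name"].lower().split())
    words |= set(item["description"].lower().split()[:5])
    return "_".join(sorted(words)[:4])


def dedupe_keep_best(items: List[dict]) -> List[dict]:
    """Keep one item per signature, preferring the highest confidence."""
    best: Dict[str, dict] = {}
    for item in items:
        key = signature(item)
        if key not in best or item["confidence"] > best[key]["confidence"]:
            best[key] = item
    # dicts preserve insertion order, so first-seen order is retained
    return list(best.values())


detections = [
    {"name": "plastic bottle", "description": "clear plastic bottle", "confidence": 0.82},
    {"name": "plastic bottle", "description": "clear plastic bottle", "confidence": 0.91},
    {"name": "aluminum can", "description": "crushed soda can", "confidence": 0.88},
]
unique = dedupe_keep_best(detections)
```

Here the two bottle detections share one signature, so only the 0.91 entry survives. The dictionary lookup avoids rebuilding every existing signature each time a duplicate is found.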


# 🗃️ Waste Database Tool
**Material classification system with recycling guidelines**

This cell creates the waste_db.py module, which provides standardized waste categorization aligned with the TrashNet categories. It maps detected item names to a disposal category with material-specific processing instructions; the default rules below cover recyclable and landfill items only, so compost is not yet represented.

**Classification Rules:**

* 🔄 Recyclable – Cardboard, glass, metal, paper, plastic (with processing steps)
* 🚮 Landfill – General trash and non-recyclable items
* 🏭 Material-specific – Different handling for bottles, cans, containers
* 📍 Location-aware – Framework for regional recycling rule adjustments
* ❓ Unknown items – Default to landfill with proper identification notes

In [8]:
%%writefile EcoSight/tools/waste_db.py
from typing import Dict, Any, Optional

class WasteDB:
    """Tool for waste classification database - aligned with TrashNet categories"""
    
    def __init__(self, db_path: Optional[str] = None):
        # db_path is accepted but not used yet; the rules are hard-coded below
        self.classification_rules = self._load_default_rules()
    
    def _load_default_rules(self) -> Dict[str, Any]:
        # Classification rules aligned with TrashNet categories
        return {
            "cardboard": {"category": "recyclable", "material": "cardboard", "processing": "flatten"},
            "glass": {"category": "recyclable", "material": "glass", "processing": "separate_colors"},
            "metal": {"category": "recyclable", "material": "metal", "processing": "clean"},
            "paper": {"category": "recyclable", "material": "paper", "processing": "dry"},
            "plastic": {"category": "recyclable", "material": "plastic", "processing": "check_code"},
            "trash": {"category": "landfill", "material": "mixed", "notes": "non-recyclable"},
            "bottle": {"category": "recyclable", "material": "plastic", "processing": "rinse"},
            "can": {"category": "recyclable", "material": "metal", "processing": "rinse"},
            "container": {"category": "recyclable", "material": "mixed", "processing": "check_label"},
        }
    
    async def classify_item(self, item_name: str, location: Optional[str] = None) -> Dict[str, Any]:
        item_lower = item_name.lower()
        
        # Match against TrashNet categories and common patterns
        for key, rule in self.classification_rules.items():
            if key in item_lower:
                result = rule.copy()
                # Add location-specific adjustments if needed
                if location and self._has_location_override(location, key):
                    result.update(self._get_location_override(location, key))
                return result
        
        # Default classification for unknown items
        return {"category": "landfill", "material": "unknown", "notes": "unidentified"}
    
    def _has_location_override(self, location: str, item_key: str) -> bool:
        # Placeholder: no location overrides are defined yet, so this is always False
        return False
    
    def _get_location_override(self, location: str, item_key: str) -> Dict[str, Any]:
        # Placeholder: returns no adjustments until location rules are added
        return {}

Writing EcoSight/tools/waste_db.py
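
Note that `classify_item` uses a plain substring test (`key in item_lower`), so a name such as "candy wrapper" matches the `can` rule. A minimal sketch of whole-word matching follows, assuming that stricter behaviour is wanted. `RULES`, `DEFAULT`, and `classify_whole_word` are hypothetical names, and the rule table is a trimmed copy for illustration.

```python
import re
from typing import Any, Dict

# Trimmed, illustrative subset of the classification rules
RULES: Dict[str, Dict[str, Any]] = {
    "glass": {"category": "recyclable", "material": "glass"},
    "can": {"category": "recyclable", "material": "metal"},
    "trash": {"category": "landfill", "material": "mixed"},
}
DEFAULT: Dict[str, Any] = {"category": "landfill", "material": "unknown", "notes": "unidentified"}


def classify_whole_word(item_name: str) -> Dict[str, Any]:
    """Return the first rule whose key appears as a whole word (optionally plural)."""
    text = item_name.lower()
    for key, rule in RULES.items():
        # \b anchors the key at word boundaries; "s?" tolerates a simple plural
        if re.search(rf"\b{re.escape(key)}s?\b", text):
            return dict(rule)
    return dict(DEFAULT)
```

With this variant "soda can" and "empty cans" resolve to metal, while "candy wrapper" falls through to the landfill default. Rule order still matters: "glass bottle" would hit `glass` before any later `bottle` rule, as in the original loop.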


# 📍 Location Finder Tool
**Geographic-aware waste disposal instructions**

This cell creates the location_finder.py module, which provides location-specific recycling and disposal guidelines. It includes detailed rules for two cities (NYC and SF) and falls back to default guidelines for any other location, or for a category that a city does not define.

**City-Specific Rules:**

* 🗽 NYC – Separate paper/metal/glass/plastic, DSNY facilities, color-coded bins
* 🌉 San Francisco – Mixed recyclables (blue bin), compost (green bin), Recology centers
* 🌐 Default – General guidelines with local facility recommendations
* 📋 Category-based – Different instructions for recyclable, compost, landfill items
* 🧹 Preparation – Specific cleaning and preparation requirements per location

In [9]:
%%writefile EcoSight/tools/location_finder.py
from typing import Dict, Any, List, Optional

class LocationFinder:
    """Tool for location-specific waste disposal rules"""
    
    def __init__(self):
        self.location_rules = self._load_default_rules()
    
    def _load_default_rules(self) -> Dict[str, Any]:
        return {
            "NYC": {
                "recyclable": {
                    "instructions": "Place in blue recycling bin - separate paper and metal/glass/plastic",
                    "facilities": ["Curbside pickup", "DSNY recycling centers"],
                    "preparation": "Rinse containers, flatten boxes, no plastic bags"
                },
                "compost": {
                    "instructions": "Brown bin or designated compost drop-off",
                    "facilities": ["DSNY compost sites", "Farmer's markets"],
                    "preparation": "Use compostable bags, no plastic contamination"
                },
                "landfill": {
                    "instructions": "Black bin for non-recyclable waste",
                    "facilities": ["Curbside pickup"],
                    "preparation": "Bag securely to prevent litter"
                }
            },
            "SF": {
                "recyclable": {
                    "instructions": "Blue bin for mixed recyclables",
                    "facilities": ["Curbside pickup", "Recology centers"],
                    "preparation": "No plastic bags, rinse containers"
                },
                "compost": {
                    "instructions": "Green bin for compostables",
                    "facilities": ["Curbside pickup", "Community compost"],
                    "preparation": "No plastic contamination"
                }
            },
            "default": {
                "recyclable": {
                    "instructions": "Check local recycling guidelines",
                    "facilities": ["Local recycling center"],
                    "preparation": "Rinse and sort by material"
                },
                "compost": {
                    "instructions": "Compost bin or municipal collection",
                    "facilities": ["Local compost facility"],
                    "preparation": "No plastic contamination"
                },
                "landfill": {
                    "instructions": "Regular trash bin",
                    "facilities": ["Curbside pickup", "Landfill site"],
                    "preparation": "Bag securely"
                }
            }
        }
    
    async def get_disposal_instructions(self, category: str, location: Optional[str] = None) -> Dict[str, Any]:
        location_key = location if location and location in self.location_rules else "default"
        
        # Get category rules with fallback
        category_rules = self.location_rules[location_key].get(
            category, 
            self.location_rules["default"].get(category, {})
        )
        
        return {
            "instructions": category_rules.get("instructions", "Dispose according to local regulations"),
            "facilities": category_rules.get("facilities", []),
            "preparation": category_rules.get("preparation", "Follow general guidelines"),
            "location": location_key
        }

Writing EcoSight/tools/location_finder.py
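
The two-level fallback in `get_disposal_instructions` is easiest to see on a small example. The sketch below is a simplified, synchronous stand-in with a trimmed rule table. `RULES` and `lookup` are hypothetical names used only for illustration.

```python
from typing import Any, Dict, Optional

# Trimmed rule table: SF defines no landfill entry, as in the module above
RULES: Dict[str, Dict[str, Dict[str, Any]]] = {
    "SF": {
        "recyclable": {"instructions": "Blue bin for mixed recyclables"},
    },
    "default": {
        "recyclable": {"instructions": "Check local recycling guidelines"},
        "landfill": {"instructions": "Regular trash bin"},
    },
}


def lookup(category: str, location: Optional[str] = None) -> Dict[str, Any]:
    """Unknown location -> default rules; missing category -> default category."""
    location_key = location if location in RULES else "default"
    rules = RULES[location_key].get(category, RULES["default"].get(category, {}))
    return {
        "instructions": rules.get("instructions", "Dispose according to local regulations"),
        "location": location_key,
    }


sf_landfill = lookup("landfill", "SF")
unknown_city = lookup("landfill", "Paris")
unknown_category = lookup("hazardous")
```

One consequence worth knowing: `sf_landfill` reports `"location": "SF"` even though its instructions came from the default rules, so a caller cannot tell from the result that a fallback occurred.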


# 📄 PDF Generator Tool
**Professional report generation with environmental impact analytics**

This cell creates the pdf_generator.py module that produces branded, professional PDF reports from waste analysis results. It includes comprehensive sections with styled tables, environmental impact metrics, and personalized recommendations.

**Report Features:**

* 🎨 Professional branding – EcoSight color scheme and typography
* 📊 Executive summary – Key metrics and performance ratings
* 📈 Waste breakdown – Visual composition analysis with color-coded categories
* 🌍 Environmental impact – CO₂, water, and energy savings with real-world equivalents
* 💡 AI recommendations – Personalized tips and general best practices
* 🧹 Data cleaning – Automatic text formatting and overflow prevention
* 📋 Detailed analysis – Item-by-item disposal guidance with confidence scores
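
The generator in the next cell reads a `report_data` dictionary with the keys `summary`, `environmental_impact`, `detailed_breakdown`, and `personalized_tips`. The following is a rough sketch of how such a dictionary might be assembled. `build_summary` is a hypothetical helper, the sample items are invented, and the environmental figures are placeholders rather than measured values.

```python
from collections import Counter
from typing import Any, Dict, List


def build_summary(breakdown: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Compute the percentage fields the executive summary reads (hypothetical helper)."""
    total = len(breakdown)
    counts = Counter(item["category"] for item in breakdown)

    def pct(category: str) -> float:
        # Guard against division by zero for an empty analysis
        return 100.0 * counts[category] / total if total else 0.0

    return {
        "total_items": total,
        "recyclable_percent": pct("recyclable"),
        "compost_percent": pct("compost"),
        "landfill_percent": pct("landfill"),
        "hazardous_percent": pct("hazardous"),
    }


breakdown = [
    {"item": "plastic bottle", "category": "recyclable", "confidence": 0.91,
     "disposal_instructions": {"instructions": "Blue bin for mixed recyclables"}},
    {"item": "soda can", "category": "recyclable", "confidence": 0.88,
     "disposal_instructions": {"instructions": "Blue bin for mixed recyclables"}},
    {"item": "apple core", "category": "compost", "confidence": 0.80,
     "disposal_instructions": {"instructions": "Green bin for compostables"}},
    {"item": "chip bag", "category": "landfill", "confidence": 0.75,
     "disposal_instructions": {"instructions": "Regular trash bin"}},
]

report_data = {
    "summary": build_summary(breakdown),
    # Placeholder values; the real figures come from the reporting agent
    "environmental_impact": {"co2_saved_kg": 0.0, "water_saved_liters": 0.0, "energy_saved_kwh": 0.0},
    "detailed_breakdown": breakdown,
    "personalized_tips": [],
}
```

Every key is read with `.get(...)` and a default in the generator, so a partially filled dictionary still produces a report, just with zeros in the missing fields.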

In [10]:
%%writefile EcoSight/tools/pdf_generator.py

from reportlab.lib.pagesizes import A4
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, Image
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import inch
from reportlab.lib import colors
from reportlab.graphics.shapes import Drawing, Rect
from reportlab.graphics.charts.barcharts import VerticalBarChart
import io
from datetime import datetime
from typing import Dict, Any
import re

class PDFGenerator:
    """Enhanced PDF generator with professional styling and branding"""
    
    def __init__(self):
        self.styles = getSampleStyleSheet()
        self._setup_custom_styles()
    
    def _setup_custom_styles(self):
        """Setup enhanced custom styles for professional reports"""
        # EcoSight color scheme
        self.primary_color = colors.HexColor('#2E7D32')  # Eco green
        self.secondary_color = colors.HexColor('#4CAF50')  # Light green
        self.accent_color = colors.HexColor('#FF9800')    # Orange
        self.dark_color = colors.HexColor('#1B5E20')      # Dark green
        self.light_color = colors.HexColor('#E8F5E8')     # Light background
        
        # Enhanced title style
        self.styles.add(ParagraphStyle(
            name='EcoTitle',
            parent=self.styles['Title'],
            fontSize=24,
            spaceAfter=20,
            textColor=self.primary_color,
            alignment=1,  # Center
            fontName='Helvetica-Bold'
        ))
        
        # Section header style
        self.styles.add(ParagraphStyle(
            name='EcoHeading1',
            parent=self.styles['Heading1'],
            fontSize=16,
            spaceAfter=12,
            textColor=self.dark_color,
            fontName='Helvetica-Bold',
            leftIndent=10
        ))
        
        # Subsection style
        self.styles.add(ParagraphStyle(
            name='EcoHeading2',
            parent=self.styles['Heading2'],
            fontSize=14,
            spaceAfter=8,
            textColor=self.primary_color,
            fontName='Helvetica-Bold'
        ))
        
        # Body text with better spacing
        self.styles.add(ParagraphStyle(
            name='EcoBody',
            parent=self.styles['BodyText'],
            fontSize=10,
            spaceAfter=6,
            textColor=colors.black,
            fontName='Helvetica'
        ))
        
        # Highlight style for important numbers
        self.styles.add(ParagraphStyle(
            name='EcoHighlight',
            parent=self.styles['BodyText'],
            fontSize=11,
            textColor=self.primary_color,
            fontName='Helvetica-Bold',
            backColor=self.light_color,
            borderPadding=5,
            spaceAfter=8
        ))
        
        # Footer style
        self.styles.add(ParagraphStyle(
            name='EcoFooter',
            parent=self.styles['Italic'],
            fontSize=8,
            textColor=colors.gray,
            alignment=1  # Center
        ))
    
    def _clean_disposal_text(self, text):
        """Clean up corrupted or malformed disposal guidance text"""
        if not isinstance(text, str):
            return "Check local recycling guidelines"
        
        # Remove common corrupted patterns like @c@ai/glass/plastic or 0.6/8/8/gas/plastic
        text = re.sub(r'[@0-9\.]+/[a-zA-Z/]+', '', text)
        # Remove other weird patterns
        text = re.sub(r'[\\/][\\/]+', '/', text)
        # Remove trailing special characters
        text = re.sub(r'[^\w\s\-\.\(\)]+$', '', text)
        
        # Clean up extra spaces
        text = ' '.join(text.split())
        
        # If text is still problematic, provide generic guidance
        if len(text) < 5 or text.lower() == 'n/a':
            return "Check local recycling guidelines"
        
        return text
    
    def _wrap_text(self, text, max_length=20):
        """Wrap text to prevent overflow"""
        if not isinstance(text, str):
            return str(text)
        
        if len(text) <= max_length:
            return text
        
        # Try to break at natural points
        words = text.split()
        lines = []
        current_line = ""
        
        for word in words:
            if len(current_line) + len(word) + 1 <= max_length:
                current_line += " " + word if current_line else word
            else:
                if current_line:
                    lines.append(current_line)
                current_line = word
        
        if current_line:
            lines.append(current_line)
        
        return "\n".join(lines)
    
    def _create_header(self, story):
        """Create professional header with branding"""
        # Header with logo placeholder and title
        header_table_data = [
            ['EcoSight', 'AI-Powered Waste Analysis']
        ]
        
        header_table = Table(header_table_data, colWidths=[3*inch, 3*inch])
        header_table.setStyle(TableStyle([
            ('BACKGROUND', (0, 0), (0, 0), self.primary_color),
            ('BACKGROUND', (1, 0), (1, 0), self.secondary_color),
            ('TEXTCOLOR', (0, 0), (-1, -1), colors.white),
            ('ALIGN', (0, 0), (-1, -1), 'CENTER'),
            ('FONTNAME', (0, 0), (-1, -1), 'Helvetica-Bold'),
            ('FONTSIZE', (0, 0), (0, 0), 12),
            ('FONTSIZE', (1, 0), (1, 0), 12),
            ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
            ('BOX', (0, 0), (-1, -1), 1, colors.white),
            ('LINEBELOW', (0, 0), (-1, -1), 1, colors.white),
        ]))
        
        story.append(header_table)
        story.append(Spacer(1, 0.1*inch))
    
    def _create_cover_page(self, story, session_id):
        """Create a professional cover page"""
        # Title
        story.append(Paragraph("WASTE ANALYSIS REPORT", self.styles['EcoTitle']))
        story.append(Spacer(1, 0.3*inch))
        
        # Session info in a styled box
        info_data = [
            ['Session ID:', session_id],
            ['Generated:', datetime.now().strftime('%Y-%m-%d %H:%M')],
            ['Report Type:', 'Comprehensive Waste Analysis'],
            ['AI Model:', 'Gemini Vision + Multi-Agent System']
        ]
        
        info_table = Table(info_data, colWidths=[1.5*inch, 4*inch])
        info_table.setStyle(TableStyle([
            ('BACKGROUND', (0, 0), (0, -1), self.light_color),
            ('BACKGROUND', (1, 0), (1, -1), colors.white),
            ('TEXTCOLOR', (0, 0), (-1, -1), colors.black),
            ('FONTNAME', (0, 0), (0, -1), 'Helvetica-Bold'),
            ('FONTNAME', (1, 0), (1, -1), 'Helvetica'),
            ('FONTSIZE', (0, 0), (-1, -1), 10),
            ('GRID', (0, 0), (-1, -1), 1, self.primary_color),
            ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
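            # Set each side explicitly (TableStyle defines per-side padding commands)
            ('TOPPADDING', (0, 0), (-1, -1), 6),
            ('BOTTOMPADDING', (0, 0), (-1, -1), 6),
            ('LEFTPADDING', (0, 0), (-1, -1), 6),
            ('RIGHTPADDING', (0, 0), (-1, -1), 6),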
        ]))
        
        story.append(info_table)
        story.append(Spacer(1, 0.5*inch))
        
        # Mission statement
        mission_text = """
        <b>Our Mission:</b> To leverage artificial intelligence for smarter waste management, 
        promoting recycling efficiency and environmental sustainability through advanced 
        multi-agent systems and computer vision technology.
        """
        story.append(Paragraph(mission_text, self.styles['EcoBody']))
        
        # Spacing and a divider line before the next section (no actual page break)
        story.append(Spacer(1, 0.5*inch))
        story.append(Paragraph("--- Report Contents ---", self.styles['EcoFooter']))
    
    def _create_executive_summary(self, story, report_data):
        """Create enhanced executive summary with visual elements"""
        story.append(Paragraph("Executive Summary", self.styles['EcoHeading1']))
        story.append(Spacer(1, 0.1*inch))
        
        summary = report_data.get('summary', {})
        environmental = report_data.get('environmental_impact', {})
        
        # Key metrics in a table
        metrics_data = [
            ['METRIC', 'VALUE', 'IMPACT'],
            ['Total Items Analyzed', str(summary.get('total_items', 0)), 'Analysis Scope'],
            ['Recycling Rate', f"{summary.get('recyclable_percent', 0):.1f}%", 'Efficiency Score'],
            ['CO₂ Reduction', f"{environmental.get('co2_saved_kg', 0):.1f} kg", 'Environmental Impact'],
            ['Water Saved', f"{environmental.get('water_saved_liters', 0):.0f} L", 'Resource Conservation'],
            ['Energy Saved', f"{environmental.get('energy_saved_kwh', 0):.1f} kWh", 'Energy Efficiency']
        ]
        
        metrics_table = Table(metrics_data, colWidths=[1.8*inch, 1.5*inch, 2.2*inch])
        metrics_table.setStyle(TableStyle([
            ('BACKGROUND', (0, 0), (-1, 0), self.primary_color),
            ('TEXTCOLOR', (0, 0), (-1, 0), colors.white),
            ('ALIGN', (0, 0), (-1, -1), 'CENTER'),
            ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
            ('FONTNAME', (0, 1), (-1, -1), 'Helvetica'),
            ('FONTSIZE', (0, 0), (-1, -1), 9),
            ('GRID', (0, 0), (-1, -1), 1, colors.lightgrey),
            ('BACKGROUND', (0, 1), (-1, -1), colors.white),
            ('ROWBACKGROUNDS', (0, 1), (-1, -1), [colors.white, self.light_color]),
            ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
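            # Set each side explicitly (TableStyle defines per-side padding commands)
            ('TOPPADDING', (0, 0), (-1, -1), 6),
            ('BOTTOMPADDING', (0, 0), (-1, -1), 6),
            ('LEFTPADDING', (0, 0), (-1, -1), 6),
            ('RIGHTPADDING', (0, 0), (-1, -1), 6),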
        ]))
        
        story.append(metrics_table)
        story.append(Spacer(1, 0.2*inch))
        
        # Performance indicator
        recyclable_rate = summary.get('recyclable_percent', 0)
        if recyclable_rate >= 70:
            performance = "Excellent"
            color = self.primary_color
        elif recyclable_rate >= 50:
            performance = "Good"
            color = self.accent_color
        else:
            performance = "Needs Improvement"
            color = colors.red
        
        performance_text = f"""
        <b>Performance Rating:</b> <font color="#{color.hexval()[2:]}">{performance}</font><br/>
        <b>Overall Assessment:</b> Your waste stream shows a recycling efficiency of {recyclable_rate:.1f}%. 
        This analysis provides actionable insights to optimize your waste management practices.
        """
        story.append(Paragraph(performance_text, self.styles['EcoBody']))
        story.append(Spacer(1, 0.2*inch))
    
    def _create_waste_breakdown(self, story, report_data):
        """Create visual waste breakdown section"""
        story.append(Paragraph("Waste Composition Analysis", self.styles['EcoHeading1']))
        story.append(Spacer(1, 0.1*inch))
        
        summary = report_data.get('summary', {})
        
        # Waste distribution table
        distribution_data = [
            ['CATEGORY', 'PERCENTAGE', 'ITEMS'],
            ['Recyclable', f"{summary.get('recyclable_percent', 0):.1f}%", 'Plastic, Metal, Glass, Paper'],
            ['Compostable', f"{summary.get('compost_percent', 0):.1f}%", 'Food Waste, Organic Materials'],
            ['Landfill', f"{summary.get('landfill_percent', 0):.1f}%", 'Non-Recyclable Items'],
            ['Hazardous', f"{summary.get('hazardous_percent', 0):.1f}%", 'Special Handling Required']
        ]
        
        dist_table = Table(distribution_data, colWidths=[1.5*inch, 1.2*inch, 3*inch])
        dist_table.setStyle(TableStyle([
            ('BACKGROUND', (0, 0), (-1, 0), self.dark_color),
            ('TEXTCOLOR', (0, 0), (-1, 0), colors.white),
            ('ALIGN', (0, 0), (-1, -1), 'LEFT'),
            ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
            ('FONTNAME', (0, 1), (-1, -1), 'Helvetica'),
            ('FONTSIZE', (0, 0), (-1, -1), 9),
            ('GRID', (0, 0), (-1, -1), 1, colors.lightgrey),
            ('BACKGROUND', (0, 1), (0, 1), self.secondary_color),
            ('BACKGROUND', (0, 2), (0, 2), colors.orange),
            ('BACKGROUND', (0, 3), (0, 3), colors.lightgrey),
            ('BACKGROUND', (0, 4), (0, 4), colors.red),
            ('TEXTCOLOR', (0, 1), (0, 4), colors.white),
            ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
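            # Set each side explicitly (TableStyle defines per-side padding commands)
            ('TOPPADDING', (0, 0), (-1, -1), 6),
            ('BOTTOMPADDING', (0, 0), (-1, -1), 6),
            ('LEFTPADDING', (0, 0), (-1, -1), 6),
            ('RIGHTPADDING', (0, 0), (-1, -1), 6),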
        ]))
        
        story.append(dist_table)
        story.append(Spacer(1, 0.2*inch))
    
    def _create_detailed_analysis(self, story, report_data):
        """Create enhanced detailed analysis section with text wrapping"""
        story.append(Paragraph("Detailed Item Analysis", self.styles['EcoHeading1']))
        story.append(Spacer(1, 0.1*inch))
        
        breakdown = report_data.get('detailed_breakdown', [])
        
        if breakdown:
            # Create professional table for item analysis
            table_data = [['ITEM', 'CATEGORY', 'DISPOSAL GUIDANCE', 'CONFIDENCE']]
            
            for item in breakdown:
                # Clean disposal guidance text
                disposal_guidance = item.get('disposal_instructions', {}).get('instructions', 'N/A')
                disposal_guidance = self._clean_disposal_text(disposal_guidance)
                
                # Wrap text for better display
                item_name = self._wrap_text(item.get('item', 'Unknown'), max_length=15)
                category_name = self._wrap_text(item.get('category', 'Unknown').title(), max_length=10)
                disposal_wrapped = self._wrap_text(disposal_guidance, max_length=50)
                
                table_data.append([
                    item_name,
                    category_name,
                    disposal_wrapped,
                    f"{item.get('confidence', 0)*100:.1f}%"
                ])
            
            # Use proper column widths with text wrapping
            analysis_table = Table(table_data, colWidths=[1.0*inch, 0.8*inch, 3.0*inch, 0.7*inch])
            analysis_table.setStyle(TableStyle([
                ('BACKGROUND', (0, 0), (-1, 0), self.primary_color),
                ('TEXTCOLOR', (0, 0), (-1, 0), colors.white),
                ('ALIGN', (0, 0), (-1, -1), 'LEFT'),
                ('ALIGN', (3, 1), (3, -1), 'CENTER'),
                ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
                ('FONTNAME', (0, 1), (-1, -1), 'Helvetica'),
                ('FONTSIZE', (0, 0), (-1, -1), 7),  # Smaller font for better fit
                ('GRID', (0, 0), (-1, -1), 1, colors.lightgrey),
                ('BACKGROUND', (0, 1), (-1, -1), colors.white),
                ('ROWBACKGROUNDS', (0, 1), (-1, -1), [colors.white, self.light_color]),
                ('VALIGN', (0, 0), (-1, -1), 'TOP'),  # Top alignment for wrapped text
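                # Smaller padding, set per side with TableStyle's padding commands
                ('TOPPADDING', (0, 0), (-1, -1), 3),
                ('BOTTOMPADDING', (0, 0), (-1, -1), 3),
                ('LEFTPADDING', (0, 0), (-1, -1), 3),
                ('RIGHTPADDING', (0, 0), (-1, -1), 3),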
                ('WORDWRAP', (0, 0), (-1, -1), True),  # Enable word wrapping
            ]))
            
            story.append(analysis_table)
        else:
            story.append(Paragraph("No detailed item data available for this analysis.", self.styles['EcoBody']))
        
        story.append(Spacer(1, 0.2*inch))
    
    def _create_environmental_impact(self, story, report_data):
        """Create enhanced environmental impact section"""
        story.append(Paragraph("Environmental Impact Assessment", self.styles['EcoHeading1']))
        story.append(Spacer(1, 0.1*inch))
        
        impact = report_data.get('environmental_impact', {})
        
        # Note: the "EQUIVALENT TO" strings are fixed examples, not computed from the values
        impact_data = [
            ['ENVIRONMENTAL METRIC', 'AMOUNT SAVED', 'EQUIVALENT TO'],
            ['CO₂ Emissions', f"{impact.get('co2_saved_kg', 0):.1f} kg", 'Driving 5 miles in a car'],
            ['Water Usage', f"{impact.get('water_saved_liters', 0):.0f} liters", '100 showers'],
            ['Energy Consumption', f"{impact.get('energy_saved_kwh', 0):.1f} kWh", 'Powering a home for 1 day']
        ]
        
        impact_table = Table(impact_data, colWidths=[2*inch, 1.5*inch, 2*inch])
        impact_table.setStyle(TableStyle([
            ('BACKGROUND', (0, 0), (-1, 0), self.accent_color),
            ('TEXTCOLOR', (0, 0), (-1, 0), colors.white),
            ('ALIGN', (0, 0), (-1, -1), 'CENTER'),
            ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
            ('FONTNAME', (0, 1), (-1, -1), 'Helvetica'),
            ('FONTSIZE', (0, 0), (-1, -1), 9),
            ('GRID', (0, 0), (-1, -1), 1, colors.lightgrey),
            ('BACKGROUND', (0, 1), (-1, -1), colors.white),
            ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
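            # Set each side explicitly (TableStyle defines per-side padding commands)
            ('TOPPADDING', (0, 0), (-1, -1), 6),
            ('BOTTOMPADDING', (0, 0), (-1, -1), 6),
            ('LEFTPADDING', (0, 0), (-1, -1), 6),
            ('RIGHTPADDING', (0, 0), (-1, -1), 6),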
        ]))
        
        story.append(impact_table)
        story.append(Spacer(1, 0.2*inch))
        
        # Impact summary
        total_impact = sum([impact.get('co2_saved_kg', 0), impact.get('water_saved_liters', 0)/100, impact.get('energy_saved_kwh', 0)])
        impact_text = f"""
        <b>Overall Environmental Impact Score:</b> {total_impact:.1f} points<br/>
        Your recycling efforts are making a measurable difference in reducing environmental footprint 
        and promoting sustainable waste management practices.
        """
        story.append(Paragraph(impact_text, self.styles['EcoHighlight']))
        story.append(Spacer(1, 0.2*inch))
    
    def _create_recommendations(self, story, report_data):
        """Create enhanced recommendations section"""
        story.append(Paragraph("AI-Powered Recommendations", self.styles['EcoHeading1']))
        story.append(Spacer(1, 0.1*inch))
        
        tips = report_data.get('personalized_tips', [])
        
        if tips:
            for i, tip in enumerate(tips, 1):
                tip_text = f"<b>Recommendation {i}:</b> {tip}"
                story.append(Paragraph(tip_text, self.styles['EcoBody']))
                story.append(Spacer(1, 0.1*inch))
        else:
            story.append(Paragraph("No specific recommendations available for this analysis.", self.styles['EcoBody']))
        
        story.append(Spacer(1, 0.2*inch))
        
        # General best practices
        best_practices = [
            "✅ Always rinse recyclable containers before disposal",
            "✅ Separate materials according to local guidelines",
            "✅ Reduce single-use plastics when possible",
            "✅ Compost food waste to minimize landfill impact",
            "✅ Stay informed about local recycling program updates"
        ]
        
        story.append(Paragraph("<b>General Best Practices:</b>", self.styles['EcoHeading2']))
        for practice in best_practices:
            story.append(Paragraph(practice, self.styles['EcoBody']))
            story.append(Spacer(1, 0.05*inch))
    
    def _create_footer(self, story):
        """Create professional footer"""
        story.append(Spacer(1, 0.3*inch))
        
        footer_text = """
        <b>EcoSight AI System</b><br/>
        Multi-Agent Waste Detection & Recycling Optimization Platform<br/>
        Generated with Advanced Computer Vision and AI Analysis<br/>
        <i>Driving sustainability through intelligent waste management</i>
        """
        story.append(Paragraph(footer_text, self.styles['EcoFooter']))
    
    async def generate_EcoSight_report(self, report_data: Dict[str, Any], session_id: str) -> bytes:
        """Generate enhanced professional PDF report"""
        
        buffer = io.BytesIO()
        doc = SimpleDocTemplate(buffer, pagesize=A4, topMargin=0.5*inch, bottomMargin=0.5*inch)
        story = []
        
        # Build enhanced PDF content
        self._create_header(story)
        self._create_cover_page(story, session_id)
        self._create_executive_summary(story, report_data)
        self._create_waste_breakdown(story, report_data)
        self._create_detailed_analysis(story, report_data) 
        self._create_environmental_impact(story, report_data)
        self._create_recommendations(story, report_data)
        self._create_footer(story)
        
        # Build PDF
        doc.build(story)
        pdf_bytes = buffer.getvalue()
        buffer.close()
        
        return pdf_bytes

Writing EcoSight/tools/pdf_generator.py
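
The hand-written `_wrap_text` helper above performs greedy word wrapping, which the standard library's `textwrap` module also provides. The sketch below shows that alternative. `wrap_cell_text` is a hypothetical name, and for ordinary single-spaced text it should produce the same line breaks, though it is not guaranteed to match the original in every edge case (for example, runs of repeated spaces).

```python
import textwrap


def wrap_cell_text(text: object, max_length: int = 20) -> str:
    """Wrap text at word boundaries and join the lines with newlines."""
    if not isinstance(text, str):
        return str(text)
    # Keep over-long words and hyphenated words intact, as the manual loop does
    return textwrap.fill(
        text,
        width=max_length,
        break_long_words=False,
        break_on_hyphens=False,
    )


wrapped = wrap_cell_text("Blue bin for mixed recyclables", max_length=15)
# wrapped == "Blue bin for\nmixed\nrecyclables"
```

The newline-joined result can be placed directly in a table cell, which is how the detailed-analysis table uses the output of `_wrap_text`.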


# 📤 PDF Downloader Utility
**Interactive PDF viewing, downloading, and management**

This cell creates the pdf_downloader.py module that provides multiple ways to access and interact with generated reports. It includes interactive dashboards, embedded previews, and file management utilities.

**Interactive Features:**

* 📥 Download links – Styled download buttons with file size information
* 🔍 Embedded previews – In-notebook PDF viewing with navigation controls
* 🚀 Interactive dashboard – Toggle preview, save copies, print, and share options
* 💾 File management – Automatic saving to Kaggle output directory
* 📊 File info – Detailed metadata about generated PDF reports
* 🎨 Professional UI – Branded interface with hover effects and responsive design
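
The download links rely on embedding the PDF bytes in the page as a base64 `data:` URI. A stripped-down sketch of that idea follows, without the styling. `build_download_anchor` is a hypothetical helper, and the HTML escaping of the filename is an added precaution that the module below does not perform.

```python
import base64
import html


def build_download_anchor(pdf_bytes: bytes, filename: str,
                          link_text: str = "Download PDF Report") -> str:
    """Return an <a> tag that embeds the PDF as a base64 data URI."""
    payload = base64.b64encode(pdf_bytes).decode("ascii")
    # Escape user-supplied strings so quotes cannot break out of the attribute
    safe_name = html.escape(filename, quote=True)
    safe_text = html.escape(link_text)
    size_kb = len(pdf_bytes) / 1024
    return (
        f'<a href="data:application/pdf;base64,{payload}" download="{safe_name}">'
        f"{safe_text} ({size_kb:.1f} KB)</a>"
    )


anchor = build_download_anchor(b"%PDF-1.4 demo", 'report "draft".pdf')
```

In a notebook the returned string would be wrapped in `IPython.display.HTML` for display. Base64 encoding grows the payload by about one third, so very large reports make the notebook output correspondingly heavier.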

In [11]:
%%writefile EcoSight/utils/pdf_downloader.py
import base64
from IPython.display import HTML, display, IFrame
from typing import Optional
import tempfile

class PDFDownloader:
    """Enhanced utility for PDF handling with preview and download functionality"""
    
    @staticmethod
    def create_download_link(pdf_bytes: bytes, filename: str, link_text: str = "Download PDF Report") -> HTML:
        """Create a downloadable link for PDF in Kaggle notebook"""
        try:
            b64 = base64.b64encode(pdf_bytes).decode()
            download_html = f'''
            <div style="padding: 15px; border: 2px solid #4CAF50; border-radius: 8px; background-color: #f9f9f9; margin: 10px 0;">
                <div style="display: flex; align-items: center; justify-content: space-between;">
                    <div>
                        <strong style="color: #2E7D32;">📄 {filename}</strong><br>
                        <span style="color: #666; font-size: 12px;">Size: {len(pdf_bytes)/1024:.1f} KB</span>
                    </div>
                    <a href="data:application/pdf;base64,{b64}" download="{filename}" 
                       style="background-color: #4CAF50; color: white; padding: 10px 20px; 
                              text-decoration: none; border-radius: 5px; font-weight: bold;
                              transition: background-color 0.3s;">
                       {link_text}
                    </a>
                </div>
            </div>
            '''
            return HTML(download_html)
        except Exception as e:
            error_html = f'''
            <div style="color: red; padding: 10px; border: 1px solid red; border-radius: 5px;">
                ❌ Error creating download link: {str(e)}
            </div>
            '''
            return HTML(error_html)
    
    @staticmethod
    def create_pdf_preview(pdf_bytes: bytes, filename: str, width: str = "100%", height: str = "600px") -> HTML:
        """Create an embedded PDF preview with download options"""
        try:
            b64 = base64.b64encode(pdf_bytes).decode()
            
            preview_html = f'''
            <div style="border: 2px solid #4CAF50; border-radius: 10px; padding: 15px; margin: 15px 0; background: white;">
                <div style="display: flex; justify-content: space-between; align-items: center; margin-bottom: 15px; padding-bottom: 10px; border-bottom: 1px solid #eee;">
                    <h3 style="color: #2E7D32; margin: 0;">🔍 PDF Preview: {filename}</h3>
                    <div style="display: flex; gap: 10px;">
                        <a href="data:application/pdf;base64,{b64}" download="{filename}" 
                           style="background-color: #4CAF50; color: white; padding: 8px 16px; 
                                  text-decoration: none; border-radius: 4px; font-size: 14px;">
                           📥 Download
                        </a>
                    </div>
                </div>
                
                <div style="background: #f5f5f5; padding: 10px; border-radius: 5px; margin-bottom: 10px;">
                    <strong>File Info:</strong> {len(pdf_bytes)/1024:.1f} KB • Generated by EcoSight AI
                </div>
                
                <iframe src="data:application/pdf;base64,{b64}" 
                        width="{width}" 
                        height="{height}" 
                        style="border: 1px solid #ddd; border-radius: 5px;">
                </iframe>
                
                <div style="margin-top: 10px; text-align: center; color: #666; font-size: 12px;">
                    💡 Scroll to navigate • Use download button above to save
                </div>
            </div>
            '''
            return HTML(preview_html)
        except Exception as e:
            error_html = f'''
            <div style="color: red; padding: 15px; border: 1px solid red; border-radius: 5px; margin: 10px 0;">
                ❌ Error creating PDF preview: {str(e)}
            </div>
            '''
            return HTML(error_html)
    
    @staticmethod
    def create_interactive_pdf_dashboard(pdf_bytes: bytes, filename: str) -> HTML:
        """Create an interactive PDF dashboard with preview and multiple download options"""
        try:
            b64 = base64.b64encode(pdf_bytes).decode()
            file_size_kb = len(pdf_bytes) / 1024
            
            dashboard_html = f'''
            <div style="border: 2px solid #2E7D32; border-radius: 12px; padding: 20px; margin: 20px 0; background: linear-gradient(135deg, #f8fff8, #e8f5e8);">
                <!-- Header -->
                <div style="text-align: center; margin-bottom: 20px;">
                    <h2 style="color: #2E7D32; margin: 0 0 5px 0;">🌿 EcoSight Report Ready</h2>
                    <p style="color: #666; margin: 0;">AI-Powered Waste Analysis Complete</p>
                </div>
                
                <!-- File Info Card -->
                <div style="background: white; padding: 15px; border-radius: 8px; margin-bottom: 20px; border-left: 4px solid #4CAF50;">
                    <div style="display: flex; justify-content: space-between; align-items: center;">
                        <div>
                            <strong style="color: #2E7D32;">📊 Report Details</strong>
                            <div style="color: #666; font-size: 14px; margin-top: 5px;">
                                📄 <strong>{filename}</strong><br>
                                💾 Size: {file_size_kb:.1f} KB<br>
                                🏷️ Type: Professional Waste Analysis
                            </div>
                        </div>
                        <div style="text-align: right;">
                            <div style="background: #4CAF50; color: white; padding: 5px 10px; border-radius: 15px; font-size: 12px;">
                                ✅ READY
                            </div>
                        </div>
                    </div>
                </div>
                
                <!-- Action Buttons -->
                <div style="display: grid; grid-template-columns: 1fr 1fr; gap: 10px; margin-bottom: 20px;">
                    <a href="data:application/pdf;base64,{b64}" download="{filename}" 
                       style="background: linear-gradient(135deg, #4CAF50, #2E7D32); color: white; padding: 12px; 
                              text-decoration: none; border-radius: 6px; text-align: center; font-weight: bold;
                              transition: transform 0.2s; display: block;">
                       📥 Download PDF
                    </a>
                    <button onclick="togglePreview()" 
                            style="background: linear-gradient(135deg, #FF9800, #F57C00); color: white; padding: 12px; 
                                   border: none; border-radius: 6px; text-align: center; font-weight: bold;
                                   cursor: pointer; transition: transform 0.2s;">
                       👁️ Toggle Preview
                    </button>
                </div>
                
                <!-- Preview Section -->
                <div id="previewSection" style="display: none;">
                    <div style="background: white; padding: 15px; border-radius: 8px; border: 1px solid #ddd;">
                        <div style="display: flex; justify-content: space-between; align-items: center; margin-bottom: 10px;">
                            <strong style="color: #2E7D32;">🔍 Live Preview</strong>
                            <span style="color: #666; font-size: 12px;">Scroll to navigate • Click to interact</span>
                        </div>
                        <iframe src="data:application/pdf;base64,{b64}#toolbar=1&navpanes=1" 
                                width="100%" 
                                height="500px" 
                                style="border: 1px solid #ddd; border-radius: 5px;">
                        </iframe>
                    </div>
                </div>
                
                <!-- Quick Actions -->
                <div style="background: #e8f5e8; padding: 15px; border-radius: 8px; margin-top: 15px;">
                    <strong style="color: #2E7D32;">🚀 Quick Actions</strong>
                    <div style="display: grid; grid-template-columns: 1fr 1fr 1fr; gap: 8px; margin-top: 10px;">
                        <a href="data:application/pdf;base64,{b64}" download="{filename.replace('.pdf', '_compact.pdf')}" 
                           style="background: white; color: #2E7D32; padding: 8px; text-decoration: none; 
                                  border-radius: 4px; text-align: center; font-size: 12px; border: 1px solid #4CAF50;">
                           💾 Save Copy
                        </a>
                        <button onclick="window.print()" 
                                style="background: white; color: #2E7D32; padding: 8px; border: 1px solid #4CAF50; 
                                       border-radius: 4px; font-size: 12px; cursor: pointer;">
                           🖨️ Print
                        </button>
                        <button onclick="shareReport()" 
                                style="background: white; color: #2E7D32; padding: 8px; border: 1px solid #4CAF50; 
                                       border-radius: 4px; font-size: 12px; cursor: pointer;">
                           📤 Share
                        </button>
                    </div>
                </div>
            </div>
            
            <script>
            function togglePreview() {{
                var preview = document.getElementById('previewSection');
                if (preview.style.display === 'none') {{
                    preview.style.display = 'block';
                }} else {{
                    preview.style.display = 'none';
                }}
            }}
            
            function shareReport() {{
                alert('Share functionality: Download the PDF and share the file directly. For web apps, this would integrate with sharing APIs.');
            }}
            </script>
            
            <style>
            a:hover, button:hover {{
                transform: translateY(-2px);
                box-shadow: 0 4px 8px rgba(0,0,0,0.1);
            }}
            </style>
            '''
            return HTML(dashboard_html)
        except Exception as e:
            error_html = f'''
            <div style="color: red; padding: 20px; border: 1px solid red; border-radius: 8px; margin: 20px 0; text-align: center;">
                ❌ Error creating PDF dashboard: {str(e)}
            </div>
            '''
            return HTML(error_html)
    
    @staticmethod
    def display_pdf_info(pdf_bytes: bytes, filename: str):
        """Display enhanced PDF file information"""
        size_kb = len(pdf_bytes) / 1024
        print(f"📊 PDF Report Generated:")
        print(f"   📄 Filename: {filename}")
        print(f"   📦 File size: {size_kb:.1f} KB")
        print(f"   🏷️  Pages: 1 (Professional Report)")
        print(f"   🔧 Features: Enhanced Styling, Branding, Analytics")
        print(f"   💾 Download methods:")
        print(f"      • Click download link below")
        print(f"      • Use preview interface")
        print(f"      • Access from Kaggle output tab")
    
    @staticmethod
    def save_pdf_to_output(pdf_bytes: bytes, filename: str) -> str:
        """Save PDF to Kaggle working directory for manual download"""
        try:
            filepath = f"/kaggle/working/{filename}"
            with open(filepath, 'wb') as f:
                f.write(pdf_bytes)
            print(f"💾 PDF saved to output directory: {filepath}")
            return filepath
        except Exception as e:
            # Chain the original error so the root cause stays in the traceback
            raise Exception(f"Failed to save PDF: {str(e)}") from e
    
    @staticmethod
    def create_simple_preview(pdf_bytes: bytes, filename: str) -> HTML:
        """Create a simple PDF preview for quick viewing"""
        try:
            b64 = base64.b64encode(pdf_bytes).decode()
            
            simple_html = f'''
            <div style="border: 1px solid #ccc; border-radius: 5px; padding: 10px; margin: 10px 0;">
                <div style="display: flex; justify-content: space-between; align-items: center; margin-bottom: 10px;">
                    <strong>Preview: {filename}</strong>
                    <a href="data:application/pdf;base64,{b64}" download="{filename}" 
                       style="background: #4CAF50; color: white; padding: 5px 10px; text-decoration: none; border-radius: 3px; font-size: 12px;">
                       Download
                    </a>
                </div>
                <iframe src="data:application/pdf;base64,{b64}" 
                        width="100%" 
                        height="400px" 
                        style="border: 1px solid #ddd;">
                </iframe>
            </div>
            '''
            return HTML(simple_html)
        except Exception as e:
            return HTML(f'<div style="color: red;">Preview error: {str(e)}</div>')

Writing EcoSight/utils/pdf_downloader.py
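
The download links and previews in this module all rest on one technique: the PDF bytes are base64-encoded and embedded in a `data:` URI, so the browser can save or render the file without a separate server request. The sketch below isolates that step. The helper names `build_data_uri` and `build_download_anchor` are hypothetical and are not part of `PDFDownloader`; the `html.escape` calls are an added precaution that the module itself does not apply.

```python
import base64
import html


def build_data_uri(pdf_bytes: bytes) -> str:
    """Encode raw PDF bytes as a base64 data URI for an href or iframe src."""
    b64 = base64.b64encode(pdf_bytes).decode("ascii")
    return f"data:application/pdf;base64,{b64}"


def build_download_anchor(pdf_bytes: bytes, filename: str,
                          link_text: str = "Download PDF Report") -> str:
    """Return an <a> tag that downloads the PDF when clicked (hypothetical helper)."""
    # Escape caller-supplied text so a filename containing quotes or angle
    # brackets cannot break out of the attribute or inject markup.
    safe_name = html.escape(filename, quote=True)
    safe_text = html.escape(link_text)
    size_kb = len(pdf_bytes) / 1024
    return (
        f'<a href="{build_data_uri(pdf_bytes)}" download="{safe_name}">'
        f"{safe_text} ({size_kb:.1f} KB)</a>"
    )


# Placeholder bytes standing in for a real generated report
sample_pdf = b"%PDF-1.4 placeholder"
anchor_html = build_download_anchor(sample_pdf, 'report "v1".pdf')
```

In a notebook, wrapping the returned string in `IPython.display.HTML` renders the link. Base64 encoding grows the payload by roughly a third, so for very large reports the copy saved to disk is likely the more practical download route.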


# 💾 Memory Bank Module
**Persistent storage for user history and personalized insights**

This cell creates the memory_bank.py module that stores analysis history and user profiles for personalized recommendations. It tracks recycling patterns over time and enables trend-based waste management suggestions.

**Memory Features:**

* 📊 User profiles – Tracks analysis counts, item volumes, and category percentages
* 🕒 Time-based queries – Retrieves history from the last N days (30 by default)
* 📈 Trend analysis – Calculates running averages for waste composition
* 🧮 Profile updates – Incremental learning from each new analysis session
* 🔍 Summary extraction – Distills complex results into key metrics for storage
* 🌱 Default profiles – Automatic creation for new users with zero-initialized data

In [12]:
%%writefile EcoSight/memory/memory_bank.py
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta

class MemoryBank:
    """Long-term memory storage for user profiles and analysis history"""
    
    def __init__(self, storage_path: Optional[str] = None):
        self.storage_path = storage_path
        self.user_profiles = {}
        self.analysis_history = {}
        
    async def store_analysis(self, user_id: str, session_id: str, results: Dict[str, Any]):
        if user_id not in self.analysis_history:
            self.analysis_history[user_id] = []
        
        record = {
            "session_id": session_id,
            "timestamp": datetime.utcnow().isoformat(),
            "results": results,
            "summary": self._extract_summary(results)
        }
        
        self.analysis_history[user_id].append(record)
        await self._update_user_profile(user_id, record)
        
    async def get_user_history(self, user_id: str, days: int = 30) -> List[Dict[str, Any]]:
        if user_id not in self.analysis_history:
            return []
        
        cutoff_date = datetime.utcnow() - timedelta(days=days)
        history = self.analysis_history[user_id]
        
        return [
            record for record in history
            if datetime.fromisoformat(record['timestamp'].replace('Z', '+00:00')) > cutoff_date
        ]
    
    async def get_user_profile(self, user_id: str) -> Dict[str, Any]:
        if user_id not in self.user_profiles:
            return self._create_default_profile(user_id)
        return self.user_profiles[user_id]
    
    async def _update_user_profile(self, user_id: str, new_record: Dict[str, Any]):
        profile = await self.get_user_profile(user_id)
        summary = new_record['summary']
        
        profile['total_analyses'] += 1
        profile['total_items_analyzed'] += summary['total_items']
        
        for category in ['recyclable', 'compost', 'landfill', 'hazardous']:
            current_avg = profile['category_percentages'].get(category, 0)
            new_value = summary.get(f'{category}_percent', 0)
            profile['category_percentages'][category] = (
                (current_avg * (profile['total_analyses'] - 1) + new_value) / 
                profile['total_analyses']
            )
        
        profile['last_analysis'] = new_record['timestamp']
        self.user_profiles[user_id] = profile
    
    def _extract_summary(self, results: Dict[str, Any]) -> Dict[str, Any]:
        classified_items = results.get('classified_items', [])
        stats = results.get('summary_stats', {})
        
        return {
            'total_items': len(classified_items),
            'recyclable_percent': stats.get('recyclable_percent', 0),
            'compost_percent': stats.get('compost_percent', 0),
            'landfill_percent': stats.get('landfill_percent', 0),
            'hazardous_percent': stats.get('hazardous_percent', 0)
        }
    
    def _create_default_profile(self, user_id: str) -> Dict[str, Any]:
        profile = {
            'user_id': user_id,
            'total_analyses': 0,
            'total_items_analyzed': 0,
            'category_percentages': {
                'recyclable': 0,
                'compost': 0,
                'landfill': 0,
                'hazardous': 0
            },
            'created_date': datetime.utcnow().isoformat(),
            'last_analysis': None,
        }
        
        self.user_profiles[user_id] = profile
        return profile

Writing EcoSight/memory/memory_bank.py
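
The profile update in `_update_user_profile` keeps a running average per category rather than re-reading the full history. A small worked sketch of that arithmetic follows. The function name `update_running_average` and the three sample sessions are made up for illustration; only the formula mirrors the module.

```python
def update_running_average(current_avg: float, new_value: float,
                           count_after_update: int) -> float:
    """Fold one new observation into an existing mean without storing past values."""
    if count_after_update < 1:
        raise ValueError("count_after_update must be at least 1")
    # Undo the old division, add the new value, then divide by the new count
    return (current_avg * (count_after_update - 1) + new_value) / count_after_update


# Three hypothetical sessions with 60%, 40%, and 50% recyclable items
recyclable_avg = 0.0
for session_number, recyclable_percent in enumerate([60.0, 40.0, 50.0], start=1):
    recyclable_avg = update_running_average(
        recyclable_avg, recyclable_percent, session_number
    )
```

After the loop `recyclable_avg` equals the plain mean of the three sessions. Note that every session carries equal weight regardless of how many items it contained, which is also how the module behaves.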


# 👁️ Vision Analysis Agent
**AI-powered waste detection using Gemini Vision**

This cell creates the vision_agent.py module that serves as the first agent in the EcoSight pipeline. It processes waste images through either the real Gemini Vision API or a mock detector, producing structured JSON output with detected items and confidence scores.

**Agent Capabilities:**

* 🖼️ Image processing – Converts images to base64 for API compatibility
* 🔮 Dual mode – Uses Gemini when an API key is available and falls back to mock detection otherwise
* 📊 Metrics tracking – Logs images processed and items detected for observability
* 🎯 Smart provider selection – Chooses between real AI and simulated detection
* 🔍 Detailed logging – Tracks provider type, model version, and analysis completion
* 🧩 Context-aware – Uses session context for traceability and user tracking

In [13]:
%%writefile EcoSight/agents/vision_agent.py
import base64
import json
from typing import List, Dict, Any
from EcoSight.agents.agent_base import AgentContext, create_agent_logger, log_metric

class VisionAnalysisAgent:
    """Agent for analyzing waste items in images using vision AI"""
    
    def __init__(self, vision_provider: Any, use_gemini: bool = False):
        self.name = "vision_analysis"
        self.logger = create_agent_logger(self.name)
        self.vision_provider = vision_provider
        self.use_gemini = use_gemini
        
        if use_gemini:
            print("🔮 Vision Agent configured to use Gemini (if API key available)")
    
    async def execute(self, context: AgentContext, image_data: bytes) -> Dict[str, Any]:
        self.logger.info(f"Analyzing image for session {context.session_id}")
        
        try:
            # Convert image to base64 for API consumption
            image_b64 = base64.b64encode(image_data).decode('utf-8')
            
            if self.use_gemini:
                print("🎯 Attempting Gemini Vision analysis...")
            
            # Call vision provider (Gemini or Mock)
            detections = await self.vision_provider.analyze_image(image_b64)
            
            log_metric(self.logger, "images_processed", 1)
            log_metric(self.logger, "items_detected", len(detections.get('items', [])))
            
            # Enhanced logging
            provider = detections.get('provider', 'mock')
            model_version = detections.get('model_version', 'unknown')
            print(f"✅ Vision analysis complete - Provider: {provider}, Model: {model_version}")
            
            return {
                "session_id": context.session_id,
                "detections": detections,
                "agent": self.name,
                "provider": provider
            }
            
        except Exception as e:
            self.logger.error(f"Vision analysis failed: {str(e)}")
            raise

Writing EcoSight/agents/vision_agent.py
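
The agent only requires that its provider expose an async `analyze_image(image_b64)` method returning a dict with an `items` list and, optionally, `provider` and `model_version` keys. The sketch below shows a stand-in provider that satisfies that contract. `StubVisionProvider`, the sample items, and the `confidence` field are assumptions for illustration and are not the notebook's own mock or Gemini providers.

```python
import asyncio
import base64
from typing import Any, Dict


class StubVisionProvider:
    """Hypothetical provider that returns fixed detections for any valid image."""

    async def analyze_image(self, image_b64: str) -> Dict[str, Any]:
        # Reject payloads that are not valid base64, as a real API call would
        base64.b64decode(image_b64, validate=True)
        return {
            "items": [
                {"name": "plastic bottle", "confidence": 0.92},
                {"name": "banana peel", "confidence": 0.88},
            ],
            "provider": "stub",
            "model_version": "stub-0.1",
        }


async def demo() -> Dict[str, Any]:
    # Encode placeholder bytes the same way the agent encodes image data
    image_b64 = base64.b64encode(b"fake image bytes").decode("utf-8")
    return await StubVisionProvider().analyze_image(image_b64)


detections = asyncio.run(demo())
```

Inside a running notebook kernel, `await demo()` would replace the `asyncio.run` call, since the kernel already owns an event loop.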


# 🏷️ Waste Classification Agent
**Material categorization with recycling labels and location rules**

This cell creates the classification_agent.py module that transforms detected waste items into categorized output with disposal instructions. It applies waste database rules and location-specific guidelines to generate recycling labels and statistical summaries.

**Classification Features:**

* 🗂️ Multi-source classification – Combines WasteDB material rules with LocationFinder guidelines
* 📍 Location-aware – Custom disposal instructions based on user's geographic location
* 🏷️ Label generation – Creates standardized recycling labels (RECYCLE_PLASTIC, COMPOST, etc.)
* 📊 Statistics calculation – Computes recyclable, compost, landfill, and hazardous percentages
* 🔗 Item enrichment – Augments vision detections with classification and disposal data
* 📈 Performance metrics – Tracks classification rates and recyclable percentages


In [14]:
%%writefile EcoSight/agents/classification_agent.py
from typing import Dict, Any, List
from EcoSight.agents.agent_base import AgentContext, create_agent_logger, log_metric

class WasteClassificationAgent:
    """Agent for classifying waste items and determining disposal methods"""
    
    def __init__(self, waste_db_tool: Any, location_tool: Any):
        self.name = "waste_classification"
        self.logger = create_agent_logger(self.name)
        self.waste_db = waste_db_tool
        self.location_tool = location_tool
    
    async def execute(self, context: AgentContext, vision_results: Dict[str, Any]) -> Dict[str, Any]:
        self.logger.info(f"Classifying items for session {context.session_id}")
        
        classified_items = []
        
        # Process each detected item
        for item in vision_results['detections'].get('items', []):
            # Classify using waste database
            classification = await self.waste_db.classify_item(
                item['name'], 
                context.location
            )
            
            # Get location-specific disposal instructions
            disposal_info = await self.location_tool.get_disposal_instructions(
                classification['category'],
                context.location
            )
            
            classified_item = {
                **item,
                "classification": classification,
                "disposal_instructions": disposal_info,
                "recycling_label": self._generate_recycling_label(classification)
            }
            classified_items.append(classified_item)
        
        # Calculate summary statistics
        stats = self._calculate_classification_stats(classified_items)
        
        log_metric(self.logger, "items_classified", len(classified_items))
        log_metric(self.logger, "recyclable_percent", stats['recyclable_percent'])
        
        return {
            "session_id": context.session_id,
            "classified_items": classified_items,
            "summary_stats": stats,
            "agent": self.name
        }
    
    def _generate_recycling_label(self, classification: Dict[str, Any]) -> str:
        category = classification['category']
        if category == 'recyclable':
            return f"RECYCLE_{classification.get('material', 'MIXED')}"
        elif category == 'compost':
            return "COMPOST"
        elif category == 'hazardous':
            return "HAZARDOUS"
        else:
            return "LANDFILL"
    
    def _calculate_classification_stats(self, items: List[Dict]) -> Dict[str, float]:
        categories = [item['classification']['category'] for item in items]
        total = len(categories)
        
        if total == 0:
            return {'recyclable_percent': 0, 'compost_percent': 0, 
                   'landfill_percent': 0, 'hazardous_percent': 0}
        
        return {
            'recyclable_percent': (categories.count('recyclable') / total) * 100,
            'compost_percent': (categories.count('compost') / total) * 100,
            'landfill_percent': (categories.count('landfill') / total) * 100,
            'hazardous_percent': (categories.count('hazardous') / total) * 100
        }

Writing EcoSight/agents/classification_agent.py
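
To make the summary statistics concrete, here is a standalone sketch of the same percentage calculation applied to four sample items. The function name `category_percentages` and the sample input are hypothetical; the category names and output keys follow `_calculate_classification_stats`.

```python
from collections import Counter
from typing import Dict, List

CATEGORIES = ("recyclable", "compost", "landfill", "hazardous")


def category_percentages(categories: List[str]) -> Dict[str, float]:
    """Share of each waste category as a percentage of all classified items."""
    total = len(categories)
    if total == 0:
        # No items detected: report zeros instead of dividing by zero
        return {f"{name}_percent": 0.0 for name in CATEGORIES}
    counts = Counter(categories)
    return {f"{name}_percent": counts[name] / total * 100 for name in CATEGORIES}


# Two recyclable items, one compostable, one landfill
stats = category_percentages(["recyclable", "recyclable", "compost", "landfill"])
```

For this input the recyclable share is 50.0, compost and landfill are 25.0 each, and hazardous is 0.0. Any category outside the four known names is counted in the total but appears in no bucket, so the percentages can sum to less than 100.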


# 📊 Reporting & Education Agent
**Professional report generation with environmental analytics**

This cell creates the reporting_agent.py module that serves as the final agent in the EcoSight pipeline. It generates comprehensive reports with environmental impact metrics, personalized recommendations, and professional PDF outputs while managing historical data.

**Reporting Features:**

* 📈 Environmental impact – Calculates CO₂, water, and energy savings from recycling
* 💾 Historical tracking – Stores analysis results in MemoryBank for trend analysis
* 🎯 Personalized tips – Generates recommendations from the current session's statistics; user history is fetched and passed to the tip generator, which does not yet use it
* 📄 PDF generation – Creates professional reports with branded styling
* 📊 Comprehensive analytics – Combines classification results with environmental metrics
* 🔗 Memory integration – Two-way communication with MemoryBank for personalized insights
* 💡 Educational content – Provides actionable waste management advice

In [15]:
%%writefile EcoSight/agents/reporting_agent.py
from typing import Dict, Any, List
from datetime import datetime
from EcoSight.agents.agent_base import AgentContext, create_agent_logger, log_metric
from EcoSight.tools.pdf_generator import PDFGenerator
from EcoSight.utils.pdf_downloader import PDFDownloader

class ReportingEducationAgent:
    """Agent for generating reports and educational content with PDF support"""
    
    def __init__(self, memory_bank: Any):
        self.name = "reporting_education"
        self.logger = create_agent_logger(self.name)
        self.memory_bank = memory_bank
        self.pdf_generator = PDFGenerator()
    
    async def execute(self, context: AgentContext, classification_results: Dict[str, Any]) -> Dict[str, Any]:
        self.logger.info(f"Generating report for session {context.session_id}")
        
        # Store results in memory for historical tracking
        await self.memory_bank.store_analysis(
            context.user_id,
            context.session_id,
            classification_results
        )
        
        # Get user history for personalized insights
        user_history = await self.memory_bank.get_user_history(context.user_id)
        
        # Generate standard report
        report = await self._generate_comprehensive_report(
            classification_results,
            user_history,
            context
        )
        
        # Generate PDF report
        pdf_bytes = await self.generate_pdf_report(report, context.session_id)
        
        # Save PDF to file system
        filename = f"EcoSight_report_{context.session_id}.pdf"
        saved_path = PDFDownloader.save_pdf_to_output(pdf_bytes, filename)
        print(f"💾 PDF saved to: {saved_path}")
        
        # Add PDF data to report
        report['pdf_bytes'] = pdf_bytes
        report['pdf_filename'] = filename
        report['saved_path'] = saved_path
        
        log_metric(self.logger, "reports_generated", 1)
        log_metric(self.logger, "environmental_savings_kg", report['environmental_impact']['co2_saved_kg'])
        
        return report
    
    async def generate_pdf_report(self, report_data: Dict[str, Any], session_id: str) -> bytes:
        """Generate PDF version of the report"""
        self.logger.info(f"Generating PDF report for session {session_id}")
        
        try:
            pdf_bytes = await self.pdf_generator.generate_EcoSight_report(report_data, session_id)
            log_metric(self.logger, "pdf_reports_generated", 1)
            return pdf_bytes
        except Exception as e:
            self.logger.error(f"PDF generation failed: {str(e)}")
            raise
    
    async def _generate_comprehensive_report(self, classification_results: Dict, user_history: List[Dict], context: AgentContext) -> Dict[str, Any]:
        stats = classification_results['summary_stats']
        items = classification_results['classified_items']
        
        # Calculate environmental impact
        environmental_impact = self._calculate_environmental_impact(items)
        
        # Generate personalized tips based on current stats and history
        personalized_tips = self._generate_personalized_tips(stats, user_history)
        
        return {
            "session_id": context.session_id,
            "timestamp": datetime.utcnow().isoformat(),
            "summary": {
                "total_items": len(items),
                "recyclable_percent": stats['recyclable_percent'],
                "compost_percent": stats['compost_percent'],
                "landfill_percent": stats['landfill_percent'],
                # Hazardous share is computed by the classification agent, so report it too
                "hazardous_percent": stats.get('hazardous_percent', 0)
            },
            "environmental_impact": environmental_impact,
            "personalized_tips": personalized_tips,
            "detailed_breakdown": [
                {
                    "item": item['name'],
                    "category": item['classification']['category'],
                    "disposal_instructions": item['disposal_instructions'],
                    "recycling_label": item.get('recycling_label')
                }
                for item in items
            ],
            "agent": self.name
        }
    
    def _calculate_environmental_impact(self, items: List[Dict]) -> Dict[str, float]:
        recyclable_count = sum(1 for item in items if item['classification']['category'] == 'recyclable')
        
        return {
            'co2_saved_kg': recyclable_count * 0.5,
            'water_saved_liters': recyclable_count * 10,
            'energy_saved_kwh': recyclable_count * 0.3
        }
    
    def _generate_personalized_tips(self, current_stats: Dict, user_history: List[Dict]) -> List[str]:
        tips = []
        
        if current_stats['recyclable_percent'] < 50:
            tips.append("Try to separate more recyclable materials like plastic, paper, and metal.")
        
        if current_stats['compost_percent'] < 20:
            tips.append("Consider composting food waste to reduce landfill usage.")
        
        # Add general educational tips
        tips.extend([
            "Rinse recyclable containers to reduce contamination.",
            "Check local guidelines for specific recycling rules.",
            "Reduce single-use plastics by choosing reusable alternatives."
        ])
        
        return tips

Writing EcoSight/agents/reporting_agent.py
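
The environmental impact figures come from multiplying the number of recyclable items by fixed per-item factors. The sketch below reproduces that arithmetic on a sample batch. The factors are copied from `_calculate_environmental_impact`; this section does not cite a source for them, so they are best read as rough placeholders. The function name `estimate_impact` and the sample categories are hypothetical.

```python
from typing import Dict, List

# Per-item factors copied from _calculate_environmental_impact;
# treated here as illustrative constants, not measured values.
CO2_KG_PER_ITEM = 0.5
WATER_L_PER_ITEM = 10
ENERGY_KWH_PER_ITEM = 0.3


def estimate_impact(categories: List[str]) -> Dict[str, float]:
    """Estimate savings from the count of recyclable items only."""
    recyclable_count = sum(1 for category in categories if category == "recyclable")
    return {
        "co2_saved_kg": recyclable_count * CO2_KG_PER_ITEM,
        "water_saved_liters": recyclable_count * WATER_L_PER_ITEM,
        "energy_saved_kwh": recyclable_count * ENERGY_KWH_PER_ITEM,
    }


# Six items, four of them recyclable
impact = estimate_impact(
    ["recyclable", "compost", "recyclable", "landfill", "recyclable", "recyclable"]
)
```

With four recyclable items the estimate is about 2 kg of CO₂, 40 liters of water, and 1.2 kWh of energy. Because the factors ignore material type and weight, a glass jar and a sheet of paper count the same.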


# 🎭 EcoSight Orchestrator
**Central controller for multi-agent waste analysis workflows**

This cell creates the orchestrator.py module that coordinates the entire EcoSight pipeline. It manages both sequential single-image processing and parallel bulk analysis, handling session creation, error management, and agent coordination.

**Orchestration Features:**

* 🔄 Sequential workflow – Single images flow through Vision → Classification → Reporting agents
* ⚡ Parallel processing – Multiple images analyzed concurrently with replicated agent instances
* 🎯 Session management – Unique session IDs with user/location context for traceability
* 🧩 Agent coordination – Coordinates all three agents with proper data passing
* 📊 Result aggregation – Collects and organizes outputs from parallel processing
* 🚨 Error handling – Graceful failure management with success/error reporting
* 🆔 Context creation – Generates AgentContext with metadata for each analysis session
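
One common way to get concurrent processing with per-image error reporting is `asyncio.gather` with `return_exceptions=True`. The sketch below illustrates that pattern in isolation. The functions `analyze_one` and `analyze_many` are hypothetical stand-ins, and the orchestrator's own `process_multiple_images` may be structured differently.

```python
import asyncio
from typing import Any, Dict, List


async def analyze_one(image_data: bytes) -> Dict[str, Any]:
    """Hypothetical per-image pipeline; fails on empty input to show error handling."""
    if not image_data:
        raise ValueError("empty image")
    await asyncio.sleep(0)  # yield control, standing in for real I/O
    return {"success": True, "size": len(image_data)}


async def analyze_many(images: List[bytes]) -> List[Dict[str, Any]]:
    """Run all images concurrently and convert failures into error records."""
    outcomes = await asyncio.gather(
        *(analyze_one(image) for image in images),
        return_exceptions=True,  # one failure does not cancel the others
    )
    return [
        {"success": False, "error": str(outcome)}
        if isinstance(outcome, Exception)
        else outcome
        for outcome in outcomes
    ]


results = asyncio.run(analyze_many([b"abc", b"", b"hello"]))
```

`gather` returns results in input order, so each entry in `results` lines up with the image at the same index even when one of them fails.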

In [16]:
%%writefile EcoSight/orchestration/orchestrator.py
import asyncio
from typing import Dict, Any, List
from datetime import datetime
import uuid

from EcoSight.agents.vision_agent import VisionAnalysisAgent
from EcoSight.agents.classification_agent import WasteClassificationAgent
from EcoSight.agents.reporting_agent import ReportingEducationAgent
from EcoSight.agents.agent_base import AgentContext

class EcoSightOrchestrator:
    """Orchestrates the multi-agent workflow for waste analysis"""
    
    def __init__(self, vision_provider, waste_db, location_finder, memory_bank, use_gemini_vision: bool = False):
        self.vision_agent = VisionAnalysisAgent(vision_provider, use_gemini=use_gemini_vision)
        self.classification_agent = WasteClassificationAgent(waste_db, location_finder)
        self.reporting_agent = ReportingEducationAgent(memory_bank)
        
    async def process_single_image(self, image_data: bytes, user_id: str = None, 
                                 location: str = None) -> Dict[str, Any]:
        """Sequential workflow: Vision → Classification → Reporting"""
        
        session_id = str(uuid.uuid4())
        context = AgentContext(
            session_id=session_id,
            user_id=user_id or "anonymous",
            location=location,
            metadata={"timestamp": datetime.utcnow().isoformat()}
        )
        
        try:
            print("üîç Starting Vision Analysis...")
            vision_results = await self.vision_agent.execute(context, image_data)
            print(f"   Detected {len(vision_results['detections'].get('items', []))} items")
            
            print("üè∑Ô∏è Starting Waste Classification...")
            classification_results = await self.classification_agent.execute(context, vision_results)
            print(f"   Classified {len(classification_results['classified_items'])} items")
            
            print("üìä Generating Report & PDF...")
            final_report = await self.reporting_agent.execute(context, classification_results)
            print("   PDF report generated successfully")
            
            return {
                "success": True,
                "session_id": session_id,
                "report": final_report,
            }
            
        except Exception as e:
            print(f"‚ùå Pipeline error: {str(e)}")
            return {
                "success": False,
                "session_id": session_id,
                "error": str(e),
            }
    
    async def process_multiple_images(self, images_data: List[bytes], user_id: str = None,
                                    location: str = None) -> Dict[str, Any]:
        """Parallel image processing workflow"""
        
        session_id = str(uuid.uuid4())
        tasks = []
        
        print(f"üîÑ Processing {len(images_data)} images in parallel...")
        
        for i, image_data in enumerate(images_data):
            task_context = AgentContext(
                session_id=f"{session_id}_{i}",
                user_id=user_id,
                location=location,
                metadata={"image_index": i}
            )
            
            task = asyncio.create_task(
                self._process_single_image_parallel(image_data, task_context)
            )
            tasks.append(task)
        
        # Wait for all tasks to complete
        results = await asyncio.gather(*tasks, return_exceptions=True)
        successful_results = [r for r in results if not isinstance(r, Exception)]
        
        return {
            "session_id": session_id,
            "total_images": len(images_data),
            "successful_processing": len(successful_results),
            "results": successful_results
        }
    
    async def _process_single_image_parallel(self, image_data: bytes, context: AgentContext) -> Dict[str, Any]:
        """Process single image in parallel workflow"""
        vision_results = await self.vision_agent.execute(context, image_data)
        classification_results = await self.classification_agent.execute(context, vision_results)
        return classification_results

Writing EcoSight/orchestration/orchestrator.py
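
The parallel workflow relies on `asyncio.gather(..., return_exceptions=True)`, which hands back raised exceptions as entries in the result list rather than propagating them. The sketch below shows one way to separate successes from failures while keeping the error messages. `fake_analyze` and `run_batch` are hypothetical stand-ins for the agent calls and are not part of EcoSight.

```python
import asyncio
from typing import Any, Dict, List


async def fake_analyze(index: int) -> Dict[str, Any]:
    """Hypothetical stand-in for the Vision -> Classification steps."""
    await asyncio.sleep(0)  # yield control, as a real I/O-bound call would
    if index == 1:
        raise ValueError(f"image {index} could not be decoded")
    return {"image_index": index, "classified_items": []}


async def run_batch(count: int) -> Dict[str, Any]:
    tasks = [asyncio.create_task(fake_analyze(i)) for i in range(count)]
    # Exceptions come back as list entries, in the same order as the tasks
    results: List[Any] = await asyncio.gather(*tasks, return_exceptions=True)

    successes = [r for r in results if not isinstance(r, BaseException)]
    failures = [
        {"image_index": i, "error": str(r)}
        for i, r in enumerate(results)
        if isinstance(r, BaseException)
    ]
    return {"successes": successes, "failures": failures}


summary = asyncio.run(run_batch(3))
print(len(summary["successes"]), "succeeded;", len(summary["failures"]), "failed")
```

Inside a notebook, where an event loop is already running, `await run_batch(3)` would replace the `asyncio.run(...)` call.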


# 📦 Dependencies Installation
**Setting up required Python packages for the multi-agent system**

This cell installs essential libraries needed for EcoSight's functionality. The reportlab package enables professional PDF generation, while google-genai provides access to Gemini Vision API for AI-powered waste detection.

**Core Dependencies:**

* 📄 reportlab – Professional PDF report generation with styling and layouts
* 🤖 google-genai – Official Google Gemini API client for vision analysis
* 🔄 Built-in modules – asyncio for async workflows, base64 for image encoding, typing for type hints (standard library, no installation needed)

In [17]:
# Install required packages for EcoSight
print("üì¶ Installing EcoSight dependencies...")

!pip install reportlab
!pip install google-genai

print("‚úÖ Dependencies installed successfully!")
print("üìö Required packages: reportlab, asyncio, base64, typing")

📦 Installing EcoSight dependencies...
Collecting reportlab
  Downloading reportlab-4.4.5-py3-none-any.whl.metadata (1.7 kB)
Downloading reportlab-4.4.5-py3-none-any.whl (2.0 MB)
Installing collected packages: reportlab
Successfully installed reportlab-4.4.5
Collecting cachetools<6.0,>=2.0.0 (from google-auth<3.0.0,>=2.14.1->google-genai)
  Downloading cachetools-5.5.2-py3-none-any.whl.metadata (5.4 kB)
Downloading cachetools-5.5.2-py3-none-any.whl (10 kB)
Installing collected packages: cachetools
  Attempting uninstall: cachetools
    Found existing installation: cachetools 6.2.1
    Uninstalling cachetools-6.2.1:
      Successfully uninstalled cachetools-6.2.1
Successfully installed cachetools-5.5.2
✅ Dependencies installed successfully!
📚 Required packages: repo
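
The success message in the install cell prints whether or not pip actually succeeded, so a quick check of what is installed can be useful. The following is a minimal sketch using the standard library's `importlib.metadata`; the helper name `installed_versions` is an assumption introduced here for illustration.

```python
from importlib import metadata
from typing import Dict, Iterable, Optional


def installed_versions(packages: Iterable[str]) -> Dict[str, Optional[str]]:
    """Map each distribution name to its installed version, or None if absent."""
    versions: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


# Distribution names as passed to pip in the cell above
for name, version in installed_versions(["reportlab", "google-genai"]).items():
    print(f"{name}: {version or 'NOT INSTALLED'}")
```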

# 🔄 TrashNet Image Loader
**Robust utility for accessing and validating the waste image dataset**

This cell creates a flexible image loading system that handles multiple possible TrashNet dataset structures in Kaggle. It automatically searches for the dataset across various paths, validates categories, and prepares images for AI processing.

**Loader Features:**

* 🔄 Multiple path detection – Searches 8 possible dataset locations automatically
* 📁 Category validation – Ensures loaded categories match TrashNet's 6 waste types
* 🖼️ Image preprocessing – Converts to RGB, resizes large images, and encodes as JPEG bytes
* 📊 Dataset exploration – Lists available categories with image counts
* 🧩 Flexible loading – Can load specific categories or random samples
* 💾 Memory optimization – Efficient byte encoding suitable for Vision Agent input
* 🔍 Error handling – Graceful fallback with clear error messages for missing data

In [18]:
# Robust TrashNet Image Loader for Correct Dataset Structure
import os
from PIL import Image
import random

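def load_trashnet_sample(category=None):
    """Load a random sample image from the TrashNet dataset.

    Returns (image_bytes, category, filename) on success, or None if the
    dataset, the category directory, or the image cannot be loaded.
    """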
    
    # Try multiple possible dataset locations (correct TrashNet structure)
    possible_paths = [
        "EcoSight/data/trashnet/data/dataset",  # Copied structure
        "EcoSight/data/trashnet/dataset",       # Alternative copy
        "EcoSight/data/trashnet",               # Direct copy
        "/kaggle/input/trashnet/data/dataset",     # Original location
        "/kaggle/input/trashnet/data",             # Alternative original
        "/kaggle/input/trashnet/dataset",          # Another alternative
        "/kaggle/input/trashnet",                  # Root location
        "/kaggle/input/garbage-classification/Garbage classification/Garbage classification",  # Fallback
    ]
    
    base_path = None
    for path in possible_paths:
        if os.path.exists(path):
            base_path = path
            print(f"📁 Using dataset from: {path}")
            break
    
    if not base_path:
        print("❌ TrashNet dataset not found in any location")
        print("💡 Using mock data for demonstration")
        return None
    
    # TrashNet categories (correct order from original dataset)
    categories = ['cardboard', 'glass', 'metal', 'paper', 'plastic', 'trash']
    
    # If no category specified, pick random one
    if category is None:
        category = random.choice(categories)
    elif category not in categories:
        print(f"⚠️ Category '{category}' not found. Available: {categories}")
        category = random.choice(categories)
    
    # Try to find the category directory in various possible structures
    category_paths_to_try = [
        os.path.join(base_path, category),
        os.path.join(base_path, 'data', 'dataset', category),
        os.path.join(base_path, 'dataset', category),
    ]
    # Also cover a split layout: <base>/trashnet/<split>/<category>
    for split in ('train', 'val', 'test'):
        category_paths_to_try.append(os.path.join(base_path, 'trashnet', split, category))
    
    category_path = None
    for test_path in category_paths_to_try:
        if os.path.exists(test_path):
            category_path = test_path
            break
    
    if not category_path:
        print(f"❌ Could not find category '{category}' in dataset")
        print(f"💡 Available directories in {base_path}:")
        try:
            items = os.listdir(base_path)
            dirs = [item for item in items if os.path.isdir(os.path.join(base_path, item))]
            print(f"   {dirs}")
        except OSError:
            print("   Could not list directory contents")
        return None
    
    # Get all image files in category
    try:
        image_files = [f for f in os.listdir(category_path) 
                      if f.lower().endswith(('.jpg', '.jpeg', '.png'))]
    except FileNotFoundError:
        print(f"❌ Category directory not found: {category_path}")
        return None
    
    if not image_files:
        print(f"❌ No images found in {category} category at {category_path}")
        return None
    
    # Select random image
    selected_image = random.choice(image_files)
    image_path = os.path.join(category_path, selected_image)
    
    try:
        # Load and convert to bytes (matching VisionAgent input format)
        with Image.open(image_path) as img:
            # Convert to RGB if needed
            if img.mode != 'RGB':
                img = img.convert('RGB')
            
            # Resize if too large (for efficiency)
            if img.size[0] > 800 or img.size[1] > 800:
                img.thumbnail((800, 800))
            
            # Convert to bytes
            import io
            img_bytes = io.BytesIO()
            img.save(img_bytes, format='JPEG', quality=85)
            image_data = img_bytes.getvalue()
        
        print(f"üì∏ Loaded TrashNet image: {category}/{selected_image}")
        print(f"üìè Image size: {img.size}, Format: JPEG")
        
        return image_data, category, selected_image
        
    except Exception as e:
        print(f"‚ùå Error loading image {image_path}: {e}")
        return None

def list_trashnet_categories():
    """List all available categories in TrashNet dataset"""
    possible_paths = [
        "EcoSight/data/trashnet/data/dataset",
        "EcoSight/data/trashnet/dataset", 
        "EcoSight/data/trashnet",
        "/kaggle/input/trashnet/data/dataset",
        "/kaggle/input/trashnet/data",
        "/kaggle/input/trashnet/dataset",
        "/kaggle/input/trashnet",
    ]
    
    categories = []
    base_path = None
    
    for path in possible_paths:
        if os.path.exists(path):
            base_path = path
            break
    
    if base_path:
        print(f"üìÅ Dataset base path: {base_path}")
        try:
            # Try to list directories at base path
            items = os.listdir(base_path)
            potential_categories = [item for item in items 
                                  if os.path.isdir(os.path.join(base_path, item))]
            
            # Filter for actual TrashNet categories
            trashnet_categories = ['cardboard', 'glass', 'metal', 'paper', 'plastic', 'trash']
            categories = [cat for cat in potential_categories if cat in trashnet_categories]
            
            if not categories:
                # Maybe we're at a higher level, check subdirectories
                for item in items:
                    sub_path = os.path.join(base_path, item)
                    if os.path.isdir(sub_path):
                        sub_items = os.listdir(sub_path)
                        for sub_item in sub_items:
                            if sub_item in trashnet_categories and sub_item not in categories:
                                categories.append(sub_item)
            
            print("üìã Available TrashNet categories:")
            for cat in sorted(categories):
                # Find the actual path to count images
                for test_base in possible_paths:
                    test_paths = [
                        os.path.join(test_base, cat),
                        os.path.join(test_base, 'data', 'dataset', cat),
                        os.path.join(test_base, 'dataset', cat),
                    ]
                    for test_path in test_paths:
                        if os.path.exists(test_path):
                            try:
                                image_count = len([f for f in os.listdir(test_path) 
                                                 if f.lower().endswith(('.jpg', '.jpeg', '.png'))])
                                print(f"   ‚Ä¢ {cat}: {image_count} images")
                                break
                            except:
                                print(f"   ‚Ä¢ {cat}: found but cannot access")
                            break
                    else:
                        continue
                    break
                else:
                    print(f"   ‚Ä¢ {cat}: directory not found")
                    
        except Exception as e:
            print(f"‚ùå Error reading dataset structure: {e}")
    else:
        print("‚ùå TrashNet dataset not found")
        print("üí° Please add: 'trashnet' by yangyang111 via '+ Add Data'")
        categories = ['cardboard', 'glass', 'metal', 'paper', 'plastic', 'trash']  # Default list
    
    return categories

print("üîÑ TrashNet utilities loaded with correct dataset structure!")

üîÑ TrashNet utilities loaded with correct dataset structure!
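
The loader's core idea, probing a list of candidate directories and then counting image files by extension, can be exercised without the real dataset. The sketch below does so against a temporary directory; `first_existing_path` and `count_images` are hypothetical helper names introduced here, not functions from the notebook.

```python
import os
import tempfile
from typing import Iterable, Optional


def first_existing_path(candidates: Iterable[str]) -> Optional[str]:
    """Return the first candidate that exists on disk, or None."""
    for path in candidates:
        if os.path.exists(path):
            return path
    return None


def count_images(directory: str) -> int:
    """Count files with a .jpg/.jpeg/.png extension (case-insensitive)."""
    return sum(
        1 for name in os.listdir(directory)
        if name.lower().endswith((".jpg", ".jpeg", ".png"))
    )


# Demonstration against a throwaway directory instead of the real dataset
with tempfile.TemporaryDirectory() as root:
    glass_dir = os.path.join(root, "dataset", "glass")
    os.makedirs(glass_dir)
    for filename in ("glass1.jpg", "glass2.PNG", "notes.txt"):
        open(os.path.join(glass_dir, filename), "wb").close()

    found = first_existing_path([
        os.path.join(root, "missing", "glass"),  # does not exist, skipped
        glass_dir,
    ])
    image_total = count_images(found)

print(image_total)
```

Returning `None` when nothing matches mirrors the loader's behaviour, so callers should check the result before unpacking it.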



# 🚀 EcoSight System Initialization
**Bootstrapping the complete multi-agent architecture**

This cell initializes all components of the EcoSight system, creating the integrated multi-agent pipeline. It automatically detects API key availability, configures the appropriate vision provider, and establishes connections between all modules.

**System Components:**

* 🤖 Agent initialization – Vision, Classification, and Reporting agents instantiated
* 🔮 Smart provider selection – Auto-detects Gemini API key, falls back to mock mode
* 🧩 Component integration – All tools and agents connected via orchestrator
* 💾 Memory system – MemoryBank ready for user history tracking
* 📄 PDF pipeline – Generator and downloader configured for report output
* 📍 Location-aware – LocationFinder with city-specific disposal rules
* 🗃️ Waste classification – WasteDB with material categorization rules
* 🎭 Dual-mode operation – Real AI or mock detection based on API availability

In [19]:
# EcoSight System Initialization
print("üöÄ Initializing EcoSight Multi-Agent System...")

import sys

# Ensure EcoSight is in path
sys.path.append('/kaggle/working/EcoSight')

# Import all components
from EcoSight.tools.vision_provider import MockVisionProvider, GeminiVisionProvider
from EcoSight.tools.waste_db import WasteDB
from EcoSight.tools.location_finder import LocationFinder
from EcoSight.tools.pdf_generator import PDFGenerator
from EcoSight.memory.memory_bank import MemoryBank
from EcoSight.orchestration.orchestrator import EcoSightOrchestrator
from EcoSight.utils.pdf_downloader import PDFDownloader

# Initialize components with error handling
try:
    # Try Gemini first if API key is available
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if gemini_api_key and gemini_api_key != "MOCK_MODE_NO_VALID_KEY" and gemini_api_key.startswith("AIza"):
        print("🔮 Using REAL Gemini Vision Provider")
        vision_provider = GeminiVisionProvider()
        use_gemini = True
    else:
        print("🎭 Using Mock Vision Provider (no valid Gemini API key)")
        vision_provider = MockVisionProvider()
        use_gemini = False
except Exception as e:
    print(f"🎭 Using Mock Vision Provider (Gemini initialization failed: {e})")
    vision_provider = MockVisionProvider()
    use_gemini = False

# Initialize other components
waste_db = WasteDB()
location_finder = LocationFinder()
memory_bank = MemoryBank()

# Create orchestrator
orchestrator = EcoSightOrchestrator(
    vision_provider=vision_provider,
    waste_db=waste_db,
    location_finder=location_finder,
    memory_bank=memory_bank,
    use_gemini_vision=use_gemini
)

print("✅ EcoSight system initialized successfully!")
print("📋 Components ready:")
print("   - Vision Analysis Agent")
print("   - Waste Classification Agent")
print("   - Reporting & Education Agent (with PDF)")
print("   - Memory Bank")
print("   - PDF Generator")
print(f"   - Vision Provider: {'Gemini' if use_gemini else 'Mock'}")
print("   - TrashNet dataset integrated")

# Display Gemini status
if use_gemini:
    print("🎉 Real AI detection: ENABLED")
else:
    print("💡 Real AI detection: DISABLED - Add Gemini API key to Kaggle Secrets")

🚀 Initializing EcoSight Multi-Agent System...
🔮 Using REAL Gemini Vision Provider
✅ Model gemini-2.5-flash is working
✅ Gemini Vision initialized with model: gemini-2.5-flash
🔮 Vision Agent configured to use Gemini (if API key available)
✅ EcoSight system initialized successfully!
📋 Components ready:
   - Vision Analysis Agent
   - Waste Classification Agent
   - Reporting & Education Agent (with PDF)
   - Memory Bank
   - PDF Generator
   - Vision Provider: Gemini
   - TrashNet dataset integrated
🎉 Real AI detection: ENABLED
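
The provider choice in the initialization cell reduces to a small predicate on the API key. The sketch below isolates that predicate so it can be tested on its own; `choose_vision_mode` is a hypothetical name, and the `AIza` prefix check is only the same heuristic the cell uses, not a validation of the key against the API.

```python
import os
from typing import Optional


def choose_vision_mode(api_key: Optional[str]) -> str:
    """Return 'gemini' when the key passes the cell's checks, else 'mock'."""
    if api_key and api_key != "MOCK_MODE_NO_VALID_KEY" and api_key.startswith("AIza"):
        return "gemini"
    return "mock"


# The environment variable may be unset, in which case os.getenv returns None
print(choose_vision_mode(os.getenv("GEMINI_API_KEY")))
print(choose_vision_mode("AIza-placeholder-not-a-real-key"))
print(choose_vision_mode("MOCK_MODE_NO_VALID_KEY"))
```

A key that passes this check can still be rejected by the service, which is why the cell also wraps provider construction in `try`/`except` and falls back to the mock provider.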


# 🧪 TrashNet Dataset Verification
**Comprehensive validation of waste image dataset structure and content**

This cell performs a thorough verification of the TrashNet dataset, checking folder structure, counting images by category and split, and providing detailed statistics. It ensures the dataset is properly loaded and accessible for the multi-agent system.

**Verification Checks:**

* 📁 Folder structure – Validates train/val/test splits and category organization
* 🖼️ Image counts – Counts images per category across all dataset splits
* 📊 Detailed statistics – Provides totals for training, validation, and test sets
* 🔍 Category validation – Confirms all 6 TrashNet waste categories are present
* 🧩 Debug information – Shows complete directory tree if issues are detected
* ✅ Success confirmation – Clear success/failure indicators with actionable error messages
* 📈 Dataset summary – Comprehensive breakdown of 2,527 total images with category distributions

In [20]:
# =============================================================================
# 🧪 TEST CELL: Dataset Verification
# =============================================================================

print("🧪 Verifying TrashNet Dataset Access...")
print("1. Checking available categories...")

import glob
import os

def find_actual_categories():
    """Find the actual waste categories with images"""
    
    # The dataset path
    base_path = '/kaggle/input/trashnet/trashnet'
    
    if not os.path.exists(base_path):
        print(f"❌ Dataset path not found: {base_path}")
        return None, {}
    
    print(f"📁 Dataset path: {base_path}")
    
    # List ALL items in the base path
    all_items = os.listdir(base_path)
    print(f"📦 Main folders: {all_items}")
    
    # Check each subfolder (train, val, test) for categories
    found_categories = {}
    
    for split in ['train', 'val', 'test']:
        split_path = os.path.join(base_path, split)
        if os.path.exists(split_path):
            print(f"\n🔍 Checking {split} folder:")
            
            # List categories in this split
            categories_in_split = os.listdir(split_path)
            print(f"   Categories in {split}: {categories_in_split}")
            
            for category in categories_in_split:
                category_path = os.path.join(split_path, category)
                if os.path.isdir(category_path):
                    # Count images in this category
                    image_files = glob.glob(os.path.join(category_path, "*.jpg")) + \
                                 glob.glob(os.path.join(category_path, "*.jpeg")) + \
                                 glob.glob(os.path.join(category_path, "*.png"))
                    
                    if image_files:
                        if category not in found_categories:
                            found_categories[category] = {'total': 0, 'train': 0, 'val': 0, 'test': 0}
                        found_categories[category][split] = len(image_files)
                        found_categories[category]['total'] += len(image_files)
                        print(f"   ✅ {category}: {len(image_files)} images")
    
    return base_path, found_categories

# Find the actual categories
dataset_path, categories_dict = find_actual_categories()

if categories_dict:
    print("\n🎉 SUCCESS: Found TrashNet dataset with categories!")
    print("📋 Available TrashNet categories (with split counts):")
    
    for category in sorted(categories_dict.keys()):
        counts = categories_dict[category]
        print(f"   ✅ {category}:")
        print(f"      📊 Total: {counts['total']} images")
        if counts['train'] > 0:
            print(f"      🏋️‍♂️ Train: {counts['train']} images")
        if counts['val'] > 0:
            print(f"      📈 Val: {counts['val']} images")
        if counts['test'] > 0:
            print(f"      🧪 Test: {counts['test']} images")
    
    # Calculate totals
    total_images = sum(cat_info['total'] for cat_info in categories_dict.values())
    total_train = sum(cat_info['train'] for cat_info in categories_dict.values())
    total_val = sum(cat_info['val'] for cat_info in categories_dict.values())
    total_test = sum(cat_info['test'] for cat_info in categories_dict.values())
    
    print("\n📊 DATASET SUMMARY:")
    print(f"   🎯 Categories: {len(categories_dict)}")
    print(f"   🖼️  Total images: {total_images}")
    print(f"   🏋️‍♂️ Training images: {total_train}")
    print(f"   📈 Validation images: {total_val}")
    print(f"   🧪 Test images: {total_test}")
    
else:
    print("\n❌ Could not find any categories!")
    if dataset_path:
        print(f"🔍 Debugging structure of: {dataset_path}")
        print("\n📂 Complete contents:")
        for item in os.listdir(dataset_path):
            item_path = os.path.join(dataset_path, item)
            if os.path.isdir(item_path):
                print(f"\n   📁 {item}/:")
                subitems = os.listdir(item_path)
                for subitem in subitems:
                    subitem_path = os.path.join(item_path, subitem)
                    if os.path.isdir(subitem_path):
                        image_files = glob.glob(os.path.join(subitem_path, "*.jpg")) + \
                                     glob.glob(os.path.join(subitem_path, "*.jpeg")) + \
                                     glob.glob(os.path.join(subitem_path, "*.png"))
                        print(f"      📁 {subitem}/: {len(image_files)} images")
                        if len(image_files) > 0:
                            for img in image_files[:2]:  # Show first 2 images
                                print(f"         🖼️  {os.path.basename(img)}")

print("\n" + "="*60)
print("✅ Dataset verification completed!")

🧪 Verifying TrashNet Dataset Access...
1. Checking available categories...
📁 Dataset path: /kaggle/input/trashnet/trashnet
📦 Main folders: ['val', 'test', 'train']

🔍 Checking train folder:
   Categories in train: ['metal', 'glass', 'paper', 'trash', 'cardboard', 'plastic']
   ✅ metal: 287 images
   ✅ glass: 350 images
   ✅ paper: 415 images
   ✅ trash: 95 images
   ✅ cardboard: 282 images
   ✅ plastic: 337 images

🔍 Checking val folder:
   Categories in val: ['metal', 'glass', 'paper', 'trash', 'cardboard', 'plastic']
   ✅ metal: 82 images
   ✅ glass: 100 images
   ✅ paper: 118 images
   ✅ trash: 27 images
   ✅ cardboard: 80 images
   ✅ plastic: 96 images

🔍 Checking test folder:
   Categories in test: ['metal', 'glass', 'paper', 'trash', 'cardboard', 'plastic']
   ✅ metal: 41 images
   ✅ glass: 51 images
   ✅ paper: 61 images
   ✅ trash: 15 images
   ✅ cardboard: 41 images
   ✅ plastic: 49 images

🎉 SUCCESS: Found TrashNet data
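
The captured output stops before the summary section, but the totals follow directly from the per-split counts printed above. A short sketch that recomputes them, with the counts copied by hand from that output:

```python
# Per-split image counts copied from the verification output above
split_counts = {
    "train": {"metal": 287, "glass": 350, "paper": 415, "trash": 95, "cardboard": 282, "plastic": 337},
    "val": {"metal": 82, "glass": 100, "paper": 118, "trash": 27, "cardboard": 80, "plastic": 96},
    "test": {"metal": 41, "glass": 51, "paper": 61, "trash": 15, "cardboard": 41, "plastic": 49},
}

# Total images per split
split_totals = {split: sum(counts.values()) for split, counts in split_counts.items()}

# Total images per category, summed across splits
category_totals = {}
for counts in split_counts.values():
    for category, n in counts.items():
        category_totals[category] = category_totals.get(category, 0) + n

grand_total = sum(split_totals.values())

print(split_totals)     # {'train': 1766, 'val': 503, 'test': 258}
print(category_totals)
print(grand_total)      # 2527
```

The grand total of 2,527 agrees with the figure quoted in the verification checklist.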


# 🧪 Single Image Pipeline Test
**End-to-end validation of the multi-agent waste sorting system**

This cell performs comprehensive testing of the complete EcoSight pipeline using real TrashNet images. It includes workarounds for common issues (PDF color conversion, API limits) and provides detailed diagnostics for each processing stage.

**Test Features:**

* 🔧 Color issue fixes – Adds a Color.toHex helper to reportlab when it is missing, converting float RGB values (0–1) to 0–255 integers
* 🔄 Flexible testing – Supports category-specific tests and PDF generation toggling
* 📊 Detailed output – Shows detection results, classification percentages, and item breakdowns
* 🎭 Mock mode safety – Defaults to mock detection (use_mock=True) to avoid API quota issues during testing
* 📄 PDF diagnostics – Reports PDF file size and generation status
* 💡 Personalized insights – Displays AI-generated waste management recommendations
* 🔍 Debug information – Provides troubleshooting guidance for common pipeline issues
* 🖼️ Random sampling – Tests with random waste images across all 6 TrashNet categories
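
The test temporarily overrides settings on the orchestrator and restores them afterwards. One way to make that restoration robust, even if the code in between raises, is a small context manager. This is a sketch under stated assumptions: `temporary_attr` and `FakeOrchestrator` are hypothetical names used for illustration, not part of EcoSight.

```python
from contextlib import contextmanager
from typing import Any, Iterator

_MISSING = object()  # sentinel: the attribute did not exist before the override


@contextmanager
def temporary_attr(obj: Any, name: str, value: Any) -> Iterator[None]:
    """Set obj.<name> to value inside the block, then restore the prior state."""
    original = getattr(obj, name, _MISSING)
    setattr(obj, name, value)
    try:
        yield
    finally:
        # Runs on normal exit and on exceptions alike
        if original is _MISSING:
            delattr(obj, name)
        else:
            setattr(obj, name, original)


class FakeOrchestrator:
    """Hypothetical stand-in; the real orchestrator is not needed for the demo."""

    def __init__(self) -> None:
        self.use_gemini = True


demo = FakeOrchestrator()
with temporary_attr(demo, "use_gemini", False):
    inside = demo.use_gemini
after = demo.use_gemini
print(inside, after)
```

If the attribute did not exist beforehand, it is removed again on exit rather than left behind with a default value.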

In [21]:
# =============================================================================
# 🧪 SINGLE TRASHNET IMAGE TEST
# =============================================================================

print("🧪 Testing EcoSight with Real TrashNet Waste Images...")

import random
import glob
from PIL import Image
import io

def get_trashnet_categories():
    """Get available waste categories with their image counts"""
    base_path = '/kaggle/input/trashnet/trashnet'
    
    if not os.path.exists(base_path):
        print("❌ Could not find TrashNet dataset")
        return None, {}
    
    categories = set()
    
    for split in ['train', 'val', 'test']:
        split_path = os.path.join(base_path, split)
        if os.path.exists(split_path):
            categories.update([d for d in os.listdir(split_path) if os.path.isdir(os.path.join(split_path, d))])
    
    category_counts = {}
    for category in categories:
        total_images = 0
        for split in ['train', 'val', 'test']:
            split_path = os.path.join(base_path, split, category)
            if os.path.exists(split_path):
                image_files = glob.glob(os.path.join(split_path, "*.jpg")) + \
                             glob.glob(os.path.join(split_path, "*.jpeg")) + \
                             glob.glob(os.path.join(split_path, "*.png"))
                total_images += len(image_files)
        category_counts[category] = total_images
    
    return base_path, category_counts

def get_random_image(category=None):
    """Get a random image from the dataset and convert to bytes"""
    base_path = '/kaggle/input/trashnet/trashnet'
    
    _, categories_dict = get_trashnet_categories()
    
    if not categories_dict:
        return None, None, None
    
    if category and category in categories_dict:
        selected_category = category
    else:
        selected_category = random.choice(list(categories_dict.keys()))
    
    for split in ['train', 'val', 'test']:
        category_path = os.path.join(base_path, split, selected_category)
        if os.path.exists(category_path):
            image_files = glob.glob(os.path.join(category_path, "*.jpg")) + \
                         glob.glob(os.path.join(category_path, "*.jpeg")) + \
                         glob.glob(os.path.join(category_path, "*.png"))
            if image_files:
                selected_image_path = random.choice(image_files)
                
                with Image.open(selected_image_path) as img:
                    if img.mode != 'RGB':
                        img = img.convert('RGB')
                    
                    img_bytes = io.BytesIO()
                    img.save(img_bytes, format='JPEG')
                    image_data = img_bytes.getvalue()
                
                return image_data, selected_category, selected_image_path
    
    return None, selected_category, None

# Get available categories
dataset_path, categories_dict = get_trashnet_categories()

if categories_dict:
    print("🎉 TrashNet Dataset Found!")
    print("📋 Available TrashNet categories:")
    for category in sorted(categories_dict.keys()):
        print(f"   ✅ {category}: {categories_dict[category]} images")
    
    total_images = sum(categories_dict.values())
    print(f"📊 TOTAL: {total_images} images")
else:
    print("❌ No waste categories found!")

def fix_color_issue_in_pdf():
    """Fix the Color.toHex issue before any PDF generation"""
    try:
        # Fix the Color.toHex method issue
        import reportlab.lib.colors as colors
        
        if not hasattr(colors.Color, 'toHex'):
            # Add the missing toHex method
            def color_toHex(self):
                # Convert float RGB values (0-1) to integer (0-255)
                r = int(self.red * 255)
                g = int(self.green * 255) 
                b = int(self.blue * 255)
                return "#%02x%02x%02x" % (r, g, b)
            colors.Color.toHex = color_toHex
            print("✅ Fixed Color.toHex method")
        return True
    except Exception as e:
        print(f"⚠️  Could not fix color issue: {e}")
        return False

async def test_trashnet_pipeline(category=None, use_mock=True):
    """Test the pipeline with a random image from TrashNet (no PDF)"""
    print("\n🚀 Testing pipeline with TrashNet image...")
    
    image_data, selected_category, image_path = get_random_image(category)
    
    if not image_data:
        print("❌ Could not load any images!")
        return
    
    print(f"🎲 Category: {selected_category}")
    print(f"🖼️  Image: {os.path.basename(image_path)}")
    
    try:
        # FIX 1: Apply the color fix BEFORE any processing
        color_fixed = fix_color_issue_in_pdf()
        
        # FIX 2: Force mock mode if requested to avoid API issues
        # NOTE: EcoSightOrchestrator as defined above stores no `use_gemini`
        # attribute (the flag is passed to VisionAnalysisAgent at construction),
        # so this assignment alone may not switch the vision agent to mock mode.
        original_use_gemini = getattr(orchestrator, 'use_gemini', True)
        
        if use_mock:
            orchestrator.use_gemini = False
            print("🔧 Using mock mode to avoid API issues")
        
        # FIX 3: Completely disable PDF generation at the source
        # Temporarily replace the PDF generator with a dummy
        original_pdf_generator = None
        if hasattr(orchestrator, 'pdf_generator'):
            original_pdf_generator = orchestrator.pdf_generator
            # Create a dummy PDF generator that does nothing
            class DummyPDFGenerator:
                def generate_report_pdf(self, *args, **kwargs):
                    return {
                        'pdf_bytes': b'',
                        'pdf_filename': 'dummy.pdf',
                        'success': True
                    }
            
            orchestrator.pdf_generator = DummyPDFGenerator()
            print("üîß Replaced PDF generator with dummy")
        
        # FIX 4: Also disable any PDF-related methods in orchestrator
        original_methods = {}
        for method_name in ['_create_pdf_report', 'generate_pdf_report', '_generate_pdf']:
            if hasattr(orchestrator, method_name):
                original_methods[method_name] = getattr(orchestrator, method_name)
                setattr(orchestrator, method_name, lambda *args, **kwargs: {
                    'pdf_bytes': b'',
                    'pdf_filename': 'dummy.pdf',
                    'success': True
                })
        
        if original_methods:
            print(f"üîß Disabled {len(original_methods)} PDF methods")
        
        print("🔍 Processing image...")
        
        try:
            # Call the orchestrator's process_single_image method
            result = await orchestrator.process_single_image(
                image_data=image_data,
                user_id=f"trashnet_test_{selected_category}",
                location="NYC"
            )
        finally:
            # Restore the original settings even if processing raises,
            # so a failed run does not leave the orchestrator patched
            orchestrator.use_gemini = original_use_gemini
            
            # Restore original PDF generator
            if original_pdf_generator is not None:
                orchestrator.pdf_generator = original_pdf_generator
            
            # Restore original methods
            for method_name, original_method in original_methods.items():
                setattr(orchestrator, method_name, original_method)
        
        if result and result.get("success"):
            print(f"✅ Pipeline test successful!")
            report = result.get("report", {})
            
            # Display results
            print(f"\n📊 ANALYSIS RESULTS:")
            
            if 'summary' in report:
                summary = report['summary']
                print(f"   Items detected: {summary.get('total_items', 0)}")
                print(f"   Recyclable: {summary.get('recyclable_percent', 0):.1f}%")
                print(f"   Compost: {summary.get('compost_percent', 0):.1f}%")
                print(f"   Landfill: {summary.get('landfill_percent', 0):.1f}%")
            
            if report.get('detailed_breakdown'):
                print(f"\n🔍 DETECTED ITEMS:")
                items_displayed = 0
                valid_items = []
                
                for item in report['detailed_breakdown']:
                    # Filter out strange items
                    item_name = item.get('item', 'Unknown')
                    if (not item_name.startswith('**') 
                        and 'material' not in item_name.lower() 
                        and 'specific' not in item_name.lower()
                        and not item_name.startswith('*')
                        and len(item_name.strip()) > 3):
                        valid_items.append(item)
                
                # Show top valid items
                for item in valid_items[:8]:  # Limit to 8 items
                    item_name = item.get('item', 'Unknown')
                    category = item.get('category', 'Unknown')
                    print(f"   • {item_name} → {category}")
                    items_displayed += 1
                
                if len(valid_items) > items_displayed:
                    print(f"   ... and {len(valid_items) - items_displayed} more items")
            
            print(f"\n💡 PERSONALIZED TIPS:")
            tips = report.get('personalized_tips', [])
            if tips:
                for i, tip in enumerate(tips[:3], 1):
                    print(f"   {i}. {tip}")
            else:
                print("   No tips available in this report.")
                
            return result
        else:
            error_msg = result.get('error', 'Unknown error') if result else 'No result returned'
            print(f"❌ Pipeline failed: {error_msg}")
            return None
            
    except Exception as e:
        print(f"❌ Pipeline test failed: {str(e)}")
        return None

# Run tests
if categories_dict:
    # Test 1: Basic test with mock mode (no PDF)
    print(f"\n🎯 Test 1: Basic test with mock mode...")
    result1 = await test_trashnet_pipeline(use_mock=True)
    
    if result1 and result1.get("success"):
        print(f"\n✅ SUCCESS! Pipeline works!")
        
        # Optional: Test with a specific category
        print(f"\n🎯 Test 2: Testing specific category (plastic)...")
        result2 = await test_trashnet_pipeline(category='plastic', use_mock=True)
        
        if result2 and result2.get("success"):
            print(f"\n✅ Specific category test successful!")
        else:
            print(f"\n⚠️  Specific category test had issues")
    else:
        print(f"\n❌ Basic test failed.")
    
else:
    print("❌ Cannot run tests - no categories available")

print("\n" + "="*60)
print("✅ Test sequence completed")

🧪 Testing EcoSight with Real TrashNet Waste Images...
🎉 TrashNet Dataset Found!
📋 Available TrashNet categories:
   ✅ cardboard: 403 images
   ✅ glass: 501 images
   ✅ metal: 410 images
   ✅ paper: 594 images
   ✅ plastic: 482 images
   ✅ trash: 137 images
📊 TOTAL: 2527 images

🎯 Test 1: Basic test with mock mode...

🚀 Testing pipeline with TrashNet image...
🎲 Category: trash
🖼️  Image: trash30.jpg
✅ Fixed Color.toHex method
🔧 Using mock mode to avoid API issues
🔍 Processing image...
🔍 Starting Vision Analysis...
🎯 Attempting Gemini Vision analysis...
🔮 Calling Gemini Vision API with model: gemini-2.5-flash...
📊 Detected 9 raw items → 9 after enhanced filtering
✅ Vision analysis complete - Provider: gemini, Model: gemini-2.5-flash
   Detected 9 items
🏷️ Starting Waste Classification...
   Classified 9 items
📊 Generating Report & PDF...
💾 PDF saved to output directory: /kaggle/working/EcoSight_report_f1a96f90-26dd-
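
The test above swaps several orchestrator attributes for dummies and puts the originals back afterwards. A minimal sketch of that patch-and-restore pattern as a context manager follows. The helper name `temporarily_patched` and the `FakeOrchestrator` class are hypothetical stand-ins introduced for illustration; they are not part of EcoSight.

```python
from contextlib import contextmanager

_MISSING = object()  # sentinel: the attribute did not exist before patching


@contextmanager
def temporarily_patched(obj, **overrides):
    """Set attributes on obj for the duration of the block, then restore them."""
    originals = {name: getattr(obj, name, _MISSING) for name in overrides}
    try:
        for name, value in overrides.items():
            setattr(obj, name, value)
        yield obj
    finally:
        # Runs on normal exit and on exceptions alike
        for name, original in originals.items():
            if original is _MISSING:
                delattr(obj, name)
            else:
                setattr(obj, name, original)


class FakeOrchestrator:  # hypothetical stand-in, not the EcoSight orchestrator
    use_gemini = True


fake = FakeOrchestrator()
with temporarily_patched(fake, use_gemini=False):
    inside = fake.use_gemini   # patched value seen inside the block
after = fake.use_gemini        # original value is back after the block
```

Because the restore happens in `finally`, an exception inside the `with` block cannot leave the object in its patched state.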

# 🔄 Multiple Images Test
**Parallel processing validation with API quota protection**

This cell tests EcoSight's parallel processing by analyzing multiple waste images in a single batch call. It includes API quota management, provider tracking (real Gemini versus mock fallback), and staged testing that scales from a single-image API check through a limited batch (3 images) to a comprehensive run (3 images per category, 18 in total).

**Parallel Test Features:**

* ⚡ Category-balanced testing – Processes an equal number of images from all 6 waste categories
* 📊 Performance analytics – Calculates average recyclable rates and items per image
* 🛡️ API quota protection – Counts mock-fallback results to detect quota exhaustion
* 🏷️ Category-wise analysis – Shows recyclable percentages for each waste material type
* 📈 Scalable testing – Progresses from a limited batch (3 images) to a comprehensive run (18 images)
* 🔍 Error tracking – Differentiates between API quota issues and system failures
* 📋 Result aggregation – Combines parallel results into comprehensive statistics
* 💡 Smart limiting – Skips the comprehensive run when the limited batch returns fewer than 2 real Gemini results
* ✅ Success validation – Verifies all parallel agent instances complete successfully
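
The quota protection described above can be sketched as small concurrent batches with a pause between them. This is a minimal illustration, not the orchestrator's implementation: `process_in_batches` and `fake_worker` are hypothetical names, and the batch size and pause are parameters you would set from the free-tier guidance printed by the cell (2-3 images, 60+ seconds).

```python
import asyncio


async def process_in_batches(items, worker, batch_size=3, pause_seconds=60.0):
    """Run worker(item) concurrently within each batch, pausing between batches."""
    results = []
    for start in range(0, len(items), batch_size):
        batch = items[start:start + batch_size]
        # gather preserves the order of the inputs in its result list
        results.extend(await asyncio.gather(*(worker(item) for item in batch)))
        if start + batch_size < len(items):
            await asyncio.sleep(pause_seconds)  # no pause after the last batch
    return results


async def fake_worker(image):  # hypothetical stand-in for one analysis call
    await asyncio.sleep(0)
    return {"image": image, "provider": "mock"}


# In a script; inside a notebook cell, use `await process_in_batches(...)` instead
demo = asyncio.run(
    process_in_batches(list("abcdefg"), fake_worker, batch_size=3, pause_seconds=0.01)
)
```

Seven items with a batch size of 3 run as batches of 3, 3, and 1, with two pauses in between.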

In [22]:
# =============================================================================
# 🔄 Multiple Images TEST (Optimized with Rate Limiting)
# =============================================================================

print("🔄 Testing Comprehensive Bulk Processing with Multiple Images Per Category...")

async def test_comprehensive_bulk_processing():
    """Process multiple images from each category with rate limiting"""
    
    images_per_category = 3  # Process 3 images from each category
    total_target_images = len(categories_dict) * images_per_category
    
    print(f"🎯 Target: Processing {total_target_images} images ({images_per_category} from each category)")
    
    test_images = []
    loaded_info = []
    
    print("📥 Loading multiple images from all categories...")
    
    for category in categories_dict.keys():
        images_loaded = 0
        for i in range(images_per_category):
            image_data, actual_category, image_path = get_random_image(category)
            if image_data:
                test_images.append(image_data)
                loaded_info.append(f"{actual_category}: {os.path.basename(image_path)}")
                images_loaded += 1
        
        print(f"   ✅ {category}: {images_loaded}/{images_per_category} images loaded")
    
    if not test_images:
        print("❌ No images loaded for bulk testing")
        return
    
    print(f"\n📸 Processing {len(test_images)} TrashNet images with rate limiting...")
    
    try:
        result = await orchestrator.process_multiple_images(
            images_data=test_images,
            user_id=f"comprehensive_bulk_{len(test_images)}_images",
            location="Comprehensive Analysis"
        )
        
        # Analyze results and check for API issues
        api_errors = 0
        successful_gemini = 0
        total_images = result.get('total_images', 0)
        
        for i, batch_result in enumerate(result.get('results', [])):
            if batch_result:
                provider = batch_result.get('provider', 'unknown')
                if provider == 'mock':
                    api_errors += 1
                elif provider == 'gemini':
                    successful_gemini += 1
        
        print(f"\n✅ BULK PROCESSING COMPLETED!")
        print(f"📊 COMPREHENSIVE RESULTS:")
        print(f"   📸 Total images processed: {total_images}")
        print(f"   🤖 Real Gemini analyses: {successful_gemini}")
        print(f"   🎭 Mock fallback analyses: {api_errors}")
        
        # Show statistics only if we have real Gemini results
        if successful_gemini > 0:
            print(f"\n📈 REAL AI ANALYSIS (Gemini):")
            print(f"   Successfully analyzed {successful_gemini} images with real AI")
        
        # If too many API errors, show warning
        if api_errors > 5:
            print(f"\n⚠️  API QUOTA NOTICE:")
            print(f"   {api_errors} images used mock data due to API limits")
            print(f"   Gemini free tier allows ~10 requests per minute")
            print(f"   Consider:")
            print(f"   1. Reducing batch size")
            print(f"   2. Using mock mode for testing")
            print(f"   3. Upgrading to paid Gemini plan")
        
        return result
        
    except Exception as e:
        print(f"❌ Comprehensive bulk processing failed: {str(e)}")
        return None

async def test_single_image_with_api_check():
    """Test a single image first to check if API is working"""
    print("\n🔍 API Connectivity Test...")
    
    # Get a single test image
    image_data, category, image_path = get_random_image()
    if not image_data:
        print("❌ Could not load test image")
        return False
    
    print(f"📸 Testing API with 1 image ({category})...")
    
    try:
        # Process single image to test API
        result = await orchestrator.process_single_image(
            image_data=image_data,
            user_id="api_test",
            location="API Test"
        )
        
        # NOTE: this assumes the provider is reported under
        # result['analysis_result']['provider']. In the recorded run below, the
        # vision agent logs "Provider: gemini" yet this check reports mock data,
        # so the key path may not match the orchestrator's result structure.
        provider = result.get('analysis_result', {}).get('provider', 'unknown')
        if provider == 'gemini':
            print("✅ API is working - Gemini responses received")
            return True
        else:
            print("⚠️  API not working - using mock data")
            return False
            
    except Exception as e:
        print(f"❌ API test failed: {e}")
        return False

async def test_limited_batch_processing(batch_size=5):
    """Process a limited batch to avoid quota issues"""
    print(f"\n🎯 LIMITED BATCH: Processing {batch_size} images to avoid quota...")
    
    test_images = []
    
    for i in range(batch_size):
        image_data, category, image_path = get_random_image()
        if image_data:
            test_images.append(image_data)
            print(f"   {i+1}. {category}")
    
    if not test_images:
        print("❌ No images loaded")
        return None
    
    try:
        # All images go to the orchestrator in one call; this function adds
        # no delay of its own, so pacing is left to process_multiple_images
        
        result = await orchestrator.process_multiple_images(
            images_data=test_images,
            user_id=f"limited_batch_{batch_size}",
            location="Limited Batch Test"
        )
        
        # Count results
        gemini_count = 0
        mock_count = 0
        
        for batch_result in result.get('results', []):
            if batch_result:
                provider = batch_result.get('provider', 'unknown')
                if provider == 'gemini':
                    gemini_count += 1
                elif provider == 'mock':
                    mock_count += 1
        
        print(f"\n📊 BATCH RESULTS:")
        print(f"   🤖 Gemini (real AI): {gemini_count}")
        print(f"   🎭 Mock (fallback): {mock_count}")
        
        if mock_count > 0:
            print(f"\n💡 TIP: To get more real AI results:")
            print(f"   • Wait 1 minute for quota reset")
            print(f"   • Process fewer images at once")
            print(f"   • Use batch size of 2-3 images")
        
        return result
        
    except Exception as e:
        print(f"❌ Batch processing failed: {e}")
        return None

# Run the tests with better rate limiting
print("🚀 Starting comprehensive testing with API protection...")

# First check if API is working
api_working = await test_single_image_with_api_check()

if api_working:
    print("\n✅ API is available - proceeding with comprehensive test")
    print("📋 Using optimized batch sizes to avoid quota...")
    
    # Run limited batch first
    batch_result = await test_limited_batch_processing(batch_size=3)
    
    if batch_result:
        # Check how many were real Gemini results
        gemini_results = 0
        for res in batch_result.get('results', []):
            if res and res.get('provider') == 'gemini':
                gemini_results += 1
        
        if gemini_results >= 2:  # If we got at least 2 real results
            print("\n📈 Good API performance - running full test...")
            await test_comprehensive_bulk_processing()
        else:
            print("\n⚠️  Limited API performance - skipping large tests")
            print("💡 Wait 1 minute for quota reset or use smaller batches")
    else:
        print("❌ Batch test failed - check API configuration")
else:
    print("\n⚠️  API not available - running in mock mode")
    print("💡 To enable real AI analysis:")
    print("   1. Ensure Gemini API key is valid")
    print("   2. Check API quota limits")
    print("   3. Wait if quota is exhausted")

print("\n✅ TESTING COMPLETED WITH API PROTECTION")
print("\n📋 RECOMMENDED SETTINGS FOR FREE TIER:")
print("   • Batch size: 2-3 images")
print("   • Wait 60+ seconds between batches")
print("   • Use mock mode for large-scale testing")
print("   • Upgrade to paid plan for higher rate limits")

🔄 Testing Comprehensive Bulk Processing with Multiple Images Per Category...
🚀 Starting comprehensive testing with API protection...

🔍 API Connectivity Test...
📸 Testing API with 1 image (trash)...
🔍 Starting Vision Analysis...
🎯 Attempting Gemini Vision analysis...
🔮 Calling Gemini Vision API with model: gemini-2.5-flash...
📊 Detected 6 raw items → 6 after enhanced filtering
✅ Vision analysis complete - Provider: gemini, Model: gemini-2.5-flash
   Detected 6 items
🏷️ Starting Waste Classification...
   Classified 6 items
📊 Generating Report & PDF...
💾 PDF saved to output directory: /kaggle/working/EcoSight_report_48bb04d2-b5c6-4a12-9514-4e4edb4675d7.pdf
💾 PDF saved to: /kaggle/working/EcoSight_report_48bb04d2-b5c6-4a12-9514-4e4edb4675d7.pdf
   PDF report generated successfully
⚠️  API not working - using mock data

⚠️  API not available - running in mock mode
💡 To enable real AI analysis:
   1. Ensure Gemini API key is valid
   2. 
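
In the run above, the vision agent logs `Provider: gemini` while the connectivity check still falls back to the mock branch, which suggests the `provider` field is not where the check looks for it. One way to make the tally less sensitive to result layout is to search the nested result for a `provider` key. The sketch below assumes nothing about the real orchestrator output: `find_provider`, `tally_providers`, and the sample dictionaries are all hypothetical.

```python
from collections import Counter


def find_provider(result, default="unknown"):
    """Return a 'provider' string found anywhere in a nested dict/list result.

    If several levels carry a 'provider' key, whichever is reached first wins.
    """
    stack = [result]
    while stack:
        node = stack.pop()
        if isinstance(node, dict):
            if isinstance(node.get("provider"), str):
                return node["provider"]
            stack.extend(node.values())
        elif isinstance(node, list):
            stack.extend(node)
    return default


def tally_providers(results):
    """Count providers across batch results, skipping empty entries."""
    return Counter(find_provider(r) for r in results if r)


# Hypothetical result shapes, for illustration only
sample = [
    {"provider": "gemini"},
    {"vision": {"provider": "mock", "items": []}},
    {"report": {}},
    None,
]
counts = tally_providers(sample)
```

Printing the actual keys of one real result (for example with `result.keys()`) is still the more direct way to confirm where the field lives.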

# 📄 PDF Report Generation
**Comprehensive validation of professional waste analysis reports**

This cell provides a complete test of the PDF generation pipeline with multiple preview options and detailed analytics. It demonstrates how EcoSight transforms waste images into professional reports with environmental impact metrics and interactive download interfaces.

**PDF Test Features:**

* 🎨 Multiple preview options – Interactive dashboard, simple preview, and standard download links
* 📊 Comprehensive reporting – Shows waste composition, recycling rates, and environmental savings
* 💾 Automatic file management – Saves PDFs to Kaggle output directory for manual access
* 🔄 Category-focused testing – Prioritizes plastic waste analysis with fallback to random categories
* 🔍 Full pipeline validation – Tests image loading → AI analysis → PDF generation → user interface
* 📋 Detailed statistics – Displays item counts, recyclable percentages, and CO₂ savings
* 💡 User guidance – Provides preview tips and best practices for report interaction
* ✅ Success confirmation – Clear indicators of each processing stage completion
* 🚀 Interactive interface – Tests toggle preview, multiple download options, and file management
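
A standard download link for in-memory PDF bytes can be built by embedding the bytes in a base64 data URI. The sketch below shows one common way to do that; `build_pdf_download_link` is a hypothetical helper and is not claimed to match how `PDFDownloader.create_download_link` works internally.

```python
import base64
import html


def build_pdf_download_link(pdf_bytes, filename, label="Download report"):
    """Return an HTML anchor that downloads pdf_bytes under the given filename."""
    encoded = base64.b64encode(pdf_bytes).decode("ascii")
    safe_name = html.escape(filename, quote=True)
    return (
        f'<a download="{safe_name}" '
        f'href="data:application/pdf;base64,{encoded}">{html.escape(label)}</a>'
    )


# Placeholder bytes, not a valid PDF document
link = build_pdf_download_link(b"%PDF-1.4 demo", "report.pdf")
```

In a notebook the string can be rendered with `display(HTML(link))` from `IPython.display`. Data URIs grow with the file size, so this approach suits small reports better than large ones.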

In [23]:
# =============================================================================
# 📄 PDF REPORT GENERATION TEST WITH PREVIEW
# =============================================================================

print("📄 Testing Enhanced PDF Report Generation with Preview...")

async def test_enhanced_pdf_generation():
    """Test PDF generation with enhanced preview functionality"""
    # Load a TrashNet image for testing
    print("🔄 Loading waste image for enhanced PDF test...")
    image_data, category, image_path = get_random_image('plastic')
    
    if image_data is None:
        print("❌ Failed to load image, trying random category...")
        image_data, category, image_path = get_random_image()
    
    if image_data is None:
        print("❌ Could not load any images for PDF test")
        return
    
    print(f"🎯 Generating enhanced PDF report for: {category} waste")
    print(f"🖼️ Source image: {os.path.basename(image_path)}")
    
    # Process through pipeline
    print("🔍 Processing through EcoSight pipeline...")
    pipeline_result = await orchestrator.process_single_image(
        image_data=image_data,
        user_id=f"enhanced_pdf_test_{category}",
        location="NYC"
    )
    
    if pipeline_result and pipeline_result.get("success"):
        report = pipeline_result.get("report", {})
        if 'pdf_bytes' in report:
            pdf_bytes = report['pdf_bytes']
            pdf_filename = report['pdf_filename']
            
            # Display enhanced PDF information
            print("✅ Enhanced PDF Generation Successful!")
            PDFDownloader.display_pdf_info(pdf_bytes, pdf_filename)
            
            # Save to output directory
            saved_path = PDFDownloader.save_pdf_to_output(pdf_bytes, pdf_filename)
            
            print("\n" + "="*60)
            print("🎨 ENHANCED PDF PREVIEW OPTIONS:")
            print("="*60)
            
            # Option 1: Interactive Dashboard 
            print("\n1. 🚀 INTERACTIVE DASHBOARD:")
            dashboard = PDFDownloader.create_interactive_pdf_dashboard(pdf_bytes, pdf_filename)
            display(dashboard)
            
            # Option 2: Simple Preview
            print("\n2. 👀 SIMPLE PREVIEW:")
            simple_preview = PDFDownloader.create_simple_preview(pdf_bytes, pdf_filename)
            display(simple_preview)
            
            # Option 3: Standard Download Link
            print("\n3. 📥 STANDARD DOWNLOAD:")
            download_link = PDFDownloader.create_download_link(pdf_bytes, pdf_filename, "📥 Download Enhanced Report")
            display(download_link)
            
            # Report summary
            print("\n📋 ENHANCED REPORT SUMMARY:")
            print(f"   📊 Items Analyzed: {report['summary']['total_items']}")
            print(f"   ♻️  Recyclable: {report['summary']['recyclable_percent']:.1f}%")
            print(f"   🌱 Compost: {report['summary']['compost_percent']:.1f}%")
            print(f"   🗑️  Landfill: {report['summary']['landfill_percent']:.1f}%")
            print(f"   🌍 CO2 Saved: {report['environmental_impact']['co2_saved_kg']:.1f} kg")
            
            # Show preview tips
            print("\n💡 PREVIEW TIPS:")
            print("   • Use the Interactive Dashboard for best experience")
            print("   • Toggle preview to view PDF without downloading")
            print("   • Download multiple copies if needed")
            print("   • Check Kaggle output tab for saved file")
            
            return True
        else:
            print("❌ PDF generation failed - no PDF data in report")
    else:
        error_msg = pipeline_result.get('error', 'Unknown error') if pipeline_result else 'No result returned'
        print("❌ Pipeline processing failed")
        print(f"   Error: {error_msg}")
    
    return False

# Run enhanced PDF generation test
print("🚀 Starting enhanced PDF generation test...")
pdf_generated = await test_enhanced_pdf_generation()

print("\n" + "="*60)
# Report success only when the test actually produced a PDF
if pdf_generated:
    print("🎯 SINGLE COMPREHENSIVE PDF GENERATED SUCCESSFULLY!")
else:
    print("❌ PDF generation test did not complete successfully")

📄 Testing Enhanced PDF Report Generation with Preview...
🚀 Starting enhanced PDF generation test...
🔄 Loading waste image for enhanced PDF test...
🎯 Generating enhanced PDF report for: plastic waste
🖼️ Source image: plastic169.jpg
🔍 Processing through EcoSight pipeline...
🔍 Starting Vision Analysis...
🎯 Attempting Gemini Vision analysis...
🔮 Calling Gemini Vision API with model: gemini-2.5-flash...
📊 Detected 16 raw items → 14 after enhanced filtering
✅ Vision analysis complete - Provider: gemini, Model: gemini-2.5-flash
   Detected 14 items
🏷️ Starting Waste Classification...
   Classified 14 items
📊 Generating Report & PDF...
💾 PDF saved to output directory: /kaggle/working/EcoSight_report_65654526-5ff5-4a7f-89dd-c9df3e8f41bc.pdf
💾 PDF saved to: /kaggle/working/EcoSight_report_65654526-5ff5-4a7f-89dd-c9df3e8f41bc.pdf
   PDF report generated successfully
✅ Enhanced PDF Generation Successful!
📊 PDF Report Generated:
   📄 Filenam

2. 👀 SIMPLE PREVIEW:

3. 📥 STANDARD DOWNLOAD:

📋 ENHANCED REPORT SUMMARY:
   📊 Items Analyzed: 14
   ♻️  Recyclable: 57.1%
   🌱 Compost: 0.0%
   🗑️  Landfill: 42.9%
   🌍 CO2 Saved: 4.0 kg

💡 PREVIEW TIPS:
   • Use the Interactive Dashboard for best experience
   • Toggle preview to view PDF without downloading
   • Download multiple copies if needed
   • Check Kaggle output tab for saved file

🎯 SINGLE COMPREHENSIVE PDF GENERATED SUCCESSFULLY!
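
The percentages in the summary above are each category's share of the item count. A minimal sketch of that arithmetic follows; `category_percentages` is a hypothetical helper, and the per-category counts (8 recyclable, 0 compost, 6 landfill) are inferred from the reported percentages and the total of 14 items rather than taken from the report itself.

```python
def category_percentages(counts):
    """Convert per-category item counts into percentages of the total."""
    total = sum(counts.values())
    if total == 0:
        return {name: 0.0 for name in counts}  # avoid dividing by zero
    return {name: 100.0 * n / total for name, n in counts.items()}


# Counts inferred from the recorded summary (assumption, see above)
shares = category_percentages({"recyclable": 8, "compost": 0, "landfill": 6})
for name, pct in shares.items():
    print(f"{name}: {pct:.1f}%")
# recyclable: 57.1%
# compost: 0.0%
# landfill: 42.9%
```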
