# 🌊 AQUA SENTINEL: Real-Time AI Agents for Water Crisis Prevention
## Kaggle AI Agents Capstone | Track: Agents for Good

---

### 🎯 Features Demonstrated:
- **4 ADK Agent Patterns**: LlmAgent, ParallelAgent, SequentialAgent, LoopAgent
- **5 Real-Time APIs**: Open-Meteo, USGS, NASA EONET, REST Countries, Alert System
- **Full Observability**: Logging, Tracing, Metrics
- **12 Evaluation Test Cases** across 4 categories
- **Timing Comparison**: Sequential vs Parallel execution benchmarks
- **Advanced Concepts**: MCP Integration Pattern, Long-Running Operations, A2A Protocol

### 📊 ADK Concepts Coverage:
| Concept | Implementation | Status |
|---------|---------------|--------|
| LlmAgent | HydroOrchestrator + 6 specialists | ✅ |
| ParallelAgent | SentinelAgent (3 concurrent) | ✅ |
| SequentialAgent | GuardianAgent (state passing) | ✅ |
| LoopAgent | ResponderAgent (5 iterations) | ✅ |
| Custom Tools | 5 real-time APIs | ✅ |
| Sessions | InMemorySessionService | ✅ |
| Observability | Logging, Tracing, Metrics | ✅ |
| Evaluation | 12 test cases, multi-dimensional | ✅ |
| MCP Pattern | Tool server architecture | ✅ |
| Long-Running Ops | Async with status tracking | ✅ |
| A2A Protocol | Agent-to-Agent messaging | ✅ |

In [None]:
# ============================================================================
# CELL 1: INSTALLATION & WARNINGS
# ============================================================================
# Uncomment for local setup:
# !pip install -q google-genai google-adk requests

import warnings
import logging

warnings.filterwarnings('ignore')
logging.getLogger('google_genai.types').setLevel(logging.ERROR)
logging.getLogger('asyncio').setLevel(logging.ERROR)

print("✅ Warnings suppressed for cleaner output")

In [None]:
# ============================================================================
# CELL 2: IMPORTS
# ============================================================================

import os
import json
import asyncio
import requests
import time
import uuid
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any, Callable
from dataclasses import dataclass, field
from functools import wraps
from enum import Enum
from concurrent.futures import ThreadPoolExecutor
import threading

# Google ADK - Agent framework
from google.adk.agents import (
    LlmAgent,
    ParallelAgent,
    SequentialAgent,
    LoopAgent,
)
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService

# Google GenAI
from google import genai
from google.genai import types

print("✅ All imports successful")

In [None]:
# ============================================================================
# CELL 3: OBSERVABILITY FRAMEWORK - Logging, Tracing, Metrics
# ============================================================================

class AquaSentinelObservability:
    """
    Comprehensive observability system for AQUA SENTINEL.
    Implements: Logging, Tracing, and Metrics collection.
    
    This addresses the ADK Observability requirement by providing:
    - Structured logging with severity levels
    - Distributed tracing with trace IDs and spans
    - Metrics collection for performance monitoring
    """
    
    def __init__(self):
        self.logs: List[Dict] = []
        self.traces: List[Dict] = []
        self.metrics: Dict[str, Any] = {
            "api_latency": [],
            "agent_execution_time": [],
            "tool_calls": [],
            "error_count": 0,
            "success_count": 0,
            "parallel_vs_sequential": [],  # Parallel vs. sequential timing comparisons
        }
        self.current_trace_id: Optional[str] = None
        self.span_stack: List[Dict] = []
    
    def _generate_trace_id(self) -> str:
        """Generate unique trace ID for distributed tracing."""
        return f"trace-{datetime.utcnow().strftime('%Y%m%d%H%M%S%f')}"
    
    def _generate_span_id(self) -> str:
        """Generate a span ID (random, so spans started in the same microsecond are unlikely to collide)."""
        return f"span-{uuid.uuid4().hex[:12]}"
    
    def start_trace(self, operation: str) -> str:
        """Start a new trace for an operation."""
        self.current_trace_id = self._generate_trace_id()
        trace = {
            "trace_id": self.current_trace_id,
            "operation": operation,
            "start_time": datetime.utcnow().isoformat(),
            "spans": [],
        }
        self.traces.append(trace)
        self.log("INFO", f"Started trace for: {operation}", {"trace_id": self.current_trace_id})
        return self.current_trace_id
    
    def start_span(self, name: str, attributes: Dict = None) -> str:
        """Start a new span within the current trace."""
        span_id = self._generate_span_id()
        span = {
            "span_id": span_id,
            "name": name,
            "start_time": time.time(),
            "start_timestamp": datetime.utcnow().isoformat(),
            "attributes": attributes or {},
            "parent_span": self.span_stack[-1]["span_id"] if self.span_stack else None,
        }
        self.span_stack.append(span)
        return span_id
    
    def end_span(self, status: str = "OK", attributes: Dict = None):
        """End the current span and record duration."""
        if not self.span_stack:
            return
        
        span = self.span_stack.pop()
        span["end_time"] = time.time()
        span["duration_ms"] = round((span["end_time"] - span["start_time"]) * 1000, 2)
        span["status"] = status
        if attributes:
            span["attributes"].update(attributes)
        
        if self.traces:
            self.traces[-1]["spans"].append(span)
        
        self.metrics["agent_execution_time"].append({
            "span": span["name"],
            "duration_ms": span["duration_ms"],
            "timestamp": datetime.utcnow().isoformat(),
        })
    
    def log(self, level: str, message: str, context: Dict = None):
        """Structured logging with context."""
        log_entry = {
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "level": level,
            "message": message,
            "trace_id": self.current_trace_id,
            "context": context or {},
        }
        self.logs.append(log_entry)
        emoji = {"INFO": "ℹ️", "WARN": "⚠️", "ERROR": "❌", "DEBUG": "🔍"}.get(level, "📝")
        print(f"[{log_entry['timestamp'][:19]}] {emoji} {level}: {message}")
    
    def record_api_call(self, api_name: str, latency_ms: float, success: bool):
        """Record API call metrics."""
        self.metrics["api_latency"].append({
            "api": api_name,
            "latency_ms": latency_ms,
            "success": success,
            "timestamp": datetime.utcnow().isoformat(),
        })
        self.metrics["tool_calls"].append(api_name)
        if success:
            self.metrics["success_count"] += 1
        else:
            self.metrics["error_count"] += 1
    
    def record_timing_comparison(self, operation: str, sequential_ms: float, parallel_ms: float):
        """Record parallel vs sequential timing comparison."""
        speedup = sequential_ms / parallel_ms if parallel_ms > 0 else 0
        self.metrics["parallel_vs_sequential"].append({
            "operation": operation,
            "sequential_ms": round(sequential_ms, 2),
            "parallel_ms": round(parallel_ms, 2),
            "speedup": round(speedup, 2),
            "timestamp": datetime.utcnow().isoformat(),
        })
        self.log("INFO", f"Timing comparison: {operation}", {
            "sequential_ms": round(sequential_ms, 2),
            "parallel_ms": round(parallel_ms, 2),
            "speedup": f"{speedup:.2f}x"
        })
    
    def get_metrics_summary(self) -> Dict:
        """Get summary of collected metrics."""
        api_latencies = self.metrics["api_latency"]
        avg_latency = sum(m["latency_ms"] for m in api_latencies) / len(api_latencies) if api_latencies else 0
        
        return {
            "total_api_calls": len(api_latencies),
            "average_latency_ms": round(avg_latency, 2),
            "success_rate": f"{(self.metrics['success_count'] / max(1, len(api_latencies))) * 100:.1f}%",
            "error_count": self.metrics["error_count"],
            "unique_tools_used": list(set(self.metrics["tool_calls"])),
            "traces_collected": len(self.traces),
            "timing_comparisons": len(self.metrics["parallel_vs_sequential"]),
        }
    
    def get_trace_summary(self) -> Dict:
        """Get summary of the most recent trace."""
        if not self.traces:
            return {"status": "no traces"}
        
        trace = self.traces[-1]
        total_duration = sum(s.get("duration_ms", 0) for s in trace["spans"])
        
        return {
            "trace_id": trace["trace_id"],
            "operation": trace["operation"],
            "total_spans": len(trace["spans"]),
            "total_duration_ms": round(total_duration, 2),
            "spans": [{"name": s["name"], "duration_ms": s.get("duration_ms", 0)} for s in trace["spans"]],
        }
    
    def get_timing_comparison_summary(self) -> List[Dict]:
        """Get all timing comparisons."""
        return self.metrics["parallel_vs_sequential"]


# Initialize global observability instance
observability = AquaSentinelObservability()

print("✅ Observability Framework initialized")
print("   • Structured logging with trace context")
print("   • Distributed tracing with spans")
print("   • Metrics collection for API calls and agent execution")
print("   • Timing comparison tracking (parallel vs sequential)")
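
The speedup stored by `record_timing_comparison` is the ratio of sequential to parallel wall-clock time. As a rough, self-contained sketch of how such a measurement could be taken, the snippet below times three simulated I/O-bound calls run one after another and then through a thread pool. `fake_api_call`, `time_sequential`, `time_parallel`, `compute_speedup`, and the 50 ms delay are illustrative assumptions, not part of the notebook's pipeline.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_api_call(delay_s: float) -> float:
    """Hypothetical stand-in for a network-bound tool call."""
    time.sleep(delay_s)
    return delay_s


def time_sequential(delays) -> float:
    """Run the calls one after another; return elapsed milliseconds."""
    start = time.perf_counter()
    for delay in delays:
        fake_api_call(delay)
    return (time.perf_counter() - start) * 1000


def time_parallel(delays) -> float:
    """Run the calls concurrently in a thread pool; return elapsed milliseconds."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=len(delays)) as pool:
        list(pool.map(fake_api_call, delays))
    return (time.perf_counter() - start) * 1000


def compute_speedup(sequential_ms: float, parallel_ms: float) -> float:
    """Same ratio as record_timing_comparison: sequential / parallel, 0 if undefined."""
    return sequential_ms / parallel_ms if parallel_ms > 0 else 0


delays = [0.05, 0.05, 0.05]
sequential_ms = time_sequential(delays)
parallel_ms = time_parallel(delays)
speedup = compute_speedup(sequential_ms, parallel_ms)
print(f"sequential={sequential_ms:.1f} ms, parallel={parallel_ms:.1f} ms, speedup={speedup:.2f}x")
```

Because the simulated calls only sleep, the threads overlap almost perfectly; real API calls are likely to show smaller and noisier speedups.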

In [None]:
# ============================================================================
# CELL 4: ADVANCED ADK CONCEPTS - MCP, Long-Running Ops, A2A Protocol
# ============================================================================

# -----------------------------------------------------------------------------
# MODEL CONTEXT PROTOCOL (MCP) - Tool Server Architecture
# -----------------------------------------------------------------------------

class MCPToolServer:
    """
    Model Context Protocol (MCP) Tool Server Implementation.
    
    MCP provides a standardized way for AI agents to interact with external tools
    and data sources. This implementation demonstrates:
    - Tool registration and discovery
    - Standardized request/response format
    - Tool capability advertisement
    
    Reference: https://modelcontextprotocol.io/
    """
    
    def __init__(self, server_name: str):
        self.server_name = server_name
        self.tools: Dict[str, Dict] = {}
        self.capabilities = {
            "tools": True,
            "resources": True,
            "prompts": False,
        }
    
    def register_tool(self, name: str, description: str, handler: Callable, parameters: Dict):
        """Register a tool with the MCP server."""
        self.tools[name] = {
            "name": name,
            "description": description,
            "handler": handler,
            "parameters": parameters,
            "registered_at": datetime.utcnow().isoformat(),
        }
        observability.log("INFO", f"MCP: Registered tool '{name}'")
    
    def list_tools(self) -> List[Dict]:
        """List all available tools (MCP tools/list)."""
        return [
            {
                "name": t["name"],
                "description": t["description"],
                "inputSchema": t["parameters"],
            }
            for t in self.tools.values()
        ]
    
    def call_tool(self, name: str, arguments: Dict) -> Dict:
        """Execute a tool (MCP tools/call)."""
        if name not in self.tools:
            return {"error": f"Tool '{name}' not found"}
        
        tool = self.tools[name]
        try:
            result = tool["handler"](**arguments)
            return {
                "content": [{"type": "text", "text": json.dumps(result)}],
                "isError": False,
            }
        except Exception as e:
            return {
                "content": [{"type": "text", "text": str(e)}],
                "isError": True,
            }
    
    def get_server_info(self) -> Dict:
        """Get MCP server information."""
        return {
            "name": self.server_name,
            "version": "1.0.0",
            "capabilities": self.capabilities,
            "tools_count": len(self.tools),
        }


# Initialize MCP Tool Server
mcp_server = MCPToolServer("aqua-sentinel-mcp")


# -----------------------------------------------------------------------------
# LONG-RUNNING OPERATIONS - Async Processing with Status Tracking
# -----------------------------------------------------------------------------

class OperationStatus(Enum):
    """Status states for long-running operations."""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    PAUSED = "paused"
    CANCELLED = "cancelled"


class LongRunningOperation:
    """
    Long-Running Operation Manager.
    
    Demonstrates ADK pattern for operations that may take extended time:
    - Operation creation with unique ID
    - Status polling and updates
    - Pause/Resume capability
    - Progress tracking
    """
    
    def __init__(self):
        self.operations: Dict[str, Dict] = {}
    
    def create_operation(self, operation_type: str, params: Dict) -> str:
        """Create a new long-running operation."""
        op_id = f"op-{uuid.uuid4().hex[:8]}"
        self.operations[op_id] = {
            "id": op_id,
            "type": operation_type,
            "params": params,
            "status": OperationStatus.PENDING.value,
            "progress": 0,
            "created_at": datetime.utcnow().isoformat(),
            "updated_at": datetime.utcnow().isoformat(),
            "result": None,
            "error": None,
        }
        observability.log("INFO", f"LRO: Created operation {op_id} ({operation_type})")
        return op_id
    
    def get_status(self, op_id: str) -> Dict:
        """Get operation status."""
        if op_id not in self.operations:
            return {"error": "Operation not found"}
        return self.operations[op_id]
    
    def update_progress(self, op_id: str, progress: int, status: OperationStatus = None):
        """Update operation progress."""
        if op_id in self.operations:
            self.operations[op_id]["progress"] = progress
            self.operations[op_id]["updated_at"] = datetime.utcnow().isoformat()
            if status:
                self.operations[op_id]["status"] = status.value
    
    def complete_operation(self, op_id: str, result: Any):
        """Mark operation as completed."""
        if op_id in self.operations:
            self.operations[op_id]["status"] = OperationStatus.COMPLETED.value
            self.operations[op_id]["progress"] = 100
            self.operations[op_id]["result"] = result
            self.operations[op_id]["updated_at"] = datetime.utcnow().isoformat()
            observability.log("INFO", f"LRO: Completed operation {op_id}")
    
    def pause_operation(self, op_id: str) -> bool:
        """Pause a running operation."""
        if op_id in self.operations and self.operations[op_id]["status"] == OperationStatus.RUNNING.value:
            self.operations[op_id]["status"] = OperationStatus.PAUSED.value
            observability.log("INFO", f"LRO: Paused operation {op_id}")
            return True
        return False
    
    def resume_operation(self, op_id: str) -> bool:
        """Resume a paused operation."""
        if op_id in self.operations and self.operations[op_id]["status"] == OperationStatus.PAUSED.value:
            self.operations[op_id]["status"] = OperationStatus.RUNNING.value
            observability.log("INFO", f"LRO: Resumed operation {op_id}")
            return True
        return False


# Initialize Long-Running Operations Manager
lro_manager = LongRunningOperation()


# -----------------------------------------------------------------------------
# AGENT-TO-AGENT (A2A) PROTOCOL - Inter-Agent Communication
# -----------------------------------------------------------------------------

class A2AMessage:
    """Agent-to-Agent message format."""
    def __init__(self, sender: str, recipient: str, message_type: str, payload: Dict):
        self.id = f"msg-{uuid.uuid4().hex[:8]}"
        self.sender = sender
        self.recipient = recipient
        self.message_type = message_type  # request, response, broadcast, handoff
        self.payload = payload
        self.timestamp = datetime.utcnow().isoformat()
        self.status = "pending"


class A2AProtocol:
    """
    Agent-to-Agent (A2A) Protocol Implementation.
    
    Enables structured communication between agents:
    - Message passing with typed payloads
    - Request/Response patterns
    - Task handoff between agents
    - Broadcast messaging
    
    Reference: Agent2Agent (A2A) Protocol specification
    """
    
    def __init__(self):
        self.agents: Dict[str, Dict] = {}
        self.message_queue: List[A2AMessage] = []
        self.message_history: List[Dict] = []
    
    def register_agent(self, agent_id: str, capabilities: List[str], endpoint: str = None):
        """Register an agent with the A2A network."""
        self.agents[agent_id] = {
            "id": agent_id,
            "capabilities": capabilities,
            "endpoint": endpoint,
            "registered_at": datetime.utcnow().isoformat(),
            "status": "active",
        }
        observability.log("INFO", f"A2A: Registered agent '{agent_id}' with capabilities: {capabilities}")
    
    def send_message(self, sender: str, recipient: str, message_type: str, payload: Dict) -> A2AMessage:
        """Send a message from one agent to another."""
        msg = A2AMessage(sender, recipient, message_type, payload)
        self.message_queue.append(msg)
        self.message_history.append({
            "id": msg.id,
            "sender": sender,
            "recipient": recipient,
            "type": message_type,
            "timestamp": msg.timestamp,
        })
        observability.log("DEBUG", f"A2A: Message {msg.id} from {sender} to {recipient}")
        return msg
    
    def handoff_task(self, from_agent: str, to_agent: str, task: Dict, context: Dict) -> A2AMessage:
        """Handoff a task from one agent to another."""
        payload = {
            "task": task,
            "context": context,
            "handoff_reason": task.get("reason", "capability match"),
        }
        msg = self.send_message(from_agent, to_agent, "handoff", payload)
        observability.log("INFO", f"A2A: Task handoff from {from_agent} to {to_agent}")
        return msg
    
    def broadcast(self, sender: str, payload: Dict) -> List[A2AMessage]:
        """Broadcast a message to all registered agents."""
        messages = []
        for agent_id in self.agents:
            if agent_id != sender:
                msg = self.send_message(sender, agent_id, "broadcast", payload)
                messages.append(msg)
        observability.log("INFO", f"A2A: Broadcast from {sender} to {len(messages)} agents")
        return messages
    
    def get_agent_by_capability(self, capability: str) -> Optional[str]:
        """Find an agent with a specific capability."""
        for agent_id, agent_info in self.agents.items():
            if capability in agent_info["capabilities"]:
                return agent_id
        return None
    
    def get_communication_summary(self) -> Dict:
        """Get summary of A2A communications."""
        return {
            "registered_agents": len(self.agents),
            "total_messages": len(self.message_history),
            "message_types": list(set(m["type"] for m in self.message_history)),
            "agents": list(self.agents.keys()),
        }


# Initialize A2A Protocol
a2a_protocol = A2AProtocol()

# Register AQUA SENTINEL agents with A2A
a2a_protocol.register_agent("HydroOrchestrator", ["orchestration", "routing", "coordination"])
a2a_protocol.register_agent("WeatherAgent", ["weather", "forecast", "precipitation"])
a2a_protocol.register_agent("WaterLevelAgent", ["water_level", "usgs", "rivers"])
a2a_protocol.register_agent("DisasterAgent", ["disasters", "nasa", "events"])
a2a_protocol.register_agent("AlertAgent", ["alerts", "notifications", "emergency"])
a2a_protocol.register_agent("AnalysisAgent", ["analysis", "recommendations", "synthesis"])

print("\n✅ Advanced ADK Concepts initialized:")
print("   • MCP Tool Server (Model Context Protocol)")
print("   • Long-Running Operations Manager")
print("   • A2A Protocol (Agent-to-Agent Communication)")
print(f"   • {len(a2a_protocol.agents)} agents registered in A2A network")
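
The long-running operation manager above tracks status and progress, but the cell does not show a caller waiting on an operation. The following is a minimal sketch of the create, run, and poll cycle, assuming a background thread does the work. The names `run_operation`, `poll_until_done`, and the `"basin_scan"` operation type are hypothetical and only mirror the shape of `LongRunningOperation`.

```python
import threading
import time
import uuid
from enum import Enum


class Status(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"


operations = {}          # op_id -> mutable status record
lock = threading.Lock()  # guards records while the worker updates them


def create_operation(op_type: str) -> str:
    """Register a new operation in the PENDING state and return its ID."""
    op_id = f"op-{uuid.uuid4().hex[:8]}"
    operations[op_id] = {"type": op_type, "status": Status.PENDING.value,
                         "progress": 0, "result": None}
    return op_id


def run_operation(op_id: str, steps: int = 4) -> None:
    """Hypothetical worker: advances progress in equal steps, then completes."""
    for step in range(1, steps + 1):
        time.sleep(0.01)  # stand-in for real work
        with lock:
            operations[op_id]["status"] = Status.RUNNING.value
            operations[op_id]["progress"] = int(100 * step / steps)
    with lock:
        operations[op_id]["status"] = Status.COMPLETED.value
        operations[op_id]["result"] = {"steps_done": steps}


def poll_until_done(op_id: str, interval_s: float = 0.005, timeout_s: float = 5.0) -> dict:
    """Poll the status record until it reports completion or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        with lock:
            snapshot = dict(operations[op_id])
        if snapshot["status"] == Status.COMPLETED.value:
            return snapshot
        time.sleep(interval_s)
    raise TimeoutError(f"{op_id} did not finish within {timeout_s}s")


op_id = create_operation("basin_scan")
worker = threading.Thread(target=run_operation, args=(op_id,))
worker.start()
final = poll_until_done(op_id)
worker.join()
print(final)
```

Polling with a timeout keeps the caller from blocking forever if the worker stalls; a pause or cancel state could be checked inside the worker loop in the same way.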

In [None]:
# ============================================================================
# CELL 5: API CONFIGURATION
# ============================================================================

GOOGLE_API_KEY = None

# Method 1: Try Kaggle Secrets
try:
    from kaggle_secrets import UserSecretsClient
    secrets = UserSecretsClient()
    GOOGLE_API_KEY = secrets.get_secret("GOOGLE_API_KEY")
    os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY
    print("✅ Google API key loaded from Kaggle Secrets")
except Exception as e:
    print(f"⚠️ Kaggle Secrets not available: {e}")

# Method 2: Environment variable
if not GOOGLE_API_KEY:
    GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
    if GOOGLE_API_KEY:
        print("✅ Google API key loaded from environment variable")

if GOOGLE_API_KEY:
    os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY
    print(f"\n🔑 API Key Status: Configured (ends with ...{GOOGLE_API_KEY[-4:]})")
else:
    print("\n❌ API Key Status: NOT SET - Agent queries will fail!")

# Model configuration
MODEL = "gemini-2.0-flash"

# External API endpoints
API_ENDPOINTS = {
    "open_meteo": "https://api.open-meteo.com/v1/forecast",
    "usgs_water": "https://waterservices.usgs.gov/nwis/iv/",
    "nasa_eonet": "https://eonet.gsfc.nasa.gov/api/v3/events",
    "rest_countries": "https://restcountries.com/v3.1",
}

print(f"\n📡 Model: {MODEL}")
print("\n🌐 External APIs configured (all FREE, no keys needed):")
for name, url in API_ENDPOINTS.items():
    print(f"   • {name}: {url[:45]}...")
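
Each endpoint in `API_ENDPOINTS` is a base URL that the tools combine with query parameters at call time. The sketch below shows, without making a network call, what such a combined URL might look like for the Open-Meteo forecast endpoint. `build_forecast_url` is a hypothetical helper, and the parameter subset is an assumption chosen for illustration.

```python
from urllib.parse import parse_qs, urlencode, urlparse

OPEN_METEO_URL = "https://api.open-meteo.com/v1/forecast"


def build_forecast_url(lat: float, lon: float, days: int = 7) -> str:
    """Hypothetical helper: encode forecast query parameters onto the base URL."""
    params = {
        "latitude": lat,
        "longitude": lon,
        "daily": "precipitation_sum",
        "timezone": "auto",
        "forecast_days": days,
    }
    return f"{OPEN_METEO_URL}?{urlencode(params)}"


url = build_forecast_url(36.7783, -119.4179)
query = parse_qs(urlparse(url).query)  # decode again to inspect what was sent
print(url)
```

Passing the same dictionary as `params=` to `requests.get` performs this encoding internally, so building the URL by hand is mainly useful for logging or debugging.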

In [None]:
# ============================================================================
# CELL 6: REAL-TIME TOOLS WITH OBSERVABILITY & MCP REGISTRATION
# ============================================================================

# Extended location database
LOCATIONS = {
    "california": {"lat": 36.7783, "lon": -119.4179, "name": "California, USA"},
    "bangladesh": {"lat": 23.6850, "lon": 90.3563, "name": "Dhaka, Bangladesh"},
    "kenya": {"lat": -1.2921, "lon": 36.8219, "name": "Nairobi, Kenya"},
    "india": {"lat": 28.6139, "lon": 77.2090, "name": "Delhi, India"},
    "brazil": {"lat": -15.7975, "lon": -47.8919, "name": "Brasilia, Brazil"},
    "australia": {"lat": -33.8688, "lon": 151.2093, "name": "Sydney, Australia"},
    "ethiopia": {"lat": 9.1450, "lon": 40.4897, "name": "Addis Ababa, Ethiopia"},
    "somalia": {"lat": 5.1521, "lon": 46.1996, "name": "Mogadishu, Somalia"},
    "texas": {"lat": 31.9686, "lon": -99.9018, "name": "Texas, USA"},
    "florida": {"lat": 27.6648, "lon": -81.5158, "name": "Florida, USA"},
}


def get_realtime_weather(region: str) -> dict:
    """Get REAL-TIME weather data from Open-Meteo API with observability."""
    span_id = observability.start_span("get_realtime_weather", {"region": region})
    start_time = time.time()
    
    region_lower = region.lower().strip()
    
    if region_lower not in LOCATIONS:
        for key in LOCATIONS:
            # Skip fuzzy matching for empty input: "" is a substring of every key.
            if region_lower and (key in region_lower or region_lower in key):
                region_lower = key
                break
        else:
            observability.end_span("ERROR", {"error": "unknown_region"})
            return {
                "status": "error",
                "message": f"Unknown region: {region}",
                "available_regions": list(LOCATIONS.keys()),
            }
    
    loc = LOCATIONS[region_lower]
    
    try:
        params = {
            "latitude": loc["lat"],
            "longitude": loc["lon"],
            "current_weather": "true",
            "daily": "precipitation_sum,temperature_2m_max,temperature_2m_min,precipitation_probability_max",
            "timezone": "auto",
            "forecast_days": 7,
        }
        
        response = requests.get(API_ENDPOINTS["open_meteo"], params=params, timeout=10)
        response.raise_for_status()
        data = response.json()
        
        latency_ms = (time.time() - start_time) * 1000
        observability.record_api_call("open_meteo", latency_ms, True)
        
        current = data.get("current_weather", {})
        daily = data.get("daily", {})
        # Guard against a missing list or null daily values; count them as 0 mm.
        precip_7d = sum(p or 0 for p in (daily.get("precipitation_sum") or []))
        
        if precip_7d > 100:
            flood_risk, drought_risk = "HIGH", "LOW"
        elif precip_7d > 50:
            flood_risk, drought_risk = "MODERATE", "LOW"
        elif precip_7d < 5:
            flood_risk, drought_risk = "LOW", "HIGH"
        else:
            flood_risk, drought_risk = "LOW", "MODERATE"
        
        observability.end_span("OK", {"flood_risk": flood_risk, "drought_risk": drought_risk})
        observability.log("INFO", f"Weather data fetched for {region}", {"latency_ms": round(latency_ms, 2)})
        
        return {
            "status": "success",
            "source": "Open-Meteo API (LIVE)",
            "region": region,
            "location": loc["name"],
            "coordinates": {"lat": loc["lat"], "lon": loc["lon"]},
            "fetched_at": datetime.utcnow().isoformat() + "Z",
            "current": {
                "temperature_c": current.get("temperature"),
                "windspeed_kmh": current.get("windspeed"),
                "weather_code": current.get("weathercode"),
            },
            "forecast_7d": {
                "dates": daily.get("time", []),
                "precipitation_mm": daily.get("precipitation_sum", []),
                "total_precipitation_mm": round(precip_7d, 1),
            },
            "water_impact": {"flood_risk": flood_risk, "drought_risk": drought_risk},
            "_observability": {"latency_ms": round(latency_ms, 2)},
        }
        
    except Exception as e:
        latency_ms = (time.time() - start_time) * 1000
        observability.record_api_call("open_meteo", latency_ms, False)
        observability.end_span("ERROR", {"error": str(e)})
        return {"status": "error", "message": f"Error: {str(e)}"}


USGS_SITES = {
    "california": {"site_id": "11447650", "name": "Sacramento River at Freeport, CA"},
    "colorado": {"site_id": "09380000", "name": "Colorado River at Lees Ferry, AZ"},
    "mississippi": {"site_id": "07374000", "name": "Mississippi River at Baton Rouge, LA"},
    "texas": {"site_id": "08158000", "name": "Colorado River at Austin, TX"},
    "florida": {"site_id": "02323500", "name": "Suwannee River near Wilcox, FL"},
}


def get_realtime_water_level(region: str) -> dict:
    """Get REAL-TIME water level data from USGS sensors."""
    span_id = observability.start_span("get_realtime_water_level", {"region": region})
    start_time = time.time()
    
    region_lower = region.lower().strip()
    
    if region_lower not in USGS_SITES:
        observability.end_span("ERROR", {"error": "no_usgs_site"})
        return {
            "status": "error",
            "message": f"No USGS site for: {region}",
            "available_regions": list(USGS_SITES.keys()),
            "note": "USGS only covers US water bodies",
        }
    
    site = USGS_SITES[region_lower]
    
    try:
        params = {
            "sites": site["site_id"],
            "format": "json",
            "parameterCd": "00065,00060",
            "siteStatus": "active",
        }
        
        response = requests.get(API_ENDPOINTS["usgs_water"], params=params, timeout=10)
        response.raise_for_status()
        data = response.json()
        
        latency_ms = (time.time() - start_time) * 1000
        observability.record_api_call("usgs_water", latency_ms, True)
        
        time_series = data.get("value", {}).get("timeSeries", [])
        readings = {}
        
        for series in time_series:
            var_name = series.get("variable", {}).get("variableName", "Unknown")
            values = series.get("values", [{}])[0].get("value", [])
            if values:
                latest = values[-1]
                readings[var_name] = {
                    "value": float(latest.get("value", 0)),
                    "timestamp": latest.get("dateTime"),
                }
        
        gage_height = readings.get("Gage height, ft", {}).get("value")
        
        if gage_height is None:
            # A missing reading must not be reported as drought.
            alert_level, alert_reason = "UNKNOWN", "No gage height reading available"
        elif gage_height > 20:
            alert_level, alert_reason = "RED", "Flood risk - water level elevated"
        elif gage_height > 15:
            alert_level, alert_reason = "ORANGE", "Water level above normal"
        elif gage_height < 5:
            alert_level, alert_reason = "ORANGE", "Drought conditions"
        else:
            alert_level, alert_reason = "GREEN", "Normal range"
        
        observability.end_span("OK", {"alert_level": alert_level})
        observability.log("INFO", f"Water level data fetched for {region}", {"latency_ms": round(latency_ms, 2)})
        
        return {
            "status": "success",
            "source": "USGS Water Services (LIVE)",
            "region": region,
            "site_name": site["name"],
            "fetched_at": datetime.utcnow().isoformat() + "Z",
            "readings": readings,
            "alert_level": alert_level,
            "alert_reason": alert_reason,
            "_observability": {"latency_ms": round(latency_ms, 2)},
        }
        
    except Exception as e:
        latency_ms = (time.time() - start_time) * 1000
        observability.record_api_call("usgs_water", latency_ms, False)
        observability.end_span("ERROR", {"error": str(e)})
        return {"status": "error", "message": f"Error: {str(e)}"}


def get_realtime_disasters(category: str = "all", limit: int = 10) -> dict:
    """Get REAL-TIME disaster events from NASA EONET."""
    span_id = observability.start_span("get_realtime_disasters", {"category": category})
    start_time = time.time()
    
    try:
        params = {"status": "open", "limit": limit}
        
        response = requests.get(API_ENDPOINTS["nasa_eonet"], params=params, timeout=15)
        response.raise_for_status()
        data = response.json()
        
        latency_ms = (time.time() - start_time) * 1000
        observability.record_api_call("nasa_eonet", latency_ms, True)
        
        events = data.get("events", [])
        processed = []
        water_related = 0
        
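        for event in events:
            cats = [c.get("title", "") for c in event.get("categories", [])]
            # Honor the `category` argument by matching category titles client-side.
            if category.lower() != "all" and not any(category.lower() in c.lower() for c in cats):
                continue
            is_water = any(c.lower() in ["floods", "drought", "severe storms"] for c in cats)
            if is_water:
                water_related += 1
            processed.append({
                "id": event.get("id"),
                "title": event.get("title"),
                "categories": cats,
                "is_water_related": is_water,
            })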
        
        alert_level = "RED" if water_related > 3 else "ORANGE" if water_related > 0 else "GREEN"
        
        observability.end_span("OK", {"events": len(processed)})
        observability.log("INFO", f"Disaster data fetched: {len(processed)} events", {"latency_ms": round(latency_ms, 2)})
        
        return {
            "status": "success",
            "source": "NASA EONET (LIVE)",
            "fetched_at": datetime.utcnow().isoformat() + "Z",
            "total_events": len(processed),
            "water_related_events": water_related,
            "events": processed,
            "alert_level": alert_level,
            "_observability": {"latency_ms": round(latency_ms, 2)},
        }
        
    except Exception as e:
        latency_ms = (time.time() - start_time) * 1000
        observability.record_api_call("nasa_eonet", latency_ms, False)
        observability.end_span("ERROR", {"error": str(e)})
        return {"status": "error", "message": f"Error: {str(e)}"}


def get_country_info(country: str) -> dict:
    """Get country information."""
    start_time = time.time()
    try:
        response = requests.get(f"{API_ENDPOINTS['rest_countries']}/name/{country}", timeout=10)
        response.raise_for_status()
        data = response.json()[0]
        latency_ms = (time.time() - start_time) * 1000
        observability.record_api_call("rest_countries", latency_ms, True)
        return {
            "status": "success",
            "country": data.get("name", {}).get("common", country),
            "population": data.get("population", 0),
            "region": data.get("region", ""),
        }
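    except Exception as e:
        # Record the failed call so the success-rate metric stays accurate
        latency_ms = (time.time() - start_time) * 1000
        observability.record_api_call("rest_countries", latency_ms, False)
        return {"status": "error", "message": str(e)}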


ALERT_LOG = []

def send_water_alert(region: str, alert_type: str, message: str, priority: str = "normal") -> dict:
    """Send water alert."""
    span_id = observability.start_span("send_water_alert", {"region": region})
    
    timestamp = datetime.utcnow()
    alert_id = f"AQUA-{timestamp.strftime('%Y%m%d%H%M%S')}-{len(ALERT_LOG)+1:04d}"
    verification_code = f"VER-{timestamp.strftime('%H%M%S')}"
    
    country_info = get_country_info(region)
    population = country_info.get("population", 1000000)
    
    reach_mult = {"emergency": 0.85, "high": 0.60, "normal": 0.30, "low": 0.10}
    estimated_reach = int(population * reach_mult.get(priority, 0.30))
    
    channels = {
        "emergency": ["SMS", "Voice", "Radio", "TV", "Sirens", "MobileApp"],
        "high": ["SMS", "MobileApp", "Email", "Radio"],
        "normal": ["MobileApp", "Email"],
        "low": ["MobileApp"],
    }
    
    alert_record = {
        "alert_id": alert_id,
        "verification_code": verification_code,
        "timestamp": timestamp.isoformat() + "Z",
        "region": region,
        "alert_type": alert_type,
        "message": message,  # alert text shown to recipients
        "priority": priority,
        "channels": channels.get(priority, ["MobileApp"]),
        "estimated_reach": estimated_reach,
    }
    
    ALERT_LOG.append(alert_record)
    
    observability.end_span("OK", {"alert_id": alert_id})
    observability.log("INFO", f"Alert sent: {alert_id} to {region}")
    
    return {
        "status": "success",
        "alert_id": alert_id,
        "verification_code": verification_code,
        "timestamp": timestamp.isoformat() + "Z",
        "region": region,
        "alert_type": alert_type,
        "priority": priority,
        "channels": channels.get(priority, ["MobileApp"]),
        "delivery": {
            "estimated_reach": estimated_reach,
            "status": "QUEUED_FOR_DELIVERY",
        },
    }


# Register tools with MCP Server
mcp_server.register_tool(
    "get_realtime_weather",
    "Fetch real-time weather data from Open-Meteo API",
    get_realtime_weather,
    {"type": "object", "properties": {"region": {"type": "string"}}}
)
mcp_server.register_tool(
    "get_realtime_water_level",
    "Fetch water level data from USGS",
    get_realtime_water_level,
    {"type": "object", "properties": {"region": {"type": "string"}}}
)
mcp_server.register_tool(
    "get_realtime_disasters",
    "Fetch disaster events from NASA EONET",
    get_realtime_disasters,
    {"type": "object", "properties": {"category": {"type": "string"}, "limit": {"type": "integer"}}}
)
mcp_server.register_tool(
    "send_water_alert",
    "Send water crisis alert",
    send_water_alert,
    {"type": "object", "properties": {"region": {"type": "string"}, "alert_type": {"type": "string"}, "message": {"type": "string"}, "priority": {"type": "string"}}}
)

print("✅ Created 5 real-time tools with observability")
print("✅ Tools registered with MCP Server")
print(f"   MCP Server: {mcp_server.get_server_info()}")
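
The water-related filter and alert thresholds used in `get_realtime_disasters` can be checked offline, without calling NASA EONET. The sketch below mirrors that logic on hand-written event dicts. `WATER_CATEGORIES`, `classify_events`, and the sample event IDs are hypothetical names introduced only for illustration.

```python
# Category titles treated as water-related, matching the tool above
WATER_CATEGORIES = {"floods", "drought", "severe storms"}


def classify_events(events):
    """Return (water_related_count, alert_level) for EONET-style event dicts."""
    water_related = 0
    for event in events:
        titles = [c.get("title", "").lower() for c in event.get("categories", [])]
        if any(title in WATER_CATEGORIES for title in titles):
            water_related += 1
    # Same thresholds as the tool: more than 3 is RED, 1 to 3 is ORANGE
    if water_related > 3:
        level = "RED"
    elif water_related > 0:
        level = "ORANGE"
    else:
        level = "GREEN"
    return water_related, level


# Made-up sample events (not real EONET data)
sample = [
    {"id": "EX-1", "categories": [{"title": "Floods"}]},
    {"id": "EX-2", "categories": [{"title": "Wildfires"}]},
    {"id": "EX-3", "categories": [{"title": "Severe Storms"}]},
]
print(classify_events(sample))
```

Keeping the classification in a pure function like this makes the thresholds easy to unit-test separately from network behavior.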

In [None]:
# ============================================================================
# CELL 7: TIMING COMPARISON - Sequential vs Parallel Execution
# ============================================================================

def run_sequential_fetch(region: str) -> tuple:
    """
    Run API fetches SEQUENTIALLY (one after another).
    Returns (results, total_time_ms)
    """
    start = time.time()
    
    # Sequential execution - each waits for the previous
    weather = get_realtime_weather(region)
    water = get_realtime_water_level(region) if region in USGS_SITES else {"status": "skipped", "reason": "no USGS site"}
    disasters = get_realtime_disasters()
    
    total_time = (time.time() - start) * 1000
    
    return {
        "weather": weather.get("status"),
        "water_level": water.get("status"),
        "disasters": disasters.get("status"),
    }, total_time


def run_parallel_fetch(region: str) -> tuple:
    """
    Run API fetches in PARALLEL (concurrent).
    Returns (results, total_time_ms)
    """
    start = time.time()
    
    results = {}
    
    # Parallel execution using ThreadPoolExecutor
    with ThreadPoolExecutor(max_workers=3) as executor:
        weather_future = executor.submit(get_realtime_weather, region)
        water_future = executor.submit(
            get_realtime_water_level, region
        ) if region in USGS_SITES else None
        disasters_future = executor.submit(get_realtime_disasters)
        
        results["weather"] = weather_future.result().get("status")
        results["water_level"] = water_future.result().get("status") if water_future else "skipped"
        results["disasters"] = disasters_future.result().get("status")
    
    total_time = (time.time() - start) * 1000
    
    return results, total_time


def demonstrate_parallel_speedup(region: str = "california"):
    """
    Demonstrate the speedup achieved by ParallelAgent pattern.
    """
    print("\n" + "="*70)
    print("⏱️  TIMING COMPARISON: Sequential vs Parallel Execution")
    print("="*70)
    print(f"\nRegion: {region.upper()}")
    print("APIs: Open-Meteo (weather) + USGS (water) + NASA EONET (disasters)")
    print("\n" + "-"*70)
    
    # Run sequential
    print("\n🔄 SEQUENTIAL EXECUTION (one after another):")
    seq_results, seq_time = run_sequential_fetch(region)
    print(f"   Results: {seq_results}")
    print(f"   ⏱️  Total Time: {seq_time:.2f}ms")
    
    # Small delay between tests
    time.sleep(0.5)
    
    # Run parallel
    print("\n⚡ PARALLEL EXECUTION (concurrent):")
    par_results, par_time = run_parallel_fetch(region)
    print(f"   Results: {par_results}")
    print(f"   ⏱️  Total Time: {par_time:.2f}ms")
    
    # Calculate speedup
    speedup = seq_time / par_time if par_time > 0 else 0
    time_saved = seq_time - par_time
    
    # Record in observability
    observability.record_timing_comparison(f"fetch_{region}", seq_time, par_time)
    
    print("\n" + "-"*70)
    print("📊 COMPARISON RESULTS:")
    print("-"*70)
    print(f"   Sequential Time: {seq_time:.2f}ms")
    print(f"   Parallel Time:   {par_time:.2f}ms")
    print(f"   ─────────────────────────")
    print(f"   🚀 Speedup:      {speedup:.2f}x faster")
    print(f"   ⏱️  Time Saved:   {time_saved:.2f}ms")
    print("\n" + "="*70)
    
    # A2A: Broadcast timing results to all agents
    a2a_protocol.broadcast("HydroOrchestrator", {
        "type": "timing_benchmark",
        "sequential_ms": seq_time,
        "parallel_ms": par_time,
        "speedup": speedup,
    })
    
    return {
        "sequential_ms": seq_time,
        "parallel_ms": par_time,
        "speedup": speedup,
        "time_saved_ms": time_saved,
    }


print("✅ Timing comparison functions ready")
print("   • run_sequential_fetch() - Execute APIs one by one")
print("   • run_parallel_fetch() - Execute APIs concurrently")
print("   • demonstrate_parallel_speedup() - Run comparison benchmark")
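
The benchmark above depends on live network latency, so its numbers vary from run to run. A minimal offline sketch of the same sequential-versus-parallel comparison, with `time.sleep` standing in for the three API calls, is shown below. `fake_api`, `DELAYS`, `timed_sequential`, and `timed_parallel` are hypothetical names, and the delays are arbitrary.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Simulated per-call latencies in seconds (arbitrary illustrative values)
DELAYS = [0.05, 0.08, 0.10]


def fake_api(delay_s):
    """Stand-in for an I/O-bound API call: blocks for delay_s seconds."""
    time.sleep(delay_s)
    return {"status": "success"}


def timed_sequential():
    """Run the simulated calls one after another; return elapsed milliseconds."""
    start = time.perf_counter()
    for delay in DELAYS:
        fake_api(delay)
    return (time.perf_counter() - start) * 1000


def timed_parallel():
    """Run the simulated calls in a thread pool; return elapsed milliseconds."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=len(DELAYS)) as executor:
        list(executor.map(fake_api, DELAYS))
    return (time.perf_counter() - start) * 1000


seq_ms = timed_sequential()
par_ms = timed_parallel()
print(f"sequential ~{seq_ms:.0f}ms, parallel ~{par_ms:.0f}ms")
```

With I/O-bound calls, sequential time approaches the sum of the delays while parallel time approaches the slowest single call. In the live benchmark the sequential pass runs first, so connection warm-up may favor the parallel pass; repeating the runs and alternating the order would likely give a fairer comparison.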

In [None]:
# ============================================================================
# CELL 8: AGENT IMPLEMENTATION
# ============================================================================

# Weather Agent - FOR SENTINEL (Parallel)
weather_agent_sentinel = LlmAgent(
    name="WeatherAgentSentinel",
    model=MODEL,
    instruction="""You analyze REAL-TIME weather data for water impact.
    
    Steps:
    1. Extract region from query (convert to lowercase)
       Available: california, bangladesh, kenya, india, brazil, australia, ethiopia, somalia, texas, florida
    2. Call get_realtime_weather(region="<region>") IMMEDIATELY
    3. Report: temperature, 7-day precipitation, flood/drought risk
    4. Include fetched_at timestamp
    
    CRITICAL: You MUST call the function.""",
    description="Fetches real-time weather data",
    tools=[get_realtime_weather],
)

# Weather Agent - FOR GUARDIAN (Sequential Step 1)
weather_agent_guardian = LlmAgent(
    name="WeatherAgentGuardian",
    model=MODEL,
    instruction="""You are STEP 1 of a sequential pipeline. Your output becomes input for AnalysisAgent.
    
    YOUR TASK: Fetch weather data and OUTPUT STRUCTURED DATA.
    
    Steps:
    1. Extract region from query
    2. Call get_realtime_weather(region="<region>")
    3. OUTPUT FORMAT:
       
       ---WEATHER_DATA_START---
       Region: [name]
       Temperature: [X]°C
       7-Day Precipitation: [X] mm
       Flood Risk: [HIGH/MODERATE/LOW]
       Drought Risk: [HIGH/MODERATE/LOW]
       Fetched At: [timestamp]
       ---WEATHER_DATA_END---
    
    This structured output enables AnalysisAgent to extract the data.""",
    description="Fetches weather data with structured output for sequential processing",
    tools=[get_realtime_weather],
)

# Water Level Agent
water_level_agent = LlmAgent(
    name="WaterLevelAgent",
    model=MODEL,
    instruction="""You monitor REAL-TIME water levels from USGS sensors.
    Call get_realtime_water_level for US regions.
    Available: california, colorado, mississippi, texas, florida
    Note: USGS only covers US water bodies.""",
    description="Monitors water levels from USGS",
    tools=[get_realtime_water_level],
)

# Disaster Agent
disaster_agent = LlmAgent(
    name="DisasterAgent",
    model=MODEL,
    instruction="""You monitor REAL-TIME disasters from NASA EONET.
    Call get_realtime_disasters() to get global events.
    Report: total events, water-related count, alert level.""",
    description="Monitors disasters from NASA EONET",
    tools=[get_realtime_disasters],
)

# Analysis Agent (Sequential Step 2)
analysis_agent = LlmAgent(
    name="AnalysisAgent",
    model=MODEL,
    instruction="""You are STEP 2 of a sequential pipeline. You RECEIVE data from WeatherAgentGuardian.
    
    SEQUENTIAL DEPENDENCY: Extract data from the ---WEATHER_DATA_START--- block.
    
    YOUR TASK:
    1. EXTRACT: Temperature, Precipitation, Flood Risk, Drought Risk
    2. ANALYZE:
       - If Flood Risk HIGH: Recommend evacuation prep
       - If Drought Risk HIGH: Recommend conservation
       - If precipitation > 50mm: Warn about flooding
       - If precipitation < 10mm: Warn about drought
    
    3. GENERATE RECOMMENDATIONS:
       🔴 HIGH PRIORITY: [immediate actions]
       🟡 MEDIUM PRIORITY: [preparatory actions]
       🟢 LOW PRIORITY: [monitoring actions]
    
    4. CITE SPECIFIC DATA from WeatherAgentGuardian's output.""",
    description="Synthesizes weather data into recommendations",
)

# Alert Agent (Loop Step 1)
alert_agent = LlmAgent(
    name="AlertAgent",
    model=MODEL,
    instruction="""You send water-related alerts.
    
    Call send_water_alert with:
    - region, alert_type, message, priority
    
    OUTPUT for verification:
    - Alert ID, Verification Code, Region, Priority, Estimated Reach, Channels, Status""",
    description="Sends water alerts",
    tools=[send_water_alert],
)

# Verify Agent (Loop Step 2)
verify_agent = LlmAgent(
    name="VerifyAgent",
    model=MODEL,
    instruction="""You verify alert delivery with 7-POINT VALIDATION.
    
    CHECKLIST:
    ✓ CHECK 1: Alert ID exists (AQUA-YYYYMMDDHHMMSS-####)
    ✓ CHECK 2: Verification Code exists (VER-HHMMSS)
    ✓ CHECK 3: Estimated reach > 0
    ✓ CHECK 4: Status is QUEUED_FOR_DELIVERY
    ✓ CHECK 5: Timestamp is recent
    ✓ CHECK 6: Channels list not empty
    ✓ CHECK 7: Region matches request
    
    OUTPUT:
    Verification Status: [VERIFIED/FAILED]
    Checks Passed: [X/7]""",
    description="Verifies alerts with 7-point validation",
)

print("✅ Created 7 specialist LlmAgents")
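
WeatherAgentGuardian hands data to AnalysisAgent through the delimited `---WEATHER_DATA_START---` block. In the agents above the LLM does the extraction itself, but the same block can also be parsed deterministically, which may be useful for testing the handoff. The sketch below is one way to do that. `parse_weather_block`, `BLOCK_RE`, and the sample values are hypothetical.

```python
import re

# Matches everything between the start and end delimiters, across lines
BLOCK_RE = re.compile(
    r"---WEATHER_DATA_START---(.*?)---WEATHER_DATA_END---", re.DOTALL
)


def parse_weather_block(text):
    """Extract 'Key: value' pairs from the delimited weather block, or {} if absent."""
    match = BLOCK_RE.search(text)
    if match is None:
        return {}
    fields = {}
    for line in match.group(1).splitlines():
        # Split on the first colon only, so timestamps keep their own colons
        key, sep, value = line.partition(":")
        if sep:
            fields[key.strip()] = value.strip()
    return fields


# Made-up agent output for illustration (not real API data)
sample_output = """Here is the data.
---WEATHER_DATA_START---
Region: kenya
Temperature: 24°C
7-Day Precipitation: 8 mm
Flood Risk: LOW
Drought Risk: HIGH
Fetched At: 2025-01-01T00:00:00Z
---WEATHER_DATA_END---"""

parsed = parse_weather_block(sample_output)
print(parsed["Region"], parsed["Drought Risk"])
```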

In [None]:
# ============================================================================
# CELL 9: MULTI-AGENT ARCHITECTURES
# ============================================================================

# PARALLEL AGENT - SentinelAgent
sentinel_agent = ParallelAgent(
    name="SentinelAgent",
    sub_agents=[weather_agent_sentinel, water_level_agent, disaster_agent],
    description="""Real-time monitoring using PARALLEL EXECUTION.
    
    PARALLELISM BENEFIT (demonstrated in timing comparison):
    - Sequential: ~2000-4000ms (APIs execute one by one)
    - Parallel: ~800-1500ms (APIs execute concurrently)
    - Speedup: ~2-3x faster
    """,
)

print("✅ Created SentinelAgent (ParallelAgent)")
print("   • 3 sub-agents execute CONCURRENTLY")

# SEQUENTIAL AGENT - GuardianAgent
guardian_agent = SequentialAgent(
    name="GuardianAgent",
    sub_agents=[weather_agent_guardian, analysis_agent],
    description="""Predictive analytics using SEQUENTIAL EXECUTION with state passing.
    
    Step 1: WeatherAgentGuardian → Structured output
    Step 2: AnalysisAgent → Extracts data & recommends
    """,
)

print("✅ Created GuardianAgent (SequentialAgent)")
print("   • Step 1 → Step 2 with state passing")

# LOOP AGENT - ResponderAgent
responder_agent = LoopAgent(
    name="ResponderAgent",
    sub_agents=[alert_agent, verify_agent],
    max_iterations=5,
    description="""Emergency response using LOOP EXECUTION.
    
    Loop: AlertAgent → VerifyAgent (7-point check)
    Exit: max 5 iterations; stopping early on VERIFIED requires a sub-agent to escalate
    """,
)

print("✅ Created ResponderAgent (LoopAgent)")
print("   • Max 5 iterations, 7-point verification")

# ROOT ORCHESTRATOR
ORCHESTRATOR_INSTRUCTION = """
You are HYDRO ORCHESTRATOR, the central coordinator of AQUA SENTINEL.

## REAL-TIME DATA
All tools fetch LIVE data from real APIs with observability tracking.

## QUERY ROUTING

1. **REGIONAL MONITORING** → SentinelAgent (ParallelAgent)
   Keywords: "situation in [region]", "water status"
   
2. **FORECAST & ANALYSIS** → GuardianAgent (SequentialAgent)
   Keywords: "forecast", "predict", "analyze", "recommend"
   
3. **EMERGENCY ALERTS** → ResponderAgent (LoopAgent)
   Keywords: "send alert", "warn", "emergency"
   
4. **GLOBAL DISASTERS** → Call get_realtime_disasters() directly
   Keywords: "global", "worldwide"

## RESPONSE FORMAT
- Data Source + Timestamp
- Key Findings
- Risk Level: 🟢 GREEN / 🟡 ORANGE / 🔴 RED
- Recommendations
"""

hydro_orchestrator = LlmAgent(
    name="HydroOrchestrator",
    model=MODEL,
    instruction=ORCHESTRATOR_INSTRUCTION,
    description="Central coordinator",
    sub_agents=[sentinel_agent, guardian_agent, responder_agent],
    tools=[get_realtime_disasters],
)

print("\n" + "="*70)
print("AQUA SENTINEL AGENT HIERARCHY")
print("="*70)
print("""
HydroOrchestrator (LlmAgent)
│
├── SentinelAgent (ParallelAgent) ──── CONCURRENT
│   ├── WeatherAgent  → Open-Meteo
│   ├── WaterLevelAgent → USGS
│   └── DisasterAgent → NASA EONET
│
├── GuardianAgent (SequentialAgent) ── STATE PASSING
│   ├── WeatherAgent → Structured Output
│   └── AnalysisAgent → Extract & Recommend
│
└── ResponderAgent (LoopAgent) ─────── 5 ITERATIONS
    ├── AlertAgent → Send
    └── VerifyAgent → 7-Point Check
""")

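ResponderAgent is meant to stop once VerifyAgent reports VERIFIED. In ADK's loop pattern, an early exit generally has to be signalled by a sub-agent, commonly through a tool that sets an escalate flag on its tool context; without such a signal the loop runs until `max_iterations`. The sketch below simulates that control flow in plain Python so it runs without ADK installed. `StubActions`, `StubToolContext`, `exit_loop`, and `run_loop` are hypothetical stand-ins, not ADK classes.

```python
from dataclasses import dataclass, field


@dataclass
class StubActions:
    """Stand-in for the actions object a tool context would carry."""
    escalate: bool = False


@dataclass
class StubToolContext:
    """Stand-in for a tool context; only the escalate flag is modelled."""
    actions: StubActions = field(default_factory=StubActions)


def exit_loop(tool_context):
    """Tool the verifier would call once all checks pass."""
    tool_context.actions.escalate = True
    return {}


def run_loop(verify, max_iterations=5):
    """Simulate alert/verify cycles; return how many iterations actually ran."""
    for iteration in range(1, max_iterations + 1):
        ctx = StubToolContext()
        # verify(iteration) models VerifyAgent deciding VERIFIED or FAILED
        if verify(iteration):
            exit_loop(ctx)
        if ctx.actions.escalate:
            return iteration
    return max_iterations


print(run_loop(lambda i: i == 2))   # verified on the second pass
print(run_loop(lambda i: False))    # never verified, runs to the cap
```

If early exit matters, the verifier needs a tool of this kind in its `tools` list; otherwise each query may send up to five alerts.
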
In [None]:
# ============================================================================
# CELL 10: SESSION MANAGEMENT & QUERY FUNCTION
# ============================================================================

import inspect

session_service = InMemorySessionService()

APP_NAME = "aqua_sentinel_realtime"
USER_ID = "demo_user"
SESSION_ID = f"session_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}"

runner = Runner(
    agent=hydro_orchestrator,
    app_name=APP_NAME,
    session_service=session_service,
)


async def ensure_session():
    """Create session."""
    try:
        result = session_service.create_session(
            app_name=APP_NAME,
            user_id=USER_ID,
            session_id=SESSION_ID,
        )
        if inspect.iscoroutine(result):
            await result
        print(f"✅ Session created: {SESSION_ID}")
    except Exception as e:
        print(f"⚠️ Session: {e}")


async def query_aqua_sentinel(
    query: str,
    verbose: bool = True,
    fresh_session: bool = True,
    show_observability: bool = True
) -> str:
    """Send query to AQUA SENTINEL with observability."""
    global SESSION_ID
    
    trace_id = observability.start_trace(f"query: {query[:50]}...")
    
    if fresh_session:
        SESSION_ID = f"session_{datetime.utcnow().strftime('%Y%m%d_%H%M%S%f')}"
        try:
            result = session_service.create_session(
                app_name=APP_NAME,
                user_id=USER_ID,
                session_id=SESSION_ID,
            )
            if inspect.iscoroutine(result):
                await result
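        except Exception as e:
            # Surface session-creation failures instead of silently swallowing them
            observability.log("ERROR", f"Session creation failed: {e}")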
    
    if verbose:
        print(f"\n{'='*70}")
        print(f"🔍 QUERY: {query}")
        print(f"⏰ Time: {datetime.utcnow().isoformat()}Z")
        print(f"🔗 Trace ID: {trace_id}")
        print(f"{'='*70}")
    
    query_span = observability.start_span("agent_query", {"query": query[:100]})
    
    content = types.Content(
        role="user",
        parts=[types.Part(text=query)]
    )
    
    response_text = ""
    query_failed = False
    try:
        async for event in runner.run_async(
            user_id=USER_ID,
            session_id=SESSION_ID,
            new_message=content,
        ):
            if hasattr(event, 'content') and event.content:
                for part in event.content.parts:
                    if hasattr(part, 'text') and part.text:
                        response_text += part.text + "\n"
    except Exception as e:
        query_failed = True
        response_text = f"Error: {str(e)}"
        observability.log("ERROR", f"Query failed: {str(e)}")
    
    # Mark the span from the actual outcome, not from the word "Error" appearing in the answer
    observability.end_span("ERROR" if query_failed else "OK")
    
    if verbose:
        print(f"\n📊 RESPONSE:\n{response_text}")
        
        if show_observability:
            print(f"\n{'─'*70}")
            print("📈 OBSERVABILITY SUMMARY")
            print(f"{'─'*70}")
            trace = observability.get_trace_summary()
            print(f"   Trace ID: {trace.get('trace_id', 'N/A')}")
            print(f"   Total Spans: {trace.get('total_spans', 0)}")
            print(f"   Total Duration: {trace.get('total_duration_ms', 0):.2f}ms")
            metrics = observability.get_metrics_summary()
            print(f"   API Calls: {metrics.get('total_api_calls', 0)}")
            print(f"   Avg Latency: {metrics.get('average_latency_ms', 0):.2f}ms")
            print(f"   Success Rate: {metrics.get('success_rate', 'N/A')}")
            print(f"   Tools Used: {', '.join(metrics.get('unique_tools_used', []))}")
    
    return response_text.strip()


print("✅ Query function ready with observability")
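
Both `ensure_session` and `query_aqua_sentinel` repeat the same check: call `create_session`, then await the result only if it turns out to be a coroutine. A small helper can capture that pattern in one place. The sketch below assumes nothing about ADK. `maybe_await`, `create_sync`, and `create_async` are hypothetical names used to show both cases.

```python
import asyncio
import inspect


async def maybe_await(value):
    """Await value if it is awaitable; otherwise return it unchanged."""
    if inspect.isawaitable(value):
        return await value
    return value


def create_sync(session_id):
    """Stand-in for a synchronous create_session implementation."""
    return {"id": session_id}


async def create_async(session_id):
    """Stand-in for an asynchronous create_session implementation."""
    return {"id": session_id}


async def demo():
    # The same call site works for either flavor of the API
    first = await maybe_await(create_sync("s1"))
    second = await maybe_await(create_async("s2"))
    return first, second


# In a notebook cell, use `await demo()` instead, since a loop is already running
result = asyncio.run(demo())
print(result)
```

`inspect.isawaitable` is slightly broader than `inspect.iscoroutine` because it also accepts futures and tasks.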

In [None]:
# ============================================================================
# CELL 11: EVALUATION FRAMEWORK (12 Test Cases)
# ============================================================================

@dataclass
class TestCase:
    # Field order matches the positional arguments used in GOLDEN_DATASET
    id: str
    name: str
    category: str
    query: str
    expected_elements: List[str]
    expected_agent: str


GOLDEN_DATASET = [
    # Happy Path (4)
    TestCase("RT-001", "Real-Time Weather", "happy_path",
             "What's the current weather in California?",
             ["weather", "temperature", "california"], "WeatherAgent"),
    TestCase("RT-002", "USGS Water Level", "happy_path",
             "What are the current water levels in California rivers?",
             ["water", "level", "gage"], "WaterLevelAgent"),
    TestCase("RT-003", "NASA Disasters", "happy_path",
             "What natural disasters are currently active?",
             ["disaster", "event", "nasa"], "DisasterAgent"),
    TestCase("RT-004", "Alert Delivery", "happy_path",
             "Send a water conservation alert to India with normal priority.",
             ["alert", "india", "sent"], "ResponderAgent"),
    
    # Error Handling (3)
    TestCase("RT-005", "Invalid Region Error", "error_handling",
             "What's the weather in Atlantis?",
             ["error", "unknown", "available"], "WeatherAgent"),
    TestCase("RT-006", "Non-US Water Level Request", "error_handling",
             "What's the water level in Kenya rivers?",
             ["usgs", "us", "available"], "WaterLevelAgent"),
    TestCase("RT-007", "Ambiguous Region Query", "error_handling",
             "What's the water situation?",
             ["region", "specify", "available"], "HydroOrchestrator"),
    
    # Multi-Agent (3)
    TestCase("RT-008", "Sequential Forecast Analysis", "multi_agent",
             "What's the weather forecast for Kenya? Analyze risks and recommend actions.",
             ["forecast", "recommend", "risk"], "GuardianAgent"),
    TestCase("RT-009", "Parallel Regional Monitoring", "multi_agent",
             "Give me a complete water situation report for California with all available data sources.",
             ["weather", "water", "california"], "SentinelAgent"),
    TestCase("RT-010", "Global Disaster Overview", "multi_agent",
             "What natural disasters are happening globally right now? Focus on water-related events.",
             ["disaster", "global", "water"], "DisasterAgent"),
    
    # Edge Cases (2)
    TestCase("RT-011", "Emergency High Priority Alert", "edge_case",
             "Send an EMERGENCY flood alert to Bangladesh immediately. Critical flooding situation!",
             ["alert", "emergency", "bangladesh"], "ResponderAgent"),
    TestCase("RT-012", "Horn of Africa Drought Region", "edge_case",
             "What's the drought situation in Ethiopia? This is for the Horn of Africa crisis response.",
             ["weather", "drought", "ethiopia"], "WeatherAgent"),
]

print(f"✅ Golden Dataset: {len(GOLDEN_DATASET)} test cases")
print(f"   • Happy Path: {sum(1 for t in GOLDEN_DATASET if t.category == 'happy_path')}")
print(f"   • Error Handling: {sum(1 for t in GOLDEN_DATASET if t.category == 'error_handling')}")
print(f"   • Multi-Agent: {sum(1 for t in GOLDEN_DATASET if t.category == 'multi_agent')}")
print(f"   • Edge Cases: {sum(1 for t in GOLDEN_DATASET if t.category == 'edge_case')}")


def evaluate_response(response: str, test_case: TestCase) -> dict:
    """Multi-dimensional evaluation."""
    response_lower = response.lower()
    response_len = len(response)
    
    # Validity (25%)
    error_indicators = [
        "error:" in response_lower and test_case.category != "error_handling",
        "api key" in response_lower,
        response_len < 20,
    ]
    validity_score = 0.0 if any(error_indicators) else 1.0
    
    if test_case.category == "error_handling":
        if any(x in response_lower for x in ["error", "unknown", "available", "not", "usgs"]):
            validity_score = 1.0
    
    # Relevance (35%)
    matches = sum(1 for elem in test_case.expected_elements if elem.lower() in response_lower)
    relevance_score = min(1.0, matches / len(test_case.expected_elements))
    
    # Freshness (20%)
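    current_year = datetime.utcnow().year
    freshness = [
        # Accept the current or previous year so the check does not go stale
        any(str(y) in response_lower for y in (current_year, current_year - 1)),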
        any(x in response_lower for x in ["live", "real-time", "current"]),
        any(x in response_lower for x in ["open-meteo", "usgs", "nasa", "eonet"]),
    ]
    freshness_score = min(1.0, sum(freshness) / 2)
    
    # Quality (20%)
    quality_score = min(1.0, response_len / 200)
    
    overall = (validity_score * 0.25) + (relevance_score * 0.35) + (freshness_score * 0.20) + (quality_score * 0.20)
    
    return {
        "test_id": test_case.id,
        "test_name": test_case.name,
        "category": test_case.category,
        "validity_score": round(validity_score, 2),
        "relevance_score": round(relevance_score, 2),
        "freshness_score": round(freshness_score, 2),
        "quality_score": round(quality_score, 2),
        "overall_score": round(overall, 2),
        "passed": overall >= 0.50,
    }


async def run_evaluation(test_subset: str = "all"):
    """Run evaluation suite."""
    print("\n" + "="*70)
    print("🧪 AQUA SENTINEL EVALUATION FRAMEWORK")
    print("="*70)
    
    tests = GOLDEN_DATASET if test_subset == "all" else [t for t in GOLDEN_DATASET if t.category == test_subset]
    
    print(f"\n📊 Running {len(tests)} tests (subset: {test_subset})")
    print("\n📈 Scoring Dimensions:")
    print("   • Validity (25%): Error-free response")
    print("   • Relevance (35%): Contains expected elements")
    print("   • Freshness (20%): Real-time data indicators")
    print("   • Quality (20%): Response completeness")
    print("   • Pass Threshold: Overall Score ≥ 0.50")
    print("\n" + "-"*70)
    
    results = []
    
    for i, tc in enumerate(tests):
        print(f"\n📋 [{tc.id}] {tc.name}")
        print(f"   Category: {tc.category}")
        print(f"   Query: \"{tc.query[:60]}...\"" if len(tc.query) > 60 else f"   Query: \"{tc.query}\"")
        
        if i > 0:
            await asyncio.sleep(1)
        
        try:
            response = await query_aqua_sentinel(tc.query, verbose=False, show_observability=False)
            result = evaluate_response(response, tc)
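        except Exception as e:
            # Log the failure so a zero score can be traced back to its cause
            observability.log("ERROR", f"Test {tc.id} failed: {e}")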
            result = {
                "test_id": tc.id, "test_name": tc.name, "category": tc.category,
                "validity_score": 0, "relevance_score": 0, "freshness_score": 0,
                "quality_score": 0, "overall_score": 0, "passed": False,
            }
        
        results.append(result)
        status = "✅ PASS" if result["passed"] else "❌ FAIL"
        print(f"   ├─ Validity:  {result['validity_score']:.2f}")
        print(f"   ├─ Relevance: {result['relevance_score']:.2f}")
        print(f"   ├─ Freshness: {result['freshness_score']:.2f}")
        print(f"   ├─ Quality:   {result['quality_score']:.2f}")
        print(f"   └─ Overall:   {result['overall_score']:.2f} {status}")
    
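    # Summary (return early on an empty subset so the averages never divide by zero)
    if not results:
        print(f"\n⚠️ No test cases match subset '{test_subset}'")
        return results
    
    passed = sum(1 for r in results if r["passed"])
    avg = sum(r["overall_score"] for r in results) / len(results)
    
    print("\n" + "="*70)
    print("📊 EVALUATION RESULTS SUMMARY")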
    print("="*70)
    print(f"\n   Tests Passed: {passed}/{len(results)}")
    print(f"   Average Score: {avg:.2f}")
    print(f"   Pass Rate: {(passed/len(results))*100:.1f}%")
    
    print("\n   Results by Category:")
    for cat in ["happy_path", "error_handling", "multi_agent", "edge_case"]:
        cat_results = [r for r in results if r.get("category") == cat]
        if cat_results:
            cat_passed = sum(1 for r in cat_results if r["passed"])
            cat_avg = sum(r["overall_score"] for r in cat_results) / len(cat_results)
            print(f"   • {cat}: {cat_passed}/{len(cat_results)} passed (avg: {cat_avg:.2f})")
    
    print("\n" + "-"*70)
    if passed == len(results):
        print("🎉 ALL TESTS PASSED - Evaluation Successful!")
    elif passed >= len(results) * 0.75:
        print("✅ EVALUATION PASSED - Most tests successful")
    else:
        print("⚠️ EVALUATION PARTIAL - Some tests failed")
    print("="*70)
    
    return results


print("✅ Evaluation framework ready")
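
The overall score in `evaluate_response` is a weighted sum of the four dimension scores, with a pass threshold of 0.50. A worked example of that arithmetic, using made-up dimension scores, is sketched below. `WEIGHTS`, `overall_score`, `example`, and `invalid_but_complete` are hypothetical names.

```python
# Weights copied from evaluate_response; they sum to 1.0
WEIGHTS = {"validity": 0.25, "relevance": 0.35, "freshness": 0.20, "quality": 0.20}
PASS_THRESHOLD = 0.50


def overall_score(scores):
    """Weighted sum of the four dimension scores, rounded to two decimals."""
    return round(sum(WEIGHTS[name] * scores[name] for name in WEIGHTS), 2)


# Made-up scores: 2 of 3 expected elements found, one freshness signal,
# and a 120-character response (120 / 200 = 0.6 quality)
example = {"validity": 1.0, "relevance": 2 / 3, "freshness": 0.5, "quality": 0.6}
print(overall_score(example), overall_score(example) >= PASS_THRESHOLD)

# A response that fails validity can still clear the threshold
invalid_but_complete = {"validity": 0.0, "relevance": 1.0, "freshness": 1.0, "quality": 1.0}
print(overall_score(invalid_but_complete))
```

Because validity carries only 25% of the weight, a response flagged as invalid can still pass when the other three dimensions score well. If that is not intended, one option is to require a non-zero validity score in addition to the threshold.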

In [None]:
# ============================================================================
# CELL 12: EXECUTE - Timing Comparison Demo
# ============================================================================

# Run timing comparison to demonstrate ParallelAgent speedup
timing_results = demonstrate_parallel_speedup("california")

In [None]:
# ============================================================================
# CELL 13: EXECUTE - Agent Demonstrations
# ============================================================================

async def run_demos():
    """Run all agent pattern demonstrations."""
    await ensure_session()
    
    print("\n" + "="*70)
    print("🌊 AQUA SENTINEL - LIVE DEMONSTRATIONS")
    print("="*70)
    
    # Demo 1: ParallelAgent
    print("\n" + "-"*70)
    print("📊 DEMO 1: ParallelAgent (SentinelAgent)")
    print("-"*70)
    await query_aqua_sentinel("What is the current water situation in California?")
    
    await asyncio.sleep(2)
    
    # Demo 2: SequentialAgent
    print("\n" + "-"*70)
    print("📊 DEMO 2: SequentialAgent (GuardianAgent)")
    print("-"*70)
    await query_aqua_sentinel("What's the forecast for India? Analyze risks and recommend actions.")
    
    await asyncio.sleep(2)
    
    # Demo 3: LoopAgent
    print("\n" + "-"*70)
    print("📊 DEMO 3: LoopAgent (ResponderAgent)")
    print("-"*70)
    await query_aqua_sentinel("Send drought alert to Kenya")
    
    await asyncio.sleep(2)
    
    # Demo 4: Global Disasters
    print("\n" + "-"*70)
    print("📊 DEMO 4: Global Disasters (Direct Tool)")
    print("-"*70)
    await query_aqua_sentinel("What disasters are happening globally?")
    
    print("\n" + "="*70)
    print("🎉 ALL DEMONSTRATIONS COMPLETE")
    print("="*70)


await run_demos()

In [None]:
# ============================================================================
# CELL 14: EXECUTE - Full Evaluation (12 Test Cases)
# ============================================================================

eval_results = await run_evaluation("all")

In [None]:
# ============================================================================
# CELL 15: ADVANCED CONCEPTS DEMONSTRATION
# ============================================================================

print("="*70)
print("🔧 ADVANCED ADK CONCEPTS SUMMARY")
print("="*70)

# MCP Server Info
print("\n📡 MCP (Model Context Protocol) Server:")
mcp_info = mcp_server.get_server_info()
print(f"   Server Name: {mcp_info['name']}")
print(f"   Version: {mcp_info['version']}")
print(f"   Tools Registered: {mcp_info['tools_count']}")
print("   Available Tools:")
for tool in mcp_server.list_tools():
    print(f"      • {tool['name']}: {tool['description'][:50]}...")

# Long-Running Operations
print("\n⏳ Long-Running Operations:")
# Create a sample operation
op_id = lro_manager.create_operation("regional_analysis", {"region": "california", "depth": "comprehensive"})
lro_manager.update_progress(op_id, 50, OperationStatus.RUNNING)
lro_manager.complete_operation(op_id, {"status": "analysis_complete", "findings": 5})
op_status = lro_manager.get_status(op_id)
print(f"   Operation ID: {op_status['id']}")
print(f"   Type: {op_status['type']}")
print(f"   Status: {op_status['status']}")
print(f"   Progress: {op_status['progress']}%")

# A2A Protocol
print("\n🔗 A2A (Agent-to-Agent) Protocol:")
a2a_summary = a2a_protocol.get_communication_summary()
print(f"   Registered Agents: {a2a_summary['registered_agents']}")
print(f"   Total Messages: {a2a_summary['total_messages']}")
print(f"   Message Types: {a2a_summary['message_types']}")
print(f"   Agents: {', '.join(a2a_summary['agents'])}")

# Demonstrate A2A handoff
print("\n   Demonstrating A2A Task Handoff:")
handoff_msg = a2a_protocol.handoff_task(
    "HydroOrchestrator",
    "WeatherAgent",
    {"type": "weather_analysis", "region": "kenya"},
    {"priority": "high", "reason": "drought_monitoring"}
)
print(f"   Handoff Message ID: {handoff_msg.id}")
print(f"   From: {handoff_msg.sender} → To: {handoff_msg.recipient}")

# Timing Comparisons
print("\n⏱️ Timing Comparisons (Sequential vs Parallel):")
timing_data = observability.get_timing_comparison_summary()
for t in timing_data:
    print(f"   Operation: {t['operation']}")
    print(f"      Sequential: {t['sequential_ms']:.2f}ms")
    print(f"      Parallel:   {t['parallel_ms']:.2f}ms")
    print(f"      Speedup:    {t['speedup']:.2f}x")

print("\n" + "="*70)

In [None]:
# ============================================================================
# CELL 16: FINAL OBSERVABILITY DATA
# ============================================================================

print("="*70)
print("📈 FINAL OBSERVABILITY REPORT")
print("="*70)

# Metrics Summary
print("\n📊 METRICS SUMMARY:")
metrics = observability.get_metrics_summary()
for key, value in metrics.items():
    print(f"   {key}: {value}")

# Recent Trace
print("\n🔗 MOST RECENT TRACE:")
trace = observability.get_trace_summary()
for key, value in trace.items():
    if key != "spans":
        print(f"   {key}: {value}")

# Recent Logs
print("\n📝 RECENT LOGS:")
for log in observability.logs[-5:]:
    print(f"   [{log['level']}] {log['message']}")

# Sent Alerts
print("\n🚨 SENT ALERTS:")
for alert in ALERT_LOG:
    print(f"   {alert['alert_id']}: {alert['alert_type']} to {alert['region']} ({alert['priority']})")

print("\n" + "="*70)
print("✅ AQUA SENTINEL COMPLETE")
print("="*70)

---

## 🚀 Deployment

A deployment of AQUA SENTINEL to **Vertex AI Agent Engine** was attempted.

### Deployment Evidence
- **Platform**: Google Cloud - Vertex AI Agent Engine
- **Project ID**: `aqua-sentinel-480105`
- **Region**: `us-central1`
- **Resource ID**: `projects/127921942048/locations/us-central1/reasoningEngines/4347921975016947712`
- **Staging Bucket**: `gs://aqua-sentinel-staging`

### Deployment Process
1. Created a GCP project using $300 in free credits
2. Enabled the Vertex AI API
3. Created a Cloud Storage staging bucket
4. Structured the agent code for ADK deployment
5. Ran the `adk deploy agent_engine` command
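
Step 5 can be scripted. The sketch below only assembles the command line from the values listed under Deployment Evidence and prints it; nothing is executed. The flag names (`--project`, `--region`, `--staging_bucket`, `--display_name`) are assumed from the ADK CLI and should be checked against `adk deploy agent_engine --help` for the installed version. The agent folder `aqua_sentinel` and the display name are hypothetical.

```python
import shlex


def build_deploy_command(project, region, staging_bucket, agent_dir, display_name):
    """Return the argv list for an `adk deploy agent_engine` call (not executed here)."""
    return [
        "adk", "deploy", "agent_engine",
        f"--project={project}",                # assumed flag name
        f"--region={region}",                  # assumed flag name
        f"--staging_bucket={staging_bucket}",  # assumed flag name
        f"--display_name={display_name}",      # assumed flag name
        agent_dir,
    ]


cmd = build_deploy_command(
    project="aqua-sentinel-480105",
    region="us-central1",
    staging_bucket="gs://aqua-sentinel-staging",
    agent_dir="aqua_sentinel",       # hypothetical folder holding the agent code
    display_name="AQUA SENTINEL",    # hypothetical display name
)

# shlex.join quotes arguments so the printed line can be pasted into a shell.
print(shlex.join(cmd))
```

Keeping the command as a list makes it easy to pass to `subprocess.run(cmd, check=True)` later without shell-quoting problems.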

### Deployment Configuration
```json
{
    "min_instances": 0,
    "max_instances": 1,
    "resource_limits": {"cpu": "1", "memory": "1Gi"}
}
```
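
Read literally, `min_instances: 0` allows the service to scale down to zero when idle and `max_instances: 1` caps it at a single replica. A minimal sanity check for a config of this shape could look like the sketch below. The `validate_scaling_config` helper and its rules are illustrative assumptions, not part of ADK or Vertex AI.

```python
import json

# The configuration shown above, embedded as a string for the example.
CONFIG_JSON = """
{
    "min_instances": 0,
    "max_instances": 1,
    "resource_limits": {"cpu": "1", "memory": "1Gi"}
}
"""


def validate_scaling_config(config):
    """Return a list of problems found in a scaling config (empty list means OK)."""
    problems = []
    lo = config.get("min_instances")
    hi = config.get("max_instances")
    if not isinstance(lo, int) or lo < 0:
        problems.append("min_instances must be a non-negative integer")
    if not isinstance(hi, int) or hi < 1:
        problems.append("max_instances must be an integer >= 1")
    if isinstance(lo, int) and isinstance(hi, int) and lo > hi:
        problems.append("min_instances must not exceed max_instances")
    limits = config.get("resource_limits", {})
    for key in ("cpu", "memory"):
        if key not in limits:
            problems.append(f"resource_limits is missing '{key}'")
    return problems


config = json.loads(CONFIG_JSON)
problems = validate_scaling_config(config)
print("config OK" if not problems else problems)
```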

> **Note**: The agent was created but encountered a startup issue. See [GCP troubleshooting docs](https://cloud.google.com/vertex-ai/generative-ai/docs/agent-engine/troubleshooting/deploy).

---

## 📚 References

- [Google ADK Documentation](https://google.github.io/adk-docs/)
- [Vertex AI Agent Engine](https://cloud.google.com/vertex-ai/generative-ai/docs/agent-engine/overview)
- [Model Context Protocol](https://modelcontextprotocol.io/)
- [UNICEF Horn of Africa Drought](https://www.unicef.org/stories/climate-drought-horn-of-africa)
- [#TeamWater Campaign](https://teamwater.org)

---

## 👨‍💻 Author

**Jai Adithya Ram Nayani**  
Computer Science Master's Student  
Kaggle AI Agents Intensive 2025

---