Run this for Kaggle setup with the Google API Key in the secrets

In [1]:
import os
from kaggle_secrets import UserSecretsClient

try:
    GOOGLE_API_KEY = UserSecretsClient().get_secret("GOOGLE_API_KEY")
    os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY
    print("‚úÖ Gemini API key setup complete.")
except Exception as e:
    print(
        f"üîë Authentication Error: Please make sure you have added 'GOOGLE_API_KEY' to your Kaggle secrets. Details: {e}"
    )

‚úÖ Gemini API key setup complete.


Run this for local install with a .env file

In [1]:
import os
from pathlib import Path

# Try to load from .env file for local development
try:
    from dotenv import load_dotenv
    env_path = Path('.') / '.env'
    if env_path.exists():
        load_dotenv(env_path)
        print("‚úÖ Loaded environment variables from .env file")
    else:
        print("‚ö†Ô∏è  .env file not found, trying other methods...")
except ImportError:
    print("‚ö†Ô∏è  python-dotenv not installed, trying other methods...")
except Exception as e:
    print(f"‚ö†Ô∏è  Error loading .env: {e}")

# Try Kaggle secrets (for Kaggle notebooks)
if "GOOGLE_API_KEY" not in os.environ:
    try:
        from kaggle_secrets import UserSecretsClient
        GOOGLE_API_KEY = UserSecretsClient().get_secret("GOOGLE_API_KEY")
        os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY
        print("‚úÖ Loaded GOOGLE_API_KEY from Kaggle secrets")
    except ImportError:
        pass  # Not in Kaggle environment
    except Exception as e:
        print(f"‚ö†Ô∏è  Could not load from Kaggle secrets: {e}")

# Final check
if "GOOGLE_API_KEY" in os.environ:
    print("‚úÖ Setup and authentication complete.")
    print(f"   GOOGLE_API_KEY is set (length: {len(os.environ['GOOGLE_API_KEY'])})")
else:
    print("‚ùå GOOGLE_API_KEY is required. Please set it in .env file or as environment variable.")
    print("   For local development, create a .env file with: GOOGLE_API_KEY=your_key_here")

‚úÖ Loaded environment variables from .env file
‚úÖ Setup and authentication complete.
   GOOGLE_API_KEY is set (length: 39)


In [6]:
import json
import requests
import subprocess
import time
import uuid
import asyncio
import nest_asyncio

from google.adk.agents import LlmAgent
from google.adk.agents.remote_a2a_agent import (
    RemoteA2aAgent,
    AGENT_CARD_WELL_KNOWN_PATH,
)

from google.adk.a2a.utils.agent_to_a2a import to_a2a
from google.adk.models.google_llm import Gemini
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types

# Hide additional warnings in the notebook
import warnings

warnings.filterwarnings("ignore")

# Enable nested event loops for Jupyter notebooks
nest_asyncio.apply()

print("‚úÖ ADK components imported successfully.")
print("‚úÖ Async support enabled for Jupyter notebooks")

‚úÖ ADK components imported successfully.
‚úÖ Async support enabled for Jupyter notebooks


In [3]:
retry_config = types.HttpRetryOptions(
    attempts=5,  # Maximum retry attempts
    exp_base=7,  # Delay multiplier
    initial_delay=1,
    http_status_codes=[429, 500, 503, 504],  # Retry on these HTTP errors
)

# Focus Filter - Intelligent Notification Filtering Agent

This notebook implements a **multi-agent system** that intelligently filters and manages notifications by:
- Classifying notifications as urgent, irrelevant, or less urgent
- Taking appropriate actions (pass through, block, or store in memory)
- Learning from patterns to improve over time

## Contest Track: Concierge
## Key Concepts Demonstrated:
1. **Multi-agent system** (sequential agents: Classification ‚Üí Action ‚Üí Memory)
2. **Custom tools** for notification management
3. **Sessions & Memory** (long-term memory storage)
4. **Observability** (logging and tracing)
5. **Agent evaluation** (LLM-as-judge)

## Multi-Agent Architecture

This implementation uses a **sequential multi-agent system** with three specialized agents:

1. **Classification Agent**: Analyzes notifications and determines urgency category
2. **Action Agent**: Executes appropriate actions based on classification results
3. **Memory Agent**: Handles memory extraction, consolidation, and storage

The agents work sequentially: Classification ‚Üí Action ‚Üí Memory (when needed)


In [4]:
# Notification Data Structure and Memory Management
from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass

@dataclass
class Notification:
    """Represents a notification from an app or service"""
    id: str
    app: str
    title: str
    body: str
    timestamp: str
    category: Optional[str] = None
    
    def to_dict(self) -> Dict:
        return {
            "id": self.id,
            "app": self.app,
            "title": self.title,
            "body": self.body,
            "timestamp": self.timestamp,
            "category": self.category
        }
    
    def __str__(self) -> str:
        return f"[{self.app}] {self.title}: {self.body}"

# Simple in-memory storage for demonstration
class NotificationMemory:
    """Simple memory store for less urgent notifications"""
    def __init__(self):
        self.memories: List[Dict] = []
    
    def store(self, notification: Notification, extracted_fact: str):
        """Store a notification fact in memory"""
        memory = {
            "notification_id": notification.id,
            "app": notification.app,
            "extracted_fact": extracted_fact,
            "timestamp": notification.timestamp,
            "stored_at": datetime.now().isoformat()
        }
        self.memories.append(memory)
        print(f"üíæ Stored memory: {extracted_fact}")
        return memory
    
    def get_all(self) -> List[Dict]:
        """Retrieve all stored memories"""
        return self.memories
    
    def search(self, query: str) -> List[Dict]:
        """Simple keyword search (would use vector DB in production)"""
        query_lower = query.lower()
        return [
            m for m in self.memories
            if query_lower in m["extracted_fact"].lower() or 
               query_lower in m["app"].lower()
        ]

# Initialize memory store
memory_store = NotificationMemory()
print("‚úÖ Notification and Memory classes initialized")


‚úÖ Notification and Memory classes initialized


In [7]:
# Custom Tools for Notification Management

def display_urgent_notification(app: str, title: str, body: str) -> dict:
    """
    Display an urgent notification to the user immediately.
    This tool is called when a notification requires immediate attention.
    
    Args:
        app: The name of the app sending the notification
        title: The notification title
        body: The notification body text
        
    Returns:
        Dictionary with status and result information.
        Success: {"status": "success", "message": "Notification displayed"}
        Error: {"status": "error", "error_message": "..."}
    """
    print(f"\n{'='*60}")
    print(f"üö® URGENT NOTIFICATION")
    print(f"{'='*60}")
    print(f"App: {app}")
    print(f"Title: {title}")
    print(f"Body: {body}")
    print(f"{'='*60}\n")
    return {"status": "success", "message": f"Urgent notification from {app} displayed to user"}

def block_notification(app: str, title: str, reason: str) -> dict:
    """
    Block/suppress an irrelevant notification.
    This tool is called when a notification is determined to be noise.
    
    Args:
        app: The name of the app sending the notification
        title: The notification title
        reason: The reason for blocking (e.g., "social media noise", "promotional content")
        
    Returns:
        Dictionary with status and result information.
        Success: {"status": "success", "message": "Notification blocked"}
        Error: {"status": "error", "error_message": "..."}
    """
    print(f"üö´ Blocked: [{app}] {title} - {reason}")
    return {"status": "success", "message": f"Notification from {app} blocked: {reason}"}

def save_notification_memory(app: str, title: str, body: str, extracted_fact: str) -> dict:
    """
    Save a less urgent notification as a memory for later review.
    This tool extracts key information and stores it for future reference.
    
    Args:
        app: The name of the app sending the notification
        title: The notification title
        body: The notification body text
        extracted_fact: The key fact or information extracted from the notification
        
    Returns:
        Dictionary with status and result information.
        Success: {"status": "success", "message": "Memory stored"}
        Error: {"status": "error", "error_message": "..."}
    """
    # Create a temporary notification object for storage
    temp_notification = Notification(
        id=str(uuid.uuid4()),
        app=app,
        title=title,
        body=body,
        timestamp=datetime.now().isoformat()
    )
    memory_store.store(temp_notification, extracted_fact)
    return {"status": "success", "message": f"Memory stored: {extracted_fact}"}

print("‚úÖ Custom tools defined")


‚úÖ Custom tools defined


In [8]:
# ============================================================================
# MULTI-AGENT ARCHITECTURE: Sequential Agent System
# ============================================================================
# This demonstrates a multi-agent system with three specialized agents:
# 1. Classification Agent: Analyzes and classifies notifications
# 2. Action Agent: Executes actions based on classification
# 3. Memory Agent: Handles memory extraction and storage

# ----------------------------------------------------------------------------
# Agent 1: Classification Agent
# ----------------------------------------------------------------------------
# This agent's sole responsibility is to analyze notifications and determine
# their urgency category. It does NOT take actions, only classifies.

def classify_notification(app: str, title: str, body: str) -> dict:
    """
    Classify a notification into one of three categories: URGENT, IRRELEVANT, or LESS_URGENT.
    This tool is used by the Classification Agent to output its decision.
    
    Args:
        app: The name of the app sending the notification
        title: The notification title
        body: The notification body text
        
    Returns:
        Dictionary with classification result.
    """
    # This is a tool that the classification agent will call to output its decision
    # The actual classification logic is in the agent's reasoning
    pass  # Placeholder - agent will call this with classification result

classification_agent = LlmAgent(
    name="classification_agent",
    model=Gemini(model="gemini-2.0-flash-exp", retry_options=retry_config),
    instruction="""You are a Classification Agent in the Focus Filter system.

Your ONLY job is to analyze notifications and classify them into one of three categories:

1. **URGENT**: Requires immediate attention or action
   - Security alerts (bank, account access)
   - Critical deadlines or time-sensitive tasks
   - Emergency communications
   - Important personal messages requiring immediate response

2. **IRRELEVANT**: Noise that should be blocked
   - Social media likes, follows, generic updates
   - Marketing/promotional content
   - Low-value informational updates
   - Spam or unwanted notifications

3. **LESS_URGENT**: Important but not immediate - should be stored in memory
   - Project updates, deadline changes
   - Informational updates worth remembering
   - Non-critical but useful information
   - Things the user might want to reference later

When you receive a notification, analyze the app, title, and body text, then respond with:
- The classification (URGENT, IRRELEVANT, or LESS_URGENT)
- A brief reasoning for your decision
- If LESS_URGENT, also provide the key fact or information that should be extracted

Format your response as:
Classification: [URGENT/IRRELEVANT/LESS_URGENT]
Reasoning: [your reasoning]
Key Fact (if LESS_URGENT): [extracted fact]

Be conservative with URGENT - only use it for truly time-sensitive or critical items.""",
    tools=[],  # Classification agent doesn't take actions, only classifies
)

# ----------------------------------------------------------------------------
# Agent 2: Action Agent
# ----------------------------------------------------------------------------
# This agent receives classification results and executes the appropriate action

action_agent = LlmAgent(
    name="action_agent",
    model=Gemini(model="gemini-2.0-flash-exp", retry_options=retry_config),
    instruction="""You are an Action Agent in the Focus Filter system.

Your job is to execute actions based on classification results from the Classification Agent.

You will receive:
- The notification details (app, title, body)
- The classification result (URGENT, IRRELEVANT, or LESS_URGENT)
- Any additional context (like extracted facts for LESS_URGENT items)

Based on the classification, you must:
1. For URGENT: Call `display_urgent_notification(app, title, body)`
2. For IRRELEVANT: Call `block_notification(app, title, reason)` with a clear reason
3. For LESS_URGENT: Call `save_notification_memory(app, title, body, extracted_fact)` with the key fact

Execute the appropriate action immediately based on the classification provided.""",
    tools=[
        display_urgent_notification,
        block_notification,
        save_notification_memory,
    ],
)

# ----------------------------------------------------------------------------
# Agent 3: Memory Agent (for advanced memory operations)
# ----------------------------------------------------------------------------
# This agent handles memory extraction, consolidation, and retrieval

def extract_memory_fact(app: str, title: str, body: str) -> dict:
    """
    Extract the key fact or information from a notification for memory storage.
    This tool is used by the Memory Agent to extract structured information.
    
    Args:
        app: The name of the app sending the notification
        title: The notification title
        body: The notification body text
        
    Returns:
        Dictionary with extracted fact.
    """
    # This tool allows the memory agent to output extracted facts
    pass  # Placeholder - agent will call this with extracted fact

memory_agent = LlmAgent(
    name="memory_agent",
    model=Gemini(model="gemini-2.0-flash-exp", retry_options=retry_config),
    instruction="""You are a Memory Agent in the Focus Filter system.

Your job is to extract, consolidate, and manage memories from notifications.

When you receive a LESS_URGENT notification:
1. Extract the most important fact or piece of information
2. Format it as a concise, searchable memory
3. Ensure it's useful for future reference

For example:
- "Project deadline moved to Tuesday" (not "Your project deadline has moved to Tuesday")
- "New team member joined: Alice" (not the full notification text)

Focus on extracting actionable, referenceable facts that the user might want to recall later.""",
    tools=[],  # Memory agent primarily extracts facts (can be extended with retrieval tools)
)

print("‚úÖ Multi-agent system created:")
print("  ‚Ä¢ Classification Agent - Analyzes and classifies notifications")
print("  ‚Ä¢ Action Agent - Executes actions based on classification")
print("  ‚Ä¢ Memory Agent - Handles memory extraction and consolidation")


‚úÖ Multi-agent system created:
  ‚Ä¢ Classification Agent - Analyzes and classifies notifications
  ‚Ä¢ Action Agent - Executes actions based on classification
  ‚Ä¢ Memory Agent - Handles memory extraction and consolidation


In [None]:
# ============================================================================
# OBSERVABILITY: Logging, Tracing, and Metrics
# ============================================================================
# This module provides comprehensive observability for the multi-agent system

import logging
from collections import defaultdict
from datetime import datetime
from typing import Dict, List, Optional
import json

# Configure structured logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("FocusFilter")

class ObservabilityManager:
    """Manages logging, tracing, and metrics for the multi-agent system"""
    
    def __init__(self):
        self.traces: List[Dict] = []
        self.metrics = {
            "total_notifications": 0,
            "classifications": defaultdict(int),
            "actions": defaultdict(int),
            "agent_timings": defaultdict(list),
            "errors": []
        }
        self.current_trace: Optional[Dict] = None
    
    def start_trace(self, notification_id: str, notification_text: str):
        """Start a new trace for a notification processing session"""
        self.current_trace = {
            "trace_id": f"trace_{uuid.uuid4().hex[:8]}",
            "notification_id": notification_id,
            "notification_text": notification_text,
            "start_time": datetime.now().isoformat(),
            "agents": [],
            "classification": None,
            "action": None,
            "memory_operation": None,
            "end_time": None,
            "duration_ms": None,
            "errors": []
        }
        logger.info(f"üîç Trace started: {self.current_trace['trace_id']}")
    
    def log_agent_step(self, agent_name: str, step: str, input_data: Dict, output_data: Dict, duration_ms: float):
        """Log an agent step in the current trace"""
        if self.current_trace is None:
            return
        
        agent_step = {
            "agent": agent_name,
            "step": step,
            "timestamp": datetime.now().isoformat(),
            "input": input_data,
            "output": output_data,
            "duration_ms": duration_ms
        }
        self.current_trace["agents"].append(agent_step)
        self.metrics["agent_timings"][f"{agent_name}_{step}"].append(duration_ms)
        
        logger.info(f"üìä {agent_name} - {step} completed in {duration_ms:.2f}ms")
    
    def log_classification(self, classification: str, reasoning: str, confidence: Optional[float] = None):
        """Log classification result"""
        if self.current_trace is None:
            return
        
        self.current_trace["classification"] = {
            "result": classification,
            "reasoning": reasoning,
            "confidence": confidence,
            "timestamp": datetime.now().isoformat()
        }
        self.metrics["classifications"][classification] += 1
        
        logger.info(f"üè∑Ô∏è  Classification: {classification} - {reasoning[:100]}")
    
    def log_action(self, action_type: str, action_details: Dict):
        """Log action execution"""
        if self.current_trace is None:
            return
        
        self.current_trace["action"] = {
            "type": action_type,
            "details": action_details,
            "timestamp": datetime.now().isoformat()
        }
        self.metrics["actions"][action_type] += 1
        
        logger.info(f"‚ö° Action: {action_type} - {json.dumps(action_details, default=str)}")
    
    def log_memory_operation(self, operation: str, details: Dict):
        """Log memory operation"""
        if self.current_trace is None:
            return
        
        self.current_trace["memory_operation"] = {
            "operation": operation,
            "details": details,
            "timestamp": datetime.now().isoformat()
        }
        
        logger.info(f"üíæ Memory: {operation} - {json.dumps(details, default=str)}")
    
    def log_error(self, error_type: str, error_message: str, context: Optional[Dict] = None):
        """Log an error"""
        error_entry = {
            "type": error_type,
            "message": error_message,
            "context": context or {},
            "timestamp": datetime.now().isoformat()
        }
        self.metrics["errors"].append(error_entry)
        
        if self.current_trace:
            self.current_trace["errors"].append(error_entry)
        
        logger.error(f"‚ùå Error [{error_type}]: {error_message}")
    
    def end_trace(self):
        """End the current trace and store it"""
        if self.current_trace is None:
            return
        
        end_time = datetime.now()
        start_time = datetime.fromisoformat(self.current_trace["start_time"])
        duration_ms = (end_time - start_time).total_seconds() * 1000
        
        self.current_trace["end_time"] = end_time.isoformat()
        self.current_trace["duration_ms"] = duration_ms
        
        self.traces.append(self.current_trace.copy())
        self.metrics["total_notifications"] += 1
        
        logger.info(f"‚úÖ Trace completed: {self.current_trace['trace_id']} in {duration_ms:.2f}ms")
        self.current_trace = None
    
    def get_metrics_summary(self) -> Dict:
        """Get a summary of all metrics"""
        avg_timings = {}
        for key, timings in self.metrics["agent_timings"].items():
            if timings:
                avg_timings[key] = {
                    "avg_ms": sum(timings) / len(timings),
                    "min_ms": min(timings),
                    "max_ms": max(timings),
                    "count": len(timings)
                }
        
        return {
            "total_notifications": self.metrics["total_notifications"],
            "classifications": dict(self.metrics["classifications"]),
            "actions": dict(self.metrics["actions"]),
            "average_timings": avg_timings,
            "error_count": len(self.metrics["errors"]),
            "total_traces": len(self.traces)
        }
    
    def get_recent_traces(self, limit: int = 10) -> List[Dict]:
        """Get the most recent traces"""
        return self.traces[-limit:] if self.traces else []
    
    def print_metrics_summary(self):
        """Print a formatted metrics summary"""
        summary = self.get_metrics_summary()
        
        print("\n" + "="*70)
        print("üìä OBSERVABILITY METRICS SUMMARY")
        print("="*70)
        print(f"\nüìà Total Notifications Processed: {summary['total_notifications']}")
        print(f"üìù Total Traces Captured: {summary['total_traces']}")
        
        print("\nüè∑Ô∏è  Classification Distribution:")
        for classification, count in summary['classifications'].items():
            percentage = (count / summary['total_notifications'] * 100) if summary['total_notifications'] > 0 else 0
            print(f"   {classification}: {count} ({percentage:.1f}%)")
        
        print("\n‚ö° Action Distribution:")
        for action, count in summary['actions'].items():
            percentage = (count / summary['total_notifications'] * 100) if summary['total_notifications'] > 0 else 0
            print(f"   {action}: {count} ({percentage:.1f}%)")
        
        if summary['average_timings']:
            print("\n‚è±Ô∏è  Performance Metrics:")
            for key, timing in summary['average_timings'].items():
                print(f"   {key}:")
                print(f"      Average: {timing['avg_ms']:.2f}ms")
                print(f"      Min: {timing['min_ms']:.2f}ms")
                print(f"      Max: {timing['max_ms']:.2f}ms")
                print(f"      Count: {timing['count']}")
        
        if summary['error_count'] > 0:
            print(f"\n‚ùå Errors: {summary['error_count']}")
            for error in self.metrics['errors'][-5:]:  # Show last 5 errors
                print(f"   [{error['type']}] {error['message']}")
        
        print("="*70 + "\n")

# Initialize observability manager
obs_manager = ObservabilityManager()

print("‚úÖ Observability system initialized")
print("   ‚Ä¢ Structured logging enabled")
print("   ‚Ä¢ Trace capture ready")
print("   ‚Ä¢ Metrics collection active")


In [None]:
# ============================================================================
# ORCHESTRATION LAYER: Sequential Agent Coordination
# ============================================================================
# This layer coordinates the sequential flow: Classification ‚Üí Action ‚Üí Memory

# Create runners for each agent
classification_runner = Runner(
    app_name="FocusFilter_Classification",
    agent=classification_agent,
    session_service=InMemorySessionService(),
)

action_runner = Runner(
    app_name="FocusFilter_Action",
    agent=action_agent,
    session_service=InMemorySessionService(),
)

memory_runner = Runner(
    app_name="FocusFilter_Memory",
    agent=memory_agent,
    session_service=InMemorySessionService(),
)

# Main session service for the orchestration
session_service = InMemorySessionService()

def parse_classification_result(text: str) -> dict:
    """Parse the classification agent's response to extract structured data"""
    result = {
        "classification": None,
        "reasoning": None,
        "key_fact": None
    }
    
    text_lower = text.lower()
    
    # Try to find classification - look for URGENT, IRRELEVANT, or LESS_URGENT
    for classification in ["URGENT", "IRRELEVANT", "LESS_URGENT", "LESS URGENT"]:
        if classification.lower() in text_lower:
            # Check if it's in a structured format like "Classification: URGENT"
            if "classification:" in text_lower:
                idx = text_lower.find("classification:")
                after_colon = text[text_lower.find("classification:") + len("classification:"):].strip()
                # Extract the first word or phrase after the colon
                words = after_colon.split()
                if words:
                    result["classification"] = words[0].upper().rstrip(".,;")
                    break
            else:
                # Look for the classification word in context
                result["classification"] = classification.upper()
                break
    
    # Extract reasoning
    if "reasoning:" in text_lower:
        idx = text_lower.find("reasoning:")
        reasoning_text = text[idx + len("reasoning:"):].strip()
        # Take up to the next section or end of text
        next_section = min(
            reasoning_text.find("\n\n"),
            reasoning_text.find("Key Fact"),
            reasoning_text.find("key fact"),
            len(reasoning_text)
        )
        if next_section > 0:
            result["reasoning"] = reasoning_text[:next_section].strip()
        else:
            result["reasoning"] = reasoning_text.strip()
    
    # Extract key fact (for LESS_URGENT items)
    if "key fact" in text_lower:
        idx = text_lower.find("key fact")
        fact_text = text[idx + len("key fact"):].strip()
        # Remove colon if present
        if fact_text.startswith(":"):
            fact_text = fact_text[1:].strip()
        # Take up to next line break or end
        next_line = fact_text.find("\n")
        if next_line > 0:
            result["key_fact"] = fact_text[:next_line].strip()
        else:
            result["key_fact"] = fact_text.strip()
    
    # Fallback: if classification not found, try to infer from text
    if result["classification"] is None:
        if any(word in text_lower for word in ["urgent", "immediate", "critical", "security alert"]):
            result["classification"] = "URGENT"
        elif any(word in text_lower for word in ["irrelevant", "noise", "block", "spam"]):
            result["classification"] = "IRRELEVANT"
        elif any(word in text_lower for word in ["less urgent", "store", "memory", "later"]):
            result["classification"] = "LESS_URGENT"
    
    return result

async def process_notification_multi_agent(
    notification_text: str,
    user_id: str = "default",
    session_id: str = None
):
    """
    Orchestrate the sequential multi-agent processing of a notification.
    
    Flow:
    1. Classification Agent analyzes and classifies
    2. Action Agent executes appropriate action
    3. Memory Agent (if needed) handles memory extraction
    
    Args:
        notification_text: The notification message to process
        user_id: User identifier
        session_id: Optional session identifier (auto-generated if None)
    """
    import time
    
    if session_id is None:
        session_id = f"session_{uuid.uuid4().hex[:8]}"
    
    # Start observability trace
    notification_id = f"notif_{uuid.uuid4().hex[:8]}"
    obs_manager.start_trace(notification_id, notification_text)
    
    print(f"\n{'='*70}")
    print(f"üîç Processing notification with multi-agent system")
    print(f"{'='*70}\n")
    
    # Step 1: Classification Agent
    print("üìä Step 1: Classification Agent analyzing...")
    classification_start = time.time()
    classification_result_text = ""
    
    try:
        session = await classification_runner.session_service.create_session(
            app_name=classification_runner.app_name,
            user_id=user_id,
            session_id=f"{session_id}_classify"
        )
    except Exception as e:
        obs_manager.log_error("session_creation", f"Failed to create classification session: {e}")
        session = await classification_runner.session_service.get_session(
            app_name=classification_runner.app_name,
            user_id=user_id,
            session_id=f"{session_id}_classify"
        )
    
    message = types.Content(
        role="user",
        parts=[types.Part(text=notification_text)]
    )
    
    try:
        async for event in classification_runner.run_async(
            user_id=user_id,
            session_id=session.id,
            new_message=message
        ):
            if event.content and event.content.parts:
                if event.content.parts[0].text:
                    classification_result_text += event.content.parts[0].text
    except Exception as e:
        obs_manager.log_error("classification_error", f"Classification agent error: {e}", {"session_id": session_id})
        raise
    
    classification_duration = (time.time() - classification_start) * 1000
    print(f"‚úÖ Classification complete")
    print(f"   Result: {classification_result_text[:200]}...\n")
    
    # Parse classification result
    classification_data = parse_classification_result(classification_result_text)
    
    # Log classification step
    obs_manager.log_agent_step(
        agent_name="Classification Agent",
        step="classify",
        input_data={"notification_text": notification_text[:200]},
        output_data=classification_data,
        duration_ms=classification_duration
    )
    
    # Log classification result
    if classification_data['classification']:
        obs_manager.log_classification(
            classification=classification_data['classification'],
            reasoning=classification_data.get('reasoning', 'No reasoning provided')
        )
    
    # Extract notification details from input
    # Format: "I received a notification: App: X, Title: Y, Body: Z"
    app = "Unknown"
    title = "Unknown"
    body = "Unknown"
    
    if "App:" in notification_text:
        parts = notification_text.split("App:")[-1]
        if "Title:" in parts:
            app = parts.split("Title:")[0].strip().rstrip(",")
            title_part = parts.split("Title:")[-1]
            if "Body:" in title_part:
                title = title_part.split("Body:")[0].strip().rstrip(",")
                body = title_part.split("Body:")[-1].strip()
    
    # Step 2: Action Agent
    print("‚ö° Step 2: Action Agent executing...")
    action_start = time.time()
    
    # Prepare action message with classification result
    action_message_text = f"""Notification Details:
- App: {app}
- Title: {title}
- Body: {body}

Classification Result:
- Classification: {classification_data['classification']}
- Reasoning: {classification_data['reasoning']}
{f"- Key Fact: {classification_data['key_fact']}" if classification_data['key_fact'] else ""}

Please execute the appropriate action based on the classification."""
    
    try:
        action_session = await action_runner.session_service.create_session(
            app_name=action_runner.app_name,
            user_id=user_id,
            session_id=f"{session_id}_action"
        )
    except Exception as e:
        obs_manager.log_error("session_creation", f"Failed to create action session: {e}")
        action_session = await action_runner.session_service.get_session(
            app_name=action_runner.app_name,
            user_id=user_id,
            session_id=f"{session_id}_action"
        )
    
    action_message = types.Content(
        role="user",
        parts=[types.Part(text=action_message_text)]
    )
    
    action_output = ""
    try:
        async for event in action_runner.run_async(
            user_id=user_id,
            session_id=action_session.id,
            new_message=action_message
        ):
            if event.content and event.content.parts:
                if event.content.parts[0].text:
                    action_output += event.content.parts[0].text
                    print(event.content.parts[0].text)
    except Exception as e:
        obs_manager.log_error("action_error", f"Action agent error: {e}", {"session_id": session_id})
        raise
    
    action_duration = (time.time() - action_start) * 1000
    
    # Determine action type from output
    action_type = "unknown"
    if "display_urgent_notification" in action_output.lower() or "urgent notification" in action_output.lower():
        action_type = "display_urgent"
    elif "block" in action_output.lower() or "blocked" in action_output.lower():
        action_type = "block"
    elif "save" in action_output.lower() or "memory" in action_output.lower() or "stored" in action_output.lower():
        action_type = "save_memory"
    
    # Log action step
    obs_manager.log_agent_step(
        agent_name="Action Agent",
        step="execute",
        input_data={
            "classification": classification_data['classification'],
            "app": app,
            "title": title
        },
        output_data={"action_type": action_type, "output": action_output[:200]},
        duration_ms=action_duration
    )
    
    # Log action
    obs_manager.log_action(
        action_type=action_type,
        action_details={
            "app": app,
            "title": title,
            "classification": classification_data['classification']
        }
    )
    
    print(f"‚úÖ Action execution complete\n")
    
    # Step 3: Memory Agent (only for LESS_URGENT items that need extraction refinement)
    if classification_data['classification'] == 'LESS_URGENT':
        print("üíæ Step 3: Memory Agent extracting fact...")
        memory_start = time.time()
        
        # Log memory operation
        if classification_data.get('key_fact'):
            obs_manager.log_memory_operation(
                operation="store",
                details={
                    "app": app,
                    "extracted_fact": classification_data['key_fact'],
                    "title": title
                }
            )
        
        memory_duration = (time.time() - memory_start) * 1000
        obs_manager.log_agent_step(
            agent_name="Memory Agent",
            step="extract",
            input_data={"notification": f"{app}: {title}"},
            output_data={"extracted_fact": classification_data.get('key_fact', '')},
            duration_ms=memory_duration
        )
        
        # The action agent already stored the memory, but we can use memory agent
        # for additional processing if needed (consolidation, deduplication, etc.)
        # For now, we'll skip this step as the action agent handles storage
        print("‚úÖ Memory extraction handled by Action Agent\n")
    
    # End trace
    obs_manager.end_trace()
    
    print(f"{'='*70}")
    print(f"‚úÖ Multi-agent processing complete")
    print(f"{'='*70}\n")

# Legacy helper function for backward compatibility (uses single agent approach)
async def run_notification_test(runner_instance, session_service, user_id, session_id, message_text):
    """Helper function to test notification processing (legacy single-agent)"""
    # Create or get session
    try:
        session = await session_service.create_session(
            app_name=runner_instance.app_name, user_id=user_id, session_id=session_id
        )
    except:
        session = await session_service.get_session(
            app_name=runner_instance.app_name, user_id=user_id, session_id=session_id
        )
    
    # Convert message to Content format
    message = types.Content(
        role="user",
        parts=[types.Part(text=message_text)]
    )
    
    # Process the notification
    async for event in runner_instance.run_async(
        user_id=user_id, session_id=session.id, new_message=message
    ):
        if event.content and event.content.parts:
            if event.content.parts[0].text:
                print(event.content.parts[0].text)

print("‚úÖ Multi-agent orchestration layer initialized")
print("‚úÖ Sequential agent coordination ready")
print("‚úÖ Helper functions for async operations ready")


‚úÖ Multi-agent orchestration layer initialized
‚úÖ Sequential agent coordination ready
‚úÖ Helper functions for async operations ready


## Testing the Agent

Let's test the agent with sample notifications:


In [10]:
# Test notification 1: Urgent security alert
USER_ID = "test"

await process_notification_multi_agent(
    "I received a notification: App: Banking App, Title: Security Alert, Body: Your bank flagged suspicious activity on your account. Please verify immediately.",
    user_id=USER_ID,
    session_id="test_session_1"
)

print("\n" + "="*70)
print("Test 1 Complete")
print("="*70)



üîç Processing notification with multi-agent system

üìä Step 1: Classification Agent analyzing...
‚úÖ Classification complete
   Result: Classification: URGENT
Reasoning: A security alert from a bank indicating suspicious activity requires immediate attention to prevent potential fraud or loss.
Key Fact (if LESS_URGENT): N/A
...

‚ö° Step 2: Action Agent executing...





üö® URGENT NOTIFICATION
App: Banking App
Title: Security Alert
Body: Your bank flagged suspicious activity on your account. Please verify immediately.

OK. I have displayed the urgent notification to the user.

‚úÖ Action execution complete

‚úÖ Multi-agent processing complete


Test 1 Complete


In [11]:
# Test notification 2: Irrelevant social media
await process_notification_multi_agent(
    "I received a notification: App: Social Media, Title: New Like, Body: 3 new people liked your photo.",
    user_id=USER_ID,
    session_id="test_session_2"
)

print("\n" + "="*70)
print("Test 2 Complete")
print("="*70)



üîç Processing notification with multi-agent system

üìä Step 1: Classification Agent analyzing...
‚úÖ Classification complete
   Result: Classification: IRRELEVANT
Reasoning: This is a standard social media notification about likes, which doesn't require immediate action or contain important information.
...

‚ö° Step 2: Action Agent executing...




üö´ Blocked: [Social Media] New Like - social media noise
OK. I have blocked the notification from Social Media with the title "New Like" because it is social media noise.

‚úÖ Action execution complete

‚úÖ Multi-agent processing complete


Test 2 Complete


In [None]:
# Display observability metrics
obs_manager.print_metrics_summary()


In [None]:
# Display a detailed trace example
print("\n" + "="*70)
print("üîç DETAILED TRACE EXAMPLE")
print("="*70)

recent_traces = obs_manager.get_recent_traces(limit=1)
if recent_traces:
    trace = recent_traces[0]
    print(f"\nTrace ID: {trace['trace_id']}")
    print(f"Notification ID: {trace['notification_id']}")
    print(f"Duration: {trace['duration_ms']:.2f}ms")
    print(f"\nNotification Text: {trace['notification_text'][:100]}...")
    
    if trace['classification']:
        print(f"\nüìä Classification:")
        print(f"   Result: {trace['classification']['result']}")
        print(f"   Reasoning: {trace['classification']['reasoning'][:150]}...")
    
    if trace['action']:
        print(f"\n‚ö° Action:")
        print(f"   Type: {trace['action']['type']}")
        print(f"   Details: {trace['action']['details']}")
    
    if trace['memory_operation']:
        print(f"\nüíæ Memory Operation:")
        print(f"   Operation: {trace['memory_operation']['operation']}")
        print(f"   Details: {trace['memory_operation']['details']}")
    
    if trace['agents']:
        print(f"\nü§ñ Agent Steps:")
        for agent_step in trace['agents']:
            print(f"   {agent_step['agent']} - {agent_step['step']}: {agent_step['duration_ms']:.2f}ms")
    
    if trace['errors']:
        print(f"\n‚ùå Errors: {len(trace['errors'])}")
        for error in trace['errors']:
            print(f"   [{error['type']}] {error['message']}")
else:
    print("No traces available yet. Run some notifications first!")

print("="*70 + "\n")


In [12]:
# Test notification 3: Less urgent project update
await process_notification_multi_agent(
    "I received a notification: App: Project Manager, Title: Deadline Update, Body: Your project deadline has moved to Tuesday.",
    user_id=USER_ID,
    session_id="test_session_3"
)

print("\n" + "="*70)
print("Test 3 Complete")
print("="*70)



üîç Processing notification with multi-agent system

üìä Step 1: Classification Agent analyzing...
‚úÖ Classification complete
   Result: Classification: LESS_URGENT
Reasoning: This is an update that is important to remember but doesn't require immediate action.
Key Fact (if LESS_URGENT): Project deadline moved to Tuesday.
...

‚ö° Step 2: Action Agent executing...




üíæ Stored memory: Project deadline moved to Tuesday.
OK. I have saved the notification to memory.

‚úÖ Action execution complete

üíæ Step 3: Memory Agent extracting fact...
‚úÖ Memory extraction handled by Action Agent

‚úÖ Multi-agent processing complete


Test 3 Complete


In [13]:
# Display stored memories
print("\n" + "="*70)
print("üíæ Stored Memories:")
print("="*70)

memories = memory_store.get_all()
if memories:
    for i, memory in enumerate(memories, 1):
        print(f"\n{i}. {memory['extracted_fact']}")
        print(f"   From: {memory['app']} (stored at {memory['stored_at']})")
else:
    print("No memories stored yet.")



üíæ Stored Memories:

1. Project deadline moved to Tuesday.
   From: Project Manager (stored at 2025-11-30T21:46:49.080414)


## Architecture Overview

This implementation demonstrates:

1. **Multi-Agent System**: Sequential agents (Classification ‚Üí Action ‚Üí Memory)
   - **Classification Agent**: Analyzes and classifies notifications
   - **Action Agent**: Executes actions based on classification
   - **Memory Agent**: Handles memory extraction and consolidation
2. **Custom Tools**: Three tools for notification management (display, block, save)
3. **Memory Management**: Simple in-memory storage (can be extended to vector DB)
4. **Orchestration Layer**: Coordinates sequential agent flow
5. **Observability**: Comprehensive logging, tracing, and metrics collection
6. **Agentic Loop**: Get Mission ‚Üí Think ‚Üí Act ‚Üí Observe pattern

## Multi-Agent Flow

```
Notification Input
    ‚Üì
[Classification Agent] ‚Üí Classifies as URGENT/IRRELEVANT/LESS_URGENT
    ‚Üì
[Action Agent] ‚Üí Executes appropriate action (display/block/save)
    ‚Üì
[Memory Agent] ‚Üí (Optional) Refines memory extraction for LESS_URGENT items
    ‚Üì
Result
```

## Next Steps for Full Implementation

- ‚úÖ Multi-agent architecture (COMPLETE)
- ‚úÖ Observability (COMPLETE - logging, tracing, metrics)
- Add vector database for semantic memory search
- Add context engineering with few-shot examples
- Add agent evaluation framework with LLM-as-judge
- Add user preference learning
