# Assignment 5  Jazil- Multi-Agent Customer Service System with A2A and MCP



In [4]:
import sys
import os

# Compatibility patch for a2a-sdk
from a2a.client import client as real_client_module
from a2a.client.card_resolver import A2ACardResolver

class PatchedClientModule:
    def __init__(self, real_module) -> None:
        for attr in dir(real_module):
            if not attr.startswith('_'):
                setattr(self, attr, getattr(real_module, attr))
        self.A2ACardResolver = A2ACardResolver

patched_module = PatchedClientModule(real_client_module)
sys.modules['a2a.client.client'] = patched_module

import asyncio
import logging
import threading
import time
from typing import Any, Optional
import json
import re
import datetime
import uuid

import httpx
import nest_asyncio
import uvicorn

from a2a.client import ClientConfig, ClientFactory, create_text_message_object
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import (
    AgentCapabilities,
    AgentCard,
    AgentSkill,
    TransportProtocol,
)
from a2a.utils.constants import AGENT_CARD_WELL_KNOWN_PATH
from google.adk.a2a.executor.a2a_agent_executor import (
    A2aAgentExecutor,
    A2aAgentExecutorConfig,
)
from google.adk.agents import Agent, SequentialAgent
from google.adk.agents.remote_a2a_agent import RemoteA2aAgent
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai.types import Part, Content
from types import SimpleNamespace

nest_asyncio.apply()

logging.basicConfig(
    level=logging.INFO,  # Changed to INFO for better visibility
    format='%(asctime)s - %(levelname)s - %(name)s - %(message)s',
)


# DATABASE SETUP

In [5]:
import sqlite3

DB = "mcp.db"

def db_conn():
    conn = sqlite3.connect(DB, check_same_thread=False)
    conn.row_factory = sqlite3.Row
    return conn

def create_and_seed():
    conn = db_conn()
    cur = conn.cursor()
    cur.execute("""
    CREATE TABLE IF NOT EXISTS customers (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT,
        email TEXT,
        phone TEXT,
        status TEXT,
        created_at TEXT,
        updated_at TEXT
    )""")
    cur.execute("""
    CREATE TABLE IF NOT EXISTS tickets (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        customer_id INTEGER,
        issue TEXT,
        status TEXT,
        priority TEXT,
        created_at TEXT
    )""")
    conn.commit()
    cur.execute("DELETE FROM tickets")
    cur.execute("DELETE FROM customers")
    now = datetime.datetime.now(datetime.UTC).isoformat()
    customers = [
        ("Alice Premium","alice@example.com","111-111-1111","active",now,now),
        ("Bob Standard","bob@example.com","222-222-2222","active",now,now),
        ("Charlie Disabled","charlie@example.com","333-333-3333","disabled",now,now),
        ("Diana Premium","diana@example.com","444-444-4444","active",now,now),
        ("Eve Standard","eve@example.com","555-555-5555","active",now,now),
    ]
    cur.executemany("INSERT INTO customers (name,email,phone,status,created_at,updated_at) VALUES (?,?,?,?,?,?)", customers)
    tickets = [
        (1, "Billing duplicate charge", "open", "high", now),
        (1, "Unable to login", "in_progress", "medium", now),
        (2, "Request upgrade", "open", "low", now),
        (4, "Critical outage", "open", "high", now),
        (5, "Password reset", "open", "low", now),
    ]
    cur.executemany("INSERT INTO tickets (customer_id,issue,status,priority,created_at) VALUES (?,?,?,?,?)", tickets)
    conn.commit()
    conn.close()
    print("✓ Database created & seeded at", DB)

create_and_seed()


✓ Database created & seeded at mcp.db


# MCP 

In [6]:
class MCPClient:
    """Client for calling MCP server tools."""
    
    def __init__(self, mcp_server_url: str = "http://127.0.0.1:8000"):
        self.mcp_server_url = mcp_server_url
        self._client = None
    
    async def _get_client(self):
        """Get or create HTTP client."""
        if self._client is None:
            self._client = httpx.AsyncClient(timeout=30.0)
        return self._client
    
    async def call_tool(self, tool_name: str, **params):
        """Call an MCP tool."""
        client = await self._get_client()
        try:
            response = await client.post(
                f"{self.mcp_server_url}/call",
                json={"tool": tool_name, "params": params}
            )
            response.raise_for_status()
            return response.json()
        except Exception as e:
            return {"success": False, "error": f"MCP call failed: {str(e)}"}
    
    async def get_customer(self, customer_id: int):
        """Get customer by ID via MCP."""
        return await self.call_tool("get_customer", customer_id=customer_id)
    
    async def list_customers(self, status: str = None, limit: int = 100):
        """List customers via MCP."""
        params = {"limit": limit}
        if status:
            params["status"] = status
        return await self.call_tool("list_customers", **params)
    
    async def update_customer(self, customer_id: int, data: dict):
        """Update customer via MCP."""
        return await self.call_tool("update_customer", customer_id=customer_id, data=data)
    
    async def create_ticket(self, customer_id: int, issue: str, priority: str = "medium"):
        """Create ticket via MCP."""
        return await self.call_tool("create_ticket", customer_id=customer_id, issue=issue, priority=priority)
    
    async def get_customer_history(self, customer_id: int):
        """Get customer ticket history via MCP."""
        return await self.call_tool("get_customer_history", customer_id=customer_id)
    
    async def close(self):
        """Close HTTP client."""
        if self._client:
            await self._client.aclose()
            self._client = None

# Create global MCP client instance
mcp_client = MCPClient()
print("✓ MCP Client created (connecting to http://127.0.0.1:8000)")


✓ MCP Client created (connecting to http://127.0.0.1:8000)


"""Standalone MCP Server for testing."""
import sqlite3
import datetime
import asyncio
import uvicorn
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route

# Database setup
DB = "mcp.db"

def db_conn():
    conn = sqlite3.connect(DB, check_same_thread=False)
    conn.row_factory = sqlite3.Row
    return conn

# Initialize database if needed
def init_db():
    """Initialize database with sample data."""
    conn = db_conn()
    cur = conn.cursor()
    
    # Create tables
    cur.execute("""
        CREATE TABLE IF NOT EXISTS customers (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT,
            email TEXT,
            phone TEXT,
            status TEXT,
            created_at TEXT,
            updated_at TEXT
        )""")
    cur.execute("""
        CREATE TABLE IF NOT EXISTS tickets (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            customer_id INTEGER,
            issue TEXT,
            status TEXT,
            priority TEXT,
            created_at TEXT
        )""")
    conn.commit()
    
    # Check if data exists
    cur.execute("SELECT COUNT(*) FROM customers")
    count = cur.fetchone()[0]
    
    if count == 0:
        # Insert sample data
        now = datetime.datetime.now(datetime.UTC).isoformat()
        customers = [
            ("Alice Premium", "alice@example.com", "111-111-1111", "active", now, now),
            ("Bob Standard", "bob@example.com", "222-222-2222", "active", now, now),
            ("Charlie Disabled", "charlie@example.com", "333-333-3333", "disabled", now, now),
            ("Diana Premium", "diana@example.com", "444-444-4444", "active", now, now),
            ("Eve Standard", "eve@example.com", "555-555-5555", "active", now, now),
        ]
        cur.executemany(
            "INSERT INTO customers (name,email,phone,status,created_at,updated_at) VALUES (?,?,?,?,?,?)",
            customers
        )
        tickets = [
            (1, "Billing duplicate charge", "open", "high", now),
            (1, "Unable to login", "in_progress", "medium", now),
            (2, "Request upgrade", "open", "low", now),
            (4, "Critical outage", "open", "high", now),
            (5, "Password reset", "open", "low", now),
        ]
        cur.executemany(
            "INSERT INTO tickets (customer_id,issue,status,priority,created_at) VALUES (?,?,?,?,?)",
            tickets
        )
        conn.commit()
        print(f"✓ Database initialized with sample data")
    else:
        print(f"✓ Database already has {count} customers")
    
    conn.close()


# MCP Server Implementation
class MCPServer:
    """MCP Server that exposes database operations as tools."""
    
    def __init__(self):
        self.tools = {
            "get_customer": self._get_customer_tool,
            "list_customers": self._list_customers_tool,
            "update_customer": self._update_customer_tool,
            "create_ticket": self._create_ticket_tool,
            "get_customer_history": self._get_customer_history_tool,
        }
    
    async def _get_customer_tool(self, customer_id: int):
        """MCP tool: Get customer by ID."""
        conn = db_conn()
        cur = conn.cursor()
        cur.execute("SELECT * FROM customers WHERE id=?", (customer_id,))
        row = cur.fetchone()
        conn.close()
        if row:
            return {"success": True, "data": dict(row)}
        return {"success": False, "error": f"Customer {customer_id} not found"}
    
    async def _list_customers_tool(self, status: str = None, limit: int = 100):
        """MCP tool: List customers, optionally filtered by status."""
        conn = db_conn()
        cur = conn.cursor()
        if status:
            cur.execute("SELECT * FROM customers WHERE status=? LIMIT ?", (status, limit))
        else:
            cur.execute("SELECT * FROM customers LIMIT ?", (limit,))
        rows = cur.fetchall()
        conn.close()
        return {"success": True, "data": [dict(r) for r in rows], "count": len(rows)}
    
    async def _update_customer_tool(self, customer_id: int, data: dict):
        """MCP tool: Update customer fields."""
        conn = db_conn()
        cur = conn.cursor()
        
        # Build update query
        allowed_fields = ["name", "email", "phone", "status"]
        updates = {k: v for k, v in data.items() if k in allowed_fields}
        
        if not updates:
            conn.close()
            return {"success": False, "error": "No valid fields to update"}
        
        set_clause = ", ".join([f"{k}=?" for k in updates.keys()])
        params = list(updates.values()) + [datetime.datetime.now(datetime.UTC).isoformat(), customer_id]
        cur.execute(f"UPDATE customers SET {set_clause}, updated_at=? WHERE id=?", params)
        conn.commit()
        
        # Fetch updated record
        cur.execute("SELECT * FROM customers WHERE id=?", (customer_id,))
        row = cur.fetchone()
        conn.close()
        
        if row:
            return {"success": True, "data": dict(row)}
        return {"success": False, "error": f"Customer {customer_id} not found"}
    
    async def _create_ticket_tool(self, customer_id: int, issue: str, priority: str = "medium"):
        """MCP tool: Create a new support ticket."""
        conn = db_conn()
        cur = conn.cursor()
        
        # Validate priority
        if priority.lower() not in ["low", "medium", "high"]:
            conn.close()
            return {"success": False, "error": "Priority must be 'low', 'medium', or 'high'"}
        
        now = datetime.datetime.now(datetime.UTC).isoformat()
        cur.execute(
            "INSERT INTO tickets (customer_id, issue, status, priority, created_at) VALUES (?, ?, ?, ?, ?)",
            (customer_id, issue, "open", priority.lower(), now)
        )
        conn.commit()
        ticket_id = cur.lastrowid
        cur.execute("SELECT * FROM tickets WHERE id=?", (ticket_id,))
        row = cur.fetchone()
        conn.close()
        
        if row:
            return {"success": True, "data": dict(row)}
        return {"success": False, "error": "Failed to create ticket"}
    
    async def _get_customer_history_tool(self, customer_id: int):
        """MCP tool: Get ticket history for a customer."""
        conn = db_conn()
        cur = conn.cursor()
        cur.execute(
            "SELECT * FROM tickets WHERE customer_id=? ORDER BY created_at DESC",
            (customer_id,)
        )
        rows = cur.fetchall()
        conn.close()
        return {"success": True, "data": [dict(r) for r in rows], "count": len(rows)}
    
    async def call_tool(self, tool_name: str, **kwargs):
        """Call an MCP tool by name."""
        if tool_name not in self.tools:
            return {"success": False, "error": f"Unknown tool: {tool_name}"}
        try:
            return await self.tools[tool_name](**kwargs)
        except Exception as e:
            return {"success": False, "error": str(e)}


# Create MCP server instance
mcp_server = MCPServer()

# MCP Server HTTP endpoints
async def mcp_tools_list(request):
    """List available MCP tools."""
    return JSONResponse({
        "tools": [
            {
                "name": "get_customer",
                "description": "Get customer information by ID",
                "parameters": {"customer_id": "integer"}
            },
            {
                "name": "list_customers",
                "description": "List customers, optionally filtered by status",
                "parameters": {"status": "string (optional)", "limit": "integer (optional)"}
            },
            {
                "name": "update_customer",
                "description": "Update customer fields (name, email, phone, status)",
                "parameters": {"customer_id": "integer", "data": "dict"}
            },
            {
                "name": "create_ticket",
                "description": "Create a new support ticket",
                "parameters": {"customer_id": "integer", "issue": "string", "priority": "string (low/medium/high)"}
            },
            {
                "name": "get_customer_history",
                "description": "Get ticket history for a customer",
                "parameters": {"customer_id": "integer"}
            }
        ]
    })


async def mcp_call_tool(request):
    """Call an MCP tool."""
    try:
        body = await request.json()
        tool_name = body.get("tool")
        params = body.get("params", {})
        
        result = await mcp_server.call_tool(tool_name, **params)
        return JSONResponse(result)
    except Exception as e:
        return JSONResponse({"success": False, "error": str(e)}, status_code=500)


# Create MCP server application
mcp_app = Starlette(routes=[
    Route("/tools", mcp_tools_list, methods=["GET"]),
    Route("/call", mcp_call_tool, methods=["POST"]),
])


if __name__ == "__main__":
    print("Initializing MCP Server...")
    init_db()
    print("\nStarting MCP Server on http://127.0.0.1:8000")
    print("Press Ctrl+C to stop\n")
    
    config = uvicorn.Config(
        mcp_app,
        host="127.0.0.1",
        port=8000,
        log_level="info"
    )
    server = uvicorn.Server(config)
    asyncio.run(server.serve())



In [7]:
# ============================================================================
# HELPER FUNCTIONS
# ============================================================================

def create_adk_event(text: str, partial: bool = False, invocation_id: str = None):

    class _RuntimePart:
        def __init__(self, text: str):
            self.root = SimpleNamespace(text=text)
            self.text = text

    class _RuntimeArtifact:
        def __init__(self, text: str):
            self.parts = [_RuntimePart(text)]

    class _RuntimeEvent:
        def __init__(self, content_text: str, partial: bool, invocation_id: str):
            self.invocation_id = invocation_id or str(uuid.uuid4())
            self.event_id = str(uuid.uuid4())
            self.timestamp = int(time.time() * 1000)
            self.timestamp_ms = self.timestamp
            self.content = Content(parts=[Part(text=content_text)])
            self.artifacts = [_RuntimeArtifact(content_text)]
            self.partial = partial
            self.turn_complete = not partial
            self.author = "agent"
            self.actions = SimpleNamespace(state_delta=None)
            self.error_code = None
            self.error_message = None
            self.grounding_metadata = None
            self.execution_metadata = None
            self.custom_metadata = None
            self.delta = None
            self.type = "agent_event"
            self.usage_metadata = None
            self.branch = None
            self.depth = 0
            self.parent_id = None
            self.session_id = None
            self.turn_id = None
            self.metadata = {
                "agent_name": "adk_agent",
                "timestamp_ms": self.timestamp_ms,
                "invocation_id": self.invocation_id,
            }

    return _RuntimeEvent(text, partial, invocation_id)


In [8]:
# ============================================================================
# AGENT IMPLEMENTATIONS
# ============================================================================

class CustomerDataAgent(Agent):
    """Customer Data Agent - Accesses customer database via MCP."""

    def __init__(self):
        super().__init__(
            name="customer_data_agent",
            model="gemini-2.0-flash-exp",
            instruction="""
You are a customer data agent with access to a customer database via MCP.

Supported operations (via MCP):
- get_customer(id): Retrieve a customer
- list_customers(status): List customers (optionally filter by active)
- get_customer_history(id): Retrieve ticket history
- update_customer(id, email/phone/status): Update customer fields
- create_ticket(customer_id, issue, priority): Create a support ticket

Your job: Understand the user's intent and return clean JSON.
Always use MCP tools to access the database.
            """
        )

    async def run_async(self, ctx):
        input_data = ""
        if hasattr(ctx, 'user_content') and ctx.user_content:
            content_list = ctx.user_content if isinstance(ctx.user_content, list) else [ctx.user_content]
            for content in content_list:
                if hasattr(content, 'parts'):
                    for part in content.parts:
                        if hasattr(part, 'text') and part.text:
                            input_data += part.text

        original_text = input_data
        text = input_data.lower().strip()
        result = {"agent": "CustomerDataAgent", "status": "success"}

        try:
            customer_id = self._extract_id(original_text)

            # Improved pattern matching
            if customer_id and any(k in text for k in ["get customer", "customer information", "customer info", "customer id", "customer"]):
                customer = await self._get_customer(customer_id)
                if customer:
                    result["action"] = f"Retrieved customer {customer_id} via MCP"
                    result["data"] = customer
                else:
                    result["status"] = "error"
                    result["error"] = f"Customer {customer_id} not found"

            elif "list" in text and "customer" in text:
                status = "active" if "active" in text else None
                customers = await self._list_customers(status)
                result["action"] = "Listed customers via MCP"
                result["data"] = customers

            elif customer_id and any(k in text for k in ["ticket history", "history", "show tickets", "tickets"]):
                history = await self._get_history(customer_id)
                result["action"] = f"Customer {customer_id} ticket history via MCP"
                result["data"] = history

            elif "update" in text and customer_id:
                email = self._extract_email(original_text)
                phone = self._extract_phone(original_text)
                updates = {}
                if email:
                    updates["email"] = email
                if phone:
                    updates["phone"] = phone
                if not updates:
                    result["status"] = "error"
                    result["error"] = "No valid update fields found"
                else:
                    updated = await self._update_customer(customer_id, **updates)
                    if updated:
                        result["action"] = f"Updated customer {customer_id} via MCP"
                        result["data"] = updated
                    else:
                        result["status"] = "error"
                        result["error"] = f"Failed to update customer {customer_id}"

            elif customer_id and any(k in text for k in ["refund", "charged", "problem", "issue", "complaint", "create ticket"]):
                issue = original_text
                priority = "high" if any(p in text for p in ["urgent", "critical", "immediately"]) else "medium"
                ticket = await self._create_ticket(customer_id, issue, priority)
                if ticket:
                    result["action"] = "Created ticket via MCP"
                    result["data"] = ticket
                else:
                    result["status"] = "error"
                    result["error"] = "Failed to create ticket"

            elif customer_id and ("upgrade" in text or "help" in text):
                # For upgrade/help requests, get customer info first
                customer = await self._get_customer(customer_id)
                if customer:
                    result["action"] = f"Retrieved customer {customer_id} for upgrade/help request via MCP"
                    result["data"] = customer
                    result["note"] = "Customer data retrieved. Support agent should handle the upgrade request."
                else:
                    result["status"] = "error"
                    result["error"] = f"Customer {customer_id} not found"

            else:
                result["status"] = "error"
                result["error"] = "Could not understand request"
                result["request"] = original_text

        except Exception as e:
            result["status"] = "error"
            result["error"] = str(e)

        yield create_adk_event(json.dumps(result, indent=2))

    def _extract_id(self, text: str) -> Optional[int]:
        match = re.search(r"(?:id|customer)\s*[:\-]?\s*(\d+)", text.lower())
        return int(match.group(1)) if match else None

    def _extract_email(self, text: str) -> Optional[str]:
        match = re.search(r"[\w\.-]+@[\w\.-]+\.\w+", text)
        return match.group(0) if match else None

    def _extract_phone(self, text: str) -> Optional[str]:
        match = re.search(r"\d{3}[-.]?\d{3}[-.]?\d{4}", text)
        return match.group(0) if match else None

    async def _get_customer(self, customer_id: int):
        """Get customer via MCP."""
        result = await mcp_client.get_customer(customer_id)
        if result.get("success"):
            return result.get("data")
        return None

    async def _list_customers(self, status: str = None):
        """List customers via MCP."""
        result = await mcp_client.list_customers(status=status)
        if result.get("success"):
            return result.get("data", [])
        return []

    async def _get_history(self, customer_id: int):
        """Get customer history via MCP."""
        result = await mcp_client.get_customer_history(customer_id)
        if result.get("success"):
            return result.get("data", [])
        return []

    async def _update_customer(self, customer_id: int, **fields):
        """Update customer via MCP."""
        result = await mcp_client.update_customer(customer_id, fields)
        if result.get("success"):
            return result.get("data")
        return None

    async def _create_ticket(self, customer_id: int, issue: str, priority: str):
        """Create ticket via MCP."""
        result = await mcp_client.create_ticket(customer_id, issue, priority)
        if result.get("success"):
            return result.get("data")
        return None


In [9]:
class SupportAgent(Agent):
    """Support Agent - Handles support queries and uses MCP for ticket operations."""

    def __init__(self):
        super().__init__(
            name="support_agent",
            model="gemini-2.0-flash-exp",
            instruction="""
You are a support agent with access to support tickets via MCP.

Supported operations:
- Detect refund issues (duplicate charges, billing issues, double payment)
- Detect upgrade requests (plan upgrade, account upgrade)
- Query tickets by customer ID or priority (via MCP)
- Create new tickets for general issues (via MCP)

Understand the user's request and output structured JSON.
Always use MCP tools for database operations.
            """
        )

    async def run_async(self, ctx):
        input_data = ""
        if hasattr(ctx, 'user_content') and ctx.user_content:
            content_list = ctx.user_content if isinstance(ctx.user_content, list) else [ctx.user_content]
            for content in content_list:
                if hasattr(content, 'parts'):
                    for part in content.parts:
                        if hasattr(part, 'text') and part.text:
                            input_data += part.text

        text = input_data.lower().strip()
        result = {"agent": "SupportAgent", "status": "success"}

        try:
            customer_id = self._extract_id(input_data)
            priority = self._extract_priority(text)

            refund_keywords = [
                "refund", "charged twice", "duplicate charge", "double charged",
                "billing error", "wrong charge", "money deducted", "unauthorized"
            ]

            if any(word in text for word in refund_keywords):
                result["action"] = "URGENT_REFUND"
                result["message"] = "Refund request identified"
                result["priority"] = "HIGH"
                if customer_id:
                    result["customer_id"] = customer_id
                else:
                    result["warning"] = "Customer ID missing"

            elif any(word in text for word in ["upgrade", "plan change", "switch plan", "upgrade account"]):
                result["action"] = "ACCOUNT_UPGRADE"
                result["message"] = "Account upgrade request identified"
                result["priority"] = priority or "MEDIUM"
                if customer_id:
                    result["customer_id"] = customer_id
                else:
                    result["warning"] = "Customer ID missing"

            elif any(word in text for word in ["query tickets", "show tickets", "ticket history", "open tickets"]):
                if customer_id:
                    history_result = await mcp_client.get_customer_history(customer_id)
                    if history_result.get("success"):
                        tickets = history_result.get("data", [])
                        if priority:
                            tickets = [t for t in tickets if t.get("priority", "").lower() == priority.lower()]
                        result["action"] = "QUERY_TICKETS"
                        result["data"] = tickets
                    else:
                        result["status"] = "error"
                        result["error"] = history_result.get("error", "Failed to query tickets")
                else:
                    result["status"] = "error"
                    result["error"] = "Customer ID required for ticket query"

            else:
                if not customer_id:
                    result["status"] = "error"
                    result["error"] = "Customer ID missing for ticket creation"
                else:
                    ticket_result = await mcp_client.create_ticket(customer_id, input_data, priority or "medium")
                    if ticket_result.get("success"):
                        result["action"] = "CREATE_TICKET"
                        result["priority"] = priority or "MEDIUM"
                        result["data"] = ticket_result.get("data")
                    else:
                        result["status"] = "error"
                        result["error"] = ticket_result.get("error", "Failed to create ticket")

        except Exception as e:
            result["status"] = "error"
            result["error"] = str(e)

        yield create_adk_event(json.dumps(result, indent=2))

    def _extract_id(self, text: str):
        match = re.search(r"(?:id|customer)\s*[:\-]?\s*(\d+)", text.lower())
        return int(match.group(1)) if match else None

    def _extract_priority(self, text: str):
        if "urgent" in text or "critical" in text or "immediately" in text:
            return "HIGH"
        if "low" in text:
            return "LOW"
        if "medium" in text:
            return "MEDIUM"
        return None


class RouterAgent(Agent):
    """Router Agent - Orchestrates requests between CustomerDataAgent and SupportAgent using A2A."""

    model_config = {"extra": "allow"}

    def __init__(self, customer_agent_url: str, support_agent_url: str):
        super().__init__(
            name="router_agent",
            model="gemini-2.0-flash-exp",
            instruction="""You are a router agent that orchestrates customer service requests.

You receive natural-language requests and decide whether to call:
- CustomerDataAgent (via A2A) -> customer info, history, updates
- SupportAgent (via A2A) -> refunds, upgrades, ticket actions

You can call one or both agents as necessary, combine their JSON outputs and return a final JSON response.

Use A2A protocol to communicate with other agents.
            """
        )
        self.customer_agent_url = customer_agent_url
        self.support_agent_url = support_agent_url
        self._a2a_client = None

    def _create_event(self, text: str, partial: bool = False, invocation_id: str = None):
        class _RuntimePart:
            def __init__(self, text: str):
                self.root = SimpleNamespace(text=text)
                self.text = text

        class _RuntimeArtifact:
            def __init__(self, text: str):
                self.parts = [_RuntimePart(text)]

        class _RuntimeEvent:
            def __init__(self, content_text: str, partial: bool, invocation_id: str):
                self.invocation_id = invocation_id or str(uuid.uuid4())
                self.event_id = str(uuid.uuid4())
                self.timestamp = int(time.time() * 1000)
                self.timestamp_ms = self.timestamp
                self.content = Content(parts=[Part(text=content_text)])
                self.artifacts = [_RuntimeArtifact(content_text)]
                self.partial = partial
                self.turn_complete = not partial
                self.author = "agent"
                self.actions = SimpleNamespace(state_delta=None)
                self.error_code = None
                self.error_message = None
                self.grounding_metadata = None
                self.execution_metadata = None
                self.custom_metadata = None
                self.delta = None
                self.type = "agent_event"
                self.usage_metadata = None
                self.branch = None
                self.depth = 0
                self.parent_id = None
                self.session_id = None
                self.turn_id = None
                self.metadata = {
                    "agent_name": "router_agent",
                    "timestamp_ms": self.timestamp_ms,
                    "invocation_id": self.invocation_id,
                }

        return _RuntimeEvent(text, partial, invocation_id)

    async def _get_a2a_client(self):
        """Get or create A2A client."""
        if self._a2a_client is None:
            timeout_config = httpx.Timeout(timeout=120.0, connect=10.0, read=120.0, write=10.0, pool=5.0)
            self._a2a_client = httpx.AsyncClient(timeout=timeout_config)
        return self._a2a_client

    async def _call_agent_via_a2a(self, agent_url: str, message: str) -> dict:
        """Call an agent via A2A protocol."""
        client = await self._get_a2a_client()
        
        try:
            # Get agent card
            card_response = await client.get(f"{agent_url}{AGENT_CARD_WELL_KNOWN_PATH}")
            agent_card_data = card_response.json()
            agent_card = AgentCard(**agent_card_data)

            # Create A2A client
            config = ClientConfig(
                httpx_client=client,
                supported_transports=[TransportProtocol.jsonrpc, TransportProtocol.http_json],
                use_client_preference=True
            )

            factory = ClientFactory(config)
            a2a_client = factory.create(agent_card)

            # Send message
            message_obj = create_text_message_object(content=message)
            responses = []
            async for response in a2a_client.send_message(message_obj):
                responses.append(response)

            # Extract result
            if responses and isinstance(responses[0], tuple) and len(responses[0]) > 0:
                task = responses[0][0]
                try:
                    result_text = task.artifacts[0].parts[0].root.text
                    return json.loads(result_text)
                except (AttributeError, IndexError, json.JSONDecodeError):
                    return {"status": "error", "error": "Failed to parse response"}

            return {"status": "error", "error": "No response received"}
        except Exception as e:
            return {"status": "error", "error": str(e)}

    async def run_async(self, ctx):
        """Route + orchestrate agent calls via A2A (async generator)."""
        input_data = ""
        if hasattr(ctx, 'user_content') and ctx.user_content:
            content_list = ctx.user_content if isinstance(ctx.user_content, list) else [ctx.user_content]
            for content in content_list:
                if hasattr(content, 'parts'):
                    for part in content.parts:
                        if hasattr(part, 'text') and part.text:
                            input_data += part.text

        text = (input_data or "").lower()

        result = {
            "router": "RouterAgent",
            "request": input_data,
            "actions": [],
            "results": {},
            "error": None
        }

        invocation_id = str(uuid.uuid4())
        start_event = self._create_event(
            text=json.dumps({"status": "router_started", "request": input_data}),
            partial=True,
            invocation_id=invocation_id
        )
        yield start_event

        try:
            if not input_data or not text:
                result["error"] = "No input text received"
            else:
                # Determine routing - improved pattern matching
                needs_customer = any(w in text for w in ["customer", "get", "list", "history", "update", "show", "active customers", "open tickets"])
                needs_support = any(w in text for w in ["support", "refund", "upgrade", "ticket", "charged", "help", "tickets"])

                print(f"\n[Router] Analyzing request: {input_data}")
                print(f"[Router] Needs customer: {needs_customer}, Needs support: {needs_support}")

                # Route to appropriate agents via A2A
                if "upgrade" in text and "customer" in text:
                    # Scenario: Multi-step coordination
                    print("[Router] → CustomerDataAgent (A2A): Getting customer details")
                    customer_result = await self._call_agent_via_a2a(self.customer_agent_url, input_data)
                    result["results"]["customer"] = customer_result
                    result["actions"].append("Retrieved customer details via A2A")

                    print("[Router] → SupportAgent (A2A): Processing upgrade request")
                    support_result = await self._call_agent_via_a2a(self.support_agent_url, input_data)
                    result["results"]["support"] = support_result
                    result["actions"].append("Processed upgrade request via A2A")

                elif needs_customer and needs_support:
                    # Scenario: Negotiation/Escalation
                    print("[Router] → CustomerDataAgent (A2A): Getting customer context")
                    customer_result = await self._call_agent_via_a2a(self.customer_agent_url, input_data)
                    result["results"]["customer"] = customer_result
                    result["actions"].append("Retrieved customer context via A2A")

                    print("[Router] → SupportAgent (A2A): Handling support request")
                    support_result = await self._call_agent_via_a2a(self.support_agent_url, input_data)
                    result["results"]["support"] = support_result
                    result["actions"].append("Handled support request via A2A")
                
                elif "upgrade" in text or ("help" in text and customer_id):
                    # Handle upgrade requests - get customer data first, then support
                    print("[Router] → CustomerDataAgent (A2A): Getting customer details for upgrade")
                    customer_result = await self._call_agent_via_a2a(self.customer_agent_url, input_data)
                    result["results"]["customer"] = customer_result
                    result["actions"].append("Retrieved customer details via A2A")

                    print("[Router] → SupportAgent (A2A): Processing upgrade request")
                    support_result = await self._call_agent_via_a2a(self.support_agent_url, input_data)
                    result["results"]["support"] = support_result
                    result["actions"].append("Processed upgrade request via A2A")

                elif needs_customer:
                    # Scenario: Task Allocation
                    print("[Router] → CustomerDataAgent (A2A): Handling customer data request")
                    customer_result = await self._call_agent_via_a2a(self.customer_agent_url, input_data)
                    result["results"]["customer"] = customer_result
                    result["actions"].append("Called customer agent via A2A")

                elif needs_support:
                    print("[Router] → SupportAgent (A2A): Handling support request")
                    support_result = await self._call_agent_via_a2a(self.support_agent_url, input_data)
                    result["results"]["support"] = support_result
                    result["actions"].append("Called support agent via A2A")

                else:
                    result["error"] = "Router could not determine which agent to call"

        except Exception as e:
            result["error"] = str(e)
            import traceback
            result["traceback"] = traceback.format_exc()

        payload = json.dumps(result, indent=2)
        final_event = self._create_event(
            text=payload,
            partial=False,
            invocation_id=invocation_id
        )
        yield final_event


In [10]:
# ============================================================================
# SESSION SERVICE AND RUNNER PATCHES
# ============================================================================

from google.adk.sessions.in_memory_session_service import InMemorySessionService

class SafeSessionService(InMemorySessionService):
    async def append_event(self, session, event):
        if not hasattr(event, "timestamp"):
            return
        return await super().append_event(session, event)

def patch_event(event):
    if not hasattr(event, "timestamp"):
        event.timestamp = None
    if not hasattr(event, "error_code"):
        event.error_code = None
    if not hasattr(event, "error_message"):
        event.error_message = None
    return event

class SafeRunner(Runner):
    async def run_async(self, *args, **kwargs):
        async for event in super().run_async(*args, **kwargs):
            yield patch_event(event)


In [11]:
# ============================================================================
# A2A SERVER SETUP
# ============================================================================

def create_agent_server(agent: Agent, agent_name: str, port: int):
    """Create an A2A server for an agent."""
    agent_card = AgentCard(
        name=agent_name,
        url=f"http://127.0.0.1:{port}",
        description=f"{agent_name} A2A Server",
        version="1.0",
        capabilities=AgentCapabilities(streaming=False),
        default_input_modes=["text/plain"],
        default_output_modes=["application/json"],
        preferred_transport=TransportProtocol.jsonrpc,
        skills=[
            AgentSkill(
                id="process",
                name="Process Request",
                description=f"Process requests via {agent_name}",
                tags=["agent"],
                examples=["Process user request"]
            )
        ]
    )

    runner = SafeRunner(
        app_name=agent.name,
        agent=agent,
        artifact_service=InMemoryArtifactService(),
        session_service=SafeSessionService(),
        memory_service=InMemoryMemoryService(),
    )

    executor = A2aAgentExecutor(
        runner=runner,
        config=A2aAgentExecutorConfig()
    )

    handler = DefaultRequestHandler(
        agent_executor=executor,
        task_store=InMemoryTaskStore()
    )

    return A2AStarletteApplication(agent_card=agent_card, http_handler=handler), agent_card


async def run_server(agent: Agent, agent_name: str, port: int):
    """Run a single agent server."""
    app, card = create_agent_server(agent, agent_name, port)
    config = uvicorn.Config(
        app.build(),
        host="127.0.0.1",
        port=port,
        log_level="error",
        loop="none"
    )
    server = uvicorn.Server(config)
    print(f"✓ {agent_name} A2A server starting on port {port}")
    await server.serve()


async def start_all_servers():
    """Start all A2A servers."""
    tasks = [
        asyncio.create_task(run_server(customer_data_agent, "CustomerDataAgent", 9300)),
        asyncio.create_task(run_server(support_agent, "SupportAgent", 9301)),
        asyncio.create_task(run_server(router_agent, "RouterAgent", 9400))
    ]
    await asyncio.sleep(3)
    print("\n" + "="*60)
    print("All A2A servers are running!")
    print("   - CustomerDataAgent: http://127.0.0.1:9300")
    print("   - SupportAgent:      http://127.0.0.1:9301")
    print("   - RouterAgent:       http://127.0.0.1:9400")
    print("="*60 + "\n")

    try:
        await asyncio.gather(*tasks)
    except asyncio.CancelledError:
        pass

def run_servers_background():
    """Run servers in background thread."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(start_all_servers())
    except KeyboardInterrupt:
        pass
    finally:
        pending = asyncio.all_tasks(loop)
        for task in pending:
            task.cancel()
        loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
        loop.close()


In [12]:
# ============================================================================
# INSTANTIATE AGENTS
# ============================================================================

print("\n" + "="*60)
print("Creating Agents...")
print("="*60)

customer_data_agent = CustomerDataAgent()
support_agent = SupportAgent()
router_agent = RouterAgent(
    customer_agent_url="http://127.0.0.1:9300",
    support_agent_url="http://127.0.0.1:9301"
)

print("✓ CustomerDataAgent created")
print("✓ SupportAgent created")
print("✓ RouterAgent created")


Creating Agents...
✓ CustomerDataAgent created
✓ SupportAgent created
✓ RouterAgent created


In [13]:
# ============================================================================
# START SERVERS
# ============================================================================

print("\n" + "="*60)
print("Starting A2A Servers...")
print("="*60)

server_thread = threading.Thread(target=run_servers_background, daemon=True)
server_thread.start()
time.sleep(5)  # Give servers time to start


  config=A2aAgentExecutorConfig()
  executor = A2aAgentExecutor(



Starting A2A Servers...
✓ CustomerDataAgent A2A server starting on port 9300
✓ SupportAgent A2A server starting on port 9301
✓ RouterAgent A2A server starting on port 9400

All A2A servers are running!
   - CustomerDataAgent: http://127.0.0.1:9300
   - SupportAgent:      http://127.0.0.1:9301
   - RouterAgent:       http://127.0.0.1:9400



In [14]:
# ============================================================================
# TEST CLIENT
# ============================================================================

class A2ASimpleClient:
    """A2A client for testing."""

    def __init__(self, default_timeout: float = 120.0):
        self._agent_info_cache = {}
        self.default_timeout = default_timeout

    async def create_task(self, agent_url: str, message: str) -> str:
        timeout_config = httpx.Timeout(
            timeout=self.default_timeout,
            connect=10.0,
            read=self.default_timeout,
            write=10.0,
            pool=5.0
        )

        async with httpx.AsyncClient(timeout=timeout_config) as client:
            if agent_url not in self._agent_info_cache:
                card_response = await client.get(f"{agent_url}{AGENT_CARD_WELL_KNOWN_PATH}")
                self._agent_info_cache[agent_url] = card_response.json()

            agent_card_data = self._agent_info_cache[agent_url]
            agent_card = AgentCard(**agent_card_data)

            config = ClientConfig(
                httpx_client=client,
                supported_transports=[TransportProtocol.jsonrpc, TransportProtocol.http_json],
                use_client_preference=True
            )

            factory = ClientFactory(config)
            a2a_client = factory.create(agent_card)

            message_obj = create_text_message_object(content=message)

            responses = []
            async for response in a2a_client.send_message(message_obj):
                responses.append(response)

            if responses and isinstance(responses[0], tuple) and len(responses[0]) > 0:
                task = responses[0][0]
                try:
                    return task.artifacts[0].parts[0].root.text
                except (AttributeError, IndexError):
                    return str(task)

            return "No response received"


In [17]:
# ============================================================================
# TEST SCENARIOS
# ============================================================================

async def run_all_tests():
    """Run all test scenarios."""
    test_client = A2ASimpleClient()

    print("\n" + "="*80)
    print("TEST SUITE - Multi-Agent Customer Service System")
    print("="*80)

    tests = [
        {
            "name": "TEST 1: Simple Query - Get Customer Information",
            "url": "http://127.0.0.1:9400",
            "message": "Get customer information for ID 1",
            "description": "Single agent, straightforward MCP call"
        },
        {
            "name": "TEST 2: Coordinated Query - Account Upgrade",
            "url": "http://127.0.0.1:9400",
            "message": "I'm customer ID 2 and need help upgrading my account",
            "description": "Multiple agents coordinate: data fetch + support response"
        },
        {
            "name": "TEST 3: Complex Query - Active Customers with Open Tickets",
            "url": "http://127.0.0.1:9400",
            "message": "Show me all active customers who have open tickets",
            "description": "Requires negotiation between data and support agents"
        },
        {
            "name": "TEST 4: Escalation - Refund Request",
            "url": "http://127.0.0.1:9400",
            "message": "I've been charged twice, please refund immediately! Customer ID 1",
            "description": "Router must identify urgency and route appropriately"
        },
        {
            "name": "TEST 5: Multi-Intent - Update and History",
            "url": "http://127.0.0.1:9400",
            "message": "Update customer ID 5 email to newemail@example.com and show my ticket history",
            "description": "Parallel task execution and coordination"
        },
        {
            "name": "TEST 6: List Active Customers",
            "url": "http://127.0.0.1:9400",
            "message": "List all active customers",
            "description": "Simple customer data query"
        },
        {
            "name": "TEST 7: Ticket History",
            "url": "http://127.0.0.1:9400",
            "message": "Show ticket history for customer ID 1",
            "description": "Get customer ticket history"
        }
    ]

    results = []
    for i, test in enumerate(tests, 1):
        print(f"\n{test['name']}")
        print(f"Description: {test['description']}")
        print("-" * 80)
        try:
            result = await test_client.create_task(test["url"], test["message"])
            print(result)
            results.append({"test": test["name"], "status": "PASSED"})
        except Exception as e:
            print(f"ERROR: {e}")
            results.append({"test": test["name"], "status": "FAILED", "error": str(e)})

    print("\n" + "="*80)
    print("TEST SUMMARY")
    print("="*80)
    for result in results:
        status_icon = "✓" if result["status"] == "PASSED" else "✗"
        print(f"{status_icon} {result['test']}: {result['status']}")
        if "error" in result:
            print(f"   Error: {result['error']}")

    print("="*80 + "\n")

In [18]:
print("\n" + "="*80)
print("Assignment 5 - Multi-Agent Customer Service System")
print("A2A Protocol + MCP Integration")
print("="*80)

# Wait a bit more for servers to be fully ready
print("\nWaiting for servers to be ready...")
time.sleep(3)

# Run tests
asyncio.run(run_all_tests())

print("\n✓ All tests completed!")
print("\nNote: Servers are running in background. Press Ctrl+C to stop.")




Assignment 5 - Multi-Agent Customer Service System
A2A Protocol + MCP Integration

Waiting for servers to be ready...

TEST SUITE - Multi-Agent Customer Service System

TEST 1: Simple Query - Get Customer Information
Description: Single agent, straightforward MCP call
--------------------------------------------------------------------------------


2025-11-29 16:28:18,634 - INFO - httpx - HTTP Request: GET http://127.0.0.1:9400/.well-known/agent-card.json "HTTP/1.1 200 OK"
  run_args = convert_a2a_request_to_adk_run_args(context)
  convert_a2a_part_to_genai_part(part)
  task_result_aggregator = TaskResultAggregator()
  for a2a_event in convert_event_to_a2a_events(
  message = convert_event_to_a2a_message(event, invocation_context)
  a2a_part = convert_genai_part_to_a2a_part(part)



[Router] Analyzing request: Get customer information for ID 1
[Router] Needs customer: True, Needs support: False
[Router] → CustomerDataAgent (A2A): Handling customer data request


2025-11-29 16:28:19,013 - INFO - a2a.server.tasks.task_manager - Task not found or task_id not set. Creating new task for event (task_id: e79a7321-140a-4016-9dcf-dfd807732c5e, context_id: 879bad89-8741-4c6e-88e9-6c7bff423ad7).
2025-11-29 16:28:19,017 - INFO - httpx - HTTP Request: GET http://127.0.0.1:9300/.well-known/agent-card.json "HTTP/1.1 200 OK"
2025-11-29 16:28:19,343 - INFO - a2a.server.tasks.task_manager - Task not found or task_id not set. Creating new task for event (task_id: fc4cc4f6-2ec1-4880-849a-811daaa5d608, context_id: 8e898199-d34f-4b1f-a053-d3bffafc03e1).
2025-11-29 16:28:19,362 - INFO - httpx - HTTP Request: POST http://127.0.0.1:8000/call "HTTP/1.1 200 OK"
2025-11-29 16:28:19,373 - INFO - httpx - HTTP Request: POST http://127.0.0.1:9300 "HTTP/1.1 200 OK"
2025-11-29 16:28:19,383 - INFO - httpx - HTTP Request: POST http://127.0.0.1:9400 "HTTP/1.1 200 OK"


{
  "router": "RouterAgent",
  "request": "Get customer information for ID 1",
  "actions": [
    "Called customer agent via A2A"
  ],
  "results": {
    "customer": {
      "agent": "CustomerDataAgent",
      "status": "error",
      "error": "Customer 1 not found"
    }
  },
  "error": null
}

TEST 2: Coordinated Query - Account Upgrade
Description: Multiple agents coordinate: data fetch + support response
--------------------------------------------------------------------------------


2025-11-29 16:28:19,731 - INFO - a2a.server.tasks.task_manager - Task not found or task_id not set. Creating new task for event (task_id: 4c86f340-8f9b-4e7c-8a73-ccfb0de24f7b, context_id: e769aac0-e5cc-4635-a8df-8e83e6c0dea0).
2025-11-29 16:28:19,734 - INFO - httpx - HTTP Request: GET http://127.0.0.1:9300/.well-known/agent-card.json "HTTP/1.1 200 OK"
2025-11-29 16:28:19,746 - INFO - a2a.server.tasks.task_manager - Task not found or task_id not set. Creating new task for event (task_id: f7b6084e-4c0b-4624-832e-69e220e13367, context_id: 8e01ce2f-b180-4a88-8541-7d3b9cfffdb0).
2025-11-29 16:28:19,749 - INFO - httpx - HTTP Request: POST http://127.0.0.1:8000/call "HTTP/1.1 200 OK"
2025-11-29 16:28:19,761 - INFO - httpx - HTTP Request: POST http://127.0.0.1:9300 "HTTP/1.1 200 OK"
2025-11-29 16:28:19,778 - INFO - httpx - HTTP Request: GET http://127.0.0.1:9301/.well-known/agent-card.json "HTTP/1.1 200 OK"
2025-11-29 16:28:19,805 - INFO - a2a.server.tasks.task_manager - Task not found or task


[Router] Analyzing request: I'm customer ID 2 and need help upgrading my account
[Router] Needs customer: True, Needs support: True
[Router] → CustomerDataAgent (A2A): Getting customer context
[Router] → SupportAgent (A2A): Handling support request
{
  "router": "RouterAgent",
  "request": "I'm customer ID 2 and need help upgrading my account",
  "actions": [
    "Retrieved customer context via A2A",
    "Handled support request via A2A"
  ],
  "results": {
    "customer": {
      "agent": "CustomerDataAgent",
      "status": "error",
      "error": "Customer 2 not found"
    },
    "support": {
      "agent": "SupportAgent",
      "status": "success",
      "action": "CREATE_TICKET",
      "priority": "MEDIUM",
      "data": {
        "id": 13,
        "customer_id": 2,
        "issue": "I'm customer ID 2 and need help upgrading my account",
        "status": "open",
        "priority": "medium",
        "created_at": "2025-11-29T22:28:19.815913+00:00"
      }
    }
  },
  "error": n

2025-11-29 16:28:20,262 - INFO - a2a.server.tasks.task_manager - Task not found or task_id not set. Creating new task for event (task_id: 64ea44a2-04b8-4b76-957b-d53b7e1c5bce, context_id: ab9246cc-d6b7-450f-9330-853ff0a3df6e).
2025-11-29 16:28:20,265 - INFO - httpx - HTTP Request: GET http://127.0.0.1:9300/.well-known/agent-card.json "HTTP/1.1 200 OK"
2025-11-29 16:28:20,276 - INFO - a2a.server.tasks.task_manager - Task not found or task_id not set. Creating new task for event (task_id: f74b59ca-5e02-4b55-833a-d8f817c218ac, context_id: c9bb1a33-ccde-464e-ac33-42b3bd1d3d91).
2025-11-29 16:28:20,279 - INFO - httpx - HTTP Request: POST http://127.0.0.1:9300 "HTTP/1.1 200 OK"
2025-11-29 16:28:20,285 - INFO - httpx - HTTP Request: GET http://127.0.0.1:9301/.well-known/agent-card.json "HTTP/1.1 200 OK"
2025-11-29 16:28:20,301 - INFO - a2a.server.tasks.task_manager - Task not found or task_id not set. Creating new task for event (task_id: 1081b65d-bd88-4d73-95b8-e6d20b50db66, context_id: 82be


[Router] Analyzing request: Show me all active customers who have open tickets
[Router] Needs customer: True, Needs support: True
[Router] → CustomerDataAgent (A2A): Getting customer context
[Router] → SupportAgent (A2A): Handling support request
{
  "router": "RouterAgent",
  "request": "Show me all active customers who have open tickets",
  "actions": [
    "Retrieved customer context via A2A",
    "Handled support request via A2A"
  ],
  "results": {
    "customer": {
      "agent": "CustomerDataAgent",
      "status": "error",
      "error": "Could not understand request",
      "request": "Show me all active customers who have open tickets"
    },
    "support": {
      "agent": "SupportAgent",
      "status": "error",
      "error": "Customer ID required for ticket query"
    }
  },
  "error": null
}

TEST 4: Escalation - Refund Request
Description: Router must identify urgency and route appropriately
------------------------------------------------------------------------------

2025-11-29 16:28:20,787 - INFO - a2a.server.tasks.task_manager - Task not found or task_id not set. Creating new task for event (task_id: 8b5c372a-7686-42db-b355-4d6d28de77c7, context_id: 85f67c82-7ae8-4727-bfb2-597677c97346).
2025-11-29 16:28:20,796 - INFO - httpx - HTTP Request: GET http://127.0.0.1:9300/.well-known/agent-card.json "HTTP/1.1 200 OK"
2025-11-29 16:28:20,811 - INFO - a2a.server.tasks.task_manager - Task not found or task_id not set. Creating new task for event (task_id: e4f8d2cb-81e4-406d-88c6-85c0d13f1d1c, context_id: ebf4688f-4666-4071-baaa-36ecec1c4998).
2025-11-29 16:28:20,818 - INFO - httpx - HTTP Request: POST http://127.0.0.1:8000/call "HTTP/1.1 200 OK"
2025-11-29 16:28:20,831 - INFO - httpx - HTTP Request: POST http://127.0.0.1:9300 "HTTP/1.1 200 OK"
2025-11-29 16:28:20,847 - INFO - httpx - HTTP Request: GET http://127.0.0.1:9301/.well-known/agent-card.json "HTTP/1.1 200 OK"
2025-11-29 16:28:20,864 - INFO - a2a.server.tasks.task_manager - Task not found or task


[Router] Analyzing request: I've been charged twice, please refund immediately! Customer ID 1
[Router] Needs customer: True, Needs support: True
[Router] → CustomerDataAgent (A2A): Getting customer context
[Router] → SupportAgent (A2A): Handling support request
{
  "router": "RouterAgent",
  "request": "I've been charged twice, please refund immediately! Customer ID 1",
  "actions": [
    "Retrieved customer context via A2A",
    "Handled support request via A2A"
  ],
  "results": {
    "customer": {
      "agent": "CustomerDataAgent",
      "status": "error",
      "error": "Customer 1 not found"
    },
    "support": {
      "agent": "SupportAgent",
      "status": "success",
      "action": "URGENT_REFUND",
      "message": "Refund request identified",
      "priority": "HIGH",
      "customer_id": 1
    }
  },
  "error": null
}

TEST 5: Multi-Intent - Update and History
Description: Parallel task execution and coordination
----------------------------------------------------------

2025-11-29 16:28:21,430 - INFO - a2a.server.tasks.task_manager - Task not found or task_id not set. Creating new task for event (task_id: aff6ded0-1b94-4b87-b595-ec728db034cb, context_id: 0dcfe579-959a-4b9d-bf63-8b1d8042fe04).
2025-11-29 16:28:21,435 - INFO - httpx - HTTP Request: GET http://127.0.0.1:9300/.well-known/agent-card.json "HTTP/1.1 200 OK"
2025-11-29 16:28:21,461 - INFO - a2a.server.tasks.task_manager - Task not found or task_id not set. Creating new task for event (task_id: 80e79523-5d6f-4255-bcc5-daecc2dbf55f, context_id: d33dc482-e28d-4a2d-8927-251f3e4ff83f).
2025-11-29 16:28:21,467 - INFO - httpx - HTTP Request: POST http://127.0.0.1:8000/call "HTTP/1.1 200 OK"
2025-11-29 16:28:21,481 - INFO - httpx - HTTP Request: POST http://127.0.0.1:9300 "HTTP/1.1 200 OK"
2025-11-29 16:28:21,491 - INFO - httpx - HTTP Request: GET http://127.0.0.1:9301/.well-known/agent-card.json "HTTP/1.1 200 OK"
2025-11-29 16:28:21,515 - INFO - a2a.server.tasks.task_manager - Task not found or task


[Router] Analyzing request: Update customer ID 5 email to newemail@example.com and show my ticket history
[Router] Needs customer: True, Needs support: True
[Router] → CustomerDataAgent (A2A): Getting customer context
[Router] → SupportAgent (A2A): Handling support request
{
  "router": "RouterAgent",
  "request": "Update customer ID 5 email to newemail@example.com and show my ticket history",
  "actions": [
    "Retrieved customer context via A2A",
    "Handled support request via A2A"
  ],
  "results": {
    "customer": {
      "agent": "CustomerDataAgent",
      "status": "error",
      "error": "Customer 5 not found"
    },
    "support": {
      "agent": "SupportAgent",
      "status": "success",
      "action": "QUERY_TICKETS",
      "data": [
        {
          "id": 11,
          "customer_id": 5,
          "issue": "Password reset",
          "status": "open",
          "priority": "low",
          "created_at": "2025-11-29T21:44:55.619901+00:00"
        }
      ]
    }
  },

2025-11-29 16:28:22,199 - INFO - a2a.server.tasks.task_manager - Task not found or task_id not set. Creating new task for event (task_id: 04fb2e17-d7fa-440c-91c5-e862676c3bfe, context_id: bf8f4f62-31fa-4b26-8842-c17e82869b5f).
2025-11-29 16:28:22,205 - INFO - httpx - HTTP Request: GET http://127.0.0.1:9300/.well-known/agent-card.json "HTTP/1.1 200 OK"
2025-11-29 16:28:22,227 - INFO - a2a.server.tasks.task_manager - Task not found or task_id not set. Creating new task for event (task_id: 663f4f1e-1e4b-4d0d-9ac8-ce84ff988e5c, context_id: dcf49547-7a78-4525-bd0c-c107fd277618).
2025-11-29 16:28:22,237 - INFO - httpx - HTTP Request: POST http://127.0.0.1:8000/call "HTTP/1.1 200 OK"
2025-11-29 16:28:22,249 - INFO - httpx - HTTP Request: POST http://127.0.0.1:9300 "HTTP/1.1 200 OK"
2025-11-29 16:28:22,263 - INFO - httpx - HTTP Request: POST http://127.0.0.1:9400 "HTTP/1.1 200 OK"



[Router] Analyzing request: List all active customers
[Router] Needs customer: True, Needs support: False
[Router] → CustomerDataAgent (A2A): Handling customer data request
{
  "router": "RouterAgent",
  "request": "List all active customers",
  "actions": [
    "Called customer agent via A2A"
  ],
  "results": {
    "customer": {
      "agent": "CustomerDataAgent",
      "status": "success",
      "action": "Listed customers via MCP",
      "data": [
        {
          "id": 6,
          "name": "Alice Premium",
          "email": "alice@example.com",
          "phone": "111-111-1111",
          "status": "active",
          "created_at": "2025-11-29T21:44:55.619901+00:00",
          "updated_at": "2025-11-29T21:44:55.619901+00:00"
        },
        {
          "id": 7,
          "name": "Bob Standard",
          "email": "bob@example.com",
          "phone": "222-222-2222",
          "status": "active",
          "created_at": "2025-11-29T21:44:55.619901+00:00",
          "updated

2025-11-29 16:28:22,893 - INFO - a2a.server.tasks.task_manager - Task not found or task_id not set. Creating new task for event (task_id: ed184939-5baf-4cc1-a4ff-33383e3b8c67, context_id: 605db9c2-094a-429e-8da1-015c5a8eeea2).
2025-11-29 16:28:22,899 - INFO - httpx - HTTP Request: GET http://127.0.0.1:9300/.well-known/agent-card.json "HTTP/1.1 200 OK"
2025-11-29 16:28:22,918 - INFO - a2a.server.tasks.task_manager - Task not found or task_id not set. Creating new task for event (task_id: 7bd1ceee-90ea-4d3d-ae6a-df8fa25684d1, context_id: 7cc7ee8a-cab5-49a5-9b54-d4cadfd44ac7).
2025-11-29 16:28:22,931 - INFO - httpx - HTTP Request: POST http://127.0.0.1:8000/call "HTTP/1.1 200 OK"
2025-11-29 16:28:22,945 - INFO - httpx - HTTP Request: POST http://127.0.0.1:9300 "HTTP/1.1 200 OK"
2025-11-29 16:28:22,957 - INFO - httpx - HTTP Request: GET http://127.0.0.1:9301/.well-known/agent-card.json "HTTP/1.1 200 OK"
2025-11-29 16:28:22,982 - INFO - a2a.server.tasks.task_manager - Task not found or task


[Router] Analyzing request: Show ticket history for customer ID 1
[Router] Needs customer: True, Needs support: True
[Router] → CustomerDataAgent (A2A): Getting customer context
[Router] → SupportAgent (A2A): Handling support request
{
  "router": "RouterAgent",
  "request": "Show ticket history for customer ID 1",
  "actions": [
    "Retrieved customer context via A2A",
    "Handled support request via A2A"
  ],
  "results": {
    "customer": {
      "agent": "CustomerDataAgent",
      "status": "error",
      "error": "Customer 1 not found"
    },
    "support": {
      "agent": "SupportAgent",
      "status": "success",
      "action": "QUERY_TICKETS",
      "data": [
        {
          "id": 12,
          "customer_id": 1,
          "issue": "I've been charged twice, please refund immediately! Customer ID 1",
          "status": "open",
          "priority": "high",
          "created_at": "2025-11-29T21:45:06.793677+00:00"
        },
        {
          "id": 7,
          "custo