# Assignment: Multi-Agent Customer Service System with A2A and MCP

## Overview

Build a multi-agent customer service system where specialized agents coordinate using Agent-to-Agent (A2A) communication and access customer data through the Model Context Protocol (MCP). This project will showcase your skills for interviews.

## Learning Objectives

- Implement agent coordination using A2A protocols
- Integrate external tools via MCP
- Design multi-agent task allocation and negotiation
- Build a practical customer service automation system

## Assignment Requirements

### Part 1: System Architecture

Design a multi-agent system with at least two specialized agents:

#### Router Agent (Orchestrator)

- Receives customer queries
- Analyzes query intent
- Routes to appropriate specialist agent
- Coordinates responses from multiple agents

#### Customer Data Agent (Specialist)

- Accesses customer database via MCP
- Retrieves customer information
- Updates customer records
- Handles data validation

#### Support Agent (Specialist)

- Handles general customer support queries
- Can escalate complex issues
- Requests customer context from Data Agent
- Provides solutions and recommendations

---

# Import Libraries

In [1]:
!pip install --upgrade -q google-genai google-adk==1.9.0 a2a-sdk==0.3.0 python-dotenv aiohttp uvicorn requests mermaid-python nest-asyncio

[?25l   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m0.0/1.8 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m[91m‚ï∏[0m [32m1.8/1.8 MB[0m [31m71.9 MB/s[0m eta [36m0:00:01[0m[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m1.8/1.8 MB[0m [31m38.7 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m0.0/130.3 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m130.3/130.3 kB[0m [31m7.7 MB/s[0m eta [36m0:00:00[0m
[2K   

## Environment Configuration

In [2]:
# Targeted workaround for google-adk==1.9.0 compatibility with a2a-sdk==0.3.0
# This cell shall be removed when google-adk releases the version next to >1.9.0
# (after https://github.com/google/adk-python/pull/2297)


import sys

from a2a.client import client as real_client_module
from a2a.client.card_resolver import A2ACardResolver


class PatchedClientModule:
    def __init__(self, real_module) -> None:
        for attr in dir(real_module):
            if not attr.startswith('_'):
                setattr(self, attr, getattr(real_module, attr))
        self.A2ACardResolver = A2ACardResolver


patched_module = PatchedClientModule(real_client_module)
sys.modules['a2a.client.client'] = patched_module  # type: ignore

In [3]:
import asyncio
import logging
import os
import sys
import threading
import time

from typing import Any

import httpx
import nest_asyncio
import uvicorn

from a2a.client import ClientConfig, ClientFactory, create_text_message_object
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import (
    AgentCapabilities,
    AgentCard,
    AgentSkill,
    TransportProtocol,
)
from a2a.utils.constants import AGENT_CARD_WELL_KNOWN_PATH
from dotenv import load_dotenv
from google.adk.a2a.executor.a2a_agent_executor import (
    A2aAgentExecutor,
    A2aAgentExecutorConfig,
)
from google.adk.agents import Agent, SequentialAgent
from google.adk.agents.remote_a2a_agent import RemoteA2aAgent
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools import google_search

  from google.cloud.aiplatform.utils import gcs_utils


In [4]:
# Set Google Cloud Configuration
os.environ['GOOGLE_GENAI_USE_VERTEXAI'] = 'FLASE'
os.environ['GOOGLE_CLOUD_PROJECT'] = (
    'adsp-34002-ip07-visionary-ai'  # @param {type: "string", placeholder: "[your-project-id]", isTemplate: true}
)
os.environ['GOOGLE_CLOUD_LOCATION'] = (
    'us-central1'  # Replace with your location
)

load_dotenv()
from google.colab import userdata

os.environ['GOOGLE_API_KEY'] = userdata.get('GOOGLE_API_KEY')



print('Environment variables configured:')
print(f'GOOGLE_GENAI_USE_VERTEXAI: {os.environ["GOOGLE_GENAI_USE_VERTEXAI"]}')
print(f'GOOGLE_CLOUD_PROJECT: {os.environ["GOOGLE_CLOUD_PROJECT"]}')
print(f'GOOGLE_CLOUD_LOCATION: {os.environ["GOOGLE_CLOUD_LOCATION"]}')

Environment variables configured:
GOOGLE_GENAI_USE_VERTEXAI: FLASE
GOOGLE_CLOUD_PROJECT: adsp-34002-ip07-visionary-ai
GOOGLE_CLOUD_LOCATION: us-central1


In [5]:
# Authenticate your notebook environment (Colab only)
if 'google.colab' in sys.modules:
    from google.colab import auth

    auth.authenticate_user(project_id=os.environ['GOOGLE_CLOUD_PROJECT'])

In [6]:
# Setup logging
logging.basicConfig(
    level=logging.ERROR,
    format='%(asctime)s - %(levelname)s - %(name)s - %(message)s',
)

## Building Agents

### Customer Data Agent

- Accesses customer database via MCP
- Retrieves customer information
- Updates customer records
- Handles data validation


In [7]:
from google.adk.agents import LlmAgent
from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset
from google.adk.tools.mcp_tool import StreamableHTTPConnectionParams

# The McpToolset class seems to be unavailable or moved in google-adk==1.9.0.
# I modified to MCPToolset not McpToolset

In [8]:
MCP_SERVER_URL = "https://tiffiny-bulbourethral-unparsimoniously.ngrok-free.dev/mcp"

In [9]:
# Customer Data Agent
customer_data_agent = Agent(
    model='gemini-2.5-pro',
    name='customer_data_agent',
    instruction="""
    You are a Customer Data Agent specialized in managing customer database operations via MCP.

    Your responsibilities:

    1. **Retrieve Customer Information**:
       - Use get_customer(customer_id) to fetch individual customer records by ID
       - Use list_customers(status, limit) to retrieve multiple customers
         * status can be 'active', 'disabled', or None for all customers
         * limit controls maximum number of results (default: 100)
       - Always return complete customer information when requested
       - Format responses clearly with customer details

    2. **Update Customer Records**:
       - Use update_customer(customer_id, data) to modify customer information
       - The data parameter should be a dictionary/object with fields to update:
         * name: Customer's full name
         * email: Email address (must contain '@')
         * phone: Phone number
         * status: 'active' or 'disabled'
       - Validate all updates before executing:
         * Email must contain '@' symbol
         * Status must be 'active' or 'disabled'
         * Name cannot be empty if provided
       - Always confirm successful updates with the updated customer record

    3. **Data Validation**:
       - Verify customer IDs exist before operations
       - Check data formats (email, phone, status values)
       - Return clear error messages if validation fails
       - Never update records with invalid data
       - Handle database errors gracefully

    4. **Communication**:
       - When other agents request customer data, provide it clearly and completely
       - Return structured responses that other agents can use
       - Include success/error status in all responses
       - Log all database operations for debugging

    5. **Response Format**:
       - Always return JSON-formatted responses when possible
       - Include success/error status in responses
       - Provide clear error messages if operations fail
       - When returning customer data, include all relevant fields

    Examples:
    - "Get customer 5" ‚Üí Use get_customer(5)
    - "List all active customers" ‚Üí Use list_customers(status='active')
    - "Update customer 5 email to new@email.com" ‚Üí Use update_customer(5, {'email': 'new@email.com'})
    - "Show me customer 10" ‚Üí Use get_customer(10) and format the response clearly

    Always validate input and handle errors gracefully. Never proceed with invalid data.
    """,
    tools=[
        MCPToolset(
            connection_params=StreamableHTTPConnectionParams(
                url=MCP_SERVER_URL
            )
        )
    ],
)

print('Customer Data Agent created successfully!')

Customer Data Agent created successfully!


In [10]:
customer_data_agent_card = AgentCard(
    name='Customer Data Agent',
    url='http://localhost:10021',
    description='Manages customer database operations including retrieval, updates, and validation via MCP',
    version='1.0',
    capabilities=AgentCapabilities(streaming=True),
    default_input_modes=['text/plain'],
    default_output_modes=['text/plain'],
    preferred_transport=TransportProtocol.jsonrpc,
    skills=[
        AgentSkill(
            id='customer_retrieval',
            name='Customer Retrieval',
            description='Retrieves customer information by ID or status',
            tags=['customer', 'data', 'retrieval'],
            examples=[
                'Get customer information for ID 5',
                'List all active customers',
                'Show me customer details',
            ],
        ),
        AgentSkill(
            id='customer_updates',
            name='Customer Updates',
            description='Updates customer records with data validation',
            tags=['customer', 'data', 'updates'],
            examples=[
                'Update customer email',
                'Change customer status',
                'Modify customer information',
            ],
        ),
    ],
)

In [11]:
remote_customer_data_agent = RemoteA2aAgent(
    name='customer_data_agent',
    description='Accesses customer database via MCP',
    agent_card=f'http://localhost:10021{AGENT_CARD_WELL_KNOWN_PATH}',
)

  remote_customer_data_agent = RemoteA2aAgent(


### Support Agent


- Handles general customer support queries
- Can escalate complex issues
- Requests customer context from Data Agent
- Provides solutions and recommendations

In [12]:
# Support Agent
support_agent = Agent(
    model='gemini-2.5-pro',
    name='support_agent',
    instruction="""
    You are a Support Agent specialized in handling customer support queries and managing support tickets.

    Your responsibilities:

    1. **Handle General Customer Support Queries**:
       - Answer questions about products, services, accounts, billing, and technical issues
       - Provide helpful, clear, and professional responses
       - Use customer context when available to personalize responses
       - Offer step-by-step solutions when appropriate

    2. **Manage Support Tickets**:
       - Use create_ticket(customer_id, issue, priority) to create new support tickets
         * Priority levels: 'low' for minor issues, 'medium' for standard issues, 'high' for urgent issues
         * Always create tickets for issues that need tracking or follow-up
       - Use get_customer_history(customer_id) to view a customer's ticket history
         * Review past tickets to understand recurring issues
         * Use history to provide better context-aware support

    3. **Request Customer Context from Data Agent**:
       - When you need customer information (name, email, status, etc.), communicate with the Customer Data Agent
       - The Customer Data Agent can provide:
         * Customer details via get_customer(customer_id)
         * Customer lists via list_customers(status)
         * Updated customer information
       - Always request customer context when:
         * A customer ID is mentioned but details are needed
         * You need to verify customer status before providing support
         * You need to personalize your response
       - Format requests clearly: "Get customer info for ID {customer_id}"

    4. **Escalate Complex Issues**:
       - Identify issues that require escalation:
         * Billing disputes or refund requests
         * Security concerns
         * Account access problems
         * Complex technical issues beyond your scope
       - When escalating, clearly communicate:
         * The nature of the issue
         * What information you've gathered
         * Why it requires escalation
         * Recommended priority level
       - Create a high-priority ticket for escalated issues

    5. **Provide Solutions and Recommendations**:
       - Offer actionable solutions based on the issue type
       - Provide multiple options when appropriate
       - Recommend best practices or preventive measures
       - Suggest relevant resources or documentation
       - Follow up on whether solutions worked

    6. **Coordination with Other Agents**:
       - Work with the Router Agent to receive routed queries
       - Coordinate with Customer Data Agent to get customer information
       - Communicate clearly about what information you need
       - Share relevant context back to the Router Agent

    Examples:
    - "I'm customer 5 and can't log in" ‚Üí
      1. Request customer info from Data Agent
      2. Check ticket history
      3. Provide login troubleshooting steps
      4. Create ticket if issue persists

    - "I've been charged twice" ‚Üí
      1. Request customer info
      2. Check billing history
      3. Escalate as high-priority billing issue
      4. Create high-priority ticket
      5. Provide reassurance and next steps

    - "Show my ticket history" ‚Üí
      1. Use get_customer_history(customer_id)
      2. Format and present ticket history clearly

    - "I need help upgrading my account" ‚Üí
      1. Request customer info to check current status
      2. Provide upgrade instructions
      3. Create medium-priority ticket for tracking

    Always be professional, empathetic, and solution-oriented. Prioritize customer satisfaction while following proper procedures.
    """,
    tools=[
        MCPToolset(
            connection_params=StreamableHTTPConnectionParams(
                url=MCP_SERVER_URL
            )
        )
    ]
)

print('Support Agent created successfully!')

Support Agent created successfully!


In [13]:
support_agent_card = AgentCard(
    name='Support Agent',
    url='http://localhost:10020',
    description='Handles customer support queries, manages tickets, and coordinates with Customer Data Agent',
    version='1.0',
    capabilities=AgentCapabilities(streaming=True),
    default_input_modes=['text/plain'],
    default_output_modes=['text/plain'],
    preferred_transport=TransportProtocol.jsonrpc,
    skills=[
        AgentSkill(
            id='customer_support',
            name='Customer Support',
            description='Handles general customer support queries and provides solutions',
            tags=['support', 'service', 'help'],
            examples=[
                'I need help with my account',
                'How do I upgrade my subscription?',
                'I cannot log in',
            ],
        ),
        AgentSkill(
            id='ticket_management',
            name='Ticket Management',
            description='Creates and manages support tickets, retrieves ticket history',
            tags=['tickets', 'support', 'tracking'],
            examples=[
                'Create a ticket for my issue',
                'Show my ticket history',
                'I need to report a problem',
            ],
        ),
        AgentSkill(
            id='issue_escalation',
            name='Issue Escalation',
            description='Identifies and escalates complex issues appropriately',
            tags=['escalation', 'urgent', 'billing'],
            examples=[
                'I have been charged twice',
                'This is an urgent security issue',
                'I need immediate assistance',
            ],
        ),
    ],
)

In [14]:
remote_support_agent = RemoteA2aAgent(
    name='support_agent',
    description='Handles general customer support queries',
    agent_card=f'http://localhost:10020{AGENT_CARD_WELL_KNOWN_PATH}',
)

  remote_support_agent = RemoteA2aAgent(


### Router Agent

In [15]:
# Create the Host ADK Agent
host_agent = SequentialAgent(
    name='trend_analysis_host',
    sub_agents=[remote_customer_data_agent, remote_support_agent],
)

In [16]:
host_agent_card = AgentCard(
    name='Customer Service Host',
    url='http://localhost:10022',
    description='Orchestrates, sequentially, customer service using specialized agents',
    version='1.0',
    capabilities=AgentCapabilities(streaming=True),
    default_input_modes=['text/plain'],
    default_output_modes=['application/json'],
    preferred_transport=TransportProtocol.jsonrpc,
    skills=[
        AgentSkill(
            id="customer_support",
            name="customer_support",
            description="Handles general customer support queries and provides solutions",
            tags=[]
        ),
        AgentSkill(
            id="ticket_management",
            name="ticket_management",
            description="Creates and manages support tickets, retrieves ticket history",
            tags=[]
        ),
        AgentSkill(
            id="issue_escalation",
            name="issue_escalation",
            description="Identifies and escalates complex issues appropriately",
            tags=[]
        ),
        AgentSkill(
            id="customer_coordination",
            name="customer_coordination",
            description="Requests and uses customer context from Customer Data Agent",
            tags=[]
        ),
    ]
)



### Part 3: A2A Coordination

Choose ONE of the following approaches:

#### Option A: Lab Notebook Approach (Recommended Starting Point)

- Use the A2A coordination pattern from your lab notebook: https://colab.research.google.com/drive/1YTVbosORUrKe_qOysA3XEhhBaCwdbMj5
- Extend it to support the three required scenarios (task allocation, negotiation, multi-step)
- Add explicit logging to show agent-to-agent communication
- Document how agents coordinate and transfer control
- You may use the same framework but must demonstrate more complex coordination patterns

#### Option B: LangGraph Message Passing

- Define a shared state structure that agents can read/write
- Create nodes for each agent
- Implement conditional edges for routing between agents
- Use message passing to share information between agents
- Handle state transitions explicitly

#### LangGraph using A2A (Preferred since LangGraph now supports A2A)

- Define agents using LangGraph's message-based state structure (must have a `messages` key)
- Each assistant auto-exposes an Agent Card at `/a2a/{assistant_id}`
- Other A2A-compatible agents discover and communicate via JSON-RPC to that endpoint

---
# Starting A2A Server

In [17]:
def create_agent_a2a_server(agent, agent_card):
    """Create an A2A server for any ADK agent.

    Args:
        agent: The ADK agent instance
        agent_card: The ADK agent card

    Returns:
        A2AStarletteApplication instance
    """
    runner = Runner(
        app_name=agent.name,
        agent=agent,
        artifact_service=InMemoryArtifactService(),
        session_service=InMemorySessionService(),
        memory_service=InMemoryMemoryService(),
    )

    config = A2aAgentExecutorConfig()
    executor = A2aAgentExecutor(runner=runner, config=config)

    request_handler = DefaultRequestHandler(
        agent_executor=executor,
        task_store=InMemoryTaskStore(),
    )

    # Create A2A application
    return A2AStarletteApplication(
        agent_card=agent_card, http_handler=request_handler
    )

In [18]:
# Apply nest_asyncio
nest_asyncio.apply()

# Store server tasks
server_tasks: list[asyncio.Task] = []

async def run_agent_server(agent, agent_card, port) -> None:
    """Run a single agent server."""
    app = create_agent_a2a_server(agent, agent_card)

    config = uvicorn.Config(
        app.build(),
        host='127.0.0.1',
        port=port,
        log_level='warning',
        loop='asyncio',  # Changed from 'none' to 'asyncio'
    )

    server = uvicorn.Server(config)
    await server.serve()


async def start_all_servers() -> None:
    """Start all servers in the same event loop."""
    # Create tasks for all servers
    tasks = [
        asyncio.create_task(
            run_agent_server(customer_data_agent, customer_data_agent_card, 10021)  # Fixed port
        ),
        asyncio.create_task(
            run_agent_server(support_agent, support_agent_card, 10020)  # Fixed port
        ),
        asyncio.create_task(
            run_agent_server(host_agent, host_agent_card, 10022)
        ),
    ]

    # Give servers time to start
    await asyncio.sleep(2)

    print('‚úÖ All agent servers started!')
    print('   - Customer Data Agent: http://127.0.0.1:10021')
    print('   - Support Agent: http://127.0.0.1:10020')
    print('   - Host Agent: http://127.0.0.1:10022')

    # Keep servers running
    try:
        await asyncio.gather(*tasks)
    except KeyboardInterrupt:
        print('Shutting down servers...')
        # Cancel all tasks
        for task in tasks:
            task.cancel()
        # Wait for cancellation
        await asyncio.gather(*tasks, return_exceptions=True)


# Run in a background thread with proper event loop handling
def run_servers_in_background() -> None:
    """Run servers in a background thread with a new event loop."""
    try:
        # Create a new event loop for this thread
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        # Run the async function
        try:
            loop.run_until_complete(start_all_servers())
        except KeyboardInterrupt:
            print('Received interrupt signal, shutting down...')
        finally:
            # Clean up the loop
            pending = asyncio.all_tasks(loop)
            for task in pending:
                task.cancel()
            if pending:
                loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
            loop.close()
    except Exception as e:
        print(f'Error in server thread: {e}')
        import traceback
        traceback.print_exc()


# Start the thread
server_thread = threading.Thread(target=run_servers_in_background, daemon=True)
server_thread.start()

# Wait for servers to be ready
time.sleep(3)

  config = A2aAgentExecutorConfig()
  executor = A2aAgentExecutor(runner=runner, config=config)


‚úÖ All agent servers started!
   - Customer Data Agent: http://127.0.0.1:10021
   - Support Agent: http://127.0.0.1:10020
   - Host Agent: http://127.0.0.1:10022

üîç Checking server status...
   ‚ö†Ô∏è  Customer Data Agent (port 10021) returned status 404
   ‚ö†Ô∏è  Support Agent (port 10020) returned status 404
   ‚ö†Ô∏è  Host Agent (port 10022) returned status 404


In [19]:
print('Customer Data Agent Card:')
print(customer_data_agent_card)
print('\nSupport Agent Card:')
print(support_agent_card)
print('\nHost Agent Card:')
print(host_agent_card)

Customer Data Agent Card:
additional_interfaces=None capabilities=AgentCapabilities(extensions=None, push_notifications=None, state_transition_history=None, streaming=True) default_input_modes=['text/plain'] default_output_modes=['text/plain'] description='Manages customer database operations including retrieval, updates, and validation via MCP' documentation_url=None icon_url=None name='Customer Data Agent' preferred_transport='JSONRPC' protocol_version='0.3.0' provider=None security=None security_schemes=None signatures=None skills=[AgentSkill(description='Retrieves customer information by ID or status', examples=['Get customer information for ID 5', 'List all active customers', 'Show me customer details'], id='customer_retrieval', input_modes=None, name='Customer Retrieval', output_modes=None, security=None, tags=['customer', 'data', 'retrieval']), AgentSkill(description='Updates customer records with data validation', examples=['Update customer email', 'Change customer status', 'Mo

# Test System

In [20]:
class A2ASimpleClient:
    """A2A Simple to call A2A servers."""

    def __init__(self, default_timeout: float = 240.0):
        self._agent_info_cache: dict[
            str, dict[str, Any] | None
        ] = {}  # Cache for agent metadata
        self.default_timeout = default_timeout

    async def create_task(self, agent_url: str, message: str) -> str:
        """Send a message following the official A2A SDK pattern."""
        # Configure httpx client with timeout
        timeout_config = httpx.Timeout(
            timeout=self.default_timeout,
            connect=10.0,
            read=self.default_timeout,
            write=10.0,
            pool=5.0,
        )

        async with httpx.AsyncClient(timeout=timeout_config) as httpx_client:
            # Check if we have cached agent card data
            if (
                agent_url in self._agent_info_cache
                and self._agent_info_cache[agent_url] is not None
            ):
                agent_card_data = self._agent_info_cache[agent_url]
            else:
                # Fetch the agent card
                agent_card_response = await httpx_client.get(
                    f'{agent_url}{AGENT_CARD_WELL_KNOWN_PATH}'
                )
                agent_card_data = self._agent_info_cache[agent_url] = (
                    agent_card_response.json()
                )

            # Create AgentCard from data
            agent_card = AgentCard(**agent_card_data)

            # Create A2A client with the agent card
            config = ClientConfig(
                httpx_client=httpx_client,
                supported_transports=[
                    TransportProtocol.jsonrpc,
                    TransportProtocol.http_json,
                ],
                use_client_preference=True,
            )

            factory = ClientFactory(config)
            client = factory.create(agent_card)

            # Create the message object
            message_obj = create_text_message_object(content=message)

            # Send the message and collect responses
            responses = []
            async for response in client.send_message(message_obj):
                responses.append(response)

            # The response is a tuple - get the first element (Task object)
            if (
                responses
                and isinstance(responses[0], tuple)
                and len(responses[0]) > 0
            ):
                task = responses[0][0]  # First element of the tuple

                # Extract text: task.artifacts[0].parts[0].root.text
                try:
                    return task.artifacts[0].parts[0].root.text
                except (AttributeError, IndexError):
                    return str(task)

            return 'No response received'

## Helper Function

In [43]:
from google.adk import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
import warnings
import logging
import sys

# Suppress all warnings and errors for cleaner output
warnings.filterwarnings('ignore')
logging.getLogger('google_adk').setLevel(logging.CRITICAL)
logging.getLogger('google_genai').setLevel(logging.CRITICAL)
logging.getLogger('asyncio').setLevel(logging.CRITICAL)

async def ask_agent(query: str, show_usage: bool = False):
    """
    Send a query to the agent and display the response using Google ADK Runner.

    Args:
        query: The question or request to send to the agent
        show_usage: If True, show token usage statistics
    """
    print(f"üë§ USER: {query}")
    print()

    SESSION_ID = "456"
    USER_ID = "12345"
    app_name = "customer_service_agent"

    try:
        # Create content from user query
        content = types.Content(
            role="user",
            parts=[types.Part(text=query)]
        )

        # Create a session service
        session_service = InMemorySessionService()

        # Create session asynchronously
        session = await session_service.create_session(
            app_name=app_name,
            user_id=USER_ID,
            session_id=SESSION_ID
        )

        # Create a runner
        runner = Runner(
            agent=host_agent,
            session_service=session_service,
            app_name=app_name
        )

        # Run the agent with the runner
        events = runner.run(
            user_id=USER_ID,
            session_id=SESSION_ID,
            new_message=content
        )

        # Process events to get the final response
        agent_response = None
        full_response = None
        for event in events:
            if event.is_final_response():
                # Check if content exists and has parts
                if event.content is not None and hasattr(event.content, 'parts') and len(event.content.parts) > 0:
                    agent_response = event.content.parts[0].text
                    full_response = event
                    break
                else:
                    # Content is None or empty - try to get any text from the event
                    if hasattr(event, 'text'):
                        agent_response = event.text
                    elif hasattr(event, 'message') and hasattr(event.message, 'text'):
                        agent_response = event.message.text
                    else:
                        # Log the event structure for debugging
                        agent_response = f"Received final response but content is None. Event type: {type(event)}"
                    full_response = event
                    break

        if agent_response is None:
            # Try to collect any text from all events as fallback
            collected_text = []
            for event in events:
                if hasattr(event, 'content') and event.content is not None:
                    if hasattr(event.content, 'parts'):
                        for part in event.content.parts:
                            if hasattr(part, 'text'):
                                collected_text.append(part.text)
                elif hasattr(event, 'text'):
                    collected_text.append(event.text)

            if collected_text:
                agent_response = " ".join(collected_text)
            else:
                agent_response = "No response received."

        print("ü§ñ AGENT:")
        print(agent_response)
        print()

        return agent_response

    except Exception as e:
        print(f"‚ùå Error: {e}")
        import traceback
        traceback.print_exc()
        print()
        return None

print("‚úÖ Helper function defined (Google ADK Runner version)")


‚úÖ Helper function defined (Google ADK Runner version)


#### Scenario 1: Task Allocation

**Query:** "I need help with my account, customer ID 12345"

**A2A Flow:**

1. Router Agent receives query
2. Router Agent ‚Üí Customer Data Agent: "Get customer info for ID 12345"
3. Customer Data Agent fetches via MCP
4. Customer Data Agent ‚Üí Router Agent: Returns customer data
5. Router Agent analyzes customer tier/status
6. Router Agent ‚Üí Support Agent: "Handle support for premium customer"
7. Support Agent generates response
8. Router Agent returns final response

In [21]:
a2a_client = A2ASimpleClient()

In [35]:
await ask_agent("I need help with my account, customer ID 12345?")

üë§ USER: I need help with my account, customer ID 12345?

ü§ñ AGENT:
I'm sorry, I was unable to find an account with the customer ID 12345. Please double check the customer ID you provided.



"I'm sorry, I was unable to find an account with the customer ID 12345. Please double check the customer ID you provided."

#### Scenario 2: Negotiation/Escalation

**Query:** "I want to cancel my subscription but I'm having billing issues"

**A2A Flow:**

1. Router detects multiple intents (cancellation + billing)
2. Router ‚Üí Support Agent: "Can you handle this?"
3. Support Agent ‚Üí Router: "I need billing context"
4. Router ‚Üí Customer Data Agent: "Get billing info"
5. Router negotiates between agents to formulate response
6. Coordinated response sent to customer

In [36]:
await ask_agent("I want to cancel my subscription but I'm having billing issues?")

üë§ USER: I want to cancel my subscription but I'm having billing issues?

ü§ñ AGENT:
I can help with that, what is your customer ID?



'I can help with that, what is your customer ID?'

#### Scenario 3: Multi-Step Coordination

**Query:** "What's the status of all high-priority tickets for premium customers?"

**A2A Flow:**

1. Router decomposes into sub-tasks
2. Router ‚Üí Customer Data Agent: "Get all premium customers"
3. Customer Data Agent ‚Üí Router: Returns customer list
4. Router ‚Üí Support Agent: "Get high-priority tickets for these IDs"
5. Support Agent queries tickets via MCP
6. Agents coordinate to format report
7. Router synthesizes final answer

In [44]:
await ask_agent("What's the status of all high-priority tickets for premium customers?")

üë§ USER: What's the status of all high-priority tickets for premium customers?

ü§ñ AGENT:
I am sorry, I cannot fulfill this request. I can get the ticket history for a specific customer, but I do not have a way to identify premium customers. If you can provide me with a list of premium customers, I can get the ticket history for each of them.



'I am sorry, I cannot fulfill this request. I can get the ticket history for a specific customer, but I do not have a way to identify premium customers. If you can provide me with a list of premium customers, I can get the ticket history for each of them.'


## Test Scenarios

Your system must successfully handle these queries:

1. **Simple Query:** "Get customer information for ID 5"
   - Single agent, straightforward MCP call

2. **Coordinated Query:** "I'm customer 12345 and need help upgrading my account"
   - Multiple agents coordinate: data fetch + support response

3. **Complex Query:** "Show me all active customers who have open tickets"
   - Requires negotiation between data and support agents

4. **Escalation:** "I've been charged twice, please refund immediately!"
   - Router must identify urgency and route appropriately

5. **Multi-Intent:** "Update my email to new@email.com and show my ticket history"
   - Parallel task execution and coordination

In [38]:
await ask_agent("Get customer information for ID 5")

üë§ USER: Get customer information for ID 5

ü§ñ AGENT:
OK. I have the customer information for ID 5.

**Customer Details:**
- **ID:** 5
- **Name:** Charlie Brown
- **Email:** new@email.com
- **Phone:** +1-555-0105
- **Status:** active
- **Created At:** 2025-12-02 18:25:50
- **Updated At:** 2025-12-02 18:30:02




'OK. I have the customer information for ID 5.\n\n**Customer Details:**\n- **ID:** 5\n- **Name:** Charlie Brown\n- **Email:** new@email.com\n- **Phone:** +1-555-0105\n- **Status:** active\n- **Created At:** 2025-12-02 18:25:50\n- **Updated At:** 2025-12-02 18:30:02\n'

In [39]:
await ask_agent("I'm customer 12345 and need help upgrading my account")

üë§ USER: I'm customer 12345 and need help upgrading my account

ü§ñ AGENT:
I am sorry, but I cannot find a customer with the ID 12345. Please provide a valid customer ID.



'I am sorry, but I cannot find a customer with the ID 12345. Please provide a valid customer ID.'

In [45]:
await ask_agent("Show me all active customers who have open tickets")

üë§ USER: Show me all active customers who have open tickets

ü§ñ AGENT:
Here are the active customers with open tickets:

- **Alice Williams** (ID: 4, Email: alice.w@techcorp.com, Phone: +1-555-0104)
- **Charlie Brown** (ID: 5, Email: new@email.com, Phone: +1-555-0105)
- **Diana Prince** (ID: 6, Email: diana.prince@company.org, Phone: +1-555-0106)
- **Edward Norton** (ID: 7, Email: e.norton@business.net, Phone: +1-555-0107)
- **George Miller** (ID: 9, Email: george.m@enterprise.com, Phone: +1-555-0109)
- **Hannah Lee** (ID: 10, Email: hannah.lee@global.com, Phone: +1-555-0110)
- **Isaac Newton** (ID: 11, Email: isaac.n@science.edu, Phone: +1-555-0111)
- **Jane Smith** (ID: 2, Email: jane.smith@example.com, Phone: +1-555-0102)
- **John Doe** (ID: 1, Email: john.doe@example.com, Phone: +1-555-0101)
- **Julia Roberts** (ID: 12, Email: julia.r@movies.com, Phone: +1-555-0112)
- **Michael Scott** (ID: 15, Email: michael.scott@paper.com, Phone: +1-555-0115)



'Here are the active customers with open tickets:\n\n- **Alice Williams** (ID: 4, Email: alice.w@techcorp.com, Phone: +1-555-0104)\n- **Charlie Brown** (ID: 5, Email: new@email.com, Phone: +1-555-0105)\n- **Diana Prince** (ID: 6, Email: diana.prince@company.org, Phone: +1-555-0106)\n- **Edward Norton** (ID: 7, Email: e.norton@business.net, Phone: +1-555-0107)\n- **George Miller** (ID: 9, Email: george.m@enterprise.com, Phone: +1-555-0109)\n- **Hannah Lee** (ID: 10, Email: hannah.lee@global.com, Phone: +1-555-0110)\n- **Isaac Newton** (ID: 11, Email: isaac.n@science.edu, Phone: +1-555-0111)\n- **Jane Smith** (ID: 2, Email: jane.smith@example.com, Phone: +1-555-0102)\n- **John Doe** (ID: 1, Email: john.doe@example.com, Phone: +1-555-0101)\n- **Julia Roberts** (ID: 12, Email: julia.r@movies.com, Phone: +1-555-0112)\n- **Michael Scott** (ID: 15, Email: michael.scott@paper.com, Phone: +1-555-0115)'

In [41]:
await ask_agent("I've been charged twice, please refund immediately!")

üë§ USER: I've been charged twice, please refund immediately!

ü§ñ AGENT:
I am sorry to hear that you have been charged twice. I am unable to process a refund for you, but I can open a support ticket to have this resolved. Can you please provide me with your customer ID?



'I am sorry to hear that you have been charged twice. I am unable to process a refund for you, but I can open a support ticket to have this resolved. Can you please provide me with your customer ID?'

In [46]:
await ask_agent("I'm customer 5. Update my email to new@email.com and show my ticket history")

üë§ USER: I'm customer 5. Update my email to new@email.com and show my ticket history

ü§ñ AGENT:
Received final response but content is None. Event type: <class 'google.adk.events.event.Event'>



"Received final response but content is None. Event type: <class 'google.adk.events.event.Event'>"


## Deliverables

### 1. Code Repository (GitHub)

- MCP server implementation
- Agent implementations
- Configuration and deployment scripts
- `README.md` with setup instructions (use proper Python venv separation and a `requirements.txt` file that clearly shows Python packages needed)

### 2. Colab Notebook or a Python Program that Runs End to End

- End-to-end demonstration
- At least 3 test scenarios showing A2A coordination
- Output captured properly that shows the queries

### 3. Conclusion

1-2 paragraphs of what you learned and challenges

## Common Pitfalls to Avoid

### MCP Integration Issues

**Problem:** MCP server becomes unreachable during testing  
**Solution:** Keep ngrok session active, implement reconnection logic

**Problem:** Tools timeout or fail silently  
**Solution:** Add explicit error handling and logging in each MCP tool

**Problem:** Database state gets corrupted during testing  
**Solution:** Implement database reset function, use transactions

### A2A Coordination Issues

**Problem:** Agents get stuck in infinite loops  
**Solution:** Add maximum iteration limits, implement timeout logic

**Problem:** Agent responses are inconsistent  
**Solution:** Be explicit in system instructions, add examples

**Problem:** Information gets lost between agent transfers  
**Solution:** Use structured state, log all transfers

### Implementation Challenges

**Problem:** Response times exceed 3 seconds  
**Solution:** Parallelize independent agent calls, cache frequent queries

**Problem:** Agents don't coordinate properly  
**Solution:** Test each agent independently first, then test pairs

**Problem:** Difficult to debug multi-agent interactions  
**Solution:** Add comprehensive logging at every coordination point

# Troubleshooting

### 1. Resolving this error
üë§ USER: What's the status of all high-priority tickets for premium customers?

‚ùå Error: 'NoneType' object has no attribute 'parts'

Traceback (most recent call last):
  File "/tmp/ipython-input-1826302461.py", line 65, in ask_agent
    agent_response = event.content.parts[0].text
                     ^^^^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'parts'

- I updated the ask_agent function to:
- Check if event.content is not None before accessing parts

### 2. Package version collision
The following packages were successfully reinstalled to specific versions: google-genai, google-adk==1.9.0, a2a-sdk==0.3.0, protobuf==5.29.5, and requests==2.32.4.

### 3. Import function name change
MCPToolset was  renamed
I changed import statement from
```from google.adk.tools.mcp_tool import McpToolset```
to
```from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset```
to get rid of the error


---

## Key Takeaways and Lessons Learned
- Learned the technical concepts and hands-on process of building an MCP Server and connecting it to an A2A system.
- Realized the importance of designing the MCP Tool definitions to align with the underlying database schema‚Äîschema mismatches lead to immediate failures or unexpected errors.

Important Technical Insights

1.	Maintaining configuration consistency across all layers
- Port numbers must be aligned across AgentCard URLs, RemoteA2aAgent endpoints, and server startup configurations.
- Any mismatch at any layer results in connectivity failures such as 404 errors.
2.	Using defensive programming for agent responses
- Agent responses may return None content even when is_final_response() is True, so the server must safely handle these cases.
3.	Managing variable scope and module-level dependencies
- Database paths and configuration variables must be defined at the correct module level to avoid import-time errors or undefined behavior.
4.	Handling async event loop management
- Running multiple A2A servers simultaneously requires careful management of asyncio event loops to prevent conflicts or shutdown issues.