# Scenario 03: A2A Agent-to-Agent Protocol

**Estimated Time**: 45 minutes

## Learning Objectives
- Understand A2A protocol for agent interoperability
- Create Agent Cards for capability discovery
- Build A2A server and client implementations
- Enable cross-agent communication

## Prerequisites
- Completed Scenario 01 (Simple Agent + MCP)
- Understanding of REST APIs and JSON-RPC

## Part 1: Understanding A2A Protocol

### What is A2A?

A2A (Agent-to-Agent) is a protocol that enables agents to:
- **Discover** each other via Agent Cards
- **Invoke** capabilities remotely via JSON-RPC
- **Track** task progress and status
- **Exchange** structured messages and artifacts

### Key Concepts

1. **Agent Card**: Metadata describing agent capabilities (`/.well-known/agent-card.json`)
2. **Skills**: Specific capabilities an agent exposes
3. **Tasks**: Work items that agents process
4. **JSON-RPC**: Communication protocol for method invocation

### Task States

```
submitted → working → completed
                   → failed
                   → canceled
         → input-required → ...
```
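
The diagram above can be read as a small state machine. The sketch below encodes one possible reading of it as a transition table, using the SDK's spelling `canceled`. The table itself and the helper name `can_transition` are illustrative assumptions, not a normative part of the protocol or the SDK.

```python
# Assumed transition table, derived from the diagram above (not normative).
# States that do not appear as keys are treated as terminal.
ALLOWED_TRANSITIONS: dict[str, set[str]] = {
    "submitted": {"working", "input-required", "failed", "canceled"},
    "working": {"completed", "failed", "canceled", "input-required"},
    "input-required": {"working", "failed", "canceled"},
}


def can_transition(current: str, target: str) -> bool:
    """Return True if moving from `current` to `target` is allowed by the table."""
    return target in ALLOWED_TRANSITIONS.get(current, set())


if __name__ == "__main__":
    print(can_transition("submitted", "working"))
    print(can_transition("completed", "working"))
```

A check like this could run before saving an updated task, so that a finished task is never moved back to `working` by mistake.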

## Part 2: Setting Up the Environment

In [1]:
# Load environment and configure paths
import sys
from pathlib import Path

# Add project root to path
project_root = Path("..").resolve()
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))

# Load environment variables
from dotenv import load_dotenv
load_dotenv(project_root / ".env")

print(f"✅ Project root: {project_root}")

✅ Project root: C:\Users\jonasrotter\OneDrive - Microsoft\Desktop\Jonas Privat\MyCodingProjects\agents-workshop


In [2]:
from azure.monitor.opentelemetry.exporter import AzureMonitorTraceExporter

In [3]:
# Verify imports
import sys
import json
from pathlib import Path

# Ensure we can import from src
project_root = Path.cwd()
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))

# Import A2A server from workshop implementation
from src.agents import A2AServer, create_a2a_server

# Import all A2A types directly from SDK
from a2a.types import (
    AgentCard,
    AgentSkill,
    AgentCapabilities,
    TaskState,
    Task,
    TaskStatus,
    Message,
    TextPart,
    Artifact,
    JSONRPCRequest,
    JSONRPCError,
    JSONRPCResponse,
)
from a2a.server.tasks import InMemoryTaskStore

from src.common.telemetry import setup_telemetry, get_tracer

# Setup telemetry
setup_telemetry()
tracer = get_tracer(__name__)

print("✅ A2A SDK components imported successfully!")
print(f"\nTask states: {[s.value for s in TaskState]}")

Patch applied successfully
✓ Applied clr_loader patch for .NET 10+ compatibility
[tfm] Applying fix for version: 10.0.1 -> net10.0
[floor_version] Applying fix for version: 10.0.1 -> 10.0.0
✅ A2A SDK components imported successfully!

Task states: ['submitted', 'working', 'input-required', 'completed', 'canceled', 'failed', 'rejected', 'auth-required', 'unknown']


## Part 3: Understanding Agent Cards

Agent Cards are JSON documents that advertise an agent's capabilities.
They are served at `/.well-known/agent-card.json`.
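
As a rough illustration of the well-known path, the sketch below builds the card URL from a base URL and checks a card (as a plain dict) for the fields this workshop sets on every card. The helper names and the `REQUIRED_FIELDS` tuple are assumptions made for this example; they are not taken from the A2A SDK.

```python
AGENT_CARD_PATH = "/.well-known/agent-card.json"

# Assumption: the fields this workshop always sets when building a card.
REQUIRED_FIELDS = ("name", "description", "url", "version", "skills")


def agent_card_url(base_url: str) -> str:
    """Build the Agent Card URL for an agent hosted at `base_url`."""
    return base_url.rstrip("/") + AGENT_CARD_PATH


def missing_fields(card: dict) -> list[str]:
    """Return the expected fields that are absent or null in `card`."""
    return [field for field in REQUIRED_FIELDS if card.get(field) is None]


if __name__ == "__main__":
    print(agent_card_url("http://localhost:8000/"))
    print(missing_fields({"name": "Research Agent", "url": "http://localhost:8000"}))
```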

In [4]:
# Create an Agent Card using SDK types
agent_card = AgentCard(
    name="Research Agent",
    description="Researches topics and provides summaries with sources",
    url="http://localhost:8000",
    version="1.0.0",
    capabilities=AgentCapabilities(),
    default_input_modes=["text"],
    default_output_modes=["text"],
    skills=[
        AgentSkill(
            id="research_topic",
            name="Research Topic",
            description="Research a topic and return findings",
            tags=["research", "topic"],
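            # Note: `input_schema` is absent from the serialized card printed
            # below, so this SDK version appears to drop it from AgentSkill.
            input_schema={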
                "type": "object",
                "properties": {
                    "topic": {"type": "string"},
                    "depth": {
                        "type": "string",
                        "enum": ["brief", "detailed", "comprehensive"]
                    }
                },
                "required": ["topic"]
            }
        ),
        AgentSkill(
            id="summarize",
            name="Summarize",
            description="Summarize a document or text",
            tags=["summarize", "text"],
            input_schema={
                "type": "object",
                "properties": {
                    "text": {"type": "string"},
                    "max_length": {"type": "integer"}
                },
                "required": ["text"]
            }
        )
    ]
)

print("=== Agent Card ===")
print(json.dumps(agent_card.model_dump(by_alias=True), indent=2))

=== Agent Card ===
{
  "additionalInterfaces": null,
  "capabilities": {
    "extensions": null,
    "pushNotifications": null,
    "stateTransitionHistory": null,
    "streaming": null
  },
  "defaultInputModes": [
    "text"
  ],
  "defaultOutputModes": [
    "text"
  ],
  "description": "Researches topics and provides summaries with sources",
  "documentationUrl": null,
  "iconUrl": null,
  "name": "Research Agent",
  "preferredTransport": "JSONRPC",
  "protocolVersion": "0.3.0",
  "provider": null,
  "security": null,
  "securitySchemes": null,
  "signatures": null,
  "skills": [
    {
      "description": "Research a topic and return findings",
      "examples": null,
      "id": "research_topic",
      "inputModes": null,
      "name": "Research Topic",
      "outputModes": null,
      "security": null,
      "tags": [
        "research",
        "topic"
      ]
    },
    {
      "description": "Summarize a document or text",
      "examples": null,
      "id": "summarize",
    

In [5]:
# Inspect capabilities (SDK uses snake_case for Python attributes)
print("=== Agent Capabilities ===")
print(f"Streaming: {agent_card.capabilities.streaming}")
print(f"Push Notifications: {agent_card.capabilities.push_notifications}")
print(f"State Transition History: {agent_card.capabilities.state_transition_history}")

print("\n=== Agent Skills ===")
for skill in agent_card.skills:
    print(f"  - {skill.name} ({skill.id})")
    print(f"    {skill.description}")
    print(f"    Tags: {skill.tags}")

=== Agent Capabilities ===
Streaming: None
Push Notifications: None
State Transition History: None

=== Agent Skills ===
  - Research Topic (research_topic)
    Research a topic and return findings
    Tags: ['research', 'topic']
  - Summarize (summarize)
    Summarize a document or text
    Tags: ['summarize', 'text']


## Part 4: Creating an A2A Server

In [6]:
# Import FastAPI testing utilities
from fastapi.testclient import TestClient

# Create A2A server with skills
app = create_a2a_server(
    name="Demo Agent",
    description="Demo A2A agent for workshop",
    url="http://localhost:8000",
    skills=[
        AgentSkill(
            id="echo",
            name="Echo",
            description="Echo back the message",
            tags=["demo", "echo"],
        )
    ]
)

client = TestClient(app)

# Test health endpoint
response = client.get("/health")
print(f"Health: {response.json()}")

Health: {'status': 'healthy', 'protocol': 'A2A', 'agent': 'Demo Agent'}


In [7]:
# Fetch Agent Card
response = client.get("/.well-known/agent-card.json")
card = response.json()

print("=== Retrieved Agent Card ===")
print(f"Name: {card['name']}")
print(f"URL: {card['url']}")
print(f"Skills: {[s['name'] for s in card['skills']]}")

=== Retrieved Agent Card ===
Name: Demo Agent
URL: http://localhost:8000
Skills: ['Echo']


## Part 5: JSON-RPC Communication

A2A uses JSON-RPC 2.0 for method invocation. Let's explore the protocol.

In [8]:
# Create a JSON-RPC request (SDK provides JSONRPCRequest model)
# Note: SDK uses camelCase for JSON serialization
rpc_request = JSONRPCRequest(
    id="req-001",
    method="message/send",
    params={
        "message": {
            "role": "user",
            "parts": [{"kind": "text", "text": "Hello, A2A agent!"}],
            "messageId": "msg-001"
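        },
        # Note: the response in the next cell carries a newly generated
        # contextId, which suggests the server ignores a params-level
        # "contextId"; the A2A spec defines contextId on the message object.
        "contextId": "ctx-demo"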
    }
)

print("=== JSON-RPC Request ===")
print(json.dumps(rpc_request.model_dump(by_alias=True), indent=2))

=== JSON-RPC Request ===
{
  "id": "req-001",
  "jsonrpc": "2.0",
  "method": "message/send",
  "params": {
    "message": {
      "role": "user",
      "parts": [
        {
          "kind": "text",
          "text": "Hello, A2A agent!"
        }
      ],
      "messageId": "msg-001"
    },
    "contextId": "ctx-demo"
  }
}


In [9]:
# Send message to agent (by_alias=True keeps the camelCase JSON field names)
response = client.post("/", json=rpc_request.model_dump(by_alias=True))
result = response.json()

print("=== JSON-RPC Response ===")
print(json.dumps(result, indent=2))

=== JSON-RPC Response ===
{
  "id": "req-001",
  "jsonrpc": "2.0",
  "result": {
    "artifacts": [
      {
        "artifactId": "art-91fda68f",
        "name": "echo",
        "parts": [
          {
            "kind": "text",
            "text": "Echo: Hello, A2A agent!"
          }
        ]
      }
    ],
    "contextId": "ctx-63bc1cb734d2",
    "history": [
      {
        "kind": "message",
        "messageId": "msg-001",
        "parts": [
          {
            "kind": "text",
            "text": "Hello, A2A agent!"
          }
        ],
        "role": "user"
      }
    ],
    "id": "task-15196df62f4a",
    "kind": "task",
    "status": {
      "state": "completed"
    }
  }
}


In [10]:
# Extract task information
if "result" in result:
    task_result = result["result"]
    print("=== Task Created ===")
    print(f"Task ID: {task_result.get('id')}")
    print(f"Context ID: {task_result.get('contextId')}")
    print(f"Status: {task_result.get('status', {}).get('state')}")

=== Task Created ===
Task ID: task-15196df62f4a
Context ID: ctx-63bc1cb734d2
Status: completed
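
A JSON-RPC 2.0 response carries either a `result` or an `error` member. The sketch below shows one way a client might turn that into a return value or an exception. `JsonRpcError` and `unwrap_result` are hypothetical names for this example and are not part of the A2A SDK.

```python
from typing import Any


class JsonRpcError(Exception):
    """Raised when a JSON-RPC response contains an `error` member."""

    def __init__(self, code: int, message: str, data: Any = None) -> None:
        super().__init__(f"JSON-RPC error {code}: {message}")
        self.code = code
        self.message = message
        self.data = data


def unwrap_result(response: dict, expected_id: Any = None) -> Any:
    """Return `result` from a JSON-RPC 2.0 response, or raise on an error."""
    if response.get("jsonrpc") != "2.0":
        raise ValueError("Not a JSON-RPC 2.0 response")
    if expected_id is not None and response.get("id") != expected_id:
        raise ValueError("Response id does not match the request id")
    if "error" in response:
        err = response["error"]
        raise JsonRpcError(err["code"], err["message"], err.get("data"))
    return response["result"]


if __name__ == "__main__":
    ok = {"jsonrpc": "2.0", "id": "req-001", "result": {"kind": "task"}}
    print(unwrap_result(ok, expected_id="req-001"))
```

With a helper like this, the `if "result" in result:` check in the cell above becomes a single call that raises a descriptive exception on failure.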


## Part 6: Task Management

A2A tasks go through various states. Let's explore task management.

In [11]:
# Create and manage tasks directly using SDK's InMemoryTaskStore
import uuid
from a2a.types import Task, TaskState, TaskStatus

task_store = InMemoryTaskStore()

# Create a task manually (SDK's store just saves/retrieves, doesn't create)
task_id = str(uuid.uuid4())
context_id = "ctx-test"

# Create initial task with submitted state
message = Message(
    role="user",
    parts=[TextPart(text="Research AI agents")],
    message_id="msg-test"  # SDK uses snake_case
)

task = Task(
    id=task_id,
    context_id=context_id,
    status=TaskStatus(state=TaskState.submitted),  # SDK uses lowercase enum
    history=[message]
)

# Save to store - InMemoryTaskStore.save() is async, use top-level await
await task_store.save(task)
print(f"Created task: {task.id}")
print(f"Status: {task.status.state.value}")

Created task: 6a6f44e4-1204-466a-9ea9-0e1b8d60ef86
Status: submitted


In [12]:
# Simulate task lifecycle using SDK types
async def task_lifecycle():
    print("=== Task Lifecycle ===")
    
    # Move to working - create updated task
    task_working = Task(
        id=task.id,
        context_id=task.context_id,
        status=TaskStatus(state=TaskState.working),
        history=task.history
    )
    await task_store.save(task_working)
    retrieved = await task_store.get(task.id)
    print(f"1. {retrieved.status.state.value}")
    
    # Move to completed with artifact
    artifact = Artifact(
        artifact_id="art-001",  # SDK uses snake_case
        name="research_result",
        parts=[TextPart(text="AI agents are autonomous systems...")]
    )
    
    task_completed = Task(
        id=task.id,
        context_id=task.context_id,
        status=TaskStatus(state=TaskState.completed),
        history=task.history,
        artifacts=[artifact]
    )
    await task_store.save(task_completed)
    retrieved = await task_store.get(task.id)
    print(f"2. {retrieved.status.state.value}")
    print(f"   Artifacts: {len(retrieved.artifacts or [])}")

await task_lifecycle()

=== Task Lifecycle ===
1. working
2. completed
   Artifacts: 1


In [13]:
# Get task via JSON-RPC
get_request = JSONRPCRequest(
    id="req-002",
    method="tasks/get",
    params={"id": task.id}
)

print("=== tasks/get Request ===")
print(json.dumps(get_request.model_dump(by_alias=True), indent=2))

=== tasks/get Request ===
{
  "id": "req-002",
  "jsonrpc": "2.0",
  "method": "tasks/get",
  "params": {
    "id": "6a6f44e4-1204-466a-9ea9-0e1b8d60ef86"
  }
}


In [14]:
# List tasks request
list_request = JSONRPCRequest(
    id="req-003",
    method="tasks/list",
    params={
        "contextId": "ctx-test",
        "pageSize": 10
    }
)

print("=== tasks/list Request ===")
print(json.dumps(list_request.model_dump(by_alias=True), indent=2))

# Note: SDK's InMemoryTaskStore only has get/save/delete
# For listing, you'd need to track IDs separately or iterate
async def check_task():
    result = await task_store.get(task.id)
    print(f"\nTask stored: {result is not None}")

await check_task()

=== tasks/list Request ===
{
  "id": "req-003",
  "jsonrpc": "2.0",
  "method": "tasks/list",
  "params": {
    "contextId": "ctx-test",
    "pageSize": 10
  }
}

Task stored: True


## Part 7: Error Handling

A2A reuses the standard JSON-RPC 2.0 error codes and adds protocol-specific errors such as `TaskNotFoundError`.

In [15]:
# The SDK provides typed error classes, so raw error codes are rarely needed
from a2a.types import (
    JSONParseError,
    InvalidRequestError, 
    MethodNotFoundError,
    InvalidParamsError,
    InternalError,
    TaskNotFoundError,
)

# Standard JSON-RPC error codes (per JSON-RPC 2.0 spec)
print("=== A2A Error Codes (JSON-RPC 2.0) ===")
error_codes = {
    "PARSE_ERROR": -32700,
    "INVALID_REQUEST": -32600,
    "METHOD_NOT_FOUND": -32601,
    "INVALID_PARAMS": -32602,
    "INTERNAL_ERROR": -32603,
}

for name, code in error_codes.items():
    print(f"  {name}: {code}")

print("\n=== SDK Error Types ===")
sdk_errors = [
    JSONParseError,
    InvalidRequestError,
    MethodNotFoundError,
    InvalidParamsError,
    InternalError,
    TaskNotFoundError,
]
for err in sdk_errors:
    print(f"  {err.__name__}")

=== A2A Error Codes (JSON-RPC 2.0) ===
  PARSE_ERROR: -32700
  INVALID_REQUEST: -32600
  METHOD_NOT_FOUND: -32601
  INVALID_PARAMS: -32602
  INTERNAL_ERROR: -32603

=== SDK Error Types ===
  JSONParseError
  InvalidRequestError
  MethodNotFoundError
  InvalidParamsError
  InternalError
  TaskNotFoundError


In [16]:
# Test method not found error
invalid_request = JSONRPCRequest(
    id="req-error",
    method="unknown/method",
    params={}
)

response = client.post("/", json=invalid_request.model_dump(by_alias=True))
result = response.json()

print("=== Error Response ===")
print(json.dumps(result, indent=2))

Request Error (ID: req-error): Code=-32601, Message='Method not found'


=== Error Response ===
{
  "error": {
    "code": -32601,
    "message": "Method not found"
  },
  "id": "req-error",
  "jsonrpc": "2.0"
}
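
To make the error response above less of a black box, here is a minimal sketch of how a server could produce it. This is not the workshop server's or the SDK's actual implementation; `dispatch`, `error_response`, and the `handlers` mapping are assumptions for illustration.

```python
from typing import Any, Callable

METHOD_NOT_FOUND = -32601  # JSON-RPC 2.0: method does not exist
INTERNAL_ERROR = -32603    # JSON-RPC 2.0: internal error


def error_response(request_id: Any, code: int, message: str) -> dict:
    """Build a JSON-RPC 2.0 error response envelope."""
    return {"jsonrpc": "2.0", "id": request_id, "error": {"code": code, "message": message}}


def dispatch(request: dict, handlers: dict[str, Callable[[dict], Any]]) -> dict:
    """Route a request to its handler, mapping failures to error responses."""
    request_id = request.get("id")
    handler = handlers.get(request.get("method"))
    if handler is None:
        return error_response(request_id, METHOD_NOT_FOUND, "Method not found")
    try:
        result = handler(request.get("params") or {})
    except Exception as exc:  # any handler failure becomes an internal error
        return error_response(request_id, INTERNAL_ERROR, str(exc))
    return {"jsonrpc": "2.0", "id": request_id, "result": result}


if __name__ == "__main__":
    handlers = {"echo": lambda params: {"text": params.get("text", "")}}
    print(dispatch({"jsonrpc": "2.0", "id": "req-error", "method": "unknown/method"}, handlers))
```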


## Part 8: Agent Discovery Pattern

In a multi-agent system, agents discover each other via Agent Cards.

In [17]:
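async def discover_agent(base_url: str, client: TestClient) -> dict:
    """Discover agent capabilities via Agent Card.

    `base_url` is unused here because the in-process TestClient is already
    bound to the app; a real HTTP client would request
    f"{base_url}/.well-known/agent-card.json" instead.
    """
    response = client.get("/.well-known/agent-card.json")
    if response.status_code == 200:
        return response.json()
    raise RuntimeError(f"Failed to discover agent: {response.status_code}")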

def find_skill(card: dict, skill_id: str) -> dict | None:
    """Find a specific skill in an agent card."""
    for skill in card.get("skills", []):
        if skill["id"] == skill_id:
            return skill
    return None

# Discover agent
card = await discover_agent("http://localhost:8000", client)
print(f"Discovered: {card['name']}")

# Check for specific skill
echo_skill = find_skill(card, "echo")
if echo_skill:
    print(f"\nFound skill: {echo_skill['name']}")
    print(f"Description: {echo_skill['description']}")

Discovered: Demo Agent

Found skill: Echo
Description: Echo back the message
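
Skills also carry tags, which allow a looser match than an exact skill ID. The sketch below filters a card (as the plain dict returned by the endpoint) by tag. `find_skills_by_tag` is a hypothetical helper, and the sample card mirrors the Demo Agent used above.

```python
def find_skills_by_tag(card: dict, tag: str) -> list[dict]:
    """Return every skill in `card` whose tag list contains `tag`."""
    return [
        skill
        for skill in card.get("skills", [])
        if tag in (skill.get("tags") or [])
    ]


if __name__ == "__main__":
    demo_card = {
        "name": "Demo Agent",
        "skills": [
            {"id": "echo", "name": "Echo", "tags": ["demo", "echo"]},
        ],
    }
    print([skill["id"] for skill in find_skills_by_tag(demo_card, "demo")])
```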


## Part 9: Connecting with ResearchAgent

In [18]:
# Import agent
from src.agents import ResearchAgent
from src.tools import search_web, calculate

# Try to create A2A server with actual agent
try:
    agent = ResearchAgent(name="research_agent")
    agent.set_tool_handlers({
        "search_web": search_web,
        "calculate": calculate,
    })
    
    server = A2AServer(
        agent=agent,
        name="Research Agent",
        description="Research agent with tools",
        skills=[
            AgentSkill(
                id="research",
                name="Research",
                description="Research any topic",
                tags=["research"],
            ),
            AgentSkill(
                id="calculate",
                name="Calculate",
                description="Perform calculations",
                tags=["calculate"],
            )
        ]
    )
    agent_app = server.create_app()
    print("✅ A2A server with agent created!")
except Exception as e:
    print(f"⚠️ Could not create agent: {e}")
    print("Using demo server...")
    agent_app = app

✅ A2A server with agent created!


## Part 10: Edge Case - Configurable Timeouts

In [19]:
import asyncio
from dataclasses import dataclass

@dataclass
class TimeoutConfig:
    """Configuration for A2A timeouts."""
    connect_timeout: float = 5.0
    read_timeout: float = 30.0
    task_timeout: float = 300.0  # 5 minutes

async def call_with_timeout(
    awaitable,
    timeout: float,
    error_message: str = "Operation timed out"
):
    """Await a coroutine or other awaitable, returning an error dict on timeout."""
    try:
        return await asyncio.wait_for(awaitable, timeout=timeout)
    except asyncio.TimeoutError:
        # Return a JSON-RPC-style error dict; -32603 is "Internal error"
        # (the SDK doesn't have an ErrorCode enum)
        return {"error": {"code": -32603, "message": error_message, "data": {"timeout": timeout}}}

# Demonstrate timeout handling
async def slow_operation():
    await asyncio.sleep(2)
    return "Completed"

# This will timeout
result = await call_with_timeout(
    slow_operation(),
    timeout=1.0,
    error_message="Task execution timed out"
)

if isinstance(result, dict) and "error" in result:
    print(f"❌ Timeout: {result['error']['message']}")
else:
    print(f"✅ Result: {result}")

❌ Timeout: Task execution timed out


In [20]:
# Successful with longer timeout
result = await call_with_timeout(
    slow_operation(),
    timeout=5.0,
    error_message="Task execution timed out"
)

if isinstance(result, dict) and "error" in result:
    print(f"❌ Timeout: {result['error']['message']}")
else:
    print(f"✅ Result: {result}")

✅ Result: Completed
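
The cells above define `TimeoutConfig` but pass the timeout by hand. As a sketch of how `task_timeout` could bound a polling loop, the example below waits for a task to reach a terminal state. `poll_until_done`, the `get_state` callable, and the set of terminal states are assumptions for illustration; `TimeoutConfig` is repeated only to keep the block self-contained.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable

# Assumption: states treated as terminal for the purpose of polling.
TERMINAL_STATES = {"completed", "failed", "canceled"}


@dataclass
class TimeoutConfig:
    """Configuration for A2A timeouts (same fields as in the cell above)."""
    connect_timeout: float = 5.0
    read_timeout: float = 30.0
    task_timeout: float = 300.0


async def poll_until_done(
    get_state: Callable[[], Awaitable[str]],
    config: TimeoutConfig,
    interval: float = 0.01,
) -> str:
    """Poll `get_state` until a terminal state; raise on `task_timeout`."""

    async def _poll() -> str:
        while True:
            state = await get_state()
            if state in TERMINAL_STATES:
                return state
            await asyncio.sleep(interval)

    # asyncio.wait_for raises asyncio.TimeoutError when the deadline passes.
    return await asyncio.wait_for(_poll(), timeout=config.task_timeout)
```

In a notebook this can be awaited directly; callers that prefer the error-dict style used above could catch `asyncio.TimeoutError` and build the dict themselves.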


## Part 11: Hands-On Exercise

### Exercise: Register Agent with Discovery Service

Implement a mock discovery service that agents can register with.

In [21]:
# Exercise: Complete this discovery service

class MockDiscoveryService:
    """Mock agent discovery service."""
    
    def __init__(self):
        self.agents: dict[str, AgentCard] = {}
    
    def register(self, agent_card: AgentCard) -> str:
        """Register an agent and return registration ID."""
        import uuid
        reg_id = f"reg-{uuid.uuid4().hex[:8]}"
        self.agents[reg_id] = agent_card
        return reg_id
    
    def unregister(self, registration_id: str) -> bool:
        """Unregister an agent."""
        if registration_id in self.agents:
            del self.agents[registration_id]
            return True
        return False
    
    def find_by_skill(self, skill_id: str) -> list[AgentCard]:
        """Find agents that have a specific skill."""
        results = []
        for card in self.agents.values():
            for skill in card.skills:
                if skill.id == skill_id:
                    results.append(card)
                    break
        return results
    
    def list_all(self) -> list[AgentCard]:
        """List all registered agents."""
        return list(self.agents.values())
    
    # TODO: Add method to find agents by capability
    # TODO: Add method to search agents by description

# Test the discovery service
discovery = MockDiscoveryService()

# Register agents using SDK types
reg1 = discovery.register(AgentCard(
    name="Research Agent",
    description="Agent for researching topics",
    url="http://agent1.local",
    version="1.0.0",
    capabilities=AgentCapabilities(),
    default_input_modes=["text"],
    default_output_modes=["text"],
    skills=[AgentSkill(id="research", name="Research", description="Research topics", tags=["research"])]
))

reg2 = discovery.register(AgentCard(
    name="Calculator Agent",
    description="Agent for calculations",
    url="http://agent2.local",
    version="1.0.0",
    capabilities=AgentCapabilities(),
    default_input_modes=["text"],
    default_output_modes=["text"],
    skills=[AgentSkill(id="calculate", name="Calculate", description="Do math", tags=["math"])]
))

print(f"Registered agents: {len(discovery.list_all())}")

# Find by skill
research_agents = discovery.find_by_skill("research")
print(f"Agents with 'research' skill: {[a.name for a in research_agents]}")

Registered agents: 2
Agents with 'research' skill: ['Research Agent']


## Summary

In this scenario, you learned:

1. **A2A Protocol**: Agent-to-agent communication standard
2. **Agent Cards**: Capability discovery via `/.well-known/agent-card.json`
3. **Skills**: Defining agent capabilities with schemas
4. **JSON-RPC**: Communication protocol for method invocation
5. **Tasks**: Lifecycle management (submitted → working → completed)
6. **Error Handling**: Standard error codes and responses
7. **Timeouts**: Configurable timeout handling

## Next Steps

- **Scenario 04**: Deterministic multi-agent workflows
- Build agent discovery service
- Implement agent orchestration

## Resources

- [A2A Protocol Spec](specs/001-agentic-patterns-workshop/contracts/a2a-protocol.md)
- [JSON-RPC 2.0 Specification](https://www.jsonrpc.org/specification)
- [Agent Card Standard](https://github.com/agent-card/spec)