# Day 7 - Lab 2: Agent Interoperability with A2A Protocol

**Objective:** Master agent-to-agent communication using the A2A protocol for building collaborative multi-agent onboarding systems.

**Estimated Time:** 135 minutes

## Introduction

In this lab, you'll implement the A2A (Agent-to-Agent) protocol from scratch using pure Python. You'll build specialized agents that collaborate to onboard new employees.

### Why A2A?

The onboarding system in this lab splits the work across four specialized agents:
- **DocumentationAgent**: Answers policy questions
- **AccessSetupAgent**: Manages system permissions
- **TrainingScheduleAgent**: Plans learning paths
- **CoordinatorAgent**: Orchestrates the workflow

A2A enables these agents to:
1. **Discover** each other's capabilities
2. **Communicate** with structured messages
3. **Collaborate** on complex tasks

By the end, you'll have a working multi-agent onboarding system.

## Step 1: Environment Setup (No External Dependencies!)

In [None]:
import sys
import os
import json
import uuid
import time
import asyncio
from datetime import datetime, timedelta
from dataclasses import dataclass, field, asdict
from typing import List, Dict, Optional, Any, Callable
from enum import Enum
from collections import defaultdict
import queue
import threading

# Add project root to path
project_root = os.path.abspath(os.path.join(os.getcwd(), '..', '..'))
if project_root not in sys.path:
    sys.path.insert(0, project_root)

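# `utils` is the course helper module, expected under the project root added above.
# It supplies the LLM client; the A2A protocol code itself uses only the standard library.
from utils import setup_llm_client, save_artifact
client, model_name, api_provider = setup_llm_client(model_name='gpt-4o')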

print('✅ Environment ready - Pure Python A2A implementation')

## Step 2: Load Onboarding Data

In [None]:
# Load shared onboarding data
assets_path = 'assets'

with open(f'{assets_path}/onboarding_docs.json', 'r') as f:
    onboarding_docs = json.load(f)

with open(f'{assets_path}/roles_access_matrix.json', 'r') as f:
    roles_access = json.load(f)

with open(f'{assets_path}/training_catalog.json', 'r') as f:
    training_catalog = json.load(f)

with open(f'{assets_path}/new_hires_sample.json', 'r') as f:
    new_hires = json.load(f)

print(f'Loaded data for {len(new_hires["new_hires"])} new hires')

## Step 3: Define A2A Protocol Components

We'll build the protocol from scratch:

In [None]:
# A2A Message Types
class MessageType(Enum):
    ANNOUNCE = 'announce'
    REQUEST = 'request'
    RESPONSE = 'response'
    ERROR = 'error'
    EVENT = 'event'

# A2A Message Envelope
@dataclass
class A2AMessage:
    protocol: str = 'a2a.v1'
    msg_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    corr_id: Optional[str] = None
    type: MessageType = MessageType.REQUEST
    sender: str = ''
    recipient: str = ''
    capabilities: List[str] = field(default_factory=list)
    payload: Dict[str, Any] = field(default_factory=dict)
    ttl_ms: int = 10000
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
    
    def to_dict(self) -> Dict:
        d = asdict(self)
        d['type'] = self.type.value
        return d

print('✅ A2A Protocol components defined')
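
The `corr_id` and `ttl_ms` fields are what make request/response pairing possible. The sketch below is one way to use them, working on plain dicts shaped like the output of `A2AMessage.to_dict()`. The helper names `make_reply` and `is_expired`, and the sample payloads, are illustrative assumptions rather than part of the lab's starter code.

```python
import json
import uuid
from datetime import datetime, timedelta
from typing import Any, Dict, Optional


def make_reply(request: Dict[str, Any], payload: Dict[str, Any]) -> Dict[str, Any]:
    """Build a RESPONSE dict that answers `request` (hypothetical helper)."""
    return {
        'protocol': request['protocol'],
        'msg_id': str(uuid.uuid4()),
        'corr_id': request['msg_id'],      # ties the reply to the original request
        'type': 'response',
        'sender': request['recipient'],    # sender and recipient swap on the way back
        'recipient': request['sender'],
        'capabilities': [],
        'payload': payload,
        'ttl_ms': request['ttl_ms'],
        'timestamp': datetime.now().isoformat(),
    }


def is_expired(message: Dict[str, Any], now: Optional[datetime] = None) -> bool:
    """Return True once `ttl_ms` has elapsed since the message timestamp."""
    sent_at = datetime.fromisoformat(message['timestamp'])
    now = now or datetime.now()
    return now - sent_at > timedelta(milliseconds=message['ttl_ms'])


# A request shaped like A2AMessage(...).to_dict(); the payload is made up for the demo
request = {
    'protocol': 'a2a.v1',
    'msg_id': str(uuid.uuid4()),
    'corr_id': None,
    'type': 'request',
    'sender': 'agent://coordinator',
    'recipient': 'agent://documentation',
    'capabilities': [],
    'payload': {'question': 'What is the PTO policy?'},
    'ttl_ms': 10000,
    'timestamp': datetime.now().isoformat(),
}
reply = make_reply(request, {'answer': 'See the handbook.'})
print(json.dumps(reply, indent=2))
```

A receiver can drop any message for which `is_expired` returns `True`, and a sender can match a reply to its request by comparing `reply['corr_id']` with the `msg_id` it sent.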

## Challenge 1 (Foundational): Implement Message Bus and Service Registry

**Task:** Build the core infrastructure for agent communication.

In [None]:
class ServiceRegistry:
    """Registry for agent capabilities."""
    def __init__(self):
        self.capabilities = defaultdict(set)  # capability -> set of agent_ids
        self.agents = {}  # agent_id -> agent_info
    
    def register_capability(self, agent_id: str, capability: str):
        """Register an agent's capability."""
        # TODO: Implement this
        pass
    
    def find_providers(self, capability: str) -> List[str]:
        """Find agents that provide a capability."""
        # TODO: Implement this
        pass

class A2ABus:
    """Message bus for agent communication."""
    def __init__(self):
        self.registry = ServiceRegistry()
        self.message_queue = queue.Queue()
        self.handlers = {}  # agent_id -> handler function
    
    def publish(self, message: A2AMessage):
        """Publish a message to the bus."""
        # TODO: Implement this
        # 1. Add to queue
        # 2. Route to recipient(s)
        pass
    
    def subscribe(self, agent_id: str, handler: Callable):
        """Subscribe an agent to receive messages."""
        # TODO: Implement this
        pass

# Test your implementation
bus = A2ABus()
print('✅ Bus and Registry created')
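
If you get stuck, the following is a minimal sketch of one possible routing design, not a reference solution. It uses separate names (`SketchRegistry`, `SketchBus`, and a stand-in `Msg` dataclass, all hypothetical) so it does not overwrite your own classes. It also assumes synchronous delivery and leaves out the `queue.Queue` that the starter code sets up.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Set


@dataclass
class Msg:
    """Stand-in for A2AMessage with only the fields that routing needs."""
    type: str
    sender: str
    recipient: str
    capabilities: List[str] = field(default_factory=list)


class SketchRegistry:
    def __init__(self) -> None:
        self.capabilities: Dict[str, Set[str]] = defaultdict(set)

    def register_capability(self, agent_id: str, capability: str) -> None:
        self.capabilities[capability].add(agent_id)

    def find_providers(self, capability: str) -> List[str]:
        # .get avoids creating an empty entry for an unknown capability
        return sorted(self.capabilities.get(capability, set()))


class SketchBus:
    def __init__(self) -> None:
        self.registry = SketchRegistry()
        self.handlers: Dict[str, Callable[[Msg], None]] = {}

    def subscribe(self, agent_id: str, handler: Callable[[Msg], None]) -> None:
        self.handlers[agent_id] = handler

    def publish(self, message: Msg) -> None:
        # ANNOUNCE messages update the registry before delivery
        if message.type == 'announce':
            for capability in message.capabilities:
                self.registry.register_capability(message.sender, capability)
        if message.recipient == '*':
            # Broadcast to every subscriber except the sender
            targets = [a for a in self.handlers if a != message.sender]
        else:
            # Unknown recipients are silently dropped in this sketch
            targets = [message.recipient] if message.recipient in self.handlers else []
        for agent_id in targets:
            self.handlers[agent_id](message)


demo_bus = SketchBus()
received: List[Msg] = []
demo_bus.subscribe('agent://coordinator', received.append)
demo_bus.subscribe('agent://documentation', lambda m: None)
demo_bus.publish(Msg('announce', 'agent://documentation', '*', ['onboarding.docs']))
print(demo_bus.registry.find_providers('onboarding.docs'))  # ['agent://documentation']
print(len(received))  # 1: the coordinator saw the broadcast, the sender did not
```

Registering capabilities inside `publish` keeps discovery automatic: an agent only has to announce itself. A real bus would probably also report undeliverable messages back to the sender as `ERROR` messages.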

## Challenge 2 (Intermediate): Build Specialized Agents

**Task:** Implement agents for documentation, access, and training.

In [None]:
class BaseAgent:
    """Base class for all A2A agents."""
    def __init__(self, agent_id: str, capabilities: List[str], bus: A2ABus):
        self.agent_id = agent_id
        self.capabilities = capabilities
        self.bus = bus
        self.running = False
    
    def start(self):
        """Start the agent and announce capabilities."""
        self.running = True
        self.bus.subscribe(self.agent_id, self.handle_message)
        
        # TODO: Send ANNOUNCE message
        announce = A2AMessage(
            type=MessageType.ANNOUNCE,
            sender=self.agent_id,
            recipient='*',  # Broadcast
            capabilities=self.capabilities
        )
        # self.bus.publish(announce)
    
    def handle_message(self, message: A2AMessage):
        """Handle incoming messages."""
        # TODO: Implement message handling
        pass

class DocumentationAgent(BaseAgent):
    """Agent that answers policy and documentation questions."""
    def __init__(self, bus: A2ABus):
        super().__init__('agent://documentation', ['onboarding.docs'], bus)
    
    def handle_message(self, message: A2AMessage):
        """Process documentation requests."""
        if message.type == MessageType.REQUEST:
            # TODO: Implement documentation lookup
            # 1. Parse request from payload
            # 2. Search onboarding_docs
            # 3. Send RESPONSE with results
            pass

# TODO: Implement AccessSetupAgent
class AccessSetupAgent(BaseAgent):
    pass

# TODO: Implement TrainingScheduleAgent  
class TrainingScheduleAgent(BaseAgent):
    pass
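
The three steps in the `DocumentationAgent` TODO (parse, search, respond) might look roughly like the sketch below. The document schema in `SAMPLE_DOCS`, the `query` payload key, and the function names are assumptions made for illustration. Inspect `onboarding_docs` to see its real structure before adapting any of this.

```python
from typing import Any, Dict, List

# Assumed shape only; the real onboarding_docs.json may be organized differently.
SAMPLE_DOCS: List[Dict[str, str]] = [
    {'title': 'PTO Policy', 'content': 'Employees accrue paid time off monthly.'},
    {'title': 'Security Basics', 'content': 'Enable two-factor authentication.'},
]


def search_docs(docs: List[Dict[str, str]], query: str) -> List[Dict[str, str]]:
    """Return documents whose title or content contains any query term."""
    terms = query.lower().split()
    hits = []
    for doc in docs:
        text = f"{doc['title']} {doc['content']}".lower()
        if any(term in text for term in terms):
            hits.append(doc)
    return hits


def handle_doc_request(request: Dict[str, Any]) -> Dict[str, Any]:
    """Turn a REQUEST dict into a RESPONSE or ERROR dict (hypothetical helper)."""
    # 1. Parse the request from the payload
    query = request.get('payload', {}).get('query', '')
    if not query:
        return {
            'type': 'error',
            'corr_id': request.get('msg_id'),
            'payload': {'reason': 'missing query'},
        }
    # 2. Search the documents
    results = search_docs(SAMPLE_DOCS, query)
    # 3. Build a response correlated with the request
    return {
        'type': 'response',
        'corr_id': request.get('msg_id'),
        'payload': {'results': [doc['title'] for doc in results]},
    }


print(handle_doc_request({'msg_id': 'm1', 'payload': {'query': 'pto'}}))
print(handle_doc_request({'msg_id': 'm2', 'payload': {}}))
```

Inside the agent class, `handle_message` would build the equivalent `A2AMessage` and pass it to `self.bus.publish`. Substring matching is the simplest search that works; the LLM client from Step 1 is an alternative for free-form questions.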

## Challenge 3 (Advanced): Orchestrate Multi-Agent Collaboration

**Task:** Build a coordinator that orchestrates the onboarding workflow.

In [None]:
class CoordinatorAgent(BaseAgent):
    """Orchestrates the onboarding workflow."""
    def __init__(self, bus: A2ABus):
        super().__init__('agent://coordinator', ['onboarding.orchestration'], bus)
        self.pending_requests = {}
        self.onboarding_plans = {}
    
    async def create_onboarding_plan(self, hire_data: Dict) -> Dict:
        """Create a complete onboarding plan for a new hire."""
        plan = {
            'hire_id': hire_data['id'],
            'documentation': [],
            'access_requests': [],
            'training_schedule': []
        }
        
        # TODO: Implement orchestration
        # 1. Discover available agents
        # 2. Request documentation policies
        # 3. Request access setup
        # 4. Request training schedule
        # 5. Aggregate responses
        # 6. Return complete plan
        
        return plan

# Test orchestration
async def test_orchestration():
    bus = A2ABus()
    
    # Start all agents
    doc_agent = DocumentationAgent(bus)
    doc_agent.start()
    
    # TODO: Start other agents
    
    coordinator = CoordinatorAgent(bus)
    coordinator.start()
    
    # Create plan for first hire
    hire = new_hires['new_hires'][0]
    # plan = await coordinator.create_onboarding_plan(hire)
    # print(f"Created plan for {hire['name']}")

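# Run test (uncomment to execute)
# A notebook already has a running event loop, so await the coroutine directly:
# await test_orchestration()
# In a plain Python script, use this instead:
# asyncio.run(test_orchestration())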
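
Steps 2 to 5 of the orchestration TODO come down to sending several requests and waiting for the matching replies. One common pattern, sketched below under simplifying assumptions, keeps an `asyncio.Future` per outstanding `msg_id` and resolves it when a message with that `corr_id` arrives. The `PendingReplies` class, the `fake_agent` stand-in, and the capability names `access.setup` and `training.schedule` are hypothetical.

```python
import asyncio
import uuid
from typing import Any, Dict, List


class PendingReplies:
    """Maps a request's msg_id to a Future resolved by the matching reply."""

    def __init__(self) -> None:
        self._futures: Dict[str, asyncio.Future] = {}

    def expect(self, msg_id: str) -> asyncio.Future:
        future = asyncio.get_running_loop().create_future()
        self._futures[msg_id] = future
        return future

    def resolve(self, corr_id: str, payload: Dict[str, Any]) -> None:
        # Replies with an unknown corr_id (or late replies) are ignored
        future = self._futures.pop(corr_id, None)
        if future is not None and not future.done():
            future.set_result(payload)


async def fake_agent(pending: PendingReplies, msg_id: str, capability: str) -> None:
    """Stand-in for a real agent: replies after a short delay."""
    await asyncio.sleep(0.01)
    pending.resolve(msg_id, {'capability': capability, 'status': 'ok'})


async def gather_plan(capabilities: List[str], timeout_s: float = 1.0) -> Dict[str, str]:
    pending = PendingReplies()
    waiters, tasks = [], []
    for capability in capabilities:
        msg_id = str(uuid.uuid4())
        waiters.append(pending.expect(msg_id))
        # Keep task references so they are not garbage-collected mid-flight
        tasks.append(asyncio.create_task(fake_agent(pending, msg_id, capability)))
    # Wait for all replies, or raise asyncio.TimeoutError after timeout_s
    replies = await asyncio.wait_for(asyncio.gather(*waiters), timeout=timeout_s)
    return {reply['capability']: reply['status'] for reply in replies}


plan = asyncio.run(gather_plan(['onboarding.docs', 'access.setup', 'training.schedule']))
print(plan)
```

In a notebook cell, replace the `asyncio.run(...)` call with `plan = await gather_plan([...])`. In the coordinator, `handle_message` would call `resolve(message.corr_id, message.payload)` for each incoming `RESPONSE`, and `self.pending_requests` could play the role of `PendingReplies`.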

## Self-Assessment Checklist

In [None]:
def run_a2a_checks():
    checks = []
    
    # Check 1: Protocol components
    try:
        msg = A2AMessage(sender='test', recipient='test')
        assert msg.protocol == 'a2a.v1'
        checks.append('✅ A2A message protocol defined')
    except Exception:
        checks.append('❌ A2A message protocol not working')
    
    # Check 2: Bus and Registry
    try:
        test_bus = A2ABus()
        test_bus.registry.register_capability('test-agent', 'test.capability')
        providers = test_bus.registry.find_providers('test.capability')
        # Fails while the Challenge 1 stubs are unimplemented (they return None)
        assert providers is not None and 'test-agent' in providers
        checks.append('✅ Bus and Registry implemented')
    except Exception:
        checks.append('⚠️ Bus and Registry incomplete')
    
    # Check 3: Agent classes
    try:
        test_bus = A2ABus()
        agent = BaseAgent('test', ['test.cap'], test_bus)
        checks.append('✅ Agent base class defined')
    except Exception:
        checks.append('❌ Agent base class not working')
    
    print('\n'.join(checks))
    return all('✅' in c for c in checks if '⚠️' not in c)

all_passed = run_a2a_checks()
print(f"\nAll checks passed: {all_passed}")

## Conclusion

Excellent work! You've built a complete A2A protocol implementation that enables:

1. **Agent Discovery**: Automatic capability registration
2. **Structured Communication**: Typed message envelopes with a fixed set of message types
3. **Orchestrated Workflows**: Coordinator-driven collaboration

### Key Takeaways:
- A2A lets independently built agents cooperate without hard-coded knowledge of one another
- Discovery and capability advertisement are critical
- Message correlation enables complex workflows

### Combined with MCP:
- MCP standardizes how each agent connects to its tools and data sources
- A2A standardizes how agents communicate with one another
- Together, they enable enterprise-grade AI systems