# Semantic Kernel with Agent-to-Agent (A2A) Protocol using Azure OpenAI

This notebook demonstrates how to use Semantic Kernel with the A2A protocol to create a multi-agent travel planning system using Azure OpenAI via Azure AI Foundry. To setup your environment variables, you can follow the [Setup Lesson ](/00-course-setup/README.md)

## What You'll Build

A three-agent travel planning system:
1. **Currency Exchange Agent** - Handles currency conversion using real-time exchange rates
2. **Activity Planner Agent** - Plans activities and provides travel recommendations
3. **Travel Manager Agent** - Orchestrates the other agents to provide comprehensive travel assistance

## Installation

First, let's install the required dependencies:

## Import Required Libraries

In [2]:
import asyncio
import json
import logging
import os
import threading
import time
from typing import Any, Annotated, AsyncIterable, Literal
from enum import Enum

import httpx
import nest_asyncio
import uvicorn
from dotenv import load_dotenv
from pydantic import BaseModel

# A2A imports
# Add this to your imports at the top
from a2a.server.agent_execution import AgentExecutor
from a2a.client import ClientConfig, ClientFactory, create_text_message_object
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore, InMemoryPushNotificationConfigStore, BasePushNotificationSender
from a2a.types import (
    AgentCapabilities,
    AgentCard,
    AgentSkill,
    TransportProtocol,
)
from a2a.utils.constants import AGENT_CARD_WELL_KNOWN_PATH

# Semantic Kernel imports
from semantic_kernel.agents import ChatCompletionAgent, ChatHistoryAgentThread
from semantic_kernel.connectors.ai.open_ai import (
    AzureChatCompletion,
    OpenAIChatPromptExecutionSettings,
)
from semantic_kernel.contents import (
    FunctionCallContent,
    FunctionResultContent,
    StreamingTextContent,
)
from semantic_kernel.functions import KernelArguments, kernel_function

## Environment Configuration

Configure Azure OpenAI settings. Make sure you have the following environment variables set:
- `AZURE_OPENAI_CHAT_DEPLOYMENT_NAME`
- `AZURE_OPENAI_ENDPOINT`
- `AZURE_OPENAI_API_KEY`

In [3]:
# Load environment variables
load_dotenv()

# Apply nest_asyncio for running async code in Jupyter
nest_asyncio.apply()

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(name)s - %(message)s',
)
logger = logging.getLogger(__name__)

print('Environment configured successfully!')


Environment configured successfully!


## Define the Currency Plugin

This plugin provides real-time currency exchange rates using the Frankfurter API.

In [4]:
class CurrencyPlugin:
    """A currency plugin that leverages Frankfurter API for exchange rates."""

    @kernel_function(
        description='Retrieves exchange rate between currency_from and currency_to using Frankfurter API'
    )
    def get_exchange_rate(
        self,
        currency_from: Annotated[str, 'Currency code to convert from, e.g. USD'],
        currency_to: Annotated[str, 'Currency code to convert to, e.g. EUR or INR'],
        date: Annotated[str, "Date or 'latest'"] = 'latest',
    ) -> str:
        try:
            response = httpx.get(
                f'https://api.frankfurter.app/{date}',
                params={'from': currency_from, 'to': currency_to},
                timeout=10.0,
            )
            response.raise_for_status()
            data = response.json()
            if 'rates' not in data or currency_to not in data['rates']:
                return f'Could not retrieve rate for {currency_from} to {currency_to}'
            rate = data['rates'][currency_to]
            return f'1 {currency_from} = {rate} {currency_to}'
        except Exception as e:
            return f'Currency API call failed: {str(e)}'

print('✅ Currency Plugin defined')

✅ Currency Plugin defined


## Define Response Format

Structured response format for agent outputs.

In [5]:
class ResponseFormat(BaseModel):
    """A Response Format model to direct how the model should respond."""
    status: Literal['input_required', 'completed', 'error'] = 'input_required'
    message: str

print('✅ Response format defined')

✅ Response format defined


## Create the A2A Agent Executor

This wraps Semantic Kernel agents to work with the A2A protocol.

In [6]:
class SemanticKernelTravelAgentExecutor(AgentExecutor):
    """A2A Executor for Semantic Kernel Travel Agent."""

    def __init__(self):
        # Create Azure OpenAI service
        self.chat_service = AzureChatCompletion(
            deployment_name=os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT_NAME"),
            endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
            api_key=os.getenv("AZURE_OPENAI_API_KEY"),
        )

        # Create Currency Exchange Agent
        self.currency_agent = ChatCompletionAgent(
            service=self.chat_service,
            name='CurrencyExchangeAgent',
            instructions=(
                'You specialize in handling currency-related requests from travelers. '
                'This includes providing current exchange rates, converting amounts between different currencies, '
                'explaining fees or charges related to currency exchange, and giving advice on the best practices for exchanging currency. '
                'Your goal is to assist travelers promptly and accurately with all currency-related questions.'
            ),
            plugins=[CurrencyPlugin()],
        )

        # Create Activity Planner Agent
        self.activity_agent = ChatCompletionAgent(
            service=self.chat_service,
            name='ActivityPlannerAgent',
            instructions=(
                'You specialize in planning and recommending activities for travelers. '
                'This includes suggesting sightseeing options, local events, dining recommendations, '
                'booking tickets for attractions, advising on travel itineraries, and ensuring activities '
                'align with traveler preferences and schedule. '
                'Your goal is to create enjoyable and personalized experiences for travelers.'
            ),
        )

        # Create the main Travel Manager Agent - simplified approach
        self.travel_agent = ChatCompletionAgent(
            service=self.chat_service,
            name='TravelManagerAgent',
            instructions=(
                "Your role is to carefully analyze the traveler's request and forward it to the appropriate agent based on the "
                'specific details of the query. '
                'Forward any requests involving monetary amounts, currency exchange rates, currency conversions, fees related '
                'to currency exchange, financial transactions, or payment methods to the CurrencyExchangeAgent. '
                'Forward requests related to planning activities, sightseeing recommendations, dining suggestions, event '
                'booking, itinerary creation, or any experiential aspects of travel that do not explicitly involve monetary '
                'transactions to the ActivityPlannerAgent. '
                'Your primary goal is precise and efficient delegation to ensure travelers receive accurate and specialized '
                'assistance promptly.'
            ),
            plugins=[self.currency_agent, self.activity_agent],
        )

        self.thread = None
        self.SUPPORTED_CONTENT_TYPES = ['text', 'text/plain']

    async def execute(self, context, event_queue):
        """Execute method required by A2A framework."""
        try:
            # Import required A2A utilities
            from a2a.utils import new_agent_text_message, new_task, new_text_artifact
            from a2a.types import TaskArtifactUpdateEvent, TaskState, TaskStatus, TaskStatusUpdateEvent

            # Get user input using the correct context method
            user_input = context.get_user_input()
            task = context.current_task

            if not task:
                task = new_task(context.message)
                await event_queue.enqueue_event(task)

            # Ensure thread exists
            session_id = task.context_id
            await self._ensure_thread_exists(session_id)

            # Process the request - no special response format, let it respond naturally
            response = await self.travel_agent.get_response(
                messages=user_input,
                thread=self.thread,
            )

            # Get the content directly as string
            content = response.content if isinstance(response.content, str) else str(response.content)

            # Send completion event with artifact
            await event_queue.enqueue_event(
                TaskArtifactUpdateEvent(
                    append=False,
                    context_id=task.context_id,
                    task_id=task.id,
                    last_chunk=True,
                    artifact=new_text_artifact(
                        name='travel_result',
                        description='Travel planning result',
                        text=content,
                    ),
                )
            )
            
            await event_queue.enqueue_event(
                TaskStatusUpdateEvent(
                    status=TaskStatus(state=TaskState.completed),
                    final=True,
                    context_id=task.context_id,
                    task_id=task.id,
                )
            )

        except Exception as e:
            logger.error(f"Error in SemanticKernelTravelAgentExecutor.execute: {str(e)}")
            # Send error status
            await event_queue.enqueue_event(
                TaskStatusUpdateEvent(
                    status=TaskStatus(
                        state=TaskState.input_required,
                        message=new_agent_text_message(
                            f"Error processing request: {str(e)}",
                            task.context_id,
                            task.id,
                        ),
                    ),
                    final=True,
                    context_id=task.context_id,
                    task_id=task.id,
                )
            )

    async def cancel(self, context, event_queue):
        """Cancel method - not supported for this agent."""
        raise Exception('cancel not supported')

    async def _ensure_thread_exists(self, session_id: str) -> None:
        """Ensure thread exists for the session."""
        if self.thread is None or self.thread.id != session_id:
            if self.thread:
                await self.thread.delete()
            self.thread = ChatHistoryAgentThread(thread_id=session_id)


print('✅ Travel Manager Agent Executor simplified - removed JSON formatting constraints')

✅ Travel Manager Agent Executor simplified - removed JSON formatting constraints


## Create Individual A2A Agents

Now we'll create A2A wrappers for each specialized agent.

In [7]:
class CurrencyAgentExecutor(AgentExecutor):
    """A2A Executor for Currency Exchange Agent."""

    def __init__(self):
        self.chat_service = AzureChatCompletion(
            deployment_name=os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT_NAME"),
            endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
            api_key=os.getenv("AZURE_OPENAI_API_KEY"),
        )

        self.agent = ChatCompletionAgent(
            service=self.chat_service,
            name='CurrencyExchangeAgent',
            instructions=(
                'You are a currency exchange specialist. Provide accurate exchange rates and currency conversion information. '
                'Use the get_exchange_rate function to get real-time rates. '
                'Always provide clear, concise information about currency conversions.'
            ),
            plugins=[CurrencyPlugin()],
        )
        self.thread = None
        self.SUPPORTED_CONTENT_TYPES = ['text', 'text/plain']

    async def execute(self, context, event_queue):
        """Execute method required by A2A framework."""
        try:
            # Import required A2A utilities
            from a2a.utils import new_agent_text_message, new_task, new_text_artifact
            from a2a.types import TaskArtifactUpdateEvent, TaskState, TaskStatus, TaskStatusUpdateEvent

            # Get user input using the correct context method
            user_input = context.get_user_input()
            task = context.current_task
            
            if not task:
                task = new_task(context.message)
                await event_queue.enqueue_event(task)

            # Ensure thread exists
            session_id = task.context_id
            if self.thread is None or self.thread.id != session_id:
                if self.thread:
                    await self.thread.delete()
                self.thread = ChatHistoryAgentThread(thread_id=session_id)

            # Process the request
            response = await self.agent.get_response(messages=user_input, thread=self.thread)
            content = response.content if isinstance(response.content, str) else str(response.content)

            # Send completion event with artifact
            await event_queue.enqueue_event(
                TaskArtifactUpdateEvent(
                    append=False,
                    context_id=task.context_id,
                    task_id=task.id,
                    last_chunk=True,
                    artifact=new_text_artifact(
                        name='currency_result',
                        description='Currency exchange information',
                        text=content,
                    ),
                )
            )
            
            await event_queue.enqueue_event(
                TaskStatusUpdateEvent(
                    status=TaskStatus(state=TaskState.completed),
                    final=True,
                    context_id=task.context_id,
                    task_id=task.id,
                )
            )

        except Exception as e:
            logger.error(f"Error in CurrencyAgentExecutor.execute: {str(e)}")
            # Send error status
            await event_queue.enqueue_event(
                TaskStatusUpdateEvent(
                    status=TaskStatus(
                        state=TaskState.input_required,
                        message=new_agent_text_message(
                            f"Error processing request: {str(e)}",
                            task.context_id,
                            task.id,
                        ),
                    ),
                    final=True,
                    context_id=task.context_id,
                    task_id=task.id,
                )
            )

    async def cancel(self, context, event_queue):
        """Cancel method - not supported for this simple agent."""
        raise Exception('cancel not supported')

# Updated Activity Planner Agent Executor
class ActivityAgentExecutor(AgentExecutor):
    """A2A Executor for Activity Planner Agent."""

    def __init__(self):
        self.chat_service = AzureChatCompletion(
            deployment_name=os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT_NAME"),
            endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
            api_key=os.getenv("AZURE_OPENAI_API_KEY"),
        )

        self.agent = ChatCompletionAgent(
            service=self.chat_service,
            name='ActivityPlannerAgent',
            instructions=(
                'You are a travel activity planning specialist. Create detailed, personalized activity recommendations. '
                'Include specific times, locations, and practical tips. '
                'Consider budget, preferences, and local culture in your suggestions.'
            ),
        )
        self.thread = None
        self.SUPPORTED_CONTENT_TYPES = ['text', 'text/plain']

    async def execute(self, context, event_queue):
        """Execute method required by A2A framework."""
        try:
            # Import required A2A utilities
            from a2a.utils import new_agent_text_message, new_task, new_text_artifact
            from a2a.types import TaskArtifactUpdateEvent, TaskState, TaskStatus, TaskStatusUpdateEvent

            # Get user input using the correct context method
            user_input = context.get_user_input()
            task = context.current_task
            
            if not task:
                task = new_task(context.message)
                await event_queue.enqueue_event(task)

            # Ensure thread exists
            session_id = task.context_id
            if self.thread is None or self.thread.id != session_id:
                if self.thread:
                    await self.thread.delete()
                self.thread = ChatHistoryAgentThread(thread_id=session_id)

            # Process the request
            response = await self.agent.get_response(messages=user_input, thread=self.thread)
            content = response.content if isinstance(response.content, str) else str(response.content)

            # Send completion event with artifact
            await event_queue.enqueue_event(
                TaskArtifactUpdateEvent(
                    append=False,
                    context_id=task.context_id,
                    task_id=task.id,
                    last_chunk=True,
                    artifact=new_text_artifact(
                        name='activity_result',
                        description='Activity planning recommendations',
                        text=content,
                    ),
                )
            )
            
            await event_queue.enqueue_event(
                TaskStatusUpdateEvent(
                    status=TaskStatus(state=TaskState.completed),
                    final=True,
                    context_id=task.context_id,
                    task_id=task.id,
                )
            )

        except Exception as e:
            logger.error(f"Error in ActivityAgentExecutor.execute: {str(e)}")
            # Send error status
            await event_queue.enqueue_event(
                TaskStatusUpdateEvent(
                    status=TaskStatus(
                        state=TaskState.input_required,
                        message=new_agent_text_message(
                            f"Error processing request: {str(e)}",
                            task.context_id,
                            task.id,
                        ),
                    ),
                    final=True,
                    context_id=task.context_id,
                    task_id=task.id,
                )
            )

    async def cancel(self, context, event_queue):
        """Cancel method - not supported for this simple agent."""
        raise Exception('cancel not supported')

## Define Agent Cards

Agent cards describe the capabilities of each agent for A2A discovery.

In [8]:
# Currency Exchange Agent Card
currency_agent_card = AgentCard(
    name='Currency Exchange Agent',
    url='http://localhost:10020',
    description='Provides real-time currency exchange rates and conversion services',
    version='1.0',
    capabilities=AgentCapabilities(streaming=True),
    default_input_modes=['text/plain'],
    default_output_modes=['text/plain'],
    preferred_transport=TransportProtocol.jsonrpc,
    skills=[
        AgentSkill(
            id='currency_exchange',
            name='Currency Exchange',
            description='Get exchange rates and convert between currencies',
            tags=['currency', 'exchange', 'conversion', 'forex'],
            examples=[
                'What is the exchange rate from USD to EUR?',
                'Convert 1000 USD to JPY',
                'How much is 500 EUR in GBP?',
            ],
        )
    ],
)

# Activity Planner Agent Card
activity_agent_card = AgentCard(
    name='Activity Planner Agent',
    url='http://localhost:10021',
    description='Plans activities and provides travel recommendations',
    version='1.0',
    capabilities=AgentCapabilities(streaming=True),
    default_input_modes=['text/plain'],
    default_output_modes=['text/plain'],
    preferred_transport=TransportProtocol.jsonrpc,
    skills=[
        AgentSkill(
            id='activity_planning',
            name='Activity Planning',
            description='Create personalized travel itineraries and activity recommendations',
            tags=['travel', 'activities', 'itinerary', 'recommendations'],
            examples=[
                'Plan a day trip in Paris',
                'Recommend restaurants in Tokyo',
                'What are the must-see attractions in Rome?',
            ],
        )
    ],
)

# Travel Manager Agent Card (Main orchestrator)
travel_manager_card = AgentCard(
    name='SK Travel Manager',
    url='http://localhost:10022',
    description='Comprehensive travel planning agent that orchestrates currency and activity services',
    version='1.0',
    capabilities=AgentCapabilities(streaming=True),
    default_input_modes=['text/plain'],
    default_output_modes=['text/plain'],
    preferred_transport=TransportProtocol.jsonrpc,
    skills=[
        AgentSkill(
            id='comprehensive_travel_planning',
            name='Comprehensive Travel Planning',
            description='Handles all aspects of travel planning including currency and activities',
            tags=['travel', 'planning', 'currency', 'activities', 'orchestration'],
            examples=[
                'Plan a budget-friendly trip to Seoul with currency exchange info',
                'I need help with my Tokyo trip including money exchange and activities',
                'What should I do in London and how much money should I exchange?',
            ],
        )
    ],
)

print('✅ Agent cards defined')

✅ Agent cards defined


## Create A2A Server Helper Function

This function creates an A2A (Agent-to-Agent) protocol server by setting up HTTP communication infrastructure, configuring a request handler with task management and push notifications, wrapping everything in a Starlette web application, and returning the runnable server instance. 

A2AStarletteApplication is a web application wrapper built on the Starlette ASGI framework that implements the A2A protocol standard, allowing AI agents to communicate with each other over HTTP using standardized message formats and discovery mechanisms.

In [9]:
def create_a2a_server(agent_executor, agent_card):
    """Create an A2A server for any agent executor."""
    httpx_client = httpx.AsyncClient()
    push_config_store = InMemoryPushNotificationConfigStore()

    request_handler = DefaultRequestHandler(
        agent_executor=agent_executor,
        task_store=InMemoryTaskStore(),
        push_config_store=push_config_store,
        push_sender=BasePushNotificationSender(
            httpx_client, push_config_store),
    )

    app = A2AStarletteApplication(
        agent_card=agent_card,
        http_handler=request_handler
    )

    # Return the actual Starlette app
    # Check if we need to build or get the app
    if hasattr(app, 'build'):
        return app.build()
    elif hasattr(app, 'app'):
        return app.app
    else:
        return app


print('✅ A2A server helper function created')

✅ A2A server helper function created


## Start All A2A Servers

We'll run all three agents as separate A2A servers using uvicorn.

In [None]:

async def run_agent_server(agent_executor, agent_card, port):
    """Run a single agent server with proper error handling."""
    try:
        app = create_a2a_server(agent_executor, agent_card)

        config = uvicorn.Config(
            app,
            host='127.0.0.1',
            port=port,
            log_level='info',
            loop='none',
            timeout_keep_alive=30,
            limit_concurrency=100,
        )

        server = uvicorn.Server(config)
        await server.serve()
    except Exception as e:
        logger.error(f"Error starting server on port {port}: {str(e)}")
        raise


# ...existing code...

# Global variable to track running servers
running_servers = []


async def start_all_servers_background():
    """Start all servers in background tasks."""
    global running_servers

    try:
        # Create agent executors
        currency_executor = CurrencyAgentExecutor()
        activity_executor = ActivityAgentExecutor()
        travel_executor = SemanticKernelTravelAgentExecutor()

        # Create tasks for all servers
        tasks = [
            asyncio.create_task(run_agent_server(
                currency_executor, currency_agent_card, 10020)),
            asyncio.create_task(run_agent_server(
                activity_executor, activity_agent_card, 10021)),
            asyncio.create_task(run_agent_server(
                travel_executor, travel_manager_card, 10022)),
        ]

        running_servers = tasks

        # Give servers time to start
        await asyncio.sleep(3)

        print('✅ All A2A agent servers started in background!')
        print('   - Currency Exchange Agent: http://127.0.0.1:10020')
        print('   - Activity Planner Agent: http://127.0.0.1:10021')
        print('   - Travel Manager Agent: http://127.0.0.1:10022')

        # Don't await the tasks here - let them run in background
        return tasks

    except Exception as e:
        logger.error(f"Error in start_all_servers: {str(e)}")
        print(f"Failed to start servers: {str(e)}")
        raise

# Start the servers in background
server_tasks = await start_all_servers_background()

INFO:     Started server process [39134]
INFO:     Waiting for application startup.
INFO:     Started server process [39134]
INFO:     Waiting for application startup.
INFO:     Started server process [39134]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Application startup complete.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:10020 (Press CTRL+C to quit)
INFO:     Uvicorn running on http://127.0.0.1:10021 (Press CTRL+C to quit)
INFO:     Uvicorn running on http://127.0.0.1:10022 (Press CTRL+C to quit)


✅ All A2A agent servers started in background!
   - Currency Exchange Agent: http://127.0.0.1:10020
   - Activity Planner Agent: http://127.0.0.1:10021
   - Travel Manager Agent: http://127.0.0.1:10022


INFO:     127.0.0.1:58921 - "GET /.well-known/agent-card.json HTTP/1.1" 200 OK
INFO:     127.0.0.1:58923 - "GET /.well-known/agent-card.json HTTP/1.1" 200 OK
INFO:     127.0.0.1:58925 - "GET /.well-known/agent-card.json HTTP/1.1" 200 OK
INFO:     127.0.0.1:58933 - "GET /.well-known/agent-card.json HTTP/1.1" 200 OK
INFO:     127.0.0.1:58933 - "POST / HTTP/1.1" 200 OK


2025-09-08 17:33:07,277 - INFO - httpx - HTTP Request: POST https://da-dev-resource.openai.azure.com/openai/deployments/gpt-4o-mini/chat/completions?api-version=2024-10-21 "HTTP/1.1 200 OK"
2025-09-08 17:33:07,301 - INFO - semantic_kernel.connectors.ai.open_ai.services.open_ai_handler - OpenAI usage: CompletionUsage(completion_tokens=63, prompt_tokens=149, total_tokens=212, completion_tokens_details=CompletionTokensDetails(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0), prompt_tokens_details=PromptTokensDetails(audio_tokens=0, cached_tokens=0))
2025-09-08 17:33:07,306 - INFO - semantic_kernel.connectors.ai.chat_completion_client_base - processing 2 tool calls in parallel.
2025-09-08 17:33:07,308 - INFO - semantic_kernel.kernel - Calling CurrencyPlugin-get_exchange_rate function with args: {"currency_from": "USD", "currency_to": "EUR"}
2025-09-08 17:33:07,318 - INFO - semantic_kernel.functions.kernel_function - Function CurrencyPlugin-get

INFO:     127.0.0.1:58962 - "GET /.well-known/agent-card.json HTTP/1.1" 200 OK
INFO:     127.0.0.1:58962 - "POST / HTTP/1.1" 200 OK


2025-09-08 17:33:20,674 - INFO - httpx - HTTP Request: POST https://da-dev-resource.openai.azure.com/openai/deployments/gpt-4o-mini/chat/completions?api-version=2024-10-21 "HTTP/1.1 200 OK"
2025-09-08 17:33:20,691 - INFO - semantic_kernel.connectors.ai.open_ai.services.open_ai_handler - OpenAI usage: CompletionUsage(completion_tokens=917, prompt_tokens=63, total_tokens=980, completion_tokens_details=CompletionTokensDetails(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0), prompt_tokens_details=PromptTokensDetails(audio_tokens=0, cached_tokens=0))


INFO:     127.0.0.1:58984 - "GET /.well-known/agent-card.json HTTP/1.1" 200 OK
INFO:     127.0.0.1:58984 - "POST / HTTP/1.1" 200 OK


2025-09-08 17:33:30,025 - INFO - httpx - HTTP Request: POST https://da-dev-resource.openai.azure.com/openai/deployments/gpt-4o-mini/chat/completions?api-version=2024-10-21 "HTTP/1.1 200 OK"
2025-09-08 17:33:30,030 - INFO - semantic_kernel.connectors.ai.open_ai.services.open_ai_handler - OpenAI usage: CompletionUsage(completion_tokens=89, prompt_tokens=369, total_tokens=458, completion_tokens_details=CompletionTokensDetails(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0), prompt_tokens_details=PromptTokensDetails(audio_tokens=0, cached_tokens=0))
2025-09-08 17:33:30,031 - INFO - semantic_kernel.connectors.ai.chat_completion_client_base - processing 2 tool calls in parallel.
2025-09-08 17:33:30,032 - INFO - semantic_kernel.kernel - Calling CurrencyExchangeAgent-CurrencyExchangeAgent function with args: {"messages": "What is the current exchange rate from USD to JPY?"}
2025-09-08 17:33:30,033 - INFO - semantic_kernel.functions.kernel_functio

INFO:     127.0.0.1:59005 - "POST / HTTP/1.1" 200 OK


2025-09-08 17:33:50,920 - INFO - httpx - HTTP Request: POST https://da-dev-resource.openai.azure.com/openai/deployments/gpt-4o-mini/chat/completions?api-version=2024-10-21 "HTTP/1.1 200 OK"
2025-09-08 17:33:50,936 - INFO - semantic_kernel.connectors.ai.open_ai.services.open_ai_handler - OpenAI usage: CompletionUsage(completion_tokens=75, prompt_tokens=360, total_tokens=435, completion_tokens_details=CompletionTokensDetails(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0), prompt_tokens_details=PromptTokensDetails(audio_tokens=0, cached_tokens=0))
2025-09-08 17:33:50,947 - INFO - semantic_kernel.connectors.ai.chat_completion_client_base - processing 2 tool calls in parallel.
2025-09-08 17:33:50,948 - INFO - semantic_kernel.kernel - Calling CurrencyExchangeAgent-CurrencyExchangeAgent function with args: {"messages": "What is the current exchange rate for EUR to GBP?"}
2025-09-08 17:33:50,950 - INFO - semantic_kernel.functions.kernel_function

In [11]:
# Add this cell after starting the servers but before testing
async def verify_servers():
    """Verify that all A2A servers are running and accessible."""
    import httpx

    servers = [
        ('Currency Exchange Agent', 'http://localhost:10020'),
        ('Activity Planner Agent', 'http://localhost:10021'),
        ('Travel Manager Agent', 'http://localhost:10022'),
    ]

    print("🔍 Verifying A2A servers...")
    print("="*50)

    async with httpx.AsyncClient() as client:
        for name, url in servers:
            try:
                response = await client.get(f"{url}{AGENT_CARD_WELL_KNOWN_PATH}", timeout=5.0)
                if response.status_code == 200:
                    print(f"✅ {name} is running at {url}")
                else:
                    print(f"⚠️ {name} returned status {response.status_code}")
            except Exception as e:
                print(f"❌ {name} is not accessible: {str(e)}")

    print("="*50)

# Run server verification
await verify_servers()



🔍 Verifying A2A servers...


2025-09-08 17:32:55,244 - INFO - httpx - HTTP Request: GET http://localhost:10020/.well-known/agent-card.json "HTTP/1.1 200 OK"


✅ Currency Exchange Agent is running at http://localhost:10020


2025-09-08 17:32:55,255 - INFO - httpx - HTTP Request: GET http://localhost:10021/.well-known/agent-card.json "HTTP/1.1 200 OK"


✅ Activity Planner Agent is running at http://localhost:10021


2025-09-08 17:32:55,264 - INFO - httpx - HTTP Request: GET http://localhost:10022/.well-known/agent-card.json "HTTP/1.1 200 OK"


✅ Travel Manager Agent is running at http://localhost:10022


## Create A2A Client

Now let's create a client to interact with our A2A agents.

In [12]:
class A2AClient:
    """Simple A2A client to interact with A2A servers."""
    
    def __init__(self, default_timeout: float = 60.0):
        self._agent_info_cache = {}
        self.default_timeout = default_timeout
    
    async def send_message(self, agent_url: str, message: str) -> str:
        """Send a message to an A2A agent."""
        timeout_config = httpx.Timeout(
            timeout=self.default_timeout,
            connect=10.0,
            read=self.default_timeout,
            write=10.0,
            pool=5.0,
        )
        
        async with httpx.AsyncClient(timeout=timeout_config) as httpx_client:
            # Fetch agent card if not cached
            if agent_url not in self._agent_info_cache:
                agent_card_response = await httpx_client.get(
                    f'{agent_url}{AGENT_CARD_WELL_KNOWN_PATH}'
                )
                self._agent_info_cache[agent_url] = agent_card_response.json()
            
            agent_card_data = self._agent_info_cache[agent_url]
            agent_card = AgentCard(**agent_card_data)
            
            # Create A2A client
            config = ClientConfig(
                httpx_client=httpx_client,
                supported_transports=[
                    TransportProtocol.jsonrpc,
                    TransportProtocol.http_json,
                ],
                use_client_preference=True,
            )
            
            factory = ClientFactory(config)
            client = factory.create(agent_card)
            
            # Create and send message
            message_obj = create_text_message_object(content=message)
            
            responses = []
            async for response in client.send_message(message_obj):
                responses.append(response)
            
            # Enhanced response parsing
            if responses:
                try:
                    # The response should be a tuple (task, any_additional_data)
                    for response_item in responses:
                        if isinstance(response_item, tuple) and len(response_item) > 0:
                            task = response_item[0]
                            
                            # Try multiple ways to extract the response text
                            if hasattr(task, 'artifacts') and task.artifacts:
                                for artifact in task.artifacts:
                                    if hasattr(artifact, 'parts') and artifact.parts:
                                        for part in artifact.parts:
                                            if hasattr(part, 'root') and hasattr(part.root, 'text'):
                                                return part.root.text
                                            elif hasattr(part, 'text'):
                                                return part.text
                                            elif hasattr(part, 'content'):
                                                return str(part.content)
                                    elif hasattr(artifact, 'text'):
                                        return artifact.text
                                    elif hasattr(artifact, 'content'):
                                        return str(artifact.content)
                            
                            # If artifacts don't work, try direct task properties
                            elif hasattr(task, 'text'):
                                return task.text
                            elif hasattr(task, 'content'):
                                return str(task.content)
                            elif hasattr(task, 'result'):
                                return str(task.result)
                            else:
                                # Debug: print the task structure
                                print(f"Debug - Task type: {type(task)}")
                                print(f"Debug - Task attributes: {dir(task)}")
                                if hasattr(task, '__dict__'):
                                    print(f"Debug - Task dict: {task.__dict__}")
                                return f"Received response but couldn't parse content. Task: {str(task)}"
                        else:
                            # Handle direct response objects
                            response_text = str(response_item)
                            if response_text and response_text != "None":
                                return response_text
                    
                    return f"Received {len(responses)} responses but couldn't extract text content"
                    
                except Exception as e:
                    return f"Error parsing response: {str(e)}. Raw responses: {str(responses)}"
            
            return 'No response received'

# Create client instance
a2a_client = A2AClient()
print('✅ A2A client updated with enhanced response parsing')

✅ A2A client updated with enhanced response parsing


## Test Individual Agents

Let's test each agent individually to see how they work.

In [13]:
# Test Currency Exchange Agent
async def test_currency_agent():
    """Test the currency exchange agent."""
    print("\n🔍 Testing Currency Exchange Agent")
    print("="*50)
    
    response = await a2a_client.send_message(
        'http://localhost:10020',
        'What is the exchange rate from USD to EUR and JPY?'
    )
    
    print("User: What is the exchange rate from USD to EUR and JPY?")
    print("\nCurrency Agent Response:")
    print(response)

await test_currency_agent()


🔍 Testing Currency Exchange Agent


2025-09-08 17:33:05,932 - INFO - httpx - HTTP Request: GET http://localhost:10020/.well-known/agent-card.json "HTTP/1.1 200 OK"
2025-09-08 17:33:06,081 - INFO - httpx - HTTP Request: POST http://localhost:10020 "HTTP/1.1 200 OK"
2025-09-08 17:33:06,088 - INFO - a2a.client.client_task_manager - New task created with id: 17d6125c-4a53-4f73-b3c4-f72bf9df6d09


User: What is the exchange rate from USD to EUR and JPY?

Currency Agent Response:
The current exchange rates are as follows:
- 1 USD = 0.85266 EUR
- 1 USD = 147.84 JPY

If you need further assistance with currency conversions, feel free to ask!


In [14]:
# Test Activity Planner Agent
async def test_activity_agent():
    """Test the activity planner agent."""
    print("\n🔍 Testing Activity Planner Agent")
    print("="*50)
    
    response = await a2a_client.send_message(
        'http://localhost:10021',
        'Plan a one-day itinerary for Paris including must-see attractions'
    )
    
    print("User: Plan a one-day itinerary for Paris including must-see attractions")
    print("\nActivity Agent Response:")
    print(response)

await test_activity_agent()


🔍 Testing Activity Planner Agent


2025-09-08 17:33:12,574 - INFO - httpx - HTTP Request: GET http://localhost:10021/.well-known/agent-card.json "HTTP/1.1 200 OK"
2025-09-08 17:33:12,658 - INFO - httpx - HTTP Request: POST http://localhost:10021 "HTTP/1.1 200 OK"
2025-09-08 17:33:12,661 - INFO - a2a.client.client_task_manager - New task created with id: a08f345a-ac4e-4017-9a0c-352dbab6541b


User: Plan a one-day itinerary for Paris including must-see attractions

Activity Agent Response:
Sure! Here’s a one-day itinerary for Paris that includes popular attractions while allowing you to soak in the city’s culture. This itinerary is designed to be budget-friendly, while still giving you a taste of the local culture.

### Paris One-Day Itinerary

**Morning:**

#### 8:00 AM - Breakfast at a Local Café
- **Location:** Café de Flore (172 Boulevard Saint-Germain)
- **Recommendation:** Try a classic French breakfast with croissants or pain au chocolat and a café au lait. Budget around €10-€15. Enjoy the charming ambiance in a famous historic café.

#### 9:00 AM - Visit the Notre-Dame Cathedral
- **Location:** Île de la Cité
- **Tips:** While the interior is currently closed due to restoration, you can admire the exterior architecture and the surroundings. Take a stroll around the Île de la Cité.
- **Budget:** Free

#### 10:00 AM - Sainte-Chapelle
- **Location:** 8 Boulevard du Pala

## Test the Travel Manager (Orchestrator)

Now let's test the main Travel Manager agent that orchestrates the other agents.

In [15]:
# Test Travel Manager with comprehensive request
async def test_travel_manager():
    """Test the travel manager orchestrating multiple agents."""
    print("\n🔍 Testing Travel Manager Agent (Orchestrator)")
    print("="*50)
    
    response = await a2a_client.send_message(
        'http://localhost:10022',
        'I am planning a trip to Tokyo. I have 1000 USD to exchange. What is the current exchange rate to JPY and what activities do you recommend for a 2-day visit?'
    )
    
    print("User: I am planning a trip to Tokyo. I have 1000 USD to exchange.")
    print("      What is the current exchange rate to JPY and what activities")
    print("      do you recommend for a 2-day visit?")
    print("\nTravel Manager Response:")
    print(response)

await test_travel_manager()


🔍 Testing Travel Manager Agent (Orchestrator)


2025-09-08 17:33:28,422 - INFO - httpx - HTTP Request: GET http://localhost:10022/.well-known/agent-card.json "HTTP/1.1 200 OK"
2025-09-08 17:33:28,439 - INFO - httpx - HTTP Request: POST http://localhost:10022 "HTTP/1.1 200 OK"
2025-09-08 17:33:28,442 - INFO - a2a.client.client_task_manager - New task created with id: 4ca95452-c27d-430e-a02e-ccdbc42af5c6


User: I am planning a trip to Tokyo. I have 1000 USD to exchange.
      What is the current exchange rate to JPY and what activities
      do you recommend for a 2-day visit?

Travel Manager Response:
The current exchange rate from USD to JPY is 1 USD = 147.84 JPY.

### Recommended 2-Day Itinerary for Tokyo:

#### Day 1: Exploring Traditional Tokyo

**Morning:**
- **Asakusa and Senso-ji Temple:** Start your day at Senso-ji, Tokyo's oldest temple. Don't miss Nakamise Shopping Street for local snacks like melon bread and ningyo-yaki (small cakes with red bean paste).

**Lunch:**
- **Asakusa Kagetsudo:** Enjoy tempura or soba noodles nearby, renowned for its delightful melon bread.

**Afternoon:**
- **Ueno Park:** Visit museums like the Tokyo National Museum or simply enjoy a stroll in the park with beautiful scenery and possible street performances.
- **Ameyoko Street:** Head to this bustling market street for souvenirs and snacks.

**Dinner:**
- **Nid Café:** A themed café in Ameyoko kn

## Interactive Testing

Try your own travel planning queries!

In [16]:
async def interactive_test():
    """Interactive testing function."""
    print("\n🎯 Interactive Travel Planning Assistant")
    print("="*50)
    print("Available agents:")
    print("1. Currency Exchange Agent (port 10020) - Currency conversions")
    print("2. Activity Planner Agent (port 10021) - Travel recommendations")
    print("3. Travel Manager Agent (port 10022) - Comprehensive planning")
    print("\nExample queries:")
    print("- 'Convert 500 EUR to GBP'")
    print("- 'Plan a romantic dinner in Rome'")
    print("- 'I need help planning a budget trip to Seoul with 2000 USD'")
    
    # You can modify this query to test different scenarios
    user_query = "I'm visiting London next week with a budget of 1500 EUR. What's the exchange rate to GBP and what are the top attractions I should visit?"
    
    print(f"\nYour query: {user_query}")
    print("\nProcessing with Travel Manager...")
    
    response = await a2a_client.send_message(
        'http://localhost:10022',
        user_query
    )
    
    print("\nResponse:")
    print(response)

await interactive_test()


🎯 Interactive Travel Planning Assistant
Available agents:
1. Currency Exchange Agent (port 10020) - Currency conversions
2. Activity Planner Agent (port 10021) - Travel recommendations
3. Travel Manager Agent (port 10022) - Comprehensive planning

Example queries:
- 'Convert 500 EUR to GBP'
- 'Plan a romantic dinner in Rome'
- 'I need help planning a budget trip to Seoul with 2000 USD'

Your query: I'm visiting London next week with a budget of 1500 EUR. What's the exchange rate to GBP and what are the top attractions I should visit?

Processing with Travel Manager...


2025-09-08 17:33:49,384 - INFO - httpx - HTTP Request: POST http://localhost:10022 "HTTP/1.1 200 OK"
2025-09-08 17:33:49,385 - INFO - a2a.client.client_task_manager - New task created with id: 57e16173-9147-4de8-bfd6-ad9f628624f5



Response:
The current exchange rate for 1 EUR is **0.8669 GBP**.

As for top attractions in London, here are some recommendations:

1. **The British Museum** - Explore a vast collection of world art and artifacts, including the Rosetta Stone and the Elgin Marbles. Admission is free.

2. **The Tower of London** - A historic castle and former royal palace, known for housing the Crown Jewels and its fascinating history.

3. **Buckingham Palace** - The official residence of the British monarch. Don't miss the Changing of the Guard ceremony.

4. **The London Eye** - A massive Ferris wheel on the South Bank of the River Thames, offering stunning views of the city skyline.

5. **The Houses of Parliament and Big Ben** - Iconic symbols of London, located on the banks of the Thames.

6. **The Tate Modern** - A modern art gallery housed in a former power station. Admission is free, although special exhibitions may have a fee.

7. **Westminster Abbey** - A stunning Gothic church famous for royal 

## Summary

Congratulations! You've successfully built a multi-agent travel planning system using:

### Technologies Used:
- **Semantic Kernel** - For building intelligent agents
- **Azure OpenAI** - For LLM capabilities
- **A2A Protocol** - For standardized agent communication
- **Uvicorn** - For running local A2A servers

### What You've Learned:
1. **Agent Creation**: Building specialized agents with Semantic Kernel
2. **A2A Integration**: Wrapping SK agents for A2A protocol compatibility
3. **Agent Orchestration**: Using a manager agent to coordinate multiple specialists
4. **Real-time Services**: Integrating external APIs (Frankfurter for currency rates)
5. **Local Deployment**: Running multiple A2A servers in a single notebook

### Next Steps:
- Deploy agents to Azure Container Instances or Azure Functions
- Add more specialized agents (flight booking, hotel recommendations)
- Implement agent memory for context-aware conversations
- Add authentication and security for production deployment
- Create a web interface for the travel planning system

### Key Advantages of This Architecture:
- **Modularity**: Each agent can be developed and deployed independently
- **Scalability**: Agents can be scaled based on demand
- **Reusability**: Agents can be reused in different applications
- **Interoperability**: A2A protocol allows integration with agents from different frameworks