# Agent-to-Agent (A2A) Communication for LumosTrade Agents

This notebook implements Google's Agent-to-Agent (A2A) protocol to enable communication between the Athena market intelligence agent and the Apollo trading signals agent in the LumosTrade system.

## 1. Setup and Installation

First, let's install the required packages for A2A integration:

In [7]:
!pip install google-adk==1.9.0 a2a-sdk==0.3.0 uvicorn



Now, let's import the necessary modules for both the Athena and Apollo agents, as well as the A2A components:

In [10]:
import asyncio
import logging
import os
import sys
import threading
import time

from typing import Any

import httpx
import nest_asyncio
import uvicorn

from a2a.client import ClientConfig, ClientFactory, create_text_message_object
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import (
    AgentCapabilities,
    AgentCard,
    AgentSkill,
    TransportProtocol,
)
from a2a.utils.constants import AGENT_CARD_WELL_KNOWN_PATH
from dotenv import load_dotenv
from google.adk.a2a.executor.a2a_agent_executor import (
    A2aAgentExecutor,
    A2aAgentExecutorConfig,
)
from google.adk.agents import Agent, SequentialAgent
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools import google_search

## 2. Import Lumos Agents

Let's import the necessary components from Athena and Apollo agents:

In [None]:
# Import Athena agent components
from src.agents.athena_workspace.athena import AthenaAgent
from src.agents.athena_workspace.tools.feature_extraction import FeatureExtractor
from src.agents.athena_workspace.tools.pattern_detection import PatternDetector
from src.agents.athena_workspace.tools.regime_detection import RegimeDetector

# Import Apollo agent components
from src.agents.apollo_workspace.apollo import ApolloAgent
from src.agents.apollo_workspace.tools.signal_generator import SignalGenerator
from src.agents.apollo_workspace.models import Signal

# Import LLM client for agents
from src.llm.client import GeminiClient

# Import settings
from config.settings import settings

## 3. Define Agent Cards

We'll now define the agent cards for Athena and Apollo agents. Agent cards specify capabilities, descriptions, and functions that the agents expose to the A2A protocol.

In [13]:
athena_agent_card = AgentCard(
    name="Athena Agent",
    url="http://localhost:10010",
    description=(
        "Athena is a market intelligence agent that analyzes real-time and historical "
        "market data to detect trends, technical patterns, and market regimes."
    ),
    version="1.0",
    capabilities=AgentCapabilities(streaming=True),
    default_input_modes=["application/json"],
    default_output_modes=["application/json"],
    preferred_transport=TransportProtocol.jsonrpc,
    skills=[
        AgentSkill(
            id="analyze_market",
            name="Analyze Market",
            description=(
                "Analyze a market symbol and generate structured insights about price action, "
                "volatility, and sentiment."
            ),
            tags=["market", "analysis", "insights", "technical"],
            examples=[
                "Analyze BTC/USD on 1-minute candles.",
                "Show me AAPL's market overview for today.",
            ],
        ),
        AgentSkill(
            id="get_market_regime",
            name="Get Market Regime",
            description="Determine the current market regime (e.g., trending, ranging, volatile).",
            tags=["regime", "classification", "trend"],
            examples=[
                "What's the current regime for BTC/USD?",
                "Get market regime for ETH/USD.",
            ],
        ),
        AgentSkill(
            id="detect_patterns",
            name="Detect Market Patterns",
            description=(
                "Identify technical chart patterns such as head and shoulders, triangles, or reversals."
            ),
            tags=["pattern", "technical", "chart", "detection"],
            examples=[
                "Detect chart patterns in AAPL.",
                "Find technical setups in NIFTY50.",
            ],
        ),
    ],
)


In [14]:
apollo_agent_card = AgentCard(
    name="Apollo Agent",
    url="http://localhost:10011",
    description=(
        "Apollo generates and validates trading signals. "
        "It consumes market insights from Athena to produce trade setups, "
        "compute probabilities, and assess performance."
    ),
    version="1.0",
    capabilities=AgentCapabilities(streaming=True),
    default_input_modes=["application/json"],
    default_output_modes=["application/json"],
    preferred_transport=TransportProtocol.jsonrpc,
    skills=[
        AgentSkill(
            id="generate_signals",
            name="Generate Trading Signals",
            description=(
                "Generate actionable buy/sell signals for a given symbol with confidence scores "
                "and recommended targets."
            ),
            tags=["signal", "trading", "generation", "forecast"],
            examples=[
                "Generate a trading signal for BTC/USD.",
                "Suggest trades for AAPL.",
            ],
        ),
        AgentSkill(
            id="validate_signal",
            name="Validate Signal",
            description="Validate a previously generated trading signal using backtesting data.",
            tags=["signal", "validation", "backtest"],
            examples=[
                "Validate signal with ID sig_12345.",
            ],
        ),
        AgentSkill(
            id="get_signal_probabilities",
            name="Get Signal Probabilities",
            description=(
                "Compute probability metrics such as win rate, risk-reward ratio, and confidence interval."
            ),
            tags=["signal", "probability", "metrics", "statistics"],
            examples=[
                "Get probabilities for signal sig_12345.",
            ],
        ),
    ],
)


## 4. Initialize Agent Instances

Next, we'll initialize instances of both the Athena and Apollo agents that will be used to implement the functions defined in the agent cards.

In [16]:
# Initialize LLM client (used by both agents)
llm_client = GeminiClient()

# Initialize Athena agent
athena_agent = AthenaAgent(
    gemini_api_key=settings.gemini_api_key,
    use_memory=True,
    use_redis=False,  # Use in-memory storage for demo purposes
    cache_dir="./data"  # Cache directory for market data
)

# Initialize Apollo agent
apollo_agent = ApolloAgent(
    llm_client=llm_client,
    use_redis=False  # Use in-memory storage for demo purposes
)

# Initialize the agents
async def initialize_agents():
    await athena_agent.initialize()
    print("✅ Athena agent initialized")
    
    # For Apollo, we don't need explicit initialization
    print("✅ Apollo agent ready")
    
    return athena_agent, apollo_agent

# Run the initialization
athena, apollo = await initialize_agents()

✅ Athena agent initialized
✅ Apollo agent ready


## 5. Implement A2A Function Handlers

Now we'll implement the function handlers that connect the A2A protocol functions with our agent implementations. These handlers will be called when requests come through the A2A protocol.

In [17]:
# Define Athena's function handlers
async def athena_analyze_market(params: dict) -> dict:
    symbol = params.get("symbol", "R_10")
    interval = params.get("interval", 60)
    
    print(f"Athena analyzing market: {symbol}, interval: {interval}")
    
    # Call Athena's observe method
    market_context = await athena.observe(symbol=symbol, interval=interval)
    
    # Return results in a structured format
    return {
        "symbol": symbol,
        "timestamp": market_context.get("timestamp", ""),
        "regime": market_context.get("regime", "unknown"),
        "regime_confidence": market_context.get("regime_confidence", 0),
        "trading_bias": market_context.get("trading_bias", "neutral"),
        "patterns": market_context.get("patterns", []),
        "summary": market_context.get("summary", "No summary available"),
        "trade_ideas": market_context.get("trade_ideas", [])
    }

async def athena_get_market_regime(params: dict) -> dict:
    symbol = params.get("symbol", "BTC/USD")
    
    print(f"Athena getting market regime for: {symbol}")
    
    # Call Athena's observe method and extract just the regime information
    market_context = await athena.observe(symbol=symbol)
    
    return {
        "symbol": symbol,
        "regime": market_context.get("regime", "unknown"),
        "regime_confidence": market_context.get("regime_confidence", 0),
        "description": f"The current market regime for {symbol} is {market_context.get('regime', 'unknown')} with {market_context.get('regime_confidence', 0)*100:.1f}% confidence"
    }

async def athena_detect_patterns(params: dict) -> dict:
    symbol = params.get("symbol", "BTC/USD")
    
    print(f"Athena detecting patterns for: {symbol}")
    
    # Call Athena's observe method and extract just the pattern information
    market_context = await athena.observe(symbol=symbol)
    
    return {
        "symbol": symbol,
        "patterns": market_context.get("patterns", []),
        "trading_bias": market_context.get("trading_bias", "neutral")
    }

# Define Apollo's function handlers
async def apollo_generate_signals(params: dict) -> dict:
    symbol = params.get("symbol", "BTC/USD")
    
    print(f"Apollo generating signals for: {symbol}")
    
    # First, get the market context from Athena
    market_context = await athena.observe(symbol=symbol)
    
    # Then, use Apollo to generate signals
    signals = await apollo.process_athena_observation(symbol=symbol, athena_context=market_context)
    
    # Convert signals to serializable format
    serialized_signals = []
    for signal in signals:
        serialized_signals.append({
            "id": signal.id,
            "symbol": signal.symbol,
            "timestamp": signal.timestamp.isoformat(),
            "pattern": signal.pattern,
            "direction": signal.direction,
            "confidence": signal.confidence,
            "entry": signal.entry,
            "stop_loss": signal.stop_loss,
            "target": signal.target,
            "risk_reward": signal.risk_reward,
            "time_horizon": signal.time_horizon,
            "description": signal.description,
            "reasoning": signal.reasoning[:500] + "..." if len(signal.reasoning) > 500 else signal.reasoning,
            "invalidation_criteria": signal.invalidation_criteria,
            "supporting_factors": signal.supporting_factors
        })
    
    return {
        "symbol": symbol,
        "signals_count": len(serialized_signals),
        "signals": serialized_signals
    }

async def apollo_validate_signal(params: dict) -> dict:
    signal_id = params.get("signal_id")
    
    print(f"Apollo validating signal: {signal_id}")
    
    # In a real implementation, we would look up the signal in the database
    # For this demo, we'll return a mock response
    
    return {
        "signal_id": signal_id,
        "valid": True,
        "reason": "Signal validated against historical data",
        "confidence": 0.85
    }

async def apollo_get_signal_probabilities(params: dict) -> dict:
    signal_id = params.get("signal_id")
    
    print(f"Apollo getting probabilities for signal: {signal_id}")
    
    # In a real implementation, we would look up the signal and calculate probabilities
    # For this demo, we'll return a mock response
    
    return {
        "signal_id": signal_id,
        "base_win_rate": 0.65,
        "adjusted_win_rate": 0.72,
        "expected_value": 1.5,
        "sample_size": 42,
        "description": "Signal has a 72% probability of success based on historical data"
    }

In [29]:
athena_function_registry = {
    "analyze_market": athena_analyze_market,
    "get_market_regime": athena_get_market_regime,
    "detect_patterns": athena_detect_patterns,
}

apollo_function_registry = {
    "generate_signals": apollo_generate_signals,
    "validate_signal": apollo_validate_signal,
    "get_signal_probabilities": apollo_get_signal_probabilities,
}


In [34]:
from google.adk.a2a.server import (
    A2AStarletteApplication,
    DefaultRequestHandler,
)
from google.adk.a2a.models import FunctionCall, AgentCard, TransportProtocol
from google.adk.a2a.client import ClientFactory, ClientConfig
from google.adk.a2a.utils import create_text_message_object


ModuleNotFoundError: No module named 'google.adk.a2a.server'

In [31]:
class FunctionDispatchRequestHandler(DefaultRequestHandler):
    """
    Custom A2A Request Handler that dispatches function calls
    to registered async functions inside the agent.
    """

    def __init__(self, runner, function_registry: dict[str, callable]):
        super().__init__(runner=runner, task_store=InMemoryTaskStore())
        self.function_registry = function_registry

    async def handle_function_call(self, function_call: FunctionCall):
        """Intercepts a function call and routes it to the right handler."""
        func_name = function_call.name
        params = function_call.parameters or {}

        if func_name not in self.function_registry:
            raise ValueError(f"Unknown function: {func_name}")

        handler = self.function_registry[func_name]
        result = await handler(params)
        return result

    async def handle_request(self, request):
        """
        Extend the base handler to check for 'function_call' in the A2A request.
        """
        try:
            data = await request.json()

            # Detect a function call (A2A pattern)
            if "function_call" in data:
                fn_call = FunctionCall(**data["function_call"])
                result = await self.handle_function_call(fn_call)
                return {"status": "ok", "result": result}

            # Otherwise, fallback to the parent handler
            return await super().handle_request(request)

        except Exception as e:
            return {"status": "error", "message": str(e)}


NameError: name 'FunctionCall' is not defined

## 6. Set Up A2A Servers

Now, let's create the A2A servers for both Athena and Apollo. These servers will listen for requests via the HTTP transport protocol and route them to the appropriate function handlers.

In [18]:
def create_agent_a2a_server(agent, agent_card):
    """
    Create an A2A server for any Google ADK agent.
    """
    # Create ADK runner
    runner = Runner(
        app_name=agent.name,
        agent=agent,
        artifact_service=InMemoryArtifactService(),
        session_service=InMemorySessionService(),
        memory_service=InMemoryMemoryService(),
    )

    # A2A executor handles all skill dispatching
    executor_config = A2aAgentExecutorConfig()
    executor = A2aAgentExecutor(runner=runner, config=executor_config)

    # Default HTTP handler
    handler = DefaultRequestHandler(
        agent_executor=executor,
        task_store=InMemoryTaskStore(),
    )

    # Build final A2A Starlette application
    app = A2AStarletteApplication(
        agent_card=agent_card,
        http_handler=handler,
    )

    return app


In [24]:
def create_agent_a2a_server(agent, agent_card):
    """
    Create an A2A server for any Google ADK agent.
    """
    # Create ADK runner
    runner = Runner(
        app_name=agent_card.name,
        agent=agent,
        artifact_service=InMemoryArtifactService(),
        session_service=InMemorySessionService(),
        memory_service=InMemoryMemoryService(),
    )

    # A2A executor handles all skill dispatching
    executor_config = A2aAgentExecutorConfig()
    executor = A2aAgentExecutor(runner=runner, config=executor_config)

    # Default HTTP handler
    handler = DefaultRequestHandler(
        agent_executor=executor,
        task_store=InMemoryTaskStore(),
    )

    # Build final A2A Starlette application
    app = A2AStarletteApplication(
        agent_card=agent_card,
        http_handler=handler,
    )

    return app



In [25]:
async def run_agent_server(agent, agent_card, port: int):
    """
    Run a single agent's A2A server using Uvicorn.
    """
    app = create_agent_a2a_server(agent, agent_card)
    uvicorn_config = uvicorn.Config(
        app.build(),
        host="127.0.0.1",
        port=port,
        log_level="info"
    )
    server = uvicorn.Server(uvicorn_config)
    await server.serve()


In [26]:
async def start_all_servers():
    """
    Launch all A2A agent servers concurrently.
    """
    tasks = [
        asyncio.create_task(run_agent_server(athena_agent, athena_agent_card, 10010)),
        asyncio.create_task(run_agent_server(apollo_agent, apollo_agent_card, 10011)),
    ]

    # Let them initialize
    await asyncio.sleep(2)

    print("✅ All A2A agent servers are running:")
    print("   • Athena Agent: http://127.0.0.1:10010")
    print("   • Apollo Agent: http://127.0.0.1:10011")

    try:
        await asyncio.gather(*tasks)
    except asyncio.CancelledError:
        print("🔻 Servers stopped.")


In [27]:
def run_servers_in_background():
    nest_asyncio.apply()
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(start_all_servers())


In [28]:
# Start the servers
server_thread = threading.Thread(target=run_servers_in_background, daemon=True)
server_thread.start()

# Allow them to boot
time.sleep(3)


  executor_config = A2aAgentExecutorConfig()
  executor = A2aAgentExecutor(runner=runner, config=executor_config)
INFO:     Started server process [60563]
INFO:     Waiting for application startup.
INFO:     Started server process [60563]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Application startup complete.
INFO:     Started server process [60563]
INFO:     Waiting for application startup.
INFO:     Started server process [60563]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:10010 (Press CTRL+C to quit)
INFO:     Uvicorn running on http://127.0.0.1:10011 (Press CTRL+C to quit)
INFO:     Uvicorn running on http://127.0.0.1:10010 (Press CTRL+C to quit)
INFO:     Uvicorn running on http://127.0.0.1:10011 (Press CTRL+C to quit)


✅ All A2A agent servers are running:
   • Athena Agent: http://127.0.0.1:10010
   • Apollo Agent: http://127.0.0.1:10011


## 7. Test A2A Communication

Let's test the communication between Athena and Apollo agents using the A2A protocol. We'll create client instances for each agent to communicate with the other.

In [None]:
# Create client transports to connect to the servers
athena_client_transport = HTTPTransport(host="localhost", port=8000)
apollo_client_transport = HTTPTransport(host="localhost", port=8001)

# Function to make an A2A request from one agent to another
async def make_a2a_request(sender_name, recipient_card, recipient_transport, function_name, parameters):
    # Create a message from sender to recipient
    message = Message(
        dialogue_id="test-dialogue",
        sender=AgentName(sender_name),
        recipient=recipient_card.name,
        content=f"Requesting {function_name} with parameters: {parameters}",
        state=DialogueState.IN_PROGRESS,
        function_call=FunctionCall(
            name=function_name,
            parameters=parameters
        )
    )
    
    # Send the message through the transport
    response = await recipient_transport.send(message)
    
    return response

# Test Apollo requesting market analysis from Athena
async def test_apollo_requesting_from_athena():
    print("\n🔄 Apollo requesting market analysis from Athena...")
    
    # Apollo requests Athena to analyze the market
    response = await make_a2a_request(
        sender_name="apollo",
        recipient_card=athena_card,
        recipient_transport=athena_client_transport,
        function_name="analyze_market",
        parameters={"symbol": "BTC/USD", "interval": 60}
    )
    
    print("✅ Athena's response:")
    print(json.dumps(response.content, indent=2))
    
    return response.content

# Test Athena requesting signal generation from Apollo
async def test_athena_requesting_from_apollo():
    print("\n🔄 Athena requesting signal generation from Apollo...")
    
    # Athena requests Apollo to generate signals
    response = await make_a2a_request(
        sender_name="athena",
        recipient_card=apollo_card,
        recipient_transport=apollo_client_transport,
        function_name="generate_signals",
        parameters={"symbol": "BTC/USD"}
    )
    
    print("✅ Apollo's response:")
    # Print a truncated version of the response to avoid cluttering the notebook
    content = response.content
    if "signals" in content and len(content["signals"]) > 0:
        # Truncate the reasoning field for display
        for signal in content["signals"]:
            if "reasoning" in signal and isinstance(signal["reasoning"], str) and len(signal["reasoning"]) > 200:
                signal["reasoning"] = signal["reasoning"][:200] + "..."
    
    print(json.dumps(content, indent=2))
    
    return response.content

# Run the tests
athena_analysis = await test_apollo_requesting_from_athena()
apollo_signals = await test_athena_requesting_from_apollo()

## 8. Create a Multi-Agent Workflow

Let's create a more complex workflow that involves both agents collaborating to analyze the market and generate signals for multiple symbols.

In [None]:
async def multi_agent_market_analysis_workflow(symbols):
    """
    A workflow that:
    1. Uses Athena to analyze multiple symbols
    2. Has Athena identify the most promising opportunities
    3. Requests Apollo to generate trading signals for those opportunities
    4. Consolidates the results into a complete analysis
    """
    print(f"\n🔄 Starting multi-agent workflow for symbols: {', '.join(symbols)}")
    
    # Step 1: Analyze all symbols with Athena
    print("\nStep 1: Athena analyzing all symbols...")
    market_analyses = {}
    
    for symbol in symbols:
        response = await make_a2a_request(
            sender_name="workflow",
            recipient_card=athena_card,
            recipient_transport=athena_client_transport,
            function_name="analyze_market",
            parameters={"symbol": symbol, "interval": 60}
        )
        market_analyses[symbol] = response.content
        print(f"  ✓ Analyzed {symbol}")
    
    # Step 2: Rank opportunities by confidence
    print("\nStep 2: Ranking opportunities...")
    ranked_symbols = sorted(
        [(symbol, analysis.get("regime_confidence", 0)) for symbol, analysis in market_analyses.items()],
        key=lambda x: x[1],
        reverse=True
    )
    
    top_symbols = [symbol for symbol, _ in ranked_symbols[:2]]
    print(f"  ✓ Top opportunities: {', '.join(top_symbols)}")
    
    # Step 3: Generate signals for top opportunities
    print("\nStep 3: Apollo generating signals for top opportunities...")
    signal_results = {}
    
    for symbol in top_symbols:
        response = await make_a2a_request(
            sender_name="workflow",
            recipient_card=apollo_card,
            recipient_transport=apollo_client_transport,
            function_name="generate_signals",
            parameters={"symbol": symbol}
        )
        signal_results[symbol] = response.content
        print(f"  ✓ Generated signals for {symbol}")
    
    # Step 4: Consolidate results
    print("\nStep 4: Consolidating results...")
    consolidated_results = {
        "timestamp": datetime.now().isoformat(),
        "symbols_analyzed": len(symbols),
        "market_analyses": market_analyses,
        "top_opportunities": top_symbols,
        "trading_signals": signal_results
    }
    
    print(f"  ✓ Analysis complete for {len(symbols)} symbols")
    
    return consolidated_results

# Run the multi-agent workflow with a few symbols
symbols_to_analyze = ["BTC/USD", "ETH/USD", "XRP/USD"]
workflow_results = await multi_agent_market_analysis_workflow(symbols_to_analyze)

# Display a summary of the workflow results
print("\n📊 Workflow Summary:")
print(f"Symbols analyzed: {workflow_results['symbols_analyzed']}")
print(f"Top opportunities: {', '.join(workflow_results['top_opportunities'])}")

signal_count = sum(
    len(result.get("signals", [])) 
    for result in workflow_results["trading_signals"].values()
)
print(f"Trading signals generated: {signal_count}")

# Display the first signal for the first top opportunity (if available)
if workflow_results["top_opportunities"] and signal_count > 0:
    top_symbol = workflow_results["top_opportunities"][0]
    signals = workflow_results["trading_signals"].get(top_symbol, {}).get("signals", [])
    
    if signals:
        print(f"\nSample trading signal for {top_symbol}:")
        signal = signals[0]
        print(f"Direction: {signal.get('direction', 'unknown').upper()}")
        print(f"Confidence: {signal.get('confidence', 0):.1f}%")
        print(f"Pattern: {signal.get('pattern', 'unknown')}")
        print(f"Description: {signal.get('description', 'No description')}")

## 9. Clean Up

Finally, let's clean up by stopping the A2A servers and releasing resources.

In [None]:
async def cleanup():
    # Stop the servers
    print("Stopping A2A servers...")
    for task in server_tasks:
        task.cancel()
    
    try:
        # Wait for tasks to be cancelled
        await asyncio.gather(*server_tasks, return_exceptions=True)
    except asyncio.CancelledError:
        pass
    
    # Close agent resources
    await athena.close()
    print("✅ Resources cleaned up")

# Run cleanup
await cleanup()
print("A2A integration demo complete!")

## 10. Conclusion

In this notebook, we've demonstrated how to:

1. Set up Google's Agent-to-Agent (A2A) communication framework
2. Create agent cards for both Athena and Apollo agents
3. Implement function handlers that connect A2A functions to agent methods
4. Set up A2A servers with HTTP transport for both agents
5. Test communication between agents
6. Create a more complex multi-agent workflow

This implementation allows the Athena market intelligence agent to communicate with the Apollo signal generation agent through a standardized protocol, enabling more sophisticated and composable AI agent interactions.

For production use, consider:
- Using more robust authentication and security measures
- Implementing proper error handling and retry mechanisms
- Persisting agent state and dialogue history
- Setting up proper logging and monitoring