<a href="https://colab.research.google.com/github/m-adeleke1/Association_of_Data_Scientists/blob/main/Google_A2A_Project_Create_OpenAI_Agent.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Google A2A → OpenAI Agent
This notebook installs the A2A SDK, creates an OpenAI-powered agent, launches an A2A server in the background, and tests it via HTTP.


In [9]:
import sys
!{sys.executable} -m pip -q install a2a-sdk "openai>=1.59.0" langgraph langchain-openai uvicorn httpx nest_asyncio
print('Installed a2a-sdk, openai, langgraph, langchain-openai, uvicorn, httpx, nest_asyncio')


Installed a2a-sdk, openai, langgraph, langchain-openai, uvicorn, httpx, nest_asyncio


In [13]:
import os, nest_asyncio
nest_asyncio.apply()

# Set your API key here or rely on an already-set environment variable
# os.environ['OPENAI_API_KEY'] = 'sk-...'
# print('OPENAI_API_KEY present:', bool(os.getenv('OPENAI_API_KEY')))

from google.colab import userdata
import os

key = userdata.get("OPENAI_API_KEY")     # exact name match, case-sensitive
print("Secret found:", bool(key))

if key:
    os.environ["OPENAI_API_KEY"] = key.strip()  # strip just in case
    print("Env set:", bool(os.getenv("OPENAI_API_KEY")))
else:
    raise RuntimeError("OPENAI_API_KEY is not granted to this notebook in Secrets.")

Secret found: True
Env set: True


In [14]:
import os, textwrap, pathlib
base_dir = '/content/my_project'
os.makedirs(base_dir, exist_ok=True)
print('Project dir:', base_dir)

Project dir: /content/my_project


In [15]:
agent_py = '''\
from dataclasses import dataclass
import os
from openai import AsyncOpenAI

@dataclass
class OpenAIAgentConfig:
    model: str = "gpt-4o-mini"
    temperature: float = 0.2

def create_openai_agent(openai_model: str, temperature: float = 0.2) -> OpenAIAgentConfig:
    """Create a tiny config object describing how to call OpenAI."""
    return OpenAIAgentConfig(model=openai_model, temperature=temperature)

async def run_openai(agent: OpenAIAgentConfig, prompt: str) -> str:
    """Call OpenAI Chat API and return the assistant's text."""
    client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    resp = await client.chat.completions.create(
        model=agent.model,
        temperature=agent.temperature,
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": prompt},
        ],
    )
    return resp.choices[0].message.content or ""
'''
open(os.path.join(base_dir, 'agent.py'), 'w').write(agent_py)
print('Wrote', os.path.join(base_dir, 'agent.py'))

Wrote /content/my_project/agent.py


In [16]:
task_manager_py = '''\
from __future__ import annotations
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.utils import new_agent_text_message
from my_project.agent import create_openai_agent, run_openai

def _message_text(context: RequestContext) -> str:
    """
    Extract plain text from the incoming A2A message.
    Falls back gracefully if parts are non-text.
    """
    if not context.message or not getattr(context.message, "parts", None):
        return ""
    texts: list[str] = []
    for part in context.message.parts:
        text = getattr(part, "text", None)
        if text is None and hasattr(part, "root"):
            text = getattr(part.root, "text", None)
        if isinstance(text, str):
            texts.append(text)
    return "\\n".join(texts).strip()

class MyAgentTaskManager(AgentExecutor):
    """
    An AgentExecutor that routes requests to OpenAI.
    """
    def __init__(self, openai_model: str = "gpt-4o-mini", temperature: float = 0.2):
        self._cfg = create_openai_agent(openai_model, temperature)

    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        user_text = _message_text(context) or "Please respond helpfully."
        reply = await run_openai(self._cfg, user_text)
        await event_queue.enqueue_event(
            new_agent_text_message(reply, context_id=context.context_id)
        )

    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
        await event_queue.enqueue_event(
            new_agent_text_message("Cancellation is not supported for this agent.", context_id=context.context_id)
        )
'''
open(os.path.join(base_dir, 'task_manager.py'), 'w').write(task_manager_py)
print('Wrote', os.path.join(base_dir, 'task_manager.py'))

Wrote /content/my_project/task_manager.py


In [17]:
init_py = '''\
import argparse
import os
import uvicorn
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import AgentCapabilities, AgentCard, AgentSkill
from my_project.task_manager import MyAgentTaskManager

def main(host: str = "0.0.0.0", port: int = 9999, openai_model: str = "gpt-4o-mini"):
    skill = AgentSkill(
        id="general_chat",
        name="General Chat",
        description="Answers questions with helpful, concise responses.",
        tags=["chat", "assistant"],
        examples=["Explain transformers in 3 bullet points."],
    )
    agent_card = AgentCard(
        name="Google_A2A_Project_Create_OpenAI_Agent",
        description="An A2A-compliant agent backed by OpenAI.",
        url=f"http://{host}:{port}/",
        version="1.0.0",
        default_input_modes=["text"],
        default_output_modes=["text"],
        capabilities=AgentCapabilities(streaming=True),
        skills=[skill],
    )
    handler = DefaultRequestHandler(
        agent_executor=MyAgentTaskManager(openai_model=openai_model),
        task_store=InMemoryTaskStore(),
    )
    app = A2AStarletteApplication(agent_card=agent_card, http_handler=handler)
    uvicorn.run(app.build(), host=host, port=port)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--host", default=os.getenv("HOST", "127.0.0.1"))
    parser.add_argument("--port", type=int, default=int(os.getenv("PORT", "9999")))
    parser.add_argument("--openai-model", default=os.getenv("OPENAI_MODEL", "gpt-4o-mini"))
    args = parser.parse_args()
    main(args.host, args.port, args.openai_model)
'''
open(os.path.join(base_dir, '__init__.py'), 'w').write(init_py)
print('Wrote', os.path.join(base_dir, '__init__.py'))

Wrote /content/my_project/__init__.py


In [18]:
# Runs uvicorn in a background thread and waits until the port is open
import asyncio, socket, threading, time
import uvicorn

from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import AgentCapabilities, AgentCard, AgentSkill
from my_project.task_manager import MyAgentTaskManager

_server = None
_server_thread = None

def _is_port_free(host: str, port: int) -> bool:
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.settimeout(0.2)
        return s.connect_ex((host, port)) != 0

def _find_free_port(host: str = "127.0.0.1", preferred: int = 9999, max_tries: int = 50) -> int:
    if _is_port_free(host, preferred):
        return preferred
    for p in range(preferred + 1, preferred + 1 + max_tries):
        if _is_port_free(host, p):
            return p
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((host, 0))
        return s.getsockname()[1]

def start_a2a_server(host: str = "127.0.0.1", preferred_port: int = 9999, openai_model: str = "gpt-4o-mini"):
    global _server, _server_thread
    port = _find_free_port(host, preferred_port)

    skill = AgentSkill(
        id="general_chat",
        name="General Chat",
        description="Answers questions with helpful, concise responses.",
        tags=["chat", "assistant"],
        examples=["Explain transformers in 3 bullet points."],
    )
    agent_card = AgentCard(
        name="Google_A2A_Project_Create_OpenAI_Agent",
        description="An A2A-compliant agent backed by OpenAI.",
        url=f"http://{host}:{port}/",
        version="1.0.0",
        default_input_modes=["text"],
        default_output_modes=["text"],
        capabilities=AgentCapabilities(streaming=True),
        skills=[skill],
    )

    handler = DefaultRequestHandler(
        agent_executor=MyAgentTaskManager(openai_model=openai_model),
        task_store=InMemoryTaskStore(),
    )
    app = A2AStarletteApplication(agent_card=agent_card, http_handler=handler).build()
    config = uvicorn.Config(app, host=host, port=port, log_level="info")
    _server = uvicorn.Server(config)

    def _serve():
        asyncio.run(_server.serve())

    _server_thread = threading.Thread(target=_serve, daemon=True)
    _server_thread.start()

    # Wait for socket to open
    t0 = time.time()
    while time.time() - t0 < 6.0:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(0.2)
            if s.connect_ex((host, port)) == 0:
                print(f"A2A server is up at http://{host}:{port}")
                return f"http://{host}:{port}"
        time.sleep(0.1)
    raise RuntimeError("Server did not bind in time")

def stop_a2a_server():
    global _server
    if _server is not None:
        _server.should_exit = True
        print("Requested server shutdown.")

In [19]:
BASE = start_a2a_server(host="127.0.0.1", preferred_port=9999, openai_model="gpt-4o-mini")
BASE

INFO:     Started server process [447]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:9999 (Press CTRL+C to quit)


A2A server is up at http://127.0.0.1:9999


'http://127.0.0.1:9999'

In [20]:
import httpx, json
r = httpx.get(f"{BASE}/.well-known/agent.json", timeout=10)
print(r.status_code)
print(json.dumps(r.json(), indent=2)[:800])



INFO:     127.0.0.1:53496 - "GET /.well-known/agent.json HTTP/1.1" 200 OK
200
{
  "capabilities": {
    "streaming": true
  },
  "defaultInputModes": [
    "text"
  ],
  "defaultOutputModes": [
    "text"
  ],
  "description": "An A2A-compliant agent backed by OpenAI.",
  "name": "Google_A2A_Project_Create_OpenAI_Agent",
  "preferredTransport": "JSONRPC",
  "protocolVersion": "0.3.0",
  "skills": [
    {
      "description": "Answers questions with helpful, concise responses.",
      "examples": [
        "Explain transformers in 3 bullet points."
      ],
      "id": "general_chat",
      "name": "General Chat",
      "tags": [
        "chat",
        "assistant"
      ]
    }
  ],
  "url": "http://127.0.0.1:9999/",
  "version": "1.0.0"
}


In [23]:
import httpx, uuid, json

BASE = BASE  # whatever you printed from start_a2a_server()

task_id = str(uuid.uuid4())
message_id = str(uuid.uuid4())  # REQUIRED in this SDK version

payload = {
  "jsonrpc": "2.0",
  "id": "1",
  "method": "message/send",      # modern method name
  "params": {
    "id": task_id,
    "contextId": "ctx-001",
    "message": {
      "messageId": message_id,   # <-- add this
      "role": "user",
      "parts": [
        {"type": "text", "text": "Say hello in 8 words."}
      ]
    },
    "metadata": {}
  }
}

r = httpx.post(f"{BASE}/", json=payload, timeout=60)
print(r.status_code, r.headers.get("content-type"))
print(json.dumps(r.json(), indent=2))

INFO:     127.0.0.1:33042 - "POST / HTTP/1.1" 200 OK
200 application/json
{
  "id": "1",
  "jsonrpc": "2.0",
  "result": {
    "contextId": "ee239b70-9ccd-48ee-ad68-0a24e1e1f22c",
    "kind": "message",
    "messageId": "f6bb1dc1-53e0-411b-b438-8816688e449e",
    "parts": [
      {
        "kind": "text",
        "text": "Hello! How are you doing today, my friend?"
      }
    ],
    "role": "agent"
  }
}


In [24]:
stop_a2a_server()
print('Stopped.')

Requested server shutdown.
Stopped.
