# A2A Client example

### Initialize

In [1]:
from uuid import uuid4

from a2a_min.client.card_resolver import A2ACardResolver
from a2a_min.client.client import A2AClient

In [2]:
A2A_SERVER_URL = "http://localhost:10000"

### Get A2A server's card

In [3]:
card_resolver = A2ACardResolver(A2A_SERVER_URL)
card = card_resolver.get_agent_card()

print(card.model_dump_json(exclude_none=True, indent=2))

{
  "name": "Dummy Agent",
  "description": "Pretneds to be a usefull agent...",
  "url": "http://localhost:10000/",
  "version": "1.0.0",
  "capabilities": {
    "streaming": true,
    "pushNotifications": true,
    "stateTransitionHistory": false
  },
  "defaultInputModes": [
    "text"
  ],
  "defaultOutputModes": [
    "text"
  ],
  "skills": [
    {
      "id": "dummy_agent",
      "name": "Dummy Agent",
      "description": "Pretneds to be a usefull agent...",
      "tags": [
        "dummy",
        "fake"
      ],
      "examples": [
        "Are you a real agent?",
        "What is the meaning of life?"
      ]
    }
  ]
}


### Create an A2A client

In [4]:
client = A2AClient(agent_card=card)

In [5]:
def to_json(data):
    if hasattr(data, "model_dump_json"):
        return data.model_dump_json(indent=2, exclude_none=True)
    return data.json(indent=2, exclude_none=True)

In [8]:
from a2a_min.types import Message, Part, TaskSendParams, TaskState, TextPart


async def completeTask(a2a_client: A2AClient, prompt: str, taskId, sessionId):
    """ Run a task with the given prompt """
    if prompt == ":q" or prompt == "quit":
        return False

    message = Message(role="user", parts=[TextPart(text=prompt)])
    tasK_send_params = TaskSendParams(id=taskId, sessionId=sessionId, acceptedOutputModes=["text"], message=message)

    taskResult = await a2a_client.send_task(tasK_send_params)
    print(f"\n{to_json(taskResult)}")

    ## if the result is that more input is required, loop again.
    state = TaskState(taskResult.result.status.state)
    if state.name == TaskState.INPUT_REQUIRED.name:
        return await completeTask(a2a_client, prompt, taskId, sessionId)
    else:
        return False    # We are done with this task.


In [9]:
prompt = "What do you want to send to the agent? (:q or quit to exit)"

continue_loop = True
sessionId = uuid4().hex
while continue_loop:
    taskId = uuid4().hex
    print("=========  starting a new task ======== ")
    continue_loop = await completeTask(client, prompt, taskId, sessionId)


{
  "jsonrpc": "2.0",
  "id": "a1109f5ba6f147ee83b9cc1946549894",
  "result": {
    "id": "20196ee5ecc0483c8a08a2df36a8efe6",
    "sessionId": "3ad9a6b746f94fd1acb55f2be9e6ed91",
    "status": {
      "state": "completed",
      "timestamp": "2025-04-14T14:27:10.979711"
    },
    "artifacts": [
      {
        "parts": [
          {
            "type": "text",
            "text": "We are unable to process your request at the moment. Please try again."
          }
        ],
        "index": 0
      }
    ],
    "history": []
  }
}
