-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Open
Labels
Description
Initial Checks
- I confirm that I'm using the latest version of Pydantic AI
- I confirm that I searched for my issue in https://github.com/pydantic/pydantic-ai/issues before opening this issue
Description
I raised a similar issue in FastA2A here: pydantic/fasta2a#37
But the pattern is the same.
When you use an agent.to_a2a() method, the lifespan overrides the default lifespan entirely. This means that key setup steps are missed for the agent to run, these are:
@asynccontextmanager
async def worker_lifespan(
app: FastA2A, worker: Worker, agent: AbstractAgent[AgentDepsT, OutputDataT]
) -> AsyncIterator[None]:
"""Custom lifespan that runs the worker during application startup.
This ensures the worker is started and ready to process tasks as soon as the application starts.
"""
async with app.task_manager, agent:
async with worker.run():
yieldWhich is needed to start the worker, task manager and agent. The guilty code is here https://github.com/pydantic/pydantic-ai/blob/9c3152144f2d369b13cb56e04d1aaed4f43f3721/pydantic_ai_slim/pydantic_ai/_a2a.py#L98C5-L98C80
lifespan = lifespan or partial(worker_lifespan, worker=worker, agent=agent)This can be worked around by adding to whatever custom lifespan you use:
from pydantic_ai._a2a import AgentWorker
storage = InMemoryStorage()
broker = InMemoryBroker()
worker = AgentWorker(agent=agent, broker=broker, storage=storage)
@asynccontextmanager
async def lifespan(app: FastA2A) -> AsyncIterator[None]:
# custom setup
...
async with app.task_manager, agent:
async with worker.run():
yield
...
agent.to_a2a(
...,
lifespan=lifespan
)but this involves accessing a protected member
This could be fixed by just combining any provided lifespan with the builtin default:
from contextlib import asynccontextmanager
from typing import AsyncIterator, Callable, Optional
LifespanFn = Callable[["FastA2A"], AsyncIterator[None]]
@asynccontextmanager
async def worker_lifespan(
app: FastA2A, worker: Worker, agent: AbstractAgent[AgentDepsT, OutputDataT]
) -> AsyncIterator[None]:
"""Custom lifespan that runs the worker during application startup.
This ensures the worker is started and ready to process tasks as soon as the application starts.
"""
async with app.task_manager, agent:
async with worker.run():
yield
def compose_lifespans(outer: LifespanFn, inner: Optional[LifespanFn]) -> LifespanFn:
if inner is None:
return outer
@asynccontextmanager
async def _composed(app: "FastA2A") -> AsyncIterator[None]:
# This nests them: outer around inner
async with outer(app), inner(app):
yield
return _composed_a2a.py
...
lifespan = _composed(worker_lifespan, provided_lifespan)
...Example Code
from contextlib import asynccontextmanager
from pydantic_ai import Agent
agent = Agent()
async def lifespan(app: FastA2A) -> AsyncIterator[None]:
yield
agent.to_a2a(lifespan=lifespan)
# worker is never enteredPython, Pydantic AI & LLM client version
python3.12
pydantic-ai-slim==1.9.0
T-J-Forster, iangoldby and alexander-petch