Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 46 additions & 5 deletions src/agents/run.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from __future__ import annotations

import asyncio
import contextlib
import inspect
import os
import warnings
from dataclasses import dataclass, field
from typing import Any, Callable, Generic, cast, get_args

Expand Down Expand Up @@ -720,19 +722,40 @@ def run_sync(
conversation_id = kwargs.get("conversation_id")
session = kwargs.get("session")

# Python 3.14 no longer creates a default loop implicitly, so we inspect the running loop.
# Python 3.14 stopped implicitly wiring up a default event loop
# when synchronous code touches asyncio APIs for the first time.
# Several of our synchronous entry points (for example the Redis/SQLAlchemy session helpers)
# construct asyncio primitives like asyncio.Lock during __init__,
# which binds them to whatever loop happens to be the thread's default at that moment.
# To keep those locks usable we must ensure that run_sync reuses that same default loop
# instead of hopping over to a brand-new asyncio.run() loop.
try:
loop = asyncio.get_running_loop()
already_running_loop = asyncio.get_running_loop()
except RuntimeError:
loop = None
already_running_loop = None

if loop is not None:
if already_running_loop is not None:
# This method is only expected to run when no loop is already active.
# (Each thread has its own default loop; concurrent sync runs should happen on
# different threads. In a single thread use the async API to interleave work.)
raise RuntimeError(
"AgentRunner.run_sync() cannot be called when an event loop is already running."
)

return asyncio.run(
policy = asyncio.get_event_loop_policy()
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
try:
default_loop = policy.get_event_loop()
except RuntimeError:
default_loop = policy.new_event_loop()
policy.set_event_loop(default_loop)

# We intentionally leave the default loop open even if we had to create one above. Session
# instances and other helpers stash loop-bound primitives between calls and expect to find
# the same default loop every time run_sync is invoked on this thread.
# Schedule the async run on the default loop so that we can manage cancellation explicitly.
task = default_loop.create_task(
self.run(
starting_agent,
input,
Expand All @@ -746,6 +769,24 @@ def run_sync(
)
)

try:
# Drive the coroutine to completion, harvesting the final RunResult.
return default_loop.run_until_complete(task)
except BaseException:
# If the sync caller aborts (KeyboardInterrupt, etc.), make sure the scheduled task
# does not linger on the shared loop by cancelling it and waiting for completion.
if not task.done():
task.cancel()
with contextlib.suppress(asyncio.CancelledError):
default_loop.run_until_complete(task)
raise
finally:
if not default_loop.is_closed():
# The loop stays open for subsequent runs, but we still need to flush any pending
# async generators so their cleanup code executes promptly.
with contextlib.suppress(RuntimeError):
default_loop.run_until_complete(default_loop.shutdown_asyncgens())

def run_streamed(
self,
starting_agent: Agent[TContext],
Expand Down
150 changes: 150 additions & 0 deletions tests/test_agent_runner_sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import asyncio
from collections.abc import Generator
from typing import Any

import pytest

from agents.agent import Agent
from agents.run import AgentRunner


@pytest.fixture
def fresh_event_loop_policy() -> Generator[asyncio.AbstractEventLoopPolicy, None, None]:
policy_before = asyncio.get_event_loop_policy()
new_policy = asyncio.DefaultEventLoopPolicy()
asyncio.set_event_loop_policy(new_policy)
try:
yield new_policy
finally:
asyncio.set_event_loop_policy(policy_before)


def test_run_sync_reuses_existing_default_loop(monkeypatch, fresh_event_loop_policy):
runner = AgentRunner()
observed_loops: list[asyncio.AbstractEventLoop] = []

async def fake_run(self, *_args, **_kwargs):
observed_loops.append(asyncio.get_running_loop())
return object()

monkeypatch.setattr(AgentRunner, "run", fake_run, raising=False)

test_loop = asyncio.new_event_loop()
fresh_event_loop_policy.set_event_loop(test_loop)

try:
runner.run_sync(Agent(name="test-agent"), "input")
assert observed_loops and observed_loops[0] is test_loop
finally:
fresh_event_loop_policy.set_event_loop(None)
test_loop.close()


def test_run_sync_creates_default_loop_when_missing(monkeypatch, fresh_event_loop_policy):
runner = AgentRunner()
observed_loops: list[asyncio.AbstractEventLoop] = []

async def fake_run(self, *_args, **_kwargs):
observed_loops.append(asyncio.get_running_loop())
return object()

monkeypatch.setattr(AgentRunner, "run", fake_run, raising=False)

fresh_event_loop_policy.set_event_loop(None)

runner.run_sync(Agent(name="test-agent"), "input")
created_loop = observed_loops[0]
assert created_loop is fresh_event_loop_policy.get_event_loop()

fresh_event_loop_policy.set_event_loop(None)
created_loop.close()


def test_run_sync_errors_when_loop_already_running(monkeypatch, fresh_event_loop_policy):
runner = AgentRunner()

async def fake_run(self, *_args, **_kwargs):
return object()

monkeypatch.setattr(AgentRunner, "run", fake_run, raising=False)

async def invoke():
with pytest.raises(RuntimeError):
runner.run_sync(Agent(name="test-agent"), "input")

asyncio.run(invoke())


def test_run_sync_cancels_task_when_interrupted(monkeypatch, fresh_event_loop_policy):
runner = AgentRunner()

async def fake_run(self, *_args, **_kwargs):
await asyncio.sleep(3600)

monkeypatch.setattr(AgentRunner, "run", fake_run, raising=False)

test_loop = asyncio.new_event_loop()
fresh_event_loop_policy.set_event_loop(test_loop)

created_tasks: list[asyncio.Task[Any]] = []
original_create_task = test_loop.create_task

def capturing_create_task(coro):
task = original_create_task(coro)
created_tasks.append(task)
return task

original_run_until_complete = test_loop.run_until_complete
call_count = {"value": 0}

def interrupt_once(future):
call_count["value"] += 1
if call_count["value"] == 1:
raise KeyboardInterrupt()
return original_run_until_complete(future)

monkeypatch.setattr(test_loop, "create_task", capturing_create_task)
monkeypatch.setattr(test_loop, "run_until_complete", interrupt_once)

try:
with pytest.raises(KeyboardInterrupt):
runner.run_sync(Agent(name="test-agent"), "input")

assert created_tasks, "Expected run_sync to schedule a task."
assert created_tasks[0].done()
assert created_tasks[0].cancelled()
assert call_count["value"] >= 2
finally:
monkeypatch.undo()
fresh_event_loop_policy.set_event_loop(None)
test_loop.close()


def test_run_sync_finalizes_async_generators(monkeypatch, fresh_event_loop_policy):
runner = AgentRunner()
cleanup_markers: list[str] = []

async def fake_run(self, *_args, **_kwargs):
async def agen():
try:
yield None
finally:
cleanup_markers.append("done")

gen = agen()
await gen.__anext__()
return "ok"

monkeypatch.setattr(AgentRunner, "run", fake_run, raising=False)

test_loop = asyncio.new_event_loop()
fresh_event_loop_policy.set_event_loop(test_loop)

try:
runner.run_sync(Agent(name="test-agent"), "input")
assert cleanup_markers == ["done"], (
"Async generators must be finalized after run_sync returns."
)
finally:
fresh_event_loop_policy.set_event_loop(None)
test_loop.close()