diff --git a/src/agents/run.py b/src/agents/run.py index d22947c77..5b25df4f2 100644 --- a/src/agents/run.py +++ b/src/agents/run.py @@ -1,8 +1,10 @@ from __future__ import annotations import asyncio +import contextlib import inspect import os +import warnings from dataclasses import dataclass, field from typing import Any, Callable, Generic, cast, get_args @@ -720,19 +722,40 @@ def run_sync( conversation_id = kwargs.get("conversation_id") session = kwargs.get("session") - # Python 3.14 no longer creates a default loop implicitly, so we inspect the running loop. + # Python 3.14 stopped implicitly wiring up a default event loop + # when synchronous code touches asyncio APIs for the first time. + # Several of our synchronous entry points (for example the Redis/SQLAlchemy session helpers) + # construct asyncio primitives like asyncio.Lock during __init__, + # which binds them to whatever loop happens to be the thread's default at that moment. + # To keep those locks usable we must ensure that run_sync reuses that same default loop + # instead of hopping over to a brand-new asyncio.run() loop. try: - loop = asyncio.get_running_loop() + already_running_loop = asyncio.get_running_loop() except RuntimeError: - loop = None + already_running_loop = None - if loop is not None: + if already_running_loop is not None: # This method is only expected to run when no loop is already active. + # (Each thread has its own default loop; concurrent sync runs should happen on + # different threads. In a single thread use the async API to interleave work.) raise RuntimeError( "AgentRunner.run_sync() cannot be called when an event loop is already running." ) - return asyncio.run( + policy = asyncio.get_event_loop_policy() + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + try: + default_loop = policy.get_event_loop() + except RuntimeError: + default_loop = policy.new_event_loop() + policy.set_event_loop(default_loop) + + # We intentionally leave the default loop open even if we had to create one above. Session + # instances and other helpers stash loop-bound primitives between calls and expect to find + # the same default loop every time run_sync is invoked on this thread. + # Schedule the async run on the default loop so that we can manage cancellation explicitly. + task = default_loop.create_task( self.run( starting_agent, input, @@ -746,6 +769,24 @@ def run_sync( ) ) + try: + # Drive the coroutine to completion, harvesting the final RunResult. + return default_loop.run_until_complete(task) + except BaseException: + # If the sync caller aborts (KeyboardInterrupt, etc.), make sure the scheduled task + # does not linger on the shared loop by cancelling it and waiting for completion. + if not task.done(): + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + default_loop.run_until_complete(task) + raise + finally: + if not default_loop.is_closed(): + # The loop stays open for subsequent runs, but we still need to flush any pending + # async generators so their cleanup code executes promptly. + with contextlib.suppress(RuntimeError): + default_loop.run_until_complete(default_loop.shutdown_asyncgens()) + def run_streamed( self, starting_agent: Agent[TContext], diff --git a/tests/test_agent_runner_sync.py b/tests/test_agent_runner_sync.py new file mode 100644 index 000000000..a570eea28 --- /dev/null +++ b/tests/test_agent_runner_sync.py @@ -0,0 +1,150 @@ +import asyncio +from collections.abc import Generator +from typing import Any + +import pytest + +from agents.agent import Agent +from agents.run import AgentRunner + + +@pytest.fixture +def fresh_event_loop_policy() -> Generator[asyncio.AbstractEventLoopPolicy, None, None]: + policy_before = asyncio.get_event_loop_policy() + new_policy = asyncio.DefaultEventLoopPolicy() + asyncio.set_event_loop_policy(new_policy) + try: + yield new_policy + finally: + asyncio.set_event_loop_policy(policy_before) + + +def test_run_sync_reuses_existing_default_loop(monkeypatch, fresh_event_loop_policy): + runner = AgentRunner() + observed_loops: list[asyncio.AbstractEventLoop] = [] + + async def fake_run(self, *_args, **_kwargs): + observed_loops.append(asyncio.get_running_loop()) + return object() + + monkeypatch.setattr(AgentRunner, "run", fake_run, raising=False) + + test_loop = asyncio.new_event_loop() + fresh_event_loop_policy.set_event_loop(test_loop) + + try: + runner.run_sync(Agent(name="test-agent"), "input") + assert observed_loops and observed_loops[0] is test_loop + finally: + fresh_event_loop_policy.set_event_loop(None) + test_loop.close() + + +def test_run_sync_creates_default_loop_when_missing(monkeypatch, fresh_event_loop_policy): + runner = AgentRunner() + observed_loops: list[asyncio.AbstractEventLoop] = [] + + async def fake_run(self, *_args, **_kwargs): + observed_loops.append(asyncio.get_running_loop()) + return object() + + monkeypatch.setattr(AgentRunner, "run", fake_run, raising=False) + + fresh_event_loop_policy.set_event_loop(None) + + runner.run_sync(Agent(name="test-agent"), "input") + created_loop = observed_loops[0] + assert created_loop is fresh_event_loop_policy.get_event_loop() + + fresh_event_loop_policy.set_event_loop(None) + created_loop.close() + + +def test_run_sync_errors_when_loop_already_running(monkeypatch, fresh_event_loop_policy): + runner = AgentRunner() + + async def fake_run(self, *_args, **_kwargs): + return object() + + monkeypatch.setattr(AgentRunner, "run", fake_run, raising=False) + + async def invoke(): + with pytest.raises(RuntimeError): + runner.run_sync(Agent(name="test-agent"), "input") + + asyncio.run(invoke()) + + +def test_run_sync_cancels_task_when_interrupted(monkeypatch, fresh_event_loop_policy): + runner = AgentRunner() + + async def fake_run(self, *_args, **_kwargs): + await asyncio.sleep(3600) + + monkeypatch.setattr(AgentRunner, "run", fake_run, raising=False) + + test_loop = asyncio.new_event_loop() + fresh_event_loop_policy.set_event_loop(test_loop) + + created_tasks: list[asyncio.Task[Any]] = [] + original_create_task = test_loop.create_task + + def capturing_create_task(coro): + task = original_create_task(coro) + created_tasks.append(task) + return task + + original_run_until_complete = test_loop.run_until_complete + call_count = {"value": 0} + + def interrupt_once(future): + call_count["value"] += 1 + if call_count["value"] == 1: + raise KeyboardInterrupt() + return original_run_until_complete(future) + + monkeypatch.setattr(test_loop, "create_task", capturing_create_task) + monkeypatch.setattr(test_loop, "run_until_complete", interrupt_once) + + try: + with pytest.raises(KeyboardInterrupt): + runner.run_sync(Agent(name="test-agent"), "input") + + assert created_tasks, "Expected run_sync to schedule a task." + assert created_tasks[0].done() + assert created_tasks[0].cancelled() + assert call_count["value"] >= 2 + finally: + monkeypatch.undo() + fresh_event_loop_policy.set_event_loop(None) + test_loop.close() + + +def test_run_sync_finalizes_async_generators(monkeypatch, fresh_event_loop_policy): + runner = AgentRunner() + cleanup_markers: list[str] = [] + + async def fake_run(self, *_args, **_kwargs): + async def agen(): + try: + yield None + finally: + cleanup_markers.append("done") + + gen = agen() + await gen.__anext__() + return "ok" + + monkeypatch.setattr(AgentRunner, "run", fake_run, raising=False) + + test_loop = asyncio.new_event_loop() + fresh_event_loop_policy.set_event_loop(test_loop) + + try: + runner.run_sync(Agent(name="test-agent"), "input") + assert cleanup_markers == ["done"], ( + "Async generators must be finalized after run_sync returns." + ) + finally: + fresh_event_loop_policy.set_event_loop(None) + test_loop.close()