From e0e57a2604a6a54c1c473d34dbc2d4a3a0025905 Mon Sep 17 00:00:00 2001 From: Kazuhiro Sera Date: Wed, 29 Oct 2025 11:30:20 +0900 Subject: [PATCH 1/6] fix: improve run_sync's python 3.14 compatibility --- src/agents/run.py | 29 ++++++++++--- tests/test_agent_runner_sync.py | 73 +++++++++++++++++++++++++++++++++ 2 files changed, 97 insertions(+), 5 deletions(-) create mode 100644 tests/test_agent_runner_sync.py diff --git a/src/agents/run.py b/src/agents/run.py index d22947c77..54a59c50d 100644 --- a/src/agents/run.py +++ b/src/agents/run.py @@ -3,6 +3,7 @@ import asyncio import inspect import os +import warnings from dataclasses import dataclass, field from typing import Any, Callable, Generic, cast, get_args @@ -720,19 +721,37 @@ def run_sync( conversation_id = kwargs.get("conversation_id") session = kwargs.get("session") - # Python 3.14 no longer creates a default loop implicitly, so we inspect the running loop. + # Python 3.14 stopped implicitly wiring up a default event loop + # when synchronous code touches asyncio APIs for the first time. + # Several of our synchronous entry points (for example the Redis/SQLAlchemy session helpers) + # construct asyncio primitives like asyncio.Lock during __init__, + # which binds them to whatever loop happens to be the thread's default at that moment. + # To keep those locks usable we must ensure that run_sync reuses that same default loop + # instead of hopping over to a brand-new asyncio.run() loop. try: - loop = asyncio.get_running_loop() + already_running_loop = asyncio.get_running_loop() except RuntimeError: - loop = None + already_running_loop = None - if loop is not None: + if already_running_loop is not None: # This method is only expected to run when no loop is already active. raise RuntimeError( "AgentRunner.run_sync() cannot be called when an event loop is already running." ) - return asyncio.run( + policy = asyncio.get_event_loop_policy() + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + try: + default_loop = policy.get_event_loop() + except RuntimeError: + default_loop = policy.new_event_loop() + policy.set_event_loop(default_loop) + + # We intentionally leave the default loop open even if we had to create one above. Session + # instances and other helpers stash loop-bound primitives between calls and expect to find + # the same default loop every time run_sync is invoked on this thread. + return default_loop.run_until_complete( self.run( starting_agent, input, diff --git a/tests/test_agent_runner_sync.py b/tests/test_agent_runner_sync.py new file mode 100644 index 000000000..474a72068 --- /dev/null +++ b/tests/test_agent_runner_sync.py @@ -0,0 +1,73 @@ +import asyncio +from collections.abc import Generator + +import pytest + +from agents.run import AgentRunner + + +@pytest.fixture +def fresh_event_loop_policy() -> Generator[asyncio.AbstractEventLoopPolicy, None, None]: + policy_before = asyncio.get_event_loop_policy() + new_policy = asyncio.DefaultEventLoopPolicy() + asyncio.set_event_loop_policy(new_policy) + try: + yield new_policy + finally: + asyncio.set_event_loop_policy(policy_before) + + +def test_run_sync_reuses_existing_default_loop(monkeypatch, fresh_event_loop_policy): + runner = AgentRunner() + observed_loops: list[asyncio.AbstractEventLoop] = [] + + async def fake_run(self, *_args, **_kwargs): + observed_loops.append(asyncio.get_running_loop()) + return object() + + monkeypatch.setattr(AgentRunner, "run", fake_run, raising=False) + + test_loop = asyncio.new_event_loop() + fresh_event_loop_policy.set_event_loop(test_loop) + + try: + runner.run_sync(object(), "input") + assert observed_loops and observed_loops[0] is test_loop + finally: + fresh_event_loop_policy.set_event_loop(None) + test_loop.close() + + +def test_run_sync_creates_default_loop_when_missing(monkeypatch, fresh_event_loop_policy): + runner = AgentRunner() + observed_loops: list[asyncio.AbstractEventLoop] = [] + + async def fake_run(self, *_args, **_kwargs): + observed_loops.append(asyncio.get_running_loop()) + return object() + + monkeypatch.setattr(AgentRunner, "run", fake_run, raising=False) + + fresh_event_loop_policy.set_event_loop(None) + + runner.run_sync(object(), "input") + created_loop = observed_loops[0] + assert created_loop is fresh_event_loop_policy.get_event_loop() + + fresh_event_loop_policy.set_event_loop(None) + created_loop.close() + + +def test_run_sync_errors_when_loop_already_running(monkeypatch, fresh_event_loop_policy): + runner = AgentRunner() + + async def fake_run(self, *_args, **_kwargs): + return object() + + monkeypatch.setattr(AgentRunner, "run", fake_run, raising=False) + + async def invoke(): + with pytest.raises(RuntimeError): + runner.run_sync(object(), "input") + + asyncio.run(invoke()) From baed3fe8c820c842241eae9b5b4645b90e35fb90 Mon Sep 17 00:00:00 2001 From: Kazuhiro Sera Date: Wed, 29 Oct 2025 11:35:59 +0900 Subject: [PATCH 2/6] fix --- tests/test_agent_runner_sync.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/test_agent_runner_sync.py b/tests/test_agent_runner_sync.py index 474a72068..e03e0786d 100644 --- a/tests/test_agent_runner_sync.py +++ b/tests/test_agent_runner_sync.py @@ -3,6 +3,7 @@ import pytest +from agents.agent import Agent from agents.run import AgentRunner @@ -31,7 +32,7 @@ async def fake_run(self, *_args, **_kwargs): fresh_event_loop_policy.set_event_loop(test_loop) try: - runner.run_sync(object(), "input") + runner.run_sync(Agent(name="test-agent"), "input") assert observed_loops and observed_loops[0] is test_loop finally: fresh_event_loop_policy.set_event_loop(None) @@ -50,7 +51,7 @@ async def fake_run(self, *_args, **_kwargs): fresh_event_loop_policy.set_event_loop(None) - runner.run_sync(object(), "input") + runner.run_sync(Agent(name="test-agent"), "input") created_loop = observed_loops[0] assert created_loop is fresh_event_loop_policy.get_event_loop() @@ -68,6 +69,6 @@ async def fake_run(self, *_args, **_kwargs): async def invoke(): with pytest.raises(RuntimeError): - runner.run_sync(object(), "input") + runner.run_sync(Agent(name="test-agent"), "input") asyncio.run(invoke()) From 3537b62ecec4f786bb0d037e84e2ff578437e251 Mon Sep 17 00:00:00 2001 From: Kazuhiro Sera Date: Wed, 29 Oct 2025 11:42:59 +0900 Subject: [PATCH 3/6] fix review comment --- src/agents/run.py | 12 ++++++++- tests/test_agent_runner_sync.py | 46 +++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/src/agents/run.py b/src/agents/run.py index 54a59c50d..03b69ded3 100644 --- a/src/agents/run.py +++ b/src/agents/run.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import contextlib import inspect import os import warnings @@ -751,7 +752,7 @@ def run_sync( # We intentionally leave the default loop open even if we had to create one above. Session # instances and other helpers stash loop-bound primitives between calls and expect to find # the same default loop every time run_sync is invoked on this thread. - return default_loop.run_until_complete( + task = default_loop.create_task( self.run( starting_agent, input, @@ -765,6 +766,15 @@ def run_sync( ) ) + try: + return default_loop.run_until_complete(task) + except BaseException: + if not task.done(): + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + default_loop.run_until_complete(task) + raise + def run_streamed( self, starting_agent: Agent[TContext], diff --git a/tests/test_agent_runner_sync.py b/tests/test_agent_runner_sync.py index e03e0786d..1035fd718 100644 --- a/tests/test_agent_runner_sync.py +++ b/tests/test_agent_runner_sync.py @@ -1,5 +1,6 @@ import asyncio from collections.abc import Generator +from typing import Any import pytest @@ -72,3 +73,48 @@ async def invoke(): runner.run_sync(Agent(name="test-agent"), "input") asyncio.run(invoke()) + + +def test_run_sync_cancels_task_when_interrupted(monkeypatch, fresh_event_loop_policy): + runner = AgentRunner() + + async def fake_run(self, *_args, **_kwargs): + await asyncio.sleep(3600) + + monkeypatch.setattr(AgentRunner, "run", fake_run, raising=False) + + test_loop = asyncio.new_event_loop() + fresh_event_loop_policy.set_event_loop(test_loop) + + created_tasks: list[asyncio.Task[Any]] = [] + original_create_task = test_loop.create_task + + def capturing_create_task(coro): + task = original_create_task(coro) + created_tasks.append(task) + return task + + original_run_until_complete = test_loop.run_until_complete + call_count = {"value": 0} + + def interrupt_once(future): + call_count["value"] += 1 + if call_count["value"] == 1: + raise KeyboardInterrupt() + return original_run_until_complete(future) + + monkeypatch.setattr(test_loop, "create_task", capturing_create_task) + monkeypatch.setattr(test_loop, "run_until_complete", interrupt_once) + + try: + with pytest.raises(KeyboardInterrupt): + runner.run_sync(Agent(name="test-agent"), "input") + + assert created_tasks, "Expected run_sync to schedule a task." + assert created_tasks[0].done() + assert created_tasks[0].cancelled() + assert call_count["value"] >= 2 + finally: + monkeypatch.undo() + fresh_event_loop_policy.set_event_loop(None) + test_loop.close() From cff74954a82c5acad227bd54873056921895cdb4 Mon Sep 17 00:00:00 2001 From: Kazuhiro Sera Date: Wed, 29 Oct 2025 12:02:10 +0900 Subject: [PATCH 4/6] fix review comment and add more comments --- src/agents/run.py | 12 ++++++++++++ tests/test_agent_runner_sync.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/src/agents/run.py b/src/agents/run.py index 03b69ded3..5b25df4f2 100644 --- a/src/agents/run.py +++ b/src/agents/run.py @@ -736,6 +736,8 @@ def run_sync( if already_running_loop is not None: # This method is only expected to run when no loop is already active. + # (Each thread has its own default loop; concurrent sync runs should happen on + # different threads. In a single thread use the async API to interleave work.) raise RuntimeError( "AgentRunner.run_sync() cannot be called when an event loop is already running." ) @@ -752,6 +754,7 @@ def run_sync( # We intentionally leave the default loop open even if we had to create one above. Session # instances and other helpers stash loop-bound primitives between calls and expect to find # the same default loop every time run_sync is invoked on this thread. + # Schedule the async run on the default loop so that we can manage cancellation explicitly. task = default_loop.create_task( self.run( starting_agent, @@ -767,13 +770,22 @@ def run_sync( ) try: + # Drive the coroutine to completion, harvesting the final RunResult. return default_loop.run_until_complete(task) except BaseException: + # If the sync caller aborts (KeyboardInterrupt, etc.), make sure the scheduled task + # does not linger on the shared loop by cancelling it and waiting for completion. if not task.done(): task.cancel() with contextlib.suppress(asyncio.CancelledError): default_loop.run_until_complete(task) raise + finally: + if not default_loop.is_closed(): + # The loop stays open for subsequent runs, but we still need to flush any pending + # async generators so their cleanup code executes promptly. + with contextlib.suppress(RuntimeError): + default_loop.run_until_complete(default_loop.shutdown_asyncgens()) def run_streamed( self, diff --git a/tests/test_agent_runner_sync.py b/tests/test_agent_runner_sync.py index 1035fd718..f1b134428 100644 --- a/tests/test_agent_runner_sync.py +++ b/tests/test_agent_runner_sync.py @@ -118,3 +118,31 @@ def interrupt_once(future): monkeypatch.undo() fresh_event_loop_policy.set_event_loop(None) test_loop.close() + + +def test_run_sync_finalizes_async_generators(monkeypatch, fresh_event_loop_policy): + runner = AgentRunner() + cleanup_markers: list[str] = [] + + async def fake_run(self, *_args, **_kwargs): + async def agen(): + try: + yield None + finally: + cleanup_markers.append("done") + + gen = agen() + await gen.__anext__() + return "ok" + + monkeypatch.setattr(AgentRunner, "run", fake_run, raising=False) + + test_loop = asyncio.new_event_loop() + fresh_event_loop_policy.set_event_loop(test_loop) + + try: + runner.run_sync(Agent(name="test-agent"), "input") + assert cleanup_markers == ["done"], "Async generators must be finalized after run_sync returns." + finally: + fresh_event_loop_policy.set_event_loop(None) + test_loop.close() From 6406a2f0af48213ce74c942c83164454f86363ba Mon Sep 17 00:00:00 2001 From: Kazuhiro Sera Date: Wed, 29 Oct 2025 12:04:37 +0900 Subject: [PATCH 5/6] fix lint error --- tests/test_agent_runner_sync.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/test_agent_runner_sync.py b/tests/test_agent_runner_sync.py index f1b134428..29c20c436 100644 --- a/tests/test_agent_runner_sync.py +++ b/tests/test_agent_runner_sync.py @@ -142,7 +142,9 @@ async def agen(): try: runner.run_sync(Agent(name="test-agent"), "input") - assert cleanup_markers == ["done"], "Async generators must be finalized after run_sync returns." + assert cleanup_markers == [ + "done" + ], "Async generators must be finalized after run_sync returns." finally: fresh_event_loop_policy.set_event_loop(None) test_loop.close() From 30c2edc06c1e39525b5d1929f9a518f90a25f6f5 Mon Sep 17 00:00:00 2001 From: Kazuhiro Sera Date: Wed, 29 Oct 2025 12:05:48 +0900 Subject: [PATCH 6/6] make format --- tests/test_agent_runner_sync.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_agent_runner_sync.py b/tests/test_agent_runner_sync.py index 29c20c436..a570eea28 100644 --- a/tests/test_agent_runner_sync.py +++ b/tests/test_agent_runner_sync.py @@ -142,9 +142,9 @@ async def agen(): try: runner.run_sync(Agent(name="test-agent"), "input") - assert cleanup_markers == [ - "done" - ], "Async generators must be finalized after run_sync returns." + assert cleanup_markers == ["done"], ( + "Async generators must be finalized after run_sync returns." + ) finally: fresh_event_loop_policy.set_event_loop(None) test_loop.close()