From 26c29ab5939caf8e9c773837bfc3791c423040a0 Mon Sep 17 00:00:00 2001 From: Wen-Tien Chang Date: Fri, 28 Nov 2025 03:09:15 +0800 Subject: [PATCH] Fix streamed runner hanging when session.add_items raises exception --- src/agents/run.py | 8 ++++++++ tests/test_session.py | 30 ++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/src/agents/run.py b/src/agents/run.py index fce7b4840..d9db33d98 100644 --- a/src/agents/run.py +++ b/src/agents/run.py @@ -1285,6 +1285,14 @@ async def _start_streaming( if streamed_result.trace: streamed_result.trace.finish(reset_current=True) + # Ensure QueueCompleteSentinel is always put in the queue when the stream ends, + # even if an exception occurs before the inner try/except block (e.g., in + # _save_result_to_session at the beginning). Without this, stream_events() + # would hang forever waiting for more items. + if not streamed_result.is_complete: + streamed_result.is_complete = True + streamed_result._event_queue.put_nowait(QueueCompleteSentinel()) + @classmethod async def _run_single_turn_streamed( cls, diff --git a/tests/test_session.py b/tests/test_session.py index 40c0dc779..e0328056b 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -534,3 +534,33 @@ def add_item(item): expected = {f"Message {i}" for i in range(10)} assert contents == expected session.close() + + +@pytest.mark.asyncio +async def test_session_add_items_exception_propagates_in_streamed(): + """Test that exceptions from session.add_items are properly propagated + in run_streamed instead of causing the stream to hang forever. + Regression test for https://github.com/openai/openai-agents-python/issues/2130 + """ + session = SQLiteSession("test_exception_session") + + async def _failing_add_items(_items): + raise RuntimeError("Simulated session.add_items failure") + + session.add_items = _failing_add_items # type: ignore[method-assign] + + model = FakeModel() + agent = Agent(name="test", model=model) + model.set_next_output([get_text_message("This should not be reached")]) + + result = Runner.run_streamed(agent, "Hello", session=session) + + async def consume_stream(): + async for _event in result.stream_events(): + pass + + with pytest.raises(RuntimeError, match="Simulated session.add_items failure"): + # Timeout ensures test fails fast instead of hanging forever if bug regresses + await asyncio.wait_for(consume_stream(), timeout=5.0) + + session.close()