Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/agents/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -1285,6 +1285,14 @@ async def _start_streaming(
if streamed_result.trace:
streamed_result.trace.finish(reset_current=True)

# Ensure QueueCompleteSentinel is always put in the queue when the stream ends,
# even if an exception occurs before the inner try/except block (e.g., in
# _save_result_to_session at the beginning). Without this, stream_events()
# would hang forever waiting for more items.
if not streamed_result.is_complete:
streamed_result.is_complete = True
streamed_result._event_queue.put_nowait(QueueCompleteSentinel())

@classmethod
async def _run_single_turn_streamed(
cls,
Expand Down
30 changes: 30 additions & 0 deletions tests/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -534,3 +534,33 @@ def add_item(item):
expected = {f"Message {i}" for i in range(10)}
assert contents == expected
session.close()


@pytest.mark.asyncio
async def test_session_add_items_exception_propagates_in_streamed():
"""Test that exceptions from session.add_items are properly propagated
in run_streamed instead of causing the stream to hang forever.
Regression test for https://github.com/openai/openai-agents-python/issues/2130
"""
session = SQLiteSession("test_exception_session")

async def _failing_add_items(_items):
raise RuntimeError("Simulated session.add_items failure")

session.add_items = _failing_add_items # type: ignore[method-assign]

model = FakeModel()
agent = Agent(name="test", model=model)
model.set_next_output([get_text_message("This should not be reached")])

result = Runner.run_streamed(agent, "Hello", session=session)

async def consume_stream():
async for _event in result.stream_events():
pass

with pytest.raises(RuntimeError, match="Simulated session.add_items failure"):
# Timeout ensures test fails fast instead of hanging forever if bug regresses
await asyncio.wait_for(consume_stream(), timeout=5.0)

session.close()