Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 48 additions & 11 deletions src/agents/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import asyncio
from collections.abc import AsyncIterator
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, cast
from typing import TYPE_CHECKING, Any, Literal, cast

from typing_extensions import TypeVar

Expand Down Expand Up @@ -164,24 +164,61 @@ class RunResultStreaming(RunResultBase):
_output_guardrails_task: asyncio.Task[Any] | None = field(default=None, repr=False)
_stored_exception: Exception | None = field(default=None, repr=False)

# Soft cancel state
_cancel_mode: Literal["none", "immediate", "after_turn"] = field(default="none", repr=False)

@property
def last_agent(self) -> Agent[Any]:
"""The last agent that was run. Updates as the agent run progresses, so the true last agent
is only available after the agent run is complete.
"""
return self.current_agent

def cancel(self) -> None:
"""Cancels the streaming run, stopping all background tasks and marking the run as
complete."""
self._cleanup_tasks() # Cancel all running tasks
self.is_complete = True # Mark the run as complete to stop event streaming
def cancel(self, mode: Literal["immediate", "after_turn"] = "immediate") -> None:
"""Cancel the streaming run.

# Optionally, clear the event queue to prevent processing stale events
while not self._event_queue.empty():
self._event_queue.get_nowait()
while not self._input_guardrail_queue.empty():
self._input_guardrail_queue.get_nowait()
Args:
mode: Cancellation strategy:
- "immediate": Stop immediately, cancel all tasks, clear queues (default)
- "after_turn": Complete current turn gracefully before stopping
* Allows LLM response to finish
* Executes pending tool calls
* Saves session state properly
* Tracks usage accurately
* Stops before next turn begins

Example:
```python
result = Runner.run_streamed(agent, "Task", session=session)

async for event in result.stream_events():
if user_interrupted():
result.cancel(mode="after_turn") # Graceful
# result.cancel() # Immediate (default)
```

Note: After calling cancel(), you should continue consuming stream_events()
to allow the cancellation to complete properly.
"""
# Store the cancel mode for the background task to check
self._cancel_mode = mode

if mode == "immediate":
# Existing behavior - immediate shutdown
self._cleanup_tasks() # Cancel all running tasks
self.is_complete = True # Mark the run as complete to stop event streaming

# Optionally, clear the event queue to prevent processing stale events
while not self._event_queue.empty():
self._event_queue.get_nowait()
while not self._input_guardrail_queue.empty():
self._input_guardrail_queue.get_nowait()

elif mode == "after_turn":
# Soft cancel - just set the flag
# The streaming loop will check this and stop gracefully
# Don't call _cleanup_tasks() or clear queues yet
pass

async def stream_events(self) -> AsyncIterator[StreamEvent]:
"""Stream deltas for new items as they are generated. We're using the types from the
Expand Down
32 changes: 32 additions & 0 deletions src/agents/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,12 @@ async def _start_streaming(
await AgentRunner._save_result_to_session(session, starting_input, [])

while True:
# Check for soft cancel before starting new turn
if streamed_result._cancel_mode == "after_turn":
streamed_result.is_complete = True
streamed_result._event_queue.put_nowait(QueueCompleteSentinel())
break

if streamed_result.is_complete:
break

Expand Down Expand Up @@ -1026,13 +1032,33 @@ async def _start_streaming(
server_conversation_tracker.track_server_items(turn_result.model_response)

if isinstance(turn_result.next_step, NextStepHandoff):
# Save the conversation to session if enabled (before handoff)
# Note: Non-streaming path doesn't save handoff turns immediately,
# but streaming needs to for graceful cancellation support
if session is not None:
should_skip_session_save = (
await AgentRunner._input_guardrail_tripwire_triggered_for_stream(
streamed_result
)
)
if should_skip_session_save is False:
await AgentRunner._save_result_to_session(
session, [], turn_result.new_step_items
)

current_agent = turn_result.next_step.new_agent
current_span.finish(reset_current=True)
current_span = None
should_run_agent_start_hooks = True
streamed_result._event_queue.put_nowait(
AgentUpdatedStreamEvent(new_agent=current_agent)
)

# Check for soft cancel after handoff
if streamed_result._cancel_mode == "after_turn": # type: ignore[comparison-overlap]
streamed_result.is_complete = True
streamed_result._event_queue.put_nowait(QueueCompleteSentinel())
break
elif isinstance(turn_result.next_step, NextStepFinalOutput):
streamed_result._output_guardrails_task = asyncio.create_task(
cls._run_output_guardrails(
Expand Down Expand Up @@ -1078,6 +1104,12 @@ async def _start_streaming(
await AgentRunner._save_result_to_session(
session, [], turn_result.new_step_items
)

# Check for soft cancel after turn completion
if streamed_result._cancel_mode == "after_turn": # type: ignore[comparison-overlap]
streamed_result.is_complete = True
streamed_result._event_queue.put_nowait(QueueCompleteSentinel())
break
except AgentsException as exc:
streamed_result.is_complete = True
streamed_result._event_queue.put_nowait(QueueCompleteSentinel())
Expand Down
17 changes: 17 additions & 0 deletions tests/test_cancel_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,20 @@ async def test_cancel_cleans_up_resources():
assert result._input_guardrail_queue.empty(), (
"Input guardrail queue should be empty after cancel."
)


@pytest.mark.asyncio
async def test_cancel_immediate_mode_explicit():
"""Test explicit immediate mode behaves same as default."""
model = FakeModel()
agent = Agent(name="Joker", model=model)

result = Runner.run_streamed(agent, input="Please tell me 5 jokes.")

async for _ in result.stream_events():
result.cancel(mode="immediate")
break

assert result.is_complete
assert result._event_queue.empty()
assert result._cancel_mode == "immediate"
Loading