Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 37 additions & 30 deletions src/agents/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -599,26 +599,26 @@ async def run(
)

if current_turn == 1:
input_guardrail_results, turn_result = await asyncio.gather(
self._run_input_guardrails(
starting_agent,
starting_agent.input_guardrails
+ (run_config.input_guardrails or []),
_copy_str_or_list(prepared_input),
context_wrapper,
),
self._run_single_turn(
agent=current_agent,
all_tools=all_tools,
original_input=original_input,
generated_items=generated_items,
hooks=hooks,
context_wrapper=context_wrapper,
run_config=run_config,
should_run_agent_start_hooks=should_run_agent_start_hooks,
tool_use_tracker=tool_use_tracker,
server_conversation_tracker=server_conversation_tracker,
),
# Run input guardrails first, before starting the agent turn.
# This prevents tools from executing when guardrails should block the input.
input_guardrail_results = await self._run_input_guardrails(
starting_agent,
starting_agent.input_guardrails + (run_config.input_guardrails or []),
_copy_str_or_list(prepared_input),
context_wrapper,
)
# Only proceed with the agent turn if guardrails passed
turn_result = await self._run_single_turn(
agent=current_agent,
all_tools=all_tools,
original_input=original_input,
generated_items=generated_items,
hooks=hooks,
context_wrapper=context_wrapper,
run_config=run_config,
should_run_agent_start_hooks=should_run_agent_start_hooks,
tool_use_tracker=tool_use_tracker,
server_conversation_tracker=server_conversation_tracker,
)
else:
turn_result = await self._run_single_turn(
Expand Down Expand Up @@ -888,6 +888,9 @@ async def _run_input_guardrails_with_queue(
for done in asyncio.as_completed(guardrail_tasks):
result = await done
if result.output.tripwire_triggered:
# Cancel remaining tasks
for t in guardrail_tasks:
t.cancel()
_error_tracing.attach_error_to_span(
parent_span,
SpanError(
Expand All @@ -898,6 +901,10 @@ async def _run_input_guardrails_with_queue(
},
),
)
queue.put_nowait(result)
streamed_result.input_guardrail_results = [result] + guardrail_results
# Raise the exception immediately to prevent the agent turn from starting
raise InputGuardrailTripwireTriggered(result)
queue.put_nowait(result)
guardrail_results.append(result)
except Exception:
Expand Down Expand Up @@ -997,16 +1004,16 @@ async def _start_streaming(
break

if current_turn == 1:
# Run the input guardrails in the background and put the results on the queue
streamed_result._input_guardrails_task = asyncio.create_task(
cls._run_input_guardrails_with_queue(
starting_agent,
starting_agent.input_guardrails + (run_config.input_guardrails or []),
ItemHelpers.input_to_new_input_list(prepared_input),
context_wrapper,
streamed_result,
current_span,
)
# Run input guardrails first, before starting the streamed agent turn.
# This prevents tools from executing when guardrails should block the input.
# We await this to ensure guardrails complete before the agent starts.
await cls._run_input_guardrails_with_queue(
starting_agent,
starting_agent.input_guardrails + (run_config.input_guardrails or []),
ItemHelpers.input_to_new_input_list(prepared_input),
context_wrapper,
streamed_result,
current_span,
)
Comment on lines 1006 to 1017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Guardrail tripwire causes stream_events hang

Awaiting _run_input_guardrails_with_queue means a tripwire now raises before anything is queued on the streaming event queue. If a caller starts iterating RunResultStreaming.stream_events() while guardrails are still running, the iterator runs _check_errors() once (no error yet) and then awaits self._event_queue.get(). When the guardrail subsequently raises, _start_streaming exits without ever enqueuing a QueueCompleteSentinel or any event, so the consumer remains blocked forever and never receives the InputGuardrailTripwireTriggered exception even though _run_impl_task has failed. Consider notifying the event queue or setting is_complete before raising so that the stream can terminate instead of deadlocking.

Useful? React with 👍 / 👎.

try:
turn_result = await cls._run_single_turn_streamed(
Expand Down