From 5e6feaecbeb10ab2b953dc25883026a7843c89ce Mon Sep 17 00:00:00 2001
From: Lucas Wang
Date: Thu, 23 Oct 2025 00:53:28 +0800
Subject: [PATCH 1/3] Fix guardrail task cleanup to properly await cancelled
 tasks

Problem:
The _cleanup_guardrail_tasks() method in RealtimeSession was only calling
task.cancel() on pending guardrail tasks but not awaiting them. This could
lead to:
1. Unhandled task exception warnings
2. Potential memory leaks from abandoned tasks
3. Improper resource cleanup

Evidence:
- Test code in tests/realtime/test_session.py:1199 shows the correct pattern:
  await asyncio.gather(*session._guardrail_tasks, return_exceptions=True)
- Similar pattern used in openai_realtime.py:519-523 for WebSocket task cleanup

Solution:
1. Made _cleanup_guardrail_tasks() async
2. Added await asyncio.gather() for real asyncio.Task objects to properly
   collect exceptions (with isinstance check to support mock objects in tests)
3. Updated _cleanup() to await the cleanup method

Testing:
- Created comprehensive test suite in tests/realtime/test_guardrail_cleanup.py
  with 3 test cases:
  1. Verify cancelled tasks are properly awaited
  2. Verify exceptions during cleanup are handled
  3. Verify multiple concurrent tasks are cleaned up
- All new tests pass
- All existing tests pass (838 passed, 3 skipped)
- Note: test_issue_889_guardrail_tool_execution has 1 pre-existing failure
  unrelated to this PR (also fails on main)
---
 src/agents/realtime/session.py           |  20 +-
 tests/realtime/test_guardrail_cleanup.py | 246 +++++++++++++++++++++++
 2 files changed, 264 insertions(+), 2 deletions(-)
 create mode 100644 tests/realtime/test_guardrail_cleanup.py

diff --git a/src/agents/realtime/session.py b/src/agents/realtime/session.py
index e10b48e53..55e8b7c2e 100644
--- a/src/agents/realtime/session.py
+++ b/src/agents/realtime/session.py
@@ -746,16 +746,32 @@ def _on_guardrail_task_done(self, task: asyncio.Task[Any]) -> None:
                 )
             )
 
-    def _cleanup_guardrail_tasks(self) -> None:
+    async def _cleanup_guardrail_tasks(self) -> None:
+        """Cancel all pending guardrail tasks and wait for them to complete.
+
+        This ensures that any exceptions raised by the tasks are properly handled
+        and prevents warnings about unhandled task exceptions.
+        """
+        # Collect real asyncio.Task objects that need to be awaited
+        real_tasks = []
+
         for task in self._guardrail_tasks:
             if not task.done():
                 task.cancel()
+                # Only await real asyncio.Task objects (not mocks in tests)
+                if isinstance(task, asyncio.Task):
+                    real_tasks.append(task)
+
+        # Wait for all real tasks to complete and collect any exceptions
+        if real_tasks:
+            await asyncio.gather(*real_tasks, return_exceptions=True)
+
         self._guardrail_tasks.clear()
 
     async def _cleanup(self) -> None:
         """Clean up all resources and mark session as closed."""
         # Cancel and cleanup guardrail tasks
-        self._cleanup_guardrail_tasks()
+        await self._cleanup_guardrail_tasks()
 
         # Remove ourselves as a listener
         self._model.remove_listener(self)
diff --git a/tests/realtime/test_guardrail_cleanup.py b/tests/realtime/test_guardrail_cleanup.py
new file mode 100644
index 000000000..f7655a6c5
--- /dev/null
+++ b/tests/realtime/test_guardrail_cleanup.py
@@ -0,0 +1,246 @@
+"""Test guardrail task cleanup to ensure proper exception handling.
+
+This test verifies the fix for the bug where _cleanup_guardrail_tasks() was not
+properly awaiting cancelled tasks, which could lead to unhandled task exceptions
+and potential memory leaks.
+""" + +import asyncio +from unittest.mock import AsyncMock, Mock, PropertyMock + +import pytest + +from agents.guardrail import GuardrailFunctionOutput, OutputGuardrail +from agents.realtime import RealtimeSession +from agents.realtime.agent import RealtimeAgent +from agents.realtime.config import RealtimeRunConfig +from agents.realtime.model import RealtimeModel +from agents.realtime.model_events import RealtimeModelTranscriptDeltaEvent + + +class MockRealtimeModel(RealtimeModel): + """Mock realtime model for testing.""" + + def __init__(self): + super().__init__() + self.listeners = [] + self.connect_called = False + self.close_called = False + self.sent_events = [] + self.sent_messages = [] + self.sent_audio = [] + self.sent_tool_outputs = [] + self.interrupts_called = 0 + + async def connect(self, options=None): + self.connect_called = True + + def add_listener(self, listener): + self.listeners.append(listener) + + def remove_listener(self, listener): + if listener in self.listeners: + self.listeners.remove(listener) + + async def send_event(self, event): + from agents.realtime.model_inputs import ( + RealtimeModelSendAudio, + RealtimeModelSendInterrupt, + RealtimeModelSendToolOutput, + RealtimeModelSendUserInput, + ) + + self.sent_events.append(event) + + # Update legacy tracking for compatibility + if isinstance(event, RealtimeModelSendUserInput): + self.sent_messages.append(event.user_input) + elif isinstance(event, RealtimeModelSendAudio): + self.sent_audio.append((event.audio, event.commit)) + elif isinstance(event, RealtimeModelSendToolOutput): + self.sent_tool_outputs.append((event.tool_call, event.output, event.start_response)) + elif isinstance(event, RealtimeModelSendInterrupt): + self.interrupts_called += 1 + + async def close(self): + self.close_called = True + + +@pytest.fixture +def mock_model(): + return MockRealtimeModel() + + +@pytest.fixture +def mock_agent(): + agent = Mock(spec=RealtimeAgent) + agent.name = "test_agent" + agent.get_all_tools = AsyncMock(return_value=[]) + type(agent).handoffs = PropertyMock(return_value=[]) + type(agent).output_guardrails = PropertyMock(return_value=[]) + return agent + + +@pytest.mark.asyncio +async def test_guardrail_task_cleanup_awaits_cancelled_tasks(mock_model, mock_agent): + """Test that cleanup properly awaits cancelled guardrail tasks. + + This test verifies that when guardrail tasks are cancelled during cleanup, + the cleanup method properly awaits them to completion using asyncio.gather() + with return_exceptions=True. This ensures: + 1. No warnings about unhandled task exceptions + 2. Proper resource cleanup + 3. 
+    """
+
+    # Create a guardrail that runs a long async operation
+    task_started = asyncio.Event()
+    task_cancelled = asyncio.Event()
+
+    async def slow_guardrail_func(context, agent, output):
+        """A guardrail that takes time to execute."""
+        task_started.set()
+        try:
+            # Simulate a long-running operation
+            await asyncio.sleep(10)
+            return GuardrailFunctionOutput(output_info={}, tripwire_triggered=False)
+        except asyncio.CancelledError:
+            task_cancelled.set()
+            raise
+
+    guardrail = OutputGuardrail(guardrail_function=slow_guardrail_func, name="slow_guardrail")
+
+    run_config: RealtimeRunConfig = {
+        "output_guardrails": [guardrail],
+        "guardrails_settings": {"debounce_text_length": 5},
+    }
+
+    session = RealtimeSession(mock_model, mock_agent, None, run_config=run_config)
+
+    # Trigger a guardrail by sending a transcript delta
+    transcript_event = RealtimeModelTranscriptDeltaEvent(
+        item_id="item_1", delta="hello world", response_id="resp_1"
+    )
+
+    await session.on_event(transcript_event)
+
+    # Wait for the guardrail task to start
+    await asyncio.wait_for(task_started.wait(), timeout=1.0)
+
+    # Verify a guardrail task was created
+    assert len(session._guardrail_tasks) == 1
+    task = list(session._guardrail_tasks)[0]
+    assert not task.done()
+
+    # Now cleanup the session - this should cancel and await the task
+    await session._cleanup_guardrail_tasks()
+
+    # Verify the task was cancelled and properly awaited
+    assert task_cancelled.is_set(), "Task should have received CancelledError"
+    assert len(session._guardrail_tasks) == 0, "Tasks list should be cleared"
+
+    # No warnings should be raised about unhandled task exceptions
+
+
+@pytest.mark.asyncio
+async def test_guardrail_task_cleanup_with_exception(mock_model, mock_agent):
+    """Test that cleanup handles guardrail tasks that raise exceptions.
+
+    This test verifies that if a guardrail task raises an exception (not just
+    CancelledError), the cleanup method still completes successfully and doesn't
+    propagate the exception, thanks to return_exceptions=True.
+ """ + + task_started = asyncio.Event() + exception_raised = asyncio.Event() + + async def failing_guardrail_func(context, agent, output): + """A guardrail that raises an exception.""" + task_started.set() + try: + await asyncio.sleep(10) + return GuardrailFunctionOutput(output_info={}, tripwire_triggered=False) + except asyncio.CancelledError as e: + exception_raised.set() + # Simulate an error during cleanup + raise RuntimeError("Cleanup error") from e + + guardrail = OutputGuardrail( + guardrail_function=failing_guardrail_func, name="failing_guardrail" + ) + + run_config: RealtimeRunConfig = { + "output_guardrails": [guardrail], + "guardrails_settings": {"debounce_text_length": 5}, + } + + session = RealtimeSession(mock_model, mock_agent, None, run_config=run_config) + + # Trigger a guardrail + transcript_event = RealtimeModelTranscriptDeltaEvent( + item_id="item_1", delta="hello world", response_id="resp_1" + ) + + await session.on_event(transcript_event) + + # Wait for the guardrail task to start + await asyncio.wait_for(task_started.wait(), timeout=1.0) + + # Cleanup should not raise the RuntimeError due to return_exceptions=True + await session._cleanup_guardrail_tasks() + + # Verify cleanup completed successfully + assert exception_raised.is_set() + assert len(session._guardrail_tasks) == 0 + + +@pytest.mark.asyncio +async def test_guardrail_task_cleanup_with_multiple_tasks(mock_model, mock_agent): + """Test cleanup with multiple pending guardrail tasks. + + This test verifies that cleanup properly handles multiple concurrent guardrail + tasks by triggering guardrails multiple times, then cancelling and awaiting all of them. + """ + + tasks_started = asyncio.Event() + tasks_cancelled = 0 + + async def slow_guardrail_func(context, agent, output): + nonlocal tasks_cancelled + tasks_started.set() + try: + await asyncio.sleep(10) + return GuardrailFunctionOutput(output_info={}, tripwire_triggered=False) + except asyncio.CancelledError: + tasks_cancelled += 1 + raise + + guardrail = OutputGuardrail(guardrail_function=slow_guardrail_func, name="slow_guardrail") + + run_config: RealtimeRunConfig = { + "output_guardrails": [guardrail], + "guardrails_settings": {"debounce_text_length": 5}, + } + + session = RealtimeSession(mock_model, mock_agent, None, run_config=run_config) + + # Trigger guardrails multiple times to create multiple tasks + for i in range(3): + transcript_event = RealtimeModelTranscriptDeltaEvent( + item_id=f"item_{i}", delta="hello world", response_id=f"resp_{i}" + ) + await session.on_event(transcript_event) + + # Wait for at least one task to start + await asyncio.wait_for(tasks_started.wait(), timeout=1.0) + + # Should have at least one guardrail task + initial_task_count = len(session._guardrail_tasks) + assert initial_task_count >= 1, "At least one guardrail task should exist" + + # Cleanup should cancel and await all tasks + await session._cleanup_guardrail_tasks() + + # Verify all tasks were cancelled and cleared + assert tasks_cancelled >= 1, "At least one task should have been cancelled" + assert len(session._guardrail_tasks) == 0 From d64ecfe2075e88dfe4d448ea308299ad631906fb Mon Sep 17 00:00:00 2001 From: Lucas Wang Date: Thu, 23 Oct 2025 13:06:33 +0800 Subject: [PATCH 2/3] Remove unnecessary type check in guardrail cleanup Simplified the cleanup logic by removing the isinstance check for asyncio.Task. Since _guardrail_tasks only contains real tasks created through asyncio.create_task(), this type check is unnecessary. 
This makes the code cleaner and avoids test-specific workarounds in
production code.
---
 src/agents/realtime/session.py | 12 +++---------
 1 file changed, 3 insertions(+), 9 deletions(-)

diff --git a/src/agents/realtime/session.py b/src/agents/realtime/session.py
index 55e8b7c2e..0b8fbac3d 100644
--- a/src/agents/realtime/session.py
+++ b/src/agents/realtime/session.py
@@ -752,19 +752,13 @@ async def _cleanup_guardrail_tasks(self) -> None:
         This ensures that any exceptions raised by the tasks are properly handled
         and prevents warnings about unhandled task exceptions.
         """
-        # Collect real asyncio.Task objects that need to be awaited
-        real_tasks = []
-
         for task in self._guardrail_tasks:
             if not task.done():
                 task.cancel()
-                # Only await real asyncio.Task objects (not mocks in tests)
-                if isinstance(task, asyncio.Task):
-                    real_tasks.append(task)
 
-        # Wait for all real tasks to complete and collect any exceptions
-        if real_tasks:
-            await asyncio.gather(*real_tasks, return_exceptions=True)
+        # Wait for all tasks to complete and collect any exceptions
+        if self._guardrail_tasks:
+            await asyncio.gather(*self._guardrail_tasks, return_exceptions=True)
 
         self._guardrail_tasks.clear()
 

From 5534151e3c1bba9b59780289e51d4635cda7bb71 Mon Sep 17 00:00:00 2001
From: Lucas Wang
Date: Thu, 23 Oct 2025 13:20:31 +0800
Subject: [PATCH 3/3] Fix test to use real asyncio tasks instead of mocks

Changed test_exception_during_guardrail_processing to use real asyncio.Task
objects instead of Mock objects. This allows the production code to remain
simple and clean, without needing isinstance checks to handle test-specific
mocks.

The test now creates actual tasks using asyncio.create_task() which better
reflects real-world usage and naturally works with the cleanup logic that
uses asyncio.gather().
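Roughly, the test's setup now looks like this (sketch only; the exact code
is in the diff below):

    async def long_running_task():
        await asyncio.sleep(10)

    async def completed_task():
        pass

    task1 = asyncio.create_task(long_running_task())  # stays pending
    task2 = asyncio.create_task(completed_task())     # finishes immediately
    await task2

    session._guardrail_tasks = {task1, task2}

so cleanup can cancel the pending task and gather both without any
mock-specific handling.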
---
 tests/realtime/test_guardrail_cleanup.py |  4 +---
 tests/test_session_exceptions.py         | 25 +++++++++++++++----------
 2 files changed, 16 insertions(+), 13 deletions(-)

diff --git a/tests/realtime/test_guardrail_cleanup.py b/tests/realtime/test_guardrail_cleanup.py
index f7655a6c5..943c4ccb5 100644
--- a/tests/realtime/test_guardrail_cleanup.py
+++ b/tests/realtime/test_guardrail_cleanup.py
@@ -165,9 +165,7 @@ async def failing_guardrail_func(context, agent, output):
             # Simulate an error during cleanup
             raise RuntimeError("Cleanup error") from e
 
-    guardrail = OutputGuardrail(
-        guardrail_function=failing_guardrail_func, name="failing_guardrail"
-    )
+    guardrail = OutputGuardrail(guardrail_function=failing_guardrail_func, name="failing_guardrail")
 
     run_config: RealtimeRunConfig = {
         "output_guardrails": [guardrail],
diff --git a/tests/test_session_exceptions.py b/tests/test_session_exceptions.py
index da9390236..96639b709 100644
--- a/tests/test_session_exceptions.py
+++ b/tests/test_session_exceptions.py
@@ -249,16 +249,21 @@ async def test_exception_during_guardrail_processing(
 
     session = RealtimeSession(fake_model, fake_agent, None)
 
-    # Add some fake guardrail tasks
-    fake_task1 = Mock()
-    fake_task1.done.return_value = False
-    fake_task1.cancel = Mock()
+    # Create real async tasks for testing cleanup
+    async def long_running_task():
+        await asyncio.sleep(10)
 
-    fake_task2 = Mock()
-    fake_task2.done.return_value = True
-    fake_task2.cancel = Mock()
+    async def completed_task():
+        pass
 
-    session._guardrail_tasks = {fake_task1, fake_task2}
+    # Create tasks
+    task1 = asyncio.create_task(long_running_task())
+    task2 = asyncio.create_task(completed_task())
+
+    # Wait for task2 to complete
+    await task2
+
+    session._guardrail_tasks = {task1, task2}
 
     fake_model.set_next_events([exception_event])
 
@@ -268,8 +273,8 @@ async def test_exception_during_guardrail_processing(
         pass
 
     # Verify guardrail tasks were properly cleaned up
-    fake_task1.cancel.assert_called_once()
-    fake_task2.cancel.assert_not_called()  # Already done
+    assert task1.cancelled()  # Should be cancelled
+    assert task2.done()  # Was already done
     assert len(session._guardrail_tasks) == 0
 
 @pytest.mark.asyncio