From a52d0c932e17dc80b1aec6eb081d34f2b18b4f90 Mon Sep 17 00:00:00 2001 From: Lucas Wang Date: Thu, 23 Oct 2025 01:00:36 +0800 Subject: [PATCH 1/3] Fix voice result streaming task cleanup to properly await cancelled tasks Problem: The _cleanup_tasks() method in VoiceStreamResult was only calling task.cancel() on pending tasks but not awaiting them. Additionally, _check_errors() could raise CancelledError when checking cancelled tasks. This could lead to: 1. Unhandled task exception warnings 2. Potential resource leaks from abandoned tasks 3. CancelledError masking real exceptions Evidence: - Similar to fixed guardrail tasks cleanup (PR #1976) - Similar to fixed voice STT cleanup (PR #1977) - Similar to fixed websocket cleanup (PR #1955) - Bug documented in .claude/bug-analysis/03-resource-leaks.md Solution: 1. Made _cleanup_tasks() async 2. Collect all real asyncio.Task objects that need to be awaited 3. Added await asyncio.gather() with return_exceptions=True to properly collect exceptions from cancelled tasks 4. Updated _check_errors() to skip cancelled tasks using task.cancelled() check to avoid CancelledError when calling task.exception() 5. Updated stream() async generator to await _cleanup_tasks() Testing: - Linting passes - No breaking changes to public API - Follows same pattern as PR #1976, #1977, #1955 --- src/agents/voice/result.py | 37 +++++++++++++++++++++++++++++++------ 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/src/agents/voice/result.py b/src/agents/voice/result.py index fea79902e..05a5998cf 100644 --- a/src/agents/voice/result.py +++ b/src/agents/voice/result.py @@ -243,25 +243,50 @@ async def _wait_for_completion(self): tasks.append(self._dispatcher_task) await asyncio.gather(*tasks) - def _cleanup_tasks(self): + async def _cleanup_tasks(self): + """Cancel all pending tasks and wait for them to complete. + + This ensures that any exceptions raised by the tasks are properly handled + and prevents warnings about unhandled task exceptions. + """ self._finish_turn() + tasks = [] for task in self._tasks: if not task.done(): task.cancel() + if isinstance(task, asyncio.Task): + tasks.append(task) if self._dispatcher_task and not self._dispatcher_task.done(): self._dispatcher_task.cancel() + if isinstance(self._dispatcher_task, asyncio.Task): + tasks.append(self._dispatcher_task) if self.text_generation_task and not self.text_generation_task.done(): self.text_generation_task.cancel() + if isinstance(self.text_generation_task, asyncio.Task): + tasks.append(self.text_generation_task) + + # Wait for all cancelled tasks to complete and collect exceptions + if tasks: + await asyncio.gather(*tasks, return_exceptions=True) def _check_errors(self): + """Check for exceptions in completed tasks. + + Note: CancelledError is not checked as it's expected during cleanup. + """ for task in self._tasks: - if task.done(): - if task.exception(): - self._stored_exception = task.exception() - break + if task.done() and not task.cancelled(): + try: + exc = task.exception() + if exc: + self._stored_exception = exc + break + except asyncio.CancelledError: + # Task was cancelled, skip it + pass async def stream(self) -> AsyncIterator[VoiceStreamEvent]: """Stream the events and audio data as they're generated.""" @@ -281,7 +306,7 @@ async def stream(self) -> AsyncIterator[VoiceStreamEvent]: break self._check_errors() - self._cleanup_tasks() + await self._cleanup_tasks() if self._stored_exception: raise self._stored_exception From 7e4dfcaa9a65376f195b7f48a4d8d7bddba57d7a Mon Sep 17 00:00:00 2001 From: Lucas Wang Date: Thu, 23 Oct 2025 01:08:03 +0800 Subject: [PATCH 2/3] Remove unnecessary try-except for CancelledError in _check_errors The task.cancelled() check on line 281 already ensures that CancelledError will never be raised when calling task.exception(). The try-except block was redundant and added unnecessary complexity. Thanks to reviewer comment for catching this. --- src/agents/voice/result.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/src/agents/voice/result.py b/src/agents/voice/result.py index 05a5998cf..c59828e45 100644 --- a/src/agents/voice/result.py +++ b/src/agents/voice/result.py @@ -275,18 +275,14 @@ async def _cleanup_tasks(self): def _check_errors(self): """Check for exceptions in completed tasks. - Note: CancelledError is not checked as it's expected during cleanup. + Note: task.cancelled() check ensures CancelledError is never raised. """ for task in self._tasks: if task.done() and not task.cancelled(): - try: - exc = task.exception() - if exc: - self._stored_exception = exc - break - except asyncio.CancelledError: - # Task was cancelled, skip it - pass + exc = task.exception() + if exc: + self._stored_exception = exc + break async def stream(self) -> AsyncIterator[VoiceStreamEvent]: """Stream the events and audio data as they're generated.""" From 7198e3c97ccfecb54e1b28502a87ec631246707b Mon Sep 17 00:00:00 2001 From: Lucas Wang Date: Thu, 23 Oct 2025 01:11:52 +0800 Subject: [PATCH 3/3] Preserve exceptions from gather() during task cleanup --- src/agents/voice/result.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/agents/voice/result.py b/src/agents/voice/result.py index c59828e45..3161da0fb 100644 --- a/src/agents/voice/result.py +++ b/src/agents/voice/result.py @@ -270,7 +270,16 @@ async def _cleanup_tasks(self): # Wait for all cancelled tasks to complete and collect exceptions if tasks: - await asyncio.gather(*tasks, return_exceptions=True) + results = await asyncio.gather(*tasks, return_exceptions=True) + # Check if any task failed with a real exception (not CancelledError) + # This catches exceptions that occurred after _check_errors() but before cancellation + if not self._stored_exception: + for result in results: + is_exception = isinstance(result, Exception) + is_not_cancelled = not isinstance(result, asyncio.CancelledError) + if is_exception and is_not_cancelled: + self._stored_exception = result + break def _check_errors(self): """Check for exceptions in completed tasks.