From f21f723526687d227362cfd644b9869b11728d6b Mon Sep 17 00:00:00 2001 From: Michael Sun Date: Wed, 6 Aug 2025 12:48:46 -0700 Subject: [PATCH] Change where tool request content event is sent --- .../lib/core/services/adk/providers/openai.py | 54 ++++++++++--------- 1 file changed, 28 insertions(+), 26 deletions(-) diff --git a/src/agentex/lib/core/services/adk/providers/openai.py b/src/agentex/lib/core/services/adk/providers/openai.py index 02604865..510823d6 100644 --- a/src/agentex/lib/core/services/adk/providers/openai.py +++ b/src/agentex/lib/core/services/adk/providers/openai.py @@ -10,6 +10,7 @@ from openai.types.responses import ( ResponseCompletedEvent, ResponseFunctionToolCall, + ResponseOutputItemAddedEvent, ResponseOutputItemDoneEvent, ResponseTextDeltaEvent, ) @@ -593,34 +594,11 @@ async def run_agent_streamed_auto_send( heartbeat_if_in_workflow( "processing stream event with auto send" ) - if event.type == "run_item_stream_event": if event.item.type == "tool_call_item": tool_call_item = event.item.raw_item tool_call_map[tool_call_item.call_id] = tool_call_item - tool_request_content = ToolRequestContent( - author="agent", - tool_call_id=tool_call_item.call_id, - name=tool_call_item.name, - arguments=json.loads(tool_call_item.arguments), - ) - - # Create tool request using streaming context (immediate completion) - async with ( - self.streaming_service.streaming_task_message_context( - task_id=task_id, - initial_content=tool_request_content, - ) as streaming_context - ): - # The message has already been persisted, but we still need to send an upda - await streaming_context.stream_update( - update=StreamTaskMessageFull( - parent_task_message=streaming_context.task_message, - content=tool_request_content, - ), - ) - elif event.item.type == "tool_call_output_item": tool_output_item = event.item.raw_item @@ -649,8 +627,33 @@ async def run_agent_streamed_auto_send( ) elif event.type == "raw_response_event": - if isinstance(event.data, ResponseTextDeltaEvent): - # Handle text delta + if isinstance(event.data, ResponseOutputItemAddedEvent): + # Handle tool call initiation - stream tool request immediately when tool call starts + if (event.data.item.type == "function_call" and + hasattr(event.data.item, 'call_id') and + hasattr(event.data.item, 'name')): + + tool_request_content = ToolRequestContent( + author="agent", + tool_call_id=event.data.item.call_id, + name=event.data.item.name, + arguments={}, # Empty arguments at initiation time + ) + + # Create tool request using streaming context (immediate completion) + async with self.streaming_service.streaming_task_message_context( + task_id=task_id, + initial_content=tool_request_content, + ) as streaming_context: + await streaming_context.stream_update( + update=StreamTaskMessageFull( + parent_task_message=streaming_context.task_message, + content=tool_request_content, + ), + ) + + elif isinstance(event.data, ResponseTextDeltaEvent): + # Handle text delta - skip tool argument deltas, only handle actual text responses item_id = event.data.item_id # Check if we already have a streaming context for this item @@ -701,7 +704,6 @@ async def run_agent_streamed_auto_send( ] await streaming_context.close() unclosed_item_ids.remove(item_id) - finally: # Cleanup: ensure all streaming contexts for this session are properly finished for item_id in unclosed_item_ids: