From e64c62b5e967ebb1f30432d20948cc2020f88dc0 Mon Sep 17 00:00:00 2001 From: Rishav C Date: Wed, 10 Sep 2025 15:41:24 -0400 Subject: [PATCH] feat: add previous_response_id parameter to OpenAI module MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add support for previous_response_id parameter to all OpenAI module run methods, following the same implementation pattern as the existing max_turns parameter. Changes: - Add previous_response_id parameter to all 4 OpenAI module run methods - Update parameter classes in openai_activities.py to inherit the parameter - Implement conditional logic in service layer to handle parameter combinations - Add comprehensive unit test to verify parameter passing behavior The parameter flows through the following layers: Module → Activity → Service → Runner.run This enables conversation continuity by allowing users to specify a previous response ID when running OpenAI agents. --- .../lib/adk/providers/_modules/openai.py | 32 +- .../lib/core/services/adk/providers/openai.py | 364 ++++++++---------- .../adk/providers/openai_activities.py | 4 + .../adk/providers/test_openai_activities.py | 54 +++ 4 files changed, 229 insertions(+), 225 deletions(-) diff --git a/src/agentex/lib/adk/providers/_modules/openai.py b/src/agentex/lib/adk/providers/_modules/openai.py index a0a59c04..6ad4566d 100644 --- a/src/agentex/lib/adk/providers/_modules/openai.py +++ b/src/agentex/lib/adk/providers/_modules/openai.py @@ -81,14 +81,13 @@ async def run_agent( tools: list[Tool] | None = None, output_type: type[Any] | AgentOutputSchemaBase | None = None, tool_use_behavior: ( - Literal["run_llm_again", "stop_on_first_tool"] - | StopAtTools - | ToolsToFinalOutputFunction + Literal["run_llm_again", "stop_on_first_tool"] | StopAtTools | ToolsToFinalOutputFunction ) = "run_llm_again", mcp_timeout_seconds: int | None = None, input_guardrails: list[InputGuardrail] | None = None, output_guardrails: list[OutputGuardrail] | None = None, max_turns: int | None = None, + previous_response_id: str | None = None, ) -> SerializableRunResult | RunResult: """ Run an agent without streaming or TaskMessage creation. @@ -116,6 +115,7 @@ async def run_agent( input_guardrails: Optional list of input guardrails to run on initial user input. output_guardrails: Optional list of output guardrails to run on final agent output. max_turns: Maximum number of turns the agent can take. Uses Runner's default if None. + previous_response_id: Optional previous response ID for conversation continuity. Returns: Union[SerializableRunResult, RunResult]: SerializableRunResult when in Temporal, RunResult otherwise. 
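Reviewer note: previous_response_id maps to the Responses API notion of resuming from an earlier response's stored conversation state, so the server does not need the full transcript resent on each turn. A minimal usage sketch against the new module signature; the adk handle, the agent details, and the last_response_id accessor are illustrative assumptions, not part of this patch:

    # First turn: run normally and keep the resulting response id.
    result = await adk.providers.openai.run_agent(
        agent_name="assistant",
        agent_instructions="You are a helpful assistant.",
        input_list=[{"role": "user", "content": "Summarize my open tickets."}],
    )

    # Later turn: resume the same server-side conversation.
    followup = await adk.providers.openai.run_agent(
        agent_name="assistant",
        agent_instructions="You are a helpful assistant.",
        input_list=[{"role": "user", "content": "Close the oldest one."}],
        previous_response_id=result.last_response_id,  # assumed accessor on the run result
    )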
@@ -139,6 +139,7 @@ async def run_agent( input_guardrails=input_guardrails, output_guardrails=output_guardrails, max_turns=max_turns, + previous_response_id=previous_response_id, ) return await ActivityHelpers.execute_activity( activity_name=OpenAIActivityName.RUN_AGENT, @@ -167,6 +168,7 @@ async def run_agent( input_guardrails=input_guardrails, output_guardrails=output_guardrails, max_turns=max_turns, + previous_response_id=previous_response_id, ) async def run_agent_auto_send( @@ -188,14 +190,13 @@ async def run_agent_auto_send( tools: list[Tool] | None = None, output_type: type[Any] | AgentOutputSchemaBase | None = None, tool_use_behavior: ( - Literal["run_llm_again", "stop_on_first_tool"] - | StopAtTools - | ToolsToFinalOutputFunction + Literal["run_llm_again", "stop_on_first_tool"] | StopAtTools | ToolsToFinalOutputFunction ) = "run_llm_again", mcp_timeout_seconds: int | None = None, input_guardrails: list[InputGuardrail] | None = None, output_guardrails: list[OutputGuardrail] | None = None, max_turns: int | None = None, + previous_response_id: str | None = None, ) -> SerializableRunResult | RunResult: """ Run an agent with automatic TaskMessage creation. @@ -222,6 +223,7 @@ async def run_agent_auto_send( input_guardrails: Optional list of input guardrails to run on initial user input. output_guardrails: Optional list of output guardrails to run on final agent output. max_turns: Maximum number of turns the agent can take. Uses Runner's default if None. + previous_response_id: Optional previous response ID for conversation continuity. Returns: Union[SerializableRunResult, RunResult]: SerializableRunResult when in Temporal, RunResult otherwise. @@ -246,6 +248,7 @@ async def run_agent_auto_send( input_guardrails=input_guardrails, output_guardrails=output_guardrails, max_turns=max_turns, + previous_response_id=previous_response_id, ) return await ActivityHelpers.execute_activity( activity_name=OpenAIActivityName.RUN_AGENT_AUTO_SEND, @@ -275,6 +278,7 @@ async def run_agent_auto_send( input_guardrails=input_guardrails, output_guardrails=output_guardrails, max_turns=max_turns, + previous_response_id=previous_response_id, ) async def run_agent_streamed( @@ -292,14 +296,13 @@ async def run_agent_streamed( tools: list[Tool] | None = None, output_type: type[Any] | AgentOutputSchemaBase | None = None, tool_use_behavior: ( - Literal["run_llm_again", "stop_on_first_tool"] - | StopAtTools - | ToolsToFinalOutputFunction + Literal["run_llm_again", "stop_on_first_tool"] | StopAtTools | ToolsToFinalOutputFunction ) = "run_llm_again", mcp_timeout_seconds: int | None = None, input_guardrails: list[InputGuardrail] | None = None, output_guardrails: list[OutputGuardrail] | None = None, max_turns: int | None = None, + previous_response_id: str | None = None, ) -> RunResultStreaming: """ Run an agent with streaming enabled but no TaskMessage creation. @@ -330,6 +333,7 @@ async def run_agent_streamed( input_guardrails: Optional list of input guardrails to run on initial user input. output_guardrails: Optional list of output guardrails to run on final agent output. max_turns: Maximum number of turns the agent can take. Uses Runner's default if None. + previous_response_id: Optional previous response ID for conversation continuity. Returns: RunResultStreaming: The result of the agent run with streaming. 
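Reviewer note: all four module methods share the two-path shape these hunks keep repeating: inside a Temporal workflow the arguments are packed into a params model and dispatched as an activity, otherwise the service method is called directly, so the new parameter has to be threaded through both branches of every method. A condensed sketch of that pattern, with in_workflow() and _openai_service as hypothetical stand-ins and the execute_activity kwargs abbreviated:

    async def run_agent(self, *, previous_response_id: str | None = None, **kwargs):
        if in_workflow():  # hypothetical stand-in for the actual Temporal detection
            params = RunAgentParams(previous_response_id=previous_response_id, **kwargs)
            return await ActivityHelpers.execute_activity(
                activity_name=OpenAIActivityName.RUN_AGENT,
                params=params,
            )
        # Outside Temporal: forward the same arguments straight to the service layer.
        return await self._openai_service.run_agent(
            previous_response_id=previous_response_id, **kwargs
        )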
@@ -363,6 +367,7 @@ async def run_agent_streamed( input_guardrails=input_guardrails, output_guardrails=output_guardrails, max_turns=max_turns, + previous_response_id=previous_response_id, ) async def run_agent_streamed_auto_send( @@ -384,14 +389,13 @@ async def run_agent_streamed_auto_send( tools: list[Tool] | None = None, output_type: type[Any] | AgentOutputSchemaBase | None = None, tool_use_behavior: ( - Literal["run_llm_again", "stop_on_first_tool"] - | StopAtTools - | ToolsToFinalOutputFunction + Literal["run_llm_again", "stop_on_first_tool"] | StopAtTools | ToolsToFinalOutputFunction ) = "run_llm_again", mcp_timeout_seconds: int | None = None, input_guardrails: list[InputGuardrail] | None = None, output_guardrails: list[OutputGuardrail] | None = None, max_turns: int | None = None, + previous_response_id: str | None = None, ) -> SerializableRunResultStreaming | RunResultStreaming: """ Run an agent with streaming enabled and automatic TaskMessage creation. @@ -418,6 +422,7 @@ async def run_agent_streamed_auto_send( tool_use_behavior: Optional tool use behavior. mcp_timeout_seconds: Optional param to set the timeout threshold for the MCP servers. Defaults to 5 seconds. max_turns: Maximum number of turns the agent can take. Uses Runner's default if None. + previous_response_id: Optional previous response ID for conversation continuity. Returns: Union[SerializableRunResultStreaming, RunResultStreaming]: SerializableRunResultStreaming when in Temporal, RunResultStreaming otherwise. @@ -441,7 +446,8 @@ async def run_agent_streamed_auto_send( mcp_timeout_seconds=mcp_timeout_seconds, input_guardrails=input_guardrails, output_guardrails=output_guardrails, - max_turns=max_turns + max_turns=max_turns, + previous_response_id=previous_response_id, ) return await ActivityHelpers.execute_activity( activity_name=OpenAIActivityName.RUN_AGENT_STREAMED_AUTO_SEND, @@ -471,4 +476,5 @@ async def run_agent_streamed_auto_send( input_guardrails=input_guardrails, output_guardrails=output_guardrails, max_turns=max_turns, + previous_response_id=previous_response_id, ) diff --git a/src/agentex/lib/core/services/adk/providers/openai.py b/src/agentex/lib/core/services/adk/providers/openai.py index 8ed6d9c2..31631a94 100644 --- a/src/agentex/lib/core/services/adk/providers/openai.py +++ b/src/agentex/lib/core/services/adk/providers/openai.py @@ -85,9 +85,7 @@ def __init__( self.streaming_service = streaming_service self.tracer = tracer - def _extract_tool_call_info( - self, tool_call_item: Any - ) -> tuple[str, str, dict[str, Any]]: + def _extract_tool_call_info(self, tool_call_item: Any) -> tuple[str, str, dict[str, Any]]: """ Extract call_id, tool_name, and tool_arguments from a tool call item.
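Reviewer note: the extraction helpers reworked in the next hunks normalize heterogeneous tool-call objects through a fallback chain instead of per-type branches at every call site: prefer call_id, fall back to id, and synthesize a placeholder as a last resort. A self-contained sketch of that chain:

    from typing import Any

    def extract_call_id(tool_call_item: Any) -> str:
        # Prefer the explicit call_id carried by function-style tool calls.
        if hasattr(tool_call_item, "call_id"):
            return tool_call_item.call_id
        # Some tool call types only expose a generic id.
        if hasattr(tool_call_item, "id"):
            return tool_call_item.id
        # Last resort: derive a placeholder from the object's identity.
        return f"unknown_call_{id(tool_call_item)}"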
@@ -99,9 +97,9 @@ def _extract_tool_call_info( """ # Generic handling for different tool call types # Try 'call_id' first, then 'id', then generate placeholder - if hasattr(tool_call_item, 'call_id'): + if hasattr(tool_call_item, "call_id"): call_id = tool_call_item.call_id - elif hasattr(tool_call_item, 'id'): + elif hasattr(tool_call_item, "id"): call_id = tool_call_item.id else: call_id = f"unknown_call_{id(tool_call_item)}" @@ -113,26 +111,18 @@ def _extract_tool_call_info( if isinstance(tool_call_item, ResponseFunctionWebSearch): tool_name = "web_search" - tool_arguments = { - "action": tool_call_item.action.model_dump(), - "status": tool_call_item.status - } + tool_arguments = {"action": tool_call_item.action.model_dump(), "status": tool_call_item.status} elif isinstance(tool_call_item, ResponseCodeInterpreterToolCall): tool_name = "code_interpreter" - tool_arguments = { - "code": tool_call_item.code, - "status": tool_call_item.status - } + tool_arguments = {"code": tool_call_item.code, "status": tool_call_item.status} else: # Generic handling for any tool call type - tool_name = getattr(tool_call_item, 'name', type(tool_call_item).__name__) + tool_name = getattr(tool_call_item, "name", type(tool_call_item).__name__) tool_arguments = tool_call_item.model_dump() - + return call_id, tool_name, tool_arguments - def _extract_tool_response_info( - self, tool_call_map: dict[str, Any], tool_output_item: Any - ) -> tuple[str, str, str]: + def _extract_tool_response_info(self, tool_call_map: dict[str, Any], tool_output_item: Any) -> tuple[str, str, str]: """ Extract call_id, tool_name, and content from a tool output item. @@ -145,14 +135,14 @@ def _extract_tool_response_info( """ # Extract call_id and content from the tool_output_item # Handle both dictionary access and attribute access - if hasattr(tool_output_item, 'get') and callable(tool_output_item.get): + if hasattr(tool_output_item, "get") and callable(tool_output_item.get): # Dictionary-like access call_id = tool_output_item["call_id"] content = tool_output_item["output"] else: # Attribute access for structured objects - call_id = getattr(tool_output_item, 'call_id', None) - content = getattr(tool_output_item, 'output', None) + call_id = getattr(tool_output_item, "call_id", None) + content = getattr(tool_output_item, "output", None) # Get the name from the tool call map using generic approach tool_call = tool_call_map[call_id] @@ -180,14 +170,13 @@ async def run_agent( tools: list[BaseModel] | None = None, output_type: type[Any] | None = None, tool_use_behavior: ( - Literal["run_llm_again", "stop_on_first_tool"] - | StopAtTools - | ToolsToFinalOutputFunction + Literal["run_llm_again", "stop_on_first_tool"] | StopAtTools | ToolsToFinalOutputFunction ) = "run_llm_again", mcp_timeout_seconds: int | None = None, input_guardrails: list[InputGuardrail] | None = None, output_guardrails: list[OutputGuardrail] | None = None, max_turns: int | None = None, + previous_response_id: str | None = None, ) -> RunResult: """ Run an agent without streaming or TaskMessage creation. @@ -206,11 +195,11 @@ async def run_agent( tools: Optional list of tools. output_type: Optional output type. tool_use_behavior: Optional tool use behavior. - mcp_timeout_seconds: Optional param to set the timeout threshold + mcp_timeout_seconds: Optional param to set the timeout threshold for the MCP servers. Defaults to 5 seconds. - input_guardrails: Optional list of input guardrails to run on + input_guardrails: Optional list of input guardrails to run on initial user input. 
- output_guardrails: Optional list of output guardrails to run on + output_guardrails: Optional list of output guardrails to run on final agent output. max_turns: Maximum number of turns the agent can take. Uses Runner's default if None. + previous_response_id: Optional previous response ID for conversation continuity. @@ -240,17 +229,9 @@ async def run_agent( ) as span: heartbeat_if_in_workflow("run agent") - async with mcp_server_context( - mcp_server_params, mcp_timeout_seconds - ) as servers: - tools = [ - tool.to_oai_function_tool()for tool in tools - ] if tools else [] - handoffs = ( - [Agent(**handoff.model_dump()) for handoff in handoffs] - if handoffs - else [] - ) + async with mcp_server_context(mcp_server_params, mcp_timeout_seconds) as servers: + tools = [tool.to_oai_function_tool() for tool in tools] if tools else [] + handoffs = [Agent(**handoff.model_dump()) for handoff in handoffs] if handoffs else [] agent_kwargs = { "name": agent_name, @@ -264,9 +245,7 @@ async def run_agent( "tool_use_behavior": tool_use_behavior, } if model_settings is not None: - agent_kwargs["model_settings"] = ( - model_settings.to_oai_model_settings() - ) + agent_kwargs["model_settings"] = model_settings.to_oai_model_settings() if input_guardrails is not None: agent_kwargs["input_guardrails"] = input_guardrails if output_guardrails is not None: @@ -275,17 +254,26 @@ async def run_agent( agent = Agent(**agent_kwargs) # Run without streaming - if max_turns is not None: + if max_turns is not None and previous_response_id is not None: + result = await Runner.run( + starting_agent=agent, + input=input_list, + max_turns=max_turns, + previous_response_id=previous_response_id, + ) + elif max_turns is not None: result = await Runner.run(starting_agent=agent, input=input_list, max_turns=max_turns) + elif previous_response_id is not None: + result = await Runner.run( + starting_agent=agent, input=input_list, previous_response_id=previous_response_id + ) else: result = await Runner.run(starting_agent=agent, input=input_list) if span: span.output = { "new_items": [ - item.raw_item.model_dump() - if isinstance(item.raw_item, BaseModel) - else item.raw_item + item.raw_item.model_dump() if isinstance(item.raw_item, BaseModel) else item.raw_item for item in result.new_items ], "final_output": result.final_output, @@ -309,14 +297,13 @@ async def run_agent_auto_send( tools: list[BaseModel] | None = None, output_type: type[Any] | None = None, tool_use_behavior: ( - Literal["run_llm_again", "stop_on_first_tool"] - | StopAtTools - | ToolsToFinalOutputFunction + Literal["run_llm_again", "stop_on_first_tool"] | StopAtTools | ToolsToFinalOutputFunction ) = "run_llm_again", mcp_timeout_seconds: int | None = None, input_guardrails: list[InputGuardrail] | None = None, output_guardrails: list[OutputGuardrail] | None = None, max_turns: int | None = None, + previous_response_id: str | None = None, ) -> RunResult: """ Run an agent with automatic TaskMessage creation.
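Reviewer note: the four-way branch added above exists because Runner.run supplies its own defaults when max_turns or previous_response_id are omitted, so the service must pass only the arguments the caller actually set; with two optional parameters that already means four branches. An equivalent, more compact formulation (a sketch of the idea, not what this patch ships) builds the optional kwargs incrementally:

    # Only forward optional arguments that were explicitly provided, so
    # Runner.run keeps its own defaults for everything else.
    run_kwargs = {}
    if max_turns is not None:
        run_kwargs["max_turns"] = max_turns
    if previous_response_id is not None:
        run_kwargs["previous_response_id"] = previous_response_id
    result = await Runner.run(starting_agent=agent, input=input_list, **run_kwargs)

If a third optional Runner argument ever joins these two, the kwargs form avoids doubling the branch count again.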
@@ -372,15 +359,9 @@ async def run_agent_auto_send( ) as span: heartbeat_if_in_workflow("run agent auto send") - async with mcp_server_context( - mcp_server_params, mcp_timeout_seconds - ) as servers: + async with mcp_server_context(mcp_server_params, mcp_timeout_seconds) as servers: tools = [tool.to_oai_function_tool() for tool in tools] if tools else [] - handoffs = ( - [Agent(**handoff.model_dump()) for handoff in handoffs] - if handoffs - else [] - ) + handoffs = [Agent(**handoff.model_dump()) for handoff in handoffs] if handoffs else [] agent_kwargs = { "name": agent_name, "instructions": agent_instructions, @@ -393,9 +374,7 @@ async def run_agent_auto_send( "tool_use_behavior": tool_use_behavior, } if model_settings is not None: - agent_kwargs["model_settings"] = ( - model_settings.to_oai_model_settings() - ) + agent_kwargs["model_settings"] = model_settings.to_oai_model_settings() if input_guardrails is not None: agent_kwargs["input_guardrails"] = input_guardrails if output_guardrails is not None: @@ -404,17 +383,26 @@ async def run_agent_auto_send( agent = Agent(**agent_kwargs) # Run without streaming - if max_turns is not None: + if max_turns is not None and previous_response_id is not None: + result = await Runner.run( + starting_agent=agent, + input=input_list, + max_turns=max_turns, + previous_response_id=previous_response_id, + ) + elif max_turns is not None: result = await Runner.run(starting_agent=agent, input=input_list, max_turns=max_turns) + elif previous_response_id is not None: + result = await Runner.run( + starting_agent=agent, input=input_list, previous_response_id=previous_response_id + ) else: result = await Runner.run(starting_agent=agent, input=input_list) if span: span.output = { "new_items": [ - item.raw_item.model_dump() - if isinstance(item.raw_item, BaseModel) - else item.raw_item + item.raw_item.model_dump() if isinstance(item.raw_item, BaseModel) else item.raw_item for item in result.new_items ], "final_output": result.final_output, @@ -429,12 +417,10 @@ async def run_agent_auto_send( content=item.raw_item.content[0].text, ) # Create message for the final result using streaming context - async with ( - self.streaming_service.streaming_task_message_context( - task_id=task_id, - initial_content=text_content, - ) as streaming_context - ): + async with self.streaming_service.streaming_task_message_context( + task_id=task_id, + initial_content=text_content, + ) as streaming_context: await streaming_context.stream_update( update=StreamTaskMessageFull( parent_task_message=streaming_context.task_message, @@ -445,7 +431,7 @@ async def run_agent_auto_send( elif item.type == "tool_call_item": tool_call_item = item.raw_item - + # Extract tool call information using the helper method call_id, tool_name, tool_arguments = self._extract_tool_call_info(tool_call_item) tool_call_map[call_id] = tool_call_item @@ -458,12 +444,10 @@ async def run_agent_auto_send( ) # Create tool request using streaming context - async with ( - self.streaming_service.streaming_task_message_context( - task_id=task_id, - initial_content=tool_request_content, - ) as streaming_context - ): + async with self.streaming_service.streaming_task_message_context( + task_id=task_id, + initial_content=tool_request_content, + ) as streaming_context: await streaming_context.stream_update( update=StreamTaskMessageFull( parent_task_message=streaming_context.task_message, @@ -476,9 +460,7 @@ async def run_agent_auto_send( tool_output_item = item.raw_item # Extract tool response information using the helper 
method - call_id, tool_name, content = self._extract_tool_response_info( - tool_call_map, tool_output_item - ) + call_id, tool_name, content = self._extract_tool_response_info(tool_call_map, tool_output_item) tool_response_content = ToolResponseContent( author="agent", @@ -487,12 +469,9 @@ async def run_agent_auto_send( content=content, ) # Create tool response using streaming context - async with ( - self.streaming_service.streaming_task_message_context( - task_id=task_id, - initial_content=tool_response_content - ) as streaming_context - ): + async with self.streaming_service.streaming_task_message_context( + task_id=task_id, initial_content=tool_response_content + ) as streaming_context: await streaming_context.stream_update( update=StreamTaskMessageFull( parent_task_message=streaming_context.task_message, @@ -519,14 +498,13 @@ async def run_agent_streamed( tools: list[BaseModel] | None = None, output_type: type[Any] | None = None, tool_use_behavior: ( - Literal["run_llm_again", "stop_on_first_tool"] - | StopAtTools - | ToolsToFinalOutputFunction + Literal["run_llm_again", "stop_on_first_tool"] | StopAtTools | ToolsToFinalOutputFunction ) = "run_llm_again", mcp_timeout_seconds: int | None = None, input_guardrails: list[InputGuardrail] | None = None, output_guardrails: list[OutputGuardrail] | None = None, max_turns: int | None = None, + previous_response_id: str | None = None, ) -> RunResultStreaming: """ Run an agent with streaming enabled but no TaskMessage creation. @@ -545,11 +523,11 @@ async def run_agent_streamed( tools: Optional list of tools. output_type: Optional output type. tool_use_behavior: Optional tool use behavior. - mcp_timeout_seconds: Optional param to set the timeout threshold + mcp_timeout_seconds: Optional param to set the timeout threshold for the MCP servers. Defaults to 5 seconds. - input_guardrails: Optional list of input guardrails to run on + input_guardrails: Optional list of input guardrails to run on initial user input. - output_guardrails: Optional list of output guardrails to run on + output_guardrails: Optional list of output guardrails to run on final agent output. max_turns: Maximum number of turns the agent can take. Uses Runner's default if None. + previous_response_id: Optional previous response ID for conversation continuity.
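Reviewer note: for the streamed variant the new parameter behaves the same way; the caller gets a RunResultStreaming back and drains its events while the run resumes from the prior response. A minimal consumption sketch (the service handle, agent details, and response id are illustrative):

    # Resume a conversation in a streamed run and drain its events.
    result = await openai_service.run_agent_streamed(
        agent_name="assistant",
        agent_instructions="You are a helpful assistant.",
        input_list=[{"role": "user", "content": "Pick up where we left off."}],
        previous_response_id="resp_123",  # id captured from an earlier run
    )
    async for event in result.stream_events():
        if event.type == "run_item_stream_event":
            print(event.item.type)  # e.g. tool_call_item, tool_call_output_item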
@@ -579,15 +557,9 @@ async def run_agent_streamed( ) as span: heartbeat_if_in_workflow("run agent streamed") - async with mcp_server_context( - mcp_server_params, mcp_timeout_seconds - ) as servers: + async with mcp_server_context(mcp_server_params, mcp_timeout_seconds) as servers: tools = [tool.to_oai_function_tool() for tool in tools] if tools else [] - handoffs = ( - [Agent(**handoff.model_dump()) for handoff in handoffs] - if handoffs - else [] - ) + handoffs = [Agent(**handoff.model_dump()) for handoff in handoffs] if handoffs else [] agent_kwargs = { "name": agent_name, "instructions": agent_instructions, @@ -600,9 +572,7 @@ async def run_agent_streamed( "tool_use_behavior": tool_use_behavior, } if model_settings is not None: - agent_kwargs["model_settings"] = ( - model_settings.to_oai_model_settings() - ) + agent_kwargs["model_settings"] = model_settings.to_oai_model_settings() if input_guardrails is not None: agent_kwargs["input_guardrails"] = input_guardrails if output_guardrails is not None: @@ -611,17 +581,26 @@ async def run_agent_streamed( agent = Agent(**agent_kwargs) # Run with streaming (but no TaskMessage creation) - if max_turns is not None: + if max_turns is not None and previous_response_id is not None: + result = Runner.run_streamed( + starting_agent=agent, + input=input_list, + max_turns=max_turns, + previous_response_id=previous_response_id, + ) + elif max_turns is not None: result = Runner.run_streamed(starting_agent=agent, input=input_list, max_turns=max_turns) + elif previous_response_id is not None: + result = Runner.run_streamed( + starting_agent=agent, input=input_list, previous_response_id=previous_response_id + ) else: result = Runner.run_streamed(starting_agent=agent, input=input_list) if span: span.output = { "new_items": [ - item.raw_item.model_dump() - if isinstance(item.raw_item, BaseModel) - else item.raw_item + item.raw_item.model_dump() if isinstance(item.raw_item, BaseModel) else item.raw_item for item in result.new_items ], "final_output": result.final_output, @@ -645,14 +624,13 @@ async def run_agent_streamed_auto_send( tools: list[BaseModel] | None = None, output_type: type[Any] | None = None, tool_use_behavior: ( - Literal["run_llm_again", "stop_on_first_tool"] - | StopAtTools - | ToolsToFinalOutputFunction + Literal["run_llm_again", "stop_on_first_tool"] | StopAtTools | ToolsToFinalOutputFunction ) = "run_llm_again", mcp_timeout_seconds: int | None = None, input_guardrails: list[InputGuardrail] | None = None, output_guardrails: list[OutputGuardrail] | None = None, max_turns: int | None = None, + previous_response_id: str | None = None, ) -> RunResultStreaming: """ Run an agent with streaming enabled and automatic TaskMessage creation. @@ -672,11 +650,11 @@ async def run_agent_streamed_auto_send( tools: Optional list of tools. output_type: Optional output type. tool_use_behavior: Optional tool use behavior. - mcp_timeout_seconds: Optional param to set the timeout threshold + mcp_timeout_seconds: Optional param to set the timeout threshold for the MCP servers. Defaults to 5 seconds. - input_guardrails: Optional list of input guardrails to run on + input_guardrails: Optional list of input guardrails to run on initial user input. - output_guardrails: Optional list of output guardrails to run on + output_guardrails: Optional list of output guardrails to run on final agent output. max_turns: Maximum number of turns the agent can take.
Uses Runner's default if None. + previous_response_id: Optional previous response ID for conversation continuity. @@ -715,15 +693,9 @@ ) as span: heartbeat_if_in_workflow("run agent streamed auto send") - async with mcp_server_context( - mcp_server_params, mcp_timeout_seconds - ) as servers: + async with mcp_server_context(mcp_server_params, mcp_timeout_seconds) as servers: tools = [tool.to_oai_function_tool() for tool in tools] if tools else [] - handoffs = ( - [Agent(**handoff.model_dump()) for handoff in handoffs] - if handoffs - else [] - ) + handoffs = [Agent(**handoff.model_dump()) for handoff in handoffs] if handoffs else [] agent_kwargs = { "name": agent_name, "instructions": agent_instructions, @@ -736,9 +708,7 @@ async def run_agent_streamed_auto_send( "tool_use_behavior": tool_use_behavior, } if model_settings is not None: - agent_kwargs["model_settings"] = ( - model_settings.to_oai_model_settings() - ) + agent_kwargs["model_settings"] = model_settings.to_oai_model_settings() if input_guardrails is not None: agent_kwargs["input_guardrails"] = input_guardrails if output_guardrails is not None: @@ -752,22 +722,18 @@ async def run_agent_streamed_auto_send( else: result = Runner.run_streamed(starting_agent=agent, input=input_list) - item_id_to_streaming_context: dict[ - str, StreamingTaskMessageContext - ] = {} + item_id_to_streaming_context: dict[str, StreamingTaskMessageContext] = {} unclosed_item_ids: set[str] = set() try: # Process streaming events with TaskMessage creation async for event in result.stream_events(): - heartbeat_if_in_workflow( - "processing stream event with auto send" - ) + heartbeat_if_in_workflow("processing stream event with auto send") if event.type == "run_item_stream_event": if event.item.type == "tool_call_item": tool_call_item = event.item.raw_item - + # Extract tool call information using the helper method call_id, tool_name, tool_arguments = self._extract_tool_call_info(tool_call_item) tool_call_map[call_id] = tool_call_item @@ -780,12 +746,10 @@ async def run_agent_streamed_auto_send( ) # Create tool request using streaming context (immediate completion) - async with ( - self.streaming_service.streaming_task_message_context( - task_id=task_id, - initial_content=tool_request_content, - ) as streaming_context - ): + async with self.streaming_service.streaming_task_message_context( + task_id=task_id, + initial_content=tool_request_content, + ) as streaming_context: # The message has already been persisted, but we still need to send an update await streaming_context.stream_update( update=StreamTaskMessageFull( @@ -811,12 +775,9 @@ async def run_agent_streamed_auto_send( ) # Create tool response using streaming context (immediate completion) - async with ( - self.streaming_service.streaming_task_message_context( - task_id=task_id, - initial_content=tool_response_content - ) as streaming_context - ): + async with self.streaming_service.streaming_task_message_context( + task_id=task_id, initial_content=tool_response_content + ) as streaming_context: # The message has already been persisted, but we still need to send an update await streaming_context.stream_update( update=StreamTaskMessageFull( parent_task_message=streaming_context.task_message, @@ -842,14 +803,10 @@ async def run_agent_streamed_auto_send( ), ) # Open the streaming context - item_id_to_streaming_context[ - item_id - ] = await streaming_context.open() + item_id_to_streaming_context[item_id] = await streaming_context.open() unclosed_item_ids.add(item_id) else: - streaming_context = item_id_to_streaming_context[ - item_id - ] + streaming_context = item_id_to_streaming_context[item_id] #
Stream the delta through the streaming service await streaming_context.stream_update( @@ -879,14 +836,10 @@ async def run_agent_streamed_auto_send( ), ) # Open the streaming context - item_id_to_streaming_context[ - item_id - ] = await streaming_context.open() + item_id_to_streaming_context[item_id] = await streaming_context.open() unclosed_item_ids.add(item_id) else: - streaming_context = item_id_to_streaming_context[ - item_id - ] + streaming_context = item_id_to_streaming_context[item_id] # Stream the summary delta through the streaming service await streaming_context.stream_update( @@ -920,14 +873,10 @@ async def run_agent_streamed_auto_send( ), ) # Open the streaming context - item_id_to_streaming_context[ - item_id - ] = await streaming_context.open() + item_id_to_streaming_context[item_id] = await streaming_context.open() unclosed_item_ids.add(item_id) else: - streaming_context = item_id_to_streaming_context[ - item_id - ] + streaming_context = item_id_to_streaming_context[item_id] # Stream the content delta through the streaming service await streaming_context.stream_update( @@ -955,7 +904,7 @@ async def run_agent_streamed_auto_send( # to close the streaming context, but they do!!! # They output both a ResponseReasoningSummaryTextDoneEvent and a ResponseReasoningSummaryPartDoneEvent # I have no idea why they do this. - + elif isinstance(event.data, ResponseReasoningTextDoneEvent): # Handle reasoning content text completion item_id = event.data.item_id @@ -965,16 +914,13 @@ async def run_agent_streamed_auto_send( # reasoning content texts. The context will be closed when the entire # output item is done (ResponseOutputItemDoneEvent) - elif isinstance(event.data, ResponseOutputItemDoneEvent): # Handle item completion item_id = event.data.item.id # Finish the streaming context (sends DONE event and updates message) if item_id in item_id_to_streaming_context: - streaming_context = item_id_to_streaming_context[ - item_id - ] + streaming_context = item_id_to_streaming_context[item_id] await streaming_context.close() if item_id in unclosed_item_ids: unclosed_item_ids.remove(item_id) @@ -984,39 +930,36 @@ async def run_agent_streamed_auto_send( # Create a copy to avoid modifying set during iteration remaining_items = list(unclosed_item_ids) for item_id in remaining_items: - if (item_id in unclosed_item_ids and - item_id in item_id_to_streaming_context): # Check if still unclosed - streaming_context = item_id_to_streaming_context[ - item_id - ] + if ( + item_id in unclosed_item_ids and item_id in item_id_to_streaming_context + ): # Check if still unclosed + streaming_context = item_id_to_streaming_context[item_id] await streaming_context.close() unclosed_item_ids.discard(item_id) except InputGuardrailTripwireTriggered as e: # Handle guardrail trigger by sending a rejection message rejection_message = "I'm sorry, but I cannot process this request due to a guardrail. Please try a different question." 
- + # Try to extract rejection message from the guardrail result - if hasattr(e, 'guardrail_result') and hasattr(e.guardrail_result, 'output'): - output_info = getattr(e.guardrail_result.output, 'output_info', {}) - if isinstance(output_info, dict) and 'rejection_message' in output_info: - rejection_message = output_info['rejection_message'] - elif hasattr(e.guardrail_result, 'guardrail'): + if hasattr(e, "guardrail_result") and hasattr(e.guardrail_result, "output"): + output_info = getattr(e.guardrail_result.output, "output_info", {}) + if isinstance(output_info, dict) and "rejection_message" in output_info: + rejection_message = output_info["rejection_message"] + elif hasattr(e.guardrail_result, "guardrail"): # Fall back to using guardrail name if no custom message - triggered_guardrail_name = getattr(e.guardrail_result.guardrail, 'name', None) + triggered_guardrail_name = getattr(e.guardrail_result.guardrail, "name", None) if triggered_guardrail_name: rejection_message = f"I'm sorry, but I cannot process this request. The '{triggered_guardrail_name}' guardrail was triggered." - + # Create and send the rejection message as a TaskMessage - async with ( - self.streaming_service.streaming_task_message_context( - task_id=task_id, - initial_content=TextContent( - author="agent", - content=rejection_message, - ), - ) as streaming_context - ): + async with self.streaming_service.streaming_task_message_context( + task_id=task_id, + initial_content=TextContent( + author="agent", + content=rejection_message, + ), + ) as streaming_context: # Send the full message await streaming_context.stream_update( update=StreamTaskMessageFull( @@ -1028,35 +971,33 @@ async def run_agent_streamed_auto_send( type="full", ), ) - + # Re-raise to let the activity handle it raise - + except OutputGuardrailTripwireTriggered as e: # Handle output guardrail trigger by sending a rejection message rejection_message = "I'm sorry, but I cannot provide this response due to a guardrail. Please try a different question." - + # Try to extract rejection message from the guardrail result - if hasattr(e, 'guardrail_result') and hasattr(e.guardrail_result, 'output'): - output_info = getattr(e.guardrail_result.output, 'output_info', {}) - if isinstance(output_info, dict) and 'rejection_message' in output_info: - rejection_message = output_info['rejection_message'] - elif hasattr(e.guardrail_result, 'guardrail'): + if hasattr(e, "guardrail_result") and hasattr(e.guardrail_result, "output"): + output_info = getattr(e.guardrail_result.output, "output_info", {}) + if isinstance(output_info, dict) and "rejection_message" in output_info: + rejection_message = output_info["rejection_message"] + elif hasattr(e.guardrail_result, "guardrail"): # Fall back to using guardrail name if no custom message - triggered_guardrail_name = getattr(e.guardrail_result.guardrail, 'name', None) + triggered_guardrail_name = getattr(e.guardrail_result.guardrail, "name", None) if triggered_guardrail_name: rejection_message = f"I'm sorry, but I cannot provide this response. The '{triggered_guardrail_name}' guardrail was triggered." 
- + # Create and send the rejection message as a TaskMessage - async with ( - self.streaming_service.streaming_task_message_context( - task_id=task_id, - initial_content=TextContent( - author="agent", - content=rejection_message, - ), - ) as streaming_context - ): + async with self.streaming_service.streaming_task_message_context( + task_id=task_id, + initial_content=TextContent( + author="agent", + content=rejection_message, + ), + ) as streaming_context: # Send the full message await streaming_context.stream_update( update=StreamTaskMessageFull( @@ -1068,7 +1009,7 @@ async def run_agent_streamed_auto_send( type="full", ), ) - + # Re-raise to let the activity handle it raise @@ -1077,8 +1018,9 @@ async def run_agent_streamed_auto_send( # Create a copy to avoid modifying set during iteration remaining_items = list(unclosed_item_ids) for item_id in remaining_items: - if (item_id in unclosed_item_ids and - item_id in item_id_to_streaming_context): # Check if still unclosed + if ( + item_id in unclosed_item_ids and item_id in item_id_to_streaming_context + ): # Check if still unclosed streaming_context = item_id_to_streaming_context[item_id] await streaming_context.close() unclosed_item_ids.discard(item_id) @@ -1086,9 +1028,7 @@ async def run_agent_streamed_auto_send( if span: span.output = { "new_items": [ - item.raw_item.model_dump() - if isinstance(item.raw_item, BaseModel) - else item.raw_item + item.raw_item.model_dump() if isinstance(item.raw_item, BaseModel) else item.raw_item for item in result.new_items ], "final_output": result.final_output, diff --git a/src/agentex/lib/core/temporal/activities/adk/providers/openai_activities.py b/src/agentex/lib/core/temporal/activities/adk/providers/openai_activities.py index bdbc2c57..39b6f0bd 100644 --- a/src/agentex/lib/core/temporal/activities/adk/providers/openai_activities.py +++ b/src/agentex/lib/core/temporal/activities/adk/providers/openai_activities.py @@ -446,6 +446,7 @@ class RunAgentParams(BaseModelWithTraceParams): input_guardrails: list[TemporalInputGuardrail] | None = None output_guardrails: list[TemporalOutputGuardrail] | None = None max_turns: int | None = None + previous_response_id: str | None = None class RunAgentAutoSendParams(RunAgentParams): @@ -515,6 +516,7 @@ async def run_agent(self, params: RunAgentParams) -> SerializableRunResult: output_guardrails=output_guardrails, mcp_timeout_seconds=params.mcp_timeout_seconds, max_turns=params.max_turns, + previous_response_id=params.previous_response_id, ) return self._to_serializable_run_result(result) @@ -550,6 +552,7 @@ async def run_agent_auto_send(self, params: RunAgentAutoSendParams) -> Serializa output_guardrails=output_guardrails, mcp_timeout_seconds=params.mcp_timeout_seconds, max_turns=params.max_turns, + previous_response_id=params.previous_response_id, ) return self._to_serializable_run_result(result) except InputGuardrailTripwireTriggered as e: @@ -622,6 +625,7 @@ async def run_agent_streamed_auto_send( output_guardrails=output_guardrails, mcp_timeout_seconds=params.mcp_timeout_seconds, max_turns=params.max_turns, + previous_response_id=params.previous_response_id, ) return self._to_serializable_run_result_streaming(result) except InputGuardrailTripwireTriggered as e: diff --git a/tests/lib/adk/providers/test_openai_activities.py b/tests/lib/adk/providers/test_openai_activities.py index 830263d7..c933b6ce 100644 --- a/tests/lib/adk/providers/test_openai_activities.py +++ b/tests/lib/adk/providers/test_openai_activities.py @@ -75,6 +75,60 @@ async def 
test_run_agent(self, mock_runner_run, max_turns, should_be_passed, sam else: assert "max_turns" not in call_args.kwargs, "max_turns should not be passed when None" + @pytest.mark.parametrize( + "previous_response_id,should_be_passed", + [ + (None, False), + ("response_123", True), + ], + ) + @patch("agents.Runner.run") + async def test_run_agent_previous_response_id( + self, mock_runner_run, previous_response_id, should_be_passed, sample_run_result + ): + """Test run_agent with previous_response_id parameter.""" + from agentex.lib.core.temporal.activities.adk.providers.openai_activities import RunAgentParams + + # Arrange + mock_runner_run.return_value = sample_run_result + mock_tracer = self._create_mock_tracer() + _, openai_activities, env = self._create_test_setup(mock_tracer) + + # Create params with or without previous_response_id + params = RunAgentParams( + input_list=[{"role": "user", "content": "Hello, world!"}], + mcp_server_params=[], + agent_name="test_agent", + agent_instructions="You are a helpful assistant", + previous_response_id=previous_response_id, + trace_id="test-trace-id", + parent_span_id="test-span-id", + ) + + # Act + result = await env.run(openai_activities.run_agent, params) + + # Assert - Result structure + self._assert_result_structure(result) + + # Assert - Runner call + mock_runner_run.assert_called_once() + call_args = mock_runner_run.call_args + + # Assert - Runner signature validation + self._assert_runner_call_signature(call_args) + + # Assert - Previous response ID parameter handling + if should_be_passed: + assert "previous_response_id" in call_args.kwargs, ( + f"previous_response_id should be passed when set to {previous_response_id}" + ) + assert call_args.kwargs["previous_response_id"] == previous_response_id, ( + f"previous_response_id value should be {previous_response_id}" + ) + else: + assert "previous_response_id" not in call_args.kwargs, "previous_response_id should not be passed when None" + @pytest.mark.parametrize( "tools_case", [