Skip to content
Merged
49 changes: 34 additions & 15 deletions python/packages/core/agent_framework/_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -1761,10 +1761,26 @@ def _get_result_hooks_from_stream(stream: Any) -> list[Callable[[Any], Any]]:


def _extract_function_calls(response: ChatResponse) -> list[Content]:
function_results = {it.call_id for it in response.messages[0].contents if it.type == "function_result"}
return [
it for it in response.messages[0].contents if it.type == "function_call" and it.call_id not in function_results
]
function_results = {
item.call_id
for message in response.messages
for item in message.contents
if item.type == "function_result" and item.call_id
}
seen_call_ids: set[str] = set()
function_calls: list[Content] = []
for message in response.messages:
for item in message.contents:
if item.type != "function_call":
continue
if item.call_id and item.call_id in function_results:
continue
if item.call_id and item.call_id in seen_call_ids:
continue
if item.call_id:
seen_call_ids.add(item.call_id)
function_calls.append(item)
return function_calls


def _prepend_fcc_messages(response: ChatResponse, fcc_messages: list[Message]) -> None:
Expand Down Expand Up @@ -1822,27 +1838,22 @@ def _handle_function_call_results(

if had_errors:
errors_in_a_row += 1
if errors_in_a_row >= max_errors:
reached_error_limit = errors_in_a_row >= max_errors
if reached_error_limit:
logger.warning(
"Maximum consecutive function call errors reached (%d). "
"Stopping further function calls for this request.",
max_errors,
)
return {
"action": "stop",
"errors_in_a_row": errors_in_a_row,
"result_message": None,
"update_role": None,
"function_call_results": None,
}
else:
errors_in_a_row = 0
reached_error_limit = False

result_message = Message(role="tool", contents=function_call_results)
response.messages.append(result_message)
fcc_messages.extend(response.messages)
return {
"action": "continue",
"action": "stop" if reached_error_limit else "continue",
"errors_in_a_row": errors_in_a_row,
"result_message": result_message,
"update_role": "tool",
Expand Down Expand Up @@ -2025,6 +2036,7 @@ def get_response(
middleware_pipeline=function_middleware_pipeline,
)
filtered_kwargs = {k: v for k, v in kwargs.items() if k != "session"}

# Make options mutable so we can update conversation_id during function invocation loop
mutable_options: dict[str, Any] = dict(options) if options else {}
# Remove additional_function_arguments from options passed to underlying chat client
Expand Down Expand Up @@ -2090,7 +2102,9 @@ async def _get_response() -> ChatResponse:
if result["action"] == "return":
return response
if result["action"] == "stop":
break
# Error threshold reached: force a final non-tool turn so
# function_call_output items are submitted before exit.
mutable_options["tool_choice"] = "none"
errors_in_a_row = result["errors_in_a_row"]

# When tool_choice is 'required', reset tool_choice after one iteration to avoid infinite loops
Expand Down Expand Up @@ -2157,6 +2171,7 @@ async def _stream() -> AsyncIterable[ChatResponseUpdate]:
)
errors_in_a_row = approval_result["errors_in_a_row"]
if approval_result["action"] == "stop":
mutable_options["tool_choice"] = "none"
return

inner_stream = await _ensure_response_stream(
Expand Down Expand Up @@ -2205,7 +2220,11 @@ async def _stream() -> AsyncIterable[ChatResponseUpdate]:
contents=result["function_call_results"] or [],
role=role,
)
if result["action"] != "continue":
if result["action"] == "stop":
# Error threshold reached: submit collected function_call_output
# items once more with tools disabled.
mutable_options["tool_choice"] = "none"
elif result["action"] != "continue":
return

# When tool_choice is 'required', reset the tool_choice after one iteration to avoid infinite loops
Expand Down
2 changes: 2 additions & 0 deletions python/packages/core/agent_framework/_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ def from_text(
def from_text_reasoning(
cls: type[ContentT],
*,
id: str | None = None,
text: str | None = None,
protected_data: str | None = None,
annotations: Sequence[Annotation] | None = None,
Expand All @@ -540,6 +541,7 @@ def from_text_reasoning(
"""Create text reasoning content."""
return cls(
"text_reasoning",
id=id,
text=text,
protected_data=protected_data,
annotations=annotations,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,10 @@ async def from_response(
immediately run the agent to produce a new response.
"""
# Replace cache with full conversation if available, else fall back to agent_response messages.
if prior.full_conversation is not None:
self._cache = list(prior.full_conversation)
else:
self._cache = list(prior.agent_response.messages)
source_messages = (
prior.full_conversation if prior.full_conversation is not None else prior.agent_response.messages
)
self._cache = list(source_messages)
await self._run_agent_and_emit(ctx)

@handler
Expand Down Expand Up @@ -311,7 +311,7 @@ async def _run_agent_and_emit(
# Snapshot current conversation as cache + latest agent outputs.
# Do not append to prior snapshots: callers may provide full-history messages
# in request.messages, and extending would duplicate prior turns.
self._full_conversation = list(self._cache) + (list(response.messages) if response else [])
self._full_conversation = [*self._cache, *(list(response.messages) if response else [])]

if response is None:
# Agent did not complete (e.g., waiting for user input); do not emit response
Expand Down
86 changes: 57 additions & 29 deletions python/packages/core/agent_framework/openai/_responses_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -908,11 +908,16 @@ def _prepare_message_for_openai(
"type": "message",
"role": message.role,
}
# Reasoning items are only valid in input when they directly preceded a function_call
# in the same response. Including a reasoning item that preceded a text response
# (i.e. no function_call in the same message) causes an API error:
# "reasoning was provided without its required following item."
has_function_call = any(c.type == "function_call" for c in message.contents)
for content in message.contents:
match content.type:
case "text_reasoning":
# Reasoning items must be sent back as top-level input items
# for reasoning models that require them alongside function_calls
if not has_function_call:
continue # reasoning not followed by a function_call is invalid in input
reasoning = self._prepare_content_for_openai(message.role, content, call_id_to_id) # type: ignore[arg-type]
if reasoning:
all_messages.append(reasoning)
Expand Down Expand Up @@ -961,26 +966,19 @@ def _prepare_content_for_openai(
"text": content.text,
}
case "text_reasoning":
ret: dict[str, Any] = {
"type": "reasoning",
"summary": {
"type": "summary_text",
"text": content.text,
},
}
ret: dict[str, Any] = {"type": "reasoning", "summary": []}
if content.id:
ret["id"] = content.id
props: dict[str, Any] | None = getattr(content, "additional_properties", None)
if props:
if reasoning_id := props.get("reasoning_id"):
ret["id"] = reasoning_id
if status := props.get("status"):
ret["status"] = status
if reasoning_text := props.get("reasoning_text"):
ret["content"] = {
"type": "reasoning_text",
"text": reasoning_text,
}
ret["content"] = [{"type": "reasoning_text", "text": reasoning_text}]
if encrypted_content := props.get("encrypted_content"):
ret["encrypted_content"] = encrypted_content
if content.text:
ret["summary"].append({"type": "summary_text", "text": content.text})
return ret
case "data" | "uri":
if content.has_top_level_media_type("image"):
Expand Down Expand Up @@ -1189,30 +1187,45 @@ def _parse_response_from_openai(
)
)
case "reasoning": # ResponseOutputReasoning
reasoning_id = getattr(item, "id", None)
if hasattr(item, "content") and item.content:
for index, reasoning_content in enumerate(item.content):
added_reasoning = False
if item_content := getattr(item, "content", None):
for index, reasoning_content in enumerate(item_content):
additional_properties: dict[str, Any] = {}
if reasoning_id:
additional_properties["reasoning_id"] = reasoning_id
if hasattr(item, "summary") and item.summary and index < len(item.summary):
additional_properties["summary"] = item.summary[index]
contents.append(
Content.from_text_reasoning(
id=item.id,
text=reasoning_content.text,
raw_representation=reasoning_content,
additional_properties=additional_properties or None,
)
)
if hasattr(item, "summary") and item.summary:
for summary in item.summary:
added_reasoning = True
if item_summary := getattr(item, "summary", None):
for summary in item_summary:
contents.append(
Content.from_text_reasoning(
id=item.id,
text=summary.text,
raw_representation=summary, # type: ignore[arg-type]
additional_properties={"reasoning_id": reasoning_id} if reasoning_id else None,
)
)
added_reasoning = True
if not added_reasoning:
# Reasoning item with no visible text (e.g. encrypted reasoning).
# Always emit an empty marker so co-occurrence detection can be done
additional_properties_empty: dict[str, Any] = {}
if encrypted := getattr(item, "encrypted_content", None):
additional_properties_empty["encrypted_content"] = encrypted
contents.append(
Content.from_text_reasoning(
id=item.id,
text="",
raw_representation=item,
additional_properties=additional_properties_empty or None,
)
)
case "code_interpreter_call": # ResponseOutputCodeInterpreterCall
call_id = getattr(item, "call_id", None) or getattr(item, "id", None)
outputs: list[Content] = []
Expand Down Expand Up @@ -1427,36 +1440,36 @@ def _parse_chunk_from_openai(
case "response.reasoning_text.delta":
contents.append(
Content.from_text_reasoning(
id=event.item_id,
text=event.delta,
raw_representation=event,
additional_properties={"reasoning_id": event.item_id},
)
)
metadata.update(self._get_metadata_from_response(event))
case "response.reasoning_text.done":
contents.append(
Content.from_text_reasoning(
id=event.item_id,
text=event.text,
raw_representation=event,
additional_properties={"reasoning_id": event.item_id},
)
)
metadata.update(self._get_metadata_from_response(event))
case "response.reasoning_summary_text.delta":
contents.append(
Content.from_text_reasoning(
id=event.item_id,
text=event.delta,
raw_representation=event,
additional_properties={"reasoning_id": event.item_id},
)
)
metadata.update(self._get_metadata_from_response(event))
case "response.reasoning_summary_text.done":
contents.append(
Content.from_text_reasoning(
id=event.item_id,
text=event.text,
raw_representation=event,
additional_properties={"reasoning_id": event.item_id},
)
)
metadata.update(self._get_metadata_from_response(event))
Expand Down Expand Up @@ -1630,11 +1643,10 @@ def _parse_chunk_from_openai(
)
case "reasoning": # ResponseOutputReasoning
reasoning_id = getattr(event_item, "id", None)
added_reasoning = False
if hasattr(event_item, "content") and event_item.content:
for index, reasoning_content in enumerate(event_item.content):
additional_properties: dict[str, Any] = {}
if reasoning_id:
additional_properties["reasoning_id"] = reasoning_id
if (
hasattr(event_item, "summary")
and event_item.summary
Expand All @@ -1643,11 +1655,27 @@ def _parse_chunk_from_openai(
additional_properties["summary"] = event_item.summary[index]
contents.append(
Content.from_text_reasoning(
id=reasoning_id or None,
text=reasoning_content.text,
raw_representation=reasoning_content,
additional_properties=additional_properties or None,
)
)
added_reasoning = True
if not added_reasoning:
# Reasoning item with no visible text (e.g. encrypted reasoning).
# Always emit an empty marker so co-occurrence detection can occur.
additional_properties_empty: dict[str, Any] = {}
if encrypted := getattr(event_item, "encrypted_content", None):
additional_properties_empty["encrypted_content"] = encrypted
contents.append(
Content.from_text_reasoning(
id=reasoning_id or None,
text="",
raw_representation=event_item,
additional_properties=additional_properties_empty or None,
)
)
case _:
logger.debug("Unparsed event of type: %s: %s", event.type, event)
case "response.function_call_arguments.delta":
Expand Down
Loading