From 4a34f222747e569027a1d5a0d6d6d62a20258aac Mon Sep 17 00:00:00 2001 From: Yufeng He <40085740+he-yufeng@users.noreply.github.com> Date: Wed, 13 May 2026 14:43:58 +0800 Subject: [PATCH 1/3] fix: coalesce code interpreter history chunks --- .../packages/core/agent_framework/_types.py | 82 +++++++++++++++++++ .../packages/core/tests/core/test_sessions.py | 57 +++++++++++++ 2 files changed, 139 insertions(+) diff --git a/python/packages/core/agent_framework/_types.py b/python/packages/core/agent_framework/_types.py index ce47f5fc8e..f8fdeef8c0 100644 --- a/python/packages/core/agent_framework/_types.py +++ b/python/packages/core/agent_framework/_types.py @@ -1973,11 +1973,93 @@ def _coalesce_text_content(contents: list[Content], type_str: Literal["text", "t contents.extend(coalesced_contents) +def _content_items_text(items: Any) -> str | None: + """Return concatenated text when a content item list only contains text.""" + if not isinstance(items, list): + return None + text_parts: list[str] = [] + for item in items: + if not isinstance(item, Content) or item.type != "text": + return None + text_parts.append(item.text or "") + return "".join(text_parts) + + +def _merge_content_item_lists(existing: Any, incoming: Any) -> Any: + """Merge streamed nested content lists, replacing deltas with a later full value when present.""" + if incoming is None: + return existing + if existing is None: + return deepcopy(incoming) + + existing_text = _content_items_text(existing) + incoming_text = _content_items_text(incoming) + if existing_text is not None and incoming_text is not None: + if incoming_text.startswith(existing_text): + return deepcopy(incoming) + if existing_text.startswith(incoming_text): + return existing + + merged = deepcopy(existing[0]) + merged.text = existing_text + incoming_text + return [merged] + + if isinstance(existing, list) and isinstance(incoming, list): + return [*existing, *deepcopy(incoming)] + return deepcopy(incoming) + + +def _merge_code_interpreter_content(existing: Content, incoming: Content) -> None: + """Merge two code interpreter content items for the same logical call.""" + existing.inputs = _merge_content_item_lists(existing.inputs, incoming.inputs) + existing.outputs = _merge_content_item_lists(existing.outputs, incoming.outputs) + existing.annotations = _combine_annotations(existing.annotations, incoming.annotations) + existing.additional_properties = {**existing.additional_properties, **incoming.additional_properties} + existing.raw_representation = _combine_raw_representations(existing.raw_representation, incoming.raw_representation) + + +def _code_interpreter_key(content: Content) -> tuple[str, str] | None: + """Return the aggregation key for code interpreter call/result content.""" + if content.type not in {"code_interpreter_tool_call", "code_interpreter_tool_result"}: + return None + call_id = content.call_id or content.additional_properties.get("item_id") + if not isinstance(call_id, str) or not call_id: + return None + return content.type, call_id + + +def _coalesce_code_interpreter_content(contents: list[Content]) -> None: + """Coalesce streaming code interpreter chunks by call id.""" + if not contents: + return + + coalesced_contents: list[Content] = [] + seen: dict[tuple[str, str], Content] = {} + for content in contents: + key = _code_interpreter_key(content) + if key is None: + coalesced_contents.append(content) + continue + + existing = seen.get(key) + if existing is None: + copied = deepcopy(content) + seen[key] = copied + coalesced_contents.append(copied) + continue + + _merge_code_interpreter_content(existing, content) + + contents.clear() + contents.extend(coalesced_contents) + + def _finalize_response(response: ChatResponse | AgentResponse) -> None: """Finalizes the response by performing any necessary post-processing.""" for msg in response.messages: _coalesce_text_content(msg.contents, "text") _coalesce_text_content(msg.contents, "text_reasoning") + _coalesce_code_interpreter_content(msg.contents) # region ContinuationToken diff --git a/python/packages/core/tests/core/test_sessions.py b/python/packages/core/tests/core/test_sessions.py index ebb91d0b0d..7c78dba209 100644 --- a/python/packages/core/tests/core/test_sessions.py +++ b/python/packages/core/tests/core/test_sessions.py @@ -307,6 +307,63 @@ async def test_after_run_stores_inputs_and_responses(self) -> None: assert provider.stored[0].text == "hello" assert provider.stored[1].text == "hi" + async def test_after_run_stores_coalesced_code_interpreter_chunks(self) -> None: + from agent_framework import AgentResponse, AgentResponseUpdate, Content + + provider = ConcreteHistoryProvider("mem", store_inputs=False) + updates = [ + AgentResponseUpdate( + role="assistant", + contents=[ + Content.from_code_interpreter_tool_result( + call_id="ci_123", + outputs=[], + ) + ], + ), + AgentResponseUpdate( + contents=[ + Content.from_code_interpreter_tool_call( + call_id="ci_123", + inputs=[Content.from_text(text="import")], + additional_properties={"sequence_number": 1}, + ) + ], + ), + AgentResponseUpdate( + contents=[ + Content.from_code_interpreter_tool_call( + call_id="ci_123", + inputs=[Content.from_text(text=" pandas")], + additional_properties={"sequence_number": 2}, + ) + ], + ), + AgentResponseUpdate( + contents=[ + Content.from_code_interpreter_tool_call( + call_id="ci_123", + inputs=[Content.from_text(text="import pandas as pd")], + additional_properties={"sequence_number": 3}, + ) + ], + ), + ] + ctx = SessionContext(session_id="s1", input_messages=[Message(role="user", contents=["make a sheet"])]) + ctx._response = AgentResponse.from_updates(updates) + + await provider.after_run(agent=None, session=AgentSession(), context=ctx, state={}) # type: ignore[arg-type] + + assert len(provider.stored) == 1 + stored_contents = provider.stored[0].contents + calls = [content for content in stored_contents if content.type == "code_interpreter_tool_call"] + results = [content for content in stored_contents if content.type == "code_interpreter_tool_result"] + assert len(calls) == 1 + assert len(results) == 1 + assert calls[0].inputs is not None + assert len(calls[0].inputs) == 1 + assert calls[0].inputs[0].text == "import pandas as pd" + async def test_after_run_skips_inputs_when_disabled(self) -> None: from agent_framework import AgentResponse From 4e6063cdc4d50d3378d06eaf6b50c6b4ea979c4f Mon Sep 17 00:00:00 2001 From: Yufeng He <40085740+he-yufeng@users.noreply.github.com> Date: Thu, 14 May 2026 15:40:00 +0800 Subject: [PATCH 2/3] fix: narrow content item list types --- python/packages/core/agent_framework/_types.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/python/packages/core/agent_framework/_types.py b/python/packages/core/agent_framework/_types.py index f8fdeef8c0..f5b3f3e6ac 100644 --- a/python/packages/core/agent_framework/_types.py +++ b/python/packages/core/agent_framework/_types.py @@ -1977,8 +1977,9 @@ def _content_items_text(items: Any) -> str | None: """Return concatenated text when a content item list only contains text.""" if not isinstance(items, list): return None + content_items = cast(list[Any], items) text_parts: list[str] = [] - for item in items: + for item in content_items: if not isinstance(item, Content) or item.type != "text": return None text_parts.append(item.text or "") @@ -2000,12 +2001,15 @@ def _merge_content_item_lists(existing: Any, incoming: Any) -> Any: if existing_text.startswith(incoming_text): return existing - merged = deepcopy(existing[0]) + existing_items = cast(list[Content], existing) + merged = deepcopy(existing_items[0]) merged.text = existing_text + incoming_text return [merged] if isinstance(existing, list) and isinstance(incoming, list): - return [*existing, *deepcopy(incoming)] + existing_items = cast(list[Any], existing) + incoming_items = cast(list[Any], incoming) + return [*existing_items, *deepcopy(incoming_items)] return deepcopy(incoming) From a4a0a73e64167e434e422fda0efbe13b9e876cab Mon Sep 17 00:00:00 2001 From: Yufeng He <40085740+he-yufeng@users.noreply.github.com> Date: Thu, 14 May 2026 21:23:11 +0800 Subject: [PATCH 3/3] fix: remove redundant content list casts --- python/packages/core/agent_framework/_types.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/python/packages/core/agent_framework/_types.py b/python/packages/core/agent_framework/_types.py index f5b3f3e6ac..59d2c0c327 100644 --- a/python/packages/core/agent_framework/_types.py +++ b/python/packages/core/agent_framework/_types.py @@ -1977,9 +1977,8 @@ def _content_items_text(items: Any) -> str | None: """Return concatenated text when a content item list only contains text.""" if not isinstance(items, list): return None - content_items = cast(list[Any], items) text_parts: list[str] = [] - for item in content_items: + for item in items: if not isinstance(item, Content) or item.type != "text": return None text_parts.append(item.text or "") @@ -2007,9 +2006,7 @@ def _merge_content_item_lists(existing: Any, incoming: Any) -> Any: return [merged] if isinstance(existing, list) and isinstance(incoming, list): - existing_items = cast(list[Any], existing) - incoming_items = cast(list[Any], incoming) - return [*existing_items, *deepcopy(incoming_items)] + return [*existing, *deepcopy(incoming)] return deepcopy(incoming)