Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/app/service/agent/core/providers/anthropic.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -555,4 +555,24 @@ describe("parseAnthropicStream", () => {
expect(events).toHaveLength(2);
expect(events[0]).toEqual({ type: "content_delta", delta: "ok" });
});

it("tool_use block 的 input_json_delta 應帶上 id 和 index", async () => {
const reader = createMockReader([
'event: content_block_start\ndata: {"index":1,"content_block":{"type":"tool_use","id":"toolu_X","name":"f"}}\n\n',
'event: content_block_delta\ndata: {"index":1,"delta":{"type":"input_json_delta","partial_json":"{\\"a\\":1}"}}\n\n',
"event: message_stop\ndata: {}\n\n",
]);

const events: ChatStreamEvent[] = [];
const controller = new AbortController();

await parseAnthropicStream(reader, (e) => events.push(e), controller.signal);

const d = events.find((e) => e.type === "tool_call_delta");
expect(d).toBeDefined();
if (d && d.type === "tool_call_delta") {
expect(d.id).toBe("toolu_X");
expect(d.index).toBe(1);
}
});
});
12 changes: 11 additions & 1 deletion src/app/service/agent/core/providers/anthropic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ export function parseAnthropicStream(
// 跟踪图片块的累积 base64 数据
let imageBlockData: { index: number; mediaType: string; base64Chunks: string[] } | null = null;

const toolUseByIndex = new Map<number, { id: string }>();

Comment on lines +190 to +191
Copy link

Copilot AI Apr 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里新增了 toolUseByIndex 用于 index→id 映射,但当前实现只在 content_block_startset,在 content_block_stop / message_stop 等结束事件里没有清理对应 index。这会导致 map 在长会话里持续增长,并且如果 index 在同一连接生命周期内被复用可能会拿到过期 id。建议在收到 content_block_stop(可用 json.index)时执行 toolUseByIndex.delete(json.index),并在 message_stop 时清空 map。

Copilot uses AI. Check for mistakes.
return readSSEStream(
reader,
signal,
Expand All @@ -212,6 +214,7 @@ export function parseAnthropicStream(
if (block?.type === "thinking") {
// thinking block 开始,后续通过 content_block_delta 传输内容
} else if (block?.type === "tool_use") {
toolUseByIndex.set(json.index, { id: block.id });
onEvent({
type: "tool_call_start",
toolCall: {
Expand Down Expand Up @@ -245,9 +248,11 @@ export function parseAnthropicStream(
} else if (delta?.type === "thinking_delta") {
onEvent({ type: "thinking_delta", delta: delta.thinking });
} else if (delta?.type === "input_json_delta") {
const tu = toolUseByIndex.get(json.index);
onEvent({
type: "tool_call_delta",
id: "",
id: tu?.id || "",
index: json.index,
delta: delta.partial_json,
});
} else if (delta?.type === "image_delta" && imageBlockData) {
Expand All @@ -274,6 +279,10 @@ export function parseAnthropicStream(
});
imageBlockData = null;
}
// tool_use block 结束后清理 index→id 映射,避免长会话下 map 持续增长
if (typeof json.index === "number") {
toolUseByIndex.delete(json.index);
}
break;
}
case "message_delta": {
Expand All @@ -293,6 +302,7 @@ export function parseAnthropicStream(
break;
}
case "message_stop": {
toolUseByIndex.clear();
onEvent({ type: "done" });
return true;
}
Expand Down
98 changes: 87 additions & 11 deletions src/app/service/agent/core/providers/openai.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -361,20 +361,25 @@ describe("parseOpenAIStream", () => {

await parseOpenAIStream(reader, (e) => events.push(e), controller.signal);

expect(events).toHaveLength(3);
expect(events).toHaveLength(4);
expect(events[0].type).toBe("tool_call_start");
if (events[0].type === "tool_call_start") {
expect(events[0].toolCall.name).toBe("dom_read_page");
expect(events[0].toolCall.arguments).toBe('{"tabId":123');
// 新行为:start 事件的 args 永远为空,首 chunk 的 args 通过 delta 发出
expect(events[0].toolCall.arguments).toBe("");
}
// 关键:最后的 tool_call_delta 不应被 usage 检查吞掉
expect(events[1].type).toBe("tool_call_delta");
if (events[1].type === "tool_call_delta") {
expect(events[1].delta).toBe(',"mode":"summary"}');
expect(events[1].delta).toBe('{"tabId":123'); // 故意的 — 模拟 streaming 还没收完的状态
}
expect(events[2].type).toBe("done");
if (events[2].type === "done") {
expect(events[2].usage).toEqual({ inputTokens: 40010, outputTokens: 154 });
// 关键:最后的 tool_call_delta 不应被 usage 检查吞掉
expect(events[2].type).toBe("tool_call_delta");
if (events[2].type === "tool_call_delta") {
expect(events[2].delta).toBe(',"mode":"summary"}');
}
expect(events[3].type).toBe("done");
if (events[3].type === "done") {
expect(events[3].usage).toEqual({ inputTokens: 40010, outputTokens: 154 });
}
});

Expand Down Expand Up @@ -529,13 +534,84 @@ describe("parseOpenAIStream", () => {

await parseOpenAIStream(reader, (e) => events.push(e), controller.signal);

expect(events).toHaveLength(4);
expect(events).toHaveLength(5);
expect(events[0]).toEqual({ type: "thinking_delta", delta: "分析页面" });
expect(events[1]).toEqual({ type: "thinking_delta", delta: "结构" });
expect(events[2].type).toBe("tool_call_start");
expect(events[3].type).toBe("done");
if (events[3].type === "done") {
expect(events[3].usage).toEqual({ inputTokens: 500, outputTokens: 50 });
if (events[2].type === "tool_call_start") {
expect(events[2].toolCall.name).toBe("dom_read_page");
expect(events[2].toolCall.arguments).toBe("");
}
expect(events[3].type).toBe("tool_call_delta");
if (events[3].type === "tool_call_delta") {
expect(events[3].delta).toBe('{"selector":".item"}');
}
expect(events[4].type).toBe("done");
if (events[4].type === "done") {
expect(events[4].usage).toEqual({ inputTokens: 500, outputTokens: 50 });
}
});

it("首 chunk 同时带 name 和 arguments 时:start 事件 args 为空,首 chunk args 作为 delta 发出", async () => {
const reader = createMockReader([
// gateway / 某些 model 会先发一个 arguments="{}" 占位再送真正 JSON
'data: {"choices":[{"delta":{"tool_calls":[{"index":0,"id":"call_x","function":{"name":"agent","arguments":"{}"}}]}}]}\n\n',
'data: {"choices":[{"delta":{"tool_calls":[{"index":0,"function":{"arguments":"{\\"description\\":\\"r\\""}}]}}]}\n\n',
'data: {"choices":[{"delta":{"tool_calls":[{"index":0,"function":{"arguments":",\\"prompt\\":\\"do\\"}"}}]}}]}\n\n',
"data: [DONE]\n\n",
]);
const events: ChatStreamEvent[] = [];
await parseOpenAIStream(reader, (e) => events.push(e), new AbortController().signal);

expect(events[0].type).toBe("tool_call_start");
if (events[0].type === "tool_call_start") {
// 关键断言:start 事件里的 args 必须为空,不能是 "{}"(避免前缀污染)
expect(events[0].toolCall.arguments).toBe("");
expect(events[0].toolCall.name).toBe("agent");
}
// 首 chunk 的 "{}" 作为第一段 delta 原样透传(模型问题:整体非合法 JSON,但解析器不吞字符)
const deltas = events.filter((e) => e.type === "tool_call_delta");
expect(deltas).toHaveLength(3);
expect(deltas[0].type === "tool_call_delta" && deltas[0].delta).toBe("{}");
expect(deltas[1].type === "tool_call_delta" && deltas[1].delta).toBe('{"description":"r"');
expect(deltas[2].type === "tool_call_delta" && deltas[2].delta).toBe(',"prompt":"do"}');
});

it("并发多个 tool_call(不同 index)arguments 不应互相串扰", async () => {
const reader = createMockReader([
// 两个 tool 同时开始
'data: {"choices":[{"delta":{"tool_calls":[{"index":0,"id":"a","function":{"name":"f1","arguments":""}}]}}]}\n\n',
'data: {"choices":[{"delta":{"tool_calls":[{"index":1,"id":"b","function":{"name":"f2","arguments":""}}]}}]}\n\n',
// 然后交错发 arguments delta(只带 index,不带 id)
'data: {"choices":[{"delta":{"tool_calls":[{"index":0,"function":{"arguments":"{\\"x\\":1}"}}]}}]}\n\n',
'data: {"choices":[{"delta":{"tool_calls":[{"index":1,"function":{"arguments":"{\\"y\\":2}"}}]}}]}\n\n',
"data: [DONE]\n\n",
]);
const events: ChatStreamEvent[] = [];
await parseOpenAIStream(reader, (e) => events.push(e), new AbortController().signal);
// 基础断言:两个 start + 两个 delta + done
const starts = events.filter((e) => e.type === "tool_call_start");
expect(starts).toHaveLength(2);
// (完整的 index 匹配需要 ChatStreamEvent 增加 index 字段,这里先确保 parser 不丢 event)
});

it("并行 tool_call 按 index 正确分派 arguments", async () => {
const reader = createMockReader([
'data: {"choices":[{"delta":{"tool_calls":[{"index":0,"id":"a","function":{"name":"f1","arguments":""}}]}}]}\n\n',
'data: {"choices":[{"delta":{"tool_calls":[{"index":1,"id":"b","function":{"name":"f2","arguments":""}}]}}]}\n\n',
'data: {"choices":[{"delta":{"tool_calls":[{"index":1,"function":{"arguments":"{\\"y\\":2}"}}]}}]}\n\n',
'data: {"choices":[{"delta":{"tool_calls":[{"index":0,"function":{"arguments":"{\\"x\\":1}"}}]}}]}\n\n',
"data: [DONE]\n\n",
]);
const events: ChatStreamEvent[] = [];
await parseOpenAIStream(reader, (e) => events.push(e), new AbortController().signal);

const deltas = events.filter((e) => e.type === "tool_call_delta");
expect(deltas).toHaveLength(2);
// 第一个 delta 对应 index=1(因为到达顺序)
expect((deltas[0] as any).index).toBe(1);
expect((deltas[0] as any).delta).toBe('{"y":2}');
expect((deltas[1] as any).index).toBe(0);
expect((deltas[1] as any).delta).toBe('{"x":1}');
});
});
12 changes: 8 additions & 4 deletions src/app/service/agent/core/providers/openai.ts
Original file line number Diff line number Diff line change
Expand Up @@ -263,19 +263,23 @@ export function parseOpenAIStream(
// 工具调用
if (delta.tool_calls) {
for (const tc of delta.tool_calls) {
// OpenAI 约定:第一个 chunk 带 id + function.name,后续 chunk 只带 index + function.arguments
if (tc.function?.name) {
onEvent({
type: "tool_call_start",
toolCall: {
id: tc.id || `tc_${Date.now()}`,
id: tc.id || `tc_${Date.now()}_${tc.index ?? 0}`,
name: tc.function.name,
arguments: tc.function.arguments || "",
arguments: "", // 永远空启动,避免首 chunk 的 "{}" 作为 prefix 污染
},
});
} else if (tc.function?.arguments) {
}
// 首 chunk 带 arguments 也作为 delta 处理(不 else if!)
if (tc.function?.arguments !== undefined && tc.function.arguments !== "") {
onEvent({
type: "tool_call_delta",
id: tc.id || "",
id: tc.id || "", // 后续 chunk 大概率无 id,这里只保留接口兼容
index: tc.index, // 用于匹配的字段
delta: tc.function.arguments,
});
}
Expand Down
2 changes: 1 addition & 1 deletion src/app/service/agent/core/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ export type LLMStreamEvent =
| { type: "content_delta"; delta: string }
| { type: "thinking_delta"; delta: string }
| { type: "tool_call_start"; toolCall: Omit<ToolCall, "result"> }
| { type: "tool_call_delta"; id: string; delta: string }
| { type: "tool_call_delta"; id: string; delta: string; index?: number }
| { type: "tool_call_complete"; id: string; result: string; attachments?: Attachment[] }
| { type: "content_block_start"; block: Omit<ImageBlock | FileBlock | AudioBlock, "attachmentId"> }
| { type: "content_block_complete"; block: ImageBlock | FileBlock | AudioBlock; data?: string };
Expand Down
15 changes: 15 additions & 0 deletions src/app/service/agent/service_worker/background.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,21 @@ describe("handleAttachToConversation 重连逻辑", () => {

(service as any).bgSessionManager.delete("conv-empty");
});

it("tool_call_delta 按 index 分派給正確的 tool call", () => {
const { service } = createTestService();
const rc = createRunningConversation();
const upd = (service as any).bgSessionManager.updateStreamingState.bind((service as any).bgSessionManager);

upd(rc, { type: "tool_call_start", toolCall: { id: "a", name: "f1", arguments: "" } });
upd(rc, { type: "tool_call_start", toolCall: { id: "b", name: "f2", arguments: "" } });
// 交錯到達
upd(rc, { type: "tool_call_delta", id: "", index: 1, delta: '{"y":2}' });
upd(rc, { type: "tool_call_delta", id: "", index: 0, delta: '{"x":1}' });

expect(rc.streamingState.toolCalls[0].arguments).toBe('{"x":1}');
expect(rc.streamingState.toolCalls[1].arguments).toBe('{"y":2}');
});
});

// ---- 后台运行会话 集成测试 ----
Expand Down
31 changes: 27 additions & 4 deletions src/app/service/agent/service_worker/background_session_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,35 @@ export class BackgroundSessionManager {
case "tool_call_start":
rc.streamingState.toolCalls.push({ ...event.toolCall, status: "running" });
break;
case "tool_call_delta":
if (rc.streamingState.toolCalls.length > 0) {
const last = rc.streamingState.toolCalls[rc.streamingState.toolCalls.length - 1];
last.arguments += event.delta;
case "tool_call_delta": {
// 按 id 匹配(fallback 到最新 running 的 tc),不再盲目取 length-1。
// 并发 tool call 时(OpenAI 用 index 区分、Anthropic 的多个 tool_use block)length-1 会把 delta 写错工具。
if (rc.streamingState.toolCalls.length === 0) break;

let target: ToolCall | undefined = undefined;
// 1a. 按 id 匹配
if (event.id) {
target = rc.streamingState.toolCalls.find((t) => t.id === event.id);
}
// 1b. 按 index 匹配(OpenAI 后续 chunk 无 id 只有 index)
if (!target && event.index !== undefined) {
target = rc.streamingState.toolCalls[event.index];
}

// 2. fallback:最新一个状态为 running 的 tool call
// (OpenAI 后续 chunk 不带 id,但同一 index 的 tool 一定在 running)
if (!target) {
for (let i = rc.streamingState.toolCalls.length - 1; i >= 0; i--) {
if (rc.streamingState.toolCalls[i].status === "running") {
target = rc.streamingState.toolCalls[i];
break;
}
}
}

if (target) target.arguments += event.delta;
break;
}
case "tool_call_complete": {
const tc = rc.streamingState.toolCalls.find((t) => t.id === event.id);
if (tc) {
Expand Down
38 changes: 24 additions & 14 deletions src/app/service/agent/service_worker/llm_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ export class LLMClient {
let content = "";
let thinking = "";
const toolCalls: ToolCall[] = [];
let currentToolCall: ToolCall | null = null;
let usage:
| { inputTokens: number; outputTokens: number; cacheCreationInputTokens?: number; cacheReadInputTokens?: number }
| undefined;
Expand Down Expand Up @@ -157,23 +156,34 @@ export class LLMClient {
thinking += event.delta;
break;
case "tool_call_start":
// 如果已有一个正在收集的 tool call,先保存它(多个 tool_use 并行返回时)
if (currentToolCall) {
toolCalls.push(currentToolCall);
}
currentToolCall = { ...event.toolCall, arguments: event.toolCall.arguments || "" };
// 并发 tool_call 时 parser 会交错发 delta,这里立即 push 到数组,
// 由 tool_call_delta 通过 id/index 定位目标 tool,避免串扰。
toolCalls.push({ ...event.toolCall, arguments: event.toolCall.arguments || "", status: "running" });
break;
case "tool_call_delta":
if (currentToolCall) {
currentToolCall.arguments += event.delta;
case "tool_call_delta": {
if (!toolCalls.length) break;
let target: ToolCall | undefined = undefined;
// 1a. 按 id 匹配
if (event.id) {
target = toolCalls.find((t) => t.id === event.id);
}
// 1b. 按 index 匹配(OpenAI 后续 chunk 无 id 只有 index)
if (!target && event.index !== undefined) {
target = toolCalls[event.index];
}
// 2. fallback:最新一个状态为 running 的 tool call
if (!target) {
for (let i = toolCalls.length - 1; i >= 0; i--) {
if (toolCalls[i].status === "running") {
target = toolCalls[i];
break;
}
}
}
if (target) target.arguments += event.delta;
break;
}
case "done": {
// 保存当前的 tool call
if (currentToolCall) {
toolCalls.push(currentToolCall);
currentToolCall = null;
}
if (event.usage) {
usage = event.usage;
}
Expand Down
16 changes: 13 additions & 3 deletions src/app/service/agent/service_worker/sub_agent_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,21 @@ export class SubAgentService {
status: "running",
});
break;
case "tool_call_delta":
if (currentMsg.toolCalls.length) {
currentMsg.toolCalls[currentMsg.toolCalls.length - 1].arguments += event.delta;
case "tool_call_delta": {
if (!currentMsg.toolCalls.length) break;
let t = event.id ? currentMsg.toolCalls.find((x) => x.id === event.id) : undefined;
if (!t && event.index !== undefined) t = currentMsg.toolCalls[event.index];
if (!t) {
for (let i = currentMsg.toolCalls.length - 1; i >= 0; i--) {
if (currentMsg.toolCalls[i].status === "running") {
t = currentMsg.toolCalls[i];
break;
}
}
}
if (t) t.arguments += event.delta;
break;
}
case "tool_call_complete": {
const tc = currentMsg.toolCalls.find((t) => t.id === event.id);
if (tc) {
Expand Down
Loading
Loading