From 12eb40d766bec11938c7e246686bd760159fe266 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Thu, 20 Nov 2025 13:36:02 -0800 Subject: [PATCH 1/3] [ai] Remove "start" and "finish" chunk events --- packages/ai/src/agent/do-stream-step.ts | 6 ------ 1 file changed, 6 deletions(-) diff --git a/packages/ai/src/agent/do-stream-step.ts b/packages/ai/src/agent/do-stream-step.ts index 711f4191b..ba04f07cb 100644 --- a/packages/ai/src/agent/do-stream-step.ts +++ b/packages/ai/src/agent/do-stream-step.ts @@ -65,9 +65,6 @@ export async function doStreamStep( .pipeThrough( new TransformStream({ start: (controller) => { - controller.enqueue({ - type: 'start', - }); controller.enqueue({ type: 'start-step', }); @@ -76,9 +73,6 @@ export async function doStreamStep( controller.enqueue({ type: 'finish-step', }); - controller.enqueue({ - type: 'finish', - }); }, transform: async (part, controller) => { const partType = part.type; From 2f631e17df4042e0ca9e98daeb3c952cf108941f Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Thu, 20 Nov 2025 17:05:10 -0500 Subject: [PATCH 2/3] Wire through sendStart and sendFinish properly --- packages/ai/src/agent/do-stream-step.ts | 10 ++++- packages/ai/src/agent/durable-agent.ts | 42 +++++++++++++++++-- packages/ai/src/agent/stream-text-iterator.ts | 10 ++++- 3 files changed, 56 insertions(+), 6 deletions(-) diff --git a/packages/ai/src/agent/do-stream-step.ts b/packages/ai/src/agent/do-stream-step.ts index ba04f07cb..b4a25dc50 100644 --- a/packages/ai/src/agent/do-stream-step.ts +++ b/packages/ai/src/agent/do-stream-step.ts @@ -21,7 +21,10 @@ export async function doStreamStep( conversationPrompt: LanguageModelV2Prompt, modelInit: string | (() => Promise), writable: WritableStream, - tools?: LanguageModelV2CallOptions['tools'] + tools?: LanguageModelV2CallOptions['tools'], + options?: { + sendStart?: boolean; + } ) { 'use step'; @@ -65,6 +68,11 @@ export async function doStreamStep( .pipeThrough( new TransformStream({ start: (controller) => { + if (options?.sendStart) { + controller.enqueue({ + type: 'start', + }); + } controller.enqueue({ type: 'start-step', }); diff --git a/packages/ai/src/agent/durable-agent.ts b/packages/ai/src/agent/durable-agent.ts index 3ea32bdb8..156011fdf 100644 --- a/packages/ai/src/agent/durable-agent.ts +++ b/packages/ai/src/agent/durable-agent.ts @@ -63,6 +63,18 @@ export interface DurableAgentStreamOptions { */ preventClose?: boolean; + /** + * If true, sends a 'start' chunk at the beginning of the stream. + * Defaults to true. + */ + sendStart?: boolean; + + /** + * If true, sends a 'finish' chunk at the end of the stream. + * Defaults to true. + */ + sendFinish?: boolean; + /** * Condition for stopping the generation when there are tool results in the last step. * When the condition is an array, any of the conditions can be met to stop the generation. @@ -133,6 +145,7 @@ export class DurableAgent { writable: options.writable, prompt: modelPrompt, stopConditions: options.stopWhen, + sendStart: options.sendStart ?? true, }); let result = await iterator.next(); @@ -147,16 +160,37 @@ export class DurableAgent { result = await iterator.next(toolResults); } - if (!options.preventClose) { - await closeStream(options.writable); + const sendFinish = options.sendFinish ?? true; + const preventClose = options.preventClose ?? false; + + // Only call closeStream if there's something to do + if (sendFinish || !preventClose) { + await closeStream(options.writable, preventClose, sendFinish); } } } -async function closeStream(writable: WritableStream) { +async function closeStream( + writable: WritableStream, + preventClose?: boolean, + sendFinish?: boolean +) { 'use step'; - await writable.close(); + // Conditionally write the finish chunk + if (sendFinish) { + const writer = writable.getWriter(); + try { + await writer.write({ type: 'finish' }); + } finally { + writer.releaseLock(); + } + } + + // Conditionally close the stream + if (!preventClose) { + await writable.close(); + } } async function executeTool( diff --git a/packages/ai/src/agent/stream-text-iterator.ts b/packages/ai/src/agent/stream-text-iterator.ts index 20b6fb3ff..9f6665166 100644 --- a/packages/ai/src/agent/stream-text-iterator.ts +++ b/packages/ai/src/agent/stream-text-iterator.ts @@ -15,12 +15,14 @@ export async function* streamTextIterator({ writable, model, stopConditions, + sendStart = true, }: { prompt: LanguageModelV2Prompt; tools: ToolSet; writable: WritableStream; model: string | (() => Promise); stopConditions?: ModelStopCondition[] | ModelStopCondition; + sendStart?: boolean; }): AsyncGenerator< LanguageModelV2ToolCall[], void, @@ -30,13 +32,19 @@ export async function* streamTextIterator({ const steps: StepResult[] = []; let done = false; + let isFirstIteration = true; + while (!done) { const { toolCalls, finish, step } = await doStreamStep( conversationPrompt, model, writable, - toolsToModelTools(tools) + toolsToModelTools(tools), + { + sendStart: sendStart && isFirstIteration, + } ); + isFirstIteration = false; steps.push(step); if (finish?.finishReason === 'tool-calls') { From 7fda66f59802b0ffb2884fe101b6b4dc925ad6d4 Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Thu, 20 Nov 2025 14:16:07 -0800 Subject: [PATCH 3/3] changeset --- .changeset/cuddly-lobsters-build.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/cuddly-lobsters-build.md diff --git a/.changeset/cuddly-lobsters-build.md b/.changeset/cuddly-lobsters-build.md new file mode 100644 index 000000000..5a92909de --- /dev/null +++ b/.changeset/cuddly-lobsters-build.md @@ -0,0 +1,5 @@ +--- +"@workflow/ai": patch +--- + +DurableAgent#stream now sends `start` and `finish` chunks properly at the start and end