Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/calm-squids-sparkle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'ai': patch
---

fix(ai): doStream should reflect transformed values
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,8 @@ exports[`streamText > options.transform > with base transformation > telemetry s
"ai.response.model": "mock-model-id",
"ai.response.msToFinish": 500,
"ai.response.msToFirstChunk": 100,
"ai.response.providerMetadata": "{"testProvider":{"testKey":"testValue"}}",
"ai.response.text": "Hello, world!",
"ai.response.providerMetadata": "{"testProvider":{"testKey":"TEST VALUE"}}",
"ai.response.text": "HELLO, WORLD!",
"ai.response.timestamp": "1970-01-01T00:00:00.000Z",
"ai.response.toolCalls": "[{"type":"tool-call","toolCallId":"call-1","toolName":"tool1","input":{"value":"VALUE"}}]",
"ai.settings.maxRetries": 2,
Expand Down
50 changes: 28 additions & 22 deletions packages/ai/src/generate-text/stream-text.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1946,38 +1946,20 @@ class DefaultStreamTextResult<TOOLS extends ToolSet, OUTPUT extends Output>
? JSON.stringify(stepToolCalls)
: undefined;

// record telemetry information first to ensure best effort timing
// record telemetry attributes that don't depend on transforms:
try {
doStreamSpan.setAttributes(
await selectTelemetryAttributes({
telemetry,
attributes: {
'ai.response.finishReason': stepFinishReason,
'ai.response.text': {
output: () => activeText,
},
'ai.response.reasoning': {
output: () => {
const reasoningParts = recordedContent.filter(
(
c,
): c is { type: 'reasoning'; text: string } =>
c.type === 'reasoning',
);
return reasoningParts.length > 0
? reasoningParts.map(r => r.text).join('\n')
: undefined;
},
},
'ai.response.toolCalls': {
output: () => stepToolCallsJson,
},
'ai.response.id': stepResponse.id,
'ai.response.model': stepResponse.modelId,
'ai.response.timestamp':
stepResponse.timestamp.toISOString(),
'ai.response.providerMetadata':
JSON.stringify(stepProviderMetadata),

'ai.usage.inputTokens': stepUsage.inputTokens,
'ai.usage.outputTokens': stepUsage.outputTokens,
Expand All @@ -2001,9 +1983,6 @@ class DefaultStreamTextResult<TOOLS extends ToolSet, OUTPUT extends Output>
);
} catch (error) {
// ignore error setting telemetry attributes
} finally {
// finish doStreamSpan before other operations for correct timing:
doStreamSpan.end();
}

controller.enqueue({
Expand All @@ -2027,6 +2006,33 @@ class DefaultStreamTextResult<TOOLS extends ToolSet, OUTPUT extends Output>
// to ensure that the recorded steps are complete:
await stepFinish.promise;

// set transform-dependent attributes after the step has been
// fully processed (post-transform) by the event processor:
const processedStep =
recordedSteps[recordedSteps.length - 1];
try {
doStreamSpan.setAttributes(
await selectTelemetryAttributes({
telemetry,
attributes: {
'ai.response.text': {
output: () => processedStep.text,
},
'ai.response.reasoning': {
output: () => processedStep.reasoningText,
},
'ai.response.providerMetadata': JSON.stringify(
processedStep.providerMetadata,
),
},
}),
);
} catch (error) {
// ignore error setting telemetry attributes
} finally {
doStreamSpan.end();
}

const clientToolCalls = stepToolCalls.filter(
toolCall => toolCall.providerExecuted !== true,
);
Expand Down
Loading