5 changes: 5 additions & 0 deletions .changeset/mean-spoons-stand.md
@@ -0,0 +1,5 @@
---
'@openai/agents-core': patch
---

Fixes a bug where `onTraceEnd` was called immediately after `onTraceStart` when streaming is enabled
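
For reference, here is a minimal sketch of how the ordering can be observed from user code with a custom trace processor and a streamed run. It mirrors the test added below rather than prescribing a public API: the import names (`Agent`, `Runner`, `setTraceProcessors`, and the `TracingProcessor`/`Trace`/`Span` types) are assumed to be exported from `@openai/agents-core`, and a default model provider (e.g. an API key) is assumed to be configured.

```ts
// Sketch only: import names are assumed to be public exports of '@openai/agents-core'.
import {
  Agent,
  Runner,
  setTraceProcessors,
  type Span,
  type Trace,
  type TracingProcessor,
} from '@openai/agents-core';

// Records the order in which trace/span lifecycle hooks fire.
class LoggingProcessor implements TracingProcessor {
  async onTraceStart(_trace: Trace): Promise<void> {
    console.log('trace start');
  }
  async onTraceEnd(_trace: Trace): Promise<void> {
    console.log('trace end');
  }
  async onSpanStart(_span: Span<any>): Promise<void> {}
  async onSpanEnd(_span: Span<any>): Promise<void> {
    console.log('span end');
  }
  async shutdown(): Promise<void> {}
  async forceFlush(): Promise<void> {}
}

async function main() {
  setTraceProcessors([new LoggingProcessor()]);

  // Assumes a default model provider / API key is configured.
  const agent = new Agent({ name: 'Assistant' });
  const runner = new Runner();

  // Before this fix, 'trace end' could be logged right after 'trace start',
  // while 'span end' events from the stream loop were still arriving.
  const result = await runner.run(agent, 'Hello', { stream: true });
  for await (const _event of result) {
    // consume the stream
  }
  await result.completed;
  // With the fix, 'trace end' is logged only after the stream loop finishes.
}

main().catch(console.error);
```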
19 changes: 19 additions & 0 deletions packages/agents-core/src/result.ts
@@ -243,6 +243,7 @@ export class StreamedRunResult<
#completedPromiseResolve: (() => void) | undefined;
#completedPromiseReject: ((err: unknown) => void) | undefined;
#cancelled: boolean = false;
#streamLoopPromise: Promise<void> | undefined;

constructor(
result: {
@@ -413,4 +414,22 @@
[Symbol.asyncIterator](): AsyncIterator<RunStreamEvent> {
return this.#readableStream[Symbol.asyncIterator]();
}

/**
* @internal
* Sets the stream loop promise that completes when the internal stream loop finishes.
* This is used to defer trace end until all agent work is complete.
*/
_setStreamLoopPromise(promise: Promise<void>) {
this.#streamLoopPromise = promise;
}

/**
* @internal
* Returns a promise that resolves when the stream loop completes.
* This is used by the tracing system to wait for all agent work before ending the trace.
*/
_getStreamLoopPromise(): Promise<void> | undefined {
return this.#streamLoopPromise;
}
}
9 changes: 8 additions & 1 deletion packages/agents-core/src/run.ts
@@ -1138,7 +1138,11 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {
result.maxTurns = options.maxTurns ?? state._maxTurns;

// Continue the stream loop without blocking
this.#runStreamLoop(result, options, isResumedState).then(
const streamLoopPromise = this.#runStreamLoop(
result,
options,
isResumedState,
).then(
() => {
result._done();
},
@@ -1147,6 +1151,9 @@ export class Runner extends RunHooks<any, AgentOutputType<unknown>> {
},
);

// Attach the stream loop promise so trace end waits for the loop to complete
result._setStreamLoopPromise(streamLoopPromise);

return result;
});
}
12 changes: 12 additions & 0 deletions packages/agents-core/src/tracing/context.ts
@@ -2,6 +2,7 @@ import { AsyncLocalStorage } from '@openai/agents-core/_shims';
import { Trace, TraceOptions } from './traces';
import { getGlobalTraceProvider } from './provider';
import { Span, SpanError } from './spans';
import { StreamedRunResult } from '../result';

type ContextState = {
trace?: Trace;
@@ -58,6 +59,17 @@ function _wrapFunctionWithTraceLifecycle<T>(fn: (trace: Trace) => Promise<T>) {

await trace.start();
const result = await fn(trace);

// If result is a StreamedRunResult, defer trace end until stream loop completes
if (result instanceof StreamedRunResult) {
const streamLoopPromise = result._getStreamLoopPromise();
if (streamLoopPromise) {
streamLoopPromise.finally(() => trace.end());
return result;
}
}

// For non-streaming results, end trace synchronously
await trace.end();

return result;
98 changes: 98 additions & 0 deletions packages/agents-core/test/tracing.test.ts
@@ -37,6 +37,13 @@ import { withAgentSpan } from '../src/tracing/createSpans';

import { TraceProvider } from '../src/tracing/provider';

import { Runner } from '../src/run';
import { Agent } from '../src/agent';
import { FakeModel, fakeModelMessage, FakeModelProvider } from './stubs';
import { Usage } from '../src/usage';
import * as protocol from '../src/types/protocol';
import { setDefaultModelProvider } from '../src/providers';

class TestExporter implements TracingExporter {
public exported: Array<(Trace | Span<any>)[]> = [];

@@ -259,6 +266,97 @@ describe('withTrace & span helpers (integration)', () => {
expect(startedIds).toContain(capturedSpanId);
expect(endedIds).toContain(capturedSpanId);
});

it('streaming run waits for stream loop to complete before calling onTraceEnd', async () => {
// Set up model provider
setDefaultModelProvider(new FakeModelProvider());

const traceStartTimes: number[] = [];
const traceEndTimes: number[] = [];
const spanEndTimes: number[] = [];

class OrderTrackingProcessor implements TracingProcessor {
async onTraceStart(_trace: Trace): Promise<void> {
traceStartTimes.push(Date.now());
}
async onTraceEnd(_trace: Trace): Promise<void> {
traceEndTimes.push(Date.now());
}
async onSpanStart(_span: Span<any>): Promise<void> {
// noop
}
async onSpanEnd(_span: Span<any>): Promise<void> {
spanEndTimes.push(Date.now());
}
async shutdown(): Promise<void> {
/* noop */
}
async forceFlush(): Promise<void> {
/* noop */
}
}

const orderProcessor = new OrderTrackingProcessor();
setTraceProcessors([orderProcessor]);

// Create a fake model that supports streaming
class StreamingFakeModel extends FakeModel {
async *getStreamedResponse(
_request: any,
): AsyncIterable<protocol.StreamEvent> {
const response = await this.getResponse(_request);
yield {
type: 'response_done',
response: {
id: 'resp-1',
usage: {
requests: 1,
inputTokens: 0,
outputTokens: 0,
totalTokens: 0,
},
output: response.output,
},
} as any;
}
}

const agent = new Agent({
name: 'TestAgent',
model: new StreamingFakeModel([
{
output: [fakeModelMessage('Final output')],
usage: new Usage(),
},
]),
});

const runner = new Runner({
tracingDisabled: false,
});

// Run with streaming
const result = await runner.run(agent, 'test input', { stream: true });

// Consume the stream
for await (const _event of result) {
// consume all events
}

// Wait for completion
await result.completed;

// onTraceEnd should be called after all spans have ended
expect(traceStartTimes.length).toBe(1);
expect(traceEndTimes.length).toBe(1);
expect(spanEndTimes.length).toBeGreaterThan(0);

// The trace should end after all spans have ended
const lastSpanEndTime = Math.max(...spanEndTimes);
const traceEndTime = traceEndTimes[0];

expect(traceEndTime).toBeGreaterThanOrEqual(lastSpanEndTime);
});
});

// -----------------------------------------------------------------------------------------