diff --git a/.changeset/mean-spoons-stand.md b/.changeset/mean-spoons-stand.md new file mode 100644 index 00000000..fd0eca70 --- /dev/null +++ b/.changeset/mean-spoons-stand.md @@ -0,0 +1,5 @@ +--- +'@openai/agents-core': patch +--- + +Fixes a bug where `onTraceEnd` was called immediately after `onTraceStart` when streaming is enabled diff --git a/packages/agents-core/src/result.ts b/packages/agents-core/src/result.ts index 1013d2e3..b06b1df6 100644 --- a/packages/agents-core/src/result.ts +++ b/packages/agents-core/src/result.ts @@ -243,6 +243,7 @@ export class StreamedRunResult< #completedPromiseResolve: (() => void) | undefined; #completedPromiseReject: ((err: unknown) => void) | undefined; #cancelled: boolean = false; + #streamLoopPromise: Promise | undefined; constructor( result: { @@ -413,4 +414,22 @@ export class StreamedRunResult< [Symbol.asyncIterator](): AsyncIterator { return this.#readableStream[Symbol.asyncIterator](); } + + /** + * @internal + * Sets the stream loop promise that completes when the internal stream loop finishes. + * This is used to defer trace end until all agent work is complete. + */ + _setStreamLoopPromise(promise: Promise) { + this.#streamLoopPromise = promise; + } + + /** + * @internal + * Returns a promise that resolves when the stream loop completes. + * This is used by the tracing system to wait for all agent work before ending the trace. + */ + _getStreamLoopPromise(): Promise | undefined { + return this.#streamLoopPromise; + } } diff --git a/packages/agents-core/src/run.ts b/packages/agents-core/src/run.ts index c29ede39..596d58ce 100644 --- a/packages/agents-core/src/run.ts +++ b/packages/agents-core/src/run.ts @@ -1138,7 +1138,11 @@ export class Runner extends RunHooks> { result.maxTurns = options.maxTurns ?? state._maxTurns; // Continue the stream loop without blocking - this.#runStreamLoop(result, options, isResumedState).then( + const streamLoopPromise = this.#runStreamLoop( + result, + options, + isResumedState, + ).then( () => { result._done(); }, @@ -1147,6 +1151,9 @@ export class Runner extends RunHooks> { }, ); + // Attach the stream loop promise so trace end waits for the loop to complete + result._setStreamLoopPromise(streamLoopPromise); + return result; }); } diff --git a/packages/agents-core/src/tracing/context.ts b/packages/agents-core/src/tracing/context.ts index cdb3f986..af084709 100644 --- a/packages/agents-core/src/tracing/context.ts +++ b/packages/agents-core/src/tracing/context.ts @@ -2,6 +2,7 @@ import { AsyncLocalStorage } from '@openai/agents-core/_shims'; import { Trace, TraceOptions } from './traces'; import { getGlobalTraceProvider } from './provider'; import { Span, SpanError } from './spans'; +import { StreamedRunResult } from '../result'; type ContextState = { trace?: Trace; @@ -58,6 +59,17 @@ function _wrapFunctionWithTraceLifecycle(fn: (trace: Trace) => Promise) { await trace.start(); const result = await fn(trace); + + // If result is a StreamedRunResult, defer trace end until stream loop completes + if (result instanceof StreamedRunResult) { + const streamLoopPromise = result._getStreamLoopPromise(); + if (streamLoopPromise) { + streamLoopPromise.finally(() => trace.end()); + return result; + } + } + + // For non-streaming results, end trace synchronously await trace.end(); return result; diff --git a/packages/agents-core/test/tracing.test.ts b/packages/agents-core/test/tracing.test.ts index a1fc26a2..38442280 100644 --- a/packages/agents-core/test/tracing.test.ts +++ b/packages/agents-core/test/tracing.test.ts @@ -37,6 +37,13 @@ import { withAgentSpan } from '../src/tracing/createSpans'; import { TraceProvider } from '../src/tracing/provider'; +import { Runner } from '../src/run'; +import { Agent } from '../src/agent'; +import { FakeModel, fakeModelMessage, FakeModelProvider } from './stubs'; +import { Usage } from '../src/usage'; +import * as protocol from '../src/types/protocol'; +import { setDefaultModelProvider } from '../src/providers'; + class TestExporter implements TracingExporter { public exported: Array<(Trace | Span)[]> = []; @@ -259,6 +266,97 @@ describe('withTrace & span helpers (integration)', () => { expect(startedIds).toContain(capturedSpanId); expect(endedIds).toContain(capturedSpanId); }); + + it('streaming run waits for stream loop to complete before calling onTraceEnd', async () => { + // Set up model provider + setDefaultModelProvider(new FakeModelProvider()); + + const traceStartTimes: number[] = []; + const traceEndTimes: number[] = []; + const spanEndTimes: number[] = []; + + class OrderTrackingProcessor implements TracingProcessor { + async onTraceStart(_trace: Trace): Promise { + traceStartTimes.push(Date.now()); + } + async onTraceEnd(_trace: Trace): Promise { + traceEndTimes.push(Date.now()); + } + async onSpanStart(_span: Span): Promise { + // noop + } + async onSpanEnd(_span: Span): Promise { + spanEndTimes.push(Date.now()); + } + async shutdown(): Promise { + /* noop */ + } + async forceFlush(): Promise { + /* noop */ + } + } + + const orderProcessor = new OrderTrackingProcessor(); + setTraceProcessors([orderProcessor]); + + // Create a fake model that supports streaming + class StreamingFakeModel extends FakeModel { + async *getStreamedResponse( + _request: any, + ): AsyncIterable { + const response = await this.getResponse(_request); + yield { + type: 'response_done', + response: { + id: 'resp-1', + usage: { + requests: 1, + inputTokens: 0, + outputTokens: 0, + totalTokens: 0, + }, + output: response.output, + }, + } as any; + } + } + + const agent = new Agent({ + name: 'TestAgent', + model: new StreamingFakeModel([ + { + output: [fakeModelMessage('Final output')], + usage: new Usage(), + }, + ]), + }); + + const runner = new Runner({ + tracingDisabled: false, + }); + + // Run with streaming + const result = await runner.run(agent, 'test input', { stream: true }); + + // Consume the stream + for await (const _event of result) { + // consume all events + } + + // Wait for completion + await result.completed; + + // onTraceEnd should be called after all spans have ended + expect(traceStartTimes.length).toBe(1); + expect(traceEndTimes.length).toBe(1); + expect(spanEndTimes.length).toBeGreaterThan(0); + + // The trace should end after all spans have ended + const lastSpanEndTime = Math.max(...spanEndTimes); + const traceEndTime = traceEndTimes[0]; + + expect(traceEndTime).toBeGreaterThanOrEqual(lastSpanEndTime); + }); }); // -----------------------------------------------------------------------------------------