diff --git a/.changeset/fix-streaming-agent-end-lifecycle.md b/.changeset/fix-streaming-agent-end-lifecycle.md new file mode 100644 index 00000000..5b268c86 --- /dev/null +++ b/.changeset/fix-streaming-agent-end-lifecycle.md @@ -0,0 +1,5 @@ +--- +'@openai/agents-core': patch +--- + +Fix #371 streaming agents not calling agent_end lifecycle hook \ No newline at end of file diff --git a/packages/agents-core/src/run.ts b/packages/agents-core/src/run.ts index 987fded3..3440be76 100644 --- a/packages/agents-core/src/run.ts +++ b/packages/agents-core/src/run.ts @@ -832,6 +832,17 @@ export class Runner extends RunHooks> { result.state, result.state._currentStep.output, ); + this.emit( + 'agent_end', + result.state._context, + currentAgent, + result.state._currentStep.output, + ); + currentAgent.emit( + 'agent_end', + result.state._context, + result.state._currentStep.output, + ); return; } else if ( result.state._currentStep.type === 'next_step_interruption' diff --git a/packages/agents-core/test/run.stream.test.ts b/packages/agents-core/test/run.stream.test.ts index f7b29f86..94d0186b 100644 --- a/packages/agents-core/test/run.stream.test.ts +++ b/packages/agents-core/test/run.stream.test.ts @@ -2,6 +2,7 @@ import { describe, it, expect, beforeAll } from 'vitest'; import { Agent, run, + Runner, setDefaultModelProvider, setTracingDisabled, Usage, @@ -115,4 +116,67 @@ describe('Runner.run (streaming)', () => { ); expect(update?.agent).toBe(agentB); }); + + it('emits agent_end lifecycle event for streaming agents', async () => { + class SimpleStreamingModel implements Model { + constructor(private resp: ModelResponse) {} + async getResponse(_req: ModelRequest): Promise { + return this.resp; + } + async *getStreamedResponse(): AsyncIterable { + yield { + type: 'response_done', + response: { + id: 'r', + usage: { + requests: 1, + inputTokens: 0, + outputTokens: 0, + totalTokens: 0, + }, + output: this.resp.output, + }, + } as any; + } + } + + const agent = new Agent({ + name: 'TestAgent', + model: new SimpleStreamingModel({ + output: [fakeModelMessage('Final output')], + usage: new Usage(), + }), + }); + + // Track agent_end events on both the agent and runner + const agentEndEvents: Array<{ context: any; output: string }> = []; + const runnerEndEvents: Array<{ context: any; agent: any; output: string }> = []; + + agent.on('agent_end', (context, output) => { + agentEndEvents.push({ context, output }); + }); + + // Create a runner instance to listen for events + const runner = new Runner(); + runner.on('agent_end', (context, agent, output) => { + runnerEndEvents.push({ context, agent, output }); + }); + + const result = await runner.run(agent, 'test input', { stream: true }); + + // Consume the stream + const events: RunStreamEvent[] = []; + for await (const e of result.toStream()) { + events.push(e); + } + await result.completed; + + // Verify agent_end was called on both agent and runner + expect(agentEndEvents).toHaveLength(1); + expect(agentEndEvents[0].output).toBe('Final output'); + + expect(runnerEndEvents).toHaveLength(1); + expect(runnerEndEvents[0].agent).toBe(agent); + expect(runnerEndEvents[0].output).toBe('Final output'); + }); }); diff --git a/packages/agents-core/test/run.test.ts b/packages/agents-core/test/run.test.ts index 7cdd1a54..ccba70fb 100644 --- a/packages/agents-core/test/run.test.ts +++ b/packages/agents-core/test/run.test.ts @@ -140,6 +140,38 @@ describe('Runner.run', () => { await expect(run(agent, 'fail')).rejects.toThrow('No response found'); }); + + it('emits agent_end lifecycle event for non-streaming agents', async () => { + const agent = new Agent({ + name: 'TestAgent', + }); + + // Track agent_end events on both the agent and runner + const agentEndEvents: Array<{ context: any; output: string }> = []; + const runnerEndEvents: Array<{ context: any; agent: any; output: string }> = []; + + agent.on('agent_end', (context, output) => { + agentEndEvents.push({ context, output }); + }); + + const runner = new Runner(); + runner.on('agent_end', (context, agent, output) => { + runnerEndEvents.push({ context, agent, output }); + }); + + const result = await runner.run(agent, 'test input'); + + // Verify the result has the expected output + expect(result.finalOutput).toBe('Hello World'); + + // Verify agent_end was called on both agent and runner + expect(agentEndEvents).toHaveLength(1); + expect(agentEndEvents[0].output).toBe('Hello World'); + + expect(runnerEndEvents).toHaveLength(1); + expect(runnerEndEvents[0].agent).toBe(agent); + expect(runnerEndEvents[0].output).toBe('Hello World'); + }); }); describe('additional scenarios', () => { @@ -375,7 +407,7 @@ describe('Runner.run', () => { usage: new Usage(), }; class SimpleStreamingModel implements Model { - constructor(private resps: ModelResponse[]) {} + constructor(private resps: ModelResponse[]) { } async getResponse(_req: ModelRequest): Promise { const r = this.resps.shift(); if (!r) {