From 0286398290548807fcbb94b88d2c6c9c58041174 Mon Sep 17 00:00:00 2001 From: Dominik Kundel Date: Wed, 4 Jun 2025 12:39:00 -0700 Subject: [PATCH 1/3] fix(realtime): send cancel before audio starts --- packages/agents-realtime/src/openaiRealtimeWebsocket.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/agents-realtime/src/openaiRealtimeWebsocket.ts b/packages/agents-realtime/src/openaiRealtimeWebsocket.ts index fa0cd2be..a9b4deb0 100644 --- a/packages/agents-realtime/src/openaiRealtimeWebsocket.ts +++ b/packages/agents-realtime/src/openaiRealtimeWebsocket.ts @@ -328,6 +328,7 @@ export class OpenAIRealtimeWebSocket this.sendEvent({ type: 'response.cancel', }); + this.#ongoingResponse = false; } } @@ -360,12 +361,11 @@ export class OpenAIRealtimeWebSocket * based on an event in the client. */ interrupt() { + this._cancelResponse(); if (!this.#currentItemId || typeof this._firstAudioTimestamp !== 'number') { return; } - this._cancelResponse(); - const elapsedTime = Date.now() - this._firstAudioTimestamp; console.log(`Interrupting response after ${elapsedTime}ms`); console.log(`Audio length: ${this._audioLengthMs}ms`); From 8f1a6efb51e49f693752ce8923aa7ed38e9f41b5 Mon Sep 17 00:00:00 2001 From: Dominik Kundel Date: Thu, 5 Jun 2025 10:40:01 -0700 Subject: [PATCH 2/3] fix guardrails --- .changeset/wise-results-mate.md | 5 ++ .../realtime-next/src/app/websocket/page.tsx | 18 +++++++ .../agents-realtime/src/openaiRealtimeBase.ts | 1 + .../src/openaiRealtimeEvents.ts | 25 +++++---- .../src/openaiRealtimeWebRtc.ts | 2 + .../src/openaiRealtimeWebsocket.ts | 5 +- .../agents-realtime/src/realtimeSession.ts | 51 +++++++++++-------- .../src/transportLayerEvents.ts | 1 + .../test/realtimeSession.test.ts | 12 ++++- 9 files changed, 84 insertions(+), 36 deletions(-) create mode 100644 .changeset/wise-results-mate.md diff --git a/.changeset/wise-results-mate.md b/.changeset/wise-results-mate.md new file mode 100644 index 00000000..e66cc4d8 --- /dev/null +++ b/.changeset/wise-results-mate.md @@ -0,0 +1,5 @@ +--- +'@openai/agents-realtime': patch +--- + +fix: avoid realtime guardrail race condition and detect ongoing response diff --git a/examples/realtime-next/src/app/websocket/page.tsx b/examples/realtime-next/src/app/websocket/page.tsx index de2ad74d..86461ec8 100644 --- a/examples/realtime-next/src/app/websocket/page.tsx +++ b/examples/realtime-next/src/app/websocket/page.tsx @@ -7,6 +7,7 @@ import { TransportEvent, RealtimeItem, OutputGuardrailTripwireTriggered, + RealtimeOutputGuardrail, } from '@openai/agents/realtime'; import { useEffect, useRef, useState } from 'react'; import { z } from 'zod'; @@ -26,6 +27,21 @@ const refundBackchannel = tool({ }, }); +const guardrails: RealtimeOutputGuardrail[] = [ + { + name: 'No mention of Dom', + execute: async ({ agentOutput }) => { + const domInOutput = agentOutput.includes('Dom'); + return { + tripwireTriggered: domInOutput, + outputInfo: { + domInOutput, + }, + }; + }, + }, +]; + const agent = new RealtimeAgent({ name: 'Greeter', instructions: @@ -48,6 +64,7 @@ export default function Home() { useEffect(() => { session.current = new RealtimeSession(agent, { transport: 'websocket', + outputGuardrails: guardrails, }); recorder.current = new WavRecorder({ sampleRate: 24000 }); player.current = new WavStreamPlayer({ sampleRate: 24000 }); @@ -87,6 +104,7 @@ export default function Home() { async function connect() { if (isConnected) { await session.current?.close(); + await player.current?.interrupt(); await recorder.current?.end(); setIsConnected(false); } else { diff --git a/packages/agents-realtime/src/openaiRealtimeBase.ts b/packages/agents-realtime/src/openaiRealtimeBase.ts index 8f6bcc2e..a73320ee 100644 --- a/packages/agents-realtime/src/openaiRealtimeBase.ts +++ b/packages/agents-realtime/src/openaiRealtimeBase.ts @@ -256,6 +256,7 @@ export abstract class OpenAIRealtimeBase type: 'transcript_delta', delta: parsed.delta, itemId: parsed.item_id, + responseId: parsed.response_id, }); } // no support for partial transcripts yet. diff --git a/packages/agents-realtime/src/openaiRealtimeEvents.ts b/packages/agents-realtime/src/openaiRealtimeEvents.ts index 412c9507..fcb378c1 100644 --- a/packages/agents-realtime/src/openaiRealtimeEvents.ts +++ b/packages/agents-realtime/src/openaiRealtimeEvents.ts @@ -6,15 +6,18 @@ import type { MessageEvent as WebSocketMessageEvent } from 'ws'; // provide better runtime validation when parsing events from the server. export const realtimeResponse = z.object({ - id: z.string().optional(), - conversation_id: z.string().optional(), - max_output_tokens: z.number().or(z.literal('inf')).optional(), + id: z.string().optional().nullable(), + conversation_id: z.string().optional().nullable(), + max_output_tokens: z.number().or(z.literal('inf')).optional().nullable(), metadata: z.record(z.string(), z.any()).optional().nullable(), - modalities: z.array(z.string()).optional(), - object: z.literal('realtime.response').optional(), - output: z.array(z.any()).optional(), - output_audio_format: z.string().optional(), - status: z.enum(['completed', 'incomplete', 'failed', 'cancelled']).optional(), + modalities: z.array(z.string()).optional().nullable(), + object: z.literal('realtime.response').optional().nullable(), + output: z.array(z.any()).optional().nullable(), + output_audio_format: z.string().optional().nullable(), + status: z + .enum(['completed', 'incomplete', 'failed', 'cancelled', 'in_progress']) + .optional() + .nullable(), status_details: z.record(z.string(), z.any()).optional().nullable(), usage: z .object({ @@ -26,8 +29,9 @@ export const realtimeResponse = z.object({ .optional() .nullable(), }) - .optional(), - voice: z.string().optional(), + .optional() + .nullable(), + voice: z.string().optional().nullable(), }); // Basic content schema used by ConversationItem. @@ -315,7 +319,6 @@ export const responseDoneEventSchema = z.object({ type: z.literal('response.done'), event_id: z.string(), response: realtimeResponse, - test: z.boolean(), }); export const responseFunctionCallArgumentsDeltaEventSchema = z.object({ diff --git a/packages/agents-realtime/src/openaiRealtimeWebRtc.ts b/packages/agents-realtime/src/openaiRealtimeWebRtc.ts index fbe34aea..678640f6 100644 --- a/packages/agents-realtime/src/openaiRealtimeWebRtc.ts +++ b/packages/agents-realtime/src/openaiRealtimeWebRtc.ts @@ -196,6 +196,7 @@ export class OpenAIRealtimeWebRTC if (!parsed || isGeneric) { return; } + if (parsed.type === 'response.created') { this.#ongoingResponse = true; } else if (parsed.type === 'response.done') { @@ -334,6 +335,7 @@ export class OpenAIRealtimeWebRTC this.sendEvent({ type: 'response.cancel', }); + this.#ongoingResponse = false; } this.sendEvent({ diff --git a/packages/agents-realtime/src/openaiRealtimeWebsocket.ts b/packages/agents-realtime/src/openaiRealtimeWebsocket.ts index a9b4deb0..2e2b9403 100644 --- a/packages/agents-realtime/src/openaiRealtimeWebsocket.ts +++ b/packages/agents-realtime/src/openaiRealtimeWebsocket.ts @@ -361,14 +361,13 @@ export class OpenAIRealtimeWebSocket * based on an event in the client. */ interrupt() { - this._cancelResponse(); if (!this.#currentItemId || typeof this._firstAudioTimestamp !== 'number') { return; } + this._cancelResponse(); + const elapsedTime = Date.now() - this._firstAudioTimestamp; - console.log(`Interrupting response after ${elapsedTime}ms`); - console.log(`Audio length: ${this._audioLengthMs}ms`); if (elapsedTime >= 0 && elapsedTime < this._audioLengthMs) { this._interrupt(elapsedTime); } diff --git a/packages/agents-realtime/src/realtimeSession.ts b/packages/agents-realtime/src/realtimeSession.ts index 81dc1723..f168d1a6 100644 --- a/packages/agents-realtime/src/realtimeSession.ts +++ b/packages/agents-realtime/src/realtimeSession.ts @@ -180,6 +180,7 @@ export class RealtimeSession< #transcribedTextDeltas: Record = {}; #history: RealtimeItem[] = []; #shouldIncludeAudioData: boolean; + #interruptedByGuardrail: Record = {}; constructor( public readonly initialAgent: @@ -446,7 +447,7 @@ export class RealtimeSession< } } - async #runOutputGuardrails(output: string) { + async #runOutputGuardrails(output: string, responseId: string) { if (this.#outputGuardrails.length === 0) { return; } @@ -460,24 +461,32 @@ export class RealtimeSession< this.#outputGuardrails.map((guardrail) => guardrail.run(guardrailArgs)), ); - for (const result of results) { - if (result.output.tripwireTriggered) { - const error = new OutputGuardrailTripwireTriggered( - `Output guardrail triggered: ${JSON.stringify(result.output.outputInfo)}`, - result, - ); - this.emit( - 'guardrail_tripped', - this.#context, - this.#currentAgent, - error, - ); - this.interrupt(); - - const feedbackText = getRealtimeGuardrailFeedbackMessage(result); - this.sendMessage(feedbackText); - break; + const firstTripwireTriggered = results.find( + (result) => result.output.tripwireTriggered, + ); + if (firstTripwireTriggered) { + // this ensures that if one guardrail already trips and we are in the middle of another + // guardrail run, we don't trip again + console.log( + 'interruptedByGuardrail', + this.#interruptedByGuardrail[responseId], + ); + if (this.#interruptedByGuardrail[responseId]) { + return; } + this.#interruptedByGuardrail[responseId] = true; + const error = new OutputGuardrailTripwireTriggered( + `Output guardrail triggered: ${JSON.stringify(firstTripwireTriggered.output.outputInfo)}`, + firstTripwireTriggered, + ); + this.emit('guardrail_tripped', this.#context, this.#currentAgent, error); + this.interrupt(); + + const feedbackText = getRealtimeGuardrailFeedbackMessage( + firstTripwireTriggered, + ); + this.sendMessage(feedbackText); + return; } } @@ -498,7 +507,7 @@ export class RealtimeSession< this.emit('agent_end', this.#context, this.#currentAgent, textOutput); this.#currentAgent.emit('agent_end', this.#context, textOutput); - this.#runOutputGuardrails(textOutput); + this.#runOutputGuardrails(textOutput, event.response.id); }); this.#transport.on('audio_done', () => { @@ -511,6 +520,7 @@ export class RealtimeSession< try { const delta = event.delta; const itemId = event.itemId; + const responseId = event.responseId; if (lastItemId !== itemId) { lastItemId = itemId; lastRunIndex = 0; @@ -531,7 +541,7 @@ export class RealtimeSession< // We don't cancel existing runs because we want the first one to fail to fail // The transport layer should upon failure handle the interruption and stop the model // from generating further - this.#runOutputGuardrails(newText); + this.#runOutputGuardrails(newText, responseId); } } catch (err) { this.emit('error', { @@ -672,6 +682,7 @@ export class RealtimeSession< * Disconnect from the session. */ close() { + this.#interruptedByGuardrail = {}; this.#transport.close(); } diff --git a/packages/agents-realtime/src/transportLayerEvents.ts b/packages/agents-realtime/src/transportLayerEvents.ts index d7334267..581770c1 100644 --- a/packages/agents-realtime/src/transportLayerEvents.ts +++ b/packages/agents-realtime/src/transportLayerEvents.ts @@ -34,6 +34,7 @@ export type TransportLayerTranscriptDelta = { type: 'transcript_delta'; itemId: string; delta: string; + responseId: string; }; export type TransportLayerResponseCompleted = diff --git a/packages/agents-realtime/test/realtimeSession.test.ts b/packages/agents-realtime/test/realtimeSession.test.ts index 0444c3f7..d548d32f 100644 --- a/packages/agents-realtime/test/realtimeSession.test.ts +++ b/packages/agents-realtime/test/realtimeSession.test.ts @@ -171,8 +171,16 @@ describe('RealtimeSession', () => { outputGuardrailSettings: { debounceTextLength: 1 }, }); await s.connect({ apiKey: 'test' }); - t.emit('audio_transcript_delta', { delta: 'a', itemId: '1' } as any); - t.emit('audio_transcript_delta', { delta: 'a', itemId: '2' } as any); + t.emit('audio_transcript_delta', { + delta: 'a', + itemId: '1', + responseId: 'z', + } as any); + t.emit('audio_transcript_delta', { + delta: 'a', + itemId: '2', + responseId: 'z', + } as any); await vi.waitFor(() => expect(runMock).toHaveBeenCalledTimes(2)); vi.restoreAllMocks(); }); From 5f7cf58a7f4bf514fd10a37042473d11599d3b38 Mon Sep 17 00:00:00 2001 From: Dominik Kundel Date: Thu, 5 Jun 2025 10:43:02 -0700 Subject: [PATCH 3/3] remove logging --- packages/agents-realtime/src/realtimeSession.ts | 4 ---- 1 file changed, 4 deletions(-) diff --git a/packages/agents-realtime/src/realtimeSession.ts b/packages/agents-realtime/src/realtimeSession.ts index f168d1a6..b8b637be 100644 --- a/packages/agents-realtime/src/realtimeSession.ts +++ b/packages/agents-realtime/src/realtimeSession.ts @@ -467,10 +467,6 @@ export class RealtimeSession< if (firstTripwireTriggered) { // this ensures that if one guardrail already trips and we are in the middle of another // guardrail run, we don't trip again - console.log( - 'interruptedByGuardrail', - this.#interruptedByGuardrail[responseId], - ); if (this.#interruptedByGuardrail[responseId]) { return; }