From 0bfdaee1d82dd561150444057c762806d8f48398 Mon Sep 17 00:00:00 2001 From: chenghao-mou Date: Fri, 15 May 2026 13:48:28 +0000 Subject: [PATCH 1/2] fix(barge-in): suppress session-level errors --- .changeset/suppress-barge-in-errors.md | 5 +++ agents/src/voice/agent_activity.ts | 17 ++++----- agents/src/voice/agent_session.ts | 24 ++---------- agents/src/voice/audio_recognition.ts | 52 ++++++++++++++++---------- 4 files changed, 48 insertions(+), 50 deletions(-) create mode 100644 .changeset/suppress-barge-in-errors.md diff --git a/.changeset/suppress-barge-in-errors.md b/.changeset/suppress-barge-in-errors.md new file mode 100644 index 000000000..e90ce821c --- /dev/null +++ b/.changeset/suppress-barge-in-errors.md @@ -0,0 +1,5 @@ +--- +'@livekit/agents': patch +--- + +fix(barge-in): suppress session-level barge-in errors. diff --git a/agents/src/voice/agent_activity.ts b/agents/src/voice/agent_activity.ts index e53f39b14..f2afae247 100644 --- a/agents/src/voice/agent_activity.ts +++ b/agents/src/voice/agent_activity.ts @@ -258,16 +258,9 @@ export class AgentActivity implements RecognitionHooks { }; private readonly onInterruptionError = (ev: InterruptionDetectionError): void => { - const errorEvent = createErrorEvent(ev, this.interruptionDetector); - this.agentSession.emit(AgentSessionEventTypes.Error, errorEvent); - if (!ev.recoverable) { - this.agentSession._onError(ev); - this.fallbackToVadInterruption(); - return; + this.fallbackToVadInterruption(ev); } - - this.agentSession._onError(ev); }; /** @internal */ @@ -3656,7 +3649,7 @@ export class AgentActivity implements RecognitionHooks { this.isInterruptionByAudioActivityEnabled = this.isDefaultInterruptionByAudioActivityEnabled; } - private fallbackToVadInterruption(): void { + private fallbackToVadInterruption(error?: InterruptionDetectionError): void { if (!this.isInterruptionDetectionEnabled) return; this.isInterruptionDetectionEnabled = false; @@ -3675,7 +3668,11 @@ export class AgentActivity implements RecognitionHooks { }); } - this.logger.warn( + this.logger.info( + { + error: error?.message, + label: error?.label, + }, 'adaptive interruption disabled due to unrecoverable error, falling back to VAD-based interruption', ); } diff --git a/agents/src/voice/agent_session.ts b/agents/src/voice/agent_session.ts index 8e67dc85c..33920015f 100644 --- a/agents/src/voice/agent_session.ts +++ b/agents/src/voice/agent_session.ts @@ -18,7 +18,6 @@ import { type STTModelString, type TTSModelString, } from '../inference/index.js'; -import type { InterruptionDetectionError } from '../inference/interruption/errors.js'; import type { OverlappingSpeechEvent } from '../inference/interruption/types.js'; import { getJobContext } from '../job.js'; import type { FunctionCall, FunctionCallOutput } from '../llm/chat_context.js'; @@ -1050,9 +1049,7 @@ export class AgentSession< } /** @internal */ - _onError( - error: RealtimeModelError | STTError | TTSError | LLMError | InterruptionDetectionError, - ): void { + _onError(error: RealtimeModelError | STTError | TTSError | LLMError): void { if (this.closingTask || error.recoverable) { return; } @@ -1068,9 +1065,6 @@ export class AgentSession< if (this.ttsErrorCounts <= this._connOptions.maxUnrecoverableErrors) { return; } - } else if (error.type === 'interruption_detection_error') { - this.logger.error(error.toString()); - return; } this.logger.error(error, 'AgentSession is closing due to an unrecoverable error'); @@ -1264,13 +1258,7 @@ export class AgentSession< private async closeImpl( reason: ShutdownReason, - error: - | RealtimeModelError - | LLMError - | TTSError - | STTError - | InterruptionDetectionError - | null = null, + error: RealtimeModelError | LLMError | TTSError | STTError | null = null, drain: boolean = false, ): Promise { if (this.rootSpanContext) { @@ -1284,13 +1272,7 @@ export class AgentSession< private async closeImplInner( reason: ShutdownReason, - error: - | RealtimeModelError - | LLMError - | TTSError - | STTError - | InterruptionDetectionError - | null = null, + error: RealtimeModelError | LLMError | TTSError | STTError | null = null, drain: boolean = false, ): Promise { if (!this.started) { diff --git a/agents/src/voice/audio_recognition.ts b/agents/src/voice/audio_recognition.ts index 894ca9d9f..3d2e03b21 100644 --- a/agents/src/voice/audio_recognition.ts +++ b/agents/src/voice/audio_recognition.ts @@ -411,6 +411,7 @@ export class AudioRecognition { await this.interruptionStreamChannel?.close(); this.interruptionStreamChannel = undefined; this.cancelBackchannelBoundary(); + await this.flushHeldTranscripts(0, true); } /** @@ -527,22 +528,32 @@ export class AudioRecognition { } /** - * Flush held transcripts whose *end time* is after the - * `ignoreUserTranscriptUntil - cooldown` timestamp. If the event has no timestamps, we - * assume it is the same as the next valid event. + * Flush held transcripts. When `force` is true, all buffered events are emitted during + * interruption-detector teardown because ignore-window gating can no longer be trusted. + * Otherwise, only transcripts whose end time is after `ignoreUserTranscriptUntil - cooldown` + * are emitted. Events without timestamps are treated as the next valid event. */ - private async flushHeldTranscripts(cooldown: number = 0) { + private async flushHeldTranscripts(cooldown: number = 0, force = false) { + if (this.transcriptBuffer.length === 0) { + this.resetInterruptionDetection(); + return; + } + + if (force) { + const eventsToEmit = [...this.transcriptBuffer]; + this.resetInterruptionDetection(); + for (const event of eventsToEmit) { + await this.onSTTEvent(event); + } + return; + } + if ( !this.isInterruptionEnabled || this.ignoreUserTranscriptUntil === undefined || - this.transcriptBuffer.length === 0 + this._inputStartedAt === undefined ) { - return; - } - - if (!this._inputStartedAt) { - this.transcriptBuffer = []; - this.ignoreUserTranscriptUntil = undefined; + this.resetInterruptionDetection(); return; } @@ -560,8 +571,7 @@ export class AudioRecognition { firstAlternative.startTime === firstAlternative.endTime && firstAlternative.startTime === 0 ) { - this.transcriptBuffer = []; - this.ignoreUserTranscriptUntil = undefined; + this.resetInterruptionDetection(); return; } @@ -581,8 +591,7 @@ export class AudioRecognition { // the value the holding decision was made against. const prevIgnoreUserTranscriptUntil = this.ignoreUserTranscriptUntil; const prevInputStartedAt = this._inputStartedAt; - this.transcriptBuffer = []; - this.ignoreUserTranscriptUntil = undefined; + this.resetInterruptionDetection(); for (const event of eventsToEmit) { let addedDelay = 0; @@ -605,24 +614,29 @@ export class AudioRecognition { { event: event.type, cooldown, addedDelay }, 're-emitting held user transcript', ); - this.onSTTEvent(event); + await this.onSTTEvent(event); } } + private resetInterruptionDetection(): void { + this.transcriptBuffer = []; + this.ignoreUserTranscriptUntil = undefined; + } + #alternativeEndsBeforeIgnoreWindow( alternative: NonNullable[number], ): boolean { if ( this.ignoreUserTranscriptUntil === undefined || !this._inputStartedAt || - alternative.startTime <= 0 + alternative.endTime <= 0 ) { return false; } - // `SpeechData.startTime` is in seconds relative to audio start, while `inputStartedAt` and + // `SpeechData.endTime` is in seconds relative to audio start, while `inputStartedAt` and // `ignoreUserTranscriptUntil` are epoch milliseconds. - return alternative.startTime * 1000 + this._inputStartedAt < this.ignoreUserTranscriptUntil; + return alternative.endTime * 1000 + this._inputStartedAt < this.ignoreUserTranscriptUntil; } private shouldHoldSttEvent(ev: SpeechEvent): boolean { From 91794f803e878f05b3123dd0b84703e80799ec1b Mon Sep 17 00:00:00 2001 From: chenghao-mou Date: Sun, 17 May 2026 13:30:04 +0000 Subject: [PATCH 2/2] fix(barge-in): separate ws connect timeout --- agents/src/inference/interruption/interruption_stream.ts | 2 ++ agents/src/inference/interruption/ws_transport.ts | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/agents/src/inference/interruption/interruption_stream.ts b/agents/src/inference/interruption/interruption_stream.ts index b09547076..37bd2f201 100644 --- a/agents/src/inference/interruption/interruption_stream.ts +++ b/agents/src/inference/interruption/interruption_stream.ts @@ -108,6 +108,7 @@ export class InterruptionStreamBase { threshold: number; minFrames: number; timeout: number; + connectTimeout: number; maxRetries: number; }; @@ -130,6 +131,7 @@ export class InterruptionStreamBase { threshold: this.options.threshold, minFrames: this.options.minFrames, timeout: this.options.inferenceTimeout, + connectTimeout: this.apiOptions.timeout, maxRetries: this.apiOptions.maxRetries, }; diff --git a/agents/src/inference/interruption/ws_transport.ts b/agents/src/inference/interruption/ws_transport.ts index 2e99187d3..4b44749a1 100644 --- a/agents/src/inference/interruption/ws_transport.ts +++ b/agents/src/inference/interruption/ws_transport.ts @@ -29,6 +29,7 @@ export interface WsTransportOptions { threshold: number; minFrames: number; timeout: number; + connectTimeout: number; maxRetries?: number; } @@ -92,7 +93,7 @@ async function connectWebSocket( options: { retryable: false }, }), ); - }, options.timeout); + }, options.connectTimeout); ws.once('open', () => { clearTimeout(timeout); resolve();