Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/suppress-barge-in-errors.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@livekit/agents': patch
---

fix(barge-in): suppress session-level barge-in errors.
2 changes: 2 additions & 0 deletions agents/src/inference/interruption/interruption_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ export class InterruptionStreamBase {
threshold: number;
minFrames: number;
timeout: number;
connectTimeout: number;
maxRetries: number;
};

Expand All @@ -130,6 +131,7 @@ export class InterruptionStreamBase {
threshold: this.options.threshold,
minFrames: this.options.minFrames,
timeout: this.options.inferenceTimeout,
connectTimeout: this.apiOptions.timeout,
maxRetries: this.apiOptions.maxRetries,
};

Expand Down
3 changes: 2 additions & 1 deletion agents/src/inference/interruption/ws_transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export interface WsTransportOptions {
threshold: number;
minFrames: number;
timeout: number;
connectTimeout: number;
maxRetries?: number;
}

Expand Down Expand Up @@ -92,7 +93,7 @@ async function connectWebSocket(
options: { retryable: false },
}),
);
}, options.timeout);
}, options.connectTimeout);
ws.once('open', () => {
clearTimeout(timeout);
resolve();
Expand Down
17 changes: 7 additions & 10 deletions agents/src/voice/agent_activity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -258,16 +258,9 @@ export class AgentActivity implements RecognitionHooks {
};

private readonly onInterruptionError = (ev: InterruptionDetectionError): void => {
const errorEvent = createErrorEvent(ev, this.interruptionDetector);
this.agentSession.emit(AgentSessionEventTypes.Error, errorEvent);

if (!ev.recoverable) {
this.agentSession._onError(ev);
this.fallbackToVadInterruption();
return;
this.fallbackToVadInterruption(ev);
}

this.agentSession._onError(ev);
};

/** @internal */
Expand Down Expand Up @@ -3656,7 +3649,7 @@ export class AgentActivity implements RecognitionHooks {
this.isInterruptionByAudioActivityEnabled = this.isDefaultInterruptionByAudioActivityEnabled;
}

private fallbackToVadInterruption(): void {
private fallbackToVadInterruption(error?: InterruptionDetectionError): void {
if (!this.isInterruptionDetectionEnabled) return;

this.isInterruptionDetectionEnabled = false;
Expand All @@ -3675,7 +3668,11 @@ export class AgentActivity implements RecognitionHooks {
});
}

this.logger.warn(
this.logger.info(
{
error: error?.message,
label: error?.label,
},
'adaptive interruption disabled due to unrecoverable error, falling back to VAD-based interruption',
);
}
Expand Down
24 changes: 3 additions & 21 deletions agents/src/voice/agent_session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import {
type STTModelString,
type TTSModelString,
} from '../inference/index.js';
import type { InterruptionDetectionError } from '../inference/interruption/errors.js';
import type { OverlappingSpeechEvent } from '../inference/interruption/types.js';
import { getJobContext } from '../job.js';
import type { FunctionCall, FunctionCallOutput } from '../llm/chat_context.js';
Expand Down Expand Up @@ -1050,9 +1049,7 @@ export class AgentSession<
}

/** @internal */
_onError(
error: RealtimeModelError | STTError | TTSError | LLMError | InterruptionDetectionError,
): void {
_onError(error: RealtimeModelError | STTError | TTSError | LLMError): void {
if (this.closingTask || error.recoverable) {
return;
}
Expand All @@ -1068,9 +1065,6 @@ export class AgentSession<
if (this.ttsErrorCounts <= this._connOptions.maxUnrecoverableErrors) {
return;
}
} else if (error.type === 'interruption_detection_error') {
this.logger.error(error.toString());
return;
}

this.logger.error(error, 'AgentSession is closing due to an unrecoverable error');
Expand Down Expand Up @@ -1264,13 +1258,7 @@ export class AgentSession<

private async closeImpl(
reason: ShutdownReason,
error:
| RealtimeModelError
| LLMError
| TTSError
| STTError
| InterruptionDetectionError
| null = null,
error: RealtimeModelError | LLMError | TTSError | STTError | null = null,
drain: boolean = false,
): Promise<void> {
if (this.rootSpanContext) {
Expand All @@ -1284,13 +1272,7 @@ export class AgentSession<

private async closeImplInner(
reason: ShutdownReason,
error:
| RealtimeModelError
| LLMError
| TTSError
| STTError
| InterruptionDetectionError
| null = null,
error: RealtimeModelError | LLMError | TTSError | STTError | null = null,
drain: boolean = false,
): Promise<void> {
if (!this.started) {
Expand Down
52 changes: 33 additions & 19 deletions agents/src/voice/audio_recognition.ts
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ export class AudioRecognition {
await this.interruptionStreamChannel?.close();
this.interruptionStreamChannel = undefined;
this.cancelBackchannelBoundary();
await this.flushHeldTranscripts(0, true);
}

/**
Expand Down Expand Up @@ -527,22 +528,32 @@ export class AudioRecognition {
}

/**
* Flush held transcripts whose *end time* is after the
* `ignoreUserTranscriptUntil - cooldown` timestamp. If the event has no timestamps, we
* assume it is the same as the next valid event.
* Flush held transcripts. When `force` is true, all buffered events are emitted during
* interruption-detector teardown because ignore-window gating can no longer be trusted.
* Otherwise, only transcripts whose end time is after `ignoreUserTranscriptUntil - cooldown`
* are emitted. Events without timestamps are treated as the next valid event.
*/
private async flushHeldTranscripts(cooldown: number = 0) {
private async flushHeldTranscripts(cooldown: number = 0, force = false) {
if (this.transcriptBuffer.length === 0) {
this.resetInterruptionDetection();
return;
}

if (force) {
const eventsToEmit = [...this.transcriptBuffer];
this.resetInterruptionDetection();
for (const event of eventsToEmit) {
await this.onSTTEvent(event);
}
return;
}

if (
!this.isInterruptionEnabled ||
this.ignoreUserTranscriptUntil === undefined ||
this.transcriptBuffer.length === 0
this._inputStartedAt === undefined
) {
return;
}

if (!this._inputStartedAt) {
this.transcriptBuffer = [];
this.ignoreUserTranscriptUntil = undefined;
this.resetInterruptionDetection();
return;
}

Expand All @@ -560,8 +571,7 @@ export class AudioRecognition {
firstAlternative.startTime === firstAlternative.endTime &&
firstAlternative.startTime === 0
) {
this.transcriptBuffer = [];
this.ignoreUserTranscriptUntil = undefined;
this.resetInterruptionDetection();
return;
}

Expand All @@ -581,8 +591,7 @@ export class AudioRecognition {
// the value the holding decision was made against.
const prevIgnoreUserTranscriptUntil = this.ignoreUserTranscriptUntil;
const prevInputStartedAt = this._inputStartedAt;
this.transcriptBuffer = [];
this.ignoreUserTranscriptUntil = undefined;
this.resetInterruptionDetection();

for (const event of eventsToEmit) {
let addedDelay = 0;
Expand All @@ -605,24 +614,29 @@ export class AudioRecognition {
{ event: event.type, cooldown, addedDelay },
're-emitting held user transcript',
);
this.onSTTEvent(event);
await this.onSTTEvent(event);
}
}

/**
 * Clear the held-transcript state: empties `transcriptBuffer` and unsets
 * `ignoreUserTranscriptUntil`. `flushHeldTranscripts` calls this before returning early
 * or before re-emitting a snapshot of the buffered events.
 */
private resetInterruptionDetection(): void {
this.transcriptBuffer = [];
this.ignoreUserTranscriptUntil = undefined;
}

#alternativeEndsBeforeIgnoreWindow(
alternative: NonNullable<SpeechEvent['alternatives']>[number],
): boolean {
if (
this.ignoreUserTranscriptUntil === undefined ||
!this._inputStartedAt ||
alternative.startTime <= 0
alternative.endTime <= 0
) {
return false;
}

// `SpeechData.startTime` is in seconds relative to audio start, while `inputStartedAt` and
// `SpeechData.endTime` is in seconds relative to audio start, while `inputStartedAt` and
// `ignoreUserTranscriptUntil` are epoch milliseconds.
return alternative.startTime * 1000 + this._inputStartedAt < this.ignoreUserTranscriptUntil;
return alternative.endTime * 1000 + this._inputStartedAt < this.ignoreUserTranscriptUntil;
}

private shouldHoldSttEvent(ev: SpeechEvent): boolean {
Expand Down
Loading