37 changes: 34 additions & 3 deletions examples/realtime/app/server.py
@@ -13,6 +13,8 @@

from agents.realtime import RealtimeRunner, RealtimeSession, RealtimeSessionEvent
from agents.realtime.config import RealtimeUserInputMessage
from agents.realtime.items import RealtimeItem
from agents.realtime.model import RealtimeModelConfig
from agents.realtime.model_inputs import RealtimeModelSendRawMessage

# Import TwilioHandler class - handle both module and package use cases
@@ -45,7 +47,18 @@ async def connect(self, websocket: WebSocket, session_id: str):

agent = get_starting_agent()
runner = RealtimeRunner(agent)
session_context = await runner.run()
model_config: RealtimeModelConfig = {
"initial_model_settings": {
"turn_detection": {
"type": "server_vad",
"prefix_padding_ms": 300,
"silence_duration_ms": 500,
"interrupt_response": True,
"create_response": True,
},
},
}
session_context = await runner.run(model_config=model_config)
session = await session_context.__aenter__()
self.active_sessions[session_id] = session
self.session_contexts[session_id] = session_context
@@ -103,8 +116,26 @@ async def _process_events(self, session_id: str):
event_data = await self._serialize_event(event)
await websocket.send_text(json.dumps(event_data))
except Exception as e:
print(e)
logger.error(f"Error processing events for session {session_id}: {e}")

def _sanitize_history_item(self, item: RealtimeItem) -> dict[str, Any]:
"""Remove large binary payloads from history items while keeping transcripts."""
item_dict = item.model_dump()
content = item_dict.get("content")
if isinstance(content, list):
sanitized_content: list[Any] = []
for part in content:
if isinstance(part, dict):
sanitized_part = part.copy()
if sanitized_part.get("type") in {"audio", "input_audio"}:
sanitized_part.pop("audio", None)
sanitized_content.append(sanitized_part)
else:
sanitized_content.append(part)
item_dict["content"] = sanitized_content
return item_dict

async def _serialize_event(self, event: RealtimeSessionEvent) -> dict[str, Any]:
base_event: dict[str, Any] = {
"type": event.type,
@@ -129,11 +160,11 @@ async def _serialize_event(self, event: RealtimeSessionEvent) -> dict[str, Any]:
elif event.type == "audio_end":
pass
elif event.type == "history_updated":
base_event["history"] = [item.model_dump(mode="json") for item in event.history]
base_event["history"] = [self._sanitize_history_item(item) for item in event.history]
elif event.type == "history_added":
# Provide the added item so the UI can render incrementally.
try:
base_event["item"] = event.item.model_dump(mode="json")
base_event["item"] = self._sanitize_history_item(event.item)
except Exception:
base_event["item"] = None
elif event.type == "guardrail_tripped":
245 changes: 116 additions & 129 deletions examples/realtime/app/static/app.js
@@ -5,16 +5,16 @@ class RealtimeDemo {
this.isMuted = false;
this.isCapturing = false;
this.audioContext = null;
this.processor = null;
this.captureSource = null;
this.captureNode = null;
this.stream = null;
this.sessionId = this.generateSessionId();

// Audio playback queue
this.audioQueue = [];
this.isPlayingAudio = false;
this.playbackAudioContext = null;
this.currentAudioSource = null;
this.currentAudioGain = null; // per-chunk gain for smooth fades
this.playbackNode = null;
this.playbackInitPromise = null;
this.pendingPlaybackChunks = [];
this.playbackFadeSec = 0.02; // ~20ms fade to reduce clicks
this.messageNodes = new Map(); // item_id -> DOM node
this.seenItemIds = new Set(); // item_id set for append-only syncing
@@ -227,30 +227,35 @@ class RealtimeDemo {
});

this.audioContext = new AudioContext({ sampleRate: 24000, latencyHint: 'interactive' });
const source = this.audioContext.createMediaStreamSource(this.stream);
if (this.audioContext.state === 'suspended') {
try { await this.audioContext.resume(); } catch {}
}

// Create a script processor to capture audio data
this.processor = this.audioContext.createScriptProcessor(4096, 1, 1);
source.connect(this.processor);
this.processor.connect(this.audioContext.destination);
if (!this.audioContext.audioWorklet) {
throw new Error('AudioWorklet API not supported in this browser.');
}

this.processor.onaudioprocess = (event) => {
if (!this.isMuted && this.ws && this.ws.readyState === WebSocket.OPEN) {
const inputBuffer = event.inputBuffer.getChannelData(0);
const int16Buffer = new Int16Array(inputBuffer.length);
await this.audioContext.audioWorklet.addModule('audio-recorder.worklet.js');

// Convert float32 to int16
for (let i = 0; i < inputBuffer.length; i++) {
int16Buffer[i] = Math.max(-32768, Math.min(32767, inputBuffer[i] * 32768));
}
this.captureSource = this.audioContext.createMediaStreamSource(this.stream);
this.captureNode = new AudioWorkletNode(this.audioContext, 'pcm-recorder');

this.ws.send(JSON.stringify({
type: 'audio',
data: Array.from(int16Buffer)
}));
}
this.captureNode.port.onmessage = (event) => {
if (this.isMuted) return;
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return;

const chunk = event.data instanceof ArrayBuffer ? new Int16Array(event.data) : event.data;
if (!chunk || !(chunk instanceof Int16Array) || chunk.length === 0) return;

this.ws.send(JSON.stringify({
type: 'audio',
data: Array.from(chunk)
}));
};

this.captureSource.connect(this.captureNode);
this.captureNode.connect(this.audioContext.destination);

this.isCapturing = true;
this.updateMuteUI();
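
The capture path above registers `audio-recorder.worklet.js`, which is not shown in this diff. A plausible sketch of the `pcm-recorder` processor it names, assuming it converts float32 frames to PCM16 and transfers each buffer to the main thread (which would match the `ArrayBuffer` check in the `onmessage` handler above):

```js
// audio-recorder.worklet.js — hypothetical sketch; the real file is not
// part of this diff. Converts float32 samples to PCM16 and posts them.
class PCMRecorderProcessor extends AudioWorkletProcessor {
    process(inputs) {
        const channel = inputs[0] && inputs[0][0];
        if (channel && channel.length > 0) {
            const pcm = new Int16Array(channel.length);
            for (let i = 0; i < channel.length; i++) {
                // Clamp and scale float32 [-1, 1] to the int16 range.
                pcm[i] = Math.max(-32768, Math.min(32767, channel[i] * 32768));
            }
            // Transfer the underlying buffer to avoid a copy.
            this.port.postMessage(pcm.buffer, [pcm.buffer]);
        }
        return true; // keep the processor alive
    }
}

registerProcessor('pcm-recorder', PCMRecorderProcessor);
```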

@@ -264,9 +269,15 @@

this.isCapturing = false;

if (this.processor) {
this.processor.disconnect();
this.processor = null;
if (this.captureSource) {
try { this.captureSource.disconnect(); } catch {}
this.captureSource = null;
}

if (this.captureNode) {
this.captureNode.port.onmessage = null;
try { this.captureNode.disconnect(); } catch {}
this.captureNode = null;
}

if (this.audioContext) {
@@ -544,141 +555,117 @@ class RealtimeDemo {
return;
}

// Add to queue
this.audioQueue.push(audioBase64);

// Start processing queue if not already playing
if (!this.isPlayingAudio) {
this.processAudioQueue();
const int16Array = this.decodeBase64ToInt16(audioBase64);
if (!int16Array || int16Array.length === 0) {
console.warn('Audio chunk has no samples, skipping');
return;
}

this.pendingPlaybackChunks.push(int16Array);
await this.ensurePlaybackNode();
this.flushPendingPlaybackChunks();

} catch (error) {
console.error('Failed to play audio:', error);
this.pendingPlaybackChunks = [];
}
}

async processAudioQueue() {
if (this.isPlayingAudio || this.audioQueue.length === 0) {
async ensurePlaybackNode() {
if (this.playbackNode) {
return;
}

this.isPlayingAudio = true;

// Initialize audio context if needed
if (!this.playbackAudioContext) {
this.playbackAudioContext = new AudioContext({ sampleRate: 24000, latencyHint: 'interactive' });
}
if (!this.playbackInitPromise) {
this.playbackInitPromise = (async () => {
if (!this.playbackAudioContext) {
this.playbackAudioContext = new AudioContext({ sampleRate: 24000, latencyHint: 'interactive' });
}

// Ensure context is running (autoplay policies can suspend it)
if (this.playbackAudioContext.state === 'suspended') {
try { await this.playbackAudioContext.resume(); } catch {}
}
if (this.playbackAudioContext.state === 'suspended') {
try { await this.playbackAudioContext.resume(); } catch {}
Comment on lines +574 to +586

P1: Resume playback AudioContext after browser suspends it

When playAudio pushes a chunk it calls ensurePlaybackNode, but this helper immediately returns once this.playbackNode has been created. That means the call to AudioContext.resume() only happens the first time the node is built. Browsers (Chrome/Edge/Safari) automatically suspend idle audio contexts after a period of silence, so any later audio events will enqueue chunks into the worklet while the context remains suspended and nothing is heard. The previous implementation resumed the context before every playback, so this is a regression that makes audio permanently silent after the tab has idled. Consider resuming the context even when playbackNode already exists or resuming inside playAudio before posting chunks.

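A minimal sketch of the fix the reviewer suggests, assuming the field names used in this diff (`playbackNode`, `playbackAudioContext`): resume the context on every call rather than only during first initialization.

```js
// Sketch of the suggested fix, not the committed code: resume the
// playback context even when the worklet node already exists.
async ensurePlaybackNode() {
    if (this.playbackNode) {
        // Browsers suspend idle AudioContexts; resume before reusing the node.
        if (this.playbackAudioContext && this.playbackAudioContext.state === 'suspended') {
            try { await this.playbackAudioContext.resume(); } catch {}
        }
        return;
    }
    // ...first-time initialization continues as in the diff above...
}
```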

}

while (this.audioQueue.length > 0) {
const audioBase64 = this.audioQueue.shift();
await this.playAudioChunk(audioBase64);
}
if (!this.playbackAudioContext.audioWorklet) {
throw new Error('AudioWorklet API not supported in this browser.');
}

this.isPlayingAudio = false;
}
await this.playbackAudioContext.audioWorklet.addModule('audio-playback.worklet.js');

async playAudioChunk(audioBase64) {
return new Promise((resolve, reject) => {
try {
// Decode base64 to ArrayBuffer
const binaryString = atob(audioBase64);
const bytes = new Uint8Array(binaryString.length);
for (let i = 0; i < binaryString.length; i++) {
bytes[i] = binaryString.charCodeAt(i);
}
this.playbackNode = new AudioWorkletNode(this.playbackAudioContext, 'pcm-playback', { outputChannelCount: [1] });
this.playbackNode.port.onmessage = (event) => {
const message = event.data;
if (!message || typeof message !== 'object') return;
if (message.type === 'drained') {
this.isPlayingAudio = false;
}
};

const int16Array = new Int16Array(bytes.buffer);
// Provide initial configuration for fades.
const fadeSamples = Math.floor(this.playbackAudioContext.sampleRate * this.playbackFadeSec);
this.playbackNode.port.postMessage({ type: 'config', fadeSamples });

if (int16Array.length === 0) {
console.warn('Audio chunk has no samples, skipping');
resolve();
return;
}
this.playbackNode.connect(this.playbackAudioContext.destination);
})().catch((error) => {
this.playbackInitPromise = null;
throw error;
});
}

const float32Array = new Float32Array(int16Array.length);
await this.playbackInitPromise;
}

// Convert int16 to float32
for (let i = 0; i < int16Array.length; i++) {
float32Array[i] = int16Array[i] / 32768.0;
}
flushPendingPlaybackChunks() {
if (!this.playbackNode) {
return;
}

const audioBuffer = this.playbackAudioContext.createBuffer(1, float32Array.length, 24000);
audioBuffer.getChannelData(0).set(float32Array);

const source = this.playbackAudioContext.createBufferSource();
source.buffer = audioBuffer;

// Per-chunk gain with short fade-in/out to avoid clicks
const gainNode = this.playbackAudioContext.createGain();
const now = this.playbackAudioContext.currentTime;
const fade = Math.min(this.playbackFadeSec, Math.max(0.005, audioBuffer.duration / 8));
try {
gainNode.gain.cancelScheduledValues(now);
gainNode.gain.setValueAtTime(0.0, now);
gainNode.gain.linearRampToValueAtTime(1.0, now + fade);
const endTime = now + audioBuffer.duration;
gainNode.gain.setValueAtTime(1.0, Math.max(now + fade, endTime - fade));
gainNode.gain.linearRampToValueAtTime(0.0001, endTime);
} catch {}

source.connect(gainNode);
gainNode.connect(this.playbackAudioContext.destination);

// Store references to allow smooth stop on interruption
this.currentAudioSource = source;
this.currentAudioGain = gainNode;

source.onended = () => {
this.currentAudioSource = null;
this.currentAudioGain = null;
resolve();
};
source.start();
while (this.pendingPlaybackChunks.length > 0) {
const chunk = this.pendingPlaybackChunks.shift();
if (!chunk || !(chunk instanceof Int16Array) || chunk.length === 0) {
continue;
}

try {
this.playbackNode.port.postMessage(
{ type: 'chunk', payload: chunk.buffer },
[chunk.buffer]
);
this.isPlayingAudio = true;
} catch (error) {
console.error('Failed to play audio chunk:', error);
reject(error);
console.error('Failed to enqueue audio chunk to worklet:', error);
}
});
}
}
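
Likewise, `audio-playback.worklet.js` is referenced but not included in this view. A hypothetical sketch of a `pcm-playback` processor consistent with the messages exchanged above (`config`, `chunk`, `stop`, and a `drained` reply); fade handling is omitted for brevity, though the `fadeSamples` config implies the real worklet applies it:

```js
// audio-playback.worklet.js — hypothetical sketch; not part of this diff.
// Buffers incoming PCM16 chunks and streams them out as float32 audio.
class PCMPlaybackProcessor extends AudioWorkletProcessor {
    constructor() {
        super();
        this.queue = [];      // pending Int16Array chunks
        this.offset = 0;      // read position within queue[0]
        this.fadeSamples = 0; // set via 'config'; fade application omitted here
        this.port.onmessage = (event) => {
            const msg = event.data;
            if (!msg || typeof msg !== 'object') return;
            if (msg.type === 'chunk') {
                this.queue.push(new Int16Array(msg.payload));
            } else if (msg.type === 'config') {
                this.fadeSamples = msg.fadeSamples | 0;
            } else if (msg.type === 'stop') {
                this.queue = [];
                this.offset = 0;
            }
        };
    }

    process(inputs, outputs) {
        const out = outputs[0][0];
        for (let i = 0; i < out.length; i++) {
            if (this.queue.length === 0) {
                out[i] = 0; // emit silence while the queue is empty
                continue;
            }
            const chunk = this.queue[0];
            out[i] = chunk[this.offset++] / 32768;
            if (this.offset >= chunk.length) {
                this.queue.shift();
                this.offset = 0;
                if (this.queue.length === 0) {
                    // Tell the main thread playback has drained.
                    this.port.postMessage({ type: 'drained' });
                }
            }
        }
        return true;
    }
}

registerProcessor('pcm-playback', PCMPlaybackProcessor);
```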

decodeBase64ToInt16(audioBase64) {
try {
const binaryString = atob(audioBase64);
const length = binaryString.length;
const bytes = new Uint8Array(length);
for (let i = 0; i < length; i++) {
bytes[i] = binaryString.charCodeAt(i);
}
return new Int16Array(bytes.buffer);
} catch (error) {
console.error('Failed to decode audio chunk:', error);
return null;
}
}

stopAudioPlayback() {
console.log('Stopping audio playback due to interruption');

// Smoothly ramp down before stopping to avoid clicks
if (this.currentAudioSource && this.playbackAudioContext) {
this.pendingPlaybackChunks = [];

if (this.playbackNode) {
try {
const now = this.playbackAudioContext.currentTime;
const fade = Math.max(0.01, this.playbackFadeSec);
if (this.currentAudioGain) {
try {
this.currentAudioGain.gain.cancelScheduledValues(now);
// Capture current value to ramp from it
const current = this.currentAudioGain.gain.value ?? 1.0;
this.currentAudioGain.gain.setValueAtTime(current, now);
this.currentAudioGain.gain.linearRampToValueAtTime(0.0001, now + fade);
} catch {}
}
// Stop after the fade completes
setTimeout(() => {
try { this.currentAudioSource && this.currentAudioSource.stop(); } catch {}
this.currentAudioSource = null;
this.currentAudioGain = null;
}, Math.ceil(fade * 1000));
this.playbackNode.port.postMessage({ type: 'stop' });
} catch (error) {
console.error('Error stopping audio source:', error);
console.error('Failed to notify playback worklet to stop:', error);
}
}

// Clear the audio queue
this.audioQueue = [];

// Reset playback state
this.isPlayingAudio = false;

console.log('Audio playback stopped and queue cleared');