diff --git a/examples/realtime/app/server.py b/examples/realtime/app/server.py
index abdb1dabb..3a3d58674 100644
--- a/examples/realtime/app/server.py
+++ b/examples/realtime/app/server.py
@@ -13,6 +13,8 @@
 from agents.realtime import RealtimeRunner, RealtimeSession, RealtimeSessionEvent
 from agents.realtime.config import RealtimeUserInputMessage
+from agents.realtime.items import RealtimeItem
+from agents.realtime.model import RealtimeModelConfig
 from agents.realtime.model_inputs import RealtimeModelSendRawMessage
 
 # Import TwilioHandler class - handle both module and package use cases
@@ -45,7 +47,18 @@ async def connect(self, websocket: WebSocket, session_id: str):
         agent = get_starting_agent()
         runner = RealtimeRunner(agent)
-        session_context = await runner.run()
+        model_config: RealtimeModelConfig = {
+            "initial_model_settings": {
+                "turn_detection": {
+                    "type": "server_vad",
+                    "prefix_padding_ms": 300,
+                    "silence_duration_ms": 500,
+                    "interrupt_response": True,
+                    "create_response": True,
+                },
+            },
+        }
+        session_context = await runner.run(model_config=model_config)
         session = await session_context.__aenter__()
         self.active_sessions[session_id] = session
         self.session_contexts[session_id] = session_context
@@ -103,8 +116,26 @@ async def _process_events(self, session_id: str):
                 event_data = await self._serialize_event(event)
                 await websocket.send_text(json.dumps(event_data))
         except Exception as e:
+            print(e)
             logger.error(f"Error processing events for session {session_id}: {e}")
 
+    def _sanitize_history_item(self, item: RealtimeItem) -> dict[str, Any]:
+        """Remove large binary payloads from history items while keeping transcripts."""
+        item_dict = item.model_dump()
+        content = item_dict.get("content")
+        if isinstance(content, list):
+            sanitized_content: list[Any] = []
+            for part in content:
+                if isinstance(part, dict):
+                    sanitized_part = part.copy()
+                    if sanitized_part.get("type") in {"audio", "input_audio"}:
+                        sanitized_part.pop("audio", None)
+                    sanitized_content.append(sanitized_part)
+                else:
+                    sanitized_content.append(part)
+            item_dict["content"] = sanitized_content
+        return item_dict
+
     async def _serialize_event(self, event: RealtimeSessionEvent) -> dict[str, Any]:
         base_event: dict[str, Any] = {
             "type": event.type,
@@ -129,11 +160,11 @@ async def _serialize_event(self, event: RealtimeSessionEvent) -> dict[str, Any]:
         elif event.type == "audio_end":
             pass
         elif event.type == "history_updated":
-            base_event["history"] = [item.model_dump(mode="json") for item in event.history]
+            base_event["history"] = [self._sanitize_history_item(item) for item in event.history]
         elif event.type == "history_added":
             # Provide the added item so the UI can render incrementally.
             try:
-                base_event["item"] = event.item.model_dump(mode="json")
+                base_event["item"] = self._sanitize_history_item(event.item)
             except Exception:
                 base_event["item"] = None
         elif event.type == "guardrail_tripped":
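For readers unfamiliar with the settings above: `turn_detection` configures server-side voice activity detection, so the model decides when the user's turn has ended. A rough sketch of the session update these settings presumably map to on the wire (field names come from the diff; the exact envelope is an assumption, not confirmed by this PR):

```js
// Sketch only: assumes initial_model_settings is forwarded more or less
// verbatim into a Realtime API `session.update` payload.
const sessionUpdate = {
    type: 'session.update',
    session: {
        turn_detection: {
            type: 'server_vad',       // server-side voice activity detection
            prefix_padding_ms: 300,   // audio retained from before speech onset
            silence_duration_ms: 500, // silence required to close the turn
            interrupt_response: true, // user speech interrupts the assistant
            create_response: true,    // auto-start a response when the turn ends
        },
    },
};
```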
diff --git a/examples/realtime/app/static/app.js b/examples/realtime/app/static/app.js
index 6858428c6..0724cf4b1 100644
--- a/examples/realtime/app/static/app.js
+++ b/examples/realtime/app/static/app.js
@@ -5,16 +5,16 @@ class RealtimeDemo {
         this.isMuted = false;
         this.isCapturing = false;
         this.audioContext = null;
-        this.processor = null;
+        this.captureSource = null;
+        this.captureNode = null;
         this.stream = null;
         this.sessionId = this.generateSessionId();
 
-        // Audio playback queue
-        this.audioQueue = [];
         this.isPlayingAudio = false;
         this.playbackAudioContext = null;
-        this.currentAudioSource = null;
-        this.currentAudioGain = null; // per-chunk gain for smooth fades
+        this.playbackNode = null;
+        this.playbackInitPromise = null;
+        this.pendingPlaybackChunks = [];
         this.playbackFadeSec = 0.02; // ~20ms fade to reduce clicks
         this.messageNodes = new Map(); // item_id -> DOM node
         this.seenItemIds = new Set(); // item_id set for append-only syncing
@@ -227,30 +227,35 @@
         });
 
         this.audioContext = new AudioContext({ sampleRate: 24000, latencyHint: 'interactive' });
-        const source = this.audioContext.createMediaStreamSource(this.stream);
+        if (this.audioContext.state === 'suspended') {
+            try { await this.audioContext.resume(); } catch {}
+        }
 
-        // Create a script processor to capture audio data
-        this.processor = this.audioContext.createScriptProcessor(4096, 1, 1);
-        source.connect(this.processor);
-        this.processor.connect(this.audioContext.destination);
+        if (!this.audioContext.audioWorklet) {
+            throw new Error('AudioWorklet API not supported in this browser.');
+        }
 
-        this.processor.onaudioprocess = (event) => {
-            if (!this.isMuted && this.ws && this.ws.readyState === WebSocket.OPEN) {
-                const inputBuffer = event.inputBuffer.getChannelData(0);
-                const int16Buffer = new Int16Array(inputBuffer.length);
+        await this.audioContext.audioWorklet.addModule('audio-recorder.worklet.js');
 
-                // Convert float32 to int16
-                for (let i = 0; i < inputBuffer.length; i++) {
-                    int16Buffer[i] = Math.max(-32768, Math.min(32767, inputBuffer[i] * 32768));
-                }
+        this.captureSource = this.audioContext.createMediaStreamSource(this.stream);
+        this.captureNode = new AudioWorkletNode(this.audioContext, 'pcm-recorder');
 
-                this.ws.send(JSON.stringify({
-                    type: 'audio',
-                    data: Array.from(int16Buffer)
-                }));
-            }
+        this.captureNode.port.onmessage = (event) => {
+            if (this.isMuted) return;
+            if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return;
+
+            const chunk = event.data instanceof ArrayBuffer ? new Int16Array(event.data) : event.data;
+            if (!chunk || !(chunk instanceof Int16Array) || chunk.length === 0) return;
+
+            this.ws.send(JSON.stringify({
+                type: 'audio',
+                data: Array.from(chunk)
+            }));
         };
+        this.captureSource.connect(this.captureNode);
+        this.captureNode.connect(this.audioContext.destination);
+
         this.isCapturing = true;
         this.updateMuteUI();
@@ -264,9 +269,15 @@
         this.isCapturing = false;
 
-        if (this.processor) {
-            this.processor.disconnect();
-            this.processor = null;
+        if (this.captureSource) {
+            try { this.captureSource.disconnect(); } catch {}
+            this.captureSource = null;
+        }
+
+        if (this.captureNode) {
+            this.captureNode.port.onmessage = null;
+            try { this.captureNode.disconnect(); } catch {}
+            this.captureNode = null;
         }
 
         if (this.audioContext) {
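The capture path now ships PCM16 chunks from the worklet over the WebSocket as plain JSON arrays. For illustration, a standalone sketch of the message shape built by the handler above (the sample values are made up; at 24 kHz a full 4096-sample chunk covers roughly 170 ms of mono audio):

```js
// Sketch: the outbound 'audio' message built in captureNode.port.onmessage.
const chunk = new Int16Array([0, -12, 87, 134]); // stand-in for worklet output
const message = JSON.stringify({
    type: 'audio',
    // Array.from turns the Int16Array into a plain array; stringifying the
    // typed array directly would yield an object keyed by index instead.
    data: Array.from(chunk),
});
console.log(message); // {"type":"audio","data":[0,-12,87,134]}
```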
@@ -544,141 +555,117 @@
                 return;
             }
 
-            // Add to queue
-            this.audioQueue.push(audioBase64);
-
-            // Start processing queue if not already playing
-            if (!this.isPlayingAudio) {
-                this.processAudioQueue();
+            const int16Array = this.decodeBase64ToInt16(audioBase64);
+            if (!int16Array || int16Array.length === 0) {
+                console.warn('Audio chunk has no samples, skipping');
+                return;
             }
+
+            this.pendingPlaybackChunks.push(int16Array);
+            await this.ensurePlaybackNode();
+            this.flushPendingPlaybackChunks();
         } catch (error) {
             console.error('Failed to play audio:', error);
+            this.pendingPlaybackChunks = [];
         }
     }
 
-    async processAudioQueue() {
-        if (this.isPlayingAudio || this.audioQueue.length === 0) {
+    async ensurePlaybackNode() {
+        if (this.playbackNode) {
             return;
         }
 
-        this.isPlayingAudio = true;
-
-        // Initialize audio context if needed
-        if (!this.playbackAudioContext) {
-            this.playbackAudioContext = new AudioContext({ sampleRate: 24000, latencyHint: 'interactive' });
-        }
+        if (!this.playbackInitPromise) {
+            this.playbackInitPromise = (async () => {
+                if (!this.playbackAudioContext) {
+                    this.playbackAudioContext = new AudioContext({ sampleRate: 24000, latencyHint: 'interactive' });
+                }
 
-        // Ensure context is running (autoplay policies can suspend it)
-        if (this.playbackAudioContext.state === 'suspended') {
-            try { await this.playbackAudioContext.resume(); } catch {}
-        }
+                if (this.playbackAudioContext.state === 'suspended') {
+                    try { await this.playbackAudioContext.resume(); } catch {}
+                }
 
-        while (this.audioQueue.length > 0) {
-            const audioBase64 = this.audioQueue.shift();
-            await this.playAudioChunk(audioBase64);
-        }
+                if (!this.playbackAudioContext.audioWorklet) {
+                    throw new Error('AudioWorklet API not supported in this browser.');
+                }
 
-        this.isPlayingAudio = false;
-    }
+                await this.playbackAudioContext.audioWorklet.addModule('audio-playback.worklet.js');
 
-    async playAudioChunk(audioBase64) {
-        return new Promise((resolve, reject) => {
-            try {
-                // Decode base64 to ArrayBuffer
-                const binaryString = atob(audioBase64);
-                const bytes = new Uint8Array(binaryString.length);
-                for (let i = 0; i < binaryString.length; i++) {
-                    bytes[i] = binaryString.charCodeAt(i);
-                }
+                this.playbackNode = new AudioWorkletNode(this.playbackAudioContext, 'pcm-playback', { outputChannelCount: [1] });
+                this.playbackNode.port.onmessage = (event) => {
+                    const message = event.data;
+                    if (!message || typeof message !== 'object') return;
+                    if (message.type === 'drained') {
+                        this.isPlayingAudio = false;
+                    }
+                };
 
-                const int16Array = new Int16Array(bytes.buffer);
+                // Provide initial configuration for fades.
+                const fadeSamples = Math.floor(this.playbackAudioContext.sampleRate * this.playbackFadeSec);
+                this.playbackNode.port.postMessage({ type: 'config', fadeSamples });
 
-                if (int16Array.length === 0) {
-                    console.warn('Audio chunk has no samples, skipping');
-                    resolve();
-                    return;
-                }
+                this.playbackNode.connect(this.playbackAudioContext.destination);
+            })().catch((error) => {
+                this.playbackInitPromise = null;
+                throw error;
+            });
+        }
 
-                const float32Array = new Float32Array(int16Array.length);
+        await this.playbackInitPromise;
+    }
 
-                // Convert int16 to float32
-                for (let i = 0; i < int16Array.length; i++) {
-                    float32Array[i] = int16Array[i] / 32768.0;
-                }
+    flushPendingPlaybackChunks() {
+        if (!this.playbackNode) {
+            return;
+        }
 
-                const audioBuffer = this.playbackAudioContext.createBuffer(1, float32Array.length, 24000);
-                audioBuffer.getChannelData(0).set(float32Array);
-
-                const source = this.playbackAudioContext.createBufferSource();
-                source.buffer = audioBuffer;
-
-                // Per-chunk gain with short fade-in/out to avoid clicks
-                const gainNode = this.playbackAudioContext.createGain();
-                const now = this.playbackAudioContext.currentTime;
-                const fade = Math.min(this.playbackFadeSec, Math.max(0.005, audioBuffer.duration / 8));
-                try {
-                    gainNode.gain.cancelScheduledValues(now);
-                    gainNode.gain.setValueAtTime(0.0, now);
-                    gainNode.gain.linearRampToValueAtTime(1.0, now + fade);
-                    const endTime = now + audioBuffer.duration;
-                    gainNode.gain.setValueAtTime(1.0, Math.max(now + fade, endTime - fade));
-                    gainNode.gain.linearRampToValueAtTime(0.0001, endTime);
-                } catch {}
-
-                source.connect(gainNode);
-                gainNode.connect(this.playbackAudioContext.destination);
-
-                // Store references to allow smooth stop on interruption
-                this.currentAudioSource = source;
-                this.currentAudioGain = gainNode;
-
-                source.onended = () => {
-                    this.currentAudioSource = null;
-                    this.currentAudioGain = null;
-                    resolve();
-                };
-                source.start();
+        while (this.pendingPlaybackChunks.length > 0) {
+            const chunk = this.pendingPlaybackChunks.shift();
+            if (!chunk || !(chunk instanceof Int16Array) || chunk.length === 0) {
+                continue;
+            }
+            try {
+                this.playbackNode.port.postMessage(
+                    { type: 'chunk', payload: chunk.buffer },
+                    [chunk.buffer]
+                );
+                this.isPlayingAudio = true;
             } catch (error) {
-                console.error('Failed to play audio chunk:', error);
-                reject(error);
+                console.error('Failed to enqueue audio chunk to worklet:', error);
             }
-        });
+        }
+    }
+
+    decodeBase64ToInt16(audioBase64) {
+        try {
+            const binaryString = atob(audioBase64);
+            const length = binaryString.length;
+            const bytes = new Uint8Array(length);
+            for (let i = 0; i < length; i++) {
+                bytes[i] = binaryString.charCodeAt(i);
+            }
+            return new Int16Array(bytes.buffer);
+        } catch (error) {
+            console.error('Failed to decode audio chunk:', error);
+            return null;
+        }
     }
 
     stopAudioPlayback() {
         console.log('Stopping audio playback due to interruption');
 
-        // Smoothly ramp down before stopping to avoid clicks
-        if (this.currentAudioSource && this.playbackAudioContext) {
+        this.pendingPlaybackChunks = [];
+
+        if (this.playbackNode) {
             try {
-                const now = this.playbackAudioContext.currentTime;
-                const fade = Math.max(0.01, this.playbackFadeSec);
-                if (this.currentAudioGain) {
-                    try {
-                        this.currentAudioGain.gain.cancelScheduledValues(now);
-                        // Capture current value to ramp from it
-                        const current = this.currentAudioGain.gain.value ?? 1.0;
-                        this.currentAudioGain.gain.setValueAtTime(current, now);
-                        this.currentAudioGain.gain.linearRampToValueAtTime(0.0001, now + fade);
-                    } catch {}
-                }
-                // Stop after the fade completes
-                setTimeout(() => {
-                    try { this.currentAudioSource && this.currentAudioSource.stop(); } catch {}
-                    this.currentAudioSource = null;
-                    this.currentAudioGain = null;
-                }, Math.ceil(fade * 1000));
+                this.playbackNode.port.postMessage({ type: 'stop' });
             } catch (error) {
-                console.error('Error stopping audio source:', error);
+                console.error('Failed to notify playback worklet to stop:', error);
             }
         }
 
-        // Clear the audio queue
-        this.audioQueue = [];
-
-        // Reset playback state
         this.isPlayingAudio = false;
 
         console.log('Audio playback stopped and queue cleared');
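`decodeBase64ToInt16` assumes the server sends base64-encoded little-endian PCM16, which matches `Int16Array`'s memory layout on little-endian platforms (effectively all browsers). A self-contained round-trip of that decode path; the encode half is shown only to make the sketch runnable and stands in for whatever the server actually does:

```js
// Encode side (assumed): pack samples into bytes, then base64.
const samples = new Int16Array([1000, -1000, 32767, -32768]);
const bytes = new Uint8Array(samples.buffer);
let binary = '';
for (let i = 0; i < bytes.length; i++) binary += String.fromCharCode(bytes[i]);
const audioBase64 = btoa(binary);

// Decode side, mirroring decodeBase64ToInt16 above.
const binaryString = atob(audioBase64);
const decodedBytes = new Uint8Array(binaryString.length);
for (let i = 0; i < binaryString.length; i++) {
    decodedBytes[i] = binaryString.charCodeAt(i);
}
console.log(new Int16Array(decodedBytes.buffer)); // [1000, -1000, 32767, -32768]
```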
diff --git a/examples/realtime/app/static/audio-playback.worklet.js b/examples/realtime/app/static/audio-playback.worklet.js
new file mode 100644
index 000000000..63735f828
--- /dev/null
+++ b/examples/realtime/app/static/audio-playback.worklet.js
@@ -0,0 +1,120 @@
+class PCMPlaybackProcessor extends AudioWorkletProcessor {
+    constructor() {
+        super();
+
+        this.buffers = [];
+        this.currentBuffer = null;
+        this.currentIndex = 0;
+        this.isCurrentlyPlaying = false;
+        this.fadeSamples = Math.round(sampleRate * 0.02);
+
+        this.port.onmessage = (event) => {
+            const message = event.data;
+            if (!message || typeof message !== 'object') return;
+
+            if (message.type === 'chunk') {
+                const payload = message.payload;
+                if (!(payload instanceof ArrayBuffer)) {
+                    return;
+                }
+
+                const int16Data = new Int16Array(payload);
+                if (int16Data.length === 0) {
+                    return;
+                }
+
+                const scale = 1 / 32768;
+                const floatData = new Float32Array(int16Data.length);
+                for (let i = 0; i < int16Data.length; i++) {
+                    floatData[i] = Math.max(-1, Math.min(1, int16Data[i] * scale));
+                }
+
+                if (!this.hasPendingAudio()) {
+                    const fadeSamples = Math.min(this.fadeSamples, floatData.length);
+                    for (let i = 0; i < fadeSamples; i++) {
+                        const gain = fadeSamples <= 1 ? 1 : (i / fadeSamples);
+                        floatData[i] *= gain;
+                    }
+                }
+
+                this.buffers.push(floatData);
+
+            } else if (message.type === 'stop') {
+                this.reset();
+                this.port.postMessage({ type: 'drained' });
+
+            } else if (message.type === 'config') {
+                const fadeSamples = message.fadeSamples;
+                if (Number.isFinite(fadeSamples) && fadeSamples >= 0) {
+                    this.fadeSamples = fadeSamples >>> 0;
+                }
+            }
+        };
+    }
+
+    reset() {
+        this.buffers = [];
+        this.currentBuffer = null;
+        this.currentIndex = 0;
+        this.isCurrentlyPlaying = false;
+    }
+
+    hasPendingAudio() {
+        if (this.currentBuffer && this.currentIndex < this.currentBuffer.length) {
+            return true;
+        }
+        return this.buffers.length > 0;
+    }
+
+    pullSample() {
+        if (this.currentBuffer && this.currentIndex < this.currentBuffer.length) {
+            return this.currentBuffer[this.currentIndex++];
+        }
+
+        if (this.currentBuffer && this.currentIndex >= this.currentBuffer.length) {
+            this.currentBuffer = null;
+            this.currentIndex = 0;
+        }
+
+        while (this.buffers.length > 0) {
+            this.currentBuffer = this.buffers.shift();
+            this.currentIndex = 0;
+            if (this.currentBuffer && this.currentBuffer.length > 0) {
+                return this.currentBuffer[this.currentIndex++];
+            }
+        }
+
+        this.currentBuffer = null;
+        this.currentIndex = 0;
+        return 0;
+    }
+
+    process(inputs, outputs) {
+        const output = outputs[0];
+        if (!output || output.length === 0) {
+            return true;
+        }
+
+        const channel = output[0];
+        let wroteSamples = false;
+
+        for (let i = 0; i < channel.length; i++) {
+            const sample = this.pullSample();
+            channel[i] = sample;
+            if (sample !== 0) {
+                wroteSamples = true;
+            }
+        }
+
+        if (this.hasPendingAudio()) {
+            this.isCurrentlyPlaying = true;
+        } else if (!wroteSamples && this.isCurrentlyPlaying) {
+            this.isCurrentlyPlaying = false;
+            this.port.postMessage({ type: 'drained' });
+        }
+
+        return true;
+    }
+}
+
+registerProcessor('pcm-playback', PCMPlaybackProcessor);
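Both worklets convert between float32 (Web Audio's native sample format) and int16 PCM, in opposite directions. A compact sketch of the two conversions as implemented here: playback uses a uniform 1/32768 scale with clamping, while the recorder (next file) scales negatives by 0x8000 and positives by 0x7fff so both extremes of the int16 range stay reachable:

```js
// int16 -> float32, as in the playback worklet's 'chunk' handler.
function int16ToFloat32(int16) {
    const out = new Float32Array(int16.length);
    for (let i = 0; i < int16.length; i++) {
        out[i] = Math.max(-1, Math.min(1, int16[i] / 32768));
    }
    return out;
}

// float32 -> int16, as in the recorder worklet's process().
function float32ToInt16(float32) {
    const out = new Int16Array(float32.length);
    for (let i = 0; i < float32.length; i++) {
        const s = Math.max(-1, Math.min(1, float32[i]));
        out[i] = s < 0 ? s * 0x8000 : s * 0x7fff; // -1 -> -32768, +1 -> +32767
    }
    return out;
}

console.log(int16ToFloat32(new Int16Array([-32768, 16384]))); // [-1, 0.5]
console.log(float32ToInt16(new Float32Array([-1, 0, 1])));    // [-32768, 0, 32767]
```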
diff --git a/examples/realtime/app/static/audio-recorder.worklet.js b/examples/realtime/app/static/audio-recorder.worklet.js
new file mode 100644
index 000000000..ccd6e6b13
--- /dev/null
+++ b/examples/realtime/app/static/audio-recorder.worklet.js
@@ -0,0 +1,56 @@
+class PCMRecorderProcessor extends AudioWorkletProcessor {
+    constructor() {
+        super();
+        this.chunkSize = 4096;
+        this.buffer = new Int16Array(this.chunkSize);
+        this.offset = 0;
+        this.pendingFrames = 0;
+        this.maxPendingFrames = 10;
+    }
+
+    flushBuffer() {
+        if (this.offset === 0) {
+            return;
+        }
+
+        const chunk = new Int16Array(this.offset);
+        chunk.set(this.buffer.subarray(0, this.offset));
+        this.port.postMessage(chunk, [chunk.buffer]);
+
+        this.offset = 0;
+        this.pendingFrames = 0;
+    }
+
+    process(inputs) {
+        const input = inputs[0];
+        if (!input || input.length === 0) {
+            return true;
+        }
+
+        const channel = input[0];
+        if (!channel || channel.length === 0) {
+            return true;
+        }
+
+        for (let i = 0; i < channel.length; i++) {
+            let sample = channel[i];
+            sample = Math.max(-1, Math.min(1, sample));
+            this.buffer[this.offset++] = sample < 0 ? sample * 0x8000 : sample * 0x7fff;
+
+            if (this.offset === this.chunkSize) {
+                this.flushBuffer();
+            }
+        }
+
+        if (this.offset > 0) {
+            this.pendingFrames += 1;
+            if (this.pendingFrames >= this.maxPendingFrames) {
+                this.flushBuffer();
+            }
+        }
+
+        return true;
+    }
+}
+
+registerProcessor('pcm-recorder', PCMRecorderProcessor);
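Taken together, the two worklets form a capture/playback pair with a small message protocol: the recorder posts `Int16Array` chunks to the main thread, and the playback node accepts `{type: 'chunk' | 'config' | 'stop'}` messages and reports `{type: 'drained'}` when its queue empties. A minimal standalone harness for both (a sketch, assuming the two worklet files are served next to the page; the WebSocket plumbing from app.js is omitted):

```js
// Run from a click handler so the browser allows the AudioContext to start.
async function startAudioPipeline() {
    const ctx = new AudioContext({ sampleRate: 24000 });
    await ctx.audioWorklet.addModule('audio-recorder.worklet.js');
    await ctx.audioWorklet.addModule('audio-playback.worklet.js');

    // Capture: mic -> pcm-recorder; the worklet posts Int16Array chunks back.
    const stream = await navigator.mediaDevices.getUserMedia({ audio: true });
    const recorder = new AudioWorkletNode(ctx, 'pcm-recorder');
    recorder.port.onmessage = (event) => {
        console.log('captured', event.data.length, 'samples'); // <= 4096 per chunk
    };
    ctx.createMediaStreamSource(stream).connect(recorder);
    recorder.connect(ctx.destination); // keeps the node pulled; it outputs silence

    // Playback: pcm-playback consumes transferred buffers, signals 'drained'.
    const playback = new AudioWorkletNode(ctx, 'pcm-playback', { outputChannelCount: [1] });
    playback.port.onmessage = (event) => {
        if (event.data && event.data.type === 'drained') console.log('queue drained');
    };
    playback.connect(ctx.destination);

    // Enqueue 100 ms of silence (the buffer is transferred, not copied).
    const pcm = new Int16Array(2400);
    playback.port.postMessage({ type: 'chunk', payload: pcm.buffer }, [pcm.buffer]);
}
```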