diff --git a/examples/realtime/twilio/README.md b/examples/realtime/twilio/README.md
index e92f0681a..845330f3a 100644
--- a/examples/realtime/twilio/README.md
+++ b/examples/realtime/twilio/README.md
@@ -70,6 +70,7 @@ This example demonstrates how to connect the OpenAI Realtime API to a phone call
 
 - **WebSocket connection issues**: Ensure your ngrok URL is correct and publicly accessible
 - **Audio quality**: Twilio streams audio in mulaw format at 8kHz, which may affect quality
+- **Audio jittering/skipping**: The implementation includes audio buffering (50ms chunks) to reduce jittering at word boundaries. This buffers both incoming (Twilio → OpenAI) and outgoing (OpenAI → Twilio) audio for smoother playback.
 - **Latency**: Network latency between Twilio, your server, and OpenAI affects response time
 - **Logs**: Check the console output for detailed connection and error logs
 
diff --git a/examples/realtime/twilio/twilio_handler.py b/examples/realtime/twilio/twilio_handler.py
index 567015dfc..fee7b91a3 100644
--- a/examples/realtime/twilio/twilio_handler.py
+++ b/examples/realtime/twilio/twilio_handler.py
@@ -52,14 +52,22 @@ def __init__(self, twilio_websocket: WebSocket):
         self.BUFFER_SIZE_BYTES = int(self.SAMPLE_RATE * self.CHUNK_LENGTH_S)  # 50ms worth of audio
 
         self._stream_sid: str | None = None
+
+        # Incoming audio buffer (from Twilio to OpenAI)
         self._audio_buffer: bytearray = bytearray()
         self._last_buffer_send_time = time.time()
 
+        # Outgoing audio buffer (from OpenAI to Twilio)
+        self._outgoing_audio_buffer: bytearray = bytearray()
+        self._last_outgoing_send_time = time.time()
+
         # Mark event tracking for playback
         self._mark_counter = 0
         self._mark_data: dict[
             str, tuple[str, int, int]
         ] = {}  # mark_id -> (item_id, content_index, byte_count)
+        # Track marks for buffered audio chunks
+        self._buffered_marks: list[str] = []  # mark_ids for chunks in current buffer
 
     async def start(self) -> None:
         """Start the session."""
@@ -122,43 +130,32 @@ async def _twilio_message_loop(self) -> None:
     async def _handle_realtime_event(self, event: RealtimeSessionEvent) -> None:
         """Handle events from the realtime session."""
         if event.type == "audio":
-            base64_audio = base64.b64encode(event.audio.data).decode("utf-8")
-            await self.twilio_websocket.send_text(
-                json.dumps(
-                    {
-                        "event": "media",
-                        "streamSid": self._stream_sid,
-                        "media": {"payload": base64_audio},
-                    }
-                )
-            )
+            # Buffer outgoing audio to reduce jittering
+            self._outgoing_audio_buffer.extend(event.audio.data)
 
-            # Send mark event for playback tracking
-            self._mark_counter += 1
-            mark_id = str(self._mark_counter)
-            self._mark_data[mark_id] = (
-                event.audio.item_id,
-                event.audio.content_index,
-                len(event.audio.data),
+            # Store metadata for this audio chunk
+            mark_id = self._create_mark(
+                event.audio.item_id, event.audio.content_index, len(event.audio.data)
             )
+            self._buffered_marks.append(mark_id)
 
-            await self.twilio_websocket.send_text(
-                json.dumps(
-                    {
-                        "event": "mark",
-                        "streamSid": self._stream_sid,
-                        "mark": {"name": mark_id},
-                    }
-                )
-            )
+            # Send buffered audio if we have enough data (reduces jittering)
+            if len(self._outgoing_audio_buffer) >= self.BUFFER_SIZE_BYTES:
+                await self._flush_outgoing_audio_buffer()
 
         elif event.type == "audio_interrupted":
             print("Sending audio interrupted to Twilio")
+            # Flush any remaining buffered audio before clearing
+            await self._flush_outgoing_audio_buffer()
             await self.twilio_websocket.send_text(
                 json.dumps({"event": "clear", "streamSid": self._stream_sid})
             )
+            self._outgoing_audio_buffer.clear()
+            self._buffered_marks.clear()
         elif event.type == "audio_end":
-            print("Audio end")
+            print("Audio end - flushing remaining buffered audio")
+            # Flush remaining audio at the end
+            await self._flush_outgoing_audio_buffer()
         elif event.type == "raw_model_event":
             pass
         else:
@@ -246,19 +243,72 @@ async def _flush_audio_buffer(self) -> None:
         except Exception as e:
             print(f"Error sending buffered audio to OpenAI: {e}")
 
+    def _create_mark(self, item_id: str, content_index: int, byte_count: int) -> str:
+        """Create a new mark for tracking audio playback."""
+        self._mark_counter += 1
+        mark_id = str(self._mark_counter)
+        self._mark_data[mark_id] = (item_id, content_index, byte_count)
+        return mark_id
+
+    async def _flush_outgoing_audio_buffer(self) -> None:
+        """Send buffered audio to Twilio to reduce jittering."""
+        if not self._outgoing_audio_buffer:
+            return
+
+        try:
+            # Encode and send the buffered audio to Twilio
+            base64_audio = base64.b64encode(bytes(self._outgoing_audio_buffer)).decode("utf-8")
+            await self.twilio_websocket.send_text(
+                json.dumps(
+                    {
+                        "event": "media",
+                        "streamSid": self._stream_sid,
+                        "media": {"payload": base64_audio},
+                    }
+                )
+            )
+
+            # Send mark events for all buffered chunks (for playback tracking)
+            for mark_id in self._buffered_marks:
+                await self.twilio_websocket.send_text(
+                    json.dumps(
+                        {
+                            "event": "mark",
+                            "streamSid": self._stream_sid,
+                            "mark": {"name": mark_id},
+                        }
+                    )
+                )
+
+            # Clear the buffer and marks
+            self._outgoing_audio_buffer.clear()
+            self._buffered_marks.clear()
+            self._last_outgoing_send_time = time.time()
+
+        except Exception as e:
+            print(f"Error sending buffered audio to Twilio: {e}")
+
     async def _buffer_flush_loop(self) -> None:
-        """Periodically flush audio buffer to prevent stale data."""
+        """Periodically flush audio buffers to prevent stale data."""
         try:
             while True:
                 await asyncio.sleep(self.CHUNK_LENGTH_S)  # Check every 50ms
 
-                # If buffer has data and it's been too long since last send, flush it
                 current_time = time.time()
+
+                # Flush incoming audio buffer (from Twilio to OpenAI) if stale
                 if (
                     self._audio_buffer
                     and current_time - self._last_buffer_send_time > self.CHUNK_LENGTH_S * 2
                 ):
                     await self._flush_audio_buffer()
 
+                # Flush outgoing audio buffer (from OpenAI to Twilio) if stale
+                if (
+                    self._outgoing_audio_buffer
+                    and current_time - self._last_outgoing_send_time > self.CHUNK_LENGTH_S * 2
+                ):
+                    await self._flush_outgoing_audio_buffer()
+
         except Exception as e:
             print(f"Error in buffer flush loop: {e}")
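
The accumulate-then-flush pattern that `_flush_outgoing_audio_buffer` and `_buffer_flush_loop` implement together can be exercised in isolation. Below is a minimal sketch (not part of the patch): the constants mirror the handler's (8 kHz mulaw is 1 byte per sample, so 50 ms is 400 bytes), while the `OutgoingBuffer` class and the demo chunk sizes are hypothetical.

```python
import asyncio
import time

# Mirrors the handler's constants: 8 kHz mulaw, 1 byte per sample,
# so 50 ms of audio is 8000 * 0.05 = 400 bytes.
SAMPLE_RATE = 8000
CHUNK_LENGTH_S = 0.05
BUFFER_SIZE_BYTES = int(SAMPLE_RATE * CHUNK_LENGTH_S)


class OutgoingBuffer:
    """Accumulates model audio and releases it in >=50 ms payloads."""

    def __init__(self) -> None:
        self._buffer = bytearray()
        self._last_send_time = time.time()

    def add(self, data: bytes) -> bytes | None:
        """Queue a chunk; return a payload once 50 ms of audio has accumulated."""
        self._buffer.extend(data)
        if len(self._buffer) >= BUFFER_SIZE_BYTES:
            return self.flush()
        return None

    def flush(self) -> bytes | None:
        """Drain everything queued (also used on audio_end / interruption)."""
        if not self._buffer:
            return None
        payload = bytes(self._buffer)
        self._buffer.clear()
        self._last_send_time = time.time()
        return payload

    def flush_if_stale(self) -> bytes | None:
        """Drain a partial chunk that has waited more than two periods."""
        if self._buffer and time.time() - self._last_send_time > CHUNK_LENGTH_S * 2:
            return self.flush()
        return None


async def main() -> None:
    buf = OutgoingBuffer()
    # Model audio arrives in small, irregular chunks; coalescing them into
    # ~50 ms payloads is what smooths playback at word boundaries.
    for chunk_size in (150, 150, 150, 50):
        if (payload := buf.add(b"\x00" * chunk_size)) is not None:
            print(f"send {len(payload)} bytes")  # fires once 450 bytes are queued
    # The periodic flush loop catches the 50-byte tail once it goes stale.
    await asyncio.sleep(CHUNK_LENGTH_S * 3)
    if (payload := buf.flush_if_stale()) is not None:
        print(f"late send {len(payload)} bytes")


asyncio.run(main())
```

The stale-flush path matters because a response's final chunk rarely lands exactly on a 400-byte boundary; without the periodic check, the tail of each utterance would sit in the buffer until the next response started.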
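The mark bookkeeping serves a different purpose: Twilio echoes each `mark` event back over the Media Streams websocket once all audio queued before it has played to the caller. A hedged sketch of how the stored `(item_id, content_index, byte_count)` tuples could be consumed on that echo; the `handle_mark_echo` function and sample entries are illustrative, not the handler's actual code.

```python
import json

# Mirrors the handler's _mark_data shape: mark_id -> (item_id, content_index, byte_count).
# The entries here are sample data.
mark_data: dict[str, tuple[str, int, int]] = {
    "1": ("item_abc", 0, 400),
    "2": ("item_abc", 0, 400),
}


def handle_mark_echo(raw: str) -> None:
    """Consume an echoed Twilio mark: each echo confirms byte_count bytes played."""
    message = json.loads(raw)
    if message.get("event") != "mark":
        return
    mark_id = message["mark"]["name"]
    if (entry := mark_data.pop(mark_id, None)) is None:
        return
    item_id, content_index, byte_count = entry
    # 8 kHz mulaw is 1 byte per sample, so bytes / 8 gives milliseconds.
    print(f"{byte_count / 8:.0f} ms of item {item_id} (part {content_index}) played")


handle_mark_echo(json.dumps({"event": "mark", "mark": {"name": "1"}}))
```

Because `_flush_outgoing_audio_buffer` sends all marks for a coalesced payload immediately after the media event, per-chunk playback granularity is preserved even though the audio itself travels in larger batches.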