Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/realtime/twilio/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ This example demonstrates how to connect the OpenAI Realtime API to a phone call

- **WebSocket connection issues**: Ensure your ngrok URL is correct and publicly accessible
- **Audio quality**: Twilio streams audio in mulaw format at 8kHz, which may affect quality
- **Audio jitter/skipping**: The implementation buffers audio in 50 ms chunks to reduce jitter at word boundaries. Both incoming (Twilio → OpenAI) and outgoing (OpenAI → Twilio) audio are buffered for smoother playback.
- **Latency**: Network latency between Twilio, your server, and OpenAI affects response time
- **Logs**: Check the console output for detailed connection and error logs

Expand Down
108 changes: 79 additions & 29 deletions examples/realtime/twilio/twilio_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,22 @@ def __init__(self, twilio_websocket: WebSocket):
self.BUFFER_SIZE_BYTES = int(self.SAMPLE_RATE * self.CHUNK_LENGTH_S) # 50ms worth of audio

self._stream_sid: str | None = None

# Incoming audio buffer (from Twilio to OpenAI)
self._audio_buffer: bytearray = bytearray()
self._last_buffer_send_time = time.time()

# Outgoing audio buffer (from OpenAI to Twilio)
self._outgoing_audio_buffer: bytearray = bytearray()
self._last_outgoing_send_time = time.time()

# Mark event tracking for playback
self._mark_counter = 0
self._mark_data: dict[
str, tuple[str, int, int]
] = {} # mark_id -> (item_id, content_index, byte_count)
# Track marks for buffered audio chunks
self._buffered_marks: list[str] = [] # mark_ids for chunks in current buffer

async def start(self) -> None:
"""Start the session."""
Expand Down Expand Up @@ -122,43 +130,32 @@ async def _twilio_message_loop(self) -> None:
async def _handle_realtime_event(self, event: RealtimeSessionEvent) -> None:
"""Handle events from the realtime session."""
if event.type == "audio":
base64_audio = base64.b64encode(event.audio.data).decode("utf-8")
await self.twilio_websocket.send_text(
json.dumps(
{
"event": "media",
"streamSid": self._stream_sid,
"media": {"payload": base64_audio},
}
)
)
# Buffer outgoing audio to reduce jittering
self._outgoing_audio_buffer.extend(event.audio.data)

# Send mark event for playback tracking
self._mark_counter += 1
mark_id = str(self._mark_counter)
self._mark_data[mark_id] = (
event.audio.item_id,
event.audio.content_index,
len(event.audio.data),
# Store metadata for this audio chunk
mark_id = self._create_mark(
event.audio.item_id, event.audio.content_index, len(event.audio.data)
)
self._buffered_marks.append(mark_id)

await self.twilio_websocket.send_text(
json.dumps(
{
"event": "mark",
"streamSid": self._stream_sid,
"mark": {"name": mark_id},
}
)
)
# Send buffered audio if we have enough data (reduces jittering)
if len(self._outgoing_audio_buffer) >= self.BUFFER_SIZE_BYTES:
await self._flush_outgoing_audio_buffer()

elif event.type == "audio_interrupted":
print("Sending audio interrupted to Twilio")
# Flush any remaining buffered audio before clearing
await self._flush_outgoing_audio_buffer()
await self.twilio_websocket.send_text(
json.dumps({"event": "clear", "streamSid": self._stream_sid})
)
self._outgoing_audio_buffer.clear()
self._buffered_marks.clear()
elif event.type == "audio_end":
print("Audio end")
print("Audio end - flushing remaining buffered audio")
# Flush remaining audio at the end
await self._flush_outgoing_audio_buffer()
elif event.type == "raw_model_event":
pass
else:
Expand Down Expand Up @@ -246,19 +243,72 @@ async def _flush_audio_buffer(self) -> None:
except Exception as e:
print(f"Error sending buffered audio to OpenAI: {e}")

def _create_mark(self, item_id: str, content_index: int, byte_count: int) -> str:
    """Register a playback-tracking mark for one audio chunk.

    Increments the mark counter, uses its decimal string form as the mark id,
    and records ``(item_id, content_index, byte_count)`` under that id in
    ``self._mark_data``.

    Args:
        item_id: Identifier of the item the audio chunk belongs to.
        content_index: Content index of the chunk within that item.
        byte_count: Number of audio bytes in the chunk.

    Returns:
        The newly allocated mark id.
    """
    next_value = self._mark_counter + 1
    self._mark_counter = next_value

    new_mark_id = str(next_value)
    self._mark_data[new_mark_id] = (item_id, content_index, byte_count)
    return new_mark_id

async def _flush_outgoing_audio_buffer(self) -> None:
    """Send the buffered outgoing audio (and its marks) to Twilio.

    Emits one ``media`` message carrying the base64-encoded contents of
    ``self._outgoing_audio_buffer``, followed by one ``mark`` message for each
    id in ``self._buffered_marks``. Does nothing when the buffer is empty.

    The buffer and mark list are detached and cleared *before* the first
    ``await``. This method is called both from the realtime event handler and
    from the periodic flush loop; if the shared state were cleared only after
    the sends completed, audio or marks appended by another caller while a
    send was in flight would be wiped without ever being transmitted, and an
    overlapping flush could re-send the same bytes.

    Errors while sending are printed, not raised. The detached audio is then
    dropped rather than re-queued, so a failing socket cannot make the buffer
    grow without bound or replay stale audio.
    """
    if not self._outgoing_audio_buffer:
        return

    # Take ownership of the pending data synchronously (no await in between),
    # so anything appended from here on belongs to the next flush.
    pending_audio = bytes(self._outgoing_audio_buffer)
    pending_marks = list(self._buffered_marks)
    self._outgoing_audio_buffer.clear()
    self._buffered_marks.clear()

    try:
        # Encode and send the detached audio to Twilio as a single media frame.
        base64_audio = base64.b64encode(pending_audio).decode("utf-8")
        await self.twilio_websocket.send_text(
            json.dumps(
                {
                    "event": "media",
                    "streamSid": self._stream_sid,
                    "media": {"payload": base64_audio},
                }
            )
        )

        # Send a mark for every chunk that was folded into this frame
        # (used for playback tracking).
        for mark_id in pending_marks:
            await self.twilio_websocket.send_text(
                json.dumps(
                    {
                        "event": "mark",
                        "streamSid": self._stream_sid,
                        "mark": {"name": mark_id},
                    }
                )
            )

        # Only a fully successful send counts as the last send time.
        self._last_outgoing_send_time = time.time()

    except Exception as e:
        print(f"Error sending buffered audio to Twilio: {e}")

async def _buffer_flush_loop(self) -> None:
    """Periodically flush audio buffers to prevent stale data.

    Runs until an exception escapes the loop body. Each iteration sleeps for
    ``CHUNK_LENGTH_S`` seconds, then flushes whichever buffer is non-empty and
    has not been sent for more than twice that interval:

    * the incoming buffer (Twilio -> OpenAI) via ``_flush_audio_buffer``
    * the outgoing buffer (OpenAI -> Twilio) via ``_flush_outgoing_audio_buffer``

    Any ``Exception`` raised inside the loop is printed and ends the loop.
    """
    try:
        while True:
            # Wake up once per chunk interval.
            await asyncio.sleep(self.CHUNK_LENGTH_S)

            now = time.time()
            stale_after = self.CHUNK_LENGTH_S * 2

            # Incoming audio (from Twilio to OpenAI).
            incoming_age = now - self._last_buffer_send_time
            if self._audio_buffer and incoming_age > stale_after:
                await self._flush_audio_buffer()

            # Outgoing audio (from OpenAI to Twilio).
            outgoing_age = now - self._last_outgoing_send_time
            if self._outgoing_audio_buffer and outgoing_age > stale_after:
                await self._flush_outgoing_audio_buffer()

    except Exception as e:
        print(f"Error in buffer flush loop: {e}")