 from typing_extensions import assert_never

 from agents.realtime import RealtimeRunner, RealtimeSession, RealtimeSessionEvent
+from agents.realtime.config import RealtimeUserInputMessage
+from agents.realtime.model_inputs import RealtimeModelSendRawMessage

 # Import TwilioHandler class - handle both module and package use cases
 if TYPE_CHECKING:
@@ -64,6 +66,34 @@ async def send_audio(self, session_id: str, audio_bytes: bytes):
         if session_id in self.active_sessions:
             await self.active_sessions[session_id].send_audio(audio_bytes)

+    async def send_client_event(self, session_id: str, event: dict[str, Any]):
+        """Send a raw client event to the underlying realtime model."""
+        session = self.active_sessions.get(session_id)
+        if not session:
+            return
+        await session.model.send_event(
+            RealtimeModelSendRawMessage(
+                message={
+                    "type": event["type"],
+                    "other_data": {k: v for k, v in event.items() if k != "type"},
+                }
+            )
+        )
+
+    async def send_user_message(self, session_id: str, message: RealtimeUserInputMessage):
+        """Send a structured user message via the higher-level API (supports input_image)."""
+        session = self.active_sessions.get(session_id)
+        if not session:
+            return
+        await session.send_message(message)  # delegates to RealtimeModelSendUserInput path
+
+    async def interrupt(self, session_id: str) -> None:
+        """Interrupt current model playback/response for a session."""
+        session = self.active_sessions.get(session_id)
+        if not session:
+            return
+        await session.interrupt()
+
     async def _process_events(self, session_id: str):
         try:
             session = self.active_sessions[session_id]
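Aside (illustrative, not part of the diff): the two new send helpers take different paths into the SDK. `send_client_event` wraps an arbitrary client payload in `RealtimeModelSendRawMessage` and pushes it straight to the model, while `send_user_message` goes through the session-level `send_message`, which is the path that accepts structured content such as `input_image`. A minimal sketch of how the websocket handler below drives them, assuming an active session exists for `session_id`:

```python
# Sketch only; `manager` and `session_id` are the names used in this file.

# Raw protocol event, forwarded verbatim to the realtime model.
await manager.send_client_event(session_id, {"type": "input_audio_buffer.commit"})

# Structured user message via the session-level API (supports input_image parts).
await manager.send_user_message(
    session_id,
    {"type": "message", "role": "user", "content": [{"type": "input_text", "text": "Hi!"}]},
)

# Stop the assistant's current response/playback.
await manager.interrupt(session_id)
```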
@@ -101,7 +131,11 @@ async def _serialize_event(self, event: RealtimeSessionEvent) -> dict[str, Any]:
         elif event.type == "history_updated":
             base_event["history"] = [item.model_dump(mode="json") for item in event.history]
         elif event.type == "history_added":
-            pass
+            # Provide the added item so the UI can render incrementally.
+            try:
+                base_event["item"] = event.item.model_dump(mode="json")
+            except Exception:
+                base_event["item"] = None
         elif event.type == "guardrail_tripped":
             base_event["guardrail_results"] = [
                 {"name": result.guardrail.name} for result in event.guardrail_results
@@ -134,6 +168,7 @@ async def lifespan(app: FastAPI):
 @app.websocket("/ws/{session_id}")
 async def websocket_endpoint(websocket: WebSocket, session_id: str):
     await manager.connect(websocket, session_id)
+    image_buffers: dict[str, dict[str, Any]] = {}
     try:
         while True:
             data = await websocket.receive_text()
@@ -144,6 +179,124 @@ async def websocket_endpoint(websocket: WebSocket, session_id: str):
                 int16_data = message["data"]
                 audio_bytes = struct.pack(f"{len(int16_data)}h", *int16_data)
                 await manager.send_audio(session_id, audio_bytes)
+            elif message["type"] == "image":
+                logger.info("Received image message from client (session %s).", session_id)
+                # Build a conversation.item.create with input_image (and optional input_text)
+                data_url = message.get("data_url")
+                prompt_text = message.get("text") or "Please describe this image."
+                if data_url:
+                    logger.info(
+                        "Forwarding image (structured message) to Realtime API (len=%d).",
+                        len(data_url),
+                    )
+                    user_msg: RealtimeUserInputMessage = {
+                        "type": "message",
+                        "role": "user",
+                        "content": (
+                            [
+                                {"type": "input_image", "image_url": data_url, "detail": "high"},
+                                {"type": "input_text", "text": prompt_text},
+                            ]
+                            if prompt_text
+                            else [
+                                {"type": "input_image", "image_url": data_url, "detail": "high"}
+                            ]
+                        ),
+                    }
+                    await manager.send_user_message(session_id, user_msg)
+                    # Acknowledge to client UI
+                    await websocket.send_text(
+                        json.dumps(
+                            {
+                                "type": "client_info",
+                                "info": "image_enqueued",
+                                "size": len(data_url),
+                            }
+                        )
+                    )
+                else:
+                    await websocket.send_text(
+                        json.dumps(
+                            {
+                                "type": "error",
+                                "error": "No data_url for image message.",
+                            }
+                        )
+                    )
+            elif message["type"] == "commit_audio":
+                # Force close the current input audio turn
+                await manager.send_client_event(session_id, {"type": "input_audio_buffer.commit"})
+            elif message["type"] == "image_start":
+                img_id = str(message.get("id"))
+                image_buffers[img_id] = {
+                    "text": message.get("text") or "Please describe this image.",
+                    "chunks": [],
+                }
+                await websocket.send_text(
+                    json.dumps({"type": "client_info", "info": "image_start_ack", "id": img_id})
+                )
+            elif message["type"] == "image_chunk":
+                img_id = str(message.get("id"))
+                chunk = message.get("chunk", "")
+                if img_id in image_buffers:
+                    image_buffers[img_id]["chunks"].append(chunk)
+                    if len(image_buffers[img_id]["chunks"]) % 10 == 0:
+                        await websocket.send_text(
+                            json.dumps(
+                                {
+                                    "type": "client_info",
+                                    "info": "image_chunk_ack",
+                                    "id": img_id,
+                                    "count": len(image_buffers[img_id]["chunks"]),
+                                }
+                            )
+                        )
+            elif message["type"] == "image_end":
+                img_id = str(message.get("id"))
+                buf = image_buffers.pop(img_id, None)
+                if buf is None:
+                    await websocket.send_text(
+                        json.dumps({"type": "error", "error": "Unknown image id for image_end."})
+                    )
+                else:
+                    data_url = "".join(buf["chunks"]) if buf["chunks"] else None
+                    prompt_text = buf["text"]
+                    if data_url:
+                        logger.info(
+                            "Forwarding chunked image (structured message) to Realtime API (len=%d).",
+                            len(data_url),
+                        )
+                        user_msg2: RealtimeUserInputMessage = {
+                            "type": "message",
+                            "role": "user",
+                            "content": (
+                                [
+                                    {"type": "input_image", "image_url": data_url, "detail": "high"},
+                                    {"type": "input_text", "text": prompt_text},
+                                ]
+                                if prompt_text
+                                else [
+                                    {"type": "input_image", "image_url": data_url, "detail": "high"}
+                                ]
+                            ),
+                        }
+                        await manager.send_user_message(session_id, user_msg2)
+                        await websocket.send_text(
+                            json.dumps(
+                                {
+                                    "type": "client_info",
+                                    "info": "image_enqueued",
+                                    "id": img_id,
+                                    "size": len(data_url),
+                                }
+                            )
+                        )
+                    else:
+                        await websocket.send_text(
+                            json.dumps({"type": "error", "error": "Empty image."})
+                        )
+            elif message["type"] == "interrupt":
+                await manager.interrupt(session_id)

     except WebSocketDisconnect:
         await manager.disconnect(session_id)
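For context, a minimal client for the chunked upload path added above might look like the sketch below. It is not part of this diff; it assumes the `websockets` package, a server from this file running on localhost:8000, and a JPEG on disk, and it splits the data URL into 64 KB text chunks so no single frame approaches the server's frame size limit.

```python
import asyncio
import base64
import json

import websockets  # assumed client-side dependency, not used by the server above


async def send_image(path: str, session_id: str = "demo") -> None:
    # Encode a local JPEG as a data URL, the format the server expects.
    with open(path, "rb") as f:
        b64 = base64.b64encode(f.read()).decode("ascii")
    data_url = f"data:image/jpeg;base64,{b64}"

    async with websockets.connect(f"ws://localhost:8000/ws/{session_id}") as ws:
        # Mirror the image_start / image_chunk / image_end protocol handled above.
        await ws.send(json.dumps({"type": "image_start", "id": "img-1", "text": "What is this?"}))
        for i in range(0, len(data_url), 64_000):
            await ws.send(
                json.dumps({"type": "image_chunk", "id": "img-1", "chunk": data_url[i : i + 64_000]})
            )
        await ws.send(json.dumps({"type": "image_end", "id": "img-1"}))
        print(await ws.recv())  # first server event, e.g. the image_start_ack client_info


asyncio.run(send_image("photo.jpg"))
```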
@@ -160,4 +313,10 @@ async def read_index():
 if __name__ == "__main__":
     import uvicorn

-    uvicorn.run(app, host="0.0.0.0", port=8000)
+    uvicorn.run(
+        app,
+        host="0.0.0.0",
+        port=8000,
+        # Increased WebSocket frame size to comfortably handle image data URLs.
+        ws_max_size=16 * 1024 * 1024,
+    )
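One note on the `ws_max_size` choice (not stated in the diff): base64 encoding inflates binary data by roughly a third, so a 16 MiB frame cap covers single-message data URLs for images up to about 12 MB; anything larger can still go through the chunked `image_start`/`image_chunk`/`image_end` path added above.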