From dd9e4a72cc0fc89507640605b644ead538718b9e Mon Sep 17 00:00:00 2001 From: Kazuhiro Sera Date: Mon, 27 Oct 2025 20:36:59 +0900 Subject: [PATCH 1/6] feat: #1760 Add SIP support for realtime agent runner --- examples/realtime/twilio_sip/README.md | 55 +++++ examples/realtime/twilio_sip/__init__.py | 1 + examples/realtime/twilio_sip/agents.py | 87 ++++++++ examples/realtime/twilio_sip/requirements.txt | 3 + examples/realtime/twilio_sip/server.py | 195 ++++++++++++++++++ src/agents/realtime/model.py | 7 + src/agents/realtime/openai_realtime.py | 18 +- .../test_openai_realtime_sip_model.py | 56 +++++ 8 files changed, 421 insertions(+), 1 deletion(-) create mode 100644 examples/realtime/twilio_sip/README.md create mode 100644 examples/realtime/twilio_sip/__init__.py create mode 100644 examples/realtime/twilio_sip/agents.py create mode 100644 examples/realtime/twilio_sip/requirements.txt create mode 100644 examples/realtime/twilio_sip/server.py create mode 100644 tests/realtime/test_openai_realtime_sip_model.py diff --git a/examples/realtime/twilio_sip/README.md b/examples/realtime/twilio_sip/README.md new file mode 100644 index 000000000..a96e5d379 --- /dev/null +++ b/examples/realtime/twilio_sip/README.md @@ -0,0 +1,55 @@ +# Twilio SIP Realtime Example + +This example shows how to handle OpenAI Realtime SIP calls with the Agents SDK. Incoming calls are accepted through the Realtime Calls API, a triage agent answers with a fixed greeting, and handoffs route the caller to specialist agents (FAQ lookup and record updates) similar to the realtime UI demo. + +## Prerequisites + +- Python 3.9+ +- An OpenAI API key with Realtime API access +- A configured webhook secret for your OpenAI project +- A Twilio account with a phone number and Elastic SIP Trunking enabled +- A public HTTPS endpoint for local development (for example, [ngrok](https://ngrok.com/)) + +## Configure OpenAI + +1. In [platform settings](https://platform.openai.com/settings) select your project. +2. Create a webhook pointing to `https:///openai/webhook` with "realtime.call.incoming" event type and note the signing secret. The example verifies each webhook with `OPENAI_WEBHOOK_SECRET`. + +## Configure Twilio Elastic SIP Trunking + +1. Create (or edit) an Elastic SIP trunk. +2. On the **Origination** tab, add an origination SIP URI of `sip:proj_@sip.api.openai.com;transport=tls` so Twilio sends inbound calls to OpenAI. (The Termination tab always ends with `.pstn.twilio.com`, so leave it unchanged.) +3. Add at least one phone number to the trunk so inbound calls are forwarded to OpenAI. + +## Setup + +1. Install dependencies: + ```bash + uv pip install -r examples/realtime/twilio-sip/requirements.txt + ``` +2. Export required environment variables: + ```bash + export OPENAI_API_KEY="sk-..." + export OPENAI_WEBHOOK_SECRET="whsec_..." + ``` +3. (Optional) Adjust the multi-agent logic in `examples/realtime/twilio_sip/agents.py` if you want + to change the specialist agents or tools. +4. Run the FastAPI server: + ```bash + uv run uvicorn examples.realtime.twilio_sip.server:app --host 0.0.0.0 --port 8000 + ``` +5. Expose the server publicly (example with ngrok): + ```bash + ngrok http 8000 + ``` + +## Test a Call + +1. Place a call to the Twilio number attached to the SIP trunk. +2. Twilio sends the call to `sip.api.openai.com`; OpenAI fires `realtime.call.incoming`, which this example accepts. +3. The triage agent greets the caller, then either keeps the conversation or hands off to: + - **FAQ Agent** – answers common questions via `faq_lookup_tool`. + - **Records Agent** – writes short notes using `update_customer_record`. +4. The background task attaches to the call and logs transcripts plus basic events in the console. + +You can edit `server.py` to change instructions, add tools, or integrate with internal systems once the SIP session is active. diff --git a/examples/realtime/twilio_sip/__init__.py b/examples/realtime/twilio_sip/__init__.py new file mode 100644 index 000000000..367fe3530 --- /dev/null +++ b/examples/realtime/twilio_sip/__init__.py @@ -0,0 +1 @@ +"""OpenAI Realtime SIP example package.""" diff --git a/examples/realtime/twilio_sip/agents.py b/examples/realtime/twilio_sip/agents.py new file mode 100644 index 000000000..2a8da238f --- /dev/null +++ b/examples/realtime/twilio_sip/agents.py @@ -0,0 +1,87 @@ +"""Realtime agent definitions shared by the Twilio SIP example.""" + +from __future__ import annotations + +import asyncio + +from agents import function_tool +from agents.extensions.handoff_prompt import RECOMMENDED_PROMPT_PREFIX +from agents.realtime import RealtimeAgent, realtime_handoff + +# --- Tools ----------------------------------------------------------------- + + +WELCOME_MESSAGE = "Hello, this is ABC customer service. How can I help you today?" + + +@function_tool( + name_override="faq_lookup_tool", description_override="Lookup frequently asked questions." +) +async def faq_lookup_tool(question: str) -> str: + """Fetch FAQ answers for the caller.""" + + await asyncio.sleep(3) + + q = question.lower() + if "plan" in q or "wifi" in q or "wi-fi" in q: + return "We provide complimentary Wi-Fi. Join the ABC-Customer network." # demo data + if "billing" in q or "invoice" in q: + return "Your latest invoice is available in the ABC portal under Billing > History." + if "hours" in q or "support" in q: + return "Human support agents are available 24/7; transfer to the specialist if needed." + return "I'm not sure about that. Let me transfer you back to the triage agent." + + +@function_tool +async def update_customer_record(customer_id: str, note: str) -> str: + """Record a short note about the caller.""" + + await asyncio.sleep(1) + return f"Recorded note for {customer_id}: {note}" + + +# --- Agents ---------------------------------------------------------------- + + +faq_agent = RealtimeAgent( + name="FAQ Agent", + handoff_description="Handles frequently asked questions and general account inquiries.", + instructions=f"""{RECOMMENDED_PROMPT_PREFIX} + You are an FAQ specialist. Always rely on the faq_lookup_tool for answers and keep replies + concise. If the caller needs hands-on help, transfer back to the triage agent. + """, + tools=[faq_lookup_tool], +) + +records_agent = RealtimeAgent( + name="Records Agent", + handoff_description="Updates customer records with brief notes and confirmation numbers.", + instructions=f"""{RECOMMENDED_PROMPT_PREFIX} + You handle structured updates. Confirm the customer's ID, capture their request in a short + note, and use the update_customer_record tool. For anything outside data updates, return to the + triage agent. + """, + tools=[update_customer_record], +) + +triage_agent = RealtimeAgent( + name="Triage Agent", + handoff_description="Greets callers and routes them to the most appropriate specialist.", + instructions=( + f"{RECOMMENDED_PROMPT_PREFIX} " + "Always begin the call by saying exactly: '" + f"{WELCOME_MESSAGE}' " + "before collecting details. Once the greeting is complete, gather context and hand off to " + "the FAQ or Records agents when appropriate." + ), + handoffs=[faq_agent, realtime_handoff(records_agent)], +) + +faq_agent.handoffs.append(triage_agent) +records_agent.handoffs.append(triage_agent) + + +def get_starting_agent() -> RealtimeAgent: + """Return the agent used to start each realtime call.""" + + return triage_agent diff --git a/examples/realtime/twilio_sip/requirements.txt b/examples/realtime/twilio_sip/requirements.txt new file mode 100644 index 000000000..943a72eb6 --- /dev/null +++ b/examples/realtime/twilio_sip/requirements.txt @@ -0,0 +1,3 @@ +fastapi>=0.120.0 +openai>=2.2,<3 +uvicorn[standard]>=0.38.0 diff --git a/examples/realtime/twilio_sip/server.py b/examples/realtime/twilio_sip/server.py new file mode 100644 index 000000000..183c81663 --- /dev/null +++ b/examples/realtime/twilio_sip/server.py @@ -0,0 +1,195 @@ +"""Minimal FastAPI server for handling OpenAI Realtime SIP calls with Twilio.""" + +from __future__ import annotations + +import asyncio +import logging +import os + +import websockets +from fastapi import FastAPI, HTTPException, Request, Response +from openai import APIStatusError, AsyncOpenAI, InvalidWebhookSignatureError + +from agents.realtime.items import ( + AssistantAudio, + AssistantMessageItem, + AssistantText, + InputText, + UserMessageItem, +) +from agents.realtime.model_inputs import RealtimeModelSendRawMessage +from agents.realtime.openai_realtime import OpenAIRealtimeSIPModel +from agents.realtime.runner import RealtimeRunner + +from .agents import WELCOME_MESSAGE, get_starting_agent + +logging.basicConfig(level=logging.INFO) + +logger = logging.getLogger("twilio_sip_example") + + +def _get_env(name: str) -> str: + value = os.getenv(name) + if not value: + raise RuntimeError(f"Missing environment variable: {name}") + return value + + +OPENAI_API_KEY = _get_env("OPENAI_API_KEY") +OPENAI_WEBHOOK_SECRET = _get_env("OPENAI_WEBHOOK_SECRET") + +client = AsyncOpenAI(api_key=OPENAI_API_KEY, webhook_secret=OPENAI_WEBHOOK_SECRET) + +# Build the multi-agent graph (triage + specialist agents) from agents.py. +assistant_agent = get_starting_agent() + +app = FastAPI() + +# Track background tasks so repeated webhooks do not spawn duplicates. +active_call_tasks: dict[str, asyncio.Task[None]] = {} + + +async def accept_call(call_id: str) -> None: + """Accept the incoming SIP call and configure the realtime session.""" + + # The starting agent uses static instructions, so we can forward them directly to the accept + # call payload. If someone swaps in a dynamic prompt, fall back to a sensible default. + instructions_payload = ( + assistant_agent.instructions + if isinstance(assistant_agent.instructions, str) + else "You are a helpful triage agent for ABC customer service." + ) + + try: + # AsyncOpenAI does not yet expose high-level helpers like client.realtime.calls.accept, so + # we call the REST endpoint directly via client.post(). Keep this until the SDK grows an + # async helper. + await client.post( + f"/realtime/calls/{call_id}/accept", + body={ + "type": "realtime", + "model": "gpt-realtime", + "instructions": instructions_payload, + }, + cast_to=dict, + ) + except APIStatusError as exc: + if exc.status_code == 404: + # Twilio occasionally retries webhooks after the caller hangs up; treat as a no-op so + # the webhook still returns 200. + logger.warning( + "Call %s no longer exists when attempting accept (404). Skipping.", call_id + ) + return + + detail = exc.message + if exc.response is not None: + try: + detail = exc.response.text + except Exception: # noqa: BLE001 + detail = str(exc.response) + + logger.error( + "Failed to accept call %s: %s %s", call_id, exc.status_code, detail + ) + raise HTTPException(status_code=500, detail="Failed to accept call") from exc + + logger.info("Accepted call %s", call_id) + + +async def observe_call(call_id: str) -> None: + """Attach to the realtime session and log conversation events.""" + + runner = RealtimeRunner(assistant_agent, model=OpenAIRealtimeSIPModel()) + + try: + initial_settings = { + "turn_detection": { + "type": "semantic_vad", + "interrupt_response": True, + } + } + + async with await runner.run( + model_config={ + "call_id": call_id, + "initial_model_settings": initial_settings, + } + ) as session: + # Trigger an initial greeting so callers hear the agent right away. + # Issue a response.create immediately after the WebSocket attaches so the model speaks + # before the caller says anything. Using the raw client message ensures zero latency + # and avoids threading the greeting through history. + await session.model.send_event( + RealtimeModelSendRawMessage( + message={ + "type": "response.create", + "response": { + "instructions": ( + "Say exactly '" + f"{WELCOME_MESSAGE}" + "' now before continuing the conversation." + ) + }, + } + ) + ) + + async for event in session: + if event.type == "history_added": + item = event.item + if isinstance(item, UserMessageItem): + for content in item.content: + if isinstance(content, InputText) and content.text: + logger.info("Caller: %s", content.text) + elif isinstance(item, AssistantMessageItem): + for content in item.content: + if isinstance(content, AssistantText) and content.text: + logger.info("Assistant (text): %s", content.text) + elif isinstance(content, AssistantAudio) and content.transcript: + logger.info("Assistant (audio transcript): %s", content.transcript) + elif event.type == "error": + logger.error("Realtime session error: %s", event.error) + + except websockets.exceptions.ConnectionClosedError: + # Callers hanging up causes the WebSocket to close without a frame; log at info level so it + # does not surface as an error. + logger.info("Realtime WebSocket closed for call %s", call_id) + except Exception as exc: # noqa: BLE001 - demo logging only + logger.exception("Error while observing call %s", call_id, exc_info=exc) + finally: + logger.info("Call %s ended", call_id) + active_call_tasks.pop(call_id, None) + + +def _track_call_task(call_id: str) -> None: + existing = active_call_tasks.get(call_id) + if existing and not existing.done(): + existing.cancel() + + task = asyncio.create_task(observe_call(call_id)) + active_call_tasks[call_id] = task + + +@app.post("/openai/webhook") +async def openai_webhook(request: Request) -> Response: + body = await request.body() + + try: + event = client.webhooks.unwrap(body, request.headers) + except InvalidWebhookSignatureError as exc: + raise HTTPException(status_code=400, detail="Invalid webhook signature") from exc + + if event.type == "realtime.call.incoming": + call_id = event.data.call_id + await accept_call(call_id) + _track_call_task(call_id) + return Response(status_code=200) + + # Ignore other webhook event types for brevity. + return Response(status_code=200) + + +@app.get("/") +async def healthcheck() -> dict[str, str]: + return {"status": "ok"} diff --git a/src/agents/realtime/model.py b/src/agents/realtime/model.py index c0632aa9b..c207878cd 100644 --- a/src/agents/realtime/model.py +++ b/src/agents/realtime/model.py @@ -139,6 +139,13 @@ class RealtimeModelConfig(TypedDict): is played to the user. """ + call_id: NotRequired[str] + """Attach to an existing realtime call instead of creating a new session. + + When provided, the transport connects using the `call_id` query string parameter rather than a + model name. This is used for SIP-originated calls that are accepted via the Realtime Calls API. + """ + class RealtimeModel(abc.ABC): """Interface for connecting to a realtime model and sending/receiving events.""" diff --git a/src/agents/realtime/openai_realtime.py b/src/agents/realtime/openai_realtime.py index c6d1b6faa..75f1f1fd0 100644 --- a/src/agents/realtime/openai_realtime.py +++ b/src/agents/realtime/openai_realtime.py @@ -216,7 +216,11 @@ async def connect(self, options: RealtimeModelConfig) -> None: else: self._tracing_config = "auto" - url = options.get("url", f"wss://api.openai.com/v1/realtime?model={self.model}") + call_id = options.get("call_id") + if call_id: + url = options.get("url", f"wss://api.openai.com/v1/realtime?call_id={call_id}") + else: + url = options.get("url", f"wss://api.openai.com/v1/realtime?model={self.model}") headers: dict[str, str] = {} if options.get("headers") is not None: @@ -930,6 +934,18 @@ def _tools_to_session_tools( return converted_tools +class OpenAIRealtimeSIPModel(OpenAIRealtimeWebSocketModel): + """Realtime model that attaches to SIP-originated calls using a call ID.""" + + async def connect(self, options: RealtimeModelConfig) -> None: # type: ignore[override] + call_id = options.get("call_id") + if not call_id: + raise UserError("OpenAIRealtimeSIPModel requires `call_id` in the model configuration.") + + sip_options = options.copy() + await super().connect(sip_options) + + class _ConversionHelper: @classmethod def conversation_item_to_realtime_message_item( diff --git a/tests/realtime/test_openai_realtime_sip_model.py b/tests/realtime/test_openai_realtime_sip_model.py new file mode 100644 index 000000000..b00d268b4 --- /dev/null +++ b/tests/realtime/test_openai_realtime_sip_model.py @@ -0,0 +1,56 @@ +from __future__ import annotations + +import asyncio + +import pytest + +from agents.exceptions import UserError +from agents.realtime.openai_realtime import OpenAIRealtimeSIPModel + + +class _DummyWebSocket: + def __init__(self) -> None: + self.sent_messages: list[str] = [] + self.closed = False + + def __aiter__(self): + return self + + async def __anext__(self): # pragma: no cover - simple termination + raise StopAsyncIteration + + async def send(self, data: str) -> None: + self.sent_messages.append(data) + + async def close(self) -> None: + self.closed = True + + +@pytest.mark.asyncio +async def test_sip_model_uses_call_id_in_url(monkeypatch: pytest.MonkeyPatch) -> None: + dummy_ws = _DummyWebSocket() + captured: dict[str, object] = {} + + async def fake_connect(url: str, **kwargs): # type: ignore[override] + captured["url"] = url + captured["kwargs"] = kwargs + return dummy_ws + + monkeypatch.setattr("agents.realtime.openai_realtime.websockets.connect", fake_connect) + + model = OpenAIRealtimeSIPModel() + await model.connect({"api_key": "sk-test", "call_id": "call_789", "initial_model_settings": {}}) + + assert captured["url"] == "wss://api.openai.com/v1/realtime?call_id=call_789" + + await asyncio.sleep(0) # allow listener task to start and finish + await model.close() + assert dummy_ws.closed + + +@pytest.mark.asyncio +async def test_sip_model_requires_call_id() -> None: + model = OpenAIRealtimeSIPModel() + + with pytest.raises(UserError): + await model.connect({"api_key": "sk-test", "initial_model_settings": {}}) From a0178a2f860ed394e53244b9f360883f08e3715f Mon Sep 17 00:00:00 2001 From: Kazuhiro Sera Date: Mon, 27 Oct 2025 21:05:29 +0900 Subject: [PATCH 2/6] fix lint and mypy errors --- examples/realtime/twilio_sip/server.py | 46 +++++++++++-------- src/agents/realtime/openai_realtime.py | 2 +- .../test_openai_realtime_sip_model.py | 2 +- 3 files changed, 28 insertions(+), 22 deletions(-) diff --git a/examples/realtime/twilio_sip/server.py b/examples/realtime/twilio_sip/server.py index 183c81663..08de9c5df 100644 --- a/examples/realtime/twilio_sip/server.py +++ b/examples/realtime/twilio_sip/server.py @@ -10,6 +10,7 @@ from fastapi import FastAPI, HTTPException, Request, Response from openai import APIStatusError, AsyncOpenAI, InvalidWebhookSignatureError +from agents.realtime.config import RealtimeSessionModelSettings from agents.realtime.items import ( AssistantAudio, AssistantMessageItem, @@ -89,9 +90,7 @@ async def accept_call(call_id: str) -> None: except Exception: # noqa: BLE001 detail = str(exc.response) - logger.error( - "Failed to accept call %s: %s %s", call_id, exc.status_code, detail - ) + logger.error("Failed to accept call %s: %s %s", call_id, exc.status_code, detail) raise HTTPException(status_code=500, detail="Failed to accept call") from exc logger.info("Accepted call %s", call_id) @@ -103,17 +102,16 @@ async def observe_call(call_id: str) -> None: runner = RealtimeRunner(assistant_agent, model=OpenAIRealtimeSIPModel()) try: - initial_settings = { + initial_model_settings: RealtimeSessionModelSettings = { "turn_detection": { "type": "semantic_vad", "interrupt_response": True, } } - async with await runner.run( model_config={ "call_id": call_id, - "initial_model_settings": initial_settings, + "initial_model_settings": initial_model_settings, } ) as session: # Trigger an initial greeting so callers hear the agent right away. @@ -124,12 +122,14 @@ async def observe_call(call_id: str) -> None: RealtimeModelSendRawMessage( message={ "type": "response.create", - "response": { - "instructions": ( - "Say exactly '" - f"{WELCOME_MESSAGE}" - "' now before continuing the conversation." - ) + "other_data": { + "response": { + "instructions": ( + "Say exactly '" + f"{WELCOME_MESSAGE}" + "' now before continuing the conversation." + ) + } }, } ) @@ -139,15 +139,21 @@ async def observe_call(call_id: str) -> None: if event.type == "history_added": item = event.item if isinstance(item, UserMessageItem): - for content in item.content: - if isinstance(content, InputText) and content.text: - logger.info("Caller: %s", content.text) + for user_content in item.content: + if isinstance(user_content, InputText) and user_content.text: + logger.info("Caller: %s", user_content.text) elif isinstance(item, AssistantMessageItem): - for content in item.content: - if isinstance(content, AssistantText) and content.text: - logger.info("Assistant (text): %s", content.text) - elif isinstance(content, AssistantAudio) and content.transcript: - logger.info("Assistant (audio transcript): %s", content.transcript) + for assistant_content in item.content: + if isinstance(assistant_content, AssistantText) and assistant_content.text: + logger.info("Assistant (text): %s", assistant_content.text) + elif ( + isinstance(assistant_content, AssistantAudio) + and assistant_content.transcript + ): + logger.info( + "Assistant (audio transcript): %s", + assistant_content.transcript, + ) elif event.type == "error": logger.error("Realtime session error: %s", event.error) diff --git a/src/agents/realtime/openai_realtime.py b/src/agents/realtime/openai_realtime.py index 75f1f1fd0..208f0c08e 100644 --- a/src/agents/realtime/openai_realtime.py +++ b/src/agents/realtime/openai_realtime.py @@ -937,7 +937,7 @@ def _tools_to_session_tools( class OpenAIRealtimeSIPModel(OpenAIRealtimeWebSocketModel): """Realtime model that attaches to SIP-originated calls using a call ID.""" - async def connect(self, options: RealtimeModelConfig) -> None: # type: ignore[override] + async def connect(self, options: RealtimeModelConfig) -> None: call_id = options.get("call_id") if not call_id: raise UserError("OpenAIRealtimeSIPModel requires `call_id` in the model configuration.") diff --git a/tests/realtime/test_openai_realtime_sip_model.py b/tests/realtime/test_openai_realtime_sip_model.py index b00d268b4..0ae833eee 100644 --- a/tests/realtime/test_openai_realtime_sip_model.py +++ b/tests/realtime/test_openai_realtime_sip_model.py @@ -31,7 +31,7 @@ async def test_sip_model_uses_call_id_in_url(monkeypatch: pytest.MonkeyPatch) -> dummy_ws = _DummyWebSocket() captured: dict[str, object] = {} - async def fake_connect(url: str, **kwargs): # type: ignore[override] + async def fake_connect(url: str, **kwargs): captured["url"] = url captured["kwargs"] = kwargs return dummy_ws From 97f2a5cf61084e322fedfb26e43714c306ea3ef1 Mon Sep 17 00:00:00 2001 From: Kazuhiro Sera Date: Mon, 27 Oct 2025 21:13:58 +0900 Subject: [PATCH 3/6] uv run ruff format --- examples/realtime/twilio_sip/server.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/examples/realtime/twilio_sip/server.py b/examples/realtime/twilio_sip/server.py index 08de9c5df..7628b6eff 100644 --- a/examples/realtime/twilio_sip/server.py +++ b/examples/realtime/twilio_sip/server.py @@ -144,7 +144,10 @@ async def observe_call(call_id: str) -> None: logger.info("Caller: %s", user_content.text) elif isinstance(item, AssistantMessageItem): for assistant_content in item.content: - if isinstance(assistant_content, AssistantText) and assistant_content.text: + if ( + isinstance(assistant_content, AssistantText) + and assistant_content.text + ): logger.info("Assistant (text): %s", assistant_content.text) elif ( isinstance(assistant_content, AssistantAudio) From 5339c35d2b4fa62ef0b245ecdfcafa407518c641 Mon Sep 17 00:00:00 2001 From: Kazuhiro Sera Date: Wed, 29 Oct 2025 07:13:14 +0900 Subject: [PATCH 4/6] Add input validation to WS connect method --- src/agents/realtime/openai_realtime.py | 14 ++++++++++++-- tests/realtime/test_openai_realtime.py | 12 ++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/src/agents/realtime/openai_realtime.py b/src/agents/realtime/openai_realtime.py index 208f0c08e..3aad14c8a 100644 --- a/src/agents/realtime/openai_realtime.py +++ b/src/agents/realtime/openai_realtime.py @@ -208,7 +208,18 @@ async def connect(self, options: RealtimeModelConfig) -> None: self._playback_tracker = options.get("playback_tracker", None) - self.model = model_settings.get("model_name", self.model) + call_id = options.get("call_id") + model_name = model_settings.get("model_name") + if call_id and model_name: + error_message = ( + "Cannot specify both `call_id` and `model_name` " + "when attaching to an existing realtime call." + ) + raise UserError(error_message) + + if model_name: + self.model = model_name + api_key = await get_api_key(options.get("api_key")) if "tracing" in model_settings: @@ -216,7 +227,6 @@ async def connect(self, options: RealtimeModelConfig) -> None: else: self._tracing_config = "auto" - call_id = options.get("call_id") if call_id: url = options.get("url", f"wss://api.openai.com/v1/realtime?call_id={call_id}") else: diff --git a/tests/realtime/test_openai_realtime.py b/tests/realtime/test_openai_realtime.py index 85297ec62..08c45e5d7 100644 --- a/tests/realtime/test_openai_realtime.py +++ b/tests/realtime/test_openai_realtime.py @@ -53,6 +53,18 @@ async def test_connect_missing_api_key_raises_error(self, model): with pytest.raises(UserError, match="API key is required"): await model.connect(config) + @pytest.mark.asyncio + async def test_connect_with_call_id_and_model_raises_error(self, model): + """Test that specifying both call_id and model raises UserError.""" + config = { + "api_key": "test-api-key-123", + "call_id": "call-123", + "initial_model_settings": {"model_name": "gpt-4o-realtime-preview"}, + } + + with pytest.raises(UserError, match="Cannot specify both `call_id` and `model_name`"): + await model.connect(config) + @pytest.mark.asyncio async def test_connect_with_string_api_key(self, model, mock_websocket): """Test successful connection with string API key.""" From 0367d814b6c55d8f109a9fd8d35f36898266222c Mon Sep 17 00:00:00 2001 From: Kazuhiro Sera Date: Wed, 29 Oct 2025 10:16:45 +0900 Subject: [PATCH 5/6] tweak the example sever --- examples/realtime/twilio_sip/server.py | 11 +++- tests/realtime/test_twilio_sip_server.py | 74 ++++++++++++++++++++++++ 2 files changed, 83 insertions(+), 2 deletions(-) create mode 100644 tests/realtime/test_twilio_sip_server.py diff --git a/examples/realtime/twilio_sip/server.py b/examples/realtime/twilio_sip/server.py index 7628b6eff..6fd07ade2 100644 --- a/examples/realtime/twilio_sip/server.py +++ b/examples/realtime/twilio_sip/server.py @@ -173,8 +173,15 @@ async def observe_call(call_id: str) -> None: def _track_call_task(call_id: str) -> None: existing = active_call_tasks.get(call_id) - if existing and not existing.done(): - existing.cancel() + if existing: + if not existing.done(): + logger.info( + "Call %s already has an active observer; ignoring duplicate webhook delivery.", + call_id, + ) + return + # Remove completed tasks so a new observer can start for a fresh call. + active_call_tasks.pop(call_id, None) task = asyncio.create_task(observe_call(call_id)) active_call_tasks[call_id] = task diff --git a/tests/realtime/test_twilio_sip_server.py b/tests/realtime/test_twilio_sip_server.py new file mode 100644 index 000000000..b9c1da4f4 --- /dev/null +++ b/tests/realtime/test_twilio_sip_server.py @@ -0,0 +1,74 @@ +from __future__ import annotations + +import importlib +from types import ModuleType +from unittest.mock import AsyncMock, Mock + +import pytest + +# +# This is a unit test for examples/realtime/twilio_sip/server.py +# If this is no longer relevant in the future, we can remove it. +# + +@pytest.fixture +def twilio_server(monkeypatch: pytest.MonkeyPatch) -> ModuleType: + monkeypatch.setenv("OPENAI_API_KEY", "test") + monkeypatch.setenv("OPENAI_WEBHOOK_SECRET", "secret") + module = importlib.import_module("examples.realtime.twilio_sip.server") + module = importlib.reload(module) + monkeypatch.setattr(module, "active_call_tasks", {}) + return module + + +@pytest.mark.asyncio +async def test_track_call_task_ignores_duplicate_webhooks( + monkeypatch: pytest.MonkeyPatch, twilio_server: ModuleType +) -> None: + call_id = "call-123" + existing_task = Mock() + existing_task.done.return_value = False + existing_task.cancel = Mock() + + monkeypatch.setitem(twilio_server.active_call_tasks, call_id, existing_task) + + create_task_mock = Mock() + + def fake_create_task(coro): + coro.close() + return create_task_mock.return_value + + monkeypatch.setattr(twilio_server.asyncio, "create_task", fake_create_task) + + twilio_server._track_call_task(call_id) + + existing_task.cancel.assert_not_called() + create_task_mock.assert_not_called() + assert twilio_server.active_call_tasks[call_id] is existing_task + + +@pytest.mark.asyncio +async def test_track_call_task_restarts_after_completion( + monkeypatch: pytest.MonkeyPatch, twilio_server: ModuleType +) -> None: + call_id = "call-456" + existing_task = Mock() + existing_task.done.return_value = True + existing_task.cancel = Mock() + + monkeypatch.setitem(twilio_server.active_call_tasks, call_id, existing_task) + + new_task = AsyncMock() + create_task_mock = Mock(return_value=new_task) + + def fake_create_task(coro): + coro.close() + return create_task_mock(coro) + + monkeypatch.setattr(twilio_server.asyncio, "create_task", fake_create_task) + + twilio_server._track_call_task(call_id) + + existing_task.cancel.assert_not_called() + create_task_mock.assert_called_once() + assert twilio_server.active_call_tasks[call_id] is new_task From fedbb903ae9d3168e10d5e31fb9ddd93ac6ae339 Mon Sep 17 00:00:00 2001 From: Kazuhiro Sera Date: Wed, 29 Oct 2025 10:19:47 +0900 Subject: [PATCH 6/6] make format --- tests/realtime/test_twilio_sip_server.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/realtime/test_twilio_sip_server.py b/tests/realtime/test_twilio_sip_server.py index b9c1da4f4..173395173 100644 --- a/tests/realtime/test_twilio_sip_server.py +++ b/tests/realtime/test_twilio_sip_server.py @@ -11,6 +11,7 @@ # If this is no longer relevant in the future, we can remove it. # + @pytest.fixture def twilio_server(monkeypatch: pytest.MonkeyPatch) -> ModuleType: monkeypatch.setenv("OPENAI_API_KEY", "test")