Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions examples/realtime/twilio_sip/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Twilio SIP Realtime Example

This example shows how to handle OpenAI Realtime SIP calls with the Agents SDK. Incoming calls are accepted through the Realtime Calls API, a triage agent answers with a fixed greeting, and handoffs route the caller to specialist agents (FAQ lookup and record updates) similar to the realtime UI demo.

## Prerequisites

- Python 3.9+
- An OpenAI API key with Realtime API access
- A configured webhook secret for your OpenAI project
- A Twilio account with a phone number and Elastic SIP Trunking enabled
- A public HTTPS endpoint for local development (for example, [ngrok](https://ngrok.com/))

## Configure OpenAI

1. In [platform settings](https://platform.openai.com/settings) select your project.
2. Create a webhook pointing to `https://<your-public-host>/openai/webhook` with "realtime.call.incoming" event type and note the signing secret. The example verifies each webhook with `OPENAI_WEBHOOK_SECRET`.

## Configure Twilio Elastic SIP Trunking

1. Create (or edit) an Elastic SIP trunk.
2. On the **Origination** tab, add an origination SIP URI of `sip:proj_<your_project_id>@sip.api.openai.com;transport=tls` so Twilio sends inbound calls to OpenAI. (The Termination tab always ends with `.pstn.twilio.com`, so leave it unchanged.)
3. Add at least one phone number to the trunk so inbound calls are forwarded to OpenAI.

## Setup

1. Install dependencies:
```bash
uv pip install -r examples/realtime/twilio-sip/requirements.txt
```
2. Export required environment variables:
```bash
export OPENAI_API_KEY="sk-..."
export OPENAI_WEBHOOK_SECRET="whsec_..."
```
3. (Optional) Adjust the multi-agent logic in `examples/realtime/twilio_sip/agents.py` if you want
to change the specialist agents or tools.
4. Run the FastAPI server:
```bash
uv run uvicorn examples.realtime.twilio_sip.server:app --host 0.0.0.0 --port 8000
```
5. Expose the server publicly (example with ngrok):
```bash
ngrok http 8000
```

## Test a Call

1. Place a call to the Twilio number attached to the SIP trunk.
2. Twilio sends the call to `sip.api.openai.com`; OpenAI fires `realtime.call.incoming`, which this example accepts.
3. The triage agent greets the caller, then either keeps the conversation or hands off to:
- **FAQ Agent** – answers common questions via `faq_lookup_tool`.
- **Records Agent** – writes short notes using `update_customer_record`.
4. The background task attaches to the call and logs transcripts plus basic events in the console.

You can edit `server.py` to change instructions, add tools, or integrate with internal systems once the SIP session is active.
1 change: 1 addition & 0 deletions examples/realtime/twilio_sip/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""OpenAI Realtime SIP example package."""
87 changes: 87 additions & 0 deletions examples/realtime/twilio_sip/agents.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
"""Realtime agent definitions shared by the Twilio SIP example."""

from __future__ import annotations

import asyncio

from agents import function_tool
from agents.extensions.handoff_prompt import RECOMMENDED_PROMPT_PREFIX
from agents.realtime import RealtimeAgent, realtime_handoff

# --- Tools -----------------------------------------------------------------


WELCOME_MESSAGE = "Hello, this is ABC customer service. How can I help you today?"


@function_tool(
name_override="faq_lookup_tool", description_override="Lookup frequently asked questions."
)
async def faq_lookup_tool(question: str) -> str:
"""Fetch FAQ answers for the caller."""

await asyncio.sleep(3)

q = question.lower()
if "plan" in q or "wifi" in q or "wi-fi" in q:
return "We provide complimentary Wi-Fi. Join the ABC-Customer network." # demo data
if "billing" in q or "invoice" in q:
return "Your latest invoice is available in the ABC portal under Billing > History."
if "hours" in q or "support" in q:
return "Human support agents are available 24/7; transfer to the specialist if needed."
return "I'm not sure about that. Let me transfer you back to the triage agent."


@function_tool
async def update_customer_record(customer_id: str, note: str) -> str:
"""Record a short note about the caller."""

await asyncio.sleep(1)
return f"Recorded note for {customer_id}: {note}"


# --- Agents ----------------------------------------------------------------


faq_agent = RealtimeAgent(
name="FAQ Agent",
handoff_description="Handles frequently asked questions and general account inquiries.",
instructions=f"""{RECOMMENDED_PROMPT_PREFIX}
You are an FAQ specialist. Always rely on the faq_lookup_tool for answers and keep replies
concise. If the caller needs hands-on help, transfer back to the triage agent.
""",
tools=[faq_lookup_tool],
)

records_agent = RealtimeAgent(
name="Records Agent",
handoff_description="Updates customer records with brief notes and confirmation numbers.",
instructions=f"""{RECOMMENDED_PROMPT_PREFIX}
You handle structured updates. Confirm the customer's ID, capture their request in a short
note, and use the update_customer_record tool. For anything outside data updates, return to the
triage agent.
""",
tools=[update_customer_record],
)

triage_agent = RealtimeAgent(
name="Triage Agent",
handoff_description="Greets callers and routes them to the most appropriate specialist.",
instructions=(
f"{RECOMMENDED_PROMPT_PREFIX} "
"Always begin the call by saying exactly: '"
f"{WELCOME_MESSAGE}' "
"before collecting details. Once the greeting is complete, gather context and hand off to "
"the FAQ or Records agents when appropriate."
),
handoffs=[faq_agent, realtime_handoff(records_agent)],
)

faq_agent.handoffs.append(triage_agent)
records_agent.handoffs.append(triage_agent)


def get_starting_agent() -> RealtimeAgent:
"""Return the agent used to start each realtime call."""

return triage_agent
3 changes: 3 additions & 0 deletions examples/realtime/twilio_sip/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
fastapi>=0.120.0
openai>=2.2,<3
uvicorn[standard]>=0.38.0
211 changes: 211 additions & 0 deletions examples/realtime/twilio_sip/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
"""Minimal FastAPI server for handling OpenAI Realtime SIP calls with Twilio."""

from __future__ import annotations

import asyncio
import logging
import os

import websockets
from fastapi import FastAPI, HTTPException, Request, Response
from openai import APIStatusError, AsyncOpenAI, InvalidWebhookSignatureError

from agents.realtime.config import RealtimeSessionModelSettings
from agents.realtime.items import (
AssistantAudio,
AssistantMessageItem,
AssistantText,
InputText,
UserMessageItem,
)
from agents.realtime.model_inputs import RealtimeModelSendRawMessage
from agents.realtime.openai_realtime import OpenAIRealtimeSIPModel
from agents.realtime.runner import RealtimeRunner

from .agents import WELCOME_MESSAGE, get_starting_agent

logging.basicConfig(level=logging.INFO)

logger = logging.getLogger("twilio_sip_example")


def _get_env(name: str) -> str:
value = os.getenv(name)
if not value:
raise RuntimeError(f"Missing environment variable: {name}")
return value


OPENAI_API_KEY = _get_env("OPENAI_API_KEY")
OPENAI_WEBHOOK_SECRET = _get_env("OPENAI_WEBHOOK_SECRET")

client = AsyncOpenAI(api_key=OPENAI_API_KEY, webhook_secret=OPENAI_WEBHOOK_SECRET)

# Build the multi-agent graph (triage + specialist agents) from agents.py.
assistant_agent = get_starting_agent()

app = FastAPI()

# Track background tasks so repeated webhooks do not spawn duplicates.
active_call_tasks: dict[str, asyncio.Task[None]] = {}


async def accept_call(call_id: str) -> None:
"""Accept the incoming SIP call and configure the realtime session."""

# The starting agent uses static instructions, so we can forward them directly to the accept
# call payload. If someone swaps in a dynamic prompt, fall back to a sensible default.
instructions_payload = (
assistant_agent.instructions
if isinstance(assistant_agent.instructions, str)
else "You are a helpful triage agent for ABC customer service."
)

try:
# AsyncOpenAI does not yet expose high-level helpers like client.realtime.calls.accept, so
# we call the REST endpoint directly via client.post(). Keep this until the SDK grows an
# async helper.
await client.post(
f"/realtime/calls/{call_id}/accept",
body={
"type": "realtime",
"model": "gpt-realtime",
"instructions": instructions_payload,
},
cast_to=dict,
)
except APIStatusError as exc:
if exc.status_code == 404:
# Twilio occasionally retries webhooks after the caller hangs up; treat as a no-op so
# the webhook still returns 200.
logger.warning(
"Call %s no longer exists when attempting accept (404). Skipping.", call_id
)
return

detail = exc.message
if exc.response is not None:
try:
detail = exc.response.text
except Exception: # noqa: BLE001
detail = str(exc.response)

logger.error("Failed to accept call %s: %s %s", call_id, exc.status_code, detail)
raise HTTPException(status_code=500, detail="Failed to accept call") from exc

logger.info("Accepted call %s", call_id)


async def observe_call(call_id: str) -> None:
"""Attach to the realtime session and log conversation events."""

runner = RealtimeRunner(assistant_agent, model=OpenAIRealtimeSIPModel())

try:
initial_model_settings: RealtimeSessionModelSettings = {
"turn_detection": {
"type": "semantic_vad",
"interrupt_response": True,
}
}
async with await runner.run(
model_config={
"call_id": call_id,
"initial_model_settings": initial_model_settings,
}
) as session:
# Trigger an initial greeting so callers hear the agent right away.
# Issue a response.create immediately after the WebSocket attaches so the model speaks
# before the caller says anything. Using the raw client message ensures zero latency
# and avoids threading the greeting through history.
await session.model.send_event(
RealtimeModelSendRawMessage(
message={
"type": "response.create",
"other_data": {
"response": {
"instructions": (
"Say exactly '"
f"{WELCOME_MESSAGE}"
"' now before continuing the conversation."
)
}
},
}
)
)

async for event in session:
if event.type == "history_added":
item = event.item
if isinstance(item, UserMessageItem):
for user_content in item.content:
if isinstance(user_content, InputText) and user_content.text:
logger.info("Caller: %s", user_content.text)
elif isinstance(item, AssistantMessageItem):
for assistant_content in item.content:
if (
isinstance(assistant_content, AssistantText)
and assistant_content.text
):
logger.info("Assistant (text): %s", assistant_content.text)
elif (
isinstance(assistant_content, AssistantAudio)
and assistant_content.transcript
):
logger.info(
"Assistant (audio transcript): %s",
assistant_content.transcript,
)
elif event.type == "error":
logger.error("Realtime session error: %s", event.error)

except websockets.exceptions.ConnectionClosedError:
# Callers hanging up causes the WebSocket to close without a frame; log at info level so it
# does not surface as an error.
logger.info("Realtime WebSocket closed for call %s", call_id)
except Exception as exc: # noqa: BLE001 - demo logging only
logger.exception("Error while observing call %s", call_id, exc_info=exc)
finally:
logger.info("Call %s ended", call_id)
active_call_tasks.pop(call_id, None)


def _track_call_task(call_id: str) -> None:
existing = active_call_tasks.get(call_id)
if existing:
if not existing.done():
logger.info(
"Call %s already has an active observer; ignoring duplicate webhook delivery.",
call_id,
)
return
# Remove completed tasks so a new observer can start for a fresh call.
active_call_tasks.pop(call_id, None)

task = asyncio.create_task(observe_call(call_id))
active_call_tasks[call_id] = task


@app.post("/openai/webhook")
async def openai_webhook(request: Request) -> Response:
body = await request.body()

try:
event = client.webhooks.unwrap(body, request.headers)
except InvalidWebhookSignatureError as exc:
raise HTTPException(status_code=400, detail="Invalid webhook signature") from exc

if event.type == "realtime.call.incoming":
call_id = event.data.call_id
await accept_call(call_id)
_track_call_task(call_id)
return Response(status_code=200)

# Ignore other webhook event types for brevity.
return Response(status_code=200)


@app.get("/")
async def healthcheck() -> dict[str, str]:
return {"status": "ok"}
7 changes: 7 additions & 0 deletions src/agents/realtime/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,13 @@ class RealtimeModelConfig(TypedDict):
is played to the user.
"""

call_id: NotRequired[str]
"""Attach to an existing realtime call instead of creating a new session.

When provided, the transport connects using the `call_id` query string parameter rather than a
model name. This is used for SIP-originated calls that are accepted via the Realtime Calls API.
"""


class RealtimeModel(abc.ABC):
"""Interface for connecting to a realtime model and sending/receiving events."""
Expand Down
Loading