In [None]:
import asyncio
import os
import time
from typing import Annotated, Union

import nest_asyncio
import uvicorn
from fastapi import FastAPI, Request, WebSocket
from fastapi.responses import HTMLResponse, JSONResponse
from pydantic import BaseModel
from twilio.twiml.voice_response import Connect, VoiceResponse

from autogen.agentchat.realtime_agent import FunctionObserver, RealtimeAgent, TwilioAudioAdapter
from autogen.agentchat.realtime_agent.swarm_observer import SwarmObserver

In [None]:
# Configuration
# The API key is read from the kernel's process environment. This notebook does not
# load a .env file itself, so the variable must already be exported (or injected by
# whatever launched the kernel) before this cell runs.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# Port for the local uvicorn server; override with the PORT environment variable.
PORT = int(os.getenv("PORT", 5050))

if not OPENAI_API_KEY:
    # Fail fast with a message that names the exact variable that is missing.
    raise ValueError(
        "Missing the OpenAI API key. Please set the OPENAI_API_KEY environment variable "
        "(for example in a .env file loaded into the kernel's environment)."
    )

# LLM configuration shared by every RealtimeAgent created in this notebook.
llm_config = {
    # NOTE(review): presumably a request timeout in seconds, per autogen convention — confirm.
    "timeout": 600,
    "cache_seed": 45,  # change the seed for different trials
    "config_list": [
        {
            "model": "gpt-4o-realtime-preview-2024-10-01",
            "api_key": OPENAI_API_KEY,
        }
    ],
    "temperature": 0.8,
}

In [None]:
# Jupyter already runs an asyncio event loop in the kernel; nest_asyncio patches asyncio
# to allow re-entrant loops so that uvicorn.run at the end of this cell can start here.
nest_asyncio.apply()

# FastAPI application for the first (weather tool) example; routes are registered below.
app = FastAPI()


@app.get("/", response_class=JSONResponse)
async def index_page():
    # Liveness check: a plain GET on the root confirms the server is reachable.
    status_payload = {"message": "Twilio Media Stream Server is running!"}
    return status_payload


@app.api_route("/incoming-call", methods=["GET", "POST"])
async def handle_incoming_call(request: Request):
    """Handle incoming call and return TwiML response to connect to Media Stream."""
    # Spoken preamble: greet the caller, pause a second, then invite them to talk.
    twiml = VoiceResponse()
    # <Say> punctuation to improve text-to-speech flow
    twiml.say(
        "Please wait while we connect your call to the A. I. voice assistant, powered by Twilio and the Open-A.I. Realtime API"
    )
    twiml.pause(length=1)
    twiml.say("O.K. you can start talking!")

    # Point Twilio's <Connect><Stream> at this server's /media-stream WebSocket,
    # using the hostname the request arrived on.
    stream_url = f"wss://{request.url.hostname}/media-stream"
    connect_verb = Connect()
    connect_verb.stream(url=stream_url)
    twiml.append(connect_verb)

    xml_body = str(twiml)
    return HTMLResponse(content=xml_body, media_type="application/xml")


@app.websocket("/media-stream")
async def handle_media_stream(websocket: WebSocket):
    """Handle WebSocket connections between Twilio and OpenAI."""
    await websocket.accept()

    # One realtime agent per call, reading/writing audio through the Twilio adapter.
    realtime_agent = RealtimeAgent(
        name="Weather Bot",
        system_message="Hello there! I am an AI voice assistant powered by Twilio and the OpenAI Realtime API. You can ask me for facts, jokes, or anything you can imagine. How can I help you?",
        llm_config=llm_config,
        audio_adapter=TwilioAudioAdapter(websocket),
    )

    @realtime_agent.register_handover(name="get_weather", description="Get the current weather")
    def get_weather(location: Annotated[str, "city"]) -> str:
        # Stub tool: Seattle (exact match) is cloudy, every other location is sunny.
        if location == "Seattle":
            return "The weather is cloudy."
        return "The weather is sunny."

    await realtime_agent.run()


# Serve the first example on all interfaces. uvicorn.run blocks this cell until the
# server is stopped (e.g. by interrupting the kernel), so later cells only run after that.
uvicorn.run(app, host="0.0.0.0", port=PORT)

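## Realtime agent with swarm questions

The same Twilio server, but the realtime agent hands the caller's task to a (simulated) swarm that may ask follow-up questions, which the agent relays to the caller.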

In [None]:
# Rebind `app` to a fresh FastAPI instance so the swarm example's routes are registered
# on a new application rather than on the one used by the first example.
app = FastAPI()


@app.get("/", response_class=JSONResponse)
async def index_page():
    # Liveness check for the swarm-variant server.
    return dict(message="Twilio Media Stream Server is running!")


@app.api_route("/incoming-call", methods=["GET", "POST"])
async def handle_incoming_call(request: Request):
    """Handle incoming call and return TwiML response to connect to Media Stream."""
    # Build the spoken greeting first, then attach the media stream connection.
    voice = VoiceResponse()
    voice.say(
        "Please wait while we connect your call to the A. I. voice assistant, powered by Twilio and the Open-A.I. Realtime API"
    )
    voice.pause(length=1)
    voice.say("O.K. you can start talking!")

    media_host = request.url.hostname
    stream_connect = Connect()
    stream_connect.stream(url=f"wss://{media_host}/media-stream")
    voice.append(stream_connect)

    return HTMLResponse(content=str(voice), media_type="application/xml")


@app.websocket("/media-stream")
async def handle_media_stream(websocket: WebSocket):
    """Handle WebSocket connections between Twilio and OpenAI."""
    await websocket.accept()

    audio_adapter = TwilioAudioAdapter(websocket)
    realtime_agent = RealtimeAgent(
        name="Customer service Bot",
        system_message=(
            "You are a helpful voice assistant. You are tasked with delegating tasks to a swarm of agents\n"
            "When a user calls in, you will need to ask the swarm for help to complete the task\n"
            "The swarm will ask questions and provide feedback to help you complete the task\n"
            "You can start the swarm by calling the function `start_swarm`\n"
            "After the swarm asks a question, you can answer it by calling the function `answer_swarm_question`"
        ),
        llm_config=llm_config,
        audio_adapter=audio_adapter,
    )

    # Pydantic models for the handover tools' inputs and outputs.
    class SwarmTaskInput(BaseModel):
        task: str

    class TaskEnd(BaseModel):
        result: str

    class TaskQuestion(BaseModel):
        question: str

    class TaskAnswer(BaseModel):
        answer: str

    # Per-call state: answer_swarm_question is only valid after start_swarm has run.
    swarm_started = False

    @realtime_agent.register_handover(name="start_swarm", description="Start a refund task using swarm intelligence")
    async def start_swarm(task_input: SwarmTaskInput) -> Union[TaskEnd, TaskQuestion]:
        # Simulated swarm: after a short non-blocking delay it always asks for the user's id.
        # `task_input` is not inspected by this stub.
        nonlocal swarm_started
        swarm_started = True
        await asyncio.sleep(2)
        return TaskQuestion(
            question=(
                "Need more info from user, ask him: 'What is your id number?'\n\n"
                "After you get the answer from the user, call the function `answer_swarm_question` with the appropriate information to answer the question."
            )
        )

    @realtime_agent.register_handover(name="answer_swarm_question", description="Answer a question from the swarm")
    async def answer_swarm_question(answer: TaskAnswer) -> Union[TaskEnd, str]:
        # Only reads the closure flag, so no `nonlocal` is needed here.
        if not swarm_started:
            # Plain-string error result (hence the Union return type); presumably relayed
            # back to the model so it can call start_swarm first.
            return "Error: Swarm not started, call start_swarm first"
        # Simulated swarm work. Use only non-blocking sleeps inside this coroutine: a
        # blocking time.sleep would freeze the whole event loop and every coroutine on it.
        await asyncio.sleep(4)
        return TaskEnd(result="We gave the user a refund for the last purchase.")

    await realtime_agent.run()


# Serve the swarm example on all interfaces; uvicorn.run blocks this cell until the
# server is stopped (e.g. by interrupting the kernel).
uvicorn.run(app, host="0.0.0.0", port=PORT)