The goal of this first section is to get familiar with the [FastRTC library](https://github.com/gradio-app/fastrtc), the Python library for real-time communication, which lets us turn any Python function into a real-time audio and video stream over WebRTC or WebSockets.

> We'll use this library to communicate with our agents in real time, just like having a conversation with a colleague!

In this first notebook, we'll cover the basics of FastRTC, starting with very simple handlers (echo handlers), gradually moving on to handlers that call LLM providers, and finally building ReAct agents with tool use.

![FastRTC Logo](img/fastrtc_logo.png)

## Understanding FastRTC core concepts

The heart of FastRTC is the [Stream](https://fastrtc.org/reference/stream/) object, which handles real-time streaming of audio, video, or both.

🔄 **Streaming Modes**

FastRTC supports three streaming modes, depending on your application's needs:

* `send-receive`: Enables full bidirectional communication. This is the mode we'll use, since it is ideal for conversational agents.
* `send`: Streams data from client to server only.
* `receive`: Streams data from server to client only.

🎙️ **Modalities**

You can build your application around one of three modalities:

* `video`: For real-time video streaming.
* `audio`: For real-time audio. This is our focus, since we're building phone-call-style agents.
* `audio-video`: Combines both streams for full audiovisual experiences.

🧠 **Handlers**

The handler is the core intelligence behind your `Stream`: it's where you define how incoming data should be processed. For audio streams, you'll typically implement a class that inherits from either `StreamHandler` or `AsyncStreamHandler`, depending on whether your processing logic is synchronous or asynchronous.

FastRTC also offers a convenient built-in option: the `ReplyOnPause` handler. This handler uses **voice activity detection (VAD)** to determine when the user has finished speaking, and only then sends the collected audio to your generator function. We'll be using this approach in some of our examples to simplify interaction and improve responsiveness.

---
**⚠️  IMPORTANT!  ⚠️**

Be sure to set up your `.env` file and install the project first, since we'll need those configuration settings before moving forward.

---



In [None]:
from dotenv import load_dotenv

load_dotenv()

In [None]:
from realtime_phone_agents.config import settings

## Example 1: Echo Audio (StreamHandler)

`StreamHandler` is a low-level abstraction that gives you full control over how audio is received, processed, and returned, for both the input and output streams.

In the following example, we'll use it to create a simple `EchoHandler`: the handler will send the user's audio back exactly as it was received.

In [None]:
from queue import Queue

import gradio as gr
import numpy as np
from fastrtc import Stream, StreamHandler


class EchoHandler(StreamHandler):
    def __init__(self) -> None:
        super().__init__()
        self.queue = Queue()

    def receive(self, frame: tuple[int, np.ndarray]) -> None:
        self.queue.put(frame)

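    def emit(self) -> tuple[int, np.ndarray]:
        # Blocks until a frame is available, then sends it back unchanged
        return self.queue.get()

    def copy(self) -> StreamHandler:
        # FastRTC calls copy() to create a fresh handler for each connection
        return EchoHandler()

    def shutdown(self) -> None:
        pass

    def start_up(self) -> None: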
        pass
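To see the `receive`/`emit` contract without opening a WebRTC session, here is a minimal stand-alone sketch. It does not import FastRTC: `QueueEcho` and `Frame` are hypothetical names used only for illustration, frames carry plain Python lists instead of NumPy arrays, and `emit` is made non-blocking so the demo cannot hang on an empty queue.

```python
from queue import Empty, Queue
from typing import Optional

# (sample_rate, samples); the real handler receives np.ndarray samples
Frame = tuple[int, list[float]]


class QueueEcho:
    """Hypothetical stand-in with the same receive/emit shape as EchoHandler."""

    def __init__(self) -> None:
        self.queue: Queue = Queue()

    def receive(self, frame: Frame) -> None:
        # Called once per incoming frame: store it for later emission
        self.queue.put(frame)

    def emit(self) -> Optional[Frame]:
        # Return the oldest stored frame, or None if nothing is waiting
        try:
            return self.queue.get_nowait()
        except Empty:
            return None


echo_demo = QueueEcho()
echo_demo.receive((16000, [0.1, 0.2]))
echo_demo.receive((16000, [0.3]))

first = echo_demo.emit()
second = echo_demo.emit()
third = echo_demo.emit()
print(first, second, third)
```

Frames come back in the order they arrived, which is all an echo needs. The notebook's `EchoHandler` uses a blocking `queue.get()` instead of `get_nowait()`.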

In [None]:
stream = Stream(handler=EchoHandler(), modality="audio", mode="send-receive")

In [None]:
stream.ui.launch()

We could do exactly the same, but with an `AsyncStreamHandler`.

## Example 2: Async Echo Audio (AsyncStreamHandler)

In [None]:
import asyncio

import numpy as np
from fastrtc import AsyncStreamHandler, Stream, wait_for_item


class AsyncEchoHandler(AsyncStreamHandler):
    """Simple Async Echo Handler"""

    def __init__(self) -> None:
        super().__init__(input_sample_rate=24000)
        self.queue = asyncio.Queue()

    async def receive(self, frame: tuple[int, np.ndarray]) -> None:
        await self.queue.put(frame)

    async def emit(self) -> tuple[int, np.ndarray] | None:
        return await wait_for_item(self.queue)

    def copy(self):
        return AsyncEchoHandler()

    async def shutdown(self):
        pass

    async def start_up(self) -> None:
        pass

In [None]:
stream = Stream(handler=AsyncEchoHandler(), modality="audio", mode="send-receive")

In [None]:
stream.ui.launch()

## Example 3: ReplyOnPause Handler

Luckily for us, FastRTC provides a convenient built-in handler called [ReplyOnPause](https://fastrtc.org/reference/reply_on_pause/) that does exactly what we need for a phone-call experience. This handler listens to incoming audio, waits for the user to pause, and then calls a reply function (`fn`) when that pause is detected.

Here's how it works: it collects audio chunks while the user speaks, uses a **Voice Activity Detection (VAD)** model to detect when speech is happening, and identifies pauses based on configurable thresholds. Once it detects a pause after speech has started, it sends the accumulated audio to your reply function.
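The accumulate-then-reply idea can be sketched in a few lines of plain Python. This is a toy under stated assumptions, not FastRTC's implementation: a simple energy threshold stands in for the VAD model, and `is_speech`, `reply_on_pause_sketch`, `energy_threshold`, and `pause_frames` are hypothetical names chosen for illustration.

```python
from typing import Callable, Iterable, Iterator

Frame = tuple[int, list[float]]  # (sample_rate, samples)


def is_speech(samples: list[float], energy_threshold: float = 0.01) -> bool:
    """Toy stand-in for a VAD model: mean squared amplitude above a threshold."""
    if not samples:
        return False
    return sum(s * s for s in samples) / len(samples) > energy_threshold


def reply_on_pause_sketch(
    frames: Iterable[Frame],
    reply_fn: Callable[[Frame], Frame],
    pause_frames: int = 2,
) -> Iterator[Frame]:
    """Buffer speech frames and call reply_fn once enough silent frames follow."""
    buffered: list[float] = []
    silent_run = 0
    started = False

    for sample_rate, samples in frames:
        if is_speech(samples):
            started = True
            silent_run = 0
            buffered.extend(samples)
        elif started:
            silent_run += 1
            if silent_run >= pause_frames:
                # Pause detected after speech: hand over the accumulated audio
                yield reply_fn((sample_rate, buffered))
                buffered, silent_run, started = [], 0, False


frames = [
    (16000, [0.0, 0.0]),   # silence before speech: ignored
    (16000, [0.5, -0.5]),  # speech
    (16000, [0.4, 0.4]),   # speech
    (16000, [0.0, 0.0]),   # first silent frame
    (16000, [0.0, 0.0]),   # second silent frame: pause detected
    (16000, [0.3, 0.3]),   # new speech with no pause after it: not replied to
]
replies = list(reply_on_pause_sketch(frames, lambda audio: audio))
print(replies)
```

The reply function is called once, with a single buffer containing both speech frames, rather than once per frame.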

You can also define an optional `startup_fn` to run when the stream begins, and the handler can interrupt a running reply if new audio arrives, making your agent more responsive and natural in conversation.

Let's emulate our `EchoHandler` with the `ReplyOnPause` handler!

> The experiment is not identical: our first handlers returned each frame directly, whereas `ReplyOnPause` returns the accumulated audio once the user pauses!

In [None]:
import numpy as np
from fastrtc import ReplyOnPause, Stream


def echo(audio: tuple[int, np.ndarray]):
    yield audio


stream = Stream(
    handler=ReplyOnPause(echo),
    modality="audio",
    mode="send-receive",
)

In [None]:
stream.ui.launch()

## Example 4: Adding TTS and STT Models

Now that we have a handler that can detect when the user is speaking (and, more importantly, when they've paused), we're finally ready to do something useful with their voice, not just echo it back.

It's time to introduce two key components that will bring our conversational agent to life: **speech-to-text (STT)** and **text-to-speech (TTS)** models.

* The **STT model** will transcribe the accumulated audio into text, so we can understand what the user said.
* The **TTS model** will take a text response and convert it into audio that we can stream back using FastRTC.

To get started, we'll try out FastRTC's built-in models:

* For STT, we'll use `Moonshine`: it's lightweight and simple, perfect for our first version.
* For TTS, we'll go with `Kokoro`, which gives us clear and natural-sounding voices.

> In future lessons, we'll replace these two models with faster-whisper and Orpheus 3B!

Let's upgrade our echo application: instead of sending back the original audio, we'll transcribe it and speak the same words back with a different voice (a `Kokoro` voice!).

In [None]:
import numpy as np
from fastrtc import ReplyOnPause, Stream, get_stt_model, get_tts_model

stt_model = get_stt_model()
tts_model = get_tts_model()


async def echo(audio: tuple[int, np.ndarray]):
    transcription = stt_model.stt(audio)
    async for audio_chunk in tts_model.stream_tts(transcription):
        yield audio_chunk


stream = Stream(
    handler=ReplyOnPause(echo),
    modality="audio",
    mode="send-receive",
)

In [None]:
stream.ui.launch()

## Example 5: Generating a response with an Agent

We can enhance our system by adding an Agent between the **STT model** and the **TTS model**. For this initial Agent, we'll ignore any use of Tools.

To create our Agent, we will use the new `create_agent` function from LangChain, which lets us create a **ReAct Agent** in just a few lines of code.

In [None]:
import numpy as np
from fastrtc import ReplyOnPause, Stream, get_stt_model, get_tts_model
from langchain.agents import create_agent
from langchain_groq import ChatGroq
from langgraph.checkpoint.memory import InMemorySaver

system_prompt = """
Your name is Sarah, a funny voice assistant who loves telling jokes. 
You are part of a phone conversation, so don't use emojis or asterisks
during your responses."""

stt_model = get_stt_model()
llm = ChatGroq(model=settings.groq.model, api_key=settings.groq.api_key)
tts_model = get_tts_model()

simple_agent = create_agent(
    llm, checkpointer=InMemorySaver(), system_prompt=system_prompt
)

The idea is to take Moonshine's transcribed text and pass it directly to our agent. The agent's response will then be streamed using the Kokoro voice. That's precisely what the next cell does.

In [None]:
async def simple_agent_handler(audio: tuple[int, np.ndarray]):
    # 1. Generate the transcription using Moonshine model
    transcription = stt_model.stt(audio)

    # 2. Use the transcription as user input to our agent, and wait for the response
    response = simple_agent.invoke(
        {"messages": [{"role": "user", "content": transcription}]},
        {"configurable": {"thread_id": "test"}},
    )

    # 3. Stream the audio response using the Kokoro model
    async for audio_chunk in tts_model.stream_tts(response["messages"][-1].content):
        yield audio_chunk


stream = Stream(
    handler=ReplyOnPause(simple_agent_handler),
    modality="audio",
    mode="send-receive",
)

In [None]:
stream.ui.launch()

## Example 6: Adding Tools to our Agent

**But what is an Agent without tools?** One of the most important features of our system is the ability to fetch property information from Superlinked using Tools (more on that in Lesson 2!). Simply generating a response won't always be enough, especially when the Agent needs to perform a complex search.

To handle this, we'll use a common technique found in ChatGPT Voice and many other products. Whenever the Agent needs to search, we'll send a short message back to the user to acknowledge that the system is working. You could even add extra effects, like a typing sound, to make it feel more interactive.

The next cell demonstrates how we implement this approach.

In [None]:

import numpy as np
from fastrtc import ReplyOnPause, Stream, get_stt_model, get_tts_model
from langchain.agents import create_agent
from langchain.tools import tool
from langchain_groq import ChatGroq
from langgraph.checkpoint.memory import InMemorySaver


@tool
def search_property_mock_tool(location: str) -> str:
    """Retrieve real estate details for properties in a given location."""
    return (
        "I found one apartment in that area. It features 3 rooms, "
        "2 bathrooms, and a beautifully designed living room."
    )


system_prompt = """
Your name is Lisa, and you work for The Neural Maze real estate company. 
Your task is to provide information about specific apartments using the `search_property_mock_tool`.
Don't use asterisks or emojis, as you are engaged in a phone call. Just return short and informative responses.
"""

stt_model = get_stt_model()
llm = ChatGroq(model=settings.groq.model, api_key=settings.groq.api_key)
tts_model = get_tts_model()

tool_agent = create_agent(
    llm,
    checkpointer=InMemorySaver(),
    system_prompt=system_prompt,
    tools=[search_property_mock_tool],
)

Instead of waiting for the full response as we did before, we now stream the agent's updates with `stream_mode="updates"`. This lets us detect when the agent is about to use a tool and play our "trick" audio, so the conversation feels more natural to the user.

In [None]:
from pprint import pformat

for chunk in tool_agent.stream(
    {
        "messages": [
            {
                "role": "user",
                "content": "Give me cool apartments in San Francisco please",
            }
        ]
    },
    {"configurable": {"thread_id": "test"}},
    stream_mode="updates",
):
    for step, data in chunk.items():
        print(f"\n=== Step: {step} ===")
        print(pformat(data))

---

First, we'll create a few helper functions to load a **keyboard sound**. You can swap this out for any sound you prefer, but we'll keep it simple for now. They return audio as `(sample_rate, np.ndarray)` chunks, the same format used by the handlers above, so the sound can be streamed through FastRTC directly.

In [None]:

import numpy as np
from pydub import AudioSegment


def load_keyboard_sound(path: str, target_rate: int = 16000, chunk_ms: int = 100):
    """
    Loads an MP3 keyboard sound and returns it as a list of (sample_rate, np.ndarray)
    audio chunks, suitable for your existing streaming format.
    """
    audio = (
        AudioSegment.from_file(path)
        .set_channels(1)  # mono
        .set_frame_rate(target_rate)  # resample to target_rate
    )

    # Convert PCM int16 to a float32 array in [-1, 1]
    samples = np.array(audio.get_array_of_samples()).astype(np.float32)
    samples /= 32768.0  # normalize from int16

    # Split into chunks
    samples_per_chunk = int((target_rate * chunk_ms) / 1000)
    chunks = []

    for i in range(0, len(samples), samples_per_chunk):
        chunk = samples[i : i + samples_per_chunk]
        if len(chunk) == 0:
            continue
        chunks.append((target_rate, chunk))

    return chunks
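The chunking arithmetic is easy to check by hand. The sketch below repeats only the splitting step on a plain Python list, so it runs without pydub or an audio file; `split_into_chunks` is a hypothetical helper name, not part of the notebook.

```python
def split_into_chunks(
    samples: list[float], rate: int, chunk_ms: int
) -> list[tuple[int, list[float]]]:
    """Split samples into (rate, chunk) tuples of at most chunk_ms each."""
    samples_per_chunk = int(rate * chunk_ms / 1000)
    return [
        (rate, samples[i : i + samples_per_chunk])
        for i in range(0, len(samples), samples_per_chunk)
    ]


# 4000 samples at 16 kHz is 0.25 s of audio
quarter_second = [0.0] * 4000
demo_chunks = split_into_chunks(quarter_second, rate=16000, chunk_ms=100)

chunk_lengths = [len(chunk) for _, chunk in demo_chunks]
print(chunk_lengths)  # [1600, 1600, 800]
```

At 16 kHz, a 100 ms chunk holds 1600 samples, and the last chunk simply carries whatever remains.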

Next, we need another function that streams the **keyboard sound**, just as we do with the TTS voice.

In [None]:
import asyncio

# Load the sound before starting the process
KEYBOARD_AUDIO_CHUNKS = load_keyboard_sound("sounds/keyboard.mp3")


async def stream_keyboard_sound(max_duration_s: float = 3.0):
    """
    Streams the preloaded keyboard sound effect for at most `max_duration_s` seconds.

    Args:
        max_duration_s: Maximum duration (in seconds) to stream.
    """
    if max_duration_s <= 0:
        return

    total_samples = 0
    total_samples_allowed = None

    for sample_rate, chunk in KEYBOARD_AUDIO_CHUNKS:
        # Initialize allowed sample budget once we know the sample rate
        if total_samples_allowed is None:
            total_samples_allowed = int(max_duration_s * sample_rate)

        if total_samples >= total_samples_allowed:
            break

        remaining_samples = total_samples_allowed - total_samples

        # Trim the chunk if it would exceed the allowed duration
        if len(chunk) > remaining_samples:
            chunk = chunk[:remaining_samples]

        if len(chunk) == 0:
            break

        yield (sample_rate, chunk)
        total_samples += len(chunk)

        await asyncio.sleep(0)  # allow event loop to breathe
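To make the sample budget concrete, here is a reduced sketch of the same trimming logic, run on synthetic chunks instead of the loaded MP3. `stream_with_budget` and `collect_lengths` are hypothetical names, and the chunks hold plain lists rather than NumPy arrays.

```python
import asyncio


async def stream_with_budget(chunks, max_duration_s: float):
    """Yield (rate, chunk) pairs until max_duration_s worth of samples is sent."""
    budget = None
    sent = 0
    for rate, chunk in chunks:
        if budget is None:
            budget = int(max_duration_s * rate)  # seconds -> samples
        remaining = budget - sent
        if remaining <= 0:
            break
        trimmed = chunk[:remaining]  # cut the last chunk to fit the budget
        yield rate, trimmed
        sent += len(trimmed)
        await asyncio.sleep(0)  # give other tasks a turn


async def collect_lengths(chunks, max_duration_s: float) -> list[int]:
    return [len(c) async for _, c in stream_with_budget(chunks, max_duration_s)]


# Five 100 ms chunks at 16 kHz (0.5 s in total), capped at 0.25 s
fake_chunks = [(16000, [0.0] * 1600) for _ in range(5)]
lengths = asyncio.run(collect_lengths(fake_chunks, 0.25))
print(lengths)  # [1600, 1600, 800]
```

A 0.25 s cap at 16 kHz allows 4000 samples, so the third chunk is trimmed to 800 and the rest are skipped. Inside a notebook, where an event loop is already running, use `await collect_lengths(fake_chunks, 0.25)` instead of `asyncio.run(...)`.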

Finally, let's build the **Tool Agent Handler**. Remember, this handler will:

1. Run the Agent in streaming mode and listen for updates.
2. When the Agent emits a message containing a `tool_calls` field, it will play a default audio cue (`Let me look for that in the system`) followed by the keyboard typing sound.
3. Deliver the final result once the Agent completes its work.
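The steps above can be sketched without audio or an LLM by replaying a hand-written list of updates. Everything here is illustrative: the updates are plain dicts whose shape is an assumption (the real stream carries LangChain message objects), the `"tools"` step name is also an assumption, and `has_tool_calls` and `plan_spoken_segments` are hypothetical helpers.

```python
def has_tool_calls(step_data: dict) -> bool:
    """True if any message in this step carries a non-empty tool_calls field."""
    return any(msg.get("tool_calls") for msg in step_data.get("messages", []))


def plan_spoken_segments(updates: list[dict], tool_use_message: str) -> list[tuple]:
    """Return, in order, what the handler would speak or play."""
    segments = []
    final_text = ""
    for update in updates:
        for step, data in update.items():
            if step != "model":
                continue  # only model steps decide what we say
            if has_tool_calls(data):
                segments.append(("tts", tool_use_message))
                segments.append(("sound", "keyboard"))
            else:
                final_text = data["messages"][-1]["content"]
    fallback = "I'm sorry, I couldn't find anything useful in the system."
    segments.append(("tts", final_text or fallback))
    return segments


# Hypothetical updates: tool request, tool result, final answer
fake_updates = [
    {"model": {"messages": [{"content": "", "tool_calls": [{"name": "search_property_mock_tool"}]}]}},
    {"tools": {"messages": [{"content": "I found one apartment in that area."}]}},
    {"model": {"messages": [{"content": "There is one apartment with 3 rooms."}]}},
]

segments = plan_spoken_segments(fake_updates, "Let me look for that in the system")
for segment in segments:
    print(segment)
```

The acknowledgement and typing sound are queued as soon as the tool call appears, and the final answer is spoken only after the stream ends.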

In [None]:


def model_has_tool_calls(model_step_data) -> bool:
    """
    Heuristic: returns True if this 'model' step contains tool_calls.
    The exact schema depends on your agent; adjust as needed.
    """
    msgs = None
    if isinstance(model_step_data, dict) and "messages" in model_step_data:
        msgs = model_step_data["messages"]
    elif isinstance(model_step_data, list):
        msgs = model_step_data
    else:
        msgs = [model_step_data]

    for msg in msgs:
        # Attribute-style (e.g. pydantic/BaseModel objects)
        tool_calls = getattr(msg, "tool_calls", None)
        if tool_calls:
            return True

        # Dict-style
        if isinstance(msg, dict):
            if msg.get("tool_calls"):
                return True

        # Sometimes tool calls live in content parts
        content = getattr(msg, "content", None) or (
            msg.get("content") if isinstance(msg, dict) else None
        )
        if isinstance(content, list):
            for part in content:
                if isinstance(part, dict) and part.get("tool_calls"):
                    return True

    return False


async def tool_agent_handler(audio: tuple[int, np.ndarray], *additional_inputs):
    _, tool_use_message, keyboard_sound_seconds = additional_inputs

    transcription = stt_model.stt(audio)

    final_text = ""

    for chunk in tool_agent.stream(
        {"messages": [{"role": "user", "content": transcription}]},
        {"configurable": {"thread_id": "test"}},
        stream_mode="updates",
    ):
        for step, data in chunk.items():
            if step != "model":
                continue

            if model_has_tool_calls(data):
                # Acknowledge the tool call, then play the typing sound
                async for audio_chunk in tts_model.stream_tts(tool_use_message):
                    yield audio_chunk

                async for kb_chunk in stream_keyboard_sound(keyboard_sound_seconds):
                    yield kb_chunk

                # Fixed 5-second pause before processing the next update
                await asyncio.sleep(5)
            else:
                # A model step without tool calls carries the answer text
                final_text = data["messages"][-1].content

    if not final_text:
        final_text = "I'm sorry, I couldn't find anything useful in the system."

    async for audio_chunk in tts_model.stream_tts(final_text):
        yield audio_chunk

In [None]:
stream = Stream(
    handler=ReplyOnPause(tool_agent_handler),
    modality="audio",
    mode="send-receive",
    additional_inputs=[
        gr.Text("Let me look for that in the system", label="Tool Use Sentence"),
        gr.Number(3.0, label="Max Keyboard Sound Duration"),
    ],
)

In [None]:
stream.ui.launch()