# LiveKit Agents: STT → Gemini Flash → Soprano TTS Pipeline

This notebook implements a **fastest-path** voice agent pipeline using:

| Component | Choice | Why |
|-----------|--------|-----|
| **STT** | sherpa-onnx streaming Zipformer small bilingual | int8 encoder/joiner + chunk size 96 for lower RTF |
| **LLM** | Gemini Live API (gemini-2.5-flash) | TEXT-only modality for speed |
| **TTS** | Soprano `infer_stream()` | Streaming audio (<15 ms GPU latency, per Soprano's claim) |
| **VAD** | Silero VAD | Fast endpointing with `turn_detection="vad"` |

## Key Latency Optimizations
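- **No resampling in the agent**: frames are passed to sherpa-onnx at their native sample rate, so the STT node does no 16 kHz conversion of its own
- **Chunk size 96**: Lower RTF (real-time factor) than the 64 variant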
- **Preemptive generation**: Enabled for perceived latency improvement
- **Sentence-level TTS**: Incremental audio generation without waiting for full response
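
RTF (real-time factor) in the list above is processing time divided by audio duration, so values below 1.0 mean the recognizer keeps up with live audio. A minimal sketch of how it could be measured follows; `real_time_factor` and `measure_rtf` are hypothetical helper names, and the `decode` callable stands in for whatever decoding call is being timed.

```python
import time
from typing import Callable


def real_time_factor(processing_seconds: float, audio_seconds: float) -> float:
    """RTF = processing time / audio duration; below 1.0 is faster than real time."""
    if audio_seconds <= 0:
        raise ValueError("audio_seconds must be positive")
    return processing_seconds / audio_seconds


def measure_rtf(decode: Callable[[], None], audio_seconds: float) -> float:
    """Time one call to `decode` (a placeholder for the real decoding step)."""
    start = time.perf_counter()
    decode()
    return real_time_factor(time.perf_counter() - start, audio_seconds)


# Example: 0.5 s of compute for 10 s of audio.
rtf = real_time_factor(0.5, 10.0)
```

Comparing this number for the `64/` and `96/` model variants on the target hardware is one way to check the chunk-size claim rather than taking it on trust.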

## 1. Installation

In [1]:
# Install required packages
!pip install "livekit-agents[google,silero]~=1.3"
!pip install sherpa-onnx onnxruntime-gpu
!pip install soprano-tts

Collecting livekit-agents~=1.3 (from livekit-agents[google,silero]~=1.3)
  Downloading livekit_agents-1.3.12-py3-none-any.whl.metadata (7.3 kB)
Collecting av>=14.0.0 (from livekit-agents~=1.3->livekit-agents[google,silero]~=1.3)
  Downloading av-16.1.0-cp312-cp312-manylinux_2_28_x86_64.whl.metadata (4.6 kB)
Collecting colorama>=0.4.6 (from livekit-agents~=1.3->livekit-agents[google,silero]~=1.3)
  Downloading colorama-0.4.6-py2.py3-none-any.whl.metadata (17 kB)
Collecting eval-type-backport (from livekit-agents~=1.3->livekit-agents[google,silero]~=1.3)
  Downloading eval_type_backport-0.3.1-py3-none-any.whl.metadata (2.4 kB)
Collecting livekit-api<2,>=1.0.7 (from livekit-agents~=1.3->livekit-agents[google,silero]~=1.3)
  Downloading livekit_api-1.1.0-py3-none-any.whl.metadata (1.5 kB)
Collecting livekit-blingfire~=1.0 (from livekit-agents~=1.3->livekit-agents[google,silero]~=1.3)
  Downloading livekit_blingfire-1.1.0-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl.metadata 

Collecting sherpa-onnx
  Downloading sherpa_onnx-1.12.23-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (45 kB)
Collecting onnxruntime-gpu
  Downloading onnxruntime_gpu-1.23.2-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (5.4 kB)
Collecting sherpa-onnx-core==1.12.23 (from sherpa-onnx)
  Downloading sherpa_onnx_core-1.12.23-py3-none-manylinux2014_x86_64.whl.metadata (629 bytes)
Downloading sherpa_onnx-1.12.23-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl (4.1 MB)
Downloading sherpa_onnx_core-1.12.23-py3-none-manylinux2014_x86_64.whl (9.5 MB)

## 2. Download Silero VAD Weights

In [2]:
# Download Silero VAD weights (required once)
# This uses the standard LiveKit plugin flow
!python -c "from livekit.plugins import silero; silero.VAD.load()"

## 3. Download Sherpa Model

Use **`sherpa-onnx-streaming-zipformer-small-bilingual-zh-en-2023-02-16`** and prefer the **int8** files.
The directory includes `64/` and `96/` chunk-size variants; the larger chunk size gives a lower RTF.

In [3]:
import os

# Download the sherpa model
SHERPA_MODEL_NAME = "sherpa-onnx-streaming-zipformer-small-bilingual-zh-en-2023-02-16"
SHERPA_MODEL_DIR = f"/content/{SHERPA_MODEL_NAME}"

if not os.path.exists(SHERPA_MODEL_DIR):
    !wget -q https://github.com/k2-fsa/sherpa-onnx/releases/download/asr-models/{SHERPA_MODEL_NAME}.tar.bz2
    !tar -xf {SHERPA_MODEL_NAME}.tar.bz2
    print(f"Model downloaded to: {SHERPA_MODEL_DIR}")
else:
    print(f"Model already exists at: {SHERPA_MODEL_DIR}")

os.environ["SHERPA_MODEL_DIR"] = SHERPA_MODEL_DIR

Model already exists at: /content/sherpa-onnx-streaming-zipformer-small-bilingual-zh-en-2023-02-16


## 4. Environment Configuration

In [4]:
import os

# ============ REQUIRED: LiveKit Credentials ============
# Replace the placeholders with your own values; never commit real credentials to a notebook.
os.environ["LIVEKIT_URL"] = "wss://<your-project>.livekit.cloud"  # Your LiveKit server URL
os.environ["LIVEKIT_API_KEY"] = "<your-livekit-api-key>"  # Your API key
os.environ["LIVEKIT_API_SECRET"] = "<your-livekit-api-secret>"  # Your API secret

# ============ REQUIRED: Google API Key ============
os.environ["GOOGLE_API_KEY"] = "<your-google-api-key>"  # For Gemini

# ============ OPTIONAL: Tuning Parameters ============
os.environ["GEMINI_MODEL"] = "gemini-2.5-flash"  # Model to use
os.environ["GEMINI_TEMPERATURE"] = "0.3"  # Response temperature
os.environ["SYSTEM_PROMPT"] = "You are a concise, helpful voice assistant. Keep replies short unless asked."

# Soprano configuration
os.environ["SOPRANO_BACKEND"] = "lmdeploy"  # Use lmdeploy for speed
os.environ["SOPRANO_DEVICE"] = "cuda"  # GPU acceleration
os.environ["SOPRANO_CACHE_MB"] = "512"  # Cache size for T4
os.environ["SOPRANO_DECODER_BS"] = "2"  # Decoder batch size

# Sherpa configuration
os.environ["SHERPA_PROVIDER"] = "cuda"  # Needs a GPU-enabled sherpa-onnx build; otherwise sherpa-onnx falls back to CPU
os.environ["SHERPA_NUM_THREADS"] = "2"  # Thread count

print("✅ Environment configured!")

✅ Environment configured!
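
Because the agent reads everything from environment variables, a missing or empty value only surfaces later, when the worker tries to connect. A small check like the following sketch could fail fast instead. `REQUIRED_VARS`, `missing_env`, and `require_env` are hypothetical names introduced for illustration; the variable names themselves come from the configuration cell above.

```python
import os
from typing import Iterable, List, Mapping

# Variables the configuration cell marks as REQUIRED.
REQUIRED_VARS = (
    "LIVEKIT_URL",
    "LIVEKIT_API_KEY",
    "LIVEKIT_API_SECRET",
    "GOOGLE_API_KEY",
)


def missing_env(names: Iterable[str], env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the names that are unset or contain only whitespace."""
    return [name for name in names if not env.get(name, "").strip()]


def require_env(names: Iterable[str] = REQUIRED_VARS, env: Mapping[str, str] = os.environ) -> None:
    """Raise a single error listing every missing variable."""
    missing = missing_env(names, env)
    if missing:
        raise RuntimeError("Missing required environment variables: " + ", ".join(missing))
```

Calling `require_env()` at the top of the agent script, before any model is loaded, would report all missing settings in one message.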


## 5. Agent Implementation

In [5]:
import os
import re
import numpy as np
from typing import AsyncIterable, Optional, List

from livekit import agents, rtc
from livekit.agents import Agent, AgentSession, ModelSettings, stt
from livekit.agents import cli
from livekit.agents.worker import WorkerOptions
from livekit.plugins import google, silero
from google.genai.types import Modality

import sherpa_onnx
from soprano import SopranoTTS


def _pick_existing(*paths: str) -> str:
    """Return the first path that exists."""
    for p in paths:
        if p and os.path.exists(p):
            return p
    raise FileNotFoundError(f"None of these paths exist: {paths}")


def _load_sherpa_recognizer(model_dir: str) -> sherpa_onnx.OnlineRecognizer:
    """
    Load sherpa-onnx recognizer with:
      - chunk size 96 (lower RTF)
      - int8 encoder + int8 joiner
    """
    # Prefer chunk=96 files when present
    base96 = os.path.join(model_dir, "96")
    base = base96 if os.path.isdir(base96) else model_dir

    tokens = _pick_existing(
        os.path.join(model_dir, "tokens.txt"),
        os.path.join(base, "tokens.txt"),
    )

    # Prefer int8 encoder/joiner when available
    encoder = _pick_existing(
        os.path.join(base, "encoder-epoch-99-avg-1.int8.onnx"),
        os.path.join(model_dir, "encoder-epoch-99-avg-1.int8.onnx"),
        os.path.join(base, "encoder-epoch-99-avg-1.onnx"),
        os.path.join(model_dir, "encoder-epoch-99-avg-1.onnx"),
    )
    decoder = _pick_existing(
        os.path.join(base, "decoder-epoch-99-avg-1.onnx"),
        os.path.join(model_dir, "decoder-epoch-99-avg-1.onnx"),
    )
    joiner = _pick_existing(
        os.path.join(base, "joiner-epoch-99-avg-1.int8.onnx"),
        os.path.join(model_dir, "joiner-epoch-99-avg-1.int8.onnx"),
        os.path.join(base, "joiner-epoch-99-avg-1.onnx"),
        os.path.join(model_dir, "joiner-epoch-99-avg-1.onnx"),
    )

    recognizer = sherpa_onnx.OnlineRecognizer.from_transducer(
        tokens=tokens,
        encoder=encoder,
        decoder=decoder,
        joiner=joiner,
        num_threads=int(os.getenv("SHERPA_NUM_THREADS", "2")),
        provider=os.getenv("SHERPA_PROVIDER", "cuda"),
        decoding_method="greedy_search",
        max_active_paths=4,
        enable_endpoint_detection=False,  # LiveKit VAD handles turns
    )
    return recognizer


print("✅ Helper functions defined!")

✅ Helper functions defined!


In [6]:
class FastPipelineAgent(Agent):
    """Custom agent with sherpa-onnx STT and Soprano TTS nodes."""
    
    def __init__(self, *, sherpa_recognizer: sherpa_onnx.OnlineRecognizer, soprano: SopranoTTS):
        super().__init__()
        self._sherpa = sherpa_recognizer
        self._soprano = soprano
        # Sentence boundary splitter for incremental TTS
        self._sent_re = re.compile(r"(.+?[.!?]\s+|.+?\n+)", re.DOTALL)

    async def stt_node(
        self,
        audio: AsyncIterable[rtc.AudioFrame],
        model_settings: ModelSettings,
    ) -> Optional[AsyncIterable[stt.SpeechEvent]]:
        """Custom STT using sherpa-onnx streaming recognizer."""
        
        async def _run() -> AsyncIterable[stt.SpeechEvent]:
            stream = self._sherpa.create_stream()
            last_text = ""

            async for frame in audio:
                samples_i16 = np.frombuffer(frame.data, dtype=np.int16)
                samples_f32 = samples_i16.astype(np.float32) / 32768.0
                stream.accept_waveform(frame.sample_rate, samples_f32)

                while self._sherpa.is_ready(stream):
                    self._sherpa.decode_stream(stream)
                    r = self._sherpa.get_result(stream)
                    text = r if isinstance(r, str) else getattr(r, "text", "")
                    if text and text != last_text:
                        last_text = text

            # Finalize
            stream.input_finished()
            while self._sherpa.is_ready(stream):
                self._sherpa.decode_stream(stream)

            r = self._sherpa.get_result(stream)
            final_text = r if isinstance(r, str) else getattr(r, "text", "")

            if final_text.strip():
                yield stt.SpeechEvent(
                    type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                    # SpeechData also expects a language; left empty because none is detected here
                    alternatives=[stt.SpeechData(language="", text=final_text)],
                )

            yield stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH, alternatives=[])

        return _run()

    async def tts_node(
        self,
        text: AsyncIterable[str],
        model_settings: ModelSettings,
    ) -> AsyncIterable[rtc.AudioFrame]:
        """Custom TTS using Soprano's infer_stream() for true streaming audio."""
        
        buffer = ""
        sample_rate = 32000  # Soprano outputs 32kHz
        channels = 1
        frame_ms = 20
        spf = int(sample_rate * frame_ms / 1000)  # samples per frame

        async def _emit_audio_from_pcm(pcm_f32: np.ndarray):
            """Convert float PCM to int16 frames."""
            pcm_i16 = np.clip(pcm_f32, -1.0, 1.0)
            pcm_i16 = (pcm_i16 * 32767.0).astype(np.int16)

            idx = 0
            n = pcm_i16.shape[0]
            while idx < n:
                chunk = pcm_i16[idx : idx + spf]
                if chunk.shape[0] < spf:
                    pad = np.zeros((spf - chunk.shape[0],), dtype=np.int16)
                    chunk = np.concatenate([chunk, pad], axis=0)
                idx += spf

                yield rtc.AudioFrame(
                    data=chunk.tobytes(),
                    sample_rate=sample_rate,
                    num_channels=channels,
                    samples_per_channel=spf,
                )

        async def _speak_sentence(sentence: str):
            """Stream audio for a single sentence."""
            stream = self._soprano.infer_stream(sentence, chunk_size=1)
            for chunk in stream:
                if isinstance(chunk, np.ndarray):
                    pcm = chunk.astype(np.float32)
                else:
                    pcm = np.asarray(chunk, dtype=np.float32)

                async for frame in _emit_audio_from_pcm(pcm):
                    yield frame

        async for delta in text:
            buffer += delta

            # Pull completed sentences fast
            out_sentences: List[str] = []
            while True:
                m = self._sent_re.match(buffer)
                if not m:
                    break
                s = m.group(1)
                out_sentences.append(s)
                buffer = buffer[len(s) :]

            for s in out_sentences:
                async for frame in _speak_sentence(s):
                    yield frame

        # Flush remainder
        if buffer.strip():
            async for frame in _speak_sentence(buffer):
                yield frame


print("✅ FastPipelineAgent class defined!")

✅ FastPipelineAgent class defined!
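
The sentence splitting inside `tts_node` can be exercised on its own. The sketch below reuses the same regular expression as `self._sent_re`; `pop_sentences` is a hypothetical helper name and the list of deltas is made-up sample input. It shows that a sentence is released only once its terminating punctuation is followed by whitespace (or a newline arrives), which is why the trailing fragment has to be flushed after the text stream ends.

```python
import re
from typing import List, Tuple

# Same pattern as self._sent_re in FastPipelineAgent.
SENTENCE_RE = re.compile(r"(.+?[.!?]\s+|.+?\n+)", re.DOTALL)


def pop_sentences(buffer: str) -> Tuple[List[str], str]:
    """Split off every complete sentence at the start of `buffer`.

    Returns the completed sentences and the unfinished remainder.
    """
    sentences: List[str] = []
    while True:
        match = SENTENCE_RE.match(buffer)
        if not match:
            break
        sentences.append(match.group(1))
        buffer = buffer[match.end():]
    return sentences, buffer


# Simulate LLM text arriving in arbitrary deltas.
buffer = ""
spoken: List[str] = []
for delta in ["Hello the", "re. How are", " you? Fine"]:
    buffer += delta
    done, buffer = pop_sentences(buffer)
    spoken.extend(done)

# spoken == ["Hello there. ", "How are you? "]; "Fine" stays buffered until the flush.
```

Requiring whitespace after the punctuation keeps decimals such as `3.14` in one piece, at the cost of delaying the last sentence until the flush.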


In [7]:
def prewarm(proc: agents.JobProcess):
    """Prewarm models for faster first response."""
    
    # VAD: local CPU model for responsive turn detection
    proc.userdata["vad"] = silero.VAD.load(
        min_speech_duration=0.05,
        min_silence_duration=0.35,  # reduce end-of-turn latency
        force_cpu=True,
    )

    # Sherpa model
    sherpa_dir = os.environ["SHERPA_MODEL_DIR"]
    proc.userdata["sherpa"] = _load_sherpa_recognizer(sherpa_dir)

    # Soprano TTS
    proc.userdata["soprano"] = SopranoTTS(
        backend=os.getenv("SOPRANO_BACKEND", "lmdeploy"),
        device=os.getenv("SOPRANO_DEVICE", "cuda"),
        cache_size_mb=int(os.getenv("SOPRANO_CACHE_MB", "512")),
        decoder_batch_size=int(os.getenv("SOPRANO_DECODER_BS", "2")),
    )
    
    print("✅ Models prewarmed!")


async def entrypoint(ctx: agents.JobContext):
    """Main agent entrypoint."""
    
    vad = ctx.proc.userdata["vad"]
    sherpa_recognizer = ctx.proc.userdata["sherpa"]
    soprano = ctx.proc.userdata["soprano"]

    agent = FastPipelineAgent(sherpa_recognizer=sherpa_recognizer, soprano=soprano)

    session = AgentSession(
        # Fast turn detection
        turn_detection="vad",
        vad=vad,

        # Gemini Live API (text-only for speed)
        llm=google.realtime.RealtimeModel(
            model=os.getenv("GEMINI_MODEL", "gemini-2.5-flash"),
            modalities=[Modality.TEXT],
            instructions=os.getenv(
                "SYSTEM_PROMPT",
                "You are a concise, helpful voice assistant. Keep replies short unless asked.",
            ),
            temperature=float(os.getenv("GEMINI_TEMPERATURE", "0.3")),
        ),

        # Helps perceived latency
        preemptive_generation=True,
    )

    await session.start(agent=agent, room=ctx.room)


print("✅ Entrypoint and prewarm functions defined!")

✅ Entrypoint and prewarm functions defined!


## 6. Run the Agent

⚠️ **Note**: Running the agent requires a LiveKit server and proper credentials. 
The agent will connect to your LiveKit room and start processing audio.

In [8]:
%%writefile livekit_soprano_agent.py
"""
LiveKit Agents (Python) STT → Gemini (Flash) → Soprano Pipeline
Fastest-path version with:
- STT: sherpa-onnx streaming Zipformer small bilingual (int8 encoder/joiner + chunk size 96)
- LLM: Gemini Live API via LiveKit Google plugin (gemini-2.5-flash, TEXT-only modality)
- TTS: Soprano infer_stream() (true streaming audio)
- Turn detection: Silero VAD + turn_detection="vad" for fast endpointing
- Preemptive generation: enabled
"""


import os
import re
import numpy as np
from typing import AsyncIterable, Optional, List

from livekit import agents, rtc
from livekit.agents import Agent, AgentSession, ModelSettings, stt
from livekit.agents import cli
from livekit.agents.worker import WorkerOptions
from livekit.plugins import google, silero
from google.genai.types import Modality

# --- Optional but recommended: ONNX Runtime GPU for sherpa-onnx on your T4 ---
# pip install onnxruntime-gpu

import sherpa_onnx  # pip install sherpa-onnx
from soprano import SopranoTTS  # pip install soprano-tts


def _pick_existing(*paths: str) -> str:
    for p in paths:
        if p and os.path.exists(p):
            return p
    raise FileNotFoundError(f"None of these paths exist: {paths}")


def _load_sherpa_recognizer(model_dir: str) -> sherpa_onnx.OnlineRecognizer:
    """
    Uses the small bilingual Zipformer model directory and prefers:
      - chunk size 96 (lower RTF)
      - int8 encoder + int8 joiner
    Model dir contains 64/96 subfolders where larger chunk => lower RTF.
    """
    # Prefer chunk=96 files when present (some packages place chunked variants in subfolders)
    base96 = os.path.join(model_dir, "96")
    base = base96 if os.path.isdir(base96) else model_dir

    tokens = _pick_existing(
        os.path.join(model_dir, "tokens.txt"),
        os.path.join(base, "tokens.txt"),
    )

    # Prefer int8 encoder/joiner when available
    encoder = _pick_existing(
        os.path.join(base, "encoder-epoch-99-avg-1.int8.onnx"),
        os.path.join(model_dir, "encoder-epoch-99-avg-1.int8.onnx"),
        os.path.join(base, "encoder-epoch-99-avg-1.onnx"),
        os.path.join(model_dir, "encoder-epoch-99-avg-1.onnx"),
    )
    decoder = _pick_existing(
        os.path.join(base, "decoder-epoch-99-avg-1.onnx"),
        os.path.join(model_dir, "decoder-epoch-99-avg-1.onnx"),
    )
    joiner = _pick_existing(
        os.path.join(base, "joiner-epoch-99-avg-1.int8.onnx"),
        os.path.join(model_dir, "joiner-epoch-99-avg-1.int8.onnx"),
        os.path.join(base, "joiner-epoch-99-avg-1.onnx"),
        os.path.join(model_dir, "joiner-epoch-99-avg-1.onnx"),
    )

    # NOTE: sherpa-onnx supports input sampling rates not necessarily 16k (so we avoid resampling for speed).
    # Provider "cuda" uses onnxruntime-gpu.
    recognizer = sherpa_onnx.OnlineRecognizer.from_transducer(
        tokens=tokens,
        encoder=encoder,
        decoder=decoder,
        joiner=joiner,
        num_threads=int(os.getenv("SHERPA_NUM_THREADS", "2")),
        provider=os.getenv("SHERPA_PROVIDER", "cuda"),  # "cuda" on your T4
        decoding_method="greedy_search",
        max_active_paths=4,
        enable_endpoint_detection=False,  # LiveKit VAD handles turns; keep this off for simplicity/latency
    )
    return recognizer


class FastPipelineAgent(Agent):
    def __init__(self, *, sherpa_recognizer: sherpa_onnx.OnlineRecognizer, soprano: SopranoTTS):
        super().__init__()
        self._sherpa = sherpa_recognizer
        self._soprano = soprano

        # Simple sentence boundary splitter for incremental TTS kickoff.
        # (Keeps latency low without waiting for the full assistant message.)
        self._sent_re = re.compile(r"(.+?[.!?]\s+|.+?\n+)", re.DOTALL)

    async def stt_node(
        self,
        audio: AsyncIterable[rtc.AudioFrame],
        model_settings: ModelSettings,
    ) -> Optional[AsyncIterable[stt.SpeechEvent]]:
        async def _run() -> AsyncIterable[stt.SpeechEvent]:
            stream = self._sherpa.create_stream()
            last_text = ""

            async for frame in audio:
                # LiveKit frames are PCM bytes; interpret as int16 mono/whatever channeling is provided.
                # If you know your input is stereo, downmix in the client or upstream track settings for best accuracy.
                samples_i16 = np.frombuffer(frame.data, dtype=np.int16)
                samples_f32 = samples_i16.astype(np.float32) / 32768.0

                stream.accept_waveform(frame.sample_rate, samples_f32)

                while self._sherpa.is_ready(stream):
                    self._sherpa.decode_stream(stream)
                    r = self._sherpa.get_result(stream)
                    text = r if isinstance(r, str) else getattr(r, "text", "")
                    if text and text != last_text:
                        last_text = text

            # finalize
            stream.input_finished()
            while self._sherpa.is_ready(stream):
                self._sherpa.decode_stream(stream)

            r = self._sherpa.get_result(stream)
            final_text = r if isinstance(r, str) else getattr(r, "text", "")

            if final_text.strip():
                yield stt.SpeechEvent(
                    type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                    # SpeechData also expects a language; left empty because none is detected here
                    alternatives=[stt.SpeechData(language="", text=final_text)],
                )

            yield stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH, alternatives=[])

        return _run()

    async def tts_node(
        self,
        text: AsyncIterable[str],
        model_settings: ModelSettings,
    ) -> AsyncIterable[rtc.AudioFrame]:
        """
        Uses Soprano's official infer_stream() for streaming audio (<15ms GPU latency claim).
        The OpenAI-compatible HTTP endpoint is currently non-streaming, so we avoid it.
        """
        buffer = ""
        sample_rate = 32000  # Soprano outputs 32kHz
        channels = 1
        frame_ms = 20
        spf = int(sample_rate * frame_ms / 1000)  # samples per frame

        async def _emit_audio_from_pcm(pcm_f32: np.ndarray):
            # Convert float [-1,1] -> int16 PCM
            pcm_i16 = np.clip(pcm_f32, -1.0, 1.0)
            pcm_i16 = (pcm_i16 * 32767.0).astype(np.int16)

            # chunk into ~20ms frames
            idx = 0
            n = pcm_i16.shape[0]
            while idx < n:
                chunk = pcm_i16[idx : idx + spf]
                if chunk.shape[0] < spf:
                    # pad last frame
                    pad = np.zeros((spf - chunk.shape[0],), dtype=np.int16)
                    chunk = np.concatenate([chunk, pad], axis=0)
                idx += spf

                yield rtc.AudioFrame(
                    data=chunk.tobytes(),
                    sample_rate=sample_rate,
                    num_channels=channels,
                    samples_per_channel=spf,
                )

        async def _speak_sentence(sentence: str):
            # Soprano streaming inference generator
            # It yields audio chunks; handle common return types robustly.
            stream = self._soprano.infer_stream(sentence, chunk_size=1)
            for chunk in stream:
                if isinstance(chunk, np.ndarray):
                    pcm = chunk.astype(np.float32)
                else:
                    pcm = np.asarray(chunk, dtype=np.float32)

                async for frame in _emit_audio_from_pcm(pcm):
                    yield frame

        async for delta in text:
            buffer += delta

            # Pull completed sentences fast (don't wait for full message)
            out_sentences: List[str] = []
            while True:
                m = self._sent_re.match(buffer)
                if not m:
                    break
                s = m.group(1)
                out_sentences.append(s)
                buffer = buffer[len(s) :]

            for s in out_sentences:
                async for frame in _speak_sentence(s):
                    yield frame

        # Flush remainder
        if buffer.strip():
            async for frame in _speak_sentence(buffer):
                yield frame


def prewarm(proc: agents.JobProcess):
    # VAD: local CPU model, recommended for responsive turn detection.
    proc.userdata["vad"] = silero.VAD.load(
        min_speech_duration=0.05,
        min_silence_duration=0.35,  # reduce end-of-turn latency a bit
        force_cpu=True,
    )

    # Sherpa model
    sherpa_dir = os.environ["SHERPA_MODEL_DIR"]
    proc.userdata["sherpa"] = _load_sherpa_recognizer(sherpa_dir)

    # Soprano: use lmdeploy backend if available for speed; adjust cache/batch for your 15GB T4.
    # (From Soprano docs: cache_size_mb and decoder_batch_size can improve speed.)
    proc.userdata["soprano"] = SopranoTTS(
        backend=os.getenv("SOPRANO_BACKEND", "lmdeploy"),
        device=os.getenv("SOPRANO_DEVICE", "cuda"),
        cache_size_mb=int(os.getenv("SOPRANO_CACHE_MB", "512")),
        decoder_batch_size=int(os.getenv("SOPRANO_DECODER_BS", "2")),
    )


async def entrypoint(ctx: agents.JobContext):
    vad = ctx.proc.userdata["vad"]
    sherpa_recognizer = ctx.proc.userdata["sherpa"]
    soprano = ctx.proc.userdata["soprano"]

    agent = FastPipelineAgent(sherpa_recognizer=sherpa_recognizer, soprano=soprano)

    session = AgentSession(
        # Fast turn detection
        turn_detection="vad",
        vad=vad,

        # Gemini Live API as text-only (half-cascade); model defaults to gemini-2.5-flash and can be specified.
        llm=google.realtime.RealtimeModel(
            model=os.getenv("GEMINI_MODEL", "gemini-2.5-flash"),
            modalities=[Modality.TEXT],
            instructions=os.getenv(
                "SYSTEM_PROMPT",
                "You are a concise, helpful voice assistant. Keep replies short unless asked.",
            ),
            temperature=float(os.getenv("GEMINI_TEMPERATURE", "0.3")),
        ),

        # Helps perceived latency in some scenarios.
        preemptive_generation=True,
    )

    await session.start(agent=agent, room=ctx.room)


if __name__ == "__main__":
    cli.run_app(
        WorkerOptions(
            entrypoint_fnc=entrypoint,
            prewarm_fnc=prewarm,
        )
    )


Overwriting livekit_soprano_agent.py
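
Both custom nodes in the script above call blocking code directly inside coroutines (`decode_stream()` in `stt_node`, and the plain `for` loop over `infer_stream()` in `tts_node`), so the asyncio event loop cannot run other tasks while each call is in progress. One possible mitigation, sketched here with the standard library only, is to advance the blocking iterator in a worker thread. `iterate_in_thread`, `slow_chunks`, and `collect` are hypothetical names, `slow_chunks` merely simulates a slow generator, and whether Soprano's generator can safely be advanced from a worker thread is an assumption to verify.

```python
import asyncio
import time
from typing import AsyncIterator, Callable, Iterator, List, TypeVar

T = TypeVar("T")
_DONE = object()  # sentinel returned by next() when the iterator is exhausted


async def iterate_in_thread(make_iter: Callable[[], Iterator[T]]) -> AsyncIterator[T]:
    """Yield items from a blocking iterator while the event loop stays responsive.

    Both the iterator's construction and each next() call run in a worker thread.
    """
    iterator = await asyncio.to_thread(make_iter)
    while True:
        item = await asyncio.to_thread(next, iterator, _DONE)
        if item is _DONE:
            return
        yield item


def slow_chunks(count: int) -> Iterator[int]:
    """Stand-in for a blocking streaming generator (simulated with sleep)."""
    for index in range(count):
        time.sleep(0.01)
        yield index


async def collect(count: int) -> List[int]:
    """Gather every chunk produced by the simulated generator."""
    return [chunk async for chunk in iterate_in_thread(lambda: slow_chunks(count))]
```

Under these assumptions, the `for chunk in stream:` loop in `_speak_sentence` would become an `async for` over `iterate_in_thread(lambda: self._soprano.infer_stream(sentence, chunk_size=1))`. The per-item thread hop adds a small overhead, so it is worth measuring before adopting.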


In [9]:
!python livekit_soprano_agent.py start

{"message": "starting worker", "level": "INFO", "name": "livekit.agents", "version": "1.3.12", "rtc-version": "1.0.23", "timestamp": "2026-01-24T09:27:31.317023+00:00"}
{"message": "preloading plugins", "level": "INFO", "name": "livekit.agents", "packages": ["livekit.plugins.google", "livekit.plugins.silero", "av"], "timestamp": "2026-01-24T09:27:31.317196+00:00"}
{"message": "HTTP server listening on :8081", "level": "INFO", "name": "livekit.agents", "timestamp": "2026-01-24T09:27:31.318505+00:00"}
{"message": "initializing process", "level": "INFO", "name": "livekit.agents", "pid": 9086, "timestamp": "2026-01-24T09:27:42.368682+00:00"}
{"message": "initializing process", "level": "INFO", "name": "livekit.agents", "pid": 9088, "timestamp": "2026-01-24T09:27:42.371219+00:00"}
/project/sherpa-onnx/csrc/session.cc:GetSessionOptionsImpl:182 Please compile with -DSHERPA_ONNX_ENABLE_GPU=ON. Available providers: CPUExecutionProvider, . Fallback to cpu!
/project/sherpa-onnx/csrc/session.cc:Ge

## 7. Alternative: Save as Python Script and Run

For production use, save the agent as a Python script and run it from the command line.

In [None]:
# Save the complete agent script
AGENT_SCRIPT = '''
"""LiveKit Agents: STT → Gemini Flash → Soprano TTS Pipeline"""

import os
import re
import numpy as np
from typing import AsyncIterable, Optional, List

from livekit import agents, rtc
from livekit.agents import Agent, AgentSession, ModelSettings, stt
from livekit.agents import cli
from livekit.agents.worker import WorkerOptions
from livekit.plugins import google, silero
from google.genai.types import Modality

import sherpa_onnx
from soprano import SopranoTTS


def _pick_existing(*paths: str) -> str:
    for p in paths:
        if p and os.path.exists(p):
            return p
    raise FileNotFoundError(f"None of these paths exist: {paths}")


def _load_sherpa_recognizer(model_dir: str) -> sherpa_onnx.OnlineRecognizer:
    base96 = os.path.join(model_dir, "96")
    base = base96 if os.path.isdir(base96) else model_dir

    tokens = _pick_existing(
        os.path.join(model_dir, "tokens.txt"),
        os.path.join(base, "tokens.txt"),
    )
    encoder = _pick_existing(
        os.path.join(base, "encoder-epoch-99-avg-1.int8.onnx"),
        os.path.join(model_dir, "encoder-epoch-99-avg-1.int8.onnx"),
        os.path.join(base, "encoder-epoch-99-avg-1.onnx"),
        os.path.join(model_dir, "encoder-epoch-99-avg-1.onnx"),
    )
    decoder = _pick_existing(
        os.path.join(base, "decoder-epoch-99-avg-1.onnx"),
        os.path.join(model_dir, "decoder-epoch-99-avg-1.onnx"),
    )
    joiner = _pick_existing(
        os.path.join(base, "joiner-epoch-99-avg-1.int8.onnx"),
        os.path.join(model_dir, "joiner-epoch-99-avg-1.int8.onnx"),
        os.path.join(base, "joiner-epoch-99-avg-1.onnx"),
        os.path.join(model_dir, "joiner-epoch-99-avg-1.onnx"),
    )

    return sherpa_onnx.OnlineRecognizer.from_transducer(
        tokens=tokens,
        encoder=encoder,
        decoder=decoder,
        joiner=joiner,
        num_threads=int(os.getenv("SHERPA_NUM_THREADS", "2")),
        provider=os.getenv("SHERPA_PROVIDER", "cuda"),
        decoding_method="greedy_search",
        max_active_paths=4,
        enable_endpoint_detection=False,
    )


class FastPipelineAgent(Agent):
    def __init__(self, *, sherpa_recognizer, soprano):
        super().__init__()
        self._sherpa = sherpa_recognizer
        self._soprano = soprano
        self._sent_re = re.compile(r"(.+?[.!?]\\s+|.+?\\n+)", re.DOTALL)

    async def stt_node(self, audio, model_settings):
        async def _run():
            stream = self._sherpa.create_stream()
            last_text = ""
            async for frame in audio:
                samples_i16 = np.frombuffer(frame.data, dtype=np.int16)
                samples_f32 = samples_i16.astype(np.float32) / 32768.0
                stream.accept_waveform(frame.sample_rate, samples_f32)
                while self._sherpa.is_ready(stream):
                    self._sherpa.decode_stream(stream)
                    r = self._sherpa.get_result(stream)
                    text = r if isinstance(r, str) else getattr(r, "text", "")
                    if text and text != last_text:
                        last_text = text
            stream.input_finished()
            while self._sherpa.is_ready(stream):
                self._sherpa.decode_stream(stream)
            r = self._sherpa.get_result(stream)
            final_text = r if isinstance(r, str) else getattr(r, "text", "")
            if final_text.strip():
                yield stt.SpeechEvent(
                    type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                    # SpeechData also expects a language; left empty because none is detected here
                    alternatives=[stt.SpeechData(language="", text=final_text)],
                )
            yield stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH, alternatives=[])
        return _run()

    async def tts_node(self, text, model_settings):
        buffer = ""
        sample_rate = 32000
        spf = int(sample_rate * 20 / 1000)

        async def _emit(pcm_f32):
            pcm_i16 = (np.clip(pcm_f32, -1.0, 1.0) * 32767.0).astype(np.int16)
            idx = 0
            while idx < pcm_i16.shape[0]:
                chunk = pcm_i16[idx:idx + spf]
                if chunk.shape[0] < spf:
                    chunk = np.concatenate([chunk, np.zeros(spf - chunk.shape[0], dtype=np.int16)])
                idx += spf
                yield rtc.AudioFrame(data=chunk.tobytes(), sample_rate=sample_rate, num_channels=1, samples_per_channel=spf)

        async def _speak(sentence):
            for chunk in self._soprano.infer_stream(sentence, chunk_size=1):
                pcm = chunk.astype(np.float32) if isinstance(chunk, np.ndarray) else np.asarray(chunk, dtype=np.float32)
                async for frame in _emit(pcm):
                    yield frame

        async for delta in text:
            buffer += delta
            while True:
                m = self._sent_re.match(buffer)
                if not m:
                    break
                s = m.group(1)
                buffer = buffer[len(s):]
                async for frame in _speak(s):
                    yield frame
        if buffer.strip():
            async for frame in _speak(buffer):
                yield frame


def prewarm(proc):
    proc.userdata["vad"] = silero.VAD.load(min_speech_duration=0.05, min_silence_duration=0.35, force_cpu=True)
    proc.userdata["sherpa"] = _load_sherpa_recognizer(os.environ["SHERPA_MODEL_DIR"])
    proc.userdata["soprano"] = SopranoTTS(
        backend=os.getenv("SOPRANO_BACKEND", "lmdeploy"),
        device=os.getenv("SOPRANO_DEVICE", "cuda"),
        cache_size_mb=int(os.getenv("SOPRANO_CACHE_MB", "512")),
        decoder_batch_size=int(os.getenv("SOPRANO_DECODER_BS", "2")),
    )


async def entrypoint(ctx):
    agent = FastPipelineAgent(
        sherpa_recognizer=ctx.proc.userdata["sherpa"],
        soprano=ctx.proc.userdata["soprano"],
    )
    session = AgentSession(
        turn_detection="vad",
        vad=ctx.proc.userdata["vad"],
        llm=google.realtime.RealtimeModel(
            model=os.getenv("GEMINI_MODEL", "gemini-2.5-flash"),
            modalities=[Modality.TEXT],
            instructions=os.getenv("SYSTEM_PROMPT", "You are a concise, helpful voice assistant."),
            temperature=float(os.getenv("GEMINI_TEMPERATURE", "0.3")),
        ),
        preemptive_generation=True,
    )
    await session.start(agent=agent, room=ctx.room)


if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint, prewarm_fnc=prewarm))
'''

with open('fast_agent.py', 'w') as f:
    f.write(AGENT_SCRIPT)

print("✅ Agent script saved to fast_agent.py")
print("\nTo run: python fast_agent.py dev")

## Reference Links

- [LiveKit Agents: Pipeline Nodes](https://docs.livekit.io/agents/logic/nodes/)
- [LiveKit Gemini Live API Plugin](https://docs.livekit.io/agents/models/realtime/plugins/gemini/)
- [LiveKit Silero VAD Plugin](https://docs.livekit.io/agents/build/turns/vad/)
- [Turn Detection Configuration](https://docs.livekit.io/agents/v1/build/turn-detection/configuration)
- [sherpa-onnx Streaming Zipformer Models](https://k2-fsa.github.io/sherpa/onnx/pretrained_models/online-transducer/zipformer-transducer-models.html)
- [Soprano Model Card](https://huggingface.co/ekwek/Soprano-1.1-80M)