# Macaw OpenVoice — End-to-End Demo

Complete walkthrough of every Macaw feature: CLI model management,
REST API (STT, TTS, Translation), WebSocket real-time streaming, and full-duplex voice.

This notebook interacts with Macaw **exclusively through its public interfaces**
(CLI binary, REST API, WebSocket protocol). No internal modules are imported.

**Prerequisites**
- Python 3.12+
- Port 8765 available

**Table of Contents**

| # | Section | Interface |
|---|---------|-----------|
| 0 | [Setup & Installation](#setup) | pip / install.sh |
| 1 | [Model Management](#sec1) | CLI |
| 2 | [Server Startup](#sec2) | CLI |
| 3 | [Test Audio Generation](#sec3) | — |
| 4 | [CLI — Transcription & Translation](#sec4) | CLI |
| 5 | [REST API — Service Discovery](#sec5) | REST |
| 6 | [REST API — Speech-to-Text](#sec6) | REST |
| 7 | [REST API — Translation](#sec7) | REST |
| 8 | [REST API — Text-to-Speech](#sec8) | REST |
| 9 | [Real-time Streaming STT](#sec9) | WebSocket |
| 10 | [Full-Duplex Voice](#sec10) | WebSocket |
| 11 | [Error Handling](#sec11) | REST + WebSocket |
| 12 | [Results](#sec12) | — |
| 13 | [Cleanup](#sec13) | — |

In [None]:
# ============================================================
# Configuration
# ============================================================
# All tunables in one place. When adding new models or changing
# the test setup, only this cell needs to be modified.

# Package
MACAW_VERSION = "0.1.5"

# Models -- update when new engines/models are added to the catalog
STT_MODEL = "faster-whisper-tiny"
TTS_MODEL = "kokoro-v1"

# Test phrases — used by TTS to generate audio, then fed to STT (round-trip)
TEST_PHRASE = "Hello, welcome to Macaw OpenVoice."
TEST_PHRASE_LONG = (
    "Macaw OpenVoice is a unified voice runtime that provides "
    "real time speech recognition and text to speech synthesis "
    "with an OpenAI compatible API."
)

# Server
SERVER_HOST = "127.0.0.1"
SERVER_PORT = 8765

# pip extras required for the chosen models.
INSTALL_EXTRAS = "server,stream,huggingface,faster-whisper,kokoro"

# Timeouts (seconds)
SERVER_STARTUP_TIMEOUT = 90
REQUEST_TIMEOUT = 30
TTS_REQUEST_TIMEOUT = 60
WS_EVENT_TIMEOUT = 30
SERVER_SHUTDOWN_TIMEOUT = 15

# Derived
BASE_URL = f"http://{SERVER_HOST}:{SERVER_PORT}"
WS_URL = f"ws://{SERVER_HOST}:{SERVER_PORT}"

<a id="setup"></a>
## 0. Setup & Installation

Macaw OpenVoice can be installed in two ways:

### Option A — One-command installer (recommended for production)

```bash
# Linux / macOS
curl -fsSL https://raw.githubusercontent.com/usemacaw/macaw-openvoice/main/install.sh | sh

# Windows (PowerShell)
irm https://raw.githubusercontent.com/usemacaw/macaw-openvoice/main/scripts/install.ps1 | iex
```

| Variable | Default | Description |
|----------|---------|-------------|
| `MACAW_VERSION` | latest | Pin to a specific version |
| `MACAW_INSTALL_DIR` | `/opt/macaw` | Custom install directory |
| `MACAW_EXTRAS` | `server,grpc` | pip extras to install |
| `MACAW_NO_SERVICE` | unset | Skip systemd service setup |

### Option B — pip install (this notebook)

The cell below installs `macaw-openvoice` from PyPI with all extras needed
for this demo. Update `MACAW_VERSION` and `INSTALL_EXTRAS` in the Configuration
cell above to change the version or engines.

### Environment variables

| Variable | Default | Description |
|----------|---------|-------------|
| `MACAW_MODELS_DIR` | `~/.macaw/models` | Default models directory |
| `MACAW_SERVER_URL` | `http://localhost:8000` | Default server URL for client commands |
| `MACAW_LOG_LEVEL` | `info` | Default log level |
| `MACAW_LOG_FORMAT` | `text` | Default log format |
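
As a rough illustration of how a client script might resolve these settings, the sketch below reads each variable from the environment and falls back to the default listed in the table. The `resolve_settings` helper is hypothetical and not part of Macaw; only the variable names and defaults come from the table above.

```python
import os

# Defaults copied from the environment-variable table.
DEFAULTS = {
    "MACAW_MODELS_DIR": "~/.macaw/models",
    "MACAW_SERVER_URL": "http://localhost:8000",
    "MACAW_LOG_LEVEL": "info",
    "MACAW_LOG_FORMAT": "text",
}


def resolve_settings(environ=None):
    """Hypothetical helper: return each setting, preferring the environment."""
    env = os.environ if environ is None else environ
    settings = {name: env.get(name, default) for name, default in DEFAULTS.items()}
    # Expand "~" so the models directory can be used as a filesystem path.
    settings["MACAW_MODELS_DIR"] = os.path.expanduser(settings["MACAW_MODELS_DIR"])
    return settings


# Pass a plain dict instead of os.environ to try the helper in isolation.
print(resolve_settings({"MACAW_LOG_LEVEL": "debug"})["MACAW_LOG_LEVEL"])
```

Note that this notebook runs the server on port 8765, so the CLI cells pass `--server` explicitly rather than relying on the `MACAW_SERVER_URL` default.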

In [None]:
!pip install "macaw-openvoice[{INSTALL_EXTRAS}]=={MACAW_VERSION}"

In [None]:
import asyncio
import io
import json
import os
import signal
import subprocess
import tempfile
import time
import wave

import httpx
import numpy as np
import scipy.signal
import websockets

# State
server_process = None
_temp_files = []
SERVER_READY = False

# Audio placeholders (populated by Section 3 after TTS generates speech)
test_wav = b""
test_wav_path = ""
test_wav_44k = b""
test_wav_44k_path = ""
ws_frames: list[bytes] = []


class Results:
    _G = f"{chr(27)}[32m"
    _R = f"{chr(27)}[31m"
    _Y = f"{chr(27)}[33m"
    _B = f"{chr(27)}[1m"
    _N = f"{chr(27)}[0m"

    def __init__(self):
        self.passed = 0
        self.failed = 0
        self.skipped = 0

    def check(self, ok, msg):
        if ok:
            self.passed += 1
            print(f"  {self._G}[PASS]{self._N} {msg}")
        else:
            self.failed += 1
            print(f"  {self._R}[FAIL]{self._N} {msg}")

    def skip(self, msg):
        self.skipped += 1
        print(f"  {self._Y}[SKIP]{self._N} {msg}")

    def summary(self):
        total = self.passed + self.failed + self.skipped
        bar = "=" * 60
        print(f"\n{bar}")
        print(f"  {self._B}Macaw OpenVoice -- E2E Results{self._N}")
        print(bar)
        print(f"  {'Passed:':<10} {self.passed:>4}")
        print(f"  {'Failed:':<10} {self.failed:>4}")
        print(f"  {'Skipped:':<10} {self.skipped:>4}")
        print(f"  {'Total:':<10} {total:>4}")
        print(bar)
        if self.failed == 0:
            print(f"  {self._G}ALL CHECKS PASSED{self._N}")
        else:
            print(f"  {self._R}{self.failed} CHECK(S) FAILED{self._N}")
        print(bar)

T = Results()


def wait_for_health(base_url, timeout=90):
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            r = httpx.get(f"{base_url}/health", timeout=5)
            if r.status_code == 200 and r.json().get("status") == "ok":
                return True
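        # httpx.HTTPError covers connection, timeout, and protocol errors;
        # ValueError covers a 200 response whose body is not valid JSON.
        except (httpx.HTTPError, ValueError):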
            pass
        time.sleep(2)
    return False


def save_wav(wav_bytes, name="test.wav"):
    path = os.path.join(tempfile.gettempdir(), f"macaw_e2e_{name}")
    with open(path, "wb") as f:
        f.write(wav_bytes)
    _temp_files.append(path)
    return path


print(f"Server:    {BASE_URL}")
print(f"STT model: {STT_MODEL}")
print(f"TTS model: {TTS_MODEL}")
print("Ready.")

In [None]:
!macaw --help

<a id="sec1"></a>
## 1. Model Management (CLI)

Ollama-style model management. Download, inspect, and remove models from the catalog.

| Command | Description |
|---------|-------------|
| `macaw pull <model>` | Download a model from HuggingFace Hub |
| `macaw pull <model> --force` | Re-download even if the model is already installed |
| `macaw list` | List all installed models |
| `macaw inspect <model>` | Show model metadata, capabilities, and engine config |
| `macaw remove <model>` | Remove an installed model |
| `macaw remove <model> -y` | Remove without confirmation |

| Flag | Default | Description |
|------|---------|-------------|
| `--models-dir` | `~/.macaw/models` | Models directory |
| `--force` | — | Force re-download |
| `--yes` / `-y` | — | Skip confirmation prompt |

In [None]:
!macaw pull {STT_MODEL}

In [None]:
!macaw pull {TTS_MODEL}

In [None]:
!macaw list

In [None]:
!macaw inspect {STT_MODEL}

In [None]:
!macaw inspect {TTS_MODEL}

### Model lifecycle demo

Remove a model, verify it's gone, and re-download it.

In [None]:
# Remove TTS model
!macaw remove {TTS_MODEL} -y

# Verify it's gone
!macaw list

# Re-download
!macaw pull {TTS_MODEL}

# Verify it's back
!macaw list

<a id="sec2"></a>
## 2. Server Startup

Start the Macaw API server. It loads all installed models and spawns isolated
gRPC worker subprocesses (STT on port 50051, TTS on port 50052).

```
macaw serve [--host HOST] [--port PORT] [--models-dir DIR]
            [--cors-origins ORIGINS] [--log-format text|json]
            [--log-level debug|info|warning|error]
```

| Flag | Default | Description |
|------|---------|-------------|
| `--host` | `127.0.0.1` | Bind address |
| `--port` | `8000` | HTTP port |
| `--models-dir` | `~/.macaw/models` | Models directory |
| `--cors-origins` | `*` | Allowed CORS origins |
| `--log-format` | `text` | Log format (`text` or `json`) |
| `--log-level` | `info` | Log level |

> **Note:** The server is started via `subprocess.Popen` so the notebook can
> manage its lifecycle (health check, graceful shutdown in Cleanup).
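
Because the server needs its port to be free, a pre-flight check can save a confusing startup failure. The sketch below is one way to do that with the standard library; `port_is_free` is a hypothetical helper, not something Macaw provides, and the host and port are the values used in this notebook.

```python
import socket


def port_is_free(host, port):
    """Return True if a TCP socket can bind to (host, port) right now."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind((host, port))
        except OSError:
            return False
    return True


print(port_is_free("127.0.0.1", 8765))
```

The check is only a snapshot: another process could still claim the port between the check and the server start, so the health check remains the authoritative signal.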

In [None]:
# The server runs in the background. Keeping the Popen handle lets the notebook
# poll the process here and shut it down gracefully in Cleanup.
# Output goes to a log file rather than a pipe: a pipe that is never read
# can fill up and block the server.
server_log_path = os.path.join(tempfile.gettempdir(), "macaw_e2e_server.log")
_temp_files.append(server_log_path)
with open(server_log_path, "wb") as server_log:
    server_process = subprocess.Popen(
        ["macaw", "serve", "--host", SERVER_HOST, "--port", str(SERVER_PORT)],
        stdout=server_log,
        stderr=subprocess.STDOUT,
    )
print(f"$ macaw serve --host {SERVER_HOST} --port {SERVER_PORT}")
print(f"  PID: {server_process.pid}")

print(f"\nWaiting for health (timeout {SERVER_STARTUP_TIMEOUT}s) ...")
SERVER_READY = wait_for_health(BASE_URL, timeout=SERVER_STARTUP_TIMEOUT)
T.check(SERVER_READY, "Server is healthy")

if SERVER_READY:
    data = httpx.get(f"{BASE_URL}/health", timeout=5).json()
    T.check(data.get("status") == "ok", "status == 'ok'")
    T.check(isinstance(data.get("models_loaded"), int) and data["models_loaded"] >= 2, "models_loaded >= 2")
    T.check("version" in data, "has 'version' field")
    print(f"\n  {json.dumps(data)}")
else:
    print("\n  SERVER FAILED TO START.")
    if server_process.poll() is not None:
        print(f"  Exit code: {server_process.returncode}")
    with open(server_log_path, "rb") as f:
        out = f.read().decode(errors="replace")
    if out:
        print(f"  Server log (last 500 chars): {out[-500:]}")

<a id="sec3"></a>
## 3. Test Audio Generation (TTS → STT round-trip)

Instead of synthetic tones, we use **Macaw TTS to generate real speech**.
This audio is then fed to STT in the sections below, demonstrating the
full round-trip:

```
TEST_PHRASE  →  TTS (POST /v1/audio/speech)  →  WAV  →  STT  →  text
```

This also produces:
- A **44.1 kHz variant** (resampled via scipy) to test automatic resampling
- **16 kHz PCM frames** for WebSocket streaming tests
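
The framing arithmetic is worth spelling out: at 16 kHz, a 40 ms frame holds 16000 × 0.040 = 640 samples, and with 2 bytes per 16-bit sample that is 1280 bytes. The sketch below shows the same split on raw bytes using only the standard library (the notebook cell itself works on NumPy arrays); `split_frames` is a hypothetical helper name.

```python
SAMPLE_RATE = 16000      # Hz, the rate used for the streaming frames
FRAME_MS = 40            # frame duration used in this notebook
BYTES_PER_SAMPLE = 2     # 16-bit PCM


def split_frames(pcm, sample_rate=SAMPLE_RATE, frame_ms=FRAME_MS):
    """Split mono 16-bit PCM bytes into fixed-duration frames.

    The last frame may be shorter when the audio length is not a
    multiple of the frame size.
    """
    frame_bytes = sample_rate * frame_ms // 1000 * BYTES_PER_SAMPLE
    return [pcm[i:i + frame_bytes] for i in range(0, len(pcm), frame_bytes)]


one_second = bytes(SAMPLE_RATE * BYTES_PER_SAMPLE)  # 1 s of silence
frames = split_frames(one_second)
print(len(frames), len(frames[0]))  # 25 1280
```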

In [None]:
if not SERVER_READY:
    T.skip("Audio generation via TTS (server not running)")
else:
    # 1. Generate speech using Macaw TTS
    print(f'--- TTS: "{TEST_PHRASE}" ---')
    r = httpx.post(
        f"{BASE_URL}/v1/audio/speech",
        json={"model": TTS_MODEL, "input": TEST_PHRASE, "response_format": "wav"},
        timeout=TTS_REQUEST_TIMEOUT,
    )
    T.check(r.status_code == 200, "TTS generated test audio")
    test_wav = r.content
    test_wav_path = save_wav(test_wav, "tts_speech.wav")

    # Read PCM and sample rate from TTS WAV
    with wave.open(io.BytesIO(test_wav), "rb") as wf:
        tts_rate = wf.getframerate()
        tts_pcm = np.frombuffer(wf.readframes(wf.getnframes()), dtype=np.int16)
    print(f"  WAV: {len(test_wav):,} bytes, {tts_rate} Hz, {len(tts_pcm)/tts_rate:.1f}s")
    print(f"  Saved: {test_wav_path}")

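    # 2. Create 44.1 kHz variant for resampling test.
    #    Clip before casting: FFT resampling can overshoot the int16 range,
    #    and astype() would otherwise wrap around.
    n_44k = int(len(tts_pcm) * 44100 / tts_rate)
    pcm_44k = np.clip(scipy.signal.resample(tts_pcm, n_44k), -32768, 32767).astype(np.int16)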
    buf = io.BytesIO()
    with wave.open(buf, "wb") as wf:
        wf.setnchannels(1)
        wf.setsampwidth(2)
        wf.setframerate(44100)
        wf.writeframes(pcm_44k.tobytes())
    test_wav_44k = buf.getvalue()
    test_wav_44k_path = save_wav(test_wav_44k, "tts_44khz.wav")
    print(f"  44.1 kHz variant: {len(test_wav_44k):,} bytes -> {test_wav_44k_path}")

    # 3. Extract 16 kHz PCM frames for WebSocket streaming
    if tts_rate != 16000:
        n_16k = int(len(tts_pcm) * 16000 / tts_rate)
        # Clip before casting so resampling overshoot cannot wrap around in int16.
        pcm_16k = np.clip(scipy.signal.resample(tts_pcm, n_16k), -32768, 32767).astype(np.int16)
    else:
        pcm_16k = tts_pcm
    frame_size = int(16000 * 0.040)  # 640 samples per 40ms frame
    ws_frames = [pcm_16k[i:i + frame_size].tobytes() for i in range(0, len(pcm_16k), frame_size)]
    print(f"  WebSocket frames: {len(ws_frames)} x 40ms (16 kHz PCM)")

<a id="sec4"></a>
## 4. CLI — Transcription & Translation

All CLI client commands connect to the running server via `--server`.

### macaw transcribe

```
macaw transcribe <file> --model <name> [--format json|text|verbose_json|srt|vtt]
                        [--language <code>] [--no-itn] [--hot-words "word1,word2"]
                        [--server <url>]
```

| Flag | Short | Default | Description |
|------|-------|---------|-------------|
| `--model` | `-m` | — | Model to use (required) |
| `--format` | | `json` | Output format |
| `--language` | `-l` | `auto` | Language code (e.g., `en`, `pt`) |
| `--no-itn` | | — | Disable Inverse Text Normalization |
| `--hot-words` | | — | Comma-separated hot words |
| `--stream` | | — | Stream from microphone |
| `--server` | | `http://localhost:8000` | Server URL |

### macaw translate

```
macaw translate <file> --model <name> [--format json|text]
                       [--no-itn] [--hot-words "word1,word2"]
                       [--server <url>]
```

> Translation always produces English text, regardless of the source language.
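
When scripting the CLI from plain Python instead of notebook `!` magics, passing an argument list to `subprocess.run` avoids shell-quoting problems with paths that contain spaces. The sketch below only builds the argument list, using flags documented in the tables above; `build_transcribe_cmd` is a hypothetical helper and the file path is a placeholder.

```python
def build_transcribe_cmd(path, model, fmt="json", language=None,
                         itn=True, hot_words=(), server=None):
    """Hypothetical helper: build an argv list for `macaw transcribe`."""
    cmd = ["macaw", "transcribe", path, "--model", model, "--format", fmt]
    if language:
        cmd += ["--language", language]
    if not itn:
        cmd.append("--no-itn")
    if hot_words:
        cmd += ["--hot-words", ",".join(hot_words)]
    if server:
        cmd += ["--server", server]
    return cmd


cmd = build_transcribe_cmd(
    "/tmp/my audio.wav", "faster-whisper-tiny",
    fmt="srt", hot_words=["Macaw", "OpenVoice"],
    server="http://127.0.0.1:8765",
)
print(cmd)
# To execute: subprocess.run(cmd, check=True, capture_output=True, text=True)
```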

In [None]:
if not SERVER_READY:
    T.skip("macaw ps (server not running)")
else:
    !macaw ps --server {BASE_URL}

In [None]:
if not SERVER_READY:
    T.skip("macaw transcribe (server not running)")
else:
    print("--- format: json (default) ---")
    !macaw transcribe {test_wav_path} -m {STT_MODEL} --format json --server {BASE_URL}

    print("\n--- format: text ---")
    !macaw transcribe {test_wav_path} -m {STT_MODEL} --format text --server {BASE_URL}

    print("\n--- format: verbose_json ---")
    !macaw transcribe {test_wav_path} -m {STT_MODEL} --format verbose_json --server {BASE_URL}

    print("\n--- format: srt (SubRip subtitles) ---")
    !macaw transcribe {test_wav_path} -m {STT_MODEL} --format srt --server {BASE_URL}

    print("\n--- format: vtt (WebVTT subtitles) ---")
    !macaw transcribe {test_wav_path} -m {STT_MODEL} --format vtt --server {BASE_URL}

In [None]:
if not SERVER_READY:
    T.skip("macaw transcribe options (server not running)")
else:
    print("--- language=en ---")
    !macaw transcribe {test_wav_path} -m {STT_MODEL} -l en --server {BASE_URL}

    print("\n--- --no-itn (disable Inverse Text Normalization) ---")
    !macaw transcribe {test_wav_path} -m {STT_MODEL} --no-itn --server {BASE_URL}

    print("\n--- --hot-words ---")
    !macaw transcribe {test_wav_path} -m {STT_MODEL} --hot-words "Macaw,OpenVoice" --server {BASE_URL}

    print("\n--- 44.1 kHz input (automatic resampling) ---")
    !macaw transcribe {test_wav_44k_path} -m {STT_MODEL} --server {BASE_URL}

In [None]:
if not SERVER_READY:
    T.skip("macaw translate (server not running)")
else:
    print("--- translate (json) ---")
    !macaw translate {test_wav_path} -m {STT_MODEL} --format json --server {BASE_URL}

    print("\n--- translate (text) ---")
    !macaw translate {test_wav_path} -m {STT_MODEL} --format text --server {BASE_URL}

<a id="sec5"></a>
## 5. REST API — Service Discovery

| Endpoint | Description |
|----------|-------------|
| `GET /health` | Liveness + readiness (`status`, `version`, `models_loaded`) |
| `GET /v1/models` | List all loaded models with name, type, engine, status |

In [None]:
if not SERVER_READY:
    T.skip("GET /health (server not running)")
    T.skip("GET /v1/models (server not running)")
else:
    print("--- GET /health ---")
    r = httpx.get(f"{BASE_URL}/health", timeout=REQUEST_TIMEOUT)
    T.check(r.status_code == 200, "GET /health -> 200")
    data = r.json()
    T.check(data.get("status") == "ok", "status == 'ok'")
    T.check("version" in data, "has 'version'")
    T.check(isinstance(data.get("models_loaded"), int), "has 'models_loaded'")
    print(f"  {json.dumps(data)}")

    print("\n--- GET /v1/models ---")
    r = httpx.get(f"{BASE_URL}/v1/models", timeout=REQUEST_TIMEOUT)
    T.check(r.status_code == 200, "GET /v1/models -> 200")
    models = r.json().get("models", [])
    T.check(len(models) >= 2, f"models count >= 2 (got {len(models)})")

    for m in models:
        name = m.get("name", "?")
        T.check("name" in m and "type" in m and "engine" in m, f"model '{name}' has name/type/engine")
        T.check(m.get("type") in ("stt", "tts"), f"model '{name}' type is stt|tts")
        T.check(m.get("status") == "loaded", f"model '{name}' status == loaded")

    print(f"  {json.dumps(models, indent=2)}")

<a id="sec6"></a>
## 6. REST API — Speech-to-Text

`POST /v1/audio/transcriptions` (OpenAI-compatible)

| Format | Content-Type | Structure |
|--------|-------------|----------|
| `json` | application/json | `{"text": "..."}` |
| `verbose_json` | application/json | task, language, duration, segments, words |
| `text` | text/plain | Raw transcription text |
| `srt` | text/plain | SubRip subtitle format |
| `vtt` | text/plain | WebVTT subtitle format |

**Options:** `model`, `language`, `response_format`, `temperature`, `itn`.
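
The subtitle formats are plain text, so a client that wants timings back has to parse them. Below is a minimal sketch of an SRT cue parser based on the standard SubRip timing line (`HH:MM:SS,mmm --> HH:MM:SS,mmm`); `parse_srt_cues` is a hypothetical helper and the sample text is hand-written, not actual Macaw output.

```python
import re

# Matches the SubRip timing line, e.g. "00:00:01,000 --> 00:00:02,500".
_CUE = re.compile(
    r"(\d+):(\d{2}):(\d{2}),(\d{3}) --> (\d+):(\d{2}):(\d{2}),(\d{3})"
)


def parse_srt_cues(srt_text):
    """Return a list of (start_seconds, end_seconds, text) tuples."""
    cues = []
    normalized = srt_text.replace("\r\n", "\n").strip()
    for block in normalized.split("\n\n"):
        lines = block.strip().splitlines()
        for i, line in enumerate(lines):
            m = _CUE.search(line)
            if m:
                h1, m1, s1, ms1, h2, m2, s2, ms2 = map(int, m.groups())
                start = h1 * 3600 + m1 * 60 + s1 + ms1 / 1000
                end = h2 * 3600 + m2 * 60 + s2 + ms2 / 1000
                # Everything after the timing line is the cue text.
                cues.append((start, end, " ".join(lines[i + 1:])))
                break
    return cues


sample = "1\n00:00:00,000 --> 00:00:02,500\nHello, welcome to Macaw OpenVoice.\n"
print(parse_srt_cues(sample))
```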

In [None]:
if not SERVER_READY:
    T.skip("Transcription tests (server not running)")
else:
    print("--- POST /v1/audio/transcriptions (5 formats) ---")

    for fmt in ["json", "verbose_json", "text", "srt", "vtt"]:
        print(f"\n  format={fmt}")
        r = httpx.post(
            f"{BASE_URL}/v1/audio/transcriptions",
            files={"file": ("speech.wav", test_wav, "audio/wav")},
            data={"model": STT_MODEL, "response_format": fmt},
            timeout=REQUEST_TIMEOUT,
        )
        T.check(r.status_code == 200, f"format={fmt} -> 200")

        if fmt == "json":
            body = r.json()
            T.check("text" in body, "json: has 'text'")
            T.check(isinstance(body["text"], str), "json: text is string")

        elif fmt == "verbose_json":
            body = r.json()
            T.check(body.get("task") == "transcribe", "verbose_json: task == 'transcribe'")
            T.check("language" in body, "verbose_json: has 'language'")
            T.check(isinstance(body.get("duration"), (int, float)), "verbose_json: has numeric duration")
            T.check(isinstance(body.get("segments"), list), "verbose_json: 'segments' is list")
            T.check("text" in body, "verbose_json: has 'text'")
            if body.get("segments"):
                seg = body["segments"][0]
                T.check(all(k in seg for k in ("start", "end", "text")), "verbose_json: segment has start/end/text")

        elif fmt == "text":
            T.check("text/plain" in r.headers.get("content-type", ""), "text: content-type is text/plain")

        elif fmt == "srt":
            T.check("text/plain" in r.headers.get("content-type", ""), "srt: content-type is text/plain")
            if r.text.strip():
                T.check("-->" in r.text, "srt: contains '-->'")

        elif fmt == "vtt":
            T.check("text/plain" in r.headers.get("content-type", ""), "vtt: content-type is text/plain")
            T.check(r.text.startswith("WEBVTT"), "vtt: starts with 'WEBVTT'")

In [None]:
if not SERVER_READY:
    T.skip("Transcription options (server not running)")
else:
    print("--- language=en ---")
    r = httpx.post(
        f"{BASE_URL}/v1/audio/transcriptions",
        files={"file": ("speech.wav", test_wav, "audio/wav")},
        data={"model": STT_MODEL, "response_format": "json", "language": "en"},
        timeout=REQUEST_TIMEOUT,
    )
    T.check(r.status_code == 200, "language=en -> 200")

    print("\n--- itn=false ---")
    r = httpx.post(
        f"{BASE_URL}/v1/audio/transcriptions",
        files={"file": ("speech.wav", test_wav, "audio/wav")},
        data={"model": STT_MODEL, "response_format": "json", "itn": "false"},
        timeout=REQUEST_TIMEOUT,
    )
    T.check(r.status_code == 200, "itn=false -> 200")

    print("\n--- 44.1 kHz input (automatic resampling) ---")
    r = httpx.post(
        f"{BASE_URL}/v1/audio/transcriptions",
        files={"file": ("speech_44k.wav", test_wav_44k, "audio/wav")},
        data={"model": STT_MODEL, "response_format": "json"},
        timeout=REQUEST_TIMEOUT,
    )
    T.check(r.status_code == 200, "44.1 kHz -> 200 (auto resampled)")

<a id="sec7"></a>
## 7. REST API — Translation

`POST /v1/audio/translations` (OpenAI-compatible)

Translates audio to English. The interface matches transcription, except that
the `verbose_json` response reports `task` as `translate`.

In [None]:
if not SERVER_READY:
    T.skip("Translation tests (server not running)")
else:
    print("--- POST /v1/audio/translations (json) ---")
    r = httpx.post(
        f"{BASE_URL}/v1/audio/translations",
        files={"file": ("speech.wav", test_wav, "audio/wav")},
        data={"model": STT_MODEL, "response_format": "json"},
        timeout=REQUEST_TIMEOUT,
    )
    T.check(r.status_code == 200, "translation json -> 200")
    T.check("text" in r.json(), "has 'text'")

    print("\n--- POST /v1/audio/translations (verbose_json) ---")
    r = httpx.post(
        f"{BASE_URL}/v1/audio/translations",
        files={"file": ("speech.wav", test_wav, "audio/wav")},
        data={"model": STT_MODEL, "response_format": "verbose_json"},
        timeout=REQUEST_TIMEOUT,
    )
    T.check(r.status_code == 200, "translation verbose_json -> 200")
    body = r.json()
    T.check(body.get("task") == "translate", "task == 'translate'")
    T.check(isinstance(body.get("segments"), list), "has segments list")

<a id="sec8"></a>
## 8. REST API — Text-to-Speech

`POST /v1/audio/speech` (OpenAI-compatible)

| Format | Content-Type | Description |
|--------|-------------|-------------|
| `wav` | audio/wav | WAV with RIFF/WAVE header (default) |
| `pcm` | audio/pcm | Raw 16-bit PCM (no header) |

| Field | Default | Description |
|-------|---------|-------------|
| `model` | — | TTS model name (required) |
| `input` | — | Text to synthesize (required) |
| `voice` | `default` | Voice name |
| `response_format` | `wav` | Output format |
| `speed` | `1.0` | Speech rate (0.25–4.0) |
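
Since the `pcm` format carries no header, a client has to add one before most players will open the audio. The sketch below wraps raw 16-bit PCM in a WAV container with the standard `wave` module. The `pcm_to_wav` helper is hypothetical, and the sample rate is an assumption the caller must supply: this notebook does not state the rate of the PCM output, and 24000 below is only a placeholder.

```python
import io
import wave


def pcm_to_wav(pcm, sample_rate, channels=1):
    """Wrap headerless 16-bit PCM bytes in a WAV container."""
    buf = io.BytesIO()
    with wave.open(buf, "wb") as wf:
        wf.setnchannels(channels)
        wf.setsampwidth(2)            # 16-bit samples
        wf.setframerate(sample_rate)
        wf.writeframes(pcm)
    return buf.getvalue()


# One second of silence at a placeholder rate (assumption, see above).
wav_bytes = pcm_to_wav(bytes(24000 * 2), sample_rate=24000)
print(wav_bytes[:4], wav_bytes[8:12])
```

One way to avoid guessing the rate is to request `wav` once and read it from the header with `wave.open(...).getframerate()`, as Section 3 does.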

In [None]:
if not SERVER_READY:
    T.skip("TTS tests (server not running)")
else:
    print("--- response_format=wav ---")
    r = httpx.post(
        f"{BASE_URL}/v1/audio/speech",
        json={"model": TTS_MODEL, "input": TEST_PHRASE, "response_format": "wav"},
        timeout=TTS_REQUEST_TIMEOUT,
    )
    T.check(r.status_code == 200, "TTS wav -> 200")
    T.check("audio/wav" in r.headers.get("content-type", ""), "content-type is audio/wav")
    T.check(r.content[:4] == b"RIFF", "WAV starts with RIFF")
    T.check(r.content[8:12] == b"WAVE", "WAV has WAVE marker")
    print(f"  Audio: {len(r.content):,} bytes")

    print("\n--- response_format=pcm ---")
    r = httpx.post(
        f"{BASE_URL}/v1/audio/speech",
        json={"model": TTS_MODEL, "input": TEST_PHRASE, "response_format": "pcm"},
        timeout=TTS_REQUEST_TIMEOUT,
    )
    T.check(r.status_code == 200, "TTS pcm -> 200")
    T.check("audio/pcm" in r.headers.get("content-type", ""), "content-type is audio/pcm")
    T.check(r.content[:4] != b"RIFF", "PCM has no RIFF header")
    T.check(len(r.content) > 0, "PCM has audio data")
    print(f"  Audio: {len(r.content):,} bytes")

    print("\n--- speed=1.5 (longer text) ---")
    r = httpx.post(
        f"{BASE_URL}/v1/audio/speech",
        json={"model": TTS_MODEL, "input": TEST_PHRASE_LONG, "response_format": "wav", "speed": 1.5},
        timeout=TTS_REQUEST_TIMEOUT,
    )
    T.check(r.status_code == 200, "TTS speed=1.5 -> 200")
    print(f"  Audio: {len(r.content):,} bytes")

<a id="sec9"></a>
## 9. Real-time Streaming STT (WebSocket)

`WS /v1/realtime?model=<name>`

**Protocol:**
1. Connect with `?model=<name>`
2. Receive `session.created` (session_id, model, config)
3. Send binary PCM audio frames (16-bit, 16 kHz, mono)
4. Receive events: `vad.speech_start`, `vad.speech_end`, `transcript.partial`, `transcript.final`
5. Send `session.close` command
6. Receive `session.closed` (reason, total_duration_ms, segments_transcribed)
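
The message sequence above can be read as a small state machine on the client side. The sketch below tracks that lifecycle from incoming messages, relying only on the `type` field of each JSON event; `SessionTracker` and its state names are hypothetical, and the sample messages are illustrative.

```python
import json


class SessionTracker:
    """Hypothetical tracker for the session lifecycle listed above."""

    def __init__(self):
        self.state = "connecting"
        self.event_counts = {}

    def feed(self, message):
        """Consume one server message and return its event type, if any."""
        if isinstance(message, bytes):
            return None          # binary frames carry audio, not events
        event = json.loads(message)
        kind = event.get("type", "unknown")
        self.event_counts[kind] = self.event_counts.get(kind, 0) + 1
        if kind == "session.created" and self.state == "connecting":
            self.state = "active"
        elif kind == "session.closed":
            self.state = "closed"
        return kind


tracker = SessionTracker()
for msg in (
    '{"type": "session.created", "session_id": "example"}',
    '{"type": "vad.speech_start"}',
    '{"type": "transcript.final"}',
    '{"type": "session.closed", "reason": "example"}',
):
    tracker.feed(msg)
print(tracker.state, tracker.event_counts)
```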

In [None]:
async def test_ws_streaming_stt():
    if not SERVER_READY:
        T.skip("WebSocket STT (server not running)")
        return

    uri = f"{WS_URL}/v1/realtime?model={STT_MODEL}"
    print(f"  Connecting to {uri}")

    async with websockets.connect(uri) as ws:
        # 1. session.created
        msg = json.loads(await asyncio.wait_for(ws.recv(), timeout=10))
        T.check(msg["type"] == "session.created", "received session.created")
        T.check("session_id" in msg, "has session_id")
        T.check(msg.get("model") == STT_MODEL, f"model == {STT_MODEL}")
        config = msg.get("config", {})
        T.check("vad_sensitivity" in config, "config has vad_sensitivity")
        T.check("silence_timeout_ms" in config, "config has silence_timeout_ms")
        print(f"  Session: {msg['session_id']}")

        # 2. Send TTS-generated PCM frames (real speech, real-time pacing)
        print(f"  Sending {len(ws_frames)} frames ({len(ws_frames) * 40} ms) ...")
        for frame in ws_frames:
            await ws.send(frame)
            await asyncio.sleep(0.04)

        # 3. Collect events
        events = []
        try:
            while True:
                raw = await asyncio.wait_for(ws.recv(), timeout=5)
                if isinstance(raw, str):
                    evt = json.loads(raw)
                    events.append(evt)
                    print(f"    {evt['type']}")
        except asyncio.TimeoutError:
            pass

        event_types = sorted({e["type"] for e in events})
        print(f"  Event types: {event_types}")

        # 4. session.close
        await ws.send(json.dumps({"type": "session.close"}))
        try:
            raw = await asyncio.wait_for(ws.recv(), timeout=10)
            if isinstance(raw, str):
                closed = json.loads(raw)
                T.check(closed["type"] == "session.closed", "received session.closed")
                T.check("reason" in closed, "closed has 'reason'")
                T.check("total_duration_ms" in closed, "closed has 'total_duration_ms'")
                T.check("segments_transcribed" in closed, "closed has 'segments_transcribed'")
                print(f"  Closed: reason={closed.get('reason')}, duration={closed.get('total_duration_ms')} ms")
        except asyncio.TimeoutError:
            T.check(False, "received session.closed (timed out)")

await test_ws_streaming_stt()

<a id="sec10"></a>
## 10. Full-Duplex Voice (WebSocket)

TTS synthesis over the same WebSocket connection used for STT.

**Commands:**

| Command | Description |
|---------|-------------|
| `session.configure` | Set `model_tts` for TTS on this session |
| `tts.speak` | Synthesize text (with `request_id`) |
| `tts.cancel` | Cancel active synthesis |

**Events:**

| Event | Description |
|-------|-------------|
| `tts.speaking_start` | TTS started (request_id, timestamp_ms) |
| binary frames | Audio chunks (server → client) |
| `tts.speaking_end` | TTS finished (request_id, duration_ms, cancelled) |
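
Because JSON events and binary audio share one connection, a client has to keep track of which bytes belong to which request. The sketch below accumulates audio between `tts.speaking_start` and `tts.speaking_end` for a single `request_id`. `TtsCollector` is a hypothetical helper, and it assumes only one synthesis is active at a time, since binary frames carry no request identifier in the events described here.

```python
import json


class TtsCollector:
    """Hypothetical accumulator for the audio of one tts.speak request."""

    def __init__(self, request_id):
        self.request_id = request_id
        self.audio = bytearray()
        self.started = False
        self.end_event = None

    @property
    def done(self):
        return self.end_event is not None

    def feed(self, message):
        """Consume one WebSocket message (bytes = audio, str = JSON event)."""
        if isinstance(message, bytes):
            # Keep only audio that arrives between start and end.
            if self.started and not self.done:
                self.audio.extend(message)
            return
        event = json.loads(message)
        if event.get("request_id") != self.request_id:
            return
        if event.get("type") == "tts.speaking_start":
            self.started = True
        elif event.get("type") == "tts.speaking_end":
            self.end_event = event


collector = TtsCollector("demo")
collector.feed(json.dumps({"type": "tts.speaking_start", "request_id": "demo"}))
collector.feed(b"\x00\x01\x02\x03")
collector.feed(json.dumps({"type": "tts.speaking_end", "request_id": "demo", "cancelled": False}))
print(collector.done, len(collector.audio))
```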

In [None]:
async def test_ws_tts_fullduplex():
    if not SERVER_READY:
        T.skip("WebSocket TTS (server not running)")
        return

    uri = f"{WS_URL}/v1/realtime?model={STT_MODEL}"
    print(f"  Connecting to {uri}")

    async with websockets.connect(uri) as ws:
        # 1. session.created
        msg = json.loads(await asyncio.wait_for(ws.recv(), timeout=10))
        T.check(msg["type"] == "session.created", "received session.created")
        print(f"  Session: {msg['session_id']}")

        # 2. Configure TTS model
        await ws.send(json.dumps({"type": "session.configure", "model_tts": TTS_MODEL}))
        print(f"  Configured model_tts={TTS_MODEL}")

        # 3. tts.speak
        req_id = "tts_e2e_001"
        await ws.send(json.dumps({
            "type": "tts.speak",
            "text": "Hello world, this is a full duplex voice test.",
            "request_id": req_id,
        }))
        print(f"  Sent tts.speak (request_id={req_id})")

        # 4. Collect: speaking_start, binary chunks, speaking_end
        got_start = False
        got_end = False
        audio_chunks = 0
        audio_bytes = 0
        end_evt = None

        try:
            while True:
                raw = await asyncio.wait_for(ws.recv(), timeout=WS_EVENT_TIMEOUT)
                if isinstance(raw, bytes):
                    audio_chunks += 1
                    audio_bytes += len(raw)
                elif isinstance(raw, str):
                    evt = json.loads(raw)
                    if evt["type"] == "tts.speaking_start":
                        got_start = True
                        T.check(evt.get("request_id") == req_id, "speaking_start: correct request_id")
                        T.check("timestamp_ms" in evt, "speaking_start: has timestamp_ms")
                    elif evt["type"] == "tts.speaking_end":
                        got_end = True
                        end_evt = evt
                        break
                    elif evt["type"] == "error":
                        print(f"    ERROR: {evt.get('message')}")
                        break
        except asyncio.TimeoutError:
            print("    (timed out)")

        T.check(got_start, "received tts.speaking_start")
        T.check(audio_chunks > 0, f"received {audio_chunks} audio chunks ({audio_bytes:,} bytes)")
        T.check(got_end, "received tts.speaking_end")
        if end_evt:
            T.check(end_evt.get("request_id") == req_id, "speaking_end: correct request_id")
            T.check("duration_ms" in end_evt, "speaking_end: has duration_ms")
            T.check(end_evt.get("cancelled") is False, "cancelled == false")
            print(f"  TTS done: duration_ms={end_evt.get('duration_ms')}")

        # 5. Test tts.cancel
        print("\n  --- tts.cancel ---")
        cancel_id = "tts_e2e_cancel"
        await ws.send(json.dumps({
            "type": "tts.speak",
            "text": "This is a longer sentence that should be interrupted before it finishes speaking completely.",
            "request_id": cancel_id,
        }))
        await asyncio.sleep(0.5)
        await ws.send(json.dumps({"type": "tts.cancel", "request_id": cancel_id}))
        print(f"  Sent tts.speak + tts.cancel (request_id={cancel_id})")

        cancel_end = None
        try:
            while True:
                raw = await asyncio.wait_for(ws.recv(), timeout=15)
                if isinstance(raw, str):
                    evt = json.loads(raw)
                    if evt["type"] == "tts.speaking_end":
                        cancel_end = evt
                        break
                    elif evt["type"] == "error":
                        break
        except asyncio.TimeoutError:
            pass

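        if cancel_end is None:
            # No speaking_end at all: an error event arrived or the wait timed out.
            T.check(False, "received tts.speaking_end after tts.cancel")
        elif cancel_end.get("cancelled") is True:
            T.check(True, "cancelled == true after tts.cancel")
        else:
            # Synthesis finished before the cancel arrived; not a failure.
            T.skip("tts.cancel (synthesis completed before cancel arrived)")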

        # 6. Close
        await ws.send(json.dumps({"type": "session.close"}))
        try:
            raw = await asyncio.wait_for(ws.recv(), timeout=10)
            if isinstance(raw, str):
                T.check(json.loads(raw)["type"] == "session.closed", "session closed")
        except asyncio.TimeoutError:
            pass

await test_ws_tts_fullduplex()

<a id="sec11"></a>
## 11. Error Handling

Verify the server returns correct error responses for invalid requests.

Error format (OpenAI-compatible):
```json
{"error": {"message": "...", "type": "...", "code": "..."}}
```

| Scenario | Expected |
|----------|----------|
| Non-existent model | 404 with `error.message` |
| Invalid response_format | 400 |
| Empty TTS input | 400 |
| Unsupported TTS format (mp3) | 400 |
| WebSocket without `model` param | Error event + close code 1008 |
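
Client code usually wants a one-line summary of such an error. The sketch below extracts it from the error shape shown above while tolerating bodies that do not match; `describe_error` is a hypothetical helper, and the sample message and code values are made up for illustration.

```python
def describe_error(status_code, body):
    """Hypothetical helper: summarize an OpenAI-style error body."""
    error = body.get("error") if isinstance(body, dict) else None
    if not isinstance(error, dict):
        return f"HTTP {status_code}: (no error object)"
    message = error.get("message", "(no message)")
    code = error.get("code")
    suffix = f" [{code}]" if code else ""
    return f"HTTP {status_code}: {message}{suffix}"


# Illustrative body; the actual message and code values may differ.
sample_body = {"error": {"message": "Model not found", "type": "example", "code": "example_code"}}
print(describe_error(404, sample_body))
```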

In [None]:
if not SERVER_READY:
    T.skip("Error handling tests (server not running)")
else:
    print("--- Non-existent model -> 404 ---")
    r = httpx.post(
        f"{BASE_URL}/v1/audio/transcriptions",
        files={"file": ("speech.wav", test_wav, "audio/wav")},
        data={"model": "nonexistent-model-xyz", "response_format": "json"},
        timeout=10,
    )
    T.check(r.status_code == 404, f"non-existent model -> {r.status_code}")
    T.check("error" in r.json() and "message" in r.json()["error"], "has error.message")

    print("\n--- Invalid response_format -> 400 ---")
    r = httpx.post(
        f"{BASE_URL}/v1/audio/transcriptions",
        files={"file": ("speech.wav", test_wav, "audio/wav")},
        data={"model": STT_MODEL, "response_format": "invalid_fmt"},
        timeout=10,
    )
    T.check(r.status_code == 400, f"invalid format -> {r.status_code}")

    print("\n--- Empty TTS input -> 400 ---")
    r = httpx.post(
        f"{BASE_URL}/v1/audio/speech",
        json={"model": TTS_MODEL, "input": "   ", "response_format": "wav"},
        timeout=10,
    )
    T.check(r.status_code == 400, f"empty TTS input -> {r.status_code}")

    print("\n--- Unsupported TTS format (mp3) -> 400 ---")
    r = httpx.post(
        f"{BASE_URL}/v1/audio/speech",
        json={"model": TTS_MODEL, "input": "test", "response_format": "mp3"},
        timeout=10,
    )
    T.check(r.status_code == 400, f"unsupported TTS format -> {r.status_code}")

    print("\n--- WebSocket without model param -> error + close 1008 ---")

    async def test_ws_no_model():
        uri = f"{WS_URL}/v1/realtime"
        close_code = None
        try:
            async with websockets.connect(uri) as ws:
                raw = await asyncio.wait_for(ws.recv(), timeout=10)
                if isinstance(raw, str):
                    evt = json.loads(raw)
                    T.check(evt["type"] == "error", "received error event")
                    T.check(evt.get("recoverable") is False, "error is not recoverable")
                    T.check("model" in evt.get("message", "").lower(), "error mentions 'model'")
                # The next recv() should raise once the server closes the socket.
                await asyncio.wait_for(ws.recv(), timeout=5)
        except websockets.ConnectionClosed as e:
            # e.rcvd is None if the connection dropped without a close frame.
            close_code = e.rcvd.code if e.rcvd else None
        except asyncio.TimeoutError:
            pass
        T.check(close_code == 1008, f"close code == 1008 (got {close_code})")

    await test_ws_no_model()

<a id="sec12"></a>
## 12. Results

In [None]:
T.summary()

<a id="sec13"></a>
## 13. Cleanup

Stop the server and remove temporary files.
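
The shutdown pattern used here, a polite termination request followed by a forced kill after a timeout, can be packaged as a small helper. The sketch below demonstrates it on a stand-in child process rather than the Macaw server; `stop_process` is a hypothetical name.

```python
import subprocess
import sys


def stop_process(proc, timeout=15):
    """Ask a child process to exit, then force-kill it if it does not."""
    if proc.poll() is not None:
        return proc.returncode      # already exited
    proc.terminate()                # SIGTERM on POSIX
    try:
        return proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()                 # SIGKILL on POSIX
        return proc.wait()


# Stand-in child process that would otherwise sleep for a minute.
child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
stop_process(child, timeout=5)
print(child.poll() is not None)
```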

In [None]:
if server_process is not None and server_process.poll() is None:
    print(f"Stopping server (PID {server_process.pid}) ...")
    server_process.send_signal(signal.SIGTERM)
    try:
        server_process.wait(timeout=SERVER_SHUTDOWN_TIMEOUT)
        print(f"Server exited with code {server_process.returncode}")
    except subprocess.TimeoutExpired:
        print("SIGTERM timed out, sending SIGKILL ...")
        server_process.kill()
        server_process.wait(timeout=5)
        print(f"Server killed (code {server_process.returncode})")
else:
    print("Server already stopped.")

if _temp_files:
    print(f"\nRemoving {len(_temp_files)} temp file(s) ...")
    for path in _temp_files:
        if os.path.exists(path):
            os.remove(path)
            print(f"  {path}")
    _temp_files.clear()

print("\nDone.")