# Train "GLaDOS" Wake Word Model for openWakeWord

This notebook trains a custom wake word model for the phrase **"GLaDOS"** (inspired by Portal).

**Steps:**
1. Install dependencies
2. Generate synthetic speech using Piper TTS
3. Download negative samples (music, noise, speech)
4. Compute audio embeddings
5. Train the model
6. Export to ONNX
7. Test the model on positive and negative clips
8. Download the trained model

**Runtime:** Use GPU runtime for faster training (Runtime → Change runtime type → T4 GPU)

## 1. Install Dependencies

In [None]:
# Install required packages with compatible versions for CUDA 12.6
# Note: After running this cell, restart the runtime (Runtime → Restart runtime)
# so that the freshly installed package versions are the ones that get imported.
# NOTE(review): only torch/torchaudio are pinned below; speechbrain, openwakeword,
# datasets, scipy, matplotlib, tqdm and piper-tts install whatever release is
# current, so a later re-run of this notebook may pick up different versions.

# Install PyTorch 2.8.0 with CUDA 12.6 support (torchaudio 2.8 still has list_audio_backends)
!pip install -q torch==2.8.0 torchaudio==2.8.0 --index-url https://download.pytorch.org/whl/cu126

# Install speechbrain
!pip install -q speechbrain

# Install remaining dependencies
!pip install -q openwakeword datasets scipy matplotlib tqdm

# Install piper-tts for synthetic speech generation
!pip install -q piper-tts

print("Installation complete! Now restart the runtime: Runtime → Restart runtime")

In [None]:
# Restart runtime after installation (run this cell, then continue from the next cell)
# This is required to properly load the new package versions
# The cell deliberately ends the current kernel process, so a "Run all" stops here;
# continue manually from the imports cell once the runtime is back.

import os
os.kill(os.getpid(), 9)  # Sends signal 9 (SIGKILL on Linux) to this process; this will restart the Colab runtime

In [None]:
# Imports
import os
import collections
import subprocess
import numpy as np
from numpy.lib.format import open_memmap
from pathlib import Path
from tqdm import tqdm
import scipy.io.wavfile
import matplotlib.pyplot as plt
import torch
from torch import nn
import IPython.display as ipd

import openwakeword
import openwakeword.data
import openwakeword.utils
import datasets

# Configuration
import random  # only needed here to seed the stdlib RNG used for the TTS variation

WAKE_WORD = "glados"
WAKE_PHRASES = [
    "GLaDOS",
    "Hey GLaDOS",
    "Okay GLaDOS",
    "Hi GLaDOS"
]
CLIP_LENGTH_SECONDS = 3  # Window size for model
N_SYNTHETIC_PER_PHRASE = 500  # Clips per phrase per voice

# Seed every RNG the notebook draws from so that a fresh "Restart & Run All"
# is repeatable: `random` drives the Piper speed/noise variation, `np.random`
# drives the start-position jitter, and torch drives weight init and shuffling.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

print(f"Training wake word model for: {WAKE_WORD}")
print(f"Phrases: {WAKE_PHRASES}")

## 2. Generate Synthetic Speech with Piper TTS

We'll generate diverse synthetic examples using multiple Piper TTS voices.

In [None]:
# Download Piper TTS voices (English, multiple speakers)
!mkdir -p piper_voices

PIPER_VOICES = [
    ("en_US-lessac-medium", "https://huggingface.co/rhasspy/piper-voices/resolve/main/en/en_US/lessac/medium/en_US-lessac-medium.onnx"),
    ("en_US-libritts-high", "https://huggingface.co/rhasspy/piper-voices/resolve/main/en/en_US/libritts/high/en_US-libritts-high.onnx"),
    ("en_US-amy-medium", "https://huggingface.co/rhasspy/piper-voices/resolve/main/en/en_US/amy/medium/en_US-amy-medium.onnx"),
    ("en_GB-alba-medium", "https://huggingface.co/rhasspy/piper-voices/resolve/main/en/en_GB/alba/medium/en_GB-alba-medium.onnx"),
    ("en_GB-aru-medium", "https://huggingface.co/rhasspy/piper-voices/resolve/main/en/en_GB/aru/medium/en_GB-aru-medium.onnx"),
]

for voice_name, url in PIPER_VOICES:
    onnx_path = f"piper_voices/{voice_name}.onnx"
    json_path = f"piper_voices/{voice_name}.onnx.json"
    if not os.path.exists(onnx_path):
        print(f"Downloading {voice_name}...")
        !wget -q -O {onnx_path} {url}
        !wget -q -O {json_path} {url}.json
    else:
        print(f"{voice_name} already exists")

In [None]:
# Generate synthetic speech clips
# NOTE(review): `json` and `struct` are not referenced anywhere in the visible
# notebook; of these three imports only `random` is used (by generate_piper_clips).
import json
import struct
import random

# Directory that collects the positive (wake word) clips; exist_ok=True makes
# the cell safe to re-run.
os.makedirs(f"{WAKE_WORD}_clips", exist_ok=True)

def generate_piper_clips(voice_path, phrases, n_per_phrase, output_dir):
    """Generate synthetic speech clips with the Piper TTS command-line tool.

    For every phrase, ``n_per_phrase`` clips are synthesised with a randomly
    drawn speech rate and noise scale. Clips whose output file already exists
    are not regenerated, so an interrupted run can be resumed.

    Args:
        voice_path: Path to the Piper ``.onnx`` voice model.
        phrases: Iterable of text phrases to synthesise.
        n_per_phrase: Number of clips to produce per phrase.
        output_dir: Directory receiving the ``.wav`` files (created if missing).

    Returns:
        Number of clips present on disk for this voice after the call
        (pre-existing clips plus newly generated ones).

    Generation is best-effort: a failing ``piper`` call never raises. The
    first failure is printed with Piper's error text and the total number of
    failures is reported at the end.
    """
    os.makedirs(output_dir, exist_ok=True)
    voice_name = os.path.basename(voice_path).replace(".onnx", "")
    clips_generated = 0
    failures = 0

    for phrase in phrases:
        phrase_slug = phrase.replace(' ', '_')
        for i in range(n_per_phrase):
            # Vary speech rate and noise scale for diversity. The values are
            # drawn before the "already exists" check so that a resumed run
            # consumes the random stream exactly like an uninterrupted one.
            length_scale = random.uniform(0.8, 1.2)  # Speed variation
            noise_scale = random.uniform(0.3, 0.7)   # Voice variation

            output_file = f"{output_dir}/{voice_name}_{phrase_slug}_{i:04d}.wav"

            if os.path.exists(output_file):
                clips_generated += 1
                continue

            # Call the piper CLI with an argument list (no shell), feeding the
            # phrase on stdin. This avoids quoting problems with phrases or
            # paths containing spaces, quotes or shell metacharacters.
            cmd = [
                "piper",
                "--model", voice_path,
                "--length_scale", str(length_scale),
                "--noise_scale", str(noise_scale),
                "--output_file", output_file,
            ]
            try:
                result = subprocess.run(
                    cmd,
                    input=phrase + "\n",
                    text=True,
                    errors="replace",
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.PIPE,
                )
                succeeded = result.returncode == 0 and os.path.exists(output_file)
                error_text = (result.stderr or "").strip()
            except OSError as e:
                # e.g. the `piper` executable is not installed / not on PATH
                succeeded = False
                error_text = str(e)

            if succeeded:
                clips_generated += 1
                continue

            failures += 1
            # Remove a partially written file so a re-run retries this clip
            # instead of treating it as already generated.
            if os.path.exists(output_file):
                os.remove(output_file)
            if failures == 1:
                print(f"Warning: piper failed for {output_file}: {error_text[-500:]}")

    if failures:
        print(f"Warning: {failures} clip(s) could not be generated for {voice_name}")

    return clips_generated

# Run the generator once per downloaded voice and keep a running total.
clips_dir = f"{WAKE_WORD}_clips"
per_voice_counts = []

for voice_name, _ in tqdm(PIPER_VOICES, desc="Generating clips per voice"):
    voice_path = f"piper_voices/{voice_name}.onnx"
    n = generate_piper_clips(
        voice_path,
        WAKE_PHRASES,
        N_SYNTHETIC_PER_PHRASE,
        clips_dir,
    )
    per_voice_counts.append(n)
    print(f"  {voice_name}: {n} clips")

total_clips = sum(per_voice_counts)
print(f"\nTotal synthetic clips generated: {total_clips}")

In [None]:
# Resample all clips to 16kHz (required by openWakeWord)
from scipy.io import wavfile
from scipy import signal

def resample_to_16k(input_path, output_path=None):
    """Resample a WAV file to 16 kHz, writing 16-bit PCM.

    Args:
        input_path: Path of the WAV file to read.
        output_path: Destination path. Defaults to ``input_path`` (in-place).

    Returns:
        True on success, False if reading, resampling or writing failed
        (the error is printed, not raised).

    If the file is already at 16 kHz it is left untouched when converting
    in place, and copied as-is when a different ``output_path`` is given.
    """
    if output_path is None:
        output_path = input_path

    try:
        sr, data = wavfile.read(input_path)
        if sr != 16000:
            # Resample along the sample axis (axis 0)
            n_samples = int(len(data) * 16000 / sr)
            resampled = signal.resample(data, n_samples)
            # FFT resampling can overshoot the original peak; round and clip
            # to the int16 range so the cast cannot wrap around into
            # full-scale clicks.
            limits = np.iinfo(np.int16)
            data = np.clip(np.rint(resampled), limits.min, limits.max).astype(np.int16)
            wavfile.write(output_path, 16000, data)
        elif output_path != input_path:
            # Already 16 kHz: still honour the requested output location
            wavfile.write(output_path, sr, data)
        return True
    except Exception as e:
        print(f"Error resampling {input_path}: {e}")
        return False

# Resample all generated clips
clips = list(Path(f"{WAKE_WORD}_clips").glob("*.wav"))

# resample_to_16k reports failure through its return value; collect the
# failures so the summary reflects what actually happened.
failed_clips = []
for clip in tqdm(clips, desc="Resampling to 16kHz"):
    if not resample_to_16k(str(clip)):
        failed_clips.append(clip)

print(f"Resampled {len(clips) - len(failed_clips)} clips to 16kHz")
if failed_clips:
    print(f"WARNING: {len(failed_clips)} clips could not be resampled")

In [None]:
# Preview the first few generated clips with inline audio players
clip_folder = Path(f"{WAKE_WORD}_clips")
sample_clips = [wav for wav in clip_folder.glob("*.wav")][:5]

for clip in sample_clips:
    print(f"Playing: {clip.name}")
    player = ipd.Audio(str(clip), rate=16000, autoplay=False)
    ipd.display(player)

## 3. Download Negative Data

Download samples of music, noise, and speech that do NOT contain the wake word.

In [None]:
# Download negative data samples
# Each archive is fetched with wget and unpacked into the working directory;
# `unzip -o` overwrites existing files, so the cell can be re-run.
# NOTE(review): the archives are presumably expected to unpack into `fma_sample/`
# and `fsd50k_sample/`, the directory names the negative-clip filter reads later — confirm.
!wget -q -O fma_sample.zip "https://f002.backblazeb2.com/file/openwakeword-resources/data/fma_sample.zip"
!unzip -q -o fma_sample.zip

!wget -q -O fsd50k_sample.zip "https://f002.backblazeb2.com/file/openwakeword-resources/data/fsd50k_sample.zip"
!unzip -q -o fsd50k_sample.zip

print("Downloaded FMA (music) and FSD50K (noise) samples")

In [None]:
# Download Common Voice speech samples
# NOTE(review): this relies on a script-based dataset (trust_remote_code=True);
# verify that the installed `datasets` version still supports it.
cv_11 = datasets.load_dataset("mozilla-foundation/common_voice_11_0", "en", split="test", streaming=True, trust_remote_code=True)
cv_11 = cv_11.cast_column("audio", datasets.Audio(sampling_rate=16000, mono=True))
cv_11 = iter(cv_11)

# Convert and save clips (first 5000)
os.makedirs("cv11_test_clips", exist_ok=True)
limit = 5000
n_saved = 0
n_failed = 0

for i in tqdm(range(limit), desc="Downloading Common Voice clips"):
    try:
        example = next(cv_11)
        output = os.path.join("cv11_test_clips", example["path"][0:-4] + ".wav")
        os.makedirs(os.path.dirname(output), exist_ok=True)
        # Scale float audio to 16-bit PCM; clip first so samples slightly
        # outside [-1, 1] cannot wrap around in the int16 cast.
        scaled = np.clip(example["audio"]["array"] * 32767, -32768, 32767)
        wav_data = scaled.astype(np.int16)
        scipy.io.wavfile.write(output, 16000, wav_data)
        n_saved += 1
    except StopIteration:
        break
    except Exception as e:
        # Best-effort download: skip the bad example, but keep it visible.
        n_failed += 1
        if n_failed <= 5:
            print(f"Skipping example {i}: {e}")
        continue

print(f"Downloaded Common Voice clips")
print(f"  saved: {n_saved}, failed: {n_failed}")

## 4. Compute Audio Embeddings

In [None]:
# Create audio feature extractor
# `F` is reused by the feature cells below (get_embedding_shape / embed_clips).
# NOTE(review): some openWakeWord releases require fetching the shared feature
# models first (openwakeword.utils.download_models()) — confirm for the installed version.
F = openwakeword.utils.AudioFeatures()

In [None]:
# Collect the negative (non wake word) audio files from the three source folders,
# keeping only clips between 1 second and 30 minutes long.
negative_dirs = ["fma_sample", "fsd50k_sample", "cv11_test_clips"]
negative_clips, negative_durations = openwakeword.data.filter_audio_paths(
    negative_dirs,
    min_length_secs=1.0,
    max_length_secs=60*30,
    duration_method="header"
)

negative_hours = sum(negative_durations)//3600
print(f"{len(negative_clips)} negative clips, ~{negative_hours} hours")

In [None]:
# Compute negative features
# Streams the negative clips through the feature extractor in batches and
# writes the resulting embeddings into a memory-mapped .npy file on disk.
audio_dataset = datasets.Dataset.from_dict({"audio": negative_clips})
audio_dataset = audio_dataset.cast_column("audio", datasets.Audio(sampling_rate=16000))

batch_size = 64  # number of audio files loaded per iteration
clip_size = CLIP_LENGTH_SECONDS
# Number of rows to pre-allocate: total negative duration divided by the window length
N_total = int(sum(negative_durations) // clip_size)
n_feature_cols = F.get_embedding_shape(clip_size)

output_file = "negative_features.npy"
output_array_shape = (N_total, n_feature_cols[0], n_feature_cols[1])
# mode='w+' creates (or overwrites) the file with the full pre-allocated shape
fp = open_memmap(output_file, mode='w+', dtype=np.float32, shape=output_array_shape)

row_counter = 0  # index of the next free row in `fp`
for i in tqdm(np.arange(0, audio_dataset.num_rows, batch_size), desc="Computing negative features"):
    # Convert the decoded float audio to 16-bit PCM
    wav_data = [(j["array"] * 32767).astype(np.int16) for j in audio_dataset[i:i+batch_size]["audio"]]
    # presumably concatenates the batch and cuts it into windows of clip_size seconds — confirm against openwakeword.data.stack_clips
    wav_data = openwakeword.data.stack_clips(wav_data, clip_size=16000*clip_size).astype(np.int16)
    features = F.embed_clips(x=wav_data, batch_size=1024, ncpu=2)
    
    if row_counter + features.shape[0] > N_total:
        # This batch would overflow the pre-allocated array: store only the
        # rows that still fit, then stop.
        fp[row_counter:min(row_counter+features.shape[0], N_total), :, :] = features[0:N_total - row_counter, :, :]
        fp.flush()
        break
    else:
        fp[row_counter:row_counter+features.shape[0], :, :] = features
        row_counter += features.shape[0]
        fp.flush()

# presumably drops unused trailing rows of the pre-allocated file — confirm against openwakeword.data.trim_mmap
openwakeword.data.trim_mmap(output_file)
print(f"Saved negative features to {output_file}")

In [None]:
# Collect the generated wake word clips, keeping only those between
# 0.3 and 3.0 seconds long.
positive_dirs = [f"{WAKE_WORD}_clips"]
positive_clips, durations = openwakeword.data.filter_audio_paths(
    positive_dirs,
    min_length_secs=0.3,
    max_length_secs=3.0,
    duration_method="header"
)

n_positive_clips = len(positive_clips)
print(f"{n_positive_clips} positive clips after filtering")

In [None]:
# Mix positive clips with background noise and compute features
sr = 16000
total_length = int(sr * CLIP_LENGTH_SECONDS)

# Position positive clips to end near the end of the window (for low latency detection).
# Each clip ends up to 0.2 s before the end of the window (random jitter).
jitters = (np.random.uniform(0, 0.2, len(positive_clips)) * sr).astype(np.int32)
# Clamp at 0: a clip whose duration plus jitter exceeds the window would
# otherwise get a negative start index.
starts = [
    max(0, int(total_length - (int(np.ceil(d * sr)) + j)))
    for d, j in zip(durations, jitters)
]

# Create mixing generator
batch_size = 8
mixing_generator = openwakeword.data.mix_clips_batch(
    foreground_clips=positive_clips,
    background_clips=negative_clips,
    combined_size=total_length,
    batch_size=batch_size,
    snr_low=5,
    snr_high=15,
    start_index=starts,
    volume_augmentation=True,
)

In [None]:
# Pull one batch from the mixing generator and play its first clip.
# (This consumes a batch; the generator is rebuilt before features are computed.)
first_batch = next(mixing_generator)
mixed_clips, labels, background_clips = first_batch
print("Sample mixed clip (wake word + background noise):")
preview = ipd.Audio(mixed_clips[0], rate=16000, normalize=True, autoplay=False)
ipd.display(preview)

In [None]:
# Recreate generator and compute positive features
# (the preview cell consumed the first batch of the previous generator)
mixing_generator = openwakeword.data.mix_clips_batch(
    foreground_clips=positive_clips,
    background_clips=negative_clips,
    combined_size=total_length,
    batch_size=batch_size,
    snr_low=5,
    snr_high=15,
    start_index=starts,
    volume_augmentation=True,
)

N_total = len(positive_clips)
n_feature_cols = F.get_embedding_shape(CLIP_LENGTH_SECONDS)

output_file = f"{WAKE_WORD}_features.npy"
output_array_shape = (N_total, n_feature_cols[0], n_feature_cols[1])
fp = open_memmap(output_file, mode='w+', dtype=np.float32, shape=output_array_shape)

# Round up so the final, partially filled batch is counted in the progress bar
n_batches = int(np.ceil(N_total / batch_size))

row_counter = 0  # index of the next free row in `fp`
for batch in tqdm(mixing_generator, total=n_batches, desc="Computing positive features"):
    # Only the mixed audio (first element of the batch) is needed here
    batch_data = batch[0]
    features = F.embed_clips(batch_data, batch_size=256)

    # Never write past the pre-allocated array
    n_rows = min(features.shape[0], N_total - row_counter)
    fp[row_counter:row_counter+n_rows, :, :] = features[:n_rows, :, :]
    row_counter += n_rows
    fp.flush()

    if row_counter >= N_total:
        break

openwakeword.data.trim_mmap(output_file)
print(f"Saved positive features to {output_file}")

## 5. Train the Model

In [None]:
# Read both feature files back from disk and assemble the training arrays:
# X holds the negatives followed by the positives, y the matching 0/1 labels.
positive_features = np.load(f"{WAKE_WORD}_features.npy")
negative_features = np.load("negative_features.npy")

print(f"Negative features shape: {negative_features.shape}")
print(f"Positive features shape: {positive_features.shape}")

X = np.concatenate([negative_features, positive_features], axis=0)

negative_labels = np.zeros(len(negative_features), dtype=np.float32)
positive_labels = np.ones(len(positive_features), dtype=np.float32)
y = np.concatenate([negative_labels, positive_labels])[:, None]  # column vector

print(f"\nTotal samples: {len(X)}")
print(f"Positive ratio: {y.sum()/len(y)*100:.1f}%")

In [None]:
# Wrap the numpy arrays in a shuffled PyTorch DataLoader
batch_size = 512
feature_tensor = torch.from_numpy(X)
label_tensor = torch.from_numpy(y)
training_data = torch.utils.data.DataLoader(
    torch.utils.data.TensorDataset(feature_tensor, label_tensor),
    shuffle=True,
    batch_size=batch_size,
)

# Train on the GPU when one is available, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

In [None]:
# Define model architecture: flatten the feature window, two hidden blocks
# (Linear -> LayerNorm -> ReLU -> Dropout), then a sigmoid output unit.
layer_dim = 64  # Slightly larger for better accuracy
n_inputs = X.shape[1] * X.shape[2]

layers = [
    nn.Flatten(),
    # hidden block 1
    nn.Linear(n_inputs, layer_dim),
    nn.LayerNorm(layer_dim),
    nn.ReLU(),
    nn.Dropout(0.1),
    # hidden block 2
    nn.Linear(layer_dim, layer_dim),
    nn.LayerNorm(layer_dim),
    nn.ReLU(),
    nn.Dropout(0.1),
    # output: probability of the wake word
    nn.Linear(layer_dim, 1),
    nn.Sigmoid(),
]
model = nn.Sequential(*layers).to(device)

loss_function = torch.nn.functional.binary_cross_entropy
optimizer = torch.optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.01)

print(model)

In [None]:
# Training loop
n_epochs = 15
history = collections.defaultdict(list)  # per-epoch 'loss' and 'recall'

# The export cell moves the model to the CPU and switches it to eval mode.
# Put it back on the training device in train mode so that re-running this
# cell after an export still works and dropout is active.
model.to(device).train()

for epoch in tqdm(range(n_epochs), desc="Training"):
    epoch_loss = []
    epoch_recall = []

    for batch in training_data:
        x, y_batch = batch[0].to(device), batch[1].to(device)
        is_positive = y_batch.flatten() == 1

        # Weight negative class higher to reduce false positives
        # (positives get weight 0.1, negatives keep weight 1.0)
        weights = torch.ones(y_batch.shape[0], device=device)
        weights[is_positive] = 0.1

        optimizer.zero_grad()
        predictions = model(x)
        loss = loss_function(predictions, y_batch, weights[..., None])
        loss.backward()
        optimizer.step()

        epoch_loss.append(loss.item())

        # Calculate recall on the positives of this batch. Tensor reductions
        # are used instead of Python's builtin sum(), which would iterate
        # over the tensor one element at a time.
        n_positive = int(is_positive.sum())
        if n_positive > 0:
            with torch.no_grad():
                true_positives = int((predictions.flatten()[is_positive] >= 0.5).sum())
            epoch_recall.append(true_positives / n_positive)

    history['loss'].append(np.mean(epoch_loss))
    # No batch contained a positive sample -> recall is undefined for this epoch
    history['recall'].append(np.mean(epoch_recall) if epoch_recall else float("nan"))

    if (epoch + 1) % 5 == 0:
        print(f"Epoch {epoch+1}: Loss={history['loss'][-1]:.4f}, Recall={history['recall'][-1]:.4f}")

In [None]:
# Plot training metrics: loss on the left panel, recall on the right
fig, (ax_loss, ax_recall) = plt.subplots(1, 2, figsize=(10, 4))

ax_loss.plot(history['loss'], label='Loss')
ax_loss.set_xlabel('Epoch')
ax_loss.set_ylabel('Loss')
ax_loss.set_title('Training Loss')
ax_loss.legend()

ax_recall.plot(history['recall'], label='Recall', color='orange')
ax_recall.set_xlabel('Epoch')
ax_recall.set_ylabel('Recall')
ax_recall.set_title('Training Recall')
ax_recall.set_ylim(0, 1)
ax_recall.legend()

fig.tight_layout()
plt.show()

## 6. Export Model to ONNX

In [None]:
# Export the trained network to ONNX with a dynamic batch dimension
output_path = f"{WAKE_WORD}.onnx"

model.eval()             # inference mode (disables dropout)
model_cpu = model.cpu()  # note: moves `model` itself to the CPU
dummy_input = torch.zeros((1, X.shape[1], X.shape[2]))

batch_axis = {0: 'batch_size'}
torch.onnx.export(
    model_cpu,
    dummy_input,
    output_path,
    input_names=['input'],
    output_names=['output'],
    dynamic_axes={'input': dict(batch_axis), 'output': dict(batch_axis)}
)

size_kb = os.path.getsize(output_path) / 1024
print(f"Model exported to: {output_path}")
print(f"Model size: {size_kb:.1f} KB")

## 7. Test the Model

In [None]:
# Load model with openWakeWord
import importlib.util

# Speex noise suppression needs the optional `speexdsp_ns` package, which the
# install cell of this notebook does not install. Enable it only when it is
# importable so this cell does not fail on a fresh runtime.
speex_available = importlib.util.find_spec("speexdsp_ns") is not None
if not speex_available:
    print("speexdsp_ns is not installed; loading the model without Speex noise suppression")

# NOTE(review): newer openWakeWord releases appear to name this argument
# `wakeword_models` and to need inference_framework="onnx" for .onnx files —
# confirm against the installed version.
oww = openwakeword.Model(
    wakeword_model_paths=[f"{WAKE_WORD}.onnx"],
    enable_speex_noise_suppression=speex_available,
    vad_threshold=0.5
)

print(f"Loaded model: {WAKE_WORD}")

In [None]:
# Score one wake word clip and plot the per-frame scores.
# Note: this clip comes from the same folder the training positives were read from.
test_clip = list(Path(f"{WAKE_WORD}_clips").glob("*.wav"))[0]
print(f"Testing on: {test_clip}")

scores = oww.predict_clip(str(test_clip))
frame_scores = [s[WAKE_WORD] for s in scores]

fig, ax = plt.subplots(figsize=(10, 3))
ax.plot(frame_scores)
ax.axhline(y=0.5, color='r', linestyle='--', label='Threshold')
ax.set_xlabel('Frame (80ms each)')
ax.set_ylabel('Score')
ax.set_title(f'Wake Word Detection Score - {test_clip.name}')
ax.set_ylim(0, 1)
ax.legend()
plt.show()

max_score = max(frame_scores)
print(f"Max score: {max_score:.3f} {'✓ DETECTED' if max_score >= 0.5 else '✗ NOT DETECTED'}")

In [None]:
# Detection rate over the first 50 wake word clips: a clip counts as detected
# when any frame scores at least 0.5.
clip_folder = Path(f"{WAKE_WORD}_clips")
test_clips = list(clip_folder.glob("*.wav"))[:50]

detected = 0
for clip in tqdm(test_clips, desc="Testing positive clips"):
    scores = oww.predict_clip(str(clip))
    max_score = max([s[WAKE_WORD] for s in scores])
    detected += int(max_score >= 0.5)

print(f"\nDetection rate: {detected}/{len(test_clips)} ({detected/len(test_clips)*100:.1f}%)")

In [None]:
# Test false positive rate on negative clips
# Note: these clips were also used as training negatives, so this is a sanity
# check rather than a held-out evaluation.
test_negative = negative_clips[:100]
false_positives = 0
n_evaluated = 0
n_errors = 0

for clip in tqdm(test_negative, desc="Testing negative clips"):
    try:
        scores = oww.predict_clip(clip)
        max_score = max([s[WAKE_WORD] for s in scores])
    except Exception as e:
        # Skip clips that cannot be scored, but keep the failure visible and
        # out of the denominator. (A bare `except:` would also swallow
        # KeyboardInterrupt.)
        n_errors += 1
        if n_errors <= 5:
            print(f"Skipping {clip}: {e}")
        continue

    n_evaluated += 1
    if max_score >= 0.5:
        false_positives += 1

if n_evaluated > 0:
    print(f"\nFalse positive rate: {false_positives}/{n_evaluated} ({false_positives/n_evaluated*100:.1f}%)")
else:
    print("\nFalse positive rate: no negative clips could be evaluated")
if n_errors:
    print(f"{n_errors} clip(s) skipped because of errors")

## 8. Download the Model

Download the trained model to use with your voice assistant.

In [None]:
# Trigger a browser download of the exported model (Colab only), then print
# the follow-up instructions.
from google.colab import files

print(f"Downloading {WAKE_WORD}.onnx...")
files.download(f"{WAKE_WORD}.onnx")

divider = "="*50
next_steps = [
    "\n" + divider,
    "DONE! Next steps:",
    divider,
    f"1. Save {WAKE_WORD}.onnx to your Raspberry Pi",
    f"2. Update your WakeWordConfig:",
    f"   WakeWordConfig(model_path='path/to/{WAKE_WORD}.onnx')",
    f"3. Run your voice assistant and say 'GLaDOS'!",
]
for line in next_steps:
    print(line)