# Custom Wake Word Training

This notebook guides you through training your own custom wake word model using openWakeWord.

## What you'll learn:
- ✅ How to prepare audio data
- ✅ How to extract audio features
- ✅ How to train a neural network model
- ✅ How to export and test your model

## Requirements:
- Positive examples: Audio clips of your wake phrase (100+ recommended)
- Negative examples: Other speech, music, noise (1000+ recommended)
- All audio must be 16 kHz, mono, 16-bit PCM WAV files
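
The format requirement above can be checked before training. The following is a minimal sketch using only the standard-library `wave` module; the helper name `wav_format_problems` is hypothetical and not part of openWakeWord.

```python
import wave


def wav_format_problems(path, rate=16000, channels=1, sample_width_bytes=2):
    """Return a list of format problems for one WAV file (empty list if it conforms)."""
    problems = []
    with wave.open(str(path), "rb") as wav:
        if wav.getframerate() != rate:
            problems.append(f"sample rate is {wav.getframerate()} Hz, expected {rate} Hz")
        if wav.getnchannels() != channels:
            problems.append(f"{wav.getnchannels()} channels, expected {channels}")
        if wav.getsampwidth() != sample_width_bytes:
            problems.append(
                f"{8 * wav.getsampwidth()}-bit samples, expected {8 * sample_width_bytes}-bit"
            )
    return problems
```

Calling `wav_format_problems(path)` on each recording and converting any file that reports a problem (for example with the ffmpeg command shown later in this notebook) avoids clips being silently skipped during feature extraction.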

## Setup

First, let's install and import required libraries.

In [None]:
# Install requirements (uncomment if needed)
# !pip install openwakeword torch scipy matplotlib datasets tqdm

In [None]:
# Imports
import os
import collections
import numpy as np
from pathlib import Path
from tqdm import tqdm
import scipy.io.wavfile
import matplotlib.pyplot as plt

import torch
from torch import nn

import openwakeword
import openwakeword.data
import openwakeword.utils
import openwakeword.metrics

print("✓ All imports successful!")

## Configuration

The wake phrase, data directories, and training parameters are set in the Configuration cell at the end of Step 0, after the data is prepared.

## Step 0: Data Preparation (IMPORTANT!)

Before training, you need audio data. There are three options:

### **Option 1: Use Sample Datasets (Recommended for Testing)**
Download pre-prepared sample datasets to quickly test the training pipeline.

### **Option 2: Download Full Datasets**
Get larger datasets for better model performance.

### **Option 3: Use Your Own Audio**
Record or collect your own audio files (skip to Configuration section).

**Choose one option below and run the corresponding cells.**

### Option 1: Download Sample Datasets (Quick Start)

Run these cells to download small sample datasets for quick testing:

In [None]:
# Create directory structure
!mkdir -p training_data/positive
!mkdir -p training_data/negative/speech
!mkdir -p training_data/negative/music
!mkdir -p training_data/negative/noise

print("✓ Directory structure created")

In [None]:
# Download sample negative data: Speech from Common Voice
# This will download ~5000 clips of English speech (about 500MB)

import datasets

print("Downloading Common Voice 11 test split...")
print("This may take a few minutes...")

cv_11 = datasets.load_dataset("mozilla-foundation/common_voice_11_0", "en", split="test", streaming=True)
cv_11 = cv_11.cast_column("audio", datasets.Audio(sampling_rate=16000, mono=True))
cv_11 = iter(cv_11)

# Convert and save clips (first 5000 for sample)
limit = 5000
print(f"Processing {limit} audio clips...")

saved = 0  # count clips actually written, so the summary below is accurate

for i in tqdm(range(limit)):
    try:
        example = next(cv_11)
        # Flatten the dataset path into one file name and swap the extension for .wav
        output = os.path.join("training_data/negative/speech", example["path"].replace("/", "_")[0:-4] + ".wav")
        os.makedirs(os.path.dirname(output), exist_ok=True)
        
        # Convert float audio in [-1, 1] to 16-bit PCM format
        wav_data = (example["audio"]["array"] * 32767).astype(np.int16)
        scipy.io.wavfile.write(output, 16000, wav_data)
        saved += 1
    except StopIteration:
        print(f"Reached end of dataset at {i} clips")
        break
    except Exception as e:
        print(f"Error processing clip {i}: {e}")
        continue

print(f"✓ Saved {saved} speech clips to training_data/negative/speech/")

In [None]:
# Download sample music and noise datasets
# These are pre-prepared sample datasets from openWakeWord

import urllib.request
import zipfile

# Download FMA music samples
print("Downloading FMA music samples...")
music_url = "https://f002.backblazeb2.com/file/openwakeword-resources/data/fma_sample.zip"
music_zip = "fma_sample.zip"

try:
    urllib.request.urlretrieve(music_url, music_zip)
    with zipfile.ZipFile(music_zip, 'r') as zip_ref:
        zip_ref.extractall("training_data/negative/music/")
    os.remove(music_zip)
    print(f"✓ Downloaded music samples to training_data/negative/music/")
except Exception as e:
    print(f"⚠️  Failed to download music samples: {e}")

# Download FSD50k noise samples
print("\nDownloading FSD50k noise samples...")
noise_url = "https://f002.backblazeb2.com/file/openwakeword-resources/data/fsd50k_sample.zip"
noise_zip = "fsd50k_sample.zip"

try:
    urllib.request.urlretrieve(noise_url, noise_zip)
    with zipfile.ZipFile(noise_zip, 'r') as zip_ref:
        zip_ref.extractall("training_data/negative/noise/")
    os.remove(noise_zip)
    print(f"✓ Downloaded noise samples to training_data/negative/noise/")
except Exception as e:
    print(f"⚠️  Failed to download noise samples: {e}")

print("\n✅ Sample dataset download complete!")

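### Positive Examples: Generate with TTS, Record, or Download

The downloads above only provide negative data. You also need positive examples of your wake phrase: generate them with Text-to-Speech, record them yourself, or download the sample set below.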

In [None]:
# Option A: Simple TTS with pyttsx3 (easiest, but lower quality)
# Uncomment and run if you want quick synthetic data.
# Note: pyttsx3 may not write 16 kHz mono 16-bit WAV files; convert the output
# with the ffmpeg command shown in Option B if needed.

# !pip install pyttsx3

# import pyttsx3
# 
# engine = pyttsx3.init()
# voices = engine.getProperty('voices')
# 
# wake_phrase = "hey jarvis"  # Change this!
# 
# print(f"Generating {len(voices) * 10} clips with different voices...")
# 
# for i, voice in enumerate(voices):
#     engine.setProperty('voice', voice.id)
#     for j in range(10):  # 10 speaking rates per voice
#         # Vary the rate (words per minute); otherwise all 10 clips are identical
#         engine.setProperty('rate', 120 + 10 * j)
#         output_file = f"training_data/positive/tts_voice_{i}_clip_{j}.wav"
#         engine.save_to_file(wake_phrase, output_file)
#     engine.runAndWait()
# 
# print(f"✓ Generated positive examples in training_data/positive/")

In [None]:
# Option B: Record your own voice (recommended!)
# Instructions:
# 1. Use any recording app (Audacity, QuickTime, etc.)
# 2. Record yourself saying the wake phrase 50-100 times
# 3. Vary your tone, speed, and environment
# 4. Save as WAV files: training_data/positive/recording_001.wav, etc.
# 5. Ensure 16kHz, mono, 16-bit format

# To convert existing recordings to correct format:
# !ffmpeg -i input.wav -ar 16000 -ac 1 -sample_fmt s16 output.wav

print("⚠️  IMPORTANT: You need positive examples of YOUR wake phrase!")
print("")
print("Quick option: this cell downloads a sample dataset for 'turn on the office lights'.")

import urllib.request
import tarfile

positive_url = "https://f002.backblazeb2.com/file/openwakeword-resources/data/turn_on_the_office_lights.tar.gz"
positive_tar = "turn_on_the_office_lights.tar.gz"

try:
    print("Downloading sample wake phrase clips...")
    urllib.request.urlretrieve(positive_url, positive_tar)
    
    with tarfile.open(positive_tar, 'r:gz') as tar:
        tar.extractall("training_data/positive/")
    
    os.remove(positive_tar)
    print("✓ Downloaded ~3400 positive examples to training_data/positive/")
    print("  Wake phrase: 'turn on the office lights'")
    print("  Remember to update WAKE_PHRASE in Configuration cell!")
except Exception as e:
    print(f"⚠️  Download failed: {e}")
    print("  Please add your own recordings to training_data/positive/")

### Verify Your Dataset

Run this to check what you have:

In [None]:
# Check what data you have
import os
from pathlib import Path

def count_wav_files(directory):
    """Count WAV files in a directory"""
    if not os.path.exists(directory):
        return 0
    return len(list(Path(directory).rglob("*.wav")))

positive_count = count_wav_files("training_data/positive")
speech_count = count_wav_files("training_data/negative/speech")
music_count = count_wav_files("training_data/negative/music")
noise_count = count_wav_files("training_data/negative/noise")
total_negative = speech_count + music_count + noise_count

print("=" * 60)
print("DATASET SUMMARY")
print("=" * 60)
print(f"Positive examples (wake phrase):  {positive_count:,}")
print(f"\nNegative examples:")
print(f"  Speech:  {speech_count:,}")
print(f"  Music:   {music_count:,}")
print(f"  Noise:   {noise_count:,}")
print(f"  Total:   {total_negative:,}")
print("=" * 60)

# Check if you have enough data
print("\nData Quality Check:")
if positive_count < 50:
    print("❌ WARNING: Very few positive examples!")
    print("   Recommendation: Need at least 50-100, preferably 500+")
elif positive_count < 500:
    print("⚠️  NOTICE: Limited positive examples")
    print("   Recommendation: 500+ examples for better results")
else:
    print("✅ Good amount of positive examples")

if total_negative < 500:
    print("❌ WARNING: Very few negative examples!")
    print("   Recommendation: Need at least 500-1000, preferably 5000+")
elif total_negative < 5000:
    print("⚠️  NOTICE: Limited negative examples")
    print("   Recommendation: 5000+ examples for better results")
else:
    print("✅ Good amount of negative examples")

print("\nYou can proceed to Configuration and training below!")

---

## ✅ Data Preparation Complete!

If you ran the cells above, you now have:
- ✅ Directory structure created
- ✅ Negative examples downloaded (speech, music, noise)
- ✅ Option to generate or download positive examples
- ✅ Dataset verification

**Next:** Continue to the Configuration section below to set your wake phrase and start training!

In [None]:
# Configuration
WAKE_PHRASE = "hey jarvis"  # Change this to your wake phrase
POSITIVE_DIR = "training_data/positive"  # Directory with positive examples
NEGATIVE_DIRS = [  # Directories with negative examples
    "training_data/negative/speech",
    "training_data/negative/music",
    "training_data/negative/noise"
]
OUTPUT_DIR = Path("training_output")
OUTPUT_DIR.mkdir(exist_ok=True)

# Training parameters
N_EPOCHS = 10
BATCH_SIZE = 512
LEARNING_RATE = 0.001

print(f"Wake phrase: {WAKE_PHRASE}")
print(f"Output directory: {OUTPUT_DIR}")

## Step 1: Data Collection

Let's load and validate your audio files.

In [None]:
# Get positive examples (search subdirectories too, since extracted archives may be nested)
positive_dir = Path(POSITIVE_DIR)
positive_clips_raw = sorted(positive_dir.rglob("*.wav"))
print(f"Found {len(positive_clips_raw)} positive examples")

# Get negative examples
negative_clips_raw = []
for neg_dir in NEGATIVE_DIRS:
    neg_path = Path(neg_dir)
    if neg_path.exists():
        clips = sorted(neg_path.rglob("*.wav"))
        negative_clips_raw.extend(clips)
        print(f"Found {len(clips)} negative examples in {neg_dir}")
    else:
        print(f"⚠️  Directory not found: {neg_dir}")

print(f"\nTotal: {len(positive_clips_raw)} positive, {len(negative_clips_raw)} negative")

In [None]:
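# Filter clips by duration (2-8 seconds)
print("Filtering clips by duration (2-8 seconds)...")

# filter_audio_paths scans flat directories (not individual file paths),
# so pass every directory that contains at least one discovered clip.
positive_dirs = sorted({str(Path(p).parent) for p in positive_clips_raw})
negative_dirs = sorted({str(Path(p).parent) for p in negative_clips_raw})

# Short wake phrases often last under 2 seconds; lower min_length_secs
# if most of your positive clips are filtered out.
positive_clips, positive_durations = openwakeword.data.filter_audio_paths(
    positive_dirs,
    min_length_secs=2.0,
    max_length_secs=8.0
)

negative_clips, negative_durations = openwakeword.data.filter_audio_paths(
    negative_dirs,
    min_length_secs=2.0,
    max_length_secs=8.0
)

print(f"\nAfter filtering: {len(positive_clips)} positive, {len(negative_clips)} negative")

# Check if we have enough data (thresholds match the recommendations in the messages)
if len(positive_clips) < 50:
    print("\n⚠️  WARNING: Very few positive examples! You need at least 50-100 for good results.")
if len(negative_clips) < 1000:
    print("\n⚠️  WARNING: Few negative examples! You need at least 1000+ for good results.")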

In [None]:
# Visualize dataset statistics
fig, axes = plt.subplots(1, 2, figsize=(12, 4))

# Duration histogram
axes[0].hist(positive_durations, bins=20, alpha=0.5, label='Positive')
axes[0].hist(negative_durations, bins=20, alpha=0.5, label='Negative')
axes[0].set_xlabel('Duration (seconds)')
axes[0].set_ylabel('Count')
axes[0].set_title('Audio Duration Distribution')
axes[0].legend()

# Dataset size comparison
axes[1].bar(['Positive', 'Negative'], [len(positive_clips), len(negative_clips)])
axes[1].set_ylabel('Number of Clips')
axes[1].set_title('Dataset Size')

plt.tight_layout()
plt.show()

## Step 2: Feature Extraction

Convert audio to openWakeWord embeddings (28 timesteps × 96 features).
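
The 28 timesteps follow from clip length. The sketch below reproduces the arithmetic under stated assumptions, not confirmed library internals: a 16 kHz clip, mel frames from a 25 ms window (400 samples) with a 10 ms hop (160 samples) and no padding, and one 96-dimensional embedding per 76-frame window with a stride of 8 frames. Both function names are hypothetical.

```python
SAMPLE_RATE = 16000


def mel_frame_count(n_samples, window=400, hop=160):
    """Number of full analysis windows in the clip (assumes no padding)."""
    if n_samples < window:
        return 0
    return (n_samples - window) // hop + 1


def embedding_count(n_mel_frames, window=76, stride=8):
    """Number of full embedding windows over the mel frames."""
    if n_mel_frames < window:
        return 0
    return (n_mel_frames - window) // stride + 1


for seconds in (1, 2, 3):
    mel = mel_frame_count(seconds * SAMPLE_RATE)
    print(f"{seconds} s -> {mel} mel frames -> {embedding_count(mel)} timesteps")
```

Under these assumptions a 3-second clip yields 298 mel frames and 28 timesteps, which matches the (28, 96) input shape used when the model is exported. Clips of other lengths give a different number of timesteps, so every clip has to be brought to the same length before the features can be stacked into one array.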

In [None]:
# Initialize feature extractor
print("Initializing audio feature extractor...")
feature_extractor = openwakeword.utils.AudioFeatures()
print("✓ Feature extractor ready")

In [None]:
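CLIP_SAMPLES = 3 * 16000  # fixed clip length: 3 s of 16 kHz audio gives the 28 timesteps used below

def extract_features(audio_clips, cache_file=None):
    """Extract fixed-size embedding features from audio clips with optional caching"""
    
    # Check for cached features
    if cache_file and cache_file.exists():
        print(f"Loading cached features from {cache_file}")
        return np.load(cache_file)
    
    # Extract features
    all_features = []
    
    for clip_path in tqdm(audio_clips, desc="Extracting features"):
        try:
            # Read audio
            rate, audio = scipy.io.wavfile.read(clip_path)
            
            # Verify format (16 kHz, mono, 16-bit PCM)
            if rate != 16000 or audio.ndim != 1 or audio.dtype != np.int16:
                print(f"⚠️  Skipping {clip_path}: need 16kHz mono 16-bit audio (got {rate}Hz)")
                continue
            
            # Keep the last CLIP_SAMPLES samples and left-pad short clips with silence,
            # so every clip yields the same number of timesteps.
            # This assumes the wake phrase falls within the final 3 seconds.
            audio = audio[-CLIP_SAMPLES:]
            audio = np.pad(audio, (CLIP_SAMPLES - len(audio), 0))
            
            # embed_clips expects shape (n_clips, n_samples) and
            # returns shape (n_clips, timesteps, 96)
            features = feature_extractor.embed_clips(audio[None, :], batch_size=1)
            all_features.append(features)
            
        except Exception as e:
            print(f"⚠️  Error processing {clip_path}: {e}")
            continue
    
    if not all_features:
        raise ValueError("No features extracted; check the format of your audio clips")
    
    # Stack into array of shape (n_clips, timesteps, 96)
    features_array = np.vstack(all_features).astype(np.float32)
    
    # Cache if requested
    if cache_file:
        np.save(cache_file, features_array)
        print(f"✓ Saved features to {cache_file}")
    
    return features_array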

In [None]:
# Extract positive features
model_name = WAKE_PHRASE.replace(' ', '_')
positive_features = extract_features(
    positive_clips,
    cache_file=OUTPUT_DIR / f"{model_name}_positive_features.npy"
)

print(f"Positive features shape: {positive_features.shape}")

In [None]:
# Extract negative features
negative_features = extract_features(
    negative_clips,
    cache_file=OUTPUT_DIR / "negative_features.npy"
)

print(f"Negative features shape: {negative_features.shape}")

In [None]:
# Visualize sample features
fig, axes = plt.subplots(1, 2, figsize=(14, 4))

# Positive example
im = axes[0].imshow(positive_features[0].T, aspect='auto', origin='lower', cmap='viridis')
axes[0].set_xlabel('Time Steps')
axes[0].set_ylabel('Feature Dimension')
axes[0].set_title('Positive Example Features')
fig.colorbar(im, ax=axes[0])  # colorbar is a Figure method, not an Axes method

# Negative example
axes[1].imshow(negative_features[0].T, aspect='auto', origin='lower', cmap='viridis')
axes[1].set_xlabel('Time Steps')
axes[1].set_ylabel('Feature Dimension')
axes[1].set_title('Negative Example Features')

plt.tight_layout()
plt.show()

## Step 3: Model Training

Train a neural network to classify wake word vs non-wake word.
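
The training loop in this step multiplies each example's loss by a class weight: 1.0 for negatives and 0.1 for positives. The sketch below works through that weighted binary cross-entropy for two equally confident mistakes. The function name `weighted_bce` and the `eps` clamp are illustrative choices, not the PyTorch implementation.

```python
import math


def weighted_bce(prediction, label, positive_weight=0.1, negative_weight=1.0):
    """Weighted binary cross-entropy for a single example."""
    eps = 1e-7  # keep log() away from 0
    p = min(max(prediction, eps), 1 - eps)
    weight = positive_weight if label == 1 else negative_weight
    return -weight * (label * math.log(p) + (1 - label) * math.log(1 - p))


missed_wake_word = weighted_bce(0.1, label=1)  # wake phrase scored only 0.1
false_activation = weighted_bce(0.9, label=0)  # other audio scored 0.9

print(f"missed wake word loss: {missed_wake_word:.4f}")
print(f"false activation loss: {false_activation:.4f}")
print(f"ratio: {false_activation / missed_wake_word:.1f}")
```

With these weights a false activation costs about ten times as much as an equally confident miss, so the optimizer trades some recall for fewer false activations.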

In [None]:
# Prepare training data (float32 to match the model's weights)
X = np.vstack((negative_features, positive_features)).astype(np.float32)
y = np.array([0] * len(negative_features) + [1] * len(positive_features)).astype(np.float32)[..., None]

n_positive = int((y == 1).sum())
n_negative = int((y == 0).sum())

print(f"Training data shape: {X.shape}")
print(f"Labels shape: {y.shape}")
print(f"Positive examples: {n_positive}")
print(f"Negative examples: {n_negative}")
print(f"Positive ratio: {n_positive / len(y) * 100:.1f}%")

In [None]:
# Create PyTorch dataloader
training_data = torch.utils.data.DataLoader(
    torch.utils.data.TensorDataset(torch.from_numpy(X), torch.from_numpy(y)),
    batch_size=BATCH_SIZE,
    shuffle=True
)

print(f"✓ Created dataloader with batch size {BATCH_SIZE}")
print(f"  Number of batches: {len(training_data)}")

In [None]:
# Define model architecture
layer_dim = 32

model = nn.Sequential(
    nn.Flatten(),
    nn.Linear(X.shape[1] * X.shape[2], layer_dim),  # timesteps * features -> hidden
    nn.LayerNorm(layer_dim),
    nn.ReLU(),
    nn.Linear(layer_dim, layer_dim),
    nn.LayerNorm(layer_dim),
    nn.ReLU(),
    nn.Linear(layer_dim, 1),  # hidden -> output (0 or 1)
    nn.Sigmoid(),
)

print("Model architecture:")
print(model)
print(f"\nTotal parameters: {sum(p.numel() for p in model.parameters())}")

In [None]:
# Training setup
loss_function = torch.nn.functional.binary_cross_entropy
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

print(f"✓ Optimizer: Adam with learning rate {LEARNING_RATE}")
print(f"✓ Loss function: Binary Cross Entropy")

In [None]:
# Training loop
print(f"\nTraining for {N_EPOCHS} epochs...\n")

history = collections.defaultdict(list)

for epoch in range(N_EPOCHS):
    epoch_losses = []
    epoch_recalls = []
    
    progress_bar = tqdm(training_data, desc=f"Epoch {epoch+1}/{N_EPOCHS}")
    
    for batch in progress_bar:
        x, y_batch = batch[0], batch[1]
        
        # Weight classes: negatives count 10x more than positives in the loss,
        # so false positives are penalized more heavily than misses.
        # This helps reduce false activations
        weights = torch.ones(y_batch.shape[0])
        weights[y_batch.flatten() == 1] = 0.1
        
        # Forward pass
        optimizer.zero_grad()
        predictions = model(x)
        
        # Calculate loss
        loss = loss_function(predictions, y_batch, weights[..., None])
        loss.backward()
        optimizer.step()
        
        # Track metrics
        epoch_losses.append(loss.item())
        postfix = {'loss': f'{loss.item():.4f}'}
        
        # Recall on this batch; skip batches with no positive examples
        # (avoids a division by zero)
        positive_mask = y_batch.flatten() == 1
        if positive_mask.any():
            positive_scores = predictions.detach().flatten()[positive_mask]
            recall = (positive_scores >= 0.5).float().mean().item()
            epoch_recalls.append(recall)
            postfix['recall'] = f'{recall:.4f}'
        
        progress_bar.set_postfix(postfix)
    
    # Log epoch summary
    avg_loss = np.mean(epoch_losses)
    avg_recall = np.mean(epoch_recalls)
    history['loss'].extend(epoch_losses)
    history['recall'].extend(epoch_recalls)
    history['epoch_loss'].append(avg_loss)
    history['epoch_recall'].append(avg_recall)
    
    print(f"Epoch {epoch+1}/{N_EPOCHS} Summary - Loss: {avg_loss:.4f}, Recall: {avg_recall:.4f}")

print("\n✓ Training complete!")

In [None]:
# Visualize training metrics
fig, axes = plt.subplots(1, 2, figsize=(14, 4))

# Loss over time
axes[0].plot(history['epoch_loss'])
axes[0].set_xlabel('Epoch')
axes[0].set_ylabel('Loss')
axes[0].set_title('Training Loss')
axes[0].grid(True)

# Recall over time
axes[1].plot(history['epoch_recall'])
axes[1].set_xlabel('Epoch')
axes[1].set_ylabel('Recall')
axes[1].set_title('Training Recall')
axes[1].set_ylim(0, 1)
axes[1].grid(True)

plt.tight_layout()
plt.show()

print(f"\nFinal training metrics:")
print(f"  Loss: {history['epoch_loss'][-1]:.4f}")
print(f"  Recall: {history['epoch_recall'][-1]:.4f}")

## Step 4: Model Export

Export the trained model to ONNX format for use with openWakeWord.

In [None]:
# Export to ONNX
onnx_path = OUTPUT_DIR / f"{model_name}.onnx"

print(f"Exporting model to {onnx_path}...")

torch.onnx.export(
    model,
    args=torch.zeros((1, X.shape[1], X.shape[2])),  # Input shape: (batch, timesteps, features), e.g. (1, 28, 96)
    f=str(onnx_path),
    input_names=['input'],
    output_names=['output'],
    dynamic_axes={'input': {0: 'batch_size'}, 'output': {0: 'batch_size'}}
)

print(f"✓ Model exported to: {onnx_path}")
print(f"  File size: {onnx_path.stat().st_size / 1024:.1f} KB")

## Step 5: Model Testing

Load and test the exported model with openWakeWord.
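
The cells in this step print a verdict for a handful of clips. To summarize a larger set, the per-clip maximum scores can be reduced to two numbers. This is a minimal sketch: the function name `detection_metrics` and the example scores are hypothetical, and the threshold of 0.5 mirrors the one used in the cells of this step.

```python
def detection_metrics(positive_scores, negative_scores, threshold=0.5):
    """Recall on positive clips and false-positive rate on negative clips.

    Each score is the maximum detection score observed over one clip.
    """
    detected = sum(score > threshold for score in positive_scores)
    false_alarms = sum(score > threshold for score in negative_scores)
    recall = detected / len(positive_scores) if positive_scores else float("nan")
    false_positive_rate = (
        false_alarms / len(negative_scores) if negative_scores else float("nan")
    )
    return {"recall": recall, "false_positive_rate": false_positive_rate}


# Made-up scores for illustration only
metrics = detection_metrics([0.9, 0.2, 0.7, 0.6], [0.1, 0.8])
print(metrics)
```

Evaluating on clips that were also used for training overstates both numbers, so clips held out from training give a more realistic estimate.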

In [None]:
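# Load model with openWakeWord
# Note: argument names vary between openWakeWord releases. Newer releases use
# `wakeword_models` (with `inference_framework="onnx"` for ONNX files) in place of
# `wakeword_model_paths`. Speex noise suppression needs the optional speexdsp_ns package.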
print(f"Loading model with openWakeWord...")

oww_model = openwakeword.Model(
    wakeword_model_paths=[str(onnx_path)],
    enable_speex_noise_suppression=True,
    vad_threshold=0.5
)

print(f"✓ Model loaded successfully!")
print(f"  Model name: {model_name}")

In [None]:
# Test on positive examples
print("\nTesting on positive examples...\n")

for i, clip_path in enumerate(positive_clips[:5]):
    scores = oww_model.predict_clip(str(clip_path))
    max_score = max(s[model_name] for s in scores)
    print(f"{i+1}. {Path(clip_path).name}")
    print(f"   Max score: {max_score:.3f} {'✓ DETECTED' if max_score > 0.5 else '✗ MISSED'}")

In [None]:
# Test on negative examples
print("\nTesting on negative examples...\n")

for i, clip_path in enumerate(negative_clips[:5]):
    scores = oww_model.predict_clip(str(clip_path))
    max_score = max(s[model_name] for s in scores)
    print(f"{i+1}. {Path(clip_path).name}")
    print(f"   Max score: {max_score:.3f} {'✓ CORRECT' if max_score < 0.5 else '✗ FALSE POSITIVE'}")

In [None]:
# Visualize prediction on a sample clip
sample_clip = positive_clips[0]
scores = oww_model.predict_clip(str(sample_clip))
score_values = [s[model_name] for s in scores]

plt.figure(figsize=(12, 4))
plt.plot(score_values)
plt.axhline(y=0.5, color='r', linestyle='--', label='Threshold (0.5)')
plt.xlabel('Frame')
plt.ylabel('Detection Score')
plt.title(f'Detection Scores on Positive Example: {Path(sample_clip).name}')
plt.ylim(0, 1)
plt.legend()
plt.grid(True)
plt.show()

print(f"Max score: {max(score_values):.3f}")
print(f"Mean score: {np.mean(score_values):.3f}")

## Summary

Your custom wake word model is now trained and ready to use!

In [None]:
print("="*60)
print("TRAINING COMPLETE!")
print("="*60)
print(f"\nWake phrase: '{WAKE_PHRASE}'")
print(f"Model file: {onnx_path}")
print(f"Model size: {onnx_path.stat().st_size / 1024:.1f} KB")
print(f"\nTraining data:")
print(f"  Positive examples: {len(positive_clips)}")
print(f"  Negative examples: {len(negative_clips)}")
print(f"\nFinal metrics:")
print(f"  Loss: {history['epoch_loss'][-1]:.4f}")
print(f"  Recall: {history['epoch_recall'][-1]:.4f}")
print(f"\nNext steps:")
print(f"  1. Copy {onnx_path} to your models/ folder")
print(f"  2. Update simple_wakeword_test.py to use your model")
print(f"  3. Test with: python simple_wakeword_test.py")
print("="*60)