# Wake Word Detection - Threshold Sweep Analysis

This notebook performs a systematic sweep across different detection thresholds to analyze model performance on negative samples.

## 1. Setup - Mount Google Drive

In [11]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## 2. Install Dependencies

In [12]:
!pip install openwakeword numpy



## 3. Import Libraries

In [13]:
import csv
import wave
from pathlib import Path
from datetime import datetime
from typing import Optional, List, Dict

import numpy as np
from openwakeword.model import Model

## 4. Configuration Parameters

**Update these paths after uploading your files to Google Drive**

In [14]:
# ============================================================
# PATHS - UPDATE THESE AFTER UPLOADING TO GOOGLE DRIVE
# ============================================================

# Path to your negative wavs (either a zip file or a directory)
NEGATIVE_WAVS_PATH = "/content/drive/MyDrive/curve_sweep/neg_wavs.zip"  # UPDATE THIS

# Path to your model
MODEL_PATH = "/content/drive/MyDrive/curve_sweep/how_you_do_this.onnx"  # UPDATE THIS

# Output CSV path in Google Drive
OUTPUT_CSV_PATH = "/content/drive/My Drive/threshold_sweep_results.csv"  # UPDATE IF NEEDED

# ============================================================
# SWEEP PARAMETERS
# ============================================================

# Thresholds to test
THRESHOLDS = [0.05, 0.1, 0.15, 0.20, 0.25, 0.30, 0.35, 0.40, 0.45, 0.50,
              0.55, 0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9, 0.95]

# Detection parameters
RELEASE_RATIO = 0.9
HOP_MS = 40.0
CHUNK_SIZE = 1280
TARGET_RATE = 16000
CAPTURE_SECONDS = 0  # Set to 0 to disable saving triggered audio

# Working directory for extracted files (local to Colab instance)
WORK_DIR = "/content/negative_wavs"

print("Configuration loaded successfully!")
print(f"Testing {len(THRESHOLDS)} thresholds: {THRESHOLDS}")

Configuration loaded successfully!
Testing 19 thresholds: [0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.55, 0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9, 0.95]


## 5. Extract Negative WAV Files (if using ZIP)

In [15]:
import os

# Check if input is a zip file
if NEGATIVE_WAVS_PATH.endswith('.zip'):
    print(f"Extracting {NEGATIVE_WAVS_PATH}...")
    !unzip -q "{NEGATIVE_WAVS_PATH}" -d "{WORK_DIR}"
    print(f"Extraction complete! Files extracted to {WORK_DIR}")

    # Count extracted files
    wav_files = list(Path(WORK_DIR).rglob('*.wav'))
    print(f"Found {len(wav_files)} WAV files")

    AUDIO_DIR = WORK_DIR
else:
    # Assume it's already a directory
    AUDIO_DIR = NEGATIVE_WAVS_PATH
    wav_files = list(Path(AUDIO_DIR).rglob('*.wav'))
    print(f"Using directory: {AUDIO_DIR}")
    print(f"Found {len(wav_files)} WAV files")

Extracting /content/drive/MyDrive/curve_sweep/neg_wavs.zip...
replace /content/negative_wavs/8555-284449-0011.wav? [y]es, [n]o, [A]ll, [N]one, [r]ename: N
Extraction complete! Files extracted to /content/negative_wavs
Found 2414 WAV files


## 6. Helper Classes and Functions

In [16]:
class PeakPicker:
    """
    Simple peak-picking logic:
    - Trigger when score crosses threshold from below.
    - Arm next trigger when score falls below release_threshold.
    """
    def __init__(self, threshold: float, release_ratio: float = 0.9):
        self.threshold = threshold
        self.release_threshold = threshold * release_ratio
        self.armed = True

    def step(self, score: float) -> bool:
        if self.armed and score >= self.threshold:
            self.armed = False
            return True
        if not self.armed and score < self.release_threshold:
            self.armed = True
        return False


def read_wav_int16_mono(path: Path) -> tuple:
    """Read WAV file and return (samples_array, sample_rate)"""
    with wave.open(str(path), "rb") as wf:
        nchannels = wf.getnchannels()
        sampwidth = wf.getsampwidth()
        framerate = wf.getframerate()
        nframes = wf.getnframes()
        raw = wf.readframes(nframes)

    if sampwidth != 2:
        raise ValueError(f"Only 16-bit WAV supported, got {sampwidth*8}-bit")

    arr = np.frombuffer(raw, dtype=np.int16)
    if nchannels == 2:
        arr = arr.reshape(-1, 2).mean(axis=1).astype(np.int16)
    elif nchannels != 1:
        raise ValueError(f"Only mono/stereo supported, got {nchannels} channels")

    return arr, framerate


def to_16k_linear_int16(samples: np.ndarray, source_rate: int) -> np.ndarray:
    """Simple linear resampling to 16kHz"""
    if source_rate == TARGET_RATE:
        return samples
    ratio = TARGET_RATE / source_rate
    new_len = int(len(samples) * ratio)
    indices = np.linspace(0, len(samples) - 1, new_len)
    return np.interp(indices, np.arange(len(samples)), samples).astype(np.int16)


def extract_score(prediction, key: Optional[str]) -> Optional[float]:
    """Extract score from prediction"""
    if key is None:
        return None
    if isinstance(prediction, dict):
        val = prediction.get(key)
        if val is None:
            return None
        if isinstance(val, (list, np.ndarray)):
            return float(val[-1]) if len(val) > 0 else 0.0
        return float(val)
    return None


print("Helper functions loaded successfully!")

Helper functions loaded successfully!


## 7. Offline Detection Function

In [19]:
def run_offline_detection(audio_dir: Path, model_path: str, threshold: float,
                         hop_ms: float, chunk_size: int) -> Dict:
    """
    Run wake word detection on all WAV files in audio_dir with given threshold.
    Returns dictionary with metrics.
    """
    # Load model - fix: use correct parameter name
    owwModel = Model(
    wakeword_model_paths=[model_path] if model_path else None
)

    # Get model key
    prediction_key = None

    # Peak picker for this threshold
    picker = PeakPicker(threshold, RELEASE_RATIO)

    # Metrics
    total_triggers = 0
    total_seconds = 0.0
    global_max_score = 0.0
    file_count = 0

    # Find all WAV files
    files = sorted(audio_dir.rglob('*.wav'))

    hop_samples = int((hop_ms / 1000.0) * TARGET_RATE)

    for wav_file in files:
        try:
            arr, sr = read_wav_int16_mono(wav_file)
        except Exception as e:
            print(f"Warning: Could not read {wav_file}: {e}")
            continue

        if sr != TARGET_RATE:
            arr = to_16k_linear_int16(arr, sr)

        file_count += 1
        total_seconds += len(arr) / TARGET_RATE

        # Process in chunks with hopping
        for start_idx in range(0, len(arr), hop_samples):
            end_idx = min(start_idx + chunk_size, len(arr))
            chunk = arr[start_idx:end_idx]

            if len(chunk) < chunk_size:
                chunk = np.pad(chunk, (0, chunk_size - len(chunk)), mode='constant')

            try:
                prediction = owwModel.predict(chunk)
            except Exception as e:
                continue

            # Auto-detect key on first prediction
            if prediction_key is None and isinstance(prediction, dict):
                keys = list(prediction.keys())
                prediction_key = keys[0] if keys else None

            score = extract_score(prediction, prediction_key)
            if score is None:
                continue

            global_max_score = max(global_max_score, score)

            if picker.step(score):
                total_triggers += 1

    return {
        'threshold': threshold,
        'total_triggers': total_triggers,
        'total_files': file_count,
        'total_duration_seconds': total_seconds,
        'total_duration_hours': total_seconds / 3600.0,
        'global_max_score': global_max_score,
        'triggers_per_hour': total_triggers / (total_seconds / 3600.0) if total_seconds > 0 else 0,
        'release_threshold': threshold * RELEASE_RATIO
    }


print("Detection function loaded successfully!")

Detection function loaded successfully!


## 8. Run Threshold Sweep

This will test all thresholds and collect results. This may take several minutes depending on the number of files.

In [None]:
results = []

print(f"Starting threshold sweep across {len(THRESHOLDS)} thresholds...")
print(f"Processing {len(wav_files)} WAV files")
print("="*60)

for i, threshold in enumerate(THRESHOLDS):
    print(f"\n[{i+1}/{len(THRESHOLDS)}] Testing threshold: {threshold}")

    result = run_offline_detection(
        audio_dir=Path(AUDIO_DIR),
        model_path=MODEL_PATH,
        threshold=threshold,
        hop_ms=HOP_MS,
        chunk_size=CHUNK_SIZE
    )

    results.append(result)
    print(f"  â†’ Triggers: {result['total_triggers']}, Max score: {result['global_max_score']:.4f}")

print("\n" + "="*60)
print(f"Sweep complete! Tested {len(results)} thresholds.")

Starting threshold sweep across 19 thresholds...
Processing 2414 WAV files

[1/19] Testing threshold: 0.05




## 9. Save Results to CSV

In [None]:
# Define CSV headers
headers = [
    'threshold',
    'release_threshold',
    'total_triggers',
    'total_files',
    'total_duration_seconds',
    'total_duration_hours',
    'global_max_score',
    'triggers_per_hour'
]

# Write to CSV
with open(OUTPUT_CSV_PATH, 'w', newline='') as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames=headers)
    writer.writeheader()
    writer.writerows(results)

print(f"Results saved to: {OUTPUT_CSV_PATH}")
print(f"Total rows: {len(results)}")

## 10. Display Results Summary

In [None]:
import pandas as pd

# Load and display results
df = pd.read_csv(OUTPUT_CSV_PATH)
print("\nRESULTS SUMMARY")
print("="*80)
print(df.to_string(index=False))
print("="*80)

# Find optimal threshold (minimum false positives)
min_triggers_idx = df['total_triggers'].idxmin()
optimal_threshold = df.loc[min_triggers_idx, 'threshold']
min_triggers = df.loc[min_triggers_idx, 'total_triggers']

print(f"\nOptimal threshold (min false positives): {optimal_threshold}")
print(f"False positives at optimal: {min_triggers}")
print(f"\nCSV saved to Google Drive: {OUTPUT_CSV_PATH}")