In [1]:
# ü•î MicroWakeWord Trainer ‚Äî Tater Totterson Edition
# ==================================================
# Welcome, friend! üëã This notebook will help you train your very own wake word model.
# Think of it like teaching Tater Totterson to recognize when you say a special word.
#
# By the end, you'll have:
#   ‚úÖ A trained TensorFlow Lite model ready for on-device detection.
#   ‚úÖ A matching JSON manifest you can drop straight into ESPHome.
#
# This flow is optimized for Python 3.10 and NVIDIA GPUs (but should work elsewhere too).
# You can customize the wake word, play with training parameters, and experiment with
# different datasets until you get something that feels just right. üí™
#
# ‚ö° Quick Tips:
#   ‚Ä¢ Change TARGET_WORD below to whatever you want your wake word to be.
#   ‚Ä¢ Rerun the notebook from the top if you change it (to regenerate everything).
#   ‚Ä¢ Expect to experiment ‚Äî tweaking hyperparameters is part of the fun!
#
# When you‚Äôre done, you‚Äôll get two files:
#   1Ô∏è‚É£ <wakeword>.tflite ‚Äî your trained model.
#   2Ô∏è‚É£ <wakeword>.json ‚Äî a manifest for ESPHome integration.
#
# More info & examples:
# üîó https://github.com/TaterTotterson/microWakeWord-Trainer-Nvidia-Docker

# --- Set your wake word here ---
TARGET_WORD = "mila"  # üó£Ô∏è Change this to whatever phrase you want!
print(f"ü•î Tater Totterson is listening for: '{TARGET_WORD}'")

ü•î Tater Totterson is listening for: 'mila'


In [2]:
import platform
import sys
import os

# mac-only helper deps
if platform.system() == "Darwin":
    !"{sys.executable}" -m pip install 'git+https://github.com/puddly/pymicro-features@puddly/minimum-cpp-version' --root-user-action=ignore

!"{sys.executable}" -m pip install 'git+https://github.com/whatsnowplaying/audio-metadata@d4ebb238e6a401bb1a5aaaac60c9e2b3cb30929f' --root-user-action=ignore

# üëá use the actual location in the container
repo_path = "/data/microWakeWord"

if not os.path.exists(repo_path):
    print("‚¨áÔ∏è Cloning microWakeWord repository to /data‚Ä¶")
    !git clone https://github.com/TaterTotterson/micro-wake-word.git {repo_path}

# optional: pin to a commit
# !cd /data/microWakeWord && git checkout ac6502bf48b5e372c47ed509f5f5ca181e6d50bb

if os.path.exists(repo_path):
    print("üì¶ Installing microWakeWord...")
    !"{sys.executable}" -m pip install -e {repo_path} --root-user-action=ignore
else:
    print(f"‚ùå Repository not found at {repo_path}. Clone might have failed.")

Collecting git+https://github.com/whatsnowplaying/audio-metadata@d4ebb238e6a401bb1a5aaaac60c9e2b3cb30929f
  Cloning https://github.com/whatsnowplaying/audio-metadata (to revision d4ebb238e6a401bb1a5aaaac60c9e2b3cb30929f) to /tmp/pip-req-build-bwv5jk22
  Running command git clone --filter=blob:none --quiet https://github.com/whatsnowplaying/audio-metadata /tmp/pip-req-build-bwv5jk22
  Running command git rev-parse -q --verify 'sha^d4ebb238e6a401bb1a5aaaac60c9e2b3cb30929f'
  Running command git fetch -q https://github.com/whatsnowplaying/audio-metadata d4ebb238e6a401bb1a5aaaac60c9e2b3cb30929f
  Running command git checkout -q d4ebb238e6a401bb1a5aaaac60c9e2b3cb30929f
  Resolved https://github.com/whatsnowplaying/audio-metadata to commit d4ebb238e6a401bb1a5aaaac60c9e2b3cb30929f
done
done5h  Getting requirements to build wheel ... [?25l
done5h  Preparing metadata (pyproject.toml) ... [?25l
üì¶ Installing microWakeWord...
Obtaining file:///data/microWakeWord
doneuild dependencies ... [?2

In [3]:
# --- GPU Check (Torch + ONNX Runtime) ---

import torch
import onnxruntime as ort

print("üîß Torch CUDA Available:", torch.cuda.is_available())
if torch.cuda.is_available():
    print("  ‚Ä¢ Device count:", torch.cuda.device_count())
    print("  ‚Ä¢ Current device:", torch.cuda.current_device())
    print("  ‚Ä¢ Device name:", torch.cuda.get_device_name(torch.cuda.current_device()))
else:
    print("‚ö†Ô∏è  Torch cannot see a GPU ‚Äî check Docker runtime (--gpus all) and nvidia-container-toolkit")

print("\nüîß ONNX Runtime Providers:")
try:
    providers = ort.get_available_providers()
    print("  ‚Ä¢", providers)
    if "CUDAExecutionProvider" not in providers:
        print("‚ö†Ô∏è  CUDAExecutionProvider not available ‚Äî ONNX will fall back to CPU.")
except Exception as e:
    print("‚ö†Ô∏è  Could not query ONNX Runtime providers:", e)


üîß Torch CUDA Available: True
  ‚Ä¢ Device count: 1
  ‚Ä¢ Current device: 0
  ‚Ä¢ Device name: NVIDIA GeForce RTX 4080

üîß ONNX Runtime Providers:
  ‚Ä¢ ['TensorrtExecutionProvider', 'CUDAExecutionProvider', 'CPUExecutionProvider']


In [4]:
# Generate a large number of wake word samples for training (with length-scale sweep)
import sys, subprocess
from pathlib import Path

# Checkout of piper-sample-generator and the voice model used for synthesis.
# NOTE: both are created by the single-sample cell further down (safe_clone /
# ensure_model); on a fresh working directory run that cell once before this one.
REPO_DIR = Path.cwd() / "piper-sample-generator"
MODELS_DIR = REPO_DIR / "models"
# Voices tried earlier: "en_US-libritts_r-medium.pt", "ru_RU-denis-medium.onnx"
MODEL_NAME = "ru_RU-irina-medium.onnx"

# NOTE(review): 0 presumably makes the generator produce no clips — confirm
# against generate_samples.py and raise this for a real training run.
MAX_SAMPLES = 0
BATCH_SIZE = 100

# Piper "speed" control via piper-sample-generator is length_scale(s)
LENGTH_SCALES = ["0.85", "0.95", "1.00", "1.05", "1.15"]

generator_script = REPO_DIR / "generate_samples.py"
model_path = MODELS_DIR / MODEL_NAME

# Fail early with an actionable message instead of an opaque subprocess error
# when the checkout or the model has not been prepared yet.
for required in (generator_script, model_path):
    if not required.exists():
        raise FileNotFoundError(
            f"Missing {required}. Run the single-sample cell (clone + model download) first."
        )

cmd = [
    sys.executable,
    str(generator_script),
    TARGET_WORD,
    "--model", str(model_path),
    "--max-samples", str(MAX_SAMPLES),
    "--batch-size", str(BATCH_SIZE),
    "--output-dir", "generated_samples",
    "--length-scales", *LENGTH_SCALES,
]

print("‚Üí", " ".join(cmd))
subprocess.run(cmd, check=True)

‚Üí /usr/bin/python3 /data/piper-sample-generator/generate_samples.py mila --model /data/piper-sample-generator/models/ru_RU-irina-medium.onnx --max-samples 0 --batch-size 100 --output-dir generated_samples --length-scales 0.85 0.95 1.00 1.05 1.15


DEBUG:__main__:Loading ['/data/piper-sample-generator/models/ru_RU-irina-medium.onnx']
DEBUG:piper.voice:Guessing voice config path: /data/piper-sample-generator/models/ru_RU-irina-medium.onnx.json
DEBUG:piper.voice:Using CUDA
[0;93m2026-01-08 07:28:46.526729303 [W:onnxruntime:, transformer_memcpy.cc:111 ApplyImpl] 28 Memcpy nodes are added to the graph torch_jit for CUDAExecutionProvider. It might have negative impact on performance (including unable to run CUDA graph). Set session_options.log_severity_level=1 to see the detail logs before this message.[m
[0;93m2026-01-08 07:28:46.539973140 [W:onnxruntime:, session_state.cc:1316 VerifyEachNodeIsAssignedToAnEp] Some nodes were not assigned to the preferred execution providers which may or may not have an negative impact on performance. e.g. ORT explicitly assigns shape related ops to CPU to improve perf.[m
[0;93m2026-01-08 07:28:46.540009959 [W:onnxruntime:, session_state.cc:1318 VerifyEachNodeIsAssignedToAnEp] Rerunning with verb

CompletedProcess(args=['/usr/bin/python3', '/data/piper-sample-generator/generate_samples.py', 'mila', '--model', '/data/piper-sample-generator/models/ru_RU-irina-medium.onnx', '--max-samples', '0', '--batch-size', '100', '--output-dir', 'generated_samples', '--length-scales', '0.85', '0.95', '1.00', '1.05', '1.15'], returncode=0)

In [5]:
# NVIDIA Linux Docker: generate 1 sample of the target word (robust + CUDA check)

import os, sys, shutil, subprocess, time, platform
from pathlib import Path
from IPython.display import Audio, display

# Upstream repository that provides generate_samples.py.
REPO_URL = "https://github.com/rhasspy/piper-sample-generator"
# Local checkout (relative to the notebook's working directory) and its model folder.
REPO_DIR = Path.cwd() / "piper-sample-generator"
MODELS_DIR = REPO_DIR / "models"
# Only the last assignment takes effect; the first two are left-over alternatives.
MODEL_NAME = "en_US-libritts_r-medium.pt"
MODEL_NAME = "ru_RU-denis-medium.onnx" 
MODEL_NAME = "ru_RU-irina-medium.onnx"
# NOTE(review): this URL targets the piper-sample-generator v2.0.0 release assets;
# confirm that the selected voice (and any companion config it needs) is published there.
MODEL_URL  = f"https://github.com/rhasspy/piper-sample-generator/releases/download/v2.0.0/{MODEL_NAME}"
# Output folder for generated clips and the clip played back at the end of this cell.
AUDIO_OUT_DIR = Path.cwd() / "generated_samples"
AUDIO_PATH = AUDIO_OUT_DIR / "0.wav"

def run(cmd, check=True):
    """Run a command and echo its combined stdout/stderr line by line.

    Args:
        cmd: Argument vector (list of str) handed to ``subprocess.Popen``.
        check: When true, a non-zero exit status raises ``RuntimeError``.

    Returns:
        The exit code of the process.

    Raises:
        RuntimeError: If ``check`` is true and the command exits non-zero.
    """
    print("‚Üí", " ".join(cmd))
    # The context manager closes the stdout pipe and waits for the child even
    # when streaming is interrupted (for example by a kernel interrupt).
    with subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
    ) as proc:
        for line in proc.stdout:
            print(line, end="")
        rc = proc.wait()
    if check and rc != 0:
        raise RuntimeError(f"Command failed with exit code {rc}: {' '.join(cmd)}")
    return rc

def pip_install(*pkgs):
    """Upgrade pip (best effort), then install ``pkgs`` with the running interpreter."""
    pip_base = [sys.executable, "-m", "pip"]
    # A failed self-upgrade is tolerated (check=False); the package install
    # itself runs with the default check and raises on failure.
    run(pip_base + ["install", "--upgrade", "pip"], check=False)
    run(pip_base + ["install"] + list(pkgs))

def safe_clone(repo_url, branch=None, dest=REPO_DIR, retries=2):
    """Shallow-clone ``repo_url`` into ``dest`` unless a checkout already exists.

    Args:
        repo_url: Git URL to clone.
        branch: Optional branch or tag passed to ``git clone --branch``.
        dest: Target directory (``pathlib.Path``).
        retries: Number of additional attempts after the first failure.

    Raises:
        Exception: Whatever ``run`` raised on the final failed attempt.
    """
    # A directory without .git is the remnant of an interrupted clone.
    if dest.exists() and not (dest / ".git").exists():
        print("‚ö†Ô∏è  Found partial clone. Removing‚Ä¶")
        shutil.rmtree(dest, ignore_errors=True)
    if dest.exists():
        return

    cmd = ["git", "clone", "--depth", "1"]
    if branch:
        cmd += ["--branch", branch]
    cmd += [repo_url, str(dest)]

    for attempt in range(retries + 1):
        try:
            run(cmd)
            break
        except Exception as e:
            # Drop whatever the failed attempt left behind so that the retry
            # (and any later run) starts from a clean slate instead of
            # tripping over a half-populated directory.
            shutil.rmtree(dest, ignore_errors=True)
            if attempt == retries:
                raise
            print(f"Clone failed ({attempt+1}/{retries+1}). Retrying in 2s‚Ä¶ [{e}]")
            time.sleep(2)

def ensure_model():
    """Make sure ``MODELS_DIR / MODEL_NAME`` exists, downloading it from ``MODEL_URL`` if needed.

    The download goes to a ``.part`` file that is only renamed to the final
    name after the size check passes, so an interrupted or undersized download
    can never be mistaken for a usable model on the next run.

    Raises:
        RuntimeError: If the downloaded file is smaller than 100 KiB.
    """
    import urllib.request

    min_bytes = 100 * 1024
    MODELS_DIR.mkdir(parents=True, exist_ok=True)
    mp = MODELS_DIR / MODEL_NAME
    # Files below the plausibility threshold are treated like missing ones;
    # they can only be leftovers of an earlier failed download.
    if not mp.exists() or mp.stat().st_size < min_bytes:
        print(f"Downloading model to {mp} ‚Ä¶")
        part = mp.with_name(mp.name + ".part")
        try:
            with urllib.request.urlopen(MODEL_URL) as r, open(part, "wb") as f:
                shutil.copyfileobj(r, f)
            if part.stat().st_size < min_bytes:
                raise RuntimeError("Downloaded model looks too small; download may have failed.")
            part.replace(mp)
        finally:
            # No-op after a successful rename; removes the partial file otherwise.
            part.unlink(missing_ok=True)
    print(f"‚úÖ Model ready: {mp}")

# 1) Clone main repo (Linux/NVIDIA)
print("Linux/NVIDIA detected ‚Äî using main piper-sample-generator repo.")
safe_clone(REPO_URL)

# 2) Install deps
#   - piper-tts provides the `piper` module (required by generate_samples.py)
#   - piper-phonemize-cross does the phonemization
#   - onnxruntime-gpu enables CUDA (container must have NVIDIA runtime)
deps = [
    "piper-tts>=1.2.0",
    "piper-phonemize-cross==1.2.1",
    "soundfile",
    "numpy",
    "onnxruntime-gpu>=1.16.0",
]
pip_install(*deps)

# 3) Verify CUDA provider is available
try:
    import onnxruntime as ort
    providers = ort.get_available_providers()
    print(f"ONNX Runtime providers: {providers}")
    if "CUDAExecutionProvider" not in providers:
        print("‚ö†Ô∏è CUDAExecutionProvider not available. "
              "The sample will still run on CPU, but check your NVIDIA container setup "
              "(nvidia-container-toolkit, runtime, and driver).")
except Exception as e:
    print("‚ö†Ô∏è Could not import onnxruntime to verify providers:", e)

# 4) Ensure model present
ensure_model()

# 5) Generate one sample
AUDIO_OUT_DIR.mkdir(parents=True, exist_ok=True)
gen_script = REPO_DIR / "generate_samples.py"
if not gen_script.exists():
    raise FileNotFoundError(f"Missing generator: {gen_script}")

cmd = [
    sys.executable, str(gen_script),
    TARGET_WORD,
    "--model", str(MODELS_DIR / MODEL_NAME),  # ‚Üê pass the generator .pt explicitly
    "--max-samples", "1",
    "--batch-size", "1",
    "--output-dir", str(AUDIO_OUT_DIR),
]
run(cmd)

# 6) Play the audio (if the notebook frontend supports it)
if AUDIO_PATH.exists():
    print(f"üéß Playing {AUDIO_PATH}")
    display(Audio(str(AUDIO_PATH), autoplay=True))
else:
    print(f"Audio file not found at {AUDIO_PATH}")

Linux/NVIDIA detected ‚Äî using main piper-sample-generator repo.
‚Üí /usr/bin/python3 -m pip install --upgrade pip
‚Üí /usr/bin/python3 -m pip install piper-tts>=1.2.0 piper-phonemize-cross==1.2.1 soundfile numpy onnxruntime-gpu>=1.16.0
ONNX Runtime providers: ['TensorrtExecutionProvider', 'CUDAExecutionProvider', 'CPUExecutionProvider']
‚úÖ Model ready: /data/piper-sample-generator/models/ru_RU-irina-medium.onnx
‚Üí /usr/bin/python3 /data/piper-sample-generator/generate_samples.py mila --model /data/piper-sample-generator/models/ru_RU-irina-medium.onnx --max-samples 1 --batch-size 1 --output-dir /data/generated_samples
DEBUG:__main__:Loading ['/data/piper-sample-generator/models/ru_RU-irina-medium.onnx']
DEBUG:piper.voice:Guessing voice config path: /data/piper-sample-generator/models/ru_RU-irina-medium.onnx.json
DEBUG:piper.voice:Using CUDA
[0;93m2026-01-08 07:28:53.463211282 [W:onnxruntime:, transformer_memcpy.cc:111 ApplyImpl] 28 Memcpy nodes are added to the graph torch_jit for 

In [6]:
# NVIDIA/Linux dataset prep to match the Apple behavior, but with pinned AudioSet
# MIT RIR -> resample to 16 kHz
# AudioSet -> fetch from a working HF revision, convert to 16 kHz mono, skip bad
# FMA -> resample to 16 kHz mono

import os, sys, subprocess, scipy.io.wavfile, numpy as np
from pathlib import Path
from tqdm import tqdm
import soundfile as sf
import librosa
from datasets import load_dataset

# -------------------------------------------------
# small shell helpers (for curl/tar probing)
# -------------------------------------------------
def sh(cmd: str) -> int:
    """Execute ``cmd`` through the system shell and return its exit status."""
    exit_status = subprocess.call(cmd, shell=True)
    return exit_status

def curl(url: str, out: Path) -> int:
    """Download ``url`` to ``out`` with the ``curl`` binary and return its exit status.

    The command is passed as an argument list rather than a shell string, so
    quotes or other shell metacharacters in ``url`` or ``out`` cannot break
    (or inject into) the command line.

    Returns:
        curl's exit status; 127 when the ``curl`` executable cannot be found
        (the status a shell reports for a missing command).
    """
    # -L follow redirects, -s silent, --fail => non-zero exit on HTTP errors (e.g. 404)
    argv = ["curl", "-L", "-s", "--fail", url, "-o", str(out)]
    try:
        return subprocess.call(argv)
    except FileNotFoundError:
        return 127

def write_wav(dst: Path, data: np.ndarray, sr: int):
    """Write ``data`` to ``dst`` as an int16 WAV file with sample rate ``sr``.

    Samples are clamped to [-1.0, 1.0] and scaled by 32767 before the int16 cast.
    """
    clamped = np.clip(data, -1.0, 1.0)
    pcm16 = (clamped * 32767).astype(np.int16)
    scipy.io.wavfile.write(dst, sr, pcm16)

# -----------------------------
# MIT RIR (resample to 16 kHz)
# -----------------------------
print("=== MIT RIR ===")
rir_out = Path("mit_rirs")
rir_out.mkdir(exist_ok=True)
if not any(rir_out.rglob("*.wav")):
    ok = 0
    failed = 0
    first_error = None
    try:
        # Avoid datasets.Audio to keep TorchCodec out:
        # Use streaming=True + manual decode with librosa
        print("‚¨áÔ∏è MIT RIR (streaming + manual decode)‚Ä¶")
        ds = load_dataset(
            "davidscripka/MIT_environmental_impulse_responses",
            split="train",
            streaming=True
        )
        for i, row in enumerate(tqdm(ds)):
            try:
                audio_path = row["audio"]["path"]
                y, sr = librosa.load(audio_path, sr=16000, mono=True)
                write_wav(rir_out / f"rir_{i:04d}.wav", y, 16000)
                ok += 1
            except Exception as row_err:
                # Keep going on single bad rows, but remember the first error
                # so a systematic failure is not hidden.
                failed += 1
                if first_error is None:
                    first_error = row_err
        if ok == 0:
            # Nothing decoded at all: treat as a failed download so the ZIP
            # fallback below gets a chance instead of reporting success.
            raise RuntimeError(
                f"no impulse responses decoded ({failed} rows failed; first error: {first_error!r})"
            )
        print(f"‚úÖ MIT RIR saved: {ok} files")
        if failed:
            print(f"   {failed} rows skipped; first error: {first_error!r}")
    except Exception as e:
        print(f"‚ö†Ô∏è MIT RIR download failed: {e}")
        # Fallback ZIP route
        try:
            print("‚¨áÔ∏è MIT RIR (fallback ZIP)‚Ä¶")
            zip_url = "https://mcdermottlab.mit.edu/Reverb/IRMAudio/Audio.zip"
            zip_path = rir_out.parent / "MIT_RIR_Audio.zip"
            if not zip_path.exists():
                if os.system(f"wget -q -O '{zip_path}' '{zip_url}'") != 0:
                    # wget -O creates the file even on failure; remove it so a
                    # later run retries the download.
                    zip_path.unlink(missing_ok=True)
                    raise RuntimeError(f"wget failed for {zip_url}")
            os.system(f'unzip -q -o "{zip_path}" -d "{rir_out}"')
            rir_wavs = list(rir_out.rglob("*.wav"))
            if not rir_wavs:
                raise RuntimeError(f"no WAV files found after extracting {zip_path}")
            # Normalize to 16k mono
            for p in tqdm(rir_wavs, desc="Normalize MIT RIR"):
                a, sr = sf.read(p, always_2d=False)
                if a.ndim > 1:
                    a = a[:, 0]
                if sr != 16000:
                    a, _ = librosa.load(p, sr=16000, mono=True)
                write_wav(p, a, 16000)
            print("‚úÖ MIT RIR fallback complete")
        except Exception as e2:
            print(f"‚ùå MIT RIR fallback failed: {e2}")
else:
    print("‚úÖ mit_rirs exists; skipping.")

# ============================================================
# AudioSet (pinned FLAC .tar ‚Üí 16k mono, skip bad files)
# ============================================================
print("\n=== AudioSet subset (pinned FLAC .tar ‚Üí 16k mono) ===")
audioset_dir = Path("audioset"); audioset_dir.mkdir(exist_ok=True)
audioset_out = Path("audioset_16k"); audioset_out.mkdir(exist_ok=True)

if any(audioset_out.rglob("*.wav")):
    print("‚úÖ audioset_16k exists; skipping.")
else:
    # commits / refs we know about ‚Äî we‚Äôll probe them
    REV_CANDIDATES = [
        "6762f044d1c88619c7f2006486036192128fb07e",
        "0049167e89f259a010c3f070fe3666d9e5242836",
        "ceb9eaaa7844c9ad7351e659c84a572e376ad06d",
        "main",  # last resort
    ]
    # possible folder layouts
    TAR_PATTERNS = [
        "data/bal_train0{idx}.tar",
        "data/bal_train/bal_train0{idx}.tar",
    ]

    def find_working_rev():
        """Probe the candidate revisions and tar layouts on Hugging Face.

        Returns the first ``(revision, pattern)`` pair for which a HEAD request
        on tarball index 0 succeeds, or ``(None, None)`` when none does.
        Revisions are tried in listed order; for each one every layout is tried.
        """
        combos = ((r, layout) for r in REV_CANDIDATES for layout in TAR_PATTERNS)
        for candidate_rev, layout in combos:
            probe = f"https://huggingface.co/datasets/agkphysics/AudioSet/resolve/{candidate_rev}/{layout.format(idx=0)}"
            # curl -I issues a HEAD request; --fail turns HTTP errors into a non-zero status.
            if sh(f"curl -I -L --fail -s '{probe}' > /dev/null") == 0:
                return candidate_rev, layout
        return None, None

    rev, pattern = find_working_rev()
    if rev is None:
        raise RuntimeError("Could not locate an AudioSet revision with FLAC tarballs still present on HF.")

    print(f"üìå Using AudioSet revision: {rev}")
    print(f"üóÇÔ∏è Tar layout pattern: {pattern}")

    # download + extract bal_train00..09
    for i in range(10):
        rel = pattern.format(idx=i)
        url = f"https://huggingface.co/datasets/agkphysics/AudioSet/resolve/{rev}/{rel}"
        fname = rel.split("/")[-1]
        out_tar = audioset_dir / fname

        # Download the tarball unless a previous run already fetched it.
        if not out_tar.exists():
            print(f"‚¨áÔ∏è {fname}")
            rc = curl(url, out_tar)
            if rc != 0:
                print(f"‚ö†Ô∏è Could not fetch {fname} at rev {rev}; continuing.")
                # Do not leave a partial file behind: the exists() check above
                # would treat it as a finished download on the next run.
                out_tar.unlink(missing_ok=True)
                continue
        else:
            # Only the download is skipped. Extraction below still runs, because
            # this branch is only reached while audioset_16k holds no WAVs, i.e.
            # an earlier run may have stopped before or during extraction.
            print(f"‚úÖ {fname} —É–∂–µ —Å–∫–∞—á–∞–Ω, –ø—Ä–æ–ø—É—Å–∫–∞–µ–º –∑–∞–≥—Ä—É–∑–∫—É")

        # Unpack the tarball into the AudioSet working directory.
        print(f"üì¶ Extract {fname}")
        rc = sh(f"tar -xf '{out_tar}' -C '{audioset_dir}'")
        if rc != 0:
            print(f"‚ö†Ô∏è tar extract failed for {fname}; continuing.")


    # convert FLAC ‚Üí 16k mono WAV
    flacs = list(audioset_dir.rglob("*.flac"))
    print(f"üîé FLAC files: {len(flacs)}")
    audioset_bad = []
    ok = 0
    for p in tqdm(flacs, desc="AudioSet‚ÜíWAV (resample 16k mono)"):
        try:
            y, _ = librosa.load(p, sr=16000, mono=True)
            if y.size == 0:
                raise ValueError("empty audio")
            write_wav(audioset_out / (p.stem + ".wav"), y, 16000)
            ok += 1
        except Exception as e:
            audioset_bad.append(f"{p}:{e}")

    if audioset_bad:
        (audioset_out / "audioset_corrupted_files.log").write_text("\n".join(audioset_bad))
    print(f"‚úÖ AudioSet complete ({ok} ok, {len(audioset_bad)} failed)")

# -----------------------------
# FMA xsmall (resample to 16 kHz mono)
# -----------------------------
print("\n=== FMA xsmall ===")
fma_zip_dir = Path("fma"); fma_zip_dir.mkdir(exist_ok=True)
fma_out = Path("fma_16k"); fma_out.mkdir(exist_ok=True)

zipname = "fma_xs.zip"
zipurl  = f"https://huggingface.co/datasets/mchl914/fma_xsmall/resolve/main/{zipname}"
zipout  = fma_zip_dir / zipname
fma_src = Path("fma/fma_small")

# (Re)download when the archive is missing or empty. `wget -O` creates the
# output file even when the transfer fails, so an empty file means "retry".
if not zipout.exists() or zipout.stat().st_size == 0:
    if os.system(f"wget -q -O '{zipout}' '{zipurl}'") != 0:
        zipout.unlink(missing_ok=True)
        print(f"WARNING: could not download {zipurl}")

# Extract whenever the archive is present but no MP3 has been unpacked yet, so
# an interrupted earlier run is repaired instead of being skipped forever.
if zipout.exists() and not any(fma_src.rglob("*.mp3")):
    os.system(f"cd fma && unzip -q -o '{zipname}'")

mp3s = list(fma_src.rglob("*.mp3"))
print(f"üéµ FMA mp3 count: {len(mp3s)}")
corrupt = []
for p in tqdm(mp3s, desc="FMA‚Üí16k WAV"):
    dst = fma_out / (p.stem + ".wav")
    # Skip clips converted by a previous run (44 bytes is a bare WAV header),
    # so re-running the cell does not decode every MP3 again.
    if dst.exists() and dst.stat().st_size > 44:
        continue
    try:
        y, sr = librosa.load(p, sr=16000, mono=True)
        if y.size == 0:
            raise ValueError("empty audio")
        write_wav(dst, y, 16000)
    except Exception as e:
        corrupt.append(f"{p}:{e}")
if corrupt:
    Path("fma_corrupted_files.log").write_text("\n".join(corrupt))

print("\n‚úÖ Dataset prep complete!")

=== MIT RIR ===
‚úÖ mit_rirs exists; skipping.

=== AudioSet subset (pinned FLAC .tar ‚Üí 16k mono) ===
‚úÖ audioset_16k exists; skipping.

=== FMA xsmall ===
üéµ FMA mp3 count: 210


FMA‚Üí16k WAV: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 210/210 [01:42<00:00,  2.04it/s]


‚úÖ Dataset prep complete!





In [7]:
# Sets up the augmentations.
# To improve your model, experiment with these settings and use more sources of
# background clips.
import sys, os
from pathlib import Path

# try the common places we‚Äôve used
candidates = [
    "/data/microWakeWord",        # what the last install log showed
    "/data/microwakeword",        # lowercase variant
    "./microwakeword",            # local clone
    "./microWakeWord",            # camel case
]

for base in candidates:
    if os.path.isdir(base):
        # add the repo root
        sys.path.insert(0, base)
        # add the actual package dir inside the repo
        if os.path.isdir(os.path.join(base, "microwakeword")):
            sys.path.insert(0, os.path.join(base, "microwakeword"))
        break
from microwakeword.audio.augmentation import Augmentation
from microwakeword.audio.clips import Clips
from microwakeword.audio.spectrograms import SpectrogramGeneration

def validate_directories(paths):
    """Return True if every entry of ``paths`` exists.

    On the first missing entry an error line is printed and False is returned;
    later entries are not checked.
    """
    missing = next((path for path in paths if not os.path.exists(path)), None)
    if missing is None:
        return True
    print(f"Error: Directory {missing} does not exist. Please ensure preprocessing is complete.")
    return False

# Paths to augmented data
impulse_paths = ['mit_rirs']
background_paths = ['fma_16k', 'audioset_16k']

if not validate_directories(impulse_paths + background_paths):
    raise ValueError("One or more required directories are missing.")

# Process TTS generated samples (default)
clips_tts = Clips(
    input_directory='./generated_samples',
    file_pattern='*.wav',
    max_clip_duration_s=5,
    remove_silence=True,
    random_split_seed=10,
    split_count=0.1,
)

# Process personal recordings if available (optional)
clips_personal = None
if os.path.exists("./personal_samples") and any(Path("./personal_samples").glob("*.wav")):
    clips_personal = Clips(
        input_directory="./personal_samples",
        file_pattern="*.wav",
        max_clip_duration_s=5,
        remove_silence=True,
        random_split_seed=10,
        split_count=0.1,
    )
    print("‚úÖ Found personal samples, will create separate feature set")

augmenter = Augmentation(
    augmentation_duration_s=3.2,
    augmentation_probabilities={
        "SevenBandParametricEQ": 0.1,
        "TanhDistortion": 0.05,
        "PitchShift": 0.15,
        "BandStopFilter": 0.1,
        "AddColorNoise": 0.1,
        "AddBackgroundNoise": 0.7,
        "Gain": 0.8,
        "RIR": 0.7,
    },
    impulse_paths=impulse_paths,
    background_paths=background_paths,
    background_min_snr_db=5,
    background_max_snr_db=10,
    min_jitter_s=0.2,
    max_jitter_s=0.3,
)


2026-01-08 07:30:39.683729: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2026-01-08 07:30:39.926010: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1767857440.016898      49 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1767857440.044443      49 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2026-01-08 07:30:40.280472: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instr

In [8]:
# Augment a random generated-sample WAV and play it back (pass ndarray to augmenter)
from pathlib import Path
from IPython.display import Audio, display
import numpy as np
import soundfile as sf
import librosa, random, glob

output_dir = Path("./augmented_clips")
output_dir.mkdir(exist_ok=True)

# 1) Pick a random WAV from the Piper outputs
candidates = glob.glob("generated_samples/*.wav")
if not candidates:
    raise SystemExit("No files in generated_samples/. Run the TTS sample cell first.")
src_path = random.choice(candidates)

# 2) Load as 16 kHz mono float32
y, sr = librosa.load(src_path, sr=16000, mono=True)
y = y.astype(np.float32, copy=False)

# 3) Augment ‚Äî microwakeword Augmentation expects a 1-D numpy array
try:
    y_aug = augmenter.augment_clip(y)
except Exception as e:
    # some versions accept (samples, sr) ‚Äî try that as a fallback
    try:
        y_aug = augmenter.augment_clip((y, sr))
    except Exception:
        raise

# 4) Save and play
out_path = output_dir / "augmented_clip.wav"
sf.write(str(out_path), y_aug.astype(np.float32, copy=False), sr, subtype="PCM_16")
print(f"Augmented clip saved to {out_path}")
display(Audio(str(out_path), autoplay=True))

Augmented clip saved to augmented_clips/augmented_clip.wav


In [None]:
# Augment samples and save the training, validation, and testing sets.
# This version avoids datasets.Audio entirely by driving Clips from local WAVs.

import os, glob, random
from pathlib import Path
import types
import numpy as np
import librosa
from mmap_ninja.ragged import RaggedMmap
from microwakeword.audio.spectrograms import SpectrogramGeneration
# Reload the augmentation module to get the fix
import importlib
import sys
if 'microwakeword.audio.augmentation' in sys.modules:
    del sys.modules['microwakeword.audio.augmentation']
from microwakeword.audio.augmentation import Augmentation


# ---- Patch: drive clips from generated_samples/*.wav (no datasets.Audio, no torchcodec) ----
def audio_generator_from_wavs(self, split="train", repeat=1, source_dir="generated_samples"):
    """
    Generator over the clips of one split, read from ``source_dir``/*.wav.

    The sorted file list is shuffled with a fixed seed (10) and cut into
    train / validation / test, with validation and test each taking 10% of the
    files (at least one file each) and train taking the rest. Every clip is
    loaded with librosa at 16 kHz mono and yielded as a 1-D float32 array; the
    selected split is replayed ``repeat`` times (at least once).

    Raises SystemExit when ``source_dir`` holds no WAV files. An unknown
    ``split`` name or an empty split yields nothing.
    """
    wav_paths = sorted(glob.glob(f"{source_dir}/*.wav"))
    if len(wav_paths) == 0:
        raise SystemExit(f"‚ùå No WAVs in {source_dir}/. Generate samples first.")

    # Fixed seed keeps the split assignment stable between runs.
    shuffled = list(wav_paths)
    random.Random(10).shuffle(shuffled)

    total = len(shuffled)
    val_count = max(1, int(0.10 * total))
    test_count = max(1, int(0.10 * total))
    train_count = max(0, total - val_count - test_count)
    val_end = train_count + val_count

    bounds_by_split = {
        "train":      (0, train_count),
        "validation": (train_count, val_end),
        "test":       (val_end, total),
    }
    bounds = bounds_by_split.get(split)
    if bounds is None:
        return  # unknown split name
    selected = shuffled[bounds[0]:bounds[1]]
    if not selected:
        return  # nothing to yield

    passes = max(1, int(repeat))
    for _pass in range(passes):
        for wav_path in selected:
            samples, _rate = librosa.load(wav_path, sr=16000, mono=True)
            yield samples.astype(np.float32, copy=False)

# Bind the patched generator to clips_tts instance
def audio_generator_tts(self, split="train", repeat=1):
    return audio_generator_from_wavs(self, split, repeat, "generated_samples")

clips_tts.audio_generator = types.MethodType(audio_generator_tts, clips_tts)
print("‚úÖ Patched clips_tts.audio_generator to stream from generated_samples/*.wav (no torchcodec).")

# Bind the patched generator to clips_personal if it exists
if clips_personal is not None:
    def audio_generator_personal(self, split="train", repeat=1):
        return audio_generator_from_wavs(self, split, repeat, "personal_samples")
    clips_personal.audio_generator = types.MethodType(audio_generator_personal, clips_personal)
    print("‚úÖ Patched clips_personal.audio_generator to stream from personal_samples/*.wav (no torchcodec).")

# ---- Validate augmentation asset folders exist ----
def validate(paths):
    """Abort with SystemExit on the first entry of ``paths`` that does not exist."""
    first_missing = next((p for p in paths if not Path(p).exists()), None)
    if first_missing is not None:
        raise SystemExit(f"‚ùå Missing directory: {first_missing}. Run dataset prep first.")

impulse_paths = ["mit_rirs"]
background_paths = ["fma_16k", "audioset_16k"]
validate(impulse_paths + background_paths)

# ---- Output root ----
out_root = Path("generated_augmented_features")
out_root.mkdir(exist_ok=True)

# ---- Split config (same as before) ----
split_cfg = {
    "training":   {"name": "train",      "repetition": 2, "slide_frames": 10},
    "validation": {"name": "validation", "repetition": 1, "slide_frames": 10},
    "testing":    {"name": "test",       "repetition": 1, "slide_frames": 1},
}

# ---- Generate features for TTS samples ----
for split, cfg in split_cfg.items():
    out_dir = out_root / split
    out_dir.mkdir(parents=True, exist_ok=True)
    print(f"üß™ Processing {split} (TTS) ‚Ä¶")

    spectros = SpectrogramGeneration(
        clips=clips_tts,             # now backed by our WAV loader
        augmenter=augmenter,         # your existing augmenter
        slide_frames=cfg["slide_frames"],
        step_ms=10,
    )

    RaggedMmap.from_generator(
        out_dir=str(out_dir / "wakeword_mmap"),
        sample_generator=spectros.spectrogram_generator(
            split=cfg["name"], repeat=cfg["repetition"]
        ),
        batch_size=100,
        verbose=True,
    )

# ---- Generate features for personal samples if available ----
if clips_personal is not None:
    out_root_personal = Path("personal_augmented_features")
    out_root_personal.mkdir(exist_ok=True)
    for split, cfg in split_cfg.items():
        out_dir = out_root_personal / split
        out_dir.mkdir(parents=True, exist_ok=True)
        print(f"üß™ Processing {split} (personal) ‚Ä¶")
        spectros = SpectrogramGeneration(
            clips=clips_personal,
            augmenter=augmenter,
            slide_frames=cfg["slide_frames"],
            step_ms=10,
        )
        RaggedMmap.from_generator(
            out_dir=str(out_dir / "wakeword_mmap"),
            sample_generator=spectros.spectrogram_generator(split=cfg["name"], repeat=cfg["repetition"]),
            batch_size=100,
            verbose=True,
        )

print("‚úÖ Features ready (generated_augmented_features/*/wakeword_mmap)")

‚úÖ Patched clips_tts.audio_generator to stream from generated_samples/*.wav (no torchcodec).
üß™ Processing training (TTS) ‚Ä¶


0it [00:00, ?it/s]

In [None]:
# Downloads pre-generated spectrogram features (made for microWakeWord in
# particular) for various negative datasets. This can be slow!

import os
import requests
import zipfile
from pathlib import Path
from tqdm import tqdm

# Function to download a file with progress bar
def download_file(url, output_path):
    """Stream ``url`` to ``output_path`` while showing a tqdm progress bar.

    The body is written to ``<output_path>.part`` and renamed only after the
    transfer finished, so an interrupted download never leaves a truncated
    file under the final name (callers skip the download when that file exists).

    Args:
        url: HTTP(S) URL to fetch.
        output_path: ``pathlib.Path`` of the destination file.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.RequestException: On connection problems or timeouts.
    """
    part_path = output_path.with_name(output_path.name + ".part")
    try:
        # timeout=(connect, read) keeps a stalled server from hanging the cell.
        with requests.get(url, stream=True, timeout=(10, 60)) as response:
            # Without this an HTML error page would be saved as the archive.
            response.raise_for_status()
            total_size = int(response.headers.get('content-length', 0))
            with open(part_path, "wb") as f, tqdm(
                desc=f"Downloading {output_path.name}",
                total=total_size,
                unit="B",
                unit_scale=True,
                unit_divisor=1024,
            ) as bar:
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    if chunk:  # skip keep-alive chunks
                        f.write(chunk)
                        bar.update(len(chunk))
        part_path.replace(output_path)
    finally:
        # No-op after a successful rename; removes the partial file otherwise.
        part_path.unlink(missing_ok=True)
    print(f"Downloaded: {output_path}")

# Function to extract ZIP files
def extract_zip(zip_path, extract_to):
    """Unpack every member of the archive ``zip_path`` into ``extract_to`` and report it."""
    archive = zipfile.ZipFile(zip_path, 'r')
    try:
        archive.extractall(extract_to)
    finally:
        archive.close()
    print(f"Extracted: {zip_path} to {extract_to}")

# Directory for negative datasets
output_dir = Path('./negative_datasets')
output_dir.mkdir(exist_ok=True)

# Negative dataset URLs
link_root = "https://huggingface.co/datasets/kahrendt/microwakeword/resolve/main/"
filenames = ['dinner_party.zip', 'dinner_party_eval.zip', 'no_speech.zip', 'speech.zip']

# Download and extract files
for fname in filenames:
    link = link_root + fname
    zip_path = output_dir / fname

    # Check if extracted directory already exists
    extracted_dir = output_dir / fname.replace('.zip', '')
    if extracted_dir.exists() and any(extracted_dir.iterdir()):
        print(f"Directory {extracted_dir} already contains files. Skipping {fname}.")
        continue
        
    # Download only if the file doesn't already exist
    if not zip_path.exists():
        try:
            download_file(link, zip_path)
        except Exception as e:
            print(f"Error downloading {fname}: {e}")
            continue

    # Extract the ZIP file
    try:
        extract_zip(zip_path, output_dir)
    except Exception as e:
        print(f"Error extracting {fname}: {e}")


In [None]:
# --- Save a yaml config that controls the training process ---

import os, sys, yaml
from pathlib import Path

# All training settings live in one dict that is serialised to YAML below.
config = {
    "window_step_ms": 10,
    "train_dir": "trained_models/wakeword",

    # Feature sets: the generated wake word samples first (truth=True),
    # followed by the negative datasets (truth=False).
    "features": [
        {
            "features_dir": "generated_augmented_features",
            "sampling_weight": 2.0,
            "penalty_weight": 1.0,
            "truth": True,
            "truncation_strategy": "truncate_start",
            "type": "mmap",
        },
        {
            "features_dir": "negative_datasets/speech",
            "sampling_weight": 12.0,
            "penalty_weight": 1.0,
            "truth": False,
            "truncation_strategy": "random",
            "type": "mmap",
        },
        {
            "features_dir": "negative_datasets/dinner_party",
            "sampling_weight": 12.0,
            "penalty_weight": 1.0,
            "truth": False,
            "truncation_strategy": "random",
            "type": "mmap",
        },
        {
            "features_dir": "negative_datasets/no_speech",
            "sampling_weight": 5.0,
            "penalty_weight": 1.0,
            "truth": False,
            "truncation_strategy": "random",
            "type": "mmap",
        },
        {
            "features_dir": "negative_datasets/dinner_party_eval",
            "sampling_weight": 0.0,
            "penalty_weight": 1.0,
            "truth": False,
            "truncation_strategy": "split",
            "type": "mmap",
        },
    ],

    "training_steps": [40000],
    "positive_class_weight": [1],
    "negative_class_weight": [20],
    "learning_rates": [0.001],

    # Kept small to avoid GPU copy/alloc failures on a 3070 laptop's VRAM
    "batch_size": 16,

    # SpecAugment stays disabled: every mask size and count is zero
    "time_mask_max_size": [0],
    "time_mask_count": [0],
    "freq_mask_max_size": [0],
    "freq_mask_count": [0],

    "eval_step_interval": 500,
    "clip_duration_ms": 1500,
    "target_minimization": 0.9,
    "minimization_metric": None,
    "maximization_metric": "average_viable_recall",
}

# Personal recordings are optional; when present they are slotted in right
# after the generated samples (index 1) with a higher sampling weight.
if os.path.exists("personal_augmented_features/training"):
    config["features"][1:1] = [
        {
            "features_dir": "personal_augmented_features",
            "sampling_weight": 3.0,
            "penalty_weight": 1.0,
            "truth": True,
            "truncation_strategy": "truncate_start",
            "type": "mmap",
        }
    ]
    print("‚úÖ Added personal features with higher weight (3.0)")

# Persist the settings where the training cell expects to find them.
with open("training_parameters.yaml", "w") as yaml_file:
    yaml.dump(config, yaml_file)

print("‚úÖ Wrote training_parameters.yaml (batch_size=16)")

In [None]:
# Train + export with GPU first, then automatic CPU fallback on GPU/VRAM errors
# (LIVE streaming output + full log capture for error detection)
# NOTE: Suppress ONLY the noisy "Validation Batch #..." lines (everything else still streams)
import os, sys, subprocess, textwrap

# ---- Common TF env (applies to BOTH attempts) ----
base_env = os.environ.copy()
base_env.setdefault("TF_CPP_MIN_LOG_LEVEL", "2")
base_env.setdefault("TF_XLA_FLAGS", "--tf_xla_auto_jit=0")          # disable XLA JIT (more stable mem)
base_env.setdefault("NVIDIA_TF32_OVERRIDE", "1")                    # allow TF32 (perf/VRAM win on Ampere+)

# These only matter when a GPU is visible:
base_env.setdefault("TF_FORCE_GPU_ALLOW_GROWTH", "true")
base_env.setdefault("TF_GPU_ALLOCATOR", "cuda_malloc_async")
# Optional (uncomment if you want a smaller cuDNN workspace):
# base_env.setdefault("TF_CUDNN_WORKSPACE_LIMIT_IN_MB", "256")

# ---- Command line handed to the Python interpreter for training ----
# Layout: module to run, its general options, the model name ("mixednet"),
# then the options that follow the model name.
train_args = (
    ["-m", "microwakeword.model_train_eval"]
    + [
        token
        for option in (
            ("--training_config", "training_parameters.yaml"),
            ("--train", "1"),
            ("--restore_checkpoint", "1"),
            ("--test_tf_nonstreaming", "0"),
            ("--test_tflite_nonstreaming", "0"),
            ("--test_tflite_nonstreaming_quantized", "0"),
            ("--test_tflite_streaming", "0"),
            ("--test_tflite_streaming_quantized", "1"),
            ("--use_weights", "best_weights"),
        )
        for token in option
    ]
    + ["mixednet"]
    + [
        token
        for option in (
            ("--pointwise_filters", "64,64,64,64"),
            ("--repeat_in_block", "1,1,1,1"),
            ("--mixconv_kernel_sizes", "[5], [7,11], [9,15], [23]"),
            ("--residual_connection", "0,0,0,0"),
            ("--first_conv_filters", "32"),
            ("--first_conv_kernel_size", "5"),
            ("--stride", "2"),
        )
        for token in option
    ]
)

# Lower-case text fragments looked for in the (lower-cased) log of a failed
# training run; a hit means the failure is treated as a GPU/VRAM problem.
OOM_MARKERS = (
    # memory exhaustion wording
    "resourceexhaustederror",
    "resource exhausted",
    "oom",
    "out of memory",
    "cuda_error_out_of_memory",
) + (
    # cuDNN / cuBLAS and allocation failures
    "cudnn",
    "failed to allocate",
    "blas xgemm",
    "cublas",
) + (
    # CUDA errors and driver initialisation failures
    "internalerror: cuda",
    "failed call to cuinit",
)

class RunResult:
    """Outcome of one training subprocess.

    Attributes:
        returncode: Exit status of the subprocess.
        stdout: Captured output of the subprocess as a single string.
    """

    def __init__(self, returncode: int, stdout: str):
        self.returncode, self.stdout = returncode, stdout

def run_training(label: str, extra_env: dict) -> RunResult:
    """Run one training attempt in a child interpreter and stream its output.

    The child is started as ``sys.executable`` followed by ``train_args``, with
    stderr merged into stdout. Every output line is captured; all lines except
    those starting with ``"Validation Batch #"`` are also echoed live.

    Args:
        label: Headline printed before the attempt starts.
        extra_env: Environment overrides applied on top of ``base_env``
            (``None`` or an empty dict for no overrides).

    Returns:
        RunResult holding the child's exit code and its complete captured
        output (including the lines that were not echoed).

    Raises:
        Whatever interrupts the streaming loop (for example
        ``KeyboardInterrupt``); the child is stopped before it propagates.
    """
    env = base_env.copy()
    env.update(extra_env or {})
    # A child Python whose stdout is a pipe block-buffers its print() output,
    # so "live" lines would arrive in large, delayed bursts. Ask it to run
    # unbuffered unless the caller has already decided otherwise.
    env.setdefault("PYTHONUNBUFFERED", "1")

    print(f"\nüöÄ {label}")
    print("‚Üí", " ".join([sys.executable] + train_args))

    proc = subprocess.Popen(
        [sys.executable] + train_args,
        env=env,
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        bufsize=1,                    # line-buffered on the reading side
    )

    full_log = []
    try:
        # Stream lines live AND capture them for OOM detection / error messages
        assert proc.stdout is not None
        for line in proc.stdout:
            full_log.append(line)

            # Hide ONLY the per-minibatch validation spam
            if line.startswith("Validation Batch #"):
                continue

            # Everything else streams live
            print(line, end="")
    except BaseException:
        # Interrupted (e.g. kernel interrupt): stop the child instead of
        # waiting for the whole training run or leaving it running orphaned.
        if proc.poll() is None:
            proc.terminate()
            try:
                proc.wait(timeout=10)
            except subprocess.TimeoutExpired:
                proc.kill()
        raise
    finally:
        returncode = proc.wait()
        if proc.stdout is not None:
            proc.stdout.close()

    return RunResult(returncode, "".join(full_log))

# Attempt 1 runs with normal GPU visibility. If it fails and the log looks
# like a GPU/VRAM problem, attempt 2 repeats the run with the GPU hidden.
import re  # used only by the failure classifier below

# Markers short enough to occur inside ordinary words ("room", "zoom", ...)
# must match as stand-alone words; all other markers stay substring matches.
_WHOLE_WORD_MARKERS = frozenset({"oom"})


def _looks_like_gpu_failure(log_text):
    """Return True if ``log_text`` contains any entry of ``OOM_MARKERS``.

    Matching is case-insensitive. Entries listed in ``_WHOLE_WORD_MARKERS``
    only count when they are not surrounded by other letters, which keeps
    words such as "room" from being mistaken for an "OOM" report.
    """
    lowered = (log_text or "").lower()
    for marker in OOM_MARKERS:
        if marker in _WHOLE_WORD_MARKERS:
            if re.search(r"(?<![a-z])" + re.escape(marker) + r"(?![a-z])", lowered):
                return True
        elif marker in lowered:
            return True
    return False


# Attempt 1: GPU (normal visibility)
cp = run_training(
    "Attempt 1/2: GPU training (with allow_growth + cuda_malloc_async)",
    extra_env={},  # no override
)

if cp.returncode == 0:
    print("‚úÖ Training and testing complete (GPU path).")
else:
    looks_like_gpu_oom = _looks_like_gpu_failure(cp.stdout)

    if looks_like_gpu_oom:
        # Attempt 2: CPU fallback (hide GPUs completely)
        cp2 = run_training(
            "Attempt 2/2: CPU fallback (GPU hidden via CUDA_VISIBLE_DEVICES='')",
            extra_env={
                "CUDA_VISIBLE_DEVICES": "",   # hard-disable GPU
                # (Optional) makes TF less chatty about GPU init on some builds:
                "TF_CPP_MIN_LOG_LEVEL": "2",
            },
        )
        if cp2.returncode == 0:
            print("‚úÖ Training and testing complete (CPU fallback).")
        else:
            raise RuntimeError(
                "Training failed on BOTH GPU and CPU.\n\n"
                + textwrap.indent(cp2.stdout or "(no output)", prefix="  ")
            )
    else:
        # Not an OOM-style failure: surface the original error
        raise RuntimeError(
            "Training failed (does not look like a VRAM/OOM issue).\n\n"
            + textwrap.indent(cp.stdout or "(no output)", prefix="  ")
        )

In [None]:
import shutil
import json
from IPython.display import display, HTML

# Export step: copy the trained model next to the notebook under the wake
# word's name, write the matching JSON manifest, and show download links.
from html import escape as _html_escape
from urllib.parse import quote as _url_quote

# Use the wake word chosen in the first cell (TARGET_WORD)
wake_word = TARGET_WORD

# --- Copy TFLite file to working dir with wake word name ---
source_path = "trained_models/wakeword/tflite_stream_state_internal_quant/stream_state_internal_quant.tflite"
tflite_filename = f"{wake_word}.tflite"
tflite_path = f"./{tflite_filename}"
shutil.copy(source_path, tflite_path)

# --- Write JSON metadata file with matching model name ---
json_data = {
    "type": "micro",
    "wake_word": wake_word,
    "author": "Tater Totterson",
    "website": "https://github.com/TaterTotterson/microWakeWord-Trainer-Nvidia-Docker.git",
    "model": tflite_filename,
    "trained_languages": ["en"],
    "version": 2,
    "micro": {
        "probability_cutoff": 0.97,
        "sliding_window_size": 5,
        "feature_step_size": 10,
        "tensor_arena_size": 30000,
        "minimum_esphome_version": "2024.7.0"
    }
}
json_filename = f"{wake_word}.json"
json_path = f"./{json_filename}"
with open(json_path, "w") as json_file:
    json.dump(json_data, json_file, indent=2)

# --- Display nice download links ---
# The wake word may be a phrase with spaces or punctuation, so the file names
# are percent-encoded for the href and HTML-escaped for the visible text.
# For a plain name such as "mila" the markup is unchanged.
tflite_href = _url_quote(tflite_filename)
json_href = _url_quote(json_filename)
tflite_label = _html_escape(tflite_filename)
json_label = _html_escape(json_filename)

html = f"""
<h3>Download your files:</h3>
<ul>
  <li><a href="{tflite_href}" download>‚¨áÔ∏è Download Model ({tflite_label})</a></li>
  <li><a href="{json_href}" download>‚¨áÔ∏è Download Metadata ({json_label})</a></li>
</ul>
"""
display(HTML(html))