# Training a microWakeWord Model for ESP32-S3-BOX3

This notebook guides you through training a wake word detection model for the Malayalam wake word രാഘവാ ("Raghava").

## 📋 Prerequisites

- **Python**: 3.10 (strongly recommended; the setup cell only warns on other versions, but they may cause dependency issues)
- **GPU**: T4 or better (recommended for faster training)
- **Disk Space**: ~15GB free
- **RAM**: 16GB+ recommended
- **Time**: 2-4 hours total

## 🔄 Workflow Overview

1. **Setup** (5 min): Install dependencies and validate environment
2. **TTS Setup** (10 min): Download Piper TTS and voice models
3. **Sample Generation** (20-30 min): Generate 1000 wake word samples
4. **Augmentation Data** (30-60 min): Download background audio datasets
5. **Feature Generation** (20-30 min): Create training spectrograms
6. **Training** (1-2 hours): Train the model
7. **Export** (5 min): Export TFLite model for ESP32

## ⚠️ Important Notes

- Run cells **in order** from top to bottom
- Don't skip cells unless explicitly marked optional
- Large downloads will show progress bars
- Training can be resumed if interrupted

## 🎯 Expected Output

A quantized TFLite model file (~200-500KB) ready for deployment to ESP32-S3-BOX3.

---
## Step 1: Environment Setup & Validation

In [None]:
# Install microWakeWord and dependencies
import sys
import platform
print("🔧 Installing dependencies...")
print(f"Python version: {sys.version}")
# Check Python version
if sys.version_info[:2] != (3, 10):
    print("⚠️  WARNING: Python 3.10 is recommended. Current version may cause issues.")
# Install audio-metadata from fork (fixes attrs dependency)
!pip install -q 'git+https://github.com/whatsnowplaying/audio-metadata@d4ebb238e6a401bb1a5aaaac60c9e2b3cb30929f'
# Clone and install microWakeWord
import os
if not os.path.exists('./microWakeWord'):
    print("📥 Cloning microWakeWord...")
    !git clone https://github.com/kahrendt/microWakeWord
else:
    print("✅ microWakeWord directory already exists")
# Always install/reinstall to ensure it's available
print("📦 Installing microWakeWord package...")
!pip install -e ./microWakeWord
# Verify installation
print("\n🔍 Verifying installation...")
try:
    import microwakeword
    print("✅ microwakeword module successfully imported!")
except ImportError as e:
    print(f"❌ ERROR: Could not import microwakeword: {e}")
print("\n✅ Dependencies installation complete!")

In [None]:
# Validate environment and check resources
import shutil
import torch

print("🔍 Environment Validation\n" + "="*50)

# Check disk space
total, used, free = shutil.disk_usage(".")
free_gb = free // (2**30)
print(f"💾 Free disk space: {free_gb} GB")
if free_gb < 15:
    print("⚠️  WARNING: Less than 15GB free. You may run out of space.")
else:
    print("✅ Sufficient disk space")

# Check GPU
if torch.cuda.is_available():
    gpu_name = torch.cuda.get_device_name(0)
    print(f"\n🎮 GPU: {gpu_name}")
    print(f"   CUDA version: {torch.version.cuda}")
    print("✅ GPU acceleration available")
else:
    print("\n⚠️  No GPU detected. Training will be VERY slow.")
    print("   Consider using Google Colab with GPU runtime.")

print("\n" + "="*50)
print("✅ Environment validation complete!")
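
The same checks can be collected into a reusable function that returns warnings instead of printing them, which makes it easier to stop early when a requirement is not met. This is a minimal sketch using only the standard library; the function name `check_environment` and its parameters are hypothetical, and it deliberately leaves out the GPU check because that depends on `torch`.

```python
import shutil
import sys


def check_environment(path=".", min_free_gb=15, expected_python=(3, 10)):
    """Return a list of warnings; an empty list means every check passed."""
    warnings = []

    # Compare only major.minor so that every 3.10.x patch release passes.
    if tuple(sys.version_info[:2]) != tuple(expected_python):
        found = ".".join(map(str, sys.version_info[:2]))
        wanted = ".".join(map(str, expected_python))
        warnings.append(f"Python {wanted} expected, found {found}")

    # disk_usage reports bytes; keep the fraction so 14.9 GiB is not
    # rounded down to 14 before the comparison.
    free_gb = shutil.disk_usage(path).free / 2**30
    if free_gb < min_free_gb:
        warnings.append(f"Only {free_gb:.1f} GiB free, {min_free_gb} GiB wanted")

    return warnings


for message in check_environment():
    print("WARNING:", message)
```

Returning a list keeps the decision about what to do (print, raise, or abort the notebook) with the caller.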

---
## Step 2: Configuration

**All configuration in one place** - modify these values as needed.

In [None]:
# ============================================================================
# CONFIGURATION - Modify these values as needed
# ============================================================================

# Wake word configuration
TARGET_WORD = "രാഘവാ"  # Malayalam wake word ("Raghava")

# TTS Models (Malayalam voices)
TTS_MODELS = {
    "meera": {
        "name": "ml_IN-meera-medium",
        "path": "ml/ml_IN/meera/medium"
    },
    "arjun": {
        "name": "ml_IN-arjun-medium",
        "path": "ml/ml_IN/arjun/medium"
    }
}

# Sample generation settings
SAMPLES_PER_MODEL = 500  # Samples per voice; 2 voices x 500 = 1000 samples in total
MAX_WORKERS = 8  # Parallel workers for sample generation (adjust for your CPU)

# TTS variation parameters
LENGTH_SCALES = [1.2, 1.3, 1.4, 1.5, 1.6]  # Speech speed variations
NOISE_SCALES = [0.5, 0.667, 0.8, 1.0]  # Voice variation

# Augmentation settings
AUGMENTATION_DURATION_S = 3.2
BACKGROUND_MIN_SNR_DB = -5
BACKGROUND_MAX_SNR_DB = 10

# Training settings
TRAINING_STEPS = 10000  # Increase for better quality (but longer training)
BATCH_SIZE = 128  # Adjust based on GPU memory (T4 can handle 128)
LEARNING_RATE = 0.001

# Directory structure
DIRS = {
    "piper": "./piper_standalone",
    "models": "./models",
    "samples": "./generated_samples",
    "mit_rirs": "./mit_rirs",
    "esc50": "./esc50_16k",
    "fma": "./fma_16k",
    "features": "./generated_augmented_features",
    "negative": "./negative_datasets",
    "trained": "./trained_models/wakeword"
}

print("✅ Configuration loaded")
print(f"\n📝 Target word: {TARGET_WORD}")
print(f"🎤 TTS models: {', '.join(TTS_MODELS.keys())}")
print(f"📊 Total samples to generate: {SAMPLES_PER_MODEL * len(TTS_MODELS)}")
print(f"🔧 Parallel workers: {MAX_WORKERS}")
print(f"🎯 Training steps: {TRAINING_STEPS}")
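
Sample generation in Step 6 picks the speed and noise values by taking the sample index modulo the length of each list. The sketch below checks how many distinct combinations that produces and how evenly they are used. The per-voice count of 500 is an assumption chosen to match the stated target of 1000 samples across two voices, and `variation_for` is a hypothetical helper name.

```python
from collections import Counter

# Values mirror the configuration cell above.
LENGTH_SCALES = [1.2, 1.3, 1.4, 1.5, 1.6]
NOISE_SCALES = [0.5, 0.667, 0.8, 1.0]
VOICES = ["meera", "arjun"]
SAMPLES_PER_VOICE = 500  # assumption: 500 per voice, 1000 in total


def variation_for(sample_idx):
    """Pick (length_scale, noise_scale) by cycling each list independently."""
    return (
        LENGTH_SCALES[sample_idx % len(LENGTH_SCALES)],
        NOISE_SCALES[sample_idx % len(NOISE_SCALES)],
    )


usage = Counter(variation_for(i) for i in range(SAMPLES_PER_VOICE))
total_samples = SAMPLES_PER_VOICE * len(VOICES)

print(f"Total samples: {total_samples}")
print(f"Distinct (length, noise) pairs per voice: {len(usage)}")
print(f"Samples per pair: {min(usage.values())} to {max(usage.values())}")
```

Because the two lists have lengths 5 and 4, which share no common factor, the cycling visits every one of the 20 pairs. If you change a list so that the lengths share a factor (for example 4 and 4), some pairs are never generated.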

---
## Step 3: Directory Setup

In [None]:
# Create all required directories
import os

print("📁 Creating directory structure...\n")

for name, path in DIRS.items():
    os.makedirs(path, exist_ok=True)
    print(f"✅ {name:12} → {path}")

# Create subdirectories for features
for split in ["training", "validation", "testing"]:
    os.makedirs(os.path.join(DIRS["features"], split), exist_ok=True)

print("\n✅ Directory structure created!")

---
## Step 4: Download Piper TTS

In [None]:
import os
import shutil

# 1. Clean up existing broken installations
if os.path.exists("piper_standalone"):
    print("🗑️ Removing existing piper_standalone to ensure a clean install...")
    shutil.rmtree("piper_standalone")

# 2. Download and Extract
print("📥 Downloading Piper TTS...")
!wget -q https://github.com/rhasspy/piper/releases/download/v1.2.0/piper_amd64.tar.gz
!tar -xf piper_amd64.tar.gz

# 3. Rename the extracted folder 'piper' to 'piper_standalone'
if os.path.exists("piper"):
    os.rename("piper", "piper_standalone")
    print("✅ Piper folder renamed to piper_standalone")
else:
    print("❌ Error: Extracted folder 'piper' not found!")

# 4. Clean up the archive
if os.path.exists("piper_amd64.tar.gz"):
    os.remove("piper_amd64.tar.gz")

# 5. Verify and set permissions
piper_exe = "./piper_standalone/piper"
if os.path.exists(piper_exe):
    os.chmod(piper_exe, 0o755)
    print(f"✨ SUCCESS: Piper is ready at {piper_exe}")
    !{piper_exe} --version
else:
    print("🔍 Binary not found in expected spot. Searching...")
    !find . -name "piper" -type f
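
If you run this outside a notebook, where the `!tar` shell escape is not available, the extract-and-rename step can be done with the standard library. The following is a sketch under that assumption; `install_from_tarball` is a hypothetical helper, and the demo at the bottom builds a throwaway archive with the same layout (a single top-level `piper` folder) rather than downloading anything.

```python
import os
import shutil
import tarfile
import tempfile


def install_from_tarball(archive_path, extracted_name, target_dir):
    """Extract archive_path next to itself and move its top folder to target_dir."""
    work_dir = os.path.dirname(os.path.abspath(archive_path))

    # Start clean so files from an earlier partial install are not mixed in.
    if os.path.exists(target_dir):
        shutil.rmtree(target_dir)

    with tarfile.open(archive_path, "r:gz") as archive:
        # The "data" filter rejects unsafe members such as absolute paths;
        # it is only passed on Python versions that provide it.
        if hasattr(tarfile, "data_filter"):
            archive.extractall(work_dir, filter="data")
        else:
            archive.extractall(work_dir)

    extracted = os.path.join(work_dir, extracted_name)
    if not os.path.isdir(extracted):
        raise FileNotFoundError(f"Expected folder not found: {extracted}")

    os.rename(extracted, target_dir)
    return target_dir


# Demo with a tiny stand-in archive (not the real Piper release).
demo_root = tempfile.mkdtemp()
source = os.path.join(demo_root, "piper")
os.makedirs(source)
with open(os.path.join(source, "piper"), "w") as handle:
    handle.write("placeholder binary")
archive_path = os.path.join(demo_root, "piper_amd64.tar.gz")
with tarfile.open(archive_path, "w:gz") as archive:
    archive.add(source, arcname="piper")
shutil.rmtree(source)

installed = install_from_tarball(
    archive_path, "piper", os.path.join(demo_root, "piper_standalone")
)
print(sorted(os.listdir(installed)))
```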

---
## Step 5: Download TTS Voice Models

In [None]:
# Download Malayalam voice models
import os

print("📥 Downloading TTS voice models...\n")

for voice_id, config in TTS_MODELS.items():
    model_name = config["name"]
    model_path = config["path"]

    onnx_file = f"{DIRS['models']}/{model_name}.onnx"
    json_file = f"{DIRS['models']}/{model_name}.onnx.json"

    if os.path.exists(onnx_file) and os.path.exists(json_file):
        print(f"✅ {voice_id:6} model already exists")
    else:
        print(f"📥 Downloading {voice_id} model...")
        !wget -q -L -O {onnx_file} "https://huggingface.co/rhasspy/piper-voices/resolve/main/{model_path}/{model_name}.onnx"
        !wget -q -L -O {json_file} "https://huggingface.co/rhasspy/piper-voices/resolve/main/{model_path}/{model_name}.onnx.json"
        print(f"✅ {voice_id} model downloaded")

print("\n✅ All TTS models ready!")
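
One caveat with the existence check above: `wget -O` creates the output file before the transfer starts, so a failed download can leave an empty file behind, and a later run would then report the model as already present. A stricter readiness check might look like the sketch below. The helper name `voice_files_ready` is hypothetical, and the demo uses placeholder files rather than real voice models.

```python
import json
import os
import tempfile


def voice_files_ready(onnx_file, json_file):
    """True only if both files exist, are non-empty, and the config parses as JSON."""
    for path in (onnx_file, json_file):
        if not os.path.isfile(path) or os.path.getsize(path) == 0:
            return False
    try:
        with open(json_file, encoding="utf-8") as handle:
            json.load(handle)
    except (OSError, ValueError):
        # ValueError covers both invalid JSON and invalid UTF-8.
        return False
    return True


demo_dir = tempfile.mkdtemp()
onnx_file = os.path.join(demo_dir, "voice.onnx")
json_file = os.path.join(demo_dir, "voice.onnx.json")

# Simulate a failed download: the ONNX file exists but is empty.
open(onnx_file, "wb").close()
with open(json_file, "w", encoding="utf-8") as handle:
    json.dump({"placeholder": True}, handle)
failed_download = voice_files_ready(onnx_file, json_file)

# Simulate a completed download.
with open(onnx_file, "wb") as handle:
    handle.write(b"\x00" * 16)
complete_download = voice_files_ready(onnx_file, json_file)

print(failed_download, complete_download)
```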

---
## Step 6: Generate Wake Word Samples

This will generate 1000 samples (500 per voice) with variations in speed and tone.

In [None]:
# Generate wake word samples in parallel
import os
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
from tqdm.auto import tqdm

# Piper executable path
piper_exe = "./piper_standalone/piper.exe" if os.name == 'nt' else "./piper_standalone/piper"

# Progress tracking
progress_lock = Lock()
progress_counters = {}
failed_samples = []

def generate_sample(model_name, model_path, sample_idx, output_dir):
    """Generate a single wake word sample with Piper.

    Returns (True, None) on success or (False, error_message) on failure.
    """
    # Cycle through the speed and noise settings by sample index
    length = LENGTH_SCALES[sample_idx % len(LENGTH_SCALES)]
    noise = NOISE_SCALES[sample_idx % len(NOISE_SCALES)]
    output_file = f"{output_dir}/{sample_idx}.wav"

    # Pass the arguments as a list and send the text on stdin, so the
    # Malayalam text never goes through shell quoting
    cmd = [
        piper_exe,
        "--model", model_path,
        "--length_scale", str(length),
        "--noise_scale", str(noise),
        "--output_file", output_file,
    ]

    try:
        result = subprocess.run(
            cmd,
            input=TARGET_WORD + "\n",
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            timeout=30,
        )
    except Exception as e:
        return False, f"Sample {sample_idx}: {str(e)[:100]}"

    if result.returncode == 0:
        return True, None
    return False, f"Sample {sample_idx}: {result.stderr[:100]}"

# Generate samples for each model
print(f"🎙️  Generating {SAMPLES_PER_MODEL * len(TTS_MODELS)} wake word samples...\n")

for voice_id, config in TTS_MODELS.items():
    model_name = config["name"]
    model_path = f"{DIRS['models']}/{model_name}.onnx"
    output_dir = f"{DIRS['samples']}/{voice_id}"

    os.makedirs(output_dir, exist_ok=True)

    print(f"🎤 Generating {SAMPLES_PER_MODEL} samples for {voice_id}...")

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = []

        for i in range(SAMPLES_PER_MODEL):
            future = executor.submit(
                generate_sample,
                voice_id,
                model_path,
                i,
                output_dir
            )
            futures.append(future)

        # Progress bar
        success_count = 0
        with tqdm(total=SAMPLES_PER_MODEL, desc=f"{voice_id}") as pbar:
            for future in as_completed(futures):
                success, error = future.result()
                if success:
                    success_count += 1
                else:
                    failed_samples.append((voice_id, error))
                pbar.update(1)

        print(f"✅ {voice_id}: {success_count}/{SAMPLES_PER_MODEL} samples generated\n")

# Summary
total_expected = SAMPLES_PER_MODEL * len(TTS_MODELS)
total_failed = len(failed_samples)
total_success = total_expected - total_failed

print("="*60)
print("✅ Sample generation complete!")
print(f"   Success: {total_success}/{total_expected}")
if total_failed > 0:
    print(f"   Failed: {total_failed}")
    print("\n⚠️  First 5 errors:")
    for voice, error in failed_samples[:5]:
        print(f"   [{voice}] {error}")
print("="*60)

In [None]:
# Validate generated samples
from pathlib import Path

print("🔍 Validating generated samples...\n")

for voice_id in TTS_MODELS.keys():
    sample_dir = Path(DIRS["samples"]) / voice_id
    wav_files = list(sample_dir.glob("*.wav"))
    print(f"✅ {voice_id:6}: {len(wav_files)} WAV files")

total_samples = len(list(Path(DIRS["samples"]).rglob("*.wav")))
print(f"\n📊 Total samples: {total_samples}")

if total_samples < 100:
    print("\n⚠️  WARNING: Very few samples generated. Training may not work well.")
    print("   Consider regenerating samples or checking for errors above.")
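
Counting files confirms that something was written, but not that each file contains usable audio. A per-file check with the standard `wave` module could look like the sketch below. The helper names and the duration thresholds are hypothetical and should be adjusted to your wake word. The demo writes one second of silence at 22050 Hz purely as test input.

```python
import os
import tempfile
import wave


def describe_wav(path):
    """Return (sample_rate_hz, channels, duration_s) for a PCM WAV file."""
    with wave.open(str(path), "rb") as wav:
        rate = wav.getframerate()
        return rate, wav.getnchannels(), wav.getnframes() / rate


def is_usable_sample(path, min_duration_s=0.3, max_duration_s=3.0):
    """Reject files that are unreadable, empty, or implausibly short or long."""
    try:
        _, _, duration = describe_wav(path)
    except (wave.Error, EOFError, OSError, ZeroDivisionError):
        return False
    return min_duration_s <= duration <= max_duration_s


# Demo input: one second of 16-bit mono silence.
demo_path = os.path.join(tempfile.mkdtemp(), "demo.wav")
with wave.open(demo_path, "wb") as wav:
    wav.setnchannels(1)
    wav.setsampwidth(2)
    wav.setframerate(22050)
    wav.writeframes(b"\x00\x00" * 22050)

print(describe_wav(demo_path), is_usable_sample(demo_path))
```

To apply it to the notebook's output, you could filter `Path(DIRS["samples"]).rglob("*.wav")` through `is_usable_sample` and regenerate whatever fails.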

---
## Step 7: Download Augmentation Data

**This step downloads background audio for data augmentation:**
- MIT RIR: Room impulse responses (up to 500 files kept, ~50MB)
- ESC-50: Environmental sounds (2,000 clips across 50 classes, ~600MB download)
- FMA: Music dataset (OPTIONAL, 1000 files, ~2GB)

**Total download: roughly 650MB without FMA (FMA adds about 2GB)**

In [None]:
# Download MIT RIR dataset from direct source
import os
import urllib.request
import zipfile
from pathlib import Path
from tqdm.auto import tqdm
import numpy as np

if not os.path.exists(DIRS["mit_rirs"]) or len(list(Path(DIRS["mit_rirs"]).glob("*.wav"))) == 0:
    print("📥 Downloading MIT RIR dataset from direct source...")

    try:
        # Download from MIT's direct link
        url = "https://mcdermottlab.mit.edu/Reverb/IRMAudio/Audio.zip"
        zip_path = "mit_rir.zip"

        print("   Downloading...")
        urllib.request.urlretrieve(url, zip_path)

        print("   Extracting...")
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall("mit_rir_temp")

        # Convert to 16kHz WAV files
        import librosa
        import scipy.io.wavfile

        os.makedirs(DIRS["mit_rirs"], exist_ok=True)
        audio_files = list(Path("mit_rir_temp").rglob("*.wav"))[:500]

        count = 0
        for audio_file in tqdm(audio_files, desc="Converting to 16kHz"):
            try:
                audio, sr = librosa.load(str(audio_file), sr=16000, mono=True)
                output_name = f"rir_{count:04d}.wav"
                scipy.io.wavfile.write(
                    os.path.join(DIRS["mit_rirs"], output_name),
                    16000,
                    (audio * 32767).astype(np.int16)
                )
                count += 1
            except Exception:
                # Skip files that cannot be decoded or written
                continue

        # Cleanup
        import shutil
        shutil.rmtree("mit_rir_temp")
        os.remove(zip_path)

        print(f"✅ MIT RIR: {count} files downloaded")
    except Exception as e:
        print(f"⚠️  MIT RIR download failed: {e}")
        print("   Continuing without MIT RIR data...")
else:
    rir_count = len(list(Path(DIRS["mit_rirs"]).glob("*.wav")))
    print(f"✅ MIT RIR already exists ({rir_count} files)")

In [None]:
# Download ESC-50 environmental sounds
import os
import urllib.request
import zipfile
from pathlib import Path
from tqdm.auto import tqdm
import numpy as np

if not os.path.exists(DIRS["esc50"]) or len(list(Path(DIRS["esc50"]).glob("*.wav"))) == 0:
    print("📥 Downloading ESC-50 environmental sounds...")

    try:
        # Download
        url = "https://github.com/karolpiczak/ESC-50/archive/master.zip"
        zip_path = "esc50.zip"

        print("   Downloading...")
        urllib.request.urlretrieve(url, zip_path)

        # Extract
        print("   Extracting...")
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(".")

        os.rename("ESC-50-master", "esc50_temp")

        # Convert to 16kHz
        import librosa
        import scipy.io.wavfile

        os.makedirs(DIRS["esc50"], exist_ok=True)
        wav_files = list(Path("esc50_temp/audio").glob("*.wav"))

        count = 0
        for wav_file in tqdm(wav_files, desc="Converting to 16kHz"):
            try:
                audio, sr = librosa.load(str(wav_file), sr=16000, mono=True)
                scipy.io.wavfile.write(
                    os.path.join(DIRS["esc50"], wav_file.name),
                    16000,
                    (audio * 32767).astype(np.int16)
                )
                count += 1
            except Exception:
                # Skip files that cannot be decoded or written
                continue

        # Cleanup
        import shutil
        shutil.rmtree("esc50_temp")
        os.remove(zip_path)

        print(f"✅ ESC-50: {count} files downloaded")
    except Exception as e:
        print(f"⚠️  ESC-50 download failed: {e}")
else:
    esc_count = len(list(Path(DIRS["esc50"]).glob("*.wav")))
    print(f"✅ ESC-50 already exists ({esc_count} files)")
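
Both conversion cells scale floating-point audio by 32767 and cast it to 16-bit integers. If resampling pushes a sample slightly outside the range -1.0 to 1.0, that cast can overflow, so it is common to clamp first. The sketch below shows the idea with the standard library only; `float_to_pcm16` is a hypothetical helper, and in the notebook the equivalent would be a `np.clip(audio, -1.0, 1.0)` call before the multiplication.

```python
from array import array


def float_to_pcm16(samples):
    """Convert floats to 16-bit PCM, clamping values outside [-1.0, 1.0]."""
    pcm = array("h")  # signed 16-bit integers
    for value in samples:
        clamped = max(-1.0, min(1.0, value))
        pcm.append(int(round(clamped * 32767)))
    return pcm


converted = float_to_pcm16([0.0, 1.0, -1.0, 1.5, -2.0])
print(converted.tolist())
```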

In [None]:
# OPTIONAL: Download FMA music dataset (WARNING: ~2GB download)
# Set DOWNLOAD_FMA = True below if you want to include music in augmentation

DOWNLOAD_FMA = False  # Set to True to download FMA

if DOWNLOAD_FMA:
    import os
    from pathlib import Path

    if not os.path.exists(DIRS["fma"]) or len(list(Path(DIRS["fma"]).glob("*.wav"))) == 0:
        print("📥 Downloading FMA dataset (WARNING: ~2GB, this will take time)...")

        # Download and process FMA
        # (Implementation similar to ESC-50 but with larger dataset)
        print("   This is a large download. Consider skipping if not needed.")
    else:
        fma_count = len(list(Path(DIRS["fma"]).glob("*.wav")))
        print(f"✅ FMA already exists ({fma_count} files)")
else:
    print("⏭️  Skipping FMA download (set DOWNLOAD_FMA=True to include)")

In [None]:
# Summary of augmentation data
import os
from pathlib import Path

print("📊 Augmentation Data Summary\n" + "="*50)

mit_count = len(list(Path(DIRS["mit_rirs"]).glob('*.wav'))) if os.path.exists(DIRS["mit_rirs"]) else 0
esc50_count = len(list(Path(DIRS["esc50"]).glob('*.wav'))) if os.path.exists(DIRS["esc50"]) else 0
fma_count = len(list(Path(DIRS["fma"]).glob('*.wav'))) if os.path.exists(DIRS["fma"]) else 0

print(f"  MIT RIRs:        {mit_count:4} files")
print(f"  ESC-50:          {esc50_count:4} files")
print(f"  FMA:             {fma_count:4} files")
print(f"  {'─'*46}")
print(f"  Total:           {mit_count + esc50_count + fma_count:4} files")

if mit_count + esc50_count + fma_count == 0:
    print("\n⚠️  WARNING: No augmentation data downloaded!")
    print("   Training may not work well without background audio.")
else:
    print("\n✅ Augmentation data ready!")

---
## Step 8: Setup Augmentation Pipeline

In [None]:
# Setup audio clips and augmentation pipeline
from microwakeword.audio.augmentation import Augmentation
from microwakeword.audio.clips import Clips
from microwakeword.audio.spectrograms import SpectrogramGeneration
import os
from pathlib import Path

print("🔧 Setting up augmentation pipeline...\n")

# Install torchcodec if needed
try:
    import torchcodec
    print("✅ torchcodec already installed\n")
except ImportError:
    print("📦 Installing torchcodec for audio decoding...")
    !pip install -q torchcodec
    print("✅ torchcodec installed")
    print("\n⚠️  IMPORTANT: Please restart the kernel and re-run from this cell!")
    print("   (Runtime → Restart runtime, then re-run this cell)\n")
    raise SystemExit("Kernel restart required after torchcodec installation")

# Verify samples exist
samples_dir = DIRS["samples"]
total_samples = len(list(Path(samples_dir).rglob("*.wav")))
print(f"📊 Found {total_samples} WAV files in {samples_dir}\n")

if total_samples == 0:
    print("❌ ERROR: No samples found! Re-run Step 6")
else:
    # Setup clips from generated samples
    print("Loading clips...")
    clips = Clips(
        input_directory=DIRS["samples"],
        file_pattern='**/*.wav',
        max_clip_duration_s=None,
        remove_silence=False,
        random_split_seed=10,
        split_count=0.1,
    )

    print(f"✅ Clips loaded: {len(clips.clips)} total")

    # Handle both dict and object-based clips
    try:
        train_count = len([c for c in clips.clips if c.get('split') == 'train' or c.get('split') == 0])
        val_count = len([c for c in clips.clips if c.get('split') == 'validation' or c.get('split') == 1])
        test_count = len([c for c in clips.clips if c.get('split') == 'test' or c.get('split') == 2])
    except Exception:
        train_count = len([c for c in clips.clips if c.split == 'train'])
        val_count = len([c for c in clips.clips if c.split == 'validation'])
        test_count = len([c for c in clips.clips if c.split == 'test'])

    print(f"   Train: {train_count}")
    print(f"   Validation: {val_count}")
    print(f"   Test: {test_count}\n")

    # Determine background paths
    background_paths = []
    if os.path.exists(DIRS["esc50"]) and len(list(Path(DIRS["esc50"]).glob("*.wav"))) > 0:
        background_paths.append(DIRS["esc50"])
    if os.path.exists(DIRS["fma"]) and len(list(Path(DIRS["fma"]).glob("*.wav"))) > 0:
        background_paths.append(DIRS["fma"])

    # Determine impulse paths
    impulse_paths = []
    if os.path.exists(DIRS["mit_rirs"]) and len(list(Path(DIRS["mit_rirs"]).glob("*.wav"))) > 0:
        impulse_paths.append(DIRS["mit_rirs"])

    print(f"📁 Background audio: {', '.join(background_paths) if background_paths else 'None'}")
    print(f"📁 Impulse responses: {', '.join(impulse_paths) if impulse_paths else 'None'}")

    if not background_paths and not impulse_paths:
        print("\n⚠️  WARNING: No augmentation data available!")
        print("   Training will proceed with limited augmentation.")

    # Setup augmentation
    augmenter = Augmentation(
        augmentation_duration_s=AUGMENTATION_DURATION_S,
        augmentation_probabilities={
            "SevenBandParametricEQ": 0.1,
            "TanhDistortion": 0.1,
            "PitchShift": 0.1,
            "BandStopFilter": 0.1,
            "AddColorNoise": 0.1,
            "AddBackgroundNoise": 0.75 if background_paths else 0.0,
            "Gain": 1.0,
            "RIR": 0.5 if impulse_paths else 0.0,
        },
        impulse_paths=impulse_paths if impulse_paths else None,
        background_paths=background_paths if background_paths else None,
        background_min_snr_db=BACKGROUND_MIN_SNR_DB,
        background_max_snr_db=BACKGROUND_MAX_SNR_DB,
        min_jitter_s=0.195,
        max_jitter_s=0.205,
    )
    print("✅ Augmentation configured")

    print("\n✅ Augmentation pipeline ready!")
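
The background-noise step is configured with a signal-to-noise ratio range from `BACKGROUND_MIN_SNR_DB` (-5 dB) to `BACKGROUND_MAX_SNR_DB` (10 dB). The sketch below is a generic illustration of what an SNR in decibels means for the mix. It is not the augmentation library's actual implementation, and the helper names are hypothetical.

```python
import math


def rms(samples):
    """Root-mean-square level of a sequence of samples."""
    return math.sqrt(sum(x * x for x in samples) / len(samples))


def noise_gain_for_snr(signal, noise, snr_db):
    """Gain to apply to noise so that the signal/noise RMS ratio equals snr_db."""
    target_noise_rms = rms(signal) / (10 ** (snr_db / 20))
    return target_noise_rms / rms(noise)


def mix(signal, noise, snr_db):
    """Add scaled noise to the signal, sample by sample."""
    gain = noise_gain_for_snr(signal, noise, snr_db)
    return [s + gain * n for s, n in zip(signal, noise)]


speech = [1.0, -1.0] * 4       # stand-in for a wake word clip
background = [0.5, -0.5] * 4   # stand-in for background audio

for snr_db in (-5, 0, 10):
    gain = noise_gain_for_snr(speech, background, snr_db)
    print(f"SNR {snr_db:>3} dB -> noise gain {gain:.3f}")
```

At -5 dB the background is louder than the wake word, which is a hard case for the model; at 10 dB the wake word clearly dominates.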

---
## Step 9: Generate Training Features

This creates spectrogram features for training, validation, and testing.

In [None]:
# Generate augmented spectrogram features
from mmap_ninja.ragged import RaggedMmap
import os
from pathlib import Path
print("🎨 Generating training features...\n")
# Verify clips exist
print("📊 Clips summary:")
print(f"   Total clips: {len(clips.clips)}")
# Count by split
try:
    train_clips = [c for c in clips.clips if c.get('split') in ['train', 0]]
    val_clips = [c for c in clips.clips if c.get('split') in ['validation', 1]]
    test_clips = [c for c in clips.clips if c.get('split') in ['test', 2]]
except Exception:
    train_clips = [c for c in clips.clips if c.split == 'train']
    val_clips = [c for c in clips.clips if c.split == 'validation']
    test_clips = [c for c in clips.clips if c.split == 'test']
print(f"   Train clips: {len(train_clips)}")
print(f"   Validation clips: {len(val_clips)}")
print(f"   Test clips: {len(test_clips)}\n")
if len(clips.clips) == 0:
    print("❌ ERROR: No clips found! Check Step 8 output")
else:
    splits_config = [
        {"name": "training", "split": "train", "repetition": 2, "slide_frames": 10},
        {"name": "validation", "split": "validation", "repetition": 1, "slide_frames": 10},
        {"name": "testing", "split": "test", "repetition": 1, "slide_frames": 1},
    ]

    for config in splits_config:
        split_name = config["name"]
        out_dir = os.path.join(DIRS["features"], split_name)

        print(f"📊 Generating {split_name} features...")

        try:
            spectrograms = SpectrogramGeneration(
                clips=clips,
                augmenter=augmenter,
                slide_frames=config["slide_frames"],
                step_ms=10,
            )

            RaggedMmap.from_generator(
                out_dir=os.path.join(out_dir, 'wakeword_mmap'),
                sample_generator=spectrograms.spectrogram_generator(
                    split=config["split"],
                    repeat=config["repetition"]
                ),
                batch_size=100,
                verbose=True,
            )

            print(f"✅ {split_name} features complete\n")

        except Exception as e:
            print(f"❌ ERROR in {split_name}: {e}\n")

    print("✅ All training features generated!")
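
To sanity-check the output sizes, it helps to estimate how many clips each split receives. The sketch below assumes that `split_count=0.1` reserves 10% of the clips for validation and another 10% for testing, and that `repetition` multiplies the number of augmented clips; both are assumptions about the library's behaviour. It also ignores `slide_frames`, so treat the numbers as rough estimates. `split_sizes` is a hypothetical helper.

```python
def split_sizes(total_clips, held_out_fraction):
    """Estimate clip counts per split when two equal held-out sets are carved off."""
    held_out = round(total_clips * held_out_fraction)
    return {
        "train": total_clips - 2 * held_out,
        "validation": held_out,
        "test": held_out,
    }


# Repetition values taken from splits_config above.
REPETITIONS = {"train": 2, "validation": 1, "test": 1}

sizes = split_sizes(total_clips=1000, held_out_fraction=0.1)
augmented = {name: count * REPETITIONS[name] for name, count in sizes.items()}

for name in sizes:
    print(f"{name:10} {sizes[name]:4} clips -> {augmented[name]:4} augmented clips")
```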

## Verify Features
This step checks that the feature files from Step 9 were written to disk.

In [None]:
# Verify generated features
from pathlib import Path
import os

print("🔍 Verifying generated features...\n")

for split in ["training", "validation", "testing"]:
    mmap_dir = os.path.join(DIRS["features"], split, "wakeword_mmap")

    if os.path.exists(mmap_dir):
        files = list(Path(mmap_dir).glob("*"))
        print(f"✅ {split:12}: {len(files)} files in {mmap_dir}")
    else:
        print(f"❌ {split:12}: Directory not found: {mmap_dir}")

print("\n" + "="*60)
total_features = len(list(Path(DIRS["features"]).rglob("*")))
print(f"Total feature files: {total_features}")

if total_features < 10:
    print("\n⚠️  WARNING: Very few feature files generated!")
    print("   Training may fail. Check Step 9 output for errors.")

---
## Step 10: Download Negative Datasets

In [None]:
# Download pre-generated negative datasets
import os

if not os.path.exists(DIRS["negative"]) or len(os.listdir(DIRS["negative"])) == 0:
    print("📥 Downloading negative datasets...\n")

    link_root = "https://huggingface.co/datasets/kahrendt/microwakeword/resolve/main/"
    filenames = ['dinner_party.zip', 'dinner_party_eval.zip', 'no_speech.zip', 'speech.zip']

    for fname in filenames:
        link = link_root + fname
        zip_path = f"{DIRS['negative']}/{fname}"

        print(f"📥 Downloading {fname}...")
        !wget -q -O {zip_path} {link}
        !unzip -q {zip_path} -d {DIRS['negative']}
        os.remove(zip_path)
        print(f"✅ {fname} extracted")

    print("\n✅ Negative datasets downloaded!")
else:
    print("✅ Negative datasets already exist")

---
## Step 11: Training Configuration

In [None]:
# Generate training configuration YAML
import yaml
import os
print("📝 Generating training configuration...\n")
config = {
    "window_step_ms": 10,
    "train_dir": DIRS["trained"],

    "features": [
        {
            "features_dir": os.path.join(DIRS["features"], "training", "wakeword_mmap"),
            "sampling_weight": 2.0,
            "penalty_weight": 1.0,
            "truth": True,
            "truncation_strategy": "truncate_start",
            "type": "mmap",
        },
        {
            "features_dir": f"{DIRS['negative']}/speech",
            "sampling_weight": 10.0,
            "penalty_weight": 1.0,
            "truth": False,
            "truncation_strategy": "random",
            "type": "mmap",
        },
        {
            "features_dir": f"{DIRS['negative']}/dinner_party",
            "sampling_weight": 10.0,
            "penalty_weight": 1.0,
            "truth": False,
            "truncation_strategy": "random",
            "type": "mmap",
        },
        {
            "features_dir": f"{DIRS['negative']}/no_speech",
            "sampling_weight": 5.0,
            "penalty_weight": 1.0,
            "truth": False,
            "truncation_strategy": "random",
            "type": "mmap",
        },
        {
            "features_dir": f"{DIRS['negative']}/dinner_party_eval",
            "sampling_weight": 0.0,
            "penalty_weight": 1.0,
            "truth": False,
            "truncation_strategy": "split",
            "type": "mmap",
        },
    ],

    "training_steps": [TRAINING_STEPS],
    "positive_class_weight": [1],
    "negative_class_weight": [20],
    "learning_rates": [LEARNING_RATE],
    "batch_size": BATCH_SIZE,

    "time_mask_max_size": [0],
    "time_mask_count": [0],
    "freq_mask_max_size": [0],
    "freq_mask_count": [0],

    "eval_step_interval": 500,
    "clip_duration_ms": 1500,

    "target_minimization": 0.9,
    "minimization_metric": None,
    "maximization_metric": "average_viable_recall",
}
# Save configuration
config_path = "training_parameters.yaml"
with open(config_path, "w") as file:
    yaml.dump(config, file)
print(f"✅ Configuration saved to {config_path}")
print("\n📊 Settings:")
print(f"   Steps: {TRAINING_STEPS}")
print(f"   Batch size: {BATCH_SIZE}")
print(f"   Learning rate: {LEARNING_RATE}")
print("\n📁 Paths:")
print(f"   Wake word: {os.path.join(DIRS['features'], 'training', 'wakeword_mmap')}")
print(f"   Negative: {DIRS['negative']}")
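
The `sampling_weight` values determine how often each feature set contributes to a training batch. Assuming they act as relative weights that are normalised over all sets (an assumption about the trainer, not something this notebook verifies), the expected share of each set can be estimated as follows. The short labels are illustrative stand-ins for the `features_dir` paths.

```python
# Sampling weights copied from the configuration above.
FEATURE_WEIGHTS = {
    "wakeword (positive)": 2.0,
    "speech": 10.0,
    "dinner_party": 10.0,
    "no_speech": 5.0,
    "dinner_party_eval": 0.0,
}


def sampling_shares(weights):
    """Normalise relative weights so that they sum to 1."""
    total = sum(weights.values())
    if total <= 0:
        raise ValueError("At least one sampling weight must be positive")
    return {name: weight / total for name, weight in weights.items()}


shares = sampling_shares(FEATURE_WEIGHTS)
for name, share in shares.items():
    print(f"{name:22} {share:6.1%}")
```

Under that assumption, positive examples make up only a small fraction of each batch, and `dinner_party_eval` (weight 0) is never sampled for training. Raising the positive `sampling_weight` is one lever to try if recall is poor.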

---
## Step 12: Train Model

**This will take 1-2 hours on T4 GPU.**

The training will:
- Save checkpoints every 500 steps
- Evaluate on validation set
- Select best model based on recall
- Quantize and convert to TFLite

In [None]:
# Train the model
print("🚀 Starting model training...\n")
print("⏱️  Expected time: 1-2 hours on T4 GPU")
print("💡 You can monitor progress in the output below\n")
print("="*60)

!python -m microwakeword.model_train_eval \
--training_config='training_parameters.yaml' \
--train 1 \
--restore_checkpoint 1 \
--test_tf_nonstreaming 0 \
--test_tflite_nonstreaming 0 \
--test_tflite_nonstreaming_quantized 0 \
--test_tflite_streaming 0 \
--test_tflite_streaming_quantized 1 \
--use_weights "best_weights" \
mixednet \
--pointwise_filters "64,64,64,64" \
--repeat_in_block  "1, 1, 1, 1" \
--mixconv_kernel_sizes '[5], [7,11], [9,15], [23]' \
--residual_connection "0,0,0,0" \
--first_conv_filters 32 \
--first_conv_kernel_size 5 \
--stride 3

print("\n" + "="*60)
print("✅ Training complete!")

---
## Step 13: Export Model

The trained model is ready for deployment!

In [None]:
# Verify and export the model
import os

model_path = f"{DIRS['trained']}/tflite_stream_state_internal_quant/stream_state_internal_quant.tflite"

if os.path.exists(model_path):
    model_size = os.path.getsize(model_path) / 1024  # KB
    print("✅ Model exported successfully!\n")
    print(f"📁 Model location: {model_path}")
    print(f"📊 Model size: {model_size:.1f} KB")

    # Download for Google Colab
    try:
        from google.colab import files
        print("\n📥 Downloading model file...")
        files.download(model_path)
        print("✅ Model downloaded!")
    except ImportError:
        # google.colab is only available inside Colab
        print("\n💡 Running locally - model saved to disk")
        print(f"   Copy from: {model_path}")
else:
    print("❌ ERROR: Model file not found!")
    print(f"   Expected at: {model_path}")
    print("   Check training output for errors.")
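
Checking that the file exists does not confirm that it is a valid model. A cheap extra check is to look for the FlatBuffers file identifier `TFL3`, which TFLite models carry in bytes 4 to 7. This sketch only inspects the header and does not load or run the model. `looks_like_tflite` is a hypothetical helper, and the demo files are synthetic stand-ins.

```python
import os
import tempfile


def looks_like_tflite(path):
    """Return True if the file starts with a header carrying the TFL3 identifier."""
    with open(path, "rb") as handle:
        header = handle.read(8)
    return len(header) == 8 and header[4:8] == b"TFL3"


demo_dir = tempfile.mkdtemp()
good = os.path.join(demo_dir, "good.tflite")
bad = os.path.join(demo_dir, "bad.tflite")

# A synthetic header with the identifier in the right place.
with open(good, "wb") as handle:
    handle.write(b"\x1c\x00\x00\x00TFL3" + b"\x00" * 24)

# For example, an HTML error page saved under a .tflite name.
with open(bad, "wb") as handle:
    handle.write(b"<!DOCTYPE html>")

print(looks_like_tflite(good), looks_like_tflite(bad))
```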

---
## 🎉 Next Steps

### 1. Create Model Manifest JSON

Create a JSON file for ESPHome (e.g., `raghava.json`):

```json
{
  "type": "micro",
  "wake_word": "raghava",
  "author": "Your Name",
  "website": "https://github.com/yourusername/your-repo",
  "model": "stream_state_internal_quant.tflite",
  "version": 1,
  "micro": {
    "probability_cutoff": 0.5,
    "sliding_window_average_size": 10
  }
}
```
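
If you prefer to generate the manifest from the notebook rather than writing it by hand, a small helper can build it and catch obvious mistakes. This is a sketch that reproduces exactly the fields shown above; `build_manifest` and its validation rules are hypothetical, and the author and website values are the same placeholders.

```python
import json


def build_manifest(wake_word, model_file, author, website,
                   probability_cutoff=0.5, sliding_window_average_size=10):
    """Build the manifest dictionary with the same fields as the example above."""
    if not 0.0 < probability_cutoff < 1.0:
        raise ValueError("probability_cutoff must be between 0 and 1")
    if not model_file.endswith(".tflite"):
        raise ValueError("model_file should point to a .tflite file")
    return {
        "type": "micro",
        "wake_word": wake_word,
        "author": author,
        "website": website,
        "model": model_file,
        "version": 1,
        "micro": {
            "probability_cutoff": probability_cutoff,
            "sliding_window_average_size": sliding_window_average_size,
        },
    }


manifest = build_manifest(
    wake_word="raghava",
    model_file="stream_state_internal_quant.tflite",
    author="Your Name",
    website="https://github.com/yourusername/your-repo",
)
manifest_text = json.dumps(manifest, indent=2)
print(manifest_text)
```

Writing `manifest_text` to `raghava.json` gives the same file as the hand-written version.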

### 2. Test the Model

Before deploying to ESP32, test with audio files to verify it works.

### 3. Deploy to ESP32-S3-BOX3

1. Copy both files to your ESPHome config directory:
   - `stream_state_internal_quant.tflite`
   - `raghava.json`

2. Update your ESPHome YAML:
```yaml
micro_wake_word:
  models:
    - model: raghava.json
```

3. Flash to ESP32-S3-BOX3

### 4. Tune Performance

Adjust `probability_cutoff` in the JSON:
- **Too many false positives**: Increase cutoff (e.g., 0.6, 0.7)
- **Doesn't detect wake word**: Decrease cutoff (e.g., 0.4, 0.3)
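
To build intuition for how the cutoff interacts with the sliding window, the sketch below models detection as "the average of the last N probabilities exceeds the cutoff". This is a simplified illustration under that assumption and not ESPHome's actual implementation. `detections` is a hypothetical helper, and the probability stream is made up.

```python
from collections import deque


def detections(probabilities, cutoff=0.5, window_size=10):
    """Return the indices at which the windowed average exceeds the cutoff."""
    window = deque(maxlen=window_size)
    hits = []
    for index, probability in enumerate(probabilities):
        window.append(probability)
        if len(window) == window_size and sum(window) / window_size > cutoff:
            hits.append(index)
    return hits


# Ten frames with no wake word, then ten frames where the model is confident.
stream = [0.0] * 10 + [1.0] * 10

first_at_050 = detections(stream, cutoff=0.5)[0]
first_at_070 = detections(stream, cutoff=0.7)[0]
print(f"cutoff 0.5 -> first detection at frame {first_at_050}")
print(f"cutoff 0.7 -> first detection at frame {first_at_070}")
```

A higher cutoff needs more consecutive confident frames before it fires, which suppresses brief false positives at the cost of later (or missed) detections.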

### 5. Improve Model (if needed)

If the model doesn't work well:
- Generate more samples (increase `SAMPLES_PER_MODEL`)
- Train longer (increase `TRAINING_STEPS`)
- Adjust augmentation parameters
- Record real voice samples and add to training data

---

## 📚 Resources

- [ESPHome microWakeWord Documentation](https://esphome.io/components/micro_wake_word)
- [Model Repository Examples](https://github.com/esphome/micro-wake-word-models/tree/main/models/v2)
- [microWakeWord GitHub](https://github.com/kahrendt/microWakeWord)

---

**Congratulations! You've successfully trained a wake word model! 🎉**