# Audrey - The First Speech Recognition System

Motivation - we need a large dataset with many variations of speech that we can use for training and, eventually, speech recognition.

In [None]:
# Let's create our own speech dataset! We'll work with numbers first.

# Recording audio with sounddevice and soundfile
# 
# https://python-soundfile.readthedocs.io/
# https://python-sounddevice.readthedocs.io


import sounddevice as sd
import soundfile as sf
import numpy as np
import time

def record_audio(filename: str, duration: float, samplerate: int = 44100, channels: int = 1):
    """Record from the microphone and save the take as a 16-bit PCM file.

    Args:
        filename: Destination path; missing parent directories are created.
        duration: Recording length in seconds; must be positive.
        samplerate: Sampling rate in Hz.
        channels: Number of input channels to capture.

    Raises:
        ValueError: If ``duration`` is not positive.
    """
    import os  # local import: this cell does not import os at the top

    if duration <= 0:
        raise ValueError(f"duration must be positive, got {duration!r}")

    # Create the target folder up front so a missing directory is handled
    # before the user has spoken, not after the take has been recorded.
    parent = os.path.dirname(filename)
    if parent:
        os.makedirs(parent, exist_ok=True)

    print(f"Recording for {duration} seconds at {samplerate} Hz...")

    # record audio from the microphone into a numpy array with sounddevice
    recording = sd.rec(
        int(duration * samplerate),
        samplerate=samplerate,
        channels=channels,
        dtype='float32',
    )
    sd.wait()  # wait for the recording started by sd.rec to finish

    print(f"Recording finished. Saving to {filename}...")

    # save the file with soundfile
    sf.write(filename, recording, samplerate, subtype='PCM_16')

    print(f"File '{filename}' saved successfully.")

In [None]:
# record yourself saying the digits 0-9

# One two-second take per run: rename the file to 1.wav, 2.wav, 3.wav, etc.
# and re-run this cell for each digit.
record_audio(duration=2, filename='unprocessed/0.wav')

In [None]:
# let's create some variations of your voice through data augmentation!

import numpy as np
import librosa

def noise(data, noise_amt=0.035):
    """Return ``data`` with Gaussian noise added.

    The noise amplitude is ``noise_amt`` times a uniform random factor in
    [0, 1) times the maximum sample value of ``data``.

    Args:
        data: NumPy array of audio samples (any shape).
        noise_amt: Upper bound on the noise level relative to the maximum sample.

    Returns:
        A new array with the same shape as ``data``.
    """
    noise_amp = noise_amt * np.random.uniform() * np.amax(data)
    # Draw noise with the full shape of the input so multi-channel (2-D)
    # audio works too; for 1-D input this is the same as size=data.shape[0].
    return data + noise_amp * np.random.normal(size=data.shape)

def stretch(data, rate=0.8):
    """Time-stretch ``data`` with librosa, passing ``rate`` through unchanged."""
    stretched = librosa.effects.time_stretch(data, rate=rate)
    return stretched

def shift(data):
    """Circularly roll ``data`` by a random offset drawn from about +/-5000 samples."""
    # uniform draw in [-5, 5), scaled to samples and truncated toward zero
    offset = int(1000 * np.random.uniform(-5, 5))
    rolled = np.roll(data, offset)
    return rolled

def pitch(data, sampling_rate, n_steps=2):
    """Pitch-shift ``data`` by ``n_steps`` using librosa at the given sampling rate."""
    shifted = librosa.effects.pitch_shift(data, sr=sampling_rate, n_steps=n_steps)
    return shifted

In [None]:
# take our recorded digits, and augment them to create a larger dataset

import os
import glob
import subprocess
import librosa
import numpy as np
import soundfile as sf
from tqdm.notebook import tqdm

# Number of augmented variations generated per original recording
NUM_VARIATIONS = 1000

# get all files in the 'unprocessed' directory (only .wav files)
files = glob.glob('unprocessed/*.wav')
print(files)

for file in tqdm(files):
    # The digit is the file name up to the first dot (e.g. "3.wav" -> "3").
    # os.path.basename handles both "/" and "\\" separators, unlike split('/').
    digit = os.path.basename(file).split('.')[0]

    # create the output directory for this digit if it doesn't exist
    out_dir = os.path.join('processed', digit)
    os.makedirs(out_dir, exist_ok=True)

    # load file with sf
    audio, sample_rate = sf.read(file)

    for i in tqdm(range(NUM_VARIATIONS), desc=f"digit {digit}", leave=False):
        # Chain the augmentations, each with freshly drawn random parameters
        processed_audio = noise(audio, np.random.uniform(0.001, 0.01))
        processed_audio = stretch(processed_audio, rate=np.random.uniform(0.8, 1.2))
        processed_audio = shift(processed_audio)
        # randint's upper bound is exclusive, so 4 gives n_steps in -3..3
        # (the previous bound of 3 could never shift up by three steps)
        processed_audio = pitch(processed_audio, sample_rate, n_steps=np.random.randint(-3, 4))

        sf.write(os.path.join(out_dir, f'{digit}_{i}.wav'), processed_audio, sample_rate)

In [None]:
# collect every augmented clip under processed/<digit>/ with glob
import glob

files = glob.glob('processed/*/*')

# total count first, then a small sample of paths
print(len(files), files[:5], sep='\n')


In [None]:
# display waveform, spectrogram and audio player for a few augmented files
from IPython.display import Audio, display
import glob
import librosa
import librosa.display  # waveshow / specshow are accessed through this submodule
import matplotlib.pyplot as plt
import numpy as np
import os


# Root directory holding one sub-folder per digit
#digit_dir = 'jo_digits/unprocessed'
digit_dir = 'processed'


# Get all files below the digit directory
files = glob.glob(f'{digit_dir}/*/*')
print(f"Found {len(files)} files")

# Display waveform, spectrogram and audio player for the first few files
for file in files[:5]:
    # Load the audio file (sr=None keeps the rate stored in the file)
    y, sr = librosa.load(file, sr=None)

    # Display the waveform
    plt.figure(figsize=(10, 4))
    librosa.display.waveshow(y, sr=sr)
    plt.title(f'Waveform for {os.path.basename(file)}')
    plt.show()

    # Display the spectrogram (dB relative to the peak, log-frequency axis).
    # Only one figure is opened here; a second plt.figure call would leave
    # an empty plot in the output.
    D = librosa.amplitude_to_db(np.abs(librosa.stft(y)), ref=np.max)

    plt.figure(figsize=(10, 4))
    librosa.display.specshow(D, sr=sr, x_axis='time', y_axis='log')
    plt.colorbar(format='%+2.0f dB')
    plt.title(f'Spectrogram for {os.path.basename(file)}')
    plt.show()

    display(Audio(file))

In [None]:
# First pass: load every clip, collect its label and find the longest clip

import glob
import os
import librosa
import soundfile as sf
import numpy as np
from tqdm import tqdm

# Sorted so the file order (and everything derived from it) is reproducible
files = sorted(glob.glob('processed/*/*'))
print(f"Total files: {len(files)}")

audio_data = []
valid_files = []  # files that loaded successfully, index-aligned with labels
labels = [] # here is where we create our labels
longest_audio_file_length = 0

for f in tqdm(files):
    try:
        # librosa.load with default arguments, so lengths are measured
        # exactly the way the padding pass loads the files again
        audio, sample_rate = librosa.load(f)
        if len(audio) == 0:
            print(f"Warning: Empty audio file: {f}")
            continue
        # The label is the parent folder name: processed/<digit>/<file>.
        # os.path works with both "/" and "\\" separators.
        label = int(os.path.basename(os.path.dirname(f)))
    except Exception as e:
        print(f"Error processing file {f}: {str(e)}")
        continue

    valid_files.append(f)
    labels.append(label)
    longest_audio_file_length = max(longest_audio_file_length, len(audio))

skipped = len(files) - len(valid_files)
if skipped:
    print(f"Skipped {skipped} empty or unreadable files")

# Keep only usable files so files[i] always corresponds to labels[i];
# later cells index both lists with the same position.
files = valid_files

print(f"Longest audio size: {longest_audio_file_length}")

In [None]:
# Second pass: zero-pad each clip on both sides up to the longest length,
# then overwrite the file in place
for f in tqdm(files):
    try:
        audio, sample_rate = librosa.load(f)
        if len(audio) == 0:
            print(f"Warning: Empty audio file: {f}")
            continue
        current_size = len(audio)
        pad_size = longest_audio_file_length - current_size
        # split the padding as evenly as possible; the extra sample goes right
        left_pad, right_pad = pad_size // 2, pad_size - pad_size // 2
        padded_audio = np.pad(audio, (left_pad, right_pad), mode='constant')
        sf.write(f, padded_audio, sample_rate)
    except Exception as e:
        print(f"Error processing file {f}: {str(e)}")

In [None]:
# Third pass: Verify that all files have the same size
file_sizes = []
for f in tqdm(files):
    try:
        audio, _ = librosa.load(f)
        file_sizes.append(len(audio))
    except Exception as e:
        print(f"Error processing file {f}: {str(e)}")

unique_sizes = set(file_sizes)
if not file_sizes:
    # Nothing loaded: min()/max() below would raise on an empty list
    print("Warning: No files could be loaded, nothing to verify")
elif len(unique_sizes) == 1:
    print(f"All files have the same size: {file_sizes[0]} samples")
else:
    print("Warning: Not all files have the same size")
    print(f"Unique file sizes: {unique_sizes}")
    print(f"Min size: {min(file_sizes)}, Max size: {max(file_sizes)}")

In [None]:
# Spot-check that paths and labels line up at the same positions
print(files[1000:1005], labels[1000:1005], sep='\n')


In [None]:
import torch as t
from torchaudio import transforms
import torchaudio
import random
from torch.utils.data import Dataset, DataLoader

class AudioDataset(Dataset):
    """Dataset of ``(features, label)`` pairs loaded lazily from audio files.

    Args:
        file_paths: Sequence of audio file paths.
        labels: Sequence of labels, index-aligned with ``file_paths``.
        transforms: Callable applied to the mono waveform. Pass ``None`` to
            get the raw waveform back. Defaults to a ``MelSpectrogram``.

    Raises:
        ValueError: If ``file_paths`` and ``labels`` differ in length.
    """

    def __init__(self, file_paths, labels, transforms=transforms.MelSpectrogram()):
        if len(file_paths) != len(labels):
            raise ValueError(
                f"file_paths ({len(file_paths)}) and labels ({len(labels)}) must have the same length"
            )
        self.file_paths = file_paths
        self.labels = labels
        self.transforms = transforms

    def __len__(self):
        return len(self.file_paths)

    def __getitem__(self, idx):
        """Load item ``idx`` and return ``(transformed waveform, label)``."""
        audio_path = self.file_paths[idx]

        waveform, _ = torchaudio.load(audio_path)

        # ensure it's mono: average the channels, keeping the channel dim
        if waveform.shape[0] > 1:
            waveform = waveform.mean(dim=0, keepdim=True)

        # Compare against None explicitly: `spec` used to be left unbound when
        # no transform was set, and module truthiness is not a reliable test.
        if self.transforms is not None:
            spec = self.transforms(waveform)
        else:
            spec = waveform
        return spec, self.labels[idx]


# Create datasets
full_dataset = AudioDataset(files, labels, transforms=transforms.MelSpectrogram())

# 70/20/10 split. The test set takes the remainder so the three sizes always
# add up to len(full_dataset); random_split raises if they do not
# (e.g. 15 items would give 10 + 3 + 1 = 14 with three int() truncations).
train_size = int(0.7 * len(full_dataset))
validation_size = int(0.2 * len(full_dataset))
test_size = len(full_dataset) - train_size - validation_size

print(train_size)
print(validation_size)
print(test_size)

train_dataset, validation_dataset, test_dataset  = t.utils.data.random_split(full_dataset, [train_size, validation_size, test_size])


# Create dataloaders (only the training data needs to be shuffled)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
validation_loader = DataLoader(validation_dataset, batch_size=32, shuffle=False)

print(f"Number of training batches: {len(train_loader)}")
print(f"Number of validation batches: {len(validation_loader)}")

In [None]:
import matplotlib.pyplot as plt

# Pull a single batch to learn the tensor shapes the models must accept
for batch in train_loader:
    inputs, targets = batch
    print(inputs.shape)
    print(inputs[0, 0].shape)
    print(targets)
    break


# First sample, first channel: (mel bins, time frames)
mel_freq_bins, time_steps = inputs[0, 0].shape

print("mel freq bins: ", mel_freq_bins)
print("time steps: ", time_steps)


In [None]:
# train with a simple Multi-Layer Perceptron (MLP) - Fully-Connected Neural Network

device = t.device('cuda' if t.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# The network ends in a plain Linear layer and outputs raw logits.
# CrossEntropyLoss applies log-softmax itself, so a Softmax layer here would
# normalize twice and flatten the gradients.
model = t.nn.Sequential(
    t.nn.Flatten(),
    t.nn.Linear(mel_freq_bins*time_steps, 512), # one input per spectrogram cell
    t.nn.ReLU(),
    t.nn.Linear(512, 512),
    t.nn.ReLU(),
    t.nn.Linear(512, 10),
)

# train our model
model.to(device)

loss_fn = t.nn.CrossEntropyLoss()
optimizer = t.optim.Adam(model.parameters(), lr=0.001)

epochs = 10

print(f"Training for {epochs} epochs")
for epoch in tqdm(range(epochs)):
    model.train()
    total_loss = 0
    for batch in tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}"):
        inputs, targets = batch
        inputs, targets = inputs.to(device), targets.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = loss_fn(outputs, targets)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # Report the mean over the epoch rather than the last batch's loss
    avg_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}")

# evaluate our model

model.eval()

correct = 0
total = 0

with t.no_grad():
    for batch in validation_loader:
        inputs, targets = batch
        inputs, targets = inputs.to(device), targets.to(device)

        outputs = model(inputs)
        # argmax over logits gives the same class as argmax over softmax
        predicted = outputs.argmax(dim=1)
        total += targets.size(0)
        correct += (predicted == targets).sum().item()

# These batches come from validation_loader, not the held-out test split
print(f"Accuracy of the model on the validation set: {100 * correct / total}%")

In [None]:
# train with conv net
import torch as t
import torch.nn as nn

device = t.device('cuda' if t.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

class ConvModel(nn.Module):
    """Three conv/ReLU/max-pool stages followed by a two-layer classifier head.

    Args:
        mel_freq_bins: Height of the input spectrogram.
        time_steps: Width of the input spectrogram.
        num_classes: Number of output logits.

    ``forward`` takes a ``(batch, 1, mel_freq_bins, time_steps)`` tensor and
    returns ``(batch, num_classes)`` raw logits (no softmax is applied).
    """

    def __init__(self, mel_freq_bins, time_steps, num_classes=10):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.relu3 = nn.ReLU()
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2)

        # The padded 3x3 convs keep the spatial size; each 2x2 pool halves it
        # (rounding down), so three pools leave (H // 8) x (W // 8) positions
        # across 128 channels.
        self.flat_features = 128 * (mel_freq_bins // 8) * (time_steps // 8)

        self.fc1 = nn.Linear(self.flat_features, 512)
        self.relu4 = nn.ReLU()
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(512, num_classes)

    def forward(self, x):
        # Input shape: (batch_size, 1, mel_freq_bins, time_steps)
        x = self.pool1(self.relu1(self.conv1(x)))
        x = self.pool2(self.relu2(self.conv2(x)))
        x = self.pool3(self.relu3(self.conv3(x)))
        # Flatten per sample. Unlike view(-1, flat_features), this keeps the
        # batch dimension fixed, so a wrongly sized input fails loudly in fc1
        # instead of silently producing a different batch size.
        x = x.flatten(start_dim=1)
        x = self.relu4(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

In [None]:
# Build the CNN for the spectrogram shape measured from a training batch
# (both dimensions are passed explicitly by keyword)

conv_model = ConvModel(time_steps=time_steps, mel_freq_bins=mel_freq_bins)
print(conv_model)

In [None]:

# Train the model

conv_model = conv_model.to(device)

loss_fn = t.nn.CrossEntropyLoss()
optimizer = t.optim.Adam(conv_model.parameters(), lr=0.001)

epochs = 15

# Mean training loss per epoch, plotted after training
loss_history = []

print(f"Training for {epochs} epochs")
for epoch in tqdm(range(epochs)):
    conv_model.train()
    total_loss = 0
    for batch in tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}"):
        inputs, targets = batch
        inputs, targets = inputs.to(device), targets.to(device)

        optimizer.zero_grad()
        outputs = conv_model(inputs)
        loss = loss_fn(outputs, targets)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # Report the mean over the epoch (the value that is plotted), not the
    # loss of whichever batch happened to come last.
    avg_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}")

    loss_history.append(avg_loss)

print("finished training")

# plot the loss
plt.plot(loss_history)
plt.xlabel("epoch")
plt.ylabel("mean training loss")
plt.show()

# evaluate our model

conv_model.eval()
correct = 0
total = 0

# Running accuracy after each validation batch (it settles towards the
# final figure as more batches are included)
accuracy_history = []

with t.no_grad():
    for batch in validation_loader:
        inputs, targets = batch
        inputs, targets = inputs.to(device), targets.to(device)

        outputs = conv_model(inputs)
        predicted = outputs.argmax(dim=1)
        total += targets.size(0)
        correct += (predicted == targets).sum().item()
        accuracy_history.append(100 * correct / total)

plt.plot(accuracy_history)
plt.xlabel("validation batch")
plt.ylabel("running accuracy (%)")
plt.show()

# These batches come from validation_loader, not the held-out test split
print(f"Accuracy of the conv model on the validation set: {100 * correct / total}%")





In [None]:
# save the model with today's datetime
import datetime
import os  # used by os.makedirs below; do not rely on an earlier cell's import

timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

# Create config dict with all parameters needed for inference
config = {
    # Model architecture parameters
    "mel_freq_bins": mel_freq_bins,
    "time_steps": time_steps,
    "num_classes": 10,

    # Audio preprocessing parameters (needed to recreate the same spectrogram shape)
    "sample_rate": 22050,  # librosa default
    "longest_audio_file_length": longest_audio_file_length,  # in samples

    # MelSpectrogram params (torchaudio defaults, since MelSpectrogram() is
    # created without arguments)
    "n_mels": 128,
    "n_fft": 400,
    "hop_length": 200,  # torchaudio default: win_length // 2, with win_length = n_fft
}

# Extract the TRUE test set file paths (files the model never saw during training!)
# test_dataset is a Subset, so we get the original indices and map to file paths
test_file_paths = [
    test_dataset.dataset.file_paths[idx]
    for idx in test_dataset.indices
]
test_file_labels = [
    test_dataset.dataset.labels[idx]
    for idx in test_dataset.indices
]

print(f"Saving {len(test_file_paths)} test set file paths (held out from training)")

# Save config, model weights, AND test set info
checkpoint = {
    "config": config,
    "model_state_dict": conv_model.state_dict(),
    "test_file_paths": test_file_paths,
    "test_file_labels": test_file_labels,
}

# make dir called model_weights
os.makedirs('model_weights', exist_ok=True)
saved_model_path = f'model_weights/audrey_model_weights_{timestamp}.pth'

t.save(checkpoint, saved_model_path)
print(f"Saved model checkpoint to: {saved_model_path}")
print(f"Config: {config}")

In [None]:
# ============================================================
# INFERENCE ONLY - Run this cell after restart to load model
# ============================================================
# This cell is self-contained and can be run after clearing 
# all variables or restarting the notebook kernel.

import torch as t
import torch.nn as nn
from torchaudio import transforms
import torchaudio

# 1. Define the model architecture (must match training)
class ConvModel(nn.Module):
    """Three conv/ReLU/max-pool stages followed by a two-layer classifier head.

    The layer names and shapes must match the training definition so that the
    saved ``state_dict`` loads.

    Args:
        mel_freq_bins: Height of the input spectrogram.
        time_steps: Width of the input spectrogram.
        num_classes: Number of output logits.

    ``forward`` takes a ``(batch, 1, mel_freq_bins, time_steps)`` tensor and
    returns ``(batch, num_classes)`` raw logits (no softmax is applied).
    """

    def __init__(self, mel_freq_bins, time_steps, num_classes=10):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.relu3 = nn.ReLU()
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2)

        # Each 2x2 pool halves both spatial dims (rounding down), so three
        # pools leave (H // 8) x (W // 8) positions across 128 channels.
        self.flat_features = 128 * (mel_freq_bins // 8) * (time_steps // 8)

        self.fc1 = nn.Linear(self.flat_features, 512)
        self.relu4 = nn.ReLU()
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(512, num_classes)

    def forward(self, x):
        x = self.pool1(self.relu1(self.conv1(x)))
        x = self.pool2(self.relu2(self.conv2(x)))
        x = self.pool3(self.relu3(self.conv3(x)))
        # Flatten per sample. Unlike view(-1, flat_features), this keeps the
        # batch dimension fixed, so a wrongly sized input fails loudly in fc1
        # instead of silently producing a different batch size.
        x = x.flatten(start_dim=1)
        x = self.relu4(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

# 2. Load checkpoint (contains both config and weights)
saved_model_path = 'model_weights/audrey_model_weights_2026-02-01_18-00-30.pth' # MAKE SURE TO UPDATE THIS PATH!
device = t.device('cuda' if t.cuda.is_available() else 'cpu')

checkpoint = t.load(saved_model_path, map_location=device)
config = checkpoint['config']

print(f"Loaded config: {config}")

# Load the true test set (files the model never saw during training);
# checkpoints saved without this information yield empty lists
test_file_paths = checkpoint.get('test_file_paths', [])
test_file_labels = checkpoint.get('test_file_labels', [])
print(f"Loaded {len(test_file_paths)} held-out test files")

# 3. Rebuild the architecture from the saved config, restore the weights,
#    then move the model to the device and switch it to eval mode
model = ConvModel(**{key: config[key] for key in ('mel_freq_bins', 'time_steps', 'num_classes')})
model.load_state_dict(checkpoint['model_state_dict'])
model = model.to(device).eval()

# 4. Define preprocessing function
def preprocess_audio(audio_path, config):
    """Turn an audio file into a batched mel spectrogram for the model.

    Steps: load, resample to ``config['sample_rate']``, mix down to mono,
    center-pad or center-crop to ``config['longest_audio_file_length']``
    samples, apply a default ``MelSpectrogram`` and add a batch dimension.
    """
    waveform, sr = torchaudio.load(audio_path)

    # Bring the clip to the sample rate stored in the checkpoint config
    wanted_sr = config['sample_rate']
    if sr != wanted_sr:
        waveform = transforms.Resample(orig_freq=sr, new_freq=wanted_sr)(waveform)

    # Average the channels when the file is not mono
    if waveform.shape[0] > 1:
        waveform = waveform.mean(dim=0).unsqueeze(0)

    # surplus < 0: clip is too short (pad); surplus > 0: too long (crop)
    target_length = config['longest_audio_file_length']
    surplus = waveform.shape[1] - target_length
    if surplus < 0:
        missing = -surplus
        left_pad = missing // 2
        waveform = t.nn.functional.pad(waveform, (left_pad, missing - left_pad))
    elif surplus > 0:
        start = surplus // 2
        waveform = waveform[:, start:start + target_length]

    # Default MelSpectrogram, the same transform AudioDataset is built with
    spec = transforms.MelSpectrogram()(waveform)
    return spec.unsqueeze(0)

# 5. Define prediction function
def predict_digit(audio_path):
    """Classify one audio file; return ``(predicted class index, softmax confidence)``."""
    batch = preprocess_audio(audio_path, config).to(device)

    with t.no_grad():
        logits = model(batch)
        probabilities = t.softmax(logits, dim=1)
        predicted = t.argmax(logits, dim=1).item()

    # probability the model assigns to the class it picked
    confidence = probabilities[0, predicted].item()
    return predicted, confidence

# Plain strings: there is nothing to interpolate in these messages
print("Model loaded successfully! Ready for inference.")
print("Use predict_digit('path/to/audio.wav') to make predictions.")

# Testing: True Held-Out Test Set vs Random Files

The cells below demonstrate why having a proper test set matters:

1. **True Test Set** - Files the model NEVER saw during training (saved in checkpoint)
2. **Random Files** - Random samples from `processed/` (some may have been in training!)

If your model is overfitting, you'll see higher accuracy on random files than on the true test set.

In [None]:
# Test on the TRUE HELD-OUT TEST SET (model never saw these during training!)
import random

if test_file_paths:
    # Score a random sample (at most 20 files) of the held-out split
    num_tests = min(20, len(test_file_paths))
    test_indices = random.sample(range(len(test_file_paths)), num_tests)
    correct = 0

    print(f"Testing on {num_tests} files from the TRUE HELD-OUT TEST SET")
    print(f"(Total held-out files: {len(test_file_paths)})")
    print("=" * 50)
    print("These files were NEVER seen during training!\n")

    for i, idx in enumerate(test_indices):
        test_file, true_label = test_file_paths[idx], test_file_labels[idx]
        predicted_digit, confidence = predict_digit(test_file)

        is_correct = predicted_digit == true_label
        if is_correct:
            correct += 1

        status = "WRONG" if not is_correct else "correct"
        print(f"{i+1}. True={true_label}, Pred={predicted_digit}, Conf={confidence:.1%} [{status}]")

    print("=" * 50)
    print(f"TRUE TEST SET Accuracy: {correct}/{num_tests} = {100*correct/num_tests:.1f}%")
else:
    # Checkpoints saved without test-set information end up here
    print("No test set found in checkpoint. Re-run training with the updated save cell.")

In [None]:
# Test inference with a random file from the processed dataset
# (Works after kernel restart - just needs the inference cell to be run first)
import glob
import os
import random

# Get all processed audio files
processed_files = glob.glob('processed/*/*')
print(f"Found {len(processed_files)} files in processed/")

if not processed_files:
    # random.choice raises IndexError on an empty list
    print("No files found in processed/ - run the augmentation cells first.")
else:
    # Pick a random file
    test_file = random.choice(processed_files)

    # Extract true label from folder name (e.g., "processed/3/3_123.wav" -> 3).
    # os.path works with both "/" and "\\" separators.
    true_label = int(os.path.basename(os.path.dirname(test_file)))

    # Make prediction
    predicted_digit, confidence = predict_digit(test_file)

    print(f"\nTest file: {test_file}")
    print(f"True label: {true_label}")
    print(f"Predicted: {predicted_digit}")
    print(f"Confidence: {confidence:.2%}")
    print(f"Correct: {'Yes' if predicted_digit == true_label else 'No'}")

In [None]:
# Test on RANDOM files from processed/ (MAY INCLUDE TRAINING DATA!)
# Compare this accuracy to the true test set above to see if the model is overfitting
import glob
import os
import random

processed_files = glob.glob('processed/*/*')
num_tests = min(20, len(processed_files))
correct = 0

# Get random files
test_files = random.sample(processed_files, num_tests)

print(f"Testing on {num_tests} RANDOM files from processed/")
print("=" * 50)
print("WARNING: Some of these may have been in the training set!\n")

for i, test_file in enumerate(test_files):
    # Label = parent folder name; os.path handles "/" and "\\" separators
    true_label = int(os.path.basename(os.path.dirname(test_file)))
    predicted_digit, confidence = predict_digit(test_file)

    is_correct = predicted_digit == true_label
    correct += is_correct

    status = "correct" if is_correct else "WRONG"
    print(f"{i+1}. True={true_label}, Pred={predicted_digit}, Conf={confidence:.1%} [{status}]")

print("=" * 50)
if num_tests:
    print(f"RANDOM FILES Accuracy: {correct}/{num_tests} = {100*correct/num_tests:.1f}%")
    print("\nCompare this to the TRUE TEST SET accuracy above!")
else:
    # Avoid dividing by zero when processed/ is empty
    print("No files found in processed/ - nothing to evaluate.")

In [None]:
# Test on up to 10 random files from the processed dataset
import glob
import os
import random

processed_files = glob.glob('processed/*/*')
print(f"Found {len(processed_files)} files in processed/\n")

# random.sample raises ValueError when asked for more items than exist
num_tests = min(10, len(processed_files))
test_files = random.sample(processed_files, num_tests)
correct = 0

for i, test_file in enumerate(test_files):
    # Label = parent folder name; os.path handles "/" and "\\" separators
    true_label = int(os.path.basename(os.path.dirname(test_file)))
    predicted_digit, confidence = predict_digit(test_file)

    is_correct = predicted_digit == true_label
    correct += is_correct

    status = "correct" if is_correct else "WRONG"
    print(f"{i+1}. {os.path.basename(test_file)}: True={true_label}, Pred={predicted_digit}, Conf={confidence:.1%} [{status}]")

if num_tests:
    print(f"\nAccuracy: {correct}/{num_tests} = {100*correct/num_tests:.1f}%")
else:
    # Avoid dividing by zero when processed/ is empty
    print("No files found in processed/ - nothing to evaluate.")

In [None]:
# Test on ALL original recordings from the unprocessed dataset
import glob
import os

unprocessed_files = sorted(glob.glob('unprocessed/*.wav'))
print(f"Found {len(unprocessed_files)} original recordings in unprocessed/\n")

correct = 0

for test_file in unprocessed_files:
    # Extract true label from filename (e.g., "0.wav" -> 0);
    # os.path.basename handles both "/" and "\\" separators
    true_label = int(os.path.basename(test_file).split('.')[0])
    predicted_digit, confidence = predict_digit(test_file)

    is_correct = predicted_digit == true_label
    correct += is_correct

    status = "correct" if is_correct else "WRONG"
    print(f"{test_file}: True={true_label}, Pred={predicted_digit}, Conf={confidence:.1%} [{status}]")

if unprocessed_files:
    print(f"\nAccuracy on original recordings: {correct}/{len(unprocessed_files)} = {100*correct/len(unprocessed_files):.1f}%")
else:
    # Avoid dividing by zero when there are no recordings
    print("No recordings found in unprocessed/ - record some digits first.")

# Real-time Digit Recognition (Voice Activated)

Run the cell below to do live digit recognition from your microphone. 

**Requirements:** Run the inference cell (cell 18) first to load the model!

**How it works:**
1. Listens for audio above the RMS volume threshold (voice activity detection)
2. When speech is detected, starts recording
3. Stops recording after silence is detected (or max duration reached)
4. Preprocesses the audio (resample, pad, mel spectrogram)
5. Runs inference and displays the predicted digit
6. Returns to listening for the next utterance

**Tunable parameters:**
- `VOLUME_THRESHOLD`: RMS level to trigger recording (increase if noisy environment)
- `SILENCE_DURATION`: How long to wait for silence before stopping
- `MIN_RECORDING_DURATION`: Minimum recording length
- `MAX_RECORDING_DURATION`: Maximum recording length

In [None]:
# Real-time digit recognition with voice activity detection (VAD)
# Requires the inference cell (cell 18) to be run first!

import sounddevice as sd
import numpy as np
import torch as t
from torchaudio import transforms
from IPython.display import clear_output

# Config
SAMPLE_RATE = 44100  # Recording sample rate (Hz) used for the input stream
TARGET_SAMPLE_RATE = 22050  # Model expects this (librosa default); audio is resampled to it before inference
BLOCK_SIZE = 1024  # Samples per callback

# Voice Activity Detection settings
VOLUME_THRESHOLD = 0.02  # RMS threshold to detect speech (adjust if needed)
SILENCE_DURATION = 0.5  # Seconds of silence before stopping recording
MIN_RECORDING_DURATION = 0.3  # Minimum seconds to record
MAX_RECORDING_DURATION = 3.0  # Maximum seconds to record

# State variables (shared with audio_callback through `global`)
audio_buffer = []  # samples collected for the utterance currently being recorded
is_recording = False  # True between speech onset and the end of the utterance
# NOTE: despite the name, silence_samples counts consecutive quiet callback
# BLOCKS, not samples - it goes up by one per quiet block and resets on sound.
silence_samples = 0
# Number of consecutive quiet blocks that make up SILENCE_DURATION seconds
silence_samples_threshold = int(SILENCE_DURATION * SAMPLE_RATE / BLOCK_SIZE)
# Recording length limits, in samples at SAMPLE_RATE
min_samples = int(MIN_RECORDING_DURATION * SAMPLE_RATE)
max_samples = int(MAX_RECORDING_DURATION * SAMPLE_RATE)

def process_and_predict(audio_data):
    """Classify one recorded utterance; return ``(predicted class index, confidence)``.

    ``audio_data`` is a 1-D array of samples captured at SAMPLE_RATE. It is
    peak-normalized, resampled to TARGET_SAMPLE_RATE, center-padded or
    center-cropped to the configured length and converted to a mel
    spectrogram before being passed to the model.
    """
    # Peak-normalize, then scale to ~90%; the epsilon guards against an all-zero buffer
    peak = np.max(np.abs(audio_data)) + 1e-8
    audio_data = (audio_data / peak) * 0.9

    # Tensor with a leading channel dimension: (1, samples)
    waveform = t.tensor(audio_data, dtype=t.float32).unsqueeze(0)

    # Resample from the recording rate to the rate used for training
    waveform = transforms.Resample(orig_freq=SAMPLE_RATE, new_freq=TARGET_SAMPLE_RATE)(waveform)

    # surplus < 0: utterance is too short (pad); surplus > 0: too long (crop)
    target_length = config['longest_audio_file_length']
    surplus = waveform.shape[1] - target_length
    if surplus < 0:
        missing = -surplus
        left_pad = missing // 2
        waveform = t.nn.functional.pad(waveform, (left_pad, missing - left_pad))
    elif surplus > 0:
        start = surplus // 2
        waveform = waveform[:, start:start + target_length]

    # Default MelSpectrogram (matches training), plus a batch dimension
    spec = transforms.MelSpectrogram()(waveform).unsqueeze(0).to(device)

    with t.no_grad():
        logits = model(spec)
        predicted = t.argmax(logits, dim=1).item()
        confidence = t.softmax(logits, dim=1)[0, predicted].item()

    return predicted, confidence

def audio_callback(indata, frames, time_info, status):
    """Voice-activated recorder, called once per block of microphone audio.

    While idle, a block whose RMS exceeds VOLUME_THRESHOLD starts a recording.
    While recording, blocks are appended until max_samples is reached, or
    until enough consecutive quiet blocks have passed and at least
    min_samples were collected. The buffered audio is then classified, the
    result printed and the state reset.
    """
    global audio_buffer, is_recording, silence_samples

    if status:
        print(f"Status: {status}")

    # RMS volume of the first channel of this block
    audio_block = indata[:, 0]
    rms = np.sqrt(np.mean(audio_block**2))

    if not is_recording:
        # Idle: start recording as soon as a block is loud enough
        if rms > VOLUME_THRESHOLD:
            is_recording = True
            silence_samples = 0
            audio_buffer = list(audio_block)  # the triggering block is kept
            clear_output(wait=True)
            print("Recording... (speak your digit)")
        return

    # Recording: keep the block and update the run of quiet blocks
    audio_buffer.extend(audio_block.tolist())
    silence_samples = silence_samples + 1 if rms < VOLUME_THRESHOLD else 0

    hit_max_duration = len(audio_buffer) >= max_samples
    went_quiet = (
        silence_samples >= silence_samples_threshold
        and len(audio_buffer) >= min_samples
    )
    if not (hit_max_duration or went_quiet):
        return

    # The utterance is complete: classify it and report
    audio_data = np.array(audio_buffer)
    duration = len(audio_data) / SAMPLE_RATE

    predicted, confidence = process_and_predict(audio_data)

    clear_output(wait=True)
    print("=" * 40)
    print(f"  Predicted Digit:  {predicted}")
    print(f"  Confidence:       {confidence:.1%}")
    print(f"  Audio duration:   {duration:.2f}s")
    print("=" * 40)
    print(f"\nListening... (speak a digit to start)")

    # Back to idle for the next utterance
    audio_buffer, is_recording, silence_samples = [], False, 0

# Start listening: print the banner in one call (one line per argument)
print(
    "Real-time Digit Recognition (Voice Activated)",
    "=" * 40,
    f"Volume threshold: {VOLUME_THRESHOLD} RMS",
    f"Silence timeout:  {SILENCE_DURATION}s",
    f"Max recording:    {MAX_RECORDING_DURATION}s",
    "=" * 40,
    "\nListening... (speak a digit to start)",
    "\n>>> Click the STOP button (square) or press 'i' twice to stop <<<",
    sep="\n",
)

stream = None
try:
    stream = sd.InputStream(
        callback=audio_callback,
        channels=1,
        blocksize=BLOCK_SIZE,
        samplerate=SAMPLE_RATE,
    )
    stream.start()

    # Sleep in short 100 ms steps so a kernel interrupt is handled promptly
    while True:
        sd.sleep(100)

except KeyboardInterrupt:
    # Stopping via interrupt is the expected way out of the loop
    pass
finally:
    # Always release the audio device and reset the recorder state
    if stream is not None:
        stream.stop()
        stream.close()
    audio_buffer, is_recording, silence_samples = [], False, 0
    print("\nStopped!")

In [None]:
 # Exercise - Build a version of Audrey using your own recordings to create a "template database" and make your own sound / speech recognition project. Try doing this with different kinds of utterances that aren't just numbers.
 # 
 # e.g.
 # yes / no
 # colors
 # shapes