# Code for the project

Libraries

In [None]:
import os, numpy as np, scipy.signal, simpleaudio as sa, matplotlib.pyplot as plt
import librosa, tensorflow as tf, torch, torchaudio
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import (Conv2D, MaxPooling2D, Dropout,
                                     Flatten, Dense, BatchNormalization)
from tensorflow.keras.utils import to_categorical, plot_model
from tensorflow.keras.callbacks import (EarlyStopping, ModelCheckpoint,
                                        ReduceLROnPlateau)
from sklearn.metrics import (confusion_matrix, classification_report,
                             ConfusionMatrixDisplay)
from sklearn.metrics import roc_curve, auc, precision_recall_curve
from sklearn.model_selection import learning_curve as sk_learning_curve
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import make_scorer, accuracy_score
from scikeras.wrappers import KerasClassifier
from sklearn.base import BaseEstimator, ClassifierMixin
from tensorflow.keras.utils import to_categorical


### Data Augmentation 
We start by augmenting the data, with the goal of building a model that is both robust and accurate.  
In this function we create every variant we want to pass our input files through, similar to a rotation tree. 
 
We use:  
- Pitch shift  
-  Background noise  
- Echo  
- Low frequency rumble: change in tone + delay  

All of these are conditions you may encounter in a real-life scenario. 

In [None]:
def augment_audio(y, sr):
    """Return a list of augmented copies of the waveform ``y``.

    The variants are produced in this order: pitch shifted up 2 semitones,
    pitch shifted down 2 semitones, mixed with a random background-noise clip
    at 10 dB SNR (only when the module-level ``_noise_clips`` list is
    non-empty), attenuated with a 40 ms echo, and mixed with low-pass
    filtered random rumble.

    Args:
        y: 1-D numpy waveform (the noise/echo/rumble steps index it with
            ``len(y)``).
        sr: Sample rate of ``y`` in Hz.

    Returns:
        List of numpy arrays, one per variant.
    """
    variants = []
    n_samples = len(y)

    # torchaudio works on (channels, samples) tensors, so add a channel axis
    # when the input is a plain 1-D signal.
    waveform = torch.tensor(y).float()
    if waveform.dim() == 1:
        waveform = waveform.unsqueeze(0)

    # 1. Pitch shift: +2 semitones first, then -2 semitones.
    for semitones in (2, -2):
        shifted = torchaudio.functional.pitch_shift(
            waveform=waveform,
            sample_rate=sr,
            n_steps=semitones,
        )
        variants.append(shifted.squeeze().numpy())

    # 2. Background-noise mix (skipped when no noise clips were loaded).
    if len(_noise_clips) > 0:
        noise = _noise_clips[np.random.randint(0, len(_noise_clips))]
        if len(noise) < n_samples:
            # Loop the clip until it covers the whole signal.
            repeats = int(np.ceil(n_samples / len(noise)))
            noise = np.tile(noise, repeats)[:n_samples]
        else:
            noise = noise[:n_samples]

        snr_db = 10
        signal_rms = np.sqrt(np.mean(y**2))
        noise_rms = np.sqrt(np.mean(noise**2))
        # Scale the noise so signal RMS / noise RMS matches the target SNR;
        # the epsilon avoids dividing by zero on a silent noise clip.
        gain = signal_rms / (10**(snr_db/20)) / (noise_rms + 1e-6)
        variants.append(np.clip(y + noise * gain, -1.0, 1.0))

    # 3. Far-field (-9 dB) + 40 ms echo.
    delay = int(0.04*sr)
    distant = y * 0.35
    delayed = np.pad(distant * 0.3, (delay, 0))[:n_samples]
    variants.append(np.clip(distant + delayed, -1.0, 1.0))

    # 4. Low rumble: random noise low-passed at 100 Hz (4th-order Butterworth).
    raw_rumble = np.random.randn(n_samples) * 0.004
    b, a = scipy.signal.butter(4, 100/(sr/2), 'low')
    variants.append(np.clip(y + scipy.signal.filtfilt(b, a, raw_rumble), -1.0, 1.0))

    return variants

### Add White Noise
- To bring the training data even closer to what one might encounter in real life, we also mix in white noise, which is loaded with the following function.

In [None]:
def load_noise_library(noise_dir, sr=22050):
    """Load every ``.wav`` file in ``noise_dir`` as a mono waveform.

    Files are visited in sorted name order so that picking a clip by random
    index is reproducible for a given seed. A file that fails to load is
    reported and skipped.

    Args:
        noise_dir: Directory containing the noise clips.
        sr: Sample rate every clip is resampled to.

    Returns:
        List of 1-D numpy arrays; empty when ``noise_dir`` is not a directory
        or holds no ``.wav`` files.
    """
    noise_clips = []
    if not os.path.isdir(noise_dir):
        return noise_clips

    for fn in sorted(os.listdir(noise_dir)):
        if not fn.lower().endswith(".wav"):
            continue
        try:
            # mono=True makes librosa down-mix for us, so no manual channel
            # averaging is needed afterwards.
            y, _ = librosa.load(os.path.join(noise_dir, fn), sr=sr, mono=True)
        except Exception as e:
            print(f"Error loading noise file {fn}: {e}")
            continue
        noise_clips.append(y)
    return noise_clips

# load white noise
# NOISE_DIR is the folder scanned for background-noise .wav files. The clips
# are loaded once here and kept in the module-level _noise_clips list, which
# augment_audio reads when it builds the background-noise variant.
NOISE_DIR = "/Users/vijaysridhar/Documents/white noise"
_noise_clips = load_noise_library(NOISE_DIR)

### Data Cleaning and standarization
- In the first function, we load our data and standardize it to the same sampling rate and duration (set later to 3 s).

- In the second, we compute a mel spectrogram, which represents the input file as a heatmap showing how the energy in the waveform changes over time and frequency.

- We use 128 mel bands for accuracy, and the final function ensures all spectrograms have the same length.

In [None]:

def load_and_process_audio(file_path, sr=22050, duration=None):
    """Load an audio file, resampled to ``sr`` and cut to ``duration`` seconds.

    Args:
        file_path: Path of the audio file.
        sr: Target sample rate passed to ``librosa.load``.
        duration: Maximum number of seconds to read; ``None`` reads everything.

    Returns:
        ``(y, sr)`` with the waveform and its sample rate. When the file
        cannot be loaded the error is printed and ``(None, sr)`` is returned,
        which is the condition the callers test with ``y is None``.
    """
    try:
        y, sr = librosa.load(file_path, sr=sr, duration=duration)
    except Exception as e:
        # Report and signal failure instead of aborting a whole dataset pass
        # because of one unreadable file.
        print(f"Error loading audio file {file_path}: {e}")
        return None, sr
    return y, sr

def create_melspectrogram(y, sr, n_mels=128, n_fft=2048, hop_length=512):
    """Compute a log-scaled (dB) mel spectrogram of ``y``.

    Args:
        y: Waveform.
        sr: Sample rate of ``y``.
        n_mels: Number of mel bands.
        n_fft: FFT window size.
        hop_length: Hop between successive frames, in samples.

    Returns:
        The mel power spectrogram converted to dB relative to its own maximum
        (``ref=np.max``).
    """
    power_spec = librosa.feature.melspectrogram(
        y=y,
        sr=sr,
        n_fft=n_fft,
        hop_length=hop_length,
        n_mels=n_mels,
    )
    return librosa.power_to_db(power_spec, ref=np.max)

def pad_or_truncate(mel_spect, target_length):
    """Force the first axis of a 2-D array to exactly ``target_length`` rows.

    Longer inputs are cut after ``target_length`` rows; shorter (or equal)
    inputs are extended at the end with constant zeros.

    Args:
        mel_spect: 2-D array whose first axis is the one being resized.
        target_length: Required number of rows.

    Returns:
        Array with ``target_length`` rows and the original column count.
    """
    missing = target_length - mel_spect.shape[0]
    if missing < 0:
        # Too long: keep only the leading rows.
        return mel_spect[:target_length, :]
    # Too short (or exact): append `missing` zero rows, leave columns alone.
    return np.pad(mel_spect, ((0, missing), (0, 0)), mode='constant')


### Data Preprocessing 
- We put the augmentation function we created earlier to use: we set `augment=False` when we test data or run the base model, and `augment=True` when training our full model.  

- The function goes into the folder where we have our WAV files, loads them, creates augmented versions of each (up to 5 extra versions when the feature is turned on), runs them through the mel spectrogram, and saves the results as features.   
  
- The second function is our standard train/validation/test split, where we go for 60% train, 20% validation, 20% test. For the CNN, we also need to change the shape of the split data by adding a channel dimension. 

In [None]:
def process_audio_dataset(data_folder, classes, sr=22050, duration=3.0, n_mels=128, augment=False):
    """Turn a folder-per-class dataset of ``.wav`` files into mel features.

    Each class is a sub-folder of ``data_folder``; its position in ``classes``
    becomes the integer label. Every clip yields one transposed mel
    spectrogram, padded or truncated to a fixed number of frames. With
    ``augment=True`` each variant from ``augment_audio`` is added too, under
    the same label.

    Args:
        data_folder: Root folder holding one sub-folder per class.
        classes: Class (sub-folder) names; the index is the label.
        sr: Sample rate used when loading.
        duration: Seconds of audio read from each file.
        n_mels: Number of mel bands.
        augment: Whether to add augmented copies of every clip.

    Returns:
        ``(X, y)`` as numpy arrays of features and integer labels.
    """
    # Frame count of a full-length clip; 512 is the default hop_length of
    # create_melspectrogram.
    n_frames = int(duration * sr / 512) + 1

    def to_feature(signal, rate):
        # Mel spectrogram with time on the first axis, forced to n_frames rows.
        spec = create_melspectrogram(signal, rate, n_mels=n_mels).T
        return pad_or_truncate(spec, n_frames)

    features, labels = [], []
    for label, name in enumerate(classes):
        folder = os.path.join(data_folder, name)
        if not os.path.isdir(folder):
            print(f"Warning: Folder {folder} not found.")
            continue

        for entry in os.listdir(folder):
            if not entry.endswith('.wav'):
                continue
            signal, rate = load_and_process_audio(
                os.path.join(folder, entry), sr=sr, duration=duration)
            if signal is None:
                continue

            # The untouched clip always goes in first.
            features.append(to_feature(signal, rate))
            labels.append(label)
            if not augment:
                continue

            # Then every augmented variant, sharing the clip's label.
            for variant in augment_audio(signal, rate):
                features.append(to_feature(variant, rate))
                labels.append(label)

    return np.array(features), np.array(labels)
# Train Test Split
def prepare_data(X, y, test_split=0.2, val_split=0.2):
    """Split features/labels into stratified train, validation and test sets.

    Both ``test_split`` and ``val_split`` are fractions of the full dataset,
    so the defaults give 60% train / 20% validation / 20% test. Labels are
    one-hot encoded and a trailing channel axis is added to the features for
    the CNN.

    Args:
        X: Feature array, one sample per first-axis entry.
        y: Integer class labels.
        test_split: Fraction of all samples held out for testing.
        val_split: Fraction of all samples held out for validation.

    Returns:
        ``((X_train, y_train), (X_val, y_val), (X_test, y_test))``.

    Raises:
        ValueError: If the fractions are not positive or leave no training
            data.
    """
    from sklearn.model_selection import train_test_split

    if test_split <= 0 or val_split <= 0 or test_split + val_split >= 1:
        raise ValueError(
            "test_split and val_split must be positive fractions summing to less than 1")

    X_rest, X_test, y_rest, y_test = train_test_split(
        X, y, test_size=test_split, random_state=42, stratify=y)

    # The second split only sees the (1 - test_split) remainder, so rescale
    # val_split to keep it a fraction of the whole dataset.
    val_fraction = val_split / (1.0 - test_split)
    X_train, X_val, y_train, y_val = train_test_split(
        X_rest, y_rest, test_size=val_fraction, random_state=42, stratify=y_rest)

    num_classes = len(np.unique(y))
    y_train = to_categorical(y_train, num_classes)
    y_val = to_categorical(y_val, num_classes)
    y_test = to_categorical(y_test, num_classes)

    # channel dimension for CNN input
    X_train = X_train[..., np.newaxis]
    X_val = X_val[..., np.newaxis]
    X_test = X_test[..., np.newaxis]
    return (X_train, y_train), (X_val, y_val), (X_test, y_test)

### Building the model 

We build our CNN with TensorFlow and Keras. Each convolutional block applies:  
- Conv2D: 3x3 filters (16 in the first block) to learn local patterns  
- BatchNorm: speeds up training and stabilizes learning
- MaxPooling: reduces size, making the model look at the bigger picture
- Dropout: helps prevent overfitting  

Then we apply Flatten to convert the 2D features into a 1D vector, followed by a Dense fully connected layer with 64 neurons, 
another Dropout, and finally a Dense layer with 'softmax' to get a probability for each class (multiclass classification).  

Afterwards, we call model.compile with the Adam optimizer so the weights are adjusted automatically, and categorical cross-entropy because our labels are one-hot encoded.

In [None]:
def build_model(input_shape, num_classes):
    """Build and compile the CNN classifier.

    Two convolutional blocks (16 then 32 filters; each Conv2D -> BatchNorm ->
    2x2 MaxPool -> Dropout 0.3) are followed by Flatten, a 64-unit Dense
    layer, Dropout 0.3 and a softmax output. Conv and hidden Dense kernels
    use L2 regularization (1e-4).

    Args:
        input_shape: Shape of one input sample, passed to the first Conv2D.
        num_classes: Number of output classes.

    Returns:
        The compiled ``Sequential`` model (Adam, categorical cross-entropy).
    """
    reg = tf.keras.regularizers.l2(1e-4)

    stack = []
    for block_index, n_filters in enumerate((16, 32)):
        # Only the very first layer declares the input shape.
        first_layer_kwargs = {'input_shape': input_shape} if block_index == 0 else {}
        stack.append(Conv2D(n_filters, (3, 3), activation='relu', padding='same',
                            kernel_regularizer=reg, **first_layer_kwargs))
        stack.append(BatchNormalization())
        stack.append(MaxPooling2D((2, 2)))
        stack.append(Dropout(0.3))

    # Classifier head.
    stack.append(Flatten())
    stack.append(Dense(64, activation='relu', kernel_regularizer=reg))
    stack.append(Dropout(0.3))
    stack.append(Dense(num_classes, activation='softmax'))

    model = Sequential(stack)
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    # Show the optimizer's starting learning rate.
    print(model.optimizer.learning_rate.numpy())
    return model

### Train the model 
- Here we construct the CNN and apply some neat features: EarlyStopping, to save time if the model stops improving; ReduceLROnPlateau, which lowers the learning rate if the validation loss plateaus; and ModelCheckpoint, to save the best model.

In [None]:
def train_model(data_folder, classes, model_path, sr=22050, duration=3.0,
                n_mels=128, batch_size=32, epochs=30, augment=True):
    """Build the dataset, train the CNN, and evaluate and return the best model.

    Args:
        data_folder: Root folder with one sub-folder of ``.wav`` files per class.
        classes: Class (sub-folder) names; the index is the label.
        model_path: Where the best model (lowest validation loss) is stored.
        sr: Sample rate used when loading audio.
        duration: Seconds of audio used per clip.
        n_mels: Number of mel bands.
        batch_size: Training batch size.
        epochs: Maximum number of epochs.
        augment: Whether to add augmented copies of every clip.

    Returns:
        The trained model, or ``None`` when no audio files were processed.
    """
    print("Processing dataset...")
    X, y = process_audio_dataset(data_folder, classes, sr, duration, n_mels, augment=augment)
    if len(X) == 0:
        print("No audio files processed. Check dataset path and file formats.")
        return None

    # NOTE(review): augmented copies are created before the split, so variants
    # of one recording can land in train, validation and test at the same
    # time, which can inflate the reported test accuracy.
    (X_train, y_train), (X_val, y_val), (X_test, y_test) = prepare_data(X, y)
    input_shape = X_train.shape[1:]
    num_classes = y_train.shape[1]

    print("Building and training the model...")
    model = build_model(input_shape, num_classes)

    callbacks = [
        EarlyStopping(monitor='val_loss', patience=5, verbose=1, restore_best_weights=True),
        ReduceLROnPlateau(monitor='val_loss', factor=0.3, patience=3, verbose=1),
        ModelCheckpoint(model_path, monitor='val_loss', save_best_only=True, verbose=1)
    ]

    model.fit(X_train, y_train, batch_size=batch_size, epochs=epochs,
              validation_data=(X_val, y_val), callbacks=callbacks)

    # ModelCheckpoint has already written the best epoch to model_path.
    # Saving the in-memory model again could replace it with last-epoch
    # weights, so reload the checkpoint and only save when none was written.
    if os.path.exists(model_path):
        model = load_model(model_path)
    else:
        model.save(model_path)

    _, acc = model.evaluate(X_test, y_test)
    print("Test accuracy:", acc)
    print("Model saved to", model_path)
    return model

### Play Chime sound as well as detecting trigger word
- The key features of this block are loading the test file and standardizing it.  
- We play a chime sound if our model predicts a probability of at least 50% that the trigger word is in the audio.

In [None]:
 def infer_trigger_word(model, file_path, sr=22050, duration=3.0, n_mels=128):
    y, sr_ret = load_and_process_audio(file_path, sr=sr, duration=duration)
    if y is None:
        print("Error: Could not load audio for inference.")
        return False
    mel_spec = create_melspectrogram(y, sr_ret, n_mels=n_mels)
    mel_spec = mel_spec.T
    target_length = int(duration * sr / 512) + 1
    mel_spec = pad_or_truncate(mel_spec, target_length)
    X_in = np.expand_dims(mel_spec, axis=0)     
    X_in = np.expand_dims(X_in, axis=-1)         
    pred = model.predict(X_in)

    trigger_prob = pred[0][1]
    print("Trigger word probability:", trigger_prob)
    return trigger_prob >= 0.5  # threshold 

def play_chime(chime_file):
    """Play a WAV file and block until playback has finished.

    Any failure (missing file, audio backend error, ...) is printed instead
    of being raised.

    Args:
        chime_file: Path of the WAV file to play.
    """
    try:
        playback = sa.WaveObject.from_wave_file(chime_file).play()
        playback.wait_done()
    except Exception as err:
        print("Error playing chime:", err)

### Testing on sentences 
- Here we apply a sliding-window technique to test our model on longer audio: we break it down into 3-second windows and slide across in 0.5 s steps, checking each window.
- We also apply our standardization and the inference step to every window.

In [None]:
def sliding_window_infer(model, file_path, sr=22050,
                         win_dur=3.0, hop_dur=0.5,
                         n_mels=128, thresh=0.25):
    """Scan an audio file with overlapping windows and look for the trigger word.

    Every window is converted to a mel spectrogram, padded/truncated to the
    frame count of a full window, and scored by the model. Scanning stops at
    the first window whose class-1 probability reaches ``thresh``. A file
    shorter than one window is scored once as a single (padded) window
    instead of being skipped.

    Args:
        model: Trained classifier exposing ``predict``.
        file_path: Audio file to scan.
        sr: Sample rate used when loading.
        win_dur: Window length in seconds.
        hop_dur: Step between window starts in seconds.
        n_mels: Number of mel bands.
        thresh: Detection threshold on the class-1 probability.

    Returns:
        ``True`` as soon as a window reaches the threshold, else ``False``.
    """
    y, sr = librosa.load(file_path, sr=sr)
    if len(y) == 0:
        # Nothing to analyse.
        print("Trigger word not detected.")
        return False

    win_len = int(win_dur * sr)
    # range() rejects a zero step, so never hop by less than one sample.
    hop_len = max(1, int(hop_dur * sr))
    target_len = int(win_dur * sr / 512) + 1

    # For clips shorter than a window, still evaluate one window at 0;
    # pad_or_truncate fills in the missing frames.
    last_start = max(len(y) - win_len, 0)
    for start in range(0, last_start + 1, hop_len):
        chunk = y[start:start + win_len]
        mel = create_melspectrogram(chunk, sr, n_mels).T
        mel = pad_or_truncate(mel, target_len)
        X = mel[np.newaxis, ..., np.newaxis]
        prob = model.predict(X, verbose=0)[0][1]
        print(f"{start/sr:5.1f}s ‚Üí prob={prob:.3f}")  # Optional debug print
        if prob >= thresh:
            print(f"Trigger word detected at ‚âà {start/sr:.1f}s (prob={prob:.2f})")
            return True
    print("Trigger word not detected.")
    return False

### RUN the code
- Here, depending on the mode ('train' or 'infer'), the script grabs the necessary files and runs the steps explained above.

In [None]:
if __name__ == "__main__":
    # Entry point: 'train' builds and saves the model, 'infer' loads it and
    # scans a test recording.
    mode = 'train'

    # Settings shared by both modes
    MODEL_PATH = "/Users/vijaysridhar/Documents/trigger_model.keras"
    SAMPLE_RATE = 22050
    DURATION = 3.0
    N_MELS = 128

    if mode == 'infer':
        AUDIO_FILE = "/Users/vijaysridhar/Documents/inference/inference_test.wav"
        CHIME_FILE = "/Users/vijaysridhar/Documents/inference/chime.wav"
        model = load_model(MODEL_PATH)
        detected = sliding_window_infer(model, AUDIO_FILE, sr=SAMPLE_RATE,
                                        win_dur=3.0, hop_dur=0.5,
                                        n_mels=N_MELS, thresh=0.25)
        if not detected:
            print("Trigger word not detected.")
        else:
            print("Trigger word detected! Playing chime.")
            play_chime(CHIME_FILE)
    elif mode == 'train':
        # Dataset root: one sub-folder per class, named as in CLASSES
        DATA_FOLDER = "/Users/vijaysridhar/Documents/WAV"
        CLASSES = ["negative", "activate"]
        # Augmentation is on for the full model; switch it off for the base model
        train_model(
            DATA_FOLDER,
            CLASSES,
            MODEL_PATH,
            sr=SAMPLE_RATE,
            duration=DURATION,
            n_mels=N_MELS,
            augment=True,
        )


### Visuals
Spectrogram, Classification report, ROC curve, etc

In [None]:
def save_model_diagram(model, out_path="/Users/vijaysridhar/Documents/model_graph.png"):
    """Write an architecture diagram of ``model`` to ``out_path``.

    Failures (for example missing plotting dependencies) are printed rather
    than raised.

    Args:
        model: Keras model to draw.
        out_path: Destination image file.
    """
    diagram_options = dict(to_file=out_path, show_shapes=True, dpi=120)
    try:
        plot_model(model, **diagram_options)
        print(f"[VIS] model diagram  ‚Üí  {out_path}")
    except Exception as err:
        print("[VIS] could not save model diagram:", err)

def plot_training_history(history, out_path="/Users/vijaysridhar/Documents/train_curves.png"):
    """Save side-by-side accuracy and loss curves for training vs validation.

    Args:
        history: Object whose ``history`` dict holds the ``accuracy``,
            ``val_accuracy``, ``loss`` and ``val_loss`` series.
        out_path: Destination image file.
    """
    fig, axes = plt.subplots(1, 2, figsize=(8, 3))
    panels = (("accuracy", "Accuracy"), ("loss", "Loss"))
    for axis, (metric, title) in zip(axes, panels):
        axis.plot(history.history[metric], label="train")
        axis.plot(history.history["val_" + metric], label="val")
        axis.set_title(title)
        axis.legend()
    plt.tight_layout()
    plt.savefig(out_path, dpi=150)
    plt.close(fig)
    print(f"[VIS] learning curves ‚Üí  {out_path}")

def plot_confusion_matrix(model, X, y, class_names, out_path="/Users/vijaysridhar/Documents/conf_mat.png"):
    """Save a row-normalized confusion matrix of the model's predictions.

    Args:
        model: Trained classifier exposing ``predict``.
        X: Model inputs.
        y: True labels, compared against argmax class indices (so presumably
            integer labels rather than one-hot -- confirm with the caller).
        class_names: Display names for the matrix axes.
        out_path: Destination image file.
    """
    probabilities = model.predict(X, verbose=0)
    predicted = np.argmax(probabilities, axis=1)
    matrix = confusion_matrix(y, predicted, normalize="true")

    fig, ax = plt.subplots(figsize=(5, 4))
    ConfusionMatrixDisplay(matrix, display_labels=class_names).plot(
        ax=ax, cmap="Blues", colorbar=False, values_format=".2f")
    ax.set_title("Normalized Confusion Matrix")
    plt.tight_layout()
    plt.savefig(out_path, dpi=150)
    plt.close(fig)
    print(f"[VIS] confusion matrix ‚Üí  {out_path}")

def print_classif_report(model, X, y, class_names):
    """Print a per-class precision/recall/F1 report for the model.

    Args:
        model: Trained classifier exposing ``predict``.
        X: Model inputs.
        y: True labels, compared against argmax class indices.
        class_names: Names used for the report rows.
    """
    predicted = np.argmax(model.predict(X, verbose=0), axis=1)
    report = classification_report(y, predicted, target_names=class_names, digits=3)
    print("\n=== Classification report ===\n", report)

def show_random_spectrogram(mel_batch, out_path="/Users/vijaysridhar/Documents/rand_spec.png"):
    """Save an image of one randomly chosen spectrogram from a batch.

    Args:
        mel_batch: 4-D array indexed as ``[sample, :, :, 0]``; the selected
            slice is transposed before plotting.
        out_path: Destination image file.
    """
    chosen = np.random.randint(len(mel_batch))
    # Transpose so the batch's second axis runs along x in the image.
    image = mel_batch[chosen, :, :, 0].T

    plt.figure(figsize=(6, 3))
    plt.imshow(image, origin="lower", aspect="auto")
    plt.colorbar()
    plt.title("Training log-Mel spectrogram")
    plt.xlabel("Time-frames")
    plt.ylabel("Mel bins")
    plt.tight_layout()
    plt.savefig(out_path, dpi=150)
    plt.close()
    print(f"[VIS] spectrogram snapshot ‚Üí  {out_path}")

def plot_roc_curve(model, X, y, out_path="/Users/vijaysridhar/Documents/roc_curve.png"):
    """Save the ROC curve (with its AUC in the legend) for class index 1.

    Args:
        model: Trained classifier exposing ``predict``.
        X: Model inputs.
        y: True binary labels.
        out_path: Destination image file.
    """
    # The score is the predicted probability of class index 1.
    scores = model.predict(X, verbose=0)[:, 1]
    fpr, tpr, _ = roc_curve(y, scores)
    area = auc(fpr, tpr)

    plt.figure(figsize=(5, 4))
    plt.plot(fpr, tpr, color='darkorange', lw=2,
             label=f'ROC curve (AUC = {area:.2f})')
    # Diagonal reference line from (0, 0) to (1, 1).
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC Curve')
    plt.legend(loc="lower right")
    plt.tight_layout()
    plt.savefig(out_path, dpi=150)
    plt.close()
    print(f"[VIS] ROC curve ‚Üí  {out_path}")

def plot_pr_curve(model, X, y, out_path="/Users/vijaysridhar/Documents/pr_curve.png"):
    """Save the precision-recall curve for class index 1.

    Args:
        model: Trained classifier exposing ``predict``.
        X: Model inputs.
        y: True binary labels.
        out_path: Destination image file.
    """
    # The score is the predicted probability of class index 1.
    scores = model.predict(X, verbose=0)[:, 1]
    precision, recall, _ = precision_recall_curve(y, scores)

    plt.figure(figsize=(5, 4))
    plt.plot(recall, precision, color='green', lw=2)
    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.title('Precision-Recall Curve')
    plt.tight_layout()
    plt.savefig(out_path, dpi=150)
    plt.close()
    print(f"[VIS] PR curve ‚Üí  {out_path}")


def plot_true_learning_curve(X, y, build_fn, out_path="/Users/vijaysridhar/Documents/learning_curve.png"):
    """Plot accuracy against training-set size using 3-fold cross-validation.

    A fresh model from ``build_fn`` is trained for 10 epochs on each of five
    training-set sizes (10% to 100%) per fold, through scikit-learn's
    ``learning_curve``.

    Args:
        X: Features. 4-D input is used as is, 3-D input gets a trailing
            channel axis, anything else is reshaped to ``(-1, 130, 128, 1)``.
        y: Integer class labels (stratified folds and accuracy scoring are
            computed on them directly).
        build_fn: Zero-argument callable returning a compiled Keras model.
        out_path: Destination image file.
    """
    if len(X.shape) == 4:
        X_input = X
    elif len(X.shape) == 3:
        X_input = X[..., np.newaxis]
    else:
        # Flat input: fall back to the historical 130 x 128 layout.
        X_input = X.reshape(-1, 130, 128, 1)

    # Per-sample shape the wrapper restores before calling Keras, taken from
    # the data instead of being hard-coded.
    sample_shape = X_input.shape[1:]
    # Use all labels so the one-hot width is the same in every fold.
    n_classes = len(np.unique(y))

    # Mixins go before BaseEstimator, per scikit-learn convention.
    class KerasWrapper(ClassifierMixin, BaseEstimator):
        """Minimal scikit-learn adapter around a Keras model."""

        def __init__(self):
            self.model = None
            self.n_classes = n_classes

        def _as_batch(self, X):
            return X.reshape((-1,) + tuple(sample_shape))

        def fit(self, X, y):
            self.classes_ = np.unique(y)
            y_one_hot = to_categorical(y, num_classes=self.n_classes)
            self.model = build_fn()
            self.model.fit(self._as_batch(X), y_one_hot,
                           epochs=10, batch_size=32, verbose=0)
            return self

        def predict(self, X):
            return np.argmax(self.model.predict(self._as_batch(X), verbose=0), axis=1)

        def predict_proba(self, X):
            return self.model.predict(self._as_batch(X), verbose=0)

    # scikit-learn's learning_curve is imported in this file under the alias
    # sk_learning_curve.
    train_sizes, train_scores, val_scores = sk_learning_curve(
        KerasWrapper(), X_input, y,
        train_sizes=np.linspace(0.1, 1.0, 5),
        cv=StratifiedKFold(n_splits=3),
        scoring='accuracy',
        n_jobs=1,
        verbose=1
    )

    # Average the scores over the folds.
    train_mean = np.mean(train_scores, axis=1)
    val_mean = np.mean(val_scores, axis=1)

    plt.figure(figsize=(6, 4))
    plt.plot(train_sizes, train_mean, 'o-', label="Train")
    plt.plot(train_sizes, val_mean, 'o-', label="Validation")
    plt.xlabel("Training Set Size")
    plt.ylabel("Accuracy")
    plt.title("Learning Curve")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.savefig(out_path, dpi=150)
    plt.close()
    print(f"[VIS] learning curve ‚Üí  {out_path}")

# Appendix


### Negative Generation
- We adjusted the number of samples as needed



In [None]:
import os
import random
from TTS.api import TTS

# Generate spoken "negative" (non-trigger) words with a multi-speaker TTS model.
output_dir = "/Users/vijaysridhar/Documents/negative"
os.makedirs(output_dir, exist_ok=True)

# Number of clips to synthesize; adjust as needed.
N_SAMPLES = 500

negative_words = [
    "hello", "music", "light", "fan", "coffee", "open", "stop", "go", "yes", "no",
    "volume", "rain", "snow", "call", "message", "mail", "turn", "lock", "alarm", "mute",
    "joke", "news", "play", "pause", "resume", "sleep", "date", "timer", "time", "cancel",
    "skip", "repeat", "weather", "next", "previous", "map", "drive", "car", "email", "morning",
    "evening", "night", "wake", "walk", "run", "drink", "schedule", "note", "list", "shopping",
    "reminder", "stock", "currency", "balance", "heat", "cold", "mode", "zoom", "book",
    "door", "window", "curtain", "bed", "couch", "tv", "show", "series", "camera", "photo",
    "record", "video", "chat", "search", "browse", "web", "scroll", "battery", "charge", "plug",
    "wifi", "bluetooth", "hotspot", "flash", "lamp", "air", "brush", "bottle", "phone", "charger",
    "temperature", "unlock", "send", "receive", "folder", "file", "help", "info"
]

tts = TTS(model_name="tts_models/multilingual/multi-dataset/your_tts", progress_bar=False)
# Match on the name prefix: a plain substring test for "male" would also
# select every "female..." speaker. Falls back to all speakers when none match.
# NOTE(review): assumes speaker names start with "male"/"female" -- check
# tts.speakers for this model.
male_speakers = [s for s in tts.speakers if s.strip().lower().startswith("male")] or tts.speakers

for i in range(N_SAMPLES):
    word = random.choice(negative_words)
    speaker = random.choice(male_speakers)
    # Strip surrounding whitespace so it cannot end up in the file name.
    speaker_label = speaker.strip()
    out_path = os.path.join(output_dir, f"neg_male_{speaker_label}_{i}.wav")
    print(f"[{i+1}/{N_SAMPLES}] {word} ({speaker_label})")
    tts.tts_to_file(text=word, speaker=speaker, language="en", file_path=out_path)

print("\n Done generating negative word samples.")

### Activate data generation
- This script automatically generates 'activate' in different languages; we adjust the number of samples as needed.
- For some of these we can also change the speaker to collect more voices.

In [None]:
import os
import random
from TTS.api import TTS


# Generate spoken "activate" samples in several languages with a
# multi-speaker TTS model.
output_dir = "/Users/vijaysridhar/Documents/activate_tts"
os.makedirs(output_dir, exist_ok=True)

# Number of clips to synthesize; adjust as needed.
N_SAMPLES = 100

# Translations of the word "activate", keyed by the language code passed to
# the TTS model. The commented-out entries are kept for reference and are
# not used.
translations = {
    "en": "activate",
    #"es": "activar",
    #"hi": "सक्रिय करें",
    #"zh": "激活",
    #"ar": "تفعيل",
    "fr-fr": "activer",
    #"ru": "активировать",
    "pt-br": "ativar"
    #"de": "aktivieren",
    #"it": "attivare",
    #"nl": "activeren"
}

# Load multilingual TTS model
tts = TTS(model_name="tts_models/multilingual/multi-dataset/your_tts", progress_bar=False)

all_speakers = tts.speakers
# Match on the name prefix: a plain substring test for "male" would also
# select every "female..." speaker. Falls back to all speakers when none match.
# NOTE(review): assumes speaker names start with "male"/"female" -- check
# tts.speakers for this model.
male_speakers = [s for s in all_speakers if s.strip().lower().startswith("male")] or all_speakers

languages = list(translations)

for i in range(N_SAMPLES):
    lang = random.choice(languages)
    word = translations[lang]
    text = f"The command is {word}."

    speaker = random.choice(male_speakers)
    # Strip surrounding whitespace so it cannot end up in the file name.
    speaker_label = speaker.strip()
    out_path = os.path.join(output_dir, f"{lang}_{speaker_label}_{i}.wav")

    print(f"[{i+1}/{N_SAMPLES}] Generating: {text} ({lang}, speaker: {speaker_label})")
    tts.tts_to_file(text=text, speaker=speaker, language=lang, file_path=out_path)

print(f"\nfiles saved to: {os.path.abspath(output_dir)}")

### Generate more activates through pyTTSx3
- We use an additional TTS engine (pyttsx3) in order to create more varied samples.

In [None]:
import os
import pyttsx3


# Synthesize the word "activate" once with every voice pyttsx3 reports.
engine = pyttsx3.init()

# Make sure the driver object has a `_current_text` attribute before any
# synthesis is queued.
# NOTE(review): presumably a workaround for a driver that reads this
# attribute without initializing it -- confirm against the pyttsx3 version
# in use.
driver = getattr(engine, '_driver', None)
if driver is not None and not hasattr(driver, '_current_text'):
    setattr(driver, '_current_text', "")

OUT_DIR = "/Users/vijaysridhar/Documents/activate_tts/pyttsx3_TTS"
os.makedirs(OUT_DIR, exist_ok=True)


voices = engine.getProperty('voices')
print(f"Found {len(voices)} voices; writing one 'activate' per voice...")

for idx, voice in enumerate(voices, start=1):
    engine.setProperty('voice', voice.id)
    out_path = os.path.join(OUT_DIR, f"activate_{idx:03d}.wav")
    print(f"[{idx:03d}/{len(voices):03d}] ‚Üí {out_path}")
    # Only queues the request; the files are written by runAndWait() below.
    engine.save_to_file("activate", out_path)


engine.runAndWait()
engine.stop()

# Report the folder the files were actually written to.
print(f"Done! Check the '{OUT_DIR}' folder.")

### With the following coquiTTs 
- We wanted to generate full sentences to test our model in more real-life scenarios.

In [None]:
import os
from TTS.api import TTS

# Synthesize full sentences containing "activate" with many voices.
import re

OUTPUT_DIR = "/Users/vijaysridhar/Documents/activate_tts/coquiTTS"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Distinct sentences containing the word "activate"
sentences = [
    "Please activate the alarm before you go to bed.",
    "Can you activate the sprinkler system for the garden?",
    "He pressed the button to activate the emergency shutdown.",
    "She needed a password to activate her new software license.",
    "The fire marshal will activate the sprinklers in case of smoke.",
    "To begin the experiment, you must activate the centrifuge.",
    "We’ll activate your account once we’ve verified your email.",
    "Don’t forget to activate the router after you plug it in.",
    "The scientist hoped to activate the enzyme with heat.",
    "You can activate night mode in the app’s settings.",
    "Once you activate the coupon code, you’ll see the discount.",
    "The lifeguard will activate the rescue buoy if someone drowns.",
    "I need to activate airplane mode before the flight takes off.",
    "To proceed, you must activate two-factor authentication.",
    "The remote control can activate the garage door opener.",
    "He whispered the secret word to activate the hidden door.",
    "After installation, you’ll need to activate the new feature pack.",
    "She turned the key to activate the electric fence.",
    "They plan to activate the old mining equipment this Saturday.",
    "The magician said a spell to activate the floating orb.",
    "Please activate the backup generator in case of a power outage.",
    "When you activate the floodlights, they will stay on for five minutes.",
    "The app prompts you to activate location services.",
    "He had to activate his membership before booking the class.",
    "The teacher will activate the quiz at exactly 3:00 PM.",
    "I clicked OK to activate the new privacy settings.",
    "To activate the rocket’s engines, enter the launch code.",
    "She needed to activate the warranty within 30 days.",
    "The security guard will activate the lockdown procedure if needed.",
    "Please activate the Bluetooth pairing on your device.",
    "They decided to activate the emergency beacon in the storm.",
    "The keynote speaker will activate the presentation with a clicker.",
    "To activate the light show, dial the control panel.",
    "He set a timer to activate the coffee maker at 6 AM.",
    "We must activate the vaccination registry by next week.",
    "Once you activate your card, you can start using it immediately.",
    "The chef hit a switch to activate the meat grinder.",
    "Don’t activate the self-destruct sequence by accident!",
    "I love how you can activate voice commands hands-free.",
    "The coach will activate the substitution at halftime.",
    "Please activate your camera so we can see you.",
    "She plans to activate her online store this weekend.",
    "The technician will activate the alarm panel remotely.",
    "To save power, the screen will only activate when you touch it.",
    "We need to activate the heating system before winter arrives.",
    "The archaeologist hopes to activate the ancient mechanism.",
    "You must activate the trial period within 14 days.",
    "When you activate your profile, people can start following you.",
    "The diver will activate his flippers before entering the water."
]


def _sentence_slug(sentence):
    """Return a lowercase, filesystem-safe name for ``sentence``."""
    # Drop apostrophes so contractions stay one word, then collapse every
    # other run of non-alphanumerics (spaces, ':', '!', '-', ...) into '_'.
    lowered = sentence.lower().replace("’", "").replace("'", "")
    return re.sub(r"[^a-z0-9]+", "_", lowered).strip("_")


def _speaker_slug(speaker):
    """Return a filesystem-safe version of a speaker name."""
    return re.sub(r"[^A-Za-z0-9_-]+", "_", speaker.strip())


tts = TTS(model_name="tts_models/en/vctk/vits")

# Use at most the first 100 speakers of the model.
voices = tts.speakers[:100]
print(f"Generating {len(sentences) * len(voices)} samples across {len(sentences)} sentences and {len(voices)} voices.")


for sent in sentences:
    safe_sent = _sentence_slug(sent)
    for spk in voices:
        filename = f"{safe_sent}__{_speaker_slug(spk)}.wav"
        out_path = os.path.join(OUTPUT_DIR, filename)
        tts.tts_to_file(text=sent, speaker=spk, file_path=out_path)

print("Finished generating sentence samples at:", OUTPUT_DIR)

### Amazon polly
- We also wanted to experiment with this powerful TTS service to create even more samples; it can do pretty much everything we did before, all in one.

In [None]:
import os
import boto3
from botocore.exceptions import BotoCoreError, ClientError


# Synthesize the trigger word with every Amazon Polly voice for English,
# French, Portuguese and Spanish, then convert the MP3 files to WAV.
AWS_REGION = "us-east-1"

OUTPUT_DIR = "/Users/vijaysridhar/Documents/activate_tts/polly"
os.makedirs(OUTPUT_DIR, exist_ok=True)

polly = boto3.client(
    "polly",
    region_name=AWS_REGION
)

# Word spoken for each language-code prefix.
language_texts = {
    "en": "activate",
    "fr": "activer",
    "pt": "ativar",
    "es": "activar"
}


def _primary_engine(voice):
    """Return the first engine listed for ``voice`` ("standard" if none is)."""
    return (voice.get("SupportedEngines") or ["standard"])[0]


def _select_voices(all_voices, lang_prefix, label):
    """Return and print the voices whose language code starts with ``lang_prefix``."""
    selected = [v for v in all_voices if v["LanguageCode"].startswith(lang_prefix)]
    print(f"\nFound {len(selected)} {label} voices:")
    for v in selected:
        print(f"  - {v['Name']} ({v['LanguageName']}, gender={v['Gender']}, engine={_primary_engine(v)})")
    return selected


def _synthesize_voices(voices, lang_prefix, label):
    """Write one MP3 of the ``lang_prefix`` word per voice into OUTPUT_DIR."""
    text = language_texts[lang_prefix]
    print(f"\n--- Generating {label} samples ---")
    for v in voices:
        voice_id = v["Id"]
        out_path = os.path.join(OUTPUT_DIR, f"activate_{lang_prefix}_{voice_id}.mp3")
        try:
            resp = polly.synthesize_speech(
                Text=text,
                OutputFormat="mp3",
                VoiceId=voice_id,
                Engine=_primary_engine(v)
            )
            with open(out_path, "wb") as f:
                f.write(resp["AudioStream"].read())
            print(f"Saved '{text}' as {out_path}")
        except (BotoCoreError, ClientError) as e:
            # One failing voice should not stop the rest of the batch.
            print(f"Error synthesizing with voice {voice_id}:", e)


try:
    # Get all voices
    response = polly.describe_voices()
    all_voices = response.get("Voices", [])
    print(f"Found {len(all_voices)} total voices in Polly")

    english_voices = _select_voices(all_voices, "en", "English")
    french_voices = _select_voices(all_voices, "fr", "French")
    portuguese_voices = _select_voices(all_voices, "pt", "Portuguese")
    spanish_voices = _select_voices(all_voices, "es", "Spanish")

except (BotoCoreError, ClientError) as e:
    print("Error fetching voices:", e)
    # SystemExit works in any interpreter; the `exit` helper only exists when
    # the site module (or IPython) installs it.
    raise SystemExit(1)


_synthesize_voices(english_voices, "en", "English")
_synthesize_voices(french_voices, "fr", "French")
_synthesize_voices(portuguese_voices, "pt", "Portuguese")
_synthesize_voices(spanish_voices, "es", "Spanish")


# Convert Files
try:
    from pydub import AudioSegment
except ImportError:
    print("\ninstall pydub")
    print("install ffmpeg")
else:
    print("\nConverting MP3 files to WAV format...")

    for filename in os.listdir(OUTPUT_DIR):
        if filename.endswith(".mp3"):
            mp3_path = os.path.join(OUTPUT_DIR, filename)
            # Swap only the extension for the WAV name.
            wav_path = os.path.join(OUTPUT_DIR, os.path.splitext(filename)[0] + ".wav")

            sound = AudioSegment.from_mp3(mp3_path)
            sound.export(wav_path, format="wav")
            print(f"Converted {mp3_path} to {wav_path}")

print("\nAll audio samples generated")


### Libri speech
- We get more negative data from this existing speech corpus (LibriSpeech). This is not generated data; we just take some negative samples and add them to our folder. We tried to find 'activate' samples, but there were none.

In [None]:
import os
import shutil
import random
import requests
import zipfile
import tarfile
import librosa
import soundfile as sf

# Paths
# NOTE(review): these are absolute, machine-specific locations; adjust them
# before running on another machine.
ROOT = "/Users/vijaysridhar/Documents/data"   # dataset root
TMP = "/Users/vijaysridhar/Downloads"         # archives are downloaded and extracted here
ACTIVATE_DIR = os.path.join(ROOT, "activate")  # destination for "activate" clips
NEGATIVE_DIR = os.path.join(ROOT, "negative")  # destination for negative clips

def safe_mkdir(path):
    """Create directory *path*, including missing parents, if it is absent.

    ``exist_ok=True`` replaces the separate existence check, so there is no
    window in which another process can create the directory between the
    check and the ``makedirs`` call. An already existing directory is left
    untouched.

    Raises:
        FileExistsError: if *path* exists but is not a directory.
    """
    os.makedirs(path, exist_ok=True)

def download_file(url, dest):
    """Stream the resource at *url* into the local file *dest*.

    The body is written to ``dest + ".part"`` first and moved into place only
    after the transfer finished. Callers treat an existing *dest* as a
    finished download, so a failed or interrupted transfer must never leave
    a file at that path.

    Args:
        url: Address to download.
        dest: Path of the file to create or overwrite.

    Raises:
        requests.HTTPError: if the server answers with an error status.
        requests.RequestException: on connection problems or timeouts.
        OSError: if the file cannot be written.
    """
    partial = f"{dest}.part"
    try:
        # The context manager releases the connection even when writing fails.
        with requests.get(url, stream=True, timeout=60) as r:
            # Without this check an HTML error page would be saved as the archive.
            r.raise_for_status()
            with open(partial, 'wb') as f:
                shutil.copyfileobj(r.raw, f)
        os.replace(partial, dest)
    finally:
        # After a successful replace the partial file is gone; otherwise drop it.
        if os.path.exists(partial):
            os.remove(partial)

def fetch_speech_commands():
    """Return the path of the Speech Commands v0.02 archive inside TMP.

    The archive is downloaded only when no file exists at that path yet.
    """
    archive = os.path.join(TMP, "speech_commands.tar.gz")
    if os.path.exists(archive):
        return archive

    print("‚¨áDownloading Speech Commands...")
    download_file(
        "http://download.tensorflow.org/data/speech_commands_v0.02.tar.gz",
        archive,
    )
    print("Downloaded Speech Commands.")
    return archive

def fetch_common_voice():
    """Report that Common Voice cannot be fetched automatically.

    The dataset page (https://voice.mozilla.org/en/datasets) requires a
    manual login and download, so nothing is downloaded here.

    Returns:
        None, which tells callers there is no archive to extract.
    """
    print("Common Voice needs manual download. Skipping automatic download.")
    return None

def fetch_librispeech():
    """Return the path of the LibriSpeech dev-clean archive inside TMP.

    The archive is downloaded only when no file exists at that path yet.
    """
    archive = os.path.join(TMP, "librispeech_dev_clean.tar.gz")
    already_cached = os.path.exists(archive)
    if not already_cached:
        source = "http://www.openslr.org/resources/12/dev-clean.tar.gz"
        print("Downloading LibriSpeech (dev-clean)...")
        download_file(source, archive)
        print("Downloaded LibriSpeech.")
    return archive

def fetch_chime5():
    """Announce that CHiME-5 is skipped and return None (nothing is downloaded)."""
    notice = "CHiME-5 requires a restricted license. Skipping download."
    print(notice)
    return None

def extract_tar(tar_path, extract_to):
    """Extract the gzip-compressed tar archive *tar_path* into *extract_to*.

    Args:
        tar_path: Path of a ``.tar.gz`` archive. A falsy value (for example
            the ``None`` returned by a fetcher that skipped its download)
            makes the call a no-op.
        extract_to: Directory that receives the archive contents.

    Raises:
        tarfile.TarError: if the archive is unreadable or, when extraction
            filters are available, contains a member the filter rejects.
    """
    if not tar_path:
        return

    with tarfile.open(tar_path, 'r:gz') as tar:
        # The archives come from the network. Where the interpreter supports
        # extraction filters, use the "data" filter so members cannot escape
        # extract_to through absolute paths, ".." components or links.
        if hasattr(tarfile, "data_filter"):
            tar.extractall(path=extract_to, filter="data")
        else:
            tar.extractall(path=extract_to)

def process_audio_files(input_dir, output_dir, label="negative", duration=2.0,
                        max_files=200, extensions=(".wav", ".flac")):
    """Cut one random fixed-length clip out of audio files under *input_dir*.

    Files are visited in random order. Each file is loaded at 22050 Hz,
    skipped when it is shorter than *duration* seconds, and otherwise a
    randomly positioned window of *duration* seconds is written to
    ``output_dir/<label>_<index>.wav``.

    Args:
        input_dir: Directory searched recursively for audio files.
        output_dir: Directory receiving the clips; created when missing.
        label: Prefix of the generated file names.
        duration: Clip length in seconds.
        max_files: Maximum number of clips written by this call.
        extensions: File name suffixes (case-insensitive) treated as audio.
            ``.flac`` is included because LibriSpeech ships FLAC files, which
            a ``.wav``-only scan never finds.

    Files that fail to load or save are reported and skipped.
    """
    suffixes = tuple(ext.lower() for ext in extensions)
    files = [
        os.path.join(root, f)
        for root, _, filenames in os.walk(input_dir)
        for f in filenames
        if f.lower().endswith(suffixes)
    ]
    random.shuffle(files)

    # Writing into a missing directory would make every single save fail.
    os.makedirs(output_dir, exist_ok=True)

    next_index = 0  # candidate number for the next output file name
    count = 0       # clips written by this call
    for file_path in files:
        # Checked before any work so that max_files <= 0 writes nothing.
        if count >= max_files:
            break
        try:
            y, sr = librosa.load(file_path, sr=22050)
            if len(y) < sr * duration:
                continue
            clip_len = int(sr * duration)
            start_idx = random.randint(0, len(y) - clip_len)
            clip = y[start_idx:start_idx + clip_len]

            # Several calls share one output directory and label (one call
            # per keyword plus LibriSpeech). Skip indices that are already
            # taken instead of overwriting clips saved earlier.
            save_path = os.path.join(output_dir, f"{label}_{next_index}.wav")
            while os.path.exists(save_path):
                next_index += 1
                save_path = os.path.join(output_dir, f"{label}_{next_index}.wav")

            sf.write(save_path, clip, sr)
            next_index += 1
            count += 1
        except Exception as e:
            # Best effort: one unreadable file must not abort the whole run.
            print(f"Skipping file {file_path}: {e}")

def collect_clips():
    """Fill ACTIVATE_DIR and NEGATIVE_DIR with clips from the public datasets.

    Speech Commands is fetched and unpacked into TMP first; every listed
    keyword folder that exists directly under TMP contributes up to 50 clips.
    LibriSpeech dev-clean then contributes up to 200 negative clips.
    """
    for target in (ACTIVATE_DIR, NEGATIVE_DIR):
        safe_mkdir(target)

    # 1. Speech Commands
    sc_path = fetch_speech_commands()
    if sc_path:
        extract_tar(sc_path, TMP)
        # (keywords, destination directory, file-name label), in processing order
        keyword_groups = (
            (["activate"], ACTIVATE_DIR, "activate"),
            (
                ["no", "off", "stop", "down", "left", "up", "right", "yes", "on", "go"],
                NEGATIVE_DIR,
                "negative",
            ),
        )
        for words, destination, tag in keyword_groups:
            for word in words:
                word_dir = os.path.join(TMP, word)
                if not os.path.exists(word_dir):
                    continue
                process_audio_files(word_dir, destination, label=tag, max_files=50)

    # 2. LibriSpeech
    ls_path = fetch_librispeech()
    if not ls_path:
        return
    extract_tar(ls_path, TMP)
    libri_dir = os.path.join(TMP, "LibriSpeech", "dev-clean")
    if os.path.exists(libri_dir):
        process_audio_files(libri_dir, NEGATIVE_DIR, label="negative", max_files=200)



if __name__ == "__main__":
    # Both base directories must exist before anything is downloaded or saved.
    for required_dir in (TMP, ROOT):
        safe_mkdir(required_dir)
    collect_clips()
    print("Dataset creation complete. Check the 'data' folder.")

### Generate noise 
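- Similarly, we collect background noise: the cell below downloads a few ambience recordings and converts each one to a 10-second, 22,050 Hz mono WAV file with ffmpeg.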

In [None]:
import os
import requests
import subprocess
from pathlib import Path


# Destination folder for the converted noise clips; created on first run.
# NOTE(review): absolute, machine-specific path — adjust before running elsewhere.
OUTPUT_DIR = "/Users/vijaysridhar/Documents/white noise"
Path(OUTPUT_DIR).mkdir(parents=True, exist_ok=True)


# Maps the output file name (inside OUTPUT_DIR) to the OGG recording it is
# converted from.
# NOTE(review): "white_noise.wav" is produced from the "room_tone" recording,
# so the file name does not match its source — confirm this is intended.
NOISE_SOURCES = {
    "airplane_cabin.wav": "https://actions.google.com/sounds/v1/ambiences/airplane_cabin_background.ogg",
    "cafe_ambience.wav": "https://actions.google.com/sounds/v1/ambiences/coffee_shop.ogg",
    "white_noise.wav": "https://actions.google.com/sounds/v1/ambiences/room_tone.ogg"
}


# Browser-like request headers sent with every download.
# NOTE(review): Accept-Encoding advertises "br"; presumably the HTTP client
# can only decode Brotli responses when a Brotli package is installed — verify,
# or drop the header and let the client pick its own default.
HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    'Accept': '*/*',
    'Accept-Encoding': 'gzip, deflate, br',
    'Connection': 'keep-alive'
}

def download_and_convert(name, url, sr=22050):
    """Download one noise recording and convert it to a short mono WAV.

    The response body is stored as ``temp.ogg`` inside OUTPUT_DIR, then
    ffmpeg converts it to 16-bit PCM, mono, *sr* Hz, truncated to 10 seconds.
    The temporary file is removed on every exit path.

    Args:
        name: File name of the resulting WAV inside OUTPUT_DIR.
        url: Address of the source recording.
        sr: Target sample rate in Hz.

    Returns:
        True when the WAV file was written, False on any failure (HTTP error,
        network problem, ffmpeg missing or failing).
    """
    # Bound before the try block: the finally clause also runs for the early
    # "HTTP status" return and when the request itself raises, and it must not
    # hit an unbound name there.
    temp_path = os.path.join(OUTPUT_DIR, "temp.ogg")
    final_path = os.path.join(OUTPUT_DIR, name)

    try:
        print(f"‚¨áÔ∏è  Downloading {name}...")
        # A timeout keeps a stalled server from hanging the notebook forever.
        r = requests.get(url, headers=HEADERS, timeout=30)
        if r.status_code != 200:
            print(f"Failed to download {name}: HTTP {r.status_code}")
            return False

        with open(temp_path, 'wb') as f:
            f.write(r.content)

        print(f"üéß Converting {name} to {sr} Hz mono WAV...")

        cmd = [
            'ffmpeg', '-y',          # overwrite an existing output file
            '-i', temp_path,
            '-ar', str(sr),          # output sample rate
            '-ac', '1',              # mono
            '-t', '10',              # keep the first 10 seconds
            '-acodec', 'pcm_s16le',  # 16-bit PCM
            final_path
        ]

        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            print(f"FFmpeg error: {result.stderr}")
            return False

        print(f"‚úÖ Saved: {final_path}")
        return True

    except Exception as e:
        # Deliberately broad: one failing source must not stop the others.
        print(f"Error processing {name}: {str(e)}")
        return False
    finally:
        # Single cleanup point for success, early returns and errors alike.
        if os.path.exists(temp_path):
            os.remove(temp_path)


# Process every configured source in order and tally the successful conversions.
success_count = 0
for name, url in NOISE_SOURCES.items():
    success_count += 1 if download_and_convert(name, url) else 0

print(f"\n Successfully processed {success_count}/{len(NOISE_SOURCES)} noise files in: {OUTPUT_DIR}")
