In [None]:
!pip install datasets torch torchaudio librosa scikit-learn


In [None]:
from datasets import load_dataset
import torchaudio
from transformers import Wav2Vec2Processor, Wav2Vec2ForCTC
import torch
import IPython.display as ipd

In [None]:
!pip install evaluate

In [None]:
!pip install gradio wandb

In [None]:
import os
from huggingface_hub import login
from google.colab import userdata
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from transformers import Wav2Vec2Processor, Wav2Vec2Model, BertTokenizer, BertModel
from datasets import load_dataset
import evaluate
import gradio as gr
import torchaudio
import wandb
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import numpy as np
# Never store access tokens in the notebook itself: anyone who can read the file
# (or its git history) can use them. The token is read from the HF_TOKEN
# environment variable when set, otherwise from the Colab secret of the same name.
# NOTE(review): the token that used to be hardcoded on this line must be treated
# as leaked and revoked in the Hugging Face account settings.
hug_api_key = os.environ.get("HF_TOKEN") or userdata.get("HF_TOKEN")
login(hug_api_key)

In [None]:

# Load the "v0.01" configuration of the "speech_commands" dataset; the result is
# indexed by split name ("train", "validation", "test") in the cells below.
dataset = load_dataset("speech_commands", "v0.01")



In [None]:
# Mount Google Drive at /content/drive; the notebook-metadata patch in the last
# cell reads and writes a file under this mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
from datasets import get_dataset_split_names

# Bind each split of the loaded dataset to its own name.
train_dataset, val_dataset, test_dataset = (
    dataset[split_name] for split_name in ("train", "validation", "test")
)

In [None]:

# Use only half of the training data.
# The subset is always taken from the untouched dataset["train"], so re-running
# this cell yields the same subset instead of halving an already-halved split.
# Rows are shuffled with a fixed seed first so the subset does not depend on the
# order in which the split happens to be stored (e.g. if it is grouped by label).
full_train_dataset = dataset["train"]
half_length = len(full_train_dataset) // 2
train_dataset = full_train_dataset.shuffle(seed=42).select(range(half_length))

# Check size
print(f"Original training set size: {len(dataset['train'])}")
print(f"Reduced training set size: {len(train_dataset)}")


# Preprocessing

In [None]:
import torchaudio
import torchaudio.transforms as T
import matplotlib.pyplot as plt


# Take the first training example and unpack its samples and sampling rate.
example = dataset["train"][0]["audio"]
waveform, sample_rate = example["array"], example["sampling_rate"]

# Plot the raw waveform.
fig, ax = plt.subplots(figsize=(10, 4))
ax.plot(waveform)
ax.set(title="Waveform", xlabel="Time", ylabel="Amplitude")
plt.show()

# Embed an audio player for the same clip.
ipd.display(ipd.Audio(waveform, rate=sample_rate))

In [None]:
# Convert the audio to a mel spectrogram

import librosa.display
import numpy as np

# Mel spectrogram (128 mel bands) of the example clip, expressed in dB relative
# to its own maximum.
mel_spec = librosa.feature.melspectrogram(y=waveform, sr=sample_rate, n_mels=128)
mel_spec_db = librosa.power_to_db(mel_spec, ref=np.max)

# Draw the spectrogram on an explicit axes with a dB colour bar.
fig, ax = plt.subplots(figsize=(10, 4))
mel_img = librosa.display.specshow(
    mel_spec_db, sr=sample_rate, x_axis='time', y_axis='mel', ax=ax
)
fig.colorbar(mel_img, ax=ax, format='%+2.0f dB')
ax.set_title("Mel Spectrogram")
plt.show()


In [None]:
#normalize the audio

import numpy as np

def normalize_waveform(waveform):
    """Peak-normalize a waveform so its largest absolute sample becomes 1.

    Args:
        waveform: Array-like of audio samples.

    Returns:
        numpy.ndarray holding the samples divided by the peak absolute value.
        Empty input and all-zero (silent) input are returned unscaled, because
        dividing by a zero peak would fill the result with NaN.
    """
    waveform = np.asarray(waveform)
    if waveform.size == 0:
        return waveform

    peak = np.max(np.abs(waveform))
    if peak == 0:
        return waveform

    return waveform / peak

# Peak-normalize the example waveform; the result overwrites `waveform`, which
# is what the MFCC cell below reads.

waveform=normalize_waveform(waveform)

In [None]:
# Compute 13 Mel-Frequency Cepstral Coefficients for the example clip.
mfccs = librosa.feature.mfcc(y=waveform, sr=sample_rate, n_mfcc=13)

# Show the coefficients over time with a colour bar.
fig, ax = plt.subplots(figsize=(10, 4))
mfcc_img = librosa.display.specshow(mfccs, x_axis='time', ax=ax)
fig.colorbar(mfcc_img, ax=ax)
ax.set_title("MFCC")
plt.show()

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import Wav2Vec2Model, BertModel, Wav2Vec2Processor, BertTokenizer
from datasets import load_dataset
from torch.utils.data import DataLoader
import evaluate
import time

In [None]:
# Data preprocessing

from transformers import Wav2Vec2Processor, Wav2Vec2Model, BertTokenizer, BertModel

# Audio front end: the processor published with the "facebook/wav2vec2-base"
# checkpoint. Only the processor is created here; the encoder weights themselves
# are loaded inside MultimodalCommandClassifier.
processor = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-base")
# Text front end: the BERT tokenizer. As with the audio side, the BERT encoder is
# loaded by the model class rather than in this cell.
tokenizer = BertTokenizer.from_pretrained("google-bert/bert-base-uncased")



In [None]:
#dataset = load_dataset("speech_commands", "v0.01")
# Class names in label-index order, plus the inverse name -> index lookup.
labels = train_dataset.features["label"].names
label2id = dict(zip(labels, range(len(labels))))

In [None]:
def preprocess(example):
    """Turn one raw dataset row into the fields the multimodal model consumes.

    The audio samples go through the module-level `processor` (padded or
    truncated to exactly 16000 samples, with an attention mask), and the class
    name looked up in `labels` goes through the module-level `tokenizer`
    (padded or truncated to 128 tokens).

    NOTE(review): the text branch receives the name of the row's own label, i.e.
    the target class is also presented to the model as an input. Confirm this is
    intended before interpreting any accuracy figures.

    Args:
        example: Dataset row with an "audio" dict ("array", "sampling_rate") and
            an integer "label".

    Returns:
        dict with "audio_input", "audio_attention", "text_input",
        "text_attention" (each the first, and only, row of the corresponding
        batched encoder output) and "label" (the unchanged label index).
    """
    clip = example["audio"]
    class_idx = example["label"]

    # Fixed-length audio features: 16000 samples per clip, mask marks real audio.
    audio_enc = processor(
        clip["array"],
        sampling_rate=clip["sampling_rate"],
        padding="max_length",
        truncation=True,
        max_length=16000,
        return_attention_mask=True,
        return_tensors="pt",
    )

    # Token ids for the textual class name (e.g. "yes", "no").
    text_enc = tokenizer(
        labels[class_idx],
        padding="max_length",
        truncation=True,
        max_length=128,
        return_tensors="pt",
    )

    features = {
        "audio_input": audio_enc.input_values[0],
        "audio_attention": audio_enc.attention_mask[0],
        "text_input": text_enc.input_ids[0],
        "text_attention": text_enc.attention_mask[0],
        "label": class_idx,
    }
    return features


In [None]:
# Model Architecture

class MultimodalCommandClassifier(nn.Module):
    """Audio + text command classifier.

    A Wav2Vec2 audio encoder and a BERT text encoder each produce a sequence of
    hidden states. Both are projected to `hidden_dim`, fused by a Transformer
    decoder in which the audio sequence is the target and the text sequence is
    the memory, mean-pooled over the audio positions and classified.

    Args:
        audio_model_name: Checkpoint name passed to Wav2Vec2Model.from_pretrained.
        text_model_name: Checkpoint name passed to BertModel.from_pretrained.
        hidden_dim: Width of the shared projection space and of the decoder.
        num_classes: Number of output logits.
    """

    def __init__(self, audio_model_name="facebook/wav2vec2-base", text_model_name="bert-base-uncased", hidden_dim=768, num_classes=35):
        super().__init__()
        # Pretrained encoders for the two modalities.
        self.audio_encoder = Wav2Vec2Model.from_pretrained(audio_model_name)
        self.text_encoder = BertModel.from_pretrained(text_model_name)
        # Project both modalities to a common hidden size.
        self.audio_proj = nn.Linear(self.audio_encoder.config.hidden_size, hidden_dim)
        self.text_proj = nn.Linear(self.text_encoder.config.hidden_size, hidden_dim)
        # Fusion: audio positions cross-attend to text positions.
        decoder_layer = nn.TransformerDecoderLayer(d_model=hidden_dim, nhead=8)
        self.transformer_decoder = nn.TransformerDecoder(decoder_layer, num_layers=2)
        # Final classification layer producing one logit per command label.
        self.classifier = nn.Linear(hidden_dim, num_classes)

    def forward(self, audio_input, audio_attention, text_input, text_attention):
        """Compute class logits for a batch.

        Args:
            audio_input: Audio samples handed to the audio encoder.
            audio_attention: Attention mask handed to the audio encoder.
            text_input: Token ids handed to the text encoder.
            text_attention: Attention mask for the text tokens (non-zero = real
                token); may be None. Besides being passed to the text encoder
                it is used to hide padded text positions from the fusion
                decoder's cross-attention.

        Returns:
            Tensor of logits with one row per batch element.
        """
        audio_feat = self.audio_encoder(audio_input, attention_mask=audio_attention).last_hidden_state
        # The decoder was built with the default sequence-first layout, hence
        # the permute from (batch, seq, dim) to (seq, batch, dim).
        audio_seq = self.audio_proj(audio_feat).permute(1, 0, 2)

        text_feat = self.text_encoder(text_input, attention_mask=text_attention).last_hidden_state
        text_seq = self.text_proj(text_feat).permute(1, 0, 2)

        # True marks text positions that cross-attention must ignore. Without
        # this mask the audio frames also attend to the padding tokens that fill
        # the text sequence up to its fixed length.
        text_padding = None if text_attention is None else (text_attention == 0)

        fused = self.transformer_decoder(
            tgt=audio_seq,
            memory=text_seq,
            memory_key_padding_mask=text_padding,
        )
        # Back to batch-first, then average over the audio positions.
        pooled = fused.permute(1, 0, 2).mean(dim=1)
        return self.classifier(pooled)


In [None]:
# Apply `preprocess` to every example of the reduced training split.
# No rows are filtered out here; the call only maps.
# NOTE(review): the result overwrites `train_dataset`, so re-running this cell
# maps the already-mapped dataset again.
print("datasets ")
train_dataset = train_dataset.map(preprocess)

In [None]:
# Handles batching of examples with different tensor types
def collate_fn(batch):
    """Merge a list of per-example dicts into one dict of batched tensors.

    The keys are taken from the first example. Tensor fields are stacked;
    int/float scalar fields and list fields are converted with `torch.tensor`,
    which keeps integer data (such as attention masks) integer-typed and turns
    Python floats into the default float dtype. "text_input" and "label" are
    always returned as int64. Fields of any other type (strings, dicts, ...) are
    left out of the result.

    Args:
        batch: List of example dicts sharing the same keys.

    Returns:
        dict mapping each supported key to a tensor whose first dimension is the
        batch size. An empty batch yields an empty dict.
    """
    if not batch:
        return {}

    long_keys = ("text_input", "label")
    collated = {}

    for key in batch[0]:
        values = [example[key] for example in batch]
        first = values[0]

        if isinstance(first, torch.Tensor):
            tensor = torch.stack(values)
        elif isinstance(first, (int, float, list)):
            # torch.tensor infers the dtype from the data, unlike the legacy
            # torch.Tensor constructor, which always produces float32.
            tensor = torch.tensor(values)
        else:
            # Unsupported field type (e.g. file path, raw audio dict): skip it.
            continue

        collated[key] = tensor.long() if key in long_keys else tensor

    return collated

In [None]:
# Training batches of 256 examples drawn in shuffled order; collate_fn converts
# the per-example fields into batched tensors.
train_loader = DataLoader(train_dataset, batch_size=256, shuffle=True, collate_fn=collate_fn)

In [None]:
# Pick the GPU when one is available, then build the classifier with one output
# per dataset label and move it to that device.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
model = MultimodalCommandClassifier(num_classes=len(labels))
model.to(device)

# Optimisation objects and evaluation metrics.
# NOTE(review): the training cell below creates all four of these again.
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
metric_acc, metric_f1 = evaluate.load("accuracy"), evaluate.load("f1")

In [None]:
# Freeze both pretrained encoders: only the projections, the fusion decoder and
# the classifier are trained.
for frozen_encoder in (model.audio_encoder, model.text_encoder):
    for param in frozen_encoder.parameters():
        param.requires_grad = False

# Optimizer, loss, metrics. The optimizer only receives parameters that are
# still trainable, so it does not track the frozen encoder weights.
trainable_params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.AdamW(trainable_params, lr=1e-4)
loss_fn = torch.nn.CrossEntropyLoss()
metric_acc = evaluate.load("accuracy")
metric_f1 = evaluate.load("f1")

# Trackers (one entry appended per epoch)
train_loss_values = []
train_acc_values = []

best_f1 = 0

# Training loop
start_time = time.time()
for epoch in range(1, 4):
    model.train()
    # model.train() switches every submodule to training mode, including the
    # frozen encoders. Put those back into eval mode so that train-only
    # stochastic layers (e.g. dropout) do not perturb the fixed features.
    model.audio_encoder.eval()
    model.text_encoder.eval()
    total_train_loss = 0

    for batch in train_loader:
        # Move every tensor field to the training device.
        batch = {
            key: (value.to(device) if isinstance(value, torch.Tensor) else value)
            for key, value in batch.items()
        }

        logits = model(batch["audio_input"], batch["audio_attention"],
                       batch["text_input"], batch["text_attention"])
        loss = loss_fn(logits, batch["label"])

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_train_loss += loss.item()

        # Accumulate running training metrics from this batch's predictions.
        with torch.no_grad():
            preds = torch.argmax(logits, dim=1)
            metric_acc.add_batch(predictions=preds, references=batch["label"])
            metric_f1.add_batch(predictions=preds, references=batch["label"])

    # After all batches: compute() returns the epoch's metrics from what was
    # accumulated via add_batch.
    avg_train_loss = total_train_loss / len(train_loader)
    train_metrics = metric_acc.compute()
    train_f1 = metric_f1.compute(average="weighted")

    train_loss_values.append(avg_train_loss)
    train_acc_values.append(train_metrics["accuracy"])

    print(f"Epoch {epoch} | Train Loss: {avg_train_loss:.4f} | Train Acc: {train_metrics['accuracy']:.4f} | Train F1: {train_f1['f1']:.4f}")

end_time = time.time()
print(f"Training Complete  | Total Training Time: {(end_time - start_time)/60:.2f} minutes")

In [None]:
# Save the model weights (state_dict only) to the current working directory.
# NOTE(review): despite the "best_" prefix in the file name, the training loop
# above never selects a best checkpoint; these are the weights after the final
# epoch.
torch.save(model.state_dict(), "best_multimodal_fusion_model.pt")

print("Training Complete")

In [None]:
# Preprocess the test split with the same function used for training.
# The raw "audio" column is dropped once `preprocess` has turned it into
# "audio_input": collate_fn ignores it anyway, and keeping it would make every
# row access decode the waveform again.
test_dataset = dataset["test"].map(preprocess, remove_columns=["audio"])
test_loader = DataLoader(test_dataset, batch_size=256, shuffle=False, collate_fn=collate_fn)


In [None]:
import numpy as np
from sklearn.metrics import f1_score, accuracy_score

# Evaluate on the test loader without tracking gradients, collecting the summed
# batch loss plus the true and predicted label for every example.
model.eval()
test_loss = 0
y_true, y_pred = [], []

with torch.no_grad():
    for batch in test_loader:
        batch = {
            name: (value.to(device) if isinstance(value, torch.Tensor) else value)
            for name, value in batch.items()
        }

        logits = model(
            batch["audio_input"],
            batch["audio_attention"],
            batch["text_input"],
            batch["text_attention"],
        )
        test_loss += loss_fn(logits, batch["label"]).item()

        preds = torch.argmax(logits, dim=1)
        y_true.extend(batch["label"].cpu().numpy())
        y_pred.extend(preds.cpu().numpy())

# Compute Accuracy & F1 (loss is averaged per batch, F1 is class-weighted).
avg_test_loss = test_loss / len(test_loader)
test_accuracy = accuracy_score(y_true, y_pred)
test_f1 = f1_score(y_true, y_pred, average="weighted")

print(f"\nTest Loss: {avg_test_loss:.4f} | Test Accuracy: {test_accuracy:.4f} | Test F1 Score: {test_f1:.4f}")


In [None]:
# Confusion matrix over every label index, saved to disk. The figure is closed
# after saving, so it is written to the PNG file rather than shown inline.
cm = confusion_matrix(y_true, y_pred, labels=list(range(len(labels))))
fig, ax = plt.subplots(figsize=(8, 6))
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)
disp.plot(xticks_rotation=45, ax=ax)
plt.title("Test Set Confusion Matrix")
plt.tight_layout()
plt.savefig("test_confusion_matrix.png")
plt.close()

# Plot test metrics
metric_names = ["Loss", "Accuracy", "F1"]
metric_values = [avg_test_loss, test_accuracy, test_f1]

plt.figure(figsize=(6, 4))
plt.bar(metric_names, metric_values, color=["tomato", "steelblue", "seagreen"])
plt.title("Test Set Metrics")
plt.ylabel("Score")
# Accuracy and F1 are bounded by 1 but the loss is not: grow the axis when
# needed so that a loss above 1 is not cut off at the top of the plot.
plt.ylim(0, max(1.0, max(metric_values) * 1.1))
for i, v in enumerate(metric_values):
    plt.text(i, v + 0.02, f"{v:.2f}", ha='center')
plt.tight_layout()
plt.savefig("test_metrics_plot.png")
plt.show()

In [None]:
def predict_command(audio_path):
    """Predict the spoken command in an audio file by averaging over text prompts.

    The clip is run through the model once per entry in `labels`, each time with
    that label's name as the text input; the resulting logits are averaged and
    turned into probabilities.

    NOTE(review): a later cell defines `predict_command` again, and that
    definition replaces this one once its cell has run.

    Args:
        audio_path: Path to an audio file readable by `torchaudio.load`.

    Returns:
        (pred_label, confidence): the predicted label name and its probability
        as a percentage rounded to two decimals.
    """
    waveform, sr = torchaudio.load(audio_path)

    # Mix multi-channel audio down to one channel so that the processor sees a
    # single clip instead of one clip per channel.
    if waveform.shape[0] > 1:
        waveform = waveform.mean(dim=0, keepdim=True)

    if sr != 16000:
        resampler = torchaudio.transforms.Resample(orig_freq=sr, new_freq=16000)
        waveform = resampler(waveform)

    # Same processor arguments as `preprocess`; the attention mask is requested
    # explicitly because it is read below.
    audio_input = processor(
        waveform.squeeze(0).numpy(),
        sampling_rate=16000,
        return_tensors="pt",
        padding="max_length",
        truncation=True,
        max_length=16000,
        return_attention_mask=True,
    )

    input_audio = audio_input.input_values.to(device)
    attention_audio = audio_input.attention_mask.to(device)

    all_logits = []

    model.eval()
    with torch.no_grad():
        for label in labels:
            tokenized = tokenizer(label, return_tensors="pt", padding="max_length", truncation=True, max_length=128)
            input_text = tokenized.input_ids.to(device)
            attention_text = tokenized.attention_mask.to(device)

            logits = model(input_audio, attention_audio, input_text, attention_text)
            all_logits.append(logits.squeeze(0))

    # Average the per-prompt logits, then convert to probabilities.
    mean_logits = torch.stack(all_logits).mean(dim=0)
    probs = torch.nn.functional.softmax(mean_logits, dim=0)
    pred_idx = torch.argmax(probs).item()
    # `labels` is already ordered by class index, so it serves as the
    # index -> name lookup.
    pred_label = labels[pred_idx]
    confidence = probs[pred_idx].item()

    return pred_label, round(confidence * 100, 2)


In [None]:
import torchaudio

def predict_command(audio_file_path):
    """Classify one audio file into one of the spoken-command labels.

    NOTE(review): training feeds the name of each example's label into the text
    branch (see `preprocess`), whereas this function always feeds the fixed
    string "dummy". Confirm that predictions made this way are meaningful.

    Args:
        audio_file_path: Path to an audio file readable by `torchaudio.load`.

    Returns:
        (pred_label, confidence): the predicted label name taken from `labels`
        and its softmax probability expressed as a percentage.
    """
    # Load audio from file
    waveform, sample_rate = torchaudio.load(audio_file_path)

    # Mix multi-channel audio down to one channel; otherwise each channel would
    # be treated as a separate clip and the audio batch size would no longer
    # match the single text input.
    if waveform.shape[0] > 1:
        waveform = waveform.mean(dim=0, keepdim=True)

    # Resample if necessary
    if sample_rate != 16000:
        resampler = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=16000)
        waveform = resampler(waveform)

    # Let the processor pad/truncate to 16000 samples and build the attention
    # mask with exactly the arguments `preprocess` uses, so that inference
    # inputs are prepared the same way as training inputs.
    audio_input = processor(
        waveform.squeeze(0).numpy(),
        sampling_rate=16000,
        return_tensors="pt",
        padding="max_length",
        truncation=True,
        max_length=16000,
        return_attention_mask=True,
    )

    input_audio = audio_input.input_values.to(device)
    attention_audio = audio_input.attention_mask.to(device)

    # Create dummy text input (until you connect Pinecone retrieval)
    dummy_text = tokenizer("dummy", return_tensors="pt", padding="max_length", truncation=True, max_length=128)
    input_text = dummy_text.input_ids.to(device)
    attention_text = dummy_text.attention_mask.to(device)

    # Forward pass
    model.eval()
    with torch.no_grad():
        logits = model(input_audio, attention_audio, input_text, attention_text)
        probs = torch.softmax(logits, dim=-1)

    top_idx = torch.argmax(probs, dim=-1).item()
    confidence = probs[0, top_idx].item()

    pred_label = labels[top_idx]
    return pred_label, confidence * 100


In [None]:

# ----- Gradio UI -----
def gradio_interface(audio):
    """Gradio callback: format the prediction for an uploaded or recorded clip.

    Args:
        audio: Path of the audio file supplied by the Gradio audio component, or
            None/empty when the user submitted without providing audio.

    Returns:
        Text with the predicted command and its confidence, or a short prompt
        asking for audio when none was provided.
    """
    # Submitting the form without audio passes no path; answer with a message
    # instead of failing inside the audio loader.
    if not audio:
        return "No audio provided. Please upload or record a command."

    pred_label, confidence = predict_command(audio)
    return f" Predicted Command: {pred_label}\n Confidence: {confidence:.2f}%"

# Gradio app: the audio component hands `gradio_interface` a file path
# (type="filepath") and the returned text is shown in a textbox.
demo = gr.Interface(
    fn=gradio_interface,
    inputs=gr.Audio(type="filepath", label="Upload or Record Audio"),
    outputs=gr.Textbox(label="Predicted Command & Confidence"),
    title="Spoken Command Classifier",
    description="Upload or record a spoken command"
)

# Start the app with debug output enabled.
# NOTE(review): with debug=True this cell presumably keeps running while the app
# is up, so the cells below would not execute until it is stopped — confirm.
demo.launch(debug=True)

In [None]:
!pip install nbformat


In [None]:
import nbformat

filename = "/content/drive/MyDrive/Colab Notebooks/SpokenCommandClassification_Project2.ipynb"  # Replace with your notebook name

# Read the notebook as nbformat version 4.
with open(filename, "r", encoding="utf-8") as src:
    nb = nbformat.read(src, as_version=4)

# When widget metadata exists but has no "state" entry, add an empty one.
metadata = nb["metadata"]
if "widgets" in metadata and "state" not in metadata["widgets"]:
    metadata["widgets"]["state"] = {}

# Save the corrected notebook back to the same path.
with open(filename, "w", encoding="utf-8") as dst:
    nbformat.write(nb, dst)

print("Notebook fixed ✅")
