In [24]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import glob
import librosa
import numpy as np
from tqdm.auto import tqdm
import os
import random
import wave
import matplotlib.pyplot as plt

In [25]:
def plot_wavs_in_folder(folder_path):
    """
    Plot the waveform of every ``.wav`` file found directly inside a folder.

    Each file is read with the standard-library ``wave`` module, decoded into a
    NumPy array according to its sample width, scaled by the format's full-scale
    value, and drawn in its own figure against a time axis in seconds.
    Multi-channel files are de-interleaved so that every channel becomes one
    line and the time axis counts frames rather than raw interleaved samples.

    Args:
        folder_path: Directory to scan (not recursive).

    Returns:
        None. Figures are displayed with ``plt.show()``. A file that cannot be
        read or plotted is reported with ``print`` and skipped.
    """
    # PCM sample width in bytes -> dtype used to decode the raw frames.
    # 8-bit WAV data is unsigned; 16- and 32-bit data is signed little-endian.
    pcm_dtypes = {1: np.uint8, 2: "<i2", 4: "<i4"}

    # Sorted so figures appear in a stable order from run to run.
    for filename in sorted(os.listdir(folder_path)):
        if not filename.endswith('.wav'):  # Process only .wav files
            continue
        file_path = os.path.join(folder_path, filename)
        try:
            # The context manager closes the file even when reading fails.
            with wave.open(file_path, 'rb') as wf:
                nframes = wf.getnframes()
                framerate = wf.getframerate()
                sampwidth = wf.getsampwidth()
                nchannels = wf.getnchannels()
                audio_data = wf.readframes(nframes)

            if sampwidth not in pcm_dtypes:
                raise ValueError(f"unsupported sample width: {sampwidth} bytes")
            audio_array = np.frombuffer(audio_data, dtype=pcm_dtypes[sampwidth])

            # Normalize by the full-scale value of the sample format.
            full_scale = 2 ** (sampwidth * 8 - 1)
            if sampwidth == 1:
                # Unsigned 8-bit audio is centred on 128, not 0.
                audio_array = audio_array.astype(np.float64) - full_scale
            audio_array = audio_array / full_scale

            # Frames are stored interleaved (L, R, L, R, ...): one row per frame.
            if nchannels > 1:
                audio_array = audio_array.reshape(-1, nchannels)

            # Time axis for plotting: one entry per frame.
            time_axis = np.arange(0, len(audio_array)) / framerate

            # Plot the waveform
            plt.figure(figsize=(10, 4))
            plt.plot(time_axis, audio_array)
            plt.xlabel("Time (seconds)")
            plt.ylabel("Amplitude")
            plt.title(f"Waveform: {filename}")
            plt.grid()
            plt.tight_layout()
            plt.show()
        except Exception as e:
            # Best effort: report the failing file and continue with the rest.
            print(f"Error processing {file_path}: {e}")

# Provide the folder path containing the .wav files
# folder_path = '../data/archers/'
# plot_wavs_in_folder(folder_path)

In [None]:
def gather_data(data_root):
    """
    Scan `data_root` and build the labelled file list for the dataset.

    Every visible subdirectory of `data_root` is one class. Plain files
    (e.g. a saved checkpoint) and hidden directories (names starting with ".",
    such as ".ipynb_checkpoints") are ignored so they cannot become empty
    classes and inflate the class count.

    Args:
        data_root: Directory whose subfolders each hold the WAV files of one class.

    Returns:
      - all_files: list of (wav_path, label_index) pairs, ordered by class and
        then by path
      - classes: list of class names (strings) in sorted order; a label index
        is the position of its class in this list
    """
    classes = sorted(
        entry
        for entry in os.listdir(data_root)
        if not entry.startswith(".")
        and os.path.isdir(os.path.join(data_root, entry))
    )

    all_files = []
    for label_index, cls_name in enumerate(classes):
        cls_folder = os.path.join(data_root, cls_name)
        # Escape the folder part so glob metacharacters in a class name are
        # taken literally; sort because glob's order is filesystem-dependent.
        pattern = os.path.join(glob.escape(cls_folder), "*.wav")
        for wav_file in sorted(glob.glob(pattern)):
            all_files.append((wav_file, label_index))

    return all_files, classes


# Dataset location, relative to the notebook's working directory.
# gather_data() treats each subfolder of this directory as one class.
data_root = "../../data"  # Path to your data folder
all_files, classes = gather_data(data_root)
# Number of classes; passed to the model constructor later in the notebook.
num_classes = len(classes)
# Quick sanity check of what was discovered on disk.
print("Found classes:", classes)
print("Total examples:", len(all_files))


In [None]:
# Reproducible 80/20 train/validation split.
# A dedicated, seeded RNG keeps the split identical across kernel restarts and
# cell re-runs. Shuffling a sorted copy leaves `all_files` untouched and removes
# any dependence on the order in which the files were listed from disk.
split_seed = 42
shuffled_files = sorted(all_files)
random.Random(split_seed).shuffle(shuffled_files)

split_idx = int(0.8 * len(shuffled_files))  # 80% for train
train_files = shuffled_files[:split_idx]
val_files   = shuffled_files[split_idx:]

print(f"Train size: {len(train_files)}")
print(f"Val   size: {len(val_files)}")

In [28]:
class SpeechCommandsDataset(Dataset):
    """
    Dataset that turns (wav_path, label_index) pairs into MFCC sequences.

    Each item is loaded with librosa, optionally silence-trimmed and augmented,
    and returned as a ``(time_frames, n_mfcc)`` float32 tensor together with a
    scalar int64 label tensor.
    """

    def __init__(self, file_list, n_mfcc=12, sr=16000, augment=False, silence_label=None):
        """
        file_list: list of (wav_path, label_index)
        n_mfcc: number of MFCC coefficients
        sr: sample rate to which audio is (optionally) resampled
        augment: if True, apply random augmentations to training data
        silence_label: label index whose clips must NOT be silence-trimmed.
            When None, the index of '_silence' is looked up once in a
            module-level `classes` list if one exists; if there is no such
            list or no such class, every clip is trimmed.
        """
        self.file_list = file_list
        self.n_mfcc = n_mfcc
        self.sr = sr
        self.augment = augment

        if silence_label is None:
            # Backward-compatible fallback for callers that rely on the
            # notebook-level `classes` list. Resolving it here (once) avoids a
            # per-item list search and a ValueError when '_silence' is absent.
            known_classes = globals().get("classes")
            if known_classes is not None and '_silence' in known_classes:
                silence_label = list(known_classes).index('_silence')
        self.silence_label = silence_label

    def __len__(self):
        """Return the number of (wav_path, label) pairs."""
        return len(self.file_list)

    def _augment(self, waveform, sr):
        """Apply each random augmentation independently with probability 0.5."""
        # Random time-stretch (speed up/down by up to +/-10%)
        if random.random() < 0.5:
            rate = 1.0 + np.random.uniform(-0.1, 0.1)  # e.g., 0.9 to 1.1
            waveform = librosa.effects.time_stretch(waveform, rate=rate)

        # Random pitch shift (up/down by up to +/-2 semitones)
        if random.random() < 0.5:
            n_steps = np.random.uniform(-2, 2)
            waveform = librosa.effects.pitch_shift(waveform, sr=sr, n_steps=n_steps)

        # Random circular time shift by up to 10% of the wave length
        if random.random() < 0.5:
            shift_max = int(0.1 * len(waveform))
            if shift_max > 0:  # randint(low, high) requires low < high
                shift = np.random.randint(-shift_max, shift_max)
                waveform = np.roll(waveform, shift)

        # Random background noise injection
        if random.random() < 0.5:
            noise_level = np.random.uniform(0.01, 0.02)  # Adjust range as needed
            noise = np.random.randn(len(waveform)) * noise_level
            # Cast so the sum keeps the waveform's dtype instead of float64.
            waveform = waveform + noise.astype(waveform.dtype)

        return waveform

    def __getitem__(self, idx):
        """Load item `idx` and return (mfcc_tensor, label_tensor)."""
        wav_path, label = self.file_list[idx]

        # Load audio
        waveform, sr = librosa.load(wav_path, sr=self.sr)

        # Trim leading/trailing silence, except for the silence class itself.
        if label != self.silence_label:
            trimmed, _ = librosa.effects.trim(waveform, top_db=20)
            # Keep the untrimmed clip if trimming removed every sample;
            # an empty signal cannot be turned into MFCC frames.
            if len(trimmed) > 0:
                waveform = trimmed

        # Augment only clips longer than 2048 samples.
        if self.augment and len(waveform) > 2048:
            waveform = self._augment(waveform, sr)

        # Now compute MFCC
        mfcc = librosa.feature.mfcc(y=waveform, sr=sr, n_mfcc=self.n_mfcc, n_fft=1024)
        mfcc = mfcc.T  # shape: (time_frames, n_mfcc)

        # Convert to tensors
        mfcc_tensor = torch.tensor(mfcc, dtype=torch.float32)
        label_tensor = torch.tensor(label, dtype=torch.long)

        return mfcc_tensor, label_tensor

In [29]:
def collate_fn(batch):
    """
    Combine a list of (mfcc_tensor, label_tensor) samples into one batch.

    MFCC tensors are right-padded with zeros along the time axis (dim 0) up to
    the longest sequence in the batch, then stacked. Labels are stacked as-is.

    Returns:
        (padded_mfccs, labels): a (batch, max_time, n_mfcc) tensor and a
        (batch,) tensor.
    """
    features = [sample[0] for sample in batch]
    targets = [sample[1] for sample in batch]

    # Target length is the longest sequence; width comes from the first sample.
    longest = max(feat.shape[0] for feat in features)
    width = features[0].shape[1]

    def _right_pad(feat):
        # Append zero frames only when the sequence is shorter than the longest.
        missing = longest - feat.shape[0]
        if missing <= 0:
            return feat
        filler = torch.zeros(missing, width)
        return torch.cat([feat, filler], dim=0)

    stacked_features = torch.stack([_right_pad(feat) for feat in features], dim=0)
    stacked_targets = torch.stack(targets)
    return stacked_features, stacked_targets


In [30]:
# Mini-batch size shared by both loaders (kept small: few examples per class).
batch_size = 8

# Training pipeline: augmented samples, reshuffled every epoch.
train_dataset = SpeechCommandsDataset(train_files, n_mfcc=12, sr=16000, augment=True)
train_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    collate_fn=collate_fn,
)

# Validation pipeline: no augmentation, fixed order.
val_dataset = SpeechCommandsDataset(val_files, n_mfcc=12, sr=16000, augment=False)
val_loader = DataLoader(
    val_dataset,
    batch_size=batch_size,
    shuffle=False,
    collate_fn=collate_fn,
)


In [None]:
class RNN(nn.Module):
    """
    LSTM sequence classifier: stacked LSTM followed by a linear readout.

    The class logits are computed from the top-layer LSTM output at one time
    step per sequence: the final step by default, or the last valid step when
    `lengths` is supplied to `forward`.
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        """
        input_size: number of features per time step
        hidden_size: LSTM hidden state size
        num_layers: number of stacked LSTM layers
        num_classes: number of output logits
        """
        super(RNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # LSTM expects (batch, seq, feature)
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x, lengths=None):
        """
        Args:
            x: tensor of shape (batch_size, seq_length, input_size).
            lengths: optional per-sequence count of valid (unpadded) time
                steps, as a list or 1-D tensor of length batch_size. When
                given, each sequence is read out at its own last valid step,
                so right-padding does not influence the logits. When None,
                the final time step is used for every sequence, padded or not.

        Returns:
            Logits of shape (batch_size, num_classes).
        """
        # nn.LSTM starts from zero hidden/cell states when none are passed.
        # LSTM output: (batch_size, seq_length, hidden_size)
        out, _ = self.lstm(x)

        if lengths is None:
            # Take the last time-step
            last = out[:, -1, :]  # shape: (batch_size, hidden_size)
        else:
            # The LSTM is unidirectional, so the output at step t depends only
            # on inputs up to t; picking step length-1 equals running the
            # unpadded sequence on its own.
            lengths = torch.as_tensor(lengths, dtype=torch.long, device=out.device)
            last_idx = (lengths - 1).clamp(min=0, max=out.size(1) - 1)
            batch_idx = torch.arange(out.size(0), device=out.device)
            last = out[batch_idx, last_idx]  # shape: (batch_size, hidden_size)

        # Fully-connected layer
        return self.fc(last)  # shape: (batch_size, num_classes)


# Instantiate the model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

input_size = 12      # Must equal the n_mfcc used when building the datasets
hidden_size = 48     # Hyperparameter - tune as needed
num_layers = 2       # Hyperparameter - tune as needed
learning_rate = 1e-3
model = RNN(input_size, hidden_size, num_layers, num_classes).to(device)

criterion = nn.CrossEntropyLoss()
# Use the named hyperparameter so changing `learning_rate` actually takes effect.
optimizer = optim.Adam(model.parameters(), lr=learning_rate)


In [32]:
def train_one_epoch(model, dataloader, optimizer, criterion, device):
    """
    Run one optimisation pass over `dataloader`.

    Puts the model in training mode, performs a forward/backward/step cycle
    per batch, and accumulates a sample-weighted loss and the number of
    correct arg-max predictions.

    Returns:
        (epoch_loss, epoch_acc): mean loss per sample and fraction correct.
    """
    model.train()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    progress = tqdm(dataloader, desc="Training", leave=False)
    for mfccs, labels in progress:
        mfccs = mfccs.to(device)
        labels = labels.to(device)

        # Forward pass
        logits = model(mfccs)
        loss = criterion(logits, labels)

        # Backward pass and parameter update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Weight the batch loss by batch size so the epoch mean is per-sample.
        loss_sum += loss.item() * mfccs.size(0)

        # Accuracy bookkeeping
        predicted = logits.max(dim=1).indices
        n_correct += (predicted == labels).sum().item()
        n_seen += labels.size(0)

    return loss_sum / n_seen, n_correct / n_seen


def evaluate(model, dataloader, criterion, device):
    """
    Score the model on `dataloader` without updating it.

    Puts the model in eval mode, disables gradient tracking, and accumulates
    a sample-weighted loss and the number of correct arg-max predictions.

    Returns:
        (val_loss, val_acc): mean loss per sample and fraction correct.
    """
    model.eval()
    loss_sum, n_correct, n_seen = 0.0, 0, 0

    with torch.no_grad():
        progress = tqdm(dataloader, desc="Evaluating", leave=False)
        for mfccs, labels in progress:
            mfccs = mfccs.to(device)
            labels = labels.to(device)

            logits = model(mfccs)
            batch_loss = criterion(logits, labels)

            # Weight the batch loss by batch size so the mean is per-sample.
            loss_sum += batch_loss.item() * mfccs.size(0)

            predicted = logits.max(dim=1).indices
            n_correct += (predicted == labels).sum().item()
            n_seen += labels.size(0)

    return loss_sum / n_seen, n_correct / n_seen


In [None]:
# Train for a fixed number of epochs and keep the weights of the epoch with
# the lowest validation loss.
num_epochs = 40
best_loss = float("inf")
best_model_weights = None
for epoch in range(num_epochs):
    print(f"\nEpoch [{epoch+1}/{num_epochs}]")
    
    train_loss, train_acc = train_one_epoch(model, train_loader, optimizer, criterion, device)
    val_loss, val_acc = evaluate(model, val_loader, criterion, device)
    
    if val_loss < best_loss:
        best_loss = val_loss
        # state_dict() hands back references to the live parameter tensors,
        # which later optimizer steps overwrite in place. Clone each tensor so
        # the snapshot really holds this epoch's weights.
        best_model_weights = {
            name: tensor.detach().clone()
            for name, tensor in model.state_dict().items()
        }

    print(f"Train Loss: {train_loss:.4f}  |  Train Acc: {train_acc:.4f}")
    print(f"Val   Loss: {val_loss:.4f}  |  Val   Acc: {val_acc:.4f}")


In [None]:
# Lowest validation loss recorded by the training loop (shown as the cell output).
best_loss

In [13]:
# Write the weights captured at the best validation loss to "lstm_model.pt",
# resolved against the kernel's current working directory.
# NOTE(review): best_model_weights is still None if no epoch ever improved on
# the initial best_loss; confirm training ran before relying on this file.
torch.save(best_model_weights, "lstm_model.pt")


In [5]:
def predict(model, wav_path, classes, device='cpu', sr=16000, n_mfcc=12,
            weights_path="lstm_model.pt", n_fft=1024, trim_top_db=20):
    """
    Predict the class for a single .wav audio file.

    The audio is preprocessed the same way the training dataset does it for
    non-silence clips (silence trim at 20 dB, MFCC with n_fft=1024), so the
    model sees features with the statistics it was trained on.

    Args:
        model (nn.Module): PyTorch model; moved to `device` and put in eval mode.
        wav_path (str): Path to the .wav file.
        classes (list): List of class names (strings), where index=label.
        device (str or torch.device): 'cpu' or 'cuda' device.
        sr (int): Sample rate to which the audio is (optionally) resampled.
        n_mfcc (int): Number of MFCC features to compute.
        weights_path (str or None): Checkpoint (a state dict) to load into
            `model` before predicting. The default is the file name the
            notebook saves to, resolved against the current working directory.
            Pass None to use the weights `model` already holds.
        n_fft (int): FFT window size for the MFCC computation.
        trim_top_db (float or None): Threshold in dB for trimming leading and
            trailing silence; None disables trimming. The label is unknown at
            inference time, so trimming is applied to every clip.

    Returns:
        predicted_label (str): The predicted class name.
        confidence (float): Softmax confidence for the predicted class.
    """
    if weights_path is not None:
        # NOTE: torch.load unpickles the file; only load checkpoints you trust.
        model.load_state_dict(torch.load(weights_path, map_location=device))

    # Keep model and input on the same device, then switch to eval mode.
    model.to(device)
    model.eval()

    # Load and preprocess audio
    waveform, sr = librosa.load(wav_path, sr=sr)       # waveform: float32 numpy array
    if trim_top_db is not None:
        trimmed, _ = librosa.effects.trim(waveform, top_db=trim_top_db)
        # Keep the untrimmed clip if trimming removed every sample.
        if len(trimmed) > 0:
            waveform = trimmed
    mfcc = librosa.feature.mfcc(y=waveform, sr=sr, n_mfcc=n_mfcc, n_fft=n_fft)  # shape: (n_mfcc, time_frames)
    mfcc = mfcc.T  # shape: (time_frames, n_mfcc)

    # Convert to Torch tensor, add batch dimension => (1, time_frames, n_mfcc)
    mfcc_tensor = torch.tensor(mfcc, dtype=torch.float32).unsqueeze(0).to(device)

    with torch.no_grad():
        # Forward pass
        outputs = model(mfcc_tensor)  # shape: (1, num_classes)

        # Softmax turns logits into probabilities
        probs = torch.softmax(outputs, dim=1)

        # Get predicted class index and its confidence
        predicted_idx = torch.argmax(probs, dim=1).item()
        confidence = probs[0, predicted_idx].item()

    # Map index back to class label
    predicted_label = classes[predicted_idx]

    return predicted_label, confidence

In [None]:
# Classify one recording on the device selected earlier in the notebook, so the
# cell also runs on machines without CUDA.
# NOTE(review): wav_path is an absolute path on the author's machine; point it
# at a local recording before running.
predict(model=model, wav_path='/home/linden/Desktop/projects/cr_prediction/11.wav', classes=classes, device=device)