In [None]:
# @title Pip install comands
!pip install librosa
!pip install torch torchvision torchaudio
!pip install torch
!pip install jiwer
!pip install wandb
!pip install datasets


In [13]:
# @title Imports

import os
import librosa
import numpy as np
import torch
import torch.nn as nn
import torchaudio
from torch.utils.data import DataLoader, Dataset
import torch.nn.functional as F
import torch.optim as optim
from jiwer import wer
from google.colab import drive
import pdb
import wandb
from datasets import load_dataset, load_metric
import random
cer_metric = load_metric("cer")


In [14]:
# @title Mount drive

# Mount Google Drive under /content/drive so the dataset folder below is reachable.
drive.mount('/content/drive')
# Root folder of the project data; later cells append sub-paths to it.
dir_path = '/content/drive/My Drive/AudioFinalProject/data'
# Log in to Weights & Biases before any run is created.
wandb.login()




Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


True

In [15]:
# @title Constants

# Sub-folder suffixes. WAV_PATH / TXT_PATH are appended to a split directory,
# TRAIN_PATH / VAL_PATH / TEST_PATH are appended to `dir_path`.
WAV_PATH = "/wav"
TXT_PATH = "/txt"
TRAIN_PATH = "/train"
VAL_PATH = "/val"
TEST_PATH = "/test"

# Location used by save_model / load_model_weights.
MODEL_FILE_PATH = dir_path + "/model_files/model_weights.pth"

hparams = {
    # model architecture
    "n_cnn_layers": 4,
    "n_rnn_layers": 3,
    "rnn_dim": 512,
    "n_class": 28,  # space + A-Z (0..26) plus the CTC blank at index 27
    "n_feats": 16,  # number of MFCC coefficients extracted per frame
    "stride": 2,
    "dropout": 0.1,
    # optimisation
    "learning_rate": 0.0005,
    "batch_size": 16,
    "epochs": 200,
}


In [16]:
class CustomDataset(Dataset):
    """
    Dataset over pre-computed spectrograms and their encoded transcripts.

    Every item is returned as freshly allocated tensors, so callers (and the
    augmentation) can modify a sample without touching the stored data.
    When ``is_train`` is true, a sample is passed through frequency + time
    masking with probability 0.4.
    """

    def __init__(self, data, labels, input_lengths, label_lengths,is_train):
        """
        Init CustomDataset obj
        :param data: spectrograms extracted from wav files
        :param labels: text of the wav files, encoded as integers
        :param input_lengths: of wav
        :param label_lengths: of text
        :param is_train: boolean, enables random masking augmentation
        """
        self.data = data
        self.labels = labels
        self.input_lengths = input_lengths
        self.label_lengths = label_lengths
        self.is_train = is_train
        # Built lazily on first use so evaluation datasets never construct it
        # and training datasets construct it once instead of once per item.
        self._masking = None

    def __len__(self):
        """Number of samples in the dataset."""
        return len(self.data)

    @staticmethod
    def _as_new_tensor(value, dtype):
        """
        Return ``value`` as a new tensor of the requested dtype.
        Tensors are copied with detach().clone() (which avoids the UserWarning
        that torch.tensor(tensor) emits); anything else goes through torch.tensor.
        """
        if torch.is_tensor(value):
            return value.detach().clone().to(dtype)
        return torch.tensor(value, dtype=dtype)

    def _get_masking(self):
        """Create the frequency/time masking pipeline on first use and cache it."""
        masking = getattr(self, "_masking", None)
        if masking is None:
            masking = nn.Sequential(
                torchaudio.transforms.FrequencyMasking(freq_mask_param=15),
                torchaudio.transforms.TimeMasking(time_mask_param=20))
            self._masking = masking
        return masking

    def __getitem__(self, idx):
        """
        returns data[idx]
        :param idx: index
        :return: data_sample (float32), labels_sample (long),
                 input_lengths_sample (int32), label_lengths_sample (int32)
        """
        data_sample = self._as_new_tensor(self.data[idx], torch.float32)
        labels_sample = self._as_new_tensor(self.labels[idx], torch.long)
        input_lengths_sample = self._as_new_tensor(self.input_lengths[idx], torch.int32)
        label_lengths_sample = self._as_new_tensor(self.label_lengths[idx], torch.int32)

        # Augment roughly 40% of the training samples; never augment eval data.
        if self.is_train and random.random() < 0.4:
            data_sample = self._get_masking()(data_sample)

        return data_sample, labels_sample, input_lengths_sample, label_lengths_sample

# base model - phase 1
class CNN(torch.nn.Module):

  def __init__(self, stride=2, n_feats=16, n_class=28):
    """
    Base model - built from 3 convolution layers, each followed by BatchNorm and ReLU,
    and finally a fully connected layer applied at every time step.
    :param stride: how much the first filter moves across the input (frequency and time)
    :param n_feats: number of frequency features per frame in the input (default 16)
    :param n_class: number of output classes per time step (default 28)
    """
    super().__init__()
    self.cnn = nn.Sequential(
        nn.Conv2d(1, 32, 3, stride, padding=3 // 2),
        nn.BatchNorm2d(32),
        nn.ReLU(),
        nn.Conv2d(32, 32, 3, 1, padding=3 // 2),
        nn.BatchNorm2d(32),
        nn.ReLU(),
        nn.Conv2d(32, 32, 3, 1, padding=3 // 2),
        nn.BatchNorm2d(32),
        nn.ReLU(),
    )
    # Only the first convolution strides; with kernel 3 and padding 1 it maps
    # n_feats rows to (n_feats - 1) // stride + 1 rows. The defaults give
    # 32 * 8 = 256 input features, the size that used to be hard-coded.
    feats_after_cnn = (n_feats - 1) // stride + 1
    self.fc = torch.nn.Linear(32 * feats_after_cnn, n_class)

  def forward(self, x):
    """
    Takes the input image and propagates it through the network, layer by layer.
    :param x: input of shape (batch, 1, feature, time)
    :return: per-frame class scores of shape (batch, time', n_class)
    """
    x = self.cnn(x)
    sizes = x.shape
    x = x.view(sizes[0], sizes[1] * sizes[2], sizes[3])  # (batch, feature, time)
    x = x.transpose(1, 2)  # (batch, time, feature)
    x = self.fc(x)
    return x

# second model - phase 2
class model2(nn.Module):
    def __init__(self, stride, n_feats, rnn_dim, n_class, n_layers=1, bidirectional=True, dropout_prob=0.1):
        """
        This model is based on the previous model, but with the following additions:
        * Dropout after the ReLU activation function in the CNN layer.
        * A fully connected layer between the CNN and LSTM layers.
        * An LSTM layer to model sequential data and capture long range dependencies.
        :param stride: Stride of the convolution operation in the CNN layer.
        :param n_feats: Number of features from the input audio data.
        :param rnn_dim: Dimensionality of the hidden state in the LSTM layer.
        :param n_class: Number of classes in the classification task.
        :param n_layers: Number of layers in the LSTM layer.
        :param bidirectional:  Whether to use a bidirectional LSTM layer.
        :param dropout_prob: Dropout probability.
        """
        # The zero-argument form always resolves to this class; the previous
        # call named SpeechRecognitionModel and failed on instantiation.
        super().__init__()

        # Feature rows left after the strided convolution (kernel 3, padding 1).
        # Equals n_feats / 2 for even n_feats with stride 2, and stays correct
        # for odd n_feats and for other strides.
        n_feats = (n_feats - 1) // stride + 1

        # CNN layer
        self.cnn = nn.Sequential(
            nn.Conv2d(1, 32, 3, stride, padding=3 // 2),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.Conv2d(32, 32, 3, 1, padding=3 // 2),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.Dropout2d(dropout_prob)  # Adding dropout after ReLU
        )

        # FC layer needed before lstm
        self.fc = nn.Linear(n_feats * 32, rnn_dim)

        # LSTM layer. Inter-layer dropout only exists when there is more than
        # one layer; passing 0 for a single layer avoids PyTorch's warning
        # without changing the computation.
        self.rnn = nn.LSTM(input_size=rnn_dim, hidden_size=rnn_dim, num_layers=n_layers,
                           batch_first=True, bidirectional=bidirectional,
                           dropout=dropout_prob if n_layers > 1 else 0.0)

        # Classifier layer
        self.classifier = nn.Linear(rnn_dim * 2 if bidirectional else rnn_dim, n_class)

    def forward(self, x):
        """
        Takes the input and propagates it through the network, layer by layer.
        :param x: input of shape (batch, 1, feature, time)
        :return: per-frame class scores of shape (batch, time', n_class)
        """
        x = self.cnn(x)
        batch, channel, feature, time = x.size()
        x = x.view(batch, channel * feature, time)  # (batch, feature, time)
        x = x.transpose(1, 2)  # (batch, time, feature)
        x = self.fc(x)
        x, _ = self.rnn(x)
        x = self.classifier(x)
        return x

# phase 3 - deep speech model
class CNNLayerNorm(nn.Module):
    """Layer normalisation over the feature axis of a (batch, channel, feature, time) tensor."""

    def __init__(self, n_feats):
        """
        :param n_feats: size of the feature axis that is normalised.
        """
        super().__init__()
        self.layer_norm = nn.LayerNorm(n_feats)

    def forward(self, x):
        """
        Normalise every feature vector of the input.
        :param x: tensor laid out as (batch, channel, feature, time)
        :return: normalised tensor in the same layout
        """
        # LayerNorm works on the last axis, so move `feature` there first.
        feature_last = x.transpose(2, 3).contiguous()  # (batch, channel, time, feature)
        normalised = self.layer_norm(feature_last)
        # Restore the original axis order.
        return normalised.transpose(2, 3).contiguous()  # (batch, channel, feature, time)


class ResidualCNN(nn.Module):
    """
    Pre-activation residual block: (LayerNorm -> ReLU -> Dropout -> Conv2d) twice,
    with the block input added back onto the result.
    """

    def __init__(self, in_channels, out_channels, kernel, stride, dropout, n_feats):
        """
        :param in_channels: channels of the block input
        :param out_channels: channels produced by both convolutions
        :param kernel: convolution kernel size (padding is kernel // 2)
        :param stride: stride of both convolutions
        :param dropout: dropout probability
        :param n_feats: size of the feature axis, used by the layer norms
        """
        super().__init__()

        padding = kernel // 2
        self.cnn1 = nn.Conv2d(in_channels, out_channels, kernel, stride, padding=padding)
        self.cnn2 = nn.Conv2d(out_channels, out_channels, kernel, stride, padding=padding)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.layer_norm1 = CNNLayerNorm(n_feats)
        self.layer_norm2 = CNNLayerNorm(n_feats)

    def forward(self, x):
        """
        Run the block on a (batch, channel, feature, time) tensor.
        :param x: input
        :return: tensor with the same layout as the input
        """
        shortcut = x  # (batch, channel, feature, time)

        out = self.cnn1(self.dropout1(F.relu(self.layer_norm1(x))))
        out = self.cnn2(self.dropout2(F.relu(self.layer_norm2(out))))

        # The skip connection needs the conv output shape to equal the input shape.
        out += shortcut
        return out  # (batch, channel, feature, time)


class BidirectionalGRU(nn.Module):
    """LayerNorm -> ReLU -> single-layer bidirectional GRU -> Dropout."""

    def __init__(self, rnn_dim, hidden_size, dropout, batch_first):
        """
        :param rnn_dim: size of the input features (also the LayerNorm size).
        :param hidden_size: hidden size of each GRU direction.
        :param dropout: dropout probability applied to the GRU output.
        :param batch_first: whether the input is laid out batch-first.
        """
        super().__init__()

        self.BiGRU = nn.GRU(
            input_size=rnn_dim,
            hidden_size=hidden_size,
            num_layers=1,
            batch_first=batch_first,
            bidirectional=True,
        )
        self.layer_norm = nn.LayerNorm(rnn_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """
        Apply the block to a sequence tensor.
        :param x: input whose last axis has size rnn_dim
        :return: GRU outputs (both directions concatenated) after dropout
        """
        activated = F.relu(self.layer_norm(x))
        sequence_out, _ = self.BiGRU(activated)  # final hidden state is not needed
        return self.dropout(sequence_out)


class SpeechRecognitionModel(nn.Module):
    """
    speech recognition model that uses a combination of convolutional neural networks (CNNs) and bidirectional recurrent neural networks (RNNs) to extract features from audio and after CTC process predicts the text said in the audio file.
    """
    def __init__(self, n_cnn_layers, n_rnn_layers, rnn_dim, n_class, n_feats, stride=2, dropout=0.1):
        """
        :param n_cnn_layers: number of residual CNN blocks.
        :param n_rnn_layers: number of bidirectional GRU blocks.
        :param rnn_dim: hidden size of each GRU direction.
        :param n_class: number of output classes per time step.
        :param n_feats: number of frequency features per frame in the input.
        :param stride: stride of the first convolution (frequency and time).
        :param dropout: dropout probability used throughout the model.
        """
        super().__init__()
        # Feature rows left after the strided convolution (kernel 3, padding 1).
        # Identical to n_feats // 2 for even n_feats with stride 2, but also
        # correct for odd n_feats and other strides.
        n_feats = (n_feats - 1) // stride + 1
        self.cnn = nn.Conv2d(1, 32, 3, stride=stride, padding=3 // 2)  # cnn for extracting hierarchical features

        # n residual cnn layers with filter size of 32
        self.rescnn_layers = nn.Sequential(*[
            ResidualCNN(32, 32, kernel=3, stride=1, dropout=dropout, n_feats=n_feats)
            for _ in range(n_cnn_layers)
        ])
        self.fully_connected = nn.Linear(n_feats * 32, rnn_dim)
        # Every GRU block receives (batch, time, feature), so all of them must be
        # batch_first. Previously only the first block was, which made the later
        # blocks run their recurrence across the batch axis instead of time.
        self.birnn_layers = nn.Sequential(*[
            BidirectionalGRU(rnn_dim=rnn_dim if i == 0 else rnn_dim * 2,
                             hidden_size=rnn_dim, dropout=dropout, batch_first=True)
            for i in range(n_rnn_layers)
        ])
        self.classifier = nn.Sequential(
            nn.Linear(rnn_dim * 2, rnn_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(rnn_dim, n_class)
        )

    def forward(self, x):
        """
        Takes the input and propagates it through the network, layer by layer.
        :param x: input of shape (batch, 1, feature, time)
        :return: per-frame class scores of shape (batch, time', n_class)
        """
        x = self.cnn(x)
        x = self.rescnn_layers(x)
        sizes = x.size()
        x = x.view(sizes[0], sizes[1] * sizes[2], sizes[3])  # (batch, feature, time)
        x = x.transpose(1, 2)  # (batch, time, feature)
        x = self.fully_connected(x)
        x = self.birnn_layers(x)
        x = self.classifier(x)
        return x

In [1]:
def pad_strings_to_equal_length(str1, str2):
    """
    Right-pad the shorter of two strings with spaces so both have the same length.
    The argument order is preserved in the result.
    :param str1: text string
    :param str2: text string
    :return: two single-element lists, ([str1 padded], [str2 padded]).
    """
    target_len = max(len(str1), len(str2))

    # The longer string receives zero spaces, i.e. it is returned unchanged.
    padded_first = str1 + ' ' * (target_len - len(str1))
    padded_second = str2 + ' ' * (target_len - len(str2))

    return [padded_first], [padded_second]

def map_txt_to_int(text):
    """
    Encode a transcript as integers: space -> 0, 'A'..'Z' -> 1..26.
    Any other character raises KeyError.
    :param text: string.
    :return: a list of the integers
    """
    alphabet = ' ABCDEFGHIJKLMNOPQRSTUVWXYZ'
    code_of = {symbol: code for code, symbol in enumerate(alphabet)}

    codes = []
    for symbol in text:
        codes.append(code_of[symbol])
    return codes


def map_int_to_txt(labels):
    """
    Decode integers back to text: 0 -> space, 1..26 -> 'A'..'Z'.
    Any other value raises KeyError.
    :param labels: integers
    :return: text
    """
    alphabet = ' ABCDEFGHIJKLMNOPQRSTUVWXYZ'
    # A dict (not string indexing) keeps the lookup strict: values outside 0..26 fail.
    symbol_of = dict(enumerate(alphabet))

    pieces = []
    for code in labels:
        pieces.append(symbol_of[code])
    return ''.join(pieces)


def load_data(dir_path,hparams, is_train=True):
    """
    Loads audio and text data for training or testing.

    For every file in ``dir_path + WAV_PATH`` the MFCC features are extracted and
    the transcript with the same base name is read from ``dir_path + TXT_PATH``.
    Files are processed in sorted order so the result is reproducible.

    :param dir_path:  The path to the directory containing the audio and text data.
    :param hparams: hyperparameters; reads "n_feats" and, if present, "stride" (default 2).
    :param is_train: Currently unused; kept for interface compatibility.
    :return: spectrograms (data_size, 1, n_feats, max_frames), zero-padded labels,
             input_lengths, label_lengths.
    :raises ValueError: if the wav directory is empty or a transcript is empty.
    """
    wav_dir = dir_path + WAV_PATH
    txt_dir = dir_path + TXT_PATH
    # Time down-sampling factor of the model's first convolution.
    stride = hparams.get("stride", 2)

    wav_files_list = sorted(os.listdir(wav_dir))
    if not wav_files_list:
        raise ValueError("No audio files found in " + wav_dir)

    spectrograms = []
    labels = []
    input_lengths = []
    label_lengths = []
    # One MFCC transform per sample rate instead of one per file.
    mfcc_by_rate = {}

    for wav_filename in wav_files_list:
        txt_filename = os.path.splitext(wav_filename)[0] + ".txt"
        wav_path = os.path.join(wav_dir, wav_filename)
        txt_path = os.path.join(txt_dir, txt_filename)

        waveform, sr = librosa.load(wav_path)
        waveform = torch.tensor(waveform)

        if sr not in mfcc_by_rate:
            mfcc_by_rate[sr] = torchaudio.transforms.MFCC(
                sample_rate=sr,
                n_mfcc=hparams["n_feats"],
            )
        spec = mfcc_by_rate[sr](waveform).T  # (frames, n_feats)
        spectrograms.append(spec)

        # The transcript is the first line of the file, without its line
        # terminator (which map_txt_to_int cannot encode).
        with open(txt_path) as f:
            txt = f.readline().rstrip("\r\n")
        if not txt:
            raise ValueError("Empty transcript in " + txt_path)
        label = torch.Tensor(map_txt_to_int(txt))
        labels.append(label)

        # Frame count after the strided convolution, rounded down so it never
        # exceeds the model's real output length.
        input_lengths.append(spec.shape[0] // stride)
        label_lengths.append(len(label))

    # apply padding to spectrograms and to labels
    spectrograms = nn.utils.rnn.pad_sequence(spectrograms, batch_first=True)
    spectrograms = spectrograms.unsqueeze(1).transpose(2, 3)  # (data_size, channel, freq, spec_length)

    labels = nn.utils.rnn.pad_sequence(labels, batch_first=True)
    return spectrograms, labels, torch.from_numpy(np.array(input_lengths)), torch.from_numpy(np.array(label_lengths))


def GreedyDecoder(output, labels, label_lengths, blank_label=27, collapse_repeated=True):
    """
    Greedy (best-path) CTC decoding: take the arg-max class of every frame, drop
    blanks and, optionally, drop a class that repeats the immediately preceding frame.
    :param output: A tensor of per-frame scores, (batch, time, n_class).
    :param labels: A tensor of ground truth labels (padded).
    :param label_lengths: A tensor with the true length of every label row.
    :param blank_label: The index of the blank label in the output.
    :param collapse_repeated: Whether to collapse repeated characters in the decoded output.
    :return: decodes - A list of decoded strings, one for each batch element.
             targets: A list of ground truth strings, one for each batch element.
    """
    best_paths = torch.argmax(output, dim=2)
    decodes, targets = [], []

    for row, path in enumerate(best_paths):
        # Ground truth: cut the padding off, then decode.
        true_codes = labels[row][:label_lengths[row]].tolist()
        targets.append(map_int_to_txt(true_codes))

        frame_codes = path.tolist()
        kept = []
        for pos, code in enumerate(frame_codes):
            if code == blank_label:
                continue
            is_repeat = pos != 0 and code == frame_codes[pos - 1]
            if collapse_repeated and is_repeat:
                continue
            kept.append(code)
        decodes.append(map_int_to_txt(kept))

    return decodes, targets


def train(model, device, train_loader, criterion, optimizer, epoch):
    """
    Run one training epoch and measure WER / CER on the training batches.
    :param model: The speech recognition model.
    :param device: The device to train the model on - Better use GPU from google colab.
    :param train_loader: A data loader for the training data.
    :param criterion: A loss function.
    :param optimizer: An optimizer.
    :param epoch: Index of the current epoch (only used in the progress message).
    :return: A tuple of two floats: the mean training WER and the mean training CER
    """
    model.train()
    n_samples = len(train_loader.dataset)
    wer_scores, cer_scores = [], []

    for batch_idx, batch in enumerate(train_loader):
        spectrograms, labels, input_lengths, label_lengths = batch
        spectrograms = spectrograms.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()

        log_probs = F.log_softmax(model(spectrograms), dim=2)  # (batch, time, n_class)
        log_probs = log_probs.transpose(0, 1)  # (time, batch, n_class)
        loss = criterion(log_probs, labels, input_lengths, label_lengths)
        loss.backward()
        optimizer.step()

        print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
            epoch, batch_idx * len(spectrograms), n_samples,
                   100. * batch_idx / len(train_loader), loss.item()))

        # Score this batch's predictions against the references.
        decoded_preds, decoded_targets = GreedyDecoder(log_probs.transpose(0, 1), labels, label_lengths)
        for target_txt, pred_txt in zip(decoded_targets, decoded_preds):
            wer_scores.append(wer(target_txt, pred_txt))
            references, predictions = pad_strings_to_equal_length(target_txt, pred_txt)
            cer_scores.append(cer_metric.compute(references=references, predictions=predictions))

    mean_wer = sum(wer_scores) / len(wer_scores)
    mean_cer = sum(cer_scores) / len(cer_scores)
    return mean_wer, mean_cer


def test(model, device, test_loader, criterion, is_print=False):
    """
    Evaluates the performance of a speech recognition model on a dataset
    (validation or test) without updating the model.
    When ``is_print`` is true, up to 20 randomly chosen target/prediction pairs
    are printed after the evaluation.
    :param model: The speech recognition model.
    :param device: The device to evaluate the model on.
    :param test_loader: A data loader for the evaluation data.
    :param criterion: A loss function.
    :param is_print: Whether to print the outputs of the model.
    :return: A tuple of two floats: the mean WER and the mean CER on the given data
    """
    print('\nevaluating...')
    model.eval()
    test_loss = 0
    test_wer, test_cer = [], []
    print_outputs = []

    with torch.no_grad():
        for _data in test_loader:
            spectrograms, labels, input_lengths, label_lengths = _data
            spectrograms, labels = spectrograms.to(device), labels.to(device)

            output = model(spectrograms)  # (batch, time, n_class)
            output = F.log_softmax(output, dim=2)
            output = output.transpose(0, 1)  # (time, batch, n_class)

            loss = criterion(output, labels, input_lengths, label_lengths)
            test_loss += loss.item() / len(test_loader)

            decoded_preds, decoded_targets = GreedyDecoder(output.transpose(0, 1), labels, label_lengths)

            for target_txt, pred_txt in zip(decoded_targets, decoded_preds):
                if is_print:
                    # The second line shows the model's prediction (it used to
                    # repeat the target, hiding every error).
                    output_txt = "Target: " + target_txt + "\n"
                    output_txt += "Predic: " + pred_txt + "\n"
                    output_txt += "-----------------------------------------------"
                    print_outputs.append(output_txt)

                test_wer.append(wer(target_txt, pred_txt))
                str1, str2 = pad_strings_to_equal_length(target_txt, pred_txt)
                test_cer.append(cer_metric.compute(references=str1, predictions=str2))

    if is_print:
        # random.sample raises if asked for more items than exist.
        n_shown = min(20, len(print_outputs))
        for output_txt in random.sample(print_outputs, n_shown):
            print(output_txt)

    print('Average loss: {:.4f}'.format(test_loss))

    test_wer = sum(test_wer) / len(test_wer)
    test_cer = sum(test_cer) / len(test_cer)
    return test_wer, test_cer




def get_data_loader(data_path,hparams,is_train):
    """
    Build a shuffled DataLoader for one dataset split.
    :param data_path: The path to the split directory.
    :param hparams: A dictionary of hyperparameters; "batch_size" is read here.
    :param is_train: Forwarded to load_data and CustomDataset (turns augmentation on).
    :return: A DataLoader object.
    """
    spectrograms, labels, input_lengths, label_lengths = load_data(data_path, hparams, is_train)
    split_dataset = CustomDataset(spectrograms, labels, input_lengths, label_lengths, is_train)
    return DataLoader(
        dataset=split_dataset,
        batch_size=hparams["batch_size"],
        shuffle=True,
    )

def save_model(model, model_path):
    """
    Write the model's state dict (parameters and buffers) to a file.
    :param model: The speech recognition model.
    :param model_path: Path to the file where the weights will be saved.
    """
    torch.save(model.state_dict(), model_path)

def load_model_weights(model, load_path):
    """
    Load the weights of the PyTorch model from a file.

    The checkpoint is read onto the CPU first, so weights saved on a GPU can
    also be loaded on a CPU-only runtime; load_state_dict then copies them into
    the model's own parameters. Loading is non-strict, and any key that did not
    match is reported instead of being ignored silently.

    :param model (torch.nn.Module): The PyTorch model to load the weights into.
    :param load_path (str): The path to the file from which the model weights will be loaded.
    :return: True if the weights were loaded, False if an error was reported.
    """
    try:
        state_dict = torch.load(load_path, map_location="cpu")
        result = model.load_state_dict(state_dict, strict=False)
    except FileNotFoundError:
        print("Error: The file", load_path, "does not exist.")
        return False
    except Exception as e:
        # Best effort by design: report the problem and let the caller decide.
        print("Error:", str(e))
        return False

    if result.missing_keys:
        print("Warning: keys missing from the checkpoint:", result.missing_keys)
    if result.unexpected_keys:
        print("Warning: unexpected keys in the checkpoint:", result.unexpected_keys)
    print("Model weights loaded successfully from:", load_path)
    return True



In [None]:
# @title Load and process data

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")

# One DataLoader per split. Only the training split is built with is_train=True,
# which enables the random masking augmentation in CustomDataset.
train_loader = get_data_loader(dir_path + TRAIN_PATH,hparams, is_train=True)
val_loader = get_data_loader(dir_path + VAL_PATH,hparams, is_train=False)
test_loader = get_data_loader(dir_path + TEST_PATH,hparams, is_train=False)


In [None]:
# @title Train and Test (saves WER)
# Start a Weights & Biases run; the per-epoch metrics below are logged to it.
wandb.init(
    project="AudioFinalProject - WER2",
    name=f"Model AudioFinalProject",
    )
# Exactly one of the three model phases is active; the others are kept
# commented out for reference.
# Phase 1 - cnn model
# model = CNN(hparams['stride']).to(device) #conv model

# Phase 2 - cnn and lstm model
# model = model2(hparams['stride'], hparams['n_feats'], hparams['rnn_dim'], hparams['n_class']).to(
#     device)

# phase 3 - DeepSpeech model
model = SpeechRecognitionModel(
    hparams['n_cnn_layers'], hparams['n_rnn_layers'], hparams['rnn_dim'],
    hparams['n_class'],  hparams["n_feats"], hparams['stride'], hparams['dropout']
).to(device)

optimizer = optim.Adam(model.parameters(), hparams['learning_rate'])
# Class index 27 is the CTC blank; it matches GreedyDecoder's default blank_label.
criterion = nn.CTCLoss(blank=27).to(device)


# Uncomment to continue from previously saved weights instead of a fresh model.
# load_model_weights(model, MODEL_FILE_PATH)

for epoch in range(1, hparams['epochs'] + 1):
    # train and validate model
    train_wer, train_cer  = train(model, device, train_loader, criterion, optimizer, epoch)
    val_wer, val_cer = test(model, device, val_loader, criterion) # validation

    # log the results
    wandb.log({"train wer": train_wer,"train cer": train_cer,"val wer": val_wer,"val cer": val_cer})


# Final evaluation on the held-out test split, after all epochs.
test_wer, test_cer = test(model, device, test_loader, criterion)
print("test wer: " + str(test_wer))
print("test cer: " + str(test_cer))

# NOTE(review): saving is disabled, so the weights trained above are not written
# to MODEL_FILE_PATH; the predict cell loads whatever file already exists there.
# save_model(model, MODEL_FILE_PATH)


In [None]:
def predict(device=None):
  """
  The function predict() loads a pre-trained speech recognition model and evaluates its performance on a test dataset.
  The model, the loss and the evaluation all use the same device; previously the
  argument was overwritten after the model had already been moved to it, so the
  model and the data could end up on different devices.
  :param device: The device to load the model on. When None, the GPU is used if
                 one is available and the CPU otherwise.
  """
  if device is None:
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

  model = SpeechRecognitionModel(
      hparams['n_cnn_layers'], hparams['n_rnn_layers'], hparams['rnn_dim'],
      hparams['n_class'],  hparams["n_feats"], hparams['stride'], hparams['dropout']
  ).to(device)
  criterion = nn.CTCLoss(blank=27).to(device)  # index 27 is the CTC blank
  load_model_weights(model, MODEL_FILE_PATH)
  test_loader = get_data_loader(dir_path + TEST_PATH,hparams, is_train=False)
  test_wer, test_cer = test(model, device, test_loader, criterion, is_print=True)
  print("test wer: " + str(test_wer))
  print("test cer: " + str(test_cer))

predict(device)
