In [None]:
import os
import torch
import torch.nn as nn
import torch.utils.data as data
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import pandas as pd
from torch.utils.data import Dataset
from torchvision import datasets, transforms
import gc
import shutil

In [None]:
## Input the necessary directories

# NOTE: the audio files (mp3, wav, m4a, ...) must already have been converted
# to spectrograms stored as numpy arrays before this notebook is run. The
# author did that step on Google Colab because it needs Torchaudio, which
# could not be run on the local Windows 10 machine used for training.

# Folder with the Excel files that list file names and labels
excel_root = r'G:/School/2020 Mozilla Dataset/Excel Files/'

# Folder with the unzipped numpy (spectrogram) files
root = r'G:/School/2020 Mozilla Dataset/mp3_to_np'

# Folder with the saved model checkpoints
model_fpath = r'G:/School/2020 Mozilla Dataset/saved_models/'

In [None]:
# Read the first sheet of the train / validation / test workbooks and build,
# for each split, a (file names, labels) pair of parallel lists.

train_excel = pd.read_excel(excel_root + 'train.xlsx', sheet_name=0)
val_excel = pd.read_excel(excel_root + 'validation.xlsx', sheet_name=0)
test_excel = pd.read_excel(excel_root + 'test.xlsx', sheet_name=0)

# Train: 'path' column -> file names, 'sentence' column -> labels
train_filenames_labels = tuple(
    train_excel[column].tolist() for column in ('path', 'sentence')
)

# Validation
val_filenames_labels = tuple(
    val_excel[column].tolist() for column in ('path', 'sentence')
)

# Test
test_filenames_labels = tuple(
    test_excel[column].tolist() for column in ('path', 'sentence')
)

In [None]:
# Define a silu function

def silu(input):
    '''
    Sigmoid Linear Unit (SiLU), evaluated element-wise:
        SiLU(x) = x * sigmoid(x)
    '''
    # torch.sigmoid keeps the whole computation on built-in PyTorch ops.
    gate = torch.sigmoid(input)
    return input * gate

# create a class wrapper from PyTorch nn.Module, so
# the function now can be easily used in models
class SiLU(nn.Module):
    '''
    Module wrapper around the Sigmoid Linear Unit (SiLU), applied element-wise:
        SiLU(x) = x * sigmoid(x)
    The module holds no parameters; it only forwards to the ``silu`` function
    so the activation can be used like any other layer inside a model.
    Shape:
        - Input: (N, *) where * means, any number of additional
          dimensions
        - Output: (N, *), same shape as the input
    References:
        -  Related paper:
        https://arxiv.org/pdf/1606.08415.pdf
    Examples:
        >>> m = SiLU()
        >>> input = torch.randn(2)
        >>> output = m(input)
    '''
    def __init__(self):
        '''
        Init method. Nothing to configure: only the base class is initialised.
        '''
        super().__init__() # init the base class

    def forward(self, input):
        '''
        Forward pass: returns ``input * sigmoid(input)`` with the input's shape.
        '''
        return silu(input)

In [None]:
# Functions for pre-processing data

class TextTransform:
    """Two-way lookup between characters and integer class ids.

    ``char_map`` goes from character to id (the space is stored under the
    key '<SPACE>'), ``index_map`` goes from id back to character (id 1 maps
    straight to a real ' ').
    """
    def __init__(self):
        char_map_str = """
        ' 0
        <SPACE> 1
        a 2
        b 3
        c 4
        d 5
        e 6
        f 7
        g 8
        h 9
        i 10
        j 11
        k 12
        l 13
        m 14
        n 15
        o 16
        p 17
        q 18
        r 19
        s 20
        t 21
        u 22
        v 23
        w 24
        x 25
        y 26
        z 27
        """
        # Every non-empty line of the table is "<symbol> <id>".
        entries = [row.split() for row in char_map_str.strip().split('\n')]
        self.char_map = {symbol: int(code) for symbol, code in entries}
        self.index_map = {int(code): symbol for symbol, code in entries}
        # Decode id 1 to an actual space rather than the '<SPACE>' token.
        self.index_map[1] = ' '

    def text_to_int(self, text):
        """Encode ``text`` character by character into a list of integer ids.

        A character missing from the table raises ``KeyError``.
        """
        return [
            self.char_map['<SPACE>' if c == ' ' else c]
            for c in text
        ]

    def int_to_text(self, labels):
        """Decode an iterable of integer ids back into a string."""
        decoded = ''.join(self.index_map[i] for i in labels)
        return decoded.replace('<SPACE>', ' ')

# Shared module-level instance; data_processing() and Decoder() below use it
# to encode transcripts and to decode label / prediction ids.
text_transform = TextTransform()

def data_processing(data):
    """Collate (spectrogram, utterance) samples into one padded batch.

    Args:
        data: iterable of ``(spec, utterance)`` pairs. ``spec`` is array-like
            (assumed to be laid out as (time, n_feats) -- TODO confirm against
            the spectrogram export); ``utterance`` is the transcript string.

    Returns:
        spectrograms: zero-padded tensor; for 2-D specs the shape is
            (batch, 1, spec_dim_1, padded spec_dim_0).
        labels: zero-padded float tensor of character ids, (batch, max_len).
        input_lengths: list with ``spec.shape[0] // 2`` for each sample
            (presumably mirrors the stride-2 first convolution of the model).
        label_lengths: list with the unpadded transcript length per sample.

    Raises:
        KeyError: if a lower-cased transcript contains a character that
            ``text_transform`` does not know.
    """
    spectrograms = []
    labels = []
    input_lengths = []
    label_lengths = []
    for (spec, utterance) in data:
        # Convert once and reuse the tensor for both the batch entry and the
        # length; torch.Tensor(...) never tracks gradients, so no detach needed.
        spec_tensor = torch.Tensor(spec)
        label = torch.Tensor(text_transform.text_to_int(utterance.lower()))

        spectrograms.append(spec_tensor)
        labels.append(label)
        input_lengths.append(spec_tensor.shape[0] // 2)
        label_lengths.append(len(label))

    # Pad along the first (variable-length) axis, add a channel axis, then
    # swap the last two axes so the padded axis comes last.
    spectrograms = nn.utils.rnn.pad_sequence(spectrograms, batch_first=True).unsqueeze(1).transpose(2, 3)
    labels = nn.utils.rnn.pad_sequence(labels, batch_first=True)

    return spectrograms, labels, input_lengths, label_lengths

def Decoder(output, labels, label_lengths, blank_label=28, collapse_repeated=True):
    """Greedy (arg-max) decoding of a batch of network outputs.

    For every sample the highest-scoring class is taken at each step along
    dim 1 of ``output``; blanks are dropped and, when ``collapse_repeated`` is
    set, a class equal to the one at the previous step is dropped as well.

    Returns:
        (decodes, targets): decoded prediction strings and the reference
        strings rebuilt from ``labels`` cut to ``label_lengths``.
    """
    best_paths = torch.argmax(output, dim=2).tolist()
    decodes, targets = [], []
    for row, path in enumerate(best_paths):
        reference_ids = labels[row][:label_lengths[row]].tolist()
        targets.append(text_transform.int_to_text(reference_ids))

        kept_ids = [
            index
            for step, index in enumerate(path)
            if index != blank_label
            and not (collapse_repeated and step != 0 and index == path[step - 1])
        ]
        decodes.append(text_transform.int_to_text(kept_ids))
    return decodes, targets

In [None]:
# Custom Data Loader that can load custom dataset

def load_speech_item(file_name, label_text, path):
    """Load one saved numpy array and pair it with its label text.

    The file is read from ``path + '/' + file_name``; the label is passed
    through untouched. Returns ``(array, label_text)``.
    """
    spectrogram = np.load('/'.join((path, file_name)))
    return spectrogram, label_text

class Data_Loader(Dataset):
    """Dataset (despite the name) over saved ``.npy`` files and their labels.

    ``filenames_labels`` is a pair of parallel sequences: file names without
    the ".npy" extension, and the matching label texts.
    """

    def __init__(self, root, filenames_labels):
        names, transcripts = filenames_labels
        self._path = root
        self._filenames = names
        self._labels = transcripts

    def __len__(self):
        return len(self._filenames)

    def __getitem__(self, n):
        # ".npy" is appended here, so the stored names must not carry it.
        return load_speech_item(self._filenames[n] + ".npy", self._labels[n], self._path)

In [None]:
# The Model

class CNNLayerNorm(nn.Module):
    """LayerNorm over the feature axis of a (batch, channel, feature, time) tensor."""

    def __init__(self, n_feats):
        super().__init__()
        self.layer_norm = nn.LayerNorm(n_feats)

    def forward(self, x):
        # nn.LayerNorm normalises the last axis, so move `feature` there,
        # normalise, and move it back: the output layout equals the input's.
        feature_last = x.transpose(2, 3).contiguous()   # (batch, channel, time, feature)
        normalised = self.layer_norm(feature_last)
        return normalised.transpose(2, 3).contiguous()  # (batch, channel, feature, time)


class ResidualCNN(nn.Module):
    """Two (layer norm -> SiLU -> dropout -> conv) stages with a skip connection.

    The block input is added to the output of the second convolution, so both
    must have the same shape (SpeechRecognitionModel builds these blocks with
    equal in/out channels and stride 1).
    """

    def __init__(self, in_channels, out_channels, kernel, stride, dropout, n_feats):
        super().__init__()
        same_padding = kernel // 2

        # Creation order is kept as-is: it determines parameter order and
        # the order in which random initial weights are drawn.
        self.cnn1 = nn.Conv2d(in_channels, out_channels, kernel, stride, padding=same_padding)
        self.cnn2 = nn.Conv2d(out_channels, out_channels, kernel, stride, padding=same_padding)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.layer_norm1 = CNNLayerNorm(n_feats)
        self.layer_norm2 = CNNLayerNorm(n_feats)
        self.silu = SiLU()

    def forward(self, x):
        # x: (batch, channel, feature, time)
        stages = (
            (self.layer_norm1, self.dropout1, self.cnn1),
            (self.layer_norm2, self.dropout2, self.cnn2),
        )
        out = x
        for norm, drop, conv in stages:
            out = conv(drop(self.silu(norm(out))))
        out += x  # skip connection, added in place onto the conv output
        return out  # (batch, channel, feature, time)


class BidirectionalGRU(nn.Module):
    """Layer norm -> SiLU -> single-layer bidirectional GRU -> dropout.

    The last dimension of the output is ``2 * hidden_size`` because the GRU
    is bidirectional.
    """

    def __init__(self, rnn_dim, hidden_size, dropout, batch_first):
        super().__init__()

        self.BiGRU = nn.GRU(
            input_size=rnn_dim,
            hidden_size=hidden_size,
            num_layers=1,
            batch_first=batch_first,
            bidirectional=True,
        )
        self.layer_norm = nn.LayerNorm(rnn_dim)
        self.dropout = nn.Dropout(dropout)
        self.silu = SiLU()

    def forward(self, x):
        activated = self.silu(self.layer_norm(x))
        # The GRU also returns its final hidden state, which is not needed.
        outputs, _final_hidden = self.BiGRU(activated)
        return self.dropout(outputs)


class SpeechRecognitionModel(nn.Module):
    """CNN front-end + residual CNN stack + bidirectional GRU stack + classifier.

    Args:
        n_cnn_layers: number of ResidualCNN blocks.
        n_rnn_layers: number of BidirectionalGRU layers.
        rnn_dim: GRU hidden size (each GRU layer outputs 2 * rnn_dim features).
        n_class: number of output classes per time step.
        n_feats: number of feature bins of the input spectrogram.
        stride: stride of the first convolution.
        dropout: dropout probability used throughout.

    Input:  (batch, 1, n_feats, time)
    Output: (batch, reduced time, n_class) un-normalised scores.
    """

    def __init__(self, n_cnn_layers, n_rnn_layers, rnn_dim, n_class, n_feats, stride=2, dropout=0.1):
        super(SpeechRecognitionModel, self).__init__()
        # Feature bins left after the first convolution (kernel 3, padding 1):
        # floor((n_feats - 1) / stride) + 1. For an even n_feats and stride 2
        # this equals the previous hard-coded n_feats // 2, but it is also
        # right for odd n_feats and for other strides.
        feat_stride = stride[0] if isinstance(stride, (tuple, list)) else stride
        n_feats = (n_feats - 1) // feat_stride + 1
        self.cnn = nn.Conv2d(1, 32, 3, stride=stride, padding=3//2)  # cnn for extracting hierarchical features

        # n residual cnn layers with filter size of 32
        self.rescnn_layers = nn.Sequential(*[
            ResidualCNN(32, 32, kernel=3, stride=1, dropout=dropout, n_feats=n_feats)
            for _ in range(n_cnn_layers)
        ])
        self.fully_connected = nn.Linear(n_feats*32, rnn_dim)
        # Every GRU layer gets batch_first=True: the tensor stays in
        # (batch, time, feature) layout through the whole stack. Previously
        # only the first layer was batch-first, so every later layer ran its
        # recurrence over the batch axis instead of over time.
        # NOTE: parameter shapes are unchanged, so older checkpoints still
        # load, but they were trained with the old layout and may need
        # fine-tuning.
        self.birnn_layers = nn.Sequential(*[
            BidirectionalGRU(rnn_dim=rnn_dim if i == 0 else rnn_dim*2,
                             hidden_size=rnn_dim, dropout=dropout, batch_first=True)
            for i in range(n_rnn_layers)
        ])
        self.classifier = nn.Sequential(
            nn.Linear(rnn_dim*2, rnn_dim),  # birnn returns rnn_dim*2
            SiLU(),
            nn.Dropout(dropout),
            nn.Linear(rnn_dim, n_class)
        )

    def forward(self, x):
        x = self.cnn(x)
        x = self.rescnn_layers(x)

        # Fold channels and feature bins into a single feature axis.
        sizes = x.size()
        x = x.view(sizes[0], sizes[1] * sizes[2], sizes[3])  # (batch, feature, time)
        x = x.transpose(1, 2) # (batch, time, feature)

        x = self.fully_connected(x)
        x = self.birnn_layers(x)
        x = self.classifier(x)
        return x

In [None]:
# Training and validation

def train(model, device, train_loader, criterion, optimizer, scheduler, epoch):
    """Run one training epoch and return the per-batch losses.

    Args:
        model: network mapping spectrograms to (batch, time, n_class) scores.
        device: device the batch tensors are moved to.
        train_loader: iterable of (spectrograms, labels, input_lengths,
            label_lengths) batches; must support ``len()`` and expose
            ``.dataset``.
        criterion: loss called as
            ``criterion(log_probs, labels, input_lengths, label_lengths)``
            with log_probs shaped (time, batch, n_class).
        optimizer, scheduler: both are stepped once per processed batch.
        epoch: epoch number, used only in the progress message.

    Returns:
        List with the loss (rounded to 4 decimals) of every processed batch.
        Batches whose last spectrogram axis exceeds 2600 are skipped and
        contribute no entry.
    """
    model.train()
    data_len = len(train_loader.dataset)
    n_batches = len(train_loader)
    losses = []

    for batch_idx, _data in enumerate(train_loader):
        spectrograms, labels, input_lengths, label_lengths = _data

        if spectrograms.shape[3] > 2600:
            print("Skipped a batch because it is too large (>2600). Sequence length is", spectrograms.shape[3])
            continue

        spectrograms, labels = spectrograms.to(device), labels.to(device)

        optimizer.zero_grad()
        output = model(spectrograms)  # (batch, time, n_class)
        output = F.log_softmax(output, dim=2)
        output = output.transpose(0, 1) # (time, batch, n_class)

        loss = criterion(output, labels, input_lengths, label_lengths)
        loss_value = loss.detach().item()
        losses.append(round(loss_value, 4))
        loss.backward()

        optimizer.step()
        scheduler.step()

        # Report every 100th batch and the final batch of the epoch. The old
        # check compared the batch index with the number of samples, which an
        # index can never reach.
        if batch_idx % 100 == 0 or batch_idx == n_batches - 1:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(spectrograms), data_len,
                100. * batch_idx / n_batches, loss_value))

        # Drop the references first so that the cache flush can actually
        # release the memory held by this batch.
        del spectrograms, labels, output, loss
        gc.collect()
        torch.cuda.empty_cache()

    return losses

def test(model, device, test_loader, criterion, epoch):
    """Evaluate the model and return the average loss per evaluated batch.

    Args:
        model: network mapping spectrograms to (batch, time, n_class) scores.
        device: device the batch tensors are moved to.
        test_loader: iterable of (spectrograms, labels, input_lengths,
            label_lengths) batches; must support ``len()``.
        criterion: loss called as
            ``criterion(log_probs, labels, input_lengths, label_lengths)``.
        epoch: accepted for symmetry with ``train``; not used.

    Returns:
        Mean loss over the batches that were actually evaluated, rounded to 4
        decimals. Batches whose last spectrogram axis exceeds 2600 are skipped
        and no longer dilute the mean. NaN if no batch was evaluated.
    """
    model.eval()
    n_batches = len(test_loader)
    total_loss = 0.0
    n_evaluated = 0

    with torch.no_grad():
        for i, _data in enumerate(test_loader):
            spectrograms, labels, input_lengths, label_lengths = _data

            if spectrograms.shape[3] > 2600:
                print("Skipped a batch because it is too large (>2600). Sequence length is", spectrograms.shape[3])
                continue

            spectrograms, labels = spectrograms.to(device), labels.to(device)

            output = model(spectrograms)  # (batch, time, n_class)
            output = F.log_softmax(output, dim=2)
            output = output.transpose(0, 1) # (time, batch, n_class)

            loss = criterion(output, labels, input_lengths, label_lengths)
            total_loss += loss.item()
            n_evaluated += 1

            # Decoding is only needed for the sample print-out, so do it only
            # for every 100th batch and for the final batch.
            if i % 100 == 0 or i == n_batches - 1:
                decoded_preds, decoded_targets = Decoder(output.transpose(0, 1), labels, label_lengths)
                print(decoded_preds, decoded_targets)

            # Drop the references first so that the cache flush can actually
            # release the memory held by this batch.
            del spectrograms, labels, output, loss
            gc.collect()
            torch.cuda.empty_cache()

    test_loss = total_loss / n_evaluated if n_evaluated else float('nan')
    print(f'Test set: Average loss: {test_loss}.\n')

    return round(test_loss, 4)

In [None]:
def main(model, optimizer, scheduler, hparams, train_loader, 
        test_loader, learning_rate=5e-4, batch_size=20, epochs=10):
    """Train for ``epochs`` epochs, validating and checkpointing after each one.

    Args:
        model, optimizer, scheduler: objects to train with; their state is
            written to every checkpoint.
        hparams: dict; ``hparams['learning_rate']`` is used in the checkpoint
            file name and ``hparams['model_fpath']`` is the checkpoint folder.
        train_loader, test_loader: loaders handed to ``train`` and ``test``.
        learning_rate, batch_size: accepted for backward compatibility; they
            are not used inside this function.
        epochs: number of epochs to run.

    Returns:
        (losses_train, losses_val): a list of per-batch loss lists (one per
        epoch) and a list with one validation loss per epoch.
    """
    losses_train, losses_val = [], []

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    criterion = nn.CTCLoss(blank=28).to(device)

    # Create the checkpoint folder up front, so a missing directory cannot
    # make torch.save fail only after a whole epoch has been trained.
    checkpoint_dir = hparams['model_fpath']
    os.makedirs(checkpoint_dir, exist_ok=True)

    for epoch in range(1, epochs + 1):
        losses_train.append(train(model, device, train_loader, criterion, optimizer, scheduler, epoch))
        losses_val.append(test(model, device, test_loader, criterion, epoch))

        state = {
            'epoch': epoch,
            'state_dict': model.state_dict(),
            'optimizer': optimizer.state_dict(),
            'scheduler': scheduler.state_dict()
        }

        lr = hparams['learning_rate']
        model_name = f'Final_Model_epoch_{epoch}_lr_{lr}.pt'
        # os.path.join works whether or not the folder ends with a separator.
        torch.save(state, os.path.join(checkpoint_dir, model_name))

    return losses_train, losses_val

In [None]:
# Load parameters

# Optimisation settings; the same values are stored in hparams below
learning_rate, batch_size, epochs = 5e-4, 4, 10

# Model architecture and training configuration gathered in one dictionary
hparams = dict(
    n_cnn_layers=4,
    n_rnn_layers=4,
    rnn_dim=1024,
    n_class=29,
    n_feats=128,
    stride=2,
    dropout=0.1,
    learning_rate=learning_rate,
    batch_size=batch_size,
    epochs=epochs,
    model_fpath=model_fpath,
)

In [None]:
# Load Pretrained Model
# Set load_pretrained to True to resume from the checkpoint named below.
# NOTE: main() writes checkpoints as 'Final_Model_epoch_{epoch}_lr_{lr}.pt';
# make sure model_name matches the file that actually exists in model_fpath.
load_pretrained = False
model_name = f'Final_Model_epoch_3.pt'

# Setup CUDA and load data
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")

torch.manual_seed(1)

train_dataset = Data_Loader(root, train_filenames_labels)
# The "test" dataset/loader used during training hold the validation split.
test_dataset = Data_Loader(root, val_filenames_labels)

kwargs = {'num_workers': 0, 'pin_memory': True} if use_cuda else {}
# data_processing is passed directly as the collate function; wrapping it in
# a lambda added nothing.
train_loader = data.DataLoader(dataset=train_dataset,
                            batch_size=hparams['batch_size'],
                            shuffle=True,
                            collate_fn=data_processing,
                            **kwargs)
test_loader = data.DataLoader(dataset=test_dataset,
                            batch_size=hparams['batch_size'],
                            shuffle=False,
                            collate_fn=data_processing,
                            **kwargs)

if load_pretrained:
    filepath = model_fpath + model_name
    # map_location lets a checkpoint saved on a GPU be loaded on a machine
    # without CUDA (and vice versa).
    state = torch.load(filepath, map_location=device)

model = SpeechRecognitionModel(
    hparams['n_cnn_layers'], hparams['n_rnn_layers'], hparams['rnn_dim'],
    hparams['n_class'], hparams['n_feats'], hparams['stride'], hparams['dropout']
    ).to(device)
if load_pretrained:
    model.load_state_dict(state['state_dict'])

optimizer = optim.AdamW(model.parameters(), hparams['learning_rate'])
if load_pretrained:
    optimizer.load_state_dict(state['optimizer'])

# The one-cycle schedule is always started from scratch: the scheduler state
# stored in the checkpoint is not restored here.
scheduler = optim.lr_scheduler.OneCycleLR(optimizer, max_lr=hparams['learning_rate'], 
                                        steps_per_epoch=len(train_loader),
                                        epochs=hparams['epochs'],
                                        anneal_strategy='linear')

In [None]:
# Train: run the full loop; returns per-epoch training and validation losses
losses_train, losses_val = main(
    model=model,
    optimizer=optimizer,
    scheduler=scheduler,
    hparams=hparams,
    train_loader=train_loader,
    test_loader=test_loader,
    learning_rate=learning_rate,
    batch_size=batch_size,
    epochs=epochs,
)


In [None]:
# Both workbooks are written to the notebook's working directory; run the
# %pwd cell below to see where that is.

# Training losses: one column per epoch, one row per batch.
# Epochs can contain different numbers of losses (oversized batches are
# skipped during training), so build one row per epoch and transpose: shorter
# epochs are padded with NaN instead of every epoch being cut down to the
# shortest one.
df = pd.DataFrame(losses_train).T
# The context manager saves and closes the workbook; ExcelWriter.save() no
# longer exists in pandas 2.x.
with pd.ExcelWriter('Final_Model_reducedLR_losses_train.xlsx', engine='xlsxwriter') as writer:
    df.to_excel(writer, sheet_name='losses_train', index=False)

# Validation losses: one row per epoch.
df = pd.DataFrame(losses_val)
with pd.ExcelWriter('Final_Model_reducedLR_losses_val.xlsx', engine='xlsxwriter') as writer:
    df.to_excel(writer, sheet_name='losses_val', index=False)

In [None]:
%pwd