# RNNs, LSTMs & GRUs

#### Description

This tutorial is based on a deep learning coursework assignment. The aim was to implement basic RNN cells, as well as LSTMs and GRUs, to better understand their inner mechanisms.


### Dataset

I will be using the Google [*Speech Commands*](https://www.tensorflow.org/tutorials/sequences/audio_recognition) v0.02 [1] dataset. In particular, I will be using a subset of the dataset containing only the words "one", "two" and "three". In this notebook, I will be developing RNNs to classify the respective audio signals into the appropriate labels. Rather than working with the raw audio signals, we will be using Mel-frequency cepstral coefficient (MFCC) representations, which are derived from the Mel spectrogram of the original data.

[1] Warden, P. (2018). [Speech commands: A dataset for limited-vocabulary speech recognition](https://arxiv.org/abs/1804.03209). *arXiv preprint arXiv:1804.03209.*

In [None]:
## MAKE SURE THIS POINTS INSIDE THE DATASET FOLDER.
# Directory that contains the "data_speech_commands_v0.02/" folder. Keep the
# trailing slash so that file paths built from this prefix resolve correctly.
dataset_folder = "../" # this should change depending on where you have stored the data files

In [None]:
import math
import os
import random
from collections import defaultdict

import torch
import torch.nn as nn
from torch.autograd import Variable
from torch.utils.data import Dataset
import numpy as np
from scipy.io.wavfile import read
import librosa
from matplotlib import pyplot as plt

# Whether a CUDA device is usable in this session.
cuda = torch.cuda.is_available()

# Float tensor type matching the device selected above.
Tensor = torch.FloatTensor if not cuda else torch.cuda.FloatTensor


In [None]:
def set_seed(seed_value):
    """Seed every random number generator used in this notebook.

    The same value is given, in order, to Python's ``random`` module, NumPy,
    PyTorch on the CPU and PyTorch on all CUDA devices.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for seed_fn in seeders:
        seed_fn(seed_value)

set_seed(42)

In [None]:
class SpeechCommandsDataset(Dataset):
    """Google Speech Commands subset restricted to "one", "two" and "three".

    Indexing the dataset returns ``(features, label)`` where ``features`` is
    the transposed MFCC matrix of the (zero-padded) clip as ``float32`` and
    ``label`` is the index of the clip's class in :meth:`get_classes`.

    Files named in ``validation_list.txt`` / ``testing_list.txt`` form the
    "valid" / "test" splits; every other file of the selected classes forms
    the "train" split.
    """

    # Name of the folder (inside ``root_dir``) that holds the class folders
    # and the two split list files.
    DATA_SUBDIR = "data_speech_commands_v0.02"
    # Length of the fixed-size sample buffer; also passed to librosa as ``sr``.
    SAMPLE_RATE = 16000
    SPLITS = ("train", "valid", "test")

    def __init__(self, root_dir, split):
        """
        Args:
            root_dir (string): Directory with all the data files. A trailing
                path separator is accepted but not required.
            split    (string): In ["train", "valid", "test"].

        Raises:
            ValueError: If ``split`` is not one of the supported names.
        """
        # Validate up front so a typo fails before any file I/O, and fails
        # even when the class folders happen to be empty.
        if split not in self.SPLITS:
            raise ValueError("Invalid split name.")

        self.root_dir = root_dir
        self.split = split

        self.number_of_classes = len(self.get_classes())

        self.class_to_file = defaultdict(list)

        self.valid_filenames = self.get_valid_filenames()
        self.test_filenames = self.get_test_filenames()

        self.filepath_list = list()
        self.label_list = list()
        for label, c in enumerate(self.get_classes()):
            class_dir = os.path.join(self._data_dir(), c)
            kept = self.class_to_file[c]
            # Sorting makes the sample order independent of the OS listing order.
            for filename in sorted(os.listdir(class_dir)):
                if self._in_split(c, filename):
                    kept.append(filename)
                    self.filepath_list.append(os.path.join(class_dir, filename))
                    self.label_list.append(label)
        self.number_of_samples = len(self.filepath_list)

    def __len__(self):
        return self.number_of_samples

    def __getitem__(self, idx):
        """Load clip ``idx`` and return ``(mfcc_features, label)``."""
        # scipy's ``read`` returns ``(rate, data)``; only the samples are used.
        waveform = read(self.filepath_list[idx])[1]

        # Zero-pad short clips and truncate long ones to the fixed buffer
        # length (the original assignment raised on clips longer than the buffer).
        sample = np.zeros((self.SAMPLE_RATE, ), dtype=np.float32)
        n_samples = min(waveform.size, self.SAMPLE_RATE)
        sample[:n_samples] = waveform[:n_samples]

        features = librosa.feature.mfcc(
            y=sample, sr=self.SAMPLE_RATE, hop_length=512, n_fft=2048)
        features = features.transpose().astype(np.float32)

        return features, self.label_list[idx]

    def get_classes(self):
        """Return the class names; a name's position is its integer label."""
        return ['one', 'two', 'three']

    def get_valid_filenames(self):
        """Return ``{class_name: set(filenames)}`` for the validation split."""
        return self._read_split_list("validation_list.txt")

    def get_test_filenames(self):
        """Return ``{class_name: set(filenames)}`` for the test split."""
        return self._read_split_list("testing_list.txt")

    def _data_dir(self):
        """Return the path of the dataset folder inside ``root_dir``."""
        return os.path.join(self.root_dir, self.DATA_SUBDIR)

    def _in_split(self, class_name, filename):
        """Tell whether ``filename`` of ``class_name`` belongs to ``self.split``."""
        if self.split == "valid":
            return filename in self.valid_filenames[class_name]
        if self.split == "test":
            return filename in self.test_filenames[class_name]
        # "train" is everything that is held out for neither evaluation split.
        return (filename not in self.valid_filenames[class_name]
                and filename not in self.test_filenames[class_name])

    def _read_split_list(self, list_filename):
        """Parse a ``class/filename`` list file, keeping only the selected classes."""
        class_names = self.get_classes()

        class_to_filename = defaultdict(set)
        with open(os.path.join(self._data_dir(), list_filename), "r") as fp:
            for line in fp:
                parts = line.strip().split("/")

                # Skip blank or malformed lines instead of failing on parts[1].
                if len(parts) >= 2 and parts[0] in class_names:
                    class_to_filename[parts[0]].add(parts[1])

        return class_to_filename

In [None]:

# One dataset per split, constructed in train / valid / test order.
train_dataset, valid_dataset, test_dataset = [
    SpeechCommandsDataset(dataset_folder, split_name)
    for split_name in ("train", "valid", "test")
]

# Hyper-parameters used by the loaders here and by the training loop.
batch_size = 100
num_epochs = 20
valid_every_n_steps = 20

# Only the training loader shuffles; the evaluation loaders keep a fixed order.
train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=batch_size, shuffle=True)
valid_loader = torch.utils.data.DataLoader(
    valid_dataset, batch_size=batch_size, shuffle=False)
test_loader = torch.utils.data.DataLoader(
    test_dataset, batch_size=batch_size, shuffle=False)

### Question 1:  Finalise the LSTM and GRU cells by completing the missing code

You are allowed to use nn.Linear.

In [None]:
class LSTMCell(nn.Module):
    """Single-step LSTM cell built from two linear projections.

    ``x2h`` (input) and ``h2h`` (previous hidden state) each produce
    ``4 * hidden_size`` features. Their sum is split, in this order, into the
    input gate, output gate, forget gate and candidate cell state.
    """

    def __init__(self, input_size, hidden_size, bias=True):
        super().__init__()
        self.input_size, self.hidden_size, self.bias = input_size, hidden_size, bias

        gate_width = 4 * hidden_size
        self.x2h = nn.Linear(input_size, gate_width, bias=bias)
        self.h2h = nn.Linear(hidden_size, gate_width, bias=bias)
        self.reset_parameters()

    def reset_parameters(self):
        """Re-draw every parameter uniformly from [-1/sqrt(hidden_size), 1/sqrt(hidden_size)]."""
        bound = 1.0 / math.sqrt(self.hidden_size)
        for param in self.parameters():
            param.data.uniform_(-bound, bound)

    def forward(self, input, hx=None):
        """Advance one time step.

        Args:
            input: Tensor whose first dimension is the batch and whose last
                dimension has ``input_size`` features.
            hx: Optional ``(hidden, cell)`` pair from the previous step; both
                default to zeros when omitted.

        Returns:
            Tuple ``(hy, cy)`` with the new hidden and cell states.
        """
        if hx is None:
            zero_state = input.new_zeros(input.size(0), self.hidden_size, requires_grad=False)
            h_prev, c_prev = zero_state, zero_state
        else:
            # Unpack the previous hidden and cell states.
            h_prev, c_prev = hx

        pre_activations = self.x2h(input) + self.h2h(h_prev)
        in_gate, out_gate, forget_gate, candidate = pre_activations.chunk(4, dim=-1)

        in_gate = torch.sigmoid(in_gate)
        out_gate = torch.sigmoid(out_gate)
        forget_gate = torch.sigmoid(forget_gate)
        candidate = torch.tanh(candidate)

        cy = forget_gate * c_prev + in_gate * candidate
        hy = out_gate * torch.tanh(cy)
        return (hy, cy)

class BasicRNNCell(nn.Module):
    """Single-step Elman RNN cell: ``hy = act(x2h(input) + h2h(hx))``.

    ``nonlinearity`` selects ``act`` and must be "tanh" or "relu".
    """

    def __init__(self, input_size, hidden_size, bias=True, nonlinearity="tanh"):
        super().__init__()
        self.input_size, self.hidden_size = input_size, hidden_size
        self.bias = bias
        self.nonlinearity = nonlinearity
        if nonlinearity not in ("tanh", "relu"):
            raise ValueError("Invalid nonlinearity selected for RNN.")

        self.x2h = nn.Linear(input_size, hidden_size, bias=bias)
        self.h2h = nn.Linear(hidden_size, hidden_size, bias=bias)

        self.reset_parameters()

    def reset_parameters(self):
        """Re-draw every parameter uniformly from [-1/sqrt(hidden_size), 1/sqrt(hidden_size)]."""
        bound = 1.0 / math.sqrt(self.hidden_size)
        for param in self.parameters():
            param.data.uniform_(-bound, bound)

    def forward(self, input, hx=None):
        """Return the next hidden state; ``hx`` defaults to zeros when omitted."""
        h_prev = hx
        if h_prev is None:
            h_prev = input.new_zeros(input.size(0), self.hidden_size, requires_grad=False)

        pre_activation = self.x2h(input) + self.h2h(h_prev)
        # Look up the activation function by name in torch.nn.functional.
        return getattr(nn.functional, self.nonlinearity)(pre_activation)

class GRUCell(nn.Module):
    """Single-step GRU cell.

    Layer roles (layer names and shapes are unchanged from the original
    definition, so ``state_dict`` keys stay the same):

    * ``x2h`` / ``h2h`` project to ``2 * hidden_size`` features that are split
      into the reset gate ``r`` and the update gate ``z``.
    * ``x2r`` / ``h2r`` project to ``hidden_size`` features and form the
      candidate state; ``h2r`` is applied to the reset-gated hidden state.

    Update rule::

        r, z    = sigmoid(x2h(x) + h2h(h)).chunk(2)
        h_tilde = tanh(x2r(x) + h2r(r * h))
        h_new   = (1 - z) * h + z * h_tilde
    """

    def __init__(self, input_size, hidden_size, bias=True):
        super(GRUCell, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.bias = bias

        self.x2h = nn.Linear(input_size, 2*hidden_size, bias=bias)
        self.h2h = nn.Linear(hidden_size, 2*hidden_size, bias=bias)
        self.x2r = nn.Linear(input_size, hidden_size, bias=bias)
        self.h2r = nn.Linear(hidden_size, hidden_size, bias=bias)
        self.reset_parameters()

    def reset_parameters(self):
        """Re-draw every parameter uniformly from [-1/sqrt(hidden_size), 1/sqrt(hidden_size)]."""
        std = 1.0 / math.sqrt(self.hidden_size)
        for w in self.parameters():
            w.data.uniform_(-std, std)

    def forward(self, input, hx=None):
        """Advance one time step.

        Args:
            input: Tensor whose first dimension is the batch and whose last
                dimension has ``input_size`` features.
            hx: Optional previous hidden state; defaults to zeros.

        Returns:
            The new hidden state, with ``hidden_size`` features per sample.
        """
        if hx is None:
            hx = input.new_zeros(input.size(0), self.hidden_size, requires_grad=False)

        # The gates must come from the 2*hidden_size projection so that each
        # chunk is hidden_size wide. The original used the hidden_size-wide
        # layers here, which yields half-width gates and a shape error.
        gates = (self.x2h(input) + self.h2h(hx)).sigmoid()
        r, z = torch.chunk(gates, 2, dim=-1)

        # Candidate state: the reset gate scales how much of the previous
        # state reaches the recurrent projection.
        ht_tilde = (self.x2r(input) + self.h2r(torch.mul(r, hx))).tanh()

        # Blend previous and candidate state (the original referenced an
        # undefined name ``h`` here).
        hy = torch.mul(1 - z, hx) + torch.mul(z, ht_tilde)
        return hy

## Finalise the RNNModel and BidirRecurrentModel

Note that there are several different ways that one can implement a bi-directional recurrent neural network. In this task I implement bidirectional RNNs with one RNN going each way.

In [None]:
class RNNModel(nn.Module):
    """Unidirectional multi-layer recurrent classifier.

    A stack of ``num_layers`` recurrent cells is unrolled over the time axis
    of a ``(batch, seq_len, input_size)`` input. The top layer's hidden state
    at the last time step is passed through the linear layer ``fc``.

    Args:
        mode: One of "LSTM", "GRU", "RNN_TANH" or "RNN_RELU".
        input_size: Number of features per time step.
        hidden_size: Width of every recurrent layer.
        num_layers: Number of stacked recurrent layers.
        bias: Whether the recurrent cells use bias terms.
        output_size: Number of output logits.

    Raises:
        ValueError: If ``mode`` is not one of the supported names.
    """

    def __init__(self, mode, input_size, hidden_size, num_layers, bias, output_size):
        super(RNNModel, self).__init__()
        self.mode = mode
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.bias = bias
        self.output_size = output_size

        # The first layer consumes the input features, every further layer
        # consumes the hidden states of the layer below.
        layer_input_sizes = [input_size] + [hidden_size] * (num_layers - 1)
        self.rnn_cell_list = nn.ModuleList(
            [self._make_cell(mode, size, hidden_size, bias) for size in layer_input_sizes]
        )

        # ``att_fc`` is not used by ``forward``; it is kept so that parameter
        # names and initialisation order stay the same as before.
        self.att_fc = nn.Linear(self.hidden_size, 1)
        self.fc = nn.Linear(self.hidden_size, self.output_size)

    @staticmethod
    def _make_cell(mode, input_size, hidden_size, bias):
        """Build one recurrent cell for ``mode`` (the ``bias`` flag is honoured)."""
        if mode == 'LSTM':
            return LSTMCell(input_size, hidden_size, bias=bias)
        if mode == 'GRU':
            return GRUCell(input_size, hidden_size, bias=bias)
        if mode == 'RNN_TANH':
            return BasicRNNCell(input_size, hidden_size, bias=bias, nonlinearity='tanh')
        if mode == 'RNN_RELU':
            return BasicRNNCell(input_size, hidden_size, bias=bias, nonlinearity='relu')
        raise ValueError("Invalid RNN mode selected.")

    def forward(self, input, hx=None):
        """Run the stacked RNN and classify from the last time step.

        Args:
            input: Tensor of shape ``(batch, seq_len, input_size)``.
            hx: Optional sequence holding one initial state per layer (an
                ``(h, c)`` pair per layer in LSTM mode). Zeros when omitted.

        Returns:
            Tensor of shape ``(batch, output_size)``.
        """
        h0 = [None] * self.num_layers if hx is None else list(hx)

        layer_input = input
        for n in range(self.num_layers):
            cell = self.rnn_cell_list[n]
            state = h0[n]
            step_outputs = []
            for t in range(layer_input.size(1)):  # for each sequence step
                state = cell(layer_input[:, t, :], state)
                # LSTM cells return (h, c); only h feeds the next layer.
                step_outputs.append(state[0] if self.mode == 'LSTM' else state)
            # Stacking keeps the result on the same device and dtype as the
            # cell outputs. The original wrote into a CPU buffer created with
            # torch.zeros, which fails for CUDA inputs.
            layer_input = torch.stack(step_outputs, dim=1)

        # No squeeze(): a batch of one must keep its batch dimension.
        out = layer_input[:, -1, :]
        return self.fc(out)
    

class BidirRecurrentModel(nn.Module):
    """Bidirectional multi-layer recurrent classifier.

    Two independent stacks of cells are used, one reading the sequence
    forwards (``rnn_cell_list``) and one reading it backwards
    (``rnn_cell_list_rev``). The final hidden state of each direction's top
    layer is concatenated and passed through the linear layer ``fc``.

    Args:
        mode: One of "LSTM", "GRU", "RNN_TANH" or "RNN_RELU".
        input_size: Number of features per time step.
        hidden_size: Width of every recurrent layer (per direction).
        num_layers: Number of stacked recurrent layers per direction.
        bias: Whether the recurrent cells use bias terms.
        output_size: Number of output logits.

    Raises:
        ValueError: If ``mode`` is not one of the supported names.
    """

    def __init__(self, mode, input_size, hidden_size, num_layers, bias, output_size):
        super(BidirRecurrentModel, self).__init__()
        self.mode = mode
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.bias = bias
        self.output_size = output_size

        # The first layer consumes the input features, every further layer
        # consumes the hidden states of the layer below.
        layer_input_sizes = [input_size] + [hidden_size] * (num_layers - 1)
        self.rnn_cell_list = nn.ModuleList(
            [self._make_cell(mode, size, hidden_size, bias) for size in layer_input_sizes]
        )
        self.rnn_cell_list_rev = nn.ModuleList(
            [self._make_cell(mode, size, hidden_size, bias) for size in layer_input_sizes]
        )

        # ``forward`` concatenates one hidden state per direction, so the
        # classifier takes 2 * hidden_size features. The original never
        # created ``fc``, so ``forward`` raised AttributeError.
        self.fc = nn.Linear(2 * self.hidden_size, self.output_size)

    @staticmethod
    def _make_cell(mode, input_size, hidden_size, bias):
        """Build one recurrent cell for ``mode`` (the ``bias`` flag is honoured)."""
        if mode == 'LSTM':
            return LSTMCell(input_size, hidden_size, bias=bias)
        if mode == 'GRU':
            return GRUCell(input_size, hidden_size, bias=bias)
        if mode == 'RNN_TANH':
            return BasicRNNCell(input_size, hidden_size, bias=bias, nonlinearity='tanh')
        if mode == 'RNN_RELU':
            return BasicRNNCell(input_size, hidden_size, bias=bias, nonlinearity='relu')
        raise ValueError("Invalid RNN mode selected.")

    def _run_layer(self, cell, sequence, state):
        """Unroll ``cell`` over ``sequence`` and return the per-step hidden states."""
        outputs = []
        for x_t in sequence:
            state = cell(x_t, state)
            # LSTM cells return (h, c); only h feeds the next layer.
            outputs.append(state[0] if self.mode == 'LSTM' else state)
        return outputs

    def forward(self, input, hx=None):
        """Run both directions and classify from their final hidden states.

        Args:
            input: Tensor of shape ``(batch, seq_len, input_size)``.
            hx: Optional sequence holding one initial state per layer (an
                ``(h, c)`` pair per layer in LSTM mode). The same initial
                states are used for both directions. Zeros when omitted.

        Returns:
            Tensor of shape ``(batch, output_size)``.
        """
        # Lists of per-step tensors, each (batch, features); the reverse
        # direction simply reads the steps in the opposite order.
        seq_fwd = list(input.permute(1, 0, 2))
        seq_rev = seq_fwd[::-1]

        hi = [None] * self.num_layers if hx is None else list(hx)
        for j in range(self.num_layers):
            seq_fwd = self._run_layer(self.rnn_cell_list[j], seq_fwd, hi[j])
            seq_rev = self._run_layer(self.rnn_cell_list_rev[j], seq_rev, hi[j])

        # Each list is in processing order, so the last entry is the state
        # after the whole sequence has been read. The original took the FIRST
        # reverse entry, i.e. the state after a single step. No squeeze(): a
        # batch of one must keep its batch dimension for the concatenation.
        out = torch.cat((seq_fwd[-1], seq_rev[-1]), 1)

        out = self.fc(out)
        return out

## Validating Model Performance

In [None]:

# Feature matrix of one sample is (time steps, features per step).
seq_dim, input_dim = train_dataset[0][0].shape
output_dim = 3

hidden_dim = 32
layer_dim = 3
bias = True

### Change the code below to try running different models:
model = RNNModel("LSTM", input_dim, hidden_dim, layer_dim, bias, output_dim)
# model = BidirRecurrentModel("LSTM", input_dim, hidden_dim, layer_dim, bias, output_dim)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

criterion = nn.CrossEntropyLoss()

learning_rate = 0.01
optimizer = torch.optim.Adamax(model.parameters(), lr=learning_rate)


def evaluate(loader):
    """Return the accuracy of ``model`` on ``loader`` as an integer percentage.

    Counters are local to each call, so validation and test results cannot
    leak into each other. Gradients are disabled while evaluating.
    """
    correct = 0
    total = 0
    was_training = model.training
    model.eval()
    with torch.no_grad():
        for audio, labels in loader:
            audio = audio.view(-1, seq_dim, input_dim).to(device)
            outputs = model(audio)
            # Guard for models that drop the batch dimension on a batch of one.
            if outputs.dim() == 1:
                outputs = outputs.view(1, -1)
            _, predicted = torch.max(outputs, 1)
            correct += (predicted.cpu() == labels).sum().item()
            total += labels.size(0)
    model.train(was_training)
    return 100 * correct // total if total else 0


loss_list = []
# NOTE: ``iter`` shadows the builtin; the name is kept in case later cells
# reference it.
iter = 0
max_v_accuracy = 0
reported_t_accuracy = 0
max_t_accuracy = 0
for epoch in range(num_epochs):
    for audio, labels in train_loader:
        audio = audio.view(-1, seq_dim, input_dim).to(device)
        labels = labels.to(device)

        optimizer.zero_grad()

        outputs = model(audio)
        if outputs.dim() == 1:
            outputs = outputs.view(1, -1)
        loss = criterion(outputs, labels)

        loss.backward()

        optimizer.step()

        loss_list.append(loss.item())
        iter += 1

        if iter % valid_every_n_steps == 0:
            v_accuracy = evaluate(valid_loader)

            # Test accuracy is only refreshed when validation accuracy matches
            # or beats the best seen so far.
            if v_accuracy >= max_v_accuracy:
                max_v_accuracy = v_accuracy
                # The original reused the validation counters here, so the
                # reported figure mixed validation and test samples.
                t_accuracy = evaluate(test_loader)
                reported_t_accuracy = t_accuracy

            print('Iteration: {}. Loss: {}. V-Accuracy: {}  T-Accuracy: {}'.format(iter, loss.item(), v_accuracy, reported_t_accuracy))

