In [None]:
from LAB3_1.utils import compute_acc, show_loss, Sequential_mnist

from typing import Tuple
from itertools import product

import torch, copy
from torch import Tensor,zeros, cuda
from torch.nn import Module, RNN, GRU, LSTM, Linear, CrossEntropyLoss
from torch.nn.utils import clip_grad_norm_
from torch.optim.adam import Adam
from torch.utils.data import Dataset, DataLoader

In [None]:
# Select the compute device once for the whole notebook:
# CUDA when PyTorch reports an available GPU, otherwise the CPU.
if cuda.is_available():
    gpu = 'cuda'
else:
    gpu = 'cpu'

## Model

In [None]:
class Recurrent_RNN(Module): # Recurrent Neural network
    """Sequence classifier: a recurrent encoder followed by a linear readout.

    The encoder is built with ``input_size=1`` and ``batch_first=True``, and the
    readout maps the final recurrent state to 10 logits.

    Args:
        rnn_type: Recurrent cell to use: ``"RNN"``, ``"LSTM"`` or ``"GRU"``.
        hidden: Number of hidden units per direction.
        layers: Number of stacked recurrent layers.
        bi: Whether the encoder is bidirectional.

    Raises:
        ValueError: If ``rnn_type`` is not one of the supported names.
    """

    def __init__(self,rnn_type:str, hidden: int, layers:int, bi:bool):
        super(Recurrent_RNN, self).__init__()

        cells = {"RNN": RNN, "LSTM": LSTM, "GRU": GRU}
        if rnn_type not in cells:
            # Fail here instead of leaving `self.rnn` undefined until forward() is called.
            raise ValueError(f"Unknown rnn_type {rnn_type!r}; expected one of {sorted(cells)}")
        self.rnn = cells[rnn_type](1, hidden, num_layers=layers, bidirectional=bi, batch_first=True)

        # A bidirectional encoder emits the two directions concatenated on the feature axis.
        B = 2 if bi else 1
        self.readout = Linear(B * hidden, 10)
        self.criteria = CrossEntropyLoss()

    def forward(self, x: Tensor, y:Tensor=None) -> Tensor:
        """Classify a batch of sequences.

        Args:
            x: Batch-first input sequences fed to the recurrent encoder.
            y: Optional targets; when given, the cross-entropy loss is also computed.

        Returns:
            The logits when ``y`` is None, otherwise the tuple ``(loss, logits)``.
        """
        self.rnn.flatten_parameters()
        out, _ = self.rnn(x)

        if self.rnn.bidirectional:
            # The forward direction has read the whole sequence at the last step,
            # but the backward direction has read it all only at the FIRST step.
            # Taking out[:, -1, :] would give the readout a backward state that
            # has seen a single element.
            h = self.rnn.hidden_size
            summary = torch.cat((out[:, -1, :h], out[:, 0, h:]), dim=1)
        else:
            summary = out[:, -1, :] # last hidden state as the cumulative knowledge

        y_pred = self.readout(summary)

        if y is None:
            return y_pred
        return self.criteria(y_pred, y), y_pred

## Trainer

In [None]:
class RNN_trainer:
    """Owns a `Recurrent_RNN` placed on the notebook device and provides
    mini-batch training (`fit`) and evaluation (`validate`) loops.

    Args:
        rnn_type: Recurrent cell name forwarded to `Recurrent_RNN`.
        hidden: Hidden units per direction.
        layers: Number of stacked recurrent layers.
        bi: Whether the encoder is bidirectional.
    """

    def __init__(self, rnn_type:str, hidden:int, layers:int, bi:bool):

        # model
        self.model = Recurrent_RNN(rnn_type, hidden=hidden, layers=layers, bi=bi).to(gpu)

    def fit(self, dataset:Dataset, epochs:int=2, lr:float=0.001, batch_size:int=64, shuffle:bool=True):
        """Train the model with Adam and gradient-norm clipping.

        Args:
            dataset: Training dataset yielding ``(x, y)`` pairs.
            epochs: Number of passes over the dataset.
            lr: Adam learning rate.
            batch_size: Mini-batch size.
            shuffle: Reshuffle the samples at every epoch. Pass False for a
                dataset that does not support shuffling.

        Returns:
            A 1-D tensor of length ``epochs`` holding the sample-weighted mean
            training loss of each epoch.

        Raises:
            ValueError: If the dataset yields no batch.
        """
        # Build a dataloader with the training dataset
        loader = DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)
        opt = Adam(self.model.parameters(), lr)
        history_tr = zeros(epochs) # mean training loss of every epoch

        self.model.train()
        for i in range(epochs):
            report = i % 15 == 0 # accuracy is only needed on the epochs that get printed
            loss_sum, acc_sum, seen = 0.0, 0.0, 0

            for x, y in loader:
                x,y = x.to(gpu), y.to(gpu)

                opt.zero_grad(set_to_none=True)
                loss, y_pred = self.model(x, y)
                loss.backward()
                clip_grad_norm_(self.model.parameters(), 1)
                opt.step()

                # Weight by batch size so a shorter last batch does not skew the epoch mean.
                n = y.size(0)
                loss_sum += loss.item() * n
                if report:
                    # assumes compute_acc returns the mean accuracy of the batch -- TODO confirm
                    acc_sum += compute_acc(y_pred, y) * n
                seen += n

            if seen == 0:
                raise ValueError("fit() needs a non-empty dataset")

            # Record the epoch mean, not the loss of whichever batch came last.
            epoch_loss = loss_sum / seen
            history_tr[i] = epoch_loss
            if report:
                print(f'Epoch {i} Loss: {round(epoch_loss, 4)} Accuracy {round(acc_sum / seen, 4)}')

        return history_tr

    def validate(self, dataset:Dataset, batch_size:int=64) -> Tuple:
        """Evaluate the model on a dataset without updating it.

        Args:
            dataset: Dataset yielding ``(x, y)`` pairs (train, validation or test).
            batch_size: Mini-batch size used for evaluation.

        Returns:
            ``(loss, accuracy)`` averaged over all samples of the dataset.

        Raises:
            ValueError: If the dataset yields no batch.
        """
        # Build a dataloader with the dataset taken (train, validation or test)
        loader = DataLoader(dataset, batch_size=batch_size)
        loss_sum, acc_sum, seen = 0.0, 0.0, 0

        self.model.eval()
        with torch.no_grad():
            for x, y in loader:
                x,y = x.to(gpu), y.to(gpu)
                loss, y_pred = self.model(x, y)

                # Per-batch means are weighted by batch size so the result is
                # the exact per-sample mean even when the last batch is smaller.
                n = y.size(0)
                loss_sum += loss.item() * n
                # assumes compute_acc returns the mean accuracy of the batch -- TODO confirm
                acc_sum += compute_acc(y_pred, y) * n
                seen += n

        if seen == 0:
            raise ValueError("validate() needs a non-empty dataset")

        return loss_sum / seen, acc_sum / seen

In [None]:
class GridSearch:
    """Exhaustive hyperparameter search for one recurrent cell type.

    The whole search runs inside the constructor: every combination of the
    values in ``parameters_grid`` is trained on ``tr`` and scored on ``dev``.

    Args:
        rnn_type: Recurrent cell name passed to `RNN_trainer`.
        parameters_grid: Maps each hyperparameter name (``"units"``,
            ``"layers"``, ``"bi"``, ``"epochs"``, ``"lr"``) to the list of
            values to try.
        tr: Training dataset.
        dev: Validation dataset used for model selection.

    Attributes:
        best_config: Configuration with the highest validation accuracy.
        best_model: Deep copy of the `RNN_trainer` trained with ``best_config``.
        tr_loss: Training-loss history returned by ``fit`` for that model.

    Raises:
        ValueError: If the grid produces no configuration.
    """

    def __init__(self, rnn_type:str, parameters_grid:dict, tr:Dataset, dev:Dataset):

        self.rnn_type = rnn_type
        all_configs = [dict(zip(parameters_grid.keys(), configs)) for configs in product(*parameters_grid.values())]
        if not all_configs:
            # An empty value list makes the cartesian product empty.
            raise ValueError("parameters_grid yields no configuration to try")

        print("Number of configurations to try: ",len(all_configs))
        # returns the performance in each configuration, the best model and the history of the loss
        rank, best, loss = self.run(tr, dev, all_configs)

        # Sort by validation accuracy, best first. The scores are unrounded and
        # the sort is stable, so rank[0] is the same configuration that run()
        # kept as the best model.
        rank = sorted(rank, key=lambda conf: -conf[2])

        top_config, top_tr_loss, top_dev_acc = rank[0]
        print("\nThe best solution in ", (top_config, round(top_tr_loss, 4), round(top_dev_acc, 4)))
        self.best_config = top_config
        self.best_model = best
        self.tr_loss = loss

    def run(self, tr:Dataset, dev:Dataset, configs:list):
        """
        In the grid search, we explore all configurations provided and try to find the best
        hyperparameter configuration using the training set to train the model and the validation
        set to compare the performance among all models instantiated by configurations.

        Returns a tuple ``(rank, best, loss)``: ``rank`` lists
        ``(config, last training loss, validation accuracy)`` for every configuration in the
        order tried, ``best`` is a deep copy of the trainer with the highest validation
        accuracy, and ``loss`` is a deep copy of its training history.
        """

        rank = [] # keeps track of each configuration and its performance

        # we save the best trained model and the training loss during the epochs
        best, loss = None, None
        # Start below any reachable accuracy so the first configuration is always
        # kept; starting at 0 would leave `best` as None if every model scored 0.
        best_dev_acc = float("-inf")

        for idx, config in enumerate(configs):
            print("Config: ",idx)

            trainer = RNN_trainer(rnn_type=self.rnn_type,
                                  hidden=config["units"],
                                  layers=config["layers"],
                                  bi=config["bi"])

            history  = trainer.fit(tr, config["epochs"], config["lr"])
            _, acc_vl = trainer.validate(dev)
            loss_tr = history[-1].item()

            # Keep unrounded values: rounding here could create artificial ties
            # and make the ranking disagree with the model selected below.
            rank.append((config, loss_tr, acc_vl))

            # The first value is the last training LOSS (it used to be labelled "Acc tr").
            print(f'Results: Loss tr: {round(loss_tr, 4)}', f'Acc vl: {round(acc_vl, 4)}')

            # we keep the best model
            if best_dev_acc < acc_vl:
                best_dev_acc = acc_vl
                loss = copy.deepcopy(history)
                best = copy.deepcopy(trainer)

        return rank, best, loss

# Bonus Track Assignment 2: Sequential MNIST classification task

## Retrieve the dataset and the hold-out splits

In [None]:
# Load the three splits (train / dev / test) in that order.
tr_dataset, dev_dataset, ts_dataset = (
    Sequential_mnist(split) for split in ("train", "dev", "test")
)

## Grid search Vanilla RNN

In [None]:
# Hyperparameter grid reused by every grid search below:
# each key maps to the list of candidate values to combine.
ranges_to_explore = dict(
    units=[10, 20],
    epochs=[100],
    lr=[0.001, 0.004],
    layers=[1, 2],
    bi=[True],
)

In [None]:
# Model selection for the vanilla RNN; the search itself runs inside the GridSearch constructor.
gs = GridSearch(rnn_type="RNN", parameters_grid=ranges_to_explore, tr=tr_dataset, dev=dev_dataset)
best_config, best_model = gs.best_config, gs.best_model

In [None]:
# Hand the training-loss history that the grid search stored for its best
# configuration to the project's show_loss helper (presumably plots the curve).
show_loss(gs.tr_loss)

In [None]:
# Score the model selected by the grid search on every split.
# Each line is labelled with its own split (they all used to print "Train loss").
tr_loss, tr_acc = best_model.validate(tr_dataset)
print(f'Train loss: {round(tr_loss, 6)}', f'Accuracy: {round(tr_acc, 3)}')

dev_loss, dev_acc = best_model.validate(dev_dataset)
print(f'Dev loss: {round(dev_loss, 6)}', f'Accuracy: {round(dev_acc, 3)}')

test_loss, test_acc = best_model.validate(ts_dataset)
print(f'Test loss: {round(test_loss, 6)}', f'Accuracy: {round(test_acc, 3)}')

#### Final retrain with Training and Validation set (with the best configuration)

In [None]:
best_rrn_model = RNN_trainer(rnn_type="RNN",
                            hidden=best_config["units"],
                            layers=best_config["layers"],
                            bi=best_config["bi"])
# we use both training and validation as a training set, using the best parameters
# found in the previous model selection
final_tr = Sequential_mnist("train-dev")
# fit() returns ONE tensor with a loss value per epoch; unpacking it into two
# names raises ValueError whenever epochs != 2.
tr_history = best_rrn_model.fit(final_tr, best_config["epochs"], lr=best_config["lr"])

tr_loss, tr_acc = best_rrn_model.validate(final_tr)
print(f'Train loss: {round(tr_loss, 6)}', f'Accuracy: {round(tr_acc, 3)}')

test_loss, test_acc  = best_rrn_model.validate(ts_dataset)
print(f'Test loss: {round(test_loss, 6)}', f'Accuracy: {round(test_acc, 3)}')

In [None]:
# Hand the training-loss history of the final retrain to the project's
# show_loss helper (presumably plots the curve).
show_loss(tr_history)

# Bonus Track Assignment 4: benchmarking RNN models on the sequential MNIST task

## Grid search LSTM

In [None]:
# Model selection for the LSTM; the search itself runs inside the GridSearch constructor.
gs = GridSearch(rnn_type="LSTM", parameters_grid=ranges_to_explore, tr=tr_dataset, dev=dev_dataset)
best_config, best_model = gs.best_config, gs.best_model

In [None]:
# Score the LSTM selected by the grid search on every split.
# Each line is labelled with its own split (they all used to print "Train loss").
tr_loss, tr_acc = best_model.validate(tr_dataset)
print(f'Train loss: {round(tr_loss, 6)}', f'Accuracy: {round(tr_acc, 3)}')

dev_loss, dev_acc = best_model.validate(dev_dataset)
print(f'Dev loss: {round(dev_loss, 6)}', f'Accuracy: {round(dev_acc, 3)}')

test_loss, test_acc = best_model.validate(ts_dataset)
print(f'Test loss: {round(test_loss, 6)}', f'Accuracy: {round(test_acc, 3)}')

In [None]:
best_lstm_model = RNN_trainer(rnn_type="LSTM",
                             hidden=best_config["units"],
                             layers=best_config["layers"],
                             bi=best_config["bi"])
# we use both training and validation as a training set, using the best parameters
# found in the previous model selection
final_tr = Sequential_mnist("train-dev")
best_lstm_model.fit(final_tr, best_config["epochs"], lr=best_config["lr"])

tr_loss, tr_acc = best_lstm_model.validate(final_tr)
print(f'Train loss: {round(tr_loss, 6)}', f'Accuracy: {round(tr_acc, 3)}')

# Held-out test split: labelled as such (it used to print "Train loss").
test_loss, test_acc  = best_lstm_model.validate(ts_dataset)
print(f'Test loss: {round(test_loss, 6)}', f'Accuracy: {round(test_acc, 3)}')

## Grid search GRU

In [None]:
# Model selection for the GRU; the search itself runs inside the GridSearch constructor.
gs = GridSearch(rnn_type="GRU", parameters_grid=ranges_to_explore, tr=tr_dataset, dev=dev_dataset)
best_config, best_model = gs.best_config, gs.best_model

In [None]:
# Score the GRU selected by the grid search on every split.
# Each line is labelled with its own split (they all used to print "Train loss").
tr_loss, tr_acc = best_model.validate(tr_dataset)
print(f'Train loss: {round(tr_loss, 6)}', f'Accuracy: {round(tr_acc, 3)}')

dev_loss, dev_acc = best_model.validate(dev_dataset)
print(f'Dev loss: {round(dev_loss, 6)}', f'Accuracy: {round(dev_acc, 3)}')

test_loss, test_acc = best_model.validate(ts_dataset)
print(f'Test loss: {round(test_loss, 6)}', f'Accuracy: {round(test_acc, 3)}')

In [None]:
best_gru_model = RNN_trainer(rnn_type="GRU",
                             hidden=best_config["units"],
                             layers=best_config["layers"],
                             bi=best_config["bi"])
# we use both training and validation as a training set, using the best parameters
# found in the previous model selection
final_tr = Sequential_mnist("train-dev")
best_gru_model.fit(final_tr, best_config["epochs"], lr=best_config["lr"])

tr_loss, tr_acc = best_gru_model.validate(final_tr)
print(f'Train loss: {round(tr_loss, 6)}', f'Accuracy: {round(tr_acc, 3)}')

# Held-out test split: labelled as such (it used to print "Train loss").
test_loss, test_acc  = best_gru_model.validate(ts_dataset)
print(f'Test loss: {round(test_loss, 6)}', f'Accuracy: {round(test_acc, 3)}')