
# **Lab 8 - Recurrent Neural Networks (RNNs)**

This tutorial demonstrates how to implement and use Recurrent Neural Networks (RNNs). We first implement a simple RNN from scratch. Then, we use PyTorch to train a Long Short-Term Memory (LSTM) network. LSTMs are widely used for processing sequential data such as text and video.

This tutorial is adapted from [Chapter 9](https://d2l.ai/chapter_recurrent-neural-networks/index.html) and [Chapter 10](https://classic.d2l.ai/chapter_convolutional-neural-networks/index.html) of the textbook.

## **1. Simple RNN from scratch**

This part is adapted from [this PyTorch tutorial on classifying names](https://pytorch.org/tutorials/intermediate/char_rnn_classification_tutorial.html). Specifically, we build and train a simple RNN that takes a surname and predicts, based on its spelling, which language the name comes from. We consider $18$ languages.

### **1.1. Data preparation**

First, download the data from [here](https://download.pytorch.org/tutorial/data.zip) and extract it into the current directory. In my case, I named the folder `language_data`. You will notice that the directory `language_data/names` contains $18$ text files (one per language) named `[Language].txt`. Each file contains a list of names, one per line, mostly romanized (but we still need to convert them from Unicode to ASCII).

We then need to:
- Convert the names from Unicode into ASCII.
- Build up a dictionary of names per language, in the form `{language: [name1, name2, ...]}`.



In [None]:
from __future__ import unicode_literals, print_function, division
from io import open
import glob   # short for global. It is used to return all file paths that match a specific pattern.
import os
from google.colab import drive

file_path = 'drive/My Drive/DL Course/data/names/*.txt'

# Find all file names within a given path
def findFiles(path):
  return glob.glob(path)

# You only need to run the command below once
drive.mount('/content/drive')

# Testing
print(findFiles(file_path))


Mounted at /content/drive
['drive/My Drive/DL Course/data/names/English.txt', 'drive/My Drive/DL Course/data/names/Dutch.txt', 'drive/My Drive/DL Course/data/names/French.txt', 'drive/My Drive/DL Course/data/names/Irish.txt', 'drive/My Drive/DL Course/data/names/Japanese.txt', 'drive/My Drive/DL Course/data/names/Italian.txt', 'drive/My Drive/DL Course/data/names/Polish.txt', 'drive/My Drive/DL Course/data/names/Spanish.txt', 'drive/My Drive/DL Course/data/names/Portuguese.txt', 'drive/My Drive/DL Course/data/names/Scottish.txt', 'drive/My Drive/DL Course/data/names/Chinese.txt', 'drive/My Drive/DL Course/data/names/Czech.txt', 'drive/My Drive/DL Course/data/names/Korean.txt', 'drive/My Drive/DL Course/data/names/German.txt', 'drive/My Drive/DL Course/data/names/Russian.txt', 'drive/My Drive/DL Course/data/names/Greek.txt', 'drive/My Drive/DL Course/data/names/Arabic.txt', 'drive/My Drive/DL Course/data/names/Vietnamese.txt']


In [None]:
import torch
def get_default_device():
    """Pick GPU if available, else CPU"""
    if torch.cuda.is_available():
        return torch.device('cuda')
    else:
        return torch.device('cpu')

device = get_default_device()
print(device)

cuda


Next, we write a function that turns Unicode strings into plain ASCII.

In [None]:
import unicodedata
import string

# Turning a Unicode string into plain ASCII
all_letters = string.ascii_letters + " .,;'"
n_letters = len(all_letters)

def unicodeToAscii(s):
    return ''.join(
        c for c in unicodedata.normalize('NFD', s)
        if unicodedata.category(c) != 'Mn'
        and c in all_letters
    )

print(unicodeToAscii('Ślusàrski'))

Slusarski
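To see why filtering on the `Mn` category removes accents, the short sketch below inspects what `NFD` normalization does to a single accented letter. The variable names are illustrative only and are not used elsewhere in this lab.

```python
import unicodedata

original = "Ś"

# NFD splits an accented character into its base letter
# followed by one or more combining marks.
decomposed = unicodedata.normalize("NFD", original)
for ch in decomposed:
    print(repr(ch), unicodedata.name(ch), unicodedata.category(ch))

# Dropping characters of category 'Mn' (nonspacing mark) keeps only the base letter.
stripped = "".join(ch for ch in decomposed if unicodedata.category(ch) != "Mn")
print(stripped)  # S
```

The extra `c in all_letters` test in `unicodeToAscii` then discards any remaining character that is not in our chosen alphabet.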


Finally, we build the `category_lines` dictionary, which maps each language to its list of names, together with `all_categories`, the list of languages.

In [None]:
category_lines = {}
all_categories = []

# Read a file and split into lines
def readLines(filename):
    lines = open(filename, encoding='utf-8').read().strip().split('\n')
    return [unicodeToAscii(line) for line in lines]

for filename in findFiles(file_path):
    # extract filename as the category name
    category = os.path.splitext(os.path.basename(filename))[0]
    all_categories.append(category)

    # read lines from the category, which are surnames of that category
    lines = readLines(filename)
    category_lines[category] = lines

n_categories = len(all_categories)

## Just for test
# print all the Chinese names
print(category_lines['Chinese'])

# Print only the first 5 Chinese names
print(category_lines['Chinese'][:5])

**Next**, we need to turn the names into tensors so that we can use them. We use a one-hot vector representation. For example, with an alphabet of $26$ characters (as in English), each letter is represented as a vector of length $26$ that is filled with zeros except for a one at the index of that letter. For example, the letter "b" is represented as `<0 1 0 0 ...>`.

To make a word, e.g., `Hamid`, we stack the one-hot vectors of its letters into a 3D tensor of size `word_length X 1 X n_alphabet_letters`. The extra dimension of size 1 is there because PyTorch assumes everything is in batches; we are just using a batch size of 1 here.

In [None]:
import torch

# Find letter index from all_letters, e.g. "a" = 0
def letterToIndex(letter):
    return all_letters.find(letter)

# Just for demonstration, turn a letter into a <1 x n_letters> Tensor
def letterToTensor(letter):
    tensor = torch.zeros(1, n_letters)
    tensor[0][letterToIndex(letter)] = 1
    return tensor

# Turn a line into a <line_length x 1 x n_letters>,
# or an array of one-hot letter vectors
def lineToTensor(line):
    tensor = torch.zeros(len(line), 1, n_letters)
    for li, letter in enumerate(line):
        tensor[li][0][letterToIndex(letter)] = 1
    return tensor

## Testing
# print(letterToTensor('J'))

# print(lineToTensor('Jones').size())

print(lineToTensor('Jones'))
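As an alternative to the explicit loop, the same tensor can be built with `torch.nn.functional.one_hot`. This is only a sketch: `line_to_tensor_vectorized` is a hypothetical helper name, and it assumes every character of the name is in `all_letters` (which holds after `unicodeToAscii`), since `str.find` returns `-1` for unknown characters and `one_hot` rejects negative indices.

```python
import string

import torch
import torch.nn.functional as F

all_letters = string.ascii_letters + " .,;'"
n_letters = len(all_letters)

def line_to_tensor_vectorized(line):
    # Map each character to its index in the alphabet
    indices = torch.tensor([all_letters.find(ch) for ch in line], dtype=torch.long)
    # one_hot returns an integer tensor of shape (line_length, n_letters)
    one_hot = F.one_hot(indices, num_classes=n_letters).float()
    # Insert the batch dimension of size 1 in the middle
    return one_hot.unsqueeze(1)

t = line_to_tensor_vectorized("Jones")
print(t.shape)  # torch.Size([5, 1, 57])
```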

### **1.2. Creating the network**
We will use a simple RNN module that takes the input and the previous hidden state, concatenates them, and feeds the result into two branches:
- The input-to-output branch (`i2o`), composed of a linear layer followed by a `LogSoftmax` layer (activation function), which produces the output.
- The input-to-hidden branch (`i2h`), composed of a linear layer. It outputs the new hidden state, which is fed back into the network at the next step (the recurrent loop).
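In symbols, one time step of this module can be sketched as follows. The names $W_{ih}, b_{ih}$ and $W_{io}, b_{io}$ are notation introduced here for the weights and biases of the two linear layers; $x_t$ is the one-hot vector of the current letter and $h_{t-1}$ is the previous hidden state.

$$
\begin{aligned}
c_t &= [x_t ; h_{t-1}] \\
h_t &= W_{ih}\, c_t + b_{ih} \\
o_t &= \log \operatorname{softmax}\left(W_{io}\, c_t + b_{io}\right)
\end{aligned}
$$

Note that, as described above, the hidden branch is purely linear: no nonlinearity such as $\tanh$ is applied to $h_t$ in this simple variant.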

The code below shows how to create such an RNN module.

In [None]:
import torch.nn as nn

class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, lr):
        super(RNN, self).__init__()

        self.hidden_size = hidden_size
        self.lr = lr

        self.i2h = nn.Linear(input_size + hidden_size, hidden_size)
        self.i2o = nn.Linear(input_size + hidden_size, output_size)
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, input, hidden):
        # The forward step takes the current input and the previous hidden state,
        # and combines them by concatenation before feeding them to the two branches
        combined = torch.cat((input, hidden), 1)
        hidden = self.i2h(combined)
        output = self.i2o(combined)
        output = self.softmax(output)
        return output, hidden

    def initHidden(self):
        # the hidden state needs to be initialized (for the first iteration)
        return torch.zeros(1, self.hidden_size).to(device)

    ## The loss function - Here, we will use Negative Log Likelihood
    def loss(self, y_hat, y):
      fn = nn.NLLLoss()
      return fn(y_hat, y)

    ## The optimization algorithm
    def configure_optimizers(self):
      return torch.optim.Adam(self.parameters(), self.lr)

    def evaluate(self, X):
      hidden  = self.initHidden()

      for i in range(X.size()[0]):
        output, hidden = self.forward(X[i], hidden)

      return output



Let's do a quick test:

In [None]:
print(n_letters)
print(n_categories)

To run a step of this network we need to pass an input (in our case, the Tensor for the current letter) and a previous hidden state (which we initialize as zeros at first).

In [None]:
# size of the hidden state
n_hidden = 128

input = lineToTensor('Albert')
input = input.to(device)

# As this is the first step, we initialize the hidden state with zeros
hidden = torch.zeros(1, n_hidden).to(device)
model = RNN(n_letters, n_hidden, n_categories, lr = 1e-04)
model = model.to(device)

output, next_hidden = model(input[0], hidden)

print(output)

By running the code above, you will notice that the output is a tensor of size `1 X n_categories`, where every item is the log-probability of that category (higher is more likely). On its own, this is not easy to interpret. To get a prediction, we take the index of the element with the highest value and use that index to look up the name of the corresponding language. This can be done using the following helper function:

In [None]:
def categoryFromOutput(output):
    top_n, top_i = output.topk(1)
    category_i = top_i[0].item()
    return all_categories[category_i], category_i

print(categoryFromOutput(output))
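Because the network ends with `LogSoftmax`, its outputs are log-probabilities: they are all at most 0, and exponentiating them gives probabilities that sum to 1. The sketch below illustrates this on made-up scores for a hypothetical three-language subset, and uses `topk(2)` to show the two most likely categories instead of only the best one.

```python
import torch

categories = ["English", "French", "German"]   # hypothetical 3-language subset
scores = torch.tensor([[2.0, 0.5, -1.0]])      # made-up pre-softmax scores

log_probs = torch.log_softmax(scores, dim=1)   # what LogSoftmax(dim=1) would output
probs = log_probs.exp()                        # back to ordinary probabilities

# topk returns the k largest values and their indices along the given dimension
top_values, top_indices = log_probs.topk(2, dim=1)
for value, index in zip(top_values[0], top_indices[0]):
    name = categories[index.item()]
    print(f"{name}: log-prob {value.item():.3f}, prob {value.exp().item():.3f}")
```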


### **1.3. Training**
Each iteration of training:
- Creates the input and target tensors
- Creates a zeroed initial hidden state
- Reads each letter in and keeps the hidden state for the next letter
- Compares the final output to the target
- Back-propagates the gradient
- Returns the output and loss

Let's first write a helper for picking random training samples from our training dataset.

In [None]:
# A mechanism for picking a random training sample
import random
import numpy as np

# Set Random Seed
torch.manual_seed(42)
random.seed(42)
np.random.seed(42)

def randomChoice(l):
    return l[random.randint(0, len(l) - 1)]

def randomTrainingExample(all_categories, category_lines):
    # pick up a random language
    category = randomChoice(all_categories)

    # pick up a random name from that language
    line = randomChoice(category_lines[category])

    # Convert the picked data into a tensor
    category_tensor = torch.tensor([all_categories.index(category)], dtype=torch.long)
    line_tensor = lineToTensor(line)

    return category, line, category_tensor.to(device), line_tensor.to(device)

## Let's test it
for i in range(1):
    category, line, category_tensor, line_tensor = randomTrainingExample(all_categories, category_lines)
    print('category =', category, '/ line =', line)
    print(category_tensor)
    print(line_tensor)

Now, let's create the training class.

In [None]:
import time
import math

def timeSince(since):
    now = time.time()
    s = now - since
    m = math.floor(s / 60)
    s -= m * 60
    return '%dm %ds' % (m, s)

## The training loop
class Trainer:

  def __init__(self, n_epochs = 1):
    self.max_epochs = n_epochs
    # self.writer     = tb  # the tensorboard instance

  def fit(self, model, all_categories, category_lines):

    self.current_loss = 0
    self.all_losses   = []

    self.all_categories = all_categories
    self.category_lines = category_lines

    # Transfer the model to the device (GPU or CPU)
    model.to(device)

    # configure the optimizer
    self.optimizer = model.configure_optimizers()
    self.model     = model

    self.start = time.time()

    for epoch in range(self.max_epochs):
      self.fit_epoch()

      # Logging the average training loss so that it can be visualized in the tensorboard
      # self.writer.add_scalar("Training Loss", self.avg_training_loss, epoch)

    print("Training process has finished")

  def fit_epoch(self):

    n_iters = 100000
    print_every = 5000
    plot_every  = 1000

    self.current_loss = 0.0
    self.all_losses = []

    # self.avg_training_loss = 0.0

    # iterate over randomly sampled training examples
    for iter in range(1, n_iters+1):

      ## Get input
      category, line, category_tensor, line_tensor = randomTrainingExample(self.all_categories, self.category_lines)

      ## training
      hidden = self.model.initHidden()

      # Clear gradient buffers because we don't want gradients from the previous
      # iteration to carry forward (PyTorch accumulates gradients by default)
      self.optimizer.zero_grad()

      # get output from the model, given the inputs
      for i in range(line_tensor.size()[0]):
          output, hidden = self.model(line_tensor[i], hidden)

      # get loss for the predicted output
      loss = self.model.loss(output, category_tensor)

      # get gradients w.r.t to the parameters of the model
      loss.backward()

      # update the parameters (perform optimization)
      self.optimizer.step()

      ## Let's print some statistics - Gradient is not required from here
      with torch.no_grad():
        self.current_loss += loss

        # Print the iteration no., loss, name and guess
        if iter % print_every == 0:
          guess, guess_i = categoryFromOutput(output)

          correct = '✓' if guess == category else '✗ (%s)' % category
          print('%d %d%% (%s) %.4f %s / %s %s' % (iter, iter / n_iters * 100, timeSince(self.start), loss, line, guess, correct))

        # Add current loss avg to list of losses (average loss over "plot_every" iterations)
        if iter % plot_every == 0:
            self.all_losses.append(self.current_loss / plot_every)
            self.current_loss = 0

We now have everything we need to train the network.

In [None]:
## 1. The RNN model
n_hidden = 128
model = RNN(n_letters, n_hidden, n_categories, lr=1e-04)

# 2. Training the network
# 2.1. Creating the trainer
trainer = Trainer(n_epochs=1)

# 2.2. Training the model
trainer.fit(model, all_categories, category_lines)

**Plotting the results**

Plotting the historical loss from all_losses shows the network learning.

In [None]:
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker

plt.figure()
all_losses = trainer.all_losses

# Convert tensors back to cpu because numpy is unable to use gpus
all_losses_cpu = [loss.cpu().item() for loss in all_losses]
plt.plot(all_losses_cpu)

Note also that you can use TensorBoard to do this plotting.

**Evaluating the results**

To see how well the network performs on different categories, we create a confusion matrix that indicates, for every actual language (rows), which language the network guesses (columns). To calculate the confusion matrix, a number of samples are run through the network with `evaluate()`, which performs the same forward pass as the training loop but without the backpropagation.

In [None]:
# Keep track of correct guesses in a confusion matrix
confusion = torch.zeros(n_categories, n_categories)
n_confusion = 10000

model = model.to(device)

# Go through a bunch of examples and record which are correctly guessed
for i in range(n_confusion):
  category, line, category_tensor, line_tensor = randomTrainingExample(all_categories, category_lines)
  output = model.evaluate(line_tensor)

  guess, guess_i = categoryFromOutput(output)
  category_i = all_categories.index(category)
  confusion[category_i][guess_i] += 1

# Normalize by dividing every row by its sum
for i in range(n_categories):
    confusion[i] = confusion[i] / confusion[i].sum()

# Set up plot
fig = plt.figure()
ax = fig.add_subplot(111)
cax = ax.matshow(confusion.numpy())
fig.colorbar(cax)

# Set up axes: place one tick per category and label it with the language name
ax.set_xticks(range(n_categories))
ax.set_xticklabels(all_categories, rotation=90)
ax.set_yticks(range(n_categories))
ax.set_yticklabels(all_categories)

plt.show()

If the network performs well, the confusion matrix has high values along the diagonal (ideally 1) and low values (ideally 0) in the off-diagonal elements.
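The diagonal can also be summarized numerically. After each row is normalized to sum to 1, the diagonal entry of a row is the fraction of names of that language that were classified correctly. The sketch below shows this on a made-up 3x3 matrix of raw counts; the numbers are purely illustrative and are not results from this lab.

```python
import torch

# Hypothetical raw counts: rows are true classes, columns are predicted classes
counts = torch.tensor([[8., 1., 1.],
                       [2., 6., 2.],
                       [0., 1., 9.]])

# Row-normalize, as done for the confusion matrix above
normalized = counts / counts.sum(dim=1, keepdim=True)

# Fraction of correct predictions per true class (the diagonal)
per_class_accuracy = normalized.diag()
print(per_class_accuracy)

# Overall accuracy must be computed from the raw counts, not the normalized rows
overall_accuracy = counts.diag().sum() / counts.sum()
print(overall_accuracy.item())
```

In this lab the examples are drawn by first picking a language uniformly at random, so the mean of the diagonal is also a reasonable single-number summary.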

What do you think about the performance of your network? What can you do to improve it?

## **2. LSTM**

Now, update the code above so that it uses an LSTM instead of the simple RNN. Start with a single LSTM layer, then try stacking multiple LSTM layers and compare the performance (e.g., by looking at the confusion matrix).

For using LSTM, please refer to this [LSTM tutorial](https://pytorch.org/tutorials/beginner/nlp/sequence_models_tutorial.html). Make sure you implement it in a modular way following the structure we defined so far.
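As a starting point, here is one possible sketch of such a model, not a reference solution. The class name `LSTMClassifier`, the layer name `h2o`, and the `num_layers` argument are choices made for this illustration. Unlike the `RNN` class above, it consumes the whole `<line_length x 1 x n_letters>` tensor in one call, so the training loop would no longer iterate over letters; the `loss` and `configure_optimizers` methods are omitted for brevity.

```python
import torch
import torch.nn as nn

class LSTMClassifier(nn.Module):
    """Hypothetical LSTM counterpart of the RNN class (illustrative names)."""

    def __init__(self, input_size, hidden_size, output_size, num_layers=1):
        super().__init__()
        # num_layers > 1 stacks several LSTM layers on top of each other
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers=num_layers)
        self.h2o = nn.Linear(hidden_size, output_size)
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, line_tensor):
        # line_tensor: (seq_len, batch=1, input_size).
        # When no initial state is passed, nn.LSTM starts from zeros.
        _, (h_n, _) = self.lstm(line_tensor)
        # h_n[-1] is the final hidden state of the last layer: (batch, hidden_size)
        return self.softmax(self.h2o(h_n[-1]))

# Quick shape check with a dummy one-hot "name" of 6 letters
model = LSTMClassifier(input_size=57, hidden_size=128, output_size=18, num_layers=2)
dummy_name = torch.zeros(6, 1, 57)
dummy_name[torch.arange(6), 0, torch.arange(6)] = 1.0
log_probs = model(dummy_name)
print(log_probs.shape)  # torch.Size([1, 18])
```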

### **2.1. Example**

PyTorch's LSTM expects all of its inputs to be 3D tensors. The semantics of the axes of these tensors are important. With the default `batch_first=False`, the first axis is the sequence itself, the second indexes instances in the mini-batch, and the third indexes elements of the input.

In [None]:
idim = 3 # Input dim is 3
odim = 5 # Output (hidden) dim is 5

lstm = nn.LSTM(idim, odim)  # layers is 1 (default), batch_first=False

# Sequence length
seq_size = 1

# make a sequence of seq_size number of (1,idim) row vectors
inputs = [torch.randn(1, idim) for _ in range(seq_size)]
print(inputs)

# hidden is a tuple containing the initial hidden state H
# and the initial cell state C of the LSTM.
# Each of the two elements has shape (1, 1, odim), i.e.
# (num_layers, batch_size, hidden_size).
# Here, num_layers is 1 (the default value), batch_size is 1
# (we feed one sequence at a time), and hidden_size is odim, which is
# the output dimension of the LSTM.
hidden = (torch.randn(1, 1, odim),
          torch.randn(1, 1, odim))

for i in inputs:
    # Step through the sequence one element at a time.
    # After each step, hidden holds the updated (hidden state, cell state) tuple.
    out, hidden = lstm(i.view(1, 1, idim), hidden)

[tensor([[ 1.0116, -0.1091, -0.9262]])]
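Instead of stepping through the sequence element by element, the whole sequence can be passed to the LSTM in a single call. The sketch below (with an arbitrary sequence length of 4 and zero initial states, both chosen for illustration) prints the resulting shapes and checks that the two approaches give the same final output.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

idim, odim, seq_len = 3, 5, 4
lstm = nn.LSTM(idim, odim)

# Whole sequence at once: shape (seq_len, batch, input_size)
inputs = torch.randn(seq_len, 1, idim)
h0 = torch.zeros(1, 1, odim)
c0 = torch.zeros(1, 1, odim)

out, (h_n, c_n) = lstm(inputs, (h0, c0))
print(out.shape)   # torch.Size([4, 1, 5]): one output per time step
print(h_n.shape)   # torch.Size([1, 1, 5]): final hidden state only

# Same computation, one time step per call
hidden = (h0, c0)
for x in inputs:
    step_out, hidden = lstm(x.view(1, 1, idim), hidden)

# The last step's output matches the last row of the full-sequence output
print(torch.allclose(step_out[0], out[-1], atol=1e-6))
```

For a single-layer, unidirectional LSTM, `out[-1]` and `h_n[-1]` hold the same values, which is why either can be fed to a classification layer.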
