<a href="https://colab.research.google.com/github/kaifoerster/ML_Lab_1_Group-A/blob/main/Lab_session_6.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Lab Session 6

We will implement an RNN from scratch using PyTorch, based on Chapters 9 and 10 of the textbook (<a href="https://d2l.ai/chapter_recurrent-neural-networks/rnn.html">Zhang et al.</a>), and use it to build a simple language model.

Note well: Make sure that external cookies are enabled for this notebook to be able to run all cells.

In [None]:
#!pip install torch==2.0.0 torchvision==0.15.1
#!pip install d2l==1.0.3

In [1]:
import math
import torch
import re
from torch import nn
from torch.nn import functional as F
from d2l import torch as d2l
%matplotlib inline

##Preprocessing sequential data

We will be working with natural language sentences, which are, by definition, sequential data.

**Definition**: Sequential data is characterized by two properties: i) the ordering of instances is relevant, and ii) instances depend on other instances in the dataset.

An important part of dealing with natural language sequences is defining the input units for the algorithm (tokens) and translating them into numerical input. Each time step corresponds to one token, but what precisely constitutes a token is a design choice:
<ul>
<li>If we define tokens to be words, then
in the sequence "Mary has a little lamb", $x_1$ would be "Mary"</li>
<li>If we define them to be letters, $x_1$ would be "M"</li>
</ul>

We then encode tokens by assigning each distinct token an integer: its position in the vocabulary, the comprehensive list of all distinct tokens in our dataset.
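
As a minimal sketch of the two tokenization choices and of the integer encoding, the snippet below uses the example sentence (lowercased) and a hypothetical helper `build_vocab`. The helper is not part of `d2l`; the `d2l.Vocab` class used later orders and indexes tokens differently and, among other things, reserves an index for unknown tokens.

```python
# Toy sentence from the example in the text (lowercased).
sentence = "mary has a little lamb"

# Two possible tokenizations of the same sequence.
word_tokens = sentence.split()   # first token is "mary"
char_tokens = list(sentence)     # first token is "m"


def build_vocab(tokens):
    """Hypothetical helper: map each distinct token to an integer index.

    Tokens are sorted so that the mapping is deterministic.
    """
    return {token: idx for idx, token in enumerate(sorted(set(tokens)))}


char_vocab = build_vocab(char_tokens)
# Encode the sequence as a list of integer indices.
encoded = [char_vocab[token] for token in char_tokens]

print(word_tokens[0], char_tokens[0])
print(len(char_vocab), encoded[:4])
```

With character tokens the vocabulary stays tiny (12 distinct symbols for this sentence), which is one reason the notebook uses single letters.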


##Loading the data

We will be working with H. G. Wells’ The Time Machine. For simplicity, we will define tokens to be single letters.

In [6]:
class TimeMachine(d2l.DataModule):
    """The Time Machine dataset."""

    def __init__(self, batch_size, num_steps, num_train=10000, num_val=5000):

        super().__init__()
        self.save_hyperparameters()
        corpus, self.vocab = self.build(self._download())
        array = d2l.tensor([corpus[i:i+num_steps+1]
                            for i in range(len(corpus)-num_steps)])
        self.X, self.Y = array[:,:-1], array[:,1:]

    def build(self, raw_text, vocab=None):

        tokens = self._tokenize(self._preprocess(raw_text))
        if vocab is None: vocab = d2l.Vocab(tokens)
        corpus = [vocab[token] for token in tokens]
        return corpus, vocab

    def get_dataloader(self, train):
        idx = slice(0, self.num_train) if train else slice(
            self.num_train, self.num_train + self.num_val)
        return self.get_tensorloader([self.X, self.Y], train, idx)

    def _download(self):
        fname = d2l.download(d2l.DATA_URL + 'timemachine.txt', self.root,
                             '090b5e7e70c295757f55df93cb0a180b9691891a')
        with open(fname) as f:
            return f.read()

    def _preprocess(self, text):
        return re.sub('[^A-Za-z]+', ' ', text).lower()

    def _tokenize(self, text):
        # TODO: implement the tokenization step
        return list(text)



Task: implement the tokenization step

In [4]:
# @title
def _tokenize(self, text):
  return list(text)


In [7]:
data = TimeMachine(batch_size=2**10, num_steps=2**5)
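
The constructor above builds its input and target tensors by sliding a window of length `num_steps + 1` over the encoded corpus, so that each target sequence is the input sequence shifted by one token. A minimal sketch of that construction on a toy corpus, using plain lists instead of tensors (the function name `make_windows` is an assumption for illustration):

```python
def make_windows(corpus, num_steps):
    """Return (X, Y): Y[i] is X[i] shifted one position to the right."""
    windows = [corpus[i:i + num_steps + 1]
               for i in range(len(corpus) - num_steps)]
    X = [w[:-1] for w in windows]   # all but the last token of each window
    Y = [w[1:] for w in windows]    # all but the first token of each window
    return X, Y


corpus = [0, 1, 2, 3, 4, 5]         # toy sequence of token indices
X, Y = make_windows(corpus, num_steps=3)
for x, y in zip(X, Y):
    print(x, "->", y)
```

At every time step the model is therefore asked to predict the token that follows the one it has just read.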

##Implementing a vanilla RNN
We define a vanilla RNN with three parameters:

- $W_{x}$: the weight matrix multiplying input $X_t$
- $W_{h}$: the weight matrix multiplying hidden state $H_{t-1}$
- $b$: the bias of the hidden state.

Altogether, the class applies the following recursion:

$H_t = f(X_t) = \phi(W_x X_t + W_h H_{t-1} + b) = \phi\bigg(W_x X_t + W_h \phi(W_x X_{t-1} + W_h H_{t-2} + b) + b\bigg) = \dots $

Can you guess what $f(X_1)$ will be?

In [8]:
class RNN(d2l.Module):

    """The RNN model implemented from scratch."""
    def __init__(self, num_inputs, num_hiddens, sigma=0.01):
        super().__init__()
        self.save_hyperparameters()

        # Weight matrix that multiplies input X_t
        self.W_x = nn.Parameter(
            torch.randn(num_inputs, num_hiddens) * sigma)

        # Weight matrix that multiplies output of hidden layer H_(t-1)
        self.W_h = nn.Parameter(
            torch.randn(num_hiddens, num_hiddens) * sigma)

        #Bias of the hidden layer
        self.b = nn.Parameter(torch.zeros(num_hiddens))

        #Activation function of the hidden layer
        self.activation_func = torch.tanh

    def forward(self, inputs, state=None):
        """Executes the RNN recurrent step.
        Inputs shape: (num_steps, batch_size, num_inputs)"""

        outputs = []

        if state is None:
            # Initial state with shape: (batch_size, num_hiddens)
            state = torch.zeros((inputs.shape[1], self.num_hiddens),
                                device=inputs.device)
        else:
            # Unpack the hidden state from its single-element container
            state, = state

        for X in inputs:
            # One time step: H_t = phi(X_t W_x + H_{t-1} W_h + b)
            state = self.activation_func(torch.matmul(X, self.W_x) +
                                         torch.matmul(state, self.W_h) + self.b)
            outputs.append(state)

        return outputs, state
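
To make the shapes concrete, here is a standalone sketch of the same recurrence that does not depend on `d2l`. The sizes, the seed and the toy token batch are illustrative assumptions; the loop mirrors the `forward` method above rather than replacing it.

```python
import torch
from torch.nn import functional as F

torch.manual_seed(0)
vocab_size, num_hiddens = 5, 4            # illustrative sizes
tokens = torch.tensor([[0, 1, 2],         # batch of 2 sequences,
                       [3, 4, 0]])        # 3 time steps each

# One-hot encode with time as the first axis:
# (num_steps, batch_size, vocab_size)
inputs = F.one_hot(tokens.T, vocab_size).type(torch.float32)

W_x = torch.randn(vocab_size, num_hiddens) * 0.01
W_h = torch.randn(num_hiddens, num_hiddens) * 0.01
b = torch.zeros(num_hiddens)

# Initial hidden state: all zeros, shape (batch_size, num_hiddens)
state = torch.zeros(inputs.shape[1], num_hiddens)
outputs = []
for X in inputs:                          # one time step per iteration
    state = torch.tanh(X @ W_x + state @ W_h + b)
    outputs.append(state)

print(inputs.shape, len(outputs), outputs[0].shape)
```

The loop yields one hidden state of shape `(batch_size, num_hiddens)` per time step, and the last one doubles as the state carried into the next call.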



##Implementing a classifier using RNNs

We will use our RNN to predict the next $n$ tokens in a sequence of characters.
This is a classification problem because each token to be predicted is an integer drawn from a finite set (the vocabulary; after preprocessing, lowercase letters and the space character). Hence we inherit the appropriate loss and accuracy definitions from `d2l.Classifier`.
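
Because next-token prediction is framed as classification, the loss is the cross-entropy, and the class below reports its exponential, the perplexity (`ppl`). A minimal sketch of that relationship in plain Python, where the function name and the vocabulary size of 28 are illustrative assumptions:

```python
import math


def perplexity(probs_of_true_tokens):
    """Exponential of the mean negative log-likelihood of the correct tokens."""
    nll = [-math.log(p) for p in probs_of_true_tokens]
    return math.exp(sum(nll) / len(nll))


vocab_size = 28                                  # illustrative value
# A model that guesses uniformly at random over the vocabulary.
uniform = perplexity([1 / vocab_size] * 10)
# A model that assigns probability 0.9 to every correct token.
confident = perplexity([0.9] * 10)

print(round(uniform, 3), round(confident, 3))
```

A uniform guesser has a perplexity equal to the vocabulary size, while a perfect model would reach 1, which gives a scale for reading the training curves.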

In [21]:
class RNNLModel(d2l.Classifier):

    def __init__(self, rnn, vocab_size, lr=0.01):
        super().__init__()
        self.save_hyperparameters()

        self.W_h_out = nn.Parameter(
            d2l.randn(
                self.rnn.num_hiddens, self.vocab_size) * self.rnn.sigma)
        self.b_out = nn.Parameter(d2l.zeros(self.vocab_size))
        # No softmax is applied to the outputs: the cross-entropy loss applies it implicitly

    def one_hot(self, X):
        return F.one_hot(X.T, self.vocab_size).type(torch.float32)

    def forward(self, X, state=None):

        # One-hot encode the input tokens: (num_steps, batch_size, vocab_size)
        embs = self.one_hot(X)
        # Apply the recurrent layer
        rnn_outputs, _ = self.rnn(embs, state)
        # Apply the output layer to every hidden state. The results are raw logits:
        # the cross-entropy loss applies softmax internally, so none is needed here.
        outputs = [d2l.matmul(H, self.W_h_out) + self.b_out for H in rnn_outputs]
        # Stack along the time axis: (batch_size, num_steps, vocab_size)
        outputs = d2l.stack(outputs, 1)

        return outputs

    def training_step(self, batch):
        l = self.loss(self(*batch[:-1]), batch[-1])
        self.plot('ppl', d2l.exp(l), train=True)
        return l

    def validation_step(self, batch):
        l = self.loss(self(*batch[:-1]), batch[-1])
        self.plot('ppl', d2l.exp(l), train=False)

    def predict(self, prefix, num_preds, vocab, device=None):
        device = device or self.W_h_out.device
        state, outputs = None, [vocab[prefix[0]]]
        for i in range(len(prefix) + num_preds - 1):
            X = d2l.tensor([[outputs[-1]]], device=device)
            embs = self.one_hot(X)
            rnn_outputs, state = self.rnn(embs, state)
            if i < len(prefix) - 1:  # Warm-up period
                outputs.append(vocab[prefix[i + 1]])
            else:  # Predict num_preds steps
                # rnn_outputs is a list with one hidden state per time step;
                # a single step is fed here, so take its last (only) element
                Y = d2l.matmul(rnn_outputs[-1], self.W_h_out) + self.b_out
                outputs.append(int(d2l.reshape(d2l.argmax(Y, axis=1), 1)))
        return ''.join([vocab.idx_to_token[i] for i in outputs])

In [22]:
data = d2l.TimeMachine(batch_size=1024, num_steps=32)
rnn = RNN(num_inputs=len(data.vocab), num_hiddens=32)
model = RNNLModel(rnn, len(data.vocab))
#trainer = d2l.Trainer(max_epochs=50, gradient_clip_val=1, num_gpus=1)
#trainer.fit(model, data)

In [23]:
# Generate text after a prefix. The prefix and the number of predicted
# characters are arbitrary example values; with training commented out
# above, the untrained model will produce gibberish.
model.predict('it has', 20, data.vocab)

##Bonus: implementing a classifier using an LSTM structure

We now implement a more sophisticated classifier using an LSTM. In an LSTM network, each memory cell is equipped with an internal state $C$ and gates that determine:
<ul>
<li>the impact of the input on the internal state $C$ (input gate $I$)</li>
<li>whether the internal state should be flushed to $0$ (forget gate $F$)</li>
<li>the impact of the internal state on the output of the cell (output gate $O$)</li>
</ul>
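
Written out, and following the multiplication order used in the code cell below (input on the left, weights on the right), the gates and state updates at time step $t$ can be sketched as follows. Here $\sigma$ denotes the sigmoid, $\odot$ the elementwise product, and $\tilde{C}_t$ the candidate internal state (the "input node" in the code):

$$
\begin{aligned}
I_t &= \sigma(X_t W_{xi} + H_{t-1} W_{hi} + b_i) \\
F_t &= \sigma(X_t W_{xf} + H_{t-1} W_{hf} + b_f) \\
O_t &= \sigma(X_t W_{xo} + H_{t-1} W_{ho} + b_o) \\
\tilde{C}_t &= \tanh(X_t W_{xc} + H_{t-1} W_{hc} + b_c) \\
C_t &= F_t \odot C_{t-1} + I_t \odot \tilde{C}_t \\
H_t &= O_t \odot \tanh(C_t)
\end{aligned}
$$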

In [None]:
class LSTM(d2l.Module):

    def __init__(self, num_inputs, num_hiddens, sigma=0.01):
        super().__init__()
        self.save_hyperparameters()

        init_weight = lambda *shape: nn.Parameter(torch.randn(*shape) * sigma)
        triple = lambda: (init_weight(num_inputs, num_hiddens),
                          init_weight(num_hiddens, num_hiddens),
                          nn.Parameter(torch.zeros(num_hiddens)))

        self.W_xi, self.W_hi, self.b_i = triple()  # Input gate
        self.W_xf, self.W_hf, self.b_f = triple()  # Forget gate
        self.W_xo, self.W_ho, self.b_o = triple()  # Output gate
        self.W_xc, self.W_hc, self.b_c = triple()  # Input node

    def forward(self, inputs, H_C=None):

        if H_C is None:
            # Initial state with shape: (batch_size, num_hiddens)
            H = torch.zeros((inputs.shape[1], self.num_hiddens),
                          device=inputs.device)
            C = torch.zeros((inputs.shape[1], self.num_hiddens),
                          device=inputs.device)
        else:
            H, C = H_C

        outputs = []
        for X in inputs:
            # Input gate: sigmoid keeps every entry in (0, 1)
            I = torch.sigmoid(torch.matmul(X, self.W_xi) +
                            torch.matmul(H, self.W_hi) + self.b_i)
            # Forget gate: sigmoid keeps every entry in (0, 1)
            # (inside this method, F shadows the torch.nn.functional alias)
            F = torch.sigmoid(torch.matmul(X, self.W_xf) +
                            torch.matmul(H, self.W_hf) + self.b_f)
            # Output gate: sigmoid keeps every entry in (0, 1)
            O = torch.sigmoid(torch.matmul(X, self.W_xo) +
                            torch.matmul(H, self.W_ho) + self.b_o)
            # Candidate internal state (input node): tanh keeps every entry in (-1, 1)
            C_tilde = torch.tanh(torch.matmul(X, self.W_xc) +
                              torch.matmul(H, self.W_hc) + self.b_c)

            # Update the internal state: keep a fraction F of the old state
            # and add a fraction I of the candidate
            C = F * C + I * C_tilde
            # Hidden state: output gate O times the tanh-activated internal state C
            H = O * torch.tanh(C)

            outputs.append(H)
        return outputs, (H, C)



Question: what happens if $W_{xc}$, $W_{hc}$ and $b_c$ are all zero?


In [None]:
# @title
# C_tilde = tanh(0) = 0, so the update reduces to C = F * C.
# C starts at 0, so it stays at 0: the network has no memory.

Question: what is the value of the update of $C$ on the first loop iteration (i.e. for $X_1$)?

In [None]:
# @title
# The initial internal state is 0, so F * C vanishes and C = I * C_tilde:
# the cell has nothing to remember yet.
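
Both answers can be checked numerically with a scalar version of the LSTM step. The sketch below is an illustration in plain Python, not the notebook's `LSTM` class; the function `lstm_step`, the parameter value 0.5 and the toy inputs are assumptions chosen only to make the arithmetic visible.

```python
import math


def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))


def lstm_step(x, h, c, p):
    """One scalar LSTM step; `p` maps parameter names to floats."""
    i = sigmoid(x * p["W_xi"] + h * p["W_hi"] + p["b_i"])          # input gate
    f = sigmoid(x * p["W_xf"] + h * p["W_hf"] + p["b_f"])          # forget gate
    o = sigmoid(x * p["W_xo"] + h * p["W_ho"] + p["b_o"])          # output gate
    c_tilde = math.tanh(x * p["W_xc"] + h * p["W_hc"] + p["b_c"])  # candidate
    c_new = f * c + i * c_tilde
    h_new = o * math.tanh(c_new)
    return h_new, c_new, i, c_tilde


names = ["W_xi", "W_hi", "b_i", "W_xf", "W_hf", "b_f",
         "W_xo", "W_ho", "b_o", "W_xc", "W_hc", "b_c"]
params = {name: 0.5 for name in names}     # arbitrary illustrative values

# First step: the internal state starts at 0, so C_1 = I_1 * C_tilde_1.
h1, c1, i1, c_tilde1 = lstm_step(1.0, 0.0, 0.0, params)
print(c1 == i1 * c_tilde1)

# Zero candidate parameters: C (and therefore H) never leaves 0.
zero_candidate = dict(params, W_xc=0.0, W_hc=0.0, b_c=0.0)
h, c = 0.0, 0.0
for x in [1.0, -2.0, 3.0]:
    h, c, _, _ = lstm_step(x, h, c, zero_candidate)
print(h, c)
```

In the second experiment the gates still take nonzero values, but with a candidate state that is always zero there is nothing for them to write into or read out of the cell.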

In [None]:
data = d2l.TimeMachine(batch_size=1024, num_steps=32)
lstm = LSTM(num_inputs=len(data.vocab), num_hiddens=32)
model = RNNLModel(lstm, vocab_size=len(data.vocab), lr=4)
trainer = d2l.Trainer(max_epochs=50, gradient_clip_val=1, num_gpus=1)
trainer.fit(model, data)