In [None]:
%matplotlib inline

import collections
import matplotlib.pyplot as plt
import nltk
import numpy as np
import pandas as pd
import sklearn.metrics
import torch

device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Recurrent neural networks 1

Pooling operations allow us to reduce a sequence of vectors into a single vector, which then allows the neural network to work with any length sequence.
However, this does not preserve a lot of information about the sequence and it requires using multiple convolution operations in parallel.
In order to compact as much information about the sequence as possible in a single vector, we'll need to use a special type of neural architecture called a **recurrent neural network** (**RNN**).

## A neural network with a memory

The neural networks we have used up to now have no memory: they only operate on what they are currently seeing, not on what they saw earlier.
The simplest way to add a memory, called a **state**, is to make the neural network give an output that will be used as input the next time it is used.
This is the basic building block of the **simple recurrent neural network**:

![](rnn_cell.png)

The above diagram illustrates a single-layer neural network (in red), called an **RNN cell**, which takes in a state vector and an input vector and produces a new state vector.
The subscript $t$ indicates the time step, where $t+1$ happens one step after $t$.
An example of an input vector is a token vector.
$s_{t+1}$ is going to be a vector of the same size as $s_t$ and it is a function of both the previous state and the new input.
The state and input vectors are usually just concatenated together before they are fed to the single layer neural network.

We can create a chain of these cells where the state vector that comes out of one cell is fed into the next cell like this:

![](rnn_chain.png)

This chain is consuming three separate inputs in three time steps.
Each of the red layers uses the exact same parameters (same weight matrix and bias vector), so the size of the neural network does not change as more inputs are consumed: it is the same layer used multiple times.
state<sub>3</sub> is called the **final state** and it is a vector that is influenced by all three inputs, so it should be storing information summarising the three inputs.
state<sub>2</sub> is a vector that is only influenced by the first two inputs, so it is storing information about a prefix of the sequence.
state<sub>0</sub> is called the **initial state** and it is normally a fixed vector (not influenced by the input).
You can either use an all-zeros vector or let the optimiser find a suitable vector during training.
Since the state vector has a fixed size, the memory does not grow with the number of inputs, so there will come a point where the network must forget some information about earlier inputs in order to remember new ones.
The size of the state vector therefore limits how much information about the sequence can be retained.

We can now do something like classify the sequence of inputs by passing the final state into a normal neural layer with softmax for example.
So how do we do this in PyTorch?
It's actually just a matter of using a `for` loop:

In [None]:
input_ = torch.tensor([
    [0],
    [1],
    [1],
    [0],
    [1],
], dtype=torch.float32, device=device)

w = torch.tensor([[-15], [15]], dtype=torch.float32, device=device)
b = torch.tensor([-10], dtype=torch.float32, device=device)

state = torch.tensor([0], dtype=torch.float32, device=device)
for t in range(input_.shape[0]):
    state_input = torch.concat((state, input_[t, :]), dim=0)
    state = torch.sigmoid(state_input@w + b)
    print(f'state {t+1}:', state.round(decimals=0)[0].detach().cpu().tolist())

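For this particular input, the state after each time step matches the parity bit, that is, it tells you whether the number of '1' bits seen in the input up to that time step is odd.
This is a property of the chosen input rather than true parity: with these weights the new state is 1 only when the previous state is 0 and the current input is 1, so a '0' input always resets the state to 0.
Computing parity in general requires XOR, which a single layer like this one cannot represent.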

Mathematically, the RNN cell is defined as:

$$s_{t+1} = f(\text{conc}(s_t, x_{t+1}) W + b)$$

where $s_t$ is the state at time step $t$, $f$ is the activation function such as sigmoid, *conc* means concatenate two vectors into one, $x_t$ is the input at time step $t$, and $W$ and $b$ are the weights and bias.
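
As a rough sketch of this formula (the helper name `rnn_step` is ours, not a PyTorch function), the cell can be written as a single function and applied to every combination of state bit and input bit, reusing the toy weights from the example above:

```python
import torch

def rnn_step(s, x, W, b, f=torch.sigmoid):
    # s_{t+1} = f(conc(s_t, x_{t+1}) W + b)
    return f(torch.concat((s, x), dim=0) @ W + b)

# Toy weights from the example above (kept on the CPU for simplicity).
W = torch.tensor([[-15.0], [15.0]])
b = torch.tensor([-10.0])

table = {}
for s_bit in (0.0, 1.0):
    for x_bit in (0.0, 1.0):
        new_s = rnn_step(torch.tensor([s_bit]), torch.tensor([x_bit]), W, b)
        table[(int(s_bit), int(x_bit))] = int(new_s.round().item())
        print(f'state={int(s_bit)} input={int(x_bit)} -> new state={table[(int(s_bit), int(x_bit))]}')
```

Tabulating all four cases like this is a quick way to check which rule a hand-picked set of weights really encodes.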

## Simple RNNs in practice

In order to apply RNNs on a training set, we need to first learn about a few useful techniques in PyTorch.

First, since we'll be working with batches of inputs rather than one vector at a time, we will need to have an initial RNN state for each item in the batch, such that there will be a matrix with an initial state vector in each row.
Most of the time, we'll be replicating the same initial state vector for each row.
In PyTorch, replication is done using the `tile` function.
`tile` takes a tensor and repeats each of its dimensions a number of times (for example, repeat the columns of a matrix 2 times and the rows 3 times).
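
A minimal sketch of the example in parentheses, using a small made-up matrix (the values are only for illustration):

```python
import torch

matrix = torch.tensor([
    [1, 2],
    [3, 4],
])

# Repeat the rows 3 times and the columns 2 times.
tiled = matrix.tile([3, 2])
print(tiled.shape)
print(tiled)
```

The first number in the list applies to the first dimension (rows) and the second to the second dimension (columns), so the 2x2 matrix becomes a 6x4 matrix.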

We'll first turn the initial state vector into a single row matrix and then repeat the row for as many times as there are batch items, like so:

In [None]:
batch_size = 5

initial_state = torch.tensor(
    [1, 2, 3],
    dtype=torch.float32, device=device
)
print('initial_state:')
print(initial_state.shape)
print(initial_state)
print()

matrix = initial_state[None, :]
print('initial_state matrix:')
print(matrix.shape)
print(matrix)
print()

tiled = matrix.tile([batch_size, 1]) # Repeat the row batch_size times and the columns 1 time (leave as-is).
print('tiled:')
print(tiled.shape)
print(tiled)

Second, we need to be able to ignore the pad tokens.
Just like in CNNs, we'll need to use a mask to ignore some of the inputs.
What we want to do is, during each time step in the `for` loop, choose for each text whether to replace its state with the output of the RNN cell or leave the state as-is because the current token is a pad token, like this:

<table>
    <tr><th>token</th><td style="text-align: center;">I</td><td style="text-align: center;">like</td><td style="text-align: center;">it</td><td style="text-align: center;">.</td><td style="text-align: center;">PAD</td><td style="text-align: center;">PAD</td></tr>
    <tr><th>state</th><td><pre>[1, -1]</pre></td><td><pre>[2, -2]</pre></td><td><pre>[3, -3]</pre></td><td><pre>[4, -4]</pre></td><td><pre>[4, -4]</pre></td><td><pre>[4, -4]</pre></td></tr>
</table>

Note how the state stops changing after the last actual token in the text.
What we want is to have a final state that is only influenced by the actual tokens and not by the pad tokens.

This can be done using the `torch.where` function which chooses items from two tensors based on a mask, like this:

In [None]:
curr_state = torch.tensor([
    [1 , 2 ],
    [3 , 4 ],
    [5 , 6 ],
], dtype=torch.float32, device=device)
print('current state')
print(curr_state)
print()

new_state = torch.tensor([
    [10, 20],
    [30, 40],
    [50, 60],
], dtype=torch.float32, device=device)
print('new state')
print(new_state)
print()

mask = torch.tensor([
    [0, 0],
    [1, 1],
    [0, 0],
], dtype=torch.bool, device=device)
print('mask')
print(mask)
print()

state = torch.where(mask, curr_state, new_state) # Where the mask is True take the item from curr_state, otherwise from new_state.
print('next state')
print(state)

The difference between `masked_fill` and `where` is that `masked_fill` always replaces masked values using the same value whereas `where` replaces using the corresponding value in another tensor.
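
A small side-by-side sketch of that difference, using made-up vectors (the variable names are only illustrative):

```python
import torch

values = torch.tensor([1.0, 2.0, 3.0, 4.0])
other = torch.tensor([10.0, 20.0, 30.0, 40.0])
mask = torch.tensor([True, False, True, False])

# Masked positions all receive the same constant.
filled = values.masked_fill(mask, 0.0)
# Masked positions receive the matching item from `other`.
chosen = torch.where(mask, other, values)

print(filled)
print(chosen)
```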

The mask we'll be using is produced much more simply than in CNNs because we're not looking at windows but at individual tokens now.
The mask will just indicate where the non-pad tokens are in the input.

In [None]:
indexed = torch.tensor([
    [1, 0, 0],
    [1, 2, 0],
    [1, 2, 3],
], dtype=torch.int64, device=device)
pad_index = 0
print('indexed:')
print(indexed)
print()

non_pad_mask = indexed != pad_index
print('non_pad_mask:')
print(non_pad_mask)
print()

print('mask used at every time step:')
print()
for t in range(3):
    mask = non_pad_mask[:, t, None] # The mask needs to be applied to a matrix of state vectors so we need to add a singleton dimension.
    print(f'time step {t}:')
    print(mask)
    print()
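
Putting `tile`, the per-time-step mask, and `torch.where` together, the following sketch reproduces the behaviour in the table above. The `step` vector is a hypothetical stand-in for an RNN cell (it just adds a fixed vector to the state), and the token indexes are made up:

```python
import torch

pad_index = 0
# One text of four tokens followed by two pad tokens, as in the table above.
indexed = torch.tensor([[4, 2, 3, 1, 0, 0]])
non_pad_mask = indexed != pad_index

initial_state = torch.tensor([0.0, 0.0])
state = initial_state[None, :].tile([indexed.shape[0], 1])
step = torch.tensor([[1.0, -1.0]])  # Hypothetical stand-in for an RNN cell.

states = []
for t in range(indexed.shape[1]):
    new_state = state + step
    # Only accept the new state where the current token is not a pad.
    state = torch.where(non_pad_mask[:, t, None], new_state, state)
    states.append(state[0].tolist())
print(states)
```

The state stops changing once the pad tokens are reached, which is exactly what we want from the final state.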

Now we can use our simple RNN implementation in the toy sentiment analysis task:

In [None]:
train_x = [
    'I like it .'.split(' '),
    'I hate it .'.split(' '),
    'I don\'t hate it .'.split(' '),
    'I don\'t like it .'.split(' '),
]
train_y = torch.tensor([
    [1],
    [0],
    [1],
    [0],
], dtype=torch.float32, device=device)

max_len = max(len(text) for text in train_x)
print('max_len:', max_len)

vocab = ['<PAD>'] + sorted({token for text in train_x for token in text})
token2index = {t: i for (i, t) in enumerate(vocab)}
pad_index = token2index['<PAD>']
print('vocab:', vocab)
print()

train_x_indexed_np = np.full([len(train_x), max_len], pad_index, np.int64)
for i in range(len(train_x)):
    for j in range(len(train_x[i])):
        train_x_indexed_np[i, j] = token2index[train_x[i][j]]
train_x_indexed = torch.tensor(train_x_indexed_np, device=device)
print('train_x_indexed:')
print(train_x_indexed)

In [None]:
class Model(torch.nn.Module):

    def __init__(self, vocab_size, embedding_size, state_size, pad_index):
        super().__init__()
        self.pad_index = pad_index
        
        self.embedding = torch.nn.Embedding(vocab_size, embedding_size)
        self.rnn_s0 = torch.nn.Parameter(torch.zeros((state_size,), dtype=torch.float32)) # Initial state starts as zeros and is then optimised.
        self.rnn_cell = torch.nn.Linear(state_size + embedding_size, state_size)
        self.output_layer = torch.nn.Linear(state_size, 1)
        
    def forward(self, x_indexed):
        batch_size = x_indexed.shape[0]
        time_steps = x_indexed.shape[1]

        non_pad_mask = x_indexed != self.pad_index
        
        embedded = self.embedding(x_indexed)
        state = self.rnn_s0[None, :].tile([batch_size, 1])
        for t in range(time_steps):
            state_input = torch.concat([state, embedded[:, t, :]], dim=1)
            new_state = torch.nn.functional.leaky_relu(self.rnn_cell(state_input))
            state = torch.where(non_pad_mask[:, t, None], new_state, state)
        return self.output_layer(state)

model = Model(len(vocab), embedding_size=2, state_size=3, pad_index=pad_index)
model.to(device)

optimiser = torch.optim.Adam(model.parameters(), lr=0.1)

print('epoch', 'error')
train_errors = []
for epoch in range(1, 2000+1):
    optimiser.zero_grad()
    logits = model(train_x_indexed)
    train_error = torch.nn.functional.binary_cross_entropy_with_logits(logits, train_y)
    train_errors.append(train_error.detach().cpu().tolist())
    train_error.backward()
    optimiser.step()

    if epoch%200 == 0:
        print(epoch, train_errors[-1])
print()

with torch.no_grad():
    print('text', 'output')
    output = torch.sigmoid(model(train_x_indexed)).round(decimals=0)[:, 0].cpu().tolist()
    for (text, y) in zip(train_x, output):
        print(text, y)

(fig, ax) = plt.subplots(1, 1)
ax.set_xlabel('epoch')
ax.set_ylabel('$E$')
ax.plot(range(1, len(train_errors) + 1), train_errors, color='blue', linestyle='-', linewidth=3)
ax.grid()

Just like with the CNN, we can check whether the pad token is having any influence by checking the gradients of the tokens.

In [None]:
optimiser.zero_grad()
logits = model(train_x_indexed)
train_error = torch.nn.functional.binary_cross_entropy_with_logits(logits, train_y)
train_error.backward()

grads = model.embedding.weight.grad.abs().sum(dim=1).tolist()
for (token, grad) in zip(vocab, grads):
    print(f'{token: >6s}: {grad}')
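
Another way to check this, sketched here with a small untrained cell rather than the trained model (the helper `final_state` and the sizes are our own assumptions), is to pad the same text to different lengths and confirm that the final state does not change:

```python
import torch

torch.manual_seed(0)
pad_index = 0
state_size = 3
embedding = torch.nn.Embedding(5, 2)
cell = torch.nn.Linear(state_size + 2, state_size)

def final_state(x_indexed):
    # Same masked loop as in the model above, with tanh as the activation.
    non_pad_mask = x_indexed != pad_index
    embedded = embedding(x_indexed)
    state = torch.zeros((x_indexed.shape[0], state_size))
    for t in range(x_indexed.shape[1]):
        state_input = torch.concat([state, embedded[:, t, :]], dim=1)
        new_state = torch.tanh(cell(state_input))
        state = torch.where(non_pad_mask[:, t, None], new_state, state)
    return state

with torch.no_grad():
    no_pad = final_state(torch.tensor([[1, 2, 3]]))
    padded = final_state(torch.tensor([[1, 2, 3, 0, 0, 0]]))
print(torch.allclose(no_pad, padded))
```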

## Unstable gradients and the LSTM

Remember how, in the two layer network, the gradients of the first layer were much smaller than the gradients of the second layer?
Well the same thing happens with time steps in an RNN.
As the sequence gets longer, the gradients will either vanish or explode.
Let's see an example.

We'll generate several random input sequences of single-number vectors and use randomly generated weight matrices for RNNs with a single-number state vector.
Each RNN will then go through its input sequence, and the number in the final state will be used to calculate the gradient with respect to each item in the input sequence.

In [None]:
class RNN(torch.nn.Module):

    def __init__(self):
        super().__init__()
        self.w = torch.nn.Parameter(2*torch.rand((2, 1), dtype=torch.float32) - 1)
        self.b = torch.nn.Parameter(2*torch.rand((1,), dtype=torch.float32) - 1)
        self.s0 = torch.nn.Parameter(torch.tensor([0], dtype=torch.float32))

    def forward(self, x):
        state = self.s0
        for t in range(x.shape[0]):
            state_input = torch.concat((state, x[t, :]), dim=0)
            state = torch.nn.functional.leaky_relu(state_input@self.w + self.b)
        return state

(fig, axs) = plt.subplots(1, 8, figsize=(20, 2))
axs[0].set_ylabel('gradient')
for i in range(8):
    model = RNN()
    model.to(device)
        
    input_seq = torch.randn((10, 1), device=device, requires_grad=True)
    state = model(input_seq)[0]
    state.backward()
    grads = input_seq.grad.abs()[:, 0].cpu().numpy()
    
    axs[i].bar(np.arange(input_seq.shape[0]), grads)
    axs[i].set_xlabel('time step')
    axs[i].grid()
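
To get a feel for where the shape of these bar charts comes from, here is a stripped-down sketch using a linear cell (no activation function) with hypothetical fixed weights. In this simplified case the gradient of the final state with respect to an input is just the state weight multiplied by itself once for every time step that follows that input:

```python
import torch

w_s, w_x, b = 0.5, 1.0, 0.0  # Hypothetical fixed weights for a linear cell.
x = torch.randn(5, requires_grad=True)

state = torch.tensor(0.0)
for t in range(x.shape[0]):
    state = w_s*state + w_x*x[t] + b  # No activation, so the derivatives are exact powers of w_s.
state.backward()

# Analytically, d(final state)/d(x[t]) = w_x * w_s^(T - 1 - t) for t = 0..T-1.
T = x.shape[0]
expected = torch.tensor([w_x * w_s**(T - 1 - t) for t in range(T)])
print(x.grad)
print(expected)
```

With a state weight whose magnitude is below 1 the gradient shrinks geometrically towards the start of the sequence, and with a magnitude above 1 it grows geometrically instead.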

In almost all of the cases, the gradient with respect to the last item is much larger than the gradient with respect to the first item, meaning that the RNN will ignore inputs close to the beginning of the sequence during training.
To fix this, in 1997, Hochreiter and Schmidhuber developed the **Long Short-Term Memory network** (**LSTM**).
The main change from the simple RNN is to add the current state to the new state:

In [None]:
class RNN2(torch.nn.Module):

    def __init__(self):
        super().__init__()
        self.w = torch.nn.Parameter(2*torch.rand((2, 1), dtype=torch.float32) - 1)
        self.b = torch.nn.Parameter(2*torch.rand((1,), dtype=torch.float32) - 1)
        self.s0 = torch.nn.Parameter(torch.tensor([0], dtype=torch.float32))

    def forward(self, x):
        state = self.s0
        for t in range(x.shape[0]):
            state_input = torch.concat((state, x[t, :]), dim=0)
            state = torch.nn.functional.leaky_relu(state_input@self.w + self.b) + state
        return state

(fig, axs) = plt.subplots(1, 8, figsize=(20, 2))
axs[0].set_ylabel('gradient')
for i in range(8):
    model = RNN2()
    model.to(device)
        
    input_seq = torch.randn((10, 1), device=device, requires_grad=True)
    state = model(input_seq)[0]
    state.backward()
    grads = input_seq.grad.abs()[:, 0].cpu().numpy()
    
    axs[i].bar(np.arange(input_seq.shape[0]), grads)
    axs[i].set_xlabel('time step')
    axs[i].grid()
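
A rough way to see why adding the current state helps is to compare the two update rules on a linear cell (no activation function) with hypothetical fixed weights. The helper `input_grads` below is our own illustration, not part of PyTorch:

```python
import torch

def input_grads(w_s, w_x, time_steps, residual):
    # Gradient of the final state with respect to each input of a linear cell.
    x = torch.zeros(time_steps, requires_grad=True)
    state = torch.tensor(0.0)
    for t in range(time_steps):
        new_state = w_s*state + w_x*x[t]
        state = (new_state + state) if residual else new_state
    state.backward()
    return x.grad

plain = input_grads(w_s=0.1, w_x=1.0, time_steps=5, residual=False)
skip = input_grads(w_s=0.1, w_x=1.0, time_steps=5, residual=True)
print(plain)  # Each earlier time step is scaled by w_s = 0.1.
print(skip)   # Each earlier time step is scaled by 1 + w_s = 1.1.
```

Adding the state changes the per-time-step factor from `w_s` to `1 + w_s`, so a small weight no longer wipes out the gradient of early inputs. Note that this only addresses vanishing gradients in this simplified setting; the factor can still grow if the weight is large.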

The `RNN2` class above is a simplified version; in reality the LSTM is a little more complex.

For example, the LSTM actually uses a **hyperbolic tangent** (**tanh**) instead of leaky ReLU.
The hyperbolic tangent is a sigmoid function that is stretched and shifted so that its range is between -1 and 1 instead of 0 and 1:

In [None]:
(fig, ax) = plt.subplots(1, 1)
xs = np.linspace(-10, 10, 100)
ys = np.tanh(xs)
ax.plot(xs, ys, linestyle='-', linewidth=2, marker='', color='red')
ax.text(-7.0, 0.0, '$y = \\tanh(x)$', color='red', fontsize=16)
ax.set_xlabel('x')
ax.set_ylabel('y')
ax.grid()
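
The relationship between the two functions can be made exact: $\tanh(x) = 2\,\text{sig}(2x) - 1$. A quick numerical check of this identity (a sketch, with the tolerance chosen by us to allow for floating point error):

```python
import torch

xs = torch.linspace(-10, 10, 100)
stretched_sigmoid = 2*torch.sigmoid(2*xs) - 1
max_diff = (torch.tanh(xs) - stretched_sigmoid).abs().max().item()
print('largest difference:', max_diff)
```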

The LSTM also uses **gates**, which are fractions between 0 and 1 that are multiplied by activation values in order to either leave the activation values unchanged (gate open) or replace them with zero (gate closed).
The gate value is produced by a layer in the LSTM which is defined like this:

$$g_{t+1} = \text{sig}(\text{conc}(s_t, x_{t+1}) W + b)$$

where $g_t$ is the gate at time step $t$ and $\text{sig}$ means sigmoid.
The sigmoid is what makes the gate a fraction between zero and one.
The gate is actually a vector of fractions rather than a single number so that it can be multiplied by a vector of activation values.
When multiplied by another vector, the gate will act as a **soft mask** that zeros out some values in the vector whilst leaving other values as-is but in a continuous way rather than a hard yes or no.
Note that the gate is generated from the new input vector and the current state vector, so the LSTM can learn to ignore different values at different time steps.
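
A minimal sketch of a gate acting as a soft mask, using hand-picked values in place of the output of a learned layer (the name `gate_logits` and its values are assumptions for illustration):

```python
import torch

activations = torch.tensor([4.0, 4.0, 4.0])
gate_logits = torch.tensor([-10.0, 0.0, 10.0])  # Hypothetical pre-sigmoid values.

gate = torch.sigmoid(gate_logits)  # Close to 0, exactly 0.5, and close to 1.
gated = gate*activations           # Element-wise multiplication.
print(gate)
print(gated)
```

The first activation is almost completely blocked, the second is halved, and the third passes through almost unchanged.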

The LSTM is mathematically defined as follows:

$$c_{t+1} = g^i_{t+1} \times \tanh(\text{conc}(s_t, x_{t+1}) W + b) + g^f_{t+1} \times c_t$$
$$s_{t+1} = g^o_{t+1} \times \tanh(c_{t+1})$$

There's quite a few things to unpack here.
First of all, $g^i$, $g^f$, and $g^o$ are all different gates with their own parameters.
They are the **input gate**, used to control which parts of the input vector get to be processed, the **forget gate**, used to forget some part of the state, and the **output gate**, used to control which parts of the state get to go out of the LSTM.
There's also $c_t$, which is sort of a second RNN state.
The $s_t$ state is called the **hidden state** whilst the $c_t$ is called the **cell state**.
Usually it is the hidden state that is used to represent the sequence.
Both states need to be initialised for time step zero.

Let's implement it:

In [None]:
class LSTM(torch.nn.Module):

    def __init__(self, input_size, state_size):
        super().__init__()
        self.linear_gi = torch.nn.Linear(state_size + input_size, state_size)
        self.linear_gf = torch.nn.Linear(state_size + input_size, state_size)
        self.linear_go = torch.nn.Linear(state_size + input_size, state_size)
        self.linear_c = torch.nn.Linear(state_size + input_size, state_size)

    def forward(self, x, c, state):
        state_input = torch.concat((state, x), dim=1)
        gi = torch.sigmoid(self.linear_gi(state_input))
        gf = torch.sigmoid(self.linear_gf(state_input))
        go = torch.sigmoid(self.linear_go(state_input))
        c = gi*torch.tanh(self.linear_c(state_input)) + gf*c
        state = go*torch.tanh(c)
        return (c, state)

input_size = 1
state_size = 1
time_steps = 5
batch_size = 3

lstm = LSTM(input_size=input_size, state_size=state_size)
lstm.to(device)

input_seq = torch.randn((batch_size, time_steps, input_size), dtype=torch.float32, device=device)

c = torch.zeros([batch_size, state_size], dtype=torch.float32, device=device)
state = torch.zeros([batch_size, state_size], dtype=torch.float32, device=device)
for t in range(time_steps):
    (c, state) = lstm(input_seq[:, t, :], c, state)

Of course there's no need to write all this code ourselves as it's already available in PyTorch as `torch.nn.LSTMCell`:

In [None]:
input_size = 1
state_size = 1
time_steps = 5
batch_size = 3

lstm = torch.nn.LSTMCell(input_size, state_size)
lstm.to(device)

input_seq = torch.randn((batch_size, time_steps, input_size), dtype=torch.float32, device=device)

c = torch.zeros([batch_size, state_size], dtype=torch.float32, device=device)
state = torch.zeros([batch_size, state_size], dtype=torch.float32, device=device)
for t in range(time_steps):
    (state, c) = lstm(input_seq[:, t, :], (state, c)) # States must be provided as a tuple of (hidden state, cell state), in that order, and are returned in the same order.
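
As a sanity check that `torch.nn.LSTMCell` follows the equations above, the sketch below recomputes one step by hand from the cell's own parameters. It relies on the parameter layout in PyTorch's documentation: `weight_ih`, `weight_hh`, `bias_ih` and `bias_hh` stack the input gate, forget gate, cell candidate and output gate, in that order. The sizes are arbitrary.

```python
import torch

torch.manual_seed(0)
input_size, state_size, batch_size = 2, 3, 4
cell = torch.nn.LSTMCell(input_size, state_size)

x = torch.randn(batch_size, input_size)
h = torch.randn(batch_size, state_size)  # Hidden state.
c = torch.randn(batch_size, state_size)  # Cell state.

with torch.no_grad():
    (h_ref, c_ref) = cell(x, (h, c))  # The tuple order is (hidden state, cell state).

    # Manual recomputation: one big linear layer split into four parts.
    pre = x @ cell.weight_ih.T + cell.bias_ih + h @ cell.weight_hh.T + cell.bias_hh
    (pre_i, pre_f, pre_g, pre_o) = pre.chunk(4, dim=1)
    c_manual = torch.sigmoid(pre_i)*torch.tanh(pre_g) + torch.sigmoid(pre_f)*c
    h_manual = torch.sigmoid(pre_o)*torch.tanh(c_manual)

h_matches = torch.allclose(h_manual, h_ref, atol=1e-5)
c_matches = torch.allclose(c_manual, c_ref, atol=1e-5)
print(h_matches, c_matches)
```

Note that PyTorch keeps separate weights for the input and the hidden state instead of concatenating the two vectors first, which is mathematically equivalent to a single weight matrix applied to the concatenation, apart from having two bias vectors that are summed.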

Now let's use this LSTM in the sentiment analysis toy task:

In [None]:
class Model(torch.nn.Module):

    def __init__(self, vocab_size, embedding_size, state_size, pad_index):
        super().__init__()
        self.pad_index = pad_index
        self.embedding = torch.nn.Embedding(vocab_size, embedding_size)
        self.rnn_s0 = torch.nn.Parameter(torch.zeros((state_size,), dtype=torch.float32))
        self.rnn_c0 = torch.nn.Parameter(torch.zeros((state_size,), dtype=torch.float32))
        self.rnn_cell = torch.nn.LSTMCell(embedding_size, state_size)
        self.output_layer = torch.nn.Linear(state_size, 1)
        
    def forward(self, x_indexed):
        batch_size = x_indexed.shape[0]
        time_steps = x_indexed.shape[1]

        non_pad_mask = x_indexed != self.pad_index
        
        embedded = self.embedding(x_indexed)
        state = self.rnn_s0[None, :].tile((batch_size, 1))
        c = self.rnn_c0[None, :].tile((batch_size, 1))
        for t in range(time_steps):
            (new_state, c) = self.rnn_cell(embedded[:, t, :], (state, c))
            state = torch.where(non_pad_mask[:, t, None], new_state, state) # The c state only affects later time steps, so with pad tokens at the end we don't need to mask it unless we're also outputting it.
        return self.output_layer(state)

model = Model(len(vocab), embedding_size=2, state_size=3, pad_index=pad_index)
model.to(device)

optimiser = torch.optim.Adam(model.parameters(), lr=0.1)

print('epoch', 'error')
train_errors = []
for epoch in range(1, 1000+1):
    optimiser.zero_grad()
    logits = model(train_x_indexed)
    train_error = torch.nn.functional.binary_cross_entropy_with_logits(logits, train_y)
    train_errors.append(train_error.detach().tolist())
    train_error.backward()
    optimiser.step()

    if epoch%100 == 0:
        print(epoch, train_errors[-1])
print()

with torch.no_grad():
    print('text', 'output')
    output = torch.sigmoid(model(train_x_indexed))[:, 0].cpu().tolist()
    for (text, y) in zip(train_x, output):
        print(text, y)

(fig, ax) = plt.subplots(1, 1)
ax.set_xlabel('epoch')
ax.set_ylabel('$E$')
ax.plot(range(1, len(train_errors) + 1), train_errors, color='blue', linestyle='-', linewidth=3)
ax.grid()

## Exercises

### 1) Using the LSTM

Rewrite the movie reviews classification program using an LSTM.
Preprocessing has been done for you.
Don't forget to calculate the test set accuracy after training.

In [None]:
min_freq = 3

train_df = pd.read_csv('../data_set/sentiment/train.csv')
test_df = pd.read_csv('../data_set/sentiment/test.csv')

train_x = train_df['text']
train_y = train_df['class']
test_x = test_df['text']
test_y = test_df['class']
categories = ['neg', 'pos']
cat2idx = {cat: i for (i, cat) in enumerate(categories)}

train_y_indexed = torch.tensor(
    train_y.map(cat2idx.get).to_numpy()[:, None],
    dtype=torch.float32, device=device
)
test_y_indexed = test_y.map(cat2idx.get).to_numpy()[:, None]

nltk.download('punkt')
train_x_tokens = [nltk.word_tokenize(text) for text in train_x]
test_x_tokens = [nltk.word_tokenize(text) for text in test_x]
max_len = max(max(len(text) for text in train_x_tokens), max(len(text) for text in test_x_tokens))

frequencies = collections.Counter(token for text in train_x_tokens for token in text)
vocabulary = sorted(frequencies.keys(), key=frequencies.get, reverse=True)
while frequencies[vocabulary[-1]] < min_freq:
    vocabulary.pop()
vocab = ['<PAD>', '<UNK>'] + vocabulary
token2index = {token: i for (i, token) in enumerate(vocab)}
pad_index = token2index['<PAD>']
unk_index = token2index['<UNK>']

train_x_indexed_np = np.full((len(train_x_tokens), max_len), pad_index, np.int64)
for i in range(len(train_x_tokens)):
    for j in range(len(train_x_tokens[i])):
        train_x_indexed_np[i, j] = token2index.get(train_x_tokens[i][j], unk_index)
train_x_indexed = torch.tensor(train_x_indexed_np, device=device)

test_x_indexed_np = np.full((len(test_x_tokens), max_len), pad_index, np.int64)
for i in range(len(test_x_tokens)):
    for j in range(len(test_x_tokens[i])):
        test_x_indexed_np[i, j] = token2index.get(test_x_tokens[i][j], unk_index)
test_x_indexed = torch.tensor(test_x_indexed_np, device=device)