# Tutorial VI: Recurrent Neural Networks

<p>
Bern Winter School on Machine Learning, 2024<br>
Prepared by Mykhailo Vladymyrov.
</p>

This work is licensed under a <a href="http://creativecommons.org/licenses/by-nc-sa/4.0/">Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License</a>.

In this session we will see what an RNN is. We will use it to predict/generate a text sequence, but the same approach can be applied to any sequential data.


So far we have looked at data that is available all at once. In many cases the data is sequential (weather, speech, sensor signals, etc.).
RNNs are specifically designed for such tasks.

<img src="https://github.com/neworldemancer/BMLWS/raw/main/figures/rnn.png" alt="drawing" width="90%"/><br>



## 1. Load necessary libraries

In [None]:
colab = True # set to True if using Google Colab

In [None]:
import sys
import os
os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"
import tarfile

import numpy as np
import matplotlib.pyplot as plt
import IPython.display as ipyd
import collections
import time

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.hub import download_url_to_file
from torchvision import transforms, datasets
from torch.utils.tensorboard import SummaryWriter

# We'll tell matplotlib to inline any drawn figures like so:
%matplotlib inline

%load_ext tensorboard

## unpack libraries
If using Colab, run the next cell:

In [None]:
if colab:
    # Download the tutorial material archive into the working directory and
    # unpack it there.
    # os.path.join needs the directory and the file name as separate
    # arguments; gluing them together with '+' drops the path separator.
    path = os.path.join(os.path.abspath('.'), 'material.tgz')
    url = 'https://github.com/neworldemancer/BMLWS/raw/main/tut_files/tpub0320.tgz'

    # Download compressed file with torch utils
    download_url_to_file(url=url, dst=path)

    # The context manager closes the archive even if extraction fails.
    with tarfile.open(path, "r:gz") as tar:
        tar.extractall()

In [None]:
# Use the first CUDA GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

## 2. Load the text data

In [None]:
def read_data(fname):
    """Read a text file and return its whitespace-separated words.

    Args:
        fname: Path of the text file to read.

    Returns:
        A NumPy array holding the words of the file in reading order.
    """
    with open(fname) as f:
        # str.split() without arguments splits on any run of whitespace and
        # ignores leading/trailing whitespace, so no per-line strip is needed.
        words = [word for line in f for word in line.split()]
    return np.array(words)

In [None]:
# Path of the text file used as training data (relative to the working directory)
book_file = 'RNN/rnn.txt'

In [None]:
# Load the text as an array of words
book_words = read_data(book_file)

In [None]:
# Peek at the first 100 words
print(book_words[:100])

## 3. Build dataset
We will assign an id to each word, and make dictionaries word->id and id->word.
The most frequently repeating words have the lowest id

In [None]:
def build_dictionaries(words):
    """Create word<->id lookup tables ordered by word frequency.

    Args:
        words: Iterable of hashable tokens (e.g. the words of a text).

    Returns:
        A tuple ``(dictionary, reverse_dictionary)``: the first maps every
        distinct word to an integer id, the second maps ids back to words.
        Ids follow ``Counter.most_common()`` order, so id 0 belongs to the
        most frequent word.
    """
    ranked = collections.Counter(words).most_common()
    dictionary = {token: rank for rank, (token, _) in enumerate(ranked)}
    reverse_dictionary = {rank: token for token, rank in dictionary.items()}
    return dictionary, reverse_dictionary

In [None]:
# Build the lookup tables for the whole text; vocab_size is the number of distinct words
dictionary, reverse_dictionary = build_dictionaries(book_words)
vocab_size = len(dictionary)

In [None]:
# Show the word -> id mapping
print(dictionary)

Then the whole text will look like a sequence of word ids:

In [None]:
def text_to_ints(text):
    """Convert text to a list of word ids using the global ``dictionary``.

    Args:
        text: Either a string, which is split on whitespace, or an iterable
            of words.

    Returns:
        A list with the integer id of every word, in order.

    Raises:
        KeyError: If a word is not present in ``dictionary``.
    """
    # isinstance (rather than an exact type comparison) also recognises
    # str subclasses such as numpy.str_ as text that has to be split.
    if isinstance(text, str):
        text = text.split()
    return [dictionary[w] for w in text]

def ints_to_text(arr):
    """Convert a sequence of word ids back into a space-separated string.

    Every id is looked up in the global ``reverse_dictionary``.
    """
    tokens = (reverse_dictionary[word_id] for word_id in arr)
    return ' '.join(tokens)

In [None]:
# Encode the whole text as a list of word ids
words_as_int = text_to_ints(book_words)
print(words_as_int)

# Total number of words, then the first 100 ids decoded back to text
print(len(words_as_int))
print(ints_to_text(words_as_int[:100]))

## 3. Data streaming

Here we will see how to feed a dataset for model training:

In [None]:
class WordDataset(torch.utils.data.Dataset):
    """Dataset of consecutive, non-overlapping blocks of word ids.

    The id sequence is cut into blocks of ``n_input + 1`` ids. Trailing ids
    that do not fill a complete block are not part of the dataset.

    Args:
        words_as_int: Sequence of word ids (must support ``len`` and slicing).
        n_input: Number of input words per sample; each block holds one
            additional id.
    """

    def __init__(self, words_as_int, n_input):
        self.words_as_int = words_as_int
        self.n_input = n_input

        self.block_len = self.n_input + 1
        self.n_block = len(self.words_as_int) // self.block_len

    def __len__(self):
        """Return the number of complete blocks."""
        return self.n_block

    def __getitem__(self, idx):
        """Return block ``idx`` as a slice of ``words_as_int``.

        Negative indices count from the end.

        Raises:
            IndexError: If ``idx`` is outside ``[-len(self), len(self))``.
        """
        if idx < 0:
            idx += self.n_block
        # Without this check an out-of-range index would silently yield a
        # truncated or empty slice, and plain iteration would never stop.
        if not 0 <= idx < self.n_block:
            raise IndexError(f'block index out of range for {self.n_block} blocks')

        start = idx * self.block_len
        return self.words_as_int[start:start + self.block_len]

In [None]:
# make preprocessing function converting data to torch tensors
def preprocess(list_word_seq):
    """Collate a list of word-id sequences into input and target tensors.

    Each sequence is split into an input (all ids but the last) and a target
    (all ids but the first), i.e. the target is the input shifted by one word.

    Args:
        list_word_seq: List of equally long sequences of word ids.

    Returns:
        A tuple ``(data_t, labels_t)`` of ``torch.long`` tensors placed on the
        global ``device``; the samples are laid out along axis 1.
    """
    # Stack the samples along axis 1 (SBC format), then shift by one position
    # along axis 0 to obtain inputs and targets.
    blocks = np.stack(list_word_seq, axis=1)
    inputs, targets = blocks[:-1], blocks[1:]

    # Convert to tensors and move them to the configured device
    data_t = torch.tensor(inputs, dtype=torch.long).to(device)
    labels_t = torch.tensor(targets, dtype=torch.long).to(device)

    return data_t, labels_t

In [None]:
n_input = 3  # word sequence to predict the following word
batch_size = 50

# make data loader
# Each dataset item holds n_input + 1 word ids; preprocess is used as the
# collate function and turns a list of items into (input, target) tensors.

dataset = WordDataset(words_as_int, n_input)
train_loader = torch.utils.data.DataLoader(dataset,
                                           batch_size=batch_size, shuffle=True,
                                           collate_fn=preprocess)

In [None]:
# Number of blocks (samples) in the dataset
len(dataset)

In [None]:
# Count how many of the blocks are distinct (tuples are hashable, lists are not)
unique = set([tuple(dataset[i]) for i in range(len(dataset))])
len(unique)

The model will predict input_text -> target_text:

In [None]:
# test data loader
for batch_idx, (data, target) in enumerate(train_loader):
    print(data.shape, target.shape)

    for data_np, tgt_np in zip(data.cpu().numpy().T, target.cpu().numpy().T):
        data_str = ints_to_text(data_np)
        target_str = ints_to_text(tgt_np)
        print(f'{data_str}    ->    {target_str}')

    break

## 4. Construct model

We will build the model in Torch.
It will contain an embedding layer, and three LSTM layers.
Dense layer on top is used to output probability of the next word:

In [None]:
class RNN(nn.Module):
    """Word-level language model: embedding -> stacked LSTM layers -> linear.

    The model works on sequence-first input: ``x`` holds word ids with shape
    (seq_len, batch) and the result has shape (seq_len, batch, vocab_size),
    containing unnormalised scores for the next word at every position.

    Args:
        vocab_size: Number of distinct word ids.
        embedding_dim: Size of the word embedding vectors.
        lstm_hidden_dim: Hidden size of the LSTM layer, or a non-empty list
            with one hidden size per stacked LSTM layer.

    Raises:
        ValueError: If ``lstm_hidden_dim`` is an empty sequence.
    """

    def __init__(self, vocab_size, embedding_dim, lstm_hidden_dim):
        super().__init__()

        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim)

        if isinstance(lstm_hidden_dim, int):
            lstm_hidden_dim = [lstm_hidden_dim]
        if len(lstm_hidden_dim) == 0:
            raise ValueError('lstm_hidden_dim must contain at least one layer size')

        self.rnn = []
        prev_d = embedding_dim
        for i, hidd_d in enumerate(lstm_hidden_dim):
            # batch_first must be False: the data is fed as (seq_len, batch).
            # With batch_first=True the LSTM would treat axis 0 as the batch
            # and run the recurrence over the samples instead of over time.
            rnn = nn.LSTM(prev_d, hidd_d, batch_first=False)
            # add_module is needed to register the module in the model, so
            # that it can be found by model.parameters()
            self.add_module(f'lstm_{i}', rnn)
            self.rnn.append(rnn)
            prev_d = hidd_d

        # prev_d now holds the output size of the last LSTM layer
        self.fc = nn.Linear(prev_d, vocab_size)

    def forward(self, x):
        """Return next-word scores for every position of every sequence."""
        features = F.relu(self.embedding(x))
        for rnn in self.rnn:
            # keep only the per-step outputs; the final (h_n, c_n) state is not used
            features, _ = rnn(features)
        return self.fc(features)

In [None]:
# Parameters
n_input = 3  # word sequence to predict the following word

# number of units in RNN cells
lstm_hidden_dim = [256, 512, 128]

model = RNN(vocab_size=vocab_size, embedding_dim=128, lstm_hidden_dim=lstm_hidden_dim).to(device)

In [None]:
# take the first five word ids as a single sample
x = words_as_int[:5]

x = torch.tensor(x, dtype=torch.long).to(device)
print('singe sample shape:', x.shape)
x = x.unsqueeze(1)  # add batch dimension. By default, in torch LSTM expects input of shape (seq_len, batch, input_dim)
print('batch shape:', x.shape)

# write the model graph to a log directory for TensorBoard
writer = SummaryWriter('runs/inspect_RNN')
writer.add_graph(model, x)
writer.close()

# run the model on the sample and check the output shape
y = model(x)
print('output shape:', y.shape)

In [None]:
%tensorboard --logdir=runs/inspect_RNN

Let's test the untrained model:

In [None]:
# Grab a single batch from the loader (the loop exits right after the first iteration)
for batch_idx, (data, target) in enumerate(train_loader):
    break

In [None]:
# Run the model on that batch and look at the shape of the result
pred = model(data)
print(pred.shape)

In [None]:
# get the word id with the highest score along the last (vocabulary) axis
pred_ids = pred.detach().cpu().numpy().argmax(axis=2)

In [None]:
# Compare input, target and prediction for the first sample of the batch (column 0)
print('input: ', ints_to_text(data.cpu().numpy()[:, 0]))
print('output:', ints_to_text(target.cpu().numpy()[:, 0]))
print('pred:  ', ints_to_text(pred_ids[:, 0]))


## 5. Train!

In [None]:
# Parameters
n_input = 3  # word sequence to predict the following word
batch_size = 50

# make data loader

dataset = WordDataset(words_as_int, n_input)
train_loader = torch.utils.data.DataLoader(dataset,
                                           batch_size=batch_size, shuffle=True,
                                           collate_fn=preprocess)

# number of units in RNN cells
lstm_hidden_dim = [128]

model = RNN(vocab_size=vocab_size, embedding_dim=128, lstm_hidden_dim=lstm_hidden_dim).to(device)

# Cross-entropy loss on the raw model scores with integer class labels
criterion = nn.CrossEntropyLoss()
optimizer = optim.RMSprop(model.parameters(), lr=0.001)

# Train the model
n_epochs = 200

# per-epoch history of the mean training loss and the training accuracy
loss_hist = []
acc_hist = []

for epoch in range(n_epochs):
    train_loss = 0.

    model.train()
    correct = []
    for batch in train_loader:
        data, labels = batch
        optimizer.zero_grad()
        output = model(data)

        # labels are of shape (seq_len, batch_size), output is of shape (seq_len, batch_size, vocab_size)
        # we need to reshape labels to (seq_len*batch_size) and output to (seq_len*batch_size, vocab_size)
        # (they are torch tensors, so we can use view() method)

        output_f = output.view(-1, vocab_size)
        labels_f = labels.view(-1)

        loss = criterion(output_f, labels_f)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()

        # per-position hit/miss mask for this batch
        pred_class = torch.argmax(output, dim=2)
        corr = pred_class == labels
        correct.append(corr.detach().cpu().numpy())

    # average the loss over the batches of this epoch
    train_loss /= len(train_loader)
    loss_hist.append(train_loss)

    # join the per-batch masks along the batch axis and average all positions
    correct = np.concatenate(correct, axis=1)
    accuracy = np.mean(correct)

    acc_hist.append(accuracy)

    # Both numbers are measured on the training data, so label them as such.
    print(f"{epoch}:\t Train loss: {train_loss}; accuracy: {accuracy}")

Note that the RMSProp optimizer is used here, leading to faster convergence in this case than Adam/AdamW.

In [None]:
# Plot the training history side by side:
# left - loss per epoch on a logarithmic y axis, right - accuracy per epoch
fig, axs = plt.subplots(1, 2, figsize=(10,5))
axs[0].semilogy(loss_hist)
axs[0].set_xlabel('epoch')
axs[0].set_ylabel('loss')
axs[0].set_title('Loss history')
axs[1].plot(acc_hist)
axs[1].set_xlabel('epoch')
axs[1].set_ylabel('accuracy')
axs[1].set_title('Accuracy history')
plt.show()

## 6. Generating text with RNN

Take a word sequence and generate the following 128 words:

In [None]:
def gen_long(model, word_id_arr, n_words=128):
    """Greedily extend a seed sequence of word ids and return it as text.

    In every step the whole sequence generated so far is passed through the
    model and the highest-scoring word at the last position is appended.

    Args:
        model: Callable mapping a (seq_len, 1) tensor of word ids to scores
            of shape (seq_len, 1, vocab_size).
        word_id_arr: Non-empty iterable of seed word ids; it is not modified.
        n_words: Number of words to generate after the seed.

    Returns:
        The seed followed by the generated words, decoded with
        ``ints_to_text``.

    Raises:
        ValueError: If ``word_id_arr`` is empty.
    """
    # list() makes a private copy and accepts any iterable (list, tuple, array)
    words = list(word_id_arr)
    if not words:
        raise ValueError('word_id_arr must contain at least one word id')

    with torch.no_grad():
        for _ in range(n_words):
            # (seq_len,) -> (seq_len, 1): add a batch axis of size one
            seq = torch.tensor(words, dtype=torch.long).unsqueeze(1).to(device)
            pred = model(seq)

            # scores at the last position of the single batch element
            next_word_idx = torch.argmax(pred[-1, 0]).item()
            words.append(next_word_idx)

    return ints_to_text(words)

In [None]:
# Generate a continuation for the first sample of several batches
for batch_idx, (data, target) in enumerate(train_loader):
  # column 0 of the batch is the first sample
  input_seq = data.cpu().numpy()[:, 0]
  sentence = gen_long(model, input_seq)
  print(ints_to_text(input_seq), '...')
  print('\t...', sentence, '\n')

  # stop after batch index 6, i.e. after seven samples
  if batch_idx > 5:
    break

Or try to input some text and see continuation:

In [None]:
# Read a seed phrase from the user; an interrupted prompt leaves nothing to continue.
try:
    sentence = input("few words")
except KeyboardInterrupt:
    sentence = ''

# split() without arguments ignores leading, trailing and repeated spaces,
# so no empty "words" are produced.
words = sentence.split()

if words:
    try:
        symbols_in_keys = [dictionary[word] for word in words]
    except KeyError:
        # Only generate when every word is known; otherwise symbols_in_keys
        # would be undefined or left over from an earlier cell.
        print("Word not in dictionary")
    else:
        sentence = gen_long(model, symbols_in_keys)
        print(sentence)

In [None]:
# Keep asking for seed phrases and print the generated continuation.
# Interrupt the prompt (or send end-of-input) to leave the loop.
while True:
    prompt = f"{n_input} words: "

    try:
        sentence = input(prompt)
    except (KeyboardInterrupt, EOFError):
        break

    # split() without arguments ignores leading, trailing and repeated spaces
    words = sentence.split()
    if not words:
        # nothing was entered - ask again
        continue

    try:
        symbols_in_keys = [dictionary[word] for word in words]
    except KeyError:
        print("Word not in dictionary")
        continue

    sentence = gen_long(model, symbols_in_keys)
    print(sentence)


## 7. Exercise


* Run with 5-7 input words instead of 3.
* Increase the number of training iterations, since convergence will take much longer (training as well!).

## 8. Further reading

[Illustrated Guide to Recurrent Neural Networks](https://towardsdatascience.com/illustrated-guide-to-recurrent-neural-networks-79e5eb8049c9)

[Illustrated Guide to LSTM’s and GRU’s: A step by step explanation](https://towardsdatascience.com/illustrated-guide-to-lstms-and-gru-s-a-step-by-step-explanation-44e9eb85bf21)

[Understanding LSTM Networks](https://colah.github.io/posts/2015-08-Understanding-LSTMs/)