# Language model using an RNN

This notebook implements a simple RNN language model. Go through it and try to understand the code. If you see an error or something that could be improved, please let your teacher know.

In [None]:
import re
import time
from typing import Any, Callable, Dict, List, Optional, Tuple

import numpy as np
import torch
from torch import nn, optim, Tensor
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
from torch.utils.data import DataLoader, Dataset
from torchtext.data.utils import get_tokenizer
from torchtext.datasets import PennTreebank
from torchtext.vocab import build_vocab_from_iterator, Vocab

## Text pipeline
We will use the Penn TreeBank dataset to train an RNN as a language model. The Penn TreeBank is composed of separate sentences so it will be easier for our problem.

In [None]:
# Load the Penn Treebank corpus; the result is unpacked into its three splits.
train, valid, test = PennTreebank()

In [None]:
# Display the type of the object returned for the training split.
type(train)

In [None]:
def iter_to_item_dataset(dataset) -> List[Any]:
    """
    Materialise an iterable dataset into a plain Python list, keeping the
    items in iteration order.
    """
    return list(dataset)


# Replace every split by an in-memory list, which supports the indexing and
# len() calls used further down in the notebook.
train = iter_to_item_dataset(train)
valid = iter_to_item_dataset(valid)
test = iter_to_item_dataset(test)

### Data cleaning

We apply a very simple data preprocessing with a simple removal of special characters and lower casing.

In [None]:
# Special tokens: sentence start / end markers and the out-of-vocabulary symbol.
START_TOKEN = "<s>"
END_TOKEN = "</s>"
UNK_TOKEN = "<unk>"

# Raw strings are used so that "\W" and "\s" reach the regex engine as regex
# escapes instead of being (invalid) Python string escapes.
# Runs of characters which are neither letters, digits nor underscores.
NON_CHAR_RE = re.compile(r"\W+")
# Runs of whitespace.
MULTI_SPACE_RE = re.compile(r"\s+")

# Tokenizer obtained from torchtext with the "basic_english" setting.
tokenizer = get_tokenizer("basic_english")


def clean_text(text: str) -> str:
    """
    Normalise a raw sentence: lower-case it, turn every run of non-word
    characters into one space, collapse repeated whitespace and trim both ends.
    """
    lowered = text.lower()
    without_specials = NON_CHAR_RE.sub(" ", lowered)
    return MULTI_SPACE_RE.sub(" ", without_specials).strip()


def tokenize_raw(tokenizer: Callable[[str], List[str]], text: str) -> List[str]:
    """
    Tokenize the raw text after cleaning it, and surround the tokens with the
    special start and end tokens.

    Args:
        tokenizer: callable splitting a cleaned string into tokens.
        text: raw sentence.

    Returns:
        The list of string tokens, beginning with START_TOKEN and ending with
        END_TOKEN (tokens, not vocabulary indices).
    """
    # Unpacking accepts any iterable returned by the tokenizer, not only lists.
    return [START_TOKEN, *tokenizer(clean_text(text)), END_TOKEN]


def yield_tokens(data):
    """
    Lazily produce, for every text of ``data``, its list of tokens as computed
    by ``tokenize_raw`` with the module-level tokenizer.
    """
    yield from (tokenize_raw(tokenizer, sentence) for sentence in data)


# Build the vocabulary from the tokenized training sentences only, with a
# minimum frequency of 3 and the three special tokens registered explicitly.
vocab = build_vocab_from_iterator(
    yield_tokens(train), min_freq=3, specials=[UNK_TOKEN, START_TOKEN, END_TOKEN]
)
# Tokens which are not in the vocabulary fall back to the index of UNK_TOKEN.
vocab.set_default_index(vocab[UNK_TOKEN])
# Vocabulary size (shown as the cell output).
len(vocab)

For this example, we keep a rather small vocabulary (mostly so we can iterate on the model and try different parameters faster).

In [None]:
class TextPipeline:
    """
    Text pipeline: raw text goes in, the result of looking up its tokens in
    the vocabulary comes out.
    """

    def __init__(self, tokenizer, vocab):
        # Both collaborators are simply stored for later use by `pipeline`.
        self._tokenizer, self._vocab = tokenizer, vocab

    def pipeline(self, text):
        # Clean + tokenize (with start / end markers), then apply the vocabulary.
        tokens = tokenize_raw(self._tokenizer, text)
        return self._vocab(tokens)


# Single pipeline instance shared by the datasets created below.
text_pipeline = TextPipeline(tokenizer, vocab)

So what does our data look like when it goes through the pipeline?

In [None]:
# Raw text of the second training sentence...
print(train[1])
# ...and the same sentence once it has gone through the text pipeline.
print(text_pipeline.pipeline((train[1])))

### Dataset and DataLoader
PyTorch asks for data being provided through a DataLoader. This Dataset is just a convenience class which wraps the dataset with a `__len__` and a `__getitem__` function.

Note that the `__getitem__` function returns a tuple made of the sentence without its last token and the same sentence shifted by one position to the left, so that the first element is the input fed to the RNN and the second one is the corresponding expected output (the next token at every position).

In [None]:
class LMData(Dataset):
    """
    Dataset class wrapping our sentences by adding a `__len__` and a
    `__getitem__` function, as needed by the DataLoader.
    """

    def __init__(self, data, vocab, text_pipeline: "TextPipeline"):
        """
        Args:
            data: indexable collection of raw sentences.
            vocab: vocabulary; it is only stored, this class never reads it.
            text_pipeline: object whose `pipeline(text)` method returns the
                list of token indices of a raw sentence.
        """
        self._data = data
        self._vocab = vocab
        self._pipeline = text_pipeline

    def __len__(self) -> int:
        """
        Return the length of the dataset
        """
        return len(self._data)

    def __getitem__(self, idx: int) -> Tuple[Tensor, Tensor]:
        """
        Return a tuple of tensors: the sentence without its last token (model
        input) and the sentence without its first token (expected output).
        """
        # Run the text pipeline once and slice the result twice, instead of
        # cleaning / tokenizing the same sentence two times.
        token_ids = self._pipeline.pipeline(self._data[idx])
        return torch.tensor(token_ids[:-1]), torch.tensor(token_ids[1:])

In [None]:
# One dataset per split; they all share the same vocabulary and text pipeline.
train_dataset = LMData(train, vocab, text_pipeline)
valid_dataset = LMData(valid, vocab, text_pipeline)
test_dataset = LMData(test, vocab, text_pipeline)

Let's check again our data.

In [None]:
# Map the input indices of training item 10 back to their tokens.
[vocab.vocab.get_itos()[i] for i in train_dataset[10][0].numpy()]

The DataLoader is used by PyTorch to provide batches to the model. For these batches to be processed in parallel on a GPU, all the sentences of a batch need to have the same length. We will simply pad the shorter sentences in each batch with the end-of-sentence token ("\</s\>").

Note that PyTorch provides a mechanism called ["packing"](https://stackoverflow.com/questions/51030782/why-do-we-pack-the-sequences-in-pytorch) which avoids useless computation on the padded positions during forward propagation and makes it possible to leave the padded data out of the loss computation (this mechanism is hardly documented).

In [None]:
# Number of sentences per batch, used by the DataLoaders below.
batch_size = 8

In [None]:
def collate_fn(data: List[Tuple[Tensor, Tensor]], pad_index: Optional[int] = None):
    """
    This function is given to the DataLoader. It pads the inputs and outputs
    of a batch to a common length.

    The (input, output) pairs are ordered from the longest to the shortest
    input, which is the order `pack_padded_sequence` expects by default, and
    copied into two tensors pre-filled with the padding index.

    Args:
        data: list of (input, output) pairs of 1-D index sequences; both
            members of a pair must have the same length.
        pad_index: index written in the padded positions. When omitted, the
            index of END_TOKEN in the module-level vocabulary is used.

    Returns:
        A tuple (padded_inputs, padded_outputs, lens): two long tensors of
        shape (max_len, batch) and the list of the real lengths, in batch order.
    """
    if pad_index is None:
        pad_index = vocab[END_TOKEN]

    # sorted() leaves the list received from the DataLoader untouched.
    batch = sorted(data, key=lambda pair: len(pair[0]), reverse=True)
    lens = [len(sent) for sent, _ in batch]
    max_len = max(lens)

    # Both tensors are filled with the padding index. The original code raised
    # the tensor of ones to the power of the index for the outputs (`**`
    # instead of `*`), which always padded them with index 1.
    padded_inputs = torch.full((len(batch), max_len), pad_index, dtype=torch.long)
    padded_outputs = torch.full((len(batch), max_len), pad_index, dtype=torch.long)
    for i, (input_sent, output_sent) in enumerate(batch):
        padded_inputs[i, : lens[i]] = torch.as_tensor(input_sent, dtype=torch.long)
        padded_outputs[i, : lens[i]] = torch.as_tensor(output_sent, dtype=torch.long)

    # Putting data in the (sequence, batch) shape
    return padded_inputs.transpose(0, 1), padded_outputs.transpose(0, 1), lens

In [None]:
# Training batches are drawn in a new random order at every epoch.
train_loader = DataLoader(
    train_dataset, shuffle=True, batch_size=batch_size, collate_fn=collate_fn
)
# The evaluation splits are read in their original order: shuffling is only
# useful when weights are updated, and a fixed order makes runs comparable.
valid_loader = DataLoader(
    valid_dataset, shuffle=False, batch_size=batch_size, collate_fn=collate_fn
)
test_loader = DataLoader(
    test_dataset, shuffle=False, batch_size=batch_size, collate_fn=collate_fn
)

## The model

Now we will build an RNN model made of:
* An embedding layer
* An RNN
* A fully connected layer which projects the hidden state to the vocabulary size (output logits).

At every call, the model takes the current input and hidden state and returns the computed logits and the new hidden state.

In [None]:
class RNNLM(nn.Module):
    """
    A RNN-based class for language modelling: an embedding layer, a
    (possibly multi-layer) RNN and a fully connected layer producing one
    logit per vocabulary entry.
    """

    def __init__(
        self,
        vocab_size: int,
        embedding_size: int,
        hidden_size: int,
        output_size: int,
        n_layers: int = 1,
        dropout: float = 0.1,
    ):
        """
        Args:
            vocab_size: number of entries of the vocabulary.
            embedding_size: dimension of the word embeddings.
            hidden_size: dimension of the RNN hidden state.
            output_size: not used by the model; kept so that existing calls
                keep working.
            n_layers: number of stacked RNN layers.
            dropout: dropout between stacked RNN layers.
        """
        super().__init__()
        self.hidden_size = hidden_size
        self.n_layers = n_layers
        # Embedding layer (vocab_size -> embedding_size)
        self.embedding = nn.Embedding(vocab_size, embedding_size)
        # RNN layer (embedding_size -> hidden_size)
        # nn.RNN only applies dropout between stacked layers: with a single
        # layer a non-zero value has no effect and only triggers a warning,
        # so it is not forwarded in that case.
        self.rnn = nn.RNN(
            embedding_size,
            hidden_size,
            n_layers,
            dropout=dropout if n_layers > 1 else 0.0,
        )
        # Fully connected layer (hidden_size -> vocab_size)
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x: Tensor, hidden: Tensor, lens: List[int]):
        """
        The function executed every time the model is called.

        Args:
            x: token indices, indexed as (sequence, batch).
            hidden: initial hidden state, e.g. the result of `init_hidden`.
            lens: real length of every sequence of the batch; with the default
                settings of `pack_padded_sequence` they must be in decreasing
                order.

        Returns:
            A tuple (logits, hidden): one logit per vocabulary entry for every
            position, and the last hidden state.
        """
        embeddings = self.embedding(x)
        # Packing lets the RNN skip the padded positions of the batch
        packed = pack_padded_sequence(embeddings, lens)
        output, hidden = self.rnn(packed, hidden)
        # Back to a regular (padded) tensor before the projection
        output, _ = pad_packed_sequence(output)
        output = self.fc(output)
        return output, hidden

    def init_hidden(self, batch_size: int) -> torch.Tensor:
        """
        Initialize the hidden layer at 0s. There are better ways...

        The tensor is created on the device holding the model weights, so that
        it can be given to the model directly.
        """
        return torch.zeros(
            self.n_layers,
            batch_size,
            self.hidden_size,
            device=self.fc.weight.device,
        )

Before training, let's check we have access to a GPU. If not (and you are on colab), make sure you enable a GPU in `Runtime>Change runtime type`. You can also train the model on a CPU, but it will take a lot longer.

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# On a GPU runtime this should show "device(type='cuda')"
device

In [None]:
# Positional arguments: vocab_size=len(vocab), embedding_size=16,
# hidden_size=256, output_size=64 (RNNLM.__init__ never reads it), n_layers=1.
model = RNNLM(len(vocab), 16, 256, 64, 1)
# Move the model to the selected device.
model.to(device)

### Training
We train the model on our data by trying to predict the actual output and updating weights accordingly.

Note that if you are not familiar with PyTorch, the cross-entropy function applies softmax on the logits by itself, so there is no need to apply a softmax function here.

In [None]:
def train(
    train_loader,
    model: "RNNLM",
    nb_epochs: int,
    learning_rate: float = 0.001,
    batch_size: int = 1,
):
    """
    Our training procedure: cross-entropy loss minimised with RMSprop.

    NOTE: defining this function rebinds the name `train`, which the data
    loading cells use for the list of training sentences; those cells must be
    run again before the raw sentences can be accessed through that name.

    Args:
        train_loader: iterable of (inputs, outputs, lens) batches, where
            inputs and outputs are indexed as (sequence, batch) and lens holds
            the real length of every sequence.
        model: model to train; it is updated in place.
        nb_epochs: number of passes over `train_loader`.
        learning_rate: learning rate given to the optimizer.
        batch_size: not used (the batch size is read from each batch); kept so
            that existing calls keep working.
    """
    model.train()
    # Work on the device holding the model weights rather than on a global.
    device = next(model.parameters()).device
    # We are using a cross entropy loss function and a RMSprop optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.RMSprop(model.parameters(), lr=learning_rate)

    # Main training loop
    for epoch in range(nb_epochs):
        total_loss = 0.0
        nb_batches = 0
        for x, y, lens in train_loader:
            # Putting the data on the device (GPU)
            x = x.to(device)
            y = y.to(device)
            # Initializing the hidden layer
            hidden = model.init_hidden(x.shape[1]).to(device)
            # And the optimizer
            optimizer.zero_grad()
            y_pred, hidden = model(x, hidden, lens)

            # Only the real tokens take part in the loss: position t of
            # sequence b is kept when t < lens[b], padding is left out.
            lengths = torch.as_tensor(lens, device=device)
            steps = torch.arange(y.shape[0], device=device).unsqueeze(1)
            mask = steps < lengths.unsqueeze(0)
            loss = criterion(y_pred[mask], y[mask])
            loss.backward()
            optimizer.step()

            # .item() gives a plain float: summing the loss tensors themselves
            # would keep autograd data alive and print tensors in the logs.
            total_loss += loss.item()
            nb_batches += 1
            if nb_batches % 1000 == 0:
                print({"batch": nb_batches, "loss": total_loss / nb_batches})

        # An empty loader must not end in a division by zero.
        mean_loss = total_loss / nb_batches if nb_batches else float("nan")
        print({"epoch": epoch, "nb_batches": nb_batches, "loss": mean_loss})
        print(f"Next learning rate: {learning_rate}")

In [None]:
# Train the model for 3 epochs on the training batches.
train(train_loader, model, 3, learning_rate=0.001, batch_size=batch_size)

## Prediction

To predict the next word, we use a random sampling taking into account the probability of each word given by a softmax plugged on top of our logits. We also use temperatures (0 < temperature <= 1) to affect the probabilities. A lower temperature will make the most likely word more likely to be chosen, while a higher temperature will make picks more random (more on that topic in our next course).

In [None]:
def predict(
    model: "RNNLM",
    vocab: "Vocab",
    start: Optional[List[str]] = None,
    max_words: int = 25,
    temperature: float = 1.0,
) -> List[str]:
    """
    Generate a more or less likely sentence from a given start
    The temperature parameter is something we'll see in our next lesson
    For now think that a lower temperature make the prediction closer to always
    choosing the most likely word.

    The words of `start` are fed to the model one after the other, in order,
    so that the hidden state reflects the whole prompt; once the prompt is
    exhausted the next word is sampled from the predicted distribution.

    Args:
        model: trained language model.
        vocab: vocabulary mapping tokens to indices and back.
        start: first words of the sentence; defaults to [START_TOKEN]. Words
            which are not in the vocabulary go through the vocabulary lookup
            like any other token.
        max_words: maximum number of words returned, start words included.
        temperature: strictly positive scaling applied to the logits.

    Returns:
        The generated words, beginning with the start words and ending with
        "</s>" if it was produced before `max_words` was reached.

    Raises:
        ValueError: if `temperature` is not strictly positive.
        IndexError: if `start` is empty.
    """
    if temperature <= 0:
        raise ValueError("temperature must be strictly positive")
    prompt = [START_TOKEN] if start is None else list(start)
    if not prompt:
        raise IndexError("start must contain at least one token")

    model.eval()
    # Work on the device holding the model weights rather than on a global.
    device = next(model.parameters()).device
    # Both lookups are done once, outside of the generation loop.
    itos = vocab.get_itos()
    prompt_ids = vocab(prompt)

    hidden = model.init_hidden(1).to(device)
    words: List[str] = []
    next_index = prompt_ids[0]
    with torch.no_grad():
        while len(words) < max_words:
            words.append(itos[next_index])
            if words[-1] == "</s>" or len(words) == max_words:
                break
            x = torch.tensor([[next_index]], device=device)
            y_pred, hidden = model(x, hidden, [1])
            # We force the input value until we reach the end of the start list
            if len(words) < len(prompt_ids):
                next_index = prompt_ids[len(words)]
            else:
                # Double precision, plus an explicit renormalisation, keeps
                # the probabilities within the tolerance np.random.choice
                # applies to their sum.
                logits = y_pred[0][0].cpu().double() / temperature
                p = torch.softmax(logits, dim=0).numpy()
                p /= p.sum()
                next_index = int(np.random.choice(len(p), p=p))

    return words

In [None]:
# Generate a sentence beginning with "the company".
predict(model, vocab, [START_TOKEN, "the", "company"], temperature = 1.0)

## Now it's your turn

* Play with the dimensions of the model (embedding, hidden, number of layers, ...) and compare the generated outputs.
* Create a class LSTMLM or GRULM which uses an LSTM or a GRU cell and compare the results.
* Use the language model to compute the perplexity on the test set.