# Deep Learning
## Formative assessment
### Week 6: Recurrent neural networks

#### Instructions

In this notebook, you will develop an RNN language model to generate text.

Some code cells are provided for you in the notebook. You should avoid editing provided code, and make sure to execute the cells in order to avoid unexpected errors. Some cells begin with the line: 

`#### GRADED CELL ####`

These cells require you to write your own code to complete them.

#### Let's get started!

We'll start by running some imports, and loading the dataset.

In [None]:
#### PACKAGE IMPORTS ####

# Run this cell first to import all required packages. Do not make any imports elsewhere in the notebook

import keras
from keras import ops
import torch
import numpy as np
import os
import json
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from keras.layers import TextVectorization
from pathlib import Path

<center><img src="figures/shakespeare.png" title="Shakespeare" style="width: 350px;"/></center>
  
#### The Shakespeare dataset

In this assignment, you will use a subset of the [Shakespeare dataset](http://shakespeare.mit.edu). This dataset consists of a single text file with several excerpts concatenated together. The data is in raw text form and has not yet been preprocessed. 

Your goal is to construct an unsupervised RNN model that can generate text according to a distribution learned from the dataset. This will be a character-level sequence model that predicts text one character at a time.

#### Load and inspect the dataset

In [None]:
# Load the text file into a string

with open(Path('data/Shakespeare.txt'), 'r', encoding='utf-8') as file:
    text = file.read()

In [None]:
# Create a list of chunks of text

text_chunks = text.split('.')

To give you a feel for what the text looks like, we will print a few chunks from the list.

In [None]:
# Display some randomly selected text samples

num_samples = 3
inx = np.random.choice(len(text_chunks), num_samples, replace=False)
for chunk in np.array(text_chunks)[inx]:
    print(chunk)
    print('-----')

#### Prepare the Datasets

The model will receive a sequence of characters and predict the next character in the sequence. At training time, the model can be passed an input sequence, with the target sequence shifted by one.

For example, consider the expression `to be or not to be` from Shakespeare's play 'Hamlet'. Given the input `to be or not to b`, the correct prediction is `o be or not to be`. Notice that the prediction is the same length as the input.

<center><img src="figures/to-be-or-not-to-be.png" alt="Training procedure" style="width: 750px;"/></center>
<center>Schematic diagram showing the training procedure for the character language model. The target sequence is the same as the input sequence, shifted by one.</center>
<br>
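As a minimal sketch of the shift described above (plain Python strings, before any tokenization), the input and target can be obtained by slicing the same phrase:

```python
# Character-level input/target pairs: the target is the input shifted by one.
phrase = "to be or not to be"

input_chars = phrase[:-1]   # every character except the last
target_chars = phrase[1:]   # every character except the first

# At position i, the model has seen input_chars[: i + 1]
# and should predict target_chars[i]
for i in range(3):
    print(repr(input_chars[: i + 1]), "->", repr(target_chars[i]))
```

Both slices have length `len(phrase) - 1`, which is why the prediction has the same length as the input.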

We will use PyTorch custom Datasets to handle the data processing for this task. These Datasets will do some filtering on the text chunks data, and tokenize the text at the character level. The corresponding DataLoaders will return zero-padded batches of integer tokens for both inputs and outputs. 

We will first do some preliminary processing on the text chunks and create training and validation splits.

In [None]:
# Strip any whitespace at the beginning or end of the strings

text_chunks = [s.strip() for s in text_chunks]

In [None]:
# Make train and validation splits

train_split, valid_split = train_test_split(text_chunks, test_size=0.25)

You should now complete the following `ShakespeareDataset` class, which we will use to create a custom Dataset object from one of the splits above. This class subclasses `torch.utils.data.Dataset`. 

We will need to convert the sentence strings to integer tokens for processing by the recurrent neural network. This conversion should be performed by your custom Dataset class, and we will use the `TextVectorization` layer for this.

* The class initializer takes `data_split` (one of the splits above), `min_len`, `max_len` and `textvectorization_layer` as arguments
* Your class should implement the `__init__`, `__len__` and `__getitem__` methods
* The Dataset should filter out any example that has fewer than `min_len` characters or more than `max_len` characters
* The Dataset should tokenize the text using the `textvectorization_layer` (to be defined later)
* For each tokenized example (with length `seq_len`), the Dataset should split the example into `input_tokens` and `target_tokens`
  * Both `input_tokens` and `target_tokens` should have length `seq_len - 1`
  * `input_tokens` should contain the first `seq_len - 1` tokens of each sequence 
  * `target_tokens` should contain the last `seq_len - 1` tokens of each sequence
* The Dataset should return the tuple `(input_tokens, target_tokens)`
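
The filtering, tokenizing and splitting steps in the list above can be illustrated in plain Python. This is only a rough stand-in for the `TextVectorization` layer: the helpers `build_char_vocab` and `encode` are hypothetical, and the ids the real layer assigns may differ. The sketch assumes ids 0 and 1 are reserved for padding and unknown characters.

```python
from collections import Counter

def build_char_vocab(chunks):
    """Hypothetical helper: map each lower-cased character to an integer id.
    Ids 0 and 1 are reserved for padding and unknown characters."""
    counts = Counter(ch for chunk in chunks for ch in chunk.lower())
    # More frequent characters get smaller ids; ties are broken alphabetically here
    ordered = sorted(counts, key=lambda ch: (-counts[ch], ch))
    return {ch: i + 2 for i, ch in enumerate(ordered)}

def encode(chunk, vocab):
    """Lower-case the chunk and look up each character (1 = unknown)."""
    return [vocab.get(ch, 1) for ch in chunk.lower()]

chunks = ["To be", "or", "not to be, that is the question"]
min_len, max_len = 3, 20

vocab = build_char_vocab(chunks)

# Keep only chunks whose character length lies in [min_len, max_len]
kept = [c for c in chunks if min_len <= len(c) <= max_len]

# Tokenize one example and split it into input and target
tokens = encode(kept[0], vocab)
input_tokens, target_tokens = tokens[:-1], tokens[1:]

print(kept)
print(input_tokens, target_tokens)
```

Only `"To be"` survives the filter in this toy example: `"or"` is too short and the last chunk is too long.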

In [None]:
#### GRADED CELL ####

# Complete the following class.
# Make sure not to change the class name or the initializer arguments.

class ShakespeareDataset(torch.utils.data.Dataset):
    """
    This custom Dataset class takes data_split, min_len, max_len and textvectorization_layer 
    as arguments in the initializer, and returns tokenized text according to the spec above.
    """
    
    def __init__(self, data_split, min_len, max_len, textvectorization_layer):
        self.min_len = min_len
        self.max_len = max_len
        self.data_split = [s for s in data_split if self.min_len <= len(s) <= self.max_len]
        self.textvectorization = textvectorization_layer

    def __len__(self):
        return len(self.data_split)

    def __getitem__(self, index):
        text = self.data_split[index]
        tokenized_text = self.textvectorization(text)
        return tokenized_text[:-1], tokenized_text[1:]

You should now complete the following `get_dataloaders` function to create DataLoaders from Dataset instances using your custom class above.

* The function takes `train_text`, `valid_text`, `min_len`, `max_len` and `batch_size` as arguments
  * `train_text` and `valid_text` are lists of text chunks, as defined above
  * `min_len` and `max_len` are integers defining minimum (resp. maximum) character lengths, as described above in the custom Dataset pre-processing
  * `batch_size` is an integer defining the batch size to be returned by the DataLoaders
* Your function should create an instance of the `TextVectorization` layer
  * This layer should be set up to allow an unlimited number of tokens
  * It should standardize the text by converting it to lower case
  * It should split the input sentences at the character level
  * The `TextVectorization` object should be configured using the `train_text` chunks
* A Dataset should be created for both training and validation splits using the `ShakespeareDataset` class above
* Training and validation DataLoaders should then be defined
  * The training dataset should be shuffled, the validation dataset should not
  * Both DataLoaders should use a batch size of `batch_size`
  * The DataLoaders should return tuples of integer tokens `(inputs, outputs)`, with each example padded with zeros up to the length of the longest sequence in the batch
* The function should then return the DataLoaders and `TextVectorization` object in a tuple `(train_dl, valid_dl, text_vectorization)`
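
To make the zero-padding requirement concrete, here is a small sketch on plain Python lists. The helper `pad_batch` is hypothetical and only illustrates the idea; in the notebook the same effect would typically come from a collate function built on `torch.nn.utils.rnn.pad_sequence` with `batch_first=True`.

```python
def pad_batch(sequences, padding_value=0):
    """Hypothetical helper: right-pad every sequence to the longest length in the batch."""
    longest = max(len(seq) for seq in sequences)
    return [list(seq) + [padding_value] * (longest - len(seq)) for seq in sequences]

batch = [[5, 3, 8], [7, 2], [4, 9, 6, 1]]
padded = pad_batch(batch)

for row in padded:
    print(row)
```

Because the padding length depends on the longest example in each batch, different batches can have different sequence lengths.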

In [None]:
#### GRADED CELL ####

# Complete the following function.
# Make sure not to change the function name or arguments.

def get_dataloaders(train_text, valid_text, min_len, max_len, batch_size):
    """
    This function uses the training and validation text chunks and creates Dataset
    objects and corresponding DataLoaders as described above. 
    It should then return the train_dataloader, valid_dataloader and TextVectorization object
    """
    text_vectorization = TextVectorization(max_tokens=None, standardize='lower', split='character')
    text_vectorization.adapt(train_text)

    train_ds = ShakespeareDataset(train_text, min_len, max_len, text_vectorization)
    valid_ds = ShakespeareDataset(valid_text, min_len, max_len, text_vectorization)

    def padded_batch(batch):
        inputs, outputs = zip(*batch)
        
        # The pad_sequence fn expects torch Tensors. The following conversion is only necessary for TF backend
        inputs = [torch.tensor(ops.convert_to_numpy(t)) for t in inputs]
        outputs = [torch.tensor(ops.convert_to_numpy(t)) for t in outputs]
        
        inputs = torch.nn.utils.rnn.pad_sequence(inputs, batch_first=True, padding_value=0)
        outputs = torch.nn.utils.rnn.pad_sequence(outputs, batch_first=True, padding_value=0)
        return inputs, outputs

    train_dataloader = torch.utils.data.DataLoader(train_ds, batch_size=batch_size, shuffle=True, collate_fn=padded_batch)
    val_dataloader = torch.utils.data.DataLoader(valid_ds, batch_size=batch_size, shuffle=False, collate_fn=padded_batch)
    return train_dataloader, val_dataloader, text_vectorization

In [None]:
# Run your function to create the DataLoaders and TextVectorization object

train_dl, valid_dl, text_vectorization = get_dataloaders(train_split, valid_split, 
                                                         min_len=10, max_len=400, batch_size=32)

In [None]:
# Test the training DataLoader

inputs, outputs = next(iter(train_dl))
print(inputs)
print(outputs)

#### Build and train the recurrent neural network model

You are now ready to build your RNN character-level language model. You should write the following function to build and compile the model. The function takes arguments `text_vectorization` (created earlier), `embedding_dim` (for the `Embedding` layer), and `gru_units` (for the GRU layer). 

Using the functional API, your function should build your model according to the following specifications:

* The first layer should be an `Input` layer, with a single dimension for the (variable) sequence length
* The second layer should be an Embedding layer with an embedding dimension of `embedding_dim`, and the vocabulary size set using the `text_vectorization` object
  * *Hint: Use the `get_vocabulary` method of the* `TextVectorization` *object to determine the vocabulary size*
  * The Embedding layer should also mask the zero padding in the input sequences
* The next layer should be a (uni-directional) GRU layer with number of units set by the `gru_units` argument
  * The GRU layer should return the full sequence, instead of just the output state at the final time step.
  * It should also return its internal state
* The output of the GRU layer should then be fed through a final `Dense` layer with number of units set to vocabulary size, and no activation function. Call this layer `preds`
* The network should have multiple outputs consisting of the `Dense` layer output and the internal state of the GRU layer
* The model should then be compiled.
  * Use the Adam optimizer with the default arguments
  * For the `loss` argument, you should pass a list of losses, one for each model output. The `Dense` layer output should have a cross entropy loss, and the GRU internal state loss can be `None`
  * Similarly, use a sparse categorical accuracy metric, just for the `Dense` layer output
* Your function should then return the compiled model

_Hint: you might find [this Keras guide](https://keras.io/guides/functional_api/#manipulate-complex-graph-topologies) to be a useful example for working with multi-input and multi-output models._
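
One way to sanity-check the architecture against `model.summary()` is to count the trainable parameters by hand. The sketch below assumes the Keras default GRU configuration (`reset_after=True`, which gives each gate two bias vectors) and a `Dense` layer with a bias. The vocabulary size of 40 is a made-up value for illustration, and `count_parameters` is a hypothetical helper.

```python
def count_parameters(vocab_size, embedding_dim, gru_units):
    """Hypothetical helper: per-layer parameter counts for Embedding -> GRU -> Dense."""
    # Embedding: one vector of size embedding_dim per vocabulary entry, no bias
    embedding = vocab_size * embedding_dim
    # GRU: three gates, each with an input kernel, a recurrent kernel
    # and two bias vectors (assuming reset_after=True)
    gru = 3 * (embedding_dim * gru_units + gru_units * gru_units + 2 * gru_units)
    # Dense: weight matrix plus bias, one output unit per vocabulary entry
    dense = gru_units * vocab_size + vocab_size
    return {"embedding": embedding, "gru": gru, "preds": dense}

counts = count_parameters(vocab_size=40, embedding_dim=256, gru_units=1024)
for name, value in counts.items():
    print(f"{name}: {value:,}")
```

The GRU count depends only on `embedding_dim` and `gru_units`, so it should not change with the vocabulary size.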

In [None]:
#### GRADED CELL ####

# Complete the following function.
# Make sure not to change the function name or arguments.

def get_model(text_vectorization, embedding_dim, gru_units):
    """
    This function takes a TextVectorization object, an embedding dimension and the
    number of GRU units, and builds and returns a compiled functional API model 
    according to the above specification.
    """
    vocab_size = len(text_vectorization.get_vocabulary())
    
    inputs = keras.layers.Input(shape=(None,), name="token_input")
    embedding = keras.layers.Embedding(input_dim=vocab_size, output_dim=embedding_dim,
                                          mask_zero=True, name='embedding')(inputs)
    h, state = keras.layers.GRU(units=gru_units, return_sequences=True, 
                                   return_state=True, name='gru')(embedding)
    preds = keras.layers.Dense(vocab_size, name='preds')(h)
    model = keras.Model(inputs=inputs, outputs=[preds, state])
    
    losses = [
        keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        None
    ]
    model.compile(optimizer='adam', 
                  metrics=[['sparse_categorical_accuracy'], []], 
                  loss=losses)
    return model

In [None]:
# Build the model and print the model summary

rnn_model = get_model(text_vectorization, 256, 1024)
rnn_model.summary()

In [None]:
# Fit the model

history = rnn_model.fit(train_dl, validation_data=valid_dl, epochs=15, 
                        callbacks=[keras.callbacks.EarlyStopping(patience=3)])

In [None]:
# Run this cell to plot accuracy vs epoch and loss vs epoch

plt.figure(figsize=(15,5))
plt.subplot(121)
plt.plot(history.history['preds_sparse_categorical_accuracy'])
plt.plot(history.history['val_preds_sparse_categorical_accuracy'])
plt.title('Accuracy vs. epochs')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.xticks(np.arange(len(history.history['preds_sparse_categorical_accuracy'])))
ax = plt.gca()
ax.set_xticklabels(1 + np.arange(len(history.history['preds_sparse_categorical_accuracy'])))
plt.legend(['Training', 'Validation'], loc='lower right')

plt.subplot(122)
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Loss vs. epochs')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.xticks(np.arange(len(history.history['preds_sparse_categorical_accuracy'])))
ax = plt.gca()
ax.set_xticklabels(1 + np.arange(len(history.history['preds_sparse_categorical_accuracy'])))
plt.legend(['Training', 'Validation'], loc='upper right')
plt.show() 

#### Write a text generation algorithm

An algorithm to generate text is as follows:

1. Specify a seed string (e.g. `'ROMEO:'`) to get the network started, and define the number of characters for the model to generate, `num_generation_steps`.
2. Tokenize the seed string to obtain a list of integer tokens.
3. Reset the initial state of the recurrent network layer to zeros. 
4. Convert the token list into a Tensor (or numpy array) and pass it to your model as a batch of size one.
5. Get the model prediction (logits) for the last time step and extract the state of the recurrent layer.
6. Use the logits to construct a categorical distribution and sample a token from it.
7. Repeat the following for `num_generation_steps - 1` steps:

    1. Use the saved state of the recurrent layer and the last sampled token to get new logit predictions
    2. Use the logits to construct a new categorical distribution and sample a token from it.
    3. Save the updated state of the recurrent layer.    

8. Take the final list of tokens and convert to text using the TextVectorization layer vocabulary.

Note that we have built our RNN model to return the internal state of the recurrent layer, as well as the logits output from the `Dense` layer. For the GRU layer, the internal state is a single Tensor of shape `(batch_size, gru_units)`.
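
The numbered steps above can be sketched with a toy stand-in for the network. Everything here is hypothetical: `toy_model` is not the trained RNN, its scalar state only mimics the role of the GRU state, and the vocabulary size of 5 is arbitrary. The point is the control flow, not the model.

```python
import math
import random

VOCAB_SIZE = 5  # hypothetical toy vocabulary size

def toy_model(tokens, state=None):
    """Hypothetical stand-in for the trained network.

    Consumes a list of integer tokens and returns
    (logits for the last time step, updated state).
    """
    if state is None:
        state = 0.0  # step 3: start from a zero state
    for token in tokens:
        state = 0.5 * state + token  # toy recurrence, one update per token
    logits = [math.cos(state + k) for k in range(VOCAB_SIZE)]
    return logits, state

def sample(logits, rng):
    # Step 6: weights exp(logit) define the categorical distribution
    weights = [math.exp(x) for x in logits]
    return rng.choices(range(len(logits)), weights=weights, k=1)[0]

def generate(seed_tokens, num_generation_steps, rng):
    tokens = list(seed_tokens)
    logits, state = toy_model(tokens)                  # steps 3 to 5: full seed, zero state
    tokens.append(sample(logits, rng))                 # step 6
    for _ in range(num_generation_steps - 1):          # step 7
        logits, state = toy_model(tokens[-1:], state)  # last token plus saved state
        tokens.append(sample(logits, rng))
    return tokens

generated = generate([1, 3, 2], num_generation_steps=10, rng=random.Random(0))
print(generated)
```

In this toy setting, feeding only the last token together with the saved state gives the same logits as re-running the whole sequence from a zero state, which is why step 7 does not need the full history.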

In [None]:
# Inspect the model's current recurrent state

inputs, outputs = next(iter(train_dl))
print(rnn_model(inputs)[1])

We will break the algorithm down into two steps. First, you should complete the following `sample_token` function that takes a sequence of tokens of any length and returns a token prediction for the last time step. The specification is as follows:

* The function takes the `model` instance, `token_sequence` Tensor, and optional `initial_state` Tensor for the GRU layer
* The `token_sequence` will be an integer Tensor with shape `(batch_size, seq_length)`
  * The `seq_length` will be greater than or equal to one
* If the function argument `initial_state` is `None`, then the function should reset the state of the recurrent layer to zeros
* Otherwise, if the function argument `initial_state` is a 2D Tensor or numpy array, it should be used as the initial state of the GRU layer
* Get the model's prediction (logits) for the last time step only
* Use the logits to form a categorical distribution and sample from it (*hint: you might find the* `keras.random.categorical` *function useful for this; see the documentation [here](https://keras.io/api/random/random_ops/#categorical-function)*)
* The function should then return the sample as a 2D integer Tensor of shape `(batch_size, 1)` as well as an updated GRU layer state of shape `(batch_size, gru_units)` in a tuple `(samples, updated_state)`
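
As an illustration of the "last time step only" and `(batch_size, 1)` requirements, the sketch below works on nested Python lists rather than Tensors. The helpers `softmax` and `sample_last_step` are hypothetical; in the notebook, a library function such as `keras.random.categorical` would normally take the logits directly.

```python
import math
import random

def softmax(logits):
    """Convert logits to probabilities (subtracting the max for numerical stability)."""
    m = max(logits)
    exps = [math.exp(x - m) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]

def sample_last_step(batch_logits, rng):
    """batch_logits: nested list of shape (batch_size, seq_length, vocab_size).
    Returns a nested list of shape (batch_size, 1)."""
    samples = []
    for sequence_logits in batch_logits:
        probs = softmax(sequence_logits[-1])  # last time step only
        token = rng.choices(range(len(probs)), weights=probs, k=1)[0]
        samples.append([token])  # keep a sequence axis of length one
    return samples

# Two sequences, two time steps, vocabulary of three tokens.
# The last-step logits are made extreme so the outcome is effectively certain.
batch_logits = [
    [[0.0, 0.0, 0.0], [100.0, 0.0, 0.0]],
    [[0.0, 0.0, 0.0], [0.0, 0.0, 100.0]],
]
samples = sample_last_step(batch_logits, random.Random(0))
print(samples)
```

Keeping the trailing axis of length one means the sample can be fed straight back in as a token sequence with `seq_length` equal to one.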

In [None]:
#### GRADED CELL ####

# Complete the following function.
# Make sure not to change the function name or arguments.

def sample_token(model, token_sequence, initial_state=None):
    """
    This function takes a model object, a token sequence and an optional initial
    state for the recurrent layer. The function should return a sampled token for
    the final time step, together with the updated state of the recurrent layer.
    """
    h = token_sequence
    updated_state = None
    for layer in model.layers:
        if isinstance(layer, keras.layers.InputLayer):
            continue  # skip the Input layer
        elif isinstance(layer, keras.layers.GRU):
            if initial_state is None:
                initial_state = layer.get_initial_state(h.shape[0])
            h, updated_state = layer(h, initial_state=initial_state)
        else:
            h = layer(h)
    final_step = h[:, -1, :]  # (batch_size, num_tokens)
    samples = keras.random.categorical(final_step, 1)  # (batch_size, 1)
    return samples, updated_state

In [None]:
# Test your function by passing in a dummy token sequence

sample_token(rnn_model, ops.convert_to_tensor([[30, 2, 24], [16, 12, 33]]))

Finally, you should complete the following function to generate text from the model, given a seed string.

* This function takes the `model` instance, `seed_string`, `text_vectorization`, `num_generation_steps` and `sample_token` function as arguments
* The function should first convert the `seed_string` to integer tokens using the `text_vectorization` object, and store them in a 2D integer Tensor with batch size equal to one
* The function should then run an internal loop for `num_generation_steps`:
  * In the first iteration through the loop, the integer token sequence should be passed to the `sample_token` function (passed in as an argument), to get the next sample token and updated GRU state
  * The `initial_state` can be set to `None` in the first iteration, in which case it is initialised to zeros
  * For the remaining iterations, the `sample_token` function should be called using the sampled token (with batch size and sequence length of one) and updated internal GRU state
* The `text_vectorization` object should then be used to convert the final sequence of integer tokens back to characters, which should be joined into a single string
  * The final string will have length given by `num_generation_steps` plus the length of the initial seed string
* Your function should then return this final string
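
The final decoding step can be sketched as a lookup into a vocabulary list. The vocabulary below is made up for illustration; it assumes the layout returned by `get_vocabulary()` in the default integer output mode, where index 0 is the empty padding string and index 1 is the `'[UNK]'` token.

```python
# Hypothetical vocabulary: index 0 is padding, index 1 is the unknown token
vocabulary = ["", "[UNK]", " ", "e", "o", "t", "b"]
index_to_char = dict(enumerate(vocabulary))

tokens = [5, 4, 2, 6, 3]
decoded = "".join(index_to_char[t] for t in tokens)
print(decoded)
```

Since the text is lower-cased during standardization, a seed such as `'ROMEO:'` will come back in lower case in the decoded string.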

In [None]:
#### GRADED CELL ####

# Complete the following function.
# Make sure not to change the function name or arguments.

def generate_text(model, seed_string, text_vectorization, num_generation_steps, sample_token=sample_token):
    """
    This function takes a model object, a seed string, a TextVectorization object and a 
    number of steps to generate characters as arguments. It should generate text 
    according to the above directions and return the extended string.
    """
    token_sequence = text_vectorization(seed_string)[None, ...]  # (1, seq_length)
    input_sequence = token_sequence
    initial_state = None
    for _ in range(num_generation_steps):
        sample, updated_state = sample_token(model, input_sequence, initial_state=initial_state)
        token_sequence = ops.concatenate((token_sequence, sample), axis=1)
        input_sequence = sample
        initial_state = updated_state
    
    inx_to_chars = {i: c for i, c in enumerate(text_vectorization.get_vocabulary())}
    final_token_sequence = ops.convert_to_numpy(ops.squeeze(token_sequence))
    final_char_sequence = [inx_to_chars[token] for token in final_token_sequence]
    return ''.join(final_char_sequence)

#### Generate text from the model

You are now ready to generate text from the model!

In [None]:
# Create a seed string and number of generation steps

init_string = 'ROMEO:'
num_generation_steps = 200

In [None]:
# Use your model and function above to generate text

print(generate_text(rnn_model, init_string, text_vectorization, num_generation_steps))

Congratulations on completing this week's assignment! You have now built and trained a character-level RNN language model on text data, and used it to generate new text examples.