<a href="https://colab.research.google.com/github/nyp-sit/sdaai-iti107/blob/main/session-6/text_generation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab" align="left"/></a>

# Generating Text using RNN

Sequence models such as RNNs are effective at learning a language model. In this programming exercise, we will make our RNN model read many of William Shakespeare's sonnets and then use it to write poems in the style of Shakespeare!

![shakespeare](nb_images/shakespeare.jpg)


**You will learn how to:**
- set up a 'vanilla' RNN model
- prepare the input data for learning a character-level language model
- generate and sample characters from the RNN output


## Setup

### Import TensorFlow and other libraries

In [None]:
import tensorflow as tf
import numpy as np
import os

## Prepare Data

For this example, we will use Shakespeare's Sonnets as our training corpus. Change the following line to run this code on your own text corpus.

In [None]:
path_to_file = tf.keras.utils.get_file('shakespeare_sonnets.txt','https://sdaai-bucket.s3-ap-southeast-1.amazonaws.com/datasets/shakespeare_sonnets.txt')
# Read the file, assuming the input text is UTF-8; ignore any non-UTF-8 bytes that happen to be in the text
text = open(path_to_file, 'rb').read().decode('utf-8', 'ignore')
# length of text is the number of characters in it
print ('Length of text: {} characters'.format(len(text)))

In [None]:
# Take a look at the first 200 characters in text
print(text[:200])

**Character-based language model**

For a language model, we can choose either words or characters as the vocabulary. In this exercise, we will be building a character-level language model. The benefits of character-based language models are their small vocabulary and their flexibility in handling any words, punctuation, and other document structure.

**Exercise:**

Create a vocabulary based on all unique characters in the text corpus. (Hint: use a sorted set). 

What is your vocabulary size? 

<br/>
<details><summary>Click here for answer</summary>

```
vocab = sorted(set(text))
```

There are a total of 65 unique characters.

</details>


In [None]:
# Get the unique characters in the file

### START YOUR CODE HERE (~1 line) ###


### END YOUR CODE ###

print ('{} unique characters'.format(len(vocab)))

### Vectorize the text

Because a neural network only deals with numbers, we need to map our text to a numerical representation. We will create two mapping tables: one mapping characters to integers, and another mapping integers back to characters. The reverse mapping (integers to characters) will be used to convert the model output (which is numeric) into recognizable text.

In [None]:
# Creating a mapping from unique characters to indices

# enumerate() in Python iterates over the list and returns the index (0, 1, 2, ...) in addition to the element itself.
# In the line below, we swap the places of i and u so that the dictionary key is u (the char) and the value is i (the index).
char2idx = {u:i for i, u in enumerate(vocab)}

# mapping of index to character
idx2char = np.array(vocab)

print(char2idx)

`char2idx` mapping table can now be used to convert the text to numbers (integers).

**Exercise:**

Convert `text` into np.array of integers. 

***Hint*** Use a list comprehension over `text` to map each character to its number using `char2idx`, then convert the list of numbers to a NumPy array by calling `np.array(list)`.

<br/>

<details><summary>Click here for answer</summary>  
    
```
text_as_int = np.array([char2idx[c] for c in text])
```
</details>


In [None]:
### START YOUR CODE HERE ###


### END YOUR CODE ###

In [None]:
# Show how the first 30 characters from the text are mapped to integers
print ('{} ---- characters mapped to int ---- > {}'.format(repr(text[:30]), text_as_int[:30]))

### Formulate the learning task

This is what we want to achieve for our model: given a character, or a sequence of characters, what is the most probable next character? 

So how should we create our training samples? The input to the model will be a sequence of characters, and the target sequence will be the same sequence of characters, but offset by one timestep. For example, given corpus text such as
```
From fairest creatures we desire increase, 
That thereby beauty's rose might never die,
```

If we fix the number of time steps at 10 (i.e. an input sequence length of 10), we can choose the training input sequence to be 'From faire' and the target sequence to be 'rom faires'. In other words, we train the model to predict the next character correctly by minimizing the cross-entropy loss between the expected and the predicted output character across all time steps.
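
The offset-by-one pairing can be checked on the example line itself. This is a minimal sketch using plain strings rather than integer indices; the names `corpus` and `timesteps` are illustrative only.

```python
# First line of the example text above
corpus = "From fairest creatures we desire increase,"
timesteps = 10

# Input: the first `timesteps` characters; target: the same window shifted by one
x = corpus[:timesteps]
y = corpus[1:timesteps + 1]

# At every time step the target is simply the next character of the corpus
for t, (inp, tgt) in enumerate(zip(x, y)):
    print(f"t={t}: input {inp!r} -> target {tgt!r}")
```

Each target character is also the input character of the following time step, which is why the two sequences overlap in all but one position.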

We can visualize the training steps as follows:


<img src="nb_images/training_samples.png" style="width:400;height:300px;">



### Create training examples and targets

We will divide the text into sample sequences. Each sample sequence will contain `seq_length` characters from the text.

For each input sequence, the corresponding target sequence contains the same length of text, but shifted one character to the right.

**Exercise:**

One way to create sample sequences is to divide the corpus into sequences of `seq_length+1` characters. The 1st sequence takes `seq_length+1` characters starting from `0`, the 2nd sequence takes `seq_length+1` characters starting from `seq_length+1`, the 3rd sequence takes `seq_length+1` characters starting from `2*(seq_length+1)`, and so on. For each sequence, `[0:seq_length]` becomes the input sequence, and `[1:seq_length+1]`, i.e. offset by 1, becomes the target sequence.
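
To make the chunk boundaries concrete, here is a small sketch of the index arithmetic only (it does not slice any text). The corpus length of 23 and the name `corpus_len` are made-up values for illustration.

```python
# Hypothetical tiny corpus: 23 characters, sequences of 4 time steps
corpus_len = 23
seq_length = 4
chunk = seq_length + 1  # each chunk holds the input plus one extra character

# Number of complete chunks that fit in the corpus
num_samples = corpus_len // chunk

# (start_idx, end_idx) of every chunk, stepping by seq_length + 1
spans = [(i * chunk, i * chunk + chunk) for i in range(num_samples)]

# Characters at the end that do not fill a complete chunk are dropped
leftover = corpus_len - num_samples * chunk

print(num_samples, spans, leftover)
```

With these numbers, four chunks are produced and the last three characters of the corpus are unused.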


Complete the code below.

<br/>

<details><summary>Click here for answer</summary>  
    
```
 x = text[start_idx:end_idx][:seq_length]
 y = text[start_idx:end_idx][1:seq_length+1]
```
</details>

In [None]:
def create_training_samples(text, seq_length):
    """
    Create samples of x (input) and y (target) character sequences of length seq_length from the given corpus text

    Arguments:
    text -- the corpus text (as integers)
    seq_length -- the length of the character sequence (i.e. the number of time steps)

    Returns:
    X -- a list of input character (as integers) sequences 
    Y -- a list of target character (as integers) sequences 
    """
    
    X = []
    Y = []
    
    # find out how many samples of (seq_length + 1) can be created from text
    num_samples = len(text) // (seq_length+1)
    
    for i in range(num_samples): 
        # offset by seq_length+1 each time
        start_idx = i*(seq_length+1)
        end_idx = start_idx + (seq_length+1)
        
        ### BEGIN YOUR CODE HERE 

        

        ### END YOUR CODE HERE  
        
        X.append(x)
        Y.append(y)
     
    return X, Y

In [None]:
def decode(text_in_int):
    decoded = ''.join([idx2char[c] for c in text_in_int])
    return decoded

Create the training samples and print the input and target values of the first example:

In [None]:
seq_length = 100
num_samples = len(text) // (seq_length+1)
print(num_samples)

In [None]:
X, Y = create_training_samples(text_as_int, seq_length=seq_length)
print(repr(decode(X[0])))
print(repr(decode(Y[0])))

Each index of these vectors is processed as one time step. For the input at time step 0, the model receives the index of the first character and tries to predict the index of the second character as the next character. At the next timestep, it does the same thing, but the `RNN` considers the previous step's context in addition to the current input character.

### Create training batches

We need to shuffle the training sequences before training to reduce variance in our model. We could of course use scikit-learn to do the shuffling, but here we want to introduce the 'TensorFlow way' of building data pipelines and transformations with `tf.data.Dataset`. We first create a `tf.data.Dataset` from the X and Y sequences, then build a pipeline that shuffles and batches the data by calling `shuffle()` and `batch()` on it.

For the shuffling operation, we need to specify a buffer size: the size of the buffer that is filled with data samples, from which elements are then randomly drawn. For perfect shuffling, a buffer size greater than or equal to the full size of the dataset is required.
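
The effect of the buffer size can be illustrated with a simplified pure-Python simulation of buffer-based shuffling. This is only a sketch of the idea described above, not TensorFlow's actual implementation; `buffered_shuffle` is a hypothetical helper and assumes `buffer_size >= 1`.

```python
import random

def buffered_shuffle(items, buffer_size, seed=0):
    """Simulate shuffling through a fixed-size buffer (illustration only)."""
    rng = random.Random(seed)
    buffer = []
    out = []
    for item in items:
        # Fill the buffer first
        if len(buffer) < buffer_size:
            buffer.append(item)
            continue
        # Emit a random element from the buffer and replace it with the new item
        idx = rng.randrange(buffer_size)
        out.append(buffer[idx])
        buffer[idx] = item
    # Input exhausted: emit what is left in the buffer in random order
    rng.shuffle(buffer)
    out.extend(buffer)
    return out

print(buffered_shuffle(range(10), buffer_size=1))   # no shuffling at all
print(buffered_shuffle(range(10), buffer_size=4))   # only local shuffling
print(buffered_shuffle(range(10), buffer_size=10))  # any order is possible
```

In this simulation an element can only be emitted after it has entered the buffer, so a small buffer keeps elements close to their original position.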


In [None]:
# define a batch_size
BATCH_SIZE = 64
# buffer size for shuffling
BUFFER_SIZE = 10000

In [None]:
dataset = tf.data.Dataset.from_tensor_slices((X, Y))
dataset_shuffled = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder=True)


Let's test our data pipeline to see whether the sequences are shuffled properly and the batching works. Each element taken from the dataset with `take()` should be one batch of `BATCH_SIZE` shuffled x and y sequences.

In [None]:
for x, y in dataset_shuffled.take(1):
    print('shape of x={} and y={}'.format(x.shape, y.shape))
    print('1st x sample of the batch = {}'.format(repr(decode(x[0]))))
    print('1st y sample of the batch = {}'.format(repr(decode(y[0]))))


## Build The Model

Use `tf.keras.Sequential` to define the model. For this simple example three layers are used to define our model:

* `tf.keras.layers.Embedding`: The input layer. A trainable lookup table that will map the numbers of each character to a vector with `embedding_dim` dimensions;
* `tf.keras.layers.SimpleRNN`: A plain vanilla RNN with size `units=rnn_units` (you can replace it with a better model such as LSTM or GRU, which will be covered in the next lesson). We also need to set `return_sequences=True` and `stateful=True`.
* `tf.keras.layers.Dense`: The output layer, with `vocab_size` outputs.

***Note***: 

We need to set `return_sequences` to **True** to return the hidden state output for each input time step. We need the output at each time step so that it can be compared with the expected character at that time step.

We are setting `stateful` to **True** so that the last state for each sample at index `i` in a batch will be used as initial state for the sample of index `i` in the following batch. 

In [None]:
# Length of the vocabulary in chars
vocab_size = len(vocab)

# The embedding dimension
embedding_dim = 256

# Number of RNN units
rnn_units = 1024

**Exercise:**

Complete the code below to build the model specified above. 

<br/>

<details><summary>Click here for answer</summary>  

```
model = tf.keras.Sequential([
    tf.keras.layers.Embedding(vocab_size, embedding_dim, 
                         batch_input_shape=[batch_size, None]),
    tf.keras.layers.SimpleRNN(rnn_units,
                    return_sequences=True,
                    stateful=True,
                    recurrent_initializer='glorot_uniform'),
    tf.keras.layers.Dense(vocab_size)
])
```

</details>

In [None]:
def build_model(vocab_size, embedding_dim, rnn_units, batch_size):
    
    ### START YOUR CODE HERE ###

    
    

    ### END YOUR CODE HERE ###
    
    return model

In [None]:
model = build_model(
    vocab_size = len(vocab),
    embedding_dim=embedding_dim,
    rnn_units=rnn_units,
    batch_size=BATCH_SIZE)

For each character, the model looks up the embedding, runs the SimpleRNN for one timestep with the embedding as input, and applies the dense layer to generate logits predicting the log-likelihood of the next character:


<img src="nb_images/text_generation_training.png" style="width:600;height:450px;"/>
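
In equation form, one time step of this computation can be sketched as follows. This assumes the Keras defaults (a `tanh` activation in `SimpleRNN`, row-vector inputs). $W_x$, $W_h$ and $b$ are the RNN weights; the names $E$, $W_y$ and $b_y$ for the embedding table and the dense layer parameters are notation introduced here for illustration only.

$$
\begin{aligned}
e_t &= E[c_t] \\
h_t &= \tanh(e_t W_x + h_{t-1} W_h + b) \\
o_t &= h_t W_y + b_y
\end{aligned}
$$

Here $c_t$ is the integer index of the input character at time step $t$, $e_t$ its embedding vector, $h_t$ the hidden state, and $o_t$ the vector of `vocab_size` logits.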


## Try the model

Now let us run the model to see that it behaves as expected.

First check the shape of the output:

In [None]:
for input_batch, target_batch in dataset_shuffled.take(1):
    batch_predictions = model(input_batch)
    print(batch_predictions.shape, "# (batch_size, sequence_length, vocab_size)")

In the above example the sequence length of the input is `100` but the model can be run on inputs of any sequence length:

In [None]:
model.summary()

**Exercise:**

You can develop a better understanding of the RNN model by looking at the shapes of the different weights in the RNN layer. The RNN layer has 3 weights, `Wx`, `Wh` and `bias`, which are returned when you call `get_weights()`. See if you can guess the shapes of the weights correctly. Also compare your calculations against the `Param #` shown in the `model.summary()` above for the RNN layer.

In [None]:
Wx, Wh, b = model.layers[1].get_weights()

# Print the shapes to check your answer

print('Wx={}'.format(Wx.shape))
print('Wh={}'.format(Wh.shape))
print('b={}'.format(b.shape))
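
Once you have made your own guess, the arithmetic behind the parameter count can be written out as below. This is a sketch that assumes the usual Keras layout for `SimpleRNN` weights (input kernel, recurrent kernel, bias) and the `embedding_dim` and `rnn_units` values set earlier in this notebook.

```python
# Values set earlier in this notebook
embedding_dim = 256   # size of each input vector fed to the RNN layer
rnn_units = 1024      # size of the hidden state

# Wx maps the input to the hidden state, Wh maps the previous hidden state
# to the new one, and b is added once per hidden unit
wx_shape = (embedding_dim, rnn_units)
wh_shape = (rnn_units, rnn_units)
b_shape = (rnn_units,)

num_params = wx_shape[0] * wx_shape[1] + wh_shape[0] * wh_shape[1] + b_shape[0]
print(wx_shape, wh_shape, b_shape, num_params)
```

The total should match the parameter count reported for the RNN layer in `model.summary()`.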

To get actual predictions from the model we need to sample from the output distribution, to get actual character indices. This distribution is defined by the logits over the character vocabulary.

Note: It is important to _sample_ from this distribution instead of always taking the _argmax_ of the distribution, because always picking the most likely character can easily get the model stuck in a loop.
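
The difference between the two strategies can be seen on a single made-up logits vector. This is a minimal pure-Python sketch; the `softmax` helper and the logit values are illustrative, and `random.choices` stands in for `tf.random.categorical`.

```python
import math
import random

def softmax(logits):
    """Convert logits to probabilities (numerically stable)."""
    m = max(logits)
    exps = [math.exp(v - m) for v in logits]
    total = sum(exps)
    return [e / total for e in exps]

# Made-up logits over a 3-character vocabulary
logits = [2.0, 1.0, 0.5]
probs = softmax(logits)

# Greedy decoding: argmax returns the same index every time
greedy = [max(range(len(probs)), key=probs.__getitem__) for _ in range(20)]

# Sampling: indices are drawn in proportion to their probability
rng = random.Random(0)
sampled = rng.choices(range(len(probs)), weights=probs, k=20)

print("greedy :", greedy)
print("sampled:", sampled)
```

With the same context, greedy decoding can never produce anything but the top character, whereas sampling occasionally picks the less likely ones.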

Try sampling for the first example in the batch:

In [None]:
sampled_indices = tf.random.categorical(batch_predictions[0], num_samples=1)
sampled_indices = tf.squeeze(sampled_indices,axis=-1).numpy()

This gives us, at each timestep, a prediction of the next character index:

In [None]:
sampled_indices

Decode these to see the text predicted by this untrained model:

In [None]:
print("Input: \n", repr(decode(input_batch[0])))
print("Next Char Predictions: \n", repr(decode(sampled_indices)))

As we can see, an untrained model does not generate any interesting-looking text, only gibberish characters. We will see whether our model can do better after being trained on the Shakespeare text.

## Train the model

### Attach an optimizer, and a loss function

We use `tf.keras.losses.sparse_categorical_crossentropy` as our loss function because our labels are integer class indices rather than one-hot encoded vectors.

Because our model returns logits, we need to set the `from_logits` flag to True.
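
For a single time step, the loss computed from logits is the negative log of the softmax probability assigned to the correct character. The sketch below works this out in pure Python; `sparse_ce_from_logits` is a hypothetical helper written for illustration, not a TensorFlow function, and the logit values are made up.

```python
import math

def sparse_ce_from_logits(label, logits):
    """Cross entropy for one integer label, computed directly from logits."""
    m = max(logits)
    log_sum_exp = m + math.log(sum(math.exp(v - m) for v in logits))
    # -log(softmax(logits)[label]) == logsumexp(logits) - logits[label]
    return log_sum_exp - logits[label]

vocab_size = 65  # vocabulary size used as an example; any size works

# A model with no preference gives the same logit to every character
uniform_logits = [0.0] * vocab_size
loss_uniform = sparse_ce_from_logits(3, uniform_logits)

# A model that strongly favours character 3
confident_logits = [0.0] * vocab_size
confident_logits[3] = 10.0
loss_correct = sparse_ce_from_logits(3, confident_logits)  # label matches
loss_wrong = sparse_ce_from_logits(4, confident_logits)    # label differs

print(loss_uniform, math.log(vocab_size), loss_correct, loss_wrong)
```

A uniform prediction gives a loss of `log(vocab_size)`, which is a useful reference point: an untrained model should start close to that value.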


In [None]:
def loss(labels, logits):
    # Print the shapes for inspection: labels (batch, seq_len), logits (batch, seq_len, vocab_size)
    print(labels.shape)
    print(logits.shape)
    return tf.keras.losses.sparse_categorical_crossentropy(labels, logits, from_logits=True)

# Use the targets of the same batch that produced batch_predictions
batch_loss  = loss(target_batch, batch_predictions)
print("Prediction shape: ", batch_predictions.shape, " # (batch_size, sequence_length, vocab_size)")
print("scalar_loss:      ", batch_loss.numpy().mean())

Configure the training procedure using the `tf.keras.Model.compile` method. We'll use `tf.keras.optimizers.Adam` with default arguments and the loss function as defined above.

In [None]:
model.compile(optimizer='adam', loss=loss)

### Configure checkpoints

Use a `tf.keras.callbacks.ModelCheckpoint` to ensure that checkpoints are saved during training:

In [None]:
# Directory where the checkpoints will be saved
checkpoint_dir = './training_checkpoints'
# Name of the checkpoint files
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

checkpoint_callback=tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_prefix,
    save_weights_only=True)

### Execute the training

To keep training time reasonable, use 60 epochs to train the model. In Colab, set the runtime to GPU for faster training.

In [None]:
EPOCHS=60

In [None]:
history = model.fit(dataset_shuffled, epochs=EPOCHS, callbacks=[checkpoint_callback])

## Generate text

### Restore the latest checkpoint

To keep this prediction step simple, let us just use a single sample at a time (i.e. batch size of 1).

Because of the way the RNN state is passed from timestep to timestep, the model only accepts a fixed batch size once built.

To run the model with a different `batch_size`, we need to rebuild the model and restore the weights from the checkpoint.


In [None]:
tf.train.latest_checkpoint(checkpoint_dir)

In [None]:
model = build_model(vocab_size, embedding_dim, rnn_units, batch_size=1)

model.load_weights(tf.train.latest_checkpoint(checkpoint_dir))


In [None]:
model.summary()

### The prediction loop

The following code block generates the text:

* It starts by choosing a start string, initializing the RNN state and setting the number of characters to generate.

* It gets the prediction distribution of the next character using the start string and the RNN state.

* It then samples from a categorical distribution to pick the index of the predicted character, and uses this predicted character as the next input to the model.

* The RNN state produced at each step is carried over to the next step, so the model has more context than a single character. After each predicted character, the updated RNN state is carried over again, so every prediction is conditioned on the characters generated so far.


![To generate text the model's output is fed back to the input](nb_images/text_generation_sampling.png)
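
The feedback loop itself (sample a character, then feed it back in as the next input) can be sketched without any neural network. In the toy example below a table of bigram counts stands in for the trained RNN: unlike the RNN, it has no hidden state and only looks at the last character. The corpus string and the `generate` helper are illustrative assumptions, not part of this notebook's model.

```python
import random
from collections import Counter, defaultdict

# Tiny stand-in corpus, used only for illustration
corpus = "shall i compare thee to a summer's day? thou art more lovely and more temperate."

# Count which character follows which (a stand-in for the trained model)
counts = defaultdict(Counter)
for a, b in zip(corpus, corpus[1:]):
    counts[a][b] += 1

def generate(start, num_generate, seed=0):
    rng = random.Random(seed)
    out = list(start)
    current = start[-1]
    for _ in range(num_generate):
        followers = counts[current]
        if not followers:
            break  # this character was never followed by anything
        chars = list(followers)
        weights = [followers[c] for c in chars]
        # Sample the next character, then feed it back in as the next input
        current = rng.choices(chars, weights=weights, k=1)[0]
        out.append(current)
    return "".join(out)

print(generate("th", 50))
```

The RNN version follows the same loop, with the model's logits replacing the bigram counts and the hidden state providing the longer context.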

Looking at the generated text, you'll see that the model knows when to capitalize and make paragraphs, and that it imitates a Shakespeare-like writing vocabulary. Because our training set is pretty small, it has not yet learned to form coherent sentences.

In [None]:
import sys

def generate_text(model, start_string):
    # Evaluation step (generating text using the learned model)
    # Number of characters to generate
    num_generate = 1000

    # Converting our start string to numbers (vectorizing)
    input_eval = [char2idx[s] for s in start_string]
    # add in the batch dimension at axis=0
    input_eval = tf.expand_dims(input_eval, 0)

    # Low temperatures result in more predictable text.
    # Higher temperatures result in more surprising text.
    # Experiment to find the best setting.
    
    temperature = 0.8
    model.reset_states()
    sys.stdout.write(start_string)
    for i in range(num_generate):
        predictions = model(input_eval)
        # remove the batch dimension
        predictions = tf.squeeze(predictions,0)   # predictions shape = (input_seq_len, vocab_size)
        
        predictions = predictions / temperature
        
        # sample char(s) from output distribution  
        sampled = tf.random.categorical(predictions, num_samples=1)  # sampled shape = (input_seq_len, 1)
    
        # take the last char of the sampled sequence [-1]. 0 to access the first element of second axis
        predicted_id = sampled[-1,0].numpy()
        
        # We pass the sampled char as the next input to the model
        # along with the hidden state from previous step
        
        # need to add the batch axis back before feeding to model
        input_eval = tf.expand_dims([predicted_id], 0)
        next_char = idx2char[predicted_id]
        
        sys.stdout.write(next_char)
        sys.stdout.flush()
        

Now let's ask our model to write a poem. You can give it a starting word or phrase, e.g. Love is or Thou art.

In [None]:
usr_input = input("Write the beginning of your poem, the Shakespeare machine will complete it. Your input is: ")
print('-'*40)
# generate_text writes the poem to stdout itself and returns None, so call it directly
generate_text(model, start_string=usr_input)
print()
print('-'*40)

### Things to try

1. Train your model longer for more epochs and see if it generates better text.

2. Experiment with a different start string 

3. Try adding another RNN layer to improve the model's accuracy

4. Adjust the temperature parameter to generate more or less random predictions.

5. Try replacing SimpleRNN with GRU or LSTM (to be covered in the next lesson).
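
For item 4, it helps to see what dividing the logits by the temperature does to the sampling distribution. The sketch below is pure Python with made-up logits; `softmax_with_temperature` is an illustrative helper, not a function from this notebook.

```python
import math

def softmax_with_temperature(logits, temperature):
    """Divide the logits by the temperature, then apply a stable softmax."""
    scaled = [v / temperature for v in logits]
    m = max(scaled)
    exps = [math.exp(v - m) for v in scaled]
    total = sum(exps)
    return [e / total for e in exps]

# Made-up logits over a 3-character vocabulary
logits = [2.0, 1.0, 0.5]

for temperature in (0.2, 0.8, 1.0, 2.0):
    probs = softmax_with_temperature(logits, temperature)
    print(temperature, [round(p, 3) for p in probs])
```

Lower temperatures concentrate the probability on the most likely character, while higher temperatures flatten the distribution and make unlikely characters more probable.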