# Text Generation with RNN

In this notebook, we will implement an RNN in TensorFlow that generates text at the character level.

**Note:** This notebook was created as part of the Encoder-Decoder Architecture course on the Google Cloud Skills Boost platform.

## Setup

Here, we import the libraries and read the dataset.

### Libraries

In [1]:
import os
import time

import numpy as np
import tensorflow as tf

### Dataset

In [2]:
path_to_file = tf.keras.utils.get_file(
    "shakespeare.txt",
    "https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt"
)

Downloading data from https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt


In [3]:
with open(path_to_file, "rb") as f:
    text = f.read().decode(encoding="utf-8")
print(f"Length of text: {len(text)} characters.")

Length of text: 1115394 characters.


In [4]:
print(text[:100])

First Citizen:
Before we proceed any further, hear me speak.

All:
Speak, speak.

First Citizen:
You


In [5]:
vocab = sorted(set(text))
print(f"{len(vocab)} unique characters.")

65 unique characters.


## Preprocessing

In this section, we convert the text into a format that can be used to train the RNN.

### Forward Mapping
Mapping characters to ids.

In [6]:
ids_from_chars = tf.keras.layers.StringLookup(
    vocabulary=list(vocab), mask_token=None
)

### Reverse Mapping
Mapping ids to characters.

In [7]:
chars_from_ids = tf.keras.layers.StringLookup(
    vocabulary=ids_from_chars.get_vocabulary(), invert=True, mask_token=None
)

**Note:** We pass `ids_from_chars.get_vocabulary()` to the inverse mapping, rather than the original vocabulary `vocab`, so that the `[UNK]` token is included as well.
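
To see why the inverse mapping needs the full vocabulary, here is a minimal pure-Python sketch of the same idea. It is an illustration only, not how `StringLookup` is implemented: the names `toy_vocab`, `full_vocab`, `encode`, and `decode` are hypothetical, and it assumes that index 0 is reserved for `[UNK]` with the vocabulary entries starting at index 1, which is consistent with the 66 output classes (65 characters plus one) seen later in this notebook.

```python
# Toy vocabulary; the notebook's real `vocab` has 65 characters.
toy_vocab = sorted(set("hello world"))

# Assumption: index 0 is reserved for out-of-vocabulary characters.
full_vocab = ["[UNK]"] + toy_vocab

char_to_id = {ch: i for i, ch in enumerate(full_vocab)}
id_to_char = dict(enumerate(full_vocab))


def encode(chars):
    """Map characters to ids, sending unknown characters to id 0."""
    return [char_to_id.get(ch, 0) for ch in chars]


def decode(ids):
    """Map ids back to characters, including id 0 -> "[UNK]"."""
    return [id_to_char[i] for i in ids]


ids = encode("hold!")  # "!" is not in the toy vocabulary
print(ids)             # [4, 6, 5, 2, 0]
print(decode(ids))     # ['h', 'o', 'l', 'd', '[UNK]']
```

If the reverse table were built from `toy_vocab` alone, id 0 would have no entry and every other id would be off by one.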

Next, we write a utility function that maps ids back to characters and joins them into a single string.

In [8]:
def text_from_ids(ids: tf.Tensor) -> tf.Tensor:
    """
        Function to map ids back to characters and join them into strings.

        Arguments:
            ids (tf.Tensor): Tensor of ids; the last axis is the sequence axis.

        Returns (tf.Tensor): String tensor with the characters joined along
            the last axis.
    """
    return tf.strings.reduce_join(chars_from_ids(ids), axis=-1)

Create training examples and targets.

In [9]:
all_ids = ids_from_chars(tf.strings.unicode_split(text, 'UTF-8'))
ids_dataset = tf.data.Dataset.from_tensor_slices(all_ids)

In [10]:
seq_length = 100
examples_per_epoch = len(text) // (seq_length + 1)

In [11]:
sequences = ids_dataset.batch(seq_length + 1, drop_remainder=True)

for seq in sequences.take(1):
    print(text_from_ids(seq))

tf.Tensor(b'First Citizen:\nBefore we proceed any further, hear me speak.\n\nAll:\nSpeak, speak.\n\nFirst Citizen:\nYou ', shape=(), dtype=string)


## Inference
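Task: Given a character or a sequence of characters, what is the most probable next character? To train for this task, each sequence is split into an input and a target, where the target is the same sequence shifted by one character, so that at every position the target is the character that follows the input.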

In [12]:
def split_input_target(sequence: list) -> tuple:
    """
        Function that returns input and target (input shifted by 1) from a
        sequence.

        Arguments:
            sequence (list): Input sequence

        Returns (tuple): (list, list) of input and target
    """
    input_text = sequence[:-1]
    target_text = sequence[1:]

    return input_text, target_text
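
As a small illustration of the shift, the sketch below applies the same slicing to a plain Python list of characters. The function is repeated here without type hints so that the example is self-contained, and the sample word is arbitrary.

```python
def split_input_target(sequence):
    """Return (input, target), where target is the input shifted by one."""
    return sequence[:-1], sequence[1:]


word = "Tensorflow"
input_text, target_text = split_input_target(list(word))

print(input_text)   # ['T', 'e', 'n', 's', 'o', 'r', 'f', 'l', 'o']
print(target_text)  # ['e', 'n', 's', 'o', 'r', 'f', 'l', 'o', 'w']

# At every position, the target is the character that follows the input.
for i, (inp, tgt) in enumerate(zip(input_text, target_text)):
    assert word[i] == inp and word[i + 1] == tgt
```

This is also why each sequence above is batched with `seq_length + 1` ids: one extra character is needed so that both input and target have `seq_length` entries.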

In [13]:
dataset = sequences.map(split_input_target)

In [14]:
BATCH_SIZE = 64

BUFFER_SIZE = 10000

dataset = (
    dataset
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE, drop_remainder=True)
    .prefetch(tf.data.AUTOTUNE)
)
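
As a quick check on the sizes involved, the arithmetic below works out how many sequences and batches this pipeline produces. It is a sketch that uses only the text length printed earlier together with `seq_length` and `BATCH_SIZE` from this notebook; the lowercase variable names are illustrative.

```python
text_length = 1_115_394  # length of the text printed by the notebook
seq_length = 100
batch_size = 64

# Each chunk holds seq_length inputs plus one extra character for the target.
chunk = seq_length + 1

num_sequences, leftover_chars = divmod(text_length, chunk)
steps_per_epoch, leftover_sequences = divmod(num_sequences, batch_size)

print(num_sequences, leftover_chars)        # 11043 51
print(steps_per_epoch, leftover_sequences)  # 172 35
```

Because both `batch` calls use `drop_remainder=True`, the 51 trailing characters and the 35 sequences that do not fill a batch are discarded.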

## Build the Model

In [15]:
vocab_size = len(ids_from_chars.get_vocabulary())  # 66: 65 characters plus [UNK]
embedding_dim = 256
rnn_units = 1024

In [16]:
class MyModel(tf.keras.Model):
    def __init__(self, vocab_size, embedding_dim, rnn_units):
        super().__init__()

        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(
            rnn_units, return_sequences=True, return_state=True
        )
        self.dense = tf.keras.layers.Dense(vocab_size)

    def call(self, inputs, states=None, return_state=False, training=False):
        x = self.embedding(inputs, training=training)

        if states is None:
            states = self.gru.get_initial_state(x)

        x, states = self.gru(x, initial_state=states, training=training)
        x = self.dense(x, training=training)

        if return_state:
            return x, states
        else:
            return x

In [17]:
model = MyModel(
    vocab_size=len(ids_from_chars.get_vocabulary()),
    embedding_dim=embedding_dim,
    rnn_units=rnn_units
)

In [18]:
for input_example_batch, target_example_batch in dataset.take(1):
    example_batch_predictions = model(input_example_batch)
    print(
        example_batch_predictions.shape,
        "# (batch_size, sequence_length, vocab_size)"
    )

(64, 100, 66) # (batch_size, sequence_length, vocab_size)


In [19]:
model.summary()

Model: "my_model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 embedding (Embedding)       multiple                  16896     
                                                                 
 gru (GRU)                   multiple                  3938304   
                                                                 
 dense (Dense)               multiple                  67650     
                                                                 
Total params: 4022850 (15.35 MB)
Trainable params: 4022850 (15.35 MB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________
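
The parameter counts in the summary can be reproduced by hand. The sketch below assumes the default Keras GRU configuration (`reset_after=True`, which gives two bias vectors per weight set) and a bias term in the dense layer; the variable names are illustrative.

```python
vocab_size = 66  # 65 characters plus [UNK]
embedding_dim = 256
rnn_units = 1024

# One embedding vector per vocabulary entry.
embedding_params = vocab_size * embedding_dim

# Three weight sets (update gate, reset gate, candidate state), each with an
# input kernel, a recurrent kernel, and two bias vectors (reset_after=True).
gru_params = 3 * (
    embedding_dim * rnn_units + rnn_units * rnn_units + 2 * rnn_units
)

# Dense layer: weights plus bias.
dense_params = rnn_units * vocab_size + vocab_size

total = embedding_params + gru_params + dense_params
print(embedding_params, gru_params, dense_params, total)
# 16896 3938304 67650 4022850
```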


In [20]:
loss = tf.losses.SparseCategoricalCrossentropy(from_logits=True)

In [21]:
example_batch_mean_loss = loss(target_example_batch, example_batch_predictions)

print(
    "Prediction shape: ",
    example_batch_predictions.shape,
    " # (batch_size, sequence_length, vocab_size)",
)

print("Mean loss: ", example_batch_mean_loss)

Prediction shape:  (64, 100, 66)  # (batch_size, sequence_length, vocab_size)
Mean loss:  tf.Tensor(4.1898823, shape=(), dtype=float32)


In [22]:
tf.exp(example_batch_mean_loss).numpy()

66.01502
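
The exponential of the mean loss is close to the vocabulary size (66), which is what one would expect from an untrained model whose predictions are roughly uniform. A minimal sketch of that reasoning in plain Python, assuming exactly uniform probabilities (the real initial predictions are only approximately uniform):

```python
import math

vocab_size = 66

# Cross-entropy of a uniform prediction: -log(1 / vocab_size) = log(vocab_size).
uniform_loss = -math.log(1.0 / vocab_size)

print(uniform_loss)            # about 4.19, close to the measured 4.1898823
print(math.exp(uniform_loss))  # recovers the vocabulary size, about 66
```

A much higher initial loss would suggest a badly initialized model that is confidently wrong.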

In [23]:
model.compile(optimizer="adam", loss=loss)

In [24]:
checkpoint_dir = "./training_checkpoints"
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_prefix, save_weights_only=True
)

In [25]:
EPOCHS = 10

In [26]:
history = model.fit(dataset, epochs=EPOCHS, callbacks=[checkpoint_callback])

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


## Generate Text

In [31]:
class OneStep(tf.keras.Model):
    def __init__(self, model, chars_from_ids, ids_from_chars, temperature=1.0):
        super().__init__()
        self.temperature = temperature
        self.model = model
        self.chars_from_ids = chars_from_ids
        self.ids_from_chars = ids_from_chars

        # Mask that prevents "[UNK]" from being generated
        skip_ids = self.ids_from_chars(["[UNK]"])[:, None]
        sparse_mask = tf.SparseTensor(
            # Put -inf at each skipped index
            values=[-float("inf")] * len(skip_ids),
            indices=skip_ids,
            # Match the shape to the vocabulary
            dense_shape=[len(ids_from_chars.get_vocabulary())]
        )
        self.prediction_mask = tf.sparse.to_dense(sparse_mask)

    @tf.function
    def generate_one_step(self, inputs, states=None):
        input_chars = tf.strings.unicode_split(inputs, "UTF-8")
        input_ids = self.ids_from_chars(input_chars).to_tensor()

        predicted_logits, states = self.model(
            inputs=input_ids, states=states, return_state=True
        )

        # Only use last prediction
        predicted_logits = predicted_logits[:, -1, :]
        predicted_logits = predicted_logits / self.temperature
        predicted_logits = predicted_logits + self.prediction_mask # Apply UNK mask

        # Sample output logits to generate token IDs
        predicted_ids = tf.random.categorical(predicted_logits, num_samples=1)
        predicted_ids = tf.squeeze(predicted_ids, axis=-1)

        # Convert from token ids to characters
        predicted_chars = self.chars_from_ids(predicted_ids)

        # Return characters and model state
        return predicted_chars, states
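
The sampling step in `generate_one_step` can be illustrated without TensorFlow. The following is a simplified sketch, not the notebook's implementation: `masked_probs` and `sample_next_id` are hypothetical helpers that work on a single list of logits, apply the temperature and an additive mask, convert the result to probabilities with a softmax, and draw one id with Python's `random` module.

```python
import math
import random


def masked_probs(logits, mask, temperature=1.0):
    """Softmax over logits / temperature + mask (mask uses -inf to ban ids)."""
    scaled = [logit / temperature + m for logit, m in zip(logits, mask)]
    peak = max(scaled)
    exps = [math.exp(s - peak) for s in scaled]  # exp(-inf) is 0.0
    total = sum(exps)
    return [e / total for e in exps]


def sample_next_id(logits, mask, temperature=1.0, rng=random):
    """Draw one id from the masked, temperature-scaled distribution."""
    probs = masked_probs(logits, mask, temperature)
    return rng.choices(range(len(probs)), weights=probs, k=1)[0]


logits = [2.0, 1.0, 0.5, 0.1]
mask = [-math.inf, 0.0, 0.0, 0.0]  # id 0 plays the role of [UNK]

print(masked_probs(logits, mask, temperature=1.0))
print(masked_probs(logits, mask, temperature=0.5))  # sharper distribution
print(sample_next_id(logits, mask))                 # never 0
```

A temperature below 1 concentrates probability on the most likely ids, while a temperature above 1 flattens the distribution and makes the output more varied. The masked id keeps probability zero at any temperature.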

In [32]:
one_step_model = OneStep(model, chars_from_ids, ids_from_chars)

In [34]:
start = time.time()
states = None
next_char = tf.constant(["ROMEO:"])
result = [next_char]

for n in range(1000):
    next_char, states = one_step_model.generate_one_step(
        next_char, states=states
    )
    result.append(next_char)

result = tf.strings.join(result)
end = time.time()

In [35]:
print(result[0].numpy().decode("utf-8"), "\n\n" + "_" * 80)
print("\nRun time:", end - start)

ROMEO:
I for, like pains; heaven want thee stay,
His robable traitors unwasted o'er the court
When he's a charm a holy a should
With witden mean rubject
Of fitles wish, that swear I mean,
think you dit? wratch, may it is, I'll go not helice,
Let me encures and yet encounter'd. I hear you
Your loss
And begrar here from the fear,
My life in the pricker doub, I know you then first fence I fled.
He has the Henry'cle of the sun atternhy
The emperion in our bebals;
And underneat' to see where they come both proclaim'st
This friend upon the petter,
And therefore I'll be full of you of it;
indeed, I'ld auntay; sus his hand me one of Gloucestory!
'What weak a heapt of my desiish father in
his neelf, so than where I live: I cannot
Believe not to all meat, worthy Murderer, an hours but sad
A harder no have and countinal.

KING RICHARD III:
We remember me well deliver'd here to ride.

ROMEO:
I meant, and a knight of ignor house of Gloucester,
Do most issure fill, they can do guess;
Yet, make it my