## Text Generation with an RNN

Based on the TensorFlow text generation tutorial: https://www.tensorflow.org/text/tutorials/text_generation

In [7]:
import tensorflow as tf
from tensorflow.keras.layers.experimental import preprocessing

import numpy as np
import os
import time

In [8]:
# Read the corpus as raw bytes and decode it explicitly, so the line endings
# stored on disk are kept as they are (text mode would translate "\r\n").
# The context manager closes the file handle as soon as the read is done
# instead of leaving it open until garbage collection.
with open("divina_commedia.txt", 'rb') as corpus_file:
    text = corpus_file.read().decode(encoding='utf-8')
# length of text is the number of characters in it
print(f'Length of text: {len(text)} characters')


Length of text: 552078 characters


In [11]:
# Preview the opening of the corpus (first 250 characters).
preview_length = 250
print(text[:preview_length])

LA DIVINA COMMEDIA

di Dante Alighieri


INFERNO




Inferno
Canto I


Nel mezzo del cammin di nostra vita
mi ritrovai per una selva oscura,
ché la diritta via era smarrita.

Ahi quanto a dir qual era è cosa dura
esta selva selvagg


In [12]:
# Collect every distinct character of the corpus, in sorted order.
unique_chars = set(text)
vocab = sorted(unique_chars)
print(f'{len(vocab)} unique characters')

80 unique characters


### Map the characters to a numerical representation and vice versa

In [13]:

# Lookup layer: character -> integer id, built from the sorted corpus vocabulary.
ids_from_chars = preprocessing.StringLookup(
    mask_token=None,
    vocabulary=list(vocab),
)

In [14]:
# Inverse lookup layer: integer id -> character. It reuses the vocabulary
# reported by `ids_from_chars`, so both directions share one id assignment.
chars_from_ids = preprocessing.StringLookup(
    vocabulary=ids_from_chars.get_vocabulary(),
    invert=True,
    mask_token=None,
)

In [18]:
def text_from_ids(ids):
    """Map ids back to characters and join them along the last axis.

    Args:
        ids: Integer ids accepted by the `chars_from_ids` lookup layer.

    Returns:
        A string tensor in which the characters of the last axis have been
        concatenated.
    """
    chars = chars_from_ids(ids)
    return tf.strings.reduce_join(chars, axis=-1)

In [20]:
# Build the dataset: split the corpus into UTF-8 characters, map every
# character to its id, then expose the ids as a stream of single elements.
corpus_chars = tf.strings.unicode_split(text, 'UTF-8')
all_ids = ids_from_chars(corpus_chars)
ids_dataset = tf.data.Dataset.from_tensor_slices(all_ids)

In [23]:
# Number of elements in one input sequence. Each stored chunk holds one
# extra element, hence the `seq_length + 1` divisor below.
seq_length = 100
examples_per_epoch = len(text) // (seq_length + 1)

In [25]:
# Batching consecutive ids yields chunks of `seq_length + 1` characters in
# reading order (nothing is shuffled here); an incomplete tail is dropped.
chunk_size = seq_length + 1
sequences = ids_dataset.batch(chunk_size, drop_remainder=True)

# Decode the first five chunks as a sanity check.
for seq in sequences.take(5):
    decoded = text_from_ids(seq)
    print(decoded.numpy())

b'LA DIVINA COMMEDIA\r\n\r\ndi Dante Alighieri\r\n\r\n\r\nINFERNO\r\n\r\n\r\n\r\n\r\nInferno\r\nCanto I\r\n\r\n\r\nNel mezzo del ca'
b'mmin di nostra vita\r\nmi ritrovai per una selva oscura,\r\nch\xc3\xa9 la diritta via era smarrita.\r\n\r\nAhi quant'
b'o a dir qual era \xc3\xa8 cosa dura\r\nesta selva selvaggia e aspra e forte\r\nche nel pensier rinova la paura!\r'
b'\n\r\nTant\xe2\x80\x99 \xc3\xa8 amara che poco \xc3\xa8 pi\xc3\xb9 morte;\r\nma per trattar del ben ch\xe2\x80\x99i\xe2\x80\x99 vi trovai,\r\ndir\xc3\xb2 de l\xe2\x80\x99altre cose'
b' ch\xe2\x80\x99i\xe2\x80\x99 v\xe2\x80\x99ho scorte.\r\n\r\nIo non so ben ridir com\xe2\x80\x99 i\xe2\x80\x99 v\xe2\x80\x99intrai,\r\ntant\xe2\x80\x99 era pien di sonno a quel punto\r\nc'


### Generate the labels (the next character to be predicted)

In [26]:
def split_input_target(sequence):
    """Turn one chunk into an (input, target) pair offset by one position.

    Args:
        sequence: A sliceable sequence of ids.

    Returns:
        A tuple `(sequence[:-1], sequence[1:])`: the input drops the last
        element and the target drops the first, so `target[i]` is the
        element that follows `input[i]` in the original sequence.
    """
    return sequence[:-1], sequence[1:]

In [27]:
# Apply `split_input_target` to every chunk, giving (input, target) pairs.
dataset = sequences.map(split_input_target)


In [28]:
# Batch size
BATCH_SIZE = 64

# Buffer size to shuffle the dataset
# (TF data is designed to work with possibly infinite sequences,
# so it doesn't attempt to shuffle the entire sequence in memory. Instead,
# it maintains a buffer in which it shuffles elements).
BUFFER_SIZE = 10000

# Build the training pipeline from `sequences` rather than from `dataset`
# itself: a `dataset = dataset.batch(...)` cell stacks another batch
# dimension every time it is re-run, whereas this version is idempotent.
dataset = (
    sequences
    .map(split_input_target)
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE, drop_remainder=True)
    .prefetch(tf.data.experimental.AUTOTUNE))

dataset

<PrefetchDataset shapes: ((64, 100), (64, 100)), types: (tf.int64, tf.int64)>

In [29]:
# Decode one training batch to compare inputs with their targets.
first_batch = dataset.take(1)
for input_example, target_example in first_batch:
    input_text = text_from_ids(input_example).numpy()
    target_text = text_from_ids(target_example).numpy()
    print("Input :", input_text)
    print("Target:", target_text)

Input : [b'he de\xe2\x80\x99 sodisfar chi qui s\xe2\x80\x99astalla;\r\n\r\ne l\xc3\xa0 dov\xe2\x80\x99 io fermai cotesto punto,\r\nnon s\xe2\x80\x99ammendava, per prega'
 b'rte\xc2\xbb.\r\n\r\nE questo fece i nostri passi scarsi,\r\ntanto che pria lo scemo de la luna\r\nrigiunse al letto'
 b'ra mal dilettar con giuste pene.\r\n\r\nVostra natura, quando pecc\xc3\xb2 tota\r\nnel seme suo, da queste dignit'
 b'a,\r\nche soffera congiunto \xe2\x80\x98sono\xe2\x80\x99 ed \xe2\x80\x98este\xe2\x80\x99.\r\n\r\nDe la profonda condizion divina\r\nch\xe2\x80\x99io tocco mo, la m'
 b'sempre amore accende;\r\n\r\ne s\xe2\x80\x99altra cosa vostro amor seduce,\r\nnon \xc3\xa8 se non di quella alcun vestigio,\r'
 b'sa che non duri\r\netternalmente, quello amor si spoglia.\r\n\r\nQuale per li seren tranquilli e puri\r\ndis'
 b'rse e che la punse.\r\n\r\nNe l\xe2\x80\x99ordine che fanno i terzi sedi,\r\nsiede Rachel di sotto da costei\r\ncon B\xc3\xaba'
 b'perchia;\r\nmontar potrete su per la ruina,\r\n

In [30]:
# Size of the id space the model has to cover. It is taken from the lookup
# layer rather than from `vocab`, because the layer's vocabulary also holds
# the "[UNK]" token; `len(vocab)` would be too small for the embedding and
# output layers.
vocab_size = len(ids_from_chars.get_vocabulary())

# The embedding dimension
embedding_dim = 256

# Number of RNN units
rnn_units = 1024

In [32]:
class MyModel(tf.keras.Model):
    """Character-level language model: Embedding -> GRU -> Dense logits."""

    def __init__(self, vocab_size, embedding_dim, rnn_units):
        """Create the embedding, GRU and output layers.

        Args:
            vocab_size: Number of distinct ids; used both as the embedding
                input size and as the number of output units.
            embedding_dim: Size of each embedding vector.
            rnn_units: Number of GRU units.
        """
        # A subclassed model's constructor takes no positional arguments.
        # Passing `self` a second time was at best ignored, and raises a
        # TypeError on Keras releases whose `Layer.__init__` is keyword-only.
        super().__init__()
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        # return_sequences: one output per time step (needed for per-character
        # targets); return_state: also hand back the final state so that
        # generation can continue from it.
        self.gru = tf.keras.layers.GRU(rnn_units,
                                       return_sequences=True,
                                       return_state=True)
        # No activation: the layer emits one raw logit per vocabulary entry.
        self.dense = tf.keras.layers.Dense(vocab_size)

    def call(self, inputs, states=None, return_state=False, training=False):
        """Run the model on a batch of id sequences.

        Args:
            inputs: Integer ids fed to the embedding layer.
            states: Optional initial GRU state. When None, the state returned
                by `self.gru.get_initial_state` is used.
            return_state: When True, also return the final GRU state.
            training: Forwarded to every layer call.

        Returns:
            The dense layer's output for every time step, or an
            `(output, states)` tuple when `return_state` is True.
        """
        x = inputs
        x = self.embedding(x, training=training)
        if states is None:
            states = self.gru.get_initial_state(x)
        x, states = self.gru(x, initial_state=states, training=training)
        x = self.dense(x, training=training)

        if return_state:
            return x, states
        else:
            return x

In [33]:
# The vocabulary size has to match the `StringLookup` layers, so it is read
# from the lookup layer itself.
lookup_vocab_size = len(ids_from_chars.get_vocabulary())
model = MyModel(
    vocab_size=lookup_vocab_size,
    embedding_dim=embedding_dim,
    rnn_units=rnn_units)

In [34]:
# The model's last layer has no activation, so the loss is told to expect
# logits rather than probabilities.
loss = tf.losses.SparseCategoricalCrossentropy(from_logits=True)
model.compile(loss=loss, optimizer='adam')

In [36]:
# Directory where the checkpoints will be saved (created if missing)
checkpoint_dir = 'training_checkpoints'
os.makedirs(checkpoint_dir, exist_ok=True)

# Name pattern of the checkpoint files
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

# Store only the weights, using the name pattern above
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    save_weights_only=True,
    filepath=checkpoint_prefix)

history = model.fit(dataset, callbacks=[checkpoint_callback], epochs=20)


Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


## Function to generate text

In [37]:
class OneStep(tf.keras.Model):
    """Wraps a trained model to sample one next character per call."""

    def __init__(self, model, chars_from_ids, ids_from_chars, temperature=1.0):
        """Store the model and lookup layers and build the "[UNK]" mask.

        Args:
            model: Model called as `model(inputs=..., states=...,
                return_state=True)` and returning `(logits, states)`.
            chars_from_ids: Lookup layer mapping ids to characters.
            ids_from_chars: Lookup layer mapping characters to ids.
            temperature: Divisor applied to the logits before sampling.
                Must be strictly positive.

        Raises:
            ValueError: If `temperature` is not strictly positive.
        """
        super().__init__()
        # The logits are divided by the temperature: zero would produce
        # inf/NaN logits and a negative value would invert their ranking.
        if temperature <= 0:
            raise ValueError(
                f'temperature must be > 0, got {temperature!r}')
        # NOTE(review): this is a plain Python value read inside a
        # `tf.function`; presumably it is fixed at trace time, so changing it
        # after the first call may have no effect -- confirm.
        self.temperature = temperature
        self.model = model
        self.chars_from_ids = chars_from_ids
        self.ids_from_chars = ids_from_chars

        # Create a mask to prevent "[UNK]" from being generated.
        skip_ids = self.ids_from_chars(['[UNK]'])[:, None]
        sparse_mask = tf.SparseTensor(
            # Put a -inf at each bad index.
            values=[-float('inf')]*len(skip_ids),
            indices=skip_ids,
            # Match the shape to the vocabulary
            dense_shape=[len(ids_from_chars.get_vocabulary())])
        # Dense vector added to the logits in `generate_one_step`.
        self.prediction_mask = tf.sparse.to_dense(sparse_mask)

    @tf.function
    def generate_one_step(self, inputs, states=None):
        """Sample the next character for each input string.

        Args:
            inputs: String tensor; each entry is split into UTF-8 characters
                and fed to the model.
            states: Model state returned by a previous call, or None to start
                from the model's default state.

        Returns:
            A `(predicted_chars, states)` tuple: one sampled character per
            input string, and the model state to pass to the next call.
        """
        # Convert strings to token IDs.
        input_chars = tf.strings.unicode_split(inputs, 'UTF-8')
        input_ids = self.ids_from_chars(input_chars).to_tensor()

        # Run the model.
        # predicted_logits.shape is [batch, char, next_char_logits]
        predicted_logits, states = self.model(inputs=input_ids, states=states,
                                              return_state=True)
        # Only use the last prediction.
        predicted_logits = predicted_logits[:, -1, :]
        predicted_logits = predicted_logits/self.temperature

        # Apply the prediction mask: prevent "[UNK]" from being generated.
        predicted_logits = predicted_logits + self.prediction_mask

        # Sample the output logits to generate token IDs.
        predicted_ids = tf.random.categorical(predicted_logits, num_samples=1)
        predicted_ids = tf.squeeze(predicted_ids, axis=-1)

        # Convert from token ids to characters
        predicted_chars = self.chars_from_ids(predicted_ids)

        # Return the characters and model state.
        return predicted_chars, states


In [38]:
# Wrap the trained model for single-step sampling (default temperature).
one_step_model = OneStep(
    model=model,
    chars_from_ids=chars_from_ids,
    ids_from_chars=ids_from_chars)


In [39]:
# Generate 1000 characters one step at a time, starting from the seed
# "CANTO:" and feeding each sampled character and the returned state back in.
start = time.time()
states = None
next_char = tf.constant(['CANTO:'])
result = [next_char]

for n in range(1000):
    next_char, states = one_step_model.generate_one_step(
        next_char, states=states)
    result.append(next_char)

# Concatenate the seed and all sampled characters into one string per batch entry.
result = tf.strings.join(result)
end = time.time()

generated_text = result[0].numpy().decode('utf-8')
separator = '\n\n' + '_'*80
print(generated_text, separator)
print('\nRun time:', end - start)

CANTO:
ancor, se ’l mondo fé paura e diso,
a la veduta sol di nova pietate,

che qui si gittar tutta si diparte
tu ’l segno di ragionar dentro al monte.

La perfusca penna che li ’mpenide:
vedesse così sen gio sovvenne.

Ben far poscia, pera di la coda incora.

Quando si vostrò poi: grandizio e vaso,
accorto son di grande artore,
così la coronato del suo prossi,
e venne a l’embision si cerchio
là dove mi lieve si divolse:
per che ’l dolor non fosse chiamata;
“Deh, se, Cristolondo, e anco involto.

Ed elli a me: «Tutti son ricusci,
e vinne lui, onore, e aspetto
salir potiensi, per lor s’affette
intremaste a la sua melvadina;
qui conobbi il color de la piuma
fanno uscito duca, giunti a la rvolto.

Come la mia manna, quando Lodo
si fa del renare o di sotto?».

Ed elli a me: «Tu vedëa i miei passi
per danni de la voglia alte stiala;

però che la notte il secondo imprenta
e ’l papal de la scaleata scala;
ed el prima avea pensava carca,
che ballella sì per