### Setup

In [1]:
# NOTE: import TF and other libraries

import os
# Set before the `tensorflow` import below; "2" is meant to silence
# TensorFlow's native INFO and WARNING log lines.
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"
import time
import warnings
# Blanket filter: suppresses every Python warning raised after this point.
warnings.filterwarnings("ignore")

import numpy as np
import tensorflow as tf


In [2]:
# NOTE: fetch the Shakespeare corpus; the returned value is used below as the
# path of the local copy

path_to_file = tf.keras.utils.get_file(
    fname="shakespeare.txt",
    origin="https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt",
)

In [3]:
# NOTE: read the data

# Read raw bytes and decode explicitly so the result does not depend on the
# platform's default encoding or newline translation. The `with` block closes
# the file handle as soon as the content is in memory.
with open(path_to_file, "rb") as corpus_file:
    text = corpus_file.read().decode(encoding="utf-8")
print(f"[DEBUG] length of text: {len(text)} characters")
print(f"{text[:250]=}")

# The vocabulary is the sorted list of distinct characters in the corpus.
vocab = sorted(set(text))
print(f"{len(vocab)} unique characters")

[DEBUG] length of text: 1115394 characters
text[:250]='First Citizen:\nBefore we proceed any further, hear me speak.\n\nAll:\nSpeak, speak.\n\nFirst Citizen:\nYou are all resolved rather to die than to famish?\n\nAll:\nResolved. resolved.\n\nFirst Citizen:\nFirst, you know Caius Marcius is chief enemy to the people.\n'
65 unique characters


### Process the text

In [4]:
# NOTE: vectorize the text

# Lookup layers for both directions of the char <-> id mapping.
# The inverse layer is built from the forward layer's vocabulary so the two
# mappings always agree.
ids_from_chars = tf.keras.layers.StringLookup(
    mask_token=None,
    vocabulary=list(vocab),
)
chars_from_ids = tf.keras.layers.StringLookup(
    mask_token=None,
    invert=True,
    vocabulary=ids_from_chars.get_vocabulary(),
)

# Round-trip two sample strings: text -> characters -> ids -> characters
example_texts = ["abcdefg", "xyz"]

# strings to sequences of characters
chars = tf.strings.unicode_split(example_texts, input_encoding="UTF-8")
print(chars)

# characters to their integer ids
ids = ids_from_chars(chars)
print(ids)

# integer ids back to characters
chars = chars_from_ids(ids)
print(chars)

# sequences of character ids to strings
def text_from_ids(ids):
    """Map ids back to characters and join them along the last axis."""
    id_chars = chars_from_ids(ids)
    return tf.strings.reduce_join(id_chars, axis=-1)

<tf.RaggedTensor [[b'a', b'b', b'c', b'd', b'e', b'f', b'g'], [b'x', b'y', b'z']]>
<tf.RaggedTensor [[40, 41, 42, 43, 44, 45, 46], [63, 64, 65]]>
<tf.RaggedTensor [[b'a', b'b', b'c', b'd', b'e', b'f', b'g'], [b'x', b'y', b'z']]>


In [6]:
# NOTE: the prediction task:
# given a sequence of characters,
# what is the most probable next character?

# Each chunk holds `seq_length` characters plus one extra, so that the inputs
# and the shifted-by-one targets can later be cut from the same chunk.
seq_length = 100
examples_per_epoch = len(text) // (seq_length + 1)

# NOTE: create training examples and targets
# the whole corpus as one flat vector of character ids
all_ids = ids_from_chars(tf.strings.unicode_split(text, input_encoding="UTF-8"))
print(f"{all_ids=}")

# stream of single character ids; show the first ten as characters
ids_dataset = tf.data.Dataset.from_tensor_slices(all_ids)
for ids in ids_dataset.take(10):
    print(chars_from_ids(ids).numpy().decode("utf-8"))

# batching: group the id stream into fixed-size chunks, dropping the short tail
sequences = ids_dataset.batch(seq_length + 1, drop_remainder=True)
for seq in sequences.take(5):
    print(text_from_ids(seq).numpy())

all_ids=<tf.Tensor: shape=(1115394,), dtype=int64, numpy=array([19, 48, 57, ..., 46,  9,  1])>
F
i
r
s
t
 
C
i
t
i
b'First Citizen:\nBefore we proceed any further, hear me speak.\n\nAll:\nSpeak, speak.\n\nFirst Citizen:\nYou '
b'are all resolved rather to die than to famish?\n\nAll:\nResolved. resolved.\n\nFirst Citizen:\nFirst, you k'
b"now Caius Marcius is chief enemy to the people.\n\nAll:\nWe know't, we know't.\n\nFirst Citizen:\nLet us ki"
b"ll him, and we'll have corn at our own price.\nIs't a verdict?\n\nAll:\nNo more talking on't; let it be d"
b'one: away, away!\n\nSecond Citizen:\nOne word, good citizens.\n\nFirst Citizen:\nWe are accounted poor citi'


In [9]:
# NOTE: preparing dataset: (input, label)
# `input`: current character
# `label`: next character

def split_input_target(sequence):
    """Split a sequence into an (input, target) pair offset by one step.

    The input is everything except the last element; the target is everything
    except the first, so target[i] is the element that follows input[i].
    """
    return sequence[:-1], sequence[1:]
# Quick sanity check on a plain Python list. It is printed explicitly because
# a bare expression in the middle of a cell is evaluated but never displayed.
print(split_input_target(list("Tensorflow")))

# NOTE: apply the split to every sequence, yielding (input, target) pairs
dataset = sequences.map(split_input_target)
for input_example, target_example in dataset.take(1):
    print(f"[DEBUG] Input:\t {text_from_ids(input_example).numpy()}")
    print(f"[DEBUG] Target:\t {text_from_ids(target_example).numpy()}")

[DEBUG] Input:	 b'First Citizen:\nBefore we proceed any further, hear me speak.\n\nAll:\nSpeak, speak.\n\nFirst Citizen:\nYou'
[DEBUG] Target:	 b'irst Citizen:\nBefore we proceed any further, hear me speak.\n\nAll:\nSpeak, speak.\n\nFirst Citizen:\nYou '


In [10]:
# NOTE: create training batches

BATCH_SIZE = 64
# NOTE: `shuffle` keeps a buffer of this many elements and draws each output
# element at random from it, so the full dataset is never shuffled in memory
BUFFER_SIZE = 10000

# NOTE: this rebinds `dataset`, so re-running only this cell would shuffle and
# batch the already-batched dataset; re-run the cell that maps
# `split_input_target` first
dataset = (
    dataset.shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE, drop_remainder=True)
    # `tf.data.AUTOTUNE` is the non-experimental name of the same constant
    .prefetch(tf.data.AUTOTUNE)
)  # NOTE: shuffle individual sequences, then pack them into batches

print(dataset)

<_PrefetchDataset element_spec=(TensorSpec(shape=(64, 100), dtype=tf.int64, name=None), TensorSpec(shape=(64, 100), dtype=tf.int64, name=None))>


In [12]:
# NOTE: build the model

# NOTE: model hyperparameters
# The `StringLookup` vocabulary has one more entry than `vocab` (ids for the
# corpus characters start at 1), so size the model from the layer's vocabulary
# to cover every id it can emit.
vocab_size = len(ids_from_chars.get_vocabulary())
embedding_dim = 256
rnn_units = 1024

class MyModel(tf.keras.Model):
    """Character-level sequence model: Embedding -> GRU -> Dense.

    Args:
        vocab_size: number of distinct token ids; used both as the embedding
            input size and as the number of output scores per position.
        embedding_dim: width of the embedding vectors.
        rnn_units: number of units in the GRU layer.
    """

    def __init__(self, vocab_size, embedding_dim, rnn_units):
        # Call the base initializer without arguments: `super()` already binds
        # the instance, so passing `self` again hands it over as a stray
        # positional argument.
        super().__init__()

        # NOTE: layers
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(
            rnn_units,
            return_sequences=True,  # one output per input position
            return_state=True,  # also hand back the final state
        )
        self.dense = tf.keras.layers.Dense(vocab_size)

    def call(self,
             inputs,
             states=None,
             return_state=False,
             training=False):
        """Compute per-position scores over the vocabulary.

        Args:
            inputs: batch of token-id sequences.
            states: GRU state to start from; when None, the GRU's own initial
                state is used.
            return_state: when True, also return the GRU state after the last
                position so a caller can feed it into the next call.
            training: forwarded to every layer.

        Returns:
            The dense layer's output, or `(output, states)` when
            `return_state` is True.
        """
        x = self.embedding(inputs, training=training)

        # NOTE: no state supplied by the caller -> start from the initial state
        # NOTE(review): `get_initial_state` receives the embedded inputs here;
        # newer Keras releases appear to expect a batch size instead. Confirm
        # against the installed version.
        if states is None:
            states = self.gru.get_initial_state(x)
        x, states = self.gru(x, initial_state=states, training=training)
        x = self.dense(x, training=training)

        return (x, states) if return_state else x
    
# NOTE: the vocabulary size is taken from the `StringLookup` layer so that the
# model's input and output sizes match the ids that layer produces
model = MyModel(
    vocab_size=len(ids_from_chars.get_vocabulary()),
    embedding_dim=embedding_dim,
    rnn_units=rnn_units,
)

### Try the model

In [17]:
# NOTE: validate the model

# NOTE: 1. check the shape of the output:
for input_example_batch, target_example_batch in dataset.take(1):
    example_batch_predictions = model(input_example_batch)
    print(f"{example_batch_predictions.shape} # (batch_size, sequence_length, vocab_size)")
print()

# NOTE: 2. layer architecture
# `summary()` prints the table itself and returns None, so call it directly
# instead of formatting its return value (which printed a stray "None").
model.summary()
print()

# NOTE: 3. sample the next characters from the predicted distribution
# NOTE: always taking the argmax (the single most likely character) instead of
# sampling can easily get the model stuck in a loop!
sampled_indices = tf.random.categorical(example_batch_predictions[0], num_samples=1)
# drop the trailing `num_samples` axis -> one sampled id per position
sampled_indices = tf.squeeze(sampled_indices, axis=-1).numpy()
print(f"[DEBUG] {sampled_indices=}")
print()
print(f"[DEBUG] input: \n{text_from_ids(input_example_batch[0]).numpy()}")
print(f"[DEBUG] next char predictions: \n{text_from_ids(sampled_indices).numpy()}")

(64, 100, 66) # (batch_size, sequence_length, vocab_size)
Model: "my_model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 embedding (Embedding)       multiple                  16896     
                                                                 
 gru (GRU)                   multiple                  3938304   
                                                                 
 dense (Dense)               multiple                  67650     
                                                                 
Total params: 4022850 (15.35 MB)
Trainable params: 4022850 (15.35 MB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________
None
[56 24 61 31 17  1 15 59 50 13 47 51 21 10 17  7 14 27  1 45 54 46 33 54
 32 58  0 56 36 53  2 47 45  0 35 11 24 30 33 43 52 37  8  6 46 33 54 59
 27 33 52 30 22 61 44 29 57 54 35 37  3 52  9 23 60 31 39  2 51 48 38 31
