In [1]:
import tensorflow as tf
import numpy as np
import os
import time

In [2]:
# Location of the training corpus, relative to the notebook's working directory.
path_to_file = './data/shakespeare.txt'

In [9]:
# Read the corpus and normalise it to lower case.
# The file is opened in binary mode and decoded explicitly as UTF-8; the
# `with` block closes the file handle as soon as the read has finished
# instead of leaving it open for the lifetime of the kernel.
with open(path_to_file, 'rb') as corpus_file:
    text = corpus_file.read().decode(encoding='utf-8').lower()

# Peek at the first 300 characters to confirm the text loaded as expected.
print(text[:300])

first citizen:
before we proceed any further, hear me speak.

all:
speak, speak.

first citizen:
you are all resolved rather to die than to famish?

all:
resolved. resolved.

first citizen:
first, you know caius marcius is chief enemy to the people.

all:
we know't, we know't.

first citizen:
let us


In [11]:
# Build the vocabulary: every distinct character of the corpus, sorted.
vocab = sorted(set(text))

# Show the vocabulary size on one line and its contents on the next.
print(len(vocab), vocab, sep='\n')

39
['\n', ' ', '!', '$', '&', "'", ',', '-', '.', '3', ':', ';', '?', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']


In [27]:
# Encode characters as integer ids with a Keras StringLookup layer.

# Split two sample strings into individual characters to try the layer on.
example_texts = ['abcdefg', 'xyz']
chars = tf.strings.unicode_split(example_texts, input_encoding='UTF-8')

# Forward lookup (character -> id) over the corpus vocabulary, with the mask
# token disabled.
ids_from_chars = tf.keras.layers.StringLookup(vocabulary=list(vocab), mask_token=None)

In [20]:
# Convert the example characters to their integer ids and display them.
ids = ids_from_chars(chars)
ids

<tf.RaggedTensor [[14, 15, 16, 17, 18, 19, 20], [37, 38, 39]]>

In [21]:
# Inverse lookup (id -> character). It is built from the forward layer's own
# vocabulary so that both directions agree on every id.
chars_from_id = tf.keras.layers.StringLookup(
    vocabulary=ids_from_chars.get_vocabulary(),
    mask_token=None,
    invert=True,
)

# Round-trip the example ids back to characters and display them.
chars = chars_from_id(ids)
chars

<tf.RaggedTensor [[b'a', b'b', b'c', b'd', b'e', b'f', b'g'], [b'x', b'y', b'z']]>

In [23]:
def text_from_ids(ids):
    """Turn ids back into joined text.

    Looks the ids up with ``chars_from_id``, concatenates the resulting
    characters along the last axis and returns the ``.numpy()`` value of
    the joined tensor.
    """
    char_tokens = chars_from_id(ids)
    joined = tf.strings.reduce_join(char_tokens, axis=-1)
    return joined.numpy()

In [30]:
# Encode the whole corpus as one long vector of character ids and expose it
# as a dataset that yields one id per element.
all_ids = ids_from_chars(tf.strings.unicode_split(text, 'UTF-8'))
ids_dataset = tf.data.Dataset.from_tensor_slices(all_ids)

# Show the first ten characters. The loop variable is deliberately not named
# `id`: notebook cells run at module scope, so that name would shadow the
# built-in `id()` for every cell executed afterwards.
for char_id in ids_dataset.take(10):
    print(chars_from_id(char_id).numpy().decode('utf-8'))

f
i
r
s
t
 
c
i
t
i


In [33]:
# Length of each chunk minus one; chunks are built with seq_length + 1 ids.
seq_length = 100
# Group the id stream into fixed-size chunks of seq_length + 1 ids;
# drop_remainder=True discards a final chunk that would be shorter.
sequences = ids_dataset.batch(seq_length + 1, drop_remainder=True)

# Inspect the first chunk as individual characters.

for seq in sequences.take(1):
    print(chars_from_id(seq))

tf.Tensor(
[b'f' b'i' b'r' b's' b't' b' ' b'c' b'i' b't' b'i' b'z' b'e' b'n' b':'
 b'\n' b'b' b'e' b'f' b'o' b'r' b'e' b' ' b'w' b'e' b' ' b'p' b'r' b'o'
 b'c' b'e' b'e' b'd' b' ' b'a' b'n' b'y' b' ' b'f' b'u' b'r' b't' b'h'
 b'e' b'r' b',' b' ' b'h' b'e' b'a' b'r' b' ' b'm' b'e' b' ' b's' b'p'
 b'e' b'a' b'k' b'.' b'\n' b'\n' b'a' b'l' b'l' b':' b'\n' b's' b'p' b'e'
 b'a' b'k' b',' b' ' b's' b'p' b'e' b'a' b'k' b'.' b'\n' b'\n' b'f' b'i'
 b'r' b's' b't' b' ' b'c' b'i' b't' b'i' b'z' b'e' b'n' b':' b'\n' b'y'
 b'o' b'u' b' '], shape=(101,), dtype=string)


In [34]:
# Decode the first chunk into one joined string so it can be read as text.

for seq in sequences.take(1):
    print(text_from_ids(seq))

b'first citizen:\nbefore we proceed any further, hear me speak.\n\nall:\nspeak, speak.\n\nfirst citizen:\nyou '


In [35]:
# Derive an (input, label) pair from one chunk.

def split_input_target(sequence):
    """Split a sequence into two views offset by one position.

    Returns ``(sequence[:-1], sequence[1:])``: the first item drops the last
    element, the second drops the first, so element ``i`` of the second item
    is the element that follows element ``i`` of the first.
    """
    return sequence[:-1], sequence[1:]

In [38]:
# Map every chunk to an (input, label) pair.
dataset_split = sequences.map(split_input_target)

# Print the first pair as text to compare the input with its label.
for ex_input, ex_target in dataset_split.take(1):
    print("input Text : ", text_from_ids(ex_input))
    print("label Text : ", text_from_ids(ex_target))


input Text :  b'first citizen:\nbefore we proceed any further, hear me speak.\n\nall:\nspeak, speak.\n\nfirst citizen:\nyou'
label Text :  b'irst citizen:\nbefore we proceed any further, hear me speak.\n\nall:\nspeak, speak.\n\nfirst citizen:\nyou '


In [39]:
# Cache, shuffle, batch and prefetch the training examples.

# Number of (input, label) pairs per training batch.
BATCH_SIZE = 64
# Number of elements held in the shuffle buffer.
BUFFER_SIZE = 10000

# Order matters: cache() replays exactly the elements it saw on the first
# pass. Caching *after* shuffle/batch would freeze the first epoch's order and
# batch composition for every later epoch, so the cache goes first and the
# shuffle runs on top of it, reshuffling on each pass over the data.
dataset = (
    dataset_split
    .cache()
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE)
    .prefetch(tf.data.AUTOTUNE)
)

dataset

<_PrefetchDataset element_spec=(TensorSpec(shape=(None, 100), dtype=tf.int64, name=None), TensorSpec(shape=(None, 100), dtype=tf.int64, name=None))>

In [41]:
# Model hyper-parameters.

# Size of the lookup layer's vocabulary (40 in the recorded run, one more
# than the 39 characters printed for `vocab`); used for both the embedding
# input size and the width of the output layer.
vocab_size = len(ids_from_chars.get_vocabulary())

# Width of the embedding vector learned for each id.
embedding_dim = 256

# Number of units in the GRU layer.
rnn_unit = 1024

In [43]:
# Character-level language model, assembled layer by layer:
# ids -> embedding vectors -> GRU output at every timestep -> one score per
# vocabulary entry at every timestep.
model = tf.keras.Sequential()
model.add(tf.keras.layers.Embedding(vocab_size, embedding_dim))
model.add(tf.keras.layers.GRU(rnn_unit, return_sequences=True))
model.add(tf.keras.layers.Dense(vocab_size))



In [53]:
# Run the model on a single batch to check the shape of its output. The
# loop variables stay defined afterwards and are reused by later cells.
for input_example_batch, target_example_batch in dataset.take(1):
    example_batch_predictions = model(input_example_batch)
    print(example_batch_predictions.shape, "# (batch_size, sequence_length, vocab_size)")

(64, 100, 40) # (batch_size, sequence_length, vocab_size)


In [45]:
# Print the layer-by-layer summary of the model.
model.summary()

In [54]:
# Draw one id per timestep from the logits of the first example in the batch,
# then drop the trailing sample axis and convert the result to a NumPy array.
sampled_indices = tf.squeeze(
    tf.random.categorical(example_batch_predictions[0], num_samples=1),
    axis=-1,
).numpy()

# Compare the input text with the characters sampled for it.
print("Input:\n", text_from_ids(input_example_batch[0]))
print()
print("Next Char Predictions:\n", text_from_ids(sampled_indices))

Input:
 b'f the duke is with the soldiers;\nand for your brother, he was lately sent\nfrom your kind aunt, duche'

Next Char Predictions:
 b"q&:y3ua'''jlypm&ecfyls!!kphuf$;vzip;h\npwu.lgr3;o?z\n$bhh3rog- wwxir\n&udq'ga!nr?ge&:!f[UNK]ms..vmtqj;b[UNK]l&t"


In [56]:
# The final Dense layer of the model has no activation, so its outputs are
# raw logits; from_logits=True tells the loss to treat them as such.
loss = tf.losses.SparseCategoricalCrossentropy(from_logits=True)

# Sanity check on the example batch. For reference, a uniform guess over
# 40 classes would give ln(40) ~= 3.69.
example_batch_mean_loss = loss(target_example_batch, example_batch_predictions)
print("Prediction shape: ", example_batch_predictions.shape, " # (batch_size, sequence_length, vocab_size)")
print("Mean loss:        ", example_batch_mean_loss)

# Configure training: Adam optimizer, the loss above, and per-character accuracy.
model.compile(optimizer='adam', loss=loss, metrics=['sparse_categorical_accuracy'])

Prediction shape:  (64, 100, 40)  # (batch_size, sequence_length, vocab_size)
Mean loss:         tf.Tensor(3.6867273, shape=(), dtype=float32)


In [57]:
# Train for 20 passes over the dataset and keep the object returned by fit().
history = model.fit(dataset, epochs=20)

Epoch 1/20
[1m173/173[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m477s[0m 3s/step - loss: 2.9008 - sparse_categorical_accuracy: 0.2546
Epoch 2/20
[1m173/173[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m438s[0m 3s/step - loss: 1.8735 - sparse_categorical_accuracy: 0.4392
Epoch 3/20
[1m173/173[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m441s[0m 3s/step - loss: 1.5847 - sparse_categorical_accuracy: 0.5186
Epoch 4/20
[1m173/173[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m438s[0m 3s/step - loss: 1.4494 - sparse_categorical_accuracy: 0.5538
Epoch 5/20
[1m173/173[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m450s[0m 3s/step - loss: 1.3719 - sparse_categorical_accuracy: 0.5733
Epoch 6/20
[1m173/173[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m443s[0m 3s/step - loss: 1.3173 - sparse_categorical_accuracy: 0.5871
Epoch 7/20
[1m173/173[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m441s[0m 3s/step - loss: 1.2720 - sparse_categorical_accuracy: 0.5987
Epoch 8/20
[1m173/1

In [61]:
class OneStep(tf.keras.Model):
    """Single-step text generator wrapped around a character-level model.

    Bundles the model with the two lookup layers and exposes
    `generate_one_step`, which samples one next character for each input
    string and returns it together with the recurrent state to feed back in.
    """

    def __init__(self, model, chars_from_id, ids_from_chars, temperature=1.0):
        """Store the collaborators and precompute the "[UNK]" logit mask.

        Args:
            model: model whose `layers[0]`, `layers[1]` and `layers[2]` are
                called in that order by `generate_one_step`.
            chars_from_id: layer mapping ids to characters.
            ids_from_chars: layer mapping characters to ids.
            temperature: divisor applied to the logits before sampling.
        """
        super().__init__()
        self.temperature = temperature
        self.model = model
        self.chars_from_id = chars_from_id
        self.ids_from_chars = ids_from_chars

        # Build an additive mask that prevents "[UNK]" from being generated.

        # Id(s) of the "[UNK]" token, reshaped to one index per row as
        # required for the SparseTensor `indices` argument.
        skip_ids = self.ids_from_chars(['[UNK]'])[:, None]
        sparse_mask = tf.SparseTensor(
            # put an -inf at each bad index
            values = [-float('inf')]*len(skip_ids),
            indices = skip_ids,

            # match the size to the vocabulary
            dense_shape = [len(ids_from_chars.get_vocabulary())]
        )
        # Dense vector over the vocabulary: -inf at the skipped ids, 0 elsewhere.
        self.prediction_mask = tf.sparse.to_dense(sparse_mask)

    @tf.function
    def generate_one_step(self, inputs, states=None):
        """Sample one next character for each input string.

        Args:
            inputs: string tensor; each entry is split into characters and
                run through the model.
            states: value passed as `initial_state` to `model.layers[1]`;
                None lets that layer use its default initial state.

        Returns:
            `(predicted_chars, states)`: the sampled characters and the
            last-timestep output of `model.layers[1]`, to be passed back in
            as `states` on the next call.
        """
        # Convert strings to token IDs.
        input_chars = tf.strings.unicode_split(inputs, 'UTF-8')
        # NOTE(review): to_tensor() turns the ragged ids into a dense tensor,
        # padding shorter rows (presumably with 0). With inputs of unequal
        # length the last timestep of a shorter row would be padding - confirm
        # callers only pass equal-length strings.
        input_ids = self.ids_from_chars(input_chars).to_tensor()

        # The layers are addressed by position; this assumes the model is
        # laid out as [embedding, recurrent layer, dense] - verify against
        # the model passed in.
        # Embedding layer
        x = self.model.layers[0](input_ids)
        # GRU layer
        x = self.model.layers[1](x, initial_state=states)     
        # Keep the recurrent layer's output at the last timestep; it is
        # returned as the state for the next call.
        states = x[:, -1, :]
        # Dense layer
        predicted_logits = self.model.layers[2](x)

        # Only use the last prediction.
        predicted_logits = predicted_logits[:, -1, :]
        # Scale the logits by the temperature before sampling.
        predicted_logits = predicted_logits/self.temperature

        # Apply the prediction mask: prevent "[UNK]" from being generated.
        predicted_logits = predicted_logits + self.prediction_mask

        # Sample the output logits to generate token IDs.
        predicted_ids = tf.random.categorical(predicted_logits, num_samples=1)
        predicted_ids = tf.squeeze(predicted_ids, axis=-1)

        # Convert from token ids to characters
        predicted_chars = self.chars_from_id(predicted_ids)

        # Return the characters and model state.
        return predicted_chars, states

In [62]:
# Wrap the model and both lookup layers in a single-step generator
# (temperature left at its default of 1.0).
one_step_model = OneStep(model, chars_from_id, ids_from_chars)

In [63]:
# Generate 1000 characters from a seed string and time the run.
start = time.time()
states = None
seed = tf.constant(['ROMEO:'])
# The corpus was lower-cased before the vocabulary was built, so every
# upper-case seed letter would be looked up as "[UNK]" and the model would be
# primed with unknown tokens. Feed it a lower-cased copy of the seed; the seed
# itself is kept as typed for the printed result.
next_char = tf.strings.lower(seed)
result = [seed]

# Each step feeds the previously sampled character and the recurrent state
# back into the generator.
for _ in range(1000):
    next_char, states = one_step_model.generate_one_step(next_char, states=states)
    result.append(next_char)

# Concatenate the seed and all sampled characters into one string per batch row.
result = tf.strings.join(result)
end = time.time()
print(result[0].numpy().decode('utf-8'), '\n\n' + '_'*80)
print('\nRun time:', end - start)

ROMEO:
nay, madam; 'tis a very looker-bady, i

mercutio:
do not my cold food?

tarisan:
on both!

coriolanus:
you are too soon but book.

katharina:
what appecialling i stand on, when this isles of person, that
i am abused king richer of but which here
within me; not i pressing him; you are
untimelfachery,
and much some slower for that father fret,
i warrant them. a
thought shall watch the proise hurt upon the world;
this bond of others to unactouboth!
i promise home:
for bitter conferer, is something rich.

buckingham:
you have been so, but at last word in their
cames like unto thee, and master crow,
hath brought a like untap' time unto your brother;
ere i not proud, which too meet borne thin all-a-rubied is;
for my tunes look to help with thee.

angelo:

duke vincentio:
on what tongue not? what, ho! where have won 'em;
but thomas told me, boy.

polixand
of greater:
help, yese! why, jood slaining kissing, i never chance
proclaimended: ongend at his mind;
that thou wouldst disphrietes 

In [66]:
# Register OneStep under its class name in Keras' custom-object registry.
# NOTE(review): presumably added while trying to get the save cell to work -
# confirm whether it is still needed.
tf.keras.utils.get_custom_objects().update({'OneStep': OneStep})

In [67]:
# Export the generator as a SavedModel in ./one_step and load it back.
# NOTE(review): the recorded output of this cell is a TypeError
# ("this __dict__ descriptor does not support '_DictWrapper' objects"), so in
# that run `one_step_reloaded` was never defined and the cell that uses it
# could not run. The cause looks environment-related (TensorFlow / Keras /
# Python version combination) - TODO confirm and resolve before relying on it.
tf.saved_model.save(one_step_model, 'one_step')
one_step_reloaded = tf.saved_model.load('one_step')

TypeError: this __dict__ descriptor does not support '_DictWrapper' objects

In [None]:
# Generate 100 characters with the reloaded generator to check it behaves
# like the in-memory one.
states = None
seed = tf.constant(['ROMEO:'])
# The vocabulary contains only lower-case letters (the corpus was lower-cased
# before it was built), so upper-case seed letters would all be looked up as
# "[UNK]". Feed the model a lower-cased copy and keep the seed as typed for
# the printed result.
next_char = tf.strings.lower(seed)
result = [seed]

for _ in range(100):
    next_char, states = one_step_reloaded.generate_one_step(next_char, states=states)
    result.append(next_char)

print(tf.strings.join(result)[0].numpy().decode("utf-8"))