In [1]:
#import tensorflow and other libraries
import os #using operating system-dependent functionality
import warnings #handles warnings

# Silence Python-level warnings for the rest of the notebook so cell output stays readable.
warnings.filterwarnings("ignore")
# Set before `import tensorflow` (below) so the value is already in the environment when TensorFlow loads.
# Level "2" filters out INFO and WARNING messages from TensorFlow's native logging; only errors are shown.
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"
import time #provides time related functions
import numpy as np
import tensorflow as tf

In [2]:
#Download the Shakespeare dataset
path_to_file = tf.keras.utils.get_file(
    "shakespeare.txt",
    "https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt",
)
#tf.keras.utils.get_file function used to download files

Downloading data from https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt


In [3]:
# Read the corpus as raw bytes ("rb") and decode it explicitly as UTF-8, which
# assumes the downloaded file is UTF-8 encoded.
# The `with` block closes the file handle as soon as the read finishes instead
# of leaving an open handle around for the lifetime of the kernel.
with open(path_to_file, "rb") as corpus_file:
    text = corpus_file.read().decode(encoding="utf-8")
print(f"Length of text: {len(text)} characters")  # number of characters in the string `text`

Length of text: 1115394 characters


In [4]:
#first 250 characters
print(text[:250])
print("----------------------------")
#num of unique characters in the content
vocab = sorted(set(text))
print(f"{len(vocab)} unique characters")

First Citizen:
Before we proceed any further, hear me speak.

All:
Speak, speak.

First Citizen:
You are all resolved rather to die than to famish?

All:
Resolved. resolved.

First Citizen:
First, you know Caius Marcius is chief enemy to the people.

----------------------------
65 unique characters


**PROCESS THE TEXT**

VECTORIZE THE TEXT

*Before training, the strings must be converted to a numerical representation.*
*TensorFlow's `StringLookup` layer does this by mapping each character (token) to an integer ID, which is the form a machine learning model can process.*


In [5]:
example_texts = ["abcdefg", "xyz"]
chars = tf.strings.unicode_split(example_texts, input_encoding="UTF-8") #splits Unicode strings into tokens
chars

<tf.RaggedTensor [[b'a', b'b', b'c', b'd', b'e', b'f', b'g'], [b'x', b'y', b'z']]>

In [6]:
# Create a StringLookup layer for token -> ID conversion.
# `vocabulary` is the list of unique characters found in the text.
# `mask_token=None` means no mask token is reserved in the vocabulary.
# The outputs further down show 66 classes versus 65 unique characters, and an
# "[UNK]" entry among the sampled characters, so the layer's vocabulary holds one
# extra out-of-vocabulary token in addition to `vocab`.
ids_from_chars = tf.keras.layers.StringLookup(
    vocabulary=list(vocab), mask_token=None
)

In [7]:
#converting tokens to character IDs
ids = ids_from_chars(chars)  # 'ids_from_chars' layer

The goal is to generate text, so we also need the inverse mapping: turning the numeric IDs back into human-readable characters.

In [8]:
#Creating StringLookup Layer for Id-to-Token Conversion
chars_from_ids=tf.keras.layers.StringLookup(
    vocabulary=ids_from_chars.get_vocabulary(),invert=True,mask_token=None
)

In [9]:
#converting IDs to Token
chars=chars_from_ids(ids)

In [10]:

# To join the characters back into strings, use tf.strings.reduce_join (joining along the last axis).
tf.strings.reduce_join(chars,axis=-1).numpy()



array([b'abcdefg', b'xyz'], dtype=object)

In [11]:
# Helper: convert character IDs back to text
def text_from_ids(ids):
    """Look up the character for every ID in `ids` and join them along the last axis.

    Args:
        ids: Integer character IDs accepted by the `chars_from_ids` layer.

    Returns:
        A string tensor with the last axis collapsed into joined text.
    """
    looked_up_chars = chars_from_ids(ids)
    joined_text = tf.strings.reduce_join(looked_up_chars, axis=-1)
    return joined_text

*Create training examples and targets*

In [12]:
#text to charac IDs
all_ids=ids_from_chars(tf.strings.unicode_split(text,"UTF-8"))


In [13]:
#create dataset of these charac IDs
ids_dataset=tf.data.Dataset.from_tensor_slices(all_ids)             # 'tf.data.Dataset.from_tensor_slices'  creates a dataset of slices from a given tensor(here all ids)


In [14]:
# Peek at the first 10 elements of the dataset, one character per line.
# The loop variable is deliberately not called `ids`: that name already holds the
# ID tensor produced in the tokenisation demo earlier in the notebook, and reusing
# it here would silently overwrite that tensor with a single scalar element.
for char_id in ids_dataset.take(10):
    print(chars_from_ids(char_id).numpy().decode("utf-8"))

F
i
r
s
t
 
C
i
t
i


In [15]:

seq_length= 100 #desired length of each input sequence
examples_per_epoch=len(text)//(seq_length+1)  #number of examples (input sequences) that can be created from the entire text, considering the specified sequence length.

In [16]:
#batch method lets you easily convert these individual characters to sequences of the desired size.
sequences=ids_dataset.batch(seq_length + 1, drop_remainder=True)   #creates batches of length - 'seq_length + 1' and 'drop_remainder=True' ensures that the remaining elements that do not fit are dropped
for seq in sequences.take(1):
    print(chars_from_ids(seq))         # loop prints the first batch of sequences.


tf.Tensor(
[b'F' b'i' b'r' b's' b't' b' ' b'C' b'i' b't' b'i' b'z' b'e' b'n' b':'
 b'\n' b'B' b'e' b'f' b'o' b'r' b'e' b' ' b'w' b'e' b' ' b'p' b'r' b'o'
 b'c' b'e' b'e' b'd' b' ' b'a' b'n' b'y' b' ' b'f' b'u' b'r' b't' b'h'
 b'e' b'r' b',' b' ' b'h' b'e' b'a' b'r' b' ' b'm' b'e' b' ' b's' b'p'
 b'e' b'a' b'k' b'.' b'\n' b'\n' b'A' b'l' b'l' b':' b'\n' b'S' b'p' b'e'
 b'a' b'k' b',' b' ' b's' b'p' b'e' b'a' b'k' b'.' b'\n' b'\n' b'F' b'i'
 b'r' b's' b't' b' ' b'C' b'i' b't' b'i' b'z' b'e' b'n' b':' b'\n' b'Y'
 b'o' b'u' b' '], shape=(101,), dtype=string)


In [17]:
for seq in sequences.take(5):
    print(text_from_ids(seq).numpy())       # first 5 batches of sequences, demonstrating how the characters are joined back into strings.

b'First Citizen:\nBefore we proceed any further, hear me speak.\n\nAll:\nSpeak, speak.\n\nFirst Citizen:\nYou '
b'are all resolved rather to die than to famish?\n\nAll:\nResolved. resolved.\n\nFirst Citizen:\nFirst, you k'
b"now Caius Marcius is chief enemy to the people.\n\nAll:\nWe know't, we know't.\n\nFirst Citizen:\nLet us ki"
b"ll him, and we'll have corn at our own price.\nIs't a verdict?\n\nAll:\nNo more talking on't; let it be d"
b'one: away, away!\n\nSecond Citizen:\nOne word, good citizens.\n\nFirst Citizen:\nWe are accounted poor citi'


In [18]:
# Builds the (input, label) pair used for training.
def split_input_target(sequence):
    """Split a sequence into an input and a target shifted by one position.

    The input drops the last element and the target drops the first, so at
    every timestep the target holds the element that follows the input.

    Args:
        sequence: Any sliceable sequence (list, string, tensor, ...).

    Returns:
        Tuple ``(input_text, target_text)``, each one element shorter than `sequence`.
    """
    return sequence[:-1], sequence[1:]

In [19]:
split_input_target(list("Tensorflow"))

(['T', 'e', 'n', 's', 'o', 'r', 'f', 'l', 'o'],
 ['e', 'n', 's', 'o', 'r', 'f', 'l', 'o', 'w'])

In [20]:
dataset= sequences.map(split_input_target)
# resulting dataset contains pairs of input and target sequences, where each input sequence corresponds to predicting the next character in the target sequence.

In [21]:
for input_examp, target_examp in dataset.take(1):
    print("input : ", text_from_ids(input_examp).numpy())
    print("target : ", text_from_ids(target_examp).numpy())


input :  b'First Citizen:\nBefore we proceed any further, hear me speak.\n\nAll:\nSpeak, speak.\n\nFirst Citizen:\nYou'
target :  b'irst Citizen:\nBefore we proceed any further, hear me speak.\n\nAll:\nSpeak, speak.\n\nFirst Citizen:\nYou '


In [22]:
# Prepare the dataset for training: shuffle the (input, target) pairs, pack them
# into batches, and prefetch so data preparation overlaps with training.
BATCH_SIZE = 64
# Number of elements held in the shuffle buffer.
BUFFER_SIZE = 10000

# The pipeline is rebuilt from `sequences` instead of from `dataset` itself.
# Re-running this cell therefore yields the same (BATCH_SIZE, seq_length) batches
# rather than batching data that an earlier run of the cell already batched.
dataset = (
    sequences.map(split_input_target)
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE, drop_remainder=True)  # drop the last, smaller batch
    .prefetch(tf.data.AUTOTUNE)  # non-experimental name of tf.data.experimental.AUTOTUNE
)

dataset

<_PrefetchDataset element_spec=(TensorSpec(shape=(64, 100), dtype=tf.int64, name=None), TensorSpec(shape=(64, 100), dtype=tf.int64, name=None))>

**Build The Model**

In [23]:
# Number of classes the model embeds and predicts.
# Taken from the lookup layer rather than `len(vocab)`: the layer's vocabulary also
# contains the "[UNK]" token, so it is one larger than the set of unique characters
# (66 vs. 65 in the outputs of this notebook).
vocab_size = len(ids_from_chars.get_vocabulary())
embedding_dim = 256  # the size of the vectors that represent the characters
rnn_units = 1024  # number of units in the GRU layer

In [24]:
class MyModel(tf.keras.Model):
    """Character-level language model: Embedding -> GRU -> Dense.

    The dense layer has `vocab_size` units and no activation, so the model
    outputs one vector of per-class scores for every input position.
    """

    def __init__(self, vocab_size, embedding_dim, rnn_units):
        """Create the three layers.

        Args:
            vocab_size: Number of distinct IDs (embedding rows and output units).
            embedding_dim: Length of each embedding vector.
            rnn_units: Number of GRU units.
        """
        super().__init__()
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        # The GRU returns its output at every timestep plus its final state.
        self.gru = tf.keras.layers.GRU(
            rnn_units,
            return_sequences=True,
            return_state=True,
        )
        self.dense = tf.keras.layers.Dense(vocab_size)

    def call(self, inputs, states=None, return_state=False, training=False):
        """Run the model on a batch of ID sequences.

        Args:
            inputs: Batch of integer ID sequences.
            states: Optional GRU state to start from; when None the GRU's own
                initial state is used.
            return_state: If True, also return the final GRU state.
            training: Forwarded to every layer.

        Returns:
            The dense-layer output, or ``(output, states)`` when `return_state` is True.
        """
        embedded = self.embedding(inputs, training=training)
        gru_state = self.gru.get_initial_state(embedded) if states is None else states
        sequence_out, gru_state = self.gru(
            embedded, initial_state=gru_state, training=training
        )
        scores = self.dense(sequence_out, training=training)
        return (scores, gru_state) if return_state else scores

In [25]:
model=MyModel(
    vocab_size=len(ids_from_chars.get_vocabulary()),
    embedding_dim=embedding_dim,
    rnn_units=rnn_units,
)

**TRYING THE MODEL**


In [26]:
#checking the shape of the output
for input_example_batch, target_example_batch in dataset.take(1):
    example_batch_predictions = model(input_example_batch)
    print(
        example_batch_predictions.shape,
        "# (batch_size, sequence_length, vocab_size)",
    )

(64, 100, 66) # (batch_size, sequence_length, vocab_size)


In [27]:
model.summary()

Model: "my_model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 embedding (Embedding)       multiple                  16896     
                                                                 
 gru (GRU)                   multiple                  3938304   
                                                                 
 dense (Dense)               multiple                  67650     
                                                                 
Total params: 4022850 (15.35 MB)
Trainable params: 4022850 (15.35 MB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [28]:
sampled_indices = tf.random.categorical(
    example_batch_predictions[0], num_samples=1
)
sampled_indices = tf.squeeze(sampled_indices, axis=-1).numpy()

In [29]:
sampled_indices

array([33, 16, 30, 15, 52, 24, 63, 17, 63, 20,  0, 41, 18,  5, 12, 36, 23,
       45, 16, 16, 33, 20,  6, 10, 53, 21, 36, 28, 32, 52, 56, 56, 24, 33,
       16, 17, 62, 64, 51, 34, 63,  3, 33, 31, 20, 50,  1, 31, 24, 45, 11,
        1, 48, 61, 13, 26, 42, 21, 28, 37, 59, 31, 43,  9, 30, 53, 10, 12,
       54, 54, 34, 29, 48, 42, 34,  5, 32, 15, 49,  2, 13, 56, 55, 29, 17,
       38, 13, 25, 45, 23,  2, 29, 60,  3, 10, 38, 27, 49,  7, 39])

In [30]:
#prediction of next character (shows how untrained the model is)
print("Input:\n", text_from_ids(input_example_batch[0]).numpy())
print()
print("Next Char Predictions:\n", text_from_ids(sampled_indices).numpy())

Input:
 b"iers for these Irish wars.\nCome, gentlemen, let's all go visit him:\nPray God we may make haste, and "

Next Char Predictions:
 b"TCQBmKxDxG[UNK]bE&;WJfCCTG'3nHWOSmqqKTCDwylUx!TRGk\nRKf:\niv?McHOXtRd.Qn3;ooUPicU&SBj ?qpPDY?LfJ Pu!3YNj,Z"


**TRAIN THE MODEL**

In [31]:
#Define Loss Function
loss = tf.losses.SparseCategoricalCrossentropy(from_logits=True)

In [32]:
example_batch_mean_loss = loss(target_example_batch, example_batch_predictions)
print(
    "Prediction shape: ",
    example_batch_predictions.shape,
    " # (batch_size, sequence_length, vocab_size)",
)
print("Mean loss:        ", example_batch_mean_loss)

Prediction shape:  (64, 100, 66)  # (batch_size, sequence_length, vocab_size)
Mean loss:         tf.Tensor(4.1901584, shape=(), dtype=float32)


In [33]:
tf.exp(example_batch_mean_loss).numpy()

66.03325

In [34]:
#Configure Optimizer and Compile Model
model.compile(optimizer="adam", loss=loss)   #Adam optimizer, a popular choice for stochastic optimization, and the specified loss function

In [35]:

#to save model checkpoints during training
checkpoint_dir = "./training_checkpoints"
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_prefix, save_weights_only=True
)

In [36]:
#execute training
EPOCHS=30

In [37]:
history = model.fit(dataset, epochs=EPOCHS, callbacks=[checkpoint_callback])

Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30


**GENERATE TEXT using the trained text generation model**


In [38]:
class OneStep(tf.keras.Model):
    """Wraps a trained model so text can be generated one character at a time.

    Holds the model, both lookup layers, a sampling temperature, and a
    precomputed mask that is added to the logits so that "[UNK]" is never sampled.
    """

    def __init__(self, model, chars_from_ids, ids_from_chars, temperature=1.0):
        """Store the components and build the "[UNK]" prediction mask.

        Args:
            model: Called as ``model(inputs=..., states=..., return_state=True)`` and
                expected to return ``(logits, states)``.
            chars_from_ids: Layer mapping integer IDs to characters.
            ids_from_chars: Layer mapping characters to integer IDs; must provide
                ``get_vocabulary()``.
            temperature: Value the logits are divided by before sampling.
        """
        super().__init__()
        self.temperature = temperature
        self.model = model
        self.chars_from_ids = chars_from_ids
        self.ids_from_chars = ids_from_chars
        # ID of the "[UNK]" token, with a trailing axis added so that each row is
        # one index into the 1-D mask built below.
        skip_ids = self.ids_from_chars(["[UNK]"])[:, None]
        # Sparse vector of vocabulary length holding -inf at every skipped ID.
        sparse_mask = tf.SparseTensor(
            values=[-float("inf")] * len(skip_ids),
            indices=skip_ids,
            dense_shape=[len(ids_from_chars.get_vocabulary())],
        )
        # Dense form of the mask; positions not listed above take to_dense's default fill value.
        self.prediction_mask = tf.sparse.to_dense(sparse_mask)

    @tf.function
    def generate_one_step(self, inputs, states=None):
        """Sample the next character for each input string.

        Args:
            inputs: String tensor; each element is split into UTF-8 characters.
            states: State passed through to the wrapped model (None on the first call).

        Returns:
            Tuple ``(predicted_chars, states)``: the sampled character for each input
            and the state returned by the wrapped model.
        """
        # Split each string into characters, map them to IDs, and convert the
        # ragged result to a dense tensor.
        input_chars = tf.strings.unicode_split(inputs, "UTF-8")
        input_ids = self.ids_from_chars(input_chars).to_tensor()
        # `states` is forwarded as-is (including None); no initial state is built here.
        predicted_logits, states = self.model(
            inputs=input_ids, states=states, return_state=True
            )
        # Keep only the logits of the last position in each sequence.
        predicted_logits = predicted_logits[:, -1, :]
        predicted_logits = predicted_logits / self.temperature
        # Adding -inf to the "[UNK]" logit prevents it from being sampled.
        predicted_logits = predicted_logits + self.prediction_mask
        # Draw one ID per row from the distribution defined by the logits.
        predicted_ids = tf.random.categorical(predicted_logits, num_samples=1)
        predicted_ids = tf.squeeze(predicted_ids, axis=-1)
        predicted_chars = self.chars_from_ids(predicted_ids)
        return predicted_chars, states

In [39]:
one_step_model = OneStep(model, chars_from_ids, ids_from_chars)

In [40]:
start = time.time()
states = None
next_char = tf.constant(["ROMEO:"])
result = [next_char]

for n in range(1000):
    next_char, states = one_step_model.generate_one_step(
        next_char, states=states
    )
    result.append(next_char)

result = tf.strings.join(result)
end = time.time()
print(result[0].numpy().decode("utf-8"), "\n\n" + "_" * 80)
print("\nRun time:", end - start)

ROMEO:
These greatful mother then will I my daughter?

BIONDELLO:
No.

Third Servingman:
Where is he?

CURTIS:
They are life better.

HASTINGS:
The queen with slooking souls,
And scared my duty were you make,
Which way it to curse the city.

BRUTUS:
We should bo else
I thought it was so fair a servant of incelstant
And learning in but for the truth o' the common early,
I should possess you. You mean a gross:
Colse throne, on ey; and for even near
No reap' thus spoil;
'Tis those confusions.

LUCIO:
Right.

DUKE VINCENTIO:
Bishop, 'twixt what is yours,
For the most murm revenge that Were wear it
In eyes, with honey, shore! why, then, thou slew me
To such this spartle grief hath done.
For the fair done, by right once more than you
Have caused him to the execution.

BRUTUS:
Not inquite; away.

DUKE VINCENTIO:
You should acconceive your vantage,
The common lie that it must beat down their deeds.

WARWICK:
Thus by not heir to hear of thee thee, friend
Anon, my lord. And then to give him home

In [41]:
start = time.time()
states = None
next_char = tf.constant(["ROMEO:", "ROMEO:", "ROMEO:", "ROMEO:", "ROMEO:"])
result = [next_char]

for n in range(1000):
    next_char, states = one_step_model.generate_one_step(
        next_char, states=states
    )
    result.append(next_char)

result = tf.strings.join(result)
end = time.time()
print(result, "\n\n" + "_" * 80)
print("\nRun time:", end - start)

tf.Tensor(
[b"ROMEO:\nThe honourable father was the lieutenant\nFrom handing to be winter in:\nO, ighis prayers partly survain.\n\nLUCIO:\nWell, no more of him, that have you are our barring.\n\nServant:\nYou have a spirit! Would all the world abuse,\nExemeth in jest. Hark, hark! I am not brow and say 'Ala's lie\nHis love with you: but yet I'll profess to me.\nI do fair, sweet both are a king, and perjure\nHis name is manner, in the divines of ske\nThat you might well for commoun's aid;\nBut look'd for here, I have determined steep Than my son,\nWe'll dispersed them awhile: he shall between like him;\nMy love the heavens go sweet for his count.\n\nFirst Citizen:\nYour will be deliver'd, I warrant, an I go;\nNor is a dear actlempo, we are full of ourselves;\nHer words reeving more than the service's wounds.\nO, he is lost between, as we have causer'd to be\nwhat raimed in your orce to part.\n\nPETRUCHIO:\nWhy, sir, I would not dare; but looks too light.\nDown, what mayor is with such a 

**EXPORT THE GENERATOR**

In [43]:
tf.saved_model.save(one_step_model, "one_step") #model will be saved as a TensorFlow SavedModel
one_step_reloaded = tf.saved_model.load("one_step")



In [44]:
#Generating Text with Reloaded Model
states = None
next_char = tf.constant(["ROMEO:"])
result = [next_char]

for n in range(100):
    next_char, states = one_step_reloaded.generate_one_step(
        next_char, states=states
    )
    result.append(next_char)
print(tf.strings.join(result)[0].numpy().decode("utf-8"))

ROMEO:
The foe is it to be the master, Pompey,
And yet plucked dangerously, to bearing one
change garments


**custom training loop for a character-level text generation model**

In [45]:
#CustomTraining Class
class CustomTraining(MyModel):
    """MyModel with a hand-written training step based on tf.GradientTape."""

    @tf.function
    def train_step(self, inputs):
        """Run one optimisation step on a single batch.

        Args:
            inputs: Pair ``(inputs, labels)`` for one batch.

        Returns:
            Dict with the key "loss" holding the loss of this batch.
        """
        inputs, labels = inputs
        with tf.GradientTape() as tape:
            predictions = self(inputs, training=True)
            loss = self.loss(labels, predictions)
        # Differentiate with respect to, and update, this instance's own variables.
        # Reading the notebook-level `model` here would train whatever object that
        # global name happens to be bound to when the step is traced.
        trainable_vars = self.trainable_variables
        grads = tape.gradient(loss, trainable_vars)
        self.optimizer.apply_gradients(zip(grads, trainable_vars))

        return {"loss": loss}

In [46]:
#model is instantiated and compiled using the Adam optimizer and SparseCategoricalCrossentropy loss
model = CustomTraining(
    vocab_size=len(ids_from_chars.get_vocabulary()),
    embedding_dim=embedding_dim,
    rnn_units=rnn_units,
)
model.compile(
    optimizer=tf.keras.optimizers.Adam(),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
)

In [47]:
# Training loop
EPOCHS = 10
mean = tf.metrics.Mean()  # running mean of the per-batch loss within one epoch

for epoch in range(EPOCHS):
    start = time.time()

    # Start every epoch with an empty running mean.
    mean.reset_state()
    for batch_n, (inp, target) in enumerate(dataset):
        logs = model.train_step([inp, target])
        mean.update_state(logs["loss"])

        if batch_n % 50 == 0:
            template = f"Epoch {epoch+1} Batch {batch_n} Loss {logs['loss']:.4f}"
            print(template)

    # Save a checkpoint every 5 epochs and always after the last epoch.
    # The file name uses the 1-based epoch number, matching the log lines above.
    # Saving inside the loop avoids writing the final checkpoint twice and avoids
    # referencing `epoch` when EPOCHS is 0.
    # NOTE: `checkpoint_prefix` is the same prefix given to the ModelCheckpoint
    # callback earlier, so same-numbered checkpoints of that run are overwritten.
    if (epoch + 1) % 5 == 0 or epoch + 1 == EPOCHS:
        model.save_weights(checkpoint_prefix.format(epoch=epoch + 1))

    print()
    print(f"Epoch {epoch+1} Loss: {mean.result().numpy():.4f}")
    print(f"Time taken for 1 epoch {time.time() - start:.2f} sec")
    print("_" * 80)


Epoch 1 Batch 0 Loss 4.1904
Epoch 1 Batch 50 Loss 2.8996
Epoch 1 Batch 100 Loss 2.3812
Epoch 1 Batch 150 Loss 2.2214

Epoch 1 Loss: 2.7247
Time taken for 1 epoch 14.45 sec
________________________________________________________________________________
Epoch 2 Batch 0 Loss 2.1862
Epoch 2 Batch 50 Loss 2.0688
Epoch 2 Batch 100 Loss 1.9430
Epoch 2 Batch 150 Loss 1.8758

Epoch 2 Loss: 1.9905
Time taken for 1 epoch 11.06 sec
________________________________________________________________________________
Epoch 3 Batch 0 Loss 1.7957
Epoch 3 Batch 50 Loss 1.7606
Epoch 3 Batch 100 Loss 1.6900
Epoch 3 Batch 150 Loss 1.6121

Epoch 3 Loss: 1.7115
Time taken for 1 epoch 20.47 sec
________________________________________________________________________________
Epoch 4 Batch 0 Loss 1.6160
Epoch 4 Batch 50 Loss 1.5602
Epoch 4 Batch 100 Loss 1.5263
Epoch 4 Batch 150 Loss 1.5543

Epoch 4 Loss: 1.5489
Time taken for 1 epoch 11.81 sec
_____________________________________________________________________