In [1]:
import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np
import platform
import time
import pathlib
import os

print('Python version:', platform.python_version())
print('Tensorflow version:', tf.__version__)
print('Keras version:', tf.keras.__version__)

Python version: 3.11.8
Tensorflow version: 2.19.0
Keras version: 3.9.2


In [2]:
cache_dir = './tmp'
dataset_file_name = 'sherlockholmes.txt'

dataset_file_path = dataset_file_name

print(dataset_file_path)

sherlockholmes.txt


In [3]:
text = open(dataset_file_path, mode='r').read()
print(text[:250])

The Adventures of Sherlock Holmes

by Arthur Conan Doyle

I. A SCANDAL IN BOHEMIA


I.

To Sherlock Holmes she is always _the_ woman. I have seldom heard him
mention her under any other name. In his eyes she eclipses and
predominates the whole of her


In [4]:
from transformers import GPT2Tokenizer
from tokenizers import ByteLevelBPETokenizer

  from .autonotebook import tqdm as notebook_tqdm


In [5]:
tokenizer = ByteLevelBPETokenizer()
save_dir="./tokenizer"

tokenizer.train(files=["sherlockholmes.txt"], vocab_size=30_000, min_frequency=2)

if not os.path.exists(save_dir):
    os.makedirs(save_dir)

# Save the tokenizer model
tokenizer.save_model(save_dir)

# Load the tokenizer using GPT2Tokenizer
custom_tokenizer = GPT2Tokenizer.from_pretrained(save_dir)








Tokenize the text file

In [6]:
input_ids = custom_tokenizer.encode(text)
print(f"Total tokens in text: {len(input_ids)}")

Total tokens in text: 148854


Prepping data

In [7]:
tokens_dataset = tf.data.Dataset.from_tensor_slices(input_ids)

In [8]:
sequence_length = 100
examples_per_epoch = len(input_ids) // (sequence_length + 1)

print(f'Examples per epoch: {examples_per_epoch}')


Examples per epoch: 1473


In [9]:
# Generate batched sequences out of the token dataset
sequences = tokens_dataset.batch(sequence_length + 1, drop_remainder=True)

In [10]:
# Split sequences into input and target
def split_input_target(chunk):
    input_text = chunk[:-1]
    target_text = chunk[1:]
    return input_text, target_text

dataset = sequences.map(split_input_target)

In [11]:
# Show some examples of input-target pairs
for input_example, target_example in dataset.take(1):
    for i in range(5):
        if i < len(input_example):
            input_token = input_example[i].numpy()
            target_token = target_example[i].numpy()
            print(f'Step {i:2d}')
            print(f'  input token: {input_token} ({custom_tokenizer.decode([input_token])})')
            print(f'  expected output token: {target_token} ({custom_tokenizer.decode([target_token])})')

Step  0
  input token: 364 (The)
  expected output token: 400 ( A)
Step  1
  input token: 400 ( A)
  expected output token: 67 (d)
Step  2
  input token: 67 (d)
  expected output token: 2508 (venture)
Step  3
  input token: 2508 (venture)
  expected output token: 82 (s)
Step  4
  input token: 82 (s)
  expected output token: 283 ( of)


2025-05-14 19:39:46.658247: I tensorflow/core/framework/local_rendezvous.cc:407] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence


In [12]:
# Batch size.
BATCH_SIZE = 64

# Buffer size to shuffle the dataset (TF data is designed to work
# with possibly infinite sequences, so it doesn't attempt to shuffle
# the entire sequence in memory. Instead, it maintains a buffer in
# which it shuffles elements).
BUFFER_SIZE = 10000

dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder=True)

dataset

<_BatchDataset element_spec=(TensorSpec(shape=(64, 100), dtype=tf.int32, name=None), TensorSpec(shape=(64, 100), dtype=tf.int32, name=None))>

In [13]:
print('Batched dataset size: {}'.format(len(list(dataset.as_numpy_iterator()))))

Batched dataset size: 23


2025-05-14 19:39:46.797616: I tensorflow/core/framework/local_rendezvous.cc:407] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence


In [14]:
# Get vocabulary size from tokenizer
vocab_size = custom_tokenizer.vocab_size + 1  # +1 for the padding token if added

# The embedding dimension
embedding_dim = 256

# Number of RNN units
rnn_units = 1024

In [15]:
def build_model(vocab_size, embedding_dim, rnn_units, batch_size):
    model = tf.keras.models.Sequential()

    model.add(tf.keras.layers.InputLayer(batch_input_shape=[batch_size, None]))

    model.add(tf.keras.layers.Embedding(
        input_dim=vocab_size,
        output_dim=embedding_dim
    ))

    model.add(tf.keras.layers.LSTM(
        units=rnn_units,
        return_sequences=True,
        stateful=True,
        recurrent_initializer=tf.keras.initializers.GlorotNormal()
    ))

    model.add(tf.keras.layers.Dense(vocab_size))

    return model
model = build_model(vocab_size, embedding_dim, rnn_units, BATCH_SIZE)

In [16]:
model.summary()

In [17]:
tf.keras.utils.plot_model(
    model,
    show_shapes=True,
    show_layer_names=True,
)

You must install pydot (`pip install pydot`) for `plot_model` to work.


In [None]:
# Loss function
def loss(labels, logits):
    return tf.keras.losses.sparse_categorical_crossentropy(
        y_true=labels,
        y_pred=logits,
        from_logits=True
    )

# Compile the model
adam_optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
model.compile(
    optimizer=adam_optimizer,
    loss=loss
)

# Directory for checkpoints
checkpoint_dir = 'tmp/checkpoints'
os.makedirs(checkpoint_dir, exist_ok=True)

# Checkpoint filename
checkpoint_prefix = os.path.join(checkpoint_dir, 'model.weights.h5')

checkpoint_cb = tf.keras.callbacks.ModelCheckpoint(
    'model_checkpoint.h5', save_best_only=True
)

early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='loss', patience=3, restore_best_weights=True
)


In [19]:
EPOCHS = 80
history = model.fit(
    x=dataset,
    epochs=EPOCHS,
    callbacks=[
        checkpoint_cb, early_stopping
    ]
)


Epoch 1/80
[1m23/23[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m71s[0m 3s/step - loss: 7.8675
Epoch 2/80


  self._save_model(epoch=epoch, batch=None, logs=logs)
  current = self.get_monitor_value(logs)


[1m23/23[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m75s[0m 3s/step - loss: 6.4970
Epoch 3/80
[1m23/23[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m93s[0m 4s/step - loss: 6.3815
Epoch 4/80
[1m 8/23[0m [32m━━━━━━[0m[37m━━━━━━━━━━━━━━[0m [1m1:02[0m 4s/step - loss: 6.1687

KeyboardInterrupt: 

In [None]:
def render_training_history(training_history):
    loss = training_history.history['loss']
    plt.title('Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.plot(loss, label='Training set')
    plt.legend()
    plt.grid(linestyle='--', linewidth=1, alpha=0.5)
    plt.show()

In [None]:
render_training_history(history)

# generate text

In [None]:
from tensorflow.keras.models import load_model

model = load_model('model_checkpoint.h5')

In [None]:
def build_model(vocab_size, embedding_dim, rnn_units, batch_size):
    model = tf.keras.models.Sequential()

    model.add(tf.keras.layers.InputLayer(batch_input_shape=[batch_size, None]))

    model.add(tf.keras.layers.Embedding(
        input_dim=vocab_size,
        output_dim=embedding_dim
    ))


    model.add(tf.keras.layers.LSTM(
      units=rnn_units,
      return_sequences=True,
      stateful=True,
      recurrent_initializer=tf.keras.initializers.GlorotNormal()
    ))

    model.add(tf.keras.layers.Dense(vocab_size))

    return model

model = build_model(vocab_size, embedding_dim, rnn_units, BATCH_SIZE)


In [None]:
model.summary()

In [None]:
def generate_text(model, start_string, num_generate=1000, temperature=1.0):
    # Tokenize the start string
    input_ids = custom_tokenizer.encode(start_string)
    input_ids = tf.expand_dims(input_ids, 0)

    # Empty list to store generated tokens
    generated_tokens = []

    # Reset states for RNN layers if they are stateful
    for layer in model.layers:
        if hasattr(layer, 'reset_states'):
            layer.reset_states()

    for _ in range(num_generate):
        predictions = model(input_ids)
        # Remove batch dimension
        predictions = tf.squeeze(predictions, 0)

        # Use the last prediction
        predictions = predictions[-1:, :] / temperature
        predicted_id = tf.random.categorical(
            predictions,
            num_samples=1
        )[-1, 0].numpy()

        # Append the predicted token
        generated_tokens.append(predicted_id)

        # Update input_ids for next prediction
        input_ids = tf.expand_dims([predicted_id], 0)

    # Decode the generated tokens
    generated_text = custom_tokenizer.decode(generated_tokens)
    
    # Return the full text
    return start_string + generated_text

# Generate text with the model
print(generate_text(model, start_string="Sherlock: ", num_generate=500))

In [None]:
# Generate the text with default temperature (1.0).
print(generate_text(model, start_string=u"Sherlock: "))

In [None]:
# Generate the text with higher temperature to get more unexpected results.
print(generate_text(model, start_string=u"Sherlock: ", temperature=1.5))