## Setup

In [62]:
import tensorflow as tf

import numpy as np
import os
import time
import random

In [None]:
CONFIG = {
    # Data processing
    "SEED": 42,                     # Random seed for reproducibility
    "TRAIN_SPLIT": 0.8,             # Percentage of data for training
    "VAL_SPLIT": 0.1,              # Percentage of data for validation
    "TEST_SPLIT": 0.1,              # Percentage of data for testing
    "SEQ_LENGTH": 100,              # Sequence length for training examples
    
    # Model architecture
    "EMBEDDING_DIM": 256,           # Dimension of the embedding layer
    "RNN_UNITS": 1024,              # Number of units in the RNN layer
    
    # Training parameters
    "BATCH_SIZE": 64,               # Batch size for training
    "BUFFER_SIZE": 10000,           # Buffer size for shuffling
    "EPOCHS": 20,                   # Number of epochs for training
    "OPTIMIZER": "adam",            # Optimizer for training
    "EARLY_STOPPING_PATIENCE": 5,   # Patience for early stopping
    "MONITOR_METRIC": "val_loss",   # Metric to monitor for early stopping
    "RESTORE_BEST_WEIGHTS": True,   # Whether to restore best weights after training
    
    # Text generation
    "TEMPERATURE": 1.0,             # Temperature for text generation
    "GENERATION_LENGTH": 1000       # Length of generated text
}

In [64]:
# Set seeds for reproducibility
def set_seeds(seed=42):
    """Set seeds for reproducibility."""
    # Set seed for Python's random module
    random.seed(seed)
    
    # Set seed for NumPy
    np.random.seed(seed)
    
    # Set seed for TensorFlow
    tf.random.set_seed(seed)
    
    # Try to make operations deterministic (TF 2.8+)
    try:
        tf.config.experimental.enable_op_determinism()
    except:
        # For older TensorFlow versions
        print("Warning: Op determinism not available in your TF version. Results may still vary.")
        # Set as many deterministic settings as possible
        os.environ['TF_DETERMINISTIC_OPS'] = '1'
        
    print(f"Seeds set to {seed} for reproducibility")

set_seeds(CONFIG["SEED"])

Seeds set to 42 for reproducibility


### Download the Shakespeare dataset

In [65]:
import requests

# URL of the dataset
url = "https://www.gutenberg.org/cache/epub/1513/pg1513.txt"

# Download the file
print("Downloading Romeo and Juliet text...")
response = requests.get(url)
if response.status_code == 200:
  if not os.path.exists('./dataset'):
    os.makedirs('./dataset')

  path_to_file = "./dataset/romeo_and_juliet.txt"
  with open(path_to_file, 'wb') as f:
    f.write(response.content)
  print("Download completed successfully.")
else:
  raise Exception(f"Failed to download file. Status code: {response.status_code}")

Downloading Romeo and Juliet text...
Download completed successfully.


### Read the data

First, look in the text:

In [66]:
# Read, then decode for py2 compat.
text = open(path_to_file, 'rb').read().decode(encoding='utf-8')
# length of text is the number of characters in it
print(f'Length of text: {len(text)} characters')

Length of text: 167424 characters


In [67]:
def preprocess_romeo_and_juliet(text):
    """
    Extract only the actual play content from the Romeo and Juliet text,
    removing Project Gutenberg header, footer, and metadata.
    """
    # Find the beginning of the actual play
    start_marker = "THE PROLOGUE"
    end_marker = "*** END OF THE PROJECT GUTENBERG EBOOK"
    
    start_index = text.find(start_marker)
    end_index = text.find(end_marker)
    
    if start_index == -1 or end_index == -1:
        print("Warning: Could not find start or end markers in the text")
        return text
    
    # Extract just the play content
    play_text = text[start_index:end_index].strip()
    
    print(f"Original text length: {len(text)} characters")
    print(f"Processed text length: {len(play_text)} characters")
    print(f"Removed {len(text) - len(play_text)} characters of metadata")
    
    return play_text

# Apply preprocessing to remove header and footer
text = preprocess_romeo_and_juliet(text)

Original text length: 167424 characters
Processed text length: 147650 characters
Removed 19774 characters of metadata


In [68]:

# Take a look at the beginning and end of the processed text
print("First 250 characters of the processed text:")
print(text[:250])

print("\nLast 250 characters of the processed text:")
print(text[-250:])

First 250 characters of the processed text:
THE PROLOGUE.

ACT I
Scene I. A public place.
Scene II. A Street.
Scene III. Room in Capulet’s House.
Scene IV. A Street.
Scene V. A Hall in Capulet’s House.

ACT II
CHORUS.
Scene I. An open place adjoining Capulet’s Garden.
Scene II. Cap

Last 250 characters of the processed text:
s morning with it brings;
The sun for sorrow will not show his head.
Go hence, to have more talk of these sad things.
Some shall be pardon’d, and some punished,
For never was a story of more woe
Than this of Juliet and her Romeo.

 [_Exeunt._]


In [69]:
# The unique characters in the file
vocab = sorted(set(text))
print(f'{len(vocab)} unique characters')

71 unique characters


## Process the text

### Vectorize the text

This converts from tokens to character IDs:

In [70]:
ids_from_chars = tf.keras.layers.StringLookup(
    vocabulary=list(vocab), mask_token=None)

This recovers the characters from the vectors of IDs, and returns them as a `tf.RaggedTensor` of characters:

In [71]:
chars_from_ids = tf.keras.layers.StringLookup(
    vocabulary=ids_from_chars.get_vocabulary(), invert=True, mask_token=None)

Can use `tf.strings.reduce_join` to join the characters back into strings. 

In [72]:
def text_from_ids(ids):
  return tf.strings.reduce_join(chars_from_ids(ids), axis=-1)

### Create training examples and targets

For each input sequence, the corresponding targets contain the same length of text, except shifted one character to the right. So break the text into chunks of `seq_length+1`

Example with "Tensorflow":
The string "Tensorflow" is split into:

Input: All characters except the last one.
Target: All characters except the first one.
Step-by-Step Breakdown:
Original Sequence: "Tensorflow"

Characters: ['T', 'e', 'n', 's', 'o', 'r', 'f', 'l', 'o', 'w']
Input Sequence:

Take all characters except the last one: ['T', 'e', 'n', 's', 'o', 'r', 'f', 'l', 'o']
Target Sequence:

Take all characters except the first one: ['e', 'n', 's', 'o', 'r', 'f', 'l', 'o', 'w']

Each character in the Input corresponds to the Target character at the next position:

```
'T' → 'e'
'e' → 'n'
'n' → 's'
's' → 'o'
'o' → 'r'
'r' → 'f'
'f' → 'l'
'l' → 'o'
'o' → 'w'
```

In [73]:
all_ids = ids_from_chars(tf.strings.unicode_split(text, 'UTF-8'))
all_ids

<tf.Tensor: shape=(147650,), dtype=int64, numpy=array([31, 19, 16, ...,  8, 39, 38])>

In [74]:
# Create a 70/15/15 train/validation/test split
total_chars = len(all_ids)
train_chars = int(CONFIG["TRAIN_SPLIT"] * total_chars)
val_chars = int(CONFIG["VAL_SPLIT"] * total_chars)

# Split the data
train_ids = all_ids[:train_chars]
val_ids = all_ids[train_chars:train_chars+val_chars]
test_ids = all_ids[train_chars+val_chars:]

# Create separate datasets
train_ids_dataset = tf.data.Dataset.from_tensor_slices(train_ids)
val_ids_dataset = tf.data.Dataset.from_tensor_slices(val_ids)
test_ids_dataset = tf.data.Dataset.from_tensor_slices(test_ids)

The `batch` method lets you easily convert these individual characters to sequences of the desired size.

In [75]:
# Create sequences for each dataset
train_sequences = train_ids_dataset.batch(CONFIG["SEQ_LENGTH"]+1, drop_remainder=True)
val_sequences = val_ids_dataset.batch(CONFIG["SEQ_LENGTH"]+1, drop_remainder=True)
test_sequences = test_ids_dataset.batch(CONFIG["SEQ_LENGTH"]+1, drop_remainder=True)

It's easier to see what this is doing if you join the tokens back into strings:

In [76]:
for seq in train_sequences.take(5):
  print(text_from_ids(seq).numpy())

b'THE PROLOGUE.\r\n\r\nACT I\r\nScene I. A public place.\r\nScene II. A Street.\r\nScene III. Room in Capulet\xe2\x80\x99s H'
b'ouse.\r\nScene IV. A Street.\r\nScene V. A Hall in Capulet\xe2\x80\x99s House.\r\n\r\nACT II\r\nCHORUS.\r\nScene I. An open '
b'place adjoining Capulet\xe2\x80\x99s Garden.\r\nScene II. Capulet\xe2\x80\x99s Garden.\r\nScene III. Friar Lawrence\xe2\x80\x99s Cell.\r\nSc'
b'ene IV. A Street.\r\nScene V. Capulet\xe2\x80\x99s Garden.\r\nScene VI. Friar Lawrence\xe2\x80\x99s Cell.\r\n\r\nACT III\r\nScene I. '
b'A public Place.\r\nScene II. A Room in Capulet\xe2\x80\x99s House.\r\nScene III. Friar Lawrence\xe2\x80\x99s cell.\r\nScene IV. A'


For training you'll need a dataset of `(input, label)` pairs. Where `input` and 
`label` are sequences. At each time step the input is the current character and the label is the next character. 

Here's a function that takes a sequence as input, duplicates, and shifts it to align the input and label for each timestep:

In [77]:
def split_input_target(sequence):
    input_text = sequence[:-1]
    target_text = sequence[1:]
    return input_text, target_text

In [78]:
# Convert sequences to input-target pairs
train_dataset = train_sequences.map(split_input_target)
val_dataset = val_sequences.map(split_input_target)
test_dataset = test_sequences.map(split_input_target)

# Check a sample
for input_example, target_example in train_dataset.take(1):
    print("Input :", text_from_ids(input_example).numpy())
    print("Target:", text_from_ids(target_example).numpy())

Input : b'THE PROLOGUE.\r\n\r\nACT I\r\nScene I. A public place.\r\nScene II. A Street.\r\nScene III. Room in Capulet\xe2\x80\x99s '
Target: b'HE PROLOGUE.\r\n\r\nACT I\r\nScene I. A public place.\r\nScene II. A Street.\r\nScene III. Room in Capulet\xe2\x80\x99s H'


### Create training batches

You used `tf.data` to split the text into manageable sequences. But before feeding this data into the model, you need to shuffle the data and pack it into batches.

In [79]:
# Batch size
BATCH_SIZE = 64

# Buffer size to shuffle the dataset
# (TF data is designed to work with possibly infinite sequences,
# so it doesn't attempt to shuffle the entire sequence in memory. Instead,
# it maintains a buffer in which it shuffles elements).
BUFFER_SIZE = 10000

# Apply batching to all datasets
train_dataset = (
    train_dataset
    .shuffle(CONFIG["BUFFER_SIZE"])
    .batch(CONFIG["BATCH_SIZE"], drop_remainder=True)
    .prefetch(tf.data.experimental.AUTOTUNE))

# For validation and test, we don't need to shuffle
val_dataset = (
    val_dataset
    .batch(CONFIG["BATCH_SIZE"], drop_remainder=True)
    .prefetch(tf.data.experimental.AUTOTUNE))

test_dataset = (
    test_dataset
    .batch(CONFIG["BATCH_SIZE"], drop_remainder=True)
    .prefetch(tf.data.experimental.AUTOTUNE))

print("Training dataset:", train_dataset)
print("Validation dataset:", val_dataset)
print("Test dataset:", test_dataset)

Training dataset: <_PrefetchDataset element_spec=(TensorSpec(shape=(64, 100), dtype=tf.int64, name=None), TensorSpec(shape=(64, 100), dtype=tf.int64, name=None))>
Validation dataset: <_PrefetchDataset element_spec=(TensorSpec(shape=(64, 100), dtype=tf.int64, name=None), TensorSpec(shape=(64, 100), dtype=tf.int64, name=None))>
Test dataset: <_PrefetchDataset element_spec=(TensorSpec(shape=(64, 100), dtype=tf.int64, name=None), TensorSpec(shape=(64, 100), dtype=tf.int64, name=None))>


## Build The Model

In [80]:
# Length of the vocabulary in StringLookup Layer
vocab_size = len(ids_from_chars.get_vocabulary())

In [81]:
class MyModel(tf.keras.Model):
  def __init__(self, vocab_size, embedding_dim, rnn_units):
    super().__init__()
    self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
    self.rnn = tf.keras.layers.SimpleRNN(rnn_units,
                                         return_sequences=True,
                                         return_state=True)
    self.dense = tf.keras.layers.Dense(vocab_size)

  def call(self, inputs, states=None, return_state=False, training=False):
    x = inputs
    x = self.embedding(x, training=training)
    
    # Get batch size from input tensor
    batch_size = tf.shape(x)[0]
    
    if states is None:
      # Manually create initial state with correct shape [batch_size, rnn_units]
      states = tf.zeros([batch_size, self.rnn.units])
    
    x, states = self.rnn(x, initial_state=states, training=training)
    x = self.dense(x, training=training)

    if return_state:
      return x, states
    else:
      return x

In [82]:
model = MyModel(
    vocab_size=vocab_size,
    embedding_dim=CONFIG["EMBEDDING_DIM"],
    rnn_units=CONFIG["RNN_UNITS"])

In [83]:
# Create a dummy input with the right shape
# So that the custom class model can by analyzed by .summary()
dummy_input = tf.zeros((1, CONFIG["SEQ_LENGTH"]), dtype=tf.int64)
model(dummy_input) 

model.summary()

## Train the model

In [84]:
loss = tf.losses.SparseCategoricalCrossentropy(from_logits=True)

In [85]:
model.compile(optimizer=CONFIG["OPTIMIZER"], loss=loss)

### Execute the training

In [86]:
# Define a function to calculate perplexity
def calculate_perplexity(model, dataset):
    """
    Calculate perplexity on a dataset.
    Perplexity = exp(average cross-entropy loss)
    """
    total_loss = 0
    total_samples = 0
    
    for input_batch, target_batch in dataset:
        predictions = model(input_batch)
        # Get batch_size and sequence_length
        batch_size, sequence_length = target_batch.shape
        
        # Calculate loss for each prediction
        batch_loss = loss(target_batch, predictions)
        total_loss += batch_loss * batch_size
        total_samples += batch_size
        
    # Calculate average loss
    avg_loss = total_loss / total_samples
    
    # Perplexity is exp(average loss)
    perplexity = tf.exp(avg_loss)
    
    return perplexity.numpy()

In [87]:
class PerplexityCallback(tf.keras.callbacks.Callback):
    def __init__(self, val_dataset):
        super().__init__()
        self.val_dataset = val_dataset
        self.perplexity_history = []
        
    def on_epoch_end(self, epoch, logs=None):
        perplexity = calculate_perplexity(self.model, self.val_dataset)
        self.perplexity_history.append(perplexity)
        logs['val_perplexity'] = perplexity
        print(f"\nValidation Perplexity: {perplexity:.4f}")

# Create the callback
perplexity_callback = PerplexityCallback(val_dataset)

In [88]:
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor=CONFIG["MONITOR_METRIC"],
    patience=CONFIG["EARLY_STOPPING_PATIENCE"],
    restore_best_weights=CONFIG["RESTORE_BEST_WEIGHTS"]
)

In [89]:
history = model.fit(
    train_dataset, 
    epochs=CONFIG["EPOCHS"], 
    validation_data=val_dataset,
    callbacks=[early_stopping, perplexity_callback],
)

Epoch 1/10
[1m18/18[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 326ms/step - loss: 3.9527
Validation Perplexity: 27.3416
[1m18/18[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 379ms/step - loss: 3.9381 - val_loss: 3.3084 - val_perplexity: 27.3416
Epoch 2/10
[1m18/18[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 375ms/step - loss: 3.2397
Validation Perplexity: 16.5003
[1m18/18[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 418ms/step - loss: 3.2322 - val_loss: 2.8034 - val_perplexity: 16.5003
Epoch 3/10
[1m18/18[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 375ms/step - loss: 2.6384
Validation Perplexity: 12.2303
[1m18/18[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 416ms/step - loss: 2.6344 - val_loss: 2.5039 - val_perplexity: 12.2303
Epoch 4/10
[1m18/18[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 313ms/step - loss: 2.3858
Validation Perplexity: 10.5487
[1m18/18[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 354

In [90]:
# Evaluate on test set
print("Evaluating on test set...")
test_loss = model.evaluate(test_dataset)
print(f"Test loss: {test_loss}")

Evaluating on test set...
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 164ms/step - loss: 1.9402
Test loss: 1.9394915103912354


In [91]:
# Calculate perplexity on test set
test_perplexity = calculate_perplexity(model, test_dataset)
print(f"Test Perplexity: {test_perplexity:.4f}")

Test Perplexity: 6.9552


## Generate text

In [92]:
class OneStep(tf.keras.Model):
  def __init__(self, model, chars_from_ids, ids_from_chars, temperature=None):
    super().__init__()
    self.temperature = temperature if temperature is not None else CONFIG["TEMPERATURE"]
    self.model = model
    self.chars_from_ids = chars_from_ids
    self.ids_from_chars = ids_from_chars

    # Create a mask to prevent "[UNK]" from being generated.
    skip_ids = self.ids_from_chars(['[UNK]'])[:, None]
    sparse_mask = tf.SparseTensor(
        # Put a -inf at each bad index.
        values=[-float('inf')]*len(skip_ids),
        indices=skip_ids,
        # Match the shape to the vocabulary
        dense_shape=[len(ids_from_chars.get_vocabulary())])
    self.prediction_mask = tf.sparse.to_dense(sparse_mask)

  @tf.function
  def generate_one_step(self, inputs, states=None):
    # Convert strings to token IDs.
    input_chars = tf.strings.unicode_split(inputs, 'UTF-8')
    input_ids = self.ids_from_chars(input_chars).to_tensor()

    # Run the model.
    # predicted_logits.shape is [batch, char, next_char_logits]
    predicted_logits, states = self.model(inputs=input_ids, states=states,
                                        return_state=True)
    # Only use the last prediction.
    predicted_logits = predicted_logits[:, -1, :]
    predicted_logits = predicted_logits/self.temperature
    # Apply the prediction mask: prevent "[UNK]" from being generated.
    predicted_logits = predicted_logits + self.prediction_mask

    # Sample the output logits to generate token IDs.
    predicted_ids = tf.random.categorical(predicted_logits, num_samples=1)
    predicted_ids = tf.squeeze(predicted_ids, axis=-1)

    # Convert from token ids to characters
    predicted_chars = self.chars_from_ids(predicted_ids)

    # Return the characters and model state.
    return predicted_chars, states

In [93]:
one_step_model = OneStep(model, chars_from_ids, ids_from_chars)

In [94]:
start = time.time()
states = None
next_char = tf.constant(['ROMEO.'])
result = [next_char]

for n in range(CONFIG["GENERATION_LENGTH"]):
  next_char, states = one_step_model.generate_one_step(next_char, states=states)
  result.append(next_char)

result = tf.strings.join(result)
end = time.time()
print(result[0].numpy().decode('utf-8'), '\n\n' + '_'*80)
print('\nRun time:', end - start)

ROMEO.
What. Doun, Mondagks.
Butter, toul, ther leveron, theich this ofay.

BENVO.
Labing
Jxo wherf art breach seal] now harlive hears, onjerer tove as wother’’g hove peat for hor live ary shaiseranes yous wiod.
I’w the fover. Whict the colint? shay had, ay’ myel.

JULIET.
[_y, whin shoughe
’ftuties nove you.

BULIET.

MENTOMEO.
Goof it hingo oursy, eny hil?

LAPUOET.
I pherp that wabr,
Shat wallisil. Whom sorewel of my I’frs.

toul perto ghamurse you but fartlems, a redousd do tofut on vore ox of heave-krend fithtuns that a sither
Withe so grow thou.

ScENCE.
Me rave all dog.
Folf in oy; Muspridsans as shear what wath, lovest swiel: peavyer nove on?
The whou bast?
Prows toll piod a kay were at if thy purse.

 noulb.

 Entul that se—t is s’sidstw and timt you hould-and be.
Gorsat, wour youll nod hive and ow thoush sher, goom,
Thy faint?
I parser dest ges hand, I buppinigiteing-walliv, I willing ond, not shemes hichion.

F[RASCE.

P[_AR
Sh shar ase gsin 

_______________________________