<a href="https://colab.research.google.com/github/mohammadreza-mohammadi94/Deep-Learning-Projects/blob/main/Text-Generation-Edgar-Allan-Poems/text_generation_edgar_allan_word_level.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Imports

In [2]:
import tensorflow as tf
import numpy as np
import os
import re

# Setup

In [3]:
# Fetch the Poe corpus from Project Gutenberg via Keras' download helper,
# which returns the local path of the downloaded file.
path = tf.keras.utils.get_file(
    "allan.txt",
    origin="https://www.gutenberg.org/cache/epub/10031/pg10031.txt"
)

# Read the raw bytes and decode them as UTF-8. Binary mode is kept so that no
# newline translation happens; the context manager closes the file handle
# deterministically instead of leaving it open.
with open(path, "rb") as corpus_file:
    text = corpus_file.read().decode(encoding="utf-8")

Downloading data from https://www.gutenberg.org/cache/epub/10031/pg10031.txt
408498/408498 ━━━━━━━━━━━━━━━━━━━━ 0s 0us/step


# Helper Functions

In [22]:
# Normalize the text
def clean_text(raw_text):
    """
    Cleans raw text data by removing Project Gutenberg headers/footers,
    license lines, bracketed notes, and normalizing case and whitespace.

    The header and footer are trimmed independently: everything before the
    first "EDGAR ALLAN POE" is dropped when that marker exists, and everything
    from the first "End of the Project Gutenberg" *after* that point is dropped
    when that marker exists. A missing marker simply leaves its side untrimmed.

    Args:
        raw_text (str): The raw text content read from a file.

    Returns:
        str: The cleaned, lower-cased text with punctuation marks separated
            from words by single spaces.
    """
    start_marker = "EDGAR ALLAN POE"
    end_marker = "End of the Project Gutenberg"

    # Trim the header if its marker is present; otherwise keep from the start.
    start_idx = raw_text.find(start_marker)
    if start_idx == -1:
        start_idx = 0

    # Look for the footer only after the start position so that an earlier
    # occurrence of the marker cannot produce an empty slice.
    end_idx = raw_text.find(end_marker, start_idx)
    if end_idx == -1:
        end_idx = len(raw_text)

    text = raw_text[start_idx:end_idx]

    # Remove lines containing 'GUTENBERG' (likely license/project information)
    text = "\n".join(
        line for line in text.split('\n') if "GUTENBERG" not in line.upper()
    )

    # Normalize the text
    text = text.replace("\r", "")  # Remove carriage returns
    text = re.sub(r'\[.*?\]', '', text)  # Remove single-line text in square brackets (e.g., [illustration])
    text = re.sub(r'[0-9]+\.[A-Z]\.[0-9]+', '', text)  # Remove digits.LETTER.digits numbering patterns
    text = text.lower()  # Convert all text to lowercase

    # Separate punctuation marks with spaces so each one becomes its own token
    text = re.sub(r'([.,!?();:])', r' \1 ', text)
    # Collapse every run of two or more whitespace characters (spaces and/or
    # newlines) into one space; an isolated single newline is left as-is.
    text = re.sub(r'\s{2,}', ' ', text)

    # Return the cleaned text, stripping leading/trailing whitespace
    return text.strip()

def create_vectorizer(text, max_tokens=10000):
    """
    Builds a word-level TextVectorization layer and fits its vocabulary
    on the given corpus.

    Args:
        text (str): The cleaned text corpus.
        max_tokens (int, optional): Upper bound passed to the layer as its
                                    vocabulary size limit. Defaults to 10000.

    Returns:
        tuple: A pair of:
            - the adapted tf.keras.layers.TextVectorization layer
            - the vocabulary list reported by that layer
    """
    layer_options = dict(
        # The corpus was already cleaned by hand, so the layer's own
        # standardization step is switched off.
        standardize=None,
        max_tokens=max_tokens,
        # Emit integer word indices.
        output_mode="int",
        # None: do not pad/truncate to a fixed length.
        output_sequence_length=None,
    )
    text_vectorizer = tf.keras.layers.TextVectorization(**layer_options)

    # The whole corpus is passed as one sample inside a list.
    text_vectorizer.adapt([text])

    return text_vectorizer, text_vectorizer.get_vocabulary()

def prepare_dataset(vectorizer, text, batch_size, seq_length):
    """
    Turns the corpus into a shuffled, batched tf.data pipeline of
    (input, target) word-id sequences for next-word training.

    Args:
        vectorizer (tf.keras.layers.TextVectorization): The adapted text vectorizer.
        text (str): The cleaned text corpus.
        batch_size (int): The number of sequences per batch.
        seq_length (int): The length of each input sequence.

    Returns:
        tf.data.Dataset: Batches of (input_sequence, target_sequence) pairs,
            where the target is the input shifted forward by one word.
    """
    def _shift_by_one(chunk):
        # Input drops the final word; target drops the first word.
        return chunk[:-1], chunk[1:]

    # The vectorizer is called on a one-element list; row 0 of the result
    # is the id sequence for the whole corpus.
    token_ids = vectorizer([text])[0]

    # Cut the id stream into consecutive, non-overlapping chunks of
    # seq_length + 1 ids; a short trailing chunk is discarded.
    chunk_len = seq_length + 1
    chunks = tf.data.Dataset.from_tensor_slices(token_ids).batch(
        chunk_len, drop_remainder=True
    )

    pairs = chunks.map(_shift_by_one)

    # Shuffle the pairs, group them into fixed-size batches (dropping an
    # incomplete final batch), and prefetch.
    pairs = pairs.shuffle(10000)
    pairs = pairs.batch(batch_size, drop_remainder=True)
    return pairs.prefetch(tf.data.AUTOTUNE)

def build_model(vocab_size, embedding_dim, rnn_units, batch_size, stateful=False):
    """
    Assembles a three-layer LSTM language model as a Keras Sequential.

    Args:
        vocab_size (int): Vocabulary size; used for both the embedding input
                          dimension and the width of the output layer.
        embedding_dim (int): The dimension of the word embeddings.
        rnn_units (int): The number of units in each LSTM layer.
        batch_size (int): Fixed batch size; only used when `stateful` is True.
        stateful (bool, optional): Passed through to every LSTM layer.
                                   Defaults to False (stateless).

    Returns:
        tf.keras.Model: The Sequential model. It is not compiled here; the
            final Dense layer has no activation, so it outputs raw logits.
    """
    keras_layers = tf.keras.layers

    # A stateful model gets a fully specified batch dimension; a stateless
    # one leaves the batch size open.
    model_input = (
        tf.keras.Input(batch_shape=(batch_size, None))
        if stateful
        else tf.keras.Input(shape=(None,))
    )

    # Settings shared by all three recurrent layers.
    lstm_options = dict(units=rnn_units, return_sequences=True, stateful=stateful)

    stack = [model_input]
    # Word indices -> dense vectors.
    stack.append(keras_layers.Embedding(input_dim=vocab_size, output_dim=embedding_dim))
    # Recurrent layers one and two, each followed by dropout.
    stack.append(keras_layers.LSTM(**lstm_options))
    stack.append(keras_layers.Dropout(0.3))
    stack.append(keras_layers.LSTM(**lstm_options))
    stack.append(keras_layers.Dropout(0.3))
    # Third recurrent layer feeds the output projection directly.
    stack.append(keras_layers.LSTM(**lstm_options))
    # One logit per vocabulary entry at every timestep.
    stack.append(keras_layers.Dense(units=vocab_size))

    return tf.keras.Sequential(stack)

def generate_text(model, vectorizer, start_string, num_generate=200, temp=0.5):
    """
    Generates text using the trained language model.

    The seed string is fed through the model once, then the model is called
    one sampled word at a time; this relies on the model carrying its
    recurrent state between calls (i.e. being built with stateful=True and a
    batch size of 1).

    Args:
        model (tf.keras.Model): The trained text generation model (usually stateful).
        vectorizer (tf.keras.layers.TextVectorization): The text vectorization layer.
        start_string (str): The initial string to start text generation from.
        num_generate (int, optional): The number of sampling steps. Steps that
                               sample a reserved ID (0 or 1) emit no word, so the
                               output may contain fewer words. Defaults to 200.
        temp (float, optional): The sampling temperature; must be > 0. Higher
                               values make predictions more random.
                               Defaults to 0.5.

    Returns:
        str: The start_string followed by the generated words, space-separated.

    Raises:
        ValueError: If `temp` is not strictly positive.
    """
    # Dividing logits by zero or a negative temperature would yield inf/NaN
    # or an inverted distribution, so reject it before touching the model.
    if temp <= 0:
        raise ValueError(f"temp must be a positive number, got {temp!r}")

    # Convert the start_string into numerical IDs; this tensor is replaced by
    # the most recently sampled word after every step.
    input_eval = vectorizer([start_string])

    vocab = vectorizer.get_vocabulary()
    text_generated = []

    # Reset the states of the recurrent layers so generation starts fresh
    # rather than continuing from a previous call.
    for layer in model.layers:
        if hasattr(layer, 'reset_states'):
            layer.reset_states()

    for _ in range(num_generate):
        predictions = model(input_eval)

        # Only the final timestep predicts the *next* word; keep the batch
        # axis (size 1) so the result is (1, vocab_size), then scale the
        # logits by the temperature.
        last_step_logits = predictions[:, -1, :] / temp

        # Sample one word ID from the categorical distribution over the vocabulary.
        predicted_id = tf.random.categorical(last_step_logits, num_samples=1)[0, 0].numpy()

        # IDs 0 and 1 are the vectorizer's reserved entries (0 = padding,
        # 1 = out-of-vocabulary); they are not emitted as words.
        if 1 < predicted_id < len(vocab):
            text_generated.append(vocab[predicted_id])

        # The sampled ID becomes the next input, shaped (1, 1).
        input_eval = tf.expand_dims([predicted_id], 0)

    # Combine the starting string with the generated words and return.
    return start_string + " " + " ".join(text_generated)


# Execution

In [25]:
def main():
    """
    Runs the full pipeline: download the corpus, clean it, vectorize it,
    train a stateless LSTM model, then rebuild it as a stateful batch-size-1
    model and print a generated sample.

    Side effects: downloads a file, creates the "training_checkpoints"
    directory, writes checkpoint files and "final_weights.weights.h5",
    and prints progress to stdout.
    """
    # Load Data: Download the text corpus for Edgar Allan Poe's works.
    path = tf.keras.utils.get_file(
        "allan.txt",
        origin="https://www.gutenberg.org/cache/epub/10031/pg10031.txt"
    )
    # Read the bytes and decode as UTF-8; the context manager guarantees the
    # file handle is closed.
    with open(path, "rb") as corpus_file:
        raw_text = corpus_file.read().decode(encoding="utf-8")

    # Cleaning: strip headers/footers and normalize the text.
    cleaned_text = clean_text(raw_text)
    print(f"[INFO] Text cleaned. Length: {len(cleaned_text)}")

    # Vectorization: build a vocabulary and a word -> integer ID mapping.
    # The vocabulary is capped to keep the output layer small.
    MAX_TOKENS = 5000
    vectorizer, vocab_list = create_vectorizer(cleaned_text, MAX_TOKENS)
    print(f"[INFO] Vocab size: {len(vocab_list)}")

    # Dataset Preparation: split the ID stream into fixed-length sequences
    # and batch them.
    BATCH_SIZE = 64  # Number of sequences processed in each training step.
    SEQ_LENGTH = 30  # Length of input sequences for the model.
    dataset = prepare_dataset(vectorizer, cleaned_text, BATCH_SIZE, SEQ_LENGTH)
    print("[INFO] Dataset prepared.")

    # Train Model (Stateless): the model learns to predict the next word.
    EMBEDDING_DIM = 256  # Dimension of word embeddings.
    RNN_UNITS = 512  # Number of units in the LSTM layers.
    vocab_size = len(vocab_list)  # Number of entries in the vocabulary.

    model = build_model(vocab_size, EMBEDDING_DIM, RNN_UNITS, BATCH_SIZE, stateful=False)

    # The model outputs raw logits, hence from_logits=True; the sparse
    # variant of the loss takes integer-encoded targets.
    model.compile(
        optimizer="adam",
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    )
    model.summary()

    # Callbacks: checkpointing, early stopping, and learning-rate reduction.
    checkpoint_path = "training_checkpoints/ckpt_{epoch}.weights.h5"
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs("training_checkpoints", exist_ok=True)

    callbacks = [
        # Stop training if the loss doesn't improve for 7 epochs, and restore the best weights.
        tf.keras.callbacks.EarlyStopping(monitor="loss", patience=7, restore_best_weights=True),
        # Save weights only on epochs where the loss improved; because the
        # path contains {epoch}, each such save goes to its own file.
        tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_path, save_weights_only=True, monitor="loss", save_best_only=True),
        # Halve the learning rate if the loss plateaus for 3 epochs.
        tf.keras.callbacks.ReduceLROnPlateau(monitor="loss", factor=0.5, patience=3)
    ]

    print("[INFO] Starting training...")
    model.fit(dataset, epochs=300, callbacks=callbacks)

    # Save the final trained weights of the model.
    model.save_weights("final_weights.weights.h5")
    print("[INFO] Training finished.")

    # Inference Model (Stateful): same architecture, but stateful with
    # batch_size=1 so generation can feed one word at a time.
    inference_model = build_model(vocab_size, EMBEDDING_DIM, RNN_UNITS, batch_size=1, stateful=True)
    # Load the weights from the trained stateless model into the new stateful model.
    inference_model.load_weights("final_weights.weights.h5")
    # Build with the inference input shape (batch of 1, any sequence length).
    inference_model.build(tf.TensorShape([1, None]))
    print("[INFO] Inference model ready.")

    # Generate Text: sample 100 steps from a fixed seed string.
    print("\n\n--- GENERATED TEXT ---")
    generated = generate_text(inference_model, vectorizer, start_string="the raven sat", num_generate=100)
    print(generated)


# Run the pipeline when executed as a script or notebook cell (where
# __name__ is "__main__"), but not when this code is merely imported.
if __name__ == "__main__":
    main()

[INFO] Text cleaned. Length: 371446
[INFO] Vocab size: 5000
[INFO] Dataset prepared.


[INFO] Starting training...
Epoch 1/300
36/36 ━━━━━━━━━━━━━━━━━━━━ 6s 73ms/step - loss: 7.3247 - learning_rate: 0.0010
Epoch 2/300
36/36 ━━━━━━━━━━━━━━━━━━━━ 6s 168ms/step - loss: 6.0957 - learning_rate: 0.0010
Epoch 3/300
36/36 ━━━━━━━━━━━━━━━━━━━━ 3s 70ms/step - loss: 6.0623 - learning_rate: 0.0010
Epoch 4/300
36/36 ━━━━━━━━━━━━━━━━━━━━ 3s 69ms/step - loss: 6.0401 - learning_rate: 0.0010
Epoch 5/300
36/36 ━━━━━━━━━━━━━━━━━━━━ 3s 63ms/step - loss: 6.0440 - learning_rate: 0.0010
Epoch 6/300
36/36 ━━━━━━━━━━━━━━━━━━━━ 3s 69ms/step - loss: 6.0556 - learning_rate: 0.0010
Epoch 7/300
36/36 ━━━━━━━━━━━━━━━━━━━━ 3s 73ms/step - loss: 6.0384 - learning_rate: 0.0010
Epoch 8/300
[1m36/36[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 71ms/step - loss: 6.