# Text Generation with a Miniature GPT (using Keras)

*Link to the tutorial given below:* \
[https://keras.io/examples/generative/text_generation_with_miniature_gpt/](https://keras.io/examples/generative/text_generation_with_miniature_gpt/)

## Setup

In [None]:
# Runtime environment check (Colab): report the PyTorch build and the
# name of CUDA device 0, or 'None' when CUDA is not available.
import torch

gpu_name = torch.cuda.get_device_name(0) if torch.cuda.is_available() else "None"
print(f"PyTorch: {torch.__version__}")
print(f"GPU: {gpu_name}")

PyTorch: 2.6.0+cu124 \
GPU: None

In [None]:
import os
'''
Force Keras to use TensorFlow as its backend (instead of
alternatives like JAX or PyTorch).
This in turn ensures compatibility with TensorFlow-specific features
(e.g. tensorflow.data).
'''
os.environ["KERAS_BACKEND"] = "tensorflow"

'''
keras: high-level neural network API
layers: contains prebuilt layers (e.g. Dense, LSTM, Embedding)
ops: provides low-level operations (similar to NumPy but
backend-agnostic)
TextVectorization: converts raw text into numerical tokens (text must
be converted to integers for neural networks)
'''
import keras
from keras import layers
from keras import ops
from keras.layers import TextVectorization

'''
os: for filesystem operations (eg; loading datasets)
string and random: for text preprocessing
'''
import numpy as np
import os
import string
import random

'''
tensorflow specific imports: core library for differentiable programming
tf_data: tools for efficient data pipelines (eg; batching, shuffling)
tf_strings: string manipulation operations
'''
import tensorflow
import tensorflow.data as tf_data
import tensorflow.strings as tf_strings

## Implement a Transformer Block as a Layer

In [None]:
'''
creates a causal mask for autoregressive attention so that each token
attends to only previous tokens (no 'peeking' into future)
'''
def causal_attention_mask(batch_size, n_dest, n_src, dtype):
  """Build a batched causal mask for self-attention.

  Entry (d, s) of the mask is true when destination position d may
  attend to source position s, i.e. when d >= s - n_src + n_dest. This
  keeps the lower triangle (counted from the lower-right corner) and
  blocks the flow of information from future tokens.

  Args:
    batch_size: Number of copies of the mask to produce (may be a
      scalar tensor).
    n_dest: Number of destination (query) positions.
    n_src: Number of source (key) positions.
    dtype: Dtype the boolean mask is cast to.

  Returns:
    A tensor of shape [batch_size, n_dest, n_src].
  """
  # Column vector of destination indices against a row of source indices;
  # the comparison broadcasts to an [n_dest, n_src] boolean grid.
  dest_idx = ops.arange(n_dest)[:, None]
  src_idx = ops.arange(n_src)
  allowed = dest_idx >= src_idx - n_src + n_dest

  # Cast, add a leading batch axis of size 1, then repeat it batch_size times.
  single = ops.reshape(ops.cast(allowed, dtype), [1, n_dest, n_src])
  repeats = ops.concatenate(
      [ops.expand_dims(batch_size, -1), ops.convert_to_tensor([1, 1])], 0
  )
  return ops.tile(single, repeats)

'''
making a single transformer decoder block from scratch (used in GPT)
inherits: keras.layers.Layer (base class for custom layers)
'''
class TransformerBlock(layers.Layer):
  """Causal self-attention block: attention followed by a feed-forward net.

  The output of each sub-layer goes through dropout, is added back to the
  sub-layer's input (residual connection) and is then layer-normalized.

  Args:
    embed__dim: Size of the last axis of the block's input and output;
      also passed to MultiHeadAttention as its `key_dim`.
    num_heads: Number of attention heads.
    ff_dim: Units in the hidden ReLU layer of the feed-forward network.
    rate: Dropout rate applied after both sub-layers.
  """

  def __init__(self, embed__dim, num_heads, ff_dim, rate=0.1):
    super().__init__()

    # Multi-head self-attention sub-layer.
    self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed__dim)

    # Feed-forward sub-layer: ReLU hidden layer, then a projection back to
    # embed__dim so the result can be added to the residual stream.
    self.ffn = keras.Sequential([
        layers.Dense(units=ff_dim, activation="relu"),
        layers.Dense(units=embed__dim),
    ])

    # One LayerNormalization and one Dropout per sub-layer.
    self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
    self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
    self.dropout1 = layers.Dropout(rate=rate)
    self.dropout2 = layers.Dropout(rate=rate)

  def call(self, inputs):
    """Apply causal self-attention and the feed-forward net to `inputs`."""
    shape = ops.shape(inputs)
    batch, steps = shape[0], shape[1]
    # Square boolean mask: every position attends to itself and earlier ones.
    mask = causal_attention_mask(batch, steps, steps, "bool")

    # Attention sub-layer with residual connection and normalization.
    attended = self.dropout1(self.att(inputs, inputs, attention_mask=mask))
    normed = self.layernorm1(inputs + attended)

    # Feed-forward sub-layer with residual connection and normalization.
    projected = self.dropout2(self.ffn(normed))
    return self.layernorm2(normed + projected)

## Implement an Embedding Layer

In [None]:
class TokenandPositionEmbedding(layers.Layer):
  """Sum of a learned token embedding and a learned position embedding.

  Args:
    maxlen: Number of rows in the position embedding table (the longest
      sequence length that can be embedded).
    vocab_size: Number of rows in the token embedding table.
    embed_dim: Output dimension shared by both embedding tables.
  """

  def __init__(self, maxlen, vocab_size, embed_dim):
    super().__init__()
    # Maps token ids to vectors.
    self.token_emb = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
    # Maps position indices to vectors of the same size.
    self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

  def call(self, x):
    """Embed token ids `x` and add the embedding of each position.

    The sequence length is read from the last axis of `x`; the position
    embeddings broadcast over the leading axes when added.
    """
    seq_len = ops.shape(x)[-1]
    position_ids = ops.arange(0, seq_len, 1)  # [0, 1, ..., seq_len - 1]
    position_vectors = self.pos_emb(position_ids)
    token_vectors = self.token_emb(x)
    return token_vectors + position_vectors

## Implement the Miniature GPT Model

In [None]:
# Model hyperparameters, read as module-level globals by create_model()
# and by the data / text-generation code further down.
vocab_size = 20000 # Rows in the token embedding table and width of the output logits
maxlen = 80 # Model input length in tokens (sequences are padded / trimmed to fit)
embed_dim = 256 # Embedding dimension (d_model in "Attention Is All You Need")
num_heads = 2 # Number of parallel attention heads
feed_forward_dim = 256 # Hidden layer size in the FFN (equal to embed_dim here; the original Transformer uses 4 * d_model)

def create_model():
  """Build and compile the miniature GPT-style language model.

  Pipeline: integer token ids -> token + position embeddings -> one
  TransformerBlock -> Dense projection to vocab_size logits. Sizes come
  from the module-level hyperparameters (maxlen, vocab_size, embed_dim,
  num_heads, feed_forward_dim).

  Returns:
    A compiled keras.Model with two outputs: the logits and the
    transformer block's output. Only the logits carry a loss.
  """
  # Integer-encoded token sequences of fixed length maxlen.
  token_ids = layers.Input(shape=(maxlen,), dtype="int32")

  embedded = TokenandPositionEmbedding(maxlen, vocab_size, embed_dim)(token_ids)
  hidden = TransformerBlock(embed_dim, num_heads, feed_forward_dim)(embedded)

  # Unnormalized scores over the vocabulary for every position.
  logits = layers.Dense(vocab_size)(hidden)

  gpt = keras.Model(inputs=token_ids, outputs=[logits, hidden])

  # The loss consumes raw logits; the second output gets no loss (None).
  gpt.compile(
      optimizer="adam",
      loss=[keras.losses.SparseCategoricalCrossentropy(from_logits=True), None],
  )
  return gpt

## Preparing the data for word-level language modelling

In [None]:
!curl -O https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz
!tar -xf aclImdb_v1.tar.gz

In [None]:
# Gather every IMDb review file (positive/negative, train/test).
directories = [
    "aclImdb/train/pos",
    "aclImdb/train/neg",
    "aclImdb/test/pos",
    "aclImdb/test/neg",
]
filenames = []
# The loop variable is `directory`, not `dir`, so the builtin dir() is not
# shadowed for the rest of the notebook session.
for directory in directories:
  filenames.extend(
      os.path.join(directory, name) for name in os.listdir(directory)
  )

# Shuffle the file order so that reviews are not read grouped by
# directory (all positives first, then all negatives, ...).
random.shuffle(filenames)

# Text Dataset Pipeline
# Every line of every listed file becomes one dataset element.
text_ds = tf_data.TextLineDataset(filenames)
# Randomize element order using a shuffle buffer of 256 elements.
text_ds = text_ds.shuffle(buffer_size=256)
# Group elements into batches of 256.
text_ds = text_ds.batch(batch_size=256)

# Text Standardization
def custom_standardization(input_string):
  """Normalize raw review text before tokenization.

  Steps, in order:
  1. Lowercase the text.
  2. Replace each literal "<br />" with a single space (other HTML
     markup is left untouched).
  3. Insert a space in front of every character matched by the
     character class built from string.punctuation, so punctuation is
     split off from the preceding word.
  """
  text = tf_strings.lower(input_string)
  text = tf_strings.regex_replace(text, "<br />", " ")
  punctuation_pattern = f"([{string.punctuation}])"
  return tf_strings.regex_replace(text, punctuation_pattern, r" \1")

# Text Vectorization
'''
Purpose - converts raw text -> integer token sequences
'''
vectorize_layer = TextVectorization(
    standardize = custom_standardization,
    
    # Cap the vocabulary at vocab_size - 1 entries (19,999 for vocab_size = 20000).
    # NOTE(review): per the Keras docs this cap appears to include the
    # reserved padding and out-of-vocabulary entries - confirm.
    max_tokens = vocab_size - 1,
    
    # Output integer token IDs
    output_mode = "int",
    
    # Pad / trim every sequence to maxlen + 1 tokens (81 for maxlen = 80);
    # the extra token lets inputs and targets be offset by one position.
    output_sequence_length = maxlen + 1,
)
# Learn the vocabulary from the batched text dataset.
vectorize_layer.adapt(text_ds)
# List of vocabulary strings; a string's position in the list is its token id.
vocab = vectorize_layer.get_vocabulary()

# Language Model Input-Target Preparation
def prepare_lm_inputs_labels(text):
  """Turn a batch of raw strings into next-token (input, target) pairs.

  The batch is tokenized to maxlen + 1 ids per example by vectorize_layer.
  The input drops the last token and the target drops the first, so the
  target at each position is the token that follows the input there.

  Returns:
    A tuple (x, y) of token-id tensors, each [batch_size, maxlen].
  """
  column = tensorflow.expand_dims(text, -1)  # [batch_size] -> [batch_size, 1]
  token_ids = vectorize_layer(column)  # [batch_size, maxlen + 1]
  return token_ids[:, :-1], token_ids[:, 1:]

# Convert every batch of raw strings into (input, target) token tensors;
# AUTOTUNE lets tf.data choose the degree of parallelism.
text_ds = text_ds.map(prepare_lm_inputs_labels, num_parallel_calls=tf_data.AUTOTUNE)
# Prefetch upcoming batches, with the buffer size chosen by tf.data.
text_ds = text_ds.prefetch(tf_data.AUTOTUNE)

## Implement a Keras Callback for generating text

In [None]:
class TextGenerator(keras.callbacks.Callback):
    """A callback to generate text from a trained model.
    1. Feed some starting prompt to the model
    2. Predict probabilities for the next token
    3. Sample the next token and add it to the next input

    The model input length is read from the module-level `maxlen`.

    Arguments:
        max_tokens: Integer, the number of tokens to be generated after prompt.
        start_tokens: List of integers, the token indices for the starting prompt.
        index_to_word: List of strings, obtained from the TextVectorization layer.
        top_k: Integer, sample from the `top_k` token predictions.
        print_every: Integer, print after this many epochs.
    """

    def __init__(
        self, max_tokens, start_tokens, index_to_word, top_k=10, print_every=1
    ):
        # Let the base Callback set up its own state before adding ours.
        super().__init__()
        self.max_tokens = max_tokens
        self.start_tokens = start_tokens
        self.index_to_word = index_to_word
        self.print_every = print_every
        self.k = top_k

    def sample_from(self, logits):
        """Sample one token id among the `top_k` highest-scoring logits.

        The top-k logits are turned into probabilities with a softmax and
        one of the corresponding token ids is drawn at random accordingly.
        """
        logits, indices = ops.top_k(logits, k=self.k, sorted=True)
        indices = np.asarray(indices).astype("int32")
        preds = keras.activations.softmax(ops.expand_dims(logits, 0))[0]
        preds = np.asarray(preds).astype("float32")
        return np.random.choice(indices, p=preds)

    def detokenize(self, number):
        """Return the vocabulary string for token id `number`."""
        return self.index_to_word[number]

    def on_epoch_end(self, epoch, logs=None):
        """Generate and print `max_tokens` tokens every `print_every` epochs."""
        if (epoch + 1) % self.print_every != 0:
            return

        # Work on a copy so the stored prompt is not mutated between epochs.
        start_tokens = list(self.start_tokens)
        tokens_generated = []
        # Strict `<` so that exactly max_tokens tokens are generated.
        while len(tokens_generated) < self.max_tokens:
            # Condition on the most recent maxlen tokens; once the sequence
            # outgrows the model input, the oldest tokens are dropped.
            context = start_tokens[-maxlen:]
            # The prediction for the next token sits at the last real position.
            sample_index = len(context) - 1
            # Right-pad with 0 up to the fixed model input length.
            x = np.array([context + [0] * (maxlen - len(context))])
            y, _ = self.model.predict(x, verbose=0)
            sample_token = self.sample_from(y[0][sample_index])
            tokens_generated.append(sample_token)
            start_tokens.append(sample_token)

        txt = " ".join(
            self.detokenize(token)
            for token in list(self.start_tokens) + tokens_generated
        )
        print(f"generated text:\n{txt}\n")


# Tokenize the starting prompt.
# Reverse lookup from vocabulary string to its token id (its position in vocab).
word_to_index = {word: index for index, word in enumerate(vocab)}

start_prompt = "this movie is"
# Words missing from the vocabulary fall back to token id 1.
start_tokens = [word_to_index.get(word, 1) for word in start_prompt.split()]
num_tokens_generated = 40
text_gen_callback = TextGenerator(num_tokens_generated, start_tokens, vocab)

### Training Note
*This model was trained on Google Colab using a GPU (T4) runtime. Local execution may vary in speed.*

In [None]:
# Build and compile a fresh model.
model = create_model()

# Train for 25 epochs with verbose=2 (one log line per epoch in Keras);
# text_gen_callback prints sampled text at the end of epochs.
model.fit(text_ds, verbose=2, epochs=25, callbacks=[text_gen_callback])