<br>

<h1 style="text-align:center;">Text generation with a miniature GPT</h1>

<br>

### Introduction

---

This notebook implements an autoregressive language model based on a miniature version of the GPT architecture.
 
We train the model on the IMDB sentiment classification dataset and then use it to generate new movie reviews from a given prompt.

[GPT](https://www.semanticscholar.org/paper/Improving-Language-Understanding-by-Generative-Radford/cd18800a0fe0b668a1cc19f2ec95b5003d0a5035),
[GPT-2](https://www.semanticscholar.org/paper/Language-Models-are-Unsupervised-Multitask-Learners-Radford-Wu/9405cc0d6169988371b2755e573cc28650d14dfe),
[GPT-3](https://arxiv.org/abs/2005.14165)

<br>

### INITIAL SETUP

---

In [1]:
# Import the libraries
import re, os, string, random
import numpy as np
import tensorflow as tf

<br>

### TRANSFORMER BLOCK

---

In [2]:
# Causal attention mask function
def causal_attention_mask(batch_size, n_dest, n_src, dtype):
    """
    Build a batched causal (lower-triangular) attention mask.

    Entry [b, i, j] is true when source position j may be attended to from
    destination position i, i.e. when i >= j - n_src + n_dest. For
    n_dest == n_src this reduces to j <= i, which hides every future token.

    ARGUMENTS
    =================
        - batch_size: scalar integer (Python int or int32 tensor); the mask
          is repeated this many times along the first axis
        - n_dest: number of tokens in the destination sequence
        - n_src: number of tokens in the source sequence
        - dtype: data type the boolean mask is cast to

    RETURNS
    =================
        - out: tensor of shape (batch_size, n_dest, n_src) and type `dtype`
    """

    # Column vector of destination positions against a row of source positions
    dest_positions = tf.range(n_dest)[:, None]
    src_positions = tf.range(n_src)

    # Broadcasted comparison yields the (n_dest, n_src) visibility matrix
    visible = dest_positions >= src_positions - n_src + n_dest

    # Cast and add a leading axis of size 1 so the mask can be tiled per example
    single_mask = tf.reshape(tf.cast(visible, dtype), [1, n_dest, n_src])

    # Tile multiples [batch_size, 1, 1]: repeat along the batch axis only
    tile_multiples = tf.concat(
        [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)], 0
    )

    return tf.tile(single_mask, tile_multiples)

In [3]:
# Transformer block class
class TransformerBlock(tf.keras.layers.Layer):
    """
    One transformer block: causal multi-head self-attention followed by a
    two-layer feed-forward network. Each sub-layer is followed by dropout, a
    residual connection and layer normalization.

    ARGUMENTS
    =================
        - embed_dim: size of the last axis of the block's input and output;
          also used as the per-head key size of the attention layer
        - num_heads: number of attention heads
        - ff_dim: width of the hidden (ReLU) layer of the feed-forward network
        - rate: dropout rate applied after each sub-layer
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
        super().__init__()

        # Self-attention sub-layer
        self.att = tf.keras.layers.MultiHeadAttention(num_heads, embed_dim)

        # Position-wise feed-forward sub-layer, projecting back to embed_dim
        self.ffn = tf.keras.Sequential(
            [
                tf.keras.layers.Dense(ff_dim, activation="relu"),
                tf.keras.layers.Dense(embed_dim),
            ]
        )

        # One normalization and one dropout layer per sub-layer
        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)

    def call(self, inputs):
        """Apply the block to `inputs` (axis 0 is the batch, axis 1 the sequence)."""

        # Dynamic shape, so the mask follows whatever batch/sequence size arrives
        shape = tf.shape(inputs)
        n_batch, n_tokens = shape[0], shape[1]

        # Every position may only attend to itself and earlier positions
        causal_mask = causal_attention_mask(n_batch, n_tokens, n_tokens, tf.bool)

        # Attention sub-layer: attend -> dropout -> residual -> normalize
        attended = self.dropout1(self.att(inputs, inputs, attention_mask=causal_mask))
        attn_normed = self.layernorm1(inputs + attended)

        # Feed-forward sub-layer: project -> dropout -> residual -> normalize
        projected = self.dropout2(self.ffn(attn_normed))

        return self.layernorm2(attn_normed + projected)


<br>

### EMBEDDING LAYER

---

Create two separate embedding layers: one for the tokens and one for the token
positions (the index of each token within the sequence).

In [4]:
# Token and position embedding class
class TokenAndPositionEmbedding(tf.keras.layers.Layer):
    """
    Sum of a learned token embedding and a learned position embedding.

    ARGUMENTS
    =================
        - maxlen: number of distinct positions the position table can embed
        - vocab_size: number of distinct token ids the token table can embed
        - embed_dim: size of each embedding vector
    """

    def __init__(self, maxlen, vocab_size, embed_dim):
        super().__init__()

        # Lookup table indexed by token id
        self.token_emb = tf.keras.layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)

        # Lookup table indexed by position within the sequence
        self.pos_emb = tf.keras.layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, x):
        """Embed the token ids in `x` and add the embedding of each position."""

        # Actual sequence length of this input (size of its last axis)
        seq_len = tf.shape(x)[-1]

        # Embed positions 0 .. seq_len - 1
        position_vectors = self.pos_emb(tf.range(start=0, limit=seq_len, delta=1))

        # Embed the tokens themselves
        token_vectors = self.token_emb(x)

        # The position vectors broadcast over the leading axes of the token vectors
        return token_vectors + position_vectors

<br>

### GPT model

---

In [5]:
# Model hyperparameters (module-level globals read by create_model, the
# vectorization layer and the text-generation callback)
vocab_size = 20000       # Vocabulary size (only consider the top 20k words)
maxlen = 80              # Max sequence length, in tokens, fed to the model
embed_dim = 256          # Embedding size for each token
num_heads = 2            # Number of attention heads
feed_forward_dim = 256   # Hidden layer size in feed forward network inside transformer

In [6]:
# GPT model
def create_model():
    """
    Build and compile the miniature GPT model.

    The architecture is: token + position embedding -> one transformer block
    -> dense projection to `vocab_size` logits. Sizes come from the
    module-level hyperparameters (maxlen, vocab_size, embed_dim, num_heads,
    feed_forward_dim).

    RETURNS
    =================
        - model: compiled Keras model with two outputs, [logits, block_output];
          only the logits contribute to the loss
    """

    # Fixed-length sequences of integer token ids
    token_ids = tf.keras.layers.Input(shape=(maxlen,), dtype=tf.int32)

    # Embed tokens and positions, then run the single transformer block
    hidden = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim)(token_ids)
    hidden = TransformerBlock(embed_dim, num_heads, feed_forward_dim)(hidden)

    # Unnormalized next-token scores for every position
    logits = tf.keras.layers.Dense(vocab_size)(hidden)

    # Expose the transformer features as a second output
    model = tf.keras.Model(inputs=token_ids, outputs=[logits, hidden])

    # Cross-entropy on the logits; `None` means the second output is not trained on
    model.compile(
        "adam",
        loss=[tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), None],
    )

    return model


<br>

### DOWNLOAD AND PREPARE DATASET

---

Download the IMDB dataset and combine the training and test sets into a single
corpus for the text generation task.

In [7]:
# Download the IMDB review archive (about 80 MB) into the working directory
!curl -O https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz
# Unpack the archive; the following cells read the files under aclImdb/
!tar -xf aclImdb_v1.tar.gz

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed

  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
  0 80.2M    0 49152    0     0  32751      0  0:42:48  0:00:01  0:42:47 32768
  0 80.2M    0  560k    0     0   222k      0  0:06:09  0:00:02  0:06:07  222k
  4 80.2M    4 3824k    0     0  1083k      0  0:01:15  0:00:03  0:01:12 1083k
  6 80.2M    6 5056k    0     0  1093k      0  0:01:15  0:00:04  0:01:11 1094k
  8 80.2M    8 7184k    0     0  1307k      0  0:01:02  0:00:05  0:00:57 1431k
  9 80.2M    9 7952k    0     0  1233k      0  0:01:06  0:00:06  0:01:00 1597k
 15 80.2M   15 12.5M    0     0  1720k      0  0:00:47  0:00:07  0:00:40 2483k
 19 80.2M   19 15.6M    0     0  1812k      0  0:00:45  0:00:08  0:00:37 2300k
 19 80.2M   19 15.6M    0     0  1626k      0  0:00

In [7]:
# Batch size: number of text lines grouped into each batch of the tf.data pipeline
batch_size = 128

In [8]:
# Directories that hold one review per file (positive/negative, train/test)
review_dirs = ["aclImdb/train/pos", "aclImdb/train/neg", "aclImdb/test/pos", "aclImdb/test/neg"]

# Collect the path of every review file.
# The loop variable is deliberately not called `dir`: at notebook (global)
# scope that name would shadow the builtin dir() for the rest of the session.
filenames = []
for review_dir in review_dirs:
    filenames.extend(os.path.join(review_dir, name) for name in os.listdir(review_dir))

# Report
print(f"{len(filenames)} files")

50000 files


In [9]:
# Randomize the order in which the review files are read
random.shuffle(filenames)

# Input pipeline: read the files line by line, shuffle the lines through a
# 256-element buffer, then group them into batches of `batch_size`
text_ds = (
    tf.data.TextLineDataset(filenames)
    .shuffle(buffer_size=256)
    .batch(batch_size)
)

In [10]:
# Function for custom standardization
def custom_standardization(input_string):
    """
    Normalize raw review text before tokenization.

    Steps: lowercase the text, replace HTML line-break tags ("<br />") with a
    space, and insert a space in front of every punctuation character so that
    punctuation marks become separate tokens when the text is split on
    whitespace.

    ARGUMENTS
    =================
        - input_string: string tensor holding the raw text

    RETURNS
    =================
        - out: string tensor holding the standardized text
    """

    # Lowercase the text
    lowercased = tf.strings.lower(input_string)

    # Remove html line-break tags
    stripped_html = tf.strings.regex_replace(lowercased, "<br />", " ")

    # Escape the punctuation before placing it in a character class. Unescaped,
    # the backslash in string.punctuation acts as an escape for the following
    # "]" and is therefore never matched as a literal character itself.
    punctuation_pattern = f"([{re.escape(string.punctuation)}])"

    # Put a space in front of each punctuation character
    out = tf.strings.regex_replace(stripped_html, punctuation_pattern, r" \1")

    return out


# Text vectorization layer: standardizes, splits and maps text to integer ids.
# The sequence length is maxlen + 1 because each sequence is later split into
# inputs (all but the last token) and labels (all but the first token).
vectorize_layer = tf.keras.layers.TextVectorization(
    standardize=custom_standardization,
    max_tokens=vocab_size - 1,
    output_mode="int",
    output_sequence_length=maxlen + 1,
)

# Learn the vocabulary from the batched review text
vectorize_layer.adapt(text_ds)

# Learned vocabulary as a list; a word's position in the list is its token id
vocab = vectorize_layer.get_vocabulary()


In [11]:
# Function for preparing the inputs and labels
def prepare_lm_inputs_labels(text):
    """
    Turn a batch of raw text into (inputs, labels) for language modelling.

    The text is vectorized into integer token ids; the labels are the same
    sequence shifted left by one position, so the label at position i is the
    token that follows input position i.

    ARGUMENTS
    =================
        - text: batch of raw strings

    RETURNS
    =================
        - x: token ids without the last position
        - y: token ids without the first position
    """

    # Add a trailing axis to the batch of strings, then tokenize
    token_ids = vectorize_layer(tf.expand_dims(text, -1))

    # Inputs drop the final token; labels drop the first one
    return token_ids[:, :-1], token_ids[:, 1:]

# Convert every text batch into (inputs, labels) and let tf.data prepare
# upcoming batches in the background while the current one is consumed
text_ds = text_ds.map(prepare_lm_inputs_labels).prefetch(tf.data.AUTOTUNE)

<br>

### CALLBACK FUNCTION

---

In [12]:
# Class for generating text
class TextGenerator(tf.keras.callbacks.Callback):
    """A callback to generate text from a trained model.
    1. Feed some starting prompt to the model
    2. Predict probabilities for the next token
    3. Sample the next token and add it to the next input

    The model input length is read from the module-level `maxlen`.

    Arguments:
        max_tokens: Integer, the number of tokens to be generated after prompt.
        start_tokens: List of integers, the token indices for the starting prompt.
        index_to_word: List of strings, obtained from the TextVectorization layer.
        top_k: Integer, sample from the `top_k` token predictions.
        print_every: Integer, print after this many epochs.
    """

    # Constructor function
    def __init__(self, max_tokens, start_tokens, index_to_word, top_k=10, print_every=1):

        # Let the Keras base class set up its own state first
        super().__init__()

        # Initialization
        self.max_tokens = max_tokens
        self.start_tokens = start_tokens
        self.index_to_word = index_to_word
        self.print_every = print_every
        self.k = top_k

    # Function for sampling from the model
    def sample_from(self, logits):
        """Sample one token index from the `top_k` highest-scoring logits."""

        # Values and indices of the k largest entries along the last dimension
        top_logits, top_indices = tf.math.top_k(logits, k=self.k, sorted=True)

        # Convert indices to a numpy array
        top_indices = np.asarray(top_indices).astype("int32")

        # Softmax over the k kept logits only, so they form a distribution
        probs = tf.keras.activations.softmax(tf.expand_dims(top_logits, 0))[0]
        probs = np.asarray(probs).astype("float32")

        # Draw one of the k candidate indices according to those probabilities
        return np.random.choice(top_indices, p=probs)

    # Function for converting indices to tokens
    def detokenize(self, number):
        """Return the vocabulary word stored at index `number`."""
        return self.index_to_word[number]

    # Function for generating text
    def on_epoch_end(self, epoch, logs=None):
        """Generate and print `max_tokens` tokens every `print_every` epochs."""

        # Only generate on every `print_every`-th epoch
        if (epoch + 1) % self.print_every != 0:
            return

        # Work on a copy so the stored prompt is never mutated
        context = list(self.start_tokens)
        tokens_generated = []

        # Generate exactly `max_tokens` tokens
        while len(tokens_generated) < self.max_tokens:

            if len(context) > maxlen:
                # Context is longer than the model input: keep the most
                # recent `maxlen` tokens so new tokens still influence
                # the next prediction
                window = context[-maxlen:]
            else:
                # Right-pad with 0s up to the fixed model input length
                window = context + [0] * (maxlen - len(context))

            # Position of the last real (non-padding) token in the window
            sample_index = min(len(context), maxlen) - 1

            # Predict silently: a progress bar per generated token is just noise
            y, _ = self.model.predict(np.array([window]), verbose=0)

            # Sample the next token from the logits at the last real position
            sample_token = self.sample_from(y[0][sample_index])

            # Record the token and extend the context for the next step
            tokens_generated.append(sample_token)
            context.append(sample_token)

        # `context` now holds the prompt followed by all generated tokens
        txt = " ".join(self.detokenize(token) for token in context)

        # Report
        print(f"GENERATED TEXT:\n{txt}\n")

In [19]:
# Reverse lookup of the vocabulary list: word -> token index
word_to_index = {word: index for index, word in enumerate(vocab)}

# Start prompt
start_prompt = "the wizard "

# Token indices of the prompt words; words missing from the vocabulary map to index 1
start_tokens = [word_to_index.get(token, 1) for token in start_prompt.split()]

# Number of tokens to be generated
num_tokens_generated = 50

# Callback that generates text from the prompt with the model being trained
text_gen_callback = TextGenerator(num_tokens_generated, start_tokens, vocab)

<br>

### TRAINING

---

Note: This code should preferably be run on GPU.

In [20]:
# Build and compile a fresh model
model = create_model()

# Print the layer-by-layer summary (output shapes and parameter counts)
model.summary()

Model: "model_2"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_3 (InputLayer)         [(None, 80)]              0         
_________________________________________________________________
token_and_position_embedding (None, 80, 256)           5140480   
_________________________________________________________________
transformer_block_2 (Transfo (None, 80, 256)           658688    
_________________________________________________________________
dense_8 (Dense)              (None, 80, 20000)         5140000   
Total params: 10,939,168
Trainable params: 10,939,168
Non-trainable params: 0
_________________________________________________________________


In [21]:
# Train for 30 epochs; text_gen_callback runs at the end of each epoch and
# prints a generated sample
model.fit(text_ds, verbose=1, epochs=30, callbacks=[text_gen_callback])

Epoch 1/30
GENERATED TEXT:
the wizard of the world is not . the worst of it was a good . the movie is just plain awful , the acting , i was really liked this was so bad and i am very funny at least one of the best , i have watched it . . i

Epoch 2/30
GENERATED TEXT:
the wizard is a bit of the film 's , but it 's so hard to describe what i think i 'd expect it , if it wasn 't too bad and it 's really bad . i have a lot of it to say it was very funny and i was the

Epoch 3/30
GENERATED TEXT:
the wizard of oz was a great film , but in it was an awesome way to see . i had never heard of it , i was very happy with the same old [UNK] . the film also shows that we can get their own rules for . the film is about

Epoch 4/30
GENERATED TEXT:
the wizard of oz is one of the best films i have seen in the past . i was very impressed with an [UNK] in the middle of [UNK] , this movie is so awful that it was so bad that i was . i would give it a 10 ! i 'm

Epoch 5/30
GENERATED TEXT:
the wizard is a pr

<br>

### PREDICTION

---

In [None]:
# # Class for generating text
# class TextGenerator():

#     # Constructor function
#     def __init__(self, max_tokens, start_tokens, index_to_word, top_k=10, print_every=1):
        
#         # Initialization
#         self.max_tokens = max_tokens
#         self.start_tokens = start_tokens
#         self.index_to_word = index_to_word
#         self.print_every = print_every
#         self.k = top_k

#     # Function for sampling from the model
#     def sample_from(self, logits):
        
#         # Finds values and indices of the k largest entries for the last dimension.
#         logits, indices = tf.math.top_k(logits, k=self.k, sorted=True)
        
#         # Covert indices to numpy array
#         indices = np.asarray(indices).astype("int32")
        
#         # Softmax to convert logits to probabilities
#         preds = tf.keras.activations.softmax(tf.expand_dims(logits, 0))[0]
        
#         # Convert to numpy array
#         preds = np.asarray(preds).astype("float32")
        
#         # Generates a random sample from a given 1-D array
#         out = np.random.choice(indices, p=preds)
        
#         return out 

#     # Function for converting indices to tokens
#     def detokenize(self, number):
        
#         # Convert index to word
#         return self.index_to_word[number]

#     # Function for generating text
#     def on_epoch_end(self, epoch, logs=None):
        
#         # Initialize the start tokens 
#         start_tokens = [_ for _ in self.start_tokens]
        
#         # Every `print_every` epochs
#         if (epoch + 1) % self.print_every != 0:
            
#             # Return
#             return
        
#         # Initialize the number of tokens generated
#         num_tokens_generated = 0
        
#         # Initialize the tokens generated
#         tokens_generated = []
        
#         # Loop until the number of tokens generated is less than the maximum number of tokens
#         while num_tokens_generated <= self.max_tokens:
            
#             # Pad length 
#             pad_len = maxlen - len(start_tokens)
            
#             # The index of the last token in the start_tokens
#             sample_index = len(start_tokens) - 1
            
#             # If the pad length is less than 0
#             if pad_len < 0:
                
#                 # Inputs: Start tokens from 0 to maxlen
#                 x = start_tokens[:maxlen]
                
#                 # Set the sample index to maxlen - 1
#                 sample_index = maxlen - 1
                
#             # If the pad length is greater than 0
#             elif pad_len > 0:
                
#                 # Inputs: Start tokens and pad with 0s
#                 x = start_tokens + [0] * pad_len
                
#             # If the pad length is 0
#             else:
                
#                 # Inputs: Start tokens
#                 x = start_tokens
                
#             # Convert to numpy array
#             x = np.array([x])
            
#             # Predict
#             y, _ = self.model.predict(x)
            
#             # Sample from the model
#             sample_token = self.sample_from(y[0][sample_index])
            
#             # Append the sample token to the generated tokens
#             tokens_generated.append(sample_token)
            
#             # Append the sample token to the start tokens
#             start_tokens.append(sample_token)
            
#             # Increment the number of tokens generated
#             num_tokens_generated = len(tokens_generated)
            
#         # Join the predicted tokens 
#         txt = " ".join([self.detokenize(_) for _ in self.start_tokens + tokens_generated])
        
#         # Report
#         print(f"GENERATED TEXT:\n{txt}\n")

# # Initialize a word2index dictionary
# word_to_index = {}

# # Loop over the vocabulary
# for index, word in enumerate(vocab):
    
#     # Add word and index to the dictionary
#     word_to_index[word] = index

# # Start prompt
# start_prompt = "the wizard "

# # Convert the start prompt to token indices
# start_tokens = [word_to_index.get(_, 1) for _ in start_prompt.split()]

# # Number of tokens to be generated
# num_tokens_generated = 50

# # Initialize the text generator
# text_gen_callback = TextGenerator(num_tokens_generated, start_tokens, vocab)

<br>

### EVALUATION

---

In [None]:
# TODO