In [1]:
# The MIT License (MIT) Copyright (c) 2022 Emilio Morales
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of 
# this software and associated documentation files (the "Software"), to deal in the Software without 
# restriction, including without limitation the rights to use, copy, modify, merge, publish, 
# distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the 
# Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all copies or 
# substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, 
# INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES 
# OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

# GPT

This notebook is a simple, clean implementation of GPT (Generative Pre-trained Transformer). The complete code can be found in [this repository](https://github.com/milmor/GPT).

In [2]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' # Disable tensorflow debugging logs
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import TextVectorization
import numpy as np
import os
import re
import string
import random
import time

## 1.- Dataset

In [3]:
# Download the file
import pathlib

# Fetch the Spanish-English sentence-pair archive and extract it. The origin
# uses HTTPS so that the archive, which is unpacked locally, is downloaded
# over an encrypted connection instead of plain HTTP.
path_to_zip = tf.keras.utils.get_file(
    'spa-eng.zip',
    origin='https://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip',
    extract=True)

# The tab-separated sentence pairs are read from spa-eng/spa.txt, located
# relative to the directory that holds the downloaded archive.
path_to_file = pathlib.Path(path_to_zip).parent/'spa-eng/spa.txt'

def load_data(path):
    """Read tab-separated sentence pairs and format them as training prompts.

    Each usable line is expected to start with two tab-separated fields: the
    English sentence followed by the Spanish sentence. Lines with fewer than
    two fields (for example blank lines) are skipped, and any fields after
    the second are ignored, so such lines no longer abort the whole load with
    an unpacking error.

    Args:
        path: Object exposing ``read_text(encoding=...)``, e.g. a
            ``pathlib.Path`` pointing at the pairs file.

    Returns:
        list[str]: One string per pair, shaped as
        ``'spanish sentence = <es> english sentence = <en> [END]'``.
    """
    text = path.read_text(encoding='utf-8')

    samples = []
    for line in text.splitlines():
        fields = line.split('\t')
        if len(fields) < 2:
            # Not a sentence pair (blank or malformed line).
            continue
        targ, inp = fields[0], fields[1]
        samples.append('spanish sentence = ' + inp + ' english sentence = ' + targ + ' [END]')

    return samples

In [4]:
# Build the prompt-formatted corpus and print its final entry as a quick
# check of the "spanish sentence = ... english sentence = ... [END]" layout.
ds = load_data(path_to_file)
print(ds[-1])

spanish sentence = Si quieres sonar como un hablante nativo, debes estar dispuesto a practicar diciendo la misma frase una y otra vez de la misma manera en que un músico de banjo practica el mismo fraseo una y otra vez hasta que lo puedan tocar correctamente y en el tiempo esperado. english sentence = If you want to sound like a native speaker, you must be willing to practice saying the same sentence over and over in the same way that banjo players practice the same phrase over and over until they can play it correctly and at the desired tempo. [END]


## 2.- Pipeline

In [5]:
# Shuffle buffer covering every sample, so a single shuffle already draws a
# uniformly random order over the whole corpus.
BUFFER_SIZE = len(ds)
batch_size = 128

# Shuffle once, then batch. Stacking a second full-size shuffle on top would
# not make the order any more random; it would only allocate a second buffer
# and fill it before the first batch can be produced.
text_ds = tf.data.Dataset.from_tensor_slices(ds)
text_ds = text_ds.shuffle(BUFFER_SIZE).batch(batch_size)

In [6]:
vocab_size = 30000  # vocabulary size handed to the GPT model; the vectorizer is capped at vocab_size - 1 tokens
maxlen = 32  # model context length in tokens; text is vectorized to maxlen + 1 ids

In [7]:
# Text -> integer-id vectorizer. standardize=None leaves case and punctuation
# untouched, so markers such as "[END]" and "=" are kept as written.
vectorize_layer = TextVectorization(
    standardize=None,
    # NOTE(review): capped one below vocab_size; the reason for the -1 is not
    # visible here -- confirm before changing.
    max_tokens=vocab_size - 1,
    output_mode="int",
    # One extra position so preprocess() can slice input/target sequences
    # that are shifted by one token and still maxlen long.
    output_sequence_length=maxlen + 1,
)
# Learn the vocabulary from the batched raw-text dataset.
vectorize_layer.adapt(text_ds)
vocab = vectorize_layer.get_vocabulary() 


def preprocess(text):
    """Turn a batch of raw strings into next-token prediction pairs.

    The strings are vectorized with the module-level ``vectorize_layer``.
    The result is split into inputs (every position but the last) and
    targets (every position but the first), so ``y[:, t]`` is the token
    that follows ``x[:, t]``.

    Args:
        text: 1-D batch of strings.

    Returns:
        Tuple ``(x, y)`` of integer id tensors.
    """
    token_ids = vectorize_layer(tf.expand_dims(text, -1))
    return token_ids[:, :-1], token_ids[:, 1:]


# Tokenize every batch into (input, target) pairs and prefetch upcoming
# batches while the current one is being consumed.
text_ds = text_ds.map(preprocess).prefetch(tf.data.AUTOTUNE)

In [8]:
# Pull a single (input, target) batch for inspection; the input half is also
# reused further down to run the model once.
test_batch_x, test_batch_y = next(iter(text_ds))
test_batch_x

<tf.Tensor: shape=(128, 32), dtype=int64, numpy=
array([[4, 2, 3, ..., 0, 0, 0],
       [4, 2, 3, ..., 0, 0, 0],
       [4, 2, 3, ..., 0, 0, 0],
       ...,
       [4, 2, 3, ..., 0, 0, 0],
       [4, 2, 3, ..., 6, 0, 0],
       [4, 2, 3, ..., 0, 0, 0]])>

## 3.- Model

<img src="../img/dot_product.png" width="500"/>

__Image source: Vaswani, A., Shazeer, N., Parmar, N., Uszkoreit, J., Jones, L., Gomez, A. N., ... & Polosukhin, I. (2017). Attention is all you need. Advances in neural information processing systems, 30.__

\begin{equation}
\mbox{MultiHead}(Q, K, V) = \text{Concat}(\mbox{head}_1,\mbox{head}_2,\ldots,\mbox{head}_h)W^O,
\end{equation}

\begin{equation}
\mbox{head}_i = \text{Attention}(QW_i^Q, KW_i^K, VW_i^V) = \text{softmax}\left[\frac{QW_i^Q(KW_i^K)^T}{\sqrt{d_k}}\right]VW_i^V,
\label{eq:selfattention}
\end{equation}

### Multi-head dot-product attention

In [9]:
class MultiHeadAttention(layers.Layer):
    """Multi-head scaled dot-product attention.

    Queries, keys and values are linearly projected to ``model_dim``, split
    into ``n_heads`` heads of width ``model_dim // n_heads``, attended per
    head, merged back and passed through an output projection.

    Args:
        model_dim: Width of the projections and of the output.
        n_heads: Number of heads; must divide ``model_dim`` evenly.
        rate: Dropout rate applied to the attention weights and to the output.
        initializer: Kernel initializer of the four dense projections.
    """

    def __init__(self, model_dim, n_heads, rate=0.1, initializer='glorot_uniform'):
        super().__init__()
        assert model_dim % n_heads == 0

        self.n_heads = n_heads
        self.model_dim = model_dim
        self.head_dim = model_dim // n_heads

        # Input projections for queries, keys and values.
        self.wq = layers.Dense(model_dim, kernel_initializer=initializer)
        self.wk = layers.Dense(model_dim, kernel_initializer=initializer)
        self.wv = layers.Dense(model_dim, kernel_initializer=initializer)

        # dropout1 acts on the attention weights, dropout2 on the layer output.
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

        # Output projection applied to the re-merged heads.
        self.wo = layers.Dense(model_dim, kernel_initializer=initializer)

    def split_heads(self, x, batch_size):
        """Reshape ``(batch, seq, model_dim)`` into ``(batch, n_heads, seq, head_dim)``."""
        per_head = tf.reshape(x, (batch_size, -1, self.n_heads, self.head_dim))
        return tf.transpose(per_head, perm=[0, 2, 1, 3])

    def call(self, q, k, v, mask=None):
        """Attend from ``q`` over ``k``/``v``.

        Args:
            q, k, v: Query, key and value tensors whose last axis is projected
                to ``model_dim``.
            mask: Optional tensor broadcastable to the attention scores;
                entries equal to 1 receive a large negative score before the
                softmax and are therefore suppressed.

        Returns:
            Tensor reshaped to ``(batch, -1, model_dim)``.
        """
        batch_size = tf.shape(q)[0]

        # Project, then move the head axis in front of the sequence axis.
        q = self.split_heads(self.wq(q), batch_size)
        k = self.split_heads(self.wk(k), batch_size)
        v = self.split_heads(self.wv(v), batch_size)

        # Scaled dot-product scores.
        scale = tf.math.sqrt(tf.cast(self.head_dim, tf.float32))
        scores = tf.matmul(q, k, transpose_b=True) / scale

        if mask is not None:
            scores += (mask * -1e9)

        weights = self.dropout1(tf.nn.softmax(scores, axis=-1))
        per_head_out = tf.matmul(weights, v)

        # Undo the head split: back to (batch, seq, model_dim).
        per_head_out = tf.transpose(per_head_out, perm=[0, 2, 1, 3])
        merged = tf.reshape(per_head_out, (batch_size, -1, self.model_dim))

        return self.dropout2(self.wo(merged))

### Transformer block

In [10]:
class TransformerBlock(layers.Layer):
    """Pre-normalization transformer block.

    Applies layer normalization followed by self-attention, then layer
    normalization followed by a two-layer MLP, each inside a residual
    connection.

    Args:
        emb_dim: Width of the block's input and output features.
        n_heads: Number of attention heads.
        mlp_dim: Hidden width of the MLP.
        rate: Dropout rate used by both the attention layer and the MLP.
        initializer: Kernel initializer of every dense layer in the block.
        eps: Epsilon of both layer normalizations.
        activation: Activation of the first MLP layer.
    """

    def __init__(self, emb_dim, n_heads=4, mlp_dim=512, 
                 rate=0.1, initializer='glorot_uniform', eps=1e-6, activation='gelu'):
        super(TransformerBlock, self).__init__()
        # Forward ``rate`` explicitly: otherwise the attention layer silently
        # keeps its own default dropout rate regardless of what was requested.
        self.attn = MultiHeadAttention(emb_dim, n_heads, rate=rate,
                                       initializer=initializer)
        self.mlp = tf.keras.Sequential([
            layers.Dense(mlp_dim, activation=activation, kernel_initializer=initializer), 
            layers.Dense(emb_dim, kernel_initializer=initializer),
            layers.Dropout(rate)
        ])
        self.ln1 = layers.LayerNormalization(epsilon=eps)
        self.ln2 = layers.LayerNormalization(epsilon=eps)

    def call(self, inputs, mask=None):
        """Run the block.

        Args:
            inputs: Feature tensor whose last axis has size ``emb_dim``.
            mask: Optional attention mask forwarded to the attention layer.

        Returns:
            Tensor with the same shape as ``inputs``.
        """
        # Self-attention sub-layer: normalize first, add the residual after.
        x = self.ln1(inputs)
        x = inputs + self.attn(x, x, x, mask) 
        # Feed-forward sub-layer with the same pre-norm / residual pattern.
        x = x + self.mlp(self.ln2(x))
        return x
    
    
# Smoke test: feed a dummy (1, maxlen, emb_dim) tensor through one block and
# display the resulting shape.
emb_dim = 128
test_layer = TransformerBlock(emb_dim)
test_layer(tf.ones([1, maxlen, emb_dim])).shape

TensorShape([1, 32, 128])

### Token and positional embedding

In [11]:
class TokenEmbedding(layers.Layer):
    """Sum of learned token and learned position embeddings, with dropout.

    Args:
        maxlen: Number of positions in the position-embedding table, i.e. the
            longest sequence the layer can embed.
        vocab_size: Number of rows in the token-embedding table.
        emb_dim: Embedding width.
        rate: Dropout rate applied to the summed embeddings.
        initializer: Initializer of both embedding tables.
    """

    def __init__(self, maxlen, vocab_size, emb_dim, 
                 rate=0.1, initializer='glorot_uniform'):
        super(TokenEmbedding, self).__init__()
        self.max_len = maxlen
        self.token_emb = layers.Embedding(
            input_dim=vocab_size, output_dim=emb_dim, 
            embeddings_initializer=initializer)
        self.position_emb = layers.Embedding(
            input_dim=maxlen, output_dim=emb_dim, 
            embeddings_initializer=initializer)
        self.dropout = layers.Dropout(rate)

    def call(self, x):
        """Embed a batch of token ids.

        Args:
            x: Integer id tensor whose last axis is the sequence axis. Its
                length may be anything up to ``maxlen``; longer sequences
                index past the end of the position table.

        Returns:
            Tensor of shape ``x.shape + (emb_dim,)``.
        """
        token_embeddings = self.token_emb(x)
        # Take the sequence length from the input rather than always using
        # max_len, so that sequences shorter than max_len broadcast correctly
        # against their position embeddings.
        seq_len = tf.shape(x)[-1]
        positions = tf.range(start=0, limit=seq_len, delta=1)
        positions = self.position_emb(positions)
        return self.dropout(token_embeddings + positions) 

### GPT

In [12]:
class GPT(tf.keras.models.Model):
    """Decoder-only transformer language model.

    Token ids are embedded, passed through ``depth`` transformer blocks under
    a combined causal + padding mask, layer-normalized and projected to
    vocabulary logits.

    Args:
        vocab_size: Size of the vocabulary and of the output layer.
        maxlen: Number of positions known to the embedding layer.
        emb_dim: Model width.
        heads: Attention heads per block.
        mlp_dim: Hidden width of each block's MLP.
        depth: Number of transformer blocks.
        rate: Dropout rate.
        initializer: Kernel initializer for the blocks and the output layer.
        embedding_initializer: Initializer for the embedding tables.
        eps: Layer-normalization epsilon.
        mlp_activation: Activation used inside the blocks' MLPs.
    """

    def __init__(self, vocab_size=20000, maxlen=512, 
                 emb_dim=256, heads=4, mlp_dim=128, depth=3, 
                 rate=0.2, initializer='glorot_uniform', 
                 embedding_initializer='glorot_uniform', eps=1e-6,
                 mlp_activation='gelu'):
        super().__init__()
        self.depth = depth
        self.tok_emb = TokenEmbedding(
            maxlen, vocab_size, emb_dim,
            rate=rate, initializer=embedding_initializer)
        # NOTE: TokenEmbedding already ends with its own dropout, so the
        # embeddings pass through dropout twice in call().
        self.drop = layers.Dropout(rate)

        blocks = []
        for _ in range(depth):
            blocks.append(TransformerBlock(
                emb_dim, heads, mlp_dim, rate=rate,
                initializer=initializer, eps=eps,
                activation=mlp_activation))
        self.transformer = blocks

        self.layernorm = layers.LayerNormalization(epsilon=eps)
        self.out = layers.Dense(vocab_size, kernel_initializer=initializer)

    def get_padding_mask(self, seq):
        """Return 1.0 where ``seq`` holds id 0, shaped ``(batch, 1, 1, seq_len)``."""
        is_pad = tf.cast(tf.math.equal(seq, 0), tf.float32)
        # The two singleton axes let the mask broadcast over the attention logits.
        return is_pad[:, tf.newaxis, tf.newaxis, :]

    def get_attention_mask(self, size):
        """Return a ``(size, size)`` causal mask with 1.0 strictly above the diagonal."""
        lower_triangle = tf.linalg.band_part(tf.ones((size, size)), -1, 0)
        return 1 - lower_triangle

    def create_mask(self, x):
        """Combine the causal and padding masks; an entry is masked if either says so."""
        causal = self.get_attention_mask(tf.shape(x)[1])
        padding = self.get_padding_mask(x)
        return tf.maximum(padding, causal)

    def call(self, x):
        """Map token ids ``x`` to logits over the vocabulary for every position."""
        mask = self.create_mask(x)

        hidden = self.drop(self.tok_emb(x))
        for block in self.transformer:
            hidden = block(hidden, mask)

        return self.out(self.layernorm(hidden))
       
        
# Hyperparameters of the small model trained in this notebook.
emb_dim = 128
depth = 3
mlp_dim = 256

gpt = GPT(maxlen=maxlen, vocab_size=vocab_size, emb_dim=emb_dim,
            mlp_dim=mlp_dim, depth=depth)
# Run one batch through the model and display the shape of the logits.
out = gpt(test_batch_x)
out.shape

TensorShape([128, 32, 30000])

In [13]:
# Print the per-layer parameter counts of the model called above.
gpt.summary()

Model: "gpt"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 token_embedding (TokenEmbed  multiple                 3844096   
 ding)                                                           
                                                                 
 dropout_4 (Dropout)         multiple                  0         
                                                                 
 transformer_block_1 (Transf  multiple                 132480    
 ormerBlock)                                                     
                                                                 
 transformer_block_2 (Transf  multiple                 132480    
 ormerBlock)                                                     
                                                                 
 transformer_block_3 (Transf  multiple                 132480    
 ormerBlock)                                                   

## 4.- Training

In [14]:
from tensorflow.keras.preprocessing.sequence import pad_sequences

In [15]:
# Token string -> integer id lookup built from the vectorizer's vocabulary,
# with the empty string registered as the mask token.
text_to_ids = tf.keras.layers.StringLookup(
                vocabulary=vectorize_layer.get_vocabulary(),
                mask_token='')

In [16]:
# Inverse lookup (integer id -> token string) over the same vocabulary, used
# to decode generated ids back into text.
ids_to_text = tf.keras.layers.StringLookup(
                vocabulary=vectorize_layer.get_vocabulary(),
                mask_token='',
                invert=True)

In [17]:
# Prompts used to eyeball translation quality. Each follows the training
# format and stops right where the English sentence is expected to begin.
context = ['spanish sentence = Me gustan los perros rojos. english sentence = ',
           'spanish sentence = Me encanta escribir. english sentence = ',
           'spanish sentence = Los elefantes comen manzanas. english sentence = ']

def sample(model, context, maxlen):
    """Greedily extend ``context`` with tokens predicted by ``model``.

    The prompt is split on whitespace and mapped to ids with the module-level
    ``text_to_ids`` lookup. At every step the sequence is right-padded to
    ``maxlen``, the model is run in inference mode, and the highest-scoring
    token at the last filled position is appended. Generation stops once
    ``'[END]'`` is predicted or the sequence holds ``maxlen`` tokens.

    Args:
        model: Callable mapping a padded id batch to per-position logits.
        context: Prompt string.
        maxlen: Length the input is padded to, and the generation limit.

    Returns:
        str: Prompt plus generated tokens, joined by single spaces.
    """
    tokens = [context.split()]  # wrap in a list to add the batch dimension
    ids = tf.cast(text_to_ids(tokens), tf.int32)
    prompt_len = ids.shape[1]

    for step in range(prompt_len, maxlen):
        # Right-pad the running sequence to the fixed input length.
        padded = tf.keras.preprocessing.sequence.pad_sequences(
            ids, maxlen=maxlen, padding="post")
        logits = model(padded, training=False)

        # Greedy choice taken from the logits of the last real token.
        next_id = tf.argmax(logits[:, step - 1, :], axis=-1,
                            output_type=tf.dtypes.int32)
        next_id = next_id[tf.newaxis]
        if ids_to_text(next_id) == '[END]':
            break
        # Append the chosen token to the running sequence.
        ids = tf.concat([ids, next_id], axis=-1)

    decoded = ids_to_text(ids)[0].numpy()
    return ' '.join([piece.decode('utf-8') for piece in decoded])

# Decode every prompt with the model's current weights and print the result.
for c in context:
    trans = sample(gpt, c, maxlen)
    print(f"{trans}")

spanish sentence = Me gustan los perros rojos. english sentence = Hizo suponer suicidado. cama! rehusaron suicidado. president? Hizo suicidado. ayude. suicidado. ¡Hay suicidado. suicidado. suicidado. rehusaron suicidado. compasión termina? pesadilla. suicidado.
spanish sentence = Me encanta escribir. english sentence = quedito. Hizo suicidado. suponer suicidado. cama! rehusaron suicidado. president? Hizo suicidado. ayude. suicidado. ¡Hay suicidado. suicidado. suicidado. rehusaron suicidado. compasión termina? pesadilla. suicidado.
spanish sentence = Los elefantes comen manzanas. english sentence = Hizo Hizo suponer Hizo cama! Hizo coleccionaba suicidado. Hizo suicidado. ayude. suicidado. ¡Hay suicidado. suicidado. suicidado. rehusaron rehusaron compasión termina? pesadilla. suicidado.


In [18]:
# Adam optimizer with learning rate 1e-3.
optimizer = tf.keras.optimizers.Adam(0.001, beta_1=0.9, beta_2=0.999)
# Running mean of the per-step loss; train_step() feeds it after every update.
train_loss = tf.keras.metrics.Mean(name='train_loss')

In [19]:
def loss_function(label, pred):
    """Mean sparse categorical cross-entropy over non-padding positions.

    Positions whose label is 0 are treated as padding and excluded from both
    the summed loss and the normalizing count.

    Args:
        label: Integer target ids; 0 marks padding.
        pred: Unnormalized logits with one extra trailing axis over classes.

    Returns:
        Scalar tensor: the summed loss divided by the number of non-padding
        positions, or 0 when there are none.
    """
    mask = label != 0
    loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=True, reduction='none')
    loss = loss_object(label, pred)

    mask = tf.cast(mask, dtype=loss.dtype)
    loss *= mask

    # divide_no_nan returns 0 instead of NaN when every position is padding,
    # so a degenerate batch cannot inject NaNs into the gradients.
    return tf.math.divide_no_nan(tf.reduce_sum(loss), tf.reduce_sum(mask))

In [20]:
@tf.function
def train_step(inp, tar):
    """Perform one optimization step on a single batch.

    Runs the module-level ``gpt`` model in training mode, computes the masked
    loss against ``tar``, applies the gradients with the module-level
    ``optimizer`` and records the loss in ``train_loss``.

    Args:
        inp: Batch of input token ids.
        tar: Batch of target token ids (the inputs shifted by one position).
    """
    with tf.GradientTape() as tape:
        loss = loss_function(tar, gpt(inp, training=True))

    variables = gpt.trainable_variables
    grads = tape.gradient(loss, variables)
    optimizer.apply_gradients(zip(grads, variables))
    train_loss(loss)

In [21]:
epochs = 10

# Epochs are numbered from 1, so the upper bound must be epochs + 1 for
# exactly `epochs` passes to run; range(1, epochs) would stop one short.
for epoch in range(1, epochs + 1):
    start = time.time()
    # Start each epoch with a fresh running-mean loss.
    train_loss.reset_states()
    for inp, tar in text_ds:
        train_step(inp, tar)
    
    print(f'Time taken for epoch {epoch} is: {time.time() - start:.2f} secs', end=' ')
    print(f'Loss: {train_loss.result():.4f}')
    
    # Every second epoch, decode the sample prompts to track translation quality.
    if epoch % 2 == 0:
        print('Output: ')
        for c in context:
            trans = sample(gpt, c, maxlen)
            print(f"{trans}")

Time taken for epoch 1 is: 36.89 secs Loss: 4.1995
Time taken for epoch 2 is: 18.92 secs Loss: 3.0519
Output: 
spanish sentence = Me gustan los perros rojos. english sentence = I like dogs.
spanish sentence = Me encanta escribir. english sentence = I love to sleep.
spanish sentence = Los elefantes comen manzanas. english sentence = The children are [UNK]
Time taken for epoch 3 is: 18.64 secs Loss: 2.6254
Time taken for epoch 4 is: 18.29 secs Loss: 2.3875
Output: 
spanish sentence = Me gustan los perros rojos. english sentence = I like dogs.
spanish sentence = Me encanta escribir. english sentence = I love writing.
spanish sentence = Los elefantes comen manzanas. english sentence = The birds eat apples.
Time taken for epoch 5 is: 17.84 secs Loss: 2.2406
Time taken for epoch 6 is: 18.62 secs Loss: 2.1420
Output: 
spanish sentence = Me gustan los perros rojos. english sentence = I like red dogs.
spanish sentence = Me encanta escribir. english sentence = I love writing.
spanish sentence = 