Reference: https://deepkondah.medium.com/step-by-step-implementation-of-generative-pre-trained-transformers-gpt-3c8e09622645

In [None]:
!pip install transformers
!pip install datasets
!pip install keras_nlp



In [None]:
!pip install keras_nlp



In [None]:
import keras_nlp
import numpy as np
import tensorflow as tf
from keras.layers import Concatenate, TextVectorization
from tensorflow import keras
from keras import layers
from datasets import load_dataset
from transformers import BertTokenizerFast
import re
import os

The AttentionHead class processes the input by:

    Mapping the input to query, key, and value spaces.
    Computing scaled dot-product attention, including masking to ensure causal attention.
    Applying softmax to get the attention weights.
    Using the attention weights to compute a weighted sum of the value vectors.

In [None]:
class AttentionHead(layers.Layer):
    """Single causal (masked) self-attention head.

    The input is projected to query, key and value tensors by three separate
    Dense layers, each with `embedding_space_dimension` units. Attention
    scores are the scaled dot product of queries and keys; positions that lie
    after the query position are masked out before the softmax.

    Args:
        embedding_space_dimension: number of units of the query, key and
            value projections (and therefore the width of the head output).
    """

    def __init__(self, embedding_space_dimension):
        super().__init__()
        self.q_mapping = layers.Dense(embedding_space_dimension)
        self.k_mapping = layers.Dense(embedding_space_dimension)
        self.v_mapping = layers.Dense(embedding_space_dimension)

    def call(self, x):
        """Project `x` to q/k/v and return the causally masked attention output."""
        q = self.q_mapping(x)
        v = self.v_mapping(x)
        k = self.k_mapping(x)
        return self.scaled_dot_product_attention(q, k, v)

    def scaled_dot_product_attention(self, q, k, v):
        """Return softmax(mask(q @ k^T / sqrt(d_k))) @ v."""
        w = tf.matmul(q, k, transpose_b=True)
        # Scale by sqrt of the key width so the scores do not grow with it.
        d_k = tf.cast(tf.shape(k)[-1], tf.float32)
        w = w / tf.sqrt(d_k)
        w = self.mask_attn_weights(w)
        w = tf.nn.softmax(w)
        o = tf.matmul(w, v)
        return o

    def mask_attn_weights(self, w):
        """Replace scores of future positions with a very negative value.

        `w` holds the attention scores; its last two axes are
        (query position, key position). Allowed entries are returned
        unchanged. Blocked entries are set to the most negative finite value
        of `w.dtype`, so they become 0 after the softmax. A boolean select is
        used instead of `w * m - big * (1 - m)` because the large constant
        overflows to `inf` in float16 and `inf * 0` yields NaN.
        """
        n = tf.shape(w)[-1]
        allowed = self.attention_mask(n, tf.bool)
        blocked_score = tf.constant(w.dtype.min, dtype=w.dtype)
        # The (n, n) mask broadcasts over the leading axes of `w`.
        return tf.where(allowed, w, blocked_score)

    def attention_mask(self, n, dtype):
        """Return an (n, n) lower-triangular mask cast to `dtype`.

        Entry (i, j) is 1 (True) when j <= i, i.e. query position i may
        attend to key position j, and 0 (False) otherwise.
        Example for n = 3:
          M =  [ 1 0 0
                 1 1 0
                 1 1 1]
        """
        i = tf.range(n)[:, None]
        j = tf.range(n)
        m = i >= j
        return tf.cast(m, dtype)

Multiple Attention Heads Process the Input: Each attention head independently computes its own version of attention.

Concatenate Outputs: The outputs from all attention heads are combined into a single tensor.

Linear Projection: This combined tensor is passed through a dense layer to get back to the original embedding dimension.

In [None]:
class MultiAttentionHead(layers.Layer):
    """Run several `AttentionHead`s in parallel and merge their outputs.

    Every head receives the same input. The head outputs are concatenated
    along the last axis and projected back to `embedding_space_dimension`
    by a Dense layer.

    Args:
        embedding_space_dimension: width of each head and of the final
            projection.
        numb_heads: number of attention heads to create.
    """

    def __init__(self, embedding_space_dimension, numb_heads):
        super().__init__()
        self.attention_heads = [AttentionHead(embedding_space_dimension) for _ in range(numb_heads)]
        self.linear = layers.Dense(embedding_space_dimension)
        # Created once here rather than on every forward pass inside call().
        self.concat = Concatenate(axis=-1)

    def call(self, x):
        """Apply every head to `x`, concatenate the results and project them."""
        head_outputs = [attention_head(x) for attention_head in self.attention_heads]
        heads = self.concat(head_outputs)
        return self.linear(heads)

Self-Attention:

    The input is processed by a multi-head self-attention mechanism.
    Dropout is applied to the attention output.
    The result is added to the original input (residual connection).
    Layer normalization is applied to the sum.

Feedforward Network:

    The result from the self-attention block is processed by a feedforward network.
    Dropout is applied to the FFN output.
    The result is added to the input of the FFN block (another residual connection).
    Layer normalization is applied to this sum.

In [None]:
class TransformerDecoderBlock(layers.Layer):
    """Decoder block: self-attention and a feed-forward network.

    Each of the two sub-layers is followed by dropout (rate 0.1), a residual
    addition with the sub-layer input, and layer normalization.

    Args:
        embedding_space_dimension: width of the block input and output.
        numb_heads: number of attention heads in the self-attention sub-layer.
        ffn_dimension: units of the hidden (ReLU) layer of the feed-forward
            network.
    """

    def __init__(self, embedding_space_dimension, numb_heads, ffn_dimension):
        super().__init__()
        self.self_attention = MultiAttentionHead(embedding_space_dimension, numb_heads)
        hidden_layer = layers.Dense(ffn_dimension, activation="relu")
        output_layer = layers.Dense(embedding_space_dimension)
        self.ffn = keras.Sequential([hidden_layer, output_layer])
        self.norm1 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(0.1)
        self.norm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout2 = layers.Dropout(0.1)

    def call(self, x):
        """Return the block output for input `x`."""
        # Sub-layer 1: self-attention -> dropout -> residual -> layer norm.
        attended = self.dropout1(self.self_attention(x))
        x = self.norm1(x + attended)
        # Sub-layer 2: feed-forward -> dropout -> residual -> layer norm.
        transformed = self.dropout2(self.ffn(x))
        return self.norm2(x + transformed)

The TokenAndPositionEmbedding layer:

    Converts the token indices to token embeddings.
    Converts the position indices to positional embeddings.
    Adds the token embeddings and positional embeddings to produce the final combined embeddings.

In [None]:
class TokenAndPositionEmbedding(layers.Layer):
    """Sum of a learned token embedding and a learned position embedding.

    Args:
        maxlen: number of rows of the position embedding table.
        vocab_size: number of rows of the token embedding table.
        embed_dim: width of both embeddings.
    """

    def __init__(self, maxlen, vocab_size, embed_dim):
        super().__init__()
        self.token_emb = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, x):
        """Embed the token ids in `x` and add the embedding of each position."""
        # Positions are 0 .. (size of the last axis of x) - 1.
        sequence_length = tf.shape(x)[-1]
        position_ids = tf.range(start=0, limit=sequence_length, delta=1)
        position_vectors = self.pos_emb(position_ids)
        token_vectors = self.token_emb(x)
        return token_vectors + position_vectors

Embed the Input:

    Convert token indices to dense vectors and add positional information.

Process with Transformer Decoder Blocks (one block by default):

    Pass the embeddings through several transformer decoder blocks to capture complex patterns in the sequence.

Generate Predictions:

    Project the processed embeddings to the vocabulary size to obtain logits for each token position.

In [None]:
class GPT(layers.Layer):
    """Token/position embedding, a stack of decoder blocks and a vocabulary projection.

    Args:
        maxlen: number of positions of the position embedding.
        embedding_space_dimension: embedding width; also used as the
            feed-forward hidden width of every decoder block.
        numb_heads: attention heads per decoder block.
        vocab_size: size of the token vocabulary and of the output logits.
        num_layers: number of decoder blocks (default 1).
    """

    def __init__(self, maxlen, embedding_space_dimension, numb_heads, vocab_size, num_layers=1):
        super().__init__()
        decoder_stack = [
            TransformerDecoderBlock(
                embedding_space_dimension,
                numb_heads,
                embedding_space_dimension,
            )
            for _ in range(num_layers)
        ]
        self.transformer_decoder_blocks = keras.Sequential(decoder_stack)
        self.input_embedding = TokenAndPositionEmbedding(maxlen, vocab_size, embedding_space_dimension)
        self.prediction_output = keras.layers.Dense(vocab_size)

    def call(self, x):
        """Return one vector of `vocab_size` logits per input position."""
        embedded = self.input_embedding(x)
        decoded = self.transformer_decoder_blocks(embedded)
        return self.prediction_output(decoded)

In [None]:
# Model hyperparameters.
maxlen = 121                 # input sequence length fed to the model
projection_dimension = 256   # embedding width
n_heads = 8                  # attention heads per decoder block
vocab_size = 30522           # presumably the bert-base-uncased vocabulary size; verify against the tokenizer
nb_layers = 2                # number of decoder blocks

# Wrap the GPT layer in a functional Keras model taking int32 token ids.
gpt = GPT(maxlen, projection_dimension, n_heads, vocab_size, nb_layers)
inputs = layers.Input(shape=(maxlen,), dtype=tf.int32)
model = keras.Model(name="gpt", inputs=inputs, outputs=gpt(inputs))

# The model outputs raw logits, hence from_logits=True.
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True)
model.compile("adam", loss=loss_fn)

print("GPT model compiled successfully")
# summary() prints the table itself and returns None, so wrapping it in
# print() only adds a stray "None" line to the output.
model.summary()

GPT model compiled successfully


None


In [None]:
!curl -O https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz
!tar -xf aclImdb_v1.tar.gz

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 80.2M  100 80.2M    0     0  12.5M      0  0:00:06  0:00:06 --:--:-- 11.3M


In [None]:
def preprocess(text):
    """Normalize a raw review and wrap it in '[start]' / '[end]' markers.

    Steps: lowercase, replace HTML line-break tags (`<br>`, `<br/>`,
    `<br />`) with a space, drop every character that is neither a word
    character nor whitespace, collapse whitespace runs to a single space and
    strip the ends.

    Args:
        text: raw review text.

    Returns:
        The string '[start] ' + cleaned text + ' [end]'.
    """
    text = text.lower()
    # Remove line-break tags before stripping punctuation; otherwise
    # "<br />" degrades into the spurious word "br".
    text = re.sub(r'<br\s*/?>', ' ', text)
    text = re.sub(r'[^\w\s]', '', text)
    text = re.sub(r'\s+', ' ', text)
    text = text.strip()
    return '[start] ' + text + ' [end]'

# Review folders of the extracted IMDB archive: every (split, sentiment) pair,
# train before test and pos before neg.
dirs = [
    f"aclImdb/{split}/{sentiment}"
    for split in ("train", "test")
    for sentiment in ("pos", "neg")
]

def read_files(dirs):
    """Read and preprocess every file found directly inside `dirs`.

    Args:
        dirs: iterable of directory paths.

    Returns:
        A list with one preprocessed string per file, in the order of `dirs`
        and, within each directory, sorted by file name.
    """
    texts = []
    for directory in dirs:
        # os.listdir returns entries in arbitrary order; sorting makes the
        # resulting dataset order reproducible between runs.
        for file_name in sorted(os.listdir(directory)):
            file_path = os.path.join(directory, file_name)
            # Skip sub-directories and other non-file entries.
            if not os.path.isfile(file_path):
                continue
            with open(file_path, 'r', encoding='utf-8') as file:
                texts.append(preprocess(file.read()))
    return texts

# Load and clean every review.
texts = read_files(dirs)

maxlen = 121
tokenizer = BertTokenizerFast.from_pretrained('bert-base-uncased')

# Pad/truncate to maxlen + 1 ids so that dropping one id from either end
# leaves sequences of length maxlen.
encoded_texts = tokenizer(
    texts,
    max_length=maxlen + 1,
    padding='max_length',
    truncation=True,
    return_tensors='np',
)

# Token ids of the encoded texts.
input_ids = encoded_texts['input_ids']

# Next-token prediction pairs: the target is the input shifted left by one.
final_dataset = input_ids
inputs, outputs = final_dataset[:, :-1], final_dataset[:, 1:]

print("Inputs:", inputs[:2])
print("Outputs:", outputs[:2])


Inputs: [[  101  1031  2707  1033  2017  3726  2196  2464  2505  2066  2009  2320
   1996  8648  4269  2049  1996  2087 28190  3341 11253 29337 22573  4017
  10874  2017  3363  2412  2156  2130  2295  2017  2113  1996  9560  1998
   2049  2035  2613  2138  2049  1037  4516  6429  7987  7987  2011  1996
   2051  2009  2001  2058  2009  2001  2006  2026  2327  2184  2862  1997
   2035  2051  2307  5691 19892  7987 27770  1996 22889 16429  5677  2075
   2157  9328  5470 17592  3071  1045  2113  2040  2038  2464  2023  2143
   3957  2009  1996  1018 14117  5790  2130  2065  2017  2123  2102  2729
   2055  4331  2030  2055 15332  4331  2017  2097  2424  4426  9113 22648
   8126  1998  2903  2009  2006  1996  3341  1997  2115  2835 19892  7987
   2049]
 [  101  1031  2707  1033 12807  1998  3811  6383 12127  3235  8040  5668
   6810 20798  1037  4989  1997  2010  5440  2137  3152 15131  2429  2000
   2093  2367  4127  1997  5501  1996  2472  2004  2019 12492  2923  1040
   2860 14135  2030  

In [None]:
# Stop early when the training loss has not improved for 3 epochs in a row.
callback = keras.callbacks.EarlyStopping(patience=3, monitor='loss')

model.fit(
    x=inputs,
    y=outputs,
    callbacks=[callback],
    epochs=15,
)

Epoch 1/15
[1m1563/1563[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m390s[0m 179ms/step - loss: 5.8257
Epoch 2/15
[1m1563/1563[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m214s[0m 131ms/step - loss: 4.6363
Epoch 3/15
[1m1563/1563[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m203s[0m 130ms/step - loss: 4.3659
Epoch 4/15
[1m1563/1563[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m263s[0m 131ms/step - loss: 4.1921
Epoch 5/15
[1m1563/1563[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m262s[0m 131ms/step - loss: 4.0628
Epoch 6/15
[1m1563/1563[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m204s[0m 130ms/step - loss: 3.9641
Epoch 7/15
[1m1563/1563[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m263s[0m 131ms/step - loss: 3.8690
Epoch 8/15
[1m1563/1563[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m262s[0m 131ms/step - loss: 3.7932
Epoch 9/15
[1m1563/1563[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m257s[0m 128ms/step - loss: 3.7333
Epoch 10/15
[1m1563/1563[0m [32m━━

<keras.src.callbacks.history.History at 0x7f1b1610dde0>

In [None]:
def generate_text(model, tokenizer, input_text, max_length, sampler):
    """Generate text token by token, starting from `input_text`.

    Args:
        model: callable mapping a batch of token ids to per-position logits.
        tokenizer: callable returning a mapping with an "input_ids" entry,
            and providing `decode`.
        input_text: prompt to continue.
        max_length: length the prompt is padded/truncated to.
        sampler: callable accepting `next`, `prompt` and `index` keyword
            arguments (e.g. a keras_nlp sampler) and returning a batch of
            token ids.

    Returns:
        The decoded text of the first (only) sequence in the sampler output.
    """
    # Tokenize the prompt given by the caller.
    input_padded_tokens = tokenizer(
        input_text, truncation=True, padding='max_length', max_length=max_length)
    token_ids = input_padded_tokens["input_ids"]
    input_token_ids = np.array([token_ids])

    def next_token_logits(prompt, cache, index):
        # Logits for position `index` come from the model output at index - 1.
        logits = model(prompt)[:, index - 1, :]
        hidden_states = None
        return logits, hidden_states, cache

    # Start at the last non-zero id, so generation overwrites it.
    # Assumes the padding id is 0 and that this last id is the tokenizer's
    # end marker -- TODO confirm for tokenizers other than bert-base-uncased.
    start_index = int(np.count_nonzero(token_ids)) - 1

    output_tokens = sampler(
        next=next_token_logits,
        prompt=input_token_ids,
        index=start_index)

    return tokenizer.decode(output_tokens[0])

In [None]:
input_text = "Once"
greedy_sampler = keras_nlp.samplers.GreedySampler()


generated_text = generate_text(model, tokenizer, input_text, max_length=maxlen, sampler=greedy_sampler)
print(f"Generated Text: \n{generated_text}\n")

Generated Text: 
[CLS] in general comedy movies the only bit the british comedies are the british comedies of the british comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies are of comedy comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies comedies 

In [None]:
TopK_sampler = keras_nlp.samplers.TopKSampler(k=10)

generated_text = generate_text(model, tokenizer, input_text, max_length=maxlen, sampler=TopK_sampler)
print(f"Generated Text: \n{generated_text}\n")

Generated Text: 
[CLS] in general comedy movies he is breezy romantic comedy satire the young man comedy a vice griffith the einstein is the einstein is breezy supporting role comedy director walter burns a bree bra sullivan maririshly naiveutesally einstein walter sullivan scarte sullivan is the sullivan sullivan ira hoffman flynn sullivan sullivan sullivan sullivan in romantic comedies with walter sullivan davies flynn sullivan sullivan davies flynn succeeds once a late in unexpectedly davies unexpectedly corbett while unexpectedly flynn flynn gets unexpectedly unexpectedly unexpectedly unexpectedly flynns flynn succeeds as flynn sullivan flynn is a breezy corbettly bree corbett off flynn gets mistaken flynn corbett flynn is corbett off flynn davies is the

