### Config

In [1]:
# Hyperparameters shared by the data loader, the model and the training loop.
cfg = {
    "block_size": 256,       # context length in tokens
    "batch_size": 64,        # sequences per batch
    "dropout": 0.2,          # dropout rate used throughout the model
    "n_embd": 384,           # embedding width
    "n_heads": 6,            # attention heads per block
    "vocab_size": 65,        # number of distinct tokens
    "n_layers": 6,           # number of transformer blocks
    "head_dim": 384 // 6,    # n_embd / n_heads
    "learning_rate": 3e-4,   # optimizer step size
    "max_iters": 5000,       # training iterations
    "eval_interval": 500,    # evaluate every this many iterations
    "eval_iters": 200,       # batches averaged per evaluation
}

### Importing Libraries

In [4]:
import numpy as np
import tensorflow as tf
import os

In [3]:
# Mount Google Drive at /content/drive so the corpus and checkpoints below can be
# read from / written to it. Requires the google.colab package.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


### Importing Text File

In [5]:
# Read the whole training corpus into memory as one UTF-8 string.
with open('/content/drive/MyDrive/Colab Notebooks/nano GPT/tinyshakespeare.txt', 'r', encoding='utf-8') as f:
    text = f.read()

In [6]:
# Preview the first 100 characters of the corpus.
print(text[:100])

First Citizen:
Before we proceed any further, hear me speak.

All:
Speak, speak.

First Citizen:
You


In [7]:
# Total number of characters in the corpus.
len(text)

1115394

In [8]:
# Character-level vocabulary: every distinct character in the corpus, in sorted order.
chars = sorted(set(text))

In [9]:
# Number of distinct characters in the corpus.
len(chars)

65

In [10]:
"".join(chars)

"\n !$&',-.3:;?ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"

In [11]:
# Vocabulary size = number of distinct characters found in the corpus.
vocab_size = len(chars)
# cfg['vocab_size'] is hard-coded, and the model sizes its token embedding and
# output head from it. Fail early if the corpus and the config disagree.
assert vocab_size == cfg['vocab_size'], (
    f"cfg['vocab_size']={cfg['vocab_size']} does not match the corpus vocabulary size {vocab_size}"
)
vocab_size

65

### TokenID

In [12]:
# Lookup tables between characters and their integer token IDs.
stoi = dict(zip(chars, range(len(chars))))  # char -> token ID
itos = dict(enumerate(chars))               # token ID -> char


def encode(s):
    """Convert a string into a list of token IDs, one per character."""
    return [stoi[c] for c in s]


def decode(l):
    """Convert an iterable of token IDs back into the string they spell."""
    return ''.join(itos[i] for i in l)

In [13]:
# Sanity check: encode a short string to token IDs.
encode('hello world')

[46, 43, 50, 50, 53, 1, 61, 53, 56, 50, 42]

In [14]:
# Sanity check: decoding the IDs above should give back the original string.
decode([46, 43, 50, 50, 53, 1, 61, 53, 56, 50, 42])

'hello world'

### Train Val Split

In [15]:
# Encode the entire corpus and keep it as a single tensor of token IDs.
data = tf.convert_to_tensor(encode(text))

In [16]:
# First 100 token IDs, shown as a NumPy array.
data[:100].numpy()

array([18, 47, 56, 57, 58,  1, 15, 47, 58, 47, 64, 43, 52, 10,  0, 14, 43,
       44, 53, 56, 43,  1, 61, 43,  1, 54, 56, 53, 41, 43, 43, 42,  1, 39,
       52, 63,  1, 44, 59, 56, 58, 46, 43, 56,  6,  1, 46, 43, 39, 56,  1,
       51, 43,  1, 57, 54, 43, 39, 49,  8,  0,  0, 13, 50, 50, 10,  0, 31,
       54, 43, 39, 49,  6,  1, 57, 54, 43, 39, 49,  8,  0,  0, 18, 47, 56,
       57, 58,  1, 15, 47, 58, 47, 64, 43, 52, 10,  0, 37, 53, 59],
      dtype=int32)

In [17]:
# Hold out the final 10% of the token stream for validation; the first 90% is for training.
n = int(len(data) * 0.9)
train, val = data[:n], data[n:]

In [18]:
# Number of tokens in the training and validation splits.
print(len(train))
print(len(val))

1003854
111540


### Data Loading

In [19]:
def get_batch(split):
    """Sample a random batch of (input, target) token windows.

    Args:
        split: 'train' selects the training tokens; any other value selects
            the validation tokens.

    Returns:
        (x, y): two tensors of shape (cfg['batch_size'], cfg['block_size']).
        Each row of y is the matching row of x shifted one token to the right.
    """
    source = train if split == 'train' else val
    block = cfg['block_size']
    # Random window starts in [0, len(source) - block); the upper bound is
    # exclusive so the shifted target window still fits inside the split.
    starts = np.random.randint(low=0, high=len(source) - block, size=cfg['batch_size'])
    inputs = tf.stack([source[s: s + block] for s in starts], axis=0)
    targets = tf.stack([source[s + 1: s + block + 1] for s in starts], axis=0)

    return inputs, targets

In [20]:
# Draw one training batch to inspect; it is also fed to the model further down.
x, y = get_batch('train')

In [21]:
# Inputs of the sample batch.
x

<tf.Tensor: shape=(64, 256), dtype=int32, numpy=
array([[52, 58, 10, ..., 53, 59, 45],
       [ 1, 39,  1, ..., 47, 53, 52],
       [47, 57,  1, ..., 52, 53, 61],
       ...,
       [59, 41, 41, ...,  1, 51, 63],
       [ 1, 51, 39, ...,  1, 59, 57],
       [ 1, 46, 47, ..., 17, 26, 17]], dtype=int32)>

In [22]:
# Targets of the sample batch (the inputs shifted by one token).
y

<tf.Tensor: shape=(64, 256), dtype=int32, numpy=
array([[58, 10,  0, ..., 59, 45, 46],
       [39,  1, 41, ..., 53, 52,  6],
       [57,  1, 52, ..., 53, 61,  1],
       ...,
       [41, 41, 43, ..., 51, 63, 57],
       [51, 39, 49, ..., 59, 57,  2],
       [46, 47, 57, ..., 26, 17, 26]], dtype=int32)>

### Self Attention Block

In [23]:
class Head(tf.Module):
    """One head of causal (masked) scaled dot-product self-attention.

    Args:
        head_size: output width of the key, query and value projections.
        name: optional tf.Module name.
    """
    def __init__(self, head_size, name = None):
        super().__init__(name = name)
        self.head_size = head_size
        self.key = tf.keras.layers.Dense(head_size, use_bias=False)
        self.query = tf.keras.layers.Dense(head_size, use_bias=False)
        self.value = tf.keras.layers.Dense(head_size, use_bias=False)
        # Constant lower-triangular matrix of ones (zeros above the diagonal).
        # Used as the causal mask; it is a plain tensor, not a trainable weight.
        self.tril = tf.linalg.band_part(tf.ones((cfg['block_size'], cfg['block_size'])),
                                        num_lower=-1,
                                        num_upper=0)
        self.dropout = tf.keras.layers.Dropout(cfg['dropout'])

    def __call__(self, x, training=None):
        """Apply masked self-attention.

        Args:
            x: tensor of shape (batch, time_step, channels).
            training: forwarded to the dropout layer. None (the default) leaves
                the decision to Keras, as when the argument was not passed.

        Returns:
            Tensor of shape (batch, time_step, head_size).
        """
        # Dynamic sequence length, so the mask slice works for any input length.
        T = tf.shape(x)[1]

        # key, query, value
        k = self.key(x) # (B,T,hs)
        q = self.query(x) # (B,T,hs)
        v = self.value(x) # (B,T,hs)

        # attention scores: dot product of queries and keys, scaled by 1/sqrt(head_size)
        attention_scores = q @ tf.transpose(k, perm=[0, 2, 1]) * self.head_size**-0.5 # (B,T,hs) @ (B,hs,T) --> (B,T,T)
        # mask out future tokens; tf.where broadcasts the (T,T) mask and the
        # scalar -inf, so no fully static shape is required
        mask = self.tril[:T, :T]
        neg_inf = tf.constant(float('-inf'), dtype=attention_scores.dtype)
        attention_scores = tf.where(mask == 0, neg_inf, attention_scores) # (B,T,T)
        attention_weights = tf.nn.softmax(attention_scores, axis = -1)
        attention_weights = self.dropout(attention_weights, training = training)
        # weighted aggregation of values
        out = attention_weights @ v # (B,T,T) @ (B,T,hs) --> (B,T,hs)

        return out

### MultiHeadAttention

In [24]:
class MultiHeadAttention(tf.keras.layers.Layer):
    """Causal multi-head self-attention with all heads computed in one pass.

    Keys, queries and values are projected to n_embd features, split into
    n_heads heads of n_embd // n_heads features, attended with a causal mask,
    merged back together and passed through an output projection.

    Args:
        n_heads: number of attention heads.
        n_embd: embedding width; must be divisible by n_heads.
        name: optional layer name.

    Raises:
        ValueError: if n_embd is not divisible by n_heads.
    """
    def __init__(self, n_heads, n_embd, name=None):
        super().__init__(name=name)
        if n_embd % n_heads != 0:
            raise ValueError(
                f"n_embd ({n_embd}) must be divisible by n_heads ({n_heads})"
            )
        self.n_embd = n_embd # 384
        self.num_heads = n_heads  # 6
        self.head_dim = n_embd // n_heads # 384/6 = 64
        self.W_key = tf.keras.layers.Dense(n_embd, use_bias=False)
        self.W_query = tf.keras.layers.Dense(n_embd, use_bias=False)
        self.W_value = tf.keras.layers.Dense(n_embd, use_bias=False)
        self.dropout = tf.keras.layers.Dropout(cfg['dropout'])
        # Constant lower-triangular causal mask (ones on and below the diagonal).
        self.tril = tf.linalg.band_part(tf.ones((cfg['block_size'], cfg['block_size'])),
                                        num_lower=-1,
                                        num_upper=0)
        self.out_proj = tf.keras.layers.Dense(n_embd)

    def _split_heads(self, t, num_tokens):
        """Reshape (b, T, n_embd) to (b, num_heads, T, head_dim)."""
        # -1 lets TensorFlow infer the batch size, so an unknown batch dimension works.
        t = tf.reshape(t, (-1, num_tokens, self.num_heads, self.head_dim))
        return tf.transpose(t, perm=[0, 2, 1, 3])

    def call(self, x, training = True):
        """Apply causal multi-head attention.

        Args:
            x: tensor of shape (batch, num_tokens, n_embd). num_tokens must be
                statically known; the batch size may be unknown.
            training: whether attention dropout is active.

        Returns:
            Tensor of shape (batch, num_tokens, n_embd).
        """
        num_tokens = x.shape[1]

        keys = self._split_heads(self.W_key(x), num_tokens)
        queries = self._split_heads(self.W_query(x), num_tokens)
        values = self._split_heads(self.W_value(x), num_tokens)

        # (b, h, T, hd) @ (b, h, hd, T) --> (b, h, T, T), scaled by 1/sqrt(head_dim)
        attention_scores = queries @ tf.transpose(keys, perm=[0, 1, 3, 2]) * self.head_dim**-0.5
        mask = self.tril[:num_tokens, :num_tokens]

        # tf.where broadcasts the (T, T) mask and the scalar -inf over batch and heads.
        neg_inf = tf.constant(float('-inf'), dtype=attention_scores.dtype)
        attention_scores = tf.where(mask == 0, neg_inf, attention_scores)

        attention_weights = tf.nn.softmax(attention_scores, axis = -1)
        attention_weights = self.dropout(attention_weights, training = training)

        # (b, h, T, hd) --> (b, T, h, hd) --> (b, T, n_embd)
        context_vec = tf.transpose(attention_weights @ values, perm = [0,2,1,3])
        context_vec = tf.reshape(context_vec, (-1, num_tokens, self.n_embd))
        context_vec = self.out_proj(context_vec)
        return context_vec

### GPT Model

In [25]:
class FeedForward(tf.keras.layers.Layer):
    """Position-wise MLP: expand to 4 * n_embd, apply ReLU, project back to n_embd."""

    def __init__(self, cfg):
        super().__init__()
        width = cfg['n_embd']
        expand = tf.keras.layers.Dense(4 * width)
        activation = tf.keras.layers.ReLU()
        project = tf.keras.layers.Dense(width)
        self.net = tf.keras.Sequential([expand, activation, project])

    def call(self, x, training = True):
        # `training` is accepted for a uniform layer signature; it is not used here.
        return self.net(x)

In [26]:
class TransformerBlock(tf.keras.layers.Layer):
    """Pre-norm transformer block: attention and feed-forward, each with a residual.

    Each sub-layer is computed as x + Dropout(SubLayer(LayerNorm(x))). The same
    Dropout layer instance is applied after both sub-layers.

    Args:
        cfg: dict providing 'n_heads', 'n_embd' and 'dropout'.
    """
    def __init__(self, cfg):
        super().__init__()
        self.mha = MultiHeadAttention(n_heads=cfg['n_heads'], n_embd=cfg['n_embd'])
        self.feed = FeedForward(cfg)
        self.norm1 = tf.keras.layers.LayerNormalization(epsilon=1e-5)
        self.norm2 = tf.keras.layers.LayerNormalization(epsilon=1e-5)
        self.drop_shortcut = tf.keras.layers.Dropout(cfg['dropout'])

    def call(self, x, training = True):
        """Run the block.

        Args:
            x: input tensor; the output has the same shape.
            training: passed explicitly to every sub-layer so that all dropout
                follows one flag instead of relying on implicit propagation.
        """
        # Attention sub-layer with residual connection.
        shortcut = x
        x = self.norm1(x)
        x = self.mha(x, training = training)
        x = self.drop_shortcut(x, training = training)
        x = x + shortcut

        # Feed-forward sub-layer with residual connection.
        shortcut = x
        x = self.norm2(x)
        x = self.feed(x, training = training)
        x = self.drop_shortcut(x, training = training)
        x = x + shortcut

        return x

In [27]:
def estimate_loss(model):
    """Estimate the mean loss on the train and validation splits.

    For each split, cfg['eval_iters'] random batches are run through the model
    with training=False and their losses are averaged.

    Returns:
        A tensor of two values: [mean train loss, mean validation loss].
    """
    split_means = []
    for split in ('train', 'val'):
        batch_losses = []
        for _ in range(cfg['eval_iters']):
            xb, yb = get_batch(split)
            _logits, batch_loss = model(xb, yb, training=False)
            batch_losses.append(batch_loss.numpy())
        split_means.append(tf.reduce_mean(batch_losses))
    return tf.convert_to_tensor(split_means)

In [28]:
def loss_func(targets, logits):
    """Mean sparse categorical cross-entropy between integer targets and raw logits."""
    per_token = tf.keras.losses.sparse_categorical_crossentropy(
        y_true=targets,
        y_pred=logits,
        from_logits=True,
    )
    return tf.math.reduce_mean(per_token)

In [29]:
class GPTModel(tf.keras.models.Model):
    """Decoder-only transformer language model.

    Token and learned positional embeddings are summed, passed through dropout
    and cfg['n_layers'] transformer blocks, layer-normalized, and projected to
    vocabulary logits.

    Args:
        cfg: dict providing 'vocab_size', 'block_size', 'n_embd', 'dropout',
            'n_layers' and the keys TransformerBlock reads.
    """
    def __init__(self, cfg):
        super().__init__()
        # Maximum sequence length the positional embedding table can index.
        self.block_size = cfg['block_size']
        self.tok_emb = tf.keras.layers.Embedding(input_dim=cfg['vocab_size'], output_dim=cfg['n_embd'])
        self.pos_emb = tf.keras.layers.Embedding(input_dim=cfg['block_size'], output_dim=cfg['n_embd'])
        self.drop = tf.keras.layers.Dropout(cfg['dropout'])
        self.trf_blocks = tf.keras.Sequential(
            [TransformerBlock(cfg) for _ in range(cfg['n_layers'])]
        )
        self.final_norm = tf.keras.layers.LayerNormalization(epsilon=1e-5)
        self.out_head = tf.keras.layers.Dense(cfg['vocab_size'], use_bias=False)

    def call(self, in_idx, targets=None, training = True):
        """Compute logits and, when targets are given, the training loss.

        Args:
            in_idx: integer token IDs of shape (batch, seq_len), with
                seq_len <= cfg['block_size'].
            targets: optional integer token IDs with the same shape as in_idx.
            training: whether dropout is active.

        Returns:
            (logits, loss): logits has shape (batch, seq_len, vocab_size);
            loss is the scalar from loss_func, or None when targets is None.

        Raises:
            ValueError: if seq_len exceeds the positional embedding table size.
        """
        _, seq_len = in_idx.shape
        if seq_len is not None and seq_len > self.block_size:
            raise ValueError(
                f"sequence length {seq_len} exceeds block_size {self.block_size}"
            )
        tok_emb = self.tok_emb(in_idx)
        pos_emb = self.pos_emb(tf.range(seq_len))
        # pos_emb is (seq_len, n_embd) and broadcasts over the batch dimension.
        x = tok_emb + pos_emb
        x = self.drop(x, training = training)
        x = self.trf_blocks(x, training = training)
        x = self.final_norm(x)
        logits = self.out_head(x) # (b, num_tokens, vocab_size)

        # Identity check: `== None` on a tensor goes through tensor equality.
        if targets is None:
            loss = None
        else:
            loss = loss_func(targets, logits)
        return logits, loss

In [74]:
def generate_text(model, txt, max_new_tokens, sample=False, temperature=1.0):
    """Autoregressively extend a prompt with the model and print the result.

    Args:
        model: callable returning (logits, loss) for a (1, T) tensor of token IDs.
        txt: non-empty prompt string; every character must be in the vocabulary.
        max_new_tokens: number of tokens to append to the prompt.
        sample: if False (default), pick the most likely token at every step
            (greedy decoding, as before). If True, draw the next token from the
            predicted distribution, which avoids greedy repetition loops.
        temperature: divisor applied to the logits before sampling; only used
            when sample is True.

    Raises:
        ValueError: if txt is empty.
    """
    if not txt:
        raise ValueError("txt must contain at least one character to seed generation")

    idx = tf.convert_to_tensor([encode(txt)], dtype=tf.int32)
    for _ in range(max_new_tokens):
        # Only the last block_size tokens fit in the model's context window.
        block_idx = idx[:, -cfg['block_size']:]
        logits, _loss = model(block_idx, training=False)
        logits = logits[:, -1, :]  # logits for the final position only
        if sample:
            idx_next = tf.random.categorical(logits / temperature, num_samples=1, dtype=tf.int32)
        else:
            # argmax of the logits equals argmax of their softmax, so no softmax is needed.
            idx_next = tf.argmax(logits, axis=-1, output_type=tf.int32)[:, tf.newaxis]
        idx = tf.concat([idx, idx_next], axis=-1)

    # Take row 0 directly; np.squeeze would turn a length-1 sequence into a scalar.
    decoded_text = decode(idx[0].numpy().tolist())
    print(decoded_text)

In [36]:
# Instantiate the model from the shared hyperparameters.
nano_gpt = GPTModel(cfg)

### Training

In [37]:
# AdamW optimizer using the configured learning rate.
opt = tf.keras.optimizers.AdamW(learning_rate=cfg['learning_rate'])

In [38]:
# Forward pass on the sample batch; returns (logits, loss).
# NOTE(review): this displays the full logits tensor as cell output.
nano_gpt(x, y)

(<tf.Tensor: shape=(64, 256, 65), dtype=float32, numpy=
 array([[[-0.20846345, -1.9429071 ,  1.3011606 , ...,  0.78615093,
          -1.0147915 , -0.6277273 ],
         [ 0.3492674 , -1.5829298 ,  2.2471912 , ...,  0.9334457 ,
           0.3026073 ,  1.1786331 ],
         [ 1.329802  , -2.1322942 ,  3.784091  , ..., -0.1127113 ,
          -0.03692227, -1.1613263 ],
         ...,
         [-0.88985485,  0.23510765,  0.21676378, ...,  1.2272862 ,
          -0.06108657,  0.7893808 ],
         [-0.02013638,  0.09895454,  0.69079965, ...,  1.1361108 ,
           0.14594793,  0.6558433 ],
         [-0.32299265, -0.51938426,  0.2724907 , ...,  0.55434775,
          -0.40204123, -0.1493415 ]],
 
        [[-2.26906   , -1.0781363 , -0.07381792, ...,  2.152628  ,
          -0.1096975 ,  0.801946  ],
         [-0.1483921 , -1.6223848 , -1.2839822 , ...,  2.412289  ,
           0.70111674, -0.25935566],
         [-0.3733255 , -2.5930138 , -0.23633076, ...,  1.7961799 ,
           0.2627119 , -0.28

In [39]:
# Drive folder where the checkpoints and the final model are saved.
folder_path = '/content/drive/MyDrive/Colab Notebooks/nano GPT'

In [40]:
# Training loop: one optimizer step per iteration on a fresh random training batch.
for i in range(cfg['max_iters']):
    xb, yb = get_batch('train')
    # Record the forward pass so the loss can be differentiated w.r.t. the weights.
    # `training` is not passed, so GPTModel.call's default (training=True) applies.
    with tf.GradientTape() as tape:
        logits, loss = nano_gpt(xb, yb)
    gradients = tape.gradient(loss, nano_gpt.trainable_variables)
    opt.apply_gradients(zip(gradients, nano_gpt.trainable_variables))
    # Every eval_interval iterations (including i == 0, after the first update):
    # report the estimated train/val losses and save a checkpoint.
    if i%cfg['eval_interval'] == 0:
        # estimate_loss runs outside the tape, so no gradients are recorded for it.
        losses = tf.stop_gradient(estimate_loss(nano_gpt))
        print(f'train_loss: {losses[0]} and val_loss: {losses[1]}')
        nano_gpt.save(os.path.join(folder_path, f'model{i}.keras'))

train_loss: 5.858647346496582 and val_loss: 5.950787544250488
train_loss: 2.3933098316192627 and val_loss: 2.4309804439544678
train_loss: 1.9363648891448975 and val_loss: 2.0601372718811035
train_loss: 1.6390371322631836 and val_loss: 1.8232166767120361
train_loss: 1.5008022785186768 and val_loss: 1.7000079154968262
train_loss: 1.4142546653747559 and val_loss: 1.630603313446045
train_loss: 1.3522528409957886 and val_loss: 1.5835031270980835
train_loss: 1.2997814416885376 and val_loss: 1.5369867086410522
train_loss: 1.2568883895874023 and val_loss: 1.515067458152771
train_loss: 1.2194623947143555 and val_loss: 1.4995356798171997


In [41]:
# Save the model after the last training iteration.
nano_gpt.save(os.path.join(folder_path, 'nano_gpt.keras'))

In [43]:
# Final loss estimate: [train, val].
estimate_loss(nano_gpt)

<tf.Tensor: shape=(2,), dtype=float32, numpy=array([1.1928322, 1.493715 ], dtype=float32)>

In [75]:
# Generate 700 new characters from the prompt 'helen' and print the result.
generate_text(nano_gpt, 'helen', 700)

helen the story of the world,
Which we shall be so be so much before the storms.

KING RICHARD II:
What shall I stay thee, and thou shalt be so?

QUEEN ELIZABETH:
The strange of thy son of thy soul that I should stay.

KING RICHARD II:
So that I have been thee to thy soul have thy love.

KING RICHARD III:
So will I see thee thy soul I have thy love.

KING RICHARD III:
So that I have been thy son and my heart.

KING RICHARD III:
So that I have been thee to thy soul of York.

KING RICHARD III:
So that I have been thy son and my heart.

KING RICHARD III:
So that I have been thee to thy soul of York.

KING RICHARD III:
So that I have been thy son and my heart.

KING RICHARD III:
So that I have been t
