In [1]:
# gpt configuration

GPT_CONFIG_124M = {
    "n_layers": 12,   # number of transformer blocks
    "n_heads": 12,
    "drop_rate": 0.1,
    "context_length": 1024,
    "embed_dim": 768,
    "vocab_size": 50257,
    "qkv_bias": False,
}

In [2]:
import torch
import torch.nn as nn



class GELU(nn.Module):
    def __init__(self):
        super().__init__()


    def forward(self, x):
        return 0.5 * x * (1 + torch.tanh(torch.sqrt(torch.tensor(2 / torch.pi)) * (x + 0.044715 * torch.pow(x, 3))))


class LayerNorm(nn.Module):
    '''normalizing is adjusting the mean and std of the input'''

    # layer norm is applied on feature dimension
    # vs batch norm depends on the batch dimension
    def normalize(self, x: torch.Tensor) -> torch.Tensor:
        mean = x.mean(dim=-1, keepdim=True)
        var = x.var(dim=-1, keepdim=True, unbiased=False)

        # 1+eps to avoid division by zero
        return (x - mean) / torch.sqrt(var + 1e-5)


    def __init__(self, embed_dim: int):
        super().__init__()
    
        self.scale = nn.Parameter(torch.ones(embed_dim))
        self.shift = nn.Parameter(torch.zeros(embed_dim))


    def forward(self, x):
        normalized = self.normalize(x)
        # just turnable knobs to adjust normalization during training
        return normalized * self.scale + self.shift


class FeedForward(nn.Module):
    def __init__(self, config):
        super().__init__()

        self.layers = nn.Sequential(
            # expansion
            nn.Linear(config["embed_dim"], 4 * config["embed_dim"]),
            # activation
            GELU(),
            # contraction
            nn.Linear(4 * config["embed_dim"], config["embed_dim"]),
        )

    def forward(self, x):
        return self.layers(x)

class MultiheadAttention1(nn.Module):
    def __init__(self, d_in, d_out, num_heads, context_length, dropout, qkv_bias=False):
        super().__init__()

        # check if d_in is divisible by num_heads
        assert d_in % num_heads == 0, "d_in must be divisible by num_heads"

        self.d_in = d_in
        self.d_out = d_out
        self.num_heads = num_heads
        self.head_dim = d_out // num_heads

        self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.dropout = nn.Dropout(dropout)
        self.register_buffer("mask", torch.triu(torch.ones(context_length, context_length), diagonal=1))
        self.out_proj = nn.Linear(d_out, d_out)
    
    def forward(self, x):

        # x dims = batch_size, num_tokens, d_in
        b, num_tokens, d_in = x.shape
        q = self.W_query(x)         # batch_size, num_tokens, d_out
        k = self.W_key(x)           # batch_size, num_tokens, d_out
        v = self.W_value(x)         # batch_size, num_tokens, d_out

        # split the query, key and value into multiple heads
        q = q.view(b, num_tokens, self.num_heads, self.head_dim)
        k = k.view(b, num_tokens, self.num_heads, self.head_dim)
        v = v.view(b, num_tokens, self.num_heads, self.head_dim)

        # transpose the query to batch_size, num_heads, num_tokens, head_dim

        #  new dims = batch_size, num_heads, num_tokens, head_dim
        q = q.transpose(1, 2)
        k = k.transpose(1, 2)
        v = v.transpose(1, 2)
     
        # calculate the attention scores = num_tokens x num_tokens
        attention_scores = q @ k.transpose(2, 3)

        # apply the mask to the attention scores
        mask_bool = self.mask.bool()[:num_tokens, :num_tokens]
        attention_scores.masked_fill_(mask_bool, -torch.inf)

        # apply the softmax to the attention scores
        attention_weights = torch.softmax(attention_scores / self.head_dim**0.5, dim=-1)    

        # drop out the attention weights
        attention_weights = self.dropout(attention_weights)

        # calculate the context vector
        # attention_weights dims = batch_size, num_heads, num_tokens, num_tokens
        # v dims = batch_size, num_heads, num_tokens, head_dim

        # new dims = batch_size, num_heads, num_tokens, head_dim
        context_vector = attention_weights @ v

        # new dims = batch_size, num_tokens, num_heads, head_dim
        context_vector = context_vector.transpose(1, 2)

        # merge the heads
        # new dims = batch_size, num_tokens, d_out
        context_vector = context_vector.contiguous().view(b, num_tokens, self.d_out)

        # project the context vector to the output dimension
        # new dims = batch_size, num_tokens, d_out
        context_vector = self.out_proj(context_vector)

        return context_vector

class TransformerBlock(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.att = MultiheadAttention1(
            d_in=config["embed_dim"],
            d_out=config["embed_dim"],
            num_heads=config["n_heads"],
            context_length=config["context_length"],
            dropout=config["drop_rate"],
            qkv_bias=config["qkv_bias"],
        )
        
        self.feed_forward = FeedForward(config)

        self.norm1 = LayerNorm(config["embed_dim"])
        self.norm2 = LayerNorm(config["embed_dim"])

        self.dropout = nn.Dropout(config["drop_rate"])

    def forward(self, x):
        shortcut = x

        x = self.norm1(x)
        x = self.att(x)
        x = self.dropout(x)
        x = x + shortcut

        shortcut = x

        x = self.norm2(x)
        x = self.feed_forward(x)
        x = self.dropout(x)
        x = x + shortcut

        return x


class GPTModel(nn.Module):
    def __init__(self, config):
        super().__init__()
        
        # token embedding matrix
        self.token_embedding = nn.Embedding(config["vocab_size"], config["embed_dim"])

        # positional embedding
        self.positional_embedding = nn.Embedding(config["context_length"], config["embed_dim"])

        self.dropout = nn.Dropout(config["drop_rate"])


        # transformer blocks
        self.transformer_blocks = nn.Sequential(*[
            TransformerBlock(config) for _ in range(config["n_layers"])
        ])

        # final layer norm
        self.final_layer_norm = LayerNorm(config["embed_dim"])

        self.output_head = nn.Linear(config["embed_dim"], config["vocab_size"], bias=False)
        

    def forward(self, input_idx):
        input_idx_batch_sz, input_idx_len = input_idx.shape

        # create the embedding matrix
        token_embeddings = self.token_embedding(input_idx)
        positional_embeddings = self.positional_embedding(torch.arange(input_idx_len))

        vector_embeddings = token_embeddings + positional_embeddings


        # drop out
        vector_embeddings = self.dropout(vector_embeddings)

        # transformer blocks
        transformer_output = self.transformer_blocks(vector_embeddings)

        # final layer norm
        normalized_output = self.final_layer_norm(transformer_output)

        # output head
        logits = self.output_head(normalized_output)

        return logits

In [3]:
torch.manual_seed(123)

# test trasnformer with dummy data batch size 2

tb = torch.randn(2, 4, 768)

transformer_block = TransformerBlock(GPT_CONFIG_124M)

output = transformer_block(tb)
print("shape of output: ", output.shape)

shape of output:  torch.Size([2, 4, 768])


In [4]:
!pip install tiktoken



In [5]:
import tiktoken
import torch

sentence1 = "Every effort moves you"
sentence2 = "Every day holds a"

enc = tiktoken.get_encoding("gpt2")
tokens1 = enc.encode(sentence1)
print(tokens1)

tokens2 = enc.encode(sentence2)
print(tokens2)


batch = [torch.tensor(tokens1), torch.tensor(tokens2)]
tb = torch.stack(batch, dim=0)
print(tb)
print("shape of tb: ", tb.shape)



[6109, 3626, 6100, 345]
[6109, 1110, 6622, 257]
tensor([[6109, 3626, 6100,  345],
        [6109, 1110, 6622,  257]])
shape of tb:  torch.Size([2, 4])


In [6]:
dummy_gpt = DummyGPTModel(GPT_CONFIG_124M)

output = dummy_gpt(tb)
print("shape of output: ", output.shape)

print(output)




NameError: name 'DummyGPTModel' is not defined

In [None]:
# shortcut connection tutorial 

# it will make gradient smoother, and help with vanishing gradient problem

class ShortcutConnection(nn.Module):
    def __init__(self, layer_sizes: list[int], use_shortcut: bool):
        super().__init__()

        self.layers = nn.ModuleList(
            [
                nn.Sequential(nn.Linear(layer_sizes[0], layer_sizes[1]), GELU()),
                nn.Sequential(nn.Linear(layer_sizes[1], layer_sizes[2]), GELU()),
                nn.Sequential(nn.Linear(layer_sizes[2], layer_sizes[3]), GELU()),
                nn.Sequential(nn.Linear(layer_sizes[3], layer_sizes[4]), GELU()),
                nn.Sequential(nn.Linear(layer_sizes[4], layer_sizes[5]), GELU()),
            ]
        )

        self.use_shortcut = use_shortcut

    def forward(self, x):
        for layer in self.layers:
            layer_output = layer(x)

            if self.use_shortcut and x.shape == layer_output.shape:
                x = x + layer_output
            else:
                x = layer_output

        return x

layer_size = [3, 3, 3, 3, 3, 1]
use_shortcut = False

torch.manual_seed(123)
shortcut_connection = ShortcutConnection(layer_size, use_shortcut)

x = torch.tensor([1.0, 0.0, -1.0])

print(shortcut_connection(x))



tensor([0.0610], grad_fn=<MulBackward0>)


In [None]:
def print_gradient(model, x):
    output = model(x)
    target = torch.tensor([0.0])
    loss = nn.MSELoss()(output, target)
    loss.backward()

    for name, param in model.named_parameters():
        if 'weight' in name:
            # print mean absolute value of gradient
            print(name, param.grad.abs().mean().item())


print_gradient(shortcut_connection, x)

layers.0.0.weight 0.0006052076350897551
layers.1.0.weight 0.000360334845026955
layers.2.0.weight 0.0021456119138747454
layers.3.0.weight 0.004196620546281338
layers.4.0.weight 0.01514893677085638


In [None]:
# use_shortcut = True
shortcut_connection.use_shortcut = True
print_gradient(shortcut_connection, x)






layers.0.0.weight 0.2223031222820282
layers.1.0.weight 0.20684535801410675
layers.2.0.weight 0.3305492103099823
layers.3.0.weight 0.2698953449726105
layers.4.0.weight 1.3107051849365234


In [7]:
# run gpt 

torch.manual_seed(123)

gpt_model = GPTModel(GPT_CONFIG_124M)

output = gpt_model(tb)

print("input shape: ", tb.shape)

print("output: ", output)

print("output.shape: ", output.shape)

input shape:  torch.Size([2, 4])
output:  tensor([[[ 0.3613,  0.4223, -0.0711,  ...,  0.3483,  0.4661, -0.2838],
         [-0.1792, -0.5660, -0.9485,  ...,  0.0477,  0.5181, -0.3168],
         [ 0.7120,  0.0332,  0.1085,  ...,  0.1018, -0.4327, -0.2553],
         [-1.0076,  0.3418, -0.1190,  ...,  0.7195,  0.4023,  0.0532]],

        [[-0.2564,  0.0900,  0.0335,  ...,  0.2659,  0.4454, -0.6806],
         [ 0.1230,  0.3653, -0.2074,  ...,  0.7705,  0.2710,  0.2246],
         [ 1.0558,  1.0318, -0.2800,  ...,  0.6936,  0.3205, -0.3178],
         [-0.1565,  0.3926,  0.3288,  ...,  1.2630, -0.1858,  0.0388]]],
       grad_fn=<UnsafeViewBackward0>)
output.shape:  torch.Size([2, 4, 50257])


In [8]:
total_params = sum(p.numel() for p in gpt_model.parameters())
print("total parameters: ", total_params)







total parameters:  163009536


In [9]:
# # generate sample tokens

def generate_tokens(model, idx, max_input_tokens, context_size):

#     # idx shape is batch_size, num_tokens
#     # tensor([[6109, 3626, 6100,  345],
#         [6109, 1110, 6622,  257]])
    for _ in range(max_input_tokens):

        # look at the last context_size tokens
        idx_cond = idx[:, -context_size:]

        # batch, number of tokens, vocab_size
        with torch.no_grad():
            logits = model(idx_cond) 


        # get last rows from each batch because that is our prediction
        last_rows = logits[:, -1, :]

        # get the probability which is most likely to be the next token
        probs = torch.softmax(last_rows, dim=-1)

        idx_next = torch.argmax(probs, dim=-1, keepdim=True)

        # append the next token to the context
        idx = torch.cat((idx, idx_next), dim=1)

    return idx


In [None]:
sample_text = "Hello, I am"
tokens = enc.encode(sample_text)
encoded_tokens = torch.tensor(tokens).unsqueeze(0)

gpt_model.eval()

token_output = generate_tokens(model=gpt_model, idx=encoded_tokens, max_input_tokens=6, context_size=4)

print(token_output)

# decode the tokens
decoded_tokens = enc.decode(token_output[0].tolist())
print(decoded_tokens)







tensor([[15496,    11,   314,   716, 27018, 34305, 14838,  2694, 20485, 18702]])
