## 1. Transformer Block

In [7]:
import torch
from torch import nn
import numpy as np

In [8]:
# Defining the GPT configuration settings

# Hyperparameters consumed by GPTModel / TransformerBlock / FeedForward below.
GPT_Config_124M = {
    "vocab_size": 50257,  # rows of the token-embedding table and width of the output logits
    "context_length": 1024,  # rows of the positional-embedding table and side of the causal mask
    "emb_dim": 768,  # embedding width; also d_in and d_out of the attention layer
    "n_heads": 12,  # attention heads per block (emb_dim must be divisible by this)
    "n_layers": 12,  # number of stacked TransformerBlock modules
    "dropout_rate": 0.0,  # shared by embedding, attention-weight and shortcut dropout
    "qkv_bias": True  # whether the query/key/value projections carry a bias term
}

In [9]:
# Reading data 

# Decode explicitly as UTF-8 so the result does not depend on the platform's
# default locale encoding (cp1252 on many Windows setups).
with open(r"C:\Users\nyasa\Downloads\BUILDING LLM FROM SCRATCH\Stage1\1.Data Preparation and Sampling\1.Tokenization\1.Word-Based-Tokenization\the-verdict.txt", "r", encoding="utf-8") as f:
    text_data = f.read()
# Preview the first 1000 characters as the cell output
text_data[:1000]

'I HAD always thought Jack Gisburn rather a cheap genius--though a good fellow enough--so it was no great surprise to me to hear that, in the height of his glory, he had dropped his painting, married a rich widow, and established himself in a villa on the Riviera. (Though I rather thought it would have been Rome or Florence.)\n\n"The height of his glory"--that was what the women called it. I can hear Mrs. Gideon Thwing--his last Chicago sitter--deploring his unaccountable abdication. "Of course it\'s going to send the value of my picture \'way up; but I don\'t think of that, Mr. Rickham--the loss to Arrt is all I think of." The word, on Mrs. Thwing\'s lips, multiplied its _rs_ as though they were reflected in an endless vista of mirrors. And it was not only the Mrs. Thwings who mourned. Had not the exquisite Hermia Croft, at the last Grafton Gallery show, stopped me before Gisburn\'s "Moon-dancers" to say, with tears in her eyes: "We shall not look upon its like again"?\n\nWell!--even 

In [10]:
# NB: Input = [b, num_tokens, 768]
# 1. Activation function
class GELUActivation(nn.Module):
    """GELU activation using the tanh approximation.

    Computes ``0.5 * x * (1 + tanh(sqrt(2 / pi) * (x + 0.044715 * x^3)))``
    element-wise. The module has no parameters and accepts a tensor of any shape.
    """

    def __init__(self):
        super().__init__()

    def forward(self, x):
        """Return the approximate GELU of ``x`` (same shape as the input)."""
        # The cubic coefficient is 0.044715 (not 0.44715); the scale is a plain
        # Python float, so no extra tensor is allocated on every call.
        inner = (2.0 / torch.pi) ** 0.5 * (x + 0.044715 * torch.pow(x, 3))
        return 0.5 * x * (1 + torch.tanh(inner))
        
# 2. Layer normalization
class LayerNorm(nn.Module):
    """Layer normalization over the last dimension with a learnable scale and shift.

    Args:
        emb_dim: size of the last dimension of the inputs; both learnable
            vectors have this length.
    """

    def __init__(self, emb_dim):
        super().__init__()
        # Small constant added to the variance so the division never hits zero
        self.eps = 1e-5
        # Trainable offset (starts at 0) and gain (starts at 1)
        self.shift = nn.Parameter(torch.zeros(emb_dim))
        self.scale = nn.Parameter(torch.ones(emb_dim))

    def forward(self, x):
        """Normalize ``x`` along its last dimension, then apply scale and shift."""
        # Per-row statistics; the biased (population) variance is used
        row_mean = x.mean(dim=-1, keepdim=True)
        row_var = x.var(dim=-1, keepdim=True, unbiased=False)
        centered = x - row_mean
        normalized = centered / torch.sqrt(row_var + self.eps)
        return self.scale * normalized + self.shift
        
# 3. Feed forward        
class FeedForward(nn.Module):
    """Position-wise feed-forward network: expand 4x, GELU, project back.

    Args:
        cfg: configuration dict; only ``cfg["emb_dim"]`` is read.
    """

    def __init__(self, cfg):
        super().__init__()
        width = cfg["emb_dim"]
        hidden_width = width * 4
        # Index positions inside the Sequential (0 = expansion, 2 = projection)
        # are relied upon by the weight-loading code further down the notebook.
        expansion = nn.Linear(width, hidden_width)
        activation = GELUActivation()
        projection = nn.Linear(hidden_width, width)
        self.layers = nn.Sequential(expansion, activation, projection)

    def forward(self, x):
        """Apply the three stacked layers to ``x``."""
        return self.layers(x)
# 4. Attention Mechanism
# Creating the multi-head attention compact class
import torch
from torch import nn

class MultiHeadAttention(nn.Module):
    """Causal multi-head self-attention with a fused per-head reshape.

    Args:
        d_in: size of the last dimension of the input.
        d_out: total output width; split evenly across the heads.
        num_heads: number of attention heads (must divide ``d_out``).
        context_length: side of the pre-built causal mask, i.e. the longest
            sequence the mask covers.
        dropout_rate: dropout probability applied to the attention weights.
        bias_units: whether the query/key/value projections have a bias.

    Raises:
        AssertionError: if ``d_out`` is not divisible by ``num_heads``.
    """

    def __init__(self, d_in, d_out, num_heads, context_length, dropout_rate, bias_units=False):
        super().__init__()
        assert d_out % num_heads == 0, "dimensions out must be divisible by number of heads"
        self.d_out = d_out
        self.num_heads = num_heads
        self.head_dim = d_out // num_heads
        # Key / query / value projections: d_in -> d_out
        self.w_key = nn.Linear(d_in, d_out, bias=bias_units)
        self.w_query = nn.Linear(d_in, d_out, bias=bias_units)
        self.w_value = nn.Linear(d_in, d_out, bias=bias_units)
        # Output projection that mixes the concatenated heads: d_out -> d_out
        self.out_proj = nn.Linear(d_out, d_out)
        # Upper-triangular ones (above the diagonal) mark the future positions
        self.register_buffer("mask", torch.triu(
            torch.ones(context_length, context_length),
            diagonal=1
        ))
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x):
        """Return the context vectors for ``x``.

        ``x`` is unpacked as ``(b, num_tokens, d_in)``; the result has shape
        ``(b, num_tokens, d_out)``.
        """
        # The last input dimension is d_in, which may differ from d_out, so it
        # must not be reused when the heads are merged back together.
        b, num_tokens, _ = x.shape

        def split_heads(projected):
            # (b, num_tokens, d_out) -> (b, num_heads, num_tokens, head_dim)
            return projected.view(b, num_tokens, self.num_heads, self.head_dim).transpose(1, 2)

        keys = split_heads(self.w_key(x))
        queries = split_heads(self.w_query(x))
        values = split_heads(self.w_value(x))

        # Attention scores: (b, num_heads, num_tokens, num_tokens)
        attention_scores = queries @ keys.transpose(2, 3)
        # Slice first, then convert, so only the needed window is cast to bool
        causal_mask = self.mask[:num_tokens, :num_tokens].bool()
        attention_scores = attention_scores.masked_fill(causal_mask, -torch.inf)
        # Scale by sqrt(head_dim) before the softmax
        attention_scores = attention_scores / self.head_dim ** 0.5
        attention_weights = torch.softmax(attention_scores, dim=-1)
        attention_weights = self.dropout(attention_weights)

        # (b, num_heads, num_tokens, head_dim) -> (b, num_tokens, num_heads, head_dim)
        context_vector = (attention_weights @ values).transpose(1, 2)
        # Merge the heads: d_out = num_heads * head_dim
        context_vector = context_vector.contiguous().view(b, num_tokens, self.d_out)
        return self.out_proj(context_vector)
# Transformer class        
class TransformerBlock(nn.Module):
    """Pre-norm transformer block: attention and feed-forward, each with a shortcut.

    Args:
        cfg: configuration dict; reads ``emb_dim``, ``n_heads``,
            ``context_length``, ``dropout_rate`` and ``qkv_bias``.
    """

    def __init__(self, cfg):
        super().__init__()
        width = cfg["emb_dim"]
        drop_p = cfg["dropout_rate"]
        # One normalization layer in front of each sub-layer
        self.layerNorm1 = LayerNorm(width)
        self.layerNorm2 = LayerNorm(width)
        # A single dropout module, reused on both residual branches
        self.drop_shortcut = nn.Dropout(drop_p)
        # Multi-head self-attention sub-layer
        self.attention = MultiHeadAttention(
            d_in=width,
            d_out=width,
            num_heads=cfg["n_heads"],
            context_length=cfg["context_length"],
            dropout_rate=drop_p,
            bias_units=cfg["qkv_bias"],
        )
        # Feed-forward sub-layer
        self.feed_forward = FeedForward(cfg)

    def forward(self, x):
        """Run both sub-layers, adding each sub-layer's input back to its output."""
        # Attention branch: norm -> attention -> dropout, then add the input back
        attended = self.drop_shortcut(self.attention(self.layerNorm1(x)))
        x = attended + x
        # Feed-forward branch: norm -> feed-forward -> dropout, then add back
        transformed = self.drop_shortcut(self.feed_forward(self.layerNorm2(x)))
        return transformed + x
# Gpt model        
class GPTModel(nn.Module):
    """GPT-style decoder: embeddings, a stack of transformer blocks, final norm, output head.

    Args:
        cfg: configuration dict; reads ``vocab_size``, ``emb_dim``,
            ``context_length``, ``dropout_rate`` and ``n_layers`` here, and is
            passed on unchanged to every ``TransformerBlock``.
    """

    def __init__(self, cfg):
        super().__init__()
        vocab_size = cfg["vocab_size"]
        width = cfg["emb_dim"]
        # Token and positional lookup tables
        self.tok_emb = nn.Embedding(vocab_size, width)
        self.pos_emb = nn.Embedding(cfg["context_length"], width)
        # Dropout applied to the summed embeddings
        self.drop_emb = nn.Dropout(cfg["dropout_rate"])
        # n_layers transformer blocks applied one after another
        blocks = [TransformerBlock(cfg) for _ in range(cfg["n_layers"])]
        self.transformer_blocks = nn.Sequential(*blocks)
        # Final normalization and the bias-free projection to vocabulary logits
        self.final_norm = LayerNorm(width)
        self.out_head = nn.Linear(width, vocab_size, bias=False)

    def forward(self, in_idx):
        """Map token ids of shape ``(batch, seq_len)`` to logits.

        Returns a tensor of shape ``(batch, seq_len, vocab_size)``.
        """
        _, seq_len = in_idx.shape
        # Positions 0..seq_len-1, created on the same device as the token ids
        positions = torch.arange(seq_len, device=in_idx.device)
        hidden = self.tok_emb(in_idx) + self.pos_emb(positions)
        hidden = self.drop_emb(hidden)
        hidden = self.transformer_blocks(hidden)
        hidden = self.final_norm(hidden)
        return self.out_head(hidden)

In [11]:
# Creating the decoder and encoder functions
import tiktoken
tokenizer = tiktoken.get_encoding("gpt2")
# Encoder function
def encoder(sample_text, tokenizer=tokenizer):
    """Tokenize ``sample_text`` and return the ids as a tensor with a leading batch axis.

    ``<|endoftext|>`` is allowed as a special token during encoding.
    """
    token_ids = tokenizer.encode(sample_text, allowed_special={"<|endoftext|>"})
    # unsqueeze(0) adds the batch dimension: (num_tokens,) -> (1, num_tokens)
    return torch.tensor(token_ids).unsqueeze(0)
# Decoder function
def decoder(encoded_tensor, tokenizer=tokenizer):
    """Turn a tensor of token ids back into text.

    Args:
        encoded_tensor: tensor of token ids; a leading batch axis of size 1 is
            dropped before decoding (presumably shape ``(1, num_tokens)``, as
            produced by ``encoder`` -- confirm against callers).
        tokenizer: object exposing ``decode``; defaults to the module-level
            GPT-2 tokenizer.

    Returns:
        The decoded string.
    """
    # tolist() works for tensors on any device (and for tensors that track
    # gradients), where .numpy() would raise; it also hands the tokenizer
    # plain Python ints.
    token_ids = encoded_tensor.squeeze(0).tolist()
    return tokenizer.decode(token_ids)

## 4. Comparing generation with and without decoding strategies

In [12]:
import weight_download

In [13]:
# Fetch (or reuse) the GPT-2 124M checkpoint files via the local weight_download module.
# Per the cell outputs below, `settings` holds the model hyperparameters and
# `params` is a dict with keys 'blocks', 'b', 'g', 'wpe', 'wte'.
# NOTE(review): the absolute Windows path only works on the author's machine.
settings, params = weight_download.download_and_load_gpt2(r"C:\Users\nyasa\Downloads\gpt2", "124M")



File already exists and is upto dateC:\Users\nyasa\Downloads\gpt2\124M\checkpoint




File already exists and is upto dateC:\Users\nyasa\Downloads\gpt2\124M\encoder.json




File already exists and is upto dateC:\Users\nyasa\Downloads\gpt2\124M\hparams.json




File already exists and is upto dateC:\Users\nyasa\Downloads\gpt2\124M\model.ckpt.data-00000-of-00001




File already exists and is upto dateC:\Users\nyasa\Downloads\gpt2\124M\model.ckpt.index




File already exists and is upto dateC:\Users\nyasa\Downloads\gpt2\124M\model.ckpt.meta




File already exists and is upto dateC:\Users\nyasa\Downloads\gpt2\124M\vocab.bpe


In [14]:
settings

{'n_vocab': 50257, 'n_ctx': 1024, 'n_embd': 768, 'n_head': 12, 'n_layer': 12}

In [15]:
params.keys()

dict_keys(['blocks', 'b', 'g', 'wpe', 'wte'])

In [16]:
import torch
import numpy as np

# --- Assign and Check function
def assign_and_check(left, right, name=""):
    """Build a new ``torch.nn.Parameter`` from ``right`` after a shape check.

    Args:
        left: the tensor/parameter being replaced; only its shape is read.
        right: array-like holding the new values (converted with ``torch.tensor``).
        name: label printed in the confirmation line.

    Returns:
        A fresh ``torch.nn.Parameter`` holding a copy of ``right``.

    Raises:
        ValueError: if the two shapes differ.
    """
    shapes_match = left.shape == right.shape
    if not shapes_match:
        raise ValueError(f"\033[41mShape mismatch: {left.shape} != {right.shape}\033[0m")

    # torch.tensor copies the data, so the parameter is independent of `right`
    source_values = torch.tensor(right)
    new_param = torch.nn.Parameter(source_values)
    # Sanity check of the stored values against the converted source
    values_equal = torch.allclose(new_param.data, source_values, atol=1e-5)

    print(f"✅ Assignment successful for {name}: {values_equal}")

    return new_param

# --- Weight Loading Function
def load_gpt_weights_into_custom(gpt, params):
    """Copy a GPT-2 parameter dictionary into ``gpt`` and return the same model.

    Every parameter is replaced through ``assign_and_check``, which validates
    the shape and prints one confirmation line per tensor.

    Args:
        gpt: model exposing ``tok_emb``, ``pos_emb``, ``transformer_blocks``,
            ``final_norm`` and ``out_head``; it is modified in place.
        params: dict with keys ``"wte"``, ``"wpe"``, ``"blocks"``, ``"g"`` and
            ``"b"``; each entry of ``"blocks"`` holds ``"attn"``, ``"mlp"``,
            ``"ln_1"`` and ``"ln_2"`` sub-dicts.

    Returns:
        The ``gpt`` object that was passed in.
    """
    # Token and positional embedding tables
    gpt.tok_emb.weight = assign_and_check(gpt.tok_emb.weight, params["wte"], name="tok_emb")
    gpt.pos_emb.weight = assign_and_check(gpt.pos_emb.weight, params["wpe"], name="pos_emb")

    for b in range(len(params["blocks"])):
        block = gpt.transformer_blocks[b]
        block_params = params["blocks"][b]
        attn = block.attention

        # The checkpoint stores query/key/value as one fused matrix; split it
        # into three and transpose each part before assigning it.
        q_w, k_w, v_w = np.split(block_params["attn"]["c_attn"]["w"], 3, axis=-1)
        for layer, part, label in (
            (attn.w_query, q_w, f"block{b}_w_query"),
            (attn.w_key, k_w, f"block{b}_w_key"),
            (attn.w_value, v_w, f"block{b}_w_value"),
        ):
            layer.weight = assign_and_check(layer.weight, part.T, name=label)

        # Same split for the fused bias vector (no transpose needed)
        q_b, k_b, v_b = np.split(block_params["attn"]["c_attn"]["b"], 3, axis=-1)
        for layer, part, label in (
            (attn.w_query, q_b, f"block{b}_b_query"),
            (attn.w_key, k_b, f"block{b}_b_key"),
            (attn.w_value, v_b, f"block{b}_b_value"),
        ):
            layer.bias = assign_and_check(layer.bias, part, name=label)

        # Attention output projection
        attn_proj = block_params["attn"]["c_proj"]
        attn.out_proj.weight = assign_and_check(
            attn.out_proj.weight, attn_proj["w"].T, name=f"block{b}_attn_out_proj_w"
        )
        attn.out_proj.bias = assign_and_check(
            attn.out_proj.bias, attn_proj["b"], name=f"block{b}_attn_out_proj_b"
        )

        # Feed-forward: layers[0] is the expansion, layers[2] the projection
        mlp_params = block_params["mlp"]
        ffn_expand = block.feed_forward.layers[0]
        ffn_expand.weight = assign_and_check(
            ffn_expand.weight, mlp_params["c_fc"]["w"].T, name=f"block{b}_ffn_fc_w"
        )
        ffn_expand.bias = assign_and_check(
            ffn_expand.bias, mlp_params["c_fc"]["b"], name=f"block{b}_ffn_fc_b"
        )
        ffn_project = block.feed_forward.layers[2]
        ffn_project.weight = assign_and_check(
            ffn_project.weight, mlp_params["c_proj"]["w"].T, name=f"block{b}_ffn_proj_w"
        )
        ffn_project.bias = assign_and_check(
            ffn_project.bias, mlp_params["c_proj"]["b"], name=f"block{b}_ffn_proj_b"
        )

        # Layer norms: "g" feeds the scale, "b" feeds the shift
        norm1 = block.layerNorm1
        norm1.scale = assign_and_check(norm1.scale, block_params["ln_1"]["g"], name=f"block{b}_ln1_g")
        norm1.shift = assign_and_check(norm1.shift, block_params["ln_1"]["b"], name=f"block{b}_ln1_b")
        norm2 = block.layerNorm2
        norm2.scale = assign_and_check(norm2.scale, block_params["ln_2"]["g"], name=f"block{b}_ln2_g")
        norm2.shift = assign_and_check(norm2.shift, block_params["ln_2"]["b"], name=f"block{b}_ln2_b")

    # Final normalization layer
    final_norm = gpt.final_norm
    final_norm.scale = assign_and_check(final_norm.scale, params["g"], name="final_norm_g")
    final_norm.shift = assign_and_check(final_norm.shift, params["b"], name="final_norm_b")

    # The output head reuses the token-embedding values; assign_and_check makes
    # its own copy, so the two parameters are equal but separate objects.
    gpt.out_head.weight = assign_and_check(gpt.out_head.weight, params["wte"], name="out_head_weight")

    return gpt


In [17]:
gpt_custom = GPTModel(GPT_Config_124M)

In [18]:
# load_gpt_weights_into_custom assigns onto the model it receives and returns
# that same object, so gpt_openAI and gpt_custom are two names for one model.
gpt_openAI = load_gpt_weights_into_custom(gpt_custom, params)

✅ Assignment successful for tok_emb: True
✅ Assignment successful for pos_emb: True
✅ Assignment successful for block0_w_query: True
✅ Assignment successful for block0_w_key: True
✅ Assignment successful for block0_w_value: True
✅ Assignment successful for block0_b_query: True
✅ Assignment successful for block0_b_key: True
✅ Assignment successful for block0_b_value: True
✅ Assignment successful for block0_attn_out_proj_w: True
✅ Assignment successful for block0_attn_out_proj_b: True
✅ Assignment successful for block0_ffn_fc_w: True
✅ Assignment successful for block0_ffn_fc_b: True
✅ Assignment successful for block0_ffn_proj_w: True
✅ Assignment successful for block0_ffn_proj_b: True
✅ Assignment successful for block0_ln1_g: True
✅ Assignment successful for block0_ln1_b: True
✅ Assignment successful for block0_ln2_g: True
✅ Assignment successful for block0_ln2_b: True
✅ Assignment successful for block1_w_query: True
✅ Assignment successful for block1_w_key: True
✅ Assignment successful 

In [19]:
def generate(model, idx, max_new_tokens, context_size, temp=0.0, top_k=None, eos_id=None):
    """Autoregressively extend ``idx`` by up to ``max_new_tokens`` tokens.

    Args:
        model: callable mapping token ids ``(batch, seq_len)`` to logits
            ``(batch, seq_len, vocab_size)``.
        idx: tensor of starting token ids, shape ``(batch, seq_len)``.
        max_new_tokens: upper bound on the number of generated tokens.
        context_size: only the last ``context_size`` tokens are fed to the model.
        temp: sampling temperature; values ``> 0`` sample from the scaled
            distribution, otherwise the most likely token is taken.
        top_k: if given, only the ``top_k`` highest logits of each row stay
            eligible (values above the vocabulary size are clamped to it).
        eos_id: if given, generation stops as soon as every row of the batch
            produces this token; that token is not appended.

    Returns:
        The input ids followed by the generated ids, shape
        ``(batch, seq_len + n_generated)``.
    """
    for _ in range(max_new_tokens):
        # Keep only the most recent tokens that fit in the context window
        idx_cond = idx[:, -context_size:]
        with torch.no_grad():
            logits = model(idx_cond)
        # Only the prediction for the last position matters: (batch, vocab_size)
        logits = logits[:, -1, :]

        if top_k is not None:
            k = min(top_k, logits.size(-1))
            top_logits, _ = torch.topk(logits, k=k, dim=-1)
            # Smallest kept logit of EACH row, shape (batch, 1), so every
            # sequence in the batch is thresholded against its own top-k.
            min_val = top_logits[:, -1:]
            logits = logits.masked_fill(logits < min_val, float("-inf"))

        if temp > 0.0:
            # Temperature scaling followed by sampling from the distribution
            probabilities = torch.softmax(logits / temp, dim=-1)
            idx_next = torch.multinomial(probabilities, num_samples=1)
        else:
            # Greedy decoding; softmax is monotonic, so argmax on the raw
            # logits selects the same token.
            idx_next = torch.argmax(logits, dim=-1, keepdim=True)

        # Stop early on end-of-sequence; .all() keeps the check well-defined
        # for batches with more than one row.
        if eos_id is not None and bool((idx_next == eos_id).all()):
            break

        # Append the new token to the running sequence
        idx = torch.cat((idx, idx_next), dim=-1)
    return idx

In [20]:
import tiktoken

# Generate a continuation of a short prompt with the loaded GPT-2 weights.
tokenizer = tiktoken.get_encoding("gpt2")
sample_text = "what is business"
# Encode the prompt and add a leading batch axis
token_ids = torch.tensor(tokenizer.encode(sample_text, allowed_special={"<|endoftext|>"})).unsqueeze(dim=0)
# temp > 0 makes generate() sample with torch.multinomial, so the text differs
# between runs unless a torch seed is set beforehand.
ids = generate(
    model = gpt_openAI,
    idx=token_ids,
    max_new_tokens=50, 
    context_size = 1024,
    temp = 1.4, 
    top_k=25
).squeeze(0).numpy()
# Decode the prompt plus the generated ids back to text (cell output)
tokenizer.decode(ids)

'what is business." This was a common complaint:\n\nWhat did the White House, Office of N. Strategy and the Office of Nidescoping Services, do. At one time or the risk is this, the EPA is investigating to what degree of pollution'

In [21]:
# Saving the model

# Persist only the state dict (not the pickled module); reloading requires
# constructing GPTModel with the same config and calling load_state_dict.
# NOTE(review): the absolute Windows path only works on the author's machine.
torch.save(gpt_openAI.state_dict(), r"C:\Users\nyasa\Downloads\gpt2\gpt_openAI.pth")