In [2]:
from dataclasses import dataclass
import torch
import torch.nn as nn
from torch.nn import functional as F  
import math
import os
import time
import inspect


In [None]:

class CasualSelfAttention(nn.Module):
    """Multi-head causal self-attention: models relationships between tokens.

    Every position attends to itself and to earlier positions only; later
    positions are masked out before the softmax.

    Args:
        config: object exposing ``n_embd`` (embedding width), ``n_head``
            (number of heads, must divide ``n_embd``) and ``block_size``
            (largest sequence length the causal mask is built for).
    """

    def __init__(self, config):
        super().__init__()
        # Each head receives an equal slice of the embedding, so the split must be exact.
        assert config.n_embd % config.n_head == 0
        # key, query, value projections for all heads, computed in one batched matmul
        self.c_attn = nn.Linear(config.n_embd, 3 * config.n_embd)
        # output projection
        self.c_proj = nn.Linear(config.n_embd, config.n_embd)
        self.n_head = config.n_head
        self.n_embd = config.n_embd
        # not really a bias, more of a mask, but following the OpenAI/HF naming
        causal_mask = torch.tril(torch.ones(config.block_size, config.block_size))
        self.register_buffer("bias", causal_mask.view(1, 1, config.block_size, config.block_size))

    def forward(self, x):
        """Apply causal self-attention.

        Args:
            x: tensor of shape (B, T, C) with C == ``n_embd``; the mask slice
                below only covers T up to ``block_size``.

        Returns:
            Tensor of shape (B, T, C).
        """
        B, T, C = x.size()  # batch, sequence length, embedding size
        head_dim = C // self.n_head
        qkv = self.c_attn(x)
        q, k, v = qkv.split(self.n_embd, dim=2)
        # Reshape each of q/k/v to (B, n_head, T, head_dim) so heads act like a batch dim.
        # Analogy: q is the question a student asks, k is what every other student
        # knows (compared against the question), v is the knowledge each student
        # hands over, weighted by how well their key matched the query.
        k = k.view(B, T, self.n_head, head_dim).transpose(1, 2)
        q = q.view(B, T, self.n_head, head_dim).transpose(1, 2)
        v = v.view(B, T, self.n_head, head_dim).transpose(1, 2)
        # Scaled dot-product scores: how relevant each other token is.
        # (B, n_head, T, head_dim) @ (B, n_head, head_dim, T) ==> (B, n_head, T, T)
        # e.g. q: (2, 4, 5, 64), k^T: (2, 4, 64, 5) -> att: (2, 4, 5, 5)
        att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(k.size(-1)))
        # The sliced mask has shape (1, 1, T, T) and broadcasts over batch and heads:
        # [[[1, 0, 0, 0],
        #   [1, 1, 0, 0],
        #   [1, 1, 1, 0],
        #   [1, 1, 1, 1]]]
        # Zeros mark future positions; -inf there becomes weight 0 after softmax.
        att = att.masked_fill(self.bias[:, :, :T, :T] == 0, float('-inf'))
        # softmax keeps the shape and normalizes each row (dim=-1) to sum to 1
        att = F.softmax(att, dim=-1)
        # Gather contextualized information from the attended tokens.
        y = att @ v  # (B, n_head, T, head_dim)
        # transpose() returns a non-contiguous view, and view() needs contiguous
        # memory, so make it contiguous before merging the heads back into C.
        y = y.transpose(1, 2).contiguous().view(B, T, C)  # (B, T, C)
        y = self.c_proj(y)
        return y
    
class MLP(nn.Module):
    """Two-layer feed-forward network that learns token-level transformations.

    It is applied to each token independently:

    * Self-attention | multiple tokens (sequence-wise) | mixes information across positions
      -- "How does this token relate to other tokens in the sentence?"
    * MLP | one token (feature-wise) | refines how information is represented per token
      -- "How do I better express or transform the features of this one token?"

    Args:
        config: object exposing ``n_embd`` (embedding width).
    """

    def __init__(self, config):
        super().__init__()
        # Project to a 4x wider hidden dimension (richer representation).
        self.c_fc = nn.Linear(config.n_embd, 4 * config.n_embd)
        # Non-linearity (helps model complex patterns); tanh approximation of GELU.
        self.gelu = nn.GELU(approximate='tanh')
        # Project back to the original embedding size.
        self.c_proj = nn.Linear(4 * config.n_embd, config.n_embd)

    def forward(self, x):
        """Run expand -> GELU -> project on the last dimension of ``x``.

        Args:
            x: tensor whose last dimension is ``n_embd``.

        Returns:
            Tensor with the same shape as ``x``.
        """
        x = self.c_fc(x)
        x = self.gelu(x)
        x = self.c_proj(x)
        return x

class Block(nn.Module):
    """One transformer layer: self-attention then MLP, each wrapped in a residual.

    LayerNorm computes its statistics over the embedding dimension (C) of each
    token and is applied to the input of each sub-layer.

    Args:
        config: object exposing ``n_embd`` plus whatever ``CasualSelfAttention``
            and ``MLP`` read from it.
    """

    def __init__(self, config):
        super().__init__()
        self.ln_1 = nn.LayerNorm(config.n_embd)
        self.attn = CasualSelfAttention(config)
        self.ln_2 = nn.LayerNorm(config.n_embd)
        self.mlp = MLP(config)

    def forward(self, x):
        """Apply the attention and MLP sub-layers with residual connections.

        Why residuals (they help train deeper models):

        * Without residuals every author rewrites the whole story from scratch;
          with residuals each author edits or improves what the previous one wrote.
        * Eases gradient flow: training deeper networks is more stable and faster.
        * Identity mapping: if a layer is not useful the model can learn to do
          nothing, so deeper layers do not hurt performance.
        * Incremental learning: a layer only has to refine or tweak its input.

        Args:
            x: tensor whose last dimension is ``n_embd``.

        Returns:
            Tensor with the same shape as ``x``.
        """
        # Tokens exchange information across positions.
        x = x + self.attn(self.ln_1(x))
        # Each token is then transformed independently by the feed-forward network.
        x = x + self.mlp(self.ln_2(x))
        return x

@dataclass
class GPTConfig:
    """Hyper-parameters of the GPT model; the defaults describe a 12-layer, 12-head, 768-wide model.

    Rule of thumb for the attention settings:

    * Head size -- depth: a larger head size lets each attention head
      understand the input more deeply.
    * Number of heads -- breadth: more heads give more perspectives, so have
      enough of them to capture diverse relationships (semantic, positional, ...).
    """

    # Maximum sequence length (T).
    block_size: int = 1024
    # Number of tokens: 50,000 BPE merges + 256 byte tokens + 1 <|endoftext|>.
    vocab_size: int = 50_000 + 256 + 1
    # Number of stacked transformer blocks.
    n_layer: int = 12
    # Number of attention heads.
    n_head: int = 12
    # Embedding dimension.
    n_embd: int = 768
    
class GPT(nn.Module):
    """Decoder-only transformer language model.

    Sub-module names (``transformer.wte``, ``transformer.wpe``, ``transformer.h``,
    ``transformer.ln_f``, ``lm_head``) are the state-dict keys that
    ``from_pretrained`` copies from the Hugging Face GPT-2 checkpoint.

    Args:
        config: object exposing ``vocab_size``, ``block_size``, ``n_layer`` and
            ``n_embd`` (plus whatever ``Block`` reads from it).
    """

    def __init__(self, config):
        super().__init__()
        self.config = config

        # input idx -> (B, T)
        # wte(idx)  -> (B, T, n_embd)  token embeddings
        # wpe(pos)  -> (T, n_embd)     position embeddings, broadcast over the batch
        self.transformer = nn.ModuleDict(dict(
            wte = nn.Embedding(config.vocab_size, config.n_embd),  # word token embeddings
            wpe = nn.Embedding(config.block_size, config.n_embd),  # position embeddings
            # "The word 'dog' at position 1 is not the same as 'dog' at position 100."
            h = nn.ModuleList([Block(config) for _ in range(config.n_layer)]),  # transformer layers
            ln_f = nn.LayerNorm(config.n_embd),  # final layer norm
        ))
        self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False)  # output layer

    def forward(self, idx):
        """Compute next-token logits.

        Args:
            idx: integer tensor of token ids with shape (B, T), T <= ``block_size``.

        Returns:
            Logits of shape (B, T, vocab_size).

        Raises:
            AssertionError: if T exceeds ``config.block_size``.
        """
        B, T = idx.size()
        # Report the actual limit (block_size), not the batch size.
        assert T <= self.config.block_size, f"Cannot forward sequence of the length {T}, block size of {self.config.block_size}"
        pos = torch.arange(0, T, dtype=torch.long, device=idx.device)  # shape (T)
        pos_emb = self.transformer.wpe(pos)  # position embeddings of shape (T, n_embd)
        tok_emb = self.transformer.wte(idx)  # token embeddings of shape (B, T, n_embd)
        x = tok_emb + pos_emb  # (B, T, n_embd); pos_emb broadcasts over the batch
        for block in self.transformer.h:
            x = block(x)
        x = self.transformer.ln_f(x)
        logits = self.lm_head(x)  # (B, T, vocab_size)
        return logits

    # load parameters
    @classmethod
    def from_pretrained(cls, model_type):
        """Load pretrained GPT-2 weights from Hugging Face into a new ``GPT``.

        Args:
            model_type: one of 'gpt2', 'gpt2-medium', 'gpt2-large', 'gpt2-xl'.

        Returns:
            A ``GPT`` instance whose parameters were copied from the checkpoint.

        Raises:
            AssertionError: on an unknown ``model_type`` or when the key count
                or a tensor shape does not match the checkpoint.
        """
        assert model_type in {'gpt2', 'gpt2-medium', 'gpt2-large', 'gpt2-xl'}
        # Imported lazily so the model can be used without `transformers` installed.
        from transformers import GPT2LMHeadModel
        print("loading weights from pretrained gpt: %s" % model_type)

        config_args = {
            'gpt2':         dict(n_layer=12, n_head=12, n_embd=768),
            'gpt2-medium':  dict(n_layer=24, n_head=16, n_embd=1024),
            'gpt2-large':   dict(n_layer=36, n_head=20, n_embd=1280),
            'gpt2-xl':      dict(n_layer=48, n_head=25, n_embd=1600),
        }[model_type]
        config_args['vocab_size'] = 50257  # always 50257 for GPT model checkpoints
        config_args['block_size'] = 1024  # always 1024 for GPT model checkpoints

        config = GPTConfig(**config_args)
        model = GPT(config)
        # state_dict() returns a dictionary of all learnable parameters and buffers.
        sd = model.state_dict()
        # Drop the causal-mask buffer: it is a constant, not a learned weight.
        sd_keys = [k for k in sd.keys() if not k.endswith('.attn.bias')]

        model_hf = GPT2LMHeadModel.from_pretrained(model_type)
        sd_hf = model_hf.state_dict()

        # Skip the checkpoint's mask buffers too (not trainable parameters).
        sd_keys_hf = [
            k for k in sd_hf.keys()
            if not k.endswith('.attn.masked_bias') and not k.endswith('.attn.bias')
        ]
        # These weights are stored transposed relative to nn.Linear in the
        # checkpoint (the reversed-shape assertion below checks this).
        transposed = ['attn.c_attn.weight', 'attn.c_proj.weight', 'mlp.c_fc.weight', 'mlp.c_proj.weight']
        assert len(sd_keys_hf) == len(sd_keys), f"mismatched keys: {len(sd_keys_hf)} != {len(sd_keys)}"
        # copy_ writes in place into the model's tensors; no_grad keeps autograd out of it.
        with torch.no_grad():
            for k in sd_keys_hf:
                if any(k.endswith(w) for w in transposed):
                    assert sd_hf[k].shape[::-1] == sd[k].shape
                    sd[k].copy_(sd_hf[k].t())
                else:
                    assert sd_hf[k].shape == sd[k].shape
                    sd[k].copy_(sd_hf[k])
        return model

In [8]:
# Lower-triangular 8x8 matrix of ones, reshaped to (1, 1, 8, 8) -- the same
# construction CasualSelfAttention uses for its causal-mask buffer.
a = torch.ones(8, 8).tril().view(1, 1, 8, 8)
a

tensor([[[[1., 0., 0., 0., 0., 0., 0., 0.],
          [1., 1., 0., 0., 0., 0., 0., 0.],
          [1., 1., 1., 0., 0., 0., 0., 0.],
          [1., 1., 1., 1., 0., 0., 0., 0.],
          [1., 1., 1., 1., 1., 0., 0., 0.],
          [1., 1., 1., 1., 1., 1., 0., 0.],
          [1., 1., 1., 1., 1., 1., 1., 0.],
          [1., 1., 1., 1., 1., 1., 1., 1.]]]])

In [10]:
# Toy hyper-parameters for the embedding walkthrough
batch_size, seq_length = 2, 4
vocab_size, embedding_dim = 100, 8
block_size = 16  # max sequence length

# Two example sequences of token ids -> shape (2, 4)
token_ids = torch.tensor([[5, 8, 2, 3],
                          [7, 6, 1, 0]])

# Positions 0 .. T-1 -> shape (4,)
position_ids = torch.arange(0, seq_length)

# Lookup tables: one row per vocabulary entry / per position
wte = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim)  # token embedding
wpe = nn.Embedding(num_embeddings=block_size, embedding_dim=embedding_dim)  # position embedding

# Look up both embeddings and add them
tok_emb = wte(token_ids)     # (2, 4, 8)
pos_emb = wpe(position_ids)  # (4, 8)
x = tok_emb + pos_emb        # (2, 4, 8): pos_emb is broadcast across the batch

In [18]:
# Token embeddings of the first sequence in the batch: one row per position.
tok_emb[0]

tensor([[ 1.2644,  0.1056, -0.2670, -0.8304, -1.0564, -0.3676, -0.5851,  0.0837],
        [-0.0536,  0.9074, -0.0990,  1.3859, -0.7152, -0.7872, -0.1566,  0.4855],
        [-0.2358, -1.7843,  0.5786, -2.4957, -1.9267,  0.5426,  1.5097,  0.4468],
        [ 0.0316,  1.0584, -1.0380,  0.7808,  0.8842, -0.0497, -0.8083, -0.6393]],
       grad_fn=<SelectBackward0>)

In [21]:
# Position-embedding vector for position index 1.
pos_emb[1]

tensor([ 1.9580, -0.5378, -1.1977, -0.9342,  0.7744, -0.9729, -0.4574,  0.2160],
       grad_fn=<SelectBackward0>)

In [20]:
# Token + position embeddings for the first sequence (row i is tok_emb[0][i] + pos_emb[i]).
x[0]

tensor([[ 2.0106, -0.0364,  0.6077,  1.7487, -2.0082,  0.3499,  0.6859,  1.0258],
        [ 1.9044,  0.3696, -1.2967,  0.4518,  0.0592, -1.7601, -0.6139,  0.7016],
        [ 0.1103, -0.7721, -0.0748, -3.2344, -2.5200, -0.2137,  1.5868, -0.1149],
        [ 0.6435,  0.4013, -1.1126,  1.4585,  2.4205,  0.9778, -1.8962, -1.0458]],
       grad_fn=<SelectBackward0>)