# Create an LLM (GPT-2) Model

In [2]:
# Hyperparameters shared by every model class in this notebook
GPT_CONFIG_124M = dict(
    vocab_size=50257,     # number of rows in the token embedding / width of the output head
    context_length=1024,  # maximum number of token positions (size of the positional embedding)
    emb_dim=768,          # width of the token and positional embedding vectors
    n_heads=12,           # number of attention heads
    n_layers=12,          # number of stacked transformer blocks
    drop_rate=0.1,        # dropout probability
    qkv_bias=False,       # whether the query/key/value Linear layers carry a bias vector
)

## A Dummy GPT Model Class

In [3]:
import torch
import torch.nn as nn

class DummyGPTModel(nn.Module):
    """Skeleton GPT model: embeddings, placeholder transformer blocks, final norm, output head.

    Args:
        cfg: Mapping providing "vocab_size", "context_length", "emb_dim",
            "drop_rate" and "n_layers".
    """

    def __init__(self, cfg):
        super().__init__()

        # Token ids and position indices are each looked up in their own embedding table
        self.tok_emb = nn.Embedding(cfg["vocab_size"], cfg["emb_dim"])
        self.pos_emb = nn.Embedding(cfg["context_length"], cfg["emb_dim"])
        self.drop_emb = nn.Dropout(cfg["drop_rate"])

        # Placeholder stack: one DummyTransformerBlock per configured layer
        self.trf_blocks = nn.Sequential(
            *[DummyTransformerBlock(cfg) for _ in range(cfg["n_layers"])]
        )

        self.final_norm = DummyLayerNorm(cfg["emb_dim"])

        # Projects each hidden vector to one score per vocabulary entry
        self.out_head = nn.Linear(cfg["emb_dim"], cfg["vocab_size"], bias=False)

    def forward(self, in_idx):
        """Compute logits for a batch of token ids.

        Args:
            in_idx: 2-D tensor of token ids, unpacked as (batch_size, seq_len).

        Returns:
            Tensor of unnormalized scores (logits) whose last dimension has
            size ``vocab_size``; apply softmax to turn them into probabilities.
        """
        batch_size, seq_len = in_idx.shape

        # Create embeddings from the input tokens and their positions
        tok_embeds = self.tok_emb(in_idx)
        pos_embeds = self.pos_emb(torch.arange(seq_len, device=in_idx.device))

        x = tok_embeds + pos_embeds
        x = self.drop_emb(x)
        # Run the transformer block stack before the final normalization
        x = self.trf_blocks(x)
        x = self.final_norm(x)

        logits = self.out_head(x)

        return logits

class DummyTransformerBlock(nn.Module):
    """Placeholder transformer block that passes its input through untouched."""

    def __init__(self, cfg):
        # `cfg` is accepted but not read by this placeholder.
        super(DummyTransformerBlock, self).__init__()

    def forward(self, x):
        """Return `x` unchanged."""
        return x
    
class DummyLayerNorm(nn.Module):
    """Normalize the last dimension to zero mean / unit variance, then scale and shift.

    Args:
        emd_dim: Size of the last dimension of the inputs (length of the
            learnable `scale` and `shift` vectors).
        eps: Small constant added to the variance to avoid division by zero.
    """

    def __init__(self, emd_dim, eps=1e-5):
        super().__init__()
        self.eps = eps
        # Learnable per-feature gain (initialised to 1) and offset (initialised to 0)
        self.scale = nn.Parameter(torch.ones(emd_dim))
        self.shift = nn.Parameter(torch.zeros(emd_dim))

    def forward(self, x):
        """Return the normalized, scaled and shifted version of `x`."""
        mu = x.mean(dim=-1, keepdim=True)
        # Population variance (unbiased=False)
        sigma_sq = x.var(dim=-1, keepdim=True, unbiased=False)
        centered = x - mu
        normalized = centered / torch.sqrt(sigma_sq + self.eps)
        return self.scale * normalized + self.shift

In [4]:
# Tokenization

import tiktoken

tokenizer = tiktoken.get_encoding("gpt2")

txt1 = "Every effort moves you"
txt2 = "Every day holds a"

# Encode each prompt and stack the id tensors row-wise into a single batch;
# torch.stack needs the encoded prompts to have the same length.
batch = torch.stack(
    [torch.tensor(tokenizer.encode(text)) for text in (txt1, txt2)],
    dim=0,
)

print(batch)

tensor([[6109, 3626, 6100,  345],
        [6109, 1110, 6622,  257]])


In [5]:
# Seed torch's RNG before building the model so the run is reproducible
torch.manual_seed(123)
model = DummyGPTModel(GPT_CONFIG_124M)
# NOTE(review): model.eval() is not called here, so dropout is presumably active in this forward pass
logits = model(batch)

# One row of vocabulary scores per input token
print("Logits output shape:", logits.shape)
print(logits)

Logits output shape: torch.Size([2, 4, 50257])
tensor([[[-1.2034,  0.3201, -0.7130,  ..., -1.5548, -0.2390, -0.4667],
         [-0.1192,  0.4539, -0.4432,  ...,  0.2392,  1.3469,  1.2430],
         [ 0.5307,  1.6720, -0.4695,  ...,  1.1966,  0.0111,  0.5835],
         [ 0.0139,  1.6754, -0.3388,  ...,  1.1586, -0.0435, -1.0400]],

        [[-1.0908,  0.1798, -0.9484,  ..., -1.6047,  0.2439, -0.4530],
         [-0.7860,  0.5581, -0.0610,  ...,  0.4835, -0.0077,  1.6621],
         [ 0.3567,  1.2698, -0.6398,  ..., -0.0162, -0.1296,  0.3717],
         [-0.2407, -0.7349, -0.5102,  ...,  2.0057, -0.3694,  0.1814]]],
       grad_fn=<UnsafeViewBackward0>)


### Layer Normalization

In [11]:
# Seed torch's RNG so the sampled input and the layer's initial weights are reproducible
torch.manual_seed(123)
# NOTE(review): the name `input` shadows Python's builtin `input` from here on
input = torch.randn(2, 5)
print("Input: ", input)

# Create a layer that takes 5 activations and outputs 6 activations
layer = nn.Sequential(nn.Linear(5, 6), nn.ReLU())
print("Layer: ", layer)

output = layer(input)
print("Output: ", output)

# Calculate mean and variance over the last dimension, one value per row.
# `var` is called without `unbiased=False` here, unlike DummyLayerNorm.forward.
mean = output.mean(dim=-1, keepdim=True)
variance = output.var(dim=-1, keepdim=True)
print("Mean before normalization: \n", mean)
print("Variance before normalization: \n", variance)

# Apply layer normalization by hand: subtract the row mean, divide by the row std.
# No epsilon is added, so a row with zero variance would divide by zero.
layer_norm = (output - mean) / torch.sqrt(variance)
print("Layer normalized: \n", layer_norm)
# `mean` and `variance` are rebound to the statistics of the normalized rows
mean = layer_norm.mean(dim=-1, keepdim=True)
variance = layer_norm.var(dim=-1, keepdim=True)
print("Mean after normalization: \n", mean)
print("Variance after normalization: \n", variance)

Input:  tensor([[-0.1115,  0.1204, -0.3696, -0.2404, -1.1969],
        [ 0.2093, -0.9724, -0.7550,  0.3239, -0.1085]])
Layer:  Sequential(
  (0): Linear(in_features=5, out_features=6, bias=True)
  (1): ReLU()
)
Output:  tensor([[0.2260, 0.3470, 0.0000, 0.2216, 0.0000, 0.0000],
        [0.2133, 0.2394, 0.0000, 0.5198, 0.3297, 0.0000]],
       grad_fn=<ReluBackward0>)
Mean before normalization: 
 tensor([[0.1324],
        [0.2170]], grad_fn=<MeanBackward1>)
Variance before normalization: 
 tensor([[0.0231],
        [0.0398]], grad_fn=<VarBackward0>)
Layer normalized: 
 tensor([[ 0.6159,  1.4126, -0.8719,  0.5872, -0.8719, -0.8719],
        [-0.0189,  0.1121, -1.0876,  1.5173,  0.5647, -1.0876]],
       grad_fn=<DivBackward0>)
Mean after normalization: 
 tensor([[-5.9605e-08],
        [ 1.9868e-08]], grad_fn=<MeanBackward1>)
Variance after normalization: 
 tensor([[1.0000],
        [1.0000]], grad_fn=<VarBackward0>)


### GELU Activation Function

In [12]:
class GELU(nn.Module):
    """GELU activation computed with the tanh-based approximation."""

    def __init__(self):
        super().__init__()

    def forward(self, x):
        """Apply 0.5 * x * (1 + tanh(sqrt(2/pi) * (x + 0.044715 * x^3))) element-wise."""
        sqrt_two_over_pi = torch.sqrt(torch.tensor(2.0 / torch.pi))
        cubic_term = x + 0.044715 * torch.pow(x, 3)
        gate = 1 + torch.tanh(sqrt_two_over_pi * cubic_term)
        return 0.5 * x * gate

### Feed Forward Network

In [13]:
class FeedForward(nn.Module):
    """Two-layer feed-forward network: expand to 4x the embedding width, GELU, project back.

    Args:
        cfg: Mapping providing "emb_dim".
    """

    def __init__(self, cfg):
        super().__init__()
        emb_dim = cfg["emb_dim"]
        hidden_dim = 4 * emb_dim
        expand = nn.Linear(emb_dim, hidden_dim)
        contract = nn.Linear(hidden_dim, emb_dim)
        self.layers = nn.Sequential(expand, GELU(), contract)

    def forward(self, x):
        """Run `x` through the expand -> GELU -> contract stack."""
        return self.layers(x)

In [14]:
# Shape check: the feed-forward network should return a tensor shaped like its input
ffn = FeedForward(GPT_CONFIG_124M)
# Last dimension (768) matches GPT_CONFIG_124M["emb_dim"]; no seed is set, so the values vary per run
x = torch.rand(2, 3, 768)
out = ffn(x)
print(out.shape)

torch.Size([2, 3, 768])


### Shortcut Connection

In [15]:
class ExampleDeepNeuralNetwork(nn.Module):
    """Five Linear+GELU layers with optional shortcut (residual) connections.

    Args:
        layer_sizes: Sequence whose first six entries give the input width
            followed by the output width of each of the five layers.
        use_shortcut: When true, add a layer's input to its output whenever
            the two shapes match.
    """

    def __init__(self, layer_sizes, use_shortcut):
        super().__init__()
        self.use_shortcut = use_shortcut
        # Layer i maps layer_sizes[i] features to layer_sizes[i + 1] features
        self.layers = nn.ModuleList([
            nn.Sequential(nn.Linear(layer_sizes[i], layer_sizes[i + 1]), GELU())
            for i in range(5)
        ])

    def forward(self, x):
        """Feed `x` through every layer in order, adding shortcuts where applicable."""
        for layer in self.layers:
            layer_output = layer(x)
            shortcut_applies = self.use_shortcut and x.shape == layer_output.shape
            x = x + layer_output if shortcut_applies else layer_output
        return x

In [24]:
# Four 3->3 layers followed by a final 3->1 layer
layer_sizes = [3, 3, 3, 3, 3, 1]
sample_input = torch.tensor([[1., 0., -1.]])
# The same seed is set before each construction so that both networks are built
# from the same RNG state and differ only in `use_shortcut`
torch.manual_seed(123)
model_without_shortcut = ExampleDeepNeuralNetwork(layer_sizes, use_shortcut=False)
torch.manual_seed(123)
model_with_shortcut = ExampleDeepNeuralNetwork(layer_sizes, use_shortcut=True)

In [25]:
def print_gradients(model, x):
    """Run one forward/backward pass and print the mean absolute gradient of each weight.

    The loss is the mean squared error between the model output and a target of 0.

    Args:
        model: Module to evaluate; its parameters' ``.grad`` fields are
            reset and then repopulated by this call.
        x: Input tensor passed to ``model``.
    """
    # backward() accumulates into .grad, so clear gradients left over from any
    # earlier call; otherwise repeated calls would report summed gradients.
    model.zero_grad()

    output = model(x)
    target = torch.tensor([[0.]])

    loss_fn = nn.MSELoss()
    loss = loss_fn(output, target)

    loss.backward()

    for name, param in model.named_parameters():
        # Only weight matrices are reported; biases are skipped
        if 'weight' in name:
            print(f"{name} has gradient mean of {param.grad.abs().mean().item()}")

# Report weight-gradient magnitudes for both variants on the same sample input
for heading, network in (
    ("Model without shortcut:", model_without_shortcut),
    ("\nModel with shortcut:", model_with_shortcut),
):
    print(heading)
    print_gradients(network, sample_input)

Model without shortcut:
layers.0.0.weight has gradient mean of 0.00020173587836325169
layers.1.0.weight has gradient mean of 0.00012011159560643137
layers.2.0.weight has gradient mean of 0.0007152039906941354
layers.3.0.weight has gradient mean of 0.0013988736318424344
layers.4.0.weight has gradient mean of 0.005049645435065031

Model with shortcut:
layers.0.0.weight has gradient mean of 0.22169792652130127
layers.1.0.weight has gradient mean of 0.20694106817245483
layers.2.0.weight has gradient mean of 0.32896995544433594
layers.3.0.weight has gradient mean of 0.2665732204914093
layers.4.0.weight has gradient mean of 1.3258540630340576


## A Transformer Block

In [26]:
# Multi-Head Attention
class MultiHeadAttention(nn.Module):
    """Causal multi-head self-attention.

    Args:
        d_in: Size of the last dimension of the input.
        d_out: Size of the last dimension of the output; must be divisible
            by `num_heads`.
        context_length: Side length of the square causal mask, i.e. the
            largest number of tokens the mask can cover.
        dropout: Dropout probability applied to the attention weights.
        num_heads: Number of attention heads.
        qkv_bias: Whether the query/key/value projections have a bias.
    """

    def __init__(self, d_in, d_out, context_length, dropout, num_heads, qkv_bias=False):
        super().__init__()
        assert d_out % num_heads == 0, "d_out must be divisible by num_heads"

        self.d_out = d_out
        self.num_heads = num_heads
        # Each head works on an equal slice of the d_out projection
        self.head_dim = d_out // num_heads

        self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias)
        # Mixes the concatenated head outputs
        self.out_proj = nn.Linear(d_out, d_out)
        self.dropout = nn.Dropout(dropout)

        # Ones strictly above the diagonal mark the (future) positions to hide
        causal_mask = torch.triu(torch.ones(context_length, context_length), diagonal=1)
        self.register_buffer('mask', causal_mask)

    def _split_heads(self, projected):
        """Reshape (b, num_tokens, d_out) into (b, num_heads, num_tokens, head_dim)."""
        b, num_tokens = projected.shape[0], projected.shape[1]
        per_head = projected.view(b, num_tokens, self.num_heads, self.head_dim)
        return per_head.transpose(1, 2)

    def forward(self, x):
        """Return context vectors of shape (b, num_tokens, d_out) for input (b, num_tokens, d_in)."""
        b, num_tokens, _ = x.shape

        keys = self._split_heads(self.W_key(x))
        queries = self._split_heads(self.W_query(x))
        values = self._split_heads(self.W_value(x))

        # Per-head dot products between every query and every key
        attn_scores = queries @ keys.transpose(2, 3)

        # Crop the stored mask to the current sequence length and hide future positions
        future_positions = self.mask[:num_tokens, :num_tokens].bool()
        attn_scores.masked_fill_(future_positions, -torch.inf)

        # Scale by sqrt(head_dim) before the softmax, then apply dropout to the weights
        scale = keys.shape[-1] ** 0.5
        attn_weights = self.dropout(torch.softmax(attn_scores / scale, dim=-1))

        # (b, num_heads, num_tokens, head_dim) -> (b, num_tokens, num_heads, head_dim)
        per_head_context = (attn_weights @ values).transpose(1, 2)

        # Concatenate the heads: num_heads * head_dim == d_out
        merged = per_head_context.contiguous().view(b, num_tokens, self.d_out)
        return self.out_proj(merged)

In [30]:
class TransformerBlock(nn.Module):
    """Transformer block: normalized attention and feed-forward sub-layers, each with a shortcut.

    Args:
        cfg: Mapping providing "emb_dim", "context_length", "n_heads",
            "drop_rate" and "qkv_bias".
    """

    def __init__(self, cfg):
        super().__init__()
        emb_dim = cfg["emb_dim"]
        drop_rate = cfg["drop_rate"]

        # Attention module; input and output widths are both the embedding width
        self.att = MultiHeadAttention(
            d_in=emb_dim,
            d_out=emb_dim,
            context_length=cfg["context_length"],
            dropout=drop_rate,
            num_heads=cfg["n_heads"],
            qkv_bias=cfg["qkv_bias"],
        )
        # Feed-forward module (expands to 4x the embedding width internally)
        self.ff = FeedForward(cfg)
        # One normalization layer in front of each sub-layer
        self.norm1 = DummyLayerNorm(emb_dim)
        self.norm2 = DummyLayerNorm(emb_dim)
        # A single dropout module shared by both shortcut branches
        self.dropout_shortcut = nn.Dropout(drop_rate)

    def forward(self, x):
        """Apply norm -> attention -> dropout -> add, then norm -> feed-forward -> dropout -> add."""
        # Attention sub-layer: normalize first, add the untouched input back afterwards
        attn_branch = self.dropout_shortcut(self.att(self.norm1(x)))
        x = attn_branch + x

        # Feed-forward sub-layer, same normalize-then-add pattern
        ff_branch = self.dropout_shortcut(self.ff(self.norm2(x)))
        return ff_branch + x

In [31]:
# Seed torch's RNG before sampling the input and building the block so the run is reproducible
torch.manual_seed(123)
# Last dimension (768) matches GPT_CONFIG_124M["emb_dim"]
x = torch.rand(2, 4, 768)
block = TransformerBlock(GPT_CONFIG_124M)
output = block(x)

# The block is expected to preserve the shape of its input
print("Input shape:", x.shape)
print("Output shape:", output.shape)

Input shape: torch.Size([2, 4, 768])
Output shape: torch.Size([2, 4, 768])


## Assembling Transformer Blocks into a GPT Model

In [37]:
class GPTModel(nn.Module):
    """GPT model: token + positional embeddings, transformer blocks, final norm, output head.

    Args:
        cfg: Mapping providing "vocab_size", "context_length", "emb_dim",
            "drop_rate" and "n_layers", plus the keys read by TransformerBlock.
    """

    def __init__(self, cfg):
        super().__init__()
        emb_dim = cfg["emb_dim"]

        # Embedding tables for token ids and for position indices
        self.tok_emb = nn.Embedding(cfg["vocab_size"], emb_dim)
        self.pos_emb = nn.Embedding(cfg["context_length"], emb_dim)
        self.drop_emb = nn.Dropout(cfg["drop_rate"])

        # Stack of n_layers transformer blocks applied one after another
        blocks = [TransformerBlock(cfg) for _ in range(cfg["n_layers"])]
        self.trf_blocks = nn.Sequential(*blocks)

        # Final normalization followed by the bias-free projection to vocabulary scores
        self.final_norm = DummyLayerNorm(emb_dim)
        self.out_head = nn.Linear(emb_dim, cfg["vocab_size"], bias=False)

    def forward(self, in_idx):
        """Compute logits for a batch of token ids.

        :param in_idx: 2-D tensor of token ids, unpacked as (batch_size, seq_len)
        :return: logits, with one score per vocabulary entry in the last dimension
        """
        _, seq_len = in_idx.shape

        # Position indices 0..seq_len-1, created on the same device as the input ids
        positions = torch.arange(seq_len, device=in_idx.device)
        hidden = self.tok_emb(in_idx) + self.pos_emb(positions)

        hidden = self.drop_emb(hidden)
        hidden = self.trf_blocks(hidden)
        hidden = self.final_norm(hidden)
        return self.out_head(hidden)

In [45]:
# Seed before construction so the initial weights (and the printed logits) are reproducible
torch.manual_seed(123)
model = GPTModel(GPT_CONFIG_124M)

out = model(batch)
print("Input token batch:\n", batch)
print("\nOutput shape:", out.shape)
print(out)

# Count every parameter tensor registered on the model
total_params = sum(param.numel() for param in model.parameters())
print(f"\nTotal number of parameters: {total_params:,}")

print("\nToken embedding layer shape:", model.tok_emb.weight.shape)
print("Output layer shape:", model.out_head.weight.shape)

# GPTModel keeps separate tok_emb and out_head weights of the same shape; leaving
# out the output head gives the count that would result if the two were shared.
out_head_params = sum(param.numel() for param in model.out_head.parameters())
total_params_gpt2 = total_params - out_head_params
print(f"\nNumber of trainable parameters considering weight tying: {total_params_gpt2:,}")

# Size estimate at 4 bytes per parameter (i.e. assuming float32 weights)
total_size_bytes = total_params * 4
total_size_mb = total_size_bytes / 1024 ** 2
print(f"\nTotal size of the model: {total_size_mb:.2f} MB")


Input token batch:
 tensor([[6109, 3626, 6100,  345],
        [6109, 1110, 6622,  257]])

Output shape: torch.Size([2, 4, 50257])
tensor([[[-1.1050, -0.0054, -0.9680,  ...,  3.0550,  2.3504, -2.9953],
         [-3.4404, -2.7845, -2.6117,  ...,  0.8947,  4.6135, -2.8290],
         [-0.1715, -0.8735,  2.6575,  ...,  0.9795,  1.5972, -3.4830],
         [-2.6278, -0.3963,  0.2422,  ...,  0.9697,  2.0820,  0.3534]],

        [[-2.8959,  2.0748, -0.4365,  ..., -0.4643,  2.6097, -1.5061],
         [-0.1044,  0.9152, -1.6846,  ...,  0.7598,  3.3212, -0.5796],
         [-0.3383,  2.1516, -1.7438,  ...,  2.1214,  2.1429, -0.7223],
         [-3.3636,  2.2360,  0.6698,  ...,  1.9665,  1.5848, -0.4326]]],
       grad_fn=<UnsafeViewBackward0>)

Total number of parameters: 162,971,136

Token embedding layer shape: torch.Size([50257, 768])
Output layer shape: torch.Size([50257, 768])

Number of trainable parameters considering weight tying: 124,373,760

Total size of the model: 621.69 MB


## Generating Texts

In [64]:
def generate_text_simple(model, idx, max_new_tokens, context_size):
    """Greedily extend `idx` by `max_new_tokens` tokens.

    :param model: callable mapping a (batch, n_tokens) id tensor to logits
        indexed as (batch, n_tokens, vocab)
    :param idx: (batch, n_tokens) tensor of token ids to continue
    :param max_new_tokens: how many tokens to generate
    :param context_size: number of most recent tokens fed to the model at each step
    :return: `idx` with the generated token ids appended along dimension 1
    """
    for _ in range(max_new_tokens):
        # Keep only the last `context_size` tokens as model input
        window = idx[:, -context_size:]
        with torch.no_grad():
            all_logits = model(window)

        # Only the scores at the final position predict the next token
        last_logits = all_logits[:, -1, :]
        next_token_probs = torch.softmax(last_logits, dim=-1)
        # Greedy choice: take the most probable token id
        next_token = torch.argmax(next_token_probs, dim=-1, keepdim=True)
        # Append the new token to the running sequence
        idx = torch.cat((idx, next_token), dim=1)

    return idx

In [61]:
start_context = "Hello, I am"

# Tokenize the prompt; the printed result is a plain list of token ids
encoded = tokenizer.encode(start_context)
print("encoded:", encoded)

# Turn the ids into a tensor and add a leading batch dimension
encoded_tensor = torch.tensor(encoded)[None, :]
print("encoded_tensor.shape:", encoded_tensor.shape)

encoded: [15496, 11, 314, 716]
encoded_tensor.shape: torch.Size([1, 4])


In [65]:
# Switch to evaluation mode before generating
model.eval()
generation_kwargs = dict(
    model=model,
    idx=encoded_tensor,
    max_new_tokens=6,
    context_size=GPT_CONFIG_124M["context_length"],
)
out = generate_text_simple(**generation_kwargs)
print("Output:", out)
print("Output length:", len(out[0]))

Output: tensor([[15496,    11,   314,   716, 34359,  2312, 48810, 36468,  8267, 36678]])
Output length: 10


In [66]:
# Drop the batch dimension and convert the ids back to text
generated_ids = out.squeeze(0).tolist()
decoded_text = tokenizer.decode(generated_ids)
print(decoded_text)

Hello, I am RG Theseknife CalderILL 268
