# Implementing a GPT model from Scratch

In [24]:
from transformers import GPT2Tokenizer

# Load the pretrained GPT-2 byte-pair-encoding (BPE) tokenizer; `tokenizer` is reused by the generation cells below
tokenizer = GPT2Tokenizer.from_pretrained("gpt2")

# Sample prompt used to sanity-check the tokenizer
text = "Every day is your"
encoded_input = tokenizer.encode(text, return_tensors='pt')  # 'pt' -> PyTorch tensor with a leading batch dimension (see printed output)
print(f"Encoded input: {encoded_input}")


Encoded input: tensor([[6109, 1110,  318,  534]])


In [25]:
import torch
import torch.nn as nn
import math


# Configuration for GPT-2 124M.
# This dict is passed to GPT2(config) and indexed directly by the test cells.
GPT_CONFIG_124M = {
    "vocab_size": 50257,    # Number of tokens in the BPE tokenizer's vocabulary
    "context_length": 1024, # Maximum number of token positions in one input sequence
    "emb_dim": 768,         # Size of each token embedding vector
    "n_heads": 12,          # Attention heads per multi-head attention layer
    "n_layers": 12,         # Number of stacked transformer blocks
    "drop_rate": 0.1,       # Dropout probability applied during training
    "qkv_bias": False       # Whether the query/key/value Linear layers get a bias term
}

We use short variable names to keep the code compact and maintain readability. Here's a breakdown of key variables used in the model:

- **`vocab_size`**: Defines the vocabulary size, which in our case is 50,257 tokens, as determined by the BPE tokenizer.
- **`context_length`**: Represents the model's maximum input token length, constrained by the positional embeddings.
- **`emb_dim`**: Refers to the embedding size for token inputs, where each input token is converted into a 768-dimensional vector.
- **`n_heads`**: Specifies the number of attention heads in the multi-head attention mechanism.
- **`n_layers`**: Indicates the number of transformer blocks in the model, which will be implemented in the following sections.
- **`drop_rate`**: Determines the intensity of the dropout mechanism, with a value of 0.1 implying that 10% of hidden units are dropped during training to reduce overfitting.
- **`qkv_bias`**: Controls whether a bias vector is added in the Linear layers of the multi-head attention mechanism when computing the query (Q), key (K), and value (V) tensors. We disable this feature by default, following common practice in modern large language models. We'll revisit this decision when loading pretrained GPT-2 weights from OpenAI.


### Let's break down the GPT-2 model into its main components and then implement each block step by step.

#### **GPT-2 Model Breakdown:**

1. **Token Embedding**
2. **Positional Encoding**
3. **Transformer Blocks**
   - a. Multi-Head Attention
   - b. Layer Normalization
   - c. Feed-Forward Neural Network
4. **Output Layer**

---

### Let's start by implementing each component:

#### **1. Token Embedding**


In [26]:
# 1. Embedding Layer
class Embedding(nn.Module):
    """Token-embedding lookup: maps integer token IDs to learned dense vectors.

    Args:
        vocab_size: Number of rows in the lookup table (one per token ID).
        embed_size: Length of the vector stored for each token.
    """

    def __init__(self, vocab_size, embed_size):
        super().__init__()
        # Learnable table of shape (vocab_size, embed_size)
        self.embed = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embed_size)

    def forward(self, x):
        """Return the embedding vectors for the token IDs in `x`.

        The output has the shape of `x` with one extra trailing axis of
        length `embed_size`.
        """
        token_vectors = self.embed(x)
        return token_vectors

# Test Embedding
# Build the layer from the config; `embedding`, `input_ids` and `embed_output` stay in
# the kernel namespace and `embed_output` is consumed by the positional-encoding test.
embedding = Embedding(GPT_CONFIG_124M["vocab_size"], GPT_CONFIG_124M["emb_dim"])

# Random token IDs of shape (batch_size, seq_length) = (2, 10).
# torch.randint's upper bound is exclusive, so every ID is a valid vocabulary index.
input_ids = torch.randint(0, GPT_CONFIG_124M["vocab_size"], (2, 10))

# Look up the embedding vector for every token ID
embed_output = embedding(input_ids)

# Print the shape of the output embeddings
print(f"Embedding output shape: {embed_output.shape}")

# The lookup must append exactly one axis of size emb_dim
# Expected shape: (batch_size, seq_length, embed_size)
assert embed_output.shape == (2, 10, GPT_CONFIG_124M["emb_dim"]), "Embedding shape mismatch"

Embedding output shape: torch.Size([2, 10, 768])


#### **2. Positional Encoding**

In [27]:
# 2. Positional Encoding
class PositionalEncoding(nn.Module):
    """Fixed (non-learned) sinusoidal positional encoding added to token embeddings.

    For position `p` and even column `2k` the table holds
    `sin(p * 10000^(-2k / embed_size))`; the following odd column holds the
    cosine of the same angle.

    Args:
        embed_size: Width of the embeddings the encoding is added to. Both even
            and odd widths are supported.
        max_seq_length: Number of positions precomputed in the table. `forward`
            slices the table to the input length, so inputs longer than this
            cannot be encoded.
    """

    def __init__(self, embed_size, max_seq_length=512):
        super().__init__()
        # Table that will hold one encoding row per position
        pe = torch.zeros(max_seq_length, embed_size)

        # Column vector of positions 0 .. max_seq_length - 1, shape (max_seq_length, 1)
        position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)

        # One frequency per even column index: 10000^(-2k / embed_size)
        div_term = torch.exp(torch.arange(0, embed_size, 2).float() * -(math.log(10000.0) / embed_size))

        # Sine fills the even columns (ceil(embed_size / 2) of them)
        pe[:, 0::2] = torch.sin(position * div_term)
        # Cosine fills the odd columns. There are only floor(embed_size / 2) of those,
        # so for an odd embed_size the last frequency has no cosine partner and must be
        # dropped; without the slice the assignment fails with a shape mismatch.
        pe[:, 1::2] = torch.cos(position * div_term[: embed_size // 2])

        # Register as a buffer: follows .to(device) and state_dict, but is not trained
        self.register_buffer('pe', pe.unsqueeze(0))  # Shape: (1, max_seq_length, embed_size)

    def forward(self, x):
        """Add the encoding for positions `0 .. x.size(1) - 1` to `x`.

        `x` is indexed as (batch, seq_length, embed_size); the table slice
        broadcasts over the batch axis.
        """
        return x + self.pe[:, :x.size(1)]

# Test Positional Encoding
# Size the table from the config: emb_dim columns, context_length positions
pos_encoding = PositionalEncoding(GPT_CONFIG_124M["emb_dim"], GPT_CONFIG_124M["context_length"])

# Add positional information to `embed_output` (produced by the Embedding test cell above)
pos_output = pos_encoding(embed_output)

# Print the shape of the output after adding positional encodings
print(f"Positional Encoding output shape: {pos_output.shape}")

# The encoding is purely additive, so the shape must be unchanged
assert pos_output.shape == embed_output.shape, "Positional Encoding shape mismatch"

Positional Encoding output shape: torch.Size([2, 10, 768])


#### **3. Transformer Block**

##### **A. Multi-Head Attention**

In [28]:
# A. Multi-Head Attention
class MultiHeadAttention(nn.Module):
    """Scaled dot-product self-attention split across several heads.

    Args:
        embed_size: Width of the input and output vectors.
        num_heads: Number of attention heads; must divide `embed_size` evenly.
        qkv_bias: Whether the query/key/value projections include a bias term.

    Raises:
        ValueError: If `embed_size` is not a multiple of `num_heads`.
    """

    def __init__(self, embed_size, num_heads, qkv_bias=False):
        super().__init__()
        # Without this check the reshape in forward() either fails with an opaque
        # error or, for unlucky sequence lengths, silently succeeds with a wrong
        # sequence dimension (e.g. embed_size=10, num_heads=3, seq_len=9).
        if embed_size % num_heads != 0:
            raise ValueError(
                f"embed_size ({embed_size}) must be divisible by num_heads ({num_heads})"
            )
        self.embed_size = embed_size
        self.num_heads = num_heads
        self.head_dim = embed_size // num_heads

        self.query = nn.Linear(embed_size, embed_size, bias=qkv_bias)
        self.key = nn.Linear(embed_size, embed_size, bias=qkv_bias)
        self.value = nn.Linear(embed_size, embed_size, bias=qkv_bias)
        # Mixes the concatenated head outputs back into one embed_size vector
        self.out = nn.Linear(embed_size, embed_size)

    def forward(self, x, mask=None):
        """Apply self-attention to `x`.

        Args:
            x: Tensor indexed as (batch, seq_length, embed_size).
            mask: Optional tensor broadcastable to the score tensor
                (batch, num_heads, seq_length, seq_length). Entries equal to 0
                mark key positions that must not be attended to. With no mask,
                every position attends to every other position.

        Returns:
            Tensor of shape (batch, seq_length, embed_size).
        """
        batch_size, seq_len = x.shape[0], x.shape[1]

        def split_heads(projected):
            # (batch, seq, embed) -> (batch, heads, seq, head_dim)
            return projected.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)

        q = split_heads(self.query(x))
        k = split_heads(self.key(x))
        v = split_heads(self.value(x))

        # Scale by sqrt(head_dim) so score magnitude does not grow with head width
        attention = torch.matmul(q, k.transpose(-1, -2)) / math.sqrt(self.head_dim)
        if mask is not None:
            # -inf scores receive zero weight after the softmax
            attention = attention.masked_fill(mask == 0, float('-inf'))
        attention = torch.softmax(attention, dim=-1)

        out = torch.matmul(attention, v)
        # (batch, heads, seq, head_dim) -> (batch, seq, embed): concatenate the heads
        out = out.transpose(1, 2).contiguous().view(batch_size, seq_len, self.embed_size)
        return self.out(out)

# Test Multi-Head Attention
# qkv_bias is left at its default (False); no mask is passed, so attention here is unmasked
mha = MultiHeadAttention(GPT_CONFIG_124M["emb_dim"], GPT_CONFIG_124M["n_heads"])
# `pos_output` comes from the positional-encoding test cell above
mha_output = mha(pos_output)
print(f"Multi-Head Attention output shape: {mha_output.shape}")
# Self-attention must preserve the (batch, seq_length, emb_dim) shape
assert mha_output.shape == pos_output.shape, "Multi-Head Attention shape mismatch"


Multi-Head Attention output shape: torch.Size([2, 10, 768])


##### **B. LayerNorm**

In [29]:
# B. Layer Normalization (for illustration only; TransformerBlock below uses nn.LayerNorm)
class LayerNorm(nn.Module):
    """Normalize the last axis to zero mean / unit variance, then scale and shift.

    Args:
        emb_dim: Size of the last axis of the inputs.
        eps: Constant added to the variance before the square root so that a
            zero-variance row does not cause a division by zero.
    """

    def __init__(self, emb_dim, eps=1e-5):
        super().__init__()
        self.eps = eps
        # Learnable per-feature gain (starts at 1) and offset (starts at 0)
        self.scale = nn.Parameter(torch.ones(emb_dim))
        self.shift = nn.Parameter(torch.zeros(emb_dim))

    def forward(self, x):
        """Return the normalized, scaled and shifted version of `x` (same shape)."""
        # Statistics over the feature axis; keepdim=True lets them broadcast against x
        centered = x - x.mean(dim=-1, keepdim=True)
        # unbiased=False: divide by N rather than N - 1 (population variance)
        variance = x.var(dim=-1, keepdim=True, unbiased=False)

        normalized = centered / torch.sqrt(variance + self.eps)
        return self.scale * normalized + self.shift

In [23]:
# Test LayerNorm
emb_dim = GPT_CONFIG_124M["emb_dim"]
layer_norm = LayerNorm(emb_dim)

# Create a random input tensor with shape (batch_size, seq_length, emb_dim)
batch_size = 2
seq_length = 10
input_tensor = torch.randn(batch_size, seq_length, emb_dim)

# Apply LayerNorm
ln_output = layer_norm(input_tensor)

# Check output shape
print(f"LayerNorm output shape: {ln_output.shape}")
assert ln_output.shape == input_tensor.shape, "LayerNorm output shape mismatch"

# Check if the mean and variance of the output are approximately 0 and 1
output_mean = ln_output.mean(dim=-1)
output_var = ln_output.var(dim=-1, unbiased=False)

# Check mean
assert torch.allclose(output_mean, torch.zeros(batch_size, seq_length), atol=1e-6), "Mean is not close to 0"

# Check variance
# Because eps is added inside the square root, the output variance is exactly
# var / (var + eps), i.e. about eps / var (~1e-5) below 1. A tolerance of the same
# order as eps therefore fails whenever a row's sample variance happens to be small,
# which made this check flaky; 1e-4 leaves comfortable headroom.
assert torch.allclose(output_var, torch.ones(batch_size, seq_length), atol=1e-4), "Variance is not close to 1"

print("LayerNorm test passed!")

LayerNorm output shape: torch.Size([2, 10, 768])
LayerNorm test passed!


##### **C. Feed-Forward Network**

In [30]:
# C. Feed-Forward Network
class FeedForward(nn.Module):
    """Position-wise two-layer MLP: Linear -> GELU -> Linear.

    Args:
        embed_size: Width of the input and of the output.
        ff_hidden_size: Width of the hidden layer between the two projections.
    """

    def __init__(self, embed_size, ff_hidden_size):
        super().__init__()
        # Expand embed_size -> ff_hidden_size
        self.fc1 = nn.Linear(in_features=embed_size, out_features=ff_hidden_size)
        # Contract ff_hidden_size -> embed_size
        self.fc2 = nn.Linear(in_features=ff_hidden_size, out_features=embed_size)
        # Non-linearity applied between the two projections
        self.gelu = nn.GELU()

    def forward(self, x):
        """Apply the MLP to the last axis of `x`; the output shape equals the input shape."""
        hidden = self.fc1(x)
        activated = self.gelu(hidden)
        return self.fc2(activated)

# Test Feed-Forward Network
# Hidden width is 4x the embedding width; `ff_hidden_size` is reused by the
# TransformerBlock test cell further down
ff_hidden_size = GPT_CONFIG_124M["emb_dim"] * 4
# Create an instance of the FeedForward network
ff = FeedForward(GPT_CONFIG_124M["emb_dim"], ff_hidden_size)

# Feed it `mha_output` from the Multi-Head Attention test cell above
ff_output = ff(mha_output)

# Print the shape of the output after applying the FeedForward network
print(f"Feed-Forward output shape: {ff_output.shape}")

# The network projects back to emb_dim, so the shape must be unchanged
assert ff_output.shape == mha_output.shape, "Feed-Forward shape mismatch"

Feed-Forward output shape: torch.Size([2, 10, 768])


##### **A, B & C combined into a Transformer Block**

In [31]:
# Transformer Block (combines A, B and C)
class TransformerBlock(nn.Module):
    """One transformer layer: self-attention and a feed-forward network, each
    wrapped in dropout, a residual connection and a LayerNorm.

    The normalization is applied *after* each residual addition (post-LN).
    NOTE(review): the reference GPT-2 architecture normalizes before each
    sub-layer (pre-LN); confirm that post-LN is intended here.

    Args:
        embed_size: Width of the token representations.
        num_heads: Number of attention heads.
        ff_hidden_size: Hidden width of the feed-forward network.
        dropout: Dropout probability applied to each sub-layer's output.
        qkv_bias: Forwarded to MultiHeadAttention.
    """

    def __init__(self, embed_size, num_heads, ff_hidden_size, dropout=0.1, qkv_bias=False):
        super().__init__()
        # Sub-layers
        self.mha = MultiHeadAttention(embed_size, num_heads, qkv_bias)
        self.ff = FeedForward(embed_size, ff_hidden_size)
        # One LayerNorm per sub-layer: ln1 follows attention, ln2 follows the feed-forward net
        self.ln1 = nn.LayerNorm(embed_size)
        self.ln2 = nn.LayerNorm(embed_size)
        # A single dropout module shared by both residual branches
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Run the block on `x`; `mask` is passed through to the attention layer."""
        # Attention branch: dropout -> residual add -> LayerNorm
        attn_branch = self.dropout(self.mha(x, mask))
        attention_output = self.ln1(x + attn_branch)

        # Feed-forward branch: dropout -> residual add -> LayerNorm
        ff_branch = self.dropout(self.ff(attention_output))
        return self.ln2(attention_output + ff_branch)

# Test Transformer Block
# `ff_hidden_size` was defined in the Feed-Forward test cell; dropout and qkv_bias keep their defaults
transformer = TransformerBlock(GPT_CONFIG_124M["emb_dim"], GPT_CONFIG_124M["n_heads"], ff_hidden_size)

# Run the block on `pos_output` (embeddings plus positional encodings from the earlier test cell)
transformer_output = transformer(pos_output)

# Print the shape of the output after applying the transformer block
print(f"Transformer Block output shape: {transformer_output.shape}")

# A transformer block maps (batch, seq_length, emb_dim) to the same shape
assert transformer_output.shape == pos_output.shape, "Transformer Block shape mismatch"

Transformer Block output shape: torch.Size([2, 10, 768])


#### **4. GPT-2 Model**

In [32]:
# 4. GPT-2 Model
class GPT2(nn.Module):
    """Decoder-only language model: token embedding, positional encoding, a stack
    of transformer blocks and a linear projection to vocabulary logits.

    Args:
        config: Mapping with the keys "vocab_size", "context_length", "emb_dim",
            "n_heads", "n_layers", "drop_rate" and "qkv_bias".
    """

    def __init__(self, config):
        super().__init__()
        # Token IDs -> embedding vectors
        self.embedding = Embedding(config["vocab_size"], config["emb_dim"])

        # Adds position information; the table covers context_length positions
        self.positional_encoding = PositionalEncoding(config["emb_dim"], config["context_length"])

        # n_layers identical blocks, each with a feed-forward hidden width of 4 * emb_dim
        self.transformer_blocks = nn.ModuleList([
            TransformerBlock(config["emb_dim"], config["n_heads"], config["emb_dim"] * 4, config["drop_rate"], config["qkv_bias"])
            for _ in range(config["n_layers"])
        ])

        # Projects each position's hidden state to one logit per vocabulary entry
        self.fc_out = nn.Linear(config["emb_dim"], config["vocab_size"])

        # Dropout applied to the embeddings before the first block
        self.dropout = nn.Dropout(config["drop_rate"])

    def forward(self, x, mask=None):
        """Compute next-token logits for every position.

        Args:
            x: Integer token IDs of shape (batch_size, seq_length).
            mask: Optional attention mask handed to every block (entries equal
                to 0 are masked out). When omitted, a causal lower-triangular
                mask is used so that position `i` attends only to positions
                `<= i`. Pass an all-ones mask explicitly to get unmasked
                (bidirectional) attention.

        Returns:
            Logits of shape (batch_size, seq_length, vocab_size).
        """
        if mask is None:
            # An autoregressive model must not look at future tokens: without this,
            # training on next-token targets would leak the answer, and the logits
            # of earlier positions would change as tokens are appended.
            seq_len = x.size(1)
            mask = torch.tril(torch.ones(seq_len, seq_len, device=x.device))

        # Step 1: Convert input token IDs to embeddings and add positional encodings
        x = self.dropout(self.positional_encoding(self.embedding(x)))

        # Step 2: Pass the embeddings through each transformer block
        for block in self.transformer_blocks:
            x = block(x, mask)

        # Step 3: Project the final output to the vocabulary size
        return self.fc_out(x)  # Shape: (batch_size, seq_length, vocab_size)

# Test GPT-2 Model
# Build the full model from the config; `model` is reused by the generation cells below
model = GPT2(GPT_CONFIG_124M)

# Random token IDs of shape (batch_size, seq_length) = (2, 64).
# NOTE: this rebinds `input_ids`, which previously held the (2, 10) batch from the Embedding test.
input_ids = torch.randint(0, GPT_CONFIG_124M["vocab_size"], (2, 64))

# Forward pass: token IDs -> logits
output = model(input_ids)

# Print the shape of the output from the model
print(f"GPT-2 Model output shape: {output.shape}")

# One logit per vocabulary entry for every position of every sequence
assert output.shape == (2, 64, GPT_CONFIG_124M["vocab_size"]), "GPT-2 Model shape mismatch"

GPT-2 Model output shape: torch.Size([2, 64, 50257])


## Generate output using GPT-2 Model

The model is not trained yet, but let's try to generate 5 new tokens for our text with the untrained GPT-2 model. Because the weights are still random, expect the continuation to be gibberish.

In [45]:
def generate_text_simple(model, idx, max_new_tokens, context_size):
    """Greedily extend `idx` by `max_new_tokens` tokens, one model call per token.

    `idx` is a (batch, n_tokens) tensor of token indices. At every step only the
    last `context_size` tokens are shown to `model`, and the highest-probability
    vocabulary entry at the final position is appended. The result has shape
    (batch, n_tokens + max_new_tokens).
    """
    for _step in range(max_new_tokens):
        # Sliding window: if the sequence is longer than the model supports,
        # keep only its trailing `context_size` tokens
        window = idx[:, -context_size:]

        # Inference only, so the forward pass does not need gradient tracking
        with torch.no_grad():
            all_logits = model(window)

        # Only the final position predicts the next token:
        # (batch, n_tokens, vocab_size) -> (batch, vocab_size)
        last_logits = all_logits[:, -1, :]

        # Probabilities over the vocabulary, then the most likely entry per row
        next_probs = torch.softmax(last_logits, dim=-1)  # (batch, vocab_size)
        next_token = next_probs.argmax(dim=-1, keepdim=True)  # (batch, 1)

        # Grow the running sequence by one column
        idx = torch.cat([idx, next_token], dim=1)  # (batch, n_tokens+1)

    return idx

In [37]:
# Prompt to continue
start_context = "Every day is your"

# Without return_tensors, encode() returns a plain Python list of token IDs (see printed output)
encoded = tokenizer.encode(start_context)
print("encoded:", encoded)

# unsqueeze(0) adds the leading batch dimension: (n_tokens,) -> (1, n_tokens)
encoded_tensor = torch.tensor(encoded).unsqueeze(0)
print("encoded_tensor.shape:", encoded_tensor.shape)

encoded: [6109, 1110, 318, 534]
encoded_tensor.shape: torch.Size([1, 4])


In [42]:
model.eval() # switch to evaluation mode so the dropout layers are disabled during generation

# Greedily append 5 tokens to the encoded prompt; `model` and `encoded_tensor`
# come from earlier cells
out = generate_text_simple(
    model=model,
    idx=encoded_tensor, 
    max_new_tokens=5, 
    context_size=GPT_CONFIG_124M["context_length"]
)

print("Output:", out)
# out has shape (1, n_tokens), so len(out[0]) is the total token count (prompt + generated)
print("Output length:", len(out[0]))

Output: tensor([[ 6109,  1110,   318,   534, 43407, 42861, 32666, 17714, 43560]])
Output length: 9


In [43]:
# Drop the batch dimension and convert to a plain list of ints before decoding back to text
decoded_text = tokenizer.decode(out.squeeze(0).tolist())
print(decoded_text)

Every day is yourriterlatest Basinarin harb


In [51]:
def generate_text_simple(model, idx, max_new_tokens, context_size):
    """
    Autoregressive greedy decoding: repeatedly ask the model for the most likely
    next token and append it to the running sequence.

    Parameters:
    - model: Callable mapping a (batch, n_tokens) tensor of token indices to
      logits of shape (batch, n_tokens, vocab_size).
    - idx: Tensor of shape (batch, n_tokens) holding the starting context.
    - max_new_tokens: How many tokens to append.
    - context_size: The model only sees the last `context_size` tokens at each step.

    Returns:
    - Tensor of shape (batch, n_tokens + max_new_tokens): the original context
      followed by the generated tokens.
    """

    def _greedy_next(tokens):
        # Predict one token per batch row from the trailing context window
        with torch.no_grad():  # inference only, no gradients needed
            logits = model(tokens[:, -context_size:])  # (batch, n_tokens, vocab_size)
        # Distribution at the final position only
        probas = torch.softmax(logits[:, -1, :], dim=-1)  # (batch, vocab_size)
        # Highest-probability vocabulary entry, kept as a column for concatenation
        return torch.argmax(probas, dim=-1, keepdim=True)  # (batch, 1)

    for _ in range(max_new_tokens):
        # Append the prediction so it becomes part of the next step's context
        idx = torch.cat((idx, _greedy_next(idx)), dim=1)  # (batch, n_tokens + 1)

    return idx

# End-to-end generation demo: encode the prompt, generate, decode.
# These are the same steps as the three preceding cells, gathered in one place;
# `tokenizer`, `model` and GPT_CONFIG_124M come from earlier cells.

# Initial context for text generation
start_context = "Every day is your"

# Step 1: Encode the initial context into a list of token indices
encoded = tokenizer.encode(start_context)
print("Encoded:", encoded)

# Step 2: Convert the encoded list into a tensor and add a batch dimension
encoded_tensor = torch.tensor(encoded).unsqueeze(0)  # Shape: (1, n_tokens)
print("Encoded tensor shape:", encoded_tensor.shape)

# Set the model to evaluation mode to disable dropout
model.eval()

# Step 3: Greedily generate 5 new tokens from the initial context
out = generate_text_simple(
    model=model,
    idx=encoded_tensor, 
    max_new_tokens=5, 
    context_size=GPT_CONFIG_124M["context_length"]
)

# Step 4: Print the output tensor and its total length (prompt + generated tokens)
print("Output:", out)
print("Output length:", len(out[0]))

# Step 5: Drop the batch dimension and decode the token indices back into text
decoded_text = tokenizer.decode(out.squeeze(0).tolist())
print("Decoded text:", decoded_text)

Encoded: [6109, 1110, 318, 534]
Encoded tensor shape: torch.Size([1, 4])
Output: tensor([[ 6109,  1110,   318,   534, 43407, 42861, 32666, 17714, 43560]])
Output length: 9
Decoded text: Every day is yourriterlatest Basinarin harb
