In [None]:
# In[1]:
# =============================================================================
# Setup and Imports
# =============================================================================
import torch
import torch.nn as nn
from torch.nn import functional as F
import tiktoken
import numpy as np
import os
from torch.utils.data import Dataset, DataLoader
from transformers import GPT2LMHeadModel # For loading pretrained weights

print(f"PyTorch version: {torch.__version__}")

# Seed PyTorch's global RNG up front so that weight initialisation, dropout,
# DataLoader shuffling and multinomial sampling start from a known state on
# every "Restart Kernel -> Run All".
SEED = 42
torch.manual_seed(SEED)

# Set device: use the GPU when CUDA is available, otherwise fall back to CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")

PyTorch version: 2.6.0+cu124
Using device: cuda


In [None]:
# In[2]:
# =============================================================================
# Part 1: Core Architecture Implementation
# =============================================================================

# -----------------------------------------------------------------------------
# a) Implement the MultiHeadAttention class
# -----------------------------------------------------------------------------
class MultiHeadAttention(nn.Module):
    """
    Implements Multi-Head Causal Self-Attention as described in the GPT-2 paper.

    Args:
        config: mapping that provides "emb_dim", "n_heads", "context_length",
            "drop_rate" and "qkv_bias". "emb_dim" must be divisible by "n_heads".

    Input / output of ``forward``: a tensor of shape (B, T, emb_dim) with
    T <= config["context_length"]; the result has the same shape.
    """
    def __init__(self, config):
        super().__init__()
        assert config["emb_dim"] % config["n_heads"] == 0
        self.emb_dim = config["emb_dim"]
        self.n_heads = config["n_heads"]
        self.head_dim = self.emb_dim // self.n_heads

        # Combined key, query, value projections
        self.c_attn = nn.Linear(self.emb_dim, 3 * self.emb_dim, bias=config["qkv_bias"])
        # Output projection (note: its bias is also controlled by "qkv_bias")
        self.c_proj = nn.Linear(self.emb_dim, self.emb_dim, bias=config["qkv_bias"])
        # Regularization
        self.attn_dropout = nn.Dropout(config["drop_rate"])
        self.resid_dropout = nn.Dropout(config["drop_rate"])

        # Causal mask to prevent attending to future tokens
        # Using register_buffer so it's part of the model's state but not a parameter
        self.register_buffer("mask", torch.tril(torch.ones(config["context_length"], config["context_length"]))
                                     .view(1, 1, config["context_length"], config["context_length"]))

    def forward(self, x):
        B, T, C = x.size() # Batch size, sequence length, embedding dimensionality (C)

        # 1. Calculate query, key, values for all heads in batch and move head forward
        q, k, v  = self.c_attn(x).split(self.emb_dim, dim=2)
        k = k.view(B, T, self.n_heads, self.head_dim).transpose(1, 2) # (B, nh, T, hs)
        q = q.view(B, T, self.n_heads, self.head_dim).transpose(1, 2) # (B, nh, T, hs)
        v = v.view(B, T, self.n_heads, self.head_dim).transpose(1, 2) # (B, nh, T, hs)

        # 2. Causal self-attention
        # (B, nh, T, hs) @ (B, nh, hs, T) -> (B, nh, T, T)
        att = (q @ k.transpose(-2, -1)) * (1.0 / np.sqrt(k.size(-1)))
        # Apply causal mask: positions above the diagonal (future tokens) get -inf
        # so that softmax assigns them zero weight
        att = att.masked_fill(self.mask[:,:,:T,:T] == 0, float('-inf'))
        # Apply softmax
        att = F.softmax(att, dim=-1)
        att = self.attn_dropout(att)

        # 3. Get weighted values
        y = att @ v # (B, nh, T, T) @ (B, nh, T, hs) -> (B, nh, T, hs)

        # 4. Re-assemble heads
        y = y.transpose(1, 2).contiguous().view(B, T, C) # (B, T, C)

        # 5. Output projection
        y = self.resid_dropout(self.c_proj(y))
        return y

# -----------------------------------------------------------------------------
# b) Implement the TransformerBlock class
# -----------------------------------------------------------------------------
class TransformerBlock(nn.Module):
    """
    One transformer layer: LayerNorm -> causal multi-head attention and
    LayerNorm -> position-wise MLP, each wrapped in a residual connection.
    """
    def __init__(self, config):
        super().__init__()
        emb_dim = config["emb_dim"]
        hidden_dim = 4 * emb_dim  # the MLP expands to four times the embedding width

        self.ln_1 = nn.LayerNorm(emb_dim)
        self.attn = MultiHeadAttention(config)
        self.ln_2 = nn.LayerNorm(emb_dim)
        self.mlp = nn.Sequential(
            nn.Linear(emb_dim, hidden_dim),
            nn.GELU(),
            nn.Linear(hidden_dim, emb_dim),
            nn.Dropout(config["drop_rate"]),
        )

    def forward(self, x):
        # Attention sub-layer: normalise first, then add the result back onto the input
        attn_out = self.attn(self.ln_1(x))
        x = x + attn_out
        # Feed-forward sub-layer, same normalise-then-add pattern
        mlp_out = self.mlp(self.ln_2(x))
        return x + mlp_out

# -----------------------------------------------------------------------------
# c) Implement the GPTModel class
# -----------------------------------------------------------------------------
class GPTModel(nn.Module):
    """
    GPT-2 style decoder-only language model.

    Token and learned positional embeddings feed a stack of TransformerBlocks,
    followed by a final LayerNorm and a bias-free linear head whose weight is
    shared with the token embedding.
    """
    def __init__(self, config):
        super().__init__()
        self.config = config
        emb_dim = config["emb_dim"]
        vocab_size = config["vocab_size"]

        self.transformer = nn.ModuleDict({
            # Token and positional embeddings
            "wte": nn.Embedding(vocab_size, emb_dim),
            "wpe": nn.Embedding(config["context_length"], emb_dim),
            "drop": nn.Dropout(config["drop_rate"]),
            # Stack of transformer blocks
            "h": nn.ModuleList([TransformerBlock(config) for _ in range(config["n_layers"])]),
            # Final layer norm
            "ln_f": nn.LayerNorm(emb_dim),
        })
        # Language model head
        self.lm_head = nn.Linear(emb_dim, vocab_size, bias=False)
        # Weight tying: the embedding table now points at the head's weight tensor
        self.transformer.wte.weight = self.lm_head.weight

    def forward(self, idx, targets=None):
        """
        Args:
            idx: (B, T) tensor of token ids, T <= config["context_length"].
            targets: optional (B, T) tensor of target ids; entries equal to -1
                are ignored by the loss.

        Returns:
            (logits, loss): logits has shape (B, T, vocab_size); loss is None
            when no targets are given.
        """
        _, T = idx.size()
        assert T <= self.config["context_length"], f"Cannot forward sequence of length {T}, max is {self.config['context_length']}"

        # Embed tokens and their positions, then apply embedding dropout
        positions = torch.arange(0, T, dtype=torch.long, device=idx.device).unsqueeze(0) # shape (1, T)
        x = self.transformer.wte(idx) + self.transformer.wpe(positions) # (B, T, C) + (1, T, C)
        x = self.transformer.drop(x)

        # Run the block stack
        for block in self.transformer.h:
            x = block(x)

        # Final normalisation and projection onto the vocabulary
        logits = self.lm_head(self.transformer.ln_f(x)) # (B, T, vocab_size)

        if targets is None:
            return logits, None

        # Flatten batch and time so cross-entropy sees one row per token
        loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1), ignore_index=-1)
        return logits, loss

# Confirmation message printed once the definitions in this cell have executed.
print("Part 1: Core architecture implemented.")

Part 1: Core architecture implemented.


In [None]:
# In[3]:
# =============================================================================
# Part 2: Prepare for Training
# =============================================================================

# -----------------------------------------------------------------------------
# a) Instantiate your model using the GPT_CONFIG_124M
# -----------------------------------------------------------------------------
# Hyper-parameters consumed by GPTModel / TransformerBlock / MultiHeadAttention
GPT_CONFIG_124M = dict(
    vocab_size=50257,      # Vocabulary size for GPT-2
    context_length=256,    # Context window size (reduced from 1024 for manageability)
    emb_dim=768,           # Embedding dimension
    n_heads=12,            # Number of attention heads
    n_layers=12,           # Number of transformer layers
    drop_rate=0.1,         # Dropout rate
    qkv_bias=True,         # Use bias in QKV projections (standard for GPT-2)
)

# Build the model and move it to the selected device in one step
# (nn.Module.to returns the module itself).
model_from_scratch = GPTModel(GPT_CONFIG_124M).to(device)
print(f"Model instantiated with {sum(p.numel() for p in model_from_scratch.parameters())/1e6:.2f}M parameters.")

# -----------------------------------------------------------------------------
# b) Choose a small corpus and tokenize it
# -----------------------------------------------------------------------------
# For this demo, we'll use a small snippet of Shakespeare's "The Tempest"
# to keep training fast and demonstrate the model learns the style.
# The literal starts and ends with a newline; those characters are part of the
# corpus and are tokenized along with the dialogue.
text_corpus = """
PROSPERO:
If thou more murmur'st, I will rend an oak
And peg thee in his knotty entrails till
Thou hast howl'd away twelve winters.

ARIEL:
Pardon, master;
I will be correspondent to command
And do my spriting gently.

PROSPERO:
Do so, and after two days
I will discharge thee.

ARIEL:
That's my noble master!
What shall I do? say what; what shall I do?

PROSPERO:
Go make thyself like a nymph o' the sea: be subject
To no sight but thine and mine, invisible
To every eyeball else. Go take this shape
And hither come in't: go, hence with diligence!
"""

# Initialize tokenizer (tiktoken's "gpt2" encoding) and encode the whole corpus
# into one flat sequence of token ids.
tokenizer = tiktoken.get_encoding("gpt2")
tokenized_text = tokenizer.encode(text_corpus)
# Report the corpus size in characters and in tokens.
print(f"Corpus length: {len(text_corpus)} characters, {len(tokenized_text)} tokens.")

# Create a simple dataset
class TextDataset(Dataset):
    """
    Sliding-window next-token dataset over a flat token sequence.

    Sample ``i`` is the pair ``(x, y)`` where ``x = tokens[i : i + context_length]``
    and ``y`` is the same window shifted one position to the right, both returned
    as ``torch.long`` tensors of length ``context_length``.

    Args:
        tokens: sequence of integer token ids (supports ``len`` and slicing).
        context_length: number of tokens in each input window.
    """
    def __init__(self, tokens, context_length):
        self.tokens = tokens
        self.context_length = context_length

    def __len__(self):
        # Every sample needs context_length + 1 tokens. Clamp at zero so a corpus
        # shorter than one window yields an empty dataset instead of a negative
        # length (which makes len() raise).
        return max(0, len(self.tokens) - self.context_length)

    def __getitem__(self, idx):
        num_samples = len(self)
        # Support Python-style negative indexing.
        if idx < 0:
            idx += num_samples
        # Raising IndexError keeps every sample full-length and lets plain
        # iteration over the dataset terminate.
        if not 0 <= idx < num_samples:
            raise IndexError(f"TextDataset index out of range for dataset of size {num_samples}")

        # Grab a chunk of tokens: one more than the window so targets can be shifted
        chunk = self.tokens[idx:idx + self.context_length + 1]
        x = torch.tensor(chunk[:-1], dtype=torch.long)
        y = torch.tensor(chunk[1:], dtype=torch.long)
        return x, y

# Build the training DataLoader
batch_size = 4
context_length = 64  # Training window; smaller than the model's maximum context to speed things up
train_dataset = TextDataset(tokens=tokenized_text, context_length=context_length)
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)

print(f"DataLoader created with {len(train_loader)} batches.")


# -----------------------------------------------------------------------------
# c) Train your GPT model on the dataset
# -----------------------------------------------------------------------------
print("\nStarting training...")
epochs = 100 # Many passes are affordable because the dataset is tiny
log_interval = 10
optimizer = torch.optim.AdamW(model_from_scratch.parameters(), lr=1e-4)

# Switch to training mode so dropout is active
model_from_scratch.train()
for epoch in range(epochs):
    total_loss = 0
    for i, (x, y) in enumerate(train_loader):
        x = x.to(device)
        y = y.to(device)

        optimizer.zero_grad()
        logits, loss = model_from_scratch(x, y)
        if loss is None:
            # The model returned no loss, so there is nothing to back-propagate
            continue

        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    # Report the mean per-batch loss only every `log_interval` epochs
    if (epoch + 1) % log_interval != 0:
        continue
    avg_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch+1}/{epochs} | Average Loss: {avg_loss:.4f}")

print("Training finished.")

# Persist the trained weights (state_dict only) under ./trained_gpt
output_dir = "./trained_gpt"
os.makedirs(output_dir, exist_ok=True)
torch.save(model_from_scratch.state_dict(), os.path.join(output_dir, "custom_gpt_model.pth"))
print(f"Model weights saved to {output_dir}/custom_gpt_model.pth")

Model instantiated with 123.85M parameters.
Corpus length: 550 characters, 176 tokens.
DataLoader created with 28 batches.

Starting training...
Epoch 10/100 | Average Loss: 4.0904
Epoch 20/100 | Average Loss: 4.0252
Epoch 30/100 | Average Loss: 4.0154
Epoch 40/100 | Average Loss: 4.0202
Epoch 50/100 | Average Loss: 3.9873
Epoch 60/100 | Average Loss: 3.8683
Epoch 70/100 | Average Loss: 2.9654
Epoch 80/100 | Average Loss: 1.1839
Epoch 90/100 | Average Loss: 0.6015
Epoch 100/100 | Average Loss: 0.4516
Training finished.
Model weights saved to ./trained_gpt/custom_gpt_model.pth


In [None]:
# In[4]:
# =============================================================================
# Part 3: Inference and Demonstration
# =============================================================================

# -----------------------------------------------------------------------------
# a) Implement the generation function
# -----------------------------------------------------------------------------
def generate(model, input_ids, max_new_tokens=20, temperature=1.0, top_k=None):
    """
    Generates text by iteratively predicting the next token.

    Args:
        model: callable module returning ``(logits, loss)`` for a (B, T) batch of
            token ids and exposing ``model.config["context_length"]``.
        input_ids: (B, T) tensor of prompt token ids with T >= 1.
        max_new_tokens: number of tokens to append to each sequence.
        temperature: divides the logits before sampling. ``0`` selects greedy
            decoding (argmax) instead of sampling.
        top_k: if given, sampling is restricted to the ``top_k`` highest logits
            (ties with the k-th logit are kept).

    Returns:
        (B, T + max_new_tokens) tensor holding the prompt followed by the
        generated tokens. The input tensor itself is not modified.

    Raises:
        ValueError: for a negative temperature, ``top_k < 1`` or an empty /
            non-2D prompt.

    Note:
        The model is switched to eval mode and left in that mode on return.
    """
    if temperature < 0:
        raise ValueError(f"temperature must be >= 0, got {temperature}")
    if top_k is not None and top_k < 1:
        raise ValueError(f"top_k must be a positive integer or None, got {top_k}")
    if input_ids.dim() != 2 or input_ids.size(1) == 0:
        raise ValueError("input_ids must have shape (batch, seq_len) with seq_len >= 1")

    model.eval()
    context_length = model.config["context_length"]
    # Dividing by a zero temperature would produce inf/nan logits, so treat it
    # as a request for deterministic argmax decoding.
    greedy = temperature == 0

    with torch.no_grad():
        for _ in range(max_new_tokens):
            # Crop context if it exceeds the model's supported length
            idx_cond = input_ids[:, -context_length:]

            # Get logits for the next token and keep only the last time step
            logits, _ = model(idx_cond)
            logits = logits[:, -1, :]

            if greedy:
                next_token_id = torch.argmax(logits, dim=-1, keepdim=True)
            else:
                logits = logits / temperature

                # Optional: Top-k sampling - mask everything below the k-th largest logit
                if top_k is not None:
                    v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
                    logits = logits.masked_fill(logits < v[:, [-1]], float("-inf"))

                # Apply softmax to get probabilities, then sample from the distribution
                probs = F.softmax(logits, dim=-1)
                next_token_id = torch.multinomial(probs, num_samples=1)

            # Append the new token to the sequence
            input_ids = torch.cat((input_ids, next_token_id), dim=1)

    return input_ids

# -----------------------------------------------------------------------------
# b) Demonstrate your model by generating completions
# -----------------------------------------------------------------------------
print("\n--- Generating with Custom-Trained Model ---")
prompts = ["PROSPERO:", "ARIEL:\nPardon, master;"]

for prompt_text in prompts:
    # Encode the prompt and wrap it in a batch of size one: shape (1, T)
    input_ids = tokenizer.encode(prompt_text, allowed_special={'<|endoftext|>'})
    input_tensor = torch.tensor([input_ids], dtype=torch.long, device=device)

    print(f"\nPrompt: '{prompt_text}'")

    # Sample a continuation from the trained model
    output_tensor = generate(
        model_from_scratch,
        input_tensor,
        max_new_tokens=30,
        temperature=0.8,
        top_k=10,
    )

    # Turn the single generated sequence back into text and show it
    generated_text = tokenizer.decode(output_tensor[0].tolist())
    print("Generated Completion:")
    print(generated_text)
    print("-" * 30)

# -----------------------------------------------------------------------------
# c) Save the trained model and configuration
# -----------------------------------------------------------------------------
def save_model(model, config, file_path):
    """
    Saves model state_dict and config together.

    The checkpoint is a dict with the keys "config" and "state_dict", written
    with ``torch.save``.

    Args:
        model: module whose ``state_dict()`` is stored.
        config: configuration object stored alongside the weights.
        file_path: destination. When it is a string or path-like, missing parent
            directories are created first; anything else is passed to
            ``torch.save`` unchanged.
    """
    # torch.save does not create directories, so make sure the target folder exists.
    if isinstance(file_path, (str, os.PathLike)):
        parent_dir = os.path.dirname(os.fspath(file_path))
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

    state = {
        "config": config,
        "state_dict": model.state_dict()
    }
    torch.save(state, file_path)
    print(f"Model and config saved to {file_path}")

def load_model(file_path):
    """
    Loads model and config, then instantiates the model.

    Expects a checkpoint dict with the keys "config" and "state_dict" (the
    layout written by ``save_model``). A new ``GPTModel`` is built from the
    stored config, the weights are loaded into it, and it is moved to the
    module-level ``device``.

    Args:
        file_path: path of the checkpoint to read.

    Returns:
        (model, config): the restored GPTModel and the config it was built from.
    """
    # weights_only=True restricts unpickling to tensors and plain containers, so a
    # tampered checkpoint cannot execute arbitrary code. It is already the default
    # in recent PyTorch releases; passing it explicitly gives the same behaviour on
    # older versions too.
    state = torch.load(file_path, map_location=device, weights_only=True)
    config = state["config"]
    model = GPTModel(config)
    model.load_state_dict(state["state_dict"])
    model.to(device)
    print(f"Model loaded from {file_path}")
    return model, config

# Round-trip check: write weights and config to one file, then rebuild a model from it
save_path = os.path.join(output_dir, "full_model_package.pth")
save_model(model=model_from_scratch, config=GPT_CONFIG_124M, file_path=save_path)
loaded_model, loaded_config = load_model(file_path=save_path)


--- Generating with Custom-Trained Model ---

Prompt: 'PROSPERO:'
Generated Completion:
PROSPERO:
Do shall I do? say what; what shall I do?

PROSPERO:
Go make thyself like a nymph
------------------------------

Prompt: 'ARIEL:
Pardon, master;'
Generated Completion:
ARIEL:
Pardon, master;

ARIEL:
That's my noble master!
What shall I do? say what; what shall I do?

PROS
------------------------------
Model and config saved to ./trained_gpt/full_model_package.pth
Model loaded from ./trained_gpt/full_model_package.pth
