<a href="https://colab.research.google.com/github/oscarB1nar10/NLP/blob/main/gpt_from_scratch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#### Mini GPT (character-level) implemented from scratch in PyTorch

In [None]:
import math
import argparse
from dataclasses import dataclass

import torch
import torch.nn as nn
from torch.nn import functional as F

In [None]:
# --------
# Config dataclass
# --------
@dataclass
class GPTConfig:
  """Hyperparameters describing the size and regularization of the mini GPT.

  Attributes:
    vocab_size: Number of distinct tokens (required, no default).
    n_embd: Model width, i.e. the length of each token embedding vector.
    n_head: Number of attention heads per transformer block.
    n_layer: Number of transformer blocks.
    sequence_length: Maximum context length (tokens per input sequence).
    dropout: Dropout probability handed to the transformer blocks.
  """

  vocab_size: int
  n_embd: int = 256
  n_head: int = 8
  n_layer: int = 6
  sequence_length: int = 256
  dropout: float = 0.1


# -------------
# Tokenizer (char-level)
# ---------------
class CharTokenizer:
  """Character-level tokenizer whose vocabulary is the set of characters of a text.

  Each distinct character of the text passed to the constructor receives an
  integer id; ids are assigned in sorted character order.

  Attributes:
    stoi: Mapping character -> id.
    itos: Mapping id -> character.
    vocab_size: Number of distinct characters.
  """

  def __init__(self, text: str):
    """Build the vocabulary from the distinct characters of ``text``."""
    # sorted() accepts any iterable, so no intermediate list is needed.
    chars = sorted(set(text))
    self.stoi = {ch: i for i, ch in enumerate(chars)}
    self.itos = dict(enumerate(chars))
    self.vocab_size = len(chars)

  def encode(self, s: str):
    """Return the list of ids for the characters of ``s``.

    Raises:
      KeyError: If ``s`` contains a character that was not present in the
        text used to build the vocabulary.
    """
    try:
      return [self.stoi[ch] for ch in s]
    except KeyError as err:
      # Same exception type as a plain dict lookup, but with a message that
      # says what went wrong instead of just echoing the character.
      raise KeyError(
          f"character {err.args[0]!r} is not in the tokenizer vocabulary"
      ) from None

  def decode(self, ids):
    """Return the string obtained by concatenating the characters of ``ids``."""
    return ''.join(self.itos[i] for i in ids)

In [None]:
# Example of use: round-trip a short string through the tokenizer
ct = CharTokenizer("hello")
encoding = ct.encode("hello")
decoding = ct.decode(encoding)

# Show the ids and their container type, then the decoded type and text
print(encoding)
print(type(encoding))
print(type(decoding))
print(decoding)

[1, 0, 2, 2, 3]
<class 'list'>
<class 'str'>
hello


In [None]:
# -------------
# Dataset utilities
# -------------
class TextDataset(torch.utils.data.Dataset):
  """Sliding-window dataset of (input, target) pairs for next-token prediction.

  Sample ``idx`` is built from the chunk ``data[idx : idx + sequence_length + 1]``:
  the input is the chunk without its last element and the target is the chunk
  without its first element, i.e. the input shifted by one position.

  Example:
    data = [0, 1, 2, 3, 4, 5, 6], sequence_length = 4
    len(dataset) = 7 - 4 = 3
    idx = 0 -> chunk [0, 1, 2, 3, 4] -> x = [0, 1, 2, 3], y = [1, 2, 3, 4]
    idx = 1 -> chunk [1, 2, 3, 4, 5]
    idx = 2 -> chunk [2, 3, 4, 5, 6]
    idx = 3 would need element 7, which does not exist, so it is out of range.
  """

  def __init__(self, data: torch.Tensor, sequence_length: int):
    """Store the token tensor and the window length."""
    self.data = data
    self.sequence_length = sequence_length

  def __len__(self):
    """Return the number of full-length windows (never negative)."""
    # Every sample needs sequence_length + 1 tokens. When the data is shorter
    # than that there are no samples; a negative length would be invalid.
    return max(0, len(self.data) - self.sequence_length)

  def __getitem__(self, idx):
    """Return the ``(x, y)`` pair for window ``idx``.

    Negative indices count from the end, as with Python sequences.

    Raises:
      IndexError: If ``idx`` is outside ``[-len(self), len(self))``. Raising
        here is also what terminates plain ``for x, y in dataset`` iteration.
    """
    n = len(self)
    if idx < 0:
      idx = idx + n
    if not 0 <= idx < n:
      raise IndexError(f"index {idx} is out of range for dataset of length {n}")
    chunk = self.data[idx: idx + self.sequence_length + 1]
    x = chunk[:-1]
    y = chunk[1:]
    return x, y

In [None]:
# Example of use
# A toy "corpus": the integers 0..19 stand in for token IDs
data = torch.arange(20)
sequence_length = 4

dataset = TextDataset(data, sequence_length)

# Expected length: len(data) - sequence_length
print("Dataset length: ", len(dataset))

# Show the first four (input, target) pairs; y is x shifted by one position
for i in range(4):
  x, y = dataset[i]
  print(f"Sample {i}:", f"x: {x}", f"y: {y}", sep="\n")

Dataset length:  16
Sample 0:
x: tensor([0, 1, 2, 3])
y: tensor([1, 2, 3, 4])
Sample 1:
x: tensor([1, 2, 3, 4])
y: tensor([2, 3, 4, 5])
Sample 2:
x: tensor([2, 3, 4, 5])
y: tensor([3, 4, 5, 6])
Sample 3:
x: tensor([3, 4, 5, 6])
y: tensor([4, 5, 6, 7])


In [None]:
# ------------
# Model components
# -------------
class LayerNorm(nn.Module):
  """Layer normalization over the last dimension with learnable scale and shift.

  Each vector along the last axis is standardized to zero mean and unit
  (biased) variance, then multiplied by ``gamma`` and offset by ``beta``.
  """

  def __init__(self, n_embd, eps=1e-5):
    """Create the scale/shift parameters for vectors of length ``n_embd``.

    Args:
      n_embd: Size of the last dimension of the inputs.
      eps: Small constant added to the variance to avoid division by zero.
    """
    super().__init__()
    self.eps = eps
    # gamma starts at 1 (no rescaling) and scales the normalized output.
    self.gamma = nn.Parameter(torch.ones(n_embd))
    # beta starts at 0 (no shift) and lets the model move the mean wherever
    # it needs.
    self.beta = nn.Parameter(torch.zeros(n_embd))

  def forward(self, x):
    """Normalize ``x`` along its last dimension and apply gamma / beta."""
    mu = x.mean(dim=-1, keepdim=True)
    sigma2 = x.var(dim=-1, unbiased=False, keepdim=True)
    std = torch.sqrt(sigma2 + self.eps)
    normalized = (x - mu) / std
    return self.gamma * normalized + self.beta

In [None]:
# Example of use
# Normalize a hand-written batch so the result is easy to check by eye.
n_embd = 4
sequence_length = 3

# Batch of 2 sequences, each with 3 tokens of 4 features: shape (2, 3, 4).
# Two of the rows are constant on purpose.
x = torch.tensor([
    [[1.0, 2.0, 3.0, 4.0],
     [2.0, 2.0, 2.0, 2.0],
     [0.0, -1.0, 1.0, 2.0]],
    [[3.0, 3.0, 3.0, 3.0],
     [4.0, 5.0, 6.0, 7.0],
     [1.0, 2.0, 3.0, 4.0]]
])

ln = LayerNorm(n_embd)

# Apply normalization (each 4-feature row is normalized independently)
out = ln(x)

print("Input:\n", x)
print("\nAfter LayerNorm:\n", out)

Input:
 tensor([[[ 1.,  2.,  3.,  4.],
         [ 2.,  2.,  2.,  2.],
         [ 0., -1.,  1.,  2.]],

        [[ 3.,  3.,  3.,  3.],
         [ 4.,  5.,  6.,  7.],
         [ 1.,  2.,  3.,  4.]]])

After LayerNorm:
 tensor([[[-1.3416, -0.4472,  0.4472,  1.3416],
         [ 0.0000,  0.0000,  0.0000,  0.0000],
         [-0.4472, -1.3416,  0.4472,  1.3416]],

        [[ 0.0000,  0.0000,  0.0000,  0.0000],
         [-1.3416, -0.4472,  0.4472,  1.3416],
         [-1.3416, -0.4472,  0.4472,  1.3416]]], grad_fn=<AddBackward0>)


In [None]:
class Head(nn.Module):
  """A single head of causal (masked) scaled dot-product self-attention."""

  def __init__(self, n_embd, head_size, sequence_length, dropout):
    """Create the projections, the causal mask and the dropout layer.

    Args:
      n_embd: Embedding size of the incoming token vectors.
      head_size: Size of the subspace this head works in
        (MultiHeadAttention uses n_embd // n_head).
      sequence_length: Largest number of tokens the mask must cover.
      dropout: Dropout probability applied to the attention weights.
    """
    super().__init__()
    # Bias-free linear maps that project every token vector from n_embd
    # features down to head_size features.
    self.key = nn.Linear(n_embd, head_size, bias=False)
    self.query = nn.Linear(n_embd, head_size, bias=False)
    self.value = nn.Linear(n_embd, head_size, bias=False)

    # Lower-triangular matrix of ones (diagonal included), e.g. for 4 tokens:
    #   [[1, 0, 0, 0],
    #    [1, 1, 0, 0],
    #    [1, 1, 1, 0],
    #    [1, 1, 1, 1]]
    # Registered as a buffer: it is part of the module state (and therefore of
    # state_dict()) but it is not a parameter, so it receives no gradients.
    causal_mask = torch.ones(sequence_length, sequence_length).tril()
    self.register_buffer('tril', causal_mask)
    self.dropout = nn.Dropout(dropout)

  def forward(self, x):
    """Return the attention output for ``x``.

    Args:
      x: Tensor of shape (B, T, C) with B = batch size, T = tokens per
        sequence and C = embedding size.

    Returns:
      Tensor of shape (B, T, head_size).
    """
    _, n_tokens, _ = x.shape

    # Project x three ways; each result has shape (B, T, head_size).
    queries = self.query(x)
    keys = self.key(x)
    values = self.value(x)

    # Attention scores: (B, T, head_size) @ (B, head_size, T) -> (B, T, T),
    # scaled by 1 / sqrt(head_size).
    scale = 1.0 / math.sqrt(keys.size(-1))
    scores = queries @ keys.transpose(-2, -1) * scale

    # Entries of the mask equal to 0 are future positions (upper triangle).
    # Setting their score to -inf gives them zero weight after the softmax:
    #   [[a, -inf, -inf],
    #    [b,    c, -inf],
    #    [d,    e,    f]]
    future = self.tril[:n_tokens, :n_tokens] == 0
    scores = scores.masked_fill(future, float('-inf'))

    # Row-wise softmax turns scores into weights that say how much each token
    # attends to itself and to every earlier token.
    weights = self.dropout(F.softmax(scores, dim=-1))

    # Weighted average of the value vectors:
    # (B, T, T) @ (B, T, head_size) -> (B, T, head_size).
    return weights @ values

In [None]:
# Example of use
# Run one attention head on random data and inspect the output shape.
n_embd = 16
head_size = 8
sequence_length = 4
dropout = 0.0  # no dropout, so the attention weights are used unchanged

# Create the attention head
head = Head(n_embd, head_size, sequence_length, dropout)

# Dummy input: batch of 2 sequences of 4 tokens, each with 16-dim embeddings
x = torch.randn(2, sequence_length, n_embd)

# Forward pass: the last dimension shrinks from n_embd to head_size
out = head(x)

print("Input shape: ", x.shape)
print("Output shape: ", out.shape)
print("Output tensor:\n", out)

Input shape:  torch.Size([2, 4, 16])
Output shape:  torch.Size([2, 4, 8])
Output tensor:
 tensor([[[ 1.6061, -0.0086, -0.0920,  0.4729, -0.4567, -0.3594,  0.5369,
          -0.3031],
         [-0.4740, -0.1296,  0.2600,  0.1935,  0.5968,  0.4383, -0.3777,
           0.9747],
         [ 0.8660, -0.3656, -0.0132, -0.1348, -0.2070, -0.0839,  0.2118,
          -0.1921],
         [ 0.3591, -0.3880,  0.1927, -0.1519, -0.1711, -0.0327,  0.1570,
          -0.1644]],

        [[ 0.1638,  0.3166, -1.1672, -1.5984,  0.0323,  0.3819, -0.7390,
           0.7006],
         [-0.0830,  0.6119, -0.2642, -0.8969,  0.2198,  0.2931, -0.4022,
           1.1160],
         [ 0.0759,  0.7122, -0.4415, -0.5050,  0.0148,  0.2933, -0.4520,
           0.7690],
         [-0.1249,  0.5125,  0.2879, -0.2315,  0.3353,  0.3216, -0.1415,
           0.6094]]], grad_fn=<UnsafeViewBackward0>)


In [None]:
class MultiHeadAttention(nn.Module):
  """Several causal self-attention heads run side by side, then fused.

  Each head works in a subspace of size ``n_embd // n_head``. The head outputs
  are concatenated back to ``n_embd`` features and passed through a linear
  projection and dropout.
  """

  def __init__(self, n_embd, n_head, sequence_length, dropout):
    """Create the heads, the output projection and the dropout layer.

    Args:
      n_embd: Embedding size of the token vectors.
      n_head: Number of attention heads; must be positive and divide n_embd.
      sequence_length: Maximum number of tokens per sequence.
      dropout: Dropout probability (used inside the heads and on the output).

    Raises:
      ValueError: If ``n_head`` is not positive or does not divide ``n_embd``.
    """
    super().__init__()
    # The concatenated heads must add up to exactly n_embd features, otherwise
    # the projection below fails with a shape mismatch on the first forward
    # pass. Fail early with a clear message instead.
    if n_head <= 0 or n_embd % n_head != 0:
      raise ValueError(
          f"n_embd ({n_embd}) must be divisible by a positive n_head ({n_head})"
      )
    # Each head works on an equal share of the embedding width.
    head_size = n_embd // n_head

    # nn.ModuleList behaves like a Python list but registers the heads as
    # submodules, so their parameters are included in training.
    self.heads = nn.ModuleList([
        Head(n_embd, head_size, sequence_length, dropout) for _ in range(n_head)
    ])
    # Linear projection applied after concatenating all heads; it lets the
    # model learn how to fuse the information coming from the different heads.
    self.proj = nn.Linear(n_embd, n_embd)
    self.dropout = nn.Dropout(dropout)

  def forward(self, x):
    """Apply every head to ``x`` and fuse the results.

    Args:
      x: Tensor of shape (B, T, n_embd).

    Returns:
      Tensor of shape (B, T, n_embd).
    """
    # Each head returns (B, T, head_size); concatenating n_head of them along
    # the last dimension restores the full width n_head * head_size = n_embd.
    out = torch.cat([h(x) for h in self.heads], dim=-1)
    out = self.proj(out)
    out = self.dropout(out)
    return out

In [None]:
# Example of use
# Multi-head attention keeps the input shape: (B, T, n_embd) in and out.
n_embd = 64 # Embedding dimension
n_head = 8 # Number of attention heads
sequence_length = 16 # Context window
dropout = 0.1

# Create a random batch of embeddings (e.g., batch of 2 sequences of 16 tokens each)
x = torch.randn(2, sequence_length, n_embd)

mha = MultiHeadAttention(n_embd, n_head, sequence_length, dropout)
# Run forward pass
out = mha(x)

# Both should match
print("Input shape:", x.shape)
print("Output shape:", out.shape)


Input shape: torch.Size([2, 16, 64])
Output shape: torch.Size([2, 16, 64])


In [None]:
class FeedForward(nn.Module):
  """Position-wise feed-forward network: expand, GELU, project back, dropout."""

  def __init__(self, n_embd, dropout):
    """Build the network.

    Args:
      n_embd: Size of the input and output feature dimension.
      dropout: Dropout probability applied to the output.
    """
    super().__init__()
    hidden = 4 * n_embd
    layers = [
        # Expand to a wider hidden layer, giving the model more capacity to
        # transform the information non-linearly.
        nn.Linear(n_embd, hidden),
        # Non-linearity.
        nn.GELU(),
        # Project back to n_embd so the output shape matches the input and the
        # residual connection around this sub-layer is possible.
        nn.Linear(hidden, n_embd),
        # Randomly zeroes outputs while training (with probability `dropout`)
        # to reduce overfitting; it is inactive in eval mode.
        nn.Dropout(dropout),
    ]
    self.net = nn.Sequential(*layers)

  def forward(self, x):
    """Apply the network to ``x``; the output has the same shape as ``x``."""
    return self.net(x)

In [None]:
# Example of use
# The feed-forward network keeps the input shape: (B, T, n_embd) in and out.
n_embd = 64
sequence_length = 5
dropout = 0.1

ff = FeedForward(n_embd, dropout)

# Random batch of 2 sequences with 5 tokens of 64 features each
x = torch.randn(2, sequence_length, n_embd)

# Forward pass
out = ff(x)

# Both should match
print("Input shape: ", x.shape)
print("Output shape: ", out.shape)

Input shape:  torch.Size([2, 5, 64])
Output shape:  torch.Size([2, 5, 64])


In [None]:
class Block(nn.Module):
  """Pre-norm transformer block: self-attention then feed-forward, each residual."""

  def __init__(self, n_embd, n_head, sequence_length, dropout):
    """Create the two LayerNorms, the attention layer and the feed-forward net.

    Args:
      n_embd: Embedding size of the token vectors.
      n_head: Number of attention heads.
      sequence_length: Maximum number of tokens per sequence.
      dropout: Dropout probability passed to both sub-layers.
    """
    super().__init__()
    # ln1 normalizes each token vector before self-attention sees it.
    self.ln1 = LayerNorm(n_embd)
    self.sa = MultiHeadAttention(n_embd, n_head, sequence_length, dropout)
    # ln2 does the same before the feed-forward network.
    self.ln2 = LayerNorm(n_embd)
    self.ffwd = FeedForward(n_embd, dropout)

  def forward(self, x):
    """Return ``x`` refined by attention and by the feed-forward network."""
    # Attention sub-layer: normalize, attend, and add the result back onto the
    # unmodified input (residual connection).
    attended = self.sa(self.ln1(x))
    x = x + attended
    # Feed-forward sub-layer: same pattern, so the core information is kept
    # while the network contributes a refinement.
    refined = self.ffwd(self.ln2(x))
    return x + refined

In [None]:
# Example of use
# A transformer block keeps the input shape: (B, T, n_embd) in and out.
n_embd = 64
n_head = 4
sequence_length = 16
dropout = 0.1

block = Block(n_embd, n_head, sequence_length, dropout)
# Random batch of 2 sequences with 16 tokens of 64 features each
x = torch.randn(2, sequence_length, n_embd)

# Forward pass
out = block(x)

# Both should match
print("Input shape: ", x.shape)
print("Output shape: ", out.shape)

Input shape:  torch.Size([2, 16, 64])
Output shape:  torch.Size([2, 16, 64])


In [None]:
from typing import Optional
class GPTLanguageModel(nn.Module):
  """Decoder-only transformer language model.

  Token and position embeddings are summed, passed through ``n_layer``
  transformer blocks and a final LayerNorm, and projected to vocabulary logits.
  The output projection shares its weight matrix with the token embedding
  table (weight tying).
  """

  def __init__(self, config: "GPTConfig"):
    """Build the model described by ``config``."""
    super().__init__()
    self.config = config
    self.token_embedding_table = nn.Embedding(config.vocab_size, config.n_embd)
    self.position_embedding_table = nn.Embedding(config.sequence_length, config.n_embd)
    # n_layer transformer blocks (multi-head self-attention, feed-forward
    # network, residual connections and layer normalization), run in sequence.
    self.blocks = nn.Sequential(*[
        Block(config.n_embd, config.n_head, config.sequence_length, config.dropout)
        for _ in range(config.n_layer)
    ])
    # Normalizes the hidden state produced by the last block, across the
    # feature dimension of each token vector.
    self.ln_f = LayerNorm(config.n_embd)
    # lm_head = language-modeling head: the final projection that turns hidden
    # states into logits over the vocabulary.
    self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False)
    # Weight tying: token_embedding_table.weight and lm_head.weight both have
    # shape (vocab_size, n_embd). Instead of keeping two separate matrices, the
    # same one is used to encode tokens into vectors and to decode vectors back
    # into token logits.
    self.lm_head.weight = self.token_embedding_table.weight

  def forward(self, idx, targets: Optional[torch.Tensor] = None):
    """Compute next-token logits and, when targets are given, the loss.

    Args:
      idx: Integer tensor of token ids with shape (B, T), where
        T <= config.sequence_length.
      targets: Optional tensor of expected next tokens, same shape as ``idx``.
        Pass it for training; omit it for inference.

    Returns:
      ``(logits, loss)``: logits have shape (B, T, vocab_size); loss is the
      cross-entropy averaged over all positions, or None without targets.
    """
    # B = batch size, T = length of the current input sequence
    B, T = idx.shape
    assert T <= self.config.sequence_length, "Sequence length exceeds model limit"
    tok_emb = self.token_embedding_table(idx)
    # Positions 0..T-1 with shape (1, T), so the position embeddings broadcast
    # over the batch dimension when added to the token embeddings.
    pos = torch.arange(0, T, device=idx.device).unsqueeze(0)
    pos_emb = self.position_embedding_table(pos)
    x = tok_emb + pos_emb
    x = self.blocks(x)
    x = self.ln_f(x)
    logits = self.lm_head(x)

    loss = None
    if targets is not None:
      # V = vocabulary size
      V = logits.size(-1)
      # Flatten batch and time so every position is one classification
      # example. reshape (not view) also accepts non-contiguous targets.
      loss = F.cross_entropy(logits.view(-1, V), targets.reshape(-1))
    return logits, loss

  @torch.no_grad()
  def generate(self, idx: torch.Tensor, max_new_tokens: int, temperature: float = 1.0, top_k: Optional[int] = None):
    """Autoregressively sample ``max_new_tokens`` tokens after ``idx``.

    Note: this switches the module to eval mode and leaves it there.

    Args:
      idx: Integer tensor (B, T) holding the input context (the seed phrase).
      max_new_tokens: Number of tokens to append to the context.
      temperature: Divides the logits before sampling to control randomness;
        values below 1e-6 are clamped to 1e-6.
      top_k: If given, sampling is restricted to the k most likely tokens at
        each step. Values larger than the vocabulary size are clamped to it.

    Returns:
      Integer tensor (B, T + max_new_tokens): the context followed by the
      sampled tokens.
    """
    # Evaluation mode makes layers such as Dropout behave deterministically.
    self.eval()
    for _ in range(max_new_tokens):
      # Keep only the most recent sequence_length tokens once the sequence
      # grows beyond the model's context window.
      idx_cond = idx[:, -self.config.sequence_length:]
      logits, _ = self(idx_cond)
      # Prediction for the last time step, shape (B, vocab_size), with
      # temperature scaling applied.
      logits = logits[:, -1, :] / max(temperature, 1e-6)
      if top_k is not None:
        # torch.topk fails when k exceeds the vocabulary size, so clamp it.
        k = min(top_k, logits.size(-1))
        v, _ = torch.topk(logits, k)
        # Last column of the top-k values = smallest logit that is still kept.
        thresh = v[:, [-1]]
        # Push every logit below the threshold to a very negative value so the
        # corresponding tokens get (numerically) zero probability.
        logits = torch.where(logits < thresh, torch.full_like(logits, -1e10), logits)
      # Sampling happens on every step, whether or not top-k filtering was
      # applied; with these lines nested under the `if`, top_k=None would
      # return the context unchanged.
      probs = F.softmax(logits, dim=-1)
      next_id = torch.multinomial(probs, num_samples=1)
      # Append the sampled token along the sequence dimension.
      idx = torch.cat((idx, next_id), dim=1)
    return idx


### Train the model

In [None]:
from google.colab import drive
import os

In [None]:
# Mount Google Drive at /content/drive/ so the dataset and checkpoint paths
# under /content/drive/MyDrive can be read and written.
drive.mount('/content/drive/')

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


In [None]:

# Data path: plain-text training corpus stored on the mounted Drive
file_path = "/content/drive/MyDrive/ML/fine-tuning-gpt2/dostoevsky_clean.txt"
# Model path: where the trained weights (state_dict) are saved to and loaded from
model_path = "/content/drive/MyDrive/ML/transformer_architecture/mini_gpt_model.pth"

In [None]:
# Read the whole corpus into memory as a single UTF-8 string.
with open(file_path, 'r', encoding='utf-8') as f:
  text = f.read()

print(f"Dataset loaded. Total characters: {len(text)}")

Dataset loaded. Total characters: 5651943


In [None]:
# Keep only the first 500k characters for faster experimentation
# (author's note: ~2.6 hours of training time at this size).
text = text[:500_000]  # 500k characters, ~2.6 hours training time, Much faster experimentation
# The vocabulary is built from the truncated text, so it covers every split below.
tokenizer = CharTokenizer(text)
# Encode the entire corpus as one 1-D tensor of token ids
data = torch.tensor(tokenizer.encode(text), dtype=torch.long)
# Split: 80% train, 10% validation, 10% test (contiguous, in corpus order)
n = len(data)
train_ratio = 0.8
val_ratio = 0.1

# Index boundaries of the three splits
train_end = int(train_ratio * n)
val_end = int((train_ratio + val_ratio) * n)

train_data = data[:train_end]
val_data = data[train_end:val_end]
test_data = data[val_end:]

# Context window per sample and number of samples per batch
sequence_length = 256
batch_size = 128

In [None]:
# Sliding-window datasets over the train and validation splits
train_dataset = TextDataset(train_data, sequence_length)
val_dataset = TextDataset(val_data, sequence_length)

# Training windows are visited in a new random order every epoch.
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
# Validation does not benefit from shuffling; a fixed order keeps evaluation
# reproducible, also when only the first few batches are evaluated.
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"Using device: {device}")

# Initialize model: only vocab_size is set explicitly, every other
# hyperparameter keeps its GPTConfig default.
config = GPTConfig(vocab_size=tokenizer.vocab_size)
model = GPTLanguageModel(config).to(device)

# Optimizer: AdamW over all model parameters
optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4)

Using device: cuda


In [None]:
# Choose mode: "train" or "generate"
#   "train"    -> train from the current weights and save them to model_path
#   "generate" -> load previously saved weights from model_path
mode = "generate"

if mode == "train":
    print("Starting training...")

    # Make sure dropout is active: the model may have been left in eval mode
    # by an earlier run of the "generate" branch or by model.generate().
    model.train()

    step = 0
    num_epochs = 3  # how many times we go through the entire dataset

    for epoch in range(num_epochs):
        for xb, yb in train_loader:
            xb, yb = xb.to(device), yb.to(device)

            # Forward pass (logits are not needed here, only the loss)
            _, loss = model(xb, yb)

            # Backward pass
            optimizer.zero_grad(set_to_none=True)
            loss.backward()
            optimizer.step()

            # Increment step count
            step += 1

            # Print every 100 batches (loss of the current batch)
            if step % 100 == 0:
                print(f"Epoch {epoch+1}/{num_epochs} | Step {step:5d} | Loss: {loss.item():.4f}")

    print("Training complete!")

    # Save model. Create the target folder first so a long training run is not
    # lost because the directory does not exist yet.
    checkpoint_dir = os.path.dirname(model_path)
    if checkpoint_dir:
        os.makedirs(checkpoint_dir, exist_ok=True)
    torch.save(model.state_dict(), model_path)
    print(f"Model saved at {model_path}")

else:
    print("Loading trained model...")
    # weights_only=True restricts unpickling to tensors and plain containers,
    # which is all a state_dict holds; it avoids executing arbitrary pickled
    # code from the checkpoint file.
    model.load_state_dict(torch.load(model_path, map_location=device, weights_only=True))
    model.eval()
    print("Model loaded successfully!")

Loading trained model...
Model loaded successfully!


In [None]:
# Text generation
# Every character of the prompt must exist in the tokenizer vocabulary,
# because encode() looks each character up in a dict.
prompt = "The meaning of life is"
# Encode the prompt and wrap it in a list to get a (1, T) batch of token IDs
# on the same device as the model
context = torch.tensor([tokenizer.encode(prompt)], dtype=torch.long, device=device)

# Sample 200 new characters: temperature below 1 sharpens the distribution,
# top_k limits each step to the 50 most likely tokens
generated = model.generate(
    context,
    max_new_tokens=200,
    temperature = 0.9,
    top_k = 50
)

# Convert the token IDs of the first (only) sequence back to text
output_text = tokenizer.decode(generated[0].tolist())
print("\n=== Generated Text Sample ===\n")
print(output_text)


=== Generated Text Sample ===

The meaning of life is in the street and disappeared.

“You have been besome to stupid into every one?”

“I am? Well, that then you answer you do not to care the landlord!”

“And what mistress? Madame liftes shall out lean
