# Building GPT from scratch

In [None]:
import numpy as np

## Task 1:
* train the segmenter with varying k (try normalization strategies)
* compare the performance against a different set
* figure out a measure for accuracy

## Task 2:
* Use cleaned Shakespeare file (will be uploaded). The train/test/validation split will be uploaded so that everyone has the same split
    * Use the validation set mainly to optimise hyperparameters in interpolation; don't use it to optimise k
* Develop n-gram engine (based on BPE encoding) that can deal with different n
    * Unigram system first, then bigram system, then 3- and 4-gram; intrinsic evaluation for each:
        * Report perplexity (on BPE subwords)
            * For bigram: look at how different k's affect perplexity
        * "Add-one" normalisation (Laplace smoothing)
        * Simple (not conditional) interpolation or backoff
* Write a program for extrinsic evaluation (generate sentence from n-gram system)
    * Give context first
    * Generation to predict next word (for now: argmax (most likely), or sampling for more variance)
        * If word not present: assign average probability of all unigrams or assign most likely word of unigram
        * Use end-of-sequence tokens to determine when to stop generation
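
The add-one estimate and the perplexity report for the bigram case can be sketched as follows. This is a minimal illustration on a toy token list, not the course's reference solution: the function names (`train_bigram`, `laplace_prob`, `perplexity`) and the toy sentence are assumptions, and real input would be BPE subword tokens.

```python
import math
from collections import Counter

def train_bigram(tokens):
    """Count bigrams and the contexts (all tokens that have a successor)."""
    contexts = Counter(tokens[:-1])
    bigrams = Counter(zip(tokens, tokens[1:]))
    return contexts, bigrams

def laplace_prob(prev, word, contexts, bigrams, vocab_size):
    """Add-one estimate: P(word | prev) = (c(prev, word) + 1) / (c(prev) + V)."""
    return (bigrams[(prev, word)] + 1) / (contexts[prev] + vocab_size)

def perplexity(tokens, contexts, bigrams, vocab_size):
    """exp of the average negative log-probability over all bigram positions."""
    log_sum = 0.0
    n = 0
    for prev, word in zip(tokens, tokens[1:]):
        log_sum += math.log(laplace_prob(prev, word, contexts, bigrams, vocab_size))
        n += 1
    return math.exp(-log_sum / n)

# Toy data (hypothetical); "</s>" plays the role of the end-of-sequence token.
train = "the cat sat on the mat </s>".split()
vocab = set(train)
ctx, bi = train_bigram(train)
ppl = perplexity(train, ctx, bi, len(vocab))
print(f"train perplexity: {ppl:.3f}")
```

An unseen context has count 0, so the estimate falls back to the uniform probability 1/V, which keeps perplexity finite on held-out text.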

## Task 3:

Implement neural embeddings – either hardcode or softer version, using PyTorch

Watch RAM during training, especially for higher batch sizes (>=32)

### Hardcode
Hardcode version:
* No PyTorch, no ML libraries, only numpy (at least for the neural embeddings; can use PyTorch, etc. for the GPT implementation)
    * Can use Counter and defaultdict from collections
    * We can, but do not have to, hardcode the optimiser (can use Adam, need to use at least SGD)
* Measure perplexity
* Implement early stopping with patience (stop when validation loss diverges from training loss, to avoid overfitting to the training set)
    * Do not need to optimise for patience, but can
    * Save top k (the amount that fits reasonably on your disk) of model checkpoints (can name the file after validation score and iteration)

We want a neural embedding with conditional generation.
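
Early stopping with patience could look roughly like the sketch below. It uses a common simplification, namely stopping once the validation loss has not improved for `patience` consecutive evaluations, rather than comparing against the training loss directly. The class name `EarlyStopping`, the `min_delta` parameter, and the loss values are assumptions made for illustration.

```python
class EarlyStopping:
    """Signal a stop after `patience` evaluations without improvement."""

    def __init__(self, patience=3, min_delta=0.0):
        self.patience = patience
        self.min_delta = min_delta      # minimum decrease that counts as improvement
        self.best = float("inf")        # best validation loss seen so far
        self.bad_evals = 0              # evaluations since the last improvement

    def step(self, val_loss):
        """Record one validation loss; return True when training should stop."""
        if val_loss < self.best - self.min_delta:
            self.best = val_loss
            self.bad_evals = 0
        else:
            self.bad_evals += 1
        return self.bad_evals >= self.patience

# Hypothetical validation losses, one per epoch.
val_losses = [2.0, 1.5, 1.4, 1.45, 1.41, 1.43, 1.2]
stopper = EarlyStopping(patience=3)
stopped_at = None
for epoch, loss in enumerate(val_losses):
    if stopper.step(loss):
        stopped_at = epoch
        break
print(stopped_at, stopper.best)
```

In a real loop, the point where `step` records a new best is also a natural place to write a checkpoint named after the validation score and iteration.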


### Top-k sampling

In [None]:
def top_k_sample(probs, k):
    """
    Top-k sampling from a probability distribution.
    Args:
        probs: 1D numpy array of probabilities for each word in the vocabulary.
        k: number of top words to consider.
    Returns:
        index of the sampled word.
    """
    if k <= 0:
        raise ValueError("k must be positive")
    # Get indices of top k probabilities
    top_k_indices = probs.argsort()[-k:][::-1]
    # Select top k probabilities and renormalize
    top_k_probs = probs[top_k_indices]
    top_k_probs = top_k_probs / top_k_probs.sum()
    # Sample from the top k
    sampled_idx = np.random.choice(top_k_indices, p=top_k_probs)
    return sampled_idx

### Temperature sampling
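
Temperature sampling divides the logits by a temperature T before the softmax: T < 1 sharpens the distribution, T > 1 flattens it, and T = 1 leaves it unchanged. The following is a minimal numpy sketch; the function names `temperature_probs` and `temperature_sample` and the `rng` argument are assumptions, not part of the assignment.

```python
import numpy as np

def temperature_probs(logits, temperature=1.0):
    """Softmax of logits / temperature, computed in a numerically stable way."""
    if temperature <= 0:
        raise ValueError("temperature must be positive")
    scaled = np.asarray(logits, dtype=np.float64) / temperature
    scaled = scaled - scaled.max()      # subtract the max to avoid overflow in exp
    probs = np.exp(scaled)
    return probs / probs.sum()

def temperature_sample(logits, temperature=1.0, rng=None):
    """Sample one index from the temperature-scaled distribution."""
    rng = np.random.default_rng() if rng is None else rng
    probs = temperature_probs(logits, temperature)
    return int(rng.choice(len(probs), p=probs))

logits = [2.0, 1.0, 0.0]
print(temperature_probs(logits, 0.5))   # sharper than T = 1
print(temperature_probs(logits, 2.0))   # flatter than T = 1
print(temperature_sample(logits, 1.0, rng=np.random.default_rng(0)))
```

If only probabilities are available, as in `top_k_sample` above, `np.log(probs)` can serve as the logits.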

### Softer Version

* Using PyTorch
* Implement early stopping with patience (stop when validation loss diverges from training loss, to avoid overfitting to the training set)
    * Do not have to optimise patience, but can
    * Save top k (the amount that fits reasonably on disk) of model checkpoints (can name the file after validation score and iteration)
* Tune hyperparameters on the validation set using a separate grid search for each (order is important: number of merges, learning rate, weights of interpolation). Not all of this is required to pass, but it is for a 1.0
    * vocabulary size – gridsearch for max. 10 different amounts of merges
    * learning rate of optimiser
    * interpolation
* Try versions with different optimisers

## Task 4:
Task remarks: GPT – hand in by 31.08.

* If something is underspecified, just make decision yourself
* Well-documented code
* Submission format
    * Notebook (incl. pdf) or GitHub readme (submit pdf with link to repo) as technical report of what we did
        * Nice narrative and way to navigate code, not scientific paper
        * Include plots (loss, perplexity scores, hyperparameters, etc.)
        * Optionally include pseudocode
        * Qualitative analysis nice to have, e.g., add and evaluate generated text in report
        * Can add appendix for additional plots
* Hand in every milestone, starting from UNIX comments
* Removed in-between milestone of causal-self attention
* Everything together in one file
* Compare the models from each milestone, report perplexity for all
    * Old-school n-gram
    * Best neural n-gram
    * GPT


**GPT itself**
* Hyperparameter tuning: do not need all of them, choose what is most interesting and explain why
    * Number of merges in BPE (not complete gridsearch, isolate top three number of merges in perplexity in n-gram, test those for GPT)
    * Regularisation
    * How small can we make neural embedding
    * Do not change optimiser
* General remarks
    * Transformer blocks from scratch would be beyond 1.0, not required
    * Implement causal self-attention yourself, do not use ready-made PyTorch version
    * For computing perplexity: Implementing teacher forcing annealing is necessary for good generation performance, but we don’t have to do it for our assignment
* Reminders
    * Skip weight initialisation and optimiser configuration
        * Can use standard PyTorch initialisation → just get transformer parameters and add them when initialising the optimiser
    * Remember to change device selection, currently “cuda”, you might want “mps” or “cpu”
    * Configs: make n_embd smaller, don’t change betas and weight decay (unless you want to), can change batch size, chunk size, n_head, n_layer
    * Specify temperature and top-k parameters for generate function
    * Activation function used in MLP: not ReLU as in slides but GELU (available in PyTorch as `nn.GELU` / `F.gelu`)
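
To make the causal-masking idea concrete before the PyTorch version, here is a minimal single-head sketch in numpy. It is illustrative only: the function name `causal_self_attention`, the random projection matrices, and the toy shapes are assumptions, and it omits batching, multiple heads, dropout and the output projection.

```python
import numpy as np

def causal_self_attention(x, w_q, w_k, w_v):
    """Single-head causal self-attention.

    x: [seq_len, d_model]; w_q, w_k, w_v: [d_model, d_head].
    Returns (output [seq_len, d_head], attention weights [seq_len, seq_len]).
    """
    q, k, v = x @ w_q, x @ w_k, x @ w_v
    scores = q @ k.T / np.sqrt(k.shape[-1])            # scaled dot products
    seq_len = x.shape[0]
    future = np.triu(np.ones((seq_len, seq_len), dtype=bool), k=1)
    scores = np.where(future, -np.inf, scores)         # hide future positions
    scores = scores - scores.max(axis=-1, keepdims=True)
    weights = np.exp(scores)                           # exp(-inf) = 0 for masked entries
    weights = weights / weights.sum(axis=-1, keepdims=True)
    return weights @ v, weights

rng = np.random.default_rng(0)
x = rng.normal(size=(5, 8))                            # 5 tokens, d_model = 8
w_q, w_k, w_v = (rng.normal(size=(8, 4)) for _ in range(3))
out, weights = causal_self_attention(x, w_q, w_k, w_v)
print(np.round(weights, 2))                            # lower-triangular rows summing to 1
```

Because every position may attend to itself, each row has at least one finite score, so the softmax never divides by zero.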

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import math

In [None]:
class FFN(nn.Module):
    """
    Position-wise Feed-Forward Networks
    This consists of two linear transformations with a ReLU activation in between.
    
    FFN(x) = max(0, xW1 + b1 )W2 + b2
    d_model: embedding dimension (e.g., 512)
    d_ff: feed-forward dimension (e.g., 2048)
    
    """
    def __init__(self, d_model, d_ff):
        super().__init__()
        self.d_model=d_model
        self.d_ff= d_ff
        
        # Linear transformation y = xW+b
        self.fc1 = nn.Linear(self.d_model, self.d_ff, bias = True)
        self.fc2 = nn.Linear(self.d_ff, self.d_model, bias = True)
        
        # Xavier (Glorot) uniform initialisation of the weights
        # (can help with training stability)
        nn.init.xavier_uniform_(self.fc1.weight)
        nn.init.xavier_uniform_(self.fc2.weight)


    def forward(self, input):
        # check input and first FF layer dimension matching
        batch_size, seq_length, d_input = input.size()
        assert self.d_model == d_input, "d_model must be the same dimension as the input"

        # First linear transformation followed by ReLU
        # There's no need for explicit torch.max() as F.relu() already implements max(0,x)
        f1 = F.relu(self.fc1(input))

        # max(0, xW_1 + b_1)W_2 + b_2 
        f2 =  self.fc2(f1)

        return f2

In [None]:
class TransformerAttention(nn.Module):
    """
    Multi-Head Causal Self-Attention (no cross attention, GPT style)
    Args:
        d_model: total hidden dimension of the model
        num_head: number of attention heads
        dropout: dropout rate for attention scores
        bias: whether to include bias in linear projections
    """
    def __init__(self, d_model, num_head, dropout=0.1, bias=True): # infer d_k, d_v, d_q from d_model
        super().__init__()  # required before registering submodules
        assert d_model % num_head == 0, "d_model must be divisible by num_head"
        self.d_model = d_model
        self.num_head = num_head
        self.d_head=d_model//num_head
        self.dropout_rate = dropout  # Store dropout rate separately

        # linear transformations
        self.q_proj = nn.Linear(d_model, d_model, bias=bias)
        self.k_proj = nn.Linear(d_model, d_model, bias=bias)
        self.v_proj = nn.Linear(d_model, d_model, bias=bias)
        self.output_proj = nn.Linear(d_model, d_model, bias=bias)

        # Dropout layer
        self.dropout = nn.Dropout(p=dropout)

        # Scaling factor 1/sqrt(d_head), stored as a plain float
        self.scaler = float(1.0 / math.sqrt(self.d_head))
        

    def forward(self, sequence, att_mask=None):
        """Input shape: [batch_size, seq_len, d_model=num_head * d_head]"""
        batch_size, seq_len, model_dim = sequence.size()

        # Check only critical input dimensions
        assert model_dim == self.d_model, f"Input dimension {model_dim} doesn't match model dimension {self.d_model}"
    
        
        # Linear projections and reshape for multi-head
        Q_state = self.q_proj(sequence)
        
        kv_seq_len = seq_len
        K_state = self.k_proj(sequence)
        V_state = self.v_proj(sequence)

        #[batch_size, self.num_head, seq_len, self.d_head]
        Q_state = Q_state.view(batch_size, seq_len, self.num_head, self.d_head).transpose(1,2) 
            
        # keys and values use the same layout; in self-attention kv_seq_len == seq_len
        K_state = K_state.view(batch_size, kv_seq_len, self.num_head, self.d_head).transpose(1,2)
        V_state = V_state.view(batch_size, kv_seq_len, self.num_head, self.d_head).transpose(1,2)

        # Scale Q by 1/sqrt(d_k)
        Q_state = Q_state * self.scaler
    
    
        # Compute attention matrix: QK^T
        self.att_matrix = torch.matmul(Q_state, K_state.transpose(-1,-2)) 

    
        # apply attention mask to attention matrix
        if att_mask is not None and not isinstance(att_mask, torch.Tensor):
            raise TypeError("att_mask must be a torch.Tensor")

        if att_mask is not None:
            self.att_matrix = self.att_matrix + att_mask
        
        # apply softmax to the last dimension to get the attention score: softmax(QK^T)
        att_score = F.softmax(self.att_matrix, dim = -1)
    
        # apply drop out to attention score
        att_score = self.dropout(att_score)
    
        # get final output: softmax(QK^T)V
        att_output = torch.matmul(att_score, V_state)
    
        # concatenate all attention heads
        att_output = att_output.transpose(1, 2)
        att_output = att_output.contiguous().view(batch_size, seq_len, self.num_head*self.d_head) 
    
        # final linear transformation to the concatenated output
        att_output = self.output_proj(att_output)

        assert att_output.size() == (batch_size, seq_len, self.d_model), \
        f"Final output shape {att_output.size()} incorrect"

        return att_output

In [None]:
class GPTBlock(nn.Module):
    def __init__(self, d_model, n_head, d_ff, dropout=0.1):
        super().__init__()
        self.att = TransformerAttention(d_model, n_head, dropout=dropout)
        self.ln1 = nn.LayerNorm(d_model)
        self.ffn = FFN(d_model, d_ff)
        self.ln2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
        self.d_model = d_model
    
    @staticmethod
    def create_causal_mask(seq_len):
        mask = torch.triu(torch.ones(seq_len, seq_len), diagonal=1)
        mask = mask.masked_fill(mask == 1, float('-inf'))
        return mask

    def forward(self, embed_input, att_mask=None):
        """
        Args:
        embed_input: Input sequence [batch_size, seq_len, d_model]
        att_mask: Optional additive attention mask, broadcastable to
            [batch_size, n_head, seq_len, seq_len]; a causal mask is built if None
        Returns:
        Tensor: Block output [batch_size, seq_len, d_model]
        """
        batch_size, seq_len, _ = embed_input.size()
        
        assert embed_input.size(-1) == self.d_model, f"Input dimension {embed_input.size(-1)} doesn't match model dimension {self.d_model}"

        # Build the causal mask only if the caller did not supply one
        if att_mask is None:
            att_mask = self.create_causal_mask(seq_len).to(embed_input.device)  # [seq_len, seq_len]
            att_mask = att_mask.unsqueeze(0).unsqueeze(1)  # [1, 1, seq_len, seq_len]

        x = embed_input

        # Self-attention + residual + norm
        att_out = self.att(x, att_mask=att_mask)
        x = self.ln1(x + self.dropout(att_out))

        # FFN + residual + norm
        ffn_out = self.ffn(x)
        x = self.ln2(x + self.dropout(ffn_out))
        return x


In [None]:
class MiniGPT(nn.Module):
    def __init__(self, vocab_size, n_layer, n_embd, n_head, d_ff, max_seq_len, dropout=0.1):
        super().__init__()
        self.token_emb = nn.Embedding(vocab_size, n_embd)
        self.pos_emb = nn.Embedding(max_seq_len, n_embd)
        self.blocks = nn.ModuleList([
            GPTBlock(n_embd, n_head, d_ff, dropout) for _ in range(n_layer)
        ])
        self.ln_f = nn.LayerNorm(n_embd)
        self.head = nn.Linear(n_embd, vocab_size, bias=False)

    def forward(self, idx):
        batch_size, seq_len = idx.size()
        
        pos = torch.arange(0, seq_len, device=idx.device).unsqueeze(0)
        x = self.token_emb(idx) + self.pos_emb(pos)
        # Causal mask for GPT
        mask = GPTBlock.create_causal_mask(seq_len).to(idx.device)
        mask = mask.unsqueeze(0).unsqueeze(1)  # [1, 1, seq_len, seq_len]
        for block in self.blocks:
            x = block(x, att_mask=mask)
        x = self.ln_f(x)
        logits = self.head(x)
        return logits


In [None]:
vocab_size = 5000      # depends on tokenizer
n_layer = 4            # small for testing
n_embd = 128           # embedding dimension
n_head = 4             # must divide n_embd
d_ff = 512             # feed-forward dimension
max_seq_len = 128      # context window

model = MiniGPT(vocab_size, n_layer, n_embd, n_head, d_ff, max_seq_len)


In [None]:
batch_size = 2
seq_len = 10
x = torch.randint(0, vocab_size, (batch_size, seq_len))  # random token ids

logits = model(x)  # [batch_size, seq_len, vocab_size]
print(logits.shape)
# torch.Size([2, 10, 5000])

In [None]:
#training

optimizer = optim.AdamW(model.parameters(), lr=3e-4)

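for step in range(100):
    # Smoke test on random tokens: sample seq_len + 1 ids so that
    # inputs and targets can be offset by one position
    data = torch.randint(0, vocab_size, (batch_size, seq_len + 1))
    x = data[:, :-1]  # inputs: tokens 0 .. seq_len-1
    y = data[:, 1:]   # targets: the next token at each position

    logits = model(x)
    loss = F.cross_entropy(logits.view(-1, vocab_size), y.reshape(-1))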

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if step % 10 == 0:
        print(f"Step {step} | Loss {loss.item():.4f}")


In [None]:
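@torch.no_grad()
def generate(model, idx, max_new_tokens, temperature=1.0, top_k=None):
    model.eval()                                  # disable dropout while sampling
    max_ctx = model.pos_emb.num_embeddings        # context window (assumes a MiniGPT-style model)
    for _ in range(max_new_tokens):
        logits = model(idx[:, -max_ctx:])         # [batch, seq, vocab_size]
        logits = logits[:, -1, :] / temperature   # last token logits, temperature-scaled
        if top_k is not None:
            # keep only the top_k largest logits, mask out the rest
            kth = torch.topk(logits, min(top_k, logits.size(-1))).values[:, [-1]]
            logits = logits.masked_fill(logits < kth, float('-inf'))
        probs = F.softmax(logits, dim=-1)         # convert to probs
        next_token = torch.multinomial(probs, 1)  # sample
        idx = torch.cat([idx, next_token], dim=1) # append
    return idx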


In [None]:
#generation
start = torch.tensor([[1]])  # BOS token or just any token id
out = generate(model, start, max_new_tokens=20)
print("Generated sequence:", out.tolist())
