<a href="https://colab.research.google.com/github/m4vic/Transformer-101/blob/main/Encoder/Decoder.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

MiniGPT Decoder.
dataset - WikiText-103

Data Loading → Tokenization → Encode IDs → Padding
`→ Token Embedding → Positional Encoding`
`→ Masked Multi-Head Self Attention`
`→ Add & Norm → Feed Forward → Add & Norm`
`→ Repeat N layers`
`→ Linear Output Layer → Softmax`
`→ Loss (CrossEntropy) → Training Loop`
`→ Inference (Text Generation)`
`


# **DataLoading**

In [None]:
from datasets import load_dataset

dataset = load_dataset("wikitext", "wikitext-103-v1")





The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [None]:
print(dataset)

DatasetDict({
    test: Dataset({
        features: ['text'],
        num_rows: 4358
    })
    train: Dataset({
        features: ['text'],
        num_rows: 1801350
    })
    validation: Dataset({
        features: ['text'],
        num_rows: 3760
    })
})


In [None]:
from collections import Counter

import re


def build_vocab(texts, vocab_size=50000):
  words = []
  for text in texts:
    text = text.lower()
    words.extend(re.findall(r"\b\w+\b", text))

  #count freq
  word_counts = Counter(words)

  #keep top vocab size

  most_common = word_counts.most_common(vocab_size-2) # reserving 2 for <unk and <pad>

  # create word-> id mapping

  vocab = {"<pad>":0, "<unk>":1}
  for i, (word,_) in enumerate(most_common,start=2): # it gives us index, word, count
    vocab[word] = i  # start from 2 and assign the id to word .
  return vocab

# build vocab from training split

train_texts = [d["text"] for d in dataset["train"]]
vocab = build_vocab(train_texts, vocab_size=50000)
inv_vocab = {idx: word for word, idx in vocab.items()}
print("Vocab size:", len(vocab))


Vocab size: 50000


In [None]:
def tokenize_text(text, vocab):
  text = text.lower()

  words = re.findall(r"\b\w+\b", text)
  token_ids = [vocab.get(word, vocab["<unk>"]) for word in words] # look for word in words if it exist give the id if not rwturn 0 for unk

  return token_ids
sample_text = dataset["train"][0]["text"][:100]
print("Sample text:", sample_text)
print("Token IDs:", tokenize_text(sample_text, vocab))

Sample text: 
Token IDs: []


In [None]:
import torch

SEQ_LEN = 128
BATCH_SIZE = 64

def encode_dataset(dataset_split, vocab):
  all_ids =[]
  for d in dataset_split:
    token_ids = tokenize_text(d["text"], vocab)
    #split into chunks of seq_len
    for i in range(0, len(token_ids), SEQ_LEN): # 0 to len of token ids to step seq len
      chunk = token_ids[i:i+SEQ_LEN] # EACH TIME it takes token id of len i+128
      if len(chunk) < SEQ_LEN: #
        chunk += [vocab["<pad>"]] * (SEQ_LEN - len(chunk))
      all_ids.append(chunk)
  return all_ids

train_encoded = encode_dataset(dataset["train"], vocab)
all_sequences = torch.tensor(train_encoded, dtype=torch.long)

from torch.utils.data import random_split

train_size = int(0.9 * len(all_sequences))
val_size   = len(all_sequences) - train_size

train_dataset, val_dataset = random_split(all_sequences, [train_size, val_size])
from torch.utils.data import DataLoader

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader   = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)


# **TOKEN EMBEDDING + POSITIONAL ENCODING**

In [None]:
import torch
import torch.nn as nn

# hyperparamters

VOCAB_SIZE =len(vocab)    # from your custom tokenizer
D_MODEL = 256             # embedding dimension
SEQ_LEN = 128             #sequence length

# token embedding layer

token_embedding = nn.Embedding(VOCAB_SIZE, D_MODEL)

# example

sample_batch = torch.tensor(train_encoded[:2])
embeddings = token_embedding(sample_batch)
print("embedding ",embeddings.shape)

embedding  torch.Size([2, 128, 256])


*Positional Encoding*

In [None]:
import math

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=SEQ_LEN):
        super().__init__()
        pe = torch.zeros(max_len, d_model) # tensor of zeros len of max len and dimension
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) # adding one more dimension
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)) # calculating the 100002i/dmodel

        pe[:, 0::2] = torch.sin(position * div_term)   # even indices
        pe[:, 1::2] = torch.cos(position * div_term)   # odd indices

        pe = pe.unsqueeze(0)  # shape [1, max_len, d_model]
        self.register_buffer('pe', pe)  # not learnable, just stored

    def forward(self, x):
        # x shape: [batch, seq_len, d_model]
        return x + self.pe[:, :x.size(1)]


In [None]:
pos_encoder = PositionalEncoding(D_MODEL, SEQ_LEN)

#pass through token embedding + PE
x = token_embedding(sample_batch)
x = pos_encoder(x)

print("with PE shape", x.shape)

with PE shape torch.Size([2, 128, 256])


#**Model Architecture**

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

class MaskedMultiHeadSelfAttention(nn.Module):
  def __init__(self, embed_dim, num_heads, dropout=0.0):
    super().__init__()
    assert embed_dim % num_heads == 0, "embed_dim must be divisible by num_heads"

    self.embed_dim = embed_dim
    self.num_heads = num_heads
    self.head_dim = embed_dim // num_heads # integer division or reverse
    self.scale = 1.0 / math.sqrt(self.head_dim) # sqrtroot of head_dim for scaled dot product

      #in projetcion for Q K V (combined to gain better speed )
    self.qkv_proj = nn.Linear(embed_dim, 3*embed_dim, bias=True)
    # out projection
    self.out_proj = nn.Linear(embed_dim, embed_dim)
    self.dropout = nn.Dropout(dropout)

  def _causal_mask(self, seq_len, device):
    # lower triangular matrix with 1s on and below diagonal
    mask = torch.tril(torch.ones((seq_len, seq_len), device=device)).unsqueeze(0).unsqueeze(0)
    # torch.onces metrix full of ones
    # torch tril lower triangular will become 1s rest 0s
    # unsquees 2 times will add 2 new dimension  1,1,seq_len,seq_len
    # convert to additive mask 0 for allowed -1e9 for disallowed
    return (1.0-mask) * -1e9   # if mask = 1 = 1.0-1 = 0 * -1.e9 = 0 and 1.0-0 = 1 * -1e9 = -1e9

    #tensor([[[[0., -1e9, -1e9, -1e9],
          #[0.,    0., -1e9, -1e9],
          #[0.,    0.,    0., -1e9],
          #[0.,    0.,    0.,    0.]]]])

  def forward(self, x,key_padding_mask=None, attn_mask=None):

    """
        x: (batch, seq_len, embed_dim)
        key_padding_mask: optional (batch, seq_len) with 1 for tokens to KEEP and 0 for PAD tokens (or vice versa - doc below)
            We'll assume mask==1 means keep; if you have mask with True for padding you can invert it.
        attn_mask: optional additive mask of shape (seq_len, seq_len) or (batch, seq_len, seq_len)
            If provided, it will be added to attention scores (like causal mask), before softmax.
        Returns:
            out: (batch, seq_len, embed_dim)
            attn_weights: (batch, num_heads, seq_len, seq_len)  -- optional, helpful for debugging
        """
    B,T,C = x.shape # b = batch size , t is token seq len , and c is embedding dimension channel

    device = x.device

    # project to qkv and split
    qkv = self.qkv_proj(x) # x as input for qkv_proj embeddingdim
    qkv = qkv.reshape(B,T,3,self.num_heads, self.head_dim) # spliting qkv into B,T AND 3*embed into 3 * num_heads * head_dim
    qkv = qkv.permute(2,0,3,1,4) # (3,B,num_heads,t,head)
    q,k,v = qkv[0], qkv[1], qkv[2] # each shape (b, num_heads, T, head_dim) saparate tensor q,k,v  for all heads


    # computing the scaled dot product QKV = softmax(QKt/rootDk) V

    scores = (q @ k.transpose(-2,-1)) * self.scale # q @ k multiply * scale (ie 1/rootDK) =  scores which has (B, num_head, T, T) ie t-query and t-key
    #causal mask
    causal = self._causal_mask(T,device)
    #scores has shape (B, num_heads, T, T) — the dot products of Q and K.
    # T is the query length and T is key len
    scores = scores + causal


    # optional user provided attn_mask
    if attn_mask is not None:
      scores = scores + attn_mask


    if key_padding_mask is not None: # IF IT IS NOT NONE
      if key_padding_mask.dtype == torch.bool: # check bool? 1 or 0 . ~ this opreator inverts the true to false
        keep_mask = (~key_padding_mask).to(dtype=torch.float32) # invert the boolean mask 1 to 0 , 0 to 1 keep. and also convert it to float
      else: # if it is not bool
        keep_mask = key_padding_mask.to(dtype=torch.float32)  # directly converting it 1 to keep and 0 to ignore

      keep_mask = keep_mask.unsqueeze(1).unsqueeze(1) # Adds two singleton dimensions so it broadcasts across batch and heads: (B, 1, 1, T)
      scores = scores + (1.0 - keep_mask) * -1e9  # 1.0 - 1 = 0 * -1e9 = 0 + scores
      # 1.0-0 = 1 *-1e9 + scores

     # attention probabilities
    attn = F.softmax(scores, dim=-1)  # (B, num_heads, T, T) applying softmax
    attn = self.dropout(attn) # dropouts

        # attention output
    out = torch.matmul(attn, v)  # matrix multiplication attn * v
        # combine heads
    out = out.transpose(1, 2).contiguous().view(B, T, C)  # out shape before: (B, num_heads, T, head_dim) = transpose(1, 2) → (B, T, num_heads, head_dim)
    # .view(B, T, C) → flatten num_heads * head_dim = embed_dim
    out = self.out_proj(out)  # (B, T, embed_dim)

    return out, attn  # return attn if you want to inspect i
























*Tester*

In [None]:
# hyperparams
batch = 2
seq_len = 16
embed_dim = 128
num_heads = 8

mha = MaskedMultiHeadSelfAttention(embed_dim=embed_dim, num_heads=num_heads, dropout=0.1)
x = torch.randn(batch, seq_len, embed_dim)   # token embeddings + pos encodings

out, attn = mha(x)
print(out.shape)   # -> (2, 16, 128)
print(attn.shape)  # -> (2, 8, 16, 16)


torch.Size([2, 16, 128])
torch.Size([2, 8, 16, 16])



          ┌───────────────┐
          │ Input x       │
          └───────┬───────┘
                  │
                  ▼
     ┌─────────────────────────┐
     │ Masked Multi-Head Attn  │
     └─────────┬───────────────┘
               │
               ▼
        Dropout + Residual
               │
               ▼
           LayerNorm
               │
               ▼
          Feed-Forward
               │
               ▼
        Dropout + Residual
               │
               ▼
           LayerNorm
               │
               ▼
           Output x


*Decoder Block*

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class FeedForward(nn.Module):
  def __init__(self, d_model, d_ff, dropout=0.1):
    super().__init__()
    self.linear1 = nn.Linear(d_model, d_ff)
    self.linear2 = nn.Linear(d_ff, d_model)
    self.dropout = nn.Dropout(dropout)
    self.activation = nn.GELU() #GELU (Gaussian Error Linear Unit)

  def forward(self, x):
    x = self.linear1(x)
    x = self.activation(x)
    x = self.dropout(x)
    x = self.linear2(x)
    return x

class DecoderBlock(nn.Module):
  def __init__(self , d_model, num_heads, d_ff, dropout=0.1):
    super().__init__()
    self.mha = MaskedMultiHeadSelfAttention(d_model, num_heads, dropout)
    self.norm1 = nn.LayerNorm(d_model)
    self.ffn = FeedForward(d_model, d_ff, dropout)
    self.norm2 = nn.LayerNorm(d_model)
    self.dropout = nn.Dropout(dropout)

  def forward(self, x):

    # masked multihead self attention + residual
    attn_out, _ = self.mha(x)
    x = self.norm1(x+self.dropout(attn_out))

    ffn_out = self.ffn(x)
    x = self.norm2(x+self.dropout(ffn_out))
    return x


In [None]:
b , s , d = 2,16,128
n_heads , d_ff = 8 , 512

decoder_block = DecoderBlock(d,n_heads,d_ff)
x = torch.randn(b,s,d)
out = decoder_block(x)
print(out.shape)

torch.Size([2, 16, 128])


#**miniGPT**


In [None]:
class MiniGPT(nn.Module):
  def __init__(self, vocab_size, seq_len, d_model=256, num_heads=8, d_ff=512, num_layers=6, dropout=0.1):
    super().__init__()
    self.seq_len= seq_len
    self.d_model = d_model


    # token embedding

    self.token_embedding = nn.Embedding(vocab_size, d_model)
    # positional encoding
    self.pos_encoder = PositionalEncoding(d_model, max_len=seq_len)

    #stack of decoder
    self.layers = nn.ModuleList([DecoderBlock(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])

    # output projectiobn
    self.lm_head = nn.Linear(d_model, vocab_size, bias=False)

  def forward(self, input_ids):

  # embedding + positional encoding

    x = self.token_embedding(input_ids)
    x = self.pos_encoder(x)

# pass through all decoder blovks
    for layer in self.layers:
      x = layer(x)

  # output logits
    logits = self.lm_head(x)

    return logits





# **Training Loop**

*Define loss optimizer*

*training loop*

In [None]:
import os
vocab_size = len(vocab)

SEQ_LEN = 128
BATCH_SIZE = 64
NUM_EPOCHS = 5
CHECKPOINT_DIR = "/content/drive/MyDrive/mini_gpt_checkpoints"
os.makedirs(CHECKPOINT_DIR, exist_ok=True)
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
scaler = torch.cuda.amp.GradScaler()  # for mixed precision
model = MiniGPT(vocab_size=vocab_size, seq_len=SEQ_LEN  )
model.to(DEVICE)
optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4)
criterion = torch.nn.CrossEntropyLoss()
scaler = torch.cuda.amp.GradScaler()


  scaler = torch.cuda.amp.GradScaler()  # for mixed precision
  scaler = torch.cuda.amp.GradScaler()


In [None]:
import torch
from torch.utils.data import DataLoader, TensorDataset, random_split
import math, os
from tqdm import tqdm




def train_epoch(dataloader):
    model.train()
    total_loss, total_tokens = 0, 0
    for batch in tqdm(dataloader):
        input_ids = batch[0].to(DEVICE)
        labels = input_ids.clone()

        optimizer.zero_grad()
        with torch.amp.autocast(device_type='cuda', dtype=torch.float16):
            logits = model(input_ids)
            loss = criterion(logits.view(-1, logits.size(-1)), labels.view(-1))

        scaler.scale(loss).backward()
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        scaler.step(optimizer)
        scaler.update()

        total_loss += loss.item() * input_ids.numel()
        total_tokens += input_ids.numel()

    avg_loss = total_loss / total_tokens
    ppl = math.exp(avg_loss)
    return avg_loss, ppl

def eval_epoch(dataloader):
    model.eval()
    total_loss, total_tokens = 0, 0
    with torch.no_grad():
        for batch in dataloader:
            input_ids = batch[0].to(DEVICE)
            labels = input_ids.clone()
            logits = model(input_ids)
            loss = criterion(logits.view(-1, logits.size(-1)), labels.view(-1))

            total_loss += loss.item() * input_ids.numel()
            total_tokens += input_ids.numel()

    avg_loss = total_loss / total_tokens
    ppl = math.exp(avg_loss)
    return avg_loss, ppl


In [None]:
start_epoch = 0
best_val_ppl = float("inf")
latest_checkpoint = os.path.join(CHECKPOINT_DIR, "mini_gpt_best.pt")

if os.path.exists(latest_checkpoint):
    print("✅ Loading checkpoint to resume training...")
    checkpoint = torch.load(latest_checkpoint)
    model.load_state_dict(checkpoint["model_state_dict"])
    optimizer.load_state_dict(checkpoint["optimizer_state_dict"])
    start_epoch = checkpoint["epoch"]
    best_val_ppl = checkpoint["val_perplexity"]

# --- 6️⃣ Training loop ---
for epoch in range(start_epoch, NUM_EPOCHS):
    train_loss, train_ppl = train_epoch(train_loader)
    val_loss, val_ppl = eval_epoch(val_loader)

    print(f"Epoch {epoch+1}/{NUM_EPOCHS} | Train PPL: {train_ppl:.2f} | Val PPL: {val_ppl:.2f}")

    # Save checkpoint if validation improves
    if val_ppl < best_val_ppl:
        best_val_ppl = val_ppl
        torch.save({
            "epoch": epoch+1,
            "model_state_dict": model.state_dict(),
            "optimizer_state_dict": optimizer.state_dict(),
            "val_perplexity": val_ppl
        }, os.path.join(CHECKPOINT_DIR, "mini_gpt_best.pt"))
        print(f"✅ Saved new best model at epoch {epoch+1} with Val PPL: {val_ppl:.2f}")

100%|██████████| 20359/20359 [13:00<00:00, 26.10it/s]


Epoch 1/5 | Train PPL: 1.60 | Val PPL: 1.19
✅ Saved new best model at epoch 1 with Val PPL: 1.19


100%|██████████| 20359/20359 [12:50<00:00, 26.44it/s]


Epoch 2/5 | Train PPL: 1.19 | Val PPL: 1.17
✅ Saved new best model at epoch 2 with Val PPL: 1.17


100%|██████████| 20359/20359 [12:47<00:00, 26.52it/s]


Epoch 3/5 | Train PPL: 1.18 | Val PPL: 1.16
✅ Saved new best model at epoch 3 with Val PPL: 1.16


100%|██████████| 20359/20359 [12:56<00:00, 26.21it/s]


Epoch 4/5 | Train PPL: 1.17 | Val PPL: 1.17


100%|██████████| 20359/20359 [12:38<00:00, 26.83it/s]


Epoch 5/5 | Train PPL: 1.18 | Val PPL: 1.17


**MiniGPT**

In [None]:
import torch

def generate_text(model, start_tokens, inv_vocab, max_len=100, temperature=1.0):
    """
    model: your trained MiniGPT decoder
    start_tokens: list of token IDs as prompt
    inv_vocab: {id: token} mapping
    max_len: number of tokens to generate
    temperature: randomness, 1.0 = default, <1 = more confident, >1 = more random
    """
    model.eval()
    generated = start_tokens.copy()

    for _ in range(max_len):
        # take last SEQ_LEN tokens as input
        input_ids = torch.tensor([generated[-SEQ_LEN:]], device=DEVICE)
        with torch.no_grad():
            logits = model(input_ids)  # [1, seq_len, vocab_size]
            next_token_logits = logits[0, -1] / temperature
            probs = torch.softmax(next_token_logits, dim=-1)
            next_token = torch.multinomial(probs, num_samples=1).item()
        generated.append(next_token)

    # convert token IDs back to text
    text = " ".join([inv_vocab.get(tok, "<unk>") for tok in generated])
    return text


In [None]:
# Suppose you have a start prompt like "The future of AI"
prompt_text = "The future of Tech"
start_tokens = tokenize_text(prompt_text, vocab)  # your existing tokenization function

# Generate text
generated_text = generate_text(model, start_tokens, inv_vocab, max_len=50, temperature=0.8)
print(generated_text)


the future of tech edges 588 onondaga eponymous homework android 128 flee masques harrington rocco perceptions unitis devastated bohr alcoholics winning surrounded birbhum yip mcdiarmid detached takahashi kids strains nullified sensors goa scelidosaurus virginia cicada iximche platte barracudas 1364 prelate valdés mcdermott persevered bgsu domestication magnificat inclusive laid vulnerabilities vishal prefix damacy dicaprio maeda


