#Utilities

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Once your drive is mounted, you can navigate to the directory where your data is stored.

In [2]:
input_path = '/content/drive/MyDrive/nlp/shakespeare.txt'
output_path = '/content/drive/MyDrive/nlp/shakespeare_char'

In [3]:
import pickle
import numpy as np

In [4]:
from abc import ABC, abstractmethod

class Module(ABC):
    def __init__(self):
        self.training = True

    @abstractmethod
    def forward(self, x):
        pass

    @abstractmethod
    def backward(self, grad):
        pass

    def __call__(self, *args, **kwargs):
        return self.forward(*args, **kwargs)


In [5]:
#@title Activations

class Softmax(Module):
    def __init__(self):
        super().__init__()
        self.cache_output = None

    def forward(self, X):
        X_shifted = X - np.max(X, axis=-1, keepdims=True)  # (*, d) softmax trick for numerical stability
        exp_X = np.exp(X_shifted)  # (*, d)
        softmax_output = exp_X / np.sum(exp_X, axis=-1, keepdims=True)  # (*, d)
        self.cache_output = softmax_output
        return softmax_output

    def backward(self, dZ_or_Y_true, Y_true=None):
        if Y_true is not None:
            Y_hat = self.cache_output
            Y = np.zeros_like(Y_hat)
            Y[np.arange(Y_true.size), Y_true] = 1
            # CrossEntropy + Softmax derivative simplifies beautifully to:
            # Note: PyTorch's cross_entropy averages over batch, so we need to divide by batch size
            batch_size = Y_hat.shape[0]
            return (Y_hat - Y) / batch_size  # ∂(CE○Softmax)/∂x = (ŷ - y) / N  (see: https://www.parasdahal.com/softmax-crossentropy)
        else:
            # Otherwise, compute gradient using the full Softmax Jacobian to backpropagate through softmax outputs
            # (see: https://tombolton.io/2018/08/25/softmax-back-propagation-solved-i-think/)
            dZ = dZ_or_Y_true
            softmax_output = self.cache_output
            # Softmax Jacobian: ∂σ_i/∂x_j = σ_i(δ_ij - σ_j)
            return softmax_output * (dZ - np.sum(dZ * softmax_output, axis=-1, keepdims=True))  # ∂L/∂x_i = σ_i(∂L/∂σ_i - Σ_j ∂L/∂σ_j·σ_j)

    def params(self):
        return {}

    def grads(self):
        return {}


class ReLU(Module):
    def __init__(self):
        super().__init__()
        self.cache_input = None

    def forward(self, X):
        self.cache_input = X
        return np.maximum(0, X)

    def backward(self, dZ):
        X = self.cache_input
        # ReLU derivative: 1 if x > 0, else 0
        grad = (X > 0).astype(np.float32)  # ∂ReLU/∂x = 1 if x > 0, else 0
        return dZ * grad  # ∂L/∂x = ∂L/∂ReLU · ∂ReLU/∂x

    def params(self):
        return {}

    def grads(self):
        return {}


class LeakyReLU(Module):
    def __init__(self, alpha=0.01):
        super().__init__()
        self.alpha = alpha
        self.cache_input = None

    def forward(self, X):
        self.cache_input = X
        return np.where(X > 0, X, self.alpha * X)

    def backward(self, dZ):
        X = self.cache_input
        # LeakyReLU derivative: 1 if x > 0, else α
        grad = np.where(X > 0, 1.0, self.alpha)  # ∂LeakyReLU/∂x = 1 if x > 0, else α
        return dZ * grad  # ∂L/∂x = ∂L/∂LeakyReLU · ∂LeakyReLU/∂x

    def params(self):
        return {}

    def grads(self):
        return {}

In [26]:
#@title cross entropy loss
def cross_entropy_loss(logits, targets, eps=1e-7):
    N = targets.shape[0]

    # logits -> log_softmax via log-sum-exp trick
    max_logits = np.max(logits, axis=-1, keepdims=True)
    shifted = logits - max_logits
    log_softmax = shifted - np.log(np.sum(np.exp(shifted), axis=-1, keepdims=True) + eps)

    # negative log-likelihood: -log P(correct_class)
    nll = -log_softmax[np.arange(N), targets]

    return np.mean(nll)

#Tokenizer

In [6]:
#@title WordTokenizer

import re

class WordTokenizer:
    def __init__(self, min_freq=1, max_vocab_size=None):
        self.min_freq = min_freq
        self.max_vocab_size = max_vocab_size
        self.words = []
        self.stoi = {}
        self.itos = {}

    def build_vocab(self, text):
        # Replace special whitespace characters with tokens
        text = text.replace('\n', ' <|newline|> ')
        text = text.replace('\t', ' <|tab|> ')
        text = text.replace('\r', ' <|carriage_return|> ')

        # Tokenize text into:
        # 1. Special tokens like <|newline|>
        # 2. Words (alphanumeric/underscores)
        # 3. Punctuation or non-word chars
        tokens = re.findall(r"<\|[^|]+\|>|\b\w+\b|[^\s\w]", text)
        tokens = [
            token.lower() if not (token.startswith('<|') and token.endswith('|>')) else token
            for token in tokens
        ]

        # Count word frequencies
        word_counts = {}
        for token in tokens:
            word_counts[token] = word_counts.get(token, 0) + 1

        # Filter by min_freq
        filtered_words = [word for word, count in word_counts.items() if count >= self.min_freq]

        # Sort by frequency (desc) then alphabetically
        sorted_words = sorted(filtered_words, key=lambda x: (-word_counts[x], x))

        # Limit vocab size if needed (reserve 4 for special tokens)
        if self.max_vocab_size:
            sorted_words = sorted_words[:self.max_vocab_size - 4]

        # Final vocab list
        self.words = ['<pad>', '<unk>', '<bos>', '<eos>'] + sorted_words
        self.stoi = {word: i for i, word in enumerate(self.words)}
        self.itos = {i: word for i, word in enumerate(self.words)}

    def encode(self, text, add_bos=True, add_eos=True):
        # Apply same replacements
        text = text.replace('\n', ' <|newline|> ')
        text = text.replace('\t', ' <|tab|> ')
        text = text.replace('\r', ' <|carriage_return|> ')

        tokens = re.findall(r"<\|[^|]+\|>|\b\w+\b|[^\s\w]", text)
        tokens = [
            token.lower() if not (token.startswith('<|') and token.endswith('|>')) else token
            for token in tokens
        ]

        indices = [self.stoi.get(token, 1) for token in tokens]  # 1 = <unk>

        if add_bos:
            indices = [2] + indices  # <bos>
        if add_eos:
            indices = indices + [3]  # <eos>
        return indices

    def decode(self, tokens):
        words = [self.itos[t] for t in tokens if t in self.itos]

        # Remove special tokens
        for special in ['<bos>', '<eos>', '<pad>', '<unk>']:
            while special in words:
                words.remove(special)

        result = []
        for i, word in enumerate(words):
            if word == '<|newline|>':
                result.append('\n')
            elif word == '<|tab|>':
                result.append('\t')
            elif word == '<|carriage_return|>':
                result.append('\r')
            else:
                result.append(word)
                if i < len(words) - 1:
                    next_word = words[i + 1]
                    if not next_word.startswith('<|') and next_word not in '\'.,;!?)"':
                        result.append(' ')

        text = ''.join(result)
        text = re.sub(r' +', ' ', text)
        text = re.sub(r" ([\'.,;:!?)])", r'\1', text)
        text = re.sub(r"\b([A-Za-z]+) ?' ?(ll|re|ve|d|s|t|m)\b", r"\1'\2", text)
        text = re.sub(r"([a-z]) :", r'\1:', text)
        text = re.sub(r' *\n *', '\n', text)
        text = re.sub(r' *\t *', '\t', text)
        text = re.sub(r' *\r *', '\r', text)
        return text.strip()

    @property
    def vocab_size(self):
        return len(self.words)

    @property
    def eos_token_id(self):
        return 3

    @property
    def bos_token_id(self):
        return 2


In [49]:
#@title BPE Tokenizer
import re
from collections import Counter, defaultdict

class BPETokenizer:
    def __init__(self, vocab_size=1000):
        self.vocab_size = vocab_size
        self.stoi = {}
        self.itos = {}
        self.merges = []
        self.merge_ranks = {}

    def _get_word_freqs(self, texts):
        if isinstance(texts, str):
            texts = [texts]

        word_freqs = Counter()
        for text in texts:
            text = text.replace('\n', ' <|newline|> ')
            text = text.replace('\t', ' <|tab|> ')
            text = text.replace('\r', ' <|carriage_return|> ')

            # Tokenize text into:
            # 1. Special tokens with format <|...|>   -> <\|[^|]+\|>
            # 2. Words (alphanumerics/underscores)    -> \b\w+\b
            # 3. Punctuation/non-word characters      -> [^\s\w]
            tokens = re.findall(r"<\|[^|]+\|>|\b\w+\b|[^\s\w]", text)
            tokens = [token if not (token.startswith('<|') and token.endswith('|>')) else token for token in tokens]

            for token in tokens:
                word_freqs[token] += 1
        return word_freqs

    def _get_pairs(self, word):
        pairs = set()
        prev_char = word[0]
        for char in word[1:]:
            pairs.add((prev_char, char))
            prev_char = char
        return pairs

    def _merge_word(self, word_tokens, pair):
        new_tokens = []
        i = 0
        while i < len(word_tokens):
            # If we find the pair, merge it
            if i < len(word_tokens) - 1 and word_tokens[i] == pair[0] and word_tokens[i + 1] == pair[1]:
                new_tokens.append(pair[0] + pair[1])
                i += 2  # Skip both tokens
            else:
                new_tokens.append(word_tokens[i])
                i += 1
        return new_tokens

    def build_vocab(self, texts):
        word_freqs = self._get_word_freqs(texts)

        # Step 1: Initialize with character-level tokens
        vocab = {}
        base_chars = set()

        for word, freq in word_freqs.items():
            if word.startswith('<|') and word.endswith('|>'):
                # Special tokens stay as single units
                vocab[word + ' </w>'] = freq
                base_chars.add(word)
            else:
                # Split into characters + end-of-word marker
                char_list = list(word) + ['</w>']
                vocab[' '.join(char_list)] = freq
                base_chars.update(word)

        base_chars.add('</w>')

        # Step 2: Calculate how many merges we can do
        special_tokens = ['<pad>', '<unk>', '<bos>', '<eos>']
        base_vocab_size = len(special_tokens) + len(base_chars)
        available_for_merges = max(0, self.vocab_size - base_vocab_size)

        # Step 3: Iteratively find and apply merges
        for i in range(available_for_merges):
            pairs = defaultdict(int)
            # Count all adjacent pairs across all words
            for word, freq in vocab.items():
                word_pairs = self._get_pairs(word.split())
                for pair in word_pairs:
                    if not any(p.startswith('<|') and '|>' in p for p in pair):
                        pairs[pair] += freq

            if not pairs:
                break

            # Find most frequent pair, that will be merged in this iteration
            best_pair = max(pairs, key=pairs.get)

            # Update vocab with the merged pair
            new_vocab = {}
            for word, freq in vocab.items():
                word_tokens = word.split()
                merged_tokens = self._merge_word(word_tokens, best_pair)
                new_vocab[' '.join(merged_tokens)] = freq

            vocab = new_vocab

            self.merges.append(best_pair)
            self.merge_ranks[best_pair] = i  # Remember when this was learned for encoding and decoding

         # Step 4: Add special tokens, base chars to the vocab and pairs to the vocab
        all_tokens = special_tokens + sorted(list(base_chars))
        for pair in self.merges:
            all_tokens.append(''.join(pair))

        all_tokens = all_tokens[:self.vocab_size]

        self.stoi = {token: idx for idx, token in enumerate(all_tokens)}
        self.itos = {idx: token for idx, token in enumerate(all_tokens)}
        self.vocab_size = len(self.stoi)

    def _tokenize_word(self, word):
        if word.startswith('<|') and word.endswith('|>'):
            return [word, '</w>']

        word_tokens = list(word) + ['</w>']

        while len(word_tokens) > 1:
            pairs = self._get_pairs(word_tokens)
            if not pairs:
                break

            best_pair = min(pairs, key=lambda pair: self.merge_ranks.get(pair, float('inf')))
            if best_pair not in self.merge_ranks:
                break

            word_tokens = self._merge_word(word_tokens, best_pair)

        return word_tokens

    def encode(self, text, add_bos=False, add_eos=False):
        text = text.replace('\n', ' <|newline|> ')
        text = text.replace('\t', ' <|tab|> ')
        text = text.replace('\r', ' <|carriage_return|> ')

        tokens = re.findall(r"<\|[^|]+\|>|\b\w+\b|[^\s\w]", text)
        tokens = [token if not (token.startswith('<|') and token.endswith('|>')) else token for token in tokens]

        indices = []
        if add_bos:
            indices.append(2)  # <bos> index

        for token in tokens:
            word_tokens = self._tokenize_word(token)
            for word_token in word_tokens:
                indices.append(self.stoi.get(word_token, 1))  # 1 is <unk>

        if add_eos:
            indices.append(3)  # <eos> index

        return indices

    def decode(self, indices):
        tokens = [self.itos.get(idx, '<unk>') for idx in indices]

        # Remove special tokens
        for special in ['<bos>', '<eos>', '<pad>', '<unk>']:
            while special in tokens:
                tokens.remove(special)

        result = []

        for i, token in enumerate(tokens):
            if token == '<|newline|>':
                result.append('\n')
            elif token == '<|tab|>':
                result.append('\t')
            elif token == '<|carriage_return|>':
                result.append('\r')
            else:
                # Check if token ends with </w> (end of word marker)
                if token.endswith('</w>'):
                    word = token[:-len('</w>')]   # Remove '</w>'
                    result.append(word)

                    if i < len(tokens) - 1:
                        result.append(' ')
                else:
                    # it is a subword token
                    result.append(token)

        text = ''.join(result)
        text = re.sub(r' +', ' ', text)
        text = re.sub(r" ([\'.,;:!?)])", r'\1', text)
        text = re.sub(r"\b([A-Za-z]+) ?' ?(ll|re|ve|d|s|t|m)\b", r"\1'\2", text)
        text = re.sub(r"([a-z]) :", r'\1:', text)
        text = re.sub(r' *\n *', '\n', text)
        text = re.sub(r' *\t *', '\t', text)
        text = re.sub(r' *\r *', '\r', text)

        return text.strip()

    @property
    def eos_token_id(self):
        return 3

    @property
    def bos_token_id(self):
        return 2

#Embedding

In [7]:
class Embedding(Module):
    def __init__(self, vocab_size, embed_dim):
        super().__init__()
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim

        self.W = np.random.randn(vocab_size, embed_dim) * 0.02
        self.dW = None
        self.cache_input = None

    def forward(self, X):
        self.cache_input = X  # (B, T)
        out = self.W[X]  # (B, T, embed_dim)
        return out

    def backward(self, dZ):
        self.dW = np.zeros_like(self.W)
        X = self.cache_input
        np.add.at(self.dW, X.flatten(), dZ.reshape(-1, self.embed_dim))  # in place accumulation for efficiency

    def params(self):
        return {"W": self.W}

    def grads(self):
        return {"W": self.dW}

#Positional Encoding

learned positional encoding

In [8]:
class PositionalEncoding(Module):
    def __init__(self, max_len, d_model):
        super().__init__()
        self.max_len = max_len
        self.d_model = d_model

        self.W = np.random.randn(max_len, d_model) * 0.02
        self.dW = None
        self.cache_input = None

    def forward(self, X):
        self.cache_input = X
        B, T, C = X.shape

        pos_emb = self.W[:T, :]  # (T, C)
        out = X + pos_emb  # (B, T, C)
        return out

    def backward(self, dZ):
        B, T, C = dZ.shape
        self.dW = np.zeros_like(self.W)

        # out = X + W[:T, :], so ∂out/∂W[t] = 1 for every batch element at position t
        # ∂L/∂W[t] = sum over all batch gradients at position t
        self.dW[:T, :] = np.sum(dZ, axis=0)
        return dZ

    def params(self):
        return {"W": self.W}

    def grads(self):
        return {"W": self.dW}

#Layer Normalization

In [9]:
class LayerNorm(Module):
    def __init__(self, d_model, eps=1e-5):
        super().__init__()
        self.d_model = d_model
        self.eps = eps

        self.gamma = np.ones(d_model)
        self.beta = np.zeros(d_model)

        self.dgamma = None
        self.dbeta = None
        self.cache = {}

    def forward(self, X):
        mean = np.mean(X, axis=-1, keepdims=True)  # (B, T, 1)
        var = np.var(X, axis=-1, keepdims=True)  # (B, T, 1)

        X_norm = (X - mean) / np.sqrt(var + self.eps)  # z-score normalization

        out = self.gamma * X_norm + self.beta  # (B, T, C)

        self.cache = {
            'X': X,
            'mean': mean,
            'var': var,
            'X_norm': X_norm
        }

        return out

    def backward(self, dZ):
        X = self.cache['X']
        mean = self.cache['mean']
        var = self.cache['var']
        X_norm = self.cache['X_norm']

        N = X.shape[-1]

        # sum over batch and time dimensions
        self.dgamma = np.sum(dZ * X_norm, axis=(0, 1))  # mul -> ∂L/∂γ = Σ ∂L/∂y · x̂
        self.dbeta = np.sum(dZ, axis=(0, 1))  # sum -> ∂L/∂β = Σ ∂L/∂y

        dX_norm = dZ * self.gamma  # mul -> ∂L/∂x̂ = ∂L/∂y · γ

        # x̂ = (x-μ)/√(σ²+ε), so ∂x̂/∂σ² = (x-μ)·(-1/2)(σ²+ε)^(-3/2)
        dvar = np.sum(dX_norm * (X - mean), axis=-1, keepdims=True) * -1/2 * (var + self.eps)**(-3/2)

        # μ = (1/N)Σx, so ∂μ/∂x = 1/N
        # μ affects x̂ directly: x̂ = (x-μ)/√(σ²+ε), so ∂x̂/∂μ = -1/√(σ²+ε)
        # μ affects σ² indirectly: σ² = (1/N)Σ(x-μ)², so ∂σ²/∂μ = (1/N)Σ(-2(x-μ)) = -2(x-μ)
        dmean = np.sum(dX_norm, axis=-1, keepdims=True) * -1 / np.sqrt(var + self.eps) + \
            dvar * np.sum(-2 * (X - mean), axis=-1, keepdims=True) / N

        # x̂ = (x - μ) / √(σ² + ε), so ∂x̂/∂x = 1 / √(σ² + ε)
        # μ = (1/N) Σx, so ∂μ/∂x = 1/N
        # σ² = (1/N) Σ(x - μ)², so ∂σ²/∂x = 2(x - μ)/N
        # Total: ∂L/∂x = ∂L/∂x̂ · ∂x̂/∂x + ∂L/∂μ · ∂μ/∂x + ∂L/∂σ² · ∂σ²/∂x
        dX = dX_norm / np.sqrt(var + self.eps) + dmean / N + dvar * 2 * (X - mean) / N

        return dX

    def params(self):
        return {"gamma": self.gamma, "beta": self.beta}

    def grads(self):
        return {"gamma": self.dgamma, "beta": self.dbeta}

#Multi Head Attention

In [10]:
class MultiHeadAttention(Module):
    """
    Multi-Head Attention (Attention is All You Need <3): https://arxiv.org/abs/1706.03762
    """

    def __init__(self, d_model, n_heads):
        super().__init__()
        assert d_model % n_heads == 0
        self.d_model = d_model
        self.n_heads = n_heads
        self.d_k = d_model // n_heads

        self.W_q = Linear(d_model, d_model)
        self.W_k = Linear(d_model, d_model)
        self.W_v = Linear(d_model, d_model)
        self.W_o = Linear(d_model, d_model)
        self.softmax = Softmax()

        self.cache = {}

    def forward(self, X, mask=None):
        B, T, C = X.shape

        Q = self.W_q(X)  # (B, T, C)
        K = self.W_k(X)  # (B, T, C)
        V = self.W_v(X)  # (B, T, C)

        Q = Q.reshape(B, T, self.n_heads, self.d_k).swapaxes(1, 2)  # (B, nh, T, d_k)
        K = K.reshape(B, T, self.n_heads, self.d_k).swapaxes(1, 2)  # (B, nh, T, d_k)
        V = V.reshape(B, T, self.n_heads, self.d_k).swapaxes(1, 2)  # (B, nh, T, d_k)

        # scaled dot product
        scores = Q @ K.swapaxes(-2, -1) / np.sqrt(self.d_k)  # (B, nh, T, T)

        if mask is not None:
            scores = scores + mask  # (B, nh, T, T)

        original_shape = scores.shape
        scores_reshaped = scores.reshape(-1, scores.shape[-1])  # (B*nh*T, T)
        # scaled dot product with softmax
        attn_weights_reshaped = self.softmax(scores_reshaped)  # (B*nh*T, T)
        attn_weights = attn_weights_reshaped.reshape(original_shape)  # (B, nh, T, T)

        attn_output = attn_weights @ V  # (B, nh, T, d_k)

        attn_output = attn_output.swapaxes(1, 2).reshape(B, T, C)  # (B, T, C)
        output = self.W_o(attn_output)  # (B, T, C)

        self.cache = {
            'X': X, 'Q': Q, 'K': K, 'V': V,
            'attn_weights': attn_weights,
            'original_shape': original_shape
        }

        return output

    def backward(self, dZ):
        X, Q, K, V = self.cache['X'], self.cache['Q'], self.cache['K'], self.cache['V']
        attn_weights = self.cache['attn_weights']
        original_shape = self.cache['original_shape']
        B, T, C = X.shape

        dattn_output = self.W_o.backward(dZ)  # ∂L/∂attn_output
        dattn_output = dattn_output.reshape(B, T, self.n_heads, self.d_k).swapaxes(1, 2)  # (B, nh, T, d_k)

        # Attention output = attn_weights @ V
        # ∂L/∂attn_weights = ∂L/∂attn_output @ V^T
        # ∂L/∂V = attn_weights^T @ ∂L/∂attn_output
        dattn_weights = dattn_output @ V.swapaxes(-2, -1)  # ∂L/∂attn_weights
        dV = attn_weights.swapaxes(-2, -1) @ dattn_output  # ∂L/∂V

        dattn_weights_reshaped = dattn_weights.reshape(-1, dattn_weights.shape[-1])
        dscores_reshaped = self.softmax.backward(dattn_weights_reshaped)
        dscores = dscores_reshaped.reshape(original_shape)

        # scores = QK^T / √d_k
        # ∂L/∂Q = ∂L/∂scores @ K / √d_k
        # ∂L/∂K = Q^T @ ∂L/∂scores / √d_k
        dQ = dscores @ K / np.sqrt(self.d_k)  # ∂L/∂Q
        dK = Q.swapaxes(-2, -1) @ dscores / np.sqrt(self.d_k)  # ∂L/∂K

        dQ = dQ.swapaxes(1, 2).reshape(B, T, C)
        dK = dK.swapaxes(1, 2).reshape(B, T, C)
        dV = dV.swapaxes(1, 2).reshape(B, T, C)

        # X branches into Q, K, V - sum gradients from all paths
        dX_q = self.W_q.backward(dQ)  # ∂L/∂X via Q
        dX_k = self.W_k.backward(dK)  # ∂L/∂X via K
        dX_v = self.W_v.backward(dV)  # ∂L/∂X via V

        return dX_q + dX_k + dX_v  # ∂L/∂X = sum of all paths

    def params(self):
        params = {}
        params.update({f"W_q.{k}": v for k, v in self.W_q.params().items()})
        params.update({f"W_k.{k}": v for k, v in self.W_k.params().items()})
        params.update({f"W_v.{k}": v for k, v in self.W_v.params().items()})
        params.update({f"W_o.{k}": v for k, v in self.W_o.params().items()})
        return params

    def grads(self):
        grads = {}
        grads.update({f"W_q.{k}": v for k, v in self.W_q.grads().items()})
        grads.update({f"W_k.{k}": v for k, v in self.W_k.grads().items()})
        grads.update({f"W_v.{k}": v for k, v in self.W_v.grads().items()})
        grads.update({f"W_o.{k}": v for k, v in self.W_o.grads().items()})
        return grads



#Feed Forward

In [11]:
class FeedForward(Module):
    def __init__(self, d_model, d_ff):
        super().__init__()
        self.linear1 = Linear(d_model, d_ff)
        self.relu = ReLU()
        self.linear2 = Linear(d_ff, d_model)

    def forward(self, X):
        X = self.linear1(X)  # (B, T, d_ff)
        X = self.relu(X)  # (B, T, d_ff)
        X = self.linear2(X)  # (B, T, d_model)
        return X

    def backward(self, dZ):
        dZ = self.linear2.backward(dZ)
        dZ = self.relu.backward(dZ)
        dZ = self.linear1.backward(dZ)
        return dZ

    def params(self):
        params = {}
        params.update({f"linear1.{k}": v for k, v in self.linear1.params().items()})
        params.update({f"linear2.{k}": v for k, v in self.linear2.params().items()})
        return params

    def grads(self):
        grads = {}
        grads.update({f"linear1.{k}": v for k, v in self.linear1.grads().items()})
        grads.update({f"linear2.{k}": v for k, v in self.linear2.grads().items()})
        return grads

#Linear

In [24]:
class Linear(Module):
    def __init__(self, in_features, out_features):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        scale = np.sqrt(2.0 / in_features)

        self.W = np.random.randn(in_features, out_features) * scale
        self.b = np.zeros(out_features)

        self.dW = None
        self.db = None
        self.cache_input = None

    def forward(self, X):
        self.cache_input = X

        if X.ndim == 3:
            B, T, C = X.shape
            X_reshaped = X.reshape(-1, C)  # (B*T, C)
            out = X_reshaped @ self.W + self.b  # (B*T, out_features)
            out = out.reshape(B, T, self.out_features)  # (B, T, out_features)
            return out
        else:
            out = X @ self.W + self.b  # (B, out_features)
            return out

    def backward(self, dZ):
        X = self.cache_input

        if X.ndim == 3:
            B, T, C = X.shape
            X_reshaped = X.reshape(-1, C)
            dZ_reshaped = dZ.reshape(-1, self.out_features)

            # Y = XW + b, so ∂Y/∂W = X^T, ∂Y/∂b = I, ∂Y/∂X = W^T
            self.dW = X_reshaped.T @ dZ_reshaped  # ∂L/∂W = X^T @ ∂L/∂Y
            self.db = np.sum(dZ_reshaped, axis=0)  # ∂L/∂b = Σ ∂L/∂Y

            dX_reshaped = dZ_reshaped @ self.W.T  # ∂L/∂X = ∂L/∂Y @ W^T
            dX = dX_reshaped.reshape(B, T, C)
            return dX
        else:
            self.dW = X.T @ dZ  # ∂L/∂W = X^T @ ∂L/∂Y
            self.db = np.sum(dZ, axis=0)  # ∂L/∂b = Σ ∂L/∂Y
            return dZ @ self.W.T  # ∂L/∂X = ∂L/∂Y @ W^T

    def params(self):
        return {"W": self.W, "b": self.b}

    def grads(self):
        return {"W": self.dW, "b": self.db}

#Transformer Block

In [23]:
class TransformerBlock(Module):
    def __init__(self, d_model, n_heads, d_ff):
        super().__init__()
        self.attn = MultiHeadAttention(d_model, n_heads)
        self.ln1 = LayerNorm(d_model)
        self.ffn = FeedForward(d_model, d_ff)
        self.ln2 = LayerNorm(d_model)

    def forward(self, X, mask=None):
        ln1_out = self.ln1(X)  #Pre-Normalization
        attn_out = self.attn(ln1_out, mask)
        X = X + attn_out  #Residual Connection

        ln2_out = self.ln2(X)  #Pre-Normalization
        ffn_out = self.ffn(ln2_out)
        X = X + ffn_out  #Residual Connection

        return X

    def backward(self, dZ):
        # Residual: y = x + f(x), so ∂L/∂x = ∂L/∂y * I + ∂L/∂f(x) · ∂f/∂x
        # gradient flows through both paths
        dffn_out = dZ.copy()
        dX1 = dZ.copy()

        dffn_in = self.ffn.backward(dffn_out)
        dln2_out = dffn_in
        dX2 = self.ln2.backward(dln2_out)

        dX1 += dX2  # Sum both paths

        dattn_out = dX1.copy()
        dX3 = dX1.copy()

        dattn_in = self.attn.backward(dattn_out)
        dln1_out = dattn_in
        dX4 = self.ln1.backward(dln1_out)

        dX3 += dX4  # Sum both paths

        return dX3

    def params(self):
        params = {}
        params.update({f"attn.{k}": v for k, v in self.attn.params().items()})
        params.update({f"ln1.{k}": v for k, v in self.ln1.params().items()})
        params.update({f"ffn.{k}": v for k, v in self.ffn.params().items()})
        params.update({f"ln2.{k}": v for k, v in self.ln2.params().items()})
        return params

    def grads(self):
        grads = {}
        grads.update({f"attn.{k}": v for k, v in self.attn.grads().items()})
        grads.update({f"ln1.{k}": v for k, v in self.ln1.grads().items()})
        grads.update({f"ffn.{k}": v for k, v in self.ffn.grads().items()})
        grads.update({f"ln2.{k}": v for k, v in self.ln2.grads().items()})
        return grads

#MiniGPT

In [25]:
class miniGPT(Module):

    def __init__(self, vocab_size, max_len, d_model, n_heads, n_layers, d_ff):
        super().__init__()
        self.vocab_size = vocab_size
        self.max_len = max_len
        self.d_model = d_model

        self.tok_emb = Embedding(vocab_size, d_model)
        self.pos_emb = PositionalEncoding(max_len, d_model)
        self.blocks = [TransformerBlock(d_model, n_heads, d_ff) for _ in range(n_layers)]
        self.ln_f = LayerNorm(d_model)
        self.lm_head = Linear(d_model, vocab_size) #linear layer that as the language modeling head

        self.cache = {}

    def forward(self, X, targets=None):
        B, T = X.shape

        tok_emb = self.tok_emb(X)  # (B, T, C)
        X = self.pos_emb(tok_emb)  # (B, T, C)

        mask = self._create_causal_mask(T)  # (1, 1, T, T)

        for block in self.blocks:
            X = block(X, mask)  # (B, T, C)

        X = self.ln_f(X)  # (B, T, C)
        logits = self.lm_head(X)  # logits over vocab

        if targets is not None:
            loss = cross_entropy_loss(logits.reshape(-1, self.vocab_size), targets.reshape(-1))
            self.cache = {'logits': logits, 'targets': targets}
            return logits, loss

        return logits

    def backward(self):
        logits, targets = self.cache['logits'], self.cache['targets']
        B, T, C = logits.shape

        logits_reshaped = logits.reshape(-1, C)  # (B*T, C)
        targets_reshaped = targets.reshape(-1)  # (B*T)

        logits_max = np.max(logits_reshaped, axis=-1, keepdims=True)  # (B*T, 1)
        exp_logits = np.exp(logits_reshaped - logits_max)  # for numerically stable softmax
        probs = exp_logits / np.sum(exp_logits, axis=-1, keepdims=True)  # (B*T, C)

        # CE + softmax trick (https://stats.stackexchange.com/questions/235528/backpropagation-with-softmax-cross-entropy)
        dlogits = probs
        dlogits[np.arange(len(targets_reshaped)), targets_reshaped] -= 1  # d(CE)/d(logits) = softmax - one_hot
        dlogits /= len(targets_reshaped)  # average over batch and sequence length

        dlogits = dlogits.reshape(B, T, C)

        dX = self.lm_head.backward(dlogits)
        dX = self.ln_f.backward(dX)

        for block in reversed(self.blocks):
            dX = block.backward(dX)

        dX = self.pos_emb.backward(dX)
        self.tok_emb.backward(dX)

    def _create_causal_mask(self, T):
        mask = np.triu(np.ones((T, T)), k=1) * -1e9  # will add a large negative value to the scores of future tokens
        return mask[None, None, :, :]

    def generate(self, idx, max_new_tokens, temperature=1.0, eos_token_id=None):
        for _ in range(max_new_tokens):
            idx_cond = idx[:, -self.max_len:]
            logits = self(idx_cond)
            logits = logits[:, -1, :] / temperature

            logits_shifted = logits - np.max(logits, axis=-1, keepdims=True)  # for numerically stable softmax
            probs = np.exp(logits_shifted) / np.sum(np.exp(logits_shifted), axis=-1, keepdims=True)

            idx_next = np.array([[np.random.choice(self.vocab_size, p=probs[0])]])
            idx = np.concatenate([idx, idx_next], axis=1)

            if eos_token_id is not None and idx_next[0, 0] == eos_token_id:
                break

        return idx

    def params(self):
        params = {}
        params.update({f"tok_emb.{k}": v for k, v in self.tok_emb.params().items()})
        params.update({f"pos_emb.{k}": v for k, v in self.pos_emb.params().items()})

        for i, block in enumerate(self.blocks):
            params.update({f"blocks.{i}.{k}": v for k, v in block.params().items()})

        params.update({f"ln_f.{k}": v for k, v in self.ln_f.params().items()})
        params.update({f"lm_head.{k}": v for k, v in self.lm_head.params().items()})
        return params

    def grads(self):
        grads = {}
        grads.update({f"tok_emb.{k}": v for k, v in self.tok_emb.grads().items()})
        grads.update({f"pos_emb.{k}": v for k, v in self.pos_emb.grads().items()})

        for i, block in enumerate(self.blocks):
            grads.update({f"blocks.{i}.{k}": v for k, v in block.grads().items()})

        grads.update({f"ln_f.{k}": v for k, v in self.ln_f.grads().items()})
        grads.update({f"lm_head.{k}": v for k, v in self.lm_head.grads().items()})
        return grads

#Implementation

##Word Tokenizer

In [14]:
with open(input_path) as f:
    text = f.read()

tokenizer = WordTokenizer(min_freq=1, max_vocab_size=1000)
tokenizer.build_vocab(text)

encoded = tokenizer.encode(text)

split_idx = int(0.9 * len(encoded))
train_data = np.array(encoded[:split_idx], dtype=np.int32)
val_data = np.array(encoded[split_idx:], dtype=np.int32)

np.save(output_path+'_train.npy', train_data)
np.save(output_path+'_val.npy', val_data)

with open(output_path+'_tokenizer.pkl', 'wb') as f:
    pickle.dump(tokenizer, f)


In [15]:
train_data = np.load(output_path + '_train.npy')
val_data = np.load(output_path + '_val.npy')
with open(output_path + '_tokenizer.pkl', 'rb') as f:
    tokenizer = pickle.load(f)

def get_batch(data, block_size, batch_size):
    ix = np.random.randint(0, len(data) - block_size - 1, (batch_size,))
    x = np.stack([data[i:i+block_size] for i in ix])
    y = np.stack([data[i+1:i+block_size+1] for i in ix])
    return x, y

In [27]:
vocab_size = tokenizer.vocab_size
block_size = 64        # sequence length for each sample
batch_size = 32        # mini-batch size
d_model = 128
n_heads = 4
n_layers = 2
d_ff = 512
max_len = block_size   # maximum sequence length

In [61]:
model_word = miniGPT(
    vocab_size=vocab_size,
    max_len=max_len,
    d_model=d_model,
    n_heads=n_heads,
    n_layers=n_layers,
    d_ff=d_ff)

In [29]:
learning_rate = 1e-3
num_iters = 500  # how many mini-batches to run

In [62]:
for it in range(num_iters):
    # get a batch
    x, y = get_batch(train_data, block_size, batch_size)

    # forward pass
    logits, loss = model_word.forward(x, targets=y)

    # backward pass
    model_word.backward()

    # SGD update
    for name, param in model_word.params().items():
        grad = model_word.grads()[name]
        param -= learning_rate * grad  # simple SGD

    if it % 50 == 0:
        print(f"iter {it}, loss {loss:.4f}")

iter 0, loss 8.0009
iter 50, loss 7.2585
iter 100, loss 6.8730
iter 150, loss 6.6689
iter 200, loss 6.6148
iter 250, loss 6.6308
iter 300, loss 6.3698
iter 350, loss 6.5196
iter 400, loss 6.2249
iter 450, loss 6.4731


In [63]:
context = np.array([[tokenizer.encode("my dear")[0]]])  # initial token(s)
generated_ids = model_word.generate(context, max_new_tokens=20)
print(tokenizer.decode(generated_ids[0]))

banished make half keeper; liege patience with pray brief know kill 3 been speed, become consul thyself late


##BPE Tokenizer

In [50]:
with open(input_path) as f:
    text = f.read()

bpe_tokenizer = BPETokenizer(vocab_size=1000)  # adjust vocab size if needed
bpe_tokenizer.build_vocab(text)

bpe_encoded = bpe_tokenizer.encode(text)

split_idx = int(0.9 * len(bpe_encoded))
train_data = np.array(bpe_encoded[:split_idx], dtype=np.int32)
val_data = np.array(bpe_encoded[split_idx:], dtype=np.int32)

np.save(output_path + '_train_bpe.npy', train_data)
np.save(output_path + '_val_bpe.npy', val_data)

with open(output_path + '_tokenizer_bpe.pkl', 'wb') as f:
    pickle.dump(bpe_tokenizer, f)


In [51]:
train_data = np.load(output_path + '_train_bpe.npy')
val_data = np.load(output_path + '_val_bpe.npy')
with open(output_path + '_tokenizer_bpe.pkl', 'rb') as f:
    bpe_tokenizer = pickle.load(f)

In [52]:
vocab_size = bpe_tokenizer.vocab_size
block_size = 64
batch_size = 32
d_model = 128
n_heads = 4
n_layers = 2
d_ff = 512
max_len = block_size

In [65]:
model_bpe = miniGPT(
    vocab_size=vocab_size,
    max_len=max_len,
    d_model=d_model,
    n_heads=n_heads,
    n_layers=n_layers,
    d_ff=d_ff
)

In [66]:
learning_rate = 1e-3
num_iters = 500  # mini-batches

In [67]:
for it in range(num_iters):
    # get a batch
    x, y = get_batch(train_data, block_size, batch_size)

    # forward pass
    logits, loss = model_bpe.forward(x, targets=y)

    # backward pass
    model_bpe.backward()

    # SGD update
    for name, param in model_bpe.params().items():
        grad = model_bpe.grads()[name]
        param -= learning_rate * grad  # simple SGD

    if it % 50 == 0:
        print(f"iter {it}, loss {loss:.4f}")

iter 0, loss 7.8874
iter 50, loss 7.2187
iter 100, loss 6.9284
iter 150, loss 6.7447
iter 200, loss 6.4249
iter 250, loss 6.3946
iter 300, loss 6.3722
iter 350, loss 6.3591
iter 400, loss 6.2878
iter 450, loss 6.1805


In [71]:
context = np.array([[bpe_tokenizer.encode("the")[0]]])  # initial token(s)
generated_ids = model_bpe.generate(context, max_new_tokens=20, eos_token_id=bpe_tokenizer.eos_token_id)
print(bpe_tokenizer.decode(generated_ids[0]))

the: Then itizISABELL, LIan' IANrionce mber two dead GREDUKE chard you


#Testing

In [74]:
print("Running simple verification tests...")

#dummy
test_vocab_size = 50
test_block_size = 8
test_batch_size = 2
test_d_model = 16
test_n_heads = 2
test_d_ff = 32
test_n_layers = 1

#uji dimensi
print("\n--- Testing Model Output Dimensions ---")
try:
    # dummy model
    dummy_model = miniGPT(
        vocab_size=test_vocab_size,
        max_len=test_block_size,
        d_model=test_d_model,
        n_heads=test_n_heads,
        n_layers=test_n_layers,
        d_ff=test_d_ff
    )
    dummy_input = np.random.randint(0, test_vocab_size, (test_batch_size, test_block_size))

    # Lakukan forward pass
    logits = dummy_model(dummy_input)

    # Check dimension output
    expected_shape = (test_batch_size, test_block_size, test_vocab_size)
    assert logits.shape == expected_shape, f"Shape mismatch! Expected {expected_shape}, got {logits.shape}"
    print(f"Test Passed: Output shape is correct {logits.shape}.")

except Exception as e:
    print(f"Test Failed: {e}")


# test softmax properties
print("\n--- Testing Softmax Properties ---")
try:
    logits_last_step = logits[:, -1, :] # logits from last token

    # re-implement softmax for verification
    exp_logits = np.exp(logits_last_step - np.max(logits_last_step, axis=-1, keepdims=True))
    probs = exp_logits / np.sum(exp_logits, axis=-1, keepdims=True)

    # check every row
    assert np.allclose(np.sum(probs, axis=-1), 1.0), "Softmax sums are not close to 1."
    print("Test Passed: Probabilities sum to 1.")

    # check probability
    assert np.all(probs >= 0) and np.all(probs <= 1), "Probabilities are not between 0 and 1."
    print("Test Passed: All probability values are within the [0, 1] range.")

except Exception as e:
    print(f"Test Failed: {e}")


# causal mask
print("\n--- Testing Causal Masking ---")
try:
    # initiate mha
    mha = MultiHeadAttention(d_model=test_d_model, n_heads=test_n_heads)

    # dummy
    dummy_mha_input = np.random.randn(test_batch_size, test_block_size, test_d_model)

    # get Q, K, dan V
    Q = mha.W_q(dummy_mha_input).reshape(test_batch_size, test_block_size, test_n_heads, mha.d_k).swapaxes(1, 2)
    K = mha.W_k(dummy_mha_input).reshape(test_batch_size, test_block_size, test_n_heads, mha.d_k).swapaxes(1, 2)
    V = mha.W_v(dummy_mha_input).reshape(test_batch_size, test_block_size, test_n_heads, mha.d_k).swapaxes(1, 2)

    # calculate score
    scores = Q @ K.swapaxes(-2, -1) / np.sqrt(mha.d_k)

    # create and implement causal mask
    causal_mask = np.triu(np.ones((test_block_size, test_block_size)), k=1) * -1e9
    masked_scores = scores + causal_mask

    # softmax
    softmax_layer = Softmax()
    attn_weights = softmax_layer(masked_scores.reshape(-1, test_block_size)).reshape(scores.shape)

    # check future attention weight
    first_attn_matrix = attn_weights[0, 0, :, :]

    # get upper triangle
    future_attention_weights = np.triu(first_attn_matrix, k=1)

    assert np.allclose(future_attention_weights, 0), "Causal mask failed! Model is attending to future tokens."
    print("Test Passed: Causal mask correctly prevents attention to future tokens.")

except Exception as e:
    print(f"Test Failed: {e}")

print("\nAll simple tests completed.")

Running simple verification tests...

--- Testing Model Output Dimensions ---
Test Passed: Output shape is correct (2, 8, 50).

--- Testing Softmax Properties ---
Test Passed: Probabilities sum to 1.
Test Passed: All probability values are within the [0, 1] range.

--- Testing Causal Masking ---
Test Passed: Causal mask correctly prevents attention to future tokens.

All simple tests completed.


#Credits
1. [Attention is All You Need](https://arxiv.org/abs/1706.03762)
2. [numpyGPT](https://github.com/codiceSpaghetti/numpyGPT)
3. ChatGPT as my personal assistant who help me understand the attention mechanism :D and this assignment