# Step 1 - Tokenization

In [3]:
import torch 
import torch.nn as nn
import torch.optim as optim
import math

In [4]:
def tokenize(text, vocab):
    return [vocab.get(word, vocab["<UNK>"]) for word in text.split()]

# text.split - splits the sentences into words
# vocab - creates a dictionary where each word is assigned a number for uniqueness
# vocab.get - if there is no number assigned, it returns as unknown

# Step 2 - Embedding Layers

In [5]:
class Embedding(nn.Module):
    def __init__(self, vocab_size, embedding_dim):
        super(Embedding, self).__init__()
        self.embedding = nn.Embedding(vocab_size,embedding_dim)

    def forward(self, x):
        return self.embedding(x)
    
# we need to convert 0 and 1 that we categorized the above words into vectors to actually extract meanings
# nn.Embedding(vocab_size,embedding_dim) - this code creates a table where each word ID ( the unique number we assigned ) maps to a vector
# embedding_dim - defines the length of each word 
# For Example, if we have words like happy and joy which have similar meaning, they would be assigned similar vectors for the model to understand.

# Step 3 - Positional Encoding

In [6]:
class PositionalEncoding(nn.Module):
    def __init__(self, embedding_dim, max_seq_len = 5000):
        super(PositionalEncoding, self).__init__()
        self.embedding_dim = embedding_dim
        pe = torch.zeros(max_seq_len, embedding_dim)
        position = torch.arange(0,max_seq_len,dtype= torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0,embedding_dim, 2).float() * (-math.log(10000.0) / embedding_dim))
        pe[:,0::2] = torch.sin(position * div_term)
        pe[:,1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0,1)
        self.register_buffer('pe',pe)

    def forward(self, x):
        return x + self.pe[:x.size(0), :]
    
# the whole point of positional encoding is that the models don't really understand the order hence we need to encode it
# embedding_dim - it matches the vector size from the embedding layer
# we introduce math(cos and sin) - to create patterns i.e., word 1 would get one pattern and word 2 would get another.

# Step 4 - Self Attention

In [28]:
class SelfAttention(nn.Module):
    def __init__(self, embedding_dim):
        super(SelfAttention, self).__init__()
        self.query = nn.Linear(embedding_dim,embedding_dim)
        self.key = nn.Linear(embedding_dim, embedding_dim)
        self.value = nn.Linear(embedding_dim,embedding_dim)

    def forward(self, x):
        queries = self.query(x)
        keys = self.key(x)
        values = self.value(x)
        d_k = queries.size(-1) 
        scores = torch.bmm(queries, keys.transpose(1,2)) / torch.tensor(d_k, dtype=torch.float32, device=queries.device).sqrt()
        attention_weights = torch.softmax(scores, dim = -1)
        attended_values = torch.bmm(attention_weights, values)
        return attended_values
    
# Self attention mainly helps the model focuses on important words
# if there is an exmaple of - 'The cat is sat on the mat'
# The model would be able to understand that sat relates to cat more than mat

# query, key and value - they are mainly 3 transformations of the input vectors. 
# query - what do i care about; key - what is avaliable; value - what do i take
# scores - it meaures how much each word relates to other word based on the tokenization(vector formation) that we performed earlier.
# attention_weights - it then turns the scores into probabilities ( telling us which word has more probability than the other.)

# Step 5 - Transformer Block

In [29]:
class TransformerBlock(nn.Module):
    def __init__(self, embedding_dim, hidden_dim):
        super(TransformerBlock, self).__init__()
        self.attention = SelfAttention(embedding_dim)
        self.feed_forward = nn.Sequential(
            nn.Linear(embedding_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, embedding_dim)
        )
        self.norm1 = nn.LayerNorm(embedding_dim)
        self.norm2 = nn.LayerNorm(embedding_dim)

    def forward(self, x):
        attended = self.attention(x)
        x = self.norm1(x + attended)
        forwarded = self.feed_forward(x)
        x = self.norm2(x + forwarded)
        return x
    
# Once we created the self attention - just one layer isn't enough
# Hence, we create a neural network for better efficiency.
# attention is what we created earlier using self-attention mechanism
# feed_forwrd is a small neural network where all these words are processed for better understanding or guessing of the next words with better probabilities.
# norm1, norm2 - normalizes the numbers so that they are on the same scale.
# x+attended -  residual connection

# Step 6 - Full Language Model

In [30]:
class SimpleLLM(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers):
        super(SimpleLLM, self).__init__()
        self.embedding = Embedding(vocab_size, embedding_dim)
        self.positional_encoding = PositionalEncoding(embedding_dim)
        self.transformer_blocks = nn.Sequential(*[TransformerBlock(embedding_dim, hidden_dim) for _ in range(num_layers)])
        self.output = nn.Linear(embedding_dim, vocab_size)

    def forward(self, x):
        x = self.embedding(x)
        x = x.transpose(0, 1) # Transpose for positional encoding
        x = self.positional_encoding(x)
        x = x.transpose(0, 1) # Transpose back
        x = self.transformer_blocks(x)
        x = self.output(x)
        return x
    
# we created a full system incorporating everything we did 
# num_layers - provides layers to the model, more layers gives more intensive thinking -> better outputs
# output - turns back the final vectors back into words for final answer to human

# Training the model

In [31]:
vocab = {"hello": 0, "world": 1, "how": 2, "are": 3, "you": 4, "<UNK>": 5}
vocab_size = len(vocab)
embedding_dim = 16
hidden_dim = 32
num_layers = 2

model = SimpleLLM(vocab_size, embedding_dim, hidden_dim, num_layers)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

data = ["hello world how are you", "how are you hello world"]
tokenized_data = [tokenize(sentence, vocab) for sentence in data]

for epoch in range(100):
    for sentence in tokenized_data:
        for i in range(1, len(sentence)):
            input_seq = torch.tensor(sentence[:i]).unsqueeze(0)
            target = torch.tensor(sentence[i]).unsqueeze(0)
            optimizer.zero_grad()
            output = model(input_seq)
            loss = criterion(output[:, -1, :], target)
            loss.backward()
            optimizer.step()
    if epoch % 10 == 0:
        print(f"Epoch {epoch}, Loss: {loss.item()}")

Epoch 0, Loss: 1.9223988056182861
Epoch 10, Loss: 0.4437415897846222
Epoch 20, Loss: 0.19538508355617523
Epoch 30, Loss: 0.11235272139310837
Epoch 40, Loss: 0.07333862036466599
Epoch 50, Loss: 0.05164773762226105
Epoch 60, Loss: 0.03821663558483124
Epoch 70, Loss: 0.02933884970843792
Epoch 80, Loss: 0.023173348978161812
Epoch 90, Loss: 0.01872985064983368


In [32]:
input_text = "hello world how"
input_tokens = tokenize(input_text, vocab)
input_tensor = torch.tensor(input_tokens).unsqueeze(0)
output = model(input_tensor)
predicted_token = torch.argmax(output[:, -1, :]).item()
print(f"Input: {input_text}, Predicted: {list(vocab.keys())[list(vocab.values()).index(predicted_token)]}")

Input: hello world how, Predicted: are


# Better Example

In [48]:
import math
import torch
import torch.nn as nn
import torch.nn.functional as F

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.manual_seed(42)

context_len = 128
d_model = 256
n_layers = 4
n_heads = 4
d_ff = 4 * d_model
dropout = 0.1
lr = 3e-4
train_steps = 1000
eval_every = 100

In [56]:
# Data Preparation
corpus = """
hello world. how are you? i am building a tiny transformer from scratch.
this is small but it learns patterns. hello world! transformers attend to tokens.
"""

vocab = sorted(set(corpus))
stoi = {ch: i for i, ch in enumerate(vocab)}
itos = {i: ch for ch, i in stoi.items()}
vocab_size = len(vocab)

def encode(s):
    return torch.tensor([stoi[c] for c in s], dtype=torch.long)

def decode(t):
    return "".join(itos[int(i)] for i in t)

data = encode(corpus)
n = int(0.9 * len(data))
train_data = data[:n]
val_data = data[n:]

min_len = min(len(train_data), len(val_data))
context_len = min(context_len, max(8, min_len - 2))
vocab_size = len(vocab)

In [57]:
# Batch Splitting
def get_batch(split, batch_size=32):
    src = train_data if split == "train" else val_data
    T = min(context_len, len(src) - 1)
    high = len(src) - T - 1
    if high <= 0:
        x = src[:T].unsqueeze(0).repeat(batch_size, 1)
        y = src[1:T+1].unsqueeze(0).repeat(batch_size, 1)
        return x.to(device), y.to(device)
    ix = torch.randint(0, high, (batch_size,))
    x = torch.stack([src[i:i+T] for i in ix])
    y = torch.stack([src[i+1:i+1+T] for i in ix])
    return x.to(device), y.to(device)

In [58]:
# Building Model
class CausalSelfAttention(nn.Module):
    def __init__(self, d_model, n_heads, dropout=0.1):
        super().__init__()
        assert d_model % n_heads == 0
        self.nh = n_heads
        self.dk = d_model // n_heads
        self.qkv = nn.Linear(d_model, 3 * d_model, bias=False)
        self.proj = nn.Linear(d_model, d_model, bias=False)
        self.attn_drop = nn.Dropout(dropout)
        self.resid_drop = nn.Dropout(dropout)

    def forward(self, x):
        B, T, C = x.shape
        qkv = self.qkv(x).view(B, T, 3, self.nh, self.dk).transpose(1, 3)
        q, k, v = qkv[:, :, 0], qkv[:, :, 1], qkv[:, :, 2]
        att = (q @ k.transpose(-2, -1)) / math.sqrt(self.dk)
        mask = torch.ones(T, T, device=x.device, dtype=torch.bool).tril_()
        att = att.masked_fill(~mask, float("-inf"))
        att = F.softmax(att, dim=-1)
        att = self.attn_drop(att)
        y = att @ v
        y = y.transpose(1, 2).contiguous().view(B, T, C)
        y = self.resid_drop(self.proj(y))
        return y

In [59]:
# GPT Model
class GPTMini(nn.Module):
    def __init__(self, vocab_size, d_model, n_layers, n_heads, d_ff, context_len, dropout):
        super().__init__()
        self.tok = nn.Embedding(vocab_size, d_model)
        self.pos = nn.Embedding(context_len, d_model)
        self.blocks = nn.ModuleList([
            Block(d_model, n_heads, d_ff, dropout) for _ in range(n_layers)
        ])
        self.ln_f = nn.LayerNorm(d_model)
        self.head = nn.Linear(d_model, vocab_size, bias=False)
        self.context_len = context_len

    def forward(self, idx):
        B, T = idx.shape
        pos = torch.arange(T, device=idx.device).unsqueeze(0)
        x = self.tok(idx) + self.pos(pos)
        for blk in self.blocks:
            x = blk(x)
        x = self.ln_f(x)
        logits = self.head(x)
        return logits

    @torch.no_grad()
    def generate(self, idx, max_new_tokens=100, temperature=1.0, top_k=None, top_p=None):
        self.eval()
        for _ in range(max_new_tokens):
            idx_cond = idx[:, -self.context_len:]
            logits = self(idx_cond)[:, -1, :] / temperature
            if top_k is not None:
                v, _ = torch.topk(logits, top_k)
                logits[logits < v[:, [-1]]] = -float('inf')
            probs = F.softmax(logits, dim=-1)
            next_tok = torch.multinomial(probs, num_samples=1)
            idx = torch.cat([idx, next_tok], dim=1)
        return idx

In [60]:
# Demo - GPT
class CausalSelfAttention(nn.Module):
    def __init__(self, d_model, n_heads, dropout=0.1):
        super().__init__()
        assert d_model % n_heads == 0
        self.nh = n_heads
        self.dk = d_model // n_heads
        self.qkv = nn.Linear(d_model, 3 * d_model, bias=False)
        self.proj = nn.Linear(d_model, d_model, bias=False)
        self.attn_drop = nn.Dropout(dropout)
        self.resid_drop = nn.Dropout(dropout)

    def forward(self, x):
        B, T, C = x.shape
        qkv = self.qkv(x).view(B, T, 3, self.nh, self.dk).transpose(1, 3)
        q, k, v = qkv[:, :, 0], qkv[:, :, 1], qkv[:, :, 2]
        att = (q @ k.transpose(-2, -1)) / math.sqrt(self.dk)
        mask = torch.ones(T, T, device=x.device, dtype=torch.bool).tril_()
        att = att.masked_fill(~mask, float("-inf"))
        att = F.softmax(att, dim=-1)
        att = self.attn_drop(att)
        y = att @ v
        y = y.transpose(1, 2).contiguous().view(B, T, C)
        y = self.resid_drop(self.proj(y))
        return y

In [61]:
class CausalSelfAttention(nn.Module):
    def __init__(self, d_model, n_heads, dropout=0.1):
        super().__init__()
        assert d_model % n_heads == 0
        self.nh = n_heads
        self.dk = d_model // n_heads
        self.qkv = nn.Linear(d_model, 3 * d_model, bias=False)
        self.proj = nn.Linear(d_model, d_model, bias=False)
        self.attn_drop = nn.Dropout(dropout)
        self.resid_drop = nn.Dropout(dropout)

    def forward(self, x):
        B, T, C = x.shape
        qkv = self.qkv(x).view(B, T, 3, self.nh, self.dk).transpose(1, 3)
        q, k, v = qkv[:, :, 0], qkv[:, :, 1], qkv[:, :, 2]
        att = (q @ k.transpose(-2, -1)) / math.sqrt(self.dk)
        mask = torch.ones(T, T, device=x.device, dtype=torch.bool).tril_()
        att = att.masked_fill(~mask, float("-inf"))
        att = F.softmax(att, dim=-1)
        att = self.attn_drop(att)
        y = att @ v
        y = y.transpose(1, 2).contiguous().view(B, T, C)
        y = self.resid_drop(self.proj(y))
        return y

In [62]:
model = GPTMini(vocab_size, d_model, n_layers, n_heads, d_ff, context_len, dropout).to(device)
opt = torch.optim.AdamW(model.parameters(), lr=lr, betas=(0.9, 0.95), weight_decay=0.1)
sched = torch.optim.lr_scheduler.CosineAnnealingLR(opt, T_max=train_steps) 


# Training
for step in range(1, train_steps + 1):
    x, y = get_batch("train")
    logits = model(x)
    loss = F.cross_entropy(logits.view(-1, vocab_size), y.view(-1))
    opt.zero_grad(set_to_none=True)
    loss.backward()
    torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
    opt.step()
    sched.step()

    if step % eval_every == 0 or step == 1:
        with torch.no_grad():
            vx, vy = get_batch("val")
            v_logits = model(vx)
            v_loss = F.cross_entropy(v_logits.view(-1, vocab_size), vy.view(-1))
        print(f"Step {step:4d} | Train {loss.item():.3f} | Val {v_loss.item():.3f}")

Step    1 | Train 3.463 | Val 3.429
Step  100 | Train 0.195 | Val 5.425
Step  200 | Train 0.164 | Val 6.319
Step  300 | Train 0.175 | Val 6.484
Step  400 | Train 0.143 | Val 6.591
Step  500 | Train 0.119 | Val 6.625
Step  600 | Train 0.145 | Val 6.766
Step  700 | Train 0.150 | Val 6.980
Step  800 | Train 0.154 | Val 6.977
Step  900 | Train 0.148 | Val 7.081
Step 1000 | Train 0.135 | Val 7.024


In [67]:
start = "hello"
start_ids = encode(start).unsqueeze(0).to(device)
out = model.generate(start_ids, max_new_tokens=120, temperature=0.9, top_k=20)
print(decode(out[0].tolist()))

hello world! transformers arns patterns. hello world! transformers arns patterns. hello world! transformers ame building a ti
