<a href="https://colab.research.google.com/github/sharathkramadas/aws-codepipeline/blob/master/tiny_transformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# mini_transformer.py
import torch
import torch.nn as nn
import torch.optim as optim
import math

# =========================
# Positional Encoding
# =========================
class PositionalEncoding(nn.Module):
    """Add a fixed sinusoidal position table to a batch of embeddings.

    The table has shape ``(1, max_len, d_model)``: even feature columns hold
    ``sin(pos * freq)`` and odd feature columns hold ``cos(pos * freq)``.

    Args:
        d_model: Size of the last (feature) dimension of the inputs.
        max_len: Number of positions pre-computed in the table; inputs whose
            dimension 1 is longer than this cannot be broadcast against it.
    """

    def __init__(self, d_model, max_len=5000):
        super().__init__()
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        angles = position * div_term

        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer odd column than even column,
        # so trim the angles to the number of odd columns actually present.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Register as a buffer so the table follows the module across
        # .to()/.cuda()/.double() calls. It is non-persistent so state_dict
        # keys stay the same as when ``pe`` was a plain attribute.
        self.register_buffer("pe", pe.unsqueeze(0), persistent=False)

    def forward(self, x):
        """Return ``x`` plus the first ``x.size(1)`` rows of the position table."""
        return x + self.pe[:, :x.size(1)]

# =========================
# Multi-Head Attention
# =========================
class MultiHeadAttention(nn.Module):
    """Self-attention over ``x`` using ``num_heads`` scaled dot-product heads.

    Queries, keys and values are all linear projections of the same input.
    Each projection is split into ``num_heads`` chunks of width
    ``d_model // num_heads``; the per-head results are concatenated again and
    passed through a final linear layer.

    Args:
        d_model: Feature width of the input and output.
        num_heads: Number of heads; must divide ``d_model`` evenly.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0
        self.d_k = d_model // num_heads
        self.num_heads = num_heads

        # Creation order is kept (q, k, v, out) so parameter ordering and
        # random initialisation are unchanged.
        self.q_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.out = nn.Linear(d_model, d_model)

    def _split_heads(self, projected, batch_size):
        """Reshape to ``(batch_size, -1, num_heads, d_k)`` and swap dims 1 and 2."""
        per_head = projected.view(batch_size, -1, self.num_heads, self.d_k)
        return per_head.transpose(1, 2)

    def forward(self, x):
        """Apply self-attention to ``x`` and return the output projection."""
        n = x.size(0)

        queries = self._split_heads(self.q_linear(x), n)
        keys = self._split_heads(self.k_linear(x), n)
        values = self._split_heads(self.v_linear(x), n)

        # Scaled dot-product attention, normalised over the key dimension.
        logits = (queries @ keys.transpose(-2, -1)) / math.sqrt(self.d_k)
        attention = logits.softmax(dim=-1)
        context = attention @ values

        # Move the head dimension back next to the features and merge them.
        merged = context.transpose(1, 2).contiguous()
        merged = merged.view(n, -1, self.num_heads * self.d_k)
        return self.out(merged)

# =========================
# Feed-Forward Layer
# =========================
class FeedForward(nn.Module):
    """Two linear layers with a ReLU in between: ``d_model -> d_ff -> d_model``.

    Args:
        d_model: Feature width of the input and output.
        d_ff: Width of the hidden layer.
    """

    def __init__(self, d_model, d_ff):
        super().__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        """Expand ``x`` to ``d_ff``, apply ReLU, and project back to ``d_model``."""
        hidden = self.fc1(x)
        activated = hidden.relu()
        return self.fc2(activated)

# =========================
# Encoder Layer
# =========================
class EncoderLayer(nn.Module):
    """One encoder block: self-attention then feed-forward.

    Each sub-layer's output is added to its own input and the sum is passed
    through a ``LayerNorm``.

    Args:
        d_model: Feature width shared by every sub-module.
        num_heads: Number of attention heads.
        d_ff: Hidden width of the feed-forward sub-layer.
    """

    def __init__(self, d_model, num_heads, d_ff):
        super().__init__()
        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

    def forward(self, x):
        """Run ``x`` through the attention and feed-forward sub-layers."""
        attended = self.mha(x)
        after_attention = self.norm1(x + attended)

        transformed = self.ffn(after_attention)
        return self.norm2(after_attention + transformed)

# =========================
# Mini Transformer Encoder
# =========================
class MiniTransformer(nn.Module):
    """Small encoder-only transformer that emits per-position vocabulary logits.

    Pipeline: token embedding -> positional encoding -> ``num_layers``
    encoder layers -> linear projection to ``vocab_size``.

    Args:
        vocab_size: Number of distinct token ids (also the output width).
        d_model: Embedding / hidden width.
        num_heads: Attention heads per encoder layer.
        d_ff: Hidden width of each feed-forward sub-layer.
        max_len: Number of positions passed to the positional encoding.
        num_layers: Number of stacked encoder layers.
    """

    def __init__(self, vocab_size, d_model=64, num_heads=4, d_ff=128, max_len=50, num_layers=2):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len)

        encoder_stack = [
            EncoderLayer(d_model, num_heads, d_ff)
            for _ in range(num_layers)
        ]
        self.layers = nn.ModuleList(encoder_stack)

        self.fc_out = nn.Linear(d_model, vocab_size)

    def forward(self, x):
        """Map token ids ``x`` to logits over the vocabulary at every position."""
        hidden = self.pos_encoding(self.embedding(x))
        for encoder in self.layers:
            hidden = encoder(hidden)
        return self.fc_out(hidden)

# =========================
# Tiny Training Example
# =========================
def train_tiny_model():
    """Train a MiniTransformer on a toy "add one, modulo vocab" task and demo it.

    Every step draws a fresh random batch of token ids; the target at each
    position is ``(token + 1) % vocab_size``. The loss is printed every 20
    steps, and afterwards the model's predictions for one fixed sequence are
    printed.
    """
    # Hyper-parameters of the toy experiment (tokens are the digits 0-9).
    vocab_size = 10
    seq_length = 5
    batch_size = 32
    num_epochs = 200
    lr = 0.01

    def generate_batch(batch_size, seq_length):
        """Return random token ids and their element-wise ``+1 mod vocab`` targets."""
        tokens = torch.randint(0, vocab_size, (batch_size, seq_length))
        targets = (tokens + 1) % vocab_size
        return tokens, targets

    # The model is built before any batch is drawn, so the order of random
    # number consumption is unchanged.
    model = MiniTransformer(vocab_size)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    for epoch in range(num_epochs):
        inputs, targets = generate_batch(batch_size, seq_length)

        optimizer.zero_grad()
        logits = model(inputs)
        # Flatten batch and sequence dimensions so each position is one sample.
        flat_logits = logits.reshape(-1, vocab_size)
        flat_targets = targets.reshape(-1)
        loss = criterion(flat_logits, flat_targets)
        loss.backward()
        optimizer.step()

        is_report_step = (epoch + 1) % 20 == 0
        if is_report_step:
            print(f"Epoch {epoch+1}, Loss: {loss.item():.4f}")

    # Test generation
    test_seq = torch.tensor([[5, 0, 7, 8, 9]])
    with torch.no_grad():
        pred_seq = model(test_seq).argmax(dim=-1)
    print("Input sequence: ", test_seq)
    print("Predicted next: ", pred_seq)

# =========================
# Run training if file executed
# =========================
if __name__ == "__main__":
    # Only start the toy training run when this module is the entry point,
    # so importing it just defines the classes and function above.
    train_tiny_model()



Epoch 20, Loss: 0.0034
Epoch 40, Loss: 0.0007
Epoch 60, Loss: 0.0004
Epoch 80, Loss: 0.0003
Epoch 100, Loss: 0.0003
Epoch 120, Loss: 0.0002
Epoch 140, Loss: 0.0002
Epoch 160, Loss: 0.0002
Epoch 180, Loss: 0.0001
Epoch 200, Loss: 0.0001
Input sequence:  tensor([[5, 0, 7, 8, 9]])
Predicted next:  tensor([[6, 1, 8, 9, 0]])
