# Mini Transformer Lab

在 Google Colab 中运行最小 Transformer 模型

**目标**:
1. 理解 Transformer 各模块的代码实现
2. 在 toy 数据集上完成训练
3. 使用训练好的模型进行文本生成

In [None]:
# 检查 GPU
!nvidia-smi

In [None]:
# 安装依赖
!pip install torch --index-url https://download.pytorch.org/whl/cu118

## 1. 实现各模块

In [None]:
import torch
import torch.nn as nn
import math

# Token Embedding
class TokenEmbedding(nn.Module):
    """Embedding table whose looked-up vectors are scaled by ``d_model ** 0.5``.

    Args:
        vocab_size: Number of rows in the embedding table.
        d_model: Width of each embedding vector.
    """

    def __init__(self, vocab_size, d_model):
        super().__init__()
        self.d_model = d_model
        self.embedding = nn.Embedding(vocab_size, d_model)

    def forward(self, x):
        """Return the embeddings of the ids in ``x``, multiplied by ``d_model ** 0.5``."""
        scale = self.d_model ** 0.5
        vectors = self.embedding(x)
        return vectors * scale

# Positional Encoding
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to a batch of embeddings.

    The table is computed once in ``__init__`` and registered as the buffer
    ``pe`` with shape ``(1, max_len, d_model)``. Even columns hold sines and
    odd columns hold cosines of ``position * div_term``.

    Args:
        d_model: Width of each encoding vector. Both even and odd values
            are supported.
        max_len: Number of positions precomputed; ``forward`` slices the
            table to ``x.size(1)`` positions, so inputs must not be longer.
    """

    def __init__(self, d_model, max_len=5000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer odd column than even
        # columns, so trim the angles to the number of odd columns. For an
        # even d_model the slice keeps every column.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        # A buffer follows the module across devices and is saved in the
        # state dict, but is not a trainable parameter.
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Return ``x`` plus the encodings for its first ``x.size(1)`` positions."""
        return x + self.pe[:, :x.size(1)]

print("✓ 模块定义完成")

In [None]:
# Multi-Head Attention
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product self-attention.

    Queries, keys and values are all projected from the same input, split
    into ``num_heads`` heads of width ``d_model // num_heads``, attended
    independently, then concatenated and projected by ``W_o``.

    Args:
        d_model: Width of the input and output vectors.
        num_heads: Number of attention heads; must divide ``d_model``.

    Raises:
        ValueError: If ``d_model`` is not divisible by ``num_heads``.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        # Raise instead of assert so the check survives `python -O`.
        if d_model % num_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by num_heads ({num_heads})"
            )
        self.d_k = d_model // num_heads
        self.num_heads = num_heads
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

    def forward(self, x, mask=None):
        """Attend over ``x`` and return ``(output, attention_weights)``.

        Args:
            x: Tensor of shape ``(batch, seq_len, d_model)``.
            mask: Optional tensor broadcastable to
                ``(batch, num_heads, seq_len, seq_len)``. Entries equal to 0
                mark query/key pairs that must not attend.

        Returns:
            A tuple of the projected output, shape
            ``(batch, seq_len, d_model)``, and the attention weights, shape
            ``(batch, num_heads, seq_len, seq_len)``. A query whose keys are
            all masked gets all-zero weights.
        """
        batch_size, seq_len, _ = x.shape
        # (batch, seq, d_model) -> (batch, heads, seq, d_k)
        Q = self.W_q(x).view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)
        K = self.W_k(x).view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)
        V = self.W_v(x).view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
        blocked = None
        if mask is not None:
            blocked = mask == 0
            scores = scores.masked_fill(blocked, float('-inf'))
        weights = torch.softmax(scores, dim=-1)
        if blocked is not None:
            # softmax over a row of only -inf yields NaN, which would spread
            # through W_o into the output. Give such rows zero attention.
            dead_rows = blocked.expand_as(scores).all(dim=-1, keepdim=True)
            weights = weights.masked_fill(dead_rows, 0.0)
        heads = torch.matmul(weights, V)
        # (batch, heads, seq, d_k) -> (batch, seq, heads * d_k)
        concat = heads.transpose(1, 2).contiguous().view(batch_size, seq_len, -1)
        return self.W_o(concat), weights

print("✓ Multi-Head Attention 完成")

In [None]:
# Transformer Block & Full Model
class TransformerBlock(nn.Module):
    """One transformer layer: self-attention then a feed-forward network.

    Each sub-layer receives a LayerNorm-ed copy of its input and its result
    is added back onto the un-normalised input (residual connection).

    Args:
        d_model: Width of the token vectors.
        num_heads: Number of attention heads.
        d_ff: Hidden width of the feed-forward network.
        dropout: Dropout probability applied at the end of the feed-forward
            network.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.attention = MultiHeadAttention(d_model, num_heads)
        self.norm1 = nn.LayerNorm(d_model)
        feed_forward_layers = [
            nn.Linear(d_model, d_ff),
            nn.GELU(),
            nn.Linear(d_ff, d_model),
            nn.Dropout(dropout),
        ]
        self.ffn = nn.Sequential(*feed_forward_layers)
        self.norm2 = nn.LayerNorm(d_model)

    def forward(self, x, mask=None):
        """Run both sub-layers on ``x``; ``mask`` is handed to the attention."""
        # The attention module also returns its weights; only the output is used.
        attended, _ = self.attention(self.norm1(x), mask)
        hidden = x + attended
        transformed = self.ffn(self.norm2(hidden))
        return hidden + transformed

class MiniTransformer(nn.Module):
    """Small decoder-style transformer that maps token ids to vocabulary logits.

    Args:
        vocab_size: Number of distinct token ids; also the width of the
            output logits.
        d_model: Width of the token vectors.
        num_heads: Attention heads per block.
        num_layers: Number of stacked ``TransformerBlock`` layers.
        d_ff: Hidden width of each block's feed-forward network.
        max_len: Number of positions the positional encoding precomputes.
        dropout: Dropout probability passed to every block.
    """

    def __init__(self, vocab_size, d_model=128, num_heads=4, num_layers=2, d_ff=512, max_len=256, dropout=0.1):
        super().__init__()
        self.token_embedding = TokenEmbedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len)
        self.blocks = nn.ModuleList([TransformerBlock(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.norm = nn.LayerNorm(d_model)
        self.output = nn.Linear(d_model, vocab_size)

    def forward(self, x, mask=None, causal=True):
        """Return logits of shape ``(batch, seq_len, vocab_size)`` for ids ``x``.

        Args:
            x: Integer tensor of token ids, shape ``(batch, seq_len)``.
            mask: Optional attention mask passed unchanged to every block;
                entries equal to 0 are blocked. When given, ``causal`` is
                ignored.
            causal: When ``mask`` is None, build a lower-triangular mask so
                position ``t`` attends only to positions ``<= t``. Pass
                ``False`` to get unmasked (bidirectional) attention.
        """
        if mask is None and causal:
            # Next-token training feeds targets that are the inputs shifted
            # by one; without this mask each position could read its own
            # target directly from the input.
            seq_len = x.size(1)
            mask = torch.tril(
                torch.ones(seq_len, seq_len, dtype=torch.bool, device=x.device)
            )
        x = self.token_embedding(x)
        x = self.pos_encoding(x)
        for block in self.blocks:
            x = block(x, mask)
        return self.output(self.norm(x))

print("✓ 完整模型定义完成")

## 2. 训练

In [None]:
# Prepare the data: build a character-level vocabulary from a short sample.
text = """To be, or not to be, that is the question:
Whether 'tis nobler in the mind to suffer
The slings and arrows of outrageous fortune,
Or to take arms against a sea of troubles
"""

# Sorted unique characters; a character's position in this list is its id.
chars = sorted(set(text))
vocab_size = len(chars)
idx_to_char = dict(enumerate(chars))
char_to_idx = {ch: pos for pos, ch in idx_to_char.items()}


def encode(text):
    """Return the vocabulary index of every character in ``text``."""
    return [char_to_idx[ch] for ch in text]


def decode(ids):
    """Return the string spelled by the vocabulary indices in ``ids``."""
    return ''.join(idx_to_char[i] for i in ids)


data = encode(text)
print(f"词表大小: {vocab_size}, Token 数量: {len(data)}")

In [None]:
# Train: fit the model repeatedly to one fixed window of the encoded text.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = MiniTransformer(vocab_size, d_model=64, num_heads=4, num_layers=2, d_ff=256).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4)
criterion = nn.CrossEntropyLoss()

seq_len = 32
# Targets are the inputs shifted one token to the left.
X = torch.tensor([data[:seq_len]], dtype=torch.long, device=device)
Y = torch.tensor([data[1:seq_len+1]], dtype=torch.long, device=device)

model.train()
initial_loss = None
for epoch in range(200):
    optimizer.zero_grad()
    logits = model(X)
    # Flatten (batch, seq, vocab) and (batch, seq) for the loss.
    loss = criterion(logits.view(-1, vocab_size), Y.view(-1))
    loss.backward()
    optimizer.step()
    if epoch == 0:
        # Remember the first loss so the overall drop can be reported.
        initial_loss = loss.item()
    step_number = epoch + 1
    if step_number % 50 == 0:
        print(f"Epoch {step_number}: Loss = {loss.item():.4f}")

final_loss = loss.item()
reduction = (initial_loss - final_loss) / initial_loss * 100
print(f"\nLoss 下降: {reduction:.1f}%")

## 3. 生成

In [None]:
# 生成文本
def generate(model, prompt, max_new_tokens=50, temperature=0.8, context_len=64):
    """Extend ``prompt`` by ``max_new_tokens`` tokens drawn from ``model``.

    Args:
        model: Module called with an id tensor of shape ``(1, seq)``; its
            result is indexed as ``[:, -1, :]`` to get next-token logits.
            It is switched to eval mode and left there.
        prompt: Non-empty seed text; every character must be encodable by
            ``encode``.
        max_new_tokens: Number of tokens to append.
        temperature: Divisor applied to the logits before sampling. A value
            of 0 selects the highest-logit token instead of sampling.
        context_len: Only the last ``context_len`` ids are fed to the model
            at each step.

    Returns:
        The decoded text: the prompt followed by the generated tokens.

    Raises:
        ValueError: If ``prompt`` is empty.
    """
    if not prompt:
        raise ValueError("prompt must contain at least one character")
    # Put the ids on the device the model's parameters live on, so the
    # function does not depend on a module-level device variable.
    model_device = next(model.parameters()).device
    model.eval()
    ids = torch.tensor([encode(prompt)], dtype=torch.long, device=model_device)
    with torch.no_grad():
        for _ in range(max_new_tokens):
            logits = model(ids[:, -context_len:])[:, -1, :]
            if temperature == 0:
                # Dividing by zero would produce inf/NaN logits; decode
                # greedily instead.
                next_id = logits.argmax(dim=-1, keepdim=True)
            else:
                probs = torch.softmax(logits / temperature, dim=-1)
                next_id = torch.multinomial(probs, 1)
            ids = torch.cat([ids, next_id], dim=1)
    return decode(ids[0].tolist())

# Sample 30 new tokens after a fixed prompt and report how many were added.
prompt = "To be"
generated = generate(model, prompt, max_new_tokens=30)
print(f"生成结果:\n{generated}")
print(f"\n生成 token 数: {len(generated) - len(prompt)}")