# Model

```{note}
本节我们以 Qwen3 模型架构为参考，自己实现一个 LLM 模型。
```

## 总体结构

之前我们实现了一个简单的 `SimpleNextTokenModel`，用于预测下一个 token，但它的结构非常简单甚至退化成了 1-gram model，即 $P(y_{t+1} | y_1, y_2, \dots, y_t) = P(y_{t+1}|y_{t})$。现在我们不来虚的，直接实现 Qwen3 模型的架构。

```{figure} llm.drawio.svg
```

## RMSNorm

### RMSNorm vs LayerNorm

RMSNorm 是 LayerNorm 的一种简化变体。

核心区别：
LayerNorm 会先减去均值，再除以标准差。
RMSNorm 省略了“减去均值”这一步，直接除以均方根（Root Mean Square）。

公式：
$$ \bar{a}_i = \frac{a_i}{\text{RMS}(a)} g_i, \quad \text{其中} \quad \text{RMS}(a) = \sqrt{\frac{1}{n} \sum_{i=1}^{n} a_i^2} $$

为什么用它？
1. 计算更省：少算一个均值，速度更快。
2. 效果相当：研究发现 LayerNorm 起作用的主要原因是缩放，平移其实没那么重要。

### Pre-Norm vs Post-Norm

这是指 Normalization 层放置的位置不同。

1. Post-Norm：$x_{t+1} = \text{Norm}(x_t + F(x_t))$，归一化在残差连接之后。

2. Pre-Norm：$x_{t+1} = x_t + F(\text{Norm}(x_t))$，归一化在进入子层之前，Add 是干净的直接相加。

为什么现在的 LLM 普遍用 Pre-Norm？主要是为了**训练稳定性**。

在 Pre-Norm 中，梯度有一条“高速公路”（即残差连接的 $x_t$ 部分），可以直接从最后一层无损地传到第一层。这使得深层网络非常容易收敛，不需要太多复杂的调参技巧就能训得很稳。

In [6]:
import math
import torch
import torch.nn as nn
from typing import Optional


class RMSNorm(nn.Module):
    """
    RMSNorm：仅通过均方根（Root Mean Square）进行归一化，不减均值。

    归一化方式：
        y = x / sqrt(mean(x^2) + eps) * weight

    形状约定：
        输入 x 形状为 (..., dim)，在最后一维 dim 上归一化；weight 的形状为 [dim]，按最后一维广播。
    """
    def __init__(self, dim, eps=1e-6):
        super().__init__()
        self.eps = eps
        self.weight = nn.Parameter(torch.ones(dim))
    
    def _norm(self, x):
        # root mean square
        ms = x.pow(2).mean(-1, keepdim=True)
        return x * torch.rsqrt(ms + self.eps)

    def forward(self, x):
        # x shape: (batch_size, seq_len, dim) 
        output = self._norm(x.float()).type_as(x)
        return output * self.weight

## RoPE

```{tip}
RoPE 是一种相对位置编码方法。它不直接把位置信息加到 Embedding 上，而是通过旋转 Query 和 Key 向量的角度来注入位置信息，让注意力会直接依赖“相对距离”。
```

### 1. 2D Rotation

RoPE 将向量每两个维度分为一组（视为复平面上的一个点），对于第 $i$ 组分量 $(x_{2i}, x_{2i+1})$ 在位置 $m$ 处，应用以下二维旋转矩阵 $\mathbf{R}_{m, i}$：

$$
\mathbf{R}_{m, i} = \begin{pmatrix}
\cos(m \theta_i) & -\sin(m \theta_i) \\
\sin(m \theta_i) & \cos(m \theta_i)
\end{pmatrix}
$$

其中 $\theta_i$ 是频率参数，通常定义为：
$$
\theta_i = 10000^{-2i/d}, \quad i \in \{0, 1, \dots, d/2-1\}
$$

### 2. Full Rotation Matrix

对于维度为 $d$ 的向量 $\mathbf{x}$，在位置 $m$ 处的完整旋转矩阵 $\mathcal{R}_{\Theta, m}$ 是一个稀疏的块对角矩阵 (Block Diagonal Matrix)：

$$
\mathcal{R}_{\Theta, m} = \begin{pmatrix}
\mathbf{R}_{m, 0} & \mathbf{0} & \cdots & \mathbf{0} \\
\mathbf{0} & \mathbf{R}_{m, 1} & \cdots & \mathbf{0} \\
\vdots & \vdots & \ddots & \vdots \\
\mathbf{0} & \mathbf{0} & \cdots & \mathbf{R}_{m, d/2-1}
\end{pmatrix}
$$

展开来看就是：

$$
\mathcal{R}_{\Theta, m} = \begin{pmatrix}
\cos m\theta_0 & -\sin m\theta_0 & 0 & 0 & \cdots \\
\sin m\theta_0 & \cos m\theta_0 & 0 & 0 & \cdots \\
0 & 0 & \cos m\theta_1 & -\sin m\theta_1 & \cdots \\
0 & 0 & \sin m\theta_1 & \cos m\theta_1 & \cdots \\
\vdots & \vdots & \vdots & \vdots & \ddots
\end{pmatrix}
$$

### 3. 应用于 Query 和 Key

对于位置 $m$ 的 Query 向量 $\mathbf{q}$ 和位置 $n$ 的 Key 向量 $\mathbf{k}$，应用旋转：

$$
\mathbf{q}'_m = \mathcal{R}_{\Theta, m} \mathbf{q}, \quad \mathbf{k}'_n = \mathcal{R}_{\Theta, n} \mathbf{k}
$$

### 4. 相对位置性质

RoPE 的核心优势在于点积（Attention Score）只依赖于相对位置 $(n-m)$：

$$
\begin{aligned}
(\mathbf{q}'_m)^T \mathbf{k}'_n &= (\mathcal{R}_{\Theta, m} \mathbf{q})^T (\mathcal{R}_{\Theta, n} \mathbf{k}) \\
&= \mathbf{q}^T \mathcal{R}_{\Theta, m}^T \mathcal{R}_{\Theta, n} \mathbf{k} \\
&= \mathbf{q}^T \mathcal{R}_{\Theta, n-m} \mathbf{k}
\end{aligned}
$$

这是因为旋转矩阵是正交矩阵，且 $\mathcal{R}_{\Theta, m}^T = \mathcal{R}_{\Theta, -m}$，所以 $\mathcal{R}_{\Theta, -m} \mathcal{R}_{\Theta, n} = \mathcal{R}_{\Theta, n-m}$。

### 5. 高效计算

原版 RoPE 是将相邻的两个数 $(x_{2i}, x_{2i+1})$ 作为一个复数对进行旋转。而现在许多模型的实现是将向量劈成两半，将 $(x_i, x_{i + d/2})$ 作为一个复数对进行旋转。它们在数学上是完全等效的，区别仅仅是元素的排列顺序不同：
- 原版：[实1, 虚1, 实2, 虚2, ...]
- 现版：[实1, 实2, ..., 虚1, 虚2, ...]

现版把输入向量 $x$ 切分成前后两半：$x_{left}$ 和 $x_{right}$。
- `rotate_half(x)` 变成了 $[-x_{right}, x_{left}]$
- `cos` 是 $[C, C]$，其中 $C = [\cos(m\Theta_0), \dots, \cos(m\Theta_{d/2-1})]$
- `sin` 是 $[S, S]$, 其中 $S = [\sin(m\Theta_0), \dots, \sin(m\Theta_{d/2-1})]$

代入公式 `(x * cos) + (rotate_half(x) * sin)`，我们可以分别看前半部分和后半部分的结果：

前半部分：
$$ \text{Out}_{left} = x_{left} \cdot C + (-x_{right}) \cdot S $$

后半部分：
$$ \text{Out}_{right} = x_{right} \cdot C + (x_{left}) \cdot S $$

把它们合起来看

$$ \text{Out} = [x_{left} \cdot C + (-x_{right}) \cdot S, x_{right} \cdot C + (x_{left}) \cdot S] $$

```{tip}
`x[..., :half]` 和 `x[..., half:]` 都是巨大的、连续的内存块，这种连续的内存读写能最大化 GPU 的显存带宽利用率。
```

In [None]:
def apply_rope_emb(
    x: torch.Tensor,
    cos: torch.Tensor,
    sin: torch.Tensor,
) -> torch.Tensor:
    # 将输入向量 x 在最后一个维度切分成两半 (前半部分 x1, 后半部分 x2)
    x1 = x[..., : x.shape[-1] // 2]
    x2 = x[..., x.shape[-1] // 2 :]
    
    # 应用旋转公式，(x_{i}, x_{i+d/2}) 是一个复数：
    # new_x1 = x1 * cos - x2 * sin
    # new_x2 = x2 * cos + x1 * sin
    y1 = x1 * cos - x2 * sin
    y2 = x2 * cos + x1 * sin
    
    return torch.cat((y1, y2), dim=-1).to(x.dtype)


class RotaryEmbedding(nn.Module):

    def __init__(
        self,
        head_size: int,
        max_position_embeddings: int,
        base: float,
    ) -> None:
        super().__init__()
        # theta_j
        inv_freq = 1.0 / (base**(torch.arange(0, head_size, 2, dtype=torch.float) / head_size))

        t = torch.arange(max_position_embeddings, dtype=torch.float)
        
        # m * theta_j
        freqs = torch.outer(t, inv_freq)
        
        # 计算 cos 和 sin
        cos = freqs.cos()
        sin = freqs.sin()
        
        # 预先拼接 cos 和 sin，存入缓存
        # unsqueeze(1) 是为了增加一个维度 [max_pos, 1, dim]，方便后续对齐 head 维度进行广播
        cache = torch.cat((cos, sin), dim=-1).unsqueeze_(1)
        
        # 注册为 buffer：它是模型状态的一部分，但不参与梯度更新
        # persistent=False：通常表示保存 checkpoint 时不一定要存它（因为可以算出来），减小模型体积
        self.register_buffer("cos_sin_cache", cache, persistent=False)

    # @torch.compile 是 PyTorch 2.0 的核心特性，用于算子融合图编译加速
    @torch.compile
    def forward(
        self,
        positions: torch.Tensor
    ) -> tuple[torch.Tensor, torch.Tensor]:
        # 根据输入的位置 ID，从缓存中“查表”取出对应的 cos/sin
        # positions: [batch, seq_len]
        # 取出后 cos_sin: [batch, seq_len, 1, dim]
        cos_sin = self.cos_sin_cache[positions]
        
        # 将取出的缓存再次切分回 cos 和 sin 部分
        cos, sin = cos_sin.chunk(2, dim=-1)
        # 调整形状以支持广播: [batch, seq_len, 1, dim/2] -> [batch, 1, seq_len, dim/2]
        cos = cos.transpose(1, 2)
        sin = sin.transpose(1, 2)
        
        return cos, sin

## Multi-Head Attention
```{tip}
普通的 Attention 就像一个人读文章，一次只能关注一个重点。
Multi-Head Attention 就像是把文章分给好几个人（Head）同时读，每个人关注不同的侧重点。比如有的关注语法结构，有的关注词义关联，有的关注上下文指代。最后把大家看到的信息汇总（Concat）起来，理解就更全面。
```

1. 单个 Head 的计算：假设输入是 $Q, K, V$。每个 Head 都有自己独立的投影矩阵 $W^Q_i, W^K_i, W^V_i$。

    $$ \text{Attention}(Q, K, V) = \text{softmax}\left(\frac{QK^T}{\sqrt{d_k}}\right)V $$

    这里除以 $\sqrt{d_k}$ 是为了防止点积结果过大导致梯度消失。

2. 多头整合：把每个 Head 的结果拼接起来，再经过一个线性变换 $W^O$ 输出。

    $$ \text{MultiHead}(Q, K, V) = \text{Concat}(\text{head}_1, \dots, \text{head}_h)W^O $$

    其中每个 $\text{head}_i$ 是：

    $$ \text{head}_i = \text{Attention}(QW_i^Q, KW_i^K, VW_i^V) $$

In [None]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, n_head, dropout=0.0):
        super().__init__()
        assert d_model % n_head == 0, "d_model 必须能整除 n_head"
        self.d_model = d_model
        self.n_head = n_head
        self.head_dim = d_model // n_head
        assert self.head_dim % 2 == 0, "head_dim 必须为偶数以支持 RoPE"
        # nn.Linear 类似于 nn.Parameter(torch.empty(d_model, n_head * head_dim))
        # 然后 nn.init.kaiming_uniform_ 初始化
        self.wq = nn.Linear(d_model, n_head * head_dim, bias=False)
        self.wk = nn.Linear(d_model, n_head * head_dim, bias=False)
        self.wv = nn.Linear(d_model, n_head * head_dim, bias=False)
        self.wo = nn.Linear(n_head * head_dim, d_model, bias=False)
        # 在 Q 和 K 上额外加 RMSNorm
        self.q_norm = RMSNorm(head_dim)
        self.k_norm = RMSNorm(head_dim)
        self.attn_dropout = dropout

    def forward(self, x: torch.Tensor, position_embeddings: torch.Tensor, attention_mask: torch.Tensor) -> torch.Tensor:
        """
        支持掩码的多头注意力层
        """
        # x shape: [b, t, d_model]
        q = self.wq(x)  # [b, t, n_head * head_dim]
        k = self.wk(x)
        v = self.wv(x)
        # 分头，需先 reshape 再 transpose -> [b, n_head, t, head_dim]
        q = self.q_norm(q.reshape(q.shape[0], q.shape[1], self.n_head, self.head_dim)).transpose(1, 2)
        k = self.k_norm(k.reshape(k.shape[0], k.shape[1], self.n_head, self.head_dim)).transpose(1, 2)
        v = v.reshape(v.shape[0], v.shape[1], self.n_head, self.head_dim).transpose(1, 2)
        # RoPE 嵌入
        cos, sin = position_embeddings
        q = apply_rope_emb(q, cos, sin)
        k = apply_rope_emb(k, cos, sin)

        # 多头注意力 [b, n_head, t, t]
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.head_dim)
        if attention_mask is not None:
            scores = scores + attention_mask
        # 转化为 float32 以提高计算精度
        attn_weights = nn.functional.softmax(scores, dim=-1, dtype=torch.float32).to(q.dtype)
        # Dropout: 在 softmax 之后，乘 v 之前, 训练时生效
        attn_weights = nn.functional.dropout(attn_weights, p=self.attn_dropout, training=self.training)
        
        # 合并头 [b, n_head, t, head_dim] -> [b, t, n_head * head_dim]
        o = torch.matmul(attn_weights, v).transpose(1, 2).reshape(o.shape[0], o.shape[1], -1)
        return self.wo(o)

## FFN

现代 LLM（如 LLaMA, Qwen, Mistral）普遍采用的是 **SwiGLU** 结构的 FFN，而不是传统的两层 MLP。

**1. 结构解析 (Gate, Up, Down)**

传统的 MLP 只有两层线性变换：先放大 (Up)，激活，再缩小 (Down)。
公式：output = Down(Act(Up(x)))

现在的 Gated MLP (SwiGLU) 有三层线性变换：
- gate_proj：门控投影，负责计算“通过率”。
- up_proj：信号投影，负责提取特征。
- down_proj：输出投影，负责映射回原来的维度。

核心公式：
$$ \text{FFN}(x) = \text{down\_proj}(\text{SiLU}(\text{gate\_proj}(x)) \odot \text{up\_proj}(x)) $$

这里 $\odot$ 代表逐元素相乘。
直观理解是：`gate_proj` 经过激活后，像一个阀门一样，控制 `up_proj` 提取的信息有多少能流向下一层。这种“门控”机制让模型能更灵活地选择性保留信息。

**2. 为什么用 SiLU (Swish) 激活函数？**

SiLU 的公式是 $f(x) = x \cdot \sigma(x)$，其中 $\sigma$ 是 Sigmoid 函数。

相比于 ReLU，它的优势在于：
- 平滑性：SiLU 是一条光滑的曲线，处处可导，这有利于梯度的稳定传播。
- 非单调性：在 x 为负值时，SiLU 允许少量的负输出（有一个小的波谷），而不是像 ReLU 一样直接砍成 0。这种特性被证明能帮助深层网络更好地学习复杂的特征。

总结来说，SwiGLU 结构配合 SiLU 激活函数，虽然比传统 MLP 多了一个矩阵运算，但在同等参数量下能带来更好的模型性能（Perplexity 更低，收敛更快）。

In [15]:
class FeedForward(nn.Module):
    def __init__(self, d_model, d_hidden):
        super().__init__()
        self.gate_proj = nn.Linear(d_model, d_hidden, bias=False)
        self.up_proj = nn.Linear(d_model, d_hidden, bias=False)
        self.down_proj = nn.Linear(d_hidden, d_model, bias=False)

    def forward(self, x: torch.Tensor):
        # [b, t, d_model] -> [b, t, d_hidden] -> [b, t, d_model]
        return self.down_proj(nn.functional.silu(self.gate_proj(x)) * self.up_proj(x))

## 整个模型

```{note}
现在我们已经拥有所有必要的的组件了，我们可以把它们组合起来，构建一个完整的模型。
```

In [19]:
class TransformerBlock(nn.Module):
    def __init__(self, d_model, n_head, d_hidden, dropout=0.0):
        super().__init__()
        self.attn = MultiHeadAttention(d_model, n_head, dropout)
        self.ffn = FeedForward(d_model, d_hidden)
        self.input_norm = RMSNorm(d_model)
        self.post_attention_norm = RMSNorm(d_model)

    def forward(self, x: torch.Tensor, position_embeddings: torch.Tensor, attention_mask: torch.Tensor) -> torch.Tensor:
        """
        支持掩码的 Transformer 层
        """
        residual = x
        x = self.input_norm(x)
        # self attention
        x = self.attn(
            x=x,
            position_embeddings=position_embeddings,
            attention_mask=attention_mask,
        )
        x = x + residual

        # fully connected
        residual = x
        x = self.post_attention_norm(x)
        x = self.ffn(x)
        x = x + residual
        return x


class TransformerModel(nn.Module):
    def __init__(self, d_model, n_head, d_hidden, n_layer, vocab_size, max_seq_len=8192, rope_theta=10000.0, dropout=0.0):
        super().__init__()
        self.embeddings = nn.Embedding(vocab_size, d_model)
        self.layers = nn.ModuleList([TransformerBlock(d_model, n_head, d_hidden, dropout) for _ in range(n_layer)])
        self.norm = RMSNorm(d_model)
        self.output = nn.Linear(d_model, vocab_size, bias=False)
        # RoPE 频率按 head_dim 计算，而不是 d_model
        self.rotary_emb = RotaryEmbedding(d_model // n_head, max_seq_len, rope_theta)

    def forward(self, input_ids: torch.Tensor, position_ids: torch.Tensor) -> torch.Tensor:
        """
        Causal Transformer 模型
        """
        # 词嵌入
        x = self.embeddings(input_ids)

        # 因果三角掩码（加性）：未来位置设为 -inf，其余为 0
        # torch.triu 生成上三角矩阵，diagonal=1 表示主对角线以下为 0
        t = input_ids.shape[1]
        attention_mask = torch.triu(torch.full((t, t), float('-inf'), device=input_ids.device), diagonal=1).view(1, 1, t, t)

        # 层循环
        position_embeddings = self.rotary_emb(position_ids)
        for layer in self.layers:
            x = layer(x=x, position_embeddings=position_embeddings, attention_mask=attention_mask)
        # 输出层
        return self.output(self.norm(x))