### 全局配置

In [1]:
from dataclasses import dataclass
from typing import Optional
import math
import torch
import torch.nn as nn
import torch.nn.functional as F

# 全局配置类
@dataclass
class ModelArgs:
    dim: int = 4096  # llama嵌入维度为4096
    n_layers: int = 32
    n_heads: int = 32 # Q的头数
    n_kv_heads: Optional[int] = None # K,V的头数 使用Group Multiple Query
    vocab_size: int = -1
    multiple_of: int = 256
    ffn_dim_multiplier: Optional[float] = None
    norm_eps: float = 1e-5

    # KV cache变量
    max_batch_size: int = 32
    max_seq_len: int = 2048

    device: str = None

## RoPE
### 计算旋转矩阵 $R_{\Theta, m}^d$
实际上需要进行计算简化，因此函数计算得到的是一个复数矩阵

<img src="imgs/complexMat.png" alt="mT@thetha" style="width:30%; height:auto;" />


In [None]:
def precompute_theta_pos_frequencies(head_dim: int,seq_len: int,device: str, theta: float = 10000.0):
    assert head_dim % 2 == 0, "单头嵌入维度必须能被2整除"

    # (Head_dim / 2)  [0,2,4,6,...Head_dim - 2]
    theta_numerator = torch.arange(0, head_dim, 2).float()

    # (Head_dim / 2)  θi公式
    theta = 1.0 / (theta ** (theta_numerator / head_dim)).to(device)

    # (Seq_len)       [0,1,2,3,4,....,Seq_len - 1]
    m = torch.arange(seq_len,device=device)

    # m.T @ theta (注意仅为示例,m与θ为1D向量)
    # (Seq_len,1) @ (1,Head_dim / 2) => (Seq_len, Head_dim / 2)
    freqs = torch.outer(m,theta).float()

    # 转换为极坐标形式 c = R * exp(m * theta), R = 1 
    # torch.polar(abs = 1,angle = freqs)
    freqs_complex = torch.polar(torch.ones_like(freqs), freqs)
    
    return freqs_complex


### 计算旋转过程$R_{\Theta, m}^d x$
简化旋转矩阵过程后的最终形式:

<img src="imgs/R.png" alt="Rx" style="width:30%; height:auto;" />

推导:

<img src="imgs/rotate.png" alt="RxGet" style="width:30%; height:auto;" />


In [None]:
def apply_rotary_embeddings(x: torch.Tensor, freqs_complex: torch.Tensor, device: str):

    # STEP1,2: 每两个合并为1个复数
    # (B, Seq_len , H, Head_dim) -> (B, Seq_len, H, Head_dim/2, 2) 
    # -> 合并复数后: (B, Seq_len , H, Head_dim)
    x_complex = torch.view_as_complex(x.float().reshape(*x.shape[:-1], -1, 2))

    # 确保freqs_complex维度匹配
    # (Seq_len, Head_dim/2)    ->  (1, Seq_len, Head_dim/2)
    # (1, Seq_len, Head_dim/2) ->  (1, Seq_len, 1, Head_dim/2)
    freqs_complex = freqs_complex.unsqueeze(0).unsqueeze(2)

    # STEP3 相乘
    # (B, Seq_len, H, Head_dim/2) * (1, Seq_len, 1, Head_dim/2)
    # => (B, Seq_len, H, Head_dim/2)
    x_rotated = x_complex * freqs_complex

    # STEP4 转换为数对
    # (B, Seq_len, H, Head_dim/2) -> (B, Seq_len, H, Head_dim/2, 2)
    x_out = torch.view_as_real(x_rotated)

    # STEP5 Flatten
    # (B, Seq_len, H, Head_dim/2, 2) -> (B, Seq_len, H, Head_dim/2) 
    x_out = x_out.reshape(*x.shape)

    return x_out.type_as(x).to(device)

### 总体模型结构
<img src="imgs/arch.PNG" alt="model arch" style="width:30%; height:auto;" />

In [2]:
class Transformer(nn.Module):

    def __init__(self,args: ModelArgs):
        super().__init__()

        assert args.vocab_size != -1, "未设置词表大小"

        self.args = args
        self.vocab_size = args.vocab_size
        self.n_layers = args.n_layers
        # 嵌入层
        self.tok_embeddings = nn.Embedding(self.vocab_size,args.dim)

        # N层堆叠的encoder块
        self.layers = nn.ModuleList()
        for layer_id in range(args.n_layers):
            self.layers.append(EncoderBlock(args))

        self.norm = RMSNorm(args.dim,eps = args.norm_eps)
        
        self.output = nn.Linear(args.dim, self.vocab_size, bias=False)

        # RoPE位置编码
        self.freqs_complex = precompute_theta_pos_frequencies(
            self.args.dim // self.args.n_heads,
            self.args.max_seq_len * 2,
            device = self.args.device
        )

    def forward(self, tokens: torch.Tensor, start_pos: int):
        # KV-cache仅限推理!

        # (B, Seq_len)
        batch_size, seq_len = tokens.shape
        assert seq_len == 1, "KV缓存,Q仅为每次更新的一个token 一次处理一个token!"

        # (B, Seq_len) -> (B, Seq_len , Dim)
        h = self.tok_embeddings(tokens)

        # RoPE编码
        freq_complex = self.freqs_complex[start_pos:start_pos + seq_len]

        for layer in self.layers:
            h = layer(h,start_pos,freq_complex)
        
        # RMSNorm
        h = self.norm(h)

        # Linear
        output = self.output(h).float()

        # Softmax 在 loss 中
        return output



你给的这个公式正是 **RoPE（Rotary Position Embedding）中定义的旋转变换矩阵 $R_{\Theta, m}^d$** 的向量化形式。

---

## 1. 它是什么

* $R_{\Theta, m}^d x$ 表示将一个 d 维向量 $x$ **按照 RoPE 的规则进行位置相关的旋转**。
* 旋转的方式是：

  * 把向量分成 $(x_1, x_2)$, $(x_3, x_4)$, … 这样的二维对
  * 每个二维对按照不同频率 $\theta_i$ 做二维旋转
* 公式里：

  * $\cos m\theta_k$ 和 $\sin m\theta_k$ 是旋转角度的 cos、sin 分量
  * $\otimes$ 是逐元素相乘（Hadamard product）
  * 第一部分是向量与 cos 部分逐元素相乘
  * 第二部分是向量与 sin 部分逐元素相乘，但注意它的排列是做二维旋转需要的交换（即 $x, y \to -y, x$）

换句话说，这个 $R_{\Theta, m}^d$ 就是把 Q 或 K **在每个二维子空间上绕原点旋转 m 倍频率的角度**。

---

## 2. 为什么这么写

普通二维旋转矩阵：

$$
\begin{bmatrix}
\cos\theta & -\sin\theta \\
\sin\theta & \cos\theta
\end{bmatrix}
\begin{bmatrix}
x \\
y
\end{bmatrix}
$$

作用到每个 $(x_{2i-1}, x_{2i})$ 对上，就是 RoPE 的核心。

这个公式把所有二维对合并成一个向量化写法：

* **第一项**：$x$ 与 cos 部分逐元素相乘
* **第二项**：交换奇偶位置（并在偶数位取负），再与 sin 部分逐元素相乘，然后相加

这样能一次性计算整个向量的旋转效果，方便在矩阵/张量实现中批处理。

---

## 3. 在 RoPE 中的意义

在 RoPE 里：

$$
Q' = R_{\Theta, pos} Q,\quad K' = R_{\Theta, pos} K
$$

其中：

* $\theta_k = 10000^{-2(k-1)/d}$（频率随维度变化）
* $m$ 是位置（pos），也就是旋转角度与 token 位置绑定
* 这样一来，Q 与 K 的点积：

$$
Q'_{pos} \cdot K'_{pos'} \quad \Rightarrow \quad \text{只与 } (pos - pos') \text{相关}
$$

→ 这就是 RoPE 的相对位置编码特性。

---

如果你愿意，我可以帮你画一个**二维平面上向量旋转的直观动画**，让你看出这个 $R$ 是怎么把向量“转”起来的。
这样你一眼就能看懂 RoPE 的公式含义。你要我画吗？
