# 自注意力机制相关知识点

### 一、一些疑问：

1、为什么要做QKV线性变换：  
为了让模型学会不同类型的信息表示，通过不同的变换，我们可以让查询、键、值专注于各自最有用的信息。

2、为什么要除以sqrt(d_k):  
防⽌Softmax函数的梯度消失  
点积的数值大小问题：维度d_k越大，点积的值波动越大，绝对值也越大。  
softmax问题：当输入的绝对值很大时，softmax 的输出会趋近于 0 或 1（即进入“饱和区”），此时梯度非常小，导致反向传播时梯度消失，模型难以有效学习。  
缩放的作用：控制方差，除以sqrt(d_K)的依据是原始QK方差为d_k，而QK/sqrt(d_K)的方差是1，使得softmax的输入保持在一个合理的范围内。  

3、为什么要用softmax：    
将相似度分数转化为概率分布  
归一化：Softmax确保所有注意力权重之和为1，这样输出就是输入的加权平均。   
非负性：Softmax输出都是正数，符合"注意力程度"的直观理解。  
可解释性：输出值可以直接解释为"关注程度"。  
可微性：Softmax是可导的，便于反向传播训。  

4、为什么要使用多头注意力机制：  
与其使用一个头，不如使用多个头并行的关注不同的信息子空间。    
线性投影：将输入分别投影到 个不同的子空间  
并行计算：在每个子空间中独立计算注意力  
拼接输出：将所有头的输出拼接起来  
最终线性变换：将拼接结果映射到期望的输出维度  
优势是：不同的表示子空间，增强模型的表达能力，稳定训练。  

### 二、自注意力机制的时空复杂度：

时间复杂度  
1、计算QKV矩阵：：O(n·d_model²)  
2、多头分割：O(n·d_model)  
3、注意力分数计算：O(n²·d_k) = O(n²·d_model/num_heads)  
4、注意力权重计算：O(n²·num_heads)  
5、注意力加权：O(n²·d_k) = O(n²·d_model/num_heads)  
6、输出投影：O(n·d_model²)  

总时间复杂度：O(n·d_model² + n²·d_model/num_heads)

空间复杂度：    
1、输入存储：O(n·d_model)  
2、QKV变换矩阵存储：O(d_model²)  
3、QKV中间结果存储：O(n·d_model)  
4、多头分割后存储：O(n·d_model)  
5、注意力矩阵存储：O(n²·num_heads)  
6、输出存储：O(n·d_model)

总空间复杂度：O(d_model² + n·d_model + n²·num_heads)

### 三、多头注意力机制（MHA）代码演示：

In [5]:
import torch
import torch.nn as nn
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads

        self.W_q = nn.Linear(d_model, d_model #Linear（self,in,out)输入维度只传入一个参数是在说明“我对每个向量的 d_model 维特征做线性变换，不管有多少个向量。”
        self.W_k = nn.Linear(d_model, d_model) #Linear是向量处理器，设计哲学是它不处理“整个矩阵的运算”，而是处理“每个向量的变换”。
        self.W_v = nn.Linear(d_model, d_model) #会自动对每一个 d_model 维的向量做相同的线性变换
        self.W_o = nn.Linear(d_model, d_model)

    def forward(self, query, key, value, mask=None):#此处的querykeyvalue都是同一个值“输入向量x”
        batch_size = query.size(0)#.size(0)获取第0维的大小，即batch_size

        Q = self.W_q(query).view(batch_size, -1, self.num_heads, self.d_k).transpose(1,2) #self.W_q(query) 会触发 self.W_q 实例的 __call__ 方法。
        K = self.W_k(key).view(batch_size, -1, self.num_heads, self.d_k).transpose(1,2) #进而调用其 forward 方法。
        V = self.W_v(value).view(batch_size, -1, self.num_heads, self.d_k).transpose(1,2) #对输入 query 的每一个向量进行线性变换，返回一个形状相同的输出张量 Q。

        #调用函数计算注意力分数
        attention_output,attention_weights = self.scaled_dot_product_attention(Q, K, V, mask)

        #拼接
        #.transpose()：只改“怎么读数据”（逻辑索引），不改“数据在哪”（物理顺序）
        #.contiguous()：把数据复制一份，按新的逻辑顺序重新排列在内存中
        #.view()只能对连续存储的数据进行操作
        concat_attention = attention_output.transpose(1,2).contiguous().view(batch_size, -1, self.d_model)

        #最终线性变换
        output = self.W_o(concat_attention)
        return output, attention_weights

    def  scaled_dot_product_attention(self, Q, K, V, mask=None):
        #注意力分数
        scores = torch.matmul(Q, K.transpose(-2,-1)) / torch.sqrt(torch.tensor(self.d_k, dtype=torch.float32)) 
        #把 d_k 变成张量，不是因为“不能算”，而是因为“要融入计算图”。

        #应用填充掩码
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9) #masked_fill(mask,value)把输入张量中，对应 mask 为 True 的位置，全部替换成 value。

        attention_weights = torch.softmax(scores, dim=-1)
        output = torch.matmul(attention_weights, V)
        return output, attention_weights

# 使用示例
d_model = 512
num_heads = 8
mha = MultiHeadAttention(d_model, num_heads)
# 输入序列
seq_len = 100
batch_size = 32
x = torch.randn(batch_size, seq_len, d_model)
# 前向传播
output, attention_weights = mha(x, x, x)
print(f"输入形状: {x.shape}")
print(f"输出形状: {output.shape}")
print(f"注意力权重形状: {attention_weights.shape}")

输入形状: torch.Size([32, 100, 512])
输出形状: torch.Size([32, 100, 512])
注意力权重形状: torch.Size([32, 8, 100, 100])


### 四、多头注意力机制的复杂度分析

多头注意力通过并行计算，实现了"看起来"有多个注意力机制，但实际上并没有显著增加计算复杂度。   
1、参数总量不变  
2、可以并行计算  

**复杂度对比：**    
n：序列长度  
d_model：模型隐藏维度  
num_heads：注意力头数  
d_k = d_model / num_heads：每个注意力头的维度      

![图片无法显示](../image/MHA复杂度分析.png "MHA复杂度分析表")

**为什么复杂度相同但性能更好？**  
1. 并行计算：num_heads个小矩阵可以并行计算，而单头的大矩阵计算受限于硬件  
2. 内存访问模式：连续的⼩块内存访问比大块稀疏访问更高效  
3. 表达能力：多个头可以学习不同的注意力模式，表达能力更强  
4. 梯度流：多个独立的注意力路径有助于梯度传播，训练更稳定  

**实际效率提升：**  
1. 缓存效率：并行计算更好地利用了GPU的并行能力   
2. 内存访问模式：连续的数据访问模式更高效  
3. 指令级并⾏：现代CPU/GPU的SIMD指令集  

### 五、GQA注意力机制

相当于将多头分为多个小组，然后按照小组来进行查询，每一个小组共用一个多头。    
将查询头分组，每组共享键和值头，在保持性能的同时减少计算量。  
核心思想：   
多个查询头共享相同的键和值  
减少KV  
缓存的内存占用  
保持查询的多样性  

In [10]:
import torch
import torch.nn as nn
class GroupedQueryAttention(nn.Module):
    def __init__(self, d_model, num_heads, num_kv_heads):
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.num_kv_heads = num_kv_heads
        self.d_k = d_model // num_heads
        self.head_group_size = num_heads // num_kv_heads

        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model // num_heads * num_kv_heads)
        self.W_v = nn.Linear(d_model, d_model // num_heads * num_kv_heads)
        self.W_o = nn.Linear(d_model, d_model)

    def forward(self, query, key, value, mask=None):
        batch_size = query.size(0)
        sql_len = query.size(1)

        #计算OKV
        Q = self.W_q(query).view(batch_size, seq_len, self.num_heads, self.d_k)
        K = self.W_k(key).view(batch_size, seq_len, self.num_kv_heads, self.d_k)
        V = self.W_v(value).view(batch_size, seq_len, self.num_kv_heads, self.d_k)

        #扩展kv
        K = K.unsqueeze(2).expand(-1, -1, self.head_group_size, -1, -1)
        V = V.unsqueeze(2).expand(-1, -1, self.head_group_size, -1, -1)

        #重塑张量
        Q = Q.transpose(1, 2)
        K = K.reshape(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)
        V = V.reshape(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)

        #计算注意力分数
        scores = torch.matmul(Q, K.transpose(-2, -1)) / torch.sqrt(torch.tensor(self.d_k, dtype=torch.float32))

        #填充掩码
        if mask is not None:
            scores = scores.masked_fill(mask == 0,-1e9)

        #计算注意力权重
        attention_weights = torch.softmax(scores, dim=-1)
        #计算注意力输出
        output = torch.matmul(attention_weights, V)

        #重塑输出，合并多头
        output = output.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model)
        #学会多头信息
        output = self.W_o(output)

        return output, attention_weights

# 使用示例
d_model = 512
num_heads = 32
num_kv_heads = 8  # 4:1的压缩比
gqa = GroupedQueryAttention(d_model, num_heads, num_kv_heads)
# 输入
batch_size = 32
seq_len = 1000
x = torch.randn(batch_size, seq_len, d_model)
# 前向传播
output, attention_weights = gqa(x, x, x)
print(f"GQA输出形状: {output.shape}")
print(f"注意力权重形状: {attention_weights.shape}")

GQA输出形状: torch.Size([32, 1000, 512])
注意力权重形状: torch.Size([32, 32, 1000, 1000])


**时空复杂度分析：**  
见教案

### 六、MQA注意力机制

GQA的极端情况，所有查询头共享相同的键和值，进⼀步减少内存占⽤。  
核⼼思想：  
所有查询头共享⼀个键头和⼀个值头  
最⼤化内存效率  
可能牺牲⼀些模型质量  

In [None]:
class MultiQueryAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads
        
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, self.d_k)  # 单个键头
        self.W_v = nn.Linear(d_model, self.d_k)  # 单个值头
        self.W_o = nn.Linear(d_model, d_model)
        
    def forward(self, query, key, value, mask=None):
        batch_size = query.size(0)
        seq_len = query.size(1)
        
        # 计算Q (多个头)
        Q = self.W_q(query).view(batch_size, seq_len, self.num_heads, self.d_k)
        Q = Q.transpose(1, 2)  # (batch_size, num_heads, seq_len, d_k)
        
        # 计算K和V (单个头)
        K = self.W_k(key).unsqueeze(1)  # (batch_size, 1, seq_len, d_k)
        V = self.W_v(value).unsqueeze(1)  # (batch_size, 1, seq_len, d_k)
        
        # 扩展K和V以匹配所有查询头
        K = K.expand(-1, self.num_heads, -1, -1)
        V = V.expand(-1, self.num_heads, -1, -1)
        
        # 计算注意力
        scores = torch.matmul(Q, K.transpose(-2, -1)) / torch.sqrt(torch.tensor(self.d_k, dtype=to
        
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
            
        attention_weights = torch.softmax(scores, dim=-1)
        output = torch.matmul(attention_weights, V)
        
        # 重塑输出
        output = output.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model)
        output = self.W_o(output)
        
        return output, attention_weights
 # 使用示例
d_model = 512
num_heads = 32
mqa = MultiQueryAttention(d_model, num_heads)
# 输入
batch_size = 32
seq_len = 1000
x = torch.randn(batch_size, seq_len, d_model)
# 前向传播
output, attention_weights = mqa(x, x, x)
print(f"MQA输出形状: {output.shape}")
print(f"注意力权重形状: {attention_weights.shape}")

### 七、MHA，GQA，MQA的复杂度分析

见教案

### 八、MLA的学习（Multi-Head Latent Attention）

#### MLA的核心思想：（共享低维kv+查询解码）  
**共享低维 Key/Value (K/V)**：所有注意力头共享一组低维的、数量极少的 K/V 向量（称为 kv_dim，远小于 d_k）。  
**Latent KV Cache**	：这组低维 K/V 被缓存，称为 Latent KV Cache，大小仅为传统 KV Cache 的 1/10 甚至更小。  
**Query-to-KV 解码器** ：每个 Query 头通过一个小型解码器网络（通常是 1x1 卷积或线性层），从共享的 Latent K/V 中“解码”出该头专属的 K/V。  
**动态生成 K/V**	：K/V 不是直接投影得到，而是根据当前 Query 动态生成，实现“按需生成”。

#### MLA的优势  
**极低 KV Cache**	只缓存少量 Latent K/V，极大减少推理内存。  
**高计算效率**	解码器轻量，整体计算量低于 MHA。   
**保持表达能力**	通过解码器，不同头仍能生成差异化的 K/V，保持多头多样性。  
**适合长上下文**	KV Cache 小，支持更长的上下文窗口（如 128K）。  

In [None]:
import torch
import torch.nn as nn
import torch.functional as F

class MultiHeadLatentAttention(nn.Module):
    """
    完整版 Multi-Head Latent Attention (MLA) Module
    核心技术：低秩联合压缩、矩阵吸收、位置编码解耦、MoE 解码器
    """
    def __init__(self, d_model, num_heads, latent_kv_heads=8, kv_dim=64, num_experts=8, top_k=2):
        """
        Args:
            d_model: 输入/输出维度
            num_heads: Query 头数
            latent_kv_heads: Latent K/V 的头数（远小于 num_heads）
            kv_dim: 每个 Latent K/V 的维度（远小于 d_k）
            num_experts: MoE 解码器的专家数量
            top_k: 每次激活的专家数量
        """
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.latent_kv_heads = latent_kv_heads
        self.kv_dim = kv_dim
        self.d_k = d_model // num_heads
        self.num_experts = num_experts
        self.top_k = top_k

        # 1. Query 投影
        self.W_q = nn.Linear(d_model, d_model)

        # 2. 低秩联合压缩：K 和 V 联合投影到低维空间
        self.W_kv_compressed = nn.Linear(d_model, latent_kv_heads * kv_dim * 2)  # 联合 K 和 V

        # 3. MoE 解码器：Query 条件化生成 K/V 参数
        self.k_decoder = MixtureOfExpertsDecoder(
            d_model=d_model,
            latent_dim=latent_kv_heads * kv_dim,
            output_dim=self.num_heads * self.d_k,
            num_experts=num_experts,
            top_k=top_k
        )
        self.v_decoder = MixtureOfExpertsDecoder(
            d_model=d_model,
            latent_dim=latent_kv_heads * kv_dim,
            output_dim=self.num_heads * self.d_k,
            num_experts=num_experts,
            top_k=top_k
        )

        # 4. 输出投影
        self.W_o = nn.Linear(d_model, d_model)

    def forward(self, query, key, value, mask=None, position_ids=None):
        batch_size = query.size(0)
        seq_len = query.size(1)

        # --- Step 1: 计算 Query ---
        Q = self.W_q(query)  # (b, s, d_model)
        Q = Q.view(batch_size, seq_len, self.num_heads, self.d_k)  # (b, s, h, d_k)
        Q = Q.transpose(1, 2)  # (b, h, s, d_k)

        # --- Step 2: 低秩联合压缩 K 和 V ---
        KV = torch.cat([key, value], dim=-1)  # (b, s, 2*d_model)
        KV_compressed = self.W_kv_compressed(KV)  # (b, s, latent_kv_heads * kv_dim * 2)
        # 分离 K 和 V 的 Latent 表示
        K_latent = KV_compressed[:, :, :self.latent_kv_heads * self.kv_dim]  # (b, s, L * d_l)
        V_latent = KV_compressed[:, :, self.latent_kv_heads * self.kv_dim:]  # (b, s, L * d_l)
        
        K_latent = K_latent.view(batch_size, seq_len, self.latent_kv_heads, self.kv_dim)  # (b, s, L, d_l)
        V_latent = V_latent.view(batch_size, seq_len, self.latent_kv_heads, self.kv_dim)  # (b, s, L, d_l)

        # --- Step 3: 使用 MoE 解码器动态生成 K/V 参数 ---
        # 以 Query 的聚合信息作为解码条件
        query_cond = query.mean(dim=1)  # (b, d_model)

        k_params = self.k_decoder(query_cond)  # (b, H * d_k)
        v_params = self.v_decoder(query_cond)  # (b, H * d_k)

        # Reshape 为参数矩阵
        k_params = k_params.view(batch_size, self.num_heads, self.d_k)  # (b, H, d_k)
        v_params = v_params.view(batch_size, self.num_heads, self.d_k)  # (b, H, d_k)

        # --- Step 4: 动态解码生成最终的 K 和 V ---
        # 采用逐头解码：每个 Latent 头解码出多个 Query 头的 K/V
        # 假设 num_heads = latent_kv_heads * group_size
        assert self.num_heads % self.latent_kv_heads == 0, "num_heads must be divisible by latent_kv_heads"
        group_size = self.num_heads // self.latent_kv_heads

        # 重塑 Latent K/V 以匹配 Query 头分组
        K_latent_expanded = K_latent.unsqueeze(2).expand(-1, -1, group_size, -1, -1)  # (b, s, g, L, d_l)
        V_latent_expanded = V_latent.unsqueeze(2).expand(-1, -1, group_size, -1, -1)  # (b, s, g, L, d_l)
        
        # 重塑用于矩阵乘法
        K_latent_flat = K_latent_expanded.reshape(batch_size, seq_len, self.num_heads, self.kv_dim)  # (b, s, H, d_l)
        V_latent_flat = V_latent_expanded.reshape(batch_size, seq_len, self.num_heads, self.kv_dim)  # (b, s, H, d_l)

        # 应用解码参数：这里简化为逐元素乘法，实际可能更复杂
        # K = K_latent_flat * k_params.unsqueeze(1)  # (b, s, H, d_l) * (b, 1, H, d_k) -> 不匹配
        # 更精确的解码方式是：K_latent_flat @ k_params.T
        # 但 k_params 是 (b, H, d_k)，不能直接作为变换矩阵
        # 因此，k_params 应为 (b, H, d_l, d_k) 才能做 matmul
        # 这说明上面的解码器输出维度需要是 (H * d_l * d_k)

        # 重新设计解码器输出：(H * d_l * d_k)
        k_params_full = self.k_decoder_full_matrix(query_cond)  # (b, H * d_l * d_k)
        v_params_full = self.v_decoder_full_matrix(query_cond)  # (b, H * d_l * d_k)
        k_params_mat = k_params_full.view(batch_size, self.num_heads, self.kv_dim, self.d_k)
        v_params_mat = v_params_full.view(batch_size, self.num_heads, self.kv_dim, self.d_k)

        # 逐位置、逐头进行解码
        K = torch.einsum('bsHd_l, bHd_lD_k -> bsHD_k', K_latent_flat, k_params_mat)  # (b, s, H, d_k)
        V = torch.einsum('bsHd_l, bHd_lD_k -> bsHD_k', V_latent_flat, v_params_mat)

        K = K.transpose(1, 2)  # (b, H, s, d_k)
        V = V.transpose(1, 2)  # (b, H, s, d_k)

        # --- Step 5: 位置编码解耦 ---
        # RoPE 只应用于最终的 Q, K, V，而不是 Latent K/V
        if position_ids is not None:
            Q = self.apply_rope(Q, position_ids)
            K = self.apply_rope(K, position_ids)

        # --- Step 6: 计算注意力 ---
        scores = torch.matmul(Q, K.transpose(-2, -1)) / (self.d_k ** 0.5)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, float('-inf'))
        attention_weights = torch.softmax(scores, dim=-1)
        output = torch.matmul(attention_weights, V)  # (b, h, s, d_k)

        # --- Step 7: 合并多头 ---
        output = output.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model)
        output = self.W_o(output)

        return output, attention_weights

    def apply_rope(self, x, position_ids):
        """Apply Rotary Position Embedding to x. Simplified version."""
        # Placeholder for RoPE implementation
        # In practice, this would involve sine/cosine rotations
        return x

class MixtureOfExpertsDecoder(nn.Module):
    """
    MoE 解码器：根据 Query 条件动态选择专家生成 K/V 参数
    """
    def __init__(self, d_model, latent_dim, output_dim, num_experts=8, top_k=2):
        super().__init__()
        self.num_experts = num_experts
        self.top_k = top_k
        self.output_dim = output_dim

        # 专家网络：每个专家负责生成一部分参数
        self.experts = nn.ModuleList([
            nn.Linear(latent_dim, output_dim)
            for _ in range(num_experts)
        ])
        # 门控网络：决定激活哪些专家
        self.gate = nn.Linear(d_model, num_experts)

    def forward(self, query_cond):
        gate_logits = self.gate(query_cond)  # (b, num_experts)
        gate_scores = torch.softmax(gate_logits, dim=-1)
        topk_weights, topk_indices = torch.topk(gate_scores, self.top_k, dim=-1) # (b, top_k)

        # 初始化输出
        output = torch.zeros((query_cond.size(0), self.output_dim), device=query_cond.device, dtype=query_cond.dtype)

        # 稀疏激活：只计算 top-k 个专家
        for i in range(self.top_k):
            expert_idx = topk_indices[:, i]  # (b,)
            expert_weights = topk_weights[:, i].unsqueeze(-1)  # (b, 1)
            
            # 使用 gather 选择对应的专家
            selected_experts_output = torch.stack([self.experts[idx](query_cond) for idx in expert_idx.unique()], dim=0)
            # This is a simplified version; actual gather might be more complex
            # For each batch, select its specific expert
            temp_output = torch.zeros_like(output)
            for b_idx, e_idx in enumerate(expert_idx):
                temp_output[b_idx] = self.experts[e_idx](query_cond[b_idx])
            
            output += temp_output * expert_weights

        return output

# --- 矩阵吸收技术（在推理前预处理）---
def absorb_matrices(mla_module):
    """
    将 W_kv_compressed 与 MoE 专家的权重相乘，实现矩阵吸收
    注意：这是一个概念性实现，实际吸收需根据具体结构设计
    """
    # 示例：W_kv_compressed (d_model, L*d_l*2) -> experts (L*d_l, out_dim)
    # Absorbed: W_kv_compressed @ expert_weight
    # This is complex and typically done during model export/optimization.
    # Here we just note the concept.
    print("Matrix absorption would happen here during model optimization.")
    return mla_module

# --- 示例用法 ---
if __name__ == "__main__":
    d_model, num_heads, l_kv_heads, kv_dim = 512, 8, 2, 64
    attn = MultiHeadLatentAttention(d_model, num_heads, l_kv_heads, kv_dim)

    query = key = value = torch.randn(2, 10, d_model)
    mask = torch.tril(torch.ones(10, 10)).unsqueeze(0).unsqueeze(0)
    position_ids = torch.arange(10).unsqueeze(0)

    output, weights = attn(query, key, value, mask, position_ids)
    print(f"Output shape: {output.shape}")    # torch.Size([2, 10, 512])
    print(f"Weights shape: {weights.shape}")  # torch.Size([2, 8, 10, 10])

    # 应用矩阵吸收
    absorbed_attn = absorb_matrices(attn)



