## SDPA

In [1]:
import torch
import torch.nn.functional as F

In [2]:
def scaled_dot_product_attention(
    query: torch.Tensor,
    key: torch.Tensor,
    value: torch.Tensor,
    attn_mask: torch.Tensor = None,
    dropout_p: float = 0.0,
    is_causal: bool = False,  # 是否使用因果掩码（例如在Decoder中）
) -> torch.Tensor:
    """
    实现缩放点积注意力 (Scaled Dot-Product Attention)。

    Args:
        query (torch.Tensor): 查询张量，形状通常为 (..., query_seq_len, head_dim)。
                            ... 可以是 batch_size, num_heads 等。
        key (torch.Tensor): 键张量，形状通常为 (..., key_seq_len, head_dim)。
        value (torch.Tensor): 值张量，形状通常为 (..., key_seq_len, value_dim)。
                            key_seq_len 和 value_seq_len 必须相同。
        attn_mask (torch.Tensor, optional): 注意力掩码。
                                        形状可以是 (query_seq_len, key_seq_len) 或
                                        (batch_size, query_seq_len, key_seq_len) 或
                                        (batch_size, num_heads, query_seq_len, key_seq_len)。
                                        True 表示屏蔽（不关注），False 表示可见。
                                        注意：PyTorch 的 `masked_fill` 通常用 True 表示要填充的值。
                                        这里我们假设输入的 mask 是标准的 `True` 屏蔽。
        dropout_p (float, optional): dropout 概率。默认为 0.0。
        is_causal (bool, optional): 如果为 True，则自动生成一个因果（下三角）注意力掩码。
                                    用于自回归任务，确保当前 token 只能关注之前的 token。
                                    如果提供了 attn_mask 且 is_causal 为 True，则两者会合并。

    Returns:
        torch.Tensor: 注意力机制的输出，形状与 query 相同，但最后一个维度是 value_dim。
                    (..., query_seq_len, value_dim)
    """
    L, S = query.size(-2), key.size(-2)  # L: query_seq_len, S: key_seq_len
    head_dim = query.size(-1)  # d_k

    # 1. 计算 QK^T
    # (..., L, D) @ (..., D, S) -> (..., L, S)
    attn_scores = torch.matmul(query, key.transpose(-2, -1))

    # 2. 缩放
    attn_scores = attn_scores / (head_dim**0.5)

    # 3. 应用因果掩码 (如果 is_causal 为 True)
    if is_causal:
        # 创建一个上三角掩码，确保当前位置只能关注之前或自己的位置
        # (L, S) 的形状，对角线及以下为 True (可见)，对角线以上为 False (不可见)
        # masked_fill 需要 True 表示要屏蔽的部分，所以我们用相反的逻辑
        causal_mask = torch.triu(
            torch.ones(L, S, dtype=torch.bool, device=query.device), diagonal=1
        )
        attn_scores = attn_scores.masked_fill(causal_mask, float("-inf"))

    # 4. 应用外部提供的注意力掩码 (如果存在)
    if attn_mask is not None:
        # PyTorch 的 masked_fill 通常用 True 表示要填充的部分（例如 -inf），False 表示不填充
        # 所以如果你的 attn_mask 是 True 表示屏蔽，False 表示可见，则可以直接使用
        # 如果你的 mask 是 0 表示屏蔽，1 表示可见，则需要 mask == 0
        attn_scores = attn_scores.masked_fill(attn_mask, float("-inf"))

    # 5. Softmax 归一化
    attn_weights = F.softmax(attn_scores, dim=-1)

    # 6. Dropout
    attn_weights = F.dropout(attn_weights, p=dropout_p)

    # 7. 与 Value 矩阵相乘
    # (..., L, S) @ (..., S, D_v) -> (..., L, D_v)
    output = torch.matmul(attn_weights, value)

    return output

## 示例 1: 基本用法 (无掩码，无 dropout)

In [3]:
# 示例 1: 基本用法 (无掩码，无 dropout)
print("--- 示例 1: 基本用法 ---")
batch_size = 2
seq_len_q = 3
seq_len_kv = 5
head_dim = 64
value_dim = 128  # value_dim 可以与 head_dim 不同

query = torch.randn(batch_size, seq_len_q, head_dim)
key = torch.randn(batch_size, seq_len_kv, head_dim)
value = torch.randn(batch_size, seq_len_kv, value_dim)

output = scaled_dot_product_attention(query, key, value)
print(f"Query Shape: {query.shape}")
print(f"Key Shape: {key.shape}")
print(f"Value Shape: {value.shape}")
print(f"Output Shape: {output.shape}")
# 期望：
# Query Shape: torch.Size([2, 3, 64])
# Key Shape: torch.Size([2, 5, 64])
# Value Shape: torch.Size([2, 5, 128])
# Output Shape: torch.Size([2, 3, 128])

--- 示例 1: 基本用法 ---
Query Shape: torch.Size([2, 3, 64])
Key Shape: torch.Size([2, 5, 64])
Value Shape: torch.Size([2, 5, 128])
Output Shape: torch.Size([2, 3, 128])


## 示例 2: 使用注意力掩码 (例如，填充掩码)

In [None]:

print("\n--- 示例 2: 使用注意力掩码 ---")
# 模拟一个填充掩码
# 假设 batch_size=2，seq_len_kv=5
# 第一个样本的有效长度是 3，后面两个是填充
# 第二个样本的有效长度是 4，最后一个是填充
key_padding_mask_bool = torch.tensor(
    [[False, False, False, True, True], [False, False, False, False, True]],
    dtype=torch.bool,
)  # True 表示需要屏蔽的部分

# 广播到注意力分数维度: (batch_size, 1, key_seq_len) -> (batch_size, query_seq_len, key_seq_len)
# 如果是多头，可能需要 (batch_size, num_heads, 1, key_seq_len)
# 这里假设 attn_mask 是 (batch_size, query_seq_len, key_seq_len)
# 或者可以直接让 mask 能够广播到 attn_scores (..., L, S)
attn_mask_expanded = key_padding_mask_bool.unsqueeze(1).expand(-1, seq_len_q, -1)

output_with_mask = scaled_dot_product_attention(
    query, key, value, attn_mask=attn_mask_expanded
)
print(f"Output with Mask Shape: {output_with_mask.shape}")
# 形状不变，但内部值会因为掩码而改变

## 示例 3: 使用因果掩码 (在自注意力中常见)

In [None]:

print("\n--- 示例 3: 使用因果掩码 ---")
# 自注意力通常 Q, K, V 来自同一个源
query_self = torch.randn(batch_size, seq_len_kv, head_dim)
key_self = query_self.clone()
value_self = query_self.clone()

# is_causal=True 会自动生成一个下三角掩码
output_causal = scaled_dot_product_attention(
    query_self, key_self, value_self, is_causal=True
)
print(f"Output with Causal Mask Shape: {output_causal.shape}")
# 期望：torch.Size([2, 5, 64])

## 示例 4: 结合多头情况下的 SDPA (MHA 的一部分)

In [None]:
# 示例 4: 结合多头情况下的 SDPA (MHA 的一部分)
print("\n--- 示例 4: 结合多头情况下的 SDPA ---")
num_heads = 8
total_embed_dim = 512
head_dim = total_embed_dim // num_heads  # 64

# 假设 Q, K, V 已经通过线性层并被分割成多头
# 形状通常变为 (batch_size, num_heads, seq_len, head_dim)
query_mha = torch.randn(batch_size, num_heads, seq_len_q, head_dim)
key_mha = torch.randn(batch_size, num_heads, seq_len_kv, head_dim)
value_mha = torch.randn(batch_size, num_heads, seq_len_kv, head_dim)

# 直接传入到 SDPA 函数，因为 SDPA 可以处理 ... (任意批次维度)
output_mha_part = scaled_dot_product_attention(query_mha, key_mha, value_mha)
print(f"Output for a single head calculation in MHA: {output_mha_part.shape}")
# 期望：torch.Size([2, 8, 3, 64])
# 之后这些头的输出会被拼接并再次线性变换。