In [1]:
import math
import torch
import torch.nn.functional as F
from torch import nn, Tensor

In [2]:
BATCH_SIZE = 128
SEQUENCE_LEN = 64
EMBEDDING_DIM = 512
HEAD_COUNT = 8

In [3]:
# 模拟测试数据
X = torch.randn(BATCH_SIZE, SEQUENCE_LEN, EMBEDDING_DIM)

print(X.shape)

torch.Size([128, 64, 512])


$$
\text{Attention}(Q, K, V)=\text{softmax}(\frac{QK^T}{\sqrt{d_k}})V
$$

- 单纯attention函数不含有可学习的参数。
- torch的`matmul`/`@`是支持带batch的高维tensor的，它只会把最后两个维度相乘。

Dropout的使用：
- 构造：`dropout = nn.Dropout(p)`，
- 输入：`dropout(x)`，
- 输出：对于x中的每个元素，都有p概率被置为0。

mask应该是上三角还是下三角？
- 只要记住一点：我们的目的在于，Q只想要关注一部分的K。
- 这里的QK相乘后，attention scores的维度为(seq_len_q, seq_len_k)，所以mask要设置为**下三角矩阵**。


In [4]:
def _attention(
    query: Tensor,
    key: Tensor,
    value: Tensor,
    mask: Tensor | None = None,
    dropout: nn.Dropout = None,
) -> tuple[Tensor, Tensor]:
    d_k = query.shape(-1)
    # torch的矩阵乘法支持带batch的乘法，因此二维以上的矩阵也可以相乘
    score_probs = query @ key.transpose(-2, -1) / math.sqrt(d_k)
    if mask is not None:
        # mask == 0的位置都设置为负无穷
        score_probs = score_probs.masked_fill(mask == 0, float("-inf"))
    score_probs = F.softmax(score_probs, dim=-1)
    if dropout is not None:
        score_probs = dropout(score_probs)
    return score_probs @ value, score_probs

### 拆分多头

把QKV的最后一维embedding_dim拆分成多个head_dim, 即投射到一个较小的维度上：
- 原QKV形状为：`(batch_size, seq_len, embedding_dim)`
- 拆分后形状为：`(batch_size, head_count, seq_len, head_dim)`

每个头都是单独的权重矩阵。在代码的实现中，多个头是拼接在一起的，和一个大权重矩阵相乘（这个大矩阵其实就看做多个权重矩阵的拼接）。
- 这都得益于pytorch方便的矩阵乘法，使得我们可以做到**并行计算**。

### 合并多头

最终，多个头的attention score拼接在一起后，还要应用一个输出权重矩阵 $W^O$ ，得到最终的输出。
$$
  \begin{align*}
  \text{MultiHead}(Q,K,V) &= \text{Concat}(\text{head}_1,\cdots,\text{head}_h)W^O \\
  \textbf{where}\ \text{head}_i &= \text{Attention}(QW^Q_i, KW^K_i, VW^V_i)
  \end{align*}
  $$ 



In [5]:
class MultiHeadAttention(nn.Module):
    def __init__(self, embedding_dim, head_count):
        super().__init__()
        self.embedding_dim = embedding_dim
        self.head_count = head_count
        self.q_weight = nn.Linear(embedding_dim, embedding_dim)
        self.k_weight = nn.Linear(embedding_dim, embedding_dim)
        self.v_weight = nn.Linear(embedding_dim, embedding_dim)
        # 输出权重矩阵W_O
        self.output_weight = nn.Linear(embedding_dim, embedding_dim)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, q_seq, k_seq, v_seq):
        queries: Tensor = self.q_weight(q_seq)
        keys: Tensor = self.k_weight(k_seq)
        values: Tensor = self.v_weight(v_seq)
        # 拆分多头
        batch_size, seq_len, embedding_dim = q_seq.shape
        head_dim = self.embedding_dim // self.head_count
        # 即最后一维拆分 -> embedding_dim = head_count * head_dim，并交换head_count和seq_dim维度
        queries = (
            queries.contiguous()
            .view(batch_size, seq_len, self.head_count, head_dim)
            .permute(0, 2, 1, 3)
        )
        keys = (
            keys.contiguous()
            .view(batch_size, seq_len, self.head_count, head_dim)
            .permute(0, 2, 1, 3)
        )
        values = (
            values.contiguous()
            .view(batch_size, seq_len, self.head_count, head_dim)
            .permute(0, 2, 1, 3)
        )
        # 计算注意力
        # 先获取一个mask，它是一个下三角矩阵
        mask = torch.tril(torch.ones(seq_len, seq_len, dtype=bool))
        attention_scores, _ = _attention(queries, keys, values, mask)
        # 合并多头
        attention_scores = (
            attention_scores.permute(0, 2, 1, 3)
            .contiguous()
            .view(batch_size, seq_len, embedding_dim)
        )
        output = self.output_weight(attention_scores)
        return output

In [6]:
mha = MultiHeadAttention(EMBEDDING_DIM, HEAD_COUNT)
res = mha(X, X, X)
print(res.shape)

TypeError: 'torch.Size' object is not callable