In [1]:
import torch
import torch.nn as nn
import math
from torch.nn import functional as F

# 定义相对位置编码模块
class RelativePositionEmbedding(nn.Module):
    def __init__(self, max_seq_len: int, embedding_dim: int):
        """
        相对位置编码模块
        :param max_seq_len: 序列的最大长度
        :param embedding_dim: 位置编码的维度（与注意力头的维度相同）
        """
        super().__init__()
        self.max_seq_len = max_seq_len
        self.embedding_dim = embedding_dim

        # 定义一个可训练的参数矩阵, 形状为 (2 * max_seq_len - 1, embedding_dim)
        # 用于存储不同相对位置的嵌入表示
        self.relative_positions = nn.Parameter(torch.randn(max_seq_len * 2 - 1, embedding_dim))

    def forward(self, seq_len: int):
        """
        根据输入的序列长度，生成对应的相对位置编码
        :param seq_len: 当前输入序列的长度
        :return: 相对位置编码张量，形状为 (seq_len, seq_len, embedding_dim)
        """
        # 生成相对位置索引矩阵
        relative_positions_matrix = self._generate_relative_positions_matrix(seq_len)

        # 使用 PyTorch 的嵌入层来获取相对位置的编码
        relative_embeddings = F.embedding(relative_positions_matrix, self.relative_positions)
        return relative_embeddings

    def _generate_relative_positions_matrix(self, seq_len: int):
        """
        生成一个相对位置索引矩阵，形状为 (seq_len, seq_len)
        例如，对于长度 5 的序列:
        [[ 0,  1,  2,  3,  4],
         [-1,  0,  1,  2,  3],
         [-2, -1,  0,  1,  2],
         [-3, -2, -1,  0,  1],
         [-4, -3, -2, -1,  0]]
        """
        range_vec = torch.arange(seq_len)  # 创建一个从 0 到 seq_len-1 的张量
        range_matrix = range_vec.unsqueeze(0).expand(seq_len, seq_len)  # 扩展为 seq_len x seq_len
        distance_matrix = range_matrix - range_matrix.t()  # 计算相对位置偏移量
        distance_matrix = distance_matrix + self.max_seq_len - 1  # 平移索引，确保索引是非负数
        return distance_matrix


# 定义多头注意力模块
class MultiHeadAttention(nn.Module):
    def __init__(self, embed_dim: int, num_heads: int, max_seq_len: int):
        """
        多头注意力机制（带相对位置编码）
        :param embed_dim: 输入嵌入的维度
        :param num_heads: 注意力头的数量
        :param max_seq_len: 序列的最大长度（用于相对位置编码）
        """
        super().__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads  # 计算每个头的维度

        # 定义线性变换，用于计算 Q、K、V
        self.q_proj = nn.Linear(embed_dim, embed_dim)
        self.k_proj = nn.Linear(embed_dim, embed_dim)
        self.v_proj = nn.Linear(embed_dim, embed_dim)

        # 相对位置编码
        self.rel_pos_embeddings = RelativePositionEmbedding(max_seq_len, self.head_dim)

        # 最终输出的线性层
        self.out_proj = nn.Linear(embed_dim, embed_dim)

    def forward(self, query: torch.Tensor, key: torch.Tensor, value: torch.Tensor):
        """
        执行多头注意力计算
        :param query: 查询张量 (batch_size, seq_len, embed_dim)
        :param key: 键张量 (batch_size, seq_len, embed_dim)
        :param value: 值张量 (batch_size, seq_len, embed_dim)
        :return: 注意力输出 (batch_size, seq_len, embed_dim)
        """
        batch_size, seq_len, embed_dim = query.size()

        # 计算 Q、K、V，并 reshape 成多头格式: (batch_size, num_heads, seq_len, head_dim)
        query = self.q_proj(query).view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
        key = self.k_proj(key).view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
        value = self.v_proj(value).view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)

        # 获取相对位置编码: (seq_len, seq_len, head_dim)
        rel_pos_embeddings = self.rel_pos_embeddings(seq_len)
        # 扩展维度，使其适应 batch 维度: (batch_size, seq_len, seq_len, head_dim)
        rel_pos_embeddings = rel_pos_embeddings.unsqueeze(0).expand(batch_size, seq_len, seq_len, self.head_dim)

        # 计算 QK^T：注意力得分 (batch_size, num_heads, seq_len, seq_len)
        query = query.unsqueeze(3)  # (batch_size, num_heads, seq_len, 1, head_dim)
        key = key.unsqueeze(2)  # (batch_size, num_heads, 1, seq_len, head_dim)
        attention_scores = torch.matmul(query, key.transpose(-1, -2))  # 矩阵乘法

        # 添加相对位置编码的影响
        attention_scores += torch.matmul(query, rel_pos_embeddings.transpose(-1, -2))

        # 归一化
        attention_scores = attention_scores / math.sqrt(self.head_dim)

        # 计算注意力权重 (batch_size, num_heads, seq_len, seq_len)
        attention_weights = F.softmax(attention_scores, dim=-1)

        # 计算注意力输出
        output = torch.matmul(attention_weights, value)  # (batch_size, num_heads, seq_len, head_dim)

        # 重新调整维度，合并多头 (batch_size, seq_len, embed_dim)
        output = output.transpose(1, 2).contiguous().view(batch_size, seq_len, embed_dim)

        # 通过最终的线性层
        output = self.out_proj(output)
        return output

# 示例使用
max_seq_len = 10
embed_dim = 64
num_heads = 4
batch_size = 2
seq_len = 5

# 创建随机输入数据
query = torch.randn(batch_size, seq_len, embed_dim)
key = torch.randn(batch_size, seq_len, embed_dim)
value = torch.randn(batch_size, seq_len, embed_dim)

# 初始化多头注意力模块
attention = MultiHeadAttention(embed_dim, num_heads, max_seq_len)

# 测试相对位置索引矩阵
print(attention.rel_pos_embeddings._generate_relative_positions_matrix(5))  # 输出相对位置索引矩阵
print(attention.rel_pos_embeddings.relative_positions.shape)  # 输出相对位置嵌入矩阵的形状
print(attention.rel_pos_embeddings(5).shape)  # 获取相对位置编码后的形状

# 计算注意力输出
# output = attention(query, key, value)
# print(output.shape)  # 预期输出形状为 (batch_size, seq_len, embed_dim)

tensor([[ 9, 10, 11, 12, 13],
        [ 8,  9, 10, 11, 12],
        [ 7,  8,  9, 10, 11],
        [ 6,  7,  8,  9, 10],
        [ 5,  6,  7,  8,  9]])
torch.Size([19, 16])
torch.Size([5, 5, 16])
