In [62]:
# 导入必备的工具包
import torch

# 预定义的网络层torch.nn, 工具开发者已经帮助我们开发好的一些常用层,
# 比如，卷积层, lstm层, embedding层等, 不需要我们再重新造轮子.
import torch.nn as nn

# 数学计算工具包
import math

# torch中变量封装函数Variable.
from torch.autograd import Variable

In [63]:
class Embadding(nn.Module):
    """Token-embedding layer whose output is scaled by sqrt(embed_size).

    Args:
        vocab_size: number of rows in the lookup table; every index passed
            to ``forward`` must be smaller than this value.
        embed_size: width of each embedding vector.
    """

    def __init__(self, vocab_size, embed_size):
        super().__init__()

        # Remember the width so ``forward`` can derive the scale factor.
        self.embed_size = embed_size
        # Lookup table ("lut") mapping integer token ids to dense vectors.
        self.lut = nn.Embedding(vocab_size, embed_size)

    def forward(self, input):
        """Embed the integer ids in ``input`` and multiply by sqrt(embed_size)."""
        scale = math.sqrt(self.embed_size)
        vectors = self.lut(input)
        return vectors * scale

In [64]:
# Embedding table with a 1000-token vocabulary and 128-dimensional vectors.
embadding = Embadding(vocab_size=1000, embed_size=128)
# Every token id must stay below vocab_size (999 is the largest id used here).
input = torch.LongTensor([
    [1, 2, 4, 5],
    [4, 3, 2, 9],
    [1, 4, 999, 9],
])
x = embadding(input)

In [1]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to embeddings, then apply dropout.

    The encoding table is precomputed once for ``max_len`` positions and stored
    as a buffer named ``pe`` of shape ``(1, max_len, embed_dim)``. Even columns
    hold sines and odd columns hold cosines of the same angles.
    """

    def __init__(self, embed_dim, max_len=5000, dropout=0.2):
        """
        Args:
            embed_dim: embedding dimension (size of the last input axis).
                Both even and odd values are supported.
            max_len: maximum sequence length covered by the precomputed table.
            dropout: dropout probability applied to the summed output
                (defaults to 0.2).
        """
        super(PositionalEncoding, self).__init__()
        self.embed_dim = embed_dim
        self.dropout = nn.Dropout(dropout)

        # Column vector of positions: (max_len, 1).
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        # One frequency per sin/cos pair: ceil(embed_dim / 2) entries.
        div_term = torch.exp(
            torch.arange(0, embed_dim, 2, dtype=torch.float)
            * -(math.log(10000.0) / embed_dim)
        )
        angles = position * div_term  # (max_len, ceil(embed_dim / 2))

        # Build the positional-encoding matrix.
        pe = torch.zeros(max_len, embed_dim)
        pe[:, 0::2] = torch.sin(angles)
        # With an odd embed_dim there is one fewer odd column than even
        # columns, so drop the surplus frequency to keep the shapes aligned.
        pe[:, 1::2] = torch.cos(angles[:, : embed_dim // 2])

        # Registered as a buffer: follows .to()/.cuda() and is saved in the
        # state dict, but is not a trainable parameter.
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, input):
        """Return ``dropout(input + pe[:, :input.size(1)])``.

        The table is sliced to the length of axis 1 of ``input`` and broadcast
        over axis 0. The buffer never requires gradients, so no ``Variable``
        wrapper is needed.
        """
        return self.dropout(input + self.pe[:, :input.size(1)])
# Positional encoder for 128-dimensional embeddings (default max_len).
ps = PositionalEncoding(embed_dim=128)
# Display the encoded tensor next to the raw embeddings for comparison.
(ps(x), x)

NameError: name 'nn' is not defined

In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

class MultiheadAttention(nn.Module):
    """Multi-head scaled dot-product attention."""

    def __init__(self, d_model, num_heads, dropout=0.1):
        """
        Initialise the MultiheadAttention module.

        Args:
            d_model (int): embedding dimension.
            num_heads (int): number of attention heads.
            dropout (float): dropout probability applied to the attention
                weights; defaults to 0.1.
        """
        super(MultiheadAttention, self).__init__()
        # d_model must split evenly across the heads.
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads  # per-head dimension

        # Linear projections that produce Q, K and V.
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        # Output projection applied after the heads are merged.
        self.w_o = nn.Linear(d_model, d_model)
        # Dropout on the attention weights.
        self.dropout = nn.Dropout(dropout)

    def forward(self, query, key, value, mask=None):
        """
        Forward pass.

        Args:
            query (Tensor): shape (batch_size, seq_len_q, d_model).
            key (Tensor): shape (batch_size, seq_len_k, d_model).
            value (Tensor): shape (batch_size, seq_len_v, d_model);
                seq_len_v must equal seq_len_k for the weighted sum.
            mask (Tensor, optional): positions equal to 0 are masked out.
                A 3-D mask is interpreted as (batch_size, seq_len_q, seq_len_k)
                and shared across heads. A 2-D (seq_len_q, seq_len_k) mask or a
                4-D mask broadcastable to
                (batch_size, num_heads, seq_len_q, seq_len_k) is used as given.

        Returns:
            Tensor: attention output of shape (batch_size, seq_len_q, d_model).
        """
        batch_size = query.size(0)

        # 1. Project the inputs to Q, K, V.
        Q = self.w_q(query)  # (batch_size, seq_len_q, d_model)
        K = self.w_k(key)    # (batch_size, seq_len_k, d_model)
        V = self.w_v(value)  # (batch_size, seq_len_v, d_model)

        # 2. Split into heads: (batch_size, num_heads, seq_len, d_k).
        Q = Q.view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        K = K.view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        V = V.view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)

        # 3. Scaled dot-product scores: (batch_size, num_heads, seq_len_q, seq_len_k).
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)

        # 4. Apply the mask, if any.
        if mask is not None:
            if mask.dim() == 3:
                # Insert a head axis so a (batch, q, k) mask lines up with the
                # (batch, heads, q, k) scores. Without it, broadcasting would
                # align the batch axis of the mask with the head axis.
                mask = mask.unsqueeze(1)
            # Use the most negative finite value of the score dtype so the
            # fill also works for reduced-precision dtypes such as float16.
            scores = scores.masked_fill(mask == 0, torch.finfo(scores.dtype).min)

        # 5. Attention weights.
        attn_weights = F.softmax(scores, dim=-1)
        attn_weights = self.dropout(attn_weights)

        # 6. Weighted sum of V: (batch_size, num_heads, seq_len_q, d_k).
        attn_output = torch.matmul(attn_weights, V)

        # 7. Merge the heads back: (batch_size, seq_len_q, d_model).
        attn_output = attn_output.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)

        # 8. Final output projection.
        output = self.w_o(attn_output)
        return output

# Usage example
if __name__ == "__main__":
    # Hyper-parameters for the demo.
    d_model, num_heads = 512, 8
    batch_size = 32
    seq_len_q = 10
    seq_len_k = 10

    # Instantiate the attention module.
    mha = MultiheadAttention(d_model, num_heads)

    # Random input tensors, drawn in query -> key -> value order.
    query = torch.rand(batch_size, seq_len_q, d_model)
    key = torch.rand(batch_size, seq_len_k, d_model)
    value = torch.rand(batch_size, seq_len_k, d_model)

    # Run the forward pass and report the output shape.
    output = mha(query, key, value)
    print("MultiheadAttention输出形状:", output.shape)

MultiheadAttention输出形状: torch.Size([32, 10, 512])
