In [30]:
import torch
import torch.nn as nn
import math
import numpy as np

# 设置随机数种子
def set_seed(seed):
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

# 位置编码模块
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:, :x.size(1), :]
        return x

# 多头自注意力模块
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        assert d_model % num_heads == 0
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads

        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

    def forward(self, x):
        batch_size, seq_len = x.size(0), x.size(1)

        Q = self.W_q(x)
        K = self.W_k(x)
        V = self.W_v(x)

        Q = Q.view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)
        K = K.view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)
        V = V.view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)

        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
        attn_weights = torch.softmax(scores, dim=-1)
        print("MultiHeadAttention attn_weights",attn_weights)
        attn_output = torch.matmul(attn_weights, V)
        print("MultiHeadAttention attn_output shape",attn_output.shape)
        print("MultiHeadAttention attn_output before concat",attn_output)
        attn_output = attn_output.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model)
        print("MultiHeadAttention attn_output after concat",attn_output)
        output = self.W_o(attn_output)
        return output

# 单头注意力模块
class SingleHeadAttention(nn.Module):
    def __init__(self, d_model):
        super(SingleHeadAttention, self).__init__()
        self.d_model = d_model
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

    def forward(self, x):
        batch_size, seq_len = x.size(0), x.size(1)
        Q = self.W_q(x)
        K = self.W_k(x)
        V = self.W_v(x)
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_model)
        attn_weights = torch.softmax(scores, dim=-1)
        print("SingleHeadAttention attn_weights",attn_weights)
        attn_output = torch.matmul(attn_weights, V)
        print("SingleHeadAttention attn_output shape",attn_output.shape)
        print("SingleHeadAttention attn_output",attn_output)
        output = self.W_o(attn_output)
        return output

# 前馈网络
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super(FeedForward, self).__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.relu(self.linear1(x))
        x = self.linear2(x)
        return x

# Transformer层（多头）
class MultiHeadTransformerLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff):
        super(MultiHeadTransformerLayer, self).__init__()
        self.attention = MultiHeadAttention(d_model, num_heads)
        self.ffn = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

    def forward(self, x):
        attn_output = self.attention(x)
        x = self.norm1(x + attn_output)
        ffn_output = self.ffn(x)
        x = self.norm2(x + ffn_output)
        return x

# Transformer层（单头）
class SingleHeadTransformerLayer(nn.Module):
    def __init__(self, d_model, d_ff):
        super(SingleHeadTransformerLayer, self).__init__()
        self.attention = SingleHeadAttention(d_model)
        self.ffn = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

    def forward(self, x):
        attn_output = self.attention(x)
        x = self.norm1(x + attn_output)
        ffn_output = self.ffn(x)
        x = self.norm2(x + ffn_output)
        return x

# 简单Transformer模型
class SimpleTransformer(nn.Module):
    def __init__(self, vocab_size, d_model, num_heads, d_ff, max_len=5000, use_multi_head=True):
        super(SimpleTransformer, self).__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len)
        self.use_multi_head = use_multi_head
        if use_multi_head:
            self.transformer_layer = MultiHeadTransformerLayer(d_model, num_heads, d_ff)
        else:
            self.transformer_layer = SingleHeadTransformerLayer(d_model, d_ff)

    def forward(self, x):
        embedded = self.embedding(x)
        embedded = self.pos_encoding(embedded)
        output = self.transformer_layer(embedded)
        return output

if __name__ == "__main__":
    # 固定随机数种子
    set_seed(42)

    vocab_size = 100
    d_model = 4
    num_heads = 2
    d_ff = 16
    batch_size = 2
    seq_len = 4

    # 输入数据
    x = torch.tensor([[1, 2, 3, 4], [4, 3, 2, 1]])
    print("输入形状:", x.shape)

    # 多头注意力模型
    set_seed(42)  # 确保每次初始化前种子一致
    multi_head_model = SimpleTransformer(vocab_size, d_model, num_heads, d_ff, use_multi_head=True)
    multi_head_output = multi_head_model(x)
    print("多头注意力输出形状:", multi_head_output.shape)
    print("多头注意力输出:", multi_head_output.detach().numpy())

    # 单头注意力模型
    set_seed(42)  # 再次设置相同种子
    single_head_model = SimpleTransformer(vocab_size, d_model, num_heads, d_ff, use_multi_head=False)
    single_head_output = single_head_model(x)
    print("单头注意力输出形状:", single_head_output.shape)
    print("单头注意力输出:", single_head_output.detach().numpy())

    # 计算输出差异
    diff = torch.abs(multi_head_output - single_head_output)
    print("多头与单头输出差异（绝对值）:", diff.detach().numpy())


    # 验证权重是否相同
    print("W_q 权重是否相同:", torch.equal(multi_head_model.transformer_layer.attention.W_q.weight, single_head_model.transformer_layer.attention.W_q.weight))
    print("W_k 权重是否相同:", torch.equal(multi_head_model.transformer_layer.attention.W_k.weight, single_head_model.transformer_layer.attention.W_k.weight))
    print("W_v 权重是否相同:", torch.equal(multi_head_model.transformer_layer.attention.W_v.weight, single_head_model.transformer_layer.attention.W_v.weight))
    print("W_o 权重是否相同:", torch.equal(multi_head_model.transformer_layer.attention.W_o.weight, single_head_model.transformer_layer.attention.W_o.weight))


输入形状: torch.Size([2, 4])
MultiHeadAttention attn_weights tensor([[[[0.3089, 0.3215, 0.1612, 0.2083],
          [0.1645, 0.1572, 0.4000, 0.2783],
          [0.3302, 0.3489, 0.1314, 0.1895],
          [0.3323, 0.3729, 0.1244, 0.1704]],

         [[0.2310, 0.2231, 0.2686, 0.2772],
          [0.2469, 0.2644, 0.2496, 0.2391],
          [0.3336, 0.1292, 0.1934, 0.3438],
          [0.2311, 0.1544, 0.2663, 0.3482]]],


        [[[0.2404, 0.3100, 0.2313, 0.2182],
          [0.2458, 0.2169, 0.2768, 0.2605],
          [0.2514, 0.2822, 0.2279, 0.2385],
          [0.2003, 0.0730, 0.4141, 0.3125]],

         [[0.2751, 0.2653, 0.2068, 0.2528],
          [0.1778, 0.1424, 0.1709, 0.5089],
          [0.2463, 0.2422, 0.2453, 0.2663],
          [0.2540, 0.2341, 0.2031, 0.3087]]]], grad_fn=<SoftmaxBackward0>)
MultiHeadAttention attn_output shape torch.Size([2, 2, 4, 2])
MultiHeadAttention attn_output before concat tensor([[[[-0.0384,  0.9479],
          [-0.0138,  1.4531],
          [-0.0374,  0.8628],
   

In [4]:
import torch
import torch.nn as nn
import math


# 位置编码模块
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        # 创建位置编码表，形状为 [max_len, d_model]
        pe = torch.zeros(max_len, d_model)
        # 生成位置索引 [0, 1, 2, ..., max_len-1]
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        # 计算频率项，用于正弦和余弦函数
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        # 偶数维度用正弦函数
        pe[:, 0::2] = torch.sin(position * div_term)
        # 奇数维度用余弦函数
        pe[:, 1::2] = torch.cos(position * div_term)
        # 增加 batch 维度，形状变为 [1, max_len, d_model]
        pe = pe.unsqueeze(0)
        # 注册为缓冲区，不参与梯度更新
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x: [batch_size, seq_len, d_model]
        # 截取与输入序列长度匹配的位置编码，并加到输入上
        x = x + self.pe[:, :x.size(1), :]
        return x

# 多头自注意力模块
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        assert d_model % num_heads == 0  # 确保 d_model 可以被 num_heads 整除
        self.d_model = d_model  # 模型维度
        self.num_heads = num_heads  # 注意力头数
        self.d_k = d_model // num_heads  # 每个头的维度

        # 定义 Q、K、V 的线性变换层
        self.W_q = nn.Linear(d_model, d_model)  # 查询矩阵
        self.W_k = nn.Linear(d_model, d_model)  # 键矩阵
        self.W_v = nn.Linear(d_model, d_model)  # 值矩阵
        self.W_o = nn.Linear(d_model, d_model)  # 输出投影矩阵

    def forward(self, x):
        # x: [batch_size, seq_len, d_model]
        batch_size, seq_len = x.size(0), x.size(1)

        # 1. 计算 Q、K、V
        Q = self.W_q(x)  # [batch_size, seq_len, d_model]
        K = self.W_k(x)  # [batch_size, seq_len, d_model]
        V = self.W_v(x)  # [batch_size, seq_len, d_model]

        # torch常用变换形状的函数
        # view 将张量重新调整为指定的形状，要求新形状的元素总数与原张量一致
        # reshape 类似 view，但更灵活，要求元素总数不变
        # transpose 交换张量的两个维度，不改变数据顺序，只能交换两个维度
        # unsqueeze 在指定位置增加一个维度（大小为 1），用于扩充张量维度
        # squeeze 移除张量中大小为 1 的维度，用于缩小维度
        # permute 重新排列张量的所有维度，比 transpose 更灵活。
        # expand 将张量在大小为 1 的维度上扩展到指定大小，不复制数据（类似广播）
        # 2. 将 Q、K、V 分成多头
        # 变换形状为 [batch_size, seq_len, num_heads, d_k]，然后转置为 [batch_size, num_heads, seq_len, d_k]
        Q = Q.view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)
        K = K.view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)
        V = V.view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)

        # 3. 计算注意力分数
        # scores = Q * K^T / sqrt(d_k)
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)  # [batch_size, num_heads, seq_len, seq_len]
        # 应用 softmax 得到注意力权重
        attn_weights = torch.softmax(scores, dim=-1)  # [batch_size, num_heads, seq_len, seq_len]

        # 4. 用注意力权重加权 V
        attn_output = torch.matmul(attn_weights, V)  # [batch_size, num_heads, seq_len, d_k]

        # 5. 合并多头
        # 转置回 [batch_size, seq_len, num_heads, d_k]，然后重塑为 [batch_size, seq_len, d_model]
        attn_output = attn_output.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model)
        # 通过输出投影层
        output = self.W_o(attn_output)  # [batch_size, seq_len, d_model]

        return output

class SingleHeadAttention(nn.Module):
    def __init__(self, d_model):
        super(SingleHeadAttention, self).__init__()
        self.d_model = d_model
        # 单头自注意力，Q、K、V 线性变换
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

    def forward(self, x):
        # x: [batch_size, seq_len, d_model]
        batch_size, seq_len = x.size(0), x.size(1)
        # 1. 计算 Q、K、V
        Q = self.W_q(x)  # [batch_size, seq_len, d_model]
        K = self.W_k(x)  # [batch_size, seq_len, d_model]
        V = self.W_v(x)  # [batch_size, seq_len, d_model]
        # 2. 计算注意力分数
        # scores = Q * K^T / sqrt(d_model)
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_model)  # [batch_size, seq_len, seq_len]
        attn_weights = torch.softmax(scores, dim=-1)  # [batch_size, seq_len, seq_len]
        # 3. 用注意力权重加权 V
        attn_output = torch.matmul(attn_weights, V)  # [batch_size, seq_len, d_model]
        # 4. 输出投影
        output = self.W_o(attn_output)  # [batch_size, seq_len, d_model]
        return output
    
# 前馈网络模块
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super(FeedForward, self).__init__()
        # 第一层：扩展到 d_ff 维度
        self.linear1 = nn.Linear(d_model, d_ff)
        # 第二层：压缩回 d_model 维度
        self.linear2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()  # 激活函数

    def forward(self, x):
        # x: [batch_size, seq_len, d_model]
        # 通过第一层线性变换和激活函数
        x = self.relu(self.linear1(x))  # [batch_size, seq_len, d_ff]
        # 通过第二层线性变换
        x = self.linear2(x)  # [batch_size, seq_len, d_model]
        return x

# 单层 Transformer 模块
class TransformerLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff):
        super(TransformerLayer, self).__init__()
        # 多头自注意力
        self.attention = MultiHeadAttention(d_model, num_heads)
        # 前馈网络
        self.ffn = FeedForward(d_model, d_ff)
        # 层归一化
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

    def forward(self, x):
        # x: [batch_size, seq_len, d_model]
        # 1. 多头自注意力 + 残差连接 + 归一化
        attn_output = self.attention(x)  # [batch_size, seq_len, d_model]
        x = self.norm1(x + attn_output)  # 残差连接后归一化

        # 2. 前馈网络 + 残差连接 + 归一化
        ffn_output = self.ffn(x)  # [batch_size, seq_len, d_model]
        x = self.norm2(x + ffn_output)  # 残差连接后归一化

        return x

# 完整 Transformer 模型
class SimpleTransformer(nn.Module):
    def __init__(self, vocab_size, d_model, num_heads, d_ff, max_len=5000):
        super(SimpleTransformer, self).__init__()
        # 嵌入层：将 token 索引转换为 d_model 维向量
        self.embedding = nn.Embedding(vocab_size, d_model)
        # 位置编码
        self.pos_encoding = PositionalEncoding(d_model, max_len)
        # 单层 Transformer
        self.transformer_layer = TransformerLayer(d_model, num_heads, d_ff)
        # 输出层：将 d_model 维向量映射到词汇表大小
        self.output_layer = nn.Linear(d_model, vocab_size)

    def forward(self, x):
        # x: [batch_size, seq_len]，输入是 token 索引
        # 1. 嵌入层：将 token 索引转换为嵌入向量
        embedded = self.embedding(x)  # [batch_size, seq_len, d_model]

        # 2. 添加位置编码
        embedded = self.pos_encoding(embedded)  # [batch_size, seq_len, d_model]

        # 3. 通过 Transformer 层
        transformed = self.transformer_layer(embedded)  # [batch_size, seq_len, d_model]

        # 4. 输出层：映射到词汇表大小
        output = self.output_layer(transformed)  # [batch_size, seq_len, vocab_size]

        return output

In [3]:
# 测试代码
if __name__ == "__main__":
    # 参数设置
    vocab_size = 1000  # 词汇表大小
    d_model = 4       # 嵌入维度
    num_heads = 2      # 注意力头数
    d_ff = 16         # 前馈网络中间维度
    batch_size = 2     # 批量大小
    seq_len = 10       # 序列长度

    # 创建模型
    model = SimpleTransformer(vocab_size, d_model, num_heads, d_ff)

    # 随机输入：模拟 token 索引
    x = torch.tensor([[1,2,3,4], [4,3,2,1]])
    print("输入形状:", x.shape)

    # 前向传播
    output = model(x)
    print("输出形状:", output.shape)  # [2, 10, 1000]

输入形状: torch.Size([2, 4])
输出形状: torch.Size([2, 4, 1000])


In [7]:
a = torch.tensor([[1, 2, 3], [4, 5, 6], [7, 8, 9]], dtype=torch.float)
a1 = a
print(a1.softmax(dim=-1))
a2 = a
print(a2.softmax(dim=-2))


tensor([[[0.0900, 0.2447, 0.6652],
         [0.0900, 0.2447, 0.6652],
         [0.0900, 0.2447, 0.6652]]])
tensor([[[0.0024, 0.0024, 0.0024],
         [0.0473, 0.0473, 0.0473],
         [0.9503, 0.9503, 0.9503]]])


In [14]:
mask = torch.tensor([1, 0, 1], dtype=torch.float)
mask_value = torch.finfo(a.dtype).min
print(mask_value)
print(a)
a = torch.masked_fill(a, mask, mask_value)
print(a)

-3.4028234663852886e+38
tensor([[[1., 2., 3.],
         [4., 5., 6.],
         [7., 8., 9.]]])
tensor([[[1., 2., 3.],
         [4., 5., 6.],
         [7., 8., 9.]]])


In [38]:
q = torch.tensor([[1, 2, 3, 17], [4, 5, 6, 13], [7, 8, 9, 23]], dtype=torch.float)
k = torch.tensor([[14, 3, 1, 9], [5, 7, 18, 7], [6, 22, 9, 3]], dtype=torch.float)
v = torch.tensor([[10, 1, 9, 26], [13, 32, 4, 13], [7, 8, 3, 1]], dtype=torch.float)
attention_score = torch.matmul(q, k.transpose(-1, -2)) / math.sqrt(4)
attention_weights = attention_score.softmax(dim=-1)
print(attention_score.softmax(dim=-1))
output = torch.matmul(attention_weights, v)
print(output)

tensor([[3.3535e-04, 9.9966e-01, 1.2660e-14],
        [9.3576e-14, 1.0000e+00, 1.3710e-06],
        [3.1391e-17, 1.0000e+00, 1.0262e-10]])
tensor([[12.9990, 31.9896,  4.0017, 13.0044],
        [13.0000, 32.0000,  4.0000, 13.0000],
        [13.0000, 32.0000,  4.0000, 13.0000]])


In [39]:
q = torch.tensor([[1, 2, 3, 17], [4, 5, 6, 13], [7, 8, 9, 23]], dtype=torch.float)
k = torch.tensor([[14, 3, 1, 9], [5, 7, 18, 7], [6, 22, 9, 3]], dtype=torch.float)
v = torch.tensor([[10, 1, 9, 26], [13, 32, 4, 13], [7, 8, 3, 1]], dtype=torch.float)
feature_map = lambda x: torch.nn.functional.elu(x) + 1

q = feature_map(q) / math.sqrt(4)  # 缩放
k = feature_map(k)
kv = torch.matmul(k.transpose(-1, -2), v)
output = torch.matmul(q, kv)
print(output)

tensor([[ 3496.5000,  4991.0000,  1839.5000,  4751.5000],
        [ 4411.0000,  6490.5000,  2233.0000,  5538.0000],
        [ 6949.5000, 10076.0000,  3564.5000,  8900.5000]])
