## 类定义

In [164]:
from io import open
import random

# 深度学习库pytorch
import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable
from torch.utils.data import DataLoader
import math
import torch.nn.functional as F

# 用于绘制损失函数下降曲线
from matplotlib import pyplot as plt
%pdb off

Automatic pdb calling has been turned OFF


## 比较Attention

In [165]:
class Attention(nn.Module):
    """Multi-head scaled dot-product attention (self- or cross-attention).

    Shapes for a single sequence:
        X: (s, h) --Wq, Wk, Wv--> Q, K, V: (s, h // a) per head
          --> softmax(Q @ K.T / sqrt(d_k)) @ V: (s, h // a) per head
          --> heads concatenated and projected: (s, h)
    """

    def __init__(self, h, a, dropout=0, type = 'self'):
        '''
        h: embedding dimension
        a: number of attention heads (must divide h)
        dropout: dropout probability applied to the attention weights
        type: 'self' (Q, K, V all come from x) or 'cross' (Q from x, K/V from y)

        Raises ValueError if h is not divisible by a; otherwise the head
        reshape in forward() could silently infer a wrong sequence length.
        '''
        super().__init__()
        if h % a != 0:
            raise ValueError(f"Embedding dim h={h} must be divisible by the number of heads a={a}")
        self.h = h
        self.a = a
        self.d_k = h // a          # per-head feature dimension
        self.types = type
        self.dropout = nn.Dropout(p=dropout)

        # Q, K, V projections. Each head's weight is (h, h // a); the (h, h)
        # matrices below are the per-head weights concatenated together.
        self.Wq = nn.Linear(h, h)
        self.Wk = nn.Linear(h, h)
        self.Wv = nn.Linear(h, h)

        # Scaling factor applied to the dot-product scores.
        self.scale = 1 / math.sqrt(self.d_k)

        self.out_proj = nn.Linear(h, h)

    def forward(self, x, y = None, padding_mask=None, tgt_sequence_mask = None):
        """
        x: (batch_size, tgt_s, h) -- source of the queries
        y: (batch_size, src_s, h) -- source of keys/values (cross-attention only)
        padding_mask: optional (batch_size, k_len) mask over the KEY positions;
            True (non-zero) marks positions that must not be attended to.
        tgt_sequence_mask: optional (q_len, k_len) mask shared by all batches and
            heads. A boolean mask uses True for blocked positions; a floating
            point mask is added to the scores (e.g. -inf for blocked, 0 otherwise).

        Returns a tensor of shape (batch_size, tgt_s, h).
        """
        batch_size = x.size(0)

        # Step 1: pick the inputs of the Q / K / V projections.
        if self.types == 'self':            # self-attention: everything comes from x
            assert y is None, ("Self Attention but different input for Q K V")
            q = k = v = x
        elif self.types == 'cross':         # cross-attention: q from x, k/v from y
            assert y is not None, ("Cross Attention but the same input for Q K V")
            q = x
            k = v = y
        else: raise ValueError("Undefined Attention Type")

        # (batch, s, h) -> (batch, s, a, d_k) -> (batch, a, s, d_k)
        q = self.Wq(q).view(batch_size, -1, self.a, self.d_k).transpose(1, 2)
        k = self.Wk(k).view(batch_size, -1, self.a, self.d_k).transpose(1, 2)
        v = self.Wv(v).view(batch_size, -1, self.a, self.d_k).transpose(1, 2)

        # Step 2: attention scores, shape (batch, a, q_len, k_len).
        q_len, k_len = q.size(2), k.size(2)
        scores = torch.matmul(q, k.transpose(-1, -2)) * self.scale

        # Large negative finite value: blocked positions get ~0 weight after
        # softmax, and a fully blocked row yields a uniform row instead of NaN.
        blocked_value = torch.finfo(scores.dtype).min

        if tgt_sequence_mask is not None:
            seq_mask = tgt_sequence_mask.to(scores.device).reshape(1, 1, q_len, k_len)
            if seq_mask.dtype == torch.bool:
                scores = scores.masked_fill(seq_mask, blocked_value)
            else:
                # Additive float mask (0 = keep, -inf = block).
                scores = scores + seq_mask.to(scores.dtype)

        if padding_mask is not None:
            # Broadcast the per-key padding flags over heads and query positions.
            pad_mask = padding_mask.to(device=scores.device, dtype=torch.bool)
            pad_mask = pad_mask.reshape(batch_size, 1, 1, k_len)
            scores = scores.masked_fill(pad_mask, blocked_value)

        # Step 3: weights -> weighted sum of values -> merge heads -> output projection.
        attention_weights = F.softmax(scores, dim=-1)
        attention_weights = self.dropout(attention_weights)
        output = torch.matmul(attention_weights, v)
        output = output.transpose(1, 2).contiguous().view(batch_size, -1, self.h)
        output = self.out_proj(output)
        return output

In [166]:
import torch
import torch.nn as nn

# Sanity check: the custom Attention (defined in an earlier cell) should reproduce
# nn.MultiheadAttention when both share the same weights.
# (Template leftover) assumes the custom Attention class is named CustomAttention
# from your_module import CustomAttention

# Model hyper-parameters
d_model = 512
nhead = 8

# Build the custom Attention module (cross-attention so query and key/value can differ)
custom_attention = Attention(d_model, nhead, type = 'cross')

# Build the reference nn.MultiheadAttention module
standard_attention = nn.MultiheadAttention(embed_dim=d_model, num_heads=nhead, batch_first=True)

with torch.no_grad():
    # Copy the reference weights/biases into the custom module. The reference packs
    # the Q, K, V projections into one (3*d_model, d_model) matrix, sliced here in that order.
    custom_attention.Wq.weight.copy_(standard_attention.in_proj_weight[:d_model])
    custom_attention.Wq.bias.copy_(standard_attention.in_proj_bias[:d_model])
    custom_attention.Wk.weight.copy_(standard_attention.in_proj_weight[d_model:2*d_model])
    custom_attention.Wk.bias.copy_(standard_attention.in_proj_bias[d_model:2*d_model])
    custom_attention.Wv.weight.copy_(standard_attention.in_proj_weight[2*d_model:])
    custom_attention.Wv.bias.copy_(standard_attention.in_proj_bias[2*d_model:])
    custom_attention.out_proj.weight.copy_(standard_attention.out_proj.weight)
    custom_attention.out_proj.bias.copy_(standard_attention.out_proj.bias)

# Create identical input data for both modules
batch_size = 2
seq_len = 10

query = torch.rand(batch_size, seq_len, d_model)
key = value = torch.rand(batch_size, seq_len, d_model)

# Forward pass through the custom Attention
custom_output = custom_attention(query, key)

# Forward pass through the reference MultiheadAttention (second return value, the weights, is discarded)
standard_output, _ = standard_attention(query, key, value)

print(custom_output.size(), standard_output.size())

# Show both outputs
print("Custom Attention Output:", custom_output)
print("Standard Attention Output:", standard_output)

# Check that the outputs agree within an absolute tolerance of 1e-5
if torch.allclose(custom_output, standard_output, atol=1e-5):
    print("Attention outputs are consistent.")
else:
    print("Attention outputs are not consistent.")

torch.Size([2, 10, 512]) torch.Size([2, 10, 512])
Custom Attention Output: tensor([[[ 7.6254e-02, -2.3000e-01, -2.4884e-01,  ..., -1.2974e-01,
          -2.4446e-01, -5.3609e-02],
         [ 7.6644e-02, -2.2916e-01, -2.4818e-01,  ..., -1.2940e-01,
          -2.4590e-01, -5.2144e-02],
         [ 7.7572e-02, -2.2836e-01, -2.4693e-01,  ..., -1.3126e-01,
          -2.4442e-01, -5.2186e-02],
         ...,
         [ 7.7370e-02, -2.2816e-01, -2.4882e-01,  ..., -1.2853e-01,
          -2.4481e-01, -5.3064e-02],
         [ 7.5168e-02, -2.2928e-01, -2.4806e-01,  ..., -1.2686e-01,
          -2.4602e-01, -5.6681e-02],
         [ 7.5953e-02, -2.2779e-01, -2.4856e-01,  ..., -1.2830e-01,
          -2.4703e-01, -5.0346e-02]],

        [[ 5.1680e-02, -2.7051e-01, -2.0929e-01,  ..., -1.6202e-01,
          -1.7383e-01, -9.3966e-04],
         [ 5.0170e-02, -2.7117e-01, -2.0826e-01,  ..., -1.6102e-01,
          -1.7110e-01,  6.9406e-04],
         [ 5.3044e-02, -2.7202e-01, -2.0860e-01,  ..., -1.6277e-01,
 

## 比较Layernorm

In [167]:
class LayerNorm(nn.Module):
    """Layer normalization over the trailing `normalized_shape` dimensions.

    Computes (input - mean) / sqrt(var + eps) using the biased variance over the
    normalized dimensions, optionally followed by a learnable element-wise
    scale (`weight`) and shift (`bias`).
    """

    def __init__(self, normalized_shape, eps=1e-5, elementwise_affine=True):
        """
        normalized_shape: int or sequence of ints; the shape of the trailing
            dimensions to normalize over. An int n is treated as (n,).
        eps: value added to the variance for numerical stability.
        elementwise_affine: when True, learn `weight` (init 1) and `bias` (init 0);
            when False, both are registered as None and no affine step is applied.
        """
        super().__init__()
        if isinstance(normalized_shape, int):
            normalized_shape = (normalized_shape,)
        self.normalized_shape = tuple(normalized_shape)
        self.eps = eps
        self.elementwise_affine = elementwise_affine
        if elementwise_affine:
            self.weight = nn.Parameter(torch.ones(self.normalized_shape))
            self.bias = nn.Parameter(torch.zeros(self.normalized_shape))
        else:
            # Keep the attribute names defined so callers can still read them.
            self.register_parameter('weight', None)
            self.register_parameter('bias', None)

    def forward(self, input):
        """Normalize `input` over its last len(normalized_shape) dimensions.

        Raises AssertionError when the trailing dimensions of `input` do not
        equal `normalized_shape`.
        """
        n_dims = len(self.normalized_shape)
        assert tuple(input.shape[-n_dims:]) == self.normalized_shape, ("Unmatched Shape.")

        # Statistics over every normalized dimension (not just the last one).
        dims = tuple(range(-n_dims, 0))
        mean = input.mean(dim=dims, keepdim=True)
        var = input.var(dim=dims, unbiased=False, keepdim=True)
        std = torch.sqrt(var + self.eps)

        # Apply the layer-norm formula, then the optional affine transform.
        normalized_input = (input - mean) / std
        if self.elementwise_affine:
            normalized_input = normalized_input * self.weight + self.bias

        return normalized_input

In [168]:
# Sanity check: the custom LayerNorm should reproduce nn.LayerNorm.
# Model parameters (d_model, batch_size and seq_len come from an earlier cell)
normalized_shape = (d_model,)

# Build the custom LayerNorm module
custom_layernorm = LayerNorm(normalized_shape)

# Build the reference nn.LayerNorm module
standard_layernorm = nn.LayerNorm(normalized_shape)

# Create identical input data for both modules
input_data = torch.rand(batch_size, seq_len, d_model)

# Forward pass through the custom LayerNorm
custom_output = custom_layernorm(input_data)

# Forward pass through the reference LayerNorm
standard_output = standard_layernorm(input_data)

# Show both outputs
print("Custom LayerNorm Output:", custom_output)
print("Standard LayerNorm Output:", standard_output)

# Check that the outputs agree within an absolute tolerance of 1e-6
if torch.allclose(custom_output, standard_output, atol=1e-6):
    print("LayerNorm outputs are consistent.")
else:
    print("LayerNorm outputs are not consistent.")

Custom LayerNorm Output: tensor([[[-1.4499,  1.0367,  0.8635,  ...,  1.0624,  1.6254,  1.5586],
         [ 1.1919,  0.9680,  0.1012,  ...,  0.4476,  1.0002, -0.9387],
         [ 1.2584,  0.4808,  1.4803,  ...,  0.1199,  1.1277, -1.0652],
         ...,
         [-1.1420,  0.1190,  0.1091,  ...,  0.6674, -0.5147,  0.4453],
         [ 1.5784,  0.6634, -0.2846,  ..., -0.4316, -0.8661, -0.3981],
         [-0.2079, -1.5761,  0.7877,  ..., -0.3204,  1.3260,  1.7002]],

        [[-0.3636, -0.9277,  1.3401,  ...,  1.4149, -1.2291, -0.7934],
         [-1.0764,  0.4581,  0.6112,  ..., -0.0618, -1.6071,  1.1913],
         [-0.5549,  0.9851, -1.1974,  ...,  1.1155, -0.6968, -1.2720],
         ...,
         [-0.2990,  0.1023,  1.3759,  ..., -0.7829,  0.4815, -0.5369],
         [-1.4962, -0.8674,  1.6019,  ..., -1.1599, -0.6209,  0.6707],
         [ 0.1447, -0.1330,  0.5088,  ...,  1.8131, -0.8557, -0.8239]]],
       grad_fn=<AddBackward0>)
Standard LayerNorm Output: tensor([[[-1.4499,  1.0367,  0.86

## 比较FeedForward

In [169]:
class FeedForward(nn.Module):
    """Position-wise feed-forward network: W2(dropout(activation(W1(x))))."""

    def __init__(self, h, hiddenDim = None, outDim = None, dropout = 0.1, type = 'relu'):
        """
        x: (..., h) ---> W1: (..., hiddenDim) ---> relu/gelu ---> dropout ---> W2: (..., outDim)

        h: input feature dimension
        hiddenDim: width of the hidden layer; defaults to 4 * h
        outDim: output feature dimension; defaults to h
        dropout: dropout probability applied to the hidden activations
            (only active in training mode)
        type: activation name, 'relu' or 'gelu' (validated in forward())
        """
        super().__init__()
        self.h = h
        if hiddenDim is None: hiddenDim = 4 * h
        if outDim is None: outDim = h
        self.W1 = nn.Linear(h, hiddenDim)
        # Dropout holds no parameters, so enabling it leaves the state_dict unchanged.
        self.dropout = nn.Dropout(dropout)
        self.W2 = nn.Linear(hiddenDim, outDim)
        self.types = type

    def forward(self, x):
        """Apply the two-layer MLP to the last dimension of x.

        Raises ValueError if the configured activation type is not supported.
        """
        x = self.W1(x)
        if self.types == 'relu': x = F.relu(x)
        elif self.types == 'gelu': x = F.gelu(x)
        else: raise ValueError("Unsupported activation type")
        x = self.dropout(x)
        x = self.W2(x)
        return x

In [170]:
# Sanity check: the custom FeedForward should reproduce Linear -> ReLU -> Linear.
# Model parameters (d_model, batch_size and seq_len come from an earlier cell)
dim_ff = 2048

# Build the custom FeedForward module
custom_feedforward = FeedForward(d_model, dim_ff, dropout=0)

# Build the reference feed-forward stack
standard_feedforward = nn.Sequential(
    nn.Linear(d_model, dim_ff),
    nn.ReLU(),
    nn.Linear(dim_ff, d_model)
)

with torch.no_grad():
    # Copy the first layer's weight and bias
    custom_feedforward.W1.weight.copy_(standard_feedforward[0].weight)
    custom_feedforward.W1.bias.copy_(standard_feedforward[0].bias)
    # Copy the second layer's weight and bias
    custom_feedforward.W2.weight.copy_(standard_feedforward[2].weight)
    custom_feedforward.W2.bias.copy_(standard_feedforward[2].bias)


# Create identical input data for both modules
input_data = torch.rand(batch_size, seq_len, d_model)

# Forward pass through the custom FeedForward
custom_output = custom_feedforward(input_data)

# Forward pass through the reference feed-forward stack
standard_output = standard_feedforward(input_data)

# Show both outputs
print("Custom FeedForward Output:", custom_output)
print("Standard FeedForward Output:", standard_output)

# Check that the outputs agree within an absolute tolerance of 1e-6
if torch.allclose(custom_output, standard_output, atol=1e-6):
    print("FeedForward outputs are consistent.")
else:
    print("FeedForward outputs are not consistent.")

Custom FeedForward Output: tensor([[[-0.0033,  0.0491, -0.1526,  ...,  0.4070, -0.1007, -0.1621],
         [ 0.0931,  0.0313, -0.1054,  ...,  0.4616, -0.0819, -0.0800],
         [ 0.1692, -0.0269, -0.1421,  ...,  0.4423, -0.0836, -0.1786],
         ...,
         [ 0.0956,  0.0691, -0.1539,  ...,  0.3764, -0.0960, -0.1147],
         [ 0.0786,  0.1818, -0.1807,  ...,  0.4164, -0.0534, -0.2579],
         [-0.0387,  0.0827, -0.1627,  ...,  0.3216,  0.0080, -0.0844]],

        [[ 0.0102,  0.1768, -0.1897,  ...,  0.4118, -0.1269, -0.0931],
         [ 0.1048,  0.1075, -0.1676,  ...,  0.4553, -0.0972, -0.1229],
         [ 0.1263,  0.0266, -0.1519,  ...,  0.5270, -0.0882, -0.1278],
         ...,
         [ 0.1030, -0.0176, -0.1740,  ...,  0.3288, -0.1698, -0.1251],
         [ 0.0236, -0.0760, -0.2338,  ...,  0.4686, -0.1037, -0.0839],
         [ 0.0725, -0.0319, -0.1449,  ...,  0.3669, -0.1149, -0.0957]]],
       grad_fn=<ViewBackward0>)
Standard FeedForward Output: tensor([[[-0.0033,  0.0491, 

## 比较Transformer架构

In [171]:
class TransformerEncoderDecoder(nn.Module):
    """Post-norm Transformer encoder/decoder stack built from the custom
    Attention, LayerNorm and FeedForward modules.

    Every sub-layer is applied as `norm(sublayer(x) + x)`.
    """

    def __init__(self, h, a, num_encoder_layers, num_decoder_layers, dim_feedforward=2048, dropout=0.1):
        """
        h: model (embedding) dimension
        a: number of attention heads
        num_encoder_layers / num_decoder_layers: number of stacked layers
        dim_feedforward: hidden width of each feed-forward sub-layer
        dropout: dropout probability handed to the Attention and FeedForward sub-layers
        """
        super().__init__()
        # Encoder layer: self-attention -> norm -> feed-forward -> norm.
        self.encoders = nn.ModuleList([
            nn.ModuleList([
                Attention(h, a, dropout),
                LayerNorm((h,)),
                # Forward dim_feedforward explicitly; otherwise FeedForward
                # falls back to its own default of 4 * h.
                FeedForward(h, hiddenDim = dim_feedforward, dropout = dropout),
                LayerNorm((h,))
            ]) for _ in range(num_encoder_layers)
        ])

        # Decoder layer: self-attention -> norm -> cross-attention -> norm -> feed-forward -> norm.
        self.decoders = nn.ModuleList([
            nn.ModuleList([
                Attention(h, a, dropout),
                LayerNorm((h,)),
                Attention(h, a, dropout, type='cross'),
                LayerNorm((h,)),
                FeedForward(h, hiddenDim = dim_feedforward, dropout = dropout),
                LayerNorm((h,))
            ]) for _ in range(num_decoder_layers)
        ])

    def forward(self, encoder_input, decoder_input, src_padding_mask=None, tgt_padding_mask = None, tgt_sequence_mask=None):
        """Run the encoder stack, then the decoder stack; return the decoder output.

        encoder_input: source sequence fed to the encoder layers
        decoder_input: target sequence fed to the decoder layers
        src_padding_mask: passed as `padding_mask` to the encoder self-attention
            and to the decoder cross-attention
        tgt_padding_mask: passed as `padding_mask` to the decoder self-attention
        tgt_sequence_mask: passed as `tgt_sequence_mask` to the decoder self-attention
        """
        for enc in self.encoders:
            attention, norm1, ff, norm2 = enc
            encoder_input = norm1(attention(encoder_input, padding_mask=src_padding_mask) + encoder_input)
            encoder_input = norm2(ff(encoder_input) + encoder_input)

        # After the loop `encoder_input` holds the final encoder output, which
        # serves as the keys/values of every decoder cross-attention.
        for dec in self.decoders:
            self_attention, norm1, cross_attention, norm2, ff, norm3 = dec
            decoder_input = norm1(self_attention(decoder_input, padding_mask=tgt_padding_mask, \
                                                 tgt_sequence_mask = tgt_sequence_mask) + decoder_input)
            decoder_input = norm2(cross_attention(decoder_input, encoder_input, \
                                                  padding_mask=src_padding_mask) + decoder_input)
            decoder_input = norm3(ff(decoder_input) + decoder_input)
        return decoder_input

In [None]:
import numpy as np
import random


def set_seed(seed):
    """Seed the Python, NumPy and PyTorch RNGs so the comparison is reproducible."""
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


def copy_attention_weights(custom_attn, standard_attn, h):
    """Copy nn.MultiheadAttention weights into a custom Attention module.

    The reference packs the Q, K, V projections into a single (3*h, h) matrix,
    in that order; the custom module stores them as Wq / Wk / Wv.
    """
    custom_attn.Wq.weight.copy_(standard_attn.in_proj_weight[:h])
    custom_attn.Wq.bias.copy_(standard_attn.in_proj_bias[:h])
    custom_attn.Wk.weight.copy_(standard_attn.in_proj_weight[h:2*h])
    custom_attn.Wk.bias.copy_(standard_attn.in_proj_bias[h:2*h])
    custom_attn.Wv.weight.copy_(standard_attn.in_proj_weight[2*h:])
    custom_attn.Wv.bias.copy_(standard_attn.in_proj_bias[2*h:])
    custom_attn.out_proj.weight.copy_(standard_attn.out_proj.weight)
    custom_attn.out_proj.bias.copy_(standard_attn.out_proj.bias)


def copy_layernorm_weights(custom_norm, standard_norm):
    """Copy nn.LayerNorm scale/shift into a custom LayerNorm (weight/bias live on the module itself)."""
    custom_norm.weight.copy_(standard_norm.weight)
    custom_norm.bias.copy_(standard_norm.bias)


def copy_feedforward_weights(custom_ff, standard_layer):
    """Copy a reference layer's linear1/linear2 into a custom FeedForward (W1/W2)."""
    custom_ff.W1.weight.copy_(standard_layer.linear1.weight)
    custom_ff.W1.bias.copy_(standard_layer.linear1.bias)
    custom_ff.W2.weight.copy_(standard_layer.linear2.weight)
    custom_ff.W2.bias.copy_(standard_layer.linear2.bias)


# Fix the random seed
set_seed(42)

# Model hyper-parameters
h = 512
a = 8
num_encoder_layers = 6
num_decoder_layers = 6
dim_feedforward = 2048
dropout = 0.1

# Build the custom Transformer
custom_transformer = TransformerEncoderDecoder(h, a, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout)

# Build the reference nn.Transformer
standard_transformer = nn.Transformer(d_model=h, nhead=a, num_encoder_layers=num_encoder_layers, 
                                      num_decoder_layers=num_decoder_layers, dim_feedforward=dim_feedforward, 
                                      dropout=dropout, batch_first=True)

# nn.Transformer applies an extra LayerNorm after the encoder stack and after the
# decoder stack; the custom model has no such layers, so disable them here to
# compare like with like.
standard_transformer.encoder.norm = nn.Identity()
standard_transformer.decoder.norm = nn.Identity()

# Dropout is stochastic in training mode; switch both models to eval mode so the
# outputs are deterministic and comparable.
custom_transformer.eval()
standard_transformer.eval()

# Give both models identical weights and biases
with torch.no_grad():
    for i in range(num_encoder_layers):
        custom_attention, custom_norm1, custom_ff, custom_norm2 = custom_transformer.encoders[i]
        standard_encoder_layer = standard_transformer.encoder.layers[i]

        copy_attention_weights(custom_attention, standard_encoder_layer.self_attn, h)
        copy_layernorm_weights(custom_norm1, standard_encoder_layer.norm1)
        copy_layernorm_weights(custom_norm2, standard_encoder_layer.norm2)
        copy_feedforward_weights(custom_ff, standard_encoder_layer)

    for i in range(num_decoder_layers):
        custom_self_attention, custom_norm1, custom_cross_attention, custom_norm2, custom_ff, custom_norm3 = custom_transformer.decoders[i]
        standard_decoder_layer = standard_transformer.decoder.layers[i]

        copy_attention_weights(custom_self_attention, standard_decoder_layer.self_attn, h)
        copy_attention_weights(custom_cross_attention, standard_decoder_layer.multihead_attn, h)
        copy_layernorm_weights(custom_norm1, standard_decoder_layer.norm1)
        copy_layernorm_weights(custom_norm2, standard_decoder_layer.norm2)
        copy_layernorm_weights(custom_norm3, standard_decoder_layer.norm3)
        copy_feedforward_weights(custom_ff, standard_decoder_layer)

# Create identical input data for both models
batch_size = 2
src_len = 10
tgt_len = 10

src = torch.rand(batch_size, src_len, h)
tgt = torch.rand(batch_size, tgt_len, h)
src_padding_mask = torch.zeros(batch_size, src_len).bool()
tgt_padding_mask = torch.zeros(batch_size, tgt_len).bool()
# Boolean causal mask (True = position may NOT be attended to). Using bool keeps
# it the same dtype as the padding masks.
tgt_sequence_mask = torch.triu(torch.ones(tgt_len, tgt_len, dtype=torch.bool), diagonal=1)

# Forward pass through the custom Transformer
custom_output = custom_transformer(src, tgt, src_padding_mask, tgt_padding_mask, tgt_sequence_mask)

# Forward pass through the reference Transformer
standard_output = standard_transformer(src, tgt, tgt_mask=tgt_sequence_mask, 
                                       src_key_padding_mask=src_padding_mask, 
                                       tgt_key_padding_mask=tgt_padding_mask)

# Show both outputs
print("Custom Transformer Output:", custom_output)
print("Standard Transformer Output:", standard_output)

# Check that the outputs agree. NOTE: agreement requires the custom Attention to
# actually apply the causal mask it is given. The tolerance is looser than for a
# single layer because 12 stacked layers accumulate float32 rounding differences.
if torch.allclose(custom_output, standard_output, atol=1e-5):
    print("Outputs are consistent.")
else:
    print("Outputs are not consistent.")