总结：Input_embedding 和 positional_encoding 是 Transformer 输入部分的两个环节；
EncoderLayer 由 MultiHeadAttention、FeedForward 以及两个 LayerNorm（配合残差连接）组成。
https://blog.csdn.net/weixin_44613415/article/details/139848359

## 下面是Transformer的class
已知这里会有Input_embedding和Positional_encoding

In [14]:
import torch
import torch.nn as nn
import math
import torch.optim as optim

In [15]:
class PositionalEncoding(nn.Module):
    """Fixed sinusoidal positional encoding that is added to its input.

    Even feature columns hold ``sin(pos * w_i)`` and odd feature columns hold
    ``cos(pos * w_i)``, where ``w_i = exp(-2i * ln(10000) / d_model)``.
    An odd ``d_model`` is supported: the sine half simply gets one more
    column than the cosine half.

    Args:
        d_model: Size of the feature dimension of the encoding table.
        max_seq_length: Number of positions precomputed in the table.
    """

    def __init__(self, d_model, max_seq_length):
        super().__init__()
        n_sin = (d_model + 1) // 2  # count of even-indexed columns
        n_cos = d_model // 2  # count of odd-indexed columns

        # Column vector of positions, shape (max_seq_length, 1).
        position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)
        # 2i for i = 0 .. n_sin - 1, turned into the per-column frequency.
        two_i = 2 * torch.arange(0, n_sin).float()
        div_term = torch.exp(two_i * -(math.log(10000.0) / d_model))
        angles = position * div_term  # shape (max_seq_length, n_sin)

        table = torch.zeros(max_seq_length, d_model)
        table[:, 0::2] = torch.sin(angles)
        table[:, 1::2] = torch.cos(angles[:, :n_cos])
        # Registered as buffer ``self.pe`` with shape
        # (1, max_seq_length, d_model) so it broadcasts over the batch axis.
        self.register_buffer('pe', table.unsqueeze(0))

    def forward(self, x):
        """Return ``x`` plus the encoding of its first ``x.size(1)`` positions.

        ``x`` is indexed as (batch, seq_length, d_model); the table is sliced
        along the sequence axis and broadcast over the batch axis.
        """
        seq_length = x.size(1)
        return x + self.pe[:, :seq_length]

In [16]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention (no masking, no dropout).

    Inputs and outputs are indexed as (batch_size, seq_length, d_model).

    Args:
        d_model: Feature size of the inputs and of the output.
        num_heads: Number of attention heads; must divide ``d_model``.

    Raises:
        ValueError: If ``num_heads`` is not positive or does not divide
            ``d_model`` evenly.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        # Fail early with a clear message: floor division would otherwise
        # silently truncate d_k and only blow up later inside ``view``.
        if num_heads <= 0 or d_model % num_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by a positive "
                f"num_heads ({num_heads})"
            )
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads  # per-head feature size
        self.Qw = nn.Linear(d_model, d_model)  # query projection
        self.Kw = nn.Linear(d_model, d_model)  # key projection
        self.Vw = nn.Linear(d_model, d_model)  # value projection
        self.Ow = nn.Linear(d_model, d_model)  # output projection

    def split_heads(self, x):
        """Reshape (batch, seq, d_model) into (batch, num_heads, seq, d_k)."""
        batch_size, seq_length, _ = x.size()
        # Split the feature axis into heads, then swap the seq and head axes.
        return x.view(batch_size, seq_length, self.num_heads, self.d_k).transpose(1, 2)

    def concat_heads(self, x):
        """Inverse of ``split_heads``: back to (batch, seq, d_model)."""
        batch_size, _, seq_length, _ = x.size()
        # reshape (not view) because the transposed tensor is non-contiguous.
        return x.transpose(2, 1).reshape(batch_size, seq_length, self.d_model)

    def dot_attention(self, Q, K, V):
        """Return ``softmax(Q @ K^T / sqrt(d_k)) @ V`` for per-head inputs.

        The score matrix has shape (batch, num_heads, seq_q, seq_k); the
        softmax is taken over the last (key) axis.
        """
        attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
        attn_probs = torch.softmax(attn_scores, dim=-1)
        return torch.matmul(attn_probs, V)

    def forward(self, Q, K, V):
        """Project Q/K/V, attend per head, merge the heads and project out."""
        Qa = self.split_heads(self.Qw(Q))
        Ka = self.split_heads(self.Kw(K))
        Va = self.split_heads(self.Vw(V))
        attn_output = self.dot_attention(Qa, Ka, Va)
        return self.Ow(self.concat_heads(attn_output))

In [17]:
class FeedForward(nn.Module):
    """Position-wise feed-forward block: Linear -> ReLU -> (Dropout) -> Linear.

    Args:
        d_model: Input and output feature size.
        d_hidden: Feature size of the hidden layer.
        is_drop: When true, dropout is applied to the hidden activations.
        drop: Dropout probability passed to ``nn.Dropout``.
    """

    def __init__(self, d_model, d_hidden, is_drop=True, drop=0.1):
        super().__init__()
        self.W1 = nn.Linear(d_model, d_hidden)  # expand to the hidden size
        self.W2 = nn.Linear(d_hidden, d_model)  # project back to d_model
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(drop)
        self.is_drop = is_drop

    def forward(self, x):
        """Apply the block to ``x``; the last axis must have size d_model."""
        hidden = self.relu(self.W1(x))
        return self.W2(self.dropout(hidden) if self.is_drop else hidden)

In [18]:
class EncoderLayer(nn.Module):
    """One encoder layer: self-attention and feed-forward sub-layers.

    Each sub-layer output is added to its input (residual connection) and
    the sum is passed through its own LayerNorm.

    Args:
        d_model: Feature size of the layer input and output.
        num_heads: Number of heads passed to ``MultiHeadAttention``.
        d_hidden: Hidden size passed to ``FeedForward``.
        is_drop: Forwarded to ``FeedForward``.
        drop: Forwarded to ``FeedForward``.
    """

    def __init__(self, d_model, num_heads, d_hidden, is_drop=True, drop=0.1):
        super().__init__()
        self.attn = MultiHeadAttention(d_model, num_heads)
        self.norm1 = nn.LayerNorm(d_model)
        self.feed_forward = FeedForward(d_model, d_hidden, is_drop, drop)
        self.norm2 = nn.LayerNorm(d_model)

    def forward(self, x):
        """Run self-attention then feed-forward, each with add-and-norm."""
        # Self-attention: x serves as query, key and value.
        attended = self.norm1(x + self.attn(x, x, x))
        return self.norm2(attended + self.feed_forward(attended))

In [19]:
class Transformer(nn.Module):
    """Input stage of the Transformer: token embedding + positional encoding.

    Only the encoder input modules are built here; no ``forward`` is defined
    yet.

    Args:
        num_embeddings: Vocabulary size for the embedding table.
        embedding_dim: Size of each embedding vector. The positional encoding
            uses the same size because the two are added together.
        max_seq_length: Number of positions precomputed by the positional
            encoding.
    """

    def __init__(self, num_embeddings, embedding_dim, max_seq_length):
        # nn.Module must be initialised before any sub-module is assigned;
        # otherwise the assignment below raises AttributeError.
        super().__init__()
        # Maps token indices to vectors of size embedding_dim.
        self.encoder_input_embedding = nn.Embedding(num_embeddings, embedding_dim)
        # PositionalEncoding takes (d_model, max_seq_length), in that order.
        self.positional_encoding = PositionalEncoding(embedding_dim, max_seq_length)


# 测试部分

In [20]:
# Demo: how nn.Embedding turns a tokenized sentence into dense vectors.

# The sentence to embed; every word is unique.
sentence = ['I', 'like', 'to', 'learn', 'deep', 'learning', 'with', 'PyTorch']
# Vocabulary: each word maps to its position in the sentence (0..7).
vocab = {word: index for index, word in enumerate(sentence)}
vocab_size = len(vocab)

# Look up the index of every word, then add a leading batch axis -> (1, 8).
sentence_indices = list(map(vocab.__getitem__, sentence))
sentence_tensor = torch.tensor([sentence_indices])

# Embedding table with one randomly initialised row per vocabulary entry.
embedding_dim = 6
embedding_layer = nn.Embedding(vocab_size, embedding_dim)
# Index lookup: (1, 8) indices -> (1, 8, 6) vectors.
embedded_sentence = embedding_layer(sentence_tensor)

print("原始句子:", sentence)
print("句子索引:", sentence_tensor)
print("嵌入向量形状:", embedded_sentence.shape)  # (batch, sequence_length, embedding_dim)
print("嵌入向量:\n", embedded_sentence)

原始句子: ['I', 'like', 'to', 'learn', 'deep', 'learning', 'with', 'PyTorch']
句子索引: tensor([[0, 1, 2, 3, 4, 5, 6, 7]])
嵌入向量形状: torch.Size([1, 8, 6])
嵌入向量:
 tensor([[[ 0.3950, -1.2595,  0.5595, -1.2674,  2.7538, -0.8248],
         [ 1.7810,  0.4843, -0.5217,  0.6411,  2.0571,  0.8603],
         [-1.0124,  1.2623, -0.0644,  1.2093,  0.0174,  0.3585],
         [-1.2773, -0.6560, -0.3979, -0.9609,  1.4672,  1.4918],
         [-0.3171,  0.5181,  1.0291,  1.4574, -0.9940,  1.2083],
         [-2.9706, -0.8443, -0.9328,  0.6566,  0.2987, -0.1731],
         [-0.3194,  1.1842, -1.3125,  0.6940,  0.2855,  0.4416],
         [ 0.5895,  0.1618, -0.8739, -0.3380,  0.0239,  0.2934]]],
       grad_fn=<EmbeddingBackward0>)


In [21]:
# Demo: build the sinusoidal positional-encoding table by hand and add it to
# the embedded sentence produced by the embedding demo.
max_seq_length, d_model = 10, 6

# Column vector of positions, shape (max_seq_length, 1).
position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)
# One frequency per even column; (d_model + 1) // 2 also covers an odd d_model.
div_term = torch.exp(2 * torch.arange(0, (d_model + 1) // 2).float() * -(math.log(10000.0) / d_model))
angles = position * div_term

# Even columns take the sine, odd columns the cosine of the same angles.
pe = torch.zeros(max_seq_length, d_model)
pe[:, 0::2] = torch.sin(angles)
pe[:, 1::2] = torch.cos(angles[:, :d_model // 2])
# Leading axis of size 1 so the table broadcasts over the batch axis.
pe = pe.unsqueeze(0)
embedded_sentence + pe[:, :embedded_sentence.size(1)]

tensor([[[ 0.3950, -0.2595,  0.5595, -0.2674,  2.7538,  0.1752],
         [ 2.6224,  1.0246, -0.4753,  1.6400,  2.0592,  1.8603],
         [-0.1031,  0.8462,  0.0283,  2.2050,  0.0217,  1.3585],
         [-1.1362, -1.6459, -0.2591,  0.0294,  1.4737,  2.4918],
         [-1.0740, -0.1356,  1.2137,  2.4402, -0.9854,  2.2082],
         [-3.9295, -0.5606, -0.7028,  1.6298,  0.3094,  0.8268],
         [-0.5989,  2.1443, -1.0376,  1.6554,  0.2984,  1.4415],
         [ 1.2465,  0.9157, -0.5546,  0.6097,  0.0390,  1.2933]]],
       grad_fn=<AddBackward0>)

In [22]:
# Demo: split the feature axis into heads, then move the head axis in front
# of the sequence axis, comparing permute with transpose.
batch_size, seq_length, d_model, num_heads = 2, 4, 8, 2
d_k = d_model // num_heads

# Integers 1..64 laid out as (batch, seq, feature) = (2, 4, 8).
x = torch.arange(1, batch_size * seq_length * d_model + 1).view(batch_size, seq_length, d_model)

# Split the last axis into (num_heads, d_k): shape (2, 4, 2, 4).
x = x.view(batch_size, seq_length, num_heads, d_k)
print("原张量格式如下：", x)

# Reorder axes to (batch, num_heads, seq, d_k): shape (2, 2, 4, 4).
y = x.permute(0, 2, 1, 3)
print("使用permute后的格式如下：", y)

# Swapping axes 1 and 2 gives the same result regardless of argument order.
z = x.transpose(1, 2)
zz = x.transpose(2, 1)
print("使用transpose后的格式如下：", z)

原张量格式如下： tensor([[[[ 1,  2,  3,  4],
          [ 5,  6,  7,  8]],

         [[ 9, 10, 11, 12],
          [13, 14, 15, 16]],

         [[17, 18, 19, 20],
          [21, 22, 23, 24]],

         [[25, 26, 27, 28],
          [29, 30, 31, 32]]],


        [[[33, 34, 35, 36],
          [37, 38, 39, 40]],

         [[41, 42, 43, 44],
          [45, 46, 47, 48]],

         [[49, 50, 51, 52],
          [53, 54, 55, 56]],

         [[57, 58, 59, 60],
          [61, 62, 63, 64]]]])
使用permute后的格式如下： tensor([[[[ 1,  2,  3,  4],
          [ 9, 10, 11, 12],
          [17, 18, 19, 20],
          [25, 26, 27, 28]],

         [[ 5,  6,  7,  8],
          [13, 14, 15, 16],
          [21, 22, 23, 24],
          [29, 30, 31, 32]]],


        [[[33, 34, 35, 36],
          [41, 42, 43, 44],
          [49, 50, 51, 52],
          [57, 58, 59, 60]],

         [[37, 38, 39, 40],
          [45, 46, 47, 48],
          [53, 54, 55, 56],
          [61, 62, 63, 64]]]])
使用transpose后的格式如下： tensor([[[[ 1,  2,  3,  4],
 

In [23]:
## tips: tests for the MultiHeadAttention class defined earlier in this notebook
batch_size = 2
seq_length = 4
d_model = 8
num_heads = 2

# Instantiate the module under test
mha = MultiHeadAttention(d_model, num_heads)

# Random query / key / value inputs, each of shape (batch_size, seq_length, d_model)
Q = torch.randn(batch_size, seq_length, d_model)
K = torch.randn(batch_size, seq_length, d_model)
V = torch.randn(batch_size, seq_length, d_model)

# split_heads followed by concat_heads must reproduce the input
split = mha.split_heads(Q)
concat = mha.concat_heads(split)
assert torch.allclose(Q, concat), "split_heads 和 concat_heads 不是互逆的。"

# dot_attention on per-head tensors must keep the per-head shape
attn_output = mha.dot_attention(split, split, split)
expected_shape = (batch_size, num_heads, seq_length, mha.d_k)
assert attn_output.shape == expected_shape, f"Attention 输出形状不正确，期望 {expected_shape}，得到 {attn_output.shape}。"

# The full forward pass must return (batch_size, seq_length, d_model)
output = mha(Q, K, V)
expected_shape = (batch_size, seq_length, d_model)
assert output.shape == expected_shape, f"输出形状不正确，期望 {expected_shape}，得到 {output.shape}。"

# Run the forward pass once more and print any exception instead of raising it
try:
    output = mha(Q, K, V)
    print("前向传播成功。")
except Exception as e:
    print(f"前向传播时出错：{e}")

# Check that gradients flow back to the input:
# enable gradient tracking on Q, recompute the output, then backpropagate.
Q.requires_grad_(True)
output = mha(Q, K, V)
output.mean().backward()
assert Q.grad is not None, "梯度没有回传到 Q。"

print("所有测试通过！")

# Compare against PyTorch's built-in implementation
torch_mha = nn.MultiheadAttention(embed_dim=d_model, num_heads=num_heads, batch_first=True)

# Load the custom module's weights into the PyTorch module
with torch.no_grad():
    # in_proj_* are the Q, K and V projections concatenated along dim 0, in that order
    torch_mha.in_proj_weight = nn.Parameter(torch.cat([
        mha.Qw.weight,
        mha.Kw.weight,
        mha.Vw.weight
    ], dim=0))
    torch_mha.in_proj_bias = nn.Parameter(torch.cat([
        mha.Qw.bias,
        mha.Kw.bias,
        mha.Vw.bias
    ], dim=0))
    # Plain assignment: out_proj now shares the same Parameter objects as mha.Ow
    torch_mha.out_proj.weight = mha.Ow.weight
    torch_mha.out_proj.bias = mha.Ow.bias

# Run PyTorch's MHA on the same inputs; the second return value is discarded
torch_output, _ = torch_mha(Q, K, V)

# Compare with `output`, i.e. the custom forward pass computed in the gradient check above
if torch.allclose(output, torch_output, atol=1e-6):
    print("自定义实现与 PyTorch 实现输出匹配。")
else:
    print("自定义实现与 PyTorch 实现输出不匹配。")

前向传播成功。
所有测试通过！
自定义实现与 PyTorch 实现输出匹配。


In [24]:
# Demo: BatchNorm1d versus LayerNorm on the same (batch, seq, feature) input.

# Floats 1.0 .. 24.0 laid out with shape (2, 3, 4).
x = torch.arange(1.0, 25.0).view(2, 3, 4)

# BatchNorm1d over the 4 features. For 3-D input it expects the feature
# (channel) axis at position 1, so move features there and back afterwards.
batch_norm = nn.BatchNorm1d(4)
x_bn = x.permute(0, 2, 1)  # (batch, features, seq)
output_bn = batch_norm(x_bn)
x_original = output_bn.permute(0, 2, 1)  # back to (batch, seq, features)
print("BatchNorm 输出：\n", x_original)

# LayerNorm normalises over the last axis (size 4) of every position.
layer_norm = nn.LayerNorm(4)
output_ln = layer_norm(x)
print("LayerNorm 输出：\n", output_ln)

BatchNorm 输出：
 tensor([[[-1.4639, -1.4639, -1.4639, -1.4639],
         [-0.8783, -0.8783, -0.8783, -0.8783],
         [-0.2928, -0.2928, -0.2928, -0.2928]],

        [[ 0.2928,  0.2928,  0.2928,  0.2928],
         [ 0.8783,  0.8783,  0.8783,  0.8783],
         [ 1.4639,  1.4639,  1.4639,  1.4639]]], grad_fn=<PermuteBackward0>)
LayerNorm 输出：
 tensor([[[-1.3416, -0.4472,  0.4472,  1.3416],
         [-1.3416, -0.4472,  0.4472,  1.3416],
         [-1.3416, -0.4472,  0.4472,  1.3416]],

        [[-1.3416, -0.4472,  0.4472,  1.3416],
         [-1.3416, -0.4472,  0.4472,  1.3416],
         [-1.3416, -0.4472,  0.4472,  1.3416]]],
       grad_fn=<NativeLayerNormBackward0>)


In [25]:
# Smoke test: FeedForward must return a tensor with the same shape as its input.
ffn = FeedForward(d_model=512, d_hidden=2048)
input_tensor = torch.randn(32, 128, 512)
output_tensor = ffn(input_tensor)
print(output_tensor.shape)


torch.Size([32, 128, 512])


In [1]:
# Smoke test: push one random batch through an EncoderLayer and print the
# input and output tensors together with their shapes.
# Needs the cells that import torch and define EncoderLayer to have run first.
d_model, num_heads, d_hidden = 8, 4, 128
seq_len, batch_size = 10, 2

# Build the layer, then a random input of shape (batch_size, seq_len, d_model).
encoder_layer = EncoderLayer(d_model, num_heads, d_hidden)
x = torch.rand(batch_size, seq_len, d_model)

# Forward pass through the encoder layer.
output = encoder_layer(x)

print("Input shape:", x.shape, x)
print("Output shape:", output.shape, output)

NameError: name 'EncoderLayer' is not defined