In [19]:
import torch
from torch import nn 
import torch.nn.functional as F
import math

In [20]:
enc_voc_size = 6000 
dec_voc_size = 8000
src_pad_idx = 1   
trg_pad_idx = 1
trg_sos_idx = 2
batch_size = 128
max_len = 1024
d_model = 512
n_layers = 3
n_heads = 2
ffn_hidden = 1024
drop_prob = 0.1
# device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device = torch.device("cpu")

# Embedding 

## Token Embedding

In [21]:
class TokenEmbedding(nn.Embedding): # 词嵌入
    def __init__(self, vocab_size, d_model):
        super(TokenEmbedding, self).__init__(vocab_size, d_model, padding_idx=1)

## Position Embedding
![title](image/positional_encoding.jpg)

In [22]:
class PositionalEmbedding(nn.Module): # 位置编码
    def __init__(self, d_model, maxlen, device):
        super(PositionalEmbedding, self).__init__()
        self.encoding = torch.zeros(maxlen, d_model, device=device) # 位置编码矩阵
        self.encoding.requires_grad_(False)  # 位置编码不需要更新梯度

        pos = torch.arange(0, maxlen, device=device)
        pos = pos.float().unsqueeze(1)
        _2i = torch.arange(0, d_model, 2, device=device)

        self.encoding[:, 0::2] = torch.sin(pos / (10000 ** (_2i / d_model)))
        self.encoding[:, 1::2] = torch.cos(pos / (10000 ** (_2i / d_model)))

    def forward(self, x):
        seq_len = x.shape[1]
        return self.encoding[:seq_len, :]

## Total Embedding

In [23]:
class TransformerEmbedding(nn.Module):
    def __init__(self, vocab_size, d_model, max_len, drop_prob, device):
        super(TransformerEmbedding, self).__init__()
        self.tok_emb = TokenEmbedding(vocab_size, d_model)
        self.pos_emb = PositionalEmbedding(d_model, max_len, device)
        self.drop_out = nn.Dropout(p=drop_prob)

    def forward(self, x):
        tok_emb = self.tok_emb(x)
        pos_emb = self.pos_emb(x)
        return self.drop_out(tok_emb + pos_emb)

这段代码定义了一个名为`TransformerEmbedding`的类，它是PyTorch的`nn.Module`的子类，用于实现Transformer模型中的嵌入层。

1. `__init__`方法：这是类的初始化方法，用于设置类的一些基本参数。

   - `vocab_size`：词汇表的大小，也就是有多少个不同的词或者标记。
   - `d_model`：模型的维度，也就是嵌入向量的长度。
   - `max_len`：最大序列长度，用于位置嵌入。
   - `drop_prob`：Dropout层的概率。
   - `device`：用于指定模型运行的设备，如"cpu"或"cuda"。

   在这个方法中，创建了三个子模块：

   - `self.tok_emb`：词嵌入层，用于将输入的词或标记转换为嵌入向量。
   - `self.pos_emb`：位置嵌入层，用于生成位置信息。
   - `self.drop_out`：Dropout层，用于在训练过程中随机丢弃一部分神经元，以防止过拟合。

2. `forward`方法：这是类的前向传播方法，定义了模型的计算过程。

   - `tok_emb`：通过词嵌入层将输入`x`转换为嵌入向量。
   - `pos_emb`：通过位置嵌入层为输入`x`生成位置信息。
   - `tok_emb + pos_emb`：将词嵌入和位置嵌入相加，得到最终的嵌入向量。
   - `self.drop_out(tok_emb + pos_emb)`：对最终的嵌入向量应用Dropout。

   这个方法的返回值是应用了Dropout的嵌入向量。

总的来说，这个类的作用是将输入的词或标记转换为包含位置信息的嵌入向量，并在训练过程中应用Dropout以防止过拟合。

# Layer Norm
![layer](image/layer_norm.jpg)

In [24]:
class LayerNorm(nn.Module):
    def __init__(self, d_model, eps=1e-10):
        super(LayerNorm, self).__init__()
        self.gamma = nn.Parameter(torch.ones(d_model))
        self.beta = nn.Parameter(torch.zeros(d_model))
        self.eps = eps
    
    def forward(self, x):
        mean = x.mean(-1, keepdim=True)
        var = x.var(-1, unbiased=False, keepdim=True)
        out = (x - mean) / torch.sqrt(var + self.eps)
        out = self.gamma * out + self.beta
        return out

这个类是实现层归一化（Layer Normalization）的。层归一化是一种在深度学习中常用的归一化技术，它可以使神经网络更容易训练。

1. `__init__`方法：这是类的初始化方法，用于设置类的一些基本参数。

   - `d_model`：模型的维度，也就是输入数据的特征维度。
   - `eps`：一个很小的数，用于防止除以0的情况。

   在这个方法中，创建了两个参数：

   - `self.gamma`：缩放因子，是一个可学习的参数，初始值为1。
   - `self.beta`：偏移因子，是一个可学习的参数，初始值为0。

2. `forward`方法：这是类的前向传播方法，定义了层归一化的计算过程。

   - `mean`：计算输入`x`在最后一个维度上的均值。
   - `var`：计算输入`x`在最后一个维度上的方差。
   - `out`：对输入`x`进行归一化，即减去均值，然后除以标准差（即方差的平方根）。
   - `out`：对归一化后的结果进行缩放和偏移，即乘以`self.gamma`，然后加上`self.beta`。

   这个方法的返回值是经过层归一化后的结果。

总的来说，这个类的作用是对输入数据进行层归一化，即在每个样本内部，对所有特征进行归一化。这可以使得不同的特征有相同的尺度，从而使神经网络更容易训练。

# FFN
![layer](image/positionwise_feed_forward.jpg)

In [25]:
class PositionwiseFeedForward(nn.Module):
    def __init__(self, d_model, hidden, dropout=0.1):
        super(PositionwiseFeedForward, self).__init__()
        self.fc1 = nn.Linear(d_model, hidden)
        self.fc2 = nn.Linear(hidden, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x

# 多头注意力机制

In [26]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, n_head):
        super(MultiHeadAttention, self).__init__()

        self.n_head = n_head
        self.d_model = d_model
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_combine = nn.Linear(d_model, d_model)
        self.softmax = nn.Softmax(dim=2)

    def forward(self, q, k, v, mask=None):
        batch, time, dimension = q.shape
        n_d = self.d_model // self.n_head
        q, k, v = self.w_q(q), self.w_k(k), self.w_v(v)

        q = q.view(batch, time, self.n_head, n_d).permute(0, 2, 1, 3)
        k = k.view(batch, time, self.n_head, n_d).permute(0, 2, 1, 3)
        v = v.view(batch, time, self.n_head, n_d).permute(0, 2, 1, 3)

        score = q @ k.transpose(2, 3) / math.sqrt(n_d)
        if mask is not None:
            # mask = torch.tril(torch.ones(time, time, dtype=bool))
            # score = score.masked_fill(mask == 0, -10000)
            score = score.masked_fill(mask == 0, float("-inf"))
        score = self.softmax(score) @ v

        score = score.permute(0, 2, 1, 3).contiguous().view(batch, time, dimension)

        output = self.w_combine(score)
        return output

# Encoder Layer
![enc-dec](image/enc_dec.jpg)

In [27]:
class EncoderLayer(nn.Module):
    def __init__(self, d_model, ffn_hidden, n_head, drop_prob) -> None:  #  
        super(EncoderLayer, self).__init__()
        self.attention = MultiHeadAttention(d_model, n_head)
        self.norm1 = LayerNorm(d_model)
        self.drop1 = nn.Dropout(drop_prob)

        self.ffn = PositionwiseFeedForward(d_model, ffn_hidden, drop_prob)
        self.norm2 = LayerNorm(d_model)
        self.drop2 = nn.Dropout(drop_prob)

    def forward(self, x, mask=None):
        _x = x
        x = self.attention(x, x, x, mask)

        x = self.drop1(x)
        x = self.norm1(x + _x)

        _x = x
        x = self.ffn(x)

        x = self.drop2(x)
        x = self.norm2(x + _x)
        return x

In [28]:
class DecoderLayer(nn.Module):
    def __init__(self, d_model, ffn_hidden, n_head, drop_prob):
        super(DecoderLayer, self).__init__()
        self.attention1 = MultiHeadAttention(d_model, n_head) # 自注意力
        self.norm1 = LayerNorm(d_model)
        self.dropout1 = nn.Dropout(drop_prob)

        self.cross_attention = MultiHeadAttention(d_model, n_head)
        self.norm2 = LayerNorm(d_model)
        self.dropout2 = nn.Dropout(drop_prob)

        self.ffn = PositionwiseFeedForward(d_model, ffn_hidden, drop_prob)
        self.norm3 = LayerNorm(d_model)
        self.dropout3 = nn.Dropout(drop_prob)

    def forward(self, dec, enc, t_mask, s_mask):
        _x = dec
        x = self.attention1(dec, dec, dec, t_mask)  # 下三角掩码

        x = self.dropout1(x)
        x = self.norm1(x + _x)

        if enc is not None:
            _x = x
            x = self.cross_attention(x, enc, enc, s_mask)

            x = self.dropout2(x)
            x = self.norm2(x + _x)

        _x = x
        x = self.ffn(x)

        x = self.dropout3(x)
        x = self.norm3(x + _x)
        return x

在神经网络中，Dropout和归一化（Normalization）是两种常用的技术，它们各自有不同的作用。

1. Dropout：Dropout是一种正则化技术，它的作用是防止过拟合。在训练过程中，Dropout会随机地将一部分神经元的输出置为0，这可以防止模型过度依赖某些特定的神经元，增加模型的泛化能力。

2. 归一化：归一化的目的是调整输入的数值范围，使其在一个相对统一的尺度上。这可以帮助模型更快地收敛，也可以提高模型的性能。

在这段代码中，先进行Dropout，然后进行归一化。这样做的好处是，Dropout会引入一些随机性，增加模型的鲁棒性；而归一化则可以确保这些随机性不会影响模型的学习过程，因为归一化会将输入的数值范围调整到一个相对统一的尺度上。

总的来说，先进行Dropout再进行归一化，可以同时享受到Dropout的正则化效果和归一化的稳定性。

In [29]:
class Encoder(nn.Module):
    def __init__(
        self,
        env_voc_size,
        max_len,
        d_model,
        ffn_hidden,
        n_head,
        n_layer,
        drop_prob,
        device,
    ):
        super(Encoder, self).__init__()

        self.embedding = TransformerEmbedding(
            env_voc_size, d_model, max_len, drop_prob, device
        )

        self.layers = nn.ModuleList(
            [
                EncoderLayer(d_model, ffn_hidden, n_head, drop_prob)
                for _ in range(n_layer)
            ]
        )

    def forward(self, x, s_mask):
        x = self.embedding(x)
        for layer in self.layers:
            x = layer(x, s_mask)
        return x

In [30]:
class Decoder(nn.Module):
    def __init__(
        self,
        dec_voc_size,
        max_len,
        d_model,
        ffn_hidden,
        n_head,
        n_layer,
        drop_prob,
        device,
    ):
        super(Decoder, self).__init__()

        self.embedding = TransformerEmbedding(
            dec_voc_size, d_model, max_len, drop_prob, device
        )

        self.layers = nn.ModuleList(
            [
                DecoderLayer(d_model, ffn_hidden, n_head, drop_prob)
                for _ in range(n_layer)
            ]
        )

        self.fc = nn.Linear(d_model, dec_voc_size) # 全连接层 输出维度为词表大小

    def forward(self, dec, enc, t_mask, s_mask):
        dec = self.embedding(dec)
        for layer in self.layers:
            dec = layer(dec, enc, t_mask, s_mask)

        dec = self.fc(dec)

        return dec

# 龙珠集齐，构建Transformer?!  (mask才是真细节)
![total](image/jiegou.png)

In [31]:

class Transformer(nn.Module):
    def __init__(
        self,
        src_pad_idx,
        trg_pad_idx,
        enc_voc_size,
        dec_voc_size,
        max_len,
        d_model,
        n_heads,
        ffn_hidden,
        n_layers,
        drop_prob,
        device,
    ):
        super(Transformer, self).__init__()
        self.encoder = Encoder(
            enc_voc_size,
            max_len,
            d_model,
            ffn_hidden,
            n_heads,
            n_layers,
            drop_prob,
            device,
        )
        self.decoder = Decoder(
            dec_voc_size,
            max_len,
            d_model,
            ffn_hidden,
            n_heads,
            n_layers,
            drop_prob,
            device,
        )

        self.src_pad_idx = src_pad_idx
        self.trg_pad_idx = trg_pad_idx
        self.device = device

    def make_pad_mask(self, q, k, pad_idx_q, pad_idx_k):
        len_q, len_k = q.size(1), k.size(1)

        # (Batch, Time, len_q, len_k)
        q = q.ne(pad_idx_q).unsqueeze(1).unsqueeze(3)
        q = q.repeat(1, 1, 1, len_k)

        k = k.ne(pad_idx_k).unsqueeze(1).unsqueeze(2)
        k = k.repeat(1, 1, len_q, 1)

        mask = q & k  # 逻辑与
        return mask

    def make_casual_mask(self, q, k):
        len_q, len_k = q.size(1), k.size(1)
        mask = (
            torch.tril(torch.ones(len_q, len_k)).type(torch.BoolTensor).to(self.device)
        )
        return mask

    def forward(self, src, trg):
        src_mask = self.make_pad_mask(src, src, self.src_pad_idx, self.src_pad_idx)
        trg_mask = self.make_pad_mask(
            trg, trg, self.trg_pad_idx, self.trg_pad_idx
        ) * self.make_casual_mask(trg, trg)
        src_trg_mask = self.make_pad_mask(trg, src, self.trg_pad_idx, self.src_pad_idx)

        enc = self.encoder(src, src_mask)
        ouput = self.decoder(trg, enc, trg_mask, src_trg_mask)
        return ouput

In [33]:
model = Transformer(src_pad_idx=src_pad_idx,
                    trg_pad_idx=trg_pad_idx,
                    d_model=d_model,
                    enc_voc_size=enc_voc_size,
                    dec_voc_size=dec_voc_size,
                    max_len=max_len,
                    ffn_hidden=ffn_hidden,
                    n_heads=n_heads,
                    n_layers=n_layers,
                    drop_prob=drop_prob,
                    device=device).to(device)

def initialize_weights(m): # 初始化权重
    if hasattr(m, 'weight') and m.weight.dim() > 1:
        nn.init.kaiming_uniform_(m.weight.data)
        
model.apply(initialize_weights) # 初始化权重
src = torch.load('tensor_src.pt') # 读取数据
src = torch.cat((src, torch.ones(src.shape[0], 2, dtype=torch.int)), dim=-1) # 添加结束符
trg = torch.load('tensor_trg.pt') # 读取数据
result = model(src, trg) 
print(result, result.shape)

tensor([[[ 0.2096,  4.0805,  1.3268,  ...,  4.5154, -3.0169,  1.1317],
         [-0.1783,  2.5734,  0.7091,  ...,  3.3577, -1.3724,  1.8047],
         [-1.2102,  3.4396,  1.4156,  ...,  3.0125, -2.1139,  1.4954],
         ...,
         [ 1.1453,  2.2822,  0.3995,  ...,  2.7893, -1.3602,  1.1842],
         [ 1.0298,  3.7359,  1.4463,  ...,  2.0101, -2.0346,  1.5018],
         [ 1.3843,  4.2372,  1.4020,  ...,  2.7285, -1.6696,  1.4603]],

        [[ 0.6930,  4.1842,  2.6438,  ...,  2.1498, -1.7789,  2.1723],
         [-0.3690,  3.2009,  1.4606,  ...,  2.4403, -1.0351,  0.9493],
         [-0.3905,  3.6216,  3.1230,  ...,  2.1094, -0.3664, -0.5516],
         ...,
         [ 0.9126,  2.5781,  0.9418,  ...,  2.7622, -2.0733,  0.3550],
         [-0.3082,  3.8304,  1.2920,  ...,  3.0891, -2.0876, -0.1144],
         [ 1.4525,  2.6919,  1.1688,  ...,  2.2144, -2.5237,  2.2584]],

        [[-0.2040,  2.8150,  0.7135,  ...,  3.0976, -2.2169,  1.4494],
         [-0.0475,  2.5148,  0.8170,  ...,  2