In [1]:
import torch
# Prefer the GPU whenever CUDA is usable; otherwise run everything on the CPU.
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [2]:
# Embedding 词嵌入
import math
import torch.nn as nn

class Embeddings(nn.Module):
    """Token-embedding lookup whose output is scaled by ``sqrt(d_model)``.

    Args:
        d_model: Size of each embedding vector.
        vocab_size: Number of rows in the embedding table.
    """

    def __init__(self, d_model, vocab_size):
        super().__init__()
        # Kept so forward() can compute the scaling factor.
        self.d_model = d_model
        self.embedding = nn.Embedding(vocab_size, d_model)

    def forward(self, x):
        """Return the embeddings of the indices in ``x`` times ``sqrt(d_model)``."""
        # The scaling is meant to keep the embeddings on a magnitude comparable
        # to the positional encodings / residual stream that follow.
        scale = math.sqrt(self.d_model)
        looked_up = self.embedding(x)
        return looked_up * scale


In [3]:
# 位置编码

class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to the input, then apply dropout.

    The encoding table is computed once in the constructor and stored as the
    buffer ``pe`` with shape ``(1, max_len, d_model)``. Even feature columns
    hold ``sin`` values and odd feature columns hold ``cos`` values.

    Args:
        d_model: Width of the feature dimension. Both even and odd values are
            supported.
        dropout: Dropout probability applied after the encodings are added.
        max_len: Number of positions pre-computed; inputs longer than this are
            not supported.
    """

    def __init__(self, d_model, dropout, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        # Table of shape (max_len, d_model), filled column-wise below.
        pe = torch.zeros(max_len, d_model, device=DEVICE)
        # Column vector of position indices, shape (max_len, 1).
        position = torch.arange(0, max_len, dtype=torch.float, device=DEVICE).unsqueeze(1)
        # One frequency per even column index: 10000^(-2i / d_model).
        div_term = torch.exp(
            torch.arange(0, d_model, 2, dtype=torch.float, device=DEVICE)
            * -(math.log(10000.0) / d_model)
        )
        angles = position * div_term

        pe[:, 0::2] = torch.sin(angles)
        # When d_model is odd there is one fewer odd column than even columns,
        # so trim the angle table to the number of odd columns. For even
        # d_model the slice is a no-op.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Leading axis of size 1 so the table broadcasts over the batch.
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``dropout(x + pe[:, :seq_len])``.

        ``x`` is indexed as ``(batch, seq_len, d_model)``: its second axis
        selects how many positions of the table are used.
        """
        # Slice the table to the input's sequence length: (1, seq_len, d_model).
        pos_encoding = self.pe[:, :x.size(1), :]

        # Broadcast add over the batch axis.
        x = x + pos_encoding

        return self.dropout(x)


In [4]:
# 注意力机制
from torch.nn import functional as F

def attention(query, key, value, mask=None, dropout=None):
    """Scaled dot-product attention.

    Computes ``softmax(query @ key^T / sqrt(d_k)) @ value`` where ``d_k`` is
    the size of the last axis of ``query``.

    Args:
        query, key, value: Tensors whose last two axes are
            ``(seq_len, d_k)``; the notebook calls this with
            ``(batch, n_heads, seq_len, d_k)``.
        mask: Optional tensor broadcastable to the score matrix. Positions
            where ``mask == 0`` are excluded from attention.
        dropout: Optional callable (e.g. ``nn.Dropout``) applied to the
            attention weights.

    Returns:
        A tuple ``(output, weights)``: the attended values and the attention
        weights (after dropout, when a dropout callable is given).
    """
    d_k = query.size(-1)

    # Raw similarity scores: (..., seq_len_q, seq_len_k).
    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)

    if mask is not None:
        # Use the most negative finite value of the score dtype. A literal
        # like -1e9 is not representable in float16 and makes masked_fill
        # raise when the model runs in half precision.
        fill_value = torch.finfo(scores.dtype).min
        scores = scores.masked_fill(mask == 0, fill_value)

    weights = F.softmax(scores, dim=-1)

    if dropout is not None:
        weights = dropout(weights)

    # Attention output first, attention weights second.
    return torch.matmul(weights, value), weights

In [5]:
# 多头注意力

class MultiHeadAttention(nn.Module):
    """Multi-head attention built from four ``d_model -> d_model`` linear layers.

    The first three linear layers project the query, key and value inputs;
    the fourth projects the concatenated heads back to ``d_model``.

    Args:
        h: Number of attention heads; must divide ``d_model``.
        d_model: Width of the model (input and output feature size).
        dropout: Dropout probability applied to the attention weights.

    Attributes:
        attn: Attention weights from the most recent ``forward`` call
            (``None`` until the first call).
    """

    def __init__(self, h, d_model, dropout=0.1):
        super(MultiHeadAttention, self).__init__()
        assert d_model % h == 0

        # forward() needs the full model width to merge the heads back together.
        self.d_model = d_model
        # Per-head feature size.
        self.d_k = d_model // h
        self.h = h

        # Four deep copies of one prototype layer (Q, K, V and output
        # projections). Built inline so this cell does not depend on a
        # helper defined in some other cell.
        import copy
        prototype = nn.Linear(d_model, d_model)
        self.linears = nn.ModuleList([copy.deepcopy(prototype) for _ in range(4)])

        self.attn = None
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, query, key, value, mask=None):
        """Apply multi-head attention.

        ``query``, ``key`` and ``value`` are indexed as
        ``(batch, seq_len, d_model)``. ``mask`` (optional) gets a head axis
        inserted at position 1 so one mask is shared by every head.

        Returns a tensor reshaped to ``(batch, -1, d_model)`` and passed
        through the output projection.
        """
        if mask is not None:
            mask = mask.unsqueeze(1)

        nbatches = query.size(0)

        # Project the inputs and split the feature axis into heads:
        # (batch, seq_len, d_model) -> (batch, h, seq_len, d_k).
        # zip() stops after three pairs, leaving the last linear layer for
        # the output projection.
        query, key, value = [
            layer(x).view(nbatches, -1, self.h, self.d_k).transpose(1, 2)
            for layer, x in zip(self.linears, (query, key, value))
        ]

        # Attend per head and keep the weights for later inspection.
        x, self.attn = attention(query, key, value, mask=mask, dropout=self.dropout)

        # Merge the heads: (batch, h, seq_len, d_k) -> (batch, seq_len, d_model).
        x = x.transpose(1, 2).contiguous().view(nbatches, -1, self.d_model)

        return self.linears[-1](x)
            

In [6]:
# 层归一化
class LayerNorm(nn.Module):
    """Normalise the last axis, then apply a learnable gain and bias.

    The statistic used is ``Tensor.std`` with its default settings, and
    ``eps`` is added to the standard deviation (not to the variance).

    Args:
        features: Size of the last axis; also the length of the gain
            (``a_2``, initialised to ones) and bias (``b_2``, initialised to
            zeros) vectors.
        eps: Small constant added to the standard deviation before dividing.
    """

    def __init__(self, features, eps=1e-6):
        super().__init__()
        self.eps = eps
        self.a_2 = nn.Parameter(torch.ones(features))
        self.b_2 = nn.Parameter(torch.zeros(features))

    def forward(self, x):
        """Return ``a_2 * (x - mean) / (std + eps) + b_2`` along the last axis."""
        mu = x.mean(-1, keepdim=True)
        sigma = x.std(-1, keepdim=True)
        gained = self.a_2 * (x - mu)
        return gained / (sigma + self.eps) + self.b_2

# 前馈神经网络
class PositionwiseFeedForward(nn.Module):
    """Two-layer feed-forward block: linear -> ReLU -> dropout -> linear.

    Args:
        d_model: Input and output feature size.
        d_ff: Hidden feature size between the two linear layers.
        dropout: Dropout probability applied after the ReLU.
    """

    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        self.w_1 = nn.Linear(d_model, d_ff)
        self.w_2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x):
        """Expand to ``d_ff``, apply ReLU and dropout, project back to ``d_model``."""
        hidden = self.w_1(x)
        activated = F.relu(hidden)
        regularised = self.dropout(activated)
        return self.w_2(regularised)

In [7]:
# 子层连接
class SublayerConnection(nn.Module):
    """Residual wrapper: ``x + dropout(sublayer(norm(x)))``.

    The normalisation is applied to the sublayer's input, before the
    sublayer runs.

    Args:
        size: Feature size handed to ``LayerNorm``.
        dropout: Dropout probability applied to the sublayer output.
    """

    def __init__(self, size, dropout):
        super().__init__()
        self.norm = LayerNorm(size)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x, sublayer):
        """Run ``sublayer`` on the normalised input and add the result to ``x``.

        ``sublayer`` is any callable taking one tensor and returning a
        tensor that can be added to ``x``.
        """
        normalised = self.norm(x)
        transformed = sublayer(normalised)
        return x + self.dropout(transformed)

In [8]:
# 编码器层
class EncoderLayer(nn.Module):
    """One encoder layer: self-attention followed by a feed-forward block.

    Each of the two steps is wrapped in its own ``SublayerConnection``.

    Args:
        size: Feature size of the layer (passed to the sublayer wrappers and
            exposed as ``self.size``).
        self_attn: Callable invoked as ``self_attn(query, key, value, mask)``.
        feed_forward: Callable invoked with a single tensor.
        dropout: Dropout probability used by the sublayer wrappers.
    """

    def __init__(self, size, self_attn, feed_forward, dropout):
        super(EncoderLayer, self).__init__()
        self.self_attn = self_attn
        self.feed_forward = feed_forward
        # Two independent residual wrappers, one per sublayer. Built inline so
        # this cell does not depend on a helper defined in some other cell.
        self.sublayer = nn.ModuleList(
            [SublayerConnection(size, dropout) for _ in range(2)]
        )
        # Feature size of this layer.
        self.size = size

    def forward(self, src, src_mask):
        """Apply self-attention and then the feed-forward block to ``src``.

        ``src_mask`` is forwarded unchanged to ``self_attn``.
        """
        # Self-attention: the normalised input serves as query, key and value.
        src = self.sublayer[0](src, lambda x: self.self_attn(x, x, x, src_mask))
        return self.sublayer[1](src, self.feed_forward)

# 编码器
class Encoder(nn.Module):
    """Stack of ``N`` copies of an encoder layer followed by a final ``LayerNorm``.

    Args:
        layer: Prototype layer. It is deep-copied ``N`` times, so the copies
            do not share parameters. It must expose a ``size`` attribute.
        N: Number of layers in the stack.
    """

    def __init__(self, layer, N):
        super(Encoder, self).__init__()
        # Built inline so this cell does not depend on a helper defined in
        # some other cell.
        import copy
        self.layers = nn.ModuleList([copy.deepcopy(layer) for _ in range(N)])
        self.norm = LayerNorm(layer.size)

    def forward(self, x, mask):
        """Pass ``x`` (with ``mask``) through every layer, then normalise."""
        for layer in self.layers:
            x = layer(x, mask)
        return self.norm(x)

In [9]:
# 解码器层
class DecoderLayer(nn.Module):
    """One decoder layer: self-attention, attention over memory, feed-forward.

    Each of the three steps is wrapped in its own ``SublayerConnection``.

    Args:
        size: Feature size of the layer (passed to the sublayer wrappers and
            exposed as ``self.size``).
        self_attn: Callable invoked as ``self_attn(x, x, x, tgt_mask)``.
        src_attn: Callable invoked as ``src_attn(x, memory, memory, src_mask)``,
            i.e. attention from the decoder state over ``memory``.
        feed_forward: Callable invoked with a single tensor.
        dropout: Dropout probability used by the sublayer wrappers.
    """

    def __init__(self, size, self_attn, src_attn, feed_forward, dropout):
        super(DecoderLayer, self).__init__()
        self.size = size
        # Attention of the decoder input over itself.
        self.self_attn = self_attn
        # Attention of the decoder state over ``memory``.
        self.src_attn = src_attn
        self.feed_forward = feed_forward
        # Three independent residual wrappers, one per sublayer. Built inline
        # so this cell does not depend on a helper defined in some other cell.
        self.sublayer = nn.ModuleList(
            [SublayerConnection(size, dropout) for _ in range(3)]
        )

    def forward(self, x, memory, src_mask, tgt_mask):
        """Run the three sublayers in order and return the result.

        ``tgt_mask`` is passed to ``self_attn`` and ``src_mask`` to
        ``src_attn``; ``memory`` supplies the keys and values of ``src_attn``.
        """
        x = self.sublayer[0](x, lambda t: self.self_attn(t, t, t, tgt_mask))
        x = self.sublayer[1](x, lambda t: self.src_attn(t, memory, memory, src_mask))
        return self.sublayer[2](x, self.feed_forward)
    
# 解码器
class Decoder(nn.Module):
    """Stack of ``N`` copies of a decoder layer followed by a final ``LayerNorm``.

    Args:
        layer: Prototype layer. It is deep-copied ``N`` times, so the copies
            do not share parameters. It must expose a ``size`` attribute.
        N: Number of layers in the stack.
    """

    def __init__(self, layer, N):
        super(Decoder, self).__init__()
        # Built inline so this cell does not depend on a helper defined in
        # some other cell.
        import copy
        self.layers = nn.ModuleList([copy.deepcopy(layer) for _ in range(N)])
        self.norm = LayerNorm(layer.size)

    def forward(self, x, memory, src_mask, tgt_mask):
        """Pass ``x`` through every layer with the same ``memory`` and masks, then normalise."""
        for layer in self.layers:
            x = layer(x, memory, src_mask, tgt_mask)
        return self.norm(x)

In [10]:
# 生成器
class Generator(nn.Module):
    """Linear projection to vocabulary size followed by log-softmax.

    Args:
        d_model: Feature size of the incoming tensor's last axis.
        vocab_size: Number of output classes (size of the projected last axis).
    """

    def __init__(self, d_model, vocab_size):
        super().__init__()
        self.proj = nn.Linear(d_model, vocab_size)

    def forward(self, x):
        """Return log-probabilities over the last axis of the projected input."""
        logits = self.proj(x)
        return F.log_softmax(logits, dim=-1)

In [11]:
# transformer
class Transformer(nn.Module):
    """Encoder-decoder container.

    Args:
        encoder: Callable invoked as ``encoder(embedded_src, src_mask)``.
        decoder: Callable invoked as
            ``decoder(embedded_tgt, memory, src_mask, tgt_mask)``.
        src_embed: Callable that embeds the source input.
        tgt_embed: Callable that embeds the target input.
        generator: Output head. It is stored on the model but not applied by
            ``forward``; callers apply ``model.generator`` to the decoder
            output themselves.
    """

    def __init__(self, encoder, decoder, src_embed, tgt_embed, generator):
        super(Transformer, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.src_embed = src_embed
        self.tgt_embed = tgt_embed
        self.generator = generator

    def forward(self, src, tgt, src_mask, tgt_mask):
        """Encode ``src`` and decode ``tgt`` against the resulting memory.

        Defined so the model can be called directly (``model(src, tgt,
        src_mask, tgt_mask)``); without it ``nn.Module.__call__`` has no
        ``forward`` to dispatch to.

        Returns the decoder output (the generator is not applied).
        """
        memory = self.encode(src, src_mask)
        return self.decode(memory, src_mask, tgt, tgt_mask)

    def encode(self, src, src_mask):
        """Embed ``src`` and run the encoder on it with ``src_mask``."""
        return self.encoder(self.src_embed(src), src_mask)

    def decode(self, memory, src_mask, tgt, tgt_mask):
        """Embed ``tgt`` and run the decoder on it against ``memory``."""
        return self.decoder(self.tgt_embed(tgt), memory, src_mask, tgt_mask)
    
    

In [12]:
import copy

def make_model(src_vocab, tgt_vocab, N=6, d_model=512, d_ff=2048, h=8, dropout=0.1):
    """Assemble a full encoder-decoder ``Transformer`` and move it to ``DEVICE``.

    Args:
        src_vocab: Source vocabulary size (rows of the source embedding).
        tgt_vocab: Target vocabulary size (rows of the target embedding and
            output size of the generator).
        N: Number of layers in both the encoder and the decoder stack.
        d_model: Model width.
        d_ff: Hidden width of the feed-forward blocks.
        h: Number of attention heads.
        dropout: Dropout probability shared by all sub-modules.

    Returns:
        The model, with every parameter of more than one dimension
        re-initialised with Xavier-uniform, placed on ``DEVICE``.
    """
    clone = copy.deepcopy

    # Prototype sub-modules; each place that uses one receives its own deep copy.
    attn_proto = MultiHeadAttention(h, d_model, dropout)
    ffn_proto = PositionwiseFeedForward(d_model, d_ff, dropout)
    pos_proto = PositionalEncoding(d_model, dropout)

    encoder = Encoder(
        EncoderLayer(d_model, clone(attn_proto), clone(ffn_proto), dropout), N
    )
    decoder = Decoder(
        DecoderLayer(d_model, clone(attn_proto), clone(attn_proto), clone(ffn_proto), dropout),
        N,
    )
    src_embed = nn.Sequential(Embeddings(d_model, src_vocab), clone(pos_proto))
    tgt_embed = nn.Sequential(Embeddings(d_model, tgt_vocab), clone(pos_proto))
    generator = Generator(d_model, tgt_vocab)

    model = Transformer(encoder, decoder, src_embed, tgt_embed, generator)

    # Xavier-uniform initialisation for weight matrices; vectors (biases,
    # norm gains) keep the values their constructors gave them.
    for param in model.parameters():
        if param.dim() <= 1:
            continue
        nn.init.xavier_uniform_(param)

    return model.to(DEVICE)
