In [1]:
import math
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

# 位置编码

In [2]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to a batch of sequences.

    A table of shape ``(1, max_len, d_model)`` is computed once in
    ``__init__`` and stored as the non-trainable buffer ``pe``. Even feature
    columns hold ``sin(position * div_term)`` and odd columns hold
    ``cos(position * div_term)``.

    Args:
        d_model: Size of the last (feature) dimension of the inputs.
        max_len: Number of positions to precompute. ``forward`` slices the
            table to ``x.size(1)``, so inputs must not be longer than this.
    """

    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()

        # Precompute the encoding table.
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
        )
        pe[:, 0::2] = torch.sin(position * div_term)
        # With an odd d_model there is one fewer odd column than even column,
        # so the frequency vector is trimmed to match.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])

        # Leading axis of size 1 lets the table broadcast over the batch.
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Return ``x`` plus the encoding for its first ``x.size(1)`` positions.

        Args:
            x: Tensor indexed as ``(batch, seq_len, d_model)``.

        Returns:
            Tensor of the same shape as ``x``.
        """
        # The input is added exactly once (the earlier ``x + x + pe`` doubled it).
        return x + self.pe[:, :x.size(1)]

# Example usage
d_model = 512
max_len = 100
num_heads = 8

# Build the positional-encoding module
pos_encoder = PositionalEncoding(d_model, max_len)

# Random stand-in for a batch of embedded sequences: (5, max_len, d_model)
input_sequence = torch.randn(5, max_len, d_model)

# Apply the positional encoding; input_sequence is rebound to the result
input_sequence = pos_encoder(input_sequence)
print("输入序列的位置编码:")
print(input_sequence.shape)

输入序列的位置编码:
torch.Size([5, 100, 512])


# 多头注意力

In [3]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention.

    Queries, keys and values are each passed through their own linear layer,
    split into ``num_heads`` heads of width ``d_model // num_heads``, attended
    per head, merged back to ``d_model`` features and passed through a final
    linear layer.

    Args:
        d_model: Feature size of the inputs and of the output.
        num_heads: Number of attention heads; must divide ``d_model``.

    Raises:
        AssertionError: If ``d_model`` is not divisible by ``num_heads``.
    """

    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        self.depth = d_model // num_heads

        # Linear projections for queries, keys and values
        self.query_linear = nn.Linear(d_model, d_model)
        self.key_linear = nn.Linear(d_model, d_model)
        self.value_linear = nn.Linear(d_model, d_model)

        # Output projection applied after the heads are merged
        self.output_linear = nn.Linear(d_model, d_model)

    def split_heads(self, x):
        """Reshape ``(batch, seq, d_model)`` to ``(batch, num_heads, seq, depth)``."""
        batch_size, seq_length, d_model = x.size()
        return x.view(batch_size, seq_length, self.num_heads, self.depth).transpose(1, 2)

    def forward(self, query, key, value, mask=None):
        """Attend ``query`` over ``key``/``value``.

        Args:
            query: Tensor ``(batch, query_len, d_model)``.
            key: Tensor ``(batch, key_len, d_model)``.
            value: Tensor ``(batch, key_len, d_model)``.
            mask: Optional tensor broadcastable to
                ``(batch, num_heads, query_len, key_len)``. Entries equal to 0
                mark positions that must not be attended to.

        Returns:
            Tensor ``(batch, query_len, d_model)``.
        """
        # Project, then split into heads
        query = self.split_heads(self.query_linear(query))
        key = self.split_heads(self.key_linear(key))
        value = self.split_heads(self.value_linear(value))

        # Scaled dot-product scores: (batch, num_heads, query_len, key_len)
        scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(self.depth)

        # Replace masked logits with a large negative value. This must be an
        # assignment: adding the masked tensor back onto ``scores`` would
        # double every unmasked logit.
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)

        # Normalize over the key axis and take the weighted sum of values
        attention_weights = torch.softmax(scores, dim=-1)
        attention_output = torch.matmul(attention_weights, value)

        # Merge heads back into a single d_model-wide feature axis
        batch_size, _, seq_length, _ = attention_output.size()
        attention_output = attention_output.transpose(1, 2).contiguous().view(
            batch_size, seq_length, self.d_model)

        # Final linear projection
        return self.output_linear(attention_output)

# Example usage
d_model = 512
max_len = 100
num_heads = 8
d_ff = 2048

# Build the multi-head attention module
multihead_attn = MultiHeadAttention(d_model, num_heads)

# Random example input: (5, max_len, d_model)
input_sequence = torch.randn(5, max_len, d_model)

# Self-attention: the same tensor is used as query, key and value (no mask)
attention_output= multihead_attn(input_sequence, input_sequence, input_sequence)
print("attention_output shape:", attention_output.shape)

attention_output shape: torch.Size([5, 100, 512])


# 前馈网络

In [4]:
class FeedForward(nn.Module):
    """Two-layer feed-forward network applied to the last dimension.

    Computes ``linear2(relu(linear1(x)))``: the features are expanded from
    ``d_model`` to ``d_ff``, passed through ReLU, and projected back to
    ``d_model``.

    Args:
        d_model: Input and output feature size.
        d_ff: Hidden feature size between the two linear layers.
    """

    def __init__(self, d_model, d_ff):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return the transformed tensor; only the last dimension is mixed."""
        hidden = self.linear1(x)       # expand to d_ff
        activated = self.relu(hidden)  # non-linearity
        return self.linear2(activated) # project back to d_model

# Example usage
d_model = 512
max_len = 100
num_heads = 8
d_ff = 2048

# Multi-head attention module
multihead_attn = MultiHeadAttention(d_model, num_heads)

# Feed-forward network
ff_network = FeedForward(d_model, d_ff)

# Random example input: (5, max_len, d_model)
input_sequence = torch.randn(5, max_len, d_model)

# Run self-attention first (query = key = value, no mask)
attention_output= multihead_attn(input_sequence, input_sequence, input_sequence)

# Feed the attention output through the feed-forward network
output_ff = ff_network(attention_output)
print('input_sequence',input_sequence.shape)
print("output_ff", output_ff.shape)

input_sequence torch.Size([5, 100, 512])
output_ff torch.Size([5, 100, 512])


# 编码器

In [5]:
class EncoderLayer(nn.Module):
    """One encoder block: self-attention then feed-forward.

    Each sub-layer's output goes through dropout, is added back to the
    sub-layer's input, and the sum is layer-normalized.

    Args:
        d_model: Feature size of the inputs and outputs.
        num_heads: Number of heads passed to the attention module.
        d_ff: Hidden size passed to the feed-forward module.
        dropout: Dropout probability applied to each sub-layer output.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout):
        super().__init__()
        self.self_attention = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        """Run both sub-layers on ``x``; ``mask`` is forwarded to the attention."""
        # Self-attention sub-layer: query, key and value are all x
        attended = self.dropout(self.self_attention(x, x, x, mask))
        x = self.norm1(x + attended)

        # Feed-forward sub-layer
        transformed = self.dropout(self.feed_forward(x))
        return self.norm2(x + transformed)

d_model = 512
max_len = 100
num_heads = 8
d_ff = 2048


# Build one encoder layer (dropout probability 0.1)
encoder_layer = EncoderLayer(d_model, num_heads, d_ff, 0.1)

# Random example input: (1, max_len, d_model)
input_sequence = torch.randn(1, max_len, d_model)

# Run the encoder layer without a mask; input_sequence and encoder_output
# are reused by the decoder-layer example further down
encoder_output= encoder_layer(input_sequence, None)
print("encoder output shape:", encoder_output.shape)

encoder output shape: torch.Size([1, 100, 512])


# 解码器

In [6]:
class DecoderLayer(nn.Module):
    """One decoder block: masked self-attention, encoder-decoder attention,
    then feed-forward.

    Each sub-layer's output goes through dropout, is added back to the
    sub-layer's input, and the sum is layer-normalized.

    Args:
        d_model: Feature size of the inputs and outputs.
        num_heads: Number of heads passed to both attention modules.
        d_ff: Hidden size passed to the feed-forward module.
        dropout: Dropout probability applied to each sub-layer output.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout):
        super().__init__()
        self.masked_self_attention = MultiHeadAttention(d_model, num_heads)
        self.enc_dec_attention = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, encoder_output, src_mask, tgt_mask):
        """Run the three sub-layers.

        Args:
            x: Decoder-side input tensor.
            encoder_output: Tensor used as key and value of the
                encoder-decoder attention.
            src_mask: Mask forwarded to the encoder-decoder attention.
            tgt_mask: Mask forwarded to the self-attention.
        """
        # Masked self-attention over the decoder input
        self_attended = self.masked_self_attention(x, x, x, tgt_mask)
        x = self.norm1(x + self.dropout(self_attended))

        # Encoder-decoder attention: queries from x, keys/values from the encoder
        cross_attended = self.enc_dec_attention(x, encoder_output, encoder_output, src_mask)
        x = self.norm2(x + self.dropout(cross_attended))

        # Feed-forward sub-layer
        ff_out = self.feed_forward(x)
        return self.norm3(x + self.dropout(ff_out))

# Parameters for the DecoderLayer example
d_model = 512  # model (feature) dimension
num_heads = 8  # number of attention heads
d_ff = 2048    # hidden size of the feed-forward network
dropout = 0.1  # dropout probability
batch_size = 1 # batch size
max_len = 100  # sequence length used in this example

# Build the DecoderLayer instance
decoder_layer = DecoderLayer(d_model, num_heads, d_ff, dropout)

# The attention keeps positions where the mask is non-zero and hides the rest.
# Random source mask with an explicit head axis, so it broadcasts against the
# (batch, heads, query_len, key_len) scores for any batch size.
src_mask = torch.rand(batch_size, 1, max_len, max_len) > 0.5
# Causal target mask: True on and below the diagonal, so each position can see
# itself and earlier positions only. (Comparing with `== 0` selected the
# future positions instead.)
tgt_mask = torch.tril(torch.ones(max_len, max_len)).unsqueeze(0) != 0

# Pass the tensors from the encoder example through the DecoderLayer
output = decoder_layer(input_sequence, encoder_output, src_mask, tgt_mask)

# Output shape
print("Output shape:", output.shape)

Output shape: torch.Size([1, 100, 512])


# Transformer模型

In [7]:
class Transformer(nn.Module):
    """Encoder-decoder Transformer over integer token ids.

    Token id 0 is treated as padding when the attention masks are built.

    Args:
        src_vocab_size: Number of rows in the encoder embedding table.
        tgt_vocab_size: Number of rows in the decoder embedding table and
            size of the output logits.
        d_model: Embedding / model feature size.
        num_heads: Number of attention heads in every layer.
        num_layers: Number of encoder layers and of decoder layers.
        d_ff: Hidden size of each feed-forward network.
        max_len: Number of positions covered by the positional encoding.
        dropout: Dropout probability used on embeddings and inside layers.
    """

    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff,
                 max_len, dropout):
        super(Transformer, self).__init__()

        # Token embeddings for the encoder and decoder sides
        self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
        self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)

        # Positional encoding shared by both sides
        self.positional_encoding = PositionalEncoding(d_model, max_len)

        # Stacks of encoder and decoder layers
        self.encoder_layers = nn.ModuleList(
            [EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.decoder_layers = nn.ModuleList(
            [DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])

        # Projection from model features to target-vocabulary logits
        self.linear = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)

    def generate_mask(self, src, tgt):
        """Build boolean attention masks from token ids (True = may attend).

        Args:
            src: Integer tensor ``(batch, src_len)``.
            tgt: Integer tensor ``(batch, tgt_len)``.

        Returns:
            ``(src_mask, tgt_mask)`` where ``src_mask`` has shape
            ``(batch, 1, 1, src_len)`` and is False at padding tokens, and
            ``tgt_mask`` has shape ``(batch, 1, tgt_len, tgt_len)`` and is
            False for padding query rows and for future positions.
        """
        src_mask = (src != 0).unsqueeze(1).unsqueeze(2)
        tgt_mask = (tgt != 0).unsqueeze(1).unsqueeze(3)
        seq_length = tgt.size(1)
        # Create the causal mask on the same device as the inputs so the `&`
        # below does not mix devices when the model runs off-CPU.
        nopeak_mask = (1 - torch.triu(
            torch.ones(1, seq_length, seq_length, device=tgt.device), diagonal=1)).bool()
        tgt_mask = tgt_mask & nopeak_mask
        return src_mask, tgt_mask

    def forward(self, src, tgt):
        """Return logits of shape ``(batch, tgt_len, tgt_vocab_size)``.

        Args:
            src: Integer source token ids ``(batch, src_len)``.
            tgt: Integer target token ids ``(batch, tgt_len)``.
        """
        src_mask, tgt_mask = self.generate_mask(src, tgt)

        # Encoder input: embedding + positional encoding, then dropout
        encoder_embedding = self.encoder_embedding(src)
        en_positional_encoding = self.positional_encoding(encoder_embedding)
        src_embedded = self.dropout(en_positional_encoding)

        # Decoder input: embedding + positional encoding, then dropout
        decoder_embedding = self.decoder_embedding(tgt)
        de_positional_encoding = self.positional_encoding(decoder_embedding)
        tgt_embedded = self.dropout(de_positional_encoding)

        # Encoder stack
        enc_output = src_embedded
        for enc_layer in self.encoder_layers:
            enc_output = enc_layer(enc_output, src_mask)

        # Decoder stack; every layer attends to the final encoder output
        dec_output = tgt_embedded
        for dec_layer in self.decoder_layers:
            dec_output = dec_layer(dec_output, enc_output, src_mask, tgt_mask)

        return self.linear(dec_output)

# Example usage
src_vocab_size = 5000
tgt_vocab_size = 5000
d_model = 512
num_heads = 8
num_layers = 6
d_ff = 2048
max_len = 100
dropout = 0.1

transformer = Transformer(src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, 
d_ff, max_len, dropout)

# Generate random example data; ids are drawn from [1, vocab_size), so id 0 never appears
src_data = torch.randint(1, src_vocab_size, (5, max_len))  # (batch_size, seq_length)
tgt_data = torch.randint(1, tgt_vocab_size, (5, max_len))  # (batch_size, seq_length)
# The decoder input drops the last target token
transformer(src_data, tgt_data[:, :-1]).shape

torch.Size([5, 99, 5000])

# Transformer 模型的训练和评估

In [8]:
# Cross-entropy loss that skips targets equal to 0, and an Adam optimizer
# over all model parameters.
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(
    transformer.parameters(),
    lr=0.0001,
    betas=(0.9, 0.98),
    eps=1e-9,
)

# Training loop: ten optimizer steps on the same batch.
transformer.train()

for epoch in range(10):
    optimizer.zero_grad()
    # Decoder input is the target without its last token; the labels are the
    # target without its first token (shifted by one position).
    output = transformer(src_data, tgt_data[:, :-1])
    loss = criterion(
        output.reshape(-1, tgt_vocab_size),
        tgt_data[:, 1:].reshape(-1),
    )
    loss.backward()
    optimizer.step()
    print(f"第 {epoch+1} 轮：损失= {loss.item():.4f}")


# Fresh random data for evaluation
src_data = torch.randint(1, src_vocab_size, (5, max_len))  # (batch_size, seq_length)
tgt_data = torch.randint(1, tgt_vocab_size, (5, max_len))  # (batch_size, seq_length)

# Evaluation pass: eval mode, no gradient tracking
transformer.eval()
with torch.no_grad():
    output = transformer(src_data, tgt_data[:, :-1])
    loss = criterion(
        output.reshape(-1, tgt_vocab_size),
        tgt_data[:, 1:].reshape(-1),
    )
    print(f"\n虚拟数据的评估损失= {loss.item():.4f}")

第 1 轮：损失= 8.6675
第 2 轮：损失= 8.1944
第 3 轮：损失= 7.9596
第 4 轮：损失= 7.7520
第 5 轮：损失= 7.5324
第 6 轮：损失= 7.2933
第 7 轮：损失= 7.0099
第 8 轮：损失= 6.7522
第 9 轮：损失= 6.5018
第 10 轮：损失= 6.2690

虚拟数据的评估损失= 8.6879
