In [None]:
import torch
from torch import nn, optim
from torch.nn import functional as F
import math
import mltools
from d2l import torch as d2l

In [None]:
class LSTMTMModel(nn.Module):
    """Token-level LSTM language model operating on one-hot encoded token indices.

    The model one-hot encodes integer token indices, runs them through an
    ``nn.LSTM`` (``batch_first=True``) and projects every time step back onto
    the vocabulary with a linear layer.
    """

    def __init__(self, *args, vocab_size, hidden_size=256, num_layers=1, **kwargs):
        """Build the recurrent layer and the output projection.

        Args:
            *args: Forwarded unchanged to ``nn.Module.__init__``.
            vocab_size: Number of distinct tokens; used both as the one-hot
                input width and as the number of output classes.
            hidden_size: Width of the LSTM hidden state (keyword-only,
                defaults to 256).
            num_layers: Number of stacked LSTM layers (keyword-only,
                defaults to 1).
            **kwargs: Forwarded unchanged to ``nn.Module.__init__``.
        """
        nn.Module.__init__(self, *args, **kwargs)
        self.vocab_size = vocab_size  # vocabulary size
        self.hidden_size = hidden_size  # LSTM hidden width
        self.num_layers = num_layers  # number of stacked LSTM layers
        # Recurrent layer: consumes one-hot vectors of width vocab_size
        self.hidden_layer = nn.LSTM(
            self.vocab_size, hidden_size=hidden_size, num_layers=num_layers, batch_first=True
        )
        # Output layer: maps each hidden vector to per-token scores
        self.output_layer = nn.Linear(hidden_size, self.vocab_size)

    def forward(self, x, state=None):
        """Run the model on a batch of token indices.

        Args:
            x: Integer (int64) tensor of token indices with shape
                (batch size, time steps); ``F.one_hot`` requires index tensors.
            state: Optional LSTM state, a tuple ``(h, c)`` whose tensors each
                have shape (num layers, batch size, hidden size). ``None`` lets
                the LSTM start from a zero state.

        Returns:
            A tuple ``(scores, state)``. ``scores`` has shape
            (batch size, vocab size, time steps), i.e. the class dimension is
            moved to dim 1, the layout ``nn.CrossEntropyLoss`` expects for
            sequence targets. ``state`` is the updated ``(h, c)`` tuple.
        """
        # One-hot encode: (batch size, time steps) -> (batch size, time steps, vocab size)
        x = F.one_hot(x, self.vocab_size).to(torch.float32)
        # LSTM output: (batch size, time steps, hidden size)
        x, state = self.hidden_layer(x, state)
        # Projection: (batch size, time steps, vocab size)
        x = self.output_layer(x)
        # Swap the time and class dimensions: (batch size, vocab size, time steps)
        x = x.permute(0, 2, 1)
        return x, state

In [None]:
# Use the GPU when one is available; otherwise fall back to the CPU so that
# model.to(device) does not fail on machines without CUDA.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
train_iter, vocab = d2l.load_data_time_machine(batch_size=32, num_steps=35)
model = LSTMTMModel(vocab_size=len(vocab))  # model to train; one output class per vocabulary entry
model.to(device)
loss = nn.CrossEntropyLoss()  # loss function
optimizer = optim.SGD(model.parameters(), lr=1)  # optimizer
ml = mltools.MachineLearning("LSTMTM")
ml.add_model(model)
# NOTE(review): `epoch` is called later to obtain the epoch count and `timer`
# is started/stopped around each epoch; their exact semantics live in mltools.
epoch, timer = ml.batch_create(create_recorder=False)
recorder = ml.create_recorder(1)  # recorder[0] collects the per-epoch training perplexity

In [None]:
# Train the model
num_epochs = epoch(500)
animator = ml.create_animator(xlabel="epoch", xlim=[0, epoch.totol_epoch + 1], ylim=-0.1, legend=["train perplexity"])  # create the animator
for current_epoch in range(1, num_epochs + 1):
    timer.start()

    # One pass over the training set
    metric_train = mltools.Accumulator(2)  # accumulator: (summed train loss, number of target tokens)
    model.train()  # training mode
    for x, y in train_iter:
        x = x.to(device)  # move inputs to the target device
        y = y.to(device)  # move targets to the target device
        y_train, _ = model(x)  # forward pass; the returned state is discarded
        train_loss = loss(y_train, y)  # mean cross-entropy over the batch

        # Gradient update
        optimizer.zero_grad()
        train_loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=1, norm_type=2)  # clip the global L2 gradient norm to 1
        optimizer.step()

        # Accumulate a plain Python float: .item() detaches the loss from the
        # autograd graph so the accumulator cannot keep graph references alive.
        metric_train.add(train_loss.item() * y.numel(), y.numel())
    # Perplexity = exp(average per-token loss)
    recorder[0].append(math.exp(metric_train[0] / metric_train[1]))

    timer.stop()

    # Report progress for this epoch
    ml.logger.info(f"train perplexity {recorder[0][-1]:.3f}")
    ml.print_training_time_massage(timer, num_epochs, current_epoch)
    ml.logger.info(f"trained on {str(device)}")
    animator.show(recorder.data)

# Repeat the final report once after the loop. The original used a `for ... else`
# clause, which (with no `break` in the loop) always runs; it is written as plain
# post-loop code here and guarded because `current_epoch` only exists when at
# least one epoch was executed.
if num_epochs >= 1:
    ml.logger.info(f"train perplexity {recorder[0][-1]:.3f}")
    ml.print_training_time_massage(timer, num_epochs, current_epoch)
    ml.logger.info(f"trained on {str(device)}")
    animator.show(recorder.data)
ml.save()

In [None]:
# Evaluate the model
# NOTE(review): batches are drawn from train_iter, so the reported figure is
# the accuracy on the training data.
model.eval()
metric = mltools.Accumulator(2)  # accumulator: (number of correct predictions, number of targets)
with torch.no_grad():
    for x, y in train_iter:
        # Move the batch to the target device
        x, y = x.to(device), y.to(device)
        y_test, _ = model(x)  # forward pass; the returned state is discarded
        test_pred = y_test.argmax(dim=1)  # highest-scoring class along dim 1 (the class dimension)
        test_acc = test_pred.eq(y).sum()  # count of correct predictions in this batch
        metric.add(test_acc, y.numel())
# Accuracy = correct predictions / total targets
ml.logger.info(f"test acc {metric[0] / metric[1]:.3f}")

In [None]:
# Generate text with the model
model.eval()
prefix, num_preds = "time traveller ", 50
outputs = vocab[list(prefix)]  # token indices of the prefix; predictions are appended below
state = None
with torch.no_grad():  # inference only: do not build an autograd graph across the carried state
    # Warm-up: feed every prefix character except the last one. The last
    # character is fed by the first prediction step (as outputs[-1]), so
    # including it here would push it through the LSTM twice.
    for y in prefix[:-1]:
        _, state = model(torch.tensor([vocab[y]], device=device).reshape(1, 1), state)
    for _ in range(num_preds):  # predict num_preds further tokens
        # Feed the most recent token as a (1, 1) batch and carry the state forward
        y, state = model(torch.tensor([outputs[-1]], device=device).reshape(1, 1), state)
        outputs.append(int(y.argmax(dim=1).reshape(1)))  # greedy choice: highest-scoring token
print("".join([vocab.idx_to_token[i] for i in outputs]))