In [2]:
# coding:utf8
import torch
import torch.nn as nn
import numpy as np
import random
import json

from jupyterlab.commands import watch
from numpy.array_api import positive

from week02.test import correct, epoch


In [5]:
# 任务：尝试在nlpdemo中使用rnn模型训练，判断特定字符在文本中的位置。
# 定义一个TorchModel类，固定套路
class TorchModel(nn.Module):
    def __init__(self, vector_dim, sentence_length, vocab):  # 接收词向量维度、句子长度、词表
        # 首先一步是调用父类构造器,这一行也得背记下来
        super(TorchModel, self).__init__()
        # 构建embedding层，nn.Embedding接收几个参数：词表大小、词向量维度、用什么编号作为填充
        self.embedding = nn.Embedding(len(vocab), vector_dim, padding_idx=0)
        # rnn层，输入层大小vector_dim,输出层大小vector_dim,输入和输出的张量形状一battch,seq_len,dim为主流形式
        self.rnn = nn.RNN(vector_dim, vector_dim, batch_first=True)
        # 线性层，把每个时间步的RNN输出映射为一个标量（用于二分类）
        self.classify = nn.Linear(vector_dim, 1)
        # 激活函数，用sigmoid函数把标量映射到[0,1]概率
        self.activation = torch.sigmoid
        # 损失函数，其实这里使用最好的二分类损失应该使用BCE
        self.loss = nn.functional.mse_loss

    # 正向传播，x是输入，y为可选，如果给了真实标签就返回损失用于训练
    def forward(self, x, y=None):
        # 把编号序列转换为向量序列，形状由（batch,seq_len）->(batch, seq_len, vector_dim)
        x = self.embedding(x)
        #  把embedding传给RNN，rnn_out是每个时间步的输出，第二个返回值是隐藏状态，这里并没有使用
        rnn_out, _ = self.rnn(x)
        # 把每一个时间步的输出映射到一个书logit，形状(batch,seq_len,1)
        y_pred = self.classify(rnn_out)
        # 如果传入真实标签则返回损失，否则返回预测值
        if y is not None:
            return self.loss(y_pred,y)
        else:
            return y_pred


In [12]:
# 词表构造函数
def build_vocab():
    chars = "你我他defghijklmnopqrstuvwxyz"  # 定义字符集
    vocab = {"pad": 0}  # vocab初始用0作为填充
    for index, char in enumerate(chars):
        vocab[char] = index + 1  # 每个字对应一个序号
    vocab['unk'] = len(vocab)  # 未知字符放在最后一位
    return vocab

build_vocab()

{'pad': 0,
 '你': 1,
 '我': 2,
 '他': 3,
 'd': 4,
 'e': 5,
 'f': 6,
 'g': 7,
 'h': 8,
 'i': 9,
 'j': 10,
 'k': 11,
 'l': 12,
 'm': 13,
 'n': 14,
 'o': 15,
 'p': 16,
 'q': 17,
 'r': 18,
 's': 19,
 't': 20,
 'u': 21,
 'v': 22,
 'w': 23,
 'x': 24,
 'y': 25,
 'z': 26,
 'unk': 27}

In [14]:
# 随机生产单个样本
def build_sample(vocab, sentence_length):
    # 随机从词表里面选择sentence_length个字符，可能重复，生产一个字符序列
    x = [random.choice(list(vocab.keys())) for _ in range(sentence_length)]
    y = []
    for char in x:
        if char in set("你我他"):
            y.append(1)
        else:
            y.append(0)
    x = [vocab.get(word, vocab['unk']) for word in x]
    return x,y

build_sample(build_vocab(),7)

([14, 3, 2, 8, 9, 4, 3], [0, 1, 1, 0, 0, 0, 1])

In [21]:
# 构建批量数据集
def build_dataset(sample_length, vocab, sentence_length):
    dataset_x = []
    dataset_y = []
    for i in range(sample_length):
        x, y = build_sample(vocab, sentence_length)
        dataset_x.append(x)
        dataset_y.append(y)
    # 把python列表转换为torch的张量：LongTensor用于索引，FloatTensor用于标签
    # unsqueeze把标签形状从(batch, seq_len)变成(batch,seq,len,1)
    return torch.LongTensor(dataset_x), torch.FloatTensor(dataset_y).unsqueeze(-1)
build_dataset(2,build_vocab(),7)

(tensor([[14, 25, 21, 17,  2,  3, 23],
         [ 8, 23, 15,  7, 26,  9, 22]]),
 tensor([[[0.],
          [0.],
          [0.],
          [0.],
          [1.],
          [1.],
          [0.]],
 
         [[0.],
          [0.],
          [0.],
          [0.],
          [0.],
          [0.],
          [0.]]]))

In [22]:
# 构建包装函数
def build_model(vocab, char_dim, sentence_length):
    model = TorchModel(char_dim, sentence_length, vocab)
    return model

In [29]:
# 评估函数
def evaluate(model, vocab, sample_length):
    model.eval()  # 切换到评估模式，背下来
    # 生成200个样本作为测试集
    x, y = build_dataset(200, vocab, sample_length)

    # 统计正样本个数
    positive_positions = int(y.sum())
    # 统计总样本数
    total_positions = 200 * sample_length
    print("本次预测集中共有%d个正样本位置,%d个负样本位置" %
          (positive_positions, total_positions - positive_positions))

    correct, wrong = 0, 0
    # 关闭梯度计算
    with torch.no_grad():
        y_pred = model(x)

        for y_p_seq,y_t_seq in zip(y_pred,y):
            for y_p, y_t in zip(y_p_seq, y_t_seq):
                if float(y_p) < 0.5 and int(y_t) == 0:
                    correct += 1
                elif float(y_p) >= 0.5 and int(y_t) == 1:
                    correct += 1
                else:
                    wrong += 1
    print("正确预测个数:%d, 正确率:%f" % (correct, correct / (correct + wrong)))
    return correct / (correct + wrong)

In [30]:
def main():
    # 配置参数
    epoch_num = 20 # 训练轮数
    batch_size = 20  # 每次训练的样本个数
    train_sample = 500  # 每轮训练总共训练的样本总数
    char_dim = 20  # 每个字的维度
    sentence_length = 6  # 句子长度
    learning_rate = 0.005 # 学习率
    # 建立词表
    vocab = build_vocab()
    # 建立模型
    model = build_model(vocab, char_dim, sentence_length)
    # 选择优化器
    optim = torch.optim.Adam(model.parameters(), lr=learning_rate)
    log = []
    # 训练过程,一轮一轮训练
    for epoch in range(epoch_num):
        model.train()
        watch_loss = []
        for batch in range(int(train_sample / batch_size)):
            x, y = build_dataset(batch_size, vocab, sentence_length) # 构造一个数据集
            optim.zero_grad() # 梯度归零
            loss = model(x, y)
            loss.backward() # 计算梯度
            optim.step() # 更新权重

            watch_loss.append(loss.item())
        print("=========\n第%d轮平均loss:%f" % (epoch + 1, np.mean(watch_loss)))
        acc = evaluate(model, vocab, sentence_length)  # 测试本轮模型结果
        log.append([acc, np.mean(watch_loss)])

    # 保存模型
    torch.save(model.state_dict(), "model.pth")
    # 保存词表
    writer = open("vocab.json", "w", encoding="utf8")
    writer.write(json.dumps(vocab, ensure_ascii=False, indent=2))
    writer.close()
    return


In [31]:
def predict(model_path, vocab_path, input_strings):
    char_dim = 20  # 每个字的维度
    sentence_length = 6  # 样本文本长度
    vocab = json.load(open(vocab_path, "r", encoding="utf8"))  # 加载字符表
    model = build_model(vocab, char_dim, sentence_length)  # 建立模型
    model.load_state_dict(torch.load(model_path))  # 加载训练好的权重

    x = []
    for input_string in input_strings:
        x.append([vocab.get(char, vocab['unk']) for char in input_string])  # 将输入序列化

    model.eval()  # 测试模式
    with torch.no_grad():  # 不计算梯度
        result = model.forward(torch.LongTensor(x))  # 模型预测 (batch_size, sen_len, 1)
        result = result.squeeze(-1)  # (batch_size, sen_len)

    for i, input_string in enumerate(input_strings):
        print(f"\n输入:{input_string}")
        print("=" * 50)
        for j, char in enumerate(input_string):
            prob = float(result[i][j])
            is_target = "是" if prob >= 0.5 else "否"
            print(f"  位置{j} 字符'{char}': {is_target}特定字符 (概率:{prob:.4f})")


In [32]:
if __name__ == "__main__":
    main()
    test_strings = ["fnvf我e", "wz你dfg", "rqwdeg", "n我k他ww"]
    predict("model.pth", "vocab.json", test_strings)


第1轮平均loss:0.045528
本次预测集中共有123个正样本位置,1077个负样本位置
正确预测个数:1200, 正确率:1.000000
第2轮平均loss:0.003898
本次预测集中共有119个正样本位置,1081个负样本位置
正确预测个数:1200, 正确率:1.000000
第3轮平均loss:0.000993
本次预测集中共有143个正样本位置,1057个负样本位置
正确预测个数:1200, 正确率:1.000000
第4轮平均loss:0.000487
本次预测集中共有128个正样本位置,1072个负样本位置
正确预测个数:1200, 正确率:1.000000
第5轮平均loss:0.000315
本次预测集中共有119个正样本位置,1081个负样本位置
正确预测个数:1200, 正确率:1.000000
第6轮平均loss:0.000243
本次预测集中共有106个正样本位置,1094个负样本位置
正确预测个数:1200, 正确率:1.000000
第7轮平均loss:0.000180
本次预测集中共有118个正样本位置,1082个负样本位置
正确预测个数:1200, 正确率:1.000000
第8轮平均loss:0.000159
本次预测集中共有129个正样本位置,1071个负样本位置
正确预测个数:1200, 正确率:1.000000
第9轮平均loss:0.000121
本次预测集中共有121个正样本位置,1079个负样本位置
正确预测个数:1200, 正确率:1.000000
第10轮平均loss:0.000108
本次预测集中共有148个正样本位置,1052个负样本位置
正确预测个数:1200, 正确率:1.000000
第11轮平均loss:0.000091
本次预测集中共有139个正样本位置,1061个负样本位置
正确预测个数:1200, 正确率:1.000000
第12轮平均loss:0.000077
本次预测集中共有128个正样本位置,1072个负样本位置
正确预测个数:1200, 正确率:1.000000
第13轮平均loss:0.000071
本次预测集中共有138个正样本位置,1062个负样本位置
正确预测个数:1200, 正确率:1.000000
第14轮平均loss:0.000069
本次预测集中共有121个正样