In [1]:
import pickle

import numpy as np
import pandas as pd
import torch
import math
import torch.nn as nn
from keras.preprocessing.sequence import pad_sequences
from sklearn.model_selection import train_test_split
from torch.utils.data import TensorDataset
from torch import optim
from torchnet import meter
from tqdm import tqdm

In [2]:
# 模型输入参数，需要自己根据需要调整
hidden_dim = 100  # 隐层大小
epochs = 10  # 迭代次数
batch_size = 32  # 每个批次样本大小
embedding_dim = 20  # 每个字形成的嵌入向量大小
output_dim = 2  # 输出维度，因为是二分类
lr = 0.001  # 学习率
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')
input_shape = 180  # 每句话的词的个数，如果不够需要使用0进行填充

Using device: cuda


In [3]:
# 加载文本数据
def load_data(file_path, input_shape=20):
    df = pd.read_csv(file_path, sep='\t')

    df = df.dropna(subset=['text'])  # 删除text为空的行
    df['text'] = df['text'].astype(str)  # 确保所有text都是字符串
    # 标签及词汇表
    labels, vocabulary = list(df['label'].unique()), list(df['text'].unique())

    # 构造字符级别的特征
    string = ''
    for word in vocabulary:
        string += word

    # 所有的词汇表
    vocabulary = set(string)

    # word2idx 将字映射为索引
    word_dictionary = {word: i + 1 for i, word in enumerate(vocabulary)}
    with open('word_dict.pk', 'wb') as f:
        pickle.dump(word_dictionary, f)
    # idx2word 将索引映射为字
    inverse_word_dictionary = {i + 1: word for i, word in enumerate(vocabulary)}
    # label2idx 将正反面映射为0和1
    label_dictionary = {label: i for i, label in enumerate(labels)}
    with open('label_dict.pk', 'wb') as f:
        pickle.dump(label_dictionary, f)
    # idx2label 将0和1映射为正反面
    output_dictionary = {i: labels for i, labels in enumerate(labels)}

    # 训练数据中所有词的个数
    vocab_size = len(word_dictionary.keys())  # 词汇表大小
    # 标签类别，分别为正、反面
    label_size = len(label_dictionary.keys())  # 标签类别数量

    # 序列填充，按input_shape填充，长度不足的按0补充
    # 将一句话映射成对应的索引 [0,24,63...]
    x = [[word_dictionary[word] for word in sent] for sent in df['text']]
    # 如果长度不够input_shape，使用0进行填充
    x = pad_sequences(maxlen=input_shape, sequences=x, padding='post', value=0)
    # 形成标签0和1
    y = [[label_dictionary[sent]] for sent in df['label']]
    #     y = [np_utils.to_categorical(label, num_classes=label_size) for label in y]
    y = np.array(y)

    return x, y, output_dictionary, vocab_size, label_size, inverse_word_dictionary

In [4]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=128):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        # 初始化Shape为(max_len, d_model)的PE (positional encoding)
        pe = torch.zeros(max_len, d_model)
        # 初始化一个tensor [[0, 1, 2, 3, ...]]
        position = torch.arange(0, max_len).unsqueeze(1)
        # 这里就是sin和cos括号中的内容，通过e和ln进行了变换
        div_term = torch.exp(
            torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model)
        )
        # 计算PE(pos, 2i)
        pe[:, 0::2] = torch.sin(position * div_term)
        # 计算PE(pos, 2i+1)
        pe[:, 1::2] = torch.cos(position * div_term)
        # 为了方便计算，在最外面在unsqueeze出一个batch
        pe = pe.unsqueeze(0)
        # 如果一个参数不参与梯度下降，但又希望保存model的时候将其保存下来
        # 这个时候就可以用register_buffer
        self.register_buffer("pe", pe)

    def forward(self, x):
        # 将x和positional encoding相加。
        x = x + self.pe[:, : x.size(1)].requires_grad_(False)
        return self.dropout(x)

In [5]:
class Transformer(nn.Module):
    def __init__(self, vocab_size, embedding_dim, num_class, feedforward_dim=256, num_head=2, num_layers=3, dropout=0.1,
                 max_len=128):
        super(Transformer, self).__init__()
        # 嵌入层
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        # 位置编码层
        self.positional_encoding = PositionalEncoding(embedding_dim, dropout, max_len)
        # 编码层
        self.encoder_layer = nn.TransformerEncoderLayer(embedding_dim, 
                                                        num_head, 
                                                        feedforward_dim, 
                                                        dropout,
                                                        batch_first=True)
        self.transformer = nn.TransformerEncoder(self.encoder_layer, num_layers)
        # 输出层
        self.fc = nn.Linear(embedding_dim, num_class)

    def forward(self, x):
        # 输入的数据维度为【批次，序列长度】，需要交换因为transformer的输入维度为【序列长度，批次，嵌入向量维度】
        x = x.transpose(0, 1)
        # 将输入的数据进行词嵌入，得到数据的维度为【序列长度，批次，嵌入向量维度】
        x = self.embedding(x)
        # 维度为【序列长度，批次，嵌入向量维度】
        x = self.positional_encoding(x)
        # 维度为【序列长度，批次，嵌入向量维度】
        x = self.transformer(x)
        # 将每个词的输出向量取均值，也可以随意取一个标记输出结果，维度为【批次，嵌入向量维度】
        x = x.mean(axis=0)
        # 进行分类，维度为【批次，分类数】
        x = self.fc(x)
        return x

In [6]:
# 1.获取训练数据
x_train, y_train, output_dictionary_train, vocab_size_train, label_size, inverse_word_dictionary_train = load_data(
    "./train.tsv", input_shape)
x_test, y_test, output_dictionary_test, vocab_size_test, label_size, inverse_word_dictionary_test = load_data(
    "./test.tsv", input_shape)

idx = 0
word_dictionary = {}
for k, v in inverse_word_dictionary_train.items():
    word_dictionary[idx] = v
    idx += 1
for k, v in inverse_word_dictionary_test.items():
    word_dictionary[idx] = v
    idx += 1

# 3.将numpy转成tensor
x_train = torch.from_numpy(x_train).to(torch.int32)
y_train = torch.from_numpy(y_train).to(torch.float32)
x_test = torch.from_numpy(x_test).to(torch.int32)
y_test = torch.from_numpy(y_test).to(torch.float32)

# 4.形成训练数据集
train_data = TensorDataset(x_train, y_train)
test_data = TensorDataset(x_test, y_test)

# 5.将数据加载成迭代器
train_loader = torch.utils.data.DataLoader(train_data,
                                           batch_size,
                                           True)

test_loader = torch.utils.data.DataLoader(test_data,
                                          batch_size,
                                          False)

In [7]:
# 6.模型训练
model = Transformer(len(word_dictionary), embedding_dim, output_dim)
model.to(device)

Configimizer = optim.Adam(model.parameters(), lr=lr)  # 优化器
criterion = nn.CrossEntropyLoss()  # 多分类损失函数

loss_meter = meter.AverageValueMeter()

best_acc = 0  # 保存最好准确率
best_model = None  # 保存对应最好准确率的模型参数

for epoch in range(epochs):
    model.train()  # 开启训练模式
    epoch_acc = 0  # 每个epoch的准确率
    epoch_acc_count = 0  # 每个epoch训练的样本数
    train_count = 0  # 用于计算总的样本数，方便求准确率
    loss_meter.reset()
    print('\n')
    train_bar = tqdm(train_loader)  # 形成进度条
    for data in train_bar:
        x_train, y_train = data  # 解包迭代器中的X和Y

        x_input = x_train.long().contiguous()
        x_input = x_input.to(device)
        y_train = y_train.to(device)
        Configimizer.zero_grad()

        # 形成预测结果
        output_ = model(x_input)

        # 计算损失
        loss = criterion(output_, y_train.long().view(-1))
        loss.backward()
        Configimizer.step()

        loss_meter.add(loss.item())

        # 计算每个epoch正确的个数
        epoch_acc_count += (output_.argmax(axis=1) == y_train.view(-1)).sum()
        train_count += len(x_train)

    # 每个epoch对应的准确率
    epoch_acc = epoch_acc_count / train_count

    # 打印信息
    print("【EPOCH: 】%s" % str(epoch + 1))
    print("训练损失为%s" % (str(loss_meter.mean)))
    print("训练精度为%s" % (str(epoch_acc.item() * 100)[:5]) + '%')

    # 保存模型及相关信息
    if epoch_acc > best_acc:
        best_acc = epoch_acc
        best_model = model.state_dict()

    # 在训练结束保存最优的模型参数
    if epoch == epochs - 1:
        # 保存模型
        torch.save(best_model, './best_model.pkl')





100%|██████████| 625/625 [00:06<00:00, 96.79it/s] 


【EPOCH: 】1
训练损失为0.5938016190528861
训练精度为67.00%




100%|██████████| 625/625 [00:05<00:00, 105.89it/s]


【EPOCH: 】2
训练损失为0.49649377355575586
训练精度为75.96%




100%|██████████| 625/625 [00:05<00:00, 105.65it/s]


【EPOCH: 】3
训练损失为0.4772913066864019
训练精度为77.01%




100%|██████████| 625/625 [00:05<00:00, 104.92it/s]


【EPOCH: 】4
训练损失为0.4640879731416705
训练精度为77.95%




100%|██████████| 625/625 [00:06<00:00, 103.59it/s]


【EPOCH: 】5
训练损失为0.4548997776746752
训练精度为78.65%




100%|██████████| 625/625 [00:05<00:00, 104.90it/s]


【EPOCH: 】6
训练损失为0.4473022180557251
训练精度为79.42%




100%|██████████| 625/625 [00:05<00:00, 105.68it/s]


【EPOCH: 】7
训练损失为0.44032969510555275
训练精度为79.54%




100%|██████████| 625/625 [00:05<00:00, 104.95it/s]


【EPOCH: 】8
训练损失为0.4288354567050931
训练精度为80.47%




100%|██████████| 625/625 [00:06<00:00, 102.15it/s]


【EPOCH: 】9
训练损失为0.4224468970060348
训练精度为80.70%




100%|██████████| 625/625 [00:05<00:00, 104.74it/s]

【EPOCH: 】10
训练损失为0.41861978163719177
训练精度为81.03%





In [8]:
# word2idx = {}
# 
# for k, v in word_dictionary.items():
#     word2idx[v] = k
# 
# label_dict = {0: "非谣言", 1: "谣言"}
# 
# try:
#     input_shape = 180  # 序列长度，就是时间步大小，也就是这里的每句话中的词的个数
#     #     sent = "电视刚安装好，说实话，画质不怎么样，很差！"
#     # 用于测试的话
#     sent = "你应该知道的100个中国文学常识 !"
#     # 将对应的字转化为相应的序号
#     x = [[word2idx[word] for word in sent]]
#     # 如果长度不够180，使用0进行填充
#     x = pad_sequences(maxlen=input_shape, sequences=x, padding='post', value=0)
#     x = torch.from_numpy(x).to(device)
# 
#     # 加载模型
#     model_path = './best_model.pkl'
#     model = Transformer(len(word_dictionary), embedding_dim, output_dim).to(device)
#     model.load_state_dict(torch.load(model_path))
# 
#     # 模型预测，注意输入的数据第一个input_shape,就是180
#     y_pred = model(x.long())
# 
#     print('输入语句: %s' % sent)
#     print('谣言检测结果: %s' % label_dict[y_pred.argmax().item()])
# 
# except KeyError as err:
#     print("您输入的句子有汉字不在词汇表中，请重新输入！")
#     print("不在词汇表中的单词为：%s." % err)

In [11]:
# 7. 在测试集上评估模型性能
model.load_state_dict(torch.load('./best_model.pkl'))
model.eval()  # 设置模型为评估模式
test_acc = 0
test_count = 0

with torch.no_grad():  # 不计算梯度，节省内存
    test_bar = tqdm(test_loader, desc='测试进度')
    for data in test_bar:
        x_test, y_test = data
        x_test = x_test.long().to(device)
        y_test = y_test.to(device).view(-1)  # 确保标签形状正确
        
        output = model(x_test)
        pred = output.argmax(dim=1)
        
        correct = (pred == y_test).sum().item()
        test_acc += correct
        test_count += len(y_test)
        
        # 更新进度条信息
        test_bar.set_postfix(acc=f'{test_acc/test_count:.4f}')

test_accuracy = test_acc / test_count
print(f'\n测试集准确率: {test_accuracy * 100:.2f}%')

# 8. 对单条语句进行预测
def predict_sentence(model, sentence, word2idx, max_len=180):
    # 预处理输入句子
    sequence = [word2idx.get(char, 0) for char in sentence]  # 0表示未知字符
    sequence = pad_sequences([sequence], maxlen=max_len, padding='post', value=0)
    tensor = torch.LongTensor(sequence).to(device)

    # 模型预测
    model.eval()
    with torch.no_grad():
        output = model(tensor)
        probabilities = torch.softmax(output, dim=1)
        prediction = output.argmax(dim=1).item()

    return prediction, probabilities.cpu().numpy()[0]

# 创建字符到索引的映射
word2idx = {}
for char, idx in word_dictionary.items():
    word2idx[char] = idx

label_dict = {0: "非谣言", 1: "谣言"}

# 测试不同语句
test_sentences = [
    "电视刚安装好，说实话，画质不怎么样，很差！",
    "你应该知道的100个中国文学常识!",
    "科学研究表明每天喝8杯水有益健康",
    "最新消息：下周将有三颗小行星撞击地球"
]

print("\n单条语句预测结果:")
for sent in test_sentences:
    try:
        pred, probs = predict_sentence(model, sent, word2idx)
        print(f"语句: '{sent}'")
        print(f"预测结果: {label_dict[pred]} (置信度: {probs[pred]:.4f})")
        print(f"详细概率: 非谣言={probs[0]:.4f}, 谣言={probs[1]:.4f}")
        print("-" * 60)
        print('\n')
    except KeyError:
        print(f"语句包含不在词汇表中的字符: '{sent}'")

测试进度: 100%|██████████| 140/140 [00:00<00:00, 215.69it/s, acc=0.4712]


测试集准确率: 47.12%

单条语句预测结果:
语句: '电视刚安装好，说实话，画质不怎么样，很差！'
预测结果: 谣言 (置信度: 0.6179)
详细概率: 非谣言=0.3821, 谣言=0.6179
------------------------------------------------------------


语句: '你应该知道的100个中国文学常识!'
预测结果: 谣言 (置信度: 0.6179)
详细概率: 非谣言=0.3821, 谣言=0.6179
------------------------------------------------------------


语句: '科学研究表明每天喝8杯水有益健康'
预测结果: 谣言 (置信度: 0.6179)
详细概率: 非谣言=0.3821, 谣言=0.6179
------------------------------------------------------------


语句: '最新消息：下周将有三颗小行星撞击地球'
预测结果: 谣言 (置信度: 0.6179)
详细概率: 非谣言=0.3821, 谣言=0.6179
------------------------------------------------------------





