In [13]:
import torch
import torch.nn as nn
import time
import json
import os
import math
from tqdm import tqdm
from torch.utils.data import  DataLoader
import torch.nn.functional as F
from torch.utils.data import TensorDataset
import numpy as np
from gensim.models import KeyedVectors
from transformers import BertTokenizer, BertModel

In [14]:

class Dictionary(object):
    """Word/token and label/index lookup tables for one dataset folder.

    Attributes:
        word2tkn: dict mapping a word to its integer token; starts with "[PAD]" -> 0.
        tkn2word: list mapping a token back to its word (index == token).
        label2idx: dict mapping a label string to its row index in ``idx2label``.
        idx2label: list of ``[label, label_desc]`` pairs in file order.
    """

    def __init__(self, path):
        """Read ``labels.json`` (one JSON object per line) found under ``path``.

        Every line must provide the keys ``label`` and ``label_desc``.
        """
        pad_word = "[PAD]"
        self.word2tkn = {pad_word: 0}
        self.tkn2word = [pad_word]

        self.label2idx = {}  # label -> idx
        self.idx2label = []  # rows of [label, label_desc]

        labels_file = os.path.join(path, 'labels.json')
        with open(labels_file, 'r', encoding='utf-8') as handle:
            for raw_line in handle:
                record = json.loads(raw_line)
                name = record['label']
                description = record['label_desc']
                # The index of a label is the position of the row about to be added.
                position = len(self.idx2label)
                self.idx2label.append([name, description])
                self.label2idx[name] = position

    def add_word(self, word):
        """Return the token of ``word``, registering it first if it is unknown."""
        token = self.word2tkn.get(word)
        if token is None:
            token = len(self.tkn2word)
            self.tkn2word.append(word)
            self.word2tkn[word] = token
        return token


class Corpus(object):
    '''
    Read the train/dev/test splits of a dataset folder and turn every sentence
    into fixed-length BERT inputs.

    Each split is exposed as a ``TensorDataset`` whose items are
    ``(input_ids, label, attention_mask)``:

    * ``self.train`` / ``self.valid`` -- ``label`` is the class index looked up in
      ``self.dictionary.label2idx``.
    * ``self.test`` -- ``label`` holds the value of the record's ``id`` field instead.

    The tokenizer is loaded once and shared by all splits.
    '''
    def __init__(self, path, max_token_per_sent, pretrained_path='./bert-base-chinese'):
        '''
        Args:
            path: dataset folder containing labels.json, train.json, dev.json and test.json.
            max_token_per_sent: length every token sequence is padded / truncated to.
            pretrained_path: name or directory handed to ``BertTokenizer.from_pretrained``.
        '''
        self.dictionary = Dictionary(path)

        self.max_token_per_sent = max_token_per_sent
        self.pretrained_path = pretrained_path
        self.tokenizer = None  # created on first use by _get_tokenizer()

        self.train = self.tokenize(os.path.join(path, 'train.json'))
        self.valid = self.tokenize(os.path.join(path, 'dev.json'))
        self.test = self.tokenize(os.path.join(path, 'test.json'), True)

        # No pretrained word-embedding matrix is built here: sentences are encoded
        # with the BERT tokenizer, so the embedding lookup happens in the model.

    def _get_tokenizer(self):
        '''Return the shared tokenizer, loading it the first time it is needed.'''
        tokenizer = getattr(self, 'tokenizer', None)
        if tokenizer is None:
            tokenizer = BertTokenizer.from_pretrained(
                getattr(self, 'pretrained_path', './bert-base-chinese'))
            self.tokenizer = tokenizer
        return tokenizer

    def pad(self, origin_token_seq):
        '''
        Pad ``origin_token_seq`` with 0 up to ``self.max_token_per_sent`` tokens,
        or truncate it when it is longer. Always returns a new list.
        '''
        missing = self.max_token_per_sent - len(origin_token_seq)
        if missing < 0:
            return origin_token_seq[:self.max_token_per_sent]
        return origin_token_seq + [0] * missing

    def tokenize(self, path, test_mode=False):
        '''
        Convert one split file (one JSON object per line) into a ``TensorDataset``.

        Args:
            path: file to read; every record needs a ``sentence`` key plus ``label``
                (or ``id`` when ``test_mode`` is true).
            test_mode: store the record ``id`` in the label slot instead of a class index.

        Returns:
            ``TensorDataset(input_ids, labels, attention_masks)`` -- note the order:
            labels come second, masks third.

        Raises:
            KeyError: a record lacks a required key or carries an unknown label.
        '''
        tokenizer = self._get_tokenizer()

        idss = []
        labels = []
        maskss = []
        with open(path, 'r', encoding='utf8') as f:
            for line in f:
                if not line.strip():
                    # Tolerate blank lines (e.g. a trailing newline at end of file).
                    continue
                one_data = json.loads(line)  # parsed once and reused below

                tokens = tokenizer(one_data['sentence'], return_tensors=None, truncation=True,
                                   padding="max_length", max_length=self.max_token_per_sent)

                if test_mode:
                    label = one_data['id']
                else:
                    label = self.dictionary.label2idx[one_data['label']]

                idss.append(tokens['input_ids'])
                maskss.append(tokens['attention_mask'])
                labels.append(label)

        idss = torch.tensor(np.array(idss))
        maskss = torch.tensor(np.array(maskss))
        labels = torch.tensor(np.array(labels)).long()
        # TensorDataset pairs the tensors row by row so the result can be fed
        # directly to a DataLoader.
        return TensorDataset(idss, labels, maskss)

In [15]:
if __name__ == '__main__':
    # Where the dataset lives and where artefacts are meant to be written.
    dataset_folder = './data/tnews_public'
    output_folder = './output'

    # Use the GPU when one is visible, otherwise fall back to the CPU.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    #-----------------------------------------------------begin-----------------------------------------------------#
    # Hyperparameters -- adjust as needed.
    max_token_per_sent = 50  # preset maximum number of tokens per sentence
    batch_size = 32
    num_epochs = 5
    lr = 1e-4
    #------------------------------------------------------end------------------------------------------------------#

    # Reads and tokenizes the train / dev / test splits.
    dataset = Corpus(dataset_folder, max_token_per_sent)

    # One loader per split; only the training split is reshuffled.
    data_loader_train, data_loader_valid, data_loader_test = (
        DataLoader(dataset=split, batch_size=batch_size, shuffle=reshuffle)
        for split, reshuffle in (
            (dataset.train, True),
            (dataset.valid, False),
            (dataset.test, False),
        )
    )
    


In [22]:
from transformers import BertTokenizer, BertModel
import torch
import torch.nn as nn


class Bert_Model(nn.Module):
    """Sentence classifier: frozen BERT encoder followed by a small MLP head.

    The encoder is used purely as a feature extractor: its parameters are
    frozen, it always runs under ``torch.no_grad()``, and it is kept in
    evaluation mode so its internal dropout never perturbs the features.
    Only ``self.classifier`` is meant to be optimised.

    Args:
        dropout: dropout probability applied to the encoder feature before the head.
        num_labels: number of output classes (size of the last linear layer).
        pretrained_path: name or directory handed to ``BertModel.from_pretrained``.
    """

    def __init__(self, dropout=0.2, num_labels=15, pretrained_path='./bert-base-chinese'):
        super(Bert_Model, self).__init__()
        self.dropout = nn.Dropout(dropout)
        print("Loading Model...")
        self.bert = BertModel.from_pretrained(pretrained_path)
        print("Loaded.")

        # Freeze the encoder here so the model is consistent even if the caller
        # forgets to do it before building an optimizer.
        for param in self.bert.parameters():
            param.requires_grad = False
        self.bert.eval()

        # Take the feature width from the checkpoint instead of assuming 768.
        hidden_size = self.bert.config.hidden_size
        self.classifier = nn.Sequential(
            nn.Linear(hidden_size, 256),
            nn.LeakyReLU(),
            nn.Linear(256, num_labels),
        )

    def train(self, mode=True):
        """Switch the head between train/eval mode; the frozen encoder stays in eval.

        Without this override ``model.train()`` would re-enable dropout inside the
        frozen encoder and make the supposedly fixed features stochastic.
        """
        super(Bert_Model, self).train(mode)
        self.bert.eval()
        return self

    def forward(self, x, mask=None):
        """Return class logits of shape ``(batch, num_labels)``.

        Args:
            x: token ids, indexed as ``(batch, seq_len)``.
            mask: optional attention mask passed to the encoder as ``attention_mask``.
        """
        with torch.no_grad():
            # First output of the encoder, position 0 of every sequence
            # (the [CLS] token for inputs produced by BertTokenizer).
            x = self.bert(x, attention_mask=mask)[0][:, 0, :]
        x = self.dropout(x)
        x = self.classifier(x)

        return x

# Build the BERT classifier, then move it onto the selected device.
num_labels = 15  # number of classes for this task (not passed to the constructor here)
model = Bert_Model()
model = model.to(device)

Loading Model...
Loaded.


In [25]:

def valid():
    '''
    Evaluate the global ``model`` on ``data_loader_valid`` and return its accuracy.

    Accuracy is ``correct predictions / samples actually seen``, so a final batch
    that is smaller than the others is counted correctly.

    Returns:
        float in [0, 1]; 0.0 when the loader yields no samples.
    '''
    correct = 0
    seen = 0

    model.eval()  # evaluation mode: disables Dropout / BatchNorm updates
    with torch.no_grad():
        for data in tqdm(data_loader_valid, dynamic_ncols=True):
            # Dataset item order is (input_ids, label, attention_mask).
            batch_x, batch_y, batch_z = data[0].to(device), data[1].to(device), data[2].to(device)

            y_hat = model(batch_x, batch_z)
            # The predicted class is the index of the largest logit in each row.
            y_hat = y_hat.argmax(dim=1)

            correct += (y_hat == batch_y).sum().item()
            seen += batch_y.size(0)

    return correct / seen if seen else 0.0

# Loss: cross-entropy between the class logits and the integer labels.
loss_function = nn.CrossEntropyLoss()

# Freeze the pretrained encoder: none of its parameters should receive gradients.
for param in model.bert.parameters():
    param.requires_grad = False

# Only the parameters of the classification head are handed to the optimizer.
optimizer = torch.optim.Adam(
    params=model.classifier.parameters(),
    lr=lr,
    weight_decay=5e-4,
)


def train(save_best=False):
    '''
    Train the global ``model`` for ``num_epochs`` epochs and validate after each one.

    Uses the module-level ``data_loader_train``, ``optimizer``, ``loss_function``
    and ``device``. After every epoch it prints the mean batch loss, the training
    accuracy and the validation accuracy returned by ``valid()``.

    Accuracy is ``correct predictions / samples actually seen``, so a final batch
    that is smaller than the others is counted correctly.

    Args:
        save_best: when true, write ``model.state_dict()`` to
            ``<output_folder>/model.ckpt`` every time the validation accuracy
            improves. Disabled by default, so nothing is written to disk.
    '''
    max_valid_acc = 0

    for epoch in range(num_epochs):
        model.train()  # training mode: enables Dropout / BatchNorm updates

        loss_sum = 0.0   # sum of per-batch mean losses
        num_batches = 0
        correct = 0
        seen = 0

        tqdm_iterator = tqdm(data_loader_train, dynamic_ncols=True, desc=f'Epoch {epoch + 1}/{num_epochs}')

        for data in tqdm_iterator:
            # Dataset item order is (input_ids, label, attention_mask).
            batch_x, batch_y, batch_z = data[0].to(device), data[1].to(device), data[2].to(device)

            y_hat = model(batch_x, batch_z)
            loss = loss_function(y_hat, batch_y)

            optimizer.zero_grad()   # clear gradients of the previous step
            loss.backward()         # back-propagate
            optimizer.step()        # update the trainable parameters

            # The predicted class is the index of the largest logit in each row.
            y_hat = y_hat.argmax(dim=1)

            correct += (y_hat == batch_y).sum().item()
            seen += batch_y.size(0)
            loss_sum += loss.item()
            num_batches += 1

            tqdm_iterator.set_postfix(loss=loss_sum / num_batches,
                                      acc=correct / seen)

        tqdm_iterator.close()

        train_loss = loss_sum / num_batches if num_batches else 0.0
        train_acc = correct / seen if seen else 0.0

        valid_acc = valid()

        if valid_acc > max_valid_acc:
            max_valid_acc = valid_acc
            if save_best:
                os.makedirs(output_folder, exist_ok=True)
                torch.save(model.state_dict(), os.path.join(output_folder, "model.ckpt"))

        print(f"epoch: {epoch}, train loss: {train_loss:.4f}, train accuracy: {train_acc*100:.2f}%, valid accuracy: {valid_acc*100:.2f}%")

# Start training; progress bars and one summary line per epoch are printed below.
train()

Epoch 1/5:   5%|▌         | 90/1668 [00:22<06:25,  4.09it/s, acc=0.528, loss=1.39]


KeyboardInterrupt: 