基于Bert-Crf中文命名实体识别算法鲁棒性评测

赛题链接：https://www.datafountain.cn/competitions/510
数据说明：数据集采用 OntoNotes 4.0 数据集 (Weischedel et al., 2011)，数据集未分词，标签为(B,M,E)-(PER,LOC,ORG,GPE)的组合。

评测所用的验证集和测试集中部分句子由鲁棒性验证工具 TextFlint 产生，共有11种变形方式：
在句中随机加入标点符号，两阶段均使用；    
对句中任意单词做同义词替换，两阶段均使用；    
对句中任意单词做反义词替换，两阶段均使用；    
将句中可缩写的词语替换为缩写词，两阶段均使用；    
将句中数字随机变为其他数值，两阶段均使用；    
在句中随机插入语义无关的句子，两阶段均使用；    
将句中部分汉字换为同音字，仅第二阶段使用；    
在句中合适位置插入副词，仅第二阶段使用；    
将句中实体换为同类型的、更长的实体，仅第二阶段使用；    
将句中实体换为交叉类型词（如将渥太华ORG换为华盛顿ORG&PER），仅第二阶段使用；    
将句中实体换为训练集中不曾出现过的实体，仅第二阶段使用。

评测标准：我们通过将输出结果与人工标注的集合进行比较，分别计算每一种元素的精确率(Precision)、召回率(Recall)和F1分值(F1 score)，并采用Micro-F1作为最终排名指标。

Baseline概述：使用Bert+crf算法，以序列标注的形式来进行命名实体识别。

In [2]:
!pip3 install torch torchvision torchaudio
!pip install fastNLP
!pip install transformers



In [3]:
from transformers import BertModel, BertTokenizer
import torch
from torch.utils.data import Dataset
import tqdm
import fastNLP
from fastNLP import SpanFPreRecMetric, Vocabulary

数据处理

In [4]:
class Nerdataset(Dataset):
    """Character-level NER dataset parsed from a ``token,label`` CSV file.

    The file is expected to start with a header line, followed by one
    ``token,label`` row per character; sentences are separated by short
    lines (at most two characters, e.g. a blank line).

    ``self.examples`` is a two-element list ``[ids, labels]`` where
    ``ids[k]`` is a ``(1, len + 2)`` tensor of BERT token ids (including
    the ``[CLS]``/``[SEP]`` markers) and ``labels[k]`` is the list of
    label indices for the same sentence.
    """

    # Longest token sequence (markers included) accepted by BERT.
    MAX_TOKENS = 512

    def __init__(self, path):
        """Load the tokenizer and parse every sentence found in ``path``.

        Args:
            path: Path of the UTF-8 CSV file to read.
        """
        super().__init__()
        self.tokenizer = BertTokenizer.from_pretrained('bert-base-chinese')
        self.device = 'cuda:3'
        self.label2idx = {'O': 0, 'B-LOC': 1, 'M-LOC': 2, 'E-LOC': 3,
                          'S-LOC': 4, 'B-PER': 5, 'M-PER': 6, 'E-PER': 7,
                          'S-PER': 8, 'B-GPE': 9, 'M-GPE': 10, 'E-GPE': 11,
                          'S-GPE': 12, 'B-ORG': 13, 'M-ORG': 14, 'E-ORG': 15,
                          'S-ORG': 16}
        # NOTE: no BERT model is loaded here; the dataset only needs the
        # tokenizer, and the encoder lives in the model itself.
        self.examples = self.processor(path)

    def __len__(self):
        """Return the number of parsed sentences."""
        return len(self.examples[0])

    def __getitem__(self, index):
        """Return the ``(token_ids, label_indices)`` pair at ``index``."""
        return self.examples[0][index], self.examples[1][index]

    @staticmethod
    def collate_fn(data):
        """Group a batch into ``(ids_tuple, labels_tuple)``.

        Samples are ordered by decreasing label length (ties keep their
        incoming order). The input list is not modified.
        """
        ordered = sorted(data, key=lambda pair: len(pair[1]), reverse=True)
        return tuple(zip(*ordered))

    @staticmethod
    def _iter_sentences(lines):
        """Yield ``(sentence, tags)`` for each sentence in raw CSV lines.

        A line longer than two characters is a ``token,label`` row; any
        shorter line ends the current sentence. The row is split on its
        LAST comma so that a comma token (``,,O``) is parsed correctly.
        Empty sentences (consecutive separators) are skipped, and a final
        sentence without a trailing separator is still emitted.

        Raises:
            ValueError: If a row contains no comma at all.
        """
        chars, tags = [], []
        for raw in lines:
            if len(raw) > 2:
                token, sep, tag = raw.rstrip('\r\n').rpartition(',')
                if not sep:
                    raise ValueError(
                        f"malformed row, expected 'token,label': {raw!r}")
                chars.append(token)
                tags.append(tag.strip())
            elif tags:
                yield ''.join(chars), tags
                chars, tags = [], []
        if tags:
            yield ''.join(chars), tags

    def processor(self, path):
        """Parse ``path`` into token-id tensors and label-index lists.

        Sentences whose token sequence (with ``[CLS]``/``[SEP]``) exceeds
        ``MAX_TOKENS`` are skipped.

        Returns:
            ``[ids, labels]`` as described in the class docstring.

        Raises:
            ValueError: If a sentence has a different number of characters
                and labels.
            KeyError: If a label is not present in ``self.label2idx``.
        """
        with open(path, 'r', encoding='utf-8') as f:
            lines = f.readlines()[1:]  # drop the header row

        all_ids, all_labels = [], []
        for sentence, tags in self._iter_sentences(tqdm.tqdm(lines)):
            if len(sentence) + 2 > self.MAX_TOKENS:
                continue
            if len(sentence) != len(tags):
                raise ValueError(
                    f"{len(sentence)} characters but {len(tags)} labels "
                    f"in sentence {sentence!r}")
            tokens = ['[CLS]']
            for ch in sentence:
                pieces = self.tokenizer.tokenize(ch)
                # Whitespace-like characters tokenize to nothing; keep the
                # one-token-per-character alignment with an unknown token.
                tokens.append(pieces[0] if pieces else self.tokenizer.unk_token)
            tokens.append('[SEP]')
            all_ids.append(
                torch.tensor([self.tokenizer.convert_tokens_to_ids(tokens)]))
            all_labels.append([self.label2idx[tag] for tag in tags])
        return [all_ids, all_labels]

        

BERT + BiLSTM + CRF 模型

In [5]:
class NER(torch.nn.Module):
    """BERT encoder followed by a BiLSTM, a linear projection and a CRF.

    In training mode ``forward`` returns the CRF loss per sentence; in
    evaluation mode it returns the Viterbi-decoded tag indices.
    """

    def __init__(self, device='cuda:3', num_tags=17):
        """Build all sub-modules on ``device``.

        Args:
            device: Device every sub-module is placed on.
            num_tags: Number of distinct tag indices scored by the CRF.
        """
        super(NER, self).__init__()
        self.device = device
        self.bert = BertModel.from_pretrained('bert-base-chinese').to(
            self.device)
        hidden = self.bert.config.hidden_size
        self.encode_lstm = torch.nn.LSTM(
            input_size=hidden, hidden_size=hidden,
            batch_first=True, bidirectional=True).to(self.device)
        self.crf = fastNLP.modules.ConditionalRandomField(num_tags).to(
            self.device)
        # The projection must live on the same device as the other layers.
        self.decode = torch.nn.Linear(hidden * 2, num_tags).to(self.device)

    def forward(self, Data):
        """Score or decode one batch.

        Args:
            Data: ``(ids, labels)`` where ``ids`` is a sequence of
                ``(1, len + 2)`` token-id tensors and ``labels`` a sequence
                of label-index lists of length ``len``.

        Returns:
            The CRF output for the gold labels when ``self.training`` is
            set, otherwise the first element returned by
            ``crf.viterbi_decode`` (the decoded paths).
        """
        data, label = Data
        # Follow the module wherever ``.to()`` moved it.
        device = next(self.parameters()).device
        lengths = [len(tags) for tags in label]
        max_len = max(lengths)

        # Padded positions are excluded through ``mask``; -1 is a filler.
        padded_labels = torch.tensor(
            [tags + [-1] * (max_len - len(tags)) for tags in label],
            device=device)
        mask = torch.tensor(
            [[True] * n + [False] * (max_len - n) for n in lengths],
            dtype=torch.bool, device=device)

        # Encode each sentence on its own and drop the [CLS]/[SEP] vectors.
        encoded = [self.bert(ids.to(device))[0][0][1:-1] for ids in data]
        features = torch.nn.utils.rnn.pad_sequence(encoded, batch_first=True)

        packed = torch.nn.utils.rnn.pack_padded_sequence(
            features, lengths, batch_first=True, enforce_sorted=False)
        packed, _ = self.encode_lstm(packed)
        features, _ = torch.nn.utils.rnn.pad_packed_sequence(
            packed, batch_first=True, total_length=max_len)

        emissions = self.decode(features)
        if self.training:
            return self.crf(emissions, padded_labels, mask)
        return self.crf.viterbi_decode(emissions, mask)[0]


In [6]:
def train(net, train_iter, test_iter, epoch, optimizer):
    """Train ``net`` with early stopping and keep the best checkpoint.

    Training stops after ``epoch`` epochs, or earlier once the evaluation
    F-score has not improved for 10 consecutive epochs. Whenever the score
    improves, the whole module is saved to the file ``'best'``.

    Args:
        net: Model whose call on a batch returns per-sentence losses.
        train_iter: Iterable of ``(ids, labels)`` training batches.
        test_iter: Iterable of batches passed to ``evaluate_accuracy``.
        epoch: Maximum number of epochs to run.
        optimizer: Optimizer updating ``net``'s parameters.

    Returns:
        The best F-score observed.
    """
    patience = 10
    net.to(getattr(net, 'device', 'cuda:3'))
    stale_epochs = 0
    best = 0
    epochs_done = 0
    while stale_epochs < patience and epochs_done < epoch:
        epochs_done += 1
        net.train()
        tot_loss = 0
        with tqdm.tqdm(total=len(train_iter), desc='training') as pbar:
            for Data in train_iter:
                _, label = Data
                optimizer.zero_grad()
                # Normalise the summed loss by the number of tokens.
                token_count = sum(len(tags) for tags in label)
                loss = net(Data).sum() / token_count
                loss.backward()
                optimizer.step()
                tot_loss += loss.item()
                pbar.update(1)
                pbar.set_postfix({"loss": loss.item(), 'lr':
                    optimizer.param_groups[0]['lr']})
        print(f"epoch {epochs_done}: loss: {tot_loss / len(train_iter)}")
        # Evaluate once per epoch.
        score = evaluate_accuracy(test_iter, net, epochs_done)
        stale_epochs += 1
        if score['f'] > best:
            torch.save(net, 'best')
            best = score['f']
            stale_epochs = 0
        print(best)
    return best

In [7]:
def evaluate_accuracy(data_iter, net, epoch, f_type='micro'):
    """Compute span-level precision/recall/F for ``net`` on ``data_iter``.

    Args:
        data_iter: Iterable of ``(ids, labels)`` batches.
        net: Model whose call in eval mode returns predicted tag indices.
        epoch: Current epoch number (accepted for call compatibility; not
            used by the computation).
        f_type: Averaging mode handed to ``SpanFPreRecMetric``. Defaults
            to ``'micro'`` because Micro-F1 is the stated ranking metric;
            pass ``'macro'`` for macro averaging.

    Returns:
        The dictionary produced by ``SpanFPreRecMetric.get_metric()``.
    """
    net.eval()
    # Order must match the label indices used to encode the dataset.
    tag_names = ['O', 'B-LOC', 'M-LOC', 'E-LOC',
                 'S-LOC', 'B-PER', 'M-PER', 'E-PER',
                 'S-PER', 'B-GPE', 'M-GPE', 'E-GPE',
                 'S-GPE', 'B-ORG', 'M-ORG', 'E-ORG',
                 'S-ORG']
    vocab = Vocabulary(unknown=None, padding=None)
    vocab.add_word_lst(tag_names)
    metrics = SpanFPreRecMetric(tag_vocab=vocab,
                                encoding_type='bmeso', f_type=f_type)
    with torch.no_grad():
        for batch in tqdm.tqdm(data_iter):
            pred = net(batch)
            gold = batch[1]
            lengths = [len(tags) for tags in gold]
            max_len = max(lengths)
            # -1 only fills the rectangle; the true lengths are passed
            # to the metric separately.
            padded_gold = torch.tensor(
                [tags + [-1] * (max_len - len(tags)) for tags in gold])
            metrics.evaluate(pred, padded_gold, torch.tensor(lengths))
    return metrics.get_metric()

In [None]:
# Build the training loader (shuffled) from the training CSV.
print('--------------------- Process train data -------------------------')
train_iter = torch.utils.data.DataLoader(
    dataset=Nerdataset('ner/train.csv'),
    batch_size=20,
    shuffle=True,
    collate_fn=Nerdataset.collate_fn,
)

# Build the evaluation loader (fixed order) from the validation CSV.
print('--------------------- Process test data -------------------------')
test_iter = torch.utils.data.DataLoader(
    dataset=Nerdataset('ner/A.csv'),
    batch_size=20,
    shuffle=False,
    collate_fn=Nerdataset.collate_fn,
)

print('--------------------- Start training -------------------------')
ner = NER()

# Two parameter groups: a small learning rate for the BERT weights and a
# larger one for every other parameter of the model.
bert_params = [id(p) for p in ner.bert.parameters()]
logits_params = (p for p in ner.parameters() if id(p) not in bert_params)
params = [
    {"params": logits_params, "lr": 1e-3},
    {"params": ner.bert.parameters(), "lr": 1e-5},
]

optimizer = torch.optim.Adam(params)
train(ner, train_iter, test_iter, 30, optimizer)