# Обучение модели Named Entity Recognition

Для обучения модели я решил взять нейросеть ruBERT. Оттуда я позаимствую токенизатор и использую веса для файнтьюнинга при помощи модуля ***BertForTokenClassification*** из библиотеки *transformers*. В качестве датасета я выбрал [NERUS](https://github.com/natasha/nerus). Поверх обучения модели я использовал фреймворк *pytorch-lightning*, а для отслеживания процесса обучения фреймворк *tensorboard*.

In [1]:
import pickle

import numpy as np
from nerus import load_nerus
from sklearn.model_selection import train_test_split

import torch
from torch.utils.data import Dataset, DataLoader
from transformers import BertForTokenClassification, BertTokenizerFast, AdamW, get_linear_schedule_with_warmup

import pytorch_lightning as pl
from pytorch_lightning.callbacks import ModelCheckpoint, EarlyStopping
from pytorch_lightning.loggers import TensorBoardLogger

from seqeval.metrics import classification_report
from torchmetrics import F1Score, Precision, Recall

## Подготовка датасета

Скачаем датасет, рассмотрим пример из него и проведем подготовку.

In [2]:
# !wget https://storage.yandexcloud.net/natasha-nerus/data/nerus_lenta.conllu.gz -P ../datasets/

In [2]:
docs = load_nerus('../datasets/nerus_lenta.conllu.gz')
doc = next(docs)
doc.ner

NERMarkup(
    text='Вице-премьер по социальным вопросам Татьяна Голикова рассказала, в каких регионах России зафиксирована наиболее высокая смертность от рака, сообщает РИА Новости. По словам Голиковой, чаще всего онкологические заболевания становились причиной смерти в Псковской, Тверской, Тульской и Орловской областях, а также в Севастополе. Вице-премьер напомнила, что главные факторы смертности в России — рак и болезни системы кровообращения. В начале года стало известно, что смертность от онкологических заболеваний среди россиян снизилась впервые за три года. По данным Росстата, в 2017 году от рака умерли 289 тысяч человек. Это на 3,5 процента меньше, чем годом ранее.',
    spans=[Span(
         start=36,
         stop=52,
         type='PER'
     ),
     Span(
         start=82,
         stop=88,
         type='LOC'
     ),
     Span(
         start=149,
         stop=160,
         type='ORG'
     ),
     Span(
         start=172,
         stop=181,
         type='PER'
     ),
  

Один экземпляр данных представляет собой текст и разметку в формате Span(start, stop, type):

- *start* - символ начала именованной сущности в исходном тексте
- *stop* - символ конца именованной сущности в исходном тексте
- *type* - тип именованной сущности в исходном тексте *{'B-LOC', 'B-ORG', 'B-PER', 'I-LOC', 'I-ORG', 'I-PER', 'O'}*

Поскольку для обучения необходимо реализовывать метод  \_\_get_item\_\_ в классе датасета, генератор, в виде которого представлен датасет не подойдет, необходимо собрать список с разметкой

In [4]:
# docs = load_nerus('../datasets/nerus_lenta.conllu.gz')
# data = []
# for doc in docs:
#     data += [doc.ner]

# with open('../datasets/NERUSner.pickle', 'wb') as handle:
#         pickle.dump(data, handle, protocol=pickle.HIGHEST_PROTOCOL)

# len(data)

In [3]:
with open('../datasets/NERUSner.pickle', 'rb') as handle:
    data = pickle.load(handle)

len(data)

739346

Размер датасета составляет ~740K текстов. Для обучения возьмем меньшее количество.

Создадим словари для перевода лейблов в айди и наоборот.

In [5]:
unique_tags = {'B-LOC', 'B-ORG', 'B-PER', 'I-LOC', 'I-ORG', 'I-PER', 'O'}
labels_to_ids = {k: v for v, k in enumerate(unique_tags)}
ids_to_labels = {v: k for v, k in enumerate(unique_tags)}

Реализуем класс pytorch датасета

In [6]:
class NERUSdataset(Dataset):
    def __init__(self, data: list, tokenizer: BertTokenizerFast, max_len: int = 512):
        self.len = len(data)
        self.data = data
        self.tokenizer = tokenizer
        self.max_len = max_len

    def annotate_entities(self, spans, offset_mapping):
        '''
        Сопоставлет аннотации с токенами, полученными из BERT токенайзера и offset_mapping
                Параметры:
                        spans list(Span): разметка из датасета в формате Span(start, stop, type)
                        offset_mapping list(tuple): карта сдвигов о соответствии символов в исходном тексте и токенов в токенизированном представлении текста
                Возвращаемое значение:
                        annotations list(string): разметка токенов в формате {'B-LOC', 'B-ORG', 'B-PER', 'I-LOC', 'I-ORG', 'I-PER', 'O'}
        '''
        annotations = ["O"] * len(offset_mapping)
        
        for span in spans:
            start, stop, entity_type = span.start, span.stop, span.type
            start_token = None
            end_token = None
            for i, (token_start, token_end) in enumerate(offset_mapping):
                if token_start is not None and start >= token_start and start < token_end:
                    start_token = i
                if token_start is not None and stop > token_start and stop <= token_end:
                    end_token = i + 1
                    
            if start_token is not None and end_token is not None:
                annotations[start_token] = "B-" + entity_type
                for i in range(start_token + 1, end_token):
                    annotations[i] = "I-" + entity_type
    
        return annotations
    
    def __getitem__(self, index):
        sentence = self.data[index].text
        spans = self.data[index].spans

        encoding = self.tokenizer(sentence,
                                return_offsets_mapping=True,
                                padding='max_length',
                                truncation=True,
                                max_length=self.max_len)
        
        annotations = self.annotate_entities(spans, encoding["offset_mapping"])
        labels = [labels_to_ids[label] for label in annotations]

        item = {key: torch.as_tensor(val) for key, val in encoding.items()}
        item['labels'] = torch.as_tensor(labels)
        return item
    
    def __len__(self):
        return self.len

Обернем датасет в модуль *LightningDataModule*. Опишем в нем датасеты и Dataloader'ы для всех этапов

In [7]:
class NERUSDataModule(pl.LightningDataModule):
    def __init__(self, train_data: list, val_data: list, test_data: list, tokenizer: BertTokenizerFast, batch_size: int = 32, max_token_len: int = 512):
        super().__init__()
        self.batch_size = batch_size
        self.train_data = train_data
        self.val_data = val_data
        self.test_data = test_data
        self.tokenizer = tokenizer
        self.max_token_len = max_token_len
        
    def setup(self, stage=None):
        self.train_dataset = NERUSdataset(
            self.train_data,
            self.tokenizer,
            self.max_token_len
        )
        self.val_dataset = NERUSdataset(
            self.val_data,
            self.tokenizer,
            self.max_token_len
        )
        self.test_dataset = NERUSdataset(
            self.test_data,
            self.tokenizer,
            self.max_token_len
        )
        
    def train_dataloader(self):
        return DataLoader(
            self.train_dataset,
            batch_size=self.batch_size,
            shuffle=True,
            num_workers=2
        )
        
    def val_dataloader(self):
        return DataLoader(
            self.val_dataset,
            batch_size=self.batch_size,
            num_workers=2
        )
        
    def test_dataloader(self):
        return DataLoader(
            self.test_dataset,
            batch_size=self.batch_size,
            num_workers=2
        )

Разделим данные на обучающую, валидационную и тестовую выборки. Для обучения возьмем не 740К текстов, а 40К.

In [8]:
train_val_data, test_data = train_test_split(data[:40000], test_size=0.1)
train_data, val_data = train_test_split(train_val_data, test_size=0.2)
len(train_data), len(val_data), len(test_data)

(28800, 7200, 4000)

In [9]:
BATCH_SIZE = 32
MAX_LEN = 512
BERT_MODEL_NAME = 'ai-forever/ruBert-base'
tokenizer = BertTokenizerFast.from_pretrained(BERT_MODEL_NAME)
data_module = NERUSDataModule(
    train_data,
    val_data,
    test_data,
    tokenizer,
    batch_size=BATCH_SIZE,
    max_token_len=512
)

## Подготовка модели

Напишем модуль *LightningModule* для реализации модели. Внутри него инициализируем ***BertForTokenClassification*** с весами от *ruBERT*, указав новое количество классов в классификаторе. Реализуем все этапы обучения модели, подсчет метрик, прямой прогон через BERT и конфигурацию оптимизатора с шедулером.

В качестве метрики будем использовать F1-score с micro и macro усреднением. В наших данных будет наблюдатсья сильный дизбаланс классов, т.к. сущностей с меткой 'O' будет больше всего. Micro F1-score рассчитывается на уровне всего набора данных, объединяя предсказания и истинные метки для каждой сущности. Это подразумевает, что все предсказанные и истинные метки рассматриваются как один большой класс. Macro F1-score рассчитывается, усредняя F1-score для каждой индивидуальной сущности. Так мы можем равномерно оценить качество извлечения для всех сущностей, даже если некоторые из них представлены в данных редко.

In [22]:
class NERTagger(pl.LightningModule):
    def __init__(self, n_training_steps: int, n_warmup_steps: int, learning_rate: float):
        super().__init__()
        self.save_hyperparameters()
        self.bert = BertForTokenClassification.from_pretrained(BERT_MODEL_NAME, num_labels=len(labels_to_ids))
        self.n_training_steps = n_training_steps
        self.n_warmup_steps = n_warmup_steps
        self.learning_rate = learning_rate

        self.train_f1_macro = F1Score(task="multiclass", num_classes=len(labels_to_ids), average='macro')
        self.train_f1_micro = F1Score(task="multiclass", num_classes=len(labels_to_ids), average='micro')
        self.val_f1_macro = F1Score(task="multiclass", num_classes=len(labels_to_ids), average='macro')
        self.val_f1_micro = F1Score(task="multiclass", num_classes=len(labels_to_ids), average='micro')
        self.test_f1_macro = F1Score(task="multiclass", num_classes=len(labels_to_ids), average='macro')
        self.test_f1_micro = F1Score(task="multiclass", num_classes=len(labels_to_ids), average='micro')

    def forward(self, input_ids, attention_mask=None, labels=None):
        output = self.bert(input_ids, attention_mask=attention_mask, labels=labels)
        logits = output.logits
        loss = output.loss
        return logits, loss

    def training_step(self, batch, batch_idx):
        input_ids = batch["input_ids"]
        attention_mask = batch["attention_mask"]
        labels = batch["labels"]
        logits, loss = self(input_ids, attention_mask, labels)

        targets = labels.view(-1)
        logits = logits.view(-1, len(unique_tags))
        preds = torch.argmax(logits, axis=1)
        
        self.train_f1_macro(preds, targets)
        self.train_f1_micro(preds, targets)
        
        self.log('train_f1_macro', self.train_f1_macro, logger=True, on_step=True, on_epoch=False)
        self.log('train_f1_micro', self.train_f1_micro, logger=True, on_step=True, on_epoch=False)
        self.log("train_loss", loss, prog_bar=True, logger=True, sync_dist=True)
        return loss
        

    def validation_step(self, batch, batch_idx):
        input_ids = batch["input_ids"]
        attention_mask = batch["attention_mask"]
        labels = batch["labels"]
        logits, loss = self(input_ids, attention_mask, labels)

        targets = labels.view(-1)
        logits = logits.view(-1, len(unique_tags))
        preds = torch.argmax(logits, axis=1)

        self.val_f1_macro(preds, targets)
        self.val_f1_micro(preds, targets)
        
        self.log('val_f1_macro', self.val_f1_micro, logger=True, on_step=True, on_epoch=False)
        self.log('val_f1_micro', self.val_f1_micro, logger=True, on_step=True, on_epoch=False)
        self.log("val_loss", loss, prog_bar=True, logger=True, sync_dist=True)
        return loss
        
    def test_step(self, batch, batch_idx):
        input_ids = batch["input_ids"]
        attention_mask = batch["attention_mask"]
        labels = batch["labels"]
        logits, loss = self(input_ids, attention_mask, labels)

        targets = labels.view(-1)
        logits = logits.view(-1, len(unique_tags))
        preds = torch.argmax(logits, axis=1)

        self.test_f1_macro(preds, targets)
        self.test_f1_micro(preds, targets)
        
        self.log('test_f1_macro', self.test_f1_macro)
        self.log('test_f1_micro', self.test_f1_micro)
        self.log("test_loss", loss, prog_bar=True, logger=True, sync_dist=True)
        return loss

    def configure_optimizers(self):
        optimizer = AdamW(self.parameters(), lr=self.learning_rate)
        scheduler = get_linear_schedule_with_warmup(
            optimizer,
            num_warmup_steps=self.n_warmup_steps,
            num_training_steps=self.n_training_steps
        )
        return dict(
            optimizer=optimizer,
            lr_scheduler=dict(
                scheduler=scheduler,
                interval='step'
            )
        )

In [11]:
N_EPOCHS = 10
steps_per_epoch=len(train_data) // BATCH_SIZE
total_training_steps = steps_per_epoch * N_EPOCHS
warmup_steps = total_training_steps // 5
warmup_steps, total_training_steps

(1800, 9000)

Пропишем сохранение чекпоинтов и логгирование в TensorBoard

In [12]:
checkpoint_callback = ModelCheckpoint(
    dirpath="../model/checkpoints",
    filename="NERTagger-{epoch}-{step}-{val_loss:.4f}",
    auto_insert_metric_name=True,
    every_n_train_steps=1000,
    verbose=True,
    save_top_k=-1
)

In [13]:
logger = TensorBoardLogger("../model/lightning_logs", name="NER-tagger")
early_stopping_callback = EarlyStopping(monitor='val_loss', patience=5)

## Обучение модели

Запустим обучение и протестируем обученную модель на тестовой выборке и своем предложении

In [24]:
trainer = pl.Trainer(
    logger=logger,
    callbacks=[early_stopping_callback, checkpoint_callback],
    max_epochs=N_EPOCHS,
    accelerator="gpu",
    devices = [0, 1],
    log_every_n_steps=1
)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


In [25]:
torch.set_float32_matmul_precision('high')
model = NERTagger(
    learning_rate=3e-4,
    n_warmup_steps=warmup_steps,
    n_training_steps=total_training_steps
)
trainer.fit(model, data_module)

Some weights of BertForTokenClassification were not initialized from the model checkpoint at ai-forever/ruBert-base and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Initializing distributed: GLOBAL_RANK: 0, MEMBER: 1/2
Initializing distributed: GLOBAL_RANK: 1, MEMBER: 2/2
----------------------------------------------------------------------------------------------------
distributed_backend=nccl
All distributed processes registered. Starting with 2 processes
----------------------------------------------------------------------------------------------------

/opt/conda/lib/python3.9/site-packages/pytorch_lightning/callbacks/model_checkpoint.py:639: Checkpoint directory /home/jovyan/DS-Cloud-test/model/checkpoints exists and is not empty.
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0,1]
LOCAL_RANK: 1 - CUDA_VISIBLE_DEVICES: [0,1]

  | Name           | Type        

Sanity Checking: |          | 0/? [00:00<?, ?it/s]

Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

In [26]:
trainer.test(model, data_module)

Initializing distributed: GLOBAL_RANK: 0, MEMBER: 1/2
Initializing distributed: GLOBAL_RANK: 1, MEMBER: 2/2
----------------------------------------------------------------------------------------------------
distributed_backend=nccl
All distributed processes registered. Starting with 2 processes
----------------------------------------------------------------------------------------------------

LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0,1]
LOCAL_RANK: 1 - CUDA_VISIBLE_DEVICES: [0,1]
/opt/conda/lib/python3.9/site-packages/pytorch_lightning/trainer/connectors/data_connector.py:232: Using `DistributedSampler` with the dataloaders. During `trainer.test()`, it is recommended to use `Trainer(devices=1, num_nodes=1)` to ensure each sample/batch gets evaluated exactly once. Otherwise, multi-device settings use `DistributedSampler` that replicates some samples to make sure all devices have same batch size in case of uneven inputs.


Testing: |          | 0/? [00:00<?, ?it/s]

────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
       Test metric             DataLoader 0
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
      test_f1_macro         0.9687628149986267
      test_f1_micro         0.9949067234992981
        test_loss          0.020064521580934525
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────


[{'test_f1_macro': 0.9687628149986267,
  'test_f1_micro': 0.9949067234992981,
  'test_loss': 0.020064521580934525}]

In [29]:
trained_model = NERTagger.load_from_checkpoint(trainer.checkpoint_callback.best_model_path)
trained_model.eval()
trained_model.freeze()

Some weights of BertForTokenClassification were not initialized from the model checkpoint at ai-forever/ruBert-base and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Теперь подадим модели на вход случайное предложение и посмотрим результат.

In [30]:
def beauty_visual(preds, offset_mapping):
    prediction = ''
    for token_pred, mapping in zip(preds, offset_mapping.squeeze().tolist()):
        if token_pred[0][0] =='[':
            continue
        pred = ''
        if token_pred[1][0] == 'B':
            prediction += ' '
            pred = token_pred[1][2:]
        elif token_pred[1][0] == 'I' and token_pred[0][:2] != '##':
            pred = '-' * len(token_pred[0]) + '-'
        elif token_pred[0][:2] == '##' or token_pred[1][0] == 'I':
            pred = '-' * len(token_pred[0][3:])
        else:
            prediction += ' '
            pred = token_pred[1]
        pred += (mapping[1]-mapping[0] - len(pred)) * '-'
        prediction += pred
    return prediction[1:]

In [31]:
sentence = "Глава компании Apple Тим Кук осенью представил новый телефон в штаб квартире Apple Inc в Купертино"

inputs = tokenizer(sentence,
                    return_offsets_mapping=True,
                    padding='max_length',
                    truncation=True,
                    max_length=512,
                    return_tensors="pt")

ids = inputs["input_ids"].to('cuda:0')
mask = inputs["attention_mask"].to('cuda:0')

outputs = trained_model(ids, attention_mask=mask)
logits = outputs[0]
logits = logits.view(-1, len(unique_tags))
preds = torch.argmax(logits, axis=1)

tokens = tokenizer.convert_ids_to_tokens(ids.view(-1))
token_predictions = [ids_to_labels[i] for i in preds.cpu().numpy()]
wp_preds = list(zip(tokens, token_predictions)) 

print(sentence)
print(beauty_visual(wp_preds, inputs["offset_mapping"]))

Глава компании Apple Тим Кук осенью представил новый телефон в штаб квартире Apple Inc в Купертино
O---- O------- ORG-- PER---- O----- O--------- O---- O------ O O--- O------- ORG------ O LOC------
