In [None]:
# Install the Hugging Face libraries imported below; stdout is redirected to /dev/null to keep the cell output short.
!pip install datasets transformers > /dev/null

In [None]:
import os
import sys
import functools
from typing import List, Tuple, Mapping


from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset, DataLoader

import datasets
from transformers import AutoTokenizer, AutoModelForTokenClassification

In [None]:
# Load the "benjamin/ner-uk" dataset through the Hugging Face `datasets` library.
dataset = datasets.load_dataset("benjamin/ner-uk")

# Display the available splits together with their columns and row counts.
dataset

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


DatasetDict({
    train: Dataset({
        features: ['tokens', 'ner_tags'],
        num_rows: 10833
    })
    validation: Dataset({
        features: ['tokens', 'ner_tags'],
        num_rows: 1307
    })
    test: Dataset({
        features: ['tokens', 'ner_tags'],
        num_rows: 668
    })
})

In [None]:
# Gather every NER tag id that occurs in any split and keep them as a sorted list.
targets = sorted(
    {
        tag
        for split in ("train", "validation", "test")
        for sample in dataset[split]
        for tag in sample["ner_tags"]
    }
)
print("Unique targets:", len(targets))
targets

Unique targets: 9


[0, 1, 2, 3, 4, 5, 6, 7, 8]

In [None]:
# TASK: Using the Hugging Face models, find the best model.
#       You could try multilingual models or use another UKR model.
#       HF models - https://huggingface.co/models
#       Examples: `nikitast/lang-segmentation-roberta`, `wietsedv/xlm-roberta-base-ft-udpos28-uk`, `google-bert/bert-base-multilingual-cased` etc.
model_id = 'ukr-models/uk-ner'

# Load the tokenizer that belongs to the selected checkpoint.
tokenizer = AutoTokenizer.from_pretrained(model_id)



In [None]:
# Inspect the loaded tokenizer (class, vocabulary size, special tokens).
tokenizer

XLMRobertaTokenizerFast(name_or_path='ukr-models/uk-ner', vocab_size=31274, model_max_length=1000000000000000019884624838656, is_fast=True, padding_side='right', truncation_side='right', special_tokens={'bos_token': '<s>', 'eos_token': '</s>', 'unk_token': '<unk>', 'sep_token': '</s>', 'pad_token': '<pad>', 'cls_token': '<s>', 'mask_token': '<mask>'}, clean_up_tokenization_spaces=True),  added_tokens_decoder={
	0: AddedToken("<s>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	1: AddedToken("<pad>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	2: AddedToken("</s>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	3: AddedToken("<unk>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	31273: AddedToken("<mask>", rstrip=False, lstrip=True, single_word=False, normalized=True, special=True),
}

In [None]:
# Tokenize one training sentence to see how its words are split into sub-word tokens.
sample = dataset["train"][20]
# `is_split_into_words=True` tells the tokenizer that the input is already a list of words.
tmp = tokenizer(sample["tokens"], truncation=True, is_split_into_words=True)

print(">>", sample["tokens"])  # the original words
print(">>", tmp["input_ids"])  # token ids, including the special tokens at both ends
print(">>", sample["ner_tags"])  # one tag id per word
print(">>", [tokenizer._tokenizer.id_to_token(tok) for tok in tmp["input_ids"]])  # token strings
print(">>", tmp.word_ids())  # index of the source word for every token (None for special tokens)

Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.


>> ['Іноземці', ',', 'хоч', 'трохи', 'знайомі', 'з', 'Україною', ',', 'були', 'шоковані', 'рівнем', 'допомоги', 'Збройним', 'Силам', 'з', 'боку', 'суспільства', '.']
>> [0, 1537, 380, 6584, 1683, 6, 4, 22917, 21568, 24013, 260, 210, 27760, 6, 4, 6027, 21100, 11257, 30290, 14380, 1262, 28580, 1690, 13439, 1132, 210, 10189, 19959, 6, 5, 2]
>> [0, 0, 0, 0, 0, 0, 5, 0, 0, 0, 0, 0, 3, 4, 0, 0, 0, 0]
>> ['<s>', '▁І', 'но', 'зем', 'ці', '▁', ',', '▁хоч', '▁трохи', '▁знайом', 'і', '▁з', '▁Україною', '▁', ',', '▁були', '▁шок', 'овані', '▁рівнем', '▁допомоги', '▁З', 'брой', 'ним', '▁Сил', 'ам', '▁з', '▁боку', '▁суспільства', '▁', '.', '</s>']
>> [None, 0, 0, 0, 0, 1, 1, 2, 3, 4, 4, 5, 6, 7, 7, 8, 9, 9, 10, 11, 12, 12, 12, 13, 13, 14, 15, 16, 17, 17, None]


## Datasets & DataLoaders

In [None]:
def tokenize_and_align(
    sample: Mapping[str, list],
    max_length: int = 512,
) -> Tuple[List[int], List[int], List[int], List[int]]:
    """Tokenize a pre-split sentence and align its word-level NER tags to tokens.

    Uses the notebook-level `tokenizer`.

    Args:
        sample: mapping with the keys `"tokens"` (list of words) and
            `"ner_tags"` (one tag id per word).
        max_length: maximum number of tokens (special tokens included) kept
            for the sentence; longer inputs are truncated. The tokenizer used
            here reports no usable maximum length of its own, so without an
            explicit value `truncation=True` does nothing. The default of 512
            is assumed to be the limit of the checkpoint used in this notebook
            (its position-embedding table has 514 rows) - adjust it when
            switching models. `None` falls back to the tokenizer's default.

    Returns:
        Tuple `(input_ids, word_ids, attention_mask, label_ids)`. `word_ids`
        holds, per token, the index of the word it came from (`None` for
        special tokens). `label_ids` holds the word's tag on the first token
        of each word and -100 everywhere else.
    """
    words = sample["tokens"]
    ner_tags = sample["ner_tags"]

    tokenized_input = tokenizer(
        words,
        truncation=True,
        max_length=max_length,
        is_split_into_words=True,
    )
    word_ids = tokenized_input.word_ids()

    # NOTE: The modern approach to NER classification when the annotations are given per word
    #       is to split the words into tokens, put the NER label only on the first token of
    #       each word and ignore the remaining tokens. For example, given:
    #       Words:
    #         ['Вони', 'абсолютно', 'відповідають', 'Глобальному', 'договору', 'та', 'Цілям', 'сталого', 'розвитку', 'ООН', '.']
    #       NER labels:
    #         [     0,           0,              0,             7,          8,    8,       8,         8,          8,     8,   0]
    #       After tokenizing the words you get token ids like these (special tokens omitted):
    #         [13825, 10241, 30086, 11358, 3151, 23012, 105, 15168, 489, 7414, 19406, 7275, 695, 5743, 16644, 6, 5]
    #       And a word id for each of these token ids:
    #         [0,     1,     2,     3,     3,    3,     3,   4,     5,   6,    6,     7,    7,   8,    9,    10, 10]
    #       Word 3 consists of the tokens [11358, 3151, 23012, 105], and so on.
    #       So the "modern" approach to token alignment produces:
    #         [0,     0,     0,     7,  -100, -100,  -100,   8,     8,   8, -100,     8, -100,   8,    8,     0, -100]

    label_ids = []
    prev_word_index = None
    for word_index in word_ids:
        if word_index is None or word_index == prev_word_index:
            # Special tokens (word id None) and the non-first tokens of a word get -100,
            # so the loss function ignores them. To label every token of a word instead,
            # append `ner_tags[word_index]` for the non-first tokens.
            label_ids.append(-100)
        else:
            # First token of a new word carries that word's tag.
            label_ids.append(ner_tags[word_index])
        prev_word_index = word_index

    return tokenized_input["input_ids"], word_ids, tokenized_input["attention_mask"], label_ids


def dataset_mapping_fn(sample: Mapping[str, List[int]]) -> Mapping[str, List[int]]:
    """Add the tokenized and label-aligned fields to a dataset row.

    The row is modified in place and also returned.

    Args:
        sample: dataset row; passed unchanged to `tokenize_and_align`.

    Returns:
        The same row with the extra keys `"input_ids"`, `"word_numbers"`,
        `"attention_mask"` and `"label_ids"`.
    """
    input_ids, word_numbers, attention_mask, label_ids = tokenize_and_align(sample)
    sample["input_ids"] = input_ids
    sample["word_numbers"] = word_numbers
    sample["attention_mask"] = attention_mask
    sample["label_ids"] = label_ids
    return sample

In [None]:
# Apply `dataset_mapping_fn` to every row of every split; this adds the columns
# `input_ids`, `word_numbers`, `attention_mask` and `label_ids`.
# NOTE: `dataset` is rebound to the mapped version, so the raw dataset is no longer
#       reachable under this name afterwards.
dataset = dataset.map(dataset_mapping_fn)
dataset

DatasetDict({
    train: Dataset({
        features: ['tokens', 'ner_tags', 'input_ids', 'word_numbers', 'attention_mask', 'label_ids'],
        num_rows: 10833
    })
    validation: Dataset({
        features: ['tokens', 'ner_tags', 'input_ids', 'word_numbers', 'attention_mask', 'label_ids'],
        num_rows: 1307
    })
    test: Dataset({
        features: ['tokens', 'ner_tags', 'input_ids', 'word_numbers', 'attention_mask', 'label_ids'],
        num_rows: 668
    })
})

In [None]:
class NERDataset(Dataset):
    """Torch dataset that serves tokenized NER rows as tensors.

    Each row of the wrapped dataset must provide the keys `"input_ids"`,
    `"attention_mask"` and `"label_ids"`.
    """

    def __init__(self, dataset: datasets.Dataset) -> None:
        """Store the wrapped dataset.

        Args:
            dataset: dataset split whose rows are read lazily in `__getitem__`.
        """
        self.dataset = dataset

    def __len__(self) -> int:
        """Return the number of rows in the wrapped dataset."""
        return len(self.dataset)

    def __getitem__(self, idx: int) -> Tuple[Tuple[torch.Tensor, torch.Tensor], torch.Tensor]:
        """Return one row as `((input_ids, attention_mask), label_ids)`.

        Args:
            idx: row index.

        Returns:
            A pair whose first element is the tuple of model inputs and whose
            second element is the label tensor; all three are 64-bit integer
            tensors built from the row's lists.
        """
        sample = self.dataset[idx]
        # `torch.as_tensor(..., dtype=torch.long)` replaces the legacy
        # `torch.LongTensor(...)` constructor.
        input_ids = torch.as_tensor(sample["input_ids"], dtype=torch.long)
        attention_mask = torch.as_tensor(sample["attention_mask"], dtype=torch.long)
        label_ids = torch.as_tensor(sample["label_ids"], dtype=torch.long)
        return (input_ids, attention_mask), label_ids

In [None]:
def collator(
    batch: List[Tuple[List[int], List[int], List[int]]],
    pad_token: int,
) -> Tuple[Mapping[str, torch.LongTensor], torch.LongTensor]:
    """Pad a list of samples to a common length and stack them into a batch.

    Args:
        batch: samples shaped like `((input_ids, attention_mask), label_ids)`.
        pad_token: id used to pad `input_ids`.

    Returns:
        A dict with the padded `"input_ids"` and `"attention_mask"` tensors
        (the mask is padded with 0) and the label tensor padded with -100.
    """
    ids_seqs, mask_seqs, label_seqs = [], [], []
    for features, labels in batch:
        ids_seqs.append(features[0])
        mask_seqs.append(features[1])
        label_seqs.append(labels)

    model_inputs = {
        "input_ids": pad_sequence(ids_seqs, batch_first=True, padding_value=pad_token),
        "attention_mask": pad_sequence(mask_seqs, batch_first=True, padding_value=0),
    }
    # -100 marks padded label positions, the same value used for ignored tokens.
    padded_labels = pad_sequence(label_seqs, batch_first=True, padding_value=-100)
    return model_inputs, padded_labels

In [None]:
batch_size = 8
# `os.cpu_count()` may return None when the CPU count cannot be determined;
# fall back to 0 (load data in the main process) because DataLoader needs an int.
n_workers = os.cpu_count() or 0
dataset_collator = functools.partial(collator, pad_token=tokenizer.pad_token_id)


def make_loader(split: str, training: bool = False) -> DataLoader:
    """Build the DataLoader for one dataset split.

    Args:
        split: name of the split in `dataset` ("train", "validation" or "test").
        training: when True the loader shuffles the samples and drops the last
            incomplete batch; otherwise order is kept and every sample is used.

    Returns:
        DataLoader over `NERDataset(dataset[split])` that uses the notebook-level
        `batch_size`, `n_workers` and `dataset_collator`.
    """
    return DataLoader(
        NERDataset(dataset[split]),
        batch_size=batch_size,
        num_workers=n_workers,
        collate_fn=dataset_collator,
        shuffle=training,
        drop_last=training,
    )


train_loader = make_loader("train", training=True)
print("Train\n dataset size: {}\n  num batches: {}".format(len(train_loader.dataset), len(train_loader)))
print()
valid_loader = make_loader("validation")
print("Validation\n dataset size: {}\n  num batches: {}".format(len(valid_loader.dataset), len(valid_loader)))
print()
test_loader = make_loader("test")
print("Test\n dataset size: {}\n  num batches: {}".format(len(test_loader.dataset), len(test_loader)))

Train
 dataset size: 10833
  num batches: 1354

Validation
 dataset size: 1307
  num batches: 164

Test
 dataset size: 668
  num batches: 84


## Training & Evaluation

In [None]:
def sequence_f1(true_labels: np.ndarray, predicted_labels: np.ndarray) -> np.float64:
    """Macro-averaged F1 score for one sequence.

    The average runs over the classes that occur in `true_labels`; classes
    that only appear in the predictions count as false positives of the
    predicted class but do not get a score of their own.

    Args:
        true_labels: ground truth labels (array or any sequence of labels).
        predicted_labels: model predictions, same length as `true_labels`.

    Returns:
        Mean of the per-class F1 scores, or 0.0 when the input is empty.

    Raises:
        AssertionError: if the two inputs differ in length.
    """
    # Plain Python lists compare as a whole (`[0, 1] == 1` is just False), so
    # convert first to get element-wise comparisons.
    true_labels = np.asarray(true_labels)
    predicted_labels = np.asarray(predicted_labels)
    assert len(true_labels) == len(predicted_labels), "Mismatched length between true labels and predicted labels"

    if true_labels.size == 0:
        # No class to average over; avoid the NaN that a mean of nothing produces.
        return np.float64(0.0)

    scores = []
    for _cls in np.unique(true_labels):
        is_true = true_labels == _cls
        is_pred = predicted_labels == _cls

        true_positives = np.sum(is_true & is_pred)
        false_positives = np.sum(~is_true & is_pred)
        false_negatives = np.sum(is_true & ~is_pred)

        # F1 = 2TP / (2TP + FP + FN) equals the harmonic mean of precision and
        # recall and needs no 0/0 division when the class is never predicted.
        denominator = 2 * true_positives + false_positives + false_negatives
        scores.append(2 * true_positives / denominator if denominator else 0.0)

    return np.mean(np.array(scores))

def sequence_f1_2(y_true, y_pred, average='macro', labels=None):
    """
    Calculate F1 scores for multiclass classification.

    Args:
        y_true: Ground truth labels (array or sequence).
        y_pred: Predicted labels, aligned with `y_true`.
        average: How to average scores. Options: 'macro', 'micro', 'weighted'.
        labels: Classes to score. When None, the notebook-level `targets`
            list is used.

    Returns:
        For 'macro': array with one F1 score per class, in the order of the
        classes; callers take `.mean()` of it to get the macro average.
        NOTE(review): this return shape is kept as it was - confirm whether
        a scalar is wanted instead.
        For 'micro': F1 computed from the summed counts of all classes.
        For 'weighted': per-class F1 weighted by each class's share of
        `y_true` (0.0 when `y_true` is empty).

    Raises:
        ValueError: if `average` is not one of the supported options.
    """
    if average not in ('macro', 'micro', 'weighted'):
        raise ValueError("Invalid value for 'average'. Choose from 'macro', 'micro', 'weighted'.")

    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    classes = targets if labels is None else labels

    def precision_recall_f1(tp, fp, fn):
        # 0/0 occurs for classes that are absent; those scores are defined as 0,
        # so silence the floating-point warnings and map NaN to 0.
        with np.errstate(divide="ignore", invalid="ignore"):
            precision = np.nan_to_num(tp / (tp + fp), nan=0.0)
            recall = np.nan_to_num(tp / (tp + fn), nan=0.0)
            f1 = np.nan_to_num(2 * (precision * recall) / (precision + recall), nan=0.0)
        return precision, recall, f1

    # TP, FP, FN for each class
    tp = np.array([np.sum((y_true == _cls) & (y_pred == _cls)) for _cls in classes], dtype=float)
    fp = np.array([np.sum((y_true != _cls) & (y_pred == _cls)) for _cls in classes], dtype=float)
    fn = np.array([np.sum((y_true == _cls) & (y_pred != _cls)) for _cls in classes], dtype=float)

    # Precision, recall, and F1 per class
    _, _, f1_scores = precision_recall_f1(tp, fp, fn)

    if average == 'macro':
        return f1_scores
    if average == 'micro':
        _, _, f1_micro = precision_recall_f1(np.sum(tp), np.sum(fp), np.sum(fn))
        return f1_micro

    # 'weighted': tp + fn is the number of true samples of each class, which stays
    # aligned with `classes` even when some classes never occur in `y_true`.
    n_samples = len(y_true)
    if n_samples == 0:
        return np.float64(0.0)
    weights = (tp + fn) / n_samples
    return np.sum(f1_scores * weights)

In [None]:
def train_one_epoch(
    model: nn.Module,
    loader: DataLoader,
    criterion: nn.Module,
    optimizer: optim.Optimizer,
    device: str = "cpu",
    verbose: bool = True,
) -> Mapping[str, np.array]:
    """Run one training pass over `loader`.

    Args:
        model: model to train; its output must expose `.logits`.
        loader: dataloader yielding `(inputs_dict, labels)` batches.
        criterion: loss function to optimize.
        optimizer: optimizer that updates the model parameters.
        device: device the batches are moved to.
            Default is `"cpu"`.
        verbose: whether to show the training progress bar.
            Default is `True`.

    Returns:
        dict with the per-batch losses (`"losses"`) and the flattened ground
        truth / predicted labels of all positions whose label is not -100
        (`"true"` / `"preds"`).
    """
    model.train()

    batch_losses = []
    gold_labels = []
    predicted_labels = []

    progress = tqdm(total=len(loader), desc="training", file=sys.stdout, ncols=100, disable=not verbose)
    with progress:
        for inputs, labels in loader:
            inputs = {name: tensor.to(device) for name, tensor in inputs.items()}
            labels = labels.to(device)

            optimizer.zero_grad()

            logits = model(**inputs).logits

            # Flatten (batch, tokens) so the criterion sees one row per token.
            n_rows, n_tokens = labels.shape
            loss = criterion(logits.view(n_rows * n_tokens, -1), labels.view(n_rows * n_tokens))

            loss.backward()
            batch_losses.append(loss.item())

            predictions = logits.argmax(2).detach().cpu().numpy()
            labels_np = labels.detach().cpu().numpy()
            # Keep only the positions that carry a real label; boolean indexing
            # walks the batch row by row, so the order matches a per-row loop.
            keep = labels_np != -100
            gold_labels.extend(labels_np[keep])
            predicted_labels.extend(predictions[keep])

            progress.set_postfix_str(f"loss {batch_losses[-1]:.4f}")

            optimizer.step()

            progress.update(1)

    return {
        "losses": np.array(batch_losses),
        "true": np.array(gold_labels),
        'preds': np.array(predicted_labels),
    }

In [None]:
@torch.inference_mode()
def evaluate(
    model: nn.Module,
    loader: DataLoader,
    criterion: nn.Module,
    device: str = "cpu",
    verbose: bool = True,
) -> Mapping[str, np.array]:
    """Evaluate the model on `loader` without updating it.

    Args:
        model: model to evaluate; its output must expose `.logits`.
        loader: dataloader yielding `(inputs_dict, labels)` batches.
        criterion: loss function.
        device: device the batches are moved to.
            Default is `"cpu"`.
        verbose: whether to show the evaluation progress bar.
            Default is `True`.

    Returns:
        dict with the per-batch losses (`"losses"`) and the flattened ground
        truth / predicted labels of all positions whose label is not -100
        (`"true"` / `"preds"`).
    """
    model.eval()

    batch_losses = []
    gold_labels = []
    predicted_labels = []

    batches = tqdm(loader, desc="evaluation", file=sys.stdout, ncols=100, disable=not verbose)
    for inputs, labels in batches:
        inputs = {name: tensor.to(device) for name, tensor in inputs.items()}
        labels = labels.to(device)

        logits = model(**inputs).logits

        # Flatten (batch, tokens) so the criterion sees one row per token.
        n_rows, n_tokens = labels.shape
        loss = criterion(logits.view(n_rows * n_tokens, -1), labels.view(n_rows * n_tokens))
        batch_losses.append(loss.item())

        predictions = logits.argmax(2).detach().cpu().numpy()
        labels_np = labels.detach().cpu().numpy()
        # Keep only the positions that carry a real label; boolean indexing
        # walks the batch row by row, so the order matches a per-row loop.
        keep = labels_np != -100
        gold_labels.extend(labels_np[keep])
        predicted_labels.extend(predictions[keep])

    return {
        "losses": np.array(batch_losses),
        "true": np.array(gold_labels),
        'preds': np.array(predicted_labels),
    }


## Training

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Device - {device}")

Device - cuda


In [None]:
# Load the pretrained token-classification checkpoint selected by `model_id`.
model = AutoModelForTokenClassification.from_pretrained(model_id)
# Seed right before creating the new head so its random initialisation is reproducible.
torch.manual_seed(42)
# Replace the classification head with a fresh linear layer that has one output per tag in `targets`.
model.classifier = nn.Linear(model.classifier.in_features, len(targets))
model = model.to(device)
print(model)
print("Number of trainable parameters - {:,}".format(sum(p.numel() for p in model.parameters() if p.requires_grad)))

# Positions labelled -100 (special tokens, non-first sub-word tokens, padding) do not contribute to the loss.
criterion = nn.CrossEntropyLoss(ignore_index=-100)
# NOTE: You can change the learning rate to find a better model.
#       Please be careful - transformer models are sensitive to the learning rate;
#       if you pick a learning rate that is too high, your model will not converge.
optimizer = optim.Adam(model.parameters(), lr=2e-5)

XLMRobertaForTokenClassification(
  (roberta): XLMRobertaModel(
    (embeddings): XLMRobertaEmbeddings(
      (word_embeddings): Embedding(31274, 768, padding_idx=1)
      (position_embeddings): Embedding(514, 768, padding_idx=1)
      (token_type_embeddings): Embedding(1, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): XLMRobertaEncoder(
      (layer): ModuleList(
        (0-11): 12 x XLMRobertaLayer(
          (attention): XLMRobertaAttention(
            (self): XLMRobertaSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): XLMRobertaSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bia

In [None]:
from sklearn.metrics import classification_report
from sklearn.metrics import f1_score

In [None]:
# NOTE: you can change the number of epochs to train a better model
n_epochs = 3

# Per-epoch history: mean batch loss and macro F1 for the training and validation sets.
train_losses = []
train_scores = []

valid_losses = []
valid_scores = []

#best_score = float("-inf")

for ep in range(n_epochs):
    print(f"\nEpoch {ep + 1:2d}/{n_epochs:2d}")

    # Training pass: updates the model and collects losses and labels/predictions.
    train_logs = train_one_epoch(model, train_loader, criterion, optimizer, device, verbose=True)
    train_losses.append(np.mean(train_logs["losses"]))
    train_scores.append(f1_score(train_logs['true'], train_logs['preds'], average='macro'))
    print("      loss:", train_losses[-1])
    # The macro F1 is already a single number, so `.mean()` leaves it unchanged
    # and the same value is printed twice.
    print("        f1:", train_scores[-1].mean(), train_scores[-1])


    # Validation pass: no parameter updates.
    valid_logs = evaluate(model, valid_loader, criterion, device, verbose=True)
    valid_losses.append(np.mean(valid_logs["losses"]))
    valid_scores.append(f1_score(valid_logs['true'], valid_logs['preds'], average='macro'))
    print("      loss:", valid_losses[-1])
    print("        f1:", valid_scores[-1].mean(), valid_scores[-1])
    # Per-class precision / recall / F1 on the validation set.
    print(classification_report(valid_logs['true'], valid_logs['preds']))

    # Disabled: saving the best checkpoint (by validation F1) to "best.pth".
    # NOTE(review): the "accuracy" keys below would actually hold F1 scores;
    #               re-enabling also requires the `best_score` initialisation above.
    # if valid_scores[-1].mean() >= best_score:
    #     checkpoint = {
    #         "model_state_dict": model.state_dict(),
    #         "optimizer_state_dict": optimizer.state_dict(),
    #         "epoch": ep,
    #         "num_epochs": n_epochs,
    #         "metrics": {
    #             "training": {"loss": train_losses[-1], "accuracy": train_scores[-1]},
    #             "validation": {"loss": valid_losses[-1], "accuracy": valid_scores[-1]},
    #         },
    #     }
    #     torch.save(checkpoint, "best.pth")
    #     print("🟢 Saved new best state! 🟢")
    #     best_score = valid_scores[-1].mean()  # update best score to a new one


Epoch  1/ 3
training: 100%|████████████████████████████████████| 1354/1354 [03:03<00:00,  7.39it/s, loss 0.0201]
      loss: 0.05998303959026803
        f1: 0.7578181658401121 0.7578181658401121
evaluation: 100%|█████████████████████████████████████████████████| 164/164 [00:05<00:00, 28.76it/s]
      loss: 0.03839348183491228
        f1: 0.8442343743531651 0.8442343743531651
              precision    recall  f1-score   support

           0       1.00      1.00      1.00     21594
           1       0.95      0.94      0.95       543
           2       0.97      0.96      0.97       202
           3       0.84      0.91      0.88       151
           4       0.89      0.87      0.88       149
           5       0.90      0.94      0.92       115
           6       0.65      1.00      0.79        28
           7       0.50      0.53      0.52        77
           8       0.82      0.62      0.71        68

    accuracy                           0.99     22927
   macro avg       0.84  