```
// Copyright 2020 Twitter, Inc.
// SPDX-License-Identifier: Apache-2.0
```

# Finetune Part of Speech Tagging Models

Take an existing BERT model (with or without TPP pre-training) and fine-tune it on a Part-of-Speech (POS) tagging dataset.


## Setup libraries


In [None]:
%pip install transformers==3.5.1 datasets==1.1.2 torch==1.4.0 seqeval==1.2.2 gensim==3.8.1

## Define parameters


In [None]:
# Base directory that the other paths in this notebook are built from.
HOMEDIR = "./"
DATADIR = HOMEDIR + "/"
# Name (or path) of the checkpoint to fine-tune.
pre_trained_model_path = "bert-base-multilingual-uncased"
# Language selector; "en" for English or "**" for the multilingual setting.
langs = "en"


## Setup Helpers


In [None]:
import json
import random
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Callable, Dict, List, NewType, Optional, Tuple, Union

import numpy as np
import pandas as pd
import torch
from IPython.core.debugger import set_trace
from sklearn.metrics import classification_report
from torch.nn.utils.rnn import pad_sequence
from tqdm import tqdm, trange
from transformers import (
    AutoModelForTokenClassification,
    AutoTokenizer,
    BertForTokenClassification,
    BertTokenizerFast,
    Pipeline,
    RobertaTokenizerFast,
    TokenClassificationPipeline,
    Trainer,
    TrainingArguments,
)
from transformers.tokenization_utils_base import (
    BatchEncoding,
    PaddingStrategy,
    PreTrainedTokenizerBase,
)


In [None]:
# Preview: derive a 1-based tag -> id table from the "TAG: description" list below.
dict(
    (entry.partition(":")[0], rank)
    for rank, entry in enumerate(
        """ADJ: adjective
ADP: adposition
ADV: adverb
AUX: auxiliary
CCONJ: coordinating conjunction
DET: determiner
INTJ: interjection
NOUN: noun
NUM: numeral
PART: particle
PRON: pronoun
PROPN: proper noun
PUNCT: punctuation
SCONJ: subordinating conjunction
SYM: symbol
VERB: verb
X: other""".splitlines(),
        start=1,
    )
)


In [None]:
# Matches an http(s) URL at the start of a token, up to the first space.
URL_REGEX = re.compile(r"^http[s]?://[^ ]+")

# Code point 65039 (U+FE0F), stripped from the front of tokens.
_VARIATION_SELECTOR = chr(65039)


def clean_tokens(token):
    """Normalize one raw token.

    Args:
        token: The token text. May be empty.

    Returns:
        ``""`` for an empty token or one that consists only of U+FE0F,
        ``"[LF]"`` for a newline token, otherwise the token with a leading
        U+FE0F removed and a leading URL replaced by ``"[URL]"``.
    """
    # Guard first: indexing token[0] on an empty string would raise IndexError.
    if not token:
        return ""
    if token[0] == _VARIATION_SELECTOR:
        token = token[1:]
    if token == _VARIATION_SELECTOR:
        return ""
    if token == "\n":
        return "[LF]"
    return URL_REGEX.sub("[URL]", token)


def read_ud_data(file_path, label_col=3):
    """Read a JSON-lines file into parallel token and label sequences.

    Every non-blank line must be a JSON object with a ``"tokens"`` list. Each
    entry of that list is indexed positionally: element 1 is the token text and
    element ``label_col`` is its label.

    Args:
        file_path: Path of the file to read.
        label_col: Position of the label inside each token entry.

    Returns:
        ``(all_tokens, all_labels)``: two lists of equal length. Each item is a
        tuple holding the cleaned, non-empty tokens of one sentence and the
        matching labels. Sentences with no usable token are skipped.
    """
    all_tokens = []
    all_labels = []
    # Explicit UTF-8 so decoding does not depend on the machine's locale.
    with open(file_path, encoding="utf-8") as fp:
        for line in tqdm(fp):
            if not line.strip():
                continue
            seq = json.loads(line)["tokens"]
            if not seq:
                print(f"Error: {seq}")
                continue
            kept = []
            for entry in seq:
                token, label = clean_tokens(entry[1]), entry[label_col]
                # Drop tokens that are empty or whitespace-only after cleaning.
                if token and token.strip():
                    kept.append((token, label))
            if not kept:
                # zip(*[]) cannot be unpacked into two names, so skip explicitly.
                print(f"Error: no usable tokens in {seq}")
                continue
            tokens, labels = zip(*kept)
            all_tokens.append(tokens)
            all_labels.append(labels)
    return all_tokens, all_labels


# Tag inventory in id order: a tag's position here is its 0-based class id.
_POS_TAGS = (
    "ADJ",
    "ADP",
    "ADV",
    "AUX",
    "CCONJ",
    "DET",
    "INTJ",
    "NOUN",
    "NUM",
    "PART",
    "PRON",
    "PROPN",
    "PUNCT",
    "SCONJ",
    "SYM",
    "VERB",
    "X",
)

# Identity mapping from dataset tag to model tag.
LABEL_MAP = {tag: tag for tag in _POS_TAGS}
label2id = {LABEL_MAP[tag]: class_id for class_id, tag in enumerate(_POS_TAGS)}

# Inverse lookup: class id -> tag.
id2label = dict(zip(label2id.values(), label2id.keys()))


In [None]:
class SplitTokenClassificationPipeline(TokenClassificationPipeline):
    """Token-classification pipeline for sentences that are already split into words.

    ``__call__`` tokenizes every sentence with ``is_split_into_words=True`` and adds
    ``offset`` and ``is_subword`` to each returned entity. The inline "addition"
    comments mark the places that were changed in the code this was adapted from.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def __call__(self, inputs: Union[str, List[str]], **kwargs):
        """
        Classify each token of the pre-split sentence(s) given as inputs.

        Args:
            inputs: The sentences to classify; each one is passed to the tokenizer
                with ``is_split_into_words=True``.

        Return:
            A list or a list of list of :obj:`dict`: Each result comes as a list of dictionaries (one for each
            token in the corresponding input, or each entity if this pipeline was instantiated with
            :obj:`grouped_entities=True`) with the following keys:

            - **word** (:obj:`str`) -- The token/word classified.
            - **score** (:obj:`float`) -- The corresponding probability for :obj:`entity`.
            - **entity** (:obj:`str`) -- The entity predicted for that token/word.
            - **index** (:obj:`int`, only present when ``self.grouped_entities=False``) -- The index of the
              corresponding token in the sentence.
            - **offset** -- ``(start, end)`` taken from the offset mapping, or ``(None, None)`` when no
              offset mapping is available.
            - **is_subword** (:obj:`bool`) -- ``False`` when no offset mapping is available.

            When exactly one sentence was processed, its result list is returned directly instead of a
            one-element list.
        """
        inputs, offset_mappings = self._args_parser(
            inputs, **kwargs
        )  # offset_mappings introduced in newer version

        answers = []

        for i, sentence in enumerate(
            inputs[0]
        ):  # Another addition to only select first list item in newer version
            # Manage correct placement of the tensors
            with self.device_placement():

                tokens = self.tokenizer(
                    sentence,
                    return_attention_mask=False,
                    return_tensors=self.framework,
                    truncation=True,
                    return_special_tokens_mask=True,
                    return_offsets_mapping=self.tokenizer.is_fast,
                    is_split_into_words=True,  # This is the new addition,
                    padding=True,
                    max_length=self.tokenizer.max_len,
                )
                if self.tokenizer.is_fast:
                    offset_mapping = tokens.pop("offset_mapping").cpu().numpy()[0]
                elif offset_mappings:
                    offset_mapping = offset_mappings[i]
                else:
                    offset_mapping = None

                special_tokens_mask = tokens.pop("special_tokens_mask").cpu().numpy()[0]

                # Forward
                if self.framework == "tf":
                    entities = self.model(tokens.data)[0][0].numpy()
                    input_ids = tokens["input_ids"].numpy()[0]
                else:
                    with torch.no_grad():
                        tokens = self.ensure_tensor_on_device(**tokens)
                        entities = self.model(**tokens)[0][0].cpu().numpy()
                        input_ids = tokens["input_ids"].cpu().numpy()[0]

            # Softmax over the label axis.
            exp_logits = np.exp(entities)
            score = exp_logits / exp_logits.sum(-1, keepdims=True)
            labels_idx = score.argmax(axis=-1)

            entities = []
            # Filter to labels not in `self.ignore_labels`
            # Filter special_tokens
            filtered_labels_idx = [
                (idx, label_idx)
                for idx, label_idx in enumerate(labels_idx)
                if (self.model.config.id2label[label_idx] not in self.ignore_labels)
                and not special_tokens_mask[idx]
            ]

            for idx, label_idx in filtered_labels_idx:
                # Defaults for the path without an offset mapping; previously these
                # names were unbound there and building the entity raised NameError.
                start_ind, end_ind = None, None
                is_subword = False

                if offset_mapping is not None:
                    start_ind, end_ind = offset_mapping[idx]
                    # NOTE(review): `sentence` is handed to the tokenizer as pre-split
                    # words, so this slice presumably selects words rather than
                    # characters -- confirm `word_ref` / `is_subword` are as intended.
                    word_ref = sentence[start_ind:end_ind]
                    word = self.tokenizer.convert_ids_to_tokens([int(input_ids[idx])])[
                        0
                    ]
                    is_subword = len(word_ref) != len(word)

                    if int(input_ids[idx]) == self.tokenizer.unk_token_id:
                        word = word_ref
                        is_subword = False
                else:
                    word = self.tokenizer.convert_ids_to_tokens(int(input_ids[idx]))

                entity = {
                    "word": word,
                    "score": score[idx][label_idx].item(),
                    "entity": self.model.config.id2label[label_idx],
                    "index": idx,
                    "offset": (start_ind, end_ind),
                    "is_subword": is_subword,  # Another addition
                }

                entities += [entity]

            if self.grouped_entities:
                answers += [self.group_entities(entities)]
            # Append ungrouped entities
            else:
                answers += [entities]

        if len(answers) == 1:
            return answers[0]
        return answers


# Metrics
def compute_metrics(all_preds, all_labels, return_report=False, extra_label_map=None):
    """Score word-level predictions against gold labels.

    Only predictions whose ``offset`` starts at 0 are kept for each sentence.
    Sentences whose kept predictions and gold labels differ in length are
    excluded, and the number of exclusions is printed.

    Args:
        all_preds: Per sentence, a list of entity dicts with at least the keys
            ``"entity"`` and ``"offset"``.
        all_labels: Per sentence, the sequence of gold labels.
        return_report: If true, return scikit-learn's ``classification_report``
            as a dict instead of the summary metrics.
        extra_label_map: Optional mapping applied to gold labels before
            ``LABEL_MAP``; labels missing from it are used unchanged.

    Returns:
        The classification-report dict when ``return_report`` is true. Otherwise
        a dict with ``accuracy_score`` plus macro-averaged ``precision``,
        ``recall`` and ``f1`` computed over all kept tokens.
    """
    if extra_label_map is None:
        extra_label_map = {}

    def clean_pred(preds):
        return [p["entity"] for p in preds if p["offset"][0] == 0]

    def clean_label(labels):
        return [LABEL_MAP[extra_label_map.get(l, l)] for l in labels]

    # Flatten while filtering; repeated `sum(list_of_lists, [])` is quadratic.
    flat_labels = []
    flat_predictions = []
    num_errors = 0
    for preds, labels in zip(all_preds, all_labels):
        sentence_preds = clean_pred(preds)
        sentence_labels = clean_label(labels)
        if len(sentence_preds) != len(sentence_labels):
            num_errors += 1
            continue
        flat_predictions.extend(sentence_preds)
        flat_labels.extend(sentence_labels)

    print(f"Found {num_errors} errors in length mismatch.")

    if return_report:
        return classification_report(flat_labels, flat_predictions, output_dict=True)

    # These scorers were referenced without ever being imported (NameError).
    # Imported locally so this function does not rely on another cell.
    from sklearn.metrics import (
        accuracy_score,
        f1_score,
        precision_score,
        recall_score,
    )

    # Multi-class targets require an explicit averaging mode.
    return {
        "accuracy_score": accuracy_score(flat_labels, flat_predictions),
        "precision": precision_score(flat_labels, flat_predictions, average="macro"),
        "recall": recall_score(flat_labels, flat_predictions, average="macro"),
        "f1": f1_score(flat_labels, flat_predictions, average="macro"),
    }


def get_preds(all_tokens, ner_pipeline, batch_size=64):
    """Run ``ner_pipeline`` over ``all_tokens`` in batches.

    Args:
        all_tokens: List of sentences to pass to the pipeline.
        ner_pipeline: Callable that takes a list of sentences and returns one
            prediction list per sentence.
        batch_size: Number of sentences per pipeline call.

    Returns:
        A list with one prediction list per input sentence, in input order.
    """
    all_preds = []
    for start in trange(0, len(all_tokens), batch_size):
        batch = all_tokens[start : start + batch_size]
        preds = ner_pipeline(batch)
        # The pipeline returns the bare entity list (not a one-element list of
        # lists) when it processed a single sentence. Re-wrap it so `extend`
        # adds one item per sentence rather than one per token.
        if len(batch) == 1 and (not preds or isinstance(preds[0], dict)):
            preds = [preds]
        all_preds.extend(preds)
    return all_preds


def run_eval(data_path, ner_pipeline, extra_label_map=None):
    """Evaluate ``ner_pipeline`` on one data file.

    Args:
        data_path: File to read with ``read_ud_data``.
        ner_pipeline: Pipeline handed to ``get_preds``.
        extra_label_map: Optional label remapping forwarded to ``compute_metrics``.

    Returns:
        The classification report as a transposed ``pandas.DataFrame``.
    """
    sentences, gold_labels = read_ud_data(data_path)
    predictions = get_preds(sentences, ner_pipeline)
    report_dict = compute_metrics(
        predictions,
        gold_labels,
        extra_label_map=extra_label_map,
        return_report=True,
    )
    # Transpose the frame built from the report dict.
    return pd.DataFrame(report_dict).T


In [None]:
@dataclass
class DataCollatorForTokenClassification:
    """
    NOTE: Code taken from huggingface transformers library
    Data collator that will dynamically pad the inputs received, as well as the labels.
    Args:
        tokenizer (:class:`~transformers.PreTrainedTokenizer` or :class:`~transformers.PreTrainedTokenizerFast`):
            The tokenizer used for encoding the data.
        padding (:obj:`bool`, :obj:`str` or :class:`~transformers.tokenization_utils_base.PaddingStrategy`, `optional`, defaults to :obj:`True`):
            Select a strategy to pad the returned sequences (according to the model's padding side and padding index)
            among:
            * :obj:`True` or :obj:`'longest'`: Pad to the longest sequence in the batch (or no padding if only a single
              sequence if provided).
            * :obj:`'max_length'`: Pad to a maximum length specified with the argument :obj:`max_length` or to the
              maximum acceptable input length for the model if that argument is not provided.
            * :obj:`False` or :obj:`'do_not_pad'` (default): No padding (i.e., can output a batch with sequences of
              different lengths).
        max_length (:obj:`int`, `optional`):
            Maximum length of the returned list and optionally padding length (see above).
            The returned tensors are also cut to this length along the sequence axis.
        pad_to_multiple_of (:obj:`int`, `optional`):
            If set will pad the sequence to a multiple of the provided value.
            This is especially useful to enable the use of Tensor Cores on NVIDIA hardware with compute capability >=
            7.5 (Volta).
        label_pad_token_id (:obj:`int`, `optional`, defaults to -100):
            The id to use when padding the labels (-100 will be automatically ignore by PyTorch loss functions).
    """

    tokenizer: PreTrainedTokenizerBase
    padding: Union[bool, str, PaddingStrategy] = True
    max_length: Optional[int] = None
    pad_to_multiple_of: Optional[int] = None
    label_pad_token_id: int = -100

    def __call__(self, features):
        """Pad ``features`` (a list of dicts) into one batch.

        The label entry (``"label"`` or ``"labels"``) is removed from every
        feature dict in place. Without labels, the padded tokenizer output is
        returned as is. With labels, a dict of int64 tensors is returned whose
        ``"labels"`` entry is padded with ``label_pad_token_id``.
        """
        label_name = "label" if "label" in features[0].keys() else "labels"
        # Pop only when labels exist: an unconditional pop raised KeyError for
        # unlabeled features and made the `labels is None` path unreachable.
        labels = (
            [feature.pop(label_name) for feature in features]
            if label_name in features[0].keys()
            else None
        )
        batch = self.tokenizer.pad(
            features,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            # Conversion to tensors will fail if we have labels as they are not of the same length yet.
            return_tensors="pt" if labels is None else None,
        )

        if labels is None:
            return batch

        # Length of the first padded row; works for lists and tensors alike.
        sequence_length = len(batch["input_ids"][0])
        pad_on_right = self.tokenizer.padding_side == "right"

        padded_labels = []
        for label in labels:
            # Accept tensors/arrays as well as plain Python sequences.
            label = label.tolist() if hasattr(label, "tolist") else list(label)
            filler = [self.label_pad_token_id] * (sequence_length - len(label))
            padded_labels.append(label + filler if pad_on_right else filler + label)
        batch["labels"] = padded_labels

        return {
            k: torch.as_tensor(v, dtype=torch.int64)[:, : self.max_length]
            for k, v in batch.items()
        }


In [None]:
def encode_labels(labels, encodings):
    """Align word-level labels with the tokens of one encoded sentence.

    Args:
        labels: One tag per word.
        encodings: Mapping with an ``"offset_mapping"`` entry holding one
            ``(start, end)`` pair per token.

    Returns:
        A list with one entry per token: the label id for tokens whose offset
        starts at 0 (excluding the first and last positions), and -100 for
        every other token.

    Raises:
        ValueError: from the NumPy assignment when the number of selected
            positions does not match the number of labels.
    """
    label_ids = [label2id[LABEL_MAP[tag]] for tag in labels]
    offsets = encodings["offset_mapping"]
    seq_len = len(offsets)
    # Start with every position marked as "ignore".
    encoded = np.full(seq_len, -100, dtype=int)
    token_starts = np.array(offsets)[:, 0]
    index = np.arange(seq_len)
    is_edge = (index == 0) | (index == seq_len - 1)
    # Positions that receive a label: offset start 0 and not at either end.
    encoded[(token_starts == 0) & ~is_edge] = label_ids
    return encoded.tolist()


class UDDataset(torch.utils.data.Dataset):
    """Torch dataset of tokenized sentences with token-aligned label ids.

    Args:
        file_paths: One path, or a list/tuple of paths, readable by ``read_ud_data``.
        tokenizer: Callable tokenizer invoked with ``is_split_into_words=True``
            and ``return_offsets_mapping=True``.
    """

    def __init__(self, file_paths, tokenizer):
        if isinstance(file_paths, (list, tuple)):
            self.file_paths = list(file_paths)
        else:
            self.file_paths = [file_paths]
        self.tokenizer = tokenizer
        self._setup()

    def _setup(self):
        """Read all files, tokenize them in one call and build ``self.data``."""
        all_tokens, all_labels = [], []
        for file_path in self.file_paths:
            tokens_fp, labels_fp = read_ud_data(file_path)
            all_tokens += tokens_fp
            all_labels += labels_fp
        self.data = []
        num_errors = 0

        # Skip tokenization when there is nothing to encode (e.g. no input files).
        if all_tokens:
            all_encodings = self.tokenizer(
                all_tokens, is_split_into_words=True, return_offsets_mapping=True
            )
            for i, (tokens, labels) in tqdm(
                enumerate(zip(all_tokens, all_labels)), total=len(all_tokens)
            ):
                if not tokens or not labels:
                    num_errors += 1
                    continue
                try:
                    encodings = {k: all_encodings[k][i] for k in all_encodings}
                    labels = encode_labels(labels, encodings)
                    encodings.pop("offset_mapping")  # Don't pass to model
                    self.data.append((encodings, labels))
                except ValueError as e:
                    num_errors += 1
                    print(f"idx={i} has issues [num_errors={num_errors}/{i}]: {e}")
                    continue

        # Error rate over all processed sentences; guarded so that an empty
        # dataset no longer raises ZeroDivisionError.
        total = len(self.data) + num_errors
        pct_error = num_errors * 100.0 / total if total else 0.0
        print(f"Errors: {num_errors}, data={len(self.data)}, %error={pct_error}")

    def __getitem__(self, idx):
        """Return sample ``idx`` as a dict of tensors, with labels under ``"labels"``."""
        encodings, labels = self.data[idx]
        item = {key: torch.tensor(val) for key, val in encodings.items()}
        item["labels"] = torch.tensor(labels)
        return item

    def __len__(self):
        return len(self.data)


## Run Training


In [None]:
# Load the (fast) tokenizer of the checkpoint that will be fine-tuned.
# `tokenizer.max_len` is read again later when the data collator is built.
model_dir = pre_trained_model_path
tokenizer = AutoTokenizer.from_pretrained(
    str(model_dir), max_len=512, truncation=True, padding=True, use_fast=True
)


In [None]:
# Directory that holds the UD_<Language>-<Treebank> folders.
data_dir = Path(f"{HOMEDIR}/ud_data/").expanduser()

# Language selector -> name used in the treebank directory.
# "**" (all languages) must map to a single "*": the value is embedded inside a
# path component ("UD_<value>-*"), and older pathlib versions reject "**" unless
# it is an entire component (ValueError).
LANG_MAP = {
    "en": "English",
    "ja": "Japanese",
    "ar": "Arabic",
    "hi": "Hindi",
    "**": "*",
}

langs = "en"  # "en" # "**"
train_data_path = list(
    data_dir.glob(f"./UD_{LANG_MAP.get(langs, langs)}-*/*-ud-train.conllu.json")
)
val_data_path = list(
    data_dir.glob(f"./UD_{LANG_MAP.get(langs, langs)}-*/*-ud-dev.conllu.json")
)
test_data_path = list(
    data_dir.glob(f"./UD_{LANG_MAP.get(langs, langs)}-*/*-ud-test.conllu.json")
)
train_data_path


In [None]:
# Build the training and validation datasets from the matched files.
train_dataset = UDDataset(train_data_path, tokenizer)
val_dataset = UDDataset(val_data_path, tokenizer)


In [None]:
# Show how many samples each dataset holds.
len(train_dataset), len(val_dataset)


In [None]:
# Output locations are prefixed with the language ("multi" for the all-language run).
model_prefix = "multi" if langs == "**" else langs
ner_model_dir = str(Path(f"{HOMEDIR}/{model_prefix}_udpos_model").expanduser())
logging_dir = str(Path(f"{HOMEDIR}/{model_prefix}_udpos_logs").expanduser())

data_collator = DataCollatorForTokenClassification(
    tokenizer=tokenizer, padding=True, max_length=tokenizer.max_len
)

eval_every_steps = -1  # -1 for eval at end

training_args = TrainingArguments(
    output_dir=str(ner_model_dir),  # output directory
    num_train_epochs=3,  # total number of training epochs
    per_device_train_batch_size=16,  # batch size per device during training
    per_device_eval_batch_size=64,  # batch size for evaluation
    warmup_steps=500,  # number of warmup steps for learning rate scheduler
    weight_decay=0.01,  # strength of weight decay
    logging_dir=str(logging_dir),  # directory for storing logs
    logging_steps=10,
    # evaluation_strategy="steps",
    # eval_steps=100,
    save_steps=500 if eval_every_steps < 1 else eval_every_steps,
    save_total_limit=2 if eval_every_steps < 1 else None,
    max_steps=-1 if eval_every_steps < 1 else 5,
    # `label_names` lists the keys of the model inputs that hold the labels
    # (the collator emits them under "labels"), not the names of the classes.
    label_names=["labels"],
)


def train_model():
    """Fine-tune the token-classification model and save it with its tokenizer.

    Reads the notebook-level ``model_dir``, ``label2id``/``id2label``,
    ``training_args``, ``train_dataset``, ``val_dataset``, ``data_collator``,
    ``tokenizer`` and ``ner_model_dir``.

    Returns:
        ``(model, trainer)`` once training and saving have finished.
    """
    model = AutoModelForTokenClassification.from_pretrained(
        str(model_dir),
        num_labels=len(label2id),
        id2label=id2label,
        label2id=label2id,
    )
    trainer_kwargs = {
        "model": model,
        "args": training_args,
        "train_dataset": train_dataset,
        "eval_dataset": val_dataset,
        "data_collator": data_collator,
        "tokenizer": tokenizer,
    }
    trainer = Trainer(**trainer_kwargs)
    trainer.train()
    # Persist both artifacts in the same directory.
    trainer.save_model(ner_model_dir)
    tokenizer.save_pretrained(ner_model_dir)
    return model, trainer


In [None]:
%%time
# Train and keep both the model and the trainer for later inspection.
model, trainer = train_model()

## Run evaluation


In [None]:
from typing import Any, Callable, Dict, List, NewType, Optional, Tuple, Union

import numpy as np
import pandas as pd
import torch
from tqdm import trange
from transformers import (
    AutoModelForTokenClassification,
    AutoTokenizer,
    Pipeline,
    TokenClassificationPipeline,
    pipeline,
)


In [None]:
# Reload the fine-tuned weights from the output directory.
ner_model_dir = str(Path(f"{HOMEDIR}/{model_prefix}_udpos_model/").expanduser())

model = AutoModelForTokenClassification.from_pretrained(ner_model_dir)
# NOTE(review): the tokenizer is loaded from the base checkpoint (`model_dir`),
# not from `ner_model_dir` -- confirm this is intended.
tokenizer = AutoTokenizer.from_pretrained(
    str(model_dir), max_len=512, truncation=True, padding=True, use_fast=True
)


In [None]:
# Use the first GPU when one is available, otherwise fall back to CPU (-1)
# instead of failing on machines without CUDA.
ner_pipeline = SplitTokenClassificationPipeline(
    model=model,
    tokenizer=tokenizer,
    grouped_entities=False,
    ignore_labels=[],
    device=0 if torch.cuda.is_available() else -1,
)


In [None]:
# Directory that holds the UD_<Language>-<Treebank> folders.
data_dir = Path(f"{HOMEDIR}/ud_data/").expanduser()

# Language selector -> name used in the treebank directory ("**" matches any).
LANG_MAP = {"en": "English", "ja": "Japanese", "ar": "Arabic", "hi": "Hindi", "**": "*"}

# Evaluate on every language, regardless of the training language.
langs = "**"  # "en" # "**"
train_data_path = list(
    data_dir.glob(f"./UD_{LANG_MAP.get(langs, langs)}-*/*-ud-train.conllu.json")
)
val_data_path = list(
    data_dir.glob(f"./UD_{LANG_MAP.get(langs, langs)}-*/*-ud-dev.conllu.json")
)
test_data_path = list(
    data_dir.glob(f"./UD_{LANG_MAP.get(langs, langs)}-*/*-ud-test.conllu.json")
)
# No additional label remapping is applied during evaluation.
extra_label_map = None


In [None]:
# Show the test files that will be evaluated.
test_data_path


In [None]:
%%time
# One report DataFrame per test file, keyed by the two parts of its name prefix.
reports = {}
for test_path in test_data_path:
    # The text before the first "-" must split on "_" into exactly two parts
    # (presumably language code and treebank name), otherwise unpacking fails.
    lang, split = test_path.name.split("-")[0].split("_")
    print(lang, split, test_path.name)
    report = run_eval(test_path, ner_pipeline, extra_label_map=extra_label_map)
    reports[(lang, split)] = report

In [None]:
# Show the report of the last file processed by the loop.
report


In [None]:
# Stack all reports; the dict keys become the outer index levels.
df_report = pd.concat(reports)
df_report


In [None]:
# Save the combined report, tab-separated, next to the fine-tuned model.
df_report.to_csv(Path(ner_model_dir) / "test_eval_report.txt", sep="\t")


In [None]:
# Keep only the "accuracy" rows (third index level).
df_report[df_report.index.isin(["accuracy"], level=2)]


In [None]:
# Reload the saved report, restoring its three index levels.
df_report = pd.read_csv(
    Path(ner_model_dir) / "test_eval_report.txt", sep="\t", index_col=[0, 1, 2]
)
# Keep only the "accuracy" rows (third index level).
df_report[df_report.index.isin(["accuracy"], level=2)]


In [None]:
# Optionally replace the report with one stored directly under HOMEDIR.
# The same expanded path is used for the existence check and the read, and all
# three index columns are restored so the `level=2` filter below is valid.
report_path = Path(f"{HOMEDIR}/test_eval_report.txt").expanduser()
if report_path.exists():
    df_report = pd.read_csv(report_path, sep="\t", index_col=[0, 1, 2])
df_report[df_report.index.isin(["accuracy"], level=2)]
