<a href="https://colab.research.google.com/github/simulate111/Textual-Data-Analysis-25/blob/main/sequence_labeling_example.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Sequence labeling example

Let's train a transformer model on a Named Entity Recognition (NER) dataset.

---

## Setup

Install the required Python packages:

In [None]:
!pip install --quiet fsspec==2024.10.0 transformers datasets evaluate seqeval
!pip install --quiet --upgrade datasets

Import the libraries we'll be using here.

In [None]:
import datasets
import transformers
import evaluate

from pprint import pprint

Make things a bit more quiet. (This only affects what shows on screen when running. If you're debugging, you probably want to comment these out.)

In [None]:
transformers.utils.logging.set_verbosity_error()
datasets.logging.set_verbosity_error()
datasets.disable_progress_bar()

---

## Load dataset

Load a dataset for training using `datasets`.

In [None]:
DATASET = 'conll2003'

builder = datasets.load_dataset_builder(DATASET)
dataset = datasets.load_dataset(DATASET)


Let's have a look at the description and dataset.

In [None]:
print(builder.info.description)

In [None]:
print(dataset)

We have the conventional split into `train`, `validation`, and `test`.

Here, we're only interested in the `tokens` and `ner_tags`. (The `pos_tags` and `chunk_tags` are included to support methods based on manually engineered features, and as such are not highly relevant to the deep learning approach we're pursuing here.)

Let's have a look at one example.

In [None]:
print(dataset['train'][0]['tokens'])
print(dataset['train'][0]['ner_tags'])

Take note of the number of different labels and create mappings from label IDs to label strings and vice versa; we'll need these later.

In [None]:
label_names = dataset['train'].features['ner_tags'].feature.names
print('Labels:', label_names)

num_labels = len(label_names)
id2label = { k: v for k, v in enumerate(label_names) }
label2id = { v: k for k, v in enumerate(label_names) }

print('Number of labels:', num_labels)
print('id2label mapping:', id2label)
print('label2id mapping:', label2id)

Let's see that example again, applying the label mapping:

In [None]:
for token, tag_id in zip(dataset['train'][0]['tokens'], dataset['train'][0]['ner_tags']):
    print(f'{token}\t{id2label[tag_id]}')
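
The labels use `B-` and `I-` prefixes: `B-XXX` opens an entity of type `XXX` and `I-XXX` continues it, while `O` marks tokens outside any entity. As a rough illustration of how such tags translate into entity spans, here is a minimal sketch. The helper `bio_to_spans` and the example sentence are hypothetical and not part of the dataset or of any library.

```python
def bio_to_spans(tags):
    """Collect (type, start, end) spans from BIO tag strings; `end` is exclusive."""
    spans = []
    start, current = None, None
    for i, tag in enumerate(tags + ['O']):      # the extra 'O' flushes the last span
        prefix, _, etype = tag.partition('-')
        continues = prefix == 'I' and etype == current
        if current is not None and not continues:
            spans.append((current, start, i))   # close the open span
            start, current = None, None
        # A stray I-XXX without a preceding B-XXX is treated leniently as a new span.
        if prefix == 'B' or (prefix == 'I' and current is None):
            start, current = i, etype
    return spans


example_tokens = ['Paavo', 'Nurmi', 'was', 'born', 'in', 'Turku']
example_tags = ['B-PER', 'I-PER', 'O', 'O', 'O', 'B-LOC']

for etype, start, end in bio_to_spans(example_tags):
    print(etype, example_tokens[start:end])
```

Entity spans like these, rather than individual tags, are what we ultimately care about in NER.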

---

## Tokenize and vectorize dataset

As in the [text classification notebook](https://github.com/TurkuNLP/textual-data-analysis-course/blob/main/text_classification_basic_example.ipynb), we'll first load the tokenizer that corresponds to the model that we want to use. `AutoTokenizer` is a convenience class that will return the appropriate tokenizer for the model it's given as an argument:

In [None]:
MODEL = 'bert-base-cased'

tokenizer = transformers.AutoTokenizer.from_pretrained(MODEL)

The tokenizer will, most importantly, produce `input_ids`, which identify the tokens of the text.

The BERT tokenizer also produces an `attention_mask`, which can be used to make the model ignore some tokens, and `token_type_ids`, which can differentiate parts of the input e.g. when it consists of two separate texts.

In [None]:
pprint(tokenizer('this is an example sentence'))

A key point here is to note that the data already has its own definition of "token", and the tokenizer may split some of those into parts:

In [None]:
input_ids = tokenizer('Turku is not in the vocabulary').input_ids


print(input_ids)
print()
print(tokenizer.convert_ids_to_tokens(input_ids))

For each sentence, our data consists of a tokenized list of strings ("words") rather than a single string. If we call the tokenizer with its default options, it interprets each token as a different example:

In [None]:
print(dataset['train'][0]['tokens'])
print()

for ids in tokenizer(dataset['train'][0]['tokens']).input_ids:
    print(tokenizer.convert_ids_to_tokens(ids))

To get the correct mapping, we provide `is_split_into_words=True` to the tokenizer.

In [None]:
tokens = 'Turku is not in the vocabulary'.split()
tokenized = tokenizer(tokens, is_split_into_words=True)

print(tokens)
print()
pprint(tokenizer.convert_ids_to_tokens(tokenized.input_ids))

The tokenizer also provides us with a mapping from the tokenizer tokens to the "original" tokens ("words"):

In [None]:
tokenized.word_ids()

With `is_split_into_words=True`, we can tokenize the input so that its tokens are compatible with the model, but the labels will be misaligned.

In [None]:
from itertools import zip_longest

token_ids = tokenizer(dataset['train'][0]['tokens'], is_split_into_words=True).input_ids
tag_ids = dataset['train'][0]['ner_tags']

for token_id, tag_id in zip_longest(token_ids, tag_ids):
    token = tokenizer.convert_ids_to_tokens(token_id)
    tag = id2label[tag_id] if tag_id is not None else None
    print(f'{token}\t{tag}')

To resolve this, we'll borrow a function from [a transformers tutorial](https://huggingface.co/course/chapter7/2). Here, `-100` is a "magic value" for a label that PyTorch ignores when computing the loss.

In [None]:
def align_labels_with_tokens(labels, word_ids):
    new_labels = []
    current_word = None
    for word_id in word_ids:
        if word_id != current_word:    # Start of a new word
            current_word = word_id
            label = -100 if word_id is None else labels[word_id]
            new_labels.append(label)
        elif word_id is None:          # Special token
            new_labels.append(-100)
        else:                          # Same word as previous token
            label = labels[word_id]
            if label % 2 == 1:         # If label is B-XXX we change it to I-XXX
                label += 1
            new_labels.append(label)
    return new_labels
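
To see what the function does, it can help to run it on a small hand-made input. The sketch below repeats the function so that it runs on its own. The `word_ids` list is a hypothetical example (three words, the first split into three subword tokens, with special tokens at both ends), and the label IDs assume the mapping printed earlier, where `5` is `B-LOC` and `6` is `I-LOC`.

```python
def align_labels_with_tokens(labels, word_ids):
    # Same logic as the function above, repeated to keep this snippet standalone.
    new_labels = []
    current_word = None
    for word_id in word_ids:
        if word_id != current_word:    # Start of a new word
            current_word = word_id
            label = -100 if word_id is None else labels[word_id]
            new_labels.append(label)
        elif word_id is None:          # Special token
            new_labels.append(-100)
        else:                          # Same word as previous token
            label = labels[word_id]
            if label % 2 == 1:         # B-XXX becomes I-XXX
                label += 1
            new_labels.append(label)
    return new_labels


word_labels = [5, 0, 0]                          # one label per word: B-LOC, O, O
example_word_ids = [None, 0, 0, 0, 1, 2, None]   # hypothetical tokenizer output

aligned = align_labels_with_tokens(word_labels, example_word_ids)
print(aligned)  # [-100, 5, 6, 6, 0, 0, -100]
```

The first subword token keeps the word's `B-` label, the remaining pieces of the same word get the matching `I-` label, and the special tokens get `-100`.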

We'll also borrow a function for jointly tokenizing the text and aligning labels:

In [None]:
def tokenize_and_align_labels(inputs):
    outputs = tokenizer(inputs['tokens'], truncation=True, is_split_into_words=True)
    new_labels = []
    for i, labels in enumerate(inputs['ner_tags']):
        word_ids = outputs.word_ids(i)
        new_labels.append(align_labels_with_tokens(labels, word_ids))
    outputs['labels'] = new_labels
    return outputs

We'll then apply this to the whole dataset:

In [None]:
dataset = dataset.map(tokenize_and_align_labels, batched=True)

Now these should match up:

In [None]:
token_ids = dataset['train'][0]['input_ids']
tag_ids = dataset['train'][0]['labels']

for token_id, tag_id in zip_longest(token_ids, tag_ids):
    token = tokenizer.convert_ids_to_tokens(token_id)
    tag = id2label[tag_id] if tag_id != -100 else None
    print(f'{token}\t{tag}')
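
The `-100` labels matter at training time: `-100` is the default `ignore_index` of PyTorch's cross-entropy loss, so positions carrying that label do not contribute to the loss. The following pure-Python sketch illustrates the idea on made-up scores. The function `masked_cross_entropy` is a hypothetical helper written for this illustration, not the PyTorch implementation, and it assumes that at least one label is kept.

```python
import math


def masked_cross_entropy(logits, labels, ignore_index=-100):
    """Mean cross-entropy over the positions whose label is not `ignore_index`."""
    losses = []
    for scores, label in zip(logits, labels):
        if label == ignore_index:
            continue                                  # skipped entirely
        log_norm = math.log(sum(math.exp(s) for s in scores))
        losses.append(log_norm - scores[label])       # -log softmax(scores)[label]
    return sum(losses) / len(losses)


# Three token positions, two classes; only the middle position has a real label.
example_logits = [[2.0, 0.0], [0.0, 0.0], [0.0, 5.0]]
example_labels = [-100, 0, -100]

loss = masked_cross_entropy(example_logits, example_labels)
print(loss)
```

Whatever the model predicts at the two ignored positions, the loss stays the same, which is why special tokens can safely be labeled `-100`.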

---

## Instantiate model

Now, we'll instantiate a pretrained model with a sequence labeling head. In the `transformers` library, this class of models is named `...ModelForTokenClassification` (cf. `...ModelForSequenceClassification`). We'll again use the `Auto` variant to get the appropriate class based on the model name.

**NOTE**: we need to provide the number of labels to `from_pretrained` so that the function knows the size of the output layer that is required. The `id2label` and `label2id` mappings allow the model to report its classification results in interpretable text labels.

In [None]:
model = transformers.AutoModelForTokenClassification.from_pretrained(
    MODEL,
    num_labels=num_labels,
    id2label=id2label,
    label2id=label2id
)

---

## Training configuration

To assess the progress and results of training, we'll use the standard `seqeval` library. We'll also need to introduce a function that takes model outputs and the labels from the dataset and calls the metric.

Here, we'll again borrow from [the transformers tutorial](https://huggingface.co/course/chapter7/2):

In [None]:
metrics = evaluate.load('seqeval')


def compute_metrics(outputs_and_labels):
    outputs, labels = outputs_and_labels
    predictions = outputs.argmax(axis=-1)

    # Remove ignored index (special tokens) and convert to labels
    true_labels = [[id2label[i] for i in label if i != -100] for label in labels]
    true_predictions = [
        [id2label[p] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]
    all_metrics = metrics.compute(predictions=true_predictions, references=true_labels)
    return {
        'precision': all_metrics['overall_precision'],
        'recall': all_metrics['overall_recall'],
        'f1': all_metrics['overall_f1'],
        'accuracy': all_metrics['overall_accuracy'],
    }
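
Note that the precision, recall, and F1 reported by `seqeval` are computed over whole entities rather than individual tokens: in its default mode, a predicted entity counts as correct only if both its type and its boundaries match the reference exactly. The sketch below illustrates that idea on hand-made spans. The function `entity_prf` and the `(type, start, end)` span format are assumptions made for this illustration, not the `seqeval` API.

```python
def entity_prf(gold_spans, pred_spans):
    """Exact-match precision, recall and F1 over (type, start, end) spans."""
    gold, pred = set(gold_spans), set(pred_spans)
    correct = len(gold & pred)                      # type and boundaries must both match
    precision = correct / len(pred) if pred else 0.0
    recall = correct / len(gold) if gold else 0.0
    if precision + recall == 0:
        return precision, recall, 0.0
    return precision, recall, 2 * precision * recall / (precision + recall)


gold_spans = [('PER', 0, 2), ('LOC', 5, 6)]
pred_spans = [('PER', 0, 1), ('LOC', 5, 6)]         # the PER span is one token too short

precision, recall, f1 = entity_prf(gold_spans, pred_spans)
print(precision, recall, f1)  # 0.5 0.5 0.5
```

Here only one token label is wrong, yet half of the entities count as missed. This is why the entity-level F1 is usually lower than the token-level accuracy.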

We'll also need a collator for padding the examples to the same length to process them in batches.

In [None]:
from transformers import DataCollatorForTokenClassification

data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)
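
Conceptually, the collator extends every example in a batch to the length of the longest one: `input_ids` are filled with the padding token, `attention_mask` with zeros, and `labels` with `-100` so that padding is ignored by the loss. The following is a simplified sketch of that behavior using plain lists. The function `pad_batch` and the token IDs are made up for illustration, and the real collator additionally returns tensors.

```python
def pad_batch(examples, pad_token_id=0, label_pad_id=-100):
    """Pad a list of examples to a common length (right-side padding)."""
    max_len = max(len(ex['input_ids']) for ex in examples)
    batch = {'input_ids': [], 'attention_mask': [], 'labels': []}
    for ex in examples:
        length = len(ex['input_ids'])
        n_pad = max_len - length
        batch['input_ids'].append(ex['input_ids'] + [pad_token_id] * n_pad)
        batch['attention_mask'].append([1] * length + [0] * n_pad)
        batch['labels'].append(ex['labels'] + [label_pad_id] * n_pad)
    return batch


examples = [
    {'input_ids': [101, 11, 12, 13, 102], 'labels': [-100, 5, 6, 0, -100]},
    {'input_ids': [101, 21, 102], 'labels': [-100, 0, -100]},
]

batch = pad_batch(examples)
for key, rows in batch.items():
    print(key, rows)
```

Padding per batch rather than to a fixed global length keeps batches of short sentences small.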

The `TrainingArguments` class configures many of the details of the model training. You may want to try optimizing the following hyperparameters to improve model performance:

* `learning_rate`: the step size for weight updates
* `per_device_train_batch_size`: number of examples per training batch
* `max_steps`: the maximum number of steps to train for

In [None]:
trainer_args = transformers.TrainingArguments(
    report_to="none",
    output_dir='checkpoints',
    evaluation_strategy='steps',  # renamed to eval_strategy in recent transformers releases
    logging_strategy='steps',
    load_best_model_at_end=True,
    eval_steps=100,
    logging_steps=100,
    learning_rate=0.00002,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=32,
    max_steps=1500,
)
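
To get a feeling for how much training these settings amount to, a bit of arithmetic helps. The sketch below assumes a single device, no gradient accumulation, and a training set of 14,041 sentences. That figure is an assumption about the CoNLL-2003 `train` split; check it against the `print(dataset)` output above.

```python
train_examples = 14_041   # assumed size of the train split; verify with print(dataset)
batch_size = 16           # per_device_train_batch_size
max_steps = 1500
eval_steps = 100

examples_seen = batch_size * max_steps        # examples processed during training
epochs = examples_seen / train_examples       # passes over the training data
num_evaluations = max_steps // eval_steps     # evaluation runs during training

print(f'examples seen: {examples_seen}')
print(f'epochs: {epochs:.2f}')
print(f'evaluations: {num_evaluations}')
```

Under these assumptions the model sees the training data a little less than twice, so increasing `max_steps` or the batch size changes how many passes over the data are made.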

Finally, we'll create a custom [callback](https://huggingface.co/docs/transformers/main_classes/callback) to store values logged during training so that we can more easily examine them later. (This is only needed for visualization and is not necessary to understand in detail.)

In [None]:
from collections import defaultdict

class LogSavingCallback(transformers.TrainerCallback):
    def on_train_begin(self, *args, **kwargs):
        self.logs = defaultdict(list)
        self.training = True

    def on_train_end(self, *args, **kwargs):
        self.training = False

    def on_log(self, args, state, control, logs, model=None, **kwargs):
        if self.training:
            for k, v in logs.items():
                if k != "epoch" or v not in self.logs[k]:
                    self.logs[k].append(v)

training_logs = LogSavingCallback()
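
The one subtle point in the callback is the `epoch` check: training loss and evaluation metrics arrive in separate log events that carry the same `epoch` value, so each epoch value is stored only once. The sketch below replays that logic on a few hypothetical log events. The function `record` and all the numbers are made up for illustration.

```python
from collections import defaultdict

logs = defaultdict(list)


def record(entry):
    """Store one log event, keeping each epoch value only once."""
    for k, v in entry.items():
        if k != 'epoch' or v not in logs[k]:
            logs[k].append(v)


# Hypothetical events: a training log and an evaluation log per logging step.
record({'loss': 0.40, 'epoch': 0.11})
record({'eval_loss': 0.15, 'eval_f1': 0.80, 'epoch': 0.11})
record({'loss': 0.12, 'epoch': 0.23})
record({'eval_loss': 0.10, 'eval_f1': 0.88, 'epoch': 0.23})

print(dict(logs))
```

After this, `logs['epoch']` has exactly one entry per logging step and lines up with `logs['loss']` and `logs['eval_f1']`, which is what the plotting code later relies on.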

---

## Train (fine-tune) model

In [None]:
trainer = transformers.Trainer(
    model=model,
    args=trainer_args,
    train_dataset=dataset['train'],
    eval_dataset=dataset['validation'],
    compute_metrics=compute_metrics,
    tokenizer=tokenizer,
    data_collator=data_collator,
    callbacks=[training_logs],
)

In [None]:
trainer.train()

---

## Evaluate trained model

We can use the `trainer` to evaluate the trained model using the metric we defined:

In [None]:
eval_results = trainer.evaluate(dataset['test'])

pprint(eval_results)

print('\nF1:', eval_results['eval_f1'])

As we captured performance during training using the `training_logs` callback, we can also have a look at training and evaluation loss and evaluation $F_1$ progression. (The code here is only for visualization and you do not need to understand it, but you should aim to be able to interpret the plots.)

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt

def plot(logs, keys, labels):
    values = sum([logs[k] for k in keys], [])
    plt.ylim(max(min(values)-0.1, 0.0), min(max(values)+0.1, 1.0))
    for key, label in zip(keys, labels):
        plt.plot(logs['epoch'], logs[key], label=label)
    plt.legend()
    plt.show()

plot(training_logs.logs, ['loss', 'eval_loss'], ['Training loss', 'Evaluation loss'])

In [None]:
plot(training_logs.logs, ['eval_f1'], ['Evaluation F1'])

---

## Create pipeline



We can wrap our fine-tuned model in a pipeline for convenience. (We need to specify `device` here as the model is on GPU.)

In [None]:
pipe = transformers.pipeline(
    'token-classification',
    model=model,
    tokenizer=tokenizer,
    aggregation_strategy='simple',
    device=0
)

We can then use the pipeline simply as follows:

In [None]:
pipe('Finnish cities include Turku and Tampere.')

Or, for convenience, we can wrap the call in a small helper function:

In [None]:
def tag(text):
    output = pipe(text)
    print('input:', text)
    print('output:', [(o['word'], o['entity_group']) for o in output])

tag('Finnish cities include Turku and Tampere.')

In [None]:
tag('Paavo Nurmi was born in Turku in 1897.')

In [None]:
tag('Nokia is a company founded near the town of Nokia.')