In [None]:
! pip install transformers datasets evaluate seqeval accelerate -U
from datasets import load_dataset,load_metric, Dataset, Features, ClassLabel
from transformers import DebertaForTokenClassification, DebertaTokenizerFast, Trainer, TrainingArguments, DataCollatorForTokenClassification
import torch
import numpy as np
import pandas as pd
import csv
import evaluate
from sklearn.metrics import confusion_matrix


In [None]:
# Optional cell: start Weights & Biases in "disabled" mode so the rest of the
# notebook can run without an (unnecessary) wandb login.
! pip install -q wandb
import wandb
wandb.init(mode="disabled")

In [None]:
# Output directory plus the locations of the three tab-separated splits.
save_path = '/kaggle/working/'
train_data_path = '/kaggle/input/qc1a-dataset/train.tsv'
eval_data_path = '/kaggle/input/qc1a-dataset/val.tsv'
test_data_path = '/kaggle/input/qc1a-dataset/test.tsv'

# Read every split into its own DataFrame (train, then eval, then test).
train_data, eval_data, test_data = (
    pd.read_csv(split_path, sep='\t')
    for split_path in (train_data_path, eval_data_path, test_data_path)
)

def str2list(token):
    """Turn a stringified list from a TSV cell back into a list of strings.

    The cell is expected to look like the ``repr`` of a Python list of
    strings, e.g. ``"['what', 'is', 'it']"``.

    Args:
        token: The raw cell value. A float (pandas' NaN for an empty cell) or
            ``None`` yields an empty list; a list/tuple that is already parsed
            is passed through unchanged, so re-running the calling cell on the
            same DataFrame does not crash.

    Returns:
        A list of strings, one per element of the encoded list.
    """
    import ast  # local import: only this helper needs it

    # An empty TSV cell arrives as float NaN.
    if token is None or isinstance(token, float):
        return []
    # Already decoded (cell executed twice on the same frame).
    if isinstance(token, (list, tuple)):
        return list(token)

    # Preferred path: parse the cell as a Python literal. This handles "[]",
    # items that contain ", ", mixed quote styles and escaped characters,
    # none of which the slice-and-split approach below gets right.
    try:
        parsed = ast.literal_eval(token.strip())
    except (ValueError, SyntaxError):
        parsed = None
    if isinstance(parsed, (list, tuple)):
        return [str(item) for item in parsed]

    # Fallback for cells that are not valid Python literals: drop the outer
    # brackets, split on ", " and strip one quote character from each side.
    inner = token[1:-1]
    if not inner:
        return []
    return [word[1:-1] for word in inner.split(", ")]

# Decode the stringified list columns of every split (the DataFrames are
# updated in place, column by column).
for split_frame in (train_data, eval_data, test_data):
    for list_column in ("tokens", "ner_tags"):
        split_frame[list_column] = split_frame[list_column].map(str2list)

# Wrap each DataFrame in a Hugging Face Dataset.
train_dataset, eval_dataset, test_dataset = (
    Dataset.from_pandas(frame) for frame in (train_data, eval_data, test_data)
)

# Tag vocabulary: outside / beginning-of-focus / inside-focus.
classmap = ClassLabel(num_classes=3, names=['O', 'B-focus', 'I-focus'])


def _tags_to_ids(example):
    """Replace one example's string tags with their integer class ids."""
    return {"ner_tags": classmap.str2int(example["ner_tags"])}


train_dataset = train_dataset.map(_tags_to_ids)
eval_dataset = eval_dataset.map(_tags_to_ids)
test_dataset = test_dataset.map(_tags_to_ids)

# Lookup tables between integer ids and tag names.
label_names = {0: 'O', 1: 'B-focus', 2: 'I-focus'}
id2label = dict((idx, classmap.int2str(idx)) for idx in range(classmap.num_classes))
label2id = dict((name, classmap.str2int(name)) for name in classmap.names)


In [None]:

# Checkpoints to fine-tune; the loop below runs once per entry.
model_checkpoints = ["microsoft/deberta-base"]

for model_checkpoint in model_checkpoints:
    tokenizer = DebertaTokenizerFast.from_pretrained(
        model_checkpoint,
        add_prefix_space=True,
    )
    # Label maps come from the module-level tables built from `classmap`.
    head_config = {
        "id2label": id2label,
        "label2id": label2id,
        "finetuning_task": "ner",
    }
    model = DebertaForTokenClassification.from_pretrained(model_checkpoint, **head_config)

    def tokenize_and_align_labels(examples):
        """Tokenize pre-split words and attach one label id per produced token.

        Args:
            examples: A batch with "tokens" (lists of words) and "ner_tags"
                (lists of per-word label ids).

        Returns:
            The tokenizer output with an extra "labels" entry. A token gets
            its word's label only when it is the first token of that word;
            tokens with no word id and continuation tokens get -100.
        """
        encoding = tokenizer(examples["tokens"], truncation=True, is_split_into_words=True)

        def align(word_ids, word_labels):
            # Walk the tokens once, remembering which word the previous one belonged to.
            aligned = []
            last_word = None
            for current_word in word_ids:
                starts_new_word = current_word is not None and current_word != last_word
                aligned.append(word_labels[current_word] if starts_new_word else -100)
                last_word = current_word
            return aligned

        encoding["labels"] = [
            align(encoding.word_ids(batch_index=row), word_labels)
            for row, word_labels in enumerate(examples["ner_tags"])
        ]
        return encoding

    # Tokenize every split in batches; the dataset names are rebound to the
    # tokenized versions.
    train_dataset, eval_dataset, test_dataset = (
        split.map(tokenize_and_align_labels, batched=True)
        for split in (train_dataset, eval_dataset, test_dataset)
    )

    # seqeval scorer used by compute_metrics, and the batch collator for the Trainer.
    metric = evaluate.load("seqeval")
    data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)

    def compute_metrics(eval_preds):
        """Compute seqeval scores plus per-class token accuracy for the Trainer.

        Args:
            eval_preds: A ``(logits, labels)`` pair. Positions whose label is
                -100 are dropped before scoring.

        Returns:
            A dict with the four seqeval ``overall_*`` scores,
            ``per_class_accuracies`` (for each tag, the fraction of reference
            tokens with that tag that were predicted correctly; 0.0 when the
            tag never occurs in the references) and ``entity_metrics``
            (precision / recall / f1 / number for every entity type seqeval
            reports).
        """
        logits, labels = eval_preds
        predictions = np.argmax(logits, axis=-1)

        # Keep only labelled positions and map ids back to tag names.
        true_labels = [[label_names[l] for l in label if l != -100] for label in labels]
        true_predictions = [
            [label_names[p] for (p, l) in zip(prediction, label) if l != -100]
            for prediction, label in zip(predictions, labels)
        ]
        all_metrics = metric.compute(predictions=true_predictions, references=true_labels)

        flattened_true = [label for sublist in true_labels for label in sublist]
        flattened_pred = [label for sublist in true_predictions for label in sublist]

        # Sorted so the class order (and the resulting dict) is deterministic.
        labels_set = sorted(set(flattened_true + flattened_pred))

        class_accuracies = {}
        if labels_set:  # confusion_matrix rejects an empty label list
            cm = confusion_matrix(flattened_true, flattened_pred, labels=labels_set)
            correct = cm.diagonal()
            support = cm.sum(axis=1)  # number of reference tokens per class
            for idx, label in enumerate(labels_set):
                # A tag that is only ever predicted has zero support; report
                # 0.0 instead of dividing by zero (NaN + RuntimeWarning).
                if support[idx] > 0:
                    class_accuracies[label] = float(correct[idx] / support[idx])
                else:
                    class_accuracies[label] = 0.0

        overall_keys = ("overall_precision", "overall_recall", "overall_f1", "overall_accuracy")

        return {
            "overall_precision": all_metrics["overall_precision"],
            "overall_recall": all_metrics["overall_recall"],
            "overall_f1": all_metrics["overall_f1"],
            "overall_accuracy": all_metrics["overall_accuracy"],
            "per_class_accuracies": class_accuracies,
            "entity_metrics": {
                entity: {
                    "precision": metrics["precision"],
                    "recall": metrics["recall"],
                    "f1": metrics["f1"],
                    "number": metrics["number"]
                }
                for entity, metrics in all_metrics.items()
                if entity not in overall_keys
            }
        }

    import inspect  # local import: only needed for the keyword detection below

    # The install cell upgrades transformers to the latest release, and newer
    # releases renamed two keywords used here:
    #   TrainingArguments(evaluation_strategy=...) -> eval_strategy
    #   Trainer(tokenizer=...)                     -> processing_class
    # Pick whichever name the installed version actually accepts so the cell
    # runs on both old and new versions.
    training_arg_names = inspect.signature(TrainingArguments.__init__).parameters
    eval_strategy_key = (
        "eval_strategy" if "eval_strategy" in training_arg_names else "evaluation_strategy"
    )
    trainer_arg_names = inspect.signature(Trainer.__init__).parameters
    tokenizer_key = "processing_class" if "processing_class" in trainer_arg_names else "tokenizer"

    # Evaluate, log and checkpoint once per epoch; the best checkpoint is
    # reloaded into `model` when training finishes.
    training_args = TrainingArguments(
        output_dir=save_path,
        num_train_epochs=3,
        per_device_train_batch_size=8,
        per_device_eval_batch_size=8,
        learning_rate=5e-5,
        weight_decay=0.01,
        logging_strategy="epoch",
        save_strategy="epoch",
        load_best_model_at_end=True,
        **{eval_strategy_key: "epoch"},
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        data_collator=data_collator,
        compute_metrics=compute_metrics,
        **{tokenizer_key: tokenizer},
    )

    trainer.train()

    # Predict on the test split and turn the logits into label ids.
    predictions, labels, _ = trainer.predict(test_dataset)
    predictions = np.argmax(predictions, axis=-1)

    # Drop positions labelled -100 and map ids back to tag names.
    true_labels = [[label_names[l] for l in label if l != -100] for label in labels]
    true_predictions = [
        [label_names[p] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]

    # Collect one row per mislabelled word.
    # NOTE(review): `j` counts labelled positions and is used as a word index,
    # which assumes every word yields at least one token — confirm for the data.
    error_columns = ["index", "tokens", "token", "true_label", "predicted_label"]
    errors = []
    for i, (true_label, true_prediction) in enumerate(zip(true_labels, true_predictions)):
        sentence_tokens = test_data.iloc[i]["tokens"]  # fetched once per sentence
        for j, (label, prediction) in enumerate(zip(true_label, true_prediction)):
            if label != prediction:
                errors.append({
                    "index": i,
                    "tokens": sentence_tokens,
                    "token": sentence_tokens[j],
                    "true_label": label,
                    "predicted_label": prediction
                })

    # Passing the column names explicitly keeps the header row even when there
    # are no errors; otherwise the file would be empty and unreadable by
    # pd.read_csv.
    errors_df = pd.DataFrame(errors, columns=error_columns)
    errors_df.to_csv(f"{save_path}/errors.tsv", sep='\t', index=False)


In [None]:
# Write the model and the tokenizer to their own sub-folders of save_path.
for artifact, subfolder in ((model, "model/"), (tokenizer, "tokenizer/")):
    artifact.save_pretrained(save_path + subfolder)