In [None]:
import numpy as np
import pandas as pd
import torch
import torch as nn  # NOTE(review): this alias is the top-level torch package, not torch.nn
from datasets import load_metric, Dataset
from sklearn.metrics import f1_score
from transformers import (
    AutoTokenizer,
    TrainingArguments,
    Trainer,
    AutoModelForSequenceClassification
)

In [None]:
# Instantiate the tokenizer that belongs to the pretrained checkpoint.

# Checkpoint identifier; the same name is reused further down when the
# classification model is loaded, so tokenizer and weights stay in sync.
model_checkpoint = "allenai/scibert_scivocab_uncased"
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)

In [None]:
# tokenizing function

def preprocess_function_batch(examples):
    """Tokenize one batch of examples for sequence classification.

    Meant to be passed to ``Dataset.map(..., batched=True)``.

    Args:
        examples: Mapping with a ``"sentence"`` entry that holds the raw
            sentences of the batch.

    Returns:
        The tokenizer output for the batch, with every sentence truncated to
        at most 512 tokens and left unpadded.
    """
    # Padding is deliberately left to collation time: the Trainer in this
    # notebook is given the tokenizer and pads each mini-batch to its own
    # longest sentence. Padding here would instead pad to the longest sentence
    # of every map batch and store all of that padding in the dataset.
    # Tensors are not requested either, because `map` stores the values in the
    # dataset's own format regardless of the type returned here.
    return tokenizer(
        examples["sentence"],
        truncation=True,
        padding=False,
        max_length=512,
    )

In [None]:
# read in dataset

label2id = {"BACKGROUND": 0, "OBJECTIVE": 1, "METHODS": 2, "RESULTS": 3, "CONCLUSIONS": 4}


def read_labeled_sentences(path, label_map=None):
    """Read one tab-separated split file into a list of example dicts.

    Every non-blank line is split on tabs; the second field is looked up as
    the label name and the third field supplies the sentence text.

    Args:
        path: Path of the split file to read.
        label_map: Mapping from label name to integer id. Defaults to the
            notebook-level ``label2id``.

    Returns:
        A list of ``{"sentence": str, "label": int}`` dicts in file order.

    Raises:
        KeyError: If a line carries a label that is missing from ``label_map``.
        IndexError: If a non-blank line has fewer than three tab-separated fields.
    """
    if label_map is None:
        label_map = label2id

    examples = []
    # Explicit encoding so the result does not depend on the platform default.
    with open(path, encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                # Blank lines carry no example; skip them instead of failing
                # on the field lookup below.
                continue
            fields = line.split("\t")
            examples.append(
                {
                    # NOTE(review): [:-2] removes the last two characters of the
                    # line, i.e. the newline plus one more character -- confirm
                    # this matches the file format (a final line without a
                    # trailing newline would lose two real characters).
                    "sentence": fields[2][:-2],
                    "label": label_map[fields[1]],
                }
            )
    return examples


# The three splits share one format, so they go through the same reader.
train_data = read_labeled_sentences("data/train.txt")
eval_data = read_labeled_sentences("data/validation.txt")
test_data = read_labeled_sentences("data/test.txt")

# Peek at the first few parsed examples of each split.
print(train_data[:5])
print(eval_data[:5])
print(test_data[:5])

In [None]:
# create huggingface datasets

# Wrap each split's list of example dicts in a `Dataset`, keeping the
# train / validation / test order.
train_dataset, eval_dataset, test_dataset = [
    Dataset.from_list(split_rows)
    for split_rows in (train_data, eval_data, test_data)
]

# One summary per split, each on its own line(s).
print(train_dataset, eval_dataset, test_dataset, sep="\n")

In [None]:
# encode datasets

# Tokenize every split in batched mode with the shared preprocessing function.
train_encoded, eval_encoded, test_encoded = [
    split.map(preprocess_function_batch, batched=True)
    for split in (train_dataset, eval_dataset, test_dataset)
]

In [None]:
# rename to "labels" & calculate class weights

# Rename the target column to "labels", the key the custom loss below reads
# from the model inputs.
final_train = train_encoded.rename_column("label", "labels")
final_eval = eval_encoded.rename_column("label", "labels")
final_test = test_encoded.rename_column("label", "labels")

train_df = final_train.to_pandas()

# Weight of a class = 1 - (its share of the training examples), so rarer
# classes get larger weights.
# Reindexing over every known label id keeps position i aligned with class id
# i even when a class has no training example; value_counts() alone would omit
# that class and silently shift all later weights.
label_counts = (
    train_df["labels"]
    .value_counts()
    .reindex(sorted(label2id.values()), fill_value=0)
)
class_weights = (1 - label_counts / len(train_df)).values
class_weights

In [None]:
# weighted loss function (because of imbalanced classes)

class WeightedLossTrainer(Trainer):
    """Trainer whose loss is a class-weighted cross entropy.

    The per-class weights are read from the notebook-level ``class_weights``
    array (one entry per class id).
    """

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Compute the class-weighted cross-entropy loss for one batch.

        Args:
            model: The model being trained or evaluated.
            inputs: Batch mapping passed to the model; must contain "labels".
            return_outputs: When true, the model outputs are returned as well.
            num_items_in_batch: Accepted for compatibility with Trainer
                versions that pass it; not used here.

        Returns:
            ``loss`` or ``(loss, outputs)`` depending on ``return_outputs``.
        """
        outputs = model(**inputs)
        logits = outputs.get("logits")
        labels = inputs.get("labels")

        # The weights start out as a NumPy array; the loss needs a tensor that
        # lives on the same device and has the same dtype as the logits.
        weight = torch.as_tensor(class_weights, dtype=logits.dtype, device=logits.device)
        loss = torch.nn.functional.cross_entropy(logits, labels, weight=weight)

        return (loss, outputs) if return_outputs else loss


In [None]:
# use f1 score as metric

def compute_metrics(pred):
    """Score a batch of predictions with the weighted F1.

    Args:
        pred: Object exposing ``predictions`` (per-class scores, classes on
            the last axis) and ``label_ids`` (reference class ids).

    Returns:
        A one-entry dict ``{"f1": <weighted F1>}``.
    """
    # The predicted class is the highest-scoring entry along the last axis.
    predicted_classes = pred.predictions.argmax(axis=-1)
    weighted_f1 = f1_score(pred.label_ids, predicted_classes, average="weighted")
    return {"f1": weighted_f1}

In [None]:
# load model

# Inverse of label2id; both maps are stored in the model config so the
# fine-tuned model carries the class names along with the class ids.
id2label = {idx: name for name, idx in label2id.items()}

model = AutoModelForSequenceClassification.from_pretrained(
    model_checkpoint,
    num_labels=len(label2id),
    id2label=id2label,
    label2id=label2id,
    hidden_dropout_prob=0.25,
)

# Only request the GPU when one is actually present, so the cell also runs on
# CPU-only machines instead of raising.
model.to("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# specify output directory

# Directory handed to the training arguments as `output_dir`; it is also the
# root of the path the TensorBoard magic below points at.
model_output_dir = "scibert-finetuned-abstract-classification-h6"
print(model_output_dir)

# start TensorBoard before training to monitor progress

# IPython magics: load the TensorBoard extension, then launch it on the `runs`
# subfolder of the output directory (presumably where the training logs are
# written -- confirm against the Trainer's logging directory).
%load_ext tensorboard
%tensorboard --logdir '{model_output_dir}'/runs

In [None]:
# setup trainer arguments

# Step-based cadence: evaluate and log every 50 steps, checkpoint every 100.
step_schedule = dict(
    evaluation_strategy="steps",
    eval_steps=50,
    logging_strategy="steps",
    logging_steps=50,
    save_strategy="steps",
    save_steps=100,
)

# Optimizer, batch-size and duration settings.
optimization = dict(
    learning_rate=3e-5,
    weight_decay=0.05,
    warmup_steps=100,
    per_device_train_batch_size=32,
    per_device_eval_batch_size=32,
    num_train_epochs=10,
)

# Mixed precision (fp16=True) is intentionally left switched off.
# NOTE(review): no `metric_for_best_model` is given, so the library default
# decides which checkpoint counts as "best" -- confirm that this is the
# intended criterion rather than the F1 reported by compute_metrics.
args = TrainingArguments(
    output_dir=model_output_dir,
    load_best_model_at_end=True,
    report_to="tensorboard",
    **step_schedule,
    **optimization,
)

In [None]:
# setup trainer

# NOTE(review): this notebook defines WeightedLossTrainer, yet the stock
# Trainer is instantiated here, so the class-weighted loss is not applied
# during training -- confirm which one is intended.
trainer = Trainer(
    model=model,
    args=args,
    compute_metrics=compute_metrics,
    tokenizer=tokenizer,
    train_dataset=final_train,
    eval_dataset=final_eval,
)

# Fine-tune, then write the resulting model weights and config into a
# dedicated subfolder of the output directory.
trainer.train()
best_model_dir = model_output_dir + "/best_model"
model.save_pretrained(best_model_dir)

In [None]:
#check on test set

# Run the fine-tuned model over the held-out test split.
dataset_test_encoded = final_test
test_predictions = trainer.predict(dataset_test_encoded)

# Predicted class = highest-scoring entry per example.
test_predictions_argmax = np.argmax(test_predictions.predictions, axis=1)
test_references = np.array(final_test["labels"])

# Accuracy is the share of exact label matches. It is computed directly with
# NumPy instead of going through the deprecated `datasets.load_metric`, which
# has to fetch a metric script before it can compute the same number.
test_accuracy = float(np.mean(test_predictions_argmax == test_references))

# Compute accuracy & f1
print("Test Results:")
print("accuracy:", test_accuracy)
print("f1-score:", f1_score(test_references, test_predictions_argmax, average="weighted"))
