In [None]:
# Mount Google Drive at /content/drive; every data, checkpoint, log and model
# path used later in this notebook lives under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install evaluate
!pip install accelerate -U
!pip install --upgrade transformers

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
import numpy as np
import evaluate
import torch
from transformers import RobertaTokenizer, RobertaForSequenceClassification, Trainer, TrainingArguments, EarlyStoppingCallback, PretrainedConfig
from transformers.integrations import TensorBoardCallback
import matplotlib.pyplot as plt
from torch.utils.tensorboard import SummaryWriter
from torch.nn import BCEWithLogitsLoss

In [None]:
file_path = '/content/drive/MyDrive/moral_foundations_mlc_hf/data/df_sentence_corpus.csv'
df = pd.read_csv(file_path)

# The ten label columns, in the order used for the label matrix and for
# per-label reporting elsewhere in the notebook.
label_names = [
    'care_virtue', 'care_vice',
    'fairness_virtue', 'fairness_vice',
    'loyalty_virtue', 'loyalty_vice',
    'authority_virtue', 'authority_vice',
    'sanctity_virtue', 'sanctity_vice',
]

# Model inputs: one string per row; targets: one column per entry of label_names.
sentences = df['sentence'].astype(str).tolist()
labels = df[label_names].values

# Hold out 20% of the rows for validation with a fixed seed.
train_texts, val_texts, train_labels, val_labels = train_test_split(
    sentences,
    labels,
    test_size=0.2,
    random_state=666,
)

In [None]:
tokenizer = RobertaTokenizer.from_pretrained('roberta-base')

# Both splits are tokenized with the same options; each call pads its own
# split independently of the other.
encode_options = {'truncation': True, 'padding': True}
train_encodings = tokenizer(train_texts, **encode_options)
val_encodings = tokenizer(val_texts, **encode_options)

In [None]:
class Dataset(torch.utils.data.Dataset):
    """Map-style dataset pairing tokenizer encodings with label rows.

    Args:
        encodings: Mapping from field name (e.g. ``input_ids``) to a sequence
            indexable by example position.
        labels: Sequence of per-example label rows, indexable by position.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        """Number of examples, taken from the label sequence."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return example ``idx`` as a dict of tensors, including ``labels``."""
        sample = {}
        for field, column in self.encodings.items():
            sample[field] = torch.tensor(column[idx])
        # Labels are stored as int64; multi_label_loss casts them to float
        # before computing the loss.
        sample['labels'] = torch.tensor(self.labels[idx], dtype=torch.long)
        return sample

# Wrap each tokenized split and its label array in the Dataset class defined above.
train_dataset = Dataset(train_encodings, train_labels)
val_dataset = Dataset(val_encodings, val_labels)

In [None]:
# Size the classification head from label_names instead of a hard-coded 10 and
# store the index<->name mapping on the model config, so every checkpoint
# carries the label names rather than generic LABEL_<i> placeholders.
model = RobertaForSequenceClassification.from_pretrained(
    'roberta-base',
    num_labels=len(label_names),
    id2label=dict(enumerate(label_names)),
    label2id={name: index for index, name in enumerate(label_names)},
    problem_type="multi_label_classification",
)

In [None]:
training_args = TrainingArguments(
    output_dir='/content/drive/MyDrive/moral_foundations_mlc_hf/results',
    num_train_epochs=10,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    warmup_steps=500,
    weight_decay=0.1,
    logging_dir='/content/drive/MyDrive/moral_foundations_mlc_hf/logs',
    logging_steps=100,
    # `eval_strategy` is the current name of the deprecated `evaluation_strategy`
    # argument; this notebook upgrades transformers to the latest release.
    eval_strategy="steps",
    # Evaluation and checkpointing share the same cadence so that the best
    # checkpoint can be restored at the end of training.
    eval_steps=500,
    save_steps=500,
    load_best_model_at_end=True,
    metric_for_best_model="eval_loss",
    greater_is_better=False,
    # Mixed precision needs a CUDA device; fall back to fp32 on a CPU runtime
    # instead of failing when the arguments are constructed.
    fp16=torch.cuda.is_available(),
    # label_smoothing_factor is intentionally not set: the compute_loss function
    # bound to the trainer computes BCE-with-logits itself, so the Trainer's
    # label smoother was never applied, and it targets single-label
    # cross-entropy rather than multi-hot targets.
)

# Early-stopping callback configured with a patience of 3; it is passed to the
# Trainer's callback list below.
early_stopping = EarlyStoppingCallback(early_stopping_patience=3)

# Metric objects are loaded once here and reused by compute_metrics on every
# evaluation round.
accuracy_metric = evaluate.load("accuracy")
f1_metric = evaluate.load("f1")
precision_metric = evaluate.load("precision")
recall_metric = evaluate.load("recall")

def compute_metrics(eval_pred, threshold=0.666):
    """Compute overall and per-label metrics for multi-label predictions.

    Logits are passed through a sigmoid and binarised at ``threshold``.
    "Overall" metrics treat every (example, label) cell of the flattened
    prediction matrix as one binary decision and use weighted averaging;
    per-label metrics score each column of ``label_names`` separately with
    binary averaging.

    Args:
        eval_pred: Pair ``(logits, labels)``; both are expected to be arrays of
            shape ``(n_examples, n_labels)``. If ``logits`` is a tuple, its
            first element is used.
        threshold: Probability above which a label counts as predicted.
            Defaults to 0.666, the value previously hard-coded here; use
            ``functools.partial`` to hand the Trainer a different cut-off.

    Returns:
        dict with ``overall_accuracy``, ``overall_precision``,
        ``overall_recall``, ``overall_f1`` and ``per_class_metrics``, the last
        mapping each label name to its accuracy/precision/recall/f1.
    """
    logits, labels = eval_pred
    # Predictions may arrive as a tuple of model outputs; logits come first.
    if isinstance(logits, tuple):
        logits = logits[0]
    # The metric objects work on integer class ids, so normalise the label
    # dtype in case the labels were stored as floats.
    labels = np.asarray(labels).astype(int)

    probabilities = torch.sigmoid(torch.tensor(logits)).numpy()
    predictions = (probabilities > threshold).astype(int)

    def _score(preds, refs, average):
        """Accuracy, precision, recall and F1 for one set of predictions."""
        return {
            'accuracy': accuracy_metric.compute(predictions=preds, references=refs)["accuracy"],
            'precision': precision_metric.compute(predictions=preds, references=refs, average=average)["precision"],
            'recall': recall_metric.compute(predictions=preds, references=refs, average=average)["recall"],
            'f1': f1_metric.compute(predictions=preds, references=refs, average=average)["f1"],
        }

    overall = _score(predictions.flatten(), labels.flatten(), "weighted")

    per_class_metrics = {
        label_name: _score(predictions[:, i], labels[:, i], "binary")
        for i, label_name in enumerate(label_names)
    }

    return {
        'overall_accuracy': overall['accuracy'],
        'overall_precision': overall['precision'],
        'overall_recall': overall['recall'],
        'overall_f1': overall['f1'],
        # Nested on purpose: PlottingCallback and the reporting cell read this
        # dict under the 'eval_per_class_metrics' key.
        'per_class_metrics': per_class_metrics,
    }

def multi_label_loss(outputs, labels):
    """Binary cross-entropy with logits between model outputs and labels.

    Args:
        outputs: Object exposing a ``logits`` tensor.
        labels: Tensor of targets; cast to float before the loss is computed.

    Returns:
        The loss tensor produced by ``BCEWithLogitsLoss`` with its default
        settings.
    """
    targets = labels.float()
    criterion = BCEWithLogitsLoss()
    return criterion(outputs.logits, targets)

class PlottingCallback(TensorBoardCallback):
    """TensorBoard callback that also tracks loss curves and per-label metrics.

    On every log event it records training/validation loss together with the
    global step and writes each per-label metric found under
    ``eval_per_class_metrics`` as a ``<label>/<metric>`` scalar. When training
    ends it saves a plot of both loss curves.

    Args:
        tb_writer: Writer whose ``add_scalar`` receives the per-label metrics.
        plot_path: Destination of the loss plot. Defaults to the location that
            was previously hard-coded in ``on_train_end``.
    """

    def __init__(self, tb_writer,
                 plot_path='/content/drive/MyDrive/moral_foundations_mlc_hf/plots/loss_plot.png'):
        super().__init__()
        # Assigned after super().__init__() so the writer passed in is the one
        # stored on the instance.
        self.tb_writer = tb_writer
        self.plot_path = plot_path
        self.train_losses = []
        self.eval_losses = []
        self.train_steps = []
        self.eval_steps = []

    def on_log(self, args, state, control, logs=None, **kwargs):
        """Record losses and forward per-label metrics to the writer."""
        # The signature allows logs=None, which the membership tests below
        # would reject; treat it as "nothing logged".
        if logs is None:
            logs = {}
        super().on_log(args, state, control, logs, **kwargs)

        step = state.global_step
        if "loss" in logs:
            self.train_losses.append(logs["loss"])
            self.train_steps.append(step)
        if "eval_loss" in logs:
            self.eval_losses.append(logs["eval_loss"])
            self.eval_steps.append(step)

        # compute_metrics returns a nested dict for the per-label metrics;
        # write each leaf value as its own scalar.
        per_class = logs.get('eval_per_class_metrics')
        if per_class:
            for label, metrics in per_class.items():
                for metric_name, value in metrics.items():
                    self.tb_writer.add_scalar(f'{label}/{metric_name}', value, step)

    def on_train_end(self, args, state, control, **kwargs):
        """Save the training/validation loss plot to ``plot_path``."""
        import os  # local import: the notebook only imports os in a later cell

        # The target directory is not created anywhere else; without this the
        # save fails with FileNotFoundError once training has already finished.
        plot_dir = os.path.dirname(self.plot_path)
        if plot_dir:
            os.makedirs(plot_dir, exist_ok=True)

        fig, ax = plt.subplots(figsize=(10, 5))
        ax.plot(self.train_steps, self.train_losses, label="Training Loss")
        ax.plot(self.eval_steps, self.eval_losses, label="Validation Loss")
        ax.set_xlabel("Steps")
        ax.set_ylabel("Loss")
        ax.set_title("Training and Validation Loss")
        ax.legend()
        fig.savefig(self.plot_path)
        plt.close(fig)

# Writer handed to PlottingCallback, which logs the per-label scalars through it.
tb_writer = SummaryWriter('/content/drive/MyDrive/moral_foundations_mlc_hf/tensorboard_logs')

# Trainer wiring: the model, the training arguments, both dataset splits, the
# metric function and the two callbacks defined above.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    compute_metrics=compute_metrics,
    callbacks=[PlottingCallback(tb_writer), early_stopping],
)

def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
    """Trainer loss hook computing BCE-with-logits via ``multi_label_loss``.

    Args:
        self: The Trainer this function is bound to.
        model: Model to run the forward pass on.
        inputs: Batch dict; its ``labels`` entry is removed before the forward
            pass so the loss is computed here rather than inside the model.
        return_outputs: When true, return ``(loss, outputs)`` instead of the
            loss alone.
        num_items_in_batch: Accepted because recent transformers releases pass
            this keyword from the training step; without it the call raises
            TypeError. It is not used, since the loss is a plain mean.

    Returns:
        The loss tensor, or ``(loss, outputs)`` if ``return_outputs`` is true.
    """
    labels = inputs.pop("labels")
    outputs = model(**inputs)
    loss = multi_label_loss(outputs, labels)
    return (loss, outputs) if return_outputs else loss

# Bind the function to this trainer instance so it replaces the default loss computation.
trainer.compute_loss = compute_loss.__get__(trainer)

class CustomConfig(PretrainedConfig):
    """PretrainedConfig subclass that carries this notebook's label names.

    Declares ``model_type`` as "roberta". After the base initialiser has run,
    ``num_labels`` is set to ``len(label_names)`` and ``label_names`` is stored
    on the instance; ``label_names`` is read from module scope at
    instantiation time, so any ``num_labels`` passed in ``kwargs`` is
    overridden.
    """
    model_type = "roberta"

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.num_labels = len(label_names)
        self.label_names = label_names

# Build a CustomConfig from the 'roberta-base' configuration. The explicit
# assignment repeats what CustomConfig.__init__ already sets.
config = CustomConfig.from_pretrained('roberta-base')
config.label_names = label_names

In [None]:
# Run fine-tuning with the Trainer configured above.
trainer.train()

In [None]:
# Evaluate on the Trainer's eval_dataset (the validation split) and keep the
# returned metrics for the reporting cell.
evaluation = trainer.evaluate()

In [None]:
# Headline numbers first, one per line.
overall_report = [
    "Overall metrics:",
    f"Accuracy: {evaluation['eval_overall_accuracy']:.4f}",
    f"Precision: {evaluation['eval_overall_precision']:.4f}",
    f"Recall: {evaluation['eval_overall_recall']:.4f}",
    f"F1-score: {evaluation['eval_overall_f1']:.4f}",
]
print("\n".join(overall_report))

# Then one indented block per label, in the order compute_metrics produced them.
print("\nPer-class metrics:")
for class_name, class_scores in evaluation['eval_per_class_metrics'].items():
    print(f"\n{class_name}:")
    for score_name, score in class_scores.items():
        print(f"  {score_name}: {score:.4f}")

In [None]:
import os
import json

model_save_path = "/content/drive/MyDrive/moral_foundations_mlc_hf/MoralFoundationsClassifier"
os.makedirs(model_save_path, exist_ok=True)

# Record the label vocabulary on the trained model's own config before saving.
# Previously a separate config built from plain 'roberta-base' was written into
# the same directory afterwards, replacing the trained model's config.json and
# dropping its problem_type="multi_label_classification" setting.
model.config.id2label = dict(enumerate(label_names))
model.config.label2id = {name: index for index, name in enumerate(label_names)}
# Kept so config.json still exposes the `label_names` key it carried before.
model.config.label_names = label_names

model.save_pretrained(model_save_path)
tokenizer.save_pretrained(model_save_path)

# Raw state dict kept alongside the save_pretrained output for consumers that
# load weights with torch.load directly.
torch.save(model.state_dict(), os.path.join(model_save_path, "pytorch_model.bin"))

# Plain JSON copy of the label order for tooling that does not read the config.
with open(os.path.join(model_save_path, "label_names.json"), 'w', encoding='utf-8') as f:
    json.dump(label_names, f)

In [None]:
# Close the SummaryWriter created for PlottingCallback now that training and
# evaluation have finished.
tb_writer.close()