In [None]:
from transformers import (
    AutoTokenizer, 
    AutoModelForSequenceClassification, 
    TrainingArguments, 
    Trainer
)
from transformers import EarlyStoppingCallback
from sklearn.metrics import classification_report
from sklearn.utils.class_weight import compute_class_weight
from datasets import Dataset
import torch
import pandas as pd
import numpy as np
import os

In [None]:
# === 1. Load the OLID data ===
# The TSV is read without a header line (header=None) and its five columns are
# then named explicitly.
# NOTE(review): if the file does ship with a header row, that row is read as
# data here — confirm against the dataset file.
df = pd.read_csv(
    "../datasets/training-v1/offenseval-training-v1.tsv",
    sep="\t",
    header=None,
).set_axis(["id", "text", "label_A", "label_B", "label_C"], axis=1)

# Encode the string labels of every subtask as integer class ids
def encode_labels(df):
    """Return a copy of ``df`` with a numeric ``*_enc`` column per subtask.

    Args:
        df: DataFrame holding the string columns ``label_A``, ``label_B``
            and ``label_C``.

    Returns:
        A new DataFrame (the input is not modified) with the extra columns
        ``label_A_enc``, ``label_B_enc`` and ``label_C_enc``. Values that
        are not listed in the corresponding mapping come out as NaN.
    """
    label_maps = {
        "label_A": {"NOT": 0, "OFF": 1},
        "label_B": {"UNT": 0, "TIN": 1},
        "label_C": {"IND": 0, "GRP": 1, "OTH": 2},
    }

    encoded = df.copy()
    for source_column, mapping in label_maps.items():
        encoded[f"{source_column}_enc"] = encoded[source_column].map(mapping)
    return encoded

# Add the numeric label_*_enc columns that prepare_dataset() reads for each task.
df = encode_labels(df)

In [None]:
# Pretrained checkpoint used for each subtask.
MODEL_NAMES = dict(
    A="roberta-base",     # Task A: general model
    B="GroNLP/hateBERT",  # Task B: fine-tuned with loss weighting
    C="GroNLP/hateBERT",  # Task C: basic hateBERT
)

# Number of output classes of each subtask's classification head.
NUM_LABELS = dict(A=2, B=2, C=3)

# === 3. Prepare the HuggingFace datasets ===

In [None]:
# === Prepare dataset ===
def prepare_dataset(df, task):
    """Build the tokenized train/test split for one subtask.

    Row selection per task:
      * "A": every row that has an encoded A label.
      * "B": rows whose ``label_A`` is "OFF" and that have an encoded B label.
      * "C": rows whose ``label_A`` is "OFF" and ``label_B`` is "TIN" and that
        have an encoded C label.

    Args:
        df: DataFrame with ``text``, the raw ``label_*`` columns and the
            ``label_*_enc`` columns.
        task: One of "A", "B" or "C".

    Returns:
        A tuple ``(splits, labels)`` where ``splits`` is the result of
        ``Dataset.train_test_split(test_size=0.2, seed=42)`` and ``labels`` is
        the list of integer labels of ALL selected rows (i.e. before the
        split, so it covers both the train and the test part).

    Raises:
        ValueError: If ``task`` is not "A", "B" or "C", or if no usable row
            remains for the task.
    """
    if task == "A":
        candidates = df
    elif task == "B":
        candidates = df[df["label_A"] == "OFF"]
    elif task == "C":
        candidates = df[(df["label_A"] == "OFF") & (df["label_B"] == "TIN")]
    else:
        # Without this guard an unknown task only failed later with an
        # UnboundLocalError on df_task.
        raise ValueError(f"Unknown task {task!r}; expected 'A', 'B' or 'C'.")

    label_column = f"label_{task}_enc"
    # Rows without a label cannot be trained on, and rows without text cannot
    # be tokenized.
    df_task = candidates.dropna(subset=[label_column, "text"])
    if df_task.empty:
        raise ValueError(f"No labelled rows available for task {task!r}.")

    # The encoded columns are float whenever they contain NaN; cast once so
    # both the dataset and the returned list hold plain Python ints.
    labels = df_task[label_column].astype(int).tolist()
    texts = df_task["text"].tolist()

    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAMES[task], use_fast=True)
    # padding=True pads every example to the longest sequence of the task.
    encodings = tokenizer(texts, truncation=True, padding=True)

    dataset = Dataset.from_dict({
        "input_ids": encodings["input_ids"],
        "attention_mask": encodings["attention_mask"],
        "labels": labels
    })

    return dataset.train_test_split(test_size=0.2, seed=42), labels

In [None]:
# === Class weights for the weighted loss ===
def compute_class_weights(labels, num_labels, task=None):
    """Compute per-class loss weights as a float tensor.

    Args:
        labels: Label values the weights are derived from.
        num_labels: Number of classes; weights are produced for the class
            ids ``0 .. num_labels - 1``.
        task: Optional task id. For "C" the weights of class 1 (GRP) and
            class 2 (OTH) are additionally scaled by 2.0 and 3.0.

    Returns:
        A ``torch.float`` tensor with one weight per class id.
    """
    class_ids = np.arange(num_labels)
    weights = compute_class_weight(class_weight='balanced', classes=class_ids, y=labels)

    # Optional extra emphasis for Task C: class index -> multiplier.
    if task == "C":
        boosts = {
            1: 2.0,  # GRP
            2: 3.0,  # OTH
        }
        for class_index, factor in boosts.items():
            weights[class_index] *= factor

    return torch.tensor(weights, dtype=torch.float)

In [None]:
class WeightedFocalLossTrainer(Trainer):
    """Trainer whose loss is a class-weighted focal loss.

    Per sample the loss is ``w[y] * (1 - p_t) ** gamma * CE`` where ``CE`` is
    the unweighted cross-entropy, ``p_t = exp(-CE)`` is the probability the
    model assigns to the true class and ``w[y]`` is the optional class weight
    of the true class. The batch loss is the mean over samples.

    Args:
        class_weights: Optional 1-D tensor with one weight per class; ``None``
            disables class weighting.
        gamma: Focusing exponent (keyword-only, defaults to 2.0).
        *args, **kwargs: Forwarded unchanged to ``Trainer.__init__``.
    """

    def __init__(self, class_weights=None, *args, gamma=2.0, **kwargs):
        super().__init__(*args, **kwargs)
        self.class_weights = class_weights.to(self.args.device) if class_weights is not None else None
        self.gamma = gamma

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Run the model on ``inputs`` and return the focal loss.

        ``inputs["labels"]`` is removed from ``inputs`` before the forward
        pass. Extra keyword arguments are accepted and ignored. Returns
        ``(loss, outputs)`` when ``return_outputs`` is true, else ``loss``.
        """
        labels = inputs.pop("labels").long()
        outputs = model(**inputs)
        logits = outputs.logits

        # The focusing term must be computed from the UNWEIGHTED cross-entropy:
        # exp(-CE) equals the true-class probability only when no class weight
        # has been folded into CE. (Passing `weight=` here made
        # pt = p_t ** w instead of p_t.)
        ce_loss = torch.nn.functional.cross_entropy(logits.float(), labels, reduction='none')
        pt = torch.exp(-ce_loss)
        per_sample = (1 - pt) ** self.gamma * ce_loss

        # Class weighting is applied afterwards as the per-sample alpha factor.
        if self.class_weights is not None:
            alpha = self.class_weights.to(device=logits.device, dtype=per_sample.dtype)
            per_sample = alpha[labels] * per_sample

        focal_loss = per_sample.mean()

        return (focal_loss, outputs) if return_outputs else focal_loss



In [None]:
# === Evaluation metrics ===
def compute_metrics(eval_pred):
    """Turn an evaluation result into the metrics reported during training.

    Args:
        eval_pred: A ``(predictions, labels)`` pair; the predicted class is
            the argmax of ``predictions`` along axis 1.

    Returns:
        A dict with ``"f1"`` (the weighted-average F1 score) and
        ``"accuracy"``, both taken from sklearn's classification report.
    """
    scores, gold = eval_pred
    predicted = np.argmax(scores, axis=1)

    # zero_division=0 scores classes that were never predicted as 0.
    summary = classification_report(gold, predicted, output_dict=True, zero_division=0)
    weighted_f1 = summary["weighted avg"]["f1-score"]

    return dict(f1=weighted_f1, accuracy=summary["accuracy"])


In [None]:
# === 5. Train one model per task ===
def _latest_checkpoint(output_dir):
    """Return the highest-step ``checkpoint-<step>`` directory in ``output_dir``, or None."""
    best_step, best_path = -1, None
    for entry in os.listdir(output_dir):
        prefix, _, step = entry.rpartition("-")
        # Skip anything that is not exactly "checkpoint-<digits>" so stray
        # files or folders cannot break the numeric comparison.
        if prefix != "checkpoint" or not (step.isascii() and step.isdigit()):
            continue
        path = os.path.join(output_dir, entry)
        if os.path.isdir(path) and int(step) > best_step:
            best_step, best_path = int(step), path
    return best_path


def train_task(task, resume=True):
    """Fine-tune and save the classifier for one subtask.

    The pretrained checkpoint and the number of classes come from
    ``MODEL_NAMES`` and ``NUM_LABELS``. Task "B" is trained with
    ``WeightedFocalLossTrainer`` (class weights, step-based evaluation and
    saving every 500 steps, early stopping with patience 2); the other tasks
    use the plain ``Trainer`` with per-epoch evaluation and saving. The best
    model according to the "f1" metric is reloaded at the end and saved to
    ``./best_model_<task>_<model>``.

    Args:
        task: One of the keys of ``MODEL_NAMES`` ("A", "B" or "C").
        resume: When true and the task's output directory contains a
            ``checkpoint-<step>`` folder, continue from the latest one.

    Raises:
        ValueError: If ``task`` is not a key of ``MODEL_NAMES``.
    """
    if task not in MODEL_NAMES:
        raise ValueError(f"Unknown task {task!r}; expected one of {sorted(MODEL_NAMES)}.")

    print(f"\n🧠 Starting training for Task {task}")

    # Single source of truth for the checkpoint name (same table that
    # prepare_dataset uses for its tokenizer).
    model_name = MODEL_NAMES[task]
    model_tag = model_name.split('/')[-1]

    tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True)
    dataset, labels = prepare_dataset(df, task)

    output_dir = f"./results_{task}_{model_tag}"
    logging_dir = f"./logs_{task}_{model_tag}"

    checkpoint_path = None
    if resume and os.path.isdir(output_dir):
        checkpoint_path = _latest_checkpoint(output_dir)
        if checkpoint_path:
            print(f"🔁 Resuming from checkpoint: {checkpoint_path}")
        else:
            print("⚠️ No checkpoint found — starting from scratch.")
    else:
        print("🆕 Starting training from scratch.")

    model = AutoModelForSequenceClassification.from_pretrained(
        checkpoint_path if checkpoint_path else model_name,
        num_labels=NUM_LABELS[task]
    ).to("cuda" if torch.cuda.is_available() else "cpu")

    use_focal_loss = task == "B"

    # Only pass the step intervals when the step strategy is actually used,
    # instead of handing explicit None values to TrainingArguments.
    if use_focal_loss:
        schedule_kwargs = dict(
            save_strategy="steps",
            save_steps=500,
            eval_strategy="steps",
            eval_steps=500,
        )
    else:
        schedule_kwargs = dict(save_strategy="epoch", eval_strategy="epoch")

    training_args = TrainingArguments(
        output_dir=output_dir,
        logging_dir=logging_dir,
        num_train_epochs=4,
        per_device_train_batch_size=8,
        per_device_eval_batch_size=16,
        logging_steps=100,
        learning_rate=2e-5,
        weight_decay=0.01,
        load_best_model_at_end=True,
        metric_for_best_model="f1",
        # NOTE(review): Task B trains with WeightedFocalLossTrainer, whose
        # compute_loss override computes the loss itself; this smoothing
        # factor therefore presumably has no effect there — TODO confirm.
        label_smoothing_factor=0.1 if use_focal_loss else 0.0,
        save_total_limit=2,
        report_to="none",
        logging_first_step=True,
        disable_tqdm=False,
        greater_is_better=True,
        seed=42,
        **schedule_kwargs
    )

    # Arguments shared by both trainer flavours. The tokenizer is passed for
    # every task so all three trainers are configured the same way.
    trainer_kwargs = dict(
        model=model,
        args=training_args,
        train_dataset=dataset["train"],
        eval_dataset=dataset["test"],
        tokenizer=tokenizer,
        compute_metrics=compute_metrics,
    )

    if use_focal_loss:
        class_weights = compute_class_weights(labels, NUM_LABELS[task], task=task)
        trainer = WeightedFocalLossTrainer(
            class_weights=class_weights,
            callbacks=[EarlyStoppingCallback(early_stopping_patience=2)],
            **trainer_kwargs
        )
    else:
        trainer = Trainer(**trainer_kwargs)

    # checkpoint_path is None when training starts from scratch.
    trainer.train(resume_from_checkpoint=checkpoint_path)
    trainer.save_model(f"./best_model_{task}_{model_tag}")
    print(f"✅ Task {task} training complete.")

In [None]:
# === 6. Run the three trainings ===
# Each task is trained from scratch: existing checkpoints are not resumed.
for task in ("A", "B", "C"):
    train_task(task=task, resume=False)