# Import Libraries

In [None]:
from datasets import Dataset
from functools import partial
from itertools import combinations
import json
import matplotlib.pyplot as plt
import numpy as np
import os
from google.colab import drive
import pandas as pd
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, classification_report, confusion_matrix,cohen_kappa_score
from sklearn.model_selection import train_test_split
import seaborn as sns
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader
from transformers import Trainer, TrainingArguments, EarlyStoppingCallback, AutoTokenizer, AutoModelForSequenceClassification

In [None]:
# Mount Google Drive under /content/drive (this import only works in the Colab runtime).
from google.colab import drive
drive.mount('/content/drive')

# Dataset Preprocessing and Preparation

In [None]:
def prepare_dataloaders(dataframes, tokenizer, batch_size=128, max_length=76, num_workers=4, shuffle_train=True, label_column='label'):
    """
    Build tokenized Hugging Face Datasets and matching PyTorch DataLoaders, one per split.

    Args:
        dataframes (dict): Maps a split name (e.g. 'train', 'val', 'test') to a DataFrame that
                           has a 'transcription' text column and the label column.
        tokenizer: Hugging Face tokenizer used to encode the 'transcription' texts.
        batch_size (int): Batch size of every DataLoader.
        max_length (int): Length every sequence is padded/truncated to.
                          (Per the original author's note, 76 is the maximum text length in the dataset.)
        num_workers (int): Number of worker processes handed to each DataLoader.
        shuffle_train (bool): Shuffle flag applied to the 'train' split only; other splits are never shuffled.
        label_column (str): Name of the label column; it is exposed as 'labels' in the output datasets.

    Returns:
        datasets (dict): Split name -> tokenized Dataset formatted as torch tensors
                         ('input_ids', 'attention_mask', 'labels').
        dataloaders (dict): Split name -> DataLoader wrapping the corresponding dataset.
    """
    text_column = 'transcription'

    def encode_batch(examples):
        # Every text is padded or truncated to exactly max_length tokens.
        return tokenizer(
            examples[text_column],
            padding='max_length',
            truncation=True,
            max_length=max_length,
        )

    def build_dataset(frame):
        # Only the text and label columns are carried over into the Dataset.
        selected = frame[[text_column, label_column]].copy()
        raw = Dataset.from_pandas(selected)
        # batch_size=len(raw) makes the tokenizer see the whole split in a single call.
        encoded = raw.map(encode_batch, batched=True, batch_size=len(raw))
        encoded.set_format('torch', columns=['input_ids', 'attention_mask', label_column], output_all_columns=False)
        return encoded.rename_column(label_column, 'labels')

    datasets = {split: build_dataset(frame) for split, frame in dataframes.items()}

    dataloaders = {
        split: DataLoader(
            dataset,
            batch_size=batch_size,
            shuffle=shuffle_train if split == 'train' else False,
            num_workers=num_workers,
            pin_memory=True,
        )
        for split, dataset in datasets.items()
    }

    return datasets, dataloaders

# Training function

In [None]:
def compute_metrics(eval_pred):
    """
    Compute accuracy and weighted F1 from an evaluation prediction pair.

    Args:
        eval_pred: Pair that unpacks into (logits, labels).

    Returns:
        dict: {'accuracy': ..., 'eval_f1': ...}; predictions are the argmax over the last logits axis.
    """
    scores, references = eval_pred
    predictions = scores.argmax(-1)
    return {
        "accuracy": accuracy_score(references, predictions),
        "eval_f1": f1_score(references, predictions, average="weighted"),
    }

In [None]:
def train_hf_model(model, datasets, output_dir='./results',
                   num_labels=2, train_batch_size=16, eval_batch_size=32,
                   num_train_epochs=3, learning_rate=2e-5,
                   eval_strategy="epoch", save_strategy="epoch",
                   metric_for_best_model="eval_f1", greater_is_better=True,
                   early_stopping_patience=3, compute_metrics=compute_metrics):
    """
    Fine-tune a Hugging Face model with the Trainer API and evaluate it on the validation split.

    Args:
        model: Hugging Face model (e.g., AutoModelForSequenceClassification).
        datasets (dict): Hugging Face Datasets; the 'train' and 'val' entries are used here.
        output_dir (str): Directory handed to TrainingArguments for checkpoints and outputs.
        num_labels (int): Number of target classes. Accepted for API symmetry; not read by this function.
        train_batch_size (int): Per-device batch size during training.
        eval_batch_size (int): Per-device batch size during evaluation.
        num_train_epochs (int): Number of training epochs.
        learning_rate (float): Learning rate for the optimizer.
        eval_strategy (str): Evaluation strategy ("steps" or "epoch").
        save_strategy (str): Checkpoint saving strategy ("steps" or "epoch").
        metric_for_best_model (str): Metric used to pick the best checkpoint.
        greater_is_better (bool): Whether a larger metric value means a better model.
        early_stopping_patience (int): Patience passed to EarlyStoppingCallback.
        compute_metrics (callable): Function used to compute evaluation metrics.

    Returns:
        trainer: The Trainer instance after training.
        eval_results: Dictionary returned by trainer.evaluate().
    """
    hyperparameters = {
        'output_dir': output_dir,
        'per_device_train_batch_size': train_batch_size,
        'per_device_eval_batch_size': eval_batch_size,
        'num_train_epochs': num_train_epochs,
        'learning_rate': learning_rate,
        'eval_strategy': eval_strategy,
        'save_strategy': save_strategy,
        # The best checkpoint (by metric_for_best_model) is reloaded once training ends.
        'load_best_model_at_end': True,
        'metric_for_best_model': metric_for_best_model,
        'greater_is_better': greater_is_better,
    }
    training_args = TrainingArguments(**hyperparameters)
    early_stopping = EarlyStoppingCallback(early_stopping_patience=early_stopping_patience)

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=datasets['train'],
        eval_dataset=datasets['val'],
        compute_metrics=compute_metrics,
        callbacks=[early_stopping],
    )

    # Run the training loop, then score the model on the validation split.
    trainer.train()
    eval_results = trainer.evaluate()

    print(" Training completed successfully!")
    print(f" Evaluation results: {eval_results}")

    return trainer, eval_results

# Test function

In [None]:
def test_model(model, dataset_test, class_names=None, batch_size=32, average='macro'):

    """
    Runs model inference on the test dataset and computes evaluation metrics.
    Prints a summary, shows a confusion-matrix heatmap, and returns a dictionary
    containing metrics, predictions, and additional information.

    Args:
        model: Trained PyTorch model whose forward pass returns an object with a `.logits` attribute.
        dataset_test: PyTorch Dataset that must return 'input_ids', 'attention_mask', and 'labels'.
        class_names (list): List of class names (e.g., ['No Satire', 'Satire']). When given,
                            position k is taken as the name of label id k, and the confusion
                            matrix and report always cover every listed class, even one that
                            does not occur in the evaluated data. When None, the classes are
                            inferred from the data and named '0', '1', ...
        batch_size (int): Batch size used for the DataLoader.
        average (str): Averaging method for F1/Precision/Recall ('macro', 'weighted', etc.).

    Returns:
        results (dict): Dictionary containing metrics, confusion matrix, predictions, and per-class information.

    Raises:
        ValueError: If class_names is given and a true or predicted label id falls
                    outside range(len(class_names)).
    """

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    model.eval()

    dataloader_test = DataLoader(dataset_test, batch_size=batch_size, drop_last=False)

    all_logits = []
    all_labels = []

    with torch.no_grad():
        for batch in dataloader_test:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels']

            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            logits = outputs.logits

            # Accumulate on CPU so GPU memory is not held across batches.
            all_logits.append(logits.cpu())
            all_labels.append(labels.cpu())

    all_logits = torch.cat(all_logits, dim=0)
    true_labels = torch.cat(all_labels, dim=0).numpy()

    probabilities = F.softmax(all_logits, dim=1)
    predicted_labels = torch.argmax(probabilities, dim=1).numpy()

    # Metrics
    accuracy = accuracy_score(true_labels, predicted_labels)
    f1 = f1_score(true_labels, predicted_labels, average=average)
    precision = precision_score(true_labels, predicted_labels, average=average)
    recall = recall_score(true_labels, predicted_labels, average=average)
    kappa = cohen_kappa_score(true_labels, predicted_labels)

    # Fix the label set from class_names so the confusion matrix, the report and
    # the per-class counts stay aligned with the names even when a class is
    # missing from this particular dataset.
    if class_names is None:
        label_ids = None
    else:
        label_ids = list(range(len(class_names)))
        observed = np.union1d(true_labels, predicted_labels)
        unexpected = np.setdiff1d(observed, label_ids)
        if unexpected.size:
            raise ValueError(
                f"Label ids {unexpected.tolist()} have no entry in class_names "
                f"(expected ids 0..{len(class_names) - 1})."
            )

    # Confusion matrix
    cm = confusion_matrix(true_labels, predicted_labels, labels=label_ids)

    # Number of instances per class (row sums = true-label counts)
    if class_names is None:
        class_names = [str(i) for i in range(cm.shape[0])]
    num_per_class = dict(zip(class_names, np.sum(cm, axis=1)))

    # The report and the tick labels need strings; class names may be e.g. ints.
    display_names = [str(name) for name in class_names]

    # Print summary
    print("\n=== Test Results ===")
    print(f"Accuracy:  {accuracy:.4f}")
    print(f"F1 Score ({average}): {f1:.4f}")
    print(f"Precision ({average}): {precision:.4f}")
    print(f"Recall ({average}): {recall:.4f}")
    print(f"Cohen Kappa: {kappa:.4f}")
    print("\nClassification Report:")
    print(classification_report(true_labels, predicted_labels, labels=label_ids, target_names=display_names))

    plt.figure(figsize=(6, 6))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues",
                xticklabels=display_names, yticklabels=display_names)
    plt.xlabel("Predicted Label")
    plt.ylabel("True Label")
    plt.title("Confusion Matrix")
    plt.show()

    results = {
        "accuracy": accuracy,
        "f1": f1,
        "precision": precision,
        "recall": recall,
        "cohen_kappa": kappa,
        "true_labels": true_labels,
        "predicted_labels": predicted_labels,
        "probabilities": probabilities.numpy(),
        "confusion_matrix": cm,
        "num_per_class": num_per_class,
        "class_names": class_names
    }

    return results

# Save Results

In [None]:
def save_test_results(test_results, save_path):
    """
    Saves test results to a JSON file and numerical arrays to .npy files.
    Automatically handles non-serializable NumPy data types, including ones
    nested inside lists, tuples, sets and dicts.

    Args:
        test_results (dict): Results dictionary. The entries listed in `array_keys`
                             below are written as '<key>.npy'; every other entry goes
                             into 'test_results.json'.
        save_path (str): Output directory; created (with parents) if it does not exist.
    """

    os.makedirs(save_path, exist_ok=True)

    array_keys = [
        'true_labels',
        'predicted_labels',
        'probabilities',
        'confusion_matrix',
        'num_per_class'
    ]

    def make_serializable(obj):
        # Convert NumPy scalars/arrays to plain Python, recursing into containers.
        if isinstance(obj, np.bool_):
            return bool(obj)
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, dict):
            return {
                (key.item() if isinstance(key, np.generic) else key): make_serializable(value)
                for key, value in obj.items()
            }
        if isinstance(obj, (list, tuple, set)):
            return [make_serializable(item) for item in obj]
        return obj

    json_results = {k: make_serializable(v) for k, v in test_results.items() if k not in array_keys}

    json_file = os.path.join(save_path, 'test_results.json')
    with open(json_file, 'w', encoding='utf-8') as f:
        json.dump(json_results, f, indent=4)

    # NOTE(review): a non-array value under one of these keys (e.g. a dict) is
    # stored by np.save as a pickled object array, so reading it back needs
    # np.load(..., allow_pickle=True).
    for key in array_keys:
        if key in test_results:
            np.save(os.path.join(save_path, f"{key}.npy"), test_results[key])

    print(f"Results saved in: {save_path}")

# Main

In [None]:
# Ask PyTorch to release its unused cached CUDA memory before the training cells run.
torch.cuda.empty_cache()

In [None]:
# Global variables
base_path = "Add cross validation's path"
base_save_path = "Add saving reults's path"
num_splits = 10
batch_size = 128
model_name = 'dbmdz/bert-base-italian-cased'
num_labels = 2

tokenizer = AutoTokenizer.from_pretrained(model_name)

# Train, test and save results for each selected cross-validation fold.
# NOTE(review): only fold 5 is processed here although num_splits is 10 —
# presumably left over from resuming a run; confirm before widening the range.
for i in range(5, 6):
    print(f"\n=== Processing Cross {i} ===")
    split_path = os.path.join(base_path, f'Cross {i}')

    # CSV Files
    train_file = os.path.join(split_path, 'train.csv')
    val_file = os.path.join(split_path, 'validation.csv')
    test_file = os.path.join(split_path, 'test.csv')

    # Read the CSV
    df_train = pd.read_csv(train_file)
    df_val = pd.read_csv(val_file)
    df_test = pd.read_csv(test_file)

    # Label mapping (ids follow the sorted order of the training labels)
    labels = sorted(df_train['label'].unique().tolist())
    id2label = dict(enumerate(labels))
    label2id = {label: idx for idx, label in id2label.items()}
    print("id2label:", id2label)
    print("label2id:", label2id)

    # Map labels in dataframes. Series.map yields NaN for a label that is not in
    # label2id, which would silently turn the column into floats; fail fast instead.
    for split_name, frame in (('train', df_train), ('validation', df_val), ('test', df_test)):
        encoded = frame['label'].map(label2id)
        unmapped = encoded.isna()
        if unmapped.any():
            unseen = sorted({str(value) for value in frame.loc[unmapped, 'label'].unique()})
            raise ValueError(
                f"Cross {i}: the {split_name} split contains labels that cannot be mapped "
                f"with the training label set: {unseen}"
            )
        frame['label'] = encoded

    # Prepare dataset and dataloader
    dataframes = {'train': df_train, 'val': df_val, 'test': df_test}
    datasets, dataloaders = prepare_dataloaders(dataframes, tokenizer, batch_size=batch_size)

    # Model (a fresh pretrained model is loaded for every fold)
    model = AutoModelForSequenceClassification.from_pretrained(
        model_name,
        num_labels=num_labels
    )

    # Training
    output_dir = os.path.join(base_save_path, f'Cross {i}')
    trainer, eval_results = train_hf_model(
        model=model,
        datasets=datasets,
        output_dir=output_dir,
        num_labels=num_labels
    )

    # Test
    print(f"\n--- Test Cross {i} ---")
    test_results = test_model(
        model=model,
        dataset_test=datasets['test'],
        class_names=labels,
        batch_size=32
    )

    # Saving results (same directory as the training output for this fold)
    save_path = os.path.join(base_save_path, f'Cross {i}')
    save_test_results(test_results, save_path)

# Cross-validation evaluation with external dataset

In [None]:
# Global variables
base_path = "Add cross validation's path"
base_save_path = 'Add saving path'
num_splits = 10
batch_size = 128
model_name = 'dbmdz/bert-base-italian-cased'
num_labels = 2
i=1

tokenizer = AutoTokenizer.from_pretrained(model_name)


print(f"\n=== Processing ===")
split_path = os.path.join(base_path, f'Cross {i}')

# CSV files
train_file = os.path.join(split_path, 'train.csv')
val_file = os.path.join(split_path, 'validation.csv')
test_file = os.path.join(split_path, 'test.csv')

# Read CSV
df_train = pd.read_csv(train_file)
df_val = pd.read_csv(val_file)
df_test = pd.read_csv(test_file)

# Label mapping (ids follow the sorted order of the training labels)
labels = sorted(df_train['label'].unique().tolist())
id2label = dict(enumerate(labels))
label2id = {label: idx for idx, label in id2label.items()}
print("id2label:", id2label)
print("label2id:", label2id)

# Map label to dataframes. Series.map yields NaN for a label that is not in
# label2id, which would silently turn the column into floats; fail fast instead.
for split_name, frame in (('train', df_train), ('validation', df_val), ('test', df_test)):
    encoded = frame['label'].map(label2id)
    unmapped = encoded.isna()
    if unmapped.any():
        unseen = sorted({str(value) for value in frame.loc[unmapped, 'label'].unique()})
        raise ValueError(
            f"Cross {i}: the {split_name} split contains labels that cannot be mapped "
            f"with the training label set: {unseen}"
        )
    frame['label'] = encoded

# Pool the three splits of this fold, shuffle them, and re-split 80/20 into
# train/validation; no held-out test split is kept in this cell.
df_all= pd.concat([df_train, df_val, df_test])
df_all = df_all.sample(frac=1, random_state=42).reset_index(drop=True)

df_train, df_val = train_test_split(df_all, test_size=0.2, random_state=42)

# Prepare dataset and dataloader
dataframes = {'train': df_train, 'val': df_val}
datasets, dataloaders = prepare_dataloaders(dataframes, tokenizer, batch_size=batch_size)

# Model
model = AutoModelForSequenceClassification.from_pretrained(
    model_name,
    num_labels=num_labels
)

# Training
output_dir = os.path.join(base_save_path, f'Cross {i}')
trainer, eval_results = train_hf_model(
        model=model,
        datasets=datasets,
        output_dir=output_dir,
        num_labels=num_labels
)

In [None]:
# Evaluate the current `model` on the external dataset. This cell relies on
# names left by an earlier cell: label2id, labels, tokenizer, batch_size, model, i.
dataset_esterno_path ="Add external dataset's path"
base_save_path = "Add external dataset's result path"

# CSV file
external_file= os.path.join( dataset_esterno_path, 'dataset_2k.csv')

df_external = pd.read_csv(external_file)

# Series.map yields NaN for a label that is not in label2id, which would
# silently turn the column into floats; reject such labels explicitly.
encoded_labels = df_external['label'].map(label2id)
unmapped = encoded_labels.isna()
if unmapped.any():
    unseen = sorted({str(value) for value in df_external.loc[unmapped, 'label'].unique()})
    raise ValueError(
        f"The external dataset contains labels that cannot be mapped with label2id: {unseen}"
    )
df_external['label'] = encoded_labels

# Prepare dataset and dataloader
dataframes = {'external': df_external}
datasets, dataloaders = prepare_dataloaders(dataframes, tokenizer, batch_size=batch_size)

# Test
print(f"\n--- Test datasert external")
test_results = test_model(
        model=model,
        dataset_test=datasets['external'],
        class_names=labels,
        batch_size=32
    )

# Save results
save_path = os.path.join(base_save_path, f'Cross {i}')
save_test_results(test_results, save_path)