In [1]:
import sys
from pathlib import Path
ROOT_DIR = Path().resolve().parents[0]
sys.path.append(str(ROOT_DIR))
import config as cfg

from transformers import (
    AutoModelForSequenceClassification,
    TrainingArguments,
    Trainer,
    EarlyStoppingCallback
)
import numpy as np
from sklearn.metrics import roc_auc_score
from datasets import load_from_disk, Dataset
import os
import json
from typing import Optional, List, Dict

N_RUN = 4               # Run number: keeps the outputs of different experiments separate (used in the `run_{N_RUN}` checkpoint folder name)

# Function and class definitions

In [2]:
def get_fold_datasets(ds: Dataset, fold: int) -> tuple[Dataset, Dataset]:
    """
    Partition a fold-annotated dataset into its training and validation parts.

    Each row is looked up by its ``"fold"`` entry: rows whose value equals
    ``fold`` go to the validation part, every other row goes to the training part.

    Args:
        ds: The dataset to split.
        fold: The fold number held out for validation.
    Returns:
        A ``(ds_train, ds_val)`` tuple, where ``ds_train`` excludes the given
        fold and ``ds_val`` contains only the given fold.
    """
    def _outside_fold(row) -> bool:
        # Row belongs to any fold other than the held-out one
        return row["fold"] != fold

    def _inside_fold(row) -> bool:
        # Row belongs to the held-out fold
        return row["fold"] == fold

    training_part = ds.filter(_outside_fold)
    validation_part = ds.filter(_inside_fold)
    return training_part, validation_part

In [3]:
def model_init(
    model_name: str = cfg.MODEL_BASE,
    num_labels: int = 6,
) -> AutoModelForSequenceClassification:
    """Load a pre-trained checkpoint with a multi-label classification head.

    Args:
        model_name: Name (or path) of the pre-trained model to load.
        num_labels: Number of outputs of the classification head. Defaults to
            6, the value this notebook trains with.
    Returns:
        The model built by ``AutoModelForSequenceClassification.from_pretrained``
        (the concrete class depends on the checkpoint architecture), configured
        with ``problem_type="multi_label_classification"``.
    """
    return AutoModelForSequenceClassification.from_pretrained(
        model_name,
        num_labels=num_labels,
        problem_type="multi_label_classification",
    )

In [4]:
def compute_metrics(eval_pred: tuple[np.ndarray, np.ndarray]) -> dict[str, float]:
    """Compute the macro-averaged ROC AUC for the evaluation predictions.

    Args:
        eval_pred: A tuple containing the logits and labels.
    Returns:
        A dictionary with a single ``"roc_auc_macro"`` entry.
    """
    # Unpack the logits and labels from the evaluation predictions
    logits, labels = eval_pred
    # Work in float64 so low-precision logits cannot overflow in the sigmoid
    logits = np.asarray(logits, dtype=np.float64)
    # Numerically stable sigmoid: sigmoid(x) = exp(-log(1 + exp(-x))).
    # np.logaddexp(0, -x) evaluates log(1 + exp(-x)) without overflowing, unlike
    # the direct 1 / (1 + exp(-x)) form, which warns on large negative logits.
    probs = np.exp(-np.logaddexp(0.0, -logits))
    # Macro average: per-label ROC AUC scores averaged with equal weight
    auc = roc_auc_score(labels, probs, average="macro")
    return {"roc_auc_macro": auc}

In [5]:
def print_highlighted_box(text: str, width: int = 80) -> None:
    """
    Print ``text`` horizontally centred inside a box drawn with ``-`` and ``|``.

    Used mainly to mark the start of a new fold in the training process, which
    writes a lot of text to the console and makes the output hard to follow.

    Args:
        text: Message shown on the middle line of the box.
        width: Total width of the box in characters, borders included. If
            ``text`` does not fit (it needs ``len(text) + 2`` columns), the box
            is widened so that every printed line keeps the same length.
    """
    # Two columns are taken by the vertical borders. Widen the box instead of
    # letting an over-long message stick out past the right border.
    width = max(width, len(text) + 2)
    padding = width - 2 - len(text)
    # With an odd amount of padding the extra space goes to the right
    left_spaces = padding // 2
    right_spaces = padding - left_spaces

    horizontal_border = "-" * width
    empty_row = f"|{' ' * (width - 2)}|"
    print(horizontal_border)
    print(empty_row)
    print(f"|{' ' * left_spaces}{text}{' ' * right_spaces}|")
    print(empty_row)
    print(horizontal_border)

In [6]:
def make_training_args(cfg, checkpoint_dir: str) -> TrainingArguments:
    """
    Assemble the TrainingArguments used for one training run.

    The result depends only on the values read from `cfg` and on the given
    path; everything else is fixed inside this function.

    Args:
        - cfg: Configuration object containing training parameters.
        - checkpoint_dir: Directory where the model checkpoints will be saved.
    Returns:
        - A TrainingArguments instance with all necessary parameters set.
    """
    # Epochs and batch sizes
    schedule = {
        "num_train_epochs": cfg.EPOCHS,
        "per_device_train_batch_size": cfg.BATCH_SIZE,
        "per_device_eval_batch_size": cfg.BATCH_SIZE,
        "gradient_accumulation_steps": 1,
    }

    # Optimiser settings
    optimisation = {
        "learning_rate": cfg.LEARNING_RATE,
        "weight_decay": 0.01,
        "optim": "adamw_torch_fused",
    }

    # Evaluation cadence, checkpointing and best-model selection
    checkpointing = {
        "eval_strategy": "steps",
        "eval_steps": cfg.EVAL_STEPS,
        "save_strategy": "steps",
        "save_steps": cfg.SAVE_STEPS,
        "load_best_model_at_end": True,
        "metric_for_best_model": "eval_roc_auc_macro",
        "save_only_model": True,
        "save_total_limit": cfg.SAVE_TOTAL_LIMIT,
    }

    # Precision and data-loading settings
    hardware = {
        "fp16": True,
        "gradient_checkpointing": False,
        "dataloader_num_workers": 2,
        "dataloader_pin_memory": True,
    }

    # Logging, seeding and output location
    bookkeeping = {
        "logging_steps": cfg.LOGGING_STEPS,
        "seed": cfg.RANDOM_SEED,
        "output_dir": checkpoint_dir,
    }

    return TrainingArguments(
        **schedule,
        **optimisation,
        **checkpointing,
        **hardware,
        **bookkeeping,
    )

In [7]:
class TrainerWithTrainMetrics(Trainer):
    """
    Custom Trainer that reports metrics on the training dataset at every
    evaluation, in addition to the standard evaluation metrics.
    These metrics are used to monitor the training performance and plot
    training curves.
    """

    def evaluate(
        self,
        eval_dataset: Optional[Dataset] = None,
        ignore_keys: Optional[List[str]] = None,
        metric_key_prefix: str = "eval",
    ) -> Dict[str, float]:
        """
        Evaluate the model on the evaluation dataset and also on the training dataset.

        Args:
            - eval_dataset: The dataset to evaluate the model on. Forwarded
              unchanged to ``Trainer.evaluate``; if None, the base Trainer
              falls back to the ``eval_dataset`` it was constructed with.
            - ignore_keys: A list of keys to ignore in the evaluation.
            - metric_key_prefix: Prefix for the keys of the evaluation metrics.
              The training metrics always use the ``"train"`` prefix.
        Returns:
            - A dictionary containing the evaluation metrics, plus the
              ``train``-prefixed metrics when a training dataset is available.
        """
        # Validation metrics (what is usually returned by Trainer.evaluate)
        metrics = super().evaluate(
            eval_dataset=eval_dataset,
            ignore_keys=ignore_keys,
            metric_key_prefix=metric_key_prefix,
        )

        # Without a training dataset the second pass would receive
        # eval_dataset=None and re-score the validation data, reporting it
        # under misleading "train_*" keys. Skip it in that case.
        if self.train_dataset is None:
            return metrics

        # Training metrics (added in this custom Trainer)
        train_metrics = super().evaluate(
            eval_dataset=self.train_dataset,
            ignore_keys=ignore_keys,
            metric_key_prefix="train",
        )

        # Combine metrics
        metrics.update(train_metrics)
        return metrics

In [8]:
def make_trainer(
    cfg,
    model_init_fn,
    train_ds,
    val_ds,
    training_args: TrainingArguments,
) -> TrainerWithTrainMetrics:
    """
    Build the customised Trainer with metrics-over-train logic and
    early-stopping callback.

    The model is handed to the Trainer through its ``model_init`` hook rather
    than as an already-built instance. The Trainer applies
    ``training_args.seed`` before calling the hook, so randomly initialised
    weights (e.g. a freshly created classification head) are reproducible
    across runs. The trade-off is that the Trainer calls the hook again when
    training starts, so the checkpoint is loaded more than once.

    Args:
        cfg: Configuration object containing training parameters.
        model_init_fn: Function to initialize the model; called with
            ``cfg.MODEL_BASE`` as its only argument.
        train_ds: Training dataset.
        val_ds: Validation dataset.
        training_args: Training arguments for the Trainer.
    Returns:
        A TrainerWithTrainMetrics instance configured with the provided parameters.
    """
    def _build_model():
        # Zero-argument factory, as expected by the Trainer's `model_init` hook
        return model_init_fn(cfg.MODEL_BASE)

    return TrainerWithTrainMetrics(
        model_init     = _build_model,
        args           = training_args,
        train_dataset  = train_ds,
        eval_dataset   = val_ds,
        compute_metrics= compute_metrics,
        callbacks      = [
            EarlyStoppingCallback(
                early_stopping_patience=cfg.EARLY_STOP_PATIENCE
            )
        ],
    )

# Load dataset

In [9]:
# Load the tokenized training dataset from the path defined in the project config
ds_train_tokenized = load_from_disk(cfg.PATH_DS_TRAIN_TOKENIZED)
# Bare expression: shows the dataset summary (features, number of rows) as the cell output
ds_train_tokenized

Dataset({
    features: ['id', 'fold', 'labels', 'input_ids', 'token_type_ids', 'attention_mask'],
    num_rows: 159571
})

# Cross-Validation Training Loop

In [10]:
# Iterate over the number of folds defined in the configuration.
# This is the cross-validation loop: each fold gets its own training and
# validation datasets, and a separate model is trained for each fold.
for fold in range(cfg.N_FOLDS):

    # Build the paths for the model checkpoints and the final model dynamically,
    # based on the run and the fold
    path_checkpoint_dir = os.path.join(cfg.PATH_CHECKPOINTS, cfg.MODEL_BASE, f"run_{N_RUN}", f"fold_{fold}")
    path_model_final = os.path.join(path_checkpoint_dir, "model_final")

    # Create the fold directory up front: a path problem then surfaces before
    # training starts, and the log-history file written below always has a
    # directory to go into.
    os.makedirs(path_checkpoint_dir, exist_ok=True)

    # Get the training and validation datasets for the current fold
    ds_train, ds_val = get_fold_datasets(ds_train_tokenized, fold)

    # Print a highlighted box with the fold information
    message = (
        f"FOLD {fold}: TRAIN SIZE: {len(ds_train)} "
        f"({len(ds_train)/len(ds_train_tokenized):.2%}), VAL SIZE: {len(ds_val)} "
        f"({len(ds_val)/len(ds_train_tokenized):.2%})"
    )
    print_highlighted_box(text=message, width=80)

    # Initialize the training arguments
    args = make_training_args(cfg, path_checkpoint_dir)

    # Initialize the trainer with the model, training arguments, datasets, and metrics
    trainer = make_trainer(
        cfg=cfg, model_init_fn=model_init, train_ds=ds_train,
        val_ds=ds_val, training_args=args
    )

    # Train the model
    train_results = trainer.train()

    # Save the log history of this training run so it is easy to access later
    path_hist = os.path.join(path_checkpoint_dir, "log_history.json")
    with open(path_hist, "w", encoding="utf-8") as f:
        json.dump(trainer.state.log_history, f, indent=2)

    # Save the final model
    trainer.save_model(path_model_final)

--------------------------------------------------------------------------------
|                                                                              |
|        FOLD 0: TRAIN SIZE: 127656 (80.00%), VAL SIZE: 31915 (20.00%)         |
|                                                                              |
--------------------------------------------------------------------------------


  return torch._C._cuda_getDeviceCount() > 0
Some weights of DebertaV2ForSequenceClassification were not initialized from the model checkpoint at microsoft/deberta-v3-base and are newly initialized: ['classifier.bias', 'classifier.weight', 'pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


KeyboardInterrupt: 