In [1]:
# Suppress unnecessary warnings and set verbosity for Transformers
import warnings
import transformers
transformers.logging.set_verbosity_error()
warnings.filterwarnings("ignore")

# PyTorch core libraries
import torch
from torch import nn, Tensor
from torch.nn.functional import softmax
from torch.utils.data import Dataset, DataLoader
from torchmetrics import Accuracy

# Transformers and Datasets
from transformers import CamembertModel, CamembertTokenizer, CamembertConfig
from datasets import load_dataset

# PyTorch Lightning and Metrics
import pytorch_lightning as pl
from torchmetrics import Accuracy
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning.loggers import TensorBoardLogger

# Visualization and DataFrame utilities
import matplotlib.pyplot as plt
import pandas as pd

In [2]:
tokenizer = CamembertTokenizer.from_pretrained('camembert-base')

In [3]:
class XNLIDataset(Dataset):
    def __init__(self, cache_directory, split="train", language="fr", tokenizer=tokenizer, max_length=64):
        """
        PyTorch-compatible dataset for the XNLI dataset.

        Args:
            split (str): Data split to load ("train", "test", or "validation").
            language (str): Target language for the dataset.
            cache_directory (str): Directory to cache the downloaded dataset.
            max_length (int): Maximum sequence length for padding/truncation.
        """
        super(XNLIDataset, self).__init__()
        self.split = split
        self.language = language
        self.cache_directory = cache_directory
        self.max_length = max_length

        # Load the data and the tokenizer
        self.data = load_dataset(
            "facebook/xnli",
            name=self.language,
            cache_dir=self.cache_directory
        )[self.split]  # Load the specified data split

        self.tokenizer = tokenizer  # CamembertTokenizer.from_pretrained("camembert-base")

    def __len__(self):
        """Returns the size of the dataset."""
        return len(self.data)

    def __getitem__(self, idx):
        """
        Retrieve a specific sample from the dataset.

        Args:
            idx (int): Index of the sample.

        Returns:
            dict: Contains `input_ids`, `attention_mask`, and `label`.
        """
        example = self.data[idx]
        inputs = self.tokenizer(
            example["premise"],
            example["hypothesis"],
            max_length=self.max_length,
            truncation=True,
            padding="max_length",
            return_tensors="pt"
        )

        # Add the labels
        inputs = {key: val.squeeze(0) for key, val in inputs.items()}  # Remove batch dimension
        inputs["label"] = torch.tensor(example["label"], dtype=torch.long)

        return inputs

In [4]:
dataset = load_dataset("facebook/xnli", name='fr', cache_dir="../../../data/xnli")

# Display some examples from the dataset
print(f"Premise : {dataset['validation'][2]['premise']}")
print(f"Hypothesis : {dataset['validation'][2]['hypothesis']}")
print(f"Label : {dataset['validation'][2]['label']} (entailment)")

Using the latest cached version of the dataset since facebook/xnli couldn't be found on the Hugging Face Hub
Found the latest cached dataset configuration 'fr' at ..\..\..\data\xnli\facebook___xnli\fr\0.0.0\b8dd5d7af51114dbda02c0e3f6133f332186418e (last modified on Wed Dec  4 15:17:37 2024).


Premise : Et il a dit, maman, je suis à la maison.
Hypothesis : Il a dit à sa mère qu'il était rentré.
Label : 0 (entailment)


In [26]:
data_path = "data/xnli"

xnli_train_dataset = XNLIDataset(split="train", language="fr", cache_directory=data_path, max_length=64)
xnli_val_dataset = XNLIDataset(split="validation", language="fr", cache_directory=data_path, max_length=64)

train_loader = DataLoader(xnli_train_dataset, batch_size=256, shuffle=True)
val_loader = DataLoader(xnli_val_dataset, batch_size=256, shuffle=False)


batch = next(iter(val_loader))
print(f"Batch shape: {batch['input_ids'].shape}")
print(f"Token IDs (example):\n{batch['input_ids'][2]} \n")
decoded_text = tokenizer.decode(batch['input_ids'][2])
print(f"Decoded text (example):\n{decoded_text}")

Batch shape: torch.Size([256, 64])
Token IDs (example):
tensor([    5,   139,    51,    33,   227,     7,  2699,     7,    50,   146,
           15,    13,   269,     9,     6,     6,    69,    33,   227,    15,
           77,   907,    46,    11,    62,   149, 10540,     9,     6,     1,
            1,     1,     1,     1,     1,     1,     1,     1,     1,     1,
            1,     1,     1,     1,     1,     1,     1,     1,     1,     1,
            1,     1,     1,     1,     1,     1,     1,     1,     1,     1,
            1,     1,     1,     1]) 

Decoded text (example):
<s> Et il a dit, maman, je suis à la maison.</s></s> Il a dit à sa mère qu'il était rentré.</s><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad>


In [8]:
class CamemBERTBaseModel(nn.Module):
    def __init__(self, model_path: str, trainable: bool = False):
        """
        Initialize the base CamemBERT model.
        param model_path: Path to the pre-trained CamemBERT model.
        """
        super(CamemBERTBaseModel, self).__init__()
        self.base_model = CamembertModel.from_pretrained(model_path)
        self.tranaible = trainable
        self.config = CamembertConfig()
        
        if not trainable:
            for param in self.base_model.parameters():
                param.requires_grad = False
            self.base_model.eval()
        else :
            self.base_model.train()

    def forward(self, input_ids: torch.Tensor, attention_mask: torch.Tensor) -> torch.Tensor:
        """
        Forward pass through the base model.
        param input_ids: Tensor of token IDs.
        param attention_mask: Tensor of attention masks.
        return: Last hidden states from the base model.
        """
        outputs = self.base_model(input_ids=input_ids, attention_mask=attention_mask)
        return outputs.last_hidden_state

    def get_hidden_size(self) -> int:
        return self.config.hidden_size

In [None]:
import torch
import torch.nn as nn

class CamemBERTBaseModel(nn.Module):
    def __init__(self, model_path: str, trainable: bool = False):
        """
        Initialize the base model from a saved .pth file.
        :param model_path: Path to the saved model file (.pth).
        :param trainable: Whether the model's parameters are trainable.
        """
        super(CamemBERTBaseModel, self).__init__()
        # Charger le modèle sauvegardé
        self.state_dict = torch.load(model_path)
    
        self.base_model = torch.load_state_dict(self.state_dict)
        self.trainable = trainable
        
        # Configurer les paramètres comme non entraînables si spécifié
        if not trainable:
            for param in self.base_model.parameters():
                param.requires_grad = False
            self.base_model.eval()
        else:
            self.base_model.train()

    def forward(self, input_ids: torch.Tensor, attention_mask: torch.Tensor) -> torch.Tensor:
        """
        Forward pass through the base model.
        :param input_ids: Tensor of input features.
        :param attention_mask: Tensor of attention masks.
        :return: Output of the base model.
        """
        # Le comportement exact dépend du modèle sauvegardé
        outputs = self.base_model(input_ids, attention_mask)
        return outputs

    def get_hidden_size(self) -> int:
        """
        Return the hidden size of the model.
        :return: Hidden size as an integer.
        """
        # Cette méthode doit être adaptée pour retourner la dimension de sortie
        # Assurez-vous que votre modèle a une propriété ou un attribut correspondant
        return self.base_model.hidden_size if hasattr(self.base_model, "hidden_size") else 768



In [51]:
from src.model import CamembertModel, CamembertConfig
config = CamembertConfig()
model = CamembertModel(config)

model_path = r"C:\Users\Napster\Desktop\M2_ISI\MLA\CamemBERT\MLA-CamemBERT\models\checkpoint_epoch_4.pth"
state_dict = torch.load(model_path)
model.load_state_dict(state_dict)

RuntimeError: Attempting to deserialize object on a CUDA device but torch.cuda.is_available() is False. If you are running on a CPU-only machine, please use torch.load with map_location=torch.device('cpu') to map your storages to the CPU.

In [None]:
class NLIFinetuningModel(nn.Module):
    def __init__(self, base_model, num_labels: int = 3):
        """
        Initialize the NLI fine-tuning model.
        :param base_model: Instance of the base model.
        :param num_labels: Number of labels for NLI.
        """
        super(NLIFinetuningModel, self).__init__()
        self.base_model = base_model

        self.hidden_size = base_model.get_hidden_size()
        self.num_labels = num_labels

        self.nli_head = nn.Linear(self.hidden_size, num_labels)

    def forward(self, input_ids: torch.Tensor, attention_mask: torch.Tensor, labels: torch.Tensor = None):
        """
        Forward pass for NLI fine-tuning.
        :param input_ids: Tensor of input features.
        :param attention_mask: Tensor of attention masks.
        :param labels: Optional tensor of labels (batch_size).
        :return: Dictionary containing logits and optionally loss.
        """
        # Get last hidden states from the base model
        hidden_states = self.base_model(input_ids=input_ids, attention_mask=attention_mask)

        # Extract the [CLS] token's representation or equivalent
        cls_output = hidden_states[:, 0, :]  # Shape: (batch_size, hidden_size)

        # Pass through the NLI head
        logits = self.nli_head(cls_output)  # Shape: (batch_size, num_labels)

        # Compute loss if labels are provided
        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)

        return {"logits": logits, "loss": loss}


In [10]:
class NLI(pl.LightningModule):
    def __init__(self, model, lr=5e-5):
        """
        NLI model for training with PyTorch Lightning.
        :param model: Instance of the fine-tuning model.
        :param lr: Learning rate.
        """
        super(NLI, self).__init__()
        self.model = model
        self.lr = lr

        # Accuracy metrics for training and validation
        self.train_accuracy = Accuracy(task="multiclass", num_classes=3)
        self.val_accuracy = Accuracy(task="multiclass", num_classes=3)

        # Metrics tracked per step
        self.train_losses_step = []
        self.train_accuracies_step = []
        self.val_losses_step = []
        self.val_accuracies_step = []

        # Metrics tracked per epoch
        self.train_losses_epoch = []
        self.train_accuracies_epoch = []
        self.val_losses_epoch = []
        self.val_accuracies_epoch = []

    def forward(self, batch):
        """
        Forward pass for inference.
        :param batch: Input batch containing input IDs, attention masks, and labels.
        :return: Model logits.
        """
        input_ids, attention_mask, labels = batch
        outputs = self.model(input_ids, attention_mask, labels)
        return outputs["logits"]

    def training_step(self, batch, batch_index):
        """
        Performs a single training step.
        :param batch: Input batch containing input IDs, attention masks, and labels.
        :param batch_index: Index of the batch.
        :return: Training loss.
        """
        input_ids = batch["input_ids"]
        attention_mask = batch["attention_mask"]
        labels = batch["label"]

        outputs = self.model(input_ids, attention_mask, labels)
        loss = outputs["loss"]

        # Compute accuracy
        preds = torch.argmax(outputs["logits"], dim=1)
        acc = self.train_accuracy(preds, labels)

        # Store step metrics
        self.train_losses_step.append(loss.item())
        self.train_accuracies_step.append(acc.item())

        # Log metrics for progress bar
        self.log("train_loss", loss, prog_bar=True, on_step=True, on_epoch=False)
        self.log("train_acc", acc, prog_bar=True, on_step=True, on_epoch=False)

        return loss

    def on_train_epoch_end(self):
        """
        Computes and stores epoch-level training metrics at the end of each epoch.
        """
        avg_loss = torch.tensor(self.train_losses_step).mean().item()
        avg_acc = torch.tensor(self.train_accuracies_step).mean().item()

        self.train_losses_epoch.append(avg_loss)
        self.train_accuracies_epoch.append(avg_acc)

        # Display epoch results
        print(f"[Epoch {self.current_epoch}] Train Loss: {avg_loss:.4f}, Train Accuracy: {avg_acc:.4f}")

        # Clear step metrics to prepare for the next epoch
        self.train_losses_step.clear()
        self.train_accuracies_step.clear()

    def validation_step(self, batch, batch_index):
        """
        Performs a single validation step.
        :param batch: Input batch containing input IDs, attention masks, and labels.
        :param batch_index: Index of the batch.
        :return: Validation loss.
        """
        input_ids = batch["input_ids"]
        attention_mask = batch["attention_mask"]
        labels = batch["label"]

        outputs = self.model(input_ids, attention_mask, labels)
        loss = outputs["loss"]

        # Compute accuracy
        preds = torch.argmax(outputs["logits"], dim=1)
        acc = self.val_accuracy(preds, labels)

        # Store step metrics
        self.val_losses_step.append(loss.item())
        self.val_accuracies_step.append(acc.item())

        # Log metrics for progress bar
        self.log("val_loss", loss, prog_bar=True, on_step=False, on_epoch=True)
        self.log("val_acc", acc, prog_bar=True, on_step=False, on_epoch=True)

        return loss

    def on_validation_epoch_end(self):
        """
        Computes and stores epoch-level validation metrics at the end of each epoch.
        """
        avg_loss = torch.tensor(self.val_losses_step).mean().item()
        avg_acc = torch.tensor(self.val_accuracies_step).mean().item()

        self.val_losses_epoch.append(avg_loss)
        self.val_accuracies_epoch.append(avg_acc)

        # Display epoch results
        print(f"[Epoch {self.current_epoch}] Val Loss: {avg_loss:.4f}, Val Accuracy: {avg_acc:.4f}")

        # Clear step metrics to prepare for the next epoch
        self.val_losses_step.clear()
        self.val_accuracies_step.clear()

    def configure_optimizers(self):
        """
        Configures the optimizer and learning rate scheduler.
        :return: Dictionary containing the optimizer and scheduler configurations.
        """
        optimizer = torch.optim.AdamW(self.parameters(), lr=self.lr)

        # Dynamically calculate the total number of steps
        steps_per_epoch = 1534
        total_steps = steps_per_epoch * self.trainer.max_epochs

        scheduler = torch.optim.lr_scheduler.OneCycleLR(
            optimizer,
            max_lr=self.lr,
            total_steps=total_steps,
            pct_start=0.1,
            anneal_strategy="linear",
        )
        return {"optimizer": optimizer, "lr_scheduler": {"scheduler": scheduler, "interval": "step"}}

In [27]:
import os
# os.chdir("../../../../../data/xnli")
os.getcwd()

'c:\\Users\\Napster\\Desktop\\M2_ISI\\MLA\\CamemBERT\\MLA-CamemBERT'

In [28]:
steps_per_epoch = len(train_loader)

# Define the path to the pre-trained model
model_path = "models/4gb_oscar"  # Path to the Hugging Face pre-trained model

# Step 1: Create the fine-tuning model
nli_camembert = NLIFinetuningModel(
    base_model=CamemBERTBaseModel(model_path, trainable=True),  # Use a trainable base model
    num_labels=3  # Classes: entailment, neutral, contradiction
)

# Step 2: Configure the PyTorch Lightning module for training
nb_epochs = 3
nb_steps_per_epoch = len(train_loader)
pl_camembert = NLI(
    model=nli_camembert
)

# Step 3: Set up model checkpointing
checkpoint_callback = ModelCheckpoint(
    monitor="val_loss",           # Metric to monitor (validation loss)
    dirpath="checkpoints/",       # Directory for saving model checkpoints
    filename="nli-{epoch:02d}-{val_loss:.2f}",  # Format for checkpoint filenames
    save_top_k=2,                 # Save only the top 2 models with the best validation loss
    mode="min"                    # Minimize the monitored metric
)

# Step 4: Set up TensorBoard logging
logger = TensorBoardLogger(
    save_dir="my_logs",           # Directory for saving logs
    name="final-experiment"       # Name for the experiment
)

# Step 5: Initialize the PyTorch Lightning Trainer
trainer = pl.Trainer(
    max_epochs=5,                 # Number of training epochs
    accelerator="cpu",            # Use GPU for training (if available)
    devices=1,                    # Number of GPUs to use
    callbacks=[checkpoint_callback],  # Add checkpointing callback
    logger=logger                 # Use the configured TensorBoard logger
)

GPU available: False, used: False
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs


In [29]:
xnli_train_dataset = XNLIDataset(split="train", language="fr", cache_directory=data_path, max_length=64)
xnli_val_dataset = XNLIDataset(split="validation", language="fr", cache_directory=data_path, max_length=64)

train_loader = DataLoader(xnli_train_dataset, batch_size=256, shuffle=True)
val_loader = DataLoader(xnli_val_dataset, batch_size=256, shuffle=False)

steps_per_epoch = len(train_loader)
steps_per_epoch

1534

In [None]:
trainer.fit(pl_camembert, train_loader, val_loader)