# Reconnaissance Automatique d'Événements dans des Textes

Ce notebook permet d'exécuter le pipeline complet de reconnaissance d'événements en utilisant des modèles BERT fine-tunés. Il est conçu pour fonctionner avec des annotations au format WebAnno TSV 3.3.

## Types d'événements reconnus
- Conflit
- Décision gouvernementale
- Décès
- Avancée technologique
- Événement culturel

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## 1. Configuration de l'environnement

### 1.1 Monter Google Drive

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Définir le chemin vers le dossier d'annotations
# Modifiez ce chemin selon l'emplacement de votre dossier dans Drive
ANNOTATIONS_DIR = '/content/drive/MyDrive/annotation'

# Créer un dossier pour les sorties
OUTPUT_DIR = '/content/output'
!mkdir -p {OUTPUT_DIR}

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


### 1.2 Installation des dépendances

In [None]:
!pip install transformers datasets torch torchcrf scikit-learn matplotlib seaborn pandas tqdm

Collecting datasets
  Downloading datasets-3.5.0-py3-none-any.whl.metadata (19 kB)
Collecting torchcrf
  Downloading TorchCRF-1.1.0-py3-none-any.whl.metadata (2.3 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py311-none-any.whl.metadata (7.2 kB)
Collecting fsspec<=2024.12.0,>=2023.1.0 (from fsspec[http]<=2024.12.0,>=2023.1.0->datasets)
  Downloading fsspec-2024.12.0-py3-none-any.whl.metadata (11 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)


In [None]:
!pip install datasets==3.5.0 torchcrf==1.1.0



## 2. Implémentation des modules

### 2.1 Module de prétraitement

In [None]:
import os
import re
import pandas as pd
import numpy as np
from collections import defaultdict
from typing import List, Dict, Tuple, Set, Optional
import glob

# Définition des types d'événements
EVENT_TYPES = [
    "conflit",
    "décision gouvernementale",
    "décès",
    "avancée technologique",
    "événement culturel"
]

class TSVAnnotationParser:
    """
    Classe pour parser les fichiers d'annotation au format WebAnno TSV 3.3
    et extraire les annotations d'événements.
    """

    def __init__(self, event_types: List[str] = EVENT_TYPES):
        """
        Initialise le parser avec les types d'événements à reconnaître.

        Args:
            event_types: Liste des types d'événements à reconnaître
        """
        self.event_types = event_types

    def parse_tsv_file(self, file_path: str) -> Dict:
        """
        Parse un fichier TSV et extrait le texte et les annotations.

        Args:
            file_path: Chemin vers le fichier TSV

        Returns:
            Un dictionnaire contenant le texte et les annotations
        """
        with open(file_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()

        # Extraire l'en-tête et le format
        header = []
        i = 0
        while i < len(lines) and not lines[i].startswith('#Text='):
            header.append(lines[i].strip())
            i += 1

        # Initialiser les structures de données
        sentences = []
        current_sentence = {'text': '', 'tokens': [], 'annotations': []}

        # Parser le contenu
        while i < len(lines):
            line = lines[i].strip()

            # Nouvelle phrase
            if line.startswith('#Text='):
                if current_sentence['tokens']:
                    sentences.append(current_sentence)
                    current_sentence = {'text': '', 'tokens': [], 'annotations': []}

                current_sentence['text'] = line[6:]  # Extraire le texte après '#Text='

            # Ligne de token
            elif line and not line.startswith('#'):
                parts = line.split('\t')
                if len(parts) >= 3:  # Au moins l'ID, les positions et le texte
                    token_id = parts[0]
                    token_span = parts[1]
                    token_text = parts[2]

                    # Extraire les annotations d'événements si présentes
                    event_annotation = None
                    if len(parts) >= 4 and parts[3] != '_':
                        # Vérifier si l'annotation contient un indice (ex: "Décision gouvernementale[1]")
                        match = re.match(r'(.+?)(?:\[(\d+)\])?$', parts[3])
                        if match:
                            event_type = match.group(1)
                            event_index = match.group(2)

                            if event_type in self.event_types:
                                event_annotation = {
                                    'type': event_type,
                                    'index': int(event_index) if event_index else None
                                }

                    # Ajouter le token et son annotation
                    token_info = {
                        'id': token_id,
                        'span': token_span,
                        'text': token_text,
                        'event': event_annotation
                    }
                    current_sentence['tokens'].append(token_info)

                    # Si le token a une annotation, l'ajouter à la liste des annotations
                    if event_annotation:
                        start, end = map(int, token_span.split('-'))
                        annotation = {
                            'token_id': token_id,
                            'start': start,
                            'end': end,
                            'text': token_text,
                            'type': event_annotation['type'],
                            'index': event_annotation['index']
                        }
                        current_sentence['annotations'].append(annotation)

            i += 1

        # Ajouter la dernière phrase
        if current_sentence['tokens']:
            sentences.append(current_sentence)

        # Regrouper les annotations multi-tokens
        for sentence in sentences:
            self._group_multi_token_annotations(sentence)

        return {
            'header': header,
            'sentences': sentences
        }

    def _group_multi_token_annotations(self, sentence: Dict) -> None:
        """
        Regroupe les annotations qui s'étendent sur plusieurs tokens.

        Args:
            sentence: Dictionnaire contenant les informations d'une phrase
        """
        # Regrouper par index d'annotation
        grouped_annotations = defaultdict(list)

        for annotation in sentence['annotations']:
            if annotation['index'] is not None:
                key = (annotation['type'], annotation['index'])
                grouped_annotations[key].append(annotation)
            else:
                # Les annotations sans index sont déjà des entités complètes
                key = (annotation['type'], f"single_{annotation['token_id']}")
                grouped_annotations[key].append(annotation)

        # Créer des annotations multi-tokens
        multi_token_annotations = []

        for (event_type, _), annotations in grouped_annotations.items():
            if len(annotations) > 0:
                # Trier par position de début
                sorted_annotations = sorted(annotations, key=lambda x: x['start'])

                # Créer une annotation multi-tokens
                start = sorted_annotations[0]['start']
                end = sorted_annotations[-1]['end']
                text = ' '.join(a['text'] for a in sorted_annotations)
                token_ids = [a['token_id'] for a in sorted_annotations]

                multi_token_annotation = {
                    'token_ids': token_ids,
                    'start': start,
                    'end': end,
                    'text': text,
                    'type': event_type
                }

                multi_token_annotations.append(multi_token_annotation)

        # Remplacer les annotations
        sentence['multi_token_annotations'] = multi_token_annotations


class BIOConverter:
    """
    Classe pour convertir les annotations en format BIO (Beginning, Inside, Outside).
    """

    def __init__(self, event_types: List[str] = EVENT_TYPES):
        """
        Initialise le convertisseur avec les types d'événements à reconnaître.

        Args:
            event_types: Liste des types d'événements à reconnaître
        """
        self.event_types = event_types

    def convert_to_bio(self, parsed_data: Dict) -> List[Dict]:
        """
        Convertit les annotations en format BIO.

        Args:
            parsed_data: Données parsées par TSVAnnotationParser

        Returns:
            Liste de dictionnaires contenant les tokens et leurs tags BIO
        """
        bio_sentences = []

        for sentence in parsed_data['sentences']:
            bio_sentence = {
                'text': sentence['text'],
                'tokens': [],
                'bio_tags': []
            }

            # Initialiser tous les tokens comme "O" (Outside)
            for token in sentence['tokens']:
                bio_sentence['tokens'].append(token['text'])
                bio_sentence['bio_tags'].append('O')

            # Mettre à jour les tags pour les annotations multi-tokens
            for annotation in sentence.get('multi_token_annotations', []):
                event_type = annotation['type']
                token_ids = annotation['token_ids']

                # Convertir les IDs de tokens en indices (0-based)
                token_indices = []
                for token_id in token_ids:
                    for i, token in enumerate(sentence['tokens']):
                        if token['id'] == token_id:
                            token_indices.append(i)
                            break

                # Assigner les tags B et I
                for i, idx in enumerate(token_indices):
                    if i == 0:
                        bio_sentence['bio_tags'][idx] = f'B-{event_type}'
                    else:
                        bio_sentence['bio_tags'][idx] = f'I-{event_type}'

            bio_sentences.append(bio_sentence)

        return bio_sentences


class DatasetBuilder:
    """
    Classe pour construire un dataset à partir des annotations au format BIO.
    """

    def __init__(self, event_types: List[str] = EVENT_TYPES):
        """
        Initialise le builder avec les types d'événements à reconnaître.

        Args:
            event_types: Liste des types d'événements à reconnaître
        """
        self.event_types = event_types
        self.parser = TSVAnnotationParser(event_types)
        self.converter = BIOConverter(event_types)

    def build_from_directory(self, directory: str, pattern: str = "*/MBY3.tsv") -> List[Dict]:
        """
        Construit un dataset à partir des fichiers TSV dans un répertoire.

        Args:
            directory: Chemin vers le répertoire contenant les fichiers TSV
            pattern: Pattern pour trouver les fichiers TSV (par défaut: "*/MBY3.tsv")

        Returns:
            Liste de dictionnaires contenant les phrases et leurs annotations BIO
        """
        dataset = []

        # Trouver tous les fichiers TSV correspondant au pattern
        tsv_files = glob.glob(os.path.join(directory, pattern))

        for tsv_file in tsv_files:
            # Parser le fichier TSV
            parsed_data = self.parser.parse_tsv_file(tsv_file)

            # Convertir en format BIO
            bio_sentences = self.converter.convert_to_bio(parsed_data)

            # Ajouter au dataset
            for bio_sentence in bio_sentences:
                dataset.append({
                    'file': tsv_file,
                    'text': bio_sentence['text'],
                    'tokens': bio_sentence['tokens'],
                    'bio_tags': bio_sentence['bio_tags']
                })

        return dataset

    def split_dataset(self, dataset: List[Dict], train_ratio: float = 0.7, val_ratio: float = 0.15,
                     test_ratio: float = 0.15, random_seed: int = 42) -> Tuple[List[Dict], List[Dict], List[Dict]]:
        """
        Divise le dataset en ensembles d'entraînement, de validation et de test.

        Args:
            dataset: Liste de dictionnaires contenant les phrases et leurs annotations BIO
            train_ratio: Proportion de données pour l'entraînement
            val_ratio: Proportion de données pour la validation
            test_ratio: Proportion de données pour le test
            random_seed: Graine aléatoire pour la reproductibilité

        Returns:
            Tuple contenant les ensembles d'entraînement, de validation et de test
        """
        assert abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-10, "Les ratios doivent sommer à 1"

        # Mélanger le dataset
        np.random.seed(random_seed)
        indices = np.random.permutation(len(dataset))

        # Calculer les indices de séparation
        train_idx = int(len(dataset) * train_ratio)
        val_idx = train_idx + int(len(dataset) * val_ratio)

        # Diviser le dataset
        train_data = [dataset[i] for i in indices[:train_idx]]
        val_data = [dataset[i] for i in indices[train_idx:val_idx]]
        test_data = [dataset[i] for i in indices[val_idx:]]

        return train_data, val_data, test_data

### 2.2 Module de tokenisation et d'encodage

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer
from sklearn.model_selection import train_test_split

class BERTTokenizerForEventRecognition:
    """
    Classe pour tokeniser les textes et aligner les annotations BIO avec les tokens BERT.
    """

    def __init__(self, model_name: str = "camembert-base", max_length: int = 128):
        """
        Initialise le tokenizer BERT.

        Args:
            model_name: Nom du modèle BERT à utiliser
            max_length: Longueur maximale des séquences
        """
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.max_length = max_length

    def tokenize_and_align_labels(self, text: str, tokens: List[str], bio_tags: List[str]) -> Dict:
        """
        Tokenise le texte et aligne les annotations BIO avec les tokens BERT.

        Args:
            text: Texte brut
            tokens: Liste des tokens originaux
            bio_tags: Liste des tags BIO correspondant aux tokens originaux

        Returns:
            Dictionnaire contenant les tokens BERT et les tags BIO alignés
        """
        tokenized_inputs = self.tokenizer(
            text,
            padding="max_length",
            truncation=True,
            max_length=self.max_length,
            return_tensors="pt",
            return_offsets_mapping=True,
            return_special_tokens_mask=True,
        )

        # Créer une liste de tags BIO alignés avec les tokens BERT
        aligned_labels = []

        # Obtenir les positions des tokens originaux dans le texte
        token_positions = []
        current_pos = 0
        for token in tokens:
            start = text.find(token, current_pos)
            if start == -1:  # Si le token n'est pas trouvé, essayer avec une recherche moins stricte
                for i in range(current_pos, len(text)):
                    if text[i:i+len(token)].lower() == token.lower():
                        start = i
                        break

            if start != -1:
                end = start + len(token)
                token_positions.append((start, end))
                current_pos = end
            else:
                # Si le token n'est toujours pas trouvé, utiliser la position actuelle
                token_positions.append((current_pos, current_pos + len(token)))
                current_pos += len(token)

        # Aligner les tags BIO avec les tokens BERT
        offset_mapping = tokenized_inputs.pop("offset_mapping")[0].numpy()
        special_tokens_mask = tokenized_inputs.pop("special_tokens_mask")[0].numpy()

        for i, (offset_start, offset_end) in enumerate(offset_mapping):
            # Ignorer les tokens spéciaux ([CLS], [SEP], etc.)
            if special_tokens_mask[i] == 1:
                aligned_labels.append("O")
                continue

            # Trouver le token original qui correspond à ce token BERT
            token_idx = None
            for j, (token_start, token_end) in enumerate(token_positions):
                if offset_start >= token_start and offset_end <= token_end:
                    token_idx = j
                    break

            # Si un token original est trouvé, utiliser son tag BIO
            if token_idx is not None:
                # Pour les tokens BERT qui sont des sous-mots (commençant par ##),
                # utiliser "I-" au lieu de "B-" s'ils ne sont pas le premier sous-mot
                current_tag = bio_tags[token_idx]

                # Si ce n'est pas le premier token BERT pour ce token original
                # et que le tag commence par "B-", le remplacer par "I-"
                if i > 0 and offset_mapping[i-1][1] > 0 and token_idx == self._find_token_idx(offset_mapping[i-1], token_positions):
                    if current_tag.startswith("B-"):
                        current_tag = "I-" + current_tag[2:]

                aligned_labels.append(current_tag)
            else:
                # Si aucun token original ne correspond, utiliser "O"
                aligned_labels.append("O")

        # Tronquer ou remplir la liste des labels pour qu'elle ait la même longueur que les tokens BERT
        if len(aligned_labels) < self.max_length:
            aligned_labels.extend(["O"] * (self.max_length - len(aligned_labels)))
        else:
            aligned_labels = aligned_labels[:self.max_length]

        return {
            "input_ids": tokenized_inputs["input_ids"][0],
            "attention_mask": tokenized_inputs["attention_mask"][0],
            "token_type_ids": tokenized_inputs["token_type_ids"][0],
            "labels": aligned_labels
        }

    def _find_token_idx(self, offset: Tuple[int, int], token_positions: List[Tuple[int, int]]) -> Optional[int]:
        """
        Trouve l'indice du token original qui correspond à un offset donné.

        Args:
            offset: Tuple (start, end) représentant l'offset du token BERT
            token_positions: Liste de tuples (start, end) représentant les positions des tokens originaux

        Returns:
            Indice du token original ou None si aucun ne correspond
        """
        offset_start, offset_end = offset

        for i, (token_start, token_end) in enumerate(token_positions):
            if offset_start >= token_start and offset_end <= token_end:
                return i

        return None


class EventRecognitionDataset(Dataset):
    """
    Dataset PyTorch pour la reconnaissance d'événements.
    """

    def __init__(self, encoded_data: List[Dict], label_map: Dict[str, int]):
        """
        Initialise le dataset.

        Args:
            encoded_data: Liste de dictionnaires contenant les données encodées
            label_map: Dictionnaire mappant les tags BIO aux indices numériques
        """
        self.encoded_data = encoded_data
        self.label_map = label_map

    def __len__(self) -> int:
        """
        Retourne la taille du dataset.

        Returns:
            Nombre d'exemples dans le dataset
        """
        return len(self.encoded_data)

    def __getitem__(self, idx: int) -> Dict[str, torch.Tensor]:
        """
        Retourne un exemple du dataset.

        Args:
            idx: Indice de l'exemple

        Returns:
            Dictionnaire contenant les tenseurs pour l'entraînement
        """
        item = self.encoded_data[idx]

        # Convertir les labels en indices numériques
        labels = [self.label_map.get(label, 0) for label in item["labels"]]

        return {
            "input_ids": item["input_ids"],
            "attention_mask": item["attention_mask"],
            "token_type_ids": item["token_type_ids"],
            "labels": torch.tensor(labels, dtype=torch.long)
        }


class BERTDataProcessor:
    """
    Classe pour traiter les données et les préparer pour l'entraînement avec BERT.
    """

    def __init__(self, model_name: str = "camembert-base", max_length: int = 128,
                event_types: List[str] = EVENT_TYPES):
        """
        Initialise le processeur de données.

        Args:
            model_name: Nom du modèle BERT à utiliser
            max_length: Longueur maximale des séquences
            event_types: Liste des types d'événements à reconnaître
        """
        self.tokenizer_wrapper = BERTTokenizerForEventRecognition(model_name, max_length)
        self.event_types = event_types

        # Créer le mapping des labels
        self.label_map = {"O": 0}
        idx = 1
        for event_type in event_types:
            self.label_map[f"B-{event_type}"] = idx
            idx += 1
            self.label_map[f"I-{event_type}"] = idx
            idx += 1

        self.id_to_label = {v: k for k, v in self.label_map.items()}

    def process_bio_data(self, bio_data: List[Dict]) -> Tuple[EventRecognitionDataset, EventRecognitionDataset, EventRecognitionDataset]:
        """
        Traite les données BIO et crée les datasets pour l'entraînement, la validation et le test.

        Args:
            bio_data: Liste de dictionnaires contenant les phrases et leurs annotations BIO

        Returns:
            Tuple contenant les datasets d'entraînement, de validation et de test
        """
        # Encoder les données
        encoded_data = []

        for item in bio_data:
            encoded_item = self.tokenizer_wrapper.tokenize_and_align_labels(
                item["text"],
                item["tokens"],
                item["bio_tags"]
            )
            encoded_data.append(encoded_item)

        # Diviser les données en ensembles d'entraînement, de validation et de test
        train_data, temp_data = train_test_split(encoded_data, test_size=0.3, random_state=42)
        val_data, test_data = train_test_split(temp_data, test_size=0.5, random_state=42)

        # Créer les datasets
        train_dataset = EventRecognitionDataset(train_data, self.label_map)
        val_dataset = EventRecognitionDataset(val_data, self.label_map)
        test_dataset = EventRecognitionDataset(test_data, self.label_map)

        return train_dataset, val_dataset, test_dataset

    def create_data_loaders(self, train_dataset: EventRecognitionDataset, val_dataset: EventRecognitionDataset,
                           test_dataset: EventRecognitionDataset, batch_size: int = 16) -> Tuple[DataLoader, DataLoader, DataLoader]:
        """
        Crée les data loaders pour l'entraînement, la validation et le test.

        Args:
            train_dataset: Dataset d'entraînement
            val_dataset: Dataset de validation
            test_dataset: Dataset de test
            batch_size: Taille des batchs

        Returns:
            Tuple contenant les data loaders d'entraînement, de validation et de test
        """
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=batch_size)
        test_loader = DataLoader(test_dataset, batch_size=batch_size)

        return train_loader, val_loader, test_loader

    def get_label_map(self) -> Dict[str, int]:
        """
        Retourne le mapping des labels.

        Returns:
            Dictionnaire mappant les tags BIO aux indices numériques
        """
        return self.label_map

    def get_id_to_label(self) -> Dict[int, str]:
        """
        Retourne le mapping inverse des labels.

        Returns:
            Dictionnaire mappant les indices numériques aux tags BIO
        """
        return self.id_to_label

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer
from sklearn.model_selection import train_test_split

class BERTTokenizerForEventRecognition:
    """
    Classe pour tokeniser les textes et aligner les annotations BIO avec les tokens BERT.
    """

    def __init__(self, model_name: str = "camembert-base", max_length: int = 128):
        """
        Initialise le tokenizer BERT.

        Args:
            model_name: Nom du modèle BERT à utiliser
            max_length: Longueur maximale des séquences
        """
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.max_length = max_length

    def tokenize_and_align_labels(self, text: str, tokens: List[str], bio_tags: List[str]) -> Dict:
        """
        Tokenise le texte et aligne les annotations BIO avec les tokens BERT.

        Args:
            text: Texte brut
            tokens: Liste des tokens originaux
            bio_tags: Liste des tags BIO correspondant aux tokens originaux

        Returns:
            Dictionnaire contenant les tokens BERT et les tags BIO alignés
        """
        tokenized_inputs = self.tokenizer(
            text,
            padding="max_length",
            truncation=True,
            max_length=self.max_length,
            return_tensors="pt",
            return_offsets_mapping=True,
            return_special_tokens_mask=True,
        )

        # Créer une liste de tags BIO alignés avec les tokens BERT
        aligned_labels = []

        # Obtenir les positions des tokens originaux dans le texte
        token_positions = []
        current_pos = 0
        for token in tokens:
            start = text.find(token, current_pos)
            if start == -1:  # Si le token n'est pas trouvé, essayer avec une recherche moins stricte
                for i in range(current_pos, len(text)):
                    if text[i:i+len(token)].lower() == token.lower():
                        start = i
                        break

            if start != -1:
                end = start + len(token)
                token_positions.append((start, end))
                current_pos = end
            else:
                # Si le token n'est toujours pas trouvé, utiliser la position actuelle
                token_positions.append((current_pos, current_pos + len(token)))
                current_pos += len(token)

        # Aligner les tags BIO avec les tokens BERT
        offset_mapping = tokenized_inputs.pop("offset_mapping")[0].numpy()
        special_tokens_mask = tokenized_inputs.pop("special_tokens_mask")[0].numpy()

        for i, (offset_start, offset_end) in enumerate(offset_mapping):
            # Ignorer les tokens spéciaux ([CLS], [SEP], etc.)
            if special_tokens_mask[i] == 1:
                aligned_labels.append("O")
                continue

            # Trouver le token original qui correspond à ce token BERT
            token_idx = None
            for j, (token_start, token_end) in enumerate(token_positions):
                if offset_start >= token_start and offset_end <= token_end:
                    token_idx = j
                    break

            # Si un token original est trouvé, utiliser son tag BIO
            if token_idx is not None:
                # Pour les tokens BERT qui sont des sous-mots (commençant par ##),
                # utiliser "I-" au lieu de "B-" s'ils ne sont pas le premier sous-mot
                current_tag = bio_tags[token_idx]

                # Si ce n'est pas le premier token BERT pour ce token original
                # et que le tag commence par "B-", le remplacer par "I-"
                if i > 0 and offset_mapping[i-1][1] > 0 and token_idx == self._find_token_idx(offset_mapping[i-1], token_positions):
                    if current_tag.startswith("B-"):
                        current_tag = "I-" + current_tag[2:]

                aligned_labels.append(current_tag)
            else:
                # Si aucun token original ne correspond, utiliser "O"
                aligned_labels.append("O")

        # Tronquer ou remplir la liste des labels pour qu'elle ait la même longueur que les tokens BERT
        if len(aligned_labels) < self.max_length:
            aligned_labels.extend(["O"] * (self.max_length - len(aligned_labels)))
        else:
            aligned_labels = aligned_labels[:self.max_length]

        # Manually create token_type_ids as a tensor of zeros
        token_type_ids = torch.zeros_like(tokenized_inputs["input_ids"][0])

        return {
            "input_ids": tokenized_inputs["input_ids"][0],
            "attention_mask": tokenized_inputs["attention_mask"][0],
            "token_type_ids": token_type_ids,  # Use the created token_type_ids
            "labels": aligned_labels
        }

    def _find_token_idx(self, offset: Tuple[int, int], token_positions: List[Tuple[int, int]]) -> Optional[int]:
        """
        Trouve l'indice du token original qui correspond à un offset donné.

        Args:
            offset: Tuple (start, end) représentant l'offset du token BERT
            token_positions: Liste de tuples (start, end) représentant les positions des tokens originaux

        Returns:
            Indice du token original ou None si aucun ne correspond
        """
        offset_start, offset_end = offset

        for i, (token_start, token_end) in enumerate(token_positions):
            if offset_start >= token_start and offset_end <= token_end:
                return i

        return None

In [None]:
!pip uninstall -y torchcrf
!pip install git+https://github.com/kmkurn/pytorch-crf.git


Found existing installation: TorchCRF 1.1.0
Uninstalling TorchCRF-1.1.0:
  Successfully uninstalled TorchCRF-1.1.0
Collecting git+https://github.com/kmkurn/pytorch-crf.git
  Cloning https://github.com/kmkurn/pytorch-crf.git to /tmp/pip-req-build-yzojvzcu
  Running command git clone --filter=blob:none --quiet https://github.com/kmkurn/pytorch-crf.git /tmp/pip-req-build-yzojvzcu
  Resolved https://github.com/kmkurn/pytorch-crf.git to commit 623e3402d00a2728e99d6e8486010d67c754267b
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pytorch-crf
  Building wheel for pytorch-crf (setup.py) ... [?25l[?25hdone
  Created wheel for pytorch-crf: filename=pytorch_crf-0.7.2-py3-none-any.whl size=6410 sha256=085178c1f147e8958dfb2f9063e3b9333678cdb4e4a2c56e5b36c4597ff4e2cb
  Stored in directory: /tmp/pip-ephem-wheel-cache-pol31imo/wheels/fd/83/cc/f11543939f8911b8dcff86e2bd54423e21f779d0938958cc7f
Successfully built pytorch-crf
Installing collected packages:

### 2.3 Module de modélisation BERT

In [None]:

import torch.nn as nn
from torch.nn import CrossEntropyLoss
from transformers import BertModel, BertPreTrainedModel, BertConfig
from torchcrf import CRF
from typing import Union

class BERTForEventRecognition(BertPreTrainedModel):
    """
    Modèle BERT pour la reconnaissance d'événements basé sur une architecture de type NER.
    """

    def __init__(self, config, num_labels: int):
        """
        Initialise le modèle BERT pour la reconnaissance d'événements.

        Args:
            config: Configuration du modèle BERT
            num_labels: Nombre de labels (classes) pour la classification
        """
        super().__init__(config)
        self.num_labels = num_labels

        # Modèle BERT de base
        self.bert = BertModel(config)

        # Dropout pour la régularisation
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

        # Couche de classification
        self.classifier = nn.Linear(config.hidden_size, num_labels)

        # Initialisation des poids
        self.init_weights()

    def forward(self, input_ids=None, attention_mask=None, token_type_ids=None,
               position_ids=None, head_mask=None, inputs_embeds=None, labels=None):
        """
        Passe avant du modèle.

        Args:
            input_ids: Indices des tokens d'entrée
            attention_mask: Masque d'attention
            token_type_ids: Indices des types de tokens
            position_ids: Indices de position
            head_mask: Masque pour les têtes d'attention
            inputs_embeds: Embeddings d'entrée
            labels: Labels pour le calcul de la perte

        Returns:
            Tuple contenant la perte et les logits
        """
        # Obtenir les embeddings de BERT
        outputs = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds
        )

        # Extraire les embeddings de la dernière couche cachée
        sequence_output = outputs[0]

        # Appliquer le dropout
        sequence_output = self.dropout(sequence_output)

        # Calculer les logits
        logits = self.classifier(sequence_output)

        # Calculer la perte si les labels sont fournis
        loss = None
        if labels is not None:
            loss_fct = CrossEntropyLoss()

            # Masquer les positions de padding
            active_loss = attention_mask.view(-1) == 1
            active_logits = logits.view(-1, self.num_labels)
            active_labels = torch.where(
                active_loss,
                labels.view(-1),
                torch.tensor(loss_fct.ignore_index).type_as(labels)
            )

            loss = loss_fct(active_logits, active_labels)

        return (loss, logits) if loss is not None else logits


class BERTCRFForEventRecognition(BertPreTrainedModel):
    """
    Modèle BERT avec CRF pour la reconnaissance d'événements.
    """

    def __init__(self, config, num_labels: int):
        """
        Initialise le modèle BERT avec CRF pour la reconnaissance d'événements.

        Args:
            config: Configuration du modèle BERT
            num_labels: Nombre de labels (classes) pour la classification
        """
        super().__init__(config)
        self.num_labels = num_labels

        # Modèle BERT de base
        self.bert = BertModel(config)

        # Dropout pour la régularisation
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

        # Couche de classification
        self.classifier = nn.Linear(config.hidden_size, num_labels)

        # Couche CRF
        self.crf = CRF(num_labels, batch_first=True)

        # Initialisation des poids
        self.init_weights()

    def forward(self, input_ids=None, attention_mask=None, token_type_ids=None,
               position_ids=None, head_mask=None, inputs_embeds=None, labels=None):
        """
        Passe avant du modèle.

        Args:
            input_ids: Indices des tokens d'entrée
            attention_mask: Masque d'attention
            token_type_ids: Indices des types de tokens
            position_ids: Indices de position
            head_mask: Masque pour les têtes d'attention
            inputs_embeds: Embeddings d'entrée
            labels: Labels pour le calcul de la perte

        Returns:
            Tuple contenant la perte et les logits
        """
        # Obtenir les embeddings de BERT
        outputs = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds
        )

        # Extraire les embeddings de la dernière couche cachée
        sequence_output = outputs[0]

        # Appliquer le dropout
        sequence_output = self.dropout(sequence_output)

        # Calculer les logits
        emissions = self.classifier(sequence_output)

        # Calculer la perte si les labels sont fournis
        loss = None
        if labels is not None:
            # Masquer les positions de padding pour le CRF
            mask = attention_mask.type(torch.bool)

            # Calculer la log-vraisemblance négative
            loss = -self.crf(emissions, labels, mask=mask, reduction='mean')

        return (loss, emissions) if loss is not None else emissions

    def decode(self, emissions, mask=None):
        """
        Décode les émissions pour obtenir les meilleurs chemins (séquences de labels).

        Args:
            emissions: Émissions du modèle
            mask: Masque pour les positions valides

        Returns:
            Liste des meilleurs chemins
        """
        return self.crf.decode(emissions, mask=mask)


class BERTModelSelector:
    """
    Classe pour sélectionner et configurer un modèle BERT adapté à la tâche de reconnaissance d'événements.
    """

    def __init__(self, num_labels: int, use_crf: bool = True):
        """
        Initialise le sélecteur de modèle.

        Args:
            num_labels: Nombre de labels (classes) pour la classification
            use_crf: Utiliser une couche CRF pour améliorer les prédictions
        """
        self.num_labels = num_labels
        self.use_crf = use_crf

    def select_model(self, model_name: str = "camembert-base") -> Union[BERTForEventRecognition, BERTCRFForEventRecognition]:
        """
        Sélectionne et configure un modèle BERT adapté à la tâche de reconnaissance d'événements.

        Args:
            model_name: Nom du modèle BERT à utiliser

        Returns:
            Modèle BERT configuré pour la reconnaissance d'événements
        """
        # Charger la configuration du modèle
        config = BertConfig.from_pretrained(model_name)

        # Créer le modèle
        if self.use_crf:
            model = BERTCRFForEventRecognition.from_pretrained(
                model_name,
                config=config,
                num_labels=self.num_labels
            )
        else:
            model = BERTForEventRecognition.from_pretrained(
                model_name,
                config=config,
                num_labels=self.num_labels
            )

        return model

    @staticmethod
    def get_recommended_models() -> List[Dict[str, str]]:
        """
        Retourne une liste de modèles BERT recommandés pour la tâche de reconnaissance d'événements.

        Returns:
            Liste de dictionnaires contenant les informations sur les modèles recommandés
        """
        return [
            {
                "name": "camembert-base",
                "description": "Modèle BERT français, pré-entraîné sur un large corpus français",
                "advantages": "Excellente performance sur les textes français, meilleure compréhension des nuances linguistiques",
                "disadvantages": "Limité au français, peut être moins adapté si le corpus contient d'autres langues"
            },
            {
                "name": "bert-base-multilingual-cased",
                "description": "Modèle BERT multilingue (cased) pré-entraîné sur 104 langues, dont le français",
                "advantages": "Bonne performance sur les langues non-anglaises, adapté aux textes français",
                "disadvantages": "Moins performant que les modèles spécifiques au français sur certaines tâches"
            },
            {
                "name": "flaubert/flaubert_base_cased",
                "description": "Modèle BERT français alternatif, pré-entraîné sur un corpus français diversifié",
                "advantages": "Bonne performance sur les textes français, architecture optimisée",
                "disadvantages": "Limité au français, peut nécessiter plus de ressources computationnelles"
            },
            {
                "name": "xlm-roberta-base",
                "description": "Modèle RoBERTa multilingue, pré-entraîné sur 100 langues avec une architecture améliorée",
                "advantages": "Performances supérieures à BERT multilingue sur de nombreuses tâches, robuste aux variations linguistiques",
                "disadvantages": "Plus lourd en termes de ressources computationnelles"
            }
        ]

### 2.4 Module de fine-tuning

In [None]:
from torch.optim import AdamW
from torch.optim.lr_scheduler import ReduceLROnPlateau
from transformers import get_linear_schedule_with_warmup
from sklearn.metrics import classification_report, f1_score, precision_score, recall_score
import time
import json
from tqdm.notebook import tqdm
import logging

# Configuration du logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class EarlyStopping:
    """
    Classe pour implémenter l'early stopping pendant l'entraînement.
    """

    def __init__(self, patience: int = 3, min_delta: float = 0.0, restore_best_weights: bool = True):
        """
        Initialise l'early stopping.

        Args:
            patience: Nombre d'époques à attendre après la dernière amélioration
            min_delta: Amélioration minimale pour être considérée comme une amélioration
            restore_best_weights: Restaurer les meilleurs poids à la fin
        """
        self.patience = patience
        self.min_delta = min_delta
        self.restore_best_weights = restore_best_weights
        self.best_score = None
        self.best_weights = None
        self.counter = 0
        self.early_stop = False

    def __call__(self, score: float, model: torch.nn.Module) -> bool:
        """
        Vérifie si l'entraînement doit être arrêté.

        Args:
            score: Score actuel (plus élevé est meilleur)
            model: Modèle actuel

        Returns:
            True si l'entraînement doit être arrêté, False sinon
        """
        if self.best_score is None:
            self.best_score = score
            self.best_weights = {k: v.cpu().clone() for k, v in model.state_dict().items()}
        elif score < self.best_score + self.min_delta:
            self.counter += 1
            logger.info(f"EarlyStopping counter: {self.counter} out of {self.patience}")
            if self.counter >= self.patience:
                self.early_stop = True
                if self.restore_best_weights:
                    model.load_state_dict(self.best_weights)
                return True
        else:
            self.best_score = score
            self.best_weights = {k: v.cpu().clone() for k, v in model.state_dict().items()}
            self.counter = 0

        return False


class MetricsTracker:
    """
    Classe pour suivre les métriques pendant l'entraînement.
    """

    def __init__(self):
        """
        Initialise le tracker de métriques.
        """
        self.train_losses = []
        self.val_losses = []
        self.val_f1_scores = []
        self.val_precision_scores = []
        self.val_recall_scores = []
        self.learning_rates = []

    def update(self, train_loss: float, val_loss: float, val_f1: float, val_precision: float, val_recall: float, lr: float):
        """
        Met à jour les métriques.

        Args:
            train_loss: Perte d'entraînement
            val_loss: Perte de validation
            val_f1: F1-score de validation
            val_precision: Précision de validation
            val_recall: Rappel de validation
            lr: Taux d'apprentissage actuel
        """
        self.train_losses.append(train_loss)
        self.val_losses.append(val_loss)
        self.val_f1_scores.append(val_f1)
        self.val_precision_scores.append(val_precision)
        self.val_recall_scores.append(val_recall)
        self.learning_rates.append(lr)

    def plot_metrics(self, save_path: str = "training_metrics.png"):
        """
        Trace les métriques d'entraînement.

        Args:
            save_path: Chemin pour sauvegarder le graphique
        """
        plt.figure(figsize=(15, 10))

        # Tracer les pertes
        plt.subplot(2, 2, 1)
        plt.plot(self.train_losses, label="Train Loss")
        plt.plot(self.val_losses, label="Validation Loss")
        plt.xlabel("Epoch")
        plt.ylabel("Loss")
        plt.title("Training and Validation Loss")
        plt.legend()

        # Tracer les F1-scores
        plt.subplot(2, 2, 2)
        plt.plot(self.val_f1_scores, label="F1-Score")
        plt.xlabel("Epoch")
        plt.ylabel("F1-Score")
        plt.title("Validation F1-Score")
        plt.legend()

        # Tracer la précision et le rappel
        plt.subplot(2, 2, 3)
        plt.plot(self.val_precision_scores, label="Precision")
        plt.plot(self.val_recall_scores, label="Recall")
        plt.xlabel("Epoch")
        plt.ylabel("Score")
        plt.title("Validation Precision and Recall")
        plt.legend()

        # Tracer le taux d'apprentissage
        plt.subplot(2, 2, 4)
        plt.plot(self.learning_rates)
        plt.xlabel("Epoch")
        plt.ylabel("Learning Rate")
        plt.title("Learning Rate")

        plt.tight_layout()
        plt.savefig(save_path)
        plt.close()

    def save_metrics(self, save_path: str = "training_metrics.json"):
        """
        Sauvegarde les métriques dans un fichier JSON.

        Args:
            save_path: Chemin pour sauvegarder les métriques
        """
        metrics = {
            "train_losses": self.train_losses,
            "val_losses": self.val_losses,
            "val_f1_scores": self.val_f1_scores,
            "val_precision_scores": self.val_precision_scores,
            "val_recall_scores": self.val_recall_scores,
            "learning_rates": self.learning_rates
        }

        with open(save_path, "w") as f:
            json.dump(metrics, f, indent=4)


class BERTFineTuner:
    """
    Classe pour fine-tuner un modèle BERT pour la reconnaissance d'événements.
    """

    def __init__(self, model: torch.nn.Module, id_to_label: Dict[int, str], device: str = None):
        """
        Initialise le fine-tuner.

        Args:
            model: Modèle BERT à fine-tuner
            id_to_label: Dictionnaire mappant les indices numériques aux tags BIO
            device: Appareil sur lequel exécuter l'entraînement (cpu ou cuda)
        """
        self.model = model
        self.id_to_label = id_to_label
        self.device = device if device else ("cuda" if torch.cuda.is_available() else "cpu")
        self.model.to(self.device)

        logger.info(f"Using device: {self.device}")

    def train(self, train_dataloader: torch.utils.data.DataLoader, val_dataloader: torch.utils.data.DataLoader,
             epochs: int = 10, learning_rate: float = 2e-5, weight_decay: float = 0.01,
             warmup_steps: int = 0, max_grad_norm: float = 1.0, output_dir: str = "model_output"):
        """
        Fine-tune le modèle BERT.

        Args:
            train_dataloader: DataLoader pour les données d'entraînement
            val_dataloader: DataLoader pour les données de validation
            epochs: Nombre d'époques d'entraînement
            learning_rate: Taux d'apprentissage initial
            weight_decay: Décroissance des poids pour la régularisation
            warmup_steps: Nombre d'étapes de warmup pour le scheduler
            max_grad_norm: Norme maximale du gradient pour le clipping
            output_dir: Répertoire pour sauvegarder le modèle

        Returns:
            Métriques d'entraînement
        """
        # Créer le répertoire de sortie s'il n'existe pas
        os.makedirs(output_dir, exist_ok=True)

        # Initialiser l'optimiseur
        optimizer = AdamW(self.model.parameters(), lr=learning_rate, weight_decay=weight_decay)

        # Calculer le nombre total d'étapes d'entraînement
        total_steps = len(train_dataloader) * epochs

        # Initialiser le scheduler
        scheduler = get_linear_schedule_with_warmup(
            optimizer,
            num_warmup_steps=warmup_steps,
            num_training_steps=total_steps
        )

        # Initialiser le scheduler de réduction du taux d'apprentissage
        lr_scheduler = ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=2, verbose=True)

        # Initialiser l'early stopping
        early_stopping = EarlyStopping(patience=3, restore_best_weights=True)

        # Initialiser le tracker de métriques
        metrics_tracker = MetricsTracker()

        # Boucle d'entraînement
        logger.info("Starting training...")

        for epoch in range(epochs):
            # Entraînement
            self.model.train()
            train_loss = 0.0
            train_steps = 0

            progress_bar = tqdm(train_dataloader, desc=f"Epoch {epoch+1}/{epochs} [Train]")
            for batch in progress_bar:
                # Déplacer les tenseurs sur le device
                batch = {k: v.to(self.device) for k, v in batch.items()}

                # Forward pass
                outputs = self.model(
                    input_ids=batch["input_ids"],
                    attention_mask=batch["attention_mask"],
                    token_type_ids=batch["token_type_ids"],
                    labels=batch["labels"]
                )

                loss = outputs[0]

                # Backward pass
                optimizer.zero_grad()
                loss.backward()

                # Clipping du gradient
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_grad_norm)

                # Mise à jour des poids
                optimizer.step()
                scheduler.step()

                # Mise à jour de la perte
                train_loss += loss.item()
                train_steps += 1

                # Mise à jour de la barre de progression
                progress_bar.set_postfix({"loss": loss.item()})

            # Calculer la perte moyenne d'entraînement
            avg_train_loss = train_loss / train_steps

            # Validation
            self.model.eval()
            val_loss = 0.0
            val_steps = 0
            all_preds = []
            all_labels = []

            progress_bar = tqdm(val_dataloader, desc=f"Epoch {epoch+1}/{epochs} [Val]")
            with torch.no_grad():
                for batch in progress_bar:
                    # Déplacer les tenseurs sur le device
                    batch = {k: v.to(self.device) for k, v in batch.items()}

                    # Forward pass
                    outputs = self.model(
                        input_ids=batch["input_ids"],
                        attention_mask=batch["attention_mask"],
                        token_type_ids=batch["token_type_ids"],
                        labels=batch["labels"]
                    )

                    loss, logits = outputs[:2]

                    # Mise à jour de la perte
                    val_loss += loss.item()
                    val_steps += 1

                    # Obtenir les prédictions
                    if hasattr(self.model, 'decode'):
                        # Pour les modèles avec CRF
                        preds = self.model.decode(logits, mask=batch["attention_mask"].bool())
                        preds = [p for sublist in preds for p in sublist]  # Aplatir la liste
                    else:
                        # Pour les modèles sans CRF
                        preds = torch.argmax(logits, dim=2).flatten().cpu().numpy()

                    # Obtenir les labels réels (ignorer les tokens spéciaux et le padding)
                    labels = batch["labels"].flatten().cpu().numpy()
                    mask = batch["attention_mask"].flatten().cpu().numpy()

                    # Filtrer les tokens spéciaux et le padding
                    valid_indices = np.where(mask == 1)[0]
                    filtered_preds = [preds[i] for i in valid_indices] if hasattr(self.model, 'decode') else preds[valid_indices]
                    filtered_labels = labels[valid_indices]

                    all_preds.extend(filtered_preds)
                    all_labels.extend(filtered_labels)

                    # Mise à jour de la barre de progression
                    progress_bar.set_postfix({"loss": loss.item()})

            # Calculer la perte moyenne de validation
            avg_val_loss = val_loss / val_steps

            # Calculer les métriques
            val_f1 = f1_score(all_labels, all_preds, average='weighted')
            val_precision = precision_score(all_labels, all_preds, average='weighted')
            val_recall = recall_score(all_labels, all_preds, average='weighted')

            # Mettre à jour le scheduler de réduction du taux d'apprentissage
            lr_scheduler.step(val_f1)

            # Obtenir le taux d'apprentissage actuel
            current_lr = optimizer.param_groups[0]['lr']

            # Mettre à jour le tracker de métriques
            metrics_tracker.update(avg_train_loss, avg_val_loss, val_f1, val_precision, val_recall, current_lr)

            # Afficher les métriques
            logger.info(f"Epoch {epoch+1}/{epochs}:")
            logger.info(f"  Train Loss: {avg_train_loss:.4f}")
            logger.info(f"  Val Loss: {avg_val_loss:.4f}")
            logger.info(f"  Val F1: {val_f1:.4f}")
            logger.info(f"  Val Precision: {val_precision:.4f}")
            logger.info(f"  Val Recall: {val_recall:.4f}")
            logger.info(f"  Learning Rate: {current_lr:.8f}")

            # Sauvegarder le modèle
            model_path = os.path.join(output_dir, f"model_epoch_{epoch+1}")
            os.makedirs(model_path, exist_ok=True)
            self.model.save_pretrained(model_path)
            logger.info(f"Model saved to {model_path}")

            # Vérifier l'early stopping
            if early_stopping(val_f1, self.model):
                logger.info("Early stopping triggered")
                break

        # Tracer et sauvegarder les métriques
        metrics_tracker.plot_metrics(os.path.join(output_dir, "training_metrics.png"))
        metrics_tracker.save_metrics(os.path.join(output_dir, "training_metrics.json"))

        # Sauvegarder le modèle final
        final_model_path = os.path.join(output_dir, "final_model")
        os.makedirs(final_model_path, exist_ok=True)
        self.model.save_pretrained(final_model_path)
        logger.info(f"Final model saved to {final_model_path}")

        return metrics_tracker

    def evaluate(self, test_dataloader: torch.utils.data.DataLoader, output_dir: str = "evaluation_output"):
        """
        Évalue le modèle sur un ensemble de test.

        Args:
            test_dataloader: DataLoader pour les données de test
            output_dir: Répertoire pour sauvegarder les résultats d'évaluation

        Returns:
            Métriques d'évaluation
        """
        # Créer le répertoire de sortie s'il n'existe pas
        os.makedirs(output_dir, exist_ok=True)

        # Évaluation
        self.model.eval()
        test_loss = 0.0
        test_steps = 0
        all_preds = []
        all_labels = []

        progress_bar = tqdm(test_dataloader, desc="Evaluation")
        with torch.no_grad():
            for batch in progress_bar:
                # Déplacer les tenseurs sur le device
                batch = {k: v.to(self.device) for k, v in batch.items()}

                # Forward pass
                outputs = self.model(
                    input_ids=batch["input_ids"],
                    attention_mask=batch["attention_mask"],
                    token_type_ids=batch["token_type_ids"],
                    labels=batch["labels"]
                )

                loss, logits = outputs[:2]

                # Mise à jour de la perte
                test_loss += loss.item()
                test_steps += 1

                # Obtenir les prédictions
                if hasattr(self.model, 'decode'):
                    # Pour les modèles avec CRF
                    preds = self.model.decode(logits, mask=batch["attention_mask"].bool())
                    preds = [p for sublist in preds for p in sublist]  # Aplatir la liste
                else:
                    # Pour les modèles sans CRF
                    preds = torch.argmax(logits, dim=2).flatten().cpu().numpy()

                # Obtenir les labels réels (ignorer les tokens spéciaux et le padding)
                labels = batch["labels"].flatten().cpu().numpy()
                mask = batch["attention_mask"].flatten().cpu().numpy()

                # Filtrer les tokens spéciaux et le padding
                valid_indices = np.where(mask == 1)[0]
                filtered_preds = [preds[0][i] for i in valid_indices] if hasattr(self.model, 'decode') else preds[valid_indices]
                filtered_labels = labels[valid_indices]

                all_preds.extend(filtered_preds)
                all_labels.extend(filtered_labels)

                # Mise à jour de la barre de progression
                progress_bar.set_postfix({"loss": loss.item()})

        # Calculer la perte moyenne de test
        avg_test_loss = test_loss / test_steps

        # Convertir les indices en labels
        pred_labels = [self.id_to_label[p] for p in all_preds]
        true_labels = [self.id_to_label[l] for l in all_labels]

        # Calculer les métriques
        report = classification_report(all_labels, all_preds, target_names=list(self.id_to_label.values()), output_dict=True)

        # Sauvegarder le rapport de classification
        with open(os.path.join(output_dir, "classification_report.json"), "w") as f:
            json.dump(report, f, indent=4)

        # Afficher les métriques
        logger.info(f"Test Loss: {avg_test_loss:.4f}")
        logger.info(f"Test F1 (weighted): {report['weighted avg']['f1-score']:.4f}")
        logger.info(f"Test Precision (weighted): {report['weighted avg']['precision']:.4f}")
        logger.info(f"Test Recall (weighted): {report['weighted avg']['recall']:.4f}")

        return {
            "loss": avg_test_loss,
            "f1": report['weighted avg']['f1-score'],
            "precision": report['weighted avg']['precision'],
            "recall": report['weighted avg']['recall'],
            "report": report
        }

## 3. Pipeline complet de reconnaissance d'événements

### 3.1 Chargement et prétraitement des données

In [None]:
# Créer le builder de dataset
builder = DatasetBuilder(EVENT_TYPES)

# Construire le dataset à partir des fichiers TSV
print(f"Chargement des données depuis {ANNOTATIONS_DIR}...")
dataset = builder.build_from_directory(ANNOTATIONS_DIR, pattern="*/MBY3.tsv")
print(f"Nombre d'exemples chargés: {len(dataset)}")

# Diviser le dataset en ensembles d'entraînement, de validation et de test
train_data, val_data, test_data = builder.split_dataset(dataset, train_ratio=0.7, val_ratio=0.15, test_ratio=0.15)
print(f"Nombre d'exemples d'entraînement: {len(train_data)}")
print(f"Nombre d'exemples de validation: {len(val_data)}")
print(f"Nombre d'exemples de test: {len(test_data)}")

Chargement des données depuis /content/drive/MyDrive/annotation...
Nombre d'exemples chargés: 1441
Nombre d'exemples d'entraînement: 1008
Nombre d'exemples de validation: 216
Nombre d'exemples de test: 217


### 3.2 Tokenisation et encodage des données

In [None]:
# Définir le modèle BERT à utiliser
MODEL_NAME = "camembert-base"  # Vous pouvez changer pour "bert-base-multilingual-cased", "flaubert/flaubert_base_cased", etc.
MAX_LENGTH = 128
BATCH_SIZE = 16

# Créer le processeur de données
processor = BERTDataProcessor(MODEL_NAME, MAX_LENGTH, EVENT_TYPES)

# Créer les datasets PyTorch
print(f"Tokenisation et encodage des données avec {MODEL_NAME}...")
train_dataset, val_dataset, test_dataset = processor.process_bio_data(dataset)

# Créer les dataloaders
train_dataloader, val_dataloader, test_dataloader = processor.create_data_loaders(
    train_dataset, val_dataset, test_dataset, BATCH_SIZE
)

# Obtenir le mapping des labels
label_map = processor.get_label_map()
id_to_label = processor.get_id_to_label()

print(f"Nombre de labels: {len(label_map)}")
print(f"Labels: {label_map}")

Tokenisation et encodage des données avec camembert-base...
Nombre de labels: 11
Labels: {'O': 0, 'B-conflit': 1, 'I-conflit': 2, 'B-décision gouvernementale': 3, 'I-décision gouvernementale': 4, 'B-décès': 5, 'I-décès': 6, 'B-avancée technologique': 7, 'I-avancée technologique': 8, 'B-événement culturel': 9, 'I-événement culturel': 10}


### 3.3 Configuration et entraînement du modèle

In [None]:
# Paramètres d'entraînement
USE_CRF = True
EPOCHS = 10
LEARNING_RATE = 3e-5
WEIGHT_DECAY = 0.01
WARMUP_STEPS = 500
MAX_GRAD_NORM = 1.0

# Créer le sélecteur de modèle
selector = BERTModelSelector(len(label_map), USE_CRF)

# Afficher les modèles recommandés
print("Modèles BERT recommandés pour la reconnaissance d'événements:")
for model_info in selector.get_recommended_models():
    print(f"- {model_info['name']}: {model_info['description']}")
    print(f"  Avantages: {model_info['advantages']}")
    print(f"  Inconvénients: {model_info['disadvantages']}")
    print()

# Sélectionner le modèle
print(f"Configuration du modèle {MODEL_NAME} (CRF: {USE_CRF})...")
model = selector.select_model(MODEL_NAME)

# Créer le fine-tuner
fine_tuner = BERTFineTuner(model, id_to_label)

# Entraîner le modèle
print("Début de l'entraînement...")
metrics = fine_tuner.train(
    train_dataloader,
    val_dataloader,
    epochs=EPOCHS,
    learning_rate=LEARNING_RATE,
    weight_decay=WEIGHT_DECAY,
    warmup_steps=WARMUP_STEPS,
    max_grad_norm=MAX_GRAD_NORM,
    output_dir=os.path.join(OUTPUT_DIR, "model")
)

You are using a model of type camembert to instantiate a model of type bert. This is not supported for all configurations of models and can yield errors.


Modèles BERT recommandés pour la reconnaissance d'événements:
- camembert-base: Modèle BERT français, pré-entraîné sur un large corpus français
  Avantages: Excellente performance sur les textes français, meilleure compréhension des nuances linguistiques
  Inconvénients: Limité au français, peut être moins adapté si le corpus contient d'autres langues

- bert-base-multilingual-cased: Modèle BERT multilingue (cased) pré-entraîné sur 104 langues, dont le français
  Avantages: Bonne performance sur les langues non-anglaises, adapté aux textes français
  Inconvénients: Moins performant que les modèles spécifiques au français sur certaines tâches

- flaubert/flaubert_base_cased: Modèle BERT français alternatif, pré-entraîné sur un corpus français diversifié
  Avantages: Bonne performance sur les textes français, architecture optimisée
  Inconvénients: Limité au français, peut nécessiter plus de ressources computationnelles

- xlm-roberta-base: Modèle RoBERTa multilingue, pré-entraîné sur 10

Some weights of BERTCRFForEventRecognition were not initialized from the model checkpoint at camembert-base and are newly initialized: ['bert.embeddings.LayerNorm.bias', 'bert.embeddings.LayerNorm.weight', 'bert.embeddings.position_embeddings.weight', 'bert.embeddings.token_type_embeddings.weight', 'bert.embeddings.word_embeddings.weight', 'bert.encoder.layer.0.attention.output.LayerNorm.bias', 'bert.encoder.layer.0.attention.output.LayerNorm.weight', 'bert.encoder.layer.0.attention.output.dense.bias', 'bert.encoder.layer.0.attention.output.dense.weight', 'bert.encoder.layer.0.attention.self.key.bias', 'bert.encoder.layer.0.attention.self.key.weight', 'bert.encoder.layer.0.attention.self.query.bias', 'bert.encoder.layer.0.attention.self.query.weight', 'bert.encoder.layer.0.attention.self.value.bias', 'bert.encoder.layer.0.attention.self.value.weight', 'bert.encoder.layer.0.intermediate.dense.bias', 'bert.encoder.layer.0.intermediate.dense.weight', 'bert.encoder.layer.0.output.LayerNorm

Début de l'entraînement...


Epoch 1/10 [Train]:   0%|          | 0/63 [00:00<?, ?it/s]

Epoch 1/10 [Val]:   0%|          | 0/14 [00:00<?, ?it/s]

IndexError: list index out of range

### 3.4 Évaluation du modèle

In [None]:
# Évaluer le modèle sur l'ensemble de test
print("Évaluation du modèle sur l'ensemble de test...")
eval_results = fine_tuner.evaluate(
    test_dataloader,
    output_dir=os.path.join(OUTPUT_DIR, "evaluation")
)

# Afficher les résultats
print(f"F1-score: {eval_results['f1']:.4f}")
print(f"Précision: {eval_results['precision']:.4f}")
print(f"Rappel: {eval_results['recall']:.4f}")

# Afficher le rapport de classification
print("\nRapport de classification:")
for label, metrics in eval_results['report'].items():
    if isinstance(metrics, dict):
        print(f"{label}:")
        print(f"  Précision: {metrics['precision']:.4f}")
        print(f"  Rappel: {metrics['recall']:.4f}")
        print(f"  F1-score: {metrics['f1-score']:.4f}")
        print(f"  Support: {metrics['support']}")

### 3.5 Utilisation du modèle pour la prédiction

In [None]:
def predict_events(text, model, tokenizer, id_to_label, device=None):
    """
    Prédit les événements dans un texte.

    Args:
        text: Texte à analyser
        model: Modèle BERT entraîné
        tokenizer: Tokenizer BERT
        id_to_label: Dictionnaire mappant les indices numériques aux tags BIO
        device: Appareil sur lequel exécuter la prédiction (cpu ou cuda)

    Returns:
        Liste de tuples (token, tag)
    """
    device = device if device else ("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()

    # Tokeniser le texte
    inputs = tokenizer(
        text,
        return_tensors="pt",
        truncation=True,
        padding=True,
        return_offsets_mapping=True,
        return_special_tokens_mask=True
    )

    # Obtenir les offsets et le masque des tokens spéciaux
    offset_mapping = inputs.pop("offset_mapping")[0].numpy()
    special_tokens_mask = inputs.pop("special_tokens_mask")[0].numpy()

    # Déplacer les tenseurs sur le device
    inputs = {k: v.to(device) for k, v in inputs.items()}

    # Prédire les tags
    with torch.no_grad():
        outputs = model(**inputs)

        if hasattr(model, 'decode'):
            # Pour les modèles avec CRF
            preds = model.decode(outputs, mask=inputs["attention_mask"].bool())[0]
        else:
            # Pour les modèles sans CRF
            preds = torch.argmax(outputs, dim=2)[0].cpu().numpy()

    # Convertir les prédictions en tags
    tags = [id_to_label[p] for p in preds]

    # Obtenir les tokens
    tokens = tokenizer.convert_ids_to_tokens(inputs["input_ids"][0].cpu().numpy())

    # Filtrer les tokens spéciaux
    result = []
    for i, (token, tag) in enumerate(zip(tokens, tags)):
        if special_tokens_mask[i] == 0:  # Ignorer les tokens spéciaux
            result.append((token, tag))

    return result

# Exemple d'utilisation
tokenizer = processor.tokenizer_wrapper.tokenizer

# Texte d'exemple
example_text = "Le gouvernement français a annoncé hier une nouvelle décision concernant la politique énergétique du pays."

# Prédire les événements
predictions = predict_events(example_text, model, tokenizer, id_to_label)

# Afficher les résultats
print("Prédictions:")
for token, tag in predictions:
    print(f"{token}: {tag}")

# Extraire les événements
events = []
current_event = None

for i, (token, tag) in enumerate(predictions):
    if tag.startswith("B-"):
        if current_event:
            events.append(current_event)
        event_type = tag[2:]
        current_event = {"type": event_type, "tokens": [token]}
    elif tag.startswith("I-") and current_event and tag[2:] == current_event["type"]:
        current_event["tokens"].append(token)
    elif tag == "O":
        if current_event:
            events.append(current_event)
            current_event = None

if current_event:
    events.append(current_event)

# Afficher les événements extraits
print("\nÉvénements extraits:")
for event in events:
    print(f"Type: {event['type']}")
    print(f"Texte: {''.join(event['tokens']).replace('##', '')}")
    print()

### 3.6 Sauvegarde du modèle et du tokenizer

In [None]:
# Sauvegarder le modèle et le tokenizer dans Google Drive
SAVE_DIR = '/content/drive/MyDrive/event_recognition_model'
os.makedirs(SAVE_DIR, exist_ok=True)

# Sauvegarder le modèle
model.save_pretrained(SAVE_DIR)

# Sauvegarder le tokenizer
tokenizer.save_pretrained(SAVE_DIR)

# Sauvegarder le mapping des labels
with open(os.path.join(SAVE_DIR, 'label_map.json'), 'w') as f:
    json.dump({"label_map": label_map, "id_to_label": id_to_label}, f, indent=4)

print(f"Modèle, tokenizer et mapping des labels sauvegardés dans {SAVE_DIR}")

## 4. Conclusion

Félicitations ! Vous avez maintenant un pipeline complet pour la reconnaissance automatique d'événements dans des textes. Ce pipeline comprend :

1. Le prétraitement des données annotées au format WebAnno TSV 3.3
2. La tokenisation et l'encodage des textes pour BERT
3. La configuration et l'entraînement d'un modèle BERT (avec ou sans CRF)
4. L'évaluation des performances du modèle
5. L'utilisation du modèle pour la prédiction sur de nouveaux textes

Vous pouvez maintenant utiliser ce modèle pour détecter automatiquement les événements dans vos textes !