# CamemBERT

Installations

In [None]:
# !pip install sentencepiece
# !pip install datasets
# !pip install transformers==4.33
# !pip install torch
# !pip install accelerate -U
# !pip install iterative-stratification


Chargements

In [None]:
import torch
import pandas as pd
import numpy as np

from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from transformers import AutoTokenizer, AutoModelForTokenClassification, TrainingArguments, Trainer
from datasets import Dataset

In [None]:
# Tag vocabulary. A label's position in this list is its integer class id
# (get_data converts tag strings with label_list.index). Multi-tag entries are
# the individual tag names sorted and joined by a single space, which is how
# get_data builds the lookup key.
label_list = ['aucun', 'geogFeat geogName', 'geogFeat', 'geogName', 'geogName name']

In [None]:
def get_data(path, annotated=True):
    """Read a token-per-line file and group its tokens into sentence records.

    The first line of the file is discarded as a header. Every remaining
    non-blank line is split on the triple double-quote delimiter into three
    parts: a prefix (ignored), the token, and a trailing field whose first
    character is dropped before it is whitespace-split into tag names.
    A sentence ends at each token equal to '.'; tokens left over after the
    last '.' form a final sentence.

    Args:
        path: Path of the UTF-8 encoded input file.
        annotated: When True, each token's tags are sorted, joined with a
            space and converted to an id via ``label_list.index``. When False
            the tag field is ignored and every token gets the id 0.

    Returns:
        A list of dicts with keys 'id' (0-based sentence number), 'tokens'
        (list of str) and 'ner_tags' (list of int, one per token).

    Raises:
        ValueError: If a non-blank line does not contain exactly two
            delimiters, or (annotated only) a tag combination is not in
            ``label_list``.
    """
    with open(path, 'r', encoding='utf8') as fileio:
        lines = fileio.read().strip().split('\n')
    lines.pop(0)  # header row

    print(len(lines))

    def make_record(sentence_id, sentence_tokens, sentence_tags):
        # Single place where a sentence record is built (used for both the
        # '.'-terminated sentences and the trailing unterminated one).
        if annotated:
            tag_ids = [label_list.index(tag) for tag in sentence_tags]
        else:
            tag_ids = [0] * len(sentence_tokens)
        return {'id': sentence_id, 'tokens': sentence_tokens, 'ner_tags': tag_ids}

    data = []
    tokens = []
    ner_tags = []

    for line in lines:
        if not line.strip():
            # A blank line carries no token; skipping it avoids an unpacking
            # error on the split below.
            continue
        _, token, tags = line.split('"""')
        tokens.append(token)
        if annotated:
            # tags[1:] drops the separator character that follows the token.
            ner_tags.append(' '.join(sorted(tags[1:].split())))
        if token == '.':
            data.append(make_record(len(data), tokens, ner_tags))
            tokens = []
            ner_tags = []

    if tokens:
        data.append(make_record(len(data), tokens, ner_tags))

    return data


In [None]:
# Load the annotated training sentences; the trailing expression displays how
# many sentence records were built.
data_ = get_data('train_2.csv')
len(data_)

In [None]:
from iterstrat.ml_stratifiers import MultilabelStratifiedShuffleSplit
import numpy as np
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.feature_extraction.text import CountVectorizer
# Multi-label stratified hold-out split: each sentence is described by the set
# of tag ids it contains, binarised into an indicator matrix, and 5% of the
# sentences are held out.
mlb = MultiLabelBinarizer()
cv = CountVectorizer()  # NOTE(review): not referenced in the visible cells - confirm it can go

y = mlb.fit_transform([list(set(sentence['ner_tags'])) for sentence in data_])

mskf = MultilabelStratifiedShuffleSplit(n_splits=1, random_state=42, test_size=0.05)

# With n_splits=1 the splitter produces a single (train, test) index pair.
train_index, test_index = next(mskf.split(y, y))


In [None]:
# Wrap the list of sentence records in a `datasets.Dataset`.
data = Dataset.from_list(data_)

In [None]:
# Build the train / held-out subsets from the stratified split indices.
# train, test = train_test_split(data, test_size=0.1, random_state=42)
train_dataset = Dataset.from_dict(data[train_index])
test_dataset = Dataset.from_dict(data[test_index])

Choix du modèle

In [None]:
# Pretrained checkpoint identifier passed to `from_pretrained`; also used as
# the stem of the output file names written later in the notebook.
mdl = "camembert-base"

Choix du device

In [None]:
# Use the first CUDA GPU when one is available; otherwise fall back to the CPU
# so that moving the model to `device` does not fail on a machine without CUDA.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

Tokenizer

In [None]:
def tokenize_and_align_labels(examples):
    """Tokenize pre-split sentences and expand word-level tags to sub-tokens.

    Uses the module-level ``tokenizer``. Every sequence is truncated/padded
    to 512 positions.

    Args:
        examples: A batch mapping with "tokens" (list of word lists) and
            "ner_tags" (list of per-word integer tag lists).

    Returns:
        The tokenizer output with two extra entries:
        "labels" - one id per position: the tag of the word the position
        belongs to (every sub-token of a word carries the word's tag), or
        -100 for positions that belong to no word (special tokens, padding);
        "word_ids" - for each position the index of its source word, or None.
    """
    tokenized_inputs = tokenizer(
        examples["tokens"],
        truncation=True,
        is_split_into_words=True,
        padding="max_length",
        max_length=512,
    )

    # -100 is the default ignore_index of the cross-entropy loss, so positions
    # without a source word do not contribute to training. Labelling them with
    # a real class id would train the model to predict that class on padding.
    ignore_index = -100

    labels = []
    word_idss = []
    for i, word_labels in enumerate(examples["ner_tags"]):
        word_ids = tokenized_inputs.word_ids(batch_index=i)  # position -> source word
        labels.append([
            ignore_index if word_idx is None else word_labels[word_idx]
            for word_idx in word_ids
        ])
        word_idss.append(word_ids)

    tokenized_inputs["labels"] = labels
    tokenized_inputs['word_ids'] = word_idss
    return tokenized_inputs

In [None]:
# Load the tokenizer for the `mdl` checkpoint. tokenize_and_align_labels reads
# this name as a global, so this cell must run before any `.map` call below.
tokenizer = AutoTokenizer.from_pretrained(mdl)
# data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer,padding=True)

In [None]:
# Tokenize the full dataset and the train / held-out subsets in batches.
tokenized_dataset = data.map(tokenize_and_align_labels, batched=True)
tokenized_train_dataset = train_dataset.map(tokenize_and_align_labels, batched=True)
tokenized_test_dataset = test_dataset.map(tokenize_and_align_labels, batched=True)

In [None]:
# Sanity check: the first example has exactly one label per input id.
len(tokenized_dataset[0]['labels']) == len(tokenized_dataset[0]['input_ids'])

In [None]:
# Inspection cell: only the last expression (first ten labels and input ids of
# example 1) is displayed; the two expressions above it are evaluated and discarded.
tokenized_dataset
['id', 'tokens', 'ner_tags', 'input_ids', 'attention_mask', 'labels', 'word_ids']
tokenized_dataset[1]['labels'][:10],tokenized_dataset[1]['input_ids'][:10]

## train

Chargement du modèle et paramètres pour la classification

In [None]:
# Load the pretrained checkpoint with a token-classification head that has one
# output per entry of label_list, then move it to the selected device.
model = AutoModelForTokenClassification.from_pretrained(mdl, num_labels = len(label_list)).to(device)


In [None]:

# Training configuration: evaluate and checkpoint every 100 steps (the two
# intervals are kept equal), and reload the checkpoint with the best
# `eval_loss` once training ends.
training_args = TrainingArguments(
    output_dir="test_trainer",
    evaluation_strategy="steps",
    num_train_epochs=30,
    learning_rate=1e-4,
    per_device_train_batch_size=8,
    metric_for_best_model='eval_loss',
    load_best_model_at_end=True,
    save_total_limit=5,
    save_steps=100,
    eval_steps=100,
    seed=42
    )


In [None]:

# Earlier variant, kept for reference: train on the stratified train indices
# and evaluate on the held-out indices.
# trainer = Trainer(
#     model=model,
#     args=training_args,
#     train_dataset=Dataset.from_dict(tokenized_dataset[train_index]),
#     eval_dataset=Dataset.from_dict(tokenized_dataset[test_index])
# )


# Active variant: the full tokenized dataset is used for both training and
# evaluation.
# NOTE(review): the full dataset contains the held-out sentences, so the
# classification report computed later on `tokenized_test_dataset` is measured
# on data seen during training, and `eval_loss` (used to pick the best
# checkpoint) is a training-set loss. Presumably intentional for a final
# submission run - confirm.
# NOTE(review): `tokenized_dataset` is already a `Dataset`; the extra
# `Dataset.from_dict` wrapper looks redundant - confirm it is needed.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=Dataset.from_dict(tokenized_dataset),
    eval_dataset=Dataset.from_dict(tokenized_dataset)
)

Apprentissage (fine-tuning)

In [None]:
# Run fine-tuning with the configuration defined above.
trainer.train()

Prédiction du jeu de test

In [None]:
from pprint import pprint
from collections import Counter

def read_preds(y_pred, word_idss):
    """Reduce position-level tags to one tag per word.

    For each sequence, positions whose word id is None are skipped. A tag is
    kept whenever the word id differs from the word id of the previously kept
    position, i.e. each word is represented by the tag at its first
    sub-token; tags at the following sub-tokens of the same word are ignored.

    Args:
        y_pred: Iterable of per-sequence tag sequences (one tag per position).
        word_idss: Iterable of per-sequence word-id sequences aligned with
            ``y_pred`` (int or None per position).

    Returns:
        A list with, for each sequence, the list of kept tags.
    """
    preds = []
    for tags, word_ids in zip(y_pred, word_idss):
        word_tags = []
        previous_idx = -1
        for tag, idx in zip(tags, word_ids):
            if idx is None or idx == previous_idx:
                # No source word, or a later sub-token of the current word.
                continue
            word_tags.append(tag)
            previous_idx = idx
        preds.append(word_tags)

    return preds

In [None]:
# Predict the held-out split, reduce both predictions and gold labels to one
# tag per word with read_preds, flatten them to tag names and print a per-class
# report.
y_pred = trainer.predict(tokenized_test_dataset).predictions.argmax(axis=-1)
y_true = tokenized_test_dataset['labels']
preds = read_preds(y_pred, tokenized_test_dataset['word_ids'])
preds_flat = [label_list[p] for pred in preds for p in pred]
trues = read_preds(y_true, tokenized_test_dataset['word_ids'])
trues_flat = [label_list[t] for true in trues for t in true]
print(classification_report(trues_flat, preds_flat, digits=3, labels=label_list))

In [None]:
# Dump the held-out predictions: one "token predicted gold" line per word and
# a blank line after every sentence. The context manager guarantees the file
# is closed even if a lookup or write fails part-way through.
with open(f'{mdl}.test.conll', 'w', encoding='utf8') as fileio:
    for true, pred, tokens in zip(trues, preds, test_dataset['tokens']):
        for t, p, token in zip(true, pred, tokens):
            fileio.write(f'{token} {label_list[p]} {label_list[t]}\n')
        fileio.write('\n')

## eval

In [None]:
# Load the unannotated evaluation file; every token gets the placeholder tag 0.
# NOTE(review): the name `eval` shadows the Python builtin for the rest of the
# kernel session; renaming it requires updating the next cell as well.
eval = get_data('test.csv', annotated=False)
len(eval)

In [None]:
# Wrap the evaluation records in a Dataset and tokenize them the same way as
# the training data.
eval_dataset = Dataset.from_list(eval)
tokenized_eval_dataset = eval_dataset.map(tokenize_and_align_labels, batched=True)

In [None]:
# Predict the evaluation set and reduce the output to one tag id per word.
# NOTE(review): the tokenizer truncates at max_length=512 positions, so a
# sentence longer than that would yield fewer tags than tokens here and shift
# the row ids of the submission built below - confirm no sentence is that long.
y_pred = trainer.predict(tokenized_eval_dataset).predictions.argmax(axis=-1)
preds = read_preds(y_pred, tokenized_eval_dataset['word_ids'])

In [None]:
import pandas as pd

In [None]:
from itertools import chain

# Flatten the per-sentence predictions into one tag name per token, then build
# the submission rows: a header followed by "<running index>,<tag name>".
labels = [label_list[tag_id] for tag_id in chain.from_iterable(preds)]
res = ["Id,Label"]
res.extend(f"{i},{label}" for i, label in enumerate(labels))
len(res)

In [None]:
# Display the checkpoint identifier that forms the stem of the output file name.
mdl

In [None]:
# Write the submission rows joined by '\n'; no newline is written after the
# last row.
with open(f'{mdl}.csv', 'w', encoding='utf8', newline='\n') as fileio:
    fileio.write("\n".join(res))

