# Training hidden vs public

Intended to be run on Google Colab. The code is adapted from the Hugging Face token-classification example notebook: https://github.com/huggingface/notebooks/blob/main/examples/token_classification.ipynb

In [None]:
!pip install transformers
!pip install datasets
!pip install seqeval

In [None]:
import transformers
from sklearn.model_selection import train_test_split
from datasets import Dataset
from ast import literal_eval
from datasets import load_metric, DatasetDict
from transformers import (
    AutoTokenizer,
    DataCollatorForTokenClassification,
    AutoModelForTokenClassification,
    TrainingArguments,
    Trainer,
)
from smart_open import open
from transformers import AutoTokenizer

In [None]:
from huggingface_hub import notebook_login

# Authenticate this notebook session with the Hugging Face Hub
# (a push to the hub is issued at the end of the notebook).
notebook_login()

In [None]:
# Integer class id for each BIO tag; the position of a tag in the tuple is its id.
label_encoding_dict = {
    tag: index for index, tag in enumerate(("O", "B-PERS", "I-PERS"))
}

In [None]:
# Tag names ordered by class id: training_tagset[i] is the tag encoded as i.
training_tagset = [
    "O",
    "B-PERS",
    "I-PERS",
]

In [None]:
# Language whose resources are selected from the two lookup tables below.
language = "fr"

# Pretrained checkpoint to fine-tune, per language code.
transformer_model = {
    "fr": "dbmdz/bert-base-french-europeana-cased",
}

# Annotated input files to read, per language code.
hipe_datasets = {
    "fr": ["all-fr.jsonl.gz"],
}

In [None]:
# Gather the raw records from every file registered for the selected language.
# `open` is smart_open's (imported at the top of the notebook), presumably so
# the .gz input is decompressed transparently — confirm.
lines = []
for hd in hipe_datasets[language]:
    with open(hd) as fr:
        lines.extend(fr.readlines())

# Parse each record, collapsing the HIDDEN/PUBLIC distinction into a single
# PERS class and assigning a sequential string id.
# NOTE(review): the replacement runs on the raw line, so it would also rewrite
# these substrings if they ever occurred inside token text — confirm that is
# acceptable. The records are parsed with `literal_eval`; if the files are
# strict JSON, `json.loads` would be the more natural parser — confirm format.
annotations = []
counter = 0
for line in lines:
    line = line.replace("B-HIDDEN", "B-PERS")
    line = line.replace("I-HIDDEN", "I-PERS")
    line = line.replace("B-PUBLIC", "B-PERS")
    line = line.replace("I-PUBLIC", "I-PERS")
    line = literal_eval(line)
    line["id"] = str(counter)
    counter += 1
    annotations.append(line)

# Column-oriented layout expected by Dataset.from_dict.
dict_hf = {
    column: [a[column] for a in annotations]
    for column in ("id", "tokens", "ner_tags")
}

dataset = Dataset.from_dict(dict_hf)

# 80% train, then the remaining 20% is halved into validation and test.
# Dataset.train_test_split takes `seed` for reproducible shuffling; it has no
# `random_state` parameter (that is the scikit-learn spelling).
train_testvalid = dataset.train_test_split(test_size=0.2, seed=42)
test_valid = train_testvalid['test'].train_test_split(test_size=0.5, seed=42)
dataset = DatasetDict({
    'train': train_testvalid['train'],
    'test': test_valid['test'],
    'validation': test_valid['train']})

In [None]:
# Show a summary of each split, one per line.
print(dataset["train"], dataset["validation"], dataset["test"], sep="\n")

In [None]:
# Display the first record of the test split as a quick sanity check.
dataset["test"][0]

In [None]:
# Prefix of the tag column to train on: f"{task}_tags" is read from each record.
# Should be one of "ner", "pos" or "chunk".
task = "ner"

# Per-device batch size used for both training and evaluation.
batch_size = 16

# Pretrained checkpoint for the language selected above.
model_checkpoint = transformer_model[language]

In [None]:
# Tokenizer and token-classification model from the same checkpoint; the
# classifier is sized to one output per tag in the training tag set.
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)
model = AutoModelForTokenClassification.from_pretrained(
    model_checkpoint,
    num_labels=len(training_tagset),
)

In [None]:
# Pick one training record and show its words and tags on separate lines.
example = dataset["train"][4]
print(example["tokens"], example["ner_tags"], sep="\n")

In [None]:
# Tokenize the already word-split example, then map the resulting input ids
# back to token strings for inspection.
tokenized_input = tokenizer(example["tokens"], is_split_into_words=True)
tokens = tokenizer.convert_ids_to_tokens(tokenized_input["input_ids"])
#print(tokens)

In [None]:
# Compare the number of word-level tags with the number of input ids produced
# by the tokenizer.
len(example[f"{task}_tags"]), len(tokenized_input["input_ids"])

In [None]:
#print(tokenized_input.word_ids())

In [None]:
# For every produced token, look up the tag of the word it came from;
# positions without a source word (word id None) get the ignore value -100.
word_ids = tokenized_input.word_ids()
aligned_labels = [
    example[f"{task}_tags"][word_id] if word_id is not None else -100
    for word_id in word_ids
]
# Both counts should now match.
print(len(aligned_labels), len(tokenized_input["input_ids"]))

In [None]:
# When True, every sub-token of a word receives the word's label; when False,
# only the first sub-token does and the remaining ones are set to -100.
label_all_tokens = True

In [None]:
def tokenize_and_align_labels(examples):
    """Tokenize a batch of word-split examples and align tags to sub-tokens.

    Args:
        examples: A batch with a "tokens" column (one list of words per
            example) and an f"{task}_tags" column (one list of tag strings
            per example).

    Returns:
        The tokenizer output for the batch with an added "labels" entry:
        one list of integer label ids per example, with the same length as
        that example's tokenized sequence.
    """
    tokenized_inputs = tokenizer(examples["tokens"], truncation=True, is_split_into_words=True)

    labels = []
    for i, label in enumerate(examples[f"{task}_tags"]):
        word_ids = tokenized_inputs.word_ids(batch_index=i)
        previous_word_idx = None
        label_ids = []
        for word_idx in word_ids:
            # Special tokens have a word id that is None. We set the label to -100 so they are automatically
            # ignored in the loss function.
            if word_idx is None:
                label_ids.append(-100)
            # We set the label for the first token of each word.
            elif word_idx != previous_word_idx:
                label_ids.append(label_encoding_dict[label[word_idx]])
            # For the other tokens in a word, we set the label to either the current label or -100, depending on
            # the label_all_tokens flag.
            else:
                label_ids.append(label_encoding_dict[label[word_idx]] if label_all_tokens else -100)
            previous_word_idx = word_idx

        labels.append(label_ids)

    tokenized_inputs["labels"] = labels
    return tokenized_inputs

In [None]:
# tokenize_and_align_labels(dataset['train'][:5])

In [None]:
# Tokenize and label-align every split, processing the records in batches.
tokenized_datasets = dataset.map(tokenize_and_align_labels, batched=True)

In [None]:
# Reload the tokenizer from the same checkpoint and build the collator that
# batches tokenized examples for token classification.
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)
data_collator = DataCollatorForTokenClassification(tokenizer)

In [None]:
# Checkpoint name without the hub organisation prefix.
model_name = model_checkpoint.rsplit("/", 1)[-1]

# Evaluate and save once per epoch so the best checkpoint can be reloaded
# when training finishes.
args = TrainingArguments(
    output_dir="person-finetuned",
    num_train_epochs=10,
    learning_rate=2e-5,
    weight_decay=0.01,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    evaluation_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
)

In [None]:
# Load the seqeval metric, used below to score predicted tag sequences.
# NOTE(review): `load_metric` is reportedly deprecated in newer `datasets`
# releases in favour of the separate `evaluate` package — confirm against the
# installed version.
metric = load_metric("seqeval")

In [None]:
# Sanity check: score the example's tags against themselves. The tags are
# passed as stored in the dataset, without going through label_encoding_dict.
labels = list(example[f"{task}_tags"])
metric.compute(predictions=[labels], references=[labels])

In [None]:
import numpy as np

def compute_metrics(p):
    """Turn raw model outputs into overall seqeval scores.

    Args:
        p: A (predictions, labels) pair; predictions hold per-class scores
            along axis 2 and labels hold integer label ids, with -100
            marking positions to ignore.

    Returns:
        A dict with the overall "precision", "recall", "f1" and "accuracy".
    """
    scores, gold_ids = p
    predicted_ids = np.argmax(scores, axis=2)

    # Walk each sequence once, keeping only the positions whose gold label
    # is not the ignore index, and translate ids back to tag names.
    true_predictions = []
    true_labels = []
    for predicted_row, gold_row in zip(predicted_ids, gold_ids):
        kept = [
            (predicted, gold)
            for predicted, gold in zip(predicted_row, gold_row)
            if gold != -100
        ]
        true_predictions.append([training_tagset[predicted] for predicted, _ in kept])
        true_labels.append([training_tagset[gold] for _, gold in kept])

    results = metric.compute(predictions=true_predictions, references=true_labels)

    summary = {}
    summary["precision"] = results["overall_precision"]
    summary["recall"] = results["overall_recall"]
    summary["f1"] = results["overall_f1"]
    summary["accuracy"] = results["overall_accuracy"]
    return summary

In [None]:
# Wire the model, hyper-parameters, data and metric function together.
# Training uses the train split; per-epoch evaluation uses the validation split.
trainer = Trainer(
    model=model,
    args=args,
    data_collator=data_collator,
    train_dataset=tokenized_datasets["train"],
    eval_dataset=tokenized_datasets["validation"],
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)

In [None]:
# Run the fine-tuning loop configured above.
trainer.train()

In [None]:
# Score the model on the Trainer's evaluation dataset (the validation split).
trainer.evaluate()

In [None]:
# Predict on the held-out test split and take the highest-scoring class per token.
predictions, labels, _ = trainer.predict(tokenized_datasets["test"])
predictions = np.argmax(predictions, axis=2)

# Drop every position whose gold label is the ignore index (-100) and map the
# remaining ids back to tag names, for predictions and references alike.
true_predictions = [
    [
        training_tagset[predicted_id]
        for predicted_id, gold_id in zip(predicted_row, gold_row)
        if gold_id != -100
    ]
    for predicted_row, gold_row in zip(predictions, labels)
]
true_labels = [
    [
        training_tagset[gold_id]
        for gold_id in gold_row
        if gold_id != -100
    ]
    for gold_row in labels
]

# Full seqeval report on the test split, displayed as the cell output.
results = metric.compute(predictions=true_predictions, references=true_labels)
results

In [None]:
# Write the Trainer's model to the local "person_final" directory.
trainer.save_model("person_final")

In [None]:
# NOTE(review): `files` is imported but not used in this cell.
from google.colab import files
import shutil
# Zip the saved "person_final" directory into person_fr.zip.
shutil.make_archive("person_fr", 'zip', "person_final")

In [None]:
from google.colab import drive
# Mount Google Drive, then copy the zipped model into it.
drive.mount('/content/drive')
!cp "/content/person_fr.zip" /content/drive/MyDrive

# Predictions

In [None]:
from transformers import pipeline, AutoModelForTokenClassification, AutoTokenizer

language = "fr"

# Change path if needed:
model = AutoModelForTokenClassification.from_pretrained("hidden_vs_public_final")
tokenizer = AutoTokenizer.from_pretrained("hidden_vs_public_final")

# Register the "[PERS]" placeholder as a single token. add_tokens returns the
# number of tokens that were actually new; when the placeholder was not already
# in the vocabulary, the model's embedding matrix must grow to cover the new id,
# otherwise looking it up would index past the end of the embeddings.
if tokenizer.add_tokens(["[PERS]"]):
    model.resize_token_embeddings(len(tokenizer))

In [None]:
def get_labels(output):
    """Return the tag assigned to each "[PERS]" placeholder in a pipeline result.

    Args:
        output: An iterable of dicts, each with a "word" and an
            "entity_group" key.

    Returns:
        A list with one readable tag per entry whose "word" is exactly
        "[PERS]", in the order the entries appear.

    Raises:
        KeyError: If a "[PERS]" entry carries an "entity_group" that is not
            one of LABEL_0 .. LABEL_4.
    """
    # Model label names mapped to the tags they stand for.
    readable_tag = {
        "LABEL_0": "O",
        "LABEL_1": "B-PUBLIC",
        "LABEL_2": "I-PUBLIC",
        "LABEL_3": "B-HIDDEN",
        "LABEL_4": "I-HIDDEN",
    }
    return [
        readable_tag[group["entity_group"]]
        for group in output
        if group["word"] == "[PERS]"
    ]

In [None]:
# Build a NER pipeline around the loaded model and tokenizer, using the
# "simple" aggregation strategy for its output.
ner_pipeline = pipeline("ner", model=model, tokenizer=tokenizer, aggregation_strategy="simple")

In [None]:
# Example with one "[PERS]" placeholder following a title ("Le président").
output = ner_pipeline("Le président [PERS] a été invité à la conference de Paris.")
get_labels(output)

In [None]:
# Example with two consecutive "[PERS]" placeholders at the start of the sentence.
output = ner_pipeline("[PERS] [PERS], menuisier, 37 ans, a été cambriolé.")
get_labels(output)

In [None]:
# Example sentence ending with two "[PERS]" placeholders after "sous la présidence de".
sentence = "Une réunion trimestrielle de la corporation de cet arrondissement s'est tenue au Guildhall, lundi, sous la présidence de [PERS] [PERS]."

In [None]:
# Run the pipeline on `sentence` and show the tag given to each "[PERS]" placeholder.
output = ner_pipeline(sentence)
get_labels(output)

In [None]:
# Same opening as the previous example sentence, but the placeholders now follow "où habite".
sentence = "Une réunion trimestrielle de la corporation de cet arrondissement s'est tenue au Guildhall, lundi, où habite [PERS] [PERS]."

In [None]:
# Run the pipeline on `sentence` and show the tag given to each "[PERS]" placeholder.
output = ner_pipeline(sentence)
get_labels(output)

In [None]:
# Example with two "[PERS]" placeholders as the subject of the sentence.
sentence = "[PERS] [PERS] a été poignardé alors qu'il se rendait à l'opéra."
output = ner_pipeline(sentence)
get_labels(output)

# Pushing to huggingface hub

In [None]:
!apt install git-lfs

In [None]:
# Upload through the Trainer object created in the training section.
trainer.push_to_hub()