In [37]:
# transformers not support NumPy 2.0 yet
!pip install -q numpy~=1.26.4 transformers~=4.46.2
!pip install -q datasets evaluate matplotlib pydantic seqeval

# 訓練 PII 偵測模型

In [None]:
import ast
import json
from pprint import pprint

import evaluate
import numpy as np
import pandas as pd
import torch
from datasets import load_dataset
from pydantic import BaseModel
from tqdm import tqdm
from transformers import (
  AutoModelForTokenClassification,
  AutoTokenizer,
  DataCollatorForTokenClassification,
  Trainer,
  TrainingArguments,
  pipeline,
  set_seed,
)

# Pick the compute device: Apple MPS when available, otherwise CUDA, otherwise the CPU.
if torch.backends.mps.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

## 下載資料

從 Kaggle 下載 PII External Dataset，並解壓縮到 `sample_data` 資料夾。

In [None]:
# 從 Kaggle 下載 PII External Dataset
!curl -L -o ./sample_data/pii-external-dataset.zip \
  https://www.kaggle.com/api/v1/datasets/download/alejopaullier/pii-external-dataset

# 解壓縮
!unzip -o -q ./sample_data/pii-external-dataset.zip -d ./sample_data/

## 資料包含什麼？

In [None]:
# Load the full `train` split of the CSV file.
pii_csv_rows = load_dataset('csv', data_files='sample_data/pii_dataset.csv', split='train')
# Split into 80% training and 20% testing sets. Shuffling is disabled so that
# the train and test sets are the same across runs.
immutable_dataset = pii_csv_rows.train_test_split(test_size=0.2, shuffle=False)
immutable_dataset

In [None]:
# Keep only the features needed for training: 'tokens' and 'labels'.
unused_columns = [
  'document', 'text', 'trailing_whitespace', 'prompt', 'prompt_id', 'name',
  'email', 'phone', 'job', 'address', 'username', 'url', 'hobby', 'len']
dataset = immutable_dataset.remove_columns(unused_columns)


def _parse_list_columns(example):
    """Turn the string-encoded 'tokens' and 'labels' cells into Python lists."""
    return {
        'tokens': ast.literal_eval(example['tokens']),
        'labels': ast.literal_eval(example['labels']),
    }


def _has_one_label_per_token(example):
    """Return True when the example has exactly as many labels as tokens."""
    return len(example['tokens']) == len(example['labels'])


# Both columns are stored as strings, so parse them back into lists.
dataset = dataset.map(_parse_list_columns)

# Drop incomplete examples whose tokens and labels differ in length.
dataset = dataset.filter(_has_one_label_per_token)
dataset

In [None]:
# Rename the `tokens` column to `words` so it is not confused with the tokenizer's tokens later on
dataset = dataset.rename_column('tokens', 'words')
# Show the first `first_n_data` rows
first_n_data = 3
pd.set_option('display.max_colwidth', None)
pd.DataFrame(dataset['train'].select(range(first_n_data)))

## 資料中的 BIO 詞性標注

IOB 格式（inside, outside, beginning 的縮寫），也常被稱為 BIO 格式，是計算語言學中用於標記任務（例如命名實體識別 NER，詞性標記 POS）的常見標記格式。

* B - for the first token of a named entity
* I - for tokens inside a named entity
* O - for tokens outside any named entity

In [None]:
# Collect every BIO tag that occurs in the data.
# All splits are scanned so that a tag appearing only in the test split still gets an id.
label_names = set()
for split in dataset.values():
    for data in split:
        label_names.update(data['labels'])
# Convert the set to a sorted list. The list index of each tag becomes its integer
# label id, and set iteration order for strings is not stable between interpreter
# runs, so sorting keeps the tag <-> id mapping reproducible.
label_names = sorted(label_names)
pprint(label_names, compact=True)

## 訓練設定

In [None]:
# Training-related settings
class Config(BaseModel):
  """Settings for fine-tuning, plus the BIO tag vocabulary and its id mappings."""
  seed: int = 42 # random seed value
  model_name: str = 'dslim/distilbert-NER' # distilled model: fewer parameters, so training is faster
  saved_model_path: str = 'sample_data/saved_model' # path to save the trained model
  train_seq_len: int = 1024 # max size of input sequence for training
  train_batch_size: int = 4 # size of the input batch in training
  eval_batch_size: int = 4 # size of the input batch in evaluation
  epochs: int = 1 # train for a single epoch only, to keep training fast
  lr: float = 2e-5 # learning rate, controls how fast or slow the model learns
  weight_decay: float = 0.01 # weight decay, helps the model stay simple and avoid overfitting by penalizing large weights.
  tags: list # BIO (Beginning, Inner, Outer) format tags
  id2tag: dict # integer label to BIO format tags mapping
  tag2id: dict # BIO format tags to integer tags mapping
  num_tags: int # number of PII (NER) tags

# Build the two lookup tables from the tag list: list index <-> tag name.
id2tag = dict(enumerate(label_names))
tag2id = {tag: idx for idx, tag in id2tag.items()}
config = Config(tags=label_names, id2tag=id2tag, tag2id=tag2id, num_tags=len(label_names))

# Print both mappings ordered by integer id.
pprint(f'id2tag: {sorted(config.id2tag.items(), key=lambda x: x[0])}')
pprint(f'tag2id: {sorted(config.tag2id.items(), key=lambda x: x[1])}')

In [None]:
# Number of leading words/labels shown per example when printing them side by side
max_display = 50

def show_nth_data(dataset, nth, max_display):
    """Print the first `max_display` words of example `nth` with their labels aligned underneath.

    Args:
        dataset: Indexable collection of examples; each example is a mapping
            with 'words' and 'labels' sequences of strings.
        nth: Index of the example to show.
        max_display: Maximum number of word/label pairs to print.

    Every word/label pair is padded to a shared column width plus one
    separating space. The two resulting lines are emitted with pprint and
    followed by an empty line.
    """
    example = dataset[nth]
    shown_words = example['words'][:max_display]
    shown_labels = example['labels'][:max_display]

    word_cells = []
    label_cells = []
    for word, label in zip(shown_words, shown_labels):
        column_width = max(len(word), len(label)) + 1
        word_cells.append(word.ljust(column_width))
        label_cells.append(label.ljust(column_width))

    pprint("".join(word_cells), width=200)
    pprint("".join(label_cells), width=200)
    print()

def show_data(dataset, first_n_data, max_display):
    """Print the first `first_n_data` examples of `dataset`, one show_nth_data call per example."""
    for example_index in range(first_n_data):
        show_nth_data(dataset, example_index, max_display)

# Preview the test split; pass dataset['train'] instead to preview the training split.
show_data(dataset['test'], first_n_data, max_display)

## 先觀察 Fine-tuning 前的表現

In [None]:
# Sample text containing a name, a job, a street address and an e-mail address
test_text = '''
Hello, I'm Badi Nakamura, and I work as a programmer.
I'm based out of 2703 Woolsey Street, and you can reach me via email at badinakamura@gmail.org.
'''

# Token-classification pipeline backed by the checkpoint named in config.model_name (before fine-tuning)
classifier = pipeline(
  task="token-classification",
  model=config.model_name,
  device=device,)

# Show the predicted tokens and their entity tags as two aligned lines
def show_prediction(text, classifier):
    """Run `classifier` on `text` and print each predicted token above its entity tag.

    Args:
        text: Input passed to `classifier` unchanged.
        classifier: Callable returning an iterable of mappings that each carry
            a 'word' and an 'entity' string.

    Every word/entity pair is padded to a shared column width plus one
    separating space; the two resulting lines are emitted with pprint.
    """
    word_cells = []
    entity_cells = []
    for prediction in classifier(text):
        token_text = prediction['word']
        entity_tag = prediction['entity']
        column_width = max(len(token_text), len(entity_tag)) + 1
        word_cells.append(token_text.ljust(column_width))
        entity_cells.append(entity_tag.ljust(column_width))

    pprint("".join(word_cells), width=200)
    pprint("".join(entity_cells), width=200)

# Echo the input text, then show what the classifier predicts for it
print(f'輸入: {test_text}')
show_prediction(test_text, classifier)

## 了解 Tokenizer 行為

In [None]:
# Load the tokenizer that belongs to the checkpoint named in config.model_name
tokenizer = AutoTokenizer.from_pretrained(config.model_name)
# Check whether the tokenizer is a fast tokenizer
tokenizer.is_fast

In [None]:

# Use the n-th example as a worked example
data_cat = 'test'
data_nth = 2
input_words = dataset[data_cat][data_nth]["words"]
input_labels = dataset[data_cat][data_nth]["labels"]
input_token_ids = tokenizer(
  input_words,
  # is_split_into_words: Whether or not the input is already pre-tokenized (e.g., split into words).
  # If set to True, the tokenizer assumes the input is already split into words (for instance, by splitting it on whitespace) which it will tokenize.
  # This is useful for NER or token classification.
  is_split_into_words=True)

# As we can see, the tokenizer added the special tokens used by the model ([CLS] at the start and [SEP] at the end),
# and most words are left unchanged. Some words, however, are split into several subwords, e.g. Bar, ##eil and ##ly
pprint(input_token_ids.tokens(), compact=True)

In [None]:
# Original data
print(f'length of input_words: {len(input_words)}')
print(f'length of input_labels: {len(input_labels)}')
# Tokenization leads to a length mismatch between our inputs and the labels
print(f'length of token id: {len(input_token_ids.tokens())}')

In [None]:
# Side-by-side comparison of the data before and after tokenization
print('=== Tokenizer 前 ===')
show_nth_data(dataset[data_cat], data_nth, max_display)
print('=== Tokenizer 後 ===')
pprint(input_token_ids.tokens()[:max_display], compact=True)
# Thanks to the fast tokenizer, each token can easily be mapped back to its corresponding word.
# `word_ids()` gives, for every token, the index of the word it came from; special tokens map to None
# (only works for the output of a fast tokenizer).
print()
print('=== 對應的 word_ids ===')
pprint(input_token_ids.word_ids()[:max_display], compact=True)


## 重新校準 Tokenizer 與標籤

稍作處理後，我們可以擴展標籤列表以匹配標記。首先，我們將應用的規則是特殊標記獲得 -100 標籤。這是因為默認情況下，-100 是我們在損失函數中被忽略的索引。然後，每個標記獲得與其所在單詞開始標記相同的標籤，因為它們是同一實體的一部分。對於單詞內但不在開頭的標記，我們將 B- 替換為 I- ，因為該標記不是開始實體：

In [24]:
def align_labels_with_tokens(word_ids, labels, tag2id=None):
    """Expand word-level BIO tags to one integer label per tokenizer token.

    Args:
        word_ids: For every token, the index of the word it came from, or None
            for a special token.
        labels: Word-level BIO tag strings, indexed by word id.
        tag2id: Mapping from tag string to integer id. Defaults to the
            notebook-level `config.tag2id`.

    Returns:
        A list with one integer per entry of `word_ids`:
        -100 for special tokens, the tag id for the first token of a word, and
        for the remaining tokens of that word the id of the matching I- tag
        when the word is tagged B- (the word's own tag id otherwise).

    Raises:
        KeyError: If a tag in `labels` is missing from `tag2id`.
    """
    if tag2id is None:
        tag2id = config.tag2id

    new_labels = []
    current_word_id = None
    for word_id in word_ids:
        if word_id is None:
            # Special token: -100 is the index ignored by the loss function.
            new_labels.append(-100)
            continue

        tag = labels[word_id]
        if word_id != current_word_id:
            # First token of a new word keeps the word's own tag.
            current_word_id = word_id
        elif tag.startswith('B-'):
            # Continuation token of a word that begins an entity: it is inside
            # the entity, so use I-XXX. Tags without an I- counterpart in the
            # vocabulary keep their B- tag so the label stays valid.
            inside_tag = 'I-' + tag[2:]
            if inside_tag in tag2id:
                tag = inside_tag

        if tag not in tag2id:
            # Fail loudly here instead of emitting None as a label.
            raise KeyError(f'unknown tag {tag!r} for word {word_id}')
        new_labels.append(tag2id[tag])

    return new_labels

In [None]:
# Align the example's word-level labels with its tokens; both lengths should now be equal
labels = align_labels_with_tokens(input_token_ids.word_ids(), input_labels)

print(f'length of token id: {len(input_token_ids.tokens())}')
print(f'length of labels: {len(labels)}')

In [None]:
# Show the aligned label ids followed by the tokens they belong to
print('=== 對應的標籤 ===')
pprint(labels[:max_display], compact=True)
# Da-Fu Anderson, 814 Keswick Boulevard (presumably the entities in this example - TODO confirm)
pprint(input_token_ids.tokens()[:max_display], compact=True)
# Compare with the original data
print()
print('=== 原始資料 ===')
show_nth_data(dataset[data_cat], data_nth, max_display)
print('=== 標籤定義 ===')
pprint(sorted(config.id2tag.items(), key=lambda x: x[0]), compact=True)

## 批次重新 Tokenizer 與標籤

要預處理整個數據集，我們需要對所有輸入進行分詞，並對所有標籤應用 `align_labels_with_tokens()`。為了利用快速分詞器的速度，最好一次分詞大量文本，因此我們將編寫一個批次處理函數，並使用 Dataset.map() 方法，選項設置為 batched=True。

與之前的示例不同的是，當分詞器的輸入是文本列表（或在我們的情況下，是單詞列表的列表）時，word_ids() 函數需要獲取我們想要單詞 ID 的示例索引，因此我們也添加了這一點：

In [27]:
def tokenize_and_align_labels(dataset):
    """Tokenize a batch of pre-split examples and attach token-level labels.

    Args:
        dataset: A batch (as passed by `Dataset.map(..., batched=True)`) with a
            'words' column (list of word lists) and a 'labels' column (list of
            word-level tag lists).

    Returns:
        The tokenizer output for the batch with an added "labels" entry that
        holds, per example, the result of `align_labels_with_tokens`.
    """
    encoded_batch = tokenizer(
        dataset['words'], is_split_into_words=True, truncation=True
    )
    # word_ids() needs the index of the example inside the batch.
    encoded_batch["labels"] = [
        align_labels_with_tokens(encoded_batch.word_ids(row), word_labels)
        for row, word_labels in enumerate(dataset['labels'])
    ]
    return encoded_batch

In [None]:
# Tokenize every split in batches; the original columns are removed so only the tokenizer output and labels remain
tokenized_dataset = dataset.map(
    tokenize_and_align_labels,
    batched=True,
    remove_columns=dataset['train'].column_names,
)

In [None]:
# Show the first `first_n_data` rows
pd.set_option('display.max_colwidth', None)
pd.DataFrame(tokenized_dataset[data_cat].select(range(first_n_data)))

# token_type_ids: usually used for multi-sentence inputs; we only have a single sentence, so they are all 0
# attention_mask: 1 marks a real token, 0 marks a padding token; BERT only attends to tokens marked 1

## 數據整理

我們不能僅使用 `DataCollatorWithPadding`，因為它只填充輸入（input IDs, attention mask, and token type IDs）。在這裡，我們的標籤應該以與輸入完全相同的方式進行填充，以保持相同的大小，使用 -100 作為值，以便在損失計算中忽略相應的預測。

這一切都由 `DataCollatorForTokenClassification` 完成。與 `DataCollatorWithPadding` 一樣，它需要使用預處理輸入的分詞器：

In [30]:
# Collator that pads the inputs and the labels of a batch, built on the same tokenizer used for preprocessing
data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)

In [None]:
# Show the output of DataCollatorForTokenClassification; padded label positions are marked with -100
batch = data_collator([tokenized_dataset[data_cat][i] for i in range(first_n_data)])
pprint(batch["labels"])

## 模型評估函數

在訓練過程中包含度量標準通常有助於評估模型的性能。您可以使用 Evaluate 庫快速加載評估方法。對於這個任務，請加載 [seqeval](https://huggingface.co/docs/evaluate/a_quick_tour) 框架。Seqeval 實際上會生成多個分數：precision, recall, F1, 和 accuracy。

* Precision: 精確率，是指所有被標記為正的樣本中實際為正的比例。

$\ Precision = \frac{\text{correctly classified actual positives}}{\text{everything classified as positives}} = \frac{TP}{TP + FP} $

* Recall: 召回率，是指所有實際為正的樣本中被標記為正的比例。

$\ Recall = \frac{\text{correctly classified actual positives}}{\text{all actual positives}} = \frac{TP}{TP + FN} $

* F1: F1 值是精確率和召回率的調和平均值，用於綜合考慮精確率和召回率。

$\ F1 = 2 \times \frac{Precision \times Recall}{Precision + Recall} $

* Accuracy: 準確率，是指所有被正確分類的樣本數量與總樣本數量之比。

$\ Accuracy = \frac{\text{correct classifications}}{\text{total classifications}} = \frac{TP + TN}{TP + TN + FP + FN} $


In [32]:
# Load the seqeval metric once; compute_metrics reuses it on every evaluation.
seqeval = evaluate.load("seqeval")

def compute_metrics(eval_preds):
    """Turn raw evaluation output into overall precision, recall, F1 and accuracy.

    Args:
        eval_preds: A (logits, labels) pair. Positions whose label is -100 are
            left out of the comparison.

    Returns:
        A dict with the keys "precision", "recall", "f1" and "accuracy", taken
        from the corresponding "overall_*" entries of the seqeval result.
    """
    logits, labels = eval_preds

    # The predicted class of each position is the index of its largest logit.
    predictions = np.argmax(logits, axis=-1)

    true_predictions = []
    true_labels = []
    for predicted_row, label_row in zip(predictions, labels):
        kept_predictions = []
        kept_labels = []
        for predicted_id, label_id in zip(predicted_row, label_row):
            if label_id == -100:
                # Ignored position (special token or padding).
                continue
            kept_predictions.append(label_names[predicted_id])
            kept_labels.append(label_names[label_id])
        true_predictions.append(kept_predictions)
        true_labels.append(kept_labels)

    results = seqeval.compute(predictions=true_predictions, references=true_labels)

    return {
        metric: results[f"overall_{metric}"]
        for metric in ("precision", "recall", "f1", "accuracy")
    }

## 訓練模型

您現在可以開始訓練您的模型了！使用 AutoModelForTokenClassification 加載預訓練的模型，並指定預期標籤的數量和標籤映射：

In [None]:
# Seed the Python, NumPy and PyTorch random generators before loading the model: when the
# checkpoint's classification head does not match num_labels, a new head is initialised
# randomly, so without a fixed seed every run would start from different weights.
set_seed(config.seed)
model = AutoModelForTokenClassification.from_pretrained(
  config.model_name,
  num_labels=config.num_tags,
  ignore_mismatched_sizes=True, # tolerate a head whose size differs from the checkpoint's
  id2label=config.id2tag,
  label2id=config.tag2id,
  )

In [None]:
# Training hyper-parameters, all taken from `config`.
training_args = TrainingArguments(
  output_dir='sample_data/train_output',
  learning_rate=config.lr,
  per_device_train_batch_size=config.train_batch_size,
  per_device_eval_batch_size=config.eval_batch_size,
  num_train_epochs=config.epochs,
  weight_decay=config.weight_decay,
  seed=config.seed, # use the configured seed instead of relying on the library default
  eval_strategy='epoch', # evaluate once per epoch
  save_strategy='epoch', # save a checkpoint once per epoch
  load_best_model_at_end=True,
)

# Train on the tokenized train split and evaluate on the tokenized test split.
trainer = Trainer(
  model=model,
  args=training_args,
  train_dataset=tokenized_dataset['train'],
  eval_dataset=tokenized_dataset['test'],
  data_collator=data_collator,
  tokenizer=tokenizer,
  compute_metrics=compute_metrics,
)

In [None]:
# Start training; this may take a while
trainer.train()

In [36]:
# Save the model to config.saved_model_path
trainer.save_model(config.saved_model_path)

In [None]:
# Load the saved model from config.saved_model_path into a token-classification pipeline
classifier = pipeline(
  task="token-classification",
  model=config.saved_model_path,
  device=device,)

# Show the prediction results
print(f'輸入: {test_text}')
show_prediction(test_text, classifier)