# Login & install

In [None]:
from huggingface_hub import notebook_login

notebook_login()


In [None]:
!pip install transformers
!pip install datasets
!pip install seqeval
!pip install transformers[torch]
!pip install accelerate -U
!pip install evaluate

# download dataset & model

In [None]:
import os
import numpy as np
import pandas as pd
from collections import defaultdict

from transformers import AutoTokenizer, AutoModelForTokenClassification, TrainingArguments, Trainer
from transformers import DataCollatorForTokenClassification
from transformers import EarlyStoppingCallback
from datasets import load_dataset
import evaluate

## 下載資料集並切分

In [None]:
#%% 讀入 hugging face dataset
data = load_dataset("jamie613/custom_NER", data_files = 'train_data_sample.json')
#print(data)

# 切分為 train & test
data = data['train'].train_test_split(test_size = 0.1)
train = data['train']
test = data['test']
print(train)
print(test)

## 計算標籤個數

In [None]:
train_dict = defaultdict(int)
test_dict = defaultdict(int)

# 只計算 B 的標籤
for label in train['labels']:
  for l in label:
    if l.startswith('B'):
      train_dict[l] += 1

for label in test['labels']:
  for l in label:
    if l.startswith('B'):
      test_dict[l] += 1

df = pd.DataFrame([train_dict, test_dict], index=['train', 'test'])
print(df)

## 製作 label2id & id2label

In [None]:
#%% labels 轉 ner_tags
def label_2_id(example):
  labels = example['labels']
  return {'ner_tags' : [label2id[label] for label in labels]}

label2id = {'O': 0, 'B-PER': 1, 'I-PER': 2, 'B-ORG': 3, 'I-ORG': 4, 'B-LOC': 5, 'I-LOC': 6, 'B-MISC': 7, 'I-MISC': 8,
            'B-PERF' : 9, 'I-PERF' : 10, 'B-COMP' : 11, 'I-COMP' : 12, 'B-INST' : 13, 'I-INST' : 14,
            'B-OTH' : 15, 'I-OTH' : 16, 'B-MUSIC' : 17, 'I-MUSIC' : 18, 'B-OTHP' : 19, 'I-OTHP' : 20}
id2label = {v: k for k, v in label2id.items()}

train = train.map(label_2_id)
test = test.map(label_2_id)
#print(train)
#print(train[0])

## 下載預訓練模型 & 分詞器；對齊分詞後的 text 和 ner_tags


In [None]:
#%% 對齊分詞後的 text 和 ner_tags
# tokenizer 會把 word 拆成 subword
# 所以需要對齊
model_name = "bert-base-multilingual-cased"
tokenizer = AutoTokenizer.from_pretrained(model_name)

def align_toeknized_text_and_tags(tokenized_inputs, target_text, original_tags):
    align_tags = []
    text_idx = 0
    token_idx = 0

    # 到 [SEP] 前，不然 target_text 會超過
    while token_idx < (len(tokenized_inputs) - 1):
        token = tokenized_inputs[token_idx]

        # 句子頭
        if token == '[CLS]':
            align_tags.append(-100)
            token_idx += 1
            continue

        # [SEP]、[PAD] 直接全部標 -100
        if token == '[SEP]':
          align_tags = align_tags[:512] + [-100] *(512 - len(align_tags) - 1)
          token_idx = len(tokenized_inputs)
          continue

        # 這時候再抓 target，不然進到 [SEP] 和 [PAD] 後會報錯
        target = target_text[text_idx]

        # 比對 token 與 text
        #print('current token_idx: ', token_idx)
        #print('token: ', token)
        #print('current text_idx: ', text_idx)
        #print('target text: ', target)
        #print()

        # 若 token 與目前的 text 完全相符，則直接將 text 對應的 tag 貼上
        # 若 token 為 [UNK] 則直接把 text 對應的 tag 貼上
        if token == target or token == '[UNK]':
            align_tags.append(original_tags[text_idx])
            token_idx += 1

        # 有 subword 或是標點符號被分離的狀況
        else:
            # 本字的標籤 (O or B-)
            subword_token = original_tags[text_idx]

            # target 中所有的字母都對上後， text_idx 才 +1
            while target:
                #print('init target', target)
                # 只跳 token_idx 所以要再讀一次 token
                token = tokenized_inputs[token_idx]
                #print('token: ', token)
                # 移除 subword 前的 ##
                token = token.replace('##', '')
                # 移除以比對上的 subword
                target = target.replace(token, '', 1)
                #print('update target: ', target)

                # 標籤
                if subword_token == 0:
                    align_tags.append(0)

                elif subword_token % 2 != 0:
                    align_tags.append(subword_token)
                    subword_token += 1
                else:
                    align_tags.append(subword_token)

                token_idx += 1
        text_idx += 1

    # 補上 [SEP]
    align_tags.append(-100)

    return align_tags

def tokenize_text(examples):
    tokenized_inputs = tokenizer(examples['text'], padding = 'max_length') # 會變成 input_ids, token_type_ids, attention_mask
    #print(tokenized_inputs)
    #print(tokenized_inputs.tokens())

    target_text = examples['text'].split() # 1. 不會拆成兩個，所以對照時會對不上
    original_tags = examples['ner_tags']

    if len(target_text) > 512:
        target_text = target_text[:513]
        original_tags = original_tags[:513]

    align_tags = align_toeknized_text_and_tags(tokenized_inputs.tokens(), target_text, original_tags)

    tokenized_inputs['labels'] = align_tags
    return tokenized_inputs

tokenized_train = train.map(tokenize_text) # 此時 text 和 ner_tags 不等長
tokenized_train = tokenized_train.remove_columns(['ner_tags', 'text'])
tokenized_test = test.map(tokenize_text)
#print(tokenized_train)
#print(tokenized_test)

# 模型訓練

## 評估指標

In [None]:
#%% metric 設定
metric = evaluate.load('seqeval')

def compute_metrics(eval_preds):
    logits, labels = eval_preds
    predictions = np.argmax(logits, axis = -1)

    # 移除 special tokens
    predictions = [[p for (p, l) in zip(prediction, label) if l != -100]
                   for prediction, label in zip(predictions, labels)]
    labels = [[l for l in  label if l != -100] for label in labels]
    labels = [[id2label[l] for l in label] for label in labels]
    predictions = [[id2label[p] for p in prediction] for prediction in predictions]

    all_metrics = metric.compute(references= labels, predictions= predictions,
                                 mode = 'strict', scheme = 'IOB2', zero_division = 0)
    #print(all_metrics)

    return {
        'PERF_p' : all_metrics['PERF']['precision'],  # 判斷為 PERF 的，有多少真的是 PERF
        'PERF_r' : all_metrics['PERF']['recall'], # 全部的 PERF，有多少正確被判斷
        'INST_p' : all_metrics['INST']['precision'],
        'INST_r' : all_metrics['INST']['recall'],
        'COMP_p' : all_metrics['COMP']['precision'],
        'COMP_r' : all_metrics['COMP']['recall'],
        'precision' : all_metrics['overall_precision'],
        'recall' : all_metrics['overall_recall'],
        'f1' : all_metrics['overall_f1'],
        'accuracy' : all_metrics['overall_accuracy']
        }


## 訓練設定

In [None]:
#%% training
commit_message = 'dataset=155, epochs=50, batch_size=1, early_stopping=eval_f1'
output_name = 'custom_BERT_NER'
num_epochs = 50
batch_size = 1
logging_steps = len(tokenized_train) // batch_size

data_collator = DataCollatorForTokenClassification(tokenizer = tokenizer)

model = AutoModelForTokenClassification.from_pretrained(
    model_name,
    num_labels = len(id2label),
    id2label = id2label,
    label2id = label2id,
    ignore_mismatched_sizes=True  # set to True to use custom labels
    )

args = TrainingArguments(
    output_dir = output_name,
    per_device_train_batch_size = batch_size,
    per_device_eval_batch_size = batch_size,
    evaluation_strategy = 'epoch',
    save_strategy = 'epoch',
    save_total_limit = 2,
    learning_rate = 2e-5,
    num_train_epochs = num_epochs,
    weight_decay = 0.01,
    logging_steps = logging_steps,
    metric_for_best_model = 'eval_f1',
    greater_is_better = True,
    load_best_model_at_end = True,
    push_to_hub = True
    #push_to_hub = False
    )

trainer = Trainer(
    model = model,
    args = args,
    train_dataset = tokenized_train,
    eval_dataset = tokenized_test,
    data_collator = data_collator,
    compute_metrics = compute_metrics,
    tokenizer = tokenizer,
    callbacks = [EarlyStoppingCallback(early_stopping_patience = 3)]
    )

trainer.train()
trainer.push_to_hub(commit_message = commit_message)


# 預測

In [None]:
# 預測
from transformers import pipeline
import torch
import pandas as pd
import re
import unicodedata

def cleaning_text(text):
    return re.sub(r'[\u3000\xa0\n\t]', ' ', text)

full_width = str.maketrans('''
                           ＡＢＣＤＥＦＧＨＩＪＫＬＭＮＯＰＱＲＳＴＵＶＷＸＹＺ
                           ａｂｃｄｅｆｇｈｉｊｋｌｍｎｏｐｑｒｓｔｕｖｗｘｙｚ
                           １２３４５６７８９０
                           ''',
                           '''
                           ABCEDFGHIJKLMNOPQRSTUVWXYZ
                           abcdefghijklmnopqrstuvwxyz
                           1234567890
                           ''')

## 預測文字

In [None]:
text_1 ='''
"nicht Bach, sondern Meer."
「他的名字不應該是小溪，應該是大海。」
貝多芬曾如此感嘆。巴赫的作品，精巧複雜，為歷代音樂人必須下苦功探索的功課。鋼琴博士林聖縈每隔幾年就會演出郭德堡變奏曲，作為對人生階段的回顧。上回演出為2018年，之後世界經歷翻天覆地的變化，因此2023年獨奏會，將再度演出郭德堡變奏曲全曲，檢視五年來的轉變，搭配第五號法國組曲與D小調夏康舞曲，自精巧架構的作品中，流瀉繽紛多彩的樂音。
'''

text_2 = '''
位在伊比利半島上的西班牙，擁有獨特的文化傳承，吸引著世界各地的人們。然而在長途旅行不甚便利的19、20世紀交際，作曲家僅能依靠小說、遊記、書信，甚至其他人的藝術作品，來認識西班牙風情，進而譜寫以西班牙為主題的「伊比利風味」作品。男低音羅俊穎挑選法、俄作曲家隔空描繪西班牙風景、舞蹈和人們的作品，對照西班牙作曲家法雅以西班牙傳統歌謠寫成、原汁原味的《七首西班牙民歌》，想像與真實各有獨特魅力！
男低音 / 羅俊穎
鋼　琴 / 翁重華
2024年1月19日（五）晚 於臺中國家歌劇小劇場 500
2023年12月21日（四）晚 於國家兩廳院演奏廳 400 / 600 / 800
購票請洽：https://bit.ly/Lo_Spain 02-3393-9888及FamilyMart全家便利商店、7-ELEVEN超商。
兩廳院會員9折；臺中國家歌劇院會員9折、忘我會員75折；學生8折；身障及必要陪同者一名、65歲以上長者5折。
'''

text_3 = '''
演出曲目：
Georges Bizet 比才
　1. Le Matin 早晨
　2. Pastorale 田園
　3. Serenade 夜曲
Jules Massenet 馬斯奈
"Nuit d'Espagne" 西班牙之夜
Maurice Ravel 拉威爾
Pièce en forme de habanera 哈巴奈拉風的練習曲
Maurice Ravel 拉威爾
"Don Quichotte à Dulcinée" 唐吉軻德致杜辛妮
　1. Chanson romanesque 浪漫曲
　2. Chanson épique 敘事詩
　3. Chanson à boire 飲酒歌
Mikhail Ivanovich Glinka 葛令卡
　1. Ночной зефир 夜的微風
　2. Я здесь, Инезилья 我在這兒！伊內齊拉
　3. Болеро 波麗路舞曲
Pyotr Ilyich Tchaikovsky 柴可夫斯基
"Серенада Дон Жуана" 唐璜小夜曲
Dmitri Shostakovich 蕭士塔高維契
"Испанские песни, op. 100" 西班牙歌曲，作品100
　1. Прощай, Гренада ! 再會！格拉納達
　2. Звёздочки 小星星
　3. Первая встреча 初相見
　4. Ронда 隆達
　5. Черноокая 黑眼睛的姑娘
　6. Сон ( Баркарола) 夢（船歌）
Manuel de Falla 法雅
"Siete canciones populares Españolas" 七首西班牙民歌
　1. El paño moruno 摩爾人的布
　2. Seguidilla murciana 莫夕亞地區的賽圭第亞舞曲
　3. Asturiana 阿斯圖里亞那民謠
　4. Jota 荷他舞曲
　5. Nana 搖籃曲
　6. Canción 歌謠
　7. Polo 舞曲

主辦單位：Legato樂聚
贊助單位：國藝會
'''

text_4 = '''
女高音林小美受教於王大明、陳白白，林小美曾與吉他孫欣欣一起灌錄威爾第的作品「四季」，專輯獲金曲獎肯定。
也曾在伯恩斯坦的指揮下，演出威爾第鋼琴協奏曲。
本次將演出比才歌劇《卡門》中的〈飲酒歌〉。卡門強烈的性格，讓這齣歌劇頗受觀眾喜愛。
'''

## 預測結果

In [None]:
#text = text_1 # 郭德堡
#text = text_2 # 伊比利
#text = text_3 # 伊比利曲目
text = text_4

# 文字整理
text = cleaning_text(text)
# 把全形數字、英文字母轉為半形
text = text.translate(full_width)

# 訓練後直接預測
# 沒有重新讀入模型和分詞器
inputs = tokenizer(text, return_tensors = 'pt')
inputs = inputs.to('cuda') # 如果不是從 huggingface 下載模型

# 如果要從 huggingface 下載模型
#model_name = 'jamie613/custom_BERT_NER'
#tokenizer = AutoTokenizer.from_pretrained(model_name)
#nlp = pipeline('ner', model = model_name, tokenizer = tokenizer, grouped_entities = True, device = 0)

with torch.no_grad():
    logits = model(**inputs).logits
predictions = torch.argmax(logits, axis = -1).tolist() # 包含頭尾
#print([[id2label[p] for p in predict] for predict in predictions])

tokens = inputs['input_ids'].tolist()[0]
for i in range(0, len(tokens), 10):
  substring = []
  for j in range(10):
    substring.append(tokenizer.decode(tokens[min((i + j), len(tokens) - 1)]))
  sub_pred = [id2label[p] for p in predictions[0][i : i + 10]]
  df = pd.DataFrame([substring, sub_pred])
  print(df.to_markdown())
  print()


In [None]:
# 用 pipeline 預測
nlp = pipeline('ner', model = model, tokenizer = tokenizer, grouped_entities = True, device = 0)
#print(nlp(text))

res = nlp(text)
#print(res)

ner = tokenizer.decode(tokenizer(text)['input_ids']).split()
# 去除頭尾的 [CLS] 和 [SEP]
# 原本句子中的空格會消失
ner = ner[1 : -1]
ner_tag = ['O'] * len(ner)
#print(ner_tag)

# 比對目前 entity_group 中的 word 是 tokenizer.decode()
text_idx = 0
ner_idx = 0

ner_length = len(ner)

for i in range(len(res)):
    left = res[i]['start']
    right = res[i]['end']

    while text_idx < right:
        #print('current ner:', ner[ner_idx], ner_idx)
        #print('current text: ', text[text_idx : text_idx + 3], text_idx)
        while text[text_idx] == ' ':
            text_idx += 1
        if text_idx == left:
            ner_tag[ner_idx] = 'B-' + res[i]['entity_group']
            #print(text[text_idx], 'tag: ', ner_tag[ner_idx])
        if text_idx > left and text_idx < right:
            ner_tag[ner_idx] = 'I-' + res[i]['entity_group']
            #print(text[text_idx], 'tag: ', ner_tag[ner_idx])
        text_idx += len(ner[ner_idx])
        ner_idx += 1
        #print()

df = pd.DataFrame([ner, ner_tag])

for i in range(0, len(ner_tag), 10):
    print(df.iloc[:, np.r_[i : min(i+10, len(ner_tag))]].to_markdown())
    print()

# Confusion Matrix

In [None]:
from transformers import pipeline
import torch
import pandas as pd

# 從 huggingface 下載模型
cm_model_name = 'jamie613/custom_BERT_NER'
tokenizer = AutoTokenizer.from_pretrained(cm_model_name)
model = AutoModelForTokenClassification.from_pretrained(cm_model_name)
nlp = pipeline("ner", model=model, tokenizer=tokenizer, grouped_entities = True)

label2id = {'O': 0, 'B-PER': 1, 'I-PER': 2, 'B-ORG': 3, 'I-ORG': 4, 'B-LOC': 5, 'I-LOC': 6, 'B-MISC': 7, 'I-MISC': 8,
            'B-PERF' : 9, 'I-PERF' : 10, 'B-COMP' : 11, 'I-COMP' : 12, 'B-INST' : 13, 'I-INST' : 14,
            'B-OTH' : 15, 'I-OTH' : 16, 'B-MUSIC' : 17, 'I-MUSIC' : 18, 'B-OTHP' : 19, 'I-OTHP' : 20}
id2label = {v: k for k, v in label2id.items()}

In [None]:
def test_predictions(text):
  inputs = tokenizer(text, return_tensors = 'pt')
  with torch.no_grad():
      logits = model(**inputs).logits
  predictions = torch.argmax(logits, axis = -1).tolist() # 包含頭尾
  pred_tags = [id2label[p] for p in predictions[0]][1 : -1]
  # 將分詞後的 input_ids 取出，將 tesnor 轉為 list，刪除 [CLS]、[SEP]
  inputs = inputs['input_ids'].tolist()[0][1:-1]
  # input_ids 轉回 tokens；subword 前有 ##
  inputs = tokenizer.convert_ids_to_tokens(inputs)

  # 還原文句、對齊標籤
  text = []
  ner = []
  for i in range(len(pred_tags)):
      if not inputs[i].startswith('##'):
          text.append(inputs[i])
          ner.append(pred_tags[i])
      # subword 刪掉前面的 ## 後，併到前面的字
      else:
          text[-1] += inputs[i].replace('##', '')

  return ner

In [None]:
actual_labels = []
pred_labels = []
for i in range(len(test)):
  pred_labels.extend(test_predictions(test[i]['text']))
  actual_labels.extend(test[i]['labels'])


In [None]:
from sklearn import metrics

cm = metrics.confusion_matrix(actual_labels, pred_labels,
                              labels = ['B-PERF', 'I-PERF',
                                        'B-COMP', 'I-COMP',
                                        'B-INST', 'I-INST',
                                        'B-MUSIC', 'I-MUSIC',
                                        'B-PER', 'I-PER',
                                        'B-OTH', 'I-OTH',
                                        'B-OTHP', 'I-OTHP',
                                        'B-ORG', 'I-ORG',
                                        'B-LOC', 'I-LOC',
                                        'B-MISC', 'I-MISC',
                                        'O'])

cm = pd.DataFrame(cm, index = ['B-PERF', 'I-PERF',
                               'B-COMP', 'I-COMP',
                               'B-INST', 'I-INST',
                               'B-MUSIC', 'I-MUSIC',
                               'B-PER', 'I-PER',
                               'B-OTH', 'I-OTH',
                               'B-OTHP', 'I-OTHP',
                               'B-ORG', 'I-ORG',
                               'B-LOC', 'I-LOC',
                               'B-MISC', 'I-MISC',
                               'O'],
                  columns = ['B-PERF', 'I-PERF',
                             'B-COMP', 'I-COMP',
                             'B-INST', 'I-INST',
                             'B-MUSIC', 'I-MUSIC',
                             'B-PER', 'I-PER',
                             'B-OTH', 'I-OTH',
                             'B-OTHP', 'I-OTHP',
                             'B-ORG', 'I-ORG',
                             'B-LOC', 'I-LOC',
                             'B-MISC', 'I-MISC',
                             'O'])

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# 刪除 'O' 的行、列
cm_sub = cm.iloc[:-1, :-1]

#製作遮罩
mask_df = pd.DataFrame(np.diag(np.full(cm_sub.shape[0], True)))

for i in range(len(mask_df)):
  for j in range(len(mask_df)):
    if cm_sub.iloc[i][j] == 0:
      mask_df.iloc[i][j] = True

ax = sns.heatmap(cm_sub, cmap = 'Blues', vmin = 0,
                 linewidth = 0.5, linecolor = 'gray',
                 annot = True, mask = mask_df.to_numpy())
ax.plot(ax.get_xlim(), ax.get_xlim(), linestyle = '-.')
plt.show()

In [None]:
test_df = test.to_pandas()
test_df.to_csv('test_df.csv', encoding = 'utf-8-sig', index = False)