## ライブラリをインポート

In [None]:
# Google Colab で実行する場合は以下のライブラリをインストール
# !pip install fugashi ipadic

In [None]:
import os
import re
import unicodedata
import itertools
import pandas as pd
import random
from tqdm import tqdm
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import BertJapaneseTokenizer, BertForTokenClassification

## パラメータの設定

In [None]:
SEED_VALUE = 42
MODEL_NAME = 'tohoku-nlp/bert-base-japanese-whole-word-masking'
MAX_LENGTH = 128
TRAIN_BATCH_SIZE = 16
TEST_BATCH_SIZE = 256
MAX_EPOCH = 5
LEARNING_RATE = 2e-5

In [None]:
def set_seed(seed=SEED_VALUE):
    random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

# シード値の固定
set_seed()

## データセットのダウンロード

In [None]:
# データのダウンロード
if not os.path.exists('ner-wikipedia-dataset'):
    !git clone --branch v2.0 https://github.com/stockmarkteam/ner-wikipedia-dataset

# データのロード
df = pd.read_json('ner-wikipedia-dataset/ner.json')

## 前処理

In [None]:
# 固有表現タイプの辞書
id_dict = {'人名': 1,
           '法人名': 2,
           '政治的組織名': 3,
           'その他の組織名': 4,
           '地名': 5,
           '施設名': 6,
           '製品名': 7,
           'イベント名': 8}

# idからtypeを取得する関数
def get_type_from_id(id):
    keys = [key for key, value in id_dict.items() if value == id]
    if keys:
        return keys[0]
    return None

In [None]:
def preprocess(data):
  # ｱｲｳ → アイウ, ＡＢＣ → ABC, １２３ → 123
  data['text'] = unicodedata.normalize('NFKC', data['text'])

  # typeを対応するtype_idに変換
  for entity in data['entities']:
    entity['type_id'] = id_dict[entity['type']]

  return data

df = df.apply(preprocess, axis=1)
df.head()

In [None]:
# データセットの分割 (学習:検証:テスト = 6:2:2)
train_df, test_df = train_test_split(df, test_size=0.2, random_state=42, )
train_df, valid_df = train_test_split(train_df, test_size=0.25, random_state=42)

# index降り直し
train_df.reset_index(drop=True, inplace=True)
valid_df.reset_index(drop=True, inplace=True)
test_df.reset_index(drop=True, inplace=True)

# サイズ表示
print(f'学習データ数   : {len(train_df)}')
print(f'検証データ数   : {len(valid_df)}')
print(f'テストデータ数 : {len(test_df)}')

## トークナイザの準備

In [None]:
'''固有表現抽出に適応したBertJapaneseTokenizerを拡張したトークナイザ
'''
class ExtensionTokenizer(BertJapaneseTokenizer):

    # 学習時に用いるラベル付きエンコーダ
    def tagged_encode_plus(self, text, entity_list, max_length):
        '''[Step1] 固有表現かそれ以外かで分割
        '''
        entity_list = sorted(entity_list, key=lambda x: x['span'][0]) # 固有表現の位置の昇順でソート

        data_splitted = [] # 分割後の文字列格納用
        head = 0           # 文字列の先頭のindex

        for entity in entity_list:
            # 次に出現する固有表現の先頭・末尾・IDを取得
            entity_head = entity['span'][0]
            entity_tail = entity['span'][1]
            label = entity['type_id']

            # 固有表現にID、固有表現以外に'0'をラベルとして付与
            data_splitted.append({'text': text[head:entity_head], 'label':0})
            data_splitted.append({'text': text[entity_head:entity_tail], 'label':label})

            head = entity_tail  # 先頭indexを更新

        # 最後の固有表現以降のtextに'0'をラベルとしてを付与
        data_splitted.append({'text': text[head:], 'label':0})

        # head = entity_startの時、{'text': '', 'label': 0}となってしまうため、textが空の要素を削除
        data_splitted = [ s for s in data_splitted if s['text'] ]

        '''[Step2] トークナイザを用い、分割された文字列をトークン化・ラベル付与
        '''
        tokens = []
        labels = []

        for s in data_splitted:
            text_splitted = s['text']
            label_splitted = s['label']

            tokens_splitted = self.tokenize(text_splitted)        # トークン化
            labels_splitted = [label_splitted] * len(tokens_splitted)  # 各トークンにラベル付与

            tokens.extend(tokens_splitted)  # トークンを結合
            labels.extend(labels_splitted)  # ラベルを結合

        '''[Step3] BERTに入力可能な形式に符号化
        '''
        encoding = self.encode_plus(tokens,
                                    max_length=max_length,
                                    padding='max_length',
                                    truncation=True,
                                    return_tensors='pt')

        # トークン[CLS]、[SEP]に'0'ラベルとして付与
        labels = [0] + labels[:max_length-2] + [0]
        # トークン[PAD]に'0'をラベルとして付与
        labels = labels + [0]*( max_length - len(labels) )

        encoding['input_ids'] = encoding['input_ids'][0]
        encoding['attention_mask'] = encoding['attention_mask'][0]
        encoding['token_type_ids'] = encoding['token_type_ids'][0]
        encoding['labels'] = torch.tensor([labels])[0]
        return encoding

    # テスト時に用いるencordingとspansを返すエンコーダ
    def untagged_encode_plus(self, text, max_length):
        '''[Step1] BERTに入力可能な形式に符号化
        '''
        encoding = self.encode_plus(text=text,
                                    max_length=max_length,
                                    padding='max_length',
                                    truncation=True,
                                    return_tensors = 'pt')

        encoding['input_ids'] = encoding['input_ids'][0]
        encoding['token_type_ids'] = encoding['token_type_ids'][0]
        encoding['attention_mask'] = encoding['attention_mask'][0]

        '''[Step2]各トークンのスパンを格納
        '''
        spans = []

        tokens = self.convert_ids_to_tokens(encoding['input_ids'])
        head = 0

        for token in tokens:
            # '##'は文字数にカウントしないので読み飛ばす
            token = token.replace('##','')

            # スペシャルトークンの場合はダミーとしてspanを[-1, -1]とする
            if token == '[PAD]':
                spans.append([-1, -1])
            elif token == '[UNK]':
                spans.append([-1, -1])
            elif token == '[CLS]':
                spans.append([-1, -1])
            elif token == '[SEP]':
                spans.append([-1, -1])

            # text中からtokenをを探索し，開始位置 + 文字列長をspanとする
            # トークンが見つかるまでスペースを読み飛ばす
            else:
                length = len(token)
                while 1:
                    if token == text[head:head+length]:
                        spans.append([head, head+length])
                        head += length
                        break

                    head += 1

        spans = torch.tensor(spans)
        return encoding, spans

In [None]:
# 拡張したトークナイザをロード
tokenizer = ExtensionTokenizer.from_pretrained(MODEL_NAME)

## データセット・データローダーの作成

In [None]:
class TrainDataset(Dataset):
    def __init__(self, texts, entity_lists, tokenizer, max_length):
        self.texts = texts
        self.entity_lists = entity_lists
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, index):
        text = self.texts[index]
        entity_list = self.entity_lists[index]

        encoding = self.tokenizer.tagged_encode_plus(text, entity_list, self.max_length)
        input_ids = encoding['input_ids']
        attention_mask = encoding['attention_mask']
        labels = encoding['labels']

        return text, input_ids, attention_mask, labels

In [None]:
class TestDataset(Dataset):
    def __init__(self, texts, tokenizer, max_length):
        self.texts = texts
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, index):
        text = self.texts[index]

        encoding, spans = self.tokenizer.untagged_encode_plus(text, self.max_length)
        input_ids = encoding['input_ids']
        attention_mask = encoding['attention_mask']

        return  text, input_ids, attention_mask, spans

In [None]:
# データセットの作成
train_dataset = TrainDataset(texts = train_df['text'], entity_lists = train_df['entities'], tokenizer = tokenizer, max_length = MAX_LENGTH)
valid_dataset = TrainDataset(texts = valid_df['text'], entity_lists = valid_df['entities'], tokenizer = tokenizer, max_length = MAX_LENGTH)
test_dataset = TestDataset(texts = test_df['text'], tokenizer = tokenizer, max_length = MAX_LENGTH)

# データローダの作成
train_dataloader = DataLoader(train_dataset, batch_size=TRAIN_BATCH_SIZE, shuffle=True, pin_memory=True)
valid_dataloader = DataLoader(valid_dataset, batch_size=TEST_BATCH_SIZE, shuffle=True, pin_memory=True)
test_dataloader = DataLoader(test_dataset, batch_size=TEST_BATCH_SIZE, shuffle=False, pin_memory=True)

## 事前学習モデル

###モデルの準備
BERTをロードし、概要を確認する

In [None]:
# 学習済みモデルをロード
model = BertForTokenClassification.from_pretrained(MODEL_NAME, num_labels=9)

print(f'\nmodelのパラメータを確認:\n{model.get_parameter}')

### 推論

In [None]:
'''文字列の符号化、BERTによる推論、BERTの出力をentitiesに変換する関数
'''
def predict(test_dataloader, model):
    # モデルをGPUまたはCPUに乗せる
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    model.to(device)

    # 予測
    model.eval()
    entity_lists = []
    with torch.inference_mode():
        for batch_text, batch_input_ids, batch_attention_mask, batch_spans in tqdm(test_dataloader):
            batch_input_ids = batch_input_ids.to(device)
            batch_attention_mask = batch_attention_mask.to(device)
            output = model(input_ids = batch_input_ids, attention_mask = batch_attention_mask)

            for logits, text, spans in zip(output.logits, batch_text, batch_spans):
                # 最も高い確率のクラスを予測ラベルとする
                labels = [logit.argmax(-1).cpu().item() for logit in logits]

                # スペシャルトークンを削除
                labels = [label for label, span in zip(labels, spans) if span[0] != -1]
                spans = [span for span in spans if span[0] != -1]

                # 同じラベルが連続するトークンをまとめる
                entity_list = []
                label_head = 0  # 連続するラベルの先頭
                for label, group in itertools.groupby(labels):
                    label_tail = label_head + len(list(group)) - 1 # 連続するラベルの末尾

                    # 予測固有表現をentitiesに格納
                    head = spans[label_head][0].item()
                    tail = spans[label_tail][1].item()
                    if label != 0:
                        entity = {'name': text[head:tail],
                                  'span': [head, tail],
                                  'type_id': label}

                        entity_list.append(entity)

                    label_head = label_tail + 1

                entity_lists.append(entity_list)

    return entity_lists

In [None]:
pred = predict(test_dataloader, model)

In [None]:
def convert_type_ids_to_text(entity_list):
    return [{'name': entity['name'], 'span': entity['span'], 'entity_type': get_type_from_id(entity['type_id'])} for entity in entity_list]

# 結果をランダムに確認
for i in range(5):
    index = random.randint(0, len(test_dataset) - 1)

    print(f'テキスト　　 : {test_df["text"][index]}')
    print(f'正解固有表現 : {convert_type_ids_to_text(test_df["entities"][index])}')
    print(f'予測固有表現 : {convert_type_ids_to_text(pred[index])}\n')

### 性能評価

In [None]:
'''適合率、再現率、F値を計算し、モデルを評価する関数
'''
def evaluate(dataset, entities_list, predicted_entities_list, type_id=None):
    entities_count = 0            # 正解固有表現の個数
    predicted_entities_count = 0  # 予測固有表現の個数
    correct_count = 0             # 予測固有表現うち正解の個数

    for entities, predicted_entities in zip(entities_list, predicted_entities_list):

        # 引数type_idが指定された場合、そのクラスの固有表現のみを抽出
        if type_id:
            entities = [ entity for entity in entities if entity['type_id'] == type_id ]
            predicted_entities = [ entity for entity in predicted_entities if entity['type_id'] == type_id ]

        # 重複固有表現をset型に変換
        get_span_type = lambda entity: (entity['span'][0], entity['span'][1], entity['type_id'])
        set_entities = set( get_span_type(entity) for entity in entities )
        set_entities_predicted = set( get_span_type(entity) for entity in predicted_entities )

        # 各個数を更新
        entities_count += len(entities)
        predicted_entities_count += len(predicted_entities)
        correct_count += len( set_entities & set_entities_predicted )

    precision = correct_count / predicted_entities_count    # 適合率
    recall = correct_count / entities_count                 # 再現率
    if(precision + recall != 0):
        f_value = 2 * precision*recall / (precision + recall) # F値
    else:
        f_value = -1

    result = {'正解の固有表現の数': entities_count,
              'AIが予測した固有表現の数': predicted_entities_count,
              '正解数': correct_count,
              '適合率': precision,
              '再現率': recall,
              'F1スコア': f_value}

    return result

In [None]:
evaluation_df = pd.DataFrame()

# 各クラスの予測性能を評価
for key, value in id_dict.items():
    evaluation = evaluate(test_df, test_df['entities'], pred, type_id=value)
    evaluation_df[key] = evaluation.values()  # 各列に評価結果を格納

# 全クラスの予測性能を評価
evaluation_all = evaluate(test_df, test_df['entities'], pred, type_id=None)
evaluation_df['ALL'] = evaluation_all.values()  #　全クラスの結果を末尾の列に格納

# 行名を設定
evaluation_df.index = evaluation_all.keys()

evaluation_df

## ファインチューニング

### 学習

In [None]:
'''モデルをファインチューニングする関数
'''
def train(model, train_dataloader, valid_dataloader, optimizer, max_epoch):

    # モデルをGPUまたはCPUに乗せる
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    model.to(device)

    print(f'使用デバイス：{device}')

    # ネットワークがある程度固定であれば、高速化させる
    torch.backends.cudnn.benchmark = True

    train_average_loss_list = []
    val_average_loss_list = []
    history = {}

    # epochのループ
    for epoch in range(max_epoch):
        print(f'\nepoch [{epoch+1}/{max_epoch}]')

        '''[Step1]学習
        '''
        model.train()
        sum_loss = 0.0

        # ミニバッチを取り出す
        for batch_text, batch_input_ids, batch_attention_mask, batch_labels in tqdm(train_dataloader):
            batch_input_ids = batch_input_ids.to(device)
            batch_attention_mask = batch_attention_mask.to(device)
            batch_labels = batch_labels.to(device)

            optimizer.zero_grad() # optimizerを初期化

            loss, logits = model(input_ids = batch_input_ids,
                                 token_type_ids = None,
                                 attention_mask = batch_attention_mask,
                                 labels = batch_labels,
                                 return_dict = False)

            loss.backward() # 逆伝搬
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0) # 勾配クリッピング
            optimizer.step()  # 最適化

            # 1エポックの損失の和を更新
            sum_loss += loss.item()

        # 1エポックの平均損失を記録
        average_loss = sum_loss / len(train_dataloader)
        train_average_loss_list.append(average_loss)

        '''[Step2]検証
        '''
        model.eval()
        sum_loss = 0.0

        # ミニバッチを取り出す
        with torch.inference_mode():
            for batch_text, batch_input_ids, batch_attention_mask, batch_labels in (valid_dataloader):
                batch_input_ids = batch_input_ids.to(device)
                batch_attention_mask = batch_attention_mask.to(device)
                batch_labels = batch_labels.to(device)

                loss, logits = model(input_ids = batch_input_ids,
                                    token_type_ids = None,
                                    attention_mask = batch_attention_mask,
                                    labels = batch_labels,
                                    return_dict = False)

                # 1エポックの損失の和を更新
                sum_loss += loss.item()

            # 1エポックの平均損失を記録
            average_loss = sum_loss / len(valid_dataloader)
            val_average_loss_list.append(average_loss)

        print(f'train_loss: {train_average_loss_list[epoch]:.4f}, val_loss: {val_average_loss_list[epoch]:.4f}')

    history['train_loss'] = train_average_loss_list
    history['val_loss'] = val_average_loss_list

    return model, history

In [None]:
# 最適化器としてAdamを使用
optimizer = torch.optim.Adam(params=model.parameters(), lr=LEARNING_RATE)

# ファインチューニング
finetuned_model, history = train(model = model,
                                 train_dataloader = train_dataloader,
                                 valid_dataloader = valid_dataloader,
                                 optimizer = optimizer,
                                 max_epoch=MAX_EPOCH)

In [None]:
# 学習曲線の表示
plt.figure(figsize=(4,3))
plt.plot(history['train_loss'],label='train', c='b')
plt.plot(history['val_loss'],label='val', c='r')
plt.title('loss')
plt.xticks(size=14)
plt.yticks(size=14)
plt.grid(lw=2)
plt.legend(fontsize=14)
plt.show()

### 推論

In [None]:
pred_by_finetuned_model = predict(test_dataloader, finetuned_model)

In [None]:
# 結果をランダムに確認
for i in range(5):
    index = random.randint(0, len(test_dataset) - 1)

    print(f'テキスト　　 : {test_df["text"][index]}')
    print(f'正解固有表現 : {convert_type_ids_to_text(test_df["entities"][index])}')
    print(f'予測固有表現 : {convert_type_ids_to_text(pred_by_finetuned_model[index])}\n')

### 性能評価

In [None]:
evaluation_df = pd.DataFrame()

# 各クラスの予測性能を評価
for key, value in id_dict.items():
    evaluation = evaluate(test_df, test_df['entities'], pred_by_finetuned_model, type_id=value)
    evaluation_df[key] = evaluation.values()  # 各列に評価結果を格納

# 全クラスの予測性能を評価
evaluation_all = evaluate(test_df, test_df['entities'], pred_by_finetuned_model, type_id=None)
evaluation_df['ALL'] = evaluation_all.values()  #　全クラスの結果を末尾の列に格納

# 行名を設定
evaluation_df.index = evaluation_all.keys()

evaluation_df

## 銀河鉄道の夜の固有表現抽出

In [None]:
# 青空文庫から小説をダウンロード
if not os.path.exists('456_ruby_145.zip'):
    !wget https://www.aozora.gr.jp/cards/000081/files/456_ruby_145.zip
    !unzip 456_ruby_145.zip

In [None]:
# ダウンロードしたtxtを確認
with open('gingatetsudono_yoru.txt', mode='r', encoding='shift_jis') as f:
    text = f.read()

print(text)

In [None]:
#前処理
# ヘッダとフッタの削除
text = re.split(r'\-{5,}',text)[2]
text = re.split(r'底本：', text)[0]
text = text.strip() # 連続する改行文字の削除

text = re.sub(r'《.+?》', '', text)     # ルビを削除
text =text.replace('｜', '')            # ルビの付を削除
text = re.sub(r'［＃.+?］', '', text)   # 入力者注を削除

text = unicodedata.normalize('NFKC', text)

print(text)

In [None]:
# データセットの作成
texts = text.split('\n')
novel_dataset = TestDataset(texts = texts, tokenizer = tokenizer, max_length = MAX_LENGTH)

# データローダの作成
novel_dataloader = DataLoader(novel_dataset, batch_size=TEST_BATCH_SIZE, shuffle=False, pin_memory=True)

In [None]:
pred_for_novel = predict(novel_dataloader, finetuned_model)

In [None]:
# 各文の固有表現を取り出し
entity_dict = {}
for entity_list in pred_for_novel:
    # 固有表現がない場合はスキップ
    if len(entity_list) == 0:
        continue

    # 固有表現を振り分け
    for entity in entity_list:
        entity_type = get_type_from_id(entity['type_id'])
        if entity_type not in entity_dict:
            entity_dict[entity_type] = set()
        entity_dict[entity_type].add(entity['name'])

for key, value in entity_dict.items():
    print(f'■{key}')
    print(sorted(value), '\n')