# 9章

- 本章で用いる「[日本語Wikipedia入力誤りデータセット](https://nlp.ist.i.kyoto-u.ac.jp/?%E6%97%A5%E6%9C%AC%E8%AA%9EWikipedia%E5%85%A5%E5%8A%9B%E8%AA%A4%E3%82%8A%E3%83%87%E3%83%BC%E3%82%BF%E3%82%BB%E3%83%83%E3%83%88)」は現在バージョン2が公開されていますが、本章では[バージョン1](https://nlp.ist.i.kyoto-u.ac.jp/?%E6%97%A5%E6%9C%AC%E8%AA%9EWikipedia%E5%85%A5%E5%8A%9B%E8%AA%A4%E3%82%8A%E3%83%87%E3%83%BC%E3%82%BF%E3%82%BB%E3%83%83%E3%83%88v1)を用いています。

本章では、文章校正タスクのうち漢字の誤変換タスクを扱う。BERTでは、文章校正をトークンの分類問題として扱うことで実装できる。<br>
正しいトークンについては同じトークンを、間違っていると考えられるトークンにはBERTの語彙の中から正しいと予測したトークンを返す。

|誤変換|正しい文章|
|:-|:-|
|優勝|優勝|
|トロフィー|トロフィー|
|を|を|
|変換|返還|
|し|し|
|た|た|
|。|。|

一方で、以下のようにこの方法では扱えない文章が存在する。

|誤変換|正しい文章|
|:-|:-|
|投|当初|
|##書|は|
|は|、|
|、|実行|
|実行|を|

本章では、上記のような文章を取り扱わないこととする。

In [5]:
# 9-3

import os
import random
import unicodedata
import pandas as pd
from tqdm import tqdm
import pprint

import torch
from torch.utils.data import DataLoader
from transformers import BertJapaneseTokenizer, BertForMaskedLM
import pytorch_lightning as pl

トークナイザを定義する。

In [20]:
# 9-4

class SC_tokenizer(BertJapaneseTokenizer):
    def encode_plus_tagged(
        self,
        wrong_text: str,
        correct_text: str,
        max_length: int = 128,
    ) -> dict:
        """
        ファインチューニング用
        誤変換を含む文章と正しい文章を渡し符号化し、
        誤変換文章のlabelsを正しい文章のinput_idsに置き換える
        """
        # 文章から直接符号化
        # tokenizer = BertJapaneseTokenizer.from_pretrained(model_name)で
        # インスタンス化したtokinzer(text)関数と同等
        wrong_encoding = self(
            wrong_text, max_length=max_length, padding='max_length', truncation=True
        )
        correct_encoding = self(
            correct_text, max_length=max_length, padding='max_length', truncation=True
        )
        # 正しい文章の符号をラベルとする
        wrong_encoding['labels'] = correct_encoding['input_ids']

        return wrong_encoding
    
    def encode_plus_untagged(
        self,
        text: str,
        max_length: int = None,
        return_tensors: str = 'pt',
    ) -> (dict, list):
        """
        文章をトークン化し、それぞれのトークンと文章中の文字列を対応付ける
        推論時にトークンごとのラベルを予測し、最終的に固有表現に変換する
        未知語や文章中の空白(MeCabにより消去される)に対しての処理が必要となる
        そのため、各トークンが元の文章のどの位置にあったかを特定しておく
        """
        words = self.word_tokenizer.tokenize(text) # MeCabで単語に分割
        tokens = []
        tokens_original = []
        # 単語をサブワードに分割してlistに格納
        for word in words:
            tokens_subword = self.subword_tokenizer.tokenize(word)
            tokens.extend(tokens_subword)
            # 未知語対応
            if tokens_subword[0] == '[UNK]':
                tokens_original.append(word)
            else:
                tokens_original.extend([token.replace("##", "") for token in tokens_subword])
        
        # トークンが文章中のどの位置にあるかを走査する
        position = 0
        spans = []
        for token in tokens_original:
            length = len(token)
            while True:
                if token != text[position: position + length]:
                    position += 1
                else:
                    spans.append([position, position + length])
                    position += length
                    break
        
        # トークンをID化する
        input_ids = self.convert_tokens_to_ids(tokens)
        # トークンIDを符号化する
        encoding = self.prepare_for_model(
            input_ids, 
            max_length=max_length,
            padding = 'max_length' if max_length else False,
            truncation = True if max_length else False,
        )
        sequence_length = len(encoding['input_ids']) # 符号化した文章の長さ
        # 先頭トークン[CLS]用のspanを追加する
        # このとき、次の[SEP]トークンを一緒に削除しておく
        spans = [[-1, -1]] + spans[:sequence_length - 2]
        # 末尾トークン[SEP]、末尾の空トークン[PAD]用のspanを追加する
        spans = spans + [[-1, -1]] * (sequence_length - len(spans))

        # 引数に応じてtorch.Tensor型に返還
        if return_tensors == 'pt':
            encoding = {key: torch.tensor(value) for key, value in encoding.items()}
        
        return (encoding, spans)
    
    def convert_output_to_text(
        self, 
        text: str,
        labels_arg: list,
        spans_arg: list,
    ) -> str:
        """
        文章、各トークンのラベルの予測値、文章中での位置から
        予測された文章に変換する
        """
        # 文章の長さチェック
        assert len(labels_arg) == len(spans_arg)

        # 特殊トークンを取り除く
        labels = []
        spans = []
        for label, span in zip(labels_arg, spans_arg):
            if span[0] != -1:
                labels.append(label)
                spans.append(span)
        
        # モデルが予測した文章を生成する
        text_pred = ""
        position = 0
        for label, span in zip(labels, spans):
            start, end = span
            # 空白文字の処理
            if position != start:
                text_pred += text[position: start]
            token_pred = self.convert_ids_to_tokens(label) # labelをトークン化
            token_pred = token_pred.replace("##", "") # サブワードの##を削除
            token_pred = unicodedata.normalize('NFKC', token_pred) # 文字列の正規化
            text_pred += token_pred
            position = end
        return text_pred

定義したトークナイザの動きを確認する。

モデルとトークナイザの呼び出し

In [21]:
# 9-5

model_name = 'cl-tohoku/bert-base-japanese-whole-word-masking'
tokenizer = SC_tokenizer.from_pretrained(model_name)

The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'BertJapaneseTokenizer'. 
The class this function is called from is 'SC_tokenizer'.


`encode_plus_tagged()`メソッドは、誤変換した文章と正しい文章が渡されたら、誤変換した文章を符号化し`labels`に正しい文章のトークンIDを付与する。

In [14]:
# 9-6

wrong_text = "優勝トロフィーを変換した"
correct_text = "優勝トロフィーを返還した"

encoding = tokenizer.encode_plus_tagged(
    wrong_text, correct_text, max_length=12,
)

In [15]:
pprint.pprint(encoding)

{'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0],
 'input_ids': [2, 759, 18204, 11, 4618, 15, 10, 3, 0, 0, 0, 0],
 'labels': [2, 759, 18204, 11, 8274, 15, 10, 3, 0, 0, 0, 0],
 'token_type_ids': [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]}


`encode_plus_untagged()`メソッドは、文章を符号化し、空白や未知語を考慮した上でそれぞれのトークンの位置を返す。

In [16]:
# 9-7

wrong_text = "優勝トロフィーを変換した"
encoding, spans = tokenizer.encode_plus_untagged(
    wrong_text, return_tensors='pt'
)

In [17]:
encoding

{'input_ids': tensor([    2,   759, 18204,    11,  4618,    15,    10,     3]),
 'token_type_ids': tensor([0, 0, 0, 0, 0, 0, 0, 0]),
 'attention_mask': tensor([1, 1, 1, 1, 1, 1, 1, 1])}

In [18]:
spans

[[-1, -1], [0, 2], [2, 7], [7, 8], [8, 10], [10, 11], [11, 12], [-1, -1]]

`convert_output_to_text()`関数は、文章とラベル列、各トークンの文章中の位置から予測した文章を出力する。

In [22]:
# 9-8

labels_pred = [2, 759, 18204, 11, 8274, 15, 10, 3]
text_pred = tokenizer.convert_output_to_text(
    wrong_text, labels_pred, spans
)

In [23]:
text_pred

'優勝トロフィーを返還した'

## 9.2 BERTにおける実装

In [None]:
# 9-9
bert_mlm = BertForMaskedLM.from_pretrained(MODEL_NAME)
bert_mlm = bert_mlm.cuda()

In [None]:
# 9-10
text = '優勝トロフィーを変換した。'

# 符号化とともに各トークンの文章中の位置を計算しておく。
encoding, spans = tokenizer.encode_plus_untagged(
    text, return_tensors='pt'
)
encoding = { k: v.cuda() for k, v in encoding.items() }

# BERTに入力し、トークン毎にスコアの最も高いトークンのIDを予測値とする。
with torch.no_grad():
    output = bert_mlm(**encoding)
    scores = output.logits
    labels_predicted = scores[0].argmax(-1).cpu().numpy().tolist()
    
# ラベル列を文章に変換
predict_text = tokenizer.convert_bert_output_to_text(
    text, labels_predicted, spans
)

In [None]:
# 9-11
data = [
    {
        'wrong_text': '優勝トロフィーを変換した。',
        'correct_text': '優勝トロフィーを返還した。',
    },
    {
        'wrong_text': '人と森は強制している。',
        'correct_text': '人と森は共生している。',
    }
]

# 各データを符号化し、データローダへ入力できるようにする。
max_length=32
dataset_for_loader = []
for sample in data:
    wrong_text = sample['wrong_text']
    correct_text = sample['correct_text']
    encoding = tokenizer.encode_plus_tagged(
        wrong_text, correct_text, max_length=max_length
    )
    encoding = { k: torch.tensor(v) for k, v in encoding.items() }
    dataset_for_loader.append(encoding)

# データローダを作成
dataloader = DataLoader(dataset_for_loader, batch_size=2)

# ミニバッチをBERTへ入力し、損失を計算。
for batch in dataloader:
    encoding = { k: v.cuda() for k, v in batch.items() }
    output = bert_mlm(**encoding)
    loss = output.loss # 損失

In [None]:
# 9-2

os.makedirs("/data/chapter9")

In [None]:
# 9-12
!curl -L "https://nlp.ist.i.kyoto-u.ac.jp/DLcounter/lime.cgi?down=https://nlp.ist.i.kyoto-u.ac.jp/nl-resource/JWTD/jwtd.tar.gz&name=JWTD.tar.gz" -o JWTD.tar.gz
!tar zxvf JWTD.tar.gz

In [None]:
# 9-13
def create_dataset(data_df):

    tokenizer = SC_tokenizer.from_pretrained(MODEL_NAME)

    def check_token_count(row):
        """
        誤変換の文章と正しい文章でトークンに対応がつくかどうかを判定。
        （条件は上の文章を参照）
        """
        wrong_text_tokens = tokenizer.tokenize(row['wrong_text'])
        correct_text_tokens = tokenizer.tokenize(row['correct_text'])
        if len(wrong_text_tokens) != len(correct_text_tokens):
            return False
        
        diff_count = 0
        threthold_count = 2
        for wrong_text_token, correct_text_token \
            in zip(wrong_text_tokens, correct_text_tokens):

            if wrong_text_token != correct_text_token:
                diff_count += 1
                if diff_count > threthold_count:
                    return False
        return True

    def normalize(text):
        """
        文字列の正規化
        """
        text = text.strip()
        text = unicodedata.normalize('NFKC', text)
        return text

    # 漢字の誤変換のデータのみを抜き出す。
    category_type = 'kanji-conversion'
    data_df.query('category == @category_type', inplace=True) 
    data_df.rename(
        columns={'pre_text': 'wrong_text', 'post_text': 'correct_text'}, 
        inplace=True
    )
    
    # 誤変換と正しい文章をそれぞれ正規化し、
    # それらの間でトークン列に対応がつくもののみを抜き出す。
    data_df['wrong_text'] = data_df['wrong_text'].map(normalize) 
    data_df['correct_text'] = data_df['correct_text'].map(normalize)
    kanji_conversion_num = len(data_df)
    data_df = data_df[data_df.apply(check_token_count, axis=1)]
    same_tokens_count_num = len(data_df)
    print(
        f'- 漢字誤変換の総数：{kanji_conversion_num}',
        f'- トークンの対応関係のつく文章の総数: {same_tokens_count_num}',
        f'  (全体の{same_tokens_count_num/kanji_conversion_num*100:.0f}%)',
        sep = '\n'
    )
    return data_df[['wrong_text', 'correct_text']].to_dict(orient='records')

# データのロード
train_df = pd.read_json(
    './jwtd/train.jsonl', orient='records', lines=True
)
test_df = pd.read_json(
    './jwtd/test.jsonl', orient='records', lines=True
)

# 学習用と検証用データ
print('学習と検証用のデータセット：')
dataset = create_dataset(train_df)
random.shuffle(dataset)
n = len(dataset)
n_train = int(n*0.8)
dataset_train = dataset[:n_train]
dataset_val = dataset[n_train:]

# テストデータ
print('テスト用のデータセット：')
dataset_test = create_dataset(test_df)

In [None]:
# 9-14
def create_dataset_for_loader(tokenizer, dataset, max_length):
    """
    データセットをデータローダに入力可能な形式にする。
    """
    dataset_for_loader = []
    for sample in tqdm(dataset):
        wrong_text = sample['wrong_text']
        correct_text = sample['correct_text']
        encoding = tokenizer.encode_plus_tagged(
            wrong_text, correct_text, max_length=max_length
        )
        encoding = { k: torch.tensor(v) for k, v in encoding.items() }
        dataset_for_loader.append(encoding)
    return dataset_for_loader

tokenizer = SC_tokenizer.from_pretrained(MODEL_NAME)

# データセットの作成
max_length = 32
dataset_train_for_loader = create_dataset_for_loader(
    tokenizer, dataset_train, max_length
)
dataset_val_for_loader = create_dataset_for_loader(
    tokenizer, dataset_val, max_length
)

# データローダの作成
dataloader_train = DataLoader(
    dataset_train_for_loader, batch_size=32, shuffle=True
)
dataloader_val = DataLoader(dataset_val_for_loader, batch_size=256)

In [None]:
# 9-15
class BertForMaskedLM_pl(pl.LightningModule):
        
    def __init__(self, model_name, lr):
        super().__init__()
        self.save_hyperparameters()
        self.bert_mlm = BertForMaskedLM.from_pretrained(model_name)
        
    def training_step(self, batch, batch_idx):
        output = self.bert_mlm(**batch)
        loss = output.loss
        self.log('train_loss', loss)
        return loss
        
    def validation_step(self, batch, batch_idx):
        output = self.bert_mlm(**batch)
        val_loss = output.loss
        self.log('val_loss', val_loss)
   
    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.hparams.lr)

checkpoint = pl.callbacks.ModelCheckpoint(
    monitor='val_loss',
    mode='min',
    save_top_k=1,
    save_weights_only=True,
    dirpath='model/'
)

trainer = pl.Trainer(
    gpus=1,
    max_epochs=5,
    callbacks=[checkpoint]
)

# ファインチューニング
model = BertForMaskedLM_pl(MODEL_NAME, lr=1e-5)
trainer.fit(model, dataloader_train, dataloader_val)
best_model_path = checkpoint.best_model_path

In [None]:
# 9-16
def predict(text, tokenizer, bert_mlm):
    """
    文章を入力として受け、BERTが予測した文章を出力
    """
    # 符号化
    encoding, spans = tokenizer.encode_plus_untagged(
        text, return_tensors='pt'
    ) 
    encoding = { k: v.cuda() for k, v in encoding.items() }

    # ラベルの予測値の計算
    with torch.no_grad():
        output = bert_mlm(**encoding)
        scores = output.logits
        labels_predicted = scores[0].argmax(-1).cpu().numpy().tolist()

    # ラベル列を文章に変換
    predict_text = tokenizer.convert_bert_output_to_text(
        text, labels_predicted, spans
    )

    return predict_text

# いくつかの例に対してBERTによる文章校正を行ってみる。
text_list = [
    'ユーザーの試行に合わせた楽曲を配信する。',
    'メールに明日の会議の史料を添付した。',
    '乳酸菌で牛乳を発行するとヨーグルトができる。',
    '突然、子供が帰省を発した。'
]

# トークナイザ、ファインチューニング済みのモデルのロード
tokenizer = SC_tokenizer.from_pretrained(MODEL_NAME)
model = BertForMaskedLM_pl.load_from_checkpoint(best_model_path)
bert_mlm = model.bert_mlm.cuda()

for text in text_list:
    predict_text = predict(text, tokenizer, bert_mlm) # BERTによる予測
    print('---')
    print(f'入力：{text}')
    print(f'出力：{predict_text}')

In [None]:
# 9-17
# BERTで予測を行い、正解数を数える。
correct_num = 0 
for sample in tqdm(dataset_test):
    wrong_text = sample['wrong_text']
    correct_text = sample['correct_text']
    predict_text = predict(wrong_text, tokenizer, bert_mlm) # BERT予測
   
    if correct_text == predict_text: # 正解の場合
        correct_num += 1

print(f'Accuracy: {correct_num/len(dataset_test):.2f}')

In [None]:
# 9-18
correct_position_num = 0 # 正しく誤変換の漢字を特定できたデータの数
for sample in tqdm(dataset_test):
    wrong_text = sample['wrong_text']
    correct_text = sample['correct_text']
    
    # 符号化
    encoding = tokenizer(wrong_text)
    wrong_input_ids = encoding['input_ids'] # 誤変換の文の符合列
    encoding = {k: torch.tensor([v]).cuda() for k,v in encoding.items()}
    correct_encoding = tokenizer(correct_text)
    correct_input_ids = correct_encoding['input_ids'] # 正しい文の符合列
    
    # 文章を予測
    with torch.no_grad():
        output = bert_mlm(**encoding)
        scores = output.logits
        # 予測された文章のトークンのID
        predict_input_ids = scores[0].argmax(-1).cpu().numpy().tolist() 

    # 特殊トークンを取り除く
    wrong_input_ids = wrong_input_ids[1:-1]
    correct_input_ids =  correct_input_ids[1:-1]
    predict_input_ids =  predict_input_ids[1:-1]
    
    # 誤変換した漢字を特定できているかを判定
    # 符合列を比較する。
    detect_flag = True
    for wrong_token, correct_token, predict_token \
        in zip(wrong_input_ids, correct_input_ids, predict_input_ids):

        if wrong_token == correct_token: # 正しいトークン
            # 正しいトークンなのに誤って別のトークンに変換している場合
            if wrong_token != predict_token: 
                detect_flag = False
                break
        else: # 誤変換のトークン
            # 誤変換のトークンなのに、そのままにしている場合
            if wrong_token == predict_token: 
                detect_flag = False
                break

    if detect_flag: # 誤変換の漢字の位置を正しく特定できた場合
        correct_position_num += 1
        
print(f'Accuracy: {correct_position_num/len(dataset_test):.2f}')