# 5章
- 以下で実行するコードには確率的な処理が含まれていることがあり、コードの出力結果と本書に記載されている出力例が異なることがあります。

In [1]:
# ライブラリインポート
!pip install transformers==4.5.0 fugashi==1.1.0 ipadic==1.0.0

You should consider upgrading via the '/Users/harukikondo/opt/anaconda3/bin/python -m pip install --upgrade pip' command.[0m


In [2]:
# ライブラリのインポート
import numpy as np
import torch
from transformers import BertJapaneseTokenizer, BertForMaskedLM

In [3]:
# モデルの指定
model_name = 'cl-tohoku/bert-base-japanese-whole-word-masking'
# トークナイザを用意する。
tokenizer = BertJapaneseTokenizer.from_pretrained(model_name)
bert_mlm = BertForMaskedLM.from_pretrained(model_name)
bert_mlm = bert_mlm.cuda()

Some weights of the model checkpoint at cl-tohoku/bert-base-japanese-whole-word-masking were not used when initializing BertForMaskedLM: ['cls.seq_relationship.weight', 'cls.seq_relationship.bias']
- This IS expected if you are initializing BertForMaskedLM from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForMaskedLM from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


AssertionError: Torch not compiled with CUDA enabled

In [4]:
# 文章をトークン化して出力する。
text = '今日は[MASK]へ行く。'
tokens = tokenizer.tokenize(text)
print(tokens)

['今日', 'は', '[MASK]', 'へ', '行く', '。']


In [5]:
# 文章を符号化する。
input_ids = tokenizer.encode(text, return_tensors='pt')
# GPUに配置する。
input_ids = input_ids.cuda()

# BERTに入力し、分類スコアを得る。
# 系列長を揃える必要がないので、単にiput_idsのみを入力します。
with torch.no_grad():
    output = bert_mlm(input_ids=input_ids)
    scores = output.logits

AssertionError: Torch not compiled with CUDA enabled

In [None]:
# ID列で'[MASK]'(IDは4)の位置を調べる
mask_position = input_ids[0].tolist().index(4) 

# スコアが最も良いトークンのIDを取り出し、トークンに変換する。
id_best = scores[0, mask_position].argmax(-1).item()
# IDからトークンを取り出す。
token_best = tokenizer.convert_ids_to_tokens(id_best)
# ##を空欄に変換する。
token_best = token_best.replace('##', '')
# [MASK]を上で求めたトークンで置き換える。
text = text.replace('[MASK]',token_best)
# 出力する。
print(text)

In [None]:
# スコア上位10位のトークンを代入した際に文章を出力する関数
# text 対象文章
# tokenizer トークナイザー
# bert_mlm BERTモデル
# num_took 何位まで扱うかの数
def predict_mask_topk(text, tokenizer, bert_mlm, num_topk):
    """
    文章中の最初の[MASK]をスコアの上位のトークンに置き換える。
    上位何位まで使うかは、num_topkで指定。
    出力は穴埋めされた文章のリストと、置き換えられたトークンのスコアのリスト。
    """
    # 文章を符号化する。(テンソル化)
    input_ids = tokenizer.encode(text, return_tensors='pt')
    # GPUに配置する。
    input_ids = input_ids.cuda()
    with torch.no_grad():
        # BERTモデルに入力する。
        output = bert_mlm(input_ids=input_ids)
    # 分類スコアを取得する。(32,000の各トークンIDに対するスコアの1次配列)
    scores = output.logits

    # [MASK]の位置を調べる。
    mask_position = input_ids[0].tolist().index(4) 
    # もっともスコアの高いIDを取り出す。(もともと登録されている日本語の語彙は、32,000種類))
    topk = scores[0, mask_position].topk(num_topk)
    # トークンのID
    ids_topk = topk.indices 
    # IDからトークンの値を取得する。
    tokens_topk = tokenizer.convert_ids_to_tokens(ids_topk) 
    # トークンのスコアを取得する。
    scores_topk = topk.values.cpu().numpy() 

    # 穴埋めされたテキストを格納するための配列
    text_topk = [] 
    # 文章中の[MASK]を上で求めたトークンで置き換える。(num_topk回繰り返す。)
    for token in tokens_topk:
        # 「##」を空文字に置き換える。
        token = token.replace('##', '')
        # [MASK]文字を取り替える。
        text_topk.append(text.replace('[MASK]', token, 1))

    return text_topk, scores_topk

# 対象の文章
text = '今日は[MASK]へ行く。'
# 関数を呼び出す。
text_topk, _ = predict_mask_topk(text, tokenizer, bert_mlm, 10)
# 結果を出力する。　
print(*text_topk, sep='\n')

In [None]:
# 複数の[MASK]を穴埋めする場合の関数
def greedy_prediction(text, tokenizer, bert_mlm):
    """
    [MASK]を含む文章を入力として、貪欲法で穴埋めを行った文章を出力する。
    """
    # 前から順に[MASK]を一つづつ、スコアの最も高いトークンに置き換える。
    # この場合は、１つ目の[MASK]を穴埋めした文章に対してさらにもう一度同じ関数を呼び出す。
    for _ in range(text.count('[MASK]')):
        # predict_mask_topk関数を呼び出す。
        text = predict_mask_topk(text, tokenizer, bert_mlm, 1)[0][0]
    return text

# 対象の文章
text = '今日は[MASK][MASK]へ行く。'
# 関数の呼び出し
greedy_prediction(text, tokenizer, bert_mlm)

In [None]:
# 大部分の文章が[MASK]の場合
text = '今日は[MASK][MASK][MASK][MASK][MASK]'
# 関数を呼び出す。
greedy_prediction(text, tokenizer, bert_mlm)

In [None]:
# ビームサーチ法による文章の穴埋めを実行する関数
def beam_search(text, tokenizer, bert_mlm, num_topk):
    """
    ビームサーチで文章の穴埋めを行う。
    """
    num_mask = text.count('[MASK]')
    # 対象文章を配列に格納する。
    text_topk = [text]
    # スコアを格納する配列
    scores_topk = np.array([0])
    # [MASK]の数だけ繰り返す。
    for _ in range(num_mask):
        # 現在得られている、それぞれの文章に対して、
        # 最初の[MASK]をスコアが上位のトークンで穴埋めする。
        # それぞれの文章を穴埋めした結果を格納する配列
        text_candidates = [] 
        # 穴埋めに使ったトークンのスコアを格納する配列。
        score_candidates = []
        for text_mask, score in zip(text_topk, scores_topk):
            # predict_mask_topk関数の呼び出し
            text_topk_inner, scores_topk_inner = predict_mask_topk(
                text_mask, tokenizer, bert_mlm, num_topk
            )
            # 配列に格納する。
            text_candidates.extend(text_topk_inner)
            score_candidates.append( score + scores_topk_inner )

        # 穴埋めにより生成された文章の中から合計スコアの高いものを選ぶ。
        score_candidates = np.hstack(score_candidates)
        idx_list = score_candidates.argsort()[::-1][:num_topk]
        text_topk = [ text_candidates[idx] for idx in idx_list ]
        scores_topk = score_candidates[idx_list]

    return text_topk

# 対象文章
text = "今日は[MASK][MASK]へ行く。"
# beam_search関数の呼び出し
text_topk = beam_search(text, tokenizer, bert_mlm, 10)
print(*text_topk, sep='\n')

In [None]:
# 対象文章
text = '今日は[MASK][MASK][MASK][MASK][MASK]'
# beam_search関数の呼び出し
text_topk = beam_search(text, tokenizer, bert_mlm, 10)
print(*text_topk, sep='\n')