In [None]:
# 9-1
!mkdir chap9
%cd ./chap9

In [2]:
# 9-3
import random
from tqdm import tqdm
import unicodedata

import pandas as pd
import torch
from torch.utils.data import DataLoader
from transformers import BertJapaneseTokenizer, BertForMaskedLM
import pytorch_lightning as pl

# 日本語の事前学習済みモデル
MODEL_NAME = "tohoku-nlp/bert-base-japanese-whole-word-masking"

In [3]:
# 9-4
# =============================================================================
# SC_tokenizer（誤変換補正用トークナイザ）
# -----------------------------------------------------------------------------
# 目的（理論）：
# - 「誤変換を含む文」を入力 x とし、「正しい文」をラベル y（語彙上のトークンID列）として
#   学習することで、**各位置ごとに“正しいトークンID”を分類**するタスクへ還元する。
# - これは BERT をエンコーダとし、出力ヘッドが **語彙サイズ |V|** のクラス分類を
#   各タイムステップに対して行う設計（トークン分類）に相当する。
#   * 損失関数の典型：位置 t ごとに CrossEntropyLoss（教師は正解トークンID）。
#   * BERT の “Masked LM” と違い、ここでは**全面的に教師強制**で全位置を監督できる（PAD を除外するなら ignore_index を活用）。
# - この形式は **置換型の誤り**（打鍵ミス・漢字/かな揺れ・全半角揺れ等）に強いが、
#   **挿入/削除**や**語順入れ替え**などの編集距離を伴う変化は「位置合わせ」の前提を崩すため苦手。
#   必要に応じてアライメント戦略（例：差分アルゴリズムでの整列→同一長に拡張）やシーケンス生成系（seq2seq）を検討する。
# - 推論時は各位置の予測トークンIDから文字列を復号し、元文の空白を保持しつつ**スパン単位で置換**して復元する。
#   サブワード（WordPiece）の '##' 接頭辞は接続時に除去する。
#
# 実装上の注意：
# - encode_plus_tagged では、正解文の input_ids をそのまま labels に格納する。
#   学習ループ側で特殊トークンや PAD を損失に含めない場合、labels の該当位置を **-100** に置換しておく（ignore_index）。
# - encode_plus_untagged では、WordPiece の各サブワードを元文字列の部分スパンにマッピングする。
#   繰り返し部分や空白が多い場合、現在の **前方貪欲マッチ**は誤対応のリスクがある（必要なら正規表現や LCS によるロバスト化を検討）。
# - convert_bert_output_to_text では NFKC 正規化を適用している。
#   学習・推論で正規化を**一貫**させないと、スパンの境界と生成表記の齟齬を招く可能性がある。
# =============================================================================


class SC_tokenizer(BertJapaneseTokenizer):

    def encode_plus_tagged(self, wrong_text, correct_text, max_length=128):
        """
        ファインチューニング時に使用。
        誤変換を含む文章と正しい文章を入力とし、
        符号化を行いBERTに入力できる形式にする。

        理論メモ：
        - 入力 x = wrong_text のトークン列に対し、教師 y = correct_text のトークンID列を
          そのまま labels として与えることで、各位置の正解トークンを分類するタスクにする。
        - モデル側は「トークン分類ヘッド（出力次元＝語彙サイズ |V|）」を持ち、
          CrossEntropyLoss(logits_t, label_id_t) を位置 t で計算し、時系列平均を取るのが一般的。
        - PAD/CLS/SEP を損失から除外したい場合、labels の該当位置を -100 にする（ignore_index）。
        """
        # 誤変換した文章をトークン化し、符号化
        encoding = self(
            wrong_text, max_length=max_length, padding="max_length", truncation=True
        )
        # 正しい文章をトークン化し、符号化
        encoding_correct = self(
            correct_text, max_length=max_length, padding="max_length", truncation=True
        )
        # 正しい文章の符号をラベルとする
        # 注意：このままだと [CLS]/[SEP]/[PAD] も学習対象に含まれる。
        # もし PAD を損失から除きたいなら、学習側で labels を -100 に置換する処理を追加する。
        encoding["labels"] = encoding_correct["input_ids"]

        return encoding

    def encode_plus_untagged(self, text, max_length=None, return_tensors=None):
        """
        文章を符号化し、それぞれのトークンの文章中の位置も特定しておく。

        理論メモ：
        - 後段の復号（convert_bert_output_to_text）では、各サブワードが
          原文テキストのどのスパンに対応するかが必要になる。
        - ここでは形態素（MeCab）→サブワード（WordPiece）→原文スパンという対応を
          前方探索で求めている。繰り返しや空白が多い場合は曖昧性が残る点に注意。
        """
        # 文章のトークン化を行い、
        # それぞれのトークンと文章中の文字列を対応づける。
        tokens = []  # トークンを追加していく。
        tokens_original = []  # トークンに対応する文章中の文字列を追加していく。
        words = self.word_tokenizer.tokenize(text)  # MeCabで単語に分割
        for word in words:
            # 単語をサブワードに分割
            tokens_word = self.subword_tokenizer.tokenize(word)
            tokens.extend(tokens_word)
            if tokens_word[0] == "[UNK]":  # 未知語への対応
                tokens_original.append(word)
            else:
                tokens_original.extend(
                    [token.replace("##", "") for token in tokens_word]
                )

        # 各トークンの文章中での位置を調べる。（空白の位置を考慮する）
        # アルゴリズム：前方貪欲一致（最短一致ではなく、順次スライド）
        # 注意：同一サブ文字列が繰り返される場合に誤対応のリスクがある。
        position = 0
        spans = []  # トークンの位置を追加していく。
        for token in tokens_original:
            l = len(token)
            while 1:
                if token != text[position : position + l]:
                    position += 1
                else:
                    spans.append([position, position + l])
                    position += l
                    break

        # 符号化を行いBERTに入力できる形式にする。
        # prepare_for_model が [CLS]/[SEP] の付与と長さ正規化（padding/truncation）を行う。
        input_ids = self.convert_tokens_to_ids(tokens)
        encoding = self.prepare_for_model(
            input_ids,
            max_length=max_length,
            padding="max_length" if max_length else False,
            truncation=True if max_length else False,
        )
        sequence_length = len(encoding["input_ids"])
        # 特殊トークン[CLS]に対するダミーのspanを追加（-1 は「実体なし」の番兵）。
        spans = [[-1, -1]] + spans[: sequence_length - 2]
        # 特殊トークン[SEP]、[PAD]に対するダミーのspanを追加。
        spans = spans + [[-1, -1]] * (sequence_length - len(spans))

        # 必要に応じてtorch.Tensorにする。
        if return_tensors == "pt":
            encoding = {k: torch.tensor([v]) for k, v in encoding.items()}

        return encoding, spans

    def convert_bert_output_to_text(self, text, labels, spans):
        """
        推論時に使用。
        文章と、各トークンのラベルの予測値、文章中での位置を入力とする。
        そこから、BERTによって予測された文章に変換。

        理論メモ：
        - labels は「各位置の語彙ID（予測トークン）」と想定。すなわちモデル出力は
          各位置で |V| 分類を行うトークン分類ヘッドの argmax 等。
        - サブワード '##' は接続時に除去し、文字列として連結。
        - 本実装は NFKC 正規化を通すため、**学習・評価で NFKC を統一**しないと
          スパン境界と生成表記のずれが起き得る（評価の完全一致が過小になる可能性）。
        """
        assert len(spans) == len(labels)

        # labels, spansから特殊トークンに対応する部分を取り除く
        labels = [label for label, span in zip(labels, spans) if span[0] != -1]
        spans = [span for span in spans if span[0] != -1]

        # BERTが予測した文章を作成
        # 原文の空白や未対称な部分は、スパンの「穴」をそのまま複製して保持。
        predicted_text = ""
        position = 0
        for label, span in zip(labels, spans):
            start, end = span
            if position != start:  # 空白の処理（原文の未対応部分をそのままコピー）
                predicted_text += text[position:start]
            predicted_token = self.convert_ids_to_tokens(label)
            predicted_token = predicted_token.replace("##", "")
            predicted_token = unicodedata.normalize("NFKC", predicted_token)
            predicted_text += predicted_token
            position = end

        return predicted_text

In [4]:
# 9-5
tokenizer = SC_tokenizer.from_pretrained(MODEL_NAME)

The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'BertJapaneseTokenizer'. 
The class this function is called from is 'SC_tokenizer'.


In [5]:
# 9-6
# =============================================================================
# 目的（理論）：
# - 「誤変換を含む文（wrong_text）」をモデル入力とし、「正しい文（correct_text）」の
#   トークンID列を **教師ラベル（labels）** として与える最小例。
# - これにより各位置 t で 「正しいトークンID」を分類する **トークン分類問題** として学習できる。
#   * 推奨損失：CrossEntropyLoss（語彙サイズ |V| クラスの多クラス分類、PAD 等は ignore_index=-100 で除外）
# - 注意：max_length を超えると **切り詰め（truncation）** されるため、wrong と correct の整合が崩れる。
#   学習で両系列が等長にパディング・切り詰めされること（同じ max_length/手順）を必ず保証すること。
# - 出力の encoding には、wrong_text の BERT 入力（input_ids/attention_mask/token_type_ids）に加えて、
#   correct_text の input_ids が **encoding['labels']** として格納される実装（9-4 の SC_tokenizer 参照）。
# =============================================================================

wrong_text = "優勝トロフィーを変換した"  # 誤変換を含む入力（例：返還→変換）
correct_text = "優勝トロフィーを返還した"  # 正しい表記
encoding = tokenizer.encode_plus_tagged(wrong_text, correct_text, max_length=12)
print(encoding)
# 期待される中身（例）：
#  - 'input_ids'       : wrong_text をトークナイズ→ID化→[CLS]/[SEP] 付与→max_length へパディング/切り詰め
#  - 'token_type_ids'  : 単文なので通常は全 0
#  - 'attention_mask'  : 実トークン 1 / PAD 0
#  - 'labels'          : correct_text の input_ids（= 教師）。学習側で PAD/特殊トークンに -100 を適用推奨。
# 理論メモ：
#  - 本手法は「置換型誤り」には強い一方、挿入/削除や語順入れ替え等の編集距離を伴う誤りには弱い。
#    その場合は、アライメント（diff/LCS）で整列して等長化するか、seq2seq 系（エンコーダ–デコーダ）を検討する。

{'input_ids': [2, 759, 18204, 11, 4618, 15, 10, 3, 0, 0, 0, 0], 'token_type_ids': [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], 'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0], 'labels': [2, 759, 18204, 11, 8274, 15, 10, 3, 0, 0, 0, 0]}


In [6]:
# 9-7
# =============================================================================
# 目的（理論）：
# - 学習/推論時に、原文テキストの各サブワード（WordPiece）が「元テキスト中のどの位置（span）」に
#   対応するかを取得する最小例である。これにより、トークン分類の予測結果（各位置の語彙ID）を
#   文字列へ正確に復号（置換）できる。
#
# 背景：
# - encode_plus_untagged は、形態素分割（MeCab）→サブワード分割（WordPiece）の後、
#   各サブワードから '##' を除いた素片を原文に前方探索でアラインし、[start, end) の半開区間で
#   スパン列（spans）を構築する。
# - prepare_for_model により [CLS]/[SEP] の付与とパディング/切り詰めが行われるため、
#   それら特殊トークンや PAD に対応する spans は [-1, -1] の番兵を入れておく（「実体なし」の意味）。
#
# 出力の読み方：
# - encoding: dict（'input_ids', 'token_type_ids', 'attention_mask' など）。return_tensors='pt' のため
#   各値は形状 [1, T] の torch.Tensor（バッチ次元 1 を含む）。
# - spans: 長さ T のリスト。各要素は [start, end]（原文の0始まりインデックス）。
#   * special/PAD の位置は [-1, -1]。
#   * 先頭は [CLS] のため必ず [-1, -1]、終端側も [SEP] と PAD による [-1, -1] が並ぶ。
#
# 注意点：
# - 前方貪欲一致は、同一部分文字列が繰り返される文や空白が多い文で誤マッチのリスクがある。
#   必要に応じて LCS/編集距離によるアライメントでロバスト化を検討。
# - 学習・推論で NFKC 正規化ポリシーを統一しないと、スパン境界と生成表記の差異が生じ得る。
# =============================================================================

wrong_text = "優勝トロフィーを変換した"
encoding, spans = tokenizer.encode_plus_untagged(wrong_text, return_tensors="pt")
print("# encoding")
print(encoding)
print("# spans")
print(spans)

# encoding
{'input_ids': tensor([[    2,   759, 18204,    11,  4618,    15,    10,     3]]), 'token_type_ids': tensor([[0, 0, 0, 0, 0, 0, 0, 0]]), 'attention_mask': tensor([[1, 1, 1, 1, 1, 1, 1, 1]])}
# spans
[[-1, -1], [0, 2], [2, 7], [7, 8], [8, 10], [10, 11], [11, 12], [-1, -1]]


In [7]:
# 9-8
# =============================================================================
# 目的（理論）：
# - 9-7 で得た spans（原文中のサブワード位置列）と、各位置の **予測トークンID列（predicted_labels）** を
#   用いて、SC_tokenizer.convert_bert_output_to_text により「正しい文章」を復元する最小例。
#
# 処理の流れ（convert_bert_output_to_text の要点）：
#  1) 事前条件：len(spans) == len(labels)（ここで labels=predicted_labels）。
#     - spans は [CLS], 本文サブワード, [SEP]（および PAD があれば PAD）に対応し、特殊/PAD は [-1, -1]。
#     - ラベル列も **同じ長さ**で、対応位置に語彙ID（token_id）を持つこと。
#  2) 特殊/PAD 位置の除去：
#     - span[0] == -1 の要素は「実体なし」として、labels/spans の両方から落とす。
#  3) サブワード復元：
#     - token_id → token へ変換し、WordPiece の接頭辞 '##' を除去して連結。
#     - 連結前に **NFKC 正規化**で全/半角・合成文字の揺れを矯正（学習/推論での正規化ポリシーは統一すること）。
#  4) 空白・非対象領域の保持：
#     - 連結中、原文の position と次のサブワード開始 start にギャップがあれば、原文の該当部分をそのまま複写。
#
# 理論メモ：
# - ここでの labels は「語彙サイズ |V| クラスの **トークン分類** の予測結果（argmax など）」を想定。
# - 位置合わせ（spans）は 9-7 の encode_plus_untagged で算出（MeCab→WordPiece→前方探索）。
#   反復文字列や空白が多い場合は前方貪欲一致が誤対応するリスクがあるため、大規模実運用では LCS/編集距離による
#   アライメントの頑健化を検討。
# - 予測列の長さが spans と一致しないと assert により失敗する。モデル出力 → ラベル化の段で
#   [CLS]/[SEP]/PAD を含んだ長さ合わせを行うこと。
# =============================================================================

predicted_labels = [
    2,
    759,
    18204,
    11,
    8274,
    15,
    10,
    3,
]  # 例：各位置の予測 token_id（語彙ID）
predicted_text = tokenizer.convert_bert_output_to_text(
    wrong_text, predicted_labels, spans
)
print(predicted_text)
# 出力想定：
# - モデルが '変換'→'返還' のような誤変換を修正できていれば、期待する正しい表記に復元される。
# - 語彙外/未知などで難しい場合は近傍のサブワード列に置換される可能性がある。

優勝トロフィーを返還した
