<a href="https://colab.research.google.com/github/coraldx5/generativeai_intro_book/blob/master/chap05_LLM_intro.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 第5章：言語モデルを動かしてみよう
- ① 穴埋め問題を解くMLM
- ② 次のトークンを予測するCLM

## MLM（Masked Language Model）を動かてみる
- 穴埋め問題を解いてみましょう

### 穴埋め問題を解く準備

In [None]:
!pip install fugashi==1.3.2 ipadic==1.0.0 transformers==4.42.4  torch==2.3.1+cu121
import transformers, torch
from transformers import BertJapaneseTokenizer, BertForMaskedLM, pipeline

# トークナイザと訓練済みモデルの読み込み
# 'cl-tohoku/bert-base-japanese-whole-word-masking' という事前学習済みの日本語BERTモデルを使用します。
model = BertForMaskedLM.from_pretrained('cl-tohoku/bert-base-japanese-whole-word-masking')

# 事前学習済みモデルに対応するトークナイザーをロードします。
# BertJapaneseTokenizerは文章をトークン（モデルが理解できる単位）に変換し、逆にトークンから文章に変換する役割を持ちます。
tokenizer = BertJapaneseTokenizer.from_pretrained('cl-tohoku/bert-base-japanese-whole-word-masking')

# パイプラインの定義
# 'fill-mask' タスクのパイプラインを作成します。これは文章の中の [MASK] トークンを予測するためのものです。
fill_mask = pipeline('fill-mask',
                     model=model,        # 言語モデルの指定
                     tokenizer=tokenizer, # トークナイザの指定
                     top_k=6              # 表示する候補数の指定
                    )

# 結果を表示する関数の定義
# 文章を入力として、[MASK] トークンの候補とその確率を表示する関数を定義します。
def predictmask(text):
    print('---' * 10)
    print(f'元の文章：「{text}」')
    print(f'[MASK]部の候補：')
    for res in fill_mask(text):
        print(f"{res['score']:.4f}: {res['token_str']}")

### 穴埋め問題を解く

In [None]:
# 例１
predictmask('サングラスをかけた[MASK]が公園を駆け回る。')
# 例２
predictmask('サングラスをかけた[MASK]を食べるのが楽しみだ')
# 例３
predictmask('生卵をかけた[MASK]を食べるのが楽しみだ')

In [None]:
# @title ## 参考UI：穴埋め問題を解くサンプルコード
# @markdown 入力欄に、[MASK]を含む文章を入力してください。
check_sentence = "サングラスをかけた[MASK]が公園を駆け回る。" # @param {type:"string"}
predictmask(check_sentence)

# CLM（Causal Language Model）を動かしてみよう
- 次のトークンを予測してみよう

### 文章生成の準備

In [None]:
from transformers import GPT2LMHeadModel, T5Tokenizer

# モデルとトークナイザーのロード
# 'rinna/japanese-gpt2-medium' という事前学習済みの日本語GPT-2モデルを使用します。モデルのサイズは約1.37GBです。
model_name = 'rinna/japanese-gpt2-medium'  # モデルの名前を指定

# 事前学習済みのGPT-2モデルをロードします。GPT2LMHeadModelは文章生成タスクに使用されるモデルです。
model = GPT2LMHeadModel.from_pretrained(model_name)

# 事前学習済みモデルに対応するトークナイザーをロードします。
# T5Tokenizerは文章をトークン（モデルが理解できる単位）に変換し、逆にトークンから文章に変換する役割を持ちます。
tokenizer = T5Tokenizer.from_pretrained(model_name)

# 結果を表示する関数の定義
def generate_text(input_text, max_length):
    # 入力文章をトークン化
    input_ids = tokenizer.encode(input_text, return_tensors='pt')

    # Attention maskの設定
    attention_mask = torch.ones(input_ids.shape, dtype=torch.long)

    # 文章生成
    output = model.generate(
        input_ids,
        attention_mask=attention_mask,  # Attention maskを指定
        max_length=max_length,  # 生成する最大トークン数
        # no_repeat_ngram_size=2,  # 繰り返しを防ぐn-gramのサイズ
        pad_token_id=tokenizer.pad_token_id,  # パディングのトークンID
        bos_token_id=tokenizer.bos_token_id,  # 文章先頭のトークンID
        eos_token_id=tokenizer.eos_token_id,  # 文章終端のトークンID
    )

    # 生成された文章のデコード
    generated_text = tokenizer.decode(output[0], skip_special_tokens=True)

    return generated_text

In [None]:
# 関数の使用例
input_text = "これから雨が降りそうなので、"  # 入力文章
max_length = 50  # 生成する最大トークン数

# 生成された文章を表示
generated_text = generate_text(input_text, max_length)
print(generated_text)

In [None]:
# @title ## 参考UI：後続文章を生成するサンプルコード
# @markdown 文章を入力して下さい
input_text = "これから雨が降りそうなので、" # @param {type:"string"}
# @markdown 生成する最大トークン数
max_length = 50 # @param {type:"integer"}
generate_text(input_text, max_length)

## 各トークンの予測確率を可視化してみよう

### 貪欲探索の挙動を確認する

In [None]:
# モデルを評価モードに設定
model.eval()

# 入力文章をトークン化
input_ids = tokenizer.encode(input_text, return_tensors='pt')

# Attention maskの設定
attention_mask = torch.ones(input_ids.shape, dtype=torch.long)

# 生成されたトークンとその確率を順次表示
max_length = 3  # 生成する最大トークン数
for _ in range(max_length):
    # トークンの予測確率を取得
    with torch.no_grad():
        outputs = model(input_ids)
        predictions = outputs.logits

    # 次のトークンの予測確率を計算
    next_token_probs = torch.softmax(predictions[:, -1, :], dim=-1)

    # 上位3つのトークンを取得
    top_k = 3
    top_k_probs, top_k_indices = torch.topk(next_token_probs, top_k)

    # 上位3つのトークンとその確率を表示
    print(f"\n({_+1}番目) 上位3つのトークンと、確率：")
    for i in range(top_k):
        predicted_token_id = top_k_indices[0, i].item()
        predicted_token = tokenizer.decode([predicted_token_id])
        predicted_prob = top_k_probs[0, i].item()
        print(f"Token: {predicted_token}({predicted_token_id}), Probability: {predicted_prob:.4f}")

    # 最も確率の高いトークンを入力トークンに追加
    input_ids = torch.cat((input_ids, top_k_indices[:, 0].unsqueeze(-1)), dim=1)

    # 予測が終了トークンに到達した場合は終了
    if top_k_indices[0, 0].item() == tokenizer.eos_token_id:
        break

# 生成された全文章を表示
generated_text = tokenizer.decode(input_ids[0], skip_special_tokens=True)
print("---"*10)
print("生成された文章:")
print(generated_text)

### さまざまな探索手法を体験する

In [None]:
# モデルとトークナイザーのロード
model_name = 'rinna/japanese-gpt2-medium'
model = GPT2LMHeadModel.from_pretrained(model_name)
tokenizer = T5Tokenizer.from_pretrained(model_name)

# 文章生成の設定
input_text = "これから雨が降りそうなので、"  # 入力文章
max_length = 40  # 生成する最大トークン数

# トークン化
input_ids = tokenizer.encode(input_text, return_tensors='pt')

# 文章生成のパラメータを辞書で設定
# max_length: 生成する最大トークン数
# pad_token_id: パディングのトークンID
# bos_token_id: 文章先頭のトークンID
# eos_token_id: 文章終端のトークンID
prm = {
    "max_length": max_length,
    "pad_token_id": tokenizer.pad_token_id,
    "bos_token_id": tokenizer.bos_token_id,
    "eos_token_id": tokenizer.eos_token_id,
}

# Greedy探索
# Greedy探索は、各ステップで最も確率の高いトークンを選びます
greedy_output = model.generate(input_ids, **prm)
print("Greedy:", tokenizer.decode(greedy_output[0], skip_special_tokens=True))

# Beam探索
# Beam探索は、複数の候補（ビーム）を同時に探索し、最も良い結果を選びます
# num_beams: ビームの数
# early_stopping: 生成の早期終了を行うかどうか
beam_output = model.generate(input_ids, num_beams=3, early_stopping=True, **prm)
print("Beam:", tokenizer.decode(beam_output[0], skip_special_tokens=True))

# Top-kサンプリング
# Top-kサンプリングは、上位k個のトークンからランダムに選択します
# do_sample: サンプリングを行うかどうか
# top_k: 選択する上位トークンの数
top_k_output = model.generate(input_ids, do_sample=True, top_k=50, **prm)
print("Top-k Sampling:", tokenizer.decode(top_k_output[0], skip_special_tokens=True))

# Top-pサンプリング
# Top-pサンプリングは、確率の高いトークンの集合からランダムに選択します
# do_sample: サンプリングを行うかどうか
# top_p: 累積確率がpを超えるまでのトークンを選択する閾値
top_p_output = model.generate(input_ids, do_sample=True, top_p=0.95, **prm)
print("Top-p Sampling:", tokenizer.decode(top_p_output[0], skip_special_tokens=True))

In [None]:
# @title ## 参考UI：様々な探索法で文章を生成するサンプルコード
# @markdown 文章を入力して下さい
# 文章生成の設定
input_text = "これから雨が降りそうなので、" # @param {type:"string"}
# @markdown 生成する最大トークン数
max_length = 50 # @param {type:"integer"}

# トークン化
# 入力文章をモデルが理解できるトークンに変換します
input_ids = tokenizer.encode(input_text, return_tensors='pt')

# 文章生成のパラメータを辞書で設定
# max_length: 生成する最大トークン数
# pad_token_id: パディングのトークンID
# bos_token_id: 文章先頭のトークンID
# eos_token_id: 文章終端のトークンID
prm = {
    "max_length": max_length,
    "pad_token_id": tokenizer.pad_token_id,
    "bos_token_id": tokenizer.bos_token_id,
    "eos_token_id": tokenizer.eos_token_id,
}

# Greedy探索
# Greedy探索は、各ステップで最も確率の高いトークンを選びます
greedy_output = model.generate(input_ids, **prm)
print("Greedy:", tokenizer.decode(greedy_output[0], skip_special_tokens=True))

# Beam探索
# Beam探索は、複数の候補（ビーム）を同時に探索し、最も良い結果を選びます
# num_beams: ビームの数
# early_stopping: 生成の早期終了を行うかどうか
beam_output = model.generate(input_ids, num_beams=3, early_stopping=True, **prm)
print("Beam:", tokenizer.decode(beam_output[0], skip_special_tokens=True))

# Top-kサンプリング
# Top-kサンプリングは、上位k個のトークンからランダムに選択します
# do_sample: サンプリングを行うかどうか
# top_k: 選択する上位トークンの数
top_k_output = model.generate(input_ids, do_sample=True, top_k=50, **prm)
print("Top-k Sampling:", tokenizer.decode(top_k_output[0], skip_special_tokens=True))

# Top-pサンプリング
# Top-pサンプリングは、確率の高いトークンの集合からランダムに選択します
# do_sample: サンプリングを行うかどうか
# top_p: 累積確率がpを超えるまでのトークンを選択する閾値
top_p_output = model.generate(input_ids, do_sample=True, top_p=0.95, **prm)
print("Top-p Sampling:", tokenizer.decode(top_p_output[0], skip_special_tokens=True))

## 参考：言語モデルをファインチューニングして分類問題を解く

In [None]:
# @title ### チューニングに必要な学習データを用意します
# 必要なライブラリをインポート
import pandas as pd
from sklearn.model_selection import train_test_split
import torch
from transformers import BertTokenizer, BertForSequenceClassification
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
import torch.optim as optim

# データセットの作成
# Datasetクラスを継承したクラスを作成します。
# ここでは、トークナイズされたデータとそのラベルを受け取って、
# データセットを管理する役割を持つクラスです。
class CommentDataset(Dataset):
    def __init__(self, encodings, labels):
        # トークナイズされた文章データ（エンコーディング）とラベルを保存します
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        # データセットのサイズ（サンプルの数）を返します
        return len(self.labels)

    def __getitem__(self, idx):
        # 指定されたインデックスのデータを取得します
        # エンコーディングとラベルをまとめて返します
        item = {key: val[idx] for key, val in self.encodings.items()}
        item['labels'] = self.labels[idx]
        return item


# GitHubからCSVファイルをダウンロードします
# wgetはコマンドラインツールで、指定されたURLからファイルをダウンロードします
!wget https://raw.githubusercontent.com/coraldx5/generativeai_intro_book/master/movie_review_jpn.csv

# データの読み込み
# pandasを使ってCSVファイルをデータフレームとして読み込みます
df = pd.read_csv("movie_review_jpn.csv")

# データの確認
# 最初の5行を表示して、データの内容を確認します
print(df.head())

# ラベルのエンコーディング
# 'result'列にはレビューの評価 ('good' または 'bad') が入っているので、
# それを数値に変換します。'good'を1、'bad'を0にマップします。
df['label'] = df['result'].map({'good': 1, 'bad': 0})

# 訓練データとテストデータに分割
# データを訓練用とテスト用に分けます。訓練用80%、テスト用20%です。
train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)

# トークナイザーのロード
# BERTモデル用の日本語トークナイザーをロードします。これにより、文章がBERTモデルで
# 扱える形式にトークナイズ（単語やサブワードに分割）されます。
tokenizer = BertTokenizer.from_pretrained('cl-tohoku/bert-base-japanese')

# 文章のトークナイズとエンコード
# トークナイザーを使って文章データをBERTモデルに入力できる形式に変換します。
def encode_data(data):
    return tokenizer.batch_encode_plus(
        data['comment'].tolist(),  # コメントをリストとして取得
        add_special_tokens=True,    # 特殊トークン（[CLS], [SEP]など）を追加
        max_length=128,             # 最大長を128に設定（これ以上は切り捨て）
        padding='max_length',       # 最大長に満たない部分をパディング
        truncation=True,            # 長すぎるコメントは切り捨て
        return_attention_mask=True, # アテンションマスクを返す（BERTの入力に必要）
        return_tensors='pt'         # 結果をPyTorchのテンソルとして返す
    )

# 訓練データとテストデータをエンコードします
train_encodings = encode_data(train_df)
test_encodings = encode_data(test_df)

# ラベルをPyTorchのテンソルに変換します
train_labels = torch.tensor(train_df['label'].values)
test_labels = torch.tensor(test_df['label'].values)

# データセットを作成します
train_dataset = CommentDataset(train_encodings, train_labels)
test_dataset = CommentDataset(test_encodings, test_labels)

# データローダーを作成します
# データローダーは、モデルにデータをバッチごとに供給するためのものです。
train_loader = DataLoader(train_dataset, batch_size=5, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=5, shuffle=False)


In [None]:
# @title ### ファインチューニングを実行します
from transformers import BertTokenizer, BertForSequenceClassification, AdamW
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F

# モデルのロード
# 事前学習されたBERTモデルをロードし、分類タスク用に設定します
# num_labels=2は、ポジティブ/ネガティブの2クラス分類であることを示しています
model = BertForSequenceClassification.from_pretrained('cl-tohoku/bert-base-japanese', num_labels=2)

# オプティマイザーの設定
# AdamWは、重み減衰（Weight Decay）を考慮したAdamオプティマイザーです
optimizer = AdamW(model.parameters(), lr=5e-5)

# GPU（CUDA）またはCPUのいずれかを使用するデバイスを設定します
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# モデルを指定したデバイスに転送します
model.to(device)

# 学習ループを開始します
for epoch in range(1):  # ← エポック数（学習量に相当します）
    model.train()  # モデルを訓練モードに設定します
    for batch in train_loader:
        optimizer.zero_grad()  # 勾配をゼロにリセットします

        # バッチから入力データとラベルを取り出し、デバイスに転送します
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)

        # モデルにデータを入力し、出力と損失を計算します
        outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
        loss = outputs.loss

        # 損失に基づいて勾配を計算し、モデルのパラメータを更新します
        loss.backward()
        optimizer.step()

    # エポック終了後に損失を表示します
    print(f"Epoch {epoch + 1} completed. Loss: {loss.item()}")

    # モデルの評価モードを設定します
    model.eval()

    # 評価用の変数を初期化します
    eval_loss = 0
    eval_steps = 0
    correct_predictions = 0
    total_predictions = 0

    # 評価フェーズのループを開始します
    with torch.no_grad():  # 評価時には勾配を計算しないので、no_grad()を使用します
        for batch in test_loader:
            # バッチから入力データとラベルを取り出し、デバイスに転送します
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            # モデルにデータを入力し、出力と損失を計算します
            outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
            loss = outputs.loss
            eval_loss += loss.item()
            eval_steps += 1

            # 出力のロジットから予測を計算します
            logits = outputs.logits
            predictions = torch.argmax(logits, dim=-1)  # 最も高いスコアを持つラベルを選びます
            correct_predictions += (predictions == labels).sum().item()  # 正解した予測の数をカウント
            total_predictions += labels.size(0)  # 総ラベル数をカウント

    # 評価用データにおける損失と精度を計算して表示します
    avg_eval_loss = eval_loss / eval_steps
    accuracy = correct_predictions / total_predictions
    print(f"Validation Loss: {avg_eval_loss}")
    print(f"Validation Accuracy: {accuracy}")


In [None]:
# @title ## 参考UI：様々な探索法で文章を生成するサンプルコード
# @markdown 文章を入力して下さい
# 文章生成の設定
input_text = "場面転換が唐突で、ストーリーの流れが断片的に感じられました。" # @param {type:"string"}

def predict(text):
    # 文章をモデルが理解できる形式にエンコードします
    encoding = tokenizer.encode_plus(
        text,                        # 入力文章
        add_special_tokens=True,     # 特殊トークン（[CLS], [SEP]など）を追加
        max_length=128,              # 最大長を128に設定（これ以上は切り捨て）
        padding='max_length',        # 最大長に満たない部分をパディング
        truncation=True,             # 長すぎる文章は切り捨て
        return_attention_mask=True,  # アテンションマスクを返す（BERTの入力に必要）
        return_tensors='pt'          # 結果をPyTorchのテンソルとして返す
    )

    # エンコードされた入力データをデバイス（GPUまたはCPU）に転送します
    input_ids = encoding['input_ids'].to(device)
    attention_mask = encoding['attention_mask'].to(device)

    # モデルを評価モードにし、勾配計算を無効にして予測を行います
    with torch.no_grad():  # 予測時には勾配を計算しないので、no_grad()を使用します
        outputs = model(input_ids, attention_mask=attention_mask)

    # 出力ロジット（モデルの予測結果）から、最も高いスコアを持つクラスを取得します
    logits = outputs.logits
    predicted_class = torch.argmax(logits, dim=1).item()  # 最も高いスコアのインデックスを取得

    # クラスに応じて結果を返します（1なら 'good'、それ以外なら 'bad'）
    return 'good' if predicted_class == 1 else 'bad'

# 予測の例
prediction = predict(input_text)
print(f"予測結果: {prediction}")
