# 第13章 RAG

## 13.3 RAG 向けに LLM を指示チューニングする

### 13.3.1 AI 王データセットを用いた指示チューニング

#### 環境の準備

In [None]:
!pip install datasets transformers[torch,sentencepiece] trl peft bitsandbytes

In [None]:
from transformers.trainer_utils import set_seed

# 乱数のシードを設定
set_seed(42)

In [None]:
from google.colab import drive

# Googleドライブを"drive"ディレクトリ以下にマウント
drive.mount("drive")

#### データセットの準備

In [None]:
from datasets import load_dataset

# Hugging Face Hubのllm-book/aio-retrieverのリポジトリから
# AI王データセットを読み込む
dataset = load_dataset(
    "llm-book/aio-retriever", trust_remote_code=True
)

# 読み込まれたデータセットの形式と事例数を確認
print(dataset)

In [None]:
from pprint import pprint

# 読み込まれたデータセットの内容を確認
pprint(dataset["validation"][0])

In [None]:
from typing import Any

def filter_example(
    example: dict[str, Any], max_passages: int = 3
) -> bool:
    """上位max_passages件のパッセージに正例が含まれていない事例を除外"""
    if len(example["positive_passage_indices"]) == 0:
        return False
    if example["positive_passage_indices"][0] >= max_passages:
        return False

    return True

dataset = dataset.filter(filter_example)

In [None]:
def process_example(
    example: dict[str, Any], max_passages: int = 3
) -> dict[str, Any]:
    """質問、パッセージ、正解の組からプロンプトを作成し、会話データに変換"""

    # exampleから必要な情報を取得
    question = example["question"]
    answer = example["answers"][0]
    passages = [p["text"] for p in example["passages"]]

    # max_passages件のパッセージを選択
    passages = passages[:max_passages]

    messages: list[dict[str, str]] = []
    # プロンプトとパッセージをユーザのメッセージとして会話データに追加
    prompt_text = "".join(
        [
            "あなたには今からクイズに答えてもらいます。",
            "問題を与えますので、その解答のみを簡潔に出力してください。\n",
            "また解答の参考になりうるテキストを与えます。",
            "解答を含まない場合もあるのでその場合は無視してください。\n\n",
            "---\n",
            "\n\n".join(passages),
            "\n---\n\n",
            f"問題: {question}",
        ]
    )
    messages.append({"role": "user", "content": prompt_text})
    # LLMが出力すべき内容（クイズ問題の答え）を会話データに追加
    messages.append({"role": "assistant", "content": answer})

    # 会話データを事例の"messages"フィールドに追加
    example["messages"] = messages
    return example

dataset = dataset.map(
    process_example, remove_columns=dataset["train"].column_names
)

In [None]:
# 前処理後のデータセットの形式と事例数を確認
print(dataset)

In [None]:
# 前処理後のデータセットの内容を確認
pprint(dataset["validation"][0])

#### トークナイザとモデルの準備

In [None]:
import torch
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    BitsAndBytesConfig,
)

# Hugging Face Hubにおけるモデル名を指定
base_model_name = "llm-book/Swallow-7b-hf-oasst1-21k-ja"

# モデルの量子化の設定
quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,  # 4ビット量子化のパラメータを読み込む
    bnb_4bit_quant_type="nf4",  # NF4量子化を使用
    bnb_4bit_compute_dtype=torch.bfloat16,  # 計算時のデータ型としてBF16を使用
)

# モデルの量子化の設定を用いてモデルを読み込む
model = AutoModelForCausalLM.from_pretrained(
    base_model_name,
    torch_dtype=torch.bfloat16,
    quantization_config=quantization_config,  # 量子化設定
    use_cache=False,  # 後にgradient checkpointingを有効にするために必要
    device_map="auto",
)

# トークナイザを読み込む
tokenizer = AutoTokenizer.from_pretrained(base_model_name)

#### 指示チューニング前のモデルの評価

In [None]:
from datasets import Dataset
from tqdm.notebook import tqdm
from transformers import PreTrainedModel

def evaluate(
    model: PreTrainedModel, dataset: Dataset
) -> tuple[list[str], list[str], float]:
    """データセットの各問題に対するモデルの出力を評価し、正解率を算出"""
    pred_answers = []
    gold_answers = []
    num_correct = 0

    for example in tqdm(dataset):
        # プロンプトにチャットテンプレートを適用
        model_inputs = tokenizer.apply_chat_template(
            example["messages"][:-1],
            add_generation_prompt=True,
            return_tensors="pt",
        ).to("cuda")

        # プロンプトの長さ（トークン数）を取得しておく
        input_length = model_inputs.shape[1]

        # モデルにプロンプトを入力し、出力を得る
        generated_ids = model.generate(
            model_inputs,
            max_new_tokens=32,
            do_sample=False,
            temperature=None,
            top_p=None,
        )

        # モデルの出力から答えの部分を文字列として取り出す
        pred_answer = tokenizer.batch_decode(
            generated_ids[:, input_length:], skip_special_tokens=True
        )[0]

        # 正解の文字列を取り出す
        gold_answer = example["messages"][-1]["content"]

        # モデルの答えと正解が一致していれば正答とカウント
        if pred_answer == gold_answer:
            num_correct += 1

        # モデルの答えと正解をそれぞれリストに追加
        pred_answers.append(pred_answer)
        gold_answers.append(gold_answer)

    # 正解率を計算
    accuracy = num_correct / len(pred_answers)

    return pred_answers, gold_answers, accuracy

In [None]:
# 指示チューニング前のモデルを使って評価
pred_answers, gold_answers, accuracy = evaluate(
    model, dataset["validation"]
)

# 無料版ColabのT4 GPUなどでは評価に時間を要するため、最初の100事例のみで評価
# pred_answers, gold_answers, accuracy = evaluate(
#     model, dataset["validation"].take(100)
# )

print(f"正解率: {accuracy:.1%}")

In [None]:
# モデルが予測した答えを表示
for pred_answer, gold_answer in zip(
    pred_answers[:20], gold_answers[:20]
):
    print(f"正解: {gold_answer} / 予測: {pred_answer}")

#### 指示チューニングの準備

In [None]:
# 訓練セットのすべての事例にチャットテンプレートを適用
tokenized_train_dataset = [
    tokenizer.apply_chat_template(example["messages"])
    for example in dataset["train"]
]

In [None]:
from trl import DataCollatorForCompletionOnlyLM

# collate関数を初期化
bos = tokenizer.bos_token
collator = DataCollatorForCompletionOnlyLM(
    # ユーザとアシスタントそれぞれの発話開始文字列
    instruction_template=bos + "ユーザ：",
    response_template=bos + "アシスタント：",
    tokenizer=tokenizer,  # トークナイザ
)

In [None]:
from peft import LoraConfig, TaskType, get_peft_model

# LoRAの設定
peft_config = LoraConfig(
    r=128,  # 差分行列のランク
    lora_alpha=128,  # LoRA層の出力のスケールを調整するハイパーパラメータ
    lora_dropout=0.05,  # LoRA層に適用するドロップアウト
    task_type=TaskType.CAUSAL_LM,  # LLMが解くタスクのタイプを指定
    # LoRAで学習するモジュール
    target_modules=[
        "q_proj",
        "k_proj",
        "v_proj",
        "o_proj",
        "gate_proj",
        "up_proj",
        "down_proj",
    ],
)

model.enable_input_require_grads()  # 学習を行うために必要
model = get_peft_model(model, peft_config)  # モデルにLoRAを適用
model.print_trainable_parameters()  # 学習可能なパラメータ数を表示

#### 指示チューニングの実行

In [None]:
from transformers import Trainer, TrainingArguments

# 訓練のハイパーパラメータを設定
training_args = TrainingArguments(
    output_dir="./drive/MyDrive/llm_book/RAG_IT_results",  # 結果の保存フォルダ
    bf16=True,  # BF16を使用した学習の有効化
    max_steps=100,  # 訓練ステップ数
    per_device_train_batch_size=2,  # 訓練時のバッチサイズ
    gradient_accumulation_steps=8,  # 勾配累積のステップ数（5.5.2節）
    gradient_checkpointing=True,  # 勾配チェックポインティングの有効化（5.5.3節）
    optim="paged_adamw_8bit",  # 最適化器
    learning_rate=1e-4,  # 学習率
    lr_scheduler_type="cosine",  # 学習率スケジューラの種類
    max_grad_norm=0.3,  # 勾配クリッピングにおけるノルムの最大値（9.4.3節）
    warmup_ratio=0.1,  # 学習率のウォームアップの長さ（5.2.8節）
    logging_steps=10,  # ロギングの頻度
    save_steps=50,  # モデルの保存頻度
    report_to="none",  # 外部ツールへのログを無効化
)

# 無料版のT4 GPUなど、低メモリ環境での学習パラメータ
# バッチサイズと勾配累積のステップ数を小さく設定
# training_args = TrainingArguments(
#     output_dir="./drive/MyDrive/llm_book/RAG_IT_results",  # 結果の保存フォルダ
#     bf16=True,  # BF16を使用した学習の有効化
#     max_steps=100,  # 訓練ステップ数
#     per_device_train_batch_size=1,  # 訓練時のバッチサイズ
#     gradient_accumulation_steps=4,  # 勾配累積のステップ数（5.5.2節）
#     gradient_checkpointing=True,  # 勾配チェックポインティングの有効化（5.5.3節）
#     optim="paged_adamw_8bit",  # 最適化器
#     learning_rate=1e-4,  # 学習率
#     lr_scheduler_type="cosine",  # 学習率スケジューラの種類
#     max_grad_norm=0.3,  # 勾配クリッピングにおけるノルムの最大値（9.4.3節）
#     warmup_ratio=0.1,  # 学習率のウォームアップの長さ（5.2.8節）
#     logging_steps=10,  # ロギングの頻度
#     save_steps=50,  # モデルの保存頻度
#     report_to="none",  # 外部ツールへのログを無効化
# )

# Trainerを初期化
trainer = Trainer(
    model,
    train_dataset=tokenized_train_dataset,  # トークンID化されたデータセット
    data_collator=collator,  # ラベルの加工及びミニバッチ構築処理を行うモジュール
    args=training_args,  # 訓練の設定
    tokenizer=tokenizer,  # パラメータ保存時にトークナイザも一緒に保存するために指定
)

# モデルの訓練を実行
trainer.train()

#### 指示チューニング後のモデルの評価

In [None]:
# 指示チューニング後のモデルを使って評価
pred_answers, gold_answers, accuracy = evaluate(
    model, dataset["validation"]
)

# 無料版ColabのT4 GPUなどでは評価に時間を要するため、最初の100事例のみで評価
# pred_answers, gold_answers, accuracy = evaluate(
#     model, dataset["validation"].take(100)
# )

print(f"正解率: {accuracy:.1%}")

In [None]:
# モデルが予測した解答を表示
for pred_answer, gold_answer in zip(
    pred_answers[:20], gold_answers[:20]
):
    print(f"正解: {gold_answer} / 予測: {pred_answer}")

#### モデルの保存

In [None]:
from huggingface_hub import notebook_login

# Hugging Face Hubにログイン
notebook_login()

In [None]:
# 無料版Colab（T4 GPU）の場合はRAMの制限で量子化前のモデルを読み込めない場合があります
# その場合は以下のコードで、学習後のLoRAパラメータのみをアップロードすることが可能です
# model.push_to_hub("singletongue/Swallow-7b-hf-oasst1-21k-ja-aio-retriever")

In [None]:
from peft import PeftModel

# 学習したLoRAのパラメータを量子化していない学習前のモデルに足し合わせる
base_model = AutoModelForCausalLM.from_pretrained(
    base_model_name,
    torch_dtype=torch.bfloat16,
)
checkpoint_path = "./drive/MyDrive/llm_book/RAG_IT_results/checkpoint-100"
tuned_model = PeftModel.from_pretrained(base_model, checkpoint_path)

# LoRAのパラメータのみをアップロードする場合は次の行をコメントアウト
tuned_model = tuned_model.merge_and_unload()

# Hugging Face Hubのリポジトリ名を指定
# "YOUR-ACCOUNT"は自らのユーザ名に置き換えてください
repo_name = "YOUR-ACCOUNT/Swallow-7b-hf-oasst1-21k-ja-aio-retriever"

# トークナイザをアップロード
tokenizer.push_to_hub(repo_name)
# モデルをアップロード
tuned_model.push_to_hub(repo_name)