# Hands-on: TRL + Unsloth QLoRA training

このノートブックは `scripts/train_lora.py` を**ブロック単位で理解しながら実行**できるように分解したものです。
重い処理があるので、各セルの説明を読んでから実行してください。

In [1]:
import sys
print(sys.executable)

/home/junta_takahashi/matsuo_llm2025_mainCompe/.maLM25main/bin/python


In [1]:
import torch
print("cuda available:", torch.cuda.is_available())
print("cuda device count:", torch.cuda.device_count())

cuda available: True
cuda device count: 1


In [19]:
# 0. 依存の読み込みimport datetime as dt
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import torch
import yaml
from datasets import load_dataset
from transformers import EarlyStoppingCallback, TrainingArguments, TrainerCallback
from trl import SFTTrainer
from unsloth import FastLanguageModel

MASK_TAGS = ["Output:", "OUTPUT:", "Final:", "Answer:", "Result:", "Response:"]

In [20]:
# 0.5 プロジェクトルートを解決（notebooks配下から実行する前提）
# 目的: notebooks から実行しても正しいパスを参照できるようにする
from pathlib import Path

cwd = Path.cwd()
# configs/ が存在する階層を上に探す
PROJECT_ROOT = next(p for p in [cwd] + list(cwd.parents) if (p / "configs" / "train_lora.yaml").exists())
print("PROJECT_ROOT:", PROJECT_ROOT)


PROJECT_ROOT: /home/junta_takahashi/matsuo_llm2025_mainCompe


In [21]:
# 1. 設定読み込み
import datetime as dt
# 目的: configs/train_lora.yaml から設定を読み、以降のセルで使います。
# 設定の内容:
# - data: データパス・分割比率・乱数seed
# - training: 学習ハイパーパラメータ（batch/epoch/max_length 等）
# - lora: LoRAのrank/alpha/対象層
# - model: ベースモデル名
# ここで run_name / 出力ディレクトリ が決まります。
cfg = yaml.safe_load((PROJECT_ROOT / "configs" / "train_lora.yaml").read_text(encoding="utf-8"))
data_cfg = cfg["data"]
train_cfg = cfg["training"]
lora_cfg = cfg["lora"]
model_cfg = cfg["model"]

data_path = data_cfg["data_path"]
data_path = str((PROJECT_ROOT / data_path).resolve())
eval_ratio = float(data_cfg["eval_ratio"])
seed = int(data_cfg["seed"])
base_model = model_cfg["base_model"]
max_length = int(train_cfg["max_length"])

output_dir = str(PROJECT_ROOT / train_cfg["output_dir"] / dt.datetime.now().strftime("%Y%m%d-%H%M%S"))
run_name = Path(output_dir).name

print("base_model:", base_model)
print("data_path:", data_path)
print("output_dir:", output_dir)

base_model: Qwen/Qwen3-4B-Instruct-2507
data_path: /home/junta_takahashi/matsuo_llm2025_mainCompe/data/processed/merged_train.jsonl
output_dir: /home/junta_takahashi/matsuo_llm2025_mainCompe/outputs/models/20260205-011456


In [22]:
# 2. データ読み込み + 分割
# 目的: merged_train.jsonl を読み込み、train/eval に分割します。
# 注意: この時点では messages を保持しておき、後で生成ログに使います。
# 出力: ds_raw (train/test) が作成されます。
# ここでは元データ (messages付き) を保持します。
ds_raw = load_dataset("json", data_files=data_path, split="train")
ds_raw = ds_raw.train_test_split(test_size=eval_ratio, seed=seed)
print("train size:", len(ds_raw["train"]))
print("eval size:", len(ds_raw["test"]))

Generating train split: 0 examples [00:00, ? examples/s]

DatasetGenerationError: An error occurred while generating the dataset

In [None]:
# 3. モデル・トークナイザ初期化（Unsloth）
# 目的: Unslothで4bit量子化のモデルを読み込みます。
# ここでGPUに載るため、VRAM消費が大きいセルです。
# max_length が長いとVRAM使用量が増える点に注意。
# VRAMに厳しい場合は max_length を下げてください。
model, tokenizer = FastLanguageModel.from_pretrained(
    model_name=base_model,
    max_seq_length=max_length,
    dtype=None,
    load_in_4bit=True,
)
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

print("pad_token:", tokenizer.pad_token)

In [None]:
# 4. メッセージをテキストに変換する関数
# 目的: messages -> ChatML風テキストに変換する関数を定義します。
# - messages_to_text: 学習用（全文）
# - build_prompt_from_messages: 生成用（assistant直前まで）
# - extract_assistant_ref: 参照回答を取り出す
# 学習用 (全文) と生成用 (assistant直前まで) を分けています。
def messages_to_text(messages: List[dict]) -> str:
    parts = []
    for m in messages:
        if not isinstance(m, dict):
            continue
        role = m.get("role", "user")
        content = m.get("content", "")
        parts.append(f"<|{role}|>\n{content}")
    return "\n".join(parts)

def build_prompt_from_messages(messages: List[dict]) -> str:
    parts = []
    for m in messages:
        if not isinstance(m, dict):
            continue
        role = m.get("role", "user")
        content = m.get("content", "")
        if role == "assistant":
            break
        parts.append(f"<|{role}|>\n{content}")
    parts.append("<|assistant|>\n")
    return "\n".join(parts)

def extract_assistant_ref(messages: List[dict]) -> str:
    for m in messages:
        if isinstance(m, dict) and m.get("role") == "assistant":
            return str(m.get("content", ""))
    return ""

In [None]:
# 5. テキスト化 + カラム整形
# 目的: 学習用の text カラムだけを作り、Trainerに渡せる形にします。
# これ以降の学習は text カラムのみを使います。
# ここから先は text を使って学習します。
def format_fn(example: Dict[str, object]) -> Dict[str, str]:
    messages = example.get("messages", [])
    return {"text": messages_to_text(messages)}

ds = ds_raw.map(format_fn, remove_columns=ds_raw["train"].column_names)
print(ds)

In [None]:
# 6. Outputタグ出現率
# 目的: Output系タグがどの程度含まれているか確認します。
# ここで値が低すぎると loss マスクが効きにくい可能性があります。
def has_tag(text: str) -> bool:
    return any(tag in text for tag in MASK_TAGS)

tag_hits = sum(1 for t in ds["train"]["text"] if has_tag(t))
print(f"Tag hit ratio (train): {tag_hits}/{len(ds['train'])}")

In [None]:
# 7. トークン長分布の保存
# 目的: token長分布を保存し、max_lengthの妥当性を確認します。
# 併せて run_meta.yaml に学習条件を記録します。
# run_meta.yaml / length_stats.yaml が outputs に保存されます。
def percentile(values: List[int], p: float) -> int:
    if not values:
        return 0
    idx = int(round((p / 100.0) * (len(values) - 1)))
    return values[idx]

def compute_length_stats(texts: List[str], tokenizer, max_length: int) -> Dict[str, int]:
    lengths: List[int] = []
    for t in texts:
        ids = tokenizer(t, add_special_tokens=False, truncation=True, max_length=max_length)["input_ids"]
        lengths.append(len(ids))
    lengths.sort()
    return {
        "samples": len(lengths),
        "p50": percentile(lengths, 50),
        "p75": percentile(lengths, 75),
        "p90": percentile(lengths, 90),
        "p95": percentile(lengths, 95),
        "p97": percentile(lengths, 97),
        "p98": percentile(lengths, 98),
        "p99": percentile(lengths, 99),
        "p99_5": percentile(lengths, 99.5),
        "max": lengths[-1] if lengths else 0,
    }

def save_yaml(path: Path, payload: Dict[str, object]) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(yaml.safe_dump(payload, allow_unicode=True), encoding="utf-8")

run_meta = {
    "base_model": base_model,
    "data_path": data_path,
    "eval_ratio": eval_ratio,
    "seed": seed,
    "max_length": max_length,
    "batch_size": int(train_cfg["batch_size"]),
    "grad_accum": int(train_cfg["grad_accum"]),
    "learning_rate": float(train_cfg["learning_rate"]),
    "epochs": float(train_cfg["epochs"]),
    "precision": str(train_cfg["precision"]),
    "packing": bool(train_cfg["packing"]),
    "lora_r": int(lora_cfg["r"]),
    "lora_alpha": int(lora_cfg["alpha"]),
    "lora_target_modules": list(lora_cfg["target_modules"]),
    "quantization": {
        "load_in_4bit": True,
        "bnb_4bit_quant_type": "nf4",
        "bnb_4bit_use_double_quant": True,
        "bnb_4bit_compute_dtype": "bf16",
    },
    "mask_tags": MASK_TAGS,
    "tag_hit_train": tag_hits,
    "train_size": len(ds["train"]),
    "eval_size": len(ds["test"]),
}

length_stats = compute_length_stats(ds["train"]["text"], tokenizer, max_length=max_length)
save_yaml(Path(output_dir) / "run_meta.yaml", run_meta)
save_yaml(Path(output_dir) / "length_stats.yaml", length_stats)

print("Saved run_meta.yaml and length_stats.yaml")

In [None]:
# 8. LoRA 設定 + gradient checkpointing（Unsloth）
# 目的: LoRAを有効化し、Unslothのgradient checkpointingでVRAM節約します。
# ここで LoRA のランク(r)やalphaが反映されます。
model = FastLanguageModel.get_peft_model(
    model,
    r=int(lora_cfg["r"]),
    lora_alpha=int(lora_cfg["alpha"]),
    target_modules=list(lora_cfg["target_modules"]),
    lora_dropout=0.05,
    bias="none",
    use_gradient_checkpointing="unsloth",
    random_state=seed,
    use_rslora=False,
    loftq_config=None,
)
print("LoRA ready")

In [None]:
# 9. labels を Outputタグ以降に限定する collator
# 目的: Outputタグ以降のみ loss を計算するためのcollatorを作ります。
# - タグが無い場合は全文をloss対象にします。
def find_first_tag_pos(text: str, tags: List[str]) -> Optional[Tuple[int, str]]:
    found = []
    for tag in tags:
        idx = text.find(tag)
        if idx >= 0:
            found.append((idx, tag))
    if not found:
        return None
    found.sort(key=lambda x: x[0])
    return found[0]

@dataclass
class CollatorConfig:
    tokenizer: object
    max_length: int
    tags: List[str]

class OutputTagCollator:
    def __init__(self, cfg: CollatorConfig) -> None:
        self.cfg = cfg

    def __call__(self, features: List[Dict[str, object]]) -> Dict[str, torch.Tensor]:
        texts = [f["text"] for f in features]
        tok = self.cfg.tokenizer(
            texts,
            padding=True,
            truncation=True,
            max_length=self.cfg.max_length,
            return_tensors="pt",
        )
        input_ids = tok["input_ids"]
        labels = input_ids.clone()
        for i, text in enumerate(texts):
            hit = find_first_tag_pos(text, self.cfg.tags)
            if hit is None:
                continue
            tag_pos, tag = hit
            prefix = text[: tag_pos + len(tag)]
            prefix_ids = self.cfg.tokenizer(
                prefix, truncation=True, max_length=self.cfg.max_length, add_special_tokens=False
            )["input_ids"]
            cut = len(prefix_ids)
            labels[i, :cut] = -100
        tok["labels"] = labels
        return tok

collator = OutputTagCollator(
    CollatorConfig(tokenizer=tokenizer, max_length=max_length, tags=MASK_TAGS)
)
print("collator ready")

In [None]:
# 10. エポック終了時のサンプル生成ログ
# 目的: 各エポック終了時に固定サンプルを生成し、ログを保存します。
# - 人間が見て改善点を判断できる出力を残します。
# 人間が見て改善点を判断できるログになります。
class EvalSampleCallback(TrainerCallback):
    def __init__(
        self,
        dataset,
        tokenizer,
        output_dir: str,
        sample_size: int,
        max_new_tokens: int,
        temperature: float,
        top_p: float,
        seed: int,
    ) -> None:
        self.dataset = dataset
        self.tokenizer = tokenizer
        self.output_dir = Path(output_dir)
        self.sample_size = sample_size
        self.max_new_tokens = max_new_tokens
        self.temperature = temperature
        self.top_p = top_p
        rng = torch.Generator().manual_seed(seed)
        n = len(dataset)
        if n == 0:
            self.indices = []
        else:
            perm = torch.randperm(n, generator=rng).tolist()
            self.indices = perm[: min(sample_size, n)]

    def on_epoch_end(self, args, state, control, **kwargs):
        model = kwargs["model"]
        model_was_training = model.training
        model.eval()

        results = []
        device = next(model.parameters()).device
        for idx in self.indices:
            ex = self.dataset[idx]
            messages = ex.get("messages", [])
            prompt = build_prompt_from_messages(messages)
            ref = extract_assistant_ref(messages)

            inputs = self.tokenizer(
                prompt, return_tensors="pt", add_special_tokens=False
            ).to(device)
            with torch.no_grad():
                out = model.generate(
                    **inputs,
                    max_new_tokens=self.max_new_tokens,
                    do_sample=True,
                    temperature=self.temperature,
                    top_p=self.top_p,
                )
            text = self.tokenizer.decode(out[0], skip_special_tokens=False)
            results.append(
                {
                    "epoch": float(state.epoch),
                    "index": idx,
                    "prompt": prompt,
                    "prediction": text,
                    "reference": ref,
                }
            )

        out_path = self.output_dir / f"eval_samples_epoch{int(state.epoch)}.jsonl"
        out_path.parent.mkdir(parents=True, exist_ok=True)
        with out_path.open("w", encoding="utf-8") as f:
            for r in results:
                f.write(json.dumps(r, ensure_ascii=False) + "\n")

        if model_was_training:
            model.train()

print("EvalSampleCallback ready")

In [None]:
# 11. TrainingArguments
# 目的: 学習ループの詳細設定を行います。
# - eval_steps / save_steps / precision など
# - report_to="wandb" でW&Bにログ送信
# WandB の run_name もここで渡します。
args = TrainingArguments(
    output_dir=output_dir,
    num_train_epochs=float(train_cfg["epochs"]),
    per_device_train_batch_size=int(train_cfg["batch_size"]),
    per_device_eval_batch_size=int(train_cfg["batch_size"]),
    gradient_accumulation_steps=int(train_cfg["grad_accum"]),
    learning_rate=float(train_cfg["learning_rate"]),
    save_steps=int(train_cfg["save_steps"]),
    eval_steps=int(train_cfg["eval_steps"]),
    evaluation_strategy="steps",
    logging_steps=50,
    bf16=str(train_cfg["precision"]).lower() == "bf16",
    fp16=str(train_cfg["precision"]).lower() == "fp16",
    report_to="wandb",
    run_name=run_name,
    save_total_limit=2,
    load_best_model_at_end=True,
    metric_for_best_model="eval_loss",
    greater_is_better=False,
)
print("TrainingArguments ready")

In [None]:
# 12. Trainer 作成
# 目的: 本番学習用のTrainerを作成します。
trainer = SFTTrainer(
    model=model,
    tokenizer=tokenizer,
    train_dataset=ds["train"],
    eval_dataset=ds["test"],
    formatting_func=lambda x: x["text"],
    data_collator=collator,
    args=args,
    packing=bool(train_cfg["packing"]),
    max_seq_length=max_length,
    dataset_text_field="text",
)

trainer.add_callback(
    EarlyStoppingCallback(
        early_stopping_patience=int(train_cfg["early_stopping_patience"]),
        early_stopping_threshold=float(train_cfg["early_stopping_threshold"]),
    )
)
trainer.add_callback(
    EvalSampleCallback(
        dataset=ds_raw["test"],
        tokenizer=tokenizer,
        output_dir=output_dir,
        sample_size=int(train_cfg["eval_sample_size"]),
        max_new_tokens=int(train_cfg["eval_max_new_tokens"]),
        temperature=float(train_cfg["eval_temperature"]),
        top_p=float(train_cfg["eval_top_p"]),
        seed=seed,
    )
)
print("Trainer ready")

In [None]:
# 13. 学習実行
# 目的: 本番学習を実行します（時間がかかります）。
trainer.train()

In [None]:
# 14. 保存
# 目的: LoRAアダプタとtokenizerを保存します。
trainer.save_model(output_dir)
tokenizer.save_pretrained(output_dir)
print("Saved model to:", output_dir)