# Chapter 16: MoE (Mixture-of-Experts) Fine-tuning

## 1. 학습 목표

* MoE 아키텍처의 작동 원리(Router와 Expert)를 이해한다.
* **Router 붕괴(Router Collapse)** 문제를 이해하고 이를 방지하는 전략을 익힌다.
* Router를 동결(Freeze)하고 Expert만 학습하는 LoRA 설정을 구현한다.
* Mixtral 8x7B 등 대형 MoE 모델을 위한 최적의 하이퍼파라미터를 설정한다.

## 2. MoE (Mixture-of-Experts) 아키텍처란?

### 2.1 개념

MoE는 모든 입력에 대해 모델의 전체 파라미터를 사용하는 Dense 모델과 달리, **조건부 연산(Conditional Computation)**을 수행한다. 입력 토큰에 따라 **일부 전문가(Expert)만 활성화**하여, 파라미터 수는 매우 많지만 실제 연산량(FLOPs)은 적게 유지하는 효율적인 구조다.

```text
구조: [Input] -> [Router (Gating)] -> [Top-K Experts] -> [Weighted Sum] -> [Output]

예시 (Mixtral 8x7B):
- 총 파라미터: 47B (470억 개)
- 활성 파라미터: 13B (토큰당 2개의 Expert만 사용)
-> 47B 모델의 지식을 가지고 있지만, 추론 속도는 13B 모델만큼 빠르다.

```

### 2.2 주요 MoE 모델

* **Mixtral 8x7B**: 오픈소스 MoE의 표준. 8개의 Expert 중 2개를 사용(Top-2).
* **DeepSeek-MoE**: 64개 이상의 잘게 쪼개진(Fine-grained) Expert 사용.
* **Qwen2-MoE**: 고성능 경량 MoE 모델.

## 3. MoE Fine-tuning의 핵심 난관: Router 붕괴

MoE 모델을 일반적인 방법(높은 학습률, 모든 파라미터 학습)으로 튜닝하면 **Router 붕괴(Router Collapse)** 현상이 발생하기 쉽다.
이는 Router가 특정 Expert(예: Expert 1)만 계속 선택하게 되어, 나머지 Expert들이 놀게 되고 모델 성능이 급격히 저하되는 현상이다.

### MoE 튜닝 2대 원칙

1. **Router 동결 (Freeze)**: Router가 이미 학습한 분배 능력을 유지하도록 가중치를 업데이트하지 않는다.
2. **낮은 학습률 (Low LR)**: 일반 모델(2e-4)보다 훨씬 낮은 **1e-5 ~ 5e-6** 수준의 학습률을 사용해야 한다.

## 4. 도메인 데이터셋 준비

각 분야별로 대표적인 오픈소스 데이터셋들이 존재한다. 이번 실습에서는 금융 분야의 `gbharti/finance-alpaca`를 사용한다.

In [4]:
from datasets import load_dataset

# 도메인별 대표 데이터셋 (참고용)
domain_datasets = {
    "금융": "gbharti/finance-alpaca",
    "의료": "medalpaca/medical_meadow_medical_flashcards",
    "코딩": "TokenBender/code_instructions_122k_alpaca_style"
}

# 1. 금융 데이터셋 로드
print("금융 데이터셋 로딩 중...")
dataset = load_dataset("gbharti/finance-alpaca", split="train[:1000]")

print(f"데이터 개수: {len(dataset)}")
print(f"샘플 데이터:\n{dataset[0]}")

금융 데이터셋 로딩 중...
데이터 개수: 1000
샘플 데이터:
{'instruction': 'For a car, what scams can be plotted with 0% financing vs rebate?', 'input': '', 'output': "The car deal makes money 3 ways. If you pay in one lump payment. If the payment is greater than what they paid for the car, plus their expenses, they make a profit. They loan you the money. You make payments over months or years, if the total amount you pay is greater than what they paid for the car, plus their expenses, plus their finance expenses they make money. Of course the money takes years to come in, or they sell your loan to another business to get the money faster but in a smaller amount. You trade in a car and they sell it at a profit. Of course that new transaction could be a lump sum or a loan on the used car... They or course make money if you bring the car back for maintenance, or you buy lots of expensive dealer options. Some dealers wave two deals in front of you: get a 0% interest loan. These tend to be shorter 12 months vs 

## 5. 시스템 프롬프트 설계 (페르소나 주입)

도메인 적응의 핵심은 모델에게 **"당신은 전문가입니다"**라고 최면을 거는 것이다. 이를 위해 데이터 포맷팅 단계에서 시스템 프롬프트를 추가한다.

In [5]:
def format_domain_instruction(example, domain="금융"):
    """
    도메인별 시스템 프롬프트를 적용하여 데이터를 포맷팅하는 함수다.
    """
    # 도메인별 시스템 프롬프트 정의
    system_prompts = {
        "금융": "당신은 금융 전문가 AI 어시스턴트이다. 질문에 대해 전문적이고 분석적인 답변을 제공하라.",
        "의료": "당신은 의료 정보 AI 어시스턴트이다. 의학적 사실에 기반하여 답변하되, 반드시 전문가와의 상담을 권유하라.",
        "법률": "당신은 법률 전문가 AI 어시스턴트이다. 관련 법령과 판례에 기반하여 답변하되, 법적 효력이 없음을 명시하라."
    }

    instruction = example['instruction']
    input_text = example.get('input', '')
    output = example['output']

    # 해당 도메인의 시스템 프롬프트 선택
    sys_prompt = system_prompts.get(domain, "당신은 유용한 AI 어시스턴트이다.")

    # 프롬프트 구성
    if input_text:
        text = f"""### System:
{sys_prompt}

### Instruction:
{instruction}

### Input:
{input_text}

### Response:
{output}"""
    else:
        text = f"""### System:
{sys_prompt}

### Instruction:
{instruction}

### Response:
{output}"""

    return {"text": text}

# 금융 도메인으로 포맷팅 적용
formatted_dataset = dataset.map(lambda x: format_domain_instruction(x, "금융"))

print("\n[포맷팅된 데이터 샘플]")
print(formatted_dataset[0]['text'])


[포맷팅된 데이터 샘플]
### System:
당신은 금융 전문가 AI 어시스턴트이다. 질문에 대해 전문적이고 분석적인 답변을 제공하라.

### Instruction:
For a car, what scams can be plotted with 0% financing vs rebate?

### Response:
The car deal makes money 3 ways. If you pay in one lump payment. If the payment is greater than what they paid for the car, plus their expenses, they make a profit. They loan you the money. You make payments over months or years, if the total amount you pay is greater than what they paid for the car, plus their expenses, plus their finance expenses they make money. Of course the money takes years to come in, or they sell your loan to another business to get the money faster but in a smaller amount. You trade in a car and they sell it at a profit. Of course that new transaction could be a lump sum or a loan on the used car... They or course make money if you bring the car back for maintenance, or you buy lots of expensive dealer options. Some dealers wave two deals in front of you: get a 0% interest loan. These t

## 6. MoE 모델 로드 및 구조 분석

실습을 위해 `Qwen/Qwen1.5-MoE-A2.7B`와 같은 작은 MoE 모델이나 `mistralai/Mixtral-8x7B-v0.1`을 로드한다.

In [2]:
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig

# 1. 4-bit 양자화 설정 (MoE는 모델이 크므로 필수)
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.bfloat16,
    bnb_4bit_use_double_quant=True,
)

# 2. MoE 모델 로드 (예: Mixtral-8x7B)
model_name = "mistralai/Mixtral-8x7B-v0.1"

print(f"MoE 모델 로딩: {model_name}")
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    quantization_config=bnb_config,
    device_map="auto",
    trust_remote_code=True
)
tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer.pad_token = tokenizer.eos_token

# 3. 모델 구조 확인 (Router 모듈 이름 찾기)
print("모델 모듈 이름 확인:")
for name, module in model.named_modules():
    if "gate" in name or "router" in name:
        print(f"Router 발견: {name}")
        break
# Mixtral의 경우 보통 'block_sparse_moe.gate' 등의 이름을 가짐

MoE 모델 로딩: mistralai/Mixtral-8x7B-v0.1


Fetching 19 files:   0%|          | 0/19 [00:00<?, ?it/s]

model-00018-of-00019.safetensors:   0%|          | 0.00/4.98G [00:00<?, ?B/s]

model-00013-of-00019.safetensors:   0%|          | 0.00/4.98G [00:00<?, ?B/s]

model-00017-of-00019.safetensors:   0%|          | 0.00/4.90G [00:00<?, ?B/s]

model-00016-of-00019.safetensors:   0%|          | 0.00/4.98G [00:00<?, ?B/s]

model-00019-of-00019.safetensors:   0%|          | 0.00/4.22G [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/19 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/967 [00:00<?, ?B/s]

tokenizer.model:   0%|          | 0.00/493k [00:00<?, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/72.0 [00:00<?, ?B/s]

모델 모듈 이름 확인:
Router 발견: model.layers.0.block_sparse_moe.gate


## 7. Router Freezing을 위한 LoRA 설정

LoRA 설정 시 `modules_to_save`나 타겟 모듈 설정을 통해 Router 학습을 방지하거나 명시적으로 제외해야 한다. 가장 안전한 방법은 Router를 LoRA 타겟에서 제외하고, 원본 파라미터도 동결하는 것이다.

In [3]:
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training

# 학습 전처리
model = prepare_model_for_kbit_training(model)

def get_moe_lora_config(model_type="mixtral"):
    """
    MoE 모델용 LoRA 설정. Router는 학습하지 않도록 설정한다.
    """
    # 모델별 타겟 모듈
    if model_type == "mixtral":
        target_modules = ["q_proj", "k_proj", "v_proj", "o_proj"] # Attention만 학습 (안전)
        # 전문가 네트워크(w1, w2, w3)까지 학습하려면 추가 가능하지만 VRAM 소모 큼

    config = LoraConfig(
        r=8,                           # MoE는 Rank를 낮게 잡아도 충분함
        lora_alpha=16,
        target_modules=target_modules,
        modules_to_save=None,          # Router(gate)를 여기에 넣지 않음 -> 동결 유지
        lora_dropout=0.05,
        bias="none",
        task_type="CAUSAL_LM"
    )
    return config

lora_config = get_moe_lora_config("mixtral")
model = get_peft_model(model, lora_config)

# 안전장치: Router 파라미터가 확실히 동결되었는지 수동 확인 및 조치
for name, param in model.named_parameters():
    if "gate" in name or "router" in name:
        param.requires_grad = False  # 강제 동결

model.print_trainable_parameters()

trainable params: 6,815,744 || all params: 46,709,608,448 || trainable%: 0.0146


## 6. Expert 모니터 + 자동 대응 콜백

학습이 잘 되고 있는지 확인하려면 Loss뿐만 아니라 **Expert가 골고루 사용되고 있는지** 확인해야 한다. 특정 Expert만 사용된다면(Load Balancing 깨짐) 학습을 중단하고 LR을 낮춰야 한다.

TRL SFTTrainer 학습 시 “Expert 분포 모니터링(W&B 로그)”을 그대로 붙혀서 매 every_n_steps마다 진단 프롬프트를 흘려 top1_share / entropy_norm / KL 등을 기록하는 Class를 삽입한다.


TRL SFTTrainer 학습 중 Expert 쏠림(load imbalance)을 감지하면
- LR을 자동으로 낮추고(예: ×0.5)
- 쏠림이 일정 횟수 연속으로 지속되면 학습을 자동 중단(early stop)

In [8]:
import math
from dataclasses import dataclass
from collections import defaultdict
from typing import List, Dict, Any, Optional

import torch
from transformers import TrainerCallback

try:
    import torch.distributed as dist
except Exception:
    dist = None

try:
    import wandb
except Exception:
    wandb = None


def _is_rank0() -> bool:
    if dist is None or not dist.is_available() or not dist.is_initialized():
        return True
    return dist.get_rank() == 0


def _find_router_modules(model, keywords=("moe.gate", ".gate", "router", "gate")):
    hits = []
    for name, module in model.named_modules():
        lname = name.lower()
        if any(k in lname for k in keywords) and isinstance(module, torch.nn.Linear):
            hits.append((name, module))
    return hits


@torch.no_grad()
def monitor_expert_usage(
    model,
    tokenizer,
    input_texts: List[str],
    *,
    top_k: int = 2,
    max_length: int = 256,
    keywords=("moe.gate", ".gate", "router", "gate"),
) -> Dict[str, Dict[str, Any]]:
    model.eval()
    device = next(model.parameters()).device

    router_modules = _find_router_modules(model, keywords=keywords)
    if not router_modules:
        raise RuntimeError("라우터/게이트 모듈을 찾지 못했다. keywords를 조정하라.")

    stats = {
        "counts": defaultdict(lambda: None),
        "entropy_sum": defaultdict(float),
        "calls": defaultdict(int),
    }

    hooks = []

    def make_hook(name):
        def hook_fn(module, inputs, output):
            logits = output
            if not torch.is_tensor(logits) or logits.ndim < 2:
                return

            n = logits.shape[-1]
            flat = logits.reshape(-1, n).float()
            probs = torch.softmax(flat, dim=-1)

            ent = -(probs * probs.clamp_min(1e-12).log()).sum(dim=-1)
            stats["entropy_sum"][name] += ent.mean().item()
            stats["calls"][name] += 1

            k = min(top_k, n)
            top_idx = probs.topk(k=k, dim=-1).indices  # [tokens, k]

            counts = stats["counts"][name]
            if counts is None or counts.numel() != n:
                counts = torch.zeros(n, dtype=torch.long)
                stats["counts"][name] = counts

            for i in range(k):
                idx = top_idx[:, i].detach().cpu()
                counts.index_add_(0, idx, torch.ones_like(idx, dtype=torch.long))
        return hook_fn

    for name, mod in router_modules:
        hooks.append(mod.register_forward_hook(make_hook(name)))

    batch = tokenizer(
        input_texts,
        return_tensors="pt",
        padding=True,
        truncation=True,
        max_length=max_length,
    ).to(device)

    _ = model(**batch)

    for h in hooks:
        h.remove()

    results = {}
    for name, _ in router_modules:
        counts = stats["counts"][name]
        if counts is None or counts.sum().item() == 0:
            continue

        total = counts.sum().item()
        p = (counts.float() / total)
        n = p.numel()

        top1_share = p.max().item()
        entropy = -(p * p.clamp_min(1e-12).log()).sum().item()
        entropy_norm = entropy / math.log(n)

        uniform = torch.full_like(p, 1.0 / n)
        kl_to_uniform = (p * (p.clamp_min(1e-12).log() - uniform.log())).sum().item()

        avg_token_entropy = stats["entropy_sum"][name] / max(stats["calls"][name], 1)

        results[name] = {
            "num_experts": int(n),
            "top1_share": float(top1_share),
            "entropy_norm": float(entropy_norm),
            "kl_to_uniform": float(kl_to_uniform),
            "avg_token_entropy": float(avg_token_entropy),
            "counts": counts.tolist(),
        }

    return results


@dataclass
class ExpertAutoControlConfig:
    sample_texts: List[str]
    every_n_steps: int = 50

    # 모니터링 파라미터
    top_k: int = 2
    max_length: int = 256
    keywords: tuple = ("moe.gate", ".gate", "router", "gate")
    log_prefix: str = "moe_expert"

    # 쏠림 판정 기준(원하면 조절)
    top1_share_threshold: float = 0.70
    entropy_norm_threshold: float = 0.40

    # 자동 LR 감소
    enable_lr_drop: bool = True
    lr_drop_factor: float = 0.5      # lr *= 0.5
    min_lr: float = 1e-7
    cooldown_steps: int = 200        # 한번 LR 내린 후 N step 동안은 다시 안 내림

    # 자동 중단(연속 N회 쏠림이면 stop)
    enable_early_stop: bool = True
    patience_alerts: int = 3         # 연속 3회 alert면 stop


class ExpertAutoControlCallback(TrainerCallback):
    def __init__(self, tokenizer, cfg: ExpertAutoControlConfig):
        self.tokenizer = tokenizer
        self.cfg = cfg
        self._consecutive_alerts = 0
        self._last_lr_drop_step = -10**9

    def _maybe_drop_lr(self, optimizer, step: int) -> Optional[float]:
        if optimizer is None:
            return None
        if (step - self._last_lr_drop_step) < self.cfg.cooldown_steps:
            return None

        # 모든 param_group에 동일하게 적용
        new_lrs = []
        for g in optimizer.param_groups:
            old = float(g.get("lr", 0.0))
            new = max(old * self.cfg.lr_drop_factor, self.cfg.min_lr)
            g["lr"] = new
            new_lrs.append(new)

        self._last_lr_drop_step = step
        return float(min(new_lrs)) if new_lrs else None

    def on_step_end(self, args, state, control, **kwargs):
        step = int(state.global_step)
        if step == 0 or (step % self.cfg.every_n_steps) != 0:
            return control
        if not _is_rank0():
            return control

        model = kwargs.get("model", None)
        optimizer = kwargs.get("optimizer", None)
        if model is None:
            return control

        metrics = monitor_expert_usage(
            model=model,
            tokenizer=self.tokenizer,
            input_texts=self.cfg.sample_texts,
            top_k=self.cfg.top_k,
            max_length=self.cfg.max_length,
            keywords=self.cfg.keywords,
        )

        # 모듈별 값을 평균으로 집계
        vals_top1, vals_ent, vals_kl, vals_tokent = [], [], [], []
        logs = {}

        for name, m in metrics.items():
            prefix = f"{self.cfg.log_prefix}/{name}"
            logs[f"{prefix}/top1_share"] = m["top1_share"]
            logs[f"{prefix}/entropy_norm"] = m["entropy_norm"]
            logs[f"{prefix}/kl_to_uniform"] = m["kl_to_uniform"]
            logs[f"{prefix}/avg_token_entropy"] = m["avg_token_entropy"]

            if wandb is not None and wandb.run is not None:
                logs[f"{prefix}/counts_hist"] = wandb.Histogram(m["counts"])

            vals_top1.append(m["top1_share"])
            vals_ent.append(m["entropy_norm"])
            vals_kl.append(m["kl_to_uniform"])
            vals_tokent.append(m["avg_token_entropy"])

        if not vals_top1:
            return control

        avg_top1 = float(sum(vals_top1) / len(vals_top1))
        avg_ent = float(sum(vals_ent) / len(vals_ent))
        avg_kl = float(sum(vals_kl) / len(vals_kl))
        avg_tokent = float(sum(vals_tokent) / len(vals_tokent))

        logs[f"{self.cfg.log_prefix}/avg_top1_share"] = avg_top1
        logs[f"{self.cfg.log_prefix}/avg_entropy_norm"] = avg_ent
        logs[f"{self.cfg.log_prefix}/avg_kl_to_uniform"] = avg_kl
        logs[f"{self.cfg.log_prefix}/avg_token_entropy"] = avg_tokent

        # 쏠림 판정
        alert = (avg_top1 >= self.cfg.top1_share_threshold) and (avg_ent <= self.cfg.entropy_norm_threshold)
        logs[f"{self.cfg.log_prefix}/alert_load_imbalance"] = int(alert)

        if alert:
            self._consecutive_alerts += 1
        else:
            self._consecutive_alerts = 0

        logs[f"{self.cfg.log_prefix}/consecutive_alerts"] = int(self._consecutive_alerts)

        # 자동 LR 감소
        if alert and self.cfg.enable_lr_drop:
            new_lr = self._maybe_drop_lr(optimizer, step)
            if new_lr is not None:
                logs[f"{self.cfg.log_prefix}/action_lr_dropped"] = 1
                logs[f"{self.cfg.log_prefix}/new_lr_min"] = float(new_lr)
            else:
                logs[f"{self.cfg.log_prefix}/action_lr_dropped"] = 0
        else:
            logs[f"{self.cfg.log_prefix}/action_lr_dropped"] = 0

        # 자동 중단
        if alert and self.cfg.enable_early_stop and self._consecutive_alerts >= self.cfg.patience_alerts:
            logs[f"{self.cfg.log_prefix}/action_early_stop"] = 1
            control.should_training_stop = True
        else:
            logs[f"{self.cfg.log_prefix}/action_early_stop"] = 0

        # 로깅
        if wandb is not None and wandb.run is not None:
            wandb.log(logs, step=step)
        else:
            short = {k: v for k, v in logs.items() if isinstance(v, (int, float))}
            print(f"[ExpertAutoControl step={step}] {short}")

        return control

## 7. MoE 최적화 학습 설정

MoE 학습 자동모니터링과 자동 대응 콜백을 적용했다.

In [9]:
from trl import SFTConfig, SFTTrainer

training_args = SFTConfig(
    output_dir="./MIXTRAL-MoE-Finetuned",
    learning_rate=1e-5,
    num_train_epochs=1,
    per_device_train_batch_size=1,
    gradient_accumulation_steps=16,
    optim="paged_adamw_8bit",
    bf16=True,
    logging_steps=10,
    dataset_text_field="text",
)

# W&B 초기화
import wandb
wandb.init(project="mixtral-moe", name="finance-sft")

diag_texts = [
    "금융 상품 설명을 3줄로 요약해줘.",
    "다음 문장에서 위험 요인을 찾아줘: '이 상품은 원금 손실이 발생할 수 있다.'",
    "금리 인상이 채권 가격에 미치는 영향을 간단히 설명해줘.",
    "아래 문장을 영어로 번역해줘: '해당 투자에는 손실 가능성이 있다.'",
]

auto_cfg = ExpertAutoControlConfig(
    sample_texts=diag_texts,
    every_n_steps=50,

    # 쏠림 기준(필요시 조절)
    top1_share_threshold=0.70,
    entropy_norm_threshold=0.40,

    # 자동 LR drop + 자동 stop
    enable_lr_drop=True,
    lr_drop_factor=0.5,
    cooldown_steps=200,
    min_lr=1e-7,

    enable_early_stop=True,
    patience_alerts=3,
)

expert_auto_cb = ExpertAutoControlCallback(tokenizer=tokenizer, cfg=auto_cfg)

trainer = SFTTrainer(
    model=model,
    train_dataset=formatted_dataset,
    args=training_args,
    processing_class=tokenizer,
    callbacks=[expert_auto_cb],   # callback 적용
)

print("금융 도메인 적응 학습 시작...")
trainer.train()
print("학습 완료!")

trainer.save_model("./GMIXTRAL-MoE-Finetuned-Final")

[34m[1mwandb[0m: [32m[41mERROR[0m The nbformat package was not found. It is required to save notebook history.


0,1
train/entropy,█▁▄
train/epoch,▁▅█
train/global_step,▁▅█
train/grad_norm,▁█▃
train/learning_rate,█▅▁
train/loss,█▅▁
train/mean_token_accuracy,▁▇█
train/num_tokens,▁▄█

0,1
train/entropy,2.01939
train/epoch,0.48
train/global_step,30.0
train/grad_norm,0.41797
train/learning_rate,1e-05
train/loss,2.1022
train/mean_token_accuracy,0.55681
train/num_tokens,154449.0


금융 도메인 적응 학습 시작...


Step,Training Loss
10,2.0898
20,2.0532
30,2.0313
40,2.0351
50,1.9855
60,2.0049


학습 완료!


## 8. 요약

이 챕터에서는 **MoE Fine-tuning**의 핵심 전략을 다루었다.

1. **구조적 이점**: MoE는 '조건부 연산'을 통해 거대 모델의 지식과 작은 모델의 속도를 동시에 제공한다.
2. **Router 보호**: 학습 중 Router가 망가지는 것을 막기 위해 **Router Freezing**과 **1e-5 이하의 낮은 학습률**이 필수적이다.
3. **메모리 관리**: 모델 크기가 크므로 4-bit QLoRA와 Paged Optimizer가 거의 강제된다.

---

다음 챕터는 **Chapter 17: Post-training 기법**으로, OSFT 기법을 다룰 예정이다. 오픈소스 Training Hub 라이브러리를 활용하여 OSFT와 SFT를 구현할 예정이다.