In [None]:
import torch
from datasets import load_dataset
from transformers import BitsAndBytesConfig
from transformers import AutoProcessor, Gemma3ForConditionalGeneration
from trl import SFTTrainer, SFTConfig
from peft import LoraConfig, get_peft_model
import os

In [None]:
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_use_double_quant=True,
    bnb_4bit_compute_dtype=torch.bfloat16,
)

model_name = "google/gemma-3-4b-it"

model = Gemma3ForConditionalGeneration.from_pretrained(
    model_name,
    torch_dtype=torch.bfloat16,
    attn_implementation="flash_attention_2",
    quantization_config=bnb_config,
    device_map="auto",
)

model.config.use_cache = False  # 훈련 시 캐싱 비활성화

processor = AutoProcessor.from_pretrained(model_name, use_fast=True)
tokenizer = processor.tokenizer
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right"

In [None]:
# from unsloth import FastModel

# model, tokenizer = FastModel.from_pretrained(
#     model_name="unsloth/gemma-3-4b-it",
#     max_seq_length=2048,
#     load_in_4bit=True,
#     load_in_8bit=False,
#     full_finetuning=False,
#     dtype=torch.bfloat16,
#     device_map="auto",
#     # token = "hf_...",
# )

# tokenizer.pad_token = tokenizer.eos_token
# tokenizer.padding_side = "right"

In [None]:
# LoRA 구성을 위한 설정 (인과적 언어 모델링용)
lora_config = LoraConfig(
    r=8,
    lora_alpha=16,
    lora_dropout=0.05,
    target_modules=[
        "q_proj",
        "k_proj",
        "v_proj",
        "o_proj",
        "gate_proj",
        "up_proj",
        "down_proj",
    ],  # 타겟 모듈 지정
    bias="none",
    task_type="CAUSAL_LM",  # 작업 유형 설정
    # use_rslora=True,
    # use_dora=True,
)

model = get_peft_model(model, lora_config)

In [None]:
# model = FastModel.get_peft_model(
#     model,
#     finetune_vision_layers=False,
#     finetune_language_layers=True,
#     finetune_attention_modules=True,
#     finetune_mlp_modules=True,
#     r=8,
#     lora_alpha=16,
#     lora_dropout=0,
#     bias="none",
#     random_state=3407,
#     use_gradient_checkpointing=True,
# )

In [None]:
dataset = load_dataset("Yeongi/Bespoke-Stratos-3.65k", split="train")

In [None]:
# # ===== 답변 추출 함수 =====
# def extract_boxed(s):
#     start = s.find("boxed{")
#     if start == -1:
#         return None
#     start_index = start + len("boxed{")
#     count = 1  # 처음의 '{'에 대해 1로 시작
#     i = start_index
#     while i < len(s):
#         if s[i] == "{":
#             count += 1
#         elif s[i] == "}":
#             count -= 1
#             if count == 0:
#                 return s[start_index:i]
#         i += 1
#     return None


# def extract_aime_answer(response):
#     # 박스 형식 및 숫자 직접 매칭
#     # 모든 \boxed{} 패턴을 찾아 마지막 항목 선택
#     boxed_matches = extract_boxed(response)
#     if boxed_matches:
#         raw_answer = boxed_matches
#     else:
#         patterns = [
#             r"<\|begin_of_solution\|>.*?```python(.*?)```.*?<\|end_of_solution\|>",
#             r"ANSWER\s*:\s*(\d+)",  # 간단한 숫자 형식
#             r"final answer is:\s*(\d+)",  # 대체 표현
#             r"<\|begin_of_solution\|>(.*?)<\|end_of_solution\|>",
#         ]
#         for pattern in patterns:
#             match = re.search(pattern, response, re.DOTALL)
#             if match:
#                 raw_answer = match.group(1).strip()
#                 break
#         else:
#             return None
#     return raw_answer

In [None]:
# processed_dataset = dataset.map(
#     lambda x: {
#         "prompt": [
#             {"role": "system", "content": x["system"][0]},
#             {"role": "user", "content": x["question"]},
#         ],
#         "answer": extract_aime_answer(x["response"]),
#     },
#     num_proc=6,
# )

In [None]:
# # 토큰 수를 저장할 리스트
# token_lengths = []

# # 데이터셋의 모든 텍스트에 대해 토큰 수 계산
# for idx, text in enumerate(filtered_dataset):
#     num_tokens = len(tokenizer.tokenizer(text['system'])['input_ids']) + len(tokenizer.tokenizer(text['question'])['input_ids']) + len(tokenizer.tokenizer(text['response'])['input_ids'])
#     token_lengths.append((num_tokens, idx))

# # 최대 및 최소 토큰 수를 가진 데이터의 인덱스 찾기
# max_token_info = max(token_lengths, key=lambda x: x[0])  # 토큰 수가 최대인 데이터
# min_token_info = min(token_lengths, key=lambda x: x[0])  # 토큰 수가 최소인 데이터

# # 최대 및 최소 토큰 수를 가진 데이터 출력
# max_tokens, max_idx = max_token_info
# min_tokens, min_idx = min_token_info

# print(f"최대 토큰 수: {max_tokens}")
# print(f"최대 토큰 수를 가진 데이터: {filtered_dataset[max_idx]}, idx: {max_idx}")
# print(f"최소 토큰 수: {min_tokens}")
# print(f"최소 토큰 수를 가진 데이터: {filtered_dataset[min_idx]}, idx: {min_idx}")

In [None]:
def preprocess_function(dataset):
    # 입력 텍스트 준비 (시스템 프롬프트 + 질문)
    inputs = [
        f"{system}\n\n{question}"
        for system, question in zip(dataset["system"], dataset["question"])
    ]

    # 출력 텍스트 준비 (응답)
    outputs = [f"{response}{tokenizer.eos_token}" for response in dataset["response"]]

    # 입력 토큰화
    model_inputs = tokenizer(
        inputs, max_length=2048, truncation=True, padding="max_length"
    )

    # 라벨(출력) 토큰화
    labels = tokenizer(
        outputs,
        max_length=2048,
        truncation=True,
        padding="max_length",
        add_special_tokens=True,
    )

    # 라벨 ID를 모델 입력에 추가
    model_inputs["labels"] = labels["input_ids"]

    return model_inputs

In [None]:
tokenized_dataset = dataset.map(
    preprocess_function, batched=True, remove_columns=dataset.column_names
)

In [None]:
model.train()  # 반드시 추가!
model.enable_input_require_grads()  # 그래디언트 계산 강제 활성화

In [None]:
# 참고: https://huggingface.co/blog/open-r1/update-3
output_dir = os.path.join("outputs", model_name)

trainer = SFTTrainer(
    model=model,
    processing_class=processor.tokenizer,  # 멀티모달
    # tokenizer=tokenizer, # Only 언어모델
    train_dataset=tokenized_dataset,
    eval_dataset=None,
    args=SFTConfig(
        bf16=True,
        per_device_train_batch_size=8,
        gradient_accumulation_steps=4,
        gradient_checkpointing=True,
        max_grad_norm=0.2,
        warmup_ratio=0.1,
        weight_decay=0.01,
        num_train_epochs=1,
        # max_steps = 50, # test only
        learning_rate=2e-5,
        logging_steps=1,
        optim="adamw_torch_fused",
        lr_scheduler_type="cosine",
        save_steps=10,
        save_total_limit=10,
        seed=3407,
        report_to="wandb",
        run_name="gemma3-12b-lora-sft",
        label_names=["labels"],
        # packing=True,
        output_dir=output_dir,
    ),
)

In [None]:
trainer.train(resume_from_checkpoint=None)