In [None]:
"""
파일명: finetune_llama_correction.py

필요 라이브러리:
pip install transformers accelerate bitsandbytes peft pandas scikit-learn

- transformers : 모델/토크나이저 로드 및 Trainer
- accelerate   : 다중 GPU/분산 학습 지원
- bitsandbytes : 4bit/8bit 양자화
- peft         : LoRA 같은 Parameter-Efficient Fine-Tuning
- pandas, scikit-learn : 데이터 로드 & 전처리
"""

import os
import random
import pandas as pd
from sklearn.model_selection import train_test_split

import torch
from torch.utils.data import Dataset, DataLoader

# bitsandbytes, huggingface
from transformers import TrainingArguments, Trainer
from transformers import AutoTokenizer, AutoModelForCausalLM
from peft import LoraConfig, get_peft_model, TaskType, prepare_model_for_kbit_training

###############################################################################
# 1) Config 클래스
###############################################################################
class Config:
    # 모델 및 학습 관련
    model_name = "Bllossom/llama-3.2-Korean-Bllossom-3B"
    lora_r = 8
    lora_alpha = 16
    lora_dropout = 0.05
    lora_bias = "none"
    
    # 데이터 관련
    train_path = "../database/train/train.csv"
    test_path  = "../database/test/test.csv"
    
    # 토크나이징, 최대 입력 길이
    max_length = 512
    
    # 학습 파라미터
    learning_rate = 1e-4
    num_train_epochs = 1
    per_device_train_batch_size = 1
    per_device_eval_batch_size = 1
    warmup_steps = 50
    weight_decay = 0.01
    
    # 기타
    output_dir = "./finetuned-llama-correction"
    seed = 42
    
###############################################################################
# 2) 노이즈 추가 함수 (데이터 증강)
###############################################################################
def add_noise_to_text(text: str) -> str:
    """
    실제로는 자모/받침/오타 삽입 등 한글에 특화된 노이즈를 넣으면 좋습니다.
    여기서는 간단히 한두 글자씩 랜덤 치환, 삽입, 삭제 정도만 예시로 넣습니다.
    """
    # 노이즈를 추가할 수 있는 연산들
    def random_char_swap(s):
        if len(s) < 2:
            return s
        idx = random.randint(0, len(s) - 2)
        # idx 글자와 idx+1 글자 위치를 바꾼다
        lst = list(s)
        lst[idx], lst[idx+1] = lst[idx+1], lst[idx]
        return "".join(lst)
    
    def random_char_delete(s):
        if not s:
            return s
        idx = random.randint(0, len(s)-1)
        return s[:idx] + s[idx+1:]
    
    def random_char_replace(s):
        if not s:
            return s
        idx = random.randint(0, len(s)-1)
        # 임의의 유니코드 한글 범위 내에서 문자 생성(예: 44032(가) ~ 55203(힣))
        random_kor_char = chr(random.randint(0xAC00, 0xD7A3))
        s = list(s)
        s[idx] = random_kor_char
        return "".join(s)
    
    # 여러 개 노이즈 연산 중 무작위 선택
    ops = [random_char_swap, random_char_delete, random_char_replace]
    # 노이즈를 1~3회 적용 (상황에 따라 조절 가능)
    num_noise_ops = random.randint(1, 3)
    
    noisy_text = text
    for _ in range(num_noise_ops):
        op = random.choice(ops)
        noisy_text = op(noisy_text)
        
    return noisy_text

###############################################################################
# 3) Dataset 정의
###############################################################################
class CorrectionDataset(Dataset):
    def __init__(self, df, tokenizer, max_length: int=512, is_train: bool=True):
        self.df = df
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.is_train = is_train

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        
        # 학습 시:   input  = 노이즈가 추가된 텍스트
        #           label  = 원본 output
        if self.is_train:
            # train.csv: (row['input'], row['output'])  
            # => 실제론 row['input']은 무시하고, row['output']에 노이즈를 입히고자 함
            original_text = str(row['output'])
            noisy_text = add_noise_to_text(original_text)
            
            # 프롬프트 형식(예시): "아래 문장을 올바른 한국어로 교정하세요:\n{noisy_text}\n답변:"
            prompt_text = f"아래 문장을 올바른 한국어로 교정하세요:\n{noisy_text}\n답변:"
            target_text = original_text  # 모델이 최종적으로 생성해야 하는 문장

            # 모델 입력을 만들기 위해 prompt + target 형식으로 붙임
            # causal LM에서는 label을 그대로 shift시켜 예측하기 때문에,
            # prompt+target 전체를 입력으로 두고, target 부분만 loss 계산하도록 구성
            # 보통 special token이나 구분자를 추가하는 방식을 사용하기도 함
            full_text = prompt_text + " " + target_text
            
            tokenized = self.tokenizer(
                full_text,
                truncation=True,
                max_length=self.max_length,
                return_tensors="pt",
                padding="max_length"
            )
            input_ids = tokenized["input_ids"].squeeze(0)
            attention_mask = tokenized["attention_mask"].squeeze(0)
            
            # 어디부터 target(라벨)로 삼을지 구분해야 함
            # 예시로 prompt 부분은 -100으로 ignore하고, target 부분만 정답으로 사용
            prompt_len = len(self.tokenizer(prompt_text, add_special_tokens=False)["input_ids"])
            
            labels = input_ids.clone()
            # prompt 위치까지는 -100으로 마스킹하여 loss 미계산
            labels[:prompt_len] = -100

            return {
                "input_ids": input_ids,
                "attention_mask": attention_mask,
                "labels": labels
            }
        else:
            # 실제 추론 시: row['input'] 만 가지고 inference
            # 다만 Trainer eval 단계 등에서 label이 필요할 수 있으므로
            # 여기서는 임시로 (no label) 처리
            input_text = str(row['input'])
            prompt_text = f"아래 문장을 올바른 한국어로 교정하세요:\n{input_text}\n답변:"

            tokenized = self.tokenizer(
                prompt_text,
                truncation=True,
                max_length=self.max_length,
                return_tensors="pt",
                padding="max_length"
            )
            input_ids = tokenized["input_ids"].squeeze(0)
            attention_mask = tokenized["attention_mask"].squeeze(0)
            
            return {
                "input_ids": input_ids,
                "attention_mask": attention_mask
                # labels가 필요하다면, row에 'output' 컬럼이 있을 경우 추가
            }

###############################################################################
# 4) 데이터 준비 함수
###############################################################################
def load_datasets(config: Config, tokenizer):
    # train.csv 로드 => ID, input, output
    df_train = pd.read_csv(config.train_path)
    # test.csv  => ID, input (output 없음)
    df_test  = pd.read_csv(config.test_path)
    
    # 예시로 train/valid split
    df_train, df_valid = train_test_split(df_train, test_size=0.1, random_state=config.seed)
    
    train_dataset = CorrectionDataset(df_train, tokenizer, config.max_length, is_train=True)
    valid_dataset = CorrectionDataset(df_valid, tokenizer, config.max_length, is_train=True)  # 검증 시에도 label 필요
    test_dataset  = CorrectionDataset(df_test, tokenizer, config.max_length, is_train=False)
    
    return train_dataset, valid_dataset, test_dataset

###############################################################################
# 5) 모델 준비 (4bit 로 로드 + LoRA)
###############################################################################
def get_model(config: Config):
    # (1) 토크나이저 로드
    tokenizer = AutoTokenizer.from_pretrained(
        config.model_name,
        trust_remote_code=True  # Bllossom LLaMA 커스텀 코드 필요 시
    )
    tokenizer.pad_token_id = 0

    # (2) 4bit 양자화 모델 로드
    model = AutoModelForCausalLM.from_pretrained(
        config.model_name,
        trust_remote_code=True,
        load_in_4bit=True,
        device_map="auto"
    )

    # (3) kbit(4bit) 훈련 준비
    # 이 단계가 누락되면 미세 튜닝 시 grad_fn이 사라져
    # "does not require grad" 오류가 발생할 수 있음
    model = prepare_model_for_kbit_training(model)

    # (4) LoRA 설정
    lora_config = LoraConfig(
        r=config.lora_r,
        lora_alpha=config.lora_alpha,
        target_modules=["q_proj", "v_proj"],  # LLaMA의 Q, V 부분에 적용
        lora_dropout=config.lora_dropout,
        bias=config.lora_bias,
        task_type=TaskType.CAUSAL_LM
    )

    # (5) LoRA 적용
    model = get_peft_model(model, lora_config)

    return tokenizer, model

###############################################################################
# 6) 학습 함수
###############################################################################
def train_model(config: Config):
    tokenizer, model = get_model(config)
    train_dataset, valid_dataset, _ = load_datasets(config, tokenizer)
    
    training_args = TrainingArguments(
        output_dir=config.output_dir,
        num_train_epochs=config.num_train_epochs,
        learning_rate=config.learning_rate,
        per_device_train_batch_size=config.per_device_train_batch_size,
        per_device_eval_batch_size=config.per_device_eval_batch_size,
        warmup_steps=config.warmup_steps,
        weight_decay=config.weight_decay,
        evaluation_strategy="steps",  # 혹은 "epoch"
        save_steps=50,                # 모델 저장 빈도
        eval_steps=50,                # 평가 빈도
        logging_steps=10,
        save_total_limit=2,
        fp16=True,                    # FP16 사용
        # or bf16=True (가능한 환경이면) 
        gradient_checkpointing=True,  # 메모리 절약
        remove_unused_columns=False   # Dataset에서 입력 컬럼 그대로 사용
    )

    def data_collator(features):
        # huggingface Trainer용 default DataCollator:
        # 이미 모든 텐서가 padding되어 있다고 가정
        return {
            "input_ids": torch.stack([f["input_ids"] for f in features]),
            "attention_mask": torch.stack([f["attention_mask"] for f in features]),
            "labels": torch.stack([f["labels"] for f in features])
        }

    # Trainer
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=valid_dataset,
        data_collator=data_collator
    )
    
    trainer.train()
    
    # 학습 완료 후 모델 저장
    trainer.save_model(config.output_dir)
    tokenizer.save_pretrained(config.output_dir)

###############################################################################
# 7) 추론(후처리) 함수
###############################################################################
def correct_sentence(tokenizer, model, text: str, max_new_tokens=128):
    """
    주어진 text를 모델에 넣어서 교정 결과를 생성하는 함수.
    text는 noisy한 문장(이미 LSTM 등에서 나온 예측값)이라고 가정.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"
    
    prompt = f"아래 문장을 올바른 한국어로 교정하세요:\n{text}\n답변:"
    inputs = tokenizer(prompt, return_tensors="pt").to(device)
    
    with torch.no_grad():
        generation = model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            do_sample=True,
            top_p=0.9,
            temperature=0.8
        )
    # 모델이 만든 전체 문장을 디코딩
    output = tokenizer.decode(generation[0], skip_special_tokens=True)
    
    # prompt를 제외한 부분만 추출(간단히 문자열 치환)
    # "아래 문장을 ...\n답변:" 까지 잘라내기
    if "답변:" in output:
        output = output.split("답변:")[-1].strip()
    return output

def inference_on_test(config: Config):
    # 1) 모델 로드
    tokenizer = LlamaTokenizer.from_pretrained(config.output_dir)
    tokenizer.pad_token_id = 0
    model = LlamaForCausalLM.from_pretrained(
        config.output_dir,
        load_in_4bit=True,
        device_map="auto"
    )
    
    # LoRA 가중치도 불러오기 위해 peft 방법 사용
    # 만약 Trainer.save_model()으로 LoRA가 같이 저장되었다면 자동 로드 가능
    # (필요 시 peft 로드 별도)
    
    model.eval()  # 추론 모드
    
    # 2) test 데이터 로드
    df_test = pd.read_csv(config.test_path)
    
    # 3) 기존 LSTM 모델의 결과물을 얻는 함수 (데모용)
    # 실제론 사용자가 만든 BiMultiLSTMModel로 예측
    def bimulti_lstm_predict(noisy_input: str) -> str:
        # 여기서는 데모용으로 "noisy_input" 그대로 반환하거나
        # 임의로 글자를 조금 바꿔 반환하는 방식
        # 실제로는 사용자 LSTM 모델의 output이 들어와야 함
        return noisy_input
    
    # 4) 각 문장에 대해 (1) LSTM 예측 -> (2) LLaMA 교정 -> (정답) 은 여기선 X
    for i, row in df_test.iterrows():
        input_text = str(row["input"])
        
        # (1) LSTM 예측
        lstm_output = bimulti_lstm_predict(input_text)
        
        # (2) LLaMA 교정
        pred_str = correct_sentence(tokenizer, model, lstm_output)
        
        # (3) 정답(lab_str)은 없는 상태이므로 None 처리
        lab_str = None  # 실제로 정답이 있으면 대입
        
        print(f"[예측] {pred_str} (정답: {lab_str})")

###############################################################################
# main 실행부
###############################################################################

config = Config()
random.seed(config.seed)
torch.manual_seed(config.seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(config.seed)

# 1) 모델 학습
print("===== 1) 모델 학습 시작 =====")
train_model(config)

# # 2) 학습 완료 후, test set에 대한 후처리 추론
# print("===== 2) 테스트 데이터 추론(후처리) =====")
# inference_on_test(config)
