In [None]:
###########################################
# 0. Huggingface login

# READ token
!huggingface-cli login --token  # your code

In [2]:
###########################################
# 1-1. 구글 드라이브 마운트

from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
###########################################
# 1-2. 프로젝트 폴더 만들기

import os
os.makedirs('/content/drive/MyDrive/Sarcasm-LLM', exist_ok=True)

In [None]:

%cd /content/drive/MyDrive/Sarcasm-LLM

In [None]:
##########################################
# 1-3. 모듈 다운

!pip install bitsandbytes
!pip install accelerate
!pip install appdirs
!pip install loralib
!pip install black black[jupyter]
!pip install datasets
!pip install fire
!pip install git+https://github.com/huggingface/peft
!pip install transformers
!pip install sentencepiece sentence_transformers
!pip install scipy numpy scikit-learn pandas

In [None]:
###########################################
# 1-4. 설치된 모듈 리스트 확인하기
!pip check
!pip list


In [7]:
###########################################
# 2-1. 모듈 불러오기

import os
import os.path as osp
import sys
import fire
import json
from typing import List, Union

import torch
from torch.nn import functional as F

import transformers
from transformers import TrainerCallback, TrainingArguments, TrainerState, TrainerControl
from transformers.trainer_utils import PREFIX_CHECKPOINT_DIR
from transformers import LlamaForCausalLM, LlamaTokenizer
from transformers import AutoModelForCausalLM, AutoTokenizer

from datasets import load_dataset

from peft import (
    LoraConfig,
    get_peft_model,
    prepare_model_for_kbit_training, #prepare_model_for_int8_training
    set_peft_model_state_dict
)

from peft import PeftModel

In [8]:
#@title 🤗 Base model 선택하기
device = 'auto' #@param {type: "string"}
base_LLM_model = 'Bllossom/llama-3.2-Korean-Bllossom-3B' #@param {type: "string"}

In [None]:
###########################################
# 3-1. 모델 다운로드 (~30분)

model = AutoModelForCausalLM.from_pretrained(
    base_LLM_model,
    load_in_8bit=True, # LoRA
    #load_in_4bit=True, # Quantization Load
    torch_dtype=torch.float16,
    device_map=device)

tokenizer = AutoTokenizer.from_pretrained(base_LLM_model)

In [None]:
###########################################
# 3-2. BOS, EOS, PAD 토큰 확인

# Check special token
bos = tokenizer.bos_token_id # 문장 시작 토큰
eos = tokenizer.eos_token_id # 문장 끝 토큰
pad = tokenizer.pad_token_id # 문장 패딩 토큰 -> 문장 끝나고 매꿔주는 패딩 부분
tokenizer.padding_side = "right" # 패딩 오른쪽

print("BOS token:", bos) # 1
print("EOS token:", eos) # 2
print("PAD token:", pad) # None

In [None]:
if (pad == None) or (pad == eos):
    tokenizer.pad_token_id = 0  # 만약 패딩값이 없거나 eos값과 같다면,
print("length of tokenizer:",len(tokenizer)) # 32000

In [None]:

print(model)
print(type(model)) # 모델의 타입 확인

In [13]:
###########################################
# 5-1. 하이퍼 파라미터

# 데이터셋과 훈련 횟수와 관련된 하이퍼 파라미터
batch_size = 32
num_epochs = 3
micro_batch = 1 #GPU가 2대 이상일 때 활용하면 좋다..?
gradient_accumulation_steps = batch_size // micro_batch #GPU가 2대 이상일 때 활용하면 좋다..?

# 훈련 방법에 대한 하이퍼 파라미터
cutoff_len = 1024 # solar 7B max token
lr_scheduler = 'cosine'
warmup_ratio = 0.06 # warmup_steps = 100
learning_rate = 9.5e-4# 4e-4
optimizer = 'adamw_torch'
weight_decay = 0.01
max_grad_norm = 1.0 # Default값, 모델이 과적합되거나 잘못 된거 같으면 좀 줄여주면 좋음

# LoRA config
lora_r = 16
lora_alpha = 16
lora_dropout = 0.1 # 0.05 기호에 따라..
lora_target_modules = ["gate_proj", "down_proj", "up_proj"]

# Tokenizer에서 나오는 input값 설정 옵션
train_on_inputs = False
add_eos_token = False # 데이터 셋에 관한 부분

# Others
resume_from_checkpoint = './sarcasm_LLM/checkpoint-006' # !! 만약 모델을 이어서 훈련하고 싶다면, './custom_LLM/checkpoint-[xxx]'와 같이 파일 경로를 입력해야 합니다!
# 모델 돌다가 다운되는거 방지
output_dir = './sarcasm_LLM'

In [None]:
#@title 🤗 Choose Dataset
dataset = 'YuminKim/KoCoSa' #@param {type: "string"}

#kyujinpy/Open-platypus-Commercial

In [None]:
###########################################
# 5-1. 데이터셋 다운로드

data = load_dataset(dataset)
print(data['train']) # 개수: 19079

In [14]:
import pandas as pd

# train
df_train1 = pd.read_csv('conversation_train.csv')
df_train2 = pd.read_csv('KoCoSa_train.csv', encoding='cp949')
df_train2 = df_train2.reset_index(drop=True)
df_train3 = pd.read_csv('review_train.csv')
df_train4 = pd.read_csv('comment_train.csv')

# test
df_test1 = pd.read_csv('conversation_test.csv')
df_test2 = pd.read_csv('KoCoSa_test.csv', encoding='cp949')
df_test2 = df_test2.reset_index(drop=True)
df_test3 = pd.read_csv('review_test.csv')
df_test4 = pd.read_csv('comment_test.csv')


# train/test 데이터프레임 합치기
df_train_combined = pd.concat([df_train1, df_train2, df_train3, df_train4], ignore_index=True)
df_test_combined = pd.concat([df_test1, df_test2, df_test3, df_test4], ignore_index=True)

data = {}

data['train']= df_train_combined
data['test']= df_test_combined


In [None]:
print(data['train'])
print(data['test'])

In [None]:
template = {
    "prompt_sarcasm_explanation": (
        "아래는 한국어 상황 'context'를 보고, 발화(말, 댓글 등)'response'에 대한 풍자 여부를 판별하는 예제입니다. "
        "풍자를 판별하고 이를 설명하는 답변을 작성해주세요.\n\n"
        "### 상황:\n{context}\n\n"
        "### 발화:\n{response}\n\n"
        "### 답변:\n"
    ),
    "prompt_no_sarcasm_explanation": (
        "아래는 발화에 대한 상황을 추측해보고, 발화(말, 댓글 등)'response'에 대한 풍자 여부를 판별하는 예제입니다. "
        "풍자를 판별하는 답변만 작성해주세요.\n\n"
        "### 발화:\n{response}\n\n"
        "### 답변:\n"
    ),
    "response_split": "### 답변:"
}

In [None]:
from typing import Optional

# 새 sarcasm 프롬프트 템플릿
template = {
    "prompt_sarcasm_explanation": (
        "아래는 한국어 상황 'context'를 보고, 발화(말, 댓글 등)'response'에 대한 풍자 여부를 판별하는 예제입니다. "
        "풍자를 판별하고 이를 설명하는 답변을 작성해주세요.\n\n"
        "### 상황:\n{context}\n\n"
        "### 발화:\n{response}\n\n"
        "### 답변:\n"
    ),
    "prompt_no_sarcasm_explanation": (
        "아래는 발화에 대한 상황을 추측해보고, 발화(말, 댓글 등)'response'에 대한 풍자 여부를 판별하는 예제입니다. "
        "풍자를 판별하는 답변만 작성해주세요.\n\n"
        "### 발화:\n{response}\n\n"
        "### 답변:\n"
    ),
    "response_split": "### 답변:"
}

class Prompter(object):
    """
    context, response 를 입력으로 받고,
    optional label 과 optional explanation 을 target 출력으로 묶어
    prompt 를 생성합니다.
    """
    def __init__(self, verbose: bool = False):
        self.template = template
        self.verbose = verbose

    def generate_prompt(
        self,
        context: str,
        response: str,
        label: Optional[str] = None,
        explanation: Optional[str] = None
    ) -> str:
        # explanation 이 있으면 분류+설명 템플릿 사용
        if explanation is not None:
            prompt = self.template["prompt_sarcasm_explanation"].format(
                context=context,
                response=response
            )
            if label:
                prompt += label
            prompt += f"\n{explanation}"
        # explanation 이 없으면 분류만 템플릿 사용
        else:
            prompt = self.template["prompt_no_sarcasm_explanation"].format(
                response=response
            )
            if label:
                prompt += label

        if self.verbose:
            print("Generated prompt:\n", prompt)
        return prompt

    def get_response(self, output: str) -> str:
        """
        모델 출력에서 '### 답변:' 뒤에 나오는 실제 답변만 추출합니다.
        """
        parts = output.split(self.template["response_split"])
        return parts[-1].strip() if len(parts) > 1 else output.strip()

# 사용 예시
prompter = Prompter(verbose=True)

# 1) label만 있을 때
p1 = prompter.generate_prompt(
    context="A: ... 영화 보셨나요?",
    response="B: 어, 인셉션은 꿈 이야기가 아니구요.",
    label="Non-Sarcasm"
)

# 2) label + explanation 있을 때
p2 = prompter.generate_prompt(
    context="A: ... 연애 경험 어때요?",
    response="B: 저한테도 소개팅은 첨이었어요.",
    label="Sarcasm",
    explanation="B는 소개팅 경험이 없다는 당연한 사실을 과장해 풍자로 표현했습니다."
)

print(p1)
print("—" * 40)
print(p2)

In [None]:
###########################################
# 5-4. Token generation 함수

def tokenize(prompt, add_eos_token=True):
    result = tokenizer(
        prompt,
        truncation=True,
        max_length=cutoff_len,
        padding=False,
        return_tensors=None,
    )

    if (
        result["input_ids"][-1] != tokenizer.eos_token_id
        and len(result["input_ids"]) < cutoff_len
        and add_eos_token
    ):

        result["input_ids"].append(tokenizer.eos_token_id)
        result["attention_mask"].append(1)

    result["labels"] = result["input_ids"].copy()

    return result

def generate_and_tokenize_prompt(data_point):
    full_prompt = prompter.generate_prompt(
        data_point["instruction"],
        data_point["input"],
        data_point["output"])

    tokenized_full_prompt = tokenize(full_prompt)
    if not train_on_inputs:

        user_prompt = prompter.generate_prompt(
            data_point["instruction"], data_point["input"])

        tokenized_user_prompt = tokenize(
            user_prompt, add_eos_token=add_eos_token)

        user_prompt_len = len(tokenized_user_prompt["input_ids"])

        if add_eos_token:
            user_prompt_len -= 1

        tokenized_full_prompt["labels"] = [
            -100
        ] * user_prompt_len + tokenized_full_prompt["labels"][
            user_prompt_len:
        ]
    return tokenized_full_prompt

In [17]:
def tokenize(prompt: str, add_eos_token: bool = True):
    # (기존 토크나이저 그대로)
    result = tokenizer(
        prompt,
        truncation=True,
        max_length=cutoff_len,
        padding=False,
        return_tensors=None,
    )
    if (
        add_eos_token
        and result["input_ids"][-1] != tokenizer.eos_token_id
        and len(result["input_ids"]) < cutoff_len
    ):
        result["input_ids"].append(tokenizer.eos_token_id)
        result["attention_mask"].append(1)
    # labels 복사
    result["labels"] = result["input_ids"].copy()
    return result

def generate_and_tokenize_prompt(example, add_eos_token: bool = True):
    """
    example: {
      'context': str,
      'response': str,
      'label': 'Sarcasm' or 'Non-Sarcasm',
      'explanation': Optional[str]
    }
    """
    # 1) full prompt 생성
    full_prompt = prompter.generate_prompt(
        context=example["context"],
        response=example["response"],
        label=example.get("label"),
        explanation=example.get("explanation")
    )
    tokenized_full = tokenize(full_prompt, add_eos_token=add_eos_token)

    # 2) train_on_inputs=False 인 경우, 입력(prompt) 부분만 마스킹
    if not train_on_inputs:
        # prompt-only (label/explanation 없이) 길이를 알아냄
        prompt_only = prompter.generate_prompt(
            context=example["context"],
            response=example["response"]
        )
        tokenized_prompt = tokenize(prompt_only, add_eos_token=add_eos_token)
        prompt_len = len(tokenized_prompt["input_ids"])
        # EOS 토큰을 별도 마스킹했다면 길이에서 -1
        if add_eos_token:
            prompt_len -= 1
        # labels 의 앞 prompt_len 부분을 -100 으로
        tokenized_full["labels"] = ([-100] * prompt_len
                                    + tokenized_full["labels"][prompt_len:])

    return tokenized_full

In [None]:
from datasets import Dataset, DatasetDict
from sklearn.model_selection import train_test_split

# 🧩 1. train/test DataFrame → HuggingFace Dataset 변환
train_df = data["train"].reset_index(drop=True)
test_df = data["test"].reset_index(drop=True)

# 🧩 2. train-validation 분할
df_train_combined, df_val = train_test_split(train_df, test_size=0.2, random_state=42)

# 🧩 3. HuggingFace Dataset 변환
train_dataset = Dataset.from_pandas(df_train_combined.reset_index(drop=True))
val_dataset = Dataset.from_pandas(df_val.reset_index(drop=True))
test_dataset = Dataset.from_pandas(test_df)

# 🧩 4. DatasetDict 구성 (선택 사항)
data_hf = DatasetDict({
    'train': train_dataset,
    'validation': val_dataset,
    'test': test_dataset
})

# 🧩 5. map + tokenization
train_data = data_hf['train'].shuffle(seed=42).map(
    generate_and_tokenize_prompt,
    remove_columns=data_hf["train"].column_names,
    batched=False
)

val_data = data_hf['validation'].map(
    generate_and_tokenize_prompt,
    remove_columns=data_hf["validation"].column_names,
    batched=False
)

test_data = data_hf['test'].map(
    generate_and_tokenize_prompt,
    remove_columns=data_hf["test"].column_names,
    batched=False
)

# ✅ 최종 확인
print("✅ 학습셋:", train_data)
print("✅ 검증셋:", val_data)
print("✅ 테스트셋:", test_data)


In [19]:
###########################################
# 6-1. LoRA config 정의

config = LoraConfig(
    r=lora_r,
    lora_alpha=lora_alpha,
    target_modules=lora_target_modules,
    lora_dropout=lora_dropout,
    bias="none", # 모델 가중치 말고 쓰래쉬 홀드 같은거?
    task_type="CAUSAL_LM")

In [20]:

###########################################
# 6-2. Model with LoRA

model = prepare_model_for_kbit_training(model) # prepare_model_for_int8_training
model = get_peft_model(model, config) # Applying LoRA (모델에 LoRA config 적용)

In [None]:

###########################################
# 7-1. 만약 이전에 돌렸던 모델을 가져온다면, 아래의 코드 실행

if resume_from_checkpoint:
    checkpoint_name = os.path.join(
        resume_from_checkpoint, "pytorch_model.bin"
    )  # All checkpoint

    if not os.path.exists(checkpoint_name):
        checkpoint_name = os.path.join(
            resume_from_checkpoint, "adapter_model.bin"
        )  # only LoRA model
        resume_from_checkpoint = (
            True
        ) # kyujin: I will use this checkpoint

    if os.path.exists(checkpoint_name):
        print(f"Restarting from {checkpoint_name}")
        adapters_weights = torch.load(checkpoint_name)
        set_peft_model_state_dict(model, adapters_weights)

    else:
        print(f"Checkpoint {checkpoint_name} not found")

In [None]:
###########################################
# 7-2. Trainer class 정의

trainer = transformers.Trainer(
        model=model,
        train_dataset=train_data,
        eval_dataset=val_data,
        args=transformers.TrainingArguments( # 훈련에 이용될 하이퍼파라미터
            per_device_train_batch_size = micro_batch,
            gradient_accumulation_steps = gradient_accumulation_steps,
            warmup_ratio=warmup_ratio, # 0.06
            num_train_epochs=num_epochs, # 1
            learning_rate=learning_rate,
            fp16=True,
            logging_steps=1,
            optim="adamw_torch",
            eval_strategy="epoch",
            save_strategy="steps",# step마다 모델 저장하겠다
            max_grad_norm = max_grad_norm,
            save_steps = 50, # you can change! 30 -> 50
            lr_scheduler_type=lr_scheduler,
            output_dir=output_dir,
            save_total_limit=2,
            load_best_model_at_end=False,
            ddp_find_unused_parameters=False,
            group_by_length = False
        ),
        data_collator=transformers.DataCollatorForSeq2Seq(
            tokenizer, pad_to_multiple_of=8, return_tensors="pt", padding=True
        ),
    )

model.config.use_cache = False
model.print_trainable_parameters() # 훈련하는 파라미터의 % 체크

if torch.__version__ >= "2" and sys.platform != "win32":
    model = torch.compile(model)

In [None]:
###########################################
# 7-3. Training (fine-tuning)

## 훈련시간이 많이 소요됩니다! (~12h)
## 만약 중간에 colab이 끊긴다면, checkpoint를 이용해서 다시 훈련하세요!
### 만약 개인적인 GPU 자원이 있다면, 개인 서버나 컴퓨터에서 훈련시키는 것을 추천드립니다!
resume_from_checkpoint = None
torch.cuda.empty_cache()
trainer.train(resume_from_checkpoint=resume_from_checkpoint)

In [None]:
###########################################
# 7-4. 모델 저장

model.save_pretrained(output_dir)
model_path = os.path.join(output_dir, "pytorch_model.bin")
torch.save({}, model_path)
tokenizer.save_pretrained(output_dir)

In [None]:
###########################################
# 8-1. 훈련된 LoRA layer와 base LLM 병합(merge)

torch.cuda.empty_cache()

base_model = AutoModelForCausalLM.from_pretrained(
    base_LLM_model,
    return_dict = True,
    torch_dtype=torch.float16,
    device_map=device)

model = PeftModel.from_pretrained(base_model, output_dir, device)
model = model.merge_and_unload() # Merge!

In [None]:
###########################################
# 8-2. 여러분의 custom LLM 모델 저장!

final_save_folder = './Sarcasm_LLM_FineTuning'

model.save_pretrained(final_save_folder)
tokenizer.save_pretrained(final_save_folder)

In [None]:
###########################################
# 9-1. 허깅페이스 로그인

# WRITE token
!huggingface-cli login --token  # your code

In [None]:
###########################################
# 9-2. 모델 업로드 (~10분)
## 먼저, 허깅페이스에 모델 repo를 만든 후 코드를 실행해야 합니다!

model = AutoModelForCausalLM.from_pretrained(final_save_folder)
model.push_to_hub('myoo-oong/Korean-Sarcasm-LLM-llama-3.2-Korean-Bllossom-3B-CustomData-3', token=True)

In [None]:
###########################################
# 9-3. Tokenizer 업로드

tokenizer = AutoTokenizer.from_pretrained(final_save_folder)
tokenizer.push_to_hub('myoo-oong/Korean-Sarcasm-LLM-llama-3.2-Korean-Bllossom-3B-CustomData-3', token=True)

In [None]:
from google.colab import drive
drive.mount('/content/drive')