# 사용자 선호에 맞는 시 창작 모델

### 환결 설정

In [1]:
!python -m pip install --upgrade pip



In [2]:
!pip install typing_extensions pydantic openai



In [3]:
!pip install datasets transformers peft trl bitsandbytes



In [4]:
import os
import torch
import json
import time
import random
from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer, Trainer, TrainingArguments,DataCollatorForSeq2Seq, BitsAndBytesConfig, GenerationConfig, AutoModelForSequenceClassification
from datasets import Dataset
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training
from trl import ORPOTrainer, ORPOConfig, AutoModelForCausalLMWithValueHead
from trl.trainer.utils import DPODataCollatorWithPadding
from tqdm import tqdm

In [5]:
os.environ["WANDB_DISABLED"] = "True"           # wandb 비활성화
os.environ["TOKENIZERS_PARALLELISM"] = "False"  # 병렬 토크나이저 경고 방지

device = "cuda" if torch.cuda.is_available() else "cpu" # GPU 설정 변수 

---

## Q-LoRA 피인튜닝

In [6]:
dataset_path = "./korean_poetry_dataset.json"

with open(dataset_path, "r", encoding="utf-8") as f:
    poem_data = json.load(f)

preprocessed_data = [{"topic": item["text"]["topic"], "poem": item["text"]["poem"]} for item in poem_data]

train_dataset = Dataset.from_list(preprocessed_data)

In [7]:
model_name = "Bllossom/llama-3.2-Korean-Bllossom-3B"

tokenizer = AutoTokenizer.from_pretrained(model_name)

if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

In [8]:
# 데이터 전처리 함수 (토큰화 + labels 추가)
def preprocess_text(sample):
    input_text = [f"주제: {t}\n시: {p}" for t, p in zip(sample["topic"], sample["poem"])]
    
    model_inputs = tokenizer(
        input_text, 
        padding="max_length",
        truncation=True, 
        max_length=512
    )
    
    model_inputs["labels"] = model_inputs["input_ids"].copy()
    
    pad_token_id = tokenizer.pad_token_id
    
    model_inputs["labels"] = [
        [l if l != pad_token_id else -100 for l in labels]
        for labels in model_inputs["labels"]
    ]
    
    return model_inputs

In [9]:
# 데이터 셋 변환
train_dataset = train_dataset.map(
    preprocess_text, 
    batched=True,
    remove_columns=["topic", "poem"]
)

Map:   0%|          | 0/2600 [00:00<?, ? examples/s]

In [10]:
# 데이터 콜레이터
data_collator = DataCollatorForSeq2Seq(tokenizer, model=None)

In [11]:
bnb_config = BitsAndBytesConfig(
    load_in_8bit=True,
    bnb_4bit_compute_dtype=torch.bfloat16,
    bnb_4bit_use_quarter_precision=True,
    bnb_4bit_quant_type="nf4"
)

In [12]:
# 모델 로드
AutoModelForCausalLM.from_pretrained(
    model_name,
    quantization_config=bnb_config,
    device_map="auto"
).to(device)

ImportError: Using `bitsandbytes` 8-bit quantization requires the latest version of bitsandbytes: `pip install -U bitsandbytes`

In [None]:
# LoRA 설정
lora_config = LoraConfig(
    r=16,
    lora_alpha=32,
    lora_dropout=0.05,
    bias="none",
    task_type="CAUSAL_LM"
)

In [None]:
# 양자화 모델 훈련을 위한 준비
model = prepare_model_for_kbit_training(model)

In [None]:
#LoRA 적용
model = get_peft_model(model, lora_config)
model.print_trainable_parameters()

model.train() # 모델 학습 모드 설정

In [None]:
training_args = TrainingArguments(
    output_dir="./q_lora_peom",
    eval_strategy="no",
    save_strategy="epoch",
    per_device_train_batch_size=2,
    gradient_accumulation_steps=16,
    learning_rate=2e-4,
    num_train_epochs=3,
    logging_dir="./logs",
    logging_steps=100,
    save_total_limit=2,
    optim='adamw_bnb_8bits',
    report_to="none"
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    tokenizer=tokenizer,
    data_collator=data_collator
)

In [None]:
trainer.train()

---

### 시 생성

In [None]:
# 파인튜닝된 모델로 시 생성
qlora_checkpoint = "./q_lora_peom/checkpoint-243"

model = AutoModelForCausalLM.from_pretrained(qlora_checkpoint)
tokenizer = AutoTokenizer.from_pretrained(model_name)

generate_pipeline = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    pad_token_id=tokenizer.eos_token_id,
    batch_size=2
)

In [None]:
topics = ["바람", "비", "노을", "달빛", "안개", "사랑", "이별", "운명", "기다림", "후회", "추억", "시간", "청춘", "변화", "마지막 순간", "군중", "밤거리", "버스", "인생", "빌딩", "사람들", "거짓말", "욕망", "돈", "권력", "비밀", "죽음", "희망", "동물", "자연", "도시", "바다", "산", "하늘", "별", "꽃", "나무", "강", "바위", "흙", "눈", "빗방울", "눈물", "웃음"]

eval_file = 'rlhf_evaluation_data.json'

try:
    with open(eval_file, 'r', encoding='utf-8') as f:
        eval_data = json.load(f)
except FileNotFoundError:
    eval_dataset = []

In [None]:
# 시 생성을 위한 변수 설정
num_batches = 5
batch_size = 20
total_samples = num_batches * batch_size
generate_samples = len(eval_dataset)

In [None]:
# 시 생성 함수
def generate_poem_batch():
    batch_data = []
    
    with tqdm(total=batch_size, desc="<시 생성 중>", leave=False) as t:
        for _ in range(batch_size):
            topic = random.choice(topics)
            input_text = f"주제: {topic}\n시: "
            
            start_time = time.time()
            poem = generate_pipeline(
                input_text,
                max_length=100,
                temperature=0.8,
                top_p=0.9,
            )[0]['generated_text']
            end_time = time.time()
            
            gen_time = end_time - start_time
            
            batch_data.append({
                'topic': topic,
                'poem': poem,
                'selected': None
            })
            
            # tqdm
            t.update(1)
            
            global generated_samples
            generated_samples += 1
            complete_rate = (generated_samples / total_samples) * 100
            remaining_time = ((total_samples - generated_samples) * gen_time) / 60
            
            print(f"\n{generated_samples} / {total_samples} 개 완료, {complete_rate:.2f}")
            print(f"남은 시간: {remaining_time:.2f}분")
            print("-" * 50)
            
    return batch_data
            
            
            

In [None]:
# 시 저장 및 json 저장
for _ in tqdm(range(num_batches), desc="<전체 진행 상황>", position=0):
    eval_dataset.extend(generate_poem_batch())
    
    with open(eval_file, 'w', encoding='utf-8') as f:
        json.dump(eval_dataset, f, ensure_ascii=False, indent=4)

---

### Reward model
- 앞서 생성한 시에 대해서 selected=true로 수정해 피드백 수정

In [None]:
# 생성한 시+선호도 파일 로드
with open(eval_file, "r", encoding="utf-8") as f:
    evaluation_data = json.load(f)
    
    
reward_data = [
    {'text_a': f"주제: {item["topic"]}", 'text_b': item["poem"]}
    for item in evaluation_data if item["selected"]
]

In [None]:
# 데이터 전처리 함수 (베치 데이터 처리)
def preprocess_reward_data(sample):

    model_inputs = tokenizer(
        sample["text_a"],
        text_pair=sample["text_b"],
        padding="max_length",
        truncation=True, 
        max_length=512
    )
    
    model_inputs["labels"] = model_inputs["input_ids"].copy()
    
    pad_token_id = tokenizer.pad_token_id
    
    model_inputs["labels"] = [
        [l if l != pad_token_id else -100 for l in labels]
        for labels in model_inputs["labels"]
    ]
    
    return model_inputs

In [None]:
# 전처리
reward_dataset = reward_dataset.map(
    preprocess_reward_data,
    batched=True,
    remove_columns=["text_a", "text_b"]
)

In [None]:
bnb_config = BitsAndBytesConfig(
    load_in_8bit=True,
    bnb_4bit_compute_dtype=torch.bfloat16,
    bnb_4bit_quant_type="nf4",
    device_map="auto"
)

In [None]:
# model 생성
reward_model = AutoModelForCausalLM.from_pretrained(
    model_name,
    quantization_config=bnb_config
)

reward_model = prepare_model_for_kbit_training(reward_model)

In [None]:
#LoRA 설정
lora_config = LoraConfig(
    r=16,
    lora_alpha=32,
    lora_dropout=0.05,
    bias="none",
    task_type="CAUSAL_LM"
)

In [None]:
# LoRA 적용
reward_model = get_peft_model(reward_model, lora_config)

In [None]:
#trainer 설정 
training_args = TrainingArguments(
    output_dir="./reward_model",
    save_strategy="epoch",
    per_device_train_batch_size=2,
    gradient_accumulation_steps=16,
    learning_rate=2e-4,
    num_train_epochs=3,
    logging_dir="./logs",
    logging_steps=100,
    save_total_limit=2,
    remove_unused_columns=False,
    fp16=True
)

reward_trainer = Trainer(
    model=model,
    args=reward_training_args,
    train_dataset=train_dataset,
    tokenizer=tokenizer,
    data_collator=data_collator
)

In [None]:
reward_trainer.train()

---

### RLHF (ORPO)

In [None]:
# 모델 및 토크나이저 로드
model = AutoModelForCausalLM.from_pretrained(qlora_checkpoint)
tokenizer = AutoTokenizer.from_pretrained(model_name)

model.train()
model.cuda()

# 파라미터 학습 가능하게끔 설정
for param in model.parameters():
    param.requires_grad = True
    
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

In [None]:
#PyTorch CUDA 메모리 관리 최적화 (OOM 방지)
!export PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True

In [None]:
# ORPO 데이터셋 준비
with open(eval_file, "r", encoding="utf-8") as f:
    evaluation_data = json.load(f)
    
orpo_data = []

for item in evaluation_data:
    if item["selected"]:
        prompt_text = f"주제: {item['topic']}\n이 주제에 맞는 시를 작성해 주세요."
        chosen_text = item["poem"]
        rejected_text = ""
        
        tokenized_prompt = tokenizer(prompt_text, truncation=True, padding="max_length", max_length=64, return_tensors="pt")
        tokenized_chosen = tokenizer(chosen_text, truncation=True, padding="max_length", max_length=64, return_tensors="pt")
        tokenized_rejected = tokenizer(rejected_text, truncation=True, padding="max_length", max_length=64, return_tensors="pt")
        
        orpo_data.append({
            "prompt": prompt_text,
            "chosen": chosen_text,
            "rejected": rejected_text,
            "prompt_input_ids": tokenized_prompt["input_ids"].squeeze(0).cuda(),
            "prompt_attention_mask": tokenized_prompt["attention_mask"].squeeze(0).cuda(),
            "chosen_input_ids": tokenized_chosen["input_ids"].squeeze(0).cuda(),
            "chosen_attention_mask": tokenized_chosen["attention_mask"].squeeze(0).cuda(),
            "rejected_input_ids": tokenized_rejected["input_ids"].squeeze(0).cuda(),
            "rejected_attention_mask": tokenized_rejected["attention_mask"].squeeze(0).cuda()
        })
        
        orpo_dataset = Dataset.from_list(orpo_data)
        

In [None]:
# ORPO 설정
from numpy import gradient


orpo_config = ORPOConfig(
    output_dir="./orpo_output",
    per_device_train_batch_size=8,
    gradient_accumulation_steps=16,
    learning_rate=2e6,
    num_train_epochs=5,
    logging_steps=50,
    save_total_limit=2,
    remove_unused_columns=False,
    fp16=True,
    gradient_checkpointing=True,
    max_grad_norm=1.0,
    warmup_steps=100,
    save_steps=500
)

In [None]:
# 데이터 콜레이터 설정
data_colloator = DPODataCollatorWithPadding(
    pad_token_id=tokenizer.pad_token_id,
    label_pad_token_id = -100,
    is_encoder_decoder=False
)

In [None]:
# ORPOTrainer 초기화
orpo_trainer = ORPOTrainer(
    model=model,
    args=orpo_config,
    data_colloator=data_colloator,
    train_dataset=orpo_dataset,
    process_function=tokenizer
)

In [None]:
# torch.cuda.empty_cache()
orpo_trainer.train()
# torch.cuda.empty_cache()

---

### 최종 시 생성

In [None]:
orpr_checkpoint = './orpo_output/checkpoint-xxx'

model = AutoModelForCausalLM.from_pretrained(orpr_checkpoint)
tokenizer = AutoTokenizer.from_pretrained(model_name)

generate_pipeline = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    pad_token_id=tokenizer.eos_token_id
)

In [None]:
def generate_peom_final(num_samples=5):
    topics = ["바람", "비", "노을", "달빛", "안개", "사랑", "이별", "운명", "기다림", "후회", "추억", "시간", "청춘", "변화", "마지막 순간", "군중", "밤거리", "버스", "인생", "빌딩", "사람들", "거짓말", "욕망", "돈", "권력", "비밀", "죽음", "희망", "동물", "자연", "도시", "바다", "산", "하늘", "별", "꽃", "나무", "강", "바위", "흙", "눈", "빗방울", "눈물", "웃음"]
    
    results = []
    
    for _ in range(num_samples):
        topic = random.choice(topics)
        input_text = f"주제: {topic}\n 시"
        poem = generate_poem(
            input_text,
            max_tokens=1000,
            temperature=0.8,
            top_p=0.9,
        )[0]['generated_text']
        
        results.append({
            "topic": topic,
            "poem": poem
        })
    
    return result

In [None]:
generated_poem = generate_peom_final(num_samples=10)
generated_poem