## ENV

In [None]:
from google.colab import drive
from google.colab import userdata
# Mount Google Drive at /content/drive so later cells can read and write files there.
drive.mount('/content/drive')

## Installation

In [None]:
!pip install -q transformers datasets peft accelerate bitsandbytes

# Dataset Prep

In [None]:
# Data loading
import os
import json
import pandas as pd

# Project folders on the mounted Drive.
burnfit_dir = os.path.join("/content/drive/MyDrive", "Burnfit")
input_dir = os.path.join(burnfit_dir, "input")
output_dir = os.path.join(burnfit_dir, "output")

# Directory listings; os.listdir raises here if either folder is missing.
inputs = os.listdir(input_dir)
outputs = os.listdir(output_dir)

# Each table: resolve the CSV path, then load it. Load order is unchanged.
input_user_profile_csv = os.path.join(input_dir, "input_user.csv")
input_user_profile_df = pd.read_csv(input_user_profile_csv)

input_user_1rm_csv = os.path.join(input_dir, "input_user_rm.csv")
input_user_1rm_df = pd.read_csv(input_user_1rm_csv)

output_increase_rate_csv = os.path.join(output_dir, "output_increase_rate.csv")
output_increase_rate_df = pd.read_csv(output_increase_rate_csv)

output_weekly_increase_rate_plan_csv = os.path.join(output_dir, "output_weekly_increase_rate_plan.csv")
output_weekly_increase_rate_plan_df = pd.read_csv(output_weekly_increase_rate_plan_csv)

# Show the column index of every table, one per line.
print(
    input_user_profile_df.columns,
    input_user_1rm_df.columns,
    output_increase_rate_df.columns,
    output_weekly_increase_rate_plan_df.columns,
    sep="\n",
)

In [None]:
# Question
def question_formmatter(input_row):
  """Render one user's profile and 1RM values as the markdown "question" block.

  Args:
    input_row: any object indexable with `[]` by the Korean column names
      used below (for example a pandas Series or a dict).

  Returns:
    The question lines joined with newlines, without a trailing newline.

  Prompts used at inference time have to reproduce this layout exactly,
  including the program line, which has no space after the leading dash.
  """
  profile_keys = ("운동경험", "성별", "운동목표")
  lift_keys = ("벤치프레스", "스쿼트", "데드리프트", "오버헤드프레스")

  header_lines = ["## 질문", "-5/3/1 프로그램"]
  profile_lines = [f"- **{key}**: {input_row[key]}" for key in profile_keys]
  lift_lines = [f"  - {key}: {input_row[key]}" for key in lift_keys]

  return "\n".join(header_lines + profile_lines + ["- **1RM**:"] + lift_lines)

# Answer
def answer_formmatter(output_row):
  """Render one row of target values as the "answer" block of a training text.

  Args:
    output_row: any object indexable with `[]` by the column names used
      below (for example a pandas Series or a dict).

  Returns:
    The answer heading, a pretty-printed JSON object (2-space indent,
    non-ASCII characters kept as-is) and a closing `<END>` marker line.
    The four rate fields are stringified; the weekly plan is passed through
    unchanged.
  """
  rate_keys = ("init_weight_rate", "increase_rate_week", "increase_rate_set", "deloading_rate")

  payload = {"program": "5/3/1 프로그램"}
  for key in rate_keys:
    payload[key] = str(output_row[key])
  # The JSON key differs from the source column name on purpose: keep both as they are.
  payload["weekly_weight_increase_plan"] = output_row["weekly_increase_rate_plan"]

  body = json.dumps(payload, ensure_ascii=False, indent=2)
  return "## 답변\n" + body + "\n<END>"


## Question + answer
def question_answer_formmatter(input_user_profile_df, input_user_1rm_df, output_increase_rate_df, output_weekly_increase_rate_plan_df, preview_rows=5):
    """Join the four tables on "id" and build one training text per joined row.

    Args:
        input_user_profile_df: table providing the profile columns.
        input_user_1rm_df: table providing the four 1RM columns.
        output_increase_rate_df: table providing the rate columns.
        output_weekly_increase_rate_plan_df: table providing the weekly plan column.
        preview_rows: how many of the first generated texts to print as a
            sanity check (default 5, matching the previous behaviour).

    Returns:
        A DataFrame with columns "id" and "text". Each text is the question
        block, a blank line, then the answer block. The columns are present
        even when the joins produce no rows.

    Note:
        The merges use pandas' default inner join, so an id missing from any
        of the four tables is dropped from the result.
    """
    question_cols = ["운동경험", "성별", "운동목표", "벤치프레스", "스쿼트", "데드리프트", "오버헤드프레스"]
    answer_cols = ["init_weight_rate", "increase_rate_week", "increase_rate_set", "deloading_rate", "weekly_increase_rate_plan"]

    combined_df = (
        input_user_profile_df
        .merge(input_user_1rm_df, on="id")
        .merge(output_increase_rate_df, on="id")
        .merge(output_weekly_increase_rate_plan_df, on="id")
    )

    rows = []
    # Count rows positionally instead of relying on the index label from iterrows().
    for position, (_, row) in enumerate(combined_df.iterrows()):
        question = question_formmatter(row[question_cols])
        answer = answer_formmatter(row[answer_cols])
        full_text = f"{question}\n\n{answer}"
        rows.append({"id": row["id"], "text": full_text})
        if position < preview_rows:
            print(full_text)

    # Explicit columns keep the schema stable when `rows` is empty.
    return pd.DataFrame(rows, columns=["id", "text"])

In [None]:
# Build the training texts and report the size of the resulting table.
text_df = question_answer_formmatter(
    input_user_profile_df,
    input_user_1rm_df,
    output_increase_rate_df,
    output_weekly_increase_rate_plan_df,
)
row_count, column_count = text_df.shape
print("Rows:", row_count)
print("Columns:", column_count)

# Fine-tuning Process

In [None]:
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig

model_name = "EleutherAI/polyglot-ko-1.3b"

tokenizer = AutoTokenizer.from_pretrained(model_name)
# Register the end-of-answer marker before any text is tokenized, so that the
# "<END>" at the end of every training text is encoded as one dedicated id
# rather than being split into ordinary sub-word pieces.
# Registering the same token again later is a no-op.
tokenizer.add_special_tokens({"additional_special_tokens": ["<END>"]})

model = AutoModelForCausalLM.from_pretrained(
    model_name,
    # Passing `load_in_8bit=True` directly is deprecated; the supported form
    # is a BitsAndBytesConfig handed over as `quantization_config`.
    quantization_config=BitsAndBytesConfig(load_in_8bit=True),
    torch_dtype="auto",
    device_map="auto"
)
# Grow the embedding matrices to cover the new token id while the model is
# still the plain base model (i.e. before it is wrapped by PEFT).
model.resize_token_embeddings(len(tokenizer))

In [None]:
from peft import LoraConfig, TaskType, get_peft_model, prepare_model_for_kbit_training

# The base model is loaded in 8-bit. PEFT's documented recipe for training a
# quantized model is to run this preparation step before attaching adapters.
model = prepare_model_for_kbit_training(model)

# No `target_modules` is given, so PEFT falls back to its built-in default for
# this architecture.
# NOTE(review): no `modules_to_save` is set either, so rows added to the
# embedding matrices for new tokens (e.g. "<END>") are presumably neither
# trained nor stored in the adapter checkpoint — confirm whether the embedding
# layers should be listed here.
peft_config = LoraConfig(
    r=8,
    lora_alpha=16,
    lora_dropout=0.05,
    bias="none",
    task_type=TaskType.CAUSAL_LM
)

# NOTE: this rebinds `model`; re-running the cell would wrap the model again,
# so reload the base model first when iterating on the config.
model = get_peft_model(model, peft_config)
model.print_trainable_parameters()

In [None]:
import json

# Report the table size, then export only the "text" column as JSON Lines:
# one {"text": ...} record per line, with non-ASCII characters written as-is.
print("Rows:", len(text_df.index))
print("Columns:", len(text_df.columns))

text_df[["text"]].to_json(
    "training_data.jsonl",
    orient="records",
    lines=True,
    force_ascii=False,
)

In [None]:
from datasets import load_dataset

# Read the exported JSON Lines file back and keep its "train" split.
dataset_dict = load_dataset("json", data_files="training_data.jsonl")
dataset = dataset_dict["train"]

def tokenize(example):
    """Tokenize a batch of training texts to a fixed length of 512 tokens.

    Shorter texts are padded up to 512 and longer ones are truncated.
    NOTE(review): truncation would cut off the trailing "<END>" marker of any
    text longer than 512 tokens — confirm no example exceeds the limit.
    """
    texts = example["text"]
    return tokenizer(
        texts,
        max_length=512,
        padding="max_length",
        truncation=True,
    )

tokenized_dataset = dataset.map(tokenize, batched=True)

In [None]:
# Output location of this run: <burnfit_dir>/fine_tunned/lora-output-v2
fine_tunned_dir = os.path.join(burnfit_dir, "fine_tunned")
path_to_save = os.path.join(burnfit_dir, "fine_tunned", "lora-output-v2")

In [None]:
from transformers import TrainingArguments, Trainer, DataCollatorForLanguageModeling

## END token
# Register "<END>" as a special token and resize the embeddings to match the
# enlarged vocabulary. Both calls are no-ops if the token is already registered
# and the embeddings already have the right size.
# For "<END>" to be encoded as this single id, the dataset must be tokenized
# AFTER this registration; re-run the tokenization cell if it ran earlier.
# NOTE(review): `model` is already PEFT-wrapped here and the LoRA config sets no
# `modules_to_save`, so the new embedding row is presumably neither trained nor
# saved with the adapter — confirm.
tokenizer.add_special_tokens({"additional_special_tokens": ["<END>"]})
model.resize_token_embeddings(len(tokenizer))

training_args = TrainingArguments(
    output_dir=path_to_save,  # checkpoints are written here
    per_device_train_batch_size=8,
    gradient_accumulation_steps=2,  # 8 x 2 = 16 sequences per optimizer step per device
    num_train_epochs=3,
    learning_rate=2e-4,
    bf16=True,  # NOTE(review): needs a bf16-capable GPU — confirm for the Colab runtime in use
    logging_steps=20,
    save_strategy="epoch",  # one checkpoint at the end of every epoch
    save_total_limit=2,  # keep at most two checkpoints on disk
    report_to="none",  # no external experiment tracker
    optim="paged_adamw_8bit"
)

# mlm=False selects causal-LM collation (no masked-language-model masking).
data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False)

trainer = Trainer(
    model=model,
    tokenizer=tokenizer,
    args=training_args,
    train_dataset=tokenized_dataset,
    data_collator=data_collator
)

In [None]:
# Run the fine-tuning; checkpoints go to the `output_dir` configured in `training_args`.
trainer.train()

# Testing

## Before fine-tuning

In [None]:
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig, pipeline
import torch

model_name = "EleutherAI/polyglot-ko-1.3b"

# Baseline for comparison: the untouched base model, without any adapter.
# NOTE: this rebinds the global `tokenizer` and `model`, so the objects used
# for training are no longer reachable under those names after this cell runs.
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    device_map="auto",
    torch_dtype=torch.float16,
    # Passing `load_in_8bit=True` directly is deprecated; the supported form
    # is a BitsAndBytesConfig handed over as `quantization_config`.
    quantization_config=BitsAndBytesConfig(load_in_8bit=True),
)

generator = pipeline("text-generation", model=model, tokenizer=tokenizer)

In [None]:
# The prompt has to reproduce the training layout exactly. The program line is
# therefore written the way `question_formmatter` emits it ("-5/3/1 프로그램"),
# not as "- 5/3/1프로그램".
# NOTE(review): confirm that the 1RM values in the training CSV carry the same
# "kg" suffix as the values used here.
prompt = """## 질문
-5/3/1 프로그램
- **운동경험**: 1
- **성별**: 남성
- **운동목표**: 6
- **1RM**:
  - 벤치프레스: 30kg
  - 스쿼트: 30kg
  - 데드리프트: 30kg
  - 오버헤드프레스: 30kg

## 답변
"""

# Sampling is enabled and no seed is set, so the output differs between runs.
output = generator(
    prompt,
    max_new_tokens=200,
    temperature=0.7,
    do_sample=True,
    top_p=0.9,
    repetition_penalty=1.1,
)

print(output[0]["generated_text"])

## After fine-tuning

#### V1

In [None]:
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
from peft import PeftModel

# Locate the v1 adapter checkpoint on Drive (relies on `os` and `burnfit_dir`
# from the data-loading cell).
fine_tunned_dir = os.path.join(burnfit_dir, "fine_tunned")
path_to_v1 = os.path.join(fine_tunned_dir, "lora-output-v1")
checkpoint = os.path.join(path_to_v1, "checkpoint-186")
print(checkpoint)
# Load the base model, attach the LoRA adapter stored in the checkpoint folder,
# and read the tokenizer from that same folder.
base_model = AutoModelForCausalLM.from_pretrained("EleutherAI/polyglot-ko-1.3b", device_map="auto")
lora_model = PeftModel.from_pretrained(base_model, checkpoint)
tokenizer = AutoTokenizer.from_pretrained(checkpoint)

# NOTE: rebinds the global `generator` (and `tokenizer`) used by earlier cells.
generator = pipeline("text-generation", model=lora_model, tokenizer=tokenizer)

In [None]:
# The prompt has to reproduce the training layout exactly. The program line is
# therefore written the way `question_formmatter` emits it ("-5/3/1 프로그램"),
# not as "- 5/3/1프로그램".
# NOTE(review): this assumes the v1 adapter was trained on texts produced by
# the formatter as it appears in this notebook — confirm.
prompt = """## 질문
-5/3/1 프로그램
- **운동경험**: 3
- **성별**: 남성
- **운동목표**: 1
- **1RM**:
  - 벤치프레스: 60kg
  - 스쿼트: 70kg
  - 데드리프트: 70kg
  - 오버헤드프레스: 70kg

## 답변
"""

# Sampling is enabled and no seed is set, so the output differs between runs.
output = generator(
    prompt,
    max_new_tokens=260,
    temperature=0.7,
    do_sample=True,
    top_p=0.9,
    repetition_penalty=1.1,
)

print(output[0]["generated_text"])

#### V2: Adding Special Token

In [None]:
from transformers import AutoTokenizer, AutoModelForCausalLM
from peft import PeftModel
import os
import torch

# Locate the v2 adapter checkpoint (relies on `fine_tunned_dir` from an earlier cell).
path_to_v2 = os.path.join(fine_tunned_dir, "lora-output-v2")
checkpoint = os.path.join(path_to_v2, "checkpoint-186")
print(checkpoint)

# Read the tokenizer from the checkpoint folder, make sure "<END>" is registered
# as a special token, and use it as the end-of-sequence token for generation.
tokenizer = AutoTokenizer.from_pretrained(checkpoint)
tokenizer.add_special_tokens({"additional_special_tokens": ["<END>"]})
tokenizer.eos_token = "<END>"
eos_token_id = tokenizer.convert_tokens_to_ids("<END>")

# The base model's embeddings must be resized to the enlarged vocabulary before
# the adapter is attached.
# NOTE(review): the LoRA config in this notebook sets no `modules_to_save`, so the
# adapter checkpoint presumably carries no weights for the added embedding row; the
# "<END>" row would then be freshly initialised here rather than trained — confirm.
base_model = AutoModelForCausalLM.from_pretrained("EleutherAI/polyglot-ko-1.3b", device_map="auto")
base_model.resize_token_embeddings(len(tokenizer))

# NOTE: rebinds the global `model`.
model = PeftModel.from_pretrained(base_model, checkpoint)

In [None]:
# The prompt has to reproduce the training layout exactly. The program line is
# therefore written the way `question_formmatter` emits it ("-5/3/1 프로그램"),
# not as "- 5/3/1프로그램".
prompt = """## 질문
-5/3/1 프로그램
- **운동경험**: 3
- **성별**: 남성
- **운동목표**: 2
- **1RM**:
  - 벤치프레스: 60kg
  - 스쿼트: 70kg
  - 데드리프트: 70kg
  - 오버헤드프레스: 70kg

## 답변
"""

inputs = tokenizer(prompt, return_tensors="pt").to(model.device)

# Drop token_type_ids if the tokenizer produced them (presumably because
# generate() does not accept them for this model — confirm).
inputs.pop("token_type_ids", None)

# Sampling is enabled and no seed is set, so the output differs between runs.
# Generation stops at "<END>" or after 300 new tokens, whichever comes first.
outputs = model.generate(
    **inputs,
    max_new_tokens=300,
    eos_token_id=eos_token_id,
    do_sample=True,
    top_p=0.9,
    temperature=0.7,
    repetition_penalty=1.1,
)

# Special tokens are kept in the decoded text so that "<END>" stays visible.
decoded = tokenizer.decode(outputs[0])

# Print everything up to and including the first "<END>". If the model never
# produced the marker, print the full text without appending one, so a run
# that hit max_new_tokens is not mistaken for a cleanly terminated answer.
text_before_end, end_marker, _text_after_end = decoded.partition("<END>")
print(text_before_end + end_marker)