# 1. 설치 (Colab 첫 셀)

In [None]:
!pip -q install -U transformers datasets accelerate peft evaluate scikit-learn

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m512.3/512.3 kB[0m [31m1.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m557.0/557.0 kB[0m [31m1.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m84.1/84.1 kB[0m [31m1.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m8.9/8.9 MB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m47.7/47.7 MB[0m [31m11.6 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
import os, random, numpy as np
from dataclasses import dataclass
from typing import Dict, Any, List, Optional

import torch
from torch import nn

from datasets import load_dataset, DatasetDict
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    TrainingArguments,
    Trainer,
    DataCollatorWithPadding,
    set_seed,
)
from peft import LoraConfig, get_peft_model, TaskType
from sklearn.metrics import f1_score

#############################################################################
# 일단 테스트 용이라 GPU 설정은 안했는데 학습할 떄는 GPU 쓰는 것을 권장한다 #
#############################################################################
print("torch:", torch.__version__)
print("cuda:", torch.cuda.is_available())



torch: 2.9.0+cpu
cuda: False


# 2 . 설정값

In [None]:
MODEL_NAME = "klue/roberta-base"   # 가볍고 성능 좋은 편
MAX_LENGTH = 128                  # 128부터 시작 (길면 256)
SEED = 42

# CSV 경로: Colab에 업로드한 뒤 경로 지정
CSV_PATH = "/content/kyobo_labeled.csv"

TEXT_COL = "review"
BOOK_ID_COL = "book_id"  # 없으면 None으로 바꾸기: BOOK_ID_COL = None

LABEL_COLS = [
    "label_action",     # 실무 적용성
    "label_thinking",   # 사고 확장성
    "label_depth",      # 지식 밀도
    "label_clarity",    # 가독성/설명력
    "label_relatable",  # 공감성
]

set_seed(SEED)


# 3. 데이터 로드

In [None]:
from google.colab import files
import pandas as pd
import io

print("파일을 선택해주세요:")
uploaded = files.upload()

# 업로드된 파일 이름을 확인합니다.
for fn in uploaded.keys():
  print('사용자가 업로드한 파일 "{name}"의 크기는 {length} 바이트입니다'.format(
      name=fn, length=len(uploaded[fn])))

# 업로드된 파일(여기서는 kyobo_labeled.csv라고 가정)을 Pandas DataFrame으로 읽어옵니다.
# 파일 이름이 다를 경우 uploaded.keys()의 실제 이름으로 변경해야 합니다.
CSV_PATH = "/content/kyobo_labeled.csv"
file_name = next(iter(uploaded)) # 업로드된 첫 번째 파일 이름을 가져옵니다.

# 업로드된 내용을 지정된 CSV_PATH로 저장 (선택 사항, 바로 DataFrame으로 읽을 수 있음)
with open(CSV_PATH, 'wb') as f:
    f.write(uploaded[file_name])

print(f"\n파일이 {CSV_PATH} 경로에 성공적으로 저장되었습니다.")

# 이제 CSV_PATH를 사용하여 데이터를 로드할 수 있습니다.
df = pd.read_csv(CSV_PATH)
print("\n데이터프레임 미리보기:")
print(df.head())

파일을 선택해주세요:


Saving book_review_labeled_original.csv to book_review_labeled_original.csv
사용자가 업로드한 파일 "book_review_labeled_original.csv"의 크기는 14065 바이트입니다

파일이 /content/kyobo_labeled.csv 경로에 성공적으로 저장되었습니다.

데이터프레임 미리보기:
         book_id                                             review  \
0  S000214208202                          이 책이 나오면 겨울이 온다라고 생각이 듭니다   
1  S000214208202  매년 나오는데  24년트렌드는 구매하고 읽지도 못하고 한해가 거의 지나가네요 \n2...   
2  S000214208202           10분만 투자하면 전체 내용을 파악할 수 있는, 가벼이 훑어보면 되는 책   
3  S000214208202                                  매년 운세 보듯이 보게 되는 책   
4  S000214208202  소비자 트렌드 관점에서는 새로운 이슈 출현을 강조 하고자 했으나 점점 트렌드를 억측...   

   label_action  label_thinking  label_depth  label_clarity  label_relatable  
0             0               1            0              0                0  
1             0               0            0              0                0  
2             0               0            0              0                0  
3             0               0            0      

In [None]:
ds = load_dataset("csv", data_files={"data": CSV_PATH}, sep=',')["data"]
print(ds)

# 필수 컬럼 체크
need_cols = [TEXT_COL] + LABEL_COLS + ([BOOK_ID_COL] if BOOK_ID_COL else [])
for c in need_cols:
    if c not in ds.column_names:
        raise ValueError(f"CSV에 '{c}' 컬럼이 없습니다. 현재 컬럼: {ds.column_names}")

# 텍스트/라벨 정리 (라벨을 float32로 맞춤)
def to_float_labels(example):
    labels = [float(example[c]) for c in LABEL_COLS]
    example["labels"] = labels
    return example

ds = ds.map(to_float_labels)

Generating data split: 0 examples [00:00, ? examples/s]

Dataset({
    features: ['book_id', 'review', 'label_action', 'label_thinking', 'label_depth', 'label_clarity', 'label_relatable'],
    num_rows: 100
})


Map:   0%|          | 0/100 [00:00<?, ? examples/s]

# 4. Train-Test Dataset Split (book_id 있으면 책 단위로 split 추천)

In [None]:
def book_level_split(dataset, book_id_col, train_ratio=0.8, val_ratio=0.1):
    # 고유 book_id 목록
    book_ids = list(set(dataset[book_id_col]))
    random.Random(SEED).shuffle(book_ids)

    n_unique_books = len(book_ids)

    # train, validation, test 3개의 분할을 위해 최소 3개의 고유한 책이 필요
    if n_unique_books < 3:
        print(f"경고: {n_unique_books}개의 고유한 책 ID만 발견되었습니다. 리뷰 단위의 랜덤 분할로 대체합니다.")
        # 리뷰 단위의 랜덤 분할 수행
        dsd_review_level = dataset.train_test_split(test_size=0.2, seed=SEED) # 80/20 split
        tmp = dsd_review_level["test"].train_test_split(test_size=0.5, seed=SEED) # 20%를 10/10으로 val/test split
        return DatasetDict(train=dsd_review_level["train"], validation=tmp["train"], test=tmp["test"])
    else:
        # 충분한 고유한 책 ID가 있는 경우 책 단위 분할 진행
        n = n_unique_books
        n_train = int(n * train_ratio)
        n_val = int(n * val_ratio)

        # 0이 되지 않도록 최소 1개는 할당 (비율이 너무 작아 0이 될 경우)
        if n_train == 0 and n >= 1: n_train = 1
        if n_val == 0 and n - n_train >= 1: n_val = 1
        # 남은 책이 없어서 test_books가 비는 것을 방지하기 위해 (극단적인 경우)
        if n - n_train - n_val <= 0:
            if n_val > 0: n_val -= 1
            elif n_train > 0: n_train -= 1

        train_books = set(book_ids[:n_train])
        val_books   = set(book_ids[n_train:n_train+n_val])
        test_books  = set(book_ids[n_train+n_val:])

        def filt(split_set):
            if not split_set:
                # 분할 세트가 비어있는 경우 빈 데이터셋 반환
                return dataset.filter(lambda x: False)
            return dataset.filter(lambda x: x[book_id_col] in split_set)

        return DatasetDict(
            train=filt(train_books),
            validation=filt(val_books),
            test=filt(test_books),
        )

if BOOK_ID_COL:
    dsd = book_level_split(ds, BOOK_ID_COL)
else:
    # 랜덤 split (기존 코드)
    dsd = ds.train_test_split(test_size=0.2, seed=SEED)
    tmp = dsd["test"].train_test_split(test_size=0.5, seed=SEED)
    dsd = DatasetDict(train=dsd["train"], validation=tmp["train"], test=tmp["test"])

print(dsd)
print("train/val/test:", len(dsd["train"]), len(dsd["validation"]), len(dsd["test"]))

경고: 1개의 고유한 책 ID만 발견되었습니다. 리뷰 단위의 랜덤 분할로 대체합니다.
DatasetDict({
    train: Dataset({
        features: ['book_id', 'review', 'label_action', 'label_thinking', 'label_depth', 'label_clarity', 'label_relatable', 'labels'],
        num_rows: 80
    })
    validation: Dataset({
        features: ['book_id', 'review', 'label_action', 'label_thinking', 'label_depth', 'label_clarity', 'label_relatable', 'labels'],
        num_rows: 10
    })
    test: Dataset({
        features: ['book_id', 'review', 'label_action', 'label_thinking', 'label_depth', 'label_clarity', 'label_relatable', 'labels'],
        num_rows: 10
    })
})
train/val/test: 80 10 10


In [None]:
dsd['test'][0]

{'book_id': 'S000214208202',
 'review': '매년 많은분들이 연례행사처럼 읽어보는 책^^',
 'label_action': 0,
 'label_thinking': 0,
 'label_depth': 0,
 'label_clarity': 0,
 'label_relatable': 0,
 'labels': [0.0, 0.0, 0.0, 0.0, 0.0]}

# 5. Tokenization

In [None]:
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

def tokenize_fn(batch):
    return tokenizer(
        batch[TEXT_COL],
        truncation=True,
        max_length=MAX_LENGTH,
    )

dsd_tok = dsd.map(tokenize_fn, batched=True)

# Trainer가 labels만 보면 되게 정리
keep_cols = ["input_ids", "attention_mask", "labels"]
if "token_type_ids" in dsd_tok["train"].column_names:
    keep_cols.append("token_type_ids")

dsd_tok = dsd_tok.remove_columns([c for c in dsd_tok["train"].column_names if c not in keep_cols])
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/375 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/173 [00:00<?, ?B/s]

Map:   0%|          | 0/80 [00:00<?, ? examples/s]

Map:   0%|          | 0/10 [00:00<?, ? examples/s]

Map:   0%|          | 0/10 [00:00<?, ? examples/s]

In [None]:
# HuggingFace가 문장을 tokenization한 결과
dsd_tok['test'][0]

{'labels': [0.0, 0.0, 0.0, 0.0, 0.0],
 'input_ids': [0,
  5088,
  1039,
  2073,
  2377,
  7285,
  21844,
  16049,
  7925,
  1508,
  15882,
  2259,
  1644,
  65,
  65,
  2],
 'token_type_ids': [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
 'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]}

# 6. 모델 + LoRA 적용

In [None]:
model = AutoModelForSequenceClassification.from_pretrained(
    MODEL_NAME,
    num_labels=len(LABEL_COLS),
)

# 멀티라벨 설정 (transformers 내장 loss도 BCE로 잡히지만,
# 아래에서 Trainer에서 커스텀 loss로 확실히 고정할 거야)
model.config.problem_type = "multi_label_classification"

# LoRA 설정 (Roberta/BERT 계열에서 흔히 q/v 혹은 query/value를 타깃)
# klue/roberta-base는 roberta 구조라 아래 타깃이 잘 먹는 편
lora_config = LoraConfig(
    task_type=TaskType.SEQ_CLS,
    r=8,
    lora_alpha=16,
    lora_dropout=0.05,
    bias="none",
    target_modules=["query", "value"],  # 핵심
)

model = get_peft_model(model, lora_config)
model.print_trainable_parameters()

config.json:   0%|          | 0.00/546 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/443M [00:00<?, ?B/s]

Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at klue/roberta-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


trainable params: 889,349 || all params: 111,511,306 || trainable%: 0.7975


# 7. Metric 설정 & Loss function 설정

In [None]:
def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def compute_metrics(eval_pred):
    logits, labels = eval_pred
    probs = sigmoid(logits)

    # 기본 threshold=0.5
    preds = (probs >= 0.5).astype(int)
    labels_int = labels.astype(int)

    micro = f1_score(labels_int, preds, average="micro", zero_division=0)
    macro = f1_score(labels_int, preds, average="macro", zero_division=0)

    # 라벨별 F1도 보고 싶으면 아래 추가
    per_label = f1_score(labels_int, preds, average=None, zero_division=0)
    metrics = {"f1_micro": micro, "f1_macro": macro}
    for i, name in enumerate(LABEL_COLS):
        metrics[f"f1_{name}"] = per_label[i]
    return metrics

class MultiLabelTrainer(Trainer):
    def __init__(self, pos_weight: Optional[torch.Tensor] = None, **kwargs):
        super().__init__(**kwargs)
        self.pos_weight = pos_weight

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        labels = inputs.pop("labels")
        outputs = model(**inputs)
        logits = outputs.logits

        # labels: (batch, 5), logits: (batch, 5)
        loss_fct = nn.BCEWithLogitsLoss(pos_weight=self.pos_weight)
        loss = loss_fct(logits, labels)

        return (loss, outputs) if return_outputs else loss

# 8. positive_weight 계산 (for imbalanced dataset)

왜 쓰는 지는 좀 더 확인이 필요해보임

In [None]:
def calc_pos_weight(train_ds, label_cols):
    labels = np.array(train_ds["labels"])  # shape (N,5)
    P = labels.sum(axis=0)
    N = labels.shape[0]
    # 0 division 방지
    P = np.where(P == 0, 1.0, P)
    pos_weight = (N - P) / P
    return torch.tensor(pos_weight, dtype=torch.float32)

pos_weight = calc_pos_weight(dsd_tok["train"], LABEL_COLS)
pos_weight = pos_weight.to("cuda") if torch.cuda.is_available() else pos_weight
print("pos_weight:", pos_weight)

pos_weight: tensor([ 7.0000,  9.0000, 79.0000, 12.3333, 19.0000])


# 9 .TrainingArguments (Colab 안정 세팅)

In [None]:
# from transformers import TrainingArguments

output_dir = "./kyobo_multilabel_lora"

args = TrainingArguments(
    output_dir=output_dir,
    learning_rate=2e-4,          # LoRA는 보통 조금 높게도 OK # 중요
    per_device_train_batch_size=16, # 중요
    per_device_eval_batch_size=32,
    num_train_epochs=3, # 중요
    weight_decay=0.01, # 약간 중요
    eval_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    metric_for_best_model="f1_macro",
    greater_is_better=True,
    logging_steps=50,
    fp16=torch.cuda.is_available(),
    report_to="none",
    save_total_limit=2,
)

# 10. Train

In [None]:
trainer = MultiLabelTrainer(
    model=model,
    args=args,
    train_dataset=dsd_tok["train"],
    eval_dataset=dsd_tok["validation"],
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    pos_weight=pos_weight,  # 불균형 없으면 None으로 바꿔도 됨
)

trainer.train()

  super().__init__(**kwargs)


Epoch,Training Loss,Validation Loss,F1 Micro,F1 Macro,F1 Label Action,F1 Label Thinking,F1 Label Depth,F1 Label Clarity,F1 Label Relatable
1,No log,0.788555,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2,No log,0.782609,0.0,0.0,0.0,0.0,0.0,0.0,0.0
3,No log,0.779729,0.0,0.0,0.0,0.0,0.0,0.0,0.0




TrainOutput(global_step=15, training_loss=1.3417550404866536, metrics={'train_runtime': 250.4158, 'train_samples_per_second': 0.958, 'train_steps_per_second': 0.06, 'total_flos': 12386958808896.0, 'train_loss': 1.3417550404866536, 'epoch': 3.0})

# 11. 평가 (Test) : F1_Score = Precision & Recall 의 조화평균

In [None]:
test_metrics = trainer.evaluate(dsd_tok["test"])
print(test_metrics)



{'eval_loss': 0.7182961702346802, 'eval_f1_micro': 0.0, 'eval_f1_macro': 0.0, 'eval_f1_label_action': 0.0, 'eval_f1_label_thinking': 0.0, 'eval_f1_label_depth': 0.0, 'eval_f1_label_clarity': 0.0, 'eval_f1_label_relatable': 0.0, 'eval_runtime': 1.717, 'eval_samples_per_second': 5.824, 'eval_steps_per_second': 0.582, 'epoch': 3.0}


In [None]:
pd.Series(test_metrics)

Unnamed: 0,0
eval_loss,0.718296
eval_f1_micro,0.0
eval_f1_macro,0.0
eval_f1_label_action,0.0
eval_f1_label_thinking,0.0
eval_f1_label_depth,0.0
eval_f1_label_clarity,0.0
eval_f1_label_relatable,0.0
eval_runtime,1.717
eval_samples_per_second,5.824


# 12. 예측

In [None]:
label_names = LABEL_COLS

def predict_labels(texts, threshold=0.5):
    model.eval()
    with torch.no_grad():
        tok = tokenizer(texts, return_tensors="pt", truncation=True, max_length=MAX_LENGTH, padding=True)
        tok = {k: v.to(model.device) for k, v in tok.items()}
        logits = model(**tok).logits.detach().cpu().numpy()
        probs = sigmoid(logits)
        preds = (probs >= threshold).astype(int)

    results = []
    for t, p, pr in zip(texts, preds, probs):
        tags = [label_names[i] for i in range(len(label_names)) if p[i] == 1]
        results.append({"text": t, "tags": tags, "probs": pr.tolist()})
    return results

# samples = [
#     "실무에서 바로 적용할 수 있는 체크리스트가 많아서 좋았습니다.",
#     "내용은 깊은데 어렵고 읽기 힘들었어요.",
#     "관점을 바꿔주는 통찰이 많아 생각이 확장됐습니다."
# ]
# ✅ 엑셀/CSV 전체 리뷰(102행)를 리스트로 만들기
# df 대신 원본 데이터셋을 가진 ds를 사용
all_texts = list(ds[TEXT_COL]) # 명시적으로 list()로 변환

# ✅ 전체 예측
predict_results = predict_labels(all_texts, threshold=0.5)

In [None]:
predict_results = predict_labels(all_texts)

In [None]:
# 1. 딕셔너리 리스트를 바로 DataFrame으로 변환
df = pd.DataFrame(predict_results)

# 2. 'probs' 리스트를 분리하여 별도 컬럼으로 추가 (선택 사항)
# 각 probs 값에 대한 컬럼명을 지정합니다.
prob_columns = LABEL_COLS

# 'probs' 리스트를 DataFrame으로 변환하여 기존 df에 병합ㅠ
df_probs = pd.DataFrame(df['probs'].tolist(), columns=prob_columns)
df = pd.concat([df.drop('probs', axis=1), df_probs], axis=1)

# DataFrame을 CSV 파일로 저장
df.to_csv("predict_results.csv", index=False)

In [None]:
df

Unnamed: 0,text,tags,label_action,label_thinking,label_depth,label_clarity,label_relatable
0,이 책이 나오면 겨울이 온다라고 생각이 듭니다,[],0.439743,0.443523,0.341336,0.471272,0.410864
1,매년 나오는데 24년트렌드는 구매하고 읽지도 못하고 한해가 거의 지나가네요 \n2...,[],0.438919,0.442775,0.333920,0.481019,0.405651
2,"10분만 투자하면 전체 내용을 파악할 수 있는, 가벼이 훑어보면 되는 책",[],0.444989,0.442580,0.347962,0.471911,0.415484
3,매년 운세 보듯이 보게 되는 책,[],0.440026,0.443172,0.343632,0.471608,0.412157
4,소비자 트렌드 관점에서는 새로운 이슈 출현을 강조 하고자 했으나 점점 트렌드를 억측...,[],0.448597,0.443017,0.344637,0.481657,0.408509
...,...,...,...,...,...,...,...
95,2025년의 전망은 다소 암울하지만 어려움 속에서도 새로운 트렌드는 계속 만들어지고...,[],0.439010,0.439689,0.338429,0.477474,0.412785
96,❤️❤️❤️❤️❤️,[],0.435073,0.436957,0.337177,0.476131,0.415045
97,빠른배송 잘 읽겠습니다,[],0.439255,0.442851,0.339198,0.470007,0.412679
98,내용이 좋아요 최고최고에용,[],0.440396,0.437904,0.338480,0.472583,0.411880


In [None]:
from google.colab import files
import os

# 이미 "predict_results.csv" 파일이 생성되어 있으므로 다운로드합니다.
# 파일 이름을 지정하여 다운로드할 수 있습니다.
file_to_download = "predict_results.csv"

# 파일이 존재하는지 확인 후 다운로드
if os.path.exists(file_to_download):
    files.download(file_to_download)
    print(f"'{file_to_download}' 파일 다운로드 완료!")
else:
    print(f"오류: '{file_to_download}' 파일을 찾을 수 없습니다. 이전 단계에서 파일이 제대로 생성되었는지 확인해주세요.")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

'predict_results.csv' 파일 다운로드 완료!
