<a href="https://colab.research.google.com/github/jhpython001/deep-learning-from-scratch/blob/master/%EC%8B%9C%EB%8B%88%EC%96%B4%EC%BC%80%EC%96%B4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#01_auto_preprocess.py

In [1]:
# auto_preprocess.py
import re, pandas as pd
from pathlib import Path
import chardet

RAW_CSV = "./sample_data/돌봄인형데이터.csv"
OUT_DIR = Path("./prepared")

# 역할별 후보 컬럼명 사전
COLUMN_CANDIDATES = {
    "id":        ["id","speaker","user_id","화자"],
    "doll_id":   ["doll_id","bot_id","device"],
    "text":      ["text","utterance","sentence","발화"],
    "type":      ["type","label","risk","emotion"],
    "created_at":["created_at","time","timestamp","uttered_at","datetime"]
}

def clean_text(t):
    return re.sub(r"\s+", " ", str(t)).strip() if pd.notna(t) else ""

def auto_map_columns(df: pd.DataFrame):
    """CSV의 실제 컬럼명을 자동으로 매핑해서 표준화"""
    mapping = {}
    for role, candidates in COLUMN_CANDIDATES.items():
        for c in candidates:
            if c in df.columns:
                mapping[role] = c
                break
    return mapping

def read_csv_with_encoding(path, n_bytes=100000, **kwargs):
    # 파일 일부 읽어서 인코딩 추정
    with open(path, "rb") as f:
        raw_data = f.read(n_bytes)
        result = chardet.detect(raw_data)
        enc = result["encoding"]
        conf = result["confidence"]
        print(f"[INFO] Detected encoding: {enc} (confidence: {conf:.2f})")
    # 감지된 인코딩으로 CSV 읽기
    return pd.read_csv(path, encoding=enc, **kwargs)

def main():
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    df = read_csv_with_encoding(RAW_CSV)
    mapping = auto_map_columns(df)
    print("자동 매핑 결과:", mapping)

    # 표준 이름으로 rename
    df = df.rename(columns={v:k for k,v in mapping.items()})

    # 전처리
    if "text" in df:
        df["text"] = df["text"].apply(clean_text)
        df = df[df["text"].str.len() > 0].copy()

    if "type" in df:
        df = df[df["type"].notna()].copy()

    if "created_at" in df:
        df["created_at"] = pd.to_datetime(df["created_at"], errors="coerce")
        df = df.sort_values(["doll_id","created_at","id"], na_position="last")

    out = OUT_DIR / "processed.csv"
    df.to_csv(out, index=False)
    print(f"저장 완료: {out.resolve()} rows={len(df)}")

if __name__ == "__main__":
    main()


[INFO] Detected encoding: EUC-KR (confidence: 0.99)
자동 매핑 결과: {'id': 'id', 'doll_id': 'doll_id', 'text': 'text', 'type': 'type', 'created_at': 'created_at'}
저장 완료: /content/prepared/processed.csv rows=265


#02_split_by_speaker.py

In [2]:
# 02_split_by_speaker.py
import pandas as pd
from sklearn.model_selection import GroupShuffleSplit
from pathlib import Path

IN_CSV = "./prepared/processed.csv"
OUT_DIR = Path("./prepared")

def main():
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    df = pd.read_csv(IN_CSV)

    # id(발화자) 기준 그룹 분할
    gss = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
    tr_idx, tmp_idx = next(gss.split(df, groups=df["id"]))
    train_df, tmp_df = df.iloc[tr_idx].copy(), df.iloc[tmp_idx].copy()

    gss2 = GroupShuffleSplit(n_splits=1, test_size=0.5, random_state=42)
    va_idx, te_idx = next(gss2.split(tmp_df, groups=tmp_df["id"]))
    valid_df, test_df = tmp_df.iloc[va_idx].copy(), tmp_df.iloc[te_idx].copy()

    train_df.to_csv(OUT_DIR / "train.csv", index=False)
    valid_df.to_csv(OUT_DIR / "valid.csv", index=False)
    test_df.to_csv (OUT_DIR / "test.csv",  index=False)

    print("발화자 수:",
          "train", train_df["id"].nunique(),
          "valid", valid_df["id"].nunique(),
          "test",  test_df["id"].nunique())
    print("행 수:", len(train_df), len(valid_df), len(test_df))

if __name__ == "__main__":
    main()


발화자 수: train 212 valid 26 test 27
행 수: 212 26 27


#03_make_windows.py

In [3]:
# 03_make_windows.py
import pandas as pd, numpy as np
from pathlib import Path

SPLITS = ["train","valid","test"]
IN_DIR = Path("./prepared")
OUT_DIR = Path("./prepared")

WINDOW_N = 8
STRIDE = 4
TYPE_LEVELS = ["positive","danger","critical","emergency"]
SEVERITY = {t:i for i,t in enumerate(TYPE_LEVELS)}

def make_windows(df, window_n=8, stride=4, join_for_context=False):
    rows = []
    if "created_at" in df.columns:
        df["created_at"] = pd.to_datetime(df["created_at"], errors="coerce")
        df = df.sort_values(["doll_id","created_at"]).reset_index(drop=True)
    else:
        df = df.sort_values(["doll_id"]).reset_index(drop=True)

    for doll, g in df.groupby("doll_id", sort=False):
        texts = g["text"].tolist()
        types = g["type"].tolist() if "type" in g.columns else [None]*len(g)
        times = g["created_at"].tolist() if "created_at" in g.columns else [None]*len(g)

        for start in range(0, len(texts) - window_n + 1, stride):
            end = start + window_n
            chunk_texts = texts[start:end]
            chunk_types = types[start:end]
            chunk_times = times[start:end]

            item = {
                "doll_id": doll,
                "win_start_idx": start,
                "win_end_idx": end-1,
                "start_time": chunk_times[0],
                "end_time":   chunk_times[-1],
            }
            if join_for_context:
                item["context_text"] = " [UTT] ".join(map(str, chunk_texts))

            if chunk_types[0] is not None:
                sev = [SEVERITY.get(t, 0) for t in chunk_types]
                worst = chunk_types[int(np.argmax(sev))]
                item["type"] = worst

            rows.append(item)
    return pd.DataFrame(rows)

def main():
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    for split in SPLITS:
        df = pd.read_csv(IN_DIR / f"{split}.csv")

        # 집계형 윈도우
        w_agg = make_windows(df, WINDOW_N, STRIDE, join_for_context=False)
        w_agg.to_csv(OUT_DIR / f"windows_n{WINDOW_N}_s{STRIDE}_{split}.csv", index=False)

        # 문맥형 윈도우
        w_ctx = make_windows(df, WINDOW_N, STRIDE, join_for_context=True)
        w_ctx.to_csv(OUT_DIR / f"context_windows_n{WINDOW_N}_s{STRIDE}_{split}.csv", index=False)

        print(split, "윈도우 생성 완료:",
              len(w_agg), "(집계형),", len(w_ctx), "(문맥형)")

if __name__ == "__main__":
    main()

train 윈도우 생성 완료: 12 (집계형), 12 (문맥형)
valid 윈도우 생성 완료: 0 (집계형), 0 (문맥형)
test 윈도우 생성 완료: 0 (집계형), 0 (문맥형)



#train_risk.py

In [22]:
# train_risk.py (robust to HF/accelerate version diffs)
import os, re
import numpy as np
import pandas as pd
import torch, torch.nn as nn

from datasets import Dataset
from sklearn.metrics import (accuracy_score, f1_score, classification_report,
                             precision_recall_fscore_support)

from transformers import (
    AutoTokenizer, AutoModelForSequenceClassification,
    TrainingArguments, Trainer
)

# ===== 경로/하이퍼파라미터 =====
TRAIN_CSV = "prepared/train.csv"
VALID_CSV = "prepared/valid.csv"
TEST_CSV  = "prepared/test.csv"
BASE_MODEL = "beomi/KcELECTRA-base"
OUT_DIR = "./ckpt"
FINAL_DIR = "./ckpt_final"
MAX_LEN = 256
EPOCHS  = 3
BATCH_TRAIN = 8
BATCH_EVAL  = 16
TYPES = ["positive","danger","critical","emergency"]
POS = TYPES.index("positive")
# =================================

def clean_text(t):
    return re.sub(r"\s+", " ", str(t)).strip() if pd.notna(t) else ""

def load_split(path):
    df = pd.read_csv(path)
    assert "text" in df.columns and "type" in df.columns, f"{path}에 text/type 컬럼이 필요합니다."
    df["text"] = df["text"].apply(clean_text)
    df = df[(df["text"].str.len()>0) & df["type"].notna()].reset_index(drop=True)
    return df

print("Loading splits ...")
train_df, valid_df, test_df = load_split(TRAIN_CSV), load_split(VALID_CSV), load_split(TEST_CSV)

# 라벨 매핑
id2type = {i:t for i,t in enumerate(TYPES)}
type2id = {t:i for i,t in enumerate(TYPES)}
for df in (train_df, valid_df, test_df):
    df["label_id"] = df["type"].map(type2id)
    df.dropna(subset=["label_id"], inplace=True)
    df["label_id"] = df["label_id"].astype(int)

def to_hfds(pdf):
    return Dataset.from_pandas(
        pdf[["text","label_id"]].rename(columns={"label_id":"labels"}),
        preserve_index=False
    )

ds_train, ds_valid, ds_test = to_hfds(train_df), to_hfds(valid_df), to_hfds(test_df)

tok = AutoTokenizer.from_pretrained(BASE_MODEL)

def tokenize(batch):
    return tok(batch["text"], truncation=True, max_length=MAX_LEN)

print("Tokenizing ...")
ds_train = ds_train.map(tokenize, batched=True)
ds_valid = ds_valid.map(tokenize, batched=True)
ds_test  = ds_test.map(tokenize,  batched=True)

keep = ["input_ids","attention_mask","labels"]
ds_train = ds_train.remove_columns([c for c in ds_train.column_names if c not in keep])
ds_valid = ds_valid.remove_columns([c for c in ds_valid.column_names if c not in keep])
ds_test  = ds_test.remove_columns([c for c in ds_test.column_names  if c not in keep])

device = torch.device("mps" if torch.backends.mps.is_available()
                      else "cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)

def build_model():
    return AutoModelForSequenceClassification.from_pretrained(
        BASE_MODEL,
        num_labels=len(TYPES),
        id2label=id2type,
        label2id=type2id
    ).to(device)

model = build_model()

# ===== 클래스 불균형 가중치 =====
counts = train_df["label_id"].value_counts().reindex(range(len(TYPES)), fill_value=0).sort_index().values
w = (1.0 / (counts + 1e-6))
w = w / w.sum() * len(TYPES)
class_weights = torch.tensor(w, dtype=torch.float32).to(device)
print("Class weights:", class_weights.cpu().numpy().round(3).tolist())

# ===== 메트릭 =====
def compute_metrics(p):
    logits = p.predictions
    preds = np.argmax(logits, axis=1)
    y = p.label_ids
    acc = accuracy_score(y, preds)
    p_cls, r_cls, f1_cls, _ = precision_recall_fscore_support(
        y, preds, labels=list(range(len(TYPES))), average=None, zero_division=0
    )
    macro_f1 = f1_cls.mean()
    return {
        "accuracy": acc,
        "macro_f1": macro_f1,
        "f1_pos": f1_cls[POS],
        "recall_pos": r_cls[POS],
        "precision_pos": p_cls[POS],
    }

# ===== TrainingArguments 생성 (신버전 우선, 실패 시 최소 인자) =====
def make_training_args():
    try:
        return TrainingArguments(
            output_dir=OUT_DIR,
            per_device_train_batch_size=BATCH_TRAIN,
            per_device_eval_batch_size=BATCH_EVAL,
            learning_rate=2e-5,
            num_train_epochs=EPOCHS,
            evaluation_strategy="steps",
            save_strategy="steps",
            eval_steps=500,
            save_steps=500,
            load_best_model_at_end=True,
            metric_for_best_model="macro_f1",
            greater_is_better=True,
            gradient_accumulation_steps=2,
            logging_steps=50,
            report_to="none",
            fp16=False,
            save_total_limit=3,
        )
    except TypeError:
        print("[WARN] Using minimal TrainingArguments due to old transformers.")
        return TrainingArguments(
            output_dir=OUT_DIR,
            per_device_train_batch_size=BATCH_TRAIN,
            per_device_eval_batch_size=BATCH_EVAL,
            learning_rate=2e-5,
            num_train_epochs=EPOCHS,
            logging_steps=50,
            report_to="none",
        )

args = make_training_args()

# EarlyStopping은 조건 충족 시에만 사용
def supports_early_stopping(_args):
    try:
        has_eval = hasattr(_args, "evaluation_strategy") and str(getattr(_args, "evaluation_strategy", "no")).lower() != "no"
        has_metric = hasattr(_args, "metric_for_best_model") and getattr(_args, "metric_for_best_model", None) is not None
        loads_best = hasattr(_args, "load_best_model_at_end") and bool(getattr(_args, "load_best_model_at_end", False))
        return has_eval and has_metric and loads_best
    except Exception:
        return False

callbacks = []
if supports_early_stopping(args):
    try:
        from transformers import EarlyStoppingCallback
        callbacks = [EarlyStoppingCallback(early_stopping_patience=2)]
    except Exception:
        callbacks = []

# DataCollator: 자동 패딩
data_collator = None
try:
    from transformers import DataCollatorWithPadding
    data_collator = DataCollatorWithPadding(tokenizer=tok)
except Exception:
    pass

# ===== 가중치 CE Trainer (num_items_in_batch 방어) =====
class WeightedTrainer(Trainer):
    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        # num_items_in_batch 등 추가 kwarg가 와도 무시
        labels = inputs.pop("labels")
        outputs = model(**inputs)
        logits = outputs.logits
        loss = nn.CrossEntropyLoss(weight=class_weights)(logits, labels)
        return (loss, outputs) if return_outputs else loss

def build_trainer(_model, _args, use_callbacks=True):
    return WeightedTrainer(
        model=_model,
        args=_args,
        train_dataset=ds_train,
        eval_dataset=ds_valid,
        compute_metrics=compute_metrics,
        tokenizer=tok,
        data_collator=data_collator,
        callbacks=(callbacks if (use_callbacks and callbacks) else [])
    )

trainer = build_trainer(model, args, use_callbacks=True)

# ===== 학습 =====
print("Training ...")
try:
    trainer.train()
except Exception as e:
    # (중요) Accelerate 상태가 깨질 수 있으므로, 완전히 새 객체로 재시작
    print("[WARN] trainer.train() failed:", repr(e))
    print("[INFO] Retrying with fresh model/trainer and minimal TrainingArguments...")
    # 1) 새 모델
    model = build_model()
    # 2) 최소 인자
    minimal_args = TrainingArguments(
        output_dir=OUT_DIR,
        per_device_train_batch_size=BATCH_TRAIN,
        per_device_eval_batch_size=BATCH_EVAL,
        learning_rate=2e-5,
        num_train_epochs=EPOCHS,
        logging_steps=50,
        report_to="none",
    )
    # 3) 새 트레이너 (콜백 없음)
    trainer = build_trainer(model, minimal_args, use_callbacks=False)
    # 4) 재시도
    trainer.train()

# 항상 최종 상태 저장
os.makedirs(FINAL_DIR, exist_ok=True)
trainer.save_model(FINAL_DIR)
try:
    tok.save_pretrained(FINAL_DIR)
except Exception:
    pass

# ===== 평가 =====
def softmax(x):
    x = np.array(x)
    e = np.exp(x - np.max(x, axis=1, keepdims=True))
    return e / e.sum(axis=1, keepdims=True)

print("VALID:", trainer.evaluate())

print("\nTesting ...")
pred_test = trainer.predict(ds_test)
y_true = pred_test.label_ids
y_hat_argmax = np.argmax(pred_test.predictions, axis=1)

print("TEST metrics (argmax):", {
    "accuracy": accuracy_score(y_true, y_hat_argmax),
    "macro_f1": f1_score(y_true, y_hat_argmax, average="macro", zero_division=0)
})
print(classification_report(
    y_true, y_hat_argmax,
    labels=list(range(len(TYPES))),
    target_names=TYPES,
    digits=3,
    zero_division=0
))

# ===== (선택) positive 임계값 튜닝 =====
def tune_positive_threshold(logits, labels, pos_index=POS, grid=None):
    if grid is None:
        grid = np.linspace(0.1, 0.9, 81)
    probs = softmax(logits)
    pos_prob = probs[:, pos_index]
    best_t, best_f1 = 0.5, -1.0
    for t in grid:
        preds = np.argmax(logits, axis=1)
        preds = np.where(pos_prob >= t, pos_index, preds)
        f1 = f1_score(labels, preds, labels=[pos_index], average="macro", zero_division=0)
        if f1 > best_f1:
            best_f1, best_t = f1, t
    return best_t, best_f1

print("\nTuning threshold on VALID ...")
pred_valid = trainer.predict(ds_valid)
t_best, f1_best = tune_positive_threshold(pred_valid.predictions, pred_valid.label_ids, pos_index=POS)
print(f"Best threshold for 'positive': {t_best:.3f} (valid f1_pos={f1_best:.3f})")

probs_test = softmax(pred_test.predictions)
pos_prob_test = probs_test[:, POS]
y_hat_thresh = np.where(pos_prob_test >= t_best, POS, y_hat_argmax)

print("TEST metrics (thresholded positive):", {
    "accuracy": accuracy_score(y_true, y_hat_thresh),
    "macro_f1": f1_score(y_true, y_hat_thresh, average="macro", zero_division=0),
    "f1_pos": f1_score(y_true, y_hat_thresh, labels=[POS], average="macro", zero_division=0)
})
print(classification_report(
    y_true, y_hat_thresh,
    labels=list(range(len(TYPES))),
    target_names=TYPES,
    digits=3,
    zero_division=0
))

print(f"\n[INFO] Final model saved at: {FINAL_DIR}")
print("[TIP] In inference scripts, set CKPT = './ckpt_final'")

Loading splits ...
Tokenizing ...


Map:   0%|          | 0/212 [00:00<?, ? examples/s]

Map:   0%|          | 0/26 [00:00<?, ? examples/s]

Map:   0%|          | 0/27 [00:00<?, ? examples/s]

Device: cuda


Some weights of ElectraForSequenceClassification were not initialized from the model checkpoint at beomi/KcELECTRA-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Class weights: [0.0, 0.0, 0.0, 4.0]
[WARN] Using minimal TrainingArguments due to old transformers.
Training ...


  return WeightedTrainer(


Step,Training Loss
50,1.0932


VALID: {'eval_loss': 0.7994072437286377, 'eval_accuracy': 0.7692307692307693, 'eval_macro_f1': 0.3839285714285714, 'eval_f1_pos': 0.75, 'eval_recall_pos': 0.6428571428571429, 'eval_precision_pos': 0.9, 'eval_runtime': 0.0539, 'eval_samples_per_second': 482.411, 'eval_steps_per_second': 37.109, 'epoch': 3.0}

Testing ...
TEST metrics (argmax): {'accuracy': 0.8148148148148148, 'macro_f1': 0.7145877378435518}
              precision    recall  f1-score   support

    positive      0.750     0.429     0.545         7
      danger      0.826     0.950     0.884        20
    critical      0.000     0.000     0.000         0
   emergency      0.000     0.000     0.000         0

    accuracy                          0.815        27
   macro avg      0.394     0.345     0.357        27
weighted avg      0.806     0.815     0.796        27


Tuning threshold on VALID ...


Best threshold for 'positive': 0.340 (valid f1_pos=0.867)
TEST metrics (thresholded positive): {'accuracy': 0.6666666666666666, 'macro_f1': 0.6493506493506493, 'f1_pos': 0.5714285714285714}
              precision    recall  f1-score   support

    positive      0.429     0.857     0.571         7
      danger      0.923     0.600     0.727        20
    critical      0.000     0.000     0.000         0
   emergency      0.000     0.000     0.000         0

    accuracy                          0.667        27
   macro avg      0.338     0.364     0.325        27
weighted avg      0.795     0.667     0.687        27


[INFO] Final model saved at: ./ckpt_final
[TIP] In inference scripts, set CKPT = './ckpt_final'


#05_infer_utterance.py

In [24]:
# infer_utterance.py (robust)
import pandas as pd, numpy as np, torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification

IN_CSV = "prepared/test.csv"        # 단일 발화 데이터
CKPT   = "./ckpt_final"             # ← 학습 스크립트에서 저장한 최종 모델 폴더
BASE_MODEL = "beomi/KcELECTRA-base"
MAX_LEN = 256
TYPES = ["positive","danger","critical","emergency"]
ACTION_MAP = {
  "positive": "정기 안부/취미 대화 유지",
  "danger":   "주 1회 전화/방문 권유, 수면·식사 체크",
  "critical": "담당자 모니터링 강화, 보호자 통지(24~48h)",
  "emergency":"즉시 관제센터 연결/119 연계, 위치 확인"
}

device = torch.device("mps" if torch.backends.mps.is_available() else "cuda" if torch.cuda.is_available() else "cpu")

# 토크나이저는 CKPT에서 먼저 시도, 없으면 BASE_MODEL로 폴백
try:
    tok = AutoTokenizer.from_pretrained(CKPT)
except Exception:
    tok = AutoTokenizer.from_pretrained(BASE_MODEL)

# 모델은 CKPT에서 로드
model = AutoModelForSequenceClassification.from_pretrained(CKPT).to(device).eval()

# id2label이 문자열 키일 수 있어 안전하게 정렬
if hasattr(model.config, "id2label") and model.config.id2label:
    # 키를 int로 변환 후 index 기준 정렬
    id2type = {int(k): v for k, v in model.config.id2label.items()}
else:
    id2type = {i: t for i, t in enumerate(TYPES)}

def softmax_np(x):
    x = x - x.max()
    e = np.exp(x)
    return e / e.sum()

def predict_one(text: str):
    inputs = tok(str(text), return_tensors="pt", truncation=True, max_length=MAX_LEN).to(device)
    with torch.no_grad():
        logits = model(**inputs).logits[0].detach().cpu().numpy()
    probs = softmax_np(logits)
    pred = id2type[int(probs.argmax())]
    return pred, probs

if __name__ == "__main__":
    df = pd.read_csv(IN_CSV)
    assert "text" in df.columns, "CSV에 text 컬럼이 필요합니다. (01_auto_preprocess.py로 표준화하세요)"

    preds, P = [], []
    for t in df["text"].tolist():
        yhat, p = predict_one(t)
        preds.append(yhat); P.append(p)
    P = np.vstack(P)

    out = df.copy()
    out["pred_type"] = preds

    # 모델 설정 순서대로 확률 컬럼 추가
    ordered_types = [id2type[i] for i in range(len(id2type))]
    for i, tp in enumerate(ordered_types):
        out[f"prob_{tp}"] = np.round(P[:, i], 4)

    out["action"] = out["pred_type"].map(ACTION_MAP)

    # 원래 type(정답)이 있으면 비교에 좋으니 그대로 둠
    out.to_csv("pred_utterance.csv", index=False)
    print("saved: pred_utterance.csv  rows=", len(out))
    print("labels order used:", ordered_types)

saved: pred_utterance.csv  rows= 27
labels order used: ['positive', 'danger', 'critical', 'emergency']
