# 다음 상태 예측
### 사용자 세션의 현재/이전 상태와 탭형 컨텍스트를 입력으로 받아 LSTM으로 다음 상태를 확률적으로 예측하는 실습입니다.

# 0) 패키지 설치
- 실습에 필요한 패키지를 설치합니다.

In [None]:
!pip install -U pyarrow==15.0.2 fastparquet pandas torch optuna scikit-learn

# 1) 경로·시드·하이퍼파라미터 설정
- 데이터/모델 저장 경로를 만들고, 재현성 시드와 LSTM 학습 하이퍼파라미터, 디바이스를 지정합니다.

In [None]:
from pathlib import Path
import json, pandas as pd, numpy as np
import torch

# ---- 디렉토리/파일 경로 ----
DATA_DIR = Path("/home/jovyan/datasets/next_state_pre")  # 전처리 및 산출물 저장 디렉토리
DATA_DIR.mkdir(parents=True, exist_ok=True)
MODEL_DIR = Path("/home/jovyan/models/next_state_model") # 모델 및 하이퍼파라미터 저장 디렉토리
MODEL_DIR.mkdir(parents=True, exist_ok=True)

RAW_PARQUET = Path("/home/jovyan/datasets/processed_user_behavior.sorted.parquet")  # 원천 파케
MAP_JSON    = DATA_DIR / "state_mapping.json"            # state 인덱스 매핑 저장
SPLIT_JSON  = DATA_DIR / "train_val_sessions.json"       # train/val 세션 분할 저장
PAIR_NPY    = DATA_DIR / "observed_prev_pairs.npy"       # 관측된 (prev2, prev1) 쌍 저장
BEST_PATH   = MODEL_DIR / "best_model.pt"                # 베스트 체크포인트(학습 상태 dict) 저장

# ---- 시드/분할 ----
SEED = 17
VAL_FRAC = 0.2  # 검증 세트 비율
np.random.seed(SEED)

# ---- 하이퍼파라미터 ----
HYPERPARAMS = {
    "emb_dim": 192,        # 상태 임베딩 차원
    "pair_dim": 16,        # (prev2, prev1) 선형투영 차원
    "tab_dim": 16,         # 표형(tab) 피처 투영 차원: cart/page/idle/search 4D -> 16D
    "hid": 384,            # LSTM hidden size
    "num_layers": 2,       # LSTM layer 수
    "dropout": 0.2,        # LSTM 출력 드롭아웃
    "lr": 3e-4,            # learning rate
    "weight_decay": 5e-4,  # AdamW weight decay
    "batch_size": 64,
    "epochs": 50,
    "patience": 7,         # Early stopping patience (연속 bad epoch 수)
    "use_amp": True,       # CUDA 있을 때 자동 혼합정밀도 사용
    "grad_clip_norm": 1.0, # gradient clipping L2 norm
}

# ---- 디바이스 ----
device = "cuda" if torch.cuda.is_available() else "cpu"
print("DATA_DIR:", DATA_DIR, "| device:", device)
print("[DONE]")

# 2) 데이터 로드·컬럼 체크
- processed_user_behavior.sorted.parquet을 읽고 session/state/tab 피처와 라벨 컬럼이 모두 존재하는지 검증합니다.

In [None]:
df = pd.read_parquet(RAW_PARQUET)

need_cols = [
    "session_id", "current_state", "next_state",            # 상태 전이 핵심
    "cart_item_count","page_depth","last_action_elapsed","search_count"  # 컨텍스트(tab) 피처
]
miss = [c for c in need_cols if c not in df.columns]
assert not miss, f"필수 컬럼 없음: {miss}"

print(df[need_cols].head(3))
print("[INFO] rows:", len(df))
print("[DONE]")

# 3) 상태→인덱스 매핑 생성/저장
- PAD=0, UNK=1 예약 후 상태에 2부터 ID를 부여해 JSON으로 저장합니다.

In [None]:
PAD_ID, UNK_ID = 0, 1

# 현재/다음 상태의 전체 유니크 값으로 vocab 구성
states_all = pd.Index(df["current_state"].astype(str).unique()).union(
    df["next_state"].astype(str).unique()
)
states_sorted = pd.Index(sorted(states_all))

state2idx = {s: i+2 for i, s in enumerate(states_sorted)}  # 2부터 시작
idx2state = {i: s for s, i in state2idx.items()}
num_states = len(state2idx) + 2  # PAD, UNK 포함한 클래스 수

# 매핑 저장 (추론/리포트 단계에서 재사용)
with open(MAP_JSON, "w", encoding="utf-8") as f:
    json.dump({"PAD_ID": PAD_ID, "UNK_ID": UNK_ID, "state2idx": state2idx}, f, ensure_ascii=False, indent=2)

print("[SAVE] state mapping ->", MAP_JSON)
print("[INFO] num_states:", num_states)
print("[DONE]")

# 4) prev1/prev2 생성·관측쌍 저장
- 세션별 shift로 이전 상태 인덱스를 만들고, (prev2, prev1) 유니크 쌍을 NPY로 보관합니다.

In [None]:
with open(MAP_JSON, "r", encoding="utf-8") as f:
    mp = json.load(f)
state2idx = mp["state2idx"]; PAD_ID = mp["PAD_ID"]; UNK_ID = mp["UNK_ID"]

def map_state(s):
    return state2idx.get(str(s), UNK_ID)

# 상태 → 인덱스
df["current_state_idx"] = df["current_state"].astype(str).map(map_state).astype("int32")
df["next_state_idx"]    = df["next_state"].astype(str).map(map_state).astype("int32")

# 세션별로 이전 상태 shift
g = df.groupby("session_id")["current_state_idx"]
df["prev1_idx"] = g.shift(1).fillna(PAD_ID).astype("int32")
df["prev2_idx"] = g.shift(2).fillna(PAD_ID).astype("int32")

# 관측된 (prev2, prev1) 유니크 쌍 저장 (분석/디버깅용)
pairs = df[["prev2_idx","prev1_idx"]].drop_duplicates().to_numpy(dtype=np.int32)
np.save(PAIR_NPY, pairs)
print("[SAVE] observed prev pairs ->", PAIR_NPY, pairs.shape)
print("[DONE]")

# 5) 세션 단위 train/val 분리
- 세션 ID를 셔플해 검증 비율만큼 떼고, 교차 오염 없이 분할 내역을 JSON으로 저장합니다.

In [None]:
sess = df["session_id"].drop_duplicates().to_numpy()
np.random.shuffle(sess)
n_val = int(len(sess)*VAL_FRAC)
val_sessions = set(sess[:n_val])
train_sessions = set(sess[n_val:])

with open(SPLIT_JSON, "w") as f:
    json.dump({
        "seed": SEED, "val_frac": VAL_FRAC,
        "train_sessions": list(train_sessions),
        "val_sessions": list(val_sessions)
    }, f, indent=2)

print("[SAVE] split ->", SPLIT_JSON, "| #train:", len(train_sessions), " #val:", len(val_sessions))
print("[DONE]")

# 6) 시퀀스 데이터 구성
- 각 세션의 마지막 스텝을 제외해 입력(state/pair/tab)과 라벨(next_state)을 T−1 길이로 만듭니다.

In [None]:
def build_session_sequences(frame):
    seqs_x, seqs_prevpair, seqs_tab, seqs_y = [], [], [], []
    for sid, sub in frame.sort_values(["session_id"]).groupby("session_id"):
        x = sub["current_state_idx"].to_numpy(dtype=np.int32)
        y = sub["next_state_idx"].to_numpy(dtype=np.int32)
        p1 = sub["prev1_idx"].to_numpy(dtype=np.int32)
        p2 = sub["prev2_idx"].to_numpy(dtype=np.int32)
        tabs = sub[["cart_item_count","page_depth","last_action_elapsed","search_count"]].to_numpy(dtype=np.float32)
        if len(x) < 2:
            continue
        # 마지막 스텝은 다음 상태가 없으므로 제외
        seqs_x.append(x[:-1])                                  # [T-1]
        seqs_y.append(y[:-1])                                  # [T-1]
        seqs_prevpair.append(np.stack([p2[:-1], p1[:-1]], 1))  # [T-1, 2]
        seqs_tab.append(tabs[:-1])                             # [T-1, 4]
    return seqs_x, seqs_prevpair, seqs_tab, seqs_y

train_x, train_p, train_t, train_y = build_session_sequences(df[df["session_id"].isin(train_sessions)])
val_x,   val_p,   val_t,   val_y   = build_session_sequences(df[df["session_id"].isin(val_sessions)])

print("[INFO] #train seq:", len(train_x), " | #val seq:", len(val_x))
print("[DONE]")

# 7) X,y 저장
- 테이블 형태의 피처/라벨을 Parquet·CSV로 저장해 디버깅/재현에 활용합니다.

In [None]:
from pathlib import Path
import pandas as pd
import numpy as np

TAB_FEATS = [
    "current_state_idx","prev1_idx","prev2_idx",  # 상태 기반 피처
    "cart_item_count","page_depth","last_action_elapsed","search_count"  # tab 피처
]
LABEL = "next_state_idx"

# dtype 캐스팅 (파일 사이즈/일관성)
int_cols   = ["current_state_idx","prev1_idx","prev2_idx","cart_item_count","page_depth","search_count"]
float_cols = ["last_action_elapsed"]

def _cast_types(df_):
    for c in int_cols:
        df_[c] = df_[c].astype("int32")
    for c in float_cols:
        df_[c] = df_[c].astype("float32")
    return df_

X_train = df[df["session_id"].isin(train_sessions)][TAB_FEATS].reset_index(drop=True).pipe(_cast_types)
y_train = df[df["session_id"].isin(train_sessions)][LABEL].reset_index(drop=True).astype("int32")

X_val   = df[df["session_id"].isin(val_sessions)][TAB_FEATS].reset_index(drop=True).pipe(_cast_types)
y_val   = df[df["session_id"].isin(val_sessions)][LABEL].reset_index(drop=True).astype("int32")

# Parquet/CSV 저장
(X_train).to_parquet(DATA_DIR/"X_train.parquet", index=False)
(y_train.to_frame(name="y")).to_parquet(DATA_DIR/"y_train.parquet", index=False)
(X_val).to_parquet(DATA_DIR/"X_val.parquet", index=False)
(y_val.to_frame(name="y")).to_parquet(DATA_DIR/"y_val.parquet", index=False)

X_train.to_csv(DATA_DIR/"X_train.csv", index=False)
y_train.to_frame(name="y").to_csv(DATA_DIR/"y_train.csv", index=False)
X_val.to_csv(DATA_DIR/"X_val.csv", index=False)
y_val.to_frame(name="y").to_csv(DATA_DIR/"y_val.csv", index=False)

print("[SAVE] flattened X,y ->", DATA_DIR)
print("  X_train:", X_train.shape, " | y_train:", y_train.shape)
print("  X_val  :", X_val.shape,   " | y_val  :", y_val.shape)
print("[DONE]")

# 8) Dataset/Collate/DataLoader 정의
- 가변 길이를 배치 최대 길이에 패딩하는 collate로 x/pair/tab/y/mask 텐서를 만듭니다.

In [None]:
import torch
from torch.utils.data import DataLoader

class NextStateSeqDataset(torch.utils.data.Dataset):
    def __init__(self, xs, pairs, tabs, ys):
        self.xs = xs; self.pairs = pairs; self.tabs = tabs; self.ys = ys
    def __len__(self): return len(self.xs)
    def __getitem__(self, i): return self.xs[i], self.pairs[i], self.tabs[i], self.ys[i]

def pad_collate(batch, pad_id=PAD_ID):
    xs, ps, ts, ys = zip(*batch)
    L = max(len(x) for x in xs)   # 배치 내 최대 길이
    B = len(xs)                   # 배치 크기

    # 패딩 버퍼 생성
    x_pad = np.full((B, L), pad_id, dtype=np.int64)
    y_pad = np.full((B, L), pad_id, dtype=np.int64)
    mask  = np.zeros((B, L), dtype=np.bool_)      # 유효 토큰 마스크
    pair_pad = np.full((B, L, 2), pad_id, dtype=np.int64)
    tab_pad  = np.zeros((B, L, 4), dtype=np.float32)

    # 각 샘플을 왼쪽 정렬로 채우기
    for i,(x,p,t,y) in enumerate(zip(xs,ps,ts,ys)):
        l = len(x)
        x_pad[i,:l] = x; y_pad[i,:l] = y; mask[i,:l] = True
        pair_pad[i,:l,:] = p; tab_pad[i,:l,:] = t

    return (torch.from_numpy(x_pad),
            torch.from_numpy(pair_pad),
            torch.from_numpy(tab_pad),
            torch.from_numpy(y_pad),
            torch.from_numpy(mask))

# DataLoader
train_ds = NextStateSeqDataset(train_x, train_p, train_t, train_y)
val_ds   = NextStateSeqDataset(val_x,   val_p,   val_t,   val_y)

train_loader = DataLoader(train_ds, batch_size=HYPERPARAMS["batch_size"], shuffle=True,  collate_fn=pad_collate)
val_loader   = DataLoader(val_ds,   batch_size=HYPERPARAMS["batch_size"], shuffle=False, collate_fn=pad_collate)

# 샘플 배치 형태 확인
xb, pb, tb, yb, mb = next(iter(train_loader))
print("[INFO] batch shapes:", xb.shape, pb.shape, tb.shape, yb.shape, mb.shape)
print("[DONE]")

# 9) LSTM 분류기 정의
- 상태 임베딩+pair 투영+tab 투영을 concat해 LSTM→Dropout→Linear로 다음 상태 로짓을 출력합니다.

In [None]:
import torch.nn as nn

class LSTMClassifier(nn.Module):
    def __init__(self, vocab_size, emb_dim=192, pair_dim=16, tab_dim=16,
                 hid=384, num_layers=2, dropout=0.2):
        super().__init__()
        self.num_classes = vocab_size

        # 상태 임베딩 (PAD는 0으로 고정)
        self.emb = nn.Embedding(vocab_size, emb_dim, padding_idx=PAD_ID)

        # (prev2, prev1) → pair_dim
        self.pair_proj = nn.Linear(2, pair_dim)

        # tab 피처(4D) → tab_dim
        self.tab_proj  = nn.Linear(4, tab_dim)

        # LSTM 입력 차원 = emb_dim + pair_dim + tab_dim
        self.lstm = nn.LSTM(input_size=emb_dim + pair_dim + tab_dim,
                            hidden_size=hid, num_layers=num_layers,
                            dropout=dropout if num_layers > 1 else 0.0,
                            batch_first=True)

        self.drop = nn.Dropout(dropout)
        self.head = nn.Linear(hid, vocab_size)

    def forward(self, x_state, pair, x_tab, mask):
        xe = self.emb(x_state)             # [B,T,E]
        pe = self.pair_proj(pair.float())  # [B,T,P]
        te = self.tab_proj(x_tab.float())  # [B,T,Td]
        inp = torch.cat([xe, pe, te], dim=-1)
        out, _ = self.lstm(inp)            # [B,T,H]
        out = self.drop(out)
        return self.head(out)              # [B,T,C]

print("[DONE]")

# 10) 클래스 가중치 산출
- 라벨 분포의 inverse-frequency 기반 가중치를 계산해 MAX_WEIGHT로 클리핑 후 리포트/저장합니다.

In [None]:
from collections import Counter
import pandas as pd
import numpy as np
import torch

MAX_WEIGHT = 1.5  # 상한 클리핑 (희소 클래스 가중치를 너무 크게 만들지 않기 위해)

# 라벨 분포
label_counts = Counter(df["next_state_idx"].tolist())
total = sum(label_counts.values())

# 가중치 계산 (raw -> clip)
weights = np.ones(num_states, dtype=np.float32)
raw_weights = np.ones(num_states, dtype=np.float32)

for k, v in label_counts.items():
    raw = total / (len(label_counts) * v)  # inverse freq 기반
    raw_weights[k] = raw
    weights[k] = raw

weights = np.clip(weights, None, MAX_WEIGHT)
class_weights = torch.tensor(weights, device=device)  # 손실 함수에 전달

# per-class 테이블 (PAD/UNK 제외, 희소 클래스부터 보기 쉽도록 support↑ 정렬)
rows = []
for idx, support in sorted(label_counts.items(), key=lambda x: x[1]):
    if idx in (PAD_ID, UNK_ID):
        continue
    label = idx2state.get(int(idx), f"UNKNOWN_{idx}")
    rows.append({
        "class_id": int(idx),
        "label": label,
        "support": int(support),
        "raw_weight": float(raw_weights[idx]),
        "clipped_weight": float(weights[idx]),
    })

df_class_weight = pd.DataFrame(rows).sort_values(["support","label"]).reset_index(drop=True)

print(f"[INFO] class_weights ready (clipped @ {MAX_WEIGHT})  | classes={len(df_class_weight)}")
print(df_class_weight.to_string(index=False))

# CSV 저장 (추후 비교/대시보드용)
out_csv = DATA_DIR / "class_weights_report.csv"
df_class_weight.to_csv(out_csv, index=False, encoding="utf-8")
print("[SAVE] per-class weights ->", out_csv)
print("[DONE]")

# 11) 학습·평가 루틴 구현
- AMP/GradClip/가중치/마스크를 반영해 train_one_epoch와 eval_epoch로 top1·top3·macroF1을 계산합니다.

In [None]:
from sklearn.metrics import f1_score

def train_one_epoch(model, loader, optimizer, device, num_classes, class_weights=None,
                    use_amp=True, grad_clip_norm=1.0, scaler=None):
    model.train()
    loss_fn = nn.CrossEntropyLoss(ignore_index=PAD_ID, weight=class_weights)
    correct = 0; total = 0; last_loss = 0.0

    for x, pair, tab, y, mask in loader:
        x, pair, tab, y, mask = x.to(device), pair.to(device), tab.to(device), y.to(device), mask.to(device)
        optimizer.zero_grad(set_to_none=True)

        if use_amp and scaler is not None:
            with torch.amp.autocast(device_type="cuda"):
                logits = model(x, pair, tab, mask)                      # [B,T,C]
                loss = loss_fn(logits.view(-1, num_classes), y.view(-1))
            scaler.scale(loss).backward()
            if grad_clip_norm:
                scaler.unscale_(optimizer)
                torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip_norm)
            scaler.step(optimizer); scaler.update()
        else:
            logits = model(x, pair, tab, mask)
            loss = loss_fn(logits.view(-1, num_classes), y.view(-1))
            loss.backward()
            if grad_clip_norm:
                torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip_norm)
            optimizer.step()

        last_loss = float(loss.item())
        with torch.no_grad():
            pred  = logits.argmax(-1)                # [B,T]
            valid = (mask & (y != PAD_ID))           # 유효 타임스텝
            correct += (pred[valid] == y[valid]).sum().item()
            total   += valid.sum().item()

    return {"loss": last_loss, "acc": correct / max(total, 1)}

@torch.no_grad()
def eval_epoch(model, loader, device, num_classes, class_weights=None):
    model.eval()
    loss_fn = nn.CrossEntropyLoss(ignore_index=PAD_ID, weight=class_weights)
    total_valid=0; top1=0; top3=0; sum_loss=0; n_batches=0
    all_preds, all_trues = [], []

    for x, pair, tab, y, mask in loader:
        x, pair, tab, y, mask = x.to(device), pair.to(device), tab.to(device), y.to(device), mask.to(device)
        logits = model(x, pair, tab, mask)
        loss   = loss_fn(logits.view(-1, num_classes), y.view(-1))
        sum_loss += float(loss.item()); n_batches += 1

        prob = logits.softmax(-1)                    # [B,T,C]
        pred = prob.argmax(-1)                       # [B,T]

        valid = (mask & (y != PAD_ID))
        top1 += (pred[valid] == y[valid]).sum().item()
        total_valid += valid.sum().item()

        _, topk_idx = prob.topk(3, dim=-1)          # [B,T,3]
        top3 += (topk_idx[valid].eq(y[valid].unsqueeze(-1))).any(dim=-1).sum().item()

        all_preds.extend(pred[valid].cpu().tolist())
        all_trues.extend(y[valid].cpu().tolist())

    macro_f1 = f1_score(all_trues, all_preds, average="macro") if all_trues else 0.0
    return {"loss": sum_loss/max(n_batches,1), "top1": top1/max(total_valid,1),
            "top3": top3/max(total_valid,1), "macro_f1": macro_f1}

print("[DONE]")

# 12) Optuna 하이퍼파라미터 튜닝
- 검증 top1을 최대화하는 하이퍼파라미터를 탐색합니다.

In [None]:
import optuna
from torch.optim import AdamW
from torch.amp import GradScaler

optuna.logging.set_verbosity(optuna.logging.WARNING)

def objective(trial):
    # ---- 탐색 범위(필요 시 조정) ----
    emb_dim    = trial.suggest_int("emb_dim", 128, 256, step=32)
    pair_dim   = trial.suggest_int("pair_dim", 8, 24, step=8)
    tab_dim    = trial.suggest_int("tab_dim", 8, 32, step=8)
    hid        = trial.suggest_int("hid", 256, 384, step=64)
    num_layers = trial.suggest_int("num_layers", 1, 2)
    dropout    = trial.suggest_float("dropout", 0.2, 0.4)
    lr         = trial.suggest_float("lr", 5e-4, 2e-3, log=True)
    wd         = trial.suggest_float("weight_decay", 1e-5, 5e-4, log=True)

    # ---- 모델/옵티마/AMP ----
    m = LSTMClassifier(num_states, emb_dim, pair_dim, tab_dim, hid, num_layers, dropout).to(device)
    opt = AdamW(m.parameters(), lr=lr, weight_decay=wd)
    scaler_tmp = GradScaler("cuda", enabled=(device=="cuda" and HYPERPARAMS["use_amp"]))

    # ---- 1 epoch quick fit ----
    train_one_epoch(m, train_loader, opt, device, num_states, class_weights=class_weights,
                    use_amp=(device=="cuda" and HYPERPARAMS["use_amp"]),
                    grad_clip_norm=HYPERPARAMS["grad_clip_norm"], scaler=scaler_tmp)

    # ---- 검증 ----
    ev = eval_epoch(m, val_loader, device, num_states, class_weights=class_weights)
    print(f"[Trial {trial.number:02d}] top1={ev['top1']*100:6.2f}% | top3={ev['top3']*100:6.2f}% | F1={ev['macro_f1']:.3f}")

    # 로그용 부가 지표 기록
    trial.set_user_attr("top3", ev["top3"])
    trial.set_user_attr("macro_f1", ev["macro_f1"])
    return ev["top1"]

# ---- 튜닝 실행 ----
study = optuna.create_study(direction="maximize", study_name="fix_final_next_state_v1")
study.optimize(objective, n_trials=10, show_progress_bar=True)

# ---- 결과 요약 ----
best_trial = study.best_trial
print("\n[BEST RESULT]")
print(f" top1    : {best_trial.value*100:.2f}%")
print(f" top3    : {best_trial.user_attrs.get('top3')*100:.2f}%")
print(f" macroF1 : {best_trial.user_attrs.get('macro_f1'):.3f}")
print(" params  :", best_trial.params)

# 13) 최종 학습·체크포인트 저장
- 최적 하이퍼 파라미터를 적용해 ReduceLROnPlateau+EarlyStopping으로 훈련하고 best 모델을 pt로 저장합니다.

In [None]:
from torch.optim import AdamW
from torch.amp import GradScaler

best_params = study.best_trial.params
HYPERPARAMS.update({
    "emb_dim":      best_params["emb_dim"],
    "pair_dim":     best_params["pair_dim"],
    "tab_dim":      best_params["tab_dim"],
    "hid":          best_params["hid"],
    "num_layers":   best_params["num_layers"],
    "dropout":      best_params["dropout"],
    "lr":           best_params["lr"],
    "weight_decay": best_params["weight_decay"],
})
print("[INFO] HYPERPARAMS updated with Optuna best:", HYPERPARAMS)

# 하이퍼 저장
BEST_JSON = (Path(BEST_PATH).parent / "best_hparams.json")
with open(BEST_JSON, "w") as f:
    json.dump({"optuna_best": best_params, "hparams": HYPERPARAMS}, f, indent=2)
print(f"[SAVE] best_hparams -> {BEST_JSON}")

# 학습 설정
USE_AMP   = (device == "cuda") and bool(HYPERPARAMS.get("use_amp", True))
EPOCHS    = int(HYPERPARAMS.get("epochs", 50))
PATIENCE  = int(HYPERPARAMS.get("patience", 7))
GRAD_CLIP = float(HYPERPARAMS.get("grad_clip_norm", 1.0))

model = LSTMClassifier(
    vocab_size=num_states,
    emb_dim=HYPERPARAMS["emb_dim"],
    pair_dim=HYPERPARAMS["pair_dim"],
    tab_dim=HYPERPARAMS["tab_dim"],
    hid=HYPERPARAMS["hid"],
    num_layers=HYPERPARAMS["num_layers"],
    dropout=HYPERPARAMS["dropout"],
).to(device)

optimizer = AdamW(model.parameters(), lr=HYPERPARAMS["lr"], weight_decay=HYPERPARAMS["weight_decay"])
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="max", factor=0.5, patience=2)
scaler    = GradScaler("cuda", enabled=USE_AMP)

best, bad = None, 0
for epoch in range(EPOCHS):
    tr = train_one_epoch(model, train_loader, optimizer, device,
                         num_classes=num_states, class_weights=class_weights,
                         use_amp=USE_AMP, grad_clip_norm=GRAD_CLIP, scaler=scaler)
    ev = eval_epoch(model, val_loader, device, num_states, class_weights=class_weights)
    scheduler.step(ev["top1"])

    print(f"[E{epoch:02d}] train loss={tr['loss']:.4f} acc={tr['acc']:.4f} | "
          f"val loss={ev['loss']:.4f} top1={ev['top1']:.4f} top3={ev['top3']:.4f} macroF1={ev['macro_f1']:.4f}")

    if (best is None) or (ev["top1"] > best["top1"]):
        best, bad = {"epoch": epoch, **ev}, 0
        torch.save({
            "epoch": epoch,
            "model_state": model.state_dict(),
            "optimizer_state": optimizer.state_dict(),
            "metrics": ev,
            "num_classes": num_states,
            "pad_id": PAD_ID,
            "state2idx": state2idx,
            "idx2state": idx2state,
            "hparams": HYPERPARAMS,
        }, str(BEST_PATH))
        print(f"[SAVE] best -> {BEST_PATH}  (epoch={epoch}, top1={ev['top1']:.4f})")
    else:
        bad += 1
        if bad >= PATIENCE:
            print("Early stopping at epoch", epoch)
            break

print("\n[BEST FINAL]", best)

# 14) per-class 지표 생성
- 체크포인트로 검증셋 예측을 수집해 class별 support/top1/top3/precision/recall/F1을 집계합니다.

In [None]:
import torch
from sklearn.metrics import precision_recall_fscore_support
from collections import defaultdict

# 체크포인트 로드
ckpt = torch.load(str(BEST_PATH), map_location=device)
model = LSTMClassifier(
    vocab_size=ckpt.get("num_classes", num_states),
    emb_dim=HYPERPARAMS["emb_dim"],
    pair_dim=HYPERPARAMS["pair_dim"],
    tab_dim=HYPERPARAMS["tab_dim"],
    hid=HYPERPARAMS["hid"],
    num_layers=HYPERPARAMS["num_layers"],
    dropout=HYPERPARAMS["dropout"],
).to(device)
model.load_state_dict(ckpt["model_state"])
model.eval()

# val 예측 수집
all_true, all_pred, all_top3 = [], [], []
with torch.no_grad():
    for x, pair, tab, y, mask in val_loader:
        x, pair, tab, y, mask = x.to(device), pair.to(device), tab.to(device), y.to(device), mask.to(device)
        logits = model(x, pair, tab, mask)
        prob   = logits.softmax(-1)
        pred   = prob.argmax(-1)
        _, topk_idx = prob.topk(3, dim=-1)

        valid = (mask & (y != PAD_ID))
        all_true.extend(y[valid].detach().cpu().tolist())
        all_pred.extend(pred[valid].detach().cpu().tolist())
        all_top3.extend(topk_idx[valid].detach().cpu().tolist())  # 각 항목 길이=3

# per-class 지표
labels = sorted(set(all_true))
prec, rec, f1, support = precision_recall_fscore_support(all_true, all_pred, labels=labels, zero_division=0)

# top1/top3 per class
correct_per_cls = defaultdict(int); total_per_cls = defaultdict(int)
for t, p in zip(all_true, all_pred):
    total_per_cls[t] += 1
    if t == p:
        correct_per_cls[t] += 1
top1_acc_per_cls = {c: (correct_per_cls[c] / total_per_cls[c]) if total_per_cls[c] > 0 else 0.0 for c in labels}

hit3_per_cls = defaultdict(int)
for t, top3 in zip(all_true, all_top3):
    if int(t) in list(map(int, top3)):
        hit3_per_cls[t] += 1
top3_hit_per_cls = {c: (hit3_per_cls[c] / total_per_cls[c]) if total_per_cls[c] > 0 else 0.0 for c in labels}

# DataFrame 생성
rows = []
for i, c in enumerate(labels):
    state_name = idx2state.get(int(c), f"UNKNOWN_{c}")
    rows.append({
        "class_id": int(c),
        "label": state_name,
        "support": int(support[i]),
        "top1_acc": float(top1_acc_per_cls.get(c, 0.0)),
        "top3_hit_rate": float(top3_hit_per_cls.get(c, 0.0)),
        "precision": float(prec[i]),
        "recall": float(rec[i]),
        "f1": float(f1[i]),
    })

df_cls = pd.DataFrame(rows).sort_values(["support"], ascending=False).reset_index(drop=True)

# 요약
print(df_cls.head(10))
print("\n[SUMMARY]")
print("macroF1 (val):", df_cls["f1"].mean().round(4))
print("weighted F1 (val):", (df_cls["f1"] * df_cls["support"] / df_cls["support"].sum()).sum().round(4))
print("[DONE]")

# 15) per-class 표 스타일링 표시
- df_cls에 bar/gradient/format을 적용해 support·정확도·F1을 시각적으로 강조합니다.

In [None]:
import pandas as pd
from IPython.display import display

pd.set_option("display.float_format", "{:.3f}".format)

styled = (
    df_cls.style
    .bar(subset=["support"], color="#d0e0f0")                       # support를 bar 느낌으로
    .background_gradient(subset=["top1_acc","top3_hit_rate","f1"],  # 핵심 지표 색상 강조
                         cmap="Blues")
    .format({
        "top1_acc": "{:.2%}",
        "top3_hit_rate": "{:.2%}",
        "precision": "{:.3f}",
        "recall": "{:.3f}",
        "f1": "{:.3f}",
    })
)

display(styled.hide(axis="index"))

# 16) 혼동행렬 시각화
- 실제×예측 카운트를 표시하고 밝기 기반 텍스트 색 전환·대각선(정답) 강조 테두리를 적용합니다.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle
from sklearn.metrics import confusion_matrix

if "all_true" not in globals() or "all_pred" not in globals():
    raise RuntimeError("all_true/all_pred가 없습니다. 먼저 Cell 13(클래스별 리포트 생성)을 실행하세요.")

cls_ids   = labels
cls_names = [idx2state.get(int(c), f"UNK_{c}") for c in cls_ids]

cm = confusion_matrix(all_true, all_pred, labels=cls_ids)

fig, ax = plt.subplots(figsize=(max(8, len(cls_ids)*0.6), max(6, len(cls_ids)*0.6)))
im = ax.imshow(cm, interpolation="nearest")  # 기본 colormap (viridis 등)
ax.figure.colorbar(im, ax=ax)
ax.set_title("Confusion Matrix (Counts)")
ax.set_xlabel("Predicted")
ax.set_ylabel("True")
ax.set_xticks(np.arange(len(cls_ids))); ax.set_yticks(np.arange(len(cls_ids)))
ax.set_xticklabels(cls_names, rotation=45, ha="right")
ax.set_yticklabels(cls_names)

# === 값의 밝기에 따른 텍스트 색상 자동 전환 ===
# imshow의 norm을 사용 (값→0~1 정규화). 밝기가 0.5 넘으면 배경이 밝다고 보고 검은색 텍스트 사용.
norm = getattr(im, "norm", None)
if norm is None:
    from matplotlib import colors
    norm = colors.Normalize(vmin=float(np.min(cm)), vmax=float(np.max(cm)))

for i in range(cm.shape[0]):
    for j in range(cm.shape[1]):
        val = cm[i, j]
        bright = norm(val) > 0.5  # True면 밝은 셀(검은 글자), False면 어두운 셀(흰 글자)
        txt_color = "black" if bright else "white"
        ax.text(j, i, f"{val:d}", ha="center", va="center", fontsize=8, color=txt_color)

# === 대각선(정답) 셀만 테두리 강조 ===
for k in range(len(cls_ids)):
    rect = Rectangle((k - 0.5, k - 0.5), 1, 1,
                     fill=False, edgecolor="red", linewidth=2, zorder=3)
    ax.add_patch(rect)

plt.tight_layout()
plt.show()


# 17) Top-1 정확도 막대그래프
- support 내림차순으로 정렬된 클래스별 top1 정확도를 bar 차트로 나타냅니다.

In [None]:
import matplotlib.pyplot as plt

if "df_cls" not in globals():
    raise RuntimeError("df_cls가 없습니다. 먼저 Cell 13을 실행하세요.")

order = df_cls.sort_values("support", ascending=False)

plt.figure(figsize=(max(8, len(order)*0.5), 5))
plt.bar(order["label"], order["top1_acc"])
plt.title("Per-Class Top-1 Accuracy")
plt.ylabel("Accuracy")
plt.xticks(rotation=45, ha="right")
plt.tight_layout()
# plt.savefig(DATA_DIR / "per_class_top1.png", dpi=150, bbox_inches="tight")
plt.show()

# 18) 오류율(1−Top1) 막대그래프
- 클래스별 상대적 오분류 정도를 error rate로 시각화합니다.

In [None]:
import matplotlib.pyplot as plt

order = df_cls.sort_values("support", ascending=False)
error_rate = 1.0 - order["top1_acc"]

plt.figure(figsize=(max(8, len(order)*0.5), 5))
plt.bar(order["label"], error_rate)
plt.title("Per-Class Error Rate (1 − Top-1)")
plt.ylabel("Error Rate")
plt.xticks(rotation=45, ha="right")
plt.tight_layout()
# plt.savefig(DATA_DIR / "per_class_error.png", dpi=150, bbox_inches="tight")
plt.show()
