# Clustering Diagnostics: Elbow, Silhouette, Gap Statistic

제출/재현 가능 형태로 정리된 노트북입니다.

- 모든 경로/하이퍼파라미터는 상단 `Config`에서만 수정
- 임베딩 문자열 파싱, (선택) PCA, KMeans 및 지표 계산을 함수로 분리
- 결과는 `./outputs/` 폴더에 저장: 지표 요약은 `cluster_metrics_summary.csv`, 진단 그래프는 `cluster_metrics.png`


In [None]:
from pathlib import Path

# =========================
# Config (edit settings here only)
# =========================

# Input CSV (contains the embedding columns)
CSV_PATH = Path(r"C:\Users\min\Downloads\embedding_Revised.csv")

# Embedding column selection:
# - "img"  : use the img_emb column
# - "wrd"  : use the wrd_emb column
# - "concat": concatenate img_emb + wrd_emb
EMB_MODE = "concat"

# Embedding column names (adjust to match the CSV schema)
IMG_COL = "img_emb"
WRD_COL = "wrd_emb"

# (Optional) example of dropping/filtering specific rows
# e.g. drop rows where normalized_gender_Sports == 1 (skipped automatically if the column is absent)
DROP_SPORTS_IF_COLUMN_EXISTS = True
SPORTS_COL = "normalized_gender_Sports"

# (Optional) PCA
USE_PCA = True
PCA_DIM = 256   # only used when USE_PCA=True

# KMeans search range
K_MIN = 2
K_MAX = 20   # inclusive
RANDOM_STATE = 42
# Passed as `n_init` to every KMeans constructed in this notebook.
N_INIT = "auto"

# Gap Statistic settings
GAP_B = 10  # number of reference samples (larger = more stable, but slower)

# Output
OUT_DIR = Path("./outputs")
# Side effect: the output directory is created as soon as this cell runs.
OUT_DIR.mkdir(parents=True, exist_ok=True)
SUMMARY_CSV = OUT_DIR / "cluster_metrics_summary.csv"
SAVE_PLOT = OUT_DIR / "cluster_metrics.png"


In [None]:
import numpy as np
import pandas as pd

def _parse_embedding_cell(x):
    """Parse one CSV cell holding a serialized embedding into a float32 vector.

    Accepted layouts (optionally wrapped in one pair of square brackets):
      * comma-separated:      '0.1,0.2,0.3' or '[0.1, 0.2, 0.3]'
      * whitespace-separated: '[0.1 0.2 0.3]' (NumPy-style printing, which
        may also wrap over several lines)

    Args:
        x: Raw cell value (string, float NaN for a missing cell, or None).

    Returns:
        A 1-D ``np.ndarray`` of dtype float32, or ``None`` when the cell is
        missing, blank, not parseable as numbers, or contains a non-finite
        value (NaN/inf). An empty list such as '[]' yields an empty array.
    """
    if x is None or (isinstance(x, float) and np.isnan(x)):
        return None
    text = str(x).strip()
    if not text:
        return None
    # Drop one enclosing pair of brackets, if present.
    if text[0] == "[" and text[-1] == "]":
        text = text[1:-1]
    # Treat commas and any whitespace (spaces, tabs, newlines) as separators;
    # str.split() with no argument also discards empty tokens.
    tokens = text.replace(",", " ").split()
    try:
        vec = np.array([float(tok) for tok in tokens], dtype=np.float32)
    except ValueError:
        return None
    # float() accepts 'nan' / 'inf'; such vectors cannot be normalized or
    # clustered downstream, so report them as invalid like other bad cells.
    if not np.all(np.isfinite(vec)):
        return None
    return vec

def load_embeddings_from_csv(csv_path, emb_mode, img_col, wrd_col,
                             drop_sports_if_exists=False, sports_col="normalized_gender_Sports"):
    """Load a CSV and build a dense embedding matrix from its embedding column(s).

    Args:
        csv_path: Path of the CSV file to read.
        emb_mode: "img" (use ``img_col``), "wrd" (use ``wrd_col``) or
            "concat" (image vector followed by word vector).
        img_col: Name of the image-embedding column.
        wrd_col: Name of the word-embedding column.
        drop_sports_if_exists: When True and ``sports_col`` exists, rows whose
            value in that column equals 1 are removed before parsing.
        sports_col: Name of the column used by the optional row filter.

    Returns:
        ``(df_kept, X)``: ``df_kept`` holds the rows that produced a usable
        embedding (index reset) and ``X`` is the matching float32 matrix with
        one row per kept record.

    Raises:
        ValueError: ``emb_mode`` is invalid or a required column is missing.
        RuntimeError: No row yields a usable embedding.
    """
    from collections import Counter

    # Validate the cheap argument before paying for the CSV read.
    if emb_mode not in {"img", "wrd", "concat"}:
        raise ValueError("EMB_MODE must be one of: 'img', 'wrd', 'concat'.")

    df = pd.read_csv(csv_path, low_memory=False)

    # Optional row filter; non-numeric / missing flags count as "not 1".
    if drop_sports_if_exists and sports_col in df.columns:
        is_sports = pd.to_numeric(df[sports_col], errors="coerce").fillna(0).eq(1)
        df = df.loc[~is_sports].copy()

    source_cols = {
        "img": [img_col],
        "wrd": [wrd_col],
        "concat": [img_col, wrd_col],
    }[emb_mode]
    for col in source_cols:
        if col not in df.columns:
            raise ValueError(f"CSV에 '{col}' 컬럼이 없습니다.")

    parsed_cols = [
        [_parse_embedding_cell(cell) for cell in df[col]]
        for col in source_cols
    ]

    # One candidate per usable row: (row position, per-column lengths, vector).
    candidates = []
    for pos, parts in enumerate(zip(*parsed_cols)):
        if any(part is None for part in parts):
            continue
        vec = parts[0] if len(parts) == 1 else np.concatenate(parts, axis=0)
        if vec.size == 0:
            continue
        candidates.append((pos, tuple(part.shape[0] for part in parts), vec))

    if not candidates:
        raise RuntimeError("유효한 임베딩이 없습니다. (파싱 실패 또는 전부 결측)")

    # Keep only rows with the most common per-column lengths. Comparing the
    # lengths column by column (not just the concatenated total) prevents a
    # row whose image/word sizes are swapped or mismatched, but happen to add
    # up to the same total, from being stacked with incompatible features.
    # Ties go to the smallest total length.
    shape_counts = Counter(shape for _, shape, _ in candidates)
    top_count = max(shape_counts.values())
    target_shape = min(
        (shape for shape, count in shape_counts.items() if count == top_count),
        key=lambda shape: (sum(shape), shape),
    )
    kept = [(pos, vec) for pos, shape, vec in candidates if shape == target_shape]

    X = np.vstack([vec for _, vec in kept]).astype(np.float32)
    # Positions refer to the (possibly filtered) frame, hence iloc.
    df_kept = df.iloc[[pos for pos, _ in kept]].reset_index(drop=True)
    return df_kept, X

# Build the working table and embedding matrix from the Config settings.
df, X = load_embeddings_from_csv(
    csv_path=CSV_PATH,
    emb_mode=EMB_MODE,
    img_col=IMG_COL,
    wrd_col=WRD_COL,
    drop_sports_if_exists=DROP_SPORTS_IF_COLUMN_EXISTS,
    sports_col=SPORTS_COL,
)

# Quick sanity report: surviving row count and matrix dimensions.
print(f"rows kept: {len(df)}")
print(f"X shape: {X.shape}")


In [None]:
from sklearn.decomposition import PCA

def l2_normalize_rows(a, eps=1e-12):
    """Scale every row of ``a`` to unit Euclidean length.

    Row norms are floored at ``eps`` before dividing, so an all-zero row is
    returned as zeros instead of triggering a division by zero.
    """
    row_norms = np.linalg.norm(a, axis=1, keepdims=True)
    safe_norms = np.maximum(row_norms, eps)
    return a / safe_norms

# Embeddings are usually better suited to clustering once L2-normalized.
Xn = l2_normalize_rows(X)

if USE_PCA:
    # PCA cannot extract more components than min(n_samples, n_features);
    # asking for more raises ValueError. Clamp an integer PCA_DIM so small
    # datasets (or short embeddings) still run, and report when that happens.
    max_dim = min(Xn.shape)
    n_components = min(PCA_DIM, max_dim) if isinstance(PCA_DIM, int) else PCA_DIM
    if n_components != PCA_DIM:
        print(
            f"PCA_DIM={PCA_DIM} exceeds min(n_samples, n_features)={max_dim}; "
            f"using {n_components} components instead."
        )
    pca = PCA(n_components=n_components, random_state=RANDOM_STATE)
    Xp = pca.fit_transform(Xn)
    print("PCA explained variance ratio sum:", float(pca.explained_variance_ratio_.sum()))
else:
    Xp = Xn

print("X used for clustering:", Xp.shape)


In [None]:
import numpy as np
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score

def gap_statistic(X, ks, B=10, random_state=42):
    """Compute the Gap Statistic (Tibshirani et al.) with KMeans for each k.

    For every k the KMeans inertia on ``X`` is compared against the inertia
    obtained on ``B`` reference datasets drawn uniformly inside the
    per-feature [min, max] box of ``X``:

        gap(k) = mean_b(log(W_kb)) - log(W_k)

    Args:
        X: Data matrix, one sample per row.
        ks: Iterable of cluster counts to evaluate.
        B: Number of reference datasets drawn per k.
        random_state: Seed for the reference sampler and for KMeans on ``X``;
            the b-th reference fit uses ``random_state + b + 1``.

    Returns:
        float64 array of gap values, in the same order as ``ks``.
    """
    sampler = np.random.default_rng(random_state)
    lower, upper = X.min(axis=0), X.max(axis=0)

    def _inertia(data, k, seed):
        # Within-cluster dispersion (sum of squared distances to centroids).
        model = KMeans(n_clusters=k, random_state=seed, n_init=N_INIT)
        model.fit(data)
        return model.inertia_

    gap_values = []
    for k in ks:
        observed = _inertia(X, k, random_state)

        reference = []
        for draw in range(B):
            # Uniform reference sample with the same shape and dtype as X.
            sample = sampler.uniform(lower, upper, size=X.shape).astype(X.dtype)
            reference.append(_inertia(sample, k, random_state + draw + 1))

        gap_values.append(float(np.mean(np.log(reference)) - np.log(observed)))

    return np.array(gap_values, dtype=np.float64)

# Candidate cluster counts, K_MIN..K_MAX inclusive.
ks = [*range(K_MIN, K_MAX + 1)]
inertias, sil_scores = [], []
n_samples = Xp.shape[0]

for k in ks:
    km = KMeans(n_clusters=k, random_state=RANDOM_STATE, n_init=N_INIT)
    labels = km.fit_predict(Xp)
    inertias.append(float(km.inertia_))

    # Silhouette is only computed when more than one distinct label was
    # produced and there are at least k + 1 samples; otherwise record NaN.
    can_score = len(set(labels)) > 1 and n_samples >= (k + 1)
    sil_scores.append(float(silhouette_score(Xp, labels)) if can_score else float("nan"))

gap_scores = gap_statistic(Xp, ks, B=GAP_B, random_state=RANDOM_STATE)

print(f"done. ks: {ks[:3]} ... {ks[-3:]}")


In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# Gather the per-k metrics into one table and write it to disk.
out = pd.DataFrame(
    {"k": ks, "inertia": inertias, "silhouette": sil_scores, "gap": gap_scores}
)
out.to_csv(SUMMARY_CSV, index=False)
print("✅ saved:", SUMMARY_CSV)

# Single figure with one panel per diagnostic: (metric column, panel title).
fig, axes = plt.subplots(1, 3, figsize=(14, 4))
panels = [
    ("inertia", "Elbow (Inertia)"),
    ("silhouette", "Silhouette"),
    ("gap", "Gap Statistic"),
]
for ax, (column, title) in zip(axes, panels):
    ax.plot(out["k"], out[column], marker="o")
    ax.set_title(title)
    ax.set_xlabel("k")
    ax.set_ylabel(column)

pca_label = PCA_DIM if USE_PCA else "off"
fig.suptitle(f"KMeans Diagnostics | mode={EMB_MODE} | PCA={pca_label}", fontsize=12)
fig.tight_layout()

# Write the figure to disk only when an output path is configured.
if SAVE_PLOT:
    fig.savefig(SAVE_PLOT, dpi=150, bbox_inches="tight")
    print("✅ saved plot:", SAVE_PLOT)

plt.show()
