In [3]:
# -*- coding: utf-8 -*-
# pip install pandas statsmodels scipy

import os, warnings
import numpy as np
import pandas as pd
import statsmodels.api as sm
from scipy.stats import chi2
warnings.filterwarnings("ignore")

# ========== 설정 ==========
PATH = r"./5_tt_10년평균_통합지수.csv"  # 필요시 연평균 파일로 교체
TARGET = "통합지수"

# 후보 범주/수치 변수(파일에 없으면 자동 제외됨)
CANDIDATE_CATS = [
    "사고형태","법규위반","사고요일","일광상태",
    "사고유형","도로형태","가해자차종","일당운전자경력",
    "피해운전자 연령대","가해운전자 연령대","가해자성별","피해운전자 성별","기상상태"
]
CANDIDATE_NUMS = ["사고시각","가해자나이","피해자나이"]  # 존재하면 사용

# 포함할 상호작용(우선순위 높은 2~3개 추천)
SELECTED_INTERACTIONS = [
    ("법규위반","가해자차종"),
    ("사고형태","도로형태"),
    ("사고유형","사고형태"),
    # 필요시 ("사고요일","일광상태") 도 추가 가능
]

# 안전/성능 파라미터
MIN_LEVEL_N = 100          # 희소 레벨은 '기타'로 묶기
TOPK_LEVELS_INTER = 10     # 상호작용 변수 각자 상위 K레벨만 사용(교차항 폭발 방지)
MAX_TRAIN_ROWS    = 200_000  # **최대 학습 행 수 cap** (메모리 부족시 더 낮추기)
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)

# ========== 유틸 ==========
def read_csv_any(path):
    for enc in ["utf-8","cp949","utf-8-sig"]:
        try: return pd.read_csv(path, encoding=enc)
        except Exception: pass
    return pd.read_csv(path)

def uniq(seq): return list(dict.fromkeys(list(seq)))

def collapse_rare_levels(df, col, min_n=100, other="기타"):
    vc = df[col].value_counts(dropna=True)
    rare = vc[vc < min_n].index
    if len(rare) > 0:
        df[col] = df[col].where(~df[col].isin(rare), other)
    return df

def keep_topk_levels(df, col, k=TOPK_LEVELS_INTER, other="기타"):
    vc = df[col].value_counts(dropna=True)
    keep = set(vc.index[:k])
    df[col] = df[col].where(df[col].isin(keep), other)
    return df

def to_hour(s):
    x = pd.to_numeric(s, errors="coerce")
    if x.dropna().size and x.dropna().max() > 100:
        return (x // 100).clip(0, 23)
    return x

def choose_cluster_col(df):
    # 군집SE 후보 자동 선택(부적합하면 HC3)
    for c in ["사고장소","지점ID","IDX","시도"]:
        if c in df.columns:
            g = df[c].fillna("__NA__")
            if g.nunique() >= 5 and g.nunique() <= 0.95*len(g):
                return c
    return None

def cov_from(df, cluster_col):
    if cluster_col and (cluster_col in df.columns):
        g = df[cluster_col].fillna("__NA__")
        if g.nunique() >= 5 and g.nunique() <= 0.95*len(g) and (g.value_counts().max() >= 2):
            return "cluster", {"groups": g.to_numpy()}
    return "HC3", {}

def drop_near_alias(df, cat_cols, thresh=0.98):
    # 서로 거의 1:1로 겹치는 축 자동 제거(near-aliasing)
    keep, dropped = [], []
    for c in cat_cols:
        if c not in df.columns: continue
        if df[c].nunique() <= 1:
            dropped.append(c); 
            continue
        to_drop = False
        for k in keep:
            if k not in df.columns: continue
            tab = pd.crosstab(df[k], df[c])
            if tab.values.sum() == 0: continue
            col_acc = (tab.div(tab.sum(0), 0).max(0)).mean()
            row_acc = (tab.div(tab.sum(1), 0).max(1)).mean()
            if max(col_acc, row_acc) >= thresh:
                to_drop = True; break
        if to_drop: dropped.append(c)
        else: keep.append(c)
    return keep, dropped

# ---------- 결측/전처리 ----------
def impute_all(df, cat_cols, num_cols):
    df = df.copy()
    # 범주 결측 → '미상'
    for c in cat_cols:
        if c in df.columns:
            df[c] = df[c].astype("object").fillna("미상")
    # 수치 결측/이상치 대치
    for c in num_cols:
        if c not in df.columns: 
            continue
        s = df[c]
        if c == "사고시각": s = to_hour(s)
        s = pd.to_numeric(s, errors="coerce")
        med = s.median()
        if pd.isna(med): med = 0.0
        df[c] = s.fillna(med)
    return df

# ---------- 층화 샘플링(cap) ----------
def stratified_cap(df, inter_pairs, cat_base, max_rows=MAX_TRAIN_ROWS, seed=RANDOM_SEED):
    if len(df) <= max_rows:
        return df.copy()

    # 층 변수 = (선택 상호작용에 등장하는 모든 범주축) 있으면 그걸로,
    # 없으면 cat_base 앞의 2개로
    strata_cols = uniq([c for pair in inter_pairs for c in pair if c in cat_base])
    if len(strata_cols) == 0:
        strata_cols = cat_base[:2] if len(cat_base) >= 2 else cat_base

    # 상호작용 관측 보호를 위해 TopK 레벨 제한 후 층화
    tmp = df.copy()
    for c in strata_cols:
        if c in tmp.columns:
            tmp = keep_topk_levels(tmp, c, TOPK_LEVELS_INTER)
    g = tmp.groupby(strata_cols, dropna=False, sort=False)

    cap_per = max(50, int(max_rows / max(1, g.ngroups)))
    parts = []
    rng = np.random.default_rng(seed)
    for _, sub in g:
        n = len(sub)
        if n <= cap_per:
            parts.append(sub)
        else:
            parts.append(sub.sample(cap_per, random_state=seed))
    out = pd.concat(parts, axis=0)
    # cap 초과시 랜덤 추가 샘플 조정
    if len(out) > max_rows:
        out = out.sample(max_rows, random_state=seed)
    return out

# ---------- 설계행렬(주효과 + 선택 상호작용) ----------
def build_X(df, cat_cols, num_cols, inter_pairs):
    df = df.copy()

    # 상호작용 변수는 Top-K 레벨로 제한 (교차항 폭발 방지)
    for a, b in inter_pairs:
        if a in df.columns: df = keep_topk_levels(df, a, TOPK_LEVELS_INTER)
        if b in df.columns: df = keep_topk_levels(df, b, TOPK_LEVELS_INTER)

    # 더미 생성
    cats_for_dummies = [c for c in cat_cols if c in df.columns]
    Xc = pd.get_dummies(df[cats_for_dummies], drop_first=True)
    Xc = Xc.loc[:, ~Xc.columns.duplicated()]

    # 선택한 페어만 상호작용 더미 추가
    for a, b in inter_pairs:
        A = [c for c in Xc.columns if c.startswith(a + "_")]
        B = [c for c in Xc.columns if c.startswith(b + "_")]
        for ca in A:
            ca_s = Xc[ca]
            for cb in B:
                Xc[f"{ca}:{cb}"] = ca_s * Xc[cb]

    # 수치와 결합
    Xn = df[num_cols].copy() if num_cols else pd.DataFrame(index=df.index)
    X = pd.concat([Xn.astype(float), Xc.astype(float)], axis=1)

    # 제로분산 열 제거
    if len(X.columns):
        nonzero = X.std(axis=0).replace(0, np.nan).notna()
        X = X.loc[:, nonzero]

    # inf → NaN 처리 (뒤 단계에서 일괄 제거)
    X = X.replace([np.inf, -np.inf], np.nan)

    # 상수항
    X = sm.add_constant(X, has_constant="add")
    return X

def align_and_clean(X, y_raw, cov_kwds=None):
    y = pd.to_numeric(pd.Series(y_raw), errors="coerce").values
    na_mask = X.isna().any(axis=1).values
    mask = np.isfinite(y) & (~na_mask)

    Xc = X.loc[mask]
    yc = y[mask]

    cov_kwds_out = cov_kwds
    if cov_kwds is not None and "groups" in cov_kwds:
        cov_kwds_out = dict(cov_kwds)
        cov_kwds_out["groups"] = np.asarray(cov_kwds["groups"])[mask]

    dropped = int((~mask).sum())
    if dropped > 0:
        print(f"[clean] dropped {dropped} rows due to NaN/inf in X or y")

    if len(Xc) == 0:
        raise ValueError("No rows left after cleaning NaN/inf. Check inputs.")
    return Xc, yc, cov_kwds_out

def fit_glm_gamma(X, y, cov_type, cov_kwds):
    # 1차: 일반 적합 → 실패 시 초소형 릿지
    try:
        return sm.GLM(y, X, family=sm.families.Gamma(link=sm.families.links.log()))\
                .fit(cov_type=cov_type, cov_kwds=cov_kwds)
    except Exception:
        return sm.GLM(y, X, family=sm.families.Gamma(link=sm.families.links.log()))\
                .fit_regularized(alpha=1e-6, L1_wt=0.0)

def export_coef_table(res, path):
    out = pd.DataFrame({
        "term": res.params.index,
        "coef": res.params.values,
        "std_err": getattr(res, "bse", pd.Series(index=res.params.index, dtype=float)).reindex(res.params.index).values,
        "p_value": getattr(res, "pvalues", pd.Series(index=res.params.index, dtype=float)).reindex(res.params.index).values
    })
    out["mult_effect_%"] = (np.exp(out["coef"]) - 1.0) * 100.0
    out.to_csv(path, index=False, encoding="utf-8-sig")
    return out

# ========== 0) 데이터 로드 & 전처리 ==========
print("[WD]", os.getcwd())
df = read_csv_any(PATH)
assert TARGET in df.columns, f"'{TARGET}' 컬럼이 있어야 합니다."

# Y>0만 사용
df = df[df[TARGET].notna() & (pd.to_numeric(df[TARGET], errors="coerce") > 0)].copy()
df[TARGET] = pd.to_numeric(df[TARGET], errors="coerce")

# 누수 방지: 이름에 이런 단어가 들어간 컬럼은 피처에서 제외
LEAKS = ["통합지수","다발도","심각도","사고건수","건수","사망사고건수","중상사고건수","경상사고건수"]
is_safe = lambda c: not any(w in str(c) for w in LEAKS)

cat_base = [c for c in CANDIDATE_CATS if c in df.columns and is_safe(c)]
num_base = [c for c in CANDIDATE_NUMS if c in df.columns and is_safe(c)]

# 희소 레벨 정리
for c in cat_base:
    df = collapse_rare_levels(df, c, min_n=MIN_LEVEL_N)

# near-aliasing 제거
cat_base, dropped_alias = drop_near_alias(df, cat_base, thresh=0.98)
if dropped_alias:
    print("[auto-drop aliasing]", dropped_alias)

# 상호작용 리스트 중 존재하는 것만
SELECTED_INTERACTIONS = [(a,b) for (a,b) in SELECTED_INTERACTIONS if (a in cat_base and b in cat_base)]

# 결측 대치
df = impute_all(df, cat_base, num_base)

# ========== (NEW) 학습 표본 층화 샘플링 ==========
df_train = stratified_cap(df, SELECTED_INTERACTIONS, cat_base, max_rows=MAX_TRAIN_ROWS, seed=RANDOM_SEED)
print(f"[sample] train rows: {len(df_train):,} / {len(df):,}")

# ========== 1) 베이스(주효과) 모형 ==========
cluster_col = choose_cluster_col(df_train)
cov_type0, cov_kw0 = cov_from(df_train, cluster_col)

X_base = build_X(df_train, cat_base, num_base, inter_pairs=[])  # 상호작용 없음
y_raw = df_train[TARGET].values
X_base, y_base, cov_kw0 = align_and_clean(X_base, y_raw, cov_kw0)
fit_base = fit_glm_gamma(X_base, y_base, cov_type0, cov_kw0)
export_coef_table(fit_base, "MODEL_BASE_results.csv")
print(f"[BASE] AIC={fit_base.aic:.1f}, n={int(fit_base.nobs)} -> MODEL_BASE_results.csv")

# ========== 2) 풀(주효과 + 선택 상호작용) 모형 ==========
cov_typeF, cov_kwF = cov_from(df_train, cluster_col)
X_full = build_X(df_train, cat_base, num_base, inter_pairs=SELECTED_INTERACTIONS)
X_full, y_full, cov_kwF = align_and_clean(X_full, y_raw, cov_kwF)
fit_full = fit_glm_gamma(X_full, y_full, cov_typeF, cov_kwF)
export_coef_table(fit_full, "MODEL_FULL_results.csv")
pd.DataFrame({"interaction_pairs":[f"{a} × {b}" for a,b in SELECTED_INTERACTIONS]}).to_csv(
    "INTER_LIST_used.csv", index=False, encoding="utf-8-sig"
)
print(f"[FULL] AIC={fit_full.aic:.1f}, n={int(fit_full.nobs)} -> MODEL_FULL_results.csv, INTER_LIST_used.csv")

# ========== 3) BASE vs FULL 개선도(LR 테스트 & ΔAIC/BIC) ==========
# 동일 표본/상수항 기준으로 비교
ll0, ll1 = fit_base.llf, fit_full.llf
df0, df1 = fit_base.df_model, fit_full.df_model
LR = 2*(ll1 - ll0)
ddf = max(int(df1 - df0), 1)
p = chi2.sf(LR, ddf)
delta_AIC = fit_full.aic - fit_base.aic
delta_BIC = fit_full.bic - fit_base.bic

pd.DataFrame([{
    "LR_stat": LR, "df_diff": ddf, "p_value": p,
    "AIC_base": fit_base.aic, "AIC_full": fit_full.aic, "ΔAIC": delta_AIC,
    "BIC_base": fit_base.bic, "BIC_full": fit_full.bic, "ΔBIC": delta_BIC,
    "n_train": int(fit_full.nobs)
}]).to_csv("LR_TEST_BASE_vs_FULL.csv", index=False, encoding="utf-8-sig")

print("[COMPARE] LR_TEST_BASE_vs_FULL.csv saved")
print(f"  LR={LR:.2f}, df={ddf}, p={p:.3e}, ΔAIC={delta_AIC:.2f}, ΔBIC={delta_BIC:.2f}")

# ========== 4) 대표 상호작용(첫 번째) 피벗 예측 ==========
if SELECTED_INTERACTIONS:
    a, b = SELECTED_INTERACTIONS[0]
    # 대표값(연속=중앙값, 범주=최빈값)으로 다른 공변량 고정 (학습표본 기준)
    rep = {}
    for c in num_base:
        if c in df_train.columns:
            rep[c] = pd.to_numeric(df_train[c], errors="coerce").median()
            if pd.isna(rep[c]): rep[c] = 0.0
    for c in cat_base:
        if c not in [a,b] and c in df_train.columns:
            mode_series = df_train[c].mode(dropna=True)
            rep[c] = mode_series.iloc[0] if len(mode_series) else "미상"

    # 그리드 만들기: a,b의 모든(TopK 후) 레벨 조합
    df_ab = df_train[[a,b]].copy()
    df_ab = keep_topk_levels(df_ab, a, TOPK_LEVELS_INTER)
    df_ab = keep_topk_levels(df_ab, b, TOPK_LEVELS_INTER)
    A_levels = df_ab[a].dropna().unique().tolist()
    B_levels = df_ab[b].dropna().unique().tolist()

    grid = [{a: av, b: bv, **rep} for av in A_levels for bv in B_levels]
    new = pd.DataFrame(grid, columns=uniq([a,b] + list(rep.keys())))

    # 예측용 설계행렬(훈련 X_full과 동일 규칙 + 열정렬)
    X_new = build_X(new, cat_base, num_base, inter_pairs=SELECTED_INTERACTIONS)
    X_new = X_new.reindex(columns=fit_full.model.exog_names, fill_value=0.0)

    pred = fit_full.get_prediction(exog=X_new).summary_frame()
    new["pred_mean"] = pred["mean"]
    new["pred_low"]  = pred["mean_ci_lower"]
    new["pred_high"] = pred["mean_ci_upper"]

    # 피벗: 예측 평균
    pivot = new.pivot(index=a, columns=b, values="pred_mean")
    pivot.to_csv(f"PIVOT_pred_mean_{a}x{b}.csv", encoding="utf-8-sig")

    # 배수효과(%): pivot의 [0,0] 기준
    base_val = pivot.iloc[0,0]
    mult = (pivot / base_val - 1.0) * 100.0
    mult.to_csv(f"PIVOT_mult_%_{a}x{b}.csv", encoding="utf-8-sig")
    print(f"[PIVOT] saved -> PIVOT_pred_mean_{a}x{b}.csv, PIVOT_mult_%_{a}x{b}.csv")
else:
    print("[PIVOT] 상호작용이 지정되지 않아 피벗 생략")


[WD] c:\Users\USER\Desktop\Project_Data
[auto-drop aliasing] ['사고유형', '도로형태', '기상상태']
[sample] train rows: 86,162 / 522,926
[BASE] AIC=335722.5, n=86162 -> MODEL_BASE_results.csv
[FULL] AIC=335631.9, n=86162 -> MODEL_FULL_results.csv, INTER_LIST_used.csv
[COMPARE] LR_TEST_BASE_vs_FULL.csv saved
  LR=200.55, df=55, p=1.974e-18, ΔAIC=-90.55, ΔBIC=432.80
[PIVOT] saved -> PIVOT_pred_mean_법규위반x가해자차종.csv, PIVOT_mult_%_법규위반x가해자차종.csv


In [4]:
# -*- coding: utf-8 -*-
# pip install pandas statsmodels scipy

import os, warnings
import numpy as np
import pandas as pd
import statsmodels.api as sm
from scipy.stats import chi2
warnings.filterwarnings("ignore")

# ========== 설정 ==========
PATH = r"./5_tt_10년평균_통합지수.csv"  # 필요시 연평균 파일로 교체
TARGET = "통합지수"

# 후보 범주/수치 변수(파일에 없으면 자동 제외됨)
CANDIDATE_CATS = [
    "사고형태","법규위반","사고요일","일광상태",
    "사고유형","도로형태","가해자차종","일당운전자경력",
    "피해운전자 연령대","가해운전자 연령대","가해자성별","피해운전자 성별","기상상태"
]
CANDIDATE_NUMS = ["사고시각","가해자나이","피해자나이"]  # 존재하면 사용

# 포함할 상호작용(우선순위 높은 2~3개 추천) — 바꿔도 안전하게 동작
SELECTED_INTERACTIONS = [
    ("법규위반","가해자차종"),
    ("사고형태","도로형태"),
    ("사고유형","사고형태"),
    # 필요시 ("사고요일","일광상태") 도 추가 가능
]

# 안전/성능 파라미터
MIN_LEVEL_N         = 100      # 희소 레벨은 '기타'로 묶기
TOPK_LEVELS_INTER   = 10       # 상호작용 변수 각자 상위 K레벨만 사용(교차항 폭발 방지)
MAX_TRAIN_ROWS      = 200_000  # 학습 행 수 cap (메모리 빠듯하면 ↓)
MAX_INTER_DUMMIES   = 400      # 상호작용 더미 칼럼 수 안전 캡 (drop_first 기준 (la-1)*(lb-1))
RANDOM_SEED         = 42
np.random.seed(RANDOM_SEED)

# ========== 유틸 ==========
def read_csv_any(path):
    for enc in ["utf-8","cp949","utf-8-sig"]:
        try: return pd.read_csv(path, encoding=enc)
        except Exception: pass
    return pd.read_csv(path)

def uniq(seq): return list(dict.fromkeys(list(seq)))

def collapse_rare_levels(df, col, min_n=100, other="기타"):
    vc = df[col].value_counts(dropna=True)
    rare = vc[vc < min_n].index
    if len(rare) > 0:
        df[col] = df[col].where(~df[col].isin(rare), other)
    return df

def keep_topk_levels(df, col, k=TOPK_LEVELS_INTER, other="기타"):
    vc = df[col].value_counts(dropna=True)
    keep = set(vc.index[:k])
    df[col] = df[col].where(df[col].isin(keep), other)
    return df

def to_hour(s):
    x = pd.to_numeric(s, errors="coerce")
    if x.dropna().size and x.dropna().max() > 100:
        return (x // 100).clip(0, 23)
    return x

def choose_cluster_col(df):
    # 군집SE 후보 자동 선택(부적합하면 HC3)
    for c in ["사고장소","지점ID","IDX","시도"]:
        if c in df.columns:
            g = df[c].fillna("__NA__")
            if g.nunique() >= 5 and g.nunique() <= 0.95*len(g):
                return c
    return None

def cov_from(df, cluster_col):
    if cluster_col and (cluster_col in df.columns):
        g = df[cluster_col].fillna("__NA__")
        if g.nunique() >= 5 and g.nunique() <= 0.95*len(g) and (g.value_counts().max() >= 2):
            return "cluster", {"groups": g.to_numpy()}
    return "HC3", {}

def drop_near_alias(df, cat_cols, thresh=0.98):
    # 서로 거의 1:1로 겹치는 축 자동 제거(near-aliasing)
    keep, dropped = [], []
    for c in cat_cols:
        if c not in df.columns: continue
        if df[c].nunique() <= 1:
            dropped.append(c); 
            continue
        to_drop = False
        for k in keep:
            if k not in df.columns: continue
            tab = pd.crosstab(df[k], df[c])
            if tab.values.sum() == 0: continue
            col_acc = (tab.div(tab.sum(0), 0).max(0)).mean()
            row_acc = (tab.div(tab.sum(1), 0).max(1)).mean()
            if max(col_acc, row_acc) >= thresh:
                to_drop = True; break
        if to_drop: dropped.append(c)
        else: keep.append(c)
    return keep, dropped

# ---------- (보강1) 상호작용 유효성 점검 ----------
def validate_and_filter_interactions(df, cat_base, pairs, min_levels=2):
    ok = []
    for a, b in pairs:
        if a not in cat_base or b not in cat_base:
            print(f"[skip] '{a}×{b}': not in cat_base")
            continue
        la, lb = df[a].nunique(), df[b].nunique()
        if la < min_levels or lb < min_levels:
            print(f"[skip] '{a}×{b}': insufficient levels ({la},{lb})")
            continue
        ok.append((a, b))
    return ok

# ---------- (보강3) 상호작용 더미 칼럼 수 캡 ----------
def cap_interaction_levels_for_pair(df, a, b, max_inter_dummies=MAX_INTER_DUMMIES):
    if a not in df.columns or b not in df.columns:
        return df
    la, lb = df[a].nunique(), df[b].nunique()
    tgt_a, tgt_b = la, lb
    # drop_first 기준 더미 수 ≈ (la-1)*(lb-1)
    while (tgt_a-1)*(tgt_b-1) > max_inter_dummies and (tgt_a > 2 or tgt_b > 2):
        if tgt_a >= tgt_b and tgt_a > 2:
            tgt_a -= 1
        elif tgt_b > 2:
            tgt_b -= 1
        else:
            break
    if tgt_a < la: df = keep_topk_levels(df, a, tgt_a)
    if tgt_b < lb: df = keep_topk_levels(df, b, tgt_b)
    return df

# ---------- 결측/전처리 ----------
def impute_all(df, cat_cols, num_cols):
    df = df.copy()
    # 범주 결측 → '미상'
    for c in cat_cols:
        if c in df.columns:
            df[c] = df[c].astype("object").fillna("미상")
    # 수치 결측/이상치 대치
    for c in num_cols:
        if c not in df.columns: 
            continue
        s = df[c]
        if c == "사고시각": s = to_hour(s)
        s = pd.to_numeric(s, errors="coerce")
        med = s.median()
        if pd.isna(med): med = 0.0
        df[c] = s.fillna(med)
    return df

# ---------- 층화 샘플링(cap) ----------
def stratified_cap(df, inter_pairs, cat_base, max_rows=MAX_TRAIN_ROWS, seed=RANDOM_SEED):
    if len(df) <= max_rows:
        return df.copy()
    strata_cols = uniq([c for pair in inter_pairs for c in pair if c in cat_base])
    if len(strata_cols) == 0:
        strata_cols = cat_base[:2] if len(cat_base) >= 2 else cat_base
    tmp = df.copy()
    for c in strata_cols:
        if c in tmp.columns:
            tmp = keep_topk_levels(tmp, c, TOPK_LEVELS_INTER)
    g = tmp.groupby(strata_cols, dropna=False, sort=False)
    cap_per = max(50, int(max_rows / max(1, g.ngroups)))
    parts = []
    for _, sub in g:
        n = len(sub)
        parts.append(sub.sample(cap_per, random_state=seed) if n > cap_per else sub)
    out = pd.concat(parts, axis=0)
    if len(out) > max_rows:
        out = out.sample(max_rows, random_state=seed)
    return out

# ---------- 설계행렬(주효과 + 선택 상호작용) ----------
def build_X(df, cat_cols, num_cols, inter_pairs):
    df = df.copy()

    # 상호작용 변수: Top-K 제한 후, 칼럼 수 캡 적용
    for a, b in inter_pairs:
        if a in df.columns: df = keep_topk_levels(df, a, TOPK_LEVELS_INTER)
        if b in df.columns: df = keep_topk_levels(df, b, TOPK_LEVELS_INTER)
        df = cap_interaction_levels_for_pair(df, a, b, max_inter_dummies=MAX_INTER_DUMMIES)

    # 더미 생성
    cats_for_dummies = [c for c in cat_cols if c in df.columns]
    Xc = pd.get_dummies(df[cats_for_dummies], drop_first=True)
    Xc = Xc.loc[:, ~Xc.columns.duplicated()]

    # 선택한 페어만 상호작용 더미 추가
    for a, b in inter_pairs:
        A = [c for c in Xc.columns if c.startswith(a + "_")]
        B = [c for c in Xc.columns if c.startswith(b + "_")]
        for ca in A:
            ca_s = Xc[ca]
            for cb in B:
                Xc[f"{ca}:{cb}"] = ca_s * Xc[cb]

    # 수치와 결합
    Xn = df[num_cols].copy() if num_cols else pd.DataFrame(index=df.index)
    X = pd.concat([Xn.astype(float), Xc.astype(float)], axis=1)

    # 제로분산 열 제거
    if len(X.columns):
        nonzero = X.std(axis=0).replace(0, np.nan).notna()
        X = X.loc[:, nonzero]

    # inf → NaN 처리 (뒤 단계에서 일괄 제거)
    X = X.replace([np.inf, -np.inf], np.nan)

    # 상수항
    X = sm.add_constant(X, has_constant="add")
    return X

def align_and_clean(X, y_raw, cov_kwds=None):
    y = pd.to_numeric(pd.Series(y_raw), errors="coerce").values
    na_mask = X.isna().any(axis=1).values
    mask = np.isfinite(y) & (~na_mask)
    Xc = X.loc[mask]
    yc = y[mask]
    cov_kwds_out = cov_kwds
    if cov_kwds is not None and "groups" in cov_kwds:
        cov_kwds_out = dict(cov_kwds)
        cov_kwds_out["groups"] = np.asarray(cov_kwds["groups"])[mask]
    dropped = int((~mask).sum())
    if dropped > 0:
        print(f"[clean] dropped {dropped} rows due to NaN/inf in X or y")
    if len(Xc) == 0:
        raise ValueError("No rows left after cleaning NaN/inf. Check inputs.")
    return Xc, yc, cov_kwds_out

def fit_glm_gamma(X, y, cov_type, cov_kwds):
    # 1차: 일반 적합 → 실패 시 초소형 릿지
    try:
        return sm.GLM(y, X, family=sm.families.Gamma(link=sm.families.links.log()))\
                .fit(cov_type=cov_type, cov_kwds=cov_kwds)
    except Exception:
        return sm.GLM(y, X, family=sm.families.Gamma(link=sm.families.links.log()))\
                .fit_regularized(alpha=1e-6, L1_wt=0.0)

def export_coef_table(res, path):
    out = pd.DataFrame({
        "term": res.params.index,
        "coef": res.params.values,
        "std_err": getattr(res, "bse", pd.Series(index=res.params.index, dtype=float)).reindex(res.params.index).values,
        "p_value": getattr(res, "pvalues", pd.Series(index=res.params.index, dtype=float)).reindex(res.params.index).values
    })
    out["mult_effect_%"] = (np.exp(out["coef"]) - 1.0) * 100.0
    out.to_csv(path, index=False, encoding="utf-8-sig")
    return out

# ---------- (보강2) 안전 예측기 ----------
def safe_predict_mean(res, X_new):
    if hasattr(res, "get_prediction"):
        try:
            sf = res.get_prediction(exog=X_new).summary_frame()
            mean = sf["mean"]
            low  = sf["mean_ci_lower"] if "mean_ci_lower" in sf else None
            high = sf["mean_ci_upper"] if "mean_ci_upper" in sf else None
            return mean, low, high
        except Exception:
            pass
    # fallback: 수동 계산 (CI 생략)
    lin = np.dot(X_new, res.params)
    mu = res.model.family.fitted(lin)   # log 링크 → exp 변환
    return pd.Series(mu, index=X_new.index), None, None

# ========== 0) 데이터 로드 & 전처리 ==========
print("[WD]", os.getcwd())
df = read_csv_any(PATH)
assert TARGET in df.columns, f"'{TARGET}' 컬럼이 있어야 합니다."

# Y>0만 사용
df = df[df[TARGET].notna() & (pd.to_numeric(df[TARGET], errors="coerce") > 0)].copy()
df[TARGET] = pd.to_numeric(df[TARGET], errors="coerce")

# 누수 방지: 이름에 이런 단어가 들어간 컬럼은 피처에서 제외
LEAKS = ["통합지수","다발도","심각도","사고건수","건수","사망사고건수","중상사고건수","경상사고건수"]
is_safe = lambda c: not any(w in str(c) for w in LEAKS)

cat_base = [c for c in CANDIDATE_CATS if c in df.columns and is_safe(c)]
num_base = [c for c in CANDIDATE_NUMS if c in df.columns and is_safe(c)]

# 희소 레벨 정리
for c in cat_base:
    df = collapse_rare_levels(df, c, min_n=MIN_LEVEL_N)

# near-aliasing 제거
cat_base, dropped_alias = drop_near_alias(df, cat_base, thresh=0.98)
if dropped_alias:
    print("[auto-drop aliasing]", dropped_alias)

# 상호작용 리스트 중 존재하는 것만 → 유효성 검사까지
SELECTED_INTERACTIONS = [(a,b) for (a,b) in SELECTED_INTERACTIONS if (a in cat_base and b in cat_base)]
SELECTED_INTERACTIONS = validate_and_filter_interactions(df, cat_base, SELECTED_INTERACTIONS)

# 결측 대치
df = impute_all(df, cat_base, num_base)

# ========== 학습 표본 층화 샘플링 ==========
df_train = stratified_cap(df, SELECTED_INTERACTIONS, cat_base, max_rows=MAX_TRAIN_ROWS, seed=RANDOM_SEED)
print(f"[sample] train rows: {len(df_train):,} / {len(df):,}")

# ========== 1) 베이스(주효과) 모형 ==========
cluster_col = choose_cluster_col(df_train)
cov_type0, cov_kw0 = cov_from(df_train, cluster_col)

X_base = build_X(df_train, cat_base, num_base, inter_pairs=[])  # 상호작용 없음
y_raw = df_train[TARGET].values
X_base, y_base, cov_kw0 = align_and_clean(X_base, y_raw, cov_kw0)
fit_base = fit_glm_gamma(X_base, y_base, cov_type0, cov_kw0)
export_coef_table(fit_base, "MODEL_BASE_results.csv")
print(f"[BASE] AIC={fit_base.aic:.1f}, n={int(fit_base.nobs)} -> MODEL_BASE_results.csv")

# ========== 2) 풀(주효과 + 선택 상호작용) 모형 ==========
cov_typeF, cov_kwF = cov_from(df_train, cluster_col)
X_full = build_X(df_train, cat_base, num_base, inter_pairs=SELECTED_INTERACTIONS)
X_full, y_full, cov_kwF = align_and_clean(X_full, y_raw, cov_kwF)
fit_full = fit_glm_gamma(X_full, y_full, cov_typeF, cov_kwF)
export_coef_table(fit_full, "MODEL_FULL_results.csv")
pd.DataFrame({"interaction_pairs":[f"{a} × {b}" for a,b in SELECTED_INTERACTIONS]}).to_csv(
    "INTER_LIST_used.csv", index=False, encoding="utf-8-sig"
)
print(f"[FULL] AIC={fit_full.aic:.1f}, n={int(fit_full.nobs)} -> MODEL_FULL_results.csv, INTER_LIST_used.csv")

# ========== 3) BASE vs FULL 개선도(LR 테스트 & ΔAIC/BIC) ==========
ll0, ll1 = fit_base.llf, fit_full.llf
df0, df1 = fit_base.df_model, fit_full.df_model
LR = 2*(ll1 - ll0)
ddf = max(int(df1 - df0), 1)
p = chi2.sf(LR, ddf)
delta_AIC = fit_full.aic - fit_base.aic
delta_BIC = fit_full.bic - fit_base.bic

pd.DataFrame([{
    "LR_stat": LR, "df_diff": ddf, "p_value": p,
    "AIC_base": fit_base.aic, "AIC_full": fit_full.aic, "ΔAIC": delta_AIC,
    "BIC_base": fit_base.bic, "BIC_full": fit_full.bic, "ΔBIC": delta_BIC,
    "n_train": int(fit_full.nobs)
}]).to_csv("LR_TEST_BASE_vs_FULL.csv", index=False, encoding="utf-8-sig")

print("[COMPARE] LR_TEST_BASE_vs_FULL.csv saved")
print(f"  LR={LR:.2f}, df={ddf}, p={p:.3e}, ΔAIC={delta_AIC:.2f}, ΔBIC={delta_BIC:.2f}")

# ========== 4) 대표 상호작용(첫 번째) 피벗 예측 ==========
if SELECTED_INTERACTIONS:
    a, b = SELECTED_INTERACTIONS[0]
    # 대표값(연속=중앙값, 범주=최빈값)으로 다른 공변량 고정 (학습표본 기준)
    rep = {}
    for c in num_base:
        if c in df_train.columns:
            rep[c] = pd.to_numeric(df_train[c], errors="coerce").median()
            if pd.isna(rep[c]): rep[c] = 0.0
    for c in cat_base:
        if c not in [a,b] and c in df_train.columns:
            mode_series = df_train[c].mode(dropna=True)
            rep[c] = mode_series.iloc[0] if len(mode_series) else "미상"

    # 그리드 만들기: a,b의 모든(TopK/캡 후) 레벨 조합
    df_ab = df_train[[a,b]].copy()
    df_ab = keep_topk_levels(df_ab, a, TOPK_LEVELS_INTER)
    df_ab = keep_topk_levels(df_ab, b, TOPK_LEVELS_INTER)
    df_ab = cap_interaction_levels_for_pair(df_ab, a, b, max_inter_dummies=MAX_INTER_DUMMIES)
    A_levels = df_ab[a].dropna().unique().tolist()
    B_levels = df_ab[b].dropna().unique().tolist()

    grid = [{a: av, b: bv, **rep} for av in A_levels for bv in B_levels]
    new = pd.DataFrame(grid, columns=uniq([a,b] + list(rep.keys())))

    # 예측용 설계행렬(훈련 X_full과 동일 규칙 + 열정렬)
    X_new = build_X(new, cat_base, num_base, inter_pairs=SELECTED_INTERACTIONS)
    X_new = X_new.reindex(columns=fit_full.model.exog_names, fill_value=0.0)

    mean, low, high = safe_predict_mean(fit_full, X_new)
    new["pred_mean"] = mean
    if low is not None:  new["pred_low"]  = low
    if high is not None: new["pred_high"] = high

    # 피벗: 예측 평균
    pivot = new.pivot(index=a, columns=b, values="pred_mean")
    pivot.to_csv(f"PIVOT_pred_mean_{a}x{b}.csv", encoding="utf-8-sig")

    # 배수효과(%): pivot의 [0,0] 기준
    base_val = pivot.iloc[0,0]
    mult = (pivot / base_val - 1.0) * 100.0
    mult.to_csv(f"PIVOT_mult_%_{a}x{b}.csv", encoding="utf-8-sig")
    print(f"[PIVOT] saved -> PIVOT_pred_mean_{a}x{b}.csv, PIVOT_mult_%_{a}x{b}.csv")
else:
    print("[PIVOT] 상호작용이 지정되지 않아 피벗 생략")


[WD] c:\Users\USER\Desktop\Project_Data
[auto-drop aliasing] ['사고유형', '도로형태', '기상상태']
[sample] train rows: 86,162 / 522,926
[BASE] AIC=335722.5, n=86162 -> MODEL_BASE_results.csv
[FULL] AIC=335631.9, n=86162 -> MODEL_FULL_results.csv, INTER_LIST_used.csv
[COMPARE] LR_TEST_BASE_vs_FULL.csv saved
  LR=200.55, df=55, p=1.974e-18, ΔAIC=-90.55, ΔBIC=432.80
[PIVOT] saved -> PIVOT_pred_mean_법규위반x가해자차종.csv, PIVOT_mult_%_법규위반x가해자차종.csv
