# 수원서베이 2024 EDA + 핵심지표 대시보드 (v2: 코드/라벨 자동 매핑)

- `COMMON_VARS`에 **변수코드(SCORE2, MHQ1...)** 또는 **한국어 라벨(수원시정 만족도, 한 주간 삶의 질_평균(100점)...)** 모두 허용
- 코드북을 로드하여 **정확 일치 → 부분 포함(contains)** 순으로 매핑
- 매핑 결과는 `mapping` 시트로 함께 저장


## 0) 로드/설정

In [1]:

from pathlib import Path
import pandas as pd
import numpy as np

BASE = Path.cwd().parent
DATA_XLSX = BASE / "output" / "1. 수원서베이" / "suwon_2024_labeled.xlsx"
DATA_CSV = BASE / "output" / "1. 수원서베이" / "suwon_2024_labeled.csv"
CODEBOOK_XLSX = BASE / "data" / "internal" / "1. 수원서베이" / "(HRC250604) 2024년 수원서베이 용역_공개용 데이터" / "(HRC250604) 2024년 수원서베이 용역_공개용 데이터_코드북.xlsx"

OUTDIR = BASE / "eda"
OUTDIR.mkdir(parents=True, exist_ok=True)
DASHBOARD_PATH = OUTDIR / "dashboard.xlsx"

# 데이터 로드
if DATA_XLSX.exists():
    df = pd.read_excel(DATA_XLSX, sheet_name="data_labeled")
elif DATA_CSV.exists():
    df = pd.read_csv(DATA_CSV)
else:
    raise FileNotFoundError("라벨링 데이터 파일이 없습니다. suwon_2024_labeled.xlsx 또는 CSV를 확인하세요.")

print("Loaded:", df.shape)

# 코드북 로드 (Variable Name, Variable Label)
cb = pd.read_excel(CODEBOOK_XLSX, sheet_name=0, usecols=["Variable Name", "Variable Label"]).drop_duplicates()
cb = cb.rename(columns={"Variable Name":"var", "Variable Label":"label"})
cb = cb.dropna(subset=["var", "label"])
cb["var"] = cb["var"].astype(str)
cb["label"] = cb["label"].astype(str)

# 공통 문항 입력
common_txt = BASE / "common_vars.txt"
if common_txt.exists():
    with open(common_txt, "r", encoding="utf-8") as f:
        COMMON_VARS = [line.strip() for line in f if line.strip()]
else:
    # 예시(라벨 기반)
    COMMON_VARS = ["정책 관심도", "수원시정 만족도", "한 주간 삶의 질_평균(100점)", "영역별 행복 정도_평균(100점)", "환경 만족도_평균(100점)"]

# 가중치 선택
WEIGHT_COL = "ws" if "ws" in df.columns else ("wg" if "wg" in df.columns else None)
print("Weight column:", WEIGHT_COL)
print("COMMON_VARS(raw):", COMMON_VARS)


Loaded: (3057, 481)
Weight column: ws
COMMON_VARS(raw): ['정책 관심도', '수원시정 만족도', '한 주간 삶의 질_평균(100점)', '영역별 행복 정도_평균(100점)', '환경 만족도_평균(100점)']


## 1) 공통 문항 이름 해석/매핑

In [2]:

# ===== 변수명/라벨명 해석기 =====
# 1) 사용자가 준 토큰이 df.columns에 이미 있으면 그대로 사용
# 2) 코드북 label과 정확히 일치하면 해당 var(code)로
# 3) 코드북 label에 부분 포함(contains)되면 가장 긴 일치 라벨을 우선으로 선택
# 4) 그래도 실패하면 unresolved로 처리

df_cols = set(map(str, df.columns))

def resolve_single(token: str):
    t = str(token).strip()
    # case 1: 이미 컬럼명
    if t in df_cols:
        # 라벨 찾기
        label = cb.loc[cb["var"] == t, "label"]
        return {"input": t, "resolved_var": t, "resolved_label": (label.iloc[0] if not label.empty else None), "method": "as_is"}
    # case 2: label exact
    hit = cb[cb["label"] == t]
    if not hit.empty:
        v = hit.iloc[0]["var"]
        if v in df_cols:
            return {"input": t, "resolved_var": v, "resolved_label": t, "method": "label_exact"}
    # case 3: label contains
    cand = cb[cb["label"].str.contains(t, na=False)]
    if not cand.empty:
        # 후보 중 df에 존재하는 변수만
        cand2 = cand[cand["var"].isin(df_cols)].copy()
        if not cand2.empty:
            # 가장 긴 라벨(=가장 구체적) 우선
            cand2["label_len"] = cand2["label"].str.len()
            row = cand2.sort_values("label_len", ascending=False).iloc[0]
            return {"input": t, "resolved_var": row["var"], "resolved_label": row["label"], "method": "label_contains"}
    # 실패
    return {"input": t, "resolved_var": None, "resolved_label": None, "method": "unresolved"}

resolved = [resolve_single(x) for x in COMMON_VARS]
mapping_df = pd.DataFrame(resolved)
print(mapping_df)


                input resolved_var      resolved_label       method
0              정책 관심도       SCORE1              정책 관심도  label_exact
1            수원시정 만족도       SCORE2            수원시정 만족도  label_exact
2  한 주간 삶의 질_평균(100점)         MHQ1  한 주간 삶의 질_평균(100점)  label_exact
3  영역별 행복 정도_평균(100점)         MHQ2  영역별 행복 정도_평균(100점)  label_exact
4     환경 만족도_평균(100점)         MHQ4     환경 만족도_평균(100점)  label_exact


## 2) 보조 함수

In [3]:

LIKERT_KEYWORDS = [
    "전혀", "그렇지 않다", "보통", "그렇다", "매우", "만족", "불만족", "동의", "비동의",
    "낮다", "높다", "나쁘다", "좋다", "의견", "정도", "점수", "만큼"
]

def looks_like_likert(series, sample_k=30):
    vals = series.dropna().astype(str).unique()[:sample_k]
    hit = 0
    for v in vals:
        if any(k in v for k in LIKERT_KEYWORDS):
            hit += 1
    nunique = series.nunique(dropna=True)
    return (4 <= nunique <= 7) and (hit >= max(1, int(np.ceil(len(vals) * 0.2))))

def infer_var_type(s, cat_threshold=0.05, max_cat_unique=30):
    s_num = pd.to_numeric(s, errors="coerce")
    numeric_ratio = s_num.notna().mean()
    nunique = s.nunique(dropna=True)
    if numeric_ratio > 0.98:
        return "numeric"
    if looks_like_likert(s):
        return "ordinal_likert"
    if nunique <= max_cat_unique or nunique / max(1, len(s)) <= cat_threshold:
        return "categorical"
    return "categorical"

def summarize_numeric(s):
    s_num = pd.to_numeric(s, errors="coerce")
    return {
        "count": int(s_num.count()),
        "mean": float(s_num.mean()) if s_num.count() else np.nan,
        "std": float(s_num.std()) if s_num.count() else np.nan,
        "min": float(s_num.min()) if s_num.count() else np.nan,
        "q25": float(s_num.quantile(0.25)) if s_num.count() else np.nan,
        "median": float(s_num.median()) if s_num.count() else np.nan,
        "q75": float(s_num.quantile(0.75)) if s_num.count() else np.nan,
        "max": float(s_num.max()) if s_num.count() else np.nan,
        "nunique": int(s_num.nunique(dropna=True)),
        "na_rate": float(s.isna().mean())
    }

def weighted_mean(x, w):
    x = pd.to_numeric(x, errors="coerce")
    m = ~x.isna() & ~w.isna()
    if m.sum() == 0:
        return np.nan
    return float((x[m] * w[m]).sum() / w[m].sum())

def weighted_std(x, w):
    x = pd.to_numeric(x, errors="coerce")
    m = ~x.isna() & ~w.isna()
    if m.sum() < 2:
        return np.nan
    mu = (x[m] * w[m]).sum() / w[m].sum()
    var = ((w[m] * (x[m] - mu) ** 2).sum()) / (w[m].sum())
    return float(np.sqrt(var))

def numeric_stats(s, w=None):
    base = summarize_numeric(s)
    if w is not None:
        wm = weighted_mean(s, w)
        ws = weighted_std(s, w)
        base.update({"w_mean": wm, "w_std": ws})
    return base

def categorical_freq(s, w=None):
    s2 = s.fillna("(결측)").astype(str)
    if w is None:
        vc = s2.value_counts(dropna=False).rename("count").to_frame()
        vc["ratio"] = vc["count"] / len(s2)
        vc = vc.reset_index().rename(columns={"index":"value"})
        return vc[["value", "ratio", "count"]]
    else:
        tmp = pd.DataFrame({"v": s2, "w": w})
        grp = tmp.groupby("v", dropna=False, as_index=False)["w"].sum()
        total = grp["w"].sum()
        grp = grp.rename(columns={"w": "weight"}).sort_values("weight", ascending=False)
        grp["ratio"] = grp["weight"] / total if total > 0 else np.nan
        grp = grp.rename(columns={"v": "value"})
        grp["count"] = np.nan
        return grp[["value", "ratio", "count", "weight"]]


## 3) 대시보드 생성

In [4]:

# 실제 대시보드 빌드
resolved_ok = mapping_df[mapping_df["resolved_var"].notna()].copy()
resolved_ok = resolved_ok[resolved_ok["resolved_var"].isin(df.columns)]
unresolved = mapping_df[mapping_df["resolved_var"].isna()].copy()

w = None
if WEIGHT_COL and WEIGHT_COL in df.columns:
    w = pd.to_numeric(df[WEIGHT_COL], errors="coerce")

summary_rows = []

with pd.ExcelWriter(DASHBOARD_PATH, engine="xlsxwriter") as writer:
    # mapping 시트
    mapping_df.to_excel(writer, sheet_name="mapping", index=False)
    # unresolved 시트
    if not unresolved.empty:
        unresolved.to_excel(writer, sheet_name="unresolved", index=False)

    # 변수별 시트 + summary
    for _, row in resolved_ok.iterrows():
        var = row["resolved_var"]
        label = row["resolved_label"]
        s = df[var]
        vtype = infer_var_type(s)
        sheet = (label if isinstance(label, str) and label.strip() else var)[:31]

        if vtype == "numeric":
            stats = numeric_stats(s, w)
            stats.update({"variable": var, "label": label, "type":"numeric", "weight": WEIGHT_COL})
            pd.DataFrame([stats]).to_excel(writer, sheet_name=sheet, index=False)
            summary_rows.append(stats)
        else:
            freq = categorical_freq(s, w)
            freq.to_excel(writer, sheet_name=sheet, index=False)
            # summary top1
            top_val = None; top_ratio = np.nan
            if not freq.empty:
                top_val = freq.iloc[0]["value"]
                top_ratio = float(freq.iloc[0]["ratio"]) if pd.notna(freq.iloc[0]["ratio"]) else np.nan
            summary_rows.append({
                "variable": var,
                "label": label,
                "type": "categorical",
                "top1_value": top_val,
                "top1_ratio": top_ratio,
                "nunique": int(s.nunique(dropna=True)),
                "weight": WEIGHT_COL
            })

    if summary_rows:
        pd.DataFrame(summary_rows).to_excel(writer, sheet_name="summary", index=False)

print("Dashboard saved to:", DASHBOARD_PATH)
print("Resolved OK:", len(resolved_ok), " / Input:", len(mapping_df))
if len(unresolved) > 0:
    print("[WARN] Unresolved inputs:", list(unresolved["input"]))


Dashboard saved to: d:\workspace\dacon_sri\eda\dashboard.xlsx
Resolved OK: 5  / Input: 5
