# 3교시 대비: 이상탐지 **지도학습 전용** 템플릿 (토이데이터 + 문제 + 모범답안 + 실무 해설)

이 노트북은 **라벨이 있는 이상탐지(이진 분류)** 문제에서  
**지도학습만으로** 빠르게 완성하는 *시험장 복붙 템플릿*입니다.

## 핵심
- `predict_proba`로 **위험도(score)** 생성
- **불균형 대응**(class_weight / scale_pos_weight)
- **임계값 정책**(F1-opt, Top-N, Recall 고정)
- **리포트 출력**(Top-N 샘플)
- (가산) **해석**(Permutation Importance)


## 0) Imports & 재현성

In [1]:
import random, warnings
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, StandardScaler, RobustScaler
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline

from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (
    roc_auc_score, average_precision_score,
    precision_recall_curve, f1_score, precision_score, recall_score
)
from sklearn.inspection import permutation_importance

# XGBoost is optional. Any failure while importing it (not only ImportError)
# simply disables the XGBoost model in the later cells.
try:
    from xgboost import XGBClassifier
except Exception:
    HAS_XGB = False
else:
    HAS_XGB = True

# Silence every warning for the rest of the session.
warnings.filterwarnings("ignore")

# One seed shared by the stdlib and the NumPy global random generators.
SEED = 42
np.random.seed(SEED)
random.seed(SEED)


## 1) 토이데이터 생성 (시험장에서는 CSV 로딩으로 대체)

- 범주형: line_id, shift, supplier, material_grade  
- 수치형: temp/pressure/vibration/cycle/error 등  
- 타깃: eol_fail (0 정상, 1 불량) — 불균형(약 4%)


In [2]:
def make_toy_labeled(n=20000, pos_ratio=0.04, seed=42):
    """Generate a synthetic, imbalanced manufacturing dataset with a binary label.

    A latent risk value is built from the features plus Gaussian noise, and
    the rows at or above its ``1 - pos_ratio`` quantile are labelled positive.
    Missing values and outliers are injected after the label is computed, so
    they do not influence it.

    Args:
        n: Number of rows to generate.
        pos_ratio: Target share of positive (``eol_fail == 1``) rows.
        seed: Seed for the NumPy ``Generator`` that drives every random draw.

    Returns:
        A DataFrame with the categorical columns ``line_id``, ``shift``,
        ``supplier`` and ``material_grade``, six numeric columns, and the
        ``eol_fail`` target.
    """
    rng = np.random.default_rng(seed)

    # NOTE: the order of the rng draws fixes the generated values for a given
    # seed, so the statements below must not be rearranged.
    line_id = rng.integers(1, 7, size=n).astype(str)
    shift = rng.choice(["day", "swing", "night"], size=n, p=[0.5, 0.3, 0.2])
    supplier = rng.choice(
        ["A", "B", "C", "D", "E"], size=n, p=[0.25, 0.25, 0.2, 0.2, 0.1]
    )
    material_grade = rng.choice(["G1", "G2", "G3"], size=n, p=[0.55, 0.35, 0.10])

    # Boolean masks reused by both the feature shifts and the risk formula.
    night_shift = shift == "night"
    from_supplier_e = supplier == "E"
    grade_g3 = material_grade == "G3"

    temp_mean = rng.normal(70, 5, size=n) + (line_id.astype(int) - 3) * 0.7
    pressure_std = np.abs(rng.normal(1.1, 0.35, size=n))
    vibration_max = np.abs(rng.normal(3.0, 1.0, size=n)) + night_shift * 0.6
    humidity = np.clip(rng.normal(45, 10, size=n), 10, 90)
    cycle_time = rng.normal(120, 15, size=n) + grade_g3 * 7
    error_cnt = rng.poisson(1.3, size=n) + from_supplier_e * 1

    # Latent risk: hinge terms on three numeric features, fixed bumps for some
    # categories, one interaction term, and Gaussian noise.
    noise = rng.normal(0, 0.3, size=n)
    risk = (
        0.9 * np.maximum(0, vibration_max - 3.5)
        + 1.2 * np.maximum(0, error_cnt - 2)
        + 0.03 * np.maximum(0, cycle_time - 130)
        + from_supplier_e * 0.7
        + grade_g3 * 0.8
        + ((supplier == "C") & night_shift) * 0.6
        + noise
    )
    cutoff = np.quantile(risk, 1 - pos_ratio)
    label = (risk >= cutoff).astype(int)

    columns = {
        "line_id": line_id,
        "shift": shift,
        "supplier": supplier,
        "material_grade": material_grade,
        "temp_mean": temp_mean,
        "pressure_std": pressure_std,
        "vibration_max": vibration_max,
        "humidity": humidity,
        "cycle_time": cycle_time,
        "error_cnt": error_cnt,
        "eol_fail": label,
    }
    frame = pd.DataFrame(columns)

    # Missing values: each listed column independently loses about 2% of rows.
    for col in ("temp_mean", "pressure_std", "humidity"):
        missing_rows = rng.random(n) < 0.02
        frame.loc[missing_rows, col] = np.nan

    # Outliers: about 0.5% of rows get both readings inflated together.
    outlier_rows = rng.random(n) < 0.005
    frame.loc[outlier_rows, "vibration_max"] *= 5
    frame.loc[outlier_rows, "error_cnt"] += 15

    return frame

df = make_toy_labeled()
df.head()


Unnamed: 0,line_id,shift,supplier,material_grade,temp_mean,pressure_std,vibration_max,humidity,cycle_time,error_cnt,eol_fail
0,1,swing,B,G2,73.168503,0.876834,4.762998,36.64394,113.076839,2,0
1,5,swing,B,G1,59.48152,1.511594,3.343818,49.10094,148.367302,3,0
2,4,day,E,G2,68.842233,0.781406,2.80823,49.216298,111.936949,2,0
3,3,day,A,G1,67.952355,1.24445,2.125842,38.328802,103.403071,3,0
4,3,day,A,G1,67.677732,0.856918,3.64927,44.314638,127.155881,0,0


## 2) (시험형) 문제 요구사항
1) 불균형 확인
2) 전처리(범주/수치)
3) 지도 모델 학습+평가(PR-AUC/F1)
4) 임계값 정책 비교(F1-opt / Top 5% / Recall 고정)
5) 리포트 출력
6) (가산) 해석

## 3) EDA: 클래스 비율/결측 확인

In [3]:
# Column roles: the label, the categorical features, and every remaining
# column, which is treated as numeric.
target_col = "eol_fail"
cat_cols = ["line_id", "shift", "supplier", "material_grade"]
num_cols = [c for c in df.columns if c != target_col and c not in cat_cols]

print("Shape:", df.shape)

# Class balance: absolute counts first, then proportions.
label_series = df[target_col]
print(label_series.value_counts())
print(label_series.value_counts(normalize=True))

# Share of missing values per column, largest first.
print("\nMissing ratio top:")
missing_ratio = df.isna().mean()
print(missing_ratio.sort_values(ascending=False).head(10))


Shape: (20000, 11)
eol_fail
0    19200
1      800
Name: count, dtype: int64
eol_fail
0    0.96
1    0.04
Name: proportion, dtype: float64

Missing ratio top:
pressure_std      0.01970
temp_mean         0.01925
humidity          0.01880
supplier          0.00000
shift             0.00000
line_id           0.00000
material_grade    0.00000
vibration_max     0.00000
cycle_time        0.00000
error_cnt         0.00000
dtype: float64


## 4) Split (stratify 필수) + 전처리

In [4]:
# Feature matrix (categoricals first, then numerics) and integer label vector.
X = df[cat_cols + num_cols].copy()
y = df[target_col].astype(int).copy()

# Stratified 80/20 hold-out split so both parts keep the class ratio.
X_train, X_val, y_train, y_val = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=SEED
)

# Scaler choice for the numeric columns.
USE_ROBUST = True
if USE_ROBUST:
    scaler = RobustScaler()
else:
    scaler = StandardScaler()

# Categoricals: fill with the most frequent value, then one-hot encode;
# categories unseen during fit are ignored at transform time.
cat_pipe = Pipeline(steps=[
    ("imp", SimpleImputer(strategy="most_frequent")),
    ("ohe", OneHotEncoder(handle_unknown="ignore")),
])
# Numerics: fill with the median, then scale.
num_pipe = Pipeline(steps=[
    ("imp", SimpleImputer(strategy="median")),
    ("sc", scaler),
])

preprocess = ColumnTransformer(transformers=[
    ("cat", cat_pipe, cat_cols),
    ("num", num_pipe, num_cols),
])

# Fit on the training split only; the validation split is just transformed.
X_train_enc = preprocess.fit_transform(X_train)
X_val_enc = preprocess.transform(X_val)

print("Encoded shapes:", X_train_enc.shape, X_val_enc.shape)


Encoded shapes: (16000, 23) (4000, 23)


## 5) 지도 모델 학습(불균형 대응) + PR-AUC/ROC-AUC

In [5]:
# Class counts on the training split. neg/pos is handed to XGBoost as
# scale_pos_weight; max(pos, 1) avoids dividing by zero when the split
# contains no positives.
pos = int((y_train == 1).sum())
neg = int((y_train == 0).sum())
scale_pos_weight = neg / max(pos, 1)

# Candidate classifiers. Each handles the class imbalance in its own way and
# shares SEED so that repeated runs produce the same scores.
models = {
    # saga shuffles the training data, so it is seeded like the other models.
    # n_jobs is dropped: per the scikit-learn docs it only parallelizes
    # one-vs-rest multiclass fits, so it had no effect on this binary target.
    "LR": LogisticRegression(
        max_iter=2000, class_weight="balanced", solver="saga",
        random_state=SEED,
    ),
    "RF": RandomForestClassifier(
        n_estimators=400, random_state=SEED, n_jobs=-1,
        class_weight="balanced_subsample",
    ),
}
if HAS_XGB:
    models["XGB"] = XGBClassifier(
        n_estimators=500, max_depth=4, learning_rate=0.05,
        subsample=0.9, colsample_bytree=0.8, reg_lambda=1.0,
        random_state=SEED, n_jobs=-1, eval_metric="logloss",
        scale_pos_weight=scale_pos_weight
    )

# Fit every model, keep its validation scores for the positive class, and
# record PR-AUC / ROC-AUC for the comparison table.
rows = []
probs = {}
for name, m in models.items():
    m.fit(X_train_enc, y_train)
    p = m.predict_proba(X_val_enc)[:, 1]
    probs[name] = p
    rows.append([name, average_precision_score(y_val, p), roc_auc_score(y_val, p)])

# Last expression of the cell: displays the table, best PR-AUC first.
pd.DataFrame(rows, columns=["model", "PR_AUC", "ROC_AUC"]).sort_values("PR_AUC", ascending=False)


Unnamed: 0,model,PR_AUC,ROC_AUC
2,XGB,0.936348,0.99681
1,RF,0.877709,0.995194
0,LR,0.650408,0.984019


In [6]:
# Rank the candidates by validation PR-AUC and keep the top one together with
# its validation scores.
leaderboard = pd.DataFrame(rows, columns=["model", "PR_AUC", "ROC_AUC"])
leaderboard = leaderboard.sort_values("PR_AUC", ascending=False)
best_name = leaderboard["model"].iloc[0]
best_model, y_score = models[best_name], probs[best_name]
print("Chosen:", best_name, "PR-AUC:", average_precision_score(y_val, y_score))


Chosen: XGB PR-AUC: 0.9363482752521389


## 6) 임계값 정책 3종 (F1-opt / Top 5% / Recall 고정)

In [7]:
# Precision/recall along the curve and the matching F1. The 1e-12 term keeps
# the division defined where precision and recall are both zero.
prec, rec, ths = precision_recall_curve(y_val, y_score)
f1s = (2 * prec * rec) / (prec + rec + 1e-12)

# A) F1-opt: threshold with the highest F1. Fall back to 0.5 if the argmax
# lands on a curve point that has no matching entry in ths.
idx = np.argmax(f1s)
if idx < len(ths):
    th_f1 = ths[idx]
else:
    th_f1 = 0.5

# B) Top 5%: the 95th percentile of the validation scores.
th_top5 = np.percentile(y_score, 95)

# C) Fixed recall: among thresholds whose recall reaches the target, take the
# one with the best precision; reuse the F1-opt threshold if none qualifies.
TARGET_RECALL = 0.90
valid = np.where(rec[:-1] >= TARGET_RECALL)[0]
if valid.size == 0:
    th_rec = th_f1
else:
    idx_c = valid[np.argmax(prec[valid])]
    th_rec = ths[idx_c]

def eval_at(th):
    """Score the binary decision ``y_score >= th`` against ``y_val``.

    Reads the module-level ``y_score`` and ``y_val``.

    Args:
        th: Decision threshold; scores at or above it are flagged as positive.

    Returns:
        Dict with ``threshold``, ``precision``, ``recall``, ``f1`` and
        ``alarm_rate`` (the fraction of samples flagged).
    """
    flagged = (y_score >= th).astype(int)
    metrics = {"threshold": float(th)}
    scorers = (
        ("precision", precision_score),
        ("recall", recall_score),
        ("f1", f1_score),
    )
    for key, scorer in scorers:
        # zero_division=0 returns 0 instead of warning when nothing is flagged.
        metrics[key] = scorer(y_val, flagged, zero_division=0)
    metrics["alarm_rate"] = float(flagged.mean())
    return metrics

# Print the metrics of each threshold policy, one line per policy.
policy_thresholds = [
    ("F1-opt:", th_f1),
    ("Top5 :", th_top5),
    ("Recall>=%.2f:"%TARGET_RECALL, th_rec),
]
for policy_label, policy_th in policy_thresholds:
    print(policy_label, eval_at(policy_th))


F1-opt: {'threshold': 0.7156171798706055, 'precision': 0.7914438502673797, 'recall': 0.925, 'f1': 0.8530259365994236, 'alarm_rate': 0.04675}
Top5 : {'threshold': 0.5712087750434875, 'precision': 0.755, 'recall': 0.94375, 'f1': 0.8388888888888889, 'alarm_rate': 0.05}
Recall>=0.90: {'threshold': 0.7614364624023438, 'precision': 0.7955801104972375, 'recall': 0.9, 'f1': 0.844574780058651, 'alarm_rate': 0.04525}


## 7) 리포트(Top 5% 기준 상위 30개)

In [8]:
# Build the alarm report from the validation rows flagged by the Top-5% policy.
th_report = th_top5
val_df = X_val.copy()
val_df["score"] = y_score
# .values assigns positionally, so the labels follow the row order of X_val.
val_df[target_col] = y_val.values
val_df["is_anomaly"] = (val_df["score"] >= th_report).astype(int)

# Derive the report columns from cat_cols / target_col instead of hard-coding
# the toy dataset's names, so the cell keeps working after those two settings
# are changed for another dataset.
report_cols = [*cat_cols, "score", target_col]
report = (
    val_df.loc[val_df["is_anomaly"] == 1, report_cols]
    .sort_values("score", ascending=False)
    .head(30)
)
report


Unnamed: 0,line_id,shift,supplier,material_grade,score,eol_fail
12875,2,night,E,G3,0.999989,1
19565,4,night,E,G1,0.999978,1
18288,4,swing,E,G1,0.999967,1
6181,6,swing,E,G2,0.999962,1
9028,4,day,E,G2,0.999959,1
19178,6,night,E,G1,0.999958,1
6485,6,night,E,G3,0.999956,1
9220,1,day,E,G3,0.999937,1
9514,4,day,A,G3,0.999933,1
19381,3,night,E,G1,0.999933,1


## 8) (가산) 해석: Permutation Importance (Top 30)
원-핫 인코딩으로 feature 수가 많아지고 이름이 길어지므로, 중요도 상위 항목만 확인해도 충분합니다.

In [9]:
# permutation_importance does not accept scipy sparse input, and the
# ColumnTransformer returns a sparse matrix when the one-hot columns dominate.
# Densify in that case; the validation split is assumed small enough to fit
# in memory as a dense array.
X_perm = X_val_enc.toarray() if hasattr(X_val_enc, "toarray") else X_val_enc

# Importance = mean drop in PR-AUC when one encoded column is shuffled.
perm = permutation_importance(
    best_model, X_perm, y_val, n_repeats=3, random_state=SEED,
    scoring="average_precision"
)
imp = perm.importances_mean

# Best effort: fall back to positional names if the fitted preprocessor
# cannot report its output feature names.
try:
    fn = preprocess.get_feature_names_out()
except Exception:
    fn = np.array([f"f{i}" for i in range(len(imp))])

imp_df = pd.DataFrame({"feature": fn, "importance": imp}).sort_values("importance", ascending=False)
imp_df.head(30)


Unnamed: 0,feature,importance
22,num__error_cnt,0.839379
19,num__vibration_max,0.170803
16,cat__material_grade_G3,0.061994
13,cat__supplier_E,0.045073
21,num__cycle_time,0.032246
20,num__humidity,0.007585
12,cat__supplier_D,0.003389
5,cat__line_id_6,0.001049
18,num__pressure_std,0.000899
4,cat__line_id_5,0.000875


## 9) 시험장 복붙 체크리스트(지도 전용)
- `df = make_toy_labeled()`를 `df = pd.read_csv(CSV_PATH)`로 교체하고 `target_col`, `cat_cols`를 데이터에 맞게 바꾸기 (7번 리포트의 `report_cols`가 실제 컬럼과 맞는지도 확인)
- 불균형 심하면: LR `class_weight`, XGB `scale_pos_weight` 유지
- 지표는 PR-AUC + F1(정책 기준) 권장
- 정책은 최소 2개(F1-opt + TopN) 비교
