# 1. 프로젝트 설명: Best Model  
제5회 KAIST-POSTECH-UNIST 데이터사이언스 경진대회  
**타이어 불량 예측 및 시험 생산 의사결정 모델링**

이 파일은 경진대회에서 가장 높은 점수를 기록한 모델 파이프라인을 정리한 것입니다.

- 시뮬레이션/공정 데이터를 활용한 **불량(NG) 예측**
- 비용 구조를 고려한 **Threshold 최적화 & Top-K 선택**
- 최종적으로 `submission_best.csv` 제출 파일을 생성하는 것을 목표로 합니다.


# 2. 라이브러리 & 데이터 로드

In [None]:
import pandas as pd
import numpy as np

from sklearn.model_selection import KFold, GroupKFold
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans
from sklearn.metrics import roc_auc_score
from pandas.api.types import CategoricalDtype

from catboost import CatBoostClassifier

from google.colab import drive

# Colab 환경에서 Google Drive 마운트
drive.mount('/content/drive')

# 대회에서 제공된 데이터 로드
train = pd.read_csv("/content/drive/MyDrive/train.csv")
test = pd.read_csv("/content/drive/MyDrive/test.csv")
submission_base = pd.read_csv("/content/drive/MyDrive/sample_submission.csv")

# 원본 보존을 위해 작업용 복사본 사용
train_fe = train.copy()
test_fe = test.copy()

# 3. 기본 전처리 & p/x/y 컬럼 정의

In [None]:
# Class 컬럼을 NG(1) / Good(0) 이진 타깃으로 변환
train_fe["NG"] = train_fe["Class"].map({"Good": 0, "NG": 1}).astype(int)

# 학습용 데이터에서는 Class 컬럼 제거
train_fe = train_fe.drop(columns=["Class"])

# 테스트 데이터에서는 ID만 제거 (ID는 제출용에서 따로 사용)
test_fe = test_fe.drop(columns=["ID"])

# p, x, y로 시작하는 시뮬레이션 컬럼들을 인덱스 순서대로 정렬
p_cols = sorted(
    [c for c in train.columns if c.startswith("p") and c[1:].isdigit()],
    key=lambda x: int(x[1:])
)
x_cols = sorted(
    [c for c in train.columns if c.startswith("x") and c[1:].isdigit()],
    key=lambda x: int(x[1:])
)
y_cols = sorted(
    [c for c in train.columns if c.startswith("y") and c[1:].isdigit()],
    key=lambda x: int(x[1:])
)

print("p feature 개수 :", len(p_cols))
print("x feature 개수 :", len(x_cols))
print("y feature 개수 :", len(y_cols))

# 4. Feature Engineering 함수

In [None]:
# x, y 시뮬레이션 결과를 요약 통계로 압축
def add_xy_stats(df):
    # x 요약
    df["x_mean"] = df[x_cols].mean(axis=1)
    df["x_std"]  = df[x_cols].std(axis=1)
    df["x_max"]  = df[x_cols].max(axis=1)
    df["x_min"]  = df[x_cols].min(axis=1)

    # y 요약
    df["y_mean"] = df[y_cols].mean(axis=1)
    df["y_std"]  = df[y_cols].std(axis=1)
    df["y_max"]  = df[y_cols].max(axis=1)
    df["y_min"]  = df[y_cols].min(axis=1)

    # x / y 비율 (스케일 차이를 고려해 클리핑)
    ratio = df["x_mean"] / (df["y_mean"].abs() + 1e-6)
    df["x_y_ratio"] = ratio.clip(-10, 10)
    return df


# p 컬럼들을 일정 구간 단위(chunk)로 묶어 평균/표준편차를 계산
def add_p_chunks(df, size=16):
    # 예: size=16이면 p0~p15, p16~p31 ... 구간별 통계 추가
    for i in range(0, len(p_cols), size):
        cols = p_cols[i:i + size]
        df[f"pchunk_{i//size}_mean"] = df[cols].mean(axis=1)
        df[f"pchunk_{i//size}_std"]  = df[cols].std(axis=1)
    return df


# p 컬럼의 인접한 값들 간 차이를 요약
def add_p_diff(df):
    # 행 기준으로 오른쪽 컬럼과의 차이를 계산
    diff = df[p_cols].diff(axis=1).iloc[:, 1:]
    df["p_diff_mean"] = diff.mean(axis=1)
    df["p_diff_std"]  = diff.std(axis=1)
    df["p_diff_max"]  = diff.abs().max(axis=1)
    return df


# p 컬럼에 PCA를 적용해 차원 축소 (시뮬레이션 곡면을 압축 표현)
def run_p_pca(train_df, test_df, n=15):
    scaler = StandardScaler()
    train_scaled = scaler.fit_transform(train_df[p_cols])
    test_scaled  = scaler.transform(test_df[p_cols])

    pca = PCA(n_components=n, random_state=42)
    train_p = pca.fit_transform(train_scaled)
    test_p  = pca.transform(test_scaled)

    for i in range(n):
        train_df[f"PCA_{i}"] = train_p[:, i]
        test_df[f"PCA_{i}"]  = test_p[:, i]
    return train_df, test_df


# 설계 스펙을 범주형 타입으로 지정 (CatBoost에서 카테고리로 처리)
def add_design_categories(df):
    df["Width"]  = df["Width"].astype("category")
    df["Aspect"] = df["Aspect"].astype("category")
    df["Inch"]   = df["Inch"].astype("category")
    return df


# Target Encoding (카테고리별 평균 타깃값을 인코딩) + 약간의 노이즈
def target_encode(train_df, test_df, col, target="NG", n_splits=5, noise=0.01):
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)
    te_col = f"{col}_TE"
    train_df[te_col] = np.nan

    # KFold로 out-of-fold 방식 target encoding
    for tr_idx, val_idx in kf.split(train_df):
        tr = train_df.iloc[tr_idx]
        val = train_df.iloc[val_idx]
        mapping = tr.groupby(col)[target].mean()
        train_df.loc[val_idx, te_col] = val[col].map(mapping)

    # 전체 train 기준 mapping으로 test 인코딩
    full_map = train_df.groupby(col)[target].mean()
    test_df[te_col] = test_df[col].map(full_map)

    # 과적합 완화를 위해 작은 노이즈 추가
    train_df[te_col] += np.random.normal(0, noise, len(train_df))
    test_df[te_col]  += np.random.normal(0, noise, len(test_df))

    return train_df, test_df


# Mass_Pilot(파일럿 여부)와 PCA 축을 곱한 상호작용 피처
def add_masspilot_interaction(df, n):
    for i in range(n):
        df[f"MP_PCA{i}"] = df["Mass_Pilot"].astype(int) * df[f"PCA_{i}"]
    return df


# p 원본 컬럼을 기반으로 KMeans 클러스터 번호를 부여
def add_cluster(train_raw, test_raw, train_fe, test_fe, n_clusters=5):
    # p 차원 수가 많아서 PCA로 먼저 50차원 축소
    pca = PCA(n_components=50, random_state=42)
    train_p = pca.fit_transform(train_raw)
    test_p  = pca.transform(test_raw)

    kmeans = KMeans(n_clusters=n_clusters, random_state=42)
    train_fe["cluster"] = kmeans.fit_predict(train_p)
    test_fe["cluster"]  = kmeans.predict(test_p)

    train_fe["cluster"] = train_fe["cluster"].astype("category")
    test_fe["cluster"]  = test_fe["cluster"].astype("category")
    return train_fe, test_fe


# 학습에 사용할 feature 리스트 구성
def get_feature_list(df, drop=["ID", "Plant", "Class"], target="NG"):
    ignore = set(drop + [target])
    return [c for c in df.columns if c not in ignore]

# 5. Feature Engineering 전체 실행

In [None]:
# 1) p를 구간 단위로 요약
train_fe = add_p_chunks(train_fe)
test_fe  = add_p_chunks(test_fe)

# 2) p의 인접 차이 요약
train_fe = add_p_diff(train_fe)
test_fe  = add_p_diff(test_fe)

# 3) p에 PCA 적용 (15차원으로 축소)
train_fe, test_fe = run_p_pca(train_fe, test_fe, n=15)

# 4) 설계 스펙을 범주형으로 설정
train_fe = add_design_categories(train_fe)
test_fe  = add_design_categories(test_fe)

# 5) Plant 문자열에서 번호만 추출 후 정수형으로 변환
train_fe["Plant"] = train_fe["Plant"].astype(str).str.replace("Plant_", "").astype(int)
test_fe["Plant"]  = test_fe["Plant"].astype(str).str.replace("Plant_", "").astype(int)

# 6) Plant, Mass_Pilot에 대해 Target Encoding
train_fe, test_fe = target_encode(train_fe, test_fe, col="Plant")
train_fe, test_fe = target_encode(train_fe, test_fe, col="Mass_Pilot")

# 7) Mass_Pilot × PCA 상호작용 피처
train_fe = add_masspilot_interaction(train_fe, 15)
test_fe  = add_masspilot_interaction(test_fe, 15)

# 8) x, y 요약 통계 피처 추가
train_fe = add_xy_stats(train_fe)
test_fe  = add_xy_stats(test_fe)

# 9) 원본 p 값 기반 KMeans 클러스터링
train_fe, test_fe = add_cluster(train[p_cols], test[p_cols], train_fe, test_fe)

# 10) object 타입은 category로 변환 (CatBoost에서 카테고리로 처리)
obj_cols = train_fe.columns[train_fe.dtypes == "object"]
obj_cols = [c for c in obj_cols if c not in ["Class"]]  # 혹시 남아있을 Class는 제외
print("object cols -> category로 변환:", obj_cols)

for col in obj_cols:
    train_fe[col] = train_fe[col].astype("category")
    if col in test_fe.columns:
        test_fe[col] = test_fe[col].astype("category")

# 11) PCA로 대체했으므로 p 원본 컬럼은 드롭
train_fe = train_fe.drop(columns=p_cols, errors="ignore")
test_fe  = test_fe.drop(columns=p_cols, errors="ignore")

# 12) 최종 학습에 사용할 feature 리스트
final_features = get_feature_list(train_fe, drop=["Plant"], target="NG")
print("최종 feature 수:", len(final_features))

# 6. Profit 함수 & GroupKFold Cross Validation

In [None]:
# 대회에서 정의된 수익 구조
GAIN_GOOD = 100    # 좋은 타이어를 선택했을 때 +100
GAIN_NG   = -2000  # 불량 타이어를 선택했을 때 -2000

def profit_threshold(y, pred, th):
    """
    예측 확률(pred) <= threshold 인 샘플을 선택한다고 가정했을 때의
    총 수익을 계산한다.
    y: 0(좋음), 1(NG)
    pred: NG가 될 확률(값이 낮을수록 좋은 타이어)
    """
    sel = pred <= th
    if sel.sum() == 0:
        return 0
    bad = y[sel]
    return (bad == 0).sum() * GAIN_GOOD + (bad == 1).sum() * GAIN_NG


SEED = 42
N_SPLITS = 5

# CatBoost에서 사용할 카테고리 feature index 추출
cat_idx = [
    i for i, c in enumerate(final_features)
    if isinstance(train_fe[c].dtype, CategoricalDtype)
]
print("categorical features:", [final_features[i] for i in cat_idx])

# 공장(Plant)을 기준으로 GroupKFold 수행
gkf = GroupKFold(n_splits=N_SPLITS)

X_all = train_fe[final_features]
y_all = train_fe["NG"].values
groups = train_fe["Plant"].values

oof_pred = np.zeros(len(train_fe))
fold_auc = []

# Fold 0의 validation 결과를 이용해 threshold를 최적화
best_thr_fold0 = None
best_profit_fold0 = -1e18

# 각 fold별 test 예측도 저장 (필요 시 앙상블 가능)
test_pred_folds = np.zeros((len(test_fe), N_SPLITS))

print("\n===== GroupKFold CV 시작 =====")
for fold, (tr_idx, val_idx) in enumerate(gkf.split(X_all, y_all, groups)):
    print(f"\n--- Fold {fold} ---")
    X_tr, X_val = X_all.iloc[tr_idx], X_all.iloc[val_idx]
    y_tr, y_val = y_all[tr_idx], y_all[val_idx]

    model = CatBoostClassifier(
        iterations=1500,
        learning_rate=0.03,
        depth=8,
        loss_function="Logloss",
        eval_metric="AUC",
        random_state=SEED,
        verbose=False,
    )

    model.fit(X_tr, y_tr, cat_features=cat_idx)

    # validation에서 NG일 확률 예측
    val_pred = model.predict_proba(X_val)[:, 1]
    oof_pred[val_idx] = val_pred

    auc = roc_auc_score(y_val, val_pred)
    fold_auc.append(auc)
    print(f"Fold {fold} AUC: {auc:.4f}")

    # test 예측 저장
    test_pred_folds[:, fold] = model.predict_proba(test_fe[final_features])[:, 1]

    # Fold0에서만 threshold 후보들을 탐색해 best threshold 선택
    if fold == 0:
        ths = np.linspace(val_pred.min(), val_pred.max(), 501)
        for th in ths:
            p = profit_threshold(y_val, val_pred, th)
            if p > best_profit_fold0:
                best_profit_fold0 = p
                best_thr_fold0 = th
        print(f"Fold0 기준 best threshold: {best_thr_fold0:.6f}")
        print(f"Fold0 기준 best profit   : {best_profit_fold0:.0f}")

# 전체 OOF AUC 확인
oof_auc = roc_auc_score(y_all, oof_pred)
print("\nFold별 AUC:", [f"{x:.4f}" for x in fold_auc])
print("OOF AUC   :", oof_auc)
print("최종 사용 threshold (Fold0 기반):", best_thr_fold0)

# 7. 전체 Train으로 최종 모델 학습

In [None]:
# CV에서 사용한 설정과 동일하게 전체 train으로 최종 모델 학습
final_model = CatBoostClassifier(
    iterations=1500,
    learning_rate=0.03,
    depth=8,
    loss_function="Logloss",
    eval_metric="AUC",
    random_state=SEED,
    verbose=False,
)

final_model.fit(X_all, y_all, cat_features=cat_idx)

# 최종 모델의 test 예측 (NG 확률)
test_pred_final = final_model.predict_proba(test_fe[final_features])[:, 1]
print("test_pred_final 범위:", test_pred_final.min(), " ~ ", test_pred_final.max())