colab으로 돌리는 중

KNN Imputation 추가 — 결측치를 그냥 두는 대신 KNN으로 채움 → 비율 피처들이 NaN 안 나옴


20-fold (5→20) — 학습 데이터 95%로 각 모델이 더 강해짐


Optuna 40 trials — 하이퍼파라미터 자동 최적화



CatBoost 단독 — LGB 빼고 CB에 집중

난자_나이 — 난자 출처(본인/기증)에 따라 실제 난자 나이 결정


시기별_신선난자_구간 — 시기코드 × 신선 난자 수 구간 조합


시기별_해동배아_구간 — 시기코드 × 해동 배아 수 구간 조합


시기×배아출처 — 시기코드 × 동결/신선/both


시기×단일이식, 시기×배란자극, 시기×유전진단


불임 남/여 분리 + 여성_불임_비율


시술 이력 비율 (IVF_임신률, IVF_출산률, 총_출산률)


혼합_생성률, 미세주입_배아_생성률 등 추가 비율


신선_배양시간, 동결_배양시간, 이상적_배양

In [1]:
# ============================================================
# 12번: CatBoost 집중 파이프라인
# - KNN Imputation → 피처 엔지니어링 → OOF TE → Optuna → 20-fold CB
# - 목표: CV 0.741+
# ============================================================

import os, gc, warnings
warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import roc_auc_score
from sklearn.impute import KNNImputer
import catboost as cb

try:
    import optuna
    optuna.logging.set_verbosity(optuna.logging.WARNING)
    HAS_OPTUNA = True
except ImportError:
    HAS_OPTUNA = False
    print("[INFO] optuna 미설치 → 기본 파라미터 사용")

# -------------------------
# Config
# -------------------------
DATA_DIR = "../data"  # 본인 경로
OUT_DIR  = "../outputs"
os.makedirs(OUT_DIR, exist_ok=True)

TARGET_COL = "임신 성공 여부"
ID_COL = "ID"

N_FOLDS_TRAIN = 20     # ★ 학습용 fold (많을수록 학습데이터↑)
N_FOLDS_TUNE  = 5      # Optuna 튜닝용 fold (속도)
SEED = 42
OPTUNA_TRIALS = 40     # 튜닝 횟수 (시간 여유 없으면 20으로)

# GPU 자동 감지
import subprocess
def has_gpu():
    try:
        result = subprocess.run(['nvidia-smi'], capture_output=True, text=True, timeout=5)
        return result.returncode == 0
    except:
        return False

TASK_TYPE = "GPU" if has_gpu() else "CPU"
print(f"[INFO] Device: {TASK_TYPE}")


# ===================================================================
# 1) Imputation
# ===================================================================
def impute_missing(train_df, test_df):
    """DI → 0 채움, 나머지 IVF → KNN imputation"""

    di_fill_cols = [
        '미세주입된 난자 수', '미세주입에서 생성된 배아 수', '총 생성 배아 수', '이식된 배아 수',
        '미세주입 배아 이식 수', '저장된 배아 수', '미세주입 후 저장된 배아 수', '해동된 배아 수',
        '해동 난자 수', '수집된 신선 난자 수', '저장된 신선 난자 수', '혼합된 난자 수',
        '파트너 정자와 혼합된 난자 수', '기증자 정자와 혼합된 난자 수',
        '난자 채취 경과일', '난자 해동 경과일', '배아 이식 경과일', '배아 해동 경과일',
        '동결 배아 사용 여부', '신선 배아 사용 여부', '기증 배아 사용 여부',
        '단일 배아 이식 여부', '난자 혼합 경과일',
        '임신 시도 또는 마지막 임신 경과 연수',
    ]

    for df in [train_df, test_df]:
        di_mask = df['시술 유형'] == 'DI'
        for col in di_fill_cols:
            if col in df.columns:
                df.loc[di_mask, col] = df.loc[di_mask, col].fillna(0)

    # KNN imputation (IVF 중 결측이 남은 수치형)
    knn_cols = [
        '총 생성 배아 수', '미세주입된 난자 수', '미세주입에서 생성된 배아 수',
        '이식된 배아 수', '미세주입 배아 이식 수', '저장된 배아 수',
        '미세주입 후 저장된 배아 수', '해동된 배아 수', '해동 난자 수',
        '수집된 신선 난자 수', '저장된 신선 난자 수', '혼합된 난자 수',
        '파트너 정자와 혼합된 난자 수', '기증자 정자와 혼합된 난자 수',
        '동결 배아 사용 여부', '신선 배아 사용 여부', '기증 배아 사용 여부',
        '단일 배아 이식 여부',
        '배아 이식 경과일', '난자 혼합 경과일', '배아 해동 경과일',
    ]
    knn_cols = [c for c in knn_cols if c in train_df.columns]
    has_null = [c for c in knn_cols if train_df[c].isnull().any() or test_df[c].isnull().any()]

    if has_null:
        print(f"  KNN Imputing {len(has_null)} columns...")
        imputer = KNNImputer(n_neighbors=5, weights='distance')
        train_df[has_null] = imputer.fit_transform(train_df[has_null])
        test_df[has_null] = imputer.transform(test_df[has_null])
        print("  Done.")

    return train_df, test_df


# ===================================================================
# 2) Feature Engineering
# ===================================================================
def preprocess(df):
    d = df.copy()

    # --- 기존 검증 피처 ---
    def major_procedure(x):
        if pd.isna(x): return "Unknown"
        if "IUI" in x: return "IUI"
        if "DI" in x:  return "Other"
        if "ICSI" in x: return "ICSI"
        if "IVF" in x: return "IVF"
        return "Other"
    d["시술_대분류"] = d["특정 시술 유형"].apply(major_procedure)

    d["BLASTOCYST_포함"] = d["특정 시술 유형"].astype(str).str.contains("BLASTOCYST", na=False).astype(int)
    d["AH_포함"] = d["특정 시술 유형"].astype(str).str.contains("AH", na=False).astype(int)

    embryo_stage_cols = [
        "단일 배아 이식 여부", "착상 전 유전 진단 사용 여부", "배아 생성 주요 이유",
        "총 생성 배아 수", "미세주입된 난자 수", "미세주입에서 생성된 배아 수",
        "이식된 배아 수", "미세주입 배아 이식 수", "저장된 배아 수",
        "미세주입 후 저장된 배아 수", "해동된 배아 수", "해동 난자 수",
        "수집된 신선 난자 수", "저장된 신선 난자 수", "혼합된 난자 수",
        "파트너 정자와 혼합된 난자 수", "기증자 정자와 혼합된 난자 수",
        "동결 배아 사용 여부", "신선 배아 사용 여부", "기증 배아 사용 여부", "대리모 여부",
    ]
    d["배아_이식_여부"] = 1 - d[embryo_stage_cols].isna().all(axis=1).astype(int)

    def embryo_stage(row):
        if row["배아_이식_여부"] == 0: return "배아단계_미도달"
        elif pd.isna(row["총 생성 배아 수"]) or row["총 생성 배아 수"] == 0: return "배아생성_실패"
        elif pd.isna(row["이식된 배아 수"]) or row["이식된 배아 수"] == 0: return "이식_미실시"
        else: return "이식_완료"
    d["배아_진행_단계"] = d.apply(embryo_stage, axis=1)

    def collapse_trials(x):
        if x == "0회": return "0회"
        elif x in ["1회","2회"]: return "1–2회"
        else: return "3회 이상"
    d["총시술_bin3"] = d["총 시술 횟수"].apply(collapse_trials)

    def age_group_simple(age):
        if age == "알 수 없음": return "Unknown"
        elif age == "만18-34세": return "34세 이하"
        elif age in ["만35-37세","만38-39세"]: return "35-39세"
        else: return "40세 이상"
    d["나이_3구간"] = d["시술 당시 나이"].apply(age_group_simple)

    def embryo_count_bin(count):
        if pd.isna(count) or count == 0: return "0개"
        elif count <= 2: return "1-2개"
        else: return "3개 이상"
    d["이식배아_구간"] = d["이식된 배아 수"].apply(embryo_count_bin)

    d["Day5_이식_여부"] = (d["배아 이식 경과일"] == 5.0).astype(int)

    infertility_cols = [
        "남성 주 불임 원인", "남성 부 불임 원인", "여성 주 불임 원인", "여성 부 불임 원인",
        "부부 주 불임 원인", "부부 부 불임 원인", "불명확 불임 원인",
        "불임 원인 - 난관 질환", "불임 원인 - 남성 요인", "불임 원인 - 배란 장애",
        "불임 원인 - 여성 요인", "불임 원인 - 자궁경부 문제", "불임 원인 - 자궁내막증",
        "불임 원인 - 정자 농도", "불임 원인 - 정자 면역학적 요인", "불임 원인 - 정자 운동성",
        "불임 원인 - 정자 형태"
    ]
    d["불임_원인_개수"] = d[infertility_cols].sum(axis=1)

    d["배아_해동_실시_여부"] = d["배아 해동 경과일"].notna().astype(int)

    # 비율 (importance 검증됨)
    d["배아_이식_비율"] = d["이식된 배아 수"] / (d["총 생성 배아 수"] + 1)
    d["배아_저장_비율"] = d["저장된 배아 수"] / (d["총 생성 배아 수"] + 1)
    d["배아_생성_효율"] = d["총 생성 배아 수"] / (d["수집된 신선 난자 수"] + 1)
    d["미세주입_생성_효율"] = d["미세주입에서 생성된 배아 수"] / (d["미세주입된 난자 수"] + 1)
    d["난자_활용률"] = d["혼합된 난자 수"] / (d["수집된 신선 난자 수"] + 1)

    # 교호작용 (importance 상위)
    d["나이×Day5"] = d["시술 당시 나이"].astype(str) + "_" + d["Day5_이식_여부"].astype(str)
    d["시술횟수×나이"] = d["총시술_bin3"].astype(str) + "_" + d["나이_3구간"].astype(str)
    d["나이×배아진행"] = d["시술 당시 나이"].astype(str) + "_" + d["배아_진행_단계"]
    d["시기코드×나이"] = d["시술 시기 코드"] + "_" + d["나이_3구간"]
    d["나이×단일이식"] = d["시술 당시 나이"].astype(str) + "_" + d["단일 배아 이식 여부"].fillna(-1).astype(int).astype(str)

    # 배양기간
    d["배양기간"] = d["배아 이식 경과일"] - d["난자 혼합 경과일"]

    # 시술 횟수 ordinal
    ord_map = {"0회":0, "1회":1, "2회":2, "3회":3, "4회":4, "5회":5, "6회 이상":6}
    for col_name, col_src in [("총시술_ord","총 시술 횟수"), ("IVF시술_ord","IVF 시술 횟수"),
                               ("클리닉시술_ord","클리닉 내 총 시술 횟수"), ("DI시술_ord","DI 시술 횟수"),
                               ("총임신_ord","총 임신 횟수"), ("IVF임신_ord","IVF 임신 횟수"),
                               ("총출산_ord","총 출산 횟수"), ("IVF출산_ord","IVF 출산 횟수"),
                               ("DI임신_ord","DI 임신 횟수"), ("DI출산_ord","DI 출산 횟수")]:
        d[col_name] = d[col_src].map(ord_map)

    # --- ★ 신규 피처 ---

    # [1] 난자 나이 (난자 출처에 따라 실제 나이 결정)
    def get_egg_age(row):
        if row["난자 출처"] == "본인 제공":
            return row["시술 당시 나이"]
        elif row["난자 출처"] == "기증 제공":
            donor = row["난자 기증자 나이"]
            return donor if donor != "알 수 없음" else "만18-34세"
        return row["시술 당시 나이"]
    d["난자_나이"] = d.apply(get_egg_age, axis=1)

    # [2] 배아 출처 (동결/신선/both)
    def get_embryo_source(row):
        fr = 0 if pd.isna(row.get("동결 배아 사용 여부")) else int(row["동결 배아 사용 여부"])
        fs = 0 if pd.isna(row.get("신선 배아 사용 여부")) else int(row["신선 배아 사용 여부"])
        if fr and fs: return "both"
        if fr: return "frozen"
        if fs: return "fresh"
        return "none"
    d["배아_출처"] = d.apply(get_embryo_source, axis=1)

    # [3] 시기 × 배아출처
    d["시기×배아출처"] = d["시술 시기 코드"] + "_" + d["배아_출처"]

    # [4] 배아 생성 이유 대분류
    def embryo_reason_simple(x):
        if pd.isna(x): return "미도달"
        if "현재 시술용" in x: return "시술용"
        return "저장기증"
    d["배아생성이유"] = d["배아 생성 주요 이유"].apply(embryo_reason_simple)

    # [5] 배란 자극 × 유도 조합
    d["배란 유도 유형"] = d["배란 유도 유형"].replace({
        '생식선 자극 호르몬': '기록되지 않은 시행', '세트로타이드 (억제제)': '기록되지 않은 시행'
    })
    d["배란_자극_유도"] = d["배란 자극 여부"].astype(str) + "_" + d["배란 유도 유형"].astype(str)

    # [6] 나이 알 수 없음 (성공률 0%)
    d["나이_알수없음"] = (d["시술 당시 나이"] == "알 수 없음").astype(int)

    # [7] 불임 남/여 분리
    male_inf = ["불임 원인 - 남성 요인", "불임 원인 - 정자 농도",
                "불임 원인 - 정자 면역학적 요인", "불임 원인 - 정자 운동성", "불임 원인 - 정자 형태"]
    female_inf = ["불임 원인 - 난관 질환", "불임 원인 - 배란 장애",
                  "불임 원인 - 자궁경부 문제", "불임 원인 - 자궁내막증"]
    d["남성_불임_수"] = d[male_inf].sum(axis=1)
    d["여성_불임_수"] = d[female_inf].sum(axis=1)
    d["여성_불임_비율"] = np.where(
        (d["남성_불임_수"] + d["여성_불임_수"]) == 0, -1,
        d["여성_불임_수"] / (d["남성_불임_수"] + d["여성_불임_수"])
    )

    # [8] 시술 이력 비율
    d["IVF_임신률"] = np.where(d["IVF시술_ord"] == 0, -1, d["IVF임신_ord"] / d["IVF시술_ord"])
    d["IVF_출산률"] = np.where(d["IVF임신_ord"] == 0, -1, d["IVF출산_ord"] / d["IVF임신_ord"])
    d["총_출산률"] = np.where(d["총임신_ord"] == 0, -1, d["총출산_ord"] / d["총임신_ord"])
    d["클리닉_비율"] = np.where(d["총시술_ord"] == 0, -1, d["클리닉시술_ord"] / d["총시술_ord"])

    # [9] 추가 비율
    d["미세주입_이식_비율"] = np.where(d["이식된 배아 수"] == 0, -1, d["미세주입 배아 이식 수"] / d["이식된 배아 수"])
    d["혼합_생성률"] = np.where((d["혼합된 난자 수"] + d["해동 난자 수"]) == 0, -1,
                              d["총 생성 배아 수"] / (d["혼합된 난자 수"] + d["해동 난자 수"]))
    d["미세주입_배아_생성률"] = np.where(d["미세주입된 난자 수"] == 0, -1,
                                     d["미세주입에서 생성된 배아 수"] / d["미세주입된 난자 수"])
    d["총_사용_배아"] = d["해동된 배아 수"] + d["총 생성 배아 수"]
    d["IVF_정자_미혼합"] = ((d["파트너 정자와 혼합된 난자 수"] == 0) &
                          (d["기증자 정자와 혼합된 난자 수"] == 0) &
                          (d["시술 유형"] == "IVF")).astype(int)

    # [10] 시기별 세분화
    d["시기×단일이식"] = d["시술 시기 코드"] + "_" + d["단일 배아 이식 여부"].fillna(0).astype(int).astype(str)
    d["시기×배란자극"] = d["시술 시기 코드"] + "_" + d["배란 자극 여부"].astype(str)
    d["시기×유전진단"] = d["시술 시기 코드"] + "_" + d["착상 전 유전 진단 사용 여부"].fillna(0).astype(int).astype(str)

    # [11] 배양 시간 (신선/동결)
    d["신선_배양시간"] = (d["배아 이식 경과일"] - d["난자 혼합 경과일"]).fillna(0)
    d["동결_배양시간"] = (d["배아 이식 경과일"] - d["배아 해동 경과일"]).fillna(0)
    d["이상적_배양"] = d["배아 이식 경과일"].isin([3.0, 5.0]).astype(int)

    # [12] 신선 배아 + 시기별 난자 수 구간 (기술 발전 반영)
    def fresh_egg_tier(row):
        if pd.isna(row["신선 배아 사용 여부"]) or row["신선 배아 사용 여부"] == 0:
            return "not_fresh"
        eggs = row["수집된 신선 난자 수"] if not pd.isna(row["수집된 신선 난자 수"]) else 0
        code = row["시술 시기 코드"]
        if eggs > 10: return f"{code}_high"
        elif eggs > 0: return f"{code}_low"
        return f"{code}_zero"
    d["시기별_신선난자_구간"] = d.apply(fresh_egg_tier, axis=1)

    # [13] 동결 배아 + 시기별 해동 수 구간
    def frozen_thaw_tier(row):
        if pd.isna(row["동결 배아 사용 여부"]) or row["동결 배아 사용 여부"] == 0:
            return "not_frozen"
        thawed = row["해동된 배아 수"] if not pd.isna(row["해동된 배아 수"]) else 0
        code = row["시술 시기 코드"]
        if thawed > 3: return f"{code}_many"
        elif thawed > 0: return f"{code}_few"
        return f"{code}_zero"
    d["시기별_해동배아_구간"] = d.apply(frozen_thaw_tier, axis=1)

    # [14] PGD/PGS 통합
    d["유전검사_합"] = d["착상 전 유전 진단 사용 여부"].fillna(0) + d["착상 전 유전 검사 사용 여부"].fillna(0)
    d["PGD_실시"] = d["PGD 시술 여부"].notna().astype(int)
    d["PGS_실시"] = d["PGS 시술 여부"].notna().astype(int)

    # 대리모 -1 처리
    d["대리모 여부"] = d["대리모 여부"].fillna(-1)

    return d


# ===================================================================
# 3) OOF Target Encoding
# ===================================================================
def oof_target_encode(train_df, test_df, col, target_col, n_folds=5, seed=42, smoothing=20):
    global_mean = train_df[target_col].mean()
    train_enc = pd.Series(np.nan, index=train_df.index, dtype=float)
    skf = StratifiedKFold(n_splits=n_folds, shuffle=True, random_state=seed)

    for tr_idx, va_idx in skf.split(train_df, train_df[target_col]):
        tr = train_df.iloc[tr_idx]
        stats = tr.groupby(col)[target_col].agg(["mean", "count"])
        stats["enc"] = (stats["mean"] * stats["count"] + global_mean * smoothing) / (stats["count"] + smoothing)
        mapping = stats["enc"].to_dict()
        train_enc.iloc[va_idx] = train_df.iloc[va_idx][col].map(mapping).fillna(global_mean)

    stats = train_df.groupby(col)[target_col].agg(["mean", "count"])
    stats["enc"] = (stats["mean"] * stats["count"] + global_mean * smoothing) / (stats["count"] + smoothing)
    test_enc = test_df[col].map(stats["enc"].to_dict()).fillna(global_mean)

    return train_enc.values, test_enc.values


# ===================================================================
# 4) Optuna CatBoost Tuning
# ===================================================================
def tune_catboost_optuna(X_train, y, cat_indices, n_trials=40):
    if not HAS_OPTUNA:
        print("[SKIP] Optuna 미설치")
        return None

    def objective(trial):
        params = {
            'iterations': 2000,
            'learning_rate': trial.suggest_float('learning_rate', 0.02, 0.1, log=True),
            'depth': trial.suggest_int('depth', 5, 9),
            'l2_leaf_reg': trial.suggest_float('l2_leaf_reg', 1.0, 10.0),
            'random_strength': trial.suggest_float('random_strength', 0.1, 2.0),
            'bagging_temperature': trial.suggest_float('bagging_temperature', 0.0, 1.0),
            'border_count': trial.suggest_int('border_count', 64, 255),
            'min_data_in_leaf': trial.suggest_int('min_data_in_leaf', 1, 50),
            'random_seed': SEED,
            'eval_metric': 'AUC',
            'verbose': 0,
            'early_stopping_rounds': 100,
            'use_best_model': True,
            'task_type': TASK_TYPE,
        }

        skf = StratifiedKFold(n_splits=N_FOLDS_TUNE, shuffle=True, random_state=SEED)
        aucs = []

        for tr_idx, va_idx in skf.split(X_train, y):
            pool_tr = cb.Pool(X_train.iloc[tr_idx], y[tr_idx], cat_features=cat_indices)
            pool_va = cb.Pool(X_train.iloc[va_idx], y[va_idx], cat_features=cat_indices)

            model = cb.CatBoostClassifier(**params)
            model.fit(pool_tr, eval_set=pool_va)

            pred = model.predict_proba(X_train.iloc[va_idx])[:, 1]
            aucs.append(roc_auc_score(y[va_idx], pred))

        return np.mean(aucs)

    print(f"\n[Optuna] CatBoost 튜닝 ({n_trials} trials, {N_FOLDS_TUNE}-fold)...")
    study = optuna.create_study(direction='maximize')
    study.optimize(objective, n_trials=n_trials)

    print(f"[Optuna] Best CV AUC: {study.best_value:.6f}")
    print(f"[Optuna] Best params: {study.best_params}")
    return study.best_params


# ===================================================================
# 5) Main
# ===================================================================
def main():
    print("=" * 80)
    print(f"12번: CatBoost 집중 (fold={N_FOLDS_TRAIN}, optuna={OPTUNA_TRIALS}trials)")
    print("=" * 80)

    train_raw = pd.read_csv(os.path.join(DATA_DIR, "train.csv"))
    test_raw  = pd.read_csv(os.path.join(DATA_DIR, "test.csv"))
    sub       = pd.read_csv(os.path.join(DATA_DIR, "sample_submission.csv"))

    # Step 1: Imputation
    print("\n[Step 1] Imputation...")
    train_raw, test_raw = impute_missing(train_raw, test_raw)

    # Step 2: Feature Engineering
    print("\n[Step 2] Feature Engineering...")
    train_df = preprocess(train_raw)
    test_df  = preprocess(test_raw)
    y = train_df[TARGET_COL].astype(int).values

    # Step 3: OOF Target Encoding
    drop_cols = [ID_COL, TARGET_COL]

    te_cols = [
        "시술 시기 코드", "특정 시술 유형", "시술 당시 나이", "배아_진행_단계",
        "나이×Day5", "시술횟수×나이", "나이×배아진행", "시기코드×나이",
        "배아생성이유", "난자_나이", "시기×배아출처", "배란_자극_유도",
        "시기×단일이식", "시기×배란자극", "나이×단일이식",
        "시기별_신선난자_구간", "시기별_해동배아_구간", "시기×유전진단",
    ]

    print("\n[Step 3] OOF Target Encoding...")
    for col in te_cols:
        if col not in train_df.columns:
            continue
        tr_enc, te_enc = oof_target_encode(train_df, test_df, col, TARGET_COL,
                                            n_folds=N_FOLDS_TRAIN, seed=SEED, smoothing=20)
        train_df[f"TE_{col}"] = tr_enc
        test_df[f"TE_{col}"] = te_enc

    # Feature 준비
    feature_cols = [c for c in train_df.columns if c not in drop_cols]
    cat_cols = [c for c in feature_cols if str(train_df[c].dtype) in ["object", "category"]]
    num_cols = [c for c in feature_cols if c not in cat_cols]

    print(f"\nFeatures: {len(feature_cols)} (cat={len(cat_cols)}, num={len(num_cols)})")

    # CatBoost 데이터 준비
    X_train = train_df[feature_cols].copy()
    X_test  = test_df[feature_cols].copy()
    for c in cat_cols:
        X_train[c] = X_train[c].astype(str).fillna("NA")
        X_test[c]  = X_test[c].astype(str).fillna("NA")
    X_train[num_cols] = X_train[num_cols].fillna(-999)
    X_test[num_cols]  = X_test[num_cols].fillna(-999)
    cat_indices = [feature_cols.index(c) for c in cat_cols]

    # Step 4: Optuna
    print("\n[Step 4] Hyperparameter Tuning...")
    best_params = tune_catboost_optuna(X_train, y, cat_indices, n_trials=OPTUNA_TRIALS)

    # Step 5: Final Training
    print("\n" + "=" * 60)
    print(f"[Step 5] CatBoost Final Training ({N_FOLDS_TRAIN}-fold)")
    print("=" * 60)

    cb_params = {
        'iterations': 3000,
        'random_seed': SEED,
        'eval_metric': 'AUC',
        'loss_function': 'Logloss',
        'verbose': 500,
        'early_stopping_rounds': 200,
        'use_best_model': True,
        'task_type': TASK_TYPE,
    }
    if best_params:
        cb_params.update(best_params)
    else:
        cb_params.update({
            'learning_rate': 0.05,
            'depth': 7,
            'l2_leaf_reg': 5.0,
            'random_strength': 1.0,
            'bagging_temperature': 0.1,
            'border_count': 170,
        })

    skf = StratifiedKFold(n_splits=N_FOLDS_TRAIN, shuffle=True, random_state=SEED)
    oof_pred = np.zeros(len(train_df))
    test_pred = np.zeros(len(test_df))
    fold_aucs = []

    for fold, (tr_idx, va_idx) in enumerate(skf.split(X_train, y), 1):
        pool_tr = cb.Pool(X_train.iloc[tr_idx], y[tr_idx], cat_features=cat_indices)
        pool_va = cb.Pool(X_train.iloc[va_idx], y[va_idx], cat_features=cat_indices)

        model = cb.CatBoostClassifier(**cb_params)
        model.fit(pool_tr, eval_set=pool_va)

        oof_pred[va_idx] = model.predict_proba(X_train.iloc[va_idx])[:, 1]
        test_pred += model.predict_proba(X_test)[:, 1] / N_FOLDS_TRAIN

        fauc = roc_auc_score(y[va_idx], oof_pred[va_idx])
        fold_aucs.append(fauc)
        print(f"  Fold {fold}/{N_FOLDS_TRAIN}: AUC = {fauc:.6f} (iter={model.best_iteration_})")

    oof_auc = roc_auc_score(y, oof_pred)
    print(f"\n{'='*60}")
    print(f">>> CatBoost OOF AUC = {oof_auc:.6f}")
    print(f">>> Fold AUCs: mean={np.mean(fold_aucs):.6f}, std={np.std(fold_aucs):.6f}")
    print(f"{'='*60}")

    # Feature importance
    fi = pd.DataFrame({
        "feature": feature_cols,
        "importance": model.get_feature_importance()
    }).sort_values("importance", ascending=False)
    print("\n=== Top 30 Features ===")
    print(fi.head(30).to_string())

    # Save
    sub2 = sub.copy()
    sub2["probability"] = test_pred
    sub_path = os.path.join(OUT_DIR, f"12_cb_CV{oof_auc:.6f}.csv")
    sub2.to_csv(sub_path, index=False)
    print(f"\nSaved: {sub_path}")

    np.save(os.path.join(OUT_DIR, "12_oof_cb.npy"), oof_pred)
    np.save(os.path.join(OUT_DIR, "12_test_cb.npy"), test_pred)

    with open(os.path.join(OUT_DIR, "12_summary.txt"), "w", encoding="utf-8") as f:
        f.write(f"12번 CatBoost 집중 파이프라인\n{'='*60}\n")
        f.write(f"N_FOLDS: {N_FOLDS_TRAIN}\n")
        f.write(f"OOF AUC: {oof_auc:.6f}\n")
        f.write(f"Fold mean: {np.mean(fold_aucs):.6f} ± {np.std(fold_aucs):.6f}\n")
        f.write(f"Optuna params: {best_params}\n")
        f.write(f"\nFeatures: {len(feature_cols)} (cat={len(cat_cols)}, num={len(num_cols)})\n")
        f.write(f"\nTop 30:\n{fi.head(30).to_string()}\n")

    print("\nDONE!")


if __name__ == "__main__":
    main()

[33m[W 2026-02-11 21:28:29,541][0m Trial 35 failed with parameters: {'learning_rate': 0.02447823522360051, 'depth': 6, 'l2_leaf_reg': 1.8415454101990258, 'random_strength': 1.1144237271737059, 'bagging_temperature': 0.6131417781689361, 'border_count': 183, 'min_data_in_leaf': 28} because of the following error: KeyboardInterrupt('').[0m
Traceback (most recent call last):
  File "/opt/anaconda3/envs/fertility_ai/lib/python3.11/site-packages/optuna/study/_optimize.py", line 206, in _run_trial
    value_or_values = func(trial)
                      ^^^^^^^^^^^
  File "/var/folders/zb/7vcy0_ts23q_3pjlgfdd_cl80000gn/T/ipykernel_66233/3055623727.py", line 365, in objective
    model.fit(pool_tr, eval_set=pool_va)
  File "/opt/anaconda3/envs/fertility_ai/lib/python3.11/site-packages/catboost/core.py", line 5245, in fit
    self._fit(X, y, cat_features, text_features, embedding_features, None, graph, sample_weight, None, None, None, None, baseline, use_best_model,
  File "/opt/anaconda3/env

KeyboardInterrupt: 