In [None]:
import sys
# ====================================================
# Library
# ====================================================
import os
import gc
import warnings
warnings.filterwarnings('ignore')
import random
import scipy as sp
import numpy as np
import math
import pandas as pd
from glob import glob
from pathlib import Path
import joblib
import pickle
import itertools
from tqdm.auto import tqdm
pd.set_option('display.max_columns', None)

import plotly.colors as pc
import plotly.express as px
import plotly.graph_objects as go
import matplotlib.pyplot as plt
import japanize_matplotlib
import seaborn as sns
import random
from sklearn.preprocessing import MinMaxScaler

import rich
from rich import pretty, print
from rich.console import Console
from sklearn.model_selection import KFold, StratifiedKFold, train_test_split, GroupKFold, TimeSeriesSplit

from sklearn.metrics import roc_auc_score
from sklearn.preprocessing import LabelEncoder
from scipy.stats import rankdata
from scipy.optimize import minimize
import lightgbm as lgb
import xgboost as xgb
from xgboost import XGBRegressor, XGBClassifier
from catboost import Pool, CatBoostRegressor, CatBoostClassifier

In [None]:
# ====================================================
# Configurations (TODO: プロジェクトごとに変更)
# ====================================================
class CFG:
    """Experiment-wide settings shared by every cell (TODO: adjust per project).

    Attributes are read directly from the class (``CFG.SEED`` etc.); the class is
    never instantiated in this notebook.
    """

    # ---- Experiment identity and I/O locations ----
    VER = 1
    COMPETITION = ''  # TODO: competition name
    DATA_PATH = Path('../input')
    OOF_DATA_PATH = Path(f'../oof/{VER}')
    MODEL_DATA_PATH = Path(f'../models/{VER}')
    SUB_DATA_PATH = Path(f'../submission/{VER}')

    # ---- Models, seeds and cross-validation ----
    METHOD_LIST = ['lightgbm', 'xgboost']  # models to train and blend
    SEED = 57
    SEED_LIST = [1, 57, 143, 2026, 9999]
    n_folds = 5

    # ---- Dataset schema ----
    target_col = ''  # TODO: target column name
    id_col = ''  # TODO: ID column name
    metric = ''  # TODO: evaluation metric

    # ---- Boosting loop controls ----
    num_boost_round = 100000
    early_stopping_round = 100
    verbose = 250

    feature_drop_cols = []  # TODO: columns to exclude from the feature set

    categorical_cols = []  # TODO: categorical feature columns

    # LightGBM (binary classification)
    classification_lgb_params = dict(
        objective='binary',
        metric='auc',
        learning_rate=0.01,
        num_leaves=63,
        colsample_bytree=0.4,
        reg_alpha=0.4,
        seed=SEED,
    )

    # XGBoost (binary classification)
    classification_xgb_params = dict(
        device='cuda',
        objective='binary:logistic',
        eval_metric='auc',
        learning_rate=0.01,
        max_depth=8,
        enable_categorical=True,
        random_state=SEED,
    )

    # CatBoost (binary classification)
    classification_cat_params = dict(
        eval_metric='AUC',
        learning_rate=0.01,
        iterations=num_boost_round,
        depth=8,
        random_seed=SEED,
        task_type='GPU',
    )

    # Initial blend weights, keyed by method name
    model_weight_dict = dict(lightgbm=0.50, xgboost=0.50)

In [None]:
# ====================================================
# Setup
# ====================================================
# Create the model / OOF / submission output directories up front so later
# cells can write into them; already-existing directories are left alone.
for _output_dir in (CFG.MODEL_DATA_PATH, CFG.OOF_DATA_PATH, CFG.SUB_DATA_PATH):
    os.makedirs(_output_dir, exist_ok=True)

def seed_everything(seed):
    """Seed Python's `random`, NumPy's global RNG, and export PYTHONHASHSEED.

    Args:
        seed: Value handed to `random.seed` and `np.random.seed`; its string
            form is stored in the `PYTHONHASHSEED` environment variable.
    """
    for apply_seed in (random.seed, np.random.seed):
        apply_seed(seed)
    # Only affects interpreters started after this point (e.g. worker processes).
    os.environ['PYTHONHASHSEED'] = str(seed)

# Seed the global RNGs once, at setup time, with the configured base seed.
seed_everything(CFG.SEED)

# Load Data (TODO: プロジェクトごとに実装)

In [None]:
# TODO: データ読み込みを実装
# train_df = pd.read_csv(CFG.DATA_PATH / 'train.csv')
# test_df = pd.read_csv(CFG.DATA_PATH / 'test.csv')
# sample_submission_df = pd.read_csv(CFG.DATA_PATH / 'sample_submission.csv')

# Preprocess (TODO: プロジェクトごとに実装)

In [None]:
# TODO: 前処理を実装
# def Preprocessing(train_df, test_df):
#     feature_cols = [col for col in train_df.columns if col not in [CFG.target_col, CFG.id_col] + CFG.feature_drop_cols]
#     return train_df, test_df, feature_cols
#
# train_df, test_df, FEATURES = Preprocessing(train_df, test_df)

# Modeling (汎用関数)

In [None]:
# ====================================================
# Training Functions
# ====================================================
def lightgbm_training(x_train, y_train, x_valid, y_valid, params=None):
    """Train one LightGBM booster and predict on the validation split.

    Args:
        x_train, y_train: Training features and labels.
        x_valid, y_valid: Validation features and labels.
        params: LightGBM parameter dict; defaults to
            `CFG.classification_lgb_params` when None.

    Returns:
        Tuple `(model, valid_pred)`: the trained booster and its predictions
        for `x_valid`.
    """
    params = CFG.classification_lgb_params if params is None else params

    train_set = lgb.Dataset(x_train, y_train)
    valid_set = lgb.Dataset(x_valid, y_valid)

    # Early stopping plus periodic metric logging, both driven by CFG.
    training_callbacks = [
        lgb.early_stopping(stopping_rounds=CFG.early_stopping_round, verbose=CFG.verbose),
        lgb.log_evaluation(CFG.verbose),
    ]

    booster = lgb.train(
        params=params,
        train_set=train_set,
        num_boost_round=CFG.num_boost_round,
        valid_sets=[train_set, valid_set],
        callbacks=training_callbacks,
    )
    return booster, booster.predict(x_valid)


def xgboost_training(x_train, y_train, x_valid, y_valid, params=None):
    """Train one XGBoost booster with early stopping and predict on the validation split.

    Args:
        x_train, y_train: Training features and labels.
        x_valid, y_valid: Validation features and labels.
        params: XGBoost parameter dict; defaults to
            `CFG.classification_xgb_params` when None. An `enable_categorical`
            entry is honoured by forwarding it to the `DMatrix` constructors.

    Returns:
        Tuple `(model, valid_pred)`: the trained booster and its predictions
        for `x_valid`, taken from the best early-stopping iteration when one
        was recorded.
    """
    if params is None:
        params = CFG.classification_xgb_params

    # `enable_categorical` is a DMatrix option, not a booster parameter: left in
    # the booster params it has no effect and category-dtype columns are rejected
    # when the DMatrix is built. Route it to where it actually applies.
    if isinstance(params, dict):
        enable_categorical = bool(params.get('enable_categorical', False))
        booster_params = {key: value for key, value in params.items() if key != 'enable_categorical'}
    else:
        enable_categorical = False
        booster_params = params

    xgb_train = xgb.DMatrix(data=x_train, label=y_train, enable_categorical=enable_categorical)
    xgb_valid = xgb.DMatrix(data=x_valid, label=y_valid, enable_categorical=enable_categorical)
    model = xgb.train(
        booster_params,
        dtrain=xgb_train,
        num_boost_round=CFG.num_boost_round,
        evals=[(xgb_train, 'train'), (xgb_valid, 'eval')],
        early_stopping_rounds=CFG.early_stopping_round,
        verbose_eval=CFG.verbose
    )

    # The native Booster.predict uses every trained tree by default, including
    # the rounds after the best one; restrict it to the best iteration when
    # early stopping recorded one. The validation DMatrix is reused as-is.
    best_iteration = getattr(model, 'best_iteration', None)
    if best_iteration is None:
        valid_pred = model.predict(xgb_valid)
    else:
        valid_pred = model.predict(xgb_valid, iteration_range=(0, int(best_iteration) + 1))
    return model, valid_pred


def catboost_training(x_train, y_train, x_valid, y_valid, params=None):
    """Train one CatBoost classifier and predict on the validation split.

    Args:
        x_train, y_train: Training features and labels.
        x_valid, y_valid: Validation features and labels.
        params: Keyword arguments for `CatBoostClassifier`; defaults to
            `CFG.classification_cat_params` when None.

    Returns:
        Tuple `(model, valid_pred)`: the fitted classifier and the second
        column of `predict_proba` for `x_valid`.
    """
    params = CFG.classification_cat_params if params is None else params

    train_pool = Pool(data=x_train, label=y_train)
    valid_pool = Pool(data=x_valid, label=y_valid)

    classifier = CatBoostClassifier(**params)
    fit_options = dict(
        eval_set=[valid_pool],
        early_stopping_rounds=CFG.early_stopping_round,
        verbose=CFG.verbose,
        use_best_model=True,
    )
    classifier.fit(train_pool, **fit_options)

    positive_proba = classifier.predict_proba(x_valid)[:, 1]
    return classifier, positive_proba

In [None]:
# ====================================================
# Cross Validation Training
# ====================================================
def apply_fold_rank_transform(oof_df, pred_col):
    """Rank-transform predictions separately within each fold.

    Args:
        oof_df: DataFrame with a 'fold' column and the prediction column.
        pred_col: Name of the prediction column to transform.

    Returns:
        1-D float array, positionally aligned with `oof_df`, where each value
        is the row's rank among its fold's predictions divided by that fold's
        row count. Rows whose fold never matches (e.g. NaN) stay 0.
    """
    fold_ids = oof_df['fold']
    scaled_ranks = np.zeros(len(oof_df))
    for fold_id in fold_ids.unique():
        in_fold = fold_ids == fold_id
        fold_values = oof_df.loc[in_fold, pred_col].values
        fold_size = len(fold_values)
        scaled_ranks[in_fold] = rankdata(fold_values) / fold_size
    return scaled_ranks


def _training_setup_for_seed(method, seed):
    """Return `(training_function, params)` for `method`, with the model seed set to `seed`."""
    setups = {
        'lightgbm': (lightgbm_training, CFG.classification_lgb_params, 'seed'),
        'xgboost': (xgboost_training, CFG.classification_xgb_params, 'random_state'),
        'catboost': (catboost_training, CFG.classification_cat_params, 'random_seed'),
    }
    if method not in setups:
        raise ValueError(f'Unknown method: {method}')

    train_fn, base_params, default_seed_key = setups[method]
    seeded_params = dict(base_params)  # copy so the shared CFG dict is never mutated
    # Overwrite whichever seed key(s) the config already uses; otherwise add the
    # library's usual one.
    seed_keys = [key for key in ('seed', 'random_state', 'random_seed') if key in seeded_params]
    for key in seed_keys or [default_seed_key]:
        seeded_params[key] = seed
    return train_fn, seeded_params


def gradient_boosting_model_cv_training(method, train_df, features, target_col=None, id_col=None):
    """Run seed-averaged stratified K-fold training for one gradient boosting method.

    For every seed in `CFG.SEED_LIST` a fresh `StratifiedKFold` split is drawn
    and one model per fold is trained with that same seed. Each fold model is
    pickled to `CFG.MODEL_DATA_PATH`, the per-seed out-of-fold (OOF) predictions
    are written to `CFG.OOF_DATA_PATH`, and finally the seed-averaged OOF
    predictions are written there as well. Scores are printed, not returned.

    Args:
        method: One of 'lightgbm', 'xgboost', 'catboost'.
        train_df: Training DataFrame containing features, target and ID columns.
        features: List of feature column names.
        target_col: Target column name; defaults to `CFG.target_col`.
        id_col: ID column name; defaults to `CFG.id_col`.

    Raises:
        ValueError: If `method` is not a supported method name.
    """
    if target_col is None:
        target_col = CFG.target_col
    if id_col is None:
        id_col = CFG.id_col

    seed_oof_predictions = np.zeros(len(train_df))

    for seed in CFG.SEED_LIST:
        # Resolve the trainer first so an unknown method fails before any work
        # is done. Passing per-seed params makes the seed loop vary the model's
        # own randomness too, not just the fold assignment.
        train_fn, seed_params = _training_setup_for_seed(method, seed)
        print(f'Using seed {seed}')

        oof_predictions = np.zeros(len(train_df))
        oof_fold = np.zeros(len(train_df))

        skf = StratifiedKFold(n_splits=CFG.n_folds, shuffle=True, random_state=seed)
        for fold, (train_index, valid_index) in enumerate(skf.split(X=train_df, y=train_df[target_col])):
            print('-' * 50)
            print(f'{method} training fold {fold + 1} in seed {seed}')

            x_train = train_df[features].iloc[train_index]
            y_train = train_df[target_col].iloc[train_index]
            x_valid = train_df[features].iloc[valid_index]
            y_valid = train_df[target_col].iloc[valid_index]

            model, valid_pred = train_fn(x_train, y_train, x_valid, y_valid, params=seed_params)

            # Save model (context manager guarantees the file is flushed and closed)
            model_path = CFG.MODEL_DATA_PATH / f'{method}_fold{fold + 1}_seed{seed}_ver{CFG.VER}.pkl'
            with open(model_path, 'wb') as model_file:
                pickle.dump(model, model_file)
            oof_predictions[valid_index] = valid_pred
            oof_fold[valid_index] = fold + 1  # folds are stored 1-based

            del x_train, x_valid, y_train, y_valid, model, valid_pred
            gc.collect()

        # Save this seed's OOF predictions together with the fold assignment
        oof_df = pd.DataFrame({
            id_col: train_df[id_col],
            target_col: train_df[target_col],
            f'{method}_prediction': oof_predictions,
            'fold': oof_fold
        })
        oof_df.to_csv(CFG.OOF_DATA_PATH / f'oof_{method}_seed{seed}_ver{CFG.VER}.csv', index=False)

        # Score after ranking within each fold
        rank_predictions = apply_fold_rank_transform(oof_df, f'{method}_prediction')
        score = roc_auc_score(train_df[target_col], rank_predictions)
        print(f'{method} seed {seed} OOF CV auc (rank transformed): {score:.6f}')

        seed_oof_predictions += oof_predictions

    # Average over seeds
    seed_oof_predictions /= len(CFG.SEED_LIST)
    oof_avg_df = pd.DataFrame({
        id_col: train_df[id_col],
        target_col: train_df[target_col],
        f'{method}_prediction': seed_oof_predictions
    })
    oof_avg_df.to_csv(CFG.OOF_DATA_PATH / f'oof_{method}_seed_avg_ver{CFG.VER}.csv', index=False)

    score_raw = roc_auc_score(train_df[target_col], seed_oof_predictions)
    print(f'{method} seed averaged OOF CV auc (raw): {score_raw:.6f}')


def Learning(train_df, features):
    """Run cross-validated training for every method in `CFG.METHOD_LIST`, in order."""
    for model_name in CFG.METHOD_LIST:
        gradient_boosting_model_cv_training(
            method=model_name,
            train_df=train_df,
            features=features,
        )

In [None]:
# ====================================================
# Weight Optimization
# ====================================================
def get_rank_transformed_oof(method):
    """Load each seed's saved OOF file, rank-transform within folds, and average over seeds.

    Args:
        method: Method name used in the OOF file names and prediction column.

    Returns:
        Element-wise mean over `CFG.SEED_LIST` of the per-seed fold-rank arrays.
    """
    prediction_col = f'{method}_prediction'
    per_seed_ranks = [
        apply_fold_rank_transform(
            pd.read_csv(CFG.OOF_DATA_PATH / f'oof_{method}_seed{seed}_ver{CFG.VER}.csv'),
            prediction_col,
        )
        for seed in CFG.SEED_LIST
    ]
    return np.mean(per_seed_ranks, axis=0)


def optimize_weights(train_df, target_col=None):
    """Search for blend weights that maximise OOF AUC across `CFG.METHOD_LIST`.

    The rank-transformed, seed-averaged OOF predictions of every method are
    combined linearly; weights are kept non-negative and normalised to sum to 1.

    Args:
        train_df: Training DataFrame holding the target column, in the same
            row order as the saved OOF files.
        target_col: Target column name; defaults to `CFG.target_col`.

    Returns:
        Tuple `(weights, score)`: a dict mapping method name to its weight, and
        the OOF AUC achieved with those weights.

    Raises:
        ValueError: If `CFG.METHOD_LIST` is empty.
    """
    if target_col is None:
        target_col = CFG.target_col

    methods = list(CFG.METHOD_LIST)
    if not methods:
        raise ValueError('CFG.METHOD_LIST is empty; nothing to optimize')

    # One column per method, rows aligned with train_df.
    oof_matrix = np.column_stack([get_rank_transformed_oof(method) for method in methods])
    y_true = train_df[target_col].values
    uniform_weights = np.full(len(methods), 1.0 / len(methods))

    def to_weights(raw):
        """Map an unconstrained vector to non-negative weights summing to 1."""
        clipped = np.clip(np.asarray(raw, dtype=float), 0.0, None)
        total = clipped.sum()
        return clipped / total if total > 0 else uniform_weights

    def objective(raw):
        return -roc_auc_score(y_true, oof_matrix @ to_weights(raw))

    # AUC depends only on the ordering of predictions, so it is piecewise
    # constant in the weights. A gradient-based solver such as SLSQP estimates a
    # near-zero finite-difference gradient and tends to stop at the starting
    # point, hence a derivative-free simplex search. The sum-to-one / [0, 1]
    # constraints are enforced by `to_weights` rather than by the solver.
    result = minimize(
        objective,
        uniform_weights,
        method='Nelder-Mead',
        options={'xatol': 1e-4, 'fatol': 1e-7},
    )

    optimal_weights = to_weights(result.x)
    optimal_score = -result.fun

    print('=' * 50)
    print('Weight Optimization Results:')
    print('=' * 50)
    for method, weight in zip(methods, optimal_weights):
        print(f'{method}: {weight:.4f}')
    print(f'Optimized OOF AUC: {optimal_score:.6f}')
    print('=' * 50)

    return {method: weight for method, weight in zip(methods, optimal_weights)}, optimal_score


def scoring_cv(train_df, weights=None, target_col=None):
    """Compute and print the OOF AUC of the weighted blend.

    Args:
        train_df: Training DataFrame holding the target column.
        weights: Dict mapping method name to blend weight; defaults to
            `CFG.model_weight_dict`.
        target_col: Target column name; defaults to `CFG.target_col`.

    Returns:
        The AUC of the weighted sum of each method's rank-transformed OOF
        predictions.
    """
    weights = CFG.model_weight_dict if weights is None else weights
    target_col = CFG.target_col if target_col is None else target_col

    blended = np.zeros(len(train_df))
    for model_name in CFG.METHOD_LIST:
        blended += get_rank_transformed_oof(model_name) * weights[model_name]

    auc = roc_auc_score(train_df[target_col], blended)
    print(f'OOF CV auc (rank transformed): {auc:.6f}')
    return auc

In [None]:
# TODO: 学習実行
# Learning(train_df, FEATURES)

In [None]:
# TODO: 重み最適化
# optimized_weights, optimized_score = optimize_weights(train_df)
# scoring_cv(train_df, CFG.model_weight_dict)
# scoring_cv(train_df, optimized_weights)

In [None]:
# ====================================================
# Feature Importance
# ====================================================
def plot_feature_importance(features, seed=None, fold=1, figsize=(10, 20)):
    """Plot gain-based feature importance of one saved LightGBM fold model.

    Args:
        features: Feature names, in the order used for training; used as the
            index of the returned frame.
        seed: Seed identifying the saved model; defaults to `CFG.SEED`.
        fold: 1-based fold number identifying the saved model.
        figsize: Matplotlib figure size.

    Returns:
        DataFrame with a single 'importance' column, indexed by feature name
        and sorted in descending order.
    """
    if seed is None:
        seed = CFG.SEED
    model_path = CFG.MODEL_DATA_PATH / f'lightgbm_fold{fold}_seed{seed}_ver{CFG.VER}.pkl'
    # NOTE: pickle can execute arbitrary code on load; only open model files
    # produced by this notebook.
    with open(model_path, 'rb') as model_file:  # closes the handle deterministically
        model = pickle.load(model_file)
    importance_df = pd.DataFrame(
        model.feature_importance(importance_type='gain'),
        index=features,
        columns=['importance']
    )
    importance_df = importance_df.sort_values('importance', ascending=False)

    plt.figure(figsize=figsize)
    sns.barplot(data=importance_df, x='importance', y=importance_df.index)
    plt.title('Feature Importance (Gain)')
    plt.tight_layout()
    plt.show()

    return importance_df

# TODO: 可視化実行
# importance_df = plot_feature_importance(FEATURES)

# Inference (汎用関数)

In [None]:
# ====================================================
# Inference Functions
# ====================================================
def lightgbm_inference(x_test):
    """Average rank-transformed test predictions over every saved LightGBM model.

    Loads one pickled model per (seed, fold) pair from `CFG.MODEL_DATA_PATH`,
    converts each model's predictions to ranks divided by the number of rows,
    and averages them.

    Args:
        x_test: Test feature matrix.

    Returns:
        1-D array with one averaged rank score per test row.
    """
    test_pred = np.zeros(len(x_test))
    for seed in CFG.SEED_LIST:
        for fold in range(CFG.n_folds):
            model_path = CFG.MODEL_DATA_PATH / f'lightgbm_fold{fold + 1}_seed{seed}_ver{CFG.VER}.pkl'
            # NOTE: pickle can execute arbitrary code on load; only open model
            # files produced by this notebook.
            with open(model_path, 'rb') as model_file:  # avoid leaking one handle per model
                model = pickle.load(model_file)
            pred = model.predict(x_test)
            test_pred += rankdata(pred) / len(pred)
    return test_pred / (CFG.n_folds * len(CFG.SEED_LIST))


def xgboost_inference(x_test):
    """Average rank-transformed test predictions over every saved XGBoost model.

    Loads one pickled booster per (seed, fold) pair from `CFG.MODEL_DATA_PATH`,
    converts each booster's predictions to ranks divided by the number of rows,
    and averages them.

    Args:
        x_test: Test feature matrix.

    Returns:
        1-D array with one averaged rank score per test row.
    """
    # `enable_categorical` is a DMatrix option; Booster.predict does not accept
    # it and raises TypeError. The matrix is identical for every model, so it
    # is built once.
    dtest = xgb.DMatrix(x_test, enable_categorical=True)

    test_pred = np.zeros(len(x_test))
    for seed in CFG.SEED_LIST:
        for fold in range(CFG.n_folds):
            model_path = CFG.MODEL_DATA_PATH / f'xgboost_fold{fold + 1}_seed{seed}_ver{CFG.VER}.pkl'
            # NOTE: pickle can execute arbitrary code on load; only open model
            # files produced by this notebook.
            with open(model_path, 'rb') as model_file:  # avoid leaking one handle per model
                model = pickle.load(model_file)

            # The native Booster.predict uses all trees by default; when early
            # stopping recorded a best iteration, predict from that model.
            best_iteration = getattr(model, 'best_iteration', None)
            if best_iteration is None:
                pred = model.predict(dtest)
            else:
                pred = model.predict(dtest, iteration_range=(0, int(best_iteration) + 1))
            test_pred += rankdata(pred) / len(pred)
    return test_pred / (CFG.n_folds * len(CFG.SEED_LIST))


def catboost_inference(x_test):
    """Average rank-transformed test predictions over every saved CatBoost model.

    Loads one pickled classifier per (seed, fold) pair from
    `CFG.MODEL_DATA_PATH`, converts the second `predict_proba` column to ranks
    divided by the number of rows, and averages them.

    Args:
        x_test: Test feature matrix.

    Returns:
        1-D array with one averaged rank score per test row.
    """
    test_pred = np.zeros(len(x_test))
    for seed in CFG.SEED_LIST:
        for fold in range(CFG.n_folds):
            model_path = CFG.MODEL_DATA_PATH / f'catboost_fold{fold + 1}_seed{seed}_ver{CFG.VER}.pkl'
            # NOTE: pickle can execute arbitrary code on load; only open model
            # files produced by this notebook.
            with open(model_path, 'rb') as model_file:  # avoid leaking one handle per model
                model = pickle.load(model_file)
            pred = model.predict_proba(x_test)[:, 1]
            test_pred += rankdata(pred) / len(pred)
    return test_pred / (CFG.n_folds * len(CFG.SEED_LIST))


def gradient_boosting_model_inference(method, test_df, features):
    """Select the feature columns and dispatch to the inference routine for `method`.

    Args:
        method: One of 'lightgbm', 'xgboost', 'catboost'.
        test_df: Test DataFrame.
        features: Feature column names to pass to the model.

    Returns:
        Whatever the selected inference function returns for `test_df[features]`.

    Raises:
        ValueError: If `method` is not a supported method name.
    """
    x_test = test_df[features]
    inference_by_method = {
        'lightgbm': lightgbm_inference,
        'xgboost': xgboost_inference,
        'catboost': catboost_inference,
    }
    run_inference = inference_by_method.get(method)
    if run_inference is None:
        raise ValueError(f'Unknown method: {method}')
    return run_inference(x_test)


def Predicting(test_df, features, weights=None):
    """Predict on the test data with every configured method and blend the results.

    Args:
        test_df: Test DataFrame; it is copied, not modified.
        features: Feature column names.
        weights: Dict mapping method name to blend weight; defaults to
            `CFG.model_weight_dict`.

    Returns:
        Copy of `test_df` with a 'pred' column (weighted blend) and one
        '<method>_pred' column per method in `CFG.METHOD_LIST`.
    """
    weights = CFG.model_weight_dict if weights is None else weights

    output_df = test_df.copy()
    output_df['pred'] = 0
    for model_name in CFG.METHOD_LIST:
        method_col = f'{model_name}_pred'
        output_df[method_col] = gradient_boosting_model_inference(model_name, test_df, features)
        output_df['pred'] = output_df['pred'] + weights[model_name] * output_df[method_col]
    return output_df

In [None]:
# TODO: 推論実行
# test_pred = Predicting(test_df, FEATURES, optimized_weights)

# Submission (TODO: プロジェクトごとに実装)

In [None]:
# TODO: サブミッション作成
# submission_df = sample_submission_df.copy()
# submission_df = submission_df.merge(
#     test_pred[[CFG.id_col, 'pred']],
#     on=CFG.id_col,
#     how='left'
# )
# submission_df = submission_df[[CFG.id_col, 'pred']]
# submission_df.to_csv(CFG.SUB_DATA_PATH / f'submission_ver{CFG.VER}.csv', index=False)
# print(f'Submission saved to {CFG.SUB_DATA_PATH / f"submission_ver{CFG.VER}.csv"}')