In [1]:
class CFG:
    target = "Fertilizer Name"
    folds = 5
    seed = 42
    folder_path = 'csvfile/'
    except_cols = ['id']
    importance_threshold=1e-4

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import itertools
from scipy.stats import zscore, skew
from scipy.stats import ks_2samp
from sklearn.base import BaseEstimator, ClassifierMixin,TransformerMixin, RegressorMixin,clone
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import LabelEncoder, KBinsDiscretizer
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import StratifiedKFold, KFold, cross_val_score, train_test_split
from catboost import CatBoostClassifier
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
from sklearn.linear_model import LogisticRegression
from scipy.stats import norm
from sklearn.tree import DecisionTreeRegressor
import itertools


In [None]:
train = pd.read_csv(CFG.folder_path + 'train.csv')
test = pd.read_csv(CFG.folder_path + 'test.csv')
submission = pd.read_csv(CFG.folder_path + "sample_submission.csv")
# make num cols except CFG.except_cols
num_cols = train.drop(columns=CFG.except_cols).select_dtypes(include=[np.number]).columns.tolist()
cat_cols = train.drop(columns=CFG.target).select_dtypes(exclude=[np.number]).columns.tolist()
target_unique_list = train[CFG.target].unique().tolist()

In [None]:
train.head()

Unnamed: 0,id,Temparature,Humidity,Moisture,Soil Type,Crop Type,Nitrogen,Potassium,Phosphorous,Fertilizer Name
0,0,37,70,36,Clayey,Sugarcane,36,4,5,28-28
1,1,27,69,65,Sandy,Millets,30,6,18,28-28
2,2,29,63,32,Sandy,Millets,24,12,16,17-17-17
3,3,35,62,54,Sandy,Barley,39,12,4,10-26-26
4,4,35,58,43,Red,Paddy,37,2,16,DAP


In [None]:
materials = ["Nitrogen","Potassium","Phosphorous"]

# 1.Data Overview

In [None]:
def compare_distributions(train, test, columns=None, except_cols=None, significance_level=0.05):
    if columns is None:
        # Select numeric columns by default
        columns = train.select_dtypes(include=['int64', 'float64']).columns.tolist()
    
    if except_cols is not None:
        # Exclude specified columns
        columns = [col for col in columns if col not in except_cols]

    results = []

    for col in columns:
        if col not in test.columns:
            continue  # Skip columns not in test set

        # Kolmogorov-Smirnov test
        # We use the Kolmogorov–Smirnov test because we want to verify whether the entire shape of the distribution
        # —not just the mean or variance—is the same across all data.
        stat, p_value = ks_2samp(train[col].dropna(), test[col].dropna())
        is_different = p_value < significance_level
        results.append({
            'column': col,
            'p_value': p_value,
            'statistic': stat,
            'significant_difference': is_different
        })

        # Plot distribution
        plt.figure(figsize=(10, 4))
        sns.kdeplot(train[col], label='train', fill=True)
        sns.kdeplot(test[col], label='test', fill=True)
        plt.title(f"Distribution of {col} | p={p_value:.4f} | {'Different' if is_different else 'Similar'}")
        plt.legend()
        plt.show()

    return pd.DataFrame(results)

# 2.1 Make Pipeline

In [None]:
class DomainFeatureGenerator(BaseEstimator, TransformerMixin):
    def __init__(self, materials=None):
        self.materials = materials or ["Nitrogen", "Phosphorous", "Potassium"]

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        X = X.copy()
        eps = 1e-5

        X['Soil_Crop_Type'] = X["Soil Type"] + "_" + X["Crop Type"]

        for a, b in itertools.permutations(self.materials, 2):
            X[f"{a}_to_{b}"] = X[a] / (X[b] + eps)

        for a in self.materials:
            others = [x for x in self.materials if x != a]
            b, c = others
            X[f"{a}_over_{b}_{c}"] = X[a] / (X[b] + X[c] + eps)

        for a in self.materials:
            others = [x for x in self.materials if x != a]
            b, c = others
            X[f"{a}_over_mean_{b}_{c}"] = X[a] / ((X[b] + X[c]) / 2 + eps)

        X["NPK_total"] = X[self.materials].sum(axis=1)
        X["NPK_std"] = X[self.materials].std(axis=1)
        X["NPK_mean"] = X[self.materials].mean(axis=1)

        return X

In [None]:
class ConditionalThresholdGenerator(BaseEstimator, TransformerMixin):
    def __init__(self, numeric_cols, q_low=0.25, q_high=0.75):
        self.numeric_cols = numeric_cols
        self.q_low = q_low
        self.q_high = q_high

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        df = X.copy()
        new_features_list = []  # 각 그룹에 대해 생성된 DataFrame들을 저장할 리스트

        crop_types = df["Crop Type"].dropna().unique()
        soil_types = df["Soil Type"].dropna().unique()
        crop_soil_combinations = df[["Crop Type", "Soil Type"]].dropna().drop_duplicates().values

        for crop in crop_types:
            filt = df["Crop Type"] == crop
            prefix = f"is_{crop.lower().replace(' ', '_')}"
            features = self._create_condition_features(df, filt, prefix)
            new_features_list.append(features)

        for soil in soil_types:
            filt = df["Soil Type"] == soil
            prefix = f"is_{soil.lower().replace(' ', '_')}"
            features = self._create_condition_features(df, filt, prefix)
            new_features_list.append(features)

        for crop, soil in crop_soil_combinations:
            filt = (df["Crop Type"] == crop) & (df["Soil Type"] == soil)
            prefix = f"is_{crop.lower().replace(' ', '_')}_{soil.lower().replace(' ', '_')}"
            features = self._create_condition_features(df, filt, prefix)
            new_features_list.append(features)

        # 전체 new features를 concat하여 한 번에 추가
        all_new_features = pd.concat(new_features_list, axis=1)
        df = pd.concat([df, all_new_features], axis=1)

        return df

    def _create_condition_features(self, df, group_filter, prefix):
        subset = df[group_filter]
        new_features = {}

        for col in self.numeric_cols:
            if subset[col].isnull().all():
                continue

            q1 = subset[col].quantile(self.q_low)
            q3 = subset[col].quantile(self.q_high)

            low_col = f"{prefix}_low_{col.lower()}"
            high_col = f"{prefix}_high_{col.lower()}"

            new_features[low_col] = ((group_filter) & (df[col] < q1)).astype(int)
            new_features[high_col] = ((group_filter) & (df[col] > q3)).astype(int)

        return pd.DataFrame(new_features, index=df.index)

In [None]:
class InteractionFeatureGenerator(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self

    def transform(self, X):
        X = X.copy(deep=True) 
        cols = [col for col in X.columns if col in num_cols]
        new_features = {}

        # Generate interaction features between numerical columns
        for col1 in cols:
            for col2 in cols:
                if col1 != col2:
                    new_features[f"{col1}_x_{col2}"] = X[col1] * X[col2]
                    new_features[f"{col1}_plus_{col2}"] = X[col1] + X[col2]
                    new_features[f"{col1}_minus_{col2}"] = X[col1] - X[col2]
                    new_features[f"{col2}_minus_{col1}"] = X[col2] - X[col1]
                    new_features[f"{col1}_div_{col2}"] = X[col1] / (X[col2] + 1e-5)
                    new_features[f"{col2}_div_{col1}"] = X[col2] / (X[col1] + 1e-5)

        # Row-wise statistical summaries
        new_features["row_mean"] = X[cols].mean(axis=1)
        new_features["row_std"] = X[cols].std(axis=1)
        new_features["row_max"] = X[cols].max(axis=1)
        new_features["row_min"] = X[cols].min(axis=1)
        new_features["row_median"] = X[cols].median(axis=1)

        # Additional row-wise distribution statistics
        new_features["row_skew"] = X[cols].skew(axis=1)
        new_features["row_kurtosis"] = X[cols].kurtosis(axis=1)
        new_features["row_range"] = X[cols].max(axis=1) - X[cols].min(axis=1)

        # Outlier flag using Z-score for each numerical column
        for col in cols:
            z = (X[col] - X[col].mean()) / (X[col].std() + 1e-5)
            new_features[f"{col}_outlier_flag"] = (np.abs(z) > 3).astype(int)

        new_df = pd.DataFrame(new_features, index=X.index)
        X = pd.concat([X, new_df], axis=1)

        return X.copy(deep=True)

In [None]:
def build_domain_feature_pipeline():
    return Pipeline([
        ('domain', DomainFeatureGenerator()),
    ])
def build_conditional_feature_pipeline():
    return Pipeline([
        ('conditional', ConditionalThresholdGenerator(
            numeric_cols=num_cols
        )),
    ])
def build_interaction_feature_pipeline():
    return Pipeline([
        ('interaction', InteractionFeatureGenerator()),
    ])

# 3.Model Training

In [None]:
class HybridFullStackingClassifier(BaseEstimator, ClassifierMixin):
    def __init__(self, pipelines, model_defs, meta_model,
                 n_splits=CFG.folds, random_state=CFG.seed, verbose=True,
                 stratify_feature=None, n_bins=10):
        self.pipelines = pipelines
        self.model_defs = model_defs
        self.meta_model = meta_model
        self.n_splits = n_splits
        self.random_state = random_state
        self.verbose = verbose
        self.stratify_feature = stratify_feature
        self.n_bins = n_bins
        self.fitted_models_ = []  # 각 (파이프라인, 기본 모델 정의) 조합별 학습된 모델들 (폴드별 모델 리스트)
        self.label_encoder_ = LabelEncoder()
        self.classes_ = None
        self.feature_names_ = None # 최종적으로 사용될 전체 피처 이름 리스트

    def _get_dataframe_with_features(self, data_transformed, index, pipeline_step=None, original_columns=None):
        """
        변환된 데이터를 Pandas DataFrame으로 만들고, 가능한 경우 피처 이름을 추출합니다.
        """
        if isinstance(data_transformed, pd.DataFrame):
            return data_transformed
        
        feature_names = None
        if pipeline_step is not None and hasattr(pipeline_step, 'get_feature_names_out'):
            try:
                if original_columns is not None: # 일부 트랜스포머는 원본 컬럼명이 필요
                    feature_names = pipeline_step.get_feature_names_out(original_columns)
                else:
                    feature_names = pipeline_step.get_feature_names_out()
            except Exception: # get_feature_names_out 호출 실패 시
                feature_names = None
        
        if feature_names is not None:
            return pd.DataFrame(data_transformed, index=index, columns=feature_names)
        else:
            # 피처 이름을 얻지 못한 경우, 기본 정수형 컬럼 사용
            # 이 경우 data_transformed가 2D 배열이어야 함
            if data_transformed.ndim == 1: # 1D 배열이면 2D로 변환
                 data_transformed = data_transformed.reshape(-1, 1)
            return pd.DataFrame(data_transformed, index=index)


    def fit(self, X, y):
        y_encoded = self.label_encoder_.fit_transform(y)
        self.classes_ = self.label_encoder_.classes_
        n_samples = X.shape[0]
        
        # 각 (파이프라인, 모델 정의) 조합마다 len(self.classes_) 만큼의 메타 피처가 생성됨
        total_base_models = len(self.pipelines) * len(self.model_defs)
        meta_features_shape_cols = total_base_models * (len(self.classes_) if len(self.classes_) > 2 else 1) # 이진 분류 시 확률값 1개
        meta_features = np.zeros((n_samples, meta_features_shape_cols))

        if self.stratify_feature is not None and self.stratify_feature in X.columns:
            binner = KBinsDiscretizer(n_bins=self.n_bins, encode='ordinal', strategy='quantile')
            stratify_col_binned = binner.fit_transform(X[[self.stratify_feature]]).astype(int).flatten()
            kf = StratifiedKFold(n_splits=self.n_splits, shuffle=True, random_state=self.random_state)
            stratify_values = stratify_col_binned
        else:
            kf = StratifiedKFold(n_splits=self.n_splits, shuffle=True, random_state=self.random_state) # 기본적으로 y로 계층화
            stratify_values = y_encoded

        base_model_counter = 0 # 메타 피처 컬럼 인덱싱용
        all_pipeline_feature_sets = set() # 모든 파이프라인에서 생성된 피처 이름을 수집

        for pi, pipeline_def in enumerate(self.pipelines):
            for mi, (model_cls, model_params) in enumerate(self.model_defs):
                if self.verbose:
                    print(f"\nTraining Pipeline {pi+1} + Model {model_cls.__name__}")

                # 현재 (파이프라인, 모델정의) 조합에 대한 OOF 예측값 저장용
                # 이진 분류이고 클래스가 2개면, 보통 양성 클래스에 대한 확률만 사용 (컬럼 1개)
                # 다중 클래스면 각 클래스에 대한 확률 사용 (컬럼 n_classes개)
                oof_preds_single_model = np.zeros((n_samples, len(self.classes_) if len(self.classes_) > 2 else 1))
                
                fold_specific_models = [] # 현재 (파이프라인, 모델정의)의 각 폴드별 (파이프, 모델) 저장

                for fold, (train_idx, val_idx) in enumerate(kf.split(X, stratify_values)):
                    if self.verbose:
                        print(f"  Fold {fold+1}/{self.n_splits}...")

                    X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
                    y_train_fold, y_val_fold = y_encoded[train_idx], y_encoded[val_idx]

                    # 파이프라인 복제 및 학습
                    current_pipeline = clone(pipeline_def)
                    current_pipeline.fit(X_train, y_train_fold)
                    
                    # 원본 X_train, X_val의 컬럼 이름을 기억 (get_feature_names_out에 필요할 수 있음)
                    original_train_cols = X_train.columns.tolist()
                    original_val_cols = X_val.columns.tolist()

                    # 데이터 변환 및 DataFrame으로 변환 (피처 이름 추출 시도)
                    X_train_transformed_raw = current_pipeline.transform(X_train)
                    X_val_transformed_raw = current_pipeline.transform(X_val)
                    
                    X_train_trans = self._get_dataframe_with_features(X_train_transformed_raw, X_train.index, current_pipeline, original_train_cols)
                    X_val_trans = self._get_dataframe_with_features(X_val_transformed_raw, X_val.index, current_pipeline, original_val_cols)

                    # 현재 파이프라인/폴드에서 나온 피처 이름들을 수집
                    current_fold_train_features = X_train_trans.columns.tolist()
                    all_pipeline_feature_sets.update(current_fold_train_features)
                    all_pipeline_feature_sets.update(X_val_trans.columns.tolist()) # X_val_trans도 다른 피처를 가질 수 있으므로 추가

                    # X_val_trans의 컬럼을 X_train_trans에 맞춤 (없는 컬럼은 0으로 채움)
                    X_val_trans = X_val_trans.reindex(columns=current_fold_train_features, fill_value=0)
                    
                    # 범주형 컬럼 처리
                    cat_cols = X_train_trans.select_dtypes(include=["object", "category"]).columns.tolist()
                    
                    temp_X_train_trans = X_train_trans.copy() # SettingWithCopyWarning 방지 및 원본 유지
                    temp_X_val_trans = X_val_trans.copy()

                    for col in cat_cols:
                        # object 타입 컬럼을 category 타입으로 변환
                        train_categories = temp_X_train_trans[col].astype('category').cat.categories
                        temp_X_train_trans[col] = pd.Categorical(temp_X_train_trans[col], categories=train_categories)
                        temp_X_val_trans[col] = pd.Categorical(temp_X_val_trans[col], categories=train_categories, # 학습 데이터의 범주를 따름
                                                                # validate='ignore' # 새로운 범주가 나타나면 NaN으로 처리하고 나중에 fill_value 등으로 채울 수 있음
                                                               ) 
                        # 만약 NaN이 생겼다면, 모델이 처리하거나 미리 채워야 함 (예: fill_value=0 과 같이)
                        # 여기서는 reindex에서 fill_value=0으로 처리하므로, 범주형 변환 후 NaN은 드물지만 주의
                        if temp_X_val_trans[col].isnull().any():
                             # 예를 들어, 가장 빈번한 값이나 특정 값으로 채우기
                             # 여기서는 간단히 문자열 "missing"으로 채우고, 해당 "missing"도 범주에 추가될 수 있도록 처리 필요
                             # 또는 모델이 NaN을 직접 처리하도록 둠 (LGBM, XGBoost, CatBoost는 가능)
                             pass


                    model_name = model_cls.__name__.lower()
                    model_params_copy = model_params.copy()

                    if "catboost" in model_name:
                        model_params_copy["cat_features"] = cat_cols
                    elif "xgb" in model_name:
                        model_params_copy["enable_categorical"] = True # 데이터가 pd.Categorical 타입이어야 함
                    # LightGBM은 fit 시에 categorical_feature 파라미터로 전달

                    model = model_cls(**model_params_copy)

                    if "lgbm" in model_name:
                        model.fit(temp_X_train_trans, y_train_fold,
                                  eval_set=[(temp_X_val_trans, y_val_fold)],
                                  categorical_feature=cat_cols,
                                  # callbacks=[lightgbm.early_stopping(10)] # 예시
                                 )
                    elif "catboost" in model_name and "eval_set" in model_params_copy: # eval_set 파라미터를 init에서 받는 경우
                        model.fit(temp_X_train_trans, y_train_fold, eval_set=[(temp_X_val_trans, y_val_fold)])
                    else: # 일반적인 fit
                        model.fit(temp_X_train_trans, y_train_fold)
                    
                    # OOF 예측 (확률값)
                    proba_preds = model.predict_proba(temp_X_val_trans)
                    
                    if len(self.classes_) == 2: # 이진 분류
                        oof_preds_single_model[val_idx] = proba_preds[:, 1].reshape(-1, 1) # 양성 클래스 확률
                    else: # 다중 클래스
                        oof_preds_single_model[val_idx] = proba_preds
                    
                    fold_specific_models.append((current_pipeline, model, current_fold_train_features))

                # 현재 (파이프라인, 모델정의) 조합의 OOF 예측값을 전체 메타 피처에 할당
                num_proba_cols = oof_preds_single_model.shape[1]
                meta_features[:, base_model_counter : base_model_counter + num_proba_cols] = oof_preds_single_model
                base_model_counter += num_proba_cols
                
                self.fitted_models_.append(fold_specific_models)

        # 모든 파이프라인에서 나온 피처 이름들을 정렬하여 최종 피처 이름으로 저장
        # 이 self.feature_names_는 predict_proba 시 입력 X를 파이프라인 통과 후 reindex할 때 사용됨
        # 하지만 각 기본 모델은 학습 시 사용했던 current_fold_train_features를 기준으로 예측해야 함.
        # 따라서 predict_proba에서 모델별로 저장된 피처 이름을 사용하도록 수정 필요.
        self.feature_names_ = sorted(list(all_pipeline_feature_sets)) # 이것은 전역적인 참조용.

        self.meta_model.fit(meta_features, y_encoded)
        return self

    def predict_proba(self, X):
        n_samples = X.shape[0]
        total_base_models = len(self.fitted_models_) # self.pipelines * self.model_defs와 동일
        
        # 메타 피처의 컬럼 수 계산 (fit과 동일한 로직)
        meta_features_shape_cols = 0
        for fold_models_for_one_config in self.fitted_models_:
            # 첫번째 폴드의 모델 예측 결과 shape으로 판단 (이진/다중)
            # (pipeline, model, model_features)
            _, first_model, _ = fold_models_for_one_config[0]
            # 임시 예측으로 컬럼 수 확인 (더 정확하게 하려면 classes_ 정보 사용)
            if len(self.classes_) == 2:
                meta_features_shape_cols += 1
            else:
                meta_features_shape_cols += len(self.classes_)
        
        meta_X_test = np.zeros((n_samples, meta_features_shape_cols))
        
        current_meta_col_idx = 0

        for i, fold_models_for_one_config in enumerate(self.fitted_models_):
            # fold_models_for_one_config는 특정 (파이프라인, 모델정의) 조합에 대한 [(pipe, model, model_features), ...] 리스트
            
            # 현재 (파이프라인, 모델정의) 조합의 테스트셋 예측값 (폴드 평균 전)
            # 각 폴드 모델은 자체적인 피처셋으로 예측해야 함
            # shape: (n_folds, n_samples, n_classes_for_model)
            current_config_preds_across_folds_list = []

            for pipeline, model, model_features in fold_models_for_one_config:
                # 데이터 변환 (fit에서와 유사한 로직)
                original_X_cols = X.columns.tolist()
                X_transformed_raw = pipeline.transform(X) # 새로운 X에 대해 파이프라인 변환
                X_trans = self._get_dataframe_with_features(X_transformed_raw, X.index, pipeline, original_X_cols)
                
                # 학습 시 사용된 피처(model_features)에 맞춰 컬럼 정렬
                X_trans_reindexed = X_trans.reindex(columns=model_features, fill_value=0)
                
                # 범주형 처리 (fit에서와 유사하게, 학습 시 범주를 따르도록)
                cat_cols = X_trans_reindexed.select_dtypes(include=["object", "category"]).columns.tolist()
                temp_X_trans_reindexed = X_trans_reindexed.copy()
                for col in cat_cols:
                    # 학습 시점에 생성된 범주 정보를 가져와야 하나, 여기서는 간단히 category로 변환
                    # 더 정확하려면 학습 시 사용한 categories를 저장하고 사용해야 함.
                    # 지금은 모델이 알아서 처리하거나, object 타입을 category로 변환하는 정도만 수행
                    if temp_X_trans_reindexed[col].dtype == 'object':
                         temp_X_trans_reindexed[col] = temp_X_trans_reindexed[col].astype('category')
                    # 만약 학습 시의 카테고리 정보를 저장해두었다면 그것을 사용:
                    # train_categories = self.saved_categories_for_model_pipe_fold[...][col]
                    # temp_X_trans_reindexed[col] = pd.Categorical(temp_X_trans_reindexed[col], categories=train_categories)


                proba = model.predict_proba(temp_X_trans_reindexed)
                current_config_preds_across_folds_list.append(proba)
            
            # 현재 (파이프라인, 모델정의) 조합에 대한 폴드 평균 예측값
            # np.array로 변환 후 평균 계산
            current_config_preds_across_folds_arr = np.array(current_config_preds_across_folds_list)
            averaged_preds = np.mean(current_config_preds_across_folds_arr, axis=0)

            num_proba_cols_model = averaged_preds.shape[1] if averaged_preds.ndim > 1 else 1
            
            if num_proba_cols_model == 1 or len(self.classes_) == 2 : # 이진 분류의 양성 클래스 확률
                 meta_X_test[:, current_meta_col_idx : current_meta_col_idx + 1] = averaged_preds.reshape(-1,1)
                 current_meta_col_idx += 1
            else: # 다중 클래스
                 meta_X_test[:, current_meta_col_idx : current_meta_col_idx + num_proba_cols_model] = averaged_preds
                 current_meta_col_idx += num_proba_cols_model

        final_proba = self.meta_model.predict_proba(meta_X_test)
        return final_proba

    def predict(self, X):
        final_proba = self.predict_proba(X)
        # 이진 분류의 경우 argmax가 0 또는 1을 반환
        # 다중 클래스의 경우 가장 확률 높은 클래스 인덱스 반환
        if len(self.classes_) == 2 and final_proba.shape[1] == 1: # 메타모델이 이진분류 확률 하나만 출력한 경우
            # 보통 predict_proba는 각 클래스 확률을 다 주지만, 일부 meta_model은 다를 수 있음
            # sklearn 표준은 predict_proba가 (n_samples, n_classes)를 반환
            # 만약 (n_samples, 1)이면, 0.5 기준으로 판단하거나, meta_model의 predict를 사용
            # 여기서는 meta_model.predict_proba가 표준을 따른다고 가정
             predictions_encoded = (final_proba[:, 1] >= 0.5).astype(int) if final_proba.shape[1] == 2 else (final_proba >= 0.5).astype(int).flatten()

        else: # 다중 클래스 또는 이진 분류 (n_samples, 2) 출력
            predictions_encoded = np.argmax(final_proba, axis=1)
        
        return self.label_encoder_.inverse_transform(predictions_encoded)

    # MAP@3 점수 계산은 사용자의 특정 요구사항에 따라 구현된 것으로 보이며,
    # 이진 분류에서는 TOP3의 의미가 모호할 수 있으므로, 다중 클래스 문제에 더 적합합니다.
    # 코드는 그대로 유지하되, 적용 문맥을 고려해야 합니다.
    def score_map3(self, X, y_true):
        proba = self.predict_proba(X) # (n_samples, n_classes)
        
        # 클래스가 3개 미만인 경우 MAP@3의 의미가 달라지거나 오류 발생 가능
        # 예: 클래스가 2개인데 top3를 뽑으면 모든 클래스가 선택됨
        top_k = min(3, len(self.classes_)) 
        
        # 각 샘플에 대해 확률이 높은 순으로 클래스 인덱스 정렬
        # np.argsort는 오름차순 정렬이므로, 음수로 만들어 내림차순 효과
        top_k_indices = np.argsort(-proba, axis=1)[:, :top_k]
        
        y_true_encoded = self.label_encoder_.transform(y_true)
        
        score = 0.0
        for i in range(len(y_true_encoded)):
            for j in range(top_k): # top_k 까지만 확인
                if top_k_indices[i, j] == y_true_encoded[i]:
                    score += 1.0 / (j + 1)
                    break # 해당 샘플에 대한 점수 계산 중단 (첫 매칭만 인정)
        return score / len(y_true_encoded)

In [None]:
train_X = train.drop(columns=['id', CFG.target])
train_y = train[CFG.target]
test_X = test.drop(columns=['id'])

In [None]:
pipeline1 = build_domain_feature_pipeline()
pipeline2 = build_conditional_feature_pipeline()
pipeline3 = build_interaction_feature_pipeline()
pipelines = [pipeline1, pipeline2, pipeline3]

In [None]:
model_defs = [
    (CatBoostClassifier, {
        'iterations': 4000,
        'learning_rate': 0.02,
        'depth': 12,
        'loss_function': 'MultiClass',
        'l2_leaf_reg': 3,
        'random_seed': CFG.seed,
        'eval_metric': 'MultiClass',
        'early_stopping_rounds': 200,
        'verbose': 0,
        'task_type': 'GPU',
        'bootstrap_type': 'Poisson',
        'grow_policy': 'Depthwise',
        'subsample': 0.8
    }),
    (XGBClassifier, {
        'max_depth': 12,
        'colsample_bytree': 0.5,
        'subsample': 0.9,
        'n_estimators': 4000,
        'learning_rate': 0.02,
        'gamma': 0.01,
        'reg_alpha': 2.0,
        'reg_lambda': 1.5,
        'max_delta_step': 2,
        'eval_metric': 'mlogloss',
        'random_state': CFG.seed,
        'tree_method': 'hist',
        'device': 'cuda',
    }),
    (LGBMClassifier, {
        'n_estimators': 4000,
        'learning_rate': 0.03,
        'max_depth': 12,
        'colsample_bytree': 0.7,
        'subsample': 0.9,
        'random_state': CFG.seed,
        'objective': 'multiclass',
        'num_class': len(target_unique_list),
        'verbose': -1,
        'num_leaves': 127,
        'reg_lambda': 0.1,
        'reg_alpha': 0.1
    })
]

In [None]:
ensemble = HybridFullStackingClassifier(
    pipelines=pipelines,
    model_defs=model_defs,
    n_splits = CFG.folds, 
    random_state = CFG.seed,
    meta_model=LogisticRegression(),
    n_bins=10,
    verbose=True
)

ensemble.fit(train_X, train_y)


Training Pipeline 1 + Model CatBoostClassifier
  Fold 1/5...
  Fold 2/5...
  Fold 3/5...
  Fold 4/5...
  Fold 5/5...

Training Pipeline 1 + Model XGBClassifier
  Fold 1/5...
  Fold 2/5...
  Fold 3/5...
  Fold 4/5...
  Fold 5/5...

Training Pipeline 1 + Model LGBMClassifier
  Fold 1/5...
  Fold 2/5...
  Fold 3/5...
  Fold 4/5...
  Fold 5/5...

Training Pipeline 2 + Model CatBoostClassifier
  Fold 1/5...
  Fold 2/5...
  Fold 3/5...
  Fold 4/5...
  Fold 5/5...

Training Pipeline 2 + Model XGBClassifier
  Fold 1/5...
  Fold 2/5...
  Fold 3/5...
  Fold 4/5...
  Fold 5/5...

Training Pipeline 2 + Model LGBMClassifier
  Fold 1/5...
  Fold 2/5...
  Fold 3/5...
  Fold 4/5...
  Fold 5/5...

Training Pipeline 3 + Model CatBoostClassifier
  Fold 1/5...
  Fold 2/5...
  Fold 3/5...
  Fold 4/5...
  Fold 5/5...

Training Pipeline 3 + Model XGBClassifier
  Fold 1/5...
  Fold 2/5...
  Fold 3/5...
  Fold 4/5...
  Fold 5/5...

Training Pipeline 3 + Model LGBMClassifier
  Fold 1/5...
  Fold 2/5...
  Fold 

In [None]:
test_probs = ensemble.predict_proba(test_X)

In [None]:
def predict_top_k_labels_string(ensemble_model, X_test_data, k=3):
    final_proba = ensemble_model.predict_proba(X_test_data) # (n_samples, n_classes)
    
    num_classes = final_proba.shape[1]
    actual_k = min(k, num_classes)

    top_k_indices = np.argsort(-final_proba, axis=1)[:, :actual_k]
    
    flat_top_k_indices = top_k_indices.ravel()
    flat_top_k_labels = ensemble_model.label_encoder_.inverse_transform(flat_top_k_indices)
    top_k_labels_array = flat_top_k_labels.reshape(top_k_indices.shape)
    
    top_k_strings = [" ".join(labels_for_sample) for labels_for_sample in top_k_labels_array]
    return np.array(top_k_strings)

test_predictions_top3_strings = predict_top_k_labels_string(ensemble, test_X, k=3)

try:
    test_ids = test_X.index
except AttributeError:
    test_ids = np.arange(len(test_predictions_top3_strings))

In [None]:
submission_df_top3 = pd.DataFrame({
    'id': test['id'],
    'Fertilizer Name': test_predictions_top3_strings
})

submission_df_top3.to_csv('submission_top3_strings.csv', index=False)