In [None]:
# Standard imports
import uuid
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union, Any
from dataclasses import dataclass
from abc import ABC, abstractmethod
from enum import Enum

import numpy as np
import pandas as pd
import lightgbm as lgb
from sklearn.model_selection import StratifiedKFold
from sklearn.preprocessing import TargetEncoder
from sklearn.metrics import roc_auc_score

## Configuration & Base Classes

In [None]:
# ============== DatasetConfig ==============
@dataclass
class DatasetConfig:
    """Column-level description of a dataset, shared by preprocessors and pipelines."""

    # Feature columns; ModelPipeline.predict selects these, and the frequency
    # encoder checks its generated column names against this list.
    training_col_names: List[str]
    # Name of the label column (dropped from X and used as y in ModelPipeline.fit).
    target_col_name: str
    # Columns treated as numerical (not read by any class in this notebook).
    numerical_col_names: List[str]
    # Columns the categorical encoders operate on.
    categorical_col_names: List[str]
    # Optional free-form label for the dataset.
    name: Optional[str] = None


# ============== Base Preprocessor ==============
class BasePreprocessor(ABC):
    """Abstract fit/transform preprocessing step configured by a ``DatasetConfig``.

    Subclasses implement ``fit`` and ``transform`` and are expected to set
    ``self._fitted`` once they have learned their state.
    """

    name = "base-preprocessor"

    def __init__(self, config: DatasetConfig):
        self.config = config
        self._fitted = False

    @abstractmethod
    def fit(self, data: pd.DataFrame) -> "BasePreprocessor":
        """Learn the step's state from ``data`` and return ``self``."""

    @abstractmethod
    def transform(self, data: pd.DataFrame) -> pd.DataFrame:
        """Return the transformed version of ``data``."""

    def fit_transform(self, data: pd.DataFrame) -> pd.DataFrame:
        """Fit on ``data`` and then transform that same frame."""
        fitted_step = self.fit(data)
        return fitted_step.transform(data)

    @property
    def fitted(self) -> bool:
        """Current value of the ``_fitted`` flag (``False`` right after construction)."""
        return self._fitted


# ============== Base Model ==============
# Array-like container accepted for features, targets and weights.
T = Union[np.ndarray, pd.DataFrame]


class BaseKtoolsModel(ABC):
    """Abstract model wrapper exposing a uniform ``fit`` / ``predict`` interface.

    ``model`` holds the underlying estimator (``None`` until a subclass sets it)
    and ``_fitted`` records whether training has happened.
    """

    def __init__(self) -> None:
        self.model = None
        self._fitted = False

    @abstractmethod
    def fit(
        self,
        X: T,
        y: T,
        validation_set: Optional[Tuple[T, T]] = None,
        weights: Optional[T] = None,
        val_weights: Optional[T] = None,
    ) -> "BaseKtoolsModel":
        """Train on ``X``/``y``, optionally using a ``(X_val, y_val)`` pair and sample weights."""

    @abstractmethod
    def predict(self, X: T) -> np.ndarray:
        """Return predictions for ``X`` as a NumPy array."""

    @property
    def fitted(self) -> bool:
        """Current value of the ``_fitted`` flag."""
        return self._fitted

## Preprocessing Classes

In [None]:
# ============== Categorical Frequency Encoder ==============
class CategoricalFrequencyEncoder(BasePreprocessor):
    """Add a ``<column>_frequency_encoding`` feature for every categorical column.

    The encoding is the relative frequency of each category in the data passed
    to ``fit``. Categories are compared by their string representation in both
    ``fit`` and ``transform`` so that integer, boolean and pandas-categorical
    columns are matched the same way as plain string columns.

    Args:
        config: Dataset description; ``categorical_col_names`` are encoded.
        encode_missing_value: Value used for missing values and for categories
            that were not observed during ``fit``.
    """

    freq_suffix = "_frequency_encoding"

    def __init__(self, config: DatasetConfig, encode_missing_value: int = 0):
        super().__init__(config)
        # column name -> {str(category): relative frequency in the fit data}
        self.train_freq_mappings: Dict[str, Dict[str, float]] = {}
        self.encode_missing_value = encode_missing_value

    def fit(self, data: pd.DataFrame) -> "CategoricalFrequencyEncoder":
        """Learn per-category relative frequencies from ``data``.

        Raises:
            ValueError: If a generated column name is already listed in
                ``config.training_col_names``.
        """
        for column in self.config.categorical_col_names:
            new_col_name = column + self.freq_suffix
            if new_col_name in self.config.training_col_names:
                raise ValueError("Frequency encoded column already exists")

            # Keys must be strings because transform() looks values up after
            # ``astype(str)``; keying by the raw values made every non-string
            # category miss the map and fall back to ``encode_missing_value``.
            # Missing values are dropped first so they are not learned as the
            # literal category "nan".
            freq_map = (
                data[column]
                .dropna()
                .astype(str)
                .value_counts(normalize=True)
                .to_dict()
            )
            self.train_freq_mappings[column] = freq_map
        self._fitted = True
        return self

    def transform(self, data: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of ``data`` with one frequency column per categorical column.

        Raises:
            KeyError: If a categorical column has no learned mapping (``fit``
                was not called for it).
        """
        encoded = data.copy()
        for column in self.config.categorical_col_names:
            new_col_name = column + self.freq_suffix
            freq_map = self.train_freq_mappings[column]
            encoded[new_col_name] = (
                encoded[column]
                .astype(str)
                .map(freq_map)
                # Missing and unseen categories have no entry in the map.
                .fillna(self.encode_missing_value)
                .astype(float)
            )
        return encoded


# ============== Categorical Target Encoder ==============
class CategoricalTargetEncoder(BasePreprocessor):
    """Replace the configured categorical columns with target-encoded float32 values.

    Wraps ``sklearn.preprocessing.TargetEncoder``; the constructor arguments
    are forwarded to it unchanged.
    """

    def __init__(
        self,
        config: DatasetConfig,
        random_state: int = 42,
        cv: int = 5,
        smooth: int = 15,
        shuffle: bool = True,
    ):
        super().__init__(config)
        encoder_kwargs = dict(
            random_state=random_state, cv=cv, smooth=smooth, shuffle=shuffle
        )
        self.target_encoder = TargetEncoder(**encoder_kwargs)

    def fit(self, data: pd.DataFrame) -> "CategoricalTargetEncoder":
        """Fit the wrapped encoder on the categorical columns against the target column."""
        features = data[self.config.categorical_col_names]
        target = data[self.config.target_col_name]
        self.target_encoder.fit(features, target)
        self._fitted = True
        return self

    def transform(self, data: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of ``data`` whose categorical columns hold the encoder's output."""
        cat_cols = self.config.categorical_col_names
        result = data.copy()
        encoded = self.target_encoder.transform(result[cat_cols])
        result[cat_cols] = encoded.astype("float32")
        return result

    def fit_transform(self, data: pd.DataFrame) -> pd.DataFrame:
        """Fit and encode in one step via the wrapped encoder's own ``fit_transform``."""
        cat_cols = self.config.categorical_col_names
        result = data.copy()
        encoded = self.target_encoder.fit_transform(
            result[cat_cols], result[self.config.target_col_name]
        )
        result[cat_cols] = encoded.astype("float32")
        self._fitted = True
        return result


# ============== Preprocessing Pipeline ==============
class PreprocessingPipeline:
    """Apply a list of preprocessors one after another, in list order."""

    def __init__(self, preprocessors: List[BasePreprocessor]) -> None:
        self.preprocessors = preprocessors

    def train_pipe(self, data: pd.DataFrame) -> pd.DataFrame:
        """Fit every step on the output of the previous one and return the final frame."""
        transformed = data
        for step in self.preprocessors:
            transformed = step.fit_transform(transformed)
        return transformed

    def inference_pipe(self, data: pd.DataFrame) -> pd.DataFrame:
        """Run every step's ``transform`` in order without refitting."""
        transformed = data
        for step in self.preprocessors:
            transformed = step.transform(transformed)
        return transformed

## Model Classes

In [None]:
# ============== Helper Function ==============
def infer_task(y: Union[np.ndarray, pd.Series]) -> str:
    """
    Infer the learning task from the target values.

    Args:
        y: Target values. Any array-like is accepted (NumPy array, pandas
            Series or DataFrame, list); it is flattened to one dimension.

    Returns:
        ``"regression"`` if any value has a fractional part,
        ``"binary_classification"`` for exactly two distinct values, or
        ``"multiclass_classification"`` for more than two.

    Raises:
        ValueError: If fewer than two distinct values are present.
    """
    # np.asarray + ravel handles Series, DataFrames and lists; the previous
    # ``.flatten()`` call only existed on NumPy arrays.
    values = np.asarray(y).ravel()

    nuniques = np.unique(values).shape[0]

    kind = values.dtype.kind
    if kind in "biu":
        # Boolean and integer targets can never carry a fractional part.
        has_floats = False
    elif kind in "OUS":
        # Object/string targets: treat as numeric when convertible, otherwise
        # as class labels (``%`` is not defined for them).
        try:
            has_floats = bool(np.any(values.astype(float) % 1 != 0))
        except (TypeError, ValueError):
            has_floats = False
    else:
        has_floats = bool(np.any(values % 1 != 0))

    if has_floats:
        print("Target contains float values. Inferring regression task.")
        return "regression"
    elif nuniques == 2:
        print("Target contains two unique values. Inferring binary classification task.")
        return "binary_classification"
    elif nuniques > 2:
        print("Target contains more than two unique values. Inferring multiclass classification task.")
        return "multiclass_classification"

    raise ValueError(
        "Unable to infer task type from target values. Is there only one target value?"
    )


# ============== Default Objectives ==============
class DefaultObjective(Enum):
    """Default LightGBM ``objective`` value for each task.

    Member names match the strings returned by ``infer_task``; ``LGBMModel.fit``
    looks a member up by that name and uses its value as the objective.
    """

    regression = "regression"
    binary_classification = "binary"
    multiclass_classification = "multiclass"


# ============== LightGBM Model ==============
class LGBMModel(BaseKtoolsModel):
    """LightGBM booster trained through ``lgb.train`` behind the ``BaseKtoolsModel`` API.

    Args:
        num_boost_round: Number of boosting rounds passed to ``lgb.train``.
        early_stopping_rounds: Early-stopping patience, applied only when a
            validation set is given to ``fit``; ``None`` disables it.
        random_state: Seed placed in the LightGBM parameters.
        verbose: LightGBM verbosity placed in the parameters.
        n_jobs: Thread count placed in the LightGBM parameters.
        callbacks: Optional LightGBM callbacks; the list is copied.
        **lgb_param_grid: Extra LightGBM parameters. They override the three
            defaults above when the same key is given.
    """

    def __init__(
        self,
        num_boost_round: int = 100,
        early_stopping_rounds: Union[int, None] = 20,
        random_state: int = 129,
        verbose: int = -1,
        n_jobs: int = 1,
        callbacks: Optional[List[Any]] = None,
        **lgb_param_grid,
    ) -> None:
        super().__init__()
        self._num_boost_round = num_boost_round
        self._verbose = verbose
        self._n_jobs = n_jobs
        # A ``[]`` default would be one list object shared by every instance;
        # build a fresh list per instance instead.
        self._callbacks = list(callbacks) if callbacks is not None else []
        self.early_stopping_rounds = early_stopping_rounds

        self._lgb_param_grid = {
            "verbose": verbose,
            "random_state": random_state,
            "n_jobs": n_jobs,
            **lgb_param_grid,
        }

    def fit(
        self,
        X: T,
        y: T,
        validation_set: Optional[Tuple[T, T]] = None,
        weights: Optional[T] = None,
        val_weights: Optional[T] = None
    ) -> "LGBMModel":
        """Train the booster and return ``self``.

        Args:
            X: Training features.
            y: Training target. When no ``objective`` was configured, the task
                is inferred from it with ``infer_task``.
            validation_set: Optional ``(X_val, y_val)`` pair. When given it is
                evaluated next to the training set and enables early stopping.
            weights: Optional per-row weights for the training set.
            val_weights: Optional per-row weights for the validation set.
        """
        # Work on a per-call copy so values derived from this call's data
        # (objective, num_class, early stopping) do not leak into later fits.
        params = dict(self._lgb_param_grid)
        if "objective" not in params:
            task_id = infer_task(y)
            params["objective"] = DefaultObjective[task_id].value
            if task_id == "multiclass_classification":
                params["num_class"] = np.unique(y).shape[0]

        train_data = lgb.Dataset(X, label=y, weight=weights)
        eval_sets = [train_data]
        eval_names = ["train"]
        if validation_set is not None:
            X_val, y_val = validation_set
            val_data = lgb.Dataset(X_val, label=y_val, reference=train_data, weight=val_weights)
            eval_sets.append(val_data)
            eval_names.append("valid")
            if self.early_stopping_rounds is not None:
                params["early_stopping_rounds"] = self.early_stopping_rounds

        self.model = lgb.train(
            params=params,
            train_set=train_data,
            num_boost_round=self._num_boost_round,
            valid_sets=eval_sets,
            valid_names=eval_names,
            callbacks=self._callbacks,
        )
        self._fitted = True
        return self

    def predict(self, X: T) -> np.ndarray:
        """Return the trained booster's predictions for ``X``."""
        y_pred = self.model.predict(X)
        return y_pred


# ============== Model Pipeline ==============
class ModelPipeline:
    """Chain a preprocessing pipeline and a model behind a single fit/predict API.

    Args:
        model: Model exposing ``fit(X, y, validation_set, weights)`` and
            ``predict(X)``.
        config: Dataset description; supplies the target column name and the
            fallback feature list.
        preprocessor: Optional preprocessing pipeline; an empty
            ``PreprocessingPipeline`` is used when omitted.
    """

    def __init__(
        self,
        model: "BaseKtoolsModel",
        config: "DatasetConfig",
        preprocessor: Optional["PreprocessingPipeline"] = None,
    ) -> None:
        self.model = model
        self.config = config
        self.preprocessor = (
            preprocessor if preprocessor is not None else PreprocessingPipeline([])
        )
        # Feature columns the model was trained on; set by fit().
        self._feature_names: Optional[List[str]] = None

    def fit(
        self,
        train_data: pd.DataFrame,
        validation_data: Optional[pd.DataFrame] = None,
        weights: Optional[Union[pd.Series, np.ndarray]] = None,
    ) -> "ModelPipeline":
        """Fit the preprocessors and the model, then return ``self``.

        Args:
            train_data: Training frame including the target column.
            validation_data: Optional validation frame including the target
                column; it is transformed with the already-fitted preprocessors.
            weights: Optional per-row weights for the training rows.
        """
        target = self.config.target_col_name

        train_data = self.preprocessor.train_pipe(train_data)
        X_train = train_data.drop(columns=[target])
        y_train = train_data[target]
        feature_names = X_train.columns.tolist()

        validation_set = None
        if validation_data is not None:
            validation_data = self.preprocessor.inference_pipe(validation_data)
            # Same columns, same order as the training matrix.
            validation_set = (validation_data[feature_names], validation_data[target])

        self.model.fit(
            X=X_train, y=y_train, validation_set=validation_set, weights=weights
        )
        self._feature_names = feature_names
        return self

    def predict(self, data: pd.DataFrame) -> np.ndarray:
        """Preprocess ``data`` and return the model's predictions.

        The columns handed to the model are the ones it was trained on,
        including any columns added by the preprocessors. Selecting only
        ``config.training_col_names`` would drop those added columns and feed
        the model a different feature set than it saw in ``fit``. Before
        ``fit`` has run, ``config.training_col_names`` is used as a fallback.
        """
        data = self.preprocessor.inference_pipe(data)
        feature_names = getattr(self, "_feature_names", None)
        if feature_names is None:
            feature_names = self.config.training_col_names
        X_test = data[feature_names]
        return self.model.predict(X_test)

## Data Loading & Preprocessing

This section implements the data preparation logic from the test file:
1. Load original, training, and test data
2. Assign data source labels (0 = validation part of train, also used as a placeholder for the test set; 1 = train part; 2 = original)
3. Create aggregated features from original data

In [None]:
# ============== Configuration ==============
# Directory containing original.csv, train.csv and test.csv (read by the load cell).
# For Kaggle, update these paths to match the competition data location
# DATA_PATH = Path("/kaggle/input/playground-series-s5e2/")
DATA_PATH = Path("./data/diabetes_prediction/")  # Local path

# Name of the label column.
TARGET = "diagnosed_diabetes"
# Positional cut-off: the first SPLIT_ID rows of train.csv get data=1, the rest data=0.
SPLIT_ID = 678260  # Index to split training data into train/validation groups

In [None]:
# ============== Load Data ==============
# train.csv and test.csv use their first column as the index; original.csv does not.
original_data = pd.read_csv(DATA_PATH / "original.csv")
training_data = pd.read_csv(DATA_PATH / "train.csv", index_col=0)
# Test rows get the source label data=0, the same value later given to the
# validation portion of the training data.
test_data = pd.read_csv(DATA_PATH / "test.csv", index_col=0).assign(data=0)

print(f"Original data shape: {original_data.shape}")
print(f"Training data shape: {training_data.shape}")
print(f"Test data shape: {test_data.shape}")

In [None]:
# ============== Assign Data Source Labels ==============
# Source label stored in the `data` column:
#   0 -> rows of train.csv from position SPLIT_ID onwards (validation portion)
#   1 -> the first SPLIT_ID rows of train.csv (training portion)
#   2 -> rows of the external original dataset

original_data = original_data.assign(data=2)

# Positional mask: True for the first SPLIT_ID rows, False afterwards.
in_train_portion = np.arange(training_data.shape[0]) < SPLIT_ID
training_data = training_data.assign(data=in_train_portion.astype("int64"))

print(f"Training data source distribution:")
print(training_data['data'].value_counts().sort_index())

In [None]:
# ============== Create Aggregated Features from Original Data ==============
# Stack training, original and test rows (in that order) so the features are
# computed once for all three; the next cell splits them back by position.
train_orig_test = pd.concat(
    [training_data, original_data[training_data.columns], test_data],
    axis=0,
    ignore_index=True
)
expected_rows = train_orig_test.shape[0]

training_cols = training_data.columns.drop(["data", TARGET]).tolist()
orig_target_mean = original_data[TARGET].mean()

print(f"Creating aggregated features from {len(training_cols)} columns...")

# For every feature, attach the target mean and the row count of its value in
# the original data. Both statistics come from one groupby and one merge per
# feature instead of a separate pass for each statistic.
for feature in training_cols:
    mean_col = f'{feature}_org_mean'
    count_col = f'{feature}_org_count'
    feature_stats = (
        original_data.groupby(feature)[TARGET]
        .agg(["mean", "count"])
        .rename(columns={"mean": mean_col, "count": count_col})
        .reset_index()
    )

    train_orig_test = train_orig_test.merge(feature_stats, on=feature, how='left')
    # Values absent from the original data: fall back to the global target
    # mean and a count of zero.
    train_orig_test[mean_col] = train_orig_test[mean_col].fillna(orig_target_mean)
    train_orig_test[count_col] = train_orig_test[count_col].fillna(0)

# The positional split in the next cell relies on the merges keeping every row.
assert train_orig_test.shape[0] == expected_rows, "merge changed the number of rows"

print(f"Combined data shape after feature engineering: {train_orig_test.shape}")

In [None]:
# ============== Split Back into Train, Original, Test ==============
# The combined frame was stacked as [training, original, test], so the three
# parts are recovered purely by row position.
# NOTE: `test_data` is rebound here, so the feature-engineering cell above
# would see the already-engineered test frame if it were re-run afterwards.
len_train = len(training_data)
len_orig = len(original_data)
orig_end = len_train + len_orig

train_data = train_orig_test.iloc[:len_train].copy()
orig_data = train_orig_test.iloc[len_train:orig_end].copy()
test_data = train_orig_test.iloc[orig_end:].copy().drop(columns=[TARGET])

print(f"Final train data shape: {train_data.shape}")
print(f"Final original data shape: {orig_data.shape}")
print(f"Final test data shape: {test_data.shape}")

## Dataset Configuration Setup

In [None]:
# ============== Define Feature Categories ==============
all_features = train_data.columns.drop(TARGET).tolist()

# Categorical features: every object/bool column plus three history flags that
# are listed explicitly.
dtype_categorical = (
    train_data.drop(columns=TARGET)
    .select_dtypes(include=['object', 'bool'])
    .columns.tolist()
)
forced_categorical = ['family_history_diabetes', 'hypertension_history', 'cardiovascular_history']

# Remove duplicates while keeping first-seen order. A set would also remove
# duplicates, but its iteration order for strings can change between kernel
# sessions, which would change the column order seen by the encoders and model.
categorical_features = list(dict.fromkeys(dtype_categorical + forced_categorical))

categorical_lookup = set(categorical_features)
numerical_features = [col for col in all_features if col not in categorical_lookup]

print(f"Total features: {len(all_features)}")
print(f"Categorical features: {len(categorical_features)}")
print(f"Numerical features: {len(numerical_features)}")

In [None]:
# ============== Create Dataset Config ==============
config = DatasetConfig(
    target_col_name=TARGET,
    training_col_names=all_features,
    numerical_col_names=numerical_features,
    categorical_col_names=categorical_features,
)

# Cast the categorical columns of each split to pandas' category dtype
# (each frame is converted on its own).
for split_frame in (train_data, orig_data, test_data):
    split_frame[categorical_features] = split_frame[categorical_features].astype('category')

## Cross-Validation Experiment

Stratified K-Fold is used, stratifying on the combination of target and data source (from cell 8 of the original notebook).

In [None]:
# ============== Stratification Categories ==============
# Combined stratification on target + data source
# Each row gets a string label "<target>_<data>"; the CV cell passes these
# labels to StratifiedKFold.split as the stratification variable.
categories_of_interest = (
    train_data[TARGET].astype(str) + "_" + train_data["data"].astype(str)
)

print("Stratification category distribution:")
print(categories_of_interest.value_counts())

In [None]:
# ============== CV Training Loop ==============
# For each stratified fold: train on the fold's training rows plus all original
# rows, early-stop and score on the data=0 rows of the held-out fold, and
# collect out-of-fold and fold-averaged test predictions.
N_SPLITS = 5
RANDOM_STATE = 42

# Initialize arrays for predictions
# train_oof_preds is fully overwritten because every row is in exactly one validation fold.
train_oof_preds = np.empty(train_data.shape[0])
# Despite the name, this accumulates the mean of the per-fold test predictions.
test_oof_preds = np.zeros(test_data.shape[0])

# Sample weights based on data source
# data=2 (original): weight 8
# data=0 (val portion): weight 16
# data=1 (train portion): weight 1
WEIGHT_MAP = {2: 8, 0: 16, 1: 1}

# These preprocessor instances are shared by all folds; ModelPipeline.fit
# refits them on each fold's training data through train_pipe.
preprocessors = [
    CategoricalFrequencyEncoder(config=config),
    CategoricalTargetEncoder(config=config),
]

kfold = StratifiedKFold(n_splits=N_SPLITS, shuffle=True, random_state=RANDOM_STATE)
mean_score = 0.0
fold_scores = []

for fold_idx, (train_idx, val_idx) in enumerate(kfold.split(train_data, categories_of_interest)):
    print(f"\n{'='*50}")
    print(f"Fold {fold_idx + 1}/{N_SPLITS}")
    print(f"{'='*50}")
    
    # Split data
    train_fold = train_data.iloc[train_idx].copy()
    val_fold = train_data.iloc[val_idx].copy()
    
    # Subset validation fold for scoring (only data=0, the "real" validation portion)
    subsetval_fold = val_fold[val_fold["data"] == 0]
    
    # Add original data to training fold
    train_fold = pd.concat([train_fold, orig_data], axis=0, ignore_index=True)
    
    print(f"Train fold size: {train_fold.shape[0]} (including {orig_data.shape[0]} original samples)")
    print(f"Validation fold size: {val_fold.shape[0]} (subset for scoring: {subsetval_fold.shape[0]})")
    
    # Calculate sample weights
    # One weight per training row, looked up from its data-source label.
    weights = train_fold["data"].map(WEIGHT_MAP).values
    
    # Use ModelPipeline with PreprocessingPipeline
    # A new model is created for every fold.
    pipe = ModelPipeline(
        model=LGBMModel(num_boost_round=1000, early_stopping_rounds=50, verbose=-1),
        config=config,
        preprocessor=PreprocessingPipeline(preprocessors=preprocessors),
    )
    
    # NOTE(review): subsetval_fold drives early stopping here and is also the
    # set the fold score is computed on below, so the score may be optimistic.
    pipe.fit(train_fold, validation_data=subsetval_fold, weights=weights)
    
    # Make predictions
    y_pred = pipe.predict(subsetval_fold)
    test_pred = pipe.predict(test_data)
    oof_pred = pipe.predict(val_fold)
    
    # Calculate fold score on SUBSET validation (data=0 only)
    fold_score = roc_auc_score(subsetval_fold[TARGET], y_pred)
    fold_scores.append(fold_score)
    
    # Store OOF predictions for full validation fold
    train_oof_preds[val_idx] = oof_pred
    # Running means over folds.
    test_oof_preds += test_pred / N_SPLITS
    mean_score += fold_score / N_SPLITS
    
    print(f"Fold {fold_idx + 1} ROC AUC Score: {fold_score:.6f}")

print(f"\n{'='*50}")
print(f"CV Results")
print(f"{'='*50}")
print(f"Individual fold scores: {[f'{s:.6f}' for s in fold_scores]}")
print(f"Mean ROC AUC Score: {mean_score:.6f}")
print(f"Std ROC AUC Score: {np.std(fold_scores):.6f}")

## OOF Score & Submission

In [None]:
# ============== Overall OOF Score ==============
# Scored over every row of train_data (both data=0 and data=1), unlike the
# per-fold scores above, which use only the data=0 rows of each validation fold.
overall_oof_score = roc_auc_score(train_data[TARGET], train_oof_preds)
print(f"Overall OOF ROC AUC Score: {overall_oof_score:.6f}")

In [None]:
# ============== Create Submission ==============
# The submission write is left commented out; uncomment it to produce
# submission.csv from the fold-averaged test predictions.
# For Kaggle submission
# sample_sub = pd.read_csv(DATA_PATH / "sample_submission.csv", index_col=0)
# sample_sub[TARGET] = test_oof_preds
# sample_sub.to_csv("submission.csv")
# print("Submission saved!")

# Preview predictions
# Summary statistics of the fold-averaged test predictions.
print(f"Test predictions stats:")
print(f"  Min: {test_oof_preds.min():.6f}")
print(f"  Max: {test_oof_preds.max():.6f}")
print(f"  Mean: {test_oof_preds.mean():.6f}")

In [None]:
# ============== Save OOF and Test Predictions ==============
# A fresh random identifier per run; the (commented-out) save code below uses it
# in the column and file names of the stored predictions.
# `uuid` is imported in the notebook's import cell.
guid = uuid.uuid4()
print(f"Experiment GUID: {guid}")

# Uncomment to save predictions
# save_path = DATA_PATH
# pd.DataFrame({f"{guid}": train_oof_preds}).to_csv(save_path / "oofs" / f"oof_preds_{guid}.csv")
# pd.DataFrame({f"{guid}": test_oof_preds}).to_csv(save_path / "test_preds" / f"test_preds_{guid}.csv")
# print(f"Predictions saved with GUID: {guid}")