# xgb

# In [None]:
import configparser
from typing import Optional

import numpy as np
import pandas as pd
import xgboost as xgb
import modin.pandas as md
import snowflake.snowpark.modin.plugin
from skopt import gp_minimize
from skopt.space import Real, Integer, Categorical
from skopt.utils import use_named_args

from xgb_helpers import (
    split_numeric_categorical,
    build_dmatrix,
    OrdinalEncoder,
    NumericImputerArbitrary,
    RichProgressBarCallback,
)

from snowflake.snowpark.context import get_active_session

# NumPy removed the deprecated ``np.int`` alias in 1.24. Restoring it here is
# presumably a compatibility shim for scikit-optimize releases that still
# reference the alias -- TODO confirm against the pinned skopt version.
np.int = int

# Use the Snowpark session provided by the hosting environment and select the
# schema that unqualified object names are resolved against.
session = get_active_session()
session.use_schema("ML_AUTOMATION")

CONFIG_FILE = "config.ini"
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed; an empty list means nothing was loaded. Fail
# here with a clear message instead of a later KeyError on a missing section.
if not config.read(CONFIG_FILE):
    raise FileNotFoundError(f"Configuration file '{CONFIG_FILE}' could not be read")


def _normalize_config_entry(value: Optional[str]) -> Optional[str]:
    if value is None:
        return None
    stripped = value.strip()
    if stripped.lower() in {"", "none"}:
        return None
    return stripped


def _parse_value(value: str):
    lower = value.strip().lower()
    if lower in {"true", "false"}:
        return lower == "true"
    try:
        return int(value)
    except ValueError:
        try:
            return float(value)
        except ValueError:
            return value


train_path = _normalize_config_entry(config["PATHS"].get("train"))
test_path = _normalize_config_entry(config["PATHS"].get("test"))

if not train_path or not test_path:
    raise ValueError("Both train and test Snowflake objects must be defined in config.ini")


def _split_config_list(raw: str) -> list:
    """Split a comma-separated config value into unique, trimmed, non-empty names.

    First-seen order is preserved. Duplicates are dropped because repeated
    names would select duplicate DataFrame columns, which ``reindex`` rejects.
    """
    return list(dict.fromkeys(item.strip() for item in raw.split(",") if item.strip()))


target_column = config["VARIABLES"].get("y_col", "")
identifier_str = config["VARIABLES"].get("identifier_cols", "")
identifier_cols = _split_config_list(identifier_str)
modelling_features = _split_config_list(config["VARIABLES"].get("modelling_features", ""))

# Validate everything that depends only on the config before pulling any data
# from Snowflake, so misconfiguration fails fast.
if not target_column:
    raise ValueError("A target column must be provided via [VARIABLES] y_col in config.ini")

if not modelling_features:
    raise ValueError(
        "No modelling features were provided via [VARIABLES] modelling_features in config.ini"
    )

df_train = md.read_snowflake(name_or_query=train_path)
df_test = md.read_snowflake(name_or_query=test_path)

# Convert to pandas early to avoid relying on private Modin APIs
df_train = df_train.to_pandas()
df_test = df_test.to_pandas()

if target_column not in df_train.columns:
    raise ValueError(f"Target column '{target_column}' is not present in the training dataset")

# Keep whichever identifier columns the test set actually has; they are carried
# through untouched and joined back onto the predictions at the end.
identifier_cols_in_test = [c for c in identifier_cols if c in df_test.columns]
df_test_identifiers = df_test.loc[:, identifier_cols_in_test].copy()

# Only features present in the training data are usable, and the target must
# never be used as a predictor.
valid_features = [
    feature
    for feature in modelling_features
    if feature in df_train.columns and feature != target_column
]

if not valid_features:
    raise ValueError("None of the configured modelling features are present in the training set")

y = df_train[target_column].copy()
X_train = df_train.loc[:, valid_features].copy()

# Give the test matrix exactly the training column layout: features absent
# from the test set are added as all-NaN columns, then columns are reordered.
existing_test_features = [feature for feature in valid_features if feature in df_test.columns]
X_test = df_test.loc[:, existing_test_features].copy()
missing_test_features = [feature for feature in valid_features if feature not in existing_test_features]
for column in missing_test_features:
    X_test[column] = np.nan
X_test = X_test.reindex(columns=valid_features)

num_feats, cat_feats = split_numeric_categorical(X_train, valid_features)

from sklearn.compose import ColumnTransformer

# Categorical features are encoded and numeric features imputed by the
# project helpers. The preprocessor is fitted on training data only and then
# applied unchanged to the test data.
preprocessor = ColumnTransformer(
    [
        ("cat", OrdinalEncoder(columns=cat_feats, method="freq"), cat_feats),
        ("num", NumericImputerArbitrary(), num_feats),
    ]
)

preprocessor.fit(X_train)

X_train_transformed = preprocessor.transform(X_train)
X_test_transformed = preprocessor.transform(X_test)

# ColumnTransformer concatenates its outputs in the order the transformers are
# listed ("cat" first, then "num"), so the labels must follow that same order.
# Labelling them num-then-cat would attach feature names to the wrong data.
# Assumes each transformer emits one output column per input column -- TODO
# confirm against the xgb_helpers implementations.
transformed_columns = list(cat_feats) + list(num_feats)

X_train_transformed_df = pd.DataFrame(
    X_train_transformed, columns=transformed_columns, index=X_train.index
)
X_test_transformed_df = pd.DataFrame(
    X_test_transformed, columns=transformed_columns, index=X_test.index
)

# Baseline XGBoost parameters come from the [PARAMS] section of the config;
# each raw string is coerced to bool/int/float where it parses as one.
params_section = config["PARAMS"]
default_params = {k: _parse_value(v) for k, v in params_section.items()}

print("default params loaded:")
for k, v in default_params.items():
    print(f"{k}: {v}")

# Dimensions explored by the Bayesian optimiser. The names are used as keyword
# arguments of the objective function.
# NOTE(review): "gpu_hist" needs a CUDA-enabled XGBoost build and is, as far
# as I know, deprecated in XGBoost 2.x -- confirm it is usable in the target
# runtime.
search_space = [
    Integer(4, 7, name="max_depth"),
    Real(0.1, 0.2, prior="log-uniform", name="eta"),
    Real(0.5, 1.0, prior="uniform", name="subsample"),
    Real(0.0, 10.0, prior="uniform", name="reg_alpha"),
    Real(0.0, 10.0, prior="uniform", name="reg_lambda"),
    Integer(500, 3000, name="n_estimators"),
    Real(0.0, 1.0, prior="uniform", name="colsample_bytree"),
    Categorical(["hist", "approx", "gpu_hist"], name="tree_method"),
]

# The leading newline is written as an escape sequence; a raw line break
# inside a single-quoted string literal is a syntax error.
print("\nsearch space:")
for p in search_space:
    print(p)


@use_named_args(search_space)
def objective(**params_opt):
    """Score one hyperparameter candidate with 3-fold cross-validated RMSE.

    Args:
        **params_opt: One value per dimension of ``search_space``, keyed by
            dimension name (supplied by ``use_named_args``).

    Returns:
        The lowest mean test RMSE observed across the boosting rounds that
        were run. Lower is better, which is what ``gp_minimize`` expects.
    """
    # ``n_estimators`` is not a native-API booster parameter: the number of
    # rounds is passed through ``num_boost_round``. Leaving it in the params
    # dict only makes XGBoost warn about an unused parameter on every fold.
    xgb_params = {k: v for k, v in default_params.items() if k != "n_estimators"}
    xgb_params.update(
        {
            name: params_opt[name]
            for name in (
                "max_depth",
                "eta",
                "subsample",
                "reg_alpha",
                "reg_lambda",
                "colsample_bytree",
                "tree_method",
            )
        }
    )

    dtrain_cv = build_dmatrix(X_train_transformed_df, y)
    cv_results = xgb.cv(
        params=xgb_params,
        dtrain=dtrain_cv,
        # The tuned n_estimators acts as the upper bound on boosting rounds;
        # early stopping may end the run sooner.
        num_boost_round=params_opt["n_estimators"],
        nfold=3,
        metrics="rmse",
        early_stopping_rounds=20,
        verbose_eval=False,
        seed=42,
    )
    return cv_results["test-rmse-mean"].min()


# Bayesian optimisation over the search space: 30 objective evaluations with a
# fixed random state.
result = gp_minimize(objective, search_space, n_calls=30, random_state=42)

print("best hyperparameters:")
best_param_names = [dim.name for dim in search_space]
for name, val in zip(best_param_names, result.x):
    print(f"{name}: {val}")

# Overlay every tuned value on the configured defaults. The objective applies
# the tuned values regardless of whether the config defines them, so the final
# parameter set must do the same. Otherwise a tuned value whose key is missing
# from [PARAMS] would be silently discarded and the final model would differ
# from the configuration that was scored.
trained_params = default_params.copy()
for dim, val in zip(search_space, result.x):
    # Cast Integer dimensions to a plain Python int.
    trained_params[dim.name] = int(val) if isinstance(dim, Integer) else val

# Upper bound on boosting rounds for the final cross-validation: the value in
# trained_params if present, else the configured default, else 1000.
n_estimators_for_cv = int(trained_params.get("n_estimators", default_params.get("n_estimators", 1000)))

# ``n_estimators`` is not a native-API booster parameter (rounds are passed as
# ``num_boost_round``). Passing it inside the params dict only triggers
# XGBoost's unused-parameter warning, so the booster gets a filtered copy.
booster_params = {k: v for k, v in trained_params.items() if k != "n_estimators"}

cv_dtrain = build_dmatrix(X_train_transformed_df, y)
cv_results = xgb.cv(
    params=booster_params,
    dtrain=cv_dtrain,
    num_boost_round=n_estimators_for_cv,
    nfold=3,
    metrics="rmse",
    early_stopping_rounds=20,
    verbose_eval=False,
    seed=42,
)
# The index of the lowest mean test RMSE is zero-based, so the number of
# rounds to train is that index plus one.
best_iteration = cv_results["test-rmse-mean"].idxmin()
final_boost_rounds = int(best_iteration) + 1

# Record the round count actually used alongside the other parameters.
trained_params["n_estimators"] = final_boost_rounds

dtrain = build_dmatrix(X_train_transformed_df, y)
rich_callback = RichProgressBarCallback(total_rounds=final_boost_rounds)

print("training...")
model = xgb.train(
    booster_params,
    dtrain,
    num_boost_round=final_boost_rounds,
    callbacks=[rich_callback],
)
print("train complete")

# Score the test rows; no label is passed for the test matrix.
dtest = build_dmatrix(X_test_transformed_df, None)
preds = model.predict(dtest)

# Attach the predictions to a fresh copy of the identifier columns.
results = df_test_identifiers.assign(prediction=preds)

# Persist locally as CSV; the path is configurable, with a fallback file name.
output_path = config["PATHS"].get("output_predictions_csv", "xgb_test_predictions.csv")
results.to_csv(output_path, index=False)

# Also write the predictions to a transient Snowflake table, creating it if
# needed.
output = session.write_pandas(
    results, "snowpark_jc", auto_create_table=True, table_type="transient"
)

