In [1]:
import os
import pandas as pd
from sklearn.preprocessing import LabelEncoder, OrdinalEncoder

In [2]:
# Path of the training CSV that is read with `pd.read_csv` further down.
train_filename = "binary_classification.csv"
# Directory that receives the fold files, the encoders and the model config.
output = "output"
# Optional test CSV; while it is `None` every test-set step is skipped.
test_filename = None
# "classification", "regression", or `None` to infer the problem from the target values.
task = None
# Name of the id column; `None` is replaced by "id" in the next cell.
idx = None
# Target column name(s); `None` is replaced by ["target"] in the next cell.
targets = ["income"]
# Feature columns; `None` means every column except the id, fold and target columns.
features = None
# Categorical feature columns; `None` means every object-dtype feature.
categorical_features = None
# When True, `get_params` selects the "gpu_hist" tree method instead of sampling one.
use_gpu = False
# Number of cross-validation folds (`n_splits` of the fold splitters).
num_folds = 5
# Random seed handed to the fold splitters and to the XGBoost model.
seed = 42
# Passed to `study.optimize` as `n_trials`.
num_trials = 100
# Passed to `study.optimize` as `timeout` (presumably seconds — confirm against Optuna).
time_limit = 360
# Stored in the model config; not read by any other cell visible here.
fast = False

In [3]:
# __post_init__
# Prepares `output`, `targets` and `idx` before any data is loaded.

# self.output
# self.targets
# self.idx

# Disabled guard that would refuse to run when the output directory exists:
# if os.path.exists(output):
#   raise Exception(
#       "Output directory already exists. Please specify some other directory.")

# Create the output directory; an existing one is reused without error.
os.makedirs(output, exist_ok=True)

# Fall back to the column name `target` when no targets were configured.
if targets is None:
  print("No target columns specified. Will default to `target`.")
  targets = ["target"]

# Fall back to the column name `id` when no id column was configured.
if idx is None:
  print("No id column specified. Will default to `id`.")
  idx = "id"

No id column specified. Will default to `id`.


In [4]:
import enum

class Task(enum.Enum):
    """High-level learning task: classification or regression."""

    classification = 0
    regression = 1

    @staticmethod
    def from_str(task_type: str):
        """Return the member whose name equals `task_type`.

        Raises:
            ValueError: if `task_type` is not the name of any member.
        """
        for member in Task:
            if task_type == member.name:
                return member
        raise ValueError("Invalid task type: {}".format(task_type))

    @staticmethod
    def list_str():
        """Return the member names in declaration order."""
        return [member.name for member in Task]

class Problem(enum.IntEnum):
    """Concrete problem types, from binary classification to multi-column regression."""

    binary_classification = 1
    multi_class_classification = 2
    multi_label_classification = 3
    single_column_regression = 4
    multi_column_regression = 5

    @staticmethod
    def from_str(label):
        """Return the member whose name equals `label`.

        Raises:
            NotImplementedError: if `label` is not the name of any member.
        """
        for member in Problem:
            if label == member.name:
                return member
        raise NotImplementedError


In [5]:
from typing import List, Optional
from pydantic import BaseModel

class ModelConfig(BaseModel):
    """Validated bundle of the settings collected by the preprocessing cells.

    An instance is built from the `model_config` dict and written to the
    output directory with `joblib.dump`.
    """

    # Path of the training CSV.
    train_filename: str
    # Path of the optional test CSV; None when no test set was supplied.
    test_filename: Optional[str] = None
    # Name of the id column.
    idx: str
    # Target column name(s).
    targets: List[str]
    # Problem type determined from the training data.
    problem: Problem
    # Directory holding the fold files, encoders and this config.
    output: str
    # Feature column names used for training.
    features: List[str]
    # Number of cross-validation folds.
    num_folds: int
    # Whether the hyper-parameter search should use the GPU tree method.
    use_gpu: bool
    # Random seed for the model.
    seed: int
    # Feature columns that were ordinal-encoded.
    categorical_features: List[str]
    # Number of Optuna trials to run.
    num_trials: int
    # Timeout handed to `study.optimize`; None means no limit is passed.
    time_limit: Optional[int] = None
    # Stored for later use; not read by the cells visible in this notebook.
    fast: bool

In [6]:
from sklearn.utils.multiclass import type_of_target
import numpy as np

# self.task
# self.targets
def _determine_problem(df: pd.DataFrame) -> Problem:
  """Work out which `Problem` the target column(s) of `df` describe.

  Reads the notebook-level settings `targets` (list of target column names)
  and `task` ("classification", "regression" or None).

  * `task is None`: the problem is inferred from the target values with
    scikit-learn's `type_of_target`.
  * `task == "classification"`: several targets mean multi-label; a single
    target is binary when it holds exactly two distinct values and
    multi-class otherwise.
  * `task == "regression"`: single- or multi-column regression depending on
    the number of targets.

  Raises:
    Exception: if the target type cannot be mapped to a problem, or if `task`
      is neither None, "classification" nor "regression".
  """
  values = df[targets].values

  if task is None:
    inferred_problems = {
      "continuous": Problem.single_column_regression,
      "continuous-multioutput": Problem.multi_column_regression,
      "binary": Problem.binary_classification,
      "multiclass": Problem.multi_class_classification,
      "multilabel-indicator": Problem.multi_label_classification,
    }
    target_type = type_of_target(values)
    if target_type not in inferred_problems:
      raise Exception("Unable to infer `problem_type`. Please provide `classification` or `regression`")
    return inferred_problems[target_type]

  if task == "classification":
    if len(targets) > 1:
      return Problem.multi_label_classification
    if len(np.unique(values)) == 2:
      return Problem.binary_classification
    # One target column with more than two classes is multi-class, not
    # multi-label (multi-label requires several target columns).
    return Problem.multi_class_classification

  if task == "regression":
    if len(targets) == 1:
      return Problem.single_column_regression
    return Problem.multi_column_regression

  raise Exception("Problem type not understood")


In [7]:
# self.idx
# what if `idx` is `None`?
def _inject_idxumn(df: pd.DataFrame) -> pd.DataFrame:
  """Make sure `df` carries the id column named by the notebook-level `idx`.

  When the column is missing it is added to `df` in place, holding the row
  positions 0..len(df)-1. The same DataFrame object is returned either way.
  """
  if idx in df.columns:
    return df

  df[idx] = np.arange(len(df))
  return df

In [8]:
# self.num_folds
# self.targets
# self.seed
from sklearn.model_selection import KFold, StratifiedKFold

def _create_folds(train_df: pd.DataFrame, problem: Problem) -> pd.DataFrame:
  """Assign every row of `train_df` to a validation fold.

  The fold number (0..num_folds-1) is written to a `kfold` column of
  `train_df`; an existing `kfold` column is overwritten. Uses the
  notebook-level settings `targets`, `num_folds` and `seed`.

  * binary / multi-class classification: stratified on the target values.
  * single-column regression: stratified on the target cut into equal-width
    bins (Sturges' rule, capped at 10 bins).
  * multi-column regression / multi-label classification: plain shuffled
    K-fold.  TODO: use `iterstrat` for the multi-label case.

  Returns:
    `train_df` with the `kfold` column added.

  Raises:
    Exception: if `problem` is none of the supported problem types.
  """
  if problem in (Problem.binary_classification, Problem.multi_class_classification):
    splitter = StratifiedKFold(n_splits=num_folds, shuffle=True, random_state=seed)
    stratify_on = train_df[targets].values
  elif problem == Problem.single_column_regression:
    num_bins = min(int(np.floor(1 + np.log2(len(train_df)))), 10)
    splitter = StratifiedKFold(n_splits=num_folds, shuffle=True, random_state=seed)
    # Stratify on the binned target; the previous code passed an undefined `y`
    # here and never used the bins it had just computed.
    stratify_on = pd.cut(train_df[targets].values.ravel(), bins=num_bins, labels=False)
  elif problem in (Problem.multi_column_regression, Problem.multi_label_classification):
    splitter = KFold(n_splits=num_folds, shuffle=True, random_state=seed)
    stratify_on = train_df[targets].values
  else:
    raise Exception("Problem type not supported")

  # The splitter yields row positions, so collect the assignment in a
  # positional array instead of indexing the frame by label.
  fold_of_row = np.full(len(train_df), -1, dtype=np.int64)
  for fold, (_, valid_indices) in enumerate(splitter.split(X=train_df, y=stratify_on)):
    fold_of_row[valid_indices] = fold

  train_df["kfold"] = fold_of_row
  return train_df

In [9]:
# self.train_filename
# self.test_filename

# START: _process_data

# Load the training data, determine the problem type and guarantee an id column.
train_df = pd.read_csv(train_filename)
# TODO: use `reduce_memory_usage` here

problem = _determine_problem(train_df)
train_df = _inject_idxumn(train_df)

if test_filename is not None:
  test_df = pd.read_csv(test_filename)
  # TODO: use `reduce_memory_usage` here
  # Pass `test_df` itself: handing over `train_df` here would silently replace
  # the freshly loaded test set with the training frame.
  test_df = _inject_idxumn(test_df)

In [10]:
# self.idx
# self.targets

# Add the `kfold` assignment, then list the columns that are never features.
train_df = _create_folds(train_df, problem)
ignore_columns = [idx, "kfold", *targets]

In [11]:
# self.features

if features is None:
  # Default: every column that is not an id, fold or target column.
  features = [column for column in train_df.columns if column not in ignore_columns]

In [12]:
# Classification targets are mapped to integer class ids; the fitted encoder
# is kept so it can be saved alongside the other artifacts.
if problem in (Problem.binary_classification, Problem.multi_class_classification):
  target_encoder = LabelEncoder()
  # Assumes a single target column, exactly as the earlier flatten-and-assign
  # code did — TODO confirm no multi-target input reaches this branch.
  target_column = targets[0]
  # Replace the column instead of writing through `.loc[:, targets]`: under
  # pandas 2.x that form sets values in place, so a string (object-dtype)
  # target column would keep object dtype after encoding.
  train_df[target_column] = target_encoder.fit_transform(
      train_df[target_column].values)
else:
  target_encoder = None

In [13]:
# self.features
# self.categorical_features

if categorical_features is None:
  # Default: every feature stored with the "object" dtype.
  categorical_features = [
      feature for feature in features if train_df[feature].dtype == "object"
  ]

In [14]:
# self.categorical_features
# self.num_folds
# self.test_filename
# self.output

has_categoricals = len(categorical_features) > 0
has_test = test_filename is not None

if has_categoricals:
  print("Encoding categorical features")

# One fitted OrdinalEncoder per fold, keyed by fold number.
categorical_encoders = {}
for fold in range(num_folds):
  is_valid_row = train_df.kfold == fold
  fold_train = train_df[~is_valid_row].reset_index(drop=True)
  fold_valid = train_df[is_valid_row].reset_index(drop=True)

  if has_test:
    test_fold = test_df.copy(deep=True)

  if has_categoricals:
    # Fit on the training part only; categories unseen during fitting are
    # encoded as NaN in the validation and test parts.
    ordinal_encoder = OrdinalEncoder(
        handle_unknown="use_encoded_value", unknown_value=np.nan)
    fold_train[categorical_features] = ordinal_encoder.fit_transform(
        fold_train[categorical_features].values)
    fold_valid[categorical_features] = ordinal_encoder.transform(
        fold_valid[categorical_features].values)
    if has_test:
      test_fold[categorical_features] = ordinal_encoder.transform(
          test_fold[categorical_features].values)
    categorical_encoders[fold] = ordinal_encoder

  # Persist each split as a Feather file (see `DataFrame.to_feather`).
  fold_frames = [("train", fold_train), ("valid", fold_valid)]
  if has_test:
    fold_frames.append(("test", test_fold))
  for split_name, frame in fold_frames:
    frame.to_feather(os.path.join(output, f"{split_name}_fold_{fold}.feather"))

Encoding categorical features


In [15]:
# save configs
# Collect every setting that the training stage needs into one dict.
model_config = {
  "idx": idx,
  "features": features,
  "categorical_features": categorical_features,
  "train_filename": train_filename,
  "test_filename": test_filename,
  "output": output,
  "problem": problem,
  "targets": targets,
  "use_gpu": use_gpu,
  "num_folds": num_folds,
  "seed": seed,
  "num_trials": num_trials,
  "time_limit": time_limit,
  "fast": fast,
}


In [16]:
import joblib

# Validate the collected settings, then persist them in the output directory.
# NOTE(review): `model_config` is rebound from a dict to a ModelConfig here, so
# re-running this cell presumably needs the previous cell re-run first.
config_path = f"{output}/axgb.config"
model_config = ModelConfig(**model_config)
print(f"Model config: {model_config}")
print("Saving model config")
joblib.dump(model_config, config_path)

Model config: train_filename='binary_classification.csv' test_filename=None idx='id' targets=['income'] problem=<Problem.binary_classification: 1> output='output' features=['age', 'workclass', 'fnlwgt', 'education', 'education.num', 'marital.status', 'occupation', 'relationship', 'race', 'sex', 'capital.gain', 'capital.loss', 'hours.per.week', 'native.country'] num_folds=5 use_gpu=False seed=42 categorical_features=['workclass', 'education', 'marital.status', 'occupation', 'relationship', 'race', 'sex', 'native.country'] num_trials=100 time_limit=360 fast=False
Saving model config


['output/axgb.config']

In [17]:
# Persist the per-fold categorical encoders and the target encoder
# (the latter is None for non-classification problems).
print("Saving encoders")
categorical_encoders_path = f"{output}/axgb.categorical_encoders"
target_encoder_path = f"{output}/axgb.target_encoder"
joblib.dump(categorical_encoders, categorical_encoders_path)
joblib.dump(target_encoder, target_encoder_path)

# END: _process_data

Saving encoders


['output/axgb.target_encoder']

In [18]:
from xgboost import XGBClassifier, XGBRegressor

def fetch_model_params(model_config: ModelConfig):
  problem = model_config.problem
  direction = "minimize"
  match problem:
    case [Problem.binary_classification | Problem.multi_label_classification]:
      model = XGBClassifier
      use_predict_probabilities = True
      eval_metric = "logloss"
    case Problem.multi_class_classification:
      model = XGBClassifier
      use_predict_probabilities = True
      eval_metric = "mlogloss"
    case [Problem.single_column_regression | Problem.multi_column_regression]:
      model = XGBRegressor
      use_predict_probabilities = False
      eval_metric = "rmse"
    case other:
      raise NotImplementedError
  
  return model, use_predict_probabilities, eval_metric, direction


In [19]:
from typing import Type, Optional
from optuna import Trial

class Params:
  """Attribute container for the XGBoost hyper-parameters of one trial.

  The class body only declares annotations. `get_params` creates an empty
  instance and assigns the values it samples, so an attribute that was not
  assigned (for example `gamma` on the GPU path) does not exist on the
  instance.
  """
  learning_rate: float
  reg_lambda: float
  reg_alpha: float
  subsample: float
  colsample_bytree: float
  max_depth: int
  early_stopping_rounds: int
  # NOTE(review): `any` in the annotations below is the builtin function, not
  # `typing.Any`.
  n_estimators: any
  tree_method: any
  # Only assigned when the GPU is used.
  gpu_id: Optional[int]
  predictor: Optional[any]
  # Only assigned when the GPU is not used.
  booster: Optional[any]
  # Only assigned when the sampled booster is "gbtree".
  gamma: Optional[float]
  grow_policy: Optional[any]

def get_params(trial: Type[Trial], model_config: Type[ModelConfig]) -> Params:
  """Sample one set of XGBoost hyper-parameters from an Optuna trial.

  Args:
    trial: object providing `suggest_float`, `suggest_int` and
      `suggest_categorical`.
    model_config: configuration; only `use_gpu` is read.

  Returns:
    A `Params` instance carrying the sampled values. With `use_gpu` the tree
    method, GPU id and predictor are fixed; otherwise the tree method and
    booster are sampled, plus `gamma` and `grow_policy` for the "gbtree"
    booster.
  """
  params = Params()

  # Log-uniform floats as (name, low, high), suggested in this fixed order.
  log_float_ranges = (
    ("learning_rate", 1e-2, 0.25),
    ("reg_lambda", 1e-8, 100.0),
    ("reg_alpha", 1e-8, 100.0),
  )
  for name, low, high in log_float_ranges:
    setattr(params, name, trial.suggest_float(name, low, high, log=True))

  # Sampling fractions share the same uniform range.
  for name in ("subsample", "colsample_bytree"):
    setattr(params, name, trial.suggest_float(name, 0.1, 1.0))

  params.max_depth = trial.suggest_int("max_depth", 1, 9)
  params.early_stopping_rounds = trial.suggest_int("early_stopping_rounds", 100, 500)
  params.n_estimators = trial.suggest_categorical("n_estimators", [7000, 15000, 20000])

  if model_config.use_gpu:
    params.tree_method = "gpu_hist"
    params.gpu_id = 0
    params.predictor = "gpu_predictor"
    return params

  params.tree_method = trial.suggest_categorical("tree_method", ["exact", "approx", "hist"])
  params.booster = trial.suggest_categorical("booster", ["gbtree", "gblinear"])
  if params.booster == "gbtree":
    params.gamma = trial.suggest_float("gamma", 1e-8, 1.0, log=True)
    params.grow_policy = trial.suggest_categorical("grow_policy", ["depthwise", "lossguide"])
  return params


  from .autonotebook import tqdm as notebook_tqdm


In [46]:
from dataclasses import dataclass
from typing import Callable, Type, List, TypedDict, Any
from sklearn import metrics as skmetrics
from functools import partial
import copy

# Mapping from metric name to its computed value. `total=False` because each
# problem type fills in only a subset of these keys; `precision` and `recall`
# are included because the binary-classification metrics produce them.
MetricsDict = TypedDict('MetricsDict', {
  'auc': Any,
  'logloss': Any,
  'f1': Any,
  'accuracy': Any,
  'precision': Any,
  'recall': Any,
  'mlogloss': Any,
  'r2': Any,
  'mse': Any,
  'mae': Any,
  'rmse': Any,
  'rmsle': Any,
}, total=False)

@dataclass
class Metrics:
  problem: Problem

  def __post_init__(self):
    values: List[tuple[str, Callable]] = []

    match problem:
      case Problem.binary_classification:
        values.append(('auc', skmetrics.roc_auc_score))
        values.append(('logloss', skmetrics.log_loss))
        values.append(('f1', skmetrics.f1_score))
        values.append(('accuracy', skmetrics.accuracy_score))
        values.append(('precision', skmetrics.precision_score))
        values.append(('recall', skmetrics.recall_score))
      case Problem.multi_class_classification:
        values.append(('logloss', skmetrics.log_loss))
        values.append(('accuracy', skmetrics.accuracy_score))
        values.append(('mlogloss', skmetrics.log_loss))
      case [Problem.single_column_regression, Problem.multi_column_regression]:
        values.append(('r2', skmetrics.r2_score))
        values.append(('mse', skmetrics.mean_squared_error))
        values.append(('mae', skmetrics.mean_absolute_error))
        values.append(('rmse', partial(skmetrics.mean_squared_error, squared=False)))
        values.append(('rmsle', partial(skmetrics.mean_squared_log_error, squared=False)))
      case Problem.multi_label_classification:
        values.append(('r2', skmetrics.log_loss))
      case other:
        raise Exception("Invalid problem type")

    self.values = values

  def calculate(self, y_true: List[Any], y_pred: List[Any]):
    metrics: MetricsDict = {}
    for name, callable in self.values:
      if self.problem == Problem.binary_classification:
        if name == "auc":
          metrics[name] = callable(y_true, y_pred[:, 1])
        if name == "logloss":
          metrics[name] = callable(y_true, y_pred)
        else:
          metrics[name] = callable(y_true, y_pred[:, 1] >= 0.5)
      elif self.problem == Problem.multi_class_classification:
        if name == "accuracy":
          metrics[name] = callable(y_true, np.argmax(y_pred, axis=1))
        else:
          metrics[name] = callable(y_true, y_pred)
      else:
        if name == "rmsle":
          temp_pred = copy.deepcopy(y_pred)
          temp_pred[temp_pred < 0] = 0
          metrics[name] = callable(y_pred, temp_pred)
        else:
          metrics[name] = callable(y_true, y_pred)
    
    return metrics


# test_metrics = Metrics(Problem.binary_classification)
# test_metrics.calculate([[0, 1], [2, 3], [4, 5]], [[0, 1], [2, 3], [4, 5]])


In [None]:
def dict_mean(dict_list):
  """Average a list of dicts key by key.

  Args:
    dict_list: sequence of dicts; every dict must contain the keys of the
      first one.

  Returns:
    Dict mapping each key of the first dict to the mean of its values across
    `dict_list`. An empty input yields an empty dict.
  """
  if not dict_list:
    return {}

  count = len(dict_list)
  # Sum first, then divide once by the number of dicts.
  return {key: sum(d[key] for d in dict_list) / count for key in dict_list[0].keys()}

In [47]:
from typing import Literal, Type

def optimize(
  trial: any,
  xgb_model: Type[XGBClassifier] | Type[XGBRegressor],
  predict_probabilities: bool,
  eval_metric: Literal['logloss', 'mlogloss', 'rmse'],
  model_config: Type[ModelConfig],
):
  """Optuna objective: cross-validate one sampled set of hyper-parameters.

  For every fold the train/validation Feather files are read from
  `model_config.output`, a model is fitted with early stopping on the
  validation part, and the fold's metrics are computed. Multi-column
  regression and multi-label classification fit one model per target column.

  Args:
    trial: Optuna trial used to sample the hyper-parameters.
    xgb_model: estimator class to instantiate.
    predict_probabilities: use `predict_proba` instead of `predict` for
      single-target problems.
    eval_metric: XGBoost evaluation metric; also the key of the value returned.
    model_config: configuration (folds, features, targets, seed, problem,
      output directory).

  Returns:
    The mean of `eval_metric` over all folds.
  """
  params = get_params(trial, model_config)
  # `Params` is a plain attribute container, so unpack its instance attributes
  # explicitly; early stopping is passed to `fit` rather than the constructor.
  model_kwargs = dict(vars(params))
  early_stopping_rounds = model_kwargs.pop("early_stopping_rounds")

  metric_calculator = Metrics(model_config.problem)
  fits_one_model_per_target = model_config.problem in (
    Problem.multi_column_regression,
    Problem.multi_label_classification,
  )

  def build_model():
    # A fresh estimator per fit, so no fitted state is shared between targets.
    return xgb_model(
      random_state=model_config.seed,
      eval_metric=eval_metric,
      use_label_encoder=False,
      **model_kwargs,
    )

  scores = []

  for fold in range(model_config.num_folds):
    # The fold files live in the configured output directory.
    train_path = os.path.join(model_config.output, f"train_fold_{fold}.feather")
    valid_path = os.path.join(model_config.output, f"valid_fold_{fold}.feather")

    train_feather = pd.read_feather(train_path)
    valid_feather = pd.read_feather(valid_path)

    x_train = train_feather[model_config.features]
    x_valid = valid_feather[model_config.features]

    y_train = train_feather[model_config.targets].values
    y_valid = valid_feather[model_config.targets].values

    if fits_one_model_per_target:
      per_target_predictions = []

      for target_idx in range(len(model_config.targets)):
        target_model = build_model()
        target_model.fit(
          x_train,
          y_train[:, target_idx],
          early_stopping_rounds=early_stopping_rounds,
          eval_set=[(x_valid, y_valid[:, target_idx])],
          verbose=False,
        )

        if model_config.problem == Problem.multi_column_regression:
          per_target_predictions.append(target_model.predict(x_valid))
        else:
          per_target_predictions.append(target_model.predict_proba(x_valid)[:, 1])

      y_pred = np.column_stack(per_target_predictions)

    else:
      model = build_model()
      model.fit(
        x_train,
        y_train,
        early_stopping_rounds=early_stopping_rounds,
        eval_set=[(x_valid, y_valid)],
        verbose=False,
      )

      if predict_probabilities:
        y_pred = model.predict_proba(x_valid)
      else:
        y_pred = model.predict(x_valid)

    scores.append(metric_calculator.calculate(y_valid, y_pred))

  # Average only after every fold has been scored.
  mean_scores: MetricsDict = dict_mean(scores)
  print(f"Metrics: {mean_scores}")

  return mean_scores[eval_metric]

In [49]:
from functools import partial
import optuna

def train_model(model_config: ModelConfig):
  """Run the Optuna hyper-parameter search and return the best parameters.

  The study is stored in a SQLite database `params.db` inside
  `model_config.output` under a fixed study name; because of
  `load_if_exists=True` an existing study with that name is reused.

  Args:
    model_config: configuration providing the problem, output directory,
      number of trials and time limit.

  Returns:
    `study.best_params` of the study after optimisation.
  """
  model, use_predict_probabilities, eval_metric, direction = fetch_model_params(
      model_config)

  # Keyword names must match `optimize`'s signature, whose flag is called
  # `predict_probabilities`.
  optimize_function = partial(
    optimize,
    xgb_model=model,
    predict_probabilities=use_predict_probabilities,
    eval_metric=eval_metric,
    model_config=model_config
  )

  db_path = os.path.join(model_config.output, "params.db")
  study = optuna.create_study(
    direction=direction,
    study_name="testtesttest",
    storage=f"sqlite:///{db_path}",
    load_if_exists=True
  )

  study.optimize(
    optimize_function,
    n_trials=model_config.num_trials,
    timeout=model_config.time_limit,
  )

  return study.best_params