In [None]:
import boto3
from io import BytesIO
import joblib
import numpy as np
import pandas as pd
import pandas as pd
import plotly.express as px
import ray
from ray import tune
from ray.tune.callback import Callback
from ray.tune.suggest.bohb import TuneBOHB
from ray.tune.schedulers import HyperBandForBOHB
from scipy.stats import loguniform, randint, uniform
from sklearn.model_selection import RandomizedSearchCV, GridSearchCV
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import StratifiedKFold
import tqdm
from tqdm.notebook import trange, tqdm
from tune_sklearn import TuneSearchCV
from xgboost import XGBClassifier

import logging
import os
import warnings

import xgboost as xgb

# Keep cell output short by silencing library chatter.
# One logging.disable() call is sufficient: each call replaces the previous
# threshold, so an earlier logging.disable(logging.INFO) had no lasting effect.
logging.disable(logging.WARNING)
# Blanket filter: hides every Python warning, including deprecation notices.
warnings.filterwarnings("ignore")
# Turn off XGBoost's own console messages.
xgb.set_config(verbosity=0)

# Environment flag consumed by Ray Tune; presumably disables the automatically
# added syncer callback -- TODO confirm against the Ray version in use.
os.environ["TUNE_DISABLE_AUTO_CALLBACK_SYNCER"] = "1"

class TqdmCallback(Callback):
    """Tune callback that reports experiment progress with a tqdm bar.

    The bar is created in ``setup``, advanced by one step every time a trial
    completes, and closed when the experiment ends.
    """

    def setup(self, stop=None, num_samples=None, total_num_samples=None, **info):
        """Create the progress bar with ``total_num_samples`` as its total.

        ``stop``, ``num_samples`` and any additional keyword arguments are
        accepted but not used.
        """
        progress_bar = tqdm(total=total_num_samples)
        self.pbar = progress_bar

    def on_trial_complete(self, **info):
        """Advance the bar by one step; the hook's arguments are ignored."""
        self.pbar.update(1)

    def on_experiment_end(self, **info):
        """Close the progress bar; the hook's arguments are ignored."""
        self.pbar.close()

In [None]:
def plot_cv_score(analysis):
    """Plot the best cross-validation score found so far against time.

    Args:
        analysis: object exposing a ``results_df`` DataFrame that has an
            ``average_test_score`` column and a ``timestamp`` column holding
            Unix epoch seconds. The frame is left unmodified.

    The running maximum of ``average_test_score`` is computed in timestamp
    order, placed on a one-second grid and shown as a Plotly line chart.
    Nothing is returned.
    """
    # Work on a copy so the column assignment below never writes into (or
    # warns about) a slice of the caller's results frame.
    scores = analysis.results_df[["average_test_score", "timestamp"]].copy()
    scores["timestamp"] = pd.to_datetime(scores["timestamp"], unit="s")
    scores = scores.set_index("timestamp").sort_index()
    scores["cummax_cv_score"] = scores["average_test_score"].cummax()
    # Rows sharing a timestamp are collapsed to the last one, which already
    # carries the running maximum up to that point.
    scores = scores[~scores.index.duplicated(keep="last")]
    # Lowercase "s" is the non-deprecated spelling of the seconds alias;
    # bfill fills each empty second with the next recorded value.
    best_so_far = scores["cummax_cv_score"].resample("1s").bfill().to_frame()
    fig = px.line(best_so_far, y="cummax_cv_score")
    fig.show()

In [None]:
# Location of the CSV file used throughout this notebook.
DATA_URL = "https://ray-ci-higgs.s3.us-west-2.amazonaws.com/safe_driver.csv"

# Narrow integer dtypes for the id and label columns.
train_df = pd.read_csv(DATA_URL, dtype={"id": np.int32, "target": np.int8})

# Label vector and feature frame (the label and the row id are not features).
y = train_df["target"].values
X = train_df.drop(columns=["target", "id"])

from sklearn.model_selection import train_test_split

# Stratified 80/20 split with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    stratify=y,
    random_state=1234,
)

In [None]:
def print_test_score(model, X_test, y_test):
    """Print the ROC AUC of ``model`` on the given test data.

    Args:
        model: fitted classifier exposing ``predict_proba``.
        X_test: features passed to ``predict_proba``.
        y_test: true labels scored against the second probability column.
    """
    # Column 1 of predict_proba is used as the score for the AUC computation.
    positive_scores = model.predict_proba(X_test)[:, 1]
    auc = roc_auc_score(y_test, positive_scores)
    print("**************** roc_auc score: {} ****************".format(auc))

def train_model_and_print_test_score(model, X_train, y_train, X_test, y_test):
    """Fit ``model`` on the training data and print its test-set ROC AUC.

    The model is wrapped in a ``RandomizedSearchCV`` with an empty parameter
    space and ``n_iter=1``, so it is evaluated with its current
    hyperparameters under 3-fold stratified cross-validation. The search's
    ``best_estimator_`` is then scored on ``(X_test, y_test)`` through
    ``print_test_score``.
    """
    folds = StratifiedKFold(n_splits=3, shuffle=True, random_state=1234)
    search = RandomizedSearchCV(
        model,
        param_distributions={},
        n_iter=1,
        scoring="roc_auc",
        n_jobs=-1,
        cv=folds.split(X_train, y_train),
        verbose=0,
        random_state=1001,
    )
    search.fit(X_train, y_train)
    print_test_score(search.best_estimator_, X_test, y_test)

In [None]:
# Show the dimensions of the training feature frame.
X_train.shape

In [None]:
# Baseline classifier; each fit uses a single thread (n_jobs=1) and a fixed seed.
model = XGBClassifier(
    objective="binary:logistic",
    n_jobs=1,
    eval_metric="auc",
    random_state=1234,
    verbosity=0,
    use_label_encoder=False,
)

In [None]:
# Baseline: fit the untuned model and print its ROC AUC on the held-out split.
train_model_and_print_test_score(model, X_train, y_train, X_test, y_test)

## Now let's see how to do a distributed HPO using a Ray cluster!

In [None]:
# Initialise Ray with address="auto" -- presumably attaches to an already-running
# cluster rather than starting a local one; confirm a cluster is up before running.
ray.init(address="auto")

In [None]:
# Show the CPU count reported in Ray's cluster resources.
ray.cluster_resources()['CPU']

In [None]:
# Search space for the randomized hyperparameter search.
#
# NOTE: scipy.stats.uniform(loc, scale) samples from [loc, loc + scale]; the
# second argument is the WIDTH of the interval, not its upper bound. The widths
# below keep the sampling fractions at or below 1.0, the largest value XGBoost
# accepts for subsample / colsample_* (uniform(0.1, 1.0) could draw up to 1.1).
params = {
    "max_depth": randint(1, 5),  # integers 1..4; randint's upper bound is exclusive
    "min_child_weight": loguniform(0.001, 128),
    "subsample": uniform(0.1, 0.9),  # [0.1, 1.0]
    "colsample_bylevel": uniform(0.01, 0.99),  # [0.01, 1.0]
    "colsample_bytree": uniform(0.01, 0.99),  # [0.01, 1.0]
    "reg_alpha": loguniform(1 / 1024, 10.0),
    "reg_lambda": loguniform(1 / 1024, 10.0),
    "scale_pos_weight": [1, 26],
}

gs = RandomizedSearchCV(
    model,
    params,
    cv=3,
    n_iter=100,  # TODO: change to n_trials
    scoring="roc_auc",
    n_jobs=-1,  # TODO: change to 40
    verbose=0,
    random_state=1234,  # fixed seed so the sampled candidates are reproducible
    # TODO: options kept for reference -- presumably for a later switch to
    # tune_sklearn.TuneSearchCV; confirm before enabling:
    #     early_stopping=True,
    #     max_iters=10,
    #     search_optimization="optuna",
)

gs.fit(X_train, y_train)  # , tune_params=dict(callbacks=[TqdmCallback()]))

## Results can be accessed similarly through `best_estimator_`. Tune also provides an `ExperimentAnalysis` object.

In [None]:
# Shut down this session's connection to Ray now that the search is finished.
ray.shutdown()