In [1]:
import itertools
import os
from abc import ABC, abstractmethod

import numpy as np
import pandas as pd
import xgboost as xgb
from pyspark.sql import functions as F, Row

In [2]:
class ParamGrid(ABC):
    """Base class for evaluating a grid of hyper-parameters on Spark workers.

    The grid is held in ``self.paramGrid``, a pandas DataFrame with one column
    per parameter and one row per parameter combination. Subclasses supply the
    model-specific pieces through ``_fitModelOnWorker`` and
    ``_extractPerformance``.
    """

    def __init__(self, paramDict):
        """Build the Cartesian product of all candidate values.

        Args:
            paramDict: mapping of parameter name -> iterable of candidate values.
        """
        names = list(paramDict.keys())
        combinations = itertools.product(*(paramDict[name] for name in names))
        self.paramGrid = pd.DataFrame(list(combinations), columns=names)

    def shuffleGrid(self, seed=None):
        """Randomly reorder the rows of the grid and reset its index.

        Args:
            seed: optional ``random_state`` forwarded to ``DataFrame.sample``;
                ``None`` (pandas' own default) gives a non-reproducible shuffle.
        """
        self.paramGrid = self.paramGrid.sample(frac=1, random_state=seed).reset_index(drop=True)

    @abstractmethod
    def _extractPerformance(self, *args, **kwargs):
        """Reduce a fitted model's results to one record.

        TODO: return a model performance dictionary, or something that can be
        reduced into a DataFrame. The base class never calls this method
        itself, so the exact arguments are left to the subclass.
        """

    @abstractmethod
    def _fitModelOnWorker(self, param_names, params, data_file, target):
        """Fit and score one parameter combination; runs inside a Spark task.

        Args:
            param_names: list of parameter (column) names of the grid.
            params: one row of the grid, with values aligned to ``param_names``.
            data_file: base name of the file shipped with ``addFile``.
            target: passed through unchanged from ``evaluate``.

        Returns:
            An iterable of result records for this combination.
        """

    def evaluate(self, data, target, sparkSession, max_permutations=None, shuffle=False, shuffle_seed=None):
        """Distribute the grid over Spark and fit one model per combination.

        Args:
            data: path of the data file; it is shipped to the workers with
                ``SparkContext.addFile``.
            target: forwarded unchanged to ``_fitModelOnWorker``.
            sparkSession: session used to ship the file and build the DataFrame.
            max_permutations: optional cap on the number of grid rows evaluated;
                ``None`` evaluates the whole grid.
            shuffle: shuffle the grid first, so that a cap selects a random subset.
            shuffle_seed: seed used when ``shuffle`` is true.

        Returns:
            Whatever ``rdd.mapPartitions`` returns (an RDD under Spark), holding
            the records produced by ``_fitModelOnWorker``.
        """
        if shuffle:
            self.shuffleGrid(shuffle_seed)

        grid_size = len(self.paramGrid)
        if max_permutations is None:
            n_permutations = grid_size
        else:
            n_permutations = min(grid_size, int(max_permutations))

        sparkSession.sparkContext.addFile(data)
        # Workers see the shipped file under its base name only.
        data_file = os.path.basename(data)

        param_subset = sparkSession.createDataFrame(self.paramGrid).limit(n_permutations)
        # Aim for one combination per partition; never ask for zero partitions.
        param_subset = param_subset.repartition(max(n_permutations, 1))

        columns = list(param_subset.columns)

        def fit_partition(rows):
            # A partition is not guaranteed to hold exactly one row: iterate so
            # that empty partitions yield nothing and extra rows are not dropped.
            for row in rows:
                yield from self._fitModelOnWorker(columns, row, data_file, target)

        return param_subset.rdd.mapPartitions(fit_partition)
        
class XgboostParamGridCV(ParamGrid):
    """Grid search that scores every parameter combination with ``xgb.cv``.

    Each Spark task loads the pickled data set, cross-validates one parameter
    combination and returns a single ``Row`` holding the parameters together
    with the metrics of the best boosting round.
    """

    def __init__(self, paramDict, rounds=50, nfold=3, fold_col=None, obj='reg:linear', early_stop=5,
                 metrics=None, metrics_maximize=None, missing=None):
        """Store the cross-validation settings and build the parameter grid.

        Args:
            paramDict: mapping of parameter name -> iterable of candidate values.
            rounds: ``num_boost_round`` passed to ``xgb.cv``.
            nfold: number of folds, used when the data has no ``fold_col`` column.
            fold_col: optional column holding a predefined fold id per row.
            obj: value stored under the ``'objective'`` key of the xgboost params.
            early_stop: ``early_stopping_rounds`` passed to ``xgb.cv``.
            metrics: evaluation metric names; defaults to ``['rmse']``. The last
                entry is the one used to pick the best round.
            metrics_maximize: flags matching ``metrics`` (True = higher is
                better); defaults to ``[False]``. Only the last entry is read.
            missing: value treated as missing by ``xgb.DMatrix``.
        """
        super().__init__(paramDict)
        self.rounds = rounds
        self.nfold = nfold
        self.fold_col = fold_col
        self.obj = obj
        # ``None`` sentinels avoid sharing one mutable default list between instances.
        if metrics is None:
            metrics = ['rmse']
        elif isinstance(metrics, str):
            metrics = [metrics]
        if metrics_maximize is None:
            metrics_maximize = [False]
        elif isinstance(metrics_maximize, bool):
            metrics_maximize = [metrics_maximize]
        self.metrics = list(metrics)
        self.metrics_maximize = list(metrics_maximize)
        self.early_stop = early_stop
        self.missing = missing

    @staticmethod
    def _buildFolds(fold_series):
        """Turn a fold-id column into a list of (train_idx, test_idx) position lists.

        One fold is produced per distinct non-null id, in sorted order. Rows
        whose id is null never land in a test split but are part of every
        training split.
        """
        fold_values = np.asarray(fold_series)
        folds = []
        for fold_id in fold_series.drop_duplicates().dropna().sort_values():
            in_fold = fold_values == fold_id
            # Plain Python int lists, matching what the index-based API received before.
            folds.append((np.flatnonzero(~in_fold).tolist(), np.flatnonzero(in_fold).tolist()))
        return folds

    def _fitModelOnWorker(self, param_names, params, data_file, target):
        """Cross-validate a single parameter combination.

        Args:
            param_names: parameter names, aligned with ``params``.
            params: parameter values for this combination.
            data_file: path of a pickled pandas DataFrame readable on the worker.
            target: name of the label column.

        Returns:
            A one-element list with the ``Row`` built by ``_extractPerformance``.
        """
        # NOTE(review): unpickling executes code embedded in the file - only
        # point this at data files you trust.
        frame = pd.read_pickle(data_file)

        if self.fold_col is not None and self.fold_col in frame.columns:
            folds = self._buildFolds(frame[self.fold_col])
            fold_arg = len(folds)
        else:
            folds = None
            fold_arg = self.nfold

        # Every column except the label and the fold id is a feature.
        cols = [col for col in frame.columns if col not in (target, self.fold_col)]

        dtrain = xgb.DMatrix(frame[cols], label=frame[target].values,
                             missing=self.missing, feature_names=cols)

        p = dict(zip(param_names, params))
        p['objective'] = self.obj
        result = xgb.cv(params=p,
                        dtrain=dtrain,
                        num_boost_round=self.rounds,
                        nfold=fold_arg,
                        folds=folds,
                        early_stopping_rounds=self.early_stop,
                        metrics=self.metrics)

        return [self._extractPerformance(p, result, metric=self.metrics[-1],
                                         maximize=self.metrics_maximize[-1])]

    def _extractPerformance(self, p, perf, metric, maximize):
        """Merge the parameters with the metrics of the best boosting round.

        Args:
            p: dictionary of parameters used for the run.
            perf: pandas DataFrame of per-round performance from ``xgb.cv``.
            metric: metric name; the column ``"test-<metric>-mean"`` is optimised.
            maximize: pick the largest value if true, otherwise the smallest.

        Returns:
            A ``Row`` with one field per parameter and per column of ``perf``.
        """
        scores = perf["test-" + metric + "-mean"]
        # idxmax/idxmin return index *labels*, which is what ``.loc`` expects.
        best_round = scores.idxmax() if maximize else scores.idxmin()

        record = dict(p)
        record.update(perf.loc[best_round].to_dict())

        return Row(**record)

In [3]:
# params = {
#     'max_depth': [2, 4, 6, 8, 10, 12, 14],
#     'min_child_weight': [2**i for i in range(-3, 4)],
#     'subsample': [0.25, 0.5, 1],
#     'colsample_bytree': [0.5, 0.8, 1],
#     'colsample_bylevel': [0.25, 0.5, 0.75, 1],
#     'colsample_bynode': [0.25, 0.5, 0.75, 1],
#     'scale_pos_weight': [0.5, 1, 1.5]
# }

In [4]:
# params = {
#     'gamma' : np.linspace(0.1, 10, 100),
#     'lambda' : np.linspace(0.1, 10, 100),
#     'alpha' : np.linspace(0.1, 10, 100),
# }

In [24]:
# Load the table of candidate parameter combinations from old_params.csv.
# It replaces the generated grid further down (`temp.paramGrid = df_params`).
df_params = pd.read_csv("old_params.csv")

In [26]:
# Add a constant `eta` column: every grid column is forwarded to xgboost as a
# parameter, so all candidates are trained with eta=0.01.
df_params["eta"] = 0.01

In [27]:
# Inspect the candidate table; each row is evaluated as one parameter combination.
df_params

Unnamed: 0,colsample_bylevel,colsample_bynode,colsample_bytree,max_depth,min_child_weight,eta
0,1.0,0.75,0.8,6,0.25,0.01
1,0.75,0.5,1.0,6,4.0,0.01
2,0.75,1.0,0.8,6,4.0,0.01
3,0.75,0.75,0.8,6,0.125,0.01
4,1.0,0.5,0.5,6,4.0,0.01
5,1.0,0.5,0.8,6,1.0,0.01
6,1.0,1.0,0.8,6,1.0,0.01
7,0.75,1.0,1.0,6,8.0,0.01
8,0.75,0.75,1.0,6,8.0,0.01
9,1.0,0.75,0.8,6,2.0,0.01


In [20]:
# df = pd.read_pickle("fit_data_classification_preprocessed.pkl")

In [21]:
# df[~pd.isnull(df["fold"])].to_pickle("fit_data_classification_preprocessed_no_null.pkl")

In [28]:
# `params` is only assigned in the commented-out cells above, so on a fresh kernel
# this cell failed with NameError. Derive the constructor dictionary from the distinct
# values of each column in `df_params` instead; the grid built from it is replaced by
# `df_params` itself right after construction, so the exact product does not matter.
params = {col: sorted(df_params[col].unique().tolist()) for col in df_params.columns}
temp = XgboostParamGridCV(paramDict=params, rounds=10000, early_stop=10, obj='binary:logistic', 
                          metrics=['auc', 'logloss'], fold_col='fold', missing=999999)

In [29]:
# Swap the Cartesian grid built by the constructor for the explicit candidate table.
temp.paramGrid = df_params

In [30]:
# Ship the pickled data set to the workers and set up one cross-validation job per
# grid row, with "closed" as the label column. The result is built with
# rdd.mapPartitions, so presumably nothing is fitted until an action runs on it.
# NOTE(review): `spark` is not created anywhere in this notebook - presumably it is
# injected by the PySpark kernel; confirm before running on a plain kernel.
performance = temp.evaluate("fit_data_classification_preprocessed_no_null.pkl", "closed", spark,
                            shuffle=True, shuffle_seed=1, max_permutations=None)

In [31]:
%%time
# Bring the per-combination results back to the driver as a pandas DataFrame and
# persist them to CSV, so the long run (about 6 hours wall time in the recorded
# output) does not have to be repeated.
df_result = performance.toDF().toPandas()
df_result.to_csv("performance_grid_search_final.csv", index=False)

CPU times: user 2.43 s, sys: 997 ms, total: 3.42 s
Wall time: 6h 4min 17s


In [32]:
# NOTE(review): in the recorded output, rows that differ only in `colsample_bynode`
# (rows 0/8 and rows 6/7) report identical metrics - confirm that the installed
# xgboost version actually honours that parameter.
df_result

Unnamed: 0,colsample_bylevel,colsample_bynode,colsample_bytree,eta,max_depth,min_child_weight,objective,test-auc-mean,test-auc-std,test-logloss-mean,test-logloss-std,train-auc-mean,train-auc-std,train-logloss-mean,train-logloss-std
0,1.0,1.0,0.8,0.01,6,1.0,binary:logistic,0.83402,0.002934,0.355534,0.00181,0.860843,0.000343,0.332606,0.00035
1,0.75,0.5,1.0,0.01,6,4.0,binary:logistic,0.834461,0.002773,0.35516,0.001685,0.862936,0.000299,0.33064,0.000323
2,0.75,1.0,0.8,0.01,6,4.0,binary:logistic,0.834374,0.002819,0.355248,0.001706,0.859838,0.000241,0.333492,0.000271
3,1.0,0.75,0.8,0.01,6,0.25,binary:logistic,0.834097,0.002891,0.355466,0.001766,0.861882,0.000395,0.331672,0.000337
4,1.0,0.75,0.8,0.01,6,2.0,binary:logistic,0.834361,0.00291,0.355253,0.001773,0.863505,0.00037,0.330207,0.000323
5,1.0,0.5,0.5,0.01,6,4.0,binary:logistic,0.834234,0.002783,0.355368,0.001653,0.859053,0.000249,0.334375,0.000262
6,0.75,1.0,1.0,0.01,6,8.0,binary:logistic,0.834499,0.002847,0.355139,0.001689,0.862765,0.000223,0.330758,0.000235
7,0.75,0.75,1.0,0.01,6,8.0,binary:logistic,0.834499,0.002847,0.355139,0.001689,0.862765,0.000223,0.330758,0.000235
8,1.0,0.5,0.8,0.01,6,1.0,binary:logistic,0.83402,0.002934,0.355534,0.00181,0.860843,0.000343,0.332606,0.00035
9,0.75,0.75,0.8,0.01,6,0.125,binary:logistic,0.834646,0.002892,0.354997,0.001783,0.86786,0.000401,0.326288,0.000333
