<h2>Import Libraries</h2>

In [1]:
from scjpnlib.utils.file_io import FileManager
import scjpnlib.utils as scjpnutils
from IPython.core.display import HTML, Markdown

import pandas as pd
import numpy as np

from sklearn.model_selection import GridSearchCV, cross_val_score
import dask_ml.model_selection as dcv
from dask.distributed import Client
import joblib

from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, VotingClassifier 
# import xgboost as xgb
from xgboost import XGBClassifier
from sklearn.metrics import classification_report
from sklearn.metrics import accuracy_score

%load_ext autoreload
%autoreload 2

K = 3 # num folds for cross-val
n_jobs = 8

## Read Configs for this Experiment

In [2]:
fm = FileManager()

models_config = fm.load_json('models-config.txt')
models_config

{'SEED': 42,
 'DecisionTreeClassifier': {'trials': {'run': False,
   'array': [{'gridsearch': {'run': False,
      'param_grid': {'criterion': ['entropy', 'gini'],
       'splitter': ['best'],
       'max_depth': [10, 50, 75, None],
       'min_samples_split': [2],
       'max_features': ['auto', 'sqrt', 'log2']},
      'last_best': {'criterion': 'entropy',
       'splitter': 'best',
       'max_depth': None,
       'min_samples_split': 2,
       'max_features': 'sqrt'}}}]},
  'params': {'criterion': 'entropy',
   'splitter': 'best',
   'max_depth': None,
   'min_samples_split': 2,
   'max_features': 'sqrt'}},
 'RandomForestClassifier': {'trials': {'run': False,
   'array': [{'gridsearch': {'run': False,
      'param_grid': {'bootstrap': [True, False],
       'criterion': ['entropy', 'gini'],
       'max_features': ['auto', 'sqrt', 'log2'],
       'max_depth': [10, 50, 75, None],
       'n_estimators': [100, 500, 1000]},
      'last_best': {'bootstrap': True,
       'criterion': 'entro

In [3]:
is_data_cached = 'data_cached' in models_config
data_config = models_config['data_cached'] if is_data_cached else fm.load_json('eda-config.txt')
digest = data_config['digest'] if is_data_cached else scjpnutils.json_to_md5_hash_digest(data_config)

In [4]:
print(f"EDA description: {'CACHED ' if is_data_cached else ''}(digest: {digest}): {data_config['eda_desc']['short']}")

EDA description: (digest: afdac7327a7b30faeede4a7e88650a6e): 0.10 test_ratio, flat insig cat hat handling with threshold 10


<p><br>
<h2>Load TEST/TRAIN Data</h2>

In [5]:
SEED = models_config['SEED']

In [6]:
fname = scjpnutils.get_data_fname(data_config, data_kwargs={'is_labels':False,'type':'train','is_cached':is_data_cached})
data_train = pd.read_csv(fname, index_col=0)
print(f"loaded {fname}\n")
data_train.info()

loaded wrangled-labeled-data-train-afdac7327a7b30faeede4a7e88650a6e.csv

<class 'pandas.core.frame.DataFrame'>
Int64Index: 53460 entries, 44928 to 56422
Columns: 107 entries, installer_distr to pump_age
dtypes: float64(107)
memory usage: 44.0 MB


In [7]:
fname = scjpnutils.get_data_fname(data_config, data_kwargs={'is_labels':True,'type':'train','is_cached':is_data_cached})
y_train = pd.read_csv(fname, index_col=0)
print(f"loaded {fname}\n")
y_train.info()

loaded labels-train-afdac7327a7b30faeede4a7e88650a6e.csv

<class 'pandas.core.frame.DataFrame'>
Int64Index: 53460 entries, 44928 to 56422
Data columns (total 1 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   status_group  53460 non-null  object
dtypes: object(1)
memory usage: 835.3+ KB


In [8]:
classes = y_train.status_group.unique()

In [9]:
fname = scjpnutils.get_data_fname(data_config, data_kwargs={'is_labels':False,'type':'test','is_cached':is_data_cached})
data_test = pd.read_csv(fname, index_col=0)
print(f"loaded {fname}\n")
data_test.info()

loaded wrangled-labeled-data-test-afdac7327a7b30faeede4a7e88650a6e.csv

<class 'pandas.core.frame.DataFrame'>
Int64Index: 5940 entries, 2980 to 26085
Columns: 107 entries, installer_distr to pump_age
dtypes: float64(107)
memory usage: 4.9 MB


In [10]:
fname = scjpnutils.get_data_fname(data_config, data_kwargs={'is_labels':True,'type':'test','is_cached':is_data_cached})
y_test = pd.read_csv(fname, index_col=0)
print(f"loaded {fname}\n")
y_test.info()

loaded labels-test-afdac7327a7b30faeede4a7e88650a6e.csv

<class 'pandas.core.frame.DataFrame'>
Int64Index: 5940 entries, 2980 to 26085
Data columns (total 1 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   status_group  5940 non-null   object
dtypes: object(1)
memory usage: 92.8+ KB


<p><br>
<h2>Models</h2>

<h3>General functions for building Classifiers and running trials</h3>

In [11]:
def gs_find_best_params(clf, param_grid):
    display(HTML(f"param_grid for {type(clf)} GridSearch:<br><pre>{params}</pre>"))
    grid_clf = GridSearchCV(
        clf, 
        param_grid, 
        cv=K, 
        n_jobs=-1
        , verbose=20
    )
    # with joblib.parallel_backend('dask'):
    #     %time _ = grid_clf.fit(data_train, y_train)
    %time _ = grid_clf.fit(data_train, y_train)
    return grid_clf.best_params_

In [12]:
def clf_fit(clf, data_train, y_train):
    # with joblib.parallel_backend('dask'):
    #     %time clf.fit(data_train, y_train)
    %time clf.fit(data_train, y_train)
    return clf

In [13]:
def summarize_preds(X, y, preds, dataset_name, classes):
    display(HTML("<p><br>"))
    _accuracy = accuracy_score(y, preds)
    display(HTML(f"{dataset_name} Accuracy: {round(_accuracy*100,4)}"))
    display(HTML("<p><br>"))
    display(HTML(f"<pre>{classification_report(y, preds, target_names=classes)}</pre>"))

In [14]:
def clf_run_trial(clf, params, run_trials_gridsearch):
    best_parameters = gs_find_best_params(clf, params) if run_trials_gridsearch else params

    display(HTML("<p><br>"))
    display(HTML(f"Grid Search {'(previously) ' if not run_trials_gridsearch else ''}found the following optimal parameters: "))
    s_best_params = ""
    for param_name in list(best_parameters.keys()):
        s_best_params += f"\t{param_name}: {best_parameters[param_name]}\n"
    display(HTML(f"<pre>{s_best_params}</pre>"))

    display(HTML("<p><br>"))
    display(HTML("Fitting classifer..."))
    clf = clf.set_params(**best_parameters)
    clf = clf_fit(clf, data_train, y_train)
    s_all_done = "\tALL DONE!"
    display(HTML(f"<pre>{s_all_done}</pre>"))

    display(HTML("<p><br>"))
    display(HTML("Predicting labels on training data..."))
    pred_train = clf.predict(data_train)
    display(HTML(f"<pre>{s_all_done}</pre>"))
    summarize_preds(data_train, y_train, pred_train, 'Training', classes)

    # display(HTML("<p><br>"))
    # display(HTML("Computing cross-val score on training data..."))
    # cv_score_train = cross_val_score(clf, data_train, y_train, cv=K)
    # display(HTML(f"<pre>{s_all_done}</pre>"))
    # display(HTML(f"cross_val_score: {np.mean(cv_score_train)}"))

    display(HTML("<p><br>"))
    display(HTML(f"Predicting labels on testing data..."))
    pred_test = clf.predict(data_test)
    display(HTML(f"<pre>{s_all_done}</pre>"))
    summarize_preds(data_test, y_test, pred_test, 'Testing', classes)
    
    # display(HTML("<p><br>"))
    # display(HTML(f"Computing cross-val score on testing data..."))
    # cv_score_test = cross_val_score(clf, data_test, y_test, cv=K)
    # display(HTML(f"<pre>{s_all_done}</pre>"))
    # display(HTML(f"cross_val_score: {np.mean(cv_score_test)}"))

    return clf, best_parameters

<p><br>
<h2>Initialize Dask-Client (to Dask backend for parallelization) <i>(DISABLED for now)</i></h2>

In [15]:
# # local
# # dask_client = Client(n_workers=2, threads_per_worker=8, memory_limit='8GB') #spawns a local cluster; memory_limit is per worker
# dask_client = Client(n_workers=1, threads_per_worker=n_jobs, memory_limit='16GB') #spawns a local cluster; memory_limit is per worker

# # for Kubernetes dask scheduler/worker cluster in GCP - but this costs money to run the cluster AND requires a lot more work for data parallelization!
# # scheduler_address = '35.230.13.87'
# # dask_client = Client(f'tcp://{scheduler_address}:8786')

# dask_client

<p><br>
<h3>Decision Tree Classifier</h3>
<h4>Trials</h4>

In [16]:
trials = models_config['DecisionTreeClassifier']['trials']

display(HTML(f"models_config['DecisionTreeClassifier']['trials']['run']: {trials['run']}"))
if trials['run']:
    trials_list = trials['array']

    for i, trial in enumerate(trials_list):
        display(HTML(f"trial[{i}]['gridsearch']['run']: {trial['gridsearch']['run']}"))
        params = trial['gridsearch']['last_best'] if not trial['gridsearch']['run'] else trial['gridsearch']['param_grid']
        rfclf, best_parameters = clf_run_trial(DecisionTreeClassifier(), params, trial['gridsearch']['run']) # note that best_parameters will be set to those used in the last trial

else:
    best_parameters = models_config['DecisionTreeClassifier']['params']

<p><br>
<h4>Build Final Model with best params</h4>

In [17]:
_, dtclf = clf_run_trial(DecisionTreeClassifier(), best_parameters, run_trials_gridsearch=False);

CPU times: user 243 ms, sys: 14.1 ms, total: 257 ms
Wall time: 258 ms


<p><br>
<h3>Random Forest Classifier</h3>
<h4>Trials</h4>

In [18]:
trials = models_config['RandomForestClassifier']['trials']

display(HTML(f"models_config['RandomForestClassifier']['trials']['run']: {trials['run']}"))
if trials['run']:
    trials_list = trials['array']

    for i, trial in enumerate(trials_list):
        display(HTML(f"trial[{i}]['gridsearch']['run']: {trial['gridsearch']['run']}"))
        params = trial['gridsearch']['last_best'] if not trial['gridsearch']['run'] else trial['gridsearch']['param_grid']
        if trial['gridsearch']['run']:
            params.update({'n_jobs': [-1]})
        else:
            params.update({'n_jobs':-1})
        rfclf, best_parameters = clf_run_trial(RandomForestClassifier(), params, trial['gridsearch']['run']) # note that best_parameters will be set to those used in the last trial

else:
    best_parameters = models_config['RandomForestClassifier']['params']

<p><br>
<h4>Build Final Model with best params</h4>

In [19]:
best_parameters.update({'n_jobs':-1, 'verbose':1})
rfclf, _ = clf_run_trial(RandomForestClassifier(), best_parameters, run_trials_gridsearch=False);

[Parallel(n_jobs=-1)]: Using backend ThreadingBackend with 8 concurrent workers.
[Parallel(n_jobs=-1)]: Done  34 tasks      | elapsed:    2.1s
[Parallel(n_jobs=-1)]: Done 184 tasks      | elapsed:   11.7s
[Parallel(n_jobs=-1)]: Done 434 tasks      | elapsed:   24.6s
[Parallel(n_jobs=-1)]: Done 784 tasks      | elapsed:   38.7s
CPU times: user 4min 24s, sys: 5.91 s, total: 4min 30s
Wall time: 48.3 s
[Parallel(n_jobs=-1)]: Done 1000 out of 1000 | elapsed:   47.6s finished


[Parallel(n_jobs=8)]: Using backend ThreadingBackend with 8 concurrent workers.
[Parallel(n_jobs=8)]: Done  34 tasks      | elapsed:    0.3s
[Parallel(n_jobs=8)]: Done 184 tasks      | elapsed:    1.4s
[Parallel(n_jobs=8)]: Done 434 tasks      | elapsed:    3.5s
[Parallel(n_jobs=8)]: Done 784 tasks      | elapsed:    5.7s
[Parallel(n_jobs=8)]: Done 1000 out of 1000 | elapsed:    7.0s finished


[Parallel(n_jobs=8)]: Using backend ThreadingBackend with 8 concurrent workers.
[Parallel(n_jobs=8)]: Done  34 tasks      | elapsed:    0.1s
[Parallel(n_jobs=8)]: Done 184 tasks      | elapsed:    0.3s
[Parallel(n_jobs=8)]: Done 434 tasks      | elapsed:    0.5s
[Parallel(n_jobs=8)]: Done 784 tasks      | elapsed:    0.9s
[Parallel(n_jobs=8)]: Done 1000 out of 1000 | elapsed:    1.1s finished


<p><br><br><br>
<h3>XGBClassifier</h3>
<h4>Trials</h4>

In [20]:
trials = models_config['XGBClassifier']['trials']

display(HTML(f"models_config['XGBClassifier']['trials']['run']: {trials['run']}"))
if trials['run']:
    trials_list = trials['array']

    for i, trial in enumerate(trials_list):
        display(HTML(f"trial[{i}]['gridsearch']['run']: {trial['gridsearch']['run']}"))
        params = trial['gridsearch']['last_best'] if not trial['gridsearch']['run'] else trial['gridsearch']['param_grid']
        if trial['gridsearch']['run']:
            params.update({'n_jobs': [-1]})
        else:
            params.update({'n_jobs':-1})
        xgbclf, best_parameters = clf_run_trial(XGBClassifier(), params, trial['gridsearch']['run']) # note that best_parameters will be set to those used in the last trial

else:
    best_parameters = models_config['RandomForestClassifier']['params']

<p><br>
<h4>Build Final Model with best params</h4>

In [21]:
best_parameters.update({'n_jobs':-1, 'verbosity':1})
xgbclf, _ = clf_run_trial(XGBClassifier(), best_parameters, run_trials_gridsearch=False)

CPU times: user 5h 5min 45s, sys: 2min 26s, total: 5h 8min 12s
Wall time: 51min 12s


In [22]:
# # Voting Classifier with hard voting 
# vot_hard = VotingClassifier(estimators=voting_classifers, voting ='hard') 
# vot_hard.fit(data_train, y_train)  
# pred_test = vot_hard.predict(data_test)

# # using accuracy_score metric to predict accuracy 
# score = accuracy_score(y_test, pred_test) 
# print("Hard Voting Score % d" % score) 
  
# # Voting Classifier with soft voting 
# vot_soft = VotingClassifier(estimators=voting_classifers, voting ='soft') 
# vot_soft.fit(data_train, y_train)  
# pred_test = vot_soft.predict(data_test)
  
# # using accuracy_score 
# score = accuracy_score(y_test, pred_test) 
# print("Soft Voting Score % d" % score) 