# Scikit-learn and XGBoost with Dask

In this notebook, we will need a dataset available in my S3 bucket. It's a public folder, so feel free to download the data:

- X_train: https://tutorial-machine-learning.s3.eu-west-3.amazonaws.com/Dask-dataset/X_train.csv
- Y_train: https://tutorial-machine-learning.s3.eu-west-3.amazonaws.com/Dask-dataset/y_train.csv

Each file contains 53630 observations. The training set has 40 features, of which V_0, V_1 and V_2 are of type object.

In this notebook, I only aim to show you how to run a random forest and an XGBoost model on a Fargate cluster using Dask and scikit-learn/XGBoost. I don't intend to create the best ML model. Besides, the dataset is too small to learn anything useful, since the target is highly imbalanced.

# Prepare the cluster

We need to set up the cluster with the `dask_cloudprovider` library. It can take a few minutes to start. If the number of workers is not enough, you can change it with a call such as `cluster.scale(4)`.

If you want to run on a local cluster, you can use this command:

```
client = Client(processes=False,
                threads_per_worker=4,
                n_workers=1)  # set up local cluster on your laptop
```

Once the cluster is up, you can monitor it through the dashboard URL it provides. For instance: `http://3.9.190.245:8787/status`

In [1]:
import numpy as np
from dask.distributed import Client
import dask.dataframe as dd

In [2]:
from dask_cloudprovider import FargateCluster
cluster = FargateCluster(n_workers=4,
                         image='thomaspernet/dask-container:py-38'
                        )
client = Client(cluster)
client

Client: Scheduler: tcp://3.9.190.245:8786, Dashboard: http://3.9.190.245:8787/status
Cluster: Workers: 4, Cores: 16, Memory: 64.00 GB


In [3]:
#### X train 
p_x = "https://tutorial-machine-learning.s3.eu-west-3.amazonaws.com/" \
"Dask-dataset/X_train.csv"
X_train= (
    dd.read_csv(p_x,
                low_memory=False,
               dtype={'V_0': 'category', ### Important !
                      'V_1':'category', ### Important !
                      'V_2':'category' ### Important !
                     }
               )   
    .categorize()
)

#### Y train
p_y = "https://tutorial-machine-learning.s3.eu-west-3.amazonaws.com/" \
"Dask-dataset/y_train.csv"
y_train= (
    dd.read_csv(p_y,
                low_memory=False,
                #usecols = features,
               )
)
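To illustrate why the `dtype` mapping in the cell above matters, here is a minimal sketch using plain pandas on a tiny in-memory CSV. The values are made up for illustration and the column names only mimic the real dataset; the point is that a column read with `dtype='category'` carries an explicit set of categories, which the one-hot encoding step later relies on.

```python
import io

import pandas as pd

# Tiny stand-in for X_train.csv (made-up values, illustration only).
csv_text = "V_0,V_3\na,1.0\nb,2.5\na,4.0\n"

# Without a dtype hint, V_0 is read as a generic string/object column.
as_default = pd.read_csv(io.StringIO(csv_text))

# With the hint, V_0 becomes a categorical column with known categories.
as_category = pd.read_csv(io.StringIO(csv_text), dtype={"V_0": "category"})

print(as_default["V_0"].dtype)
print(as_category["V_0"].dtype)
print(list(as_category["V_0"].cat.categories))
```

With Dask, the extra `.categorize()` call in the notebook plays a similar role across partitions, which is why the columns show up as `category[known]`.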

## Prepare the pipeline

Our pipeline is simple:

1. Standardise the continuous features with a robust scaler. The data has large outliers, so a robust scaler is a better choice than a standard one.
2. Convert the categorical features with a one-hot encoder. Note that, to make it work, we must load the object features as `category`.
3. Build the learner.
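As a small illustration of step 1, the sketch below reproduces robust scaling by hand on a toy column with one large outlier and compares it with scikit-learn's `RobustScaler` using its default settings (centre on the median, divide by the interquartile range). The toy values are an assumption made for the example, not taken from the dataset.

```python
import numpy as np
from sklearn.preprocessing import RobustScaler

# One toy feature with a single large outlier (illustrative values).
x = np.array([[1.0], [2.0], [3.0], [4.0], [100.0]])

# Manual robust scaling: subtract the median, divide by the IQR.
median = np.median(x, axis=0)
q25, q75 = np.percentile(x, [25, 75], axis=0)
manual = (x - median) / (q75 - q25)

# Same transformation with scikit-learn's default RobustScaler.
scaled = RobustScaler().fit_transform(x)

print(manual.ravel())
print(np.allclose(manual, scaled))
```

The median and the interquartile range are barely affected by the outlier, so the bulk of the values stay on a comparable scale.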

In [12]:
from dask_ml.model_selection import train_test_split
from sklearn.model_selection import StratifiedKFold, RepeatedStratifiedKFold

In [5]:
X_train, X_test, y_train, y_test = train_test_split(X_train,
                                                    y_train,
                                                    shuffle=True,
                                                    random_state=0)



Let's check that we are using a Dask DataFrame.

In [6]:
X_train

Dask DataFrame Structure (npartitions=1):
- V_0, V_1, V_2: category[known]
- V_3 to V_39: float64


Our test set has 105 observations with a label equal to 1.

In [38]:
y_test.compute()['target'].value_counts()

0    5329
1     105
Name: target, dtype: int64
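To put these counts in perspective, the short sketch below computes the share of positives and the accuracy of a trivial model that always predicts 0. It only uses the two counts printed above; this is also why recall, rather than accuracy, is used as the scoring metric later on.

```python
# Class counts copied from the value_counts() output above.
counts = {0: 5329, 1: 105}

total = sum(counts.values())
positive_rate = counts[1] / total

# Accuracy obtained by always predicting the majority class (0).
baseline_accuracy = counts[0] / total

print(f"positives: {positive_rate:.2%}")
print(f"majority-class baseline accuracy: {baseline_accuracy:.2%}")
```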

In [7]:
from sklearn import set_config
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import RobustScaler, OneHotEncoder
from sklearn.compose import make_column_transformer
from sklearn.metrics import confusion_matrix, classification_report
from dask_ml.model_selection import GridSearchCV, RandomizedSearchCV

In [9]:
feat_obj = (X_train
            .dtypes
            .loc[lambda x : 
                 (x =='category') 
                &(x.index != 'status')]
            .index
           )
feat_cont = (X_train
            .dtypes
            .loc[lambda x : x =='float64']
            .index
           )

In [10]:
num_proc = make_pipeline(
    RobustScaler()
    #StandardScaler()
)
cat_proc = make_pipeline(
    OneHotEncoder()
)
preprocessor = make_column_transformer(
    (num_proc, tuple(feat_cont)),
    (cat_proc, tuple(feat_obj))
)
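The preprocessor above can be tried in isolation on a toy frame. The sketch below is only an illustration under assumed data: a hypothetical frame with one categorical column and one continuous column, passed through the same two transformers. It shows how the output width is the number of continuous columns plus one column per category level.

```python
import pandas as pd
from sklearn.compose import make_column_transformer
from sklearn.preprocessing import OneHotEncoder, RobustScaler

# Hypothetical toy data: one categorical and one continuous feature.
toy = pd.DataFrame({
    "V_0": pd.Categorical(["a", "b", "a", "c"]),
    "V_3": [1.0, 2.0, 3.0, 100.0],
})

toy_preprocessor = make_column_transformer(
    (RobustScaler(), ["V_3"]),   # continuous column -> robust scaling
    (OneHotEncoder(), ["V_0"]),  # categorical column -> one-hot columns
)

transformed = toy_preprocessor.fit_transform(toy)

# 1 scaled column + 3 one-hot columns (levels a, b, c).
print(transformed.shape)
```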

In [13]:
cv = RepeatedStratifiedKFold(n_splits=10, n_repeats=3, random_state=1) 
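As a rough illustration of what this splitter does, the sketch below applies the same settings to a hypothetical imbalanced target (90 negatives, 10 positives) and counts the positives in each validation fold. Stratification keeps the class proportions the same in every fold, which matters when positives are rare.

```python
import numpy as np
from sklearn.model_selection import RepeatedStratifiedKFold

# Hypothetical imbalanced target: 90 negatives and 10 positives.
y = np.array([0] * 90 + [1] * 10)
X = np.zeros((len(y), 1))  # features are irrelevant for the split itself

cv_demo = RepeatedStratifiedKFold(n_splits=10, n_repeats=3, random_state=1)

# Number of positives landing in each validation fold.
positives_per_fold = [
    int(y[test_idx].sum()) for _, test_idx in cv_demo.split(X, y)
]

print(len(positives_per_fold))   # 10 folds x 3 repeats
print(set(positives_per_fold))   # same count in every fold
```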

## Model 1: Random forest

### Pipeline

- Step 1
  - Call pipeline
- Step 2
  - Stratified K-fold
- Step 3
  - Evaluate the model
  
We'll fit a [random forest classifier](https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.RandomForestClassifier.html), searching over a small parameter grid (see [grid search](http://scikit-learn.org/stable/modules/generated/sklearn.model_selection.GridSearchCV.html)) to find the best value of the number of estimators hyperparameter. The cell below uses `RandomizedSearchCV` from `dask_ml` for the search.

In [14]:
from sklearn.ensemble import RandomForestClassifier 

In [15]:
clf = make_pipeline(preprocessor,
                    RandomForestClassifier(random_state=0))
param_grid = {
    "randomforestclassifier__n_estimators": [300, 400],
}
# Create the hyperparameter search (dask_ml's RandomizedSearchCV)
random_search = RandomizedSearchCV(clf,
                                   param_grid,
                                   cv=cv,
                                   scoring = 'recall') 

Scikit-learn uses joblib for single-machine parallelism. This lets you train most estimators (anything that accepts an n_jobs parameter) using all the cores of your laptop or workstation.

Alternatively, Scikit-Learn can use Dask for parallelism. This lets you train those estimators using all the cores of your cluster without significantly changing your code.

This is most useful for training large models on medium-sized datasets. You may have a large model when searching over many hyper-parameters, or when using an ensemble method with many individual estimators. For too small datasets, training times will typically be small enough that cluster-wide parallelism isn't helpful. For too large datasets (larger than a single machine's memory), the scikit-learn estimators may not be able to cope (see below).
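For comparison, here is a minimal single-machine sketch of the joblib-based parallelism described above. It uses a synthetic dataset from `make_classification` and arbitrary sizes chosen for the example; `n_jobs=2` asks scikit-learn to build the trees with two local workers.

```python
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier

# Synthetic data, used only to have something to fit on.
X_demo, y_demo = make_classification(
    n_samples=500, n_features=10, random_state=0
)

# n_jobs controls how many local workers joblib uses to build the trees.
forest = RandomForestClassifier(n_estimators=50, n_jobs=2, random_state=0)
forest.fit(X_demo, y_demo)

print(len(forest.estimators_))
print(forest.predict(X_demo).shape)
```

Moving the same work to a cluster is what the `joblib.parallel_backend('dask')` context manager in this notebook is for: the estimator code stays the same and only the backend changes.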

We fit 2 candidate models, one for each hyper-parameter combination in `param_grid`, each evaluated on 30 cross-validation splits (10 folds repeated 3 times), with the work distributed across the cluster. Once the search has finished, we have a regular scikit-learn model, which can be used for prediction, scoring, etc.
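The amount of work sent to the cluster can be estimated before fitting. The sketch below counts the candidates and the cross-validation splits using the same grid and splitter settings as this notebook; the `_demo` names are only there to avoid clashing with the notebook's own variables.

```python
from sklearn.model_selection import ParameterGrid, RepeatedStratifiedKFold

param_grid_demo = {"randomforestclassifier__n_estimators": [300, 400]}
cv_demo = RepeatedStratifiedKFold(n_splits=10, n_repeats=3, random_state=1)

n_candidates = len(ParameterGrid(param_grid_demo))  # one per combination
n_splits = cv_demo.get_n_splits()                   # folds x repeats

# Each candidate is fitted once per split during the search.
total_fits = n_candidates * n_splits
print(n_candidates, n_splits, total_fits)
```

When refitting is enabled, one more fit of the best candidate on the whole training set is added on top of these.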

![](https://github.com/thomaspernet/Dask_cluster_aws_Docker/blob/master/notebooks_example/images/random_forest.gif?raw=true)

In [20]:
%%time
import joblib

with joblib.parallel_backend('dask'):
    randomtree = random_search.fit(X_train, y_train) 



CPU times: user 890 ms, sys: 238 ms, total: 1.13 s
Wall time: 3min 49s


It took 3 minutes and 49 seconds to train, and the model managed to find 24 of the 105 observations with label 1 in the test set, a recall of merely 23%.

If you want to compute the confusion matrix or make predictions, it's important to call `.compute()` on the Dask objects first, so that Dask actually performs the operation and returns the data.

In [42]:
conf_mat =confusion_matrix(y_test, randomtree.predict(X_test.compute()))    
    
dic_metrics = {
    "classifier": list(randomtree.best_estimator_.named_steps.keys())[-1],
    "best_params": randomtree.best_params_,
    "score": [
        {
            "metric": 'recall',
            'mean_test_score':list(randomtree.cv_results_['mean_test_score']),
            'std_test_score':list(randomtree.cv_results_['std_test_score']),
            "confusion_matrix": dict(enumerate(conf_mat.flatten(), 1)),
            "classification_report": classification_report(
                y_test,
                randomtree.predict(X_test.compute()),
                target_names=["Not User", "User"],
                output_dict=True,
            ),
        }
    ],
}
dic_metrics

{'classifier': 'randomforestclassifier',
 'best_params': {'randomforestclassifier__n_estimators': 300},
 'score': [{'metric': 'recall',
   'mean_test_score': [0.19306603094254246, 0.18895009522742978],
   'std_test_score': [0.04715511718050672, 0.04477014534307923],
   'confusion_matrix': {1: 5325, 2: 4, 3: 81, 4: 24},
   'classification_report': {'Not User': {'precision': 0.9850166481687015,
     'recall': 0.9992493901294802,
     'f1-score': 0.992081974848626,
     'support': 5329},
    'User': {'precision': 0.8571428571428571,
     'recall': 0.22857142857142856,
     'f1-score': 0.3609022556390978,
     'support': 105},
    'accuracy': 0.9843577475156422,
    'macro avg': {'precision': 0.9210797526557792,
     'recall': 0.6139104093504544,
     'f1-score': 0.6764921152438619,
     'support': 5434},
    'weighted avg': {'precision': 0.9825457707197295,
     'recall': 0.9843577475156422,
     'f1-score': 0.9798858264281253,
     'support': 5434}}}]}
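The metrics for the positive class can be recomputed by hand from the flattened confusion matrix above. For a binary problem, scikit-learn's `confusion_matrix` puts true labels in rows and predictions in columns, so the flattened order is TN, FP, FN, TP.

```python
# Flattened confusion matrix from the output above: TN, FP, FN, TP.
tn, fp, fn, tp = 5325, 4, 81, 24

recall = tp / (tp + fn)        # share of actual positives that were found
precision = tp / (tp + fp)     # share of predicted positives that are right
f1 = 2 * precision * recall / (precision + recall)

print(f"recall={recall:.4f} precision={precision:.4f} f1={f1:.4f}")
```

These match the `User` row of the classification report: the model is rarely wrong when it predicts a positive, but it misses most of them.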

## Model 2: XGBoost

We'll fit an [XGBoost classifier](https://xgboost.readthedocs.io/en/latest/python/python_api.html), using [grid search](http://scikit-learn.org/stable/modules/generated/sklearn.model_selection.GridSearchCV.html) to find the best value of the number of estimators hyperparameter.

Note that I didn't manage to make `dask_ml.xgboost` work on the cluster, so I rely on the official library.

In [30]:
import xgboost as xgb
from xgboost.sklearn import XGBClassifier
set_config(display='diagram')

In [27]:
param_grid = {
    'xgbclassifier__n_estimators': [100, 250],
}

In [31]:
clf = make_pipeline(
    preprocessor,
    XGBClassifier(
        learning_rate=0.1,
        n_estimators=1000,  # overridden by the values in param_grid
        max_depth=5,
        min_child_weight=13,
        gamma=0,
        subsample=0.8,
        colsample_bytree=0.8,
        objective='binary:logistic',
    ),
)
random_search = GridSearchCV(clf, param_grid, cv=cv, scoring="recall")

![](https://github.com/thomaspernet/Dask_cluster_aws_Docker/blob/master/notebooks_example/images/XGboost.gif?raw=true)

In [39]:
%%time
import joblib

with joblib.parallel_backend('dask'):
    xgboost_bm = random_search.fit(X_train, y_train)



CPU times: user 505 ms, sys: 77.5 ms, total: 582 ms
Wall time: 2min 19s


It took 2 minutes and 19 seconds to train, and the model managed to find 46 of the 105 observations with label 1 in the test set, a recall of about 44%. The mean cross-validated recall on the training set is lower, between 33% and 35%, so the test-set figure should be read with caution.

In [40]:
conf_mat =confusion_matrix(y_test, xgboost_bm.predict(X_test.compute()))    
    
dic_metrics = {
    "classifier": list(xgboost_bm.best_estimator_.named_steps.keys())[-1],
    "best_params": xgboost_bm.best_params_,
    "score": [
        {
            "metric": 'recall',
            'mean_test_score':list(xgboost_bm.cv_results_['mean_test_score']),
            'std_test_score':list(xgboost_bm.cv_results_['std_test_score']),
            "confusion_matrix": dict(enumerate(conf_mat.flatten(), 1)),
            "classification_report": classification_report(
                y_test,
                xgboost_bm.predict(X_test.compute()),
                target_names=["Not User", "User"],
                output_dict=True,
            ),
        }
    ],
}
dic_metrics

{'classifier': 'xgbclassifier',
 'best_params': {'xgbclassifier__n_estimators': 250},
 'score': [{'metric': 'recall',
   'mean_test_score': [0.3292594114337853, 0.35111853639828944],
   'std_test_score': [0.04151084154626891, 0.0402538824948912],
   'confusion_matrix': {1: 5307, 2: 22, 3: 59, 4: 46},
   'classification_report': {'Not User': {'precision': 0.9890048453224003,
     'recall': 0.9958716457121412,
     'f1-score': 0.9924263674614306,
     'support': 5329},
    'User': {'precision': 0.6764705882352942,
     'recall': 0.4380952380952381,
     'f1-score': 0.5317919075144508,
     'support': 105},
    'accuracy': 0.9850938535149062,
    'macro avg': {'precision': 0.8327377167788472,
     'recall': 0.7169834419036896,
     'f1-score': 0.7621091374879407,
     'support': 5434},
    'weighted avg': {'precision': 0.9829658138549462,
     'recall': 0.9850938535149062,
     'f1-score': 0.983525627988771,
     'support': 5434}}}]}
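To compare the cross-validated recall with the test-set recall, the sketch below reuses the numbers printed above. It assumes the `mean_test_score` values are listed in the same order as the grid (100, then 250 estimators), which is consistent with 250 being reported as the best parameter.

```python
# Mean cross-validated recall per candidate, from the output above.
cv_mean_recall = {100: 0.3292594114337853, 250: 0.35111853639828944}

# Flattened test-set confusion matrix: TN, FP, FN, TP.
tn, fp, fn, tp = 5307, 22, 59, 46

test_recall = tp / (tp + fn)
best_n = max(cv_mean_recall, key=cv_mean_recall.get)
gap = test_recall - cv_mean_recall[best_n]

print(f"best n_estimators by CV: {best_n}")
print(f"test recall={test_recall:.4f}, CV recall={cv_mean_recall[best_n]:.4f}")
print(f"gap={gap:.3f}")
```

With only 105 positives in the test set, a gap of this size can easily come from sampling noise, so it is not strong evidence in either direction.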