<img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg"
     align="right"
     width="30%"
     alt="Dask logo">

Dask and XGBoost
=======================

<img src="https://raw.githubusercontent.com/dmlc/dmlc.github.io/master/img/logo-m/xgboost.png"
     align="left"
     width="15%"
     alt="XGBoost logo">


This is an example code from 

In [6]:
from dask import persist
from dask.distributed import Client, progress, LocalCluster
import psutil

Decide automatically how many workers to start: a quarter of the logical CPUs, minus four, which leaves the remaining cores free for system processes.

Disclaimer: XGBoost does not need Dask to use all the cores in a multicore chip because it uses threads. We are using this to test Dask.

In [16]:
# A quarter of the logical CPUs minus four; keep at least one worker on small machines
n_workers = max(1, psutil.cpu_count() // 4 - 4)
threads_per_worker = 2

In [17]:
n_workers

18
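The worker heuristic can be pulled out into a small function to see how it behaves on different machines. This is a sketch: `choose_workers` and its `divisor`/`reserve` parameters are hypothetical names, and the `max(1, ...)` guard is an addition so that machines with fewer than 20 logical CPUs still get one worker. Under this formula, the reported 18 workers imply a logical CPU count between 88 and 91.

```python
import os


def choose_workers(cpu_count, divisor=4, reserve=4, threads_per_worker=2):
    """Hypothetical helper mirroring the notebook's worker heuristic.

    Uses cpu_count // divisor - reserve workers, but never fewer than one.
    Returns (n_workers, threads_per_worker).
    """
    n_workers = max(1, cpu_count // divisor - reserve)
    return n_workers, threads_per_worker


# 88 logical CPUs -> 88 // 4 - 4 = 18 workers, 18 * 2 = 36 threads
n, t = choose_workers(88)
print(n, t, n * t)  # 18 2 36

# A small machine would go negative without the guard (8 // 4 - 4 = -2)
print(choose_workers(8))  # (1, 2)

# The current machine (os.cpu_count() can return None, hence the fallback)
print(choose_workers(os.cpu_count() or 1))
```

The 36 threads match the "Cores: 36" that the cluster reports for 18 workers with 2 threads each.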

In [18]:
clst = LocalCluster(n_workers=n_workers, threads_per_worker=threads_per_worker)

In [4]:
client = Client(clst)
client

| Client | Cluster |
|---|---|
| Scheduler: tcp://127.0.0.1:41227 | Workers: 18 |
| Dashboard: http://127.0.0.1:8787 | Cores: 36 |
| | Memory: 16.51 GB |


## Load the Data

In [5]:
import dask.dataframe as dd

# Subset of the columns to use
cols = ['Year', 'Month', 'DayOfWeek', 'Distance',
        'DepDelay', 'CRSDepTime', 'UniqueCarrier', 'Origin', 'Dest']

# Create the dataframe
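df = dd.read_csv('../data/200*.csv', usecols=cols)

df = df.sample(frac=0.1)  # keep 10% of the rows; the full data set does not fit in RAM

# Label: departed more than 15 minutes late; a missing DepDelay is counted as delayed
is_delayed = (df.DepDelay.fillna(16) > 15)

# CRSDepTime is an HHMM integer; cap it at 2399
df['CRSDepTime'] = df['CRSDepTime'].clip(upper=2399)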
del df['DepDelay']
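The labeling and cleanup steps are ordinary pandas operations applied partition by partition, so their effect can be checked on a handful of rows. The values in `toy` below are invented for illustration, not taken from the airline data, and the names are chosen so they do not overwrite the notebook's `df` or `is_delayed`.

```python
import numpy as np
import pandas as pd

# Made-up rows standing in for the airline CSVs
toy = pd.DataFrame({
    "DepDelay": [-3.0, 15.0, 16.0, np.nan, 120.0],
    "CRSDepTime": [745, 1540, 2400, 630, 2359],
})

# Delayed means more than 15 minutes late; NaN is filled with 16, so it counts as delayed
toy_delayed = toy["DepDelay"].fillna(16) > 15

# Cap HHMM departure times at 2399
toy["CRSDepTime"] = toy["CRSDepTime"].clip(upper=2399)

print(toy_delayed.tolist())        # [False, False, True, True, True]
print(toy["CRSDepTime"].tolist())  # [745, 1540, 2399, 630, 2359]
```

Note that a delay of exactly 15 minutes is labeled on time, because the comparison is strict.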

In [12]:
size = df.size.compute() / 1e9  # number of cells (rows x columns) in billions, not bytes
print(size)

0.052590376
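`df.size` counts elements (rows times columns), not bytes, so the printed 0.052590376 is about 52.6 million cells: 6,573,797 rows times the 8 columns left after dropping `DepDelay`. A small pandas sketch of the difference, using `memory_usage(deep=True)` as one way to get an actual byte count (the tiny `frame` is made up):

```python
import pandas as pd

# 3 rows x 2 columns of made-up data
frame = pd.DataFrame({"Year": [2000, 2000, 2001], "Distance": [185.0, 696.0, 314.0]})

cells = frame.size                             # rows * columns, not bytes
nbytes = frame.memory_usage(deep=True).sum()   # bytes, including the index

print(cells)                                   # 6
print(len(frame) * len(frame.columns))         # 6

# The notebook's numbers line up the same way: 6,573,797 rows * 8 columns
print(6_573_797 * 8 / 1e9)                     # 0.052590376
```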


In [13]:
len(df)

6573797

In [14]:
df, is_delayed = persist(df, is_delayed)
progress(df, is_delayed)

In [15]:
df.head()

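| | Year | Month | DayOfWeek | UniqueCarrier | Origin | Dest | CRSDepTime | Distance |
|---|---|---|---|---|---|---|---|---|
| 76285 | 2000 | 1 | 3 | US | ILM | CLT | 745 | 185.0 |
| 15018 | 2000 | 1 | 7 | DL | HOU | ATL | 1540 | 696.0 |
| 67510 | 2000 | 1 | 1 | WN | BWI | CLE | 630 | 314.0 |
| 4279 | 2000 | 1 | 4 | UA | ORD | ALB | 1320 | 723.0 |
| 9875 | 2000 | 1 | 1 | DL | CLT | ATL | 805 | 227.0 |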


In [16]:
is_delayed.head()

76285    False
15018     True
67510    False
4279      True
9875     False
Name: DepDelay, dtype: bool

## One-hot encode

In [17]:
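# categorize() scans the data to find every category, so that get_dummies
# produces the same columns in every partition
df2 = dd.get_dummies(df.categorize()).persist()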

In [18]:
len(df2.columns)

688
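The 688 columns are the 5 numeric columns plus one indicator column per distinct carrier, origin, and destination. Dask builds the dummies partition by partition, so all partitions must agree on the full category list; that is what the `categorize()` pass provides. A pandas sketch of why this matters, with two made-up chunks standing in for partitions:

```python
import pandas as pd

# Two made-up "partitions" of the same column; each sees only some carriers
part_a = pd.DataFrame({"UniqueCarrier": ["US", "DL"]})
part_b = pd.DataFrame({"UniqueCarrier": ["WN"]})

# Without a shared category list, each chunk yields different dummy columns
print(list(pd.get_dummies(part_a).columns))  # ['UniqueCarrier_DL', 'UniqueCarrier_US']
print(list(pd.get_dummies(part_b).columns))  # ['UniqueCarrier_WN']

# With the full category set fixed up front (the role categorize() plays for
# the whole dataset), every chunk gets the same columns in the same order
carriers = pd.CategoricalDtype(["US", "DL", "WN"])


def encode(part):
    return pd.get_dummies(part.astype({"UniqueCarrier": carriers}))


print(list(encode(part_a).columns))  # ['UniqueCarrier_US', 'UniqueCarrier_DL', 'UniqueCarrier_WN']
print(list(encode(part_b).columns))  # same three columns
```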


In [19]:
df2.head()

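| | Year | Month | DayOfWeek | CRSDepTime | Distance | UniqueCarrier_US | UniqueCarrier_DL | UniqueCarrier_WN | UniqueCarrier_UA | UniqueCarrier_CO | ... | Dest_EAU | Dest_RKS | Dest_GCC | Dest_OTH | Dest_LMT | Dest_MKG | Dest_BKG | Dest_MHK | Dest_SAF | Dest_MWH |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 76285 | 2000 | 1 | 3 | 745 | 185.0 | 1 | 0 | 0 | 0 | 0 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| 15018 | 2000 | 1 | 7 | 1540 | 696.0 | 0 | 1 | 0 | 0 | 0 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| 67510 | 2000 | 1 | 1 | 630 | 314.0 | 0 | 0 | 1 | 0 | 0 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| 4279 | 2000 | 1 | 4 | 1320 | 723.0 | 0 | 0 | 0 | 1 | 0 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| 9875 | 2000 | 1 | 1 | 805 | 227.0 | 0 | 1 | 0 | 0 | 0 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |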


## Split and Train

We randomly split the data 90%/10% into training and testing sets.

We pull parameters from [this similar experiment](https://github.com/szilard/benchm-ml/blob/master/3-boosting/6-xgboost.R#L28-L32).

We then set up XGBoost and hand the data off for training.

Still to work out: how many trees are built for each partition.

In [20]:
# Same random_state and identical partitioning keep features and labels row-aligned
data_train, data_test = df2.random_split([0.9, 0.1],
                                         random_state=1234)
labels_train, labels_test = is_delayed.random_split([0.9, 0.1], 
                                                    random_state=1234)

In [6]:
%%time
import dask_xgboost as dxgb

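# The R benchmark's nrounds = 1000 is passed as num_boost_round; xgb.train does
# not read a round count from params, so an 'nround' key would leave the default
params = {'objective': 'binary:logistic',
          'max_depth': 16, 'eta': 0.01, 'subsample': 0.5,
          'min_child_weight': 1}

bst = dxgb.train(client, params, data_train, labels_train,
                 num_boost_round=1000)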
bst




In [None]:
bst

## Predict and Evaluate

We get back a normal XGBoost model. We can hand it normal pandas data, or use dask-xgboost again to predict on our hold-out testing data in parallel across the cluster.

We evaluate the result numerically with the ROC utilities from scikit-learn.

In [None]:
# Use normal XGBoost model with normal Pandas
import xgboost as xgb
dtest = xgb.DMatrix(data_test.head())
bst.predict(dtest)

In [None]:
# Or use dask-xgboost to predict in parallel across the cluster
predictions = dxgb.predict(client, bst, data_test).persist()
predictions.head()

In [None]:
predictions
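Predicting partition by partition works because each row's prediction does not depend on the other rows, so scoring chunks separately and concatenating gives the same result as scoring everything at once. A sketch of that property with a hypothetical `ToyModel` (a fixed logistic scorer with made-up weights standing in for the booster); it is not dask-xgboost's actual implementation:

```python
import numpy as np
import pandas as pd


class ToyModel:
    """Hypothetical stand-in for a trained booster: a fixed logistic scorer."""

    def __init__(self, weights, bias):
        self.weights = np.asarray(weights, dtype=float)
        self.bias = bias

    def predict(self, frame):
        z = frame.to_numpy(dtype=float) @ self.weights + self.bias
        return pd.Series(1.0 / (1.0 + np.exp(-z)), index=frame.index)


model = ToyModel(weights=[0.01, -0.002], bias=-1.0)
data = pd.DataFrame({"CRSDepTime": [745, 1540, 630, 1320, 805],
                     "Distance": [185.0, 696.0, 314.0, 723.0, 227.0]})

# Predict chunk by chunk, then stitch the pieces back together
partitions = [data.iloc[:2], data.iloc[2:]]
by_partition = pd.concat([model.predict(p) for p in partitions])

# Row-wise predictions do not depend on how the rows are chunked
assert np.allclose(by_partition, model.predict(data))
```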

In [None]:
from sklearn.metrics import roc_auc_score, roc_curve
print(roc_auc_score(labels_test.compute(), 
                    predictions.compute()))
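For binary labels, ROC AUC equals the probability that a randomly chosen positive example scores higher than a randomly chosen negative one, with ties counted as half. The sketch below computes that directly; `pairwise_auc` is a hypothetical helper, and because it builds an n_pos by n_neg comparison matrix it is only suitable for small sanity checks, not the millions of test rows here.

```python
import numpy as np


def pairwise_auc(y_true, scores):
    """Hypothetical helper: ROC AUC as P(score of positive > score of negative).

    Ties count as half a win. Memory grows as n_pos * n_neg.
    """
    y_true = np.asarray(y_true, dtype=bool)
    scores = np.asarray(scores, dtype=float)
    pos = scores[y_true]
    neg = scores[~y_true]
    wins = (pos[:, None] > neg[None, :]).sum()
    ties = (pos[:, None] == neg[None, :]).sum()
    return (wins + 0.5 * ties) / (len(pos) * len(neg))


print(pairwise_auc([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8]))  # 0.75
```

Of the four positive/negative pairs, only (0.35, 0.4) is ordered the wrong way, giving 3/4.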

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
fpr, tpr, _ = roc_curve(labels_test.compute(), predictions.compute())
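`roc_curve` sweeps a decision threshold over the predicted scores and records the false and true positive rates at each step. A simplified sketch of that sweep: `roc_points` is a hypothetical helper, it keeps every threshold (scikit-learn's version prunes collinear points by default via `drop_intermediate=True`), and it uses `toy_` names so the notebook's `fpr` and `tpr` used by the plot are not overwritten. The area under the resulting curve is computed with the trapezoidal rule.

```python
import numpy as np


def roc_points(y_true, scores):
    """Hypothetical helper: (fpr, tpr) at each distinct threshold, highest first.

    Starts at (0, 0) and keeps every threshold (no point pruning).
    """
    y_true = np.asarray(y_true, dtype=bool)
    scores = np.asarray(scores, dtype=float)
    n_pos, n_neg = y_true.sum(), (~y_true).sum()
    fpr, tpr = [0.0], [0.0]
    for t in np.unique(scores)[::-1]:      # distinct scores, descending
        predicted = scores >= t            # call positive at or above t
        tpr.append((predicted & y_true).sum() / n_pos)
        fpr.append((predicted & ~y_true).sum() / n_neg)
    return np.array(fpr), np.array(tpr)


toy_fpr, toy_tpr = roc_points([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8])
print(toy_fpr.tolist())  # [0.0, 0.0, 0.5, 0.5, 1.0]
print(toy_tpr.tolist())  # [0.0, 0.5, 0.5, 1.0, 1.0]

# Area under the curve with the trapezoidal rule
toy_auc = np.sum(np.diff(toy_fpr) * (toy_tpr[1:] + toy_tpr[:-1]) / 2)
print(toy_auc)  # 0.75
```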

In [None]:
# Taken from http://scikit-learn.org/stable/auto_examples/model_selection/plot_roc.html#sphx-glr-auto-examples-model-selection-plot-roc-py
plt.figure(figsize=(8, 8))
lw = 2
plt.plot(fpr, tpr, color='darkorange', lw=lw, label='ROC curve')
plt.plot([0, 1], [0, 1], color='navy', lw=lw, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver operating characteristic example')
plt.legend(loc="lower right")
plt.show()
