### Dask XGBoost

#### Introduction to Dask and Dask cuDF

[Dask](https://dask.org/) is a Python library for parallel computing. In Dask programming, we build computational graphs that describe the code we **would like** to execute, then hand these graphs to a Dask scheduler, which evaluates them lazily and efficiently in parallel. Besides using multiple CPU cores or threads, Dask schedulers can be configured to execute graphs on multiple CPUs or, as we will do in this workshop, multiple GPUs. Because it can draw on multiple compute resources, Dask makes it possible to operate on datasets that are larger than the memory of a single compute resource.
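To make "describe the work now, evaluate it later" concrete, here is a minimal sketch in plain Python. It mimics the dictionary-of-tasks layout that Dask graphs use (a key maps either to a value or to a tuple of a callable and its arguments), but the `get` function is a hypothetical toy evaluator written for this illustration, not Dask's scheduler: it runs serially and performs no optimization.

```python
from operator import add, mul


def get(graph, key):
    """Toy evaluator (hypothetical, not Dask's): resolve `key` recursively."""
    task = graph[key]
    if isinstance(task, tuple) and callable(task[0]):
        func, *args = task
        # Arguments that name other keys are evaluated first; literals pass through.
        resolved = [
            get(graph, arg) if isinstance(arg, str) and arg in graph else arg
            for arg in args
        ]
        return func(*resolved)
    return task  # a plain value, nothing to compute


# Building the graph runs nothing: it only describes the work.
graph = {
    "x": 1,
    "y": 2,
    "sum": (add, "x", "y"),      # x + y
    "result": (mul, "sum", 10),  # (x + y) * 10
}

# Work happens only when a result is requested.
result = get(graph, "result")
print(result)
```

A real Dask scheduler takes the same kind of description and decides which tasks can run at the same time and on which worker.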

[Dask cuDF](https://github.com/rapidsai/dask-cudf) can be used to distribute dataframe operations on larger-than-memory datasets across multiple GPUs. In this notebook you'll receive an introduction to some key Dask concepts, and learn how to set up a Dask cluster that uses multiple GPUs and how to perform simple dataframe operations on distributed Dask dataframes.

#### Setting up a Dask Scheduler


We begin by starting a Dask scheduler, which will distribute our work across the 4 available GPUs. To do this, we start a `LocalCUDACluster` instance using our host machine's IP address, and then instantiate a client that can communicate with the cluster.

In [1]:
import subprocess # ...which we will use to obtain our local IP using the cmd...
cmd = "hostname --all-ip-addresses"

process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
output, error = process.communicate()
IPADDR = str(output.decode()).split()[0]
print(IPADDR)

192.168.99.2
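The cell above takes the first address that `hostname --all-ip-addresses` prints, which raises an `IndexError` if the command prints nothing. A minimal sketch of a more defensive parse follows; the helper name `first_ip_address`, the loopback fallback, and the sample byte strings are assumptions made for this illustration.

```python
def first_ip_address(raw_output, fallback="127.0.0.1"):
    """Return the first whitespace-separated address in `raw_output` (bytes).

    Hypothetical helper: falls back to loopback when no address is listed.
    """
    addresses = raw_output.decode().split()
    return addresses[0] if addresses else fallback


# Sample inputs shaped like the command's output (illustrative values only).
print(first_ip_address(b"192.168.99.2 172.17.0.1 \n"))
print(first_ip_address(b"\n"))
```

In the notebook, `output` from `process.communicate()` could be passed to this helper in place of the inline `split()[0]`.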


#### Starting a `LocalCUDACluster`

`dask_cuda` provides utilities for Dask and CUDA (as in **cu**DF) interactions.

In [2]:
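import dask
import dask.dataframe  # makes dask.dataframe.from_pandas available below
from dask import delayed
from dask_cuda import LocalCUDACluster

# Start one worker per visible GPU, bound to the host IP found above
cluster = LocalCUDACluster(ip=IPADDR)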

#### Instantiating a Client Connection

The `dask.distributed` library gives us distributed functionality, including the ability to connect to the CUDA Cluster we just created. The `progress` import will give us a handy progress bar we can utilize below.

In [3]:
from dask.distributed import Client, progress
client = Client(cluster)
client

Client
  Scheduler: tcp://192.168.99.2:34645
  Dashboard: http://192.168.99.2:8787/status
Cluster
  Workers: 4
  Cores: 4
  Memory: 270.39 GB


In [4]:
cluster


#### The Dask Dashboard

As you can see, the `client` instance reports information about our CUDA cluster (4 workers, one per GPU) as well as our client connection. Dask ships with a helpful dashboard, which the output shows running on port `8787`. Open a new browser tab at `<YOUR_IP_ADDRESS>:8787`, for example `ec2-12-345-67-890.us-east-2.compute.amazonaws.com:8787`, to see the Dask dashboard, which should currently be idle.
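As a small illustration of how the dashboard address relates to the scheduler address printed by `client`, the sketch below swaps the scheduler's scheme and port for the dashboard's. The helper name `dashboard_url` is hypothetical, and the default port is taken from the output above; when connecting from another machine, the host reachable from your browser (for example the EC2 hostname) may differ from the one the scheduler reports.

```python
from urllib.parse import urlsplit


def dashboard_url(scheduler_address, dashboard_port=8787):
    """Hypothetical helper: build the dashboard URL for a scheduler address."""
    host = urlsplit(scheduler_address).hostname
    return f"http://{host}:{dashboard_port}/status"


# Scheduler address copied from the client output above.
print(dashboard_url("tcp://192.168.99.2:34645"))
```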


#### Import Libraries

In [5]:
# !pip install dask_xgboost
import importlib
import time

import numpy as np; print('numpy Version:', np.__version__)
import pandas as pd; print('pandas Version:', pd.__version__)
import sklearn; print('Scikit-Learn Version:', sklearn.__version__)
from sklearn.metrics import confusion_matrix, accuracy_score
import dask_xgboost; print('Dask XGBoost Version:', dask_xgboost.__version__)
import dask_cudf

import rapids_lib_v8 as rl
# NOTE: any time changes are made to rapids_lib_v8.py you can either:
#   1. refresh/reload via the code below, OR
#   2. restart the kernel
importlib.reload(rl)

numpy Version: 1.16.2
pandas Version: 0.24.2
Scikit-Learn Version: 0.20.4
Dask XGBoost Version: 0.1.5


<module 'rapids_lib_v8' from '/rapids/notebooks/ml_tutorial/version_101/rapids_lib_v8.py'>

In [6]:
# Restore variables previously saved with %store
%store -r expLog

# pandas DataFrames (CPU)
%store -r trainData_pDF
%store -r trainLabels_pDF
%store -r testData_pDF
%store -r testLabels_pDF

# cuDF DataFrames (GPU)
%store -r trainData_cDF
%store -r trainLabels_cDF
%store -r testData_cDF
%store -r testLabels_cDF

nCores = !nproc --all
nCores = int(nCores[0])  # number of CPU cores reported by nproc

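# NOTE: 'num_boost_rounds' is read back by the training cells below and
# passed to dask_xgboost.train as num_boost_round.
paramsCPU = {
    'max_depth': 10,
    'learning_rate': .1,
    'num_boost_rounds': 100,
    'lambda': 1,
    'objective': 'binary:hinge',
    'tree_method': 'hist',
    'n_jobs': nCores,
    'random_state': 0
}

# Same settings as above, except for the GPU tree method and device count
paramsGPU = {
    'max_depth': 10,
    'learning_rate': .1,
    'num_boost_rounds': 100,
    'lambda': 1,
    'objective': 'binary:hinge',
    'tree_method': 'gpu_hist',
    'n_gpus': 1,
    'random_state': 0
}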
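The two dictionaries above differ only in their tree method and device settings. One way to keep them in sync is to derive both from a shared base, as in this minimal sketch. The names `base_params`, `params_cpu`, `params_gpu`, and `n_cores` are hypothetical, and `os.cpu_count()` stands in for the `nproc` call so the snippet runs outside a notebook.

```python
import os

# Settings shared by the CPU and GPU runs (values copied from the cell above).
base_params = {
    'max_depth': 10,
    'learning_rate': .1,
    'num_boost_rounds': 100,
    'lambda': 1,
    'objective': 'binary:hinge',
    'random_state': 0,
}

# Assumption: os.cpu_count() replaces `!nproc --all`; it can return None.
n_cores = os.cpu_count() or 1

# Only the device-specific keys differ between the two dictionaries.
params_cpu = {**base_params, 'tree_method': 'hist', 'n_jobs': n_cores}
params_gpu = {**base_params, 'tree_method': 'gpu_hist', 'n_gpus': 1}

print(sorted(set(params_cpu) ^ set(params_gpu)))
```

With this layout, a change to a shared setting such as `max_depth` only needs to be made once.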

In [7]:
num_partitions = 80
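To get a feel for what `num_partitions = 80` means for a dataset, the sketch below splits a row count into 80 near-equal chunks. This is only a rough illustration: the helper `partition_sizes` and the row count of 1,000 are made up for the example, and Dask's actual division logic works from the dataframe index, so the partitions it produces may differ in size or number.

```python
def partition_sizes(n_rows, n_partitions):
    """Hypothetical helper: split n_rows into n_partitions near-equal chunks."""
    base, extra = divmod(n_rows, n_partitions)
    # The first `extra` partitions take one additional row each.
    return [base + 1 if i < extra else base for i in range(n_partitions)]


sizes = partition_sizes(1_000, 80)  # 1,000 rows is an illustrative figure
print(len(sizes), min(sizes), max(sizes), sum(sizes))

# With the 4 workers started earlier, 80 partitions average 20 per worker.
print(len(sizes) // 4)
```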

In [8]:
# Create Dask cuDF dataframes
trainData_dask_cDF = dask_cudf.from_cudf(trainData_cDF, npartitions=num_partitions)
testData_dask_cDF = dask_cudf.from_cudf(testData_cDF, npartitions=num_partitions)
trainLabels_dask_cDF = dask_cudf.from_cudf(trainLabels_cDF, npartitions=num_partitions)
testLabels_dask_cDF = dask_cudf.from_cudf(testLabels_cDF, npartitions=num_partitions)

In [9]:
# Compute the partitions now and keep them in worker memory for reuse
trainData_dask_cDF = trainData_dask_cDF.persist()
testData_dask_cDF = testData_dask_cDF.persist()
trainLabels_dask_cDF = trainLabels_dask_cDF.persist()
testLabels_dask_cDF = testLabels_dask_cDF.persist()

In [10]:
type(testLabels_dask_cDF)

dask_cudf.core.DataFrame

In [11]:
# Create Dask DataFrames (pandas-backed)
trainData_dask_pDF = dask.dataframe.from_pandas(trainData_pDF, npartitions=num_partitions)
testData_dask_pDF = dask.dataframe.from_pandas(testData_pDF, npartitions=num_partitions)
trainLabels_dask_pDF = dask.dataframe.from_pandas(trainLabels_pDF, npartitions=num_partitions)
testLabels_dask_pDF = dask.dataframe.from_pandas(testLabels_pDF, npartitions=num_partitions)

# #create Dask cuDF dataframes
# trainData_dask_cDF = dask_cudf.from_dask_dataframe(trainData_dask_pDF)
# testData_dask_cDF = dask_cudf.from_dask_dataframe(testData_dask_pDF)
# trainLabels_dask_cDF = dask_cudf.from_dask_dataframe(trainLabels_dask_pDF)
# testLabels_dask_cDF = dask_cudf.from_dask_dataframe(testLabels_dask_pDF)




In [12]:
##Dask data prep:

#scatter data:
# scattaredData_future=client.scatter([trainData_pDF, trainLabels_pDF, testData_pDF, testLabels_pDF],broadcast=True )



In [13]:
# print(scattaredData_future[0])

In [14]:
# trainData_pDF_future = scattaredData_future[0]; 
# trainLabels_pDF_future = scattaredData_future[1]
# testData_pDF_future = scattaredData_future[2]; 
# testLabels_pDF_future = scattaredData_future[3]

In [15]:
# help(dask_xgboost.train)

In [16]:
# -------------------------------------------------------------------------
#  CPU Train and Test
# -------------------------------------------------------------------------
startTime = time.time()
Dask_xgBoostModelCPU = dask_xgboost.train(client=client,
                                          params=paramsCPU,
                                          data=trainData_dask_pDF,
                                          labels=trainLabels_dask_pDF,
                                          num_boost_round=paramsCPU['num_boost_rounds'])

print("CPU training time:" + str(time.time() - startTime), "seconds.")

# Generate predictions
predictionsCPU = dask_xgboost.predict(client, Dask_xgBoostModelCPU, testData_dask_pDF)
predictionsCPU_con = dask.dataframe.multi.concat([predictionsCPU], axis=1)



KeyboardInterrupt: 

In [17]:
# -------------------------------------------------------------------------
#  GPU Train and Test
# -------------------------------------------------------------------------
startTime = time.time()
Dask_xgBoostModelGPU = dask_xgboost.train(client=client,
                                          params=paramsGPU,
                                          data=trainData_dask_pDF,
                                          labels=trainLabels_dask_pDF,
                                          num_boost_round=paramsGPU['num_boost_rounds'])

print("GPU training time:" + str(time.time() - startTime), "seconds.")

# Generate predictions
predictionsGPU = dask_xgboost.predict(client, Dask_xgBoostModelGPU, testData_dask_pDF)
predictionsGPU_con = dask.dataframe.multi.concat([predictionsGPU], axis=1)

GPU training time:38.457282066345215 seconds.
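Both training cells repeat the same start/stop timing pattern. A small context manager is one way to factor it out, sketched below with the standard library only. The name `timed` is hypothetical, and the summation inside the `with` block is a stand-in workload, not a training call.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label):
    """Hypothetical helper: print how long the enclosed block took."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Runs even if the block raises or is interrupted.
        elapsed = time.perf_counter() - start
        print(f"{label} time: {elapsed:.2f} seconds.")


# Stand-in workload; in the notebook this would wrap dask_xgboost.train(...).
with timed("Example"):
    total = sum(range(1_000_000))
```

Because the report sits in a `finally` clause, an interrupted run still prints how long it ran before stopping.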


#### Evaluate model 

In [None]:
# print(predictionsGPU_con.compute())

In [None]:
# Bring the labels and predictions back to the client before scoring
gpu_accuracy = accuracy_score(testLabels_dask_pDF.compute().astype(int), predictionsGPU_con.compute())
print(gpu_accuracy)

In [None]:
# Requires the CPU training cell above to have run to completion
cpu_accuracy = accuracy_score(testLabels_dask_pDF.compute().astype(int), predictionsCPU_con.compute())
print(cpu_accuracy)
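For reference, the accuracy reported by these cells is the fraction of test rows whose predicted class equals the true label. With the `binary:hinge` objective, predictions are already 0 or 1, so no thresholding is needed. The sketch below computes the same quantity in plain Python; the function name `accuracy` and the four sample labels and predictions are made up for the illustration and are not results from this notebook.

```python
def accuracy(y_true, y_pred):
    """Hypothetical helper: fraction of positions where the labels agree."""
    if len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must have the same length")
    if len(y_true) == 0:
        raise ValueError("cannot compute accuracy of empty inputs")
    matches = sum(int(t) == int(p) for t, p in zip(y_true, y_pred))
    return matches / len(y_true)


# Illustrative data only: integer labels and 0.0/1.0 hinge-style predictions.
labels = [1, 0, 1, 1]
predictions = [1.0, 0.0, 0.0, 1.0]
print(accuracy(labels, predictions))
```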