## Dask ML and Gridsearch with cuML

In [None]:
import numpy as np
from cuml import Ridge as cumlRidge
import cudf
from sklearn import datasets, linear_model
from sklearn.externals.joblib import parallel_backend
from sklearn.model_selection import train_test_split, GridSearchCV
import dask_ml.model_selection as dcv

from dask.distributed import Client
from dask_cuda import LocalCUDACluster

## Use a DGX

In [None]:
# Start one worker per GPU on the local system
# The dashboard address is given as 0.0.0.0:8789 (all interfaces, port 8789).
cluster = LocalCUDACluster(dashboard_address='0.0.0.0:8789')
# Connect a Dask client to that cluster; later cells use it for profiling.
client = Client(cluster)
# Bare expression on the last line so the notebook displays the client.
client

## Load Diabetes Data

In [None]:
# Load scikit-learn's diabetes dataset; later cells read its `data`,
# `target` and `feature_names` attributes.
diabetes = datasets.load_diabetes()

In [None]:
# Display the names of the feature columns
diabetes.feature_names

In [None]:
# Display the first row of the feature array
diabetes.data[0]

## Fit Data with Ridge Regression

In [None]:
# Hold out 20% of the samples for testing; the rest is used for training
X_train, X_test, y_train, y_test = train_test_split(
    diabetes.data,
    diabetes.target,
    test_size=0.2,
)

In [None]:
# Size of the training feature array in MB (nbytes / 1e6)
X_train.nbytes/1e6

In [None]:
# Replicate the training set 100,000 times to get a dataset large enough to
# benchmark. np.tile allocates each result exactly once; wrapping a stacked
# array in np.array(...) would copy the whole thing a second time.
n_copies = int(1e5)
# Features: copies stacked row-wise -> (n_copies * n_rows, n_features)
dup_data = np.tile(X_train, (n_copies, 1))
# Targets: repeated end to end so they stay aligned with the rows of dup_data
dup_train = np.tile(y_train, n_copies)
print(f'Duplicated data in memory: {dup_data.nbytes / 1e6} MB')

## Load Data onto GPU

In [9]:
%%time
# Build the GPU feature frame from a mapping of column name -> column data
# (one 'fea<i>' column per column of dup_data). This is the same dict-based
# constructor form used for the target frame just below.
record_data = {f'fea{i}': dup_data[:, i] for i in range(dup_data.shape[1])}
gdf_data = cudf.DataFrame(record_data)
# Targets go into a single-column frame; later cells read it as gdf_train.train
gdf_train = cudf.DataFrame(dict(train=dup_train))

## Hyperparameter Optimization

In [10]:
# Baseline Ridge settings passed to both the scikit-learn and cuML estimators
fit_intercept, normalize = True, False
alpha = np.array([1.0])

In [11]:
# Search space: ten alpha values, log-spaced between 1e-3 and 1e-1
params = {'alpha': np.logspace(-3, -1, 10)}

# Both estimators get the same baseline settings; only the solver differs
ridge_kwargs = dict(alpha=alpha, fit_intercept=fit_intercept, normalize=normalize)
clf = linear_model.Ridge(solver='cholesky', **ridge_kwargs)  # scikit-learn estimator
cu_clf = cumlRidge(solver="eig", **ridge_kwargs)  # cuML estimator

In [12]:
# %%timeit
# sk_grid = GridSearchCV(clf, params, cv=5, iid=False)
# sk_grid.fit(dup_data, dup_train)

In [13]:
# %%timeit
# sk_cu_grid = GridSearchCV(cu_clf, params, cv=5, iid=False)
# sk_cu_grid.fit(gdf_data, gdf_train.train)

## Swap Sklearn Gridsearch with DaskML Gridsearch

In [14]:
%%time
# 5-fold dask-ml grid search over `params` using the scikit-learn Ridge
# on the host (NumPy) copies of the data
grid = dcv.GridSearchCV(estimator=clf, param_grid=params, cv=5)
grid.fit(dup_data, dup_train)

CPU times: user 22.6 s, sys: 9.74 s, total: 32.4 s
Wall time: 4min 14s


In [15]:
%%time
# Same dask-ml grid search, but with the cuML Ridge and the cuDF frames.
# NOTE(review): cv is not passed here, whereas the scikit-learn search in the
# previous cell uses cv=5. If the default fold count differs, the two wall
# times are not directly comparable — confirm, and pass cv=5 if a
# like-for-like timing is intended.
cu_grid = dcv.GridSearchCV(cu_clf, params)
cu_grid.fit(gdf_data, gdf_train.train)

CPU times: user 8.89 s, sys: 10.9 s, total: 19.7 s
Wall time: 1min 38s


In [16]:
import time
# Wall-clock timestamp; passed as `start=` to the profile / task-stream calls
# in the following cells.
start = time.time()

In [17]:
# NOTE(review): this call passes `filename` but not plot='save' (compare the
# last cell of the notebook), so presumably no file is written here — confirm
# against the distributed documentation and drop or fix this cell.
_ = client.get_task_stream(start=start, filename='dask-cuml-gridsearchcv-profile-rapids-task.html')

In [22]:
# Repeat the cuML grid search now that `start` has been recorded — presumably
# so this run is what the profile and task-stream exports below capture.
# As in the earlier cuML run, cv is left at its default here.
cu_grid = dcv.GridSearchCV(cu_clf, params)
cu_grid.fit(gdf_data, gdf_train.train)

GridSearchCV(cache_cv=True, cv=None, error_score='raise',
       estimator=<cuml.linear_model.ridge.Ridge object at 0x7f176853dbe0>,
       iid=True, n_jobs=-1,
       param_grid={'alpha': array([0.001  , 0.00167, 0.00278, 0.00464, 0.00774, 0.01292, 0.02154,
       0.03594, 0.05995, 0.1    ])},
       refit=True, return_train_score='warn', scheduler=None, scoring=None)

In [20]:
# Request the profile from `start` onwards, giving an HTML filename for the output;
# the return value is assigned to `_` so the cell displays nothing.
_ = client.profile(start=start, filename='dask-cuml-gridsearchcv-profile-rapids.html')

In [21]:
# Request the task stream from `start` onwards; plot='save' together with
# `filename` asks for the plot to be saved to that HTML file.
_ = client.get_task_stream(start=start, plot='save', filename='dask-cuml-gridsearchcv-profile-rapids-task.html')