## Dask ML and Gridsearch with cuML

In [1]:
import numpy as np
from cuml import Ridge as cumlRidge
import cudf
from sklearn import datasets, linear_model
from sklearn.externals.joblib import parallel_backend
from sklearn.model_selection import train_test_split, GridSearchCV
import dask_ml.model_selection as dcv



## Use a DGX

In [2]:
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

# Start one worker per GPU on the local system
cluster = LocalCUDACluster()
# Connect this notebook session to the cluster; later cells use `client`
# to run code on the workers.
client = Client(cluster)
# Bare last expression: renders the client summary (scheduler address,
# dashboard URL, worker count) as the cell output.
client

0,1
Client  Scheduler: tcp://127.0.0.1:39861  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 8  Cores: 8  Memory: 1.08 TB


In [3]:
def set_rmm():
    """Switch cuDF to a pooled GPU memory allocator in the calling process.

    An initial pool of ``int(1e10)`` is requested up front so later
    allocations can be served from the pool.
    NOTE(review): the pool size is presumably in bytes (~10 GB) -- confirm
    against the installed cuDF version.
    """
    # Imported inside the function because it is also executed remotely on
    # the Dask workers (see the client.run call below).
    import cudf

    pool_size = int(1e10)
    cudf.set_allocator("default", pool=True, initial_pool_size=pool_size)


# Apply the allocator settings on every worker, then in this notebook's
# own process as well.
client.run(set_rmm)
set_rmm()

In [7]:
import cudf as cd

# Host-side inputs: the integers 0..99999 and 50000 random integers drawn
# from [0, len(a)). The draw is unseeded, so it differs on every run.
a = np.arange(100000)
dxs = np.random.randint(0, len(a), 50000)

# Wrap both arrays in cuDF Series.
# NOTE(review): nothing else in the visible notebook reads `cs` or `cdxs`,
# the cell's execution count (7) is out of sequence, and `cd` duplicates the
# `cudf` import from the first cell -- this looks like a leftover scratch
# cell; confirm whether it can be removed.
cs = cd.Series(a)
cdxs = cd.Series(dxs)

## Load Diabetes Data

In [None]:
# Load the diabetes dataset bundled with scikit-learn.
diabetes = datasets.load_diabetes()
# Display the names of the feature columns.
diabetes.feature_names

In [None]:
# Settings passed to the cuML Ridge estimator constructed further down.
fit_intercept, normalize = True, False
# Regularization strength, held in a one-element float array.
alpha = np.array([1.0])

## Fit Data with Ridge Regression

In [None]:
# Split the data into training/testing sets (test_size=0.2 holds out 20%).
# A fixed random_state makes the shuffle deterministic, so a fresh
# "Restart Kernel -> Run All" reproduces the same split and therefore
# comparable fits and scores.
X_train, X_test, y_train, y_test = train_test_split(
    diabetes.data, diabetes.target, test_size=0.2, random_state=42
)

In [None]:
# Build the cuML Ridge estimator from the settings defined earlier
# (alpha, fit_intercept, normalize), selecting the "eig" solver.
cu_ridge = cumlRidge(alpha=alpha, fit_intercept=fit_intercept, normalize=normalize, solver="eig")

In [None]:
# First fit: train on the original (un-duplicated) training split.
cu_ridge.fit(X_train, y_train)

## Increase Data Size

In [None]:
# Repeat the training split 100,000 times to build a much larger dataset.
# np.tile allocates the result once; the earlier
# np.array(np.vstack([X_train] * n)) form first built the stacked array and
# then copied it again, transiently doubling peak host memory for what is
# already a very large array.
n_copies = int(1e5)
dup_data = np.tile(X_train, (n_copies, 1))  # stack whole copies row-wise
dup_train = np.tile(y_train, n_copies)      # matching repeated targets
print(f'Data in memory: {X_train.nbytes / 1e3} KB')
print(f'Duplicated data in memory: {dup_data.nbytes / 1e6} MB')

## Load Data onto GPU

In [None]:
# Build the GPU feature frame with one column per feature, named
# fea0, fea1, ... in the original column order.
# A dict of name -> column is used (the same form already used for
# gdf_train below) instead of a one-shot generator of (name, column)
# tuples: a dict is the documented constructor input and can be re-used
# if the cell is run again.
record_data = {'fea%d' % i: dup_data[:, i] for i in range(dup_data.shape[1])}
gdf_data = cudf.DataFrame(record_data)
# Targets go into a single-column frame; the column is read back as
# `gdf_train.train` when fitting.
gdf_train = cudf.DataFrame(dict(train=dup_train))

In [None]:
# Refit the same estimator, this time on the duplicated data held in cuDF
# objects (feature frame and the `train` target column).
cu_ridge.fit(gdf_data, gdf_train.train)

## Hyperparameter Optimization

In [None]:
# Search grid: 10 alpha values spaced evenly on a log scale from 1e-3 to 1e-1.
params = dict(alpha=np.logspace(start=-3, stop=-1, num=10))

In [None]:
%%time
# Grid search with scikit-learn's GridSearchCV wrapped around the cuML
# estimator: every alpha in `params` is scored with R^2 over 5 folds.
# NOTE(review): `iid` only exists in older scikit-learn releases -- confirm
# the pinned version before upgrading.
sk_cu_grid = GridSearchCV(cu_ridge, params, scoring='r2', cv=5, iid=False)
sk_cu_grid.fit(gdf_data, gdf_train.train)

In [None]:
%%time
# Same search as the preceding cell, run a second time; `sk_cu_grid` is
# overwritten with the new result.
# NOTE(review): the code is identical to the previous cell -- presumably a
# repeat timing run; confirm it is intentional and not a copy-paste leftover.
sk_cu_grid = GridSearchCV(cu_ridge, params, scoring='r2', cv=5, iid=False)
sk_cu_grid.fit(gdf_data, gdf_train.train)

In [None]:
# Show the parameter setting selected by the most recent search.
sk_cu_grid.best_params_

## Swap Sklearn Gridsearch with DaskML Gridsearch

In [None]:
# Two smaller host-side datasets for the Dask-ML searches: the training
# split repeated 100 times ("two") and 1,000 times ("three").
two_dup_data = np.tile(X_train, (int(1e2), 1))
two_dup_train = np.tile(y_train, int(1e2))

three_dup_data = np.tile(X_train, (int(1e3), 1))
three_dup_train = np.tile(y_train, int(1e3))

# Report the in-memory size of both feature arrays.
print(f'Two Dup Data: {two_dup_data.nbytes / 1e6} MB\nThree Dup Data: {three_dup_data.nbytes / 1e6} MB')

In [None]:
%%time
cu_grid = dcv.GridSearchCV(cu_clf, params, scoring='r2', cv=5)
cu_grid.fit(two_dup_data, two_dup_train)

In [None]:
%%time
cu_grid = dcv.GridSearchCV(cu_clf, params, scoring='r2', cv=5)
cu_grid.fit(three_dup_data, three_dup_train)

In [None]:
%%time
grid = dcv.GridSearchCV(clf, params, scoring='r2', cv=5)
grid.fit(three_dup_data, three_dup_data)

In [None]:
%%time
with parallel_backend('dask', scatter=[dup_data, dup_train]):
    cu_grid = dcv.GridSearchCV(cu_clf, params, scoring='r2')
    cu_grid.fit(dup_data, dup_train)