# 05 Wrap Scikit-Learn with Dask-ML Incrementally

#### Objective: 
- Demonstrate how to further scale Scikit-Learn estimators with the Dask-ML `Incremental` meta-estimator
- Demonstrate incremental hyperparameter search methods with Dask

#### Some estimators can be trained incrementally, without seeing the entire dataset at once.

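#### Scikit-Learn estimators that support this expose a `partial_fit` method, which updates the model with one batch of data at a time. Dask-ML's `Incremental` wrapper feeds the blocks of a Dask array to that method.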
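A minimal sketch of the `partial_fit` pattern in plain scikit-learn, assuming an in-memory NumPy dataset that we slice into batches ourselves (the synthetic data and the batch size of 1,000 rows are arbitrary choices for illustration). Each call updates the same model with one batch, so no single call ever receives the full dataset.

```python
import numpy as np
from sklearn.datasets import make_classification
from sklearn.linear_model import SGDClassifier

# Synthetic data standing in for a dataset too large to fit at once
X, y = make_classification(n_samples=10_000, n_features=20, random_state=0)
classes = np.unique(y)  # every label the model will ever see

clf = SGDClassifier(random_state=0)
batch_size = 1_000  # arbitrary batch size for illustration

for start in range(0, len(X), batch_size):
    X_batch = X[start:start + batch_size]
    y_batch = y[start:start + batch_size]
    # classes is required on the first call; repeating the same value later is allowed
    clf.partial_fit(X_batch, y_batch, classes=classes)

print(clf.score(X, y))
```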

#### Dask Hyperparameter Search Blog Post: https://blog.dask.org/2019/09/30/dask-hyperparam-opt

#### Create Dask Cluster via CDSW dask utils

In [1]:
import cdsw_dask_utils
import cdsw

# Run a Dask cluster with three workers and return an object containing
# a description of the cluster. 
# 
# Note that the scheduler will run in the current session, and the Dask
# dashboard will become available in the nine-dot menu at the upper
# right corner of the CDSW app.

cluster = cdsw_dask_utils.run_dask_cluster(
  n=3,
  cpu=1,
  memory=1,
  nvidia_gpu=0
)

# Connect a Dask client to the scheduler address in the cluster
# description.
from dask.distributed import Client
client = Client(cluster["scheduler_address"])
client

Waiting for Dask scheduler to become ready...
Dask scheduler is ready
IDs ['kcoz8h17544s2gkv', '4npgmyba2as0r6hb', '13fa2q6qkw4x4xe2']


| Client | Cluster |
|---|---|
| Scheduler: tcp://10.0.85.15:2323 | Workers: 2 |
| Dashboard: http://10.0.85.15:8100/status | Cores: 32 |
| | Memory: 2.00 GB |


#### Dask Scheduler UI

In [None]:
import os
engine_id = os.environ.get('CDSW_ENGINE_ID')
cdsw_domain = os.environ.get('CDSW_DOMAIN')

# IPython.display is the public module for display helpers such as HTML
from IPython.display import HTML

# Build a clickable link to the Dask scheduler UI for this CDSW engine
url = 'http://read-only-{}.{}'.format(engine_id, cdsw_domain)
HTML('<a target="_blank" rel="noopener noreferrer" href="{0}">{0}</a>'.format(url))

In [2]:
import dask
import dask.array as da
from dask_ml.datasets import make_classification


n, d = 100000, 100

X, y = make_classification(n_samples=n, n_features=d,
                           chunks=n // 10, flip_y=0.2)
X

| | Array | Chunk |
|---|---|---|
| Bytes | 80.00 MB | 8.00 MB |
| Shape | (100000, 100) | (10000, 100) |
| Count | 10 Tasks | 10 Chunks |
| Type | float64 | numpy.ndarray |
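The sizes in this summary follow directly from the arguments: `chunks=n // 10` gives 10 row-blocks of 10,000 rows each, and each float64 value takes 8 bytes. A quick check of the arithmetic, assuming the decimal megabytes (1 MB = 10**6 bytes) that this output appears to use:

```python
# Reproduce the sizes reported for X from its shape and chunking
n, d = 100_000, 100
rows_per_chunk = n // 10            # chunks=n // 10
n_chunks = n // rows_per_chunk      # number of row-blocks
bytes_per_value = 8                 # float64

chunk_bytes = rows_per_chunk * d * bytes_per_value
total_bytes = n * d * bytes_per_value

print(n_chunks)                     # 10
print(chunk_bytes / 1e6, "MB")      # 8.0 MB
print(total_bytes / 1e6, "MB")      # 80.0 MB
```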


In [3]:
from dask_ml.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y)
X_train

| | Array | Chunk |
|---|---|---|
| Bytes | 72.00 MB | 7.20 MB |
| Shape | (90000, 100) | (9000, 100) |
| Count | 70 Tasks | 10 Chunks |
| Type | float64 | numpy.ndarray |
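The output shows each of the 10 blocks shrinking from 10,000 to 9,000 rows: Dask-ML's `train_test_split` holds out 10% by default (scikit-learn's holds out 25%) and, for Dask arrays, by default splits each block independently rather than shuffling rows across blocks. The NumPy sketch below mimics that blockwise idea on a small array. It illustrates the concept only, is not Dask-ML's implementation, and the helper name `blockwise_split` is hypothetical.

```python
import numpy as np

def blockwise_split(X, y, n_blocks, test_size=0.1, seed=0):
    """Hypothetical helper: split each row-block of X/y into train and test
    parts independently, so no rows move between blocks."""
    rng = np.random.default_rng(seed)
    X_tr, X_te, y_tr, y_te = [], [], [], []
    for X_blk, y_blk in zip(np.array_split(X, n_blocks), np.array_split(y, n_blocks)):
        idx = rng.permutation(len(X_blk))        # shuffle rows within this block only
        n_test = int(round(test_size * len(X_blk)))
        test_idx, train_idx = idx[:n_test], idx[n_test:]
        X_tr.append(X_blk[train_idx])
        y_tr.append(y_blk[train_idx])
        X_te.append(X_blk[test_idx])
        y_te.append(y_blk[test_idx])
    # Lists of per-block pieces, analogous to the chunks of a Dask array
    return X_tr, X_te, y_tr, y_te

X = np.arange(1000 * 3, dtype=float).reshape(1000, 3)
y = np.arange(1000) % 2
X_tr, X_te, y_tr, y_te = blockwise_split(X, y, n_blocks=10)
print([blk.shape for blk in X_tr][:2])  # [(90, 3), (90, 3)]
```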


#### Persisting Data in Memory

In [4]:
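# Compute the arrays once and keep their chunks in the workers' memory, so the
# repeated passes over the data below do not re-run the whole task graph
X_train, X_test, y_train, y_test = dask.persist(X_train, X_test, y_train, y_test)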

#### Precompute Classes

In [5]:
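# partial_fit must be given every possible label on its first call,
# because a single block may not contain all classes
classes = da.unique(y_train).compute()
classes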

array([0, 1])
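To see why `classes` is precomputed, note that scikit-learn's `partial_fit` needs the complete label set on its first call, since the first batch may not contain every class. A small sketch in plain scikit-learn; the toy first batch containing only label 0 is an assumption chosen to make the point.

```python
import numpy as np
from sklearn.linear_model import SGDClassifier

# A first batch that happens to contain only class 0
X_first = np.array([[0.0, 1.0], [1.0, 0.0], [0.5, 0.5]])
y_first = np.array([0, 0, 0])

clf = SGDClassifier(random_state=0)
raised = False
try:
    clf.partial_fit(X_first, y_first)        # no classes= on the first call
except ValueError as err:
    raised = True
    print("ValueError:", err)

# Passing the full label set up front lets later batches introduce class 1
clf.partial_fit(X_first, y_first, classes=np.array([0, 1]))
print(clf.classes_)  # [0 1]
```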

#### Create a Scikit-Learn model and wrap it with the Dask-ML `Incremental` meta-estimator

In [6]:
from sklearn.linear_model import SGDClassifier
from dask_ml.wrappers import Incremental

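# loss='log' (logistic regression) was renamed loss='log_loss' in scikit-learn 1.1
# and removed in 1.3; use 'log_loss' with newer versions
est = SGDClassifier(loss='log', penalty='l2', tol=1e-3)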
inc = Incremental(est, scoring='accuracy')

#### Train the model with a single `fit` call (one pass over every block of the training data)

In [7]:
inc.fit(X_train, y_train, classes=classes)

Incremental(estimator=SGDClassifier(alpha=0.0001, average=False,
                                    class_weight=None, early_stopping=False,
                                    epsilon=0.1, eta0=0.0, fit_intercept=True,
                                    l1_ratio=0.15, learning_rate='optimal',
                                    loss='log', max_iter=1000,
                                    n_iter_no_change=5, n_jobs=None,
                                    penalty='l2', power_t=0.5,
                                    random_state=None, shuffle=True, tol=0.001,
                                    validation_fraction=0.1, verbose=0,
                                    warm_start=False),
            random_state=None, scoring='accuracy', shuffle_blocks=True)
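Conceptually, `Incremental.fit` makes one pass over the training data, calling the wrapped estimator's `partial_fit` once per Dask block, in a random block order when `shuffle_blocks=True` (as in the repr above). A plain scikit-learn sketch of that loop, using NumPy row-blocks in place of Dask chunks; it approximates the idea rather than reproducing Dask-ML's implementation, and the data and block count are illustrative assumptions.

```python
import numpy as np
from sklearn.base import clone
from sklearn.datasets import make_classification
from sklearn.linear_model import SGDClassifier

X, y = make_classification(n_samples=5_000, n_features=20, random_state=0)
X_blocks = np.array_split(X, 10)   # stand-ins for 10 Dask chunks
y_blocks = np.array_split(y, 10)
classes = np.unique(y)

est = SGDClassifier(tol=1e-3, random_state=0)
model = clone(est)                 # train an unfitted copy, leaving est untouched

rng = np.random.default_rng(0)
order = rng.permutation(len(X_blocks))   # random block order, like shuffle_blocks=True
for i in order:
    # One partial_fit call per block: a single pass over the data
    model.partial_fit(X_blocks[i], y_blocks[i], classes=classes)

print(model.score(X, y))
```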

#### Alternatively, train the model incrementally, scoring it after each pass

In [8]:
# Start over with a new wrapper that has not been fitted yet
inc = Incremental(est, scoring='accuracy')

In [9]:
for _ in range(10):
    # Each partial_fit call makes one pass over all blocks of X_train
    inc.partial_fit(X_train, y_train, classes=classes)
    print('Score:', inc.score(X_test, y_test))

Score: 0.5169
Score: 0.5083
Score: 0.5141
Score: 0.5234
Score: 0.5301
Score: 0.5323
Score: 0.5433
Score: 0.5311
Score: 0.5336
Score: 0.5353


#### Incremental Hyperparameter Search 

In [12]:
from scipy.stats import uniform, loguniform
from dask_ml.model_selection import HyperbandSearchCV

In [19]:
# A small dataset: n_samples defaults to 100, so chunks=20 gives 5 blocks
X, y = make_classification(chunks=20, random_state=0)

In [20]:
X_train, X_test, y_train, y_test = train_test_split(X, y)

In [21]:
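# Note: l1_ratio only affects SGDClassifier when penalty='elasticnet'; with the
# penalty='l2' used by est, only alpha actually changes the model
params = {'alpha': loguniform(1e-2, 1e0),  # or np.logspace
          'l1_ratio': uniform(0, 1)}  # or np.linspace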
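`HyperbandSearchCV` draws candidate values from these frozen SciPy distributions. A small sketch of what they produce, sampling them directly with `.rvs` (sample size and seed are arbitrary): `loguniform(1e-2, 1e0)` spreads samples evenly across orders of magnitude between 0.01 and 1, while `uniform(0, 1)` covers [0, 1]. Note that SciPy's `uniform(loc, scale)` takes a location and a width, not a lower and an upper bound.

```python
import numpy as np
from scipy.stats import loguniform, uniform

alpha_dist = loguniform(1e-2, 1e0)   # bounds are [a, b]
l1_dist = uniform(0, 1)              # loc=0, scale=1, so samples lie in [0, 1]

alphas = alpha_dist.rvs(size=10_000, random_state=0)
l1s = l1_dist.rvs(size=10_000, random_state=0)

# About half of the log-uniform samples fall below 0.1, the geometric midpoint
# of [0.01, 1]: each decade (0.01-0.1 and 0.1-1) gets a similar share
print(np.mean(alphas < 0.1))
print(alphas.min() >= 1e-2, alphas.max() <= 1.0)
```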

In [22]:
search = HyperbandSearchCV(est, params, max_iter=81, random_state=0)
search.fit(X_train, y_train, classes=[0, 1])

HyperbandSearchCV(aggressiveness=3,
                  estimator=SGDClassifier(alpha=0.0001, average=False,
                                          class_weight=None,
                                          early_stopping=False, epsilon=0.1,
                                          eta0=0.0, fit_intercept=True,
                                          l1_ratio=0.15,
                                          learning_rate='optimal', loss='log',
                                          max_iter=1000, n_iter_no_change=5,
                                          n_jobs=None, penalty='l2',
                                          power_t=0.5, random_state=None,
                                          shuffle=True, tol=0.001,
                                          validation_fraction=0.1, verbose=0,
                                          warm_start=False),
                  max_iter=81,
                  parameters={'alpha': <scipy.stats._distn_infrastructure.rv_frozen object
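With `max_iter=81` and `aggressiveness=3` (the default, visible in the repr above, which plays the role of η in the Hyperband paper), Hyperband runs several brackets that trade off how many candidates are tried against how many `partial_fit` calls each one starts with. The sketch below computes the bracket schedule from the original Hyperband paper by Li et al., with R the maximum number of iterations. Dask-ML's internal bookkeeping may differ in details, so treat this as an illustration of the algorithm, not a readout of what `search` did; `hyperband_brackets` is a hypothetical helper.

```python
def hyperband_brackets(R, eta=3):
    """Hypothetical helper: Hyperband bracket schedule. For each bracket s,
    returns (n, r): initial candidates and partial_fit calls each starts with."""
    # s_max = floor(log_eta(R)), computed with integers to avoid float error
    s_max = 0
    while eta ** (s_max + 1) <= R:
        s_max += 1
    schedule = {}
    for s in range(s_max, -1, -1):
        numerator = (s_max + 1) * eta ** s
        n = (numerator + s) // (s + 1)   # integer ceil(numerator / (s + 1))
        r = R // eta ** s
        schedule[s] = (n, r)
    return schedule

brackets = hyperband_brackets(81, eta=3)
for s, (n, r) in brackets.items():
    print(f"bracket {s}: {n:3d} candidates, {r:2d} initial partial_fit calls each")
print(sum(n for n, _ in brackets.values()))  # 143 candidates in total
```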

In [23]:
search.best_params_

{'alpha': 0.5139790648649304, 'l1_ratio': 0.7032691784458193}

In [26]:
search.best_score_

0.7

#### For more on incremental hyperparameter optimization: https://ml.dask.org/hyper-parameter-search.html#hyperparameter-incremental