## Connecting to a `distributed` cluster via the `Executor` API

In [None]:
from distributed import Executor, progress
from time import sleep

In [24]:
# Connect a client to the scheduler of the `distributed` cluster. Printing it
# reports how many workers and threads are available (see the output below).
scheduler_address = 'dscheduler:8786'
e = Executor(scheduler_address)
print(e)

<Executor: scheduler=dscheduler:8786 workers=2 threads=24>


In [26]:
# Smoke test: fan 1000 short sleep(0.1) calls out over the cluster and watch
# them complete. pure=False marks the calls as impure so the 1000 identical
# calls are presumably not collapsed into a single cached task — see the
# Executor.map documentation.
N_SLEEP_TASKS = 1000
sleep_times = [0.1 for _ in range(N_SLEEP_TASKS)]
futures = e.map(sleep, sleep_times, pure=False)
progress(futures)

## Installing Python packages on the cluster

In [4]:
import subprocess


def install_libs(conda_packages=('scikit-learn', 'pandas', 'matplotlib'),
                 pip_requirements=('git+https://github.com/joblib/joblib',)):
    """Install this notebook's third-party dependencies into the current environment.

    Called both locally and on every worker (``e.run(install_libs)``), so the
    client and the cluster end up with the same library versions.

    Parameters
    ----------
    conda_packages : iterable of str
        Package specs passed to a single ``conda install -yq`` call. Skipped
        when empty.
    pip_requirements : iterable of str
        Requirement specs passed to a single ``pip install`` call. Skipped
        when empty. The default tracks joblib's git master, which is not
        pinned (TODO: pin a commit or tag for reproducibility).

    Raises
    ------
    subprocess.CalledProcessError
        If either installer exits with a non-zero status.
    """
    import sys

    conda_packages = list(conda_packages)
    pip_requirements = list(pip_requirements)
    if conda_packages:
        # A bare `conda install -yq` with no packages is an error, hence the guard.
        subprocess.check_call(['conda', 'install', '-yq'] + conda_packages)
    if pip_requirements:
        # Run pip through the interpreter executing this function so the
        # packages land in *its* environment, not in whichever `pip` happens
        # to be first on PATH (which can belong to another Python).
        subprocess.check_call(
            [sys.executable, '-m', 'pip', 'install'] + pip_requirements)

In [5]:
# Install locally first, then run the same installer once on every worker.
# The displayed result maps each worker address to install_libs()'s return
# value (None on success).
install_libs()
worker_results = e.run(install_libs)
# NOTE(review): an `e.restart()` call used to be commented out here. It is
# presumably needed only if workers had already imported older versions of
# these libraries — confirm before relying on it.
worker_results

{'10.0.0.3:56625': None, '10.0.0.4:48930': None}

In [6]:
# Force the use of the development branch of joblib in scikit-learn.
# This won't be necessary once scikit-learn's embedded copy of joblib is in
# sync with joblib 0.10+.
# NOTE(review): modules that ran `from sklearn.externals.joblib import Parallel`
# before this cell keep the old objects. Presumably this cell must therefore run
# before the scikit-learn estimators are imported, as it does here — confirm.
import joblib
from sklearn.externals import joblib as skl_joblib

# Attributes that describe the module object itself to the import system.
# Copying them would make skl_joblib claim to be `joblib` (wrong __name__,
# __spec__, __path__, ...). That breaks importlib.reload(skl_joblib) and makes
# not-yet-imported submodules resolve from joblib's directory, so only the
# library's actual API is copied.
_MODULE_IDENTITY_ATTRS = frozenset([
    '__name__', '__spec__', '__loader__', '__package__',
    '__path__', '__file__', '__cached__', '__builtins__',
])

print('Monkeypatching scikit-learn embedded joblib')
for attr_name, attr_value in vars(joblib).items():
    if attr_name not in _MODULE_IDENTITY_ATTRS:
        setattr(skl_joblib, attr_name, attr_value)

Monkeypatching scikit-learn embedded joblib


## Parameter search for machine learning

In [7]:
from pprint import pprint

import numpy as np
from sklearn.datasets import load_digits
from sklearn.grid_search import RandomizedSearchCV
from sklearn.svm import SVC

# Small built-in classification dataset (features in digits.data, labels in
# digits.target) and an RBF-kernel SVM whose hyper-parameters we search.
digits = load_digits()
model = SVC(kernel='rbf')

# Candidate values that RandomizedSearchCV samples combinations from.
# C, gamma and tol span orders of magnitude, so every value is a power of ten.
param_space = dict(
    C=np.logspace(-6, 6, 13),        # 1e-6 ... 1e6
    gamma=np.logspace(-8, 8, 17),    # 1e-8 ... 1e8
    tol=np.logspace(-4, -1, 4),      # 1e-4 ... 1e-1
    class_weight=[None, 'balanced'],
)

## Running one iteration on a single machine 

In [18]:
%%time
search = RandomizedSearchCV(model, param_space, cv=3, n_iter=1)
search.fit(digits.data, digits.target)

print("Best parameter score: %0.3f" % search.best_score_)
pprint(search.best_params_)

Best parameter score: 0.100
{'C': 0.001, 'class_weight': 'balanced', 'gamma': 100.0, 'tol': 0.0001}
CPU times: user 4.08 s, sys: 2 ms, total: 4.08 s
Wall time: 4.07 s


## Running the parameter search on the cluster

In [9]:
from distributed_joblib_backend import DistributedBackend
from joblib.parallel import parallel_backend, register_parallel_backend

# Expose DistributedBackend to joblib under the name 'distributed', so it can be
# selected with `parallel_backend('distributed', ...)`. scikit-learn presumably
# sees it only because its embedded joblib was replaced by this joblib in the
# monkeypatching cell.
register_parallel_backend('distributed', DistributedBackend)

In [15]:
# The real search: 30 sampled candidates x 3 folds = 90 fits, dispatched by
# joblib to the cluster through the 'distributed' backend. random_state is fixed
# so the sampled candidates are reproducible when the notebook is re-run.
search = RandomizedSearchCV(model, param_space, cv=3, n_iter=30, verbose=10,
                            random_state=0)

# NOTE: scheduler_host must match the address the Executor connected to above.
with parallel_backend('distributed', scheduler_host='dscheduler:8786'):
    # Building the estimator does no work; only fit() dispatches the parallel
    # jobs, so only fit() needs to run inside the backend context.
    search.fit(digits.data, digits.target)

Fitting 3 folds for each of 30 candidates, totalling 90 fits


[Parallel(n_jobs=-1)]: Done   2 tasks      | elapsed:    1.8s
[Parallel(n_jobs=-1)]: Done  13 tasks      | elapsed:    4.4s
[Parallel(n_jobs=-1)]: Done  24 tasks      | elapsed:    5.5s
[Parallel(n_jobs=-1)]: Done  37 tasks      | elapsed:    7.1s
[Parallel(n_jobs=-1)]: Done  90 out of  90 | elapsed:   13.2s finished


In [16]:
# Mean cross-validated score of the best of the 30 candidates, and its parameters.
best_score = search.best_score_
print("Best parameter score: %0.3f" % best_score)
pprint(search.best_params_)

Best parameter score: 0.976
{'C': 10.0, 'class_weight': 'balanced', 'gamma': 0.001, 'tol': 0.01}
