This notebook runs simulations with the following parameters:

* use [2, 4, 8, 16, 20, 24] workers
* replay history from each of the 6 runs.
* Use `tol in [True, max_iter // 6]`

For the model,

* have one `partial_fit` call take 1 second (20781 examples, about 1/3 of a dataset)
    * sec / dataset = 3
* have one score call take 1.5 seconds (66500 examples, about 1 dataset)
    * sec / dataset = 1.5

It will write to `sim/2019-06-28/`.

Another notebook will load this data and visualize it.

Plan: rerun for each unique worker value.

In [1]:
import dask_ml
# Display the path dask_ml was imported from, so the installation in use
# is visible in the cell output.
dask_ml.__file__

'/Users/scott/Developer/stsievert/dask-ml/dask_ml/__init__.py'

In [2]:
from distributed import Client, LocalCluster

In [3]:
# Alternative ways of obtaining a client, left disabled:
# cluster = LocalCluster(n_workers=2)
# client = Client(cluster)
# client = Client("localhost:8786")

# Create a client with 4 workers, one thread each. The worker count is read
# back from the scheduler in a later cell and used in output file names.
client = Client(n_workers=4, threads_per_worker=1)
client

Port 8787 is already in use. 
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.


0,1
Client  Scheduler: tcp://127.0.0.1:63632  Dashboard: http://127.0.0.1:63633/status,Cluster  Workers: 4  Cores: 4  Memory: 8.59 GB


In [4]:
from sklearn.base import BaseEstimator
import numpy as np
import toolz

def _same_params(p1, p2):
    p1keys = set(p1.keys())
    p2keys = set(p2.keys())
    assert p1keys == p2keys
    for k in p1.keys():
        if isinstance(p1[k], float) and not np.allclose(p1[k], p2[k]):
            return False
        elif p1[k] != p2[k]:
            return False
    return True
    
def _get_model_history(history, params):
    """Return the history records of the one model whose params match ``params``.

    ``history`` is grouped by ``"model_id"``.  The first record of every
    group must carry a ``"params"`` entry, which is compared against
    ``params`` with ``_same_params``.  Exactly one group is expected to
    match; this is enforced with an assert.
    """
    by_model = toolz.groupby("model_id", history)
    matching_ids = []
    for ident, records in by_model.items():
        if _same_params(records[0]["params"], params):
            matching_ids.append(ident)
    assert len(matching_ids) == 1
    return by_model[matching_ids[0]]

In [5]:
# Hyper-parameter search space passed to HyperbandSearchCV.
# Lists are discrete choices; arrays are dense 1000-point grids over a range.
params = dict(
    # 5 layer layouts
    hidden_layer_sizes=[
        (24,),
        (12, 12),
        (6, 6, 6, 6),
        (4, 4, 4, 4, 4, 4),
        (12, 6, 3, 3),
    ],
    alpha=np.logspace(-6, -3, num=1000),  # log-spaced grid
    batch_size=[32, 64, 128, 256, 512],  # 5 choices
    learning_rate=["constant", "invscaling"],  # 2 choices
    learning_rate_init=np.logspace(-4, -2, num=1000),  # log-spaced grid
    power_t=np.linspace(0.1, 0.9, num=1000),  # linear grid
    momentum=np.linspace(0, 1, num=1000),  # linear grid
    # Everything below has a single fixed value.
    nesterovs_momentum=[True],
    n_iter_no_change=[np.inf],
    solver=["sgd"],
    activation=["relu"],
    random_state=[42],
)
   

In [6]:
from distributed.utils import sleep
from copy import deepcopy
from sklearn.utils import check_random_state
from sklearn.neural_network import MLPClassifier
 
class ReplayModel(MLPClassifier):
    """Stand-in estimator that replays scores from a recorded search history.

    No training happens: ``partial_fit`` only counts its calls and sleeps,
    ``fit`` does nothing, and ``score`` returns the score recorded in
    ``history`` for this model's parameters after the same number of
    ``partial_fit`` calls.

    Parameters
    ----------
    history : list of dict
        Records with the keys ``"model_id"``, ``"partial_fit_calls"`` and
        ``"score"``; the first record of every model also needs ``"params"``.
    **kwargs
        Forwarded unchanged to ``MLPClassifier.__init__``.

    NOTE(review): scikit-learn derives an estimator's parameter names from
    the ``__init__`` signature.  Confirm that ``get_params``/``set_params``/
    ``clone`` see the hyper-parameters that are only reachable through
    ``**kwargs`` here.
    """

    # Simulated duration, in seconds, of one partial_fit / score call.
    PARTIAL_FIT_SECONDS = 20e-3
    SCORE_SECONDS = 10e-3

    def __init__(
        self,
        history,
        **kwargs,
    ):
        self.history = history
        super().__init__(**kwargs)

    def _get_formatted_keys(self):
        """Return this estimator's parameters, without ``history``.

        Keys containing ``"module"`` are rewritten to ``"module__<rest>"``
        and keys containing ``"optimizer"`` (other than the bare key
        ``"optimizer"``) to ``"optimizer__<rest>"``, where ``<rest>`` is the
        key with its first ``_``-separated token removed.
        """
        params = self.get_params()
        params.pop("history")
        new_params = {}
        for k, v in params.items():
            if "module" in k:
                k = "module__" + "_".join(k.split("_")[1:])
            if "optimizer" in k and k != "optimizer":
                k = "optimizer__" + "_".join(k.split("_")[1:])
            new_params[k] = v
        return new_params

    def partial_fit(self, X, y):
        """Count one more ``partial_fit`` call and sleep; ``X``/``y`` are ignored."""
        # The constructor never sets `_pf_calls`, so create it on first use
        # instead of failing with AttributeError.
        self._pf_calls = getattr(self, "_pf_calls", 0) + 1
        sleep(self.PARTIAL_FIT_SECONDS)
        return self

    def fit(self, X, y):
        """Do nothing and return ``self``; ``X``/``y`` are ignored."""
        return self

    def score(self, X, y):
        """Return the recorded score for the current ``partial_fit`` count.

        Raises
        ------
        IndexError
            If the history holds no record for this model with the current
            number of ``partial_fit`` calls.
        """
        model_history = _get_model_history(self.history, self._get_formatted_keys())
        calls = getattr(self, "_pf_calls", 0)
        valid = [h for h in model_history if h["partial_fit_calls"] == calls]
        if not valid:
            # Same exception type as indexing the empty list, but with a
            # message that says what was being looked up.
            raise IndexError(
                f"no recorded score for this model after {calls} partial_fit calls"
            )
        sleep(self.SCORE_SECONDS)
        return valid[0]["score"]

In [7]:
# Number of workers the scheduler currently reports; `process` puts this
# value into its output file name.
num_workers = len(client.scheduler_info()["workers"])
num_workers

4

In [8]:
import json
from sys import getsizeof
import msgpack
from copy import deepcopy


def _to_str_keys(d):
    if isinstance(d, list):
        return [_to_str_keys(di) for di in d]
    if isinstance(d, dict):
        out = {}
        for k, v in d.items():
            out[_to_str_keys(k)] = _to_str_keys(v)
        return out
    if isinstance(d, bytes):
        return d.decode()
    return d
    
def _get_history(today, random_state, path="out/2019-07-06-histories-2.msgpack"):
    """Load the recorded Hyperband history for one ``random_state``.

    Parameters
    ----------
    today : str
        Unused; kept so existing calls keep working.
    random_state
        Selects the history whose first record has an equal
        ``"random_state"`` value.
    path : str, optional
        msgpack file that stores the list of histories under the
        ``"hyperband"`` key.

    Returns
    -------
    list of dict
        One record per history entry with the keys ``"bracket"``,
        ``"model_id"``, ``"partial_fit_calls"`` and ``"score"``.  The first
        record of each model additionally carries ``"params"``.

    Raises
    ------
    AssertionError
        If the file does not hold exactly one history for ``random_state``.
    """
    with open(path, "rb") as f:
        loaded = msgpack.load(f)
    # Depending on the msgpack version/options, map keys are returned as
    # bytes or as str; accept either spelling of the top-level key.
    data = loaded[b"hyperband"] if b"hyperband" in loaded else loaded["hyperband"]
    histories = _to_str_keys(data)
    same_histories = [hist for hist in histories if hist[0]["random_state"] == random_state]
    assert len(same_histories) == 1
    history = same_histories[0]

    keep = ("bracket", "model_id", "partial_fit_calls", "score")
    params_recorded = set()
    out = []
    for h in history:
        record = {k: h[k] for k in keep}
        # Store the params only once per model, on its first record.
        if h["model_id"] not in params_recorded:
            record["params"] = h["params"]
            params_recorded.add(h["model_id"])
        out.append(record)
    return deepcopy(out)

In [11]:
from dask_ml.model_selection import HyperbandSearchCV
import scipy.stats
from sklearn.datasets import make_classification
import pandas as pd
from distributed import get_task_stream
from time import time
import warnings

def process(random_state):
    """Replay one recorded Hyperband run and save the resulting search history.

    Loads the history recorded for ``random_state``, runs
    ``HyperbandSearchCV`` over ``params`` with a ``ReplayModel`` built from
    that history, prints the elapsed time of ``search.fit``, and writes
    ``search.history_`` (each record extended with ``"best_score_"``) as
    JSON to ``sim/2019-07-07--workers=<num_workers>-rs=<random_state>-history.json``.

    Parameters
    ----------
    random_state
        Seed identifying the recorded run; also passed to the search.
    """
    today = "sim/2019-07-07-"
    pre = today + f"-workers={num_workers}-rs={random_state}"
    history = _get_history(random_state=random_state, today="2019-06-28")
    # The last recorded score is printed as a quick fingerprint of the history.
    _hash = history[-1]["score"]
    print(f"Starting rs={random_state} ({_hash:0.4f})... ")
    model = ReplayModel(history)

    search = HyperbandSearchCV(
        model,
        params,
        max_iter=299,
        random_state=random_state,
        patience=False,
    )
    # Placeholder data: ReplayModel ignores X and y in fit/partial_fit/score.
    X, y = make_classification()

    start = time()
    search.fit(X, y)
    print(f"...done in {time() - start:0.2f}s")

    # Build the payload before opening the file so that a failure here does
    # not leave an empty output file behind.
    summary = {"best_score_": search.best_score_}
    hist = [{**h, **summary} for h in search.history_]
    with open(pre + "-history.json", 'w') as f:
        json.dump(hist, f)

In [12]:
# Replay the recorded runs for seeds 400 through 499, one after another.
for random_state in range(400, 500):
    process(random_state)

Starting rs=400 (0.6465)... 


tornado.application - ERROR - Multiple exceptions in yield list
Traceback (most recent call last):
  File "/Users/scott/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 849, in callback
    result_list.append(f.result())
  File "/Users/scott/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "/Users/scott/Developer/stsievert/dask-ml/dask_ml/model_selection/_incremental.py", line 604, in _fit
    random_state=self.random_state,
  File "/Users/scott/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
  File "/Users/scott/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "/Users/scott/Developer/stsievert/dask-ml/dask_ml/model_selection/_incremental.py", line 233, in _fit
    metas = yield client.gather(new_scores)
  File "/Users/scott/anaconda3/lib/python3.6/site-packages/tornado/gen.py", line

BdbQuit: 

### 32 workers
* 400
    * p=T: 387.79
    * p=F: 390.59
* 401:
    * p=T: 386.41
    * p=F: 399.03
* 402:
    * p=T: 386.23
    * p=F: 394.98
* 403:
    * p=T: 387.60
    * p=F: 403.52
* 404:
    * p=T: 394.93s
    * p=F: 395.26
* 405:
    * p=T: 402.44s
    * p=F: 398.64

### 25 workers
4 workers.

10 workers are serial towards the end.

* 400
    * p=T: 400.85s
    * p=F: 423.90

### 8 workers
* 400
    * p=T: 740.51
    * p=F: 880.76
* 401
    * p=T: 806.99
    * p=F: 881.97
* 402:
    * p=T: 758.08s
    * p=F: 879.38s
* 403
    * p=T: 733.64s
    * p=F: 830.70s
* 404
    * p=T: 782.63s
    * p=F: 832.84s
* 405
    * p=T: 861.43s
    * p=F: 862.22s

### 16 workers
* 400:
    * p=T: 468.32, 484.23, 485.07
    * p=F: 513.42, 513.42
* 401:
    * p=T: 487.75, 505.91
    * p=F: 510.94
* 402
    * p=T: 466.80
    * p=F: 531.35
* 403
    * p=T: 459.30
    * p=F: 530.60
* 404
    * p=T: 509.88
    * p=F: 524.06
* 405:
    * p=T: 531.11
    * p=T: 526.37

Model creation takes less than a second (with `_create_model`).