In [None]:
from sklearn.datasets import fetch_covtype
from sklearn.model_selection import train_test_split
import numpy as np
from sklearn.preprocessing import PolynomialFeatures, StandardScaler

In [None]:
# Create the Dask distributed client used by every later cell, pointing it at
# the scheduler address localhost:8786.
from distributed import Client
# Alternative kept for reference: asynchronous client against the same address.
# client = await Client("localhost:8786", asynchronous=True)
client = Client("localhost:8786")
# Alternative kept for reference: client created without a scheduler address.
# client = Client()
# Bare expression so the notebook renders the client as the cell output.
client

In [None]:
# Send the local train.py and tune.py files through the Dask client
# (presumably so the workers can import them — confirm). A later cell imports
# the damper classes from `tune`.
client.upload_file("train.py")
client.upload_file("tune.py")

In [None]:
# Load the covertype dataset as a (features, labels) pair without shuffling.
# NOTE(review): random_state presumably has no effect while shuffle=False — confirm.
X, y = fetch_covtype(return_X_y=True, shuffle=False, random_state=0)

In [None]:
# Display the dimensions of the raw feature matrix.
X.shape

In [None]:
# Expand the raw features with degree-2, interaction-only polynomial terms and
# no bias column; the fitted transformer is kept in `kernel`.
kernel = PolynomialFeatures(degree=2, interaction_only=True, include_bias=False)
X_poly = kernel.fit_transform(X)

In [None]:
# Display the dimensions after the interaction-term expansion.
X_poly.shape

In [None]:
# Split the expanded features into train/test with a fixed seed; 200,000 rows
# go to the training split. The commented-out line is a smaller (20,000-row)
# training split kept as an alternative.
X_train, X_test, y_train, y_test = train_test_split(
    X_poly, y, random_state=42, train_size=200_000,
#     X_poly, y, random_state=42, train_size=20_000,
)

In [None]:
# Memory footprint of the training matrix in GiB (bytes / 1024**3).
X_train.nbytes / 1024**3

In [None]:
# Display the dimensions of the training split.
X_train.shape

In [None]:
def _get_cnts_cols(X):
    cols = range(X.shape[1])
    uniqs = [np.unique(X[:, c]) for c in cols]
    cnts = [c for c, _u in zip(cols, uniqs) if len(_u) > 2]
    discrete_cols = [c for c, _u in zip(cols, uniqs) if len(_u) == 2]
    return cnts, discrete_cols

def normalize(X, scale, cnts, discrete):
    """Assemble the model matrix: 0/1 indicator columns first, scaled columns after.

    Parameters
    ----------
    X : 2-D array
        Input matrix.
    scale : object with a ``transform`` method
        Applied to the ``cnts`` columns of ``X``.
    cnts : list of int
        Column indices passed through ``scale.transform``.
    discrete : list of int
        Column indices converted to 0/1 indicators.

    Returns
    -------
    2-D array with ``len(discrete) + len(cnts)`` columns, indicators on the left.
    """
    # The bool round-trip maps every non-zero entry to 1; one element in the
    # data is 30 rather than 0/1.
    indicators = X[:, discrete].astype(bool).astype(int)
    scaled = scale.transform(X[:, cnts])
    return np.hstack((indicators, scaled))

# Classify the columns and fit the scaler on the training split only; the test
# split is then transformed with the training statistics.
# NOTE: this cell overwrites X_train / X_test, so it is not idempotent. To run
# it again, first re-run the train_test_split cell.
cnts, discrete = _get_cnts_cols(X_train)
scale = StandardScaler().fit(X_train[:, cnts])

# Shape of the retained columns before normalization. It is computed from the
# index lists rather than from X_train[:, cnts + discrete].shape, because the
# fancy-indexed expression copies the selected columns of the whole training
# matrix just to read a shape.
print((X_train.shape[0], len(cnts) + len(discrete)))
X_train = normalize(X_train, scale, cnts, discrete)
X_test = normalize(X_test, scale, cnts, discrete)
print(X_train.shape)

# Sanity check: normalize() puts the indicator columns first, and together
# they must contain exactly two distinct values, both within [0, 1].
uniqs = np.unique(X_train[:, :len(discrete)])
assert len(uniqs) == 2 and 0 <= uniqs.min() <= uniqs.max() <= 1

In [None]:
# Display the distinct values found in the indicator columns by the previous cell.
uniqs

In [None]:
# Display the training matrix dimensions after normalize().
X_train.shape

In [None]:
# Display the test matrix dimensions after normalize().
X_test.shape

In [None]:
from copy import copy
from pprint import pprint
import itertools
import pandas as pd

def train(damper, X_train=None, y_train=None, X_test=None, y_test=None, max_iter=200, ident="foo"):
    """Fit ``damper`` with repeated ``partial_fit`` calls, scoring after each one.

    Parameters
    ----------
    damper : object
        Must provide ``initialize()``, ``partial_fit(X, y)``,
        ``score(X, y, return_dict=True, prefix=...)``, ``get_params()`` and a
        ``meta_`` mapping that contains ``"num_examples"`` after ``partial_fit``.
    X_train, y_train, X_test, y_test
        Training and test data. Despite the ``None`` defaults all four are
        required: ``len()`` is taken of both label arrays.
    max_iter : number
        Epoch budget. The loop stops once
        ``meta_["num_examples"] / len(y_train)`` reaches this value, so it is
        not a count of ``partial_fit`` calls.
    ident : str
        Label stored in every record and used in the checkpoint filename.

    Returns
    -------
    list of dict
        One record for the freshly initialized model followed by one record
        per ``partial_fit`` call. Each record holds the call count, epochs,
        ``ident``, run metadata, test/train scores and a snapshot of
        ``damper.meta_``.

    Side effects
    ------------
    Prints progress on every call and writes ``tmp-{ident}-test-data.csv``
    (relative to the working directory of the process running this function)
    on the first call, every 100th call, every call when ``ident == "gd"``,
    and on the final call.
    """
    damper.initialize()
    test_score = damper.score(X_test, y_test, return_dict=True, prefix="test_")
    train_score = damper.score(X_train, y_train, return_dict=True, prefix="train_")
    meta = {
        "train_eg": len(y_train),
        "test_eg": len(y_test),
        "max_iter": max_iter,
        "damper_name": type(damper).__name__.lower(),
        **damper.get_params(),
    }
    checkpoint_path = f"tmp-{ident}-test-data.csv"
    # Keys echoed to stdout after every call (those present in the record).
    shown_keys = [
        "name", "epochs", "model_updates",
        "test_acc", "test_loss", "ident",
    ]

    # The initial record carries "epochs" and "ident" like every later record,
    # so the first CSV row is not NaN in those columns.
    # NOTE(review): assumes meta_ either lacks "num_examples" right after
    # initialize() or reports the examples seen so far — confirm against tune.py.
    data = [{
        "partial_fit_calls": 0,
        "epochs": damper.meta_.get("num_examples", 0) / meta["train_eg"],
        "ident": ident,
        **test_score,
        **train_score,
        **meta,
        **damper.meta_,
    }]
    print("ident =", ident)
    pprint({k: data[-1][k] for k in ["test_acc", "train_acc", "test_loss", "train_loss"]})

    for k in itertools.count():
        damper.partial_fit(X_train, y_train)
        test_score = damper.score(X_test, y_test, return_dict=True, prefix="test_")
        train_score = damper.score(X_train, y_train, return_dict=True, prefix="train_")
        # Unpacking meta_ into a new dict already snapshots its current
        # top-level values, so later partial_fit calls cannot alter this record.
        datum = {
            "partial_fit_calls": k + 1,
            "epochs": damper.meta_["num_examples"] / meta["train_eg"],
            "ident": ident,
            **meta,
            **test_score,
            **train_score,
            **damper.meta_,
        }
        print({key: datum[key] for key in shown_keys if key in datum})
        data.append(datum)

        finished = datum["epochs"] >= max_iter
        # Also checkpoint on the final call; otherwise the file on disk could
        # be missing up to 99 trailing records.
        if finished or ident == "gd" or k % 100 == 0:
            pd.DataFrame(data).to_csv(checkpoint_path)
        if finished:
            break
    return data

In [None]:
from copy import copy

# Hyper-parameters shared by every optimizer configuration below. Each derived
# dict unpacks its parent with ** (which builds a new dict) and then overrides
# or adds keys, so the parent dicts are never mutated.
base = {
    "lr": 0.9e-3,
    "max_batch_size": 256,
    "weight_decay": 1e-6,
    "momentum": 0.9,
    "seed": 33,
}

pada_params = {
    **base,
    "batch_growth_rate": 0.08192397984251328,
    "dwell": 5,
    "initial_batch_size": 256,
    "max_batch_size": 2048,
}

# Same as pada_params, with a different growth rate and the batch size allowed
# to reach the full training set.
hsgd_params = {
    **pada_params,
    "batch_growth_rate": 0.011471883405287283,
    "max_batch_size": len(X_train),
}

# pada_params minus the two batch-size bounds, plus a static batch size. Built
# with a filtered dict rather than a trailing pop() loop, which assigned to `_`
# (IPython's last-output variable) and left `k` behind as a global.
padalr_params = {
    **{
        key: value
        for key, value in pada_params.items()
        if key not in ("initial_batch_size", "max_batch_size")
    },
    "static_batch_size": 256,
}

# Batch size equal to the training-set size. Derived from X_train (as in
# hsgd_params) instead of a hard-coded 200_000, so it follows train_size.
gd_params = {
    **base,
    "max_batch_size": len(X_train),
}

asgd_params = {
    **base,
    "opt": "asgd",
}

geodamp_params = {
    **base,
    "dampingdelay": 200,
    "dampingfactor": 6.604777577905901,
    "initial_batch_size": 256,
    "max_batch_size": 4096,
}

In [None]:
# Display the resolved HSGD configuration.
hsgd_params

In [None]:
from tune import GD, PadaDamp, Damper, PadaDampLR, HSGD, GeoDamp

# Build one optimizer object per configuration dict. Only the objects placed
# in the `dampers` dict of the submission cell are actually trained.
gd = GD(name="gd", **gd_params)
pada = PadaDamp(name="pada", **pada_params)
hsgd = HSGD(name="hsgd", **hsgd_params)
padalr = PadaDampLR(name="padalr", **padalr_params)
# asgd uses the Damper class directly, selected via opt="asgd" in asgd_params.
asgd = Damper(name="asgd", **asgd_params)
geo = GeoDamp(name="geo", **geodamp_params)

In [None]:
from distributed import as_completed

# Arrays handed to train(); the keys match train()'s keyword parameter names.
dataset = dict(
    X_train=X_train,
    y_train=y_train,
    X_test=X_test,
    y_test=y_test,
)

# Pass each array through client.scatter once and give the results to
# client.submit below in place of the raw arrays.
dask_dataset = {k: client.scatter(v) for k, v in dataset.items()}

# train() compares this value against epochs (examples seen / training-set
# size), so it is an epoch budget rather than a number of partial_fit calls.
max_iter = 2000
# max_iter = 200
# Alternative sets of runs kept for reference; the uncommented line selects
# which dampers are trained.
# dampers = {"pada": pada, "padalr": padalr, "asgd": asgd}
# dampers = {"hsgd": hsgd}
dampers = {"geo": geo}

# Submit one train() call per selected damper and keep the returned futures.
futures = []
for ident, damper in dampers.items():
    print(ident)
    future = client.submit(train, damper, **dask_dataset, max_iter=max_iter, ident=ident)
    futures.append(future)

# Optional gradient-descent run with a 120x larger epoch budget (disabled).
# future = client.submit(train, gd, **dask_dataset, max_iter=max_iter * 120, ident="gd")
# futures.append(future)


In [None]:
# Display the list of submitted futures.
futures

In [None]:
# Number of submitted training runs.
len(futures)

In [None]:
import pandas as pd

# Collect the runs in completion order, show each run's last record and save
# its full history to CSV.
for k, future in enumerate(as_completed(futures)):
    data = future.result()
    print(k)
    pprint(data[-1])
    # Name the file after the run's own "ident" (stored by train() in every
    # per-call record) instead of a hard-coded "geo", so results from other
    # dampers are not mislabeled.
    pd.DataFrame(data).to_csv(f"tmp-{data[-1]['ident']}-{k}-test-data.csv", index=False)

In [None]:
# Shell escape: run `open .` in the notebook's working directory.
# NOTE(review): `open` is presumably the macOS launcher (opens the folder in
# Finder); this line is likely to fail or behave differently elsewhere — confirm.
!open .