In [19]:
import time
import matplotlib.pyplot as plt
import numpy as np

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.utils import check_random_state

from utils.mnist_reader import load_mnist
X_main, y_main = load_mnist("data/fashion", kind="train")
X_main = X_main.astype(np.float32)
y_main = y_main.astype(np.float32)

X_test, y_test = load_mnist("data/fashion", kind="t10k")
X_test = X_test.astype(np.float32)
y_test = y_test.astype(np.float32)

print (X_main.shape, y_main.shape)
print (X_test.shape, y_test.shape)

(60000, 784) (60000,)
(10000, 784) (10000,)


### Standard scaling the pixel values with mean=0.0 and var=1.0

In [20]:
sc = StandardScaler()
X_main_std = sc.fit_transform(X_main)
X_test_std = sc.fit_transform(X_test)

### Splitting the train dataset into train and validation sets

In [21]:
X_train, X_valid, y_train, y_valid = train_test_split(X_main_std, y_main, test_size=0.1)

### Training XgBoost classifier on fashion-mnist dataset

In [5]:
import xgboost as xgb
param_list = [("eta", 0.08), ("max_depth", 6), ("subsample", 0.8), ("colsample_bytree", 0.8), ("objective", "multi:softmax"), ("eval_metric", "merror"), ("alpha", 8), ("lambda", 2), ("num_class", 10)]
n_rounds = 10
early_stopping = 2

d_train = xgb.DMatrix(X_train, label=y_train)
d_val = xgb.DMatrix(X_valid, label=y_valid)
eval_list = [(d_train, "train"), (d_val, "validation")]

In [17]:
global bst

%time bst = xgb.train(param_list, d_train, n_rounds, evals=eval_list, early_stopping_rounds=early_stopping, verbose_eval=True)

[0]	train-merror:0.178463	validation-merror:0.202333
Multiple eval metrics have been passed: 'validation-merror' will be used for early stopping.

Will train until validation-merror hasn't improved in 2 rounds.
[1]	train-merror:0.160222	validation-merror:0.185167
[2]	train-merror:0.151685	validation-merror:0.182333
[3]	train-merror:0.147611	validation-merror:0.1755
[4]	train-merror:0.14663	validation-merror:0.177167
[5]	train-merror:0.14437	validation-merror:0.173667
[6]	train-merror:0.141815	validation-merror:0.169333
[7]	train-merror:0.139352	validation-merror:0.166333
[8]	train-merror:0.138593	validation-merror:0.167333
[9]	train-merror:0.137519	validation-merror:0.167667
Stopping. Best iteration:
[7]	train-merror:0.139352	validation-merror:0.166333

CPU times: user 3min 44s, sys: 740 ms, total: 3min 44s
Wall time: 58.5 s


### Training the XgBoost classifier on MNIST dataset

### Predicting with trained classifiers

In [19]:
d_test = xgb.DMatrix(data=X_test_std)
y_pred = bst.predict(d_test)

### Checking accuracy for fashion and MNIST datasets respectively

In [20]:
np.sum(y_pred == y_test) / y_test.shape

array([0.835])

In [22]:
X_test_std

array([[-0.01341814, -0.02066268, -0.02378437, ..., -0.16669004,
        -0.09440119, -0.03502946],
       [-0.01341814, -0.02066268, -0.02378437, ..., -0.16669004,
        -0.09440119, -0.03502946],
       [-0.01341814, -0.02066268, -0.02378437, ..., -0.16669004,
        -0.09440119, -0.03502946],
       ...,
       [-0.01341814, -0.02066268, -0.02378437, ..., -0.16669004,
        -0.09440119, -0.03502946],
       [-0.01341814, -0.02066268, -0.02378437, ..., -0.16669004,
        -0.09440119, -0.03502946],
       [-0.01341814, -0.02066268, -0.02378437, ..., -0.16669004,
        -0.09440119, -0.03502946]], dtype=float32)

## Using Dask...

In [25]:
from dask.distributed import Client
client = Client('dask-scheduler:8786')  # connect to cluster


In [None]:
client

In [None]:
global bst2

import dask_xgboost as dxgb

import dask.array as da 
da_x_train = da.from_array(X_train, chunks=1000)
da_y_train = da.from_array(y_train, chunks=1000)
# d_train = xgb.DMatrix(X_train, label=y_train)
# d_val = xgb.DMatrix(X_valid, label=y_valid)
# eval_list = [(d_train, "train"), (d_val, "validation")]
# bst = dxgb.train(client, params, df_train, labels_train)

%time bst2 = dxgb.train(client, param_list, da_x_train, da_y_train, n_rounds, evals=eval_list, early_stopping_rounds=early_stopping, verbose_eval=True)

In [None]:
import dask.array as da 

da.from_array(X_train, chunks=1000)

In [32]:
y_train

array([4., 2., 1., ..., 5., 7., 9.], dtype=float32)

In [17]:
from dask_xgboost.core import parse_host_port 
host, port = parse_host_port(client.scheduler.address)

def update_hosts3():
#     import os
    import logging
    import dask_xgboost.core as dc
    log = logging.getLogger('distributed.scheduler')
    log.info(dc.parse_host_port)
    print(dc.parse_host_port)
    def parse_host_port(address):
        if '://' in address:
            address = address.rsplit('://', 1)[1]
        host, port = address.split(':')
        port = int(port)
        log.info("override host/port")
        return '0.0.0.0', port
    dc.parse_host_port = parse_host_port
#     setattr(dc, 'parse_host_port', parse_host_port)
    log.info(dc.parse_host_port)
    from dask_xgboost.core import parse_host_port
    log.info(parse_host_port)
#     os.system('sudo echo 0.0.0.0 {} > /etc/hosts'.format(host))
    
def install_fixed_xgboost():
    import os
    os.system('pip install git+https://github.com/javabrett/dask-xgboost@master')
    
def stuff():
    import logging
    from dask_xgboost.tracker import get_host_ip
    log = logging.getLogger('distributed.scheduler')
    log.info(get_host_ip('auto'))
    return get_host_ip('auto')
    
client._run_on_scheduler(stuff)



# parse_host_port(client.scheduler.address)
# setattr(dask_xgboost.core, 'parse_host_port', parse_host_port)

<Future pending cb=[coroutine.<locals>.wrapper.<locals>.<lambda>() at /opt/conda/lib/python3.7/site-packages/tornado/gen.py:226]>