In [None]:
! pip install https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.9.0.dev0-cp36-cp36m-manylinux1_x86_64.whl
! pip install gputil ray[tune]

In [None]:
import cudf
import numpy as np
import pandas as pd
import pickle
from datasets import prepare_dataset
from sklearn.metrics import accuracy_score


from cuml.ensemble import RandomForestClassifier as GPURandomForestClassifier

import ray
from ray import tune
from ray.tune.utils import pin_in_object_store, get_pinned_object

In [None]:

data = prepare_dataset("/data", "airline", None)
X_train, X_test, y_train, y_test = data.X_train, data.X_test, data.y_train, data.y_test
y_train = y_train.astype(np.int32)
y_test = y_test.astype(np.int32)

QUARTER = len(X_train) // 3
X_train = X_train[QUARTER:]
y_train = y_train[QUARTER:]

In [45]:
ray.shutdown()
ray.init()
# You may see an error message similar to: 
# "The dashboard on node 9928c3082e9d failed with the following error:"
# this is harmless.

data_id = pin_in_object_store([X_train, X_test, y_train, y_test])

Traceback (most recent call last):
  File "/opt/conda/envs/rapids/lib/python3.6/asyncio/base_events.py", line 1051, in create_server
    sock.bind(sa)
OSError: [Errno 99] Cannot assign requested address

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/envs/rapids/lib/python3.6/site-packages/ray/dashboard/dashboard.py", line 920, in <module>
    dashboard.run()
  File "/opt/conda/envs/rapids/lib/python3.6/site-packages/ray/dashboard/dashboard.py", line 368, in run
    aiohttp.web.run_app(self.app, host=self.host, port=self.port)
  File "/opt/conda/envs/rapids/lib/python3.6/site-packages/aiohttp/web.py", line 433, in run_app
    reuse_port=reuse_port))
  File "/opt/conda/envs/rapids/lib/python3.6/asyncio/base_events.py", line 473, in run_until_complete
    return future.result()
  File "/opt/conda/envs/rapids/lib/python3.6/site-packages/aiohttp/web.py", line 359, in _run_app
    await site.start()
  File "/opt/con

In [None]:
import os
from filelock import FileLock

class CUMLTrainable(tune.Trainable):
    def _setup(self, config):
        [X_train, X_test, y_train, y_test] = get_pinned_object(data_id)
        self._gpu_id = ray.get_gpu_ids()[0]
        print("Starting new trainable on {}.".format(self._gpu_id))
        self._wait_for_gpus()
        
        with FileLock(os.path.expanduser("~/.tune.gpulock")):
            X_cudf_train = cudf.DataFrame.from_pandas(X_train)
            self.train_mat = X_cudf_train.as_gpu_matrix(order="F")
            self._gpu_id = ray.get_gpu_ids()[0]
            del X_cudf_train
            self.X_cudf_test = cudf.DataFrame.from_pandas(X_test)
            self.y_cudf_train = cudf.Series(y_train.values)
            self.y_test = y_test
            config = {k: int(v) for k, v in config.items()}
            self.cuml_model = GPURandomForestClassifier(**config)

    def _train(self):
        self.cuml_model.fit(self.train_mat, self.y_cudf_train)
        fil_preds_orig = self.cuml_model.predict(self.X_cudf_test)
        accuracy = accuracy_score(self.y_test, fil_preds_orig)
        return {"mean_accuracy": accuracy}
    
    def _stop(self):
        import time
        import GPUtil
        gpu_object = GPUtil.getGPUs()[self._gpu_id]
        print("Deleting the model. Mem: {:0.3f}".format(gpu_object.memoryUsed))
        del self.cuml_model
        print("Deleting the test set. Mem: {:0.3f}".format(gpu_object.memoryUsed))
        del self.X_cudf_test
        print("Deleting the test labels. Mem: {:0.3f}".format(gpu_object.memoryUsed))
        del self.y_test
        print("Deleting the training labels. Mem: {:0.3f}".format(gpu_object.memoryUsed))
        del self.y_cudf_train
        print("Deleting the training matrix. Mem: {:0.3f}".format(gpu_object.memoryUsed))
        del self.train_mat
#         self._wait_for_gpus(retry=1)
        
        
    def _wait_for_gpus(self, retry=10):
        import GPUtil
        import time
        gpu_object = GPUtil.getGPUs()[self._gpu_id]
        for i in range(int(retry)):
            if gpu_object.memoryUsed > 0.1:
                print("Waiting for GPU memory to free. Mem: {:0.3f}".format(gpu_object.memoryUsed))
                time.sleep(5)
        time.sleep(5)

    def reset_config(self, config):
        del self.cuml_model
        config = {k: int(v) for k, v in config.items()}
        self.cuml_model = GPURandomForestClassifier(**config)
        return True

In [None]:
from hyperopt import hp
from ray.tune.suggest.hyperopt import HyperOptSearch

search_alg = HyperOptSearch(
    space={
        "n_estimators": hp.loguniform("n_estimators", 2, 5),
        "max_depth": hp.uniform("max_depth", 2, 15),
        "min_impurity_decrease":  hp.loguniform("min_impurity_decrease", -5, -1)
    },
    metric="mean_accuracy",
    mode="max",
    n_initial_points=8,
    max_concurrent=4,
    points_to_evaluate=[{"n_estimators": 100, "max_depth": 16, "min_impurity_decrease": 0}]
)

analysis = tune.run(
    CUMLTrainable,
    resources_per_trial={"gpu": 1},
    num_samples=50,
    stop={"training_iteration": 1},
    max_failures=0,
    search_alg=search_alg
)
