![rapids motivation](images/rapids_motivation.png)


# <center>Scaling Hyper-Parameter Optimization with RAPIDS + Dask + [ Kubernetes ]</center>
-----

**Motivation:**

To reach the highest performance in classification tasks (i.e., supervised learning), it is best practice to build an ensemble of champion models.

Each member of the ensemble is a winner of a search over many models of its kind with altered hyper-parameters.

In this notebook, we build a harness for running such a [hyper-parameter] search to demonstrate the accuracy benefits while exploring performance as we scale within and across GPU nodes.

**Choices:**
You can make several key choices when running this notebook:

1. Dataset
2. Model Type & Search Bounds
3. Search Strategy
   * Particle Swarm - Fully Asynchronous or Sync at Epochs
   * Random Search
   * Grid Search
4. Compute



<center> In this notebook you can try different hyper-parameter search methods using synthetic or real data. </center>
 
&nbsp;

| method name | &nbsp;&nbsp;&nbsp; performance | &nbsp;&nbsp;&nbsp; search duration  |
|-----------------------|-----------------|------------------|
| random-search         | &nbsp;&nbsp;&nbsp; worst | &nbsp;&nbsp;&nbsp; slow    |
| particle-search [1]      | &nbsp;&nbsp;&nbsp; good  | &nbsp;&nbsp;&nbsp; fast    |
| async-particle-search | &nbsp;&nbsp;&nbsp; best  | &nbsp;&nbsp;&nbsp; fastest |

<center>[1] https://en.wikipedia.org/wiki/Particle_swarm_optimization#Algorithm</center>


# Table of Contents

1. [ Generate a classification dataset on GPU ](#data-load) (e.g., double helix, unwinding helix/whirl )

2. [ ETL - process/prepare data for model training ](#ETL) (e.g., scale, split, augment )   
    
3. [ Define HPO Strategy ](#define-hpo)

4. [ Create Compute Cluster ](#compute-cluster)
   > LocalCUDACluster or KubeCluster
      
5. [ Define Search ](#define-search)

6. [ Run ASYNC Particle Swarm ](#run-async-PSO)

7. [ Run Classic Particle Swarm ](#run-classic-PSO)

8. [ Run Random Search Baseline ](#run-random-search)

9. [ Summary ](#summary)

In [1]:
import warnings
warnings.filterwarnings('ignore')

In [2]:
import ipyvolume as ipv
import matplotlib.pyplot as plt

import numpy as np; import pandas as pd; import cudf
import cuml; import xgboost; from xgboost import plot_tree

import time; import copy 

import data_utils               # load datasets (or generate data) on the gpu
import swarm                    # particle swarm implementation
import visualization as viz     # visualization

In [12]:
# reload library modules/code without a kernel restart
import importlib; importlib.reload( swarm ); importlib.reload( data_utils ); importlib.reload( viz);

> TODO: update viz

> TODO: prune swarm.py, data_utils.py, visualization.py

> TODO: main.py / command-line

> TODO: run_local_experiment.py -> scaling_experiments.py

In [4]:
import dask
from dask import delayed
from dask_cuda import LocalCUDACluster

from dask.distributed import Client
from dask.distributed import as_completed
from dask.distributed import worker

In [5]:
paramRanges = { 0: ['max_depth', 3, 20, 'int'],
                1: ['learning_rate', .001, 1, 'float'],
                2: ['gamma', 0, 2, 'float'] }
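
Each entry of `paramRanges` has the layout `[name, lower bound, upper bound, type]`. The helper `swarm.sample_params` that consumes it is not shown in this notebook, so the following is only a minimal sketch of how an initial position and velocity could be drawn from these ranges. The function name `sample_params_sketch` and the `velocity_scale` argument are hypothetical, and the uniform sampling is an assumption rather than the library's confirmed behavior.

```python
import random

# same layout as paramRanges above: index -> [name, lower bound, upper bound, type]
param_ranges = {0: ['max_depth', 3, 20, 'int'],
                1: ['learning_rate', .001, 1, 'float'],
                2: ['gamma', 0, 2, 'float']}

def sample_params_sketch(param_ranges, rng, velocity_scale=0.1):
    """Hypothetical stand-in for swarm.sample_params (assumed uniform sampling)."""
    positions, velocities = [], []
    for i in sorted(param_ranges):
        _, lower, upper, _ = param_ranges[i]
        span = upper - lower
        # position: uniform inside the search bounds
        positions.append(rng.uniform(lower, upper))
        # velocity: a small random fraction of the range, in either direction
        velocities.append(rng.uniform(-1.0, 1.0) * span * velocity_scale)
    return positions, velocities

rng = random.Random(0)
pos, velo = sample_params_sketch(param_ranges, rng)
```

Positions stay continuous here; casting `max_depth` to an integer is deferred to evaluation time, which matches how `enforce_type` is applied inside `evaluate_particle`.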

In [None]:
class Dataset():    
    def __init__(self, datasetName = 'synthetic', nSamples = None):
        self.datasetName = datasetName
        
        if self.datasetName == 'synthetic':
            
            if nSamples is None: nSamples = 1000000
            data, labels, elapsedTime  = data_utils.generate_dataset( coilType = 'helix', nSamples = nSamples)
            self.trainObjective = ['binary:hinge', None]

        elif self.datasetName == 'fashion-mnist':

            data, labels, elapsedTime = data_utils.load_fashion_mnist () 
            self.trainObjective = ['multi:softmax', 10]

        elif self.datasetName == 'airline':
            
            if nSamples is None: nSamples = 5000000
            data, labels, elapsedTime = data_utils.load_airline_dataset ( 'data/', np.min( ( nSamples, 115000000 )))
            self.trainObjective = ['binary:hinge', None]
        
        # split train and test data
        self.trainData, self.trainLabels, self.testData, self.testLabels = self.split_train_test ( data, labels )
        
        # apply standard scaling
        trainMeans, trainSTDevs, _ = data_utils.scale_dataframe_inplace ( self.trainData )
        _,_,_ = data_utils.scale_dataframe_inplace ( self.testData, trainMeans, trainSTDevs )
        
    def split_train_test(self, data, labels, trainSize = .75, trainTestOverlap = .025 ):
        if self.datasetName == 'synthetic':            
            trainData, trainLabels, testData, testLabels, _ = data_utils.split_train_test_nfolds ( data, labels, trainTestOverlap = trainTestOverlap )
        else:
            trainData, testData, trainLabels, testLabels = cuml.train_test_split( data, labels, shuffle = False, train_size= trainSize )        
        return trainData, trainLabels, testData, testLabels

In [None]:
class Particle():
    def __init__ (self, pos, velo, particleID = -1 ):
        self.pos = pos
        self.velo = velo
        self.pID = particleID
        self.personalBestParams = pos
        self.personalBestPerf = 0
        self.posHistory = []
        self.evalTimeHistory = []
        self.nEvals = 0

In [None]:
class Swarm():
    def __init__ ( self, client, dataset, paramRanges, nParticles = 10, nEpochs = 10 ):
        
        self.client = client
        self.dataset = dataset
        self.paramRanges = paramRanges
        
        self.nParticles = nParticles
        self.nEpochs = nEpochs
        self.reset_swarm()
        
    def reset_swarm( self ):
        self.nEvals = 0
        
        self.particles = {}
        self.delayedEvalParticles = []

        self.globalBest = {'accuracy': 0, 'particleID': -1, 'params': [], 'iEvaluation': - 1}
    
    def scatter_data_to_workers( self ):
        self.scatteredDataFutures = None
        if self.client is not None:
            self.scatteredDataFutures = self.client.scatter( [ self.dataset.trainData, self.dataset.trainLabels,
                                                               self.dataset.testData,  self.dataset.testLabels ], broadcast = True )
    
    def build_initial_particles( self ):
        self.delayedEvalParticles = []
        for iParticle in range( self.nParticles ):
            pos, velo = swarm.sample_params ( self.paramRanges )
            self.particles[iParticle] = Particle( pos, velo, particleID = iParticle )
            self.delayedEvalParticles.append ( delayed ( evaluate_particle ) ( self.scatteredDataFutures,
                                                                               self.particles[iParticle].pos,
                                                                               self.paramRanges,
                                                                               self.particles[iParticle].pID,
                                                                               self.dataset.trainObjective ) )

    def enforce_bounds( self, newParameters ):
        for iParameter in range( len ( newParameters )):
            newParameters[iParameter] = np.clip ( newParameters[iParameter], self.paramRanges[iParameter][1], self.paramRanges[iParameter][2])
        return newParameters
    
    def update_particle (self, pID, latestTestDataPerf, evalTime, 
                         wMomentum = .1, wGlobalBest = .55, wPersonalBest = .35, 
                         mode = 'classification'):
        
        if latestTestDataPerf > self.globalBest['accuracy']:
            self.globalBest['accuracy'] = latestTestDataPerf
            self.globalBest['params'] = self.particles[pID].pos.copy()
            self.globalBest['particleID'] = pID
            print(f'new global best {latestTestDataPerf:0.5f} found by particle {pID}, at eval {self.nEvals}')
        
        if latestTestDataPerf > self.particles[pID].personalBestPerf:
            self.particles[pID].personalBestPerf = latestTestDataPerf
            self.particles[pID].personalBestParams = self.particles[pID].pos.copy()
            print(f'\t\t new personal best {latestTestDataPerf:0.5f} found by particle {pID}, at eval {self.nEvals}')
        
        # computing update terms for particle swarm
        inertiaInfluence = self.particles[pID].velo.copy()
        socialInfluence = ( self.globalBest['params'] - self.particles[pID].pos )
        individualInfluence = ( self.particles[pID].personalBestParams - self.particles[pID].pos )

        self.particles[pID].velo  =    wMomentum      *  inertiaInfluence     \
                                     + wPersonalBest  *  individualInfluence  * np.random.random()   \
                                     + wGlobalBest    *  socialInfluence      * np.random.random()

        self.particles[pID].pos = self.particles[pID].pos.copy() + self.particles[pID].velo
        self.particles[pID].pos = self.enforce_bounds( self.particles[pID].pos )
        
        self.particles[pID].posHistory.append( self.particles[pID].pos )
        self.particles[pID].nEvals += 1
        self.particles[pID].evalTimeHistory.append( evalTime )

In [None]:
class SyncSwarm ( Swarm ):
    def run_search( self ):
        self.reset_swarm ()
        self.scatter_data_to_workers ()
        self.build_initial_particles ()
        
        for iEpoch in range( self.nEpochs ):
            futureEvalParticles = self.client.compute( self.delayedEvalParticles )
            self.delayedEvalParticles = []
            
            for iParticleFuture in futureEvalParticles:
                testDataPerf, trainDataPerf, pID, evalTime = iParticleFuture.result()
                self.update_particle ( pID, testDataPerf, evalTime ) # inplace update to particle.pos, particle.velo
                
                self.nEvals += 1
                
                self.delayedEvalParticles.append ( delayed ( evaluate_particle ) ( self.scatteredDataFutures,
                                                                                   self.particles[pID].pos,
                                                                                   self.paramRanges,
                                                                                   self.particles[pID].pID,
                                                                                   self.dataset.trainObjective ) )
class AsyncSwarm ( Swarm ):
    def run_search( self ):
        self.reset_swarm ()
        self.scatter_data_to_workers ()
        self.build_initial_particles ()

        futureEvalParticles = self.client.compute( self.delayedEvalParticles )
        particleFutureSeq = as_completed( futureEvalParticles )
        
        # note that the particleFutureSeq is an iterator of futures
        # at the end of the loop we create new work and append it to the iterator, this behavior resembles a while loop
        # see dask documentation https://docs.dask.org/en/latest/futures.html#distributed.as_completed
        for particleFuture in particleFutureSeq: 
            testDataPerf, trainDataPerf, pID, evalTime = particleFuture.result()
            self.update_particle ( pID, testDataPerf, evalTime ) # inplace update to particle.pos, particle.velo

            self.nEvals += 1
            approximateEpoch = self.nEvals // self.nParticles
            if approximateEpoch > self.nEpochs: break
            
            delayedParticle = delayed ( evaluate_particle ) ( self.scatteredDataFutures,
                                                              self.particles[pID].pos,
                                                              self.paramRanges,
                                                              self.particles[pID].pID,
                                                              self.dataset.trainObjective )
            
            futureParticle = self.client.compute( delayedParticle )
            particleFutureSeq.add( futureParticle )
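
The difference between the two classes is when new work is submitted: `SyncSwarm` waits for a whole epoch, while `AsyncSwarm` resubmits a particle as soon as its result arrives. The resubmission pattern can be sketched with only the Python standard library. This is an analog built on `concurrent.futures`, not the Dask API used above; `toy_evaluate`, the one-dimensional positions, and the fixed evaluation budget are all assumptions made for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

def toy_evaluate(particle_id, position):
    """Hypothetical stand-in for evaluate_particle: sleep, then score the position."""
    time.sleep(0.001 * (particle_id + 1))   # particles finish at different times
    score = 1.0 - abs(position - 0.5)       # best possible score is at position 0.5
    return score, particle_id

def run_async_sketch(n_particles=4, n_epochs=3):
    budget = n_particles * n_epochs
    positions = {pid: pid / n_particles for pid in range(n_particles)}
    evals_per_particle = {pid: 0 for pid in range(n_particles)}
    n_evals, best_score = 0, 0.0
    with ThreadPoolExecutor(max_workers=n_particles) as pool:
        pending = {pool.submit(toy_evaluate, pid, pos) for pid, pos in positions.items()}
        n_submitted = n_particles
        while pending:
            # wake up as soon as any particle finishes, not when all of them do
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                score, pid = future.result()
                n_evals += 1
                evals_per_particle[pid] += 1
                best_score = max(best_score, score)
                if n_submitted < budget:
                    # move halfway toward 0.5 and resubmit without waiting for the others
                    positions[pid] += 0.5 * (0.5 - positions[pid])
                    pending.add(pool.submit(toy_evaluate, pid, positions[pid]))
                    n_submitted += 1
    return n_evals, best_score, evals_per_particle

n_evals, best_score, evals_per_particle = run_async_sketch()
```

Because fast particles are resubmitted immediately, they may accumulate more evaluations than slow ones, which is the effect the "Async Swarm Evaluations Per Particle" plot is meant to show.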

In [None]:
# xgboost parameters -- https://xgboost.readthedocs.io/en/latest/parameter.html
def evaluate_particle ( scatteredDataFutures, particleParams, paramRanges, particleID, trainObjective, earlyStoppingRounds = 25 ) :
    trainDataFuture = scatteredDataFutures[0]
    trainLabelsFuture = scatteredDataFutures[1]
    testDataFuture = scatteredDataFutures[2]
    testLabelsFuture = scatteredDataFutures[3]
        
    xgboostParams = {
        'tree_method': 'gpu_hist',
        'random_state': 0, 
    }
    
    # objective [ binary or multi-class ]
    xgboostParams['objective'] = trainObjective[0]
    if trainObjective[1] is not None: xgboostParams['num_class'] = trainObjective[1]
    
    def enforce_type( parameterValue, paramRange ):        
        if paramRange[3] == 'int': 
            return int( parameterValue )
        elif paramRange[3] == 'float':
            return float( parameterValue )
        
    # flexible parameters
    xgboostParams['max_depth'] = enforce_type( particleParams[0], paramRanges[0] )
    xgboostParams['learning_rate'] = enforce_type( particleParams[1], paramRanges[1] ) # shrinkage of feature weights after each boosting step
    xgboostParams['gamma'] = enforce_type( particleParams[2], paramRanges[2] ) # complexity control, range [0, Inf ]
    xgboostParams['num_boost_rounds'] = 2000
    
    startTime = time.time()

    trainDMatrix = xgboost.DMatrix( data = trainDataFuture, label = trainLabelsFuture )
    testDMatrix = xgboost.DMatrix( data = testDataFuture, label = testLabelsFuture )

    trainedModelGPU = xgboost.train( dtrain = trainDMatrix, evals = [(testDMatrix, 'test')], params = xgboostParams,
                                     num_boost_round = xgboostParams['num_boost_rounds'], 
                                     early_stopping_rounds = earlyStoppingRounds,
                                     verbose_eval = False )

    trainDataPerf = 1 - float( trainedModelGPU.eval(trainDMatrix).split(':')[1] )
    testDataPerf = 1 - float( trainedModelGPU.eval(testDMatrix).split(':')[1] )   

    elapsedTime = time.time() - startTime

    return testDataPerf, trainDataPerf, particleID, elapsedTime
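
The accuracy values returned above come from parsing the string produced by `Booster.eval` and subtracting the reported value from 1. That only yields an accuracy when the evaluation metric is an error rate, which is assumed here to be the case for the `binary:hinge` and `multi:softmax` objectives. A minimal sketch of that parsing step follows; the helper name and the example string are hypothetical, and the exact string format may differ between XGBoost versions.

```python
def accuracy_from_eval_string(eval_string):
    """Convert an '[iteration] name-metric:value' string into 1 - value."""
    metric_part, value_part = eval_string.rsplit(':', 1)
    # guard against metrics where 1 - value is not an accuracy (e.g. auc, logloss)
    if 'error' not in metric_part:
        raise ValueError(f'expected an error-rate metric, got: {metric_part!r}')
    return 1.0 - float(value_part)

# hypothetical example string, shaped like the output of Booster.eval
example = '[0]\teval-error:0.076380'
accuracy = accuracy_from_eval_string(example)
```

The explicit metric check is a design choice for the sketch: it makes a silent misreading of a non-error metric fail loudly instead.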

In [6]:
cluster = LocalCUDACluster( ip = '', n_workers = 4)

In [7]:
client = Client( cluster, asynchronous = True)

In [8]:
client

Client:  Scheduler: tcp://172.17.0.2:45177  Dashboard: http://172.17.0.2:8787/status
Cluster: Workers: 4  Cores: 4  Memory: 270.39 GB


In [13]:
#s = SyncSwarm( client, Dataset('synthetic'), paramRanges, nEpochs = 3 )
s = swarm.AsyncSwarm( client, data_utils.Dataset('synthetic'), paramRanges, nEpochs = 10 )

generating blobs; # points = 500000
generating blobs; # points = 500000
splitting data into training and test set
rescaling data
rescaling data


In [14]:
s.run_search()

new global best 0.22356 found by particle 7, at eval 0
		 new personal best 0.22356 found by particle 7, at eval 0
new global best 0.89590 found by particle 6, at eval 1
		 new personal best 0.89590 found by particle 6, at eval 1
new global best 0.92141 found by particle 1, at eval 2
		 new personal best 0.92141 found by particle 1, at eval 2
new global best 0.92362 found by particle 4, at eval 3
		 new personal best 0.92362 found by particle 4, at eval 3
		 new personal best 0.92168 found by particle 2, at eval 5
		 new personal best 0.28172 found by particle 5, at eval 6
new global best 0.92551 found by particle 8, at eval 7
		 new personal best 0.92551 found by particle 8, at eval 7
		 new personal best 0.91869 found by particle 6, at eval 8
		 new personal best 0.91586 found by particle 0, at eval 9
		 new personal best 0.92333 found by particle 2, at eval 10
new global best 0.92803 found by particle 7, at eval 11
		 new personal best 0.92803 found by particle 7, at eval 11
		 new 

# Viz

In [None]:
nEvalList = []
evalTimeList = []
pIDList = []
for pID, particle in s.particles.items():
    nEvalList.append ( particle.nEvals )
    evalTimeList.append ( np.mean( particle.evalTimeHistory ) )
    pIDList.append( pID )
df = pd.DataFrame ( data = { 'nEvals' : nEvalList}, index = pIDList )

In [None]:
df.sort_values('nEvals', ascending=False).plot.bar( figsize = (15, 7), color=[ tuple(viz.rapidsColors[1]), tuple(viz.rapidsColors[0]) ] );
plt.xlabel('pID'); plt.ylabel('nEvals');
plt.title('Async Swarm Evaluations Per Particle');

In [None]:
ipv.figure()
for iParticle in range(len(s.particles)):    
    xyz=np.matrix(s.particles[iParticle].posHistory)
    ipv.scatter(xyz[:,0].squeeze(),xyz[:,1].squeeze(),xyz[:,2].squeeze(), size=2, marker='sphere', color=viz.rapidsColors[iParticle])
    ipv.plot(xyz[:,0].squeeze(),xyz[:,1].squeeze(),xyz[:,2].squeeze(), size=2, color=viz.rapidsColors[iParticle])
ipv.show()

# TODO: update animation

In [None]:

def viz_particle_movement( particleHistory ):
    
    sortedBarHeightsDF = sorted_eval_frequency_per_particle ( particleHistory )
    
    particleHistoryCopy = copy.deepcopy( particleHistory )
    
    paramRanges = particleHistoryCopy['paramRanges']
    particleColors = particleHistoryCopy['particleColors']
    nParticles = particleHistory['nParticles']
    initialParticleParams = particleHistoryCopy['initialParams']
    
    nAnimationFrames = max( sortedBarHeightsDF['nEvals'] )
    particleXYZ = np.zeros( ( nAnimationFrames, nParticles, 3 ) )
    lastKnownLocation = {}
    
    
    # TODO: bestIterationNTrees
    # particleSizes[ iFrame, iParticle ] = particleHistoryCopy[iParticle]['bestIterationNTrees'].pop(0).copy()
    
    for iFrame in range( nAnimationFrames ):
        for iParticle in range( nParticles ):
            if iParticle in particleHistoryCopy.keys():
                # particle exists in the particleHistory and it has parameters for the current frame
                if len( particleHistoryCopy[iParticle]['particleParams'] ):
                    particleXYZ[iFrame, iParticle, : ] = particleHistoryCopy[iParticle]['particleParams'].pop(0).copy()
                    lastKnownLocation[iParticle] = particleXYZ[iFrame, iParticle, : ].copy()
                else:
                    # particle exists but its params have all been popped off -- use its last known location
                    particleXYZ[iFrame, iParticle, : ] = lastKnownLocation[iParticle].copy()
                    
            else:
                # particle does not exist in the particleHistory
                if iParticle in lastKnownLocation.keys():
                    # particle has no params in current frame, attempting to use last known location
                    particleXYZ[iFrame, iParticle, : ] = lastKnownLocation[iParticle].copy()                    
                else:
                    # using initial params
                    particleXYZ[iFrame, iParticle, : ] = initialParticleParams[iParticle].copy()
                    lastKnownLocation[iParticle] = particleXYZ[iFrame, iParticle, : ].copy()                    
        
    ipv.figure()
    
    colorStack = np.random.random( ( nParticles, 3) )
    scatterPlots = ipv.scatter( particleXYZ[:, :,0], 
                                particleXYZ[:, :,1], 
                                particleXYZ[:, :,2], 
                                marker='sphere', 
                                size=5,
                                color = particleColors )
    
    ipv.animation_control( [ scatterPlots ] , interval = 400 )
    ipv.xlim( paramRanges[0][1]-.5, paramRanges[0][2]+.5 )
    ipv.ylim( paramRanges[1][1]-.1, paramRanges[1][2]+.1 )
    ipv.zlim( paramRanges[2][1]-.1, paramRanges[2][2]+.1 )
    
    ipv.show()    

<a id = 'data-load'></a>
----
# 1. Load/Generate Data 
----

This notebook works with a dataset (data + class labels) that can either be generated or loaded.

Note that you are also welcome to bring your own dataset. The available datasets we provide include:

| Name                                                      | Default Samples &nbsp;&nbsp;&nbsp;| Max Samples &nbsp;&nbsp;&nbsp;| Columns | Accuracy Before HPO  | Accuracy After HPO |
|-----------------------------------------------------------|-----------------|-------------|---------|-------| -------|
| synthetic helix/whirl &nbsp;&nbsp;&nbsp;                  | 1M              | Inf         | 3       | .25 | .95 |
| [fashion-mnist](https://github.com/zalandoresearch/fashion-mnist)    | 60K              | 60K         | 784      | .86 | .90 |
| [airline](http://kt.ijs.si/elena_ikonomovska/data.html)   | 5M              | 115M        | 13      | .81 | .86 |

<!--- | [higgs](https://archive.ics.uci.edu/ml/datasets/HIGGS)    | 1M              | 11M         | 28      | .69 | .75 | -->


In addition to specifying a dataset, you can also choose the number of samples that will be loaded/generated. A smaller sample count keeps the demo short and helps you stay within GPU memory limits.



In [None]:
# pick one dataset: the last assignment executed wins, so comment out or reorder the others
datasetName = 'airline'; nSamples = 5000000
datasetName = 'synthetic'; nSamples = 1000000
datasetName = 'fashion-mnist'; nSamples = 60000

In [None]:
if datasetName == 'synthetic':
        
    data, labels, elapsedTime  = data_utils.generate_dataset( coilType = 'helix', nSamples = nSamples)

elif datasetName == 'fashion-mnist':
    
    data, labels, elapsedTime = data_utils.load_fashion_mnist () 

elif datasetName == 'airline':
    
    nSamplesToLoad = np.min( ( nSamples, 115000000 ))    
    data, labels, elapsedTime = data_utils.load_airline_dataset ( 'data/', nSamplesToLoad)

'''
elif datasetName == 'higgs-kaggle':
    nSamplesToLoad = np.min( ( nSamples, 250000 ))
    data, labels, elapsedTime = data_utils.load_higgs_kaggle ()
    
elif datasetName == 'higgs':
    nSamplesToLoad = np.min( ( nSamples, 11000000 ))
    data, labels, elapsedTime = data_utils.load_higgs_dataset ( 'data/', nSamplesToLoad)
'''        

print(f'dataset shape: {data.shape}\n > loaded in {elapsedTime} s')

## 1.1) Plot Dataset

In the case of the synthetic dataset, the feature dimensionality is 3, so we do not need to do any reduction prior to 3D plotting.

For real datasets with many features, we first apply a dimensionality reduction method prior to plotting.

In the cells below we demonstrate several approaches to dimensionality reduction available in RAPIDS [ cuml ] :

* PCA (linear, unsupervised), 
* TSNE (non-linear, unsupervised), and 
* UMAP (non-linear, supervised)

In [None]:
if datasetName == 'synthetic':
    viz.plot_data( data, labels, datasetName )
else:
    viz.plot_data( data, labels, datasetName, dimReductionMethod = 'PCA', maxSamplesForDimReduction = 1000000)

In [None]:
if datasetName != 'synthetic':
    viz.plot_data( data, labels, datasetName, dimReductionMethod = 'UMAP', maxSamplesForDimReduction = 100000 )

In [None]:
if datasetName != 'synthetic':
    viz.plot_data( data, labels, datasetName, dimReductionMethod = 'TSNE', maxSamplesForDimReduction = 25000 )

<a id = 'ETL'></a>

# 2. ETL
-----

## 2.1) Split Data into Train & Test Sets

In [None]:
if datasetName == 'synthetic':
    trainTestOverlap = .025
    trainData, trainLabels, testData, testLabels, _ = data_utils.split_train_test_nfolds ( data, labels, trainTestOverlap = trainTestOverlap )
else:
    trainData, testData, trainLabels, testLabels = cuml.train_test_split( data, labels, shuffle = False, train_size=.70 )

## 2.2) Re-scale / Normalize

In [None]:
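def scale_dataframe_inplace ( targetDF, trainMeans = None, trainSTDevs = None ):    
    # avoid mutable default arguments: a shared default dict would carry statistics over between calls
    if trainMeans is None: trainMeans = {}
    if trainSTDevs is None: trainSTDevs = {}
    print('rescaling data')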
    sT = time.time()
    for iCol in targetDF.columns:
        
        # omit scaling label column
        if iCol == targetDF.columns[-1] == 'label': continue
            
        # compute means and standard deviations for each column [ should skip for test data ]
        if iCol not in trainMeans.keys() and iCol not in trainSTDevs.keys():            
            trainMeans[iCol] = targetDF[iCol].mean()
            trainSTDevs[iCol] = targetDF[iCol].std()
            
        # apply scaling to each column
        targetDF[iCol] = ( targetDF[iCol] - trainMeans[iCol] ) / ( trainSTDevs[iCol] + 1e-10 )
        
    return trainMeans, trainSTDevs, time.time() - sT

In [None]:
# apply standard scaling
trainMeans, trainSTDevs, t_scaleTrain = scale_dataframe_inplace ( trainData )
_,_, t_scaleTest = scale_dataframe_inplace ( testData, trainMeans, trainSTDevs ) 
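
The two calls above follow the usual standard-scaling rule: means and standard deviations are computed on the training data only and then reused for the test data, so no test-set statistics leak into preprocessing. A minimal CPU sketch of the same idea on plain Python lists is shown below. The helper names `fit_scaler` and `apply_scaler` are hypothetical, and the sample standard deviation is assumed to match the default of the dataframe `std()` used in the notebook.

```python
import statistics

def fit_scaler(column):
    """Return (mean, sample standard deviation) computed from training values only."""
    return statistics.mean(column), statistics.stdev(column)

def apply_scaler(column, mean, stdev, eps=1e-10):
    """Standardize values with previously computed statistics (eps avoids division by zero)."""
    return [(x - mean) / (stdev + eps) for x in column]

train = [1.0, 2.0, 3.0, 4.0, 5.0]
test = [3.0, 6.0]

train_mean, train_stdev = fit_scaler(train)
train_scaled = apply_scaler(train, train_mean, train_stdev)
# the test column reuses the training statistics instead of computing its own
test_scaled = apply_scaler(test, train_mean, train_stdev)
```

A test value larger than anything seen in training (6.0 here) simply lands outside the scaled training range, which is the expected behavior.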

## 2.3) Train vs Test Data Comparison

In [None]:
viz.plot_data( trainData, trainLabels, datasetName, dimReductionMethod = 'UMAP', maxSamplesForDimReduction = 100000 )

In [None]:
viz.plot_data( testData, testLabels, datasetName, dimReductionMethod = 'UMAP', maxSamplesForDimReduction = 100000 )

In [None]:
trainData.shape, testData.shape

# Single Model Run 

In [None]:
trainData.shape, testData.shape, 

In [None]:
# xgboost parameters -- https://xgboost.readthedocs.io/en/latest/parameter.html
paramsGPU = { 
    'max_depth': 5,               
    'tree_method': 'gpu_hist',
    'random_state': 0, 
    'learning_rate': 0.3,         # shrinkage of feature weights after each boosting step
    'gamma': 0,                   # complexity control, range [0, Inf ]
    'subsample': 1,
    'min_child_weight': 1
}
# fixed parameters
nTrees = 10

nClasses = labels[labels.columns[0]].nunique()
if nClasses == 2:
    paramsGPU['objective'] = 'binary:hinge'
else:
    paramsGPU['objective'] = 'multi:softmax'
    paramsGPU['num_class'] = nClasses

startTime = time.time()

trainDMatrix = xgboost.DMatrix( data = trainData, label = trainLabels )
testDMatrix = xgboost.DMatrix( data = testData, label = testLabels )

trainedModelGPU = xgboost.train( dtrain = trainDMatrix, evals = [(testDMatrix, 'test')], params = paramsGPU,
                                 num_boost_round = nTrees, verbose_eval = False )

trainAccuracy = 1 - float( trainedModelGPU.eval(trainDMatrix).split(':')[1] )
testAccuracy = 1 - float( trainedModelGPU.eval(testDMatrix).split(':')[1] )   

predictionsGPU = trainedModelGPU.predict( testDMatrix ).astype(int)

elapsedTime = time.time() - startTime

In [None]:
paramsGPU

In [None]:
print(f' test accuracy: {testAccuracy:.3f},\n train accuracy: {trainAccuracy:.3f},\n elapsed time: {elapsedTime:.3f} s')

### Visualize a sample tree

In [None]:
fig = plt.figure(figsize=(100,100))
plot_tree(trainedModelGPU, num_trees=0, ax=plt.subplot(1,1,1))

In [None]:
viz.plot_data( testData, testLabels, datasetName, predictionsGPU, dimReductionMethod = 'UMAP')

<a id = 'define-hpo'></a>

# 3. Define HPO Strategy
-----

### Particle Swarm : 
> Particles are randomly initialized, and change their position in hyper-parameter space using three terms 
1. momentum
2. personal best 
3. global best
<center>    
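$ \text{velo}_{t+1} = w_{\text{momentum}} \cdot \text{velo}_{t} + w_{\text{personal}} \cdot (\text{pos}_{\text{personal-best}} - \text{pos}_{t}) + w_{\text{global}} \cdot (\text{pos}_{\text{global-best}} - \text{pos}_{t}) $
   $ \text{pos}_{t+1} = \text{pos}_{t} + \text{velo}_{t+1} $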
</center>
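
As a worked instance of the update rule, here is a minimal pure-Python sketch of one particle step. It includes the random factors that the notebook's `update_particle` code multiplies into the personal-best and global-best terms, and it borrows the default weights of `Swarm.update_particle` (.1, .35, .55). The function name `pso_step` and the toy numbers are assumptions for illustration.

```python
import random

def pso_step(pos, velo, personal_best, global_best, rng,
             w_momentum=0.1, w_personal=0.35, w_global=0.55):
    """One particle swarm update; returns (new_pos, new_velo) as lists."""
    r_personal = rng.random()   # one random factor per term, shared across dimensions
    r_global = rng.random()
    new_velo = [w_momentum * v
                + w_personal * r_personal * (pb - p)
                + w_global * r_global * (gb - p)
                for p, v, pb, gb in zip(pos, velo, personal_best, global_best)]
    new_pos = [p + v for p, v in zip(pos, new_velo)]
    return new_pos, new_velo

rng = random.Random(0)
pos, velo = [10.0, 0.5, 1.0], [0.0, 0.0, 0.0]
best = [6.0, 0.1, 0.2]   # personal and global best coincide in this toy case
new_pos, new_velo = pso_step(pos, velo, best, best, rng)
```

With zero initial velocity and both bests at the same point, the particle moves part of the way from its current position toward that point, since the combined pull is at most 0.35 + 0.55 = 0.9 of the distance.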

### Particle Evaluation Logic 
> (i.e., Train & Eval using Changing Parameter Sets )

In [None]:
def evaluate_particle ( particle, dataFutures, earlyStoppingRounds, retainPredictionsFlag ):    

    # fixed parameters
    paramsGPU = { 'tree_method': 'gpu_hist',
                  'n_gpus': 1,
                  'random_state': 0 }

    # TODO: loop over paramRanges instead of hard code
    paramsGPU['max_depth'] = int( particle['params'][0] )
    paramsGPU['learning_rate'] = particle['params'][1]
    paramsGPU['gamma'] = particle['params'][2]
    paramsGPU['num_boost_rounds'] = 2000

    nClasses = dataFutures['trainLabels'][dataFutures['trainLabels'].columns[0]].nunique()
    if nClasses == 2:
        paramsGPU['objective'] = 'binary:hinge'
    else:
        paramsGPU['objective'] = 'multi:softmax'
        paramsGPU['num_class'] = nClasses        
    
    startTime = time.time()

    trainDMatrix = xgboost.DMatrix( data = dataFutures['trainData'], label = dataFutures['trainLabels'] )
    testDMatrix = xgboost.DMatrix( data = dataFutures['testData'], label = dataFutures['testLabels'] )

    trainedModelGPU = xgboost.train( dtrain = trainDMatrix, evals = [(testDMatrix, 'test')], 
                                     params = paramsGPU,
                                     num_boost_round = paramsGPU['num_boost_rounds'],
                                     early_stopping_rounds = earlyStoppingRounds,
                                     verbose_eval = False )
        
    predictionsGPU = trainedModelGPU.predict( testDMatrix ).astype(int)
            
    elapsedTime = time.time() - startTime

    particle['nTrees'] = trainedModelGPU.best_iteration
    particle['trainAccuracy'] = 1 - float( trainedModelGPU.eval(trainDMatrix).split(':')[1] )
    particle['testAccuracy'] = 1 - float( trainedModelGPU.eval(testDMatrix).split(':')[1] )    
    
    particle['predictions'] = None
    if retainPredictionsFlag: 
        particle['predictions'] = predictionsGPU
    
    return particle, elapsedTime

### Particle Update Logic 
https://en.wikipedia.org/wiki/Particle_swarm_optimization

In [None]:
def update_particle( particle, paramRanges, globalBestParams, personalBestParams, 
                     wMomentum, wIndividual, wSocial, wExplore, randomSearchMode = False, randomSeed = None ):
    ''' 
    # TODO: debug dask caching [?] attempting to use a seed produces the same sequence of random samples
    if randomSeed is not None:
        np.random.seed(randomSeed)    
    '''
    
    # baseline to compare swarm update versus random search
    if randomSearchMode:        
        sampledParams, sampledVelocities = swarm.sample_params( paramRanges )
        return sampledParams, sampledVelocities
        
    # computing update terms for particle swarm
    inertiaInfluence = particle['velocities'].copy()
    socialInfluence = ( globalBestParams - particle['params'] )
    individualInfluence = ( personalBestParams - particle['params'] )
    
    newParticleVelocities =    wMomentum    *  inertiaInfluence \
                             + wIndividual  *  individualInfluence  * np.random.random()   \
                             + wSocial      *  socialInfluence      * np.random.random()
    
    newParticleParams = particle['params'].copy() + newParticleVelocities
    newParticleParams = swarm.enforce_param_bounds_inline ( newParticleParams, paramRanges )
            
    return newParticleParams, newParticleVelocities
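
After the position update, parameters can drift outside the search bounds, which is why the result is passed through `swarm.enforce_param_bounds_inline`. That helper is not shown in this notebook, so the following is a minimal sketch in the spirit of `Swarm.enforce_bounds`, combined with the type cast applied at evaluation time. The names `clamp_to_bounds` and `cast_for_training` are hypothetical.

```python
# same layout as paramRanges: index -> [name, lower bound, upper bound, type]
param_ranges = {0: ['max_depth', 3, 20, 'int'],
                1: ['learning_rate', .001, 1, 'float'],
                2: ['gamma', 0, 2, 'float']}

def clamp_to_bounds(params, param_ranges):
    """Clip each parameter into [lower, upper]; returns a new list."""
    clamped = []
    for i, value in enumerate(params):
        _, lower, upper, _ = param_ranges[i]
        clamped.append(min(max(value, lower), upper))
    return clamped

def cast_for_training(params, param_ranges):
    """Cast each value to the type named in its range entry."""
    casts = {'int': int, 'float': float}
    return [casts[param_ranges[i][3]](value) for i, value in enumerate(params)]

# max_depth overshoots the upper bound, learning_rate undershoots the lower bound
clamped = clamp_to_bounds([25.7, -0.2, 1.3], param_ranges)
typed = cast_for_training(clamped, param_ranges)
```

Keeping the clamp separate from the cast lets particles move through continuous space while the model still receives an integer `max_depth`.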

### HPO Run Loop

In [None]:
def run_hpo ( client, mode, paramRanges, trainData_cDF, trainLabels_cDF, testData_cDF, testLabels_cDF,
              nParticles, nEpochs,             
              wMomentum = .05, wIndividual = .35, wBest = .25, wExplore = .15, earlyStoppingRounds = 50,
              terminationAccuracy = np.inf, 
              randomSeed = 0, 
              plotFlag = False,
              retainPredictionsFlag = False ):
    
    startTime = time.time()
    
    # ----------------------------
    # scatter data to all workers
    # ----------------------------
    if client is not None:
        scatteredData_future = client.scatter( [ trainData_cDF, trainLabels_cDF, testData_cDF, testLabels_cDF], broadcast = True )
        
    dataFutures = { 'trainData'   : scatteredData_future[0], 'trainLabels' : scatteredData_future[1], 
                    'testData'    : scatteredData_future[2], 'testLabels'  : scatteredData_future[3] }
    
    # ----------------------------
    # initialize HPO strategy 
    # ----------------------------        
    def initialize_particle_futures ( nParticles, paramRanges, randomSeed, plotFlag ) :
        initialParticleParams, initialParticleVelocities, globalBest, particleColors = swarm.initialize_particle_swarm ( nParticles, paramRanges, randomSeed, plotFlag )
        # create particle futures using the initialization positions and velocities    
        delayedEvalParticles = []
        for iParticle in range(nParticles):
            particle = { 'ID': iParticle, 'params': initialParticleParams[iParticle], 'velocities': initialParticleVelocities[iParticle], 'predictions': None }        
            delayedEvalParticles.append( delayed ( evaluate_particle )( particle.copy(), dataFutures, earlyStoppingRounds, retainPredictionsFlag ))
        return delayedEvalParticles, initialParticleParams, globalBest, particleColors
        
    # ------------------------------------------------
    # shared logic for particle evaluation and updates
    # ------------------------------------------------
    def eval_and_update ( particleFuture, delayedEvalParticles, particleHistory, paramRanges, globalBest, randomSearchMode, nEvaluations ):
        # convert particle future to concrete result and collect returned values
        particle, elapsedTime = particleFuture.result()

        # update hpo strategy meta-parameters -- i.e. swarm global best and particle personal best
        particleHistory, globalBest = swarm.update_bests ( particleHistory, particle, globalBest, nEvaluations, randomSearchMode )

        # update history with this particle's latest contribution/eval
        particleHistory = swarm.update_history_dictionary ( particleHistory, particle, nEvaluations )

        # update particle
        if randomSearchMode:
            personalBestParams = None
        else:
            personalBestParams = particleHistory[particle['ID']]['personalBestParams']
            
        particle['params'], particle['velocities'] = update_particle ( particle, paramRanges,
                                                                       globalBest['params'], personalBestParams,
                                                                       wMomentum, wIndividual, wBest, wExplore,
                                                                       randomSearchMode = randomSearchMode, 
                                                                       randomSeed = particle['ID'] ) # repeatability
        return particle.copy(), particleHistory, globalBest
    
    nEvaluations = 0
    particleHistory = {}
    
    if mode['allowAsyncUpdates'] != True:
        # ----------------------------
        # synchronous particle swarm
        # ----------------------------
        delayedEvalParticles, initialParticleParams, globalBest, particleColors = initialize_particle_futures ( nParticles, paramRanges, randomSeed, plotFlag )
        
        for iEpoch in range (0, nEpochs ):    
            futureEvalParticles = client.compute( delayedEvalParticles )
            delayedEvalParticles = []
            for particleFuture in futureEvalParticles:
                newParticle, particleHistory, globalBest = eval_and_update ( particleFuture, delayedEvalParticles, particleHistory, paramRanges, globalBest, mode['randomSearch'], nEvaluations )

                # append future work for the next instantiation of this particle ( using the freshly updated parameters )
                delayedEvalParticles.append( delayed ( evaluate_particle )( newParticle, dataFutures, earlyStoppingRounds, retainPredictionsFlag ))
                
                nEvaluations += 1
            # --- 
            print(f' > on epoch {iEpoch} out of {nEpochs}') 
    
    else:
        # ----------------------------
        # asynchronous particle swarm
        # ----------------------------
        delayedEvalParticles, initialParticleParams, globalBest, particleColors = initialize_particle_futures ( nParticles, paramRanges, randomSeed, plotFlag )
        futureEvalParticles = client.compute( delayedEvalParticles )        
        particleFutureSeq = as_completed( futureEvalParticles )
        
        for particleFuture in particleFutureSeq:
            newParticle, particleHistory, globalBest = eval_and_update ( particleFuture, delayedEvalParticles, particleHistory, paramRanges, globalBest, mode['randomSearch'], nEvaluations )
            
            # termination conditions 
            if globalBest['accuracy'] > terminationAccuracy: break
            approximateEpoch = nEvaluations // nParticles
            if ( approximateEpoch ) > nEpochs : break
            
            # append future work for the next instantiation of this particle ( using the freshly updated parameters )
            delayedParticle = delayed ( evaluate_particle )( newParticle, dataFutures, earlyStoppingRounds, retainPredictionsFlag )
            # submit this particle future to the client ( returns a future )
            futureParticle = client.compute( delayedParticle )
            # track its completion via the as_completed iterator 
            particleFutureSeq.add( futureParticle )
            
            nEvaluations += 1
            if nEvaluations % nParticles == 0:
                print(f' > on approximate epoch {approximateEpoch} out of {nEpochs}') 
                              
    elapsedTime = time.time() - startTime
    
    print(f"\n\n best accuracy: {globalBest['accuracy']}, by particle: {globalBest['particleID']} on eval: {globalBest['iEvaluation']} ")
    print(f" best parameters: {swarm.format_params( globalBest['params'], globalBest['nTrees'] )}, \n elapsed time: {elapsedTime:.2f} seconds")
    
    particleHistory['initialParams'] = initialParticleParams
    particleHistory['paramRanges'] = paramRanges
    particleHistory['particleColors'] = particleColors
    particleHistory['nParticles'] = nParticles
    return particleHistory, globalBest, elapsedTime
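
The asynchronous branch of `run_hpo` relies on one pattern: take whichever evaluation finishes first, update the shared state, and immediately resubmit that particle. The sketch below reproduces the pattern with the standard library's `concurrent.futures` instead of Dask (a substitution made so that it runs without a cluster or GPU). `fake_evaluate` and its toy score are hypothetical stand-ins for `evaluate_particle`.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

def fake_evaluate(particle_id, step):
    # hypothetical stand-in for model training: higher-numbered particles take longer
    time.sleep(0.001 * (particle_id + 1))
    return particle_id, step, 1.0 / (1 + particle_id + step)   # toy "accuracy"

n_particles, max_evaluations = 4, 20
n_evaluations = 0
evals_per_particle = [0] * n_particles
best_score = float('-inf')

with ThreadPoolExecutor(max_workers=n_particles) as pool:
    pending = {pool.submit(fake_evaluate, i, 0) for i in range(n_particles)}
    while pending and n_evaluations < max_evaluations:
        # block until at least one evaluation has finished
        done, pending = wait(pending, return_when=FIRST_COMPLETED)
        for future in done:
            if n_evaluations >= max_evaluations:
                break
            particle_id, step, score = future.result()
            n_evaluations += 1
            evals_per_particle[particle_id] += 1
            best_score = max(best_score, score)
            # resubmit this particle right away, without waiting for its peers
            if n_evaluations < max_evaluations:
                pending.add(pool.submit(fake_evaluate, particle_id, step + 1))
```

In the notebook, `as_completed(...)` together with its `.add(...)` method plays the role of the `wait` / `pending.add` pair used here.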

### Inspect/Edit Library Code [optional]
> swarm.import_library_function_in_new_cell ( **function-name** )

In [None]:
import inspect

In [None]:
sampleImportFlag = False
if sampleImportFlag:
    swarm.import_library_function_in_new_cell ( [ swarm.sample_params, swarm.initialize_particle_swarm] )

<a id = 'compute-cluster'></a>

# 4. Create Compute Cluster
-----

In [None]:
import dask
from dask import delayed
from dask_cuda import LocalCUDACluster

from dask.distributed import Client
from dask.distributed import as_completed
from dask.distributed import worker

In [None]:
cluster = LocalCUDACluster( ip = '', n_workers = 4)

In [None]:
''' # TODO -- finalize dask_kubernetes check on RAPIDS 0.10.0 cuda 10.1
from dask_kubernetes import KubeCluster
cluster = KubeCluster( ip = '' )
''';

In [None]:
client = Client( cluster )  # synchronous client: run_hpo calls blocking methods such as .result()

<a id = 'define-search'></a>

# 5. Define Search: 
-----

Define hyper-parameter ranges

In [None]:
paramRanges = { 0: ['max_depth', 3, 20, 'int'],
                1: ['learning_rate', .001, 1, 'float'],
                2: ['gamma', 0, 2, 'float'] }
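
Each entry in `paramRanges` holds a name, a lower bound, an upper bound, and a type tag. As a rough illustration of how such a table can drive sampling and bound enforcement, here is a pair of hypothetical helpers. `sample_params_sketch` and `clamp_params_sketch` are names invented for this example and are not the `swarm` library's functions.

```python
import random

paramRanges = { 0: ['max_depth', 3, 20, 'int'],
                1: ['learning_rate', .001, 1, 'float'],
                2: ['gamma', 0, 2, 'float'] }

def sample_params_sketch(param_ranges, rng):
    """Draw one uniformly random value per hyper-parameter (hypothetical helper)."""
    sample = []
    for index in sorted(param_ranges):
        _name, low, high, kind = param_ranges[index]
        if kind == 'int':
            sample.append(rng.randint(low, high))    # inclusive on both ends
        else:
            sample.append(rng.uniform(low, high))
    return sample

def clamp_params_sketch(params, param_ranges):
    """Clip each value to its range and round integer-typed parameters (hypothetical helper)."""
    clamped = []
    for index, value in zip(sorted(param_ranges), params):
        _name, low, high, kind = param_ranges[index]
        value = min(max(value, low), high)
        clamped.append(int(round(value)) if kind == 'int' else float(value))
    return clamped

rng = random.Random(0)
sampled = sample_params_sketch(paramRanges, rng)
clamped = clamp_params_sketch([25.7, -0.2, 1.5], paramRanges)
```

In the last call, the out-of-range `max_depth` and `learning_rate` values are pulled back to their nearest bounds, while the in-range `gamma` value is left unchanged.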

Define swarm size (nParticles) and search duration (nEpochs)

In [None]:
nParticles = 16
nEpochs = 15

### 5.1) Baseline / Demo : Single Particle for a Single Epoch

In [None]:
mode = {'allowAsyncUpdates': False, 'randomSearch': False }
particleHistory, globalBest, _ = run_hpo ( client, mode, paramRanges, 
                                           trainData, trainLabels, testData, testLabels, 
                                           nParticles = 1, nEpochs = 1, randomSeed = 0, retainPredictionsFlag = False,
                                           earlyStoppingRounds = 15 )

<a id = 'run-async-PSO'></a>
# 6. Run **ASYNC-Particle-Swarm** 
-----

Performance: Typically **better performance** compared to synchronous-particle-swarm and random-search.

Wall Clock Time: Typically this is the **fastest** search process.

* Description: Particles move using a random mixture of personal and global best [ possibly stale ]

* Parallelism: Asynchronous
  * NOTE: Particles update without waiting for their peers, so particles whose hyper-parameters evaluate quickly run more often ( see nEvaluations figure ).
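
To see why asynchronous updates skew the evaluation counts, consider a small event simulation in which each particle has a fixed evaluation time and restarts the moment it finishes. The durations are made-up numbers and `simulate_async` is a hypothetical helper, not part of the notebook.

```python
import heapq

def simulate_async(durations, total_evaluations):
    """Count evaluations per particle when each one restarts as soon as it finishes."""
    # heap of (finish_time, particle_id); every particle starts at time 0
    heap = [(duration, particle_id) for particle_id, duration in enumerate(durations)]
    heapq.heapify(heap)
    counts = [0] * len(durations)
    finish_time = 0.0
    for _ in range(total_evaluations):
        # the particle with the earliest finish time completes next
        finish_time, particle_id = heapq.heappop(heap)
        counts[particle_id] += 1
        # it is resubmitted immediately, without waiting for its peers
        heapq.heappush(heap, (finish_time + durations[particle_id], particle_id))
    return counts, finish_time

counts, wall_time = simulate_async(durations=[1.0, 2.0, 4.0], total_evaluations=7)
# counts == [4, 2, 1]: the fastest particle is evaluated four times before the slowest finishes once
```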

In [None]:
mode = {'allowAsyncUpdates': True, 'randomSearch': False }
particleHistoryAsync, globalBestAsync, _ = run_hpo ( client, mode, paramRanges,
                                                     trainData, trainLabels, testData, testLabels, 
                                                     nParticles, nEpochs,
                                                     wMomentum = .05, wIndividual = .25, wBest = .45 )

In [None]:
sortedBarHeightsDF = swarm.plot_eval_distribution ( particleHistoryAsync,  globalBestAsync['particleID'] );

In [None]:
swarm.viz_particle_movement( particleHistoryAsync )

<a id = 'run-classic-PSO'></a>
# 7. Run **Classic-Particle-Swarm**
-----




Performance: Typically classic-particle-swarm reaches **higher performance** than random-search, but lower performance than async-particle-search.

Wall Clock Time: Typically this is a **fast** search process, though slower than async-particle-search.

* Description: Particles move using a random mixture of personal and global best

* Parallelism: Partial-Asynchronous
  * NOTE: Particles update in parallel with their peers within an epoch, but wait for the slowest particle at epoch boundaries. All particles are guaranteed to have the same number of evaluations ( see nEvaluations figure ).
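
The cost of the epoch barrier can be estimated with simple arithmetic. The sketch below assumes one worker per particle and a fixed, made-up evaluation time per particle. `simulate_sync` is a hypothetical helper, not part of the notebook.

```python
def simulate_sync(durations, n_epochs):
    """Count evaluations, wall time, and idle worker time under an epoch barrier."""
    slowest = max(durations)
    # every particle is evaluated exactly once per epoch
    counts = [n_epochs] * len(durations)
    # each epoch lasts as long as its slowest particle
    wall_time = n_epochs * slowest
    # time the faster particles' workers spend waiting at the barrier
    idle_time = n_epochs * sum(slowest - d for d in durations)
    return counts, wall_time, idle_time

counts, wall_time, idle_time = simulate_sync(durations=[1.0, 2.0, 4.0], n_epochs=2)
# counts == [2, 2, 2], wall_time == 8.0, idle_time == 10.0
```

The evaluation counts stay balanced, but in this toy setting more worker time is spent idle (10.0) than the slowest particle spends training (8.0).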

In [None]:
mode = {'allowAsyncUpdates': False, 'randomSearch': False }
particleHistory, globalBest, _ = run_hpo ( client, mode, paramRanges, 
                                           trainData, trainLabels, testData, testLabels,
                                           nParticles, nEpochs,
                                           wMomentum = .05, wIndividual = .25, wBest = .45 )

In [None]:
sortedBarHeightsDF = swarm.plot_eval_distribution ( particleHistory,  globalBest['particleID'] );


In [None]:
swarm.viz_particle_movement( particleHistory )

<a id = 'run-random-search'></a>
# 8. Run **Random-Search-Baseline**
-----

Performance: Typically random search **performs worse** than the particle swarm variants.

Wall Clock Time: Typically this is the **slowest** search process.

* Description: Particles teleport to new [ random ] positions in hyper-parameter space
* Parallelism: Asynchronous
  * NOTE: Particles update without waiting for their peers, and randomly adopt new parameter combinations. Long runs (e.g.,  nEpochs = 30000) should produce balanced nEvaluations per-particle, though imbalance is often visible for shorter runs [ less imbalanced than ASYNC PSO, more imbalanced than PSO ]
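
The "teleport" behavior amounts to drawing every new position independently of all earlier ones while keeping track of the best result seen so far. A minimal sketch on a toy two-parameter problem follows. `toy_objective` is a hypothetical stand-in for validation accuracy and is not used anywhere in the notebook.

```python
import random

def toy_objective(x, y):
    # hypothetical stand-in for validation accuracy; peaks at (0.3, 0.7)
    return 1.0 - ((x - 0.3) ** 2 + (y - 0.7) ** 2)

rng = random.Random(0)
best_score, best_position = float('-inf'), None
history = []   # best score seen after each evaluation

for _ in range(200):
    # "teleport": the new position ignores all previous positions and scores
    position = (rng.random(), rng.random())
    score = toy_objective(*position)
    if score > best_score:
        best_score, best_position = score, position
    history.append(best_score)
```

Because no information flows between evaluations, the best-so-far curve in `history` can only improve by chance, which is the behavior the particle swarm variants try to beat.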

In [None]:
mode = {'allowAsyncUpdates': True, 'randomSearch': True }
particleHistoryBaseline, globalBestBaseline, _ = run_hpo ( client, mode, paramRanges, 
                                                           trainData, trainLabels, testData, testLabels,  
                                                           nParticles, nEpochs )

In [None]:
sortedBarHeightsDF = swarm.plot_eval_distribution ( particleHistoryBaseline,  globalBestBaseline['particleID'] );

In [None]:
swarm.viz_particle_movement( particleHistoryBaseline )

<a id = 'summary'></a>
# 9. Summary
-----

Async Scaling > Sync Scaling > Random Search

# 10. Extensions

Datasets larger than a single GPU's memory: dask_cudf + [ dask_xgboost or xgboost.dask ]