### Parallel execution of Hyperopt optimization using Spark

With the new class SparkTrials, you can tell Hyperopt to distribute a tuning job across an Apache Spark cluster. Initially developed within Databricks, this API has now been contributed to Hyperopt.

Hyperparameter tuning and model selection often involve training hundreds or thousands of models. SparkTrials runs batches of these training tasks in parallel, one on each Spark executor, which lets tuning scale out across the cluster. To use SparkTrials with Hyperopt, pass the SparkTrials object to Hyperopt’s fmin() function.

Under the hood, fmin() will generate new hyperparameter settings to test and pass them to SparkTrials, which runs these tasks asynchronously on a cluster as follows:

- Hyperopt’s primary logic runs on the Spark driver, computing new hyperparameter settings.
- When a worker is ready for a new task, Hyperopt kicks off a single-task Spark job for that hyperparameter setting.
- Within that task, which runs on one Spark executor, user code will be executed to train and evaluate a new ML model.
- When done, the Spark task will return the results, including the loss, to the driver.

These new results are used by Hyperopt to compute better hyperparameter settings for future tasks.
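
The driver/worker loop described above can be imitated on a single machine. The following is only a rough sketch, not how SparkTrials is implemented: a `ThreadPoolExecutor` stands in for the Spark executors, plain random sampling stands in for Hyperopt's suggestion algorithm, and the names `run_search` and `toy_objective` are hypothetical.

```python
import random
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def toy_objective(x):
    # Stand-in for user code that trains and evaluates one model.
    return {"x": x, "loss": x ** 2}


def run_search(max_evals=20, parallelism=4, seed=0):
    """Driver loop: propose a setting, hand it to a free worker, collect results."""
    rng = random.Random(seed)
    results, pending, submitted = [], set(), 0
    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        while submitted < max_evals or pending:
            # Whenever a worker slot is free, launch one single-trial task.
            while submitted < max_evals and len(pending) < parallelism:
                x = rng.uniform(-10, 10)  # assumption: random search, not TPE
                pending.add(pool.submit(toy_objective, x))
                submitted += 1
            # Block until at least one task reports its loss back to the driver.
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            results.extend(task.result() for task in done)
    return results


results = run_search()
best = min(results, key=lambda r: r["loss"])
print(len(results), best)
```

In a real adaptive search, the line that draws `x` would consult the results collected so far, which is why fewer concurrent tasks give the algorithm more information per proposal.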

Since SparkTrials fits and evaluates each model on one Spark worker, it is limited to tuning single-machine ML models and workflows, such as scikit-learn or single-machine TensorFlow. For distributed ML algorithms such as Apache Spark MLlib or Horovod, you can use Hyperopt’s default Trials class.

SparkTrials can be configured via three arguments, all of which are optional:

1. parallelism: The maximum number of trials to evaluate concurrently. Greater parallelism allows scale-out testing of more hyperparameter settings. Defaults to the number of Spark executors.

    - Trade-offs: The parallelism parameter can be set in conjunction with the max_evals parameter in fmin(). Hyperopt will test max_evals total settings for your hyperparameters, in batches of size parallelism. If parallelism = max_evals, then Hyperopt will do Random Search: it will select all hyperparameter settings to test independently and then evaluate them in parallel. If parallelism = 1, then Hyperopt can make full use of adaptive algorithms like Tree of Parzen Estimators (TPE) which iteratively explore the hyperparameter space: each new hyperparameter setting tested will be chosen based on previous results. Setting parallelism in between 1 and max_evals allows you to trade off scalability (getting results faster) and adaptiveness (sometimes getting better models).
    - Limits: There is currently a hard cap on parallelism of 128. SparkTrials will also check the cluster’s configuration to see how many concurrent tasks Spark will allow; if parallelism exceeds this maximum, SparkTrials will reduce parallelism to this maximum.
    

2. timeout: The maximum time in seconds that fmin() is allowed to take, defaulting to None (no limit). The timeout provides a budgeting mechanism by capping how long tuning can take. When the timeout is hit, runs are terminated if possible, and fmin() exits, returning the current set of results.

3. spark_session: The SparkSession instance for SparkTrials to use. If this is not given, SparkTrials will look for an existing SparkSession.
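
To make the parallelism limits and trade-off concrete, here is a small arithmetic sketch. It follows the simplified "batches of size parallelism" description given above; the helpers `effective_parallelism` and `adaptive_rounds` are hypothetical and not part of Hyperopt, and the cluster limit is passed in by hand rather than read from Spark.

```python
import math

MAX_PARALLELISM = 128  # hard cap on parallelism stated in the text above


def effective_parallelism(requested, cluster_max_tasks):
    """Hypothetical helper: apply the hard cap and the cluster's task limit."""
    if requested < 1 or cluster_max_tasks < 1:
        raise ValueError("parallelism and cluster_max_tasks must be >= 1")
    return min(requested, MAX_PARALLELISM, cluster_max_tasks)


def adaptive_rounds(max_evals, parallelism):
    """Hypothetical helper: number of batches, i.e. chances to adapt to results."""
    return math.ceil(max_evals / parallelism)


print(effective_parallelism(200, 500))  # capped by the hard limit
print(effective_parallelism(16, 8))     # capped by the cluster
print(adaptive_rounds(100, 100))        # one batch: behaves like random search
print(adaptive_rounds(100, 8))          # a middle ground
print(adaptive_rounds(100, 1))          # fully sequential and fully adaptive
```

The chosen value would then be passed to the constructor shown later in the `help(SparkTrials)` output, for example `SparkTrials(parallelism=8, timeout=600)`.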


https://github.com/hyperopt/hyperopt/blob/master/hyperopt/spark.py

http://hyperopt.github.io/hyperopt/scaleout/spark/
http://hyperopt.github.io/hyperopt/scaleout/spark/#scaling-out-search-with-apache-spark



### Establishing a Spark session

In [1]:
import os
import sys

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark=SparkSession.builder.appName('Hyperopt').getOrCreate()

### Optimization using Hyperopt

In [4]:
from hyperopt import fmin,tpe,hp, STATUS_OK, Trials
from hyperopt import SparkTrials

In [2]:
help(SparkTrials)

Help on class SparkTrials in module hyperopt.spark:

class SparkTrials(hyperopt.base.Trials)
 |  SparkTrials(parallelism=None, timeout=None, spark_session=None)
 |  
 |  Implementation of hyperopt.Trials supporting
 |  distributed execution using Apache Spark clusters.
 |  This requires fmin to be run on a Spark cluster.
 |  
 |  Plugging SparkTrials into hyperopt.fmin() allows hyperopt
 |  to send model training and evaluation tasks to Spark workers,
 |  parallelizing hyperparameter search.
 |  Each trial (set of hyperparameter values) is handled within
 |  a single Spark task; i.e., each model will be fit and evaluated
 |  on a single worker machine.  Trials are run asynchronously.
 |  
 |  See hyperopt.Trials docs for general information about Trials.
 |  
 |  The fields we store in our trial docs match the base Trials class.  The fields include:
 |   - 'tid': trial ID
 |   - 'state': JOB_STATE_DONE, JOB_STATE_ERROR, etc.
 |   - 'result': evaluation result for completed trial run
 |

In [5]:
import pickle
import time

#### Defining the objective function 

In [6]:
def objective(x):
    return {
        'loss': x ** 2,
        'status': STATUS_OK,
        # -- store other results like this
        'eval_time': time.time(),
        'other_stuff': {'type': None, 'value': [0, 1, 2]},
        # -- attachments are handled differently
        'attachments':
            {'time_module': pickle.dumps(time.time)}
    }

In [7]:
spark_trials=SparkTrials()

In [8]:
best=fmin(objective,
         space=hp.uniform('x',-10,10),
         algo=tpe.suggest,
         max_evals=100,
         trials=spark_trials)

100%|███████████████████████████████████████████████| 100/100 [01:40<00:00,  1.01s/it, best loss: 0.000557585765298602]


Total Trials: 100: 100 succeeded, 0 failed, 0 cancelled.


In [14]:
trials=Trials()

In [15]:
# Baseline for comparison: the default Trials class, without Spark
best=fmin(objective,
         space=hp.uniform('x',-10,10),
         algo=tpe.suggest,
         max_evals=100,
         trials=trials)

100%|██████████████████████████████████████████████| 100/100 [00:00<00:00, 761.52it/s, best loss: 0.005200315930984425]


In [16]:
trials.best_trial

{'state': 2,
 'tid': 67,
 'spec': None,
 'result': {'loss': 0.005200315930984425,
  'status': 'ok',
  'eval_time': 1579012188.4286406,
  'other_stuff': {'type': None, 'value': [0, 1, 2]}},
 'misc': {'tid': 67,
  'cmd': ('domain_attachment', 'FMinIter_Domain'),
  'workdir': None,
  'idxs': {'x': [67]},
  'vals': {'x': [-0.07211321606324617]}},
 'exp_key': None,
 'owner': None,
 'version': 0,
 'book_time': datetime.datetime(2020, 1, 14, 14, 29, 48, 428000),
 'refresh_time': datetime.datetime(2020, 1, 14, 14, 29, 48, 428000)}

In [9]:
spark_trials.best_trial

{'state': 2,
 'tid': 63,
 'spec': None,
 'result': {'loss': 0.000557585765298602,
  'status': 'ok',
  'eval_time': 1579028504.032471,
  'other_stuff': {'type': None, 'value': [0, 1, 2]},
  'attachments': {'time_module': b'\x80\x03ctime\ntime\nq\x00.'}},
 'misc': {'tid': 63,
  'cmd': ('domain_attachment', 'FMinIter_Domain'),
  'workdir': None,
  'idxs': {'x': [63]},
  'vals': {'x': [0.023613254017576697]}},
 'exp_key': None,
 'owner': None,
 'version': 0,
 'book_time': datetime.datetime(2020, 1, 14, 19, 1, 41, 562000),
 'refresh_time': datetime.datetime(2020, 1, 14, 19, 1, 44, 94000)}
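
The `spark_trials.best_trial` output above carries the attachment as raw pickle bytes. As a minimal sketch using only the standard library, the bytes printed there can be turned back into the original object with `pickle.loads`:

```python
import pickle
import time

# The same kind of payload the objective function stores under 'attachments'.
payload = pickle.dumps(time.time)
restored = pickle.loads(payload)

# The literal bytes shown in the best_trial output above (pickle protocol 3).
from_output = pickle.loads(b'\x80\x03ctime\ntime\nq\x00.')

# Both restore a reference to the time.time function itself.
print(restored is time.time, from_output is time.time)
```

Only unpickle data you trust, since loading a pickle can execute arbitrary code.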

In [10]:
print('Minimum loss attained with TPE:    {:.4f}'.format(spark_trials.best_trial['result']['loss']))

Minimum loss attained with TPE:    0.0006


In [11]:
import pandas as pd
# Collect the loss, trial index, and sampled x of every trial into one table
spark_results = pd.DataFrame({
    'loss': [x['loss'] for x in spark_trials.results],
    'iteration': spark_trials.idxs_vals[0]['x'],
    'x': spark_trials.idxs_vals[1]['x'],
})

spark_results.head()

|   | loss      | iteration | x         |
|---|-----------|-----------|-----------|
| 0 | 2.229794  | 0         | 1.49325   |
| 1 | 30.622647 | 1         | -5.533773 |
| 2 | 77.918291 | 2         | 8.827134  |
| 3 | 74.070993 | 3         | 8.606451  |
| 4 | 6.10997   | 4         | -2.471835 |
