# Hyperopt

Hyperopt is a Python library for "serial and parallel optimization over awkward search spaces, which may include real-valued, discrete, and conditional dimensions".

In the machine learning workflow, hyperopt can be used to distribute/parallelize the hyperparameter optimization process with more advanced optimization strategies than are available in other libraries.

There are two ways to scale hyperopt with Apache Spark:
* Use single-machine hyperopt with a distributed training algorithm (e.g. MLlib)
* Use distributed hyperopt with single-machine training algorithms (e.g. scikit-learn) with the SparkTrials class. 

We will use single-machine hyperopt with MLlib, and also see how to use hyperopt to distribute the hyperparameter tuning of single node models with Scikit-Learn. 

Unfortunately, you can't currently use hyperopt to distribute the hyperparameter optimization of distributed training algorithms. However, you still get the benefit of more advanced hyperparameter search algorithms (random search, TPE, etc.) with Spark ML.


Resources:
0. [Documentation](http://hyperopt.github.io/hyperopt/scaleout/spark/)
0. [Hyperopt on Databricks](https://docs.databricks.com/applications/machine-learning/automl/hyperopt/index.html)
0. [Hyperparameter Tuning with MLflow, Apache Spark MLlib and Hyperopt](https://databricks.com/blog/2019/06/07/hyperparameter-tuning-with-mlflow-apache-spark-mllib-and-hyperopt.html)

## ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) In this lesson you:<br>
 - Use hyperopt to find the optimal parameters for an MLlib model using TPE

Let's start by loading in our Airbnb Dataset.

In [0]:
import os

In [0]:
## Put your name here
username = "renato"

dbutils.widgets.text("username", username)
spark.sql(f"CREATE DATABASE IF NOT EXISTS dsacademy_embedded_wave3_{username}")
spark.sql(f"USE dsacademy_embedded_wave3_{username}")
spark.conf.set("spark.sql.shuffle.partitions", 40)

spark.sql("SET spark.databricks.delta.formatCheck.enabled = false")
spark.sql("SET spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true")

Out[22]: DataFrame[key: string, value: string]

In [0]:
deltaPath = os.path.join("/", "tmp", username)    #If we were writing to the root folder and not to the DBFS
if not os.path.exists(deltaPath):
    os.mkdir(deltaPath)
    
print(deltaPath)

airbnbDF = spark.read.format("delta").load(deltaPath)

/tmp/renato


In [0]:
airbnbDF.display()

host_is_superhost,instant_bookable,host_total_listings_count,neighbourhood_cleansed,latitude,longitude,property_type,room_type,accommodates,bedrooms,beds,minimum_nights,number_of_reviews,review_scores_rating,review_scores_accuracy,review_scores_cleanliness,review_scores_checkin,review_scores_communication,review_scores_location,review_scores_value,price,bedrooms_na,beds_na,review_scores_rating_na,review_scores_accuracy_na,review_scores_cleanliness_na,review_scores_checkin_na,review_scores_communication_na,review_scores_location_na,review_scores_value_na
f,f,6.0,Donaustadt,48.24262,16.42767,Room in bed and breakfast,Hotel room,3.0,1.0,2.0,1.0,14.0,4.71,4.86,4.93,4.93,4.86,4.71,4.5,110.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
t,t,3.0,Leopoldstadt,48.21924,16.37831,Entire rental unit,Entire home/apt,5.0,1.0,3.0,5.0,350.0,4.75,4.8,4.65,4.91,4.93,4.75,4.69,69.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
t,t,19.0,Rudolfsheim-Fnfhaus,48.18434,16.32701,Entire rental unit,Entire home/apt,6.0,2.0,4.0,1.0,181.0,4.83,4.9,4.88,4.89,4.93,4.59,4.7,145.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
f,f,6.0,Innere Stadt,48.21496,16.37161,Entire rental unit,Entire home/apt,2.0,1.0,1.0,2.0,100.0,4.64,4.73,4.55,4.8,4.91,4.89,4.59,100.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
t,f,3.0,Leopoldstadt,48.21778,16.37847,Entire rental unit,Entire home/apt,3.0,1.0,2.0,5.0,347.0,4.65,4.77,4.51,4.93,4.95,4.86,4.58,68.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
f,f,6.0,Innere Stadt,48.21351,16.37282,Entire rental unit,Entire home/apt,2.0,1.0,1.0,3.0,52.0,4.63,4.67,4.35,4.69,4.75,4.88,4.56,99.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
t,f,4.0,Leopoldstadt,48.2176,16.38018,Private room in rental unit,Private room,2.0,1.0,2.0,2.0,117.0,4.77,4.74,4.68,4.8,4.75,4.81,4.71,50.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
f,f,6.0,Innere Stadt,48.21318,16.37486,Entire rental unit,Entire home/apt,4.0,2.0,1.0,3.0,69.0,4.58,4.8,4.76,4.83,4.92,4.85,4.73,140.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
f,t,1.0,Ottakring,48.22207,16.31594,Entire rental unit,Entire home/apt,4.0,2.0,2.0,3.0,50.0,4.87,4.94,4.71,4.94,4.96,4.4,4.73,77.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
t,f,2.0,Favoriten,48.17437,16.39339,Entire condo,Entire home/apt,4.0,1.0,2.0,5.0,178.0,4.77,4.87,4.67,4.88,4.87,3.98,4.66,87.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [0]:
(trainDF, testDF) = airbnbDF.randomSplit([.8, .2], seed=42)

We will then create our random forest pipeline and regression evaluator.

In [0]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

categoricalCols = [field for (field, dataType) in trainDF.dtypes if dataType == "string"]
indexOutputCols = [x + "Index" for x in categoricalCols]

stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=indexOutputCols, handleInvalid="skip")

numericCols = [field for (field, dataType) in trainDF.dtypes if dataType == "double" and field != "price"]
assemblerInputs = indexOutputCols + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features", handleInvalid='skip')

rf = RandomForestRegressor(labelCol="price", maxBins=56, seed=42)

pipeline = Pipeline(stages=[stringIndexer, vecAssembler, rf])

regressionEvaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price")
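To make the `StringIndexer` stage concrete, here is a minimal pure-Python sketch of frequency-based label indexing. It assumes the default `frequencyDesc` ordering (the most frequent label gets index 0, ties broken alphabetically). The helpers `fit_string_index` and `transform_skip_invalid` are hypothetical names that only mimic the behaviour of `handleInvalid="skip"`; they are not part of PySpark.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct string to an index, most frequent first (ties alphabetical)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

def transform_skip_invalid(values, mapping):
    """Index the values, dropping any label that was not seen when fitting."""
    return [mapping[v] for v in values if v in mapping]

# Toy column values borrowed from the room_type column shown above.
train_room_types = [
    "Entire home/apt", "Entire home/apt", "Private room",
    "Hotel room", "Private room", "Entire home/apt",
]
room_type_index = fit_string_index(train_room_types)

# "Shared room" never appeared in the training values, so it is skipped.
indexed_test = transform_skip_invalid(
    ["Private room", "Shared room", "Entire home/apt"], room_type_index
)
print(room_type_index)
print(indexed_test)
```

In the real pipeline, skipping means the whole row is dropped, so a test set containing unseen categories can come out smaller after the transform.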

In [0]:
regressionEvaluator

Next, we get to the hyperopt-specific part of the workflow.

First, we define our **objective function**. The objective function has two primary requirements:

1. An **input** `params` including hyperparameter values to use when training the model
2. An **output** containing a loss metric on which to optimize

In this case, we are specifying values of `max_depth` and `num_trees` and returning the RMSE as our loss metric.

We are reconstructing our pipeline for the `RandomForestRegressor` to use the specified hyperparameter values.

In [0]:
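from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from hyperopt import STATUS_OK  # needed for the return value below
import mlflow

def objective_function(params):    
    # set the hyperparameters that we want to tune (cast to plain Python ints for Spark ML)
    max_depth = int(params["max_depth"])
    num_trees = int(params["num_trees"])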

    # create a grid with our hyperparameters
    grid = (ParamGridBuilder()
      .addGrid(rf.maxDepth, [max_depth])
      .addGrid(rf.numTrees, [num_trees])
      .build())

    # cross validate the set of hyperparameters
    cv = CrossValidator(estimator=pipeline, 
                        estimatorParamMaps=grid, 
                        evaluator=regressionEvaluator, 
                        numFolds=3)

    cvModel = cv.fit(trainDF)

    # get our average RMSE across all three folds
    rmse = cvModel.avgMetrics[0]

    return {"loss": rmse, "status": STATUS_OK}
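The input/output contract of an objective function can be tried without a Spark cluster. The sketch below is a toy stand-in, not the notebook's pipeline: `DATA`, `rmse`, and `toy_objective` are hypothetical names, there is no model fitting, and the "hyperparameter" is simply the slope of a line. It only illustrates taking a `params` dict, averaging RMSE over three folds, and returning a dict with `loss` and `status`.

```python
import math

STATUS_OK = "ok"  # the same string value that hyperopt exposes as STATUS_OK

# Toy (x, y) pairs standing in for the training data; purely illustrative.
DATA = [(x, 2.0 * x) for x in range(1, 10)]

def rmse(pairs, slope):
    """Root mean squared error of the one-parameter model y_hat = slope * x."""
    return math.sqrt(sum((y - slope * x) ** 2 for x, y in pairs) / len(pairs))

def toy_objective(params, num_folds=3):
    """Take a dict of hyperparameters and return a dict with the loss to minimize."""
    slope = params["slope"]
    # Split the data into folds, score each one, and average the per-fold RMSE.
    folds = [DATA[i::num_folds] for i in range(num_folds)]
    avg_rmse = sum(rmse(fold, slope) for fold in folds) / num_folds
    return {"loss": avg_rmse, "status": STATUS_OK}

perfect = toy_objective({"slope": 2.0})
worse = toy_objective({"slope": 3.0})
print(perfect)
print(worse)
```

Because the optimizer only sees the returned `loss`, any metric can be used as long as smaller means better.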

Next, we define our search space. 

This is similar to the parameter grid in a grid search process. However, we are only specifying the range of values rather than the individual, specific values to be tested. It's up to hyperopt's optimization algorithm to choose the actual values.

See the [documentation](https://github.com/hyperopt/hyperopt/wiki/FMin) for helpful tips on defining your search space.

In [0]:
from hyperopt import hp

search_space = {
  "max_depth": hp.randint("max_depth", 2, 5),
  "num_trees": hp.randint("num_trees", 10, 100)
}
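As we read the hyperopt documentation, `hp.randint(label, low, high)` draws integers from the half-open range `[low, high)`, so the space above would try `max_depth` values of 2, 3, and 4 but never 5. The following pure-Python sketch imitates that sampling behaviour with the standard library; `toy_space` and `sample_configuration` are hypothetical names and no hyperopt code is involved.

```python
import random

# Hypothetical stand-in for the search space: (low, high) bounds per hyperparameter.
toy_space = {"max_depth": (2, 5), "num_trees": (10, 100)}

def sample_configuration(space, rng):
    """Draw one integer per hyperparameter from the half-open range [low, high)."""
    return {name: rng.randrange(low, high) for name, (low, high) in space.items()}

rng = random.Random(42)
samples = [sample_configuration(toy_space, rng) for _ in range(1000)]

print(samples[:3])
print(sorted({s["max_depth"] for s in samples}))
```

If the upper bound should be included, widening the range by one (for example `high=6`) is one way to get it.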

`fmin()` generates new hyperparameter configurations to pass to your `objective_function`. It will evaluate 4 models in total, using the results of the previous models to make a more informed choice of the next hyperparameters to try.

Hyperopt allows for parallel hyperparameter tuning using either random search or Tree of Parzen Estimators (TPE). Note that in the cell below, we are importing `tpe`. According to the [documentation](http://hyperopt.github.io/hyperopt/scaleout/spark/), TPE is an adaptive algorithm that 

> iteratively explores the hyperparameter space. Each new hyperparameter setting tested will be chosen based on previous results. 

Hence, `tpe.suggest` is a Bayesian method.
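In outline, the search is a loop of propose, evaluate, and record. The sketch below shows that loop with a random proposal step; it is not hyperopt's implementation, and `toy_loss`, `suggest_random`, and `toy_fmin` are hypothetical names. An adaptive method such as TPE differs in the proposal step, which would consult `history` instead of ignoring it.

```python
import random

def toy_loss(params):
    """Hypothetical objective: smallest at max_depth=4, num_trees=60."""
    return (params["max_depth"] - 4) ** 2 + ((params["num_trees"] - 60) / 10) ** 2

def suggest_random(history, rng):
    """Propose a configuration. Random search ignores `history`; TPE would use it."""
    return {"max_depth": rng.randrange(2, 5), "num_trees": rng.randrange(10, 100)}

def toy_fmin(fn, suggest, max_evals, seed):
    """Propose, evaluate, and record; return the best configuration seen."""
    rng = random.Random(seed)
    history = []  # plays the role of the Trials object
    for _ in range(max_evals):
        params = suggest(history, rng)
        history.append({"params": params, "loss": fn(params)})
    best = min(history, key=lambda trial: trial["loss"])
    return best["params"], history

best_params, history = toy_fmin(toy_loss, suggest_random, max_evals=4, seed=42)
print(best_params)
print(len(history))
```

With only 4 evaluations an adaptive method has little history to learn from, so a larger `max_evals` is usually where the difference from random search starts to show.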

In [0]:
import mlflow
mlflow.autolog(disable=True)

In [0]:
from hyperopt import fmin, tpe, STATUS_OK, Trials
import numpy as np

# Creating a parent run
with mlflow.start_run():
    num_evals = 4
    trials = Trials()
    best_hyperparam = fmin(fn=objective_function, 
                           space=search_space,
                           algo=tpe.suggest, 
                           max_evals=num_evals,
                           trials=trials,
                           #rstate=np.random.RandomState(42)
                           rstate=np.random.default_rng(42) #https://github.com/hyperopt/hyperopt/issues/838
                          )
  
    # get optimal hyperparameter values (cast to plain Python ints for the Spark ML setters)
    best_max_depth = int(best_hyperparam["max_depth"])
    best_num_trees = int(best_hyperparam["num_trees"])
  
    # change RF to use optimal hyperparameter values (this is a stateful method)
    rf.setMaxDepth(best_max_depth)
    rf.setNumTrees(best_num_trees)
  
    # train pipeline on entire training data - this will use the updated RF values
    pipelineModel = pipeline.fit(trainDF)
  
    # evaluate final model on test data
    predDF = pipelineModel.transform(testDF)
    rmse = regressionEvaluator.evaluate(predDF)
  
    # Log param and metric for the final model
    mlflow.log_param("max_depth", best_max_depth)
    mlflow.log_param("numTrees", best_num_trees)
    mlflow.log_metric("rmse", rmse)


job exception: An error occurred while calling o2692.transform.
: java.lang.NoClassDefFoundError: com/facebook/fb303/FacebookService$Iface
	at java.lang.ClassLoader.defineClass1(Native Method)
	at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:473)
	at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
	at org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1.doLoadClass(IsolatedClientLoader.scala:362)
	at org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1.loadClass(IsolatedClientLoader.scala:327)
	at java.lang.ClassLoa

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<command-3451876691914167> in <cell line: 5>()
      6     num_evals = 4
      7     trials = Trials()
----> 8     best_hyperparam = fmin(fn=objective_function, 
      9                            space=search_space,
     10                            algo=tpe.suggest,

/databricks/.python_edge_libs/hyperopt/fmin.py in fmin(fn, space, algo, max_evals, timeout, loss_threshold, trials, rstate, allow_trials_fmin, pass_expr_memo_ctr

#### [Using Hyperopt with Scikit-Learn](http://hyperopt.github.io/hyperopt/scaleout/spark/)  
Below is an example workflow that tunes a scikit-learn model using `SparkTrials`.
This example was adapted from the scikit-learn documentation example for sparse logistic regression.

In [0]:
df = airbnbDF.toPandas()
df.dropna(inplace=True)
y = df["price"].values
X = df[['accommodates', 'bedrooms', 'beds', 'minimum_nights',
       'number_of_reviews', 'review_scores_rating', 'review_scores_accuracy',
       'review_scores_cleanliness', 'review_scores_checkin',
       'review_scores_communication', 'review_scores_location',
       'review_scores_value', 'bedrooms_na', 'beds_na',
       'review_scores_rating_na', 'review_scores_accuracy_na',
       'review_scores_cleanliness_na', 'review_scores_checkin_na',
       'review_scores_communication_na', 'review_scores_location_na',
       'review_scores_value_na']].values



In [0]:
print(X.shape)
print(y.shape)



In [0]:
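from sklearn.preprocessing import KBinsDiscretizer
# Bin the continuous price into 5 quantile-based classes so it can serve as a classification target
discret = KBinsDiscretizer(n_bins=5, encode='ordinal', strategy='quantile', dtype=np.float32)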
y = discret.fit_transform(y.reshape(-1, 1)).reshape(-1)
print(y.shape)
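The cell above turns the continuous price into five ordinal classes with roughly equal numbers of listings. A minimal sketch of quantile binning with the standard library is shown below. `quantile_bin` is a hypothetical helper, the prices are the ten values from the rows displayed earlier, and the cut points come from `statistics.quantiles`, which may place bin edges slightly differently from scikit-learn's `KBinsDiscretizer`.

```python
import bisect
import statistics

def quantile_bin(values, n_bins=5):
    """Assign each value an ordinal label 0..n_bins-1 using quantile cut points."""
    cut_points = statistics.quantiles(values, n=n_bins)  # n_bins - 1 interior edges
    return [float(bisect.bisect_right(cut_points, v)) for v in values]

# Prices of the ten listings shown earlier, sorted for readability.
prices = [50.0, 68.0, 69.0, 77.0, 87.0, 99.0, 100.0, 110.0, 140.0, 145.0]
labels = quantile_bin(prices)
print(labels)
```

Quantile edges keep the classes balanced, which matters here because the later example scores models by plain accuracy.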



In [0]:
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.utils import check_random_state

from hyperopt import fmin, hp, tpe
from hyperopt import SparkTrials, STATUS_OK

X_train, X_test, y_train, y_test = train_test_split(X, y, train_size=0.8)

scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# First, set up the scikit-learn workflow, wrapped within a function.
def train(params):
    """
    This is our main training function which we pass to Hyperopt.
    It takes in hyperparameter settings, fits a model based on those settings,
    evaluates the model, and returns the loss.

    :param params: map specifying the hyperparameter settings to test
    :return: loss for the fitted model
    """
    # We will tune 2 hyperparameters:
    #  regularization and the penalty type (L1 vs L2).
    regParam = float(params['regParam'])
    penalty = params['penalty']

    # Turn up tolerance for faster convergence
    clf = LogisticRegression(C=1.0 / regParam,
                             multi_class='multinomial',
                             penalty=penalty, 
                             solver='saga', 
                             tol=0.1)
    clf.fit(X_train, y_train)
    score = clf.score(X_test, y_test)

    return {'loss': -score, 'status': STATUS_OK}

# Next, define a search space for Hyperopt.
search_space = {
   'penalty': hp.choice('penalty', ['l1', 'l2']),
   'regParam': hp.loguniform('regParam', -10.0, 0),
}

# Select a search algorithm for Hyperopt to use.
algo=tpe.suggest  # Tree of Parzen Estimators, a Bayesian method
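Two details of the cell above are easy to miss: `hp.loguniform('regParam', -10.0, 0)` describes values of the form `exp(uniform(-10, 0))`, and the accuracy is negated because `fmin` minimizes. The sketch below reproduces both with the standard library only; `sample_reg_param` and `accuracy_to_loss` are hypothetical helper names.

```python
import math
import random

def sample_reg_param(rng, low=-10.0, high=0.0):
    """Draw exp(uniform(low, high)), the distribution that hp.loguniform describes."""
    return math.exp(rng.uniform(low, high))

def accuracy_to_loss(accuracy):
    """fmin minimizes, so a score that should be maximized is negated."""
    return -accuracy

rng = random.Random(0)
reg_params = [sample_reg_param(rng) for _ in range(1000)]

# scikit-learn's C is the inverse of the regularization strength.
c_values = [1.0 / r for r in reg_params]

print(min(reg_params), max(reg_params))
print(min(c_values), max(c_values))
```

So `regParam` ranges from about `exp(-10)` up to 1, and the corresponding `C` from 1 up to about `exp(10)`, with samples spread evenly across orders of magnitude rather than evenly across raw values.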



##### We can run Hyperopt locally (only on the driver machine) by calling `fmin` without an explicit `trials` argument.

In [0]:
best_hyperparameters = fmin(
    fn=train,
    space=search_space,
    algo=algo,
    max_evals=32)

print(best_hyperparameters)



##### We can distribute tuning across our Spark cluster by calling `fmin` with a `SparkTrials` instance.

In [0]:
spark_trials = SparkTrials()
best_hyperparameters = fmin(
    fn=train,
    space=search_space,
    algo=algo,
    trials=spark_trials,
    max_evals=32)

print(best_hyperparameters)
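As an analogy for what distributing trials means, the sketch below evaluates a batch of independent candidates concurrently with a thread pool. This is not how `SparkTrials` is implemented (it runs each trial as a Spark job on the cluster); `toy_train` and `run_batch` are hypothetical names, and the `parallelism` argument here only loosely mirrors the `parallelism` option of `SparkTrials`.

```python
import random
from concurrent.futures import ThreadPoolExecutor

def toy_train(params):
    """Hypothetical stand-in for `train`: returns a loss dict for one configuration."""
    return {"loss": (params["regParam"] - 0.01) ** 2, "status": "ok", "params": params}

def run_batch(candidates, parallelism):
    """Evaluate independent candidates concurrently and collect every result."""
    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        return list(pool.map(toy_train, candidates))

rng = random.Random(1)
candidates = [{"regParam": 10 ** rng.uniform(-4, 0)} for _ in range(8)]
results = run_batch(candidates, parallelism=4)
best = min(results, key=lambda r: r["loss"])
print(best["params"])
```

One trade-off to keep in mind: candidates that run at the same time are proposed before any of them has finished, so higher parallelism gives an adaptive algorithm such as TPE less history to work with per proposal.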



#### Comparing with Grid Search of Scikit Learn

In [0]:
from sklearn.model_selection import GridSearchCV
import numpy as np



In [0]:
import warnings
warnings.filterwarnings("ignore")



In [0]:
search_space = {'penalty': ['l1', 'l2', 'none'],
                'C': np.logspace(0.1,1,3)
               }

clf = LogisticRegression(max_iter=5000,
                         solver='saga')

clf_lr = GridSearchCV(clf, param_grid=search_space)
clf_lr.fit(X_train, y_train)
score = clf_lr.score(X_test, y_test)



In [0]:
print(clf_lr.best_params_)
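For comparison with the 32 evaluations used by hyperopt above, it helps to count what the grid search actually fits. The sketch below enumerates the same grid with the standard library. The exponents reproduce `np.logspace(0.1, 1, 3)`, and `cv_folds = 5` is an assumption based on `GridSearchCV`'s default when `cv` is not passed.

```python
import itertools

penalties = ["l1", "l2", "none"]

# Same values as np.logspace(0.1, 1, 3): exponents evenly spaced from 0.1 to 1.
exponents = [0.1 + i * (1 - 0.1) / 2 for i in range(3)]
c_grid = [10 ** e for e in exponents]

# Every combination of penalty and C is one candidate model.
candidates = [{"penalty": p, "C": c} for p, c in itertools.product(penalties, c_grid)]

cv_folds = 5  # assumed default number of cross-validation folds
total_fits = len(candidates) * cv_folds

print(len(candidates), total_fits)
print([round(c, 3) for c in c_grid])
```

The grid grows multiplicatively with every added hyperparameter or value, whereas the hyperopt budget is set directly by `max_evals`.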



Adapted and updated from 2020 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="http://www.apache.org/">Apache Software Foundation</a>.<br/>