## Distributed hyperparameter tuning with Hyperopt + SparkTrials

- Dataset: [Credit Card Fraud Detection, Worldline & Machine Learning Group of ULB](http://mlg.ulb.ac.be/)
  - Anonymized credit card transactions labeled as fraudulent or genuine
  - Hosted on Databricks at `/databricks-datasets/credit-card-fraud`

- Model: `sklearn.ensemble.RandomForestClassifier`
- Tuning: Hyperopt with `SparkTrials`
  - Hyperopt is an open-source tuning library that uses the Tree-structured Parzen Estimator (TPE) algorithm
  - `SparkTrials` is Hyperopt's Spark backend, which distributes trials across the cluster

In [0]:
import mlflow
import pandas as pd

from hyperopt import hp, tpe, fmin, space_eval, STATUS_OK, Trials, SparkTrials
from hyperopt.pyll.base import scope

from pyspark.ml.functions import vector_to_array

from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import f1_score
from sklearn.model_selection import train_test_split

In [0]:
# Turn off MLflow's scikit-learn autologging so the many `fit` calls made during
# hyperparameter tuning below are not auto-logged to MLflow.
mlflow.sklearn.autolog(disable=True)

#### Load and prepare the data

In [0]:
pca_length = 28
target_col = "label"

# Read the transactions and expand the `pcaVector` ML vector into a plain array column
spark_df = (
  spark.read.parquet("/databricks-datasets/credit-card-fraud/data")
  .withColumn("pca", vector_to_array("pcaVector"))
)

# One column expression per PCA component; they surface as pca[0] ... pca[27]
pca_columns = [spark_df.pca[idx] for idx in range(pca_length)]

# Collect to the driver as pandas, then fill missing values with each column's mean.
# NOTE(review): the means are computed on the full dataset before the train/validation
# split, so validation rows influence the imputed values.
data = spark_df.select(["time", "amountRange", "label"] + pca_columns).toPandas()
data = data.fillna(value=data.mean())

In [0]:
# Preview the first rows of the collected pandas frame
data.head(n=5)

Unnamed: 0,time,amountRange,label,pca[0],pca[1],pca[2],pca[3],pca[4],pca[5],pca[6],pca[7],pca[8],pca[9],pca[10],pca[11],pca[12],pca[13],pca[14],pca[15],pca[16],pca[17],pca[18],pca[19],pca[20],pca[21],pca[22],pca[23],pca[24],pca[25],pca[26],pca[27]
0,52972.0,2,0,-0.775461,0.709595,1.610122,1.231792,0.316178,0.11441,0.410964,0.048246,0.016042,0.590293,1.069637,-0.097002,-2.053954,0.296434,-0.242126,-1.103983,0.438081,-0.090098,0.972525,-0.009525,-0.031317,0.28993,-0.215881,0.196379,-0.267693,-0.298693,0.011195,-0.02332
1,41768.0,6,0,0.873554,-1.37751,1.072031,0.898335,-1.377677,1.134118,-1.049505,0.303593,0.391175,0.306457,-1.743321,0.496823,0.582594,-1.042981,-1.093048,-2.158048,0.552883,0.94166,-0.878166,-0.188112,-0.206446,-0.02086,-0.294526,-0.419986,0.530462,-0.095314,0.10247,0.061393
2,40769.0,7,0,0.890897,-0.528187,-0.678654,0.168701,-0.141127,-1.037345,0.78672,-0.463099,-0.231256,-0.210527,-0.579607,0.320749,0.739513,0.385612,0.760072,0.056286,-0.365932,-0.553207,0.225507,0.469012,0.072491,-0.283583,-0.367602,-0.003742,0.548136,1.073835,-0.15602,0.02921
3,40682.0,3,0,-0.572954,0.458246,0.239598,-1.561953,2.688063,3.529501,0.386101,0.773474,0.06167,-0.425875,-0.594523,-0.149708,-0.36863,-0.285347,-0.600322,0.106861,-0.793363,-0.208004,0.08233,0.229461,-0.2907,-0.627817,-0.20694,1.021907,0.191261,0.219703,0.122007,-0.062997
4,50032.0,7,0,-2.053059,0.50453,-0.111156,-1.040738,-2.68459,0.048439,2.15222,0.49799,-0.802817,-1.262772,1.101034,0.61755,-0.478062,0.968829,-0.337551,1.020513,-0.476524,0.048632,-0.389469,-0.142086,-0.051183,-0.37012,0.58337,0.556771,-0.250298,0.631668,0.10311,-0.186396


In [0]:
# Split into train and validation sets.
# Stratifying on the label keeps the class proportions the same in both splits.
# A fixed random_state makes the split (and therefore every F1 score reported
# below) reproducible across Restart & Run All.
X = data.drop(columns=[target_col])
y = data[target_col]

X_train, X_val, y_train, y_val = train_test_split(X, y, stratify=y, random_state=42)

### Single-node hyperparameter tuning

#### Define search space and objective function

In [0]:
# Hyperopt search space for RandomForestClassifier.
# `scope.int` casts the quantized (float) samples to int, because sklearn expects
# integer counts. `hp.choice` samples one of the listed criteria; fmin reports the
# chosen *index*, not the string.
space = dict(
  n_estimators=scope.int(hp.quniform("n_estimators", 50, 150, 1)),
  criterion=hp.choice("criterion", ["gini", "entropy"]),
  min_samples_leaf=scope.int(hp.quniform("min_samples_leaf", 1, 20, 1)),
  # A float here is interpreted by sklearn as a fraction of the training samples.
  # NOTE(review): sklearn requires such a float to lie in (0.0, 1.0]; the lower
  # bound 0 is technically outside that range.
  min_samples_split=hp.uniform("min_samples_split", 0, 1),
)

# Number of hyperparameter settings evaluated by each fmin call
n_evals = 200

In [0]:
def objective(hyperparameters, random_state=42):
  """Train a random forest with the given hyperparameters and score it on the validation set.

  Reads the module-level X_train, y_train, X_val and y_val.

  Args:
    hyperparameters: dict of RandomForestClassifier keyword arguments sampled
      from `space` by Hyperopt.
    random_state: seed for the forest. Without it, the same hyperparameters give
      a different loss on each call, which makes TPE's objective noisy and the
      notebook irreproducible. A `random_state` key inside `hyperparameters`
      takes precedence.

  Returns:
    Hyperopt result dict. "loss" is the negated validation F1 score: fmin
    minimizes, so minimizing -F1 maximizes F1.
  """
  model = RandomForestClassifier(**{"random_state": random_state, **hyperparameters})
  model.fit(X_train, y_train)

  val_pred = model.predict(X_val)
  # zero_division=0: a forest that predicts no positives (likely with a large
  # min_samples_split) still scores 0, but without spamming UndefinedMetricWarning.
  val_f1_score = f1_score(y_val, val_pred, zero_division=0)

  return {"loss": -val_f1_score, "status": STATUS_OK}

#### Train a `RandomForestClassifier` with Hyperopt

In [0]:
# Run TPE sequentially on the driver, recording every evaluation in `trials`
trials = Trials()
best = fmin(
  fn=objective,
  space=space,
  algo=tpe.suggest,
  max_evals=n_evals,
  trials=trials
)

# best_trial considers only trials with status STATUS_OK. That makes it safe if
# some evaluations failed (a failed trial's result may have no "loss" key), and
# avoids sorting every result. The loss is -F1, so negate it back.
best_f1 = -trials.best_trial["result"]["loss"]

In [0]:
# Translate fmin's raw output (hp.choice index, float-valued quantized ints) into
# the actual hyperparameter values with space_eval.
# Building a new dict instead of mutating `best` keeps the cell idempotent. The
# previous in-place version turned "gini" into "entropy" on re-run, because the
# stored string no longer compared equal to 0.
best_params = space_eval(space, best)
best_summary = {**best_params, "F1 Score": best_f1}
display(pd.DataFrame(best_summary, index=[0]))

criterion,min_samples_leaf,min_samples_split,n_estimators,F1 Score
gini,7.0,0.00011731443489267082,83.0,0.8225108225108225


### Distributed hyperparameter tuning

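#### Broadcast data

Broadcast the training and validation sets so that the objective, when it runs as a `SparkTrials` task on a worker, can read them through `.value`.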

In [0]:
# Send each dataset to the executors once as a read-only broadcast variable, so the
# objective running on workers can fetch it through `.value`.
X_train_bc, y_train_bc, X_val_bc, y_val_bc = (
  sc.broadcast(dataset) for dataset in (X_train, y_train, X_val, y_val)
)

#### Train a `RandomForestClassifier` with Hyperopt + `SparkTrials`

In [0]:
def objective(hyperparameters, random_state=42):
  """Distributed variant: train a random forest and score it on the validation set.

  Redefines (shadows) the single-node `objective` defined earlier. It reads the
  data from the module-level broadcast variables X_train_bc, y_train_bc,
  X_val_bc and y_val_bc, so it can run inside SparkTrials tasks.

  Args:
    hyperparameters: dict of RandomForestClassifier keyword arguments sampled
      from `space` by Hyperopt.
    random_state: seed for the forest, so a given hyperparameter setting always
      produces the same loss. A `random_state` key inside `hyperparameters`
      takes precedence.

  Returns:
    Hyperopt result dict whose "loss" is the negated validation F1 score.
  """
  # Unwrap the broadcast variables (local names shadow the driver-side globals)
  X_train = X_train_bc.value
  y_train = y_train_bc.value
  X_val = X_val_bc.value
  y_val = y_val_bc.value

  model = RandomForestClassifier(**{"random_state": random_state, **hyperparameters})
  model.fit(X_train, y_train)

  val_pred = model.predict(X_val)
  # zero_division=0: an all-negative prediction scores 0 without emitting
  # UndefinedMetricWarning on every worker.
  val_f1_score = f1_score(y_val, val_pred, zero_division=0)

  return {"loss": -val_f1_score, "status": STATUS_OK}

In [0]:
# Run TPE with up to 32 concurrent trials executed as Spark tasks
trials = SparkTrials(parallelism=32)
best = fmin(
  fn=objective,
  space=space,
  algo=tpe.suggest,
  max_evals=n_evals,
  trials=trials
)

# SparkTrials records failed trials instead of raising, and their results may
# have no "loss" key. best_trial considers only STATUS_OK trials, so it is safe
# here. The loss is -F1, so negate it back.
best_f1 = -trials.best_trial["result"]["loss"]

In [0]:
# Map fmin's raw output (hp.choice index, float-valued quantized ints) back to
# real hyperparameter values. A fresh dict keeps the cell idempotent; mutating
# `best` in place would flip "gini" to "entropy" on a second run.
best_params = space_eval(space, best)
best_summary = {**best_params, "F1 Score": best_f1}
display(pd.DataFrame(best_summary, index=[0]))

criterion,min_samples_leaf,min_samples_split,n_estimators,F1 Score
gini,10.0,0.00032316495187868083,60.0,0.7948717948717948
