## Fitting and applying multiple models in parallel using Pandas UDFs, Hyperopt, and MLflow
This use case fits a separate model to each group of data in parallel and persists each model in MLflow. The models are then applied by loading each group's best model from MLflow and predicting on that group's records. Hyperopt is used for hyperparameter tuning. The dataset is based on the Titanic survival classification dataset. In this case, 500 records were randomly chosen from that dataset and assigned to other well-known shipwrecks. We build one model per ship, in parallel, to estimate survival likelihood. The same framework can be extended to large datasets with many groups.

In [2]:
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score, GridSearchCV

from pyspark.sql.types import StringType, DoubleType, StructType, StructField
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pyspark.sql.functions as func

import mlflow
import mlflow.sklearn
from mlflow.tracking import MlflowClient

from hyperopt import fmin, tpe, hp, Trials, STATUS_OK, SparkTrials
from hyperopt.pyll.stochastic import sample

client = MlflowClient()

# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

### Import the features

In [4]:
features = spark.read.format("delta").load("/mnt/databricks-datasets-private/ML/many_models")
features.createOrReplaceTempView("features_table")

display(features.limit(5))

name_prefix_Master,name_prefix_Miss,name_prefix_Mr,name_prefix_Mrs,name_prefix_None,name_parenths_no,name_parenths_yes,Sex_female,Sex_male,Embarked_C,Embarked_Q,Embarked_S,Embarked_nan,Pclass_1,Pclass_2,Pclass_3,ticket_text_1,ticket_text_2,ticket_text_3,ticket_text_4,ticket_text_5,ticket_text_6,ticket_text_7,ticket_text_8,ticket_length_3,ticket_length_4,ticket_length_5,ticket_length_6,ticket_length_7,cabin_chars_A,cabin_chars_B,cabin_chars_C,cabin_chars_D,cabin_chars_E,cabin_chars_F,cabin_chars_INFREQ,cabin_chars_NONE,SibSp,Parch,Survived,ship_name,Age,Fare
0,0,1,0,0,1,0,0,1,1,0,0,0,1,0,0,1,0,0,0,0,0,0,0,0,0,0,1,0,0,1,0,0,0,0,0,0,0,1,0,estonia,65,62
0,0,1,0,0,1,0,0,1,0,0,1,0,0,0,1,1,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,1,1,0,0,estonia,30,20
0,1,0,0,0,1,0,1,0,0,0,1,0,0,0,1,0,0,1,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,8,2,0,estonia,30,70
0,1,0,0,0,1,0,1,0,0,0,1,0,0,0,1,0,0,0,0,0,0,0,1,0,1,0,0,0,0,0,0,0,0,0,1,0,1,1,1,estonia,4,17
0,1,0,0,0,1,0,1,0,0,0,1,0,1,0,0,1,0,0,0,0,0,0,0,0,0,1,0,0,0,0,1,0,0,0,0,0,3,2,1,estonia,24,263


### View the groups. A model will be fit on each of these groups in parallel

In [6]:
%sql 

SELECT ship_name,
       count(*) as count
FROM features_table
GROUP BY ship_name

ship_name,count
arizona,500
edmund_fitzgerald,500
titanic,500
estonia,500


The data was partitioned by group. With large datasets, this could improve performance by avoiding constant shuffling.

In [8]:
dbutils.fs.ls('/mnt/databricks-datasets-private/ML/many_models')

Specify the label column name and features column names

In [10]:
label_col = 'Survived'
feature_cols = [column for column in features.columns if column not in [label_col, 'ship_name']]

### Define the Hyperopt objective function

In [12]:
def config_objective(df, feature_cols, label_col, clf, scoring, cv):
  
  """Configure the Hyperopt objective function

  Arguments:
  df: Pandas DataFrame:     The Pandas DataFrame on which to fit the model
  feature_cols: List[str]:  List of column names that represent the model features
  label_col: str:           The label column name
  clf: classifier:          The model class that will be instantiated and fit on the data, for instance RandomForestClassifier
  scoring: str:             Scoring method to use for selecting the best model
  cv: int:                  The number of cross-validation folds

  """

  def objective(params):
    
    """The Hyperopt objective function"""

    params['n_estimators'] = int(params['n_estimators'])
    params['min_samples_split'] = int(params['min_samples_split'])
    params['max_features'] = int(params['max_features'])

    clf_params = clf(**params)

    scores = cross_val_score(clf_params, df[feature_cols], df[label_col], cv=cv, scoring=scoring, n_jobs=-1)

    mean_score = scores.mean()
    loss = 1 - mean_score

    return {'loss': loss, 'params': params, 'status': STATUS_OK}  
  
  return objective
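
The closure pattern above can be tried without Spark, scikit-learn, or Hyperopt. The following is a minimal sketch, not the notebook's implementation: `toy_score`, `sample_params`, and `random_search` are hypothetical names, the score function is made up, and plain random search is swapped in for `fmin` with TPE. It only illustrates how a configured objective turns a score to maximise into a loss to minimise and returns the dictionary shape used above.

```python
import random

STATUS_OK = "ok"  # stand-in for hyperopt.STATUS_OK so the sketch has no dependencies


def config_objective(score_fn):
    """Return an objective that converts a score to maximise into a loss to minimise."""

    def objective(params):
        loss = 1 - score_fn(params)
        return {"loss": loss, "params": params, "status": STATUS_OK}

    return objective


def random_search(objective, sample_params, max_evals, seed=0):
    """Hypothetical stand-in for fmin: evaluate random draws and keep the lowest loss."""
    rng = random.Random(seed)
    results = [objective(sample_params(rng)) for _ in range(max_evals)]
    return min(results, key=lambda result: result["loss"])


def toy_score(params):
    """Made-up score in [0.5, 1] that peaks at n_estimators == 100."""
    return 1 - abs(params["n_estimators"] - 100) / 200


def sample_params(rng):
    """Draw n_estimators from 10, 20, ..., 200."""
    return {"n_estimators": rng.randrange(10, 201, 10)}


best = random_search(config_objective(toy_score), sample_params, max_evals=50)

# Recover the score from the loss, as the UDF in this notebook does
best_score = round(1 - best["loss"], 4)
print(best["params"], best_score)
```

Because the data and settings are captured by the outer function, the inner `objective` takes only `params`, which is the single-argument signature a tuning loop calls.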

### Define the Pandas UDF with MLflow logging
Define a UDF that fits a scikit-learn model to a group of data and performs hyperparameter tuning using Hyperopt. The models are stored in MLflow, and the best model score is returned for each group.

In [14]:
def fit_models_config(schema, feature_cols, label_col, grouping_col, clf, space, 
                      experiment_location=None, scoring='roc_auc', cv=5, max_evals=100, objective=config_objective, fit_best=True):
  
  """Apply a scikit learn model to a group of data within a Spark DataFrame using a Pandas UDF

  Arguments:
  schema: Spark DataFrame schema: A Spark DataFrame schema that maps to the output of the function
  feature_cols: List[str]:        List of column names that represent the model features
  label_col: str:                 Column to be predicted
  grouping_col: str:              The column on which the DataFrame is being grouped
  clf: classifier:                Classifier class to fit to the data
  space: Dict:                    Hyperopt search space containing the parameters to search
  experiment_location: str:       Path to the MLflow experiment. If None, create a notebook experiment
  scoring: str:                   Scoring method to use for validation
  cv: int:                        Number of cross-validation folds
  max_evals: int:                 Max Hyperopt evaluations to run
  objective: function:            Function that configures and returns the Hyperopt objective function
  fit_best: boolean:              If True, a model with the best parameters will be fit and logged in MLflow

  """
  
  
  @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
  def fit_models(data):
    """Fit the model; log the best model and its parameters to 
    MLflow"""

    group_name = data[grouping_col].iloc[0]
    
    # Specify features and label
    features = data[feature_cols]
    label = data[label_col]
    
    if experiment_location is not None:
      mlflow.set_experiment(experiment_location)
  
    with mlflow.start_run() as run:
      
      # Configure and apply Hyperopt
      bayes_trials = Trials()
      
      # Build the objective from the factory passed in through the `objective` argument
      objective_config = objective(data, feature_cols, label_col, clf, scoring, cv)
      
      best_params = fmin(fn=objective_config, space=space, algo=tpe.suggest, max_evals=max_evals, trials=bayes_trials, rstate=np.random.RandomState(50))
      
      best_model_score = round(1 - bayes_trials.best_trial['result']['loss'], 4)

      # Create model results output dataset
      model_results_df = pd.DataFrame([(group_name, best_model_score)], 
                                        columns= ['group_name', 'best_model_score'])

      # Log best model parameters and statistics to MLflow
      mlflow.set_tag("group_name", group_name)

      mlflow.set_tag("classifier_type", clf.__name__)

      # Log the score under the name of the scoring method ('roc_auc' by default)
      mlflow.log_metric(scoring, best_model_score)
      
      mlflow.log_params(best_params)
      
      # Fit the best model on the full dataset for the group
      if fit_best:
        
        # Configure and fit best model
        best_params_as_int = {param_name: int(value) for param_name, value in best_params.items()}
        best_model_config = clf(**best_params_as_int)
        best_model_config.fit(data[feature_cols], data[label_col])
        
        # Log the best model to MLflow
        mlflow.sklearn.log_model(sk_model=best_model_config, 
                                  artifact_path='survival_model')
        
      return model_results_df
    
  return fit_models

Configure the Pandas UDF

In [16]:
# Hyperopt search space
space = {'n_estimators':      hp.quniform('n_estimators', 10, 200, 10),
         'min_samples_split': hp.quniform('min_samples_split', 2, 20, 1),
         'max_features':      hp.quniform('max_features', 2, 15, 1)}


# Pandas_UDF requires a Spark Schema that matches the output of the UDF
fit_schema = StructType([StructField('group_name', StringType(), True),
                         StructField('best_model_score', DoubleType(), True)])


fit_models = fit_models_config(schema =        fit_schema, 
                               feature_cols =  feature_cols, 
                               label_col =     label_col,
                               grouping_col =  "ship_name",
                               clf =           RandomForestClassifier,
                               space =         space
                                )
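
The integer casts in the objective function and in the final fit are needed because `hp.quniform` yields floats. Its documented behaviour is to return a value like `round(uniform(low, high) / q) * q`. The following is a minimal sketch of that formula using only the standard library; `quniform_like` is a hypothetical helper, not part of Hyperopt, and it draws from Python's `random` module rather than Hyperopt's sampler.

```python
import random


def quniform_like(rng, low, high, q):
    """Mimic the documented hp.quniform formula: round(uniform(low, high) / q) * q.

    The result is returned as a float, as Hyperopt does.
    """
    return float(round(rng.uniform(low, high) / q) * q)


rng = random.Random(0)

# Five draws for an n_estimators-style range: multiples of 10 between 10 and 200
draws = [quniform_like(rng, 10, 200, 10) for _ in range(5)]
print(draws)

# Cast to int before passing the values to an estimator that expects integers
params = {"n_estimators": draws[0]}
clean = {name: int(value) for name, value in params.items()}
print(clean)
```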

### Fit and store the best model for each group

In [18]:
best_model_stats = features.groupBy('ship_name').apply(fit_models)

display(best_model_stats)

group_name,best_model_score
arizona,0.8567
edmund_fitzgerald,0.8605
titanic,0.8647
estonia,0.8657
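
Conceptually, the grouped map UDF receives each group's rows as one Pandas DataFrame and returns a Pandas DataFrame, and Spark stacks the per-group results. The following single-machine sketch shows the same contract in plain pandas. The data is made up, `summarize_group` is a hypothetical stand-in for `fit_models`, and a survival rate replaces the tuned model score.

```python
import pandas as pd

# Toy data: two groups with a binary label (made-up values)
toy = pd.DataFrame({
    "ship_name": ["titanic", "titanic", "titanic", "estonia", "estonia"],
    "Survived":  [1, 0, 1, 0, 0],
})


def summarize_group(data):
    """Receive one group's rows as a Pandas DataFrame and return a one-row DataFrame."""
    group_name = data["ship_name"].iloc[0]
    survival_rate = round(float(data["Survived"].mean()), 4)
    return pd.DataFrame([(group_name, survival_rate)],
                        columns=["group_name", "survival_rate"])


# Call the function once per group and stack the results
results = pd.concat(
    [summarize_group(group) for _, group in toy.groupby("ship_name")],
    ignore_index=True,
)
print(results)
```

The column names and types of the returned frame play the role of `fit_schema`: every group must return the same columns for the results to stack cleanly.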


### Create a Pandas UDF to apply the models  
The UDF finds the relevant model for each group within MLflow and performs a prediction for that group. In this example, the best model for each group is chosen by the metric named in the `score` argument (`roc_auc` by default). A more production-focused method for applying the models would be to register each group's best model in the Model Registry and then load the models from the registry. See the Model Registry [documentation](https://docs.databricks.com/applications/mlflow/model-registry.html) and [example notebook](https://docs.databricks.com/_static/notebooks/mlflow/mlflow-model-registry-example.html).
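
The selection step can be isolated from MLflow. The following is a minimal sketch that assumes the run information has already been collected as `(group_name, run_id, metric)` tuples; the run IDs and metric values are made up, and `best_run_for_group` is a hypothetical helper. It adds an explicit error for a group with no logged runs.

```python
# Hypothetical (group_name, run_id, metric) tuples, one per logged run
models = [
    ("titanic", "run_a", 0.84),
    ("titanic", "run_b", 0.86),
    ("estonia", "run_c", 0.87),
    ("estonia", "run_d", 0.81),
]


def best_run_for_group(models, group_name):
    """Return the (group_name, run_id, metric) tuple with the highest metric for one group."""
    models_for_group = [model for model in models if model[0] == group_name]
    if not models_for_group:
        raise ValueError(f"No logged runs found for group '{group_name}'")
    return max(models_for_group, key=lambda model: model[2])


print(best_run_for_group(models, "titanic"))  # ('titanic', 'run_b', 0.86)
```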

In [20]:
def apply_models_config(schema, features_cols, grouping_col, score="roc_auc", experiment_id=6649887):
  
  """For each distinct group (values in groupBy statement), load the group's best model and 
  perform a prediction
  
  Arguments:
  schema: Spark DataFrame schema: A Spark DataFrame schema that maps to the output of the function
  features_cols: List[str]:       List of column names that represent the model features
  grouping_col: str:              The column on which the DataFrame is being grouped
  score: str:                     Name of the logged metric to use for selecting the best model
  experiment_id: str:             The ID of the experiment from which to select models. Note, if
                                  using the notebook experience (no external MLflow experiment created)
                                  then the experiment ID is equal to the notebook ID
  
  """
  
  @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
  def apply_models(data):
    
    """Load the relevant model for the selected group and generate a
    prediction for the group"""
  
    group_name = data[grouping_col].iloc[0]
    
    
    def get_model_info(run_id):
      
      """Get the group name, run_id, and scoring metric of a run in 
      the experiment"""

      # Named run_data to avoid confusion with the group's DataFrame, `data`
      run_data = client.get_run(run_id).data

      fitted_model_group_name = run_data.tags['group_name']
      metric = run_data.metrics[score]

      return (fitted_model_group_name, run_id, metric)


    # Get all of the runs in the MLflow experiment
    runs = client.list_run_infos(experiment_id)

    # Get the run_ids for each run
    run_ids = [run.run_id for run in runs]

    # Get all relevant info for each model run
    models = [get_model_info(run_id) for run_id in run_ids]

    # Filter to only models built using this group's data
    models_for_group = [model for model in models if model[0] == group_name]

    # Find the best model for this group by sorting descending by the scoring metric
    best_model_for_group = sorted(models_for_group, key = lambda x: x[2], reverse=True)[0]
  
    best_model_run_id = best_model_for_group[1]

    # Load the best model via its run_id
    loaded_model = mlflow.sklearn.load_model(f"runs:/{best_model_run_id}/survival_model")

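    # Perform prediction; combine features and predictions
    predictions = loaded_model.predict(data[features_cols])
    # Reuse the group's index so that pd.concat pairs each prediction with its own row
    predictions_df = pd.DataFrame(predictions, columns=['prediction'], index=data.index)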

    features_and_predictions = pd.concat([predictions_df, data], axis=1)
    features_and_predictions['run_id'] = best_model_run_id

    return features_and_predictions

  return apply_models
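
When predictions are combined with the input rows, note that `pd.concat(..., axis=1)` aligns on index labels, not on row position. The following minimal sketch, using made-up values, shows what happens when the predictions frame and the group's rows carry different indexes, and how reusing the group's index avoids it.

```python
import numpy as np
import pandas as pd

# A group's rows whose index does not start at 0 (made-up values)
data = pd.DataFrame({"Age": [30, 4, 24]}, index=[10, 11, 12])
predictions = np.array([0, 1, 1])

# The default index (0, 1, 2) does not match data's index, so rows are not paired up
misaligned = pd.concat(
    [pd.DataFrame(predictions, columns=["prediction"]), data], axis=1
)

# Reusing data.index keeps each prediction on the row it was computed from
aligned = pd.concat(
    [pd.DataFrame(predictions, columns=["prediction"], index=data.index), data], axis=1
)

print(len(misaligned), int(misaligned["prediction"].isna().sum()))  # 6 3
print(len(aligned), int(aligned["prediction"].isna().sum()))        # 3 0
```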

Configure the Pandas UDF

In [22]:
# Define the schema for the UDF's output DataFrame
prediction_schema = StructType()
prediction_schema.add('prediction', DoubleType())
prediction_schema.add('run_id', StringType())

for column in features.schema:
  prediction_schema.add(column.name, column.dataType)
  

apply_models = apply_models_config(schema =        prediction_schema, 
                                   features_cols = feature_cols,
                                   grouping_col =   "ship_name"
                                    )

### Apply the models to each group

In [24]:
predictions = features.groupBy('ship_name').apply(apply_models)

display(predictions.limit(5))

prediction,run_id,name_prefix_Master,name_prefix_Miss,name_prefix_Mr,name_prefix_Mrs,name_prefix_None,name_parenths_no,name_parenths_yes,Sex_female,Sex_male,Embarked_C,Embarked_Q,Embarked_S,Embarked_nan,Pclass_1,Pclass_2,Pclass_3,ticket_text_1,ticket_text_2,ticket_text_3,ticket_text_4,ticket_text_5,ticket_text_6,ticket_text_7,ticket_text_8,ticket_length_3,ticket_length_4,ticket_length_5,ticket_length_6,ticket_length_7,cabin_chars_A,cabin_chars_B,cabin_chars_C,cabin_chars_D,cabin_chars_E,cabin_chars_F,cabin_chars_INFREQ,cabin_chars_NONE,SibSp,Parch,Survived,ship_name,Age,Fare
0.0,9a7f7bc820994dbea89e70f978d7e9bd,0,0,1,0,0,1,0,0,1,0,1,0,0,0,0,1,1,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,1,1,0,0,arizona,30,8
1.0,9a7f7bc820994dbea89e70f978d7e9bd,1,0,0,0,0,1,0,0,1,0,0,1,0,0,1,0,1,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,1,0,0,1,1,1,arizona,3,26
0.0,9a7f7bc820994dbea89e70f978d7e9bd,0,1,0,0,0,1,0,1,0,0,0,1,0,0,0,1,0,0,0,0,0,0,0,1,0,1,0,0,0,0,0,0,0,0,0,0,1,2,2,0,arizona,9,34
0.0,9a7f7bc820994dbea89e70f978d7e9bd,0,0,1,0,0,1,0,0,1,0,0,1,0,0,0,1,1,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,arizona,36,0
1.0,9a7f7bc820994dbea89e70f978d7e9bd,1,0,0,0,0,1,0,0,1,0,0,1,0,0,0,1,0,0,1,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,1,1,1,1,arizona,3,16


Confirm that different models were used for each group

In [26]:
display(
  predictions.groupBy(['ship_name', 'run_id']).agg(func.count("*").alias("count"))
)

ship_name,run_id,count
estonia,ddb4bd4717b149eca5519f17adebecfe,500
arizona,9a7f7bc820994dbea89e70f978d7e9bd,500
titanic,59611a4ba7674058b0653f6546951803,500
edmund_fitzgerald,d264bc85b8e64657a61107a56b7f8757,500


View the number of predicted survivors for each group

In [28]:
display(
  predictions.groupBy('ship_name').agg(func.sum("prediction").alias("survived"))
)

ship_name,survived
arizona,158.0
edmund_fitzgerald,165.0
titanic,163.0
estonia,164.0
