### Operationalizing ML Models
John Hoff  
Machine Learning Architect  
jhoff@productiveedge.com
# Step 2: Building the Model
![Step 1: Prepare](https://drive.google.com/uc?export=view&id=1LzKYjwhjddy6IvNlrieIhA62d2Cvk07w)

This step creates a pipeline-based model using Spark MLlib. The model is then trained, scored, and saved to an MLflow experiment run.

_Please Note: The "Run All" command is safe to run on this notebook._

import mlflow
import mlflow.spark

from pyspark.ml import Pipeline
from pyspark.ml.feature import SQLTransformer
from pyspark.ml.feature import Imputer
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

seed = 1023

A new MLflow run will be created if one is not currently active.  The MLflow run is used to track the performance of the model and to serialize the model for later use.

if not mlflow.active_run():
  mlflow.start_run()

## Part 1: Load the Dataset
The training data is loaded from Spark SQL and randomly split 75%/25% into training and testing sets.

data = spark.sql("select * from bank_marketing_training")
training_data, testing_data = data.randomSplit([0.75, 0.25], seed=seed)
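
Because `randomSplit` samples rows independently rather than cutting the data at an exact row count, the 75%/25% proportions are approximate. The plain-Python sketch below illustrates that idea; `random_split` is a hypothetical helper, not Spark's implementation. The fixed seed makes the assignment repeatable; in Spark this generally also assumes the input DataFrame's partitioning and row order stay the same between runs.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row independently to one split, in proportion to `weights`.

    Hypothetical illustration of the idea behind DataFrame.randomSplit,
    not Spark's implementation: split sizes are only approximately 75/25.
    """
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. [0.75, 1.0] for weights [0.75, 0.25]
    bounds, running = [], 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    rng = random.Random(seed)  # fixed seed -> reproducible assignment
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            if draw < bound:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against floating-point round-off
    return splits

training, testing = random_split(range(10_000), [0.75, 0.25], seed=1023)
print(len(training), len(testing))  # close to, but not exactly, 7500 and 2500
```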

## Part 2: Create the ML Pipeline
Additional documentation on Spark ML Pipelines can be found here: https://spark.apache.org/docs/latest/ml-pipeline.html

The high-level goal is to create a model that accepts human-readable input parameters, making the model far more accessible. Secondary goals are for the pipeline to be robust enough to handle missing and unknown inputs, and flexible enough to be reused in later steps.
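
To make the robustness goal concrete, the sketch below mimics two of the preprocessing steps in plain Python: a `cast(... as double)` that turns unparseable values into nulls (Spark SQL's behavior when ANSI mode is disabled, the default in Spark 3.x), followed by median imputation. `to_double`, `is_missing`, and `median_impute` are hypothetical helpers. Spark's `Imputer` treats both nulls and NaN as missing by default and computes an approximate median, so its fill value can differ slightly from the exact median used here.

```python
import math
import statistics

def to_double(value):
    """Mimic `cast(value as double)` with ANSI mode off: unparseable input becomes None."""
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None

def is_missing(value):
    # Like the Imputer defaults, treat both nulls (None) and NaN as missing.
    return value is None or math.isnan(value)

def median_impute(values):
    """Replace missing entries with the exact median of the observed ones."""
    fill = statistics.median(v for v in values if not is_missing(v))
    return [fill if is_missing(v) else v for v in values]

raw_ages = ["35", "41", None, "unknown", "29"]  # human-readable, partly invalid input
ages = median_impute([to_double(a) for a in raw_ages])
print(ages)  # [35.0, 41.0, 35.0, 35.0, 29.0]
```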

preprocessing_pipeline = Pipeline(stages=[
  
  # This is the cleanest method I have found to date for ensuring that all incoming
  # numeric feature values are the floating point numbers that Spark expects.
  SQLTransformer(statement="""
  select
    *,
    cast(age as double) as age__double,
    cast(duration as double) as duration__double,
    cast(campaign as double) as campaign__double,
    cast(pdays as double) as pdays__double,
    cast(previous as double) as previous__double,
    cast(`emp.var.rate` as double) as emp_var_rate__double,
    cast(`cons.price.idx` as double) as cons_price_idx__double,
    cast(`cons.conf.idx` as double) as cons_conf_idx__double,
    cast(euribor3m as double) as euribor3m__double,
    cast(`nr.employed` as double) as nr_employed__double
  from __THIS__
  """),
  
  # Each feature is handled explicitly. Because the underlying DataFrame is immutable, each
  # transformer extends the dataset by adding new columns rather than modifying existing ones.
  # As a result, some mental bookkeeping is required to ensure that input features are properly
  # connected to the desired output features.
  Imputer(inputCols=['age__double'], outputCols=['age__imputed'], strategy='median'),
  StringIndexer(inputCol='job', outputCol='job__index', handleInvalid='keep'),
  StringIndexer(inputCol='marital', outputCol='marital__index', handleInvalid='keep'),
  StringIndexer(inputCol='education', outputCol='education__index', handleInvalid='keep'),
  StringIndexer(inputCol='default', outputCol='default__index', handleInvalid='keep'),
  StringIndexer(inputCol='housing', outputCol='housing__index', handleInvalid='keep'),
  StringIndexer(inputCol='loan', outputCol='loan__index', handleInvalid='keep'),
  StringIndexer(inputCol='contact', outputCol='contact__index', handleInvalid='keep'),
  StringIndexer(inputCol='month', outputCol='month__index', handleInvalid='keep'),
  StringIndexer(inputCol='day_of_week', outputCol='day_of_week__index', handleInvalid='keep'),
  Imputer(inputCols=['duration__double'], outputCols=['duration__imputed'], strategy='median'),
  Imputer(inputCols=['campaign__double'], outputCols=['campaign__imputed'], strategy='median'),
  Imputer(inputCols=['pdays__double'], outputCols=['pdays__imputed'], strategy='median'),
  Imputer(inputCols=['previous__double'], outputCols=['previous__imputed'], strategy='median'),
  StringIndexer(inputCol='poutcome', outputCol='poutcome__index', handleInvalid='keep'),
  Imputer(inputCols=['emp_var_rate__double'], outputCols=['emp_var_rate__imputed'], strategy='median'),
  Imputer(inputCols=['cons_price_idx__double'], outputCols=['cons_price_idx__imputed'], strategy='median'),
  Imputer(inputCols=['cons_conf_idx__double'], outputCols=['cons_conf_idx__imputed'], strategy='median'),
  Imputer(inputCols=['euribor3m__double'], outputCols=['euribor3m__imputed'], strategy='median'),
  Imputer(inputCols=['nr_employed__double'], outputCols=['nr_employed__imputed'], strategy='median'),
  
  # With all feature engineering completed, a single features vector is assembled to be fed into
  # the estimator in the pipeline.
  VectorAssembler(
    inputCols=[
      'age__imputed',
      'job__index',
      'marital__index',
      'education__index',
      'default__index',
      'housing__index',
      'loan__index',
      'contact__index',
      'month__index',
      'day_of_week__index',
      'duration__imputed',
      'campaign__imputed',
      'pdays__imputed',
      'previous__imputed',
      'poutcome__index',
      'emp_var_rate__imputed',
      'cons_price_idx__imputed',
      'cons_conf_idx__imputed',
      'euribor3m__imputed',
      'nr_employed__imputed',
    ],
    outputCol='features'
  ),
])

# The full pipeline nests the preprocessing pipeline as its first stage, then indexes the
# 'y' target column into a numeric label and trains a random forest on the features vector.
pipeline = Pipeline(stages=[
  preprocessing_pipeline,
  StringIndexer(inputCol='y', outputCol='label'),
  RandomForestClassifier(featuresCol='features', labelCol='label', seed=seed)
])
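
Each `StringIndexer` learns a mapping from category strings to numeric indices during `fit`. By default, labels are ordered by descending frequency, so the most common value gets index 0.0; with `handleInvalid='keep'`, values never seen during training are mapped to one extra index instead of raising an error, which is what lets the model accept unknown inputs. The plain-Python sketch below illustrates that behavior; `fit_string_indexer` and `transform_keep` are hypothetical helpers, and the alphabetical tie-breaking is an assumption based on Spark 3's `frequencyDesc` ordering. It also shows why, assuming `no` is the majority class in the training data, the target column `y` is indexed as `no` → 0.0 and `yes` → 1.0, making `yes` the positive class for the binary evaluator.

```python
from collections import Counter

def fit_string_indexer(values):
    """Learn a label -> index mapping, most frequent label first.

    Ties are broken alphabetically (assumed to match Spark 3's frequencyDesc order).
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

def transform_keep(mapping, value):
    """Mimic handleInvalid='keep': unseen labels share one extra index, len(mapping)."""
    return mapping.get(value, float(len(mapping)))

job_index = fit_string_indexer(
    ["admin.", "technician", "admin.", "services", "technician", "admin."])
print(job_index)                              # {'admin.': 0.0, 'technician': 1.0, 'services': 2.0}
print(transform_keep(job_index, "student"))   # 3.0 (never seen during fit)

label_index = fit_string_indexer(["no", "no", "yes", "no"])
print(label_index)                            # {'no': 0.0, 'yes': 1.0}
```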

## Part 3: Train the Model

model = pipeline.fit(training_data)
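
`pipeline.fit` walks the stages in order: estimators such as `Imputer`, `StringIndexer`, and `RandomForestClassifier` are fitted on the data produced by the preceding stages, and their fitted models are collected into a `PipelineModel`. Because a `Pipeline` is itself an estimator, the nested `preprocessing_pipeline` is fitted the same way. The plain-Python sketch below mimics that flow with hypothetical `MiniPipeline` and `MedianImputer` classes; it is a simplification, not Spark's implementation. It also shows why the model is fitted on `training_data` only: the median learned during training is reused unchanged when the test rows are transformed. The names are prefixed with `mini_` so they do not overwrite the notebook's `pipeline` and `model`.

```python
import statistics

class FillModel:
    """Fitted transformer: copies `col` into `col + '__imputed'`, filling None with `value`."""
    def __init__(self, col, value):
        self.col, self.value = col, value

    def transform(self, rows):
        out_col = self.col + "__imputed"
        return [{**r, out_col: self.value if r[self.col] is None else r[self.col]} for r in rows]

class MedianImputer:
    """Estimator: fit() learns the median of `col` and returns a FillModel."""
    def __init__(self, col):
        self.col = col

    def fit(self, rows):
        median = statistics.median(r[self.col] for r in rows if r[self.col] is not None)
        return FillModel(self.col, median)

class MiniPipelineModel:
    """Fitted pipeline: applies every fitted stage in order."""
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows

class MiniPipeline:
    """Estimator: fits stages in order, feeding each stage's output to the next."""
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            # Anything with fit() is an estimator (including a nested MiniPipeline);
            # anything else is treated as an already-usable transformer.
            model = stage.fit(rows) if hasattr(stage, "fit") else stage
            rows = model.transform(rows)
            fitted.append(model)
        return MiniPipelineModel(fitted)

train_rows = [{"age": 30.0}, {"age": None}, {"age": 50.0}]
test_rows = [{"age": None}, {"age": 20.0}]

mini_preprocessing = MiniPipeline([MedianImputer("age")])
mini_pipeline = MiniPipeline([mini_preprocessing])  # a pipeline nested as a stage
mini_model = mini_pipeline.fit(train_rows)          # median (40.0) learned from training rows only
result = mini_model.transform(test_rows)
print(result)  # [{'age': None, 'age__imputed': 40.0}, {'age': 20.0, 'age__imputed': 20.0}]
```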

## Part 4: Score the Model

predictions = model.transform(testing_data)

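# The evaluators read the pipeline's default 'label', 'prediction' (multiclass) and
# 'rawPrediction' (binary) columns; the binary evaluator's default metric is areaUnderROC.
multiclass_evaluator = MulticlassClassificationEvaluator()
binary_evaluator = BinaryClassificationEvaluator()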

accuracy = multiclass_evaluator.evaluate(predictions, {multiclass_evaluator.metricName:'accuracy'})
print('Accuracy: %s' % accuracy)
mlflow.log_metric('accuracy', accuracy)

f1 = multiclass_evaluator.evaluate(predictions, {multiclass_evaluator.metricName:'f1'})
print('F1: %s' % f1)
mlflow.log_metric('f1', f1)

precision = multiclass_evaluator.evaluate(predictions, {multiclass_evaluator.metricName:'weightedPrecision'})
print('Precision: %s' % precision)
mlflow.log_metric('precision', precision)

recall = multiclass_evaluator.evaluate(predictions, {multiclass_evaluator.metricName:'weightedRecall'})
print('Recall: %s' % recall)
mlflow.log_metric('recall', recall)

roc_auc = binary_evaluator.evaluate(predictions)
print('ROC AUC: %s' % roc_auc)
mlflow.log_metric('auc', roc_auc)
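
For readers less familiar with these metrics, the plain-Python sketch below computes them on a tiny, made-up set of labels, predictions, and scores. The weighted variants average per-class precision, recall, and F1 using each class's share of the true labels as the weight, which is how Spark's `MulticlassClassificationEvaluator` reports `weightedPrecision`, `weightedRecall`, and `f1`. One consequence of that weighting is that weighted recall always equals accuracy. ROC AUC is computed here as the probability that a randomly chosen positive row outscores a randomly chosen negative one; Spark's `BinaryClassificationEvaluator` derives the same quantity from an ROC curve built on the class-1 element of `rawPrediction`, with binning, so its value can differ slightly. The helper names are hypothetical and chosen so they do not overwrite the notebook's `accuracy`, `f1`, or `roc_auc` variables.

```python
from collections import Counter

def accuracy_score(labels, preds):
    """Fraction of rows whose predicted label equals the true label."""
    return sum(t == p for t, p in zip(labels, preds)) / len(labels)

def weighted_metrics(labels, preds):
    """Per-class precision, recall and F1, averaged with weights = true-class frequency."""
    n = len(labels)
    precision = recall = f1 = 0.0
    for cls, count in Counter(labels).items():
        tp = sum(1 for t, p in zip(labels, preds) if t == cls and p == cls)
        predicted = sum(1 for p in preds if p == cls)
        p_c = tp / predicted if predicted else 0.0
        r_c = tp / count
        f_c = 2 * p_c * r_c / (p_c + r_c) if p_c + r_c else 0.0
        weight = count / n
        precision += weight * p_c
        recall += weight * r_c
        f1 += weight * f_c
    return precision, recall, f1

def pairwise_auc(labels, scores):
    """ROC AUC as the chance a random positive outscores a random negative (ties = 1/2)."""
    pos = [s for t, s in zip(labels, scores) if t == 1]
    neg = [s for t, s in zip(labels, scores) if t == 0]
    wins = sum((a > b) + 0.5 * (a == b) for a in pos for b in neg)
    return wins / (len(pos) * len(neg))

labels = [1, 0, 1, 1, 0, 0]
preds = [1, 0, 0, 1, 0, 1]
scores = [0.9, 0.2, 0.4, 0.8, 0.3, 0.6]  # made-up class-1 scores

wp, wr, wf1 = weighted_metrics(labels, preds)
print(accuracy_score(labels, preds), wp, wr, wf1)  # all four are about 0.667
print(pairwise_auc(labels, scores))                # about 0.889 (8 of 9 pairs ranked correctly)
```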

## Part 5: Save the Model

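# Serialize the fitted PipelineModel as an artifact named 'spark_model' in the active run,
# then close the run.
mlflow.spark.log_model(model, 'spark_model')
mlflow.end_run()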