# Managing Models

You can manage models in Azure Databricks in two ways: through the user interface or programmatically.  The next two exercises cover each approach in turn.

## Start Your Cluster
To get started, attach a Databricks cluster to this notebook.  If you have not created a cluster yet, use the **Clusters** menu on the left-hand sidebar to create one.  Then, return to this notebook and attach the newly created cluster.

## Managing a Model via the User Interface

In this exercise, you will once more train a model based on the `nyc-taxi` dataset.  From there, you will register the model using the Databricks user interface.  

The first step is to load the libraries you will use and featurize the NYC Taxi & Limousine Commission - green taxi trip records dataset.  Because you have reviewed this code in the prior notebook, explanations here will be brief until you have run the trained model.

In [0]:
import urllib.request
import os
import warnings
import sys
import numpy as np
from pyspark.sql.types import * 
from pyspark.sql.functions import col, lit
from pyspark.sql.functions import udf
import matplotlib
import matplotlib.pyplot as plt
import mlflow
import mlflow.spark
from pyspark.ml.feature import Imputer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml import Pipeline

dataset = spark.sql("select * from nyc_taxi")

def get_sin_cosine(value, max_value):
  sine =  np.sin(value * (2.*np.pi/max_value))
  cosine = np.cos(value * (2.*np.pi/max_value))
  return (sine.tolist(), cosine.tolist())

schema = StructType([
    StructField("sine", DoubleType(), False),
    StructField("cosine", DoubleType(), False)
])

get_sin_cosineUDF = udf(get_sin_cosine, schema)

dataset = dataset.withColumn("udfResult", get_sin_cosineUDF(col("hour_of_day"), lit(24))).withColumn("hour_sine", col("udfResult.sine")).withColumn("hour_cosine", col("udfResult.cosine")).drop("udfResult").drop("hour_of_day")

dataset = dataset.filter(dataset.totalAmount.isNotNull())

dataset = dataset.withColumn("isPaidTimeOff", col("isPaidTimeOff").cast("integer"))

numerical_cols = ["passengerCount", "tripDistance", "snowDepth", "precipTime", "precipDepth", "temperature", "hour_sine", "hour_cosine"]
categorical_cols = ["day_of_week", "month_num", "normalizeHolidayName", "isPaidTimeOff"]
label_column = "totalAmount"

stages = []

inputCols = ["passengerCount"]
outputCols = ["passengerCount"]
imputer = Imputer(strategy="median", inputCols=inputCols, outputCols=outputCols)
stages += [imputer]

assembler = VectorAssembler().setInputCols(numerical_cols).setOutputCol('numerical_features')
scaler = MinMaxScaler(inputCol=assembler.getOutputCol(), outputCol="scaled_numerical_features")
stages += [assembler, scaler]

for categorical_col in categorical_cols:
    # Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categorical_col, outputCol=categorical_col + "_index", handleInvalid="skip")
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categorical_col + "_classVector"])
    # Add stages.  These are not run here, but will run all at once later on.
    stages += [stringIndexer, encoder]
    
assemblerInputs = [c + "_classVector" for c in categorical_cols] + ["scaled_numerical_features"]
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

partialPipeline = Pipeline().setStages(stages)
pipelineModel = partialPipeline.fit(dataset)
preppedDataDF = pipelineModel.transform(dataset)

(trainingData, testData) = preppedDataDF.randomSplit([0.7, 0.3], seed=97)

print('Data preparation work completed.')

Data preparation work completed.
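
The `get_sin_cosine` UDF above encodes `hour_of_day` as a point on the unit circle, so that 23:00 and 00:00 end up close together; a raw 0-23 integer would place them far apart.  The sketch below reproduces that idea with only the standard library.  The helper names `encode_cyclical` and `distance` are illustrative and not part of the notebook.

```python
import math

def encode_cyclical(value, max_value):
    """Map a periodic value (e.g. hour 0-23) to (sine, cosine) on the unit circle."""
    angle = value * (2.0 * math.pi / max_value)
    return math.sin(angle), math.cos(angle)

def distance(a, b):
    """Euclidean distance between two encoded points."""
    return math.hypot(a[0] - b[0], a[1] - b[1])

midnight = encode_cyclical(0, 24)
late_evening = encode_cyclical(23, 24)
noon = encode_cyclical(12, 24)

# 23:00 is one hour from midnight, so the encoded points sit near each other,
# while noon lands on the opposite side of the circle.
print(distance(midnight, late_evening) < distance(midnight, noon))
```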


With this data in place, create a function to plot the quality of the regression model based on predicted amounts versus actual amounts.

In [0]:
def plot_regression_quality(predictions):
  p_df = predictions.select(["totalAmount",  "prediction"]).toPandas()
  true_value = p_df.totalAmount
  predicted_value = p_df.prediction

  fig = plt.figure(figsize=(10,10))
  plt.scatter(true_value, predicted_value, c='crimson')
  plt.yscale('log')
  plt.xscale('log')

  p1 = max(max(predicted_value), max(true_value))
  p2 = min(min(predicted_value), min(true_value))
  plt.plot([p1, p2], [p1, p2], 'b-')
  plt.xlabel('True Values', fontsize=15)
  plt.ylabel('Predictions', fontsize=15)
  plt.axis('equal')
  
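  # Save the figure to disk so it can be logged as an MLflow artifact,
  # then close it to release memory and return the figure object.
  fig.savefig("LinearRegressionPrediction.png")
  plt.close(fig)
  return fig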

print('Created regression quality plot function')

Created regression quality plot function


The following method trains the regression model and uses MLflow Tracking to record the parameters, metrics, model, and a plot that compares actual versus predicted amounts spent on taxi rides.  This is essentially the same model you used in the prior lesson, with one minor change in lines 40-43 (the optional `model_name` argument), which you will take advantage of in the next exercise.

In [0]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

def train_nyc_taxi(train_data, test_data, label_column, features_column, elastic_net_param, reg_param, max_iter, model_name=None):
  # Evaluate metrics
  def eval_metrics(predictions):
      evaluator = RegressionEvaluator(
          labelCol=label_column, predictionCol="prediction", metricName="rmse")
      rmse = evaluator.evaluate(predictions)
      evaluator = RegressionEvaluator(
          labelCol=label_column, predictionCol="prediction", metricName="mae")
      mae = evaluator.evaluate(predictions)
      evaluator = RegressionEvaluator(
          labelCol=label_column, predictionCol="prediction", metricName="r2")
      r2 = evaluator.evaluate(predictions)
      return rmse, mae, r2

  # Start an MLflow run; the "with" keyword ensures we'll close the run even if this cell crashes
  with mlflow.start_run():
    lr = LinearRegression(featuresCol=features_column, labelCol=label_column, elasticNetParam=elastic_net_param, regParam=reg_param, maxIter=max_iter)
    lrModel = lr.fit(train_data)
    predictions = lrModel.transform(test_data)
    (rmse, mae, r2) = eval_metrics(predictions)

    # Print out model metrics
    print("Linear regression model (elasticNetParam=%f, regParam=%f, maxIter=%f):" % (elastic_net_param, reg_param, max_iter))
    print("  RMSE: %s" % rmse)
    print("  MAE: %s" % mae)
    print("  R2: %s" % r2)

    # Log hyperparameters for mlflow UI
    mlflow.log_param("elastic_net_param", elastic_net_param)
    mlflow.log_param("reg_param", reg_param)
    mlflow.log_param("max_iter", max_iter)
    # Log evaluation metrics
    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)
    # Log the model itself
    if model_name is None:
      mlflow.spark.log_model(lrModel, "model")
    else:
      mlflow.spark.log_model(lrModel, artifact_path="model", registered_model_name=model_name)
    modelpath = "/dbfs/mlflow/taxi_total_amount_2/model-%f-%f-%f" % (elastic_net_param, reg_param, max_iter)
    mlflow.spark.save_model(lrModel, modelpath)
    
    # Generate a plot
    image = plot_regression_quality(predictions)
    
    # Log artifacts (in this case, the regression quality image)
    mlflow.log_artifact("LinearRegressionPrediction.png")
    
print('Created training and evaluation method')

Created training and evaluation method
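
For reference, the three metrics that `eval_metrics` requests from `RegressionEvaluator` (RMSE, MAE, and R2) can be computed by hand on a small sample.  This is a minimal sketch using the standard textbook definitions; the function name `regression_metrics` and the sample fares are made up for illustration and do not come from the dataset.

```python
import math

def regression_metrics(y_true, y_pred):
    """Return (rmse, mae, r2) for two equal-length lists of numbers."""
    n = len(y_true)
    errors = [t - p for t, p in zip(y_true, y_pred)]
    ss_res = sum(e * e for e in errors)                 # residual sum of squares
    mean_true = sum(y_true) / n
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)  # total sum of squares
    rmse = math.sqrt(ss_res / n)
    mae = sum(abs(e) for e in errors) / n
    r2 = 1.0 - ss_res / ss_tot
    return rmse, mae, r2

actual = [10.0, 12.0, 8.0, 15.0]      # hypothetical fares
predicted = [11.0, 11.0, 9.0, 13.0]   # hypothetical predictions
rmse, mae, r2 = regression_metrics(actual, predicted)
print(f"RMSE={rmse:.4f} MAE={mae:.4f} R2={r2:.4f}")
```

Lower RMSE and MAE indicate smaller errors, while R2 closer to 1 means the model explains more of the variance in the label.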


Remove any prior executions of this script.  Note that the folder is now `dbfs:/mlflow/taxi_total_amount_2` instead of `taxi_total_amount`.  This way, you will not overwrite executions from the prior lab.

In [0]:
%fs rm -r dbfs:/mlflow/taxi_total_amount_2

Train the model with what were the most successful hyperparameters in the prior lab.

In [0]:
# L1 penalty, regularization parameter 0.3, 50 iterations
train_nyc_taxi(trainingData, testData, label_column, "features", 1.0, 0.3, 50)

Linear regression model (elasticNetParam=1.000000, regParam=0.300000, maxIter=50.000000):
  RMSE: 5.224525975596698
  MAE: 2.198313990976637
  R2: 0.7836611624718266


2023/03/21 11:00:39 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().
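
In Spark ML's `LinearRegression`, `elasticNetParam` mixes the L1 and L2 penalties and `regParam` scales the result: a value of 1.0 gives a pure L1 (lasso) penalty and 0.0 a pure L2 (ridge) penalty.  The sketch below computes that penalty term as it is described in the Spark documentation.  The function `elastic_net_penalty` and the sample weights are illustrative only and are not taken from the trained model.

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """Penalty = reg_param * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2)."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    alpha = elastic_net_param
    return reg_param * (alpha * l1 + (1.0 - alpha) / 2.0 * l2_squared)

sample_weights = [1.0, -2.0, 3.0]  # hypothetical coefficients

print(elastic_net_penalty(sample_weights, 0.3, 1.0))  # pure L1 penalty
print(elastic_net_penalty(sample_weights, 0.3, 0.0))  # pure L2 penalty
```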


### Registering the Model

Select the **Experiment** option in the notebook context bar to display the Experiment sidebar.  In this sidebar, select the `spark` link for your experiment run.  This will open the experiment run's details in a new browser tab and navigate to the model itself.

On the model page, select **Register Model** to register a new model.  In the **Model** drop-down list, select **+ Create New Model** and enter the name **NYC Taxi Amount UI**.  Then, select **Register**.  Registration may take a couple of minutes to complete.  You may need to refresh the tab to see the model registration status change from **Registration pending...** to **Registered**.

### Serving the Model

From here, navigate to the **Models** page using the menu on the left-hand side.  You will see the `NYC Taxi Amount UI` model.  Select the model link to view details about the model.  Note that you can add tags to the model or view different versions of it.  To activate the model, select the **Serving** tab and then select **Enable Serving**.  This will set up a single-node cluster intended for generating predictions.  This process may take several minutes, so be patient.  You may need to refresh your browser occasionally to see updates.

After the status changes from **Pending** to **Ready**, you can generate a prediction through your browser.  One way to test this is to select the **Browser** button in the **Call The Model** section and enter a JSON array into the **Request** field.  This particular model, however, is fairly complex, so it is easier to call it from code.  We will do that in the next exercise.

### Deleting the Model

Once you are done testing the model, select the drop-down symbol next to **Registered Models > NYC Taxi Amount UI** in the header section and then choose **Delete**.  Confirm that you wish to delete the model.  It will stop serving the current model and delete the model from the registry.

## Managing a Model via Code

In addition to the user interface, it is possible to manage models via code.  In this exercise, you will take the same trained model as in the prior exercise and manage it using the `MlflowClient` class from MLflow's Python library.

In [0]:
from mlflow.tracking import MlflowClient
import time
from mlflow.entities.model_registry.model_version_status import ModelVersionStatus

client = MlflowClient()

### Retrieve the Model

The first step will be to retrieve the model you created in the prior exercise.  To do this, first retrieve the experiment that you created in the prior exercise.  Because you did not specify an experiment name, the name will be the same as this notebook's name.

In [0]:
user_name = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().apply('user')
experiment_name = "/Users/{user_name}/03 - Managing Experiments and Models/02 - Managing Models".format(user_name=user_name)

experiment = client.get_experiment_by_name(experiment_name)
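
`get_experiment_by_name` returns `None` when no experiment has the given name, which can happen if the notebook was imported to a different folder than the path assumed above.  The sketch below fails early with a clearer message.  `require_experiment` and the `_StubClient` stand-in are hypothetical names used so the example runs anywhere; in the notebook you would pass the real `client` instead.

```python
class _StubClient:
    """Stand-in for MlflowClient (illustrative only, not an MLflow class)."""
    def __init__(self, known):
        self._known = known  # maps experiment name -> experiment object

    def get_experiment_by_name(self, name):
        return self._known.get(name)  # None when the name is unknown

def require_experiment(client, experiment_name):
    """Return the experiment, or raise if the lookup came back empty."""
    experiment = client.get_experiment_by_name(experiment_name)
    if experiment is None:
        raise LookupError(
            "No experiment named %r; check the notebook path." % experiment_name
        )
    return experiment

stub = _StubClient({"/Users/someone@example.com/demo": "experiment-object"})
print(require_experiment(stub, "/Users/someone@example.com/demo"))
```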

Next, retrieve the latest run of model training.  This is located in a folder named by the run's unique identifier (`run_uuid`).  From there, you wrote the model to a `model` folder in `train_nyc_taxi()`.

In [0]:
experiment_id = experiment.experiment_id
runs_df = client.search_runs(experiment_id, order_by=["attributes.start_time desc"], max_results=1)
run_id = runs_df[0].info.run_uuid

model_name = "NYC Taxi Amount API"

artifact_path = "model"
model_uri = "runs:/{run_id}/{artifact_path}".format(run_id=run_id, artifact_path=artifact_path)
model_uri

Out[7]: 'runs:/94935e6dcd974aad8159f931a3b00ca1/model'
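
MLflow addresses models with URI schemes.  The `runs:/` form used above points at an artifact logged by a specific run, while the `models:/` form points at a registered model by version number or by stage.  The two helpers below are a small sketch of how those strings are assembled; the function names are hypothetical and simply wrap string formatting.

```python
def run_model_uri(run_id, artifact_path="model"):
    """URI for a model artifact logged under a tracking run."""
    return "runs:/{run_id}/{artifact_path}".format(
        run_id=run_id, artifact_path=artifact_path
    )

def registry_model_uri(model_name, version_or_stage):
    """URI for a registered model, selected by version number or stage name."""
    return "models:/{name}/{selector}".format(
        name=model_name, selector=version_or_stage
    )

print(run_model_uri("94935e6dcd974aad8159f931a3b00ca1"))
print(registry_model_uri("NYC Taxi Amount API", "Production"))
print(registry_model_uri("NYC Taxi Amount API", 1))
```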

### Register Model

The next step is to register the model.  This model will be registered under the name `NYC Taxi Amount API`.  Once the cell returns "Model status: READY", the model will be available.  This may take a few minutes.

In [0]:
model_details = mlflow.register_model(model_uri=model_uri, name=model_name)

# Wait until the model is ready
def wait_until_ready(model_name, model_version):
  client = MlflowClient()
  for _ in range(10):
    model_version_details = client.get_model_version(
      name=model_name,
      version=model_version,
    )
    status = ModelVersionStatus.from_string(model_version_details.status)
    print("Model status: %s" % ModelVersionStatus.to_string(status))
    if status == ModelVersionStatus.READY:
      break
    time.sleep(1)

wait_until_ready(model_details.name, model_details.version)

Successfully registered model 'NYC Taxi Amount API'.
2023/03/21 11:20:30 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.                     Model name: NYC Taxi Amount API, version 1


Model status: READY


Created version '1' of model 'NYC Taxi Amount API'.
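
The `wait_until_ready` loop above polls ten times at one-second intervals, so it gives up after roughly ten seconds even if registration is still pending.  A variant with an explicit timeout is sketched below.  It assumes only that the client exposes `get_model_version(name=..., version=...)` returning an object with a `status` string, as `MlflowClient` does in this notebook.  The names `wait_for_model_version`, `timeout_s`, `poll_s`, and `_PendingThenReadyClient` are hypothetical and exist only for this illustration.

```python
import time
from types import SimpleNamespace

def wait_for_model_version(client, name, version,
                           timeout_s=300.0, poll_s=5.0, sleep=time.sleep):
    """Poll until the model version reports READY or the timeout elapses."""
    deadline = time.monotonic() + timeout_s
    while True:
        status = client.get_model_version(name=name, version=version).status
        print("Model status: %s" % status)
        if status == "READY":
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(
                "Model %r version %s not READY after %.0f s" % (name, version, timeout_s)
            )
        sleep(poll_s)

class _PendingThenReadyClient:
    """Stand-in client: reports PENDING_REGISTRATION twice, then READY."""
    def __init__(self):
        self._statuses = iter(["PENDING_REGISTRATION", "PENDING_REGISTRATION", "READY"])

    def get_model_version(self, name, version):
        return SimpleNamespace(status=next(self._statuses))

# sleep is replaced with a no-op so the demonstration finishes immediately
result = wait_for_model_version(
    _PendingThenReadyClient(), "NYC Taxi Amount API", "1", sleep=lambda s: None
)
```

Passing the `sleep` function as a parameter keeps the helper easy to test; in the notebook you would leave it at its default and pass the real `client`.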


Once the model is available, you can update the currently registered model.  The following method calls update the model description and the model version's description, respectively.

Each model has one or more versions, which represent iterations on the trained model.  Creating descriptions for these model versions can help you keep track of changes over time, such as using a new algorithm.

In [0]:
client.update_registered_model(
  name=model_details.name,
  description="This model forecasts the amount a taxi cab ride might cost in New York City."
)

client.update_model_version(
  name=model_details.name,
  version=model_details.version,
  description="This model version was built using Spark ML's linear regression algorithm."
)

### Model Staging

MLflow allows multiple versions of a model to exist at the same time.  To remove ambiguity in which model should be in use at any time, you can stage models, using states such as `Staging` or `Production`.

Use the `Production` stage on the version of the model you want to use for inference.  The process to do this follows.

In [0]:
client.transition_model_version_stage(
  name=model_details.name,
  version=model_details.version,
  stage='Production',
)
model_version_details = client.get_model_version(
  name=model_details.name,
  version=model_details.version,
)
print("The current model stage is: '{stage}'".format(stage=model_version_details.current_stage))

latest_version_info = client.get_latest_versions(model_name, stages=["Production"])
latest_production_version = latest_version_info[0].version
print("The latest production version of the model '%s' is '%s'." % (model_name, latest_production_version))

The current model stage is: 'Production'
The latest production version of the model 'NYC Taxi Amount API' is '1'.


### Model Inference

The following function will allow you to forecast the cost of a taxi ride in New York City given certain conditions.

In [0]:
import mlflow.pyfunc

def forecast_nyc_taxi_amount(model_name, model_stage, df):
  model_uri = "models:/{model_name}/{model_stage}".format(model_name=model_name,model_stage=model_stage)
  print("Loading registered model version from URI: '{model_uri}'".format(model_uri=model_uri))
  model = mlflow.pyfunc.load_model(model_uri)
  return model.predict(df)

With this function in place, build a sample input and generate the forecast for the `Production` model.  Use the `testData` DataFrame that you created earlier in this lab, as it has all of the inputs in the right shape for performing inference.

In [0]:
model_stage = "Production"
df = testData.head(1)
forecast_nyc_taxi_amount(model_name, model_stage, df)

Loading registered model version from URI: 'models:/NYC Taxi Amount API/Production'
Out[11]: [5.187231884093903]

### Model Versioning

Creating a new version of a model is easy.  In this case, run the `train_nyc_taxi()` method and specify a new parameter which defines the model name.  This will write a new version of the current model while retaining the current `Production` version.

In [0]:
# Create a new version
# L2 penalty, regularization parameter 0.3, 500 iterations
train_nyc_taxi(trainingData, testData, label_column, "features", 0.0, 0.3, 500, model_name)

Linear regression model (elasticNetParam=0.000000, regParam=0.300000, maxIter=500.000000):
  RMSE: 5.190288093298021
  MAE: 2.171016700427377
  R2: 0.7864873379373474


2023/03/21 11:22:21 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().
Registered model 'NYC Taxi Amount API' already exists. Creating a new version of this model...
2023/03/21 11:23:07 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.                     Model name: NYC Taxi Amount API, version 2
Created version '2' of model 'NYC Taxi Amount API'.


Now, retrieve the latest version of the `NYC Taxi Amount API` model.

In [0]:
model_version_infos = client.search_model_versions("name = '%s'" % model_name)
# Versions are strings, so compare them numerically (otherwise "10" sorts before "9")
new_model_version = max((model_version_info.version for model_version_info in model_version_infos), key=int)

wait_until_ready(model_name, new_model_version)

Model status: READY
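
Model versions come back from the registry as strings (for example, `version='2'` in the outputs in this notebook).  When picking the newest version, it is safer to compare them numerically, because string comparison is lexicographic.  The version list below is hypothetical and only demonstrates the difference.

```python
versions = ["1", "2", "9", "10"]  # hypothetical version strings

lexicographic_max = max(versions)        # compares character by character
numeric_max = max(versions, key=int)     # compares as integers

print(lexicographic_max)  # 9
print(numeric_max)        # 10
```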


Use the model version description to explain how this model differs from the others.  In this case, you changed the value of the *max_iter* parameter from 50 to 500 and also changed the ElasticNet parameter.

In [0]:
client.update_model_version(
  name=model_name,
  version=new_model_version,
  description="This model version has changed the max number of iterations to 500 and minimizes L2 penalties."
)

Out[14]: <ModelVersion: creation_timestamp=1679397787455, current_stage='None', description=('This model version has changed the max number of iterations to 500 and '
 'minimizes L2 penalties.'), last_updated_timestamp=1679397910626, name='NYC Taxi Amount API', run_id='bb814b59af5146ebb3c88dfb17ceec7d', run_link='', source='dbfs:/databricks/mlflow-tracking/1988255026187242/bb814b59af5146ebb3c88dfb17ceec7d/artifacts/model', status='READY', status_message='', tags={}, user_id='3790246273816926', version='2'>

Before moving this model to production, you can stage the model by moving this version to `Staging`.

In [0]:
client.transition_model_version_stage(
  name=model_name,
  version=new_model_version,
  stage="Staging",
)

---------------------------------------------------------------------------
RestException                             Traceback (most recent call last)
File <command-1988255026187313>:1
----> 1 client.transition_model_version_stage(
      2   name=model_name,
      3   version=new_model_version,
      4   stage="Staging",
      5 )

File /databricks/python/lib/python3.9/site-packages/mlflow/tracking/client.py:2402, in MlflowClient.transition_model_version_stage(self, name, version, stage, archive_existing_versions)
   2336 def tran

The reason the `forecast_nyc_taxi_amount()` function included a model stage is to allow testing of the `Staging` model before transitioning it to `Production`.

Note that the predicted amount is slightly different from the model in production.

In [0]:
# Generate a prediction for the new model
forecast_nyc_taxi_amount(model_name, "Staging", df)

Loading registered model version from URI: 'models:/NYC Taxi Amount API/Staging'
Out[16]: [5.265552327948803]

It looks like this didn't change the results very much, but there is a small difference.  Let's say that you are confident in the new model and are ready to make it the new production model.
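
To put a number on "a small difference", the two predictions shown in the outputs above can be compared directly.  This is plain arithmetic on the printed values; the variable names are illustrative.

```python
production_prediction = 5.187231884093903  # version 1 output shown earlier
staging_prediction = 5.265552327948803     # version 2 output shown above

absolute_diff = staging_prediction - production_prediction
relative_diff = absolute_diff / production_prediction

print("absolute: %.4f, relative: %.2f%%" % (absolute_diff, relative_diff * 100))
```

For this single test row, the new version's prediction is about 1.5% higher than the production version's.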

### Transitioning a New Version of a Model

Now that the `Staging` model version has been tested, the next step is to transition the latest model version to `Production`.  Do this using the same `transition_model_version_stage()` method as before.

In [0]:
client.transition_model_version_stage(
  name=model_name,
  version=new_model_version,
  stage="Production",
)

Out[17]: <ModelVersion: creation_timestamp=1679397787455, current_stage='Production', description=('This model version has changed the max number of iterations to 500 and '
 'minimizes L2 penalties.'), last_updated_timestamp=1679397941213, name='NYC Taxi Amount API', run_id='bb814b59af5146ebb3c88dfb17ceec7d', run_link='', source='dbfs:/databricks/mlflow-tracking/1988255026187242/bb814b59af5146ebb3c88dfb17ceec7d/artifacts/model', status='READY', status_message='', tags={}, user_id='3790246273816926', version='2'>

In [0]:
client.search_model_versions("name = '%s'" % model_name)

Out[18]: [<ModelVersion: creation_timestamp=1679397787455, current_stage='Production', description=('This model version has changed the max number of iterations to 500 and '
  'minimizes L2 penalties.'), last_updated_timestamp=1679397941213, name='NYC Taxi Amount API', run_id='bb814b59af5146ebb3c88dfb17ceec7d', run_link='', source='dbfs:/databricks/mlflow-tracking/1988255026187242/bb814b59af5146ebb3c88dfb17ceec7d/artifacts/model', status='READY', status_message='', tags={}, user_id='reddytechconnect@gmail.com', version='2'>,
 <ModelVersion: creation_timestamp=1679397630076, current_stage='Production', description='', last_updated_timestamp=1679397667676, name='NYC Taxi Amount API', run_id='94935e6dcd974aad8159f931a3b00ca1', run_link='', source='dbfs:/databricks/mlflow-tracking/1988255026187242/94935e6dcd974aad8159f931a3b00ca1/artifacts/model', status='READY', status_message='', tags={}, user_id='reddytechconnect@gmail.com', version='1'>]

Now both model versions are in the `Production` stage.  Which one will Azure Databricks use?

In [0]:
forecast_nyc_taxi_amount(model_name, "Production", df)

Loading registered model version from URI: 'models:/NYC Taxi Amount API/Production'
Out[19]: [5.265552327948803]

It turns out that Azure Databricks looks for the latest model version in a given stage.  We can tell because the predicted amount matches the one from the most recently trained model, in other words, the one we most recently promoted to `Production`.

This means that you could conceivably have several `Production` versions of models running concurrently.  But a more practical plan is to archive the old model.
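
The behavior observed above, where the highest-numbered version in a stage is the one that gets loaded, can be mimicked in a few lines.  This is a sketch of the selection rule only, not MLflow's implementation; `VersionInfo` and `latest_in_stage` are hypothetical names, and the records mirror the two versions in this notebook.

```python
from collections import namedtuple

VersionInfo = namedtuple("VersionInfo", ["version", "current_stage"])

def latest_in_stage(version_infos, stage):
    """Return the highest-numbered version in the given stage, or None."""
    candidates = [v for v in version_infos if v.current_stage == stage]
    if not candidates:
        return None
    # Versions are strings, so compare them as integers
    return max(candidates, key=lambda v: int(v.version))

registry = [
    VersionInfo("1", "Production"),
    VersionInfo("2", "Production"),
]
print(latest_in_stage(registry, "Production").version)  # 2
```

The `transition_model_version_stage()` signature also lists an `archive_existing_versions` argument, which MLflow documents as a way to archive the versions already in the target stage during the transition, so it may be worth checking for your MLflow version before archiving by hand.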

### Archiving a Model Version

In order to archive a model version, call `transition_model_version_stage()` once more, but use the `Archived` stage.

In [0]:
client.transition_model_version_stage(
  name=model_name,
  version=model_details.version,
  stage="Archived",
)

Out[20]: <ModelVersion: creation_timestamp=1679397630076, current_stage='Archived', description='', last_updated_timestamp=1679397966484, name='NYC Taxi Amount API', run_id='94935e6dcd974aad8159f931a3b00ca1', run_link='', source='dbfs:/databricks/mlflow-tracking/1988255026187242/94935e6dcd974aad8159f931a3b00ca1/artifacts/model', status='READY', status_message='', tags={}, user_id='3790246273816926', version='1'>

If you wish to go further and delete a model version, a method is available for that as well.

In [0]:
client.delete_model_version(
   name=model_name,
   version=model_details.version,
)

Before you are able to delete a model, you must transition all `Production` or `Staging` versions to `Archived`.  This is a safety precaution to prevent accidentally deleting a model being served in production.  The following cells will transition the new model version to `Archived` and then delete this model version.

In [0]:
# Need to transition before deleting
client.transition_model_version_stage(
  name=model_name,
  version=new_model_version,
  stage="Archived",
)

Out[22]: <ModelVersion: creation_timestamp=1679397787455, current_stage='Archived', description=('This model version has changed the max number of iterations to 500 and '
 'minimizes L2 penalties.'), last_updated_timestamp=1679397980325, name='NYC Taxi Amount API', run_id='bb814b59af5146ebb3c88dfb17ceec7d', run_link='', source='dbfs:/databricks/mlflow-tracking/1988255026187242/bb814b59af5146ebb3c88dfb17ceec7d/artifacts/model', status='READY', status_message='', tags={}, user_id='3790246273816926', version='2'>

In [0]:
client.delete_model_version(
   name=model_name,
   version=new_model_version
)

Finally, you will be able to delete the registered model.

In [0]:
client.delete_registered_model(name=model_name)
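
The archive-then-delete sequence above can be wrapped in one helper.  The sketch below uses only the client methods that already appear in this notebook (`search_model_versions`, `transition_model_version_stage`, `delete_model_version`, and `delete_registered_model`).  The function `archive_and_delete_model` and the `_RecordingClient` stand-in are hypothetical; the stand-in records calls instead of contacting a registry so the example runs anywhere.

```python
from types import SimpleNamespace

def archive_and_delete_model(client, model_name):
    """Archive active versions, delete every version, then delete the model."""
    versions = client.search_model_versions("name = '%s'" % model_name)
    for mv in versions:
        if mv.current_stage in ("Staging", "Production"):
            # Active versions must be archived before they can be deleted
            client.transition_model_version_stage(
                name=model_name, version=mv.version, stage="Archived"
            )
        client.delete_model_version(name=model_name, version=mv.version)
    client.delete_registered_model(name=model_name)

class _RecordingClient:
    """Stand-in that records calls (illustrative only, not an MLflow class)."""
    def __init__(self, versions):
        self._versions = versions
        self.calls = []

    def search_model_versions(self, filter_string):
        return list(self._versions)

    def transition_model_version_stage(self, name, version, stage):
        self.calls.append(("transition", version, stage))

    def delete_model_version(self, name, version):
        self.calls.append(("delete_version", version))

    def delete_registered_model(self, name):
        self.calls.append(("delete_model", name))

demo = _RecordingClient([
    SimpleNamespace(version="1", current_stage="Archived"),
    SimpleNamespace(version="2", current_stage="Production"),
])
archive_and_delete_model(demo, "NYC Taxi Amount API")
for call in demo.calls:
    print(call)
```

In the notebook you would pass the real `client`; deletion is permanent, so it is worth confirming the model name before running a helper like this.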