
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px">
</div>

# MLflow Lab

In this lab we will explore the path to moving models to production with MLflow using the following steps:

1. Load the Airbnb dataset and save the training and test datasets as Delta tables
2. Train an MLlib linear regression model using all the listing features, tracking parameters, metrics, artifacts, and the Delta table version to MLflow
3. Register this initial model and move it to staging using MLflow Model Registry
4. Add a new column, `log_price`, to both the train and test tables and update the corresponding Delta tables
5. Train a second MLlib linear regression model, this time using `log_price` as our target and training on all features, tracking to MLflow
6. Compare the performance of the different runs by looking at the underlying data versions for both models
7. Move the better performing model to production in MLflow model registry

## ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) In this lab you:<br>
- Create Delta tables
- Track your MLlib model and Delta table version using MLflow
- Use MLflow model registry to version your models

In [0]:
# Required for Community Edition
# Restarts the Python interpreter
%pip install mlflow

In [0]:
%run "../Includes/Classroom-Setup"

###  Step 1. Creating Delta Tables

Data versioning is an advantage of using Delta Lake, which preserves previous versions of datasets so that you can restore them later.

Let's split our dataset into train and test datasets and write them out in Delta format. You can read more in the Delta Lake [documentation](https://docs.delta.io/latest/index.html).

In [0]:
filePath = "dbfs:/mnt/training/airbnb/sf-listings/sf-listings-2019-03-06-clean.delta/"
airbnbDF = spark.read.format("delta").load(filePath)

(trainDF, testDF) = airbnbDF.randomSplit([.8, .2], seed=42)

In [0]:
trainDeltaPath = userhome + "/machine-learning-p/train.delta"
testDeltaPath = userhome + "/machine-learning-p/test.delta"

# In case the paths already exist
dbutils.fs.rm(trainDeltaPath, True)
dbutils.fs.rm(testDeltaPath, True)

(trainDF
  .write
  .mode("overwrite")
  .format("delta")
  .save(trainDeltaPath)
)

(testDF
  .write
  .mode("overwrite")
  .format("delta")
  .save(testDeltaPath)
)

Let's now read in our train and test Delta tables, specifying that we want the first version of these tables. This [blog post](https://databricks.com/blog/2019/02/04/introducing-delta-time-travel-for-large-scale-data-lakes.html) has a great example of how to read in a Delta table at a given version.

In [0]:
# TODO
data_version = 0
trainDelta = spark.read.format("delta").option("versionAsOf", data_version).load(trainDeltaPath)
testDelta = spark.read.format("delta").option("versionAsOf", data_version).load(testDeltaPath)

### Review Delta Table History
All transactions for this table are stored in the table's transaction log, including the initial write and any later inserts, updates, deletes, and merges.

In [0]:
spark.sql(f"CREATE DATABASE IF NOT EXISTS {cleaned_username}")
spark.sql(f"USE {cleaned_username}")
spark.sql("DROP TABLE IF EXISTS train_delta")
spark.sql(f"CREATE TABLE train_delta USING DELTA LOCATION '{trainDeltaPath}'")

In [0]:
%sql
DESCRIBE HISTORY train_delta

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata
0,2021-07-05T12:34:01.000+0000,3311790168177,melburne42@gmail.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(1750467221661667),0705-121312-elite555,,WriteSerializable,False,"Map(numFiles -> 4, numOutputBytes -> 205120, numOutputRows -> 5786)",



By default, Delta tables [keep a commit history of 30 days](https://docs.databricks.com/delta/delta-batch.html#data-retention). This retention period can be adjusted by setting `delta.logRetentionDuration`, which determines how far back in time you can go. Note that increasing it can raise storage costs.
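
As a sketch of how the retention period might be changed, the helper below builds an `ALTER TABLE ... SET TBLPROPERTIES` statement for `delta.logRetentionDuration`. The helper name `retention_sql` and the 60-day value are illustrative assumptions and not part of the lab; in the notebook the resulting string would be passed to `spark.sql`.

```python
def retention_sql(table_name: str, days: int) -> str:
    """Build a statement that sets the Delta log retention period for a table."""
    if days <= 0:
        raise ValueError("days must be positive")
    return (
        f"ALTER TABLE {table_name} "
        f"SET TBLPROPERTIES ('delta.logRetentionDuration' = 'interval {days} days')"
    )


statement = retention_sql("train_delta", 60)  # 60 days is an arbitrary example value
print(statement)
# In the notebook, where `spark` is defined: spark.sql(statement)
```

Time travel also depends on the underlying data files still being present, so `VACUUM` and `delta.deletedFileRetentionDuration` matter as well.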

<img alt="Side Note" title="Side Note" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.05em; transform:rotate(15deg)" src="https://files.training.databricks.com/static/images/icon-note.webp"/> Be aware that versioning with Delta in this manner may not be feasible as a long-term solution. The retention period of Delta tables can be increased, but that brings additional storage costs. Alternative methods of data versioning when training models and tracking to MLflow are to save copies of the datasets, either as an MLflow artifact (for a small dataset) or in a separate distributed location, and to record the location of the underlying dataset as a tag in MLflow.

### Step 2. Log initial run to MLflow and move to staging in Model Registry

Let's first log a run to MLflow where we use all features. We use the same approach with RFormula as before. This time however, let's also log both the version of our data and the data path to MLflow.

In [0]:
# TODO
import mlflow
import mlflow.spark
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import RFormula

with mlflow.start_run(run_name="lr_model") as run:
  
  # Log parameters
  mlflow.log_param("data_path", trainDeltaPath)  
  # TODO: Log label: price-all-features
  mlflow.log_param("label", "price-all-features")
  # TODO: Log data_version: data_version
  mlflow.log_param("data_version", data_version)
  
    
  # Create pipeline
  rFormula = RFormula(formula="price ~ .", featuresCol="features", labelCol="price", handleInvalid="skip")
  lr = LinearRegression(labelCol="price", featuresCol="features")
  pipeline = Pipeline(stages = [rFormula, lr])
  model = pipeline.fit(trainDelta)
  
  # Log pipeline
  # TODO: Log model: model
  mlflow.spark.log_model(model, "model")

  # Create predictions and metrics
  predDF = model.transform(testDelta)
  regressionEvaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction")
  rmse = regressionEvaluator.setMetricName("rmse").evaluate(predDF)
  r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
  
  # Log metrics
  # TODO: Log RMSE
  mlflow.log_metric("rmse", rmse)
  # TODO: Log R2
  mlflow.log_metric("r2", r2)
  
  runID = run.info.run_id

### Step 3. Register model and move to staging using MLflow Model Registry


We are happy with the performance of the above model and want to move it to staging. Let's create the model and register it to the MLflow model registry.

<img alt="Side Note" title="Side Note" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.05em; transform:rotate(15deg)" src="https://files.training.databricks.com/static/images/icon-note.webp"/> Make sure the path in `model_uri` matches the artifact path (the second argument to `mlflow.spark.log_model()`) used above.

In [0]:
model_name = f"{cleaned_username}_mllib_lr"
model_uri = f"runs:/{runID}/model" # NOTE: confirm that this path is correct

model_details = mlflow.register_model(model_uri=model_uri, name=model_name)

Check the current status of the model.

In [0]:
from mlflow.tracking.client import MlflowClient

client = MlflowClient()
model_version_details = client.get_model_version(name=model_name, version=1)

model_version_details.status
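
Registration can take a moment, so the status may still be pending when it is first checked. A minimal polling sketch follows, assuming the registry reports `PENDING_REGISTRATION`, `FAILED_REGISTRATION`, or `READY`. The helper `wait_until_ready` is hypothetical and takes any zero-argument callable, so it runs here against a stand-in rather than a real registry.

```python
import time


def wait_until_ready(get_status, timeout_s=60.0, poll_s=1.0, sleep=time.sleep):
    """Poll get_status() until it returns "READY", fails, or times out."""
    waited = 0.0
    while True:
        status = get_status()
        if status == "READY":
            return status
        if status == "FAILED_REGISTRATION":
            raise RuntimeError("model version registration failed")
        if waited >= timeout_s:
            raise TimeoutError(f"still {status!r} after {timeout_s} seconds")
        sleep(poll_s)
        waited += poll_s


# Stand-in for the registry so the sketch runs anywhere.
fake_statuses = iter(["PENDING_REGISTRATION", "PENDING_REGISTRATION", "READY"])
print(wait_until_ready(lambda: next(fake_statuses), sleep=lambda _: None))
```

In the notebook, the callable could be `lambda: client.get_model_version(name=model_name, version=1).status`.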

Add a model description using [update_registered_model](https://mlflow.org/docs/latest/python_api/mlflow.tracking.html#mlflow.tracking.MlflowClient.update_registered_model).

In [0]:
# TODO
client.update_registered_model(
  name = model_details.name,
  description = "This model forecasts Airbnb housing prices"
)
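
This step is titled as moving the model to staging, but the cells above register and describe the model without requesting a stage transition. One way this might look is sketched below, using the same `transition_model_version_stage` call that appears later in the lab. The wrapper `move_to_stage` and the `RecordingClient` stand-in are hypothetical and exist only so the example runs outside Databricks.

```python
def move_to_stage(client, name, version, stage):
    """Ask the registry client to transition one model version to `stage`."""
    allowed = {"None", "Staging", "Production", "Archived"}
    if stage not in allowed:
        raise ValueError(f"unknown stage: {stage!r}")
    return client.transition_model_version_stage(name=name, version=version, stage=stage)


class RecordingClient:
    """Stand-in for MlflowClient that only records the requested transitions."""

    def __init__(self):
        self.calls = []

    def transition_model_version_stage(self, name, version, stage):
        self.calls.append((name, version, stage))
        return stage


stub = RecordingClient()
move_to_stage(stub, "example_mllib_lr", 1, "Staging")
print(stub.calls)
# In the notebook: move_to_stage(client, model_details.name, model_details.version, "Staging")
```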

###  Step 4. Feature Engineering: Evolve Data Schema

We now want to do some feature engineering with the aim of improving model performance; we can use Delta Lake to track older versions of the dataset. 

We will add `log_price` as a new column and update our Delta table with it.

In [0]:
from pyspark.sql.functions import col, log, exp

# Create a new log_price column for both train and test datasets
trainNew = trainDelta.withColumn("log_price", log(col("price")))
testNew = testDelta.withColumn("log_price", log(col("price")))
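
To illustrate what the transform does, here is a small plain-Python sketch. With a single argument, `pyspark.sql.functions.log` is the natural logarithm, so `math.log` and `math.exp` mirror it. The prices are made-up values, not rows from the dataset.

```python
import math

prices = [50.0, 150.0, 1000.0]                # illustrative values only
log_prices = [math.log(p) for p in prices]    # natural log of each price
restored = [math.exp(lp) for lp in log_prices]

# The raw prices span a factor of 20, while the log prices span about 3 units.
print(max(prices) / min(prices), max(log_prices) - min(log_prices))

# exp() undoes log(), which is how predictions get back to the price scale.
print(all(math.isclose(p, r) for p, r in zip(prices, restored)))
```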

Save the updated DataFrames to `trainDeltaPath` and `testDeltaPath`, respectively, passing the `mergeSchema` option so that the new column can be added to each table's schema safely.

In [0]:
# TODO
trainNew.write.option("mergeSchema", "true").format("delta").mode("overwrite").save(trainDeltaPath)
testNew.write.option("mergeSchema", "true").format("delta").mode("overwrite").save(testDeltaPath)

Look at the difference between the original and modified schemas.

In [0]:
set(trainNew.schema.fields) ^ set(trainDelta.schema.fields)
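
The `^` operator on sets is the symmetric difference, so the expression returns the fields that appear in exactly one of the two schemas. A tiny sketch with schema fields modelled as `(name, type)` pairs, where the field names are illustrative:

```python
# Each field is modelled as a (name, type) pair; the names are illustrative.
old_fields = {("price", "double"), ("bedrooms", "double")}
new_fields = {("price", "double"), ("bedrooms", "double"), ("log_price", "double")}

# Fields present in exactly one of the two schemas.
changed = old_fields ^ new_fields
print(changed)
```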

Let's review the Delta history of our `train_delta` table and load in the most recent versions of our train and test Delta tables.

In [0]:
%sql
DESCRIBE HISTORY train_delta

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata
1,2021-07-05T12:55:24.000+0000,3311790168177,melburne42@gmail.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(1750467221661667),0705-121312-elite555,0.0,WriteSerializable,False,"Map(numFiles -> 4, numOutputBytes -> 221980, numOutputRows -> 5786)",
0,2021-07-05T12:34:01.000+0000,3311790168177,melburne42@gmail.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(1750467221661667),0705-121312-elite555,,WriteSerializable,False,"Map(numFiles -> 4, numOutputBytes -> 205120, numOutputRows -> 5786)",


In [0]:
data_version = 1
trainDeltaNew = spark.read.format("delta").option("versionAsOf", data_version).load(trainDeltaPath)  
testDeltaNew = spark.read.format("delta").option("versionAsOf", data_version).load(testDeltaPath)

### Step 5. Use `log_price` as target and track run with MLflow

Retrain the model on the updated data and compare its performance to the original, logging results to MLflow.

In [0]:
with mlflow.start_run(run_name="lr_log_model") as run:
  
  # Log parameters
  mlflow.log_param("label", "log-price")
  mlflow.log_param("data_version", data_version)
  mlflow.log_param("data_path", trainDeltaPath)    
 
  # Create pipeline
  rFormula = RFormula(formula="log_price ~ . - price", featuresCol="features", labelCol="log_price", handleInvalid="skip")  
  lr = LinearRegression(labelCol="log_price", predictionCol="log_prediction")
  pipeline = Pipeline(stages = [rFormula, lr])
  pipelineModel = pipeline.fit(trainDeltaNew)
   
  # Log model and update the registered model
  mlflow.spark.log_model(
    spark_model=pipelineModel,
    artifact_path="log-model",
    registered_model_name=model_name,
  )  
  
  # Create predictions and metrics (score the test table version that includes log_price)
  predDF = pipelineModel.transform(testDeltaNew)
  expDF = predDF.withColumn("prediction", exp(col("log_prediction")))
  rmse = regressionEvaluator.setMetricName("rmse").evaluate(expDF)
  r2 = regressionEvaluator.setMetricName("r2").evaluate(expDF)
  
  # Log metrics
  mlflow.log_metric("rmse", rmse)
  mlflow.log_metric("r2", r2)  
  
  runID = run.info.run_id
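
The second model predicts on the log scale, while the first run's RMSE is in price units, so predictions are exponentiated before scoring to keep the two runs comparable. A plain-Python sketch of that idea, with made-up numbers and a hand-written `rmse` helper standing in for `RegressionEvaluator`:

```python
import math


def rmse(actual, predicted):
    """Root mean squared error between two equal-length sequences."""
    if len(actual) != len(predicted):
        raise ValueError("sequences must have the same length")
    return math.sqrt(sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual))


prices = [100.0, 200.0, 400.0]  # illustrative actual prices
log_predictions = [math.log(110.0), math.log(190.0), math.log(380.0)]  # illustrative model output

# Back-transform to the price scale before scoring, mirroring exp(col("log_prediction")).
predictions = [math.exp(lp) for lp in log_predictions]
print(rmse(prices, predictions))

# Scoring on the log scale gives a much smaller number that is not comparable.
print(rmse([math.log(p) for p in prices], log_predictions))
```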

### Step 6. Compare performance across runs by looking at Delta table versions

Use MLflow's [`mlflow.search_runs`](https://mlflow.org/docs/latest/python_api/mlflow.html#mlflow.search_runs) API to identify runs according to the version of data the run was trained on. Let's compare our runs according to our data versions.

Filter based on `params.data_path` and `params.data_version`.

In [0]:
# TODO
data_version = 0

mlflow.search_runs(filter_string=f"params.data_path='{trainDeltaPath}' and params.data_version='{data_version}'")

Unnamed: 0,run_id,experiment_id,status,artifact_uri,start_time,end_time,metrics.rmse,metrics.r2,params.data_path,params.label,params.data_version,tags.mlflow.user,tags.mlflow.databricks.notebookRevisionID,tags.mlflow.source.name,tags.mlflow.databricks.notebookPath,tags.mlflow.runName,tags.mlflow.databricks.notebookID,tags.mlflow.source.type,tags.mlflow.log-model.history,tags.mlflow.databricks.cluster.info,tags.mlflow.databricks.cluster.id,tags.mlflow.databricks.webappURL,tags.mlflow.databricks.cluster.libraries
0,5aa205ad26054887a6d89aafcaaa8140,1750467221661667,FINISHED,dbfs:/databricks/mlflow-tracking/1750467221661...,2021-07-05 12:35:04.149000+00:00,2021-07-05 12:36:55.303000+00:00,133.462953,0.441615,dbfs:/user/melburne42@gmail.com/machine-learni...,price-all-features,0,melburne42@gmail.com,1625488615454,/Users/melburne42@gmail.com/Scalable-ML-3.4.1-...,/Users/melburne42@gmail.com/Scalable-ML-3.4.1-...,lr_model,1750467221661667,NOTEBOOK,"[{""artifact_path"":""model"",""flavors"":{""spark"":{...","{""cluster_name"":""My Cluster"",""spark_version"":""...",0705-121312-elite555,https://community.cloud.databricks.com,"{""installable"":[],""redacted"":[]}"


In [0]:
# TODO
data_version = 1

mlflow.search_runs(filter_string=f"params.data_path='{trainDeltaPath}' and params.data_version='{data_version}'")

Unnamed: 0,run_id,experiment_id,status,artifact_uri,start_time,end_time,params.data_path,params.label,params.data_version,tags.mlflow.user,tags.mlflow.databricks.notebookRevisionID,tags.mlflow.source.name,tags.mlflow.databricks.notebookPath,tags.mlflow.runName,tags.mlflow.databricks.notebookID,tags.mlflow.source.type,tags.mlflow.log-model.history,tags.mlflow.databricks.cluster.info,tags.mlflow.databricks.cluster.id,tags.mlflow.databricks.webappURL,tags.mlflow.databricks.cluster.libraries
0,d60d693376ad441d87d0cbb1a1d7eb60,1750467221661667,FAILED,dbfs:/databricks/mlflow-tracking/1750467221661...,2021-07-05 12:57:34.325000+00:00,2021-07-05 12:59:04.796000+00:00,dbfs:/user/melburne42@gmail.com/machine-learni...,log-price,1,melburne42@gmail.com,1625489944931,/Users/melburne42@gmail.com/Scalable-ML-3.4.1-...,/Users/melburne42@gmail.com/Scalable-ML-3.4.1-...,lr_log_model,1750467221661667,NOTEBOOK,"[{""artifact_path"":""log-model"",""flavors"":{""spar...","{""cluster_name"":""My Cluster"",""spark_version"":""...",0705-121312-elite555,https://community.cloud.databricks.com,"{""installable"":[],""redacted"":[]}"


Which version of the data produced the best model?
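
In the outputs shown above, the version 1 run is marked `FAILED` and has no metric columns, so only the version 0 run can be scored until the second run completes successfully. A sketch of a comparison that skips unfinished runs follows. The helper `best_run` is hypothetical, and the rows are copied by hand from the two search results rather than read from the DataFrame that `mlflow.search_runs` returns.

```python
# Values copied from the two search results shown above; the failed run has no metrics.
runs = [
    {"run_name": "lr_model", "data_version": "0", "status": "FINISHED", "rmse": 133.462953},
    {"run_name": "lr_log_model", "data_version": "1", "status": "FAILED", "rmse": None},
]


def best_run(runs):
    """Return the finished run with the lowest RMSE, or None if no run qualifies."""
    candidates = [r for r in runs if r["status"] == "FINISHED" and r["rmse"] is not None]
    return min(candidates, key=lambda r: r["rmse"], default=None)


print(best_run(runs)["run_name"])
```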

### Step 7. Move best performing model to production using MLflow model registry

Get the most recent model version and move it to production.

In [0]:
model_version_infos = client.search_model_versions(f"name = '{model_name}'")
# Convert versions to int so the maximum is numeric rather than lexicographic
new_model_version = max(int(model_version_info.version) for model_version_info in model_version_infos)
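
Model version numbers may be returned as strings, in which case taking a maximum over them compares lexicographically unless they are converted first. A short illustration with made-up version numbers:

```python
versions = ["1", "2", "9", "10"]  # illustrative version numbers as strings

print(max(versions))                  # lexicographic comparison picks "9"
print(max(int(v) for v in versions))  # numeric comparison picks 10
```

Converting with `int()` is harmless when the versions are already integers.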

In [0]:
client.update_model_version(
  name=model_name,
  version=new_model_version,
  description="This model version was built using an MLlib Linear Regression model with all features and log_price as the target."
)

In [0]:
import time

time.sleep(60)  # Give a pending registration time to finish before moving to Staging

client.transition_model_version_stage(
  name=model_name,
  version=new_model_version,
  stage="Staging"
)

In [0]:
# TODO
# Move Model into Production
client.transition_model_version_stage(
  name=model_name,
  version=new_model_version,
  stage="Production"
)

Have a look at the MLflow Model Registry UI to check that your models have been successfully registered. You should see version 2 of your model in production; version 1 appears in staging only if you transitioned it there in Step 3.

To finish the lab, let's clean up by archiving both model versions and deleting the whole model from the registry.

In [0]:
client.transition_model_version_stage(
  name=model_name,
  version=1,
  stage="Archived",
)

client.transition_model_version_stage(
  name=model_name,
  version=2,
  stage="Archived",
)

time.sleep(10)

client.delete_registered_model(model_name)


&copy; 2020 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="http://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="http://help.databricks.com/">Support</a>