# MLflow with Databricks

With the Databricks Community Edition, only the features up to Model Tracking are accessible; using the Model Registry may require a commercial edition.

## Data Versioning with Delta Tables

In [None]:
# Load the cleaned SF Airbnb listings, which are stored as a Delta table.
file_path = f"{DA.paths.datasets}/airbnb/sf-listings/sf-listings-2019-03-06-clean.delta/"
airbnb_df = spark.read.load(file_path, format="delta")

# Seeded 80/20 split so the train/test partitions are reproducible.
train_df, test_df = airbnb_df.randomSplit(weights=[0.8, 0.2], seed=42)

In [None]:
train_delta_path = f"{DA.paths.working_dir}/train.delta"
test_delta_path = f"{DA.paths.working_dir}/test.delta"

# Recursively remove anything already stored at the target paths.
dbutils.fs.rm(train_delta_path, recurse=True)
dbutils.fs.rm(test_delta_path, recurse=True)

# Persist both splits as Delta tables.
train_df.write.format("delta").mode("overwrite").save(train_delta_path)
test_df.write.format("delta").mode("overwrite").save(test_delta_path)

In [None]:
# Pin both reads to a specific committed version of each Delta table.
data_version = 0
train_delta = spark.read.format("delta").options(versionAsOf=data_version).load(train_delta_path)
test_delta = spark.read.format("delta").options(versionAsOf=data_version).load(test_delta_path)

In [1]:
# Review the transaction history of this Delta table.
history_query = f"DESCRIBE HISTORY delta.`{train_delta_path}`"
display(spark.sql(history_query))

## MLflow Tracking

In [None]:
# TODO
import mlflow
import mlflow.spark
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import RFormula

with mlflow.start_run(run_name="lr_model") as run:
    # Record where the training data lives, what is predicted, and the data version.
    mlflow.log_params(
        {
            "data_path": train_delta_path,
            "label": "price-all-features",
            "data_version": data_version,
        }
    )

    # Pipeline: RFormula builds the features from all columns, then linear regression on price.
    r_formula = RFormula(
        formula="price ~ .",
        featuresCol="features",
        labelCol="price",
        handleInvalid="skip",
    )
    lr = LinearRegression(labelCol="price", featuresCol="features")
    pipeline = Pipeline(stages=[r_formula, lr])
    model = pipeline.fit(train_delta)

    # Store the fitted pipeline under the run's "model" artifact path.
    mlflow.spark.log_model(model, "model")

    # Score the test split and compute RMSE and R2 with the same evaluator.
    pred_df = model.transform(test_delta)
    regression_evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction")
    regression_evaluator.setMetricName("rmse")
    rmse = regression_evaluator.evaluate(pred_df)
    regression_evaluator.setMetricName("r2")
    r2 = regression_evaluator.evaluate(pred_df)

    # Log both metrics on the active run.
    mlflow.log_metrics({"rmse": rmse, "r2": r2})

    # Keep the run id so the logged model can be referenced after the run closes.
    run_id = run.info.run_id

## Register Model with Model Registry and Move it to Staging 

In [None]:
# URI of the pipeline logged under the "model" artifact path of the tracked run.
model_uri = f"runs:/{run_id}/model"

# Registry name built from the suffix returned by DA.unique_name.
suffix = DA.unique_name("-")
model_name = f"mllib-lr_{suffix}"
print(f"Model Name: {model_name}\n")

# Register the logged model; the returned details are reused in later cells.
model_details = mlflow.register_model(model_uri, model_name)

In [2]:
#Transition model to staging 

from mlflow.tracking.client import MlflowClient

# Registry client reused by the remaining cells.
client = MlflowClient()

# Move version 1 of the registered model into the Staging stage.
client.transition_model_version_stage(name=model_name, version=1, stage="Staging")

In [None]:
# If you are automating this workflow, use the helper below to block until a model version is ready.

# Define a utility method to wait until the model is ready
def wait_for_model(model_name, version, stage="None", status="READY", timeout=300, poll_interval=1):
    """Block until a registered model version reaches the expected stage and status.

    Polls the registry through the notebook-level ``client`` until the version's
    ``current_stage`` and ``status`` both match the expected values, or until
    ``timeout`` seconds of wall-clock time have elapsed. At least one poll is
    always made.

    Args:
        model_name: Name of the registered model.
        version: Version of the model to check.
        stage: Expected stage, compared as a string (default ``"None"``).
        status: Expected status, compared as a string (default ``"READY"``).
        timeout: Maximum number of seconds to wait.
        poll_interval: Seconds to sleep between polls.

    Raises:
        TimeoutError: If the version did not match within ``timeout`` seconds.
    """
    import time

    expected_stage = str(stage)
    expected_status = str(status)
    # A monotonic deadline keeps the total wait bounded by `timeout` even when
    # each registry call itself takes noticeable time.
    deadline = time.monotonic() + timeout

    while True:
        model_version_details = client.get_model_version(name=model_name, version=version)
        last_stage = str(model_version_details.current_stage)
        last_status = str(model_version_details.status)
        if last_status == expected_status and last_stage == expected_stage:
            return

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        time.sleep(min(poll_interval, remaining))

    raise TimeoutError(f"The model {model_name} v{version} was not {status} after {timeout} seconds: {last_status}/{last_stage}")

In [None]:
# Block here until version 1 is in Staging; useful when the notebook runs as a script.
wait_for_model(model_name, version=1, stage="Staging")

In [None]:
# Attach a description to the registered model.
model_description = "This model uses Airbnb data and rformula to make a spark linear regression."
client.update_registered_model(name=model_name, description=model_description)

In [None]:
# Wait again (by the registered name) until version 1 is in Staging.
wait_for_model(model_details.name, version=1, stage="Staging")

## Feature Engineering & Data Version Tracking with Delta Tables

In [None]:
#Add log price

from pyspark.sql.functions import col, log, exp

# Add a log_price column (log of price) to both the train and test datasets.
log_price = log(col("price"))
train_new = train_delta.withColumn("log_price", log_price)
test_new = test_delta.withColumn("log_price", log_price)

In [None]:
# Overwrite each Delta table with its own log_price-augmented split. mergeSchema
# allows the new column to be added to the existing table schema. The test table
# must receive test_new (not train_new), otherwise it is overwritten with training data.
train_new.write.format("delta").mode("overwrite").option("mergeSchema", "true").save(train_delta_path)
test_new.write.format("delta").mode("overwrite").option("mergeSchema", "true").save(test_delta_path)

In [None]:
# Fields that appear in only one of the two schemas (modified vs. original).

set(train_new.schema.fields).symmetric_difference(train_delta.schema.fields)

In [None]:
# Review the history of our Delta table.
history_query = f"DESCRIBE HISTORY delta.`{train_delta_path}`"
display(spark.sql(history_query))

In [None]:
# Read back the tables as of the version written with the log_price column.
data_version = 1
train_delta_new = spark.read.format("delta").options(versionAsOf=data_version).load(train_delta_path)
test_delta_new = spark.read.format("delta").options(versionAsOf=data_version).load(test_delta_path)

## Use Log Price Model and Track Run with MLflow

In [None]:
with mlflow.start_run(run_name="lr_log_model") as run:
    # Record the label, the data version and the data location for this run.
    mlflow.log_params(
        {
            "label": "log-price",
            "data_version": data_version,
            "data_path": train_delta_path,
        }
    )

    # Pipeline: model log_price from every column except price itself.
    r_formula = RFormula(
        formula="log_price ~ . - price",
        featuresCol="features",
        labelCol="log_price",
        handleInvalid="skip",
    )
    lr = LinearRegression(labelCol="log_price", predictionCol="log_prediction")
    pipeline = Pipeline(stages=[r_formula, lr])
    pipeline_model = pipeline.fit(train_delta_new)

    # Log the fitted pipeline and register it under the existing registered model name.
    mlflow.spark.log_model(
        spark_model=pipeline_model,
        artifact_path="log-model",
        registered_model_name=model_name,
    )

    # Score the test data, exponentiate the log prediction into "prediction", then evaluate.
    # NOTE(review): this scores test_delta (read at data version 0), not test_delta_new — confirm intended.
    pred_df = pipeline_model.transform(test_delta)
    exp_df = pred_df.withColumn("prediction", exp(col("log_prediction")))
    regression_evaluator.setMetricName("rmse")
    rmse = regression_evaluator.evaluate(exp_df)
    regression_evaluator.setMetricName("r2")
    r2 = regression_evaluator.evaluate(exp_df)

    # Log both metrics on the active run.
    mlflow.log_metrics({"rmse": rmse, "r2": r2})

    run_id = run.info.run_id

## Compare performance across model runs based on Data Versions

In [None]:
# Runs that logged data_version 0
data_version = 0
version_filter = f"params.data_version='{data_version}'"

mlflow.search_runs(filter_string=version_filter)

In [None]:
# Runs that logged data_version 1
data_version = 1
version_filter = f"params.data_version='{data_version}'"

mlflow.search_runs(filter_string=version_filter)

## Move the best model to Production (Basics of CI/CD)

In [None]:
# Get the most recent model version so it can be moved to production. Here we know
# the log-price model was better because of its lower RMSE; an automated pipeline
# could instead check for the lowest RMSE (or a similar metric) as a test criterion
# before promoting a version.
model_version_infos = client.search_model_versions(f"name = '{model_name}'")
# The registry reports versions as strings, so compare them numerically;
# a plain string max() would rank "9" above "10".
new_model_version = max(int(model_version_info.version) for model_version_info in model_version_infos)

In [None]:
# Describe the newest model version in the registry.
version_description = "This model version was built using a MLlib Linear Regression model with all features and log_price as predictor."
client.update_model_version(
    name=model_name,
    version=new_model_version,
    description=version_description,
)

In [None]:
# Fetch the registry entry for the newest version and show its status.
model_version_details = client.get_model_version(model_name, new_model_version)
model_version_details.status

In [None]:
# Move the newest model version into Production and archive whatever versions
# currently occupy that stage. The registered model is identified by the `name`
# keyword; `model` is not a parameter of transition_model_version_stage.
client.transition_model_version_stage(
    name=model_name,
    version=new_model_version,
    stage='production',
    archive_existing_versions=True,
)

In [None]:
# Block until the promoted version is in Production.
wait_for_model(model_name, new_model_version, stage="Production")