<a href="https://colab.research.google.com/github/rnomadic/Databricks_ML/blob/main/MLFlowTracking.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
"""
Each run can record the following information:

Parameters: Key-value pairs of input parameters such as the number of trees in a random forest model
Metrics: Evaluation metrics such as RMSE or Area Under the ROC Curve
Artifacts: Arbitrary output files in any format. This can include images, pickled models, and data files
Source: The code that originally ran the experiment
NOTE: For Spark models, MLflow can only log PipelineModels.
"""
import mlflow
import mlflow.spark
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

with mlflow.start_run(run_name="LR-Single-Feature") as run:
    # Two-stage pipeline: pack the single `bedrooms` column into a feature
    # vector, then fit a linear regression of `price` on it.
    vec_assembler = VectorAssembler(outputCol="features", inputCols=["bedrooms"])
    lr = LinearRegression(labelCol="price", featuresCol="features")
    pipeline = Pipeline(stages=[vec_assembler, lr])
    pipeline_model = pipeline.fit(train_df)

    # Record which label and feature set this run used.
    for param_name, param_value in (("label", "price"), ("features", "bedrooms")):
        mlflow.log_param(param_name, param_value)

    # Store the fitted PipelineModel under "model", with the first five
    # training rows (as pandas) as its input example.
    mlflow.spark.log_model(
        pipeline_model,
        "model",
        input_example=train_df.limit(5).toPandas(),
    )

    # Score test_df and compute RMSE between `prediction` and `price`.
    pred_df = pipeline_model.transform(test_df)
    regression_evaluator = RegressionEvaluator(
        labelCol="price",
        predictionCol="prediction",
        metricName="rmse",
    )
    rmse = regression_evaluator.evaluate(pred_df)

    # Attach the metric to the run.
    mlflow.log_metric("rmse", rmse)

    ## Next let's build our linear regression model but use all of our features.

from pyspark.ml.feature import RFormula

with mlflow.start_run(run_name="LR-All-Features") as run:
    # `price ~ .` lets RFormula build the feature vector from the other
    # columns of the training frame; rows with invalid values are skipped.
    r_formula = RFormula(
        formula="price ~ .",
        featuresCol="features",
        labelCol="price",
        handleInvalid="skip",
    )
    lr = LinearRegression(featuresCol="features", labelCol="price")
    pipeline = Pipeline(stages=[r_formula, lr])
    pipeline_model = pipeline.fit(train_df)

    # Store the fitted pipeline, with a five-row pandas sample as input example.
    mlflow.spark.log_model(
        pipeline_model,
        "model",
        input_example=train_df.limit(5).toPandas(),
    )

    # Record which label and feature set this run used.
    for param_name, param_value in (("label", "price"), ("features", "all_features")):
        mlflow.log_param(param_name, param_value)

    # Score test_df, then reuse one evaluator for both metrics by switching
    # its metric name before each evaluation.
    pred_df = pipeline_model.transform(test_df)
    regression_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price")
    regression_evaluator.setMetricName("rmse")
    rmse = regression_evaluator.evaluate(pred_df)
    regression_evaluator.setMetricName("r2")
    r2 = regression_evaluator.evaluate(pred_df)

    # Attach both metrics to the run.
    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
"""
Finally, we will use Linear Regression to predict the log of the price, due to its log normal distribution.
We'll also practice logging artifacts to keep a visual of our log normal histogram.
"""

from pyspark.sql.functions import col, log, exp
import matplotlib.pyplot as plt

with mlflow.start_run(run_name="LR-Log-Price") as run:
    # Add a log-transformed copy of `price` to both data frames.
    log_train_df = train_df.withColumn("log_price", log(col("price")))
    log_test_df = test_df.withColumn("log_price", log(col("price")))

    # Record which label and feature set this run used.
    for param_name, param_value in (("label", "log_price"), ("features", "all_features")):
        mlflow.log_param(param_name, param_value)

    # The formula excludes the raw `price` column from the features so the
    # model cannot read the target back from its inputs.
    r_formula = RFormula(
        formula="log_price ~ . - price",
        featuresCol="features",
        labelCol="log_price",
        handleInvalid="skip",
    )
    lr = LinearRegression(predictionCol="log_prediction", labelCol="log_price")
    pipeline = Pipeline(stages=[r_formula, lr])
    pipeline_model = pipeline.fit(log_train_df)

    # Store the fitted pipeline under "log-model" with a five-row input example.
    mlflow.spark.log_model(
        pipeline_model,
        "log-model",
        input_example=log_train_df.limit(5).toPandas(),
    )

    # Predict in log space, then exponentiate into a `prediction` column.
    pred_df = pipeline_model.transform(log_test_df)
    exp_df = pred_df.withColumn("prediction", exp(col("log_prediction")))

    # `regression_evaluator` is not created in this cell; it must already
    # exist from an earlier cell.
    regression_evaluator.setMetricName("rmse")
    rmse = regression_evaluator.evaluate(exp_df)
    regression_evaluator.setMetricName("r2")
    r2 = regression_evaluator.evaluate(exp_df)

    # Attach both metrics to the run.
    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)

    # Log a 100-bin histogram of log_price as a figure artifact.
    # `username` is expected to be defined outside this cell.
    plt.clf()

    log_train_df.toPandas().hist(bins=100, column="log_price")
    fig = plt.gcf()
    mlflow.log_figure(fig, username + "_log_normal.png")
    plt.show()
"""
Querying Past Runs
"""
from mlflow.tracking import MlflowClient

# Client handle used for the tracking queries below.
client = MlflowClient()
# NOTE(review): `list_experiments` may be unavailable on newer MLflow
# releases -- confirm against the installed version.
client.list_experiments()

# Look up every run recorded under the experiment of the current `run`.
experiment_id = run.info.experiment_id
runs_df = mlflow.search_runs(experiment_id)

display(runs_df)

# Fetch only the most recently started run of that experiment.
runs = client.search_runs(
    experiment_id,
    max_results=1,
    order_by=["attributes.start_time desc"],
)
# Bare expressions: in a notebook these show the metrics and the run id.
runs[0].data.metrics

runs[0].info.run_id

"""
Examine the results in the UI. Look for the following:


The Experiment ID
The artifact location. This is where the artifacts are stored in DBFS.
The time the run was executed. Click this to see more information on the run.
The code that executed the run.
After clicking on the time of the run, take a look at the following:


The Run ID will match what we printed above
The model that we saved, including a pickled version of the model as well as the Conda environment and the MLmodel file.
"""

## Load saved model
# Build the `runs:/` URI of the "log-model" artifact for the run currently
# bound to `run`, then load it back as a Spark model.
model_path = f"runs:/{run.info.run_id}/log-model"
loaded_model = mlflow.spark.load_model(
    model_path
)

# Apply the reloaded pipeline to test_df and show the result.
display(
    loaded_model.transform(test_df)
)




## MLFlow Lab --------------------------------------------------------
"""
Step 2. Log initial run to MLflow
"""

import mlflow
import mlflow.spark
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import RFormula

with mlflow.start_run(run_name="lr_model") as run:
      # Record the label description and the Delta version/path of the
      # training data. `data_version` and `train_delta_path` are expected to
      # be defined before this cell.
      for param_name, param_value in (
              ("label", "price_all_features"),
              ("data_version", data_version),
              ("data_path", train_delta_path),
      ):
          mlflow.log_param(param_name, param_value)

      # RFormula builds the feature vector for `price`; linear regression is
      # the final stage.
      r_formula = RFormula(
          formula="price ~ .",
          featuresCol="features",
          labelCol="price",
          handleInvalid="skip",
      )
      lr = LinearRegression(featuresCol="features", labelCol="price")
      stages = [r_formula, lr]
      pipeline = Pipeline(stages=stages)
      model = pipeline.fit(train_delta)

      # Remember NOTE: for Spark models, MLflow can only log PipelineModels.
      mlflow.spark.log_model(model, "model")

      # Score test_delta and compute both metrics with one evaluator.
      pred_df = model.transform(test_delta)
      regression_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price")
      regression_evaluator.setMetricName("rmse")
      rmse = regression_evaluator.evaluate(pred_df)
      regression_evaluator.setMetricName("r2")
      r2 = regression_evaluator.evaluate(pred_df)

      # Attach both metrics to the run.
      mlflow.log_metric("rmse", rmse)
      mlflow.log_metric("r2", r2)

      # Keep the run id so the logged model can be addressed by URI below.
      run_id = run.info.run_id

      """
      Step 3. Register model and move to staging using MLflow Model Registry 
      We are happy with the performance of the above model and want to move it to 
      staging. Let's create the model and register it to the MLflow model registry.
      """

      # Register the artifact logged above under a per-user model name.
      # `cleaned_username` is expected to be defined before this cell.
      model_name = f"{cleaned_username}_mllib_lr"
      model_uri = f"runs:/{run_id}/model"
      model_detail = mlflow.register_model(name=model_name, model_uri=model_uri)

      # Transition version 1 of the registered model to staging.
      from mlflow.tracking.client import MlflowClient
      client = MlflowClient()

      client.transition_model_version_stage(
          name=model_name,
          version=1,
          stage="staging",
      )
      
      # Define a utility method to wait until the model is ready
      def wait_for_model(model_name, version, stage=None, status="Ready", timeout=300):
          """Poll the model registry until a model version reaches a given state.

          Once per second, for at most ``timeout`` polls, the model version is
          fetched through the module-level ``client`` and its ``status`` and
          ``current_stage`` are compared with the requested values.

          Args:
              model_name: Name of the registered model.
              version: Version of the registered model to watch.
              stage: Stage to wait for. It is compared via ``str(stage)``, so
                  the default ``None`` is matched against the string "None".
              status: Status to wait for.
              timeout: Maximum number of one-second polls.

          Raises:
              Exception: If the requested status/stage combination was not
                  seen within ``timeout`` polls.
          """
          import time

          # Compare case-insensitively so the default "Ready" also matches an
          # upper-case "READY" status reported by the registry.
          wanted_status = str(status).casefold()
          wanted_stage = str(stage).casefold()

          for _ in range(timeout):
              model_version_detail = client.get_model_version(name=model_name, version=version)
              last_stage = str(model_version_detail.current_stage)
              last_status = str(model_version_detail.status)
              # Logical `and` is required here: bitwise `&` binds tighter than
              # `==` and raises TypeError when applied to two strings.
              if last_status.casefold() == wanted_status and last_stage.casefold() == wanted_stage:
                  return

              time.sleep(1)

          raise Exception(f"The model {model_name} version {version} was not {status} after {timeout} seconds")

      # Force our notebook to block until version 1 is ready in Staging.
      wait_for_model(model_name, 1, stage="Staging")

      # Add a model description. The registered model is addressed through
      # `model_name`: `model_version_detail` exists only inside
      # wait_for_model, so referencing it here raised a NameError.
      client.update_registered_model(name=model_name,
                                     description="This model forecasts Airbnb housing list prices based on various listing inputs.")

"""
Step 4. Feature Engineering: Evolve Data Schema
We now want to do some feature engineering with the aim of improving model performance; 
we can use Delta Lake to track older versions of the dataset.

We will add log_price as a new column and update our Delta table with it.
"""

from pyspark.sql.functions import col, log, exp
# Create a new log_price column for both train and test datasets
train_new = train_delta.withColumn("log_price", log(col("price")))
test_new = test_delta.withColumn("log_price", log(col("price")))

# Save the updated DataFrames to train_delta_path and test_delta_path,
# respectively, passing the mergeSchema option to safely evolve the schema.
train_new.write.option("mergeSchema", "true").format("delta").mode("overwrite").save(train_delta_path)
test_new.write.option("mergeSchema", "true").format("delta").mode("overwrite").save(test_delta_path)

# Look at the difference between the original & modified schemas
# (symmetric difference of the two field sets).
set(train_delta.schema.fields) ^ set(train_new.schema.fields)

# Review the Delta history of the train table. A path-based Delta table is
# written as delta.`<path>`: no space after the dot, path wrapped in backticks.
display(spark.sql(f"DESCRIBE HISTORY delta.`{train_delta_path}`"))

# Load a pinned version of the train and test Delta tables.
# NOTE(review): version 1 is assumed to be the overwrite done above --
# confirm against the history output.
data_version = 1
train_delta_new = spark.read.format("delta").option("versionAsOf", data_version).load(train_delta_path)
test_delta_new = spark.read.format("delta").option("versionAsOf", data_version).load(test_delta_path)

"""
Step 5. Use log_price as target and track run with MLflow
Retrain the model on the updated data and compare its performance 
to the original, logging results to MLflow.
"""
with mlflow.start_run(run_name = "lr_log_model") as run:

    # Record the label and the Delta version/path of the training data.
    mlflow.log_param("label", "log-price")
    mlflow.log_param("data_version", data_version)
    mlflow.log_param("data_path", train_delta_path)

    # Predict log_price from every column except the raw price.
    r_formula = RFormula(formula="log_price ~ .-price",
                         featuresCol="features",
                         labelCol="log_price",
                         handleInvalid="skip")

    lr = LinearRegression(labelCol="log_price", predictionCol="log_prediction")
    stages = [r_formula, lr]
    pipeline = Pipeline(stages=stages)
    pipeline_model = pipeline.fit(train_delta_new)

    # Log the model; passing registered_model_name also registers it under
    # the existing `model_name`.
    mlflow.spark.log_model(spark_model=pipeline_model,
                           artifact_path="log-model",
                           registered_model_name=model_name)

    # Score the version-pinned test table that matches the training table.
    pred_df = pipeline_model.transform(test_delta_new)
    # Exponentiate back to the price scale. The evaluator must run on exp_df:
    # pred_df only has `log_prediction`, not the `prediction` column.
    # `regression_evaluator` must already exist from an earlier cell.
    exp_df = pred_df.withColumn("prediction", exp(col("log_prediction")))
    rmse = regression_evaluator.setMetricName("rmse").evaluate(exp_df)
    r2 = regression_evaluator.setMetricName("r2").evaluate(exp_df)

    # Attach both metrics to the run.
    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)

    run_id = run.info.run_id

    """
    Step 6. Compare performance across runs by looking at Delta table versions

    Use MLflow's mlflow.search_runs API to identify runs according to the version 
    of data the run was trained on. Let's compare our runs according to our data versions.

    Filter based on params.data_path and params.data_version.

    And compare which data_version produced best models
    """
    # Query the runs trained on Delta version 0 and then version 1 of the
    # training path; `data_version` is left at 1 afterwards.
    # NOTE(review): the search results are not stored or displayed here.
    for data_version in (0, 1):
        mlflow.search_runs(filter_string=f"params.data_path= '{train_delta_path}' and params.data_version='{data_version}'")

    """
    Step 7. Move best performing model to production using MLflow model registry
    """

    # Find the newest registered version of the model. The lookup goes
    # through `client` (the original `clients` was an undefined name).
    model_version_infos = client.search_model_versions(f"name= '{model_name}'")
    # Compare versions numerically: if they come back as strings, a plain
    # max() would rank "9" above "10".
    new_model_version = max(int(model_version_info.version) for model_version_info in model_version_infos)

    # Describe the new version. Adjacent string literals are used instead of
    # a backslash continuation, which embedded a run of spaces in the text.
    client.update_model_version(name=model_name,
                                version=new_model_version,
                                description=("This model version was built using a MLlib Linear "
                                             "Regression model with all features and log_price as predictor."))

    model_version_details = client.get_model_version(name=model_name, version=new_model_version)
    # Bare expression: only shows a value as the last line of a notebook cell.
    model_version_details.status

    # Wait for the new version to be ready (no stage assigned yet).
    wait_for_model(model_name, new_model_version)

    # Move Model into Production
    client.transition_model_version_stage(name=model_name,
                                          version=new_model_version,
                                          stage="Production")

    wait_for_model(model_name, new_model_version, "Production")

    # Have a look at the MLflow model registry UI to check that your models have
    # been successfully registered. You should see that version 1 of your model
    # is now in staging, with version 2 in production.

    #let's clean up by archiving both model versions and deleting the whole model from the registry
    client.transition_model_version_stage(name=model_name,
                                          version=1,
                                          stage="Archived")

    wait_for_model(model_name, 1, "Archived")