# MLflow

[MLflow](https://mlflow.org/docs/latest/concepts.html) seeks to address three core issues:

* It’s difficult to keep track of experiments
* It’s difficult to reproduce code
* There’s no standard way to package and deploy models

In the past, when examining a problem, you would have to manually keep track of the many models you created, as well as their associated parameters and metrics. This can quickly become tedious and take up valuable time, which is where MLflow comes in.

## ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) In this lesson you:<br>
* Use MLflow to track experiments, log metrics, and compare runs

**Required Libraries**: 
* `mlflow==1.7.0` via PyPI

<div><img src="https://files.training.databricks.com/images/eLearning/ML-Part-4/mlflow-tracking.png" style="height: 400px; margin: 20px"/></div>

In [0]:
%run "./Includes/Classroom-Setup"

Let's start by loading the SF Airbnb dataset.

In [0]:
%python
filePath = "dbfs:/mnt/training/airbnb/sf-listings/sf-listings-2019-03-06-clean.parquet/"
airbnbDF = spark.read.parquet(filePath)

(trainDF, testDF) = airbnbDF.randomSplit([.8, .2], seed=42)
print(trainDF.cache().count())

### MLflow Tracking

MLflow Tracking is a logging API specific to machine learning and agnostic to the libraries and environments that do the training. It is organized around the concept of **runs**, which are executions of data science code. Runs are aggregated into **experiments**: a single experiment can contain many runs, and an MLflow server can host many experiments.

MLflow Tracking also integrates with the MLflow **Model Registry**, so tracked models can easily be stored, versioned, and, as necessary, deployed into production. Standardizing this process makes it faster and easier to scale. Experiments can be tracked using libraries in Python, R, and Java, as well as through the CLI and REST API. This module uses Python, though most MLflow functionality is also exposed in these other APIs.

### Track Runs

Each run can record the following information:<br><br>

- **Parameters:** Key-value pairs of input parameters such as the number of trees in a random forest model
- **Metrics:** Evaluation metrics such as RMSE or Area Under the ROC Curve
- **Artifacts:** Arbitrary output files in any format.  This can include images, pickled models, and data files
- **Source:** The code that originally ran the experiment

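**NOTE**: `mlflow.spark.log_model` logs fitted Spark ML models, so each example below logs the fitted `PipelineModel` returned by `pipeline.fit(...)`, not the unfitted `Pipeline`.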

In [0]:
%pip install mlflow

In [0]:
%python
import mlflow
import mlflow.spark
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
username = "maitrik.patel2025@gmail.com"  # replace with your own Databricks username
mlflow.set_experiment(f"/Users/{username}/tr-mlflow")

with mlflow.start_run(run_name="LR-Single-Feature") as run:
  # Define pipeline
  vecAssembler = VectorAssembler(inputCols=["bedrooms"], outputCol="features")
  lr = LinearRegression(featuresCol="features", labelCol="price")
  pipeline = Pipeline(stages=[vecAssembler, lr])
  pipelineModel = pipeline.fit(trainDF)
  
  # Log parameters
  mlflow.log_param("label", "price-bedrooms")
  
  # Log model
  mlflow.spark.log_model(pipelineModel, "model")
  
  # Evaluate predictions
  predDF = pipelineModel.transform(testDF)
  regressionEvaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price", metricName="rmse")
  rmse = regressionEvaluator.evaluate(predDF)
  
  # Log metrics
  mlflow.log_metric("rmse", rmse)

# display_run_uri(run.info.experiment_id, run.info.run_id)

Next, let's build a linear regression model that uses all of our features.

In [0]:
%python
from pyspark.ml.feature import RFormula
with mlflow.start_run(run_name="LR-All-Features") as run:
  # Create pipeline
  rFormula = RFormula(formula="price ~ .", featuresCol="features", labelCol="price", handleInvalid="skip")
  lr = LinearRegression(labelCol="price", featuresCol="features")
  pipeline = Pipeline(stages = [rFormula, lr])
  pipelineModel = pipeline.fit(trainDF)
  
  # Log pipeline
  mlflow.spark.log_model(pipelineModel, "model")
  
  # Log parameter
  mlflow.log_param("label", "price-all-features")
  
  # Create predictions and metrics
  predDF = pipelineModel.transform(testDF)
  regressionEvaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction")
  rmse = regressionEvaluator.setMetricName("rmse").evaluate(predDF)
  r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
  
  # Log both metrics
  mlflow.log_metric("rmse", rmse)
  mlflow.log_metric("r2", r2)

# display_run_uri(run.info.experiment_id, run.info.run_id)
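
This run logs both RMSE and R². As a reference for what those metric names mean, here is a dependency-free sketch of the standard definitions. It is not Spark's `RegressionEvaluator` implementation, and the numbers are made up purely for illustration.

```python
import math


def rmse(labels, predictions):
    """Root mean squared error, in the same units as the labels."""
    n = len(labels)
    return math.sqrt(sum((y - p) ** 2 for y, p in zip(labels, predictions)) / n)


def r2(labels, predictions):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_y = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean_y) ** 2 for y in labels)
    return 1 - ss_res / ss_tot


# Made-up labels and predictions.
prices = [3.0, 5.0, 7.0]
preds = [2.5, 5.0, 8.0]
print(round(rmse(prices, preds), 4))  # 0.6455
print(r2(prices, preds))              # 0.84375
```

Lower RMSE is better, while R² closer to 1 means the model explains more of the label's variance, so the two metrics complement each other when comparing runs.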

Finally, we'll use linear regression to predict the log of the price, since price follows a roughly log-normal distribution (so its log is roughly normal).

We'll also practice logging artifacts by saving a histogram of the log-transformed price.

In [0]:
%python
from pyspark.ml.feature import RFormula
from pyspark.sql.functions import col, log, exp
import matplotlib.pyplot as plt

with mlflow.start_run(run_name="LR-Log-Price") as run:
  # Take log of price
  logTrainDF = trainDF.withColumn("log_price", log(col("price")))
  logTestDF = testDF.withColumn("log_price", log(col("price")))
  
  # Log parameter
  mlflow.log_param("label", "log-price")
  
  # Create pipeline
  rFormula = RFormula(formula="log_price ~ . - price", featuresCol="features", labelCol="log_price", handleInvalid="skip")  
  lr = LinearRegression(labelCol="log_price", predictionCol="log_prediction")
  pipeline = Pipeline(stages = [rFormula, lr])
  pipelineModel = pipeline.fit(logTrainDF)
  
  # Log model
  mlflow.spark.log_model(pipelineModel, "log-model")
  
  # Make predictions
  predDF = pipelineModel.transform(logTestDF)
  expDF = predDF.withColumn("prediction", exp(col("log_prediction")))
  
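  # Evaluate predictions on the original (dollar) scale; defining the evaluator
  # here keeps this cell from depending on the previous one
  regressionEvaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction")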
  rmse = regressionEvaluator.setMetricName("rmse").evaluate(expDF)
  r2 = regressionEvaluator.setMetricName("r2").evaluate(expDF)
  
  # Log metrics
  mlflow.log_metric("rmse", rmse)
  mlflow.log_metric("r2", r2)
  
  # Log artifact
  plt.clf()
  logTrainDF.toPandas().hist(column="log_price", bins=100)
  figPath = username + "logNormal.png" 
  plt.savefig(figPath)
  mlflow.log_artifact(figPath)
  display(plt.show())
  
# display_run_uri(run.info.experiment_id, run.info.run_id)
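
The `exp(col("log_prediction"))` step matters for comparing runs. The model predicts log-price, so its outputs must be mapped back to dollars before computing RMSE. Otherwise the error would be on the log scale and not comparable with the RMSE logged by the other runs. Here is a dependency-free sketch with made-up prices and predictions:

```python
import math


def rmse(labels, predictions):
    """Root mean squared error, in the same units as the labels."""
    return math.sqrt(sum((y - p) ** 2 for y, p in zip(labels, predictions)) / len(labels))


# Made-up prices and made-up model outputs on the log scale.
prices = [100.0, 200.0, 400.0]
log_predictions = [math.log(110.0), math.log(190.0), math.log(420.0)]

# Back-transform to dollars, mirroring exp(col("log_prediction")).
predictions = [math.exp(lp) for lp in log_predictions]

dollar_rmse = rmse(prices, predictions)                         # comparable across runs
log_rmse = rmse([math.log(p) for p in prices], log_predictions)  # log scale, not dollars
print(round(dollar_rmse, 3))  # 14.142
print(log_rmse)
```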

That's it! Now, let's use MLflow to easily look over our work and compare model performance. You can either query past runs programmatically or use the MLflow UI.

### Querying Past Runs

You can query past runs programmatically to work with their data in Python. The entry point for this is an `MlflowClient` object.

In [0]:
%python
from mlflow.tracking import MlflowClient

client = MlflowClient()

In [0]:
%python
client.search_experiments()  # list_experiments() was removed in MLflow 2.0

You can also use [search_runs](https://mlflow.org/docs/latest/search-syntax.html) to find all runs for a given experiment.

In [0]:
%python
experiment_id = run.info.experiment_id
runs_df = mlflow.search_runs(experiment_id)

display(runs_df)

Pull the most recent run and look at its metrics.

In [0]:
%python
runs = client.search_runs(experiment_id, order_by=["attributes.start_time desc"], max_results=1)
runs[0].data.metrics

In [0]:
%python
run_id = runs[0].info.run_id
# display_run_uri(run.info.experiment_id, run_id)

Examine the results in the UI.  Look for the following:<br><br>

1. The `Experiment ID`
2. The artifact location.  This is where the artifacts are stored in DBFS.
3. The time the run was executed.  **Click this to see more information on the run.**
4. The code that executed the run.


After clicking on the time of the run, take a look at the following:<br><br>

1. The Run ID will match what we printed above
2. The model that we saved, which includes the serialized model as well as the Conda environment and the `MLmodel` file.

Note that you can add notes under the "Notes" tab to help keep track of important information about your models. 

Also, click on the run for the log normal distribution and see that the histogram is saved in "Artifacts".

### Load Saved Model

Let's practice [loading](https://www.mlflow.org/docs/latest/python_api/mlflow.spark.html) our logged log-normal model.

In [0]:
%python

loaded_model = mlflow.spark.load_model(f"runs:/{run.info.run_id}/log-model")
display(loaded_model.transform(testDF))

### Log Param, Metrics, and Artifacts

Now it's your turn! Log your name, your height, and a fun [matplotlib visualization](https://matplotlib.org/3.1.0/gallery/lines_bars_and_markers/scatter_with_legend.html#sphx-glr-gallery-lines-bars-and-markers-scatter-with-legend-py) (by calling the `generate_plot` function below - feel free to modify the viz!) under a run with name `MLflow-Lab` in our new MLflow experiment.

In [0]:
%python
def generate_plot():
  import numpy as np
  np.random.seed(19680801)
  import matplotlib.pyplot as plt

  fig, ax = plt.subplots()
  for color in ['tab:blue', 'tab:orange', 'tab:green']:
      n = 750
      x, y = np.random.rand(2, n)
      scale = 200.0 * np.random.rand(n)
      ax.scatter(x, y, c=color, s=scale, label=color,
                 alpha=0.3, edgecolors='none')

  ax.legend()
  ax.grid(True)
#   display(plt.show())
  return fig, plt

generate_plot()