# Example notebook for data and model versioning using MLflow and Delta Lake

This notebook is based on the [Databricks example notebook](https://docs.databricks.com/_static/notebooks/mlflow/mlflow-delta-training.html).

The data is the LendingClub loan status data for 2007-2017, which can be found [here](https://www.kaggle.com/husainsb/lendingclub-issued-loans?select=lc_loan.csv).

**This notebook demonstrates**
- Data versioning using [Delta Lake](https://docs.microsoft.com/en-us/azure/databricks/delta/delta-intro) and [MLflow](https://mlflow.org/)
- Versioning a model and its dependent model with MLflow (here, Logistic Regression and [LIME](https://homes.cs.washington.edu/~marcotcr/blog/lime/))
- Copying both artifacts from MLflow to a specified location


**Cluster Configuration**
- Databricks Runtime Version `6.4 ML (includes Apache Spark 2.4.5, Scala 2.11)`
- Additional libraries, both installed from the `default` repository:
  - From Maven, coordinate `Azure:mmlspark:0.17`
  - From PyPI, `mlflow==1.14.1`

**Storage Requirement**
- If [credential passthrough](https://docs.microsoft.com/en-us/azure/databricks/security/credential-passthrough/) is not used, mount the containers of the storage account and change the container names in this notebook accordingly
- If credential passthrough is used, make sure that
  - the "credential passthrough" box was ticked when the cluster was created
  - the correct permissions have been granted on the storage account

In [0]:
from distutils.version import LooseVersion
import pyspark
from mmlspark import TabularLIME, TabularLIMEModel
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder, StandardScaler, Imputer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.sql.types import FloatType
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from typing import Tuple, List
from pathlib import Path

import mlflow
import mlflow.spark

In [0]:
print(f"MLFlow version = {mlflow.__version__}")

# Load data from Delta Lake

To load the data (this example uses credential passthrough):
- Mount the Azure Data Lake Storage container to Databricks (skip this if you use credential passthrough)
  - If you use credential passthrough, set `IS_CRED_PASS` to `True`
- Specify the table path and the data version to load

## Set credential passthrough (skip if you have already mounted the storage)

In [0]:
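IS_CRED_PASS = True
data_path = "/mnt"
# Configs for mounting ADLS Gen2 with credential passthrough, e.g.
# dbutils.fs.mount(source=..., mount_point=..., extra_configs=cred_passthrough_configs).
# Direct abfss:// access (used below) does not need them.
cred_passthrough_configs = {
  "fs.azure.account.auth.type": "CustomAccessToken",
  "fs.azure.account.custom.token.provider.class": spark.conf.get("spark.databricks.passthrough.adls.gen2.tokenProviderClassName")
}
container_name = "datalake"
if IS_CRED_PASS:
  data_path = f"abfss://{container_name}@dlsloandev.dfs.core.windows.net/lc_loan"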

## Specify path and version

- The Azure Data Lake Storage container is mounted under `/mnt/delta-ds-test/lc_loan` (or accessed directly through credential passthrough using container `delta-ds-test`). This is the default value; change it to the actual name of your Delta Lake container.
- Data is ingested by year, from 2007 to 2012, using the `issue_d` column as the watermark for versioning.

In [0]:
DEFAULT_DATA_CONTAINER = "delta-ds-test"
DEFAULT_ARTIFACT_CONTAINER = "model-artifacts"
dbutils.widgets.text(name="deltaVersion", defaultValue="6", label="Table version, default=6")
dbutils.widgets.text(name="tbl_name", defaultValue="delta-ds-test/lc_loan", label="tbl_name,default=delta-ds-test/lc_loan")
data_version = None if dbutils.widgets.get("deltaVersion") == "" else int(dbutils.widgets.get("deltaVersion"))
DELTA_TABLE_DEFAULT_PATH =  f"/mnt/{DEFAULT_DATA_CONTAINER}/lc_loan"
input_str = dbutils.widgets.get("tbl_name")
data_path = ""
if input_str == "":
  data_path = f"abfss://{container_name}@dlsloandev.dfs.core.windows.net/lc_loan" if IS_CRED_PASS else DELTA_TABLE_DEFAULT_PATH
else:
  container_name, tbl_name = input_str.strip().split("/")
  data_path = f"abfss://{container_name}@dlsloandev.dfs.core.windows.net/{tbl_name}" if IS_CRED_PASS else f"/mnt/{input_str}"
displayHTML(f"Current data path {data_path}, version {data_version}")
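The branching above decides where the table lives. As a sketch, the same rule can be written as a pure function so it can be tested off-cluster without `dbutils`. `resolve_data_path` and `STORAGE_ACCOUNT` are hypothetical names, and the account `dlsloandev` is simply the one hard-coded in this notebook.

```python
# Hypothetical constant: the storage account hard-coded in this notebook's abfss URIs.
STORAGE_ACCOUNT = "dlsloandev"


def resolve_data_path(tbl_input: str,
                      is_cred_pass: bool,
                      default_container: str = "datalake",
                      default_mount_path: str = "/mnt/delta-ds-test/lc_loan") -> str:
    """Map a '<container>/<table>' widget value to an abfss:// URI or a /mnt path."""
    tbl_input = tbl_input.strip()
    if tbl_input == "":
        # empty widget: fall back to the notebook defaults
        if is_cred_pass:
            return f"abfss://{default_container}@{STORAGE_ACCOUNT}.dfs.core.windows.net/lc_loan"
        return default_mount_path
    container, tbl_name = tbl_input.split("/")
    if is_cred_pass:
        # credential passthrough: read the container directly
        return f"abfss://{container}@{STORAGE_ACCOUNT}.dfs.core.windows.net/{tbl_name}"
    # no passthrough: the container is expected to be mounted under /mnt
    return f"/mnt/{tbl_input}"
```

Keeping the widget reads (`dbutils.widgets.get`) outside the function means only the string logic needs testing.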

In [0]:
displayHTML(data_path)

## Load from Delta Lake

In [0]:
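reader = spark.read.format("delta")
# Only pin a version when one was given; otherwise read the latest snapshot
if data_version is not None:
  reader = reader.option("versionAsOf", data_version)
dataset = reader.load(data_path)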
display(dataset.select('issue_d').withColumn('year', F.year('issue_d')).select('year').distinct())
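Reading with `versionAsOf` returns the table as it was after a given commit. The toy class below only illustrates that idea and is not the Delta Lake API. Each `append` stands in for one yearly ingestion commit, and reading without a version returns the latest snapshot. Which real version holds which years depends on your table's history (inspect it with `DESCRIBE HISTORY`). The year-to-version mapping here is hypothetical.

```python
from typing import Dict, List, Optional


class ToyVersionedTable:
    """In-memory illustration of versioned snapshots (not Delta Lake itself)."""

    def __init__(self) -> None:
        self._snapshots: List[List[Dict]] = []

    def append(self, rows: List[Dict]) -> int:
        """Commit new rows on top of the previous snapshot; return the new version."""
        previous = self._snapshots[-1] if self._snapshots else []
        self._snapshots.append(previous + list(rows))
        return len(self._snapshots) - 1  # versions start at 0, as in Delta

    def read(self, version: Optional[int] = None) -> List[Dict]:
        """Read a specific version, or the latest one when version is None."""
        if version is None:
            return list(self._snapshots[-1])
        return list(self._snapshots[version])


# Hypothetical ingestion: one commit per issue year, 2007 to 2012.
table = ToyVersionedTable()
for year in range(2007, 2013):
    table.append([{"issue_year": year}])

years_v2 = sorted({r["issue_year"] for r in table.read(version=2)})  # [2007, 2008, 2009]
```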

In [0]:
spark.catalog.clearCache()

# Data Transformation

## Create bad loan label

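Create the bad loan label. Only loans with status "Default", "Charged Off", or "Fully Paid" are kept (other statuses, including late repayments, are filtered out); defaulted and charged-off loans are labelled as bad.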

In [0]:
dataset = dataset.filter(dataset.loan_status.isin(["Default", "Charged Off", "Fully Paid"]))\
                       .withColumn("bad_loan", (~(dataset.loan_status == "Fully Paid")).cast("string"))
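The following row-level sketch shows, in plain Python, what the filter and cast above do. `bad_loan_label` is a hypothetical helper. It relies on Spark casting booleans to the strings `"true"` and `"false"`.

```python
from typing import Optional

# Statuses that survive the filter above
KEPT_STATUSES = {"Default", "Charged Off", "Fully Paid"}


def bad_loan_label(loan_status: str) -> Optional[str]:
    """Return None for rows the filter drops, else the string label.

    The label is the string form of the boolean `loan_status != "Fully Paid"`.
    """
    if loan_status not in KEPT_STATUSES:
        return None  # e.g. "Current" or late-payment statuses are dropped
    return "true" if loan_status != "Fully Paid" else "false"
```

The label column is therefore a string, which is why `StringIndexer` is needed later to turn it into a numeric `label`.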

## Feature Engineering

- Use only the year of `issue_d` and `earliest_cr_line`
- Compute `credit_length_in_years`
- Add `net` (total payment minus loan amount) as a new feature

In [0]:
dataset = (
  dataset.withColumn('issue_year', F.year(F.col('issue_d')).cast('double'))
         .withColumn('earliest_year', F.year(F.col('earliest_cr_line')).cast('double'))
         .withColumn('credit_length_in_years', F.col('issue_year') - F.col('earliest_year'))
         .withColumn('net', F.round(F.col('total_pymnt') - F.col('loan_amnt'), 2))
)
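For a single loan record, the derived columns reduce to simple arithmetic. The sketch below uses a hypothetical helper, `engineer_features`, in plain Python instead of Spark. It shows that `credit_length_in_years` compares only calendar years, so the month within a year has no effect.

```python
from datetime import date


def engineer_features(issue_d: date, earliest_cr_line: date,
                      total_pymnt: float, loan_amnt: float) -> dict:
    """Plain-Python version of the derived columns for one loan."""
    issue_year = float(issue_d.year)
    earliest_year = float(earliest_cr_line.year)
    return {
        "issue_year": issue_year,
        "earliest_year": earliest_year,
        # only calendar years are compared; months are ignored
        "credit_length_in_years": issue_year - earliest_year,
        # positive when the borrower paid back more than the principal
        # (Python's round() rounds half to even, Spark's F.round half up,
        # so the two may differ on exact ties)
        "net": round(total_pymnt - loan_amnt, 2),
    }
```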

In [0]:
display(dataset)

# Helper functions for training

## Data Transformation

- Impute missing values in the feature columns
- Assemble the feature columns into a single vector column
- Convert the target column to a numeric label
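The steps above can be sketched in plain Python. The sketch below mimics `Imputer` with its default `strategy="mean"`, which treats nulls and NaN as missing. It also mimics `StringIndexer` with its default `frequencyDesc` ordering, where the most frequent label gets index `0.0`. The helper names are hypothetical, and the alphabetical tie-break is an assumption of this sketch.

```python
from collections import Counter
from typing import Dict, List, Optional


def impute_mean(values: List[Optional[float]]) -> List[float]:
    """Replace None/NaN with the mean of the observed values."""
    observed = [v for v in values if v is not None and v == v]  # NaN != NaN
    if not observed:
        raise ValueError("all values are missing; no mean to impute")
    mean = sum(observed) / len(observed)
    return [mean if (v is None or v != v) else v for v in values]


def fit_string_indexer(labels: List[str]) -> Dict[str, float]:
    """Index labels by descending frequency; ties broken alphabetically (assumption)."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda lbl: (-counts[lbl], lbl))
    return {lbl: float(i) for i, lbl in enumerate(ordered)}
```

As a result, the numeric value of `label` depends on class frequencies in the training data, not on the string itself.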

In [0]:
def data_transform(features: list,
                   target: str,
                   train_df: DataFrame) -> PipelineModel:
  """
  - impute missing values in the feature columns (in place)
  - transform feature columns into a single vector column named `features`
  - convert the target column to a numeric `label` column using StringIndexer
  - fit the transformation pipeline on the training data

  :param features: list of feature column names
  :type features: list
  :param target: name of the target column
  :type target: str
  :param train_df: training DataFrame used to fit the transformation pipeline
  :type train_df: DataFrame
  :return: fitted transformation pipeline
  :rtype: PipelineModel
  """
  model_matrix_stages = [
    Imputer(inputCols=features, outputCols=features),
    VectorAssembler(inputCols=features, outputCol='features'),
    StringIndexer(inputCol=target, outputCol="label")
  ]
  transform_pipeline = Pipeline(stages=model_matrix_stages)
  transform_pipeline_model = transform_pipeline.fit(train_df)
  return transform_pipeline_model
  
  

## Train Function

In [0]:
def train(train_df: DataFrame,
          test_df: DataFrame,
          lr_params: dict,
          lime_params: dict,
          lime_output_col: str = "weights",
          lime_prediction_col: str = "prediction") -> Tuple[PipelineModel, TabularLIMEModel]:
  """
  Fit a logistic regression with CrossValidator (grid over regParam) on `train_df`,
  evaluate its accuracy on `test_df`, then fit a TabularLIME explainer on the best model.
  Parameters, the test metric, and both models are logged to the active MLflow run.

  :param train_df: transformed training data with `features` and `label` columns
  :param test_df: transformed test data with the same columns
  :param lr_params: keyword arguments for LogisticRegression (logged with prefix `lr_`)
  :param lime_params: keyword arguments for TabularLIME.setParams (logged with prefix `lime_`)
  :param lime_output_col: output column of the LIME model
  :param lime_prediction_col: prediction column passed to LIME
  :return: best pipeline model from cross-validation and the fitted LIME model
  """
  for param, val in lr_params.items():
    mlflow.log_param("lr_" + param, val)
  for param, val in lime_params.items():
    mlflow.log_param("lime_" + param, val)

  lime_input_col = "features"
  lr = LogisticRegression(**lr_params, featuresCol="features")

  training_pipeline = Pipeline(stages=[lr])
  paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0, 0.001, 1, 10]).build()
  crossval = CrossValidator(estimator=training_pipeline,
                            estimatorParamMaps=paramGrid,
                            evaluator=MulticlassClassificationEvaluator(metricName="accuracy"),
                            numFolds=5)

  cvModel = crossval.fit(train_df)
  # evaluate the best model on the test data
  evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
  validation_res = evaluator.evaluate(cvModel.transform(test_df))
  # log using mlflow
  mlflow.log_metric('test_' + evaluator.getMetricName(), validation_res)

  # Train LIME model on top of the best LR model (a PipelineModel)
  lr_model = cvModel.bestModel
  # log the regParam selected by cross-validation
  mlflow.log_param("lr_regParam", lr_model.stages[-1]._java_obj.getRegParam())
  lime = (TabularLIME()
          .setModel(lr_model.stages[-1])
          .setPredictionCol(lime_prediction_col)
          .setOutputCol(lime_output_col)
          .setInputCol(lime_input_col)
          .setParams(**lime_params))

  lime_model = lime.fit(train_df)
  # log models
  mlflow.spark.log_model(lr_model, "lr_model")
  mlflow.spark.log_model(lime_model, "lime_model")
  return lr_model, lime_model
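`CrossValidator` scores every param map in `paramGrid` on k folds and keeps the one with the best average metric. It then refits that param map on all training data to produce `bestModel`. A minimal plain-Python sketch of that loop follows. It assigns rows to folds round-robin for determinism, whereas Spark assigns them randomly. The threshold "model" in the usage part is a toy stand-in for logistic regression, and all names are hypothetical.

```python
from statistics import mean
from typing import Any, Callable, Dict, List, Sequence, Tuple


def k_fold_splits(n: int, k: int) -> List[Tuple[List[int], List[int]]]:
    """Row i goes to fold i % k; return (train_idx, val_idx) per fold."""
    splits = []
    for fold in range(k):
        val_idx = [i for i in range(n) if i % k == fold]
        train_idx = [i for i in range(n) if i % k != fold]
        splits.append((train_idx, val_idx))
    return splits


def grid_search_cv(rows: Sequence[Any],
                   param_grid: List[Dict[str, Any]],
                   fit: Callable[[List[Any], Dict[str, Any]], Any],
                   score: Callable[[Any, List[Any]], float],
                   k: int = 5) -> Tuple[Any, Dict[str, Any], float]:
    """Pick the params with the best mean fold score, then refit on all rows."""
    best_params: Dict[str, Any] = {}
    best_score = float("-inf")
    for params in param_grid:
        fold_scores = []
        for train_idx, val_idx in k_fold_splits(len(rows), k):
            model = fit([rows[i] for i in train_idx], params)
            fold_scores.append(score(model, [rows[i] for i in val_idx]))
        avg = mean(fold_scores)
        if avg > best_score:
            best_params, best_score = params, avg
    return fit(list(rows), best_params), best_params, best_score


# Toy usage: the "model" is just a threshold on a single feature.
def fit_threshold(train_rows: List[Tuple[int, bool]], params: Dict[str, Any]) -> int:
    return params["threshold"]


def accuracy(threshold: int, val_rows: List[Tuple[int, bool]]) -> float:
    return mean(1.0 if (x > threshold) == y else 0.0 for x, y in val_rows)


data = [(x, x > 5) for x in range(20)]
best_model, best_params, best_score = grid_search_cv(
    data, [{"threshold": t} for t in (0, 5, 10)], fit_threshold, accuracy, k=5)
```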

## LIME Helper Function

Helper function that splits the LIME output weight vector into one column per feature

In [0]:
# Helper function for LIME
from pyspark.sql.types import FloatType, StructField, StructType

def splitVector(df_split: pyspark.sql.DataFrame, new_features: list) -> pyspark.sql.DataFrame:
  """Flatten the LIME output column "splitcol" so that the importance of each
  feature used in the model becomes a separate column in the returned DataFrame,
  while the other output columns (e.g. prediction) are kept.

  :param df_split: DataFrame to be split column-wise
  :type df_split: pyspark.sql.DataFrame
  :param new_features: names of the new columns, one per vector element
  :type new_features: list
  :return: converted DataFrame
  :rtype: pyspark.sql.DataFrame
  """
  cols = df_split.columns
  # Build a new schema instead of mutating df_split.schema in place.
  # new_features should have the same length as the vector column.
  schema = StructType(df_split.schema.fields +
                      [StructField(col, FloatType(), True) for col in new_features])
  return spark.createDataFrame(
    df_split.rdd.map(lambda row: [row[c] for c in cols] + row.splitcol.tolist()), schema)
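Per row, `splitVector` does the flattening sketched below, using plain dicts instead of Spark rows. The explicit length check turns the assumption noted in the code comment ("new_features should have the same length as the vector column") into an error. `split_weights` is a hypothetical helper, not part of mmlspark.

```python
from typing import Any, Dict, Sequence


def split_weights(row: Dict[str, Any], vector_col: str,
                  feature_names: Sequence[str]) -> Dict[str, Any]:
    """Add one float entry per feature from row[vector_col], keeping other entries."""
    weights = list(row[vector_col])
    if len(weights) != len(feature_names):
        raise ValueError(f"expected {len(feature_names)} weights, got {len(weights)}")
    flat = dict(row)  # copy so the input row is left untouched
    flat.update({name: float(w) for name, w in zip(feature_names, weights)})
    return flat
```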

# Training

- Split train/test by year: the latest year is always held out as the test set.
- Start training

In [0]:
tot_year = sorted([x.issue_year for x in dataset.select('issue_year').distinct().collect()])
split_year = tot_year[-2]
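The split rule can be sketched on a plain list of years. `tot_year[-2]` raises an `IndexError` when the loaded version contains a single year, so the sketch checks for that case explicitly. `split_by_latest_year` is a hypothetical helper.

```python
from typing import Iterable, List, Tuple


def split_by_latest_year(years: Iterable[float]) -> Tuple[List[float], List[float], float]:
    """Return (train_years, test_years, split_year), holding out only the latest year."""
    distinct = sorted(set(years))
    if len(distinct) < 2:
        raise ValueError("need at least two distinct years to hold one out")
    split_year = distinct[-2]
    train = [y for y in distinct if y <= split_year]
    test = [y for y in distinct if y > split_year]
    return train, test, split_year
```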

In [0]:
# split train and test sets by year
feature_cols = ["loan_amnt", "annual_inc", "dti", "delinq_2yrs", "total_acc", "credit_length_in_years", 'net']
target_col = 'bad_loan'
# filter on issue_year before selecting, since issue_year is not a selected column
train_df = dataset.where(F.col('issue_year') <= split_year).select(feature_cols + [target_col])
test_df = dataset.where(F.col('issue_year') > split_year).select(feature_cols + [target_col])
transform_model = data_transform(train_df=train_df, features=feature_cols, target=target_col)
train_t_df = transform_model.transform(train_df)
test_t_df = transform_model.transform(test_df)

In [0]:
# cache dataframe to avoid lazy eval running multiple times
train_t_df.cache().count()
test_t_df.cache().count()

In [0]:
display(test_t_df)

In [0]:
from mlflow.tracking import MlflowClient
user_name = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().apply('user')
experiment_name = f"/Users/{user_name}/loan_classification"
mlflow.set_experiment(experiment_name)
with mlflow.start_run():
  lr_params = {"labelCol": "label", "maxIter":50}
  lime_params = {"nSamples": 1000, "samplingFraction": 0.3, "regularization":0.0}
  tags = {
    "data_path": data_path,
    "data_version": data_version,
    "train_test_split": split_year
  }
  mlflow.set_tags(tags)
  # log note
  mlflow.set_tag("mlflow.note.content", 
                 "one sub-run for loan prediction with lr and lime models with expr for param tuning")
  mlflow.log_param("train_test_split", split_year)
  mlflow.log_param("data_version", data_version)
  lr_model, lime_model = train(train_t_df, test_t_df,lr_params, lime_params)


# Check prediction output for LR and LIME

## Check output from LR

In [0]:
pred_df = lr_model.transform(test_t_df)
pred_df.cache().count()

In [0]:
display(pred_df)

## Check output from LIME

In [0]:
lime_df = lime_model.transform(pred_df.select('features'))
dfLimeSel = lime_df.select('weights').withColumnRenamed('weights', 'splitcol')
dfSplit = splitVector(dfLimeSel, feature_cols )
dfResult = dfSplit.drop("splitcol")

In [0]:
display(dfResult)

# Retrieve and Copy Artifacts
- Retrieve models by experiment ID and run ID
- Copy both the LR and the LIME model to a specified location

## Helper function for copying artifacts

In [0]:
def copy_model(
    exp_id: str,
    run_id: str,
    model_name: str,
    filepath: str
):
    """
    Copy a model from the MLflow artifacts to a file location.

    :param exp_id: experiment ID to retrieve from
    :type exp_id: str
    :param run_id: run ID to retrieve from
    :type run_id: str
    :param model_name: name of the model artifact to retrieve
    :type model_name: str
    :param filepath: file path where the model is stored
    :type filepath: str
    """
    
    model_path = f"dbfs:/databricks/mlflow-tracking/{exp_id}/{run_id}/artifacts/{model_name}"
    model = mlflow.spark.load_model(model_path)
    model.write().overwrite().save(filepath)

## Query all runs

In [0]:
experiment_id = mlflow.get_experiment_by_name(experiment_name).experiment_id
runs = mlflow.search_runs(experiment_ids=[experiment_id], order_by=['metrics.avg_accuracy DESC'])

In [0]:
runs

## Only keep the parent runs (the "best" run of each CV run)

In [0]:
runs[runs['tags.mlflow.parentRunId'].isnull()]
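The next cell asks for a run ID to be pasted by hand. One way to pick it is to take the top-level run with the highest test metric. The sketch below does this on a list of dicts that mimics rows of `mlflow.search_runs()`, keyed by column names such as `run_id`, `tags.mlflow.parentRunId`, and `metrics.test_accuracy`. `train` logs `test_accuracy`, while `best_parent_run` and the sample rows are hypothetical.

```python
from typing import Any, Dict, List, Optional


def best_parent_run(runs: List[Dict[str, Any]], metric: str) -> Optional[str]:
    """Return the run_id of the top-level run with the highest metrics.<metric>."""
    key = f"metrics.{metric}"
    parents = [r for r in runs
               if r.get("tags.mlflow.parentRunId") is None and r.get(key) is not None]
    if not parents:
        return None
    return max(parents, key=lambda r: r[key])["run_id"]


runs_rows = [
    {"run_id": "a", "metrics.test_accuracy": 0.80},
    {"run_id": "b", "metrics.test_accuracy": 0.85},
    # child runs are ignored even if they score higher
    {"run_id": "c", "tags.mlflow.parentRunId": "b", "metrics.test_accuracy": 0.99},
]
chosen_run_id = best_parent_run(runs_rows, "test_accuracy")
```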

## Copy LR model and its dependent LIME model

In [0]:
run_id = "your-run-id-here-from-mlflow"  # you can find your run ID in the MLflow UI
model_names = ['lime_model', 'lr_model']

artifact_path = (f"abfss://{DEFAULT_ARTIFACT_CONTAINER}@dlsloandev.dfs.core.windows.net/{experiment_id}/{run_id}"
                 if IS_CRED_PASS else
                 f'/mnt/{DEFAULT_ARTIFACT_CONTAINER}/{experiment_id}/{run_id}')
for model in model_names:
  try:
    # dbutils.fs.ls raises if the destination does not exist yet
    dbutils.fs.ls(f"{artifact_path}/{model}")
    displayHTML(f"{model} already copied!")
  except Exception:
    displayHTML(f"Copying {model} to {artifact_path}/{model}")
    copy_model(experiment_id, run_id, model_name=model, filepath=f"{artifact_path}/{model}")
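The loop above copies a model only if its destination is missing. It detects a missing destination through an exception, which also treats any other failure of `dbutils.fs.ls` (for example, a permission error) as "not copied yet". A minimal local-filesystem sketch of the same idempotent pattern follows. It uses an explicit existence check instead, and `copy_if_missing` is a hypothetical helper.

```python
import shutil
import tempfile
from pathlib import Path


def copy_if_missing(src: Path, dst: Path) -> bool:
    """Copy directory src to dst unless dst exists; return True if a copy was made."""
    if dst.exists():
        return False
    shutil.copytree(src, dst)  # also creates missing parent directories
    return True


# Usage on a throwaway directory tree
with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp) / "lr_model"
    src.mkdir()
    (src / "metadata.json").write_text("{}")
    dst = Path(tmp) / "artifacts" / "lr_model"
    first_copy = copy_if_missing(src, dst)   # True: destination did not exist
    second_copy = copy_if_missing(src, dst)  # False: already copied
    copied_ok = (dst / "metadata.json").read_text() == "{}"
```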

In [0]:
display(dbutils.fs.ls(artifact_path))

## Re-load a model and test

- We use the LIME model as an example: re-load it from the artifact path and test it

In [0]:
test_model = 'lime_model'
model_path = f'{artifact_path}/{test_model}'
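# copy_model saved the model returned by mlflow.spark.load_model, i.e. a PipelineModel
lime_reloaded = PipelineModel.load(model_path)
lime_df2 = lime_reloaded.transform(pred_df.select('features'))
# use the output of the reloaded model (lime_df2), not the original lime_df
dfLimeSel = lime_df2.select('weights').withColumnRenamed('weights', 'splitcol')
dfSplit = splitVector(dfLimeSel, feature_cols)
dfResult = dfSplit.drop("splitcol")
dfResult.show()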