mlflow and azure

https://databricks.com/blog/2020/10/13/using-mlops-with-mlflow-and-azure.html

data drift

https://docs.microsoft.com/en-us/azure/machine-learning/how-to-monitor-datasets?tabs=python

# Review the development cycle
1. Initial deployment: model trained and selected with existing train&test data
  * deploy model via mlflow model registry: https://www.mlflow.org/docs/latest/model-registry.html#adding-an-mlflow-model-to-the-model-registry


2. Monitoring model performance on new dataset
    * supervised learning
      * creating new test dataset
      * evaluate on test dataset with current model and log evaluation metrics & params
      * analyze logged metrics with predefined rules
      * trigger retrain if failed rules
    * unsupervised learning
      * monitoring used param and resulted metrics
      * analyze logged metrics with predefined rules
      * trigger retrain if failed rules
      
      
# Modules
1. create new test data - DS notebook, non-generic

2. train & register in model registry, log test set metrics of registered model
  * train & test - notebook by DS, non-generic
  * register - wrapper of <span style="color:blue">mlflow.register_model, client.transition_model_version_stage</span>
  * logger - wrapper of <span style="color:blue">mlflow.log_metrics, log_params, log_artifact</span>
    * experiment_id/experiment_name
    * evaluation metrics
    * params
    * evaluation metrics valid range
  * define rules (what is considered as a good enough model) <span style="color:blue">mlflow.set_tags</span>

2. use latest model from model registry and make predictions on new test data, wrapper of <span style="color:blue">client = mlflow.tracking.MlflowClient()
latest_model = client.get_latest_versions(name = model_name, stages=[stage])</span>
3. analyze logged metrics with predefined rules - wrapper of <span style="color:blue">mlflow.search_runs</span>
  * input
    * experiment_id/experiment_name
  * output: retrain, /path/to/notebook
4. trigger retrain if failed rules
  * trigger azure data factory pipeline for DS training notebook (via storage event or custom event) or
  * execute the DS notebook by calling <span style="color:blue">%run /Users/path/to/notebookA</span>
  
5. move to production if the rules are met

# Model Stages
None: WIP, Staging: Ready to test, Production: Serving, Archived: expired Staging and Production models

https://docs.databricks.com/_static/notebooks/mlflow/mlflow-model-registry-example.html

In [0]:
#%pip install mlflow
#%pip install pymysql

In [0]:
import mlflow
from mlflow.tracking.client import MlflowClient
# Create a tracking client and list the experiments it can see; the list is
# the last expression of the cell, so it is shown as the cell output.
client = MlflowClient()
client.list_experiments()

In [0]:
# Look up the experiment by its workspace path; later cells read
# `experiment.name` from this object.
experiment = mlflow.get_experiment_by_name("/Users/saurabh.verma2@adidas.com/test_experiment")

In [0]:
# Show the experiment object as the cell output.
experiment

# Generate Demo Model

In [0]:
from sklearn.datasets import load_iris
from sklearn import metrics
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
import pandas as pd

# Do not truncate rows or columns when pandas objects are displayed.
pd.set_option("display.max_rows", None, "display.max_columns", None)


import mlflow
import mlflow.sklearn
# Turn on MLflow autologging for scikit-learn for the whole notebook.
mlflow.sklearn.autolog()
#mlflow.set_tracking_uri("databricks") #
#mlflow.set_registry_uri("mysql+pymysql://mlflow-user:password@localhost:3306/mlflowruns")
# Note: on Databricks, the experiment name passed to set_experiment must be a valid path
# in the workspace, like '/Users/<your-username>/my-experiment'. See
# https://docs.databricks.com/user-guide/workspace.html for more info.
# mlflow.set_experiment("/my-experiment")
# Make the experiment looked up earlier the active one for subsequent runs.
mlflow.set_experiment(experiment_name = experiment.name)

#NOTEBOOK_ID = 1557754033372830

# Tag key under which a run's acceptance rule (metric name + expectation)
# is written and later read back.
KEY_EVALUATION = "eval_metric"

In [0]:
# Read the Iris CSV with a header row and inferred column types, then drop
# the row identifier column.
data = (
    spark.read
    .option("header",True)
    .option("inferSchema",True)
    .csv('/FileStore/tables/Iris.csv')
    .drop("id")
)
# Remove duplicate rows and render the result as the cell output.
raw_data = data.drop_duplicates()
display(raw_data)

SepalLengthCm,SepalWidthCm,PetalLengthCm,PetalWidthCm,Species
5.0,2.0,3.5,1.0,Iris-versicolor
6.8,3.0,5.5,2.1,Iris-virginica
6.7,3.0,5.2,2.3,Iris-virginica
5.6,2.9,3.6,1.3,Iris-versicolor
5.1,3.5,1.4,0.2,Iris-setosa
6.5,3.2,5.1,2.0,Iris-virginica
5.4,3.7,1.5,0.2,Iris-setosa
5.0,3.2,1.2,0.2,Iris-setosa
6.0,2.9,4.5,1.5,Iris-versicolor
5.0,3.6,1.4,0.2,Iris-setosa


In [0]:
# raw_data.createOrReplaceTempView("iris_dataset")
# raw_data.write.format("delta").saveAsTable("iris_features_example.test_table")

In [0]:
# %sql 
# CREATE DATABASE IF NOT EXISTS iris_features_example;

In [0]:
# from databricks import feature_store
# from pyspark.sql.functions import *
# from pyspark.sql.types import FloatType, IntegerType, StringType
# fs = feature_store.FeatureStoreClient()
# fs.create_feature_table(
#     name= "iris_features_example.iris_features",
#     keys=["SepalLengthCm", "SepalWidthCm", "PetalLengthCm", "PetalWidthCm"],
#     features_df=raw_data,
# #     partition_columns="yyyy_mm",
#     description="isis dataset features",
# )

In [0]:
# delta = spark.read.format("delta").table("iris_features_example.iris_features").drop("test")
# delta.write.format("delta").mode("OVERWRITE").option("overwriteSchema", True).saveAsTable("iris_features_example.iris_features")

In [0]:
# test2 = fs.read_table(
#   name='iris_features_example.iris_features',
# #   as_of_delta_timestamp = str('2021-08-18 12:21:19')
# )
# display(test2)

SepalLengthCm,SepalWidthCm,PetalLengthCm,PetalWidthCm,Species,test
5.0,2.0,3.5,1.0,Iris-versicolor,abc
6.8,3.0,5.5,2.1,Iris-virginica,abc
6.7,3.0,5.2,2.3,Iris-virginica,abc
5.6,2.9,3.6,1.3,Iris-versicolor,abc
5.1,3.5,1.4,0.2,Iris-setosa,abc
6.5,3.2,5.1,2.0,Iris-virginica,abc
5.4,3.7,1.5,0.2,Iris-setosa,abc
5.0,3.2,1.2,0.2,Iris-setosa,abc
6.0,2.9,4.5,1.5,Iris-versicolor,abc
5.0,3.6,1.4,0.2,Iris-setosa,abc


In [0]:
# raw_data_2 = raw_data.withColumn("test", lit("abc"))
# raw_data_2

In [0]:
# fs.write_table(
#   name='iris_features_example.iris_features',
#   df=raw_data_2,
#   mode="merge",
# )

In [0]:
# %sql
# -- ALTER TABLE iris_features_example.iris_features ADD COLUMNS (test string)
# -- -- DESCRIBE HISTORY iris_features_example.iris_features
# select * from iris_features_example.iris_features


SepalLengthCm,SepalWidthCm,PetalLengthCm,PetalWidthCm,Species,test
4.3,3.0,1.1,0.1,Iris-setosa,abc
4.4,2.9,1.4,0.2,Iris-setosa,abc
4.4,3.0,1.3,0.2,Iris-setosa,abc
4.4,3.2,1.3,0.2,Iris-setosa,abc
4.5,2.3,1.3,0.3,Iris-setosa,abc
4.6,3.1,1.5,0.2,Iris-setosa,abc
4.6,3.2,1.4,0.2,Iris-setosa,abc
4.6,3.4,1.4,0.3,Iris-setosa,abc
4.6,3.6,1.0,0.2,Iris-setosa,abc
4.7,3.2,1.3,0.2,Iris-setosa,abc


In [0]:
# from databricks.feature_store import FeatureLookup
# iris_feature_lookups = [
#     FeatureLookup( 
#       table_name = 'iris_features_example.iris_features',
#       feature_name = "test",
#       lookup_key = ["SepalLengthCm", "SepalWidthCm", "PetalLengthCm", "PetalWidthCm"],
#     )
# ]

In [0]:
# training_set = fs.create_training_set(
#   raw_data,
#   feature_lookups = iris_feature_lookups,
#   label = "Species",
# #   exclude_columns = exclude_columns
# )

In [0]:
# NOTE: disabled. `training_set` is only created by the feature-store cell
# above, which is commented out, so running this line raises NameError.
# `training_df` is itself only referenced by commented-out code.
# training_df = training_set.load_df()

In [0]:
# from sklearn.model_selection import train_test_split
# from mlflow.tracking import MlflowClient
# import lightgbm as lgb
# import mlflow.lightgbm
# from mlflow.models.signature import infer_signature

# features_and_label = training_df.columns

# # Collect data into a Pandas array for training
# data = training_df.toPandas()[features_and_label]

In [0]:
# Convert the de-duplicated Spark DataFrame to pandas; this rebinds `data`,
# which previously held the Spark DataFrame.
data = raw_data.toPandas()

In [0]:

def load_iris_data(data):
    """Split an Iris DataFrame into train and test features and labels.

    :param data: DataFrame holding the feature columns plus a ``Species``
                 label column.
    :return: Tuple ``(X_train, X_test, y_train, y_test)`` with 25% of the
             rows held out for testing, using a fixed ``random_state`` of 10.
    """
    features = data.drop(columns=["Species"])
    labels = data["Species"]

    # train_test_split yields [X_train, X_test, y_train, y_test] in that order.
    return tuple(
        train_test_split(features, labels, test_size=0.25, random_state=10)
    )

def train_predict_evaluate_dtree(input_data, params):
    """Train a decision tree inside an MLflow run and log its test metrics.

    :param input_data: Tuple ``(X_train, X_test, y_train, y_test)`` as
                       returned by ``load_iris_data``.
    :param params: Dict with the keys ``'leaf_nodes'`` (passed as
                   ``max_leaf_nodes``) and ``'max_depth'``.
    :return: Tuple ``(clf, test_metrics)`` where ``clf`` is the fitted
             classifier and ``test_metrics`` is
             ``(test_accuracy, test_f1_score)``.
    """
    with mlflow.start_run(run_name = "Decision Tree Classifier Experiments"):
        mlflow.sklearn.autolog()
        X_train, X_test, y_train, y_test = input_data
        clf = DecisionTreeClassifier(
            random_state=42,
            max_leaf_nodes=params['leaf_nodes'],
            max_depth=params['max_depth'],
        )
        clf.fit(X_train, y_train)

        y_pred = clf.predict(X_test)
        test_accuracy = metrics.accuracy_score(y_test, y_pred)
        test_f1_score = metrics.f1_score(y_test, y_pred, average='weighted')
        test_metrics = (test_accuracy, test_f1_score)

        # The explicit signature was only needed by the manual log_model call,
        # which is disabled; computing it here required an `infer_signature`
        # import that the notebook does not have.
        mlflow.log_metric('test_accuracy' , test_accuracy)
        mlflow.log_metric('test_f1_score', test_f1_score)
#         mlflow.sklearn.log_model(clf, "iris_decision_tree", signature=signature)
    # Two values, matching how the notebook unpacks the result.
    return clf, test_metrics
        
        

In [0]:
# input_data = load_iris_data()
# End any MLflow run that is still active before a new one is started.
mlflow.end_run()
input_data = load_iris_data(data)
# Hyper-parameters read by train_predict_evaluate_dtree.
params = {'leaf_nodes': 4, 'max_depth' :4}
iris_model, test_metrics = train_predict_evaluate_dtree(input_data, params)

In [0]:
# fs.log_model(
#   iris_model,
#   artifact_path="iris_decision_tree_model_packaged",
#   flavor=mlflow.sklearn,
#   training_set=training_set,
#   registered_model_name="iris_decision_tree_model"
# )

In [0]:
# Fetch the latest 'Staging' version of the registered model and show it.
# NOTE(review): get_lastest_model is defined further down in this notebook,
# so that cell has to be executed before this one.
stg_model_version = get_lastest_model(model_name = 'iris_decision_tree_model', stage = 'Staging')  #Staging
stg_model_version

In [0]:
# input_data = load_iris_data(data)
# X_train, X_test, y_train, y_test = input_data
# NOTE(review): X_test is not assigned in this cell (the unpacking above is
# commented out); it has to exist already from another cell.
model_name = 'iris_decision_tree_model'
# Load the Staging version of the registered model as a generic pyfunc model.
model = mlflow.pyfunc.load_model(f"models:/{model_name}/Staging")

# Sanity-check: This should match the predictions logged by MLflow
print(f'pred: {model.predict(X_test)}')

In [0]:
from pyspark.sql.functions import struct
from pyspark.sql.types import *

# Spark copy of X_train, used as the data to score.
spark_df = spark.createDataFrame(X_train)
# Wrap the Production version of the registered model as a Spark UDF whose
# result column is a string.
apply_model_udf = mlflow.pyfunc.spark_udf(spark, f"models:/{model_name}/Production", result_type=StringType())


# Apply the model to the new data
# All X_train columns are packed into one struct column passed to the UDF.
udf_inputs = struct(*(X_train.columns.tolist()))


# Add the UDF result as a "prediction" column.
new_data = spark_df.withColumn(
  "prediction",
  apply_model_udf(udf_inputs)
#   apply_model_udf(struct(*('SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm')))
)

In [0]:
# Render the scored DataFrame, including the "prediction" column.
display(new_data)

SepalLengthCm,SepalWidthCm,PetalLengthCm,PetalWidthCm,prediction
5.2,3.5,1.5,0.2,Iris-setosa
6.5,3.0,5.8,2.2,Iris-virginica
4.7,3.2,1.3,0.2,Iris-setosa
6.0,2.2,5.0,1.5,Iris-versicolor
5.1,3.4,1.5,0.2,Iris-setosa
5.5,3.5,1.3,0.2,Iris-setosa
6.8,3.2,5.9,2.3,Iris-virginica
7.4,2.8,6.1,1.9,Iris-virginica
5.8,4.0,1.2,0.2,Iris-setosa
4.9,3.0,1.4,0.2,Iris-setosa


# Model Serving

In [0]:
# import os
# os.environ["DATABRICKS_TOKEN"] = "<YOUR_TOKEN>"

In [0]:
# # Replace with code snippet from the model serving page
# import os
# import requests
# import pandas as pd

# def score_model(dataset):
#   url = 'https://dbc-af318caa-90f9.cloud.databricks.com/model/iris_decision_tree_model/Production/invocations'
#   headers = {'Authorization': f'Bearer {os.environ.get("DATABRICKS_TOKEN")}'}
#   data_json = dataset.to_dict(orient='split') if isinstance(dataset, pd.DataFrame) else create_tf_serving_json(dataset)
#   response = requests.request(method='POST', headers=headers, url=url, json=data_json)
#   if response.status_code != 200:
#     raise Exception(f'Request failed with status {response.status_code}, {response.text}')
#   return response.json()

In [0]:
# score_model_test = score_model(X_test)

# Initial Deployment

## Shared functions

In [0]:
def generate_model_uri(run_id):
  """Build the ``runs:/`` URI of the artifact stored under ``model`` for a run.

  :param run_id: ID of the MLflow run.
  :return: String of the form ``runs:/<run_id>/model``.
  """
  return f"runs:/{run_id}/model"

## DS: Select runs to register

During initial model development, expect to run multiple experiments, each with multiple runs using different params. Select the best run to put into the registry.

In [0]:
def get_best_run_of_experiment(experiment_ids, filter_string = "", order_by = None):

  """Return the ID of the first run found by ``mlflow.search_runs``.

    :param experiment_ids: List of experiment IDs.
    :param filter_string: Filter query string, defaults to searching all runs.
                     A non-empty filter that does not mention ``status`` is
                     additionally restricted to runs whose status is FINISHED.
    :param order_by: List of columns to order by (e.g., "metrics.rmse"). The ``order_by`` column
                     can contain an optional ``DESC`` or ``ASC`` value. The default is ``ASC``.
                     ``start_time DESC`` is appended unless an entry already
                     mentions ``start_time``. When ``None``, the ordering is
                     left to ``mlflow.search_runs``.
    :return: ``run_id`` of the first row of the ordered search result.
    :raises IndexError: If no run matches the search criteria.

    """
  import mlflow

  filter_string = filter_string.strip()
  if filter_string and 'status' not in filter_string.lower():
    # MLflow's documented filter grammar uses a single "=" for equality.
    filter_string = " and ".join(["status = 'FINISHED'", filter_string])

  if order_by is not None and not any("start_time" in el for el in order_by):
    # Newest run wins when the requested columns tie.
    order_by = list(order_by) + ["start_time DESC"]

  pdf = mlflow.search_runs(
    experiment_ids = experiment_ids,
    filter_string = filter_string,
    order_by= order_by
  )
  if pdf.empty:
    # Same exception type as indexing an empty list, but with context.
    raise IndexError(
      "No runs found for experiment_ids={!r} with filter_string={!r}".format(
        experiment_ids, filter_string
      )
    )
  return pdf["run_id"].iloc[0]

import json

# experiment_ids = list(set([mlflow.get_experiment_by_name(name).experiment_id for name in ["/my-experiment"]]))
# De-duplicated IDs of the experiments searched for the best run.
experiment_ids = list({mlflow.get_experiment_by_name(name).experiment_id for name in [experiment.name]})
# Best run = highest logged test accuracy.
run_id = get_best_run_of_experiment(experiment_ids = experiment_ids, order_by = ["metrics.test_accuracy DESC"])

from mlflow.tracking.client import MlflowClient
client = MlflowClient()


# Store the acceptance rule on the run as a JSON string so that it can be read
# back with json.loads. Passing the dict itself relies on its string form,
# which uses single quotes and has to be patched before parsing.
client.set_tag(run_id, KEY_EVALUATION, json.dumps({"name": "test_accuracy_expectation", "expectation": 0.6}))


## DS: Register Demo Model

In [0]:

# Name under which the selected run's model is registered.
model_name = 'iris_decision_tree'

# Register the artifact stored under "model" of the selected run; the returned
# object carries the registered name and the new version number used below.
model_uri = generate_model_uri(run_id)
model_details = mlflow.register_model(model_uri=model_uri, name=model_name)


import time
from mlflow.tracking.client import MlflowClient
from mlflow.entities.model_registry.model_version_status import ModelVersionStatus
 
def wait_until_ready(model_name, model_version, max_attempts=10, poll_interval=1):
  """Poll the model registry until a model version reports READY.

  The status is printed on every check. Polling stops as soon as the version
  is READY or after ``max_attempts`` checks; in the latter case a message is
  printed instead of returning silently.

  :param model_name: Name of the registered model.
  :param model_version: Version of the registered model to wait for.
  :param max_attempts: Maximum number of status checks (default 10).
  :param poll_interval: Seconds to sleep between checks (default 1).
  :return: None.
  """
  client = MlflowClient()
  for _ in range(max_attempts):
    model_version_details = client.get_model_version(
      name=model_name,
      version=model_version,
    )
    status = ModelVersionStatus.from_string(model_version_details.status)
    print("Model status: %s" % ModelVersionStatus.to_string(status))
    if status == ModelVersionStatus.READY:
      break
    time.sleep(poll_interval)
  else:
    # The loop ended without seeing READY: make the timeout visible.
    print("Model %s version %s is not READY after %d checks"
          % (model_name, model_version, max_attempts))
    
    
# Block until the version registered above is ready (or the polling gives up).
wait_until_ready(model_details.name, model_details.version)


## Transit Demo Model

Models are transitioned into Staging so that additional tests can be run on them and they can be compared with the Production model.

Select one of the registered model to promote to staging

In [0]:
from mlflow.tracking import MlflowClient

client = MlflowClient()
# Move the version registered above to the "Staging" stage;
# archive_existing_versions asks the registry to archive the versions that
# currently occupy that stage.
client.transition_model_version_stage(
    name=model_details.name,
    version=model_details.version,
    stage="Staging",
    archive_existing_versions = True
)


## Get latest Production/Staging model

In [0]:
def load_model(run_id):
  """Load the model logged by a run as a generic pyfunc model.

  :param run_id: ID of the MLflow run whose ``model`` artifact is loaded.
  :return: The object returned by ``mlflow.pyfunc.load_model``.
  """
  model_uri = generate_model_uri(run_id)
  return mlflow.pyfunc.load_model(model_uri)

def get_lastest_model(model_name: str, stage: str):
  """
  Return the latest version of a registered model in the requested stage.

  :param model_name: Name of the registered model to look up.
  :param stage: Desired stage, e.g. 'Staging', 'Production', 'Archived'.
  :return: The first entry returned by ``MlflowClient.get_latest_versions``
           for that stage (a model version object), or None if no version
           is found under the desired stage.

  """

  import mlflow

  client = mlflow.tracking.MlflowClient()
  models = client.get_latest_versions(name = model_name, stages=[stage])
  # Only one stage is requested, so the first entry is the one of interest.
  return models[0] if models else None

# Latest Staging and Production versions of the registered model; either is
# None when no version is in that stage.
stg_model_version = get_lastest_model(model_name = model_name, stage = 'Staging')
prod_model_version = get_lastest_model(model_name = model_name, stage = 'Production')



## Compare with production model and transit staging to production if better than production

In [0]:
import json
# Load the Staging candidate through the run that produced it.
stg_model = load_model(stg_model_version.run_id)

# load_iris_data requires the DataFrame to split; `data` is the pandas
# DataFrame prepared earlier in the notebook.
X_train, X_test, y_train, y_test = load_iris_data(data) # notebook by DS to generate new test data
y_pred = stg_model.predict(X_test)

test_accuracy = metrics.accuracy_score(y_test, y_pred)
test_f1_score = metrics.f1_score(y_test, y_pred, average='weighted')
test_metrics = (test_accuracy, test_f1_score)


# The acceptance rule is stored as a tag on the candidate's run. Single quotes
# are turned into double quotes so that a tag written as a Python dict string
# can still be parsed as JSON.
expectation_metric = client.get_run(stg_model_version.run_id).data.tags[KEY_EVALUATION].replace("'", "\"")
expectation_metric = json.loads(expectation_metric)
expectation_metric_name = expectation_metric['name']
expectation_val = float(expectation_metric['expectation'])

# Shown as the cell output.
expectation_val

In [0]:
# A candidate that does not beat its own expectation is never promoted.
if test_accuracy > expectation_val:
  promote_to_production = True

  if prod_model_version is not None:
    # A Production model exists: score it on the same test set and promote
    # the candidate only if its weighted F1 is at least as good.
    prod_model = load_model(prod_model_version.run_id)
    y_pred_prod = prod_model.predict(X_test)
    test_accuracy_prod = metrics.accuracy_score(y_test, y_pred_prod)
    test_f1_score_prod = metrics.f1_score(y_test, y_pred_prod, average='weighted')
    test_metrics_prod = (test_accuracy_prod, test_f1_score_prod)
    promote_to_production = test_f1_score_prod <= test_f1_score

  if promote_to_production:
    client.transition_model_version_stage(
      name=stg_model_version.name,
      version=stg_model_version.version,
      stage="Production",
      archive_existing_versions = True
    )


In [0]:
from mlflow.tracking import MlflowClient

client = MlflowClient()
# Print every registered model followed by the details of its latest versions.
for rm in client.list_registered_models():
  print(f"name={rm.name}")
  for mv in rm.latest_versions:
    print(f"run_id={mv.run_id}")
    print(f"status={mv.current_stage}")
    print(f"version={mv.version}")
    # Creation time of the registered model, repeated for each version.
    print(f"creation_timestamp={rm.creation_timestamp}")

In [0]:

from mlflow.tracking import MlflowClient

client = MlflowClient()
# Print the details of every version of the registered model `model_name`.
for mv in client.search_model_versions("name = '{model_name}'".format(model_name = model_name)):
  print(f"run_id={mv.run_id}")
  print(f"status={mv.current_stage}")
  print(f"version={mv.version}")
  # Use the version's own timestamp; `rm` is a leftover loop variable of
  # another cell and is unrelated to this model.
  print(f"creation_timestamp={mv.creation_timestamp}")
  print()

In [0]:
def delete_archived_models(model_name, keep_latest = False):
  """Delete the versions of a registered model that are in the 'Archived' stage.

  :param model_name: Name of the registered model.
  :param keep_latest: When true, the version returned by
                      ``get_lastest_model(..., stage='Archived')`` is kept.
  :return: None.
  """
  from mlflow.tracking import MlflowClient

  registry = MlflowClient()
  all_versions = registry.search_model_versions("name = '{model_name}'".format(model_name = model_name))
  to_delete = [version for version in all_versions if version.current_stage == 'Archived']

  if keep_latest:
    newest_archived = get_lastest_model(model_name = model_name, stage = 'Archived')
    # Spare the newest archived version by filtering on its version number.
    to_delete = [version for version in to_delete if version.version != newest_archived.version]

  for version in to_delete:
    registry.delete_model_version(name=model_name, version=version.version)

# Remove every archived version of the model (keep_latest defaults to False).
delete_archived_models(model_name)

In [0]:
from mlflow.tracking import MlflowClient

client = MlflowClient()
# Print the details of every remaining version of `model_name`.
for mv in client.search_model_versions("name = '{model_name}'".format(model_name = model_name)):
  print(f"run_id={mv.run_id}")
  print(f"status={mv.current_stage}")
  print(f"version={mv.version}")
  # Use the version's own timestamp; `rm` is a leftover loop variable of
  # another cell and is unrelated to this model.
  print(f"creation_timestamp={mv.creation_timestamp}")
  print()

https://mlflow.org/docs/latest/python_api/mlflow.html#mlflow.search_runs

https://docs.databricks.com/data/data-sources/mlflow-experiment.html#mlflow-exp-datasource