In [0]:
# Install the pinned library versions this notebook relies on:
# MLflow 2.19 and databricks-automl-runtime 0.2.20.6 (force-reinstalled).
%pip install --quiet mlflow==2.19
%pip install --force-reinstall databricks-automl-runtime==0.2.20.6
# Restart the Python process so the packages installed above are the ones
# picked up by the imports in the following cells.
dbutils.library.restartPython()

In [0]:
# Declare the notebook's text widgets (all default to an empty string).
# There are no source catalog / database widgets: those values are derived
# from the fully qualified `model_name` when the widgets are read.
for _widget_name, _widget_label in [
    ("model_name", "Model Name"),
    ("model_version", "Model Version"),
    ("validation_data", "validation data"),
    ("target_catalog", "Target Catalog"),
]:
    dbutils.widgets.text(_widget_name, "", _widget_label)

In [0]:
# Read the notebook parameters from the widgets.
source_full_model_name = dbutils.widgets.get("model_name").strip()
model_version = dbutils.widgets.get("model_version")
validation_data = dbutils.widgets.get("validation_data")
target_catalog = dbutils.widgets.get("target_catalog")

# The source catalog and database are not separate inputs; they are extracted
# from the fully qualified model name (`<catalog>.<db>.<model>`).
_name_parts = [part.strip() for part in source_full_model_name.split('.', 2)]
if len(_name_parts) != 3 or not all(_name_parts):
    raise ValueError(
        "model_name must be fully qualified as '<catalog>.<db>.<model>', "
        f"got {source_full_model_name!r}"
    )
catalog, db, model_name = _name_parts

# Rebuild the full name from the cleaned parts so that it always agrees with
# f"{catalog}.{db}.{model_name}", which later cells use for the same model.
source_full_model_name = f"{catalog}.{db}.{model_name}"

# The registry APIs below are called with an integer version.
try:
    model_version = int(model_version)
except ValueError as exc:
    raise ValueError(
        f"model_version must be an integer, got {model_version!r}"
    ) from exc


# Validate the AutoML model before moving it to the target catalog

In [0]:
# Validate the AutoML model in the source (dev) catalog before propagating it
# to the next stage.
import mlflow
from mlflow.tracking import MlflowClient

# Registry client shared by the validation cells below.
client = MlflowClient()

# The version to validate is the one supplied through the `model_version`
# widget (it is not resolved from an alias).
print(f"Validating model for {source_full_model_name} on model version {model_version}")

## Validate if model version description is populated

In [0]:
# A description must be longer than this many characters to count as
# sufficiently detailed.
MIN_DESCRIPTION_LENGTH = 20

# Check whether the model version carries a meaningful description and record
# the outcome as the `has_description` tag. This check is advisory: it only
# tags the version and never stops the notebook.
model_details = client.get_model_version(name=source_full_model_name, version=model_version)
if not model_details.description:
  has_description = False
  print("Please add model description to the version")
elif len(model_details.description) <= MIN_DESCRIPTION_LENGTH:
  has_description = False
  # The message is derived from the threshold so the two cannot drift apart.
  print(f"Please add detailed model description (more than {MIN_DESCRIPTION_LENGTH} characters).")
else:
  has_description = True

print(f'Model {source_full_model_name} version {model_details.version} has description: {has_description}')
client.set_model_version_tag(name=source_full_model_name, version=str(model_details.version), key="has_description", value=has_description)

## Validate the model against the current Champion model, if one exists

In [0]:
from mlflow.exceptions import MlflowException

# test_smape logged by the training run of the version being validated.
model_run_id = model_details.run_id
test_smape = mlflow.get_run(model_run_id).data.metrics['test_smape']

# Look up the current Champion. Only a failed registry lookup is treated as
# "no champion yet"; any other problem (e.g. the Champion's run is missing the
# test_smape metric) is raised instead of silently auto-passing the model.
try:
    champion_model = client.get_model_version_by_alias(source_full_model_name, "Champion")
except MlflowException as exc:
    champion_model = None
    print(f"No Champion found. Accept the model as it's the first one.")
    print(f"Champion lookup details: {exc}")

if champion_model is None:
    metric_smape_passed = True
else:
    # Compare the challenger smape score to the existing champion (lower is better).
    champion_smape = mlflow.get_run(champion_model.run_id).data.metrics['test_smape']
    print(f'Champion test_smape score: {champion_smape}. Challenger test_smape score: {test_smape}.')
    metric_smape_passed = test_smape <= champion_smape

if metric_smape_passed:
    print(f'Model {source_full_model_name} version {model_details.version} smape comparison with champion model passed. champion_smape_passed: {metric_smape_passed}.')
else:
    print(f'Model {source_full_model_name} version {model_details.version} champion_smape_passed: {metric_smape_passed}. Not a good model to proceed with')

# Record the outcome on the version and point the Challenger alias at it.
# NOTE(review): the Challenger alias is assigned even when the comparison
# fails (unchanged from the previous behaviour) - confirm this is intended.
client.set_model_version_tag(name=source_full_model_name, version=model_details.version, key="champion_smape_passed", value=metric_smape_passed)
client.set_registered_model_alias(name=source_full_model_name, alias="Challenger", version=model_version)

if not metric_smape_passed:
    dbutils.notebook.exit("validation Completed. Model version is not fit for next stage")


## Validating model performance against validation dataset

In [0]:
import pyspark.sql.functions as F
import mlflow
# Load the validation table from the source catalog/database and convert it
# to a pandas DataFrame.
validation_data_df = spark.table(f"{catalog}.{db}.{validation_data}").toPandas()

# Resolve the pip requirements recorded for the model version currently
# aliased as Challenger, and install them into the notebook environment.
requirements = mlflow.pyfunc.get_model_dependencies(model_uri=f"models:/{catalog}.{db}.{model_name}@Challenger")
%pip install -r {requirements}
# Re-pin the AutoML runtime after installing the model requirements
# (presumably because those requirements can change its version - TODO confirm).
%pip install --force-reinstall databricks-automl-runtime==0.2.20.6

In [0]:
import mlflow

# Load the model version currently aliased as Challenger as a generic pyfunc model.
challenger_model_uri = f"models:/{catalog}.{db}.{model_name}@Challenger"
model = mlflow.pyfunc.load_model(model_uri=challenger_model_uri)

# Score the validation data and store the result next to the inputs in a
# `prediction` column, then show the scored frame.
challenger_predictions = model.predict(validation_data_df)
validation_data_df['prediction'] = challenger_predictions
display(validation_data_df)

In [0]:
import pandas as pd
import numpy as np

def smape(df, actual_col, predicted_col):
  """
  Compute the Symmetric Mean Absolute Percentage Error (SMAPE), in percent.

  Each row contributes 2 * |predicted - actual| / (|actual| + |predicted|);
  the result is the mean of those ratios multiplied by 100.

  Rows where both the actual and the predicted value are zero are not
  special-cased: their ratio is 0 / 0.

  Args:
    df: Pandas DataFrame holding the actual and predicted values.
    actual_col: Name of the column with the actual values.
    predicted_col: Name of the column with the predicted values.

  Returns:
    The SMAPE value as a float.
  """
  y_true = df[actual_col]
  y_pred = df[predicted_col]
  abs_error = np.abs(y_pred - y_true)
  scale = np.abs(y_true) + np.abs(y_pred)
  per_row_ratio = 2 * abs_error / scale
  return np.mean(per_row_ratio) * 100


# Score the Challenger's predictions against the `Year_1` column of the
# validation data.
validation_smape_value = smape(
    df=validation_data_df,
    actual_col='Year_1',
    predicted_col='prediction',
)
print(f"SMAPE value based on staging validation data: {validation_smape_value:.2f}")

In [0]:
# The model passes when its SMAPE on the validation data is no worse than the
# test SMAPE logged by its training run.
validation_metric_smape_passed = validation_smape_value <= test_smape

if validation_metric_smape_passed:
    print(f"Validation SMAPE value {validation_smape_value} is less than or equal to the test SMAPE value {test_smape}. Validation Passed")
    print(f'Model {source_full_model_name} version {model_details.version} validation_metric_smape_passed: {validation_metric_smape_passed}')
    # Record the outcome and make this version the Champion in the source catalog.
    client.set_model_version_tag(
        name=source_full_model_name,
        version=model_details.version,
        key="validation_metric_smape_passed",
        value=validation_metric_smape_passed,
    )
    client.set_registered_model_alias(name=source_full_model_name, alias="Champion", version=model_version)
else:
    print(f"Validation SMAPE value {validation_smape_value} is greater than the test SMAPE value {test_smape}. Validation Failed")
    # Record the failure and end the notebook run here.
    client.set_model_version_tag(
        name=source_full_model_name,
        version=model_details.version,
        key="validation_metric_smape_passed",
        value=validation_metric_smape_passed,
    )
    dbutils.notebook.exit("validation Completed. Model version is not fit for next stage")


## Promoting model to next catalog

In [0]:
if validation_metric_smape_passed == True:
    %pip install --quiet mlflow==2.19
    client = MlflowClient()
    if catalog != "mlops_prod":
        target_model_details = client.copy_model_version(f"models:/{source_full_model_name}/{model_version}",f"{target_catalog}.{db}.{model_name}")
        target_model_version = int(target_model_details.version)
        client.set_registered_model_alias(name=f"{target_catalog}.{db}.{model_name}", alias="Challenger", version=target_model_version)
        client.delete_model_version_tag(f"{target_catalog}.{db}.{model_name}",f"{target_model_version}" , "validation_metric_smape_passed")
        client.delete_model_version_tag(f"{target_catalog}.{db}.{model_name}",f"{target_model_version}" , "has_description")
        client.delete_model_version_tag(f"{target_catalog}.{db}.{model_name}",f"{target_model_version}" , "champion_smape_passed")
