## Azure Databricks / Azure Machine Learning Sample - Deployment Update

Sample notebook showcasing how to promote a staged model to a full-production slot (100% traffic allocation) and remove the previous deployment.

#### Import required packages

In [0]:
import numpy as np
import os
from pyspark.sql import SparkSession  
import mlflow
import mlflow.sklearn
from mlflow.deployments import get_deploy_client
import json
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

#### Parse arguments

In [0]:
# Notebook parameters are supplied through Databricks widgets.

# Coordinates of the target Azure ML workspace.
subscription_id, resource_group, workspace = [
    dbutils.widgets.get(widget_name)
    for widget_name in ('subscription_id', 'resource_group', 'workspace')
]

# Model and online-endpoint identifiers.
model_name, endpoint_name, endpoint_description = [
    dbutils.widgets.get(widget_name)
    for widget_name in ('model_name', 'endpoint_name', 'endpoint_description')
]

#### Helper functions

In [0]:
def get_aml_client(subscription_id, resource_group, workspace):
    """
    Build a client for an Azure Machine Learning (AML) workspace, authenticating as a service principal.

    The service principal's tenant ID, client ID and client secret are read from the
    "amlsecretscope" Databricks secret scope. As a side effect, the same three values are
    exported to the AZURE_TENANT_ID, AZURE_CLIENT_ID and AZURE_CLIENT_SECRET environment
    variables of the current process.

    Args:
        subscription_id (str): The Azure subscription ID.
        resource_group (str): The Azure resource group name.
        workspace (str): The Azure Machine Learning workspace name.

    Returns:
        ml_client (azure.ai.ml.MLClient): Client object targeting the specified AML workspace.
    """
    import os
    from azure.identity import ClientSecretCredential, DefaultAzureCredential

    def read_secret(secret_key):
        # All service-principal secrets live in the same Databricks secret scope.
        return dbutils.secrets.get(scope="amlsecretscope", key=secret_key)

    sp_tenant = read_secret("tenantid")
    sp_client = read_secret("clientid")
    sp_secret = read_secret("clientsecret")

    # NOTE(review): presumably exported so that environment-based credential lookups
    # elsewhere in the process (e.g. the MLflow Azure ML integration) can authenticate
    # with the same service principal -- confirm before removing.
    os.environ.update(
        AZURE_TENANT_ID=sp_tenant,
        AZURE_CLIENT_ID=sp_client,
        AZURE_CLIENT_SECRET=sp_secret,
    )

    aml_client = MLClient(
        ClientSecretCredential(sp_tenant, sp_client, sp_secret),
        subscription_id,
        resource_group,
        workspace,
    )
    print("Establishing connection to Azure ML workspace")
    return aml_client
    

def get_aml_mlflow_tracking_uri(ml_client):
    """
    Retrieve the MLflow tracking URI of the Azure Machine Learning workspace targeted by ``ml_client``.

    Args:
        ml_client (azure.ai.ml.MLClient): Client that references the target Azure Machine Learning workspace.

    Returns:
        tracking_uri (str): The MLflow tracking URI associated with the Azure Machine Learning workspace.
    """
    # Look the workspace up by the name the client itself is bound to, so the
    # function depends only on its argument and not on a notebook-level variable.
    ws = ml_client.workspaces.get(ml_client.workspace_name)
    return ws.mlflow_tracking_uri

def split_train_test_data(pandas_df, training_percent=0.8):
    """
    Partition a Pandas DataFrame into training and testing subsets with scikit-learn's train_test_split.

    Args:
        pandas_df (pd.DataFrame): The input Pandas DataFrame to be partitioned.
        training_percent (float, optional): Fraction of the data to use for training. Defaults to 0.8.

    Returns:
        train_df (pd.DataFrame): The training subset.
        test_df (pd.DataFrame): The testing subset.
    """
    from sklearn.model_selection import train_test_split

    # train_test_split is told the held-out share, i.e. the complement of the training share.
    holdout_fraction = 1.0 - training_percent
    train_part, test_part = train_test_split(pandas_df, test_size=holdout_fraction)
    print('Splitting data into train/test subsets')
    return train_part, test_part


def get_deployments(endpoint_name, ml_client):
    """
    Return the live traffic allocation of an online endpoint.

    The endpoint is fetched through ``ml_client.online_endpoints`` and its ``traffic``
    attribute is returned unchanged.

    Args:
    endpoint_name (str): The name of the online endpoint to inspect.
    ml_client (object): The ML client used to query the endpoint.

    Returns:
    dict: The endpoint's traffic allocation; the callers in this notebook treat it as a
    mapping of deployment name to traffic percentage.
    """
    return ml_client.online_endpoints.get(endpoint_name).traffic

def get_staged_deployment(endpoint_name, ml_client):
    """
    Get the staged deployment of a given endpoint.

    A deployment counts as staged when it receives a non-zero share of the endpoint's
    mirror traffic. Among those, the one with the highest mirror-traffic percentage is
    returned (the first one encountered wins a tie).

    Parameters:
    endpoint_name (str): The name of the endpoint to retrieve the staged deployment from.
    ml_client (object): The Machine Learning client instance to interact with the API.

    Returns:
    str or None: The name of the staged deployment with the highest mirror-traffic
                 percentage, or None if the endpoint has no mirror traffic configured
                 or every configured share is zero.
    """
    endpoint = ml_client.online_endpoints.get(endpoint_name)
    # Treat a missing mapping like an empty one, and ignore entries whose mirror
    # share is zero: a deployment that mirrors nothing is not staged, and selecting
    # it would let a re-run "promote" a deployment that was already promoted.
    mirrored = {
        name: percent
        for name, percent in (endpoint.mirror_traffic or {}).items()
        if percent
    }
    if not mirrored:
        return None
    return max(mirrored, key=mirrored.get)

def update_traffic(deployment_name, endpoint_name, traffic_percent):
    """
    Set the live traffic share of one deployment on an online endpoint.

    An MLflow deployment client is created from the current MLflow tracking URI. The
    traffic configuration ``{"traffic": {deployment_name: traffic_percent}}`` is written
    as JSON to ``traffic_config.json`` in the current working directory, and that file
    is handed to the client's ``update_endpoint`` call. The file is left in place afterwards.

    Args:
    deployment_name (str): The deployment whose traffic share is being set.
    endpoint_name (str): The online endpoint that hosts the deployment.
    traffic_percent (int): The traffic percentage to allocate to the deployment.

    Returns:
    None
    """
    client = get_deploy_client(mlflow.get_tracking_uri())
    config_path = "traffic_config.json"
    settings = {"traffic": {deployment_name: traffic_percent}}

    # The deployment client takes its endpoint configuration from a file on disk.
    with open(config_path, "w") as config_file:
        serialized = json.dumps(settings)
        config_file.write(serialized)

    print(f"Updating traffic to {endpoint_name}")
    print(serialized)
    client.update_endpoint(
        endpoint=endpoint_name,
        config={"endpoint-config-file": config_path},
    )

def update_mirror_traffic(deployment_name, endpoint_name, ml_client, traffic_percent):
    """
    Set the mirror-traffic share of one deployment on an online endpoint.

    The endpoint is fetched by name, its ``mirror_traffic`` mapping is replaced with a
    single entry for ``deployment_name``, and the change is submitted through
    ``ml_client.begin_create_or_update``. The call blocks until that operation finishes.

    Args:
    deployment_name (str): The deployment whose mirror-traffic share is being set.
    endpoint_name (str): The online endpoint that hosts the deployment.
    ml_client (MLClient): Client used to read and update the endpoint.
    traffic_percent (float): The mirror-traffic percentage to assign to the deployment.

    Returns:
    The result of the completed create-or-update operation.
    """
    mirror_allocation = {deployment_name: traffic_percent}
    print(f"Updating mirror traffic at {endpoint_name}")
    print(json.dumps(mirror_allocation))

    target = ml_client.online_endpoints.get(endpoint_name)
    # Replaces the whole mapping: any other deployment's mirror share is dropped.
    target.mirror_traffic = mirror_allocation
    poller = ml_client.begin_create_or_update(target)
    return poller.result()
    
    
def get_current_deployment_name(deployments, model_name):
    """
    Pick the currently active deployment out of a traffic allocation.

    The active deployment is the one holding the largest traffic percentage; when
    several share the maximum, the first one in the mapping is returned.

    Args:
    deployments (dict): Deployment names mapped to their traffic percentages.
    model_name (str): Name of the model; accepted for interface symmetry but not used in the lookup.

    Returns:
    str: The name of the deployment with the most traffic, or None if the mapping is empty.
    """
    if not deployments.keys():
        return None
    return max(deployments, key=deployments.get)
 
def get_new_deployment_name(deployments, model_name):
    """
    Derive the name for the next deployment of a model using blue/green alternation.

    With no existing deployments the name gets the 'BLUE' prefix. Otherwise the deployment
    holding the most traffic is inspected: if its name contains 'blue' (case-insensitive,
    anywhere in the name) the new name gets the 'GREEN' prefix, else 'BLUE'. The final
    name, including the model name, is lower-cased.

    Args:
    deployments (dict): Deployment names mapped to their traffic percentages.
    model_name (str): The name of the model the new deployment is for.

    Returns:
    str: The lower-cased deployment name, '<colour>-<model_name>'.
    """
    colour = 'BLUE'
    if deployments.keys():
        busiest = max(deployments, key=deployments.get)
        # Alternate away from whichever colour is currently serving the most traffic.
        if 'blue' in busiest.lower():
            colour = 'GREEN'
    return f'{colour}-{model_name}'.lower()

def remove_deployment(deployment_name, endpoint_name, ml_client):
    """
    Start deleting a deployment from an online endpoint.

    The delete is kicked off through ``ml_client.online_deployments.begin_delete``; the
    object that call returns is discarded, so this function does not wait for the
    deletion to finish.

    Args:
    deployment_name (str): The name of the deployment to delete.
    endpoint_name (str): The name of the online endpoint that hosts the deployment.
    ml_client (MLClient): Client used to issue the delete request.

    Returns:
    None
    """
    notice = f"Removing deployment {deployment_name} from endpoint {endpoint_name}"
    print(notice)
    # Fire-and-forget: the operation handle is intentionally not awaited.
    ml_client.online_deployments.begin_delete(deployment_name, endpoint_name)
    

#### Promote current staged deployment

In [0]:
# Establish connection to target Azure ML workspace
ml_client = get_aml_client(subscription_id, resource_group, workspace)

# Get the mlflow tracking URI associated with the AML workspace
mlflow_tracking_uri = get_aml_mlflow_tracking_uri(ml_client)

# Update MLflow tracking URI (update_traffic builds its deployment client from it)
mlflow.set_tracking_uri(mlflow_tracking_uri)

# Get current deployments (live traffic allocation of the endpoint)
deployments = get_deployments(endpoint_name, ml_client)

# Get name of staged deployment (has mirror traffic)
staged_deployment_name = get_staged_deployment(endpoint_name, ml_client)

# Get name of active deployment (production), captured BEFORE traffic is re-routed
active_deployment_name = get_current_deployment_name(deployments, model_name)

# Update the endpoint
if staged_deployment_name is not None:

    # Remove all mirror traffic
    update_mirror_traffic(staged_deployment_name, endpoint_name, ml_client, 0)

    # Route all traffic to staged endpoint
    update_traffic(staged_deployment_name, endpoint_name, 100)

    # Remove previous deployment. Skip this when the endpoint had no live deployment,
    # or when the live deployment is the one that was just promoted: deleting in
    # that case would remove the deployment now serving 100% of the traffic.
    if active_deployment_name is not None and active_deployment_name != staged_deployment_name:
        remove_deployment(active_deployment_name, endpoint_name, ml_client)
    else:
        print(f"No previous deployment to remove from endpoint {endpoint_name}")
else:
    print(f"No staged deployment found on endpoint {endpoint_name}; nothing to promote")