# 1 - Use MLflow models in batch deployments

**Agenda:** Deploy an MLflow model to Azure ML for batch inference using batch endpoints.

- Azure ML supports no-code deployment of models created and logged with MLflow, which means you don't have to provide a scoring script or an environment.

**For no-code deployment, Azure Machine Learning provides:**

1. An MLflow base image/curated environment that contains the required dependencies to run an Azure Machine Learning batch job.
2. A batch job pipeline with a generated scoring script that processes data in parallel.

In [None]:
# Grant permissions in the workspace (Azure portal):
# Workspace > Access control (IAM) > Add role assignment > Job function roles >
# AzureML Data Scientist > Next > Select members > choose the user > Review + assign

# 2 - Connect to Azure Machine Learning Workspace

In [1]:
# Import the required libraries

from azure.ai.ml import MLClient, Input
from azure.ai.ml.entities import (
    BatchEndpoint,
    ModelBatchDeployment,
    ModelBatchDeploymentSettings,
    Model,
    AmlCompute,
    Data,
    BatchRetrySettings,
    CodeConfiguration,
    Environment,
)
from azure.ai.ml.constants import AssetTypes, BatchDeploymentOutputAction
from azure.identity import DefaultAzureCredential

In [2]:
# Configure workspace details and get a handle to workspace

subscription_id = "3b57d2fe-08b1-4fe9-b535-f5c4387b9a66"
resource_group = "mlflow-rg60"
workspace = "mlflow-ws60"

ml_client = MLClient(DefaultAzureCredential(), subscription_id, resource_group, workspace)
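
The workspace identifiers above are hardcoded in the notebook. A minimal sketch of reading them from environment variables instead, so the notebook can be shared without editing the cell. The helper `load_workspace_config` and the three environment variable names are assumptions chosen for illustration, not names the Azure ML SDK reads for you:

```python
import os


def load_workspace_config(env=os.environ):
    """Read workspace identifiers from environment variables (names are assumed)."""
    keys = {
        "subscription_id": "AZURE_SUBSCRIPTION_ID",
        "resource_group_name": "AZURE_RESOURCE_GROUP",
        "workspace_name": "AZURE_ML_WORKSPACE",
    }
    missing = [var for var in keys.values() if not env.get(var)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return {name: env[var] for name, var in keys.items()}


# Demonstration with an explicit mapping instead of the real environment
demo_env = {
    "AZURE_SUBSCRIPTION_ID": "<subscription-id>",
    "AZURE_RESOURCE_GROUP": "mlflow-rg60",
    "AZURE_ML_WORKSPACE": "mlflow-ws60",
}
print(load_workspace_config(demo_env))
```

The returned keys match the keyword arguments of `MLClient`, so the result could be passed as `MLClient(DefaultAzureCredential(), **load_workspace_config())`.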

# 3 - Registering the model

In [3]:
model_name = "heart-classifier-mlflow"
model_local_path = "model"

model = ml_client.models.create_or_update(
    Model(name=model_name, path=model_local_path, type=AssetTypes.MLFLOW_MODEL)
)
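
Registering with `AssetTypes.MLFLOW_MODEL` expects the local folder to be an MLflow model directory, which carries an `MLmodel` file at its root. A minimal sketch of a pre-flight check under that assumption; the helper name `is_mlflow_model_dir` is hypothetical and not part of the Azure ML SDK:

```python
from pathlib import Path


def is_mlflow_model_dir(path):
    """Return True if `path` is a folder with an MLmodel file at its root."""
    folder = Path(path)
    return folder.is_dir() and (folder / "MLmodel").is_file()


# "model" mirrors the model_local_path used in the registration cell
print("Looks like an MLflow model:", is_mlflow_model_dir("model"))
```

Running this before `create_or_update` gives an early, local signal when the path points at the wrong folder.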

In [4]:
# Let's get the model

model = ml_client.models.get(name=model_name, label="latest")

# 4 - Create Batch Endpoint

### a) Configure the endpoint

In [5]:
# Define endpoint name
endpoint_name = "heart-classifier"

In [6]:
# Creating a unique endpoint name by including a random suffix


# Import libraries
import random
import string

# Create allowed characters
allowed_chars = string.ascii_lowercase + string.digits

# Generate random suffix
endpoint_suffix = "".join(random.choice(allowed_chars) for x in range(5))

# Combine endpoint name and suffix
endpoint_name = f"{endpoint_name}-{endpoint_suffix}"

# Print the endpoint name
print(f"Endpoint name: {endpoint_name}")

Endpoint name: heart-classifier-7kl74
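
Because the cell above reassigns `endpoint_name`, running it twice appends a second suffix. A sketch of the same suffix logic wrapped in a helper that always starts from the base name; `make_endpoint_name` and its parameters are hypothetical names introduced here:

```python
import random
import string

ALLOWED_CHARS = string.ascii_lowercase + string.digits


def make_endpoint_name(base_name, suffix_length=5, rng=random):
    """Return base_name plus a random lowercase-alphanumeric suffix."""
    suffix = "".join(rng.choice(ALLOWED_CHARS) for _ in range(suffix_length))
    return f"{base_name}-{suffix}"


name = make_endpoint_name("heart-classifier")
print(f"Endpoint name: {name}")
```

Passing a seeded `random.Random` instance as `rng` makes the generated name reproducible, which can be convenient in tests.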


In [7]:
# configure the endpoint

endpoint = BatchEndpoint(
    name=endpoint_name,
    description="A heart condition classifier for batch inference",
)

### b) Create the endpoint

In [8]:
ml_client.batch_endpoints.begin_create_or_update(endpoint).result()

<azure.ai.ml._restclient.v2022_05_01.models._models_py3.BatchEndpointData at 0x7f84f10e3c10>

# 5 - Create a batch deployment

### a) Creating a scoring script to work with the model

In [None]:
# MLflow models don't require a scoring script

### b) Creating the compute

In [9]:
# Define compute name
compute_name = "batch-cluster"

# Create the compute cluster only if one with this name doesn't already exist
if not any(filter(lambda m: m.name == compute_name, ml_client.compute.list())):
    compute_cluster = AmlCompute(
        name=compute_name, description="amlcompute", min_instances=0, max_instances=5
    )
    ml_client.begin_create_or_update(compute_cluster).result()

### c) Creating the environment

In [None]:
# MLflow models don't require an environment

### d) Configuring the deployment

In [10]:
deployment = ModelBatchDeployment(
    name="classifier-xgboost",
    description="A heart condition classifier based on XGBoost",
    endpoint_name=endpoint.name,
    model=model,
    compute=compute_name,
    settings=ModelBatchDeploymentSettings(
        instance_count=2,
        max_concurrency_per_instance=2,
        mini_batch_size=10,
        output_action=BatchDeploymentOutputAction.APPEND_ROW,
        output_file_name="predictions.csv",
        retry_settings=BatchRetrySettings(max_retries=3, timeout=300),
        logging_level="info",
    ),
)

Class ModelBatchDeploymentSettings: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class ModelBatchDeployment: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
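
The settings above determine how the work is split: for file-based inputs, `mini_batch_size` is the number of files passed to each scoring call, and up to `instance_count * max_concurrency_per_instance` calls can run at the same time. A back-of-the-envelope sketch of that arithmetic, assuming every mini-batch takes about the same time and ignoring scheduling overhead; `plan_batches` and the 100-file input are hypothetical:

```python
import math


def plan_batches(n_files, mini_batch_size, instance_count, max_concurrency_per_instance):
    """Estimate mini-batch count, parallel workers, and processing waves."""
    n_batches = math.ceil(n_files / mini_batch_size)
    workers = instance_count * max_concurrency_per_instance
    waves = math.ceil(n_batches / workers)
    return {"mini_batches": n_batches, "workers": workers, "waves": waves}


# Values from the deployment above, applied to a hypothetical 100-file input
plan = plan_batches(100, mini_batch_size=10, instance_count=2, max_concurrency_per_instance=2)
print(plan)  # {'mini_batches': 10, 'workers': 4, 'waves': 3}
```

With these numbers, four workers process ten mini-batches in roughly three rounds, so doubling `instance_count` would cut that to two.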


### e) Create the deployment

In [11]:
ml_client.batch_deployments.begin_create_or_update(deployment).result()

BatchDeployment({'provisioning_state': 'Succeeded', 'endpoint_name': 'heart-classifier-7kl74', 'type': None, 'name': 'classifier-xgboost', 'description': 'A heart condition classifier based on XGBoost', 'tags': {}, 'properties': {}, 'print_as_yaml': True, 'id': '/subscriptions/3b57d2fe-08b1-4fe9-b535-f5c4387b9a66/resourceGroups/mlflow-rg60/providers/Microsoft.MachineLearningServices/workspaces/mlflow-ws60/batchEndpoints/heart-classifier-7kl74/deployments/classifier-xgboost', 'Resource__source_path': None, 'base_path': '/mnt/batch/tasks/shared/LS_root/mounts/clusters/compute-instance01/code/Users/vijaygadhave199', 'creation_context': <azure.ai.ml.entities._system_data.SystemData object at 0x7f84f1131810>, 'serialize': <msrest.serialization.Serializer object at 0x7f84f1131ba0>, 'model': '/subscriptions/3b57d2fe-08b1-4fe9-b535-f5c4387b9a66/resourceGroups/mlflow-rg60/providers/Microsoft.MachineLearningServices/workspaces/mlflow-ws60/models/heart-classifier-mlflow/versions/1', 'code_configu

In [12]:
# configure this new deployment as the default one

# Retrieve Batch Endpoint
endpoint = ml_client.batch_endpoints.get(endpoint.name)
# Set Default Deployment
endpoint.defaults.deployment_name = deployment.name
# Update Batch Endpoint
ml_client.batch_endpoints.begin_create_or_update(endpoint).result()

<azure.ai.ml._restclient.v2022_05_01.models._models_py3.BatchEndpointData at 0x7f84f10e3520>

In [13]:
print(f"The default deployment is {endpoint.defaults.deployment_name}")

The default deployment is classifier-xgboost


# 6 - Testing the deployment

### a) Creating a data asset

In [14]:
data_path = "data"
dataset_name = "heart-dataset-unlabeled"

# creates a data asset object
heart_dataset_unlabeled = Data(
    path=data_path,
    type=AssetTypes.URI_FOLDER,
    description="An unlabeled dataset for heart classification",
    name=dataset_name,
)

In [15]:
# creates or updates a data asset in Azure ML

ml_client.data.create_or_update(heart_dataset_unlabeled)

Uploading data (0.01 MBs): 100%|██████████| 13148/13148 [00:00<00:00, 82519.75it/s]



Data({'skip_validation': False, 'mltable_schema_url': None, 'referenced_uris': None, 'type': 'uri_folder', 'is_anonymous': False, 'auto_increment_version': False, 'auto_delete_setting': None, 'name': 'heart-dataset-unlabeled', 'description': 'An unlabeled dataset for heart classification', 'tags': {}, 'properties': {}, 'print_as_yaml': True, 'id': '/subscriptions/3b57d2fe-08b1-4fe9-b535-f5c4387b9a66/resourceGroups/mlflow-rg60/providers/Microsoft.MachineLearningServices/workspaces/mlflow-ws60/data/heart-dataset-unlabeled/versions/1', 'Resource__source_path': None, 'base_path': '/mnt/batch/tasks/shared/LS_root/mounts/clusters/compute-instance01/code/Users/vijaygadhave199', 'creation_context': <azure.ai.ml.entities._system_data.SystemData object at 0x7f84f10e1f90>, 'serialize': <msrest.serialization.Serializer object at 0x7f84f10e39d0>, 'version': '1', 'latest_version': None, 'path': 'azureml://subscriptions/3b57d2fe-08b1-4fe9-b535-f5c4387b9a66/resourcegroups/mlflow-rg60/workspaces/mlflow

In [16]:
# get a reference of the new data asset

heart_dataset_unlabeled = ml_client.data.get(name=dataset_name, label="latest")

### b) Creating an input for the deployment

In [17]:
input = Input(type=AssetTypes.URI_FOLDER, path=heart_dataset_unlabeled.id)

In [18]:
input

{'type': 'uri_folder', 'path': '/subscriptions/3b57d2fe-08b1-4fe9-b535-f5c4387b9a66/resourceGroups/mlflow-rg60/providers/Microsoft.MachineLearningServices/workspaces/mlflow-ws60/data/heart-dataset-unlabeled/versions/1'}

### c) Invoke the deployment

In [19]:
# Invoke the endpoint; the request is routed to its default deployment
job = ml_client.batch_endpoints.invoke(endpoint_name=endpoint.name, input=input)

In [None]:
# Alternatively, target a specific deployment instead of the endpoint's default
job = ml_client.batch_endpoints.invoke(
    deployment_name=deployment.name, endpoint_name=endpoint.name, input=input
)

### d) Get the details of invoked job

In [21]:
# Get the details of the invoked job
ml_client.jobs.get(job.name)

Experiment,Name,Type,Status,Details Page
heart-classifier-7kl74,batchjob-840c71ee-d3d3-4c3c-8828-8577f5adf2db,pipeline,Completed,Link to Azure Machine Learning studio


In [22]:
# Stream the logs and wait for the job to finish
ml_client.jobs.stream(job.name)

RunId: batchjob-840c71ee-d3d3-4c3c-8828-8577f5adf2db
Web View: https://ml.azure.com/runs/batchjob-840c71ee-d3d3-4c3c-8828-8577f5adf2db?wsid=/subscriptions/3b57d2fe-08b1-4fe9-b535-f5c4387b9a66/resourcegroups/mlflow-rg60/workspaces/mlflow-ws60

Execution Summary
RunId: batchjob-840c71ee-d3d3-4c3c-8828-8577f5adf2db
Web View: https://ml.azure.com/runs/batchjob-840c71ee-d3d3-4c3c-8828-8577f5adf2db?wsid=/subscriptions/3b57d2fe-08b1-4fe9-b535-f5c4387b9a66/resourcegroups/mlflow-rg60/workspaces/mlflow-ws60
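
`ml_client.jobs.stream` blocks until the job ends. When a check with a timeout is preferable, the status can be polled instead. A sketch of such a loop, written against a plain callable so it runs without a workspace; in this notebook the callable could be `lambda: ml_client.jobs.get(job.name).status`. The helper name, the polling interval, and the set of terminal status strings are assumptions to verify against the SDK version in use:

```python
import time

# Assumed terminal job statuses; confirm against the SDK version in use
TERMINAL_STATUSES = {"Completed", "Failed", "Canceled"}


def wait_for_job(get_status, poll_seconds=30, timeout_seconds=3600, sleep=time.sleep):
    """Poll get_status() until it returns a terminal status or the timeout passes."""
    waited = 0
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if waited >= timeout_seconds:
            raise TimeoutError(f"Job still '{status}' after {waited} seconds")
        sleep(poll_seconds)
        waited += poll_seconds


# Stand-in for a real job: reports Running twice, then Completed
fake_statuses = iter(["Running", "Running", "Completed"])
print(wait_for_job(lambda: next(fake_statuses), sleep=lambda seconds: None))  # Completed
```

The `sleep` parameter is injectable only so the demonstration finishes instantly; real use would leave the default.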



### e) Exploring the results

In [23]:
scoring_job = list(ml_client.jobs.list(parent_job_name=job.name))[0]

In [24]:
print("Job name:", scoring_job.name)
print("Job status:", scoring_job.status)
print(
    "Job duration:",
    scoring_job.creation_context.last_modified_at
    - scoring_job.creation_context.created_at,
)

Job name: 675d9574-3900-495d-a009-1eaf2dd8cb4a
Job status: Completed
Job duration: 0:12:22.792539


### f) Download the results

In [25]:
ml_client.jobs.download(name=scoring_job.name, download_path=".", output_name="score")

Downloading artifact azureml://datastores/workspaceblobstore/paths/azureml/675d9574-3900-495d-a009-1eaf2dd8cb4a/score/ to named-outputs/score


In [26]:
# We can read this CSV file using the pandas library

import pandas as pd

score = pd.read_csv(
    "named-outputs/score/predictions.csv", names=["row", "prediction", "file"]
)

In [27]:
score.head()

Unnamed: 0,row,prediction,file
0,['heart-unlabeled-0.csv','0'],
1,['heart-unlabeled-0.csv','1'],
2,['heart-unlabeled-0.csv','0'],
3,['heart-unlabeled-0.csv','0'],
4,['heart-unlabeled-0.csv','0'],


# 7 - Customize deployment with a scoring script

### a) Create scoring script

In [28]:
%%writefile code/batch_driver.py

import os
import glob
import mlflow
import pandas as pd


def init():
    global model
    global model_input_types
    global model_output_names

    # Default to None so run() can test these even when the model has no signature
    model_input_types = None
    model_output_names = None

    # AZUREML_MODEL_DIR is an environment variable created during deployment.
    # It is the path to the model folder; the glob picks the first subfolder,
    # so provide your model's folder name explicitly if there is more than one.
    model_path = glob.glob(os.environ["AZUREML_MODEL_DIR"] + "/*/")[0]

    # Load the model, its input types and output names
    model = mlflow.pyfunc.load_model(model_path)
    signature = model.metadata.signature
    if signature and signature.inputs:
        model_input_types = dict(
            zip(
                signature.inputs.input_names(),
                signature.inputs.pandas_types(),
            )
        )
    if signature and signature.outputs:
        if signature.outputs.has_input_names():
            model_output_names = signature.outputs.input_names()
        elif len(signature.outputs.input_names()) == 1:
            model_output_names = ["prediction"]


def run(mini_batch):
    print(f"run method start: {__file__}, run({len(mini_batch)} files)")

    # Read every file in the mini-batch and record which file each row came from
    data = pd.concat(
        map(
            lambda fp: pd.read_csv(fp).assign(filename=os.path.basename(fp)), mini_batch
        ),
        ignore_index=True,
    )
    if model_input_types:
        data = data.astype(model_input_types)

    # Predict on the model features only; "filename" is not a model input
    pred = model.predict(data.drop(columns="filename"))

    # Wrap array-like predictions in a DataFrame with named columns
    if not isinstance(pred, pd.DataFrame):
        pred = pd.DataFrame(pred)
        pred.columns = model_output_names or [
            "pred_col" + str(i) for i in range(pred.shape[1])
        ]

    return pd.concat([data, pred], axis=1)


Writing code/batch_driver.py


### b) Indicate the environment

In [29]:
environment = Environment(
    name="batch-mlflow-xgboost",
    conda_file="environment/conda.yaml",
    image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest",
)

### c) Configure the deployment

In [30]:
deployment = ModelBatchDeployment(
    name="classifier-xgboost-custom",
    description="A heart condition classifier based on XGBoost with a custom scoring script",
    endpoint_name=endpoint.name,
    model=model,
    environment=environment,
    code_configuration=CodeConfiguration(code="code", scoring_script="batch_driver.py"),
    compute=compute_name,
    settings=ModelBatchDeploymentSettings(
        instance_count=2,
        max_concurrency_per_instance=2,
        mini_batch_size=10,
        output_action=BatchDeploymentOutputAction.APPEND_ROW,
        output_file_name="predictions.csv",
        retry_settings=BatchRetrySettings(max_retries=3, timeout=300),
        logging_level="info",
    ),
)

### d) Create the deployment

In [31]:
ml_client.batch_deployments.begin_create_or_update(deployment).result()

Uploading code (0.0 MBs): 100%|██████████| 1959/1959 [00:00<00:00, 22123.37it/s]



BatchDeployment({'provisioning_state': 'Succeeded', 'endpoint_name': 'heart-classifier-7kl74', 'type': None, 'name': 'classifier-xgboost-custom', 'description': 'A heart condition classifier based on XGBoost with a custom scoring script', 'tags': {}, 'properties': {}, 'print_as_yaml': True, 'id': '/subscriptions/3b57d2fe-08b1-4fe9-b535-f5c4387b9a66/resourceGroups/mlflow-rg60/providers/Microsoft.MachineLearningServices/workspaces/mlflow-ws60/batchEndpoints/heart-classifier-7kl74/deployments/classifier-xgboost-custom', 'Resource__source_path': None, 'base_path': '/mnt/batch/tasks/shared/LS_root/mounts/clusters/compute-instance01/code/Users/vijaygadhave199', 'creation_context': <azure.ai.ml.entities._system_data.SystemData object at 0x7f84aed230a0>, 'serialize': <msrest.serialization.Serializer object at 0x7f84aed20040>, 'model': '/subscriptions/3b57d2fe-08b1-4fe9-b535-f5c4387b9a66/resourceGroups/mlflow-rg60/providers/Microsoft.MachineLearningServices/workspaces/mlflow-ws60/models/heart-c

### e) Clean up resources

In [32]:
ml_client.batch_endpoints.begin_delete(endpoint_name).result()

...........