### Production ML pipeline in Azure AI - Machine Learning Studio for Credit Card Defaults model with Python SDK v2 & MLFlow


## Authentication to Azure

In [1]:
# Handle to the workspace
from azure.ai.ml import MLClient

# Authentication package
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential

try:
    # First choice: the default (non-interactive) credential
    credential = DefaultAzureCredential()

    # Request an Azure Resource Manager token right away so that an unusable
    # credential is detected here instead of on the first workspace call
    credential.get_token("https://management.azure.com/.default")

except Exception as ex:
    # Report why the default credential was rejected instead of hiding it,
    # then fall back to an interactive browser sign-in
    print(
        f"DefaultAzureCredential failed ({type(ex).__name__}: {ex}); "
        "falling back to InteractiveBrowserCredential."
    )
    credential = InteractiveBrowserCredential()


## Access the Azure workspace

In [2]:
# Connect to the workspace, identified by subscription, resource group and workspace name
ml_client = MLClient(
    subscription_id="...",
    resource_group_name="rg-mlops",
    workspace_name="azureml-WS05",
    credential=credential,
)

## Get the data

In [3]:
from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes

# URL of the credit card defaults CSV file
web_path = "https://azuremlexamples.blob.core.windows.net/datasets/credit_card/default%20of%20credit%20card%20clients.csv"

# Describe the data asset: a single file (URI_FILE) referenced by URL, with an explicit version
credit_data = Data(
    path=web_path,
    type=AssetTypes.URI_FILE,
    name="creditcard_blob_csv_defaults",
    version="1.0.0",
    description="Dataset for credit card defaults",
    tags=dict(source_type="web", source="azure_ml_creditcard_default_data"),
)

## Create a data asset

In [4]:
# Register the data asset in the workspace and keep the object that comes back
credit_data = ml_client.data.create_or_update(credit_data)

# Report the name and version of the returned asset
registration_message = f"Dataset with name {credit_data.name} was registered to workspace, the dataset version is {credit_data.version}"
print(registration_message)


Dataset with name creditcard_blob_csv_defaults was registered to workspace, the dataset version is 1.0.0


## Create an environment

In [5]:
import os

# Local folder that will hold the conda specification for the custom environment
dependencies_dir = "./dependencies"

# exist_ok=True keeps this cell safe to re-run when the folder is already present
os.makedirs(name=dependencies_dir, exist_ok=True)


## Create the file in the dependencies directory.

In [6]:
%%writefile {dependencies_dir}/conda.yaml
# Conda specification for the custom environment registered later in this notebook
# (the Environment definition points at this file through conda_file).
name: model-env
channels:
  - conda-forge
dependencies:
  # Conda packages, each pinned to a version (pandas is limited to the 1.1.x series)
  - python=3.8
  - numpy=1.21.2
  - pip=21.2.4
  - scikit-learn=0.24.2
  - scipy=1.7.1
  - pandas>=1.1,<1.2
  # Packages listed under "pip" are installed with pip inside the conda environment
  - pip:
    - inference-schema[numpy-support]==1.3.0
    - xlrd==2.0.1
    - azureml-mlflow==1.42.0

Overwriting ./dependencies/conda.yaml


## Use the yaml file to create and register this custom environment in the workspace

In [7]:
# Import the 'Environment' class from the Azure Machine Learning SDK
from azure.ai.ml.entities import Environment

# Name under which the environment is registered in the workspace
custom_env_name = "creditcard-default-scikit-learn-env"

# Build the environment from a base image plus the conda file in the dependencies folder
pipeline_job_env = Environment(
    image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest",
    conda_file=os.path.join(dependencies_dir, "conda.yaml"),
    name=custom_env_name,
    version="0.1.1",
    description="Custom environment for Credit Card Defaults pipeline",
    tags={"scikit-learn": "0.24.2"},
)

# Register it and keep the returned object; its name and version are used by the data prep component
pipeline_job_env = ml_client.environments.create_or_update(pipeline_job_env)

# Report the registered name and version
environment_message = f"Environment with name {pipeline_job_env.name} is registered to workspace, the environment version is {pipeline_job_env.version}"
print(environment_message)


Environment with name creditcard-default-scikit-learn-env is registered to workspace, the environment version is 0.1.1


## The training pipeline



## Create component 1: data prep (using programmatic definition)

This component handles the preprocessing of the data. The preprocessing task is performed in the *data_prep.py* python file.



In [8]:
import os

# Folder that holds the source code of the data preparation component
data_prep_src_dir = "./components/data_prep"

# Creates intermediate folders as needed; exist_ok=True makes re-running harmless
os.makedirs(name=data_prep_src_dir, exist_ok=True)


This script performs the simple task of splitting the data into train and test datasets. 

[MLFlow](https://mlflow.org/docs/latest/tracking.html) will be used to log the parameters and metrics during our pipeline run.

In [9]:
%%writefile {data_prep_src_dir}/data_prep.py

# Import necessary libraries and modules
import os
import argparse
import pandas as pd
from sklearn.model_selection import train_test_split
import logging
import mlflow

# Define the main function for the script
def main():
    """Split the input CSV into train and test CSV files and log dataset metrics.

    Command-line arguments:
        --data: path to the input CSV file.
        --test_train_ratio: fraction of rows placed in the test split (default 0.25).
        --random_state: seed for the shuffle performed by the split (default 42),
            so repeated runs on the same data produce the same split.
        --train_data: folder that receives the training split as ``data.csv``.
        --test_data: folder that receives the test split as ``data.csv``.

    The number of rows and the number of columns minus one are logged to MLflow
    as ``num_samples`` and ``num_features``.
    """

    # Define command-line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--data", type=str, help="path to input data")
    parser.add_argument("--test_train_ratio", type=float, required=False, default=0.25)
    parser.add_argument(
        "--random_state",
        type=int,
        required=False,
        default=42,
        help="seed for the train/test shuffle, so the split is reproducible",
    )
    parser.add_argument("--train_data", type=str, help="path to train data")
    parser.add_argument("--test_data", type=str, help="path to test data")
    args = parser.parse_args()

    # Using the run as a context manager ends it even when a step below raises
    with mlflow.start_run():
        # Print the arguments passed to the script
        print(" ".join(f"{k}={v}" for k, v in vars(args).items()))

        # header=1: column names come from the second line of the file;
        # index_col=0: the first column becomes the index
        print("input data:", args.data)
        credit_df = pd.read_csv(args.data, header=1, index_col=0)

        # Log metrics about the loaded dataset
        # (one column is subtracted from the feature count - presumably the label column; confirm)
        mlflow.log_metric("num_samples", credit_df.shape[0])
        mlflow.log_metric("num_features", credit_df.shape[1] - 1)

        # Split the dataset into training and testing sets with a fixed seed
        credit_train_df, credit_test_df = train_test_split(
            credit_df,
            test_size=args.test_train_ratio,
            random_state=args.random_state,
        )

        # Make sure both output folders exist (useful when the script is run outside a mounted job output)
        for output_dir in (args.train_data, args.test_data):
            os.makedirs(output_dir, exist_ok=True)

        # index=False: the index column is not written to the output files
        credit_train_df.to_csv(os.path.join(args.train_data, "data.csv"), index=False)
        credit_test_df.to_csv(os.path.join(args.test_data, "data.csv"), index=False)

# Check if the script is being run directly
if __name__ == "__main__":
    main()


Overwriting ./components/data_prep/data_prep.py


In [10]:
# Import necessary Azure Machine Learning libraries
from azure.ai.ml import command
from azure.ai.ml import Input, Output

# Define the data preparation step as a command component
data_prep_component = command(
    name="data_prep_credit_defaults",
    display_name="Data preparation for training",
    # data_prep.py loads its input with pandas.read_csv, so the input is described as CSV
    description="Reads a .csv input, splits the input to train and test",
    inputs={
        # NOTE(review): the pipeline later passes a uri_file Input to this parameter - confirm the intended type
        "data": Input(type="uri_folder"),
        # Fraction of rows that goes to the test split
        "test_train_ratio": Input(type="number"),
    },
    outputs=dict(
        # Folders that receive the train and test CSV files written by the script
        train_data=Output(type="uri_folder", mode="rw_mount"),
        test_data=Output(type="uri_folder", mode="rw_mount"),
    ),
    # Folder whose content is used as the component's code
    code=data_prep_src_dir,
    # Command line; the ${{...}} placeholders refer to the inputs and outputs declared above
    command="""python data_prep.py \
            --data ${{inputs.data}} --test_train_ratio ${{inputs.test_train_ratio}} \
            --train_data ${{outputs.train_data}} --test_data ${{outputs.test_data}} \
            """,
    # Run in the custom environment registered earlier, referenced as name:version
    environment=f"{pipeline_job_env.name}:{pipeline_job_env.version}",
)


In [11]:
# Register the component held by the command object in the workspace
data_prep_component = ml_client.create_or_update(data_prep_component.component)

# Report the registered name and version
component_message = f"Component {data_prep_component.name} with Version {data_prep_component.version} is registered"
print(component_message)

Component data_prep_credit_defaults with Version 2023-10-21-14-41-24-1346269 is registered


## Create component 2: training (using yaml definition)

In [12]:
import os

# Folder that holds the training script and the component's yaml definition
train_src_dir = "./components/train"

# Creates intermediate folders as needed; exist_ok=True makes re-running harmless
os.makedirs(name=train_src_dir, exist_ok=True)


Create the training script in the directory:

In [13]:
%%writefile {train_src_dir}/train.py
import argparse
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import classification_report
import os
import pandas as pd
import mlflow


def select_first_file(path):
    """Selects first file in folder, use under assumption there is only one file in folder
    Args:
        path (str): path to directory or file to choose
    Returns:
        str: full path of selected file; `path` itself when it already points to a file
    Raises:
        FileNotFoundError: if `path` is a directory without any entries
    """
    # A path that already points at a file needs no selection
    if os.path.isfile(path):
        return path

    # os.listdir returns entries in arbitrary order; sort so the choice is deterministic
    entries = sorted(os.listdir(path))
    if not entries:
        raise FileNotFoundError(f"No files found in directory: {path}")
    return os.path.join(path, entries[0])


# Start Logging
# NOTE: this runs at module level, i.e. as soon as the script is loaded and before
# main() parses its arguments; the matching mlflow.end_run() is the last call in main().
mlflow.start_run()

# enable autologging
mlflow.sklearn.autolog()

# Create a local ./outputs folder if it is missing
# (nothing else in this script refers to this folder explicitly)
os.makedirs("./outputs", exist_ok=True)


def main():
    """Train a gradient boosting classifier on the prepared data, then log and save it.

    Parses the command-line arguments, loads the train and test CSV files from
    their folders, fits the classifier, prints a classification report for the
    test split, logs and registers the model through MLflow, saves a copy below
    the --model folder and finally ends the MLflow run.
    """
    label_column = "default payment next month"

    # command-line interface: data folders, hyperparameters, model name, output folder
    cli = argparse.ArgumentParser()
    cli.add_argument("--train_data", type=str, help="path to train data")
    cli.add_argument("--test_data", type=str, help="path to test data")
    cli.add_argument("--n_estimators", type=int, default=100, required=False)
    cli.add_argument("--learning_rate", type=float, default=0.1, required=False)
    cli.add_argument("--registered_model_name", type=str, help="model name")
    cli.add_argument("--model", type=str, help="path to model file")
    opts = cli.parse_args()

    def load_features_and_labels(folder):
        """Read the file picked from `folder` as CSV and return (feature array, label column)."""
        # inputs arrive as folders, so the file inside has to be picked first
        frame = pd.read_csv(select_first_file(folder))
        labels = frame.pop(label_column)
        return frame.values, labels

    X_train, y_train = load_features_and_labels(opts.train_data)
    X_test, y_test = load_features_and_labels(opts.test_data)

    print(f"Training with data of shape {X_train.shape}")

    # fit the classifier with the requested hyperparameters
    clf = GradientBoostingClassifier(
        learning_rate=opts.learning_rate,
        n_estimators=opts.n_estimators,
    )
    clf.fit(X_train, y_train)

    # evaluate on the held-out split
    y_pred = clf.predict(X_test)
    print(classification_report(y_test, y_pred))

    # log the model to the run and register it under the requested name
    print("Registering the model via MLFlow")
    mlflow.sklearn.log_model(
        sk_model=clf,
        artifact_path=opts.registered_model_name,
        registered_model_name=opts.registered_model_name,
    )

    # also write the model below the component's output folder
    saved_model_dir = os.path.join(opts.model, "trained_model")
    mlflow.sklearn.save_model(sk_model=clf, path=saved_model_dir)

    # close the run that was started at module level
    mlflow.end_run()


if __name__ == "__main__":
    main()

Overwriting ./components/train/train.py


 Once the model is trained, the model file is saved and registered to the workspace. The registered model can now be used in inferencing endpoints.


First, create the *yaml* file describing the component

In [14]:
%%writefile {train_src_dir}/train.yml
# <component>
# Command component that runs train.py from this folder.
name: train_credit_defaults_model
display_name: Train Credit Defaults Model
# version: 1 # Not specifying a version will automatically update the version
type: command
# Inputs forwarded to train.py on the command line below.
# train.py also accepts --n_estimators, which is not exposed here, so its default is used.
inputs:
  train_data: 
    type: uri_folder
  test_data: 
    type: uri_folder
  learning_rate:
    type: number     
  registered_model_name:
    type: string
# Folder passed to train.py as --model
outputs:
  model:
    type: uri_folder
# The component's code is the folder that contains this yaml file
code: .
environment:
  # for this step, we'll use an AzureML curated environment
  azureml:AzureML-sklearn-1.0-ubuntu20.04-py38-cpu:1
command: >-
  python train.py 
  --train_data ${{inputs.train_data}} 
  --test_data ${{inputs.test_data}} 
  --learning_rate ${{inputs.learning_rate}}
  --registered_model_name ${{inputs.registered_model_name}} 
  --model ${{outputs.model}}
# </component>


Overwriting ./components/train/train.yml


Once the `yaml` file and the script are ready, we can create the component using `load_component()`. 

In [15]:
# importing the Component Package
from azure.ai.ml import load_component

# Build the path to the yaml definition, then load the component from it
train_component_yaml = os.path.join(train_src_dir, "train.yml")
train_component = load_component(source=train_component_yaml)

Now create and register the component:

In [16]:
# Register the loaded component in the workspace and keep the returned object
train_component = ml_client.create_or_update(train_component)

# Report the registered name and version
component_message = f"Component {train_component.name} with Version {train_component.version} is registered"
print(component_message)

[32mUploading train (0.0 MBs):   0%|          | 0/3723 [00:00<?, ?it/s][32mUploading train (0.0 MBs): 100%|██████████| 3723/3723 [00:00<00:00, 38786.47it/s]
[39m



Component train_credit_defaults_model with Version 2023-10-21-14-41-28-5067901 is registered


## Create the pipeline from components


In [17]:
# the dsl decorator tells the sdk that we are defining an Azure ML pipeline
from azure.ai.ml import dsl, Input, Output


# Pipeline definition: a data preparation step followed by a training step.
# The four parameters are the pipeline-level inputs; each is forwarded to one of the two steps.
@dsl.pipeline(
    compute="serverless",
    description="E2E data_perp-train pipeline",
)
def credit_defaults_pipeline(
    pipeline_job_data_input,
    pipeline_job_test_train_ratio,
    pipeline_job_learning_rate,
    pipeline_job_registered_model_name,
):
    # Step 1: call the registered data prep component like a function, passing its inputs
    # NOTE(review): the SDK may derive step names from these local variable names - confirm before renaming
    data_prep_job = data_prep_component(
        data=pipeline_job_data_input,
        test_train_ratio=pipeline_job_test_train_ratio,
    )

    # Step 2: call the training component; wiring step 1's outputs in as inputs chains the two steps
    # (train_job is not referenced again - presumably the call itself adds the step to the pipeline; confirm)
    train_job = train_component(
        train_data=data_prep_job.outputs.train_data,  # note: using outputs from previous step
        test_data=data_prep_job.outputs.test_data,  # note: using outputs from previous step
        learning_rate=pipeline_job_learning_rate,  # note: using a pipeline input as parameter
        registered_model_name=pipeline_job_registered_model_name,
    )

    # a pipeline returns a dictionary of outputs
    # the keys become the pipeline output identifiers;
    # only the data prep outputs are returned here, the training step's model output is not
    return {
        "pipeline_job_train_data": data_prep_job.outputs.train_data,
        "pipeline_job_test_data": data_prep_job.outputs.test_data,
    }

Use the pipeline definition to instantiate a pipeline with the dataset, the chosen test/train split ratio, the learning rate, and the name under which the trained model will be registered.

In [18]:
# Name under which train.py registers the model; reused later when picking the model to deploy
registered_model_name = "credit_defaults_model"

# Bind concrete values to the pipeline inputs
pipeline = credit_defaults_pipeline(
    pipeline_job_registered_model_name=registered_model_name,
    pipeline_job_learning_rate=0.05,
    pipeline_job_test_train_ratio=0.25,
    pipeline_job_data_input=Input(path=credit_data.path, type="uri_file"),
)

## Submit the job 


Once completed, the pipeline will register a model in the workspace as a result of training.

In [19]:
import webbrowser

# Submit the pipeline as a job under the given experiment name
pipeline_job = ml_client.jobs.create_or_update(
    pipeline,
    experiment_name="e2e_registered_components",
)

# Open the job's studio page in a web browser
studio_url = pipeline_job.studio_url
webbrowser.open(studio_url)

Class AutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class AutoDeleteConditionSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseAutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class IntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class ProtectionLevelSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseIntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.


False

## Deploy the model as an online endpoint

## Create a new online endpoint

In [20]:
import uuid

# Append the first 8 hex characters of a random UUID so the endpoint name is unique
online_endpoint_name = f"credit-endpoint-{uuid.uuid4().hex[:8]}"

In [21]:
from azure.ai.ml.entities import (
    ManagedOnlineEndpoint,
    ManagedOnlineDeployment,
    Model,
    Environment,
)

# Describe the managed online endpoint; auth_mode="key" selects key-based authentication
endpoint = ManagedOnlineEndpoint(
    auth_mode="key",
    name=online_endpoint_name,
    description="this is an online endpoint",
    tags=dict(
        training_dataset="credit_defaults",
        model_type="sklearn.GradientBoostingClassifier",
    ),
)

# Start the creation, then wait on the returned operation for its result
endpoint_operation = ml_client.begin_create_or_update(endpoint)
endpoint_result = endpoint_operation.result()

endpoint_message = f"Endpint {endpoint_result.name} provisioning state: {endpoint_result.provisioning_state}"
print(endpoint_message)

Endpint credit-endpoint-7ef6e030 provisioning state: Succeeded


Once the endpoint is created, it can be retrieved as shown below:

In [22]:
# Look the endpoint up again by name
endpoint = ml_client.online_endpoints.get(name=online_endpoint_name)

# Report its name and provisioning state
retrieved_message = f'Endpint "{endpoint.name}" with provisioning state "{endpoint.provisioning_state}" is retrieved'
print(retrieved_message)

Endpint "credit-endpoint-7ef6e030" with provisioning state "Succeeded" is retrieved


## Deploy the model to the endpoint

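Once the endpoint is created, deploy the model to it. The model was logged with MLflow by the training step, and the deployment below specifies neither an entry (scoring) script nor an environment.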

In [23]:
# Versions are compared as integers (not strings) to find the newest registered model
registered_versions = ml_client.models.list(name=registered_model_name)
latest_model_version = max(int(m.version) for m in registered_versions)

Deploy the latest version of the model.  


In [24]:
# Fetch the registered model at the version selected in the previous cell
model = ml_client.models.get(version=latest_model_version, name=registered_model_name)

# Describe a single-instance deployment named "blue" that serves this model on the endpoint
blue_deployment = ManagedOnlineDeployment(
    endpoint_name=online_endpoint_name,
    name="blue",
    model=model,
    instance_count=1,
    instance_type="Standard_F4s_v2",
)

# Start the deployment, then wait on the returned operation for its result
deployment_operation = ml_client.online_deployments.begin_create_or_update(
    blue_deployment
)
blue_deployment_results = deployment_operation.result()

deployment_message = f"Deployment {blue_deployment_results.name} provisioning state: {blue_deployment_results.provisioning_state}"
print(deployment_message)

Check: endpoint credit-endpoint-7ef6e030 exists


..............................................................................................Deployment blue provisioning state: Succeeded


### Test with a sample query

Now that the model is deployed to the endpoint, we can run inference with it by sending a sample request file. The request uses the `input_data` layout shown below (`columns`, `index`, and `data`), with one row of 23 feature values per record.

In [25]:
# Folder that holds the sample request file used to test the endpoint
deploy_dir = "./deploy"

# exist_ok=True keeps this cell safe to re-run when the folder is already present
os.makedirs(name=deploy_dir, exist_ok=True)

In [26]:
%%writefile {deploy_dir}/sample-request.json
{
  "input_data": {
    "columns": [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22],
    "index": [0, 1],
    "data": [
            [20000,2,2,1,24,2,2,-1,-1,-2,-2,3913,3102,689,0,0,0,0,689,0,0,0,0],
            [10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 10, 9, 8]
        ]
  }
}

Writing ./deploy/sample-request.json


In [27]:
# Send the sample request file to the "blue" deployment; the response is shown as the cell output
ml_client.online_endpoints.invoke(
    deployment_name="blue",
    endpoint_name=online_endpoint_name,
    request_file="./deploy/sample-request.json",
)

'[1, 0]'

## Clean up resources

In [28]:
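# Uncomment and run the line below to delete the endpoint created above once it is no longer needed.
# ml_client.online_endpoints.begin_delete(name=online_endpoint_name)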