# Use a command job to train a model in Azure Machine Learning

## Getting a handle to the workspace is needed to work with the AML SDK

In [3]:
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

# authenticate
credential = DefaultAzureCredential()

# Get a handle to the workspace
ml_client = MLClient(
    credential=credential,
    subscription_id="14e9fdd5-e0e8-49ae-8fc1-611b7619464a",
    resource_group_name="jknrg",
    workspace_name="arthrex_seminar",
    )

## Create a compute cluster to run your job

In [4]:
from azure.ai.ml.entities import AmlCompute

# Name assigned to the compute cluster
cpu_compute_target = "cpu2-cluster"

try:
    # let's see if the compute target already exists
    cpu_cluster = ml_client.compute.get(cpu_compute_target)
    print(
        f"You already have a cluster named {cpu_compute_target}, we'll reuse it as is."
    )

except Exception:
    print("Creating a new cpu compute target...")

    # Let's create the Azure Machine Learning compute object with the intended parameters
    cpu_cluster = AmlCompute(
        name=cpu_compute_target,
        # Azure Machine Learning Compute is the on-demand VM service
        # if you run into an out of quota error, change the size to a comparable VM that is available.\
        # Learn more on https://azure.microsoft.com/en-us/pricing/details/machine-learning/.

        type="amlcompute",
        # VM Family
        size="STANDARD_DS3_V2",
        # Minimum running nodes when there is no job running
        min_instances=0,
        # Nodes in cluster
        max_instances=4,
        # How many seconds will the node running after the job termination
        idle_time_before_scale_down=180,
        # Dedicated or LowPriority. The latter is cheaper but there is a chance of job termination
        tier="Dedicated",
    )
    print(
        f"AMLCompute with name {cpu_cluster.name} will be created, with compute size {cpu_cluster.size}"
    )
    # Now, we pass the object to MLClient's create_or_update method
    cpu_cluster = ml_client.compute.begin_create_or_update(cpu_cluster)

You already have a cluster named cpu2-cluster, we'll reuse it as is.


In [5]:
try:
    # let's see if the compute target already exists
    cpu_cluster = ml_client.compute.get(cpu_compute_target)
    print(
        f"You already have a cluster named {cpu_compute_target}, we'll reuse it as is."
    )

except Exception:
    print("This should not happen...")

You already have a cluster named cpu2-cluster, we'll reuse it as is.


## Create an environment in 3 steps

In [6]:
import os

dependencies_dir = "./dependencies"
os.makedirs(dependencies_dir, exist_ok=True)



In [7]:
%%writefile {dependencies_dir}/conda.yaml
name: model-env
channels:
  - conda-forge
dependencies:
  - python=3.8
  - numpy=1.21.2
  - pip=21.2.4
  - scikit-learn=1.0.2
  - scipy=1.7.1
  - pandas>=1.1,<1.2
  - pip:
    - inference-schema[numpy-support]==1.3.0
    - mlflow== 2.8.0
    - azureml-mlflow==1.51.0
    - psutil>=5.8,<5.9
    - tqdm>=4.59,<4.60
    - ipykernel~=6.0
    - matplotlib

Overwriting ./dependencies/conda.yaml


In [8]:
from azure.ai.ml.entities import Environment

custom_env_name = "aml-scikit-learn"

custom_job_env = Environment(
    name=custom_env_name,
    description="Custom environment for Credit Card Defaults job",
    tags={"scikit-learn": "1.2.0", "mlflow":"2.8.0"},
    conda_file=os.path.join(dependencies_dir, "conda.yaml"),
    image="mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04:latest",
)
custom_job_env = ml_client.environments.create_or_update(custom_job_env)

print(
    f"Environment with name {custom_job_env.name} is registered to workspace, the environment version is {custom_job_env.version}"
)

Environment with name aml-scikit-learn is registered to workspace, the environment version is 16


### Review environments

## Next steps - write a script, creat the Job command, and run the script

In [9]:
import os

train_src_dir = "./src"
os.makedirs(train_src_dir, exist_ok=True)

### Script includes the arguments, starts mlflow logging, does the usual for training, and registers the fitted model

In [10]:
%%writefile {train_src_dir}/main.py
import os
import argparse
import pandas as pd
import mlflow
import mlflow.sklearn
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split

def main():
    """Main function of the script."""

    # input and output arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--data", type=str, help="path to input data")
    parser.add_argument("--test_train_ratio", type=float, required=False, default=0.25)
    parser.add_argument("--n_estimators", required=False, default=100, type=int)
    parser.add_argument("--learning_rate", required=False, default=0.1, type=float)
    parser.add_argument("--registered_model_name", type=str, help="model name")
    args = parser.parse_args()
   
    # Start Logging
    #mlflow.start_run()


    # enable autologging
    mlflow.sklearn.autolog()

    # Start Logging
    mlflow.start_run()

    ###################
    #<prepare the data>
    ###################
    print(" ".join(f"{k}={v}" for k, v in vars(args).items()))

    print("input data:", args.data)
    
    credit_df = pd.read_csv(args.data, header=1, index_col=0)

    mlflow.log_metric("num_samples", credit_df.shape[0])
    mlflow.log_metric("num_features", credit_df.shape[1] - 1)

    #Split train and test datasets
    train_df, test_df = train_test_split(
        credit_df,
        test_size=args.test_train_ratio,
    )
    ####################
    #</prepare the data>
    ####################

    ##################
    #<train the model>
    ##################
    # Extracting the label column
    y_train = train_df.pop("default payment next month")

    # convert the dataframe values to array
    X_train = train_df.values

    # Extracting the label column
    y_test = test_df.pop("default payment next month")

    # convert the dataframe values to array
    X_test = test_df.values

    print(f"Training with data of shape {X_train.shape}")

    clf = GradientBoostingClassifier(
        n_estimators=args.n_estimators, learning_rate=args.learning_rate
    )
    clf.fit(X_train, y_train)

    y_pred = clf.predict(X_test)

    print(classification_report(y_test, y_pred))
    ###################
    #</train the model>
    ###################

    ##########################
    #<save and register model>
    ##########################
    # Registering the model to the workspace
    print("Registering the model via MLFlow")
    mlflow.sklearn.log_model(
        sk_model=clf,
        registered_model_name=args.registered_model_name,
        artifact_path=args.registered_model_name,
    )

    # Saving the model to a file, note that the documentation uses 'trained_model' instead of "credit_defaults_model"
    mlflow.sklearn.save_model(
        sk_model=clf,
        path=os.path.join(args.registered_model_name, "credit_defaults_model"),
    )
    ###########################
    #</save and register model>
    ###########################
    
    # Stop Logging
    mlflow.end_run()

if __name__ == "__main__":
    main()

Overwriting ./src/main.py


### Set-up the job command

In [11]:
from azure.ai.ml import command
from azure.ai.ml import Input

registered_model_name = "credit_defaults_model"

job = command(
    inputs=dict(
        data=Input(
            type="uri_file",
            path="https://azuremlexamples.blob.core.windows.net/datasets/credit_card/default_of_credit_card_clients.csv",
        ),
        test_train_ratio=0.2,
        learning_rate=0.25,
        registered_model_name=registered_model_name,
    ),
    code="./src/",  # location of source code
    command="python main.py --data ${{inputs.data}} --test_train_ratio ${{inputs.test_train_ratio}} --learning_rate ${{inputs.learning_rate}} --registered_model_name ${{inputs.registered_model_name}}",
    environment="aml-scikit-learn@latest",
    #environment='requiredenv:1',
    
    compute="cpu2-cluster", #delete this line to use serverless compute
    display_name="credit_default_prediction",
)


### now submit the job -- note that we called it job

In [12]:
ml_client.create_or_update(job)

Class AutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class AutoDeleteConditionSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseAutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class IntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class ProtectionLevelSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseIntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.


Experiment,Name,Type,Status,Details Page
johannorvik,gentle_bag_xhmhctl4j7,command,Starting,Link to Azure Machine Learning studio


## Confirm that the model is registered

In [14]:
# Let's pick the latest version of the model
latest_model_version = max(
    [int(m.version) for m in ml_client.models.list(name=registered_model_name)]
)

print(latest_model_version)

5


## Create the endpoint

In [15]:
import uuid

# Create a unique name for the endpoint
online_endpoint_name = "credit-endpoint-" + str(uuid.uuid4())[:8]

from azure.ai.ml.entities import ManagedOnlineEndpoint

# define an online endpoint
endpoint = ManagedOnlineEndpoint(
    name=online_endpoint_name,
    description="this is an online endpoint",
    auth_mode="key",
    tags={
        "training_dataset": "credit_defaults",
    },
)

# create the online endpoint
# expect the endpoint to take approximately 2 minutes.

endpoint = ml_client.online_endpoints.begin_create_or_update(endpoint).result()

### check out the endpoint

In [16]:
endpoint = ml_client.online_endpoints.get(name=online_endpoint_name)

print(
    f'Endpoint "{endpoint.name}" with provisioning state "{endpoint.provisioning_state}" is retrieved'
)

Endpoint "credit-endpoint-0d74fcaa" with provisioning state "Succeeded" is retrieved


## Deploy the model to the endpoint

In [17]:
from azure.ai.ml.entities import ManagedOnlineDeployment

# Choose the latest version of our registered model for deployment
model = ml_client.models.get(name=registered_model_name, version=latest_model_version)

# define an online deployment
# if you run into an out of quota error, change the instance_type to a comparable VM that is available.\
# Learn more on https://azure.microsoft.com/en-us/pricing/details/machine-learning/.
blue_deployment = ManagedOnlineDeployment(
    name="blue",
    endpoint_name=online_endpoint_name,
    model=model,
    instance_type="Standard_DS3_v2",
    instance_count=1,
)

# create the online deployment
blue_deployment = ml_client.online_deployments.begin_create_or_update(
    blue_deployment
).result()

# blue deployment takes 100% traffic
# expect the deployment to take approximately 8 to 10 minutes.
endpoint.traffic = {"blue": 100}
ml_client.online_endpoints.begin_create_or_update(endpoint).result()

Check: endpoint credit-endpoint-0d74fcaa exists
Readonly attribute principal_id will be ignored in class <class 'azure.ai.ml._restclient.v2022_05_01.models._models_py3.ManagedServiceIdentity'>
Readonly attribute tenant_id will be ignored in class <class 'azure.ai.ml._restclient.v2022_05_01.models._models_py3.ManagedServiceIdentity'>


........................................................................................................................

ManagedOnlineEndpoint({'public_network_access': 'Enabled', 'provisioning_state': 'Succeeded', 'scoring_uri': 'https://credit-endpoint-0d74fcaa.eastus2.inference.ml.azure.com/score', 'openapi_uri': 'https://credit-endpoint-0d74fcaa.eastus2.inference.ml.azure.com/swagger.json', 'name': 'credit-endpoint-0d74fcaa', 'description': 'this is an online endpoint', 'tags': {'training_dataset': 'credit_defaults'}, 'properties': {'azureml.onlineendpointid': '/subscriptions/14e9fdd5-e0e8-49ae-8fc1-611b7619464a/resourcegroups/jknrg/providers/microsoft.machinelearningservices/workspaces/arthrex_seminar/onlineendpoints/credit-endpoint-0d74fcaa', 'AzureAsyncOperationUri': 'https://management.azure.com/subscriptions/14e9fdd5-e0e8-49ae-8fc1-611b7619464a/providers/Microsoft.MachineLearningServices/locations/eastus2/mfeOperationsStatus/oe:12f8988b-d58c-44b9-b407-766bfb799792:4fcadfb7-8896-4028-9b5a-8004d231b073?api-version=2022-02-01-preview'}, 'print_as_yaml': True, 'id': '/subscriptions/14e9fdd5-e0e8-49a

### Check the status of the endpoint

In [18]:
# return an object that contains metadata for the endpoint
#endpoint = ml_client.online_endpoints.get(name=online_endpoint_name)

# print a selection of the endpoint's metadata
print(
    f"Name: {endpoint.name}\nStatus: {endpoint.provisioning_state}\nDescription: {endpoint.description}"
)

# existing traffic details
print(endpoint.traffic)

# Get the scoring URI
print(endpoint.scoring_uri)

Name: credit-endpoint-0d74fcaa
Status: Succeeded
Description: this is an online endpoint
{'blue': 100}
https://credit-endpoint-0d74fcaa.eastus2.inference.ml.azure.com/score


### Test the endpoint with sample data

In [19]:
#import os

# Create a directory to store the sample request file.
deploy_dir = "./deploy"
os.makedirs(deploy_dir, exist_ok=True)

In [20]:
%%writefile {deploy_dir}/sample-request.json
{
  "input_data": {
    "columns": [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22],
    "index": [0, 1, 2],
    "data": [
            [20000,2,2,1,24,2,2,-1,-1,-2,-2,3913,3102,689,0,0,0,0,689,0,0,0,0],
            [10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 10, 9, 8],
            [100, 19, 83, 72, 6, 5, 41, 3, 2, 1, 10, 9, 8, 7, 6, 5, 4, 3, 20, 10, 10, 9, 8]
            ]
                }
}

Overwriting ./deploy/sample-request.json


### Last step - invoke with data

In [21]:
# test the blue deployment with the sample data
ml_client.online_endpoints.invoke(
    endpoint_name=online_endpoint_name,
    deployment_name="blue",
    request_file="./deploy/sample-request.json",
)

'[1, 1, 0]'

### Get logs of the deployment

In [22]:
logs = ml_client.online_deployments.get_logs(
    name="blue", endpoint_name=online_endpoint_name, lines=10
)
print(logs)

Instance status:
SystemSetup: Succeeded
UserContainerImagePull: Succeeded
ModelDownload: Succeeded
UserContainerStart: Succeeded

Container events:
Kind: Pod, Name: ContainerReady, Type: Normal, Time: 2023-11-20T15:37:35.608523454Z, Message: Container is ready

Container logs:
ERROR:entry_module:Error collecting model_inputs collection request. name 'inputs_collector' is not defined
ERROR:entry_module:Error collecting model_outputs collection request. name 'outputs_collector' is not defined
2023-11-20 15:41:13,059 I [649] azmlinfsrv - POST /score 200 8.450ms 9
2023-11-20 15:41:13,060 I [649] gunicorn.access - 127.0.0.1 - - [20/Nov/2023:15:41:13 +0000] "POST /score HTTP/1.0" 200 9 "-" "azure-ai-ml/1.8.0 azsdk-python-core/1.27.1 Python/3.10.11 (Linux-5.15.0-1040-azure-x86_64-with-glibc2.31)"
2023-11-20 15:41:14,872 I [649] gunicorn.access - 127.0.0.1 - - [20/Nov/2023:15:41:14 +0000] "GET / HTTP/1.0" 200 7 "-" "kube-probe/1.18"
2023-11-20 15:41:15,557 I [649] gunicorn.access - 127.0.0.1 -

### Create a second deployment

In [32]:
# picking the model to deploy. Here we use the latest version of our registered model
#model = ml_client.models.get(name=registered_model_name, version=latest_model_version)

# define an online deployment using a more powerful instance type
# if you run into an out of quota error, change the instance_type to a comparable VM that is available.\
# Learn more on https://azure.microsoft.com/en-us/pricing/details/machine-learning/.
green_deployment = ManagedOnlineDeployment(
    name="green",
    endpoint_name=online_endpoint_name,
    model=model,
    instance_type="Standard_F4s_v2",
    instance_count=1,
)

# create the online deployment
# expect the deployment to take approximately 8 to 10 minutes
green_deployment = ml_client.online_deployments.begin_create_or_update(
    green_deployment
).result()

Check: endpoint credit-endpoint-5b56c1c7 exists


.................................................................................................................

### Scale the second deployment

In [33]:
# update definition of the deployment
green_deployment.instance_count = 2

# update the deployment
# expect the deployment to take approximately 8 to 10 minutes
ml_client.online_deployments.begin_create_or_update(green_deployment).result()

Check: endpoint credit-endpoint-5b56c1c7 exists


........................................................................................................................................................................

ManagedOnlineDeployment({'private_network_connection': None, 'provisioning_state': 'Succeeded', 'endpoint_name': 'credit-endpoint-5b56c1c7', 'type': 'Managed', 'name': 'green', 'description': None, 'tags': {}, 'properties': {'AzureAsyncOperationUri': 'https://management.azure.com/subscriptions/14e9fdd5-e0e8-49ae-8fc1-611b7619464a/providers/Microsoft.MachineLearningServices/locations/eastus2/mfeOperationsStatus/od:12f8988b-d58c-44b9-b407-766bfb799792:ed46c16a-e2bb-469e-989a-760aeece7352?api-version=2023-04-01-preview'}, 'print_as_yaml': True, 'id': '/subscriptions/14e9fdd5-e0e8-49ae-8fc1-611b7619464a/resourceGroups/jknrg/providers/Microsoft.MachineLearningServices/workspaces/arthrex_seminar/onlineEndpoints/credit-endpoint-5b56c1c7/deployments/green', 'Resource__source_path': None, 'base_path': '/mnt/batch/tasks/shared/LS_root/mounts/clusters/arthrexci/code/Users/johannorvik', 'creation_context': None, 'serialize': <msrest.serialization.Serializer object at 0x7fb5b8109060>, 'model': '/su

### Update traffic allocation for deployments

In [34]:
endpoint.traffic = {"blue": 80, "green": 20}
ml_client.online_endpoints.begin_create_or_update(endpoint).result()

Readonly attribute principal_id will be ignored in class <class 'azure.ai.ml._restclient.v2022_05_01.models._models_py3.ManagedServiceIdentity'>
Readonly attribute tenant_id will be ignored in class <class 'azure.ai.ml._restclient.v2022_05_01.models._models_py3.ManagedServiceIdentity'>


ManagedOnlineEndpoint({'public_network_access': 'Enabled', 'provisioning_state': 'Succeeded', 'scoring_uri': 'https://credit-endpoint-5b56c1c7.eastus2.inference.ml.azure.com/score', 'openapi_uri': 'https://credit-endpoint-5b56c1c7.eastus2.inference.ml.azure.com/swagger.json', 'name': 'credit-endpoint-5b56c1c7', 'description': 'this is an online endpoint', 'tags': {'training_dataset': 'credit_defaults'}, 'properties': {'azureml.onlineendpointid': '/subscriptions/14e9fdd5-e0e8-49ae-8fc1-611b7619464a/resourcegroups/jknrg/providers/microsoft.machinelearningservices/workspaces/arthrex_seminar/onlineendpoints/credit-endpoint-5b56c1c7', 'AzureAsyncOperationUri': 'https://management.azure.com/subscriptions/14e9fdd5-e0e8-49ae-8fc1-611b7619464a/providers/Microsoft.MachineLearningServices/locations/eastus2/mfeOperationsStatus/oe:12f8988b-d58c-44b9-b407-766bfb799792:0f229e03-f8bc-466a-8ec4-bdefe8029044?api-version=2022-02-01-preview'}, 'print_as_yaml': True, 'id': '/subscriptions/14e9fdd5-e0e8-49a

In [40]:
# You can invoke the endpoint several times
for i in range(30):
    ml_client.online_endpoints.invoke(
        endpoint_name=online_endpoint_name,
        request_file="./deploy/sample-request.json",
    )

In [41]:
# Logs from the green deployment

logs = ml_client.online_deployments.get_logs(
    name="green", endpoint_name=online_endpoint_name, lines=50
)
print(logs)

Instance status:
SystemSetup: Succeeded
UserContainerImagePull: Succeeded
ModelDownload: Succeeded
UserContainerStart: Succeeded

Container events:
Kind: Pod, Name: ContainerReady, Type: Normal, Time: 2023-11-15T19:07:04.170215926Z, Message: Container is ready

Container logs:
2023-11-15 19:49:53,462 I [649] gunicorn.access - 127.0.0.1 - - [15/Nov/2023:19:49:53 +0000] "GET / HTTP/1.0" 200 7 "-" "kube-probe/1.18"
2023-11-15 19:49:53,874 I [649] gunicorn.access - 127.0.0.1 - - [15/Nov/2023:19:49:53 +0000] "GET / HTTP/1.0" 200 7 "-" "kube-probe/1.18"
2023-11-15 19:50:03,462 I [649] gunicorn.access - 127.0.0.1 - - [15/Nov/2023:19:50:03 +0000] "GET / HTTP/1.0" 200 7 "-" "kube-probe/1.18"
2023-11-15 19:50:03,873 I [649] gunicorn.access - 127.0.0.1 - - [15/Nov/2023:19:50:03 +0000] "GET / HTTP/1.0" 200 7 "-" "kube-probe/1.18"
2023-11-15 19:50:13,463 I [649] gunicorn.access - 127.0.0.1 - - [15/Nov/2023:19:50:13 +0000] "GET / HTTP/1.0" 200 7 "-" "kube-probe/1.18"
2023-11-15 19:50:13,873 I [649] 

## Check the endpoints in the UI, go to the Azure Portal for details

In [None]:
# Send all traffic to the new deployment

endpoint.traffic = {"blue": 0, "green": 100}
ml_client.begin_create_or_update(endpoint).result()

In [None]:
# Delete the old deployment

ml_client.online_deployments.begin_delete(
    name="blue", endpoint_name=online_endpoint_name
).result()



In [None]:
# delete the endpoint

ml_client.online_endpoints.begin_delete(name=online_endpoint_name).result()