# Run Pipelines with the Azure Machine Learning Python SDK

You use the Azure Machine Learning Python SDK to orchestrate the steps of a pipeline, which run in succession or in parallel on a compute target.

You'll need the latest version of the **azure-ai-ml** package to run the code in this notebook. Run the cell below to verify that it is installed.

> **Note**:
> If the **azure-ai-ml** package is not installed, run `pip install azure-ai-ml` to install it.

In [87]:
%pip show azure-ai-ml

Name: azure-ai-ml
Version: 1.20.0
Summary: Microsoft Azure Machine Learning Client Library for Python
Home-page: https://github.com/Azure/azure-sdk-for-python
Author: Microsoft Corporation
Author-email: azuresdkengsysadmins@microsoft.com
License: MIT License
Location: /opt/anaconda3/envs/automate/lib/python3.12/site-packages
Requires: azure-common, azure-core, azure-mgmt-core, azure-storage-blob, azure-storage-file-datalake, azure-storage-file-share, colorama, isodate, jsonschema, marshmallow, msrest, opencensus-ext-azure, opencensus-ext-logging, pydash, pyjwt, pyyaml, strictyaml, tqdm, typing-extensions
Required-by: 
Note: you may need to restart the kernel to use updated packages.


# Connect to a Workspace

To connect to a workspace, we need identifier parameters - a subscription ID, resource group name, and workspace name. A `config.json` file containing these parameters can be downloaded from the Azure Machine Learning workspace or Azure portal.

In [2]:
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

ml_client = MLClient.from_config(
    credential=DefaultAzureCredential(), path="../../config.json"
)

Found the config file in: ../../config.json
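
The `config.json` file is a small JSON document holding the three identifiers named above. As a minimal sketch, the file's contents can be checked before they are handed to `MLClient`. The helper name `parse_workspace_config` is hypothetical and not part of the SDK, and the example values are placeholders.

```python
import json

# Keys expected in the workspace config file.
REQUIRED_KEYS = ("subscription_id", "resource_group", "workspace_name")


def parse_workspace_config(text):
    """Parse config.json text and return the three workspace identifiers."""
    config = json.loads(text)
    missing = [key for key in REQUIRED_KEYS if not config.get(key)]
    if missing:
        raise ValueError("config is missing: " + ", ".join(missing))
    return {key: config[key] for key in REQUIRED_KEYS}


# Placeholder values, for illustration only.
example = """
{
    "subscription_id": "<subscription-id>",
    "resource_group": "<resource-group>",
    "workspace_name": "<workspace-name>"
}
"""
print(parse_workspace_config(example))
```

A check like this fails early with a readable message instead of surfacing later as an authentication or lookup error.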


## Register the dataset

Azure Machine Learning provides several types of datastore in which a dataset can be kept. Consider the datastore type, your use case, and the associated costs when choosing one. Here we use the workspace's default datastore, which is a `blob` datastore.

**Authentication with the `Azure CLI` is required here.**

In [89]:
from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes

file = "../../data/diabetes.csv"

diabetes_data = Data(
    name="diabetes_csv",
    path=file,
    type=AssetTypes.URI_FILE,
    description="Dataset for Diabetes model training",
    tags={"source_type": "file", "source": "Local file"},
    version="1.0.0",
)

try:
    diabetes_data = ml_client.data.create_or_update(diabetes_data)
except Exception as ex:
    print("Exception while registering dataset ", ex)

Uploading diabetes.csv (< 1 MB): 100%|██████████| 518k/518k [00:00<00:00, 987kB/s]



# Create component directories

In [3]:
import os

components_dirs = {
    "eda": "./components/eda",
    "data_prep": "./components/data_prep",
    "train": "./components/train",
}

for key, value in components_dirs.items():
    os.makedirs(value, exist_ok=True)

## Data preparation script


In [65]:
%%writefile {components_dirs["data_prep"]}/data_prep.py
import os
import argparse
import pandas as pd
from sklearn.model_selection import train_test_split
import logging
import mlflow

def main():
    # Parse job parameters
    parser = argparse.ArgumentParser()
    parser.add_argument('--data', type=str)
    parser.add_argument('--test_size', type=float, default=0.30)
    parser.add_argument('--train_data', type=str)
    parser.add_argument('--test_data', type=str)
    args = parser.parse_args()

    # Start logging
    mlflow.start_run()

    print(" ".join(f"{k}={v}\n" for k, v in vars(args).items()))

    data = pd.read_csv(args.data, header=0)
    print("num_samples:", data.shape[0])
    mlflow.log_metric("num_samples", data.shape[0])
    print("num_features", data.shape[1] - 1)
    mlflow.log_metric("num_features", data.shape[1] - 1)

    train_data, test_data = train_test_split(data, test_size=args.test_size, random_state=0)

    # args are mounted folders, so write the data into the folder
    train_data.to_csv(os.path.join(args.train_data,"train_data.csv"), index=False)
    test_data.to_csv(os.path.join(args.test_data,"test_data.csv"), index=False)

    # Stop logging
    mlflow.end_run()
    
if __name__ == "__main__":
    main()

Overwriting ./components/data_prep/data_prep.py
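
To make the role of `test_size` concrete, here is a minimal pure-Python sketch of a seeded shuffle-and-slice split. It illustrates the idea only and is not scikit-learn's implementation; the function name `split_rows` is hypothetical.

```python
import random


def split_rows(rows, test_size=0.30, seed=0):
    """Shuffle a copy of rows and cut it into (train, test) lists."""
    if not 0.0 < test_size < 1.0:
        raise ValueError("test_size must be between 0 and 1")
    shuffled = list(rows)
    # A fixed seed makes the split reproducible, like random_state=0 above.
    random.Random(seed).shuffle(shuffled)
    n_test = round(len(shuffled) * test_size)
    return shuffled[n_test:], shuffled[:n_test]


train, test = split_rows(range(10), test_size=0.30)
print(len(train), len(test))
```

Every row lands in exactly one of the two lists, and the same seed always yields the same split, which keeps pipeline reruns comparable.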


## Exploratory data analysis script

Plot correlation, feature-wise distributions and pairwise scatter plots

In [66]:
%%writefile {components_dirs["eda"]}/diabetes-exploratory-plots.py
# Plot distributions step

import pandas as pd
import seaborn as sns
import os
import matplotlib.pyplot as plt
from itertools import combinations
import argparse

# Create a function that we can re-use
def plot_correlations(data,output_path="outputs"):
    """
    This function will make a correlation graph and save it
    """
    correlation = data.corr()
    print("Correlation between features\n", correlation)

    fig = plt.figure(figsize=(10, 12))
    sns.heatmap(data=correlation, annot=True)
    plt.title("Correlation between features")

    # Save plot
    filename = os.path.join(output_path, "correlations-between-features.png") 
    fig.savefig(filename)


def plot_distribution(var_data, column_name=None, output_path="outputs"):
    """
    This function will make a distribution (graph) and save it
    """

    # Get statistics
    min_val = var_data.min()
    max_val = var_data.max()
    mean_val = var_data.mean()
    med_val = var_data.median()
    mod_val = var_data.mode()[0]

    print(
        "{} Statistics:\nMinimum:{:.2f}\nMean:{:.2f}\nMedian:{:.2f}\nMode:{:.2f}\nMaximum:{:.2f}\n".format(
            "" if column_name is None else column_name,
            min_val,
            mean_val,
            med_val,
            mod_val,
            max_val,
        )
    )

    # Create a figure for 2 subplots (2 rows, 1 column)
    fig, ax = plt.subplots(2, 1, figsize=(10, 4))

    # Plot the histogram
    ax[0].hist(var_data)
    ax[0].set_ylabel("Frequency")

    # Add lines for the mean, median, and mode
    ax[0].axvline(x=min_val, color="gray", linestyle="dashed", linewidth=2, label="min")
    ax[0].axvline(x=mean_val, color="cyan", linestyle="dashed", linewidth=2, label = "mean")
    ax[0].axvline(x=med_val, color="red", linestyle="dashed", linewidth=2, label = "median")
    ax[0].axvline(x=mod_val, color="yellow", linestyle="dashed", linewidth=2, label = "mode")
    ax[0].axvline(x=max_val, color="gray", linestyle="dashed", linewidth=2 , label = "max")
    ax[0].legend()

    # Plot the boxplot
    ax[1].boxplot(var_data, vert=False)
    xlabel = "Value" if column_name is None else column_name
    ax[1].set_xlabel(xlabel)

    # Add a title to the Figure
    title = (
        "Data Distribution"
        if column_name is None
        else "{} Data Distribution".format(column_name)
    )
    fig.suptitle(title)

    # Save plot
    filename = os.path.join(output_path,"{}-distribution.png".format(column_name))
    fig.savefig(filename)


def plot_scatters(x_y_data, output_path="outputs"):
    """
    Plot the first column of x_y_data against the second, with a regression line, and save the figure.
    """
    
    x_column = x_y_data.columns.values[0]
    y_column = x_y_data.columns.values[1]

    fig = plt.figure(figsize=(10, 12))
    sns.regplot(data=x_y_data,x=x_column, y=y_column)
    plt.xlabel(x_column)
    plt.ylabel(y_column)
    plt.title("Scatter plot of {} vs {}".format(x_column,y_column))

    # Save plot
    filename = os.path.join(output_path,"Scatter plot of {} vs {}.png".format(x_column,y_column))
    fig.savefig(filename)

def main():
    print("Loading Data...")

    parser = argparse.ArgumentParser()
    parser.add_argument('--data', type=str)
    parser.add_argument('--plots_dir', type=str)
    args = parser.parse_args()

    diabetes = pd.read_csv(args.data, header= 0)

    # plot correlations
    plot_correlations(data=diabetes, output_path = args.plots_dir)

    # plot distributions
    exclude_columns = set(["Diabetic", "PatientID"])
    columns = diabetes.columns.values
    for x in columns:
        if x not in exclude_columns:
            plot_distribution(var_data=diabetes[x],column_name=x, output_path = args.plots_dir)

    # plot scatter plots
    columns = set(columns)
    column_comb=list(combinations(columns-exclude_columns,2))
    column_comb = [list(x) for x in column_comb]

    for x_y_pairs in column_comb:
        plot_scatters(diabetes[x_y_pairs], output_path = args.plots_dir)

if __name__ == "__main__":
    main()

Overwriting ./components/eda/diabetes-exploratory-plots.py
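
The pairwise scatter plots are driven by `itertools.combinations`. The sketch below shows how the pairs are enumerated. It filters a list rather than a set so the order of the pairs is deterministic. The column names are a hypothetical subset chosen for illustration, and `feature_pairs` is not a function from the script.

```python
from itertools import combinations


def feature_pairs(columns, exclude=("Diabetic", "PatientID")):
    """Return every unordered pair of feature columns, in column order."""
    features = [name for name in columns if name not in exclude]
    return list(combinations(features, 2))


# Hypothetical subset of the columns, for illustration only.
columns = ["PatientID", "Age", "BMI", "SerumInsulin", "Diabetic"]
for x_column, y_column in feature_pairs(columns):
    print(f"Scatter plot of {x_column} vs {y_column}")
```

With k feature columns this produces k(k-1)/2 plots, so the number of figures grows quadratically with the number of features.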


## Model Training script

To train a model, first create the **diabetes-training.py** script in the **components/train** folder.

In [84]:
%%writefile {components_dirs["train"]}/diabetes-training.py
# import libraries
import os
import argparse
import pandas as pd
import numpy as np
import pickle
import logging
import mlflow
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve

def main():

    # Parse job parameters
    parser = argparse.ArgumentParser()
    parser.add_argument('--reg_rate', type=float, default=0.01)
    parser.add_argument('--train_data', type=str, help="path to train data")
    parser.add_argument('--test_data', type=str, help="path to test data")
    parser.add_argument("--trained_model", type=str, help="path to the trained model output folder")
    args = parser.parse_args()

    # Start logging
    mlflow.start_run()
    
    # load the diabetes dataset
    print("Loading Data...")
    train_file = os.path.join(args.train_data, "train_data.csv")
    test_file = os.path.join(args.test_data, "test_data.csv")
    train_df = pd.read_csv(train_file, header=0, index_col="PatientID")
    y_train = train_df.pop("Diabetic")
    x_train = train_df.values
    test_df = pd.read_csv(test_file, header=0, index_col="PatientID")
    y_test = test_df.pop("Diabetic")
    x_test = test_df.values

    print("num_train_samples:", x_train.shape[0])
    mlflow.log_metric("num_train_samples", x_train.shape[0])
    print("num_test_samples:", x_test.shape[0])
    mlflow.log_metric("num_test_samples", x_test.shape[0])
    print("num_features:", x_train.shape[1])
    mlflow.log_metric("num_features", x_train.shape[1])
    print("features:", train_df.columns.values)
    mlflow.log_param("features", train_df.columns.values)


    # train a logistic regression model
    print('Training a logistic regression model with regularization rate of', args.reg_rate)
    mlflow.log_param("estimator","LogisticRegression")
    mlflow.log_param("regularization_rate", args.reg_rate)
    mlflow.log_param("solver", "liblinear")
    model = LogisticRegression(C=1/args.reg_rate, solver="liblinear").fit(x_train, y_train)

    # calculate accuracy
    y_hat = model.predict(x_test)
    acc = np.average(y_hat == y_test)
    print('Accuracy:', float(acc))
    mlflow.log_metric('Accuracy', float(acc))

    # calculate AUC
    y_scores = model.predict_proba(x_test)
    auc = roc_auc_score(y_test,y_scores[:,1])
    print('AUC:', str(auc))
    mlflow.log_metric('AUC', float(auc))

    # Save the model to file
    print("Saving model to file")
    os.makedirs(args.trained_model, exist_ok=True)
    filename = os.path.join(args.trained_model,"model.pkl")
    with open(filename, 'wb') as file:
        pickle.dump(model,file)

    # Stop logging
    mlflow.end_run()

if __name__ == "__main__":
    main()

Overwriting ./components/train/diabetes-training.py
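
The two metrics logged by the training script can be worked through by hand. The following is a minimal pure-Python sketch, not the scikit-learn implementation: accuracy is the fraction of matching predictions, and AUC is computed as the probability that a randomly chosen positive sample scores higher than a randomly chosen negative one, with ties counting half. The function names and the toy data are illustrative assumptions.

```python
def accuracy(y_true, y_pred):
    """Fraction of predictions that match the labels."""
    matches = sum(1 for truth, guess in zip(y_true, y_pred) if truth == guess)
    return matches / len(y_true)


def roc_auc(y_true, y_score):
    """AUC as the share of (positive, negative) pairs ranked correctly."""
    positives = [s for t, s in zip(y_true, y_score) if t == 1]
    negatives = [s for t, s in zip(y_true, y_score) if t == 0]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one sample of each class")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Toy labels and positive-class scores, for illustration only.
y_true = [0, 0, 1, 1]
y_score = [0.1, 0.4, 0.35, 0.8]
y_pred = [1 if s >= 0.5 else 0 for s in y_score]

print("Accuracy:", accuracy(y_true, y_pred))  # 0.75
print("AUC:", roc_auc(y_true, y_score))       # 0.75
```

The pairwise loop is quadratic in the number of samples, so it suits small examples only. For binary labels the result should agree with `roc_auc_score`, which is what the script uses.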


# Create Environment 

In [7]:
import os

dependencies_dir = "./dependencies"
os.makedirs(dependencies_dir, exist_ok=True)

In [68]:
%%writefile {dependencies_dir}/conda.yaml
name: sklearn-1.5
channels:
- conda-forge
- anaconda
dependencies:
- python=3.10
- pip=21.3.1
- pandas~=1.5.3
- scipy~=1.10.0
- numpy~=1.22.0
- pip:
  - scikit-learn-intelex==2024.6.0
  - azureml-core==1.57.0
  - azureml-defaults==1.57.0
  - azureml-mlflow==1.57.0.post1
  - azureml-telemetry==1.57.0
  - scikit-learn~=1.5.0
  - joblib~=1.2.0
  # azureml-automl-common-tools packages
  - py-spy==0.3.12
  - debugpy~=1.6.3
  - ipykernel~=6.0
  - tensorboard
  - psutil~=5.8.0
  - matplotlib~=3.5.0
  - seaborn~=0.13.2
  - tqdm~=4.66.3
  - py-cpuinfo==5.0.0
  - torch-tb-profiler~=0.4.0


Overwriting ./dependencies/conda.yaml


In [69]:
from azure.ai.ml.entities import Environment

env_name = "diabetes-scikit-learn"

env = Environment(
    name=env_name,
    description="Custom environment for Diabetes prediction pipeline",
    tags={"scikit-learn": "1.5.0"},
    conda_file=os.path.join(dependencies_dir, "conda.yaml"),
    image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest",
)
env = ml_client.environments.create_or_update(env)

print(
    f"Environment with name {env.name} is registered to workspace, the environment version is {env.version}"
)

Environment with name diabetes-scikit-learn is registered to workspace, the environment version is 4


# Create pipeline components

In [85]:
from azure.ai.ml import command, Input, Output

command_env = "{0}:{1}".format(env.name, env.version)
experiment_name = "diabetes-training"

data_prep_command = command(
    name="data_prep_diabetes",
    display_name="Data preparation for training",
    description="Reads the diabetes CSV data and splits it into train and test sets",
    inputs=dict(data=Input(type="uri_file"), test_size=Input(type="number")),
    outputs=dict(
        train_data=Output(type="uri_folder", mode="rw_mount"),
        test_data=Output(type="uri_folder", mode="rw_mount"),
    ),
    code=components_dirs["data_prep"],
    command="python data_prep.py  --data ${{inputs.data}} --test_size ${{inputs.test_size}}  --train_data ${{outputs.train_data}} --test_data ${{outputs.test_data}}",
    environment=command_env,
    experiment_name=experiment_name,
)

# Register the component in the workspace
data_prep_component = ml_client.create_or_update(data_prep_command.component)

# Print the name and version of the registered component
print(
    f"Component {data_prep_component.name} with Version {data_prep_component.version} is registered"
)

eda_command = command(
    name="eda_diabetes",
    display_name="Exploratory data analysis",
    description="Reads the diabetes CSV data and plots exploratory data analysis graphs",
    inputs=dict(data=Input(type="uri_file")),
    outputs=dict(plots_dir=Output(type="uri_folder", mode="rw_mount")),
    code=components_dirs["eda"],
    command="python diabetes-exploratory-plots.py --data ${{inputs.data}} --plots_dir ${{outputs.plots_dir}}",
    environment=command_env,
    experiment_name=experiment_name,
)

# Register the component in the workspace
eda_command_component = ml_client.create_or_update(eda_command.component)

# Print the name and version of the registered component
print(
    f"Component {eda_command_component.name} with Version {eda_command_component.version} is registered"
)

model_train_command = command(
    name="model_train_diabetes",
    display_name="Train model",
    description="Trains a Logistic Regression model on the diabetes dataset",
    inputs=dict(
        train_data=Input(type="uri_folder", mode="rw_mount"),
        test_data=Input(type="uri_folder", mode="rw_mount"),
        reg_rate=Input(type="number"),
    ),
    outputs=dict(trained_model=Output(type="uri_folder", mode="rw_mount")),
    code=components_dirs["train"],
    command="python diabetes-training.py --train_data ${{inputs.train_data}} --test_data ${{inputs.test_data}} --reg_rate ${{inputs.reg_rate}} --trained_model ${{outputs.trained_model}}",
    environment=command_env,
    experiment_name=experiment_name,
)

# Register the component in the workspace
model_train_command_component = ml_client.create_or_update(
    model_train_command.component
)

# Print the name and version of the registered component
print(
    f"Component {model_train_command_component.name} with Version {model_train_command_component.version} is registered"
)

Component data_prep_diabetes with Version 2024-09-15-12-22-47-3858108 is registered
Component eda_diabetes with Version 2024-09-15-12-22-51-8291496 is registered


Uploading train (0.0 MBs): 100%|██████████| 2670/2670 [00:00<00:00, 16188.70it/s]



Component model_train_diabetes with Version 2024-09-15-12-22-57-9253183 is registered
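
Each component's `command` string contains `${{inputs.<name>}}` and `${{outputs.<name>}}` placeholders that are replaced with concrete values or mounted paths when the job runs. The sketch below imitates that substitution with a regular expression to show what the script eventually receives on its command line. It is an illustration only, not how Azure Machine Learning resolves bindings internally. The function `render_command` and the `/mnt/...` paths are hypothetical.

```python
import re

# Matches ${{inputs.name}} and ${{outputs.name}}.
PLACEHOLDER = re.compile(r"\$\{\{\s*(inputs|outputs)\.(\w+)\s*\}\}")


def render_command(template, inputs, outputs):
    """Replace every placeholder in template with its bound value."""
    values = {"inputs": inputs, "outputs": outputs}

    def substitute(match):
        kind, name = match.group(1), match.group(2)
        if name not in values[kind]:
            raise KeyError(f"no value bound for {kind}.{name}")
        return str(values[kind][name])

    return PLACEHOLDER.sub(substitute, template)


template = (
    "python data_prep.py --data ${{inputs.data}} "
    "--test_size ${{inputs.test_size}} --train_data ${{outputs.train_data}}"
)
print(
    render_command(
        template,
        inputs={"data": "/mnt/in/diabetes.csv", "test_size": 0.3},  # hypothetical paths
        outputs={"train_data": "/mnt/out/train"},
    )
)
```

Each flag in the rendered command has to match an argument that the script's `argparse` parser defines, so it is worth comparing the two whenever a component is edited.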


## Building the Pipeline
Test data size (`test_size`) and Regularization rate (`reg_rate`) for the `LogisticRegression` are passed as parameters. Other parameters such as a registered dataset in a data store could also be passed.

In [86]:
from azure.ai.ml import dsl, Input

# Define the pipeline
@dsl.pipeline(compute="serverless", description=" Diabetes prediction pipeline")
def diabetes_pipeline(data_input, test_size, reg_rate):
    data_prep_job = data_prep_component(
        data = data_input,
        test_size = test_size
    )

    eda_job = eda_command_component(
        data = data_input
    )

    model_train_job = model_train_command_component(
        train_data = data_prep_job.outputs.train_data,
        test_data = data_prep_job.outputs.test_data,
        reg_rate = reg_rate
    )

    return {
        "train_data": data_prep_job.outputs.train_data,
        "test_data": data_prep_job.outputs.test_data,
        "eda_plots": eda_job.outputs.plots_dir,
        "trained_model": model_train_job.outputs.trained_model
    }


# Define arguments / parameters
diabetes_data = ml_client.data.get("diabetes_csv", version="1.0.0")
test_size = 0.30
reg_rate = 0.01

# Create the pipeline
pipeline = diabetes_pipeline(
    data_input=Input(type="uri_file", path= diabetes_data.path),
    test_size=test_size,
    reg_rate=reg_rate
)

# Submit the pipeline job
pipeline_job = ml_client.jobs.create_or_update(
    pipeline,
    experiment_name= experiment_name
)

pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.UriFolderJobOutput'> and will be ignored


<img src="03.completed_pipeline.png" alt="drawing" width="500"/>
<img src="06.Distribution_plots.png" alt="drawing" width="500"/>
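
The order in which the steps run follows from how their inputs and outputs are wired together: a step can start once every step it consumes data from has finished. As a rough sketch of that reasoning, using the standard library's `graphlib` rather than anything from the Azure SDK, the dependency graph of this pipeline can be sorted into groups of steps that are ready at the same time. The function name `execution_waves` is hypothetical.

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps whose outputs it consumes.
dependencies = {
    "data_prep_job": set(),
    "eda_job": set(),
    "model_train_job": {"data_prep_job"},
}


def execution_waves(graph):
    """Group steps into waves; steps within one wave do not depend on each other."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


print(execution_waves(dependencies))
# [['data_prep_job', 'eda_job'], ['model_train_job']]
```

The waves are a simplification: a real scheduler could start `model_train_job` as soon as `data_prep_job` finishes, without waiting for `eda_job`.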

In [None]:
import webbrowser
# Open job url
webbrowser.open(pipeline_job.studio_url)

# Model selection and Explainability with Automated Machine Learning

## Create a compute target

In [91]:

from azure.ai.ml.entities import ComputeInstance, AmlCompute


compute_name = "diabest-compute"

computeInstance = ComputeInstance(
    name=compute_name, 
    size="Standard_A1_v2", 
    idle_time_before_shutdown_minutes=30
)

ml_client.begin_create_or_update(computeInstance)

# Alternative: create a compute cluster instead of a compute instance
# cluster_basic = AmlCompute(
#     name=compute_name,
#     type="amlcompute",
#     size="Standard_A1_v2",
#     min_instances=0,
#     max_instances=4,
#     idle_time_before_scale_down=120,
# )
# ml_client.begin_create_or_update(cluster_basic)

## Automated ML requires MLTable input


In [19]:
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.entities import Data
# Create an MLTable data asset from the diabetes data

# Define the Data asset object
diabetes_mlt = Data(
    path="../../data/diabetes.mltable",
    type=AssetTypes.MLTABLE,
    description="Diabetes table data in mltable format",
    name="diabetes_mlt2",
    version="2",
)
ml_client.data.create_or_update(diabetes_mlt)
diabetes_mlt.path

'azureml://datastores/workspaceblobstore/paths/LocalUpload/0e528e47e39c2caeecb4c8d2c9d3a61f/diabetes.mltable'
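
The path printed above follows the `azureml://datastores/<datastore_name>/paths/<path_on_datastore>` form. As a small sketch, such a URI can be split into its datastore name and relative path with a regular expression. The helper `parse_datastore_uri` is hypothetical and not part of the SDK.

```python
import re

# Short-form datastore URI: azureml://datastores/<name>/paths/<relative path>
DATASTORE_URI = re.compile(
    r"^azureml://datastores/(?P<datastore>[^/]+)/paths/(?P<path>.+)$"
)


def parse_datastore_uri(uri):
    """Return (datastore_name, relative_path) for a short-form datastore URI."""
    match = DATASTORE_URI.match(uri)
    if match is None:
        raise ValueError(f"not a datastore URI: {uri!r}")
    return match.group("datastore"), match.group("path")


uri = (
    "azureml://datastores/workspaceblobstore/paths/"
    "LocalUpload/0e528e47e39c2caeecb4c8d2c9d3a61f/diabetes.mltable"
)
datastore, path = parse_datastore_uri(uri)
print(datastore)
print(path)
```

Here the datastore is `workspaceblobstore`, the workspace's default blob datastore, and the rest of the URI is the upload location inside it.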

In [20]:
from azure.ai.ml import automl, Input

diabetes_data_ml_table = Input(
    type= AssetTypes.MLTABLE,
    path= diabetes_mlt.path
) 

# Configure the classification job
classification_job = automl.classification(
    compute=compute_name,
    experiment_name=experiment_name,
    training_data=diabetes_data_ml_table,
    target_column_name="Diabetic",
    primary_metric="AUC_weighted",
    n_cross_validations=5,
    enable_model_explainability=True,
    tags={"data": "diabetes_csv"}
)

# set job limits. These are optional
classification_job.set_limits(
    timeout_minutes=600, 
    trial_timeout_minutes=20, 
    max_trials=5,
    enable_early_termination=True,
)

# set training properties. Do not use logistic regression
classification_job.set_training(
    blocked_training_algorithms=["logistic_regression"], 
    enable_onnx_compatible_models=True
)

# Submit the AutoML job
returned_job = ml_client.jobs.create_or_update(
    classification_job
)  # submit the job to the backend
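
Setting `n_cross_validations=5` asks for 5-fold cross-validation: the training data is divided into five parts, and each part serves once as the validation set while the other four are used for training. The sketch below shows the index bookkeeping for contiguous folds. It is an illustration of the general technique under that assumption, not the splitter Automated ML uses, and `k_fold_indices` is a hypothetical name.

```python
def k_fold_indices(n_samples, n_folds=5):
    """Yield (train_indices, validation_indices) for each fold."""
    if not 2 <= n_folds <= n_samples:
        raise ValueError("n_folds must be between 2 and n_samples")
    base, extra = divmod(n_samples, n_folds)
    start = 0
    for fold in range(n_folds):
        # The first `extra` folds take one additional sample.
        size = base + (1 if fold < extra else 0)
        validation = list(range(start, start + size))
        train = list(range(0, start)) + list(range(start + size, n_samples))
        yield train, validation
        start += size


for train, validation in k_fold_indices(10, n_folds=5):
    print(len(train), len(validation), validation)
```

Each sample is used for validation exactly once, so the reported metric is an average over five models trained on different 80% subsets.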

In [17]:
# Get a URL for the status of the job
returned_job.services["Studio"].endpoint

'https://ml.azure.com/runs/witty_car_tfcm4m1v7l?wsid=/subscriptions/ed463f81-92a5-476c-b6e1-82f1a28d21e2/resourcegroups/ml-prod-scale/workspaces/diabetes_prediction&tid=746a21b3-a76e-4d67-a142-eeb97ab5314f'

## Further reading
- InterpretML: https://interpret.ml/
- FairLearn: https://fairlearn.org/
