# Pipelines

# Setup

**General Imports**

In [1]:
import logging # Importing the logging module for logging purposes
import json # Importing the json module for working with JSON data

**Azure ML imports**

In [2]:
pip install azureml

Collecting azureml
  Downloading azureml-0.2.7-py2.py3-none-any.whl (23 kB)
Installing collected packages: azureml
Successfully installed azureml-0.2.7
Note: you may need to restart the kernel to use updated packages.


In [3]:
pip install --upgrade azureml-sdk

Collecting azureml-sdk
  Downloading azureml_sdk-1.51.0-py3-none-any.whl (2.7 kB)
Collecting azureml-dataset-runtime[fuse]~=1.51.0
  Downloading azureml_dataset_runtime-1.51.0-py3-none-any.whl (2.3 kB)
Collecting azureml-pipeline~=1.51.0
  Downloading azureml_pipeline-1.51.0-py3-none-any.whl (2.4 kB)
Collecting azureml-train-core~=1.51.0
  Downloading azureml_train_core-1.51.0-py3-none-any.whl (8.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m8.6/8.6 MB[0m [31m25.7 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hCollecting azureml-train-automl-client~=1.51.0
  Downloading azureml_train_automl_client-1.51.0.post1-py3-none-any.whl (136 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m136.5/136.5 kB[0m [31m5.3 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting azureml-core~=1.51.0
  Downloading azureml_core-1.51.0.post1-py3-none-any.whl (3.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.3/3.3 MB[0m [31m26.3 MB/s[0m 

In [4]:
pip install azureml-core

Note: you may need to restart the kernel to use updated packages.


In [5]:
import azureml.core
print(azureml.core.VERSION)


1.49.0


In [6]:
from azure.ai.ml import MLClient # Importing the MLClient class from the azure.ai.ml module
from azure.identity import DefaultAzureCredential # Importing the DefaultAzureCredential class from the azure.identity module
from azure.ai.ml import command, Input, Output, dsl  # Importing the command, Input, Output, and dsl classes from the azure.ai.ml module

**Azure authentication**

In [7]:
#add your authentication details
subscription_id = "1c6d9ef7-1867-436f-986f-f37c475a295b"
resource_group = "testml"
workspace = "azureml"

We could define Azure to not log verbose configuration messages by specifying the log settings to only include warnings.

In [8]:
logger = logging.getLogger("azure.core.pipeline.policies.http_logging_policy")
# Get the logger instance for the "azure.core.pipeline.policies.http_logging_policy" logger.
# This logger is used for logging HTTP requests and responses in the Azure SDK pipeline.
logger.setLevel(logging.WARNING)
# Set the logging level for the logger to WARNING.
# This means that only log messages with a severity level of WARNING or higher will be logged.
# Log messages with lower severity levels, such as INFO or DEBUG, will be ignored.

In [9]:
az_credentials = DefaultAzureCredential(
    exclude_interactive_browser_credential=False
)
# Create an instance of DefaultAzureCredential.
# The DefaultAzureCredential class is used for authenticating with Azure services using default Azure Active Directory credentials.

In [10]:
# The code creates an instance of the MLClient class with the provided parameters.

ml_client = MLClient(
    az_credentials, subscription_id, resource_group, workspace
)

**Note: Upload the dataset as Data asset with name "car_mpg_jun"**

In [11]:
# The code retrieves a list of registered data assets using the ml_client object and prints their names.
for registered_data in ml_client.data.list():
    print(registered_data.name)

check
airbnb_data
car_mpg_jun
advertising


### Create a compute resource to run the job

Azure Machine Learning needs a compute resource to run a job. This resource can be single or multi-node machines with Linux or Windows OS, or a specific compute fabric like Spark.

We only need a basic cluster for this task; thus, we'll pick a Standard_D2_v3 model with 2 CPU cores and 7 GB RAM to create an Azure Machine Learning compute.

In [12]:
from azure.ai.ml.entities import AmlCompute

# Name assigned to the compute cluster
cpu_compute_target = "cpu-cluster"

try:
    # let's see if the compute target already exists
    cpu_cluster = ml_client.compute.get(cpu_compute_target)
    print(
        f"You already have a cluster named {cpu_compute_target}, we'll reuse it as is."
    )

except Exception:
    print("Creating a new cpu compute target...")

    # Let's create the Azure ML compute object with the intended parameters
    cpu_cluster = AmlCompute(
        name=cpu_compute_target,
        # Azure ML Compute is the on-demand VM service
        type="amlcompute",
        # VM Family
        size="STANDARD_D2_V3",
        # Minimum running nodes when there is no job running
        min_instances=0,
        # Nodes in cluster
        max_instances=1,
        # How many seconds will the node running after the job termination
        idle_time_before_scale_down=180,
        # Dedicated or LowPriority. The latter is cheaper but there is a chance of job termination
        tier="Dedicated",
    )

    # Now, we pass the object to MLClient's create_or_update method
    cpu_cluster = ml_client.compute.begin_create_or_update(cpu_cluster).result()

print(
    f"AMLCompute with name {cpu_cluster.name} is created, the compute size is {cpu_cluster.size}"
)

Creating a new cpu compute target...
AMLCompute with name cpu-cluster is created, the compute size is STANDARD_D2_V3


# Azure Pipelines

## Step 1: Preprocessing

In [13]:
# The code retrieves a specific version of a registered data asset using the ml_client object.
car_mpg = ml_client.data.get("car_mpg_jun", version="1")

Note: Create a folder and name it "data_prep" (which can be used as a directory to store the script) 

**The code writes the contents of the processing script to a file named "tts.py" in the "data_prep" folder.**

In [15]:
%%writefile data_prep/tts.py

import os
import argparse

import logging
import mlflow

import pandas as pd

from sklearn.model_selection import train_test_split


def main():
    """Main function of the script."""

    # input and output arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--data", type=str, help="path to input data")
    parser.add_argument("--test_train_ratio", type=float, required=False, default=0.2)
    parser.add_argument("--train_data", type=str, help="path to train data")
    parser.add_argument("--test_data", type=str, help="path to test data")
    args = parser.parse_args()

    # Start Logging
    mlflow.start_run()

    print(" ".join(f"{k}={v}" for k, v in vars(args).items()))

    print("input data:", args.data)

    df = pd.read_csv(args.data)

    train_df, test_df = train_test_split(
        df,
        test_size=args.test_train_ratio,
        random_state=42
    )

    # output paths are mounted as folder, therefore, we are adding a filename to the path
    train_df.to_csv(os.path.join(args.train_data, "data.csv"), index=False)

    test_df.to_csv(os.path.join(args.test_data, "data.csv"), index=False)

    # Stop Logging
    mlflow.end_run()


if __name__ == "__main__":
    main()

Writing data_prep/tts.py


**The code defines a data preparation step as a command using the command() function**

In [16]:
# Define a data preparation step
step_process = command(
    name="data_prep_car_mpg", # Name of the step
    display_name="Data preparation for training", # Display name of the step
    description="read a .csv input, split the input to train and test", # Description of the step
    inputs={
        "data": Input(type="uri_folder"), # Input named "data" of type "uri_folder"
        "test_train_ratio": Input(type="number"), # Input named "test_train_ratio" of type "number"
    },
    outputs=dict(
        train_data=Output(type="uri_folder", mode="rw_mount"),  # Output named "train_data" of type "uri_folder" with read-write mount mode
        test_data=Output(type="uri_folder", mode="rw_mount"), # Output named "test_data" of type "uri_folder" with read-write mount mode
    ),
    # The source folder of the component
    code='data_prep/', # Source folder of the component
    command="""python tts.py \
            --data ${{inputs.data}} --test_train_ratio ${{inputs.test_train_ratio}} \
            --train_data ${{outputs.train_data}} --test_data ${{outputs.test_data}} \
            """, # Command to be executed
    environment="AzureML-sklearn-1.0-ubuntu20.04-py38-cpu@latest", # Environment for executing the command
)

**The code retrieves the component associated with the step_process**

In [17]:
step_process.component

CommandComponent({'auto_increment_version': True, 'source': 'BUILDER', 'is_anonymous': False, 'name': 'data_prep_car_mpg', 'description': 'read a .csv input, split the input to train and test', 'tags': {}, 'properties': {}, 'print_as_yaml': True, 'id': None, 'Resource__source_path': None, 'base_path': PosixPath('.'), 'creation_context': None, 'serialize': <msrest.serialization.Serializer object at 0x7f70e0862470>, 'command': 'python tts.py             --data ${{inputs.data}} --test_train_ratio ${{inputs.test_train_ratio}}             --train_data ${{outputs.train_data}} --test_data ${{outputs.test_data}}             ', 'code': 'data_prep/', 'environment_variables': None, 'environment': 'AzureML-sklearn-1.0-ubuntu20.04-py38-cpu@latest', 'distribution': None, 'resources': None, 'queue_settings': None, 'version': None, 'latest_version': None, 'schema': None, 'type': 'command', 'display_name': 'Data preparation for training', 'is_deterministic': True, 'inputs': {'data': {'type': 'uri_folde

## Step 2: Training

Note: Create a folder and name it "train" (which can be used as a directory to store the script) 

**The code writes the contents of the training script to a file named "gbr.py" in the "train"**

In [18]:
%%writefile train/gbr.py

import os
import mlflow
import argparse

import pandas as pd

from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.pipeline import make_pipeline
from sklearn.compose import make_column_transformer

from sklearn.model_selection import train_test_split

from sklearn.ensemble import GradientBoostingRegressor

mlflow.start_run() # Start a new MLflow run

os.makedirs("./outputs", exist_ok=True) # Create the "outputs" directory if it doesn't exist


def select_first_file(path):
    """Selects first file in folder, use under assumption there is only one file in folder
    Args:
        path (str): path to directory or file to choose
    Returns:
        str: full path of selected file
    """
    files = os.listdir(path)
    return os.path.join(path, files[0])


def main():

    parser = argparse.ArgumentParser()
    parser.add_argument("--train_data", type=str, help="path to train data")
    parser.add_argument("--test_data", type=str, help="path to test data")
    parser.add_argument("--n_estimators", required=False, default=100, type=int)
    parser.add_argument("--learning_rate", required=False, default=0.1, type=float)
    parser.add_argument("--registered_model_name", type=str, help="model name")
    parser.add_argument("--model", type=str, help="path to model file")
    args = parser.parse_args() # Parse the command-line arguments

    car_mpg_train = pd.read_csv(select_first_file(args.train_data))  # Read the training data
    car_mpg_test = pd.read_csv(select_first_file(args.test_data)) # Read the test data

    target = 'mpg'
    numeric_features = ['cyl','disp','hp','wt','acc','yr','origin','car_type',]

    # Extract the features from the training data
    X_train = car_mpg_train.drop(columns=[target]) 
    y_train = car_mpg_train[target]

    # Extract the features from the test data
    X_test = car_mpg_test.drop(columns=[target])
    y_test = car_mpg_test[target]

    # Create a column transformer for preprocessing the numeric features
    preprocessor = make_column_transformer(
        (StandardScaler(), numeric_features)
    )

    # Create a Gradient Boosting Regressor model
    model_gbr = GradientBoostingRegressor(
        n_estimators=args.n_estimators,
        learning_rate=args.learning_rate
    )

    # Create a pipeline with preprocessing and the model
    model_pipeline = make_pipeline(preprocessor, model_gbr)

    model_pipeline.fit(X_train, y_train)

    rmse = model_pipeline.score(X_test, y_test)

    mlflow.log_metric("RMSE", float(rmse))

    print("Registering model pipeline")

    mlflow.sklearn.log_model(
        sk_model=model_pipeline,
        registered_model_name="gbr-car-mpg-predictor",
        artifact_path="gbr-car-mpg-predictor"
    ) # Register the model pipeline in MLflow

    mlflow.end_run()


if __name__ == '__main__':
    main()

Writing train/gbr.py


In [19]:
step_train = command(
    name="train_car_mpg_model",
    display_name="Training a car mpg model",
    description="read a .csv input, split the input to train and test",
    inputs={
        "train_data": Input(type="uri_folder"),# Input: Path to the training data (URI folder)
        "test_data": Input(type="uri_folder"), # Input: Path to the test data (URI folder)
        "learning_rate": Input(type="number"), # Input: Learning rate for the model (number)
        "registered_model_name": Input(type="string") # Input: Name for the registered model (string)
    },
    outputs=dict(
        model=Output(type="uri_folder", mode="rw_mount") # Output: Path to the trained model (URI folder)
    ),
    # The source folder of the component
    code='train/',
    command="""python gbr.py \
              --train_data ${{inputs.train_data}} \
              --test_data ${{inputs.test_data}} \
              --learning_rate ${{inputs.learning_rate}} \
              --registered_model_name ${{inputs.registered_model_name}} \
              --model ${{outputs.model}}
            """,
    environment="AzureML-sklearn-1.0-ubuntu20.04-py38-cpu@latest",  # Execution environment for the command
)

** step_train represents a step or command in a pipeline. **

In [20]:
step_train.component

CommandComponent({'auto_increment_version': True, 'source': 'BUILDER', 'is_anonymous': False, 'name': 'train_car_mpg_model', 'description': 'read a .csv input, split the input to train and test', 'tags': {}, 'properties': {}, 'print_as_yaml': True, 'id': None, 'Resource__source_path': None, 'base_path': PosixPath('.'), 'creation_context': None, 'serialize': <msrest.serialization.Serializer object at 0x7f70e08632b0>, 'command': 'python gbr.py               --train_data ${{inputs.train_data}}               --test_data ${{inputs.test_data}}               --learning_rate ${{inputs.learning_rate}}               --registered_model_name ${{inputs.registered_model_name}}               --model ${{outputs.model}}\n            ', 'code': 'train/', 'environment_variables': None, 'environment': 'AzureML-sklearn-1.0-ubuntu20.04-py38-cpu@latest', 'distribution': None, 'resources': None, 'queue_settings': None, 'version': None, 'latest_version': None, 'schema': None, 'type': 'command', 'display_name':

## Step 3: Assemble Pipeline

In [25]:
@dsl.pipeline(
    compute='testlab',
    description="data preparation and training pipeline"
)
def car_mpg_pipeline(
    pipeline_job_data_input,
    pipeline_job_test_train_ratio,
    pipeline_job_learning_rate,
    pipeline_job_registered_model_name,
):
    # using data_prep_function like a python call with its own inputs
    data_prep_job = step_process(
        data=pipeline_job_data_input,
        test_train_ratio=pipeline_job_test_train_ratio,
    )

    # using train_func like a python call with its own inputs
    train_job = step_train(
        train_data=data_prep_job.outputs.train_data,  # note: using outputs from previous step
        test_data=data_prep_job.outputs.test_data,  # note: using outputs from previous step
        learning_rate=pipeline_job_learning_rate,  # note: using a pipeline input as parameter
        registered_model_name=pipeline_job_registered_model_name,
    )

    # a pipeline returns a dictionary of outputs
    # keys will code for the pipeline output identifier
    return {
        "pipeline_job_train_data": data_prep_job.outputs.train_data,
        "pipeline_job_test_data": data_prep_job.outputs.test_data,
    }

In [26]:
registered_model_name = "car_mpg_model_v1"

In [27]:
pipeline = car_mpg_pipeline(
    pipeline_job_data_input=Input(type="uri_file", path=car_mpg.path), # Input path
    pipeline_job_test_train_ratio=0.25, # Input: Test-train ratio for data splitting
    pipeline_job_learning_rate=0.05, # Input: Learning rate for the model
    pipeline_job_registered_model_name=registered_model_name,  # Input: Name for the registered model
)

**The purpose of this code is to create or update a job**

In [28]:
pipeline_job = ml_client.jobs.create_or_update(
    pipeline,
    # Project's name
    experiment_name="Training pipeline with registered components"
)

**The code allows to stream the output of a specific job using its name**

In [29]:
ml_client.jobs.stream(pipeline_job.name)

RunId: cyan_holiday_bhmdzf3ww0
Web View: https://ml.azure.com/runs/cyan_holiday_bhmdzf3ww0?wsid=/subscriptions/1c6d9ef7-1867-436f-986f-f37c475a295b/resourcegroups/testml/workspaces/azureml

Streaming logs/azureml/executionlogs.txt

[2023-06-16 13:06:39Z] Submitting 1 runs, first five are: 4bc4ff62:c02fdbd2-47fa-4207-9397-64e397eb3ccc
[2023-06-16 13:10:54Z] Completing processing run id c02fdbd2-47fa-4207-9397-64e397eb3ccc.
[2023-06-16 13:10:55Z] Submitting 1 runs, first five are: 3641a29c:4a29005f-c03d-443e-9fbd-4a7298b8e83d
[2023-06-16 13:11:53Z] Completing processing run id 4a29005f-c03d-443e-9fbd-4a7298b8e83d.

Execution Summary
RunId: cyan_holiday_bhmdzf3ww0
Web View: https://ml.azure.com/runs/cyan_holiday_bhmdzf3ww0?wsid=/subscriptions/1c6d9ef7-1867-436f-986f-f37c475a295b/resourcegroups/testml/workspaces/azureml

