# Training an ML model with sweep and hosting a pipeline on batch endpoint

Reference: https://github.com/Azure/azureml-examples/tree/main/sdk/python/endpoints/batch/deploy-pipelines/training-with-components

In [1]:
# Coordinates of the Azure Machine Learning workspace this notebook talks to.
subscriptionID = "2213e8b1-dbc7-4d54-8aff-b5e315df5e5b"  # passed as subscription_id
RG = "1-44deb668-playground-sandbox"  # passed as resource_group_name
ws_name = "MLOPS101"  # passed as workspace_name

In [2]:
from azure.identity import DefaultAzureCredential
from azure.ai.ml import MLClient

# Workspace-scoped client; every later cell goes through `ws`.
ws = MLClient(
    credential=DefaultAzureCredential(),
    subscription_id=subscriptionID,
    resource_group_name=RG,
    workspace_name=ws_name,
)

print(ws)

MLClient(credential=<azure.identity._credentials.default.DefaultAzureCredential object at 0x7f89c3a9f400>,
         subscription_id=2213e8b1-dbc7-4d54-8aff-b5e315df5e5b,
         resource_group_name=1-44deb668-playground-sandbox,
         workspace_name=MLOPS101)


In [3]:
from pathlib import Path

# Local layout: one root folder with a sub-folder per kind of asset.
ROOT_DIR = Path('./assets')
ENV_DIR, TRAIN_DIR, DATA_DIR = (ROOT_DIR / sub for sub in ('env', 'train', 'data'))

# Create the folders up front; a no-op when they already exist.
for asset_dir in (ENV_DIR, TRAIN_DIR, DATA_DIR):
    asset_dir.mkdir(parents=True, exist_ok=True)

In [4]:
# -nc (no-clobber) skips the download when heart.csv already exists, so re-running
# this cell does not leave heart.csv.1, heart.csv.2, ... in the folder that is uploaded.
!wget -nc https://raw.githubusercontent.com/Azure/azureml-examples/main/sdk/python/endpoints/batch/deploy-pipelines/training-with-components/data/train/heart.csv -P assets/data/

--2023-06-10 14:31:14--  https://raw.githubusercontent.com/Azure/azureml-examples/main/sdk/python/endpoints/batch/deploy-pipelines/training-with-components/data/train/heart.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13273 (13K) [text/plain]
Saving to: ‘assets/data/heart.csv’


2023-06-10 14:31:14 (1.55 MB/s) - ‘assets/data/heart.csv’ saved [13273/13273]



In [5]:
from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes

# NOTE(review): this placeholder is never read in the visible cells.
heart_disease_train_url = ""

# Describe the local data folder as a versioned uri_folder data asset.
heart_disease_train_data = Data(
    name = "heart_disease_train_data",
    path = str(DATA_DIR),
    type = AssetTypes.URI_FOLDER,
    description = "Train heart disease dataset",
    tags = {"source_type": "web"},
    version = "1.0.0",
)

# Register the asset; the name is rebound to the object returned by the workspace.
heart_disease_train_data = ws.data.create_or_update(heart_disease_train_data)
print(f"Dataset with name {heart_disease_train_data.name} was registered to workspace, the dataset version is {heart_disease_train_data.version}")

[32mUploading data (0.01 MBs): 100%|██████████| 13273/13273 [00:00<00:00, 119980596.97it/s]
[39m



Dataset with name heart_disease_train_data was registered to workspace, the dataset version is 1.0.0


In [6]:
# Re-fetch the registered asset by name, asking for the version labelled "latest".
heart_disease_train_data = ws.data.get(name = 'heart_disease_train_data', label="latest")

In [7]:
# Exploratory: list the attributes available on the fetched data asset object.
dir(heart_disease_train_data)

['_Resource__source_path',
 '__abstractmethods__',
 '__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__slots__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_abc_impl',
 '_auto_increment_version',
 '_base_path',
 '_creation_context',
 '_from_container_rest_object',
 '_from_rest_object',
 '_get_arm_resource',
 '_get_arm_resource_and_params',
 '_id',
 '_is_anonymous',
 '_load',
 '_load_from_dict',
 '_mltable_schema_url',
 '_path',
 '_referenced_uris',
 '_resolve_cls_and_type',
 '_serialize',
 '_skip_validation',
 '_source_path',
 '_to_container_rest_object',
 '_to_dict',
 '_to_rest_object',
 '_update_path',
 '_version',
 'base_path',
 'creation_context',
 'datastore',
 'description',
 'dump',
 'id',
 'latest_vers

## Creating a conda environment

In [8]:
%%writefile {ENV_DIR}/conda_definition.yml

# Conda specification that the notebook registers as the "heart-env" environment.
name: heart-env
channels:
- conda-forge
dependencies:
- python=3.8.5
- pip
# Packages below are installed with pip inside the conda environment.
# NOTE(review): mlflow, azureml-mlflow, datasets, jobtools and matplotlib are unpinned.
- pip:
  - mlflow
  - azureml-mlflow
  - datasets
  - jobtools
  - cloudpickle==1.6.0
  - dask==2.30.0
  - scikit-learn==1.1.2
  - xgboost==1.3.3
  - pandas==1.4
  - matplotlib

Writing assets/env/conda_definition.yml


In [9]:
from azure.ai.ml.entities import Environment

# Name under which the environment is registered; the component YAMLs refer to it
# as azureml:heart-env@latest.
custom_env_name = "heart-env"

# Environment = base Docker image + the conda specification written in the previous cell.
heart_env = Environment(
    name = custom_env_name,
    description = "Custom environment for heart disease classification",
    # Disabled tag kept for reference: tags={"scikit-learn": "0.24.2"}
    # (the conda file pins scikit-learn==1.1.2, so the tag value is stale).
    conda_file = f'{ENV_DIR}/conda_definition.yml',
    image = "mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest",
    version="0.1.0",
)
# Register the environment; the name is rebound to the object returned by the workspace.
heart_env = ws.environments.create_or_update(heart_env)

print(
   f"Environment with name {heart_env.name} is registered to workspace, the environment version is {heart_env.version}"
)

Environment with name heart-env is registered to workspace, the environment version is 0.1.0


## Data preprocessing

In [10]:
%%writefile {TRAIN_DIR}/process.yml

# Command component that runs process.py to prepare the raw heart-disease CSV files.
name: heart_disease_prepare
display_name : heart disease data preparation component
type: command
inputs:
    # Folder that is searched recursively for *.csv files.
    raw_data:
        type: uri_folder
    # Optional folder holding a previously saved column_transformer.pkl to reuse.
    transformations_input:
        type: custom_model
        optional: true
    # process.py accepts "ordinal" or "onehot".
    categorical_encoding:
        type: string
        optional: true
        default: ordinal
outputs:
    # One prepared CSV per input CSV.
    prepared_data:
        type: uri_folder
    # Folder that receives the transformer pickle.
    transformations_output:
        type: custom_model
code: ./process.py
environment: azureml:heart-env@latest
# The two optional inputs are wrapped in $[[ ]] in the command below.
command: >-
    python process.py
    --raw_data ${{inputs.raw_data}}
    $[[--transformations_input ${{inputs.transformations_input}}]]
    $[[--categorical_encoding ${{inputs.categorical_encoding}}]]
    --prepared_data ${{outputs.prepared_data}}
    --transformations_output ${{outputs.transformations_output}}

Writing assets/train/process.yml


In [11]:
%%writefile {TRAIN_DIR}/process.py

import os
import sys
import argparse
from pathlib import Path

import pandas as pd
import numpy as np
import sklearn
import joblib
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OrdinalEncoder, OneHotEncoder
from sklearn.preprocessing import StandardScaler
from sklearn.compose import ColumnTransformer

import mlflow

# File name used for the transformer pickle inside the transformations folders.
transform_filename = 'column_transformer.pkl'
# Columns routed through the median-imputer + standard-scaler branch.
continuous_features = ['age', 'chol', 'oldpeak', 'thalach', 'trestbps']
# Columns routed through the most-frequent-imputer + categorical-encoder branch.
discrete_features = ['ca', 'cp', 'exang', 'fbs', 'restecg', 'sex', 'slope', 'thal']
# Name of the label column; it is re-attached untouched after the transformation.
target_column = 'target'


def preprocessing_pipeline(categorical_encoding, cf, df):
    """Build the unfitted ColumnTransformer used to prepare the features.

    Args:
        categorical_encoding: ``'ordinal'`` or ``'onehot'``; selects the encoder
            applied to the discrete columns.
        cf: names of the continuous columns (median-imputed, then standardised).
        df: names of the discrete columns (most-frequent-imputed, then encoded).

    Returns:
        An unfitted ``ColumnTransformer`` whose first entry handles ``cf`` and
        whose second entry handles ``df``.

    Raises:
        NotImplementedError: if ``categorical_encoding`` is not a supported value.
            The error propagates instead of being printed and replaced by a
            ``None`` return, which only deferred the failure to the caller.
    """
    # Validate first so an unsupported value fails loudly and immediately.
    if categorical_encoding not in ('ordinal', 'onehot'):
        raise NotImplementedError('Possible values are ordinal or onehot')

    if categorical_encoding == 'ordinal':
        cat_enc = OrdinalEncoder(handle_unknown='use_encoded_value', unknown_value=np.nan)
    else:
        cat_enc = OneHotEncoder(handle_unknown="ignore")

    # Import the class explicitly: a bare `import sklearn` does not by itself
    # guarantee that the `sklearn.pipeline` attribute is available.
    from sklearn.pipeline import Pipeline

    conti_feat_pipeline = Pipeline([
        ('imputer', SimpleImputer(strategy = 'median')),
        ('scaler', StandardScaler())
    ])

    disc_feat_pipeline = Pipeline([
        ('imputer', SimpleImputer(strategy = 'most_frequent')),
        ('encoder', cat_enc)
    ])

    # Order matters: preprocess_data reads the encoder from transformers_[1].
    transformations = ColumnTransformer([
        ('conti_feat_pipeline', conti_feat_pipeline, cf),
        ('disc_feat_pipeline', disc_feat_pipeline, df)
    ])
    return transformations

def preprocess_data(dataframe, label, cf, df, cat_enc = 'ordinal', transformations = None):
    """Transform one dataframe and return it together with the transformer used.

    Args:
        dataframe: input pandas DataFrame; may or may not contain ``label``.
        label: name of the target column. When present it is excluded from the
            transformation and appended unchanged as the last output column.
        cf: names of the continuous columns.
        df: names of the discrete columns.
        cat_enc: encoding used when a new transformer has to be built
            (``'ordinal'`` or ``'onehot'``).
        transformations: an already fitted transformer to reuse; when ``None``
            a new one is built with ``preprocessing_pipeline`` and fitted here.

    Returns:
        ``(prepared_frame, transformations)``.

    Raises:
        Any exception from building, fitting or applying the transformer. Errors
        are no longer swallowed: returning ``None`` made the caller fail later
        on tuple unpacking with an unrelated message.
    """
    mlflow.sklearn.autolog()

    has_label = label in dataframe.columns
    # Drop the label by name; slicing off the last column silently removed a
    # feature whenever the label was not the final column.
    X = dataframe.drop(columns=[label]) if has_label else dataframe

    if transformations is not None:
        X_transformed = transformations.transform(X)
    else:
        transformations = preprocessing_pipeline(cat_enc, cf, df)
        X_transformed = transformations.fit_transform(X)

    # A ColumnTransformer can return a sparse matrix (e.g. with one-hot encoding);
    # densify so hstack and the DataFrame constructor below work in both cases.
    if hasattr(X_transformed, 'toarray'):
        X_transformed = X_transformed.toarray()

    # Use the column lists passed in rather than the module-level globals.
    transformed_discrete_features = (
        transformations.transformers_[1][1]
        .named_steps["encoder"]
        .get_feature_names_out(df)
    )
    all_features = list(cf) + list(transformed_discrete_features)

    if has_label:
        target_values = dataframe[label].to_numpy().reshape(len(dataframe), 1)
        X_transformed = np.hstack((X_transformed, target_values))
        all_features.append(label)

    return pd.DataFrame(X_transformed, columns = all_features), transformations
        
def parseArgs():
    """Parse the command-line arguments of the data-preparation script.

    Returns:
        argparse.Namespace with ``raw_data``, ``categorical_encoding``,
        ``transformations_input``, ``prepared_data`` and ``transformations_output``.

    ``--categorical_encoding`` defaults to ``'ordinal'`` (the same default the
    component YAML declares) so that omitting it no longer yields ``None``,
    which is not a supported encoding. The three paths that the component
    command always passes are marked required so a missing one is reported by
    argparse instead of surfacing later as a ``None`` path.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--raw_data', type = str, required = True)
    parser.add_argument('--categorical_encoding', type = str, required = False, default = 'ordinal')
    parser.add_argument('--transformations_input', type = str, required = False)
    parser.add_argument('--prepared_data', type = str, required = True)
    parser.add_argument('--transformations_output', type = str, required = True)
    args = parser.parse_args()

    return args

def main(args):
    """Prepare every CSV under ``args.raw_data`` and save the transformer used.

    Args:
        args: namespace with ``raw_data``, ``categorical_encoding``,
            ``transformations_input``, ``prepared_data`` and
            ``transformations_output`` (see ``parseArgs``).

    Side effects:
        Writes one ``<stem>.csv`` per input file into ``args.prepared_data`` and
        ``column_transformer.pkl`` into ``args.transformations_output``.

    Raises:
        FileNotFoundError: when no CSV file is found under ``args.raw_data``.
        Any other exception raised while loading, transforming or writing. The
        error is still printed, but it is re-raised so the process exits with a
        non-zero status instead of reporting success after a failure.
    """
    try:
        transformations = None
        if args.transformations_input:
            transformations_input = str(Path(args.transformations_input) / transform_filename)
            if os.path.exists(transformations_input):
                # Load the pickle file itself, not the folder that contains it.
                # NOTE(security): joblib.load unpickles arbitrary objects; only
                # point --transformations_input at trusted artifacts.
                transformations = joblib.load(transformations_input)
        print('[Debug] Transformations input defined successfully')

        # Sorted so that, with several files, the one that fits the transformer
        # does not depend on file-system listing order.
        files = sorted(Path(args.raw_data).rglob('*.csv'))
        if not files:
            raise FileNotFoundError(f'No CSV files found under {args.raw_data}')

        # Make sure both output folders exist before anything is written.
        prepared_dir = Path(args.prepared_data)
        transformations_dir = Path(args.transformations_output)
        prepared_dir.mkdir(parents=True, exist_ok=True)
        transformations_dir.mkdir(parents=True, exist_ok=True)

        with mlflow.start_run(nested=True):
            for file in files:
                print(f'Working with file {file}')
                temp = pd.read_csv(file)
                # The transformer returned for one file is reused for the next.
                preprocessed, transformations = preprocess_data(
                    temp,
                    target_column,
                    continuous_features,
                    discrete_features,
                    args.categorical_encoding,
                    transformations
                )
                output_file_name = Path(file).stem
                output_file_path = str(prepared_dir / f'{output_file_name}.csv')
                print(f'Writing file {output_file_path}')
                preprocessed.to_csv(output_file_path, index=False)
        transformations_output_path = str(transformations_dir / transform_filename)
        joblib.dump(transformations, transformations_output_path)

    except Exception as e:
        exc_type, exc_obj, exc_tb = sys.exc_info()
        fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
        print(exc_type, fname, exc_tb.tb_lineno, e)
        # Re-raise so the failure is visible to whoever launched the script.
        raise

# Script entry point: parse the command line, then run the preparation step.
if __name__ == '__main__':
    args = parseArgs()
    main(args)

Writing assets/train/process.py


## Training

In [12]:
%%writefile {TRAIN_DIR}/train.yml

# Command component that runs train.py on the prepared data.
name: heart_disease_train
display_name: Training definition - heart disease dataset
type: command
inputs:
    # Folder that is searched recursively for *.csv files.
    data:
        type: uri_folder
    # Name of the label column inside the CSV files.
    target_column:
        type: string
    # Fraction held out for evaluation; train.py evaluates on the training data when it is 0.
    test_size:
        type: number
        default: 0.3
    # When true, train.py logs the model with a registered model name.
    register_model:
        type: boolean
        default: true
    model_name:
        type: string
        default: best_model
outputs:
    # Receives the model saved with mlflow.xgboost.save_model.
    model_folder:
        type: mlflow_model
    # Receives test_predictions.csv.
    results_folder:
        type: uri_folder
environment: azureml:heart-env@latest
code: ./train.py
command: >-
    python train.py
    --data ${{inputs.data}}
    --target_column ${{inputs.target_column}}
    --test_size ${{inputs.test_size}}
    --register_model ${{inputs.register_model}}
    --model_name ${{inputs.model_name}}
    --model_folder ${{outputs.model_folder}}
    --results_folder ${{outputs.results_folder}}

Writing assets/train/train.yml


In [18]:
%%writefile {TRAIN_DIR}/train.py

import argparse
import os
import sys
from pathlib import Path
import pandas as pd
from distutils.util import strtobool
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, recall_score
from xgboost import XGBClassifier
import mlflow
from mlflow.models.signature import infer_signature



# Command-line interface; the flags mirror the inputs/outputs declared in train.yml.
parser = argparse.ArgumentParser("score")
parser.add_argument('--data', type=str)
parser.add_argument('--target_column', type=str)
parser.add_argument('--test_size', type=float, default = 0.3)#, required=False)
# Accepts textual booleans such as "True"/"false" and converts them to bool.
parser.add_argument("--register_model", type=lambda x: bool(strtobool(x)))
parser.add_argument('--model_name', type=str, default = 'best_model')
parser.add_argument('--model_folder', type=str)
parser.add_argument('--results_folder', type=str)
args = parser.parse_args()
print(vars(args))

try:
    with mlflow.start_run(nested=True):
        mlflow.xgboost.autolog(log_models=False)

        # Sorted for a stable concatenation order across runs.
        input_files = sorted(Path(args.data).rglob('*.csv'))
        print(input_files)
        if not input_files:
            # pd.concat on an empty sequence fails with an unrelated message.
            raise FileNotFoundError(f'No CSV files found under {args.data}')
        df = pd.concat(map(pd.read_csv, input_files), ignore_index=True)
        # Show a preview instead of dumping a whole input file into the job log.
        print(df.head())

        if args.test_size > 0:
            train, test = train_test_split(df, test_size = args.test_size)
        else:
            # No hold-out requested: evaluate on the training data itself.
            train = df
            test = df
        train_features = train.drop(columns=[args.target_column])
        train_target = train[args.target_column]

        # NOTE(review): scale_pos_weight=99 is hard-coded; confirm it matches the
        # class balance of this dataset.
        model = XGBClassifier(scale_pos_weight=99)
        model.fit(train_features, train_target)

        test_features = test.drop(columns=[args.target_column])
        predictions = model.predict(test_features)
        # Build the results in a new frame so neither the split slice nor (when
        # test is df) the full dataset is mutated in place.
        results = test.assign(
            Labels=predictions,
            Probabilities=model.predict_proba(test_features)[:, 1],
        )
        os.makedirs(args.results_folder, exist_ok=True)
        results.to_csv(
            os.path.join(args.results_folder, 'test_predictions.csv'), index=False
        )

        accuracy = accuracy_score(test[args.target_column], predictions)
        recall = recall_score(test[args.target_column], predictions)
        mlflow.log_metrics({'accuracy': accuracy, 'recall': recall})

        # Model logging
        signature = infer_signature(train_features, predictions)
        mlflow.xgboost.save_model(model, args.model_folder, signature=signature)

        if args.register_model:
            mlflow.xgboost.log_model(
                model,
                'model',
                signature=signature,
                # The parsed option is --model_name; args has no
                # registered_model_name attribute.
                registered_model_name = args.model_name,
            )
        else:
            mlflow.xgboost.log_model(model, 'model', signature=signature)
except Exception as e:
    exc_type, exc_obj, exc_tb = sys.exc_info()
    fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
    print(exc_type, fname, exc_tb.tb_lineno, e)
    # Re-raise so the script exits non-zero instead of reporting success.
    raise

Overwriting assets/train/train.py


## Creating cluster

In [14]:
from azure.ai.ml.entities import AmlCompute


def createCluster(cluster_name, size):
    """Return the compute cluster called ``cluster_name``, creating it if needed.

    Args:
        cluster_name: name of the compute target to look up or create.
        size: VM size used only when a new cluster has to be created.

    Returns:
        The compute object returned by the workspace: either the existing
        cluster or the newly provisioned one.
    """
    try:
        cpu_cluster = ws.compute.get(cluster_name)
    except Exception:
        # NOTE(review): any failure of the lookup is treated as "cluster missing".
        print("Creating a new cpu compute target...")
        cpu_cluster = AmlCompute(
            name=cluster_name,
            type="amlcompute",
            size=size,
            min_instances=0,
            max_instances=1,
            idle_time_before_scale_down=3000,
            tier="Dedicated",
        )
        # begin_create_or_update returns a poller; .result() waits for completion.
        cpu_cluster = ws.compute.begin_create_or_update(cpu_cluster).result()
        print(f'Cluster created successfully named {cpu_cluster.name} with size {cpu_cluster.size}')
    else:
        # Report the name that was actually looked up (the parameter, not the
        # CLUSTER_NAME global). Kept outside the try so that a problem in this
        # message can never be mistaken for a missing cluster.
        print(f'{cluster_name} exists!')
    return cpu_cluster

# Name and VM size of the compute cluster used to run the pipeline.
CLUSTER_NAME = 'iris-cluster'
CLUSTER_SIZE = 'Standard_D2_v2'

# Reuse the cluster when it already exists, otherwise provision it.
trainCluster = createCluster(CLUSTER_NAME, CLUSTER_SIZE)

Creating a new cpu compute target...
Cluster created successfully named iris-cluster with size STANDARD_D2_V2


## Pipeline creation

In [19]:
from azure.ai.ml import dsl, Input, Output, load_component

# Load the two command components from the YAML definitions written earlier.
data_preparation = load_component(source = f'{TRAIN_DIR}/process.yml')
train = load_component(source = f'{TRAIN_DIR}/train.yml')

# The decorator's compute argument is documented as the compute target *name*,
# so pass the cluster's name rather than the AmlCompute object itself.
@dsl.pipeline(compute = trainCluster.name, description = 'Batch pipeline prediction')
def uci_heart_classifier_trainer(input_data):
    """Pipeline: prepare the raw data, then train a classifier on the result.

    Args:
        input_data: pipeline input wired to the preparation step's ``raw_data``.

    Returns:
        Dict of pipeline outputs: ``model`` (training step's model folder),
        ``evaluation_results`` (training step's results folder) and
        ``transformations_output`` (preparation step's transformer folder).
    """
    prepared_data = data_preparation(raw_data=input_data)
    # The training step consumes the folder produced by the preparation step.
    trained_model = train(
        data = prepared_data.outputs.prepared_data,
        target_column = 'target',
        register_model = True,
        test_size = 0.3
    )

    return {
        'model': trained_model.outputs.model_folder,
        'evaluation_results': trained_model.outputs.results_folder,
        'transformations_output': prepared_data.outputs.transformations_output,
    }

# Instantiate the pipeline, feeding it the registered data asset by its id.
pipeline_job = uci_heart_classifier_trainer(
    Input(type = 'uri_folder', path = heart_disease_train_data.id)
)

In [20]:
# Submit the pipeline under the given experiment name.
pipeline_job_run = ws.jobs.create_or_update(
    pipeline_job, experiment_name="uci-heart-train-pipeline"
)
# Bare last expression: displays the submitted job in the cell output.
pipeline_job_run

[32mUploading train.py[32m (< 1 MB): 100%|██████████| 2.72k/2.72k [00:00<00:00, 104kB/s]
[39m



Experiment,Name,Type,Status,Details Page
uci-heart-train-pipeline,upbeat_sugar_8x13zyd6f6,pipeline,Preparing,Link to Azure Machine Learning studio


In [21]:
# Stream the job's logs; as the captured output shows, a failed child job
# surfaces here as a JobException.
ws.jobs.stream(pipeline_job_run.name)

RunId: upbeat_sugar_8x13zyd6f6
Web View: https://ml.azure.com/runs/upbeat_sugar_8x13zyd6f6?wsid=/subscriptions/2213e8b1-dbc7-4d54-8aff-b5e315df5e5b/resourcegroups/1-44deb668-playground-sandbox/workspaces/MLOPS101

Execution Summary
RunId: upbeat_sugar_8x13zyd6f6
Web View: https://ml.azure.com/runs/upbeat_sugar_8x13zyd6f6?wsid=/subscriptions/2213e8b1-dbc7-4d54-8aff-b5e315df5e5b/resourcegroups/1-44deb668-playground-sandbox/workspaces/MLOPS101


JobException: Exception : 
 {
    "error": {
        "code": "UserError",
        "message": "Pipeline has failed child jobs. Failed nodes: /trained_model. For more details and logs, please go to the job detail page and check the child jobs.",
        "message_format": "Pipeline has failed child jobs. {0}",
        "message_parameters": {},
        "reference_code": "PipelineHasStepJobFailed",
        "details": []
    },
    "environment": "eastus2",
    "location": "eastus2",
    "time": "2023-06-10T15:04:58.546618Z",
    "component_name": ""
} 

https://github.com/Azure/azureml-examples/blob/main/sdk/python/endpoints/batch/deploy-pipelines/training-with-components/sdk-deploy-and-test.ipynb