# Exercise09 : ML Pipeline

With an AML pipeline, you can build ML workflows for purposes such as the following.

- Build a retraining pipeline for MLOps integration.
- Build a batch-scoring pipeline instead of the real-time scoring in "[Exercise08 : Publish as a Web Service](./exercise08_publish_model.ipynb)".

An ML pipeline can be invoked in the following ways.

- Time-based schedule invocation
- On-demand invocation through the published endpoint (REST)
- Trigger-based invocation, such as on a file change or other combined events (with Azure Event Grid, Azure Logic Apps, etc.)

In this exercise, we create a training pipeline for MLOps integration. (See [here](https://docs.microsoft.com/en-us/azure/architecture/reference-architectures/ai/mlops-python) for a reference architecture that integrates with CI/CD tools.)

*back to [index](https://github.com/tsmatz/azureml-tutorial-tensorflow-v1/)*

## 1. Set Variables

Replace the bracketed strings below to set the required variables.

By using the ```az login --service-principal``` command, you can invoke the ML pipeline from CI/CD utilities, such as GitHub Actions, without an interactive login UI.
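As a rough sketch of how a CI/CD job might assemble this non-interactive login, the following Python helper builds the ```az login --service-principal``` argument list from environment variables. The helper name and the variable names `AZURE_CLIENT_ID`, `AZURE_CLIENT_SECRET`, and `AZURE_TENANT_ID` are assumptions for illustration and are not defined by this exercise. The command is only built here, not executed.

```python
import os


def build_sp_login_command(env=None):
    """Build the `az login --service-principal` argument list.

    The environment variable names are hypothetical; use whatever names
    your CI/CD system exposes for the service principal credentials.
    """
    env = os.environ if env is None else env
    required = ["AZURE_CLIENT_ID", "AZURE_CLIENT_SECRET", "AZURE_TENANT_ID"]
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    return [
        "az", "login", "--service-principal",
        "--username", env["AZURE_CLIENT_ID"],
        "--password", env["AZURE_CLIENT_SECRET"],
        "--tenant", env["AZURE_TENANT_ID"],
    ]
```

In a real job, the returned list could be passed to `subprocess.run(cmd, check=True)` before any `az ml` command is issued.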

In [1]:
my_resource_group = "{AML-RESOURCE-GROUP-NAME}"
my_workspace = "{AML-WORKSPACE-NAME}"

## 2. Create compute

Create a new AML compute cluster for running the pipeline.

When the pipeline is invoked, the compute starts. When the pipeline completes, the compute automatically scales down to zero nodes.

In [2]:
!az ml compute create --name mycluster01 \
  --resource-group $my_resource_group \
  --workspace-name $my_workspace \
  --type amlcompute \
  --min-instances 0 \
  --max-instances 1 \
  --size Standard_D2_v2

Command group 'ml compute' is in preview and under development. Reference and support levels: https://aka.ms/CLI_refstatus
{
  "id": "/subscriptions/b3ae1c15-4fef-4362-8c3a-5d804cdeb18d/resourceGroups/AzureML-rg/providers/Microsoft.MachineLearningServices/workspaces/ws01/computes/mycluster01",
  "idle_time_before_scale_down": 120,
  "location": "eastus",
  "max_instances": 1,
  "min_instances": 0,
  "name": "mycluster01",
  "network_settings": {},
  "provisioning_state": "Succeeded",
  "resourceGroup": "AzureML-rg",
  "size": "STANDARD_D2_V2",
  "ssh_public_access_enabled": true,
  "tier": "dedicated",
  "type": "amlcompute"
}

## 3. Save scripts

In this example, I create a pipeline for model training, evaluation, and model registration.<br>
The source code for each step is saved as follows.

- training script ```./pipeline_script/train.py```
- evaluation script ```./pipeline_script/evaluate.py```
- registration script ```./pipeline_script/register_model.py```

The model name (the sub folder name in the model directory) is written to a model info file (JSON text), which is passed to the next step.
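The handoff through the model info file can be sketched in isolation as follows. This is a minimal illustration, not the pipeline scripts themselves: the helper names, the folder name `1646038470`, and the base directory `/mnt/model_dir` are hypothetical, while the `model_folder_name` key and the `eval.txt` file name match what the scripts and the pipeline definition in this exercise use.

```python
import json
import os
import tempfile


def write_model_info(info_path, model_folder_name):
    """Producing step: record the exported model's sub folder name."""
    with open(info_path, "w") as f:
        json.dump({"model_folder_name": model_folder_name}, f)


def resolve_model_path(info_path, model_base_dir):
    """Consuming step: read the info file and rebuild the full model path."""
    with open(info_path) as f:
        info = json.load(f)
    return os.path.join(model_base_dir, info["model_folder_name"])


with tempfile.TemporaryDirectory() as tmp:
    info_file = os.path.join(tmp, "eval.txt")
    write_model_info(info_file, "1646038470")  # hypothetical folder name
    model_path = resolve_model_path(info_file, "/mnt/model_dir")  # hypothetical base dir
```

Passing only the folder name keeps the info file valid when the model directory is mounted at a different path in each step.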

In [3]:
import os
script_folder = './pipeline_script'
os.makedirs(script_folder, exist_ok=True)

In [4]:
%%writefile pipeline_script/train.py
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import sys
import os
import shutil
import argparse
import json
import math

import tensorflow as tf

FLAGS = None
batch_size = 100

#
# Define functions for Estimator
#

def _my_input_fn(filepath, num_epochs):
    # image - 784 (=28 x 28) elements of grey-scale values normalized to [0, 1]
    # label - digit (0, 1, ..., 9)
    data_queue = tf.train.string_input_producer(
        [filepath],
        num_epochs = num_epochs) # data is repeated and it raises OutOfRange when data is over
    data_reader = tf.TFRecordReader()
    _, serialized_exam = data_reader.read(data_queue)
    data_exam = tf.parse_single_example(
        serialized_exam,
        features={
            'image_raw': tf.FixedLenFeature([], tf.string),
            'label': tf.FixedLenFeature([], tf.int64)
        })
    data_image = tf.decode_raw(data_exam['image_raw'], tf.uint8)
    data_image.set_shape([784])
    data_image = tf.cast(data_image, tf.float32) * (1. / 255)
    data_label = tf.cast(data_exam['label'], tf.int32)
    data_batch_image, data_batch_label = tf.train.batch(
        [data_image, data_label],
        batch_size=batch_size)
    return {'inputs': data_batch_image}, data_batch_label

def _get_input_fn(filepath, num_epochs):
    return lambda: _my_input_fn(filepath, num_epochs)

def _my_model_fn(features, labels, mode):
    # with tf.device(...): # You can set device if using GPUs

    # define network and inference
    # (simple 2 fully connected hidden layer : 784->128->64->10)
    with tf.name_scope('hidden1'):
        weights = tf.Variable(
            tf.truncated_normal(
                [784, FLAGS.first_layer],
                stddev=1.0 / math.sqrt(float(784))),
            name='weights')
        biases = tf.Variable(
            tf.zeros([FLAGS.first_layer]),
            name='biases')
        hidden1 = tf.nn.relu(tf.matmul(features['inputs'], weights) + biases)
    with tf.name_scope('hidden2'):
        weights = tf.Variable(
            tf.truncated_normal(
                [FLAGS.first_layer, FLAGS.second_layer],
                stddev=1.0 / math.sqrt(float(FLAGS.first_layer))),
            name='weights')
        biases = tf.Variable(
            tf.zeros([FLAGS.second_layer]),
            name='biases')
        hidden2 = tf.nn.relu(tf.matmul(hidden1, weights) + biases)
    with tf.name_scope('softmax_linear'):
        weights = tf.Variable(
            tf.truncated_normal(
                [FLAGS.second_layer, 10],
                stddev=1.0 / math.sqrt(float(FLAGS.second_layer))),
            name='weights')
        biases = tf.Variable(
            tf.zeros([10]),
            name='biases')
        logits = tf.matmul(hidden2, weights) + biases
 
    # compute evaluation metrics
    predicted_indices = tf.argmax(input=logits, axis=1)
    if mode != tf.estimator.ModeKeys.PREDICT:
        label_indices = tf.cast(labels, tf.int32)
        accuracy = tf.metrics.accuracy(label_indices, predicted_indices)
        tf.summary.scalar('accuracy', accuracy[1]) # output to TensorBoard
 
        loss = tf.losses.sparse_softmax_cross_entropy(
            labels=labels,
            logits=logits)
 
    # define operations
    if mode == tf.estimator.ModeKeys.TRAIN:
        #global_step = tf.train.create_global_step()
        #global_step = tf.contrib.framework.get_or_create_global_step()
        global_step = tf.train.get_or_create_global_step()        
        optimizer = tf.train.GradientDescentOptimizer(
            learning_rate=FLAGS.learning_rate)
        train_op = optimizer.minimize(
            loss=loss,
            global_step=global_step)
        return tf.estimator.EstimatorSpec(
            mode,
            loss=loss,
            train_op=train_op)
    if mode == tf.estimator.ModeKeys.EVAL:
        eval_metric_ops = {
            'accuracy': accuracy
        }
        return tf.estimator.EstimatorSpec(
            mode,
            loss=loss,
            eval_metric_ops=eval_metric_ops)
    if mode == tf.estimator.ModeKeys.PREDICT:
        probabilities = tf.nn.softmax(logits, name='softmax_tensor')
        predictions = {
            'classes': predicted_indices,
            'probabilities': probabilities
        }
        export_outputs = {
            'prediction': tf.estimator.export.PredictOutput(predictions)
        }
        return tf.estimator.EstimatorSpec(
            mode,
            predictions=predictions,
            export_outputs=export_outputs)

def _my_serving_input_fn():
    inputs = {'inputs': tf.placeholder(tf.float32, [None, 784])}
    return tf.estimator.export.ServingInputReceiver(inputs, inputs)

_my_evaluation_input_fn = (tf.estimator.experimental.build_raw_supervised_input_receiver_fn(
    {'inputs': tf.placeholder(dtype=tf.float32, shape=[None, 784])},
    tf.placeholder(dtype=tf.int32, shape=[None])))

#
# Main
#

parser = argparse.ArgumentParser()
parser.add_argument(
    '--data_folder',
    type=str,
    default='./data',
    help='Folder path for input data')
parser.add_argument(
    '--chkpoint_folder',
    type=str,
    default='./logs',  # AML experiments logs folder
    help='Folder path for checkpoint files')
parser.add_argument(
    '--model_folder',
    type=str,
    default='./outputs',  # AML experiments outputs folder
    help='Folder path for model output')
parser.add_argument(
    '--model_info',
    type=str,
    help='JSON file path for saving model information')
parser.add_argument(
    '--learning_rate',
    type=float,
    default='0.07',
    help='Learning Rate')
parser.add_argument(
    '--first_layer',
    type=int,
    default='128',
    help='Neuron number for the first hidden layer')
parser.add_argument(
    '--second_layer',
    type=int,
    default='64',
    help='Neuron number for the second hidden layer')
FLAGS, unparsed = parser.parse_known_args()

# Clean checkpoint and model folder if exists
if os.path.exists(FLAGS.chkpoint_folder) :
    for file_name in os.listdir(FLAGS.chkpoint_folder):
        file_path = os.path.join(FLAGS.chkpoint_folder, file_name)
        if os.path.isfile(file_path):
            os.remove(file_path)
        elif os.path.isdir(file_path):
            shutil.rmtree(file_path)
if os.path.exists(FLAGS.model_folder) :
    for file_name in os.listdir(FLAGS.model_folder):
        file_path = os.path.join(FLAGS.model_folder, file_name)
        if os.path.isfile(file_path):
            os.remove(file_path)
        elif os.path.isdir(file_path):
            shutil.rmtree(file_path)

# Read TF_CONFIG
run_config = tf.estimator.RunConfig()

# Create Estimator
mnist_fullyconnected_classifier = tf.estimator.Estimator(
    model_fn=_my_model_fn,
    model_dir=FLAGS.chkpoint_folder,
    config=run_config)
train_spec = tf.estimator.TrainSpec(
    input_fn=_get_input_fn(os.path.join(FLAGS.data_folder, 'train.tfrecords'), 2),
    max_steps=60000 * 2 / batch_size)
eval_spec = tf.estimator.EvalSpec(
    input_fn=_get_input_fn(os.path.join(FLAGS.data_folder, 'test.tfrecords'), 1),
    steps=10000 * 1 / batch_size,
    start_delay_secs=0)

# Run training !
tf.estimator.train_and_evaluate(
    mnist_fullyconnected_classifier,
    train_spec,
    eval_spec
)

# Save model and parameters
model_folder = mnist_fullyconnected_classifier.experimental_export_all_saved_models(
    export_dir_base=FLAGS.model_folder,
    input_receiver_fn_map={
        tf.estimator.ModeKeys.EVAL: _my_evaluation_input_fn,
        tf.estimator.ModeKeys.PREDICT: _my_serving_input_fn
    })
print('current working directory is ', os.getcwd())
print('model is saved ', model_folder)

# Save model info
model_folder_name = os.path.basename(model_folder.decode("utf-8"))
model_info_dict = {'model_folder_name' : model_folder_name}
model_info_json = json.dumps(model_info_dict)
with open(FLAGS.model_info, "w") as f:
    f.write(model_info_json)

Writing pipeline_script/train.py
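The step counts and initial weight scales in the training script follow from simple arithmetic. The sketch below reproduces them, assuming the 60,000 training and 10,000 test examples implied by the script's constants and the default layer sizes (128 and 64). The helper names are hypothetical, and the sketch uses integer division so that the counts are whole numbers, whereas the script itself uses `/`.

```python
import math

batch_size = 100


def steps_for(num_examples, num_epochs, batch_size):
    """Number of full batches needed to consume the data num_epochs times."""
    return (num_examples * num_epochs) // batch_size


def init_stddev(fan_in):
    """Standard deviation used for truncated-normal weights: 1 / sqrt(fan_in)."""
    return 1.0 / math.sqrt(float(fan_in))


train_steps = steps_for(60000, 2, batch_size)  # 2 epochs over the training set
eval_steps = steps_for(10000, 1, batch_size)   # 1 pass over the test set

# fan-in of each layer with the default --first_layer and --second_layer values
stddevs = {
    "hidden1": init_stddev(784),
    "hidden2": init_stddev(128),
    "softmax_linear": init_stddev(64),
}
```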


In [5]:
%%writefile pipeline_script/evaluate.py
import os
import argparse
import json

import tensorflow as tf

from azureml.core import Run

run = Run.get_context()

FLAGS = None
batch_size = 100

def _my_input_fn(filepath, num_epochs):
    # image - 784 (=28 x 28) elements of grey-scale values normalized to [0, 1]
    # label - digit (0, 1, ..., 9)
    data_queue = tf.train.string_input_producer(
        [filepath],
        num_epochs = num_epochs) # data is repeated and it raises OutOfRange when data is over
    data_reader = tf.TFRecordReader()
    _, serialized_exam = data_reader.read(data_queue)
    data_exam = tf.parse_single_example(
        serialized_exam,
        features={
            'image_raw': tf.FixedLenFeature([], tf.string),
            'label': tf.FixedLenFeature([], tf.int64)
        })
    data_image = tf.decode_raw(data_exam['image_raw'], tf.uint8)
    data_image.set_shape([784])
    data_image = tf.cast(data_image, tf.float32) * (1. / 255)
    data_label = tf.cast(data_exam['label'], tf.int32)
    data_batch_image, data_batch_label = tf.train.batch(
        [data_image, data_label],
        batch_size=batch_size)
    return {'inputs': data_batch_image}, data_batch_label

def _get_input_fn(filepath, num_epochs):
    return lambda: _my_input_fn(filepath, num_epochs)

parser = argparse.ArgumentParser()
parser.add_argument(
    '--data_folder',
    type=str,
    default='./data',
    help='Folder path for input data')
parser.add_argument(
    '--model_folder',
    type=str,
    default='./model',
    help='Folder path for model base dir')
parser.add_argument(
    '--model_input_info',
    type=str,
    default='./model_input_info',
    help='File path for model evaluation info')
parser.add_argument(
    '--model_output_info',
    type=str,
    default='./model_output_info',
    help='File path for model registration info')
FLAGS, unparsed = parser.parse_known_args()

# Get model folder
f = open(FLAGS.model_input_info)
model_input_json = json.load(f)
model_folder_fullpath = os.path.join(FLAGS.model_folder, model_input_json['model_folder_name'])
f.close()

# Load model
est = tf.contrib.estimator.SavedModelEstimator(model_folder_fullpath)

# Evaluate !
eval_results = est.evaluate(
    input_fn=_get_input_fn(os.path.join(FLAGS.data_folder, 'test.tfrecords'), 1),
    steps=10000 * 1 / batch_size)

# Result check
# (If accuracy doesn't reach the threshold, cancel the pipeline)
if eval_results['metrics/accuracy'] >= 0.92:
    # Evaluation Success
    print(
        "Evaluation has passed. "
        "Accuracy: {}, Loss: {}".format(
            eval_results['metrics/accuracy'], eval_results['loss']
        )
    )
    # Save model info
    model_info_dict = {
        'model_folder_name' : model_input_json['model_folder_name'],
        'evaluation' : True
    }
    model_output_json = json.dumps(model_info_dict)
    f = open(FLAGS.model_output_info,"w")
    f.write(model_output_json)
    f.close()

else:
    # Evaluation Failure
    print(
        "Evaluation has failed. "
        "Accuracy: {}, Loss: {}".format(
            eval_results['metrics/accuracy'], eval_results['loss']
        )
    )
    # Cancel pipeline
    run.parent.cancel()

Writing pipeline_script/evaluate.py
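The pass/fail decision in the evaluation script can be isolated from TensorFlow and Azure ML, which makes it easy to test on its own. The following is a minimal sketch under that assumption: the function name is hypothetical, while the default threshold of 0.92 and the dictionary keys are taken from the script.

```python
def build_registration_info(model_folder_name, accuracy, threshold=0.92):
    """Return the info dict for the registration step, or None to stop.

    A None result corresponds to the branch in which the evaluation
    script cancels the parent pipeline run.
    """
    if accuracy >= threshold:
        return {"model_folder_name": model_folder_name, "evaluation": True}
    return None
```

Keeping the gate as a pure function lets a CI job check the threshold logic without starting a compute cluster.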


In [6]:
%%writefile pipeline_script/register_model.py
import os
import argparse
import json
from azureml.core import Run, Dataset
from azureml.core.model import Model

run = Run.get_context()

parser = argparse.ArgumentParser()
parser.add_argument(
    '--model_name',
    type=str,
    default='mlops-test-model',
    help='Model name for registration')
parser.add_argument(
    '--model_folder',
    type=str,
    default='./model',
    help='Folder path for model base dir')
parser.add_argument(
    '--model_input_info',
    type=str,
    default='./model_input_info',
    help='File path for model info')
FLAGS, unparsed = parser.parse_known_args()

f = open(FLAGS.model_input_info)

# Check evaluation result
model_input_json = json.load(f)
if model_input_json['evaluation']:
    # Get model folder
    model_folder_fullpath = os.path.join(FLAGS.model_folder, model_input_json['model_folder_name'])

    # Register model !
    model = Model.register(
        workspace=run.experiment.workspace,
        model_name=FLAGS.model_name,
        model_path=model_folder_fullpath)

f.close()

Writing pipeline_script/register_model.py


## 4. Run ML pipeline

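Define the pipeline job in YAML with three steps (```train-model```, ```evaluate-model```, and ```register-model```), and then submit it with the ```az ml job create``` command.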

> Note : In this example, I also use the registered dataset (train.tfrecords, test.tfrecords) named ```mnist_tfrecords_dataset```, which is mounted on the compute target. Run "[Exercise02 : Prepare Data](./exercise02_prepare_data.ipynb)" to prepare the dataset.

In [7]:
%%writefile 09_training_pipeline_job.yml
$schema: https://azuremlschemas.azureedge.net/latest/pipelineJob.schema.json
type: pipeline
display_name: training-pipeline01
experiment_name: training-pipeline01
inputs:
  mnist_tf:
    dataset: azureml:mnist_tfrecords_dataset:1
jobs:
  train-model:
    command: >-
      python train.py
      --data_folder ${{inputs.mnist_tf}}
      --model_folder ${{outputs.model_dir}}
      --model_info ${{outputs.model_info}}/eval.txt
    code:
      local_path: pipeline_script
    outputs:
      model_dir:
      model_info:
    environment: azureml:AzureML-TensorFlow-1.13-CPU:30
    compute: azureml:mycluster01
  evaluate-model:
    command: >-
      python evaluate.py
      --data_folder ${{inputs.mnist_tf}}
      --model_folder ${{inputs.model_dir}}
      --model_input_info ${{inputs.model_info}}/eval.txt
      --model_output_info ${{inputs.model_info}}/reg.txt
    code:
      local_path: pipeline_script
    inputs:
      model_dir: ${{jobs.train-model.outputs.model_dir}}
      model_info: ${{jobs.train-model.outputs.model_info}}
    environment: azureml:AzureML-TensorFlow-1.13-CPU:30
    compute: azureml:mycluster01
  register-model:
    command: >-
      python register_model.py
      --model_name "mlops-test-model"
      --model_folder ${{inputs.model_dir}}
      --model_input_info ${{inputs.model_info}}/reg.txt
    code:
      local_path: pipeline_script
    inputs:
      model_dir: ${{jobs.train-model.outputs.model_dir}}
      model_info: ${{jobs.train-model.outputs.model_info}}
    environment: azureml:AzureML-TensorFlow-1.13-CPU:30
    compute: azureml:mycluster01

Writing 09_training_pipeline_job.yml
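The `${{jobs.<job>.outputs.<name>}}` expressions in the YAML above are what tie one step to another. As an illustration of how those bindings read, the sketch below copies the step inputs into a plain Python dictionary and extracts the upstream job of each step with a regular expression. This is only a reading aid with hypothetical names; it is not how Azure ML resolves the graph internally.

```python
import re

# Step inputs copied from the pipeline YAML, written as a plain dict so that
# no YAML parser is needed for this illustration.
job_inputs = {
    "train-model": {},
    "evaluate-model": {
        "model_dir": "${{jobs.train-model.outputs.model_dir}}",
        "model_info": "${{jobs.train-model.outputs.model_info}}",
    },
    "register-model": {
        "model_dir": "${{jobs.train-model.outputs.model_dir}}",
        "model_info": "${{jobs.train-model.outputs.model_info}}",
    },
}

# Matches ${{jobs.<job-name>.outputs.<output-name>}}
BINDING = re.compile(r"\$\{\{jobs\.([\w-]+)\.outputs\.([\w-]+)\}\}")


def upstream_jobs(inputs):
    """Return the set of job names whose outputs these inputs bind to."""
    return {
        match.group(1)
        for value in inputs.values()
        for match in BINDING.finditer(value)
    }


dependencies = {job: sorted(upstream_jobs(inputs)) for job, inputs in job_inputs.items()}
```

Read this way, both `evaluate-model` and `register-model` bind only to outputs of `train-model`, which is worth keeping in mind when reasoning about the order in which the steps run.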


In [8]:
!az ml job create --file 09_training_pipeline_job.yml \
  --resource-group $my_resource_group \
  --workspace-name $my_workspace

Command group 'ml job' is in preview and under development. Reference and support levels: https://aka.ms/CLI_refstatus
'name', 'display_name', and 'experiment_name' cannot be configured for a child job within a pipeline job. These settings will be ignored.
Uploading pipeline_script (0.01 MBs): 100%|█| 12075/12075 [00:00<00:00, 160286.0
'name', 'display_name', and 'experiment_name' cannot be configured for a child job within a pipeline job. These settings will be ignored.
'name', 'display_name', and 'experiment_name' cannot be configured for a child job within a pipeline job. These settings will be ignored.
{
  "creation_context": {
    "created_at": "2022-02-28T08:54:30.366004+00:00",
    "created_by": "Tsuyoshi Matsuzaki",
    "created_by_type": "User"
  },
  "display_name": "training-pipeline01",
  "experiment_name": "training-pipeline01",
  "id": "azureml:/subscriptions/b3ae1c15-4fef-4362-8c3a-5d804cdeb18d/resourceGroups/AzureML-rg/providers/Microsoft.Machin

In [13]:
# TODO: how should this pipeline be published?
from azureml.pipeline.core import Pipeline
import uuid

train_pipeline = Pipeline(workspace=ws, steps=[train_step, eval_step, reg_step])
train_pipeline._set_experiment_name
train_pipeline.validate()
published_pipeline = train_pipeline.publish(
    name="training-pipeline01",
    description="Model training/retraining pipeline",
    version=str(uuid.uuid4()),
)

Step Train Model is ready to be created [ef6019bf]
Step Evaluate Model is ready to be created [91e50394]
Step Register Model is ready to be created [1f84f98e]
Created step Train Model [ef6019bf][d036f40e-f502-4079-9b24-0f5c2f7233d1], (This step will run and generate new outputs)
Created step Evaluate Model [91e50394][bb9ed118-8e70-4a60-b20e-164756001f0c], (This step will run and generate new outputs)
Created step Register Model [1f84f98e][de7e773a-3c6e-4365-a0be-f9ec4915df2e], (This step will run and generate new outputs)


Go to [AML studio UI](https://ml.azure.com/), where you can see the pipeline graph visually. (See the following screenshot.)

![pipeline graph](https://tsmatz.files.wordpress.com/2021/10/20211027_ml_pipeline.jpg)

## 7. Run ML pipeline

When integrating with CI/CD tools, you can submit a new run of this published pipeline on demand through its REST endpoint.

In [14]:
# See endpoint url
published_pipeline.endpoint

'https://eastus.api.azureml.ms/pipelines/v1.0/subscriptions/b3ae1c15-4fef-4362-8c3a-5d804cdeb18d/resourceGroups/TEST20211027/providers/Microsoft.MachineLearningServices/workspaces/ws02/PipelineRuns/PipelineSubmit/10121bb5-8b8b-4696-8225-efa5cb7efa73'
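For a CI/CD tool that cannot use the SDK, an on-demand run can be requested with a plain HTTP POST to the endpoint URL shown above. The sketch below only builds such a request with the standard library and does not send it. The `ExperimentName` body key and the bearer-token header follow the request shape described in the Azure ML SDK v1 documentation for published pipelines, but treat them as assumptions to verify. The function name, URL, and token here are placeholders.

```python
import json
import urllib.request


def build_pipeline_submit_request(endpoint_url, aad_token, experiment_name):
    """Build (but do not send) a POST request that submits a pipeline run."""
    body = json.dumps({"ExperimentName": experiment_name}).encode("utf-8")
    return urllib.request.Request(
        endpoint_url,
        data=body,
        headers={
            "Authorization": "Bearer " + aad_token,  # Azure AD access token
            "Content-Type": "application/json",
        },
        method="POST",
    )


request = build_pipeline_submit_request(
    "https://example.invalid/PipelineSubmit/placeholder-id",  # placeholder URL
    "<aad-token>",                                            # placeholder token
    "pipeline_experiment01",
)
# urllib.request.urlopen(request) would send it; omitted in this sketch.
```

In practice the URL would come from `published_pipeline.endpoint`, and the token from the service principal login used by the CI/CD job.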

Let's submit a new run using the Python AML SDK.
See the progress and results in [AML studio UI](https://ml.azure.com/).

In [15]:
from azureml.core import Experiment

exp = Experiment(workspace=ws, name='pipeline_experiment01')
pipeline_run = exp.submit(published_pipeline)

Submitted PipelineRun e0d5c0fd-9240-4a2b-bfde-3ec1cbe0a68b
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/e0d5c0fd-9240-4a2b-bfde-3ec1cbe0a68b?wsid=/subscriptions/b3ae1c15-4fef-4362-8c3a-5d804cdeb18d/resourcegroups/TEST20211027/workspaces/ws02&tid=72f988bf-86f1-41af-91ab-2d7cd011db47


## 8. Remove Compute

In [16]:
# Delete cluster (nodes) and remove from AML workspace
from azureml.core.compute import AmlCompute

mycompute = AmlCompute(workspace=ws, name='mycluster01')
mycompute.delete()


In [10]:
%%writefile 09_training_pipeline_job2.yml
$schema: https://azuremlschemas.azureedge.net/latest/pipelineJob.schema.json
type: pipeline
display_name: training-pipeline02
experiment_name: training-pipeline02
inputs:
  mnist_tf:
    dataset: azureml:mnist_tfrecords_dataset:1
jobs:
  train-model:
    command: >-
      python train.py
      --data_folder ${{inputs.mnist_tf}}
      --model_folder ${{outputs.model_dir}}
      --model_info ${{outputs.model_info}}/eval.txt
    code:
      local_path: pipeline_script
    outputs:
      model_dir:
      model_info:
    environment: azureml:AzureML-TensorFlow-1.13-CPU:30
    compute: azureml:mycluster01
  evaluate-model:
    command: >-
      python evaluate.py
      --data_folder ${{inputs.mnist_tf}}
      --model_folder ${{inputs.model_dir}}
      --model_input_info ${{inputs.model_info}}/eval.txt
      --model_output_info ${{inputs.model_info}}/reg.txt
    code:
      local_path: pipeline_script
    inputs:
      model_dir: ${{jobs.train-model.outputs.model_dir}}
      model_info: ${{jobs.train-model.outputs.model_info}}
    environment: azureml:AzureML-TensorFlow-1.13-CPU:30
    compute: azureml:mycluster01
  register-model:
    command: >-
      python register_model.py
      --model_name "mlops-test-model"
      --model_folder ${{inputs.model_dir}}
      --model_input_info ${{inputs.model_info}}/reg.txt
    code:
      local_path: pipeline_script
    inputs:
      model_dir: ${{jobs.train-model.outputs.model_dir}}
      model_info: ${{jobs.train-model.outputs.model_info}}
    environment: azureml:AzureML-TensorFlow-1.13-CPU:30
    compute: azureml:mycluster01

Overwriting 09_training_pipeline_job2.yml


In [None]:
!az ml job create --file 09_training_pipeline_job2.yml \
  --resource-group $my_resource_group \
  --workspace-name $my_workspace