Copyright (c) Microsoft Corporation. All rights reserved.

Licensed under the MIT License.

# Build ML Pipeline with do-while node

In the [get started](../../../samples/basics/get-started.ipynb) tutorial, we introduced how to build an ML pipeline using Azure Machine Learning components. In this tutorial, you will learn how to build a pipeline with a do-while node, which repeatedly runs a loop body until its condition stops the loop or a maximum iteration count is reached.



## Prerequisites
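* Install the Azure CLI with the azure-cli-ml extension by following the [instructions here](setup-environment.ipynb). The code below also uses the `azure-ai-ml`, `azure-identity` and `mldesigner` Python packages.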

## 1. Connect to the Azure Machine Learning workspace

The [workspace](https://docs.microsoft.com/en-us/azure/machine-learning/concept-workspace) is the top-level resource for Azure Machine Learning, providing a centralized place to work with all the artifacts you create when you use Azure Machine Learning. In this section we will connect to the workspace in which the job will be run.

### 1.1 Enable private features

In [None]:
# Import required libraries
import os


# Opt in to the SDK's private (preview) features through an environment
# variable. Setting it via os.environ also makes it visible to any child
# processes started later from this kernel.
os.environ.update(AZURE_ML_CLI_PRIVATE_FEATURES_ENABLED="True")

### 1.2 Get a handle to the workspace

We use a config file to connect to a workspace. The Azure ML workspace should be configured with a compute cluster (this notebook uses one named `cpu-cluster`). [Check this notebook to configure a workspace](../../configuration.ipynb).

In [None]:
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential


# Try DefaultAzureCredential first. Requesting a management-plane token up
# front detects an unusable default credential here, so we can fall back
# before any workspace call is made.
try:
    credential = DefaultAzureCredential()
    # Check if given credential can get token successfully.
    credential.get_token("https://management.azure.com/.default")
except Exception:
    # Deliberately broad: any failure to obtain a token from the default
    # credential falls back to an interactive browser login instead of
    # aborting the notebook.
    credential = InteractiveBrowserCredential()

# Get a handle to workspace (details are read from the workspace config file).
ml_client = MLClient.from_config(credential=credential)

# Retrieve an already attached Azure Machine Learning Compute.
# Looking it up here should surface a missing or misnamed cluster before the
# pipeline is built; the same name is used as the pipeline's default compute.
cluster_name = "cpu-cluster"
print(ml_client.compute.get(cluster_name))

## 2. Build a pipeline with a do-while node
### 2.1 Build a sub-pipeline as the body of the do-while node

In [None]:
from azure.ai.ml import Input
from azure.ai.ml.dsl import pipeline
from components.aggregate_train.aggregate_train_model import (
    aggregated_train as aggregate_train_model_func,
)
from components.process_data.process_data import process_data as process_data_func
from components.train_model.train_model import train_model as train_model_func


# Sub-pipeline run by one silo in a federated-learning round:
# process_data -> train_model -> process_data.
# Local variable names and the docstring are intentionally left unchanged:
# the @pipeline dsl builder presumably derives node names from the locals and
# uses the docstring as the component description — TODO confirm.
@pipeline()
def silo_pipeline(input_model: Input):
    """Silo for each iteration in federated learning, contains pre_process, training and post_process."""
    # Pre-process the incoming model input; its `output_data` feeds training.
    pre_data_process = process_data_func(input_data=input_model)
    training_node = train_model_func(input_model=pre_data_process.outputs.output_data)
    # Post-process the trained model with the same process_data component.
    post_data_process = process_data_func(input_data=training_node.outputs.output_model)
    # The post-processing node's outputs (including `output_data`, which
    # federated_learning_body consumes) become this sub-pipeline's outputs.
    return post_data_process.outputs


# Body of one federated-learning round: two independent silo sub-pipelines
# start from the same input model, then aggregate_train combines their results.
# Local variable names and the docstring are kept verbatim because the
# @pipeline dsl builder presumably uses them for node names and the component
# description — TODO confirm.
@pipeline()
def federated_learning_body(input_model: Input):
    """Body of each iteration of federated learning, include multi silos and aggregate node."""
    # Both silos receive the same input model for this round.
    silo_1 = silo_pipeline(input_model=input_model)
    silo_2 = silo_pipeline(input_model=input_model)
    # Combine the two silos' `output_data` outputs into one aggregated result.
    aggregate_node = aggregate_train_model_func(
        model_1=silo_1.outputs.output_data,
        model_2=silo_2.outputs.output_data,
    )
    # The aggregate node's outputs become the body's outputs; the parent
    # pipeline reads `output` (loop condition) and `agg_output` (fed back as
    # `input_model`) from them.
    return aggregate_node.outputs

### 2.2 Build the parent pipeline with a do-while node

In [None]:
from mldesigner import dsl
from azure.ai.ml import Input


# define pipeline with do_while node
# Parent pipeline: runs federated_learning_body inside a do-while loop, then
# aggregates the body's `agg_output` once more after the loop node.
@pipeline()
def federated_learning_pipeline(train_model: Input):
    # federated_learning iteration body
    loop_body = federated_learning_body(input_model=train_model)
    # Wrap the body in a do-while node:
    #   - condition: the body's `output` output controls whether another
    #     iteration runs (presumably a boolean emitted by aggregate_train —
    #     TODO confirm);
    #   - mapping: the body's `agg_output` is fed back as the body's
    #     `input_model` for the next iteration;
    #   - max_iteration_count: at most 5 iterations.
    # The node is bound to a local even though it is unused (hence the noqa);
    # the dsl builder presumably names the node after this variable.
    federated_learning_dowhile_node = dsl.do_while(  # noqa: F841
        body=loop_body,
        condition=loop_body.outputs.output,
        mapping={
            "agg_output": loop_body.inputs.input_model,
        },
        max_iteration_count=5,
    )
    # Consume the body's `agg_output` downstream of the loop (presumably the
    # last iteration's value); both model inputs receive the same output.
    aggregate_node = aggregate_train_model_func(  # noqa: F841
        model_1=loop_body.outputs.agg_output,
        model_2=loop_body.outputs.agg_output,
    )


# Instantiate the pipeline job, seeding the loop with the local placeholder
# model folder.
dummy_model_input = Input(type="custom_model", path="./dummy_model")
pipeline_job = federated_learning_pipeline(train_model=dummy_model_input)

# Pipeline-level default compute: the cluster looked up when connecting.
pipeline_job.settings.default_compute = cluster_name

In [None]:
# validate pipeline
# Validate the pipeline job definition before submitting it. As the last
# expression in the cell, the validation result is displayed by the notebook.
ml_client.jobs.validate(pipeline_job)

### 2.3 Run the pipeline job

In [None]:
# Submit the pipeline job under the "pipeline_samples" experiment.
# Pipeline inputs and settings could still be changed on `pipeline_job` before
# this call; none are overridden here. The returned job (rebound to
# `pipeline_job`) is the submitted instance.
pipeline_job = ml_client.jobs.create_or_update(
    pipeline_job, experiment_name="pipeline_samples"
)

# show detail information of job (displayed as the cell's last expression)
pipeline_job

In [None]:
# Wait until the job completes, streaming its output into the notebook;
# the job is looked up by the name of the submitted pipeline job.
ml_client.jobs.stream(pipeline_job.name)