https://github.com/cloudacademy/using-the-azure-machine-learning-sdk/blob/main/05-Creating_a_Pipeline_sdk.ipynb

## Prepare a Compute Environment for the Pipeline Steps

In [None]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_name = "qa-azureml-sdk"

# Verify that cluster exists
try:
    pipeline_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # If not, create it
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D2_V2',
                                                           max_nodes=4,
                                                           idle_seconds_before_scaledown=1800)
    pipeline_cluster = ComputeTarget.create(ws, cluster_name, compute_config)

pipeline_cluster.wait_for_completion(show_output=True)

from azureml.core import Environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import RunConfiguration

# Create a Python environment for the experiment
diabetes_env = Environment("diabetes-pipeline-env")
diabetes_env.python.user_managed_dependencies = False # Let Azure ML manage dependencies
diabetes_env.docker.enabled = True # Use a docker container

# Create a set of package dependencies
diabetes_packages = CondaDependencies.create(conda_packages=['scikit-learn','pandas'],
                                             pip_packages=['azureml-sdk'])

# Add the dependencies to the environment
diabetes_env.python.conda_dependencies = diabetes_packages

# Register the environment (just in case you want to use it again)
diabetes_env.register(workspace=ws)
registered_env = Environment.get(ws, 'diabetes-pipeline-env')

# Create a new runconfig object for the pipeline
pipeline_run_config = RunConfiguration()

# Use the compute you created above. 
pipeline_run_config.target = pipeline_cluster

# Assign the environment to the run configuration
pipeline_run_config.environment = registered_env

print ("Run configuration created.")

## Create and Run a Pipeline
Now you're ready to define and run the pipeline.

- First you need to define the steps for the pipeline, and any data references that need to passed between them.
- In this case, the first step must write the model to a folder that can be read from by the second step.
- Since the steps will be run on remote compute (and in fact, could each be run on different compute), the folder path must be passed as a data reference to a location in a datastore within the workspace.
- The PipelineData object is a special kind of data reference that is used to pass data from the output of one pipeline step to the input of another, creating a dependency between them.
- You'll create one and use it as the output for the first step and the input for the second step.
- Note that you also need to pass it as a script argument so your code can access the datastore location referenced by the data reference.

In [None]:
from azureml.pipeline.core import PipelineData
from azureml.pipeline.steps import PythonScriptStep, EstimatorStep
from azureml.train.estimator import Estimator

# Get the training dataset
diabetes_ds = ws.datasets.get("diabetes dataset")

# Create a PipelineData (temporary Data Reference) for the model folder
model_folder = PipelineData("model_folder", datastore=ws.get_default_datastore())

estimator = Estimator(source_directory=experiment_folder,
                        compute_target = pipeline_cluster,
                        environment_definition=pipeline_run_config.environment,
                        entry_script='train_diabetes.py')

# Step 1, run the estimator to train the model
train_step = EstimatorStep(name = "Train Model",
                           estimator=estimator, 
                           estimator_entry_script_arguments=['--output_folder', model_folder],
                           inputs=[diabetes_ds.as_named_input('diabetes_train')],
                           outputs=[model_folder],
                           compute_target = pipeline_cluster,
                           allow_reuse = True)

# Step 2, run the model registration script
register_step = PythonScriptStep(name = "Register Model",
                                source_directory = experiment_folder,
                                script_name = "register_diabetes.py",
                                arguments = ['--model_folder', model_folder],
                                inputs=[model_folder],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

print("Pipeline steps defined")

In [None]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline
from azureml.widgets import RunDetails

# Construct the pipeline
pipeline_steps = [train_step, register_step]
pipeline = Pipeline(workspace = ws, steps=pipeline_steps)
print("Pipeline is built.")

# Create an experiment and run the pipeline
experiment = Experiment(workspace = ws, name = 'diabetes-training-pipeline')
pipeline_run = experiment.submit(pipeline, regenerate_outputs=True)
print("Pipeline submitted for execution.")

RunDetails(pipeline_run).show()
pipeline_run.wait_for_completion()

The widget above shows details of the pipeline as it runs. You can also monitor pipeline runs in the Experiments page in Azure Machine Learning studio.

When the pipeline has finished, a new model should be registered with a Training context tag indicating it was trained in a pipeline. Run the following code to verify this.

In [None]:
from azureml.core import Model

for model in Model.list(ws):
    print(model.name, 'version:', model.version)
    for tag_name in model.tags:
        tag = model.tags[tag_name]
        print ('\t',tag_name, ':', tag)
    for prop_name in model.properties:
        prop = model.properties[prop_name]
        print ('\t',prop_name, ':', prop)
    print('\n')

## Publish the Pipeline
Now that you've created a pipeline and verified it works, you can publish it as a REST service.

In [None]:
published_pipeline = pipeline.publish(name="Diabetes_Training_Pipeline",
                                      description="Trains diabetes model",
                                      version="1.0")
rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)

To use the endpoint, client applications need to make a REST call over HTTP. This request must be authenticated, so an authorization header is required. A real application would require a service principal with which to be authenticated, but to test this out, we'll use the authorization header from your current connection to your Azure workspace, which you can get using the following code:

In [None]:
from azureml.core.authentication import InteractiveLoginAuthentication

interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()

In [None]:
import requests
experiment_name = 'Run-diabetes-pipeline'

response = requests.post(rest_endpoint, 
                         headers=auth_header, 
                         json={"ExperimentName": experiment_name})
run_id = response.json()["Id"]
run_id


Since you have the run ID, you can use the RunDetails widget to view the experiment as it runs.

Note: The pipeline should complete quickly, because each step was configured to allow output reuse. This was done primarily for convenience and to save time in this example. In reality, you'd likely want the first step to run every time in case the data has changed, and trigger the subsequent steps only if the output from step one changes.

In [None]:
from azureml.pipeline.core.run import PipelineRun
from azureml.widgets import RunDetails

published_pipeline_run = PipelineRun(ws.experiments[experiment_name], run_id)
RunDetails(published_pipeline_run).show()