![image.png](attachment:image.png)

### 1. Connect to Your Workspace

In [1]:
import azureml.core
from azureml.core import Workspace

# Connect to the Azure ML workspace described by the saved config file
ws = Workspace.from_config()

# Report which SDK version is in use and which workspace we are connected to
sdk_version = azureml.core.VERSION
print('Ready to use Azure ML {} to work with {}'.format(sdk_version, ws.name))

Performing interactive authentication. Please follow the instructions on the terminal.
To sign in, use a web browser to open the page https://microsoft.com/devicelogin and enter the code ACMWGQDXU to authenticate.
Interactive authentication successfully completed.
Ready to use Azure ML 1.6.0 to work with dp101-workspace


### 2. Prepare the Training Data

In [2]:
from azureml.core import Dataset

default_ds = ws.get_default_datastore()

if 'diabetes dataset' in ws.datasets:
    print('Dataset already registered.')
else:
    # Upload the local diabetes CSV files into the diabetes-data/ folder of the
    # default datastore, replacing any existing files with the same name.
    csv_files = ['../mslearn-aml-labs/data/diabetes.csv',
                 '../mslearn-aml-labs/data/diabetes2.csv']
    default_ds.upload_files(files=csv_files,
                            target_path='diabetes-data/',
                            overwrite=True,
                            show_progress=True)

    # Build a tabular dataset from every CSV in that folder (this may take a short while)
    tab_data_set = Dataset.Tabular.from_delimited_files(path=(default_ds, 'diabetes-data/*.csv'))

    # Register the tabular dataset; a registration error is printed rather than
    # raised so the rest of the notebook can still be inspected.
    try:
        tab_data_set = tab_data_set.register(workspace=ws,
                                             name='diabetes dataset',
                                             description='diabetes data',
                                             tags={'format': 'CSV'},
                                             create_new_version=True)
        print('Dataset registered.')
    except Exception as ex:
        print(ex)

Dataset already registered.


### 3. Create Scripts
The training script includes a parameter named **output_folder**, which references the folder where the trained model should be saved.

In [3]:
import os

# Folder that holds the script file for each pipeline step;
# exist_ok makes re-running this cell harmless.
experiment_folder = 'diabetes_pipeline'
os.makedirs(name=experiment_folder, exist_ok=True)
print(experiment_folder)

diabetes_pipeline


In [4]:
%%writefile $experiment_folder/train_diabetes.py
# Import libraries
from azureml.core import Run
import argparse
import pandas as pd
import numpy as np
import joblib
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier #DecisionTreeClassifier
from sklearn.metrics import roc_auc_score

# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument('--output_folder', type=str, dest='output_folder', default="diabetes_model", help='output folder')
args = parser.parse_args()
output_folder = args.output_folder

# Get the experiment run context
run = Run.get_context()

# load the diabetes data (passed as an input dataset)
print("Loading Data...")
diabetes = run.input_datasets['diabetes_train'].to_pandas_dataframe()

# Separate features and labels
X, y = diabetes[['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree','Age']].values, diabetes['Diabetic'].values

# Split data into training set and test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, random_state=0)

# Train a decision tree model
print('Training a decision tree model')
model = DecisionTreeClassifier().fit(X_train, y_train)

# calculate accuracy
y_hat = model.predict(X_test)
acc = np.average(y_hat == y_test)
print('Accuracy:', acc)
run.log('Accuracy', np.float(acc))

# calculate AUC
y_scores = model.predict_proba(X_test)
auc = roc_auc_score(y_test,y_scores[:,1])
print('AUC: ' + str(auc))
run.log('AUC', np.float(auc))

# Save the trained model
os.makedirs(output_folder, exist_ok=True)
output_path = output_folder + "/model.pkl"
joblib.dump(value=model, filename=output_path)

run.complete()

Overwriting diabetes_pipeline/train_diabetes.py


### 4. Create the Model Registration Script
The registration script loads the model from where the training step saved it, and then registers it in the workspace. 

It includes a single **model_folder** parameter that contains the path to the folder where the model was saved by the previous step.

In [5]:
%%writefile $experiment_folder/register_diabetes.py
# Import libraries
import argparse
import joblib
from azureml.core import Workspace, Model, Run

# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument('--model_folder', type=str, dest='model_folder', default="diabetes_model", help='model location')
args = parser.parse_args()
model_folder = args.model_folder

# Get the experiment run context
run = Run.get_context()

#Load the model
print("Loading Model from" + model_folder)
model_file = model_folder + "/model.pkl"
model = joblib.load(model_file)

Model.register(workspace = run.experiment.workspace,
              model_path = model_file,
              model_name = 'diabetes_model',
              tags = {'Training Context' : 'Pipeline'})

run.complete()

Overwriting diabetes_pipeline/register_diabetes.py


### 5. Prepare a Compute Environment for the Pipeline Steps

The pipeline will eventually be published and run on-demand, so it needs a compute environment in which to run.

In [6]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_name = "aml-cluster"

# Reuse the named cluster if the workspace already has it; otherwise provision
# one with up to 4 STANDARD_D2_V2 nodes and an 1800-second idle scale-down.
try:
    pipeline_cluster = ComputeTarget(workspace=ws, name=cluster_name)
except ComputeTargetException:
    compute_config = AmlCompute.provisioning_configuration(
        vm_size='STANDARD_D2_V2',
        max_nodes=4,
        idle_seconds_before_scaledown=1800,
    )
    pipeline_cluster = ComputeTarget.create(ws, cluster_name, compute_config)
else:
    print('Found existing cluster, use it.')

# Block until any provisioning has finished
pipeline_cluster.wait_for_completion(show_output=True)

Creating
Succeeded
AmlCompute wait for completion finished

Minimum number of nodes requested have been provisioned


#### The compute will require a Python environment with the necessary package dependencies installed, so we'll create a run configuration.

In [7]:
from azureml.core import Environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import RunConfiguration

# Python environment for the pipeline steps: Azure ML manages the conda
# dependencies, and the steps run inside a Docker container.
diabetes_env = Environment("diabetes-pipeline-env")
diabetes_env.docker.enabled = True
diabetes_env.python.user_managed_dependencies = False
diabetes_env.python.conda_dependencies = CondaDependencies.create(
    conda_packages=['scikit-learn', 'pandas'],
    pip_packages=['azureml-sdk'],
)

# Register the environment so it can be reused, then fetch the registered copy
diabetes_env.register(workspace=ws)
registered_env = Environment.get(ws, 'diabetes-pipeline-env')

# Run configuration shared by the pipeline steps: the cluster created above
# plus the registered environment.
pipeline_run_config = RunConfiguration()
pipeline_run_config.target = pipeline_cluster
pipeline_run_config.environment = registered_env

print("Run configuration created.")

Run configuration created.


## 6. Create and Run pipeline

The pipeline has two steps:

1. Train the model and write it to a folder that the second step can read.
2. Register the model saved by the first step.

The folder is passed between the steps with a **PipelineData** object. This is a special kind of data reference that passes data from the output of one pipeline step to the input of another, creating a dependency between them.


In [8]:
from azureml.pipeline.core import PipelineData
from azureml.pipeline.steps import PythonScriptStep, EstimatorStep
from azureml.train.estimator import Estimator

# Training data registered earlier in the notebook
diabetes_ds = ws.datasets.get("diabetes dataset")

# Temporary data reference on the default datastore. The training step writes
# the model here and the registration step reads it back, which also creates
# the dependency between the two steps.
model_folder = PipelineData("model_folder", datastore=ws.get_default_datastore())

# Step 1: train the model with an estimator that runs train_diabetes.py
estimator = Estimator(source_directory=experiment_folder,
                      entry_script='train_diabetes.py',
                      compute_target=pipeline_cluster,
                      environment_definition=pipeline_run_config.environment)

train_step = EstimatorStep(name='Train Model',
                           estimator=estimator,
                           estimator_entry_script_arguments=['--output_folder', model_folder],
                           inputs=[diabetes_ds.as_named_input('diabetes_train')],
                           outputs=[model_folder],
                           compute_target=pipeline_cluster,
                           allow_reuse=True)

# Step 2: register the model saved by step 1 using register_diabetes.py
register_step = PythonScriptStep(name='Register Model',
                                 script_name="register_diabetes.py",
                                 source_directory=experiment_folder,
                                 arguments=['--model_folder', model_folder],
                                 inputs=[model_folder],
                                 runconfig=pipeline_run_config,
                                 compute_target=pipeline_cluster,
                                 allow_reuse=True)

print("Pipeline steps defined")


Pipeline steps defined


### 7. Run pipeline as an experiment.

In [9]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline
from azureml.widgets import RunDetails

# Build the two-step pipeline: train, then register
pipeline = Pipeline(workspace=ws, steps=[train_step, register_step])
print("pipeline is built")

# Submit it as an experiment run; regenerate_outputs=True makes every step run
# again instead of reusing outputs from a previous run.
experiment = Experiment(workspace=ws, name='diabetes-training-pipeline')
pipeline_run = experiment.submit(pipeline, regenerate_outputs=True)
print("Pipeline submitted for execution.")

# Show the live progress widget, then block until the pipeline finishes
RunDetails(pipeline_run).show()
pipeline_run.wait_for_completion()

pipeline is built
Created step Train Model [12cdce7b][4ca0401e-e8b3-4df5-b522-f535bb3046e1], (This step will run and generate new outputs)
Created step Register Model [7a9d941f][62ed8b16-28c7-4c26-90a2-3f9195716cbc], (This step will run and generate new outputs)
Submitted PipelineRun c87c4bf3-92fe-42be-8975-216b9ef950b0
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/diabetes-training-pipeline/runs/c87c4bf3-92fe-42be-8975-216b9ef950b0?wsid=/subscriptions/661de708-75b1-41ed-806d-85f9bef3c27d/resourcegroups/dp101-resources/workspaces/dp101-workspace
Pipeline submitted for execution.


_PipelineWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': False, 'log_level': 'INFO', …

PipelineRunId: c87c4bf3-92fe-42be-8975-216b9ef950b0
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/diabetes-training-pipeline/runs/c87c4bf3-92fe-42be-8975-216b9ef950b0?wsid=/subscriptions/661de708-75b1-41ed-806d-85f9bef3c27d/resourcegroups/dp101-resources/workspaces/dp101-workspace
PipelineRun Status: Running


StepRunId: e86ba504-a2b8-4e05-8add-4fee0d025d60
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/diabetes-training-pipeline/runs/e86ba504-a2b8-4e05-8add-4fee0d025d60?wsid=/subscriptions/661de708-75b1-41ed-806d-85f9bef3c27d/resourcegroups/dp101-resources/workspaces/dp101-workspace
StepRun( Train Model ) Status: NotStarted
StepRun( Train Model ) Status: Running

Streaming azureml-logs/55_azureml-execution-tvmps_93c1681e087e8631ff5bdb15f44e64999088428d2aad04f77bb993e77c803274_d.txt
2020-06-07T15:29:27Z Starting output-watcher...
2020-06-07T15:29:27Z IsDedicatedCompute == True, won't poll for Low Pri Preemption
4d8355064413fc71256d212d




StepRunId: 66326159-849b-48a9-9cfd-c793b5ba36af
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/diabetes-training-pipeline/runs/66326159-849b-48a9-9cfd-c793b5ba36af?wsid=/subscriptions/661de708-75b1-41ed-806d-85f9bef3c27d/resourcegroups/dp101-resources/workspaces/dp101-workspace
StepRun( Register Model ) Status: NotStarted
StepRun( Register Model ) Status: Running

Streaming azureml-logs/55_azureml-execution-tvmps_93c1681e087e8631ff5bdb15f44e64999088428d2aad04f77bb993e77c803274_d.txt
2020-06-07T15:34:06Z Starting output-watcher...
2020-06-07T15:34:06Z IsDedicatedCompute == True, won't poll for Low Pri Preemption
d69c9a251aaa0da9b02d76de4c6845e12c6b20bf792b8869c8d5c4f5dc10f055

Streaming azureml-logs/65_job_prep-tvmps_93c1681e087e8631ff5bdb15f44e64999088428d2aad04f77bb993e77c803274_d.txt
Entering job preparation. Current time:2020-06-07T15:34:10.244126
Starting job preparation. Current time:2020-06-07T15:34:11.601674
Extracting the control code.
fetching and e



PipelineRun Execution Summary
PipelineRun Status: Finished
{'runId': 'c87c4bf3-92fe-42be-8975-216b9ef950b0', 'status': 'Completed', 'startTimeUtc': '2020-06-07T15:26:01.501535Z', 'endTimeUtc': '2020-06-07T15:34:42.652478Z', 'properties': {'azureml.runsource': 'azureml.PipelineRun', 'runSource': 'SDK', 'runType': 'SDK', 'azureml.parameters': '{}'}, 'inputDatasets': [], 'logFiles': {'logs/azureml/executionlogs.txt': 'https://dp101worstoragea3d647ef0.blob.core.windows.net/azureml/ExperimentRun/dcid.c87c4bf3-92fe-42be-8975-216b9ef950b0/logs/azureml/executionlogs.txt?sv=2019-02-02&sr=b&sig=e6TbfCxvS3ccjuxeypT%2BkZKADNbo%2Btloxes0MvQsMgY%3D&st=2020-06-07T15%3A24%3A45Z&se=2020-06-07T23%3A34%3A45Z&sp=r', 'logs/azureml/stderrlogs.txt': 'https://dp101worstoragea3d647ef0.blob.core.windows.net/azureml/ExperimentRun/dcid.c87c4bf3-92fe-42be-8975-216b9ef950b0/logs/azureml/stderrlogs.txt?sv=2019-02-02&sr=b&sig=X6EKDMHeFIMpdeeQOFgKOgZyZmAzwCxRAxrouDtpa1E%3D&st=2020-06-07T15%3A24%3A45Z&se=2020-06-07T2

'Finished'

When the pipeline has finished, a new model should be registered with a **Training context tag** indicating it was trained in a pipeline. Run the following code to verify this.

In [10]:
from azureml.core import Model

# Print every registered model version together with its tags and properties
for model in Model.list(ws):
    print(model.name, 'version:', model.version)
    for tag_name, tag in model.tags.items():
        print('\t', tag_name, ':', tag)
    for prop_name, prop in model.properties.items():
        print('\t', prop_name, ':', prop)
    print('\n')

diabetes_model version: 4
	 Training Context : Pipeline


diabetes_model version: 3
	 Training context : Inline Training
	 AUC : 0.8753594706204287
	 Accuracy : 0.8883333333333333


diabetes_model version: 2
	 Training context : Pipeline


diabetes_model version: 1
	 Training context : Estimator
	 AUC : 0.8483377282451863
	 Accuracy : 0.774




## 8. Publish the Pipeline
Now that you've created a pipeline and verified that it works, you can publish it so that it can be run on demand.

### 8.1. Publishing it as a REST service.

In [12]:
# Publish the pipeline so it can be triggered through a REST endpoint
published_pipeline = pipeline.publish(
    name='Diabetes_Training_Pipeline',
    description='Trains Diabetes Model',
    version='1.0',
)

rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)

https://eastus.api.azureml.ms/pipelines/v1.0/subscriptions/661de708-75b1-41ed-806d-85f9bef3c27d/resourceGroups/dp101-resources/providers/Microsoft.MachineLearningServices/workspaces/dp101-workspace/PipelineRuns/PipelineSubmit/534b785a-3a61-4418-82b2-28cad96630b5


### 8.2. Authenticating the request

> To use the endpoint, client applications need to make a **REST call over HTTP**. 
> This **request** must be **authenticated**, so an authorization header is required. 
> A **real application** would require a **service principal** with which to be authenticated, but to test this out, we'll use the authorization header from the current interactive connection to the Azure ML workspace.

In [13]:
from azureml.core.authentication import InteractiveLoginAuthentication

# Build an Authorization header from the current interactive login.
# NOTE: fine for testing; a real client application should authenticate
# as a service principal instead.
auth_header = InteractiveLoginAuthentication().get_authentication_header()

### 8.3. Calling the REST interface

To initiate a published endpoint, you make an HTTP request to its REST endpoint, passing an authorization header with a token for a service principal with permission to run the pipeline, and a JSON payload specifying the experiment name. The pipeline is run asynchronously, so the response from a successful REST call includes the run ID. 

In [14]:
import requests

experiment_name = 'Run-diabetes-pipeline'

# Trigger the published pipeline. The run starts asynchronously, so a
# successful response only carries the new run's ID.
response = requests.post(rest_endpoint,
                         headers=auth_header,
                         json={"ExperimentName": experiment_name})
# Surface HTTP errors (e.g. an expired or unauthorized token) directly instead
# of failing later with a KeyError because the error body has no "Id" field.
response.raise_for_status()
run_id = response.json()["Id"]
run_id

'4f4bd7d9-f205-4998-a096-4e1cd5f422ae'

## 9. Use the RunDetails widget to view the experiment as it runs

In [15]:
from azureml.pipeline.core.run import PipelineRun
from azureml.widgets import RunDetails

# Attach to the run started through the REST endpoint and display its progress
target_experiment = ws.experiments[experiment_name]
published_pipeline_run = PipelineRun(target_experiment, run_id)
RunDetails(published_pipeline_run).show()

_PipelineWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': False, 'log_level': 'INFO', …

### 10. Use pipeline parameters
We can increase the flexibility of a pipeline by defining parameters.  
To define parameters for a pipeline, create a **PipelineParameter** object for each parameter, and specify each parameter in at least one step. 



In [None]:
# Illustrative snippet (not executed in this notebook): shows how a
# PipelineParameter is declared and passed to a step as a script argument.
# NOTE(review): `sk_estimator` and `prepped` are not defined anywhere in this
# notebook; they presumably stand for an estimator and a PipelineData produced
# by the steps elided with "...". The pipeline published above defines no parameters.
from azureml.pipeline.core.graph import PipelineParameter

# Pipeline parameter named 'reg_rate' with a default of 0.01; a run can
# override it (see the ParameterAssignments request below).
reg_param = PipelineParameter(name='reg_rate', default_value=0.01)

...  # placeholder for the elided step definitions

step2 = EstimatorStep(name = 'train model',
                      estimator = sk_estimator,
                      compute_target = 'aml-cluster',
                      inputs=[prepped],
                      # the parameter reaches the training script as its --reg argument
                      estimator_entry_script_arguments=['--folder', prepped,
                                                        '--reg', reg_param])

In [None]:
# Trigger a published pipeline run, overriding the 'reg_rate' pipeline parameter.
# NOTE(review): the pipeline published earlier in this notebook defines no
# 'reg_rate' parameter; this request is meant for a pipeline built with the
# PipelineParameter shown above.
response = requests.post(rest_endpoint,
                         headers=auth_header,
                         json={"ExperimentName": "run_training_pipeline",
                               "ParameterAssignments": {"reg_rate": 0.1}})
# Fail loudly on HTTP errors (e.g. an expired token) rather than continuing silently
response.raise_for_status()

### 11. Scheduling a pipeline for periodic intervals

To schedule a pipeline to run at periodic intervals, you must define a **ScheduleRecurrence** that determines the run frequency, and use it to create a Schedule.

In [None]:
from azureml.pipeline.core import ScheduleRecurrence, Schedule

# Run the published pipeline once every day
daily_recurrence = ScheduleRecurrence(frequency='Day', interval=1)
pipeline_schedule = Schedule.create(ws,
                                    name='Daily Training',
                                    description='trains model every day',
                                    pipeline_id=published_pipeline.id,
                                    experiment_name='Training_Pipeline',
                                    recurrence=daily_recurrence)

### 12. Triggering a pipeline run on data changes
To schedule a pipeline to run whenever data changes, we must create a **Schedule** that monitors a specified path on a datastore, like this:

In [None]:
from azureml.core import Datastore
from azureml.pipeline.core import Schedule

# Datastore to monitor for changes.
# NOTE(review): assumes a datastore named 'blob_data' is registered in the workspace — confirm.
training_datastore = Datastore(workspace=ws, name='blob_data')

# Start a run of the published pipeline whenever data under data/training on
# that datastore changes. The pipeline ID comes from the published_pipeline
# object (there is no separate published_pipeline_id variable).
pipeline_schedule = Schedule.create(ws, name='Reactive Training',
                                    description='trains model on data change',
                                    pipeline_id=published_pipeline.id,
                                    experiment_name='Training_Pipeline',
                                    datastore=training_datastore,
                                    path_on_datastore='data/training')