# Pipelines with Azure ML

<img src='https://github.com/retkowsky/images/blob/master/AzureMLservicebanniere.png?raw=true'>

Azure Machine Learning pipelines let you build workflows for your machine learning projects. These workflows offer several benefits:

- Simplicity
- Speed
- Repeatability
- Flexibility
- Versioning and tracking
- Modularity
- Quality assurance
- Cost control

> https://docs.microsoft.com/en-us/azure/machine-learning/service/concept-ml-pipelines

## 1. Environment information

In [68]:
import sys
sys.version

'3.6.9 |Anaconda, Inc.| (default, Jul 30 2019, 19:07:31) \n[GCC 7.3.0]'

In [69]:
import datetime
now = datetime.datetime.now()
print(now)

2020-03-09 11:23:39.006607


In [70]:
import os
import azureml.core
from azureml.core import Workspace, Experiment, Datastore
from azureml.widgets import RunDetails

print("Azure ML SDK version:", azureml.core.VERSION)

Azure ML SDK version: 1.0.83


In [72]:
# Load the workspace configuration
ws = Workspace.from_config()

## 2. Data

In [74]:
from azureml.core import Dataset

default_ds = ws.get_default_datastore()
default_ds.upload_files(files=['./donnees/diabetes.csv', './donnees/diabetes2.csv'], # Upload the diabetes CSV files from ./donnees
                       target_path='diabetes-data/', # Put it in a folder path in the datastore
                       overwrite=True, # Replace existing files of the same name
                       show_progress=True)

# Create a tabular dataset from the path on the datastore (this may take a short while)
tab_data_set = Dataset.Tabular.from_delimited_files(path=(default_ds, 'diabetes-data/*.csv'))

# Register the tabular dataset
tab_data_set = tab_data_set.register(workspace=ws, 
                           name='diabetes dataset',
                           description='diabetes data',
                           tags = {'format':'CSV'},
                           create_new_version=True)

print('OK')

Uploading an estimated of 2 files
Uploading ./donnees/diabetes.csv
Uploading ./donnees/diabetes2.csv
Uploaded ./donnees/diabetes2.csv, 1 files out of an estimated total of 2
Uploaded ./donnees/diabetes.csv, 2 files out of an estimated total of 2
Uploaded 2 files
OK


In [75]:
from azureml.core import ComputeTarget, Datastore, Dataset

print("Compute Targets :")
for compute_name in ws.compute_targets:
    compute = ws.compute_targets[compute_name]
    print("\t", compute.name, ':', compute.type)
    
print("Datastores :")
for datastore_name in ws.datastores:
    datastore = Datastore.get(ws, datastore_name)
    print("\t", datastore.name, ':', datastore.datastore_type)
    
print("Datasets :")
for dataset_name in list(ws.datasets.keys()):
    dataset = Dataset.get_by_name(ws, dataset_name)
    print("\t", dataset.name)

Compute Targets :
	 automl2 : AmlCompute
	 cpu-cluster : AmlCompute
	 gpu-cluster : AmlCompute
	 train-many-model : AmlCompute
	 gpu-cluster2 : AmlCompute
	 aml-cluster : AmlCompute
	 pipeline : AmlCompute
Datastores :
	 workspaceblobstore : AzureBlob
	 workspacefilestore : AzureFile
	 azureml_globaldatasets : AzureBlob
	 training_output_datastore : AzureBlob
	 forecasting_output_datastore : AzureBlob
Datasets :
	 diabetes dataset
	 holidays
	 test
	 MD-Titanic_Designer-Train_Model-Trained_model-75a42e2a
	 Cars Data
	 USAHousing
	 USAHousing-Test
	 USAHousing-Train
	 PollutionChinaUS
	 Temperatures
	 Population
	 Iris
	 Diabetes
	 GermanCreditRisk
	 MD-titanic-visual-Train_Model-Trained_model-a7f08033
	 oj_data_small
	 oj_data
	 target
	 connected_car_components
	 glove_6B_100d
	 MD-Titanic_Designer-Train_Model-Trained_model-f35ce2b3
	 Titanic
	 mnist dataset
	 machineData_test_dataset
	 machineData_train_dataset


## 3. Creating the pipeline scripts

Pipelines consist of one or more *steps*, which can be Python scripts, or specialized steps like an Auto ML training estimator or a data transfer step that copies data from one location to another. Each step can run in its own compute context.

In this exercise, you'll build a simple pipeline that contains an estimator step (to train a model) and a Python script step (to register the trained model).

In [76]:
import os

# Create a folder for the pipeline step files
experiment_folder = 'WorkshopPipelines'
os.makedirs(experiment_folder, exist_ok=True)

print(experiment_folder)

WorkshopPipelines


In [77]:
%%writefile $experiment_folder/train_diabetes.py
# Import libraries
from azureml.core import Run
import argparse
import os  # needed for os.makedirs when saving the model
import pandas as pd
import numpy as np
import joblib
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve
import matplotlib.pyplot as plt

# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument('--output_folder', type=str, dest='output_folder', default="diabetes_model", help='output folder')
args = parser.parse_args()
output_folder = args.output_folder

# Get the experiment run context
run = Run.get_context()

# load the diabetes data (passed as an input dataset)
print("Loading Data...")
diabetes = run.input_datasets['diabetes_train'].to_pandas_dataframe()

# Separate features and labels
X, y = diabetes[['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree','Age']].values, diabetes['Diabetic'].values

# Split data into training set and test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, random_state=0)

# Train a decision tree model
print('Training a decision tree model')
model = DecisionTreeClassifier().fit(X_train, y_train)

# Calculate accuracy
y_hat = model.predict(X_test)
acc = np.average(y_hat == y_test)
print('Accuracy:', acc)
run.log('Accuracy', float(acc))  # built-in float; the np.float alias was removed in NumPy 1.24

# Calculate AUC
y_scores = model.predict_proba(X_test)
auc = roc_auc_score(y_test,y_scores[:,1])
print('AUC: ' + str(auc))
run.log('AUC', float(auc))

# plot ROC curve
fpr, tpr, thresholds = roc_curve(y_test, y_scores[:,1])
fig = plt.figure(figsize=(6, 4))
# Plot the diagonal 50% line
plt.plot([0, 1], [0, 1], 'k--')
# Plot the FPR and TPR achieved by our model
plt.plot(fpr, tpr)
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
run.log_image(name = "ROC", plot = fig)
plt.show()

# Save the trained model
os.makedirs(output_folder, exist_ok=True)
output_path = output_folder + "/model.pkl"
joblib.dump(value=model, filename=output_path)

run.complete()


Overwriting WorkshopPipelines/train_diabetes.py
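
The training script logs two metrics, accuracy and AUC. As a rough sanity check of what they measure, here is a dependency-free sketch. The helper names `accuracy` and `auc_by_ranking` and the toy labels and scores are illustrative assumptions, not part of the notebook; the script itself keeps using scikit-learn's `roc_auc_score`.

```python
def accuracy(y_true, y_pred):
    """Fraction of predictions that match the true labels."""
    matches = sum(1 for t, p in zip(y_true, y_pred) if t == p)
    return matches / len(y_true)


def auc_by_ranking(y_true, y_score):
    """AUC as the probability that a random positive outscores a random negative.

    Ties count as half a win. This O(n^2) pairwise form is for illustration only.
    """
    positives = [s for t, s in zip(y_true, y_score) if t == 1]
    negatives = [s for t, s in zip(y_true, y_score) if t == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Toy data (hypothetical), not the diabetes dataset
y_true = [0, 0, 1, 1]
y_score = [0.1, 0.4, 0.35, 0.8]
y_pred = [1 if s >= 0.5 else 0 for s in y_score]

print("Accuracy:", accuracy(y_true, y_pred))
print("AUC:", auc_by_ranking(y_true, y_score))
```

Both values come out at 0.75 on this toy data: three of four thresholded predictions are correct, and three of the four positive/negative pairs are ranked in the right order.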


In [78]:
%%writefile $experiment_folder/register_diabetes.py
# Import libraries
import argparse
import joblib
from azureml.core import Workspace, Model, Run

# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument('--model_folder', type=str, dest='model_folder', default="diabetes_model", help='model location')
args = parser.parse_args()
model_folder = args.model_folder

# Get the experiment run context
run = Run.get_context()

# load the model
print("Loading model from " + model_folder)
model_file = model_folder + "/model.pkl"
model = joblib.load(model_file)

Model.register(workspace=run.experiment.workspace,
               model_path = model_file,
               model_name = 'diabetes_model',
               tags={'Training context':'Pipeline'})

run.complete()


Overwriting WorkshopPipelines/register_diabetes.py


## 4. Creating the Azure ML compute and environment


In [81]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_name = "pipeline"

# Verify that cluster exists
try:
    pipeline_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # If not, create it
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D4_V2', 
                                                           #vm_priority='lowpriority', 
                                                           max_nodes=4)
    pipeline_cluster = ComputeTarget.create(ws, cluster_name, compute_config)

pipeline_cluster.wait_for_completion(show_output=True)

Creating
Succeeded
AmlCompute wait for completion finished
Minimum number of nodes requested have been provisioned


The compute will require a Python environment with the necessary package dependencies installed, so we'll create a run configuration.

In [82]:
from azureml.core import Environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import RunConfiguration

# Create a Python environment for the experiment
diabetes_env = Environment("diabetes-experiment-env")
diabetes_env.python.user_managed_dependencies = False # Let Azure ML manage dependencies
diabetes_env.docker.enabled = True # Use a docker container

# Create a set of package dependencies
diabetes_packages = CondaDependencies.create(conda_packages=['scikit-learn','ipykernel','matplotlib', 'pandas'],
                                             pip_packages=['azureml-sdk','pyarrow'])

# Add the dependencies to the environment
diabetes_env.python.conda_dependencies = diabetes_packages

# Register the environment (just in case previous lab wasn't completed)
diabetes_env.register(workspace=ws)
registered_env = Environment.get(ws, 'diabetes-experiment-env')

# Create a new runconfig object for the pipeline
pipeline_run_config = RunConfiguration()

# Use the compute you created above. 
pipeline_run_config.target = pipeline_cluster

# Assign the environment to the run configuration
pipeline_run_config.environment = registered_env

print ("Run configuration created.")

Run configuration created.


## 5. Creating and running the pipeline

Now we're ready to create and run a pipeline.

First, we need to define the steps for the pipeline and any data references that need to be passed between them. In this case, the first step must write the model to a folder that the second step can read. Since the steps run on remote compute (and could each run on different compute), the folder path must be passed as a data reference to a location in a datastore within the workspace. The **PipelineData** object is a special kind of data reference for interim storage locations that are passed between pipeline steps, so we'll create one and use it as the output of the first step and the input of the second step. Note that we also need to pass it as a script argument so that our code can access the datastore location it refers to.
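
To make the data dependency concrete: because the second step consumes what the first step produces, the run order can be derived from the declared inputs and outputs alone. The sketch below illustrates that idea with the standard library's `graphlib` (Python 3.9 or later, which is newer than the interpreter used in this notebook). The `steps` dictionary and the `execution_order` helper are hypothetical and are not a description of how the Azure ML SDK works internally.

```python
from graphlib import TopologicalSorter


def execution_order(steps):
    """Order steps so each runs after the steps producing its inputs.

    `steps` maps a step name to {"inputs": [...], "outputs": [...]}.
    """
    # Map each intermediate data name to the step that produces it
    producer = {}
    for name, spec in steps.items():
        for output in spec["outputs"]:
            producer[output] = name

    # A step depends on the producers of its inputs (external inputs have none)
    graph = {
        name: {producer[i] for i in spec["inputs"] if i in producer}
        for name, spec in steps.items()
    }
    return list(TopologicalSorter(graph).static_order())


# Mirrors the two steps of this notebook; declared out of order on purpose
steps = {
    "2. Register Model": {"inputs": ["model_folder"], "outputs": []},
    "1. Train Model": {"inputs": ["diabetes_train"], "outputs": ["model_folder"]},
}

print(execution_order(steps))
```

The training step comes first even though it is declared second, because `model_folder` links its output to the registration step's input.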

In [83]:
from azureml.pipeline.core import PipelineData
from azureml.pipeline.steps import PythonScriptStep, EstimatorStep
from azureml.train.estimator import Estimator

# Get the training dataset
diabetes_ds = ws.datasets.get("diabetes dataset")

# Create a PipelineData (temporary Data Reference) for the model folder
model_folder = PipelineData("model_folder", datastore=ws.get_default_datastore())

# Step 1, run the training script through an estimator
estimator = Estimator(source_directory=experiment_folder,
                        compute_target = pipeline_cluster,
                        environment_definition=pipeline_run_config.environment,
                        entry_script='train_diabetes.py')

train_step = EstimatorStep(name = "1. Train Model",
                           estimator=estimator, 
                           estimator_entry_script_arguments=['--output_folder', model_folder],
                           inputs=[diabetes_ds.as_named_input('diabetes_train')],
                           outputs=[model_folder],
                           compute_target = pipeline_cluster,
                           allow_reuse = True)

# Step 2, run the model registration script
register_step = PythonScriptStep(name = "2. Register Model",
                                source_directory = experiment_folder,
                                script_name = "register_diabetes.py",
                                arguments = ['--model_folder', model_folder],
                                inputs=[model_folder],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

print("Pipeline steps defined")

Pipeline steps defined


OK, we're ready to go. Let's build the pipeline from the steps we've defined and run it as an experiment.

In [84]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline
from azureml.widgets import RunDetails

# Construct the pipeline
pipeline_steps = [train_step, register_step]
pipeline = Pipeline(workspace = ws, steps=pipeline_steps)
print("Pipeline is built.")

# Create an experiment and run the pipeline
experiment = Experiment(workspace = ws, name = 'workshop7-Pipeline-Diabetes')

pipeline_run = experiment.submit(pipeline, regenerate_outputs=True)
print("Pipeline submitted for execution.")

RunDetails(pipeline_run).show()

Pipeline is built.
Created step 1. Train Model [2431fc04][56d8089f-102a-4d09-a2ae-24e592639d6f], (This step will run and generate new outputs)
Created step 2. Register Model [8caf4e8f][f3ae3a42-bdf7-4235-8d54-18f99b0f129b], (This step will run and generate new outputs)
Submitted PipelineRun de52e151-c1e0-436d-ac7b-cfb39bf1ed30
Link to Azure Machine Learning studio: https://ml.azure.com/experiments/workshop7-Pipeline-Diabetes/runs/de52e151-c1e0-436d-ac7b-cfb39bf1ed30?wsid=/subscriptions/70b8f39e-8863-49f7-b6ba-34a80799550c/resourcegroups/workshopaml2020RG/workspaces/workshop-aml-2020
Pipeline submitted for execution.


_PipelineWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': True, 'log_level': 'INFO', '…

In [87]:
# Check the status of each step in the pipeline run

step_runs = pipeline_run.get_children()
for step_run in step_runs:
    status = step_run.get_status()
    print('Step:', step_run.name, '- Status =', status)
    
    # Change this if you want to see details even if the Step has succeeded.
    if status == "Failed":
        joblog = step_run.get_job_log()
        print('job log:', joblog)

Step: 2. Register Model - Status = Finished
Step: 1. Train Model - Status = Finished


The widget above shows details of the pipeline as it runs. You can also monitor pipeline runs in the **Experiments** page in [Azure Machine Learning studio](https://ml.azure.com).
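
Outside a notebook, where the widget is not available, a script usually has to wait for a run to reach a final state. The following is a minimal polling sketch under stated assumptions: `wait_for_terminal_status` is a hypothetical helper, the status source is a scripted stand-in rather than a real run object, and the status names other than `Finished` and `Failed` (which appear in this notebook) are assumed. In practice the SDK's own `wait_for_completion` method on the run is likely the simpler choice.

```python
import time


def wait_for_terminal_status(get_status, terminal_states, poll_seconds=5.0,
                             timeout_seconds=3600.0, sleep=time.sleep):
    """Poll get_status() until it returns a terminal state or the timeout expires."""
    waited = 0.0
    while True:
        status = get_status()
        if status in terminal_states:
            return status
        if waited >= timeout_seconds:
            raise TimeoutError(f"Still '{status}' after {waited:.0f}s")
        sleep(poll_seconds)
        waited += poll_seconds


# Stand-in for a run object: yields a scripted sequence of statuses
scripted = iter(["NotStarted", "Running", "Running", "Finished"])
final = wait_for_terminal_status(
    get_status=lambda: next(scripted),
    terminal_states={"Finished", "Failed", "Canceled"},
    poll_seconds=0.0,
)
print("Final status:", final)
```

With a real step run, `get_status` could be `step_run.get_status`, the same call used in the status loop earlier in this section.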


In [88]:
from azureml.core import Model

for model in Model.list(ws):
    print(model.name, 'version:', model.version)
    for tag_name in model.tags:
        tag = model.tags[tag_name]
        print ('\t',tag_name, ':', tag)
    for prop_name in model.properties:
        prop = model.properties[prop_name]
        print ('\t',prop_name, ':', prop)
    print('\n')

diabetes_model version: 2
	 Training context : Pipeline


diabetes_model version: 1
	 Training context : Pipeline


Workshop2-AutoML-model version: 4
	 Training context : Azure Auto ML
	 AUC : 0.9992372509299262
	 Accuracy : 0.9646507603948424


Workshop2-AutoML-model version: 3
	 Training context : Azure Auto ML
	 AUC : 0.9992372509299262
	 Accuracy : 0.9646507603948424


Workshop2-AutoML-model version: 2
	 Training context : Azure Auto ML
	 AUC : 0.9992372509299262
	 Accuracy : 0.9646507603948424


Workshop2-AutoML-model version: 1
	 Training context : Auto ML
	 AUC : 0.9992372509299262
	 Accuracy : 0.9646507603948424


Nist-AutoML-model version: 1
	 Training context : Auto ML
	 AUC : 0.9992372509299262
	 Accuracy : 0.9646507603948424


IBM_attrition_explainer version: 6


local_deploy_model version: 6


sklearn_regression_model.pkl version: 6
	 area : diabetes
	 type : regression


Modele_TensorFlow version: 3


sklearn_regression_model.pkl version: 5
	 area : diabetes
	 type : regr

This is a simple example, designed to demonstrate the principle. In reality, you could build more sophisticated logic into the pipeline steps - for example, evaluating the model against some test data to calculate a performance metric like AUC or accuracy, comparing the metric to that of any previously registered versions of the model, and only registering the new model if it performs better.
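
The "only register if it performs better" rule can be sketched as a small decision function. This is an illustration under assumptions, not part of the notebook's scripts: `should_register`, its `min_improvement` parameter, and the AUC values are hypothetical.

```python
def should_register(new_auc, registered_aucs, min_improvement=0.0):
    """Return True when the new model beats every previously registered version."""
    if not registered_aucs:
        # Nothing registered yet, so the first model always qualifies
        return True
    return new_auc > max(registered_aucs) + min_improvement


# Hypothetical AUC values for earlier versions of the model
previous = [0.81, 0.84]

for candidate in (0.83, 0.86):
    decision = "register" if should_register(candidate, previous) else "skip"
    print(f"AUC {candidate:.2f}: {decision}")
```

For this to work in the pipeline, the metric of each registered version has to be stored somewhere it can be read back. One option is to attach it to the model at registration time, as the Auto ML models in the listing above do with their `AUC` and `Accuracy` entries.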

You can use the [Azure Machine Learning extension for Azure DevOps](https://marketplace.visualstudio.com/items?itemName=ms-air-aiagility.vss-services-azureml) to combine Azure ML pipelines with Azure DevOps pipelines (yes, it *is* confusing that they have the same name!) and integrate model retraining into a *continuous integration/continuous deployment (CI/CD)* process. For example, you could use an Azure DevOps *build* pipeline to trigger an Azure ML pipeline that trains and registers a model. When the model is registered, it could in turn trigger an Azure DevOps *release* pipeline that deploys the model as a web service, along with the application or service that consumes the model.

## 6. Publishing the pipeline

In [89]:
# Get the most recent run of the pipeline
experiment_name = 'workshop7-Pipeline-Diabetes'

pipeline_experiment = ws.experiments.get(experiment_name)
pipeline_run = list(pipeline_experiment.get_runs())[0]

# Publish the pipeline from the run
published_pipeline = pipeline_run.publish_pipeline(
    name="Workshop7_Training_Pipeline", description="Trains diabetes model", version="1.0")

published_pipeline

| Name | Id | Status | Endpoint |
|---|---|---|---|
| Workshop7_Training_Pipeline | bd6a83cb-6346-45f4-a038-65602baf4dbd | Active | REST Endpoint |


### Pipeline REST API

In [90]:
rest_endpoint = published_pipeline.endpoint
print("Pipeline endpoint:")
print(rest_endpoint)

Pipeline endpoint:
https://westeurope.aether.ms/api/v1.0/subscriptions/70b8f39e-8863-49f7-b6ba-34a80799550c/resourceGroups/workshopaml2020RG/providers/Microsoft.MachineLearningServices/workspaces/workshop-aml-2020/PipelineRuns/PipelineSubmit/bd6a83cb-6346-45f4-a038-65602baf4dbd


In [91]:
from azureml.core.authentication import InteractiveLoginAuthentication

interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()
# Do not print the header itself: it carries a bearer token, which is a credential
print("Authentication header retrieved.")

Authentication header retrieved.

In [92]:
# Submit a run through the REST endpoint and read back its run ID
import requests

rest_endpoint = published_pipeline.endpoint
response = requests.post(rest_endpoint, 
                         headers=auth_header, 
                         json={"ExperimentName": experiment_name})
run_id = response.json()["Id"]
run_id

'605aec5a-78cd-45bb-875c-d3e1d6cf21ae'
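
The cell above reads `response.json()["Id"]` directly, which fails with an unhelpful error if the token has expired or the submission is rejected. A hedged sketch of a more defensive version follows. `extract_run_id` is a hypothetical helper and the payloads are simulated, so no network call is made; the actual error body returned by the service is not shown in this notebook.

```python
def extract_run_id(status_code, payload):
    """Return the run ID from a submission response, or raise with context."""
    if not 200 <= status_code < 300:
        raise RuntimeError(f"Submission failed with HTTP {status_code}: {payload}")
    run_id = payload.get("Id")
    if not run_id:
        raise KeyError("Response body has no 'Id' field")
    return run_id


# Simulated responses (hypothetical payloads, no network call)
print(extract_run_id(200, {"Id": "example-run-id"}))

try:
    extract_run_id(401, {"error": "token expired"})
except RuntimeError as err:
    print(err)
```

With the real response object, the call would be `extract_run_id(response.status_code, response.json())`. Calling `response.raise_for_status()` from `requests` before parsing is a shorter alternative when the response body is not needed in the error message.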

In [93]:
from azureml.pipeline.core.run import PipelineRun
from azureml.widgets import RunDetails

published_pipeline_run = PipelineRun(ws.experiments[experiment_name], run_id)
RunDetails(published_pipeline_run).show()

_PipelineWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': True, 'log_level': 'INFO', '…

### Scheduling the pipeline

In [94]:
from azureml.pipeline.core import ScheduleRecurrence, Schedule

# Schedule the pipeline to run every Monday at 00:00 UTC
recurrence = ScheduleRecurrence(frequency="Week", interval=1, week_days=["Monday"], time_of_day="00:00")
weekly_schedule = Schedule.create(ws, name="weekly-diabetes-trainingpipeline", 
                                  description="Weekly pipeline",
                                  pipeline_id=published_pipeline.id, 
                                  experiment_name=experiment_name, 
                                  recurrence=recurrence)
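
To see what "every Monday at 00:00" means in practice, the sketch below computes the next matching time from a given moment using only the standard library. `next_weekly_run` is a hypothetical helper written for illustration; it does not call Azure ML, and how the service resolves its own start times is not covered here.

```python
from datetime import datetime, timedelta, timezone


def next_weekly_run(now, weekday=0, hour=0, minute=0):
    """Return the next `weekday` (Monday=0) at hour:minute, strictly after `now`."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    candidate += timedelta(days=(weekday - now.weekday()) % 7)
    if candidate <= now:
        # Today's slot has already passed, so move to next week
        candidate += timedelta(days=7)
    return candidate


# The notebook ran on Monday 2020-03-09 at about 11:23; treated here as UTC (assumption)
now = datetime(2020, 3, 9, 11, 23, tzinfo=timezone.utc)
print(next_weekly_run(now))
```

Because the notebook ran late on a Monday morning, that day's 00:00 slot had already passed, so the next match is the following Monday, 2020-03-16.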

In [95]:
# List the schedules
schedules = Schedule.list(ws)
schedules

[Pipeline(Name: weekly-diabetes-trainingpipeline,
 Id: 695e9c1e-7efe-4c16-a8c3-2f582ed11422,
 Status: Active,
 Pipeline Id: bd6a83cb-6346-45f4-a038-65602baf4dbd,
 Recurrence Details: Runs at 0:00 on Monday every Week),
 Pipeline(Name: weekly-diabetes-trainingpipeline,
 Id: 78f21292-2029-4159-8733-5abf2699c44b,
 Status: Active,
 Pipeline Id: 53860773-63b0-4606-b166-e7f6f91f383e,
 Recurrence Details: Runs at 0:00 on Monday every Week),
 Pipeline(Name: weekly-diabetes-training,
 Id: 481d4203-511c-4ff6-89c5-59c6fa295ac4,
 Status: Active,
 Pipeline Id: 19ff4a91-3202-415c-9fa8-c1a7ac353993,
 Recurrence Details: Runs at 0:00 on Monday every Week),
 Pipeline(Name: Forecasting-Pipeline-Recurring-Schedule,
 Id: 369212f9-3e61-4e21-ba26-2751d3356cee,
 Status: Active,
 Pipeline Id: 012358c5-6ad2-4d8a-9ccd-aa075f45c081,
 Recurrence Details: Runs every Week),
 Pipeline(Name: training_pipeline_recurring_schedule,
 Id: e7b6e252-4d6a-4b0d-a253-da9e3ade7cc3,
 Status: Active,
 Pipeline Id: 713d3d29-6146-4

In [96]:
pipeline_experiment = ws.experiments.get(experiment_name)
latest_run = list(pipeline_experiment.get_runs())[0]

latest_run.get_details()

{'runId': '605aec5a-78cd-45bb-875c-d3e1d6cf21ae',
 'status': 'Completed',
 'startTimeUtc': '2020-03-09T11:40:07.396664Z',
 'endTimeUtc': '2020-03-09T11:40:11.667294Z',
 'properties': {'azureml.runsource': 'azureml.PipelineRun',
  'runSource': 'Unavailable',
  'runType': 'HTTP',
  'azureml.parameters': '{}',
  'azureml.pipelineid': 'bd6a83cb-6346-45f4-a038-65602baf4dbd'},
 'inputDatasets': [],
 'logFiles': {'logs/azureml/executionlogs.txt': 'https://workshopaml2026611270317.blob.core.windows.net/azureml/ExperimentRun/dcid.605aec5a-78cd-45bb-875c-d3e1d6cf21ae/logs/azureml/executionlogs.txt?sv=2019-02-02&sr=b&sig=ckUN7L1XNJN18GXB6kkN%2Fkws3KgXkhJY2pSm9jOhhh0%3D&st=2020-03-09T11%3A30%3A14Z&se=2020-03-09T19%3A40%3A14Z&sp=r',
  'logs/azureml/stderrlogs.txt': 'https://workshopaml2026611270317.blob.core.windows.net/azureml/ExperimentRun/dcid.605aec5a-78cd-45bb-875c-d3e1d6cf21ae/logs/azureml/stderrlogs.txt?sv=2019-02-02&sr=b&sig=CSrhtNhu3Xu2rT7I0Absbjj0UEhGCnKu1JbMfe1PYj0%3D&st=2020-03-09T11%3A

> https://docs.microsoft.com/en-us/azure/machine-learning/how-to-use-parallel-run-step

<img src="https://github.com/retkowsky/images/blob/master/Powered-by-MS-Azure-logo-v2.png?raw=true" height="300" width="300">