In [None]:
!pip install --upgrade azureml-sdk

In [None]:
!pip install --upgrade scikit-learn

In [1]:
import azureml.core
from azureml.core import Workspace

# Report which Azure ML SDK release this kernel is running.
sdk_version = azureml.core.VERSION
print("Azure ML SDK Version: ", sdk_version)


Azure ML SDK Version:  1.5.0


In [2]:
import sklearn

# Report the locally installed scikit-learn version.
print(sklearn.__version__)

0.20.3


## Initialize workspace

In [3]:
# Load the workspace configuration from the config.json file in the current folder.
ws = Workspace.from_config()

# Name, region and resource group of the connected workspace, tab-separated.
print(ws.name, ws.location, ws.resource_group, sep='\t')


eacbmlservicews	westus2	adlsgen2	westus2


## Create an experiment

In [4]:
from azureml.core import Experiment

# Pipeline runs submitted through `exp` are grouped under this experiment name.
experiment_name = 'diabetes_pipeline_dxd'
exp = Experiment(workspace=ws, name=experiment_name)

exp


Name,Workspace,Report Page,Docs Page
diabetes_pipeline_exp,sandboxaml,Link to Azure Machine Learning studio,Link to Documentation


## Load, upload and register the dataset

In [4]:
from azureml.core import Dataset

# Fetch the dataset already registered in the workspace under the name 'diabetes'.
diabetes_dataset = Dataset.get_by_name(workspace=ws, name='diabetes')


In [5]:
# Materialise the dataset locally and drop the "Path" column so only AGE..S6 and Y remain.
diabetes = diabetes_dataset.to_pandas_dataframe().drop(columns=["Path"])

In [6]:
# Preview the first five rows.
diabetes.head(n=5)

Unnamed: 0,AGE,SEX,BMI,BP,S1,S2,S3,S4,S5,S6,Y
0,59,2,32.1,101.0,157,93.2,38.0,4.0,4.8598,87,151
1,48,1,21.6,87.0,183,103.2,70.0,3.0,3.8918,69,75
2,72,2,30.5,93.0,156,93.6,41.0,4.0,4.6728,85,141
3,24,1,25.3,84.0,198,131.4,40.0,5.0,4.8903,89,206
4,50,1,23.0,101.0,192,125.4,52.0,4.0,4.2905,80,135


In [7]:
# Column dtypes and non-null counts (all 442 rows are complete).
diabetes.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 442 entries, 0 to 441
Data columns (total 11 columns):
AGE    442 non-null int64
SEX    442 non-null int64
BMI    442 non-null float64
BP     442 non-null float64
S1     442 non-null int64
S2     442 non-null float64
S3     442 non-null float64
S4     442 non-null float64
S5     442 non-null float64
S6     442 non-null int64
Y      442 non-null int64
dtypes: float64(6), int64(5)
memory usage: 38.1 KB


In [8]:
# Per-column summary statistics; the target Y ranges from 25 to 346.
diabetes.describe()

Unnamed: 0,AGE,SEX,BMI,BP,S1,S2,S3,S4,S5,S6,Y
count,442.0,442.0,442.0,442.0,442.0,442.0,442.0,442.0,442.0,442.0,442.0
mean,48.5181,1.468326,26.375792,94.647014,189.140271,115.43914,49.788462,4.070249,4.641411,91.260181,152.133484
std,13.109028,0.499561,4.418122,13.831283,34.608052,30.413081,12.934202,1.29045,0.522391,11.496335,77.093005
min,19.0,1.0,18.0,62.0,97.0,41.6,22.0,2.0,3.2581,58.0,25.0
25%,38.25,1.0,23.2,84.0,164.25,96.05,40.25,3.0,4.2767,83.25,87.0
50%,50.0,1.0,25.7,93.0,186.0,113.0,48.0,4.0,4.62005,91.0,140.5
75%,59.0,2.0,29.275,105.0,209.75,134.5,57.75,5.0,4.9972,98.0,211.5
max,79.0,2.0,42.2,133.0,301.0,242.4,99.0,9.09,6.107,124.0,346.0


In [9]:
# Feature and target column names. They must match the DataFrame columns exactly,
# and diabetes.info() shows them in upper case (AGE, SEX, ..., S6, Y).
features_names = ['AGE', 'SEX', 'BMI', 'BP', 'S1', 'S2', 'S3', 'S4', 'S5', 'S6']
target = "Y"

In [10]:
import os

# Save a local CSV copy for upload to the blob datastore. to_csv does not create
# missing directories, so make sure "datasets/" exists first.
os.makedirs("datasets", exist_ok=True)
diabetes.to_csv("datasets/diabetes.csv", index=False)

In [12]:
from azureml.core import Datastore

# Reference: https://docs.microsoft.com/fr-fr/azure/machine-learning/how-to-create-your-first-pipeline

# The workspace's default datastore.
def_data_store = ws.get_default_datastore()

# The workspace's built-in blob and file-share datastores, looked up by name.
def_blob_store, def_file_store = (
    Datastore(ws, "workspaceblobstore"),
    Datastore(ws, "workspacefilestore"),
)

In [13]:
# Upload the local CSV to the blob store under "datasets/", replacing any previous copy.
local_files = ["datasets/diabetes.csv"]
def_blob_store.upload_files(local_files, target_path="datasets", overwrite=True)

Uploading an estimated of 1 files
Uploading datasets/diabetes.csv
Uploaded datasets/diabetes.csv, 1 files out of an estimated total of 1
Uploaded 1 files


$AZUREML_DATAREFERENCE_08a4c5aeadc0439b9a501879de6b4e5d

In [14]:
from azureml.core import Dataset

# Build a TabularDataset over the CSV just uploaded to the blob store.
csv_location = (def_blob_store, 'datasets/diabetes.csv')
diabetes_tabular_dataset = Dataset.Tabular.from_delimited_files(path=[csv_location])

In [15]:
# Register the tabular dataset as 'my_diabetes' (create_new_version=True lets
# re-registration add a new version under the same name).
diabetes_registred_dataset = diabetes_tabular_dataset.register(
    workspace=ws,
    name='my_diabetes',
    description='Diabetes training data',
    create_new_version=True,
)

In [16]:
# Expose the dataset to the training step under the input name 'diabetes_input'.
input_named = diabetes_tabular_dataset.as_named_input(name='diabetes_input')

## Create a (remote) compute target

In [None]:
# Compute target creation: reuse the cluster when it already exists, otherwise provision it.

from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cpu_cluster_name = "myComputeCluster"

try:
    # Looking up a name that does not exist raises ComputeTargetException.
    cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
except ComputeTargetException:
    # STANDARD_D2_V2 nodes, scaling between 0 and 4 nodes.
    compute_config = AmlCompute.provisioning_configuration(
        vm_size='STANDARD_D2_V2', min_nodes=0, max_nodes=4
    )
    cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)
else:
    print(" Cluster already exists")

cpu_cluster.wait_for_completion(show_output=True, min_node_count=0, timeout_in_minutes=30)

In [17]:
from azureml.core.compute import ComputeTarget

# Attach to the existing cluster used as the target of the pipeline steps below.
compute_target_name = "ds3-compute-cls"
compute_target = ComputeTarget(workspace=ws, name=compute_target_name)

# A ready cluster reports "Succeeded".
print(compute_target.provisioning_state)

Succeeded


In [None]:
#### DO NOT RUN: the environment is defined further down ####

# Set up the (compute target) environment.

from azureml.core import Environment
from azureml.core.conda_dependencies import CondaDependencies

env = Environment("diabetes_remote_env")
env.docker.enabled = True
# env.docker.base_image = DEFAULT_GPU_IMAGE

python_section = env.python
python_section.conda_dependencies = CondaDependencies.create(
    conda_packages=['scikit-learn', 'pandas', 'numpy', 'joblib'],
)
python_section.conda_dependencies.add_pip_package("inference-schema[numpy-support]")

# Write the resulting conda specification to ./diabetes_env.yml.
python_section.conda_dependencies.save_to_file(".", "diabetes_env.yml")


### Pipeline input & output

In [18]:
from azureml.core import Datastore
from azureml.data.data_reference import DataReference
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.core.graph import PipelineParameter
from azureml.pipeline.steps import PythonScriptStep

# Intermediate pipeline data below is stored on `def_blob_store`,
# the workspace blob datastore retrieved earlier.

In [19]:
# Intermediate data passed between steps, both stored on the blob datastore
# and exposed to the step scripts as datasets.
processed_data, final_data = (
    PipelineData(data_name, datastore=def_blob_store).as_dataset()
    for data_name in ("processed_data", "final_data")
)

print("PipelineData objects created")

PipelineData objects created


In [20]:
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies

# Run configuration shared by the pipeline steps, targeting the cluster attached above.
aml_run_config = RunConfiguration()
aml_run_config.target = compute_target

run_env = aml_run_config.environment

# Execute inside Docker, starting from the AzureML CPU base image.
run_env.docker.enabled = True
run_env.docker.base_image = "mcr.microsoft.com/azureml/base:0.2.1"

# Let AzureML build the conda environment from the specification below
# rather than relying on packages pre-installed in the image.
run_env.python.user_managed_dependencies = False
run_env.python.conda_dependencies = CondaDependencies.create(
    conda_packages=['pandas', 'scikit-learn', 'numpy', 'joblib'],
    pip_packages=['azureml-sdk[automl,explain]'],
)

print("Run configuration created.")

Run configuration created.


## Pipeline

The best practice is to use a separate folder for each step's script and its dependent files, and to specify that folder as the step's `source_directory`. This reduces the size of the snapshot created for the step, because only that folder is snapshotted. Any change to a file in a step's `source_directory` triggers a re-upload of the snapshot. Keeping the folders separate therefore also preserves step reuse when nothing in that step's `source_directory` has changed.

In [51]:
from azureml.pipeline.core import Pipeline
from azureml.pipeline.steps import PythonScriptStep

# Step 1: run remote_train/step1.py on the compute cluster. It receives the
# diabetes tabular dataset through the named input 'diabetes_input' and writes
# its result to `processed_data`.
#
# NOTE: the dataset is passed only through `inputs`. Also listing
# "--input", input_named in `arguments` was presumably what produced
# "[diabetes_input] is repeated. Input port names must be unique."
step1_arguments = ["--regularization", 0.5, "--output", processed_data]

train_step = PythonScriptStep(
    script_name="step1.py",
    source_directory="remote_train",
    arguments=step1_arguments,
    inputs=[input_named],
    outputs=[processed_data],
    compute_target=compute_target,
    runconfig=aml_run_config,
    allow_reuse=True,
)

# About allow_reuse: reusing previous results is essential when pipelines are
# used collaboratively, because it removes unnecessary re-runs. Reuse is the
# default when a step's script name, inputs and parameters stay the same. In that
# case the job is not sent to the compute, and the previous run's results are
# immediately available to the next step.
# With allow_reuse=False, every pipeline run always starts a new run of this step.



In [52]:
# Value handed to step2.py as --old_score.
old_score = 0.8

# Step 2: run remote_train/step2.py on step 1's output, parsed as delimited
# (tabular) files, and write its result to `final_data`.
register_step = PythonScriptStep(
    script_name="step2.py",
    source_directory="remote_train",
    arguments=["--old_score", old_score, "--output", final_data],
    # Consuming processed_data is what makes this step depend on train_step.
    inputs=[processed_data.parse_delimited_files(file_extension=None)],
    outputs=[final_data],
    compute_target=compute_target,
    runconfig=aml_run_config,
)


In [53]:
# Steps assembled into the pipeline: training first, then registration.
pipeline_steps = [
    train_step,
    register_step,
]

In [None]:
from azureml.train.estimator import Estimator

# Estimator-based alternative to train_step (it is not part of pipeline_steps).
# `script_folder` and `env` are not defined elsewhere in this notebook (the
# Environment cell is marked "do not run"). So reuse the steps' source folder
# and the environment configured in aml_run_config.
# NOTE(review): confirm that remote_train/train.py exists.
script_folder = "remote_train"

script_params = {
    '--regularization': 0.5,
}

estimator = Estimator(
    source_directory=script_folder,
    script_params=script_params,
    compute_target=compute_target,
    environment_definition=aml_run_config.environment,
    entry_script='train.py',
)


In [None]:
from azureml.pipeline.core import PipelineData
from azureml.pipeline.steps import EstimatorStep

# Estimator-based training step, an alternative to train_step.
# The data comes from the named dataset input. The step gets its own PipelineData
# output, because processed_data is already produced by train_step and should
# have a single producing step.
# NOTE: do not put est_step and train_step in the same pipeline; both consume
# input_named, and the input port 'diabetes_input' would then be repeated.
estimator_output = PipelineData("estimator_output", datastore=def_blob_store)

est_step = EstimatorStep(
    name="Estimator_Train",
    estimator=estimator,
    estimator_entry_script_arguments=["--datadir", input_named, "--output", estimator_output],
    runconfig_pipeline_params=None,
    inputs=[input_named],
    outputs=[estimator_output],
    compute_target=compute_target,
)

#### Build and publish the pipeline

In [54]:
# Assemble the steps into a pipeline. validate() returns the list of problems it
# finds in the graph; stop here instead of publishing a broken pipeline.
pipeline = Pipeline(workspace=ws, steps=pipeline_steps)
validation_errors = pipeline.validate()
if validation_errors:
    raise ValueError("Pipeline validation failed: {}".format(validation_errors))
print("Pipeline is built")

Step step2.py is ready to be created [0fef4f4e]
Pipeline is built


In [55]:
# Publish the pipeline; the returned object carries its id and REST endpoint.
pipeline_name = "diabetes-pipeline4"
published_pipeline = pipeline.publish(
    name=pipeline_name,
    description="Diabetes training scoring pipeline",
)
published_pipeline

Created step step1.py [a46aa89c][87b7e1b6-fc30-44fe-a801-4dc9c548289f], (This step is eligible to reuse a previous run's output)
Created step step2.py [0fef4f4e][95444224-f9d1-41cb-9be3-fa24e5042e85], (This step will run and generate new outputs)


Name,Id,Status,Endpoint
diabetes-pipeline4,69913165-4087-43ee-afdd-8adc7b22e21e,Active,REST Endpoint


In [56]:
# Trigger a run of the published pipeline under the experiment 'diabetes-pipeline4'.
published_pipeline.submit(workspace=ws, experiment_name='diabetes-pipeline4')

Submitted PipelineRun a216e726-652c-4c0f-8660-82db9c2b00b6
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/diabetes-pipeline4/runs/a216e726-652c-4c0f-8660-82db9c2b00b6?wsid=/subscriptions/f80606e5-788f-4dc3-a9ea-2eb9a7836082/resourcegroups/rg-sandbox/workspaces/sandboxaml


Experiment,Id,Type,Status,Details Page,Docs Page
diabetes-pipeline4,a216e726-652c-4c0f-8660-82db9c2b00b6,azureml.PipelineRun,NotStarted,Link to Azure Machine Learning studio,Link to Documentation


#### Run the pipeline

In [28]:
# Submit the (unpublished) pipeline under `exp`; regenerate_outputs=False keeps
# previously produced step outputs eligible for reuse.
pipeline_run = exp.submit(config=pipeline, regenerate_outputs=False)
pipeline_run

NameError: name 'pipeline' is not defined

In [29]:
from azureml.widgets import RunDetails

# Jupyter widget that tracks the pipeline run and its steps.
run_details_widget = RunDetails(pipeline_run)
run_details_widget.show()

NameError: name 'pipeline_run' is not defined

In [None]:
# Block until the pipeline run finishes; show_output=True prints a verbose log.
pipeline_run.wait_for_completion(show_output=True)

In [None]:
# Print the status of every step run; dump the job log of failed steps only.
for child_run in pipeline_run.get_children():
    child_status = child_run.get_status()
    print('Script:', child_run.name, 'status:', child_status)

    # Change this condition to also inspect the logs of steps that succeeded.
    if child_status != "Failed":
        continue
    print('job log:', child_run.get_job_log())

## Scheduler

In [None]:
from azureml.pipeline.core import Pipeline, PublishedPipeline

# List every published pipeline in the workspace as name,'id' so that an id can be
# copied into the schedule below.
# The loop uses its own variable name. Reusing `published_pipeline` would overwrite
# the pipeline published above, whose .endpoint the REST cell reads later.
published_pipelines = PublishedPipeline.list(ws)

for listed_pipeline in published_pipelines:
    print(f"{listed_pipeline.name},'{listed_pipeline.id}'")


In [None]:
from azureml.pipeline.core import Schedule, ScheduleRecurrence

# Id of the published pipeline to trigger (copied from the listing above).
# NOTE(review): this differs from the id shown when "diabetes-pipeline4" was
# published (69913165-...); confirm which pipeline should be scheduled.
pipeline_id = '092268ad-992c-43bc-af61-549c85fc3b68'

# Time-based trigger: every 15 minutes, runs filed under `experiment_name`.
recurrence = ScheduleRecurrence(frequency="Minute", interval=15)
recurring_schedule = Schedule.create(
    ws,
    name="MyRecurringSchedule",
    description="Based on time",
    pipeline_id=pipeline_id,
    experiment_name=experiment_name,
    recurrence=recurrence,
)

In [None]:
# Disable the scheduled published pipeline.
# It gets its own name: rebinding `pipeline` would replace the Pipeline object built
# earlier, which exp.submit(pipeline, ...) expects.
# NOTE(review): `recurring_schedule` itself is not disabled here; consider
# recurring_schedule.disable() as well.
pipeline_to_disable = PublishedPipeline.get(ws, id=pipeline_id)
pipeline_to_disable.disable()


## Register the model

In [None]:
# List the files recorded by the run that produced the model.
# NOTE(review): `run` is not defined anywhere in this notebook. Bind it first to the
# step run that wrote outputs/diabetes_reg_remote_model.pkl (presumably one of
# pipeline_run.get_children()) — TODO confirm which step that is.
print(run.get_file_names())

In [None]:
# Register the model file from the run's outputs as 'diabetes_ridge_regression'.
# NOTE(review): `run` must be bound to the step run that produced this file;
# it is not defined elsewhere in this notebook.
model = run.register_model(
    model_name='diabetes_ridge_regression',
    model_path='outputs/diabetes_reg_remote_model.pkl',
)

print(model.name, model.id, model.version, sep='\t')

## Create a scoring script

In [None]:
%%writefile score.py

import joblib
import numpy as np
import os

from inference_schema.schema_decorators import input_schema, output_schema
from inference_schema.parameter_types.numpy_parameter_type import NumpyParameterType


# The init() method is called once, when the web service starts up.
#
# Typically you would deserialize the model file, as shown here using joblib,
# and store it in a global variable so your run() method can access it later.
def init():
    global model

    # The AZUREML_MODEL_DIR environment variable indicates
    # a directory containing the model file you registered.
    model_filename = 'diabetes_regression_model.pkl'
    model_path = os.path.join(os.environ['AZUREML_MODEL_DIR'], model_filename)

    model = joblib.load(model_path)


# The run() method is called each time a request is made to the scoring API.
#
# Shown here are the optional input_schema and output_schema decorators
# from the inference-schema pip package. Using these decorators on your
# run() method parses and validates the incoming payload against
# the example input you provide here. This will also generate a Swagger
# API document for your web service.
@input_schema('data', NumpyParameterType(np.array([[59, 2, 32.1, 101.0, 157, 93.2, 38.0, 4.0, 4.8598, 87]])))
@output_schema(NumpyParameterType(np.array([151.000])))
def run(data):
    # Use the model object loaded by init().
    result = model.predict(data)

    # You can return any JSON-serializable object.
    return result.tolist()

## Define the (inference) environment

In [None]:
from azureml.core import Environment
from azureml.core.conda_dependencies import CondaDependencies

# Inference environment: azureml-defaults plus the packages score.py needs at runtime.
inference_pip_packages = [
    'azureml-defaults',
    'inference-schema[numpy-support]',
    'joblib',
    'numpy',
    'scikit-learn',
]

environment = Environment('my-sklearn-environment')
environment.python.conda_dependencies = CondaDependencies.create(pip_packages=inference_pip_packages)


## Define an inference configuration

In [None]:
from azureml.core.model import InferenceConfig

# Pair the scoring script with the environment it runs in.
inference_config = InferenceConfig(
    entry_script='score.py',
    environment=environment,
)

## Deploy in a custom environment

In [None]:
from azureml.core import Model, Webservice
from azureml.core.webservice import AciWebservice
from azureml.exceptions import WebserviceException


service_name = 'diabetes-custom-service3'

# Remove any existing service under the same name. This is best effort:
# WebserviceException (e.g. no such service) is deliberately ignored.
try:
    Webservice(ws, service_name).delete()
except WebserviceException:
    pass

# Azure Container Instance with 1 CPU core and 1 GB of memory.
aci_config = AciWebservice.deploy_configuration(cpu_cores=1, memory_gb=1)

service = Model.deploy(workspace=ws,
                       name=service_name,
                       models=[model],
                       inference_config=inference_config,
                       deployment_config=aci_config)

service.wait_for_deployment(show_output=True)


In [None]:
# Re-attach to the deployed service by name (e.g. from a fresh kernel).
from azureml.core import Webservice

service_name = 'diabetes-custom-service3'
service = Webservice(workspace=ws, name=service_name)

print(service.name, service.scoring_uri, sep='\t')


## Test the service

In [None]:
import json

# Two feature rows in the column order of score.py's input_schema example
# (AGE, SEX, BMI, BP, S1..S6); they differ only in AGE.
# Only the 'data' key is sent. score.py's run(data) takes no other argument, and
# with the input_schema decorator an extra key such as 'method' would presumably be
# rejected. 'method' applies to no-code scikit-learn deployments, not to custom
# scoring scripts.
input_payload = json.dumps({
    'data': [
        [59, 2, 32.1, 101.0, 157, 93.2, 38.0, 4.0, 4.8598, 87],
        [69, 2, 32.1, 101.0, 157, 93.2, 38.0, 4.0, 4.8598, 87]
    ]
})

output = service.run(input_payload)

print(output)


In [None]:
#service.delete()

### Run the published pipeline using its REST endpoint

In [None]:
from azureml.core.authentication import InteractiveLoginAuthentication
import requests

# Interactive Azure AD login; the resulting header authorises the REST call.
auth = InteractiveLoginAuthentication()
aad_token = auth.get_authentication_header()

rest_endpoint1 = published_pipeline.endpoint
print("You can perform HTTP POST on URL {} to trigger this pipeline".format(rest_endpoint1))

# Request body: the experiment to file the run under, plus pipeline parameter values.
# NOTE(review): no PipelineParameter named "pipeline_arg" is defined for this
# pipeline; confirm this assignment is intended.
request_body = {
    "ExperimentName": "My_Pipeline1",
    "RunSource": "SDK",
    "ParameterAssignments": {"pipeline_arg": 45},
}
response = requests.post(rest_endpoint1, headers=aad_token, json=request_body)

In [None]:
import requests

# Turn an HTTP error status into an exception with enough context to debug the call.
# The endpoint variable defined in the POST cell is `rest_endpoint1`.
try:
    response.raise_for_status()
except requests.exceptions.HTTPError as err:
    raise Exception('Received bad response from the endpoint: {}\n'
                    'Response Code: {}\n'
                    'Headers: {}\n'
                    'Content: {}'.format(rest_endpoint1, response.status_code, response.headers, response.content)) from err

# The service returns the id of the submitted pipeline run.
run_id = response.json().get('Id')
print('Submitted pipeline run: ', run_id)