In [None]:
# Name of the Azure ML experiment the pipeline run is submitted under
experiment_name = 'iris-classification-training'
# Set to True to upload ../sample_data.csv to the default datastore
upload_sample_data = False
# Set to True to register the uploaded CSV as the 'iris-data' dataset
register_data = False

In [None]:
#import required packages to build the pipeline artifact
from azureml.core import Experiment, Dataset
from azureml.core.compute import AmlCompute, ComputeTarget, DatabricksCompute
from azureml.core.datastore import Datastore
from azureml.core.runconfig import CondaDependencies, RunConfiguration
from azureml.data.data_reference import DataReference
from azureml.pipeline.core import Pipeline, PipelineData, PortDataReference
from azureml.pipeline.steps import PythonScriptStep, DatabricksStep
from azureml.core.model import Model


In [None]:
import os
from azureml.core import Workspace

# Workspace handle used by every later cell (compute, datastore, experiment, model)
ws = Workspace.from_config()

In [None]:
from azureml.core.compute_target import ComputeTargetException

# Reuse the "cpu-cluster" compute target when it already exists in the
# workspace; otherwise provision it and wait until it is ready.
aml_compute_target = "cpu-cluster"
try:
    aml_compute = AmlCompute(workspace=ws, name=aml_compute_target)
    print("found existing compute target.")
except ComputeTargetException:
    print("creating new compute target")

    # Cluster sized between 0 and 4 STANDARD_D2_V2 nodes
    provisioning_config = AmlCompute.provisioning_configuration(
        vm_size="STANDARD_D2_V2",
        min_nodes=0,
        max_nodes=4,
    )
    aml_compute = ComputeTarget.create(ws, aml_compute_target, provisioning_config)
    aml_compute.wait_for_completion(
        show_output=True,
        min_node_count=None,
        timeout_in_minutes=20,
    )

print("Aml Compute attached")

In [None]:
# Look up the workspace's default datastore and report where it points
ds = ws.get_default_datastore()

name_message = "Default Blobstore's name: {}"
print(name_message.format(ds.name))

container_message = "Default Blobstore's container name: {}"
print(container_message.format(ds.container_name))

Upload and register the iris data to Azure ML to be used in pipelines

In [None]:
# Upload the sample CSV into the datastore's /data folder (only when enabled)
if upload_sample_data:
    ds.upload_files(
        files=['../sample_data.csv'],
        target_path='/data',
        overwrite=True,
        show_progress=True,
    )


In [None]:
# Optionally register the uploaded CSV as the tabular dataset 'iris-data'
if register_data:
    # build a tabular dataset from the CSV stored on the datastore
    sample_csv = ds.path('/data/sample_data.csv')
    registereddata = Dataset.Tabular.from_delimited_files(sample_csv)
    # the data has no timestamp column, so none is configured before registering
    registereddata = registereddata.register(ws, 'iris-data')

In [None]:
# Fetch the latest registered version of the 'iris-data' dataset
dataset = Dataset.get_by_name(workspace=ws, name='iris-data', version='latest')

In [None]:
# Reference to the sample CSV on the datastore; passed to the steps as an input
dataset_ref = DataReference(
    datastore=ds,
    data_reference_name='irisdata',
    path_on_datastore="data/sample_data.csv",
)
print("DataReference object created")

# Pipeline output placeholder on the same datastore, shared between the steps
model_output = PipelineData(name="model_output", datastore=ds)
print("PipelineData object created for models")

#### Create Pipeline Steps to be executed each time the pipeline runs

In [None]:
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import DEFAULT_CPU_IMAGE

# Run configuration shared by every pipeline step: a Docker-based
# environment whose Python packages are resolved by Azure ML.
run_config = RunConfiguration()

# enable Docker
# NOTE(review): newer SDK v1 releases deprecate `environment.docker.enabled`
# in favour of a DockerConfiguration object — confirm against the installed SDK.
run_config.environment.docker.enabled = True
# set Docker base image to the default CPU-based image
run_config.environment.docker.base_image = DEFAULT_CPU_IMAGE
# let Azure ML build the Python environment from the dependencies declared below
run_config.environment.python.user_managed_dependencies = False

# Packages installed into the step environment.
# The PyPI name is 'scikit-learn'; 'sklearn' is a deprecated placeholder
# package whose installation fails, which would break the environment build.
run_config.environment.python.conda_dependencies = CondaDependencies.create(
    pip_packages=['azureml-sdk', 'scikit-learn', 'scipy', 'numpy', 'pandas'],
    conda_packages=['matplotlib'])

In [None]:
# Training step: runs iris_supervised_model.py with the iris data reference as
# input and declares model_output as the step's output
trainingScript = PythonScriptStep(
    script_name="iris_supervised_model.py",
    source_directory=".",
    inputs=[dataset_ref],
    outputs=[model_output],
    compute_target=aml_compute,
    runconfig=run_config,
)
print("trainingStep created")

### Using the output
In the previous PythonScriptStep, a PipelineData object was created as an output and assigned to "model_output"; the next step consumes it as an input. For the related PipelineOutputFileDataset type, see: https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipeline_output_dataset.pipelineoutputfiledataset?view=azure-ml-py

In [None]:
# Registration step: runs register_model.py, receiving the iris data reference
# and the training step's model_output as inputs; it declares no outputs
registerModelStep = PythonScriptStep(
    script_name="register_model.py",
    source_directory=".",
    arguments=["--model_name", "iris_classifier_model","--training_step_name","iris_supervised_model.py"],
    inputs=[dataset_ref, model_output],
    compute_target=aml_compute,
    runconfig=run_config,
)
print("registerModelStep created")

In [None]:
# Assemble the training and registration steps into one pipeline
pipeline_steps = [trainingScript, registerModelStep]
iris_train_pipeline = Pipeline(workspace=ws, steps=pipeline_steps)
print("Pipeline is built")

In [None]:
# Create (or reuse) the experiment, tag it, and submit the pipeline under it
exp = Experiment(workspace=ws, name=experiment_name)
experiment_tags = {'automl': 'no', 'working': 'no'}
exp.set_tags(experiment_tags)

pipeline_run1 = exp.submit(iris_train_pipeline)
print("Pipeline is submitted for execution")

In [None]:
# Block until the submitted pipeline run finishes; the returned value is shown as the cell output
pipeline_run1.wait_for_completion()

In [None]:
# Walk the steps of the finished pipeline run: capture the training step's
# reported model type, accuracy and run id, and print where every step
# output was written.
model_type = None       # stays None if the training step is not found
model_accuracy = None   # stays None if the training step is not found
run_id = 0
for step in pipeline_run1.get_steps():
    print("Outputs of step " + step.name)

    # Dictionary of StepRunOutputs with the output name as the key
    output_dict = step.get_outputs()

    if step.name == 'iris_supervised_model.py':
        # Fetch the run properties once and reuse them for both values
        # instead of asking the service twice.
        step_properties = step.get_properties()
        model_type = step_properties['best_model']
        model_accuracy = float(step_properties['accuracy'])
        run_id = step.id

    for name, output in output_dict.items():
        # Port data reference describing where this output lives
        output_reference = output.get_port_data_reference()
        print("\tname: " + name)
        print("\tdatastore: " + output_reference.datastore_name)
        print("\tpath on datastore: " + output_reference.path_on_datastore)

            


## Model Registration
The code below is an example of how to register a model. In the automated code, this is done in register_model.py rather than in train_pipeline.py.

In [None]:

from azureml.core import Model
from azureml.core.resource_configuration import ResourceConfiguration
import sklearn

# Register the freshly trained model only when it beats the accuracy of the
# model currently registered under the same name.
model_name = 'iris-model'
# Local path handed to Model.register.
# NOTE(review): no visible cell downloads the training output to this path —
# confirm 'model_output' exists locally before running this cell.
model_path = 'model_output'

# Accuracy of the currently registered model; falls back to 0 when the model
# or its "accuracy" property cannot be read.
try:
    model = Model(ws, model_name)
    current_accuracy = float(model.properties["accuracy"])
except Exception:
    # Best-effort lookup: any failure is treated as "no comparable model".
    # `Exception` (rather than a bare except) lets KeyboardInterrupt and
    # SystemExit propagate.
    current_accuracy = 0

print("current accuracy",current_accuracy)
if model_accuracy is None:
    # The training step was not found in the pipeline run, so there is
    # nothing to compare; comparing None with a number would raise TypeError.
    print("no accuracy available from the training step; skipping registration")
elif model_accuracy > current_accuracy:
    print("model is better")
    model = Model.register(
        workspace=ws,
        model_name=model_name,                        # Name of the registered model in your workspace.
        model_path=model_path,                        # Local file to upload and register as a model.
        model_framework=Model.Framework.SCIKITLEARN,  # Framework used to create the model.
        model_framework_version=sklearn.__version__,  # Version of scikit-learn used to create the model.
        sample_input_dataset=dataset,
        resource_configuration=ResourceConfiguration(cpu=1, memory_in_gb=0.5),
        description='basic iris classification',
        tags={'quality': 'good', 'type': 'classification'},
    )
    # Properties are a string-to-string mapping; store the accuracy as text so
    # it round-trips through float(model.properties["accuracy"]) above.
    model.add_properties({"accuracy": str(model_accuracy), "model_type": model_type})
    # NOTE(review): these assignments only set attributes on the local object —
    # confirm whether they are meant to be persisted to the workspace.
    model.experiment_name = experiment_name
    model.run_id = run_id



## Pipeline Publish and Schedule
To publish from the automated run, train_pipeline.py needs to be modified to perform publishing (change pipeline=True). The cells below can be run interactively after the pipeline above is submitted, if desired.

In [None]:
# Publish the pipeline under a fixed name; the cell displays the published object
published_pipeline = iris_train_pipeline.publish(
    name="iris_training_demo",
    description="Iris Classification Demo",
    continue_on_step_failure=True,
)
published_pipeline

In [None]:
from azureml.pipeline.core.schedule import ScheduleRecurrence, Schedule

# Recurrence: every 1 day at hour 22, minute 30 (10:30pm)
# NOTE(review): no time zone is passed — confirm which zone the service assumes
recurrence = ScheduleRecurrence(
    frequency="Day",
    interval=1,
    hours=[22],
    minutes=[30],
)

# Attach the recurrence to the published pipeline as a named schedule
schedule = Schedule.create(
    workspace=ws,
    name="iris_training_demo_schedule",
    pipeline_id=published_pipeline.id,
    experiment_name='iris_training_demo_daily_schedule_run',
    recurrence=recurrence,
    wait_for_provisioning=True,
    description="iris training demo daily Schedule Run",
)