In [None]:
#Connect to AML Workspace
# `os` is imported here because this is the first cell that runs on a fresh kernel
# and it reads its settings from environment variables.
import os

from azureml.core import Workspace

# Workspace coordinates come from the environment; each one falls back to an
# empty string when the variable is not set.
subscription_id = os.getenv("SUBSCRIPTION_ID", default="")
resource_group = os.getenv("RESOURCE_GROUP", default="")
workspace_name = os.getenv("WORKSPACE_NAME", default="")
workspace_region = os.getenv("WORKSPACE_REGION", default="")

try:
    # First try to load an existing workspace through Workspace.from_config().
    ws = Workspace.from_config()
    # ws = Workspace(subscription_id=subscription_id, 
    #                resource_group=resource_group, 
    #                workspace_name=workspace_name)
    print("Workspace configuration succeeded. Skip the workspace creation steps below")
except Exception:
    # Catch Exception (not a bare except) so KeyboardInterrupt / SystemExit
    # still propagate instead of silently triggering workspace creation.
    print("Workspace does not exist. Creating workspace")
    ws = Workspace.create(name=workspace_name, subscription_id=subscription_id, resource_group=resource_group,
                            location=workspace_region, create_resource_group=True, sku='enterprise', exist_ok=True)

#Select AML Compute Cluster
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cpu_cluster_name = 'cpucluster'

# Look the cluster up by name first; only provision a new one when the lookup
# raises ComputeTargetException.
try:
    cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
except ComputeTargetException:
    compute_config = AmlCompute.provisioning_configuration(
        vm_size='STANDARD_D13_V2',
        min_nodes=0,
        max_nodes=10,
    )
    cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)
    # Block until provisioning has finished, streaming progress to the cell output.
    cpu_cluster.wait_for_completion(show_output=True)
else:
    print('Found an existing cluster, using it instead.')

#Create Experiment and get the default AML Datastore
from azureml.core import Datastore, Experiment

# Experiment under which the pipeline run is submitted later in the notebook.
experiment = Experiment(workspace=ws, name='unified-dataprep-pipeline')

# Default datastore of the workspace; used as the target for pipeline outputs.
ds = ws.get_default_datastore()

In [None]:
#Set Up Data Prep Pipeline Step
from azureml.core.runconfig import RunConfiguration, DEFAULT_CPU_IMAGE
from azureml.core.conda_dependencies import CondaDependencies
from azureml.pipeline.core import Pipeline, PipelineParameter, PipelineData
from azureml.pipeline.steps import PythonScriptStep
from azureml.data import OutputFileDatasetConfig

# Package set for the data prep script: conda packages first, then the
# packages that are added through pip.
dataprep_conda_deps = CondaDependencies.create(conda_packages=['requests', 'pandas'])
for pip_package in ('snowflake-connector-python[pandas]', 'azureml-opendatasets'):
    dataprep_conda_deps.add_pip_package(pip_package)

# Fresh run configuration whose environment is filled in below.
dataprep_run_config = RunConfiguration()
dataprep_env = dataprep_run_config.environment

# Run inside Docker, on top of the default CPU base image.
dataprep_env.docker.enabled = True
dataprep_env.docker.base_image = DEFAULT_CPU_IMAGE

# Dependencies are not user managed: the conda specification below is used
# to build the execution environment.
dataprep_env.python.user_managed_dependencies = False
dataprep_env.python.conda_dependencies = dataprep_conda_deps

#Configure three output datasets to be registered in the workspace (source data, training data, and inferencing data)
def _registered_output(dataset_name):
    """Build an output config written under '<dataset_name>/{run-id}' on `ds` and registered as `dataset_name` on completion."""
    output_config = OutputFileDatasetConfig(name=dataset_name, destination=(ds, dataset_name + '/{run-id}'))
    return output_config.register_on_complete(name=dataset_name)


inference_dataset = _registered_output('oj_inference_data')
train_dataset = _registered_output('oj_train_data')
data_dataset = _registered_output('oj_sales_data')

#Alternate Dataset configuration - omits experimental classes
# inference_data = PipelineData('oj_inference_data', datastore = ds, is_directory=True)
# inference_dataset = inference_data.as_dataset()
# inference_dataset.register(name='oj_inference_data', create_new_version=True)

# train_data = PipelineData('oj_train_data', datastore = ds, is_directory=True)
# train_dataset = train_data.as_dataset()
# train_dataset.register(name='oj_train_data', create_new_version=True)

# data_path = PipelineData('oj_sales_data', datastore=ds, is_directory=True)
# data_dataset = data_path.as_dataset()
# data_dataset.register(name='oj_sales_data', create_new_version=True)

#Create PythonScriptStep to gather data from remote source and register as AML dataset
# Each flag passed to gather_data.py is paired with the output dataset it receives.
gather_data_arguments = [
    "--train_path", train_dataset,
    "--inference_path", inference_dataset,
    "--data_path", data_dataset,
]

aggregate_data_step = PythonScriptStep(
    script_name="gather_data.py",
    source_directory='./dataprep',
    arguments=gather_data_arguments,
    outputs=[data_dataset, train_dataset, inference_dataset],
    compute_target=cpu_cluster,
    runconfig=dataprep_run_config,
    allow_reuse=False,
)


In [None]:
#AutoML Training Step
import logging
from automl_train.scripts.helper import write_automl_settings_to_file, build_parallel_run_config
from azureml.pipeline.core import PipelineParameter, PipelineData

#AutoML configuration; it is written to a file by the helper below
automl_settings = dict(
    task='forecasting',
    primary_metric='normalized_root_mean_squared_error',
    # Tune per dataset: explore how long training takes before settling on a value.
    iteration_timeout_minutes=10,
    iterations=15,
    experiment_timeout_hours=1,
    label_column_name='Quantity',
    n_cross_validations=3,
    verbosity=logging.INFO,
    debug_log='automl_oj_sales_debug.txt',
    time_column_name='WeekStarting',
    max_horizon=20,
    track_child_runs=False,
    group_column_names=['Store', 'Brand'],
    grain_column_names=['Store', 'Brand'],
)
write_automl_settings_to_file(automl_settings)

#Set up training environment (reused for inferencing later)
from automl_train.scripts.helper import get_automl_environment
from azureml.pipeline.steps import ParallelRunStep

train_env = get_automl_environment(workspace=ws, automl_settings_dict=automl_settings)

#Cluster sizing handed to the parallel run configuration helper
node_count = 4
process_count_per_node = 8
run_invocation_timeout = 3700

#Build parallel run step configuration
parallel_run_config = build_parallel_run_config(
    train_env,
    cpu_cluster,
    node_count,
    process_count_per_node,
    run_invocation_timeout,
)

#Pipeline output that receives the training step's results
training_output_name = "training_output"
train_output_dir = PipelineData(name=training_output_name, datastore=ds)

#Define training ParallelRunStep
train_parallel_run_step = ParallelRunStep(
    name="many-models-training-automl",
    parallel_run_config=parallel_run_config,
    inputs=[train_dataset.as_input(name='oj_train_data')],
    output=train_output_dir,
    allow_reuse=False,
)

#Specify that training step must occur after data gathering step
train_parallel_run_step.run_after(aggregate_data_step)

In [None]:
#Set up custom model training step
from azureml.core import Environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.pipeline.steps import ParallelRunConfig

# Environment for the custom (non-AutoML) training script.
custom_model_train_env = Environment(name="many_models_environment")

# 'scikit-learn' is the real distribution name. The 'sklearn' name on PyPI is a
# deprecated placeholder whose installation fails, which would break the
# environment build.
custom_model_train_conda_deps = CondaDependencies.create(pip_packages=['scikit-learn', 'pandas', 'joblib', 'azureml-defaults', 'azureml-core', 'azureml-dataprep[fuse]'])
custom_model_train_env.python.conda_dependencies = custom_model_train_conda_deps

# Sizing and timeout values for the custom training run.
processes_per_node = 8
node_count = 1
timeout = 180

# Parallel run configuration: train.py from ./custom_scripts/scripts is the entry script.
parallel_run_config = ParallelRunConfig(
    source_directory='./custom_scripts/scripts',
    entry_script='train.py',
    environment=custom_model_train_env,
    compute_target=cpu_cluster,
    node_count=node_count,
    process_count_per_node=processes_per_node,
    mini_batch_size="1",
    run_invocation_timeout=timeout,
    error_threshold=10,
    output_action="append_row",
)

# Pipeline output that receives the custom training step's results.
custom_model_output_dir = PipelineData(name="training_output", datastore=ds)

# Command-line arguments forwarded to the entry script.
custom_model_training_arguments = [
    '--target_column', 'Quantity',
    '--timestamp_column', 'WeekStarting',
    '--timeseries_id_columns', 'Store', 'Brand',
    '--drop_columns', 'Revenue', 'Store', 'Brand',
    '--model_type', 'lr',
    '--test_size', 20,
]

custom_model_training_parallel_run_step = ParallelRunStep(
    name="many-models-training-custom",
    parallel_run_config=parallel_run_config,
    inputs=[train_dataset.as_input(name='oj_train_data')],
    output=custom_model_output_dir,
    arguments=custom_model_training_arguments,
    allow_reuse=False,
)

# Custom training is ordered after the data gathering step.
custom_model_training_parallel_run_step.run_after(aggregate_data_step)

In [None]:
#Create Pipeline Step for Inferencing
from automl_inference.scripts.helper import build_parallel_run_config_for_forecasting
from azureml.pipeline.core import PipelineData
from azureml.pipeline.steps import ParallelRunStep

# Forecasting reuses the environment built for AutoML training.
forecast_env = train_env

#Set up configuration for parallel inferencing run
node_count = 2
process_count_per_node = 6
# Timeout in seconds; larger models need a higher value.
run_invocation_timeout = 300

parallel_run_config = build_parallel_run_config_for_forecasting(
    forecast_env,
    cpu_cluster,
    node_count,
    process_count_per_node,
    run_invocation_timeout,
)

#Define location where forecasting output will be saved
forecasting_output_name = 'automl_forecasting_output'
forecast_output_dir = PipelineData(name=forecasting_output_name, datastore=ds)

# Arguments forwarded to the forecasting entry script.
automl_forecast_arguments = [
    '--group_column_names', 'Store', 'Brand',
    # [Optional] needed for timeseries
    '--time_column_name', 'WeekStarting',
    # [Optional] only needed when the inference data contains the target column
    '--target_column_name', 'Quantity',
]

#Create parallel inferencing step
inference_parallel_run_step = ParallelRunStep(
    name="many-models-forecasting-automl",
    parallel_run_config=parallel_run_config,
    inputs=[inference_dataset.as_input(name='oj_inference_data')],
    output=forecast_output_dir,
    arguments=automl_forecast_arguments,
)

#Specify that inferencing must happen after training
inference_parallel_run_step.run_after(train_parallel_run_step)

In [None]:
from azureml.core import Environment
from azureml.core.conda_dependencies import CondaDependencies

# Environment for the custom (non-AutoML) forecasting script.
custom_model_forecast_env = Environment(name="many_models_environment")

# 'scikit-learn' is the real distribution name. The 'sklearn' name on PyPI is a
# deprecated placeholder whose installation fails, which would break the
# environment build.
custom_model_forecast_conda_deps = CondaDependencies.create(pip_packages=['scikit-learn', 'pandas', 'joblib', 'azureml-defaults', 'numpy', 'azureml-core', 'azureml-dataprep[fuse]'])
custom_model_forecast_env.python.conda_dependencies = custom_model_forecast_conda_deps

from azureml.pipeline.steps import ParallelRunConfig

# Sizing and timeout values for the custom forecasting run.
process_count_per_node = 6
node_count = 1
timeout = 180

# Parallel run configuration: forecast.py from ./custom_scripts/scripts is the entry script.
parallel_run_config = ParallelRunConfig(
    source_directory='./custom_scripts/scripts',
    entry_script='forecast.py',
    environment=custom_model_forecast_env,
    compute_target=cpu_cluster,
    node_count=node_count,
    process_count_per_node=process_count_per_node,
    mini_batch_size='1',
    run_invocation_timeout=timeout,
    error_threshold=10,
    output_action='append_row',
)

# Pipeline output that receives the custom forecasting results.
# Note: this rebinds the name custom_model_output_dir.
custom_model_output_dir = PipelineData(name='custom_forecasting_output', datastore=ds)

# Command-line arguments forwarded to the entry script.
custom_model_forecast_arguments = [
    '--timestamp_column', 'WeekStarting',
    '--timeseries_id_columns', 'Store', 'Brand',
    '--model_type', 'lr',
]

custom_model_forecast_parallel_run_step = ParallelRunStep(
    name="many-models-forecasting-custom",
    parallel_run_config=parallel_run_config,
    inputs=[inference_dataset.as_input(name='oj_inference_data')],
    output=custom_model_output_dir,
    arguments=custom_model_forecast_arguments,
    allow_reuse=False,
)

# Custom forecasting is ordered after custom model training.
custom_model_forecast_parallel_run_step.run_after(custom_model_training_parallel_run_step)

In [None]:
#Move Data Step
#Runs move_data.py, which receives both forecasting outputs and a destination output directory

copy_output_name = "copy_data_outputs"
copy_output_dir = PipelineData(name=copy_output_name, datastore=ds)

# Flags passed to move_data.py, each paired with the pipeline data it points at.
copy_data_arguments = [
    "--automl_parallel_run_step_output", forecast_output_dir,
    "--custom_model_parallel_run_step_output", custom_model_output_dir,
    '--copy_outputs_dir', copy_output_dir,
]

copy_data_step = PythonScriptStep(
    name="write-prediction-data",
    script_name="move_data.py",
    source_directory='./copydata',
    arguments=copy_data_arguments,
    inputs=[forecast_output_dir, custom_model_output_dir],
    outputs=[copy_output_dir],
    compute_target=cpu_cluster,
    allow_reuse=False,
)

#Specify that data copy must happen after inferencing
copy_data_step.run_after(inference_parallel_run_step)

In [None]:
#Create pipeline, execute pipeline, and wait for response
pipeline_steps = [
    aggregate_data_step,
    train_parallel_run_step,
    custom_model_training_parallel_run_step,
    custom_model_forecast_parallel_run_step,
    inference_parallel_run_step,
    copy_data_step,
]
pipeline = Pipeline(workspace=ws, steps=pipeline_steps)

# Submit under the experiment and block until the run finishes, streaming output.
run = experiment.submit(pipeline)
run.wait_for_completion(show_output=True)

In [None]:
#Publish pipeline to endpoint
published_pipeline_description = 'Gathers data, trains models, generates forecasts, and stores results'

published_pipeline = pipeline.publish(
    name='many_models_sample',
    description=published_pipeline_description,
    version='1',
    continue_on_step_failure=False,
)

In [None]:
#Sample remote execution
#Pipeline execution via REST endpoint requires AAD Token (obtained here from service principal)
#Relevant docs:
#https://docs.microsoft.com/en-us/azure/machine-learning/how-to-deploy-pipelines
#https://github.com/Azure/MachineLearningNotebooks/blob/master/how-to-use-azureml/manage-azureml-service/authentication-in-azureml/authentication-in-azureml.ipynb

import os

import requests
#Leverage ADAL library for obtaining token
from adal import AuthenticationContext
from azureml.core.authentication import ServicePrincipalAuthentication

#Service principal creds and the pipeline endpoint are stored as environment vars.
#Fail early with a clear message when one is missing or empty; otherwise the
#authority URL would end in "/None" and the failure would surface later.
required_env_vars = ('client_id', 'tenant_id', 'service_principal_password', 'pipeline_endpoint')
missing_env_vars = [var_name for var_name in required_env_vars if not os.environ.get(var_name)]
if missing_env_vars:
    raise EnvironmentError("Missing required environment variable(s): " + ", ".join(missing_env_vars))

client_id = os.environ['client_id']
tenant_id = os.environ['tenant_id']
service_principal_password = os.environ['service_principal_password']
pipeline_endpoint = os.environ['pipeline_endpoint']

client_secret = service_principal_password
resource_url = "https://login.microsoftonline.com"
authority = "{}/{}".format(resource_url, tenant_id)

#Acquire a token for the Azure management resource using the client credentials
auth_context = AuthenticationContext(authority)
token_response = auth_context.acquire_token_with_client_credentials("https://management.azure.com/", client_id, client_secret)

#Format token response for API request to pipeline
headers = {'Authorization': 'Bearer {}'.format(token_response['accessToken'])}

#Trigger remote pipeline run
#Pipeline endpoint can be obtained from AML portal as well
response = requests.post(pipeline_endpoint,
                         headers=headers,
                         json={"ExperimentName": "REST_Unified_Pipeline_Trigger_Test"})

#Surface HTTP errors (4xx/5xx) instead of silently ignoring a failed trigger
response.raise_for_status()