Import the necessary packages.

In [1]:
import azureml.core
from azureml.core import Workspace, LinkedService
from azureml.core.datastore import Datastore
from azureml.core.dataset import Dataset
from azureml.data import OutputFileDatasetConfig, HDFSOutputDatasetConfig
from azureml.core.compute import SynapseCompute, ComputeTarget, AmlCompute
from azureml.core import Environment
from azureml.core.runconfig import CondaDependencies, DEFAULT_GPU_IMAGE
from azureml.pipeline.steps import ParallelRunConfig, ParallelRunStep, SynapseSparkStep
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline, StepSequence

import os
import pandas as pd
import tempfile

This code was tested with version 1.36.0 of the Azure ML SDK. Please make sure you have this version installed.

In [2]:
print("SDK version:", azureml.core.VERSION)

SDK version: 1.36.0
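
Because the notebook was tested against one specific SDK release, a small guard can flag a version mismatch before any other cell runs. The sketch below is only illustrative: `parse_version` and `check_sdk_version` are hypothetical helpers that are not part of the Azure ML SDK, and the code assumes plain numeric `major.minor.patch` version strings.

```python
def parse_version(version):
    """Turn a 'major.minor.patch' string into a tuple of integers."""
    return tuple(int(part) for part in version.split('.'))


def check_sdk_version(installed, tested='1.36.0'):
    """Return True when the installed version equals the tested one."""
    return parse_version(installed) == parse_version(tested)


# In the notebook, the first argument would be azureml.core.VERSION.
print(check_sdk_version('1.36.0'))
print(check_sdk_version('1.35.0'))
```

Comparing tuples of integers rather than raw strings avoids surprises such as '1.9.0' sorting after '1.36.0'.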


Get a reference to the Azure ML Workspace.

In [3]:
ws = Workspace.from_config()

Here we define the name and key for the storage account we use to store the input, intermediate, and output data.

To make the configuration easier, we are using the default Azure Blob Storage account created with the Azure ML Workspace. In a real-world implementation, one would probably have that data in an [Azure Data Lake Storage](https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-introduction) account instead.

Please make sure you enter your own storage account name and key.

In [4]:
account_name = '<your storage account name>'
account_key = '<your storage account key>'
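
As an alternative to typing the key into the notebook, the two values could be read from environment variables. This is a minimal sketch under stated assumptions: the variable names `SOFTSENSORS_STORAGE_ACCOUNT_NAME` and `SOFTSENSORS_STORAGE_ACCOUNT_KEY` and the helper `read_storage_credentials` are hypothetical and do not come from the original notebook.

```python
import os


def read_storage_credentials(env=os.environ):
    """Read the storage account name and key from a mapping of variables.

    The variable names used here are hypothetical placeholders.
    """
    name = env.get('SOFTSENSORS_STORAGE_ACCOUNT_NAME')
    key = env.get('SOFTSENSORS_STORAGE_ACCOUNT_KEY')
    if not name or not key:
        raise ValueError('Storage account name and key must both be set.')
    return name, key


# Example with an explicit mapping instead of the real environment.
example_env = {
    'SOFTSENSORS_STORAGE_ACCOUNT_NAME': 'mystorageaccount',
    'SOFTSENSORS_STORAGE_ACCOUNT_KEY': 'not-a-real-key',
}
account_name, account_key = read_storage_credentials(example_env)
```

Calling `read_storage_credentials()` without arguments reads from the process environment, which keeps the key out of the saved notebook.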

So that the pipeline steps defined below can access the data, we first create [Azure ML Datasets](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-train-with-datasets) pointing to the storage locations where the data will be read from or written to.

The 'raw_dataset' object is defined as an input dataset pointing to the raw data storage location.

The 'prepared_dataset' and 'featurized_dataset' objects are defined as output datasets pointing to storage locations to be written by the data preparation and feature selection steps, respectively. They are intermediate datasets and will also be used as inputs for the feature selection and model training steps, respectively.

The 'featurized_dataset' object is not consumed directly as input by the model training step. Instead, the 'model_input_dataset' object references the same storage location as a partitioned file dataset, so that each partition (one per combination of training task and output sensor name) can be processed independently and in parallel.

The 'model_output_dir' object is defined as an output dataset pointing to the storage location to be written by the model training step.

The 'raw_dataset', 'prepared_dataset', and 'featurized_dataset' objects are defined as HDFS datasets because they are accessed by Apache Spark. The 'model_input_dataset' and 'model_output_dir' objects are defined as file datasets because they are accessed by standard Python running on Azure ML.

In [5]:
raw_datastore_name = 'softsensors_raw_datastore'
raw_container_name = 'softsensors-raw'
raw_datastore = Datastore.register_azure_blob_container(workspace=ws, account_name=account_name, account_key=account_key, datastore_name=raw_datastore_name, container_name=raw_container_name, create_if_not_exists=True)
raw_dataset = Dataset.File.from_files(path=(raw_datastore, '/*.parquet'), validate=False).as_named_input('raw_dataset').as_hdfs()

prepared_datastore_name = 'softsensors_prepared_datastore'
prepared_container_name = 'softsensors-prepared'
prepared_datastore = Datastore.register_azure_blob_container(workspace=ws, account_name=account_name, account_key=account_key, datastore_name=prepared_datastore_name, container_name=prepared_container_name, create_if_not_exists=True)
prepared_dataset = HDFSOutputDatasetConfig(name='prepared_dataset', destination=(prepared_datastore, '/data'))

featurized_datastore_name = 'softsensors_featurized_datastore'
featurized_container_name = 'softsensors-featurized'
featurized_datastore = Datastore.register_azure_blob_container(workspace=ws, account_name=account_name, account_key=account_key, datastore_name=featurized_datastore_name, container_name=featurized_container_name, create_if_not_exists=True)
featurized_dataset = HDFSOutputDatasetConfig(name='featurized_dataset', destination=(featurized_datastore, '/data'))

model_input_dataset = Dataset.File.from_files(path=(featurized_datastore, 'data/train/*/*/*'), partition_format='data/train/{task}/{target}/*', validate=False)
model_datastore_name = 'softsensors_models_datastore'
model_container_name = 'softsensors-models'
model_datastore = Datastore.register_azure_blob_container(workspace=ws, account_name=account_name, account_key=account_key, datastore_name=model_datastore_name, container_name=model_container_name, create_if_not_exists=True)
model_output_dir = OutputFileDatasetConfig(name='model_output_dir', destination=(model_datastore, '/model_outputs'))
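
To make the partitioning by training task and output sensor name more concrete, the following sketch shows how a format such as 'data/train/{task}/{target}/*' maps a file path to partition key values. It is a plain-Python imitation written for illustration, not the SDK's own implementation; `extract_partition_keys` and the example path are hypothetical.

```python
import re


def extract_partition_keys(path, partition_format):
    """Match a path against a format such as 'data/train/{task}/{target}/*'.

    Returns a dict of key names to values, or None when the path does not fit.
    """
    pattern = ''
    # Split the format into literal text, '{key}' placeholders and '*'.
    for token in re.split(r'(\{\w+\}|\*)', partition_format):
        if token == '*':
            pattern += r'[^/]*'
        elif token.startswith('{') and token.endswith('}'):
            pattern += r'(?P<' + token[1:-1] + r'>[^/]+)'
        else:
            pattern += re.escape(token)
    match = re.fullmatch(pattern, path)
    return match.groupdict() if match else None


keys = extract_partition_keys(
    'data/train/regression/sensor_a/part-0000.parquet',
    'data/train/{task}/{target}/*')
print(keys)
```

Each distinct combination of `task` and `target` values forms one partition, which is the unit of work handed to the model training step.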

Here we [link our Azure Synapse Spark pool with our Azure ML workspace and create an Azure ML Compute Target pointing to it](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-link-synapse-ml-workspaces). This compute target will be used to run the data preparation and feature selection steps.

Please make sure you use your own definitions for the Azure Synapse linked service and Spark pool names below.

In [6]:
synapse_linked_service = '<your Azure Synapse linked service name>'
synapse_pool_name = '<your Azure Synapse Spark pool name>'
synapse_compute_name = 'sparkcpu1'

linked_service = LinkedService.get(ws, synapse_linked_service)

attach_config = SynapseCompute.attach_configuration(
    linked_service = linked_service,
    type='SynapseSpark',
    pool_name=synapse_pool_name)

synapse_compute_target=ComputeTarget.attach(
    workspace=ws,
    name=synapse_compute_name,
    attach_configuration=attach_config)

synapse_compute_target.wait_for_completion()

Class LinkedService: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class SynapseCompute: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.


Provisioning operation finished, operation "Succeeded"


Class SynapseCompute: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.


Define the [pipeline steps to be run on Azure Synapse](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-use-synapsesparkstep).

As part of this definition, we can also define an execution environment and add any package dependencies to it. We also specify the name and location of the PySpark script to be run, the input and output datasets, arbitrary script arguments, and the compute target we created earlier, together with its resource configuration.

Please make sure you use your own Azure Anomaly Detector key and service location values in the arguments definition for the 'data_preparation_step' object below.

In [7]:
synapse_env = Environment(name='synapse_softsensor_environment')
synapse_env.python.conda_dependencies.add_pip_package('azureml-core>=1.20.0')

data_preparation_step = SynapseSparkStep(
    name = 'data_preparation_step',
    file = 'data_preparation.py',
    source_directory='./azure_synapse_code', 
    inputs=[raw_dataset],
    outputs=[prepared_dataset],
    arguments = ['--input_dataset',raw_dataset, '--output_dataset',prepared_dataset,
              '--anomaly_key','<your azure anomaly detector key>', '--anomaly_service_location','<your azure anomaly detector service location>',
              '--anomaly_max_data',8640, '--anomaly_min_data',12],
    compute_target = synapse_compute_name,
    driver_memory = '16g',
    driver_cores = 8,
    executor_memory = '16g',
    executor_cores = 8,
    num_executors = 4,
    environment = synapse_env)

feature_selection_step = SynapseSparkStep(
    name = 'data_featurization_step',
    file = 'feature_selection.py',
    source_directory='./azure_synapse_code', 
    inputs=[prepared_dataset.as_input().as_hdfs()],
    outputs=[featurized_dataset],
    arguments = ['--input_dataset',prepared_dataset.as_input().as_hdfs(), '--output_dataset',featurized_dataset, '--start_test_date','1970-01-10 08:00:00'],
    compute_target = synapse_compute_name,
    driver_memory = '16g',
    driver_cores = 8,
    executor_memory = '16g',
    executor_cores = 8,
    num_executors = 4,
    environment = synapse_env)

Class SynapseSparkStep: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class SynapseSparkStep: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.


only conda_dependencies specified in environment will be used in Synapse Spark run.
only conda_dependencies specified in environment will be used in Synapse Spark run.
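
The contents of 'data_preparation.py' are not shown in this notebook. As a rough sketch of how such a script could pick up the arguments listed in the step definition, the example below uses `argparse`. It assumes that the dataset arguments reach the script as path strings; `parse_arguments` and the example values are hypothetical.

```python
import argparse


def parse_arguments(argv=None):
    """Parse the flags passed by the data preparation step definition."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_dataset', type=str)
    parser.add_argument('--output_dataset', type=str)
    parser.add_argument('--anomaly_key', type=str)
    parser.add_argument('--anomaly_service_location', type=str)
    parser.add_argument('--anomaly_max_data', type=int)
    parser.add_argument('--anomaly_min_data', type=int)
    # Ignore any extra arguments the runtime may append.
    args, _unknown = parser.parse_known_args(argv)
    return args


# Placeholder values standing in for what the pipeline would pass.
args = parse_arguments([
    '--input_dataset', 'path-to-raw-data',
    '--output_dataset', 'path-to-prepared-data',
    '--anomaly_key', 'not-a-real-key',
    '--anomaly_service_location', 'example-location',
    '--anomaly_max_data', '8640',
    '--anomaly_min_data', '12',
])
print(args.anomaly_max_data, args.anomaly_min_data)
```

Declaring `type=int` for the two numeric flags converts them on parsing, since command-line arguments always arrive as strings.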


Here we create the [Azure ML Compute Target](https://docs.microsoft.com/en-us/azure/machine-learning/concept-compute-target) to be used for running the model training step. Because we are going to train a deep learning model, we recommend a GPU VM size of at least Standard NC6. We set the maximum number of nodes in the compute cluster to 2, so that model training can run in parallel for each of the 2 soft sensors modeled in our dataset.

In [8]:
aml_compute_name = 'gpucluster'
aml_compute_min_nodes = 0
aml_compute_max_nodes = 2
aml_vm_size = 'STANDARD_NC6'

if aml_compute_name in ws.compute_targets:
    aml_compute_target = ws.compute_targets[aml_compute_name]
    if isinstance(aml_compute_target, AmlCompute):
        print('found compute target. just use it. ' + aml_compute_name)
else:
    print('creating a new compute target...')
    provisioning_config = AmlCompute.provisioning_configuration(vm_size = aml_vm_size,
                                                                min_nodes = aml_compute_min_nodes, 
                                                                max_nodes = aml_compute_max_nodes)

    aml_compute_target = ComputeTarget.create(ws, aml_compute_name, provisioning_config)
    aml_compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)
    
    print(aml_compute_target.get_status().serialize())

Class SynapseCompute: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class SynapseCompute: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.


found compute target. just use it. gpucluster


Define the [pipeline step to be run on Azure ML as a parallel run step](https://github.com/Azure/MachineLearningNotebooks/tree/master/how-to-use-azureml/machine-learning-pipelines/parallel-run).

As part of this definition, we can also define an execution environment and add any package dependencies to it. We also specify a configuration object with the name and location of the Python script to be run, the data partitioning keys, and the execution control parameters. The parallel run step definition itself takes the input and output datasets and arbitrary arguments for the Python script.

In [9]:
aml_conda_deps = CondaDependencies.create(pip_packages=['azureml-core', 'azureml-dataset-runtime[fuse]', 'pandas', 'ipython', 'tsai'])
aml_env = Environment(name='aml_softsensor_environment')
aml_env.python.conda_dependencies = aml_conda_deps
aml_env.docker.base_image = DEFAULT_GPU_IMAGE

model_training_config = ParallelRunConfig(
    source_directory='./azure_ml_code',
    entry_script='model_training.py',
    partition_keys=model_input_dataset.partition_keys,
    error_threshold=5,
    output_action='append_row',
    append_row_file_name='soft_sensor_predictions.txt',
    environment=aml_env,
    compute_target=aml_compute_target, 
    node_count=2,
    run_invocation_timeout=10000
)

model_training_step = ParallelRunStep(
    name='model_training_step',
    inputs=[model_input_dataset.as_named_input('model_input_dataset')],
    output=model_output_dir,
    arguments=['--output_dir',model_output_dir,
               '--regression_window_length',9, '--regression_stride',1, '--regression_horizon',0,
               '--batch_size',512, '--max_epochs',500,
               '--arch','GRU',
               '--hidden_size',128, '--n_layers',1, '--bias','True', '--rnn_dropout',0.2, '--bidirectional','True', '--fc_dropout',0.1,
               '--min_delta',0.001, '--patience',15],
    parallel_run_config=model_training_config,
    allow_reuse=False
)

After defining the pipeline steps, we pack them into a StepSequence object to establish an execution sequence dependency. We then use it to create a Pipeline object and submit it for execution through an [Azure ML Experiment](https://docs.microsoft.com/en-us/azure/machine-learning/concept-azure-machine-learning-architecture#experiments).

In [10]:
step_sequence = StepSequence(steps=[data_preparation_step, feature_selection_step, model_training_step])
pipeline = Pipeline(workspace=ws, steps=step_sequence)

pipeline_run = Experiment(ws, 'softsensor_model_training').submit(pipeline, regenerate_outputs=True)

Class SynapseCompute: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.


Created step data_preparation_step [6513b8d4][acd3e051-b064-46cf-a448-597e5d75b266], (This step will run and generate new outputs)
Created step data_featurization_step [79a0a7e2][b7d2227f-c72a-44fa-9115-7338c32442a5], (This step will run and generate new outputs)
Created step model_training_step [2c6f64e1][4b1fc141-7692-480e-a94b-49a77af72d87], (This step will run and generate new outputs)
Submitted PipelineRun 4a2dba18-9e64-42a4-8ad8-22b11b0b83bc
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/4a2dba18-9e64-42a4-8ad8-22b11b0b83bc?wsid=/subscriptions/a6c2a7cc-d67e-4a1a-b765-983f08c0423a/resourcegroups/alvilcek-ml-rg/workspaces/alvilcek-ml-workspace&tid=72f988bf-86f1-41af-91ab-2d7cd011db47


The pipeline submission above is asynchronous, so you can call wait_for_completion on the returned PipelineRun object to wait for the run to finish and receive status notifications along the way.

In [11]:
pipeline_run.wait_for_completion(show_output=True)

PipelineRunId: 4a2dba18-9e64-42a4-8ad8-22b11b0b83bc
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/4a2dba18-9e64-42a4-8ad8-22b11b0b83bc?wsid=/subscriptions/a6c2a7cc-d67e-4a1a-b765-983f08c0423a/resourcegroups/alvilcek-ml-rg/workspaces/alvilcek-ml-workspace&tid=72f988bf-86f1-41af-91ab-2d7cd011db47
PipelineRun Status: NotStarted
PipelineRun Status: Running


Expected a StepRun object but received <class 'azureml.core.run.Run'> instead.
This usually indicates a package conflict with one of the dependencies of azureml-core or azureml-pipeline-core.
Please check for package conflicts in your python environment







PipelineRun Execution Summary
PipelineRun Status: Finished
{'runId': '4a2dba18-9e64-42a4-8ad8-22b11b0b83bc', 'status': 'Completed', 'startTimeUtc': '2022-04-12T23:29:05.011173Z', 'endTimeUtc': '2022-04-12T23:47:40.419253Z', 'services': {}, 'properties': {'azureml.runsource': 'azureml.PipelineRun', 'runSource': 'SDK', 'runType': 'SDK', 'azureml.parameters': '{}', 'azureml.continue_on_step_failure': 'False', 'azureml.pipelineComponent': 'pipelinerun'}, 'inputDatasets': [], 'outputDatasets': [], 'logFiles': {'logs/azureml/executionlogs.txt': 'https://alvilcekmlwork0917427776.blob.core.windows.net/azureml/ExperimentRun/dcid.4a2dba18-9e64-42a4-8ad8-22b11b0b83bc/logs/azureml/executionlogs.txt?sv=2019-07-07&sr=b&sig=HXFCEnMl5%2BB%2BwTleePgrvMIw8uN38tFeY0nCE%2F1mspE%3D&skoid=6d8efc04-c3a3-4cb2-ab5c-74ac9eddbbac&sktid=72f988bf-86f1-41af-91ab-2d7cd011db47&skt=2022-04-12T16%3A32%3A32Z&ske=2022-04-14T00%3A42%3A32Z&sks=b&skv=2019-07-07&st=2022-04-12T23%3A37%3A47Z&se=2022-04-13T07%3A47%3A47Z&sp=

'Finished'
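
The summary dictionary printed above includes 'startTimeUtc' and 'endTimeUtc' fields, from which the total run time can be derived. The sketch below does this with the standard library only; `run_duration` is a hypothetical helper, and the two timestamps are copied from the output above.

```python
from datetime import datetime

# Format of the UTC timestamps as they appear in the run summary.
TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'


def run_duration(summary):
    """Return the elapsed time between the start and end timestamps."""
    start = datetime.strptime(summary['startTimeUtc'], TIMESTAMP_FORMAT)
    end = datetime.strptime(summary['endTimeUtc'], TIMESTAMP_FORMAT)
    return end - start


summary = {
    'startTimeUtc': '2022-04-12T23:29:05.011173Z',
    'endTimeUtc': '2022-04-12T23:47:40.419253Z',
}
duration = run_duration(summary)
print(duration)  # prints 0:18:35.408080
```

For this run, the three steps took roughly 18 and a half minutes end to end.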