In [1]:
from azureml.core import Workspace

# Load the workspace described by the local config file and echo its
# identifying properties, one per line.
ws = Workspace.from_config()
print("\n".join((
    'Workspace name: ' + ws.name,
    'Azure region: ' + ws.location,
    'Subscription id: ' + ws.subscription_id,
    'Resource group: ' + ws.resource_group,
)))

If you run your code in unattended mode, i.e., where you can't give a user input, then we recommend to use ServicePrincipalAuthentication or MsiAuthentication.
Please refer to aka.ms/aml-notebook-auth for different authentication mechanisms in azureml-sdk.


Workspace name: ws01ent
Azure region: westus2
Subscription id: 0e9bace8-7a81-4922-83b5-d995ff706507
Resource group: azureml


### Create or Attach existing compute resource for Python steps
By using Azure Machine Learning Compute, a managed service, data scientists can train machine learning models on clusters of Azure virtual machines. Examples include VMs with GPU support. In this tutorial, you create Azure Machine Learning Compute as your training environment. The code below creates the compute clusters for you if they don't already exist in your workspace.

**Creating the compute takes approximately 5 minutes. If an AmlCompute cluster with that name already exists in your workspace, the code skips the creation process.**

In [3]:
import os
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.data.data_reference import DataReference

# Cluster settings; each can be overridden through an environment variable.
compute_name = os.environ.get("AML_COMPUTE_CLUSTER_NAME", "worker-cpu")
# os.environ.get returns a *string* whenever the variable is set, so cast the
# node counts to int to hand the provisioning call numbers in both cases.
compute_min_nodes = int(os.environ.get("AML_COMPUTE_CLUSTER_MIN_NODES", 0))
compute_max_nodes = int(os.environ.get("AML_COMPUTE_CLUSTER_MAX_NODES", 4))

# This example uses CPU VM. For using GPU VM, set SKU to STANDARD_NC6
vm_size = os.environ.get("AML_COMPUTE_CLUSTER_SKU", "STANDARD_D12_V2")


if compute_name in ws.compute_targets:
    # Reuse the existing target registered under this name.
    # NOTE(review): if the existing target is not an AmlCompute it is still
    # assigned to compute_target, only without the confirmation message.
    compute_target = ws.compute_targets[compute_name]
    if compute_target and type(compute_target) is AmlCompute:
        print('found compute target. just use it. ' + compute_name)
else:
    print('creating a new compute target...')
    provisioning_config = AmlCompute.provisioning_configuration(vm_size = vm_size,
                                                                min_nodes = compute_min_nodes, 
                                                                max_nodes = compute_max_nodes)

    # create the cluster
    compute_target = ComputeTarget.create(ws, compute_name, provisioning_config)
    
    # can poll for a minimum number of nodes and for a specific timeout. 
    # if no min node count is provided it will use the scale settings for the cluster
    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)
    
    # For a more detailed view of current AmlCompute status, use get_status()
    print(compute_target.get_status().serialize())

found compute target. just use it. worker-cpu


In [7]:
from azureml.core import Workspace
from azureml.core import Keyvault
import os

# Store the Service Bus connection string in the workspace's default Key Vault
# under the secret name "servicebusconstr".
keyvault = ws.get_default_keyvault()

# Take the real connection string from the SERVICE_BUS_CONNECTION_STRING
# environment variable so it never has to be pasted into the notebook source;
# the original placeholder is kept as the fallback value.
servicebus_connection_string = os.environ.get("SERVICE_BUS_CONNECTION_STRING",
                                              'YOUR_SERVICE_BUS_CONNECTION_STRING')
keyvault.set_secret(name="servicebusconstr", value=servicebus_connection_string)

In [8]:

# Register two blob containers of the same storage account as datastores.
from azureml.core import Datastore

# The storage account key is read from the Key Vault rather than hardcoded.
key = keyvault.get_secret('adlsgen6key')
account_name = 'adlsdatalakegen6'

# 'landing' container: registered as datastore 'adlsgen6landing'.
datastore_sourcefiles = Datastore.register_azure_blob_container(
    workspace=ws,
    datastore_name='adlsgen6landing',
    container_name='landing',
    account_name=account_name,
    account_key=key,
    create_if_not_exists=False,
)

# 'process' container: registered as datastore 'adlsgen6process'.
datastore_preprocess = Datastore.register_azure_blob_container(
    workspace=ws,
    datastore_name='adlsgen6process',
    container_name='process',
    account_name=account_name,
    account_key=key,
    create_if_not_exists=False,
)




### Intermediate/Output Data
Intermediate data (or output of a Step) is represented by [PipelineData](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelinedata?view=azure-ml-py) object. PipelineData can be produced by one step and consumed in another step by providing the PipelineData object as an output of one step and the input of one or more steps.

**Constructing PipelineData**
- name: [Required] Name of the data item within the pipeline graph
- datastore_name: Name of the Datastore to write this output to
- output_name: Name of the output
- output_mode: Specifies "upload" or "mount" modes for producing output (default: mount)
- output_path_on_compute: For "upload" mode, the path to which the module writes this output during execution
- output_overwrite: Flag to overwrite pre-existing data

In [17]:
from azureml.pipeline.core import Pipeline, PipelineData,PipelineParameter
from azureml.core.dataset import Dataset



# Directory that holds the step scripts.
scripts_folder = "scripts"

# File dataset rooted at "/" of the landing datastore, exposed to steps as the
# named input "datastore_sourcefiles_dir".
datastore_sourcefiles_dir = (
    Dataset.File
    .from_files((datastore_sourcefiles, "/"))
    .as_named_input("datastore_sourcefiles_dir")
)

# Output location for the mapping file (mapping.csv), plus a view of that
# output parsed as delimited files.
mapping_output_dir = PipelineData(
    name="grouping_output",
    datastore=datastore_sourcefiles,
    output_path_on_compute="grouping_output",
)
mapping_output_ds = mapping_output_dir.as_dataset().parse_delimited_files()

# Output location for the final file, on the 'process' datastore.
preprocess_output_dir = PipelineData(
    name="preprocess_output",
    datastore=datastore_preprocess,
    output_path_on_compute="preprocess_output",
)





## Define parameters for the pipelines

In [25]:
# Pipeline parameters with their default values; both are passed to the
# distributing step, and num_node is also used as the parallel node count.
queue_length = PipelineParameter(
    name="queue_length",
    default_value=5,
)
num_node = PipelineParameter(
    name="num_node",
    default_value=2,
)


## Define environments and processing steps

In [26]:
#Environment for mapping/distributing step
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import DEFAULT_CPU_IMAGE

# Run configuration for the mapping/distributing step.
mapping_run_config = RunConfiguration()

# Docker: run inside a container built on the default CPU base image.
mapping_run_config.environment.docker.enabled = True
mapping_run_config.environment.docker.base_image = DEFAULT_CPU_IMAGE

# Python: dependencies are not user-managed; they are declared below as a
# CondaDependencies object listing the required pip packages.
mapping_run_config.environment.python.user_managed_dependencies = False
mapping_run_config.environment.python.conda_dependencies = CondaDependencies.create(
    pip_packages=['azure-servicebus', 'azureml-defaults', 'pandas']
)


#Mapping/grouping step
from azureml.pipeline.steps import PythonScriptStep

# Mapping/grouping step: runs distributing.py from the scripts folder on the
# compute target, using the mapping run configuration.
# The step's output dataset is `mapping_output_ds` (defined together with
# `mapping_output_dir`). The previous `grouping_output_ds` reference is not
# defined anywhere in this notebook and raised NameError on a fresh kernel.
distributing_step = PythonScriptStep(
    name="distributing_step",
    script_name="distributing.py",
    arguments=[
        "--queue_length", queue_length,
        "--num_node", num_node,
        "--grouping_output", mapping_output_ds,
    ],
    outputs=[mapping_output_ds],
    compute_target=compute_target,
    source_directory=scripts_folder,
    runconfig=mapping_run_config,
    allow_reuse=False,
)
    


In [27]:
# for preprocessing step
from azureml.pipeline.core import PipelineParameter,StepSequence
from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig
from azureml.core import Environment

# Pip packages for the preprocessing environment.
preprocess_conda_deps = CondaDependencies.create(
    pip_packages=[
        "azureml-defaults",
        "azureml-dataset-runtime[fuse,pandas]",
        "xlrd",
        "azure-servicebus",
        "azure-storage-blob",
    ]
)

# Docker-enabled environment on the default CPU base image, using the
# dependencies declared above.
preprocess_env = Environment(name="preprocess_environment")
preprocess_env.docker.enabled = True
preprocess_env.docker.base_image = DEFAULT_CPU_IMAGE
preprocess_env.python.conda_dependencies = preprocess_conda_deps



# Configuration for the parallel preprocessing step: preprocessing.py from the
# scripts folder is the entry script, and the node count follows the
# `num_node` pipeline parameter.
preprocess_parallel_run_config = ParallelRunConfig(
    environment=preprocess_env,
    compute_target=compute_target,
    source_directory=scripts_folder,
    entry_script="preprocessing.py",
    # Mini-batch size is itself a pipeline parameter (default "1").
    mini_batch_size=PipelineParameter(name="preprocess_batch_size_param", default_value="1"),
    node_count=num_node,
    # Optional, currently disabled:
    #     process_count_per_node=PipelineParameter(name="process_count_param", default_value=2),
    error_threshold=1,
    output_action="summary_only",
    append_row_file_name="outputs.txt",
)

# Parallel preprocessing step. It consumes the mapping dataset produced by the
# distributing step, gets the landing files as a side input, and writes to
# `preprocess_output_dir`.
# The input dataset is `mapping_output_ds`. The previous `grouping_output_ds`
# reference is not defined anywhere in this notebook and raised NameError on a
# fresh kernel.
preprocess_parallelrun_step = ParallelRunStep(
    name="preprocess",
    arguments=["--datastore_sourcefiles_dir", datastore_sourcefiles_dir,
               "--preprocess_output_dir", preprocess_output_dir],
    parallel_run_config=preprocess_parallel_run_config,
    side_inputs=[datastore_sourcefiles_dir],
    inputs=[mapping_output_ds],
    output=preprocess_output_dir,
    allow_reuse=False
)

## Define pipeline and submit a run


In [28]:
from azureml.core import Experiment

# Chain the two steps into a pipeline and submit it as a run of the
# 'preprocessing_example' experiment.
pipeline_steps = [distributing_step, preprocess_parallelrun_step]
pipeline = Pipeline(workspace=ws, steps=pipeline_steps)

experiment = Experiment(workspace=ws, name='preprocessing_example')
pipeline_run = experiment.submit(pipeline)

Created step distributing_step [a6c5f12d][86af27ce-7450-4a8c-895d-ce10359e0b19], (This step will run and generate new outputs)Created step distributing_step [331602a9][97c12b09-2478-4c6b-8396-6109fb0dd28d], (This step will run and generate new outputs)

Created step preprocess [5078ac33][b4dd53a4-d810-4452-926d-13f5e13a29bd], (This step will run and generate new outputs)
Using data reference datastore_sourcefiles_dir_0 for StepId [af240703][f92e9235-3f6c-4b14-8c1c-0df1be1748fc], (Consumers of this data are eligible to reuse prior runs.)
Submitted PipelineRun bd166167-653f-490e-b291-72020522035c
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/preprocessing_example/runs/bd166167-653f-490e-b291-72020522035c?wsid=/subscriptions/0e9bace8-7a81-4922-83b5-d995ff706507/resourcegroups/azureml/workspaces/ws01ent


In [None]:
from azureml.widgets import RunDetails

# Show the run-details widget for the submitted pipeline run.
run_details_widget = RunDetails(pipeline_run)
run_details_widget.show()

## Publish the pipeline to use with external applications


In [29]:
# Publish the pipeline behind this run so external applications can call it.
# `version` is documented as a string, so pass "1.0" rather than the float 1.0.
pipeline_run.publish_pipeline(
    name="preprocessing_example",
    description="preprocessing_example pipeline",
    version="1.0",
)

Name,Id,Status,Endpoint
preprocessing_example,7f233a74-c3fb-4736-a32c-90bbc7753cbf,Active,REST Endpoint
