In [None]:
# Notebook settings -- edit these to match your environment.

# Name of the compute cluster to look up in the workspace.
compute_cluster_name = "cpu-cluster"

# Name of the registered file dataset used as the pipeline input.
file_dataset_name = "product-images"

# Folder created in the default datastore to store the outputs of the R script.
output_folder = "r-script-outputs/"

In [None]:
import azureml.core
from azureml.core import Workspace

# Show which azureml-core SDK version this kernel is using.
print("SDK version:", azureml.core.VERSION)

# Obtain the workspace handle via Workspace.from_config() and print a
# four-line summary (name, region, subscription, resource group).
ws = Workspace.from_config()
print('\n'.join((
    'Workspace name: ' + ws.name,
    'Azure region: ' + ws.location,
    'Subscription id: ' + ws.subscription_id,
    'Resource group: ' + ws.resource_group,
)))

In [None]:
# Look up the cluster named by `compute_cluster_name` among the workspace's
# compute targets, then read the maximum node count from its scale settings.
compute_target = ws.compute_targets[compute_cluster_name]
max_cluster_nodes = compute_target.scale_settings.maximum_node_count

print("Got reference to {} with max nodes {}".format(compute_target.name, max_cluster_nodes))

In [None]:
# Debugging override: the author's cluster keeps one node on hot standby,
# so parallelization is limited to a single node here. This replaces the
# value read from the cluster's scale settings; skip this cell to use the
# cluster's own maximum instead.
max_cluster_nodes = 1

In [None]:
# Create reference to the dataset
from azureml.data.dataset_consumption_config import DatasetConsumptionConfig
from azureml.pipeline.core import PipelineParameter

# Fetch the registered dataset named by `file_dataset_name`.
dataset = ws.datasets[file_dataset_name]

# Wrap it in a pipeline parameter whose default value is that dataset.
pipeline_param = PipelineParameter(
    name="dataset_param",
    default_value=dataset,
)

# Build the consumption config from the parameter and request mount mode.
input_dataset = DatasetConsumptionConfig(
    "dataset_consumption_config",
    pipeline_param,
).as_mount()

In [None]:
# Create an output folder in default datastore to store results.
from azureml.data import OutputFileDatasetConfig
# To find more help
# help(OutputFileDatasetConfig)
# Resolve the workspace's default datastore.
datastore = ws.get_default_datastore()

# Direct the output to `output_folder` inside that datastore.
output = OutputFileDatasetConfig(
    destination=(datastore, output_folder),
)

In [None]:
# Create environment to execute 
from azureml.core import Environment
from azureml.core.runconfig import CondaDependencies, DEFAULT_CPU_IMAGE
from azureml.core.environment import RSection, RCranPackage

# Environment named "renv", built on the SDK's default CPU base image.
r_env = Environment(name="renv")
r_env.docker.base_image = DEFAULT_CPU_IMAGE

# CRAN package required by the R script.
opt_cran_package = RCranPackage()
opt_cran_package.name = "optparse"

# R section of the environment, listing the required CRAN packages.
r_env.r = RSection()
r_env.r.cran_packages = [opt_cran_package]

# Python side: a default CondaDependencies object pinned to Python 3.7.
# NOTE(review): the default dependencies are expected to include
# azureml-defaults -- confirm against the installed SDK version.
r_env.python.conda_dependencies = CondaDependencies()
r_env.python.conda_dependencies.set_python_version("3.7")

In [None]:
# Create the parallel run step for the pipeline
from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig

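# ParallelRunConfig reference (for the azureml.pipeline.steps package imported above, not the older contrib package):
# https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.parallelrunconfig?view=azure-ml-py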

# Configuration shared by every worker of the parallel step.
parallel_run_config = ParallelRunConfig(
    # Folder containing the scripts.
    source_directory="scripts",
    # Python wrapper script that executes the R code.
    entry_script="batch_wrapper.py",
    # How many files to run in each batch.
    mini_batch_size=2,
    error_threshold=5,
    # User script is expected to store the output by itself. An output row
    # is still expected for each successful input item processed.
    output_action='summary_only',
    environment=r_env,
    compute_target=compute_target,
    node_count=max_cluster_nodes,
    # process_count_per_node is left unset; its default is the number of cores.
)

# The step itself: consumes the mounted dataset and passes the output
# location to the entry script through the "--r-output" argument.
parallel_run_step = ParallelRunStep(
    name='parallel-r-script',
    inputs=[input_dataset],
    arguments=["--r-output", output],
    parallel_run_config=parallel_run_config,
    allow_reuse=False,
)

In [None]:
from azureml.pipeline.core import Pipeline
from azureml.core import Experiment

# Assemble a pipeline containing only the parallel R step.
pipeline = Pipeline(
    workspace=ws,
    steps=[parallel_run_step],
)

# Submit the pipeline under the 'parallel-r-pipeline' experiment.
pipeline_run = Experiment(
    ws,
    'parallel-r-pipeline',
).submit(pipeline)

In [None]:
from azureml.widgets import RunDetails
# Display the RunDetails widget for the pipeline run submitted above.
RunDetails(pipeline_run).show()