In [None]:
# Report which Azure ML core SDK version this notebook is running against
# (handy when comparing runs or filing issues).
import azureml.core

sdk_version = azureml.core.VERSION
print(f"SDK version: {sdk_version}")

In [None]:
from azureml.core import Workspace, Experiment

# Load the workspace described by the local config.json and show its identity,
# one field per line.
ws = Workspace.from_config()
workspace_summary = [
    'Workspace name: ' + ws.name,
    'Azure region: ' + ws.location,
    'Subscription id: ' + ws.subscription_id,
    'Resource group: ' + ws.resource_group,
]
print('\n'.join(workspace_summary))

In [None]:
import os
from azureml.core.compute import AmlCompute, ComputeTarget

# Cluster settings; each one can be overridden through an environment variable.
compute_name = os.environ.get("AML_COMPUTE_CLUSTER_NAME", "cpu-cluster-2")
# os.environ values are always strings, so convert the node counts explicitly
# (int() leaves the integer defaults unchanged).
compute_min_nodes = int(os.environ.get("AML_COMPUTE_CLUSTER_MIN_NODES", 0))
compute_max_nodes = int(os.environ.get("AML_COMPUTE_CLUSTER_MAX_NODES", 4))

# This example uses CPU VM. For using GPU VM, set SKU to STANDARD_NC6
vm_size = os.environ.get("AML_COMPUTE_CLUSTER_SKU", "STANDARD_D2_V2")


if compute_name in ws.compute_targets:
    # Reuse the existing target of that name.
    compute_target = ws.compute_targets[compute_name]
    if not isinstance(compute_target, AmlCompute):
        # The ParallelRunConfig built later needs an AmlCompute cluster; fail
        # here with a clear message rather than deep inside pipeline submission.
        raise TypeError(
            "Compute target '{}' exists but is a {}, not an AmlCompute cluster; "
            "set AML_COMPUTE_CLUSTER_NAME to another name.".format(
                compute_name, type(compute_target).__name__))
    print('found compute target. just use it. ' + compute_name)
else:
    print('creating a new compute target...')
    provisioning_config = AmlCompute.provisioning_configuration(vm_size=vm_size,
                                                                min_nodes=compute_min_nodes,
                                                                max_nodes=compute_max_nodes)

    # create the cluster
    compute_target = ComputeTarget.create(ws, compute_name, provisioning_config)

    # can poll for a minimum number of nodes and for a specific timeout.
    # if no min node count is provided it will use the scale settings for the cluster
    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)

    # For a more detailed view of current AmlCompute status, use get_status()
    print(compute_target.get_status().serialize())

In [None]:
from azureml.core.datastore import Datastore
from azureml.core.dataset import Dataset
from azureml.pipeline.core import Pipeline
from azureml.pipeline.core import PipelineParameter

# The Azure Blob Store associated with the workspace. Note that
# "workspaceblobstore" is **the name of this store and CANNOT BE CHANGED and
# must be used as is**. (It is looked up once; a prior get_default_datastore()
# call was immediately overwritten and has been dropped.)
def_blob_store = Datastore(ws, "workspaceblobstore")
print("Blobstore's name: {}".format(def_blob_store.name))

# Logical name for the sample CSV input.
sample_csv = 'sample_data'

# FileDataset over every CSV under samples/ in the blob store; validate=True
# asks the SDK to check the data can be loaded when the dataset is created.
path_on_datastore = def_blob_store.path('samples/*.csv')
input_sample_ds = Dataset.File.from_files(path=path_on_datastore, validate=True)

In [None]:
# Folder and file name of the ParallelRunStep entry script; the next cell
# writes the script there with %%writefile.
import os

source_directory = '../code/scripts'
script_file = 'sample_csv_script.py'

# %%writefile does not create missing folders, so make sure the directory
# exists (exist_ok keeps the cell idempotent on re-runs).
os.makedirs(source_directory, exist_ok=True)
print("Path is created")
print('Source directory for the step is {}.'.format(os.path.realpath(source_directory)))

In [None]:
%%writefile $source_directory/$script_file
"""Entry script for the notebook's ParallelRunStep.

ParallelRunStep does not run this file top to bottom as a plain script: it
imports it, calls ``init()`` once per worker process and then
``run(mini_batch)`` for every mini-batch. For a FileDataset input each
mini-batch is a list of file paths, and with ``output_action='append_row'``
the elements returned by ``run`` are appended to the step's output.

The module can still be run directly for a local smoke test:
``python sample_csv_script.py --sampledata <file-or-folder>``.
"""
import argparse
import os
import sys

print("In script.py")
print("As a data scientist, this is where I use my training code.")

# Parsed command-line options; populated by init().
args = None


def parse_arguments(argv=None):
    """Parse the step's own command-line options.

    Args:
        argv: argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        argparse.Namespace with ``pipeline_arg`` and ``sampledata`` (``None``
        when not given).
    """
    parser = argparse.ArgumentParser("train")
    # The notebook passes this value as "--arg1"; accept it as an alias.
    parser.add_argument("--pipeline_arg", "--arg1", dest="pipeline_arg", type=str,
                        help="pipeline_arg")
    parser.add_argument("--sampledata", type=str, help="sample data files")
    # parse_known_args: the ParallelRunStep driver puts its own options on the
    # command line, which parse_args() would reject and exit on.
    known, _unknown = parser.parse_known_args(argv)
    return known


def init():
    """Called once per worker process before any mini-batch is processed."""
    global args
    args = parse_arguments()
    print("Argument 1: %s" % args.pipeline_arg)
    print("Argument 2: %s" % args.sampledata)


def run(mini_batch):
    """Print each file in ``mini_batch`` and return one summary row per file read.

    Args:
        mini_batch: iterable of file paths.

    Returns:
        list of ``"<file name>: <n> characters"`` strings, one for each file
        that was read successfully. Unreadable files are reported and left out.
        NOTE(review): ParallelRunStep presumably counts inputs without a
        returned row as failures against error_threshold — confirm in its docs.
    """
    results = []
    for fname in mini_batch:
        try:
            with open(fname, 'r') as fin:
                content = fin.read()
        except (OSError, UnicodeDecodeError) as e:
            print(f"An ERROR happened: {e}")
            continue
        print(content)
        results.append("%s: %d characters" % (os.path.basename(fname), len(content)))
    return results


def _local_inputs(path):
    """Expand a file or folder path into the list of files to process locally."""
    if os.path.isdir(path):
        return [os.path.join(path, name) for name in sorted(os.listdir(path))]
    return [path]


if __name__ == "__main__":
    init()
    if args.sampledata:
        for row in run(_local_inputs(args.sampledata)):
            print(row)


In [None]:
from azureml.data.datapath import DataPath
from azureml.data.dataset_consumption_config import DatasetConsumptionConfig

# Expose the input dataset as a pipeline parameter so another FileDataset can
# be supplied at submission time; it defaults to input_sample_ds.
datapath1_pipeline_param = PipelineParameter(name="input_datapath", default_value=input_sample_ds)

# Bind the parameter (not the raw dataset) as the step input named
# 'sampledata', mounted at /tmp/samples on the compute. Previously the
# parameter was never connected to the input, and a bare
# input_sample_ds.as_mount() call discarded its result.
datapath_input = DatasetConsumptionConfig('sampledata', datapath1_pipeline_param,
                                          mode='mount', path_on_compute='/tmp/samples')

In [None]:
from azureml.core.environment import Environment
from azureml.pipeline.core import PipelineData
from azureml.pipeline.steps import ParallelRunConfig, ParallelRunStep

# Step output is stored as PipelineData in the workspace's default datastore.
datastore = ws.get_default_datastore()
output_folder = PipelineData(datastore=datastore, name='outputs')

# Curated Azure ML environment for the workers.
env = Environment.get(name='AzureML-Minimal', workspace=ws)

# In a real-world scenario, you'll want to shape your process per node and
# nodes to fit your problem domain.
parallel_run_config = ParallelRunConfig(
    compute_target=compute_target,
    environment=env,
    node_count=1,
    source_directory=source_directory,
    entry_script=script_file,  # user script run against each mini-batch
    mini_batch_size='1',
    error_threshold=0,
    output_action='append_row',
)

step_arguments = ["--arg1", datapath1_pipeline_param]
parallelrunstep = ParallelRunStep(
    name="sampleparallelrun",
    inputs=[datapath_input],
    output=output_folder,
    arguments=step_arguments,
    parallel_run_config=parallel_run_config,
)

In [None]:
# The pipeline consists of the single ParallelRunStep defined above.
steps = []
steps.append(parallelrunstep)
print("Step lists created")

In [None]:
# Assemble the step list into a pipeline bound to this workspace.
pipeline1 = Pipeline(steps=steps, workspace=ws)
print("Pipeline is built")

In [None]:
# Pipeline.validate() returns the list of problems it finds in the graph
# (e.g. unconnected inputs). Stop here instead of submitting a broken pipeline;
# previously the result was ignored and "complete" was printed regardless.
validation_errors = pipeline1.validate()
if validation_errors:
    raise ValueError("Pipeline validation failed:\n"
                     + "\n".join(str(err) for err in validation_errors))
print("Pipeline validation complete")

In [None]:
# Submit under the 'pipeline3-submit' experiment; regenerate_outputs=False
# allows steps whose inputs are unchanged to reuse earlier outputs.
pipeline_experiment = Experiment(ws, 'pipeline3-submit')
pipeline_run1 = pipeline_experiment.submit(pipeline1, regenerate_outputs=False)
print("Pipeline is submitted for execution")