In [1]:
from azureml.core import Workspace

# Obtain the workspace handle via Workspace.from_config().
ws = Workspace.from_config()

# One labelled line per workspace attribute, printed newline-separated.
workspace_summary = (
    'Workspace name: ' + ws.name,
    'Azure region: ' + ws.location,
    'Subscription id: ' + ws.subscription_id,
    'Resource group: ' + ws.resource_group,
)
print(*workspace_summary, sep='\n')

Workspace name: udacity-aml
Azure region: eastus
Subscription id: 1e04f273-c7e2-451f-8bf1-d3116603299b
Resource group: demo


In [2]:
from azureml.core.compute import AmlCompute
from azureml.core.compute import ComputeTarget
from azureml.core.compute_target import ComputeTargetException

# NOTE: update the cluster name to match the existing cluster
# Name of the CPU cluster to reuse or create
amlcompute_cluster_name = "my-cluster"

try:
    # Look the cluster up by name in the workspace.
    compute_target = ComputeTarget(workspace=ws, name=amlcompute_cluster_name)
except ComputeTargetException:
    # Lookup raised ComputeTargetException: describe a new cluster and create it
    # under the chosen name.
    compute_config = AmlCompute.provisioning_configuration(
        vm_size='STANDARD_D2_V2',  # for GPU, use "STANDARD_NC6"
        # vm_priority='lowpriority',  # optional
        max_nodes=4,
    )
    compute_target = ComputeTarget.create(ws, amlcompute_cluster_name, compute_config)
else:
    print('Found existing cluster, use it.')

# Runs for both the reused and the newly created target.
compute_target.wait_for_completion(show_output=True)

Found existing cluster, use it.
Succeeded
AmlCompute wait for completion finished

Minimum number of nodes requested have been provisioned


In [3]:
from azureml.pipeline.core import PipelineData

# Pipeline output named 'score', placed on the workspace's default datastore.
datastore = ws.get_default_datastore()
output_folder = PipelineData(
    name='score',
    datastore=datastore,
)

In [4]:
from azureml.core.dataset import Dataset
import pandas as pd

# Location of the bank-marketing test CSV that serves as the scoring input.
test_data_url = 'https://automlsamplenotebookdata.blob.core.windows.net/automl-sample-notebook-data/bankmarketing_test.csv'

# Build a tabular dataset from the delimited file, then expose it under the
# input name 'score_data' for use as a pipeline step input.
dataset_test = Dataset.Tabular.from_delimited_files(path=test_data_url)
input_dataset = dataset_test.as_named_input('score_data')
#df_test = dataset_test.to_pandas_dataframe()
#df_test.describe()
#df_test = df_test[pd.notnull(df_test['y'])]

#y_test = df_test['y']
#print(y_test)
#X_test = df_test.drop(['y'], axis=1)
#X_test.describe()

In [5]:
from azureml.core import Environment
from azureml.core.runconfig import CondaDependencies

# Dependencies of the scoring environment, split by installer.
conda_requirements = ['pip==20.1.1']
pip_requirements = [
    "scikit-learn==0.22.1",
    "azureml-core",
    "azureml-dataset-runtime[pandas,fuse]",
]
predict_conda_deps = CondaDependencies.create(
    python_version="3.8.5",
    conda_packages=conda_requirements,
    pip_packages=pip_requirements,
)

# Environment used by the scoring step: attach the conda dependencies and
# turn off Spark package precaching.
predict_env = Environment(name="predict_environment")
predict_env.python.conda_dependencies = predict_conda_deps
predict_env.spark.precache_packages = False

In [16]:
from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig

# Settings for the parallel scoring run, grouped by concern.
parallel_run_config = ParallelRunConfig(
    # What to run
    source_directory='.',
    entry_script='score.py',  # the user script to run against each input
    environment=predict_env,
    # Where to run it
    compute_target=compute_target,
    node_count=2,
    # Batching, failure tolerance and per-invocation timeout
    mini_batch_size='1KB',
    error_threshold=5,
    run_invocation_timeout=600,
    # How results are collected
    output_action='append_row',
    append_row_file_name="score_outputs.txt",
)

In [17]:
# Pipeline step that applies the parallel-run configuration to the named input
# dataset and directs its output to `output_folder`.
distributed_score_step = ParallelRunStep(
    name='example-score',
    parallel_run_config=parallel_run_config,
    inputs=[input_dataset],
    output=output_folder,
    arguments=['--model_name', 'pipeline_model'],  # forwarded to the entry script
    allow_reuse=False,
)

In [18]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

# Single-step pipeline containing only the scoring step.
pipeline = Pipeline(workspace=ws, steps=[distributed_score_step])

# Submit the pipeline under the 'score' experiment and keep the run handle.
experiment = Experiment(ws, 'score')
pipeline_run = experiment.submit(pipeline)

Created step example-score [0be1ea5d][5a0692f3-f9f8-4307-94ff-a1f8f180e220], (This step will run and generate new outputs)
Submitted PipelineRun c6705926-562a-4990-a676-daae11f7f5aa
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/c6705926-562a-4990-a676-daae11f7f5aa?wsid=/subscriptions/1e04f273-c7e2-451f-8bf1-d3116603299b/resourcegroups/demo/workspaces/udacity-aml&tid=8f4f9ddc-5495-4e83-942e-2027fe2b644d


In [19]:
# Show the pipeline run as the cell's result. The rendered summary lists the
# experiment, run id, type and status, and links to the run's details page in the portal.
pipeline_run

Experiment,Id,Type,Status,Details Page,Docs Page
score,c6705926-562a-4990-a676-daae11f7f5aa,azureml.PipelineRun,Preparing,Link to Azure Machine Learning studio,Link to Documentation


In [None]:
# Wait for the pipeline run to complete, showing its output log in the console.
pipeline_run.wait_for_completion(show_output=True)

In [None]:
import pandas as pd
import tempfile
import os

# Look up the step run(s) for the scoring step by name. The lookup returns a
# list, which is empty when no step run with that name exists on this pipeline
# run; check before indexing so the failure explains itself instead of
# surfacing as a bare "list index out of range".
step_runs = pipeline_run.find_step_run(distributed_score_step.name)
if not step_runs:
    raise IndexError(
        "No step run named {!r} was found on the pipeline run; make sure the "
        "pipeline run was submitted and has finished before fetching its "
        "output.".format(distributed_score_step.name)
    )
prediction_run = step_runs[0]

# Handle to the step output registered under the PipelineData name.
prediction_output = prediction_run.get_output_data(output_folder.name)

# Fresh local temporary directory that will receive the downloaded output.
target_dir = tempfile.mkdtemp()

In [None]:
# Download the step output into the temporary directory, with progress shown;
# the value returned by download() is kept in num_file_downloaded.
num_file_downloaded = prediction_output.download(
    local_path=target_dir,
    show_progress=True,
)
#result_file = os.path.join(target_dir, prediction_output.path_on_datastore, parallel_run_config.append_row_file_name)

# cleanup output format
#df = pd.read_csv(result_file, delimiter=" ", header=None)
#print(df)

In [None]:
# uncomment below and run if compute resources are no longer needed 
# compute_target.delete()