Copyright (c) Microsoft Corporation. All rights reserved.  
Licensed under the MIT License.

# Scope/ExePool Experiment using AML Pipelines
This notebook shows how to run a Scope job in a migrated ADLA account and then run a Windows exe on the output of that job, using an AML Pipeline.

## Initialize Workspace

Initialize a workspace object from persisted configuration. Make sure the config file is present at .\config.json
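
As a rough sketch of what `Workspace.from_config()` looks for, the config file is a small JSON document with `subscription_id`, `resource_group`, and `workspace_name` keys. The helper `check_workspace_config` below is hypothetical (it is not part of the SDK) and the values are placeholders; it only checks that a file has the expected keys before you hand it to the SDK.

```python
import json
import os
import tempfile

REQUIRED_KEYS = ("subscription_id", "resource_group", "workspace_name")

def check_workspace_config(path):
    """Hypothetical helper: load a workspace config file and return the missing keys."""
    with open(path, encoding="utf-8") as f:
        config = json.load(f)
    return [key for key in REQUIRED_KEYS if not config.get(key)]

# Write a placeholder config to a temporary directory and validate it.
with tempfile.TemporaryDirectory() as tmp_dir:
    config_path = os.path.join(tmp_dir, "config.json")
    with open(config_path, "w", encoding="utf-8") as f:
        json.dump({"subscription_id": "<subscription-id>",
                   "resource_group": "<resource-group>",
                   "workspace_name": "<workspace-name>"}, f)
    missing = check_workspace_config(config_path)

print("Missing keys:", missing)
```

An empty list means the file has every key the workspace lookup needs.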

In [1]:
import os
import azureml.core
from azureml.core import Workspace, Run, Experiment
from azureml.core.compute import ComputeTarget, DataFactoryCompute
from azureml.core.datastore import Datastore
from azureml.data.data_reference import DataReference
from azureml.exceptions import ComputeTargetException
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.steps import AdlaStep, AzureBatchStep, DataTransferStep
from azureml.pipeline.steps.azurebatch_step import AzureBatchTaskInfo
from azureml.pipeline.steps_internal import ScopeStep

print("SDK version:", azureml.core.VERSION)

SDK version: 0.0.0+dev


### Account Details
Get the details of the account to be used from this [OneNote page](https://microsoft.sharepoint.com/teams/azuremlnursery/_layouts/OneNote.aspx?id=%2Fteams%2Fazuremlnursery%2FSiteAssets%2FAzure%20ML%20Nursery%20Notebook&wd=target%28Workshop.one%7C265D85D5-44C8-9D40-B556-A31FA098E708%2FPipeline%3A%20Scope%20and%20Batch%7CE40F0043-079C-4551-BAD6-27E5D056D05A%2F%29
onenote:https://microsoft.sharepoint.com/teams/azuremlnursery/SiteAssets/Azure%20ML%20Nursery%20Notebook/Workshop.one#Pipeline%20Scope%20and%20Batch&section-id={265D85D5-44C8-9D40-B556-A31FA098E708}&page-id={E40F0043-079C-4551-BAD6-27E5D056D05A}&end)

In [2]:
subscription_id = '15ae9cb6-95c1-483d-a0e3-b1a1a3b06324'
resource_group = 'PipelinesUsabilityStudy'
workspace_name = 'sanpilinternal'
workspace_region = 'eastus2euap'

In [3]:
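# Workspace was already imported above; repeated so this cell can run on its own.
from azureml.core import Workspace

# Load the workspace from the persisted config file ...
ws = Workspace.from_config()
# ... then rebind it explicitly to the account details above, which take precedence.
ws = Workspace(subscription_id, resource_group, workspace_name)
print(ws.subscription_id, ws.resource_group, ws.name, ws.location, sep='\n')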

Found the config file in: C:\repos\dogbreeds\aml_config\config.json
15ae9cb6-95c1-483d-a0e3-b1a1a3b06324
PipelinesUsabilityStudy
sanpilinternal
eastus2euap


## Create AML experiment

In [4]:
from datetime import datetime
date_object = datetime.now()
time_format = date_object.strftime('%b%d_%H_%M_')
exp_name = time_format + "Scope_And_ExePool-Exp"
exp = Experiment(ws, exp_name)
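
To see what kind of name the cell above produces, here is a minimal sketch that applies the same `strftime` format to a fixed timestamp. The function name `make_experiment_name` and the example date are made up for illustration; the month abbreviation from `%b` depends on the locale.

```python
from datetime import datetime

def make_experiment_name(now, suffix="Scope_And_ExePool-Exp"):
    """Prefix the experiment name with month/day and hour/minute, as in the cell above."""
    return now.strftime('%b%d_%H_%M_') + suffix

# A fixed timestamp makes the resulting name easy to inspect.
example_name = make_experiment_name(datetime(2019, 1, 15, 9, 30))
print(example_name)
```

Because the prefix only goes down to the minute, two runs started within the same minute would share an experiment.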

## Register the migrated ADLS Datastore
For this, you will first need to assign the Azure AD application to the Azure Data Lake Storage Gen1 account file or folder. This is detailed in [this article](https://docs.microsoft.com/en-us/azure/data-lake-store/data-lake-store-service-to-service-authenticate-using-active-directory).

Get the details of the account to be used from this [OneNote page](https://microsoft.sharepoint.com/teams/azuremlnursery/_layouts/OneNote.aspx?id=%2Fteams%2Fazuremlnursery%2FSiteAssets%2FAzure%20ML%20Nursery%20Notebook&wd=target%28Workshop.one%7C265D85D5-44C8-9D40-B556-A31FA098E708%2FPipeline%3A%20Scope%20and%20Batch%7CE40F0043-079C-4551-BAD6-27E5D056D05A%2F%29
onenote:https://microsoft.sharepoint.com/teams/azuremlnursery/SiteAssets/Azure%20ML%20Nursery%20Notebook/Workshop.one#Pipeline%20Scope%20and%20Batch&section-id={265D85D5-44C8-9D40-B556-A31FA098E708}&page-id={E40F0043-079C-4551-BAD6-27E5D056D05A}&end)

In [5]:
adl_datastore_name='MigratedAMLPlayground'
# ADLS Details:
subscription_id="4062f5e8-5e2b-447f-a06f-62b65b731b0d"
resource_group="conv"
store_name="amlplayground-c09"
# Team Service Principal details
tenant_id="72f988bf-86f1-41af-91ab-2d7cd011db47"
client_id="f81e59a4-84ae-4ff3-885a-7b941b3db22d"
client_secret=os.environ["ADLS_CLIENT_SECRET"] # hypothetical variable name; keeps the secret out of the notebook

In [6]:
try:
    # Reuse the datastore if it is already registered in the workspace.
    adls_datastore = Datastore.get(ws, adl_datastore_name)
    print("Found datastore with name: %s" % adl_datastore_name)
except Exception:
    # Lookup failed, so register the datastore with the service principal details.
    adls_datastore = Datastore.register_azure_data_lake(
        workspace=ws,
        datastore_name=adl_datastore_name,
        subscription_id=subscription_id, # subscription id of ADLS account
        resource_group=resource_group, # resource group of ADLS account
        store_name=store_name, # ADLS account name
        tenant_id=tenant_id, # tenant id of service principal
        client_id=client_id, # client id of service principal
        client_secret=client_secret) # the secret of service principal
    print("Registered datastore with name: %s" % adl_datastore_name)

Found datastore with name: MigratedAMLPlayground
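
The service principal details used for registration are sensitive, so one option is to read them from environment variables rather than typing them into a cell. The sketch below is only one way to do that; the variable names (`ADLS_TENANT_ID`, `ADLS_CLIENT_ID`, `ADLS_CLIENT_SECRET`) and the helper `load_service_principal` are hypothetical, not something the SDK defines.

```python
import os

# Hypothetical environment variable names; choose whatever fits your setup.
ENV_VARS = {
    "tenant_id": "ADLS_TENANT_ID",
    "client_id": "ADLS_CLIENT_ID",
    "client_secret": "ADLS_CLIENT_SECRET",
}

def load_service_principal(environ=os.environ):
    """Read service principal settings from the environment, failing fast if any are missing."""
    missing = [var for var in ENV_VARS.values() if not environ.get(var)]
    if missing:
        raise KeyError("Missing environment variables: " + ", ".join(missing))
    return {name: environ[var] for name, var in ENV_VARS.items()}

# Demonstration with a stand-in mapping instead of the real environment.
demo_env = {"ADLS_TENANT_ID": "<tenant-id>",
            "ADLS_CLIENT_ID": "<client-id>",
            "ADLS_CLIENT_SECRET": "<client-secret>"}
credentials = load_service_principal(demo_env)
print(sorted(credentials))
```

The returned dictionary uses the same keyword names as `Datastore.register_azure_data_lake`, so it could be passed along with `**credentials`.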


## Define Input and Output for Scope Job
The input data is already in the ADLS store. We will also write the output data to the ADLS datastore defined above.

In [7]:
input_data = DataReference(
    datastore=adls_datastore,
    data_reference_name="InputData",
    path_on_datastore="local/AMLTest/input-s.tsv")

output_ref = PipelineData("Destination", datastore=adls_datastore)
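
If you want to confirm with another tool that the input file exists, it can help to know its full location. The sketch below builds the `adl://` URI for a path in an ADLS Gen1 account; it assumes that `path_on_datastore` is relative to the root of the account, and the helper name `adls_gen1_uri` is made up for this example.

```python
def adls_gen1_uri(store_name, path_on_datastore):
    """Build the adl:// URI for a path in an Azure Data Lake Storage Gen1 account."""
    return "adl://{}.azuredatalakestore.net/{}".format(
        store_name, path_on_datastore.lstrip("/"))

# Store name and path taken from the cells above.
input_uri = adls_gen1_uri("amlplayground-c09", "local/AMLTest/input-s.tsv")
print(input_uri)
```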

## Create Scope step

**ScopeStep** is used to run a Scope script using a Cosmos-migrated Azure Data Lake Analytics account.

- **name:** Name of module
- **script_name:** Name of scope script
- **scope_param:** Parameters to pass to scope job
- **params:** Dictionary of name-value pairs to replace in script *(optional)*
- **custom_job_name_suffix:** Optional string to append to scope job name
- **inputs:** List of input port bindings
- **outputs:** List of output port bindings
- **resources:** List of input port bindings to download resource files and substitute their local path in script
- **adla_account_name:** ADLA account name to use for this job
- **source_directory:** folder that contains the script, assemblies etc. *(optional)*
- **hash_paths:** list of paths to hash to detect a change (script file is always hashed) *(optional)*
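
The idea behind `hash_paths` can be illustrated with a content hash: if the hash of the listed files is unchanged, a previous result can be reused; if it differs, the step has to run again. The following is a simplified sketch of that idea and not the SDK's actual implementation; `fingerprint_files` is a hypothetical helper.

```python
import hashlib
import os
import tempfile

def fingerprint_files(paths):
    """Return one SHA-256 digest covering the names and contents of the given files."""
    digest = hashlib.sha256()
    for path in sorted(paths):
        digest.update(os.path.basename(path).encode("utf-8"))
        with open(path, "rb") as f:
            digest.update(f.read())
    return digest.hexdigest()

with tempfile.TemporaryDirectory() as tmp_dir:
    script_path = os.path.join(tmp_dir, "script.script")
    with open(script_path, "w") as f:
        f.write("// first version of the script\n")
    first = fingerprint_files([script_path])
    unchanged = fingerprint_files([script_path])

    # Editing the file changes the fingerprint.
    with open(script_path, "a") as f:
        f.write("// an edit\n")
    changed = fingerprint_files([script_path])

print("Same content, same hash:", first == unchanged)
print("Edited content, new hash:", first != changed)
```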

In [8]:
script_folder = '.'

script_step = ScopeStep(
    name='Remove_Duplicates',
    script_name='script.script',
    inputs=[input_data],
    outputs=[output_ref],
    allow_reuse=True,
    adla_account_name='amlplayground-c09', #ADLA Name, could be any ADLA name
    source_directory=script_folder)

## Copy the output of the ScopeStep to Azure blob
### Define data destination
#### Register Blob

In [9]:
try:
    def_blob_store = Datastore(workspace=ws, name="myblobdatastore")
    print("Got blob")
except Exception:
    # Assign to the same variable as the try branch; later cells use def_blob_store.
    def_blob_store = Datastore.register_azure_blob_container(
        ws, 
        "myblobdatastore", 
        container_name="amltest", 
        account_name="sanpilinternal")
    print("Registered the blob")

Got blob


#### Define the destination

In [10]:
blob_destination = DataReference(datastore=def_blob_store,
                       path_on_datastore="input",
                       data_reference_name="Copy_Destination")

### Copy data using DataTransferStep

#### Register ADF

In [11]:
from azureml.core.compute import ComputeTarget, DataFactoryCompute
from azureml.exceptions import ComputeTargetException
data_factory_name = 'adftest'

try:
    data_factory_compute = DataFactoryCompute(ws, data_factory_name)
    print("Got ADF")
except ComputeTargetException:
    # The compute target is not available in the workspace yet, so provision it.
    print("Registering ADF")
    provisioning_config = DataFactoryCompute.provisioning_configuration()
    data_factory_compute = ComputeTarget.create(ws, data_factory_name, provisioning_config)
    data_factory_compute.wait_for_completion()

Got ADF


#### Define DataTransferStep

In [12]:
transfer_adls_to_blob = DataTransferStep(
    name="Copy_from_ADLS_to_Blob",
    source_data_reference=output_ref,
    destination_data_reference=blob_destination,
    source_reference_type='file',
    compute_target=data_factory_compute)
print("data transfer step created")

data transfer step created


## Run a Windows Exe on Azure Batch
Using the output we copied to Azure Blob as the input, run a Windows exe in Azure Batch.
### Define AzureBatchTaskInfo

In [13]:
file_name = "input/Destination_f99a1b07-252a-43f2-bea1-ea9e3b42e8ed"
batch_input = transfer_adls_to_blob.get_output().as_input('input')

In [14]:
task_command_line = "cmd /c wc.exe -w " + file_name + " >> output.txt"

blob_out = DataReference(datastore=def_blob_store,
                         path_on_datastore="",
                         data_reference_name="output")

azurebatch_task = AzureBatchTaskInfo(task_name="wordcount",
                                     task_command_line=task_command_line,
                                     task_output_patterns=["output.txt"],
                                     task_input_data_references=[batch_input],
                                     task_output_data_reference=blob_out)
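
To sanity-check the `output.txt` that the Batch task produces, a local word count is handy. The sketch below assumes that the `wc.exe` shipped in the `wordcount` folder behaves like the standard `wc -w` utility, that is, it counts whitespace-separated words; `count_words` and the sample text are made up for illustration.

```python
def count_words(text):
    """Count whitespace-separated words, which is what `wc -w` reports."""
    return len(text.split())

# A tiny tab-separated sample standing in for the copied file.
sample = "alpha\tbeta\ngamma delta\n"
word_total = count_words(sample)
print(word_total)
```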

In [15]:
batch_step = AzureBatchStep(
    name="Word Count",
    account_name="batch3pdev",
    pool_id="sanpilpool",
    tasks=[azurebatch_task],
    source_directory="wordcount")

## Run the pipeline

In [16]:
pipeline = Pipeline(
    description="ScopeAndExePool",
    workspace=ws, 
    # To run only the Scope step, use steps=[script_step] instead.
    steps=[script_step, transfer_adls_to_blob, batch_step])

Using dev endpoint: https://westus2.aether-dev.ms
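
The three steps run in sequence because each one consumes data that the previous one produces. The sketch below illustrates that ordering with a plain topological sort; it is a simplified model and not how the SDK builds its graph. The data names are taken loosely from the cells above, and `graphlib` requires Python 3.9 or later.

```python
from graphlib import TopologicalSorter

# Which data each step consumes and produces (simplified from the cells above).
steps = {
    "Remove_Duplicates": {"inputs": ["InputData"], "outputs": ["Destination"]},
    "Copy_from_ADLS_to_Blob": {"inputs": ["Destination"], "outputs": ["Copy_Destination"]},
    "Word Count": {"inputs": ["Copy_Destination"], "outputs": ["output"]},
}

# Map each piece of data to the step that produces it.
producer = {data: name for name, io in steps.items() for data in io["outputs"]}

# A step depends on every step that produces one of its inputs.
graph = {
    name: {producer[data] for data in io["inputs"] if data in producer}
    for name, io in steps.items()
}

run_order = list(TopologicalSorter(graph).static_order())
print(run_order)
```

`InputData` has no producer in the pipeline because it already exists in the datastore, so `Remove_Duplicates` has no upstream step.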


In [17]:
pipeline_run = exp.submit(pipeline, regenerate_outputs=False)
#pipeline_run.wait_for_completion()

Created step Remove_Duplicates [41dd13e9][b7dd28f7-3acf-4012-9d72-31d51b623977], (This step will run and generate new outputs)
Created step Copy_from_ADLS_to_Blob [f8386db7][3622dc8e-4867-4346-a149-601c2b2e4827], (This step is eligible to reuse a previous run's output)
Created step Word Count [d31f86c6][98cc9157-ab13-4ad5-9666-de978b211cd1], (This step is eligible to reuse a previous run's output)
Using data reference InputData for StepId [c86d4e74][01a0c647-08da-4ba1-ac48-4fb4d458a353], (Consumers of this data are eligible to reuse prior runs.)
Using data reference Copy_Destination for StepId [93d215e7][6f9a3177-11d4-4ce7-b108-9562c1f54a56], (Consumers of this data are eligible to reuse prior runs.)
Submitted pipeline run: cd9e8516-5460-4f03-8ea7-604ab655bee6


In [18]:
from azureml.widgets import RunDetails
RunDetails(pipeline_run).show()

_PipelineWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': False, 'log_level': 'INFO', …