# 1. Import Azure ML Python SDK

In [None]:
import azureml.core

# Report which Azure ML SDK release this kernel is running.
sdk_version = azureml.core.VERSION
print(f"SDK version: {sdk_version}")

# 2. Authenticate and Initialize the Azure ML Workspace

In [None]:
from azureml.core import Workspace

# Load the workspace from its local configuration and echo its identity.
ws = Workspace.from_config()
workspace_summary = [
    f"Workspace name: {ws.name}",
    f"Azure region: {ws.location}",
    f"Subscription id: {ws.subscription_id}",
    f"Resource group: {ws.resource_group}",
]
print("\n".join(workspace_summary))

# 3. Create a compute target

In [None]:
import os

from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.exceptions import ComputeTargetException

# Cluster settings; each one can be overridden through an environment variable.
# `os` is imported in this cell because the notebook otherwise imports it only
# in a later cell, which made this cell fail with NameError on a fresh kernel.
cluster_name = os.environ.get("AML_CLUSTER_NAME", "cpu-cluster")
cluster_sku = os.environ.get("AML_CLUSTER_SKU", "STANDARD_D2_V2")
cluster_priority = os.environ.get("AML_CLUSTER_PRIORITY", "dedicated")
# Environment variables are always strings, so cast the node counts explicitly.
cluster_min_nodes = int(os.environ.get("AML_CLUSTER_MIN_NODES", 0))
cluster_max_nodes = int(os.environ.get("AML_CLUSTER_MAX_NODES", 4))

try:
    # Attach to an existing cluster with this name if there is one.
    compute = AmlCompute(
        workspace=ws,
        name=cluster_name
    )
    print("Loaded existing aml cluster")
except ComputeTargetException as exception:
    # No usable cluster under this name: provision a new one with the settings above.
    print(f"Could not load aml cluster: {exception}")
    print("Creating new aml cluster")
    aml_config = AmlCompute.provisioning_configuration(
        vm_size=cluster_sku,
        vm_priority=cluster_priority,
        min_nodes=cluster_min_nodes,
        max_nodes=cluster_max_nodes,
        idle_seconds_before_scaledown=300
    )
    compute = AmlCompute.create(
        workspace=ws,
        name=cluster_name,
        provisioning_configuration=aml_config
    )

    # Block until provisioning finishes so later cells can submit to the cluster.
    compute.wait_for_completion(
        show_output=True
    )

print(compute.get_status().serialize())

# 4. Upload and register data

In [None]:
# Print the name and type of every datastore registered in the workspace.
datastores = ws.datastores
for name in datastores:
    datastore = datastores[name]
    print(name, datastore.datastore_type)

In [None]:
# Use the workspace's default datastore for uploads and pipeline I/O.
datastore = ws.get_default_datastore()
datastore_details = (
    ("Datastore name", datastore.name),
    ("Datastore type", datastore.datastore_type),
    ("Datastore account name", datastore.account_name),
    ("Datastore container name", datastore.container_name),
)
print("\n".join(f"{label}: {value}" for label, value in datastore_details))

In [None]:
# Upload the local iris CSV to the default datastore, replacing any previous copy.
# NOTE(review): upload_files appears to treat target_path as a destination
# folder, so the file presumably lands at train_dataset/iris.csv/iris.csv.
# Later cells mount the "train_dataset/iris.csv" prefix and walk it as a
# directory — keep these paths in sync if you change either one.
local_files = ["./train_dataset/iris.csv"]
datastore.upload_files(
    files=local_files,
    target_path="train_dataset/iris.csv",
    overwrite=True,
    show_progress=True,
)

In [None]:
from azureml.core import Dataset

# Register the uploaded CSV location as a new version of the "iris_file" FileDataset.
file_dataset = Dataset.File.from_files(
    path=[(datastore, "train_dataset/iris.csv")]
).register(
    workspace=ws,
    name="iris_file",
    description="iris file dataset",
    create_new_version=True,
)

# Display the file paths the registered dataset resolves to.
file_dataset.to_path()

# 5. Create a Module

In [None]:
import os

# Local folder that receives the step scripts written by the next two cells.
script_folder = "steps"
os.makedirs(script_folder, exist_ok=True)

# Entry scripts: one reads a mounted datastore path, the other a named dataset.
script_file_name = "preprocess.py"
script_file_name_dataset = "preprocess_dataset.py"

In [None]:
%%writefile $script_folder/$script_file_name

import os
import json
import argparse
import pandas as pd
import shutil

from azureml.core import Run

# Context of the Azure ML run executing this script (not used by this variant).
run = Run.get_context()

def main(args):
    """Copy every file found under ``args.input_path`` into ``args.output_path``.

    Each file is parsed as CSV with pandas, echoed to stdout and written back
    out as ``myfile<i>.csv`` using ``DataFrame.to_csv`` defaults.
    """
    print(f"Args: {args}")

    # Echo the resolved locations so they appear in the run log.
    print(f"Input: {args.input_path}")
    print(f"Output: {args.output_path}")

    # Gather every file beneath the input path, recursing into subfolders.
    source_files = get_file_list(args.input_path)
    print(f"Input file paths: {source_files}")

    os.makedirs(args.output_path, exist_ok=True)

    for index, source_file in enumerate(source_files):
        frame = pd.read_csv(source_file, engine='python')
        print(frame)
        destination = os.path.join(args.output_path, f"myfile{index}.csv")
        frame.to_csv(destination)


def get_file_list(path):
    """Return the paths of all files under ``path``, recursing into subfolders.

    Paths come out in ``os.walk`` order. A path that is not a directory
    (or does not exist) yields an empty list.
    """
    return [
        os.path.join(folder, entry)
        for folder, _subdirs, entries in os.walk(path)
        for entry in entries
    ]


def parse_args():
    """Parse the ``--input_path`` and ``--output_path`` command-line options."""
    parser = argparse.ArgumentParser(description="Argument Parser Sample")
    for flag in ("--input_path", "--output_path"):
        parser.add_argument(flag, type=str, help="argument sample")
    return parser.parse_args()

if __name__ == "__main__":
    # Script entry point: parse the CLI flags and run the preprocessing.
    main(args=parse_args())

In [None]:
%%writefile $script_folder/$script_file_name_dataset

import os
import json
import argparse
import pandas as pd
import shutil

from azureml.core import Run

# Context of the Azure ML run executing this script; main() looks up the
# input dataset's location through its input_datasets mapping.
run = Run.get_context()

def main(args):
    """Copy every file of the named input dataset into ``args.output_path``.

    The dataset's location on the compute is looked up by name in
    ``run.input_datasets``. Each file found beneath it is parsed as CSV with
    pandas, echoed to stdout and re-written as ``myfile<i>.csv``.
    """
    print(f"Args: {args}")

    # Echo the requested dataset and output location for the run log.
    print(f"Dataset name: {args.dataset_name}")
    print(f"Output: {args.output_path}")

    # Resolve the dataset name passed via --dataset_name to its location on the compute.
    dataset_location = run.input_datasets[args.dataset_name]

    source_files = get_file_list(dataset_location)
    print(f"Input file paths: {source_files}")

    os.makedirs(args.output_path, exist_ok=True)

    for index, source_file in enumerate(source_files):
        frame = pd.read_csv(source_file, engine='python')
        print(frame)
        destination = os.path.join(args.output_path, f"myfile{index}.csv")
        frame.to_csv(destination)


def get_file_list(path):
    """Return the paths of all files under ``path``, recursing into subfolders.

    Paths come out in ``os.walk`` order. A path that is not a directory
    (or does not exist) yields an empty list.
    """
    return [
        os.path.join(folder, entry)
        for folder, _subdirs, entries in os.walk(path)
        for entry in entries
    ]


def parse_args():
    """Parse the ``--dataset_name`` and ``--output_path`` command-line options."""
    parser = argparse.ArgumentParser(description="Argument Parser Sample")
    for flag in ("--dataset_name", "--output_path"):
        parser.add_argument(flag, type=str, help="argument sample")
    return parser.parse_args()


if __name__ == "__main__":
    # Script entry point: parse the CLI flags and run the dataset preprocessing.
    main(args=parse_args())

In [None]:
from azureml.pipeline.core.module import Module
from azureml.pipeline.core.graph import InputPortDef, OutputPortDef

module_name = os.environ.get("MODULE_NAME", "mystep")

# Port definitions: both the input and the output bind to the default datastore in mount mode.
input_def = InputPortDef(
    name="input",
    default_data_reference_name=datastore.name,
    default_datastore_mode="mount",
    label="input"
)
output_def = OutputPortDef(
    name="output",
    default_datastore_name=datastore.name,
    default_datastore_mode="mount",
    label="output"
)

# Only module creation is guarded. A failure while publishing should surface,
# not be mistaken for "module already exists". The original bare `except:`
# also caught KeyboardInterrupt.
try:
    module = Module.create(
        workspace=ws,
        name=module_name,
        description="A sample module."
    )
    module_version_number = "1"
except Exception as exception:
    # NOTE(review): presumably creation fails because a module with this name
    # already exists (e.g. on a re-run); fall back to loading it.
    print(f"Could not create module, loading existing one: {exception}")
    module = Module.get(
        workspace=ws,
        name=module_name
    )
    # NOTE(review): a fixed "2" will collide on a third run; bump as needed.
    module_version_number = "2"

# Publish the step script as the module's default version.
module_version = module.publish_python_script(
    script_name=script_file_name,
    source_directory=script_folder,
    description="Sample module",
    version=module_version_number,
    inputs=[input_def],
    outputs=[output_def],
    is_default=True
)

# 6. Create a Pipeline

In [None]:
from azureml.core import RunConfiguration
from azureml.core.runconfig import CondaDependencies

# Python packages the step needs on the remote compute.
step_pip_packages = ["azureml-dataprep[pandas,fuse]", "azureml-defaults", "pandas"]

dependencies = CondaDependencies.create(
    python_version="3.6.2",
    pip_packages=step_pip_packages,
    conda_packages=[],
)

# Run configuration shared by the pipeline step.
run_config = RunConfiguration(framework="Python", conda_dependencies=dependencies)

In [None]:
from azureml.pipeline.core import PipelineData, PipelineParameter
from azureml.data.datapath import DataPath, DataPathComputeBinding

# OPTION 1 (active): PipelineParameter for a dynamic pipeline input.
# REST callers override it by this name ("input_path") under "DataPathAssignments".
input_path = DataPath(datastore=datastore, path_on_datastore="train_dataset/iris.csv")
input_path_pipeline_parameter = PipelineParameter(name="input_path", default_value=input_path)
mount_binding = DataPathComputeBinding(mode="mount")
input_data = (input_path_pipeline_parameter, mount_binding)

# OPTION 2
# Create DataReference for static input
#from azureml.data.data_reference import DataReference
#input_data = DataReference(
#    datastore=datastore,
#    data_reference_name="iris",
#    path_on_datastore="train_dataset/iris.csv",
#    mode="mount"
#)

# OPTION 3
# Use dataset as input
#input_dataset_name = "input_path"
#input_data = file_dataset.as_named_input(input_dataset_name).as_mount()

# Step output, written to the default datastore and mounted on the compute.
output_data = PipelineData(name="output", datastore=datastore, output_mode="mount")

# Map the module's port names to the concrete pipeline inputs and outputs.
input_wiring = dict(input=input_data)
output_wiring = dict(output=output_data)

In [None]:
from azureml.pipeline.steps import ModuleStep

# Command-line arguments handed to the step script on the compute target.
step_arguments = [
    "--input_path", input_data,
    "--output_path", output_data,
]

step = ModuleStep(
    module_version=module_version,
    inputs_map=input_wiring,
    outputs_map=output_wiring,
    arguments=step_arguments,
    runconfig=run_config,
    compute_target=compute,
    version="1",
)

In [None]:
#from azureml.pipeline.steps import PythonScriptStep

# Create a PythonScriptStep
#step = PythonScriptStep(
#    name=module_name,
#    script_name=script_file_name,
#    source_directory=script_folder,
#    arguments=["--input_path", input_data,
#               "--output_path", output_data],
#    compute_target=compute,
#    runconfig=run_config,
#    inputs=[input_data],
#    outputs=[output_data],
#    allow_reuse=True,
#    version="1"
#)

# Create a PythonScriptStep with datasets
#step = PythonScriptStep(
#    name=module_name,
#    script_name=script_file_name_dataset,
#    source_directory=script_folder,
#    arguments=["--dataset_name", input_dataset_name,
#               "--output_path", output_data],
#    compute_target=compute,
#    runconfig=run_config,
#    inputs=[input_data],
#    outputs=[output_data],
#    allow_reuse=True,
#    version="1"
#)

In [None]:
from azureml.pipeline.core import Pipeline

# Single-step pipeline wrapping the ModuleStep built above.
pipeline = Pipeline(workspace=ws, steps=[step])

In [None]:
from azureml.core import Experiment

# Experiment that records the pipeline runs (name overridable via EXPERIMENT_NAME).
experiment_name = os.environ.get("EXPERIMENT_NAME", "modulesample")
experiment = Experiment(workspace=ws, name=experiment_name)

In [None]:
# Submit the pipeline; `run` is the parent pipeline run used by the cells below.
run = experiment.submit(pipeline)

In [None]:
# Block until the pipeline run finishes, printing its progress.
run.wait_for_completion(show_output=True)

# 7. Get Output path via Azure ML SDK

In [None]:
# Look up the child run for the module step by the module's name; take the first match.
matching_step_runs = run.find_step_run(name=module_name)
module_step_run = matching_step_runs[0]
module_step_run_id = module_step_run.id
module_step_run

In [None]:
# Names of the files stored with the module step's run.
module_step_run.get_file_names()

In [None]:
# Output data of the step's "output" port (the PipelineData named "output").
module_step_run.get_output_data(name="output")

In [None]:
# Get outputs of steps
for step in run.get_steps():
    print(f"Output of step {step.name}")
    
    output_dict = step.get_outputs()
    
    for name, output in output_dict.items():
        output_ref = output.get_port_data_reference()
        print(f"Name: {name}")
        print(f"Datastore: {output_ref.datastore_name}")
        print(f"Path on Datastore: {output_ref.path_on_datastore}")

# 8. Publish Pipeline 

In [None]:
# Publish the pipeline behind the submitted run so it is reachable via `published_pipeline.endpoint`.
published_pipeline = run.publish_pipeline(
    name="SamplePipeline",
    version="1",
    description="My sample pipeline",
    continue_on_step_failure=True,
)

# 9. Use REST endpoint to submit a run

In [None]:
from azureml.core.authentication import InteractiveLoginAuthentication
import requests

# Interactive login; the returned header dict authenticates the REST call.
auth = InteractiveLoginAuthentication()
aad_token = auth.get_authentication_header()

endpoint = published_pipeline.endpoint

# Request body: run under our experiment and point the "input_path"
# pipeline parameter at an explicit datastore location.
request_body = {
    "ExperimentName": experiment_name,
    "RunSource": "SDK",
    "ParameterAssignments": {},
    "DataPathAssignments": {
        "input_path": {
            "DataStoreName": datastore.name,
            "RelativePath": "train_dataset/iris.csv",
        },
    },
}
response = requests.post(endpoint, headers=aad_token, json=request_body)

In [None]:
# Fail loudly, including the full response, if the submission was rejected.
# The message previously referenced an undefined `rest_endpoint`, so the
# error path itself crashed with NameError.
try:
    response.raise_for_status()
except requests.exceptions.HTTPError as error:
    raise Exception(
        f"Received bad response from the endpoint: {endpoint}\n"
        f"Response Code: {response.status_code}\n"
        f"Headers: {response.headers}\n"
        f"Content: {response.content}"
    ) from error

run_id = response.json().get('Id')
print(f"Submitted pipeline run: {run_id}")

In [None]:
from azureml.core.authentication import InteractiveLoginAuthentication
import json
import time

import requests

auth = InteractiveLoginAuthentication()

# Run History REST endpoint for the submitted pipeline run's details.
hosturl = f"https://{ws.location}.api.azureml.ms/"
history_base = "history/v1.0/"
resource_base = f"subscriptions/{ws.subscription_id}/resourceGroups/{ws.resource_group}/providers/Microsoft.MachineLearningServices/workspaces/{ws.name}/"
rundetails_base = f"experiments/{experiment_name}/runs/{run_id}/details"
endpoint_rundetails = hosturl + history_base + resource_base + rundetails_base

# Statuses after which the run will not change. Polling only for "Completed"
# looped forever when the run failed or was cancelled.
terminal_statuses = {"Completed", "Failed", "Canceled"}

print("Waiting for run to be completed")
while True:
    response_rundetails = requests.get(
        endpoint_rundetails,
        headers=auth.get_authentication_header()
    )
    response_rundetails.raise_for_status()
    status = json.loads(response_rundetails.content)["status"]
    print(f"Current status: {status}")
    if status in terminal_statuses:
        break
    time.sleep(10)

if status != "Completed":
    raise RuntimeError(f"Pipeline run {run_id} finished with status: {status}")

# 10. Get Output Path via REST

In [None]:
# List the child (step) runs of the pipeline run via the Run History REST API.
children_base = f"experiments/{experiment_name}/runs/{run_id}/children"
endpoint_children = hosturl + history_base + resource_base + children_base

response_children = requests.get(
    endpoint_children,
    headers=auth.get_authentication_header()
)
response_children.raise_for_status()
step_list = json.loads(response_children.content)["value"]

# Pick the child run that executed our module. Without the `else` branch, a
# missing step left `step_id` unbound, or silently stale from an earlier
# execution of this cell.
for child_run in step_list:
    if child_run["name"] == module_name:
        step_id = child_run["runId"]
        break
else:
    raise LookupError(f"No step named {module_name!r} found in pipeline run {run_id}")

print(f"Id of step: {step_id}")

In [None]:
from azureml.core.authentication import InteractiveLoginAuthentication

auth = InteractiveLoginAuthentication()

# Details endpoint for the step run. hosturl / history_base / resource_base are
# the values defined in the status-polling cell; they are not recomputed here.
# Separate names keep the parent run's endpoint_rundetails intact.
stepdetails_base = f"experiments/{experiment_name}/runs/{step_id}/details"
endpoint_stepdetails = hosturl + history_base + resource_base + stepdetails_base

response_stepdetails = requests.get(
    endpoint_stepdetails,
    headers=auth.get_authentication_header()
)
response_stepdetails.raise_for_status()

# The step's "output" data reference describes where the PipelineData was written.
output_details = json.loads(response_stepdetails.content)["runDefinition"]["dataReferences"]["output"]
print(f"Output details: {output_details}")