## Azure ML - Sample Pipeline for File Dataset Creation/Consumption
This notebook demonstrates creation and execution of an Azure ML pipeline designed to create pandas dataframes filled with random data and save these as CSVs to a File Dataset. This File Dataset is subsequently consumed both as a mount and a download in downstream steps.  

### Import Required Packages

In [1]:
from azureml.core import Workspace, Experiment, Datastore, Environment, Dataset
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import DEFAULT_CPU_IMAGE
from azureml.pipeline.core import Pipeline, PipelineParameter, PipelineData
from azureml.pipeline.steps import PythonScriptStep
from azureml.pipeline.core import PipelineParameter, PipelineData, PipelineEndpoint
from azureml.data.output_dataset_config import OutputTabularDatasetConfig, OutputDatasetConfig, OutputFileDatasetConfig

### Connect to Azure ML Workspace, Provision Compute Resources, and get References to Datastores
Connect to the workspace using its associated config file. Get a reference to your pre-existing AML compute cluster, or provision a new cluster to facilitate processing. Finally, get a reference to your default blob datastore.

In [2]:
# Connect to the AML workspace described by the local workspace config file
ws = Workspace.from_config()

# Name of the AML compute cluster to use (provisioned below if it does not exist yet)
cpu_cluster_name = 'mm-cluster-new'

# Reuse the cluster when it already exists; otherwise provision a new one
try:
    pipeline_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
    print('Found an existing cluster, using it instead.')
except ComputeTargetException:
    # The provisioning configuration must be stored in `compute_config`, which is the
    # name passed to ComputeTarget.create below. min_nodes=0 lets the cluster scale
    # down to zero nodes when idle; max_nodes=1 caps it at a single node.
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D3_V2',
                                                           min_nodes=0,
                                                           max_nodes=1)
    pipeline_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)
    pipeline_cluster.wait_for_completion(show_output=True)

# Get the workspace's default datastore (used for uploads and pipeline outputs below)
default_ds = ws.get_default_datastore()

Found an existing cluster, using it instead.


 ### Create Run Configuration
The `RunConfiguration` defines the environment used across all Python steps. You can optionally add conda or pip packages to this environment. [More details here](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.conda_dependencies.condadependencies?view=azure-ml-py).

In [12]:
import os
import pandas as pd

# Load the source CSV that is exported as JSON Lines below.
df = pd.read_csv('./datasets/spamformodel.csv')

# open() does not create parent folders, so make sure the export folder exists.
jsonl_dir = './datasets/jsonl'
os.makedirs(jsonl_dir, exist_ok=True)

# Writes example_0.jsonl containing every row of `df` (one JSON object per line).
# NOTE(review): a slice `df.iloc[:x, :x+100]` was previously computed here but never
# used; it looks like unfinished chunking into several files. Confirm the intended
# split before extending the range.
for x in range(0, 1):
    text = df.to_json(orient='records', lines=True)
    out_path = os.path.join(jsonl_dir, 'example_' + str(x) + '.jsonl')
    print(out_path)
    # `with` guarantees the file is flushed and closed even if write() raises.
    with open(out_path, 'w') as textfile:
        textfile.write(text)

<_io.TextIOWrapper name='./datasets/jsonl/example_0.jsonl' mode='w' encoding='UTF-8'>


In [13]:
%%writefile ./datasets/tree.yaml

# Small nested YAML sample; uploaded to datasets2/yaml and passed to the pipeline as `input_yaml`.
treeroot:
    branch1:
        name: Node 1
        branch1-1:
            name: Node 1-1
    branch2:
        name: Node 2
        branch2-1:
            name: Node 2-1

Overwriting ./datasets/tree.yaml


In [14]:
# Build the RunConfiguration shared by every PythonScriptStep: an environment created
# from textclassification_env.yml, registered in the workspace as 'env', and targeted
# at the cluster resolved earlier.
try:
    registered_env_name = 'env'
    conda_yml_file = 'textclassification_env.yml'
    env = Environment.from_conda_specification(registered_env_name, conda_yml_file)
    env.register(workspace=ws)
    # Fetch the registered copy so the run configuration references the workspace version.
    registered_env = Environment.get(ws, registered_env_name)
    pipeline_run_config = RunConfiguration()

    # Use the compute you created above.
    pipeline_run_config.target = pipeline_cluster

    # Assign the environment to the run configuration
    pipeline_run_config.environment = registered_env
    print("Run configuration created.")
except Exception as e:
    # Report, then re-raise: every pipeline step needs `pipeline_run_config`, so
    # continuing after a failure would only fail later with a less clear error
    # (or silently reuse a stale configuration from a previous cell execution).
    print(e)
    raise

Run configuration created.


In [15]:
default_ds.upload_files(files=['./datasets/jsonl/example_0.jsonl'], # Local JSON Lines file exported in the earlier cell
                        target_path= 'datasets2/jsonl', # Destination folder on the default datastore
                        overwrite=True, # Replace existing files of the same name
                        show_progress=True)

default_ds.upload_files(files=['./datasets/tree.yaml'], # Local YAML file written by the %%writefile cell
                        target_path= 'datasets2/yaml', # Destination folder on the default datastore
                        overwrite=True, # Replace existing files of the same name
                        show_progress=True)

Uploading an estimated of 1 files
Uploading ./datasets/jsonl/example_0.jsonl
Uploaded ./datasets/jsonl/example_0.jsonl, 1 files out of an estimated total of 1
Uploaded 1 files
Uploading an estimated of 1 files
Uploading ./datasets/tree.yaml
Uploaded ./datasets/tree.yaml, 1 files out of an estimated total of 1
Uploaded 1 files


$AZUREML_DATAREFERENCE_8cdbe7c6d8f34dc194377d36b358c8e4

### Define Output Datasets
Below we define the input `FileDataset`s (the uploaded JSON Lines and YAML files) and the configuration for the output `FileDataset` that will be passed between steps in our pipeline.

In [17]:
# Single-file FileDatasets over the two files uploaded to the default datastore above.
# NOTE(review): the last run's traceback shows the INPUT_input_yaml download containing
# example_0.jsonl; confirm this cell was re-run so `input_yaml` really points at tree.yaml.
json_location = (default_ds, "/datasets2/jsonl/example_0.jsonl")
yaml_location = (default_ds, "/datasets2/yaml/tree.yaml")
input_json = Dataset.File.from_files(json_location)
input_yaml = Dataset.File.from_files(yaml_location)

In [18]:
# Output FileDataset produced by the register step: written under
# sample_file_dataset/{run-id} on the default datastore and registered as
# 'sample_file_dataset' once the run completes.
sample_file_dataset = (
    OutputFileDatasetConfig(
        name='sample_file_dataset',
        destination=(default_ds, 'sample_file_dataset/{run-id}'),
    )
    .register_on_complete(name='sample_file_dataset')
)

### Define Pipeline Steps
The pipeline below consists of three steps: a step that reads the uploaded JSON Lines and YAML inputs and writes two random sample CSVs to the output File Dataset, a step that consumes that File Dataset as a download, and a step that consumes it as a mount. Each PythonScriptStep has a corresponding *.py file which is referenced in the step arguments. Any PipelineParameters you define can also be passed to and consumed within these steps.

In [19]:
%%writefile ./pipeline_step_scripts/register_file_dataset.py

from azureml.core import Run, Workspace, Datastore, Dataset
from azureml.core.model import Model
from azureml.data.datapath import DataPath
import pandas as pd
import os
import argparse
import yaml

# Parse input arguments
parser = argparse.ArgumentParser("Register File Dataset")
parser.add_argument("--input_json", type=str, dest='input_json', help='input json dataset')
parser.add_argument("--input_yaml", type=str, dest='input_yaml', help='input yaml dataset')
parser.add_argument('--sample_file_dataset', dest='sample_file_dataset', required=True)

args, _ = parser.parse_known_args()
input_json = args.input_json
input_yaml = args.input_yaml
sample_file_dataset = args.sample_file_dataset

# Get current run
current_run = Run.get_context()

# Get associated AML workspace
ws = current_run.experiment.workspace

# Get default datastore
ds = ws.get_default_datastore()

# Generate random sample data
random_df = pd.util.testing.makeDataFrame()
print(random_df)

random_df2 = pd.util.testing.makeDataFrame()
print(random_df2)

print("input file location")
print(input_json)
testObject = pd.read_json(path_or_buf=input_json, lines=True)
print(testObject)

print('*****************')
print("input yaml location")
print(input_yaml)



with open(input_yaml) as f:
    # use safe_load instead load
    dataMap = yaml.safe_load(f)


# Save file dataset
os.makedirs(sample_file_dataset, exist_ok=True)
random_df.to_csv(os.path.join(sample_file_dataset, 'sample_data.csv'))
random_df2.to_csv(os.path.join(sample_file_dataset, 'sample_data_2.csv'))

Overwriting ./pipeline_step_scripts/register_file_dataset.py


In [23]:
# Settings shared by all three PythonScriptSteps: same cluster, script folder and
# run configuration; reuse is disabled so every submission re-executes each step.
_step_defaults = dict(
    compute_target=pipeline_cluster,
    source_directory='./pipeline_step_scripts',
    allow_reuse=False,
    runconfig=pipeline_run_config,
)

# Step 1: download the JSON Lines / YAML inputs onto the node and write CSVs into
# the `sample_file_dataset` output.
register_data_step = PythonScriptStep(
    name='Register File Dataset',
    script_name='register_file_dataset.py',
    arguments=[
        '--input_json', input_json.as_named_input('input_json').as_download(),
        '--input_yaml', input_yaml.as_named_input('input_yaml').as_download(),
        '--sample_file_dataset', sample_file_dataset,
    ],
    outputs=[sample_file_dataset],
    **_step_defaults,
)

# Step 2: consume the output dataset downloaded into ./tmpdir; the script receives
# the same directory through --local_download_dir.
consume_data_as_download_step = PythonScriptStep(
    name='Consume File Dataset as Download',
    script_name='consume_file_dataset_as_download.py',
    arguments=['--local_download_dir', './tmpdir'],
    inputs=[sample_file_dataset.as_input(name='sample_file_dataset').as_download('./tmpdir')],
    outputs=[],
    **_step_defaults,
)

# Step 3: consume the same output dataset as a mount.
consume_data_as_mount_step = PythonScriptStep(
    name='Consume File Dataset as Mount',
    script_name='consume_file_dataset_as_mount.py',
    arguments=[],
    inputs=[sample_file_dataset.as_input(name='sample_file_dataset').as_mount()],
    outputs=[],
    **_step_defaults,
)

### Create Pipeline
Create an Azure ML Pipeline by specifying the steps to be executed. Note: based on the dataset dependencies between steps, execution is ordered so that no step runs until all of its necessary input datasets have been generated.

In [24]:
# Execution order follows data dependencies: both consume steps wait for the
# `sample_file_dataset` output produced by the register step.
pipeline = Pipeline(
    workspace=ws,
    steps=[
        register_data_step,
        consume_data_as_download_step,
        consume_data_as_mount_step,
    ],
)

### Create Experiment and Run Pipeline
Define a new experiment (logical container for pipeline runs) and execute the pipeline. You can modify the values of pipeline parameters here when submitting a new run.

In [25]:
# Submit the pipeline under the 'file_dataset_testing' experiment and block until it
# finishes, streaming step output; a failed step surfaces here as ActivityFailedException.
experiment = Experiment(workspace=ws, name='file_dataset_testing')
run = experiment.submit(pipeline)
run.wait_for_completion(show_output=True)

Created step Register File Dataset [e8012e10][1967c230-3139-46fd-861c-067c33514b5f], (This step will run and generate new outputs)
Created step Consume File Dataset as Download [21aaa857][3048d4ad-c40e-4767-8741-480792894076], (This step will run and generate new outputs)
Created step Consume File Dataset as Mount [4abd9c7a][76cbf0da-e673-408b-b6a4-4e6668010c73], (This step will run and generate new outputs)
Submitted PipelineRun 3f1024b8-f993-420f-99f5-2927de4f6c51
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/3f1024b8-f993-420f-99f5-2927de4f6c51?wsid=/subscriptions/b071bca8-0055-43f9-9ff8-ca9a144c2a6f/resourcegroups/aml-dev-rg/workspaces/aml-dev&tid=16b3c013-d300-468d-ac64-7eda0820b6d3
PipelineRunId: 3f1024b8-f993-420f-99f5-2927de4f6c51
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/3f1024b8-f993-420f-99f5-2927de4f6c51?wsid=/subscriptions/b071bca8-0055-43f9-9ff8-ca9a144c2a6f/resourcegroups/aml-dev-rg/workspaces/aml-dev&tid=16b3c013-d300-468d-ac64-

ActivityFailedException: ActivityFailedException:
	Message: Activity Failed:
{
    "error": {
        "code": "UserError",
        "message": "Execution failed. User process '/azureml-envs/azureml_87856afa2bfd1e928f01779ebba2a100/bin/python' exited with status code 1. Please check log file 'user_logs/std_log.txt' for error details. Error: Traceback (most recent call last):\n  File \"register_file_dataset.py\", line 50, in <module>\n    dataMap = yaml.safe_load(f)\n  File \"/azureml-envs/azureml_87856afa2bfd1e928f01779ebba2a100/lib/python3.8/site-packages/yaml/__init__.py\", line 125, in safe_load\n    return load(stream, SafeLoader)\n  File \"/azureml-envs/azureml_87856afa2bfd1e928f01779ebba2a100/lib/python3.8/site-packages/yaml/__init__.py\", line 81, in load\n    return loader.get_single_data()\n  File \"/azureml-envs/azureml_87856afa2bfd1e928f01779ebba2a100/lib/python3.8/site-packages/yaml/constructor.py\", line 49, in get_single_data\n    node = self.get_single_node()\n  File \"/azureml-envs/azureml_87856afa2bfd1e928f01779ebba2a100/lib/python3.8/site-packages/yaml/composer.py\", line 39, in get_single_node\n    if not self.check_event(StreamEndEvent):\n  File \"/azureml-envs/azureml_87856afa2bfd1e928f01779ebba2a100/lib/python3.8/site-packages/yaml/parser.py\", line 98, in check_event\n    self.current_event = self.state()\n  File \"/azureml-envs/azureml_87856afa2bfd1e928f01779ebba2a100/lib/python3.8/site-packages/yaml/parser.py\", line 171, in parse_document_start\n    raise ParserError(None, None,\nyaml.parser.ParserError: expected '<document start>', but found '{'\n  in \"/mnt/azureml/cr/j/96b30169937344c2ac399ae35070bd03/cap/data-capability/wd/INPUT_input_yaml/example_0.jsonl\", line 2, column 1\n\n",
        "messageParameters": {},
        "details": []
    },
    "time": "0001-01-01T00:00:00.000Z",
    "componentName": "CommonRuntime"
}
	InnerException None
	ErrorResponse 
{
    "error": {
        "message": "Activity Failed:\n{\n    \"error\": {\n        \"code\": \"UserError\",\n        \"message\": \"Execution failed. User process '/azureml-envs/azureml_87856afa2bfd1e928f01779ebba2a100/bin/python' exited with status code 1. Please check log file 'user_logs/std_log.txt' for error details. Error: Traceback (most recent call last):\\n  File \\\"register_file_dataset.py\\\", line 50, in <module>\\n    dataMap = yaml.safe_load(f)\\n  File \\\"/azureml-envs/azureml_87856afa2bfd1e928f01779ebba2a100/lib/python3.8/site-packages/yaml/__init__.py\\\", line 125, in safe_load\\n    return load(stream, SafeLoader)\\n  File \\\"/azureml-envs/azureml_87856afa2bfd1e928f01779ebba2a100/lib/python3.8/site-packages/yaml/__init__.py\\\", line 81, in load\\n    return loader.get_single_data()\\n  File \\\"/azureml-envs/azureml_87856afa2bfd1e928f01779ebba2a100/lib/python3.8/site-packages/yaml/constructor.py\\\", line 49, in get_single_data\\n    node = self.get_single_node()\\n  File \\\"/azureml-envs/azureml_87856afa2bfd1e928f01779ebba2a100/lib/python3.8/site-packages/yaml/composer.py\\\", line 39, in get_single_node\\n    if not self.check_event(StreamEndEvent):\\n  File \\\"/azureml-envs/azureml_87856afa2bfd1e928f01779ebba2a100/lib/python3.8/site-packages/yaml/parser.py\\\", line 98, in check_event\\n    self.current_event = self.state()\\n  File \\\"/azureml-envs/azureml_87856afa2bfd1e928f01779ebba2a100/lib/python3.8/site-packages/yaml/parser.py\\\", line 171, in parse_document_start\\n    raise ParserError(None, None,\\nyaml.parser.ParserError: expected '<document start>', but found '{'\\n  in \\\"/mnt/azureml/cr/j/96b30169937344c2ac399ae35070bd03/cap/data-capability/wd/INPUT_input_yaml/example_0.jsonl\\\", line 2, column 1\\n\\n\",\n        \"messageParameters\": {},\n        \"details\": []\n    },\n    \"time\": \"0001-01-01T00:00:00.000Z\",\n    \"componentName\": \"CommonRuntime\"\n}"
    }
}