# Distributed Processing

## Connect to Your Workspace

The first thing you need to do is to connect to your workspace using the Azure ML SDK.

> **Note**: If the authenticated session with your Azure subscription has expired since you completed the previous exercise, you'll be prompted to reauthenticate.

In [None]:
import azureml.core
from azureml.core import Workspace
from azureml.core.authentication import InteractiveLoginAuthentication

# Sign in interactively against the given tenant, then load the workspace
# from the saved config file (looked up starting at the current folder)
auth = InteractiveLoginAuthentication(tenant_id='<tenant_id>')
ws = Workspace.from_config(path='.', auth=auth)

# Report the SDK version in use and the name of the connected workspace
sdk_version = azureml.core.VERSION
print('Ready to use Azure ML {} to work with {}'.format(sdk_version, ws.name))

## Run an experiment script
This section covers the simplest case: the whole dataset is stored in a single file and is processed by one script run.

In [None]:
import os, shutil

# Create a folder for the experiment files
folder_name = 'diabetes-distributed-processing'
experiment_folder = './' + folder_name
os.makedirs(experiment_folder, exist_ok=True)

# Copy the data file into the experiment folder: the experiment script reads
# 'diabetes.csv' from its working directory, so the file must sit next to it
shutil.copy('data/diabetes.csv', os.path.join(experiment_folder, "diabetes.csv"))

In [None]:
%%writefile $folder_name/diabetes_data_preparation.py
from azureml.core import Run
import pandas as pd
import os


def main():
    """Add a BMI level column to the diabetes data and save it as a run output.

    Reads 'diabetes.csv' from the working directory, derives the 'BMILevel'
    column from 'BMI', writes the full table to 'outputs/data.csv' and then
    marks the run as complete.
    """
    # Context of the experiment run this script is executing in
    run = Run.get_context()

    frame = pd.read_csv('diabetes.csv')

    # Left-closed BMI intervals and the level code assigned to each of them:
    #   [0,    18.5) Low       -> 0
    #   [18.5, 25.0) Normal    -> 1
    #   [25.0, 28.0) Increased -> 2
    #   [28.0, 32.0) High      -> 3
    #   [32.0, 100 ) Very high -> 4
    bmi_edges = [0, 18.5, 25, 28, 32, 100]
    bmi_codes = [0, 1, 2, 3, 4]
    frame['BMILevel'] = pd.cut(frame['BMI'], bins=bmi_edges, labels=bmi_codes, right=False)

    # Write the prepared data to the outputs folder (which gets uploaded automatically)
    os.makedirs('outputs', exist_ok=True)
    frame.to_csv("outputs/data.csv", index=False, header=True)

    run.complete()


if __name__ == '__main__':
    main()


In [None]:
import os
import sys
from azureml.core import Experiment, ScriptRunConfig
from azureml.widgets import RunDetails


# Experiment under which the run is recorded in the workspace
experiment = Experiment(workspace=ws, name='diabetes-experiment-distributed-processing')

# Run configuration: which folder to submit and which script inside it to execute
script_config = ScriptRunConfig(source_directory=experiment_folder,
                                script='diabetes_data_preparation.py')

# Submit the run, show the details widget and block until the run has finished
run = experiment.submit(config=script_config)
RunDetails(run).show()
run.wait_for_completion()

## Parallel Run for File Dataset

### Prepare data for an experiment

Read the data from a single file, split it into chunks, and save each chunk as a separate file.

In [None]:
import os
import pandas as pd

data_folder_name = 'data-multiple-files'
os.makedirs(data_folder_name, exist_ok=True)

# Read the single file that holds the complete dataset
df = pd.read_csv('./data/diabetes.csv')

# Split the rows into consecutive chunks of `size` rows (the last chunk may be
# shorter). Positional slicing keeps the split independent of the index labels.
size = 100
list_of_dfs = [df.iloc[start:start + size] for start in range(0, len(df), size)]

# Save every chunk as its own CSV file. A dedicated loop variable is used so
# that `df` keeps referring to the complete dataset after the loop.
for idx, chunk in enumerate(list_of_dfs):
    chunk.to_csv(f'{data_folder_name}/diabetes_{idx}.csv', index=False)

### Register File dataset

In [None]:
from azureml.core import Dataset

default_ds = ws.get_default_datastore()
dataset_name = 'diabetes multi dataset'

if dataset_name in ws.datasets:
    # Nothing to upload: reuse the dataset that is already registered
    print('Dataset already registered.')
    file_data_set = Dataset.get_by_name(ws, name=dataset_name)
else:
    # Upload all csv files in the local folder to a folder path in the
    # datastore, replacing existing files of the same name
    default_ds.upload(src_dir=data_folder_name,
                      target_path='diabetes-multi-data/',
                      overwrite=True,
                      show_progress=True)

    # Create a file dataset from the uploaded csv files (this may take a short while)
    file_data_set = Dataset.File.from_files(path=(default_ds, 'diabetes-multi-data/*.csv'))

    # Register the file dataset; a failure is only printed, in which case
    # file_data_set keeps referring to the unregistered dataset
    try:
        file_data_set = file_data_set.register(
            workspace=ws,
            name=dataset_name,
            description='diabetes data split into multiple files',
            tags={'format': 'CSV'},
            create_new_version=True,
        )
        print('Dataset registered.')
    except Exception as ex:
        print(ex)

In [None]:
# List all files that are in the File Dataset; as the last expression of the
# cell, the returned list of paths is displayed as the cell output
file_data_set.to_path()

### Create compute cluster

In [None]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_name = "cpu-comp-cluster"

try:
    # Check for existing compute target
    compute_target = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # If it doesn't already exist, create it
    try:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D11_V2', max_nodes=4)
        compute_target = ComputeTarget.create(ws, cluster_name, compute_config)
        compute_target.wait_for_completion(show_output=True)
    except Exception as ex:
        # Show the reason, then stop here: later cells need `compute_target`,
        # and continuing without it would only fail later with a NameError
        print(ex)
        raise

### Create a pipeline for batch processing

In [None]:
%%writefile $folder_name/diabetes_batch_data_preparation.py
import os
import numpy as np
import pandas as pd
from azureml.core import Model


def init():
    """Log that the initialization hook of the entry script was invoked."""
    message = "Executing init() function..."
    print(message)


def run(mini_batch):
    """Add a BMI level column to every CSV file of a mini batch.

    Args:
        mini_batch: iterable of paths to CSV files; every file must contain
            a 'BMI' column.

    Returns:
        A pandas DataFrame with the rows of all files in the order they were
        processed (each file keeps its own row index), extended with the
        'BMILevel' column. An empty DataFrame is returned for an empty
        mini batch.
    """
    print("Executing run() function...")
    print(f'dataprep start: {__file__}, run({mini_batch})')

    # Per-file results; they are concatenated once after the loop instead of
    # re-copying a growing DataFrame on every iteration.
    frames = []

    # process each file in the batch
    for f in mini_batch:
        print("f: ", f)

        # load the diabetes dataset
        data = pd.read_csv(f)

        # [0    to 18.5 ) = Low       => 0
        # [18.5 to 25.0 ) = Normal    => 1
        # [25.0 to 28.0 ) = Increased => 2
        # [28.0 to 32.0 ) = High      => 3
        # [32.0 to 100.0) = Very high => 4
        data['BMILevel'] = pd.cut(data['BMI'], right=False,
                                  bins=[0, 18.5, 25, 28, 32, 100],
                                  labels=[0, 1, 2, 3, 4])

        frames.append(data)

    # pd.concat() rejects an empty list, so fall back to an empty frame
    df_result = pd.concat(frames) if frames else pd.DataFrame()

    print(f"Processed:\n{df_result}")
    return df_result


In [None]:
from azureml.core import Environment
from azureml.core.runconfig import CondaDependencies, DEFAULT_CPU_IMAGE

# Environment used by the batch processing step
batch_env = Environment(name="batch_environment")

# Run inside a Docker container built on the default CPU base image
batch_env.docker.enabled = True
batch_env.docker.base_image = DEFAULT_CPU_IMAGE

# Python packages installed with pip into the environment
batch_packages = ['azureml-defaults', 'pandas']
batch_env.python.conda_dependencies = CondaDependencies.create(pip_packages=batch_packages)

In [None]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig

# create output location for processed data
output_dir = PipelineData(name="prepared_data", datastore=ws.get_default_datastore())

# Configuration of the parallel run: which script processes the mini batches
# and on which compute/environment it is executed
parallel_run_config = ParallelRunConfig(
    source_directory=folder_name,
    entry_script='diabetes_batch_data_preparation.py',
    mini_batch_size="20",  # process max 20 csv files in one run() call
    error_threshold=10,  # tolerate up to 10 failed files before the job is aborted
    output_action="append_row",  # append the results of all run() calls to a single output file
    environment=batch_env,
    compute_target=compute_target,
    node_count=2)

# The single pipeline step: the file dataset is the input, output_dir receives the result
parallelrun_step = ParallelRunStep(
    name="data-pre-process",
    parallel_run_config=parallel_run_config,
    inputs=[ file_data_set.as_named_input('diabetes_ds') ],
    output=output_dir,
    allow_reuse=True
)

pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])
pipeline_run = Experiment(ws, 'diabetes-experiment-distributed-processing').submit(pipeline)

# Block until the pipeline has finished: its output can only be downloaded
# once the run is complete
pipeline_run.wait_for_completion(show_output=True)

### Explore output

Note: The output has no header.

In [None]:
import pandas as pd

import os

# Download the output of the 'data-pre-process' step into the local 'output' folder
prep_step = pipeline_run.find_step_run('data-pre-process')[0]
prepared_data = prep_step.get_output_data("prepared_data")
prepared_data.download(local_path="output", overwrite=True)

# Locate the result file inside the downloaded folder tree. Start from None so
# that a value left over from an earlier execution of this cell is never reused.
result_file = None
for root, dirs, files in os.walk("output"):
    for file in files:
        if file.endswith('parallel_run_step.txt'):
            result_file = os.path.join(root, file)

if result_file is None:
    raise FileNotFoundError(
        "No 'parallel_run_step.txt' file was found under the 'output' folder."
    )

# The result file has no header row and is read with a single space as delimiter
df = pd.read_csv(result_file, delimiter=" ", header=None)
df.columns

## Parallel Run for Tabular Dataset