Licensed under the MIT License.

# Using Azure Machine Learning Pipelines and Parallel Run Step (PRS) for Many-Model Training and Batch Inference on Tabular Input Partitioned by Column Value


This example creates a partitioned tabular dataset by splitting the rows of a large CSV file according to their values in the specified columns. Each partition then forms one mini-batch in the parallel processing run.

The outline of this notebook is as follows:

- Create a tabular dataset partitioned by the values of the specified columns.
- Train one forecasting model per partition.
- Run batch inference on the dataset, with each mini-batch corresponding to one partition.


### Connect to workspace

In [1]:
from azureml.core import Workspace
from azureml.core.authentication import InteractiveLoginAuthentication

subscription_id = '43464086-3df3-46f2-8194-66a0f80095a6'
# Azure Machine Learning resource group NOT the managed resource group
resource_group = 'ml-resource-group' 

#Azure Machine Learning workspace name, NOT Azure Databricks workspace
workspace_name = 'ml-workspace'  


#auth = InteractiveLoginAuthentication(tenant_id =tenant_id)
# Instantiate Azure Machine Learning workspace
ws = Workspace.get(name=workspace_name,
                   subscription_id=subscription_id,
                   resource_group=resource_group)

print('Workspace name: ' + ws.name, 
      'Azure region: ' + ws.location, 
      'Subscription id: ' + ws.subscription_id, 
      'Resource group: ' + ws.resource_group, sep = '\n')

datastore = ws.get_default_datastore()

Workspace name: ml-workspace
Azure region: westeurope
Subscription id: 43464086-3df3-46f2-8194-66a0f80095a6
Resource group: ml-resource-group


In [3]:
from azureml.core.authentication import MsiAuthentication,ServicePrincipalAuthentication,TokenAuthentication, Audience
msi_auth = MsiAuthentication()
msi_auth


<azureml.core.authentication.MsiAuthentication at 0x7f16f62f9a60>

In [4]:
import azureml.core
print(azureml.core.VERSION)

1.30.0


### Download OJ sales data from opendataset url

In [2]:
 #! pip install azureml-opendatasets --user

In [3]:
import os
from azureml.opendatasets import OjSalesSimulated
dataset_maxfiles = 10 # Set to 11973 or 0 to get all the files

# Create a file dataset that references all of the data files
oj_sales_files = OjSalesSimulated.get_file_dataset()

# Pull only the first `dataset_maxfiles` files
if dataset_maxfiles:
    oj_sales_files = oj_sales_files.take(dataset_maxfiles)

# Create a folder to download
download_path = 'data/oj_sales_data' 
os.makedirs(download_path, exist_ok=True)

# Download the data
oj_sales_files.download(download_path, overwrite=True,)

['/mnt/batch/tasks/shared/LS_root/mounts/clusters/kchatziprmou2/code/Users/kchatziprmou/csa-misc-utils/sa-dsml-many-models/code/aml_prs/data/oj_sales_data/https%3A/%2Fazureopendatastorage.azurefd.net/ojsales-simulatedcontainer/oj_sales_data/Store1000_dominicks.csv',
 '/mnt/batch/tasks/shared/LS_root/mounts/clusters/kchatziprmou2/code/Users/kchatziprmou/csa-misc-utils/sa-dsml-many-models/code/aml_prs/data/oj_sales_data/https%3A/%2Fazureopendatastorage.azurefd.net/ojsales-simulatedcontainer/oj_sales_data/Store1000_minute.maid.csv',
 '/mnt/batch/tasks/shared/LS_root/mounts/clusters/kchatziprmou2/code/Users/kchatziprmou/csa-misc-utils/sa-dsml-many-models/code/aml_prs/data/oj_sales_data/https%3A/%2Fazureopendatastorage.azurefd.net/ojsales-simulatedcontainer/oj_sales_data/Store1000_tropicana.csv',
 '/mnt/batch/tasks/shared/LS_root/mounts/clusters/kchatziprmou2/code/Users/kchatziprmou/csa-misc-utils/sa-dsml-many-models/code/aml_prs/data/oj_sales_data/https%3A/%2Fazureopendatastorage.azurefd.n

### Upload OJ sales data to datastore

In [9]:
target_path = 'oj_sales_data'

datastore.upload(src_dir = download_path,
                target_path = target_path,
                overwrite = True, 
                show_progress = True)


Uploading an estimated of 12 files
Uploading data/oj_sales_data/https%3A/%2Fazureopendatastorage.azurefd.net/ojsales-simulatedcontainer/oj_sales_data/.amlignore
Uploaded data/oj_sales_data/https%3A/%2Fazureopendatastorage.azurefd.net/ojsales-simulatedcontainer/oj_sales_data/.amlignore, 1 files out of an estimated total of 12
Uploading data/oj_sales_data/https%3A/%2Fazureopendatastorage.azurefd.net/ojsales-simulatedcontainer/oj_sales_data/.amlignore.amltmp
Uploaded data/oj_sales_data/https%3A/%2Fazureopendatastorage.azurefd.net/ojsales-simulatedcontainer/oj_sales_data/.amlignore.amltmp, 2 files out of an estimated total of 12
Uploading data/oj_sales_data/https%3A/%2Fazureopendatastorage.azurefd.net/ojsales-simulatedcontainer/oj_sales_data/Store1000_dominicks.csv
Uploaded data/oj_sales_data/https%3A/%2Fazureopendatastorage.azurefd.net/ojsales-simulatedcontainer/oj_sales_data/Store1000_dominicks.csv, 3 files out of an estimated total of 12
Uploading data/oj_sales_data/https%3A/%2Fazureope

$AZUREML_DATAREFERENCE_01dd060ea18840b2845e0653d657dac8

### Create tabular dataset
Create a regular (unpartitioned) tabular dataset from the uploaded CSV files.

In [13]:
from azureml.core import Dataset

dataset = Dataset.Tabular.from_delimited_files(path=(datastore, 'oj_sales_data/*/*/*/*/*.csv'))
print(dataset.to_pandas_dataframe())

     WeekStarting  Store      Brand  Quantity  Advert  Price   Revenue
0      1990-06-14   1000  dominicks     12003       1   2.59  31087.77
1      1990-06-21   1000  dominicks     10239       1   2.39  24471.21
2      1990-06-28   1000  dominicks     17917       1   2.48  44434.16
3      1990-07-05   1000  dominicks     14218       1   2.33  33127.94
4      1990-07-12   1000  dominicks     15925       1   2.01  32009.25
...           ...    ...        ...       ...     ...    ...       ...
1205   1992-09-03   1003  dominicks     10302       1   1.94  19985.88
1206   1992-09-10   1003  dominicks     13502       1   2.16  29164.32
1207   1992-09-17   1003  dominicks     19644       1   2.67  52449.48
1208   1992-09-24   1003  dominicks     13860       1   2.29  31739.40
1209   1992-10-01   1003  dominicks     11040       1   1.99  21969.60

[1210 rows x 7 columns]


### Partition the tabular dataset
Partition the dataset by the columns `Store` and `Brand`. You can retrieve a single partition by specifying values for one or more partition keys. For example, specifying `Store=1000` and `Brand='tropicana'` returns all rows in the dataset that match this condition.

In [14]:
partitioned_dataset = dataset.partition_by(partition_keys=['Store', 'Brand'], target=(datastore, "partition_by_key_res"), name="partitioned_oj_data")
partitioned_dataset.partition_keys

Method partition_by: This is an experimental method, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.


Validating arguments.
Arguments validated.
Uploading file to /partition_by_key_res/b1b7783e-5690-4349-955b-6e6b8e530511/
Successfully uploaded file to datastore.
Creating a new dataset.
Successfully created a new dataset.
registering a new dataset.
Successfully created and registered a new dataset.


['Store', 'Brand']
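
To make the idea of a partition concrete, the following standalone sketch mimics the grouping in plain Python. It does not call Azure ML and is not how `partition_by` is implemented; the helper `partition_rows` is a hypothetical name, and the rows are a handful copied from the dataset preview above.

```python
from collections import defaultdict

# A few rows copied from the dataset preview above:
# (WeekStarting, Store, Brand, Quantity).
rows = [
    ("1990-06-14", 1000, "dominicks", 12003),
    ("1990-06-21", 1000, "dominicks", 10239),
    ("1990-06-28", 1000, "dominicks", 17917),
    ("1992-09-24", 1003, "dominicks", 13860),
    ("1992-10-01", 1003, "dominicks", 11040),
]


def partition_rows(rows, key_indexes):
    """Group rows by the tuple of values found at key_indexes (hypothetical helper)."""
    partitions = defaultdict(list)
    for row in rows:
        key = tuple(row[i] for i in key_indexes)
        partitions[key].append(row)
    return dict(partitions)


# Store is at index 1 and Brand at index 2 in the tuples above.
partitions = partition_rows(rows, key_indexes=(1, 2))

# Retrieving one partition is a lookup by its key values.
store_1000_dominicks = partitions[(1000, "dominicks")]

print(sorted(partitions))         # [(1000, 'dominicks'), (1003, 'dominicks')]
print(len(store_1000_dominicks))  # 3
```

Each key in `partitions` corresponds to one combination of `Store` and `Brand` values, which is the unit that later becomes a mini-batch.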

### Create or Attach existing compute resource

In [5]:
import os
from azureml.core.compute import AmlCompute, ComputeTarget

compute_target = ComputeTarget(workspace=ws, name="cpu-cluster")

# Alternative, left commented out: look up the cluster by name and create it if it does not exist.
# compute_name = os.environ.get("AML_COMPUTE_CLUSTER_NAME", "cpu-cluster")
# compute_min_nodes = os.environ.get("AML_COMPUTE_CLUSTER_MIN_NODES", 0)
# compute_max_nodes = os.environ.get("AML_COMPUTE_CLUSTER_MAX_NODES", 2)
#
# # This example uses a CPU VM. To use a GPU VM, set the SKU to STANDARD_NC6.
# vm_size = os.environ.get("AML_COMPUTE_CLUSTER_SKU", "STANDARD_D2_V2")
#
# if compute_name in ws.compute_targets:
#     compute_target = ws.compute_targets[compute_name]
#     if compute_target and type(compute_target) is AmlCompute:
#         print('found compute target. just use it. ' + compute_name)
# else:
#     print('creating a new compute target...')
#     provisioning_config = AmlCompute.provisioning_configuration(vm_size=vm_size,
#                                                                 min_nodes=compute_min_nodes,
#                                                                 max_nodes=compute_max_nodes)
#
#     # Create the cluster.
#     compute_target = ComputeTarget.create(ws, compute_name, provisioning_config)
#
#     # You can poll for a minimum number of nodes and for a specific timeout.
#     # If no min node count is provided, the scale settings of the cluster are used.
#     compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)
#
#     # For a more detailed view of the current AmlCompute status, use get_status().
#     print(compute_target.get_status().serialize())

### Intermediate/Output Data

In [6]:
from azureml.pipeline.core import Pipeline, PipelineData

output_dir = PipelineData(name="inferences", datastore=datastore)

In [17]:
# Forward slashes work on both Linux and Windows; backslashes are not path separators on Linux.
scripts_folder = "../../code"
inference_script_file = "aml_prs/prediction.py"
train_script_file = "aml_prs/model_train.py"
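
The path separator in these strings matters, because the download output earlier in this notebook shows it running on a Linux compute instance (`/mnt/batch/...`). As a small standard-library illustration, not part of the pipeline itself, `pathlib` shows how the same script path is interpreted under POSIX and Windows rules:

```python
from pathlib import PurePosixPath, PureWindowsPath

backslash_path = "aml_prs\\prediction.py"  # a path written with a backslash
slash_path = "aml_prs/prediction.py"       # the same path written with a forward slash

# Under POSIX rules a backslash is an ordinary file-name character,
# so the whole string is treated as ONE path component.
posix_view_of_backslash = PurePosixPath(backslash_path).parts

# With a forward slash, folder and file are separate components on POSIX...
posix_view_of_slash = PurePosixPath(slash_path).parts

# ...and Windows rules accept the forward slash as a separator too.
windows_view_of_slash = PureWindowsPath(slash_path).parts

print(posix_view_of_backslash)  # ('aml_prs\\prediction.py',)
print(posix_view_of_slash)      # ('aml_prs', 'prediction.py')
print(windows_view_of_slash)    # ('aml_prs', 'prediction.py')
```

Forward slashes are therefore the portable choice for `source_directory` and `entry_script` values.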

## Build and run the model training pipeline
### Specify the environment to run the script
List the packages that the scripts need, including the required azureml packages, in the environment's dependencies.

In [18]:
from azureml.core import Environment
from azureml.core.runconfig import CondaDependencies, DEFAULT_CPU_IMAGE

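# The PyPI package is named 'scikit-learn'; 'sklearn' is a deprecated alias that no longer installs.
batch_conda_deps = CondaDependencies.create(pip_packages=['scikit-learn', 'pandas', 'joblib', 'azureml-defaults', 'azureml-core', 'azureml-dataprep[fuse]'])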
batch_env = Environment(name="many_models_environment")
batch_env.python.conda_dependencies = batch_conda_deps
batch_env.docker.base_image = DEFAULT_CPU_IMAGE

## Training 


### Create the configuration to wrap the training script
The `partition_keys` parameter is a list containing a subset of the dataset's partition keys; it specifies how the input dataset is split into mini-batches. Each combination of values of the listed keys forms one mini-batch. For example, `partition_keys=['Store', 'Brand']` results in mini-batches such as `Store=1000 && Brand=tropicana`, `Store=1000 && Brand=dominicks`, `Store=1001 && Brand=dominicks`, ...

In [19]:
from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig

# In a real-world scenario, you'll want to shape your process per node and nodes to fit your problem domain.
parallel_run_train_config = ParallelRunConfig(
    source_directory=scripts_folder,
    entry_script=train_script_file,  # the user script to run against each input
    partition_keys=['Store', 'Brand'],
    error_threshold=-1,
    output_action='append_row',
    append_row_file_name="training_output.txt",
    environment=batch_env,
    compute_target=compute_target, 
    node_count=2,
    run_invocation_timeout=600
)

Parameter partition_keys: This is an experimental parameter, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
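
To see how the choice of `partition_keys` determines the number of mini-batches, the sketch below enumerates the key combinations in plain Python. It is only an illustration of the rule described above, not ParallelRunStep's implementation; `mini_batch_keys` is a hypothetical helper, and the (Store, Brand) pairs are the ten that appear in this notebook's training results.

```python
# (Store, Brand) pairs that appear in this notebook's training results.
pairs = [
    (1000, "minute.maid"), (1001, "tropicana"), (1000, "dominicks"),
    (1001, "dominicks"), (1001, "minute.maid"), (1003, "dominicks"),
    (1002, "tropicana"), (1000, "tropicana"), (1002, "dominicks"),
    (1002, "minute.maid"),
]
records = [{"Store": store, "Brand": brand} for store, brand in pairs]


def mini_batch_keys(records, partition_keys):
    """Return the distinct value combinations of partition_keys found in the data."""
    return sorted({tuple(r[k] for k in partition_keys) for r in records})


by_store_and_brand = mini_batch_keys(records, ["Store", "Brand"])
by_store = mini_batch_keys(records, ["Store"])

print(len(by_store_and_brand))  # 10
print(by_store)                 # [(1000,), (1001,), (1002,), (1003,)]
```

Using both keys gives one mini-batch per store and brand, while using only `Store` would group all brands of a store into a single, larger mini-batch.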


In [40]:
parallel_run_train_step = ParallelRunStep(
    name='train',
    inputs=[partitioned_dataset.as_named_input("partitioned_tabular_input")],
    output=output_dir,
    parallel_run_config=parallel_run_train_config,
    allow_reuse=False
)
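
The entry script `model_train.py` is not shown in this notebook. ParallelRunStep entry scripts define an `init()` function and a `run(mini_batch)` function, and with `output_action='append_row'` each item returned by `run` becomes one line of the output file. The following is a hypothetical, plain-Python sketch of that shape. The naive "previous week" forecast, the metric formulas, and the use of a list of dicts are all assumptions made for illustration; under ParallelRunStep a tabular mini-batch arrives as a pandas DataFrame, and the real script presumably trains and registers an actual model.

```python
import math


def init():
    """Called once per worker process; a real script would load shared resources here."""
    pass


def run(mini_batch):
    """Called once per mini-batch, i.e. once per (Store, Brand) partition.

    In this sketch mini_batch is a list of dict rows (an assumption);
    ParallelRunStep would pass a pandas DataFrame for tabular input.
    """
    store = mini_batch[0]["Store"]
    brand = mini_batch[0]["Brand"]
    actual = [row["Quantity"] for row in mini_batch]

    # Hypothetical stand-in model: forecast each week with the previous week's value.
    predicted = actual[:-1]
    target = actual[1:]

    errors = [t - p for t, p in zip(target, predicted)]
    mse = sum(e * e for e in errors) / len(errors)
    rmse = math.sqrt(mse)
    mape = 100.0 * sum(abs(e) / t for e, t in zip(errors, target)) / len(errors)

    # One space-separated line per mini-batch, in the column order
    # Store, Brand, mse, mape, rmse, model_name.
    return [f"{store} {brand} {mse} {mape} {rmse} prs_{store}_{brand}"]


# Quantities copied from the dataset preview (Store 1000, dominicks).
demo_rows = [
    {"Store": 1000, "Brand": "dominicks", "Quantity": q}
    for q in (12003, 10239, 17917, 14218, 15925)
]
init()
result = run(demo_rows)
print(result[0].split(" ")[-1])  # prs_1000_dominicks
```

Returning a single row per mini-batch keeps the output file at one line per trained model, which is what the result-reading cell later in this notebook expects.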

In [36]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=[parallel_run_train_step])

pipeline_run = Experiment(ws, 'Many_Model_Forecast').submit(pipeline)

In [28]:
pipeline_run.wait_for_completion(show_output=True)

## View the training results
The `model_train.py` script returns a result list with the training metrics for each mini-batch. These results are written to the datastore specified in the `PipelineData` object as the output data, which in this case is named `inferences`. This output contains the results from all worker nodes used in the compute cluster. You can download it to view the results; the cell below shows only the first 10 rows.

In [18]:
import pandas as pd
import tempfile

batch_run = pipeline_run.find_step_run(parallel_run_train_step.name)[0]
batch_output = batch_run.get_output_data(output_dir.name)

target_dir = tempfile.mkdtemp()
batch_output.download(local_path=target_dir)
result_file = os.path.join(target_dir, batch_output.path_on_datastore, parallel_run_train_config.append_row_file_name)

df = pd.read_csv(result_file, delimiter=" ", header=None)

df.columns = [ "Store", "Brand", "mse", "mape", "rmse", "model_name"]
print("Train result has ", df.shape[0], " rows")
df.head(10)

Train result has  10  rows


| | Store | Brand | mse | mape | rmse | model_name |
|---|---|---|---|---|---|---|
| 0 | 1000 | minute.maid | 10220110.0 | 18.172361 | 3196.891234 | prs_1000_minute.maid |
| 1 | 1001 | tropicana | 8552595.0 | 19.463062 | 2924.481928 | prs_1001_tropicana |
| 2 | 1000 | dominicks | 10801810.0 | 23.172431 | 3286.61138 | prs_1000_dominicks |
| 3 | 1001 | dominicks | 8920132.0 | 20.350161 | 2986.658951 | prs_1001_dominicks |
| 4 | 1001 | minute.maid | 15560510.0 | 26.075884 | 3944.68067 | prs_1001_minute.maid |
| 5 | 1003 | dominicks | 12786000.0 | 23.512086 | 3575.751535 | prs_1003_dominicks |
| 6 | 1002 | tropicana | 10698820.0 | 20.099309 | 3270.904584 | prs_1002_tropicana |
| 7 | 1000 | tropicana | 9844161.0 | 19.846187 | 3137.540593 | prs_1000_tropicana |
| 8 | 1002 | dominicks | 9404042.0 | 23.352023 | 3066.601004 | prs_1002_dominicks |
| 9 | 1002 | minute.maid | 6130244.0 | 17.805611 | 2475.932918 | prs_1002_minute.maid |
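
As a quick sanity check on this output, the sketch below parses two lines in the space-separated layout that the cell above reads, using only the standard library, and confirms that the `rmse` column is the square root of the `mse` column up to display rounding. The two sample lines are built from the values displayed in the table; the exact text in the real output file may carry more digits.

```python
import csv
import io
import math

# Two lines in the space-separated layout, with values copied from the table above.
sample_output = (
    "1000 minute.maid 10220110.0 18.172361 3196.891234 prs_1000_minute.maid\n"
    "1001 tropicana 8552595.0 19.463062 2924.481928 prs_1001_tropicana\n"
)
columns = ["Store", "Brand", "mse", "mape", "rmse", "model_name"]

# Parse each line into a dict keyed by column name.
reader = csv.reader(io.StringIO(sample_output), delimiter=" ")
results = [dict(zip(columns, line)) for line in reader]

# rmse squared should match mse, allowing for the rounding of the displayed values.
consistent = [
    math.isclose(float(r["rmse"]) ** 2, float(r["mse"]), rel_tol=1e-5)
    for r in results
]

print(results[0]["model_name"])  # prs_1000_minute.maid
print(all(consistent))           # True
```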


## Inference

### Create the configuration to wrap the inference script
The `partition_keys` parameter is a list containing a subset of the dataset's partition keys; it specifies how the input dataset is split into mini-batches. Each combination of values of the listed keys forms one mini-batch. For example, `partition_keys=['Store', 'Brand']` results in mini-batches such as `Store=1000 && Brand=tropicana`, `Store=1000 && Brand=dominicks`, `Store=1001 && Brand=dominicks`, ...

In [21]:
from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig

# In a real-world scenario, you'll want to shape your process per node and nodes to fit your problem domain.
parallel_run_inference_config = ParallelRunConfig(
    source_directory=scripts_folder,
    entry_script=inference_script_file,  # the user script to run against each input
    partition_keys=['Store', 'Brand'],
    error_threshold= -1,
    output_action='append_row',
    append_row_file_name="prediction_output.txt",
    environment=batch_env,
    compute_target=compute_target, 
    node_count=2,
    run_invocation_timeout=600
)



### Create the pipeline step

In [22]:
parallel_run_inference_step = ParallelRunStep(
    name='forecast',
    inputs=[partitioned_dataset.as_named_input("partitioned_tabular_input")],
    output=output_dir,
    parallel_run_config=parallel_run_inference_config,
    allow_reuse=False
)

### Run the pipeline

In [37]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=[parallel_run_inference_step])

pipeline_run = Experiment(ws, 'Many_Model_Forecast').submit(pipeline)

In [31]:
pipeline_run.wait_for_completion(show_output=True)

## View the prediction results
The `prediction.py` script returns a result list with the predictions for each mini-batch. These results are written to the datastore specified in the `PipelineData` object as the output data, which in this case is named `inferences`. This output contains the results from all worker nodes used in the compute cluster. You can download it to view the results; the cell below shows only the first 10 rows.

In [26]:
import pandas as pd
import tempfile

batch_run = pipeline_run.find_step_run(parallel_run_inference_step.name)[0]
batch_output = batch_run.get_output_data(output_dir.name)

target_dir = tempfile.mkdtemp()
batch_output.download(local_path=target_dir)
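# Use the inference config defined above; the name `parallel_run_config` is not defined in this notebook.
result_file = os.path.join(target_dir, batch_output.path_on_datastore, parallel_run_inference_config.append_row_file_name)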

df = pd.read_csv(result_file, delimiter=" ", header=None)
df.columns = ["WeekStarting", "Prediction", "Store", "Brand"]
print("Prediction has ", df.shape[0], " rows")
df.head(10)

Prediction has  1210  rows


| | WeekStarting | Prediction | Store | Brand |
|---|---|---|---|---|
| 0 | 1990-06-14 | | 1001 | dominicks |
| 1 | 1990-06-21 | | 1001 | dominicks |
| 2 | 1990-06-28 | | 1001 | dominicks |
| 3 | 1990-07-05 | | 1001 | dominicks |
| 4 | 1990-07-12 | 13554.387879 | 1001 | dominicks |
| 5 | 1990-07-19 | 15375.931186 | 1001 | dominicks |
| 6 | 1990-07-26 | 15243.409687 | 1001 | dominicks |
| 7 | 1990-08-02 | 13649.643619 | 1001 | dominicks |
| 8 | 1990-08-09 | 14017.612778 | 1001 | dominicks |
| 9 | 1990-08-16 | 15976.467391 | 1001 | dominicks |
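
The first four weeks in this preview have no prediction. The notebook does not say why; a model that needs several previous weeks as input would behave this way, but that is an assumption. Whatever the cause, it is worth separating missing from available predictions before using the results. A minimal plain-Python sketch, using the ten preview rows above and a hypothetical helper `split_missing`:

```python
# (WeekStarting, Prediction) pairs copied from the preview above;
# None marks a row whose prediction is empty.
preview = [
    ("1990-06-14", None),
    ("1990-06-21", None),
    ("1990-06-28", None),
    ("1990-07-05", None),
    ("1990-07-12", 13554.387879),
    ("1990-07-19", 15375.931186),
    ("1990-07-26", 15243.409687),
    ("1990-08-02", 13649.643619),
    ("1990-08-09", 14017.612778),
    ("1990-08-16", 15976.467391),
]


def split_missing(rows):
    """Split rows into (missing, available) by whether a prediction is present."""
    missing = [row for row in rows if row[1] is None]
    available = [row for row in rows if row[1] is not None]
    return missing, available


missing, available = split_missing(preview)

print(len(missing), len(available))  # 4 6
print(available[0][0])               # 1990-07-12
```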
