# 01_Pipeline
Train and forecast with many models: one forecasting model per Store/Brand partition

---

Import Libraries

In [None]:
import os
import pandas as pd
import azureml.core
from azureml.core.dataset import Dataset
from azureml.pipeline.core import Pipeline
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.steps import PythonScriptStep
from azureml.core.runconfig import RunConfiguration
from azureml.train.automl.automlconfig import AutoMLConfig
from azureml.pipeline.core import Schedule, ScheduleRecurrence
from azureml.core import Workspace, Datastore, Experiment, Environment
from azureml.contrib.automl.pipeline.steps import AutoMLPipelineBuilder
from azureml.core.compute import ComputeTarget, ComputeInstance, AmlCompute
from azureml.automl.core.forecasting_parameters import ForecastingParameters
from azureml.train.automl.runtime._many_models.many_models_parameters import ManyModelsTrainParameters
from azureml.train.automl.runtime._many_models.many_models_parameters import ManyModelsInferenceParameters

# Connect to the Azure ML workspace using the locally available workspace configuration
ws = Workspace.from_config()

Create control file to store all parameters

In [None]:
# Central control file: every tunable setting used by the training and
# inference pipelines below is read from this dictionary.
control_file = {
    # Experiment names for the two pipelines
    'train_experiment': 'oj_training_pipeline',
    'inference_experiment': 'oj_inference_pipeline',
    # Compute settings. NOTE(review): the '_' values look like placeholders
    # that must be replaced with real names before running - confirm.
    'compute': {
        'name': '_',
        # A stray bare 'version' literal used to sit here; Python silently
        # concatenated it with the next key ('versionsource_location').
        # It has been removed and the misspelled path './sripts' corrected.
        'source_location': './scripts',
        'datastore_name': '_',
    },
    # Registered environment to run the pipeline steps in
    'environment': {
        'name': 'oj_env',
        'version': 1
    },
    # Input data asset and output locations/names
    'data': {
        'data_asset': 'oj_data_small_train',
        'output': 'oj_sales_data',
        'forecast': 'oj_forecast'
    },
    # Directory holding the scripts used by the pipeline steps
    'pipeline': {
        'source': './scripts',
    },
    # Settings passed to ForecastingParameters
    'forecasting': {
        'horizon': 8,
        'partitions': ['Store', 'Brand'],
        'lags': [1, 2, 4, 8],
        'time_column_name': 'WeekStarting',
        'cv_step_size': 'auto',
        'target_rolling_window_size': 4,
        'seasonality': 'auto',
        'use_stl': 'season_trend'
    },
    # Settings passed to AutoMLConfig
    'automl': {
        'iteration_timeout_minutes': 60,
        'iterations': 10,
        'experiment_timeout_hours': 3,
        'label_column_name': 'Quantity',
        'n_cross_validations': 'auto',
        'track_child_runs': False,
        'allowed_models': ['Prophet'],
    },
    # Parallel-run settings for the many-models training step
    'training': {
        'node_count': 5,
        'process_count_per_node': 2,
        'run_invocation_timeout': 50000
    },
    # Parallel-run settings for the many-models inference step
    'inference': {
        'node_count': 5,
        'process_count_per_node': 2,
        'run_invocation_timeout': 50000
    }
}

### 0.0 Create Env (run once)

In [None]:
# Build the environment definition from the Conda specification file,
# naming it after the entry in the control file
env = Environment.from_conda_specification(
    file_path='environment.yml',
    name=control_file['environment']['name'],
)

# Register the definition in the workspace, then trigger the environment build
env.register(workspace=ws)
env.build(workspace=ws)

### 1.0 Connect to Datastore and Compute, and Name the Training Experiment

In [None]:
# Check SDK version
print("Azure ML SDK Version:", azureml.core.VERSION)

# Connect to the workspace's default datastore
dstore = ws.get_default_datastore()

# Name of the compute target to run on (taken from the control file)
compute_name = control_file['compute']['name']

# Read the workspace's compute targets once and reuse the result for both
# the existence check and the lookup
compute_targets = ws.compute_targets
if compute_name not in compute_targets:
    # Raise an exception and print an error message if the compute target is not found.
    print(f'Compute instance {compute_name} is not found, the code cannot be executed')
    raise Exception("Compute instance not found")

compute = compute_targets[compute_name]

# Fail fast when the target is not a ComputeInstance or AmlCompute: previously
# a target of the wrong type was silently accepted and only the success
# message was skipped.
if not isinstance(compute, (ComputeInstance, AmlCompute)):
    raise TypeError(
        f'Compute target {compute_name} has unsupported type '
        f'{type(compute).__name__}; expected ComputeInstance or AmlCompute'
    )
print(f'Compute instance {compute_name} is found')

# Name experiment
experiment = Experiment(ws, control_file['train_experiment'])
print('Experiment name: ' + experiment.name)

# Create a run configuration and assign compute
run_configuration = RunConfiguration()
run_configuration.target = compute

# Assign the registered environment (name and version come from the control file)
run_configuration.environment = Environment.get(
    workspace=ws,
    name=control_file['environment']['name'],
    version=control_file['environment']['version'],
)

# Collect the connection details into a single-column table for display
output = {}
output['SDK version']     = azureml.core.VERSION
output['Subscription ID'] = ws.subscription_id
output['Workspace']       = ws.name
output['Resource Group']  = ws.resource_group
output['Location']        = ws.location
output['Datastore name']  = dstore.name

# Do not truncate long values (e.g. the subscription id) when rendering
pd.set_option('display.max_colwidth', None)
outputDf = pd.DataFrame(data=output, index=[''])
outputDf.T

### 2.0 Load Training Data

In [None]:
# Fetch the registered training dataset named in the control file
sales_data = Dataset.get_by_name(
    ws,
    name=control_file['data']['data_asset'],
)
# If there was an external features dataset, it would be called here
#features = Dataset.get_by_name(ws, name = 'features')

# Output location for the prepared training data: uploaded to the datastore
# (overwriting earlier uploads) and read back as parquet files
output_path = (
    OutputFileDatasetConfig(
        destination=(dstore, f"{control_file['data']['output']}/train")
    )
    .as_upload(overwrite=True)
    .read_parquet_files()
)

### 3.0 Build Training Pipeline

In [None]:
# Flatten the list-valued settings into comma-separated strings so they can
# be handed to the preprocessing script as command-line arguments
lags = ', '.join(map(str, control_file['forecasting']['lags']))
partitions = ', '.join(map(str, control_file['forecasting']['partitions']))

# Step 1: data preparation script (always re-run, never reused from cache)
data_prep_step = PythonScriptStep(
    script_name="preprocessing.py",
    source_directory=control_file['pipeline']['source'],
    # features would also be passed in here
    inputs=[sales_data.as_named_input('train_data')],
    arguments=[
        "--step", "train",
        "--output_path", output_path,
        "--lags", lags,
        "--horizon", control_file['forecasting']['horizon'],
        "--time_column_name", control_file['forecasting']['time_column_name'],
        "--label_column_name", control_file['automl']['label_column_name'],
        "--partitions", partitions,
    ],
    compute_target=compute,
    runconfig=run_configuration,
    allow_reuse=False,
)

# Forecasting-specific settings, all sourced from the control file
forecasting_parameters = ForecastingParameters(
    time_column_name=control_file['forecasting']['time_column_name'],
    forecast_horizon=control_file['forecasting']['horizon'],
    time_series_id_column_names=control_file['forecasting']['partitions'],
    cv_step_size=control_file['forecasting']['cv_step_size'],
    target_lags=control_file['forecasting']['lags'],
    target_rolling_window_size=control_file['forecasting']['target_rolling_window_size'],
    seasonality=control_file['forecasting']['seasonality'],
    use_stl=control_file['forecasting']['use_stl'],
)

# AutoML configuration for the forecasting task
automl_settings = AutoMLConfig(
    task="forecasting",
    primary_metric="normalized_root_mean_squared_error",
    iteration_timeout_minutes=control_file['automl']['iteration_timeout_minutes'],
    iterations=control_file['automl']['iterations'],
    experiment_timeout_hours=control_file['automl']['experiment_timeout_hours'],
    label_column_name=control_file['automl']['label_column_name'],
    n_cross_validations=control_file['automl']['n_cross_validations'],
    track_child_runs=control_file['automl']['track_child_runs'],
    allowed_models=control_file['automl']['allowed_models'],
    forecasting_parameters=forecasting_parameters,
)

# Many-models training parameters: the AutoML settings plus the partition columns
mm_paramters = ManyModelsTrainParameters(
    automl_settings=automl_settings,
    partition_column_names=control_file['forecasting']['partitions'],
)

# Step 2: many-models training over the prepared data
train_step = AutoMLPipelineBuilder.get_many_models_train_steps(
    experiment=experiment,
    train_data=output_path.as_input('train_10_models'),
    compute_target=compute,
    node_count=control_file['training']['node_count'],
    process_count_per_node=control_file['training']['process_count_per_node'],
    run_invocation_timeout=control_file['training']['run_invocation_timeout'],
    train_pipeline_parameters=mm_paramters,
)

# Assemble the pipeline steps
steps = [data_prep_step, train_step]
print('Pipeline built')

### 4.0 Run the Training Pipeline

In [None]:
# Assemble the training pipeline from the prepared steps and submit it
# under the training experiment
pipeline = Pipeline(steps=steps, workspace=ws)
training_run = experiment.submit(pipeline)

# Report where the run can be found
print(f'Experiment Name: {training_run.experiment.name}, Run ID: {training_run.id}')

### 5.0 Publish the Training Pipeline

In [None]:
# Publish the training pipeline; the published id is used for scheduling below
published_pipeline = pipeline.publish(
    name='oj_train_many_models',
    description='Train many models on oj sales data',
    version='1',
    continue_on_step_failure=False,
)

### 6.0 Schedule the Training Pipeline

In [None]:
# Id of the published training pipeline to attach the schedule to
training_pipeline_id = published_pipeline.id

# Recur once a month, counted from the given start time
recurrence = ScheduleRecurrence(
    frequency="Month",
    interval=1,
    start_time="2020-01-01T09:00:00",
)

# Create the recurring schedule for the training pipeline
recurring_schedule = Schedule.create(
    ws,
    name="automl_training_recurring_schedule",
    description="Schedule Training Pipeline to run on the first day of every month",
    pipeline_id=training_pipeline_id,
    experiment_name=experiment.name,
    recurrence=recurrence,
)

### 7.0 Name the Inference Experiment

In [None]:
# Switch `experiment` over to the inference experiment named in the control file
experiment = Experiment(workspace=ws, name=control_file['inference_experiment'])
print('Experiment name: ' + experiment.name)

### 8.0 Load Inference Data

In [None]:
# Fetch the registered dataset named in the control file
sales_data = Dataset.get_by_name(ws, name=control_file['data']['data_asset'])

# Output location for the prepared inference data: uploaded to the datastore
# (overwriting earlier uploads) and read back as parquet files
output_path = (
    OutputFileDatasetConfig(
        destination=(dstore, f"{control_file['data']['output']}/inference")
    )
    .as_upload(overwrite=True)
    .read_parquet_files()
)

# Define the forecast output location and register it as a dataset when the
# run completes.
# Fixed: the control file has no 'data_output' key under 'data' (the lookup
# raised KeyError); the output root is stored under 'output'.
output_file_name = "forecast.csv"
forecast_path = OutputFileDatasetConfig(
    destination=(dstore, f"{control_file['data']['output']}/forecast")
).register_on_complete(name=control_file['data']['forecast'])

### 9.0 Build Inference Pipeline

In [None]:
# Cast lags list as a comma-separated string for the command line
lags = ', '.join(str(item) for item in control_file['forecasting']['lags'])

# Python data prep step (always re-run, never reused from cache).
# NOTE(review): unlike the training prep step, this call does not pass
# --time_column_name or --partitions; confirm preprocessing.py does not
# need them for the "inference" step.
data_prep_step = PythonScriptStep(
    script_name      = "preprocessing.py",
    source_directory = control_file['pipeline']['source'],
    inputs           = [sales_data.as_named_input('train_data')],
    arguments        = ["--step", "inference",
                        "--output_path", output_path,
                        "--lags", lags,
                        "--horizon", control_file['forecasting']['horizon'],
                        "--label_column_name", control_file['automl']['label_column_name']],
    compute_target   = compute,
    runconfig        = run_configuration,
    allow_reuse      = False
)

# Many Models Inference Parameters.
# Fixed: 'label_column_name' is stored under control_file['automl'], not
# control_file['forecasting'] (the old lookup raised KeyError).
mm_parameters = ManyModelsInferenceParameters(
    partition_column_names = control_file['forecasting']['partitions'],
    time_column_name       = control_file['forecasting']['time_column_name'],
    target_column_name     = control_file['automl']['label_column_name'],
    inference_type         = "forecast",
    forecast_mode          = "recursive",
    forecast_quantiles     = [0.1, 0.5, 0.9]
)

# Create inference step. It points at the training run submitted earlier in
# this notebook, so `training_run` must still be defined in the session.
inference_step = AutoMLPipelineBuilder.get_many_models_batch_inference_steps(
    experiment                    = experiment,
    inference_data                = output_path,
    compute_target                = compute,
    node_count                    = control_file['inference']['node_count'],
    process_count_per_node        = control_file['inference']['process_count_per_node'],
    run_invocation_timeout        = control_file['inference']['run_invocation_timeout'],
    output_datastore              = forecast_path,
    train_experiment_name         = training_run.experiment.name,
    train_run_id                  = training_run.id,
    inference_pipeline_parameters = mm_parameters,
    append_row_file_name          = output_file_name
)

# Assemble the pipeline steps
steps = [data_prep_step, inference_step]
print('Pipeline built')

### 10.0 Run the Inference Pipeline

In [None]:
# Assemble the inference pipeline from the prepared steps and submit it
# under the inference experiment
pipeline = Pipeline(steps=steps, workspace=ws)
inference_run = experiment.submit(pipeline)

### 11.0 Publish Inference Pipeline

In [None]:
# Publish the inference pipeline; the published id is used for scheduling below
published_pipeline = pipeline.publish(
    name='oj_inference_many_models',
    description='Forecast many models on oj sales data',
    version='1',
    continue_on_step_failure=False,
)

### 12.0 Schedule Inference Pipeline

In [None]:
# Id of the published inference pipeline to attach the schedule to
inference_pipeline_id = published_pipeline.id

# Recur once a month, counted from the given start time.
# NOTE(review): this is the same start time as the training schedule, so both
# pipelines would be triggered together - confirm that is intended.
recurrence = ScheduleRecurrence(frequency="Month", interval=1, start_time="2020-01-01T09:00:00")

# Fixed: the name and description were copy-pasted from the training schedule,
# which made the two schedules indistinguishable; they now identify the
# inference pipeline.
recurring_schedule = Schedule.create(ws, name="automl_inference_recurring_schedule",
                            description="Schedule Inference Pipeline to run on the first day of every month",
                            pipeline_id=inference_pipeline_id,
                            experiment_name=experiment.name,
                            recurrence=recurrence)