# Azure Machine Learning - Many Models Training/Forecasting Pipeline Creation
This notebook demonstrates how to create and run an Azure ML pipeline that loads bulk time-series data from an AML-linked datastore, splits the data into individual time-series and registers them as a File Dataset, trains a forecasting model for each distinct time-series using AutoML, generates a forward-looking forecast for each distinct time-series, and finally aggregates all forecasted results and registers them as a dataset in the AML workspace.

### Import required packages

In [None]:
from azureml.core import Workspace, Experiment, Datastore, Environment, Dataset
from azureml.core.compute import ComputeTarget, AmlCompute, DataFactoryCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import DEFAULT_CPU_IMAGE
from azureml.pipeline.core import Pipeline, PipelineParameter, PipelineData
from azureml.pipeline.steps import PythonScriptStep
from azureml.data.output_dataset_config import OutputTabularDatasetConfig, OutputDatasetConfig, OutputFileDatasetConfig
from azureml.data.datapath import DataPath
from azureml.data.data_reference import DataReference
from azureml.data.sql_data_reference import SqlDataReference
from azureml.pipeline.steps import DataTransferStep
import logging

### Connect to Azure ML workspace, provision compute resources, and get references to datastores
Connect to the workspace using its associated config file. Get a reference to your pre-existing AML compute cluster, or provision a new cluster if one does not exist yet. Finally, get a reference to your default blob datastore.

In [None]:
# Connect to AML Workspace
ws = Workspace.from_config()

#Select AML Compute Cluster
compute_target_name = 'cpucluster'

# Reuse the cluster if it already exists; otherwise provision a new one
try:
    compute_target = ComputeTarget(workspace=ws, name=compute_target_name)
    print('Found an existing cluster, using it instead.')
except ComputeTargetException:
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D13_V2',
                                                           min_nodes=0,
                                                           max_nodes=10)
    compute_target = ComputeTarget.create(ws, compute_target_name, compute_config)
    compute_target.wait_for_completion(show_output=True)
    
#Get default datastore
ds = ws.get_default_datastore()

### Create Run Configuration
The `RunConfiguration` defines the environment used across all Python script steps. This notebook uses the `AzureML-AutoML` environment retrieved from the workspace; the commented-out lines show how you could instead build a custom environment with additional conda or pip packages. [More details here](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.conda_dependencies.condadependencies?view=azure-ml-py).
If you build a custom environment, uncomment the `register` call to register it in the AML workspace so that it can be reused for future retraining and inferencing operations.

In [None]:
run_config = RunConfiguration()
run_config.docker.use_docker = True
run_config.environment = Environment.get(ws, 'AzureML-AutoML')
# run_config.environment.docker.base_image = DEFAULT_CPU_IMAGE
# # run_config.environment.name = 'ManyModels_AutoML'
# run_config.environment.python.conda_dependencies = CondaDependencies.create()
# run_config.environment.python.conda_dependencies = CondaDependencies.create(pip_packages=['sklearn', 'pandas', 'joblib', 'azureml-defaults', 'azureml-core', 'azureml-dataprep[fuse]'])
# run_config.environment.python.conda_dependencies.set_python_version('3.8.10')

#Register environment for reuse 
# run_config.environment.register(ws)

In [None]:
run_config.environment

### Define Output Datasets
Below we define the configuration for the datasets that are passed between steps in our pipeline. In each case we specify the datastore path that should hold the dataset and register the dataset once the step completes. Registration can be disabled by removing the `register_on_complete()` call. For `result_dataset` we also call `read_delimited_files()` so that the aggregated results are registered as a tabular dataset.

In [None]:
forecast_dataset = OutputFileDatasetConfig(name='oj_forecast_data', destination=(ds, 'oj_forecast_data/{run-id}')).register_on_complete(name='oj_forecast_data')
train_dataset = OutputFileDatasetConfig(name='oj_train_data',destination=(ds, 'oj_train_data/{run-id}')).register_on_complete(name='oj_train_data')
result_dataset = OutputFileDatasetConfig(name='oj_result_data',destination=(ds, 'oj_result_data/{run-id}')).read_delimited_files().register_on_complete(name='oj_result_data')

### Define Pipeline Parameters
`PipelineParameter` objects serve as variable inputs to an Azure ML pipeline and can be specified at runtime. Below we define the following parameters for our Azure ML Pipeline:

| Parameter Name  | Parameter Description |
|------------- | -------------|
|`source_dataset_name`  | The name of the bulk time-series dataset available in the AML workspace. |
|`group_column_names`  | Semicolon-delimited list of column names which uniquely identify individual time-series.|
|`timestamp_column`  | Name of the column which contains timestamps.|
|`cutoff_date`  | Final date in the dataset to be included in forecast model training. All datapoints after the cutoff date will be included in the `forecast_dataset`.|

<b><i>Modification Note:</i></b> When adapting this sample for your own forecasting activities, update the default values of all parameters to reflect your registered dataset, group columns, timestamp column, and cutoff date.
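
As a rough illustration of how these values could be consumed, the sketch below parses the four parameters the way a step script might. The function name `parse_step_args` is hypothetical, and the actual parsing in `organize_data.py` may differ. It only assumes that the values arrive as command-line strings, that group columns are semicolon-delimited, and that the cutoff date uses the `YYYY-MM-DD` format of the default value.

```python
import argparse
from datetime import date


def parse_step_args(argv):
    """Parse the four pipeline parameters as a step script might receive them."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--source_dataset_name", type=str, required=True)
    parser.add_argument("--group_column_names", type=str, required=True)
    parser.add_argument("--timestamp_column", type=str, required=True)
    parser.add_argument("--cutoff_date", type=str, required=True)
    args = parser.parse_args(argv)

    # Split the semicolon-delimited list into individual column names.
    group_columns = [c.strip() for c in args.group_column_names.split(";") if c.strip()]
    # Interpret the cutoff as an ISO date (YYYY-MM-DD), matching the default value.
    cutoff = date.fromisoformat(args.cutoff_date)
    return args.source_dataset_name, group_columns, args.timestamp_column, cutoff


parsed = parse_step_args([
    "--source_dataset_name", "OJ-Sales-Data",
    "--group_column_names", "Store;Brand",
    "--timestamp_column", "WeekStarting",
    "--cutoff_date", "1992-05-28",
])
print(parsed)
```

Parsing the cutoff into a `date` early means a malformed value fails before any data is processed.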

In [None]:
source_dataset_name = PipelineParameter(name='source_dataset_name', default_value='OJ-Sales-Data')
group_column_names = PipelineParameter(name='group_column_names', default_value='Store;Brand')
timestamp_column = PipelineParameter(name='timestamp_column', default_value='WeekStarting')
cutoff_date = PipelineParameter(name='cutoff_date', default_value='1992-05-28')

### Define Pipeline Steps
The pipeline below consists of four distinct steps that prepare data, train models, generate forecasts, and aggregate results. First, `organize_data.py` retrieves data from the registered dataset, splits it into individual time-series according to the columns listed in `group_column_names`, separates each series into training and forecasting subsets based on the specified `cutoff_date`, saves each time-series to a file, and registers the files as new File Datasets.

Next, we configure an AutoML forecasting job that trains a model for each distinct time-series (using its associated training data) and registers it in the workspace. For efficiency, these models are trained in parallel across multiple nodes in a compute cluster. To reduce training and inferencing time, increase the number of nodes in your cluster.

Following training, we generate a forecast for each individual time-series across the dates included in the `forecast_dataset` using each time-series' best-performing model.

Finally, we aggregate the forecasted results across all time-series into a single dataset (`result_dataset`), which is written to the AML datastore and registered in the workspace.

Each component is broken out individually and described in more detail below.

### Organize Data (Pipeline Step)
The step below runs the script at `pipeline_step_scripts/organize_data.py`, which separates each individual time-series into training and forecasting subsets and saves them as multiple files.
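
The splitting logic can be pictured with the minimal sketch below. It is not the contents of `organize_data.py`: the sample rows, the function name `split_series`, and the one-file-per-series naming are all assumptions made for illustration. It groups rows by the group columns and treats rows on or before the cutoff date as training data.

```python
from collections import defaultdict
from datetime import date

# Hypothetical sample rows standing in for the bulk dataset.
rows = [
    {"Store": "1", "Brand": "A", "WeekStarting": "1992-05-21", "Quantity": "10"},
    {"Store": "1", "Brand": "A", "WeekStarting": "1992-05-28", "Quantity": "12"},
    {"Store": "1", "Brand": "A", "WeekStarting": "1992-06-04", "Quantity": "11"},
    {"Store": "2", "Brand": "B", "WeekStarting": "1992-05-28", "Quantity": "7"},
    {"Store": "2", "Brand": "B", "WeekStarting": "1992-06-04", "Quantity": "9"},
]


def split_series(rows, group_columns, timestamp_column, cutoff):
    """Group rows by series key, then split each series at the cutoff date."""
    train, forecast = defaultdict(list), defaultdict(list)
    for row in rows:
        key = tuple(row[c] for c in group_columns)
        when = date.fromisoformat(row[timestamp_column])
        # Rows on or before the cutoff are training data; later rows are forecast inputs.
        target = train if when <= cutoff else forecast
        target[key].append(row)
    return dict(train), dict(forecast)


train, forecast = split_series(rows, ["Store", "Brand"], "WeekStarting", date(1992, 5, 28))
for key in sorted(train):
    # One file per series, e.g. "1_A.csv"; this naming scheme is an assumption.
    print("_".join(key) + ".csv", len(train[key]), len(forecast.get(key, [])))
```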

<b><i>Modification Note:</i></b> If you need to extend dates beyond the end of your dataset to support forward-looking forecasting, the underlying code can be modified to generate future dates. [See this document for reference](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.date_range.html).
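
As a small sketch of what generating future dates could look like, the snippet below builds weekly dates after the last observed week using only the standard library. The function name `future_week_starts` and the weekly spacing are assumptions. The `pandas.date_range` function linked above is another way to produce such a sequence.

```python
from datetime import date, timedelta


def future_week_starts(last_observed, periods, step_days=7):
    """Return `periods` dates after `last_observed`, spaced `step_days` apart."""
    return [last_observed + timedelta(days=step_days * i) for i in range(1, periods + 1)]


# Hypothetical example: four weekly dates following the last observed week.
horizon = future_week_starts(date(1992, 5, 28), periods=4)
print([d.isoformat() for d in horizon])
```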

In [None]:
#Create PythonScriptStep to gather data from remote source and register as AML dataset
organize_data_step = PythonScriptStep(
    name='Organize Time-Series Data',
    script_name="organize_data.py", 
    arguments=["--train_dataset", train_dataset,
               "--forecast_dataset", forecast_dataset,
               "--source_dataset_name", source_dataset_name,
               "--group_column_names", group_column_names,
               "--timestamp_column", timestamp_column,
               "--cutoff_date", cutoff_date],
    outputs=[train_dataset, forecast_dataset],
    compute_target=compute_target, 
    source_directory='./pipeline_step_scripts',
    allow_reuse=False,
    runconfig=run_config
)


### Configure and Run Many Models AutoML Training Job (Pipeline Step)
The code below configures the settings for your AutoML job (`automl_settings`). Details on these settings can be found [here](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-configure-auto-train). Once configured, a `ParallelRunStep` is created to run a training job for each individual time-series in an appropriately configured environment. Helper functions located in the `automl_train/scripts` directory are used here.

<b><i>Modification Note:</i></b> The following variables need to be updated inside `automl_settings` when configuring your own forecasting job: `label_column_name`, `time_column_name`, `group_column_names`, and `grain_column_names`. The latter two should include all of the columns needed to uniquely identify a particular time-series. To reduce training times, consider increasing the `node_count` and `process_count_per_node` variables below.
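
To get a feel for how `node_count` and `process_count_per_node` affect throughput, the back-of-the-envelope sketch below estimates how many sequential "waves" of training would be needed. It assumes each worker process trains one series at a time and that all series take about the same time, which real scheduling will not match exactly. The series count of 250 is a hypothetical figure, not a property of the sample dataset.

```python
import math


def estimate_waves(series_count, node_count, process_count_per_node):
    """Rough number of sequential 'waves' needed to train every series once."""
    workers = node_count * process_count_per_node
    return workers, math.ceil(series_count / workers)


# series_count=250 is a hypothetical figure, not taken from the sample dataset.
for nodes in (1, 5, 10):
    workers, waves = estimate_waves(series_count=250, node_count=nodes, process_count_per_node=8)
    print(f"nodes={nodes} workers={workers} waves={waves}")
```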

In [None]:
import sys
sys.path.append(".")
from automl_train.scripts.helper import write_automl_settings_to_file, build_parallel_run_config

#Set up AutoML configuration and write to a file
automl_settings = {
    "task" : 'forecasting',
    "primary_metric" : 'normalized_root_mean_squared_error',
    "iteration_timeout_minutes" : 60, # Adjust for your dataset: check how long training takes before setting this value
    "iterations" : 15,
    "experiment_timeout_hours" : 3,
    "label_column_name" : 'Quantity',
    "n_cross_validations" : 3,
    "verbosity" : logging.INFO, 
    "debug_log": 'automl_sales_debug.txt',
    "time_column_name": 'WeekStarting',
    "max_horizon" : 20,
    "track_child_runs": False,
    "group_column_names": ['Store', 'Brand'],
    "grain_column_names": ['Store', 'Brand']
}
write_automl_settings_to_file(automl_settings)

#Set up training environment (reused for inferencing later)
from automl_train.scripts.helper import get_automl_environment
train_env = get_automl_environment(workspace=ws, automl_settings_dict=automl_settings)

# train_env = Environment.get(ws, 'ManyModels_AutoML_Env', version='4')
# train_env = Environment.get(ws, 'AzureML-AutoML')

#Configure your cluster
node_count=1
process_count_per_node=8
run_invocation_timeout=3700

#Build parallel run step configuration
parallel_run_config = build_parallel_run_config(train_env, compute_target, node_count, process_count_per_node, run_invocation_timeout)
training_output_name = "training_output"
train_output_dir = PipelineData(name=training_output_name, 
                          datastore=ds)

from azureml.pipeline.steps import ParallelRunStep

#Define training ParallelRunStep
train_parallel_run_step = ParallelRunStep(
    name="Many Models Training (AutoML)",
    parallel_run_config=parallel_run_config,
    allow_reuse = False,
    inputs=[train_dataset.as_input(name='train_data')],
    output=train_output_dir,
#     arguments=['--client_sdk_version', '1.41.0']
)

#Specify that training step must occur after data gathering step
train_parallel_run_step.run_after(organize_data_step)

In [None]:
train_env

### Configure and Run Many Models AutoML Forecasting Job (Pipeline Step)
The code below configures a parallel forecasting job for each individual time-series contained within `forecast_dataset`. Helper code located in the `automl_inference/scripts` directory is used to configure job parameters.

<b><i>Modification Note:</i></b> When modifying for your own forecasting activities, update the `--group_column_names`, `--time_column_name`, and `--target_column_name` arguments in the `ParallelRunStep` definition.
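
The group columns are passed as separate values after a single flag. The sketch below shows how such a flat argument list could be built from a list of column names and read back with `argparse`. The parser definition is an assumption for illustration only; the real entry script used by the helper may declare its arguments differently.

```python
import argparse

group_columns = ["Store", "Brand"]

# Build the flat argument list: the flag followed by one entry per column.
arguments = [
    "--group_column_names", *group_columns,
    "--time_column_name", "WeekStarting",
    "--target_column_name", "Quantity",
]

# Assumed parser shape; the real entry script may declare its arguments differently.
parser = argparse.ArgumentParser()
parser.add_argument("--group_column_names", type=str, nargs="+", required=True)
parser.add_argument("--time_column_name", type=str, default=None)
parser.add_argument("--target_column_name", type=str, default=None)
args, _unknown = parser.parse_known_args(arguments)

print(args.group_column_names, args.time_column_name, args.target_column_name)
```

Building the list from `group_columns` keeps the step definition in sync with the columns used elsewhere in the notebook.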

In [None]:
#Create Pipeline Step for Inferencing
forecast_env = train_env

from automl_inference.scripts.helper import build_parallel_run_config_for_forecasting

#Set up configuration for parallel inferencing run
node_count=1
process_count_per_node=10
run_invocation_timeout=300 # Timeout in seconds; increase this value for larger models

parallel_run_config = build_parallel_run_config_for_forecasting(forecast_env, compute_target, node_count, process_count_per_node, run_invocation_timeout)

from azureml.pipeline.core import PipelineData
from azureml.pipeline.steps import ParallelRunStep

#Define location where forecasting output will be saved
forecasting_output_name = 'automl_forecasting_output'
forecast_output_dir = PipelineData(name = forecasting_output_name, 
                          datastore = ds)

#Create parallel inferencing step
inference_parallel_run_step = ParallelRunStep(
    name="Many Models Forecasting",
    parallel_run_config=parallel_run_config,
    inputs=[forecast_dataset.as_input(name='inference_data')], 
    output=forecast_output_dir,
    arguments=[
              '--append_row_dataframe_header', True,
              '--group_column_names', 'Store', 'Brand',
              '--time_column_name', 'WeekStarting', # [Optional] Needed when the data is a time-series
              '--target_column_name', 'Quantity', # [Optional] Needs to be passed only if inference data contains target column.
              ])

#Specify that inferencing must happen after training
inference_parallel_run_step.run_after(train_parallel_run_step)

### Aggregate and Save Forecasted Results (Pipeline Step)
The code below executes the script at `./pipeline_step_scripts/format_and_save_results.py`, which aggregates and formats the forecasted results before registering them as a new dataset (`result_dataset`) in the default AML datastore.

<b><i>Modification Note:</i></b> If you desire to land your forecasted results in a location other than AML, the script referenced above can be modified to sink data in other locations.
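
The general aggregation pattern can be sketched as follows. This is not the contents of `format_and_save_results.py`: the function name `aggregate_csv_files`, the file names, and the column layout are hypothetical. It concatenates every CSV file in a directory into one file with a single header row. Writing to a different sink would mean replacing the output file with your own destination.

```python
import csv
import tempfile
from pathlib import Path


def aggregate_csv_files(input_dir, output_path):
    """Concatenate every CSV in input_dir into one file with a single header row."""
    header, total = None, 0
    with open(output_path, "w", newline="") as out:
        writer = csv.writer(out)
        for path in sorted(Path(input_dir).glob("*.csv")):
            with open(path, newline="") as src:
                reader = csv.reader(src)
                file_header = next(reader, None)
                if file_header is None:
                    continue  # skip empty files
                if header is None:
                    header = file_header
                    writer.writerow(header)
                elif file_header != header:
                    raise ValueError(f"Unexpected columns in {path.name}")
                for row in reader:
                    writer.writerow(row)
                    total += 1
    return total


# Hypothetical per-series forecast files used only to exercise the function.
with tempfile.TemporaryDirectory() as tmp:
    parts = Path(tmp) / "parts"
    parts.mkdir()
    (parts / "1_A.csv").write_text("Store,Brand,WeekStarting,Forecast\n1,A,1992-06-04,11.5\n")
    (parts / "2_B.csv").write_text("Store,Brand,WeekStarting,Forecast\n2,B,1992-06-04,8.0\n")
    combined = Path(tmp) / "combined.csv"
    row_count = aggregate_csv_files(parts, combined)
    combined_text = combined.read_text()
    print(row_count)
    print(combined_text)
```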

In [None]:
format_results_step = PythonScriptStep(
    name='Format and Save Forecasting Results',
    script_name="format_and_save_results.py", 
    arguments=["--result_dataset", result_dataset, 
               "--forecast_output_dir", forecast_output_dir],
    inputs=[forecast_output_dir],
    outputs=[result_dataset],
    compute_target=compute_target, 
    source_directory='./pipeline_step_scripts',
    allow_reuse=False,
    runconfig=run_config
)
format_results_step.run_after(inference_parallel_run_step)

### Create Pipeline
Create an Azure ML Pipeline by specifying the steps to be executed. Note: execution order follows the dataset dependencies between steps, so no step runs until all of its required input datasets have been generated.
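
The ordering idea can be illustrated without Azure ML. The sketch below resolves an execution order from a dependency map that mirrors the four steps in this notebook. The short step names and the `execution_order` function are hypothetical, and this is only a conceptual model of dependency resolution, not how the Azure ML service schedules steps internally.

```python
def execution_order(dependencies):
    """Return steps in an order where every step follows all of its prerequisites."""
    remaining = {step: set(deps) for step, deps in dependencies.items()}
    order = []
    while remaining:
        # A step is ready once all of its prerequisites have already been scheduled.
        ready = sorted(step for step, deps in remaining.items() if not deps)
        if not ready:
            raise ValueError("Cycle detected in step dependencies")
        for step in ready:
            order.append(step)
            del remaining[step]
        for deps in remaining.values():
            deps.difference_update(ready)
    return order


# Each step maps to the set of steps that must finish before it starts.
dependencies = {
    "organize_data": set(),
    "train": {"organize_data"},
    "forecast": {"train"},
    "format_results": {"forecast"},
}
order = execution_order(dependencies)
print(order)
```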

In [None]:
pipeline = Pipeline(
    workspace=ws,
    steps=[organize_data_step, train_parallel_run_step, inference_parallel_run_step, format_results_step]
)

### Trigger a Pipeline Execution from the Notebook
You can create an Experiment (logical collection for runs) and submit a pipeline run directly from this notebook by running the commands below.
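
Because the pipeline exposes `PipelineParameter` objects, a run can use values other than the defaults. The sketch below assembles and validates an override dictionary. The helper `build_pipeline_parameters` and the example cutoff date are hypothetical. The commented-out line shows how the dictionary might be passed through the `pipeline_parameters` keyword of `experiment.submit`, which you should confirm against your SDK version.

```python
from datetime import date

# Defaults copied from the PipelineParameter definitions in this notebook.
DEFAULTS = {
    "source_dataset_name": "OJ-Sales-Data",
    "group_column_names": "Store;Brand",
    "timestamp_column": "WeekStarting",
    "cutoff_date": "1992-05-28",
}


def build_pipeline_parameters(**overrides):
    """Merge overrides into the defaults, rejecting unknown names and bad dates."""
    unknown = set(overrides) - set(DEFAULTS)
    if unknown:
        raise KeyError(f"Unknown pipeline parameter(s): {sorted(unknown)}")
    params = {**DEFAULTS, **overrides}
    date.fromisoformat(params["cutoff_date"])  # raises ValueError if malformed
    return params


params = build_pipeline_parameters(cutoff_date="1992-06-25")
print(params)
# With a workspace available, the dict could then be passed at submission time:
# run = experiment.submit(pipeline, pipeline_parameters=params)
```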

In [None]:
experiment = Experiment(ws, 'many-models-pipeline-run')
run = experiment.submit(pipeline)
run.wait_for_completion(show_output=True)