# Training Pipeline

We take a 'pipeline-first' approach: the training workflow is built as a production-ready Azure Machine Learning pipeline from the start.

In [1]:
import os
import pandas as pd

# Azure ML core: workspace, experiments, runs, environments, datasets
import azureml.core
from azureml.core import Workspace, Experiment, Datastore, Run, Environment
from azureml.core.authentication import InteractiveLoginAuthentication
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.dataset import Dataset
from azureml.core.runconfig import RunConfiguration, DEFAULT_CPU_IMAGE
from azureml.data.data_reference import DataReference

# Azure ML pipelines
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.core.schedule import ScheduleRecurrence, Schedule
from azureml.pipeline.steps import PythonScriptStep
from azureml.contrib.pipeline.steps import ParallelRunStep, ParallelRunConfig

# Notebook widget for monitoring runs
from azureml.widgets import RunDetails

print("SDK version:", azureml.core.VERSION)

SDK version: 1.0.74


## Set up workspace, datastore, experiment and compute

In [2]:
# Option 1: load the workspace from a config.json file downloaded from the Azure portal
# ws = Workspace.from_config()

# Option 2: connect explicitly. To sign in to a specific tenant, uncomment the next line
# and pass auth=auth to Workspace(...)
# auth = InteractiveLoginAuthentication(force=True, tenant_id="72f988bf-86f1-41af-91ab-2d7cd011db47")
ws = Workspace(subscription_id="bbd86e7d-3602-4e6d-baa4-40ae2ad9303c", resource_group="ManyModelsSA", workspace_name="ManyModelsSAv1")
ws.get_details()

# choose a compute target (an existing AmlCompute cluster)
compute = AmlCompute(ws, "train-cluster")

# choose a datastore
dstore = ws.get_default_datastore()

# choose an experiment
experiment = Experiment(ws, 'automl-ojforecasting')
print(dstore.name, dstore.datastore_type, dstore.account_name, dstore.container_name)

workspaceblobstore AzureBlob manymodelssav16457539585 azureml-blobstore-77752be6-01b4-4a3e-9d42-03c9c0d6248f


## Set up run configuration

Set up the run configuration for experiments that run on compute targets in Azure Machine Learning. We use two Python libraries: scikit-learn and pmdarima.

In [3]:
# create a new runconfig object
run_config = RunConfiguration()
run_config.environment.docker.enabled = True
run_config.environment.docker.base_image = DEFAULT_CPU_IMAGE
# The package is published as 'scikit-learn' ('sklearn' is only its import name);
# pmdarima is installed from PyPI
run_config.environment.python.conda_dependencies = CondaDependencies.create(pip_packages=['scikit-learn', 'pmdarima'])

## Set up environment

An Environment defines the Python packages, Docker settings, and other run-time settings that our pipeline steps need. The ParallelRunStep below runs in this environment.

In [7]:
batch_conda_deps = CondaDependencies.create(pip_packages=['scikit-learn', 'pmdarima'])

batch_env = Environment(name="manymodels_environment")
batch_env.python.conda_dependencies = batch_conda_deps
batch_env.docker.enabled = True
batch_env.docker.base_image = DEFAULT_CPU_IMAGE

## Read the registered dataset from Workspace

We use 11,973 data files and ParallelRunStep to build 11,973 time-series ARIMA models, one per store and brand, each predicting the sales quantity for that store-brand combination.

Each file contains about two years of weekly orange juice sales for one brand in one store, with 7 columns and 122 rows.

You will need to register the file datasets in the Workspace first. We uploaded our training data to a blob container, so when registering each dataset we set 'Datastore' to workspaceblobstore and 'Relative path' to the corresponding directory in the blob. Below we load three registered file datasets of increasing size: 3 models, 1,000 models, and all models.
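If your raw sales data starts as one combined table, it has to be split into one file per model before uploading it to the blob container, since each file becomes one model. A minimal local sketch with pandas, assuming hypothetical `Store` and `Brand` column names and a `Store<N>_<brand>.csv` naming pattern modeled on the file names in the training log; `write_one_file_per_model` is a hypothetical helper and the toy values are made up:

```python
import os
import tempfile

import pandas as pd


def write_one_file_per_model(df: pd.DataFrame, out_dir: str,
                             store_col: str = "Store", brand_col: str = "Brand") -> list:
    """Write one CSV per (store, brand) pair; each file later trains one model.

    store_col and brand_col are hypothetical column names; adjust them to your data.
    """
    os.makedirs(out_dir, exist_ok=True)
    paths = []
    for (store, brand), group in df.groupby([store_col, brand_col]):
        # Name files like Store2_dominicks.csv, the pattern seen in the training log
        path = os.path.join(out_dir, f"Store{store}_{brand}.csv")
        group.to_csv(path, index=False)
        paths.append(path)
    return paths


# Toy combined table: two stores, one brand each, two weeks each (made-up values)
sample = pd.DataFrame({
    "WeekStarting": ["1990-06-14", "1990-06-21", "1990-06-14", "1990-06-21"],
    "Store": [2, 2, 5, 5],
    "Brand": ["dominicks", "dominicks", "tropicana", "tropicana"],
    "Quantity": [100, 120, 80, 60],
})
with tempfile.TemporaryDirectory() as tmp:
    written = write_one_file_per_model(sample, tmp)
    print(sorted(os.path.basename(p) for p in written))
    # ['Store2_dominicks.csv', 'Store5_tropicana.csv']
```

The resulting directory can then be uploaded to the blob container and registered as a file dataset, so the file count matches the number of models to train.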

In [49]:
FileDst3Models = Dataset.get_by_name(ws, name='3modelsfiledataset')
FileDst3ModelsInput = FileDst3Models.as_named_input('Train3Models')

In [50]:
FileDst1000Models = Dataset.get_by_name(ws, name='1000modelsdataset')
FileDst1000ModelsInput = FileDst1000Models.as_named_input('Train1000Models')

In [51]:
FileDstAllModels = Dataset.get_by_name(ws, name='AllDataProd')
FileDstAllModelsInputs = FileDstAllModels.as_named_input('TrainAllmodels')

## Define ParallelRunConfig

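In the ParallelRunConfig, choose the number of worker processes per node and the number of nodes that suit your use case. process_count_per_node (workercount below) is commonly set to the number of cores on the compute VM. node_count (nodecount below) is the number of compute nodes the step runs on. Because each ARIMA model trains independently of the others, increasing the node count speeds up training.

run_invocation_timeout (timeout below) is the maximum time, in seconds, allowed for each call to the entry script's run() method; the default is 60. With one file per mini-batch, each call trains one model, so set the timeout higher than the longest expected training time of a single model.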
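The sizing arithmetic can be sketched as follows. This is a rough estimate that assumes work is spread evenly across processes (actual scheduling is dynamic). With nodecount=5 and workercount=8, 40 processes run concurrently, so 11,973 single-file mini-batches come to about 300 per process. `plan_parallel_run` and `timeout_is_safe` are hypothetical helpers, not part of the Azure ML SDK:

```python
import math


def plan_parallel_run(n_models: int, node_count: int, process_count_per_node: int,
                      mini_batch_size: int = 1) -> dict:
    """Back-of-the-envelope sizing for a ParallelRunStep over a file dataset."""
    # Every process on every node works on mini-batches concurrently
    concurrent_processes = node_count * process_count_per_node
    # With one file per mini-batch, each run() call trains exactly one model
    n_mini_batches = math.ceil(n_models / mini_batch_size)
    # Approximate mini-batches per process, assuming an even spread
    batches_per_process = math.ceil(n_mini_batches / concurrent_processes)
    return {
        "concurrent_processes": concurrent_processes,
        "mini_batches": n_mini_batches,
        "batches_per_process": batches_per_process,
    }


def timeout_is_safe(run_invocation_timeout: int, slowest_model_seconds: float,
                    mini_batch_size: int = 1) -> bool:
    """Rough check that the per-call timeout (seconds) covers the slowest run() call."""
    return run_invocation_timeout > slowest_model_seconds * mini_batch_size


print(plan_parallel_run(11973, node_count=5, process_count_per_node=8))
# {'concurrent_processes': 40, 'mini_batches': 11973, 'batches_per_process': 300}
print(timeout_is_safe(1000, slowest_model_seconds=2.0))  # True
```

The 2-second figure is only illustrative, in line with the per-model durations in the three-model log near the end of this notebook; a timeout of 1000 leaves ample headroom for slower series.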

We then attach to an existing AML compute cluster, train-many-model.

We also add tags that record the training cluster's node count, process count per node, timeout, and dataset name. You can find them in the 'Tags' column in Azure Machine Learning studio.

Then we define the ParallelRunConfig. entry_script is the name of the training script in source_directory, and mini_batch_size="1" sends one input file to each run() call. error_threshold is set to 10: for a file dataset it counts failed files, and if more than 10 files fail, the step is aborted. You can customize the error threshold based on your fault tolerance.
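In the config, mini_batch_size="1" means each run() call receives one file. A toy sketch of how input files group into mini-batches and of the error_threshold rule as we read it in the SDK documentation (the step aborts once the failure count goes above the threshold). ParallelRunStep does all of this internally; `make_mini_batches` and `step_should_abort` are hypothetical helpers for illustration only:

```python
from typing import List


def make_mini_batches(files: List[str], mini_batch_size: int) -> List[List[str]]:
    """Group input files into mini-batches; each mini-batch becomes one run() call."""
    return [files[i:i + mini_batch_size] for i in range(0, len(files), mini_batch_size)]


def step_should_abort(failed_files: int, error_threshold: int) -> bool:
    """Toy version of the rule: abort once failures exceed error_threshold."""
    return failed_files > error_threshold


files = ["Store2_dominicks.csv", "Store5_tropicana.csv", "Store8_minute.maid.csv"]
print(make_mini_batches(files, 1))                 # three single-file mini-batches
print(step_should_abort(10, error_threshold=10))   # False
print(step_should_abort(11, error_threshold=10))   # True
```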

output_action can be either 'append_row' or 'summary_only'. With 'append_row', all values returned by the run() calls are aggregated into a single file named 'parallel_run_step.txt' in the output location. With 'summary_only', the user script is expected to store its output itself; an output row is still expected for each successfully processed input item, but the system uses it only to compute the error threshold and ignores the row's actual value.
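A toy sketch of 'append_row' aggregation, for illustration only: every element of every list returned by run() becomes one line of a single output file, which is why the training script returns one row per successfully processed file. Writing each row with `str()` is an assumption on our part, chosen because it matches the bracketed rows in the log file read near the end of this notebook; `append_rows` is a hypothetical helper:

```python
import os
import tempfile
from typing import Iterable, List


def append_rows(results_per_call: Iterable[List[list]], output_path: str) -> int:
    """Write every row returned by every run() call to one file; return the row count."""
    n_rows = 0
    with open(output_path, "w") as f:
        for returned in results_per_call:
            for row in returned:
                # Assumed serialization: the str() of each returned row
                f.write(str(row) + "\n")
                n_rows += 1
    return n_rows


# Two run() calls, each returning one row (made-up values)
calls = [
    [["Store2_dominicks", "arima_Store2_dominicks", 0, 1]],
    [["Store5_tropicana", "arima_Store5_tropicana", 0, 1]],
]
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "parallel_run_step.txt")
    print(append_rows(calls, path))  # 2
    with open(path) as f:
        print(f.readline().strip())  # ['Store2_dominicks', 'arima_Store2_dominicks', 0, 1]
```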

In [76]:
workercount=8
nodecount=5
timeout=1000
compute = AmlCompute(ws, "train-many-model")

datasetname='AllStoresFileDatasets'

tags1={}
tags1['DatasetName']=datasetname
tags1['Nodes']=nodecount
tags1['WorkersPerNode']=workercount
tags1['Timeout']=timeout

parallel_run_config = ParallelRunConfig(
    source_directory='./scripts',
    entry_script='train.py',
    mini_batch_size="1",
    run_invocation_timeout=timeout,
    error_threshold=10,
    output_action="append_row",
    environment=batch_env,
    process_count_per_node=workercount,
    compute_target=compute,
    node_count=nodecount)

## Set up ParallelRunStep

First, we set up the output directory and define the Pipeline's output name.

In [75]:
output_dir = PipelineData(name="AllARIMAModels", 
                          datastore=dstore, 
                          output_path_on_compute="AllARIMAModels/")

We pass four arguments to the training script, which you can customize for your prediction goal.

The target column is the name of the column to predict. n_test_periods is the number of most recent periods (weeks) to hold out for testing and scoring. The timestamp column becomes the index that the ARIMA models train on. stepwise_training can be 'True' or 'False': 'False' runs a full grid search for each model and therefore takes longer to complete, while 'True' uses a stepwise search, which speeds up training substantially.
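The date-based split used in the training script can be tried locally on toy data. A minimal sketch, assuming weekly data (7 days per period, as in the training script) and a timestamp column in YYYY-MM-DD format; `split_by_last_periods` is a hypothetical helper name:

```python
from datetime import timedelta

import pandas as pd


def split_by_last_periods(data: pd.DataFrame, timestamp_column: str,
                          n_test_periods: int, period_days: int = 7):
    """Hold out the last n_test_periods periods as the test set."""
    data = data.set_index(timestamp_column)
    data.index = pd.to_datetime(data.index)
    # Rows after split_date form the test set; rows on or before it form the train set
    split_date = data.index.max() - timedelta(days=period_days * n_test_periods)
    train = data[data.index <= split_date]
    test = data[data.index > split_date]
    return train, test


# Ten consecutive weeks of toy data
weeks = pd.date_range("1990-06-14", periods=10, freq="7D")
toy = pd.DataFrame({"WeekStarting": weeks.strftime("%Y-%m-%d"), "Quantity": range(10)})
train, test = split_by_last_periods(toy, "WeekStarting", n_test_periods=6)
print(len(train), len(test))  # 4 6
```

With n_test_periods=6, the last six weeks go to the test set, matching the `'--n_test_periods',6` argument passed to the step.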

'inputs' points to a registered file dataset in AML, which in turn points to a path in the blob container. The number of files in that path determines the number of models trained by the ParallelRunStep. 'output' is the output directory defined above.

In [86]:
parallelrun_step = ParallelRunStep(
    name="many-models-training",
    parallel_run_config=parallel_run_config,
    inputs=[FileDstAllModelsInputs],
    output=output_dir,
    models=[],
    arguments=['--target_column','Quantity', '--n_test_periods',6, '--timestamp_column','WeekStarting', '--stepwise_training',True],
    allow_reuse=False
)
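Every value in `arguments` reaches the training script as a command-line string, so a flag declared with `type=str` would hand auto_arima the string "False", which Python treats as truthy. A minimal sketch of an explicit converter; `str2bool` is a hypothetical helper, not part of argparse:

```python
import argparse


def str2bool(value) -> bool:
    """Convert a command-line string such as 'True' or 'false' into a real bool."""
    if isinstance(value, bool):
        return value
    lowered = str(value).strip().lower()
    if lowered in ("true", "t", "yes", "1"):
        return True
    if lowered in ("false", "f", "no", "0"):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean, got {value!r}")


parser = argparse.ArgumentParser("split")
parser.add_argument("--stepwise_training", type=str2bool, default=True)

print(bool("False"))  # True: any non-empty string is truthy
print(parser.parse_args(["--stepwise_training", "False"]).stepwise_training)  # False
```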

## Submit the pipeline to run

Next, we build the pipeline from the ParallelRunStep and submit it to the experiment, attaching the tags defined earlier.

In [None]:
pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])

run = experiment.submit(pipeline,tags=tags1)
RunDetails(run).show()

You can run the following command to monitor training from the Jupyter notebook; it streams the logs live while training runs.

In [None]:
run.wait_for_completion(show_output=True)

We successfully trained and registered 11,973 ARIMA models. The whole training pipeline took 1h 16m 25s.

## Train script

In [98]:
%%writefile ./scripts/train.py

import argparse
import datetime
import logging
import os
import uuid
from datetime import timedelta

import joblib  # sklearn.externals.joblib is deprecated; use the standalone joblib package
import pandas as pd
import pmdarima as pm
from azureml.core.run import Run
from entry_script_helper import EntryScriptHelper

thisrun = Run.get_context()

LOG_NAME = "user_log"

print("Split the data into train and test")

parser = argparse.ArgumentParser("split")
parser.add_argument("--target_column", type=str, help="input target column")
parser.add_argument("--n_test_periods", type=int, help="input number of test periods")
parser.add_argument("--timestamp_column", type=str, help="input timestamp column")
# Arguments arrive as strings and any non-empty string (including "False") is truthy,
# so convert the flag to a real boolean
parser.add_argument("--stepwise_training", type=lambda s: str(s).strip().lower() == "true",
                    help="input stepwise training True or False")

args, unknown = parser.parse_known_args()

print("Argument 1(n_test_periods): %s" % args.n_test_periods)
print("Argument 2(target_column): %s" % args.target_column)
print("Argument 3(timestamp_column): %s" % args.timestamp_column)
print("Argument 4(stepwise_training): %s" % args.stepwise_training)


def init():
    EntryScriptHelper().config(LOG_NAME)
    logger = logging.getLogger(LOG_NAME)
    output_folder = os.path.join(os.environ.get("AZ_BATCHAI_INPUT_AZUREML", ""), "temp/output")
    logger.info(f"{__file__}.output_folder:{output_folder}")
    logger.info("init()")


def run(input_data):
    # 0. Set up logging
    logger = logging.getLogger(LOG_NAME)
    os.makedirs('./outputs', exist_ok=True)
    logger.info('processing all files')
    resultList = []

    # 1. Read in each data file of the mini-batch (one file = one model)
    for idx, csv_file_path in enumerate(input_data):
        u1 = uuid.uuid4()
        mname = 'arima' + str(u1)[0:16]
        logs = []
        date1 = datetime.datetime.now()
        logger.info('starting (' + csv_file_path + ') ' + str(date1))
        thisrun.log(mname, 'starttime-' + str(date1))

        data = pd.read_csv(csv_file_path, header=0)
        logger.info(data.head())

        # 2. Split the data into train and test sets based on dates
        data = data.set_index(args.timestamp_column)
        data.index = pd.to_datetime(data.index)
        split_date = data.index.max() - timedelta(days=7 * args.n_test_periods)
        train = data[data.index <= split_date]
        test = data[data.index > split_date]  # held out for scoring; not used yet

        # 3. Train the model
        model = pm.auto_arima(train[args.target_column],
                              start_p=0,
                              start_q=0,
                              test='adf',  # default stationarity test is kpss
                              max_p=3,
                              max_d=2,
                              max_q=3,
                              m=3,  # number of observations per seasonal cycle
                              # d=None,
                              seasonal=True,
                              # trend=None,  # adjust this if the series have trend
                              # start_P=0,
                              # D=0,
                              information_criterion='aic',
                              trace=True,  # prints status on the fits
                              # error_action='ignore',
                              stepwise=args.stepwise_training,  # True: stepwise search; False: full grid search
                              suppress_warnings=True,
                              out_of_sample_size=16
                              )
        model = model.fit(train[args.target_column])
        logger.info('done training')

        # 4. Save the model to ./outputs
        logger.info(model)
        logger.info(mname)
        joblib.dump(value=model, filename=os.path.join('./outputs/', mname))

        # 5. Upload the model file to the run and register it to the workspace
        try:
            thisrun.upload_file(mname, os.path.join('./outputs/', mname))
        except Exception:
            logger.info('dont need to upload')
        logger.info('register model, skip the outputs prefix')
        # e.g. .../Store2_dominicks.csv -> arima_Store2_dominicks
        file_stem = os.path.splitext(os.path.basename(csv_file_path))[0]
        model_name = 'arima_' + file_stem
        print('Trained ' + model_name)

        thisrun.register_model(model_path=mname, model_name=model_name, model_framework='pmdarima')
        print('Registered ' + model_name)

        # 6. Log some metrics
        date2 = datetime.datetime.now()
        logger.info('ending (' + str(csv_file_path) + ') ' + str(date2))

        logs.append(file_stem)
        logs.append(model_name)
        logs.append(str(date1))
        logs.append(str(date2))
        logs.append(str(date2 - date1))
        logs.append(idx)
        logs.append(len(input_data))
        logs.append(thisrun.get_status())

        thisrun.log(mname, 'endtime-' + str(date2))
        thisrun.log(mname, 'auc-1')

        # One output row per successfully processed file
        resultList.append(logs)

    return resultList

Overwriting ./scripts/train.py
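The registered model name is derived from the input CSV's file name; for example, Store2_dominicks.csv becomes arima_Store2_dominicks in the log below. A minimal sketch of that mapping with `os.path`, which works for any directory depth and any mini-batch size (`model_name_from_path` and the sample path are hypothetical):

```python
import os


def model_name_from_path(csv_file_path: str, prefix: str = "arima_") -> str:
    """Derive the registered model name from the input file name."""
    # splitext removes only the final extension, so dots inside the brand survive
    stem = os.path.splitext(os.path.basename(csv_file_path))[0]
    return prefix + stem


print(model_name_from_path("/mnt/inputs/Store8_minute.maid.csv"))
# arima_Store8_minute.maid
```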


### Logs

The training script also returns one log record per trained model, and the ParallelRunStep collects these records in its output. Each row represents one trained model.

Let's start by taking a look at our run details.

In [89]:
run

| Experiment | Id | Type | Status | Details Page | Docs Page |
|---|---|---|---|---|---|
| automl-ojforecasting | 146e69ad-b054-4ea7-8783-d08825e5d20b | azureml.PipelineRun | Completed | Link to Azure Machine Learning studio | Link to Documentation |


We can then download the pipeline output to a local path. In this case, the output is a single log file, parallel_run_step.txt, that concatenates the values returned by all run() calls in the ParallelRunStep.

In [91]:
prediction_run = next(run.get_children())
prediction_output = prediction_run.get_output_data("AllARIMAModels")

In [92]:
prediction_output

| Name | Datastore | Path on Datastore | Produced By PipelineRun | Produced By StepRun |
|---|---|---|---|---|
| AllARIMAModels | workspaceblobstore | azureml/6b472277-d20c-43f5-a6dc-fca4fb44d419/AllARIMAModels | 146e69ad-b054-4ea7-8783-d08825e5d20b | 6b472277-d20c-43f5-a6dc-fca4fb44d419 |


In [93]:
prediction_output.download(local_path="logs")

1

In [None]:
# Find the aggregated log file anywhere under the download folder
for root, dirs, files in os.walk("logs"):
    for file in files:
        if file.endswith('parallel_run_step.txt'):
            result_file = os.path.join(root, file)
            print(result_file)

In [95]:
result_file='logs/azureml/6b472277-d20c-43f5-a6dc-fca4fb44d419/AllARIMAModels/parallel_run_step.txt'

Read the log file and assign column names. For demonstration, we show the output of the three-model example.

In [96]:
df = pd.read_csv(result_file, converters={0: lambda x: x.strip("["),7: lambda x: x.strip("]")}, delimiter=",", header=None) 
df.columns=['FileName','ModelName','StartTime','EndTime','Duration','Index','BatchSize','Status']
df

| | FileName | ModelName | StartTime | EndTime | Duration | Index | BatchSize | Status |
|---|---|---|---|---|---|---|---|---|
| 0 | 'Store2_dominicks' | 'arima_Store2_dominicks' | '2019-12-13 21:26:27.832852' | '2019-12-13 21:26:29.456844' | '0:00:01.623992' | 0 | 1 | 'Running' |
| 1 | 'Store5_tropicana' | 'arima_Store5_tropicana' | '2019-12-13 21:26:29.768970' | '2019-12-13 21:26:31.561115' | '0:00:01.792145' | 0 | 1 | 'Running' |
| 2 | 'Store8_minute.maid' | 'arima_Store8_minute.maid' | '2019-12-13 21:26:31.871903' | '2019-12-13 21:26:33.095724' | '0:00:01.223821' | 0 | 1 | 'Running' |
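The converters above strip the brackets but leave the quotes around string values, as the output shows. An alternative sketch that parses each row with `ast.literal_eval` and converts the time columns to proper types, assuming every line of parallel_run_step.txt is the Python literal of one list returned by run(); `read_run_log` is a hypothetical helper:

```python
import ast
import io

import pandas as pd

COLUMNS = ["FileName", "ModelName", "StartTime", "EndTime",
           "Duration", "Index", "BatchSize", "Status"]


def read_run_log(lines) -> pd.DataFrame:
    """Parse rows written as Python list literals into a typed DataFrame."""
    # Skip blank lines; each remaining line is assumed to be one list literal
    records = [ast.literal_eval(line) for line in lines if line.strip()]
    df = pd.DataFrame(records, columns=COLUMNS)
    df["StartTime"] = pd.to_datetime(df["StartTime"])
    df["EndTime"] = pd.to_datetime(df["EndTime"])
    df["Duration"] = pd.to_timedelta(df["Duration"])
    return df


# One row in the assumed format, using values from the three-model log
sample = io.StringIO(
    "['Store2_dominicks', 'arima_Store2_dominicks', '2019-12-13 21:26:27.832852', "
    "'2019-12-13 21:26:29.456844', '0:00:01.623992', 0, 1, 'Running']\n"
)
log_df = read_run_log(sample)
print(log_df.loc[0, "ModelName"], log_df["Duration"].sum())
```

With the file downloaded above, `with open(result_file) as f: log_df = read_run_log(f)` would give a DataFrame whose Duration column can be summed or plotted directly.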


Upload the log file to a dedicated path in the blob container for Power BI (PBI) monitoring.

In [None]:
# Upload the folder that contains the downloaded parallel_run_step.txt
dstore.upload(src_dir=os.path.dirname(result_file), target_path='traininglogs', overwrite=False, show_progress=True)

## Next step

1. Add more relevant metrics to logs.
2. Save log files to the dedicated path in the training script.
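For the first item, a common choice for forecasting is to score the held-out test periods with MAE, RMSE, and MAPE. A minimal sketch with NumPy; `forecast_metrics` is a hypothetical helper, and the forecasts would come from the trained model (for example pmdarima's `model.predict(n_periods=len(test))`):

```python
import numpy as np


def forecast_metrics(actual, predicted) -> dict:
    """MAE, RMSE and MAPE (in percent) for a held-out test period."""
    actual = np.asarray(actual, dtype=float)
    predicted = np.asarray(predicted, dtype=float)
    errors = actual - predicted
    return {
        "mae": float(np.mean(np.abs(errors))),
        "rmse": float(np.sqrt(np.mean(errors ** 2))),
        # MAPE is undefined when any actual value is zero
        "mape": float(np.mean(np.abs(errors / actual)) * 100),
    }


print(forecast_metrics([100, 200], [110, 190]))
```

Inside run(), each value could then be recorded with `thisrun.log(...)` and appended to the `logs` list, replacing the 'auc-1' placeholder.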