# Azure Machine Learning - Many Models + MLflow

Sample machine learning pipeline for training, evaluating, and registering multiple regression models (Scikit-Learn `ElasticNetCV`) using MLflow. This pipeline was adapted from Microsoft's [Many Models Solution Accelerator](https://github.com/microsoft/solution-accelerator-many-models) and uses [`ParallelRunStep`](https://learn.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.parallelrunstep?view=azure-ml-py) steps throughout to parallelize training and evaluation; for large-scale training efforts this can yield substantial performance benefits.

### Import required packages

In [None]:
from azureml.core import Workspace, Experiment, Datastore, Environment, Dataset, Run
from azureml.core.compute import ComputeTarget, AmlCompute, DataFactoryCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import DEFAULT_CPU_IMAGE
from azureml.pipeline.core import Pipeline, PipelineParameter, PipelineData
from azureml.pipeline.steps import PythonScriptStep
from azureml.pipeline.core import PipelineParameter, PipelineData
from azureml.data.output_dataset_config import OutputTabularDatasetConfig, OutputDatasetConfig, OutputFileDatasetConfig
from azureml.data.datapath import DataPath
from azureml.data.data_reference import DataReference
from azureml.data.sql_data_reference import SqlDataReference
from azureml.pipeline.steps import DataTransferStep
import logging
import os

### Connect to AML workspace, provision compute cluster, and gather reference to default datastore

In [None]:
# Connect to the AML workspace.
# First try the saved workspace configuration; if that fails for any reason,
# fall back to the SUBSCRIPTION_ID / RESOURCE_GROUP / WORKSPACE_NAME
# environment variables.
ws = None
try:
    ws = Workspace.from_config()
except Exception as config_error:
    # Report why the configuration-based lookup failed instead of hiding it.
    logging.warning(
        'Workspace.from_config() failed (%s); falling back to environment variables.',
        config_error,
    )
    # Flag unset variables up front so a failure in the fallback below is
    # easy to diagnose. The fallback is still attempted, as before.
    missing_vars = [
        var_name
        for var_name in ('SUBSCRIPTION_ID', 'RESOURCE_GROUP', 'WORKSPACE_NAME')
        if not os.getenv(var_name)
    ]
    if missing_vars:
        logging.warning('Environment variables not set: %s', ', '.join(missing_vars))
    ws = Workspace(
        subscription_id=os.getenv('SUBSCRIPTION_ID'),
        resource_group=os.getenv('RESOURCE_GROUP'),
        workspace_name=os.getenv('WORKSPACE_NAME'),
    )


# Name of the AML compute cluster that the pipeline steps run on
cpu_cluster_name = 'cluster001'

# Reuse the cluster when the workspace already has one with this name;
# otherwise provision a new one and wait for provisioning to finish
try:
    cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
except ComputeTargetException:
    compute_config = AmlCompute.provisioning_configuration(
        vm_size='STANDARD_D3_V2',
        min_nodes=0,
        max_nodes=3,
    )
    cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)
    cpu_cluster.wait_for_completion(show_output=True)
else:
    print('Found an existing cluster, using it instead.')

# Reference to the workspace's default datastore
default_ds = ws.get_default_datastore()

### Create pipeline execution environment
The environment is defined from the `requirements.txt` file located in this repository. Once defined, it is registered in the AML workspace so that it can be reused.

In [None]:
# Build the execution environment from the repository's pip requirements file
aml_env = Environment.from_pip_requirements(
    file_path='./requirements.txt',
    name='ManyModelsEnv',
)

# Run configuration used by the script steps: Docker enabled, the default CPU
# base image, the host Docker socket mounted into the container, and a
# pinned Python version
run_config = RunConfiguration()
run_config.docker.use_docker = True
run_config.environment = aml_env
run_config.environment.docker.base_image = DEFAULT_CPU_IMAGE
run_config.docker.arguments = [
    '-v',
    '/var/run/docker.sock:/var/run/docker.sock',
]
run_config.environment.python.conda_dependencies.set_python_version('3.8.10')

# Register the environment in the workspace so it can be reused
run_config.environment.register(ws)

### Define output datasets

In [None]:
# Raw data: written under a per-run folder on the default datastore, read back
# as delimited files, and registered as 'HomePrices_Raw_Data' on completion
raw_data = (
    OutputFileDatasetConfig(
        name='HomePrices_Raw_Data',
        destination=(default_ds, 'homeprices_raw_data/{run-id}'),
    )
    .read_delimited_files()
    .register_on_complete(name='HomePrices_Raw_Data')
)

# Training subset: file dataset registered as 'HomePrices_Training_Data'
training_data = (
    OutputFileDatasetConfig(
        name='HomePrices_Training_Data',
        destination=(default_ds, 'homeprices_training_data/{run-id}'),
    )
    .register_on_complete(name='HomePrices_Training_Data')
)

# Testing subset: file dataset registered as 'HomePrices_Testing_Data'
testing_data = (
    OutputFileDatasetConfig(
        name='HomePrices_Testing_Data',
        destination=(default_ds, 'homeprices_testing_data/{run-id}'),
    )
    .register_on_complete(name='HomePrices_Testing_Data')
)

### Define location for training/evaluation parallel run outputs

In [None]:
from azureml.pipeline.core import PipelineData

# Locations on the default datastore that receive the outputs of the
# training and evaluation parallel run steps
output_dir = PipelineData(datastore=default_ds, name="training_output")
test_output_dir = PipelineData(datastore=default_ds, name="testing_output")

### Define pipeline parameters

In [None]:
# Value forwarded to split_data.py as --testing_size
testing_size = PipelineParameter(
    default_value=0.3,
    name='testing_size',
)

# NOTE(review): not passed to any step in the visible cells - confirm usage
target_column = PipelineParameter(
    default_value='target',
    name='target_column',
)

# Value forwarded to package_models.py as --deployment_name
deployment_name = PipelineParameter(
    default_value='manymodelsdeployment',
    name='deployment_name',
)

### Define pipeline execution steps

In [None]:
# Retrieve the raw data from the AML-linked datastore by running get_data.py.
# The raw_data output is registered as a tabular dataset after retrieval.
get_data_step = PythonScriptStep(
    name='Get Data',
    source_directory='./pipeline_step_scripts',
    script_name='get_data.py',
    arguments=['--raw_data', raw_data],
    outputs=[raw_data],
    runconfig=run_config,
    compute_target=cpu_cluster,
    allow_reuse=False,
)

# Run split_data.py on the raw data to produce the training and testing
# subsets, split according to the testing_size pipeline parameter and
# logical categories
split_data_step = PythonScriptStep(
    name='Split Train and Test Data',
    source_directory='./pipeline_step_scripts',
    script_name='split_data.py',
    arguments=[
        '--training_data', training_data,
        '--testing_data', testing_data,
        '--testing_size', testing_size,
    ],
    inputs=[raw_data.as_input(name='Raw_Data')],
    outputs=[training_data, testing_data],
    runconfig=run_config,
    compute_target=cpu_cluster,
    allow_reuse=False,
)

# Train one model per training sub-dataset. A parallel run step is used
# so that multiple models can be trained simultaneously.
from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig

# Parallel run settings for training; train_models.py is the user script
# that is executed in parallel
parallel_run_config = ParallelRunConfig(
    environment=aml_env,
    compute_target=cpu_cluster,
    node_count=1,
    source_directory='./pipeline_step_scripts',
    entry_script='train_models.py',
    mini_batch_size="1",
    error_threshold=-1,  # presumably "ignore failures" - confirm in the ParallelRunConfig docs
    output_action="append_row",
)

# Training step: consumes the training dataset and writes to output_dir
train_models_step = ParallelRunStep(
    name='Train Models',
    parallel_run_config=parallel_run_config,
    inputs=[training_data.as_input(name='train_data')],
    output=output_dir,
    arguments=[],
    allow_reuse=False,
)

# Before model registration, evaluate each newly trained model against
# previous variants. Register the best performer in all cases.
#
# This rebinds parallel_run_config; train_models_step keeps the
# configuration object it was constructed with.
parallel_run_config = ParallelRunConfig(
    environment=aml_env,
    compute_target=cpu_cluster,
    node_count=1,
    source_directory='./pipeline_step_scripts',
    entry_script='evaluate_models.py',  # the user script that is executed in parallel
    mini_batch_size="1",
    error_threshold=-1,
    output_action="append_row",
)

# Evaluation step: consumes the testing dataset and writes to test_output_dir
evaluate_models_step = ParallelRunStep(
    name='Evaluate Models',
    parallel_run_config=parallel_run_config,
    inputs=[testing_data.as_input(name='test_data')],
    output=test_output_dir,
    arguments=[],
    allow_reuse=False,
)

# Evaluation must not start until training has finished
evaluate_models_step.run_after(train_models_step)

# Last step: run package_models.py to create a containerized deployment
# containing all trained models (best version of each). The training
# dataset is downloaded to ./training-data for the script.
create_deployment_step = PythonScriptStep(
    name='Package Models',
    source_directory='./pipeline_step_scripts',
    script_name='package_models.py',
    arguments=['--deployment_name', deployment_name],
    inputs=[
        training_data.as_input(name='train_data').as_download('./training-data'),
    ],
    runconfig=run_config,
    compute_target=cpu_cluster,
    allow_reuse=False,
)

# Packaging runs only after model evaluation has finished
create_deployment_step.run_after(evaluate_models_step)

### Assemble steps into an executable pipeline

In [None]:
# Assemble every step into a single pipeline bound to the workspace
pipeline = Pipeline(
    workspace=ws,
    steps=[
        get_data_step,
        split_data_step,
        train_models_step,
        evaluate_models_step,
        create_deployment_step,
    ],
)

### Submit pipeline run as new experiment

In [None]:
# The experiment name can be overridden through the EXPERIMENT_NAME
# environment variable
experiment_name = os.environ.get('EXPERIMENT_NAME', 'many-models-pipeline-run')
experiment = Experiment(workspace=ws, name=experiment_name)

# Submit the pipeline and stream output until the run finishes
run = experiment.submit(pipeline)
run.wait_for_completion(show_output=True)