# Create Cluster

In this notebook we use the Batch AI extensions to generate hyperparameter values and submit the resulting jobs to an existing Batch AI cluster.

## Imports

In [None]:
%load_ext dotenv
from __future__ import print_function
import os
import sys
import glob
import dotenv
import azure.mgmt.batchai.models as models
from azure.storage.blob import BlockBlobService
from azure.storage.file import FileService
sys.path.append('.')
import utilities as utils
from utilities.job_factory import ParameterSweep, NumericParameter, DiscreteParameter

The next cell sets the names of the files and services used or created in this notebook.

In [None]:
# The location of the dotenv file
dotenv_path = dotenv.find_dotenv()
# The mount point of the Azure file share in the Docker container
dotenv.set_key(dotenv_path, 'azure_file_share_mount_path', 'afs')
# The mount point of the Azure blob container in the Docker container
dotenv.set_key(dotenv_path, 'azure_blob_mount_path', 'bfs')
# The Batch AI experiment
dotenv.set_key(dotenv_path, 'experiment_name', 'hyperparameter_search_experiment')

Import the contents of the `.env` file into the environment.

In [None]:
%dotenv -o
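
The `-o` flag asks the dotenv magic to override variables that are already set, so the keys written by `dotenv.set_key` above replace any stale values from an earlier run. The sketch below illustrates that override behaviour with the standard library only. `load_env_text` is a hypothetical helper written for this illustration, not part of `python-dotenv`, and it handles only simple `KEY=VALUE` lines.

```python
import os


def load_env_text(text, override=False, environ=None):
    """Copy simple KEY=VALUE lines into an environment mapping.

    Hypothetical helper: a simplified stand-in for what a dotenv loader does.
    """
    environ = os.environ if environ is None else environ
    for line in text.splitlines():
        line = line.strip()
        # Skip blank lines, comments, and anything that is not KEY=VALUE
        if not line or line.startswith('#') or '=' not in line:
            continue
        key, _, value = line.partition('=')
        key = key.strip()
        value = value.strip().strip('\'"')
        # Without override, an existing value wins over the file
        if override or key not in environ:
            environ[key] = value
    return environ


env_text = "experiment_name='hyperparameter_search_experiment'"

# Use a plain dict instead of os.environ so the demo has no side effects
env = {'experiment_name': 'old_experiment'}
load_env_text(env_text, override=False, environ=env)
kept = env['experiment_name']

load_env_text(env_text, override=True, environ=env)
replaced = env['experiment_name']

print(kept)
print(replaced)
```

With `override=False` the old value survives; with `override=True` the value from the file replaces it.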

Define Python variables used in this notebook.

In [None]:
configuration_path = os.getenv('configuration_path')
image_name = os.getenv('docker_login') + os.getenv('image_repo') + ':latest'
azure_blob_container_name = os.getenv('azure_blob_container_name')
dataset_path = os.getenv('dataset_path')
azure_file_share_name = os.getenv('azure_file_share_name')
script_path = os.getenv('script_path')
script_name = os.getenv('script_name')
cluster_name = os.getenv('cluster_name')
azure_file_share_mount_path = os.getenv('azure_file_share_mount_path')
azure_blob_mount_path = os.getenv('azure_blob_mount_path')
experiment_name = os.getenv('experiment_name')
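
`os.getenv` returns `None` for a key that is missing from the `.env` file, and a missing key only surfaces later (for example as a `TypeError` when building `image_name`). A minimal sketch of a stricter lookup follows. `require_env` and the sample values are hypothetical and are not part of the notebook's `utilities` package.

```python
import os


def require_env(name, environ=None):
    """Return the value of an environment variable or fail with a clear message."""
    environ = os.environ if environ is None else environ
    value = environ.get(name)
    if not value:
        raise KeyError(
            'Environment variable {0!r} is not set; check the .env file'.format(name))
    return value


# Hypothetical values, used only for this demonstration
demo_env = {'docker_login': 'myregistry', 'image_repo': '/myimage'}

demo_image_name = (require_env('docker_login', demo_env)
                   + require_env('image_repo', demo_env)
                   + ':latest')
print(demo_image_name)
```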

## Create a Batch AI client
Read the configuration, and use it to create a Batch AI client.

In [None]:
cfg = utils.config.Configuration(configuration_path)
client = utils.config.create_batchai_client(cfg)

## Set up the hyperparameter sweeping experiment
Specify the Docker image used to create the Docker containers that run the experiment's jobs.

In [None]:
container_settings = models.ContainerSettings(
    image_source_registry=models.ImageSourceRegistry(image=image_name)
)

We define the mount points to be created in each Docker container. These will give the containers access to the datasets and scripts.

In [None]:
mount_volumes = models.MountVolumes(
    azure_file_shares=[
        models.AzureFileShareReference(
            account_name=cfg.storage_account_name,
            credentials=models.AzureStorageCredentialsInfo(
                account_key=cfg.storage_account_key),
            azure_file_url='https://{0}.file.core.windows.net/{1}'.format(
                cfg.storage_account_name, azure_file_share_name),
            relative_mount_path=azure_file_share_mount_path)
    ],
    azure_blob_file_systems=[
        models.AzureBlobFileSystemReference(
            account_name=cfg.storage_account_name,
            credentials=models.AzureStorageCredentialsInfo(
                account_key=cfg.storage_account_key),
            container_name=azure_blob_container_name,
            relative_mount_path=azure_blob_mount_path)
    ]
)

Define the locations in a container's file system for
- storing the job's standard output and error,
- obtaining the datasets, and
- storing the job's outputs.

In [None]:
std_out_err_path_prefix = '$AZ_BATCHAI_JOB_MOUNT_ROOT/{0}'.format(azure_file_share_mount_path)

input_directories = [
    models.InputDirectory(
        id='SCRIPT',
        path='$AZ_BATCHAI_JOB_MOUNT_ROOT/{0}/{1}'.format(azure_blob_mount_path, dataset_path))
]

output_directories = [
    models.OutputDirectory(
        id='ALL',
        path_prefix='$AZ_BATCHAI_JOB_MOUNT_ROOT/{0}'.format(azure_file_share_mount_path))
]
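
The `id` of each directory determines the environment variable through which the job sees it, which is why the command line defined later refers to `$AZ_BATCHAI_INPUT_SCRIPT` and `$AZ_BATCHAI_OUTPUT_ALL`. The sketch below spells out that naming pattern, assuming it generalizes from those two names. `directory_env_vars` is a hypothetical helper for illustration only.

```python
def directory_env_vars(input_ids, output_ids):
    """Return the environment variable names implied by directory ids.

    Hypothetical helper: assumes the AZ_BATCHAI_INPUT_<id> and
    AZ_BATCHAI_OUTPUT_<id> pattern seen in this notebook's command line.
    """
    inputs = ['AZ_BATCHAI_INPUT_' + dir_id for dir_id in input_ids]
    outputs = ['AZ_BATCHAI_OUTPUT_' + dir_id for dir_id in output_ids]
    return inputs, outputs


input_vars, output_vars = directory_env_vars(['SCRIPT'], ['ALL'])
print(input_vars)
print(output_vars)
```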

We define the path to the training script.

In [None]:
python_script_file_path = '$AZ_BATCHAI_JOB_MOUNT_ROOT/{0}/{1}/{2}'.format(
    azure_file_share_mount_path, script_path, script_name)
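
Note that `$AZ_BATCHAI_JOB_MOUNT_ROOT` stays a literal string in Python; it is only expanded when the command runs on a cluster node. The sketch below imitates that expansion with `string.Template` to show what the final path could look like. The mount root, script directory, and script name are hypothetical values chosen for the example.

```python
from string import Template

# 'afs' is the mount path set earlier; 'scripts' and 'train.py' are hypothetical
path_template = '$AZ_BATCHAI_JOB_MOUNT_ROOT/{0}/{1}/{2}'.format(
    'afs', 'scripts', 'train.py')

# Imitate the expansion done on the node, with a hypothetical mount root
resolved_path = Template(path_template).substitute(
    AZ_BATCHAI_JOB_MOUNT_ROOT='/mnt/job_mounts')

print(path_template)
print(resolved_path)
```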

We define specifications for the hyperparameters, and use them to create a parameter substitution object.

In [None]:
param_specs = [
    DiscreteParameter(
        parameter_name="ESTIMATORS",
        values=[100]
    ),
    DiscreteParameter(
        parameter_name="NGRAMS",
        values=[1, 2, 3, 4]
    ),
    DiscreteParameter(
        parameter_name="MATCH",
        values=[10, 20, 30, 40]
    ),
    DiscreteParameter(
        parameter_name="MIN_CHILD_SAMPLES",
        values=[5, 10, 20]
    ),
    DiscreteParameter(
        parameter_name="WEIGHT",
        values=["", "--unweighted"]
    ),
]

parameters = ParameterSweep(param_specs)
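
To get a feel for the size of the search, the sketch below enumerates the same values with `itertools.product`. It assumes that the sweep covers the full grid of discrete values, which is an assumption about `ParameterSweep` rather than something the notebook states.

```python
from itertools import product

# The same names and values as in param_specs above
grid = [
    ('ESTIMATORS', [100]),
    ('NGRAMS', [1, 2, 3, 4]),
    ('MATCH', [10, 20, 30, 40]),
    ('MIN_CHILD_SAMPLES', [5, 10, 20]),
    ('WEIGHT', ['', '--unweighted']),
]

names = [name for name, _ in grid]
value_lists = [values for _, values in grid]

# One dict per point of the Cartesian product of all value lists
grid_combinations = [dict(zip(names, values)) for values in product(*value_lists)]

print(len(grid_combinations))  # 1 * 4 * 4 * 3 * 2 = 96
print(grid_combinations[0])
```

Under that assumption, adding one more value to any list multiplies the number of jobs, so it is worth checking the count before submitting.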

We define the command line arguments that will be passed to the training script, using the parameter substitution object to mark where each parameter's value should go. Note that `parameters` is indexed like a dict, with the `parameter_name` as the key that selects which parameter to substitute. When `parameters.generate_jobs` is called below, each `parameters[name]` placeholder is replaced with an actual value.

In [None]:
command_line_args = '--inputs $AZ_BATCHAI_INPUT_SCRIPT --outputs $AZ_BATCHAI_OUTPUT_ALL'\
    ' --estimators {estimators}'\
    ' --ngrams {ngrams}'\
    ' --match {match}'\
    ' --min_child_samples {min_child_samples}'\
    ' {weight}'.format(
    estimators=parameters['ESTIMATORS'],
    ngrams=parameters['NGRAMS'],
    match=parameters['MATCH'],
    min_child_samples=parameters['MIN_CHILD_SAMPLES'],
    weight=parameters['WEIGHT'])
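
One plausible way for such a substitution to work, consistent with the last cell of this notebook reading the parameter values back from each job's environment variables, is for indexing to return a shell placeholder that each job resolves from its own environment. The sketch below illustrates the idea. `PlaceholderSweep` and the `$PARAM_` prefix are hypothetical and are not taken from the `utilities` package.

```python
from string import Template


class PlaceholderSweep(object):
    """Hypothetical stand-in for ParameterSweep: indexing yields a placeholder."""

    def __init__(self, names):
        self._names = set(names)

    def __getitem__(self, name):
        if name not in self._names:
            raise KeyError(name)
        # Assumed naming scheme for the per-job environment variable
        return '$PARAM_' + name


sweep = PlaceholderSweep(['NGRAMS', 'MATCH'])

# Build the command line once; it contains placeholders, not values
args_template = '--ngrams {ngrams} --match {match}'.format(
    ngrams=sweep['NGRAMS'], match=sweep['MATCH'])

# Each job would carry its own values; Template imitates the shell expansion
job_environment = {'PARAM_NGRAMS': '2', 'PARAM_MATCH': '30'}
expanded_args = Template(args_template).substitute(job_environment)

print(args_template)
print(expanded_args)
```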

We put the script path and command line arguments together in a custom toolkit settings structure.

In [None]:
custom_toolkit_settings = models.CustomToolkitSettings(
    command_line=' '.join(['python', python_script_file_path, command_line_args])
)
print(custom_toolkit_settings.command_line)

Retrieve the cluster information.

In [None]:
cluster = client.clusters.get(cfg.resource_group, cfg.workspace, cluster_name)

We combine the information we just created into a set of job creation parameters that `parameters.generate_jobs` will use to create the definitions of the jobs to execute on the cluster.

In [None]:
jcp = models.JobCreateParameters(
    cluster=models.ResourceId(id=cluster.id),
    node_count=1,
    std_out_err_path_prefix=std_out_err_path_prefix,
    input_directories=input_directories,
    output_directories=output_directories,
    mount_volumes=mount_volumes,
    container_settings=container_settings,
    custom_toolkit_settings=custom_toolkit_settings
)

## Generate the jobs to be run
Next, we generate a list of jobs to submit, each with one combination of the parameter values.

In [None]:
jobs_to_submit, param_combinations = parameters.generate_jobs(jcp)
for idx, comb in enumerate(param_combinations, 1):
    print("Parameters {0}: {1}".format(idx, comb))

In [None]:
print('{:,} jobs.'.format(len(param_combinations)))

## Run the jobs in an experiment
Create a new experiment called `hyperparameter_search_experiment`.

In [None]:
experiment = client.experiments.create(cfg.resource_group, cfg.workspace, experiment_name).result()

Submit the jobs to the experiment.

In [None]:
%%time
experiment_utils = utils.experiment.ExperimentUtils(client, cfg.resource_group, cfg.workspace, experiment_name)
jobs = experiment_utils.submit_jobs(jobs_to_submit, 'hyperparam_job2').result()

Wait for the jobs to complete. This should take about thirty minutes. You can interrupt and restart this cell as needed.

In [None]:
%%time
experiment_utils.wait_all_jobs()
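
Waiting of this kind is typically a polling loop, which is also why the cell can be interrupted and restarted: each pass simply asks for the current job states again. The sketch below shows the pattern in isolation. `wait_until_done`, the `get_states` callable, and the state labels are hypothetical and do not describe how `wait_all_jobs` is actually implemented.

```python
import time


def wait_until_done(get_states, poll_seconds=30, sleep=time.sleep):
    """Poll until every job reports a terminal state.

    get_states is a callable returning a dict of job name -> state string.
    """
    terminal_states = {'succeeded', 'failed'}
    while True:
        states = get_states()
        pending = [name for name, state in states.items()
                   if state not in terminal_states]
        if not pending:
            return states
        sleep(poll_seconds)


# Simulated snapshots of two jobs, used in place of a real service
snapshots = iter([
    {'job_1': 'queued', 'job_2': 'running'},
    {'job_1': 'succeeded', 'job_2': 'running'},
    {'job_1': 'succeeded', 'job_2': 'failed'},
])

# Replace the real sleep with a no-op so the demo finishes immediately
final_states = wait_until_done(lambda: next(snapshots), sleep=lambda seconds: None)
print(final_states)
```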

If you need to interrupt the experiment before it is complete, you can delete all the queued and running jobs on the cluster.

In [None]:
experiment_utils.delete_jobs_in_experiment(execution_state=models.ExecutionState.queued)
experiment_utils.delete_jobs_in_experiment(execution_state=models.ExecutionState.running)

Define an extractor that pulls the desired metric from each job's log file.
- In this example, we extract the number between "`INFO:root:Accuracy @3 =`" and "`%`".

In [None]:
metric_extractor = utils.job.MetricExtractor(
    output_dir_id='ALL',
    logfile='TrainTestClassifier.log',
    # Raw string, so the backslash reaches the regex engine unchanged
    regex=r'INFO:root:Accuracy @3 = (.*?)\%')
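
To check the pattern before running it against real logs, we can apply the same regular expression, written as a raw string, to a sample line with Python's `re` module. The log text below is made up for the example, and the sketch says nothing about how `MetricExtractor` applies the pattern internally.

```python
import re

pattern = re.compile(r'INFO:root:Accuracy @3 = (.*?)\%')

# Hypothetical log content in the format the pattern expects
sample_log = (
    'INFO:root:Reading data\n'
    'INFO:root:Accuracy @3 = 87.50%\n'
)

match = pattern.search(sample_log)
# The single capture group holds the text between "= " and "%"
accuracy = float(match.group(1)) if match else None
print(accuracy)
```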

We get the metric values from the log files of the finished jobs.

In [None]:
# Get each job's metrics
results = experiment_utils.get_metrics_for_jobs(jobs, metric_extractor)

# Sort them in decreasing order.
results.sort(key=lambda r: r['metric_value'], reverse=True)

# Print the results
for result in results:
    print("Job {0} completed with metric value {1}".format(result['job_name'], result['metric_value']))
    
# Print the best job
print("Best job: {0} with parameters {1}".format(
    results[0]['job_name'], 
    {ev.name: ev.value for ev in results[0]['job'].environment_variables}
))
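
If a job fails before it logs the metric, its `metric_value` may be missing, in which case sorting or indexing `results[0]` could raise an error. Whether `get_metrics_for_jobs` returns `None` in that situation is an assumption here, not something the notebook states. A defensive ranking under that assumption might look like this, with hypothetical sample data.

```python
def rank_results(results):
    """Return results that have a metric, best first."""
    scored = [r for r in results if r.get('metric_value') is not None]
    return sorted(scored, key=lambda r: float(r['metric_value']), reverse=True)


# Hypothetical results; job_b stands for a job that produced no metric
sample_results = [
    {'job_name': 'job_a', 'metric_value': 71.2},
    {'job_name': 'job_b', 'metric_value': None},
    {'job_name': 'job_c', 'metric_value': 88.9},
]

ranked = rank_results(sample_results)
best_job_name = ranked[0]['job_name'] if ranked else None
print(best_job_name)
```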

To tear down the experiment and all related resources, go to [the last notebook](06_Tear_Down.ipynb).