## Introduction 

This notebook provides a generic tool for creating distributed jobs on a cluster of machines or on several clusters. The jobs can range from feature extraction to other training jobs, and the machine type should be selected per type of job.

**The following focuses on a private image from ACR (Azure Container Registry).**


### Each job has the following attributes:
All jobs are identical: each one receives the input queues and other storage-related parameters.

### Image 
The image should be created and validated before the experiments start.
The service provides the capability to create an image from a base image with specific conda or pip packages.
The best practice is to use conda packages where available, since conda handles their dependencies.
### Main Script file
The script is created by the relevant algorithm team and holds the main logic. It reads from the input files/folders, performs its work, and writes output according to its internal logic.
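The queue names used later (`<input-queue-name>`, `<input-queue-name>-success`, `<input-queue-name>-fail`) suggest that the main script pulls work items from the input queue and routes each one to a success or fail queue. The sketch below illustrates that pattern under this assumption only: `queue.Queue` stands in for Azure Storage queues, and `process` and `drain` are hypothetical names, not part of the algorithm team's script.

```python
import queue

def process(message):
    """Hypothetical per-message work; replace with the real feature extraction."""
    if not message.strip():
        raise ValueError("empty message")
    return message.upper()

def drain(input_q, success_q, fail_q, handler=process):
    """Pull messages until the input queue is empty, routing each one
    to the success or fail queue depending on whether the handler raises."""
    while True:
        try:
            msg = input_q.get_nowait()
        except queue.Empty:
            break  # nothing left to do
        try:
            handler(msg)
            success_q.put(msg)
        except Exception as err:
            # keep the original message plus the reason, so it can be retried later
            fail_q.put('{0}|{1}'.format(msg, err))

# in-memory stand-ins for the input queue and its -success / -fail queues
inq, okq, badq = queue.Queue(), queue.Queue(), queue.Queue()
for m in ['a.jpg', '', 'b.jpg']:
    inq.put(m)
drain(inq, okq, badq)
print(okq.qsize(), badq.qsize())  # 2 1
```

Because every node runs the same loop against the same input queue, adding nodes increases throughput without any coordination between them.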

In [None]:
# Check core SDK version number
import azureml.core
import hdutils as utils

print("SDK version:", azureml.core.VERSION)

## Subscription connection - load configuration
Only needed when you have more than one active subscription.

In [None]:
#!az login

In [None]:
#!az account set --subscription "XXXXXXX"

#### Initialize Workspace

Initialize a workspace object from persisted configuration.
The config file holds the subscription, resource group, and workspace name.

In [None]:
from azureml.core.workspace import Workspace

ws = Workspace.from_config()
print('Workspace name: ' + ws.name, 
      'Azure region: ' + ws.location, 
      'Subscription id: ' + ws.subscription_id, 
      'Resource group: ' + ws.resource_group, sep = '\n')
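`Workspace.from_config()` reads a `config.json` file with three keys: `subscription_id`, `resource_group`, and `workspace_name`. As a sketch, the snippet below writes such a file with placeholder values into a temporary folder and reads it back; in practice the file is usually downloaded from the Azure portal and placed next to the notebook.

```python
import json
import os
import tempfile

# keys expected in config.json; the values are placeholders
config = {
    "subscription_id": "<subscription id>",
    "resource_group": "<resource group>",
    "workspace_name": "<workspace name>",
}

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "config.json")
    with open(path, "w") as f:
        json.dump(config, f, indent=2)
    with open(path) as f:
        loaded = json.load(f)

print(sorted(loaded))  # ['resource_group', 'subscription_id', 'workspace_name']
```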

### Create / Update project folder
This assumes the entry script and its supporting scripts have already been pushed to the Jupyter VM.
Additional settings may be needed to match the specific file locations of the scripts.

In [None]:
import os

project_folder = './mobilenet'
common = './common'
supporting_scripts = '<any supporting scripts>'
entry_script = '<main entry script>.py'
# sub folder of supporting scripts - no need to create the folder
ss_folder = '{0}/{1}'.format(common,supporting_scripts)

os.makedirs(project_folder, exist_ok=True)


In [None]:
import shutil
# assume the supporting scripts are located at:
local_ss_folder = './{0}'.format(supporting_scripts)

# copy the entry script (assumed to be in the current folder) into the project folder
#shutil.copy(entry_script, project_folder)
# recursive copy of the supporting scripts folder - fails if the destination folder already exists
#shutil.copytree(local_ss_folder, ss_folder)
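Because `shutil.copytree` fails when the destination exists, re-running the copy cell breaks. A minimal sketch of a re-runnable version is shown below, assuming Python 3.8 or later (for `dirs_exist_ok=True`); `stage_scripts` is a hypothetical helper and the file names in the demo are illustrative only.

```python
import os
import shutil
import tempfile

def stage_scripts(entry_script, project_folder, src_support, dst_support):
    """Copy the entry script into the project folder and the supporting
    scripts folder to its destination; safe to call more than once."""
    os.makedirs(project_folder, exist_ok=True)
    shutil.copy(entry_script, project_folder)
    # dirs_exist_ok=True overwrites files instead of raising FileExistsError
    shutil.copytree(src_support, dst_support, dirs_exist_ok=True)

with tempfile.TemporaryDirectory() as tmp:
    # build a tiny fake layout: main.py plus a helpers/ folder
    entry = os.path.join(tmp, 'main.py')
    support = os.path.join(tmp, 'helpers')
    os.makedirs(support)
    open(entry, 'w').close()
    open(os.path.join(support, 'util.py'), 'w').close()

    project = os.path.join(tmp, 'mobilenet')
    dst = os.path.join(tmp, 'common', 'helpers')
    for _ in range(2):  # the second call would fail without dirs_exist_ok
        stage_scripts(entry, project, support, dst)
    staged = sorted(os.listdir(project)) + sorted(os.listdir(dst))

print(staged)  # ['main.py', 'util.py']
```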


#### Create Experiment

**Experiment** is a logical container in an Azure ML Workspace. It hosts run records which can include run metrics and output artifacts from your experiments.

In [None]:
experiment_name = '<meaningful name>'

from azureml.core import Experiment
exp = Experiment(workspace=ws, name=experiment_name)

In [None]:
# get the default datastore
ds = ws.get_default_datastore()
print(ds.name, ds.datastore_type, ds.account_name, ds.container_name)

#### Create or Attach existing AmlCompute
You will need to create a [compute target](https://docs.microsoft.com/azure/machine-learning/service/concept-azure-machine-learning-architecture#compute-target) for training your model. In this tutorial, you create `AmlCompute` as your training compute resource.

**Creation of AmlCompute takes approximately 5 minutes.** If the AmlCompute with that name is already in your workspace this code will skip the creation process.

As with other Azure services, there are limits on certain resources (e.g. AmlCompute) associated with the Azure Machine Learning service. Please read [this article](https://docs.microsoft.com/en-us/azure/machine-learning/service/how-to-manage-quotas) on the default limits and how to request more quota.

#### Prepare input & output



##### Specific Storage Account
Used to host the output of the jobs (.npy files).

In [None]:
from azureml.core import Datastore
prvt_ds_name = '<storage name>'
prvt_ds_container_name = '<container name>'
prvt_ds_key = '<sa key>'
prvt_ds = Datastore.register_azure_blob_container(workspace=ws, 
                                             datastore_name=prvt_ds_name, 
                                             container_name=prvt_ds_container_name,
                                             account_name=prvt_ds_name, 
                                             account_key=prvt_ds_key,
                                             create_if_not_exists=True)

In [None]:
# ensure the output & work directories exist
prvt_ds.upload(src_dir='output', target_path="./<output>")
prvt_ds.upload(src_dir='output', target_path="./<in case transient area is required by your script>")

#### Create references to the output and work directories
These references are passed to the script as parameters.

In [None]:
# Using specific storage account
work_path = prvt_ds.path('<match from above box>')
out_path = prvt_ds.path('<match from above box>')

print(work_path)
print(out_path)

## Private Image
The image is pulled from a specific ACR. The registry credentials are currently stored in plain text in the notebook; a better way to handle them is still needed.

In [None]:
acr_username='<private acr user name>'
acr_pass = '<private acr password>'
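One way to keep the registry credentials out of the notebook is to read them from environment variables set on the Jupyter VM. The sketch below assumes the hypothetical variable names `ACR_USERNAME` and `ACR_PASSWORD`; the workspace's default Key Vault (`ws.get_default_keyvault()`) is another option worth evaluating.

```python
import os

def read_acr_credentials(user_var='ACR_USERNAME', pass_var='ACR_PASSWORD'):
    """Read registry credentials from environment variables
    (hypothetical names) instead of hard-coding them."""
    user = os.environ.get(user_var)
    password = os.environ.get(pass_var)
    if not user or not password:
        raise RuntimeError('set {0} and {1} before running this cell'.format(user_var, pass_var))
    return user, password

# usage in the notebook:
# acr_username, acr_pass = read_acr_credentials()
```

Failing loudly when a variable is missing avoids submitting runs that later fail with an opaque image-pull error.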

#### Run Configuration
The run configuration specifies the image name, its registry credentials, GPU support, the Python interpreter, etc. Its environment is passed to the estimator as the environment definition.

In [None]:
from azureml.core import RunConfiguration
from azureml.train.estimator import *

rc = RunConfiguration()
rc.environment.docker.enabled = True

# this is an image available in Docker Hub
#rc.environment.docker.base_image = '/feature_extraction:0.1'

# point to an image in a private ACR
rc.environment.docker.base_image = "<image name>:<image tag>"
rc.environment.docker.base_image_registry.address = "<private acr>"
rc.environment.docker.base_image_registry.username = acr_username
rc.environment.docker.base_image_registry.password = acr_pass
rc.environment.docker.gpu_support = True

# don't let the system build a new conda environment
rc.environment.python.user_managed_dependencies = True
# point to an existing python environment 
rc.environment.python.interpreter_path = '/usr/bin/python3'

### Script Params & Queue
The experiment receives its parameters through the estimator configuration, provided as a dictionary.

In [None]:
## specific to queues - need to provide the storage account and key
account_name = '<sa for queues>'
account_key = '<sa key>'

In [None]:
from azure.storage.queue import QueueService

input_queue = '<input-queue-name>'
# the main script is using same naming convention
success_queue = '{0}-success'.format(input_queue)
fail_queue = '{0}-fail'.format(input_queue)
queue_service = QueueService(account_name=account_name, account_key=account_key)

#### Load data into queues

In [None]:
input_file = '<file to be broken to queue messages>.csv'
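# NOTE: the helper's name suggests the opposite direction (queue -> CSV, as in the results export below);
# verify that this call loads the CSV rows into input_queue
utils.queue_to_csv(input_file,input_queue,queue_service)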
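The message format produced by `hdutils` is not shown here. As an illustration only, the sketch below assumes one queue message per CSV data row, encoded as JSON keyed by the header; `csv_to_messages` is a hypothetical helper. Each string could then be sent with `queue_service.put_message(input_queue, msg)`, keeping in mind that Azure queue messages are limited to 64 KB.

```python
import csv
import io
import json

def csv_to_messages(text):
    """Turn each CSV data row into one JSON message string keyed by the header."""
    reader = csv.DictReader(io.StringIO(text))
    return [json.dumps(row) for row in reader]

sample = "path,label\nimg/001.jpg,cat\nimg/002.jpg,dog\n"
messages = csv_to_messages(sample)
print(len(messages))  # 2
print(messages[0])    # {"path": "img/001.jpg", "label": "cat"}
```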

#### define the script params

In [None]:
script_params_q = {    
    '-inqueue': input_queue, # input q
    '-sa' : account_name, # storage account of queue
    '-sakey':account_key, # key of the storage account
    '-npy': out_path.as_mount(), #output blob
    '-workdir': work_path.as_mount() #temporary folder (blob)    
    }
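The estimator turns `script_params` into command-line arguments for the entry script, and at run time the `as_mount()` references resolve to local mount paths. A minimal sketch of how an entry script could parse them with `argparse` is shown below; the option names mirror `script_params_q`, while `parse_args` and the sample values are illustrative assumptions.

```python
import argparse

def parse_args(argv=None):
    """Parse the arguments passed through the estimator's script_params."""
    parser = argparse.ArgumentParser(description='queue-driven worker')
    # single-dash long options: argparse derives dest by stripping the '-'
    parser.add_argument('-inqueue', required=True, help='input queue name')
    parser.add_argument('-sa', required=True, help='storage account of the queues')
    parser.add_argument('-sakey', required=True, help='storage account key')
    parser.add_argument('-npy', required=True, help='mounted output folder')
    parser.add_argument('-workdir', required=True, help='mounted temporary folder')
    return parser.parse_args(argv)

# simulate the command line built from the dictionary
args = parse_args(['-inqueue', 'frames', '-sa', 'acct', '-sakey', 'key',
                   '-npy', '/mnt/out', '-workdir', '/mnt/work'])
print(args.inqueue, args.npy)  # frames /mnt/out
```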

##### RandomParameterSampling - run once

In [None]:
from azureml.train.hyperdrive import RandomParameterSampling, BanditPolicy, HyperDriveRunConfig, PrimaryMetricGoal
ps = RandomParameterSampling( {  } )

### Multi Experiment Run 
Loops over the available compute clusters and submits an experiment run to each of them.

The naming convention for cluster names:
* Prefix is the VM type, e.g. NC6
* L - low priority VM
* D - Dedicated VM
* Number of maximum nodes
* Random suffix 2 letters & 2 digits
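The naming convention above can be captured in a pair of helpers. This is a sketch: the exact layout (hyphen separators, suffix as two digits followed by two lowercase letters) is assumed from the example name `NC6-L100-21fj` used later in this notebook, and `make_cluster_name` / `parse_cluster_name` are hypothetical helpers, not functions from `hdutils`.

```python
import random
import re
import string

NAME_RE = re.compile(r'^(?P<vm>[A-Za-z0-9]+)-(?P<tier>[LD])(?P<nodes>\d+)-(?P<suffix>\d{2}[a-z]{2})$')

def make_cluster_name(vm_size, low_priority, max_nodes, rng=random):
    """Build a name such as 'NC6-L100-21fj' (layout assumed from the example name)."""
    tier = 'L' if low_priority else 'D'
    suffix = (''.join(rng.choice(string.digits) for _ in range(2)) +
              ''.join(rng.choice(string.ascii_lowercase) for _ in range(2)))
    return '{0}-{1}{2}-{3}'.format(vm_size, tier, max_nodes, suffix)

def parse_cluster_name(name):
    """Split a name into its parts; returns None if it does not follow the convention."""
    m = NAME_RE.match(name)
    if m is None:
        return None
    return {'vm': m.group('vm'), 'low_priority': m.group('tier') == 'L',
            'max_nodes': int(m.group('nodes')), 'suffix': m.group('suffix')}

print(parse_cluster_name('NC6-L100-21fj'))
# {'vm': 'NC6', 'low_priority': True, 'max_nodes': 100, 'suffix': '21fj'}
```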


In [None]:
# show the currently available clusters
compute_targets = ws.compute_targets
for name, ct in compute_targets.items():
    print(name, ct.type, ct.provisioning_state)   

In [None]:
## Print experiment details
print('Script directory {0}'.format(project_folder))
print('Script {0}'.format(entry_script))

In [None]:
# Create compute clusters - configuration
node_count = 72
max_node_count = 100
num_of_low_clusters = 5
num_of_dedicated_clusters = 9
idle_timeout = 30 # seconds

In [None]:
# if the py file changes, this is the way to reload the module
#import importlib
#importlib.reload(utils)

In [None]:
clust = utils.create_low_clusters(ws,num_of_low_clusters,max_node_count,idle_timeout)
clust = utils.create_dedicated_clusters(ws,num_of_dedicated_clusters,max_node_count,idle_timeout)

In [None]:
# loop over available clusters
from azureml.train.dnn import TensorFlow
from azureml.train.hyperdrive import RandomParameterSampling, BanditPolicy, HyperDriveRunConfig, PrimaryMetricGoal
from time import gmtime, strftime, sleep

print("Submitting experiments::")
print(strftime("%Y-%m-%d %H:%M:%S", gmtime()))

cluster_size = node_count
compute_targets = ws.compute_targets
htrDic = {}

for name, ct in compute_targets.items():
    
    est = TensorFlow(source_directory=project_folder,
                 script_params=script_params_q,
                 compute_target=ct,
                 entry_script=entry_script,
                 environment_definition=rc.environment,
                 source_directory_data_store=ds
                )
    htc = HyperDriveRunConfig(estimator=est, 
                          hyperparameter_sampling=ps, 
                          primary_metric_name='wsc', 
                          primary_metric_goal=PrimaryMetricGoal.MAXIMIZE, 
                          max_total_runs=cluster_size,
                          max_concurrent_runs=cluster_size)    
    htrDic[name] = exp.submit(config=htc)
    print("Submitted on cluster: {0} at {1}".format(name,strftime("%Y-%m-%d %H:%M:%S", gmtime())))
    # wait 46 seconds (about the time it takes to submit 70 jobs) before the next cluster
    sleep(46)
    
print("Finished submitting experiments::")
print(strftime("%Y-%m-%d %H:%M:%S", gmtime()))

In [None]:
# Delete all compute - once experiment is completed
utils.remove_compute(ws.compute_targets)

In [None]:
# export results to a file
output_file = '<output file to be created>'
auxq = '<if required>'
#utils.push_results_to_file(output_file,success_queue,auxq,queue_service)
utils.queue_to_csv(output_file,success_queue,queue_service)

In [None]:
# cancel all submitted experiment runs
for item in htrDic:
    print('canceling {}'.format(item))
    htrDic[item].cancel()

In [None]:
# cancel a single run by cluster name (the key must exist in htrDic)
htrDic['NC6low100-532Vgb'].cancel()

### Single Experiment run

#### Submitting an experiment
For each Experiment:

1. establish compute
2. create an estimator
3. create Hyper Drive config
4. submit

In [None]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

# set a cluster to test on
cluster_name = 'NC6-L100-21fj'


try:
    compute_target = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing compute target.')
except ComputeTargetException:
    print('Please run the configuration notebook, or contact DevOps')
    raise

print(compute_target.status.serialize())

#### 2. Estimator Configuration
An estimator can be generic or framework-specific, such as `TensorFlow`. The sample below uses the TensorFlow estimator with the private-image environment defined above; the parallelism comes from HyperDrive, which places each child run on its own node of the cluster.

In [None]:
from azureml.train.dnn import TensorFlow
est = TensorFlow(source_directory=project_folder,
                 script_params=script_params_q,
                 compute_target=compute_target,
                 entry_script=entry_script,
                 environment_definition=rc.environment,
                 source_directory_data_store=ds
                )

#### 3. HyperDriveRunConfig
`max_total_runs` and `max_concurrent_runs` should equal the number of nodes in the cluster, up to a maximum of **100**.
Since this is a test run, it is best to test with a single-digit number of nodes.

In [None]:
from azureml.train.hyperdrive import RandomParameterSampling, BanditPolicy, HyperDriveRunConfig, PrimaryMetricGoal
htc = HyperDriveRunConfig(estimator=est, 
                          hyperparameter_sampling=ps, 
                          primary_metric_name='wsc', 
                          primary_metric_goal=PrimaryMetricGoal.MAXIMIZE, 
                          max_total_runs=1,
                          max_concurrent_runs=1)
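The rule above (one run per node, capped at 100) and the resulting fleet-wide concurrency can be checked with a little arithmetic. The sketch below uses the values from the cluster configuration cell (72 nodes, 5 low-priority plus 9 dedicated clusters) and assumes every cluster actually scales to its full size, which quota limits may prevent; `hyperdrive_limits` is a hypothetical helper.

```python
def hyperdrive_limits(cluster_nodes, cap=100):
    """Return (max_total_runs, max_concurrent_runs): one run per node, capped at `cap`."""
    if cluster_nodes < 1:
        raise ValueError('a cluster needs at least one node')
    runs = min(cluster_nodes, cap)
    return runs, runs

nodes_per_cluster = 72   # node_count in the configuration cell
num_clusters = 5 + 9     # num_of_low_clusters + num_of_dedicated_clusters
per_cluster, _ = hyperdrive_limits(nodes_per_cluster)
print(per_cluster, per_cluster * num_clusters)  # 72 1008
print(hyperdrive_limits(3))                     # small test run: (3, 3)
```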

#### 4. Submit
Submitting the HyperDrive configuration creates another run under the experiment. For large-scale experiments, it is best to monitor progress from the portal (or by examining the queues).

In [None]:
print("Running on cluster: " + cluster_name)
htr = exp.submit(config=htc)
print(htr)

##### Cancel run when required

In [None]:
# cancel the test run submitted above

htr.cancel()

#### Monitor progress (via queues)

In [None]:
from time import gmtime, strftime

# print the approximate number of messages in each queue
print(strftime("%Y-%m-%d %H:%M:%S", gmtime()))
auxq = 'soccer-feature-input'
for label, queue_name in [('todo', input_queue), ('good', success_queue),
                          ('bad', fail_queue), ('aux', auxq)]:
    metadata = queue_service.get_queue_metadata(queue_name)
    print('{0}:{1}'.format(label, metadata.approximate_message_count))
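The raw queue counts can be turned into a progress and failure summary. This sketch assumes that every input message ends up in exactly one of the success or fail queues; since `approximate_message_count` is only an estimate, the percentages are approximate too. `progress` is a hypothetical helper and the numbers in the example are made up.

```python
def progress(todo, good, bad):
    """Summarize queue counts as completed work and failure rate (percentages)."""
    done = good + bad
    total = todo + done
    pct = 100.0 * done / total if total else 0.0
    fail_rate = 100.0 * bad / done if done else 0.0
    return {'done': done, 'total': total,
            'pct_done': round(pct, 1), 'fail_rate': round(fail_rate, 1)}

print(progress(todo=600, good=380, bad=20))
# {'done': 400, 'total': 1000, 'pct_done': 40.0, 'fail_rate': 5.0}
```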

In [None]:
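# get_messages hides the returned messages for the visibility timeout (30 seconds by default);
# use peek_messages to inspect them without hiding them
messages = queue_service.get_messages(fail_queue,8)
for message in messages:
    print(message.content)
    # delete from the same queue the message was read from
    #queue_service.delete_message(fail_queue, message.id, message.pop_receipt)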

## Canceling/Failing an experiment - by ID

In [None]:
from azureml.core import get_run 

r = get_run(experiment=exp, run_id="fe-Hokey_1551955099682", rehydrate=True)

In [None]:
r.fail()

In [None]:
compute_targets = ws.compute_targets
for name, ct in compute_targets.items():
    print(name, ct.type, ct.provisioning_state)