In [1]:
from azureml.core import Workspace, Experiment, Dataset
from azureml.core.runconfig import RunConfiguration, MpiConfiguration
from azureml.train.estimator import Estimator
from azureml.core.compute import ComputeTarget, AmlCompute

## Get workspace

In [2]:
ws = Workspace.from_config()
ws

Workspace.create(name='benchy', subscription_id='6560575d-fa06-4e7d-95fb-f962e74efd7a', resource_group='copeters_benchmarking')

## Setup dataset

In [3]:
if 'weather-files' not in ws.datasets:
    ds = Dataset.File.from_files('https://azureopendatastorage.blob.core.windows.net/isdweatherdatacontainer/ISDWeather/*/*/*.parquet', validate=False)
    ds = ds.register(ws, 'weather-files')
else:
    ds = ws.datasets['weather-files']
    
ds

{
  "source": [
    "https://azureopendatastorage.blob.core.windows.net/isdweatherdatacontainer/ISDWeather/*/*/*.parquet"
  ],
  "definition": [
    "GetFiles"
  ],
  "registration": {
    "id": "5a3248d1-e0ee-49b1-8e2b-d5167afa0e6c",
    "name": "weather-files",
    "version": 1,
    "workspace": "Workspace.create(name='benchy', subscription_id='6560575d-fa06-4e7d-95fb-f962e74efd7a', resource_group='copeters_benchmarking')"
  }
}

## Setup parameters 

In [4]:
# {compute_name: (vCPUs, RAM, DISK, $/hr)}

MAX_VCPUS = 1000
MIN_RAM   = 100
MAX_RAM   = 1000

computes = {
    #'STANDARD_D12'    : (4,  28,  200,  .386),
    'STANDARD_D12_V2' : (4,  28,  200,  .370),
    #'STANDARD_D13'    : (8,  56,  400,  .771),
    'STANDARD_D13_V2' : (8,  56,  400,  .741),
    'STANDARD_DS12_V2': (4,  28,   56,  .370),
    'STANDARD_DS13_V2': (8,  56,  112,  .741),
    'STANDARD_DS15_V2': (20, 140, 280, 1.852),
    'STANDARD_DS5_V2' : (16,  56, 112, 1.170),
    #'STANDARD_F32S_V2': (32, 64,  256, 1.360)
}

In [5]:
vm_sizes = list(computes)
nodeses  = list(reversed([1, 2, 5, 10, 20]))

## Create clusters

In [6]:
for vm_size in vm_sizes:
    ct_name = vm_size.replace('STANDARD_', '').replace('_', '-')
    if ct_name not in ws.compute_targets:
        # create config for Azure ML cluster
        # change properties as needed
        # final default values for blog tbd - need to benchmark and minimize cost
        config = AmlCompute.provisioning_configuration(
                 vm_size                 = vm_size,
                 max_nodes               = max(nodeses),
                 vnet_resourcegroup_name = ws.resource_group,
                 vnet_name               = 'bench-vnet',
                 subnet_name             = 'default'
        )

        ct = ComputeTarget.create(ws, ct_name, config)
        ct.wait_for_completion(show_output=True)    
    else:
        print(f'{ct_name} already exists')

D12-V2 already exists
D13-V2 already exists
DS12-V2 already exists
DS13-V2 already exists
DS15-V2 already exists
DS5-V2 already exists


## Submit jobs

In [7]:
exp = Experiment(ws, 'describe')

for nodes in nodeses:
    print(f'\nNodes: {nodes}')
    for vm_size in vm_sizes:
        ct_name = vm_size.replace('STANDARD_', '').replace('_', '-')
        
        vcpus, ram, disk, wage = computes[vm_size]
        
        vcpus *= nodes
        ram   *= nodes
        disk  *= nodes
        wage  *= nodes
        
        if vcpus < MAX_VCPUS and ram > MIN_RAM and ram < MAX_RAM:
            print(f'\t| VM Size - {vm_size:16} | vCPUs - {vcpus} | RAM - {ram}GB | WAGE - ${round(wage, 3)}/hr |')
            
            est = Estimator('code', 
                            compute_target=ws.compute_targets[ct_name], 
                            entry_script='runDask.py', 
                            conda_dependencies_file='environment.yml', 
                            #script_params={'--datastore': ws.get_default_datastore()},
                            inputs=[ds.as_named_input('weather').as_download('/tmp/noaa')],
                            node_count=nodes,
                            distributed_training=MpiConfiguration())
            
            print('\t\tSubmitting run...')
            run = exp.submit(est)
            run.log('nodes', nodes)
            run.log('vm_size', vm_size)
            run.log('vcpus', vcpus)
            run.log('ram', ram)
            run.log('disk', disk)
            run.log('wage', wage)
            print('\t\tRun submitted...')


Nodes: 20
	| VM Size - STANDARD_D12_V2  | vCPUs - 80 | RAM - 560GB | WAGE - $7.4/hr |
		Submitting run...
		Run submitted...
	| VM Size - STANDARD_DS12_V2 | vCPUs - 80 | RAM - 560GB | WAGE - $7.4/hr |
		Submitting run...
		Run submitted...

Nodes: 10
	| VM Size - STANDARD_D12_V2  | vCPUs - 40 | RAM - 280GB | WAGE - $3.7/hr |
		Submitting run...
		Run submitted...
	| VM Size - STANDARD_D13_V2  | vCPUs - 80 | RAM - 560GB | WAGE - $7.41/hr |
		Submitting run...
		Run submitted...
	| VM Size - STANDARD_DS12_V2 | vCPUs - 40 | RAM - 280GB | WAGE - $3.7/hr |
		Submitting run...
		Run submitted...
	| VM Size - STANDARD_DS13_V2 | vCPUs - 80 | RAM - 560GB | WAGE - $7.41/hr |
		Submitting run...
		Run submitted...
	| VM Size - STANDARD_DS5_V2  | vCPUs - 160 | RAM - 560GB | WAGE - $11.7/hr |
		Submitting run...
		Run submitted...

Nodes: 5
	| VM Size - STANDARD_D12_V2  | vCPUs - 20 | RAM - 140GB | WAGE - $1.85/hr |
		Submitting run...
		Run submitted...
	| VM Size - STANDARD_D13_V2  | vCPUs - 40 

## Visualize data

In [18]:
exp = Experiment(ws, 'describe')
exp

Name,Workspace,Report Page,Docs Page
describe,benchy,Link to Azure Machine Learning studio,Link to Documentation


In [22]:
for run in list(exp.get_runs()):
    runid   = run.id
    print(runid)
    metrics = run.get_metrics()
    nodes   = metrics['nodes']
    vm_size = metrics['vm_size']
    vcpus   = metrics['vcpus']
    wage    = metrics['wage']
    ram     = metrics['ram']
    disk    = metrics['disk']

    if run.get_status() == 'Completed' and 'duration' in metrics and 'cost' not in metrics:
        cost = metrics['duration']*wage/60/60
        print(f'Total cost: ${cost}')
        run.log('cost', f'${round(cost, 4)}')

describe_1576980205_e36001e3
describe_1576979922_b2dd4f0a
describe_1576980259_b3689bd0
describe_1576980099_bd38c24f
describe_1576980368_4a5c951c
describe_1576980012_2fd2d782
describe_1576980233_12d16c65
describe_1576980152_554b09c7
describe_1576979985_287f17a1
describe_1576980313_1b85b1de
describe_1576980391_238f6d7a
Total cost: $0.6873880436007181
describe_1576980340_2c65a67a
describe_1576980067_15046e2c
describe_1576979958_809cdeb8
describe_1576980285_d6f380a2
describe_1576980179_63ccf9b6
describe_1576980125_ca10b665
describe_1576980040_8a1a0f5b


In [14]:
run.get_metrics()

{'nodes': 20,
 'vm_size': 'STANDARD_D12_V2',
 'vcpus': 80,
 'ram': 560,
 'disk': 4000,
 'wage': 7.4}

## Kill clusters

In [None]:
for ct in ws.compute_targets:
    #ws.compute_targets[ct].delete()
    pass

ws.compute_targets

## Cancel runs

In [None]:
exp = Experiment(ws, 'test6')

for run in exp.get_runs():
    if run.get_status() == 'Running':
        run.cancel()