Copyright (c) Microsoft Corporation. All rights reserved.

Licensed under the MIT License.

## Agent Training – Single Job Set

In this notebook we train an RL agent to solve the resource allocation problem described in the exploration notebook.

In this implementation, we train using a single job set: the agent sees the same set of jobs in every simulation throughout training.

The point here is to show that the agent can learn a policy that outperforms both the random and shortest-job-first policies shown in the exploration notebook, but that this policy does not generalize to an unseen job set different from the one used during training.

We assume that you are running this notebook in a Compute Instance on Azure ML with the Azure ML SDK version 1.17.0.

In [1]:
import azureml.core

# Check core SDK version number
print('Azure Machine Learning SDK Version:', azureml.core.VERSION)

Azure Machine Learning SDK Version: 1.17.0


Get a reference to your Azure ML Workspace.

In [2]:
from azureml.core import Workspace

ws = Workspace.from_config()

Create an Azure ML Experiment to track the agent training runs in your Workspace.

In [3]:
from azureml.core.experiment import Experiment

# Experiment name
experiment_name = 'deeprm_single_jobset_rllib_pg'
exp = Experiment(workspace=ws, name=experiment_name)

Create an Azure ML Compute Target for the head node of the Ray cluster. The actual RLLib training process runs on the Ray head node. Here we define a Compute Target with a single node, using a virtual machine type with a GPU to accelerate model training.

In [4]:
from azureml.core.compute import AmlCompute, ComputeTarget

# Choose a name for the Ray head cluster
head_compute_name = 'head-gpu'
head_compute_min_nodes = 0
head_compute_max_nodes = 1

# This example uses GPU VM. For using CPU VM, set SKU to STANDARD_D2_V2
head_vm_size = 'STANDARD_NC6'

# Virtual network name
vnet_name = 'your_vnet'

if head_compute_name in ws.compute_targets:
    head_compute_target = ws.compute_targets[head_compute_name]
    if head_compute_target and type(head_compute_target) is AmlCompute:
        if head_compute_target.provisioning_state == 'Succeeded':
            print('found head compute target. just use it', head_compute_name)
        else: 
            raise Exception(
                'found head compute target but it is in state', head_compute_target.provisioning_state)
else:
    print('creating a new head compute target...')
    provisioning_config = AmlCompute.provisioning_configuration(
        vm_size=head_vm_size,
        min_nodes=head_compute_min_nodes, 
        max_nodes=head_compute_max_nodes,
        vnet_resourcegroup_name=ws.resource_group,
        vnet_name=vnet_name,
        subnet_name='default')

    # Create the cluster
    head_compute_target = ComputeTarget.create(ws, head_compute_name, provisioning_config)
    
    # Can poll for a minimum number of nodes and for a specific timeout. 
    # If no min node count is provided it will use the scale settings for the cluster
    head_compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)
    
    # For a more detailed view of current AmlCompute status, use get_status()
    print(head_compute_target.get_status().serialize())

found head compute target. just use it head-gpu


Create an Azure ML Compute Target for the worker nodes of the Ray cluster. Ray worker nodes run the environment rollout processes, where the RL agent interacts with the RL environment, executing actions given by the policy model being trained and collecting state observations and rewards. Here we create a Compute Target with two nodes, using a virtual machine type with 4 CPU cores each.

We will have in total 8 CPUs from the worker nodes plus 6 from the head node. You will notice later that we specify 12 worker processes for RLLib, each allocating up to 1 CPU, and each running 5 RL environment simulations.
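
The CPU budget described above can be checked with a few lines of arithmetic. This is only a sanity-check sketch: the core counts and RLLib settings are copied from the text of this notebook, and the variable names are illustrative rather than part of the Azure ML or Ray APIs.

```python
# Illustrative arithmetic only; all values are taken from the text of this notebook.
worker_nodes = 2          # nodes in the worker compute target
cpus_per_worker_node = 4  # CPU cores per worker VM
head_node_cpus = 6        # CPU cores on the head VM

total_cpus = worker_nodes * cpus_per_worker_node + head_node_cpus

num_workers = 12          # RLLib rollout worker processes
num_cpus_per_worker = 1   # CPUs allocated to each rollout worker
num_envs_per_worker = 5   # environment simulations per rollout worker

cpus_for_rollouts = num_workers * num_cpus_per_worker
cpus_not_reserved = total_cpus - cpus_for_rollouts
total_envs = num_workers * num_envs_per_worker

print('Total CPUs in the cluster:', total_cpus)
print('CPUs allocated to rollout workers:', cpus_for_rollouts)
print('CPUs not reserved by rollout workers:', cpus_not_reserved)
print('Environment simulations across all workers:', total_envs)
```

With these numbers the 12 rollout workers fit within the 14 available CPUs, leaving 2 that are not reserved by rollout workers.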

In [5]:
# Choose a name for your Ray worker compute target
worker_compute_name = 'worker-cpu'
worker_compute_min_nodes = 0 
worker_compute_max_nodes = 2

# This example uses CPU VM. For using GPU VM, set SKU to STANDARD_NC6
worker_vm_size = 'STANDARD_D4_V3'

# Create the compute target if it hasn't been created already
if worker_compute_name in ws.compute_targets:
    worker_compute_target = ws.compute_targets[worker_compute_name]
    if worker_compute_target and type(worker_compute_target) is AmlCompute:
        if worker_compute_target.provisioning_state == 'Succeeded':
            print('found worker compute target. just use it', worker_compute_name)
        else: 
            raise Exception(
                'found worker compute target but it is in state', worker_compute_target.provisioning_state)
else:
    print('creating a new worker compute target...')
    provisioning_config = AmlCompute.provisioning_configuration(
        vm_size=worker_vm_size,
        min_nodes=worker_compute_min_nodes,
        max_nodes=worker_compute_max_nodes,
        vnet_resourcegroup_name=ws.resource_group,
        vnet_name=vnet_name,
        subnet_name='default')

    # Create the compute target
    worker_compute_target = ComputeTarget.create(ws, worker_compute_name, provisioning_config)
    
    # Can poll for a minimum number of nodes and for a specific timeout. 
    # If no min node count is provided it will use the scale settings for the cluster
    worker_compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)
    
    # For a more detailed view of current AmlCompute status, use get_status()
    print(worker_compute_target.get_status().serialize())

found worker compute target. just use it worker-cpu


Define a *WorkerConfiguration* using your worker compute target. We specify the number of nodes in the worker compute target to be used for training and the additional PIP packages to install on those nodes as part of setup. In this case, we define the same PIP packages as dependencies for both head and worker nodes. With this setup, the environment simulations will run directly on the worker compute nodes.

In [6]:
from azureml.contrib.train.rl import WorkerConfiguration

# Pip packages we will use for both head and worker
pip_packages=['ray[rllib]==0.8.5', 'torch'] # version of Ray tested with Azure ML

# Specify the Ray worker configuration
worker_conf = WorkerConfiguration(
    
    # Azure Machine Learning compute target to run Ray workers
    compute_target=worker_compute_target, 
    
    # Number of worker nodes
    node_count=2,
    
    # GPU
    use_gpu=False, 
    
    # PIP packages to use
    pip_packages=pip_packages
)

To facilitate reinforcement learning, Azure Machine Learning Python SDK provides a high level abstraction, the *ReinforcementLearningEstimator* class, which allows users to easily construct reinforcement learning run configurations for the underlying reinforcement learning framework. Reinforcement Learning in Azure Machine Learning supports the open source [Ray framework](https://ray.io/) and its highly customizable [RLLib](https://ray.readthedocs.io/en/latest/rllib.html#rllib-scalable-reinforcement-learning).

The *ReinforcementLearningEstimator* is used to submit a job to Azure Machine Learning to start the Ray experiment run. We define the training script parameters here that will be passed to the estimator.

Those parameters include:
- Parameters for the RL algorithm implemented in RLLib, and the underlying Ray infrastructure. We are using Policy Gradients in our implementation.
- Parameters for the RL environment. Here we specify only the simulation length and number of job sets.

The *ReinforcementLearningEstimator* specifies the location of the training scripts and associated files via the *source_directory* parameter. That directory contains the training script, specified by the *entry_script* parameter, and our custom environment file.

In [7]:
from azureml.contrib.train.rl import ReinforcementLearningEstimator, Ray

# Training script parameters
script_params = {
    # parameter for Ray
    '--default_ray_address': 'localhost:6379',
    # parameters for RLLib
    '--gamma': 0.99,
    '--num_gpus': 1,
    '--num_workers': 12,
    '--num_envs_per_worker': 5,
    '--num_cpus_per_worker': 1,
    '--use_pytorch': 1,
    '--timesteps_per_iteration': 100000,
    '--rollout_fragment_length': 50,
    '--train_batch_size': 500,
    '--lr': 0.00025,
    '--num_iterations': 300,
    # parameters for the resource allocation environment
    '--simu_len': 50,
    '--num_ex': 1
}

#  Reinforcement learning estimator
rl_estimator = ReinforcementLearningEstimator(
    
    # Location of source files
    source_directory='training_scripts',
    
    # Python script file
    entry_script='deeprm_rllib_pg_train.py',
    
    # Parameters to pass to the script file
    # Defined above.
    script_params=script_params,
    
    # The Azure Machine Learning compute target set up for Ray head nodes
    compute_target=head_compute_target,
    
    # Pip packages
    pip_packages=pip_packages,
    
    # GPU usage
    use_gpu=True,
    
    # Reinforcement learning framework. Currently must be Ray.
    rl_framework=Ray(),
    
    # Ray worker configuration defined above.
    worker_configuration=worker_conf,
    
    # How long to wait for whole cluster to start
    cluster_coordination_timeout_seconds=3600,
    
    # Maximum time for the whole Ray job to run
    # This will cut off the run after an hour
    #max_run_duration_seconds=3600,
    
    # Allow the docker container Ray runs in to make full use
    # of the shared memory available from the host OS.
    shm_size='16g'
)
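
The training script itself is not shown in this notebook. As a rough, hypothetical sketch of how an entry script could read the *script_params* above, assuming they arrive as command-line arguments, the snippet below parses a few of them with `argparse`. The function name `parse_training_args`, the defaults, and the types are assumptions for illustration; the real *deeprm_rllib_pg_train.py* may handle its arguments differently.

```python
import argparse


def parse_training_args(argv=None):
    # Hypothetical parser: argument names mirror keys of script_params,
    # but defaults and types are assumptions, not the actual training script.
    parser = argparse.ArgumentParser(
        description='Resource allocation agent training (sketch)')
    parser.add_argument('--gamma', type=float, default=0.99)
    parser.add_argument('--num_workers', type=int, default=1)
    parser.add_argument('--num_envs_per_worker', type=int, default=1)
    parser.add_argument('--lr', type=float, default=0.00025)
    parser.add_argument('--num_iterations', type=int, default=1)
    parser.add_argument('--simu_len', type=int, default=50)
    parser.add_argument('--num_ex', type=int, default=1)
    # parse_known_args skips any parameters this sketch does not declare
    args, _unknown = parser.parse_known_args(argv)
    return args


# Simulate a command line carrying a subset of script_params
example_argv = ['--gamma', '0.99', '--num_workers', '12',
                '--num_envs_per_worker', '5', '--simu_len', '50', '--num_ex', '1']
args = parse_training_args(example_argv)
print(args.gamma, args.num_workers, args.num_envs_per_worker,
      args.simu_len, args.num_ex)
```

Using `parse_known_args` keeps the sketch tolerant of parameters it does not list, such as `--default_ray_address`.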

Submit the estimator to start a run in Azure ML.

In [8]:
run = exp.submit(config=rl_estimator)

Monitor and view results using the Azure Machine Learning Jupyter widget to see the status of your runs in real time. In our implementation, the widget shows two child runs: one for head and one for workers.

When the *Status* in the table below shows *Running*, you can click on the run corresponding to the head node (the lower run number) and a new window will open showing the metrics being captured by the training script.

In [9]:
from azureml.widgets import RunDetails

RunDetails(run).show()


Wait for the run to complete before proceeding. The run may take about 1 hour to complete.

In [10]:
run.wait_for_completion()

{'runId': 'deeprm_single_jobset_rllib_pg_1604867900_74b66959',
 'status': 'Completed',
 'startTimeUtc': '2020-11-08T20:38:48.97646Z',
 'endTimeUtc': '2020-11-08T21:30:29.459505Z',
 'properties': {},
 'inputDatasets': [],
 'outputDatasets': [],
 'logFiles': {'azureml-logs/reinforcementlearning.txt': 'https://alvilcekmlwork8166615291.blob.core.windows.net/azureml/ExperimentRun/dcid.deeprm_single_jobset_rllib_pg_1604867900_74b66959/azureml-logs/reinforcementlearning.txt?sv=2019-02-02&sr=b&sig=gq1Itn4UBVsBsVFP4l8YL8b9kEAnvOpb%2FsHdcYIo7iU%3D&st=2020-11-08T20%3A28%3A33Z&se=2020-11-09T04%3A38%3A33Z&sp=r'}}
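
The `startTimeUtc` and `endTimeUtc` fields in the output above give the actual duration of this particular run. A minimal sketch, using only the two timestamps shown and the standard library (the helper name `run_duration` is illustrative):

```python
from datetime import datetime


def run_duration(start_utc, end_utc):
    # Timestamps follow the format shown in the run details above
    fmt = '%Y-%m-%dT%H:%M:%S.%fZ'
    return datetime.strptime(end_utc, fmt) - datetime.strptime(start_utc, fmt)


duration = run_duration('2020-11-08T20:38:48.97646Z',
                        '2020-11-08T21:30:29.459505Z')
minutes, seconds = divmod(int(duration.total_seconds()), 60)
print('Run duration: {} min {} s'.format(minutes, seconds))
```

For the run shown here this comes to a little under 52 minutes, consistent with the estimate of about 1 hour.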

Our *ReinforcementLearningEstimator* run has 2 child runs: one for the Ray head node and another for the Ray worker nodes. Here we get a reference to the first child run, where the Ray head node runs the RLLib trainer that performs the agent training process.

In [11]:
import time

child_run_0 = None
timeout = 30
while timeout > 0 and not child_run_0:
    child_runs = list(run.get_children())
    print('Number of child runs:', len(child_runs))
    if len(child_runs) > 0:
        child_run_0 = child_runs[0]
        break
    time.sleep(2)
    timeout -= 2

print('Child run info:')
print(child_run_0)

Number of child runs: 2
Child run info:
Run(Experiment: deeprm_single_jobset_rllib_pg,
Id: deeprm_single_jobset_rllib_pg_1604867900_74b66959_head,
Type: azureml.scriptrun,
Status: Completed)
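
Taking `child_runs[0]` relies on the head run being listed first. In the output above the head run's id ends in `_head`, so, assuming that suffix is a stable convention (it is only observed here, not documented in this notebook), the head run could be selected by id instead. The sketch below uses stand-in objects rather than real Azure ML runs, and `find_head_run` is a hypothetical helper.

```python
from types import SimpleNamespace


def find_head_run(child_runs, suffix='_head'):
    # Return the first child whose id ends with the suffix, or None if absent
    for child in child_runs:
        if child.id.endswith(suffix):
            return child
    return None


# Stand-ins for the objects returned by run.get_children();
# the '_worker' id is a made-up example.
parent_id = 'deeprm_single_jobset_rllib_pg_1604867900_74b66959'
fake_children = [
    SimpleNamespace(id=parent_id + '_worker'),
    SimpleNamespace(id=parent_id + '_head'),
]

head = find_head_run(fake_children)
print(head.id)
```

Returning `None` when no id matches lets the caller fall back to the index-based lookup used in the cell above.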


We can also simply use the run id to get a handle to an in-progress or previously completed run.

In [12]:
from azureml.core import Run

run_id = child_run_0.id
child_run_0 = Run(exp, run_id=run_id)

Then we can list all logs and artifacts produced by the run.

In [13]:
child_run_0.get_file_names()

['azureml-logs/55_azureml-execution-tvmps_77a89e2d10a78122301f1e03a5eb5955680510e70fa7a6dfbc980638e0a28980_d.txt',
 'azureml-logs/55_azureml-execution-tvmps_c852d7abefb9b65c2f17d2264bc9d8a7088d2d5a6a65aa4c6f1bd792dd8907bc_d.txt',
 'azureml-logs/55_azureml-execution-tvmps_ff97d8d8dc8a2150423c77c501e229a810d3513908bf597cf5cc6a3d83486869_d.txt',
 'azureml-logs/65_job_prep-tvmps_77a89e2d10a78122301f1e03a5eb5955680510e70fa7a6dfbc980638e0a28980_d.txt',
 'azureml-logs/65_job_prep-tvmps_c852d7abefb9b65c2f17d2264bc9d8a7088d2d5a6a65aa4c6f1bd792dd8907bc_d.txt',
 'azureml-logs/65_job_prep-tvmps_ff97d8d8dc8a2150423c77c501e229a810d3513908bf597cf5cc6a3d83486869_d.txt',
 'azureml-logs/70_driver_log.txt',
 'azureml-logs/70_driver_log_0.txt',
 'azureml-logs/70_driver_log_1.txt',
 'azureml-logs/70_mpi_log.txt',
 'azureml-logs/75_job_post-tvmps_77a89e2d10a78122301f1e03a5eb5955680510e70fa7a6dfbc980638e0a28980_d.txt',
 'azureml-logs/process_info.json',
 'azureml-logs/process_status.json',
 'logs/azureml/0_1

In the training script, our model checkpoint is saved using the RLLib trainer. By default, the naming convention for the checkpoint file is *checkpoint-\<iteration\>*. Here we download the checkpoint files needed to recreate the trained agent later.

In [14]:
import os

checkpoint_folder = '../model_checkpoints/single_jobset'
os.makedirs(checkpoint_folder, exist_ok=True)

for f in child_run_0.get_file_names():
    f_name = os.path.basename(f)
    if f_name.startswith('checkpoint-'):
        child_run_0.download_file(f, checkpoint_folder)
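
To make the filter above concrete, the sketch below applies the same `startswith('checkpoint-')` test to a made-up list of artifact paths. The paths and the helper name `select_checkpoint_files` are hypothetical and only illustrate which names would be selected; the actual file list of a run will differ.

```python
import os


def select_checkpoint_files(file_names):
    # Same test as the download loop: match on the base name only
    return [f for f in file_names
            if os.path.basename(f).startswith('checkpoint-')]


# Hypothetical artifact paths, for illustration only
example_files = [
    'azureml-logs/70_driver_log.txt',
    'logs/checkpoint_300/checkpoint-300',
    'logs/checkpoint_300/checkpoint-300.tune_metadata',
    'logs/checkpoint_300/.is_checkpoint',
]

selected = select_checkpoint_files(example_files)
print(selected)
```

Because the test is applied to the base name, a parent folder such as `checkpoint_300` does not by itself cause a file to be selected.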

Below you can find code snippets to clean up any resources created as part of this tutorial that you don't wish to retain.

In [None]:
# To archive the created experiment:
#exp.archive()

# To delete the compute targets:
#head_compute_target.delete()
#worker_compute_target.delete()