### Setup Ray Cluster in AML

In [232]:
from azureml.core import Workspace

# Load the workspace via Workspace.from_config() and echo its name, region and resource group
ws = Workspace.from_config()
print(' | '.join(str(field) for field in (ws.name, ws.location, ws.resource_group)))

ws01ent | westus2 | azureml


In [186]:
from azureml.core.experiment import Experiment

# Name of the experiment that the runs submitted from this notebook are attached to
experiment_name = 'rl_talk_pong'
exp = Experiment(name=experiment_name, workspace=ws)

In [224]:
from azureml.core.compute import AmlCompute, ComputeTarget

# Virtual network the clusters are attached to when they have to be created
vnet_name = 'amlvnet'

# Name of the compute target used for the Ray head
# head_compute_name = 'head-gpu-v3'
head_compute_name = 'head-gpu-v2'

# Autoscale bounds of the head cluster
head_compute_min_nodes = 0
head_compute_max_nodes = 2

# GPU SKU for the head; switch to STANDARD_D2_V2 to use a CPU VM instead
head_vm_size = 'STANDARD_NC6S_V2'

if head_compute_name not in ws.compute_targets:
    # No target with this name exists yet: provision one inside the vnet.
    print('creating a new head compute target...')
    provisioning_config = AmlCompute.provisioning_configuration(
        vm_size=head_vm_size,
        min_nodes=head_compute_min_nodes,
        max_nodes=head_compute_max_nodes,
        vnet_resourcegroup_name=ws.resource_group,
        vnet_name=vnet_name,
        subnet_name='default')

    head_compute_target = ComputeTarget.create(ws, head_compute_name, provisioning_config)

    # Block until provisioning finishes (at most 20 minutes). With min_node_count=None
    # the cluster's own scale settings decide how many nodes to wait for.
    head_compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)

    # Dump the detailed AmlCompute status of the new cluster
    print(head_compute_target.get_status().serialize())
else:
    # Reuse the existing target, but only if it finished provisioning successfully.
    head_compute_target = ws.compute_targets[head_compute_name]
    if head_compute_target and type(head_compute_target) is AmlCompute:
        if head_compute_target.provisioning_state != 'Succeeded':
            raise Exception(
                'found head compute target but it is in state', head_compute_target.provisioning_state)
        print('found head compute target. just use it', head_compute_name)

found head compute target. just use it head-gpu-v2


In [233]:
# Name of the compute target used for the Ray workers
worker_compute_name = 'worker-cpu-f32'

# Autoscale bounds of the worker cluster
worker_compute_min_nodes = 0
worker_compute_max_nodes = 4

# This example uses CPU VM. For using GPU VM, set SKU to STANDARD_NC6
worker_vm_size = 'Standard_F32s_v2'

# Reuse the worker cluster when it already exists, otherwise create it.
# `vnet_name` is the value assigned in the head-cluster cell.
if worker_compute_name in ws.compute_targets:
    worker_compute_target = ws.compute_targets[worker_compute_name]
    if worker_compute_target and type(worker_compute_target) is AmlCompute:
        if worker_compute_target.provisioning_state == 'Succeeded':
            print('found worker compute target. just use it', worker_compute_name)
        else:
            # Report the state of the *worker* cluster (the head cluster's state
            # was reported here before, which hid the real failure).
            raise Exception(
                'found worker compute target but it is in state', worker_compute_target.provisioning_state)
else:
    print('creating a new worker compute target...')
    provisioning_config = AmlCompute.provisioning_configuration(
        vm_size=worker_vm_size,
        min_nodes=worker_compute_min_nodes,
        max_nodes=worker_compute_max_nodes,
        vnet_resourcegroup_name=ws.resource_group,
        vnet_name=vnet_name,
        subnet_name='default')

    # Create the compute target
    worker_compute_target = ComputeTarget.create(ws, worker_compute_name, provisioning_config)

    # Can poll for a minimum number of nodes and for a specific timeout.
    # If no min node count is provided it will use the scale settings for the cluster
    worker_compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)

    # For a more detailed view of current AmlCompute status, use get_status()
    print(worker_compute_target.get_status().serialize())

found worker compute target. just use it worker-cpu-f32


In [226]:
from azureml.contrib.train.rl import WorkerConfiguration

# Pip dependencies installed on the Ray nodes; this list is reused later in the notebook.
# Ray is pinned: per the original note, recent Ray releases fix issues related to object transfers.
pip_packages = ["ray[rllib]==0.8.7", "torch"]

# Ray worker configuration
worker_conf = WorkerConfiguration(
    compute_target=worker_compute_target,  # AML compute target that runs the Ray workers
    node_count=2,                          # number of worker nodes
    use_gpu=False,                         # workers run without GPU
    pip_packages=pip_packages,             # pip packages to install for the workers
)

### Running distributed RL pong training from scratch

In [227]:
from azureml.contrib.train.rl import ReinforcementLearningEstimator, Ray

# Gym environment to train on. Assigned here so that this cell runs on a fresh
# kernel instead of relying on a later cell having been executed first.
rl_environment = "PongNoFrameskip-v4"

# Training script parameters
script_params = {
    # Environment, Pong in this case
    "--env": rl_environment,

    # The extra single quotes keep the JSON string (which contains spaces) intact
    # when it is passed on the command line.
    "--config": '\'{"num_gpus": 1, "num_workers": 66}\'',
}

# Pip packages for the head node. Ray is pinned; per the original note, recent Ray
# releases fix issues related to object transfers.
pip_packages_head = ["ray[rllib]==0.8.7", "torch"]

# Reinforcement learning estimator
# NOTE(review): `myenv` is not assigned in any visible cell of this notebook; it must
# be defined before this cell is run — confirm where it is supposed to come from.
rl_estimator = ReinforcementLearningEstimator(
    environment=myenv,

    # Location of source files
    source_directory='distributed_rl_from_scratch',

    # Python script file
    entry_script="dqn_pong.py",

    # Parameters to pass to the script file (defined above)
    script_params=script_params,

    # The Azure Machine Learning compute target set up for Ray head nodes
    compute_target=head_compute_target,

    # Pip packages
    pip_packages=pip_packages_head,

    # GPU usage
    use_gpu=True,

    # Reinforcement learning framework. Currently must be Ray.
    rl_framework=Ray(),

    # Ray worker configuration defined above.
    worker_configuration=worker_conf,

    # How long to wait for whole cluster to start
    cluster_coordination_timeout_seconds=3600,

    # Maximum time for the whole Ray job to run:
    # 360000 seconds = 100 hours, after which the run is cut off
    max_run_duration_seconds=360000,

    # Allow the docker container Ray runs in to make full use
    # of the shared memory available from the host OS (24 GiB).
    shm_size=24*1024*1024*1024
)

In [235]:
# Submit the from-scratch Pong estimator to the experiment and keep the run handle
run = exp.submit(rl_estimator)

In [234]:
from azureml.widgets import RunDetails

# Render the monitoring widget for the submitted run
run_widget = RunDetails(run)
run_widget.show()

_RLWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': False, 'log_level': 'INFO', 'sdk_v…

Exception in thread Exception in threading.excepthook:Exception ignored in thread started byException ignored in sys.unraisablehook

In [220]:
# Request cancellation of the run held in `run`.
# NOTE(review): if the notebook is executed top to bottom this presumably stops the
# training run right after it was submitted — confirm this cell is meant to be run manually.
run.cancel()

### Running distributed RL pong with RLlib

In [179]:
from azureml.contrib.train.rl import ReinforcementLearningEstimator, Ray

training_algorithm = "IMPALA"
rl_environment = "PongNoFrameskip-v4"

# Training script parameters
# Optional flags left disabled: --use_pytorch, --checkpoint-freq, --checkpoint-at-end
script_params = {
    # Training algorithm, IMPALA in this case
    "--run": training_algorithm,

    # Environment, Pong in this case
    "--env": rl_environment,

    # Add additional single quotes at the both ends of string values as we have spaces in the
    # string parameters, outermost quotes are not passed to scripts as they are not actually part of string
    # Number of GPUs and number of Ray workers
    "--config": '\'{"num_gpus": 1, "num_workers": 36}\'',

    # Stop criteria: target episode reward mean and total training time in seconds
    "--stop": '\'{"episode_reward_mean": 18, "time_total_s": 3600}\'',
}

# Pip packages for the head node. Ray is pinned; per the original note, recent Ray
# releases fix issues related to object transfers.
pip_packages_head = ["ray[rllib]==0.8.7"]

# Reinforcement learning estimator
rl_estimator2 = ReinforcementLearningEstimator(
    # Location of source files
    source_directory='distributed_rl_with_rllib/files',

    # Python script file
    entry_script="pong_rllib.py",

    # Parameters to pass to the script file (defined above)
    script_params=script_params,

    # The Azure Machine Learning compute target set up for Ray head nodes
    compute_target=head_compute_target,

    # Pip packages for the head node. The list defined just above is used here;
    # it was previously assigned but never passed to the estimator.
    pip_packages=pip_packages_head,

    # GPU usage
    use_gpu=True,

    # Reinforcement learning framework. Currently must be Ray.
    rl_framework=Ray(),

    # Ray worker configuration defined above.
    worker_configuration=worker_conf,

    # How long to wait for whole cluster to start
    cluster_coordination_timeout_seconds=3600,

    # Maximum time for the whole Ray job to run:
    # 360000 seconds = 100 hours, after which the run is cut off
    max_run_duration_seconds=360000,

    # Allow the docker container Ray runs in to make full use
    # of the shared memory available from the host OS (24 GiB).
    shm_size=24*1024*1024*1024
)

In [236]:
# Submit the RLlib Pong estimator to the experiment and keep the run handle
run2 = exp.submit(rl_estimator2)

In [237]:
from azureml.widgets import RunDetails

# Render the monitoring widget for the RLlib run
run2_widget = RunDetails(run2)
run2_widget.show()

_RLWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': False, 'log_level': 'INFO', 'sdk_v…

Exception in thread Exception in threading.excepthook:Exception ignored in thread started byException ignored in sys.unraisablehook