# Amazon SageMaker Notebook for ProcGen Starter Kit with heterogeneous scaling of multiple instances 

In [None]:
import os
import yaml

import sagemaker
from sagemaker.rl import RLEstimator, RLToolkit, RLFramework
import boto3

from IPython.display import HTML, Markdown
from source.common.docker_utils import build_and_push_docker_image
from source.common.markdown_helper import generate_help_for_s3_endpoint_permissions, create_s3_endpoint_manually

In [None]:
# Load the notebook-wide settings (S3 bucket, training instance types) from
# config/sagemaker_config.yaml.
config_path = os.path.join("config", "sagemaker_config.yaml")
with open(config_path) as config_file:
    sagemaker_config = yaml.safe_load(config_file)

## Initialize Amazon SageMaker

In [None]:
# SageMaker session handle, plus the bucket (taken from the config file) that
# receives the training job output.
sm_session = sagemaker.session.Session()
s3_bucket = sagemaker_config["S3_BUCKET"]

s3_output_path = f"s3://{s3_bucket}/"
print(f"S3 bucket path: {s3_output_path}")

In [None]:
# Execution role returned by the SageMaker SDK; it is passed to every
# estimator created in this notebook.
role = sagemaker.get_execution_role()
print(role)

# Base name shared by the primary and secondary training jobs.
job_name_prefix = "sm-ray-hetero-dist-procgen"

#### Note that `local_mode = True` does not work with heterogeneous scaling

# Configure the framework you want to use

Set `framework` to `"tf"` or `"torch"` for tensorflow or pytorch respectively.

You will also have to edit your entry point, i.e., `train-sagemaker-distributed-gpu.py`, and set the configuration parameter `"use_pytorch"` to match the framework that you have selected.

In [None]:
# Deep-learning framework to use: "tf" (TensorFlow) or "torch" (PyTorch).
# The value is forwarded to the Docker image builds as the FRAMEWORK build argument.
framework = "tf"

# Train your model here

### Build custom container with procgen installed

We build a custom docker container with procgen installed. This takes care of everything:

1. Fetching base container image
2. Installing procgen and its dependencies
3. Uploading the new container image to ECR
4. This step can take a long time if you are running on a machine with a slow internet connection. If your notebook instance is in SageMaker or EC2 it should take 3-10 minutes depending on the instance type

In [None]:
# Both images are built and pushed with the same helper; only the CPU_OR_GPU
# build argument and the repository suffix differ between them.

# --- CPU image (used by the secondary cluster estimator) ---
cpu_repository_short_name = "sagemaker-procgen-ray-cpu"
docker_build_args = dict(
    CPU_OR_GPU="cpu",
    AWS_REGION=boto3.Session().region_name,
    FRAMEWORK=framework,
)
cpu_image_name = build_and_push_docker_image(
    cpu_repository_short_name,
    build_args=docker_build_args,
)
print("Using CPU ECR image {}".format(cpu_image_name))

# --- GPU image (used by the primary cluster estimator) ---
gpu_repository_short_name = "sagemaker-procgen-ray-gpu"
docker_build_args = dict(
    CPU_OR_GPU="gpu",
    AWS_REGION=boto3.Session().region_name,
    FRAMEWORK=framework,
)
gpu_image_name = build_and_push_docker_image(
    gpu_repository_short_name,
    build_args=docker_build_args,
)
print("Using GPU ECR image {}".format(gpu_image_name))

# Train your Ray heterogeneous scaling job here

### Edit the training code

The training code is written in the file `train-sagemaker-distributed-gpu.py` which is uploaded in the /source directory.

*Note that ray will automatically set `"ray_num_cpus"` and `"ray_num_gpus"` in `_get_ray_config`*

In [None]:
# Display the training entry point with syntax highlighting (IPython shell escape).
!pygmentize source/train-sagemaker-distributed-gpu.py

### Train the RL model using the Python SDK Script mode

If you are using local mode, the training will run on the notebook instance (note that local mode is not available for the heterogeneous scaling used in this notebook). When using SageMaker for training, you can select a GPU or CPU instance. The RLEstimator is used for training RL jobs.

1. Specify the source directory where the environment, presets and training code are uploaded.
2. Specify the entry point as the training code
3. Specify the custom image to be used for the training environment.
4. Define the training parameters such as the instance count, job name, and S3 path for output.
5. Define the metrics definitions that you are interested in capturing in your logs. These can also be visualized in CloudWatch and SageMaker Notebooks.

In [None]:
# Regex fragment that captures a signed integer or decimal number, optionally
# followed by an exponent (e.g. "12", "-3.5", "1.2e+03").
_METRIC_VALUE_REGEX = '([-+]?[0-9]*[.]?[0-9]+([eE][-+]?[0-9]+)?)'

# Metrics extracted from the training logs. Each entry matches a log line of
# the form "<metric_name>: <number>". Every name appears exactly once; the
# original list declared 'training_iteration' twice.
metric_definitions = [
    {'Name': metric_name, 'Regex': '{}: {}'.format(metric_name, _METRIC_VALUE_REGEX)}
    for metric_name in (
        # Training progress counters
        'training_iteration',
        'episodes_total',
        'num_steps_trained',
        'timesteps_total',
        # Episode reward statistics
        'episode_reward_max',
        'episode_reward_mean',
        'episode_reward_min',
    )
]

To scale out RL training, we can increase the number of rollout workers. However, with more rollouts, training can often become the bottleneck. To prevent this, we can use an instance with one or more GPUs for training, and multiple CPU instances for rollouts.

Since SageMaker supports a single type of instance in a training job, we can achieve the above by spinning up two SageMaker jobs and letting them communicate with each other. For the sake of naming, we'll use Primary cluster to refer to 1 or more GPU instances, and Secondary cluster to refer to the cluster of CPU instances.

> Please note that local_mode cannot be used for testing this type of scaling.

Before we configure the SageMaker job, let us first ensure that we run SageMaker in VPC mode. VPC mode will allow the two SageMaker jobs to communicate over network.

This can be done by supplying subnets and security groups to the job launching scripts. We will use the default VPC configuration for this example

In [None]:
ec2 = boto3.client('ec2')

# Find the default VPC of the current region. Fail with an actionable message
# (instead of a bare IndexError) when the region has no default VPC.
default_vpc_ids = [vpc['VpcId'] for vpc in ec2.describe_vpcs()['Vpcs'] if vpc["IsDefault"]]
if not default_vpc_ids:
    raise RuntimeError("No default VPC was found in this region. Please create a default VPC, or set "
                       "`default_vpc`, `default_security_groups` and `default_subnets` manually.")
default_vpc = default_vpc_ids[0]

# The security group named "default" that belongs to the default VPC.
default_security_groups = [group["GroupId"] for group in ec2.describe_security_groups()['SecurityGroups']
                           if group["GroupName"] == "default" and group["VpcId"] == default_vpc]

# The per-availability-zone default subnets of the default VPC.
default_subnets = [subnet["SubnetId"] for subnet in ec2.describe_subnets()["Subnets"]
                   if subnet["VpcId"] == default_vpc and subnet['DefaultForAz']]

# Both lists are passed to the estimators to enable VPC mode, so stop here if
# either one is empty rather than launching jobs with an incomplete VPC config.
if not default_security_groups:
    raise RuntimeError("No default security group was found in VPC {}.".format(default_vpc))
if not default_subnets:
    raise RuntimeError("No default subnets were found in VPC {}.".format(default_vpc))

print("Using default VPC:", default_vpc)
print("Using default security group:", default_security_groups)
print("Using default subnets:", default_subnets)

A SageMaker job running in VPC mode cannot access S3 resources. So, we need to create a VPC S3 endpoint to allow S3 access from the SageMaker container. To learn more about the VPC mode, please visit [this link](https://docs.aws.amazon.com/sagemaker/latest/dg/train-vpc.html).

In [None]:
aws_region = boto3.Session().region_name

# Step 1: collect the route tables of the default VPC. On failure, show the
# matching help text (permissions vs. manual creation) and re-raise.
try:
    all_route_tables = ec2.describe_route_tables()['RouteTables']
    route_tables = [table["RouteTableId"] for table in all_route_tables
                    if table['VpcId'] == default_vpc]
except Exception as err:
    help_text = (generate_help_for_s3_endpoint_permissions(role)
                 if "UnauthorizedOperation" in str(err)
                 else create_s3_endpoint_manually(aws_region, default_vpc))
    display(Markdown(help_text))
    raise err

print("Trying to attach S3 endpoints to the following route tables:", route_tables)

assert route_tables, ("No route tables were found. Please follow the VPC S3 endpoint creation "
                      "guide by clicking the above link.")

# Step 2: create a Gateway-type S3 endpoint attached to those route tables.
endpoint_request = dict(
    DryRun=False,
    VpcEndpointType="Gateway",
    VpcId=default_vpc,
    ServiceName="com.amazonaws.{}.s3".format(aws_region),
    RouteTableIds=route_tables,
)
try:
    ec2.create_vpc_endpoint(**endpoint_request)
    print("S3 endpoint created successfully!")
except Exception as err:
    err_text = str(err)
    if "RouteAlreadyExists" not in err_text:
        # Any failure other than "already exists" is fatal: display the
        # relevant help text, then propagate the error.
        if "UnauthorizedOperation" in err_text:
            help_text = generate_help_for_s3_endpoint_permissions(role)
        else:
            help_text = create_s3_endpoint_manually(aws_region, default_vpc)
        display(Markdown(help_text))
        raise err
    print("S3 endpoint already exists.")

#### Configure instance types

Let us configure a cluster with 1 Volta (V100) GPU and 40 CPU cores. We can do this by using 1 ml.p3.2xlarge instance and 2 ml.c5.4xlarge instances, since ml.p3.2xlarge has 8 CPU cores and ml.c5.4xlarge has 16 CPU cores.

In [None]:
# Primary cluster: instance type read from the GPU_TRAINING_INSTANCE config entry, one instance.
primary_cluster_instance_type = sagemaker_config["GPU_TRAINING_INSTANCE"]
primary_cluster_instance_count = 1

# Secondary cluster: instance type read from the CPU_TRAINING_INSTANCE config entry, two instances.
secondary_cluster_instance_type = sagemaker_config["CPU_TRAINING_INSTANCE"]
secondary_cluster_instance_count = 2

# Resource totals for the combined cluster. In the cells visible here they are
# only referenced by the commented-out "rl.training.config.*" overrides of the
# primary job. NOTE(review): the value 40 is hard-coded and must be kept in sync
# with the instance types and counts chosen above.
total_cpus = 40 - 1 # Leave one for ray scheduler
total_gpus = 1

For heterogeneous training, we also pass some additional parameters to the training job that aid in synchronization across instances:

* `s3_bucket`, `s3_prefix`: Used for storing metadata like master IP address
* `rl_cluster_type`: "primary" or "secondary"
* `aws_region`: This is required for making connection to S3 in VPC mode
* `rl_num_instances_secondary`: Number of nodes in secondary cluster
* `subnets`, `security_group_ids`: Required by VPC mode

In [None]:
# We explicitly need to specify these params so that the two jobs can synchronize using the metadata stored here
s3_prefix = "hetero-training-job"

# Make sure that the prefix is empty
# NOTE: this recursively deletes every object under s3://<s3_bucket>/<s3_prefix>,
# so double-check both values before running this cell.
!aws s3 rm --recursive s3://{s3_bucket}/{s3_prefix}

Note for internal testing: If you kick-start another heterogeneous training job, make sure to either run the above cell directly or rename the s3_prefix and re-run the above cell. Otherwise the metadata in the S3 bucket will be incorrect and instances in the new job cannot join the cluster.

#### Launch primary cluster (1 GPU training instance)

Note for internal testing: Make sure at least one GPU is requested in the RLlib config (`exp{'Config':{'num_gpus'}}`). Otherwise the Trainer() will be placed on a secondary instance, leading to checkpoint restoration failure during model export.

In [None]:
# Environment name forwarded to the training script through the
# "rl.training.config.env_config.env_name" hyperparameter.
env = "coinrun"

# Hyperparameters of the primary (GPU) job. The first five entries are what the
# two jobs use to find each other and to reach S3.
primary_hyperparameters = {
    "s3_prefix": s3_prefix,  # Important for syncing
    "s3_bucket": s3_bucket,  # Important for syncing
    "aws_region": boto3.Session().region_name,  # Important for S3 connection
    "rl_cluster_type": "primary",  # Important for syncing
    "rl_num_instances_secondary": secondary_cluster_instance_count,  # Important for syncing

    "rl.training.config.env_config.env_name": env,

    # Optional overrides, disabled by default:
    # "rl.training.upload_dir": s3_output_path,
    # "rl.training.config.num_workers": total_cpus,
    # "rl.training.config.train_batch_size": 20000,
    # "rl.training.config.num_gpus": total_gpus,
}

primary_cluster_estimator = RLEstimator(
    entry_point="train-sagemaker-distributed-gpu.py",
    source_dir="source",
    dependencies=[
        "source/utils",
        "source/common/",
        "neurips2020-procgen-starter-kit/",
    ],
    image_uri=gpu_image_name,
    role=role,
    instance_type=primary_cluster_instance_type,
    instance_count=primary_cluster_instance_count,
    output_path=s3_output_path,
    base_job_name=job_name_prefix,
    metric_definitions=metric_definitions,
    max_run=3600 // 2,  # Maximum runtime in seconds (30 minutes)
    hyperparameters=primary_hyperparameters,
    subnets=default_subnets,  # Required for VPC mode
    security_group_ids=default_security_groups,  # Required for VPC mode
)

# Start the job without blocking so the secondary cluster can be launched next.
primary_cluster_estimator.fit(wait=False)
primary_job_name = primary_cluster_estimator.latest_training_job.job_name
print("Primary Training job: {}".format(primary_job_name))

#### Launch secondary cluster (2 CPU instances)

In [None]:
# Hyperparameters of the secondary (CPU) job. It shares the S3 prefix/bucket
# and region with the primary job and identifies itself as "secondary".
secondary_hyperparameters = {
    "s3_prefix": s3_prefix,  # Important for syncing
    "s3_bucket": s3_bucket,  # Important for syncing
    "aws_region": boto3.Session().region_name,  # Important for S3 connection
    "rl_cluster_type": "secondary",  # Important for syncing

    "rl.training.config.env_config.env_name": env,

    # Optional override, disabled by default:
    # "rl.training.upload_dir": s3_output_path,
}

secondary_cluster_estimator = RLEstimator(
    entry_point="train-sagemaker-distributed-gpu.py",
    source_dir="source",
    dependencies=[
        "source/utils",
        "source/common/",
        "neurips2020-procgen-starter-kit/",
    ],
    image_uri=cpu_image_name,
    role=role,
    instance_type=secondary_cluster_instance_type,
    instance_count=secondary_cluster_instance_count,
    output_path=s3_output_path,
    base_job_name=job_name_prefix,
    metric_definitions=metric_definitions,
    max_run=3600,  # Maximum runtime in seconds
    hyperparameters=secondary_hyperparameters,
    subnets=default_subnets,  # Required for VPC mode
    security_group_ids=default_security_groups,  # Required for VPC mode
)

# Start the job without blocking the notebook.
secondary_cluster_estimator.fit(wait=False)
secondary_job_name = secondary_cluster_estimator.latest_training_job.job_name
print("Secondary Training job: {}".format(secondary_job_name))