## PPO Hyperparameter Tuning

Here we will use Ray's Tune to search for the optimal hyperparameters of RLlib's Proximal Policy Optimization (PPO) algorithm

In [1]:
from wildcatter.advanced_environment_for_RLib import AdvancedDriller
import ray
from ray import air, tune
from ray.rllib.models.tf.tf_modelv2 import TFModelV2
from ray.rllib.models.tf.fcnet import FullyConnectedNetwork
from ray.rllib.utils.framework import try_import_tf
from ray.rllib.models import ModelCatalog
from gym.spaces import Box, Dict
import numpy as np
import matplotlib.pyplot as plt
from time import time

tf1, tf, tfv = try_import_tf(error=True)

class WildcatterActionMaskedModel(TFModelV2):
     
    def __init__(self, 
                 obs_space,
                 action_space,
                 num_outputs,
                 model_config,
                 name,
                 true_obs_shape=(11,40),
                 action_embed_size=4+38+1,
                 *args, **kwargs):
         
        super(WildcatterActionMaskedModel, self).__init__(obs_space,
            action_space, num_outputs, model_config, name, 
            *args, **kwargs)
         
        self.action_embed_model = FullyConnectedNetwork(
            Box(-np.inf, np.inf, shape=true_obs_shape), 
                action_space, action_embed_size,
            model_config, name + "_action_embed")
 
    def forward(self, input_dict, state, seq_lens):
        # Extract the available actions tensor from the observation.
        action_mask = input_dict["obs"]["action_mask"]
        # Compute the predicted action embedding
        action_embed, _ = self.action_embed_model({
            "obs": input_dict["obs"]["obs"]})
        # Mask out invalid actions (use tf.float32.min for stability)
        inf_mask = tf.maximum(tf.math.log(action_mask), tf.float32.min)
        # Return action_logits + inf_mask, state
        return action_embed + inf_mask, state
 
    def value_function(self):
        return self.action_embed_model.value_function()
    
ModelCatalog.register_custom_model('wildcatter_masked', WildcatterActionMaskedModel)

In [2]:
env_random_config = dict(model_type = "random",
                  nrow=11,
                  ncol=40,
                  funds=20,
                  oil_price = 40,
                  relocation_cost = 0.2,
                  drilling_cost = 0.5,
                  drilling_depth_markup = 0.1,
                  #seed = 0,
                 )

env_random_pockets_config = dict(model_type = "random_pockets",
                  nrow=11,
                  ncol=40,
                  #nrow=40,
                  #ncol=80,
                  funds=20,
                  oil_price = 1,
                  relocation_cost = 0.2,
                  drilling_cost = 0.5,
                  drilling_depth_markup = 0.1,
                  #seed = 0,
                 )

env_2d_from_csv_config = dict(model_type = "from_csv",
                  #model_path=r"/home/studio-lab-user/sagemaker-studiolab-notebooks/wildcatter-ThreeAmigos/examples/data/2d_two_rectangular_targets.csv",
                  #model_path=r"/home/studio-lab-user/sagemaker-studiolab-notebooks/wildcatter-ThreeAmigos/examples/data/2d_stacked.csv",
                  model_path=r"/home/studio-lab-user/sagemaker-studiolab-notebooks/wildcatter-ThreeAmigos/examples/data/x-sec_targets.csv",
                  delim=",",
                  funds=20,
                  oil_price = 40,
                  relocation_cost = 0.2,
                  drilling_cost = 0.5,
                  drilling_depth_markup = 0.1,
                  #seed = 0,
                  )

env_config = env_random_pockets_config
env = AdvancedDriller(env_config)
# Setting variables for PPO trainer
true_obs_shape = env.observation_space["obs"].shape
action_embed_size = env.action_space.n
# Environment can be registered for easy access, if needed.
#tune.register_env('wildcatter_driller', lambda environment_config_dict: AdvancedDriller(environment_config_dict))

In [3]:
import ray.rllib.algorithms.ppo as ppo
from ray.tune.logger import pretty_print

In [4]:
# Custom Trial Plateau stopper that computes St.D. of windowed average
from typing import Dict, Optional
from collections import defaultdict, deque
import numpy as np
from ray.tune import Stopper

class MyTrialPlateauStopper(Stopper):
    """Early stop single trials when their windowed average reaches a plateau.

    When the standard deviation of the window-averaged `metric` result of a trial is
    below a threshold `std`, the trial plateaued and will be stopped
    early.

    Args:
        metric: Metric to check for convergence.
        std: Maximum metric standard deviation to decide if a
            trial plateaued. Defaults to 0.01.
        num_results: Number of results to consider for stdev
            calculation.
        grace_period: Minimum number of timesteps before a trial
            can be early stopped
        metric_threshold (Optional[float]):
            Minimum or maximum value the result has to exceed before it can
            be stopped early.
        mode: If a `metric_threshold` argument has been
            passed, this must be one of [min, max]. Specifies if we optimize
            for a large metric (max) or a small metric (min). If max, the
            `metric_threshold` has to be exceeded, if min the value has to
            be lower than `metric_threshold` in order to early stop.
    """

    def __init__(
        self,
        metric: str,
        std: float = 15,
        num_results: int = 10,
        grace_period: int = 10,
        metric_threshold: Optional[float] = None,
        mode: Optional[str] = None,
    ):
        self._metric = metric
        self._mode = mode

        self._std = std
        self._num_results = num_results
        self._grace_period = grace_period
        self._metric_threshold = metric_threshold

        if self._metric_threshold:
            if mode not in ["min", "max"]:
                raise ValueError(
                    f"When specifying a `metric_threshold`, the `mode` "
                    f"argument has to be one of [min, max]. "
                    f"Got: {mode}"
                )

        self._iter = defaultdict(lambda: 0)
        self._trial_results = defaultdict(lambda: deque(maxlen=self._num_results))
        self._trial_averages = defaultdict(lambda: deque(maxlen=self._num_results))

    def __call__(self, trial_id: str, result: Dict):
        metric_result = result.get(self._metric)
        self._trial_results[trial_id].append(metric_result)
        # Calculate running averages
        try:
            windowed_avg = np.average(self._trial_results[trial_id])
        except Exception:
            windowed_avg = metric_result
        self._trial_averages[trial_id].append(windowed_avg)
        self._iter[trial_id] += 1

        # If still in grace period, do not stop yet
        if self._iter[trial_id] < self._grace_period:
            return False

        # If not enough results yet, do not stop yet
        if len(self._trial_results[trial_id]) < self._num_results:
            return False

        # If metric threshold value not reached, do not stop yet
        if self._metric_threshold is not None:
            if self._mode == "min" and metric_result > self._metric_threshold:
                return False
            elif self._mode == "max" and metric_result < self._metric_threshold:
                return False

        # Calculate stdev of last `num_results` averages
        try:
            current_std = np.std(self._trial_averages[trial_id])
        except Exception:
            current_std = float("inf")

        # If stdev is lower than threshold, stop early.
        return current_std < self._std

    def stop_all(self):
        return False

In [None]:
# Hyperparameter tuning
from ray.tune.tuner import Tuner
from ray.tune.stopper import ExperimentPlateauStopper
from ray.tune.schedulers import AsyncHyperBandScheduler as ASHAScheduler
from ray.air.config import CheckpointConfig
#from ray.tune import Callback

param_space_config = ppo.DEFAULT_CONFIG.copy()
special_config = {"num_gpus" : 0,
                  "num_workers" : 0,
                  "num_envs_per_worker" : 1,
                  "env": AdvancedDriller,
                  "env_config": env_config,
                  "model": {
                      "custom_model": "wildcatter_masked",
                      "custom_model_config": {
                          "true_obs_shape":true_obs_shape,
                          "action_embed_size":action_embed_size,
                      },
                  },
                  "framework": "tf2",
                  "horizon" : 40,
                  #"clip_param" : tune.uniform(0.1, 0.4),
                  "clip_param" : 0.2,
                  "eager_tracing" : True,
                  #"entropy_coeff" : tune.loguniform(1e-8, 1e-1),
                  "entropy_coeff" : 0,
                  "gamma" : tune.loguniform(0.9, 0.9999),
                  #"gamma" : 0.9,
                  #"lambda" : tune.loguniform(0.8, 1.0),
                  "lambda" : 1,
                  "lr" : tune.loguniform(1e-5, 1e-2),
                  "sgd_minibatch_size" : tune.choice([8, 16, 32, 64, 128, 256, 512]),
                  #"sgd_minibatch_size" : 128,
                  #"vf_loss_coeff" : tune.uniform(0,1),
                  "vf_loss_coeff" : 1.0,
                  "num_sgd_iter" : tune.choice([4, 8, 16, 32]),
                  #"num_sgd_iter" : 30,
                 }
param_space_config.update(special_config)

# Run config
stopper = MyTrialPlateauStopper("episode_reward_mean", std = 0.38, num_results = 10, grace_period = 10)

myCheckpointConfig = CheckpointConfig(num_to_keep = 3,
                                      checkpoint_frequency = 10,
                                      checkpoint_at_end = True)

myRunConfig = air.RunConfig(name = "Test0003",
                            local_dir = "~/sagemaker-studiolab-notebooks/",
                            stop = stopper,
                            checkpoint_config = myCheckpointConfig,
                            log_to_file = True,
                            verbose = 1,
                           )

# Tune config
asha_scheduler = ASHAScheduler(
    time_attr='time_total_s',
    metric='episode_reward_mean',
    mode='max',
    max_t=30*60,
    grace_period=2*60,
    reduction_factor=3,
    brackets=1)

myTuneConfig = tune.TuneConfig(scheduler=asha_scheduler,
                               num_samples=1000,
                               time_budget_s = 9.5 * 60 * 60, # In seconds. It exits gracefully when the wall clock time is reached.
                              )

ray.init(ignore_reinit_error=True)
tuner = Tuner("PPO", run_config=myRunConfig, param_space=param_space_config, tune_config=myTuneConfig )
results = tuner.fit()

In [5]:
# If we need to continue a previously-interrupted tuning experiment
tuner = Tuner.restore( path="~/ray_results/PPO" )
results = tuner.fit()

['__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__getitem__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__len__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_annotated',
 '_experiment_analysis',
 '_populate_exception',
 '_trial_to_result',
 'errors',
 'get_best_result',
 'get_dataframe',
 'num_errors',
 'num_terminated']

In [11]:
print("Best hyperparameters found were: ", results.get_best_result('episode_reward_mean', mode = 'max').config)

Best hyperparameters found were:  {'extra_python_environs_for_driver': {}, 'extra_python_environs_for_worker': {}, 'num_gpus': 0, 'num_cpus_per_worker': 1, 'num_gpus_per_worker': 0, '_fake_gpus': False, 'custom_resources_per_worker': {}, 'placement_strategy': 'PACK', 'eager_tracing': True, 'eager_max_retraces': 20, 'tf_session_args': {'intra_op_parallelism_threads': 2, 'inter_op_parallelism_threads': 2, 'gpu_options': {'allow_growth': True}, 'log_device_placement': False, 'device_count': {'CPU': 1}, 'allow_soft_placement': True}, 'local_tf_session_args': {'intra_op_parallelism_threads': 8, 'inter_op_parallelism_threads': 8}, 'env': <class 'wildcatter.advanced_environment_for_RLib.AdvancedDriller'>, 'env_config': {'model_type': 'random_pockets', 'nrow': 11, 'ncol': 40, 'available_pipe': 30, 'available_wells': 3, 'oil_price': 1, 'relocation_cost': 0.2, 'drilling_cost': 0.5, 'drilling_depth_markup': 0.1}, 'observation_space': None, 'action_space': None, 'env_task_fn': None, 'render_env': 

In [6]:
print("Optimal clip_param:",results.get_best_result('episode_reward_mean', mode = 'max').config['clip_param'])
print("Optimal entropy_coeff:",results.get_best_result('episode_reward_mean', mode = 'max').config['entropy_coeff'])
print("Optimal gamma:",results.get_best_result('episode_reward_mean', mode = 'max').config['gamma'])
print("Optimal lambda:",results.get_best_result('episode_reward_mean', mode = 'max').config['lambda'])
print("Optimal learning rate:",results.get_best_result('episode_reward_mean', mode = 'max').config['lr'])
print("Optimal sgd_minibatch_size:",results.get_best_result('episode_reward_mean', mode = 'max').config['sgd_minibatch_size'])
print("Optimal vf_loss_coeff:",results.get_best_result('episode_reward_mean', mode = 'max').config['vf_loss_coeff'])
print("Optimal num_sgd_iter:",results.get_best_result('episode_reward_mean', mode = 'max').config['num_sgd_iter'])

Optimal clip_param: 0.2
Optimal entropy_coeff: 0.0
Optimal gamma: 0.9976181731890753
Optimal lambda: 1
Optimal learning rate: 0.00071177770995787
Optimal sgd_minibatch_size: 512
Optimal vf_loss_coeff: 1.0
Optimal num_sgd_iter: 16
