diff --git a/maro/cli/local/commands.py b/maro/cli/local/commands.py
index 8f1904c46..041208651 100644
--- a/maro/cli/local/commands.py
+++ b/maro/cli/local/commands.py
@@ -61,7 +61,7 @@ def get_redis_conn(port=None):
 
 
 # Functions executed on CLI commands
-def run(conf_path: str, containerize: bool = False, evaluate_only: bool = False, **kwargs):
+def run(conf_path: str, containerize: bool = False, seed: int = None, evaluate_only: bool = False, **kwargs):
     # Load job configuration file
     parser = ConfigParser(conf_path)
     if containerize:
@@ -71,13 +71,14 @@ def run(conf_path: str, containerize: bool = False, evaluate_only: bool = False,
                 LOCAL_MARO_ROOT,
                 DOCKERFILE_PATH,
                 DOCKER_IMAGE_NAME,
+                seed=seed,
                 evaluate_only=evaluate_only,
             )
         except KeyboardInterrupt:
             stop_rl_job_with_docker_compose(parser.config["job"], LOCAL_MARO_ROOT)
     else:
         try:
-            start_rl_job(parser, LOCAL_MARO_ROOT, evaluate_only=evaluate_only)
+            start_rl_job(parser, LOCAL_MARO_ROOT, seed=seed, evaluate_only=evaluate_only)
         except KeyboardInterrupt:
             sys.exit(1)
 
diff --git a/maro/cli/local/utils.py b/maro/cli/local/utils.py
index 0505863b3..25d98a88d 100644
--- a/maro/cli/local/utils.py
+++ b/maro/cli/local/utils.py
@@ -4,7 +4,7 @@
 import os
 import subprocess
 from copy import deepcopy
-from typing import List
+from typing import List, Optional
 
 import docker
 import yaml
@@ -110,12 +110,15 @@ def exec(cmd: str, env: dict, debug: bool = False) -> subprocess.Popen:
 def start_rl_job(
     parser: ConfigParser,
     maro_root: str,
+    seed: Optional[int],
     evaluate_only: bool,
     background: bool = False,
 ) -> List[subprocess.Popen]:
     procs = [
         exec(
-            f"python {script}" + ("" if not evaluate_only else " --evaluate_only"),
+            f"python {script}"
+            + ("" if not evaluate_only else " --evaluate_only")
+            + ("" if seed is None else f" --seed {seed}"),
             format_env_vars({**env, "PYTHONPATH": maro_root}, mode="proc"),
             debug=not background,
         )
@@ -169,6 +172,7 @@ def start_rl_job_with_docker_compose(
     context: str,
     dockerfile_path: str,
     image_name: str,
+    seed: Optional[int],
     evaluate_only: bool,
 ) -> None:
     common_spec = {
@@ -185,7 +189,9 @@ def start_rl_job_with_docker_compose(
             **deepcopy(common_spec),
             **{
                 "container_name": component,
-                "command": f"python3 {script}" + ("" if not evaluate_only else " --evaluate_only"),
+                "command": f"python3 {script}"
+                + ("" if not evaluate_only else " --evaluate_only")
+                + ("" if seed is None else f" --seed {seed}"),
                 "environment": format_env_vars(env, mode="docker-compose"),
             },
         }
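
The optional CLI flags above are appended to the worker command by plain string concatenation in `start_rl_job`. A self-contained sketch of that expression, using a hypothetical `build_command` helper and an illustrative script name (not MARO code):

```python
# Sketch of the command-string concatenation used in start_rl_job above.
# build_command is a hypothetical helper; only the concatenation mirrors the patch.
from typing import Optional


def build_command(script: str, evaluate_only: bool, seed: Optional[int] = None) -> str:
    return (
        f"python {script}"
        + ("" if not evaluate_only else " --evaluate_only")
        + ("" if seed is None else f" --seed {seed}")
    )


assert build_command("main.py", evaluate_only=False) == "python main.py"
assert build_command("main.py", evaluate_only=True, seed=42) == "python main.py --evaluate_only --seed 42"
```
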
diff --git a/maro/rl/workflows/main.py b/maro/rl/workflows/main.py
index e28a46035..5df535bb6 100644
--- a/maro/rl/workflows/main.py
+++ b/maro/rl/workflows/main.py
@@ -15,7 +15,7 @@
 from maro.rl.utils.common import float_or_none, get_env, int_or_none, list_or_none
 from maro.rl.utils.training import get_latest_ep
 from maro.rl.workflows.callback import CallbackManager, Checkpoint, EarlyStopping, MetricsRecorder
-from maro.utils import LoggerV2
+from maro.utils import LoggerV2, set_seeds
 
 
 class WorkflowEnvAttributes:
@@ -85,6 +85,7 @@ def __init__(self) -> None:
 def _get_args() -> argparse.Namespace:
     parser = argparse.ArgumentParser(description="MARO RL workflow parser")
     parser.add_argument("--evaluate_only", action="store_true", help="Only run evaluation part of the workflow")
+    parser.add_argument("--seed", type=int, help="The random seed set before running this job")
     return parser.parse_args()
 
 
@@ -243,9 +244,13 @@ def evaluate_only_workflow(rl_component_bundle: RLComponentBundle, env_attr: Wor
 
 
 if __name__ == "__main__":
+    args = _get_args()
+    if args.seed is not None:
+        set_seeds(seed=args.seed)
+
     scenario_path = get_env("SCENARIO_PATH")
     scenario_path = os.path.normpath(scenario_path)
     sys.path.insert(0, os.path.dirname(scenario_path))
     module = importlib.import_module(os.path.basename(scenario_path))
 
-    main(getattr(module, "rl_component_bundle"), WorkflowEnvAttributes(), args=_get_args())
+    main(getattr(module, "rl_component_bundle"), WorkflowEnvAttributes(), args=args)
diff --git a/tests/rl/gym_wrapper/env_sampler.py b/tests/rl/gym_wrapper/env_sampler.py
index 20d387b72..f95aaa546 100644
--- a/tests/rl/gym_wrapper/env_sampler.py
+++ b/tests/rl/gym_wrapper/env_sampler.py
@@ -68,26 +68,32 @@ def _post_eval_step(self, cache_element: CacheElement) -> None:
         self._eval_rewards.append((len(rewards), np.sum(rewards)))
 
     def post_collect(self, info_list: list, ep: int) -> None:
-        cur = {
-            "n_steps": sum([n for n, _ in self._sample_rewards]),
-            "n_segment": len(self._sample_rewards),
-            "avg_reward": np.mean([r for _, r in self._sample_rewards]),
-            "avg_n_steps": np.mean([n for n, _ in self._sample_rewards]),
-            "max_n_steps": np.max([n for n, _ in self._sample_rewards]),
-            "n_interactions": self._total_number_interactions,
-        }
-        self.metrics.update(cur)
-        # clear validation metrics
-        self.metrics = {k: v for k, v in self.metrics.items() if not k.startswith("val/")}
-        self._sample_rewards.clear()
+        if len(self._sample_rewards) > 0:
+            cur = {
+                "n_steps": sum([n for n, _ in self._sample_rewards]),
+                "n_segment": len(self._sample_rewards),
+                "avg_reward": np.mean([r for _, r in self._sample_rewards]),
+                "avg_n_steps": np.mean([n for n, _ in self._sample_rewards]),
+                "max_n_steps": np.max([n for n, _ in self._sample_rewards]),
+                "n_interactions": self._total_number_interactions,
+            }
+            self.metrics.update(cur)
+            # clear validation metrics
+            self.metrics = {k: v for k, v in self.metrics.items() if not k.startswith("val/")}
+            self._sample_rewards.clear()
+        else:
+            self.metrics = {"n_interactions": self._total_number_interactions}
 
     def post_evaluate(self, info_list: list, ep: int) -> None:
-        cur = {
-            "val/n_steps": sum([n for n, _ in self._eval_rewards]),
-            "val/n_segment": len(self._eval_rewards),
-            "val/avg_reward": np.mean([r for _, r in self._eval_rewards]),
-            "val/avg_n_steps": np.mean([n for n, _ in self._eval_rewards]),
-            "val/max_n_steps": np.max([n for n, _ in self._eval_rewards]),
-        }
-        self.metrics.update(cur)
-        self._eval_rewards.clear()
+        if len(self._eval_rewards) > 0:
+            cur = {
+                "val/n_steps": sum([n for n, _ in self._eval_rewards]),
+                "val/n_segment": len(self._eval_rewards),
+                "val/avg_reward": np.mean([r for _, r in self._eval_rewards]),
+                "val/avg_n_steps": np.mean([n for n, _ in self._eval_rewards]),
+                "val/max_n_steps": np.max([n for n, _ in self._eval_rewards]),
+            }
+            self.metrics.update(cur)
+            self._eval_rewards.clear()
+        else:
+            self.metrics = {k: v for k, v in self.metrics.items() if not k.startswith("val/")}
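
The emptiness guards added to `post_collect` and `post_evaluate` above keep the metrics well defined when no trajectory finished in a collect or evaluation phase. A small standalone illustration of why aggregating an empty reward list is problematic (standard NumPy behavior, not MARO code):

```python
# Why the guards matter: reducing an empty reward list is not well defined.
import numpy as np

rewards: list = []
print(np.mean(rewards))  # nan, plus a "Mean of empty slice" RuntimeWarning
# np.max(rewards) would raise ValueError: zero-size array to reduction operation
```
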
diff --git a/tests/rl/log/Ant_1.png b/tests/rl/log/Ant_1.png
index d18195f88..e9bb76687 100644
Binary files a/tests/rl/log/Ant_1.png and b/tests/rl/log/Ant_1.png differ
diff --git a/tests/rl/log/Ant_11.png b/tests/rl/log/Ant_11.png
index 2f6dc3235..d19411e77 100644
Binary files a/tests/rl/log/Ant_11.png and b/tests/rl/log/Ant_11.png differ
diff --git a/tests/rl/log/HalfCheetah_1.png b/tests/rl/log/HalfCheetah_1.png
index faba7a07d..a20382a69 100644
Binary files a/tests/rl/log/HalfCheetah_1.png and b/tests/rl/log/HalfCheetah_1.png differ
diff --git a/tests/rl/log/HalfCheetah_11.png b/tests/rl/log/HalfCheetah_11.png
index 19844ceb2..4d2e031e3 100644
Binary files a/tests/rl/log/HalfCheetah_11.png and b/tests/rl/log/HalfCheetah_11.png differ
diff --git a/tests/rl/log/Hopper_1.png b/tests/rl/log/Hopper_1.png
index c54a9ffe4..d7944ac90 100644
Binary files a/tests/rl/log/Hopper_1.png and b/tests/rl/log/Hopper_1.png differ
diff --git a/tests/rl/log/Hopper_11.png b/tests/rl/log/Hopper_11.png
index 8197dcac9..693ed96d4 100644
Binary files a/tests/rl/log/Hopper_11.png and b/tests/rl/log/Hopper_11.png differ
diff --git a/tests/rl/log/Swimmer_1.png b/tests/rl/log/Swimmer_1.png
index 5408aafde..895e5599e 100644
Binary files a/tests/rl/log/Swimmer_1.png and b/tests/rl/log/Swimmer_1.png differ
diff --git a/tests/rl/log/Swimmer_11.png b/tests/rl/log/Swimmer_11.png
index 483a54806..e259bb707 100644
Binary files a/tests/rl/log/Swimmer_11.png and b/tests/rl/log/Swimmer_11.png differ
diff --git a/tests/rl/log/Walker2d_1.png b/tests/rl/log/Walker2d_1.png
index 20560bc77..caa9f1336 100644
Binary files a/tests/rl/log/Walker2d_1.png and b/tests/rl/log/Walker2d_1.png differ
diff --git a/tests/rl/log/Walker2d_11.png b/tests/rl/log/Walker2d_11.png
index 822aa9394..d9e841101 100644
Binary files a/tests/rl/log/Walker2d_11.png and b/tests/rl/log/Walker2d_11.png differ
diff --git a/tests/rl/performance.md b/tests/rl/performance.md
index 9b7afdd8c..75442035c 100644
--- a/tests/rl/performance.md
+++ b/tests/rl/performance.md
@@ -1,13 +1,14 @@
 # Performance for Gym Task Suite
 
-We benchmarked the MARO RL Toolkit implementation in Gym task suite.
-Some are compared to the benchmarks in [OpenAI Spinning Up](https://spinningup.openai.com/en/latest/spinningup/bench.html#).
-Limited by the environment version difference,
-there may be some gaps between the performance here and that in Spinning Up benchmarks.
+We benchmarked the MARO RL Toolkit implementation on the Gym task suite. Some results are compared against the
+benchmarks in [OpenAI Spinning Up](https://spinningup.openai.com/en/latest/spinningup/bench.html#). We tried to align
+the hyper-parameters with those benchmarks, but because of environment version differences there may be some gaps
+between the results reported here and the Spinning Up numbers. Generally speaking, the performance is comparable.
 
 ## Experimental Setting
 
-The hyper-parameters are set to align with those used in [Spinning Up](https://spinningup.openai.com/en/latest/spinningup/bench.html#experiment-details):
+The hyper-parameters are set to align with those used in
+[Spinning Up](https://spinningup.openai.com/en/latest/spinningup/bench.html#experiment-details):
 
 **Batch Size**:
 
@@ -24,13 +25,25 @@ The hyper-parameters are set to align with those used in [Spinning Up](https://s
 - For on-policy algorithms: measured as the average trajectory return across the batch collected at each epoch;
 - For off-policy algorithms: measured once every 10,000 steps by running the deterministic policy (or, in the case of SAC, the mean policy) without action noise for ten trajectories, and reporting the average return over those test trajectories;
 
-**Total timesteps**: set to 3M for all task suites and algorithms.
+**Total timesteps**: set to 4M for all task suites and algorithms.
 
-Other parameters are set to the values in *tests/rl/tasks/*.
+More details about the parameters can be found in *tests/rl/tasks/*.
 
-## Performance Comparison
+## Performance
 
-Five environments from the MuJoCo Gym task suite are reported in Spinning Up, they are: HalfCheetah, Hopper, Walker2d, Swimmer, and Ant.
+Five environments from the MuJoCo Gym task suite are reported in Spinning Up: HalfCheetah, Hopper, Walker2d,
+Swimmer, and Ant. The commit id of the code used to run the MARO RL benchmark experiments is ee25ce1e97.
+The commands used are:
+
+```sh
+# Step 1: Set up the MuJoCo environment in tests/rl/gym_wrapper/common.py.
+
+# Step 2: Run an experiment with ALGORITHM (ddpg, ppo, sac) and random seed SEED.
+python tests/rl/run.py tests/rl/tasks/ALGORITHM/config.yml --seed SEED
+
+# Step 3: Plot performance curves by environment with smoothing window size WINDOWSIZE.
+python tests/rl/plot.py --smooth WINDOWSIZE
+```
 
 | **Env** | **Spinning Up** | **MARO RL w/o Smooth** | **MARO RL w/ Smooth** |
 |:---------------:|:---------------:|:----------------------:|:---------------------:|
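
The per-seed runs described above pair with the seed list used by `tests/rl/plot.py` (42, 729, 1024, 2023, 3500). A hypothetical batch driver for that sweep might look like the following sketch (illustrative only, not part of the patch):

```python
# Hypothetical sweep driver: runs every algorithm/seed combination via tests/rl/run.py.
import subprocess

ALGORITHMS = ["ppo", "sac", "ddpg"]
SEEDS = [42, 729, 1024, 2023, 3500]  # seeds expected by tests/rl/plot.py

for algorithm in ALGORITHMS:
    for seed in SEEDS:
        subprocess.run(
            ["python", "tests/rl/run.py", f"tests/rl/tasks/{algorithm}/config.yml", "--seed", str(seed)],
            check=True,
        )
```
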
diff --git a/tests/rl/plot.py b/tests/rl/plot.py
index 7f0ec4d98..7fd394c62 100644
--- a/tests/rl/plot.py
+++ b/tests/rl/plot.py
@@ -54,32 +54,32 @@ def get_on_policy_data(log_dir: str) -> Tuple[np.ndarray, np.ndarray]:
 
 
 def plot_performance_curves(title: str, dir_names: List[str], smooth_window_size: int) -> None:
-    for name in dir_names:
-        log_dir = os.path.join(LOG_DIR, name)
-        if not os.path.exists(log_dir):
+    for algorithm in color_map.keys():
+        if algorithm in ["ddpg", "sac", "td3"]:
+            func = get_off_policy_data
+        elif algorithm in ["ppo", "vpg"]:
+            func = get_on_policy_data
+
+        log_dirs = [os.path.join(LOG_DIR, name) for name in dir_names if algorithm in name]
+        series = [func(log_dir) for log_dir in log_dirs if os.path.exists(log_dir)]
+        if len(series) == 0:
             continue
-        if "ppo" in name:
-            algorithm = "ppo"
-            func = get_on_policy_data
-        elif "sac" in name:
-            algorithm = "sac"
-            func = get_off_policy_data
-        elif "ddpg" in name:
-            algorithm = "ddpg"
-            func = get_off_policy_data
-        else:
-            raise "unknown algorithm name"
+        x = series[0][0]
+        assert all(len(_x) == len(x) for _x, _ in series), "Input data should share the same length!"
+        ys = np.array([smooth(y, smooth_window_size) for _, y in series])
+        y_mean = np.mean(ys, axis=0)
+        y_std = np.std(ys, axis=0)
 
-        x, y = func(log_dir)
-        y = smooth(y, smooth_window_size)
-        plt.plot(x, y, label=algorithm, color=color_map[algorithm])
+        plt.plot(x, y_mean, label=algorithm, color=color_map[algorithm])
+        plt.fill_between(x, y_mean - y_std, y_mean + y_std, color=color_map[algorithm], alpha=0.2)
 
     plt.legend()
+    plt.grid()
     plt.title(title)
     plt.xlabel("Total Env Interactions")
-    plt.ylabel(f"Average Trajectory Return (moving average with window size = {smooth_window_size})")
-    plt.savefig(os.path.join(LOG_DIR, f"{title}_{smooth_window_size}.png"))
+    plt.ylabel(f"Average Trajectory Return \n(moving average with window size = {smooth_window_size})")
+    plt.savefig(os.path.join(LOG_DIR, f"{title}_{smooth_window_size}.png"), bbox_inches="tight")
     plt.close()
 
 
@@ -91,6 +91,10 @@ def plot_performance_curves(title: str, dir_names: List[str], smooth_window_size
     for env_name in ["HalfCheetah", "Hopper", "Walker2d", "Swimmer", "Ant"]:
         plot_performance_curves(
             title=env_name,
-            dir_names=[f"{algorithm}_{env_name.lower()}" for algorithm in ["ppo", "sac", "ddpg"]],
+            dir_names=[
+                f"{algorithm}_{env_name.lower()}_{seed}"
+                for algorithm in ["ppo", "sac", "ddpg"]
+                for seed in [42, 729, 1024, 2023, 3500]
+            ],
             smooth_window_size=args.smooth,
         )
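
The reworked `plot_performance_curves` draws, for each algorithm, the mean of the smoothed per-seed curves plus a one-standard-deviation band. A minimal sketch of that aggregation; the `smooth` helper below is a hypothetical moving-average stand-in, not the repository's implementation:

```python
# Minimal sketch of the per-seed aggregation drawn by plot.py.
# `smooth` is a hypothetical stand-in for the helper used in tests/rl/plot.py.
import numpy as np


def smooth(y: np.ndarray, window_size: int) -> np.ndarray:
    kernel = np.ones(window_size) / window_size
    return np.convolve(y, kernel, mode="same")  # same-length centered moving average


ys = np.stack([smooth(np.random.rand(100), 11) for _ in range(5)])  # one curve per seed
y_mean, y_std = ys.mean(axis=0), ys.std(axis=0)  # mean line and shaded band in the figure
```
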
diff --git a/tests/rl/run.py b/tests/rl/run.py
index a03947eec..a03ee5e2e 100644
--- a/tests/rl/run.py
+++ b/tests/rl/run.py
@@ -4,7 +4,6 @@
 import argparse
 
 from maro.cli.local.commands import run
-from maro.utils.utils import set_seeds
 
 
 def get_args() -> argparse.Namespace:
@@ -17,6 +16,4 @@ def get_args() -> argparse.Namespace:
 
 if __name__ == "__main__":
     args = get_args()
-    if args.seed is not None:
-        set_seeds(seed=args.seed)
-    run(conf_path=args.conf_path, containerize=False, evaluate_only=args.evaluate_only)
+    run(conf_path=args.conf_path, containerize=False, seed=args.seed, evaluate_only=args.evaluate_only)
diff --git a/tests/rl/tasks/ddpg/__init__.py b/tests/rl/tasks/ddpg/__init__.py
index 3e3814f92..861904a43 100644
--- a/tests/rl/tasks/ddpg/__init__.py
+++ b/tests/rl/tasks/ddpg/__init__.py
@@ -2,6 +2,7 @@
 # Licensed under the MIT license.
 
 import torch
+from gym import spaces
 from torch.optim import Adam
 
 from maro.rl.model import QNet
@@ -10,12 +11,14 @@
 from maro.rl.policy import ContinuousRLPolicy
 from maro.rl.rl_component.rl_component_bundle import RLComponentBundle
 from maro.rl.training.algorithms import DDPGParams, DDPGTrainer
+from maro.rl.utils import ndarray_to_tensor
 
 from tests.rl.gym_wrapper.common import (
     action_limit,
     action_lower_bound,
     action_upper_bound,
     gym_action_dim,
+    gym_action_space,
     gym_state_dim,
     learn_env,
     num_agents,
@@ -37,7 +40,14 @@
 
 
 class MyContinuousDDPGNet(ContinuousDDPGNet):
-    def __init__(self, state_dim: int, action_dim: int, action_limit: float, noise_scale: float = 0.1) -> None:
+    def __init__(
+        self,
+        state_dim: int,
+        action_dim: int,
+        action_limit: float,
+        action_space: spaces.Space,
+        noise_scale: float = 0.1,
+    ) -> None:
         super(MyContinuousDDPGNet, self).__init__(state_dim=state_dim, action_dim=action_dim)
 
         self._net = FullyConnected(
@@ -50,6 +60,7 @@ def __init__(self, state_dim: int, action_dim: int, action_limit: float, noise_s
         self._optim = Adam(self._net.parameters(), lr=critic_learning_rate)
         self._action_limit = action_limit
         self._noise_scale = noise_scale
+        self._action_space = action_space
 
     def _get_actions_impl(self, states: torch.Tensor, exploring: bool) -> torch.Tensor:
         action = self._net(states) * self._action_limit
@@ -59,6 +70,11 @@ def _get_actions_impl(self, states: torch.Tensor, exploring: bool) -> torch.Tens
             action = torch.clamp(action, -self._action_limit, self._action_limit)
         return action
 
+    def _get_random_actions_impl(self, states: torch.Tensor) -> torch.Tensor:
+        return torch.stack(
+            [ndarray_to_tensor(self._action_space.sample(), device=self._device) for _ in range(states.shape[0])],
+        )
+
 
 class MyQCriticNet(QNet):
     def __init__(self, state_dim: int, action_dim: int) -> None:
@@ -86,7 +102,8 @@ def get_ddpg_policy(
     return ContinuousRLPolicy(
         name=name,
         action_range=(action_lower_bound, action_upper_bound),
-        policy_net=MyContinuousDDPGNet(gym_state_dim, gym_action_dim, action_limit),
+        policy_net=MyContinuousDDPGNet(gym_state_dim, gym_action_dim, action_limit, gym_action_space),
+        warmup=10000,
     )
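
The new `_get_random_actions_impl`, paired with `warmup=10000`, lets the DDPG policy draw uniform random actions from the Gym action space during the warm-up phase instead of querying the untrained actor. An illustration of such sampling with an assumed HalfCheetah-like `Box` space (not MARO code):

```python
# Illustration of uniform warm-up actions drawn from a gym action space,
# mirroring _get_random_actions_impl above. The Box shape is an assumption.
import numpy as np
from gym import spaces

action_space = spaces.Box(low=-1.0, high=1.0, shape=(6,), dtype=np.float32)
batch = np.stack([action_space.sample() for _ in range(4)])  # one random action per state in the batch
print(batch.shape)  # (4, 6)
```
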