diff --git a/examples/cim/dqn/components/config.py b/examples/cim/dqn/components/config.py index c53868b6e..e8712e0cf 100644 --- a/examples/cim/dqn/components/config.py +++ b/examples/cim/dqn/components/config.py @@ -2,7 +2,7 @@ # Licensed under the MIT license. """ -This file is used to load config and convert it into a dotted dictionary. +This file is used to load the configuration and convert it into a dotted dictionary. """ import io diff --git a/examples/cim/dqn/single_process_launcher.py b/examples/cim/dqn/single_process_launcher.py index aca2cadfd..0dae3cca8 100644 --- a/examples/cim/dqn/single_process_launcher.py +++ b/examples/cim/dqn/single_process_launcher.py @@ -38,7 +38,6 @@ def launch(config): # Step 4: Create an actor and a learner to start the training process. scheduler = TwoPhaseLinearParameterScheduler(config.main_loop.max_episode, **config.main_loop.exploration) - actor = SimpleActor(env, agent_manager) learner = SimpleLearner( agent_manager, actor, scheduler, diff --git a/examples/cim/policy_optimization/README.md b/examples/cim/policy_optimization/README.md new file mode 100644 index 000000000..816ed5052 --- /dev/null +++ b/examples/cim/policy_optimization/README.md @@ -0,0 +1,22 @@ +# Overview + +The CIM problem is one of the quintessential use cases of MARO. The example can +be run with a set of scenario configurations that can be found under +maro/simulator/scenarios/cim. General experimental parameters (e.g., type of +topology, type of algorithm to use, number of training episodes) can be configured +through config.yml. Each RL formulation has a dedicated folder, e.g., dqn, and +all algorithm-specific parameters can be configured through +the config.py file in that folder. + +## Single-host Single-process Mode + +To run the CIM example using the DQN algorithm under single-host mode, go to +examples/cim/dqn and run single_process_launcher.py. You may play around with +the configuration if you want to try out different settings. + +## Distributed Mode + +The examples/cim/dqn/components folder contains dist_learner.py and dist_actor.py +for distributed training. For debugging purposes, we provide a script that +simulates distributed mode using multi-processing. Simply go to examples/cim/dqn +and run multi_process_launcher.py to start the learner and actor processes. diff --git a/examples/cim/policy_optimization/components/__init__.py b/examples/cim/policy_optimization/components/__init__.py new file mode 100644 index 000000000..b096bd2c5 --- /dev/null +++ b/examples/cim/policy_optimization/components/__init__.py @@ -0,0 +1,14 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +from .action_shaper import CIMActionShaper +from .agent_manager import POAgentManager, create_po_agents +from .experience_shaper import TruncatedExperienceShaper +from .state_shaper import CIMStateShaper + +__all__ = [ + "CIMActionShaper", + "POAgentManager", "create_po_agents", + "TruncatedExperienceShaper", + "CIMStateShaper" +] diff --git a/examples/cim/policy_optimization/components/action_shaper.py b/examples/cim/policy_optimization/components/action_shaper.py new file mode 100644 index 000000000..687d18d88 --- /dev/null +++ b/examples/cim/policy_optimization/components/action_shaper.py @@ -0,0 +1,33 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +from maro.rl import ActionShaper +from maro.simulator.scenarios.cim.common import Action + + +class CIMActionShaper(ActionShaper): + def __init__(self, action_space): + super().__init__() + self._action_space = action_space + self._zero_action_index = action_space.index(0) + + def __call__(self, model_action, decision_event, snapshot_list): + scope = decision_event.action_scope + tick = decision_event.tick + port_idx = decision_event.port_idx + vessel_idx = decision_event.vessel_idx + + port_empty = snapshot_list["ports"][tick: port_idx: ["empty", "full", "on_shipper", "on_consignee"]][0] + vessel_remaining_space = snapshot_list["vessels"][tick: vessel_idx: ["empty", "full", "remaining_space"]][2] + early_discharge = snapshot_list["vessels"][tick:vessel_idx: "early_discharge"][0] + assert 0 <= model_action < len(self._action_space) + + if model_action < self._zero_action_index: + actual_action = max(round(self._action_space[model_action] * port_empty), -vessel_remaining_space) + elif model_action > self._zero_action_index: + plan_action = self._action_space[model_action] * (scope.discharge + early_discharge) - early_discharge + actual_action = round(plan_action) if plan_action > 0 else round(self._action_space[model_action] * scope.discharge) + else: + actual_action = 0 + + return Action(vessel_idx, port_idx, actual_action) diff --git a/examples/cim/policy_optimization/components/agent_manager.py b/examples/cim/policy_optimization/components/agent_manager.py new file mode 100644 index 000000000..6ea9cda8e --- /dev/null +++ b/examples/cim/policy_optimization/components/agent_manager.py @@ -0,0 +1,83 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +import numpy as np +import torch.nn as nn +from torch.optim import Adam, RMSprop + +from maro.rl import ( + AbsAgent, ActorCritic, ActorCriticConfig, FullyConnectedBlock, LearningModel, NNStack, + OptimizerOptions, PolicyGradient, PolicyOptimizationConfig, SimpleAgentManager +) +from maro.utils import set_seeds + + +class POAgent(AbsAgent): + def train(self, states: np.ndarray, actions: np.ndarray, log_action_prob: np.ndarray, rewards: np.ndarray): + self._algorithm.train(states, actions, log_action_prob, rewards) + + +def create_po_agents(agent_id_list, config): + input_dim, num_actions = config.input_dim, config.num_actions + set_seeds(config.seed) + agent_dict = {} + for agent_id in agent_id_list: + actor_net = NNStack( + "actor", + FullyConnectedBlock( + input_dim=input_dim, + output_dim=num_actions, + activation=nn.Tanh, + is_head=True, + **config.actor_model + ) + ) + + if config.type == "actor_critic": + critic_net = NNStack( + "critic", + FullyConnectedBlock( + input_dim=config.input_dim, + output_dim=1, + activation=nn.LeakyReLU, + is_head=True, + **config.critic_model + ) + ) + + hyper_params = config.actor_critic_hyper_parameters + hyper_params.update({"reward_discount": config.reward_discount}) + learning_model = LearningModel( + actor_net, critic_net, + optimizer_options={ + "actor": OptimizerOptions(cls=Adam, params=config.actor_optimizer), + "critic": OptimizerOptions(cls=RMSprop, params=config.critic_optimizer) + } + ) + algorithm = ActorCritic( + learning_model, ActorCriticConfig(critic_loss_func=nn.SmoothL1Loss(), **hyper_params) + ) + else: + learning_model = LearningModel( + actor_net, + optimizer_options=OptimizerOptions(cls=Adam, params=config.actor_optimizer) + ) + algorithm = PolicyGradient(learning_model, PolicyOptimizationConfig(config.reward_discount)) + + agent_dict[agent_id] = POAgent(name=agent_id, algorithm=algorithm) + + return agent_dict + + +class POAgentManager(SimpleAgentManager): + def train(self, experiences_by_agent: dict): + for agent_id, exp in experiences_by_agent.items(): + if not isinstance(exp, list): + exp = [exp] + for trajectory in exp: + self.agent_dict[agent_id].train( + trajectory["state"], + trajectory["action"], + trajectory["log_action_probability"], + trajectory["reward"] + ) diff --git a/examples/cim/policy_optimization/components/config.py b/examples/cim/policy_optimization/components/config.py new file mode 100644 index 000000000..ee93a8814 --- /dev/null +++ b/examples/cim/policy_optimization/components/config.py @@ -0,0 +1,19 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +""" +This file is used to load the configuration and convert it into a dotted dictionary. +""" + +import io +import os +import yaml + + +CONFIG_PATH = os.path.join(os.path.split(os.path.realpath(__file__))[0], "../config.yml") +with io.open(CONFIG_PATH, "r") as in_file: + config = yaml.safe_load(in_file) + +DISTRIBUTED_CONFIG_PATH = os.path.join(os.path.split(os.path.realpath(__file__))[0], "../distributed_config.yml") +with io.open(DISTRIBUTED_CONFIG_PATH, "r") as in_file: + distributed_config = yaml.safe_load(in_file) diff --git a/examples/cim/policy_optimization/components/experience_shaper.py b/examples/cim/policy_optimization/components/experience_shaper.py new file mode 100644 index 000000000..0ce38104b --- /dev/null +++ b/examples/cim/policy_optimization/components/experience_shaper.py @@ -0,0 +1,51 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +from collections import defaultdict + +import numpy as np + +from maro.rl import ExperienceShaper + + +class TruncatedExperienceShaper(ExperienceShaper): + def __init__(self, *, time_window: int, time_decay_factor: float, fulfillment_factor: float, + shortage_factor: float): + super().__init__(reward_func=None) + self._time_window = time_window + self._time_decay_factor = time_decay_factor + self._fulfillment_factor = fulfillment_factor + self._shortage_factor = shortage_factor + + def __call__(self, trajectory, snapshot_list): + agent_ids = np.asarray(trajectory.get_by_key("agent_id")) + states = np.asarray(trajectory.get_by_key("state")) + actions = np.asarray(trajectory.get_by_key("action")) + log_action_probabilities = np.asarray(trajectory.get_by_key("log_action_probability")) + rewards = np.fromiter( + map(self._compute_reward, trajectory.get_by_key("event"), [snapshot_list] * len(trajectory)), + dtype=np.float32 + ) + return {agent_id: { + "state": states[agent_ids == agent_id], + "action": actions[agent_ids == agent_id], + "log_action_probability": log_action_probabilities[agent_ids == agent_id], + "reward": rewards[agent_ids == agent_id], + } + for agent_id in set(agent_ids)} + + def _compute_reward(self, decision_event, snapshot_list): + start_tick = decision_event.tick + 1 + end_tick = decision_event.tick + self._time_window + ticks = list(range(start_tick, end_tick)) + + # calculate tc reward + future_fulfillment = snapshot_list["ports"][ticks::"fulfillment"] + future_shortage = snapshot_list["ports"][ticks::"shortage"] + decay_list = [self._time_decay_factor ** i for i in range(end_tick - start_tick) + for _ in range(future_fulfillment.shape[0]//(end_tick-start_tick))] + + tot_fulfillment = np.dot(future_fulfillment, decay_list) + tot_shortage = np.dot(future_shortage, decay_list) + + return np.float(self._fulfillment_factor * tot_fulfillment - self._shortage_factor * tot_shortage) diff --git a/examples/cim/policy_optimization/components/state_shaper.py b/examples/cim/policy_optimization/components/state_shaper.py new file mode 100644 index 000000000..93b1bf824 --- /dev/null +++ b/examples/cim/policy_optimization/components/state_shaper.py @@ -0,0 +1,30 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +import numpy as np + +from maro.rl import StateShaper + +PORT_ATTRIBUTES = ["empty", "full", "on_shipper", "on_consignee", "booking", "shortage", "fulfillment"] +VESSEL_ATTRIBUTES = ["empty", "full", "remaining_space"] + + +class CIMStateShaper(StateShaper): + def __init__(self, *, look_back, max_ports_downstream): + super().__init__() + self._look_back = look_back + self._max_ports_downstream = max_ports_downstream + self._dim = (look_back + 1) * (max_ports_downstream + 1) * len(PORT_ATTRIBUTES) + len(VESSEL_ATTRIBUTES) + + def __call__(self, decision_event, snapshot_list): + tick, port_idx, vessel_idx = decision_event.tick, decision_event.port_idx, decision_event.vessel_idx + ticks = [tick - rt for rt in range(self._look_back - 1)] + future_port_idx_list = snapshot_list["vessels"][tick: vessel_idx: 'future_stop_list'].astype('int') + port_features = snapshot_list["ports"][ticks: [port_idx] + list(future_port_idx_list): PORT_ATTRIBUTES] + vessel_features = snapshot_list["vessels"][tick: vessel_idx: VESSEL_ATTRIBUTES] + state = np.concatenate((port_features, vessel_features)) + return str(port_idx), state + + @property + def dim(self): + return self._dim diff --git a/examples/cim/policy_optimization/config.yml b/examples/cim/policy_optimization/config.yml new file mode 100644 index 000000000..267633cd4 --- /dev/null +++ b/examples/cim/policy_optimization/config.yml @@ -0,0 +1,50 @@ +env: + scenario: "cim" + topology: "toy.4p_ssdd_l0.0" + durations: 1120 + state_shaping: + look_back: 7 + max_ports_downstream: 2 + experience_shaping: + time_window: 100 + fulfillment_factor: 1.0 + shortage_factor: 1.0 + time_decay_factor: 0.97 +main_loop: + max_episode: 100 + early_stopping: + warmup_ep: 20 + last_k: 5 + perf_threshold: 0.95 # minimum performance (fulfillment ratio) required to trigger early stopping + perf_stability_threshold: 0.1 # stability is measured by the maximum of abs(perf_(i+1) - perf_i) / perf_i + # over the last k episodes (where perf is short for performance). This value must + # be below this threshold to trigger early stopping +agents: + seed: 1024 # for reproducibility + type: "actor_critic" # "actor_critic" or "policy_gradient" + num_actions: 21 + actor_model: + hidden_dims: + - 256 + - 128 + - 64 + softmax_enabled: true + batch_norm_enabled: false + actor_optimizer: + lr: 0.001 + critic_model: + hidden_dims: + - 256 + - 128 + - 64 + softmax_enabled: false + batch_norm_enabled: true + critic_optimizer: + lr: 0.001 + reward_discount: .0 + actor_critic_hyper_parameters: + train_iters: 10 + actor_loss_coefficient: 0.1 + k: 1 + lam: 0.0 + # clip_ratio: 0.8 diff --git a/examples/cim/policy_optimization/dist_actor.py b/examples/cim/policy_optimization/dist_actor.py new file mode 100644 index 000000000..832142dfd --- /dev/null +++ b/examples/cim/policy_optimization/dist_actor.py @@ -0,0 +1,46 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +import os + +import numpy as np + +from maro.simulator import Env +from maro.rl import AgentManagerMode, SimpleActor, ActorWorker +from maro.utils import convert_dottable + +from components import CIMActionShaper, CIMStateShaper, POAgentManager, TruncatedExperienceShaper, create_po_agents + + +def launch(config): + config = convert_dottable(config) + env = Env(config.env.scenario, config.env.topology, durations=config.env.durations) + agent_id_list = [str(agent_id) for agent_id in env.agent_idx_list] + state_shaper = CIMStateShaper(**config.env.state_shaping) + action_shaper = CIMActionShaper(action_space=list(np.linspace(-1.0, 1.0, config.agents.num_actions))) + experience_shaper = TruncatedExperienceShaper(**config.env.experience_shaping) + + config["agents"]["input_dim"] = state_shaper.dim + agent_manager = POAgentManager( + name="cim_actor", + mode=AgentManagerMode.INFERENCE, + agent_dict=create_po_agents(agent_id_list, config.agents), + state_shaper=state_shaper, + action_shaper=action_shaper, + experience_shaper=experience_shaper, + ) + proxy_params = { + "group_name": os.environ["GROUP"], + "expected_peers": {"learner": 1}, + "redis_address": ("localhost", 6379) + } + actor_worker = ActorWorker( + local_actor=SimpleActor(env=env, agent_manager=agent_manager), + proxy_params=proxy_params + ) + actor_worker.launch() + + +if __name__ == "__main__": + from components.config import config + launch(config) diff --git a/examples/cim/policy_optimization/dist_learner.py b/examples/cim/policy_optimization/dist_learner.py new file mode 100644 index 000000000..1dd9ba82c --- /dev/null +++ b/examples/cim/policy_optimization/dist_learner.py @@ -0,0 +1,46 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +import os + +from maro.rl import ActorProxy, AgentManagerMode, Scheduler, SimpleLearner, merge_experiences_with_trajectory_boundaries +from maro.simulator import Env +from maro.utils import Logger, convert_dottable + +from components import CIMStateShaper, POAgentManager, create_po_agents + + +def launch(config): + config = convert_dottable(config) + env = Env(config.env.scenario, config.env.topology, durations=config.env.durations) + agent_id_list = [str(agent_id) for agent_id in env.agent_idx_list] + config["agents"]["input_dim"] = CIMStateShaper(**config.env.state_shaping).dim + agent_manager = POAgentManager( + name="cim_learner", + mode=AgentManagerMode.TRAIN, + agent_dict=create_po_agents(agent_id_list, config.agents) + ) + + proxy_params = { + "group_name": os.environ["GROUP"], + "expected_peers": {"actor": int(os.environ["NUM_ACTORS"])}, + "redis_address": ("localhost", 6379) + } + + learner = SimpleLearner( + agent_manager=agent_manager, + actor=ActorProxy( + proxy_params=proxy_params, experience_collecting_func=merge_experiences_with_trajectory_boundaries + ), + scheduler=Scheduler(config.main_loop.max_episode), + logger=Logger("cim_learner", auto_timestamp=False) + ) + learner.learn() + learner.test() + learner.dump_models(os.path.join(os.getcwd(), "models")) + learner.exit() + + +if __name__ == "__main__": + from components.config import config + launch(config) diff --git a/examples/cim/policy_optimization/distributed_config.yml b/examples/cim/policy_optimization/distributed_config.yml new file mode 100644 index 000000000..5b7c18b13 --- /dev/null +++ b/examples/cim/policy_optimization/distributed_config.yml @@ -0,0 +1,6 @@ +redis: + hostname: "localhost" + port: 6379 +group: test_group +num_actors: 1 +num_learners: 1 diff --git a/examples/cim/policy_optimization/multi_process_launcher.py b/examples/cim/policy_optimization/multi_process_launcher.py new file mode 100644 index 000000000..0de79a77c --- /dev/null +++ b/examples/cim/policy_optimization/multi_process_launcher.py @@ -0,0 +1,26 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +""" +This script is used to debug distributed algorithm in single host multi-process mode. +""" + +import argparse +import os + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("group_name", help="group name") + parser.add_argument("num_actors", type=int, help="number of actors") + args = parser.parse_args() + + learner_path = f"{os.path.split(os.path.realpath(__file__))[0]}/dist_learner.py &" + actor_path = f"{os.path.split(os.path.realpath(__file__))[0]}/dist_actor.py &" + + # Launch the learner process + os.system(f"GROUP={args.group_name} NUM_ACTORS={args.num_actors} python " + learner_path) + + # Launch the actor processes + for _ in range(args.num_actors): + os.system(f"GROUP={args.group_name} python " + actor_path) diff --git a/examples/cim/policy_optimization/single_process_launcher.py b/examples/cim/policy_optimization/single_process_launcher.py new file mode 100644 index 000000000..64f90e26e --- /dev/null +++ b/examples/cim/policy_optimization/single_process_launcher.py @@ -0,0 +1,91 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +import os +from statistics import mean + +import numpy as np + +from maro.simulator import Env +from maro.rl import AgentManagerMode, Scheduler, SimpleActor, SimpleLearner +from maro.utils import LogFormat, Logger, convert_dottable + +from components import CIMActionShaper, CIMStateShaper, POAgentManager, TruncatedExperienceShaper, create_po_agents + + +class EarlyStoppingChecker: + """Callable class that checks the performance history to determine early stopping. + + Args: + warmup_ep (int): Episode from which early stopping checking is initiated. + last_k (int): Number of latest performance records to check for early stopping. + perf_threshold (float): The mean of the ``last_k`` performance metric values must be above this value to + trigger early stopping. + perf_stability_threshold (float): The maximum one-step change over the ``last_k`` performance metrics must be + below this value to trigger early stopping. + """ + def __init__(self, warmup_ep: int, last_k: int, perf_threshold: float, perf_stability_threshold: float): + self._warmup_ep = warmup_ep + self._last_k = last_k + self._perf_threshold = perf_threshold + self._perf_stability_threshold = perf_stability_threshold + + def get_metric(record): + return 1 - record["container_shortage"] / record["order_requirements"] + self._metric_func = get_metric + + def __call__(self, perf_history) -> bool: + if len(perf_history) < max(self._last_k, self._warmup_ep): + return False + + metric_series = list(map(self._metric_func, perf_history[-self._last_k:])) + max_delta = max( + abs(metric_series[i] - metric_series[i - 1]) / metric_series[i - 1] for i in range(1, self._last_k) + ) + print(f"mean_metric: {mean(metric_series)}, max_delta: {max_delta}") + return mean(metric_series) > self._perf_threshold and max_delta < self._perf_stability_threshold + + +def launch(config): + # First determine the input dimension and add it to the config. + config = convert_dottable(config) + + # Step 1: initialize a CIM environment for using a toy dataset. + env = Env(config.env.scenario, config.env.topology, durations=config.env.durations) + agent_id_list = [str(agent_id) for agent_id in env.agent_idx_list] + + # Step 2: create state, action and experience shapers. We also need to create an explorer here due to the + # greedy nature of the DQN algorithm. + state_shaper = CIMStateShaper(**config.env.state_shaping) + action_shaper = CIMActionShaper(action_space=list(np.linspace(-1.0, 1.0, config.agents.num_actions))) + experience_shaper = TruncatedExperienceShaper(**config.env.experience_shaping) + + # Step 3: create an agent manager. + config["agents"]["input_dim"] = state_shaper.dim + agent_manager = POAgentManager( + name="cim_learner", + mode=AgentManagerMode.TRAIN_INFERENCE, + agent_dict=create_po_agents(agent_id_list, config.agents), + state_shaper=state_shaper, + action_shaper=action_shaper, + experience_shaper=experience_shaper, + ) + + # Step 4: Create an actor and a learner to start the training process. + scheduler = Scheduler( + config.main_loop.max_episode, + early_stopping_checker=EarlyStoppingChecker(**config.main_loop.early_stopping) + ) + actor = SimpleActor(env, agent_manager) + learner = SimpleLearner( + agent_manager, actor, scheduler, + logger=Logger("cim_learner", format_=LogFormat.simple, auto_timestamp=False) + ) + learner.learn() + learner.test() + learner.dump_models(os.path.join(os.getcwd(), "models")) + + +if __name__ == "__main__": + from components.config import config + launch(config) diff --git a/maro/rl/__init__.py b/maro/rl/__init__.py index 6510dcab7..17a3aacb3 100644 --- a/maro/rl/__init__.py +++ b/maro/rl/__init__.py @@ -3,7 +3,10 @@ from maro.rl.actor import AbsActor, SimpleActor from maro.rl.agent import AbsAgent, AbsAgentManager, AgentManagerMode, SimpleAgentManager -from maro.rl.algorithms import DQN, AbsAlgorithm, DQNConfig +from maro.rl.algorithms import ( + DQN, AbsAlgorithm, ActionInfo, ActorCritic, ActorCriticConfig, DQNConfig, PolicyGradient, PolicyOptimization, + PolicyOptimizationConfig +) from maro.rl.dist_topologies import ( ActorProxy, ActorWorker, concat_experiences_by_agent, merge_experiences_with_trajectory_boundaries ) @@ -19,7 +22,8 @@ __all__ = [ "AbsActor", "SimpleActor", "AbsAgent", "AbsAgentManager", "AgentManagerMode", "SimpleAgentManager", - "AbsAlgorithm", "DQN", "DQNConfig", + "AbsAlgorithm", "ActionInfo", "ActorCritic", "ActorCriticConfig", "DQN", "DQNConfig", "PolicyGradient", + "PolicyOptimization", "PolicyOptimizationConfig", "ActorProxy", "ActorWorker", "concat_experiences_by_agent", "merge_experiences_with_trajectory_boundaries", "AbsExplorer", "EpsilonGreedyExplorer", "GaussianNoiseExplorer", "NoiseExplorer", "UniformNoiseExplorer", "AbsLearner", "SimpleLearner", diff --git a/maro/rl/agent/simple_agent_manager.py b/maro/rl/agent/simple_agent_manager.py index 1f1760469..1f07d63fd 100644 --- a/maro/rl/agent/simple_agent_manager.py +++ b/maro/rl/agent/simple_agent_manager.py @@ -4,6 +4,7 @@ import os from abc import abstractmethod +from maro.rl.algorithms.policy_optimization import ActionInfo from maro.rl.shaping.action_shaper import ActionShaper from maro.rl.shaping.experience_shaper import ExperienceShaper from maro.rl.shaping.state_shaper import StateShaper @@ -45,15 +46,20 @@ def __init__( def choose_action(self, decision_event, snapshot_list): self._assert_inference_mode() agent_id, model_state = self._state_shaper(decision_event, snapshot_list) - model_action = self.agent_dict[agent_id].choose_action(model_state) + action_info = self.agent_dict[agent_id].choose_action(model_state) self._transition_cache = { "state": model_state, - "action": model_action, "reward": None, "agent_id": agent_id, "event": decision_event } - return self._action_shaper(model_action, decision_event, snapshot_list) + if isinstance(action_info, ActionInfo): + self._transition_cache["action"] = action_info.action + self._transition_cache["log_action_probability"] = action_info.log_probability + else: + self._transition_cache["action"] = action_info + + return self._action_shaper(self._transition_cache["action"], decision_event, snapshot_list) def on_env_feedback(self, metrics): """This method records the environment-generated metrics as part of the latest transition in the trajectory. diff --git a/maro/rl/algorithms/__init__.py b/maro/rl/algorithms/__init__.py index 5631ac15b..d508eb20c 100644 --- a/maro/rl/algorithms/__init__.py +++ b/maro/rl/algorithms/__init__.py @@ -3,5 +3,13 @@ from .abs_algorithm import AbsAlgorithm from .dqn import DQN, DQNConfig +from .policy_optimization import ( + ActionInfo, ActorCritic, ActorCriticConfig, PolicyGradient, PolicyOptimization, PolicyOptimizationConfig +) -__all__ = ["AbsAlgorithm", "DQN", "DQNConfig"] +__all__ = [ + "AbsAlgorithm", + "DQN", "DQNConfig", + "ActionInfo", "ActorCritic", "ActorCriticConfig", "PolicyGradient", "PolicyOptimization", + "PolicyOptimizationConfig" +] diff --git a/maro/rl/algorithms/dqn.py b/maro/rl/algorithms/dqn.py index db46f61eb..6bbb1e8f1 100644 --- a/maro/rl/algorithms/dqn.py +++ b/maro/rl/algorithms/dqn.py @@ -87,6 +87,7 @@ def choose_action(self, state: np.ndarray) -> Union[int, np.ndarray]: if is_single: return greedy_action if np.random.random() > self._config.epsilon else np.random.choice(self._num_actions) + # batch inference return np.array([ act if np.random.random() > self._config.epsilon else np.random.choice(self._num_actions) for act in greedy_action diff --git a/maro/rl/algorithms/policy_optimization.py b/maro/rl/algorithms/policy_optimization.py new file mode 100644 index 000000000..ffab0633f --- /dev/null +++ b/maro/rl/algorithms/policy_optimization.py @@ -0,0 +1,170 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +from collections import namedtuple +from typing import Callable, List, Union + +import numpy as np +import torch + +from maro.rl.algorithms.abs_algorithm import AbsAlgorithm +from maro.rl.models.learning_model import LearningModel +from maro.rl.utils.trajectory_utils import get_lambda_returns, get_truncated_cumulative_reward + +ActionInfo = namedtuple("ActionInfo", ["action", "log_probability"]) + + +class PolicyOptimizationConfig: + """Configuration for the policy optimization algorithm family.""" + __slots__ = ["reward_discount"] + + def __init__(self, reward_discount): + self.reward_discount = reward_discount + + +class PolicyOptimization(AbsAlgorithm): + """Policy optimization algorithm family. + + The algorithm family includes policy gradient (e.g. REINFORCE), actor-critic, PPO, etc. + """ + def choose_action(self, state: np.ndarray) -> Union[ActionInfo, List[ActionInfo]]: + """Use the actor (policy) model to generate stochastic actions. + + Args: + state: Input to the actor model. + + Returns: + A single ActionInfo namedtuple or a list of ActionInfo namedtuples. + """ + state = torch.from_numpy(state).to(self._device) + is_single = len(state.shape) == 1 + if is_single: + state = state.unsqueeze(dim=0) + + action_distribution = self._model(state, task_name="actor", is_training=False).squeeze().numpy() + if is_single: + action = np.random.choice(len(action_distribution), p=action_distribution) + return ActionInfo(action=action, log_probability=np.log(action_distribution[action])) + + # batch inference + batch_results = [] + for distribution in action_distribution: + action = np.random.choice(len(distribution), p=distribution) + batch_results.append(ActionInfo(action=action, log_probability=np.log(distribution[action]))) + + return batch_results + + def train( + self, states: np.ndarray, actions: np.ndarray, log_action_prob: np.ndarray, rewards: np.ndarray + ): + raise NotImplementedError + + +class PolicyGradient(PolicyOptimization): + """The vanilla Policy Gradient (VPG) algorithm, a.k.a., REINFORCE. + + Reference: https://github.com/openai/spinningup/tree/master/spinup/algos/pytorch. + """ + def train( + self, states: np.ndarray, actions: np.ndarray, log_action_prob: np.ndarray, rewards: np.ndarray + ): + states = torch.from_numpy(states).to(self._device) + actions = torch.from_numpy(actions).to(self._device) + returns = get_truncated_cumulative_reward(rewards, self._config.reward_discount) + returns = torch.from_numpy(returns).to(self._device) + action_distributions = self._model(states) + action_prob = action_distributions.gather(1, actions.unsqueeze(1)).squeeze() # (N, 1) + loss = -(torch.log(action_prob) * returns).mean() + self._model.learn(loss) + + +class ActorCriticConfig(PolicyOptimizationConfig): + """Configuration for the Actor-Critic algorithm. + + Args: + reward_discount (float): Reward decay as defined in standard RL terminology. + critic_loss_func (Callable): Loss function for the critic model. + train_iters (int): Number of gradient descent steps per call to ``train``. + actor_loss_coefficient (float): The coefficient for actor loss in the total loss function, e.g., + loss = critic_loss + ``actor_loss_coefficient`` * actor_loss. Defaults to 1.0. + k (int): Number of time steps used in computing returns or return estimates. Defaults to -1, in which case + rewards are accumulated until the end of the trajectory. + lam (float): Lambda coefficient used in computing lambda returns. Defaults to 1.0, in which case the usual + k-step return is computed. + clip_ratio (float): Clip ratio in the PPO algorithm (https://arxiv.org/pdf/1707.06347.pdf). Defaults to None, + in which case the actor loss is calculated using the usual policy gradient theorem. + """ + __slots__ = [ + "reward_discount", "critic_loss_func", "train_iters", "actor_loss_coefficient", "k", "lam", "clip_ratio" + ] + + def __init__( + self, + reward_discount: float, + critic_loss_func: Callable, + train_iters: int, + actor_loss_coefficient: float = 1.0, + k: int = -1, + lam: float = 1.0, + clip_ratio: float = None + ): + super().__init__(reward_discount) + self.critic_loss_func = critic_loss_func + self.train_iters = train_iters + self.actor_loss_coefficient = actor_loss_coefficient + self.k = k + self.lam = lam + self.clip_ratio = clip_ratio + + +class ActorCritic(PolicyOptimization): + """Actor Critic algorithm with separate policy and value models. + + References: + https://github.com/openai/spinningup/tree/master/spinup/algos/pytorch. + https://towardsdatascience.com/understanding-actor-critic-methods-931b97b6df3f + + Args: + model (LearningModel): Multi-task model that computes action distributions and state values. + It may or may not have a shared bottom stack. + config: Configuration for the AC algorithm. + """ + def __init__(self, model: LearningModel, config: ActorCriticConfig): + self.validate_task_names(model.task_names, {"actor", "critic"}) + super().__init__(model, config) + + def _get_values_and_bootstrapped_returns(self, state_sequence, reward_sequence): + state_values = self._model(state_sequence, task_name="critic").detach().squeeze() + return_est = get_lambda_returns( + reward_sequence, state_values, self._config.reward_discount, self._config.lam, k=self._config.k + ) + return state_values, return_est + + def train( + self, states: np.ndarray, actions: np.ndarray, log_action_prob: np.ndarray, rewards: np.ndarray + ): + states = torch.from_numpy(states).to(self._device) + actions = torch.from_numpy(actions).to(self._device) + log_action_prob = torch.from_numpy(log_action_prob).to(self._device) + rewards = torch.from_numpy(rewards).to(self._device) + state_values, return_est = self._get_values_and_bootstrapped_returns(states, rewards) + advantages = return_est - state_values + for _ in range(self._config.train_iters): + critic_loss = self._config.critic_loss_func( + self._model(states, task_name="critic").squeeze(), return_est + ) + action_prob = self._model(states, task_name="actor").gather(1, actions.unsqueeze(1)).squeeze() # (N,) + log_action_prob_new = torch.log(action_prob) + actor_loss = self._actor_loss(log_action_prob_new, log_action_prob, advantages) + loss = critic_loss + self._config.actor_loss_coefficient * actor_loss + self._model.learn(loss) + + def _actor_loss(self, log_action_prob_new, log_action_prob_old, advantages): + if self._config.clip_ratio is not None: + ratio = torch.exp(log_action_prob_new - log_action_prob_old) + clip_ratio = torch.clamp(ratio, 1 - self._config.clip_ratio, 1 + self._config.clip_ratio) + actor_loss = -(torch.min(ratio * advantages, clip_ratio * advantages)).mean() + else: + actor_loss = -(log_action_prob_new * advantages).mean() + + return actor_loss diff --git a/maro/rl/storage/column_based_store.py b/maro/rl/storage/column_based_store.py index ed4c29491..20d29c12a 100644 --- a/maro/rl/storage/column_based_store.py +++ b/maro/rl/storage/column_based_store.py @@ -192,7 +192,7 @@ def sample(self, size, weights: Union[list, np.ndarray] = None, replace: bool = """ if weights is not None: weights = np.asarray(weights) - weights /= np.sum(weights) + weights = weights / np.sum(weights) indexes = np.random.choice(self._size, size=size, replace=replace, p=weights) return indexes, self.get(indexes) diff --git a/maro/rl/utils/trajectory_utils.py b/maro/rl/utils/trajectory_utils.py new file mode 100644 index 000000000..97936b1bd --- /dev/null +++ b/maro/rl/utils/trajectory_utils.py @@ -0,0 +1,103 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +from functools import reduce +from typing import Union + +import numpy as np +import torch +import torch.nn.functional as F + + +def get_truncated_cumulative_reward( + rewards: Union[list, np.ndarray, torch.tensor], + discount: float, + k: int = -1 +): + """Compute K-step cumulative rewards from a reward sequence. + Args: + rewards (Union[list, np.ndarray, torch.tensor]): Reward sequence from a trajectory. + discount (float): Reward discount as in standard RL. + k (int): Number of steps in computing cumulative rewards. If it is -1, returns are computed using the + largest possible number of steps. Defaults to -1. + + Returns: + An ndarray or torch.tensor instance containing the k-step cumulative rewards for each time step. + """ + if k < 0: + k = len(rewards) - 1 + pad = np.pad if isinstance(rewards, list) or isinstance(rewards, np.ndarray) else F.pad + return reduce( + lambda x, y: x * discount + y, + [pad(rewards[i:], (0, i)) for i in range(min(k, len(rewards)) - 1, -1, -1)] + ) + + +def get_k_step_returns( + rewards: Union[list, np.ndarray, torch.tensor], + values: Union[list, np.ndarray, torch.tensor], + discount: float, + k: int = -1 +): + """Compute K-step returns given reward and value sequences. + Args: + rewards (Union[list, np.ndarray, torch.tensor]): Reward sequence from a trajectory. + values (Union[list, np.ndarray, torch.tensor]): Sequence of values for the traversed states in a trajectory. + discount (float): Reward discount as in standard RL. + k (int): Number of steps in computing returns. If it is -1, returns are computed using the largest possible + number of steps. Defaults to -1. + + Returns: + An ndarray or torch.tensor instance containing the k-step returns for each time step. + """ + assert len(rewards) == len(values), "rewards and values should have the same length" + assert len(values.shape) == 1, "values should be a one-dimensional array" + rewards[-1] = values[-1] + if k < 0: + k = len(rewards) - 1 + pad = np.pad if isinstance(rewards, list) or isinstance(rewards, np.ndarray) else F.pad + return reduce( + lambda x, y: x * discount + y, + [pad(rewards[i:], (0, i)) for i in range(min(k, len(rewards)) - 1, -1, -1)], + pad(values[k:], (0, k)) + ) + + +def get_lambda_returns( + rewards: Union[list, np.ndarray, torch.tensor], + values: Union[list, np.ndarray, torch.tensor], + discount: float, + lam: float, + k: int = -1 +): + """Compute lambda returns given reward and value sequences and a k. + Args: + rewards (Union[list, np.ndarray, torch.tensor]): Reward sequence from a trajectory. + values (Union[list, np.ndarray, torch.tensor]): Sequence of values for the traversed states in a trajectory. + discount (float): Reward discount as in standard RL. + lam (float): Lambda coefficient involved in computing lambda returns. + k (int): Number of steps where the lambda return series is truncated. If it is -1, no truncating is done and + the lambda return is carried out to the end of the sequence. Defaults to -1. + + Returns: + An ndarray or torch.tensor instance containing the lambda returns for each time step. + """ + if k < 0: + k = len(rewards) - 1 + + # If lambda is zero, lambda return reduces to one-step return + if lam == .0: + return get_k_step_returns(rewards, values, discount, k=1) + + # If lambda is one, lambda return reduces to k-step return + if lam == 1.0: + return get_k_step_returns(rewards, values, discount, k=k) + + k = min(k, len(rewards) - 1) + pre_truncate = reduce( + lambda x, y: x * lam + y, + [get_k_step_returns(rewards, values, discount, k=k) for k in range(k - 1, 0, -1)] + ) + + post_truncate = get_k_step_returns(rewards, values, discount, k=k) * lam**(k - 1) + return (1 - lam) * pre_truncate + post_truncate diff --git a/tests/test_trajectory_utils.py b/tests/test_trajectory_utils.py new file mode 100644 index 000000000..df05d8032 --- /dev/null +++ b/tests/test_trajectory_utils.py @@ -0,0 +1,31 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +import unittest + +import numpy as np + +from maro.rl.utils.trajectory_utils import get_k_step_returns, get_lambda_returns + + +class TestTrajectoryUtils(unittest.TestCase): + def setUp(self) -> None: + self.rewards = np.asarray([3, 2, 4, 1, 5]) + self.values = np.asarray([4, 7, 1, 3, 6]) + self.lam = 0.6 + self.discount = 0.8 + self.k = 4 + + def test_k_step_return(self): + returns = get_k_step_returns(self.rewards, self.values, self.discount, k=self.k) + expected = np.asarray([10.1296, 8.912, 8.64, 5.8, 6.0]) + np.testing.assert_allclose(returns, expected, rtol=1e-4) + + def test_lambda_return(self): + returns = get_lambda_returns(self.rewards, self.values, self.discount, self.lam, k=self.k) + expected = np.asarray([8.1378176, 6.03712, 7.744, 5.8, 6.0]) + np.testing.assert_allclose(returns, expected, rtol=1e-4) + + +if __name__ == "__main__": + unittest.main()