-
Notifications
You must be signed in to change notification settings - Fork 157
added example docs #136
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
added example docs #136
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
331fb1a
added example docs
d9225d9
added citibike greedy example doc
19e34da
modified citibike doc
2c529fc
Merge remote-tracking branch 'origin/v0.1' into v0.1_example_doc
f845965
fixed PR comments
ed48f3d
fixed more PR comments
ad0ff50
fixed small formatting issue
91294c6
Merge remote-tracking branch 'origin/v0.1' into v0.1_example_doc
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,255 @@ | ||
| Example Scenario: Container Inventory Management | ||
| ================================================ | ||
|
|
||
| This example demonstrates how to use MARO's reinforcement learning (RL) toolkit to solve the | ||
| `container inventory management <https://maro.readthedocs.io/en/latest/scenarios/container_inventory_management.html>`_ | ||
| (CIM) problem. It is formalized as a multi-agent reinforcement learning problem, where each port acts as a decision | ||
| agent. The agents take actions independently, e.g., loading containers to vessels or discharging containers from vessels. | ||
|
|
||
| State Shaper | ||
| ------------ | ||
|
|
||
| `State shaper <https://maro.readthedocs.io/en/latest/key_components/rl_toolkit.html#shapers>`_ converts the environment | ||
| observation to the model input state which includes temporal and spatial information. For this scenario, the model input | ||
| state includes: | ||
|
|
||
| - Temporal information, including the past week's information of ports and vessels, such as shortage on port and | ||
| remaining space on vessel. | ||
| - Spatial information, including related downstream port features. | ||
|
|
||
| .. code-block:: python | ||
|
|
||
| class CIMStateShaper(StateShaper): | ||
| ... | ||
| def __call__(self, decision_event, snapshot_list): | ||
| tick, port_idx, vessel_idx = decision_event.tick, decision_event.port_idx, decision_event.vessel_idx | ||
| ticks = [tick - rt for rt in range(self._look_back - 1)] | ||
| future_port_idx_list = snapshot_list["vessels"][tick : vessel_idx : 'future_stop_list'].astype('int') | ||
| port_features = snapshot_list["ports"][ticks : [port_idx] + list(future_port_idx_list) : self._port_attributes] | ||
| vessel_features = snapshot_list["vessels"][tick : vessel_idx : self._vessel_attributes] | ||
| state = np.concatenate((port_features, vessel_features)) | ||
|
ysqyang marked this conversation as resolved.
|
||
| return str(port_idx), state | ||
|
|
||
|
|
||
| Action Shaper | ||
| ------------- | ||
|
|
||
| `Action shaper <https://maro.readthedocs.io/en/latest/key_components/rl_toolkit.html#shapers>`_ is used to convert an | ||
| agent's model output to an environment executable action. For this specific scenario, the action space consists of | ||
| integers from -10 to 10, with -10 indicating loading 100% of the containers in the current inventory to the vessel and | ||
| 10 indicating discharging 100% of the containers on the vessel to the port. | ||
|
|
||
| .. code-block:: python | ||
|
|
||
| class CIMActionShaper(ActionShaper): | ||
| ... | ||
| def __call__(self, model_action, decision_event, snapshot_list): | ||
| scope = decision_event.action_scope | ||
| tick = decision_event.tick | ||
| port_idx = decision_event.port_idx | ||
| vessel_idx = decision_event.vessel_idx | ||
|
|
||
| port_empty = snapshot_list["ports"][tick: port_idx: ["empty", "full", "on_shipper", "on_consignee"]][0] | ||
| vessel_remaining_space = snapshot_list["vessels"][tick: vessel_idx: ["empty", "full", "remaining_space"]][2] | ||
| early_discharge = snapshot_list["vessels"][tick:vessel_idx: "early_discharge"][0] | ||
| assert 0 <= model_action < len(self._action_space) | ||
|
|
||
| if model_action < self._zero_action_index: | ||
| actual_action = max(round(self._action_space[model_action] * port_empty), -vessel_remaining_space) | ||
| elif model_action > self._zero_action_index: | ||
| plan_action = self._action_space[model_action] * (scope.discharge + early_discharge) - early_discharge | ||
| actual_action = round(plan_action) if plan_action > 0 else round(self._action_space[model_action] * scope.discharge) | ||
| else: | ||
| actual_action = 0 | ||
|
|
||
| return Action(vessel_idx, port_idx, actual_action) | ||
|
|
||
| Experience Shaper | ||
| ----------------- | ||
|
|
||
| `Experience shaper <https://maro.readthedocs.io/en/latest/key_components/rl_toolkit.html#shapers>`_ is used to convert | ||
| an episode trajectory to trainable experiences for RL agents. For this specific scenario, the reward is a linear | ||
| combination of fulfillment and shortage in a limited time window. | ||
|
|
||
| .. code-block:: python | ||
|
|
||
| class TruncatedExperienceShaper(ExperienceShaper): | ||
| ... | ||
| def __call__(self, trajectory, snapshot_list): | ||
| experiences_by_agent = {} | ||
| for i in range(len(trajectory) - 1): | ||
| transition = trajectory[i] | ||
| agent_id = transition["agent_id"] | ||
| if agent_id not in experiences_by_agent: | ||
| experiences_by_agent[agent_id] = defaultdict(list) | ||
| experiences = experiences_by_agent[agent_id] | ||
| experiences["state"].append(transition["state"]) | ||
| experiences["action"].append(transition["action"]) | ||
| experiences["reward"].append(self._compute_reward(transition["event"], snapshot_list)) | ||
| experiences["next_state"].append(trajectory[i + 1]["state"]) | ||
|
|
||
| return experiences_by_agent | ||
|
|
||
| def _compute_reward(self, decision_event, snapshot_list): | ||
| start_tick = decision_event.tick + 1 | ||
| end_tick = decision_event.tick + self._time_window | ||
| ticks = list(range(start_tick, end_tick)) | ||
|
|
||
| # calculate tc reward | ||
| future_fulfillment = snapshot_list["ports"][ticks::"fulfillment"] | ||
| future_shortage = snapshot_list["ports"][ticks::"shortage"] | ||
| decay_list = [self._time_decay_factor ** i for i in range(end_tick - start_tick) | ||
| for _ in range(future_fulfillment.shape[0] // (end_tick - start_tick))] | ||
|
|
||
|
ysqyang marked this conversation as resolved.
|
||
| tot_fulfillment = np.dot(future_fulfillment, decay_list) | ||
| tot_shortage = np.dot(future_shortage, decay_list) | ||
|
|
||
| return np.float(self._fulfillment_factor * tot_fulfillment - self._shortage_factor * tot_shortage) | ||
|
|
||
| Agent | ||
| ----- | ||
|
|
||
| `Agent <https://maro.readthedocs.io/en/latest/key_components/rl_toolkit.html#agent>`_ is a combination of (RL) | ||
| algorithm, experience pool, and a set of parameters that governs the training loop. For this scenario, the agent is the | ||
| abstraction of a port. We choose DQN as our underlying learning algorithm with a TD-error-based sampling mechanism. | ||
|
|
||
| .. code-block:: python | ||
| class CIMAgent(AbsAgent): | ||
| ... | ||
| def train(self): | ||
| if len(self._experience_pool) < self._min_experiences_to_train: | ||
| return | ||
|
|
||
| for _ in range(self._num_batches): | ||
| indexes, sample = self._experience_pool.sample_by_key("loss", self._batch_size) | ||
| state = np.asarray(sample["state"]) | ||
| action = np.asarray(sample["action"]) | ||
| reward = np.asarray(sample["reward"]) | ||
| next_state = np.asarray(sample["next_state"]) | ||
| loss = self._algorithm.train(state, action, reward, next_state) | ||
| self._experience_pool.update(indexes, {"loss": loss}) | ||
|
|
||
| Agent Manager | ||
| ------------- | ||
|
|
||
| `Agent manager <https://maro.readthedocs.io/en/latest/key_components/rl_toolkit.html#agent-manager>`_ | ||
| is an agent assembler and isolates the complexities of the environment and algorithm. For this scenario, It will load | ||
| the DQN algorithm and an experience pool for each agent. | ||
|
|
||
| .. code-block:: python | ||
|
|
||
| class DQNAgentManager(AbsAgentManager): | ||
| def _assemble(self, agent_dict): | ||
| set_seeds(config.agents.seed) | ||
| num_actions = config.agents.algorithm.num_actions | ||
| for agent_id in self._agent_id_list: | ||
| eval_model = LearningModel(decision_layers=MLPDecisionLayers(name=f'{agent_id}.policy', | ||
| input_dim=self._state_shaper.dim, | ||
| output_dim=num_actions, | ||
| **config.agents.algorithm.model) | ||
| ) | ||
|
|
||
| algorithm = DQN(model_dict={"eval": eval_model}, | ||
| optimizer_opt=(RMSprop, config.agents.algorithm.optimizer), | ||
| loss_func_dict={"eval": smooth_l1_loss}, | ||
| hyper_params=DQNHyperParams(**config.agents.algorithm.hyper_parameters, | ||
| num_actions=num_actions)) | ||
|
|
||
| experience_pool = ColumnBasedStore(**config.agents.experience_pool) | ||
| agent_dict[agent_id] = CIMAgent(name=agent_id, algorithm=algorithm, experience_pool=experience_pool, | ||
| **config.agents.training_loop_parameters) | ||
|
|
||
| Main Loop with Actor and Learner (Single Process) | ||
| ------------------------------------------------- | ||
|
|
||
| This single-process workflow of a learning policy's interaction with a MARO environment is comprised of: | ||
|
ysqyang marked this conversation as resolved.
|
||
| - Initializing an environment with specific scenario and topology parameters. | ||
| - Defining scenario-specific components, e.g. shapers. | ||
| - Creating an agent manager, which assembles underlying agents. | ||
| - Creating an `actor <https://maro.readthedocs.io/en/latest/key_components/rl_toolkit.html#learner-and-actor>`_ and a | ||
| `learner <https://maro.readthedocs.io/en/latest/key_components/rl_toolkit.html#learner-and-actor>`_ to start the | ||
| training process in which the agent manager interacts with the environment for collecting experiences and updating | ||
| policies. | ||
|
|
||
| .. code-block::python | ||
|
|
||
| env = Env(config.env.scenario, config.env.topology, durations=config.env.durations) | ||
| agent_id_list = [str(agent_id) for agent_id in env.agent_idx_list] | ||
| state_shaper = CIMStateShaper(**config.state_shaping) | ||
| action_shaper = CIMActionShaper(action_space=list(np.linspace(-1.0, 1.0, config.agents.algorithm.num_actions))) | ||
| experience_shaper = TruncatedExperienceShaper(**config.experience_shaping.truncated) | ||
| exploration_config = {"epsilon_range_dict": {"_all_": config.exploration.epsilon_range}, | ||
| "split_point_dict": {"_all_": config.exploration.split_point}, | ||
| "with_cache": config.exploration.with_cache | ||
| } | ||
| explorer = TwoPhaseLinearExplorer(agent_id_list, config.general.total_training_episodes, **exploration_config) | ||
|
|
||
| agent_manager = DQNAgentManager(name="cim_learner", | ||
| mode=AgentMode.TRAIN_INFERENCE, | ||
| agent_id_list=agent_id_list, | ||
| state_shaper=state_shaper, | ||
| action_shaper=action_shaper, | ||
| experience_shaper=experience_shaper, | ||
| explorer=explorer) | ||
|
|
||
| actor = SimpleActor(env=env, inference_agents=agent_manager) | ||
| learner = SimpleLearner(trainable_agents=agent_manager, actor=actor, | ||
| logger=Logger("single_host_cim_learner", auto_timestamp=False)) | ||
|
|
||
| learner.train(total_episodes=config.general.total_training_episodes) | ||
|
|
||
|
|
||
| Main Loop with Actor and Learner (Distributed / Multi-process) | ||
| -------------------------------------------------------------- | ||
|
|
||
| We demonstrate a single-learner and multi-actor topology where the learner drives the program by telling remote actors | ||
| to perform roll-out tasks and using the results they sent back to improve the policies. The workflow usually involves | ||
| launching a learner process and an actor process separately. Because training occurs on the learner side and inference | ||
| occurs on the actor side, we need to create appropriate agent managers on both sides. | ||
|
|
||
| On the actor side, the agent manager must be equipped with all shapers as well as an explorer. Thus, The code for | ||
| creating an environment and an agent manager on the actor side is similar to that for the single-host version, | ||
| except that it is necessary to set the AgentMode to AgentMode.INFERENCE. As in the single-process version, the environment | ||
| and the agent manager are wrapped in a SimpleActor instance. To make the actor a distributed worker, we need to further | ||
| wrap it in an ActorWorker instance. Finally, we launch the worker and it starts to listen to roll-out requests from the | ||
| learner. The following code snippet shows the creation of an actor worker with a simple (local) actor wrapped inside. | ||
|
|
||
| .. code-block:: python | ||
|
|
||
| agent_manager = DQNAgentManager(name="cim_remote_actor", | ||
| agent_id_list=agent_id_list, | ||
| mode=AgentMode.INFERENCE, | ||
| state_shaper=state_shaper, | ||
| action_shaper=action_shaper, | ||
| experience_shaper=experience_shaper, | ||
| explorer=explorer) | ||
| proxy_params = {"group_name": config.distributed.group_name, | ||
| "expected_peers": config.distributed.actor.peer, | ||
| "redis_address": (config.distributed.redis.host_name, config.distributed.redis.port) | ||
| } | ||
| actor_worker = ActorWorker(local_actor=SimpleActor(env=env, inference_agents=agent_manager), | ||
| proxy_params=proxy_params) | ||
| actor_worker.launch() | ||
|
|
||
| On the learner side, an agent manager in AgentMode.TRAIN mode is required. However, it is not necessary to create shapers for an | ||
| agent manager in AgentMode.TRAIN mode (although a state shaper is created in this example so that the model input dimension can | ||
| be readily accessed). Instead of creating an actor, we create an actor proxy and wrap it inside the learner. This proxy | ||
| serves as the communication interface for the learner and is responsible for sending roll-out requests to remote actor | ||
| processes and receiving results. Calling the train method executes the usual training loop except that the actual | ||
| roll-out is performed remotely. The code snippet below shows the creation of a learner with an actor proxy wrapped | ||
| inside. | ||
|
|
||
| .. code-block:: python | ||
|
|
||
| agent_manager = DQNAgentManager(name="cim_remote_learner", agent_id_list=agent_id_list, mode=AgentMode.TRAIN, | ||
| state_shaper=state_shaper, explorer=explorer) | ||
|
|
||
| proxy_params = {"group_name": config.distributed.group_name, | ||
| "expected_peers": config.distributed.learner.peer, | ||
| "redis_address": (config.distributed.redis.host_name, config.distributed.redis.port) | ||
| } | ||
| learner = SimpleLearner(trainable_agents=agent_manager, | ||
| actor=ActorProxy(proxy_params=proxy_params), | ||
| logger=Logger("distributed_cim_learner", auto_timestamp=False)) | ||
| learner.train(total_episodes=config.general.total_training_episodes) | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,75 @@ | ||
| Example Scenario: Bike Repositioning (Citi Bike) | ||
| ================================================ | ||
|
|
||
| In this example we demonstrate using a simple greedy policy for `Citi Bike <https://maro.readthedocs.io/en/latest/scenarios/citi_bike.html>`_, | ||
| a real-world bike repositioning scenario. | ||
|
|
||
| Greedy Policy | ||
| ------------- | ||
|
|
||
| Our greedy policy is simple: if the event type is supply, the policy will make | ||
| the current station send as many bikes as possible to one of k stations with the most empty docks. If the event type is | ||
| demand, the policy will make the current station request as many bikes as possible from one of k stations with the most | ||
| bikes. We use a heap data structure to find the top k supply/demand candidates from the action scope associated with | ||
| each decision event. | ||
|
|
||
| .. code-block:: python | ||
|
|
||
| class GreedyPolicy: | ||
| ... | ||
| def choose_action(self, decision_event: DecisionEvent): | ||
| if decision_event.type == DecisionType.Supply: | ||
| """ | ||
| Find k target stations with the most empty slots, randomly choose one of them and send as many bikes to | ||
| it as allowed by the action scope | ||
| """ | ||
| top_k_demands = [] | ||
| for demand_candidate, available_docks in decision_event.action_scope.items(): | ||
| if demand_candidate == decision_event.station_idx: | ||
| continue | ||
|
|
||
| heapq.heappush(top_k_demands, (available_docks, demand_candidate)) | ||
| if len(top_k_demands) > self._demand_top_k: | ||
| heapq.heappop(top_k_demands) | ||
|
|
||
| max_reposition, target_station_idx = random.choice(top_k_demands) | ||
| action = Action(decision_event.station_idx, target_station_idx, max_reposition) | ||
| else: | ||
| """ | ||
| Find k source stations with the most bikes, randomly choose one of them and request as many bikes from | ||
| it as allowed by the action scope. | ||
| """ | ||
| top_k_supplies = [] | ||
| for supply_candidate, available_bikes in decision_event.action_scope.items(): | ||
| if supply_candidate == decision_event.station_idx: | ||
| continue | ||
|
|
||
| heapq.heappush(top_k_supplies, (available_bikes, supply_candidate)) | ||
| if len(top_k_supplies) > self._supply_top_k: | ||
| heapq.heappop(top_k_supplies) | ||
|
|
||
| max_reposition, source_idx = random.choice(top_k_supplies) | ||
| action = Action(source_idx, decision_event.station_idx, max_reposition) | ||
|
|
||
| return action | ||
|
|
||
|
|
||
| Interaction with the Greedy Policy | ||
| ---------------------------------- | ||
|
|
||
| This environment is driven by `real trip history data <https://s3.amazonaws.com/tripdata/index.html>`_ from Citi Bike. | ||
|
|
||
| .. code-block:: python | ||
|
|
||
| env = Env(scenario=config.env.scenario, topology=config.env.topology, start_tick=config.env.start_tick, | ||
| durations=config.env.durations, snapshot_resolution=config.env.resolution) | ||
|
|
||
| if config.env.seed is not None: | ||
| env.set_seed(config.env.seed) | ||
|
|
||
| policy = GreedyPolicy(config.agent.supply_top_k, config.agent.demand_top_k) | ||
| metrics, decision_event, done = env.step(None) | ||
| while not done: | ||
| metrics, decision_event, done = env.step(policy.choose_action(decision_event)) | ||
|
|
||
| env.reset() |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.