Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions examples/vector_env/hello.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

from maro.simulator.scenarios.cim.common import Action, DecisionEvent
Comment thread
chaosddp marked this conversation as resolved.
from maro.vector_env import VectorEnv


if __name__ == "__main__":
with VectorEnv(batch_num=4, scenario="cim", topology="toy.5p_ssddd_l0.0", durations=100) as env:
for ep in range(2):
print("current episode:", ep)

metrics, decision_event, is_done = (None, None, False)

while not is_done:
action = None

# Usage:
# 1. Only push speicified (1st for this example) environment, leave others behind
# if decision_event:
# env0_dec: DecisionEvent = decision_event[0]

# # 1.1 After 1st environment is done, then others will push forward.
# if env0_dec:
# ss0 = env.snapshot_list["vessels"][env0_dec.tick:env0_dec.vessel_idx:"remaining_space"]
# action = {0: Action(env0_dec.vessel_idx, env0_dec.port_idx, -env0_dec.action_scope.load)}
Comment thread
Jinyu-W marked this conversation as resolved.

# 2. Only pass action to 1st environment (give None to other environments),
# but keep pushing all the environment, until the end
if decision_event:
env0_dec: DecisionEvent = decision_event[0]

if env0_dec:
ss0 = env.snapshot_list["vessels"][env0_dec.tick:env0_dec.vessel_idx:"remaining_space"]

action = [None] * env.batch_number

# with a list of action, will push all environment to next step
action[0] = Action(env0_dec.vessel_idx, env0_dec.port_idx, -env0_dec.action_scope.load)
Comment thread
Jinyu-W marked this conversation as resolved.

metrics, decision_event, is_done = env.step(action)

print("Final tick for each environment:", env.tick)
print("Final frame index for each environment:", env.frame_index)

env.reset()
33 changes: 22 additions & 11 deletions maro/simulator/scenarios/cim/business_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from maro.simulator.scenarios.helpers import DocableDict
from maro.simulator.scenarios.matrix_accessor import MatrixAttributeAccessor

from .common import Action, ActionScope, DecisionEvent
from .common import Action, ActionScope, ActionType, DecisionEvent
from .event_payload import EmptyReturnPayload, LadenReturnPayload, VesselDischargePayload, VesselStatePayload
from .events import Events
from .frame_builder import gen_cim_frame
Expand Down Expand Up @@ -615,20 +615,31 @@ def _on_action_received(self, event: CascadeEvent):
port_empty = port.empty
vessel_empty = vessel.empty

assert (
-min(port.empty, vessel.remaining_space) <= move_num <= vessel_empty
)
action_type: ActionType = getattr(action, "action_type", None)

port.empty = port_empty + move_num
vessel.empty = vessel_empty - move_num
# Make it compatiable with previous action.
if action_type is None:
action_type = ActionType.DISCHARGE if move_num > 0 else ActionType.LOAD

event.event_type = Events.DISCHARGE_EMPTY if move_num > 0 else Events.LOAD_EMPTY
# Make sure the move number is positive, as we have the action type.
move_num = abs(move_num)

# Update cost.
num = abs(move_num)
if action_type == ActionType.DISCHARGE:
assert(move_num <= vessel_empty)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also need to check the port capacity?

I remember we have this attribute but just with a big enough value?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arthur and Ruidong removed this checking long long ago, I am not sure why, so sometime we can see that the empty+full may greater than capacity in cim vis page.


port.empty = port_empty + move_num
vessel.empty = vessel_empty - move_num
else:
assert(move_num <= min(port_empty, vessel.remaining_space))

port.empty = port_empty - move_num
vessel.empty = vessel_empty + move_num

# Align the event type to make the output readable.
event.event_type = Events.DISCHARGE_EMPTY if action_type == ActionType.DISCHARGE else Events.LOAD_EMPTY

# Update transfer cost for port and metrics.
self._total_operate_num += num
port.transfer_cost += num
self._total_operate_num += move_num
port.transfer_cost += move_num

self._vessel_plans[vessel_idx, port_idx] += self._data_cntr.vessel_period[vessel_idx]
18 changes: 16 additions & 2 deletions maro/simulator/scenarios/cim/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Licensed under the MIT license.


from enum import IntEnum
from enum import Enum, IntEnum

from maro.backends.frame import SnapshotList

Expand All @@ -13,6 +13,12 @@ class VesselState(IntEnum):
SAILING = 1


class ActionType(Enum):
"""Type of CIM action."""
LOAD = "load",
DISCHARGE = "discharge"


class Action:
"""Action object that used to pass action from agent to business engine.

Expand All @@ -23,10 +29,11 @@ class Action:
"""
summary_key = ["port_idx", "vessel_idx", "quantity"]

def __init__(self, vessel_idx: int, port_idx: int, quantity: int):
def __init__(self, vessel_idx: int, port_idx: int, quantity: int, action_type: ActionType):
self.vessel_idx = vessel_idx
self.port_idx = port_idx
self.quantity = quantity
self.action_type = action_type

def __repr__(self):
return self.__str__()
Expand Down Expand Up @@ -114,6 +121,13 @@ def __getstate__(self):
"early_discharge": self.early_discharge
}

def __setstate__(self, state):
Comment thread
Jinyu-W marked this conversation as resolved.
self.tick = state["tick"]
self.port_idx = state["port_idx"]
self.vessel_idx = state["vessel_idx"]
self._action_scope = state["action_scope"]
self._early_discharge = state["early_discharge"]

def __repr__(self):
return self.__str__()

Expand Down
7 changes: 7 additions & 0 deletions maro/simulator/scenarios/citi_bike/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ def __getstate__(self):
"type": self.type,
"action_scope": self.action_scope}

def __setstate__(self, state):
self.station_idx = state["station_idx"]
self.tick = state["tick"]
self.frame_index = state["frame_index"]
self.type = state["type"]
self._action_scope = state["action_scope"]

def __repr__(self):
return self.__str__()

Expand Down
4 changes: 4 additions & 0 deletions maro/vector_env/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

from .vector_env import VectorEnv
Comment thread
Jinyu-W marked this conversation as resolved.
67 changes: 67 additions & 0 deletions maro/vector_env/env_process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

from multiprocessing import Process
from multiprocessing.connection import Connection

from maro.simulator import Env


class EnvProcess(Process):
"""Wrapper for envrioment process,

Args:
pipe (Connection): Pipe that used to communicate between main process and this process.
(args, kwargs): Parameter for Env class.
"""

def __init__(self, pipe: Connection, *args, **kwargs):
super().__init__()

self._pipe = pipe
self._env: Env = None
self._args = args
self._kwargs = kwargs

def run(self):
"""Initialize environment and process commands."""
metrics = None
decision_event = None,
is_done = False

env = Env(*self._args, **self._kwargs)

while True:
cmd, content = self._pipe.recv()

if cmd == "step":
if is_done:
# Skip is current environment is completed.
self._pipe.send((None, None, True, env.frame_index))
else:
metrics, decision_event, is_done = env.step(content)

self._pipe.send((metrics, decision_event))
elif cmd == "reset":
env.reset()

metrics = None
decision_event = None
is_done = False

self._pipe.send(None)
elif cmd == "query":
node_name, args = content

states = env.snapshot_list[node_name][args]

self._pipe.send(states)
elif cmd == "tick":
self._pipe.send(env.tick)
elif cmd == "frame_index":
self._pipe.send(env.frame_index)
elif cmd == "is_done":
self._pipe.send(is_done)
elif cmd == "stop":
self._pipe.send(None)
break
Loading