Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions examples/cim/dqn/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ agents:
distributed:
group_name: "dqn_distributed_test"
actor:
peer: {"actor": 1}
peer: {"learner": 1}
learner:
peer: {"actor_worker": 1}
peer: {"actor": 1}
redis:
host_name: "localhost"
port: 6379
port: 6379
4 changes: 2 additions & 2 deletions examples/cim/dqn/multi_process_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
from components.config import config


ACTOR_NUM = config.distributed.learner.peer["actor_worker"] # must be same as in config
LEARNER_NUM = config.distributed.actor.peer["actor"]
ACTOR_NUM = config.distributed.learner.peer["actor"] # must be same as in config
LEARNER_NUM = config.distributed.actor.peer["learner"]

learner_path = f"{os.path.split(os.path.realpath(__file__))[0]}/dist_learner.py &"
actor_path = f"{os.path.split(os.path.realpath(__file__))[0]}/dist_actor.py &"
Expand Down
48 changes: 28 additions & 20 deletions maro/rl/dist_topologies/single_learner_multi_actor_sync_mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ class ActorProxy(object):
proxy_params: Parameters for instantiating a ``Proxy`` instance.
"""
def __init__(self, proxy_params):
self._proxy = Proxy(component_type="actor", **proxy_params)
self._proxy = Proxy(component_type="learner", **proxy_params)

def roll_out(self, model_dict: dict = None, epsilon_dict: dict = None, done: bool = False,
return_details: bool = True):
def roll_out(
self, model_dict: dict = None, epsilon_dict: dict = None, done: bool = False, return_details: bool = True
):
"""Send roll-out requests to remote actors.

This method has exactly the same signature as ``SimpleActor``'s ``roll_out`` method but instead of doing
Expand All @@ -46,19 +47,24 @@ def roll_out(self, model_dict: dict = None, epsilon_dict: dict = None, done: boo
Performance and per-agent experiences from the remote actor.
"""
if done:
self._proxy.ibroadcast(tag=MessageTag.ROLLOUT,
session_type=SessionType.NOTIFICATION,
payload={PayloadKey.DONE: True})
self._proxy.ibroadcast(
tag=MessageTag.ROLLOUT,
session_type=SessionType.NOTIFICATION,
payload={PayloadKey.DONE: True}
)
return None, None
else:
performance, exp_by_agent = {}, {}
payloads = [(peer, {PayloadKey.MODEL: model_dict,
PayloadKey.EPSILON: epsilon_dict,
PayloadKey.RETURN_DETAILS: return_details})
for peer in self._proxy.peers["actor_worker"]]
for peer in self._proxy.peers["actor"]]
# TODO: double check when ack enable
replies = self._proxy.scatter(tag=MessageTag.ROLLOUT, session_type=SessionType.TASK,
destination_payload_list=payloads)
replies = self._proxy.scatter(
tag=MessageTag.ROLLOUT,
session_type=SessionType.TASK,
destination_payload_list=payloads
)
for msg in replies:
performance[msg.source] = msg.payload[PayloadKey.PERFORMANCE]
if msg.payload[PayloadKey.EXPERIENCE] is not None:
Expand All @@ -80,9 +86,9 @@ class ActorWorker(object):
"""
def __init__(self, local_actor: AbsActor, proxy_params):
self._local_actor = local_actor
self._proxy = Proxy(component_type="actor_worker", **proxy_params)
self._proxy = Proxy(component_type="actor", **proxy_params)
self._registry_table = RegisterTable(self._proxy.get_peers)
self._registry_table.register_event_handler("actor:rollout:1", self.on_rollout_request)
self._registry_table.register_event_handler("learner:rollout:1", self.on_rollout_request)

def on_rollout_request(self, message):
"""Perform local roll-out and send the results back to the request sender.
Expand All @@ -94,15 +100,17 @@ def on_rollout_request(self, message):
if data.get(PayloadKey.DONE, False):
sys.exit(0)

performance, experiences = self._local_actor.roll_out(model_dict=data[PayloadKey.MODEL],
epsilon_dict=data[PayloadKey.EPSILON],
return_details=data[PayloadKey.RETURN_DETAILS])

self._proxy.reply(received_message=message,
tag=MessageTag.UPDATE,
payload={PayloadKey.PERFORMANCE: performance,
PayloadKey.EXPERIENCE: experiences}
)
performance, experiences = self._local_actor.roll_out(
model_dict=data[PayloadKey.MODEL],
epsilon_dict=data[PayloadKey.EPSILON],
return_details=data[PayloadKey.RETURN_DETAILS]
)

self._proxy.reply(
received_message=message,
tag=MessageTag.UPDATE,
payload={PayloadKey.PERFORMANCE: performance, PayloadKey.EXPERIENCE: experiences}
)

def launch(self):
"""Entry point method.
Expand Down
7 changes: 1 addition & 6 deletions maro/rl/learner/simple_learner.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,7 @@ def train(self, total_episodes):
model_dict = None if self._is_shared_agent_instance() else self._trainable_agents.get_models()
epsilon_dict = self._trainable_agents.explorer.epsilon if self._trainable_agents.explorer else None
performance, exp_by_agent = self._actor.roll_out(model_dict=model_dict, epsilon_dict=epsilon_dict)
if isinstance(performance, dict):
for actor_id, perf in performance.items():
self._logger.info(f"ep {current_ep} - performance: {perf},"
f"source: {actor_id}, epsilons: {epsilon_dict}")
else:
self._logger.info(f"ep {current_ep} - performance: {performance}, epsilons: {epsilon_dict}")
self._logger.info(f"ep {current_ep} - performance: {performance}, epsilons: {epsilon_dict}")

self._trainable_agents.store_experiences(exp_by_agent)
self._trainable_agents.train()
Expand Down