V0.2 rl refinement dist #377

Merged
merged 36 commits into from
Sep 16, 2021
Changes from 31 commits
Commits
2b26d44
Support `slice` operation in ExperienceSet
buptchan Jul 27, 2021
2445f4c
Support naive distributed policy training by proxy
buptchan Jul 27, 2021
96b983a
Dynamically allocate trainers according to number of experience
buptchan Jul 27, 2021
3dc1226
code check
buptchan Jul 27, 2021
d5601b4
code check
buptchan Jul 27, 2021
8de192f
code check
buptchan Jul 27, 2021
311c243
Fix a bug in distributed training with no gradient
buptchan Jul 29, 2021
1479ffa
Code check
buptchan Jul 29, 2021
e4fc71b
Move Back-Propagation from trainer to policy_manager and extract trai…
buptchan Aug 3, 2021
295f551
1.call allocate_trainer() at first of update(); 2.refine according to…
buptchan Aug 4, 2021
887b90b
Merge from v0.2_rl_refinement
buptchan Aug 5, 2021
8402253
Code check
buptchan Aug 5, 2021
5c294a5
Refine code with new interface
buptchan Aug 5, 2021
42c2547
Update docs of PolicyManger and ExperienceSet
buptchan Aug 6, 2021
ce2e2d2
Add images for rl_toolkit docs
buptchan Aug 6, 2021
e27fcb3
Update diagram of PolicyManager
buptchan Aug 6, 2021
4d27cd6
merge v0.2_rl_refinement
buptchan Aug 9, 2021
38fecb1
Refine with new interface
buptchan Aug 10, 2021
9b11b65
Extract allocation strategy into `allocation_strategy.py`
buptchan Aug 12, 2021
d5d4f71
Pull from v0.2_rl_refinement
buptchan Aug 18, 2021
0a9c588
Pull rl_refinement
buptchan Aug 19, 2021
437f29d
add `distributed_learn()` in policies for data-parallel training
buptchan Aug 25, 2021
a0fee62
Pull latest v0.2_rl_refinement branch
buptchan Aug 25, 2021
48f40fb
Update doc of RL_toolkit
buptchan Aug 26, 2021
6f77be1
Add gradient workers for data-parallel
buptchan Sep 3, 2021
327aa83
Refine code and update docs
buptchan Sep 6, 2021
5deb4dc
Merge from `v0.2_rl_refinement`, refactoring based on `numpy` batch d…
buptchan Sep 8, 2021
ab5a631
Lint check
buptchan Sep 8, 2021
74b17c0
Fix bugs in multiprocessing mode with data-parallel
buptchan Sep 8, 2021
f938318
Refine by comments
buptchan Sep 9, 2021
267bf6c
Rename `trainer` to `worker`
buptchan Sep 9, 2021
807ec7c
Rename `distributed_learn` to `learn_with_data_parallel`
buptchan Sep 13, 2021
b67953d
Refine allocator and remove redundant code in policy_manager
buptchan Sep 15, 2021
d6f9a6b
Pull latest v0.2_rl_refinement
buptchan Sep 15, 2021
aed932f
import `agent2policies` in base branch
buptchan Sep 15, 2021
4eaebfb
remove arguments in allocate_by_policy and so on
buptchan Sep 15, 2021
3 changes: 3 additions & 0 deletions docs/source/images/rl/policy_manager.svg
83 changes: 68 additions & 15 deletions docs/source/key_components/rl_toolkit.rst
@@ -18,8 +18,8 @@ minimum number of workers required to receive from before proceeding to the next
losing precious data in the case of all workers reporting back at roughly the same time, we also provide the option
to continue to receive after receiving the minimum number of results, but with a timeout to keep the wait time
upperbounded. Note that the transition from the policy update phase to the data collection phase is still strictly
synchronous. This means that in the case of the policy instances distributed amongst a set of trainer nodes, the
central controller waits until all trainers report back with the latest policy states before starting the next
synchronous. This means that in the case of the policy instances distributed amongst a set of worker nodes, the
central controller waits until all workers report back with the latest policy states before starting the next
cycle.
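
The collection logic described above can be summarized with a small sketch (``receive_result``, ``min_received`` and ``receive_timeout`` below are illustrative names, not the actual MARO API):

.. code-block:: python

    import time

    def collect_results(receive_result, num_workers, min_received, receive_timeout):
        """Wait for a minimum number of rollout results, then keep receiving until a timeout."""
        results = []
        # Block until the required minimum number of workers have reported back.
        while len(results) < min_received:
            results.append(receive_result(block=True))
        # Optionally accept further results, but keep the extra wait time upper-bounded.
        deadline = time.time() + receive_timeout
        while len(results) < num_workers and time.time() < deadline:
            result = receive_result(block=False)
            if result is not None:
                results.append(result)
        return results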


@@ -81,28 +81,34 @@ Policy
A policy is an agent's mechanism to choose actions based on its observations of the environment.
Accordingly, the abstract ``AbsPolicy`` class exposes a ``choose_action`` interface. This abstraction encompasses
both static policies, such as rule-based policies, and updatable policies, such as RL policies. The latter is
abstracted through the ``AbsCorePolicy`` sub-class which also exposes a ``update`` interface. By default, updatable
policies require an experience manager to store and retrieve simulation data (in the form of "experiences sets")
based on which updates can be made.
abstracted through the ``RLPolicy`` sub-class, which also exposes the ``update_with_multi_loss_info`` and
``learn_from_multi_trajectories`` interfaces. A replay buffer is optional for updatable policies; it stores and
retrieves simulation data (in the form of ``ExperienceSet``) based on which updates can be made.


.. code-block:: python

class AbsPolicy(ABC):
def __init__(self, name):
super().__init__()
self._name = name

@abstractmethod
def choose_action(self, state):
raise NotImplementedError


class AbsCorePolicy(AbsPolicy):
def __init__(self, experience_memory: ExperienceMemory):
super().__init__()
self.experience_memory = experience_memory
class RLPolicy(AbsPolicy):
def __init__(self, name):
super().__init__(name)

@abstractmethod
def update(self):
def update_with_multi_loss_info(self):
raise NotImplementedError

@abstractmethod
def learn_from_multi_trajectories(self):
raise NotImplementedError

Policy Manager
--------------
@@ -113,17 +119,30 @@ In asynchronous learning, the policy manager is the centerpiece of the policy s
Individual policy updates, however, may or may not occur within the policy manager itself depending
on the policy manager type used. The provided policy manager classes include:

* ``LocalPolicyManager``, where the policies are updated within the manager itself;
* ``MultiProcessPolicyManager``, which distributes policies amongst a set of trainer processes to parallelize
policy update;
* ``MultiNodePolicyManager``, which distributes policies amongst a set of remote compute nodes to parallelize
policy update;
* ``SimplePolicyManager``, where the policies are updated within the manager itself, sequentially or in parallel through multi-processing;
* ``DistributedPolicyManager``, which distributes policies amongst a set of remote compute nodes to parallelize policy update.

Moreover, in ``data-parallel`` mode, each policy manager has an additional allocator for gradient workers
(``grad_worker``), which provides a policy-to-worker mapping. The worker allocator auto-balances the load
during training by dynamically adjusting the number of workers assigned to each policy according to the
number of experiences, agents or policies.
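
A much-simplified version of such an allocation strategy is sketched below for two of the three modes; this is illustrative only, not the actual ``WorkerAllocator`` logic, and ``num_experiences_by_policy`` is an assumed input:

.. code-block:: python

    def allocate_workers(mode, policy_names, num_workers, num_experiences_by_policy=None):
        """Return a {policy_name: [grad_worker_id, ...]} mapping (simplified sketch)."""
        if mode == "by-policy":
            # Spread policies over workers round-robin, one worker per policy.
            return {name: [i % num_workers] for i, name in enumerate(policy_names)}
        if mode == "by-experience":
            # Give each policy a share of workers proportional to its experience count.
            total = sum(num_experiences_by_policy[name] for name in policy_names)
            mapping, cursor = {}, 0
            for name in policy_names:
                share = max(1, round(num_workers * num_experiences_by_policy[name] / total))
                mapping[name] = [(cursor + k) % num_workers for k in range(share)]
                cursor += share
            return mapping
        raise ValueError(f"unsupported allocation mode: {mode}")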

.. image:: ../images/rl/policy_manager.svg
:target: ../images/rl/policy_manager.svg
:alt: RL Overview

The ``DistributedPolicyManager`` runs a set of ``policy_host`` instances and a ``TrainerAllocator``.
A ``policy_host`` is a process/VM/node that hosts the update of a policy. The ``TrainerAllocator``
dynamically adjusts the number of worker nodes assigned to each policy according to the number of
experiences, agents or policies. Each ``policy_host`` updates its own policies independently, which
provides policy-level parallelism.
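
For reference, the workflow script in this PR (``examples/rl/workflows/policy_manager.py``) wires these components together roughly as follows; ``policy_func_dict``, ``agent2policy`` and ``log_dir`` come from the example's ``general.py`` module, and the remaining values are read from environment variables in the actual script:

.. code-block:: python

    from maro.rl.learning import DistributedPolicyManager
    from maro.rl.policy import WorkerAllocator

    # Policy-to-grad-worker mapping; the mode is one of "by-policy", "by-agent" or "by-experience".
    allocator = WorkerAllocator("by-policy", num_grad_workers, list(policy_func_dict.keys()), agent2policy)

    policy_manager = DistributedPolicyManager(
        list(policy_func_dict.keys()), group, num_hosts,
        data_parallel=True,
        num_grad_workers=num_grad_workers,
        worker_allocator=allocator,
        proxy_kwargs={"redis_address": ("maro-redis", 6379), "max_peer_discovery_retries": 50},
        log_dir=log_dir
    )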

During training, the ``PolicyManager`` receives training data collected by the ``RolloutManager`` and
sends it to the corresponding ``policy_host``. Each ``policy_host`` then dispatches gradient tasks, each
consisting of a policy state and an experience batch, to several ``grad_worker`` instances for gradient
computation. A ``grad_worker`` is stateless; it computes gradients using only the policy state and data
batch provided in a task. The ``policy_host`` then aggregates the gradients from the ``grad_worker``
instances and performs a gradient descent step on its parameters.
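
Put together, one data-parallel update cycle on a policy host amounts to the following conceptual sketch, which omits the actual messaging layer; ``split_batch``, ``policy.get_state`` and ``worker.compute_gradients`` are stand-ins for the real remote calls:

.. code-block:: python

    def data_parallel_update(policy, experience_batch, grad_workers):
        """One update cycle on a policy host (conceptual sketch)."""
        # 1. Split the experience batch into one sub-batch per gradient worker.
        sub_batches = split_batch(experience_batch, num_parts=len(grad_workers))
        # 2. Dispatch gradient tasks, each carrying the current policy state and a sub-batch.
        policy_state = policy.get_state()
        grad_dicts = [
            worker.compute_gradients(policy_state, sub_batch)
            for worker, sub_batch in zip(grad_workers, sub_batches)
        ]
        # 3. Aggregate the gradients and take a gradient descent step on the host's copy.
        policy.apply_gradients(average_grads(grad_dicts))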

Core Model
----------
@@ -176,6 +195,23 @@ To perform a single gradient step on the model, call the ``step`` function:
Here it is assumed that the losses have been computed using the same model instance and the gradients have
been generated for the internal components.
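
For instance, a minimal illustration of such a call; ``compute_loss`` and ``experience_batch`` are placeholders for whatever loss is built from ``ac_model``'s own outputs:

.. code-block:: python

    loss = compute_loss(ac_model, experience_batch)  # a loss built from ac_model's own outputs
    ac_model.step(loss)  # back-propagate and apply a single optimizer step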

Additionally, the core model provides ``get_gradients`` and ``apply_gradients`` interfaces for finer control
over model updates, for example in distributed training. Here is an example of the gradient-related interfaces:

.. code-block:: python

# for single gradient source
grad_dict = ac_model_0.get_gradients(loss_0)
ac_model.apply_gradients(grad_dict)
# for multiple gradient sources
grad_dict_list = []
grad_dict_list.append(ac_model_0.get_gradients(loss_0))
grad_dict_list.append(ac_model_1.get_gradients(loss_1))
ac_model.apply_gradients(average_grads(grad_dict_list))

The ``get_gradients`` function returns a gradient dict of the form ``{param_name: gradient_value}``.
The ``apply_gradients`` function takes such a dict as input and applies it to the model. ``average_grads``
averages gradient dicts gathered from multiple sources.
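
The averaging helper can be as simple as an element-wise mean over the gradient dicts; a minimal sketch, assuming PyTorch tensors and not necessarily identical to the helper shipped with MARO, is:

.. code-block:: python

    import torch

    def average_grads(grad_dict_list):
        """Element-wise average of a list of {param_name: gradient_tensor} dicts."""
        return {
            param_name: torch.stack([grads[param_name] for grads in grad_dict_list]).mean(dim=0)
            for param_name in grad_dict_list[0]
        }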

Experience
----------
Expand All @@ -187,6 +223,23 @@ for sampling purposes. An ``ExperienceMemory`` is a storage facility for experie
a policy for storing and retrieving training data. Sampling from the experience memory can be customized by
registering a user-defined sampler to it.

``ExperienceSet`` supports list-like usage:

.. code-block:: python

experience_set = ExperienceSet(states, actions, rewards, next_states, info)
# length
print(len(experience_set))
# or
print(experience_set.size)
# index
experience_batch = experience_set[0]
# slice
experience_batch = experience_set[0:5]
# slice with stride
experience_batch = experience_set[0:20:5]
# extend
experience_set.extend(experience_batch)
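
Supporting this kind of indexing only requires ``__getitem__`` to accept both integers and slices; a simplified, list-backed sketch (not the actual MARO implementation) looks like:

.. code-block:: python

    class SimpleExperienceSet:
        """Minimal list-backed experience container with int and slice indexing."""

        def __init__(self, states, actions, rewards, next_states, info):
            self.states, self.actions, self.rewards = states, actions, rewards
            self.next_states, self.info = next_states, info

        @property
        def size(self):
            return len(self.states)

        def __len__(self):
            return self.size

        def __getitem__(self, index):
            if isinstance(index, int):
                index = slice(index, index + 1)
            # A slice such as exp_set[0:20:5] is forwarded to each underlying field list.
            return SimpleExperienceSet(
                self.states[index], self.actions[index], self.rewards[index],
                self.next_states[index], self.info[index]
            )

        def extend(self, other):
            for field in ("states", "actions", "rewards", "next_states", "info"):
                getattr(self, field).extend(getattr(other, field))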

Exploration
-----------
4 changes: 2 additions & 2 deletions examples/rl/cim/__init__.py
@@ -2,7 +2,7 @@
# Licensed under the MIT license.

from .callbacks import post_collect, post_evaluate
from .env_sampler import get_env_sampler
from .env_sampler import agent2policy, get_env_sampler
from .policies import policy_func_dict

__all__ = ["post_collect", "post_evaluate", "get_env_sampler", "policy_func_dict"]
__all__ = ["post_collect", "post_evaluate", "agent2policy", "get_env_sampler", "policy_func_dict"]
21 changes: 19 additions & 2 deletions examples/rl/scripts/docker/docker_compose_yml.py
@@ -60,11 +60,28 @@
num_rollouts = config['sync']['distributed']['num_workers']

common_env.append(f"NUMROLLOUTS={num_rollouts}")

# host spec
common_env.append(f"DATAPARALLEL={config['data_parallel']['enable']}")
common_env.append(f"DISTRIBUTED={config['policy_manager']['type'] == 'distributed'}")
if config["data_parallel"]["enable"]:
common_env.append(f"NUMGRADWORKERS={config['data_parallel']['num_workers']}")
common_env.append(f"ALLOCATIONMODE={config['data_parallel']['allocation_mode']}")
if config["policy_manager"]["type"] == "distributed":
common_env.append(f"LEARNGROUP={config['policy_manager']['distributed']['group']}")
common_env.append(f"NUMHOSTS={config['policy_manager']['distributed']['num_hosts']}")

# grad worker config
if config["data_parallel"]["enable"]:
for worker_id in range(config['data_parallel']['num_workers']):
str_id = f"grad_worker.{worker_id}"
grad_worker_spec = deepcopy(common_spec)
del grad_worker_spec["build"]
grad_worker_spec["command"] = "python3 /maro/rl_examples/workflows/grad_worker.py"
grad_worker_spec["container_name"] = f"{namespace}.{str_id}"
grad_worker_spec["environment"] = [f"WORKERID={worker_id}"] + common_env
docker_compose_manifest["services"][str_id] = grad_worker_spec

# policy host spec
if config["policy_manager"]["type"] == "distributed":
for host_id in range(config["policy_manager"]["distributed"]["num_hosts"]):
str_id = f"policy_host.{host_id}"
host_spec = deepcopy(common_spec)
4 changes: 4 additions & 0 deletions examples/rl/workflows/config.yml
@@ -30,6 +30,10 @@ policy_manager:
distributed:
group: learn
num_hosts: 2
data_parallel:
enable: false
num_workers: 2
allocation_mode: by-policy # by-policy, by-agent, by-experience
redis:
host: redis-server
port: 6379
1 change: 1 addition & 0 deletions examples/rl/workflows/general.py
@@ -20,3 +20,4 @@
policy_func_dict = getattr(module, "policy_func_dict")
post_collect = getattr(module, "post_collect", None)
post_evaluate = getattr(module, "post_evaluate", None)
agent2policy = getattr(module, "agent2policy", None)
41 changes: 41 additions & 0 deletions examples/rl/workflows/grad_worker.py
@@ -0,0 +1,41 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

import sys
from os import getenv
from os.path import dirname, realpath

from maro.rl.learning import grad_worker

workflow_dir = dirname(dirname(realpath(__file__))) # template directory
if workflow_dir not in sys.path:
sys.path.insert(0, workflow_dir)

from general import log_dir, policy_func_dict


if __name__ == "__main__":
# TODO: WORKERID in docker compose script.
worker_id = getenv("WORKERID")
num_hosts = getenv("NUMHOSTS")
distributed = getenv("DISTRIBUTED") == "True"
if worker_id is None:
raise ValueError("missing environment variable: WORKERID")
if num_hosts is None:
if distributed:
raise ValueError("missing environment variable: NUMHOSTS")
else:
num_hosts = 0

group = getenv("LEARNGROUP", default="learn")
grad_worker(
policy_func_dict,
int(worker_id),
int(num_hosts),
group,
proxy_kwargs={
"redis_address": (getenv("REDISHOST", default="maro-redis"), int(getenv("REDISPORT", default=6379))),
"max_peer_discovery_retries": 50
},
log_dir=log_dir
)
7 changes: 7 additions & 0 deletions examples/rl/workflows/policy_host.py
@@ -16,8 +16,13 @@

if __name__ == "__main__":
host_id = getenv("HOSTID")
data_parallel = getenv("DATAPARALLEL") == "True"
num_grad_workers = getenv("NUMGRADWORKERS")

if host_id is None:
raise ValueError("missing environment variable: HOSTID")
if num_grad_workers is None:
num_grad_workers = 0

group = getenv("LEARNGROUP", default="learn")
policy_host(
@@ -28,5 +33,7 @@
"redis_address": (getenv("REDISHOST", default="maro-redis"), int(getenv("REDISPORT", default=6379))),
"max_peer_discovery_retries": 50
},
data_parallel=data_parallel,
num_grad_workers=int(num_grad_workers),
log_dir=log_dir
)
40 changes: 34 additions & 6 deletions examples/rl/workflows/policy_manager.py
@@ -5,28 +5,56 @@
from os import getenv
from os.path import dirname, realpath

from maro.rl.learning import DistributedPolicyManager, SimplePolicyManager
from maro.rl.learning import DistributedPolicyManager, MultiProcessPolicyManager, SimplePolicyManager
from maro.rl.policy import WorkerAllocator

workflow_dir = dirname(dirname(realpath(__file__))) # template directory
if workflow_dir not in sys.path:
sys.path.insert(0, workflow_dir)

from general import log_dir, policy_func_dict
from general import agent2policy, log_dir, policy_func_dict

def get_policy_manager():
manager_type = getenv("POLICYMANAGERTYPE", default="simple")
parallel = int(getenv("PARALLEL", default=0))
if manager_type == "simple":
return SimplePolicyManager(policy_func_dict, parallel=parallel, log_dir=log_dir)

data_parallel = getenv("DATAPARALLEL") == "True"
num_grad_workers = int(getenv("NUMGRADWORKERS", default=1))
group = getenv("LEARNGROUP", default="learn")
allocation_mode = getenv("ALLOCATIONMODE", default="by-policy")
allocator = WorkerAllocator(allocation_mode, num_grad_workers, list(policy_func_dict.keys()), agent2policy)
if manager_type == "simple":
if parallel == 0:
return SimplePolicyManager(
policy_func_dict, group,
data_parallel=data_parallel,
num_grad_workers=num_grad_workers,
worker_allocator=allocator,
proxy_kwargs={
"redis_address": (getenv("REDISHOST", default="maro-redis"), int(getenv("REDISPORT", default=6379))),
"max_peer_discovery_retries": 50
},
log_dir=log_dir)
else:
return MultiProcessPolicyManager(
policy_func_dict, group,
data_parallel=data_parallel,
num_grad_workers=num_grad_workers,
worker_allocator=allocator,
proxy_kwargs={
"redis_address": (getenv("REDISHOST", default="maro-redis"), int(getenv("REDISPORT", default=6379))),
"max_peer_discovery_retries": 50
},
log_dir=log_dir)
num_hosts = int(getenv("NUMHOSTS", default=5))
if manager_type == "distributed":
policy_manager = DistributedPolicyManager(
list(policy_func_dict.keys()), group, num_hosts,
data_parallel=data_parallel,
num_grad_workers=num_grad_workers,
worker_allocator=allocator,
proxy_kwargs={
"redis_address": (getenv("REDISHOST", default="maro-redis"), int(getenv("REDISPORT", default=6379))),
"max_peer_discovery_retries": 50
"max_peer_discovery_retries": 50
},
log_dir=log_dir
)
7 changes: 5 additions & 2 deletions maro/rl/learning/__init__.py
@@ -4,13 +4,16 @@
from .early_stopper import AbsEarlyStopper
from .env_sampler import AbsEnvSampler
from .learner import Learner, simple_learner
from .policy_manager import AbsPolicyManager, DistributedPolicyManager, SimplePolicyManager, policy_host
from .policy_manager import (
AbsPolicyManager, DistributedPolicyManager, MultiProcessPolicyManager, SimplePolicyManager, grad_worker, policy_host
)
from .rollout_manager import AbsRolloutManager, DistributedRolloutManager, SimpleRolloutManager

__all__ = [
"AbsEarlyStopper",
"AbsEnvSampler",
"Learner", "simple_learner",
"AbsPolicyManager", "DistributedPolicyManager", "SimplePolicyManager", "policy_host",
"AbsPolicyManager", "DistributedPolicyManager", "MultiProcessPolicyManager", "SimplePolicyManager",
"grad_worker", "policy_host",
"AbsRolloutManager", "DistributedRolloutManager", "SimpleRolloutManager"
]