V0.2 rl refinement dist (#377)
* Support `slice` operation in ExperienceSet

* Support naive distributed policy training by proxy

* Dynamically allocate trainers according to the number of experiences

* code check

* code check

* code check

* Fix a bug in distributed training with no gradient

* Code check

* Move Back-Propagation from trainer to policy_manager and extract trainer-allocation strategy

* 1. Call allocate_trainer() at the start of update(); 2. Refine according to code review

* Code check

* Refine code with new interface

* Update docs of PolicyManager and ExperienceSet

* Add images for rl_toolkit docs

* Update diagram of PolicyManager

* Refine with new interface

* Extract allocation strategy into `allocation_strategy.py`

* Add `distributed_learn()` in policies for data-parallel training

* Update doc of RL_toolkit

* Add gradient workers for data-parallel

* Refine code and update docs

* Lint check

* Refine by comments

* Rename `trainer` to `worker`

* Rename `distributed_learn` to `learn_with_data_parallel`

* Refine allocator and remove redundant code in policy_manager

* Remove arguments from allocate_by_policy and related functions
buptchan committed Sep 16, 2021
1 parent 9030200 commit f2dd5c0
Showing 18 changed files with 728 additions and 89 deletions.
3 changes: 3 additions & 0 deletions docs/source/images/rl/policy_manager.svg
25 changes: 25 additions & 0 deletions docs/source/key_components/rl_toolkit.rst
@@ -64,6 +64,10 @@ provides various policy improvement interfaces to support single-threaded and di
.. code-block:: python
    class AbsPolicy(ABC):
        def __init__(self, name):
            super().__init__()
            self._name = name

        @abstractmethod
        def __call__(self, state):
            """Select an action based on a state."""
@@ -108,6 +112,27 @@ is present as a server process. The types of policy manager include:
to receive roll-out information for update. This approach allows the policies to be updated in parallel and may be
necessary when the combined size of the policies is too big to fit in a single node.

Moreover, in ``data-parallel`` mode, each policy manager has an additional worker (``grad_worker``)
allocator, which provides a policy-to-worker mapping. During training, the worker allocator
auto-balances the load by dynamically adjusting the number of workers assigned to each policy
according to the number of experiences/agents/policies.
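
For illustration, a ``by-experience`` allocation could look like the sketch below (the function
name, signature and worker naming are assumptions for this example, not the toolkit's actual API):

.. code-block:: python

    def allocate_by_experience(num_workers, exp_counts):
        """Sketch: assign each policy a share of grad workers proportional
        to the number of experiences it has collected.

        `exp_counts` maps policy name -> number of collected experiences.
        """
        total = sum(exp_counts.values())
        mapping, cursor = {}, 0
        for name, count in exp_counts.items():
            # At least one worker per policy; more for experience-heavy ones.
            share = max(1, round(num_workers * count / total))
            mapping[name] = [f"GRAD_WORKER.{(cursor + i) % num_workers}" for i in range(share)]
            cursor += share
        return mapping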

.. image:: ../images/rl/policy_manager.svg
:target: ../images/rl/policy_manager.svg
:alt: PolicyManager

The ``DistributedPolicyManager`` runs a set of ``policy_host`` processes and a ``TrainerAllocator``.
A ``policy_host`` is a process/VM/node that hosts the update of a policy. The ``TrainerAllocator``
dynamically adjusts the number of worker nodes assigned to each policy according to the number of
experiences/agents/policies. Each ``policy_host`` independently updates its own policies, providing
policy-level parallelism.

During training, the ``PolicyManager`` receives training data collected by the ``RolloutManager``
and sends it to the corresponding ``policy_host`` processes. Each ``policy_host`` dispatches
gradient tasks, each consisting of a policy state and an experience batch, to several stateless
``grad_worker`` processes for gradient computation. Each ``grad_worker`` computes gradients using
the policy state and data batch provided in a task. Finally, the ``policy_host`` aggregates the
gradients returned by the ``grad_worker`` processes and performs gradient descent on its policy's
parameters.
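
The host-side update can be pictured with the sketch below (``get_state``, ``compute_gradients``
and ``apply_gradients`` are assumed interfaces used for illustration, not the toolkit's actual API):

.. code-block:: python

    def learn_with_data_parallel(policy, batch, workers):
        # Split the experience batch into one sub-batch per grad worker
        # (any remainder is ignored for brevity).
        size = len(batch) // len(workers)
        sub_batches = [batch[i * size:(i + 1) * size] for i in range(len(workers))]
        # Each gradient task carries the current policy state and one sub-batch;
        # the stateless workers return gradients for their share of the data.
        grads = [
            worker.compute_gradients(policy.get_state(), sub)
            for worker, sub in zip(workers, sub_batches)
        ]
        # Aggregate (here: average) the gradients, then take one descent step.
        avg_grad = {k: sum(g[k] for g in grads) / len(grads) for k in grads[0]}
        policy.apply_gradients(avg_grad)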

Core Model
----------
20 changes: 19 additions & 1 deletion examples/rl/scripts/docker/docker_compose_yml.py
@@ -66,10 +66,28 @@
]

common_env.append(f"NUMROLLOUTS={config[config['mode']]['num_rollouts']}")
# host spec
common_env.append(f"DATAPARALLEL={config['data_parallel']['enable']}")
common_env.append(f"DISTRIBUTED={config['policy_manager']['type'] == 'distributed'}")
if config["data_parallel"]["enable"]:
common_env.append(f"NUMGRADWORKERS={config['data_parallel']['num_workers']}")
common_env.append(f"ALLOCATIONMODE={config['data_parallel']['allocation_mode']}")
if config["policy_manager"]["type"] == "distributed":
common_env.append(f"LEARNGROUP={config['policy_manager']['distributed']['group']}")
common_env.append(f"NUMHOSTS={config['policy_manager']['distributed']['num_hosts']}")

# grad worker config
if config["data_parallel"]["enable"]:
for worker_id in range(config['data_parallel']['num_workers']):
str_id = f"grad_worker.{worker_id}"
grad_worker_spec = deepcopy(common_spec)
del grad_worker_spec["build"]
grad_worker_spec["command"] = "python3 /maro/rl_examples/workflows/grad_worker.py"
grad_worker_spec["container_name"] = f"{namespace}.{str_id}"
grad_worker_spec["environment"] = [f"WORKERID={worker_id}"] + common_env
docker_compose_manifest["services"][str_id] = grad_worker_spec

# host spec
if config["policy_manager"]["type"] == "distributed":
for host_id in range(config["policy_manager"]["distributed"]["num_hosts"]):
str_id = f"policy_host.{host_id}"
host_spec = deepcopy(common_spec)
4 changes: 4 additions & 0 deletions examples/rl/workflows/config.yml
@@ -27,6 +27,10 @@ policy_manager:
  distributed:
    group: learn
    num_hosts: 2
data_parallel:
  enable: false
  num_workers: 2
  allocation_mode: by-policy  # by-policy, by-agent, by-experience
redis:
  host: redis-server
  port: 6379
1 change: 1 addition & 0 deletions examples/rl/workflows/general.py
@@ -21,3 +21,4 @@
get_env_sampler = getattr(module, "get_env_sampler")
post_collect = getattr(module, "post_collect", None)
post_evaluate = getattr(module, "post_evaluate", None)
agent2policy = getattr(module, "agent2policy", None)
41 changes: 41 additions & 0 deletions examples/rl/workflows/grad_worker.py
@@ -0,0 +1,41 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

import sys
from os import getenv
from os.path import dirname, realpath

from maro.rl.learning import grad_worker

workflow_dir = dirname(dirname(realpath(__file__))) # template directory
if workflow_dir not in sys.path:
    sys.path.insert(0, workflow_dir)

from general import log_dir, policy_func_dict


if __name__ == "__main__":
    # TODO: WORKERID in docker compose script.
    worker_id = getenv("WORKERID")
    num_hosts = getenv("NUMHOSTS")
    distributed = getenv("DISTRIBUTED") == "True"
    if worker_id is None:
        raise ValueError("missing environment variable: WORKERID")
    if num_hosts is None:
        if distributed:
            raise ValueError("missing environment variable: NUMHOSTS")
        else:
            num_hosts = 0

    group = getenv("LEARNGROUP", default="learn")
    grad_worker(
        policy_func_dict,
        int(worker_id),
        int(num_hosts),
        group,
        proxy_kwargs={
            "redis_address": (getenv("REDISHOST", default="maro-redis"), int(getenv("REDISPORT", default=6379))),
            "max_peer_discovery_retries": 50
        },
        log_dir=log_dir
    )
7 changes: 7 additions & 0 deletions examples/rl/workflows/policy_host.py
@@ -16,8 +16,13 @@

if __name__ == "__main__":
    host_id = getenv("HOSTID")
    data_parallel = getenv("DATAPARALLEL") == "True"
    num_grad_workers = getenv("NUMGRADWORKERS")

    if host_id is None:
        raise ValueError("missing environment variable: HOSTID")
    if num_grad_workers is None:
        num_grad_workers = 0

    group = getenv("LEARNGROUP", default="learn")
    policy_host(
@@ -28,5 +33,7 @@
"redis_address": (getenv("REDISHOST", default="maro-redis"), int(getenv("REDISPORT", default=6379))),
"max_peer_discovery_retries": 50
},
data_parallel=data_parallel,
num_grad_workers=int(num_grad_workers),
log_dir=log_dir
)
40 changes: 34 additions & 6 deletions examples/rl/workflows/policy_manager.py
@@ -5,28 +5,56 @@
from os import getenv
from os.path import dirname, realpath

from maro.rl.learning import DistributedPolicyManager, MultiProcessPolicyManager, SimplePolicyManager
from maro.rl.policy import WorkerAllocator

workflow_dir = dirname(dirname(realpath(__file__))) # template directory
if workflow_dir not in sys.path:
    sys.path.insert(0, workflow_dir)

from general import agent2policy, log_dir, policy_func_dict

def get_policy_manager():
    manager_type = getenv("POLICYMANAGERTYPE", default="simple")
    parallel = int(getenv("PARALLEL", default=0))

    data_parallel = getenv("DATAPARALLEL") == "True"
    num_grad_workers = int(getenv("NUMGRADWORKERS", default=1))
    group = getenv("LEARNGROUP", default="learn")
    allocation_mode = getenv("ALLOCATIONMODE", default="by-policy")
    allocator = WorkerAllocator(allocation_mode, num_grad_workers, list(policy_func_dict.keys()), agent2policy)
    if manager_type == "simple":
        if parallel == 0:
            return SimplePolicyManager(
                policy_func_dict, group,
                data_parallel=data_parallel,
                num_grad_workers=num_grad_workers,
                worker_allocator=allocator,
                proxy_kwargs={
                    "redis_address": (getenv("REDISHOST", default="maro-redis"), int(getenv("REDISPORT", default=6379))),
                    "max_peer_discovery_retries": 50
                },
                log_dir=log_dir)
        else:
            return MultiProcessPolicyManager(
                policy_func_dict, group,
                data_parallel=data_parallel,
                num_grad_workers=num_grad_workers,
                worker_allocator=allocator,
                proxy_kwargs={
                    "redis_address": (getenv("REDISHOST", default="maro-redis"), int(getenv("REDISPORT", default=6379))),
                    "max_peer_discovery_retries": 50
                },
                log_dir=log_dir)
    num_hosts = int(getenv("NUMHOSTS", default=5))
    if manager_type == "distributed":
        policy_manager = DistributedPolicyManager(
            list(policy_func_dict.keys()), group, num_hosts,
            data_parallel=data_parallel,
            num_grad_workers=num_grad_workers,
            worker_allocator=allocator,
            proxy_kwargs={
                "redis_address": (getenv("REDISHOST", default="maro-redis"), int(getenv("REDISPORT", default=6379))),
                "max_peer_discovery_retries": 50
            },
            log_dir=log_dir
        )
7 changes: 5 additions & 2 deletions maro/rl/learning/__init__.py
@@ -3,12 +3,15 @@

from .env_sampler import AbsEnvSampler
from .learning_loop import learn
from .policy_manager import (
    AbsPolicyManager, DistributedPolicyManager, MultiProcessPolicyManager, SimplePolicyManager, grad_worker, policy_host
)
from .rollout_manager import AbsRolloutManager, DistributedRolloutManager, MultiProcessRolloutManager

__all__ = [
    "AbsEnvSampler",
    "learn",
    "AbsPolicyManager", "DistributedPolicyManager", "MultiProcessPolicyManager", "SimplePolicyManager",
    "grad_worker", "policy_host",
    "AbsRolloutManager", "DistributedRolloutManager", "MultiProcessRolloutManager"
]
