Merge remote-tracking branch 'upstream/master' into multigpu-ma-support
ericl committed Dec 6, 2018
2 parents fc53187 + d864f29 commit 60f96eb
Showing 19 changed files with 277 additions and 75 deletions.
4 changes: 4 additions & 0 deletions doc/source/rllib-env.rst
@@ -107,6 +107,10 @@ RLlib will auto-vectorize Gym envs for batch evaluation if the ``num_envs_per_wo
 Multi-Agent
 -----------
 
+.. note::
+
+    Learn more about multi-agent reinforcement learning in RLlib by reading the `blog post <https://rise.cs.berkeley.edu/blog/scaling-multi-agent-rl-with-rllib/>`__.
+
 A multi-agent environment is one which has multiple acting entities per step, e.g., in a traffic simulation, there may be multiple "car" and "traffic light" agents in the environment. The model for multi-agent in RLlib is as follows: (1) as a user you define the number of policies available up front, and (2) a function that maps agent ids to policy ids. This is summarized by the below figure:
 
 .. image:: multi-agent.svg
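As an aside for readers of this doc change, the two pieces described above (an up-front set of policies and an agent-to-policy mapping function) come together in the trainer config. Below is a hedged sketch assuming the 0.6-era "multiagent" keys "policy_graphs" and "policy_mapping_fn"; the spaces, agent id scheme, and choice of PGPolicyGraph are invented for illustration.

# Hedged sketch (not part of the commit): two policies plus a mapping
# function, using the 0.6-era "multiagent" config layout.
import gym
from ray.rllib.agents.pg.pg_policy_graph import PGPolicyGraph

obs_space = gym.spaces.Box(low=-1.0, high=1.0, shape=(4,))
act_space = gym.spaces.Discrete(2)

policy_graphs = {
    # policy id -> (policy graph class, obs space, act space, extra config)
    "car": (PGPolicyGraph, obs_space, act_space, {}),
    "traffic_light": (PGPolicyGraph, obs_space, act_space, {}),
}

def policy_mapping_fn(agent_id):
    # Route each agent id (e.g. "light_3", "car_7") to a policy id.
    return "traffic_light" if agent_id.startswith("light_") else "car"

config = {
    "multiagent": {
        "policy_graphs": policy_graphs,
        "policy_mapping_fn": policy_mapping_fn,
    },
}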
8 changes: 4 additions & 4 deletions doc/source/rllib-training.rst
@@ -228,7 +228,7 @@ Ray actors provide high levels of performance, so in more complex cases they can
 Callbacks and Custom Metrics
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
-You can provide callback functions to be called at points during policy evaluation. These functions have access to an info dict containing state for the current `episode <https://github.com/ray-project/ray/blob/master/python/ray/rllib/evaluation/episode.py>`__. Custom state can be stored for the `episode <https://github.com/ray-project/ray/blob/master/python/ray/rllib/evaluation/episode.py>`__ in the ``info["episode"].user_data`` dict, and custom scalar metrics reported by saving values to the ``info["episode"].custom_metrics`` dict. These custom metrics will be averaged and reported as part of training results. The following example (full code `here <https://github.com/ray-project/ray/blob/master/python/ray/rllib/examples/custom_metrics_and_callbacks.py>`__) logs a custom metric from the environment:
+You can provide callback functions to be called at points during policy evaluation. These functions have access to an info dict containing state for the current `episode <https://github.com/ray-project/ray/blob/master/python/ray/rllib/evaluation/episode.py>`__. Custom state can be stored for the `episode <https://github.com/ray-project/ray/blob/master/python/ray/rllib/evaluation/episode.py>`__ in the ``info["episode"].user_data`` dict, and custom scalar metrics reported by saving values to the ``info["episode"].custom_metrics`` dict. These custom metrics will be aggregated and reported as part of training results. The following example (full code `here <https://github.com/ray-project/ray/blob/master/python/ray/rllib/examples/custom_metrics_and_callbacks.py>`__) logs a custom metric from the environment:
 
 .. code-block:: python
@@ -245,10 +245,10 @@ You can provide callback functions to be called at points during policy evaluati
     def on_episode_end(info):
         episode = info["episode"]
-        mean_pole_angle = np.mean(episode.user_data["pole_angles"])
+        pole_angle = np.mean(episode.user_data["pole_angles"])
         print("episode {} ended with length {} and pole angles {}".format(
-            episode.episode_id, episode.length, mean_pole_angle))
-        episode.custom_metrics["mean_pole_angle"] = mean_pole_angle
+            episode.episode_id, episode.length, pole_angle))
+        episode.custom_metrics["pole_angle"] = pole_angle
 
     def on_train_result(info):
         print("agent.train() result: {} -> {} episodes".format(
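As a usage aside (not part of the diff): callbacks like the one above are registered through the agent's "callbacks" config. A hedged sketch loosely following the linked custom_metrics_and_callbacks.py example, assuming the 0.6-era config keys and tune.function wrapping; verify the exact keys against the RLlib version in use.

# Hedged sketch: registering an on_episode_end callback with Tune.
import numpy as np
import ray
from ray import tune

def on_episode_end(info):
    episode = info["episode"]
    # In the full example, on_episode_start/step fill user_data["pole_angles"].
    # With the metrics change below, this shows up as pole_angle_mean/min/max.
    episode.custom_metrics["pole_angle"] = np.mean(
        episode.user_data.get("pole_angles", [0.0]))

if __name__ == "__main__":
    ray.init()
    tune.run_experiments({
        "cartpole_custom_metrics": {
            "run": "PG",
            "env": "CartPole-v0",
            "config": {
                "callbacks": {
                    "on_episode_end": tune.function(on_episode_end),
                },
            },
        },
    })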
11 changes: 11 additions & 0 deletions python/ray/rllib/agents/dqn/dqn_policy_graph.py
@@ -253,6 +253,10 @@ def __init__(self,
             self.td_error = tf.nn.softmax_cross_entropy_with_logits(
                 labels=m, logits=q_logits_t_selected)
             self.loss = tf.reduce_mean(self.td_error * importance_weights)
+            self.stats = {
+                # TODO: better Q stats for dist dqn
+                "mean_td_error": tf.reduce_mean(self.td_error),
+            }
         else:
             q_tp1_best_masked = (1.0 - done_mask) * q_tp1_best
 
@@ -264,6 +268,12 @@ def __init__(self,
                 q_t_selected - tf.stop_gradient(q_t_selected_target))
             self.loss = tf.reduce_mean(
                 importance_weights * _huber_loss(self.td_error))
+            self.stats = {
+                "mean_q": tf.reduce_mean(q_t_selected),
+                "min_q": tf.reduce_min(q_t_selected),
+                "max_q": tf.reduce_max(q_t_selected),
+                "mean_td_error": tf.reduce_mean(self.td_error),
+            }
 
 
 class DQNPolicyGraph(TFPolicyGraph):
@@ -430,6 +440,7 @@ def extra_compute_action_feed_dict(self):
     def extra_compute_grad_fetches(self):
         return {
             "td_error": self.loss.td_error,
+            "stats": self.loss.stats,
         }
 
     def postprocess_trajectory(self,
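An illustrative aside: the new "stats" entries are plain scalar summaries of existing loss tensors, fetched alongside the gradients. A minimal TensorFlow 1.x sketch of the same pattern, using made-up placeholder inputs rather than the graph built in this file.

# Minimal TF 1.x sketch of the scalar-stats pattern added above.
# The placeholder tensors are stand-ins, not the ones from dqn_policy_graph.py.
import numpy as np
import tensorflow as tf

q_t_selected = tf.placeholder(tf.float32, [None])
td_error = tf.placeholder(tf.float32, [None])

stats = {
    "mean_q": tf.reduce_mean(q_t_selected),
    "min_q": tf.reduce_min(q_t_selected),
    "max_q": tf.reduce_max(q_t_selected),
    "mean_td_error": tf.reduce_mean(td_error),
}

with tf.Session() as sess:
    print(sess.run(stats, feed_dict={
        q_t_selected: np.array([1.0, 2.0, 3.0]),
        td_error: np.array([0.1, -0.2, 0.3]),
    }))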
7 changes: 7 additions & 0 deletions python/ray/rllib/env/async_vector_env.py
@@ -291,6 +291,13 @@ def send_actions(self, action_dict):
         assert isinstance(rewards, dict), "Not a multi-agent reward"
         assert isinstance(dones, dict), "Not a multi-agent return"
         assert isinstance(infos, dict), "Not a multi-agent info"
+        if set(obs.keys()) != set(rewards.keys()):
+            raise ValueError(
+                "Key set for obs and rewards must be the same: "
+                "{} vs {}".format(obs.keys(), rewards.keys()))
+        if set(obs.keys()) != set(infos.keys()):
+            raise ValueError("Key set for obs and infos must be the same: "
+                             "{} vs {}".format(obs.keys(), infos.keys()))
         if dones["__all__"]:
             self.dones.add(env_id)
         self.env_states[env_id].observe(obs, rewards, dones, infos)
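For illustration (not from the diff), a hypothetical step() return that the new checks would reject, since one agent has an observation but no reward.

# Hypothetical step() return that the added validation would reject:
# "car_2" has an observation but no reward, so the key sets differ.
obs = {"car_1": [0.1], "car_2": [0.2]}
rewards = {"car_1": 1.0}                      # missing "car_2" -> ValueError
dones = {"car_1": False, "car_2": False, "__all__": False}
infos = {"car_1": {}, "car_2": {}}

if set(obs.keys()) != set(rewards.keys()):
    raise ValueError("Key set for obs and rewards must be the same: "
                     "{} vs {}".format(obs.keys(), rewards.keys()))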
2 changes: 1 addition & 1 deletion python/ray/rllib/env/multi_agent_env.py
@@ -56,7 +56,7 @@ def step(self, action_dict):
             rewards (dict): Reward values for each ready agent. If the
                 episode is just started, the value will be None.
             dones (dict): Done values for each ready agent. The special key
-                "__all__" is used to indicate env termination.
+                "__all__" (required) is used to indicate env termination.
             infos (dict): Info values for each ready agent.
         """
         raise NotImplementedError
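To make the contract concrete, here is a hedged sketch of a minimal two-agent env whose step() return satisfies both the required "__all__" key and the obs/rewards/infos key-set checks added in async_vector_env.py above; the agent ids, horizon, and reward scheme are invented.

# Hedged sketch: a minimal two-agent MultiAgentEnv that returns
# obs/rewards/dones/infos dicts with matching agent keys and "__all__".
from ray.rllib.env.multi_agent_env import MultiAgentEnv

class TwoAgentEnv(MultiAgentEnv):
    def __init__(self):
        self.steps = 0

    def reset(self):
        self.steps = 0
        return {"agent_1": 0.0, "agent_2": 0.0}

    def step(self, action_dict):
        self.steps += 1
        done = self.steps >= 10
        obs = {aid: float(self.steps) for aid in ("agent_1", "agent_2")}
        rewards = {aid: 1.0 for aid in obs}   # same key set as obs
        dones = {aid: done for aid in obs}
        dones["__all__"] = done               # required termination signal
        infos = {aid: {} for aid in obs}      # same key set as obs
        return obs, rewards, dones, infos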
9 changes: 9 additions & 0 deletions python/ray/rllib/evaluation/episode.py
@@ -60,6 +60,7 @@ def __init__(self, policies, policy_mapping_fn, batch_builder_factory,
         self._agent_to_policy = {}
         self._agent_to_rnn_state = {}
         self._agent_to_last_obs = {}
+        self._agent_to_last_info = {}
         self._agent_to_last_action = {}
         self._agent_to_last_pi_info = {}
         self._agent_to_prev_action = {}
@@ -81,6 +82,11 @@ def last_observation_for(self, agent_id=_DUMMY_AGENT_ID):
 
         return self._agent_to_last_obs.get(agent_id)
 
+    def last_info_for(self, agent_id=_DUMMY_AGENT_ID):
+        """Returns the last info for the specified agent."""
+
+        return self._agent_to_last_info.get(agent_id)
+
     def last_action_for(self, agent_id=_DUMMY_AGENT_ID):
         """Returns the last action for the specified agent, or zeros."""
 
@@ -137,6 +143,9 @@ def _set_rnn_state(self, agent_id, rnn_state):
     def _set_last_observation(self, agent_id, obs):
         self._agent_to_last_obs[agent_id] = obs
 
+    def _set_last_info(self, agent_id, info):
+        self._agent_to_last_info[agent_id] = info
+
     def _set_last_action(self, agent_id, action):
         self._agent_to_last_action[agent_id] = action
 
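A possible use of the new accessor (illustrative, not from the commit): reading an env-provided info field inside the callbacks described in the docs above and reporting it as a custom metric. The "speed" info key is made up, and the default agent id is assumed to be the single-agent case.

# Hedged sketch: using episode.last_info_for() inside episode callbacks.
def on_episode_step(info):
    episode = info["episode"]
    agent_info = episode.last_info_for()  # default: the single-agent id
    if agent_info and "speed" in agent_info:
        episode.user_data.setdefault("speeds", []).append(agent_info["speed"])

def on_episode_end(info):
    episode = info["episode"]
    speeds = episode.user_data.get("speeds", [])
    if speeds:
        episode.custom_metrics["speed"] = sum(speeds) / len(speeds)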
12 changes: 10 additions & 2 deletions python/ray/rllib/evaluation/metrics.py
@@ -80,8 +80,16 @@ def summarize_episodes(episodes, new_episodes, num_dropped):
     for policy_id, rewards in policy_rewards.copy().items():
         policy_rewards[policy_id] = np.mean(rewards)
 
-    for k, v_list in custom_metrics.items():
-        custom_metrics[k] = np.mean(v_list)
+    for k, v_list in custom_metrics.copy().items():
+        custom_metrics[k + "_mean"] = np.mean(v_list)
+        filt = [v for v in v_list if not np.isnan(v)]
+        if filt:
+            custom_metrics[k + "_min"] = np.min(filt)
+            custom_metrics[k + "_max"] = np.max(filt)
+        else:
+            custom_metrics[k + "_min"] = float("nan")
+            custom_metrics[k + "_max"] = float("nan")
+        del custom_metrics[k]
 
     return dict(
         episode_reward_max=max_reward,
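For clarity (an editorial aside): with this change each reported custom metric key k is replaced by k_mean, k_min, and k_max, with NaN values excluded from min/max. A standalone sketch of the same aggregation applied to example data:

# Standalone sketch of the aggregation above on example data.
import numpy as np

custom_metrics = {"pole_angle": [0.1, 0.3, float("nan"), 0.2]}
for k, v_list in custom_metrics.copy().items():
    custom_metrics[k + "_mean"] = np.mean(v_list)
    filt = [v for v in v_list if not np.isnan(v)]
    custom_metrics[k + "_min"] = np.min(filt) if filt else float("nan")
    custom_metrics[k + "_max"] = np.max(filt) if filt else float("nan")
    del custom_metrics[k]

print(custom_metrics)
# e.g. {'pole_angle_mean': nan, 'pole_angle_min': 0.1, 'pole_angle_max': 0.3}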
58 changes: 32 additions & 26 deletions python/ray/rllib/evaluation/policy_evaluator.py
@@ -8,7 +8,6 @@
 import tensorflow as tf
 
 import ray
-from ray.rllib.models import ModelCatalog
 from ray.rllib.env.async_vector_env import AsyncVectorEnv
 from ray.rllib.env.atari_wrappers import wrap_deepmind, is_atari
 from ray.rllib.env.env_context import EnvContext
@@ -19,6 +18,8 @@
 from ray.rllib.evaluation.sampler import AsyncSampler, SyncSampler
 from ray.rllib.evaluation.policy_graph import PolicyGraph
 from ray.rllib.evaluation.tf_policy_graph import TFPolicyGraph
+from ray.rllib.models import ModelCatalog
+from ray.rllib.models.preprocessors import NoPreprocessor
 from ray.rllib.utils import merge_dicts
 from ray.rllib.utils.compression import pack
 from ray.rllib.utils.filter import get_filter
@@ -191,23 +192,21 @@ def __init__(self,
         self.sample_batch_size = batch_steps * num_envs
         self.batch_mode = batch_mode
         self.compress_observations = compress_observations
+        self.preprocessing_enabled = True
 
         self.env = env_creator(env_context)
         if isinstance(self.env, MultiAgentEnv) or \
                 isinstance(self.env, AsyncVectorEnv):
 
-            if model_config.get("custom_preprocessor"):
-                raise ValueError(
-                    "Custom preprocessors are not supported for env types "
-                    "MultiAgentEnv and AsyncVectorEnv. Please preprocess "
-                    "observations in your env directly.")
-
             def wrap(env):
                 return env  # we can't auto-wrap these env types
         elif is_atari(self.env) and \
                 not model_config.get("custom_preprocessor") and \
                 preprocessor_pref == "deepmind":
+
+            # Deepmind wrappers already handle all preprocessing
+            self.preprocessing_enabled = False
 
             if clip_rewards is None:
                 clip_rewards = True
 
@@ -222,8 +221,6 @@ def wrap(env):
         else:
 
             def wrap(env):
-                env = ModelCatalog.get_preprocessor_as_wrapper(
-                    env, model_config)
                 if monitor_path:
                     env = _monitor(env, monitor_path)
                 return env
@@ -246,11 +243,11 @@ def make_env(vector_index):
                 config=tf.ConfigProto(
                     gpu_options=tf.GPUOptions(allow_growth=True)))
             with self.tf_sess.as_default():
-                self.policy_map = self._build_policy_map(
-                    policy_dict, policy_config)
+                self.policy_map, self.preprocessors = \
+                    self._build_policy_map(policy_dict, policy_config)
         else:
-            self.policy_map = self._build_policy_map(policy_dict,
-                                                      policy_config)
+            self.policy_map, self.preprocessors = self._build_policy_map(
+                policy_dict, policy_config)
 
         self.multiagent = set(self.policy_map.keys()) != {DEFAULT_POLICY_ID}
         if self.multiagent:
@@ -286,6 +283,7 @@ def make_env(vector_index):
                 self.async_env,
                 self.policy_map,
                 policy_mapping_fn,
+                self.preprocessors,
                 self.filters,
                 clip_rewards,
                 unroll_length,
@@ -300,6 +298,7 @@ def make_env(vector_index):
                 self.async_env,
                 self.policy_map,
                 policy_mapping_fn,
+                self.preprocessors,
                 self.filters,
                 clip_rewards,
                 unroll_length,
@@ -314,24 +313,26 @@ def make_env(vector_index):
 
     def _build_policy_map(self, policy_dict, policy_config):
         policy_map = {}
+        preprocessors = {}
         for name, (cls, obs_space, act_space,
                    conf) in sorted(policy_dict.items()):
             merged_conf = merge_dicts(policy_config, conf)
+            if self.preprocessing_enabled:
+                preprocessor = ModelCatalog.get_preprocessor_for_space(
+                    obs_space, merged_conf.get("model"))
+                preprocessors[name] = preprocessor
+                obs_space = preprocessor.observation_space
+            else:
+                preprocessors[name] = NoPreprocessor(obs_space)
+            if isinstance(obs_space, gym.spaces.Dict) or \
+                    isinstance(obs_space, gym.spaces.Tuple):
+                raise ValueError(
+                    "Found raw Tuple|Dict space as input to policy graph. "
+                    "Please preprocess these observations with a "
+                    "Tuple|DictFlatteningPreprocessor.")
             with tf.variable_scope(name):
-                if isinstance(obs_space, gym.spaces.Dict):
-                    raise ValueError(
-                        "Found raw Dict space as input to policy graph. "
-                        "Please preprocess your environment observations "
-                        "with DictFlatteningPreprocessor and set the "
-                        "obs space to `preprocessor.observation_space`.")
-                elif isinstance(obs_space, gym.spaces.Tuple):
-                    raise ValueError(
-                        "Found raw Tuple space as input to policy graph. "
-                        "Please preprocess your environment observations "
-                        "with TupleFlatteningPreprocessor and set the "
-                        "obs space to `preprocessor.observation_space`.")
                 policy_map[name] = cls(obs_space, act_space, merged_conf)
-        return policy_map
+        return policy_map, preprocessors
 
     def sample(self):
         """Evaluate the current policies and return a batch of experiences.
@@ -554,6 +555,11 @@ def _validate_and_canonicalize(policy_graph, env):
     elif not issubclass(policy_graph, PolicyGraph):
         raise ValueError("policy_graph must be a rllib.PolicyGraph class")
     else:
+        if (isinstance(env, MultiAgentEnv)
+                and not hasattr(env, "observation_space")):
+            raise ValueError(
+                "MultiAgentEnv must have observation_space defined if run "
+                "in a single-agent configuration.")
         return {
             DEFAULT_POLICY_ID: (policy_graph, env.observation_space,
                                 env.action_space, {})
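Context for the refactor (an illustrative aside, not from the diff): per-policy preprocessors are now built from each policy's observation space instead of wrapping the env. A hedged sketch of the ModelCatalog call used above and the Preprocessor object it returns; the spaces are examples, and passing None as the options assumes the catalog falls back to default model options, as when "model" is absent from the config.

# Hedged sketch: building a preprocessor for an observation space,
# mirroring the new _build_policy_map flow.
import gym
from ray.rllib.models import ModelCatalog

obs_space = gym.spaces.Tuple((
    gym.spaces.Box(low=-1.0, high=1.0, shape=(3,)),
    gym.spaces.Discrete(4),
))

# None -> default model options (assumption; see lead-in).
prep = ModelCatalog.get_preprocessor_for_space(obs_space, None)
# The policy graph is built against the flattened space...
print(prep.observation_space)
# ...and raw observations are flattened before reaching the policy.
print(prep.transform(obs_space.sample()).shape)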
5 changes: 5 additions & 0 deletions python/ray/rllib/evaluation/sample_batch.py
@@ -79,6 +79,11 @@ def __init__(self, policy_map, clip_rewards):
         self.agent_to_policy = {}
         self.count = 0  # increment this manually
 
+    def total(self):
+        """Returns summed number of steps across all agent buffers."""
+
+        return sum(p.count for p in self.policy_builders.values())
+
     def has_pending_data(self):
         """Returns whether there is pending unprocessed data."""
 
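A small illustrative note (not from the diff): as read here, total() sums the per-policy builder counts, which can exceed the manually incremented env-step count when one env step produces rows for several agents. A toy sketch of that distinction, using a stand-in builder class rather than the real one:

# Toy stand-in illustrating why total() can exceed the env-step count
# in multi-agent rollouts: one env step may add a row for each agent.
class ToyBuilder(object):
    def __init__(self):
        self.count = 0

toy_policy_builders = {"car": ToyBuilder(), "traffic_light": ToyBuilder()}
env_steps = 0

for _ in range(10):              # 10 env steps...
    env_steps += 1
    for b in toy_policy_builders.values():
        b.count += 1             # ...each adds one row per policy

total = sum(b.count for b in toy_policy_builders.values())
print(env_steps, total)          # 10 env steps vs. 20 buffered agent rows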