[RLlib] Replay buffers: Add config option to store contents in checkpoints. #17999

Merged
5 changes: 2 additions & 3 deletions rllib/agents/cql/cql.py
@@ -162,9 +162,8 @@ def after_init(trainer):
np.concatenate([obs[1:], np.zeros_like(obs[0:1])])
batch[SampleBatch.DONES][-1] = True
replay_buffer.add_batch(batch)
print(
f"Loaded {num_batches} batches ({total_timesteps} ts) into the "
f"replay buffer, which has capacity {replay_buffer.buffer_size}.")
print(f"Loaded {num_batches} batches ({total_timesteps} ts) into the "
f"replay buffer, which has capacity {replay_buffer.capacity}.")
else:
raise ValueError(
"Unknown offline input! config['input'] must either be list of "
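The cql.py change above only reformats the log line printed after offline data has been loaded, and switches the reported buffer attribute from buffer_size to capacity. For context, a minimal sketch of such a loading loop, assuming a JsonReader over the configured offline files and the add_batch()/capacity interface used above (the helper name is hypothetical):

from ray.rllib.offline import JsonReader

def load_offline_batches(replay_buffer, input_path, num_batches):
    """Hypothetical helper: read SampleBatches from offline JSON data,
    add them to a replay buffer, then report what was loaded."""
    reader = JsonReader(input_path)
    total_timesteps = 0
    for _ in range(num_batches):
        batch = reader.next()            # one SampleBatch per call
        replay_buffer.add_batch(batch)
        total_timesteps += batch.count
    print(f"Loaded {num_batches} batches ({total_timesteps} ts) into the "
          f"replay buffer, which has capacity {replay_buffer.capacity}.")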
8 changes: 8 additions & 0 deletions rllib/agents/ddpg/ddpg.py
@@ -91,6 +91,14 @@
# Size of the replay buffer. Note that if async_updates is set, then
# each worker will have a replay buffer of this size.
"buffer_size": 50000,
# Set this to True, if you want the contents of your buffer(s) to be
# stored in any saved checkpoints as well.
# Warnings will be created if:
# - This is True AND restoring from a checkpoint that contains no buffer
# data.
# - This is False AND restoring from a checkpoint that does contain
# buffer data.
"store_buffer_in_checkpoints": False,
# If True prioritized replay buffer will be used.
"prioritized_replay": True,
# Alpha parameter for prioritized replay buffer.
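The store_buffer_in_checkpoints flag added above is the user-facing switch of this PR: when set, the replay buffer contents are written into every checkpoint alongside the rest of the trainer state. A minimal usage sketch (the flag name comes from the hunk above; the environment and the single train() call are only illustrative):

import ray
from ray.rllib.agents.ddpg import DDPGTrainer

ray.init()
trainer = DDPGTrainer(config={
    "env": "Pendulum-v0",                  # any continuous-action env
    # Persist the replay buffer contents in every saved checkpoint.
    "store_buffer_in_checkpoints": True,
})
trainer.train()                            # collect some samples into the buffer
checkpoint_path = trainer.save()           # checkpoint now also holds buffer data
print("Checkpoint (incl. buffer) written to", checkpoint_path)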
208 changes: 84 additions & 124 deletions rllib/agents/dqn/dqn.py
@@ -14,8 +14,9 @@

from ray.rllib.agents.dqn.dqn_tf_policy import DQNTFPolicy
from ray.rllib.agents.dqn.dqn_torch_policy import DQNTorchPolicy
from ray.rllib.agents.trainer import with_common_config
from ray.rllib.agents.trainer_template import build_trainer
from ray.rllib.agents.dqn.simple_q import SimpleQTrainer, \
DEFAULT_CONFIG as SIMPLEQ_DEFAULT_CONFIG
from ray.rllib.agents.trainer import Trainer
from ray.rllib.evaluation.worker_set import WorkerSet
from ray.rllib.execution.concurrency_ops import Concurrently
from ray.rllib.execution.metric_ops import StandardMetricsReporting
@@ -32,123 +33,71 @@

# yapf: disable
# __sphinx_doc_begin__
DEFAULT_CONFIG = with_common_config({
# === Model ===
# Number of atoms for representing the distribution of return. When
# this is greater than 1, distributional Q-learning is used.
# the discrete supports are bounded by v_min and v_max
"num_atoms": 1,
"v_min": -10.0,
"v_max": 10.0,
# Whether to use noisy network
"noisy": False,
# control the initial value of noisy nets
"sigma0": 0.5,
# Whether to use dueling dqn
"dueling": True,
# Dense-layer setup for each of the advantage branch and the value branch
# in a dueling architecture.
"hiddens": [256],
# Whether to use double dqn
"double_q": True,
# N-step Q learning
"n_step": 1,

# === Exploration Settings ===
"exploration_config": {
# The Exploration class to use.
"type": "EpsilonGreedy",
# Config for the Exploration class' constructor:
"initial_epsilon": 1.0,
"final_epsilon": 0.02,
"epsilon_timesteps": 10000, # Timesteps over which to anneal epsilon.

# For soft_q, use:
# "exploration_config" = {
# "type": "SoftQ"
# "temperature": [float, e.g. 1.0]
# }
DEFAULT_CONFIG = Trainer.merge_trainer_configs(
SIMPLEQ_DEFAULT_CONFIG,
{
# === Model ===
# Number of atoms for representing the distribution of return. When
# this is greater than 1, distributional Q-learning is used.
# the discrete supports are bounded by v_min and v_max
"num_atoms": 1,
"v_min": -10.0,
"v_max": 10.0,
# Whether to use noisy network
"noisy": False,
# control the initial value of noisy nets
"sigma0": 0.5,
# Whether to use dueling dqn
"dueling": True,
# Dense-layer setup for each of the advantage branch and the value branch
# in a dueling architecture.
"hiddens": [256],
# Whether to use double dqn
"double_q": True,
# N-step Q learning
"n_step": 1,

# === Prioritized replay buffer ===
# If True prioritized replay buffer will be used.
"prioritized_replay": True,
# Alpha parameter for prioritized replay buffer.
"prioritized_replay_alpha": 0.6,
# Beta parameter for sampling from prioritized replay buffer.
"prioritized_replay_beta": 0.4,
# Final value of beta (by default, we use constant beta=0.4).
"final_prioritized_replay_beta": 0.4,
# Time steps over which the beta parameter is annealed.
"prioritized_replay_beta_annealing_timesteps": 20000,
# Epsilon to add to the TD errors when updating priorities.
"prioritized_replay_eps": 1e-6,

# Callback to run before learning on a multi-agent batch of
# experiences.
"before_learn_on_batch": None,

# The intensity with which to update the model (vs collecting samples
# from the env). If None, uses the "natural" value of:
# `train_batch_size` / (`rollout_fragment_length` x `num_workers` x
# `num_envs_per_worker`).
# If provided, will make sure that the ratio between ts inserted into
# and sampled from the buffer matches the given value.
# Example:
# training_intensity=1000.0
# train_batch_size=250 rollout_fragment_length=1
# num_workers=1 (or 0) num_envs_per_worker=1
# -> natural value = 250 / 1 = 250.0
# -> will make sure that replay+train op will be executed 4x as
# often as rollout+insert op (4 * 250 = 1000).
# See: rllib/agents/dqn/dqn.py::calculate_rr_weights for further
# details.
"training_intensity": None,

# === Parallelism ===
# Whether to compute priorities on workers.
"worker_side_prioritization": False,
},
# Switch to greedy actions in evaluation workers.
"evaluation_config": {
"explore": False,
},

# Minimum env steps to optimize for per train call. This value does
# not affect learning, only the length of iterations.
"timesteps_per_iteration": 1000,
# Update the target network every `target_network_update_freq` steps.
"target_network_update_freq": 500,
# === Replay buffer ===
# Size of the replay buffer. Note that if async_updates is set, then
# each worker will have a replay buffer of this size.
"buffer_size": 50000,
# The number of contiguous environment steps to replay at once. This may
# be set to greater than 1 to support recurrent models.
"replay_sequence_length": 1,
# If True prioritized replay buffer will be used.
"prioritized_replay": True,
# Alpha parameter for prioritized replay buffer.
"prioritized_replay_alpha": 0.6,
# Beta parameter for sampling from prioritized replay buffer.
"prioritized_replay_beta": 0.4,
# Final value of beta (by default, we use constant beta=0.4).
"final_prioritized_replay_beta": 0.4,
# Time steps over which the beta parameter is annealed.
"prioritized_replay_beta_annealing_timesteps": 20000,
# Epsilon to add to the TD errors when updating priorities.
"prioritized_replay_eps": 1e-6,

# Whether to LZ4 compress observations
"compress_observations": False,
# Callback to run before learning on a multi-agent batch of experiences.
"before_learn_on_batch": None,

# The intensity with which to update the model (vs collecting samples from
# the env). If None, uses the "natural" value of:
# `train_batch_size` / (`rollout_fragment_length` x `num_workers` x
# `num_envs_per_worker`).
# If provided, will make sure that the ratio between ts inserted into and
# sampled from the buffer matches the given value.
# Example:
# training_intensity=1000.0
# train_batch_size=250 rollout_fragment_length=1
# num_workers=1 (or 0) num_envs_per_worker=1
# -> natural value = 250 / 1 = 250.0
# -> will make sure that replay+train op will be executed 4x as
# often as rollout+insert op (4 * 250 = 1000).
# See: rllib/agents/dqn/dqn.py::calculate_rr_weights for further details.
"training_intensity": None,

# === Optimization ===
# Learning rate for adam optimizer
"lr": 5e-4,
# Learning rate schedule
"lr_schedule": None,
# Adam epsilon hyper parameter
"adam_epsilon": 1e-8,
# If not None, clip gradients during optimization at this value
"grad_clip": 40,
# How many steps of the model to sample before learning starts.
"learning_starts": 1000,
# Update the replay buffer with this many samples at once. Note that
# this setting applies per-worker if num_workers > 1.
"rollout_fragment_length": 4,
# Size of a batch sampled from replay buffer for training. Note that
# if async_updates is set, then each worker returns gradients for a
# batch of this size.
"train_batch_size": 32,

# === Parallelism ===
# Number of workers for collecting samples with. This only makes sense
# to increase if your environment is particularly slow to sample, or if
# you're using the Async or Ape-X optimizers.
"num_workers": 0,
# Whether to compute priorities on workers.
"worker_side_prioritization": False,
# Prevent iterations from going lower than this time span
"min_iter_time_s": 1,
})
_allow_unknown_configs=True,
)
# __sphinx_doc_end__
# yapf: enable
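The training_intensity comment in the config above defines a "natural" ratio between timesteps trained on and timesteps collected, and explains how a user-provided value scales the replay/train frequency. The same arithmetic as a tiny sketch (the helper name is made up; the numbers mirror the example in the comment):

def natural_training_intensity(train_batch_size, rollout_fragment_length,
                               num_workers, num_envs_per_worker):
    # Ratio of trained-on timesteps to collected timesteps if no value is forced.
    return train_batch_size / (
        rollout_fragment_length * max(num_workers, 1) * num_envs_per_worker)

natural = natural_training_intensity(
    train_batch_size=250, rollout_fragment_length=1,
    num_workers=1, num_envs_per_worker=1)   # -> 250.0
print(1000.0 / natural)                     # training_intensity=1000.0 -> 4.0, i.e.
                                            # replay+train runs 4x per rollout+insert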

@@ -195,11 +144,12 @@ def validate_config(config: TrainerConfigDict) -> None:
"simple_optimizer=True if this doesn't work for you.")


def execution_plan(workers: WorkerSet,
config: TrainerConfigDict) -> LocalIterator[dict]:
def execution_plan(trainer: Trainer, workers: WorkerSet,
config: TrainerConfigDict, **kwargs) -> LocalIterator[dict]:
"""Execution plan of the DQN algorithm. Defines the distributed dataflow.

Args:
trainer (Trainer): The Trainer object creating the execution plan.
workers (WorkerSet): The WorkerSet for training the Polic(y/ies)
of the Trainer.
config (TrainerConfigDict): The trainer's configuration dict.
@@ -226,6 +176,9 @@ def execution_plan(workers: WorkerSet,
replay_burn_in=config.get("burn_in", 0),
replay_zero_init_states=config.get("zero_init_states", True),
**prio_args)
# Assign to Trainer, so we can store the LocalReplayBuffer's
# data when we save checkpoints.
trainer.local_replay_buffer = local_replay_buffer

rollouts = ParallelRollouts(workers, mode="bulk_sync")

@@ -323,15 +276,22 @@ def get_policy_class(config: TrainerConfigDict) -> Optional[Type[Policy]]:

# Build a generic off-policy trainer. Other trainers (such as DDPGTrainer)
# may build on top of it.
GenericOffPolicyTrainer = build_trainer(
name="GenericOffPolicyAlgorithm",
GenericOffPolicyTrainer = SimpleQTrainer.with_updates(
name="GenericOffPolicyTrainer",
# No Policy preference.
default_policy=None,
get_policy_class=get_policy_class,
get_policy_class=None,
# Use DQN's config and exec. plan as base for
# all other OffPolicy algos.
default_config=DEFAULT_CONFIG,
validate_config=validate_config,
execution_plan=execution_plan)

# Build a DQN trainer, which uses the framework specific Policy
# determined in `get_policy_class()` above.
DQNTrainer = GenericOffPolicyTrainer.with_updates(
name="DQN", default_policy=DQNTFPolicy, default_config=DEFAULT_CONFIG)
name="DQN",
default_policy=DQNTFPolicy,
get_policy_class=get_policy_class,
default_config=DEFAULT_CONFIG,
)
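With the refactor above, DQN-style trainers are no longer assembled from scratch via build_trainer: GenericOffPolicyTrainer is now SimpleQTrainer.with_updates(...), DQNTrainer builds on that, and further variants can be derived the same way (R2D2 below does exactly this). A hedged sketch of the pattern, using a hypothetical derived trainer that only overrides a couple of config defaults:

from ray.rllib.agents.dqn import DQNTrainer
from ray.rllib.agents.dqn.dqn import DEFAULT_CONFIG as DQN_CONFIG
from ray.rllib.agents.trainer import Trainer

# Hypothetical derived off-policy trainer: inherits DQN's policies,
# validate_config, and execution_plan, changing only the default config.
MY_CONFIG = Trainer.merge_trainer_configs(
    DQN_CONFIG,
    {
        "n_step": 3,
        "store_buffer_in_checkpoints": True,
    },
    _allow_unknown_configs=True,
)

MyOffPolicyTrainer = DQNTrainer.with_updates(
    name="MyOffPolicyTrainer",
    default_config=MY_CONFIG,
)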
4 changes: 1 addition & 3 deletions rllib/agents/dqn/r2d2.py
@@ -16,7 +16,6 @@
from typing import List, Optional, Type

from ray.rllib.agents import dqn
from ray.rllib.agents.dqn.dqn import execution_plan
from ray.rllib.agents.dqn.r2d2_tf_policy import R2D2TFPolicy
from ray.rllib.agents.dqn.r2d2_torch_policy import R2D2TorchPolicy
from ray.rllib.policy.policy import Policy
@@ -145,8 +144,7 @@ def get_policy_class(config: TrainerConfigDict) -> Optional[Type[Policy]]:
R2D2Trainer = dqn.DQNTrainer.with_updates(
name="R2D2",
default_policy=R2D2TFPolicy,
get_policy_class=get_policy_class,
default_config=DEFAULT_CONFIG,
validate_config=validate_config,
get_policy_class=get_policy_class,
execution_plan=execution_plan,
)
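Since r2d2.py no longer imports DQN's execution_plan and appears to inherit it via with_updates instead, R2D2 also picks up the trainer.local_replay_buffer assignment and the new checkpoint flag. A minimal, hedged construction sketch (environment and model settings are illustrative; R2D2's own defaults cover the recurrent replay settings):

from ray.rllib.agents.dqn import R2D2Trainer

trainer = R2D2Trainer(config={
    "env": "CartPole-v0",                  # illustrative discrete-action env
    "model": {"use_lstm": True},           # R2D2 expects a recurrent model
    "store_buffer_in_checkpoints": True,   # buffer contents go into checkpoints
})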
34 changes: 24 additions & 10 deletions rllib/agents/dqn/simple_q.py
@@ -12,10 +12,10 @@
import logging
from typing import Optional, Type

from ray.rllib.agents.dqn.dqn import DQNTrainer
from ray.rllib.agents.dqn.simple_q_tf_policy import SimpleQTFPolicy
from ray.rllib.agents.dqn.simple_q_torch_policy import SimpleQTorchPolicy
from ray.rllib.agents.trainer import with_common_config
from ray.rllib.agents.trainer import Trainer, with_common_config
from ray.rllib.agents.trainer_template import build_trainer
from ray.rllib.evaluation.worker_set import WorkerSet
from ray.rllib.execution.concurrency_ops import Concurrently
from ray.rllib.execution.metric_ops import StandardMetricsReporting
@@ -33,7 +33,7 @@
# yapf: disable
# __sphinx_doc_begin__
DEFAULT_CONFIG = with_common_config({
# === Exploration Settings (Experimental) ===
# === Exploration Settings ===
"exploration_config": {
# The Exploration class to use.
"type": "EpsilonGreedy",
@@ -63,11 +63,17 @@
# Size of the replay buffer. Note that if async_updates is set, then
# each worker will have a replay buffer of this size.
"buffer_size": 50000,
# Set this to True, if you want the contents of your buffer(s) to be
# stored in any saved checkpoints as well.
# Warnings will be created if:
# - This is True AND restoring from a checkpoint that contains no buffer
# data.
# - This is False AND restoring from a checkpoint that does contain
# buffer data.
"store_buffer_in_checkpoints": False,
# The number of contiguous environment steps to replay at once. This may
# be set to greater than 1 to support recurrent models.
"replay_sequence_length": 1,
# Whether to LZ4 compress observations
"compress_observations": False,
Member commented:
just to double check, are you sure you want to get rid of the default value from the base default config?

Contributor (Author) replied:
Yes, the default value of this key in trainer.py is already False, so no need to duplicate it here. We don't do this duplication for other keys either.


# === Optimization ===
# Learning rate for adam optimizer
@@ -93,7 +99,7 @@
# to increase if your environment is particularly slow to sample, or if
# you're using the Async or Ape-X optimizers.
"num_workers": 0,
# Prevent iterations from going lower than this time span
# Prevent iterations from going lower than this time span.
"min_iter_time_s": 1,
})
# __sphinx_doc_end__
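The store_buffer_in_checkpoints comment block above (mirroring the one added to ddpg.py) promises warnings when the flag and the restored checkpoint disagree about buffer data. The restore-side check could look roughly like the following; this is a sketch of the described behavior, not the actual code added elsewhere in this PR, and the state key name is assumed:

import logging

logger = logging.getLogger(__name__)

def check_buffer_restore(config, checkpoint_state):
    """Warn when `store_buffer_in_checkpoints` and the checkpoint disagree."""
    has_buffer_data = "local_replay_buffer" in checkpoint_state  # assumed key
    if config["store_buffer_in_checkpoints"] and not has_buffer_data:
        logger.warning(
            "`store_buffer_in_checkpoints` is True, but the checkpoint being "
            "restored contains no replay buffer data!")
    elif not config["store_buffer_in_checkpoints"] and has_buffer_data:
        logger.warning(
            "The checkpoint contains replay buffer data, but "
            "`store_buffer_in_checkpoints` is False, so it will not be restored.")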
@@ -114,11 +120,12 @@ def get_policy_class(config: TrainerConfigDict) -> Optional[Type[Policy]]:
return SimpleQTorchPolicy


def execution_plan(workers: WorkerSet,
config: TrainerConfigDict) -> LocalIterator[dict]:
def execution_plan(trainer: Trainer, workers: WorkerSet,
config: TrainerConfigDict, **kwargs) -> LocalIterator[dict]:
"""Execution plan of the Simple Q algorithm. Defines the distributed dataflow.

Args:
trainer (Trainer): The Trainer object creating the execution plan.
workers (WorkerSet): The WorkerSet for training the Polic(y/ies)
of the Trainer.
config (TrainerConfigDict): The trainer's configuration dict.
@@ -133,6 +140,9 @@ def execution_plan(workers: WorkerSet,
replay_batch_size=config["train_batch_size"],
replay_mode=config["multiagent"]["replay_mode"],
replay_sequence_length=config["replay_sequence_length"])
# Assign to Trainer, so we can store the LocalReplayBuffer's
# data when we save checkpoints.
trainer.local_replay_buffer = local_replay_buffer

rollouts = ParallelRollouts(workers, mode="bulk_sync")

@@ -165,8 +175,12 @@ def execution_plan(workers: WorkerSet,
return StandardMetricsReporting(train_op, workers, config)


SimpleQTrainer = DQNTrainer.with_updates(
# Build a child class of `Trainer`, which uses the framework specific Policy
# determined in `get_policy_class()` above.
SimpleQTrainer = build_trainer(
name="SimpleQTrainer",
default_policy=SimpleQTFPolicy,
get_policy_class=get_policy_class,
execution_plan=execution_plan,
default_config=DEFAULT_CONFIG)
default_config=DEFAULT_CONFIG,
)
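Taken together: the execution plans now attach the LocalReplayBuffer to the trainer, and with store_buffer_in_checkpoints=True its contents travel inside checkpoints. A hedged end-to-end sketch of the intended round trip (environment and the single train() call are illustrative):

import ray
from ray.rllib.agents.dqn import DQNTrainer

ray.init()
config = {
    "env": "CartPole-v0",                  # illustrative env
    "store_buffer_in_checkpoints": True,   # persist buffer contents
}

trainer = DQNTrainer(config=config)
trainer.train()                            # fills trainer.local_replay_buffer
checkpoint = trainer.save()
trainer.stop()

# A fresh trainer restored from that checkpoint should start out with the
# previously collected replay buffer contents instead of an empty buffer.
restored = DQNTrainer(config=config)
restored.restore(checkpoint)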