[RLlib] Preparatory PR for multi-agent multi-GPU learner (alpha-star style) #03 #21652

Merged
2 changes: 1 addition & 1 deletion rllib/agents/a3c/a2c.py
@@ -17,7 +17,7 @@
A3C_CONFIG,
{
"rollout_fragment_length": 20,
"min_iter_time_s": 10,
"min_time_s_per_reporting": 10,
"sample_async": False,

# A2C supports microbatching, in which we accumulate gradients over
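The change above is the one this PR repeats across all built-in trainers: the reporting-throttle key min_iter_time_s is renamed to min_time_s_per_reporting, with unchanged semantics (results are not reported more often than every N seconds). A minimal, hedged usage sketch of the renamed key, mirroring the A2C tests further down; the concrete values are illustrative only:

import ray
from ray.rllib.agents import a3c

ray.init()
config = {
    # Was "min_iter_time_s" before this PR; 0 disables the reporting throttle.
    "min_time_s_per_reporting": 0,
    "num_workers": 1,
}
trainer = a3c.A2CTrainer(env="CartPole-v0", config=config)
print(trainer.train()["episode_reward_mean"])
trainer.stop()
ray.shutdown()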
4 changes: 2 additions & 2 deletions rllib/agents/a3c/a3c.py
@@ -39,8 +39,8 @@
"entropy_coeff": 0.01,
# Entropy coefficient schedule
"entropy_coeff_schedule": None,
# Min time per iteration
"min_iter_time_s": 5,
# Min time per reporting
"min_time_s_per_reporting": 5,
# Workers sample async. Note that this increases the effective
# rollout_fragment_length by up to 5x due to async buffering of batches.
"sample_async": True,
4 changes: 2 additions & 2 deletions rllib/agents/a3c/tests/test_a2c.py
@@ -35,7 +35,7 @@ def test_a2c_compilation(self):
trainer.stop()

def test_a2c_exec_impl(ray_start_regular):
config = {"min_iter_time_s": 0}
config = {"min_time_s_per_reporting": 0}
for _ in framework_iterator(config):
trainer = a3c.A2CTrainer(env="CartPole-v0", config=config)
results = trainer.train()
@@ -46,7 +46,7 @@ def test_a2c_exec_impl(ray_start_regular):

def test_a2c_exec_impl_microbatch(ray_start_regular):
config = {
"min_iter_time_s": 0,
"min_time_s_per_reporting": 0,
"microbatch_size": 10,
}
for _ in framework_iterator(config):
2 changes: 1 addition & 1 deletion rllib/agents/a3c/tests/test_a3c.py
@@ -51,7 +51,7 @@ def test_a3c_entropy_coeff_schedule(self):
config["timesteps_per_iteration"] = 20
# 0 metrics reporting delay; this makes sure the timestep,
# which the entropy coeff depends on, is updated after each worker rollout.
config["min_iter_time_s"] = 0
config["min_time_s_per_reporting"] = 0
# Initial entropy coeff; doesn't really matter because of the schedule below.
config["entropy_coeff"] = 0.01
schedule = [
42 changes: 28 additions & 14 deletions rllib/agents/ars/ars.py
@@ -228,30 +228,44 @@ def validate_config(self, config: TrainerConfigDict) -> None:
"`NoFilter` for ARS!")

@override(Trainer)
def _init(self, config, env_creator):
self.validate_config(config)
env_context = EnvContext(config["env_config"] or {}, worker_index=0)
env = env_creator(env_context)
def setup(self, config):
# Set up our config: Merge the user-supplied config (which could
# be a partial config dict) with the class' defaults.
self.config = self.merge_trainer_configs(
self.get_default_config(), config, self._allow_unknown_configs)

self._policy_class = get_policy_class(config)
# Validate our config dict.
self.validate_config(self.config)

# Generate `self.env_creator` callable to create an env instance.
self.env_creator = self._get_env_creator_from_env_id(self._env_id)
# Generate the local env.
env_context = EnvContext(
self.config["env_config"] or {}, worker_index=0)
env = self.env_creator(env_context)

self.callbacks = self.config["callbacks"]()

self._policy_class = get_policy_class(self.config)
self.policy = self._policy_class(env.observation_space,
env.action_space, config)
self.optimizer = optimizers.SGD(self.policy, config["sgd_stepsize"])
env.action_space, self.config)
self.optimizer = optimizers.SGD(self.policy,
self.config["sgd_stepsize"])

self.rollouts_used = config["rollouts_used"]
self.num_rollouts = config["num_rollouts"]
self.report_length = config["report_length"]
self.rollouts_used = self.config["rollouts_used"]
self.num_rollouts = self.config["num_rollouts"]
self.report_length = self.config["report_length"]

# Create the shared noise table.
logger.info("Creating shared noise table.")
noise_id = create_shared_noise.remote(config["noise_size"])
noise_id = create_shared_noise.remote(self.config["noise_size"])
self.noise = SharedNoiseTable(ray.get(noise_id))

# Create the actors.
logger.info("Creating actors.")
self.workers = [
Worker.remote(config, env_creator, noise_id, idx + 1)
for idx in range(config["num_workers"])
Worker.remote(self.config, self.env_creator, noise_id, idx + 1)
for idx in range(self.config["num_workers"])
]

self.episodes_so_far = 0
@@ -375,7 +389,7 @@ def compute_single_action(self, observation, *args, **kwargs):
return action[0], [], {}
return action[0]

@Deprecated(new="compute_single_action", error=False)
@Deprecated(new="compute_single_action", error=True)
def compute_action(self, observation, *args, **kwargs):
return self.compute_single_action(observation, *args, **kwargs)

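Note the flip from error=False to error=True on the @Deprecated decorator above: calling the old compute_action() on the ARS policy now raises an error instead of only logging a deprecation warning. A hedged caller-side sketch of the migration (the CartPole setup and the small noise_size are borrowed from test_ars.py below; treat the exact values as illustrative):

import gym
import ray
from ray.rllib.agents import ars

ray.init()
config = ars.DEFAULT_CONFIG.copy()
config["num_workers"] = 2
config["noise_size"] = 2500000  # small noise table, as in test_ars.py below
trainer = ars.ARSTrainer(config=config, env="CartPole-v0")

obs = gym.make("CartPole-v0").reset()
policy = trainer.policy  # the local ARS policy created in setup() above
action = policy.compute_single_action(obs)  # new API
# policy.compute_action(obs)  # old API: raises now that error=True
trainer.stop()
ray.shutdown()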
3 changes: 2 additions & 1 deletion rllib/agents/ars/tests/test_ars.py
@@ -22,7 +22,8 @@ def test_ars_compilation(self):
config["model"]["fcnet_hiddens"] = [10]
config["model"]["fcnet_activation"] = None
config["noise_size"] = 2500000
# Test eval workers ("normal" Trainer eval WorkerSet, unusual for ARS).
# Test eval workers ("normal" WorkerSet, unlike ARS' list of
# RolloutWorkers used for collecting train batches).
config["evaluation_interval"] = 1
config["evaluation_num_workers"] = 1

2 changes: 1 addition & 1 deletion rllib/agents/ddpg/apex.py
@@ -38,7 +38,7 @@
"target_network_update_freq": 500000,
"timesteps_per_iteration": 25000,
"worker_side_prioritization": True,
"min_iter_time_s": 30,
"min_time_s_per_reporting": 30,
},
_allow_unknown_configs=True,
)
4 changes: 2 additions & 2 deletions rllib/agents/ddpg/ddpg.py
@@ -171,8 +171,8 @@
"num_workers": 0,
# Whether to compute priorities on workers.
"worker_side_prioritization": False,
# Prevent iterations from going lower than this time span
"min_iter_time_s": 1,
# Prevent reporting frequency from going lower than this time span.
"min_time_s_per_reporting": 1,
})
# __sphinx_doc_end__
# yapf: enable
2 changes: 1 addition & 1 deletion rllib/agents/ddpg/tests/test_apex_ddpg.py
@@ -20,7 +20,7 @@ def test_apex_ddpg_compilation_and_per_worker_epsilon_values(self):
config["num_workers"] = 2
config["prioritized_replay"] = True
config["timesteps_per_iteration"] = 100
config["min_iter_time_s"] = 1
config["min_time_s_per_reporting"] = 1
config["learning_starts"] = 0
config["optimizer"]["num_replay_buffer_shards"] = 1
num_iterations = 1
2 changes: 1 addition & 1 deletion rllib/agents/ddpg/tests/test_ddpg.py
@@ -154,7 +154,7 @@ def test_ddpg_loss_function(self):
config["actor_hiddens"] = [10]
config["critic_hiddens"] = [10]
# Make sure timing differences do not affect trainer.train().
config["min_iter_time_s"] = 0
config["min_time_s_per_reporting"] = 0
config["timesteps_per_iteration"] = 100

map_ = {
2 changes: 1 addition & 1 deletion rllib/agents/dqn/apex.py
@@ -78,7 +78,7 @@
"timesteps_per_iteration": 25000,
"exploration_config": {"type": "PerWorkerEpsilonGreedy"},
"worker_side_prioritization": True,
"min_iter_time_s": 30,
"min_time_s_per_reporting": 30,
# If set, this will fix the ratio of timesteps replayed from the buffer
# and learned on vs. timesteps sampled from the environment and stored
# in the replay buffer. Otherwise, replay will proceed as fast as possible.
4 changes: 2 additions & 2 deletions rllib/agents/dqn/simple_q.py
@@ -103,8 +103,8 @@
# to increase if your environment is particularly slow to sample, or if
# you're using the Async or Ape-X optimizers.
"num_workers": 0,
# Prevent iterations from going lower than this time span.
"min_iter_time_s": 1,
# Prevent reporting frequency from going lower than this time span.
"min_time_s_per_reporting": 1,
})
# __sphinx_doc_end__
# yapf: enable
6 changes: 3 additions & 3 deletions rllib/agents/dqn/tests/test_apex_dqn.py
@@ -24,7 +24,7 @@ def test_apex_zero_workers(self):
config["learning_starts"] = 1000
config["prioritized_replay"] = True
config["timesteps_per_iteration"] = 100
config["min_iter_time_s"] = 1
config["min_time_s_per_reporting"] = 1
config["optimizer"]["num_replay_buffer_shards"] = 1
for _ in framework_iterator(config):
trainer = apex.ApexTrainer(config=config, env="CartPole-v0")
@@ -41,7 +41,7 @@ def test_apex_dqn_compilation_and_per_worker_epsilon_values(self):
config["learning_starts"] = 1000
config["prioritized_replay"] = True
config["timesteps_per_iteration"] = 100
config["min_iter_time_s"] = 1
config["min_time_s_per_reporting"] = 1
config["optimizer"]["num_replay_buffer_shards"] = 1

for _ in framework_iterator(config, with_eager_tracing=True):
@@ -81,7 +81,7 @@ def test_apex_lr_schedule(self):
config["timesteps_per_iteration"] = 10
# 0 metrics reporting delay; this makes sure the timestep,
# which the lr depends on, is updated after each worker rollout.
config["min_iter_time_s"] = 0
config["min_time_s_per_reporting"] = 0
config["optimizer"]["num_replay_buffer_shards"] = 1
# This makes sure learning schedule is checked every 10 timesteps.
config["optimizer"]["max_weight_sync_delay"] = 10
46 changes: 30 additions & 16 deletions rllib/agents/es/es.py
@@ -228,28 +228,42 @@ def validate_config(self, config: TrainerConfigDict) -> None:
"`NoFilter` for ES!")

@override(Trainer)
def _init(self, config, env_creator):
self.validate_config(config)
env_context = EnvContext(config["env_config"] or {}, worker_index=0)
env = env_creator(env_context)
self._policy_class = get_policy_class(config)
def setup(self, config):
# Set up our config: Merge the user-supplied config (which could
# be a partial config dict) with the class' defaults.
self.config = self.merge_trainer_configs(
self.get_default_config(), config, self._allow_unknown_configs)

# Call super's validation method.
self.validate_config(self.config)

# Generate `self.env_creator` callable to create an env instance.
self.env_creator = self._get_env_creator_from_env_id(self._env_id)
# Generate the local env.
env_context = EnvContext(
self.config["env_config"] or {}, worker_index=0)
env = self.env_creator(env_context)

self.callbacks = self.config["callbacks"]()

self._policy_class = get_policy_class(self.config)
self.policy = self._policy_class(
obs_space=env.observation_space,
action_space=env.action_space,
config=config)
self.optimizer = optimizers.Adam(self.policy, config["stepsize"])
self.report_length = config["report_length"]
config=self.config)
self.optimizer = optimizers.Adam(self.policy, self.config["stepsize"])
self.report_length = self.config["report_length"]

# Create the shared noise table.
logger.info("Creating shared noise table.")
noise_id = create_shared_noise.remote(config["noise_size"])
noise_id = create_shared_noise.remote(self.config["noise_size"])
self.noise = SharedNoiseTable(ray.get(noise_id))

# Create the actors.
logger.info("Creating actors.")
self._workers = [
Worker.remote(config, {}, env_creator, noise_id, idx + 1)
for idx in range(config["num_workers"])
self.workers = [
Worker.remote(self.config, {}, self.env_creator, noise_id, idx + 1)
for idx in range(self.config["num_workers"])
]

self.episodes_so_far = 0
@@ -333,7 +347,7 @@ def step_attempt(self):
# Now sync the filters
FilterManager.synchronize({
DEFAULT_POLICY_ID: self.policy.observation_filter
}, self._workers)
}, self.workers)

info = {
"weights_norm": np.square(theta).sum(),
@@ -375,7 +389,7 @@ def _sync_weights_to_workers(self, *, worker_set=None, workers=None):
@override(Trainer)
def cleanup(self):
# workaround for https://github.com/ray-project/ray/issues/1516
for w in self._workers:
for w in self.workers:
w.__ray_terminate__.remote()

def _collect_results(self, theta_id, min_episodes, min_timesteps):
@@ -386,7 +400,7 @@ def _collect_results(self, theta_id, min_episodes, min_timesteps):
"Collected {} episodes {} timesteps so far this iter".format(
num_episodes, num_timesteps))
rollout_ids = [
worker.do_rollouts.remote(theta_id) for worker in self._workers
worker.do_rollouts.remote(theta_id) for worker in self.workers
]
# Get the results of the rollouts.
for result in ray.get(rollout_ids):
Expand All @@ -413,4 +427,4 @@ def __setstate__(self, state):
self.policy.observation_filter = state["filter"]
FilterManager.synchronize({
DEFAULT_POLICY_ID: self.policy.observation_filter
}, self._workers)
}, self.workers)
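Both ARS (above) and ES now build self.config inside setup() by merging the possibly partial user-supplied dict into the class defaults before validating it. A self-contained sketch of that merge idea; this is deliberately not RLlib's merge_trainer_configs (which also handles nested dicts and unknown-key checks), only an illustration of the semantics, with made-up values:

def merge_with_defaults(defaults, partial_config):
    # Shallow merge: keys in partial_config override the defaults;
    # anything the user omits keeps its default value.
    merged = dict(defaults)
    merged.update(partial_config or {})
    return merged

defaults = {"num_workers": 2, "noise_size": 250000000, "report_length": 10}
user_config = {"num_workers": 4}
config = merge_with_defaults(defaults, user_config)
assert config == {"num_workers": 4, "noise_size": 250000000, "report_length": 10}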
2 changes: 1 addition & 1 deletion rllib/agents/impala/impala.py
@@ -50,7 +50,7 @@
#
"rollout_fragment_length": 50,
"train_batch_size": 500,
"min_iter_time_s": 10,
"min_time_s_per_reporting": 10,
"num_workers": 2,
# Number of GPUs the learner should use.
"num_gpus": 1,
21 changes: 15 additions & 6 deletions rllib/agents/impala/tests/test_impala.py
@@ -56,10 +56,10 @@ def test_impala_lr_schedule(self):
config = impala.DEFAULT_CONFIG.copy()
config["num_gpus"] = 0
# Test whether we correctly ignore the "lr" setting.
# The first lr should be 0.0005.
# The first lr should be 0.05.
config["lr"] = 0.1
config["lr_schedule"] = [
[0, 0.0005],
[0, 0.05],
[10000, 0.000001],
]
config["num_gpus"] = 0 # Do not use any (fake) GPUs.
@@ -69,18 +69,27 @@ def get_lr(result):
return result["info"][LEARNER_INFO][DEFAULT_POLICY_ID][
LEARNER_STATS_KEY]["cur_lr"]

for fw in framework_iterator(config, frameworks=("tf", "torch")):
for fw in framework_iterator(config):
trainer = impala.ImpalaTrainer(config=config)
policy = trainer.get_policy()

try:
if fw == "tf":
check(policy.get_session().run(policy.cur_lr), 0.0005)
check(policy.get_session().run(policy.cur_lr), 0.05)
else:
check(policy.cur_lr, 0.0005)
check(policy.cur_lr, 0.05)
r1 = trainer.train()
r2 = trainer.train()
assert get_lr(r2) < get_lr(r1), (r1, r2)
r3 = trainer.train()
# Due to the asynchronous nature of IMPALA, learner-stats metrics
# may be delayed by one iteration. Do 3 train() calls here and check
# for a guaranteed decrease in lr between the 1st and 3rd.
lr1 = get_lr(r1)
lr2 = get_lr(r2)
lr3 = get_lr(r3)
assert lr2 <= lr1, (lr1, lr2)
assert lr3 <= lr2, (lr2, lr3)
assert lr3 < lr1, (lr1, lr3)
finally:
trainer.stop()

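The schedule [[0, 0.05], [10000, 0.000001]] used above is piecewise linear over sampled timesteps, which is why the test only asserts a monotone decrease across three train() calls rather than exact values. A small sketch of how such a schedule interpolates; this mirrors, but is not, RLlib's internal PiecewiseSchedule:

def piecewise_linear(schedule, t):
    # schedule: [timestep, value] pairs sorted by timestep.
    # Linearly interpolate between neighboring pairs; clamp outside the range.
    t0, v0 = schedule[0]
    if t <= t0:
        return v0
    for t1, v1 in schedule[1:]:
        if t <= t1:
            frac = (t - t0) / float(t1 - t0)
            return v0 + frac * (v1 - v0)
        t0, v0 = t1, v1
    return v0

schedule = [[0, 0.05], [10000, 0.000001]]
print(piecewise_linear(schedule, 0))      # 0.05 -> the "first lr" checked above
print(piecewise_linear(schedule, 5000))   # ~0.025 -> halfway down
print(piecewise_linear(schedule, 20000))  # 0.000001 -> clamped at the last value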
18 changes: 17 additions & 1 deletion rllib/agents/mock.py
@@ -27,10 +27,22 @@ def get_default_config(cls) -> TrainerConfigDict:
def default_resource_request(cls, config):
return None

def _init(self, config, env_creator):
@override(Trainer)
def setup(self, config):
# Set up our config: Merge the user-supplied config (which could
# be a partial config dict) with the class' defaults.
self.config = self.merge_trainer_configs(
self.get_default_config(), config, self._allow_unknown_configs)
self.config["env"] = self._env_id

self.validate_config(self.config)
self.callbacks = self.config["callbacks"]()

# Add needed properties.
self.info = None
self.restored = False

@override(Trainer)
def step(self):
if self.config["mock_error"] and self.iteration == 1 \
and (self.config["persistent_error"] or not self.restored):
@@ -45,19 +57,23 @@ def step(self):
result.update({tune_result.SHOULD_CHECKPOINT: True})
return result

@override(Trainer)
def save_checkpoint(self, checkpoint_dir):
path = os.path.join(checkpoint_dir, "mock_agent.pkl")
with open(path, "wb") as f:
pickle.dump(self.info, f)
return path

@override(Trainer)
def load_checkpoint(self, checkpoint_path):
with open(checkpoint_path, "rb") as f:
info = pickle.load(f)
self.info = info
self.restored = True

@override(Trainer)
def _register_if_needed(self, env_object, config):
# No env to register.
pass

def set_info(self, info):
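The save_checkpoint()/load_checkpoint() pair above simply pickles self.info into mock_agent.pkl and restores it, flipping self.restored on load. A self-contained round-trip sketch of that pattern (a stand-in class for illustration, not the actual mock trainer):

import os
import pickle
import tempfile

class TinyMock:
    # Stand-in that mirrors the checkpoint logic above; not RLlib code.
    def __init__(self):
        self.info = None
        self.restored = False

    def save_checkpoint(self, checkpoint_dir):
        path = os.path.join(checkpoint_dir, "mock_agent.pkl")
        with open(path, "wb") as f:
            pickle.dump(self.info, f)
        return path

    def load_checkpoint(self, checkpoint_path):
        with open(checkpoint_path, "rb") as f:
            self.info = pickle.load(f)
        self.restored = True

saver = TinyMock()
saver.info = {"iteration": 1}
path = saver.save_checkpoint(tempfile.mkdtemp())

restorer = TinyMock()
restorer.load_checkpoint(path)
assert restorer.info == {"iteration": 1} and restorer.restored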