
[RLlib] PPO runs with EnvRunner w/o old Policy API (also solves KL issues with PPORLModules). #39732

Merged
57 commits
2762b77
Added PPOEnvRunner and added implementations to PPO.
simonsays1980 Sep 14, 2023
041d4c2
Run linter.
simonsays1980 Sep 14, 2023
0d0f0dc
Run linter.
simonsays1980 Sep 14, 2023
1192243
Merge branch 'master' into solve-kl-issues-with-ppo-rl-module
simonsays1980 Sep 15, 2023
dd679fe
Added EnvRunner to PPO. Added foreach_module to MARL and modified set…
simonsays1980 Sep 15, 2023
c0bb25d
Had to run linter.
simonsays1980 Sep 15, 2023
0a67532
Moved 'foreach_module' to utils.rl_module.py'.
simonsays1980 Sep 18, 2023
8fd93f9
Merge branch 'master' into solve-kl-issues-with-ppo-rl-module
simonsays1980 Sep 18, 2023
d5875b1
EnvRunner needed some modifications as well as PPO. Sampling works no…
simonsays1980 Sep 18, 2023
77b0036
Running linter.
simonsays1980 Sep 18, 2023
4066431
Merge branch 'master' into solve-kl-issues-with-ppo-rl-module
simonsays1980 Sep 20, 2023
8de01f9
Added 'info' to '_Episode'. Wrapped vector env into VectorListInfo to …
simonsays1980 Sep 21, 2023
5a9c823
Added extra_model_outputs to the _Episode. Started to create postproc…
simonsays1980 Sep 21, 2023
346cbcd
Implemented bootstrapping for GAE on episodes. Run linter.
simonsays1980 Sep 22, 2023
442f9cd
Added advantage computation to EnvRunner and added tqdm to EnvRunner.…
simonsays1980 Sep 22, 2023
0924207
Added advantage computation to EnvRunner and added tqdm to EnvRunner.…
simonsays1980 Sep 22, 2023
4e8fb06
Implemented changes from @sven1977 review. Moved 'foreach_module()' t…
simonsays1980 Sep 27, 2023
ecc516d
Ran linter.
simonsays1980 Sep 27, 2023
ff35676
Debugging.
simonsays1980 Sep 29, 2023
be067e7
Debugging.
simonsays1980 Sep 29, 2023
cdc3d2e
Merge branch 'master' of https://github.com/ray-project/ray into solv…
sven1977 Sep 29, 2023
d451a83
wip
sven1977 Sep 29, 2023
9a8c5d7
Added evaluation for EnvRunner. This is just a draft.
simonsays1980 Sep 29, 2023
6386860
Removed errors with render_images and checking for env_runner_cls. Fu…
simonsays1980 Sep 30, 2023
09e384b
Worked on tuned_examples and the evaluate() method needed for the exa…
simonsays1980 Oct 2, 2023
a269cf1
Ran linter.
simonsays1980 Oct 2, 2023
59b48ed
Refactored PPO.evaluate() to make it more readable, more efficient,…
simonsays1980 Oct 3, 2023
266ea1f
Added two further sampling forms to 'PPO.evaluate()' method to make s…
simonsays1980 Oct 3, 2023
4ac7195
Added SingleAgentEnvRunner to 'env_runner.py' and fixed little typing…
simonsays1980 Oct 4, 2023
992ebbe
Ran linter.
simonsays1980 Oct 4, 2023
ab30fd2
Included SingleAgentRLModuleSpec into SingleAgentEnvRunner and change…
simonsays1980 Oct 5, 2023
61f7eaf
wip
sven1977 Oct 6, 2023
e3bf8f0
Implemented review from @sven1977. Furthermore moved postprocessing t…
simonsays1980 Oct 6, 2023
2120e75
Solved conflicts and removed unused imports.
simonsays1980 Oct 6, 2023
db0b93d
Ran linter.
simonsays1980 Oct 6, 2023
6c016a2
Fixed minor bugs.
simonsays1980 Oct 9, 2023
d8de7f6
Added async evaluation.
simonsays1980 Oct 10, 2023
149c5ea
Removed initialized episodes from sampling output to fix 'vf_preds' k…
simonsays1980 Oct 11, 2023
6eef936
Changed import to original one in postprocessing.
simonsays1980 Oct 11, 2023
d6f1b58
wip
sven1977 Oct 12, 2023
5deaa74
Merge branch 'solve-kl-issues-with-ppo-rl-module' of https://github.c…
sven1977 Oct 12, 2023
8a3621c
Debugging PPO with EnvRunner.
simonsays1980 Oct 23, 2023
82f9152
Debugging PPO with EnvRunner. Added test environments.
simonsays1980 Oct 24, 2023
4bdf1a8
wip
sven1977 Oct 24, 2023
e12b356
Merge branch 'master' of https://github.com/ray-project/ray into solv…
sven1977 Oct 24, 2023
f379269
wip
sven1977 Oct 24, 2023
dffc9fb
wip
sven1977 Oct 24, 2023
c124d98
LINT
sven1977 Oct 24, 2023
8d8119d
wip
sven1977 Oct 24, 2023
fac1815
wip
sven1977 Oct 24, 2023
e011776
wip
sven1977 Oct 24, 2023
eba5667
wip
sven1977 Oct 24, 2023
069979c
wip
sven1977 Oct 24, 2023
fc3d387
wip
sven1977 Oct 24, 2023
aa469de
wip
sven1977 Oct 24, 2023
6c49aca
Merge branch 'master' of https://github.com/ray-project/ray into solv…
sven1977 Oct 24, 2023
e5b01a3
wip
sven1977 Oct 24, 2023
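
For orientation, the PR title's claim ("PPO runs with EnvRunner w/o old Policy API") boils down to swapping RLlib's RolloutWorker-based sampling stack for the new EnvRunner classes added here. Below is a minimal sketch of what that switch might look like on a PPOConfig; the SingleAgentEnvRunner import path and the env_runner_cls keyword of rollouts() are assumptions inferred from names appearing in this PR, not a confirmed public API.

from ray.rllib.algorithms.ppo import PPOConfig
# Assumed import location for the new EnvRunner class introduced in this PR.
from ray.rllib.env.single_agent_env_runner import SingleAgentEnvRunner

config = (
    PPOConfig()
    .environment("CartPole-v1")
    .framework("torch")
    # Replace the old RolloutWorker/Policy sampling stack with the new EnvRunner.
    .rollouts(env_runner_cls=SingleAgentEnvRunner, num_rollout_workers=2)
)
algo = config.build()
print(algo.train())
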
54 changes: 32 additions & 22 deletions rllib/BUILD
@@ -671,40 +671,53 @@ py_test(
)

# PPO
# w/ new EnvRunner
py_test(
name = "learning_tests_cartpole_truncated_ppo",
name = "learning_tests_cartpole_ppo_w_env_runner",
main = "tests/run_regression_tests.py",
tags = ["team:rllib", "exclusive", "learning_tests", "learning_tests_cartpole", "learning_tests_discrete"],
tags = ["team:rllib", "exclusive", "learning_tests", "learning_tests_cartpole", "learning_tests_discrete", "no_tf_static_graph"],
size = "large",
srcs = ["tests/run_regression_tests.py"],
data = ["tuned_examples/ppo/cartpole-truncated-ppo.py"],
data = ["tuned_examples/ppo/cartpole-ppo-envrunner.py"],
args = ["--dir=tuned_examples/ppo"]
)

# TODO (sven): Remove the torch only flag for this test (tf2 is still very slow for EnvRunner and we need to debug this further).
py_test(
name = "learning_tests_pendulum_ppo",
name = "learning_tests_pendulum_ppo_w_env_runner",
main = "tests/run_regression_tests.py",
tags = ["team:rllib", "exclusive", "learning_tests", "learning_tests_pendulum", "learning_tests_continuous"],
size = "large", # bazel may complain about it being too long sometimes - large is on purpose as some frameworks take longer
tags = ["torch_only", "team:rllib", "exclusive", "learning_tests", "learning_tests_pendulum", "learning_tests_continuous"],
size = "large",
srcs = ["tests/run_regression_tests.py"],
data = ["tuned_examples/ppo/pendulum-ppo.yaml"],
data = ["tuned_examples/ppo/pendulum-ppo-envrunner.py"],
args = ["--dir=tuned_examples/ppo"]
)

# w/o EnvRunner (using RolloutWorker)
py_test(
name = "learning_tests_cartpole_truncated_ppo",
main = "tests/run_regression_tests.py",
tags = ["team:rllib", "exclusive", "learning_tests", "learning_tests_cartpole", "learning_tests_discrete", "no_tf_static_graph"],
size = "large",
srcs = ["tests/run_regression_tests.py"],
data = ["tuned_examples/ppo/cartpole-truncated-ppo.py"],
args = ["--dir=tuned_examples/ppo"]
)

py_test(
name = "learning_tests_pendulum_ppo_with_rl_module",
name = "learning_tests_pendulum_ppo",
main = "tests/run_regression_tests.py",
tags = ["team:rllib", "exclusive", "learning_tests", "learning_tests_pendulum", "learning_tests_continuous", "no_tf_static_graph"],
size = "large", # bazel may complain about it being too long sometimes - large is on purpose as some frameworks take longer
srcs = ["tests/run_regression_tests.py"],
data = ["tuned_examples/ppo/pendulum-ppo-with-rl-module.yaml"],
data = ["tuned_examples/ppo/pendulum-ppo.yaml"],
args = ["--dir=tuned_examples/ppo"]
)

py_test(
name = "learning_tests_multi_agent_pendulum_ppo",
main = "tests/run_regression_tests.py",
tags = ["team:rllib", "exclusive", "learning_tests", "learning_tests_pendulum", "learning_tests_continuous"],
tags = ["team:rllib", "exclusive", "learning_tests", "learning_tests_pendulum", "learning_tests_continuous", "no_tf_static_graph"],
size = "large", # bazel may complain about it being too long sometimes - large is on purpose as some frameworks take longer
srcs = ["tests/run_regression_tests.py"],
data = ["tuned_examples/ppo/multi_agent_pendulum_ppo.py"],
@@ -714,15 +727,15 @@ py_test(
py_test(
name = "learning_tests_transformed_actions_pendulum_ppo",
main = "tests/run_regression_tests.py",
tags = ["team:rllib", "exclusive", "learning_tests", "learning_tests_pendulum", "learning_tests_continuous"],
tags = ["team:rllib", "exclusive", "learning_tests", "learning_tests_pendulum", "learning_tests_continuous", "no_tf_static_graph"],
size = "large", # bazel may complain about it being too long sometimes - large is on purpose as some frameworks take longer
srcs = ["tests/run_regression_tests.py"],
data = ["tuned_examples/ppo/pendulum-transformed-actions-ppo.yaml"],
args = ["--dir=tuned_examples/ppo"]
)

py_test(
name = "learning_tests_repeat_after_me_ppo",
name = "learning_tests_repeat_after_me_ppo_wo_rl_module",
main = "tests/run_regression_tests.py",
tags = ["team:rllib", "exclusive", "learning_tests", "learning_tests_discrete"],
size = "medium",
Expand All @@ -731,16 +744,6 @@ py_test(
args = ["--dir=tuned_examples/ppo"]
)

py_test(
name = "learning_tests_repeat_after_me_ppo_with_rl_module",
main = "tests/run_regression_tests.py",
tags = ["team:rllib", "exclusive", "learning_tests", "learning_tests_discrete", "torch_only"],
size = "medium",
srcs = ["tests/run_regression_tests.py"],
data = ["tuned_examples/ppo/repeatafterme-ppo-lstm-with-rl-module.yaml"],
args = ["--dir=tuned_examples/ppo"]
)

py_test(
name = "learning_tests_cartpole_ppo_fake_gpus",
main = "tests/run_regression_tests.py",
@@ -1232,6 +1235,13 @@ py_test(
)

# PPO
py_test(
name = "test_ppo_with_env_runner",
tags = ["team:rllib", "algorithms_dir"],
size = "medium",
srcs = ["algorithms/ppo/tests/test_ppo_with_env_runner.py"]
)

py_test(
name = "test_ppo",
tags = ["team:rllib", "algorithms_dir"],
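
With these BUILD changes in place, the new and renamed targets can be exercised individually with standard Bazel invocations from the Ray repository root, for example (target labels taken from the rules above; no extra CI flags implied):

bazel test //rllib:learning_tests_cartpole_ppo_w_env_runner
bazel test //rllib:test_ppo_with_env_runner
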
208 changes: 199 additions & 9 deletions rllib/algorithms/algorithm.py
@@ -33,19 +33,20 @@
from ray.actor import ActorHandle
from ray.train import Checkpoint
import ray.cloudpickle as pickle

from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
from ray.rllib.algorithms.registry import ALGORITHMS_CLASS_TO_NAME as ALL_ALGORITHMS
from ray.rllib.connectors.agent.obs_preproc import ObsPreprocessorConnector
from ray.rllib.core.rl_module.rl_module import SingleAgentRLModuleSpec
from ray.rllib.env.env_context import EnvContext
from ray.rllib.env.env_runner import EnvRunner
from ray.rllib.env.utils import _gym_env_creator
from ray.rllib.evaluation.episode import Episode
from ray.rllib.evaluation.metrics import (
collect_episodes,
collect_metrics,
summarize_episodes,
)
from ray.rllib.evaluation.postprocessing_v2 import postprocess_episodes_to_sample_batch
from ray.rllib.evaluation.rollout_worker import RolloutWorker
from ray.rllib.evaluation.worker_set import WorkerSet
from ray.rllib.execution.common import (
@@ -590,7 +591,7 @@ def setup(self, config: AlgorithmConfig) -> None:

# Create a dict, mapping ActorHandles to sets of open remote
# requests (object refs). This way, we keep track, of which actors
# inside this Algorithm (e.g. a remote RolloutWorker) have
# inside this Algorithm (e.g. a remote EnvRunner) have
# already been sent how many (e.g. `sample()`) requests.
self.remote_requests_in_flight: DefaultDict[
ActorHandle, Set[ray.ObjectRef]
@@ -755,7 +756,7 @@ def setup(self, config: AlgorithmConfig) -> None:
spaces=getattr(local_worker, "spaces", None),
)
# TODO (Sven): Unify the inference of the MARLModuleSpec. Right now,
# we get this from the RolloutWorker's `marl_module_spec` property.
# we get this from the EnvRunner's `marl_module_spec` property.
# However, this is hacky (information leak) and should not remain this
# way. For other EnvRunner classes (that don't have this property),
# Algorithm should infer this itself.
@@ -1168,9 +1169,9 @@ def _evaluate_async(
"""Evaluates current policy under `evaluation_config` settings.

Uses the AsyncParallelRequests manager to send frequent `sample.remote()`
requests to the evaluation RolloutWorkers and collect the results of these
requests to the evaluation EnvRunners and collect the results of these
calls. Handles worker failures (or slowdowns) gracefully due to the asynch'ness
and the fact that other eval RolloutWorkers can thus cover the workload.
and the fact that other eval EnvRunners can thus cover the workload.

Important Note: This will replace the current `self.evaluate()` method as the
default in the future.
@@ -1353,12 +1354,195 @@ def remote_fn(worker):
# Return evaluation results.
return self.evaluation_metrics

@ExperimentalAPI
def _evaluate_async_with_env_runner(
self,
duration_fn: Optional[Callable[[int], int]] = None,
) -> dict:
"""Evaluates current MARLModule given `evaluation_config` settings.

Uses the AsyncParallelRequests manager to send frequent `sample.remote()`
requests to the evaluation EnvRunners and collect the results of these
calls. Handles worker failures (or slowdowns) gracefully due to the asynch'ness
and the fact that other eval EnvRunners can thus cover the workload.

Important Note: This will replace the current `self.evaluate()` method (as well
as the experimental `self._evaluate_async()`, which is only for RolloutWorkers)
in the future.

Args:
duration_fn: An optional callable taking the already run
num episodes as only arg and returning the number of
episodes left to run. It's used to find out whether
evaluation should continue.
"""
# How many episodes/timesteps do we need to run?
# In "auto" mode (only for parallel eval + training): Run as long
# as training lasts.
unit = self.config.evaluation_duration_unit
eval_cfg = self.evaluation_config
rollout = eval_cfg.rollout_fragment_length
num_envs = eval_cfg.num_envs_per_worker
auto = self.config.evaluation_duration == "auto"
duration = (
self.config.evaluation_duration
if not auto
else (self.config.evaluation_num_workers or 1)
* (1 if unit == "episodes" else rollout)
)

# Call the `_before_evaluate` hook.
self._before_evaluate()

# TODO (sven): Implement solution via connectors.
self._sync_filters_if_needed(
central_worker=self.workers.local_worker(),
workers=self.evaluation_workers,
config=eval_cfg,
)

if self.evaluation_workers is None and (
self.workers.local_worker().input_reader is None
or self.config.evaluation_num_workers == 0
):
raise ValueError(
"Evaluation w/o eval workers (calling Algorithm.evaluate() w/o "
"evaluation specifically set up) OR evaluation without input reader "
"OR evaluation with only a local evaluation worker "
"(`evaluation_num_workers=0`) not supported in combination "
"with `enable_async_evaluation=True` config setting!"
)

agent_steps_this_iter = 0
env_steps_this_iter = 0

logger.info(f"Evaluating current state of {self} for {duration} {unit}.")

all_batches = []

# Default done-function returns True, whenever num episodes
# have been completed.
if duration_fn is None:

def duration_fn(num_units_done):
return duration - num_units_done

# Put weights only once into object store and use same object
# ref to synch to all workers.
self._evaluation_weights_seq_number += 1
weights_ref = ray.put(self.workers.local_worker().get_weights())
weights_seq_no = self._evaluation_weights_seq_number

def remote_fn(worker):
# Pass in seq-no so that eval workers may ignore this call if no update has
# happened since the last call to `remote_fn` (sample).
worker.set_weights(
weights=ray.get(weights_ref), weights_seq_no=weights_seq_no
)
episodes = worker.sample(explore=False)
metrics = worker.get_metrics()
return episodes, metrics, weights_seq_no

rollout_metrics = []

# How many episodes have we run (across all eval workers)?
num_units_done = 0
_round = 0

while self.evaluation_workers.num_healthy_remote_workers() > 0:
units_left_to_do = duration_fn(num_units_done)
if units_left_to_do <= 0:
break

_round += 1
# Get ready evaluation results and metrics asynchronously.
self.evaluation_workers.foreach_worker_async(
func=remote_fn,
healthy_only=True,
)
eval_results = self.evaluation_workers.fetch_ready_async_reqs()

episodes = []
i = 0
for _, result in eval_results:
eps, metrics, seq_no = result
# Ignore results, if the weights seq-number does not match (is
# from a previous evaluation step) OR if we have already reached
# the configured duration (e.g. number of episodes to evaluate
# for).
if seq_no == self._evaluation_weights_seq_number and (
i * (1 if unit == "episodes" else rollout * num_envs)
< units_left_to_do
):
episodes.extend(eps)
rollout_metrics.extend(metrics)
i += 1

# Convert our list of Episodes to a single SampleBatch.
batch = postprocess_episodes_to_sample_batch(episodes)
# Collect steps stats.
_agent_steps = batch.agent_steps()
_env_steps = batch.env_steps()

# Only complete episodes done by eval workers.
if unit == "episodes":
num_units_done += len(episodes)
# n timesteps per returned episode done by eval workers.
else:
num_units_done += (
_agent_steps
if self.config.count_steps_by == "agent_steps"
else _env_steps
)

if self.reward_estimators:
all_batches.append(batch)

agent_steps_this_iter += _agent_steps
env_steps_this_iter += _env_steps

logger.info(
f"Ran round {_round} of parallel evaluation "
f"({num_units_done}/{duration if not auto else '?'} "
f"{unit} done)"
)

sampler_results = summarize_episodes(
rollout_metrics,
keep_custom_metrics=eval_cfg["keep_per_episode_custom_metrics"],
)

metrics = dict({"sampler_results": sampler_results})
metrics[NUM_AGENT_STEPS_SAMPLED_THIS_ITER] = agent_steps_this_iter
metrics[NUM_ENV_STEPS_SAMPLED_THIS_ITER] = env_steps_this_iter

if self.reward_estimators:
# Compute off-policy estimates
metrics["off_policy_estimator"] = {}
total_batch = concat_samples(all_batches)
for name, estimator in self.reward_estimators.items():
estimates = estimator.estimate(total_batch)
metrics["off_policy_estimator"][name] = estimates

# Evaluation does not run for every step.
# Save evaluation metrics on Algorithm, so it can be attached to
# subsequent step results as latest evaluation result.
self.evaluation_metrics = {"evaluation": metrics}

# Trigger `on_evaluate_end` callback.
self.callbacks.on_evaluate_end(
algorithm=self, evaluation_metrics=self.evaluation_metrics
)

# Return evaluation results.
return self.evaluation_metrics

@OverrideToImplementCustomLogic
@DeveloperAPI
def restore_workers(self, workers: WorkerSet) -> None:
"""Try syncing previously failed and restarted workers with local, if necessary.

Algorithms that use custom RolloutWorkers may override this method to
Algorithms that use custom EnvRunners may override this method to
disable default, and create custom restoration logics. Note that "restoring"
does not include the actual restarting process, but merely what should happen
after such a restart of a (previously failed) worker.
@@ -1413,7 +1597,7 @@ def training_step(self) -> ResultDict:
"""Default single iteration logic of an algorithm.

- Collect on-policy samples (SampleBatches) in parallel using the
Algorithm's RolloutWorkers (@ray.remote).
Algorithm's EnvRunners (@ray.remote).
- Concatenate collected SampleBatches into one train batch.
- Note that we may have more than one policy in the multi-agent case:
Call the different policies' `learn_on_batch` (simple optimizer) OR
@@ -2376,7 +2560,7 @@ def _is_multi_agent(self):
def _sync_filters_if_needed(
self,
*,
central_worker: RolloutWorker,
central_worker: EnvRunner,
workers: WorkerSet,
config: AlgorithmConfig,
) -> None:
@@ -2885,7 +3069,13 @@ def _run_one_evaluation(
The results dict from the evaluation call.
"""
eval_func_to_use = (
self._evaluate_async
self._evaluate_async_with_env_runner
if (
self.config.enable_async_evaluation
and self.config.env_runner_cls is not None
and not issubclass(self.config.env_runner_cls, RolloutWorker)
)
else self._evaluate_async
if self.config.enable_async_evaluation
else self.evaluate
)
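
Taken together with the _run_one_evaluation() change above, the new _evaluate_async_with_env_runner() path is only entered when async evaluation is enabled and the configured env_runner_cls is not a RolloutWorker subclass. A rough configuration sketch under those assumptions follows; the evaluation() keyword names mirror the config attributes read in the diff (e.g. enable_async_evaluation, evaluation_duration_unit) but are not verified here, and the EnvRunner import path is assumed as before.

from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.env.single_agent_env_runner import SingleAgentEnvRunner  # assumed path

config = (
    PPOConfig()
    .environment("Pendulum-v1")
    # Assumed keyword; must be a non-RolloutWorker EnvRunner to hit the new path.
    .rollouts(env_runner_cls=SingleAgentEnvRunner)
    .evaluation(
        evaluation_num_workers=2,
        evaluation_interval=1,
        evaluation_duration=10,
        evaluation_duration_unit="episodes",
        enable_async_evaluation=True,
    )
)
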