From 74e8895b0e9de75e7ef9d741b32954e2272811c3 Mon Sep 17 00:00:00 2001 From: sven1977 Date: Tue, 2 Aug 2022 21:24:38 +0200 Subject: [PATCH 01/15] wip Signed-off-by: sven1977 --- rllib/algorithms/algorithm.py | 239 +++++++++++++++++- rllib/algorithms/algorithm_config.py | 10 +- .../algorithms/tests/test_worker_failures.py | 19 +- rllib/execution/parallel_requests.py | 19 +- 4 files changed, 267 insertions(+), 20 deletions(-) diff --git a/rllib/algorithms/algorithm.py b/rllib/algorithms/algorithm.py index b9793e5d5438d..b02b83653e74d 100644 --- a/rllib/algorithms/algorithm.py +++ b/rllib/algorithms/algorithm.py @@ -46,6 +46,7 @@ from ray.rllib.execution.common import ( STEPS_TRAINED_THIS_ITER_COUNTER, # TODO: Backward compatibility. ) +from ray.rllib.execution.parallel_requests import AsyncRequestsManager from ray.rllib.execution.rollout_ops import synchronous_parallel_sample from ray.rllib.execution.train_ops import multi_gpu_train_one_step, train_one_step from ray.rllib.offline import get_offline_io_resource_bundles @@ -284,6 +285,9 @@ def default_logger_creator(config): # Evaluation WorkerSet and metrics last returned by `self.evaluate()`. self.evaluation_workers: Optional[WorkerSet] = None + # If evaluation duration is "auto", use a AsyncRequestsManager to be more + # robust against eval worker failures. + self._evaluation_async_req_manager: Optional[AsyncRequestsManager] = None # Initialize common evaluation_metrics to nan, before they become # available. We want to make sure the metrics are always present # (although their values may be nan), so that Tune does not complain @@ -502,7 +506,7 @@ def setup(self, config: PartialAlgorithmConfigDict): # Set `rollout_fragment_length` such that desired steps are divided # equally amongst workers or - in "auto" duration mode - set it # to a reasonably small number (10), such that a single `sample()` - # call doesn't take too much time so we can stop evaluation as soon + # call doesn't take too much time and we can stop evaluation as soon # as possible after the train step is completed. else: eval_config.update( @@ -540,6 +544,13 @@ def setup(self, config: PartialAlgorithmConfigDict): logdir=self.logdir, ) + if self.config["evaluation_with_async_requests"]: + self._evaluation_async_req_manager = AsyncRequestsManager( + workers=self.evaluation_workers.remote_workers(), + max_remote_requests_in_flight_per_worker=1, + ) + self._evaluation_weights_seq_number = 0 + self.reward_estimators: Dict[str, OffPolicyEstimator] = {} ope_types = { "is": ImportanceSampling, @@ -718,15 +729,6 @@ def evaluate( episodes left to run. It's used to find out whether evaluation should continue. """ - # In case we are evaluating (in a thread) parallel to training, - # we may have to re-enable eager mode here (gets disabled in the - # thread). - if ( - self.config.get("framework") in ["tf2", "tfe"] - and not tf.executing_eagerly() - ): - tf1.enable_eager_execution() - # Call the `_before_evaluate` hook. self._before_evaluate() @@ -928,6 +930,198 @@ def duration_fn(num_units_done): # Also return the results here for convenience. return self.evaluation_metrics + @ExperimentalAPI + def _evaluate_v2( + self, + duration_fn: Optional[Callable[[int], int]] = None, + ) -> dict: + """Evaluates current policy under `evaluation_config` settings. + + Note that this default implementation does not do anything beyond + merging evaluation_config with the normal trainer config. 
+ + Args: + duration_fn: An optional callable taking the already run + num episodes as only arg and returning the number of + episodes left to run. It's used to find out whether + evaluation should continue. + """ + # How many episodes/timesteps do we need to run? + # In "auto" mode (only for parallel eval + training): Run as long + # as training lasts. + unit = self.config["evaluation_duration_unit"] + eval_cfg = self.config["evaluation_config"] + rollout_len = eval_cfg["rollout_fragment_length"] + num_envs = eval_cfg["num_envs_per_worker"] + auto = self.config["evaluation_duration"] == "auto" + duration = ( + self.config["evaluation_duration"] + if not auto + else (self.config["evaluation_num_workers"] or 1) + * (1 if unit == "episodes" else rollout_len) + ) + + # Call the `_before_evaluate` hook. + self._before_evaluate() + + # Put weights only once into object store and use same object + # ref to synch to all workers. + self._evaluation_weights_seq_number += 1 + weights_ref = ray.put(self.workers.local_worker().get_weights()) + self._sync_filters_if_needed( + self.evaluation_workers, + timeout_seconds=self.config[ + "sync_filters_on_rollout_workers_timeout_s" + ], + ) + + if self.config["custom_eval_function"]: + raise ValueError( + "`custom_eval_function` not supported in combination " + "with `evaluation_with_async_requests=True` config setting!" + ) + if ( + self.evaluation_workers is None + and ( + self.workers.local_worker().input_reader is None + or self.config["evaluation_num_workers"] == 0 + ) + ): + raise ValueError( + "Local evaluation OR evaluation without input reader OR evaluation " + "with only a local eval worker not supported in combination " + "with `evaluation_with_async_requests=True` config setting!" + ) + + agent_steps_this_iter = 0 + env_steps_this_iter = 0 + + logger.info(f"Evaluating current policy for {duration} {unit}.") + + all_batches = [] + + # Default done-function returns True, whenever num episodes + # have been completed. + if duration_fn is None: + def duration_fn(num_units_done): + return duration - num_units_done + + def remote_fn(worker, w_ref, w_seq_no): + worker.set_weights(weights=w_ref) + batch = worker.sample() + metrics = worker.get_metrics() + return batch, metrics, w_seq_no + + rollout_metrics = [] + + # How many episodes have we run (across all eval workers)? + num_units_done = 0 + round_ = 0 + time_started = time.time() + timed_out = True + + while time.time() - time_started < self.config["evaluation_sample_timeout_s"]: + units_left_to_do = duration_fn(num_units_done) + if units_left_to_do <= 0: + timed_out = False + break + + round_ += 1 + # Use the AsyncRequestsManager to get ready evaluation results and + # metrics. + self._evaluation_async_req_manager.call_on_all_available( + remote_fn=remote_fn, + fn_args=[weights_ref, self._evaluation_weights_seq_number], + ) + ready_requests = self._evaluation_async_req_manager.get_ready() + + batches = [] + for i, requests in enumerate(ready_requests.values()): + for req in requests: + batch, metrics, seq_no = req + # Ignore results, if the weights seq-number does not match (is from + # a previous evaluation step) OR if we have already reached the + # configured duration (e.g. number of episodes to evaluate for). 
+ if ( + seq_no == self._evaluation_weights_seq_number + and ( + i * (1 if unit == "episodes" else rollout_len * num_envs) + < units_left_to_do + ) + ): + batches.append(batch) + rollout_metrics.extend(metrics) + + _agent_steps = sum(b.agent_steps() for b in batches) + _env_steps = sum(b.env_steps() for b in batches) + + # 1 episode per returned batch. + if unit == "episodes": + num_units_done += len(batches) + # Make sure all batches are exactly one episode. + for ma_batch in batches: + ma_batch = ma_batch.as_multi_agent() + for batch in ma_batch.policy_batches.values(): + assert np.sum(batch[SampleBatch.DONES]) + # n timesteps per returned batch. + else: + num_units_done += ( + _agent_steps if self._by_agent_steps else _env_steps + ) + if self.reward_estimators: + all_batches.extend(batches) + + agent_steps_this_iter += _agent_steps + env_steps_this_iter += _env_steps + + logger.info( + f"Ran round {round_} of parallel evaluation " + f"({num_units_done}/{duration if not auto else '?'} " + f"{unit} done)" + ) + + if timed_out and log_once("evaluation_timeout"): + logger.warning( + "Calling `sample()` on your remote evaluation worker(s) " + "resulted in a timeout (after the configured " + f"{self.config['evaluation_sample_timeout_s']} seconds)! " + "Try to set `evaluation_sample_timeout_s` in your config" + " to a larger value." + + ( + " If your episodes don't terminate easily, you may " + "also want to set `evaluation_duration_unit` to " + "'timesteps' (instead of 'episodes')." + if unit == "episodes" + else "" + ) + ) + + metrics = summarize_episodes( + rollout_metrics, + keep_custom_metrics=eval_cfg["keep_per_episode_custom_metrics"], + ) + + metrics[NUM_AGENT_STEPS_SAMPLED_THIS_ITER] = agent_steps_this_iter + metrics[NUM_ENV_STEPS_SAMPLED_THIS_ITER] = env_steps_this_iter + # TODO: Remove this key at some point. Here for backward compatibility. + metrics["timesteps_this_iter"] = env_steps_this_iter + + if self.reward_estimators: + # Compute off-policy estimates + metrics["off_policy_estimator"] = {} + total_batch = concat_samples(all_batches) + for name, estimator in self.reward_estimators.items(): + estimates = estimator.estimate(total_batch) + metrics["off_policy_estimator"][name] = estimates + + # Evaluation does not run for every step. + # Save evaluation metrics on trainer, so it can be attached to + # subsequent step results as latest evaluation result. + self.evaluation_metrics = {"evaluation": metrics} + + # Return evaluation results. + return self.evaluation_metrics + @OverrideToImplementCustomLogic @DeveloperAPI def training_step(self) -> ResultDict: @@ -2193,6 +2387,12 @@ def try_recover_from_step_attempt( self.train_exec_impl = self.execution_plan( worker_set, self.config, **self._kwargs_for_execution_plan() ) + elif ( + worker_set is getattr(self, "evaluation_workers", None) + and self.config["evaluation_config"].get("evaluation_duration") == "auto" + ): + self._evaluation_async_req_manager.remove_workers(removed_workers) + self._evaluation_async_req_manager.add_workers(new_workers) def on_worker_failures( self, removed_workers: List[ActorHandle], new_workers: List[ActorHandle] @@ -2339,6 +2539,15 @@ def _run_one_training_iteration(self) -> Tuple[ResultDict, "TrainIterCtx"]: Returns: The results dict from the training iteration. """ + # In case we are training (in a thread) parallel to evaluation, + # we may have to re-enable eager mode here (gets disabled in the + # thread). 
+ if ( + self.config.get("framework") in ["tf2", "tfe"] + and not tf.executing_eagerly() + ): + tf1.enable_eager_execution() + results = None # Create a step context ... with TrainIterCtx(algo=self) as train_iter_ctx: @@ -2378,6 +2587,12 @@ def _run_one_evaluation( The results dict from the evaluation call. """ eval_results = {"evaluation": {}} + + eval_func_to_use = ( + self._evaluate_v2 if self.config["evaluation_with_async_requests"] + else self.evaluate + ) + try: if self.config["evaluation_duration"] == "auto": assert ( @@ -2385,7 +2600,7 @@ def _run_one_evaluation( and self.config["evaluation_parallel_to_training"] ) unit = self.config["evaluation_duration_unit"] - eval_results = self.evaluate( + eval_results = eval_func_to_use( duration_fn=functools.partial( self._automatic_evaluation_duration_fn, unit, @@ -2396,7 +2611,7 @@ def _run_one_evaluation( ) # Run `self.evaluate()` only once per training iteration. else: - eval_results = self.evaluate() + eval_results = eval_func_to_use() # In case of any failures, try to ignore/recover the failed evaluation workers. except Exception as e: diff --git a/rllib/algorithms/algorithm_config.py b/rllib/algorithms/algorithm_config.py index 8fa9581aaf24d..f20d3a10a877f 100644 --- a/rllib/algorithms/algorithm_config.py +++ b/rllib/algorithms/algorithm_config.py @@ -182,6 +182,7 @@ def __init__(self, algo_class=None): self.evaluation_num_workers = 0 self.custom_evaluation_function = None self.always_attach_evaluation_results = False + self.evaluation_with_async_requests = False # TODO: Set this flag still in the config or - much better - in the # RolloutWorker as a property. self.in_evaluation = False @@ -810,7 +811,7 @@ def evaluation( self, *, evaluation_interval: Optional[int] = None, - evaluation_duration: Optional[int] = None, + evaluation_duration: Optional[Union[int, str]] = None, evaluation_duration_unit: Optional[str] = None, evaluation_sample_timeout_s: Optional[float] = None, evaluation_parallel_to_training: Optional[bool] = None, @@ -821,6 +822,7 @@ def evaluation( evaluation_num_workers: Optional[int] = None, custom_evaluation_function: Optional[Callable] = None, always_attach_evaluation_results: Optional[bool] = None, + evaluation_with_async_requests: Optional[bool] = None, ) -> "AlgorithmConfig": """Sets the config's evaluation settings. @@ -885,6 +887,10 @@ def evaluation( results are always attached to a step result dict. This may be useful if Tune or some other meta controller needs access to evaluation metrics all the time. + evaluation_with_async_requests: If True, use an AsyncRequestsManager for + the evaluation workers and use this manager to send `sample()` requests + to the evaluation workers. This way, the Algorithm becomes more robust + against long running episodes and/or failing (and restarting) workers. Returns: This updated AlgorithmConfig object. 
@@ -913,6 +919,8 @@ def evaluation( self.custom_evaluation_function = custom_evaluation_function if always_attach_evaluation_results: self.always_attach_evaluation_results = always_attach_evaluation_results + if evaluation_with_async_requests: + self.evaluation_with_async_requests = evaluation_with_async_requests return self diff --git a/rllib/algorithms/tests/test_worker_failures.py b/rllib/algorithms/tests/test_worker_failures.py index f2427f1ac83af..89619104f1237 100644 --- a/rllib/algorithms/tests/test_worker_failures.py +++ b/rllib/algorithms/tests/test_worker_failures.py @@ -151,7 +151,7 @@ def _do_test_fault_fatal_but_recreate(self, alg, config, eval_only=False): }, } - for _ in framework_iterator(config, frameworks=("tf2", "torch")): + for _ in framework_iterator(config, frameworks=("tf", "tf2", "torch")): a = agent_cls(config=config, env="fault_env") # Expect this to go well and all faulty workers are recovered. self.assertTrue( @@ -258,6 +258,23 @@ def test_eval_workers_failing_recreate(self): eval_only=True, ) + def test_eval_workers_parallel_to_training_failing_recreate(self): + # Test the case where all eval workers fail, but we chose to recover. + config = pg.PGConfig()\ + .evaluation( + evaluation_num_workers=2, + evaluation_parallel_to_training=True, + evaluation_duration="auto", + )\ + .training(model={"fcnet_hiddens": [4]}) + + self.do_test( + "PG", + config=config.to_dict(), + fn=self._do_test_fault_fatal_but_recreate, + eval_only=True, + ) + def test_eval_workers_failing_fatal(self): # Test the case where all eval workers fail (w/o recovery). self.do_test( diff --git a/rllib/execution/parallel_requests.py b/rllib/execution/parallel_requests.py index c8819cddf55eb..6a59321194ac1 100644 --- a/rllib/execution/parallel_requests.py +++ b/rllib/execution/parallel_requests.py @@ -79,7 +79,8 @@ def call( """Call remote function on any available Actor or - if provided - on `actor`. Args: - remote_fn: The remote function to call. + remote_fn: The remote function to call. The function must have a signature + of: [RolloutWorker, *args, **kwargs] and return Any. actor: The actor to call the remote function on. fn_args: The arguments to pass to the remote function. fn_kwargs: The keyword arguments to pass to the remote function. @@ -136,12 +137,17 @@ def call_on_all_available( fn_args: List[Any] = None, fn_kwargs: Dict[str, Any] = None, ) -> int: - """ "Call remote_fn on all available workers + """Call `remote_fn` on all available workers. + + Available workers are those that have less than the maximum requests currently + in-flight. The max. requests is set via the constructor's + `max_remote_requests_in_flight_per_worker` argument. Args: - remote_fn: The remote function to call - fn_args: The arguments to pass to the remote function - fn_kwargs: The keyword arguments to pass to the remote function + remote_fn: The remote function to call. The function must have a signature + of: [RolloutWorker, *args, **kwargs] and return Any. + fn_args: The arguments to pass to the remote function. + fn_kwargs: The keyword arguments to pass to the remote function. Returns: The number of remote calls of remote_fn that were able to be launched. 
@@ -173,12 +179,13 @@ def get_ready(self) -> Dict[ActorHandle, List[Any]]: objs = ray.get(ready_requests) else: objs = ready_requests + for req, obj in zip(ready_requests, objs): actor = self._pending_to_actor[req] self._remote_requests_in_flight[actor].remove(req) ready_requests_dict[actor].append(obj) del self._pending_to_actor[req] - del ready_requests + return dict(ready_requests_dict) def add_workers(self, new_workers: Union[List[ActorHandle], ActorHandle]) -> None: From a0cdf9e500e059c4ff01387914283be80368db79 Mon Sep 17 00:00:00 2001 From: sven1977 Date: Tue, 2 Aug 2022 21:51:44 +0200 Subject: [PATCH 02/15] wip Signed-off-by: sven1977 --- rllib/algorithms/algorithm.py | 4 ++-- rllib/algorithms/tests/test_worker_failures.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/rllib/algorithms/algorithm.py b/rllib/algorithms/algorithm.py index b02b83653e74d..122230a26ff2d 100644 --- a/rllib/algorithms/algorithm.py +++ b/rllib/algorithms/algorithm.py @@ -2388,8 +2388,8 @@ def try_recover_from_step_attempt( worker_set, self.config, **self._kwargs_for_execution_plan() ) elif ( - worker_set is getattr(self, "evaluation_workers", None) - and self.config["evaluation_config"].get("evaluation_duration") == "auto" + self._evaluation_async_req_manager is not None + and worker_set is getattr(self, "evaluation_workers", None) ): self._evaluation_async_req_manager.remove_workers(removed_workers) self._evaluation_async_req_manager.add_workers(new_workers) diff --git a/rllib/algorithms/tests/test_worker_failures.py b/rllib/algorithms/tests/test_worker_failures.py index 89619104f1237..d371fd3054ce4 100644 --- a/rllib/algorithms/tests/test_worker_failures.py +++ b/rllib/algorithms/tests/test_worker_failures.py @@ -258,11 +258,11 @@ def test_eval_workers_failing_recreate(self): eval_only=True, ) - def test_eval_workers_parallel_to_training_failing_recreate(self): + def test_recreate_eval_workers_parallel_to_training_w_async_req_manager(self): # Test the case where all eval workers fail, but we chose to recover. config = pg.PGConfig()\ .evaluation( - evaluation_num_workers=2, + evaluation_with_async_requests=True, evaluation_parallel_to_training=True, evaluation_duration="auto", )\ From 3d300f279dba63f6de96299f09c7159ac97f09b0 Mon Sep 17 00:00:00 2001 From: sven1977 Date: Tue, 2 Aug 2022 22:24:08 +0200 Subject: [PATCH 03/15] wip Signed-off-by: sven1977 --- rllib/algorithms/tests/test_worker_failures.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/rllib/algorithms/tests/test_worker_failures.py b/rllib/algorithms/tests/test_worker_failures.py index 8d9aef7dd8bec..a609e162f029e 100644 --- a/rllib/algorithms/tests/test_worker_failures.py +++ b/rllib/algorithms/tests/test_worker_failures.py @@ -334,7 +334,7 @@ def test_eval_workers_failing_ignore(self): def test_recreate_eval_workers_parallel_to_training_w_async_req_manager(self): # Test the case where all eval workers fail, but we chose to recover. 
- config = pg.PGConfig()\ + config = PGConfig()\ .evaluation( evaluation_with_async_requests=True, evaluation_parallel_to_training=True, @@ -342,10 +342,9 @@ def test_recreate_eval_workers_parallel_to_training_w_async_req_manager(self): )\ .training(model={"fcnet_hiddens": [4]}) - self.do_test( + self._do_test_fault_fatal_but_recreate( "PG", config=config.to_dict(), - fn=self._do_test_fault_fatal_but_recreate, eval_only=True, ) From 96b6b420d4fab257e74e46505434da55476ead4f Mon Sep 17 00:00:00 2001 From: sven1977 Date: Tue, 2 Aug 2022 23:25:23 +0200 Subject: [PATCH 04/15] wip Signed-off-by: sven1977 --- rllib/algorithms/algorithm.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/rllib/algorithms/algorithm.py b/rllib/algorithms/algorithm.py index cc1b033afc9cd..4c33234dcc56d 100644 --- a/rllib/algorithms/algorithm.py +++ b/rllib/algorithms/algorithm.py @@ -982,10 +982,11 @@ def _evaluate_v2( self._evaluation_weights_seq_number += 1 weights_ref = ray.put(self.workers.local_worker().get_weights()) self._sync_filters_if_needed( - self.evaluation_workers, - timeout_seconds=self.config[ + from_worker=self.workers.local_worker(), + workers=self.evaluation_workers, + timeout_seconds=eval_cfg.get( "sync_filters_on_rollout_workers_timeout_s" - ], + ), ) if self.config["custom_eval_function"]: From ac9f6436a55386bba97f7bee53e7d3de292ef3a8 Mon Sep 17 00:00:00 2001 From: sven1977 Date: Fri, 5 Aug 2022 13:06:16 +0200 Subject: [PATCH 05/15] wip --- ...artpole-crashing-restart-sub-envs-pg.yaml} | 0 ...t-cartpole-crashing-restart-worker-pg.yaml | 42 +++++++++++++++++++ 2 files changed, 42 insertions(+) rename rllib/tuned_examples/pg/{multi-agent-cartpole-crashing-pg.yaml => multi-agent-cartpole-crashing-restart-sub-envs-pg.yaml} (100%) create mode 100644 rllib/tuned_examples/pg/multi-agent-cartpole-crashing-restart-worker-pg.yaml diff --git a/rllib/tuned_examples/pg/multi-agent-cartpole-crashing-pg.yaml b/rllib/tuned_examples/pg/multi-agent-cartpole-crashing-restart-sub-envs-pg.yaml similarity index 100% rename from rllib/tuned_examples/pg/multi-agent-cartpole-crashing-pg.yaml rename to rllib/tuned_examples/pg/multi-agent-cartpole-crashing-restart-sub-envs-pg.yaml diff --git a/rllib/tuned_examples/pg/multi-agent-cartpole-crashing-restart-worker-pg.yaml b/rllib/tuned_examples/pg/multi-agent-cartpole-crashing-restart-worker-pg.yaml new file mode 100644 index 0000000000000..87d772221a26e --- /dev/null +++ b/rllib/tuned_examples/pg/multi-agent-cartpole-crashing-restart-worker-pg.yaml @@ -0,0 +1,42 @@ +multi-agent-cartpole-crashing-pg: + env: ray.rllib.examples.env.cartpole_crashing.MultiAgentCartPoleCrashing + run: PG + stop: + evaluation/episode_reward_mean: 300.0 + num_env_steps_sampled: 300000 + config: + # Works for both torch and tf. + framework: tf + + env_config: + config: + num_agents: 2 + # Crash roughly every 300 ts. This should be ok to measure 300+ + # reward (episodes are 200 ts long). + p_crash: 0.00025 # prob to crash during step() + p_crash_reset: 0.01 # prob to crash during reset() + # Time for the env to initialize when newly created. + # Every time a remote sub-environment crashes, a new env is created + # in its place and will take this long (sleep) to "initialize". + init_time_s_min: 5.0 + init_time_s_max: 600.0 + horizon: 200 + num_workers: 10 + + # Disable env checking. Env checker doesn't handle Exceptions from + # user envs, and will crash rollout worker. 
+ disable_env_checking: true + + # Switch on resiliency for failed sub environments (within a vectorized stack). + recreate_failed_workers: true + + # Switch on evaluation workers being managed by AsyncRequestsManager object. + evaluation_with_async_requests: true + + evaluation_num_workers: 10 + evaluation_interval: 1 + evaluation_duration: 20 + evaluation_duration_unit: episodes + evaluation_parallel_to_training: true + evaluation_config: + explore: false From 410d7f0e2c93974f9599c8d336e7a035ff50ee67 Mon Sep 17 00:00:00 2001 From: sven1977 Date: Sat, 6 Aug 2022 15:41:55 +0200 Subject: [PATCH 06/15] wip Signed-off-by: sven1977 --- rllib/algorithms/algorithm.py | 112 +++++++++--------- rllib/algorithms/algorithm_config.py | 10 +- rllib/evaluation/rollout_worker.py | 33 ++++-- rllib/examples/env/cartpole_crashing.py | 14 ++- ...t-cartpole-crashing-restart-env-appo.yaml} | 33 ++++-- 5 files changed, 121 insertions(+), 81 deletions(-) rename rllib/tuned_examples/{pg/multi-agent-cartpole-crashing-restart-worker-pg.yaml => appo/multi-agent-cartpole-crashing-restart-env-appo.yaml} (53%) diff --git a/rllib/algorithms/algorithm.py b/rllib/algorithms/algorithm.py index 4c33234dcc56d..1b9e8be0df6f8 100644 --- a/rllib/algorithms/algorithm.py +++ b/rllib/algorithms/algorithm.py @@ -555,10 +555,11 @@ def setup(self, config: PartialAlgorithmConfigDict): logdir=self.logdir, ) - if self.config["evaluation_with_async_requests"]: + if self.config["enable_async_evaluation"]: self._evaluation_async_req_manager = AsyncRequestsManager( workers=self.evaluation_workers.remote_workers(), max_remote_requests_in_flight_per_worker=1, + return_object_refs=True, ) self._evaluation_weights_seq_number = 0 @@ -852,13 +853,13 @@ def duration_fn(num_units_done): else: # How many episodes have we run (across all eval workers)? num_units_done = 0 - round_ = 0 + _round = 0 while True: units_left_to_do = duration_fn(num_units_done) if units_left_to_do <= 0: break - round_ += 1 + _round += 1 try: batches = ray.get( [ @@ -910,7 +911,7 @@ def duration_fn(num_units_done): env_steps_this_iter += _env_steps logger.info( - f"Ran round {round_} of parallel evaluation " + f"Ran round {_round} of parallel evaluation " f"({num_units_done}/{duration if not auto else '?'} " f"{unit} done)" ) @@ -944,7 +945,7 @@ def duration_fn(num_units_done): return self.evaluation_metrics @ExperimentalAPI - def _evaluate_v2( + def _evaluate_async( self, duration_fn: Optional[Callable[[int], int]] = None, ) -> dict: @@ -964,14 +965,14 @@ def _evaluate_v2( # as training lasts. unit = self.config["evaluation_duration_unit"] eval_cfg = self.config["evaluation_config"] - rollout_len = eval_cfg["rollout_fragment_length"] + rollout = eval_cfg["rollout_fragment_length"] num_envs = eval_cfg["num_envs_per_worker"] auto = self.config["evaluation_duration"] == "auto" duration = ( self.config["evaluation_duration"] if not auto else (self.config["evaluation_num_workers"] or 1) - * (1 if unit == "episodes" else rollout_len) + * (1 if unit == "episodes" else rollout) ) # Call the `_before_evaluate` hook. @@ -981,6 +982,8 @@ def _evaluate_v2( # ref to synch to all workers. self._evaluation_weights_seq_number += 1 weights_ref = ray.put(self.workers.local_worker().get_weights()) + # TODO(Jun): Make sure this cannot block for e.g. 1h. Implement solution via + # connectors. 
self._sync_filters_if_needed( from_worker=self.workers.local_worker(), workers=self.evaluation_workers, @@ -992,7 +995,7 @@ def _evaluate_v2( if self.config["custom_eval_function"]: raise ValueError( "`custom_eval_function` not supported in combination " - "with `evaluation_with_async_requests=True` config setting!" + "with `enable_async_evaluation=True` config setting!" ) if ( self.evaluation_workers is None @@ -1004,7 +1007,7 @@ def _evaluate_v2( raise ValueError( "Local evaluation OR evaluation without input reader OR evaluation " "with only a local eval worker not supported in combination " - "with `evaluation_with_async_requests=True` config setting!" + "with `enable_async_evaluation=True` config setting!" ) agent_steps_this_iter = 0 @@ -1021,7 +1024,9 @@ def duration_fn(num_units_done): return duration - num_units_done def remote_fn(worker, w_ref, w_seq_no): - worker.set_weights(weights=w_ref) + # Pass in seq-no so that eval workers may ignore this call if no update has + # happened since the last call to `remote_fn` (sample). + worker.set_weights(weights=w_ref, weights_seq_no=w_seq_no) batch = worker.sample() metrics = worker.get_metrics() return batch, metrics, w_seq_no @@ -1030,17 +1035,15 @@ def remote_fn(worker, w_ref, w_seq_no): # How many episodes have we run (across all eval workers)? num_units_done = 0 - round_ = 0 - time_started = time.time() - timed_out = True + _round = 0 + errors = [] - while time.time() - time_started < self.config["evaluation_sample_timeout_s"]: + while len(self._evaluation_async_req_manager.workers) > 0: units_left_to_do = duration_fn(num_units_done) if units_left_to_do <= 0: - timed_out = False break - round_ += 1 + _round += 1 # Use the AsyncRequestsManager to get ready evaluation results and # metrics. self._evaluation_async_req_manager.call_on_all_available( @@ -1050,21 +1053,29 @@ def remote_fn(worker, w_ref, w_seq_no): ready_requests = self._evaluation_async_req_manager.get_ready() batches = [] - for i, requests in enumerate(ready_requests.values()): + i = 0 + for actor, requests in ready_requests.items(): for req in requests: - batch, metrics, seq_no = req - # Ignore results, if the weights seq-number does not match (is from - # a previous evaluation step) OR if we have already reached the - # configured duration (e.g. number of episodes to evaluate for). - if ( - seq_no == self._evaluation_weights_seq_number - and ( - i * (1 if unit == "episodes" else rollout_len * num_envs) - < units_left_to_do - ) - ): - batches.append(batch) - rollout_metrics.extend(metrics) + try: + batch, metrics, seq_no = ray.get(req) + # Ignore results, if the weights seq-number does not match (is + # from a previous evaluation step) OR if we have already reached + # the configured duration (e.g. number of episodes to evaluate + # for). 
+ if ( + seq_no == self._evaluation_weights_seq_number + and ( + i * (1 if unit == "episodes" else rollout * num_envs) + < units_left_to_do + ) + ): + batches.append(batch) + rollout_metrics.extend(metrics) + except RayError as e: + errors.append(e) + self._evaluation_async_req_manager.remove_workers(actor) + + i += 1 _agent_steps = sum(b.agent_steps() for b in batches) _env_steps = sum(b.env_steps() for b in batches) @@ -1089,25 +1100,18 @@ def remote_fn(worker, w_ref, w_seq_no): env_steps_this_iter += _env_steps logger.info( - f"Ran round {round_} of parallel evaluation " + f"Ran round {_round} of parallel evaluation " f"({num_units_done}/{duration if not auto else '?'} " f"{unit} done)" ) - if timed_out and log_once("evaluation_timeout"): - logger.warning( - "Calling `sample()` on your remote evaluation worker(s) " - "resulted in a timeout (after the configured " - f"{self.config['evaluation_sample_timeout_s']} seconds)! " - "Try to set `evaluation_sample_timeout_s` in your config" - " to a larger value." - + ( - " If your episodes don't terminate easily, you may " - "also want to set `evaluation_duration_unit` to " - "'timesteps' (instead of 'episodes')." - if unit == "episodes" - else "" - ) + num_recreated_workers = 0 + if errors: + num_recreated_workers = self.try_recover_from_step_attempt( + error=errors[0], + worker_set=self.evaluation_workers, + ignore=eval_cfg.get("ignore_worker_failures"), + recreate=eval_cfg.get("recreate_failed_workers"), ) metrics = summarize_episodes( @@ -1115,6 +1119,8 @@ def remote_fn(worker, w_ref, w_seq_no): keep_custom_metrics=eval_cfg["keep_per_episode_custom_metrics"], ) + metrics["num_recreated_workers"] = num_recreated_workers + metrics[NUM_AGENT_STEPS_SAMPLED_THIS_ITER] = agent_steps_this_iter metrics[NUM_ENV_STEPS_SAMPLED_THIS_ITER] = env_steps_this_iter # TODO: Remove this key at some point. Here for backward compatibility. @@ -2609,11 +2615,15 @@ def _run_one_evaluation( Returns: The results dict from the evaluation call. """ - eval_results = {"evaluation": {}} - num_recreated = 0 + eval_results = {"evaluation": { + "episode_reward_max": np.nan, + "episode_reward_min": np.nan, + "episode_reward_mean": np.nan, + }} + eval_results["evaluation"]["num_recreated_workers"] = 0 eval_func_to_use = ( - self._evaluate_v2 if self.config["evaluation_with_async_requests"] + self._evaluate_async if self.config["enable_async_evaluation"] else self.evaluate ) @@ -2647,6 +2657,7 @@ def _run_one_evaluation( "recreate_failed_workers" ), ) + eval_results["evaluation"]["num_recreated_workers"] = num_recreated # Add number of healthy evaluation workers after this iteration. 
eval_results["evaluation"]["num_healthy_workers"] = ( @@ -2654,7 +2665,6 @@ def _run_one_evaluation( if self.evaluation_workers is not None else 0 ) - eval_results["evaluation"]["num_recreated_workers"] = num_recreated return eval_results @@ -2848,10 +2858,6 @@ def _make_workers( logdir=self.logdir, ) - @Deprecated(new="Trainer.try_recover_from_step_attempt()", error=False) - def _try_recover(self): - return self.try_recover_from_step_attempt() - @staticmethod @Deprecated(new="Trainer.validate_config()", error=False) def _validate_config(config, trainer_or_none): diff --git a/rllib/algorithms/algorithm_config.py b/rllib/algorithms/algorithm_config.py index 8fad7d1f8a4a2..13eb9de40489d 100644 --- a/rllib/algorithms/algorithm_config.py +++ b/rllib/algorithms/algorithm_config.py @@ -183,7 +183,7 @@ def __init__(self, algo_class=None): self.evaluation_num_workers = 0 self.custom_evaluation_function = None self.always_attach_evaluation_results = False - self.evaluation_with_async_requests = False + self.enable_async_evaluation = False # TODO: Set this flag still in the config or - much better - in the # RolloutWorker as a property. self.in_evaluation = False @@ -830,7 +830,7 @@ def evaluation( evaluation_num_workers: Optional[int] = None, custom_evaluation_function: Optional[Callable] = None, always_attach_evaluation_results: Optional[bool] = None, - evaluation_with_async_requests: Optional[bool] = None, + enable_async_evaluation: Optional[bool] = None, ) -> "AlgorithmConfig": """Sets the config's evaluation settings. @@ -895,7 +895,7 @@ def evaluation( results are always attached to a step result dict. This may be useful if Tune or some other meta controller needs access to evaluation metrics all the time. - evaluation_with_async_requests: If True, use an AsyncRequestsManager for + enable_async_evaluation: If True, use an AsyncRequestsManager for the evaluation workers and use this manager to send `sample()` requests to the evaluation workers. This way, the Algorithm becomes more robust against long running episodes and/or failing (and restarting) workers. @@ -927,8 +927,8 @@ def evaluation( self.custom_evaluation_function = custom_evaluation_function if always_attach_evaluation_results: self.always_attach_evaluation_results = always_attach_evaluation_results - if evaluation_with_async_requests: - self.evaluation_with_async_requests = evaluation_with_async_requests + if enable_async_evaluation: + self.enable_async_evaluation = enable_async_evaluation return self diff --git a/rllib/evaluation/rollout_worker.py b/rllib/evaluation/rollout_worker.py index c9fa83191cc2c..93d7063593388 100644 --- a/rllib/evaluation/rollout_worker.py +++ b/rllib/evaluation/rollout_worker.py @@ -746,6 +746,10 @@ def wrap(env): self.input_reader: InputReader = input_creator(self.io_context) self.output_writer: OutputWriter = output_creator(self.io_context) + # The current weights sequence number (version). May remain None for when + # not tracking weights versions. + self.weights_seq_no: Optional[int] = None + logger.debug( "Created rollout worker with env {} ({}), policies {}".format( self.async_env, self.env, self.policy_map @@ -1563,7 +1567,10 @@ def get_weights( @DeveloperAPI def set_weights( - self, weights: Dict[PolicyID, ModelWeights], global_vars: Optional[Dict] = None + self, + weights: Dict[PolicyID, ModelWeights], + global_vars: Optional[Dict] = None, + weights_seq_no: Optional[int] = None, ) -> None: """Sets each policies' model weights of this worker. 
@@ -1571,6 +1578,10 @@ def set_weights( weights: Dict mapping PolicyIDs to the new weights to be used. global_vars: An optional global vars dict to set this worker to. If None, do not update the global_vars. + weights_seq_no: If needed, a sequence number for the weights version + can be passed into this method. If not None, will store this seq no + (in self.weights_seq_no) and in future calls - if the seq no did not + change wrt. the last call - will ignore the call to save on performance. Examples: >>> from ray.rllib.evaluation.rollout_worker import RolloutWorker @@ -1580,13 +1591,21 @@ def set_weights( >>> # Set `global_vars` (timestep) as well. >>> worker.set_weights(weights, {"timestep": 42}) # doctest: +SKIP """ - # If per-policy weights are object refs, `ray.get()` them first. - if weights and isinstance(next(iter(weights.values())), ObjectRef): - actual_weights = ray.get(list(weights.values())) - weights = {pid: actual_weights[i] for i, pid in enumerate(weights.keys())} + # Only update our weights, if no seq no given OR given seq no is different + # from ours + if weights_seq_no is None or weights_seq_no != self.weights_seq_no: + # If per-policy weights are object refs, `ray.get()` them first. + if weights and isinstance(next(iter(weights.values())), ObjectRef): + actual_weights = ray.get(list(weights.values())) + weights = { + pid: actual_weights[i] for i, pid in enumerate(weights.keys()) + } + + for pid, w in weights.items(): + self.policy_map[pid].set_weights(w) + + self.weights_seq_no = weights_seq_no - for pid, w in weights.items(): - self.policy_map[pid].set_weights(w) if global_vars: self.set_global_vars(global_vars) diff --git a/rllib/examples/env/cartpole_crashing.py b/rllib/examples/env/cartpole_crashing.py index a0ec90786d545..1cd881ee25a12 100644 --- a/rllib/examples/env/cartpole_crashing.py +++ b/rllib/examples/env/cartpole_crashing.py @@ -29,7 +29,7 @@ def __init__(self, config=None): # Crash probability (in each `step()`). self.p_crash = config.get("p_crash", 0.005) - self.p_crash_reset = config.get("p_crash_reset", self.p_crash) + self.p_crash_reset = config.get("p_crash_reset", 0.0) self.crash_after_n_steps = config.get("crash_after_n_steps") # Only crash (with prob=p_crash) if on certain worker indices. faulty_indices = config.get("crash_on_worker_indices", None) @@ -41,12 +41,16 @@ def __init__(self, config=None): self.timesteps = 0 # Time in seconds to initialize (in this c'tor). - init_time_s = config.get("init_time_s", 0) + if "init_time_s" in config: + init_time_s = config.get("init_time_s", 0) + else: + init_time_s = np.random.randint( + config.get("init_time_s_min", 0), + config.get("init_time_s_max", 0), + ) + print(f"Initializing crashing env with init-delay of {init_time_s}sec ...") time.sleep(init_time_s) - # Time in seconds to re-initialize, while `reset()` is called after a crash. - self.re_init_time_s = config.get("re_init_time_s", 10) - # No env pre-checking? 
self._skip_env_checking = config.get("skip_env_checking", False) diff --git a/rllib/tuned_examples/pg/multi-agent-cartpole-crashing-restart-worker-pg.yaml b/rllib/tuned_examples/appo/multi-agent-cartpole-crashing-restart-env-appo.yaml similarity index 53% rename from rllib/tuned_examples/pg/multi-agent-cartpole-crashing-restart-worker-pg.yaml rename to rllib/tuned_examples/appo/multi-agent-cartpole-crashing-restart-env-appo.yaml index 87d772221a26e..8bf1e5b83c0f2 100644 --- a/rllib/tuned_examples/pg/multi-agent-cartpole-crashing-restart-worker-pg.yaml +++ b/rllib/tuned_examples/appo/multi-agent-cartpole-crashing-restart-env-appo.yaml @@ -1,6 +1,6 @@ -multi-agent-cartpole-crashing-pg: +multi-agent-cartpole-crashing-appo: env: ray.rllib.examples.env.cartpole_crashing.MultiAgentCartPoleCrashing - run: PG + run: APPO stop: evaluation/episode_reward_mean: 300.0 num_env_steps_sampled: 300000 @@ -11,32 +11,43 @@ multi-agent-cartpole-crashing-pg: env_config: config: num_agents: 2 - # Crash roughly every 300 ts. This should be ok to measure 300+ + # Crash roughly every n ts. This should be ok to measure 300+ # reward (episodes are 200 ts long). - p_crash: 0.00025 # prob to crash during step() - p_crash_reset: 0.01 # prob to crash during reset() + p_crash: 0.001 # prob to crash during step() + p_crash_reset: 0.005 # prob to crash during reset() # Time for the env to initialize when newly created. # Every time a remote sub-environment crashes, a new env is created # in its place and will take this long (sleep) to "initialize". - init_time_s_min: 5.0 - init_time_s_max: 600.0 + init_time_s: 10.0 horizon: 200 - num_workers: 10 + + num_workers: 5 # Disable env checking. Env checker doesn't handle Exceptions from # user envs, and will crash rollout worker. disable_env_checking: true # Switch on resiliency for failed sub environments (within a vectorized stack). - recreate_failed_workers: true + restart_failed_sub_environments: true # Switch on evaluation workers being managed by AsyncRequestsManager object. - evaluation_with_async_requests: true + enable_async_evaluation: true - evaluation_num_workers: 10 + evaluation_num_workers: 5 evaluation_interval: 1 evaluation_duration: 20 evaluation_duration_unit: episodes evaluation_parallel_to_training: true evaluation_config: explore: false + env_config: + config: + num_agents: 2 + # Crash roughly every n ts. + p_crash: 0.0001 # prob to crash during step() + p_crash_reset: 0.001 #1 # prob to crash during reset() + # Time for the env to initialize when newly created. + # Every time a remote sub-environment crashes, a new env is created + # in its place and will take this long (sleep) to "initialize". + init_time_s_min: 1.0 + init_time_s_max: 30.0 From b4ab3540f4774a05533eb9a5bfa3355df80a56b5 Mon Sep 17 00:00:00 2001 From: sven1977 Date: Sat, 6 Aug 2022 15:45:49 +0200 Subject: [PATCH 07/15] wip --- rllib/algorithms/tests/test_worker_failures.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rllib/algorithms/tests/test_worker_failures.py b/rllib/algorithms/tests/test_worker_failures.py index a609e162f029e..e248083598b3a 100644 --- a/rllib/algorithms/tests/test_worker_failures.py +++ b/rllib/algorithms/tests/test_worker_failures.py @@ -336,7 +336,7 @@ def test_recreate_eval_workers_parallel_to_training_w_async_req_manager(self): # Test the case where all eval workers fail, but we chose to recover. 
config = PGConfig()\ .evaluation( - evaluation_with_async_requests=True, + enable_async_evaluation=True, evaluation_parallel_to_training=True, evaluation_duration="auto", )\ From 93e68ad73091ab888c231eb600fed63ce8d592b9 Mon Sep 17 00:00:00 2001 From: sven1977 Date: Wed, 10 Aug 2022 09:15:46 +0200 Subject: [PATCH 08/15] wip Signed-off-by: sven1977 --- rllib/examples/env/cartpole_crashing.py | 2 +- ...nt-cartpole-crashing-restart-env-appo.yaml | 24 +++++++++---------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/rllib/examples/env/cartpole_crashing.py b/rllib/examples/env/cartpole_crashing.py index 1cd881ee25a12..deac54cd8f8fd 100644 --- a/rllib/examples/env/cartpole_crashing.py +++ b/rllib/examples/env/cartpole_crashing.py @@ -46,7 +46,7 @@ def __init__(self, config=None): else: init_time_s = np.random.randint( config.get("init_time_s_min", 0), - config.get("init_time_s_max", 0), + config.get("init_time_s_max", 1), ) print(f"Initializing crashing env with init-delay of {init_time_s}sec ...") time.sleep(init_time_s) diff --git a/rllib/tuned_examples/appo/multi-agent-cartpole-crashing-restart-env-appo.yaml b/rllib/tuned_examples/appo/multi-agent-cartpole-crashing-restart-env-appo.yaml index 8bf1e5b83c0f2..9733f52261b64 100644 --- a/rllib/tuned_examples/appo/multi-agent-cartpole-crashing-restart-env-appo.yaml +++ b/rllib/tuned_examples/appo/multi-agent-cartpole-crashing-restart-env-appo.yaml @@ -3,7 +3,6 @@ multi-agent-cartpole-crashing-appo: run: APPO stop: evaluation/episode_reward_mean: 300.0 - num_env_steps_sampled: 300000 config: # Works for both torch and tf. framework: tf @@ -18,7 +17,8 @@ multi-agent-cartpole-crashing-appo: # Time for the env to initialize when newly created. # Every time a remote sub-environment crashes, a new env is created # in its place and will take this long (sleep) to "initialize". - init_time_s: 10.0 + init_time_s_min: 10.0 + init_time_s_max: 11.0 horizon: 200 num_workers: 5 @@ -41,13 +41,13 @@ multi-agent-cartpole-crashing-appo: evaluation_config: explore: false env_config: - config: - num_agents: 2 - # Crash roughly every n ts. - p_crash: 0.0001 # prob to crash during step() - p_crash_reset: 0.001 #1 # prob to crash during reset() - # Time for the env to initialize when newly created. - # Every time a remote sub-environment crashes, a new env is created - # in its place and will take this long (sleep) to "initialize". - init_time_s_min: 1.0 - init_time_s_max: 30.0 + config: + num_agents: 2 + # Crash roughly every n ts. + p_crash: 0.002 # prob to crash during step() + p_crash_reset: 0.02 # prob to crash during reset() + # Time for the env to initialize when newly created. + # Every time a remote sub-environment crashes, a new env is created + # in its place and will take this long (sleep) to "initialize". 
+ init_time_s_min: 1.0 + init_time_s_max: 30.0 From 73aa45ad2e594fadf0c94eaa089aabc8b1bfb78e Mon Sep 17 00:00:00 2001 From: sven1977 Date: Thu, 11 Aug 2022 11:36:20 +0200 Subject: [PATCH 09/15] wip Signed-off-by: sven1977 --- rllib/BUILD | 4 ++-- rllib/algorithms/algorithm.py | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/rllib/BUILD b/rllib/BUILD index 76dcba770e0a7..7b0b2456928be 100644 --- a/rllib/BUILD +++ b/rllib/BUILD @@ -502,12 +502,12 @@ py_test( ) py_test( - name = "learning_tests_multi_agent_cartpole_crashing_pg", + name = "learning_tests_multi_agent_cartpole_crashing_restart_sub_envs_pg", main = "tests/run_regression_tests.py", tags = ["team:rllib", "exclusive", "learning_tests", "learning_tests_cartpole", "learning_tests_discrete"], size = "large", srcs = ["tests/run_regression_tests.py"], - data = ["tuned_examples/pg/multi-agent-cartpole-crashing-pg.yaml"], + data = ["tuned_examples/pg/multi-agent-cartpole-crashing-restart-sub-envs-pg.yaml"], args = ["--yaml-dir=tuned_examples/pg"] ) diff --git a/rllib/algorithms/algorithm.py b/rllib/algorithms/algorithm.py index 1b9e8be0df6f8..60fc37e921626 100644 --- a/rllib/algorithms/algorithm.py +++ b/rllib/algorithms/algorithm.py @@ -2627,6 +2627,8 @@ def _run_one_evaluation( else self.evaluate ) + num_recreated = 0 + try: if self.config["evaluation_duration"] == "auto": assert ( @@ -2657,7 +2659,6 @@ def _run_one_evaluation( "recreate_failed_workers" ), ) - eval_results["evaluation"]["num_recreated_workers"] = num_recreated # Add number of healthy evaluation workers after this iteration. eval_results["evaluation"]["num_healthy_workers"] = ( @@ -2665,6 +2666,7 @@ def _run_one_evaluation( if self.evaluation_workers is not None else 0 ) + eval_results["evaluation"]["num_recreated_workers"] = num_recreated return eval_results From 0690a361246631bf604fe7a2941c61c9bf1abc28 Mon Sep 17 00:00:00 2001 From: sven1977 Date: Thu, 11 Aug 2022 12:59:37 +0200 Subject: [PATCH 10/15] LINT Signed-off-by: sven1977 --- rllib/algorithms/algorithm.py | 46 ++++++++----------- .../algorithms/tests/test_worker_failures.py | 6 ++- 2 files changed, 24 insertions(+), 28 deletions(-) diff --git a/rllib/algorithms/algorithm.py b/rllib/algorithms/algorithm.py index 60fc37e921626..1d7f2a8311b0c 100644 --- a/rllib/algorithms/algorithm.py +++ b/rllib/algorithms/algorithm.py @@ -987,9 +987,7 @@ def _evaluate_async( self._sync_filters_if_needed( from_worker=self.workers.local_worker(), workers=self.evaluation_workers, - timeout_seconds=eval_cfg.get( - "sync_filters_on_rollout_workers_timeout_s" - ), + timeout_seconds=eval_cfg.get("sync_filters_on_rollout_workers_timeout_s"), ) if self.config["custom_eval_function"]: @@ -997,12 +995,9 @@ def _evaluate_async( "`custom_eval_function` not supported in combination " "with `enable_async_evaluation=True` config setting!" ) - if ( - self.evaluation_workers is None - and ( - self.workers.local_worker().input_reader is None - or self.config["evaluation_num_workers"] == 0 - ) + if self.evaluation_workers is None and ( + self.workers.local_worker().input_reader is None + or self.config["evaluation_num_workers"] == 0 ): raise ValueError( "Local evaluation OR evaluation without input reader OR evaluation " @@ -1062,12 +1057,9 @@ def remote_fn(worker, w_ref, w_seq_no): # from a previous evaluation step) OR if we have already reached # the configured duration (e.g. number of episodes to evaluate # for). 
- if ( - seq_no == self._evaluation_weights_seq_number - and ( - i * (1 if unit == "episodes" else rollout * num_envs) - < units_left_to_do - ) + if seq_no == self._evaluation_weights_seq_number and ( + i * (1 if unit == "episodes" else rollout * num_envs) + < units_left_to_do ): batches.append(batch) rollout_metrics.extend(metrics) @@ -1090,9 +1082,8 @@ def remote_fn(worker, w_ref, w_seq_no): assert np.sum(batch[SampleBatch.DONES]) # n timesteps per returned batch. else: - num_units_done += ( - _agent_steps if self._by_agent_steps else _env_steps - ) + num_units_done += _agent_steps if self._by_agent_steps else _env_steps + if self.reward_estimators: all_batches.extend(batches) @@ -2416,8 +2407,8 @@ def try_recover_from_step_attempt(self, error, worker_set, ignore, recreate) -> self._evaluation_async_req_manager is not None and worker_set is getattr(self, "evaluation_workers", None) ): - self._evaluation_async_req_manager.remove_workers(removed_workers) - self._evaluation_async_req_manager.add_workers(new_workers) + self._evaluation_async_req_manager.remove_workers(removed_workers) + self._evaluation_async_req_manager.add_workers(new_workers) return len(new_workers) @@ -2615,15 +2606,18 @@ def _run_one_evaluation( Returns: The results dict from the evaluation call. """ - eval_results = {"evaluation": { - "episode_reward_max": np.nan, - "episode_reward_min": np.nan, - "episode_reward_mean": np.nan, - }} + eval_results = { + "evaluation": { + "episode_reward_max": np.nan, + "episode_reward_min": np.nan, + "episode_reward_mean": np.nan, + } + } eval_results["evaluation"]["num_recreated_workers"] = 0 eval_func_to_use = ( - self._evaluate_async if self.config["enable_async_evaluation"] + self._evaluate_async + if self.config["enable_async_evaluation"] else self.evaluate ) diff --git a/rllib/algorithms/tests/test_worker_failures.py b/rllib/algorithms/tests/test_worker_failures.py index e248083598b3a..e4cb48100ac4c 100644 --- a/rllib/algorithms/tests/test_worker_failures.py +++ b/rllib/algorithms/tests/test_worker_failures.py @@ -334,13 +334,15 @@ def test_eval_workers_failing_ignore(self): def test_recreate_eval_workers_parallel_to_training_w_async_req_manager(self): # Test the case where all eval workers fail, but we chose to recover. 
- config = PGConfig()\ + config = ( + PGConfig() .evaluation( enable_async_evaluation=True, evaluation_parallel_to_training=True, evaluation_duration="auto", - )\ + ) .training(model={"fcnet_hiddens": [4]}) + ) self._do_test_fault_fatal_but_recreate( "PG", From f68662471741787d7a2e21a0a66fe4487b4530e2 Mon Sep 17 00:00:00 2001 From: sven1977 Date: Thu, 11 Aug 2022 18:50:30 +0200 Subject: [PATCH 11/15] LINT Signed-off-by: sven1977 --- rllib/algorithms/algorithm.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/rllib/algorithms/algorithm.py b/rllib/algorithms/algorithm.py index 1d7f2a8311b0c..661e09e6e42e9 100644 --- a/rllib/algorithms/algorithm.py +++ b/rllib/algorithms/algorithm.py @@ -2403,9 +2403,8 @@ def try_recover_from_step_attempt(self, error, worker_set, ignore, recreate) -> self.train_exec_impl = self.execution_plan( worker_set, self.config, **self._kwargs_for_execution_plan() ) - elif ( - self._evaluation_async_req_manager is not None - and worker_set is getattr(self, "evaluation_workers", None) + elif self._evaluation_async_req_manager is not None and worker_set is getattr( + self, "evaluation_workers", None ): self._evaluation_async_req_manager.remove_workers(removed_workers) self._evaluation_async_req_manager.add_workers(new_workers) From c416332c8b45bbda439b4dbeed8c2585e48ef941 Mon Sep 17 00:00:00 2001 From: sven1977 Date: Fri, 12 Aug 2022 15:44:16 +0200 Subject: [PATCH 12/15] wip Signed-off-by: sven1977 --- rllib/algorithms/algorithm.py | 6 ++-- .../algorithms/tests/test_worker_failures.py | 34 +++++++------------ 2 files changed, 16 insertions(+), 24 deletions(-) diff --git a/rllib/algorithms/algorithm.py b/rllib/algorithms/algorithm.py index 661e09e6e42e9..7bc3f71a04b9c 100644 --- a/rllib/algorithms/algorithm.py +++ b/rllib/algorithms/algorithm.py @@ -1000,8 +1000,10 @@ def _evaluate_async( or self.config["evaluation_num_workers"] == 0 ): raise ValueError( - "Local evaluation OR evaluation without input reader OR evaluation " - "with only a local eval worker not supported in combination " + "Evaluation w/o eval workers (calling Algorithm.evaluate() w/o " + "evaluation specifically set up) OR evaluation without input reader " + "OR evaluation with only a local evaluation worker " + "(`evaluation_num_workers=0`) not supported in combination " "with `enable_async_evaluation=True` config setting!" ) diff --git a/rllib/algorithms/tests/test_worker_failures.py b/rllib/algorithms/tests/test_worker_failures.py index e4cb48100ac4c..95d44cb4ff922 100644 --- a/rllib/algorithms/tests/test_worker_failures.py +++ b/rllib/algorithms/tests/test_worker_failures.py @@ -216,27 +216,21 @@ def _do_test_fault_fatal(self, alg, config, fail_eval=False): self.assertRaises(Exception, lambda: a.train()) a.stop() - def _do_test_fault_fatal_but_recreate(self, alg, config, eval_only=False): + def _do_test_fault_fatal_but_recreate(self, alg, config): register_env("fault_env", lambda c: FaultInjectEnv(c)) agent_cls = get_algorithm_class(alg) # Test raises real error when out of workers. - if not eval_only: - config["num_workers"] = 2 - config["recreate_failed_workers"] = True - # Make both worker idx=1 and 2 fail. - config["env_config"] = {"bad_indices": [1, 2]} - else: - config["num_workers"] = 1 - config["evaluation_num_workers"] = 1 - config["evaluation_interval"] = 1 - config["evaluation_config"] = { - "recreate_failed_workers": True, - # Make eval worker (index 1) fail. 
- "env_config": { - "bad_indices": [1], - }, - } + config["num_workers"] = 1 + config["evaluation_num_workers"] = 1 + config["evaluation_interval"] = 1 + config["evaluation_config"] = { + "recreate_failed_workers": True, + # Make eval worker (index 1) fail. + "env_config": { + "bad_indices": [1], + }, + } for _ in framework_iterator(config, frameworks=("tf", "tf2", "torch")): a = agent_cls(config=config, env="fault_env") @@ -344,11 +338,7 @@ def test_recreate_eval_workers_parallel_to_training_w_async_req_manager(self): .training(model={"fcnet_hiddens": [4]}) ) - self._do_test_fault_fatal_but_recreate( - "PG", - config=config.to_dict(), - eval_only=True, - ) + self._do_test_fault_fatal_but_recreate("PG", config=config.to_dict()) def test_eval_workers_failing_fatal(self): # Test the case where all eval workers fail (w/o recovery). From 2290679575c0b63a5895436eb4d204e7b1a4ded4 Mon Sep 17 00:00:00 2001 From: sven1977 Date: Fri, 12 Aug 2022 15:45:55 +0200 Subject: [PATCH 13/15] LINT Signed-off-by: sven1977 --- rllib/algorithms/algorithm.py | 1 + .../algorithms/tests/test_worker_failures.py | 25 +++---------------- 2 files changed, 5 insertions(+), 21 deletions(-) diff --git a/rllib/algorithms/algorithm.py b/rllib/algorithms/algorithm.py index 7bc3f71a04b9c..179b99cfe7c5b 100644 --- a/rllib/algorithms/algorithm.py +++ b/rllib/algorithms/algorithm.py @@ -1017,6 +1017,7 @@ def _evaluate_async( # Default done-function returns True, whenever num episodes # have been completed. if duration_fn is None: + def duration_fn(num_units_done): return duration - num_units_done diff --git a/rllib/algorithms/tests/test_worker_failures.py b/rllib/algorithms/tests/test_worker_failures.py index 837a0adaa0a14..fabddd9fca822 100644 --- a/rllib/algorithms/tests/test_worker_failures.py +++ b/rllib/algorithms/tests/test_worker_failures.py @@ -247,29 +247,12 @@ def _do_test_fault_fatal_but_recreate(self, alg, config): ) ) result = a.train() - if not eval_only: - self.assertTrue(result["num_healthy_workers"] == 2) - self.assertTrue( - all( - ray.get( - worker.apply.remote( - lambda w: w.recreated_worker - and w.env_context.recreated_worker - ) - ) - for worker in a.workers.remote_workers() - ) - ) - else: - self.assertTrue(result["num_healthy_workers"] == 1) - self.assertTrue(result["evaluation"]["num_healthy_workers"] == 1) + self.assertTrue(result["num_healthy_workers"] == 1) + self.assertTrue(result["evaluation"]["num_healthy_workers"] == 1) # This should also work several times. result = a.train() - if not eval_only: - self.assertTrue(result["num_healthy_workers"] == 2) - else: - self.assertTrue(result["num_healthy_workers"] == 1) - self.assertTrue(result["evaluation"]["num_healthy_workers"] == 1) + self.assertTrue(result["num_healthy_workers"] == 1) + self.assertTrue(result["evaluation"]["num_healthy_workers"] == 1) a.stop() def test_fatal(self): From a22b15740537cd3ef2c71d75b7c33c0a697d174d Mon Sep 17 00:00:00 2001 From: Avnish Date: Mon, 15 Aug 2022 15:28:24 -0700 Subject: [PATCH 14/15] Address lint failure Signed-off-by: Avnish --- rllib/execution/parallel_requests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rllib/execution/parallel_requests.py b/rllib/execution/parallel_requests.py index 3bb0d12e66006..db1c465a6aad4 100644 --- a/rllib/execution/parallel_requests.py +++ b/rllib/execution/parallel_requests.py @@ -13,7 +13,7 @@ class AsyncRequestsManager: Args: workers: A list of ray remote workers to operate on. 
These workers must have an - `apply` method which takes a function and a list of arguments to that + ``apply`` method which takes a function and a list of arguments to that function. max_remote_requests_in_flight_per_worker: The maximum number of remote requests that can be in flight per actor. Any requests made to the pool From 64eebfd43e4326691e1c428bd888bb717fcbfa9f Mon Sep 17 00:00:00 2001 From: sven1977 Date: Tue, 16 Aug 2022 09:52:26 +0200 Subject: [PATCH 15/15] LINT Signed-off-by: sven1977 --- rllib/execution/parallel_requests.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rllib/execution/parallel_requests.py b/rllib/execution/parallel_requests.py index db1c465a6aad4..0aa0d4327248c 100644 --- a/rllib/execution/parallel_requests.py +++ b/rllib/execution/parallel_requests.py @@ -80,7 +80,7 @@ def call( Args: remote_fn: The remote function to call. The function must have a signature - of: [RolloutWorker, *args, **kwargs] and return Any. + of: [RolloutWorker, args, kwargs] and return Any. actor: The actor to call the remote function on. fn_args: The arguments to pass to the remote function. fn_kwargs: The keyword arguments to pass to the remote function. @@ -145,7 +145,7 @@ def call_on_all_available( Args: remote_fn: The remote function to call. The function must have a signature - of: [RolloutWorker, *args, **kwargs] and return Any. + of: [RolloutWorker, args, kwargs] and return Any. fn_args: The arguments to pass to the remote function. fn_kwargs: The keyword arguments to pass to the remote function.
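
Taken together, this series adds an `enable_async_evaluation` setting that routes evaluation `sample()` calls through an `AsyncRequestsManager` (via the new `Algorithm._evaluate_async()` path), so that slow or crashing evaluation workers no longer block or abort the training iteration. The following is a minimal usage sketch, not part of the patches themselves; the import path and `PGConfig` builder methods are assumed to match the RLlib version these commits target, and any env name or worker count shown here is illustrative only.

    # Minimal sketch (assumed API of the targeted RLlib version, not taken
    # verbatim from the patches): enable the new async evaluation path.
    from ray.rllib.algorithms.pg import PGConfig

    config = (
        PGConfig()
        .environment(env="CartPole-v1")  # illustrative env choice
        .evaluation(
            evaluation_num_workers=2,
            evaluation_interval=1,
            # Run evaluation in parallel to training and let it last exactly
            # as long as the training step ("auto" duration).
            evaluation_parallel_to_training=True,
            evaluation_duration="auto",
            # New flag from this series: send eval `sample()` requests through
            # an AsyncRequestsManager so hanging/failing eval workers don't
            # stall the iteration.
            enable_async_evaluation=True,
        )
    )

    algo = config.build()
    results = algo.train()
    # Eval metrics are always attached (possibly as NaN on early iterations).
    print(results["evaluation"]["episode_reward_mean"])
    algo.stop()

This mirrors the configuration exercised by the new `test_recreate_eval_workers_parallel_to_training_w_async_req_manager` test above, which combines `enable_async_evaluation=True` with `evaluation_parallel_to_training=True` and `evaluation_duration="auto"` to hit the `_evaluate_async()` code path.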