
[RLlib] Eval workers use async req manager. #27390

Merged

Conversation

@sven1977 sven1977 (Contributor) commented Aug 2, 2022

Adds a new config option: enable_evaluation_v2. This is an experimental setting.

If True:

  • Evaluation workers are organized inside an AsyncRequestsManager object, regardless of the eval settings (e.g. parallel eval and training, different eval durations, etc.).
  • This makes the eval step more robust against eval worker failures and/or environment-related delays and pauses (e.g. if an env has to restart or re-connect to some server, which may take hours).
  • The AsyncRequestsManager is queried during the evaluation step for results, and each returned result is checked for being up-to-date in terms of the weights used for that sample request. If the weights used for a sample request are outdated, the sample result is discarded (see the sketch below).
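To make the flow concrete, here is a self-contained toy sketch of that pattern (plain Python, with a thread pool standing in for remote eval workers; none of the names below are RLlib's actual API):

```python
# Toy version of: fire async sample requests, tag each with the weights
# sequence number in use, and discard results that come back with an
# outdated sequence number.
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def sample(worker_id, weights_seq_no):
    """Pretend to collect a rollout; returns (seq_no_used, num_timesteps)."""
    time.sleep(random.uniform(0.01, 0.05))  # simulate env stepping / network delays
    return weights_seq_no, 100


current_seq_no = 1  # bumped whenever new weights are broadcast to the eval workers
units_done = 0

with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(sample, i, current_seq_no) for i in range(4)]
    current_seq_no += 1  # weights get updated while some requests are still in flight
    futures += [pool.submit(sample, i, current_seq_no) for i in range(4)]

    for fut in as_completed(futures):
        seq_no, num_timesteps = fut.result()
        if seq_no != current_seq_no:
            continue  # stale sample: collected with outdated weights -> discard
        units_done += num_timesteps

print(f"Kept {units_done} timesteps that were sampled with the current weights.")
```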

Why are these changes needed?

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@avnishn avnishn (Contributor) left a comment:

In this design, Sven, is there a requirement to enforce a timeout on remote requests?

If so, we don't support that today, but in theory we could.
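For reference, a minimal sketch of how a per-request timeout could be layered on top of plain Ray object refs via ray.wait()'s timeout argument (illustrative only, not something this PR adds):

```python
import time

import ray

ray.init()


@ray.remote
def sample():
    # Stand-in for a slow eval worker (e.g. an env that has to reconnect).
    time.sleep(2.0)
    return "sample_batch"


ref = sample.remote()
# Wait at most 0.5s; requests that don't finish in time stay in `not_ready`.
ready, not_ready = ray.wait([ref], timeout=0.5)
if ready:
    print(ray.get(ready[0]))
else:
    print("Sample request timed out; drop it or poll for it again next round.")
```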

self._evaluation_async_req_manager is not None
and worker_set is getattr(self, "evaluation_workers", None)
):
self._evaluation_async_req_manager.remove_workers(removed_workers)
Contributor:

nice

@gjoliver gjoliver (Member) left a comment:

Pretty nice!! I feel proud that we have built utils to make this task easier.
There are a bunch of nits, and a couple of meaningful comments in evaluate_v2(); please take a look and see if they make sense.

@@ -293,6 +294,9 @@ def default_logger_creator(config):

# Evaluation WorkerSet and metrics last returned by `self.evaluate()`.
self.evaluation_workers: Optional[WorkerSet] = None
# If evaluation duration is "auto", use a AsyncRequestsManager to be more
Member:

update the comment? if enable_async_evaluation is True ...

Contributor (Author):

done

self.config.get("framework") in ["tf2", "tfe"]
and not tf.executing_eagerly()
):
tf1.enable_eager_execution()
Member:

I feel like this is still useful? if an eval worker is scheduled to a node that doesn't have eager turned on.

Contributor:

Resolved?

Contributor (Author):

Ah, thanks. Yeah, it's really not needed here. evaluate is never called from within a thread (yes, it could be run on a different node/process b/c it's on some remote eval worker, but never inside a thread).

@@ -551,6 +555,13 @@ def setup(self, config: PartialAlgorithmConfigDict):
logdir=self.logdir,
)

if self.config["evaluation_with_async_requests"]:
Member:

nit nit nit, just a naming suggestion: enable_async_evaluation may be more expressive.

Contributor (Author):

Yeah, but all our eval settings start with evaluation_...
But fair enough, now that we have config objects with type-safe property names and method args, this may not be so much of a problem anymore :)

Contributor (Author):

done

Contributor (Author):

We should make this the default as fast as possible, so users don't get confused about the difference between evaluation_parallel_to_training and enable_async_evaluation.

@@ -941,6 +943,199 @@ def duration_fn(num_units_done):
# Also return the results here for convenience.
return self.evaluation_metrics

@ExperimentalAPI
def _evaluate_v2(
Member:

nit nit nit, another naming suggestion, with the config name, we can then call this _async_evaluate(self)?

Contributor (Author):

Renamed to _evaluate_async().

We should make this the default as fast as possible, so users don't get confused about the difference between evaluation_parallel_to_training and enable_async_evaluation.

@@ -2359,6 +2560,15 @@ def _run_one_training_iteration(self) -> Tuple[ResultDict, "TrainIterCtx"]:
Returns:
The results dict from the training iteration.
"""
# In case we are training (in a thread) parallel to evaluation,
# we may have to re-enable eager mode here (gets disabled in the
# thread).
Member:

I think tf1.enable_eager_execution() throws an exception if it is called after any other tf operations.
Why do we disable it?

Contributor (Author):

It doesn't get disabled explicitly, but for some reason any new thread you create starts with "regular" (non-eager) tf, and we have to enable it. We do the same in the learner threads of IMPALA and APEX. Sorry, I never had the time to investigate why threads start this way. However, even if they started with eager enabled, we would still check and DISABLE it in case we don't want eager mode :) So it would be the same "hassle".
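A minimal sketch of the pattern being described here, under the assumption that tf1/tf come from RLlib's try_import_tf() helper (the thread body itself is illustrative):

```python
import threading

from ray.rllib.utils.framework import try_import_tf

tf1, tf, tfv = try_import_tf()


def learner_thread_fn(config):
    # New threads may come up in graph (non-eager) mode even when the algorithm
    # runs tf2 eagerly, so eager execution is re-enabled before any tf ops run.
    if config.get("framework") in ["tf2", "tfe"] and not tf.executing_eagerly():
        tf1.enable_eager_execution()
    # ... run the learner / evaluation loop ...


threading.Thread(target=learner_thread_fn, args=({"framework": "tf2"},)).start()
```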

# Evaluation does not run for every step.
# Save evaluation metrics on trainer, so it can be attached to
# subsequent step results as latest evaluation result.
self.evaluation_metrics = {"evaluation": metrics}
Member:

let's move this block into _run_one_evaluation()? then we don't have to duplicate this in both evaluate_v2() and evaluate()

Contributor (Author):

I think this won't work. What if the user wants to call evaluate manually?

time_started = time.time()
timed_out = True

while time.time() - time_started < self.config["evaluation_sample_timeout_s"]:
Member:

I think this should just be while True.
In this mode we won't be looking at evaluation_sample_timeout_s, which is the time we wait for a single round of eval episodes; in this mode, there is no concept of rounds.
And we don't have to track the timed_out variable either, actually.

Contributor (Author):

What if the episodes are too long (no horizon set on the evaluation config and batch_mode=complete_episodes)?
Fair, in that case we are screwed anyway.

Contributor (Author):

removed

timed_out = False
break

round_ += 1
Member:

rename to _round? I don't know if the style guide suggests a trailing _ for any type of variable.

Contributor (Author):

done

seq_no == self._evaluation_weights_seq_number
and (
i * (1 if unit == "episodes" else rollout_len * num_envs)
< units_left_to_do
Member:

do we really care about this? usually folks don't complain if we have a bit more data than intended? :)

Contributor (Author):

"usually" Yeah, but this is cleaner, no? :D

f"{unit} done)"
)

if timed_out and log_once("evaluation_timeout"):
Member:

we can get rid of this warning for async mode.

Contributor (Author):

done, no more timeouts in async mode

):
raise ValueError(
"Local evaluation OR evaluation without input reader OR evaluation "
"with only a local eval worker not supported in combination "
Contributor:

What's the difference between "Local evaluation" and "evaluation with only a local eval worker"?

Contributor (Author):

"Local evaluation" is when you use the "local worker" (of the regular WorkerSet (self.workers)) for evaluation.

Clarified the error message.

@@ -216,6 +216,68 @@ def _do_test_fault_fatal(self, alg, config, fail_eval=False):
self.assertRaises(Exception, lambda: a.train())
a.stop()

def _do_test_fault_fatal_but_recreate(self, alg, config, eval_only=False):
Contributor:

Is eval_only=False ever used?

Contributor (Author):

Good point: Removed the arg altogether.

weights = {pid: actual_weights[i] for i, pid in enumerate(weights.keys())}
# Only update our weights, if no seq no given OR given seq no is different
# from ours
if weights_seq_no is None or weights_seq_no != self.weights_seq_no:
Contributor:

Nit: If we have a sequence number, it would probably be expected behaviour to update if it increments.

Contributor (Author):

But that's what we do, no? Below, in the line

self.weights_seq_no = weights_seq_no

we even update if the passed-in seq-no is None, such that the next time a non-None seq-no is passed, we update again and, after that, have a non-None seq-no again. :)
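A compact restatement of that behavior as a toy class (not the actual RolloutWorker code):

```python
class ToyWorker:
    def __init__(self):
        self.weights = None
        self.weights_seq_no = None

    def set_weights(self, weights, weights_seq_no=None):
        # Update only if no seq-no was given OR it differs from the one we hold.
        if weights_seq_no is None or weights_seq_no != self.weights_seq_no:
            self.weights = weights
        # Always remember the incoming seq-no (possibly None), so the next call
        # that carries a real seq-no triggers an update again.
        self.weights_seq_no = weights_seq_no
```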

@ArturNiederfahrenhorst ArturNiederfahrenhorst (Contributor) left a comment:

I have a question regarding concurrent calls to recreate_failed_workers. Other than that: Nothing major.

@sven1977 sven1977 (Contributor, Author):

Hey @ArturNiederfahrenhorst, please take another look. All your questions have been addressed now. Thanks!

@ArturNiederfahrenhorst ArturNiederfahrenhorst (Contributor) left a comment:

lgtm

@richardliaw richardliaw (Contributor):

Lint still failing?

@sven1977 sven1977 merged commit 436c89b into ray-project:master Aug 16, 2022
@sven1977 sven1977 deleted the eval_workers_use_async_req_manager branch June 2, 2023 20:18