
[RLlib] Make RolloutWorkers (optionally) recoverable after failure. #23739

Merged
merged 10 commits into ray-project:master on Apr 8, 2022

Conversation

sven1977 (Contributor, Author) commented Apr 6, 2022

Make RolloutWorkers (optionally) recoverable after failure.

  • A new config key recreate_failed_workers can be set to True (False by default) to attempt re-creating failed RolloutWorkers (see the config sketch below). Recreated workers have their (new) self.recreated_worker property set to True, but are otherwise identical to the failed, terminated, and replaced ones.
  • The ignore_worker_failures test case was moved into the agents dir, renamed test_worker_failures.py, and augmented with a recreate_failed_workers test case.
  • EnvContext objects now also carry a recreated_worker flag that indicates whether the env is run by a recreated RolloutWorker (i.e. one created after a failure, rather than an original one).

Related to (but not a full solution for) issue #23652.
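
For illustration, enabling the new behavior could look roughly like the following sketch (the PPO trainer, env name, and worker count are placeholders, not part of this PR):

    # Minimal sketch: turn on worker recreation via the new config key.
    from ray.rllib.agents.ppo import PPOTrainer

    config = {
        "env": "CartPole-v1",      # placeholder env
        "num_workers": 2,
        # New in this PR: attempt to re-create RolloutWorkers that fail
        # (False by default).
        "recreate_failed_workers": True,
    }
    trainer = PPOTrainer(config=config)
    # Recreated workers report self.recreated_worker == True, and their
    # EnvContext carries a corresponding `recreated_worker` flag.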

Why are these changes needed?

Related issue number

Checks

  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

gjoliver (Member) left a comment

A few comments. This is really exciting.
Now we just need to figure out what we do if the head/trainer node goes down. :)

# failed one in its `self.recreated_worker=True` property value. It will have
# the same `worker_index` as the original one.
# If True, the `ignore_worker_failures` setting will be ignored.
"recreate_failed_workers": False,
gjoliver (Member)

Do you think we should set this default to True and deprecate ignore_worker_failures?
I can't think of why anyone would not want this.

sven1977 (Contributor, Author)

Hmm, yeah, the danger with that is that if there is a real problem with one or more workers (e.g. the env is broken) and they keep crashing, RLlib will hopelessly try to restart them over and over again. That particular problem has not been fixed.
Maybe we can add an extra key, e.g. failed_worker_recreation_num_retries?

gjoliver (Member)

Normally, I'd say it's OK to keep restarting.
It's monitoring's job to alert users that their job has been consistently restarting.
Since there may be a lot of workers, I feel like you'd need to set the default pretty high anyway, e.g. if they are running on spot instances.

gjoliver (Member)

Just to clarify a bit more: I think an absolute number is not a good indicator. A job that runs for two weeks may accumulate a lot of restarts; what we essentially want to detect is many restarts in a short period of time, which is why it's better handled by monitoring.
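
To illustrate the restarts-per-time-window idea (as opposed to an absolute retry count), a monitoring-style check might look roughly like this; the class and thresholds are purely hypothetical and not part of RLlib:

    import time
    from collections import deque

    # Hypothetical helper: flag "too many restarts" only if more than
    # `max_restarts` happened within the last `window_s` seconds.
    class RestartRateTracker:
        def __init__(self, max_restarts=5, window_s=600.0):
            self.max_restarts = max_restarts
            self.window_s = window_s
            self.restart_times = deque()

        def record_restart(self):
            now = time.time()
            self.restart_times.append(now)
            # Drop restarts that have fallen out of the time window.
            while self.restart_times and now - self.restart_times[0] > self.window_s:
                self.restart_times.popleft()

        def too_many_restarts(self):
            return len(self.restart_times) > self.max_restarts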

sven1977 (Contributor, Author)

Yeah, makes sense! Let's do this in a follow-up PR, though.
Then we can also discuss whether we should soft-deprecate ignore_worker_failures or just leave its functionality as-is.

gjoliver (Member)

I think we should soft-deprecate ignore_worker_failures.

logger.info("Health checking all workers ...")
checks = []
for worker in self.remote_workers():
_, obj_ref = worker.sample_with_count.remote()
gjoliver (Member)

I understand this is how we have been doing this, so maybe just a TODO suggestion:
Usually there is a separate health-check function that performs the actual health checks (able to respond to probes, all required data available, etc.), and it is usually very lightweight, so it doesn't impact the actual workload.
I don't know whether getting an actual SampleBatch of data and then dropping it is the best kind of health probe (it could be really expensive), hence this suggestion.

sven1977 (Contributor, Author)

Hmm, I actually think this is fine. sample() (or sample_with_count()) is the most important method to run, and we only perform this check if we get any Ray errors during the training iterations.

In other words, "able to respond to probes, all required data available, etc." is exactly what sample() does. I agree we should probably check the returned SampleBatch data, though.

gjoliver (Member)

OK, ultimately your call. I guess we only probe when there is a worker failure, so it doesn't happen often and should be fine. If it were a periodic probe, we should probably make it much lighter.

sven1977 (Contributor, Author)

Agreed. Then again, I'm not sure what to use (in today's RolloutWorker) that would serve as a probing tool other than sample(). I'll add a TODO.
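
For reference, a lighter probe along the lines of this TODO could look roughly like the sketch below; the ping() actor method and the timeout are assumptions (today's RolloutWorker has no such method, which is exactly the gap noted above):

    import ray
    from ray.exceptions import RayError

    # Hypothetical lightweight health check: call a cheap remote method on each
    # worker instead of drawing a full SampleBatch. `ping` is an assumed no-op
    # actor method, not an existing RolloutWorker API.
    def probe_workers(remote_workers, timeout_s=10.0):
        faulty_indices = []
        for i, worker in enumerate(remote_workers):
            obj_ref = worker.ping.remote()
            ready, _ = ray.wait([obj_ref], timeout=timeout_s)
            if not ready:
                # No response within the timeout.
                faulty_indices.append(i + 1)
                continue
            try:
                ray.get(ready[0])  # Surfaces a RayError if the actor died.
            except RayError:
                faulty_indices.append(i + 1)
        return faulty_indices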

sven1977 (Contributor, Author)

Done.

env_creator=self._env_creator,
validate_env=None,
policy_cls=self._policy_class,
worker_index=i + 1,
num_workers=num_workers,
worker_index=old_num_workers + i + 1,
gjoliver (Member)

I don't think you want to +1 here?
If there are 5 old_num_workers, you'd want the next worker_index to be 5.

sven1977 (Contributor, Author)

Nope, 6 is correct: index 0 is reserved for the local worker, so remote workers start at index 1. i here runs from 0 upward.
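
A tiny worked example of that indexing (numbers chosen for illustration only):

    # Index 0 is the local worker; remote workers occupy indices 1..num_workers.
    old_num_workers = 5              # remote workers 1..5 already exist
    for i in range(2):               # re-create two workers
        worker_index = old_num_workers + i + 1
        print(worker_index)          # prints 6, then 7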

gjoliver (Member)

Got it.

for worker_index in reversed(faulty_indices):
del self._remote_workers[worker_index - 1]
# TODO: Should we also change each healthy worker's num_workers counter and
# worker_index property?
gjoliver (Member)

Why don't you raise the RuntimeError above with a safer check like

if not self._remote_workers:
    raise RuntimeError("Not enough healthy workers remain to continue.")

?

sven1977 (Contributor, Author)

Done.

logger.info("Worker {} looks healthy.".format(i + 1))
except RayError:
logger.exception("Worker {} is faulty.".format(i + 1))
faulty_worker_indices.append(i + 1)
gjoliver (Member)

Actually, why do you +1 here and then -1 when you use the indices in remove_failed_workers and recreate_failed_workers?

sven1977 (Contributor, Author)

i starts from 0 (due to how the for loop is executed with range), but remote worker indices start from 1.
We could make the for loop start from 1, though.

gjoliver (Member)

Hmm, OK.
I think you are only probing remote_workers() here, so it's the 0th remote worker, the 1st remote worker, etc.
And when you recreate them, you are also going through the 0th entry in self._remote_workers, the 1st entry, etc.,
so they seem to match exactly.

sven1977 (Contributor, Author)

Yup, but still: remote workers' indices start from 1, not 0.

Here, we only use i for:

  • printing out which remote worker indices are bad (OK)
  • returning the list of faulty remote worker indices (which start from 1) (OK)

In the function that calls this method, we also interpret the return values as remote worker indices (starting from 1), hence the worker = self.remote_workers()[worker_index - 1].

I really think it's all OK :)
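
A small self-contained sketch of that convention (plain strings stand in for the actor handles):

    # `remote_workers` is a 0-based Python list, but RLlib's remote worker
    # indices start at 1 (index 0 is the local worker).
    remote_workers = ["w1", "w2", "w3", "w4", "w5"]
    faulty_worker_indices = []
    for i, worker in enumerate(remote_workers):
        if worker in ("w2", "w5"):                  # pretend these two failed
            faulty_worker_indices.append(i + 1)     # list position 1 -> index 2, etc.
    print(faulty_worker_indices)                    # [2, 5]

    # A caller translates back to list positions with `worker_index - 1`:
    for worker_index in faulty_worker_indices:
        assert remote_workers[worker_index - 1] == "w%d" % worker_index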

gjoliver (Member)

I see, this is conceptually more correct.

gjoliver (Member) left a comment

OK, a few nits left. Awesome change.

logger.info("Health checking all workers ...")
checks = []
for worker in self.remote_workers():
_, obj_ref = worker.sample_with_count.remote()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, ultimately your call. I guess we only probe when there is a worker failure, so it doesn't happen often and should be ok. if it's a periodic probe, we probably should make this much lighter.

logger.info("Worker {} looks healthy.".format(i + 1))
except RayError:
logger.exception("Worker {} is faulty.".format(i + 1))
faulty_worker_indices.append(i + 1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, ok.
I think you are only probing remote_workers() here, so it's 0th remote worker, 1th remote worker, etc.
and when you recreate them, you are also going through 0th entry in self.remote_worker, 1th entry, etc ...
so they seem to match exactly.

env_creator=self._env_creator,
validate_env=None,
policy_cls=self._policy_class,
worker_index=i + 1,
num_workers=num_workers,
worker_index=old_num_workers + i + 1,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get it.

# failed one in its `self.recreated_worker=True` property value. It will have
# the same `worker_index` as the original one.
# If True, the `ignore_worker_failures` setting will be ignored.
"recreate_failed_workers": False,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

normally, I'd say it's ok to keep restarting.
it's monitoring's job to alert the users that their job has been consistently restarting.
since there may be a lot of workers, I feel like you need to set the default pretty high anyways, if they are running on spot for example.

sven1977 merged commit c82f6c6 into ray-project:master on Apr 8, 2022