
[Jobs] Fix race condition in supervisor actor creation and add timeout for pending jobs #34223

Merged
12 commits merged on Apr 12, 2023

Conversation

architkulkarni (Contributor)

Why are these changes needed?

@rkooo567 and @sihanwang41 found a race condition when submitting a job that causes the job to fail. The failure happens when the following sequence of events occurs:

  1. A job is submitted. Its job_info is put into the internal KV before the JobSupervisor is actually created.
  2. In the constructor of JobManager, we call await self._recover_running_jobs(), which finds the job_info in the internal KV and starts to monitor that job. Because the JobSupervisor actor doesn't exist yet, the JobManager job monitoring loop fails to ping it, and marks the job's status as FAILED in the internal KV.
  3. The JobSupervisor is created. JobSupervisor.run() checks that the status is PENDING, but it's not, so it raises the error "run should only be called once" which is not helpful to the user.

If step 2 happens before step 1, there's no issue. But these are both async, so the order isn't guaranteed.

The solution in this PR is to allow the JobManager monitoring loop to handle the PENDING case by skipping the ping to the JobSupervisor actor for that iteration of the loop; a rough sketch of the idea follows.
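
A rough, hedged sketch of the per-iteration logic (attribute and method names such as _get_actor_for_job, ping, and JOB_MONITOR_LOOP_PERIOD_S follow names used elsewhere in this thread, but the block is illustrative rather than the exact merged diff):

    # One iteration of the JobManager monitoring loop for a single job_id.
    job_status = await self._job_info_client.get_status(job_id)
    if job_status == JobStatus.PENDING and job_supervisor is None:
        # The JobSupervisor actor may simply not have been created yet, so skip
        # the ping for this iteration instead of marking the job FAILED.
        await asyncio.sleep(self.JOB_MONITOR_LOOP_PERIOD_S)
        continue
    await job_supervisor.ping.remote()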

This PR adds a unit test that fails with #34190 (which forces the race condition).

This PR also adds a timeout to fail jobs that have been pending for 15 minutes, configurable via an environment variable (see the sketch below).
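
A hedged sketch of what the timeout check can look like (the environment variable name and the helper function below are assumptions for illustration, not necessarily the constants in the merged change):

    import os
    import time

    # 15 minutes by default, per the description above; overridable via env var.
    DEFAULT_JOB_START_TIMEOUT_SECONDS = 15 * 60

    def pending_job_timed_out(start_time_ms: int) -> bool:
        """Return True if a PENDING job has exceeded the configured start timeout."""
        timeout_s = float(
            os.environ.get(
                "RAY_JOB_START_TIMEOUT_SECONDS", DEFAULT_JOB_START_TIMEOUT_SECONDS
            )
        )
        # start_time is stored in milliseconds, hence the division by 1000.
        return time.time() - start_time_ms / 1000 > timeout_s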

Some questions are still open:

  • Why did this only start to fail recently? The only recent change is [Jobs] Fix race condition on submitting multiple jobs with the same id #33259, but it's not clear how this would matter in the case of a single job.
  • What is a reasonable default timeout for pending jobs, and should we even have one? It should be larger than the existing runtime_env setup timeout (10 minutes) in order to distinguish runtime env setup timeouts from other timeouts. Not sure if there are other existing timeouts that we should consider.

Related issue number

Closes #34172


Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

# If the job is still pending, we will set the status
# to FAILED.
job_info = await self._job_info_client.get_info(job_id)
timeout = float(

Contributor:
nit: extract the timeout computation out of the while loop.

DEFAULT_JOB_START_TIMEOUT_SECONDS,
)
)
if job_info.start_time in [None, 0]:

Contributor:
In what case is job_info.start_time in [None, 0]?

Contributor (Author):
It's overly defensive, to avoid losing the user's job if we hit some edge case or bug where start_time is unset or left at the default value. I'm not confident it makes sense to include it; let me know if you think it should be removed.

Contributor:
If start_time is never expected to be unset, I think it makes sense to make that attribute required and always positive in JobInfo.

Contributor (Author):
Removed it as discussed offline (the added complexity and maintenance burden isn't worth it; later we should enforce that the attribute is required and positive).
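
If that enforcement is added later, it could look roughly like the sketch below (illustrative only; JobInfo in the jobs code is a dataclass, but the field set and validation here are assumptions, not the current definition):

    from dataclasses import dataclass

    @dataclass
    class JobInfo:
        # Unix timestamp in milliseconds at which the job was submitted.
        start_time: int

        def __post_init__(self):
            # Enforce "required and always positive", as suggested above.
            if not isinstance(self.start_time, int) or self.start_time <= 0:
                raise ValueError(
                    f"start_time must be a positive millisecond timestamp, "
                    f"got {self.start_time!r}"
                )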

logger.error(err_msg)
continue

if job_supervisor is None:

Contributor:
We don't need to check JobStatus.PENDING (checked above)

else:
    if job_supervisor is None:
        job_supervisor = self._get_actor_for_job(job_id)
        if job_supervisor is None:
            logger.error(f"Failed to get job supervisor for job {job_id}")
            <put status job failed>
        await job_supervisor.ping.remote()

)
if time.time() - job_info.start_time / 1000 > timeout:
    err_msg = (
        "Job supervisor actor failed to start within "

Contributor:
Can you also include action items here? Most likely this happens because the job submission requires resources that aren't available? Can you add action items for that case? Is there any other case where this can happen?

Contributor (Author):
I don't think there's any other case where it can happen. Added a message.

f"Job {job_id} start time not found. "
f"Skipping job start timeout check."
)
if time.time() - job_info.start_time / 1000 > timeout:

Contributor:
It will break if start_time is None.


while is_alive:
    try:
        await job_supervisor.ping.remote()
        job_status = await self._job_info_client.get_status(job_id)
        if job_status == JobStatus.PENDING:

Contributor:
It's a bit hard to follow the flow. Can we just rewrite it this way?

I think we can assume status != PENDING once the job supervisor is created:

if PENDING
    check timeout 
else
    actor = get supervisor actor
    if not actor:
         fail
    ping
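
A slightly more concrete version of that suggested structure, as a hedged sketch (helper names such as _check_pending_timeout are hypothetical, and as noted further down the thread the author ultimately kept the ping even in the PENDING case):

    job_status = await self._job_info_client.get_status(job_id)
    if job_status == JobStatus.PENDING:
        # Supervisor may not exist yet; only enforce the pending-start timeout.
        await self._check_pending_timeout(job_id)  # hypothetical helper
    else:
        if job_supervisor is None:
            job_supervisor = self._get_actor_for_job(job_id)
        if job_supervisor is None:
            # Supervisor should exist by now but doesn't: mark the job FAILED.
            await self._job_info_client.put_status(job_id, JobStatus.FAILED)
        else:
            await job_supervisor.ping.remote()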
     

@rkooo567 (Contributor):

Btw, @sihanwang41 I remember you mentioned a different solution last time (getting job info before running recovery monitor). What's the reason why we decided to change the approach?

@rkooo567 rkooo567 added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Apr 11, 2023
@sihanwang41 (Contributor):

> Btw, @sihanwang41 I remember you mentioned a different solution last time (getting job info before running recovery monitor). What's the reason why we decided to change the approach?

Archit mentioned that getting job info is not very efficient; if there are thousands of jobs, it could cause a long hang for the end user. We can improve it later.

@architkulkarni architkulkarni removed the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Apr 11, 2023
@architkulkarni (Contributor, Author):

@rkooo567 Comments addressed, please take another look. Also, let me know if you think the other approach (making "get all jobs" synchronous) is still better. I made a guess about the tradeoffs, but you might have more experience with gRPC latency and issues arising from the agent hanging.

@architkulkarni architkulkarni added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Apr 11, 2023
@architkulkarni (Contributor, Author):

I had to revert the if-else refactor you both suggested, because in the case where job_supervisor is provided, we always need to try to ping the actor in order to pick up errors like RuntimeEnvSetupError or ActorUnschedulableError as soon as possible, even if the current status is PENDING. (In this case, we transition the status to FAILED.)
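
Roughly, the loop body ended up shaped like this hedged sketch (helper names are reused from this thread or hypothetical, not the literal merged code):

    if job_supervisor is None:
        job_supervisor = self._get_actor_for_job(job_id)

    if job_supervisor is not None:
        # Always ping when we have a handle, even while the status is PENDING,
        # so errors like RuntimeEnvSetupError or ActorUnschedulableError are
        # surfaced immediately and the job can be transitioned to FAILED.
        await job_supervisor.ping.remote()
    elif job_status == JobStatus.PENDING:
        # No supervisor yet: tolerate it until the configurable start timeout.
        await self._check_pending_timeout(job_id)  # hypothetical helper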

@architkulkarni architkulkarni removed the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Apr 11, 2023
@architkulkarni (Contributor, Author):

Doc build broken on master, unrelated

@architkulkarni architkulkarni added the tests-ok The tagger certifies test failures are unrelated and assumes personal liability. label Apr 11, 2023
@rkooo567 rkooo567 merged commit 74325ef into ray-project:master Apr 12, 2023
@rkooo567 (Contributor):

cc @architkulkarni please make sure to cherry-pick.

architkulkarni added a commit to architkulkarni/ray that referenced this pull request Apr 12, 2023
[Jobs] Fix race condition in supervisor actor creation and add timeout for pending jobs (ray-project#34223)

clarng pushed a commit that referenced this pull request Apr 12, 2023
[Jobs] Fix race condition in supervisor actor creation and add timeout for pending jobs (#34223) (#34318)

elliottower pushed a commit to elliottower/ray that referenced this pull request Apr 22, 2023
[Jobs] Fix race condition in supervisor actor creation and add timeout for pending jobs (ray-project#34223)

ProjectsByJackHe pushed a commit to ProjectsByJackHe/ray that referenced this pull request May 4, 2023
[Jobs] Fix race condition in supervisor actor creation and add timeout for pending jobs (ray-project#34223)

edoakes added a commit that referenced this pull request Oct 17, 2023
There are multiple race conditions between `JobManager.submit_job` and `JobManager._recover_running_jobs` because the latter runs as an async task on the event loop. This can cause a duplicate monitoring task to be started for a job before the actor is even spawned. This has caused multiple issues:

- #34223
- #40062

This PR adds synchronization so `submit_job` will only run after `_recover_running_jobs` finishes (note that `_recover_running_jobs` is async and will exit after a single GCS read RPC, so this should have no practical impact).
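
A minimal sketch of that synchronization, assuming an asyncio.Event (the actual change may use a different primitive; the surrounding JobManager details are elided):

    import asyncio

    class JobManager:
        def __init__(self):
            self._recovery_done = asyncio.Event()
            # Assumes the constructor runs on the dashboard event loop.
            asyncio.create_task(self._recover_running_jobs())

        async def _recover_running_jobs(self):
            ...  # single GCS read to find existing jobs, then spawn monitors
            self._recovery_done.set()

        async def submit_job(self, entrypoint: str) -> str:
            # Don't write new job_info or start a monitoring task until recovery
            # has scanned existing jobs, avoiding duplicate monitors for one job.
            await self._recovery_done.wait()
            ...
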
Labels
tests-ok The tagger certifies test failures are unrelated and assumes personal liability.
Development

Successfully merging this pull request may close these issues.

[Job] Ray Job fail to start because of Job supervisor actor creation race condition.
4 participants