[core] Fix idempotency issues in RequestWorkerLease for scheduled leases #58265
Conversation
Signed-off-by: joshlee <joshlee@anyscale.com>
Code Review
This pull request effectively addresses an idempotency issue in RequestWorkerLease by allowing multiple callbacks to be stored for a single lease request, which is a robust way to handle retries from transient network errors. The changes are well-integrated across the scheduling components, and the new unit test provides good validation for the fix. My review includes a couple of suggestions to refine the StoreReplyCallback implementations by using find() instead of operator[] on maps to prevent unintended side effects and improve efficiency. Overall, this is a solid improvement to the scheduler's reliability.
for (const auto &work : leases_to_schedule_[scheduling_class]) {
  if (work->lease_.GetLeaseSpecification().LeaseId() == lease_id) {
    work->reply_callbacks_.emplace_back(std::move(send_reply_callback), reply);
    return;
  }
}
for (const auto &work : infeasible_leases_[scheduling_class]) {
  if (work->lease_.GetLeaseSpecification().LeaseId() == lease_id) {
    work->reply_callbacks_.emplace_back(std::move(send_reply_callback), reply);
    return;
  }
}
Using operator[] on leases_to_schedule_ and infeasible_leases_ will create a new empty std::deque if the scheduling_class is not found. This is inefficient and can lead to the map being populated with empty entries. It's better to use find() to check for the key's existence before accessing the deque.
auto it = leases_to_schedule_.find(scheduling_class);
if (it != leases_to_schedule_.end()) {
  for (const auto &work : it->second) {
    if (work->lease_.GetLeaseSpecification().LeaseId() == lease_id) {
      work->reply_callbacks_.emplace_back(std::move(send_reply_callback), reply);
      return;
    }
  }
}
auto infeasible_it = infeasible_leases_.find(scheduling_class);
if (infeasible_it != infeasible_leases_.end()) {
  for (const auto &work : infeasible_it->second) {
    if (work->lease_.GetLeaseSpecification().LeaseId() == lease_id) {
      work->reply_callbacks_.emplace_back(std::move(send_reply_callback), reply);
      return;
    }
  }
}

for (const auto &work : leases_to_grant_[scheduling_class]) {
  if (work->lease_.GetLeaseSpecification().LeaseId() == lease_id) {
    work->reply_callbacks_.emplace_back(std::move(send_reply_callback), reply);
    return;
  }
}
Using operator[] on leases_to_grant_ will create a new empty std::deque if the scheduling_class is not found. This is inefficient and can lead to the map being populated with empty entries. It's better to use find() to check for the key's existence before accessing the deque.
auto leases_to_grant_it = leases_to_grant_.find(scheduling_class);
if (leases_to_grant_it != leases_to_grant_.end()) {
  for (const auto &work : leases_to_grant_it->second) {
    if (work->lease_.GetLeaseSpecification().LeaseId() == lease_id) {
      work->reply_callbacks_.emplace_back(std::move(send_reply_callback), reply);
      return;
    }
  }
}

  return false;
}

void LocalLeaseManager::StoreReplyCallback(const SchedulingClass &scheduling_class,
Was considering whether I should combine IsLeaseQueued and StoreReplyCallback, but felt it was clearer to keep them separate.
agree
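For readers following the thread, here is a minimal self-contained sketch of the two-call shape being discussed, keeping the query (IsLeaseQueued) separate from the mutation (StoreReplyCallback). The Work struct, the ReplyCallback alias, and the single leases_to_grant_ map are simplified stand-ins for illustration, not the real Ray types.

#include <cstdint>
#include <deque>
#include <functional>
#include <unordered_map>
#include <utility>
#include <vector>

// Simplified stand-ins: the real manager tracks several queues (waiting,
// to-schedule, infeasible, to-grant) keyed by scheduling class.
using LeaseID = uint64_t;
using SchedulingClass = int;
using ReplyCallback = std::function<void()>;

struct Work {
  LeaseID lease_id;
  std::vector<ReplyCallback> reply_callbacks_;
};

class LeaseQueueSketch {
 public:
  // Query: is a lease with this id still queued under this scheduling class?
  bool IsLeaseQueued(const SchedulingClass &cls, LeaseID lease_id) const {
    auto it = leases_to_grant_.find(cls);
    if (it == leases_to_grant_.end()) return false;
    for (const auto &work : it->second) {
      if (work.lease_id == lease_id) return true;
    }
    return false;
  }

  // Mutation: attach one more reply callback to the queued lease; a retry of
  // the same RequestWorkerLease RPC simply appends here.
  void StoreReplyCallback(const SchedulingClass &cls,
                          LeaseID lease_id,
                          ReplyCallback callback) {
    auto it = leases_to_grant_.find(cls);
    if (it == leases_to_grant_.end()) return;
    for (auto &work : it->second) {
      if (work.lease_id == lease_id) {
        work.reply_callbacks_.emplace_back(std::move(callback));
        return;
      }
    }
  }

 private:
  std::unordered_map<SchedulingClass, std::deque<Work>> leases_to_grant_;
};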
…otency-in-request-worker-lease
Signed-off-by: joshlee <joshlee@anyscale.com>
edoakes
left a comment
Summarizing to check my understanding:
- Previously, the local lease manager assumed that it would only ever get a single request to pull dependencies for a lease request.
- However, if the RPC is retried after we start to pull dependencies for the lease request, we end up re-requesting to pull the dependencies again.
- To address this, you are allowing duplicate requests and replying to all of them once the pull is complete. You are doing this instead of overwriting the stored callback because the retry could arrive before the initial request, in which case overwriting would leave us replying only to the initial request and the client would hang forever (sketched below).
Did I miss anything?
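A minimal sketch of the reply-to-all behavior described in this summary, reusing the simplified Work and ReplyCallback stand-ins from the sketch earlier in the thread; the grant path shown here is an assumption for illustration, not the merged code.

void OnLeaseGranted(Work &work) {
  // Every stored callback fires exactly once, so the original RPC and any
  // retries all receive a reply; none of the callers hangs.
  for (auto &callback : work.reply_callbacks_) {
    callback();
  }
  work.reply_callbacks_.clear();
}

Appending rather than overwriting is what protects against the reordering case: whichever copy of the request arrives first, both callbacks are still answered at grant time.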
for (const auto &reply_callback : reply_callbacks) {
  ::ray::rpc::ResourceMapEntry *resource;
  for (auto &resource_id : allocated_resources->ResourceIds()) {
Might be missing something, but it looks like these loops should be inverted -- nothing about the inner loop logic depends on which callback we are iterating through. So you can make a single pass through allocated_resources->ResourceIds() and populate all callbacks' resource mappings at once instead.
Makes sense, inverted the loops
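For illustration, a toy version of the inverted form, using simplified types rather than the real protobuf reply objects: a single pass over the allocated resources, copying each entry into every pending reply.

#include <string>
#include <utility>
#include <vector>

struct ToyReply {
  std::vector<std::pair<std::string, double>> resource_mapping;
};

// Outer loop over resources, inner loop over pending replies: the resource walk
// happens once instead of once per stored callback.
void PopulateReplies(const std::vector<std::pair<std::string, double>> &allocated,
                     std::vector<ToyReply *> &pending_replies) {
  for (const auto &entry : allocated) {
    for (ToyReply *reply : pending_replies) {
      reply->resource_mapping.push_back(entry);
    }
  }
}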
      (*it->second)->reply_callbacks_.emplace_back(std::move(send_reply_callback), reply);
      return;
    }
  }
real? ^
Nope, that pretty much summarizes it! The idempotency guards we have in place only come into action once the lease is granted, but we're vulnerable in the window between the lease-arrived and lease-granted stages, which includes the dependency-pulling stage.
Yea... I called StoreReplyCallback under the assumption it's used only after IsLeaseQueued, but that's not good. I'll do what the AI said.
Signed-off-by: joshlee <joshlee@anyscale.com>
Signed-off-by: joshlee <joshlee@anyscale.com>
edoakes
left a comment
LGTM, only stylistic comments. Ping for merge when ready.
    return true;
  }
  return false;
}
Bug: Lease Queue Race Condition
A race condition exists between IsLeaseQueued and AddReplyCallback due to their inconsistent search orders for leases. IsLeaseQueued checks waiting_leases_index_ then leases_to_grant_, while AddReplyCallback checks the reverse. This allows a lease to move between queues after IsLeaseQueued returns true, causing AddReplyCallback to fail and trigger a RAY_CHECK in HandleRequestWorkerLease, crashing the Raylet.
Additional Locations (1)
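One way to sidestep the inconsistency described above would be to fold the existence check into the callback-storing call, so both queues are searched once and in a single fixed order. This is only an illustrative sketch reusing the stand-in types from earlier in the thread, not necessarily how the PR resolves it.

// Returns true if the lease was found in either queue and the callback stored;
// the caller no longer needs a separate IsLeaseQueued check that could go stale
// while the lease moves between queues.
bool AddReplyCallbackIfQueued(
    std::unordered_map<SchedulingClass, std::deque<Work>> &waiting_leases,
    std::unordered_map<SchedulingClass, std::deque<Work>> &leases_to_grant,
    const SchedulingClass &cls,
    LeaseID lease_id,
    ReplyCallback callback) {
  for (auto *queue : {&waiting_leases, &leases_to_grant}) {
    auto it = queue->find(cls);
    if (it == queue->end()) continue;
    for (auto &work : it->second) {
      if (work.lease_id == lease_id) {
        work.reply_callbacks_.emplace_back(std::move(callback));
        return true;
      }
    }
  }
  return false;
}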
Description
Using the iptables script created in #58241 we found a bug in RequestWorkerLease where a RAY_CHECK was being triggered here:
ray/src/ray/raylet/lease_dependency_manager.cc, lines 222 to 223 at 66c08b4
The issue is that transient network errors can happen at ANY time, including while the server logic is executing and has not yet replied to the client. Our original testing framework used an env variable to drop the request or reply only at send time, hence this case was missed. Specifically, RequestWorkerLease could be in the middle of pulling the lease dependencies to its local plasma store when the retry arrives, triggering this check. Created a C++ unit test that triggers this RAY_CHECK without this change and passes with it. I decided to store the callbacks instead of replacing the older one with the new one because of possible message reordering, where the new request could arrive before the old one.
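To make the message-reordering argument concrete, here is a small usage example built on the toy Work and OnLeaseGranted sketches earlier in the thread (not the real RPC machinery): the retry's callback happens to be stored first, the original's second, and both are answered when the lease is granted, so neither caller hangs.

#include <iostream>

int main() {
  Work work{/*lease_id=*/42, {}};

  // Due to network reordering the retry arrives first, then the original.
  work.reply_callbacks_.emplace_back([] { std::cout << "reply to retry\n"; });
  work.reply_callbacks_.emplace_back([] { std::cout << "reply to original\n"; });

  // Granting the lease answers both outstanding RPCs; overwriting instead of
  // appending would have dropped one of them and left that client hanging.
  OnLeaseGranted(work);
  return 0;
}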