[WIP] Work Stealing and Eager Worker Requesting Mode #10135
Conversation
I didn't read through the direct_task_transport.cc code, since I left several suggestions to reduce the scope of this PR (remove the eager worker requesting) and to refactor in ways that will probably change the code significantly. We should try to keep the changes very simple and contained (right now there is a lot of logic that has to check whether work stealing is enabled, and it's a little hard to follow).
/// Whether the eager worker requesting mode should be enabled, so that
/// we can request new workers for the purpose of stealing work from overloaded
/// existing workers.
RAY_CONFIG(bool, work_stealing_and_eager_workers_requesting_enabled, false)
To keep this PR manageable, let's leave out eager worker requesting for now. This will be similar to before, where we only requested one worker at a time.
After discussing this offline, we are keeping work stealing and eager worker requesting together, but we are adding only one boolean variable (instead of two) to the config files.
@@ -289,6 +289,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
  std::unique_ptr<CoreWorkerDirectTaskReceiver>(new CoreWorkerDirectTaskReceiver(
      worker_context_, task_execution_service_, execute_task,
      [this] { return local_raylet_client_->TaskDone(); }));
  direct_task_receiver_->this_worker = worker_id;
You can also get the worker ID from the worker context that is already passed into the constructor: https://github.com/ray-project/ray/blob/master/src/ray/core_worker/context.h#L32
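A minimal sketch of that approach, assuming the linked WorkerContext exposes a getter along the lines of GetWorkerID() (the exact call is an assumption here, not code from the PR):

// Inside CoreWorkerDirectTaskReceiver, which already receives worker_context_
// in its constructor, so the extra public this_worker field would not be needed.
const WorkerID &self_id = worker_context_.GetWorkerID();
RAY_LOG(DEBUG) << "Work-stealing receiver running on worker " << self_id;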
absl::MutexLock lock(&direct_task_receiver_->mu_);
RAY_LOG(DEBUG) << "Received task " << task_spec.TaskId()
               << ". Adding it to the tasks_received_ queue!";
direct_task_receiver_->tasks_received_.emplace(task_spec.TaskId(), task_spec);
It's usually not a good idea to structure code so that one class has public access to another class's members. Also, we would like to use the mutex internal to direct_task_receiver_, not the one internal to the CoreWorker. Finally, we already have a queue implemented in the direct_task_receiver_, so we should try to use that one instead of adding another queue.
I'd suggest restructuring this code so that we queue the task in this section, and then attempt to pop tasks off of the queue in the callback that gets posted to the task_execution_service_ below. This will require refactoring CoreWorkerDirectTaskReceiver::HandlePushTask into two different methods: one to push onto the queue and another to process tasks from the queue. Right now, that method does both.
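To make the suggestion concrete, here is a small standalone sketch of the pattern (not Ray's actual classes; the names and the std::function plumbing are made up for illustration): one method only enqueues under the receiver's own mutex, while a second method, posted to the execution loop, drains the queue.

#include <functional>
#include <queue>
#include <utility>
#include "absl/synchronization/mutex.h"

class TaskReceiverSketch {
 public:
  // Called from the IO event loop: just enqueue and schedule processing.
  void HandlePush(std::function<void()> execute,
                  const std::function<void(std::function<void()>)> &post_to_exec_loop) {
    {
      absl::MutexLock lock(&mu_);  // The receiver's own mutex, not the CoreWorker's.
      queue_.push(std::move(execute));
    }
    post_to_exec_loop([this] { ProcessQueued(); });
  }

 private:
  // Called on the task execution loop: pop and run whatever is queued.
  // A StealWork handler could remove entries here before they run.
  void ProcessQueued() {
    for (;;) {
      std::function<void()> execute;
      {
        absl::MutexLock lock(&mu_);
        if (queue_.empty()) return;
        execute = std::move(queue_.front());
        queue_.pop();
      }
      execute();
    }
  }

  absl::Mutex mu_;
  std::queue<std::function<void()>> queue_;
};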
rpc::StealWorkReply *reply,
rpc::SendReplyCallback send_reply_callback) {
  RAY_LOG(DEBUG) << "Entering HandleStealWork!";
  // get maximum number of tasks to steal
Please use full sentences for all code comments (capitalization and punctuation) :)
{
  absl::MutexLock lock(&direct_task_receiver_->mu_);

  int half = direct_task_receiver_->tasks_received_.size() / 2;
Can you give some intuition for why we should try to steal half of the tasks?
Also, we should use a size_t here instead of an int (in case there are more tasks in the queue than the max int value).
Just made the change from int to size_t. When it comes to the choice of stealing half of the tasks (instead of a larger or smaller fraction of the total), the idea is that when a thief initiates stealing, it has no tasks in flight. So by stealing half of the tasks, we make sure that the thief and the victim end up with the same number of tasks in flight. Of course, if tasks are very short, by the time the thief receives the stolen tasks the victim might have fewer than half of the original workload left (because it has executed some in the meantime).
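A small illustration of both points, assuming the queue quoted above:

// size_t matches the container's size type and cannot overflow the way int can.
const size_t queued = direct_task_receiver_->tasks_received_.size();
// The thief starts with zero tasks in flight, so handing over half of the
// victim's queue roughly equalizes the two workers' loads.
const size_t tasks_to_steal = queued / 2;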
@@ -504,6 +504,10 @@ class CoreWorkerDirectTaskReceiver {
  void HandlePushTask(const rpc::PushTaskRequest &request, rpc::PushTaskReply *reply,
                      rpc::SendReplyCallback send_reply_callback);

  absl::Mutex mu_;
New fields should be private.
Also, please document the new fields with a comment, like the code below.
Is the new lock necessary because the CoreWorkerDirectTaskReceiver state is now modified from the IO event loop in addition to the task execution event loop? If so, it'd be good to write that down in the comment!
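For example, the new fields could be declared and documented along these lines (a sketch only; the comment restates the reasoning above rather than the PR's final wording):

 private:
  /// Protects tasks_received_. Needed because the receiver's state is now
  /// modified from the IO event loop (e.g. HandleStealWork) in addition to
  /// the task execution event loop.
  absl::Mutex mu_;
  /// Tasks received but not yet handed off to the task execution loop.
  absl::flat_hash_map<TaskID, TaskSpecification> tasks_received_;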
struct LeaseEntry {
  std::shared_ptr<WorkerLeaseInterface> lease_client_;
  int64_t lease_expiration_time_;
  uint32_t tasks_in_flight_;
  SchedulingKey current_scheduling_key_;
  absl::flat_hash_set<TaskID> stealable_tasks_;
Why do we need to keep track of the stealable tasks? Shouldn't that already be captured by the tasks_in_flight_?
The reason why we need to keep track of the number of stealable tasks as well as the number of tasks in flight (the former are a subset of the latter) is that the two quantities are often not equal, and using only one variable instead of two would cause some complications, inefficiencies, and perhaps errors.
When we steal a bunch of tasks from a worker, assuming that the reply to the thief's StealWork RPC arrives at the owner before the replies to the victim's PushNormalTask RPCs for the stolen tasks, we remove the stolen tasks from the stealable_tasks_ hash set, while the victim's tasks_in_flight_ value initially remains unchanged. This reflects the fact that the stolen tasks are still technically in flight to the victim. Knowing how many tasks are in flight to a worker (even if they have been stolen) is important, for example, to know when to return the worker to the Raylet. So even if the stolen tasks are not going to be executed by the victim anymore, it is still useful for us to know that they are still "in flight", in the sense that their respective RPCs have not completed yet.
On the other hand, knowing the number of tasks in flight does not, on its own, allow us to perform work stealing efficiently. If we rely on it to choose which worker to steal from (picking the one with more tasks in flight), we will frequently make a suboptimal choice, since the number of tasks actually up for grabs is often lower than the reported number of tasks in flight.
Having decided to simultaneously keep track of the number of stealable tasks as well as the number of tasks in flight to a worker, why use a flat_hash_set containing the TaskIDs of the stealable_tasks_, instead of just an integer? The reason is to avoid a race condition between a thief's StealWork RPC callback and the victim's PushNormalTask RPC callbacks. When the thief steals some tasks from a victim, the reply to the StealWork RPC usually comes back to the owner earlier than the replies to the PushNormalTask RPCs. However, this might not always be the case. If the PushNormalTask callbacks run first, they are the ones responsible for decrementing the number of stealable tasks (to maintain the invariant that stealable_tasks_.size() <= tasks_in_flight_). The challenge is that we must decrement the number of stealable tasks exactly once for each stolen task, and decrementing an integer is not an idempotent operation. Deleting a TaskID from a set, on the other hand, is idempotent, so we don't have to worry about finding a way to let the PushNormalTask and StealWork callbacks coordinate to decrement the counter only once.
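A standalone toy example of the idempotence argument (a plain integer stands in for TaskID; this is not Ray code):

#include <cstdint>
#include <iostream>
#include "absl/container/flat_hash_set.h"

int main() {
  using TaskID = uint64_t;  // Stand-in for Ray's TaskID type.
  absl::flat_hash_set<TaskID> stealable_tasks = {1, 2, 3, 4};

  const TaskID stolen = 3;
  // The StealWork reply callback removes the stolen task...
  stealable_tasks.erase(stolen);
  // ...and the later PushNormalTask callback for the same task is a no-op.
  stealable_tasks.erase(stolen);

  // With a plain integer counter, both callbacks would decrement and the
  // count would drift; with the set, the size stays correct.
  std::cout << stealable_tasks.size() << std::endl;  // Prints 3.
  return 0;
}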
struct LeaseEntry {
  std::shared_ptr<WorkerLeaseInterface> lease_client_;
  int64_t lease_expiration_time_;
  uint32_t tasks_in_flight_;
  SchedulingKey current_scheduling_key_;
Instead of adding the new mapping from worker -> scheduling key, I'd suggest refactoring the existing fields to map from scheduling key -> { task queue, pending lease request, pool of leased workers }. I don't think it's a big deal to iterate through all the leased workers for a particular scheduling key when we need to steal tasks, since this should be happening relatively infrequently and the number of workers per scheduling key should be low. However, I am a bit worried about having to iterate through all leased workers for common operations like checking whether we need to request a new worker. There's no bound on the number of total leased workers, since this is based on the scheduling keys requested by the application.
So, it would be best if we could keep the mapping from scheduling key -> leased workers, instead of the other way around. Also, there are already a couple different data structures that map from scheduling key -> x (task_queues_, pending_lease_requests_), so I think it makes sense to merge them now.
I think we could also use this data structure to do a fast check for whether another worker is needed, right? If I'm not mistaken, it should just be whether the total number of tasks, including both queued tasks and ones that have already been sent to a worker, is greater than the number of leased workers.
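A rough sketch of what that merged mapping could look like (field and type names are illustrative, not necessarily what the PR ends up with):

// Group all per-scheduling-key state into one entry, keyed by SchedulingKey.
struct SchedulingKeyEntry {
  // Tasks not yet pushed to any worker.
  std::deque<TaskSpecification> task_queue;
  // Whether a worker lease request for this key is already pending.
  bool pending_lease_request = false;
  // Workers currently leased for this scheduling key.
  std::vector<rpc::WorkerAddress> active_workers;
  // Tasks pushed to those workers and not yet finished (includes stealable ones).
  size_t total_tasks_in_flight = 0;

  // The fast check mentioned above: another worker is worth requesting if the
  // total outstanding work exceeds the number of leased workers.
  bool ShouldRequestNewWorker() const {
    return task_queue.size() + total_tasks_in_flight > active_workers.size();
  }
};

// Replaces the separate task_queues_ / pending_lease_requests_ maps.
absl::flat_hash_map<SchedulingKey, SchedulingKeyEntry> scheduling_key_entries_;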
}

message StealWorkReply {
  int64 number_of_tasks_stolen = 1;
I don't think we need this field since we can just check the size of tasks_stolen.
Makes sense! I removed it.
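With the counter removed, the count comes straight from the repeated field's generated protobuf accessor (the reply variable name here is illustrative):

// Number of stolen tasks, read from the repeated tasks_stolen field.
const int num_stolen = reply.tasks_stolen_size();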
RAY_LOG(DEBUG)
    << "Calling Steal Work RPC! Maximum number of tasks requested (to steal) = "
    << victim->second.tasks_in_flight_;
request->set_max_tasks_to_steal(victim->second.tasks_in_flight_);
This flag seems useful in theory since stealing can be expensive. However, I don't think it makes sense to set it to the total number of tasks in flight to the victim. The victim should already be basically limited to that number since its queue size should be <= the tasks in flight, right? (Assuming the owner doesn't send many more tasks to the victim in between this point and when the victim receives the steal request)
Could you also add a high-level description of the algorithm in the PR text? It would be nice to see a comparison of the flow diagram between the previous code and the new code, e.g., when are new workers requested? I also had a question about when workers are returned if there are no tasks left to steal. Maybe this is already covered, but I didn't read the code in direct_task_transport.cc very carefully :) For example, what happens in the following case?
When do we return worker B in this case?

message StealWorkReply {
  int64 number_of_tasks_stolen = 1;
  repeated TaskSpec tasks_stolen = 2;
Doesn't the owner already know which specs are on the worker? No need to send the full spec message back, can just send their indices.
Hmm yeah, good point. It's probably best to send back the task ID instead of the indices since the indices could get updated between when the worker replies and when the owner receives the reply, though.
Currently, I don't think the owner keeps a TaskId -> TaskSpecification mapping for the tasks that are in flight to a worker. In fact, right after calling PushNormalTask to push a task to a worker, OnWorkerIdle pops the task's TaskSpec from the relevant owner's queue. So once we push a task, the owner will not see the task's TaskSpec again until the worker responds to the owner and activates the PushNormalTask callback (which captures the TaskSpec).
For this reason, when we perform work stealing, the only way for the owner to access the stolen tasks' TaskSpecs is for the victim to send them back along with the StealWork RPC reply.
The worker will eventually also respond to each of the stolen tasks' original PushNormalTask RPCs, so an alternative approach would be to let those PushNormalTask callbacks take care of pushing the stolen tasks to the thief. However, this approach would be less efficient, because the replies would only start coming back after the worker has finished executing the non-stolen tasks that were pushed to its queue before the stolen ones.
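For what it's worth, a rough sketch of the owner-side handling this implies (the callback shape and variable names are hypothetical, not taken from the PR):

// The reply must carry full TaskSpecs, since the owner popped them in
// OnWorkerIdle when the tasks were first pushed to the victim.
auto steal_callback = [this](const Status &status, const rpc::StealWorkReply &reply) {
  for (const auto &spec_msg : reply.tasks_stolen()) {
    TaskSpecification stolen_task(spec_msg);
    // Hand the recovered spec back to the scheduling logic, e.g. re-queue it
    // so it is pushed to the thief the next time that worker is idle.
  }
};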
  int64 number_of_tasks_stolen = 1;
  repeated TaskSpec tasks_stolen = 2;
}

message PushTaskRequest {
To take full advantage of vectorization, don't we need to push a list of tasks to the worker instead of one per RPC?
We're still getting some improvement from gRPC batching, but yeah, long-term we should consider sending multiple tasks to a worker in a single RPC. Right now it's a little tricky to do that without adding latency for other tasks, because the return values of the task are also sent back in the reply.
Makes sense. You could do something like best effort vectorization of replies (send replies back along a separate reply path). This is probably something that would make sense once we have gRPC streaming support.
@gabrieleoliaro, let me know when this is ready for another look! :)
Sure! In the situation you mention here, worker B's
Closing this PR, superseded by PR #10607
Why are these changes needed?
These changes enable work stealing among workers that are associated with the same owner. They also introduce a new 'eager' worker requesting mode, which allows us to request new workers even when the task queues at a given owner are already empty. The purpose of these new workers is to steal tasks from other, overloaded workers.
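For illustration, the single flag quoted near the top of this thread would gate both behaviors with one check; the accessor shown follows the usual RAY_CONFIG naming convention and is an assumption, not code from the PR:

if (RayConfig::instance().work_stealing_and_eager_workers_requesting_enabled()) {
  // Eagerly request an extra worker for this scheduling key so it can steal
  // from overloaded workers, even if the local task queue is currently empty.
}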
Related issue number
Checks
I've run scripts/format.sh to lint the changes in this PR.