-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Work stealing #10607
Work stealing #10607
Conversation
… the CoreWorkerDirectTaskReceiver mutex private
…rt_test. builds on mac
} | ||
|
||
// Set the "stolen" bool flag to true | ||
reverse_it->second = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of storing a flag on the executor side, can we store the callback, then delete the task/reply to the owner immediately? That would help to make sure that we reply to the owner as quickly as possible.
absl::MutexLock lock(&mu_); | ||
|
||
size_t half = non_actor_task_queue_.size() / 2; | ||
RAY_CHECK(half >= 0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this assertion is meaningful (size_t is always non-negative).
reverse_it->second = true; | ||
|
||
// Add the task's TaskSpecification to the StealWork RPC reply | ||
reply->add_tasks_stolen()->CopyFrom(reverse_it->first.GetMessage()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Carrying on the conversation from the previous PR:
Currently, I don't think the owner keeps a {TaskId -> TaskSpecification} mapping for the tasks that are in-flight to a worker. In fact, right after calling PushNormalTask to push a task to a worker, OnWorkerIdle pops the task's TaskSpec from the relevant owner's queue. So once we push a task, the owner will not see the task's TaskSpec until the worker responds to the owner and activates the PushNormalTask callback (which captures the TaskSpec).
I think you can get the task spec from the owner's TaskManager, which caches the specs for all tasks that are still running. This may actually be relevant for performance because we can save bandwidth by not sending the full task spec back to the owner.
@@ -523,6 +542,14 @@ class CoreWorkerDirectTaskReceiver { | |||
/// Queue of pending requests per actor handle. | |||
/// TODO(ekl) GC these queues once the handle is no longer active. | |||
std::unordered_map<WorkerID, SchedulingQueue> scheduling_queue_; | |||
/// The Worker ID of the worker running this task receiver | |||
WorkerID this_worker_id_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should try not to add fields that are only used for debugging purposes. I think you can get the worker ID from other parts of the same log).
@@ -320,6 +320,31 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask( | |||
return; | |||
} | |||
|
|||
if (!task_spec.IsActorTask() && !task_spec.IsActorCreationTask()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm I'm not sure if these handlers are guaranteed to execute in the same order that they were posted in. I think it will work for now since there is only one thread executing the event loop, but that could be a problem with multiple threads since then the queue may not match the order of callbacks. It may be safer to store the original send_reply_callback
onto the queue first (basically the same code that was already in HandlePushTask), then execute from the queue in this handler.
|
||
/// Enable stealing of non-actor tasks among workers that are associated with the same | ||
/// owner | ||
RAY_CONFIG(bool, work_stealing_enabled, false) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to remove this flag? That is, we always enable work stealing, but it doesn't actually kick in unless max_tasks_in_flight is > 0?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure!
if (victim_addr.worker_id == thief_addr.worker_id || | ||
((candidate_lease_entry.stealable_tasks.size() > | ||
victim_lease_entry.stealable_tasks.size()) && | ||
candidate_addr.worker_id != thief_addr.worker_id)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm I'm not really sure if I understand this if condition! Won't this find the worker with the minimum stealable tasks? I thought that we would want the worker with the maximum number. Also, I don't quite understand this part: victim_addr.worker_id == thief_addr.worker_id
.
It would be good to add some comments here to explain the criteria for the victim worker.
} | ||
rpc::WorkerAddress victim_addr = *victim_it; | ||
|
||
// Check that the victim is a suitable one |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that we could simplify these checks to just checking if the victim has more than one task in flight. The thief should have 0 so that will also make sure the worker doesn't steal from itself.
auto res = thief_entry.stealable_tasks.emplace(task_spec.TaskId()); | ||
RAY_CHECK(res.second); | ||
executing_tasks_.emplace(task_spec.TaskId(), thief_addr); | ||
PushNormalTask(thief_addr, client, scheduling_key, task_spec, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about reusing some of the existing code, to cut down on the additional logic? You could push the stolen tasks back onto the main queue, then call OnWorkerIdle
for the thief worker, right? That would also help to make sure that we never exceed the maximum tasks in flight.
worker_to_lease_entry_.erase(addr); | ||
} | ||
|
||
void CoreWorkerDirectTaskSubmitter::StealWorkIfNeeded( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is pretty long! I left some suggestions on how to simplify it, but if it's still ~50+ lines, we should think about other ways to pull out parts of the logic.
…nto cancel_queued_tasks
Increase debugability of test cancel
Continuing this PR with new PR #13570 ! |
Why are these changes needed?
These changes are needed to allow workers to steal non-actor tasks from other workers that are overloaded. In particular, if a worker is done with its own work, and there are no more tasks in the owner's task queue, the owner looks for a suitable victim, and if one is found, it initiates work stealing on behalf of the idle worker. In addition, when the work stealing mode is enabled,
RequestNewWorkerIfNeeded
is instructed to request new workers not only if there are more tasks in the owner's queue (and the pipelines to the current workers are all full), but also if there are stealable tasks at any of the existing workers. This approach is called "Eager Worker Requesting Mode".Work Stealing is only available among workers that share the same owner, and that are working on tasks with the same
SchedulingKey
.Related issue number
This PR supersedes the (now closed) PR #10135. It also includes the changes introduced with PR #10225 (Keeping pipelines full), now merged into master.
Checks
scripts/format.sh
to lint the changes in this PR.