
Keeping pipelines full #10225

Merged: 19 commits into ray-project:master on Aug 31, 2020

Conversation

@goliaro (Contributor) commented Aug 20, 2020

Why are these changes needed?

These changes are needed to avoid over-requesting new workers when pipelining is used to submit tasks from owners to their workers. When a new task is submitted to an owner, the code first tries to send the task to an existing worker whose number of in-flight tasks is below the maximum allowed by the pipelining settings. Only if the pipelines to all existing workers are full does it request a new worker.
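
As a rough illustration of the decision rule described above (an editorial sketch, not code from this PR; the parameter names mirror identifiers discussed later in the thread):

#include <cstddef>
#include <cstdint>

// Toy model: decide whether a newly submitted task needs a new worker lease,
// or whether it can ride an existing worker's pipeline.
bool NeedNewWorker(uint32_t total_tasks_in_flight, size_t num_active_workers,
                   uint32_t max_tasks_in_flight_per_worker) {
  // Pipelines still have room as long as the total in-flight count is below
  // the combined capacity of all currently leased workers.
  return total_tasks_in_flight >=
         num_active_workers * max_tasks_in_flight_per_worker;
}

For example, with max_tasks_in_flight_per_worker = 10 and two leased workers, a new worker is requested only once 20 tasks are already in flight.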

Related issue number

Checks

  • [x] I've run scripts/format.sh to lint the changes in this PR.
  • [x] I've included any doc changes needed for https://docs.ray.io/en/latest/.
  • [x] I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failure rates at https://ray-travis-tracker.herokuapp.com/.
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested (please justify below)

@stephanie-wang stephanie-wang self-assigned this Aug 20, 2020
// (1) how many worker leases have been granted to execute tasks with
// the current SchedulingKey
// (2) how many tasks are in flight to all the workers from (1)
absl::flat_hash_map<SchedulingKey, std::pair<uint32_t, uint32_t>> worker_info_
Contributor:

Discussed offline about refactoring this class to squash all of the fields that are currently related to SchedulingKey into one hashmap. The value should be a new struct (a rough sketch follows this list) that includes:

  • these new counts
  • the workers that are currently leased to us with that scheduling key (worker_to_lease_entry_)
  • whether there is a pending lease request for that key (pending_lease_requests_)
  • the tasks queued with that key (task_queues_)
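
A minimal sketch of what such a struct could look like (field names follow the snippets quoted later in this thread; the exact types are assumptions, not the final code):

// Sketch only: one entry per SchedulingKey, grouping state that was previously
// spread across several per-key hashmaps.
struct SchedulingKeyEntry {
  // Pending worker lease request to the raylet for this key, if any.
  std::pair<std::shared_ptr<WorkerLeaseInterface>, TaskID> pending_lease_request =
      std::make_pair(nullptr, TaskID::Nil());
  // Tasks queued for this key that have not yet been sent to a worker.
  std::deque<TaskSpecification> task_queue;
  // Workers currently leased to us for this key.
  absl::flat_hash_set<rpc::WorkerAddress> active_workers;
  // Total number of tasks in flight to all workers in active_workers.
  uint32_t total_tasks_in_flight = 0;
};

// The submitter would then keep a single map keyed by SchedulingKey:
absl::flat_hash_map<SchedulingKey, SchedulingKeyEntry> scheduling_key_entries_
    GUARDED_BY(mu_);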

@stephanie-wang (Contributor):

I'll take another look once the refactoring and unit tests are done!

@stephanie-wang stephanie-wang added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Aug 20, 2020
it->second.push_back(task_spec);
auto &it = scheduling_key_entries_[scheduling_key];
it.task_queue_.push_back(task_spec);
RAY_CHECK(it.task_queue_.size() >= 1);
Contributor:

This check doesn't seem necessary.

// We don't have any of this type of task to run.
return;
}

// If pipelining is enabled, check whether we really need a new worker or whether we have
// enough room in an existing worker's pipeline to send the new tasks
if (max_tasks_in_flight_per_worker_ > 1) {
Contributor:

I don't think we need this check because it should be okay to run the following code even when max_tasks_in_flight = 1.

// enough room in an existing worker's pipeline to send the new tasks
if (max_tasks_in_flight_per_worker_ > 1) {
if (scheduling_key_entry.tot_tasks_in_flight <
scheduling_key_entry.active_workers_.size() * max_tasks_in_flight_per_worker_) {
Contributor:

I'm not sure about putting this logic in RequestNewWorkerIfNeeded. It seems a bit strange to be submitting tasks in this method (before this change, the logic only had to do with requesting new workers, not with task submission). It also seems a bit brittle because OnWorkerIdle, called below, calls back into RequestNewWorkerIfNeeded.

Instead, how about we only use the check to see whether we should request a new worker or not? Then, we should also move this new logic to submit tasks to already active workers to when the task is first queued in SubmitTask.

Contributor Author (@goliaro), Aug 25, 2020:

The reason why I added this part (calling OnWorkerIdle from within RequestNewWorkerIfNeeded) was to avoid introducing new latency due to the fact that OnWorkerIdle works in a pull (rather than push) fashion. OnWorkerIdle is normally only called when (1) we get a new worker, or (2) when we get a response from a worker (after the worker has completed the execution of a task). At that point, we pull tasks from the queue and submit them.

Now, consider a scenario with a set of active workers whose pipelines are not full, where we have just added a few more tasks to the owner's queue. Even though some pipelines have room, before we can submit the new tasks to an active worker we have to wait until that worker has responded to the owner (because OnWorkerIdle is not called until then). If we enqueue more tasks at the owner than the total number of available slots in the existing workers' pipelines, the system will fill those pipelines (and realize that it needs to request an additional worker) only after it has received a response from every one of the existing workers with non-full pipelines. As a result, the total number of active workers stays lower for a longer period of time, and the overall execution time suffers.

Contributor:

Yes, I understand that we should call OnWorkerIdle even while it is still executing other tasks. My comment was more about where we should call it. I think it is better to call it directly in SubmitTask right after we've added tasks to the queue. The reason is that: a) dispatching tasks doesn't match the current semantics of the method, which is supposed to only request a new worker, and b) it prevents the bad recursive structure where RequestNewWorkerIfNeeded calls OnWorkerIdle calls RequestNewWorkerIfNeeded, etc.

Contributor Author:

Oh I see! I think I misread your initial comment! Sorry about that. This makes sense. So I guess we would just move the code block

if (scheduling_key_entry.tot_tasks_in_flight <
      scheduling_key_entry.active_workers_.size() * max_tasks_in_flight_per_worker_) {
    // The pipelines to the current workers are not full yet, so we don't need more
    // workers.

    // Find a worker with a number of tasks in flight that is less than the maximum
    // value (max_tasks_in_flight_per_worker_) and call OnWorkerIdle to send tasks to
    // that worker
    for (auto active_worker_addr : scheduling_key_entry.active_workers_) {
      RAY_CHECK(worker_to_lease_entry_.find(active_worker_addr) !=
                worker_to_lease_entry_.end());
      auto &lease_entry = worker_to_lease_entry_[active_worker_addr];
      if (lease_entry.tasks_in_flight_ < max_tasks_in_flight_per_worker_) {
        OnWorkerIdle(active_worker_addr, scheduling_key, false,
                     lease_entry.assigned_resources_);
        break;
      }
    }

    return;
  } 

(without the return statement, of course) to the SubmitTask function. RequestNewWorkerIfNeeded would then only keep the following if statement?

if (scheduling_key_entry.tot_tasks_in_flight <
      scheduling_key_entry.active_workers_.size() * max_tasks_in_flight_per_worker_) {
    // The pipelines to the current workers are not full yet, so we don't need more
    // workers.

    return;
  } 

Contributor:

Yes, exactly!

Contributor Author:

Sounds good! Let me do that right now.

if (lease_entry.tasks_in_flight_ < max_tasks_in_flight_per_worker_) {
OnWorkerIdle(active_worker_addr, scheduling_key, false,
lease_entry.assigned_resources_);
break;
Contributor:

I'm not sure if we want to break here. Couldn't there be multiple idle workers that could get filled by new tasks? But I guess this depends on where we decide to put this logic (see above comment).

Contributor Author:

I see! So I guess we would remove the break statement if we put this in SubmitTask, right?

Contributor:

Hmm I need to think about that. It seems like we could structure the code so that we're always guaranteed that only one worker needs to be filled up during SubmitTask, but I'm not sure.

auto lease_client = std::move(pending_lease_request.first);
const auto task_id = pending_lease_request.second;
pending_lease_request = std::make_pair(nullptr, TaskID::Nil());
RAY_CHECK(lease_client);
Contributor:

I don't think this check is necessary (std::move should guarantee this already).

.emplace(scheduling_key, std::make_pair(lease_client, task_id))
.second);
pending_lease_request = std::make_pair(lease_client, task_id);
RAY_CHECK(pending_lease_request.first);
Contributor:

I don't think this check is necessary. The previous check was just to make sure that there wasn't already a pending lease request for the same scheduling key (arguably also not necessary).

@@ -399,6 +469,22 @@ Status CoreWorkerDirectTaskSubmitter::CancelTask(TaskSpecification task_spec,
cancel_retry_timer_->async_wait(boost::bind(
&CoreWorkerDirectTaskSubmitter::CancelTask, this, task_spec, force_kill));
}
} else if (status.ok() && reply.attempt_succeeded()) {
Contributor:

I don't think this is necessary because we should still get back the callback for PushNormalTask.

std::deque<TaskSpecification> task_queue_ = std::deque<TaskSpecification>();
// Keep track of the active workers, so that we can quickly check if one of them has
// room for more tasks in flight
absl::flat_hash_set<rpc::WorkerAddress> active_workers_ =
Contributor:

Consider making this a hashmap from rpc::WorkerAddress -> LeaseEntry instead of keeping a separate hashmap.

Contributor Author:

Doesn't worker_to_lease_entry_ already have this mapping? Do you mean that I should just move worker_to_lease_entry_ into the SchedulingKeyEntry struct, so that each SchedulingKey will be paired to its own worker_to_lease_entry_ hashmap?

@stephanie-wang (Contributor) commented Aug 25, 2020 via email

@stephanie-wang (Contributor) commented Aug 25, 2020 via email

@goliaro (Contributor Author) commented Aug 25, 2020

@stephanie-wang Ok, I just pushed the updated code :)

Co-authored-by: fangfengbin <869218239a@zju.edu.cn>
@stephanie-wang (Contributor) left a comment:

A few more comments :)

task_queues_.emplace(scheduling_key, std::deque<TaskSpecification>()).first;
auto &scheduling_key_entry = scheduling_key_entries_[scheduling_key];
scheduling_key_entry.task_queue_.push_back(task_spec);
if (scheduling_key_entry.tot_tasks_in_flight <
Contributor:

Maybe we should make this a while loop? And yeah, I think it's not correct to break after only one worker. Could you add this case to the unit test (i.e. check that we don't request a new worker if there are multiple workers that could be filled from the owner's queue). Thanks!

Contributor:

Hmm okay now that I'm thinking about this again, I think it is okay to have the break statement and only dispatch to one worker! Sorry about that :)

Could you add a comment explaining why it's okay to break after one worker?

Contributor Author:

Sounds good, let me add that right now!

task_queues_.erase(queue_entry);
RAY_LOG(DEBUG) << "Task queue empty, canceling lease request";
if (current_queue.empty()) {
RAY_LOG(INFO) << "Task queue empty, canceling lease request";
Contributor:

Shouldn't we still attempt to delete the entry here? It'd be good to add a method to the new SchedulingKeyEntry struct to check whether it's safe to delete the entry (i.e. if the task queue is empty, no pending request, etc).

Contributor Author (@goliaro), Aug 25, 2020:

I just added the new method! However, I'm not sure about attempting the deletion at this point in the code, because there would still be some worker in the active_workers set, so the new method would tell us that it's not yet safe to delete the entry.

Contributor:

Ahh gotcha, makes sense!

// The pipelines to the current workers are not full yet, so we don't need more
// workers.

// Find a worker with a number of tasks in flight that is less than the maximum
Contributor:

Could you remove this comment?

src/ray/core_worker/transport/direct_task_transport.cc (outdated, resolved)
@@ -377,6 +449,7 @@ Status CoreWorkerDirectTaskSubmitter::CancelTask(TaskSpecification task_spec,
return Status::OK();
}
client = maybe_client.value();
client_addr = rpc_client->second.ToProto();
Contributor:

I don't think we need this variable anymore.

absl::flat_hash_set<rpc::WorkerAddress> active_workers_ =
absl::flat_hash_set<rpc::WorkerAddress>();
// Keep track of how many tasks with this SchedulingKey are in flight, in total
uint32_t tot_tasks_in_flight = 0;
Contributor:

Suggested change:
- uint32_t tot_tasks_in_flight = 0;
+ uint32_t total_tasks_in_flight = 0;

We try to use complete words for variable naming in most cases.

pending_lease_requests_ GUARDED_BY(mu_);
struct SchedulingKeyEntry {
// Keep track of pending worker lease requests to the raylet.
std::pair<std::shared_ptr<WorkerLeaseInterface>, TaskID> pending_lease_request_ =
Contributor:

Could you remove the trailing underscore from these field names? We usually use it only for private class members (see the Google style guide). Realizing now that the LeaseEntry struct above doesn't follow this convention either, oops :)

Contributor Author (@goliaro), Aug 25, 2020:

Should I also remove the underscores from the field names in the LeaseEntry struct? I guess it's better late than never, right?

Contributor:

That would be great, thanks! :)

@goliaro (Contributor Author) commented Aug 25, 2020

Ok, I just pushed the updated code!

@stephanie-wang (Contributor) left a comment:

Thanks, this is looking very close! I just had some questions about making sure we delete the SchedulingKeyEntry properly. It would also be good to add unit tests for that, to make sure we're not leaking any memory.

scheduling_key_entry.task_queue.push_back(task_spec);
if (scheduling_key_entry.total_tasks_in_flight <
scheduling_key_entry.active_workers.size() *
max_tasks_in_flight_per_worker_) {
Contributor:

Suggest wrapping this in a const method of the SchedulingKeyEntry (e.g., HasAvailableWorkers()) to make it more readable!

Contributor Author (@goliaro), Aug 26, 2020:

I was wondering, what do you mean by a const method? Also, I had to define the method outside of the SchedulingKeyEntry struct to be able to access the max_tasks_in_flight_per_worker_ member of CoreWorkerDirectTaskSubmitter. Otherwise, the compiler reported an error that I did not know how to fix. Do you know if there is a way to access max_tasks_in_flight_per_worker_ from a method within the SchedulingKeyEntry struct?

Contributor:

Ah I just mean to mark that the method does not modify the SchedulingKeyEntry, like this:
CanDelete() const { ... }

Yes, it shouldn't be able to access it because it's a private member of CoreWorkerDirectTaskSubmitter. You can fix it by allowing CanDelete to take the max tasks as an argument.

Contributor Author (@goliaro), Aug 27, 2020:

Sounds good! I'll add the const keyword and allow CanDelete() (and the other two methods I added to check pipeline fullness) to take max_tasks_in_flight_per_worker_ as an argument, so that I can place the functions inside the struct.
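
For illustration, the agreed-upon shape might look roughly like this (a sketch, shown out of line; the fullness-check name and the exact conditions inside CanDelete are assumptions, not the final code):

// The pipelining limit is passed in because max_tasks_in_flight_per_worker_
// is a private member of CoreWorkerDirectTaskSubmitter.
bool SchedulingKeyEntry::AllPipelinesToWorkersFull(
    uint32_t max_tasks_in_flight_per_worker) const {
  return total_tasks_in_flight >=
         active_workers.size() * max_tasks_in_flight_per_worker;
}

// Safe to erase the entry only once nothing references the key anymore
// (no queued tasks, no pending lease request, no leased workers).
bool SchedulingKeyEntry::CanDelete() const {
  return task_queue.empty() && pending_lease_request.first == nullptr &&
         active_workers.empty() && total_tasks_in_flight == 0;
}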

RAY_CHECK(worker_to_lease_entry_.find(active_worker_addr) !=
worker_to_lease_entry_.end());
auto &lease_entry = worker_to_lease_entry_[active_worker_addr];
if (lease_entry.tasks_in_flight < max_tasks_in_flight_per_worker_) {
Contributor:

Suggest wrapping this in a const method of the WorkerLeaseEntry to make it more readable!
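
For example, a small const helper on the lease entry could wrap the comparison (a sketch; the method name is illustrative, not the one used in the PR):

// Returns true once this worker's pipeline has reached the per-worker limit.
bool LeaseEntry::PipelineToWorkerFull(
    uint32_t max_tasks_in_flight_per_worker) const {
  return tasks_in_flight >= max_tasks_in_flight_per_worker;
}

The call site above would then read if (!lease_entry.PipelineToWorkerFull(max_tasks_in_flight_per_worker_)) instead of comparing the counters inline.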

if (it == task_queues_.end()) {

auto &task_queue = scheduling_key_entry.task_queue;
if (task_queue.empty()) {
// We don't have any of this type of task to run.
Contributor:

Shouldn't we check if it's okay to delete the SchedulingKeyEntry here too?

}
if (reply.worker_exiting()) {
// The worker is draining and will shutdown after it is done. Don't return
// it to the Raylet since that will kill it early.
absl::MutexLock lock(&mu_);
worker_to_lease_entry_.erase(addr);
auto &scheduling_key_entry = scheduling_key_entries_[scheduling_key];
scheduling_key_entry.active_workers.erase(addr);
Contributor:

Do we need to check if we should delete the SchedulingKeyEntry here?


if (scheduled_tasks->second.empty()) {
task_queues_.erase(scheduling_key);
if (scheduled_tasks.empty()) {
CancelWorkerLeaseIfNeeded(scheduling_key);
Contributor:

Do we need to check whether we should delete the SchedulingKeyEntry here?

Contributor Author:

Yes! However, I think that the check should be placed inside the callback function in CancelWorkerLeaseIfNeeded, because the callback can call CancelWorkerLeaseIfNeeded as well, and CancelWorkerLeaseIfNeeded needs to access the SchedulingKeyEntry.

src/ray/core_worker/transport/direct_task_transport.h (outdated, resolved)
Gabriele Oliaro and others added 2 commits August 26, 2020 15:58
@goliaro (Contributor Author) commented Aug 26, 2020

Thanks, this is looking very close! I just had some questions about making sure we delete the SchedulingKeyEntry properly. It would also be good to add unit tests for that, to make sure we're not leaking any memory.

I was wondering what type of unit test you had in mind to check that we are not leaking memory by forgetting to delete some entries in the scheduling_key_entries hashmap. Because the hashmap is a private field of the CoreWorkerDirectTaskSubmitter class, we can't just check its size from within direct_task_transport_test.cc. Should I add a public function that allows us to check the size?

@stephanie-wang (Contributor):

Yes, you can either add a public function to check the size or you can make the unit test class a friend of the CoreWorkerDirectTaskSubmitter class (see here).
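
The friend-class route might look roughly like this (a sketch; the test fixture name is hypothetical):

// Inside the CoreWorkerDirectTaskSubmitter declaration, grant the unit test
// access to private members such as scheduling_key_entries_.
class CoreWorkerDirectTaskSubmitter {
 private:
  friend class CoreWorkerDirectTaskSubmitterTest;
};

The test could then assert, for example, that scheduling_key_entries_ is empty once all submitted tasks have completed, which would catch leaked entries.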

if (scheduling_key_entries_.size() != 0) {
RAY_LOG(INFO) << "size: " << scheduling_key_entries_.size();
}
return scheduling_key_entries_.size() == 0;
Contributor:

You can also use scheduling_key_entries_.empty() here!

Contributor Author:

Good call! Just changed this. I also removed the RAY_LOG(INFO) line

@stephanie-wang (Contributor) left a comment:

Looks great! I'll merge once Travis finishes.

@stephanie-wang stephanie-wang removed the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Aug 28, 2020
@stephanie-wang (Contributor):

Hey @gabrieleoliaro, looks like there is a build error. Could you fix it? https://api.travis-ci.com/v3/job/378975005/log.txt

@stephanie-wang stephanie-wang added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Aug 28, 2020
@stephanie-wang (Contributor):

@gabrieleoliaro, I'm not sure if that last commit will fix the issue. That annotation means that the compiler should check that the mutex is held whenever that method is called. But the method is public and the mutex is private, so I don't think it will work.

You can fix the error by acquiring the lock inside the method!
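
Concretely, that might look like the following (a sketch; the method name mirrors the empty-check discussed above but is an assumption):

// Take the private mutex inside the method instead of annotating the public
// method with a thread-safety requirement that external callers cannot satisfy.
bool CoreWorkerDirectTaskSubmitter::CheckNoSchedulingKeyEntries() {
  absl::MutexLock lock(&mu_);
  return scheduling_key_entries_.empty();
}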

@goliaro (Contributor Author) commented Aug 29, 2020

Hey @gabrieleoliaro, looks like there is a build error. Could you fix it? https://api.travis-ci.com/v3/job/378975005/log.txt

Just pushed!

@goliaro (Contributor Author) commented Aug 29, 2020

@gabrieleoliaro, I'm not sure if that last commit will fix the issue. That annotation means that the compiler should check that the mutex is held whenever that method is called. But the method is public and the mutex is private, so I don't think it will work.

Sorry, this was just a first commit. I was not done yet :)

@ffbin (Contributor) commented Aug 31, 2020

https://travis-ci.com/github/ray-project/ray/jobs/379015992 Hi @gabrieleoliaro, the Java CI job failure is related to this PR, please help take a look, thanks.

@stephanie-wang (Contributor):

https://travis-ci.com/github/ray-project/ray/jobs/379015992 Hi @gabrieleoliaro, the Java CI job failure is related to this PR, please help take a look, thanks.

Really? Seems unlikely, the test is failing on master too: https://travis-ci.com/github/ray-project/ray/jobs/379177286

@stephanie-wang stephanie-wang merged commit 05fe6dc into ray-project:master Aug 31, 2020
@ffbin (Contributor) commented Aug 31, 2020

https://travis-ci.com/github/ray-project/ray/jobs/379015992 Hi @gabrieleoliaro, the Java CI job failure is related to this PR, please help take a look, thanks.

Really? Seems unlikely, the test is failing on master too: https://travis-ci.com/github/ray-project/ray/jobs/379177286

Sorry, it should be unrelated.

@goliaro mentioned this pull request Sep 6, 2020