[core] Pin arguments during task execution #13737

Merged
7 commits merged into ray-project:master from pin-bundles on Jan 29, 2021

Conversation

stephanie-wang (Contributor)

Why are these changes needed?

Currently, there is a race condition where a task's arguments can be evicted after the task is dispatched but before the worker has fetched the arguments from the object store. This can lead to deadlock if the node's queue already holds too many requests to pull other objects.

This fixes the race condition by having the raylet pin the dependencies while a task lease is granted.
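
In outline, the mechanism looks something like the sketch below. This is a simplified illustration under assumptions, not the PR's actual code: `LeasePinner`, `PinArgsOnLeaseGrant`, `UnpinArgsOnLeaseReturn`, and `GetFromPlasma` are invented names, and the `TaskID`/`ObjectID`/`RayObject` types are toy stand-ins; only the `pinned_task_arguments_` map mirrors a member that appears in the PR's diff.

```cpp
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

// Toy stand-ins for raylet types (illustrative only).
using TaskID = std::string;
using ObjectID = std::string;
struct RayObject {};  // In Ray, this wraps a buffer backed by plasma memory.

class LeasePinner {
 public:
  // Pin a task's arguments at the moment the lease is granted. Returns false
  // if any argument was already evicted, in which case the task must wait
  // for its arguments to be pulled again.
  bool PinArgsOnLeaseGrant(const TaskID &task_id,
                           const std::vector<ObjectID> &deps) {
    std::vector<std::unique_ptr<RayObject>> args;
    for (const auto &dep : deps) {
      std::unique_ptr<RayObject> obj = GetFromPlasma(dep);
      if (obj == nullptr) {
        return false;  // Evicted before dispatch; caller retries later.
      }
      args.push_back(std::move(obj));
    }
    // Holding the RayObject references keeps the plasma buffers pinned, so
    // the worker is guaranteed to find its arguments in the object store.
    pinned_task_arguments_[task_id] = std::move(args);
    return true;
  }

  // Unpin when the lease is returned; dropping the references makes the
  // arguments evictable again.
  void UnpinArgsOnLeaseReturn(const TaskID &task_id) {
    pinned_task_arguments_.erase(task_id);
  }

  // Number of arguments currently pinned for a task (handy for tests).
  size_t NumPinnedArgs(const TaskID &task_id) const {
    auto it = pinned_task_arguments_.find(task_id);
    return it == pinned_task_arguments_.end() ? 0 : it->second.size();
  }

 private:
  std::unique_ptr<RayObject> GetFromPlasma(const ObjectID &) {
    return std::make_unique<RayObject>();  // Stub: always "found" here.
  }

  std::unordered_map<TaskID, std::vector<std::unique_ptr<RayObject>>>
      pinned_task_arguments_;
};
```

The key design choice is that pinning is tied to the lease lifetime: the raylet, not the worker, owns the references, so the arguments stay resident across the window between dispatch and the worker's fetch.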

Related issue number

Closes #12663.

Checks

  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
• I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@rkooo567 (Contributor) left a comment:

LGTM! Is the reason we don't pin an object as soon as it is pulled that doing so would be more complex or less robust?

```cpp
}
for (size_t i = 0; i < deps.size(); i++) {
  if (args[i] == nullptr) {
    RAY_LOG(INFO) << "Task " << spec.TaskId() << " argument " << deps[i]
                  << " was evicted before task could be dispatched";
```

rkooo567 (Contributor) commented on the hunk above:
Can you add a comment explaining when this can happen? Does it happen only when some of the objects are evicted before the whole bundle is pulled?

```cpp
bool success = true;
const auto &deps = spec.GetDependencyIds();
if (!deps.empty()) {
  success = pin_task_arguments_(deps, &args);
```

rkooo567 (Contributor) commented on the hunk above:
Can you add a comment that `args` will hold a reference to the plasma store?
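
One possible wording for such a comment (a suggestion only, not taken from the PR):

```cpp
// `args` will hold the task's pinned arguments. Each RayObject's data buffer
// points into plasma shared memory, so keeping these references alive keeps
// the underlying plasma objects pinned until the lease is returned.
success = pin_task_arguments_(deps, &args);
```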

```diff
@@ -252,8 +271,9 @@ TEST_F(ClusterTaskManagerTest, ResourceTakenWhileResolving) {
   };

   /* Blocked on dependencies */
   dependency_manager_.task_ready_ = false;
-  auto task = CreateTask({{ray::kCPU_ResourceLabel, 5}}, 1);
+  auto task = CreateTask({{ray::kCPU_ResourceLabel, 5}}, 2);
```

rkooo567 (Contributor) commented on the hunk above:
Unrelated to this line of code, but maybe we can add a simple test that all args are pinned correctly?
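
As a rough sketch of what such a test could check, written against the toy `LeasePinner` from the sketch above rather than the real `ClusterTaskManagerTest` fixture:

```cpp
#include "gtest/gtest.h"

TEST(LeasePinnerTest, AllArgsPinnedOnDispatch) {
  LeasePinner pinner;
  // Granting the lease should pin every dependency of the task.
  ASSERT_TRUE(pinner.PinArgsOnLeaseGrant("task1", {"argA", "argB", "argC"}));
  ASSERT_EQ(pinner.NumPinnedArgs("task1"), 3u);
}
```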

@ericl (Contributor) left a comment:
LGTM; simpler than I thought.

```cpp
/// Arguments needed by currently granted lease requests. These should be
/// pinned before the lease is granted to ensure that the arguments are not
/// evicted before the task(s) start running.
std::unordered_map<TaskID, std::vector<std::unique_ptr<RayObject>>>
    pinned_task_arguments_;
```

ericl (Contributor) commented on the hunk above:
Could we add this to DebugString()?
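
A minimal sketch of the kind of output that could be added (the helper name and format are invented here, not taken from the PR; `TaskID` and `RayObject` are the toy stand-ins from the earlier sketch):

```cpp
#include <memory>
#include <sstream>
#include <string>
#include <unordered_map>
#include <vector>

// Summarize pinned-argument state for inclusion in DebugString() output.
std::string PinnedArgsDebugString(
    const std::unordered_map<TaskID, std::vector<std::unique_ptr<RayObject>>>
        &pinned_task_arguments) {
  size_t num_objects = 0;
  for (const auto &entry : pinned_task_arguments) {
    num_objects += entry.second.size();
  }
  std::stringstream ss;
  ss << "Pinned task arguments: " << pinned_task_arguments.size()
     << " tasks, " << num_objects << " objects";
  return ss.str();
}
```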

```cpp
pool_.PushWorker(std::dynamic_pointer_cast<WorkerInterface>(worker));
task_manager_.ScheduleAndDispatchTasks();
ASSERT_EQ(dependency_manager_.subscribed_tasks, expected_subscribed_tasks);
ASSERT_EQ(num_callbacks, 0);
ASSERT_EQ(leased_workers_.size(), 0);

/* Worker available and arguments available */
dependency_manager_.task_ready_ = true;
missing_objects_.erase(missing_arg);
```

ericl (Contributor) commented on the hunk above:
Can we add a test that the pinned object is unpinned when the lease is returned?
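
Sketched against the same toy `LeasePinner` as above, such a test might look like:

```cpp
TEST(LeasePinnerTest, ArgsUnpinnedOnLeaseReturn) {
  LeasePinner pinner;
  ASSERT_TRUE(pinner.PinArgsOnLeaseGrant("task1", {"argA", "argB"}));
  // Returning the lease should drop every pinned reference.
  pinner.UnpinArgsOnLeaseReturn("task1");
  ASSERT_EQ(pinner.NumPinnedArgs("task1"), 0u);
}
```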

```diff
@@ -2346,33 +2375,14 @@ void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request,
     object_ids.push_back(ObjectID::FromBinary(object_id_binary));
   }
   if (object_pinning_enabled_) {
```

ericl (Contributor) commented on the hunk above:
Should we remove this feature flag (separately)?


@stephanie-wang (Author) replied:
Yeah, I think it's just used for LRU right now (and I guess if you wanted to turn off ref counting manually, but I don't think that should be recommended or exposed anymore...).

@ericl added the @author-action-required label (the PR author is responsible for the next step; remove the tag to send the PR back to the reviewer) on Jan 27, 2021.
@stephanie-wang merged commit 42d501d into ray-project:master on Jan 29, 2021.
@stephanie-wang deleted the pin-bundles branch on Jan 29, 2021 at 03:07.
fishbone pushed a commit to fishbone/ray referencing this pull request on Feb 16, 2021, with the following commit message:

* tmp
* Pin task args
* unit tests
* update
* test
* Fix

fishbone added a commit to fishbone/ray referencing this pull request on Feb 16, 2021.
Successfully merging this pull request may close the following issue:

[Object Spilling] Thrashing when there are large number of dependencies for many tasks