[xray] Resubmit tasks that fail to be forwarded #2645

Merged
Changes from 2 commits

46 changes: 29 additions & 17 deletions src/ray/raylet/node_manager.cc
@@ -1275,26 +1275,38 @@ void NodeManager::ForwardTaskOrResubmit(const Task &task,
RAY_LOG(INFO) << "Failed to forward task " << task_id << " to node manager "
<< node_manager_id;
// Mark the failed task as pending to let other raylets know that we still
// have the task. Once the task is successfully retried, it will be
// canceled. TaskDependencyManager::TaskPending() is assumed to be
// have the task. TaskDependencyManager::TaskPending() is assumed to be
// idempotent.
task_dependency_manager_.TaskPending(task);

// Create a timer to resubmit the task in a little bit. TODO(rkn): Really
// this should be a unique_ptr instead of a shared_ptr. However, it's a
// little harder to move unique_ptrs into lambdas.
auto retry_timer = std::make_shared<boost::asio::deadline_timer>(io_service_);
auto retry_duration = boost::posix_time::milliseconds(
RayConfig::instance().node_manager_forward_task_retry_timeout_milliseconds());
retry_timer->expires_from_now(retry_duration);
retry_timer->async_wait(
[this, task, task_id, retry_timer](const boost::system::error_code &error) {
// Canceling the timer delivers boost::asio::error::operation_aborted;
// we only handle the timeout event here.
RAY_CHECK(!error);
RAY_LOG(INFO) << "In ForwardTask retry callback for task " << task_id;
EnqueuePlaceableTask(task);
});
// Actor tasks can only be executed at the actor's location, so they are
// retried after a timeout. All other tasks that fail to be forwarded are
// deemed to be placeable again.
if (task.GetTaskSpecification().IsActorTask()) {
// The task is for an actor on another node. Create a timer to resubmit
// the task in a little bit. TODO(rkn): Really this should be a
// unique_ptr instead of a shared_ptr. However, it's a little harder to
// move unique_ptrs into lambdas.
auto retry_timer = std::make_shared<boost::asio::deadline_timer>(io_service_);
auto retry_duration = boost::posix_time::milliseconds(
RayConfig::instance().node_manager_forward_task_retry_timeout_milliseconds());
retry_timer->expires_from_now(retry_duration);
retry_timer->async_wait(
[this, task, task_id, retry_timer](const boost::system::error_code &error) {
// Canceling the timer delivers boost::asio::error::operation_aborted;
// we only handle the timeout event here.
RAY_CHECK(!error);
RAY_LOG(DEBUG) << "Retrying ForwardTask for task " << task_id;
Contributor

I think we should update the log message here, as we're no longer retrying the forward. How about "resubmitting task due to a failed forward"?
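A sketch of what the suggested message could look like, reusing the RAY_LOG macro and task_id from the surrounding diff (illustrative only, not part of the change):

RAY_LOG(DEBUG) << "Resubmitting task " << task_id
               << " because ForwardTask failed.";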

SubmitTask(task, Lineage());
});
// Remove the task from the lineage cache. The task will get added back
// once it is resubmitted.
lineage_cache_.RemoveWaitingTask(task_id);
} else {
// The task is not for an actor and may therefore be placed on another
// node immediately.
local_queues_.QueuePlaceableTasks({task});
Collaborator

@stephanie-wang can you explain the rationale for the change here from EnqueuePlaceableTask(task); to local_queues_.QueuePlaceableTasks({task});?

Reverting that change fixes the hanging issue.

However, that does cause me to see messages of the form

/Users/rkn/Workspace/ray/src/ray/common/client_connection.cc:160: [node manager]ProcessMessage with type 6 took 107 ms 

which makes me think we may still want to use a timer so that we don't loop here over and over if the scheduling policy keeps making the same decision.
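To illustrate that suggestion, a minimal sketch assuming the deadline_timer pattern from the actor branch of the diff is reused (not part of the PR; the config value and types are taken from the surrounding code):

// Delay the resubmission so a failed forward is not retried in a tight loop
// while the scheduling policy keeps choosing the same (dead) node.
auto retry_timer = std::make_shared<boost::asio::deadline_timer>(io_service_);
retry_timer->expires_from_now(boost::posix_time::milliseconds(
    RayConfig::instance().node_manager_forward_task_retry_timeout_milliseconds()));
retry_timer->async_wait(
    [this, task, retry_timer](const boost::system::error_code &error) {
      RAY_CHECK(!error);
      // Make the task placeable again only after the delay has elapsed.
      local_queues_.QueuePlaceableTasks({task});
    });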

Contributor Author

Ah, nice catch! I should've tried running it locally first :)

The behavior I was hoping for was that the task would become "placeable" again, meaning that the node could choose some other node to schedule it on. Unfortunately, as of this PR, the node only schedules tasks when NodeManager::ScheduleTasks is called, so I think what's happening is that the task is getting stuck in the "placeable" queue.

I don't necessarily want to change it back to EnqueuePlaceableTask since the task may not be feasible on that node. I think that #2420 will guarantee that tasks in the "placeable" queue will eventually get scheduled, so it will probably work after that PR is merged. Maybe @atumanov can weigh in on this?
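For concreteness, a minimal sketch of the issue being described, using only names that appear in the diff and this thread (not a proposed change):

// The task becomes placeable again, but nothing triggers a scheduling pass,
// so it can sit in the placeable queue until ScheduleTasks() runs for some
// other reason.
local_queues_.QueuePlaceableTasks({task});
// Hypothetical follow-up that would let the policy pick another node; #2420
// is expected to make an explicit call like this unnecessary.
ScheduleTasks();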

Contributor

EnqueuePlaceableTask basically makes the raylet take ownership of the task, by putting it either into the waiting or ready queue. In contrast, QueuePlaceableTasks will re-enqueue the task as placeable, so that we can make another decision about its placement. We should remember that this code executes when ForwardTask fails, which likely happens because the remote raylet died. In such a case, I would expect that our cluster resources data structure is updated, removing the dead raylet from the resource map. When that happens, the policy will no longer "make the same decision".

In the test you are referencing, @robertnishihara, it might be hanging because the cluster resource map is not updated to remove the dead raylet. We should double check that this mechanism is actually firing and the cluster resource map is updated. However, I am against automatically taking ownership of the task and bypassing the scheduling policy when we fail to forward it. Letting the scheduling policy decide what to do with that task should be strictly better.
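A compact restatement of the two alternatives, with the semantics described above as comments (a sketch; exactly one of the two calls would be used):

// Alternative 1: this raylet takes ownership of the task, placing it in its
// own waiting queue (dependencies missing) or ready queue (dependencies met).
EnqueuePlaceableTask(task);

// Alternative 2: the task becomes placeable again, so the scheduling policy
// can reconsider its placement on the next scheduling pass.
local_queues_.QueuePlaceableTasks({task});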

Contributor

In response to @stephanie-wang: yes, it's likely that the task gets stuck as placeable, which is fixed in #2420. Another possibility (if the policy is firing) is that the dead raylet is the only place this task can run (I don't know the details of the test) and the dead raylet's resources are not cleaned up on raylet failure. We need to be absolutely certain that the latter cleanup actually happens.

Contributor Author

Regarding the dead raylet's resources not getting cleaned up: that will happen eventually, but it may take a while, since the monitor by default waits 10s before declaring the raylet dead. So it's definitely possible that the task will be retried at the dead raylet multiple times.

Collaborator

@atumanov, are you saying that the dead raylet should be removed from the cluster resources data structure as soon as a write to that raylet fails? It seems far simpler to wait for the monitor to broadcast that the raylet is dead (which could take 10 seconds, and in that time, the scheduling policy could make the same decision over and over).

If we do local_queues_.QueuePlaceableTasks({task});, then don't we need to follow that up with a call to ScheduleTasks();?

}
}
}
