[native] Introduce lastCoordinatorHeartbeatMs to PrestoTask. #22718
@spershin thanks for fixing this % comments.
@@ -780,6 +784,7 @@ folly::Future<std::unique_ptr<protocol::TaskInfo>> TaskManager::getTaskInfo(
  {
    std::lock_guard<std::mutex> l(prestoTask->mutex);
    prestoTask->updateHeartbeatLocked();
    prestoTask->updateCoordinatorHeartbeatLocked();
Can we pass a flag to updateHeartbeatLocked(fromCoordinator)?
No, we cannot.
The call sites of these two differ.
@@ -95,8 +95,14 @@ struct PrestoTask {
  /// has not been started, until the actual 'create task' message comes.
  bool taskStarted{false};

  /// Time point (in ms) when the last message (any) came for this task.
  uint64_t lastHeartbeatMs{0};
s/lastHeartbeatMs/lastWorkerHeartbeatMs/
Do we need to track the last heartbeat or message received from worker? Or shall we rename it to lastWorkerMsgTimeMs?
I don't think we should do this renaming in this change.
It is unnecessary, and it is actually unclear why you are suggesting it.
> Do we need to track the last heartbeat or message received from worker? Or shall we rename it to lastWorkerMsgTimeMs?
If we need this, it would be a separate PR.
I am wondering if we need to update lastHeartbeatMs when we receive message from worker? Or we only need to record the heartbeat timestamp from coordinator? Thanks!
> I am wondering if we need to update lastHeartbeatMs when we receive message from worker? Or we only need to record the heartbeat timestamp from coordinator? Thanks!
I don't know. I am just keeping the existing heartbeat as the status quo so as not to break the current behavior.
What I can do is follow up by looking at the Java code to see if it has something similar and how it is updated.
Not sure when I will have time for it, because it is low pri.
@@ -468,6 +469,7 @@ std::unique_ptr<TaskInfo> TaskManager::createOrUpdateTask(
  auto prestoTask = findOrCreateTask(taskId, startProcessCpuTime);
  {
    std::lock_guard<std::mutex> l(prestoTask->mutex);
    prestoTask->updateCoordinatorHeartbeatLocked();
Why don't we call updateCoordinatorHeartbeatLocked in findOrCreateTask? Thanks!
Because findOrCreateTask() is called from literally everywhere and we only need to cover the Coordinator's endpoints.
@spershin thanks for the offline discussion and clear code comments. Please consider deprecating the old heartbeat timestamp in a follow-up, per our discussion.
It is used to determine if Coordinator has abandoned the Task.
@spershin thanks for the test!
Description
Right now we use 'lastHeartbeatMs' to determine if a Task has been abandoned by
the Coordinator (the Coordinator restarted, crashed, or is otherwise gone).
This does not work well, because 'lastHeartbeatMs' is updated on every message to the Task,
including 'getResults', which workers send to each other and which keeps refreshing it.
As a result, the Tasks get abandoned in waves, starting from stage 0, then stage 1, etc.
A large number of stages is possible, and with a timeout of 1-3 minutes that can cause
a long wait after the Coordinator is long gone.
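To make the wave effect concrete, here is a rough model rather than actual Presto code. It assumes a simple linear chain of stages in which stage 0 is polled only by the Coordinator, each later stage is polled via 'getResults' by the stage before it until that stage is itself cancelled, and the timeout check fires exactly on time. The function name `abandonTimesMs` is hypothetical.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical model (not Presto code): returns, for each stage, the time in ms
// after the Coordinator disappears at which that stage's Tasks are considered
// abandoned when a single 'lastHeartbeatMs' is refreshed by every message.
std::vector<uint64_t> abandonTimesMs(std::size_t numStages, uint64_t timeoutMs) {
  std::vector<uint64_t> times(numStages);
  // The Coordinator sends its last message at t = 0.
  uint64_t lastHeartbeatMs = 0;
  for (std::size_t stage = 0; stage < numStages; ++stage) {
    // A stage is abandoned one timeout after the last message it received.
    times[stage] = lastHeartbeatMs + timeoutMs;
    // Assumption: this stage keeps sending 'getResults' to the next stage
    // right up to the moment it is cancelled.
    lastHeartbeatMs = times[stage];
  }
  return times;
}
```

Under these assumptions, `abandonTimesMs(5, 120000)` puts stage 0 at 120 s and stage 4 at 600 s, so the last wave goes away only after roughly the number of stages times the timeout.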
The new 'lastCoordinatorHeartbeatMs' is only updated by the messages sent by the
Coordinator, which results in all Tasks being cancelled at the same time after just a single
timeout period.
It is used to determine if the Coordinator has abandoned the Task.
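The idea can be sketched roughly as below. This is a simplified stand-in, not the PR's code: `TaskHeartbeats` and `isAbandonedByCoordinator` are hypothetical names, and the update methods take an explicit `nowMs` argument (an assumption made so the sketch is easy to exercise), whereas the methods in the diff take no arguments.

```cpp
#include <cstdint>
#include <mutex>

// Simplified stand-in for PrestoTask; only the two timestamps are modeled.
struct TaskHeartbeats {
  std::mutex mutex;

  // Time point (in ms) when the last message (any) came for this task.
  uint64_t lastHeartbeatMs{0};
  // Time point (in ms) when the last message from the Coordinator came.
  uint64_t lastCoordinatorHeartbeatMs{0};

  // Both methods expect the caller to hold 'mutex'.
  void updateHeartbeatLocked(uint64_t nowMs) {
    lastHeartbeatMs = nowMs;
  }
  void updateCoordinatorHeartbeatLocked(uint64_t nowMs) {
    lastCoordinatorHeartbeatMs = nowMs;
  }
};

// Hypothetical check: the Task counts as abandoned once no Coordinator
// message has arrived for longer than 'timeoutMs'. Worker-to-worker traffic
// only touches 'lastHeartbeatMs' and therefore cannot postpone this.
bool isAbandonedByCoordinator(
    TaskHeartbeats& task,
    uint64_t nowMs,
    uint64_t timeoutMs) {
  std::lock_guard<std::mutex> l(task.mutex);
  return nowMs > task.lastCoordinatorHeartbeatMs &&
      nowMs - task.lastCoordinatorHeartbeatMs > timeoutMs;
}
```

In this sketch a Coordinator endpoint would call both update methods under the lock, while a worker-facing endpoint such as 'getResults' would call only `updateHeartbeatLocked`, so every Task on the worker crosses the threshold at about the same time.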
Motivation and Context
Get rid of Tasks that were abandoned by the Coordinator, instead of leaving them
running forever.
Test Plan
Orchestrated a restart in the cluster with a long-running query.
Observed prompt restart of the workers compared with the previous version.