[native] Introduce lastCoordinatorHeartbeatMs to PrestoTask. #22718

Merged on May 14, 2024 (1 commit)
17 changes: 17 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoTask.cpp
@@ -282,6 +282,15 @@ void PrestoTask::updateHeartbeatLocked() {
info.lastHeartbeat = util::toISOTimestamp(lastHeartbeatMs);
}

void PrestoTask::updateCoordinatorHeartbeat() {
std::lock_guard<std::mutex> l(mutex);
updateCoordinatorHeartbeatLocked();
}

void PrestoTask::updateCoordinatorHeartbeatLocked() {
lastCoordinatorHeartbeatMs = velox::getCurrentTimeMs();
}

uint64_t PrestoTask::timeSinceLastHeartbeatMs() const {
std::lock_guard<std::mutex> l(mutex);
if (lastHeartbeatMs == 0UL) {
@@ -290,6 +299,14 @@ uint64_t PrestoTask::timeSinceLastHeartbeatMs() const {
return getCurrentTimeMs() - lastHeartbeatMs;
}

uint64_t PrestoTask::timeSinceLastCoordinatorHeartbeatMs() const {
std::lock_guard<std::mutex> l(mutex);
if (lastCoordinatorHeartbeatMs == 0UL) {
return 0UL;
}
return getCurrentTimeMs() - lastCoordinatorHeartbeatMs;
}

void PrestoTask::recordProcessCpuTime() {
if (processCpuTime_ > 0) {
return;
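The paired `updateCoordinatorHeartbeat()` / `updateCoordinatorHeartbeatLocked()` methods above follow the usual "locking wrapper plus `Locked` worker" convention. The following is a minimal, self-contained sketch of that convention, not the presto_cpp implementation: `HeartbeatTracker` and `nowMs()` are hypothetical names, and `nowMs()` is only an assumed stand-in for `velox::getCurrentTimeMs()`.

```cpp
#include <chrono>
#include <cstdint>
#include <mutex>

// Hypothetical stand-in for velox::getCurrentTimeMs(): wall-clock time in ms.
inline uint64_t nowMs() {
  using namespace std::chrono;
  return static_cast<uint64_t>(
      duration_cast<milliseconds>(system_clock::now().time_since_epoch())
          .count());
}

// Hypothetical, simplified model of the heartbeat-related part of PrestoTask.
struct HeartbeatTracker {
  mutable std::mutex mutex;
  uint64_t lastCoordinatorHeartbeatMs{0};

  // For callers that do NOT hold 'mutex': take the lock, then delegate.
  void updateCoordinatorHeartbeat() {
    std::lock_guard<std::mutex> l(mutex);
    updateCoordinatorHeartbeatLocked();
  }

  // For callers that already hold 'mutex'. Calling the locking variant
  // from such a site would deadlock on a non-recursive std::mutex.
  void updateCoordinatorHeartbeatLocked() {
    lastCoordinatorHeartbeatMs = nowMs();
  }

  // Returns 0 until the first heartbeat has been recorded.
  uint64_t timeSinceLastCoordinatorHeartbeatMs() const {
    std::lock_guard<std::mutex> l(mutex);
    if (lastCoordinatorHeartbeatMs == 0) {
      return 0;
    }
    return nowMs() - lastCoordinatorHeartbeatMs;
  }
};
```

In this sketch, a site that already holds `mutex` for other work calls the `Locked` variant, while a site that returns early without locking calls the wrapper.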
18 changes: 18 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoTask.h
@@ -95,8 +95,15 @@ struct PrestoTask {
/// has not been started, until the actual 'create task' message comes.
bool taskStarted{false};

/// Time point (in ms) when the last message (any) came for this task.
// TODO (spershin): Deprecate it, use only the 'lastCoordinatorHeartbeatMs'.
uint64_t lastHeartbeatMs{0};
Contributor

s/lastHeartbeatMs/lastWorkerHeartbeatMs/

Do we need to track the last heartbeat or message received from worker? Or shall we rename it to lastWorkerMsgTimeMs?

Contributor Author

I don't think we should do this renaming in this change.
It is unnecessary, and it is actually unclear why you are suggesting it.

> Do we need to track the last heartbeat or message received from worker? Or shall we rename it to lastWorkerMsgTimeMs?

If we need this, it should be a separate PR.

Contributor

I am wondering if we need to update lastHeartbeatMs when we receive message from worker? Or we only need to record the heartbeat timestamp from coordinator? Thanks!

Contributor Author

> I am wondering if we need to update lastHeartbeatMs when we receive message from worker? Or we only need to record the heartbeat timestamp from coordinator? Thanks!

I don't know. I am just keeping the existing heartbeat as the status quo in order not to break the current behavior.

What I can do is follow up by looking at the Java code to see if it has something similar and how it is updated.
Not sure when I will have time for it, because it is low priority.

/// Time point (in ms) when the last message came for this task from the
/// Coordinator. Used to determine if the Task has been abandoned.
uint64_t lastCoordinatorHeartbeatMs{0};
/// Time point (in ms) when we last updated Task stats.
uint64_t lastTaskStatsUpdateMs = {0};

uint64_t lastMemoryReservation = {0};
uint64_t createTimeMs{0};
uint64_t firstSplitStartTimeMs{0};
@@ -132,10 +139,21 @@ struct PrestoTask {
/// Updates when this task was touched last time.
void updateHeartbeatLocked();

/// Updates time point (ms) when this task was touched last time by a message
/// from the Coordinator.
void updateCoordinatorHeartbeat();
void updateCoordinatorHeartbeatLocked();

/// Returns time (ms) since the task was touched last time (last heartbeat).
/// Returns zero, if never (shouldn't happen).
uint64_t timeSinceLastHeartbeatMs() const;

/// Returns time (ms) since the task was touched last time by a message from
/// the Coordinator.
/// Returns zero if no Coordinator message has been received yet.
uint64_t timeSinceLastCoordinatorHeartbeatMs() const;

protocol::TaskStatus updateStatus() {
std::lock_guard<std::mutex> l(mutex);
return updateStatusLocked();
15 changes: 11 additions & 4 deletions presto-native-execution/presto_cpp/main/TaskManager.cpp
@@ -43,7 +43,7 @@ void cancelAbandonedTasksInternal(const TaskMap& taskMap, int32_t abandonedMs) {
for (const auto& [id, prestoTask] : taskMap) {
if (prestoTask->task != nullptr) {
if (prestoTask->task->isRunning()) {
- if (prestoTask->timeSinceLastHeartbeatMs() >= abandonedMs) {
+ if (prestoTask->timeSinceLastCoordinatorHeartbeatMs() >= abandonedMs) {
LOG(INFO) << "Cancelling abandoned task '" << id << "'.";
prestoTask->task->requestCancel();
}
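To make the effect of this one-line change concrete, here is a hedged sketch of the abandonment check in isolation. `TaskView`, `sinceCoordinatorHeartbeatMs()` and `findAbandoned()` are hypothetical names used only for illustration. The sketch takes the current time as a parameter so that it is deterministic, and returns the ids that would be cancelled instead of calling `requestCancel()`.

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical, flattened view of the fields the check relies on.
struct TaskView {
  std::string id;
  bool running;
  uint64_t lastCoordinatorHeartbeatMs; // 0 means "no Coordinator message yet".
};

// Same rule as timeSinceLastCoordinatorHeartbeatMs() in the diff:
// zero until the first Coordinator heartbeat has been recorded.
inline uint64_t sinceCoordinatorHeartbeatMs(const TaskView& t, uint64_t nowMs) {
  if (t.lastCoordinatorHeartbeatMs == 0) {
    return 0;
  }
  return nowMs - t.lastCoordinatorHeartbeatMs;
}

// Returns ids of running tasks whose Coordinator heartbeat is at least
// 'abandonedMs' old. With abandonedMs > 0, a task that has never heard from
// the Coordinator reports 0 and is therefore not selected.
inline std::vector<std::string> findAbandoned(
    const std::vector<TaskView>& tasks,
    uint64_t nowMs,
    uint64_t abandonedMs) {
  std::vector<std::string> abandoned;
  for (const auto& t : tasks) {
    if (t.running && sinceCoordinatorHeartbeatMs(t, nowMs) >= abandonedMs) {
      abandoned.push_back(t.id);
    }
  }
  return abandoned;
}
```

Under this rule, traffic that does not come from the Coordinator no longer keeps a task alive, because only `lastCoordinatorHeartbeatMs` feeds the comparison.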
@@ -364,6 +364,7 @@ std::unique_ptr<TaskInfo> TaskManager::createOrUpdateErrorTask(
auto prestoTask = findOrCreateTask(taskId, startProcessCpuTime);
{
std::lock_guard<std::mutex> l(prestoTask->mutex);
prestoTask->updateCoordinatorHeartbeatLocked();
prestoTask->updateHeartbeatLocked();
if (prestoTask->error == nullptr) {
prestoTask->error = exception;
@@ -428,7 +429,7 @@ std::unique_ptr<protocol::TaskInfo> TaskManager::createOrUpdateTask(
const velox::core::PlanFragment& planFragment,
std::shared_ptr<velox::core::QueryCtx> queryCtx,
long startProcessCpuTime) {
- return createOrUpdateTask(
+ return createOrUpdateTaskImpl(
taskId,
planFragment,
updateRequest.sources,
@@ -447,7 +448,7 @@ std::unique_ptr<protocol::TaskInfo> TaskManager::createOrUpdateBatchTask(

checkSplitsForBatchTask(planFragment.planNode, updateRequest.sources);

- return createOrUpdateTask(
+ return createOrUpdateTaskImpl(
taskId,
planFragment,
updateRequest.sources,
@@ -456,7 +457,7 @@ std::unique_ptr<protocol::TaskInfo> TaskManager::createOrUpdateBatchTask(
startProcessCpuTime);
}

- std::unique_ptr<TaskInfo> TaskManager::createOrUpdateTask(
+ std::unique_ptr<TaskInfo> TaskManager::createOrUpdateTaskImpl(
const TaskId& taskId,
const velox::core::PlanFragment& planFragment,
const std::vector<protocol::TaskSource>& sources,
@@ -468,6 +469,7 @@ std::unique_ptr<TaskInfo> TaskManager::createOrUpdateTask(
auto prestoTask = findOrCreateTask(taskId, startProcessCpuTime);
{
std::lock_guard<std::mutex> l(prestoTask->mutex);
prestoTask->updateCoordinatorHeartbeatLocked();
Contributor

Why don't we call updateCoordinatorHeartbeatLocked in findOrCreateTask? Thanks!

Contributor Author

Because findOrCreateTask() is called from literally everywhere, and we only need to cover the Coordinator's endpoints.

if (not prestoTask->task && planFragment.planNode) {
// If the task is aborted, no need to do anything else.
// This takes care of DELETE task message coming before CREATE task.
@@ -620,6 +622,7 @@ std::unique_ptr<TaskInfo> TaskManager::deleteTask(

std::lock_guard<std::mutex> l(prestoTask->mutex);
prestoTask->updateHeartbeatLocked();
prestoTask->updateCoordinatorHeartbeatLocked();
auto execTask = prestoTask->task;
if (execTask) {
auto state = execTask->state();
@@ -771,6 +774,7 @@ folly::Future<std::unique_ptr<protocol::TaskInfo>> TaskManager::getTaskInfo(
// Return current TaskInfo without waiting.
promise.setValue(
std::make_unique<protocol::TaskInfo>(prestoTask->updateInfo()));
prestoTask->updateCoordinatorHeartbeat();
return std::move(future).via(httpSrvCpuExecutor_);
}

@@ -780,6 +784,7 @@ folly::Future<std::unique_ptr<protocol::TaskInfo>> TaskManager::getTaskInfo(
{
std::lock_guard<std::mutex> l(prestoTask->mutex);
prestoTask->updateHeartbeatLocked();
prestoTask->updateCoordinatorHeartbeatLocked();
Contributor

Can we pass a flag to updateHeartbeatLocked(fromCoordinator)?

Contributor Author

No, we cannot.
The call sites of these two functions differ.
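A small sketch may help show why the call sites matter. It assumes a simplified, hypothetical model of the two timestamps: the `Heartbeats` struct and the three handler names are illustrative and do not exist in presto_cpp, and time is passed in explicitly. As far as the visible hunks show, some Coordinator-facing sites (for example `deleteTask()`) update both timestamps, while others (for example the `getTaskStatus()` paths) add only the Coordinator update. A single `updateHeartbeatLocked(fromCoordinator)` flag would cover "generic only" and "both", but not "Coordinator only".

```cpp
#include <cstdint>

// Hypothetical model of the two timestamps kept per task.
struct Heartbeats {
  uint64_t lastHeartbeatMs{0};
  uint64_t lastCoordinatorHeartbeatMs{0};

  void updateHeartbeat(uint64_t nowMs) {
    lastHeartbeatMs = nowMs;
  }
  void updateCoordinatorHeartbeat(uint64_t nowMs) {
    lastCoordinatorHeartbeatMs = nowMs;
  }
};

// Illustrative: a Coordinator request handled like deleteTask() in the diff,
// which touches both timestamps.
inline void onCoordinatorDelete(Heartbeats& h, uint64_t nowMs) {
  h.updateHeartbeat(nowMs);
  h.updateCoordinatorHeartbeat(nowMs);
}

// Illustrative: a Coordinator status poll, where the diff adds only the
// Coordinator heartbeat update.
inline void onCoordinatorStatusPoll(Heartbeats& h, uint64_t nowMs) {
  h.updateCoordinatorHeartbeat(nowMs);
}

// Illustrative: any request that does not come from the Coordinator keeps
// updating only the generic heartbeat (an assumption about such call sites).
inline void onOtherRequest(Heartbeats& h, uint64_t nowMs) {
  h.updateHeartbeat(nowMs);
}
```

Keeping two independent methods lets each endpoint choose exactly which timestamp it refreshes.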

if (!prestoTask->task) {
auto promiseHolder =
std::make_shared<PromiseHolder<std::unique_ptr<protocol::TaskInfo>>>(
@@ -937,6 +942,7 @@ folly::Future<std::unique_ptr<protocol::TaskStatus>> TaskManager::getTaskStatus(

if (!currentState || !maxWait) {
// Return task's status immediately without waiting.
prestoTask->updateCoordinatorHeartbeat();
return std::make_unique<protocol::TaskStatus>(prestoTask->updateStatus());
}

@@ -946,6 +952,7 @@ folly::Future<std::unique_ptr<protocol::TaskStatus>> TaskManager::getTaskStatus(
protocol::TaskStatus status;
{
std::lock_guard<std::mutex> l(prestoTask->mutex);
prestoTask->updateCoordinatorHeartbeatLocked();
if (!prestoTask->task) {
auto promiseHolder = std::make_shared<
PromiseHolder<std::unique_ptr<protocol::TaskStatus>>>(
2 changes: 1 addition & 1 deletion presto-native-execution/presto_cpp/main/TaskManager.h
@@ -171,7 +171,7 @@ class TaskManager {
// coordinator for a considerable time.
void cancelAbandonedTasks();

- std::unique_ptr<protocol::TaskInfo> createOrUpdateTask(
+ std::unique_ptr<protocol::TaskInfo> createOrUpdateTaskImpl(
const protocol::TaskId& taskId,
const velox::core::PlanFragment& planFragment,
const std::vector<protocol::TaskSource>& sources,
@@ -14,6 +14,7 @@
#include "presto_cpp/main/PrestoTask.h"
#include <gtest/gtest.h>
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/time/Timer.h"

DECLARE_bool(velox_memory_leak_check_enabled);

@@ -64,3 +65,14 @@ TEST_F(PrestoTaskTest, runtimeMetricConversion) {
EXPECT_EQ(veloxMetric.max, prestoMetric.max);
EXPECT_EQ(veloxMetric.min, prestoMetric.min);
}

TEST_F(PrestoTaskTest, basic) {
PrestoTask task{"20201107_130540_00011_wrpkw.1.2.3.4", "node2", 0};

// Test coordinator heartbeat.
EXPECT_EQ(task.timeSinceLastCoordinatorHeartbeatMs(), 0);
task.updateCoordinatorHeartbeat();
/* sleep override */
std::this_thread::sleep_for(std::chrono::milliseconds(100));
EXPECT_GE(task.timeSinceLastCoordinatorHeartbeatMs(), 100);
}
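The test above depends on a real 100 ms sleep. As a possible alternative, shown only as a sketch and not as part of this PR, the same two assertions can be checked deterministically if the time source is injectable. `CoordinatorHeartbeat` and its clock parameter are hypothetical, and the sketch omits the mutex because it is meant for single-threaded illustration.

```cpp
#include <cstdint>
#include <functional>
#include <utility>

// Hypothetical heartbeat holder with an injectable clock (not presto_cpp API).
class CoordinatorHeartbeat {
 public:
  explicit CoordinatorHeartbeat(std::function<uint64_t()> clock)
      : clock_(std::move(clock)) {}

  // Records the current clock reading as the last heartbeat.
  void update() {
    lastMs_ = clock_();
  }

  // Returns 0 until the first update(), mirroring the behaviour in the diff.
  uint64_t timeSinceLastMs() const {
    if (lastMs_ == 0) {
      return 0;
    }
    return clock_() - lastMs_;
  }

 private:
  std::function<uint64_t()> clock_;
  uint64_t lastMs_{0};
};
```

A test can then advance a fake clock variable by 100 and expect exactly 100 from `timeSinceLastMs()`, with no sleep involved.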