From 2bb11ae92c83e93d9858bee2dca6eca6bc8673fe Mon Sep 17 00:00:00 2001 From: Sergey Pershin Date: Fri, 10 May 2024 18:38:36 -0700 Subject: [PATCH] [native] Introduce lastCoordinatorHeartbeatMs to PrestoTask. It is used to detemrine if Coordinator has abandoned the Task. --- .../presto_cpp/main/PrestoTask.cpp | 17 +++++++++++++++++ .../presto_cpp/main/PrestoTask.h | 18 ++++++++++++++++++ .../presto_cpp/main/TaskManager.cpp | 15 +++++++++++---- .../presto_cpp/main/TaskManager.h | 2 +- .../presto_cpp/main/tests/PrestoTaskTest.cpp | 12 ++++++++++++ 5 files changed, 59 insertions(+), 5 deletions(-) diff --git a/presto-native-execution/presto_cpp/main/PrestoTask.cpp b/presto-native-execution/presto_cpp/main/PrestoTask.cpp index 3b654d28706d..153f65fb66df 100644 --- a/presto-native-execution/presto_cpp/main/PrestoTask.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoTask.cpp @@ -282,6 +282,15 @@ void PrestoTask::updateHeartbeatLocked() { info.lastHeartbeat = util::toISOTimestamp(lastHeartbeatMs); } +void PrestoTask::updateCoordinatorHeartbeat() { + std::lock_guard l(mutex); + updateCoordinatorHeartbeatLocked(); +} + +void PrestoTask::updateCoordinatorHeartbeatLocked() { + lastCoordinatorHeartbeatMs = velox::getCurrentTimeMs(); +} + uint64_t PrestoTask::timeSinceLastHeartbeatMs() const { std::lock_guard l(mutex); if (lastHeartbeatMs == 0UL) { @@ -290,6 +299,14 @@ uint64_t PrestoTask::timeSinceLastHeartbeatMs() const { return getCurrentTimeMs() - lastHeartbeatMs; } +uint64_t PrestoTask::timeSinceLastCoordinatorHeartbeatMs() const { + std::lock_guard l(mutex); + if (lastCoordinatorHeartbeatMs == 0UL) { + return 0UL; + } + return getCurrentTimeMs() - lastCoordinatorHeartbeatMs; +} + void PrestoTask::recordProcessCpuTime() { if (processCpuTime_ > 0) { return; diff --git a/presto-native-execution/presto_cpp/main/PrestoTask.h b/presto-native-execution/presto_cpp/main/PrestoTask.h index ad5e2a525a6f..6a76a5734efe 100644 --- a/presto-native-execution/presto_cpp/main/PrestoTask.h +++ b/presto-native-execution/presto_cpp/main/PrestoTask.h @@ -95,8 +95,15 @@ struct PrestoTask { /// has not been started, until the actual 'create task' message comes. bool taskStarted{false}; + /// Time point (in ms) when the last message (any) came for this task. + // TODO (spershin): Deprecate it, use only the 'lastCoordinatorHeartbeatMs'. uint64_t lastHeartbeatMs{0}; + /// Time point (in ms) when the last message came for this task from the + /// Coordinator. Used to determine if the Task has been abandoned. + uint64_t lastCoordinatorHeartbeatMs{0}; + /// Time point (in ms) when the time we updated Task stats. uint64_t lastTaskStatsUpdateMs = {0}; + uint64_t lastMemoryReservation = {0}; uint64_t createTimeMs{0}; uint64_t firstSplitStartTimeMs{0}; @@ -132,10 +139,21 @@ struct PrestoTask { /// Updates when this task was touched last time. void updateHeartbeatLocked(); + /// Updates time point (ms) when this task was touched last time by a message + /// from the Coordinator. + void updateCoordinatorHeartbeat(); + void updateCoordinatorHeartbeatLocked(); + /// Returns time (ms) since the task was touched last time (last heartbeat). /// Returns zero, if never (shouldn't happen). uint64_t timeSinceLastHeartbeatMs() const; + /// Returns time (ms) since the task was touched last time by a message from + /// the Coordinator. + /// If above never happened, returns time since the task start or zero, if + /// task never started. + uint64_t timeSinceLastCoordinatorHeartbeatMs() const; + protocol::TaskStatus updateStatus() { std::lock_guard l(mutex); return updateStatusLocked(); diff --git a/presto-native-execution/presto_cpp/main/TaskManager.cpp b/presto-native-execution/presto_cpp/main/TaskManager.cpp index 5297f03e1710..854786b2ae56 100644 --- a/presto-native-execution/presto_cpp/main/TaskManager.cpp +++ b/presto-native-execution/presto_cpp/main/TaskManager.cpp @@ -43,7 +43,7 @@ void cancelAbandonedTasksInternal(const TaskMap& taskMap, int32_t abandonedMs) { for (const auto& [id, prestoTask] : taskMap) { if (prestoTask->task != nullptr) { if (prestoTask->task->isRunning()) { - if (prestoTask->timeSinceLastHeartbeatMs() >= abandonedMs) { + if (prestoTask->timeSinceLastCoordinatorHeartbeatMs() >= abandonedMs) { LOG(INFO) << "Cancelling abandoned task '" << id << "'."; prestoTask->task->requestCancel(); } @@ -364,6 +364,7 @@ std::unique_ptr TaskManager::createOrUpdateErrorTask( auto prestoTask = findOrCreateTask(taskId, startProcessCpuTime); { std::lock_guard l(prestoTask->mutex); + prestoTask->updateCoordinatorHeartbeatLocked(); prestoTask->updateHeartbeatLocked(); if (prestoTask->error == nullptr) { prestoTask->error = exception; @@ -428,7 +429,7 @@ std::unique_ptr TaskManager::createOrUpdateTask( const velox::core::PlanFragment& planFragment, std::shared_ptr queryCtx, long startProcessCpuTime) { - return createOrUpdateTask( + return createOrUpdateTaskImpl( taskId, planFragment, updateRequest.sources, @@ -447,7 +448,7 @@ std::unique_ptr TaskManager::createOrUpdateBatchTask( checkSplitsForBatchTask(planFragment.planNode, updateRequest.sources); - return createOrUpdateTask( + return createOrUpdateTaskImpl( taskId, planFragment, updateRequest.sources, @@ -456,7 +457,7 @@ std::unique_ptr TaskManager::createOrUpdateBatchTask( startProcessCpuTime); } -std::unique_ptr TaskManager::createOrUpdateTask( +std::unique_ptr TaskManager::createOrUpdateTaskImpl( const TaskId& taskId, const velox::core::PlanFragment& planFragment, const std::vector& sources, @@ -468,6 +469,7 @@ std::unique_ptr TaskManager::createOrUpdateTask( auto prestoTask = findOrCreateTask(taskId, startProcessCpuTime); { std::lock_guard l(prestoTask->mutex); + prestoTask->updateCoordinatorHeartbeatLocked(); if (not prestoTask->task && planFragment.planNode) { // If the task is aborted, no need to do anything else. // This takes care of DELETE task message coming before CREATE task. @@ -620,6 +622,7 @@ std::unique_ptr TaskManager::deleteTask( std::lock_guard l(prestoTask->mutex); prestoTask->updateHeartbeatLocked(); + prestoTask->updateCoordinatorHeartbeatLocked(); auto execTask = prestoTask->task; if (execTask) { auto state = execTask->state(); @@ -771,6 +774,7 @@ folly::Future> TaskManager::getTaskInfo( // Return current TaskInfo without waiting. promise.setValue( std::make_unique(prestoTask->updateInfo())); + prestoTask->updateCoordinatorHeartbeat(); return std::move(future).via(httpSrvCpuExecutor_); } @@ -780,6 +784,7 @@ folly::Future> TaskManager::getTaskInfo( { std::lock_guard l(prestoTask->mutex); prestoTask->updateHeartbeatLocked(); + prestoTask->updateCoordinatorHeartbeatLocked(); if (!prestoTask->task) { auto promiseHolder = std::make_shared>>( @@ -937,6 +942,7 @@ folly::Future> TaskManager::getTaskStatus( if (!currentState || !maxWait) { // Return task's status immediately without waiting. + prestoTask->updateCoordinatorHeartbeat(); return std::make_unique(prestoTask->updateStatus()); } @@ -946,6 +952,7 @@ folly::Future> TaskManager::getTaskStatus( protocol::TaskStatus status; { std::lock_guard l(prestoTask->mutex); + prestoTask->updateCoordinatorHeartbeatLocked(); if (!prestoTask->task) { auto promiseHolder = std::make_shared< PromiseHolder>>( diff --git a/presto-native-execution/presto_cpp/main/TaskManager.h b/presto-native-execution/presto_cpp/main/TaskManager.h index 1f30c66a8361..2de4f48d0d98 100644 --- a/presto-native-execution/presto_cpp/main/TaskManager.h +++ b/presto-native-execution/presto_cpp/main/TaskManager.h @@ -171,7 +171,7 @@ class TaskManager { // coordinator for a considerable time. void cancelAbandonedTasks(); - std::unique_ptr createOrUpdateTask( + std::unique_ptr createOrUpdateTaskImpl( const protocol::TaskId& taskId, const velox::core::PlanFragment& planFragment, const std::vector& sources, diff --git a/presto-native-execution/presto_cpp/main/tests/PrestoTaskTest.cpp b/presto-native-execution/presto_cpp/main/tests/PrestoTaskTest.cpp index 38ee89b938a5..b6bce24b86c5 100644 --- a/presto-native-execution/presto_cpp/main/tests/PrestoTaskTest.cpp +++ b/presto-native-execution/presto_cpp/main/tests/PrestoTaskTest.cpp @@ -14,6 +14,7 @@ #include "presto_cpp/main/PrestoTask.h" #include #include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/time/Timer.h" DECLARE_bool(velox_memory_leak_check_enabled); @@ -64,3 +65,14 @@ TEST_F(PrestoTaskTest, runtimeMetricConversion) { EXPECT_EQ(veloxMetric.max, prestoMetric.max); EXPECT_EQ(veloxMetric.min, prestoMetric.min); } + +TEST_F(PrestoTaskTest, basic) { + PrestoTask task{"20201107_130540_00011_wrpkw.1.2.3.4", "node2", 0}; + + // Test coordinator heartbeat. + EXPECT_EQ(task.timeSinceLastCoordinatorHeartbeatMs(), 0); + task.updateCoordinatorHeartbeat(); + /* sleep override */ + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + EXPECT_GE(task.timeSinceLastCoordinatorHeartbeatMs(), 100); +}