Skip to content

Commit

Permalink
[native] Move static functions from PrestoTask to Util.
Browse files Browse the repository at this point in the history
  • Loading branch information
amitkdutta committed May 4, 2024
1 parent be20d29 commit 374bc4b
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 29 deletions.
26 changes: 7 additions & 19 deletions presto-native-execution/presto_cpp/main/PrestoTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ PrestoTask::PrestoTask(
: id(taskId),
startProcessCpuTime{
_startProcessCpuTime > 0 ? _startProcessCpuTime
: getProcessCpuTime()} {
: util::getProcessCpuTimeNs()} {
info.taskId = taskId;
info.nodeId = nodeId;
}
Expand All @@ -290,24 +290,12 @@ uint64_t PrestoTask::timeSinceLastHeartbeatMs() const {
return getCurrentTimeMs() - lastHeartbeatMs;
}

// static
long PrestoTask::getProcessCpuTime() {
struct rusage rusageEnd;
getrusage(RUSAGE_SELF, &rusageEnd);

auto tvNanos = [](struct timeval tv) {
return tv.tv_sec * 1000000000 + tv.tv_usec * 1000;
};

return tvNanos(rusageEnd.ru_utime) + tvNanos(rusageEnd.ru_stime);
}

void PrestoTask::recordProcessCpuTime() {
if (processCpuTime_ > 0) {
return;
}

processCpuTime_ = getProcessCpuTime() - startProcessCpuTime;
processCpuTime_ = util::getProcessCpuTimeNs() - startProcessCpuTime;
}

protocol::TaskStatus PrestoTask::updateStatusLocked() {
Expand Down Expand Up @@ -778,8 +766,8 @@ void PrestoTask::updateExecutionInfoLocked(
prestoTaskStats);
}

/*static*/ std::string PrestoTask::taskNumbersToString(
const std::array<size_t, 5>& taskNumbers) {
/*static*/ std::string PrestoTask::taskStatesToString(
const std::array<size_t, 5>& taskStates) {
// Names of five TaskState (enum defined in exec/Task.h).
static constexpr std::array<folly::StringPiece, 5> taskStateNames{
"Running",
Expand All @@ -790,10 +778,10 @@ void PrestoTask::updateExecutionInfoLocked(
};

std::string str;
for (size_t i = 0; i < taskNumbers.size(); ++i) {
if (taskNumbers[i] != 0) {
for (size_t i = 0; i < taskStates.size(); ++i) {
if (taskStates[i] != 0) {
folly::toAppend(
fmt::format("{}={} ", taskStateNames[i], taskNumbers[i]), &str);
fmt::format("{}={} ", taskStateNames[i], taskStates[i]), &str);
}
}
return str;
Expand Down
7 changes: 2 additions & 5 deletions presto-native-execution/presto_cpp/main/PrestoTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,8 @@ struct PrestoTask {
}

/// Turns the task numbers (per state) into a string.
static std::string taskNumbersToString(
const std::array<size_t, 5>& taskNumbers);

/// Returns process-wide CPU time in nanoseconds.
static long getProcessCpuTime();
static std::string taskStatesToString(
const std::array<size_t, 5>& taskStates);

/// Invoked to update presto task status from the updated velox task stats.
protocol::TaskStatus updateStatusLocked();
Expand Down
2 changes: 1 addition & 1 deletion presto-native-execution/presto_cpp/main/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1145,7 +1145,7 @@ void TaskManager::shutdown() {
PRESTO_SHUTDOWN_LOG(INFO)
<< "Waited (" << seconds
<< " seconds so far) for 'Running' tasks to complete. " << numTasks
<< " tasks left: " << PrestoTask::taskNumbersToString(taskNumbers);
<< " tasks left: " << PrestoTask::taskStatesToString(taskNumbers);
std::this_thread::sleep_for(std::chrono::seconds(1));
cancelAbandonedTasks();
taskNumbers = getTaskNumbers(numTasks);
Expand Down
9 changes: 5 additions & 4 deletions presto-native-execution/presto_cpp/main/TaskResource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "presto_cpp/main/TaskResource.h"
#include <presto_cpp/main/common/Exception.h>
#include "presto_cpp/main/common/Configs.h"
#include "presto_cpp/main/common/Utils.h"
#include "presto_cpp/main/thrift/ProtocolToThrift.h"
#include "presto_cpp/main/thrift/ThriftIO.h"
#include "presto_cpp/main/thrift/gen-cpp2/PrestoThrift.h"
Expand Down Expand Up @@ -220,7 +221,7 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTaskImpl(
folly::via(
httpSrvCpuExecutor_,
[this, &body, taskId, createOrUpdateFunc]() {
const auto startProcessCpuTime = PrestoTask::getProcessCpuTime();
const auto startProcessCpuTimeNs = util::getProcessCpuTimeNs();

// TODO Avoid copy
std::ostringstream oss;
Expand All @@ -231,15 +232,15 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTaskImpl(

std::unique_ptr<protocol::TaskInfo> taskInfo;
try {
taskInfo =
createOrUpdateFunc(taskId, updateJson, startProcessCpuTime);
taskInfo = createOrUpdateFunc(
taskId, updateJson, startProcessCpuTimeNs);
} catch (const velox::VeloxException& e) {
// Creating an empty task, putting errors inside so that next
// status fetch from coordinator will catch the error and well
// categorize it.
try {
taskInfo = taskManager_.createOrUpdateErrorTask(
taskId, std::current_exception(), startProcessCpuTime);
taskId, std::current_exception(), startProcessCpuTimeNs);
} catch (const velox::VeloxUserError& e) {
throw;
}
Expand Down
12 changes: 12 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "presto_cpp/main/common/Utils.h"
#include <fmt/format.h>
#include <sys/resource.h>

namespace facebook::presto::util {

Expand Down Expand Up @@ -43,4 +44,15 @@ std::shared_ptr<folly::SSLContext> createSSLContext(
}
}

long getProcessCpuTimeNs() {
struct rusage rusageEnd;
getrusage(RUSAGE_SELF, &rusageEnd);

auto tvNanos = [](struct timeval tv) {
return tv.tv_sec * 1'000'000'000 + tv.tv_usec * 1'000;
};

return tvNanos(rusageEnd.ru_utime) + tvNanos(rusageEnd.ru_stime);
}

} // namespace facebook::presto::util
3 changes: 3 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,7 @@ std::shared_ptr<folly::SSLContext> createSSLContext(
const std::string& clientCertAndKeyPath,
const std::string& ciphers);

/// Returns current process-wide CPU time in nanoseconds.
long getProcessCpuTimeNs();

} // namespace facebook::presto::util

0 comments on commit 374bc4b

Please sign in to comment.