diff --git a/src/graph/executor/admin/SubmitJobExecutor.cpp b/src/graph/executor/admin/SubmitJobExecutor.cpp index 88d95e19e86..592acdc0f0f 100644 --- a/src/graph/executor/admin/SubmitJobExecutor.cpp +++ b/src/graph/executor/admin/SubmitJobExecutor.cpp @@ -35,91 +35,97 @@ folly::Future SubmitJobExecutor::execute() { LOG(ERROR) << resp.status().toString(); return std::move(resp).status(); } - switch (jobOp) { - case meta::cpp2::AdminJobOp::ADD: { - nebula::DataSet v({"New Job Id"}); - DCHECK(resp.value().job_id_ref().has_value()); - if (!resp.value().job_id_ref().has_value()) { - return Status::Error("Response unexpected."); - } - v.emplace_back(nebula::Row({*resp.value().job_id_ref()})); - return finish(std::move(v)); - } - case meta::cpp2::AdminJobOp::RECOVER: { - nebula::DataSet v({"Recovered job num"}); - DCHECK(resp.value().recovered_job_num_ref().has_value()); - if (!resp.value().recovered_job_num_ref().has_value()) { - return Status::Error("Response unexpected."); - } - v.emplace_back(nebula::Row({*resp.value().recovered_job_num_ref()})); - return finish(std::move(v)); - } - case meta::cpp2::AdminJobOp::SHOW: { - nebula::DataSet v( - {"Job Id(TaskId)", "Command(Dest)", "Status", "Start Time", "Stop Time"}); - DCHECK(resp.value().job_desc_ref().has_value()); - if (!resp.value().job_desc_ref().has_value()) { - return Status::Error("Response unexpected."); - } - DCHECK(resp.value().task_desc_ref().has_value()); - if (!resp.value().task_desc_ref().has_value()) { - return Status::Error("Response unexpected"); - } - auto &jobDesc = *resp.value().job_desc_ref(); - // job desc - auto stopTime = jobDesc.front().get_stop_time() > 0 - ? Value(time::TimeConversion::unixSecondsToDateTime( - jobDesc.front().get_stop_time())) - : Value::kEmpty; - v.emplace_back(nebula::Row({ - jobDesc.front().get_id(), - apache::thrift::util::enumNameSafe(jobDesc.front().get_cmd()), - apache::thrift::util::enumNameSafe(jobDesc.front().get_status()), - time::TimeConversion::unixSecondsToDateTime(jobDesc.front().get_start_time()), - std::move(stopTime), - })); - // tasks desc - auto &tasksDesc = *resp.value().get_task_desc(); - for (const auto &taskDesc : tasksDesc) { - v.emplace_back(nebula::Row({ - taskDesc.get_task_id(), - taskDesc.get_host().host, - apache::thrift::util::enumNameSafe(taskDesc.get_status()), - time::TimeConversion::unixSecondsToDateTime(taskDesc.get_start_time()), - time::TimeConversion::unixSecondsToDateTime(taskDesc.get_stop_time()), - })); - } - return finish(std::move(v)); - } - case meta::cpp2::AdminJobOp::SHOW_All: { - nebula::DataSet v({"Job Id", "Command", "Status", "Start Time", "Stop Time"}); - DCHECK(resp.value().job_desc_ref().has_value()); - if (!resp.value().job_desc_ref().has_value()) { - return Status::Error("Response unexpected"); - } - const auto &jobsDesc = *resp.value().job_desc_ref(); - for (const auto &jobDesc : jobsDesc) { - v.emplace_back(nebula::Row({ - jobDesc.get_id(), - apache::thrift::util::enumNameSafe(jobDesc.get_cmd()), - apache::thrift::util::enumNameSafe(jobDesc.get_status()), - time::TimeConversion::unixSecondsToDateTime(jobDesc.get_start_time()), - time::TimeConversion::unixSecondsToDateTime(jobDesc.get_stop_time()), - })); - } - return finish(std::move(v)); - } - case meta::cpp2::AdminJobOp::STOP: { - nebula::DataSet v({"Result"}); - v.emplace_back(nebula::Row({"Job stopped"})); - return finish(std::move(v)); - } - // no default so the compiler will warning when lack - } - DLOG(FATAL) << "Unknown job operation " << static_cast(jobOp); - return Status::Error("Unknown job job operation %d.", static_cast(jobOp)); + auto status = buildResult(jobOp, std::move(resp).value()); + NG_RETURN_IF_ERROR(status); + return finish(std::move(status).value()); }); } +StatusOr SubmitJobExecutor::buildResult(meta::cpp2::AdminJobOp jobOp, + meta::cpp2::AdminJobResult &&resp) { + switch (jobOp) { + case meta::cpp2::AdminJobOp::ADD: { + nebula::DataSet v({"New Job Id"}); + DCHECK(resp.job_id_ref().has_value()); + if (!resp.job_id_ref().has_value()) { + return Status::Error("Response unexpected."); + } + v.emplace_back(nebula::Row({*resp.job_id_ref()})); + return v; + } + case meta::cpp2::AdminJobOp::RECOVER: { + nebula::DataSet v({"Recovered job num"}); + DCHECK(resp.recovered_job_num_ref().has_value()); + if (!resp.recovered_job_num_ref().has_value()) { + return Status::Error("Response unexpected."); + } + v.emplace_back(nebula::Row({*resp.recovered_job_num_ref()})); + return v; + } + case meta::cpp2::AdminJobOp::SHOW: { + nebula::DataSet v({"Job Id(TaskId)", "Command(Dest)", "Status", "Start Time", "Stop Time"}); + DCHECK(resp.job_desc_ref().has_value()); + if (!resp.job_desc_ref().has_value()) { + return Status::Error("Response unexpected."); + } + DCHECK(resp.task_desc_ref().has_value()); + if (!resp.task_desc_ref().has_value()) { + return Status::Error("Response unexpected"); + } + auto &jobDesc = *resp.job_desc_ref(); + // job desc + v.emplace_back(nebula::Row({ + jobDesc.front().get_id(), + apache::thrift::util::enumNameSafe(jobDesc.front().get_cmd()), + apache::thrift::util::enumNameSafe(jobDesc.front().get_status()), + convertJobTimestampToDateTime(jobDesc.front().get_start_time()), + convertJobTimestampToDateTime(jobDesc.front().get_stop_time()), + })); + // tasks desc + auto &tasksDesc = *resp.get_task_desc(); + for (const auto &taskDesc : tasksDesc) { + v.emplace_back(nebula::Row({ + taskDesc.get_task_id(), + taskDesc.get_host().host, + apache::thrift::util::enumNameSafe(taskDesc.get_status()), + convertJobTimestampToDateTime(taskDesc.get_start_time()), + convertJobTimestampToDateTime(taskDesc.get_stop_time()), + })); + } + return v; + } + case meta::cpp2::AdminJobOp::SHOW_All: { + nebula::DataSet v({"Job Id", "Command", "Status", "Start Time", "Stop Time"}); + DCHECK(resp.job_desc_ref().has_value()); + if (!resp.job_desc_ref().has_value()) { + return Status::Error("Response unexpected"); + } + const auto &jobsDesc = *resp.job_desc_ref(); + for (const auto &jobDesc : jobsDesc) { + v.emplace_back(nebula::Row({ + jobDesc.get_id(), + apache::thrift::util::enumNameSafe(jobDesc.get_cmd()), + apache::thrift::util::enumNameSafe(jobDesc.get_status()), + convertJobTimestampToDateTime(jobDesc.get_start_time()), + convertJobTimestampToDateTime(jobDesc.get_stop_time()), + })); + } + return v; + } + case meta::cpp2::AdminJobOp::STOP: { + nebula::DataSet v({"Result"}); + v.emplace_back(nebula::Row({"Job stopped"})); + return v; + } + // no default so the compiler will warning when lack + } + DLOG(FATAL) << "Unknown job operation " << static_cast(jobOp); + return Status::Error("Unknown job job operation %d.", static_cast(jobOp)); +} + +Value SubmitJobExecutor::convertJobTimestampToDateTime(int64_t timestamp) { + return timestamp > 0 ? Value(time::TimeConversion::unixSecondsToDateTime(timestamp)) + : Value::kEmpty; +} } // namespace graph } // namespace nebula diff --git a/src/graph/executor/admin/SubmitJobExecutor.h b/src/graph/executor/admin/SubmitJobExecutor.h index dc8da2e75bf..08f777f2629 100644 --- a/src/graph/executor/admin/SubmitJobExecutor.h +++ b/src/graph/executor/admin/SubmitJobExecutor.h @@ -18,6 +18,11 @@ class SubmitJobExecutor final : public Executor { : Executor("SubmitJobExecutor", node, qctx) {} folly::Future execute() override; + + private: + FRIEND_TEST(JobTest, JobFinishTime); + StatusOr buildResult(meta::cpp2::AdminJobOp jobOp, meta::cpp2::AdminJobResult &&resp); + Value convertJobTimestampToDateTime(int64_t timestamp); }; } // namespace graph diff --git a/src/graph/executor/test/CMakeLists.txt b/src/graph/executor/test/CMakeLists.txt index 030f6b43dd9..c0a8925a452 100644 --- a/src/graph/executor/test/CMakeLists.txt +++ b/src/graph/executor/test/CMakeLists.txt @@ -81,6 +81,7 @@ nebula_add_test( CartesianProductTest.cpp AssignTest.cpp ShowQueriesTest.cpp + JobTest.cpp OBJECTS ${EXEC_QUERY_TEST_OBJS} LIBRARIES diff --git a/src/graph/executor/test/JobTest.cpp b/src/graph/executor/test/JobTest.cpp new file mode 100644 index 00000000000..58e4c33f79b --- /dev/null +++ b/src/graph/executor/test/JobTest.cpp @@ -0,0 +1,69 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include + +#include "common/time/TimeUtils.h" +#include "graph/context/QueryContext.h" +#include "graph/executor/admin/SubmitJobExecutor.h" +#include "graph/planner/plan/Admin.h" + +namespace nebula { +namespace graph { +class JobTest : public testing::Test {}; + +TEST_F(JobTest, JobFinishTime) { + { + meta::cpp2::AdminJobResult resp; + resp.set_job_id(0); + meta::cpp2::JobDesc jobDesc; + jobDesc.set_id(0); + jobDesc.set_start_time(123); + jobDesc.set_stop_time(0); + resp.set_job_desc({std::move(jobDesc)}); + meta::cpp2::TaskDesc taskDesc; + taskDesc.set_start_time(456); + taskDesc.set_stop_time(0); + resp.set_task_desc({std::move(taskDesc)}); + + auto qctx = std::make_unique(); + auto submitJob = SubmitJob::make( + qctx.get(), nullptr, meta::cpp2::AdminJobOp::SHOW, meta::cpp2::AdminCmd::UNKNOWN, {}); + auto submitJobExe = std::make_unique(submitJob, qctx.get()); + + auto status = submitJobExe->buildResult(meta::cpp2::AdminJobOp::SHOW, std::move(resp)); + EXPECT_TRUE(status.ok()); + auto result = std::move(status).value(); + EXPECT_EQ(result.rows.size(), 2); + EXPECT_EQ(result.rows[0][3], Value(time::TimeConversion::unixSecondsToDateTime(123))); + EXPECT_EQ(result.rows[0][4], Value::kEmpty); + EXPECT_EQ(result.rows[1][3], Value(time::TimeConversion::unixSecondsToDateTime(456))); + EXPECT_EQ(result.rows[1][4], Value::kEmpty); + } + { + meta::cpp2::AdminJobResult resp; + resp.set_job_id(0); + meta::cpp2::JobDesc jobDesc; + jobDesc.set_id(0); + jobDesc.set_start_time(123); + jobDesc.set_stop_time(0); + resp.set_job_desc({std::move(jobDesc)}); + + auto qctx = std::make_unique(); + auto submitJob = SubmitJob::make( + qctx.get(), nullptr, meta::cpp2::AdminJobOp::SHOW_All, meta::cpp2::AdminCmd::UNKNOWN, {}); + auto submitJobExe = std::make_unique(submitJob, qctx.get()); + + auto status = submitJobExe->buildResult(meta::cpp2::AdminJobOp::SHOW_All, std::move(resp)); + EXPECT_TRUE(status.ok()); + auto result = std::move(status).value(); + EXPECT_EQ(result.rows.size(), 1); + EXPECT_EQ(result.rows[0][3], Value(time::TimeConversion::unixSecondsToDateTime(123))); + EXPECT_EQ(result.rows[0][4], Value::kEmpty); + } +} +} // namespace graph +} // namespace nebula