Skip to content

Commit

Permalink
Fix and add ut.
Browse files Browse the repository at this point in the history
  • Loading branch information
CPWstatic committed Sep 2, 2021
1 parent da8446c commit daf17ea
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 83 deletions.
172 changes: 89 additions & 83 deletions src/graph/executor/admin/SubmitJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,91 +35,97 @@ folly::Future<Status> SubmitJobExecutor::execute() {
LOG(ERROR) << resp.status().toString();
return std::move(resp).status();
}
switch (jobOp) {
case meta::cpp2::AdminJobOp::ADD: {
nebula::DataSet v({"New Job Id"});
DCHECK(resp.value().job_id_ref().has_value());
if (!resp.value().job_id_ref().has_value()) {
return Status::Error("Response unexpected.");
}
v.emplace_back(nebula::Row({*resp.value().job_id_ref()}));
return finish(std::move(v));
}
case meta::cpp2::AdminJobOp::RECOVER: {
nebula::DataSet v({"Recovered job num"});
DCHECK(resp.value().recovered_job_num_ref().has_value());
if (!resp.value().recovered_job_num_ref().has_value()) {
return Status::Error("Response unexpected.");
}
v.emplace_back(nebula::Row({*resp.value().recovered_job_num_ref()}));
return finish(std::move(v));
}
case meta::cpp2::AdminJobOp::SHOW: {
nebula::DataSet v(
{"Job Id(TaskId)", "Command(Dest)", "Status", "Start Time", "Stop Time"});
DCHECK(resp.value().job_desc_ref().has_value());
if (!resp.value().job_desc_ref().has_value()) {
return Status::Error("Response unexpected.");
}
DCHECK(resp.value().task_desc_ref().has_value());
if (!resp.value().task_desc_ref().has_value()) {
return Status::Error("Response unexpected");
}
auto &jobDesc = *resp.value().job_desc_ref();
// job desc
auto stopTime = jobDesc.front().get_stop_time() > 0
? Value(time::TimeConversion::unixSecondsToDateTime(
jobDesc.front().get_stop_time()))
: Value::kEmpty;
v.emplace_back(nebula::Row({
jobDesc.front().get_id(),
apache::thrift::util::enumNameSafe(jobDesc.front().get_cmd()),
apache::thrift::util::enumNameSafe(jobDesc.front().get_status()),
time::TimeConversion::unixSecondsToDateTime(jobDesc.front().get_start_time()),
std::move(stopTime),
}));
// tasks desc
auto &tasksDesc = *resp.value().get_task_desc();
for (const auto &taskDesc : tasksDesc) {
v.emplace_back(nebula::Row({
taskDesc.get_task_id(),
taskDesc.get_host().host,
apache::thrift::util::enumNameSafe(taskDesc.get_status()),
time::TimeConversion::unixSecondsToDateTime(taskDesc.get_start_time()),
time::TimeConversion::unixSecondsToDateTime(taskDesc.get_stop_time()),
}));
}
return finish(std::move(v));
}
case meta::cpp2::AdminJobOp::SHOW_All: {
nebula::DataSet v({"Job Id", "Command", "Status", "Start Time", "Stop Time"});
DCHECK(resp.value().job_desc_ref().has_value());
if (!resp.value().job_desc_ref().has_value()) {
return Status::Error("Response unexpected");
}
const auto &jobsDesc = *resp.value().job_desc_ref();
for (const auto &jobDesc : jobsDesc) {
v.emplace_back(nebula::Row({
jobDesc.get_id(),
apache::thrift::util::enumNameSafe(jobDesc.get_cmd()),
apache::thrift::util::enumNameSafe(jobDesc.get_status()),
time::TimeConversion::unixSecondsToDateTime(jobDesc.get_start_time()),
time::TimeConversion::unixSecondsToDateTime(jobDesc.get_stop_time()),
}));
}
return finish(std::move(v));
}
case meta::cpp2::AdminJobOp::STOP: {
nebula::DataSet v({"Result"});
v.emplace_back(nebula::Row({"Job stopped"}));
return finish(std::move(v));
}
// no default so the compiler will warning when lack
}
DLOG(FATAL) << "Unknown job operation " << static_cast<int>(jobOp);
return Status::Error("Unknown job job operation %d.", static_cast<int>(jobOp));
auto status = buildResult(jobOp, std::move(resp).value());
NG_RETURN_IF_ERROR(status);
return finish(std::move(status).value());
});
}

StatusOr<DataSet> SubmitJobExecutor::buildResult(meta::cpp2::AdminJobOp jobOp,
meta::cpp2::AdminJobResult &&resp) {
switch (jobOp) {
case meta::cpp2::AdminJobOp::ADD: {
nebula::DataSet v({"New Job Id"});
DCHECK(resp.job_id_ref().has_value());
if (!resp.job_id_ref().has_value()) {
return Status::Error("Response unexpected.");
}
v.emplace_back(nebula::Row({*resp.job_id_ref()}));
return v;
}
case meta::cpp2::AdminJobOp::RECOVER: {
nebula::DataSet v({"Recovered job num"});
DCHECK(resp.recovered_job_num_ref().has_value());
if (!resp.recovered_job_num_ref().has_value()) {
return Status::Error("Response unexpected.");
}
v.emplace_back(nebula::Row({*resp.recovered_job_num_ref()}));
return v;
}
case meta::cpp2::AdminJobOp::SHOW: {
nebula::DataSet v({"Job Id(TaskId)", "Command(Dest)", "Status", "Start Time", "Stop Time"});
DCHECK(resp.job_desc_ref().has_value());
if (!resp.job_desc_ref().has_value()) {
return Status::Error("Response unexpected.");
}
DCHECK(resp.task_desc_ref().has_value());
if (!resp.task_desc_ref().has_value()) {
return Status::Error("Response unexpected");
}
auto &jobDesc = *resp.job_desc_ref();
// job desc
v.emplace_back(nebula::Row({
jobDesc.front().get_id(),
apache::thrift::util::enumNameSafe(jobDesc.front().get_cmd()),
apache::thrift::util::enumNameSafe(jobDesc.front().get_status()),
convertJobTimestampToDateTime(jobDesc.front().get_start_time()),
convertJobTimestampToDateTime(jobDesc.front().get_stop_time()),
}));
// tasks desc
auto &tasksDesc = *resp.get_task_desc();
for (const auto &taskDesc : tasksDesc) {
v.emplace_back(nebula::Row({
taskDesc.get_task_id(),
taskDesc.get_host().host,
apache::thrift::util::enumNameSafe(taskDesc.get_status()),
convertJobTimestampToDateTime(taskDesc.get_start_time()),
convertJobTimestampToDateTime(taskDesc.get_stop_time()),
}));
}
return v;
}
case meta::cpp2::AdminJobOp::SHOW_All: {
nebula::DataSet v({"Job Id", "Command", "Status", "Start Time", "Stop Time"});
DCHECK(resp.job_desc_ref().has_value());
if (!resp.job_desc_ref().has_value()) {
return Status::Error("Response unexpected");
}
const auto &jobsDesc = *resp.job_desc_ref();
for (const auto &jobDesc : jobsDesc) {
v.emplace_back(nebula::Row({
jobDesc.get_id(),
apache::thrift::util::enumNameSafe(jobDesc.get_cmd()),
apache::thrift::util::enumNameSafe(jobDesc.get_status()),
convertJobTimestampToDateTime(jobDesc.get_start_time()),
convertJobTimestampToDateTime(jobDesc.get_stop_time()),
}));
}
return v;
}
case meta::cpp2::AdminJobOp::STOP: {
nebula::DataSet v({"Result"});
v.emplace_back(nebula::Row({"Job stopped"}));
return v;
}
// no default so the compiler will warning when lack
}
DLOG(FATAL) << "Unknown job operation " << static_cast<int>(jobOp);
return Status::Error("Unknown job job operation %d.", static_cast<int>(jobOp));
}

Value SubmitJobExecutor::convertJobTimestampToDateTime(int64_t timestamp) {
return timestamp > 0 ? Value(time::TimeConversion::unixSecondsToDateTime(timestamp))
: Value::kEmpty;
}
} // namespace graph
} // namespace nebula
5 changes: 5 additions & 0 deletions src/graph/executor/admin/SubmitJobExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ class SubmitJobExecutor final : public Executor {
: Executor("SubmitJobExecutor", node, qctx) {}

folly::Future<Status> execute() override;

private:
FRIEND_TEST(JobTest, JobFinishTime);
StatusOr<DataSet> buildResult(meta::cpp2::AdminJobOp jobOp, meta::cpp2::AdminJobResult &&resp);
Value convertJobTimestampToDateTime(int64_t timestamp);
};

} // namespace graph
Expand Down
1 change: 1 addition & 0 deletions src/graph/executor/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ nebula_add_test(
CartesianProductTest.cpp
AssignTest.cpp
ShowQueriesTest.cpp
JobTest.cpp
OBJECTS
${EXEC_QUERY_TEST_OBJS}
LIBRARIES
Expand Down
69 changes: 69 additions & 0 deletions src/graph/executor/test/JobTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include <gtest/gtest.h>

#include "common/time/TimeUtils.h"
#include "graph/context/QueryContext.h"
#include "graph/executor/admin/SubmitJobExecutor.h"
#include "graph/planner/plan/Admin.h"

namespace nebula {
namespace graph {
class JobTest : public testing::Test {};

TEST_F(JobTest, JobFinishTime) {
{
meta::cpp2::AdminJobResult resp;
resp.set_job_id(0);
meta::cpp2::JobDesc jobDesc;
jobDesc.set_id(0);
jobDesc.set_start_time(123);
jobDesc.set_stop_time(0);
resp.set_job_desc({std::move(jobDesc)});
meta::cpp2::TaskDesc taskDesc;
taskDesc.set_start_time(456);
taskDesc.set_stop_time(0);
resp.set_task_desc({std::move(taskDesc)});

auto qctx = std::make_unique<QueryContext>();
auto submitJob = SubmitJob::make(
qctx.get(), nullptr, meta::cpp2::AdminJobOp::SHOW, meta::cpp2::AdminCmd::UNKNOWN, {});
auto submitJobExe = std::make_unique<SubmitJobExecutor>(submitJob, qctx.get());

auto status = submitJobExe->buildResult(meta::cpp2::AdminJobOp::SHOW, std::move(resp));
EXPECT_TRUE(status.ok());
auto result = std::move(status).value();
EXPECT_EQ(result.rows.size(), 2);
EXPECT_EQ(result.rows[0][3], Value(time::TimeConversion::unixSecondsToDateTime(123)));
EXPECT_EQ(result.rows[0][4], Value::kEmpty);
EXPECT_EQ(result.rows[1][3], Value(time::TimeConversion::unixSecondsToDateTime(456)));
EXPECT_EQ(result.rows[1][4], Value::kEmpty);
}
{
meta::cpp2::AdminJobResult resp;
resp.set_job_id(0);
meta::cpp2::JobDesc jobDesc;
jobDesc.set_id(0);
jobDesc.set_start_time(123);
jobDesc.set_stop_time(0);
resp.set_job_desc({std::move(jobDesc)});

auto qctx = std::make_unique<QueryContext>();
auto submitJob = SubmitJob::make(
qctx.get(), nullptr, meta::cpp2::AdminJobOp::SHOW_All, meta::cpp2::AdminCmd::UNKNOWN, {});
auto submitJobExe = std::make_unique<SubmitJobExecutor>(submitJob, qctx.get());

auto status = submitJobExe->buildResult(meta::cpp2::AdminJobOp::SHOW_All, std::move(resp));
EXPECT_TRUE(status.ok());
auto result = std::move(status).value();
EXPECT_EQ(result.rows.size(), 1);
EXPECT_EQ(result.rows[0][3], Value(time::TimeConversion::unixSecondsToDateTime(123)));
EXPECT_EQ(result.rows[0][4], Value::kEmpty);
}
}
} // namespace graph
} // namespace nebula

0 comments on commit daf17ea

Please sign in to comment.