From dc5d0389f1a5e4b4677ec9add9530524dccb30de Mon Sep 17 00:00:00 2001 From: Nivras <12605142+Nivras@users.noreply.github.com> Date: Thu, 23 Dec 2021 18:00:43 +0800 Subject: [PATCH] add unit test --- src/storage/exec/IndexAggregateNode.cpp | 12 +- src/storage/exec/IndexProjectionNode.cpp | 2 +- src/storage/index/LookupProcessor.cpp | 53 ++- src/storage/index/LookupProcessor.h | 3 +- src/storage/test/LookupIndexTest.cpp | 531 ++++++++++++++++++++++- src/storage/test/QueryTestUtils.h | 10 + 6 files changed, 591 insertions(+), 20 deletions(-) diff --git a/src/storage/exec/IndexAggregateNode.cpp b/src/storage/exec/IndexAggregateNode.cpp index ca6334cedd2..8ed8727bd1c 100644 --- a/src/storage/exec/IndexAggregateNode.cpp +++ b/src/storage/exec/IndexAggregateNode.cpp @@ -27,10 +27,14 @@ nebula::cpp2::ErrorCode IndexAggregateNode::init(InitContext& ctx) { for (const auto& statInfo : statInfos_) { ctx.statColumns.insert(statInfo.first); } + auto ret = children_[0]->init(ctx); + if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { + return ret; + } initStatValue(); retColMap_.clear(); retColMap_ = ctx.retColMap; - return children_[0]->init(ctx); + return nebula::cpp2::ErrorCode::SUCCEEDED; } void IndexAggregateNode::initStatValue() { @@ -45,10 +49,8 @@ void IndexAggregateNode::initStatValue() { void IndexAggregateNode::addStatValue(const Value& value, ColumnStat& stat) { switch (stat.statType_) { - case cpp2::StatType::SUM: - case cpp2::StatType::AVG: { + case cpp2::StatType::SUM: { stat.sum_ = stat.sum_ + value; - stat.count_ = stat.count_ + 1; break; } case cpp2::StatType::COUNT: { @@ -86,8 +88,6 @@ Row IndexAggregateNode::calculateStats() { result.values.emplace_back(stat.sum_); } else if (stat.statType_ == cpp2::StatType::COUNT) { result.values.emplace_back(stat.count_); - } else if (stat.statType_ == cpp2::StatType::AVG) { - result.values.emplace_back(stat.sum_ / stat.count_); } else if (stat.statType_ == cpp2::StatType::MAX) { result.values.emplace_back(stat.max_); } else 
if (stat.statType_ == cpp2::StatType::MIN) { diff --git a/src/storage/exec/IndexProjectionNode.cpp b/src/storage/exec/IndexProjectionNode.cpp index c2d701eb19a..261e3a21bf0 100644 --- a/src/storage/exec/IndexProjectionNode.cpp +++ b/src/storage/exec/IndexProjectionNode.cpp @@ -16,7 +16,7 @@ nebula::cpp2::ErrorCode IndexProjectionNode::init(InitContext& ctx) { ctx.requiredColumns.insert(col); } for (auto& col : ctx.statColumns) { - if (ctx.requiredColumns.find(col) != ctx.requiredColumns.end()) { + if (ctx.requiredColumns.find(col) == ctx.requiredColumns.end()) { ctx.requiredColumns.insert(col); requiredColumns_.push_back(col); } diff --git a/src/storage/index/LookupProcessor.cpp b/src/storage/index/LookupProcessor.cpp index 7ade10df676..bb2e217259d 100644 --- a/src/storage/index/LookupProcessor.cpp +++ b/src/storage/index/LookupProcessor.cpp @@ -203,7 +203,7 @@ void LookupProcessor::runInSingleThread(const std::vector& parts, datasetList.emplace_back(std::move(dataset)); codeList.emplace_back(code); } - if (statsDataSet_.colNames.size() > 0) { + if (statTypes_.size() > 0) { auto indexAgg = dynamic_cast(plan.get()); statsDataSet_.emplace_back(std::move(indexAgg->calculateStats())); } @@ -228,7 +228,7 @@ void LookupProcessor::runInSingleThread(const std::vector& parts, void LookupProcessor::runInMultipleThread(const std::vector& parts, std::unique_ptr plan) { std::vector> planCopy = reproducePlan(plan.get(), parts.size()); - using ReturnType = std::tuple>; + using ReturnType = std::tuple, Row>; std::vector> futures; for (size_t i = 0; i < parts.size(); i++) { futures.emplace_back(folly::via( @@ -251,15 +251,21 @@ void LookupProcessor::runInMultipleThread(const std::vector& parts, if (UNLIKELY(profileDetailFlag_)) { profilePlan(plan.get()); } - return {part, code, dataset}; + Row statResult; + if (code == nebula::cpp2::ErrorCode::SUCCEEDED && statTypes_.size() > 0) { + auto indexAgg = dynamic_cast(plan.get()); + statResult = indexAgg->calculateStats(); + } + return 
{part, code, dataset, statResult}; })); } - folly::collectAll(futures).via(executor_).thenTry([this, &plan](auto&& t) { + folly::collectAll(futures).via(executor_).thenTry([this](auto&& t) { CHECK(!t.hasException()); const auto& tries = t.value(); + std::vector statResults; for (size_t j = 0; j < tries.size(); j++) { CHECK(!tries[j].hasException()); - auto& [partId, code, dataset] = tries[j].value(); + auto& [partId, code, dataset, statResult] = tries[j].value(); if (code == ::nebula::cpp2::ErrorCode::SUCCEEDED) { for (auto& row : dataset) { resultDataSet_.emplace_back(std::move(row)); @@ -267,12 +273,12 @@ void LookupProcessor::runInMultipleThread(const std::vector& parts, } else { handleErrorCode(code, context_->spaceId(), partId); } - } - if (statsDataSet_.colNames.size() > 0) { - auto indexAgg = dynamic_cast(plan.get()); - statsDataSet_.emplace_back(std::move(indexAgg->calculateStats())); + statResults.emplace_back(std::move(statResult)); } DLOG(INFO) << "finish"; + // IndexAggregateNode has been copied and each part gets its own aggregate info, + // we need to merge it + this->mergeStatsResult(statResults); this->onProcessFinished(); this->onFinished(); }); @@ -286,6 +292,7 @@ LookupProcessor::handleStatProps(const std::vector& statProps) { for (size_t statIdx = 0; statIdx < statProps.size(); statIdx++) { const auto& statProp = statProps[statIdx]; + statTypes_.emplace_back(statProp.get_stat()); auto exp = Expression::decode(pool, *statProp.prop_ref()); if (exp == nullptr) { return nebula::cpp2::ErrorCode::E_INVALID_STAT_TYPE; @@ -301,7 +308,8 @@ LookupProcessor::handleStatProps(const std::vector& statProps) { if (edgeName != context_->edgeName_) { return nebula::cpp2::ErrorCode::E_EDGE_NOT_FOUND; } - if (exp->kind() == Expression::Kind::kEdgeProperty) { + if (exp->kind() == Expression::Kind::kEdgeProperty && propName != kSrc && + propName != kDst) { auto edgeSchema = env_->schemaMan_->getEdgeSchema(context_->spaceId(), context_->edgeType_); auto field = 
edgeSchema->field(propName); @@ -346,6 +354,31 @@ LookupProcessor::handleStatProps(const std::vector& statProps) { return statInfos; } +void LookupProcessor::mergeStatsResult(const std::vector& statsResult) { + if (statsResult.size() == 0 || statTypes_.size() == 0) { + return; + } + + Row result; + for (size_t statIdx = 0; statIdx < statTypes_.size(); statIdx++) { + Value value = statsResult[0].values[statIdx]; + for (size_t resIdx = 1; resIdx < statsResult.size(); resIdx++) { + const auto& currType = statTypes_[statIdx]; + if (currType == cpp2::StatType::SUM || currType == cpp2::StatType::COUNT) { + value = value + statsResult[resIdx].values[statIdx]; + } else if (currType == cpp2::StatType::MAX) { + value = value > statsResult[resIdx].values[statIdx] ? value + : statsResult[resIdx].values[statIdx]; + } else if (currType == cpp2::StatType::MIN) { + value = value < statsResult[resIdx].values[statIdx] ? value + : statsResult[resIdx].values[statIdx]; + } + } + result.values.emplace_back(std::move(value)); + } + statsDataSet_.emplace_back(std::move(result)); +} + std::vector> LookupProcessor::reproducePlan(IndexNode* root, size_t count) { std::vector> ret(count); diff --git a/src/storage/index/LookupProcessor.h b/src/storage/index/LookupProcessor.h index 15ddbef49b4..9040f8bd05b 100644 --- a/src/storage/index/LookupProcessor.h +++ b/src/storage/index/LookupProcessor.h @@ -39,13 +39,14 @@ class LookupProcessor : public BaseProcessor { std::vector> reproducePlan(IndexNode* root, size_t count); ErrorOr>> handleStatProps(const std::vector& statProps); + void mergeStatsResult(const std::vector& statsResult); folly::Executor* executor_{nullptr}; std::unique_ptr planContext_; std::unique_ptr context_; nebula::DataSet resultDataSet_; nebula::DataSet statsDataSet_; std::vector partResults_; - std::vector statColumnName_; + std::vector statTypes_; }; } // namespace storage } // namespace nebula diff --git a/src/storage/test/LookupIndexTest.cpp 
b/src/storage/test/LookupIndexTest.cpp index d94b0e33899..00fa29bd96e 100644 --- a/src/storage/test/LookupIndexTest.cpp +++ b/src/storage/test/LookupIndexTest.cpp @@ -2923,6 +2923,7 @@ TEST_P(LookupIndexTest, DedupEdgeIndexTest) { } } +// test aggregate in tag, like sum(age) as "total age" TEST_P(LookupIndexTest, AggregateTagIndexTest) { fs::TempDir rootPath("/tmp/SimpleVertexIndexTest.XXXXXX"); mock::MockCluster cluster; @@ -3019,8 +3020,534 @@ TEST_P(LookupIndexTest, AggregateTagIndexTest) { row2.emplace_back(Value(41L)); expectRows.emplace_back(Row(row2)); QueryTestUtils::checkResponse(resp, expectCols, expectRows); - nebula::DataSet emptySet; - EXPECT_EQ(*resp.get_stat_data(), emptySet); + + std::vector expectStatColumns; + nebula::Row expectStatRow; + expectStatColumns.emplace_back("total age"); + expectStatRow.values.push_back(Value(75L)); + QueryTestUtils::checkStatResponse(resp, expectStatColumns, expectStatRow); +} + +// test aggregate in tag, like sum(age) as "total age", and age not in returnColumns +TEST_P(LookupIndexTest, AggregateTagPropNotInReturnColumnsTest) { + fs::TempDir rootPath("/tmp/SimpleVertexIndexTest.XXXXXX"); + mock::MockCluster cluster; + cluster.initStorageKV(rootPath.path()); + auto* env = cluster.storageEnv_.get(); + GraphSpaceID spaceId = 1; + auto vIdLen = env->schemaMan_->getSpaceVidLen(spaceId); + ASSERT_TRUE(vIdLen.ok()); + auto totalParts = cluster.getTotalParts(); + ASSERT_TRUE(QueryTestUtils::mockVertexData(env, totalParts, true)); + auto threadPool = std::make_shared(4); + + auto* processor = LookupProcessor::instance(env, nullptr, threadPool.get()); + + cpp2::LookupIndexRequest req; + nebula::storage::cpp2::IndexSpec indices; + req.set_space_id(spaceId); + nebula::cpp2::SchemaID schemaId; + schemaId.set_tag_id(1); + indices.set_schema_id(schemaId); + std::vector parts; + for (int32_t p = 1; p <= totalParts; p++) { + parts.emplace_back(p); + } + + req.set_parts(std::move(parts)); + std::vector returnCols; + 
returnCols.emplace_back(kVid); + returnCols.emplace_back(kTag); + req.set_return_columns(std::move(returnCols)); + + // player.name_ == "Rudy Gay" + cpp2::IndexColumnHint columnHint1; + std::string name1 = "Rudy Gay"; + columnHint1.set_begin_value(Value(name1)); + columnHint1.set_column_name("name"); + columnHint1.set_scan_type(cpp2::ScanType::PREFIX); + + // player.name_ == "Kobe Bryant" + cpp2::IndexColumnHint columnHint2; + std::string name2 = "Kobe Bryant"; + columnHint2.set_begin_value(Value(name2)); + columnHint2.set_column_name("name"); + columnHint2.set_scan_type(cpp2::ScanType::PREFIX); + std::vector columnHints1; + columnHints1.emplace_back(std::move(columnHint1)); + std::vector columnHints2; + columnHints2.emplace_back(std::move(columnHint2)); + + cpp2::IndexQueryContext context1; + context1.set_column_hints(std::move(columnHints1)); + context1.set_filter(""); + context1.set_index_id(1); + std::vector statProps; + cpp2::IndexQueryContext context2; + context2.set_column_hints(std::move(columnHints2)); + context2.set_filter(""); + context2.set_index_id(1); + decltype(indices.contexts) contexts; + contexts.emplace_back(std::move(context1)); + contexts.emplace_back(std::move(context2)); + indices.set_contexts(std::move(contexts)); + req.set_indices(std::move(indices)); + + cpp2::StatProp statProp; + statProp.set_alias("total age"); + const auto& exp = *TagPropertyExpression::make(pool, folly::to(1), "age"); + statProp.set_prop(Expression::encode(exp)); + statProp.set_stat(cpp2::StatType::SUM); + statProps.emplace_back(std::move(statProp)); + req.set_stat_columns(std::move(statProps)); + + auto fut = processor->getFuture(); + processor->process(req); + auto resp = std::move(fut).get(); + + std::vector expectCols = {std::string("1.").append(kVid), + std::string("1.").append(kTag)}; + decltype(resp.get_data()->rows) expectRows; + + std::string vId1, vId2; + vId1.append(name1.data(), name1.size()); + Row row1; + row1.emplace_back(Value(vId1)); + 
row1.emplace_back(Value(1L)); + expectRows.emplace_back(Row(row1)); + + vId2.append(name2.data(), name2.size()); + Row row2; + row2.emplace_back(Value(vId2)); + row2.emplace_back(Value(1L)); + expectRows.emplace_back(Row(row2)); + QueryTestUtils::checkResponse(resp, expectCols, expectRows); + + std::vector expectStatColumns; + nebula::Row expectStatRow; + expectStatColumns.emplace_back("total age"); + expectStatRow.values.push_back(Value(75L)); + QueryTestUtils::checkStatResponse(resp, expectStatColumns, expectStatRow); +} + +// test multi aggregate in tag, like sum(age) as "total age", max(age) as "max age", +// max(kVid) as "max kVid", min(age) as "min age" +TEST_P(LookupIndexTest, AggregateTagPropsTest) { + fs::TempDir rootPath("/tmp/SimpleVertexIndexTest.XXXXXX"); + mock::MockCluster cluster; + cluster.initStorageKV(rootPath.path()); + auto* env = cluster.storageEnv_.get(); + GraphSpaceID spaceId = 1; + auto vIdLen = env->schemaMan_->getSpaceVidLen(spaceId); + ASSERT_TRUE(vIdLen.ok()); + auto totalParts = cluster.getTotalParts(); + ASSERT_TRUE(QueryTestUtils::mockVertexData(env, totalParts, true)); + auto threadPool = std::make_shared(4); + + auto* processor = LookupProcessor::instance(env, nullptr, threadPool.get()); + + cpp2::LookupIndexRequest req; + nebula::storage::cpp2::IndexSpec indices; + req.set_space_id(spaceId); + nebula::cpp2::SchemaID schemaId; + schemaId.set_tag_id(1); + indices.set_schema_id(schemaId); + std::vector parts; + for (int32_t p = 1; p <= totalParts; p++) { + parts.emplace_back(p); + } + + req.set_parts(std::move(parts)); + std::vector returnCols; + returnCols.emplace_back(kVid); + returnCols.emplace_back(kTag); + returnCols.emplace_back("age"); + req.set_return_columns(std::move(returnCols)); + + // player.name_ == "Rudy Gay" + cpp2::IndexColumnHint columnHint1; + std::string name1 = "Rudy Gay"; + columnHint1.set_begin_value(Value(name1)); + columnHint1.set_column_name("name"); + columnHint1.set_scan_type(cpp2::ScanType::PREFIX); + + 
// player.name_ == "Kobe Bryant" + cpp2::IndexColumnHint columnHint2; + std::string name2 = "Kobe Bryant"; + columnHint2.set_begin_value(Value(name2)); + columnHint2.set_column_name("name"); + columnHint2.set_scan_type(cpp2::ScanType::PREFIX); + std::vector columnHints1; + columnHints1.emplace_back(std::move(columnHint1)); + std::vector columnHints2; + columnHints2.emplace_back(std::move(columnHint2)); + + cpp2::IndexQueryContext context1; + context1.set_column_hints(std::move(columnHints1)); + context1.set_filter(""); + context1.set_index_id(1); + std::vector statProps; + cpp2::IndexQueryContext context2; + context2.set_column_hints(std::move(columnHints2)); + context2.set_filter(""); + context2.set_index_id(1); + decltype(indices.contexts) contexts; + contexts.emplace_back(std::move(context1)); + contexts.emplace_back(std::move(context2)); + indices.set_contexts(std::move(contexts)); + req.set_indices(std::move(indices)); + + cpp2::StatProp statProp1; + statProp1.set_alias("total age"); + const auto& exp1 = *TagPropertyExpression::make(pool, folly::to(1), "age"); + statProp1.set_prop(Expression::encode(exp1)); + statProp1.set_stat(cpp2::StatType::SUM); + statProps.emplace_back(statProp1); + + cpp2::StatProp statProp2; + statProp2.set_alias("max age"); + const auto& exp2 = *TagPropertyExpression::make(pool, folly::to(1), "age"); + statProp2.set_prop(Expression::encode(exp2)); + statProp2.set_stat(cpp2::StatType::MAX); + statProps.emplace_back(statProp2); + + cpp2::StatProp statProp3; + statProp3.set_alias("max kVid"); + const auto& exp3 = *TagPropertyExpression::make(pool, folly::to(1), kVid); + statProp3.set_prop(Expression::encode(exp3)); + statProp3.set_stat(cpp2::StatType::MAX); + statProps.emplace_back(statProp3); + + cpp2::StatProp statProp4; + statProp4.set_alias("min age"); + const auto& exp4 = *TagPropertyExpression::make(pool, folly::to(1), "age"); + statProp4.set_prop(Expression::encode(exp4)); + statProp4.set_stat(cpp2::StatType::MIN); + 
statProps.emplace_back(statProp4); + + req.set_stat_columns(std::move(statProps)); + + auto fut = processor->getFuture(); + processor->process(req); + auto resp = std::move(fut).get(); + + std::vector expectCols = { + std::string("1.").append(kVid), std::string("1.").append(kTag), "1.age"}; + decltype(resp.get_data()->rows) expectRows; + + std::string vId1, vId2; + vId1.append(name1.data(), name1.size()); + Row row1; + row1.emplace_back(Value(vId1)); + row1.emplace_back(Value(1L)); + row1.emplace_back(Value(34L)); + expectRows.emplace_back(Row(row1)); + + vId2.append(name2.data(), name2.size()); + Row row2; + row2.emplace_back(Value(vId2)); + row2.emplace_back(Value(1L)); + row2.emplace_back(Value(41L)); + expectRows.emplace_back(Row(row2)); + QueryTestUtils::checkResponse(resp, expectCols, expectRows); + + std::vector expectStatColumns; + expectStatColumns.emplace_back("total age"); + expectStatColumns.emplace_back("max age"); + expectStatColumns.emplace_back("max kVid"); + expectStatColumns.emplace_back("min age"); + + nebula::Row expectStatRow; + expectStatRow.values.push_back(Value(75L)); + expectStatRow.values.push_back(Value(41L)); + expectStatRow.values.push_back(Value(vId1)); + expectStatRow.values.push_back(Value(34L)); + + QueryTestUtils::checkStatResponse(resp, expectStatColumns, expectStatRow); +} + +TEST_P(LookupIndexTest, AggregateEdgeIndexTest) { + fs::TempDir rootPath("/tmp/AggregateEdgeIndexTest.XXXXXX"); + mock::MockCluster cluster; + cluster.initStorageKV(rootPath.path()); + auto* env = cluster.storageEnv_.get(); + GraphSpaceID spaceId = 1; + auto vIdLen = env->schemaMan_->getSpaceVidLen(spaceId); + ASSERT_TRUE(vIdLen.ok()); + auto totalParts = cluster.getTotalParts(); + ASSERT_TRUE(QueryTestUtils::mockVertexData(env, totalParts)); + ASSERT_TRUE(QueryTestUtils::mockEdgeData(env, totalParts, true)); + auto threadPool = std::make_shared(4); + + auto* processor = LookupProcessor::instance(env, nullptr, threadPool.get()); + cpp2::LookupIndexRequest 
req; + nebula::storage::cpp2::IndexSpec indices; + req.set_space_id(spaceId); + nebula::cpp2::SchemaID schemaId; + schemaId.set_edge_type(102); + indices.set_schema_id(schemaId); + std::vector parts; + for (int32_t p = 1; p <= totalParts; p++) { + parts.emplace_back(p); + } + req.set_parts(std::move(parts)); + + std::string tony = "Tony Parker"; + std::string manu = "Manu Ginobili"; + std::string yao = "Yao Ming"; + std::string tracy = "Tracy McGrady"; + std::vector returnCols; + returnCols.emplace_back(kSrc); + returnCols.emplace_back(kType); + returnCols.emplace_back(kRank); + returnCols.emplace_back(kDst); + returnCols.emplace_back("teamName"); + returnCols.emplace_back("startYear"); + req.set_return_columns(std::move(returnCols)); + + // teammates.player1 == "Tony Parker" + cpp2::IndexColumnHint columnHint1; + columnHint1.set_begin_value(Value(tony)); + columnHint1.set_column_name("player1"); + columnHint1.set_scan_type(cpp2::ScanType::PREFIX); + // teammates.player1 == "Yao Ming" + cpp2::IndexColumnHint columnHint2; + columnHint2.set_begin_value(Value(yao)); + columnHint2.set_column_name("player1"); + columnHint2.set_scan_type(cpp2::ScanType::PREFIX); + std::vector columnHints1; + columnHints1.emplace_back(std::move(columnHint1)); + std::vector columnHints2; + columnHints2.emplace_back(std::move(columnHint2)); + + cpp2::IndexQueryContext context1; + context1.set_column_hints(std::move(columnHints1)); + context1.set_filter(""); + context1.set_index_id(102); + cpp2::IndexQueryContext context2; + context2.set_column_hints(std::move(columnHints2)); + context2.set_filter(""); + context2.set_index_id(102); + decltype(indices.contexts) contexts; + contexts.emplace_back(std::move(context1)); + contexts.emplace_back(std::move(context2)); + indices.set_contexts(std::move(contexts)); + req.set_indices(std::move(indices)); + + std::vector statProps; + cpp2::StatProp statProp1; + statProp1.set_alias("total startYear"); + const auto& exp1 = 
*EdgePropertyExpression::make(pool, folly::to(102), "startYear"); + statProp1.set_prop(Expression::encode(exp1)); + statProp1.set_stat(cpp2::StatType::SUM); + statProps.emplace_back(std::move(statProp1)); + + cpp2::StatProp statProp2; + statProp2.set_alias("max kDst"); + const auto& exp2 = *EdgePropertyExpression::make(pool, folly::to(102), kDst); + statProp2.set_prop(Expression::encode(exp2)); + statProp2.set_stat(cpp2::StatType::MAX); + statProps.emplace_back(std::move(statProp2)); + req.set_stat_columns(std::move(statProps)); + + auto fut = processor->getFuture(); + processor->process(req); + auto resp = std::move(fut).get(); + std::vector expectCols = {std::string("102.").append(kSrc), + std::string("102.").append(kType), + std::string("102.").append(kRank), + std::string("102.").append(kDst), + "102.teamName", + "102.startYear"}; + decltype(resp.get_data()->rows) expectRows; + + std::string vId1, vId2, vId3, vId4; + vId1.append(tony.data(), tony.size()); + vId2.append(manu.data(), manu.size()); + vId3.append(yao.data(), yao.size()); + vId4.append(tracy.data(), tracy.size()); + + Row row1; + row1.emplace_back(Value(vId1)); + row1.emplace_back(Value(102L)); + row1.emplace_back(Value(2002L)); + row1.emplace_back(Value(vId2)); + row1.emplace_back(Value("Spurs")); + row1.emplace_back(Value(2002)); + expectRows.emplace_back(Row(row1)); + + Row row2; + row2.emplace_back(Value(vId2)); + row2.emplace_back(Value(102L)); + row2.emplace_back(Value(2002L)); + row2.emplace_back(Value(vId1)); + row2.emplace_back(Value("Spurs")); + row2.emplace_back(Value(2002)); + expectRows.emplace_back(Row(row2)); + + Row row3; + row3.emplace_back(Value(vId3)); + row3.emplace_back(Value(102L)); + row3.emplace_back(Value(2004L)); + row3.emplace_back(Value(vId4)); + row3.emplace_back(Value("Rockets")); + row3.emplace_back(Value(2004L)); + expectRows.emplace_back(Row(row3)); + + Row row4; + row4.emplace_back(Value(vId4)); + row4.emplace_back(Value(102L)); + row4.emplace_back(Value(2004L)); + 
row4.emplace_back(Value(vId3)); + row4.emplace_back(Value("Rockets")); + row4.emplace_back(Value(2004L)); + expectRows.emplace_back(Row(row4)); + QueryTestUtils::checkResponse(resp, expectCols, expectRows); + + std::vector expectStatColumns; + expectStatColumns.emplace_back("total startYear"); + expectStatColumns.emplace_back("max kDst"); + + nebula::Row expectStatRow; + expectStatRow.values.push_back(Value(8012L)); + expectStatRow.values.push_back(Value(vId3)); + QueryTestUtils::checkStatResponse(resp, expectStatColumns, expectStatRow); +} + +TEST_P(LookupIndexTest, AggregateEdgePropNotInReturnColumnsTest) { + fs::TempDir rootPath("/tmp/AggregateEdgeIndexTest.XXXXXX"); + mock::MockCluster cluster; + cluster.initStorageKV(rootPath.path()); + auto* env = cluster.storageEnv_.get(); + GraphSpaceID spaceId = 1; + auto vIdLen = env->schemaMan_->getSpaceVidLen(spaceId); + ASSERT_TRUE(vIdLen.ok()); + auto totalParts = cluster.getTotalParts(); + ASSERT_TRUE(QueryTestUtils::mockVertexData(env, totalParts)); + ASSERT_TRUE(QueryTestUtils::mockEdgeData(env, totalParts, true)); + auto threadPool = std::make_shared(4); + + auto* processor = LookupProcessor::instance(env, nullptr, threadPool.get()); + cpp2::LookupIndexRequest req; + nebula::storage::cpp2::IndexSpec indices; + req.set_space_id(spaceId); + nebula::cpp2::SchemaID schemaId; + schemaId.set_edge_type(102); + indices.set_schema_id(schemaId); + std::vector parts; + for (int32_t p = 1; p <= totalParts; p++) { + parts.emplace_back(p); + } + req.set_parts(std::move(parts)); + + std::string tony = "Tony Parker"; + std::string manu = "Manu Ginobili"; + std::string yao = "Yao Ming"; + std::string tracy = "Tracy McGrady"; + std::vector returnCols; + returnCols.emplace_back(kSrc); + returnCols.emplace_back(kType); + returnCols.emplace_back(kRank); + returnCols.emplace_back(kDst); + req.set_return_columns(std::move(returnCols)); + + // teammates.player1 == "Tony Parker" + cpp2::IndexColumnHint columnHint1; + 
columnHint1.set_begin_value(Value(tony)); + columnHint1.set_column_name("player1"); + columnHint1.set_scan_type(cpp2::ScanType::PREFIX); + // teammates.player1 == "Yao Ming" + cpp2::IndexColumnHint columnHint2; + columnHint2.set_begin_value(Value(yao)); + columnHint2.set_column_name("player1"); + columnHint2.set_scan_type(cpp2::ScanType::PREFIX); + std::vector columnHints1; + columnHints1.emplace_back(std::move(columnHint1)); + std::vector columnHints2; + columnHints2.emplace_back(std::move(columnHint2)); + + cpp2::IndexQueryContext context1; + context1.set_column_hints(std::move(columnHints1)); + context1.set_filter(""); + context1.set_index_id(102); + cpp2::IndexQueryContext context2; + context2.set_column_hints(std::move(columnHints2)); + context2.set_filter(""); + context2.set_index_id(102); + decltype(indices.contexts) contexts; + contexts.emplace_back(std::move(context1)); + contexts.emplace_back(std::move(context2)); + indices.set_contexts(std::move(contexts)); + req.set_indices(std::move(indices)); + + std::vector statProps; + cpp2::StatProp statProp1; + statProp1.set_alias("total startYear"); + const auto& exp1 = *EdgePropertyExpression::make(pool, folly::to(102), "startYear"); + statProp1.set_prop(Expression::encode(exp1)); + statProp1.set_stat(cpp2::StatType::SUM); + statProps.emplace_back(std::move(statProp1)); + + cpp2::StatProp statProp2; + statProp2.set_alias("count teamName"); + const auto& exp2 = *EdgePropertyExpression::make(pool, folly::to(102), "teamName"); + statProp2.set_prop(Expression::encode(exp2)); + statProp2.set_stat(cpp2::StatType::COUNT); + statProps.emplace_back(std::move(statProp2)); + req.set_stat_columns(std::move(statProps)); + + auto fut = processor->getFuture(); + processor->process(req); + auto resp = std::move(fut).get(); + std::vector expectCols = {std::string("102.").append(kSrc), + std::string("102.").append(kType), + std::string("102.").append(kRank), + std::string("102.").append(kDst)}; + decltype(resp.get_data()->rows) 
expectRows; + + std::string vId1, vId2, vId3, vId4; + vId1.append(tony.data(), tony.size()); + vId2.append(manu.data(), manu.size()); + vId3.append(yao.data(), yao.size()); + vId4.append(tracy.data(), tracy.size()); + + Row row1; + row1.emplace_back(Value(vId1)); + row1.emplace_back(Value(102L)); + row1.emplace_back(Value(2002L)); + row1.emplace_back(Value(vId2)); + expectRows.emplace_back(Row(row1)); + + Row row2; + row2.emplace_back(Value(vId2)); + row2.emplace_back(Value(102L)); + row2.emplace_back(Value(2002L)); + row2.emplace_back(Value(vId1)); + expectRows.emplace_back(Row(row2)); + + Row row3; + row3.emplace_back(Value(vId3)); + row3.emplace_back(Value(102L)); + row3.emplace_back(Value(2004L)); + row3.emplace_back(Value(vId4)); + expectRows.emplace_back(Row(row3)); + + Row row4; + row4.emplace_back(Value(vId4)); + row4.emplace_back(Value(102L)); + row4.emplace_back(Value(2004L)); + row4.emplace_back(Value(vId3)); + expectRows.emplace_back(Row(row4)); + QueryTestUtils::checkResponse(resp, expectCols, expectRows); + + std::vector expectStatColumns; + expectStatColumns.emplace_back("total startYear"); + expectStatColumns.emplace_back("count teamName"); + + nebula::Row expectStatRow; + expectStatRow.values.push_back(Value(8012L)); + expectStatRow.values.push_back(Value(4L)); + QueryTestUtils::checkStatResponse(resp, expectStatColumns, expectStatRow); } INSTANTIATE_TEST_CASE_P(Lookup_concurrently, LookupIndexTest, ::testing::Values(false, true)); diff --git a/src/storage/test/QueryTestUtils.h b/src/storage/test/QueryTestUtils.h index 05b2371e69d..f7d3f4ac526 100644 --- a/src/storage/test/QueryTestUtils.h +++ b/src/storage/test/QueryTestUtils.h @@ -412,6 +412,16 @@ class QueryTestUtils { EXPECT_EQ(expectRows, actualRows); } + static void checkStatResponse(const cpp2::LookupIndexResp& resp, + const std::vector& expectCols, + const Row& expectRow) { + auto columns = (*resp.stat_data_ref()).colNames; + EXPECT_EQ(expectCols, columns); + auto actualRows = 
(*resp.stat_data_ref()).rows; + EXPECT_EQ(1, actualRows.size()); + EXPECT_EQ(actualRows[0], expectRow); + } + static void checkColNames( const nebula::DataSet& dataSet, const std::vector>>& tags,