From 5c0f8ca370691d37783dfb0ae8288cbab10eda83 Mon Sep 17 00:00:00 2001 From: Nivras <12605142+Nivras@users.noreply.github.com> Date: Thu, 16 Dec 2021 15:07:47 +0800 Subject: [PATCH] LookupIndex push aggregate --- src/interface/storage.thrift | 3 + src/meta/processors/BaseProcessor-inl.h | 1 + src/storage/BaseProcessor-inl.h | 27 + src/storage/BaseProcessor.h | 3 + src/storage/CMakeLists.txt | 1 + src/storage/exec/IndexAggregateNode.cpp | 125 ++++ src/storage/exec/IndexAggregateNode.h | 49 ++ src/storage/exec/IndexExprContext.h | 89 +++ src/storage/exec/IndexNode.h | 3 + src/storage/exec/IndexProjectionNode.cpp | 6 + src/storage/exec/IndexSelectionNode.cpp | 24 +- src/storage/exec/IndexSelectionNode.h | 60 +- src/storage/index/LookupProcessor.cpp | 143 ++++- src/storage/index/LookupProcessor.h | 9 +- src/storage/query/GetNeighborsProcessor.cpp | 28 +- src/storage/query/GetNeighborsProcessor.h | 3 - src/storage/test/LookupIndexTest.cpp | 629 +++++++++++++++++++- src/storage/test/QueryTestUtils.h | 10 + 18 files changed, 1096 insertions(+), 117 deletions(-) create mode 100644 src/storage/exec/IndexAggregateNode.cpp create mode 100644 src/storage/exec/IndexAggregateNode.h create mode 100644 src/storage/exec/IndexExprContext.h diff --git a/src/interface/storage.thrift b/src/interface/storage.thrift index 1ff18fd1cd8..eb6173110ad 100644 --- a/src/interface/storage.thrift +++ b/src/interface/storage.thrift @@ -495,6 +495,8 @@ struct LookupIndexResp { // Each column represents one property. the column name is in the form of "tag_name.prop_alias" // or "edge_type_name.prop_alias" in the same order which specified in return_columns of request 2: optional common.DataSet data, + // stat_data only have one column, the column name is the order in LookupIndexRequest.stat_prop + 3: optional common.DataSet stat_data, } enum ScanType { @@ -546,6 +548,7 @@ struct LookupIndexRequest { // max row count of each partition in this response 6: optional i64 limit, 7: optional list order_by, + 8: optional list stat_columns, } diff --git a/src/meta/processors/BaseProcessor-inl.h b/src/meta/processors/BaseProcessor-inl.h index 85cd44ceafd..03780043fd5 100644 --- a/src/meta/processors/BaseProcessor-inl.h +++ b/src/meta/processors/BaseProcessor-inl.h @@ -5,6 +5,7 @@ #pragma once +#include "interface/gen-cpp2/storage_types.h" #include "meta/processors/BaseProcessor.h" namespace nebula { diff --git a/src/storage/BaseProcessor-inl.h b/src/storage/BaseProcessor-inl.h index 735cf4dc83e..985a492e138 100644 --- a/src/storage/BaseProcessor-inl.h +++ b/src/storage/BaseProcessor-inl.h @@ -186,5 +186,32 @@ StatusOr BaseProcessor::encodeRowVal(const meta::NebulaSchema return std::move(rowWrite).moveEncodedStr(); } +template +nebula::cpp2::ErrorCode BaseProcessor::checkStatType( + const meta::SchemaProviderIf::Field& field, cpp2::StatType statType) { + // todo(doodle): how to deal with nullable fields? For now, null add anything + // is null, if there is even one null, the result will be invalid + auto fType = field.type(); + switch (statType) { + case cpp2::StatType::SUM: + case cpp2::StatType::AVG: + case cpp2::StatType::MIN: + case cpp2::StatType::MAX: { + if (fType == nebula::cpp2::PropertyType::INT64 || + fType == nebula::cpp2::PropertyType::INT32 || + fType == nebula::cpp2::PropertyType::INT16 || fType == nebula::cpp2::PropertyType::INT8 || + fType == nebula::cpp2::PropertyType::FLOAT || + fType == nebula::cpp2::PropertyType::DOUBLE) { + return nebula::cpp2::ErrorCode::SUCCEEDED; + } + return nebula::cpp2::ErrorCode::E_INVALID_STAT_TYPE; + } + case cpp2::StatType::COUNT: { + break; + } + } + return nebula::cpp2::ErrorCode::SUCCEEDED; +} + } // namespace storage } // namespace nebula diff --git a/src/storage/BaseProcessor.h b/src/storage/BaseProcessor.h index bc64c5bfa73..9e68f5fd564 100644 --- a/src/storage/BaseProcessor.h +++ b/src/storage/BaseProcessor.h @@ -101,6 +101,9 @@ class BaseProcessor { void handleAsync(GraphSpaceID spaceId, PartitionID partId, nebula::cpp2::ErrorCode code); + nebula::cpp2::ErrorCode checkStatType(const meta::SchemaProviderIf::Field& field, + cpp2::StatType statType); + StatusOr encodeRowVal(const meta::NebulaSchemaProvider* schema, const std::vector& propNames, const std::vector& props, diff --git a/src/storage/CMakeLists.txt b/src/storage/CMakeLists.txt index 6b1d568b5cc..6a5ade90293 100644 --- a/src/storage/CMakeLists.txt +++ b/src/storage/CMakeLists.txt @@ -44,6 +44,7 @@ nebula_add_library( exec/IndexDedupNode.cpp exec/IndexEdgeScanNode.cpp exec/IndexLimitNode.cpp + exec/IndexAggregateNode.cpp exec/IndexProjectionNode.cpp exec/IndexScanNode.cpp exec/IndexSelectionNode.cpp diff --git a/src/storage/exec/IndexAggregateNode.cpp b/src/storage/exec/IndexAggregateNode.cpp new file mode 100644 index 00000000000..3f19f65bb6b --- /dev/null +++ b/src/storage/exec/IndexAggregateNode.cpp @@ -0,0 +1,125 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#include "storage/exec/IndexAggregateNode.h" + +namespace nebula { +namespace storage { + +IndexAggregateNode::IndexAggregateNode(const IndexAggregateNode& node) + : IndexNode(node), statInfos_(node.statInfos_), returnColumnsCount_(node.returnColumnsCount_) { + stats_ = node.stats_; + retColMap_ = node.retColMap_; +} + +IndexAggregateNode::IndexAggregateNode( + RuntimeContext* context, + const std::vector>& statInfos, + size_t returnColumnsCount) + : IndexNode(context, "IndexAggregateNode"), + statInfos_(statInfos), + returnColumnsCount_(returnColumnsCount) {} + +nebula::cpp2::ErrorCode IndexAggregateNode::init(InitContext& ctx) { + DCHECK_EQ(children_.size(), 1); + for (const auto& statInfo : statInfos_) { + ctx.statColumns.insert(statInfo.first); + } + auto ret = children_[0]->init(ctx); + if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { + return ret; + } + initStatValue(); + retColMap_.clear(); + retColMap_ = ctx.retColMap; + return nebula::cpp2::ErrorCode::SUCCEEDED; +} + +void IndexAggregateNode::initStatValue() { + stats_.clear(); + if (statInfos_.size() > 0) { + stats_.reserve(statInfos_.size()); + for (const auto& statInfo : statInfos_) { + stats_.emplace_back(statInfo.second); + } + } +} + +void IndexAggregateNode::addStatValue(const Value& value, ColumnStat* stat) { + switch (stat->statType_) { + case cpp2::StatType::SUM: { + stat->sum_ = stat->sum_ + value; + break; + } + case cpp2::StatType::COUNT: { + stat->count_ = stat->count_ + 1; + break; + } + case cpp2::StatType::MAX: { + stat->max_ = value > stat->max_ ? value : stat->max_; + break; + } + case cpp2::StatType::MIN: { + stat->min_ = value < stat->min_ ? value : stat->min_; + break; + } + default: + LOG(ERROR) << "get invalid stat type"; + return; + } +} + +Row IndexAggregateNode::project(Row&& row) { + Row ret; + ret.reserve(returnColumnsCount_); + for (size_t i = 0; i < returnColumnsCount_; i++) { + ret.emplace_back(std::move(row[i])); + } + return ret; +} + +Row IndexAggregateNode::calculateStats() { + Row result; + result.values.reserve(stats_.size()); + for (const auto& stat : stats_) { + if (stat.statType_ == cpp2::StatType::SUM) { + result.values.emplace_back(stat.sum_); + } else if (stat.statType_ == cpp2::StatType::COUNT) { + result.values.emplace_back(stat.count_); + } else if (stat.statType_ == cpp2::StatType::MAX) { + result.values.emplace_back(stat.max_); + } else if (stat.statType_ == cpp2::StatType::MIN) { + result.values.emplace_back(stat.min_); + } + } + return result; +} + +IndexNode::Result IndexAggregateNode::doNext() { + DCHECK_EQ(children_.size(), 1); + auto& child = *children_[0]; + Result result = child.next(); + const auto& row = result.row(); + if (result.hasData()) { + for (size_t i = 0; i < statInfos_.size(); i++) { + const auto& columnName = statInfos_[i].first; + addStatValue(row[retColMap_[columnName]], &stats_[i]); + } + result = Result(project(std::move(result).row())); + } + return result; +} + +std::unique_ptr IndexAggregateNode::copy() { + return std::make_unique(*this); +} + +std::string IndexAggregateNode::identify() { + return ""; +} + +} // namespace storage + +} // namespace nebula diff --git a/src/storage/exec/IndexAggregateNode.h b/src/storage/exec/IndexAggregateNode.h new file mode 100644 index 00000000000..944de7ee4a5 --- /dev/null +++ b/src/storage/exec/IndexAggregateNode.h @@ -0,0 +1,49 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ +#pragma once +#include "storage/exec/IndexNode.h" + +namespace nebula { +namespace storage { + +// used to save stat value for each column +struct ColumnStat { + ColumnStat() = default; + + explicit ColumnStat(const cpp2::StatType& statType) : statType_(statType) {} + + cpp2::StatType statType_; + mutable Value sum_ = 0L; + mutable Value count_ = 0L; + mutable Value min_ = std::numeric_limits::max(); + mutable Value max_ = std::numeric_limits::min(); +}; + +class IndexAggregateNode : public IndexNode { + public: + IndexAggregateNode(const IndexAggregateNode& node); + explicit IndexAggregateNode(RuntimeContext* context, + const std::vector>& statInfos, + size_t returnColumnsCount); + + nebula::cpp2::ErrorCode init(InitContext& ctx) override; + void initStatValue(); + void addStatValue(const Value& value, ColumnStat* stat); + Row project(Row&& row); + Row calculateStats(); + + std::unique_ptr copy() override; + std::string identify() override; + + private: + Result doNext() override; + std::vector> statInfos_; + std::vector stats_; + Map retColMap_; + size_t returnColumnsCount_; +}; + +} // namespace storage +} // namespace nebula diff --git a/src/storage/exec/IndexExprContext.h b/src/storage/exec/IndexExprContext.h new file mode 100644 index 00000000000..a0cf4795062 --- /dev/null +++ b/src/storage/exec/IndexExprContext.h @@ -0,0 +1,89 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ +#pragma once + +#include "common/expression/Expression.h" +#include "storage/exec/IndexNode.h" + +namespace nebula { +namespace storage { + +class IndexExprContext : public ExpressionContext { + public: + explicit IndexExprContext(const Map &colPos) : colPos_(colPos) {} + void setRow(const Row &row) { + row_ = &row; + } + + Value getEdgeProp(const std::string &edgeType, const std::string &prop) const override { + UNUSED(edgeType); + DCHECK(row_ != nullptr); + auto iter = colPos_.find(prop); + DCHECK(iter != colPos_.end()); + DCHECK(iter->second < row_->size()); + return (*row_)[iter->second]; + } + + Value getTagProp(const std::string &tag, const std::string &prop) const override { + UNUSED(tag); + DCHECK(row_ != nullptr); + auto iter = colPos_.find(prop); + DCHECK(iter != colPos_.end()); + DCHECK(iter->second < row_->size()); + return (*row_)[iter->second]; + } + + // override + const Value &getVar(const std::string &var) const override { + UNUSED(var); + return fatal(__FILE__, __LINE__); + } + const Value &getVersionedVar(const std::string &var, int64_t version) const override { + UNUSED(var), UNUSED(version); + return fatal(__FILE__, __LINE__); + } + const Value &getVarProp(const std::string &var, const std::string &prop) const override { + UNUSED(var), UNUSED(prop); + return fatal(__FILE__, __LINE__); + } + Value getSrcProp(const std::string &tag, const std::string &prop) const override { + UNUSED(tag), UNUSED(prop); + return fatal(__FILE__, __LINE__); + } + const Value &getDstProp(const std::string &tag, const std::string &prop) const override { + UNUSED(tag), UNUSED(prop); + return fatal(__FILE__, __LINE__); + } + const Value &getInputProp(const std::string &prop) const override { + UNUSED(prop); + return fatal(__FILE__, __LINE__); + } + Value getVertex(const std::string &) const override { + return fatal(__FILE__, __LINE__); + } + Value getEdge() const override { + return fatal(__FILE__, __LINE__); + } + Value getColumn(int32_t index) const override { + UNUSED(index); + return fatal(__FILE__, __LINE__); + } + void setVar(const std::string &var, Value val) override { + UNUSED(var), UNUSED(val); + fatal(__FILE__, __LINE__); + } + + private: + const Map &colPos_; + const Row *row_; + inline const Value &fatal(const std::string &file, int line) const { + LOG(FATAL) << "Unexpect at " << file << ":" << line; + static Value placeholder; + return placeholder; + } +}; + +} // namespace storage +} // namespace nebula diff --git a/src/storage/exec/IndexNode.h b/src/storage/exec/IndexNode.h index 27131967700..5344e64278f 100644 --- a/src/storage/exec/IndexNode.h +++ b/src/storage/exec/IndexNode.h @@ -69,6 +69,9 @@ struct InitContext { std::vector returnColumns; // The index of name in `returncolumns` Map retColMap; + // The columns in statColumns + // TODO(nivras) need refactor this, put statColumns in returnColumns + Set statColumns; }; class IndexNode { diff --git a/src/storage/exec/IndexProjectionNode.cpp b/src/storage/exec/IndexProjectionNode.cpp index 946edafb418..261e3a21bf0 100644 --- a/src/storage/exec/IndexProjectionNode.cpp +++ b/src/storage/exec/IndexProjectionNode.cpp @@ -15,6 +15,12 @@ nebula::cpp2::ErrorCode IndexProjectionNode::init(InitContext& ctx) { for (auto& col : requiredColumns_) { ctx.requiredColumns.insert(col); } + for (auto& col : ctx.statColumns) { + if (ctx.requiredColumns.find(col) == ctx.requiredColumns.end()) { + ctx.requiredColumns.insert(col); + requiredColumns_.push_back(col); + } + } auto ret = children_[0]->init(ctx); if (UNLIKELY(ret != ::nebula::cpp2::ErrorCode::SUCCEEDED)) { return ret; diff --git a/src/storage/exec/IndexSelectionNode.cpp b/src/storage/exec/IndexSelectionNode.cpp index b69e6bbfcae..968c2c2efdc 100644 --- a/src/storage/exec/IndexSelectionNode.cpp +++ b/src/storage/exec/IndexSelectionNode.cpp @@ -7,7 +7,7 @@ namespace nebula { namespace storage { IndexSelectionNode::IndexSelectionNode(const IndexSelectionNode& node) : IndexNode(node), expr_(node.expr_), colPos_(node.colPos_) { - ctx_ = std::make_unique(colPos_); + ctx_ = std::make_unique(colPos_); } IndexSelectionNode::IndexSelectionNode(RuntimeContext* context, Expression* expr) @@ -26,7 +26,7 @@ nebula::cpp2::ErrorCode IndexSelectionNode::init(InitContext& ctx) { for (auto& col : vis.getRequiredColumns()) { colPos_[col] = ctx.retColMap.at(col); } - ctx_ = std::make_unique(colPos_); + ctx_ = std::make_unique(colPos_); return ::nebula::cpp2::ErrorCode::SUCCEEDED; } @@ -53,26 +53,6 @@ std::string IndexSelectionNode::identify() { return fmt::format("{}(expr=[{}])", name_, expr_->toString()); } -Value IndexSelectionNode::ExprContext::getEdgeProp(const std::string& edgeType, - const std::string& prop) const { - UNUSED(edgeType); - DCHECK(row_ != nullptr); - auto iter = colPos_.find(prop); - DCHECK(iter != colPos_.end()); - DCHECK(iter->second < row_->size()); - return (*row_)[iter->second]; -} - -Value IndexSelectionNode::ExprContext::getTagProp(const std::string& tag, - const std::string& prop) const { - UNUSED(tag); - DCHECK(row_ != nullptr); - auto iter = colPos_.find(prop); - DCHECK(iter != colPos_.end()); - DCHECK(iter->second < row_->size()); - return (*row_)[iter->second]; -} - } // namespace storage } // namespace nebula diff --git a/src/storage/exec/IndexSelectionNode.h b/src/storage/exec/IndexSelectionNode.h index 3cf9a30e2d9..095890ba751 100644 --- a/src/storage/exec/IndexSelectionNode.h +++ b/src/storage/exec/IndexSelectionNode.h @@ -8,6 +8,7 @@ #include "common/expression/Expression.h" #include "folly/container/F14Map.h" #include "storage/ExprVisitorBase.h" +#include "storage/exec/IndexExprContext.h" #include "storage/exec/IndexNode.h" namespace nebula { namespace storage { @@ -60,64 +61,7 @@ class IndexSelectionNode : public IndexNode { Expression *expr_; Map colPos_; // TODO(hs.zhang): `ExprContext` could be moved out later if we unify the volcano in go/lookup - class ExprContext : public ExpressionContext { - public: - explicit ExprContext(const Map &colPos) : colPos_(colPos) {} - void setRow(const Row &row) { - row_ = &row; - } - Value getEdgeProp(const std::string &edgeType, const std::string &prop) const override; - Value getTagProp(const std::string &tag, const std::string &prop) const override; - // override - const Value &getVar(const std::string &var) const override { - UNUSED(var); - return fatal(__FILE__, __LINE__); - } - const Value &getVersionedVar(const std::string &var, int64_t version) const override { - UNUSED(var), UNUSED(version); - return fatal(__FILE__, __LINE__); - } - const Value &getVarProp(const std::string &var, const std::string &prop) const override { - UNUSED(var), UNUSED(prop); - return fatal(__FILE__, __LINE__); - } - Value getSrcProp(const std::string &tag, const std::string &prop) const override { - UNUSED(tag), UNUSED(prop); - return fatal(__FILE__, __LINE__); - } - const Value &getDstProp(const std::string &tag, const std::string &prop) const override { - UNUSED(tag), UNUSED(prop); - return fatal(__FILE__, __LINE__); - } - const Value &getInputProp(const std::string &prop) const override { - UNUSED(prop); - return fatal(__FILE__, __LINE__); - } - Value getVertex(const std::string &) const override { - return fatal(__FILE__, __LINE__); - } - Value getEdge() const override { - return fatal(__FILE__, __LINE__); - } - Value getColumn(int32_t index) const override { - UNUSED(index); - return fatal(__FILE__, __LINE__); - } - void setVar(const std::string &var, Value val) override { - UNUSED(var), UNUSED(val); - fatal(__FILE__, __LINE__); - } - - private: - const Map &colPos_; - const Row *row_; - inline const Value &fatal(const std::string &file, int line) const { - LOG(FATAL) << "Unexpect at " << file << ":" << line; - static Value placeholder; - return placeholder; - } - }; - std::unique_ptr ctx_; + std::unique_ptr ctx_; }; class SelectionExprVisitor : public ExprVisitorBase { diff --git a/src/storage/index/LookupProcessor.cpp b/src/storage/index/LookupProcessor.cpp index 21a5441d57d..b42ceb53673 100644 --- a/src/storage/index/LookupProcessor.cpp +++ b/src/storage/index/LookupProcessor.cpp @@ -11,6 +11,7 @@ #include "interface/gen-cpp2/common_types.tcc" #include "interface/gen-cpp2/meta_types.tcc" #include "interface/gen-cpp2/storage_types.tcc" +#include "storage/exec/IndexAggregateNode.h" #include "storage/exec/IndexDedupNode.h" #include "storage/exec/IndexEdgeScanNode.h" #include "storage/exec/IndexLimitNode.h" @@ -44,7 +45,16 @@ void LookupProcessor::doProcess(const cpp2::LookupIndexRequest& req) { onFinished(); return; } - auto plan = buildPlan(req); + auto planRet = buildPlan(req); + if (UNLIKELY(!nebula::ok(planRet))) { + for (auto& p : req.get_parts()) { + pushResultCode(nebula::error(planRet), p); + } + onFinished(); + return; + } + + auto plan = std::move(nebula::value(planRet)); if (UNLIKELY(profileDetailFlag_)) { plan->enableProfileDetail(); @@ -83,6 +93,7 @@ ::nebula::cpp2::ErrorCode LookupProcessor::prepare(const cpp2::LookupIndexReques } schemaName = schemaNameValue.value(); context_->edgeType_ = edgeType; + context_->edgeName_ = schemaName; } else { auto tagId = req.get_indices().get_schema_id().get_tag_id(); auto schemaNameValue = env_->schemaMan_->toTagName(req.get_space_id(), tagId); @@ -91,6 +102,7 @@ ::nebula::cpp2::ErrorCode LookupProcessor::prepare(const cpp2::LookupIndexReques } schemaName = schemaNameValue.value(); context_->tagId_ = tagId; + context_->tagName_ = schemaName; } std::vector colNames; for (auto& col : *req.get_return_columns()) { @@ -100,7 +112,8 @@ ::nebula::cpp2::ErrorCode LookupProcessor::prepare(const cpp2::LookupIndexReques return ::nebula::cpp2::ErrorCode::SUCCEEDED; } -std::unique_ptr LookupProcessor::buildPlan(const cpp2::LookupIndexRequest& req) { +ErrorOr> LookupProcessor::buildPlan( + const cpp2::LookupIndexRequest& req) { std::vector> nodes; for (auto& ctx : req.get_indices().get_contexts()) { auto node = buildOneContext(ctx); @@ -138,6 +151,16 @@ std::unique_ptr LookupProcessor::buildPlan(const cpp2::LookupIndexReq nodes[0] = std::move(node); } } + if (req.stat_columns_ref().has_value()) { + auto statRet = handleStatProps(*req.get_stat_columns()); + if (!nebula::ok(statRet)) { + return nebula::error(statRet); + } + auto node = std::make_unique( + context_.get(), nebula::value(statRet), req.get_return_columns()->size()); + node->addChild(std::move(nodes[0])); + nodes[0] = std::move(node); + } return std::move(nodes[0]); } @@ -187,6 +210,10 @@ void LookupProcessor::runInSingleThread(const std::vector& parts, datasetList.emplace_back(std::move(dataset)); codeList.emplace_back(code); } + if (statTypes_.size() > 0) { + auto indexAgg = dynamic_cast(plan.get()); + statsDataSet_.emplace_back(std::move(indexAgg->calculateStats())); + } for (size_t i = 0; i < datasetList.size(); i++) { if (codeList[i] == ::nebula::cpp2::ErrorCode::SUCCEEDED) { while (!datasetList[i].empty()) { @@ -208,7 +235,7 @@ void LookupProcessor::runInSingleThread(const std::vector& parts, void LookupProcessor::runInMultipleThread(const std::vector& parts, std::unique_ptr plan) { std::vector> planCopy = reproducePlan(plan.get(), parts.size()); - using ReturnType = std::tuple>; + using ReturnType = std::tuple, Row>; std::vector> futures; for (size_t i = 0; i < parts.size(); i++) { futures.emplace_back(folly::via( @@ -231,15 +258,21 @@ void LookupProcessor::runInMultipleThread(const std::vector& parts, if (UNLIKELY(profileDetailFlag_)) { profilePlan(plan.get()); } - return {part, code, dataset}; + Row statResult; + if (code == nebula::cpp2::ErrorCode::SUCCEEDED && statTypes_.size() > 0) { + auto indexAgg = dynamic_cast(plan.get()); + statResult = indexAgg->calculateStats(); + } + return {part, code, dataset, statResult}; })); } folly::collectAll(futures).via(executor_).thenTry([this](auto&& t) { CHECK(!t.hasException()); const auto& tries = t.value(); + std::vector statResults; for (size_t j = 0; j < tries.size(); j++) { CHECK(!tries[j].hasException()); - auto& [partId, code, dataset] = tries[j].value(); + auto& [partId, code, dataset, statResult] = tries[j].value(); if (code == ::nebula::cpp2::ErrorCode::SUCCEEDED) { for (auto& row : dataset) { resultDataSet_.emplace_back(std::move(row)); @@ -247,12 +280,112 @@ void LookupProcessor::runInMultipleThread(const std::vector& parts, } else { handleErrorCode(code, context_->spaceId(), partId); } + statResults.emplace_back(std::move(statResult)); } DLOG(INFO) << "finish"; + // IndexAggregateNode has been copyed and each part get it's own aggregate info, + // we need to merge it + this->mergeStatsResult(statResults); this->onProcessFinished(); this->onFinished(); }); } + +ErrorOr>> +LookupProcessor::handleStatProps(const std::vector& statProps) { + auto pool = &this->planContext_->objPool_; + std::vector> statInfos; + std::vector colNames; + + for (size_t statIdx = 0; statIdx < statProps.size(); statIdx++) { + const auto& statProp = statProps[statIdx]; + statTypes_.emplace_back(statProp.get_stat()); + auto exp = Expression::decode(pool, *statProp.prop_ref()); + if (exp == nullptr) { + return nebula::cpp2::ErrorCode::E_INVALID_STAT_TYPE; + } + + // only support vertex property and edge property/rank expression for now + switch (exp->kind()) { + case Expression::Kind::kEdgeRank: + case Expression::Kind::kEdgeProperty: { + auto* edgeExp = static_cast(exp); + const auto& edgeName = edgeExp->sym(); + const auto& propName = edgeExp->prop(); + if (edgeName != context_->edgeName_) { + return nebula::cpp2::ErrorCode::E_EDGE_NOT_FOUND; + } + if (exp->kind() == Expression::Kind::kEdgeProperty && propName != kSrc && + propName != kDst) { + auto edgeSchema = + env_->schemaMan_->getEdgeSchema(context_->spaceId(), context_->edgeType_); + auto field = edgeSchema->field(propName); + if (field == nullptr) { + VLOG(1) << "Can't find related prop " << propName << " on edge " << edgeName; + return nebula::cpp2::ErrorCode::E_EDGE_PROP_NOT_FOUND; + } + auto ret = checkStatType(*field, statProp.get_stat()); + if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { + return ret; + } + } + statInfos.emplace_back(propName, statProp.get_stat()); + break; + } + case Expression::Kind::kTagProperty: { + auto* tagExp = static_cast(exp); + const auto& tagName = tagExp->sym(); + const auto& propName = tagExp->prop(); + + if (tagName != context_->tagName_) { + return nebula::cpp2::ErrorCode::E_TAG_NOT_FOUND; + } + if (propName != kVid && propName != kTag) { + auto tagSchema = env_->schemaMan_->getTagSchema(context_->spaceId(), context_->tagId_); + auto field = tagSchema->field(propName); + if (field == nullptr) { + VLOG(1) << "Can't find related prop " << propName << "on tag " << tagName; + return nebula::cpp2::ErrorCode::E_TAG_PROP_NOT_FOUND; + } + } + statInfos.emplace_back(propName, statProp.get_stat()); + break; + } + default: { + return nebula::cpp2::ErrorCode::E_INVALID_STAT_TYPE; + } + } + colNames.push_back(statProp.get_alias()); + } + statsDataSet_ = nebula::DataSet(colNames); + return statInfos; +} + +void LookupProcessor::mergeStatsResult(const std::vector& statsResult) { + if (statsResult.size() == 0 || statTypes_.size() == 0) { + return; + } + + Row result; + for (size_t statIdx = 0; statIdx < statTypes_.size(); statIdx++) { + Value value = statsResult[0].values[statIdx]; + for (size_t resIdx = 1; resIdx < statsResult.size(); resIdx++) { + const auto& currType = statTypes_[statIdx]; + if (currType == cpp2::StatType::SUM || currType == cpp2::StatType::COUNT) { + value = value + statsResult[resIdx].values[statIdx]; + } else if (currType == cpp2::StatType::MAX) { + value = value > statsResult[resIdx].values[statIdx] ? value + : statsResult[resIdx].values[statIdx]; + } else if (currType == cpp2::StatType::MIN) { + value = value < statsResult[resIdx].values[statIdx] ? value + : statsResult[resIdx].values[statIdx]; + } + } + result.values.emplace_back(std::move(value)); + } + statsDataSet_.emplace_back(std::move(result)); +} + std::vector> LookupProcessor::reproducePlan(IndexNode* root, size_t count) { std::vector> ret(count); diff --git a/src/storage/index/LookupProcessor.h b/src/storage/index/LookupProcessor.h index 9954484bab1..00d6f8f55fa 100644 --- a/src/storage/index/LookupProcessor.h +++ b/src/storage/index/LookupProcessor.h @@ -27,19 +27,26 @@ class LookupProcessor : public BaseProcessor { void doProcess(const cpp2::LookupIndexRequest& req); void onProcessFinished() { BaseProcessor::resp_.data_ref() = std::move(resultDataSet_); + BaseProcessor::resp_.stat_data_ref() = std::move(statsDataSet_); } void profilePlan(IndexNode* plan); void runInSingleThread(const std::vector& parts, std::unique_ptr plan); void runInMultipleThread(const std::vector& parts, std::unique_ptr plan); ::nebula::cpp2::ErrorCode prepare(const cpp2::LookupIndexRequest& req); - std::unique_ptr buildPlan(const cpp2::LookupIndexRequest& req); + ErrorOr> buildPlan( + const cpp2::LookupIndexRequest& req); std::unique_ptr buildOneContext(const cpp2::IndexQueryContext& ctx); std::vector> reproducePlan(IndexNode* root, size_t count); + ErrorOr>> + handleStatProps(const std::vector& statProps); + void mergeStatsResult(const std::vector& statsResult); folly::Executor* executor_{nullptr}; std::unique_ptr planContext_; std::unique_ptr context_; nebula::DataSet resultDataSet_; + nebula::DataSet statsDataSet_; std::vector partResults_; + std::vector statTypes_; }; } // namespace storage } // namespace nebula diff --git a/src/storage/query/GetNeighborsProcessor.cpp b/src/storage/query/GetNeighborsProcessor.cpp index 017e60d6d78..1063a37d307 100644 --- a/src/storage/query/GetNeighborsProcessor.cpp +++ b/src/storage/query/GetNeighborsProcessor.cpp @@ -430,7 +430,7 @@ nebula::cpp2::ErrorCode GetNeighborsProcessor::handleEdgeStatProps( VLOG(1) << "Can't find related prop " << propName << " on edge " << edgeName; return nebula::cpp2::ErrorCode::E_EDGE_PROP_NOT_FOUND; } - auto ret = checkStatType(field, statProp.get_stat()); + auto ret = checkStatType(*field, statProp.get_stat()); if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { return ret; } @@ -459,32 +459,6 @@ nebula::cpp2::ErrorCode GetNeighborsProcessor::handleEdgeStatProps( return nebula::cpp2::ErrorCode::SUCCEEDED; } -nebula::cpp2::ErrorCode GetNeighborsProcessor::checkStatType( - const meta::SchemaProviderIf::Field* field, cpp2::StatType statType) { - // todo(doodle): how to deal with nullable fields? For now, null add anything - // is null, if there is even one null, the result will be invalid - auto fType = field->type(); - switch (statType) { - case cpp2::StatType::SUM: - case cpp2::StatType::AVG: - case cpp2::StatType::MIN: - case cpp2::StatType::MAX: { - if (fType == nebula::cpp2::PropertyType::INT64 || - fType == nebula::cpp2::PropertyType::INT32 || - fType == nebula::cpp2::PropertyType::INT16 || fType == nebula::cpp2::PropertyType::INT8 || - fType == nebula::cpp2::PropertyType::FLOAT || - fType == nebula::cpp2::PropertyType::DOUBLE) { - return nebula::cpp2::ErrorCode::SUCCEEDED; - } - return nebula::cpp2::ErrorCode::E_INVALID_STAT_TYPE; - } - case cpp2::StatType::COUNT: { - break; - } - } - return nebula::cpp2::ErrorCode::SUCCEEDED; -} - void GetNeighborsProcessor::onProcessFinished() { resp_.vertices_ref() = std::move(resultDataSet_); } diff --git a/src/storage/query/GetNeighborsProcessor.h b/src/storage/query/GetNeighborsProcessor.h index d7f36fcf0e1..424952238aa 100644 --- a/src/storage/query/GetNeighborsProcessor.h +++ b/src/storage/query/GetNeighborsProcessor.h @@ -60,9 +60,6 @@ class GetNeighborsProcessor // add PropContext of stat nebula::cpp2::ErrorCode handleEdgeStatProps(const std::vector& statProps); - nebula::cpp2::ErrorCode checkStatType(const meta::SchemaProviderIf::Field* field, - cpp2::StatType statType); - void runInSingleThread(const cpp2::GetNeighborsRequest& req, int64_t limit, bool random); void runInMultipleThread(const cpp2::GetNeighborsRequest& req, int64_t limit, bool random); diff --git a/src/storage/test/LookupIndexTest.cpp b/src/storage/test/LookupIndexTest.cpp index 6ad99007806..bf9d5b341a9 100644 --- a/src/storage/test/LookupIndexTest.cpp +++ b/src/storage/test/LookupIndexTest.cpp @@ -2925,7 +2925,634 @@ TEST_P(LookupIndexTest, DedupEdgeIndexTest) { } } -INSTANTIATE_TEST_SUITE_P(Lookup_concurrently, LookupIndexTest, ::testing::Values(false, true)); +// test aggregate in tag, like sum(age) as "total age" +TEST_P(LookupIndexTest, AggregateTagIndexTest) { + fs::TempDir rootPath("/tmp/SimpleVertexIndexTest.XXXXXX"); + mock::MockCluster cluster; + cluster.initStorageKV(rootPath.path()); + auto* env = cluster.storageEnv_.get(); + GraphSpaceID spaceId = 1; + auto vIdLen = env->schemaMan_->getSpaceVidLen(spaceId); + ASSERT_TRUE(vIdLen.ok()); + auto totalParts = cluster.getTotalParts(); + ASSERT_TRUE(QueryTestUtils::mockVertexData(env, totalParts, true)); + auto threadPool = std::make_shared(4); + + auto* processor = LookupProcessor::instance(env, nullptr, threadPool.get()); + + cpp2::LookupIndexRequest req; + nebula::storage::cpp2::IndexSpec indices; + req.space_id_ref() = spaceId; + nebula::cpp2::SchemaID schemaId; + schemaId.tag_id_ref() = 1; + indices.schema_id_ref() = schemaId; + std::vector parts; + for (int32_t p = 1; p <= totalParts; p++) { + parts.emplace_back(p); + } + + req.parts_ref() = std::move(parts); + std::vector returnCols; + returnCols.emplace_back(kVid); + returnCols.emplace_back(kTag); + returnCols.emplace_back("age"); + req.return_columns_ref() = std::move(returnCols); + + // player.name_ == "Rudy Gay" + cpp2::IndexColumnHint columnHint1; + std::string name1 = "Rudy Gay"; + columnHint1.begin_value_ref() = Value(name1); + columnHint1.column_name_ref() = "name"; + columnHint1.scan_type_ref() = cpp2::ScanType::PREFIX; + + // player.name_ == "Kobe Bryant" + cpp2::IndexColumnHint columnHint2; + std::string name2 = "Kobe Bryant"; + columnHint2.begin_value_ref() = Value(name2); + columnHint2.column_name_ref() = "name"; + columnHint2.scan_type_ref() = cpp2::ScanType::PREFIX; + std::vector columnHints1; + columnHints1.emplace_back(std::move(columnHint1)); + std::vector columnHints2; + columnHints2.emplace_back(std::move(columnHint2)); + + cpp2::IndexQueryContext context1; + context1.column_hints_ref() = std::move(columnHints1); + context1.filter_ref() = ""; + context1.index_id_ref() = 1; + std::vector statProps; + cpp2::IndexQueryContext context2; + context2.column_hints_ref() = std::move(columnHints2); + context2.filter_ref() = ""; + context2.index_id_ref() = 1; + decltype(indices.contexts) contexts; + contexts.emplace_back(std::move(context1)); + contexts.emplace_back(std::move(context2)); + indices.contexts_ref() = std::move(contexts); + req.indices_ref() = std::move(indices); + + cpp2::StatProp statProp; + statProp.alias_ref() = "total age"; + const auto& exp = *TagPropertyExpression::make(pool, folly::to(1), "age"); + statProp.prop_ref() = Expression::encode(exp); + statProp.stat_ref() = cpp2::StatType::SUM; + statProps.emplace_back(std::move(statProp)); + req.stat_columns_ref() = std::move(statProps); + + auto fut = processor->getFuture(); + processor->process(req); + auto resp = std::move(fut).get(); + + std::vector expectCols = { + std::string("1.").append(kVid), std::string("1.").append(kTag), "1.age"}; + decltype(resp.get_data()->rows) expectRows; + + std::string vId1, vId2; + vId1.append(name1.data(), name1.size()); + Row row1; + row1.emplace_back(Value(vId1)); + row1.emplace_back(Value(1L)); + row1.emplace_back(Value(34L)); + expectRows.emplace_back(Row(row1)); + + vId2.append(name2.data(), name2.size()); + Row row2; + row2.emplace_back(Value(vId2)); + row2.emplace_back(Value(1L)); + row2.emplace_back(Value(41L)); + expectRows.emplace_back(Row(row2)); + QueryTestUtils::checkResponse(resp, expectCols, expectRows); + + std::vector expectStatColumns; + nebula::Row expectStatRow; + expectStatColumns.emplace_back("total age"); + expectStatRow.values.push_back(Value(75L)); + QueryTestUtils::checkStatResponse(resp, expectStatColumns, expectStatRow); +} + +// test aggregate in tag, like sum(age) as "total age", and age not in returnColumns +TEST_P(LookupIndexTest, AggregateTagPropNotInReturnColumnsTest) { + fs::TempDir rootPath("/tmp/SimpleVertexIndexTest.XXXXXX"); + mock::MockCluster cluster; + cluster.initStorageKV(rootPath.path()); + auto* env = cluster.storageEnv_.get(); + GraphSpaceID spaceId = 1; + auto vIdLen = env->schemaMan_->getSpaceVidLen(spaceId); + ASSERT_TRUE(vIdLen.ok()); + auto totalParts = cluster.getTotalParts(); + ASSERT_TRUE(QueryTestUtils::mockVertexData(env, totalParts, true)); + auto threadPool = std::make_shared(4); + + auto* processor = LookupProcessor::instance(env, nullptr, threadPool.get()); + + cpp2::LookupIndexRequest req; + nebula::storage::cpp2::IndexSpec indices; + req.space_id_ref() = spaceId; + nebula::cpp2::SchemaID schemaId; + schemaId.tag_id_ref() = 1; + indices.schema_id_ref() = schemaId; + std::vector parts; + for (int32_t p = 1; p <= totalParts; p++) { + parts.emplace_back(p); + } + + req.parts_ref() = std::move(parts); + std::vector returnCols; + returnCols.emplace_back(kVid); + returnCols.emplace_back(kTag); + req.return_columns_ref() = std::move(returnCols); + + // player.name_ == "Rudy Gay" + cpp2::IndexColumnHint columnHint1; + std::string name1 = "Rudy Gay"; + columnHint1.begin_value_ref() = Value(name1); + columnHint1.column_name_ref() = "name"; + columnHint1.scan_type_ref() = cpp2::ScanType::PREFIX; + + // player.name_ == "Kobe Bryant" + cpp2::IndexColumnHint columnHint2; + std::string name2 = "Kobe Bryant"; + columnHint2.begin_value_ref() = Value(name2); + columnHint2.column_name_ref() = "name"; + columnHint2.scan_type_ref() = cpp2::ScanType::PREFIX; + std::vector columnHints1; + columnHints1.emplace_back(std::move(columnHint1)); + std::vector columnHints2; + columnHints2.emplace_back(std::move(columnHint2)); + + cpp2::IndexQueryContext context1; + context1.column_hints_ref() = std::move(columnHints1); + context1.filter_ref() = ""; + context1.index_id_ref() = 1; + std::vector statProps; + cpp2::IndexQueryContext context2; + context2.column_hints_ref() = std::move(columnHints2); + context2.filter_ref() = ""; + context2.index_id_ref() = 1; + decltype(indices.contexts) contexts; + contexts.emplace_back(std::move(context1)); + contexts.emplace_back(std::move(context2)); + indices.contexts_ref() = std::move(contexts); + req.indices_ref() = std::move(indices); + + cpp2::StatProp statProp; + statProp.alias_ref() = "total age"; + const auto& exp = *TagPropertyExpression::make(pool, folly::to(1), "age"); + statProp.prop_ref() = Expression::encode(exp); + statProp.stat_ref() = cpp2::StatType::SUM; + statProps.emplace_back(std::move(statProp)); + req.stat_columns_ref() = std::move(statProps); + + auto fut = processor->getFuture(); + processor->process(req); + auto resp = std::move(fut).get(); + + std::vector expectCols = {std::string("1.").append(kVid), + std::string("1.").append(kTag)}; + decltype(resp.get_data()->rows) expectRows; + + std::string vId1, vId2; + vId1.append(name1.data(), name1.size()); + Row row1; + row1.emplace_back(Value(vId1)); + row1.emplace_back(Value(1L)); + expectRows.emplace_back(Row(row1)); + + vId2.append(name2.data(), name2.size()); + Row row2; + row2.emplace_back(Value(vId2)); + row2.emplace_back(Value(1L)); + expectRows.emplace_back(Row(row2)); + QueryTestUtils::checkResponse(resp, expectCols, expectRows); + + std::vector expectStatColumns; + nebula::Row expectStatRow; + expectStatColumns.emplace_back("total age"); + expectStatRow.values.push_back(Value(75L)); + QueryTestUtils::checkStatResponse(resp, expectStatColumns, expectStatRow); +} + +// test multi aggregate in tag, like sum(age) as "total age", max(age) as "max age", +// max(kVid) as "max kVid", min(age) as "min age" +TEST_P(LookupIndexTest, AggregateTagPropsTest) { + fs::TempDir rootPath("/tmp/SimpleVertexIndexTest.XXXXXX"); + mock::MockCluster cluster; + cluster.initStorageKV(rootPath.path()); + auto* env = cluster.storageEnv_.get(); + GraphSpaceID spaceId = 1; + auto vIdLen = env->schemaMan_->getSpaceVidLen(spaceId); + ASSERT_TRUE(vIdLen.ok()); + auto totalParts = cluster.getTotalParts(); + ASSERT_TRUE(QueryTestUtils::mockVertexData(env, totalParts, true)); + auto threadPool = std::make_shared(4); + + auto* processor = LookupProcessor::instance(env, nullptr, threadPool.get()); + + cpp2::LookupIndexRequest req; + nebula::storage::cpp2::IndexSpec indices; + req.space_id_ref() = spaceId; + nebula::cpp2::SchemaID schemaId; + schemaId.tag_id_ref() = 1; + indices.schema_id_ref() = schemaId; + std::vector parts; + for (int32_t p = 1; p <= totalParts; p++) { + parts.emplace_back(p); + } + + req.parts_ref() = std::move(parts); + std::vector returnCols; + returnCols.emplace_back(kVid); + returnCols.emplace_back(kTag); + returnCols.emplace_back("age"); + req.return_columns_ref() = std::move(returnCols); + + // player.name_ == "Rudy Gay" + cpp2::IndexColumnHint columnHint1; + std::string name1 = "Rudy Gay"; + columnHint1.begin_value_ref() = Value(name1); + columnHint1.column_name_ref() = "name"; + columnHint1.scan_type_ref() = cpp2::ScanType::PREFIX; + + // player.name_ == "Kobe Bryant" + cpp2::IndexColumnHint columnHint2; + std::string name2 = "Kobe Bryant"; + columnHint2.begin_value_ref() = Value(name2); + columnHint2.column_name_ref() = "name"; + columnHint2.scan_type_ref() = cpp2::ScanType::PREFIX; + std::vector columnHints1; + columnHints1.emplace_back(std::move(columnHint1)); + std::vector columnHints2; + columnHints2.emplace_back(std::move(columnHint2)); + + cpp2::IndexQueryContext context1; + context1.column_hints_ref() = std::move(columnHints1); + context1.filter_ref() = ""; + context1.index_id_ref() = 1; + std::vector statProps; + cpp2::IndexQueryContext context2; + context2.column_hints_ref() = std::move(columnHints2); + context2.filter_ref() = ""; + context2.index_id_ref() = 1; + decltype(indices.contexts) contexts; + contexts.emplace_back(std::move(context1)); + contexts.emplace_back(std::move(context2)); + indices.contexts_ref() = std::move(contexts); + req.indices_ref() = std::move(indices); + + cpp2::StatProp statProp1; + statProp1.alias_ref() = "total age"; + const auto& exp1 = *TagPropertyExpression::make(pool, folly::to(1), "age"); + statProp1.prop_ref() = Expression::encode(exp1); + statProp1.stat_ref() = cpp2::StatType::SUM; + statProps.emplace_back(statProp1); + + cpp2::StatProp statProp2; + statProp2.alias_ref() = "max age"; + const auto& exp2 = *TagPropertyExpression::make(pool, folly::to(1), "age"); + statProp2.prop_ref() = Expression::encode(exp2); + statProp2.stat_ref() = cpp2::StatType::MAX; + statProps.emplace_back(statProp2); + + cpp2::StatProp statProp3; + statProp3.alias_ref() = "max kVid"; + const auto& exp3 = *TagPropertyExpression::make(pool, folly::to(1), kVid); + statProp3.prop_ref() = Expression::encode(exp3); + statProp3.stat_ref() = cpp2::StatType::MAX; + statProps.emplace_back(statProp3); + + cpp2::StatProp statProp4; + statProp4.alias_ref() = "min age"; + const auto& exp4 = *TagPropertyExpression::make(pool, folly::to(1), "age"); + statProp4.prop_ref() = Expression::encode(exp4); + statProp4.stat_ref() = cpp2::StatType::MIN; + statProps.emplace_back(statProp4); + + req.stat_columns_ref() = std::move(statProps); + + auto fut = processor->getFuture(); + processor->process(req); + auto resp = std::move(fut).get(); + + std::vector expectCols = { + std::string("1.").append(kVid), std::string("1.").append(kTag), "1.age"}; + decltype(resp.get_data()->rows) expectRows; + + std::string vId1, vId2; + vId1.append(name1.data(), name1.size()); + Row row1; + row1.emplace_back(Value(vId1)); + row1.emplace_back(Value(1L)); + row1.emplace_back(Value(34L)); + expectRows.emplace_back(Row(row1)); + + vId2.append(name2.data(), name2.size()); + Row row2; + row2.emplace_back(Value(vId2)); + row2.emplace_back(Value(1L)); + row2.emplace_back(Value(41L)); + expectRows.emplace_back(Row(row2)); + QueryTestUtils::checkResponse(resp, expectCols, expectRows); + + std::vector expectStatColumns; + expectStatColumns.emplace_back("total age"); + expectStatColumns.emplace_back("max age"); + expectStatColumns.emplace_back("max kVid"); + expectStatColumns.emplace_back("min age"); + + nebula::Row expectStatRow; + expectStatRow.values.push_back(Value(75L)); + expectStatRow.values.push_back(Value(41L)); + expectStatRow.values.push_back(Value(vId1)); + expectStatRow.values.push_back(Value(34L)); + + QueryTestUtils::checkStatResponse(resp, expectStatColumns, expectStatRow); +} + +TEST_P(LookupIndexTest, AggregateEdgeIndexTest) { + fs::TempDir rootPath("/tmp/AggregateEdgeIndexTest.XXXXXX"); + mock::MockCluster cluster; + cluster.initStorageKV(rootPath.path()); + auto* env = cluster.storageEnv_.get(); + GraphSpaceID spaceId = 1; + auto vIdLen = env->schemaMan_->getSpaceVidLen(spaceId); + ASSERT_TRUE(vIdLen.ok()); + auto totalParts = cluster.getTotalParts(); + ASSERT_TRUE(QueryTestUtils::mockVertexData(env, totalParts)); + ASSERT_TRUE(QueryTestUtils::mockEdgeData(env, totalParts, true)); + auto threadPool = std::make_shared(4); + + auto* processor = LookupProcessor::instance(env, nullptr, threadPool.get()); + cpp2::LookupIndexRequest req; + nebula::storage::cpp2::IndexSpec indices; + req.space_id_ref() = spaceId; + nebula::cpp2::SchemaID schemaId; + schemaId.edge_type_ref() = 102; + indices.schema_id_ref() = schemaId; + std::vector parts; + for (int32_t p = 1; p <= totalParts; p++) { + parts.emplace_back(p); + } + req.parts_ref() = std::move(parts); + + std::string tony = "Tony Parker"; + std::string manu = "Manu Ginobili"; + std::string yao = "Yao Ming"; + std::string tracy = "Tracy McGrady"; + std::vector returnCols; + returnCols.emplace_back(kSrc); + returnCols.emplace_back(kType); + returnCols.emplace_back(kRank); + returnCols.emplace_back(kDst); + returnCols.emplace_back("teamName"); + returnCols.emplace_back("startYear"); + req.return_columns_ref() = std::move(returnCols); + + // teammates.player1 == "Tony Parker" + cpp2::IndexColumnHint columnHint1; + columnHint1.begin_value_ref() = Value(tony); + columnHint1.column_name_ref() = "player1"; + columnHint1.scan_type_ref() = cpp2::ScanType::PREFIX; + // teammates.player1 == "Yao Ming" + cpp2::IndexColumnHint columnHint2; + columnHint2.begin_value_ref() = Value(yao); + columnHint2.column_name_ref() = "player1"; + columnHint2.scan_type_ref() = cpp2::ScanType::PREFIX; + std::vector columnHints1; + columnHints1.emplace_back(std::move(columnHint1)); + std::vector columnHints2; + columnHints2.emplace_back(std::move(columnHint2)); + + cpp2::IndexQueryContext context1; + context1.column_hints_ref() = std::move(columnHints1); + context1.filter_ref() = ""; + context1.index_id_ref() = 102; + cpp2::IndexQueryContext context2; + context2.column_hints_ref() = std::move(columnHints2); + context2.filter_ref() = ""; + context2.index_id_ref() = 102; + decltype(indices.contexts) contexts; + contexts.emplace_back(std::move(context1)); + contexts.emplace_back(std::move(context2)); + indices.contexts_ref() = std::move(contexts); + req.indices_ref() = std::move(indices); + + std::vector statProps; + cpp2::StatProp statProp1; + statProp1.alias_ref() = "total startYear"; + const auto& exp1 = *EdgePropertyExpression::make(pool, folly::to(102), "startYear"); + statProp1.prop_ref() = Expression::encode(exp1); + statProp1.stat_ref() = cpp2::StatType::SUM; + statProps.emplace_back(std::move(statProp1)); + + cpp2::StatProp statProp2; + statProp2.alias_ref() = "max kDst"; + const auto& exp2 = *EdgePropertyExpression::make(pool, folly::to(102), kDst); + statProp2.prop_ref() = Expression::encode(exp2); + statProp2.stat_ref() = cpp2::StatType::MAX; + statProps.emplace_back(std::move(statProp2)); + req.stat_columns_ref() = std::move(statProps); + + auto fut = processor->getFuture(); + processor->process(req); + auto resp = std::move(fut).get(); + std::vector expectCols = {std::string("102.").append(kSrc), + std::string("102.").append(kType), + std::string("102.").append(kRank), + std::string("102.").append(kDst), + "102.teamName", + "102.startYear"}; + decltype(resp.get_data()->rows) expectRows; + + std::string vId1, vId2, vId3, vId4; + vId1.append(tony.data(), tony.size()); + vId2.append(manu.data(), manu.size()); + vId3.append(yao.data(), yao.size()); + vId4.append(tracy.data(), tracy.size()); + + Row row1; + row1.emplace_back(Value(vId1)); + row1.emplace_back(Value(102L)); + row1.emplace_back(Value(2002L)); + row1.emplace_back(Value(vId2)); + row1.emplace_back(Value("Spurs")); + row1.emplace_back(Value(2002)); + expectRows.emplace_back(Row(row1)); + + Row row2; + row2.emplace_back(Value(vId2)); + row2.emplace_back(Value(102L)); + row2.emplace_back(Value(2002L)); + row2.emplace_back(Value(vId1)); + row2.emplace_back(Value("Spurs")); + row2.emplace_back(Value(2002)); + expectRows.emplace_back(Row(row2)); + + Row row3; + row3.emplace_back(Value(vId3)); + row3.emplace_back(Value(102L)); + row3.emplace_back(Value(2004L)); + row3.emplace_back(Value(vId4)); + row3.emplace_back(Value("Rockets")); + row3.emplace_back(Value(2004L)); + expectRows.emplace_back(Row(row3)); + + Row row4; + row4.emplace_back(Value(vId4)); + row4.emplace_back(Value(102L)); + row4.emplace_back(Value(2004L)); + row4.emplace_back(Value(vId3)); + row4.emplace_back(Value("Rockets")); + row4.emplace_back(Value(2004L)); + expectRows.emplace_back(Row(row4)); + QueryTestUtils::checkResponse(resp, expectCols, expectRows); + + std::vector expectStatColumns; + expectStatColumns.emplace_back("total startYear"); + expectStatColumns.emplace_back("max kDst"); + + nebula::Row expectStatRow; + expectStatRow.values.push_back(Value(8012L)); + expectStatRow.values.push_back(Value(vId3)); + QueryTestUtils::checkStatResponse(resp, expectStatColumns, expectStatRow); +} + +TEST_P(LookupIndexTest, AggregateEdgePropNotInReturnColumnsTest) { + fs::TempDir rootPath("/tmp/AggregateEdgeIndexTest.XXXXXX"); + mock::MockCluster cluster; + cluster.initStorageKV(rootPath.path()); + auto* env = cluster.storageEnv_.get(); + GraphSpaceID spaceId = 1; + auto vIdLen = env->schemaMan_->getSpaceVidLen(spaceId); + ASSERT_TRUE(vIdLen.ok()); + auto totalParts = cluster.getTotalParts(); + ASSERT_TRUE(QueryTestUtils::mockVertexData(env, totalParts)); + ASSERT_TRUE(QueryTestUtils::mockEdgeData(env, totalParts, true)); + auto threadPool = std::make_shared(4); + + auto* processor = LookupProcessor::instance(env, nullptr, threadPool.get()); + cpp2::LookupIndexRequest req; + nebula::storage::cpp2::IndexSpec indices; + req.space_id_ref() = spaceId; + nebula::cpp2::SchemaID schemaId; + schemaId.edge_type_ref() = 102; + indices.schema_id_ref() = schemaId; + std::vector parts; + for (int32_t p = 1; p <= totalParts; p++) { + parts.emplace_back(p); + } + req.parts_ref() = std::move(parts); + + std::string tony = "Tony Parker"; + std::string manu = "Manu Ginobili"; + std::string yao = "Yao Ming"; + std::string tracy = "Tracy McGrady"; + std::vector returnCols; + returnCols.emplace_back(kSrc); + returnCols.emplace_back(kType); + returnCols.emplace_back(kRank); + returnCols.emplace_back(kDst); + req.return_columns_ref() = std::move(returnCols); + + // teammates.player1 == "Tony Parker" + cpp2::IndexColumnHint columnHint1; + columnHint1.begin_value_ref() = Value(tony); + columnHint1.column_name_ref() = "player1"; + columnHint1.scan_type_ref() = cpp2::ScanType::PREFIX; + // teammates.player1 == "Yao Ming" + cpp2::IndexColumnHint columnHint2; + columnHint2.begin_value_ref() = Value(yao); + columnHint2.column_name_ref() = "player1"; + columnHint2.scan_type_ref() = cpp2::ScanType::PREFIX; + std::vector columnHints1; + columnHints1.emplace_back(std::move(columnHint1)); + std::vector columnHints2; + columnHints2.emplace_back(std::move(columnHint2)); + + cpp2::IndexQueryContext context1; + context1.column_hints_ref() = std::move(columnHints1); + context1.filter_ref() = ""; + context1.index_id_ref() = 102; + cpp2::IndexQueryContext context2; + context2.column_hints_ref() = std::move(columnHints2); + context2.filter_ref() = ""; + context2.index_id_ref() = 102; + decltype(indices.contexts) contexts; + contexts.emplace_back(std::move(context1)); + contexts.emplace_back(std::move(context2)); + indices.contexts_ref() = std::move(contexts); + req.indices_ref() = std::move(indices); + + std::vector statProps; + cpp2::StatProp statProp1; + statProp1.alias_ref() = "total startYear"; + const auto& exp1 = *EdgePropertyExpression::make(pool, folly::to(102), "startYear"); + statProp1.prop_ref() = Expression::encode(exp1); + statProp1.stat_ref() = cpp2::StatType::SUM; + statProps.emplace_back(std::move(statProp1)); + + cpp2::StatProp statProp2; + statProp2.alias_ref() = "count teamName"; + const auto& exp2 = *EdgePropertyExpression::make(pool, folly::to(102), "teamName"); + statProp2.prop_ref() = Expression::encode(exp2); + statProp2.stat_ref() = cpp2::StatType::COUNT; + statProps.emplace_back(std::move(statProp2)); + req.stat_columns_ref() = std::move(statProps); + + auto fut = processor->getFuture(); + processor->process(req); + auto resp = std::move(fut).get(); + std::vector expectCols = {std::string("102.").append(kSrc), + std::string("102.").append(kType), + std::string("102.").append(kRank), + std::string("102.").append(kDst)}; + decltype(resp.get_data()->rows) expectRows; + + std::string vId1, vId2, vId3, vId4; + vId1.append(tony.data(), tony.size()); + vId2.append(manu.data(), manu.size()); + vId3.append(yao.data(), yao.size()); + vId4.append(tracy.data(), tracy.size()); + + Row row1; + row1.emplace_back(Value(vId1)); + row1.emplace_back(Value(102L)); + row1.emplace_back(Value(2002L)); + row1.emplace_back(Value(vId2)); + expectRows.emplace_back(Row(row1)); + + Row row2; + row2.emplace_back(Value(vId2)); + row2.emplace_back(Value(102L)); + row2.emplace_back(Value(2002L)); + row2.emplace_back(Value(vId1)); + expectRows.emplace_back(Row(row2)); + + Row row3; + row3.emplace_back(Value(vId3)); + row3.emplace_back(Value(102L)); + row3.emplace_back(Value(2004L)); + row3.emplace_back(Value(vId4)); + expectRows.emplace_back(Row(row3)); + + Row row4; + row4.emplace_back(Value(vId4)); + row4.emplace_back(Value(102L)); + row4.emplace_back(Value(2004L)); + row4.emplace_back(Value(vId3)); + expectRows.emplace_back(Row(row4)); + QueryTestUtils::checkResponse(resp, expectCols, expectRows); + + std::vector expectStatColumns; + expectStatColumns.emplace_back("total startYear"); + expectStatColumns.emplace_back("count teamName"); + + nebula::Row expectStatRow; + expectStatRow.values.push_back(Value(8012L)); + expectStatRow.values.push_back(Value(4L)); + QueryTestUtils::checkStatResponse(resp, expectStatColumns, expectStatRow); +} + +INSTANTIATE_TEST_CASE_P(Lookup_concurrently, LookupIndexTest, ::testing::Values(false, true)); } // namespace storage } // namespace nebula diff --git a/src/storage/test/QueryTestUtils.h b/src/storage/test/QueryTestUtils.h index a655e8b4629..cad9eda85e1 100644 --- a/src/storage/test/QueryTestUtils.h +++ b/src/storage/test/QueryTestUtils.h @@ -413,6 +413,16 @@ class QueryTestUtils { EXPECT_EQ(expectRows, actualRows); } + static void checkStatResponse(const cpp2::LookupIndexResp& resp, + const std::vector& expectCols, + const Row& expectRow) { + auto columns = (*resp.stat_data_ref()).colNames; + EXPECT_EQ(expectCols, columns); + auto actualRows = (*resp.stat_data_ref()).rows; + EXPECT_EQ(1, actualRows.size()); + EXPECT_EQ(actualRows[0], expectRow); + } + static void checkColNames( const nebula::DataSet& dataSet, const std::vector>>& tags,