From 6a8a5f751ae1389433efba33f5380e160d648f54 Mon Sep 17 00:00:00 2001 From: Shylock Hg <33566796+Shylock-Hg@users.noreply.github.com> Date: Thu, 25 Nov 2021 09:59:46 +0800 Subject: [PATCH] Support filter in scan. (#3329) * Support filter in scan. * Fix compile error. * Clear expression context when after filter. Co-authored-by: Yee <2520865+yixinglu@users.noreply.github.com> --- .../context/StorageExpressionContext.h | 2 +- src/storage/exec/ScanNode.h | 78 +++++++++++++++---- src/storage/query/GetNeighborsProcessor.cpp | 8 +- src/storage/query/QueryBaseProcessor-inl.h | 41 ++++++++-- src/storage/query/QueryBaseProcessor.h | 3 +- src/storage/query/ScanEdgeProcessor.cpp | 26 +++++-- src/storage/query/ScanEdgeProcessor.h | 7 +- src/storage/query/ScanVertexProcessor.cpp | 43 ++++++---- src/storage/query/ScanVertexProcessor.h | 7 +- src/storage/test/ScanEdgeTest.cpp | 43 ++++++++++ src/storage/test/ScanVertexTest.cpp | 65 ++++++++++++++++ src/storage/test/UpdateEdgeTest.cpp | 3 +- 12 files changed, 274 insertions(+), 52 deletions(-) diff --git a/src/storage/context/StorageExpressionContext.h b/src/storage/context/StorageExpressionContext.h index 65b989594a3..38ef72eba39 100644 --- a/src/storage/context/StorageExpressionContext.h +++ b/src/storage/context/StorageExpressionContext.h @@ -147,7 +147,7 @@ class StorageExpressionContext final : public ExpressionContext { size_t vIdLen_; bool isIntId_; - RowReader* reader_; + RowReader* reader_{nullptr}; std::string key_; // tag or edge name std::string name_; diff --git a/src/storage/exec/ScanNode.h b/src/storage/exec/ScanNode.h index 5af7912f758..22425dcb90b 100644 --- a/src/storage/exec/ScanNode.h +++ b/src/storage/exec/ScanNode.h @@ -13,6 +13,8 @@ namespace storage { using Cursor = std::string; +inline bool vTrue(const Value& v) { return v.isBool() && v.getBool(); } + // Node to scan vertices of one partition class ScanVertexPropNode : public QueryNode { public: @@ -23,13 +25,17 @@ class ScanVertexPropNode : public QueryNode { bool enableReadFollower, int64_t limit, std::unordered_map* cursors, - nebula::DataSet* resultDataSet) + nebula::DataSet* resultDataSet, + StorageExpressionContext* expCtx = nullptr, + Expression* filter = nullptr) : context_(context), tagNodes_(std::move(tagNodes)), enableReadFollower_(enableReadFollower), limit_(limit), cursors_(cursors), - resultDataSet_(resultDataSet) { + resultDataSet_(resultDataSet), + expCtx_(expCtx), + filter_(filter) { name_ = "ScanVertexPropNode"; for (std::size_t i = 0; i < tagNodes_.size(); ++i) { tagNodesIndex_.emplace(tagNodes_[i]->tagId(), i); @@ -110,20 +116,36 @@ class ScanVertexPropNode : public QueryNode { })) { for (auto& tagNode : tagNodes_) { ret = tagNode->collectTagPropsIfValid( - [&row](const std::vector* props) -> nebula::cpp2::ErrorCode { + [&row, tagNode = tagNode.get(), this]( + const std::vector* props) -> nebula::cpp2::ErrorCode { for (const auto& prop : *props) { if (prop.returned_) { row.emplace_back(Value()); } + if (prop.filtered_ && expCtx_ != nullptr) { + expCtx_->setTagProp(tagNode->getTagName(), prop.name_, Value()); + } } return nebula::cpp2::ErrorCode::SUCCEEDED; }, - [&row, vIdLen, isIntId]( + [&row, vIdLen, isIntId, tagNode = tagNode.get(), this]( folly::StringPiece key, RowReader* reader, const std::vector* props) -> nebula::cpp2::ErrorCode { - if (!QueryUtils::collectVertexProps(key, vIdLen, isIntId, reader, props, row).ok()) { - return nebula::cpp2::ErrorCode::E_TAG_PROP_NOT_FOUND; + for (const auto& prop : *props) { + if (prop.returned_ || (prop.filtered_ && expCtx_ != nullptr)) { + auto value = QueryUtils::readVertexProp(key, vIdLen, isIntId, reader, prop); + if (!value.ok()) { + return nebula::cpp2::ErrorCode::E_TAG_PROP_NOT_FOUND; + } + if (prop.filtered_ && expCtx_ != nullptr) { + expCtx_->setTagProp(tagNode->getTagName(), prop.name_, value.value()); + } + if (prop.returned_) { + VLOG(2) << "Collect prop " << prop.name_; + row.emplace_back(std::move(value).value()); + } + } } return nebula::cpp2::ErrorCode::SUCCEEDED; }); @@ -131,9 +153,11 @@ class ScanVertexPropNode : public QueryNode { break; } } - if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) { + if (ret == nebula::cpp2::ErrorCode::SUCCEEDED && + (filter_ == nullptr || vTrue(filter_->eval(*expCtx_)))) { resultDataSet_->rows.emplace_back(std::move(row)); } + expCtx_->clear(); for (auto& tagNode : tagNodes_) { tagNode->clear(); } @@ -149,6 +173,8 @@ class ScanVertexPropNode : public QueryNode { // cursors for next scan std::unordered_map* cursors_; nebula::DataSet* resultDataSet_; + StorageExpressionContext* expCtx_{nullptr}; + Expression* filter_{nullptr}; }; // Node to scan edge of one partition @@ -161,13 +187,17 @@ class ScanEdgePropNode : public QueryNode { bool enableReadFollower, int64_t limit, std::unordered_map* cursors, - nebula::DataSet* resultDataSet) + nebula::DataSet* resultDataSet, + StorageExpressionContext* expCtx = nullptr, + Expression* filter = nullptr) : context_(context), edgeNodes_(std::move(edgeNodes)), enableReadFollower_(enableReadFollower), limit_(limit), cursors_(cursors), - resultDataSet_(resultDataSet) { + resultDataSet_(resultDataSet), + expCtx_(expCtx), + filter_(filter) { QueryNode::name_ = "ScanEdgePropNode"; for (std::size_t i = 0; i < edgeNodes_.size(); ++i) { edgeNodesIndex_.emplace(edgeNodes_[i]->edgeType(), i); @@ -230,20 +260,36 @@ class ScanEdgePropNode : public QueryNode { nebula::cpp2::ErrorCode ret = nebula::cpp2::ErrorCode::SUCCEEDED; for (auto& edgeNode : edgeNodes_) { ret = edgeNode->collectEdgePropsIfValid( - [&row](const std::vector* props) -> nebula::cpp2::ErrorCode { + [&row, edgeNode = edgeNode.get(), this]( + const std::vector* props) -> nebula::cpp2::ErrorCode { for (const auto& prop : *props) { if (prop.returned_) { row.emplace_back(Value()); } + if (prop.filtered_ && expCtx_ != nullptr) { + expCtx_->setEdgeProp(edgeNode->getEdgeName(), prop.name_, Value()); + } } return nebula::cpp2::ErrorCode::SUCCEEDED; }, - [&row, vIdLen, isIntId]( + [&row, vIdLen, isIntId, edgeNode = edgeNode.get(), this]( folly::StringPiece key, RowReader* reader, const std::vector* props) -> nebula::cpp2::ErrorCode { - if (!QueryUtils::collectEdgeProps(key, vIdLen, isIntId, reader, props, row).ok()) { - return nebula::cpp2::ErrorCode::E_EDGE_PROP_NOT_FOUND; + for (const auto& prop : *props) { + if (prop.returned_ || (prop.filtered_ && expCtx_ != nullptr)) { + auto value = QueryUtils::readEdgeProp(key, vIdLen, isIntId, reader, prop); + if (!value.ok()) { + return nebula::cpp2::ErrorCode::E_EDGE_PROP_NOT_FOUND; + } + if (prop.filtered_ && expCtx_ != nullptr) { + expCtx_->setEdgeProp(edgeNode->getEdgeName(), prop.name_, value.value()); + } + if (prop.returned_) { + VLOG(2) << "Collect prop " << prop.name_; + row.emplace_back(std::move(value).value()); + } + } } return nebula::cpp2::ErrorCode::SUCCEEDED; }); @@ -251,9 +297,11 @@ class ScanEdgePropNode : public QueryNode { break; } } - if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) { + if (ret == nebula::cpp2::ErrorCode::SUCCEEDED && + (filter_ == nullptr || vTrue(filter_->eval(*expCtx_)))) { resultDataSet_->rows.emplace_back(std::move(row)); } + expCtx_->clear(); for (auto& edgeNode : edgeNodes_) { edgeNode->clear(); } @@ -268,6 +316,8 @@ class ScanEdgePropNode : public QueryNode { // cursors for next scan std::unordered_map* cursors_; nebula::DataSet* resultDataSet_; + StorageExpressionContext* expCtx_{nullptr}; + Expression* filter_{nullptr}; }; } // namespace storage diff --git a/src/storage/query/GetNeighborsProcessor.cpp b/src/storage/query/GetNeighborsProcessor.cpp index 149a7c9018b..5d08b2ab7e3 100644 --- a/src/storage/query/GetNeighborsProcessor.cpp +++ b/src/storage/query/GetNeighborsProcessor.cpp @@ -284,7 +284,13 @@ nebula::cpp2::ErrorCode GetNeighborsProcessor::checkAndBuildContexts( if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { return code; } - code = buildFilter(req); + code = buildFilter(req, [](const cpp2::GetNeighborsRequest& r) -> const std::string* { + if (r.get_traverse_spec().filter_ref().has_value()) { + return r.get_traverse_spec().get_filter(); + } else { + return nullptr; + } + }); if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { return code; } diff --git a/src/storage/query/QueryBaseProcessor-inl.h b/src/storage/query/QueryBaseProcessor-inl.h index f472f265576..47b8f48c54f 100644 --- a/src/storage/query/QueryBaseProcessor-inl.h +++ b/src/storage/query/QueryBaseProcessor-inl.h @@ -129,18 +129,18 @@ nebula::cpp2::ErrorCode QueryBaseProcessor::buildYields(const REQ& re } template -nebula::cpp2::ErrorCode QueryBaseProcessor::buildFilter(const REQ& req) { - const auto& traverseSpec = req.get_traverse_spec(); - if (!traverseSpec.filter_ref().has_value()) { +nebula::cpp2::ErrorCode QueryBaseProcessor::buildFilter( + const REQ& req, std::function&& getFilter) { + const auto* filterStr = getFilter(req); + if (filterStr == nullptr) { return nebula::cpp2::ErrorCode::SUCCEEDED; } - const auto& filterStr = *traverseSpec.filter_ref(); auto pool = &this->planContext_->objPool_; // auto v = env_; - if (!filterStr.empty()) { + if (!filterStr->empty()) { // the filter expression **must** return a bool - filter_ = Expression::decode(pool, filterStr); + filter_ = Expression::decode(pool, *filterStr); if (filter_ == nullptr) { return nebula::cpp2::ErrorCode::E_INVALID_FILTER; } @@ -438,8 +438,9 @@ nebula::cpp2::ErrorCode QueryBaseProcessor::checkExp(const Expression } return nebula::cpp2::ErrorCode::SUCCEEDED; } + case Expression::Kind::kTagProperty: case Expression::Kind::kSrcProperty: { - auto* sourceExp = static_cast(exp); + auto* sourceExp = static_cast(exp); const auto& tagName = sourceExp->sym(); const auto& propName = sourceExp->prop(); auto tagRet = this->env_->schemaMan_->toTagID(spaceId_, tagName); @@ -458,6 +459,17 @@ nebula::cpp2::ErrorCode QueryBaseProcessor::checkExp(const Expression const auto& tagSchema = iter->second.back(); if (propName == kVid || propName == kTag) { + if (returned || filtered) { + addPropContextIfNotExists(tagContext_.propContexts_, + tagContext_.indexMap_, + tagContext_.tagNames_, + tagId, + tagName, + propName, + nullptr, + returned, + filtered); + } return nebula::cpp2::ErrorCode::SUCCEEDED; } @@ -505,6 +517,17 @@ nebula::cpp2::ErrorCode QueryBaseProcessor::checkExp(const Expression const auto& edgeSchema = iter->second.back(); if (propName == kSrc || propName == kType || propName == kRank || propName == kDst) { + if (returned || filtered) { + addPropContextIfNotExists(edgeContext_.propContexts_, + edgeContext_.indexMap_, + edgeContext_.edgeNames_, + edgeType, + edgeName, + propName, + nullptr, + returned, + filtered); + } return nebula::cpp2::ErrorCode::SUCCEEDED; } @@ -561,7 +584,6 @@ nebula::cpp2::ErrorCode QueryBaseProcessor::checkExp(const Expression case Expression::Kind::kSubscript: case Expression::Kind::kAttribute: case Expression::Kind::kLabelAttribute: - case Expression::Kind::kTagProperty: case Expression::Kind::kVertex: case Expression::Kind::kEdge: case Expression::Kind::kLabel: @@ -597,6 +619,9 @@ void QueryBaseProcessor::addPropContextIfNotExists( bool filtered, const std::pair* statInfo) { auto idxIter = indexMap.find(entryId); + if (idxIter == indexMap.end()) { // for edge type + idxIter = indexMap.find(-entryId); + } if (idxIter == indexMap.end()) { // if no property of tag/edge has been add to propContexts PropContext ctx(propName.c_str(), field, returned, filtered, statInfo); diff --git a/src/storage/query/QueryBaseProcessor.h b/src/storage/query/QueryBaseProcessor.h index 3f75b0c5335..00dd8e97247 100644 --- a/src/storage/query/QueryBaseProcessor.h +++ b/src/storage/query/QueryBaseProcessor.h @@ -154,7 +154,8 @@ class QueryBaseProcessor : public BaseProcessor { // build edgeContexts_ according to return props nebula::cpp2::ErrorCode handleEdgeProps(std::vector& edgeProps); - nebula::cpp2::ErrorCode buildFilter(const REQ& req); + nebula::cpp2::ErrorCode buildFilter( + const REQ& req, std::function&& getFilter); nebula::cpp2::ErrorCode buildYields(const REQ& req); // build ttl info map diff --git a/src/storage/query/ScanEdgeProcessor.cpp b/src/storage/query/ScanEdgeProcessor.cpp index 5da9b6425e6..94a6a03a1b2 100644 --- a/src/storage/query/ScanEdgeProcessor.cpp +++ b/src/storage/query/ScanEdgeProcessor.cpp @@ -64,6 +64,13 @@ nebula::cpp2::ErrorCode ScanEdgeProcessor::checkAndBuildContexts(const cpp2::Sca std::vector returnProps = {*req.return_columns_ref()}; ret = handleEdgeProps(returnProps); buildEdgeColName(returnProps); + ret = buildFilter(req, [](const cpp2::ScanEdgeRequest& r) -> const std::string* { + if (r.filter_ref().has_value()) { + return r.get_filter(); + } else { + return nullptr; + } + }); return ret; } @@ -85,7 +92,8 @@ void ScanEdgeProcessor::onProcessFinished() { StoragePlan ScanEdgeProcessor::buildPlan( RuntimeContext* context, nebula::DataSet* result, - std::unordered_map* cursors) { + std::unordered_map* cursors, + StorageExpressionContext* expCtx) { StoragePlan plan; std::vector> edges; for (const auto& ec : edgeContext_.propContexts_) { @@ -93,7 +101,7 @@ StoragePlan ScanEdgeProcessor::buildPlan( std::make_unique(context, &edgeContext_, ec.first, &ec.second)); } auto output = std::make_unique( - context, std::move(edges), enableReadFollower_, limit_, cursors, result); + context, std::move(edges), enableReadFollower_, limit_, cursors, result, expCtx, filter_); plan.addNode(std::move(output)); return plan; @@ -104,10 +112,11 @@ folly::Future> ScanEdgeProcessor nebula::DataSet* result, std::unordered_map* cursors, PartitionID partId, - Cursor cursor) { + Cursor cursor, + StorageExpressionContext* expCtx) { return folly::via(executor_, - [this, context, result, cursors, partId, input = std::move(cursor)]() { - auto plan = buildPlan(context, result, cursors); + [this, context, result, cursors, partId, input = std::move(cursor), expCtx]() { + auto plan = buildPlan(context, result, cursors, expCtx); auto ret = plan.go(partId, input); if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { @@ -119,8 +128,9 @@ folly::Future> ScanEdgeProcessor void ScanEdgeProcessor::runInSingleThread(const cpp2::ScanEdgeRequest& req) { contexts_.emplace_back(RuntimeContext(planContext_.get())); + expCtxs_.emplace_back(StorageExpressionContext(spaceVidLen_, isIntId_)); std::unordered_set failedParts; - auto plan = buildPlan(&contexts_.front(), &resultDataSet_, &cursors_); + auto plan = buildPlan(&contexts_.front(), &resultDataSet_, &cursors_, &expCtxs_.front()); for (const auto& partEntry : req.get_parts()) { auto partId = partEntry.first; auto cursor = partEntry.second; @@ -142,6 +152,7 @@ void ScanEdgeProcessor::runInMultipleThread(const cpp2::ScanEdgeRequest& req) { nebula::DataSet result = resultDataSet_; results_.emplace_back(std::move(result)); contexts_.emplace_back(RuntimeContext(planContext_.get())); + expCtxs_.emplace_back(StorageExpressionContext(spaceVidLen_, isIntId_)); } size_t i = 0; std::vector>> futures; @@ -150,7 +161,8 @@ void ScanEdgeProcessor::runInMultipleThread(const cpp2::ScanEdgeRequest& req) { &results_[i], &cursorsOfPart_[i], partId, - cursor.get_has_next() ? *cursor.get_next_cursor() : "")); + cursor.get_has_next() ? *cursor.get_next_cursor() : "", + &expCtxs_[i])); i++; } diff --git a/src/storage/query/ScanEdgeProcessor.h b/src/storage/query/ScanEdgeProcessor.h index f1931cd881b..40c5186975d 100644 --- a/src/storage/query/ScanEdgeProcessor.h +++ b/src/storage/query/ScanEdgeProcessor.h @@ -39,14 +39,16 @@ class ScanEdgeProcessor : public QueryBaseProcessor buildPlan(RuntimeContext* context, nebula::DataSet* result, - std::unordered_map* cursors); + std::unordered_map* cursors, + StorageExpressionContext* expCtx); folly::Future> runInExecutor( RuntimeContext* context, nebula::DataSet* result, std::unordered_map* cursors, PartitionID partId, - Cursor cursor); + Cursor cursor, + StorageExpressionContext* expCtx); void runInSingleThread(const cpp2::ScanEdgeRequest& req); @@ -55,6 +57,7 @@ class ScanEdgeProcessor : public QueryBaseProcessor contexts_; + std::vector expCtxs_; std::vector results_; std::vector> cursorsOfPart_; diff --git a/src/storage/query/ScanVertexProcessor.cpp b/src/storage/query/ScanVertexProcessor.cpp index 032ec103660..bb9b3a705ad 100644 --- a/src/storage/query/ScanVertexProcessor.cpp +++ b/src/storage/query/ScanVertexProcessor.cpp @@ -65,6 +65,13 @@ nebula::cpp2::ErrorCode ScanVertexProcessor::checkAndBuildContexts( std::vector returnProps = *req.return_columns_ref(); ret = handleVertexProps(returnProps); buildTagColName(returnProps); + ret = buildFilter(req, [](const cpp2::ScanVertexRequest& r) -> const std::string* { + if (r.filter_ref().has_value()) { + return r.get_filter(); + } else { + return nullptr; + } + }); return ret; } @@ -87,14 +94,15 @@ void ScanVertexProcessor::onProcessFinished() { StoragePlan ScanVertexProcessor::buildPlan( RuntimeContext* context, nebula::DataSet* result, - std::unordered_map* cursors) { + std::unordered_map* cursors, + StorageExpressionContext* expCtx) { StoragePlan plan; std::vector> tags; for (const auto& tc : tagContext_.propContexts_) { tags.emplace_back(std::make_unique(context, &tagContext_, tc.first, &tc.second)); } auto output = std::make_unique( - context, std::move(tags), enableReadFollower_, limit_, cursors, result); + context, std::move(tags), enableReadFollower_, limit_, cursors, result, expCtx, filter_); plan.addNode(std::move(output)); return plan; @@ -105,23 +113,26 @@ folly::Future> ScanVertexProcess nebula::DataSet* result, std::unordered_map* cursorsOfPart, PartitionID partId, - Cursor cursor) { - return folly::via(executor_, - [this, context, result, cursorsOfPart, partId, input = std::move(cursor)]() { - auto plan = buildPlan(context, result, cursorsOfPart); - - auto ret = plan.go(partId, input); - if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { - return std::make_pair(ret, partId); - } - return std::make_pair(nebula::cpp2::ErrorCode::SUCCEEDED, partId); - }); + Cursor cursor, + StorageExpressionContext* expCtx) { + return folly::via( + executor_, + [this, context, result, cursorsOfPart, partId, input = std::move(cursor), expCtx]() { + auto plan = buildPlan(context, result, cursorsOfPart, expCtx); + + auto ret = plan.go(partId, input); + if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { + return std::make_pair(ret, partId); + } + return std::make_pair(nebula::cpp2::ErrorCode::SUCCEEDED, partId); + }); } void ScanVertexProcessor::runInSingleThread(const cpp2::ScanVertexRequest& req) { contexts_.emplace_back(RuntimeContext(planContext_.get())); + expCtxs_.emplace_back(StorageExpressionContext(spaceVidLen_, isIntId_)); std::unordered_set failedParts; - auto plan = buildPlan(&contexts_.front(), &resultDataSet_, &cursors_); + auto plan = buildPlan(&contexts_.front(), &resultDataSet_, &cursors_, &expCtxs_.front()); for (const auto& partEntry : req.get_parts()) { auto partId = partEntry.first; auto cursor = partEntry.second; @@ -143,6 +154,7 @@ void ScanVertexProcessor::runInMultipleThread(const cpp2::ScanVertexRequest& req nebula::DataSet result = resultDataSet_; results_.emplace_back(std::move(result)); contexts_.emplace_back(RuntimeContext(planContext_.get())); + expCtxs_.emplace_back(StorageExpressionContext(spaceVidLen_, isIntId_)); } size_t i = 0; std::vector>> futures; @@ -151,7 +163,8 @@ void ScanVertexProcessor::runInMultipleThread(const cpp2::ScanVertexRequest& req &results_[i], &cursorsOfPart_[i], partId, - cursor.get_has_next() ? *cursor.get_next_cursor() : "")); + cursor.get_has_next() ? *cursor.get_next_cursor() : "", + &expCtxs_[i])); i++; } diff --git a/src/storage/query/ScanVertexProcessor.h b/src/storage/query/ScanVertexProcessor.h index 987c77e2edd..39b34aedae9 100644 --- a/src/storage/query/ScanVertexProcessor.h +++ b/src/storage/query/ScanVertexProcessor.h @@ -40,14 +40,16 @@ class ScanVertexProcessor StoragePlan buildPlan(RuntimeContext* context, nebula::DataSet* result, - std::unordered_map* cursors); + std::unordered_map* cursors, + StorageExpressionContext* expCtx); folly::Future> runInExecutor( RuntimeContext* context, nebula::DataSet* result, std::unordered_map* cursors, PartitionID partId, - Cursor cursor); + Cursor cursor, + StorageExpressionContext* expCtx); void runInSingleThread(const cpp2::ScanVertexRequest& req); @@ -57,6 +59,7 @@ class ScanVertexProcessor private: std::vector contexts_; + std::vector expCtxs_; std::vector results_; std::vector> cursorsOfPart_; diff --git a/src/storage/test/ScanEdgeTest.cpp b/src/storage/test/ScanEdgeTest.cpp index 3ed3d41a9dc..381b0df6c33 100644 --- a/src/storage/test/ScanEdgeTest.cpp +++ b/src/storage/test/ScanEdgeTest.cpp @@ -289,6 +289,49 @@ TEST(ScanEdgeTest, LimitTest) { } } +TEST(ScanEdgeTest, FilterTest) { + fs::TempDir rootPath("/tmp/ScanVertexTest.XXXXXX"); + mock::MockCluster cluster; + cluster.initStorageKV(rootPath.path()); + auto* env = cluster.storageEnv_.get(); + auto totalParts = cluster.getTotalParts(); + ASSERT_EQ(true, QueryTestUtils::mockVertexData(env, totalParts)); + ASSERT_EQ(true, QueryTestUtils::mockEdgeData(env, totalParts)); + + EdgeType serve = 101; + ObjectPool pool; + + { + LOG(INFO) << "Scan one edge with some properties in one batch"; + constexpr std::size_t limit = 3; + auto edge = std::make_pair( + serve, + std::vector{kSrc, kType, kRank, kDst, "teamName", "startYear", "endYear"}); + auto req = buildRequest({1}, {""}, edge, limit); + Expression* filter = EdgePropertyExpression::make(&pool, "101", kSrc); + filter = RelationalExpression::makeEQ( + &pool, filter, ConstantExpression::make(&pool, "Damian Lillard")); + req.set_filter(filter->encode()); + auto* processor = ScanEdgeProcessor::instance(env, nullptr); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + + ASSERT_EQ(0, resp.result.failed_parts.size()); + + DataSet expected({"101._src", + "101._type", + "101._rank", + "101._dst", + "101.teamName", + "101.startYear", + "101.endYear"}); + expected.emplace_back( + List({"Damian Lillard", 101, 2012, "Trail Blazers", "Trail Blazers", 2012, 2020})); + EXPECT_EQ(*resp.edge_data_ref(), expected); + } +} + } // namespace storage } // namespace nebula diff --git a/src/storage/test/ScanVertexTest.cpp b/src/storage/test/ScanVertexTest.cpp index f582848ad92..ea972dd39e6 100644 --- a/src/storage/test/ScanVertexTest.cpp +++ b/src/storage/test/ScanVertexTest.cpp @@ -437,6 +437,71 @@ TEST(ScanVertexTest, MultipleTagsTest) { } } +TEST(ScanVertexTest, FilterTest) { + fs::TempDir rootPath("/tmp/ScanVertexTest.XXXXXX"); + mock::MockCluster cluster; + cluster.initStorageKV(rootPath.path()); + auto* env = cluster.storageEnv_.get(); + auto totalParts = cluster.getTotalParts(); + ASSERT_EQ(true, QueryTestUtils::mockVertexData(env, totalParts)); + ASSERT_EQ(true, QueryTestUtils::mockEdgeData(env, totalParts)); + + TagID player = 1; + TagID team = 2; + ObjectPool pool; + + { + LOG(INFO) << "Scan one tag with some properties in one batch"; + // size_t totalRowCount = 0; + auto playerTag = + std::make_pair(player, std::vector{kVid, kTag, "name", "age", "avgScore"}); + auto teamTag = std::make_pair(team, std::vector{kTag, "name"}); + auto req = buildRequest({1}, {""}, {playerTag, teamTag}); + Expression* filter = TagPropertyExpression::make(&pool, "1", "name"); + filter = + RelationalExpression::makeEQ(&pool, filter, ConstantExpression::make(&pool, "Kobe Bryant")); + req.set_filter(filter->encode()); + auto* processor = ScanVertexProcessor::instance(env, nullptr); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + + ASSERT_EQ(0, resp.result.failed_parts.size()); + nebula::DataSet expect( + {"_vid", "1._vid", "1._tag", "1.name", "1.age", "1.avgScore", "2._tag", "2.name"}); + expect.emplace_back(List( + {"Kobe Bryant", "Kobe Bryant", 1, "Kobe Bryant", 41, 25, Value::kEmpty, Value::kEmpty})); + EXPECT_EQ(expect, *resp.vertex_data_ref()); + } + { + LOG(INFO) << "Scan one tag with some properties in one batch"; + // size_t totalRowCount = 0; + auto playerTag = + std::make_pair(player, std::vector{kVid, kTag, "name", "age", "avgScore"}); + auto teamTag = std::make_pair(team, std::vector{kTag, "name"}); + auto req = buildRequest({1}, {""}, {playerTag, teamTag}); + Expression* filter = TagPropertyExpression::make(&pool, "1", "name"); + filter = + RelationalExpression::makeEQ(&pool, filter, ConstantExpression::make(&pool, "Kobe Bryant")); + filter = LogicalExpression::makeAnd( + &pool, + filter, + UnaryExpression::makeIsEmpty(&pool, TagPropertyExpression::make(&pool, "2", "name"))); + req.set_filter(filter->encode()); + auto* processor = ScanVertexProcessor::instance(env, nullptr); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + + ASSERT_EQ(0, resp.result.failed_parts.size()); + nebula::DataSet expect( + {"_vid", "1._vid", "1._tag", "1.name", "1.age", "1.avgScore", "2._tag", "2.name"}); + expect.emplace_back(List( + {"Kobe Bryant", "Kobe Bryant", 1, "Kobe Bryant", 41, 25, Value::kEmpty, Value::kEmpty})); + EXPECT_EQ(expect, *resp.vertex_data_ref()); + } +} + } // namespace storage } // namespace nebula diff --git a/src/storage/test/UpdateEdgeTest.cpp b/src/storage/test/UpdateEdgeTest.cpp index 86d66825c69..0dc6b115229 100644 --- a/src/storage/test/UpdateEdgeTest.cpp +++ b/src/storage/test/UpdateEdgeTest.cpp @@ -291,7 +291,8 @@ TEST(UpdateEdgeTest, No_Filter_Test) { auto resp = std::move(f).get(); LOG(INFO) << "Check the results..."; - EXPECT_EQ(0, (*resp.result_ref()).failed_parts.size()); + EXPECT_EQ(0, (*resp.result_ref()).failed_parts.size()) + << apache::thrift::util::enumNameSafe((*resp.result_ref()).failed_parts.front().get_code()); EXPECT_EQ(9, (*resp.props_ref()).colNames.size()); EXPECT_EQ("_inserted", (*resp.props_ref()).colNames[0]); EXPECT_EQ("101.playerName", (*resp.props_ref()).colNames[1]);