Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support filter in scan. #3329

Merged
merged 15 commits into from
Nov 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/storage/context/StorageExpressionContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ class StorageExpressionContext final : public ExpressionContext {
size_t vIdLen_;
bool isIntId_;

RowReader* reader_;
RowReader* reader_{nullptr};
std::string key_;
// tag or edge name
std::string name_;
Expand Down
78 changes: 64 additions & 14 deletions src/storage/exec/ScanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ namespace storage {

using Cursor = std::string;

// Returns true only when `v` holds a boolean and that boolean is true.
// Any non-boolean value is treated as false.
inline bool vTrue(const Value& v) {
  if (!v.isBool()) {
    return false;
  }
  return v.getBool();
}

// Node to scan vertices of one partition
class ScanVertexPropNode : public QueryNode<Cursor> {
public:
Expand All @@ -23,13 +25,17 @@ class ScanVertexPropNode : public QueryNode<Cursor> {
bool enableReadFollower,
int64_t limit,
std::unordered_map<PartitionID, cpp2::ScanCursor>* cursors,
nebula::DataSet* resultDataSet)
nebula::DataSet* resultDataSet,
StorageExpressionContext* expCtx = nullptr,
Expression* filter = nullptr)
: context_(context),
tagNodes_(std::move(tagNodes)),
enableReadFollower_(enableReadFollower),
limit_(limit),
cursors_(cursors),
resultDataSet_(resultDataSet) {
resultDataSet_(resultDataSet),
expCtx_(expCtx),
filter_(filter) {
name_ = "ScanVertexPropNode";
for (std::size_t i = 0; i < tagNodes_.size(); ++i) {
tagNodesIndex_.emplace(tagNodes_[i]->tagId(), i);
Expand Down Expand Up @@ -110,30 +116,48 @@ class ScanVertexPropNode : public QueryNode<Cursor> {
})) {
for (auto& tagNode : tagNodes_) {
ret = tagNode->collectTagPropsIfValid(
[&row](const std::vector<PropContext>* props) -> nebula::cpp2::ErrorCode {
[&row, tagNode = tagNode.get(), this](
const std::vector<PropContext>* props) -> nebula::cpp2::ErrorCode {
for (const auto& prop : *props) {
if (prop.returned_) {
row.emplace_back(Value());
}
if (prop.filtered_ && expCtx_ != nullptr) {
expCtx_->setTagProp(tagNode->getTagName(), prop.name_, Value());
}
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
},
[&row, vIdLen, isIntId](
[&row, vIdLen, isIntId, tagNode = tagNode.get(), this](
folly::StringPiece key,
RowReader* reader,
const std::vector<PropContext>* props) -> nebula::cpp2::ErrorCode {
if (!QueryUtils::collectVertexProps(key, vIdLen, isIntId, reader, props, row).ok()) {
return nebula::cpp2::ErrorCode::E_TAG_PROP_NOT_FOUND;
for (const auto& prop : *props) {
if (prop.returned_ || (prop.filtered_ && expCtx_ != nullptr)) {
auto value = QueryUtils::readVertexProp(key, vIdLen, isIntId, reader, prop);
if (!value.ok()) {
return nebula::cpp2::ErrorCode::E_TAG_PROP_NOT_FOUND;
}
if (prop.filtered_ && expCtx_ != nullptr) {
expCtx_->setTagProp(tagNode->getTagName(), prop.name_, value.value());
}
if (prop.returned_) {
VLOG(2) << "Collect prop " << prop.name_;
row.emplace_back(std::move(value).value());
}
}
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
});
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
break;
}
}
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED &&
(filter_ == nullptr || vTrue(filter_->eval(*expCtx_)))) {
resultDataSet_->rows.emplace_back(std::move(row));
}
expCtx_->clear();
for (auto& tagNode : tagNodes_) {
tagNode->clear();
}
Expand All @@ -149,6 +173,8 @@ class ScanVertexPropNode : public QueryNode<Cursor> {
// cursors for next scan
std::unordered_map<PartitionID, cpp2::ScanCursor>* cursors_;
nebula::DataSet* resultDataSet_;
StorageExpressionContext* expCtx_{nullptr};
Expression* filter_{nullptr};
};

// Node to scan edge of one partition
Expand All @@ -161,13 +187,17 @@ class ScanEdgePropNode : public QueryNode<Cursor> {
bool enableReadFollower,
int64_t limit,
std::unordered_map<PartitionID, cpp2::ScanCursor>* cursors,
nebula::DataSet* resultDataSet)
nebula::DataSet* resultDataSet,
StorageExpressionContext* expCtx = nullptr,
Expression* filter = nullptr)
: context_(context),
edgeNodes_(std::move(edgeNodes)),
enableReadFollower_(enableReadFollower),
limit_(limit),
cursors_(cursors),
resultDataSet_(resultDataSet) {
resultDataSet_(resultDataSet),
expCtx_(expCtx),
filter_(filter) {
QueryNode::name_ = "ScanEdgePropNode";
for (std::size_t i = 0; i < edgeNodes_.size(); ++i) {
edgeNodesIndex_.emplace(edgeNodes_[i]->edgeType(), i);
Expand Down Expand Up @@ -230,30 +260,48 @@ class ScanEdgePropNode : public QueryNode<Cursor> {
nebula::cpp2::ErrorCode ret = nebula::cpp2::ErrorCode::SUCCEEDED;
for (auto& edgeNode : edgeNodes_) {
ret = edgeNode->collectEdgePropsIfValid(
[&row](const std::vector<PropContext>* props) -> nebula::cpp2::ErrorCode {
[&row, edgeNode = edgeNode.get(), this](
const std::vector<PropContext>* props) -> nebula::cpp2::ErrorCode {
for (const auto& prop : *props) {
if (prop.returned_) {
row.emplace_back(Value());
}
if (prop.filtered_ && expCtx_ != nullptr) {
expCtx_->setEdgeProp(edgeNode->getEdgeName(), prop.name_, Value());
}
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
},
[&row, vIdLen, isIntId](
[&row, vIdLen, isIntId, edgeNode = edgeNode.get(), this](
folly::StringPiece key,
RowReader* reader,
const std::vector<PropContext>* props) -> nebula::cpp2::ErrorCode {
if (!QueryUtils::collectEdgeProps(key, vIdLen, isIntId, reader, props, row).ok()) {
return nebula::cpp2::ErrorCode::E_EDGE_PROP_NOT_FOUND;
for (const auto& prop : *props) {
if (prop.returned_ || (prop.filtered_ && expCtx_ != nullptr)) {
auto value = QueryUtils::readEdgeProp(key, vIdLen, isIntId, reader, prop);
if (!value.ok()) {
return nebula::cpp2::ErrorCode::E_EDGE_PROP_NOT_FOUND;
}
if (prop.filtered_ && expCtx_ != nullptr) {
expCtx_->setEdgeProp(edgeNode->getEdgeName(), prop.name_, value.value());
}
if (prop.returned_) {
VLOG(2) << "Collect prop " << prop.name_;
row.emplace_back(std::move(value).value());
}
}
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
});
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
break;
}
}
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED &&
(filter_ == nullptr || vTrue(filter_->eval(*expCtx_)))) {
critical27 marked this conversation as resolved.
Show resolved Hide resolved
resultDataSet_->rows.emplace_back(std::move(row));
}
expCtx_->clear();
for (auto& edgeNode : edgeNodes_) {
edgeNode->clear();
}
Expand All @@ -268,6 +316,8 @@ class ScanEdgePropNode : public QueryNode<Cursor> {
// cursors for next scan
std::unordered_map<PartitionID, cpp2::ScanCursor>* cursors_;
nebula::DataSet* resultDataSet_;
StorageExpressionContext* expCtx_{nullptr};
Expression* filter_{nullptr};
};

} // namespace storage
Expand Down
8 changes: 7 additions & 1 deletion src/storage/query/GetNeighborsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,13 @@ nebula::cpp2::ErrorCode GetNeighborsProcessor::checkAndBuildContexts(
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
return code;
}
code = buildFilter(req);
code = buildFilter(req, [](const cpp2::GetNeighborsRequest& r) -> const std::string* {
if (r.get_traverse_spec().filter_ref().has_value()) {
return r.get_traverse_spec().get_filter();
} else {
return nullptr;
}
});
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
return code;
}
Expand Down
41 changes: 33 additions & 8 deletions src/storage/query/QueryBaseProcessor-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,18 +129,18 @@ nebula::cpp2::ErrorCode QueryBaseProcessor<REQ, RESP>::buildYields(const REQ& re
}

template <typename REQ, typename RESP>
nebula::cpp2::ErrorCode QueryBaseProcessor<REQ, RESP>::buildFilter(const REQ& req) {
const auto& traverseSpec = req.get_traverse_spec();
if (!traverseSpec.filter_ref().has_value()) {
nebula::cpp2::ErrorCode QueryBaseProcessor<REQ, RESP>::buildFilter(
const REQ& req, std::function<const std::string*(const REQ& req)>&& getFilter) {
const auto* filterStr = getFilter(req);
if (filterStr == nullptr) {
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
const auto& filterStr = *traverseSpec.filter_ref();

auto pool = &this->planContext_->objPool_;
// auto v = env_;
if (!filterStr.empty()) {
if (!filterStr->empty()) {
// the filter expression **must** return a bool
filter_ = Expression::decode(pool, filterStr);
filter_ = Expression::decode(pool, *filterStr);
if (filter_ == nullptr) {
return nebula::cpp2::ErrorCode::E_INVALID_FILTER;
}
Expand Down Expand Up @@ -438,8 +438,9 @@ nebula::cpp2::ErrorCode QueryBaseProcessor<REQ, RESP>::checkExp(const Expression
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
case Expression::Kind::kTagProperty:
case Expression::Kind::kSrcProperty: {
auto* sourceExp = static_cast<const SourcePropertyExpression*>(exp);
auto* sourceExp = static_cast<const PropertyExpression*>(exp);
const auto& tagName = sourceExp->sym();
const auto& propName = sourceExp->prop();
auto tagRet = this->env_->schemaMan_->toTagID(spaceId_, tagName);
Expand All @@ -458,6 +459,17 @@ nebula::cpp2::ErrorCode QueryBaseProcessor<REQ, RESP>::checkExp(const Expression
const auto& tagSchema = iter->second.back();

if (propName == kVid || propName == kTag) {
if (returned || filtered) {
addPropContextIfNotExists(tagContext_.propContexts_,
tagContext_.indexMap_,
tagContext_.tagNames_,
tagId,
tagName,
propName,
nullptr,
returned,
filtered);
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

Expand Down Expand Up @@ -505,6 +517,17 @@ nebula::cpp2::ErrorCode QueryBaseProcessor<REQ, RESP>::checkExp(const Expression
const auto& edgeSchema = iter->second.back();

if (propName == kSrc || propName == kType || propName == kRank || propName == kDst) {
if (returned || filtered) {
addPropContextIfNotExists(edgeContext_.propContexts_,
edgeContext_.indexMap_,
edgeContext_.edgeNames_,
edgeType,
edgeName,
propName,
nullptr,
returned,
filtered);
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

Expand Down Expand Up @@ -561,7 +584,6 @@ nebula::cpp2::ErrorCode QueryBaseProcessor<REQ, RESP>::checkExp(const Expression
case Expression::Kind::kSubscript:
case Expression::Kind::kAttribute:
case Expression::Kind::kLabelAttribute:
case Expression::Kind::kTagProperty:
case Expression::Kind::kVertex:
case Expression::Kind::kEdge:
case Expression::Kind::kLabel:
Expand Down Expand Up @@ -597,6 +619,9 @@ void QueryBaseProcessor<REQ, RESP>::addPropContextIfNotExists(
bool filtered,
const std::pair<size_t, cpp2::StatType>* statInfo) {
auto idxIter = indexMap.find(entryId);
if (idxIter == indexMap.end()) { // for edge type
idxIter = indexMap.find(-entryId);
}
if (idxIter == indexMap.end()) {
// if no property of tag/edge has been add to propContexts
PropContext ctx(propName.c_str(), field, returned, filtered, statInfo);
Expand Down
3 changes: 2 additions & 1 deletion src/storage/query/QueryBaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ class QueryBaseProcessor : public BaseProcessor<RESP> {
// build edgeContexts_ according to return props
nebula::cpp2::ErrorCode handleEdgeProps(std::vector<cpp2::EdgeProp>& edgeProps);

nebula::cpp2::ErrorCode buildFilter(const REQ& req);
nebula::cpp2::ErrorCode buildFilter(
const REQ& req, std::function<const std::string*(const REQ& req)>&& getFilter);
nebula::cpp2::ErrorCode buildYields(const REQ& req);

// build ttl info map
Expand Down
26 changes: 19 additions & 7 deletions src/storage/query/ScanEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ nebula::cpp2::ErrorCode ScanEdgeProcessor::checkAndBuildContexts(const cpp2::Sca
std::vector<cpp2::EdgeProp> returnProps = {*req.return_columns_ref()};
ret = handleEdgeProps(returnProps);
buildEdgeColName(returnProps);
ret = buildFilter(req, [](const cpp2::ScanEdgeRequest& r) -> const std::string* {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about just overriding buildFilter? Passing another function here is a little weird.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The overriding code would be almost the same, so I didn't do it that way.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... Actually, the lambda is almost the same as well. Not that important.

if (r.filter_ref().has_value()) {
return r.get_filter();
} else {
return nullptr;
}
});
return ret;
}

Expand All @@ -85,15 +92,16 @@ void ScanEdgeProcessor::onProcessFinished() {
StoragePlan<Cursor> ScanEdgeProcessor::buildPlan(
RuntimeContext* context,
nebula::DataSet* result,
std::unordered_map<PartitionID, cpp2::ScanCursor>* cursors) {
std::unordered_map<PartitionID, cpp2::ScanCursor>* cursors,
StorageExpressionContext* expCtx) {
StoragePlan<Cursor> plan;
std::vector<std::unique_ptr<FetchEdgeNode>> edges;
for (const auto& ec : edgeContext_.propContexts_) {
edges.emplace_back(
std::make_unique<FetchEdgeNode>(context, &edgeContext_, ec.first, &ec.second));
}
auto output = std::make_unique<ScanEdgePropNode>(
context, std::move(edges), enableReadFollower_, limit_, cursors, result);
context, std::move(edges), enableReadFollower_, limit_, cursors, result, expCtx, filter_);

plan.addNode(std::move(output));
return plan;
Expand All @@ -104,10 +112,11 @@ folly::Future<std::pair<nebula::cpp2::ErrorCode, PartitionID>> ScanEdgeProcessor
nebula::DataSet* result,
std::unordered_map<PartitionID, cpp2::ScanCursor>* cursors,
PartitionID partId,
Cursor cursor) {
Cursor cursor,
StorageExpressionContext* expCtx) {
return folly::via(executor_,
[this, context, result, cursors, partId, input = std::move(cursor)]() {
auto plan = buildPlan(context, result, cursors);
[this, context, result, cursors, partId, input = std::move(cursor), expCtx]() {
auto plan = buildPlan(context, result, cursors, expCtx);

auto ret = plan.go(partId, input);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
Expand All @@ -119,8 +128,9 @@ folly::Future<std::pair<nebula::cpp2::ErrorCode, PartitionID>> ScanEdgeProcessor

void ScanEdgeProcessor::runInSingleThread(const cpp2::ScanEdgeRequest& req) {
contexts_.emplace_back(RuntimeContext(planContext_.get()));
expCtxs_.emplace_back(StorageExpressionContext(spaceVidLen_, isIntId_));
std::unordered_set<PartitionID> failedParts;
auto plan = buildPlan(&contexts_.front(), &resultDataSet_, &cursors_);
auto plan = buildPlan(&contexts_.front(), &resultDataSet_, &cursors_, &expCtxs_.front());
for (const auto& partEntry : req.get_parts()) {
auto partId = partEntry.first;
auto cursor = partEntry.second;
Expand All @@ -142,6 +152,7 @@ void ScanEdgeProcessor::runInMultipleThread(const cpp2::ScanEdgeRequest& req) {
nebula::DataSet result = resultDataSet_;
results_.emplace_back(std::move(result));
contexts_.emplace_back(RuntimeContext(planContext_.get()));
expCtxs_.emplace_back(StorageExpressionContext(spaceVidLen_, isIntId_));
}
size_t i = 0;
std::vector<folly::Future<std::pair<nebula::cpp2::ErrorCode, PartitionID>>> futures;
Expand All @@ -150,7 +161,8 @@ void ScanEdgeProcessor::runInMultipleThread(const cpp2::ScanEdgeRequest& req) {
&results_[i],
&cursorsOfPart_[i],
partId,
cursor.get_has_next() ? *cursor.get_next_cursor() : ""));
cursor.get_has_next() ? *cursor.get_next_cursor() : "",
&expCtxs_[i]));
i++;
}

Expand Down
Loading