Skip to content

Commit

Permalink
Support filter in scan. (#3329)
Browse files Browse the repository at this point in the history
* Support filter in scan.

* Fix compile error.

* Clear expression context when after filter.

Co-authored-by: Yee <2520865+yixinglu@users.noreply.github.com>
  • Loading branch information
Shylock-Hg and yixinglu committed Nov 25, 2021
1 parent 65fdeed commit 6a8a5f7
Show file tree
Hide file tree
Showing 12 changed files with 274 additions and 52 deletions.
2 changes: 1 addition & 1 deletion src/storage/context/StorageExpressionContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ class StorageExpressionContext final : public ExpressionContext {
size_t vIdLen_;
bool isIntId_;

RowReader* reader_;
RowReader* reader_{nullptr};
std::string key_;
// tag or edge name
std::string name_;
Expand Down
78 changes: 64 additions & 14 deletions src/storage/exec/ScanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ namespace storage {

using Cursor = std::string;

inline bool vTrue(const Value& v) { return v.isBool() && v.getBool(); }

// Node to scan vertices of one partition
class ScanVertexPropNode : public QueryNode<Cursor> {
public:
Expand All @@ -23,13 +25,17 @@ class ScanVertexPropNode : public QueryNode<Cursor> {
bool enableReadFollower,
int64_t limit,
std::unordered_map<PartitionID, cpp2::ScanCursor>* cursors,
nebula::DataSet* resultDataSet)
nebula::DataSet* resultDataSet,
StorageExpressionContext* expCtx = nullptr,
Expression* filter = nullptr)
: context_(context),
tagNodes_(std::move(tagNodes)),
enableReadFollower_(enableReadFollower),
limit_(limit),
cursors_(cursors),
resultDataSet_(resultDataSet) {
resultDataSet_(resultDataSet),
expCtx_(expCtx),
filter_(filter) {
name_ = "ScanVertexPropNode";
for (std::size_t i = 0; i < tagNodes_.size(); ++i) {
tagNodesIndex_.emplace(tagNodes_[i]->tagId(), i);
Expand Down Expand Up @@ -110,30 +116,48 @@ class ScanVertexPropNode : public QueryNode<Cursor> {
})) {
for (auto& tagNode : tagNodes_) {
ret = tagNode->collectTagPropsIfValid(
[&row](const std::vector<PropContext>* props) -> nebula::cpp2::ErrorCode {
[&row, tagNode = tagNode.get(), this](
const std::vector<PropContext>* props) -> nebula::cpp2::ErrorCode {
for (const auto& prop : *props) {
if (prop.returned_) {
row.emplace_back(Value());
}
if (prop.filtered_ && expCtx_ != nullptr) {
expCtx_->setTagProp(tagNode->getTagName(), prop.name_, Value());
}
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
},
[&row, vIdLen, isIntId](
[&row, vIdLen, isIntId, tagNode = tagNode.get(), this](
folly::StringPiece key,
RowReader* reader,
const std::vector<PropContext>* props) -> nebula::cpp2::ErrorCode {
if (!QueryUtils::collectVertexProps(key, vIdLen, isIntId, reader, props, row).ok()) {
return nebula::cpp2::ErrorCode::E_TAG_PROP_NOT_FOUND;
for (const auto& prop : *props) {
if (prop.returned_ || (prop.filtered_ && expCtx_ != nullptr)) {
auto value = QueryUtils::readVertexProp(key, vIdLen, isIntId, reader, prop);
if (!value.ok()) {
return nebula::cpp2::ErrorCode::E_TAG_PROP_NOT_FOUND;
}
if (prop.filtered_ && expCtx_ != nullptr) {
expCtx_->setTagProp(tagNode->getTagName(), prop.name_, value.value());
}
if (prop.returned_) {
VLOG(2) << "Collect prop " << prop.name_;
row.emplace_back(std::move(value).value());
}
}
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
});
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
break;
}
}
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED &&
(filter_ == nullptr || vTrue(filter_->eval(*expCtx_)))) {
resultDataSet_->rows.emplace_back(std::move(row));
}
expCtx_->clear();
for (auto& tagNode : tagNodes_) {
tagNode->clear();
}
Expand All @@ -149,6 +173,8 @@ class ScanVertexPropNode : public QueryNode<Cursor> {
// cursors for next scan
std::unordered_map<PartitionID, cpp2::ScanCursor>* cursors_;
nebula::DataSet* resultDataSet_;
StorageExpressionContext* expCtx_{nullptr};
Expression* filter_{nullptr};
};

// Node to scan edge of one partition
Expand All @@ -161,13 +187,17 @@ class ScanEdgePropNode : public QueryNode<Cursor> {
bool enableReadFollower,
int64_t limit,
std::unordered_map<PartitionID, cpp2::ScanCursor>* cursors,
nebula::DataSet* resultDataSet)
nebula::DataSet* resultDataSet,
StorageExpressionContext* expCtx = nullptr,
Expression* filter = nullptr)
: context_(context),
edgeNodes_(std::move(edgeNodes)),
enableReadFollower_(enableReadFollower),
limit_(limit),
cursors_(cursors),
resultDataSet_(resultDataSet) {
resultDataSet_(resultDataSet),
expCtx_(expCtx),
filter_(filter) {
QueryNode::name_ = "ScanEdgePropNode";
for (std::size_t i = 0; i < edgeNodes_.size(); ++i) {
edgeNodesIndex_.emplace(edgeNodes_[i]->edgeType(), i);
Expand Down Expand Up @@ -230,30 +260,48 @@ class ScanEdgePropNode : public QueryNode<Cursor> {
nebula::cpp2::ErrorCode ret = nebula::cpp2::ErrorCode::SUCCEEDED;
for (auto& edgeNode : edgeNodes_) {
ret = edgeNode->collectEdgePropsIfValid(
[&row](const std::vector<PropContext>* props) -> nebula::cpp2::ErrorCode {
[&row, edgeNode = edgeNode.get(), this](
const std::vector<PropContext>* props) -> nebula::cpp2::ErrorCode {
for (const auto& prop : *props) {
if (prop.returned_) {
row.emplace_back(Value());
}
if (prop.filtered_ && expCtx_ != nullptr) {
expCtx_->setEdgeProp(edgeNode->getEdgeName(), prop.name_, Value());
}
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
},
[&row, vIdLen, isIntId](
[&row, vIdLen, isIntId, edgeNode = edgeNode.get(), this](
folly::StringPiece key,
RowReader* reader,
const std::vector<PropContext>* props) -> nebula::cpp2::ErrorCode {
if (!QueryUtils::collectEdgeProps(key, vIdLen, isIntId, reader, props, row).ok()) {
return nebula::cpp2::ErrorCode::E_EDGE_PROP_NOT_FOUND;
for (const auto& prop : *props) {
if (prop.returned_ || (prop.filtered_ && expCtx_ != nullptr)) {
auto value = QueryUtils::readEdgeProp(key, vIdLen, isIntId, reader, prop);
if (!value.ok()) {
return nebula::cpp2::ErrorCode::E_EDGE_PROP_NOT_FOUND;
}
if (prop.filtered_ && expCtx_ != nullptr) {
expCtx_->setEdgeProp(edgeNode->getEdgeName(), prop.name_, value.value());
}
if (prop.returned_) {
VLOG(2) << "Collect prop " << prop.name_;
row.emplace_back(std::move(value).value());
}
}
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
});
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
break;
}
}
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED &&
(filter_ == nullptr || vTrue(filter_->eval(*expCtx_)))) {
resultDataSet_->rows.emplace_back(std::move(row));
}
expCtx_->clear();
for (auto& edgeNode : edgeNodes_) {
edgeNode->clear();
}
Expand All @@ -268,6 +316,8 @@ class ScanEdgePropNode : public QueryNode<Cursor> {
// cursors for next scan
std::unordered_map<PartitionID, cpp2::ScanCursor>* cursors_;
nebula::DataSet* resultDataSet_;
StorageExpressionContext* expCtx_{nullptr};
Expression* filter_{nullptr};
};

} // namespace storage
Expand Down
8 changes: 7 additions & 1 deletion src/storage/query/GetNeighborsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,13 @@ nebula::cpp2::ErrorCode GetNeighborsProcessor::checkAndBuildContexts(
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
return code;
}
code = buildFilter(req);
code = buildFilter(req, [](const cpp2::GetNeighborsRequest& r) -> const std::string* {
if (r.get_traverse_spec().filter_ref().has_value()) {
return r.get_traverse_spec().get_filter();
} else {
return nullptr;
}
});
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
return code;
}
Expand Down
41 changes: 33 additions & 8 deletions src/storage/query/QueryBaseProcessor-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,18 +129,18 @@ nebula::cpp2::ErrorCode QueryBaseProcessor<REQ, RESP>::buildYields(const REQ& re
}

template <typename REQ, typename RESP>
nebula::cpp2::ErrorCode QueryBaseProcessor<REQ, RESP>::buildFilter(const REQ& req) {
const auto& traverseSpec = req.get_traverse_spec();
if (!traverseSpec.filter_ref().has_value()) {
nebula::cpp2::ErrorCode QueryBaseProcessor<REQ, RESP>::buildFilter(
const REQ& req, std::function<const std::string*(const REQ& req)>&& getFilter) {
const auto* filterStr = getFilter(req);
if (filterStr == nullptr) {
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
const auto& filterStr = *traverseSpec.filter_ref();

auto pool = &this->planContext_->objPool_;
// auto v = env_;
if (!filterStr.empty()) {
if (!filterStr->empty()) {
// the filter expression **must** return a bool
filter_ = Expression::decode(pool, filterStr);
filter_ = Expression::decode(pool, *filterStr);
if (filter_ == nullptr) {
return nebula::cpp2::ErrorCode::E_INVALID_FILTER;
}
Expand Down Expand Up @@ -438,8 +438,9 @@ nebula::cpp2::ErrorCode QueryBaseProcessor<REQ, RESP>::checkExp(const Expression
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
case Expression::Kind::kTagProperty:
case Expression::Kind::kSrcProperty: {
auto* sourceExp = static_cast<const SourcePropertyExpression*>(exp);
auto* sourceExp = static_cast<const PropertyExpression*>(exp);
const auto& tagName = sourceExp->sym();
const auto& propName = sourceExp->prop();
auto tagRet = this->env_->schemaMan_->toTagID(spaceId_, tagName);
Expand All @@ -458,6 +459,17 @@ nebula::cpp2::ErrorCode QueryBaseProcessor<REQ, RESP>::checkExp(const Expression
const auto& tagSchema = iter->second.back();

if (propName == kVid || propName == kTag) {
if (returned || filtered) {
addPropContextIfNotExists(tagContext_.propContexts_,
tagContext_.indexMap_,
tagContext_.tagNames_,
tagId,
tagName,
propName,
nullptr,
returned,
filtered);
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

Expand Down Expand Up @@ -505,6 +517,17 @@ nebula::cpp2::ErrorCode QueryBaseProcessor<REQ, RESP>::checkExp(const Expression
const auto& edgeSchema = iter->second.back();

if (propName == kSrc || propName == kType || propName == kRank || propName == kDst) {
if (returned || filtered) {
addPropContextIfNotExists(edgeContext_.propContexts_,
edgeContext_.indexMap_,
edgeContext_.edgeNames_,
edgeType,
edgeName,
propName,
nullptr,
returned,
filtered);
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

Expand Down Expand Up @@ -561,7 +584,6 @@ nebula::cpp2::ErrorCode QueryBaseProcessor<REQ, RESP>::checkExp(const Expression
case Expression::Kind::kSubscript:
case Expression::Kind::kAttribute:
case Expression::Kind::kLabelAttribute:
case Expression::Kind::kTagProperty:
case Expression::Kind::kVertex:
case Expression::Kind::kEdge:
case Expression::Kind::kLabel:
Expand Down Expand Up @@ -597,6 +619,9 @@ void QueryBaseProcessor<REQ, RESP>::addPropContextIfNotExists(
bool filtered,
const std::pair<size_t, cpp2::StatType>* statInfo) {
auto idxIter = indexMap.find(entryId);
if (idxIter == indexMap.end()) { // for edge type
idxIter = indexMap.find(-entryId);
}
if (idxIter == indexMap.end()) {
// if no property of tag/edge has been add to propContexts
PropContext ctx(propName.c_str(), field, returned, filtered, statInfo);
Expand Down
3 changes: 2 additions & 1 deletion src/storage/query/QueryBaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ class QueryBaseProcessor : public BaseProcessor<RESP> {
// build edgeContexts_ according to return props
nebula::cpp2::ErrorCode handleEdgeProps(std::vector<cpp2::EdgeProp>& edgeProps);

nebula::cpp2::ErrorCode buildFilter(const REQ& req);
nebula::cpp2::ErrorCode buildFilter(
const REQ& req, std::function<const std::string*(const REQ& req)>&& getFilter);
nebula::cpp2::ErrorCode buildYields(const REQ& req);

// build ttl info map
Expand Down
26 changes: 19 additions & 7 deletions src/storage/query/ScanEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ nebula::cpp2::ErrorCode ScanEdgeProcessor::checkAndBuildContexts(const cpp2::Sca
std::vector<cpp2::EdgeProp> returnProps = {*req.return_columns_ref()};
ret = handleEdgeProps(returnProps);
buildEdgeColName(returnProps);
ret = buildFilter(req, [](const cpp2::ScanEdgeRequest& r) -> const std::string* {
if (r.filter_ref().has_value()) {
return r.get_filter();
} else {
return nullptr;
}
});
return ret;
}

Expand All @@ -85,15 +92,16 @@ void ScanEdgeProcessor::onProcessFinished() {
StoragePlan<Cursor> ScanEdgeProcessor::buildPlan(
RuntimeContext* context,
nebula::DataSet* result,
std::unordered_map<PartitionID, cpp2::ScanCursor>* cursors) {
std::unordered_map<PartitionID, cpp2::ScanCursor>* cursors,
StorageExpressionContext* expCtx) {
StoragePlan<Cursor> plan;
std::vector<std::unique_ptr<FetchEdgeNode>> edges;
for (const auto& ec : edgeContext_.propContexts_) {
edges.emplace_back(
std::make_unique<FetchEdgeNode>(context, &edgeContext_, ec.first, &ec.second));
}
auto output = std::make_unique<ScanEdgePropNode>(
context, std::move(edges), enableReadFollower_, limit_, cursors, result);
context, std::move(edges), enableReadFollower_, limit_, cursors, result, expCtx, filter_);

plan.addNode(std::move(output));
return plan;
Expand All @@ -104,10 +112,11 @@ folly::Future<std::pair<nebula::cpp2::ErrorCode, PartitionID>> ScanEdgeProcessor
nebula::DataSet* result,
std::unordered_map<PartitionID, cpp2::ScanCursor>* cursors,
PartitionID partId,
Cursor cursor) {
Cursor cursor,
StorageExpressionContext* expCtx) {
return folly::via(executor_,
[this, context, result, cursors, partId, input = std::move(cursor)]() {
auto plan = buildPlan(context, result, cursors);
[this, context, result, cursors, partId, input = std::move(cursor), expCtx]() {
auto plan = buildPlan(context, result, cursors, expCtx);

auto ret = plan.go(partId, input);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
Expand All @@ -119,8 +128,9 @@ folly::Future<std::pair<nebula::cpp2::ErrorCode, PartitionID>> ScanEdgeProcessor

void ScanEdgeProcessor::runInSingleThread(const cpp2::ScanEdgeRequest& req) {
contexts_.emplace_back(RuntimeContext(planContext_.get()));
expCtxs_.emplace_back(StorageExpressionContext(spaceVidLen_, isIntId_));
std::unordered_set<PartitionID> failedParts;
auto plan = buildPlan(&contexts_.front(), &resultDataSet_, &cursors_);
auto plan = buildPlan(&contexts_.front(), &resultDataSet_, &cursors_, &expCtxs_.front());
for (const auto& partEntry : req.get_parts()) {
auto partId = partEntry.first;
auto cursor = partEntry.second;
Expand All @@ -142,6 +152,7 @@ void ScanEdgeProcessor::runInMultipleThread(const cpp2::ScanEdgeRequest& req) {
nebula::DataSet result = resultDataSet_;
results_.emplace_back(std::move(result));
contexts_.emplace_back(RuntimeContext(planContext_.get()));
expCtxs_.emplace_back(StorageExpressionContext(spaceVidLen_, isIntId_));
}
size_t i = 0;
std::vector<folly::Future<std::pair<nebula::cpp2::ErrorCode, PartitionID>>> futures;
Expand All @@ -150,7 +161,8 @@ void ScanEdgeProcessor::runInMultipleThread(const cpp2::ScanEdgeRequest& req) {
&results_[i],
&cursorsOfPart_[i],
partId,
cursor.get_has_next() ? *cursor.get_next_cursor() : ""));
cursor.get_has_next() ? *cursor.get_next_cursor() : "",
&expCtxs_[i]));
i++;
}

Expand Down
Loading

0 comments on commit 6a8a5f7

Please sign in to comment.