Skip to content

Commit

Permalink
Lookup limit push down (#2796)
Browse files Browse the repository at this point in the history
* lookup limit

* add testcase

* addressed comments

* add test case

* comment typo
  • Loading branch information
bright-starry-sky committed Sep 15, 2021
1 parent 488413b commit dbea9c7
Show file tree
Hide file tree
Showing 13 changed files with 588 additions and 134 deletions.
10 changes: 8 additions & 2 deletions src/clients/storage/GraphStorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -492,11 +492,18 @@ folly::SemiFuture<StorageRpcResponse<cpp2::LookupIndexResp>> GraphStorageClient:
int32_t tagOrEdge,
const std::vector<std::string>& returnCols,
folly::EventBase* evb) {
// TODO(sky) : instead of isEdge and tagOrEdge to nebula::cpp2::SchemaID for graph layer.
auto status = getHostParts(space);
if (!status.ok()) {
return folly::makeFuture<StorageRpcResponse<cpp2::LookupIndexResp>>(
std::runtime_error(status.status().toString()));
}
nebula::cpp2::SchemaID schemaId;
if (isEdge) {
schemaId.set_edge_type(tagOrEdge);
} else {
schemaId.set_tag_id(tagOrEdge);
}

auto& clusters = status.value();
std::unordered_map<HostAddr, cpp2::LookupIndexRequest> requests;
Expand All @@ -510,8 +517,7 @@ folly::SemiFuture<StorageRpcResponse<cpp2::LookupIndexResp>> GraphStorageClient:

cpp2::IndexSpec spec;
spec.set_contexts(contexts);
spec.set_is_edge(isEdge);
spec.set_tag_or_edge_id(tagOrEdge);
spec.set_schema_id(schemaId);
req.set_indices(spec);
req.set_common(common);
}
Expand Down
5 changes: 3 additions & 2 deletions src/interface/storage.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -524,8 +524,7 @@ struct IndexQueryContext {
struct IndexSpec {
// In order to union multiple indices, multiple index hints are allowed
1: required list<IndexQueryContext> contexts,
2: required bool is_edge,
3: required i32 tag_or_edge_id,
2: common.SchemaID schema_id,
}


Expand All @@ -537,6 +536,8 @@ struct LookupIndexRequest {
// Support kVid and kTag for vertex, kSrc, kType, kRank and kDst for edge.
4: optional list<binary> return_columns,
5: optional RequestCommon common,
// max row count of each partition in this response
6: optional i64 limit,
}


Expand Down
11 changes: 9 additions & 2 deletions src/storage/exec/IndexEdgeNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ class IndexEdgeNode final : public RelNode<T> {
IndexEdgeNode(RuntimeContext* context,
IndexScanNode<T>* indexScanNode,
const std::vector<std::shared_ptr<const meta::NebulaSchemaProvider>>& schemas,
const std::string& schemaName)
const std::string& schemaName,
int64_t limit)
: context_(context),
indexScanNode_(indexScanNode),
schemas_(schemas),
schemaName_(schemaName) {
schemaName_(schemaName),
limit_(limit) {
RelNode<T>::name_ = "IndexEdgeNode";
}

Expand All @@ -40,7 +42,11 @@ class IndexEdgeNode final : public RelNode<T> {
data_.clear();
std::vector<storage::cpp2::EdgeKey> edges;
auto* iter = static_cast<EdgeIndexIterator*>(indexScanNode_->iterator());
int64_t count = 0;
while (iter && iter->valid()) {
if (limit_ > -1 && count++ == limit_) {
break;
}
if (context_->isPlanKilled()) {
return nebula::cpp2::ErrorCode::E_PLAN_IS_KILLED;
}
Expand Down Expand Up @@ -93,6 +99,7 @@ class IndexEdgeNode final : public RelNode<T> {
IndexScanNode<T>* indexScanNode_;
const std::vector<std::shared_ptr<const meta::NebulaSchemaProvider>>& schemas_;
const std::string& schemaName_;
int64_t limit_;
std::vector<kvstore::KV> data_;
};

Expand Down
10 changes: 8 additions & 2 deletions src/storage/exec/IndexScanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ class IndexScanNode : public RelNode<T> {

IndexScanNode(RuntimeContext* context,
IndexID indexId,
std::vector<cpp2::IndexColumnHint> columnHints)
: context_(context), indexId_(indexId), columnHints_(std::move(columnHints)) {
std::vector<cpp2::IndexColumnHint> columnHints,
int64_t limit)
: context_(context), indexId_(indexId), columnHints_(std::move(columnHints)), limit_(limit) {
/**
* columnHints's elements are {scanType = PREFIX|RANGE; beginStr; endStr},
* {scanType = PREFIX|RANGE; beginStr;
Expand Down Expand Up @@ -71,7 +72,11 @@ class IndexScanNode : public RelNode<T> {
auto* sh = context_->isEdge() ? context_->edgeSchema_ : context_->tagSchema_;
auto ttlProp = CommonUtils::ttlProps(sh);
data_.clear();
int64_t count = 0;
while (!!iter_ && iter_->valid()) {
if (limit_ > -1 && count++ == limit_) {
break;
}
if (context_->isPlanKilled()) {
return {};
}
Expand Down Expand Up @@ -172,6 +177,7 @@ class IndexScanNode : public RelNode<T> {
std::unique_ptr<IndexIterator> iter_;
std::pair<std::string, std::string> scanPair_;
std::vector<cpp2::IndexColumnHint> columnHints_;
int64_t limit_;
std::vector<kvstore::KV> data_;
};

Expand Down
11 changes: 9 additions & 2 deletions src/storage/exec/IndexVertexNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ class IndexVertexNode final : public RelNode<T> {
IndexVertexNode(RuntimeContext* context,
IndexScanNode<T>* indexScanNode,
const std::vector<std::shared_ptr<const meta::NebulaSchemaProvider>>& schemas,
const std::string& schemaName)
const std::string& schemaName,
int64_t limit)
: context_(context),
indexScanNode_(indexScanNode),
schemas_(schemas),
schemaName_(schemaName) {
schemaName_(schemaName),
limit_(limit) {
RelNode<T>::name_ = "IndexVertexNode";
}

Expand All @@ -40,7 +42,11 @@ class IndexVertexNode final : public RelNode<T> {
data_.clear();
std::vector<VertexID> vids;
auto* iter = static_cast<VertexIndexIterator*>(indexScanNode_->iterator());
int64_t count = 0;
while (iter && iter->valid()) {
if (limit_ > -1 && count++ == limit_) {
break;
}
if (context_->isPlanKilled()) {
return nebula::cpp2::ErrorCode::E_PLAN_IS_KILLED;
}
Expand Down Expand Up @@ -84,6 +90,7 @@ class IndexVertexNode final : public RelNode<T> {
IndexScanNode<T>* indexScanNode_;
const std::vector<std::shared_ptr<const meta::NebulaSchemaProvider>>& schemas_;
const std::string& schemaName_;
int64_t limit_;
std::vector<kvstore::KV> data_;
};

Expand Down
40 changes: 25 additions & 15 deletions src/storage/index/LookupBaseProcessor-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ nebula::cpp2::ErrorCode LookupBaseProcessor<REQ, RESP>::requestCheck(
this->planContext_ = std::make_unique<PlanContext>(
this->env_, spaceId_, this->spaceVidLen_, this->isIntId_, req.common_ref());
const auto& indices = req.get_indices();
this->planContext_->isEdge_ = indices.get_is_edge();
const auto& schemaId = indices.get_schema_id();
this->planContext_->isEdge_ = schemaId.getType() == nebula::cpp2::SchemaID::Type::edge_type;
this->context_ = std::make_unique<RuntimeContext>(this->planContext_.get());
if (context_->isEdge()) {
context_->edgeType_ = indices.get_tag_or_edge_id();
context_->edgeType_ = schemaId.get_edge_type();
auto edgeName = this->env_->schemaMan_->toEdgeName(spaceId_, context_->edgeType_);
if (!edgeName.ok()) {
return nebula::cpp2::ErrorCode::E_EDGE_NOT_FOUND;
Expand All @@ -40,7 +41,7 @@ nebula::cpp2::ErrorCode LookupBaseProcessor<REQ, RESP>::requestCheck(
schemas_ = std::move(allEdges).value()[context_->edgeType_];
context_->edgeSchema_ = schemas_.back().get();
} else {
context_->tagId_ = indices.get_tag_or_edge_id();
context_->tagId_ = schemaId.get_tag_id();
auto tagName = this->env_->schemaMan_->toTagName(spaceId_, context_->tagId_);
if (!tagName.ok()) {
return nebula::cpp2::ErrorCode::E_TAG_NOT_FOUND;
Expand Down Expand Up @@ -75,6 +76,15 @@ nebula::cpp2::ErrorCode LookupBaseProcessor<REQ, RESP>::requestCheck(
}
}

// limit
if (req.limit_ref().has_value()) {
if (*req.limit_ref() < 0) {
LOG(ERROR) << "Incorrect parameter : LIMIT = " << *req.limit_ref();
return nebula::cpp2::ErrorCode::E_INVALID_PARM;
}
limit_ = *req.limit_ref();
}

return nebula::cpp2::ErrorCode::SUCCEEDED;
}

Expand Down Expand Up @@ -275,8 +285,8 @@ std::unique_ptr<IndexOutputNode<IndexID>> LookupBaseProcessor<REQ, RESP>::buildP
const std::vector<meta::cpp2::ColumnDef>& fields) {
auto indexId = ctx.get_index_id();
auto colHints = ctx.get_column_hints();
auto indexScan =
std::make_unique<IndexScanNode<IndexID>>(context_.get(), indexId, std::move(colHints));
auto indexScan = std::make_unique<IndexScanNode<IndexID>>(
context_.get(), indexId, std::move(colHints), limit_);

auto output = std::make_unique<IndexOutputNode<IndexID>>(
result, context_.get(), indexScan.get(), hasNullableCol, fields);
Expand Down Expand Up @@ -309,11 +319,11 @@ std::unique_ptr<IndexOutputNode<IndexID>> LookupBaseProcessor<REQ, RESP>::buildP
auto indexId = ctx.get_index_id();
auto colHints = ctx.get_column_hints();

auto indexScan =
std::make_unique<IndexScanNode<IndexID>>(context_.get(), indexId, std::move(colHints));
auto indexScan = std::make_unique<IndexScanNode<IndexID>>(
context_.get(), indexId, std::move(colHints), limit_);
if (context_->isEdge()) {
auto edge = std::make_unique<IndexEdgeNode<IndexID>>(
context_.get(), indexScan.get(), schemas_, context_->edgeName_);
context_.get(), indexScan.get(), schemas_, context_->edgeName_, limit_);
edge->addDependency(indexScan.get());
auto output = std::make_unique<IndexOutputNode<IndexID>>(result, context_.get(), edge.get());
output->addDependency(edge.get());
Expand All @@ -322,7 +332,7 @@ std::unique_ptr<IndexOutputNode<IndexID>> LookupBaseProcessor<REQ, RESP>::buildP
return output;
} else {
auto vertex = std::make_unique<IndexVertexNode<IndexID>>(
context_.get(), indexScan.get(), schemas_, context_->tagName_);
context_.get(), indexScan.get(), schemas_, context_->tagName_, limit_);
vertex->addDependency(indexScan.get());
auto output = std::make_unique<IndexOutputNode<IndexID>>(result, context_.get(), vertex.get());
output->addDependency(vertex.get());
Expand Down Expand Up @@ -360,8 +370,8 @@ std::unique_ptr<IndexOutputNode<IndexID>> LookupBaseProcessor<REQ, RESP>::buildP
auto indexId = ctx.get_index_id();
auto colHints = ctx.get_column_hints();

auto indexScan =
std::make_unique<IndexScanNode<IndexID>>(context_.get(), indexId, std::move(colHints));
auto indexScan = std::make_unique<IndexScanNode<IndexID>>(
context_.get(), indexId, std::move(colHints), limit_);

auto filter = std::make_unique<IndexFilterNode<IndexID>>(
context_.get(), indexScan.get(), exprCtx, exp, context_->isEdge());
Expand Down Expand Up @@ -411,11 +421,11 @@ LookupBaseProcessor<REQ, RESP>::buildPlanWithDataAndFilter(nebula::DataSet* resu
auto indexId = ctx.get_index_id();
auto colHints = ctx.get_column_hints();

auto indexScan =
std::make_unique<IndexScanNode<IndexID>>(context_.get(), indexId, std::move(colHints));
auto indexScan = std::make_unique<IndexScanNode<IndexID>>(
context_.get(), indexId, std::move(colHints), limit_);
if (context_->isEdge()) {
auto edge = std::make_unique<IndexEdgeNode<IndexID>>(
context_.get(), indexScan.get(), schemas_, context_->edgeName_);
context_.get(), indexScan.get(), schemas_, context_->edgeName_, limit_);
edge->addDependency(indexScan.get());
auto filter =
std::make_unique<IndexFilterNode<IndexID>>(context_.get(), edge.get(), exprCtx, exp);
Expand All @@ -429,7 +439,7 @@ LookupBaseProcessor<REQ, RESP>::buildPlanWithDataAndFilter(nebula::DataSet* resu
return output;
} else {
auto vertex = std::make_unique<IndexVertexNode<IndexID>>(
context_.get(), indexScan.get(), schemas_, context_->tagName_);
context_.get(), indexScan.get(), schemas_, context_->tagName_, limit_);
vertex->addDependency(indexScan.get());
auto filter =
std::make_unique<IndexFilterNode<IndexID>>(context_.get(), vertex.get(), exprCtx, exp);
Expand Down
1 change: 1 addition & 0 deletions src/storage/index/LookupBaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class LookupBaseProcessor : public BaseProcessor<RESP> {
// Save schemas when column is out of index, need to read from data
std::vector<std::shared_ptr<const meta::NebulaSchemaProvider>> schemas_;
std::vector<size_t> deDupColPos_;
int64_t limit_ = -1;
};

} // namespace storage
Expand Down
19 changes: 19 additions & 0 deletions src/storage/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,26 @@ nebula_add_test(
${PROXYGEN_LIBRARIES}
wangle
gtest
)

nebula_add_test(
NAME
index_scan_limit_test
SOURCES
IndexScanLimitTest.cpp
OBJECTS
$<TARGET_OBJECTS:storage_common_obj>
$<TARGET_OBJECTS:meta_service_handler>
$<TARGET_OBJECTS:mock_obj>
$<TARGET_OBJECTS:ws_common_obj>
$<TARGET_OBJECTS:ws_obj>
${storage_test_deps}
LIBRARIES
${ROCKSDB_LIBRARIES}
${THRIFT_LIBRARIES}
${PROXYGEN_LIBRARIES}
wangle
gtest
)

nebula_add_executable(
Expand Down
5 changes: 3 additions & 2 deletions src/storage/test/DeleteTagsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ cpp2::LookupIndexRequest buildLookupRequest(int32_t totalParts, std::string play
cpp2::LookupIndexRequest req;
nebula::storage::cpp2::IndexSpec indices;
req.set_space_id(1);
indices.set_tag_or_edge_id(1);
indices.set_is_edge(false);
nebula::cpp2::SchemaID schemaId;
schemaId.set_tag_id(1);
indices.set_schema_id(schemaId);
std::vector<PartitionID> parts;
for (PartitionID partId = 1; partId <= totalParts; partId++) {
parts.emplace_back(partId);
Expand Down
Loading

0 comments on commit dbea9c7

Please sign in to comment.