Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/scan multiple parts #3262

Merged
merged 17 commits into from
Nov 11, 2021
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 56 additions & 24 deletions src/clients/storage/GraphStorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -559,36 +559,68 @@ StorageRpcRespFuture<cpp2::GetNeighborsResponse> GraphStorageClient::lookupAndTr
});
}

folly::Future<StatusOr<cpp2::ScanEdgeResponse>> GraphStorageClient::scanEdge(
cpp2::ScanEdgeRequest req, folly::EventBase* evb) {
std::pair<HostAddr, cpp2::ScanEdgeRequest> request;
auto host = this->getLeader(req.get_space_id(), req.get_part_id());
if (!host.ok()) {
return folly::makeFuture<StatusOr<cpp2::ScanEdgeResponse>>(host.status());
StorageRpcRespFuture<cpp2::ScanEdgeResponse> GraphStorageClient::scanEdge(
const CommonRequestParam& param,
const cpp2::EdgeProp& edgeProp,
int64_t limit,
const Expression* filter) {
std::unordered_map<HostAddr, cpp2::ScanEdgeRequest> requests;
auto status = getHostPartsWithCursor(param.space);
if (!status.ok()) {
return folly::makeFuture<StorageRpcResponse<cpp2::ScanEdgeResponse>>(
std::runtime_error(status.status().toString()));
}
auto& clusters = status.value();
for (const auto& c : clusters) {
auto& host = c.first;
auto& req = requests[host];
req.set_space_id(param.space);
req.set_parts(std::move(c.second));
req.set_return_columns(edgeProp);
req.set_limit(limit);
if (filter != nullptr) {
req.set_filter(filter->encode());
}
req.set_common(param.toReqCommon());
}
request.first = std::move(host).value();
request.second = std::move(req);

return getResponse(evb,
std::move(request),
[](cpp2::GraphStorageServiceAsyncClient* client,
const cpp2::ScanEdgeRequest& r) { return client->future_scanEdge(r); });
return collectResponse(param.evb,
std::move(requests),
[](cpp2::GraphStorageServiceAsyncClient* client,
const cpp2::ScanEdgeRequest& r) { return client->future_scanEdge(r); });
}

folly::Future<StatusOr<cpp2::ScanVertexResponse>> GraphStorageClient::scanVertex(
cpp2::ScanVertexRequest req, folly::EventBase* evb) {
std::pair<HostAddr, cpp2::ScanVertexRequest> request;
auto host = this->getLeader(req.get_space_id(), req.get_part_id());
if (!host.ok()) {
return folly::makeFuture<StatusOr<cpp2::ScanVertexResponse>>(host.status());
StorageRpcRespFuture<cpp2::ScanVertexResponse> GraphStorageClient::scanVertex(
const CommonRequestParam& param,
const std::vector<cpp2::VertexProp>& vertexProp,
int64_t limit,
const Expression* filter) {
std::unordered_map<HostAddr, cpp2::ScanVertexRequest> requests;
auto status = getHostPartsWithCursor(param.space);
if (!status.ok()) {
return folly::makeFuture<StorageRpcResponse<cpp2::ScanVertexResponse>>(
std::runtime_error(status.status().toString()));
}
auto& clusters = status.value();
for (const auto& c : clusters) {
auto& host = c.first;
auto& req = requests[host];
req.set_space_id(param.space);
req.set_parts(std::move(c.second));
req.set_return_columns(vertexProp);
req.set_limit(limit);
if (filter != nullptr) {
req.set_filter(filter->encode());
}
req.set_common(param.toReqCommon());
}
request.first = std::move(host).value();
request.second = std::move(req);

return getResponse(evb,
std::move(request),
[](cpp2::GraphStorageServiceAsyncClient* client,
const cpp2::ScanVertexRequest& r) { return client->future_scanVertex(r); });
return collectResponse(
param.evb,
std::move(requests),
[](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::ScanVertexRequest& r) {
return client->future_scanVertex(r);
});
}

StatusOr<std::function<const VertexID&(const Row&)>> GraphStorageClient::getIdFromRow(
Expand Down
13 changes: 9 additions & 4 deletions src/clients/storage/GraphStorageClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,16 @@ class GraphStorageClient : public StorageClientBase<cpp2::GraphStorageServiceAsy
StorageRpcRespFuture<cpp2::GetNeighborsResponse> lookupAndTraverse(
const CommonRequestParam& param, cpp2::IndexSpec indexSpec, cpp2::TraverseSpec traverseSpec);

folly::Future<StatusOr<cpp2::ScanEdgeResponse>> scanEdge(cpp2::ScanEdgeRequest req,
folly::EventBase* evb = nullptr);
StorageRpcRespFuture<cpp2::ScanEdgeResponse> scanEdge(const CommonRequestParam& param,
const cpp2::EdgeProp& vertexProp,
int64_t limit,
const Expression* filter);

folly::Future<StatusOr<cpp2::ScanVertexResponse>> scanVertex(cpp2::ScanVertexRequest req,
folly::EventBase* evb = nullptr);
StorageRpcRespFuture<cpp2::ScanVertexResponse> scanVertex(
const CommonRequestParam& param,
const std::vector<cpp2::VertexProp>& vertexProp,
int64_t limit,
const Expression* filter);

private:
StatusOr<std::function<const VertexID&(const Row&)>> getIdFromRow(GraphSpaceID space,
Expand Down
23 changes: 23 additions & 0 deletions src/clients/storage/StorageClientBase-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -329,5 +329,28 @@ StorageClientBase<ClientType>::getHostParts(GraphSpaceID spaceId) const {
return hostParts;
}

template <typename ClientType>
StatusOr<std::unordered_map<HostAddr, std::unordered_map<PartitionID, cpp2::ScanCursor>>>
StorageClientBase<ClientType>::getHostPartsWithCursor(GraphSpaceID spaceId) const {
std::unordered_map<HostAddr, std::unordered_map<PartitionID, cpp2::ScanCursor>> hostParts;
auto status = metaClient_->partsNum(spaceId);
if (!status.ok()) {
return Status::Error("Space not found, spaceid: %d", spaceId);
}

// TODO support cursor
cpp2::ScanCursor c;
c.set_has_next(false);
auto parts = status.value();
for (auto partId = 1; partId <= parts; partId++) {
auto leader = getLeader(spaceId, partId);
if (!leader.ok()) {
return leader.status();
}
hostParts[leader.value()].emplace(partId, c);
}
return hostParts;
}

} // namespace storage
} // namespace nebula
11 changes: 3 additions & 8 deletions src/clients/storage/StorageClientBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ class StorageClientBase {
std::unordered_map<PartitionID, std::vector<typename Container::value_type>>>>
clusterIdsToHosts(GraphSpaceID spaceId, const Container& ids, GetIdFunc f) const;

StatusOr<std::unordered_map<HostAddr, std::unordered_map<PartitionID, cpp2::ScanCursor>>>
getHostPartsWithCursor(GraphSpaceID spaceId) const;

virtual StatusOr<meta::PartHosts> getPartHosts(GraphSpaceID spaceId, PartitionID partId) const {
CHECK(metaClient_ != nullptr);
return metaClient_->getPartHostsFromCache(spaceId, partId);
Expand Down Expand Up @@ -208,14 +211,6 @@ class StorageClientBase {
return {req.get_part_id()};
}

std::vector<PartitionID> getReqPartsId(const cpp2::ScanEdgeRequest& req) const {
return {req.get_part_id()};
}

std::vector<PartitionID> getReqPartsId(const cpp2::ScanVertexRequest& req) const {
return {req.get_part_id()};
}

bool isValidHostPtr(const HostAddr* addr) {
return addr != nullptr && !addr->host.empty() && addr->port != 0;
}
Expand Down
58 changes: 30 additions & 28 deletions src/interface/storage.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -560,24 +560,29 @@ struct LookupAndTraverseRequest {
* End of Index section
*/

struct ScanCursor {
3: bool has_next,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The has_next seems useless, just judge by next_cursor is enough?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I just keep the origin fields.

// next start key of scan, only valid when has_next is true
4: optional binary next_cursor,
}

struct ScanVertexRequest {
Shylock-Hg marked this conversation as resolved.
Show resolved Hide resolved
1: common.GraphSpaceID space_id,
2: common.PartitionID part_id,
// start key of this block
3: optional binary cursor,
4: VertexProp return_columns,
2: map<common.PartitionID, ScanCursor> (cpp.template = "std::unordered_map")
parts,
3: list<VertexProp> return_columns,
// max row count of tag in this response
5: i64 limit,
4: i64 limit,
// only return data in time range [start_time, end_time)
6: optional i64 start_time,
7: optional i64 end_time,
8: optional binary filter,
5: optional i64 start_time,
6: optional i64 end_time,
7: optional binary filter,
// when storage enable multi versions and only_latest_version is true, only return latest version.
// when storage disable multi versions, just use the default value.
9: bool only_latest_version = false,
8: bool only_latest_version = false,
// if set to false, forbid follower read
10: bool enable_read_from_follower = true,
11: optional RequestCommon common,
9: bool enable_read_from_follower = true,
10: optional RequestCommon common,
}

struct ScanVertexResponse {
Expand All @@ -586,29 +591,27 @@ struct ScanVertexResponse {
// Each column represents one property. the column name is in the form of "tag_name.prop_alias"
// in the same order which specified in VertexProp in request.
2: common.DataSet vertex_data,
3: bool has_next,
// next start key of scan, only valid when has_next is true
4: optional binary next_cursor,
3: map<common.PartitionID, ScanCursor> (cpp.template = "std::unordered_map")
cursors;
}

struct ScanEdgeRequest {
1: common.GraphSpaceID space_id,
2: common.PartitionID part_id,
// start key of this block
3: optional binary cursor,
4: EdgeProp return_columns,
2: map<common.PartitionID, ScanCursor> (cpp.template = "std::unordered_map")
parts,
3: EdgeProp return_columns,
// max row count of edge in this response
5: i64 limit,
4: i64 limit,
// only return data in time range [start_time, end_time)
6: optional i64 start_time,
7: optional i64 end_time,
8: optional binary filter,
5: optional i64 start_time,
6: optional i64 end_time,
7: optional binary filter,
// when storage enable multi versions and only_latest_version is true, only return latest version.
// when storage disable multi versions, just use the default value.
9: bool only_latest_version = false,
8: bool only_latest_version = false,
// if set to false, forbid follower read
10: bool enable_read_from_follower = true,
11: optional RequestCommon common,
9: bool enable_read_from_follower = true,
10: optional RequestCommon common,
}

struct ScanEdgeResponse {
Expand All @@ -617,9 +620,8 @@ struct ScanEdgeResponse {
// Each column represents one property. the column name is in the form of "edge_name.prop_alias"
// in the same order which specified in EdgeProp in requesss.
2: common.DataSet edge_data,
3: bool has_next,
// next start key of scan, only valid when has_next is true
4: optional binary next_cursor,
3: map<common.PartitionID, ScanCursor> (cpp.template = "std::unordered_map")
cursors;
}

struct TaskPara {
Expand Down
21 changes: 18 additions & 3 deletions src/storage/exec/EdgeNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ class EdgeNode : public IterateNode<T> {
return valueHandler(this->key(), this->reader(), props_);
}

const std::string& getEdgeName() { return edgeName_; }
const std::string& getEdgeName() const { return edgeName_; }

EdgeType edgeType() const { return edgeType_; }

protected:
EdgeNode(RuntimeContext* context,
Expand Down Expand Up @@ -113,15 +115,28 @@ class FetchEdgeNode final : public EdgeNode<cpp2::EdgeKey> {
(*edgeKey.dst_ref()).getStr());
ret = context_->env()->kvstore_->get(context_->spaceId(), partId, key_, &val_);
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
resetReader();
return nebula::cpp2::ErrorCode::SUCCEEDED;
return doExecute(key_, val_);
} else if (ret == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
// regard key not found as succeed as well, upper node will handle it
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
return ret;
}

nebula::cpp2::ErrorCode doExecute(const std::string& key, const std::string& value) {
key_ = key;
val_ = value;
resetReader();
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

void clear() {
valid_ = false;
key_.clear();
val_.clear();
reader_.reset();
}

private:
void resetReader() {
reader_.reset(*schemas_, val_);
Expand Down
2 changes: 2 additions & 0 deletions src/storage/exec/RelNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ class RelNode {

explicit RelNode(const std::string& name) : name_(name) {}

const std::string& name() const { return name_; }

std::string name_ = "RelNode";
std::vector<RelNode<T>*> dependencies_;
bool hasDependents_ = false;
Expand Down
Loading