diff --git a/src/graph/FindPathExecutor.cpp b/src/graph/FindPathExecutor.cpp index 134c07c8c2a..defb191aa30 100644 --- a/src/graph/FindPathExecutor.cpp +++ b/src/graph/FindPathExecutor.cpp @@ -148,7 +148,7 @@ void FindPathExecutor::execute() { return; } - steps_ = step_.steps_ / 2 + step_.steps_ % 2; + steps_ = step_.recordTo_ / 2 + step_.recordTo_ % 2; fromVids_ = from_.vids_; toVids_ = to_.vids_; visitedFrom_.insert(fromVids_.begin(), fromVids_.end()); @@ -291,7 +291,7 @@ inline void FindPathExecutor::meetOddPath(VertexID src, VertexID dst, Neighbor & for (auto i = rangeF.first; i != rangeF.second; ++i) { auto rangeT = pathTo_.equal_range(dst); for (auto j = rangeT.first; j != rangeT.second; ++j) { - if (j->second.size() + i->second.size() > step_.steps_) { + if (j->second.size() + i->second.size() > step_.recordTo_) { continue; } // Build path: @@ -336,7 +336,7 @@ inline void FindPathExecutor::meetEvenPath(VertexID intersectId) { auto rangeT = pathTo_.equal_range(intersectId); for (auto i = rangeF.first; i != rangeF.second; ++i) { for (auto j = rangeT.first; j != rangeT.second; ++j) { - if (j->second.size() + i->second.size() > step_.steps_) { + if (j->second.size() + i->second.size() > step_.recordTo_) { continue; } // Build path: diff --git a/src/graph/GoExecutor.cpp b/src/graph/GoExecutor.cpp index 4320da94a62..ac966bd23dd 100644 --- a/src/graph/GoExecutor.cpp +++ b/src/graph/GoExecutor.cpp @@ -126,12 +126,8 @@ void GoExecutor::execute() { Status GoExecutor::prepareStep() { auto *clause = sentence_->stepClause(); if (clause != nullptr) { - steps_ = clause->steps(); - upto_ = clause->isUpto(); - } - - if (isUpto()) { - return Status::Error("`UPTO' not supported yet"); + recordFrom_ = clause->recordFrom(); + steps_ = clause->recordTo(); } if (steps_ != 1) { @@ -574,7 +570,7 @@ void GoExecutor::stepOut() { }; auto error = [this] (auto &&e) { LOG(ERROR) << "Exception when handle out-bounds/in-bounds: " << e.what(); - doError(Status::Error("Exeception when handle out-bounds/in-bounds: %s.", + doError(Status::Error("Exception when handle out-bounds/in-bounds: %s.", e.what().c_str())); }; std::move(future).via(runner).thenValue(cb).thenError(error); @@ -582,16 +578,14 @@ void GoExecutor::stepOut() { void GoExecutor::onStepOutResponse(RpcResponse &&rpcResp) { + joinResp(std::move(rpcResp)); + if (isFinalStep()) { - maybeFinishExecution(std::move(rpcResp)); + maybeFinishExecution(); return; } else { - auto status = getDstIdsFromResp(rpcResp); - if (!status.ok()) { - doError(std::move(status).status()); - return; - } - starts_ = std::move(status).value(); + auto dsts = getDstIdsFromResps(records_.end() - 1, records_.end()); + starts_ = std::move(dsts); if (starts_.empty()) { onEmptyInputs(); return; @@ -602,35 +596,28 @@ void GoExecutor::onStepOutResponse(RpcResponse &&rpcResp) { } -void GoExecutor::maybeFinishExecution(RpcResponse &&rpcResp) { +void GoExecutor::maybeFinishExecution() { auto requireDstProps = expCtx_->hasDstTagProp(); // Non-reversely traversal, no properties required on destination nodes // Or, Reversely traversal but no properties on edge and destination nodes required. // Note that the `dest` which used in reversely traversal means the `src` in foword edge. if (!requireDstProps) { - finishExecution(std::move(rpcResp)); - return; - } - - auto dstIdStatus = getDstIdsFromResp(rpcResp); - - if (!dstIdStatus.ok()) { - doError(std::move(dstIdStatus).status()); + finishExecution(); return; } - auto dstids = std::move(dstIdStatus).value(); + auto dstIds = getDstIdsFromResps(records_.begin() + recordFrom_ - 1, records_.end()); // Reaching the dead end - if (dstids.empty()) { + if (dstIds.empty()) { onEmptyInputs(); return; } DCHECK(requireDstProps); // Only properties on destination nodes required - fetchVertexProps(std::move(dstids), std::move(rpcResp)); + fetchVertexProps(std::move(dstIds)); return; } @@ -638,22 +625,25 @@ void GoExecutor::onVertexProps(RpcResponse &&rpcResp) { UNUSED(rpcResp); } -StatusOr> GoExecutor::getDstIdsFromResp(RpcResponse &rpcResp) const { +std::vector GoExecutor::getDstIdsFromResps(std::vector::iterator begin, + std::vector::iterator end) const { std::unordered_set set; - for (auto &resp : rpcResp.responses()) { - auto *vertices = resp.get_vertices(); - if (vertices == nullptr) { - continue; - } + for (auto it = begin; it != end; ++it) { + for (const auto &resp : it->responses()) { + auto *vertices = resp.get_vertices(); + if (vertices == nullptr) { + continue; + } - for (auto &vdata : *vertices) { - for (auto &edata : vdata.edge_data) { - for (auto& edge : edata.get_edges()) { - auto dst = edge.get_dst(); - if (!isFinalStep() && backTracker_ != nullptr) { - backTracker_->add(vdata.get_vertex_id(), dst); + for (const auto &vdata : *vertices) { + for (const auto &edata : vdata.edge_data) { + for (const auto& edge : edata.get_edges()) { + auto dst = edge.get_dst(); + if (!isFinalStep() && backTracker_ != nullptr) { + backTracker_->add(vdata.get_vertex_id(), dst); + } + set.emplace(dst); } - set.emplace(dst); } } } @@ -661,7 +651,7 @@ StatusOr> GoExecutor::getDstIdsFromResp(RpcResponse &rpcRe return std::vector(set.begin(), set.end()); } -void GoExecutor::finishExecution(RpcResponse &&rpcResp) { +void GoExecutor::finishExecution() { // MayBe we can do better. std::vector> yc; if (expCtx_->isOverAllEdge() && yields_.empty()) { @@ -678,7 +668,7 @@ void GoExecutor::finishExecution(RpcResponse &&rpcResp) { if (onResult_) { std::unique_ptr outputs; - if (!setupInterimResult(std::move(rpcResp), outputs)) { + if (!setupInterimResult(outputs)) { return; } onResult_(std::move(outputs)); @@ -686,7 +676,7 @@ void GoExecutor::finishExecution(RpcResponse &&rpcResp) { auto start = time::WallClock::fastNowInMicroSec(); resp_ = std::make_unique(); resp_->set_column_names(getResultColumnNames()); - auto ret = toThriftResponse(std::forward(rpcResp)); + auto ret = toThriftResponse(); if (FLAGS_trace_go) { LOG(INFO) << "Process the resp from storaged, total time " << time::WallClock::fastNowInMicroSec() - start << "us"; @@ -702,12 +692,14 @@ void GoExecutor::finishExecution(RpcResponse &&rpcResp) { doFinish(Executor::ProcessControl::kNext); } -StatusOr> GoExecutor::toThriftResponse(RpcResponse&& rpcResp) { +StatusOr> GoExecutor::toThriftResponse() const { std::vector rows; int64_t totalRows = 0; - for (auto& resp : rpcResp.responses()) { - if (resp.get_total_edges() != nullptr) { - totalRows += *resp.get_total_edges(); + for (const auto &rpcResp : records_) { + for (const auto& resp : rpcResp.responses()) { + if (resp.get_total_edges() != nullptr) { + totalRows += *resp.get_total_edges(); + } } } rows.reserve(totalRows); @@ -767,7 +759,7 @@ StatusOr> GoExecutor::toThriftResponse(RpcResponse&& return Status::OK(); }; // cb - if (!processFinalResult(rpcResp, cb)) { + if (!processFinalResult(cb)) { return Status::Error("process failed"); } if (FLAGS_trace_go) { @@ -778,7 +770,7 @@ StatusOr> GoExecutor::toThriftResponse(RpcResponse&& StatusOr> GoExecutor::getStepOutProps() { std::vector props; - if (!isFinalStep()) { + if (!isRecord()) { for (auto &e : edgeTypes_) { storage::cpp2::PropDef pd; pd.owner = storage::cpp2::PropOwner::EDGE; @@ -873,7 +865,7 @@ StatusOr> GoExecutor::getDstProps() { } -void GoExecutor::fetchVertexProps(std::vector ids, RpcResponse &&rpcResp) { +void GoExecutor::fetchVertexProps(std::vector ids) { auto spaceId = ectx()->rctx()->session()->space(); auto status = getDstProps(); if (!status.ok()) { @@ -883,7 +875,7 @@ void GoExecutor::fetchVertexProps(std::vector ids, RpcResponse &&rpcRe auto returns = status.value(); auto future = ectx()->getStorageClient()->getVertexProps(spaceId, ids, returns); auto *runner = ectx()->rctx()->runner(); - auto cb = [this, stepOutResp = std::move(rpcResp)] (auto &&result) mutable { + auto cb = [this] (auto &&result) mutable { auto completeness = result.completeness(); if (completeness == 0) { doError(Status::Error("Get dest props failed")); @@ -901,7 +893,7 @@ void GoExecutor::fetchVertexProps(std::vector ids, RpcResponse &&rpcRe for (auto &resp : result.responses()) { vertexHolder_->add(resp); } - finishExecution(std::move(stepOutResp)); + finishExecution(); return; }; auto error = [this] (auto &&e) { @@ -927,7 +919,7 @@ std::vector GoExecutor::getResultColumnNames() const { } -bool GoExecutor::setupInterimResult(RpcResponse &&rpcResp, std::unique_ptr &result) { +bool GoExecutor::setupInterimResult(std::unique_ptr &result) const { // Generic results result = std::make_unique(getResultColumnNames()); std::shared_ptr schema; @@ -997,7 +989,7 @@ bool GoExecutor::setupInterimResult(RpcResponse &&rpcResp, std::unique_ptrrctx()->session()->space(); - +bool GoExecutor::processFinalResult(Callback cb) const { auto uniqResult = std::make_unique>(); + auto spaceId = ectx()->rctx()->session()->space(); std::vector colTypes; for (auto *column : yields_) { colTypes.emplace_back(calculateExprType(column->expr())); } - std::vector record; - record.reserve(yields_.size()); - for (auto &resp : all) { - if (resp.get_vertices() == nullptr) { - continue; - } - std::unordered_map> tagSchema; - auto *vschema = resp.get_vertex_schema(); - if (vschema != nullptr) { - std::transform(vschema->cbegin(), vschema->cend(), - std::inserter(tagSchema, tagSchema.begin()), [](auto &schema) { - return std::make_pair( - schema.first, - std::make_shared(schema.second)); - }); - } - - std::unordered_map> edgeSchema; - auto *eschema = resp.get_edge_schema(); - if (eschema != nullptr) { - std::transform(eschema->cbegin(), eschema->cend(), - std::inserter(edgeSchema, edgeSchema.begin()), [](auto &schema) { - return std::make_pair( - schema.first, - std::make_shared(schema.second)); - }); - } - VLOG(1) << "Total resp.vertices size " << resp.vertices.size(); - for (auto &vdata : resp.vertices) { - DCHECK(vdata.__isset.edge_data); - auto tagData = vdata.get_tag_data(); - auto srcId = vdata.get_vertex_id(); - VLOG(1) << "Total vdata.edge_data size " << vdata.edge_data.size(); - for (auto &edata : vdata.edge_data) { - auto edgeType = edata.type; - VLOG(1) << "Total edata.edges size " << edata.edges.size() - << ", for edge " << edgeType; - std::shared_ptr currEdgeSchema; - auto it = edgeSchema.find(edgeType); - if (it != edgeSchema.end()) { - currEdgeSchema = it->second; - } - VLOG(1) << "CurrEdgeSchema is null? " << (currEdgeSchema == nullptr); - for (auto& edge : edata.edges) { - auto dstId = edge.get_dst(); - Getters getters; - getters.getEdgeDstId = [this, - &dstId, - &edgeType] (const std::string& edgeName) - -> OptVariantType { - if (edgeTypes_.size() > 1) { - EdgeType type; - auto found = expCtx_->getEdgeType(edgeName, type); + for (auto rpcResp = records_.begin() + recordFrom_ - 1; rpcResp != records_.end(); ++rpcResp) { + const auto& all = rpcResp->responses(); + + std::vector record; + record.reserve(yields_.size()); + for (const auto &resp : all) { + if (resp.get_vertices() == nullptr) { + continue; + } + std::unordered_map> tagSchema; + auto *vschema = resp.get_vertex_schema(); + if (vschema != nullptr) { + std::transform(vschema->cbegin(), vschema->cend(), + std::inserter(tagSchema, tagSchema.begin()), [](auto &schema) { + return std::make_pair( + schema.first, + std::make_shared(schema.second)); + }); + } + + std::unordered_map> edgeSchema; + auto *eschema = resp.get_edge_schema(); + if (eschema != nullptr) { + std::transform(eschema->cbegin(), eschema->cend(), + std::inserter(edgeSchema, edgeSchema.begin()), [](auto &schema) { + return std::make_pair( + schema.first, + std::make_shared(schema.second)); + }); + } + VLOG(1) << "Total resp.vertices size " << resp.vertices.size(); + for (const auto &vdata : resp.vertices) { + DCHECK(vdata.__isset.edge_data); + auto tagData = vdata.get_tag_data(); + auto srcId = vdata.get_vertex_id(); + VLOG(1) << "Total vdata.edge_data size " << vdata.edge_data.size(); + for (const auto &edata : vdata.edge_data) { + auto edgeType = edata.type; + VLOG(1) << "Total edata.edges size " << edata.edges.size() + << ", for edge " << edgeType; + std::shared_ptr currEdgeSchema; + auto it = edgeSchema.find(edgeType); + if (it != edgeSchema.end()) { + currEdgeSchema = it->second; + } + VLOG(1) << "CurrEdgeSchema is null? " << (currEdgeSchema == nullptr); + for (const auto& edge : edata.edges) { + auto dstId = edge.get_dst(); + Getters getters; + getters.getEdgeDstId = [this, + &dstId, + &edgeType] (const std::string& edgeName) + -> OptVariantType { + if (edgeTypes_.size() > 1) { + EdgeType type; + auto found = expCtx_->getEdgeType(edgeName, type); + if (!found) { + return Status::Error( + "Get edge type for `%s' failed in getters.", + edgeName.c_str()); + } + if (type != std::abs(edgeType)) { + return 0L; + } + } + return dstId; + }; + getters.getSrcTagProp = [&spaceId, + &tagData, + &tagSchema, + this] (const std::string &tag, + const std::string &prop) -> OptVariantType { + TagID tagId; + auto found = expCtx_->getTagId(tag, tagId); if (!found) { return Status::Error( - "Get edge type for `%s' failed in getters.", - edgeName.c_str()); + "Get tag id for `%s' failed in getters.", tag.c_str()); } - if (type != std::abs(edgeType)) { - return 0L; - } - } - return dstId; - }; - getters.getSrcTagProp = [&spaceId, - &tagData, - &tagSchema, - this] (const std::string &tag, - const std::string &prop) -> OptVariantType { - TagID tagId; - auto found = expCtx_->getTagId(tag, tagId); - if (!found) { - return Status::Error( - "Get tag id for `%s' failed in getters.", tag.c_str()); - } - auto it2 = std::find_if(tagData.cbegin(), - tagData.cend(), - [&tagId] (auto &td) { - if (td.tag_id == tagId) { - return true; + auto it2 = std::find_if(tagData.cbegin(), + tagData.cend(), + [&tagId] (auto &td) { + if (td.tag_id == tagId) { + return true; + } + return false; + }); + if (it2 == tagData.cend()) { + auto ts = ectx()->schemaManager()->getTagSchema(spaceId, tagId); + if (ts == nullptr) { + return Status::Error("No tag schema for %s", tag.c_str()); + } + return RowReader::getDefaultProp(ts.get(), prop); } - return false; - }); - if (it2 == tagData.cend()) { - auto ts = ectx()->schemaManager()->getTagSchema(spaceId, tagId); - if (ts == nullptr) { - return Status::Error("No tag schema for %s", tag.c_str()); + DCHECK(it2->__isset.data); + auto vreader = RowReader::getRowReader(it2->data, tagSchema[tagId]); + auto res = RowReader::getPropByName(vreader.get(), prop); + if (!ok(res)) { + return Status::Error( + folly::sformat("get prop({}.{}) failed", tag, prop)); } - return RowReader::getDefaultProp(ts.get(), prop); - } - DCHECK(it2->__isset.data); - auto vreader = RowReader::getRowReader(it2->data, tagSchema[tagId]); - auto res = RowReader::getPropByName(vreader.get(), prop); - if (!ok(res)) { - return Status::Error( - folly::sformat("get prop({}.{}) failed", tag, prop)); - } - return value(res); - }; - // In reverse mode, it is used to get the src props. - getters.getDstTagProp = [&spaceId, - &dstId, - this] (const std::string &tag, - const std::string &prop) -> OptVariantType { - TagID tagId; - auto found = expCtx_->getTagId(tag, tagId); - if (!found) { - return Status::Error( - "Get tag id for `%s' failed in getters.", tag.c_str()); - } - auto ret = vertexHolder_->get(dstId, tagId, prop); - if (!ret.ok()) { - auto ts = ectx()->schemaManager()->getTagSchema(spaceId, tagId); - if (ts == nullptr) { - return Status::Error("No tag schema for %s", tag.c_str()); + return value(res); + }; + // In reverse mode, it is used to get the src props. + getters.getDstTagProp = [&spaceId, + &dstId, + this] (const std::string &tag, + const std::string &prop) -> OptVariantType { + TagID tagId; + auto found = expCtx_->getTagId(tag, tagId); + if (!found) { + return Status::Error( + "Get tag id for `%s' failed in getters.", tag.c_str()); } - return RowReader::getDefaultProp(ts.get(), prop); - } - return ret.value(); - }; - getters.getVariableProp = [&srcId, - this] (const std::string &prop) -> OptVariantType { - if (!uniqueStart_) { - return Status::NotSupported( - "Not supported duplicate start from variable"); - } - return getPropFromInterim(srcId, prop); - }; - getters.getInputProp = [&srcId, - this] (const std::string &prop) -> OptVariantType { - if (!uniqueStart_) { - return Status::NotSupported("Not supported duplicate start from input"); - } - return getPropFromInterim(srcId, prop); - }; + auto ret = vertexHolder_->get(dstId, tagId, prop); + if (!ret.ok()) { + auto ts = ectx()->schemaManager()->getTagSchema(spaceId, tagId); + if (ts == nullptr) { + return Status::Error("No tag schema for %s", tag.c_str()); + } + return RowReader::getDefaultProp(ts.get(), prop); + } + return ret.value(); + }; + getters.getVariableProp = [&srcId, + this] (const std::string &prop) -> OptVariantType { + if (!uniqueStart_) { + return Status::NotSupported( + "Not supported duplicate start from variable"); + } + return getPropFromInterim(srcId, prop); + }; + getters.getInputProp = [&srcId, + this] (const std::string &prop) -> OptVariantType { + if (!uniqueStart_) { + return Status::NotSupported( + "Not supported duplicate start from input"); + } + return getPropFromInterim(srcId, prop); + }; - std::unique_ptr reader; - if (currEdgeSchema) { - reader = RowReader::getRowReader(edge.props, currEdgeSchema); - } + std::unique_ptr reader; + if (currEdgeSchema) { + reader = RowReader::getRowReader(edge.props, currEdgeSchema); + } - // In reverse mode, we should handle _src - getters.getAliasProp = [&reader, - &srcId, - &edgeType, - &edgeSchema, - this] (const std::string &edgeName, - const std::string &prop) mutable - -> OptVariantType { - EdgeType type; - auto found = expCtx_->getEdgeType(edgeName, type); - if (!found) { - return Status::Error( + // In reverse mode, we should handle _src + getters.getAliasProp = [&reader, + &srcId, + &edgeType, + &edgeSchema, + this] (const std::string &edgeName, + const std::string &prop) mutable + -> OptVariantType { + EdgeType type; + auto found = expCtx_->getEdgeType(edgeName, type); + if (!found) { + return Status::Error( "Get edge type for `%s' failed in getters.", edgeName.c_str()); - } - if (std::abs(edgeType) != type) { - auto sit = edgeSchema.find( + } + if (std::abs(edgeType) != type) { + auto sit = edgeSchema.find( direction_ == OverClause::Direction::kBackward ? -type : type); - if (sit == edgeSchema.end()) { - std::string errMsg = folly::stringPrintf( - "Can't find shcema for %s when get default.", - edgeName.c_str()); - LOG(ERROR) << errMsg; - return Status::Error(errMsg); + if (sit == edgeSchema.end()) { + std::string errMsg = folly::stringPrintf( + "Can't find shcema for %s when get default.", + edgeName.c_str()); + LOG(ERROR) << errMsg; + return Status::Error(errMsg); + } + return RowReader::getDefaultProp(sit->second.get(), prop); } - return RowReader::getDefaultProp(sit->second.get(), prop); - } - if (prop == _SRC) { - return srcId; - } - DCHECK(reader != nullptr); - auto res = RowReader::getPropByName(reader.get(), prop); - if (!ok(res)) { - LOG(ERROR) << "Can't get prop for " << prop - << ", edge " << edgeName; - return Status::Error( - folly::sformat("get prop({}.{}) failed", - edgeName, - prop)); + if (prop == _SRC) { + return srcId; + } + DCHECK(reader != nullptr); + auto res = RowReader::getPropByName(reader.get(), prop); + if (!ok(res)) { + LOG(ERROR) << "Can't get prop for " << prop + << ", edge " << edgeName; + return Status::Error( + folly::sformat("get prop({}.{}) failed", + edgeName, + prop)); + } + return value(std::move(res)); + }; // getAliasProp + // Evaluate filter + if (whereWrapper_->filter_ != nullptr) { + auto value = whereWrapper_->filter_->eval(getters); + if (!value.ok()) { + doError(std::move(value).status()); + return false; + } + if (!Expression::asBool(value.value())) { + continue; + } } - return value(std::move(res)); - }; // getAliasProp - // Evaluate filter - if (whereWrapper_->filter_ != nullptr) { - auto value = whereWrapper_->filter_->eval(getters); - if (!value.ok()) { - doError(std::move(value).status()); - return false; + record.clear(); + for (auto *column : yields_) { + auto *expr = column->expr(); + auto value = expr->eval(getters); + if (!value.ok()) { + doError(std::move(value).status()); + return false; + } + record.emplace_back(std::move(value.value())); } - if (!Expression::asBool(value.value())) { - continue; + // Check if duplicate + if (distinct_) { + auto ret = uniqResult->emplace(boost::hash_range(record.begin(), + record.end())); + if (!ret.second) { + continue; + } } - } - record.clear(); - for (auto *column : yields_) { - auto *expr = column->expr(); - auto value = expr->eval(getters); - if (!value.ok()) { - doError(std::move(value).status()); + auto cbStatus = cb(std::move(record), colTypes); + if (!cbStatus.ok()) { + LOG(ERROR) << cbStatus; + doError(std::move(cbStatus)); return false; } - record.emplace_back(std::move(value.value())); - } - // Check if duplicate - if (distinct_) { - auto ret = uniqResult->emplace(boost::hash_range(record.begin(), - record.end())); - if (!ret.second) { - continue; - } - } - auto cbStatus = cb(std::move(record), colTypes); - if (!cbStatus.ok()) { - LOG(ERROR) << cbStatus; - doError(std::move(cbStatus)); - return false; - } - } // for edges - } // for edata - } // for `vdata' - } // for `resp' + } // for edges + } // for edata + } // for `vdata' + } // for `resp' + } return true; } @@ -1344,5 +1339,10 @@ OptVariantType GoExecutor::getPropFromInterim(VertexID id, const std::string &pr DCHECK(index_ != nullptr); return index_->getColumnWithVID(rootId, prop); } + +void GoExecutor::joinResp(RpcResponse &&resp) { + records_.emplace_back(resp); +} + } // namespace graph } // namespace nebula diff --git a/src/graph/GoExecutor.h b/src/graph/GoExecutor.h index 934caa58f2d..9789f5ff8f3 100644 --- a/src/graph/GoExecutor.h +++ b/src/graph/GoExecutor.h @@ -70,12 +70,11 @@ class GoExecutor final : public TraverseExecutor { return curStep_ == steps_; } - /** - * To check if `UPTO' is specified. - * If so, we are supposed to apply the filter in each step. - */ - bool isUpto() const { - return upto_; + // is record the response data + // Won't return the properties(only dst) if false + // E.G 0-> 1 -> 2, two step + bool isRecord() const { + return curStep_ >= recordFrom_ && curStep_ <= steps_; } /** @@ -108,9 +107,9 @@ class GoExecutor final : public TraverseExecutor { StatusOr> getStepOutProps(); StatusOr> getDstProps(); - void fetchVertexProps(std::vector ids, RpcResponse &&rpcResp); + void fetchVertexProps(std::vector ids); - void maybeFinishExecution(RpcResponse &&rpcResp); + void maybeFinishExecution(); /** * To retrieve or generate the column names for the execution result. @@ -120,7 +119,8 @@ class GoExecutor final : public TraverseExecutor { /** * To retrieve the dst ids from a stepping out response. */ - StatusOr> getDstIdsFromResp(RpcResponse &rpcResp) const; + std::vector getDstIdsFromResps(std::vector::iterator begin, + std::vector::iterator end) const; /** * get the edgeName when over all edges @@ -129,13 +129,13 @@ class GoExecutor final : public TraverseExecutor { /** * All required data have arrived, finish the execution. */ - void finishExecution(RpcResponse &&rpcResp); + void finishExecution(); /** * To setup an intermediate representation of the execution result, * which is about to be piped to the next executor. */ - bool setupInterimResult(RpcResponse &&rpcResp, std::unique_ptr &result); + bool setupInterimResult(std::unique_ptr &result) const; /** * To setup the header of the execution result, i.e. the column names. @@ -154,9 +154,9 @@ class GoExecutor final : public TraverseExecutor { using Callback = std::function, const std::vector&)>; - bool processFinalResult(RpcResponse &rpcResp, Callback cb) const; + bool processFinalResult(Callback cb) const; - StatusOr> toThriftResponse(RpcResponse&& resp); + StatusOr> toThriftResponse() const; /** * A container to hold the mapping from vertex id to its properties, used for lookups @@ -204,12 +204,15 @@ class GoExecutor final : public TraverseExecutor { kPipe, }; + // Join the RPC response to previous data + void joinResp(RpcResponse &&resp); + private: GoSentence *sentence_{nullptr}; FromType fromType_{kInstantExpr}; + uint32_t recordFrom_{1}; uint32_t steps_{1}; uint32_t curStep_{1}; - bool upto_{false}; OverClause::Direction direction_{OverClause::Direction::kForward}; std::vector edgeTypes_; std::string *varname_{nullptr}; @@ -226,6 +229,8 @@ class GoExecutor final : public TraverseExecutor { std::unique_ptr vertexHolder_; std::unique_ptr backTracker_; std::unique_ptr resp_; + // Record the data of response in GO step + std::vector records_; // TODO(shylock) Join lose the data with duplicate input(VID) map bool uniqueStart_{false}; // #2087 Workaround // The name of Tag or Edge, index of prop in data diff --git a/src/graph/test/GoTest.cpp b/src/graph/test/GoTest.cpp index b24f2075358..d86da36dd61 100644 --- a/src/graph/test/GoTest.cpp +++ b/src/graph/test/GoTest.cpp @@ -312,7 +312,8 @@ TEST_P(GoTest, Distinct) { auto &player = players_["Tony Parker"]; auto *fmt = "GO 2 STEPS FROM %ld OVER like YIELD DISTINCT like._dst"; auto query = folly::stringPrintf(fmt, player.vid()); - client_->execute(query, resp); + auto code = client_->execute(query, resp); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); std::vector> expected = { {3394245602834314645}, {-7579316172763586624}, @@ -2369,6 +2370,203 @@ TEST_P(GoTest, Contains) { } } +TEST_P(GoTest, WithIntermediateData) { + { + cpp2::ExecutionResponse resp; + auto &player = players_["Tony Parker"]; + auto *fmt = "GO 1 TO 2 STEPS FROM %ld OVER like YIELD DISTINCT like._dst"; + auto query = folly::stringPrintf(fmt, player.vid()); + auto code = client_->execute(query, resp); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); + + std::vector> expected = { + {players_["Tony Parker"].vid()}, + {players_["Manu Ginobili"].vid()}, + {players_["LaMarcus Aldridge"].vid()}, + {players_["Tim Duncan"].vid()}, + }; + ASSERT_TRUE(verifyResult(resp, expected)); + } + // With properties + { + cpp2::ExecutionResponse resp; + auto &player = players_["Tony Parker"]; + auto *fmt = "GO 1 TO 2 STEPS FROM %ld OVER like " + "YIELD DISTINCT like._dst, like.likeness, $$.player.name"; + auto query = folly::stringPrintf(fmt, player.vid()); + auto code = client_->execute(query, resp); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); + + std::vector> expected = { + {players_["Manu Ginobili"].vid(), 95, "Manu Ginobili"}, + {players_["LaMarcus Aldridge"].vid(), 90, "LaMarcus Aldridge"}, + {players_["Tim Duncan"].vid(), 95, "Tim Duncan"}, + {players_["Tony Parker"].vid(), 95, "Tony Parker"}, + {players_["Tony Parker"].vid(), 75, "Tony Parker"}, + {players_["Tim Duncan"].vid(), 75, "Tim Duncan"}, + {players_["Tim Duncan"].vid(), 90, "Tim Duncan"}, + }; + ASSERT_TRUE(verifyResult(resp, expected)); + } + // empty starts before last step + { + cpp2::ExecutionResponse resp; + auto *fmt = "GO 1 TO 3 STEPS FROM %ld OVER serve"; + auto query = folly::stringPrintf(fmt, players_["Tim Duncan"].vid()); + auto code = client_->execute(query, resp); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); + + std::vector> expected = { + }; + ASSERT_TRUE(verifyResult(resp, expected)); + } + + // reversely + { + cpp2::ExecutionResponse resp; + auto &player = players_["Tony Parker"]; + auto *fmt = "GO 1 TO 2 STEPS FROM %ld OVER like REVERSELY YIELD DISTINCT like._dst"; + auto query = folly::stringPrintf(fmt, player.vid()); + auto code = client_->execute(query, resp); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); + std::vector> expected = { + {players_["Tim Duncan"].vid()}, + {players_["LaMarcus Aldridge"].vid()}, + {players_["Marco Belinelli"].vid()}, + {players_["Boris Diaw"].vid()}, + {players_["Dejounte Murray"].vid()}, + {players_["Tony Parker"].vid()}, + {players_["Manu Ginobili"].vid()}, + {players_["Danny Green"].vid()}, + {players_["Aron Baynes"].vid()}, + {players_["Tiago Splitter"].vid()}, + {players_["Shaquile O'Neal"].vid()}, + {players_["Rudy Gay"].vid()}, + {players_["Damian Lillard"].vid()}, + }; + ASSERT_TRUE(verifyResult(resp, expected)); + } + // empty starts before last step + { + cpp2::ExecutionResponse resp; + auto *fmt = "GO 1 TO 3 STEPS FROM %ld OVER serve REVERSELY"; + auto query = folly::stringPrintf(fmt, teams_["Spurs"].vid()); + auto code = client_->execute(query, resp); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); + + // Empty response + std::vector> expected = { + }; + ASSERT_TRUE(verifyResult(resp, expected)); + } + + // Bidirectionally + { + cpp2::ExecutionResponse resp; + auto &player = players_["Tony Parker"]; + auto *fmt = "GO 1 TO 2 STEPS FROM %ld OVER like BIDIRECT YIELD DISTINCT like._dst"; + auto query = folly::stringPrintf(fmt, player.vid()); + auto code = client_->execute(query, resp); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); + + std::vector> expected = { + {players_["Tim Duncan"].vid()}, + {players_["LaMarcus Aldridge"].vid()}, + {players_["Marco Belinelli"].vid()}, + {players_["Boris Diaw"].vid()}, + {players_["Dejounte Murray"].vid()}, + {players_["Tony Parker"].vid()}, + {players_["Manu Ginobili"].vid()}, + {players_["Danny Green"].vid()}, + {players_["Aron Baynes"].vid()}, + {players_["Tiago Splitter"].vid()}, + {players_["Shaquile O'Neal"].vid()}, + {players_["Rudy Gay"].vid()}, + {players_["Damian Lillard"].vid()}, + {players_["LeBron James"].vid()}, + {players_["Russell Westbrook"].vid()}, + {players_["Chris Paul"].vid()}, + {players_["Kyle Anderson"].vid()}, + {players_["Kevin Durant"].vid()}, + {players_["James Harden"].vid()}, + }; + ASSERT_TRUE(verifyResult(resp, expected)); + } + + // over * + { + cpp2::ExecutionResponse resp; + auto *fmt = "GO 1 TO 2 STEPS FROM %ld OVER * YIELD serve._dst, like._dst"; + const auto &player = players_["Russell Westbrook"]; + auto query = folly::stringPrintf(fmt, player.vid()); + auto code = client_->execute(query, resp); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); + std::vector> expected = { + {teams_["Thunders"].vid(), 0}, + {0, players_["Paul George"].vid()}, + {0, players_["James Harden"].vid()}, + {teams_["Pacers"].vid(), 0}, + {teams_["Thunders"].vid(), 0}, + {0, players_["Russell Westbrook"].vid()}, + {teams_["Thunders"].vid(), 0}, + {teams_["Rockets"].vid(), 0}, + {0, players_["Russell Westbrook"].vid()}, + }; + ASSERT_TRUE(verifyResult(resp, expected)); + } + // With properties + { + cpp2::ExecutionResponse resp; + auto *fmt = "GO 1 TO 2 STEPS FROM %ld OVER * " + "YIELD serve._dst, like._dst, serve.start_year, like.likeness, $$.player.name"; + const auto &player = players_["Russell Westbrook"]; + auto query = folly::stringPrintf(fmt, player.vid()); + auto code = client_->execute(query, resp); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); + std::vector> expected = { + {teams_["Thunders"].vid(), 0, 2008, 0, ""}, + {0, players_["Paul George"].vid(), 0, 90, "Paul George"}, + {0, players_["James Harden"].vid(), 0, 90, "James Harden"}, + {teams_["Pacers"].vid(), 0, 2010, 0, ""}, + {teams_["Thunders"].vid(), 0, 2017, 0, ""}, + {0, players_["Russell Westbrook"].vid(), 0, 95, "Russell Westbrook"}, + {teams_["Thunders"].vid(), 0, 2009, 0, ""}, + {teams_["Rockets"].vid(), 0, 2012, 0, ""}, + {0, players_["Russell Westbrook"].vid(), 0, 80, "Russell Westbrook"}, + }; + ASSERT_TRUE(verifyResult(resp, expected)); + } + { + cpp2::ExecutionResponse resp; + auto *fmt = "GO 1 TO 2 STEPS FROM %ld OVER * REVERSELY YIELD serve._dst, like._dst"; + const auto &player = players_["Russell Westbrook"]; + auto query = folly::stringPrintf(fmt, player.vid()); + auto code = client_->execute(query, resp); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); + std::vector> expected = { + {0, players_["Dejounte Murray"].vid()}, + {0, players_["James Harden"].vid()}, + {0, players_["Paul George"].vid()}, + {0, players_["Dejounte Murray"].vid()}, + {0, players_["Russell Westbrook"].vid()}, + {0, players_["Luka Doncic"].vid()}, + {0, players_["Russell Westbrook"].vid()}, + }; + } +} + +TEST_P(GoTest, ErrorMsg) { + { + cpp2::ExecutionResponse resp; + auto *fmt = "GO FROM %ld OVER serve YIELD $$.player.name as name"; + auto query = folly::stringPrintf(fmt, players_["Tim Duncan"].vid()); + auto code = client_->execute(query, resp); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); + std::vector> expected = {""}; + ASSERT_TRUE(verifyResult(resp, expected)); + } +} + TEST_P(GoTest, issue2087) { // from input { @@ -2401,12 +2599,10 @@ TEST_P(GoTest, ZeroStep) { auto code = client_->execute(query, resp); ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); - // Empty response std::vector> expected = { }; ASSERT_TRUE(verifyResult(resp, expected)); } - { // a normal traversal cpp2::ExecutionResponse resp; @@ -2421,18 +2617,6 @@ TEST_P(GoTest, ZeroStep) { } } -TEST_P(GoTest, ErrorMsg) { - { - cpp2::ExecutionResponse resp; - auto *fmt = "GO FROM %ld OVER serve YIELD $$.player.name as name"; - auto query = folly::stringPrintf(fmt, players_["Tim Duncan"].vid()); - auto code = client_->execute(query, resp); - ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); - std::vector> expected = {""}; - ASSERT_TRUE(verifyResult(resp, expected)); - } -} - INSTANTIATE_TEST_CASE_P(IfPushdownFilter, GoTest, ::testing::Bool()); } // namespace graph } // namespace nebula diff --git a/src/graph/test/TestBase.h b/src/graph/test/TestBase.h index 2838d3b0e4c..84bd8248142 100644 --- a/src/graph/test/TestBase.h +++ b/src/graph/test/TestBase.h @@ -200,6 +200,10 @@ class TestBase : public ::testing::Test { return TestOK(); } + if (resp.get_rows() == nullptr) { + return TestError() << "No response data"; + } + std::vector rows; try { rows = rowsToTuples(respToRecords(resp, std::move(ignoreColIndex))); diff --git a/src/parser/Clauses.cpp b/src/parser/Clauses.cpp index 5eb8b5ca419..80e3457de4f 100644 --- a/src/parser/Clauses.cpp +++ b/src/parser/Clauses.cpp @@ -13,10 +13,9 @@ namespace nebula { std::string StepClause::toString() const { std::string buf; buf.reserve(256); - if (isUpto()) { - buf += "UPTO "; - } - buf += std::to_string(steps_); + buf += std::to_string(step_.recordFrom_); + buf += " TO"; + buf += std::to_string(step_.recordTo_); buf += " STEPS"; return buf; } diff --git a/src/parser/Clauses.h b/src/parser/Clauses.h index 516c6ac2b2f..ec425e62b53 100644 --- a/src/parser/Clauses.h +++ b/src/parser/Clauses.h @@ -27,8 +27,8 @@ class Clause { }; struct Step { - uint32_t steps_{0}; - bool upto_{false}; + uint32_t recordFrom_; + uint32_t recordTo_; }; struct Where { @@ -55,31 +55,42 @@ class Clause { class StepClause final : public Clause { public: - explicit StepClause(uint64_t steps = 1, bool isUpto = false) { - steps_ = steps; - isUpto_ = isUpto; + explicit StepClause(uint32_t recordTo = 1) // Keep same with before + : step_({recordTo, recordTo}) { + // check range validation in parser + kind_ = Kind::kStepClause; + } + + // GO recoredFrom TO recordTo STEPS + // [recordFrom, recordTo] + explicit StepClause(uint32_t recordFrom, uint32_t recordTo) + : step_({recordFrom, recordTo}) { + // check range validation in parser kind_ = Kind::kStepClause; } Status prepare(Step &step) const { - step.steps_ = steps_; - step.upto_ = isUpto_; + step.recordFrom_ = step_.recordFrom_; + step.recordTo_ = step_.recordTo_; return Status::OK(); } uint32_t steps() const { - return steps_; + return step_.recordTo_; + } + + uint32_t recordFrom() const { + return step_.recordFrom_; } - bool isUpto() const { - return isUpto_; + uint32_t recordTo() const { + return step_.recordTo_; } std::string toString() const; private: - uint32_t steps_{1}; - bool isUpto_{false}; + Step step_; }; diff --git a/src/parser/parser.yy b/src/parser/parser.yy index 12482dfc2f5..116b33380f9 100644 --- a/src/parser/parser.yy +++ b/src/parser/parser.yy @@ -610,9 +610,13 @@ step_clause ifOutOfRange($1, @1); $$ = new StepClause($1); } - | KW_UPTO INTEGER KW_STEPS { - ifOutOfRange($2, @2); - $$ = new StepClause($2, true); + | INTEGER KW_TO INTEGER KW_STEPS { + ifOutOfRange($1, @2); + ifOutOfRange($3, @2); + if ($1 > $3) { + throw nebula::GraphParser::syntax_error(@1, "Invalid step range"); + } + $$ = new StepClause($1, $3); } ; @@ -942,10 +946,10 @@ find_path_sentence ; find_path_upto_clause - : %empty { $$ = new StepClause(5, true); } + : %empty { $$ = new StepClause(5); } | KW_UPTO INTEGER KW_STEPS { ifOutOfRange($2, @2); - $$ = new StepClause($2, true); + $$ = new StepClause($2); } ; diff --git a/src/parser/test/ParserTest.cpp b/src/parser/test/ParserTest.cpp index 4292cd0e929..bb57ffdf84f 100644 --- a/src/parser/test/ParserTest.cpp +++ b/src/parser/test/ParserTest.cpp @@ -45,10 +45,22 @@ TEST(Parser, Go) { } { GQLParser parser; - std::string query = "GO UPTO 2 STEPS FROM 1 OVER friend"; + std::string query = "GO 2 TO 3 STEPS FROM 1 OVER friend"; auto result = parser.parse(query); ASSERT_TRUE(result.ok()) << result.status(); } + { + GQLParser parser; + std::string query = "GO 2 TO 2 STEPS FROM 1 OVER friend"; + auto result = parser.parse(query); + ASSERT_TRUE(result.ok()) << result.status(); + } + { + GQLParser parser; + std::string query = "GO 3 TO 2 STEPS FROM 1 OVER friend"; + auto result = parser.parse(query); + ASSERT_FALSE(result.ok()) << result.status(); + } { GQLParser parser; std::string query = "GO FROM 1 OVER friend"; diff --git a/src/storage/client/StorageClient.h b/src/storage/client/StorageClient.h index d24fd56dbe9..67fdb2a0e0a 100644 --- a/src/storage/client/StorageClient.h +++ b/src/storage/client/StorageClient.h @@ -64,6 +64,10 @@ class StorageRpcResponse final { return responses_; } + const std::vector& responses() const { + return responses_; + } + const std::vector>& hostLatency() const { return hostLatency_; }