Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support GO M TO N STEPS which record the data in go traversal. #2091

Merged
merged 10 commits into from May 22, 2020
6 changes: 3 additions & 3 deletions src/graph/FindPathExecutor.cpp
Expand Up @@ -148,7 +148,7 @@ void FindPathExecutor::execute() {
return;
}

steps_ = step_.steps_ / 2 + step_.steps_ % 2;
steps_ = step_.recordTo_ / 2 + step_.recordTo_ % 2;
fromVids_ = from_.vids_;
toVids_ = to_.vids_;
visitedFrom_.insert(fromVids_.begin(), fromVids_.end());
Expand Down Expand Up @@ -291,7 +291,7 @@ inline void FindPathExecutor::meetOddPath(VertexID src, VertexID dst, Neighbor &
for (auto i = rangeF.first; i != rangeF.second; ++i) {
auto rangeT = pathTo_.equal_range(dst);
for (auto j = rangeT.first; j != rangeT.second; ++j) {
if (j->second.size() + i->second.size() > step_.steps_) {
if (j->second.size() + i->second.size() > step_.recordTo_) {
continue;
}
// Build path:
Expand Down Expand Up @@ -336,7 +336,7 @@ inline void FindPathExecutor::meetEvenPath(VertexID intersectId) {
auto rangeT = pathTo_.equal_range(intersectId);
for (auto i = rangeF.first; i != rangeF.second; ++i) {
for (auto j = rangeT.first; j != rangeT.second; ++j) {
if (j->second.size() + i->second.size() > step_.steps_) {
if (j->second.size() + i->second.size() > step_.recordTo_) {
continue;
}
// Build path:
Expand Down
491 changes: 246 additions & 245 deletions src/graph/GoExecutor.cpp

Large diffs are not rendered by default.

33 changes: 19 additions & 14 deletions src/graph/GoExecutor.h
Expand Up @@ -70,12 +70,11 @@ class GoExecutor final : public TraverseExecutor {
return curStep_ == steps_;
}

/**
* To check if `UPTO' is specified.
* If so, we are supposed to apply the filter in each step.
*/
bool isUpto() const {
return upto_;
// is record the response data
// E.G 0-> 1 -> 2, two step
bool isRecord() const {
// Record all for get next start even if user not require this response data
return true;
}

/**
Expand Down Expand Up @@ -108,9 +107,9 @@ class GoExecutor final : public TraverseExecutor {
StatusOr<std::vector<storage::cpp2::PropDef>> getStepOutProps();
StatusOr<std::vector<storage::cpp2::PropDef>> getDstProps();

void fetchVertexProps(std::vector<VertexID> ids, RpcResponse &&rpcResp);
void fetchVertexProps(std::vector<VertexID> ids);

void maybeFinishExecution(RpcResponse &&rpcResp);
void maybeFinishExecution();

/**
* To retrieve or generate the column names for the execution result.
Expand All @@ -120,7 +119,8 @@ class GoExecutor final : public TraverseExecutor {
/**
* To retrieve the dst ids from a stepping out response.
*/
StatusOr<std::vector<VertexID>> getDstIdsFromResp(RpcResponse &rpcResp) const;
std::vector<VertexID> getDstIdsFromResps(std::vector<RpcResponse>::iterator begin,
std::vector<RpcResponse>::iterator end) const;

/**
* get the edgeName when over all edges
Expand All @@ -129,13 +129,13 @@ class GoExecutor final : public TraverseExecutor {
/**
* All required data have arrived, finish the execution.
*/
void finishExecution(RpcResponse &&rpcResp);
void finishExecution();

/**
* To setup an intermediate representation of the execution result,
* which is about to be piped to the next executor.
*/
bool setupInterimResult(RpcResponse &&rpcResp, std::unique_ptr<InterimResult> &result);
bool setupInterimResult(std::unique_ptr<InterimResult> &result) const;

/**
* To setup the header of the execution result, i.e. the column names.
Expand All @@ -154,9 +154,9 @@ class GoExecutor final : public TraverseExecutor {
using Callback = std::function<Status(std::vector<VariantType>,
const std::vector<nebula::cpp2::SupportedType>&)>;

bool processFinalResult(RpcResponse &rpcResp, Callback cb) const;
bool processFinalResult(Callback cb) const;

StatusOr<std::vector<cpp2::RowValue>> toThriftResponse(RpcResponse&& resp);
StatusOr<std::vector<cpp2::RowValue>> toThriftResponse() const;

/**
* A container to hold the mapping from vertex id to its properties, used for lookups
Expand Down Expand Up @@ -204,12 +204,15 @@ class GoExecutor final : public TraverseExecutor {
kPipe,
};

// Join the RPC response to previous data
void joinResp(RpcResponse &&resp);

private:
GoSentence *sentence_{nullptr};
FromType fromType_{kInstantExpr};
uint32_t recordFrom_{1};
uint32_t steps_{1};
uint32_t curStep_{1};
bool upto_{false};
OverClause::Direction direction_{OverClause::Direction::kForward};
std::vector<EdgeType> edgeTypes_;
std::string *varname_{nullptr};
Expand All @@ -226,6 +229,8 @@ class GoExecutor final : public TraverseExecutor {
std::unique_ptr<VertexHolder> vertexHolder_;
std::unique_ptr<VertexBackTracker> backTracker_;
std::unique_ptr<cpp2::ExecutionResponse> resp_;
// Record the data of response in GO step
std::vector<RpcResponse> records_;
// The name of Tag or Edge, index of prop in data
using SchemaPropIndex = std::unordered_map<std::pair<std::string, std::string>, int64_t>;
};
Expand Down
145 changes: 144 additions & 1 deletion src/graph/test/GoTest.cpp
Expand Up @@ -312,7 +312,8 @@ TEST_P(GoTest, Distinct) {
auto &player = players_["Tony Parker"];
auto *fmt = "GO 2 STEPS FROM %ld OVER like YIELD DISTINCT like._dst";
auto query = folly::stringPrintf(fmt, player.vid());
client_->execute(query, resp);
auto code = client_->execute(query, resp);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
std::vector<std::tuple<VertexID>> expected = {
{3394245602834314645},
{-7579316172763586624},
Expand Down Expand Up @@ -2350,6 +2351,148 @@ TEST_P(GoTest, Contains) {
}
}

TEST_P(GoTest, WithIntermediateData) {
{
cpp2::ExecutionResponse resp;
auto &player = players_["Tony Parker"];
auto *fmt = "GO 1 TO 2 STEPS FROM %ld OVER like YIELD DISTINCT like._dst";
Shylock-Hg marked this conversation as resolved.
Show resolved Hide resolved
auto query = folly::stringPrintf(fmt, player.vid());
auto code = client_->execute(query, resp);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);

std::vector<std::tuple<VertexID>> expected = {
{players_["Tony Parker"].vid()},
{players_["Manu Ginobili"].vid()},
{players_["LaMarcus Aldridge"].vid()},
{players_["Tim Duncan"].vid()},
};
ASSERT_TRUE(verifyResult(resp, expected));
}
// empty starts before last step
{
cpp2::ExecutionResponse resp;
auto *fmt = "GO 1 TO 3 STEPS FROM %ld OVER serve";
auto query = folly::stringPrintf(fmt, players_["Tim Duncan"].vid());
auto code = client_->execute(query, resp);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);

std::vector<std::tuple<int64_t>> expected = {
};
ASSERT_TRUE(verifyResult(resp, expected));
}

// reversely
{
cpp2::ExecutionResponse resp;
auto &player = players_["Tony Parker"];
auto *fmt = "GO 1 TO 2 STEPS FROM %ld OVER like REVERSELY YIELD DISTINCT like._dst";
auto query = folly::stringPrintf(fmt, player.vid());
auto code = client_->execute(query, resp);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
std::vector<std::tuple<VertexID>> expected = {
{players_["Tim Duncan"].vid()},
{players_["LaMarcus Aldridge"].vid()},
{players_["Marco Belinelli"].vid()},
{players_["Boris Diaw"].vid()},
{players_["Dejounte Murray"].vid()},
{players_["Tony Parker"].vid()},
{players_["Manu Ginobili"].vid()},
{players_["Danny Green"].vid()},
{players_["Aron Baynes"].vid()},
{players_["Tiago Splitter"].vid()},
{players_["Shaquile O'Neal"].vid()},
{players_["Rudy Gay"].vid()},
{players_["Damian Lillard"].vid()},
};
ASSERT_TRUE(verifyResult(resp, expected));
}
// empty starts before last step
{
cpp2::ExecutionResponse resp;
auto *fmt = "GO 1 TO 3 STEPS FROM %ld OVER serve REVERSELY";
auto query = folly::stringPrintf(fmt, teams_["Spurs"].vid());
auto code = client_->execute(query, resp);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);

std::vector<std::tuple<int64_t>> expected = {
};
ASSERT_TRUE(verifyResult(resp, expected));
}

// Bidirectionally
{
cpp2::ExecutionResponse resp;
auto &player = players_["Tony Parker"];
auto *fmt = "GO 1 TO 2 STEPS FROM %ld OVER like BIDIRECT YIELD DISTINCT like._dst";
auto query = folly::stringPrintf(fmt, player.vid());
auto code = client_->execute(query, resp);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);

std::vector<std::tuple<VertexID>> expected = {
{players_["Tim Duncan"].vid()},
{players_["LaMarcus Aldridge"].vid()},
{players_["Marco Belinelli"].vid()},
{players_["Boris Diaw"].vid()},
{players_["Dejounte Murray"].vid()},
{players_["Tony Parker"].vid()},
{players_["Manu Ginobili"].vid()},
{players_["Danny Green"].vid()},
{players_["Aron Baynes"].vid()},
{players_["Tiago Splitter"].vid()},
{players_["Shaquile O'Neal"].vid()},
{players_["Rudy Gay"].vid()},
{players_["Damian Lillard"].vid()},
{players_["LeBron James"].vid()},
{players_["Russell Westbrook"].vid()},
{players_["Chris Paul"].vid()},
{players_["Kyle Anderson"].vid()},
{players_["Kevin Durant"].vid()},
{players_["James Harden"].vid()},
};
ASSERT_TRUE(verifyResult(resp, expected));
}

// over *
{
cpp2::ExecutionResponse resp;
auto *fmt = "GO 1 TO 2 STEPS FROM %ld OVER * YIELD serve._dst, like._dst";
const auto &player = players_["Russell Westbrook"];
auto query = folly::stringPrintf(fmt, player.vid());
auto code = client_->execute(query, resp);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
std::vector<std::tuple<int64_t, int64_t>> expected = {
{teams_["Thunders"].vid(), 0},
{0, players_["Paul George"].vid()},
{0, players_["James Harden"].vid()},
{teams_["Pacers"].vid(), 0},
{teams_["Thunders"].vid(), 0},
{0, players_["Russell Westbrook"].vid()},
{teams_["Thunders"].vid(), 0},
{teams_["Rockets"].vid(), 0},
{0, players_["Russell Westbrook"].vid()},
};
ASSERT_TRUE(verifyResult(resp, expected));
}
{
cpp2::ExecutionResponse resp;
auto *fmt = "GO 1 TO 2 STEPS FROM %ld OVER * REVERSELY YIELD serve._dst, like._dst";
const auto &player = players_["Russell Westbrook"];
auto query = folly::stringPrintf(fmt, player.vid());
auto code = client_->execute(query, resp);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
std::vector<std::tuple<int64_t, int64_t>> expected = {
{0, players_["Dejounte Murray"].vid()},
{0, players_["James Harden"].vid()},
{0, players_["Paul George"].vid()},
{0, players_["Dejounte Murray"].vid()},
{0, players_["Russell Westbrook"].vid()},
{0, players_["Luka Doncic"].vid()},
{0, players_["Russell Westbrook"].vid()},
};
ASSERT_TRUE(verifyResult(resp, expected));
}
}

INSTANTIATE_TEST_CASE_P(IfPushdownFilter, GoTest, ::testing::Bool());
} // namespace graph
} // namespace nebula
4 changes: 4 additions & 0 deletions src/graph/test/TestBase.h
Expand Up @@ -200,6 +200,10 @@ class TestBase : public ::testing::Test {
return TestOK();
}

if (resp.get_rows() == nullptr) {
return TestError() << "No response data";
}

std::vector<Tuple> rows;
try {
rows = rowsToTuples<Tuple>(respToRecords(resp, std::move(ignoreColIndex)));
Expand Down
7 changes: 3 additions & 4 deletions src/parser/Clauses.cpp
Expand Up @@ -13,10 +13,9 @@ namespace nebula {
std::string StepClause::toString() const {
std::string buf;
buf.reserve(256);
if (isUpto()) {
buf += "UPTO ";
}
buf += std::to_string(steps_);
buf += std::to_string(step_.recordFrom_);
buf += " TO";
buf += std::to_string(step_.recordTo_);
buf += " STEPS";
return buf;
}
Expand Down
35 changes: 23 additions & 12 deletions src/parser/Clauses.h
Expand Up @@ -27,8 +27,8 @@ class Clause {
};

struct Step {
uint32_t steps_{0};
bool upto_{false};
uint32_t recordFrom_;
uint32_t recordTo_;
};

struct Where {
Expand All @@ -55,31 +55,42 @@ class Clause {

class StepClause final : public Clause {
public:
explicit StepClause(uint64_t steps = 1, bool isUpto = false) {
steps_ = steps;
isUpto_ = isUpto;
explicit StepClause(uint32_t recordTo = 1) // Keep same with before
: step_({recordTo, recordTo}) {
// check range validation in parser
kind_ = Kind::kStepClause;
}

// GO recoredFrom TO recordTo STEPS
// [recordFrom, recordTo]
explicit StepClause(uint32_t recordFrom, uint32_t recordTo)
: step_({recordFrom, recordTo}) {
// check range validation in parser
kind_ = Kind::kStepClause;
}

Status prepare(Step &step) const {
step.steps_ = steps_;
step.upto_ = isUpto_;
step.recordFrom_ = step_.recordFrom_;
step.recordTo_ = step_.recordTo_;
return Status::OK();
}

uint32_t steps() const {
return steps_;
return step_.recordTo_;
}

uint32_t recordFrom() const {
return step_.recordFrom_;
}

bool isUpto() const {
return isUpto_;
uint32_t recordTo() const {
return step_.recordTo_;
}

std::string toString() const;

private:
uint32_t steps_{1};
bool isUpto_{false};
Step step_;
};


Expand Down
14 changes: 9 additions & 5 deletions src/parser/parser.yy
Expand Up @@ -610,9 +610,13 @@ step_clause
ifOutOfRange($1, @1);
$$ = new StepClause($1);
}
| KW_UPTO INTEGER KW_STEPS {
ifOutOfRange($2, @2);
$$ = new StepClause($2, true);
| INTEGER KW_TO INTEGER KW_STEPS {
monadbobo marked this conversation as resolved.
Show resolved Hide resolved
ifOutOfRange($1, @2);
ifOutOfRange($3, @2);
if ($1 > $3) {
throw nebula::GraphParser::syntax_error(@1, "Invalid step range");
}
$$ = new StepClause($1, $3);
}
;

Expand Down Expand Up @@ -942,10 +946,10 @@ find_path_sentence
;

find_path_upto_clause
: %empty { $$ = new StepClause(5, true); }
: %empty { $$ = new StepClause(5); }
| KW_UPTO INTEGER KW_STEPS {
ifOutOfRange($2, @2);
$$ = new StepClause($2, true);
$$ = new StepClause($2);
}
;

Expand Down