Skip to content
This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

Refactor join. #1263

Closed
wants to merge 13 commits into from
Closed
8 changes: 4 additions & 4 deletions src/executor/query/JoinExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@ namespace graph {

Status JoinExecutor::checkInputDataSets() {
auto* join = asNode<Join>(node());
lhsIter_ = ectx_->getVersionedResult(join->leftVar().first, join->leftVar().second).iter();
lhsIter_ = ectx_->getVersionedResult(join->leftInputVar(), join->leftVarVersion()).iter();
DCHECK(!!lhsIter_);
VLOG(1) << "lhs: " << join->leftVar().first << " " << lhsIter_->size();
VLOG(1) << "lhs: " << join->leftInputVar() << " " << lhsIter_->size();
if (lhsIter_->isGetNeighborsIter() || lhsIter_->isDefaultIter()) {
std::stringstream ss;
ss << "Join executor does not support " << lhsIter_->kind();
return Status::Error(ss.str());
}
rhsIter_ =
ectx_->getVersionedResult(join->rightVar().first, join->rightVar().second).iter();
ectx_->getVersionedResult(join->rightInputVar(), join->rightVarVersion()).iter();
DCHECK(!!rhsIter_);
VLOG(1) << "rhs: " << join->rightVar().first << " " << rhsIter_->size();
VLOG(1) << "rhs: " << join->rightInputVar() << " " << rhsIter_->size();
if (rhsIter_->isGetNeighborsIter() || rhsIter_->isDefaultIter()) {
std::stringstream ss;
ss << "Join executor does not support " << rhsIter_->kind();
Expand Down
2 changes: 1 addition & 1 deletion src/executor/query/LeftJoinExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ Status LeftJoinExecutor::close() {

folly::Future<Status> LeftJoinExecutor::join() {
auto* join = asNode<Join>(node());
auto& rhsResult = ectx_->getVersionedResult(join->rightVar().first, join->rightVar().second);
auto& rhsResult = ectx_->getVersionedResult(join->rightInputVar(), join->rightVarVersion());
rightColSize_ = rhsResult.valuePtr()->getDataSet().colNames.size();

auto& hashKeys = join->hashKeys();
Expand Down
87 changes: 73 additions & 14 deletions src/executor/test/JoinTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "context/QueryContext.h"
#include "planner/plan/Query.h"
#include "planner/plan/Logic.h"
#include "executor/query/InnerJoinExecutor.h"
#include "executor/query/LeftJoinExecutor.h"
#include "executor/test/QueryTestBase.h"
Expand Down Expand Up @@ -94,9 +95,14 @@ void JoinTest::testInnerJoin(std::string left, std::string right,
auto probe = VariablePropertyExpression::make(pool_, right, "_vid");
std::vector<Expression*> probeKeys = {probe};

auto* join =
InnerJoin::make(qctx_.get(), nullptr, {left, 0}, {right, 0}, std::move(hashKeys),
std::move(probeKeys));
auto* leftNode = StartNode::make(qctx_.get());
leftNode->setOutputVar(left);

auto* rightNode = StartNode::make(qctx_.get());
rightNode->setOutputVar(right);

auto* join = InnerJoin::make(
qctx_.get(), {leftNode, 0}, {rightNode, 0}, std::move(hashKeys), std::move(probeKeys));
join->setColNames(std::vector<std::string>{
"src", "dst", kVid, "tag_prop", "edge_prop", kDst});

Expand Down Expand Up @@ -132,9 +138,14 @@ void JoinTest::testLeftJoin(std::string left, std::string right,
auto probe = VariablePropertyExpression::make(pool_, right, "dst");
std::vector<Expression*> probeKeys = {probe};

auto* join =
LeftJoin::make(qctx_.get(), nullptr, {left, 0}, {right, 0}, std::move(hashKeys),
std::move(probeKeys));
auto* leftNode = StartNode::make(qctx_.get());
leftNode->setOutputVar(left);

auto* rightNode = StartNode::make(qctx_.get());
rightNode->setOutputVar(right);

auto* join = LeftJoin::make(
qctx_.get(), {leftNode, 0}, {rightNode, 0}, std::move(hashKeys), std::move(probeKeys));
join->setColNames(std::vector<std::string>{
kVid, "tag_prop", "edge_prop", kDst, "src", "dst"});

Expand Down Expand Up @@ -201,8 +212,14 @@ TEST_F(JoinTest, InnerJoinTwice) {
auto probe = VariablePropertyExpression::make(pool_, right, "_vid");
std::vector<Expression*> probeKeys = {probe};

auto* leftNode = StartNode::make(qctx_.get());
leftNode->setOutputVar(left);

auto* rightNode = StartNode::make(qctx_.get());
rightNode->setOutputVar(right);

auto* join =
InnerJoin::make(qctx_.get(), nullptr, {left, 0}, {right, 0}, std::move(hashKeys),
InnerJoin::make(qctx_.get(), {leftNode, 0}, {rightNode, 0}, std::move(hashKeys),
std::move(probeKeys));
join->setColNames(
std::vector<std::string>{"src", "dst", kVid, "tag_prop", "edge_prop", kDst});
Expand All @@ -221,8 +238,14 @@ TEST_F(JoinTest, InnerJoinTwice) {
auto probe = VariablePropertyExpression::make(pool_, right, "col1");
std::vector<Expression*> probeKeys = {probe};

auto* leftNode = StartNode::make(qctx_.get());
leftNode->setOutputVar(left);

auto* rightNode = StartNode::make(qctx_.get());
rightNode->setOutputVar(right);

auto* join =
InnerJoin::make(qctx_.get(), nullptr, {left, 0}, {right, 0}, std::move(hashKeys),
InnerJoin::make(qctx_.get(), {leftNode, 0}, {rightNode, 0}, std::move(hashKeys),
std::move(probeKeys));
join->setColNames(std::vector<std::string>{
"src", "dst", kVid, "tag_prop", "edge_prop", kDst, "col1"});
Expand Down Expand Up @@ -323,8 +346,14 @@ TEST_F(JoinTest, LeftJoinTwice) {
auto probe = VariablePropertyExpression::make(pool_, right, "dst");
std::vector<Expression*> probeKeys = {probe};

auto* leftNode = StartNode::make(qctx_.get());
leftNode->setOutputVar(left);

auto* rightNode = StartNode::make(qctx_.get());
rightNode->setOutputVar(right);

auto* join = LeftJoin::make(
qctx_.get(), nullptr, {left, 0}, {right, 0}, std::move(hashKeys), std::move(probeKeys));
qctx_.get(), {leftNode, 0}, {rightNode, 0}, std::move(hashKeys), std::move(probeKeys));
join->setColNames(
std::vector<std::string>{kVid, "tag_prop", "edge_prop", kDst, "src", "dst"});

Expand All @@ -341,8 +370,14 @@ TEST_F(JoinTest, LeftJoinTwice) {
auto probe = VariablePropertyExpression::make(pool_, right, "col1");
std::vector<Expression*> probeKeys = {probe};

auto* leftNode = StartNode::make(qctx_.get());
leftNode->setOutputVar(left);

auto* rightNode = StartNode::make(qctx_.get());
rightNode->setOutputVar(right);

auto* join = LeftJoin::make(
qctx_.get(), nullptr, {left, 0}, {right, 0}, std::move(hashKeys), std::move(probeKeys));
qctx_.get(), {leftNode, 0}, {rightNode, 0}, std::move(hashKeys), std::move(probeKeys));
join->setColNames(
std::vector<std::string>{kVid, "tag_prop", "edge_prop", kDst, "src", "dst", "col1"});

Expand Down Expand Up @@ -426,8 +461,14 @@ TEST_F(JoinTest, LeftJoinAndInnerjoin) {
auto probe = VariablePropertyExpression::make(pool_, right, "dst");
std::vector<Expression*> probeKeys = {probe};

auto* leftNode = StartNode::make(qctx_.get());
leftNode->setOutputVar(left);

auto* rightNode = StartNode::make(qctx_.get());
rightNode->setOutputVar(right);

auto* join = LeftJoin::make(
qctx_.get(), nullptr, {left, 0}, {right, 0}, std::move(hashKeys), std::move(probeKeys));
qctx_.get(), {leftNode, 0}, {rightNode, 0}, std::move(hashKeys), std::move(probeKeys));
join->setColNames(
std::vector<std::string>{kVid, "tag_prop", "edge_prop", kDst, "src", "dst"});

Expand All @@ -444,8 +485,14 @@ TEST_F(JoinTest, LeftJoinAndInnerjoin) {
auto probe = VariablePropertyExpression::make(pool_, right, "col1");
std::vector<Expression*> probeKeys = {probe};

auto* leftNode = StartNode::make(qctx_.get());
leftNode->setOutputVar(left);

auto* rightNode = StartNode::make(qctx_.get());
rightNode->setOutputVar(right);

auto* join = InnerJoin::make(
qctx_.get(), nullptr, {left, 0}, {right, 0}, std::move(hashKeys), std::move(probeKeys));
qctx_.get(), {leftNode, 0}, {rightNode, 0}, std::move(hashKeys), std::move(probeKeys));
join->setColNames(
std::vector<std::string>{kVid, "tag_prop", "edge_prop", kDst, "src", "dst", "col1"});

Expand Down Expand Up @@ -495,8 +542,14 @@ TEST_F(JoinTest, InnerJoinAndLeftjoin) {
auto probe = VariablePropertyExpression::make(pool_, right, "dst");
std::vector<Expression*> probeKeys = {probe};

auto* leftNode = StartNode::make(qctx_.get());
leftNode->setOutputVar(left);

auto* rightNode = StartNode::make(qctx_.get());
rightNode->setOutputVar(right);

auto* join = InnerJoin::make(
qctx_.get(), nullptr, {left, 0}, {right, 0}, std::move(hashKeys), std::move(probeKeys));
qctx_.get(), {leftNode, 0}, {rightNode, 0}, std::move(hashKeys), std::move(probeKeys));
join->setColNames(
std::vector<std::string>{kVid, "tag_prop", "edge_prop", kDst, "src", "dst"});

Expand All @@ -513,8 +566,14 @@ TEST_F(JoinTest, InnerJoinAndLeftjoin) {
auto probe = VariablePropertyExpression::make(pool_, right, "col1");
std::vector<Expression*> probeKeys = {probe};

auto* leftNode = StartNode::make(qctx_.get());
leftNode->setOutputVar(left);

auto* rightNode = StartNode::make(qctx_.get());
rightNode->setOutputVar(right);

auto* join = LeftJoin::make(
qctx_.get(), nullptr, {left, 0}, {right, 0}, std::move(hashKeys), std::move(probeKeys));
qctx_.get(), {leftNode, 0}, {rightNode, 0}, std::move(hashKeys), std::move(probeKeys));
join->setColNames(
std::vector<std::string>{kVid, "tag_prop", "edge_prop", kDst, "src", "dst", "col1"});

Expand Down
3 changes: 2 additions & 1 deletion src/optimizer/OptGroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ const PlanNode *OptGroupNode::getPlan() const {
auto loop = static_cast<Loop *>(node_);
loop->setBody(const_cast<PlanNode *>(bodies_[0]->getPlan()));
}
DCHECK_EQ(node_->numDeps(), dependencies_.size());
DCHECK_EQ(node_->numDeps(), dependencies_.size())
<< " node kind: " << node_->kind() << " node output: " << node_->outputVar();
for (size_t i = 0; i < node_->numDeps(); ++i) {
node_->setDep(i, dependencies_[i]->getPlan());
}
Expand Down
48 changes: 48 additions & 0 deletions src/optimizer/OptRule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,28 @@ const PlanNode *MatchedResult::planNode(const std::vector<int32_t> &pos) const {
return DCHECK_NOTNULL(result->node)->node();
}

std::string MatchedResult::toString() const {
std::stringstream ss;
std::queue<const MatchedResult*> queue;
std::unordered_set<const MatchedResult*> visited;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that you can delete the visited variable.

queue.emplace(this);

while (!queue.empty()) {
auto* n = queue.front();
queue.pop();
visited.emplace(n);
ss << n->node->node()->outputVar() << ",";

for (auto& dep : n->dependencies) {
queue.emplace(&dep);
}
}

auto ret = ss.str();
ret.pop_back();
return ret;
}

Pattern Pattern::create(graph::PlanNode::Kind kind, std::initializer_list<Pattern> patterns) {
Pattern pattern;
pattern.kind_ = kind;
Expand Down Expand Up @@ -77,14 +99,40 @@ StatusOr<MatchedResult> Pattern::match(const OptGroup *group) const {
return Status::Error();
}

std::string Pattern::toString() const {
std::stringstream ss;
std::queue<const Pattern*> queue;
std::unordered_set<const Pattern*> visited;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

queue.emplace(this);
while (!queue.empty()) {
auto& p = queue.front();
queue.pop();
visited.emplace(p);

ss << p->kind_ << ",";
for (auto& pattern : p->dependencies_) {
queue.emplace(&pattern);
}
}

auto ret = ss.str();
ret.pop_back();
return ss.str();
}

StatusOr<MatchedResult> OptRule::match(OptContext *ctx, const OptGroupNode *groupNode) const {
VLOG(1) << "Apply rule: " + this->toString()
<< " to node: " << groupNode->node()->outputVar();
const auto &pattern = this->pattern();
auto status = pattern.match(groupNode);
NG_RETURN_IF_ERROR(status);
auto matched = std::move(status).value();
if (!this->match(ctx, matched)) {
return Status::Error();
}
VLOG(1) << "Hit rule: " + this->toString()
<< " pattern: " << pattern.toString()
<< " subtree of plan: " << matched.toString();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you cleanup these debug logs?

return matched;
}

Expand Down
4 changes: 4 additions & 0 deletions src/optimizer/OptRule.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ struct MatchedResult {
// {0, 1, 0} | this->dependencies[1].dependencies[0]
// {0, 1, 0, 1} | this->dependencies[1].dependencies[0].dependencies[1]
const graph::PlanNode *planNode(const std::vector<int32_t> &pos = {}) const;

std::string toString() const;
};

class Pattern final {
Expand All @@ -49,6 +51,8 @@ class Pattern final {

StatusOr<MatchedResult> match(const OptGroupNode *groupNode) const;

std::string toString() const;

private:
Pattern() = default;
StatusOr<MatchedResult> match(const OptGroup *group) const;
Expand Down
19 changes: 10 additions & 9 deletions src/optimizer/rule/PushFilterDownLeftJoinRule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const Pattern& PushFilterDownLeftJoinRule::pattern() const {
}

StatusOr<OptRule::TransformResult> PushFilterDownLeftJoinRule::transform(
// TODO: Need a refactor after the join refactored
OptContext* octx,
const MatchedResult& matched) const {
auto* filterGroupNode = matched.node;
Expand All @@ -45,9 +46,10 @@ StatusOr<OptRule::TransformResult> PushFilterDownLeftJoinRule::transform(
auto* oldLeftJoinNode = static_cast<graph::LeftJoin*>(leftJoinNode);
const auto* condition = static_cast<graph::Filter*>(oldFilterNode)->condition();
DCHECK(condition);
const std::pair<std::string, int64_t>& leftVar = oldLeftJoinNode->leftVar();
const std::string& leftVar = oldLeftJoinNode->leftInputVar();
auto symTable = octx->qctx()->symTable();
std::vector<std::string> leftVarColNames = symTable->getVar(leftVar.first)->colNames;
std::vector<std::string> leftVarColNames = symTable->getVar(leftVar)->colNames;
auto rightDepGroupNode = leftJoinGroupNode->dependencies()[1];

// split the `condition` based on whether the varPropExpr comes from the left child
auto picker = [&leftVarColNames](const Expression* e) -> bool {
Expand Down Expand Up @@ -82,14 +84,13 @@ StatusOr<OptRule::TransformResult> PushFilterDownLeftJoinRule::transform(
auto* newLeftFilterNode = graph::Filter::make(
octx->qctx(),
const_cast<graph::PlanNode*>(oldLeftJoinNode->dep()),
graph::ExpressionUtils::rewriteInnerVar(filterPicked, leftVar.first));
newLeftFilterNode->setInputVar(leftVar.first);
graph::ExpressionUtils::rewriteInnerVar(filterPicked, leftVar));
newLeftFilterNode->setInputVar(leftVar);
newLeftFilterNode->setColNames(leftVarColNames);
auto newFilterGroup = OptGroup::create(octx);
auto newFilterGroupNode = newFilterGroup->makeGroupNode(newLeftFilterNode);
for (auto dep : leftJoinGroupNode->dependencies()) {
newFilterGroupNode->dependsOn(dep);
}
DCHECK_EQ(leftJoinGroupNode->dependencies().size(), 2);
newFilterGroupNode->dependsOn(leftJoinGroupNode->dependencies()[0]);
auto newLeftFilterOutputVar = newLeftFilterNode->outputVar();

// produce new LeftJoin node
Expand All @@ -115,14 +116,14 @@ StatusOr<OptRule::TransformResult> PushFilterDownLeftJoinRule::transform(
auto newLeftJoinGroup = OptGroup::create(octx);
auto newLeftJoinGroupNode = newLeftJoinGroup->makeGroupNode(newLeftJoinNode);
newAboveFilterGroupNode->setDeps({newLeftJoinGroup});
newLeftJoinGroupNode->setDeps({newFilterGroup});
newLeftJoinGroupNode->setDeps({newFilterGroup, rightDepGroupNode});
result.newGroupNodes.emplace_back(newAboveFilterGroupNode);
} else {
newLeftJoinNode->setOutputVar(oldFilterNode->outputVar());
newLeftJoinNode->setColNames(oldLeftJoinNode->colNames());
auto newLeftJoinGroupNode =
OptGroupNode::create(octx, newLeftJoinNode, filterGroupNode->group());
newLeftJoinGroupNode->setDeps({newFilterGroup});
newLeftJoinGroupNode->setDeps({newFilterGroup, rightDepGroupNode});
result.newGroupNodes.emplace_back(newLeftJoinGroupNode);
}
return result;
Expand Down
5 changes: 2 additions & 3 deletions src/planner/match/InnerJoinStrategy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@ PlanNode* InnerJoinStrategy::joinDataSet(const PlanNode* left, const PlanNode* r
}

auto join = InnerJoin::make(qctx_,
const_cast<PlanNode*>(right),
{left->outputVar(), 0},
{right->outputVar(), 0},
{const_cast<PlanNode*>(left), ExecutionContext::kLatestVersion},
{const_cast<PlanNode*>(right), ExecutionContext::kLatestVersion},
{buildExpr},
{probeExpr});
std::vector<std::string> colNames = left->colNames();
Expand Down
Loading