Skip to content

Commit

Permalink
Implement the multi query parts of openCypher (#3519)
Browse files Browse the repository at this point in the history
* Validate multi patterns and alternative of match with unwind.

* Impl the planner part.

* Add BiCartesianProduct, BiInnerJoin, BiLeftJoin.

* Todo argument support.

* Fix the join logic and fix col names.

* Impl BiCartesianProduct/BiLeftJoin/BiInnerJoin.

* Fix path alias output and fix the plan connect.

* Add argument node and ArgumentFinder.

* Support argument.

* Support exec ArgumentExecutor and do dedup in ArgumentExecutor.

* Do InnerJoin or CartesianProduct on multi query parts.

* Fix alias type when validating columns.

* Do not track previous path if traverse/appendv is the first one.

* Fix return all aliases.

* Fix query part without matches.

* Fix connect query part and unwind.

* Fix with test.

* Fix traverse and appendvertices when track path.

* Fix start from mid.

* Fix test.

* Use null when left join.

* Add test for multi query parts.

* Fix aliases not being passed to the next query parts.

* Fix union.

* Connect two path plan.

* Refactor SegmentsConnector.

* Add some new tests and fix executor test.

* Fix test.

* Fix format.

* Fix build.

* Fix moveRow in Traverse and AppendVertices.

* Fix connect query part.

* Fix error msg.

* Forbid redefined alias in a single path pattern.
  • Loading branch information
CPWstatic committed Dec 29, 2021
1 parent 4962c25 commit fedffc8
Show file tree
Hide file tree
Showing 65 changed files with 1,544 additions and 807 deletions.
14 changes: 14 additions & 0 deletions src/graph/context/Iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,20 @@ void SequentialIter::doReset(size_t pos) {
iter_ = rows_->begin() + pos;
}

// Looks up the value stored under the column name `col` in the row the iterator currently
// points at. Yields Value::kNullValue when the iterator is not valid or when `col` is not one
// of the known column names.
const Value& SequentialIter::getColumn(const std::string& col) const {
  if (valid()) {
    const auto found = colIndices_.find(col);
    if (found != colIndices_.end()) {
      auto& currentRow = *iter_;
      // Debug-only sanity check: the recorded column index must fit inside the row.
      DCHECK_LT(found->second, currentRow.values.size())
          << "index: " << found->second << " row" << currentRow;
      return currentRow.values[found->second];
    }
  }
  return Value::kNullValue;
}

// Positional overload: forwards `index` together with the current iterator (iter_) to
// getColumnByIndex() and returns whatever that helper yields.
const Value& SequentialIter::getColumn(int32_t index) const {
return getColumnByIndex(index, iter_);
}
Expand Down
14 changes: 1 addition & 13 deletions src/graph/context/Iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -465,19 +465,7 @@ class SequentialIter : public Iterator {
return rows_->size();
}

const Value& getColumn(const std::string& col) const override {
if (!valid()) {
return Value::kNullValue;
}
auto& row = *iter_;
auto index = colIndices_.find(col);
if (index == colIndices_.end()) {
return Value::kNullValue;
}

DCHECK_LT(index->second, row.values.size());
return row.values[index->second];
}
const Value& getColumn(const std::string& col) const override;

const Value& getColumn(int32_t index) const override;

Expand Down
98 changes: 98 additions & 0 deletions src/graph/context/Symbols.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,103 @@ std::string SymbolTable::toString() const {
return ss.str();
}

// Creates a symbol table that allocates its variables from `objPool`.
// `objPool` must not be null (checked in debug builds only).
SymbolTable::SymbolTable(ObjectPool* objPool) : objPool_(objPool) {
  DCHECK(objPool_ != nullptr);
}

// Allocates a Variable called `name` from the object pool, registers it through addVar() and
// returns it. The returned pointer is the pool-allocated object created here.
Variable* SymbolTable::newVariable(std::string name) {
  VLOG(1) << "New variable for: " << name;
  Variable* created = objPool_->makeAndAdd<Variable>(name);
  // `name` is no longer needed after registration, so hand it over instead of copying.
  addVar(std::move(name), created);
  return created;
}

// Registers `variable` under `varName` in vars_. The insertion uses emplace(), so an entry that
// already exists for `varName` is left unchanged.
void SymbolTable::addVar(std::string varName, Variable* variable) {
vars_.emplace(std::move(varName), variable);
}

bool SymbolTable::readBy(const std::string& varName, PlanNode* node) {
auto var = vars_.find(varName);
if (var == vars_.end()) {
return false;
}
var->second->readBy.emplace(node);
return true;
}

bool SymbolTable::writtenBy(const std::string& varName, PlanNode* node) {
auto var = vars_.find(varName);
if (var == vars_.end()) {
return false;
}
var->second->writtenBy.emplace(node);
return true;
}

bool SymbolTable::deleteReadBy(const std::string& varName, PlanNode* node) {
auto var = vars_.find(varName);
if (var == vars_.end()) {
return false;
}
var->second->readBy.erase(node);
return true;
}

bool SymbolTable::deleteWrittenBy(const std::string& varName, PlanNode* node) {
auto var = vars_.find(varName);
if (var == vars_.end()) {
return false;
}
for (auto& alias : var->second->colNames) {
auto found = aliasGeneratedBy_.find(alias);
if (found != aliasGeneratedBy_.end()) {
if (found->second == varName) {
aliasGeneratedBy_.erase(alias);
}
}
}
var->second->writtenBy.erase(node);
return true;
}

// Moves the "read by `node`" record from variable `oldVar` to variable `newVar`.
// If `oldVar` is unknown nothing is recorded for `newVar` and false is returned; otherwise the
// result is whatever readBy(newVar, node) reports.
bool SymbolTable::updateReadBy(const std::string& oldVar,
                               const std::string& newVar,
                               PlanNode* node) {
  if (!deleteReadBy(oldVar, node)) {
    return false;
  }
  return readBy(newVar, node);
}

// Moves the "written by `node`" record from variable `oldVar` to variable `newVar`.
// If `oldVar` is unknown nothing is recorded for `newVar` and false is returned; otherwise the
// result is whatever writtenBy(newVar, node) reports.
bool SymbolTable::updateWrittenBy(const std::string& oldVar,
                                  const std::string& newVar,
                                  PlanNode* node) {
  if (!deleteWrittenBy(oldVar, node)) {
    return false;
  }
  return writtenBy(newVar, node);
}

// Returns the variable registered under `varName`, or nullptr when there is none.
Variable* SymbolTable::getVar(const std::string& varName) {
  const auto it = vars_.find(varName);
  return it == vars_.end() ? nullptr : it->second;
}

// Records `varName` as the variable that generates each alias in `aliases`.
// An alias that already has a recorded variable keeps it: the first registration wins.
void SymbolTable::setAliasGeneratedBy(const std::vector<std::string>& aliases,
                                      const std::string& varName) {
  for (auto& alias : aliases) {
    // emplace() does not overwrite an existing key, which gives the first-wins behavior.
    aliasGeneratedBy_.emplace(alias, varName);
  }
}

// Returns the name of the variable recorded as generating `alias`, or an error Status when no
// variable has been recorded for it.
StatusOr<std::string> SymbolTable::getAliasGeneratedBy(const std::string& alias) {
  const auto it = aliasGeneratedBy_.find(alias);
  if (it != aliasGeneratedBy_.end()) {
    return it->second;
  }
  return Status::Error("Not found a variable that generates the alias: %s", alias.c_str());
}
} // namespace graph
} // namespace nebula
75 changes: 16 additions & 59 deletions src/graph/context/Symbols.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,81 +52,38 @@ struct Variable {

class SymbolTable final {
public:
explicit SymbolTable(ObjectPool* objPool) {
DCHECK(objPool != nullptr);
objPool_ = objPool;
}
explicit SymbolTable(ObjectPool* objPool);

Variable* newVariable(std::string name) {
VLOG(1) << "New variable for: " << name;
auto* variable = objPool_->makeAndAdd<Variable>(name);
addVar(std::move(name), variable);
return variable;
}
Variable* newVariable(std::string name);

void addVar(std::string varName, Variable* variable) {
vars_.emplace(std::move(varName), variable);
}
void addVar(std::string varName, Variable* variable);

bool readBy(const std::string& varName, PlanNode* node) {
auto var = vars_.find(varName);
if (var == vars_.end()) {
return false;
}
var->second->readBy.emplace(node);
return true;
}
bool readBy(const std::string& varName, PlanNode* node);

bool writtenBy(const std::string& varName, PlanNode* node) {
auto var = vars_.find(varName);
if (var == vars_.end()) {
return false;
}
var->second->writtenBy.emplace(node);
return true;
}
bool writtenBy(const std::string& varName, PlanNode* node);

bool deleteReadBy(const std::string& varName, PlanNode* node) {
auto var = vars_.find(varName);
if (var == vars_.end()) {
return false;
}
var->second->readBy.erase(node);
return true;
}
bool deleteReadBy(const std::string& varName, PlanNode* node);

bool deleteWrittenBy(const std::string& varName, PlanNode* node) {
auto var = vars_.find(varName);
if (var == vars_.end()) {
return false;
}
var->second->writtenBy.erase(node);
return true;
}
bool deleteWrittenBy(const std::string& varName, PlanNode* node);

bool updateReadBy(const std::string& oldVar, const std::string& newVar, PlanNode* node) {
return deleteReadBy(oldVar, node) && readBy(newVar, node);
}
bool updateReadBy(const std::string& oldVar, const std::string& newVar, PlanNode* node);

bool updateWrittenBy(const std::string& oldVar, const std::string& newVar, PlanNode* node) {
return deleteWrittenBy(oldVar, node) && writtenBy(newVar, node);
}
bool updateWrittenBy(const std::string& oldVar, const std::string& newVar, PlanNode* node);

Variable* getVar(const std::string& varName) {
auto var = vars_.find(varName);
if (var == vars_.end()) {
return nullptr;
} else {
return var->second;
}
}
Variable* getVar(const std::string& varName);

void setAliasGeneratedBy(const std::vector<std::string>& aliases, const std::string& varName);

StatusOr<std::string> getAliasGeneratedBy(const std::string& alias);

std::string toString() const;

private:
ObjectPool* objPool_{nullptr};
// var name -> variable
std::unordered_map<std::string, Variable*> vars_;
// alias -> the first variable that generates the alias
std::unordered_map<std::string, std::string> aliasGeneratedBy_;
};

} // namespace graph
Expand Down
39 changes: 28 additions & 11 deletions src/graph/context/ast/CypherAstContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ struct WhereClauseContext final : CypherClauseContextBase {
WhereClauseContext() : CypherClauseContextBase(CypherClauseKind::kWhere) {}

Expression* filter{nullptr};
std::unordered_map<std::string, AliasType>* aliasesUsed{nullptr};
std::unordered_map<std::string, AliasType> aliasesAvailable;
};

struct OrderByClauseContext final : CypherClauseContextBase {
Expand All @@ -99,7 +99,7 @@ struct YieldClauseContext final : CypherClauseContextBase {

bool distinct{false};
const YieldColumns* yieldColumns{nullptr};
std::unordered_map<std::string, AliasType>* aliasesUsed{nullptr};
std::unordered_map<std::string, AliasType> aliasesAvailable;

bool hasAgg_{false};
bool needGenProject_{false};
Expand Down Expand Up @@ -128,14 +128,19 @@ struct WithClauseContext final : CypherClauseContextBase {
std::unordered_map<std::string, AliasType> aliasesGenerated;
};

struct MatchClauseContext final : CypherClauseContextBase {
MatchClauseContext() : CypherClauseContextBase(CypherClauseKind::kMatch) {}

// Describes a single path: its node infos, its edge infos and an optional path-build expression.
// NOTE(review): presumably one instance per path pattern of a MATCH clause — confirm with the
// validator that fills it in.
struct Path final {
// Node descriptions belonging to this path.
std::vector<NodeInfo> nodeInfos;
// Edge descriptions belonging to this path.
std::vector<EdgeInfo> edgeInfos;
// Path-build expression for this path; nullptr until one is assigned.
PathBuildExpression* pathBuild{nullptr};
};

struct MatchClauseContext final : CypherClauseContextBase {
MatchClauseContext() : CypherClauseContextBase(CypherClauseKind::kMatch) {}

bool isOptional{false};
std::vector<Path> paths;
std::unique_ptr<WhereClauseContext> where;
std::unordered_map<std::string, AliasType>* aliasesUsed{nullptr};
std::unordered_map<std::string, AliasType> aliasesAvailable;
std::unordered_map<std::string, AliasType> aliasesGenerated;
};

Expand All @@ -145,14 +150,25 @@ struct UnwindClauseContext final : CypherClauseContextBase {
Expression* unwindExpr{nullptr};
std::string alias;

// TODO: refactor alias
std::unordered_map<std::string, AliasType>* aliasesUsed{nullptr};
std::unordered_map<std::string, AliasType> aliasesAvailable;
std::unordered_map<std::string, AliasType> aliasesGenerated;
};

// A QueryPart begins with an arbitrary number of MATCH clauses, followed by either
// (1) a WITH and an optional UNWIND,
// (2) a single UNWIND, or
// (3) a RETURN, in the case of the last query part.
struct QueryPart final {
// The MATCH clauses that open this query part (may be empty).
std::vector<std::unique_ptr<MatchClauseContext>> matchs;
// The clause that closes this query part: a WITH, an UNWIND or a RETURN.
std::unique_ptr<CypherClauseContextBase> boundary;
// Alias name -> alias type.
// NOTE(review): presumably the aliases visible to this query part — confirm with the validator.
std::unordered_map<std::string, AliasType> aliasesAvailable;
// Alias name -> alias type.
// NOTE(review): presumably the aliases this query part produces — confirm with the validator.
std::unordered_map<std::string, AliasType> aliasesGenerated;
};

struct MatchAstContext final : AstContext {
// Alternative of Match/Unwind/With and ends with Return.
std::vector<std::unique_ptr<CypherClauseContextBase>> clauses;
// AST context of a Cypher query; a Cypher query is made up of one or more QueryParts.
struct CypherContext final : AstContext {
// The query parts of the query.
// NOTE(review): presumably kept in the order they appear in the query text — confirm.
std::vector<QueryPart> queryParts;
};

struct PatternContext {
Expand All @@ -166,6 +182,7 @@ struct NodeContext final : PatternContext {
: PatternContext(PatternKind::kNode, m), info(i) {}

NodeInfo* info{nullptr};
std::unordered_set<std::string>* nodeAliasesAvailable;

// Output fields
ScanInfo scanInfo;
Expand Down
1 change: 1 addition & 0 deletions src/graph/executor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ nebula_add_library(
logic/PassThroughExecutor.cpp
logic/StartExecutor.cpp
logic/SelectExecutor.cpp
logic/ArgumentExecutor.cpp
query/AggregateExecutor.cpp
query/DedupExecutor.cpp
query/FilterExecutor.cpp
Expand Down
13 changes: 13 additions & 0 deletions src/graph/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
#include "graph/executor/algo/ProduceAllPathsExecutor.h"
#include "graph/executor/algo/ProduceSemiShortestPathExecutor.h"
#include "graph/executor/algo/SubgraphExecutor.h"
#include "graph/executor/logic/ArgumentExecutor.h"
#include "graph/executor/logic/LoopExecutor.h"
#include "graph/executor/logic/PassThroughExecutor.h"
#include "graph/executor/logic/SelectExecutor.h"
Expand Down Expand Up @@ -524,6 +525,18 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) {
case PlanNode::Kind::kAppendVertices: {
return pool->add(new AppendVerticesExecutor(node, qctx));
}
case PlanNode::Kind::kBiLeftJoin: {
return pool->add(new BiLeftJoinExecutor(node, qctx));
}
case PlanNode::Kind::kBiInnerJoin: {
return pool->add(new BiInnerJoinExecutor(node, qctx));
}
case PlanNode::Kind::kBiCartesianProduct: {
return pool->add(new BiCartesianProductExecutor(node, qctx));
}
case PlanNode::Kind::kArgument: {
return pool->add(new ArgumentExecutor(node, qctx));
}
case PlanNode::Kind::kUnknown: {
LOG(FATAL) << "Unknown plan node kind " << static_cast<int32_t>(node->kind());
break;
Expand Down
17 changes: 17 additions & 0 deletions src/graph/executor/algo/CartesianProductExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,22 @@ void CartesianProductExecutor::doCartesianProduct(const DataSet& lds,
}
}

// Constructs the executor through the CartesianProductExecutor base and then sets name_ to
// "BiCartesianProductExecutor".
BiCartesianProductExecutor::BiCartesianProductExecutor(const PlanNode* node, QueryContext* qctx)
: CartesianProductExecutor(node, qctx) {
name_ = "BiCartesianProductExecutor";
}

// Computes the cartesian product of the data sets stored under the plan node's left and right
// input variables, labels the result with the plan node's column names and finishes the
// executor with that data set.
folly::Future<Status> BiCartesianProductExecutor::execute() {
  SCOPED_TIMER(&execTime_);

  auto* productNode = asNode<BiCartesianProduct>(node());

  // Fetch both inputs from the execution context.
  const auto& leftRows = ectx_->getResult(productNode->leftInputVar()).value().getDataSet();
  const auto& rightRows = ectx_->getResult(productNode->rightInputVar()).value().getDataSet();

  DataSet product;
  doCartesianProduct(leftRows, rightRows, product);
  // Column names are assigned after the product is built, so they always come from the plan node.
  product.colNames = productNode->colNames();

  VLOG(1) << "Cartesian Product is : " << product;
  return finish(ResultBuilder().value(Value(std::move(product))).build());
}
} // namespace graph
} // namespace nebula
Loading

0 comments on commit fedffc8

Please sign in to comment.