Skip to content
This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

Commit

Permalink
v2.5.0 cherry-pick some bug fix from master. (#1299)
Browse files Browse the repository at this point in the history
* Fix the go return bad_type when the vertex without the tag (#1273)

Co-authored-by: Yee <2520865+yixinglu@users.noreply.github.com>

* fix util functions (#1277)

small delete

* Check whether index is valid at runtime (#1291)

* Check whether index is valid at runtime

* Fix failed ut

* Fix steps

* drop the used space

* Format

* Replace the lastUser by user count.… (#1243)

* Replace the lastUser by user count.The last user maybe could run simultaneously, so can't determine which is real last user in timeline.

* Fix the typo.

Co-authored-by: Yee <2520865+yixinglu@users.noreply.github.com>

* fix first plan node input var (#1298)

Co-authored-by: laura-ding <48548375+laura-ding@users.noreply.github.com>
Co-authored-by: Yee <2520865+yixinglu@users.noreply.github.com>
Co-authored-by: kyle.cao <kyle.cao@vesoft.com>
Co-authored-by: Shylock Hg <33566796+Shylock-Hg@users.noreply.github.com>
  • Loading branch information
5 people committed Aug 3, 2021
1 parent 5368115 commit 6a8570f
Show file tree
Hide file tree
Showing 30 changed files with 225 additions and 240 deletions.
3 changes: 3 additions & 0 deletions src/context/Iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,9 @@ const Value& GetNeighborsIter::getTagProp(const std::string& tag,
auto colId = index->second.colIdx;
auto& row = *currentRow_;
DCHECK_GT(row.size(), colId);
if (row[colId].empty()) {
return Value::kEmpty;
}
if (!row[colId].isList()) {
return Value::kNullBadType;
}
Expand Down
12 changes: 2 additions & 10 deletions src/context/Symbols.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,8 @@ struct Variable {
std::unordered_set<PlanNode*> readBy;
std::unordered_set<PlanNode*> writtenBy;

// None means will used in later
// non-positive means static lifetime
// positive means last user id
folly::Optional<int64_t> lastUser;

void setLastUser(int64_t id) {
if (!lastUser.hasValue()) {
lastUser = id;
}
}
// the count of use the variable
std::atomic<uint64_t> userCount;
};

class SymbolTable final {
Expand Down
13 changes: 9 additions & 4 deletions src/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <folly/String.h>
#include <folly/executors/InlineExecutor.h>
#include <atomic>

#include "common/base/Memory.h"
#include "common/base/ObjectPool.h"
Expand Down Expand Up @@ -581,16 +582,20 @@ folly::Future<Status> Executor::error(Status status) const {
void Executor::drop() {
for (const auto &inputVar : node()->inputVars()) {
if (inputVar != nullptr) {
if (inputVar->lastUser.value() == node()->id()) {
ectx_->dropResult(inputVar->name);
VLOG(1) << "Drop variable " << node()->outputVar();
// Make sure use the variable happened-before decrement count
if (inputVar->userCount.fetch_sub(1, std::memory_order_release) == 1) {
// Make sure drop happened-after count decrement
CHECK_EQ(inputVar->userCount.load(std::memory_order_acquire), 0);
ectx_->dropResult(inputVar->name);
VLOG(1) << "Drop variable " << node()->outputVar();
}
}
}
}

Status Executor::finish(Result &&result) {
if (!FLAGS_enable_lifetime_optimize || node()->outputVarPtr()->lastUser.hasValue()) {
if (!FLAGS_enable_lifetime_optimize ||
node()->outputVarPtr()->userCount.load(std::memory_order_relaxed) != 0) {
numRows_ = result.size();
ectx_->setResult(node()->outputVar(), std::move(result));
} else {
Expand Down
12 changes: 11 additions & 1 deletion src/executor/query/IndexScanExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

#include "executor/query/IndexScanExecutor.h"

#include <algorithm>

#include "planner/plan/PlanNode.h"
#include "context/QueryContext.h"
#include "service/GraphFlags.h"
Expand All @@ -28,8 +30,16 @@ folly::Future<Status> IndexScanExecutor::indexScan() {
DataSet dataSet({"dummy"});
return finish(ResultBuilder().value(Value(std::move(dataSet))).finish());
}

const auto &ictxs = lookup->queryContext();
auto iter = std::find_if(
ictxs.begin(), ictxs.end(), [](auto &ictx) { return !ictx.index_id_ref().is_set(); });
if (ictxs.empty() || iter != ictxs.end()) {
return Status::Error("There is no index to use at runtime");
}

return storageClient->lookupIndex(lookup->space(),
lookup->queryContext(),
ictxs,
lookup->isEdge(),
lookup->schemaId(),
lookup->returnColumns())
Expand Down
5 changes: 2 additions & 3 deletions src/executor/test/FilterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,14 @@ class FilterTest : public QueryTestBase {
#define FILTER_RESUTL_CHECK(inputName, outputName, sentence, expected) \
do { \
qctx_->symTable()->newVariable(outputName); \
auto* pool = qctx_->objPool(); \
auto yieldSentence = getYieldSentence(sentence, qctx_.get()); \
auto columns = yieldSentence->columns(); \
for (auto& col : columns) { \
col->setExpr(ExpressionUtils::rewriteLabelAttr2EdgeProp(pool, col->expr())); \
col->setExpr(ExpressionUtils::rewriteLabelAttr2EdgeProp(col->expr())); \
} \
auto* whereSentence = yieldSentence->where(); \
whereSentence->setFilter( \
ExpressionUtils::rewriteLabelAttr2EdgeProp(pool, whereSentence->filter())); \
ExpressionUtils::rewriteLabelAttr2EdgeProp(whereSentence->filter())); \
auto* filterNode = Filter::make(qctx_.get(), nullptr, yieldSentence->where()->filter()); \
filterNode->setInputVar(inputName); \
filterNode->setOutputVar(outputName); \
Expand Down
7 changes: 3 additions & 4 deletions src/optimizer/rule/PushFilterDownLeftJoinRule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ StatusOr<OptRule::TransformResult> PushFilterDownLeftJoinRule::transform(
const std::pair<std::string, int64_t>& leftVar = oldLeftJoinNode->leftVar();
auto symTable = octx->qctx()->symTable();
std::vector<std::string> leftVarColNames = symTable->getVar(leftVar.first)->colNames;
auto objPool = octx->qctx()->objPool();

// split the `condition` based on whether the varPropExpr comes from the left child
auto picker = [&leftVarColNames](const Expression* e) -> bool {
Expand All @@ -73,7 +72,7 @@ StatusOr<OptRule::TransformResult> PushFilterDownLeftJoinRule::transform(
};
Expression* filterPicked = nullptr;
Expression* filterUnpicked = nullptr;
graph::ExpressionUtils::splitFilter(objPool, condition, picker, &filterPicked, &filterUnpicked);
graph::ExpressionUtils::splitFilter(condition, picker, &filterPicked, &filterUnpicked);

if (!filterPicked) {
return TransformResult::noTransform();
Expand All @@ -83,7 +82,7 @@ StatusOr<OptRule::TransformResult> PushFilterDownLeftJoinRule::transform(
auto* newLeftFilterNode = graph::Filter::make(
octx->qctx(),
const_cast<graph::PlanNode*>(oldLeftJoinNode->dep()),
graph::ExpressionUtils::rewriteInnerVar(objPool, filterPicked, leftVar.first));
graph::ExpressionUtils::rewriteInnerVar(filterPicked, leftVar.first));
newLeftFilterNode->setInputVar(leftVar.first);
newLeftFilterNode->setColNames(leftVarColNames);
auto newFilterGroup = OptGroup::create(octx);
Expand All @@ -100,7 +99,7 @@ StatusOr<OptRule::TransformResult> PushFilterDownLeftJoinRule::transform(
std::vector<Expression*> newHashKeys;
for (auto* k : hashKeys) {
newHashKeys.emplace_back(
graph::ExpressionUtils::rewriteInnerVar(objPool, k, newLeftFilterOutputVar));
graph::ExpressionUtils::rewriteInnerVar(k, newLeftFilterOutputVar));
}
newLeftJoinNode->setHashKeys(newHashKeys);

Expand Down
3 changes: 1 addition & 2 deletions src/optimizer/rule/PushFilterDownProjectRule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ StatusOr<OptRule::TransformResult> PushFilterDownProjectRule::transform(
DCHECK_EQ(projNode->kind(), PlanNode::Kind::kProject);
const auto* oldProjNode = static_cast<const graph::Project*>(projNode);
const auto* condition = oldFilterNode->condition();
auto objPool = octx->qctx()->objPool();

auto projColNames = oldProjNode->colNames();
auto projColumns = oldProjNode->columns()->columns();
Expand Down Expand Up @@ -86,7 +85,7 @@ StatusOr<OptRule::TransformResult> PushFilterDownProjectRule::transform(
};
Expression* filterPicked = nullptr;
Expression* filterUnpicked = nullptr;
graph::ExpressionUtils::splitFilter(objPool, condition, picker, &filterPicked, &filterUnpicked);
graph::ExpressionUtils::splitFilter(condition, picker, &filterPicked, &filterUnpicked);

if (!filterPicked) {
return TransformResult::noTransform();
Expand Down
3 changes: 1 addition & 2 deletions src/planner/match/LabelIndexSeek.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,7 @@ StatusOr<SubPlan> LabelIndexSeek::transformNode(NodeContext* nodeCtx) {
}
}
if (canBeEmbeded2IndexScan) {
auto* srcFilter =
ExpressionUtils::rewriteLabelAttr2TagProp(pool, flattenFilter);
auto* srcFilter = ExpressionUtils::rewriteLabelAttr2TagProp(flattenFilter);
storage::cpp2::IndexQueryContext ctx;
ctx.set_filter(Expression::encode(*srcFilter));
scan->setIndexQueryContext({ctx});
Expand Down
4 changes: 3 additions & 1 deletion src/planner/match/MatchPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,17 @@ StatusOr<SubPlan> MatchPlanner::transform(AstContext* astCtx) {
}
}

auto finalPlan = connectSegments(subplans, matchCtx->clauses);
auto finalPlan = connectSegments(astCtx, subplans, matchCtx->clauses);
NG_RETURN_IF_ERROR(finalPlan);
return std::move(finalPlan).value();
}

StatusOr<SubPlan> MatchPlanner::connectSegments(
AstContext* astCtx,
std::vector<SubPlan>& subplans,
std::vector<std::unique_ptr<CypherClauseContextBase>>& clauses) {
DCHECK(!subplans.empty());
subplans.front().tail->setInputVar(astCtx->qctx->vctx()->anonVarGen()->getVar());
if (subplans.size() == 1) {
return subplans.front();
}
Expand Down
1 change: 1 addition & 0 deletions src/planner/match/MatchPlanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class MatchPlanner final : public Planner {

private:
StatusOr<SubPlan> connectSegments(
AstContext* astCtx,
std::vector<SubPlan>& subplans,
std::vector<std::unique_ptr<CypherClauseContextBase>>& clauses);
};
Expand Down
4 changes: 3 additions & 1 deletion src/scheduler/AsyncMsgNotifyBasedScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ AsyncMsgNotifyBasedScheduler::AsyncMsgNotifyBasedScheduler(QueryContext* qctx) :

folly::Future<Status> AsyncMsgNotifyBasedScheduler::schedule() {
if (FLAGS_enable_lifetime_optimize) {
qctx_->plan()->root()->outputVarPtr()->setLastUser(-1); // special for root
// special for root
qctx_->plan()->root()->outputVarPtr()->userCount.store(std::numeric_limits<uint64_t>::max(),
std::memory_order_relaxed);
analyzeLifetime(qctx_->plan()->root());
}
auto executor = Executor::create(qctx_->plan()->root(), qctx_);
Expand Down
22 changes: 17 additions & 5 deletions src/scheduler/Scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
*/

#include "scheduler/Scheduler.h"
#include <atomic>
#include <limits>

#include "context/QueryContext.h"
#include "executor/ExecutionError.h"
Expand All @@ -28,10 +30,15 @@ namespace graph {
const auto currentInLoop = std::get<1>(current);
for (auto& inputVar : currentNode->inputVars()) {
if (inputVar != nullptr) {
inputVar->setLastUser(
(currentNode->kind() == PlanNode::Kind::kLoop || currentInLoop)
? -1
: currentNode->id());
if (currentNode->kind() == PlanNode::Kind::kLoop || currentInLoop) {
inputVar->userCount.store(std::numeric_limits<uint64_t>::max(),
std::memory_order_relaxed);
} else {
if (inputVar->userCount.load(std::memory_order_relaxed) !=
std::numeric_limits<uint64_t>::max()) {
inputVar->userCount.fetch_add(1, std::memory_order_relaxed);
}
}
}
}
stack.pop();
Expand All @@ -42,13 +49,18 @@ namespace graph {
switch (currentNode->kind()) {
case PlanNode::Kind::kSelect: {
auto sel = static_cast<const Select*>(currentNode);
// used by scheduler
sel->outputVarPtr()->userCount.store(std::numeric_limits<uint64_t>::max(),
std::memory_order_relaxed);
stack.push(std::make_tuple(sel->then(), currentInLoop));
stack.push(std::make_tuple(sel->otherwise(), currentInLoop));
break;
}
case PlanNode::Kind::kLoop: {
auto loop = static_cast<const Loop*>(currentNode);
loop->outputVarPtr()->setLastUser(-1);
// used by scheduler
loop->outputVarPtr()->userCount.store(std::numeric_limits<uint64_t>::max(),
std::memory_order_relaxed);
stack.push(std::make_tuple(loop->body(), true));
break;
}
Expand Down
Loading

0 comments on commit 6a8570f

Please sign in to comment.