diff --git a/src/context/Symbols.h b/src/context/Symbols.h index b77073b71..4b6ca906a 100644 --- a/src/context/Symbols.h +++ b/src/context/Symbols.h @@ -47,16 +47,8 @@ struct Variable { std::unordered_set readBy; std::unordered_set writtenBy; - // None means will used in later - // non-positive means static lifetime - // positive means last user id - folly::Optional lastUser; - - void setLastUser(int64_t id) { - if (!lastUser.hasValue()) { - lastUser = id; - } - } + // the count of use the variable + std::atomic userCount; }; class SymbolTable final { diff --git a/src/executor/Executor.cpp b/src/executor/Executor.cpp index 6323d3c6f..08ddf02e4 100644 --- a/src/executor/Executor.cpp +++ b/src/executor/Executor.cpp @@ -8,6 +8,7 @@ #include #include +#include #include "common/base/Memory.h" #include "common/base/ObjectPool.h" @@ -581,16 +582,20 @@ folly::Future Executor::error(Status status) const { void Executor::drop() { for (const auto &inputVar : node()->inputVars()) { if (inputVar != nullptr) { - if (inputVar->lastUser.value() == node()->id()) { - ectx_->dropResult(inputVar->name); - VLOG(1) << "Drop variable " << node()->outputVar(); + // Make sure use the variable happened-before decrement count + if (inputVar->userCount.fetch_sub(1, std::memory_order_release) == 1) { + // Make sure drop happened-after count decrement + CHECK_EQ(inputVar->userCount.load(std::memory_order_acquire), 0); + ectx_->dropResult(inputVar->name); + VLOG(1) << "Drop variable " << node()->outputVar(); } } } } Status Executor::finish(Result &&result) { - if (!FLAGS_enable_lifetime_optimize || node()->outputVarPtr()->lastUser.hasValue()) { + if (!FLAGS_enable_lifetime_optimize || + node()->outputVarPtr()->userCount.load(std::memory_order_relaxed) != 0) { numRows_ = result.size(); ectx_->setResult(node()->outputVar(), std::move(result)); } else { diff --git a/src/scheduler/AsyncMsgNotifyBasedScheduler.cpp b/src/scheduler/AsyncMsgNotifyBasedScheduler.cpp index eea6c9eb2..e658079d9 100644 --- a/src/scheduler/AsyncMsgNotifyBasedScheduler.cpp +++ b/src/scheduler/AsyncMsgNotifyBasedScheduler.cpp @@ -16,7 +16,9 @@ AsyncMsgNotifyBasedScheduler::AsyncMsgNotifyBasedScheduler(QueryContext* qctx) : folly::Future AsyncMsgNotifyBasedScheduler::schedule() { if (FLAGS_enable_lifetime_optimize) { - qctx_->plan()->root()->outputVarPtr()->setLastUser(-1); // special for root + // special for root + qctx_->plan()->root()->outputVarPtr()->userCount.store(std::numeric_limits::max(), + std::memory_order_relaxed); analyzeLifetime(qctx_->plan()->root()); } auto executor = Executor::create(qctx_->plan()->root(), qctx_); diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp index d76325367..a233accf5 100644 --- a/src/scheduler/Scheduler.cpp +++ b/src/scheduler/Scheduler.cpp @@ -5,6 +5,8 @@ */ #include "scheduler/Scheduler.h" +#include +#include #include "context/QueryContext.h" #include "executor/ExecutionError.h" @@ -28,10 +30,15 @@ namespace graph { const auto currentInLoop = std::get<1>(current); for (auto& inputVar : currentNode->inputVars()) { if (inputVar != nullptr) { - inputVar->setLastUser( - (currentNode->kind() == PlanNode::Kind::kLoop || currentInLoop) - ? -1 - : currentNode->id()); + if (currentNode->kind() == PlanNode::Kind::kLoop || currentInLoop) { + inputVar->userCount.store(std::numeric_limits::max(), + std::memory_order_relaxed); + } else { + if (inputVar->userCount.load(std::memory_order_relaxed) != + std::numeric_limits::max()) { + inputVar->userCount.fetch_add(1, std::memory_order_relaxed); + } + } } } stack.pop(); @@ -42,13 +49,18 @@ namespace graph { switch (currentNode->kind()) { case PlanNode::Kind::kSelect: { auto sel = static_cast(currentNode); + // used by scheduler + sel->outputVarPtr()->userCount.store(std::numeric_limits::max(), + std::memory_order_relaxed); stack.push(std::make_tuple(sel->then(), currentInLoop)); stack.push(std::make_tuple(sel->otherwise(), currentInLoop)); break; } case PlanNode::Kind::kLoop: { auto loop = static_cast(currentNode); - loop->outputVarPtr()->setLastUser(-1); + // used by scheduler + loop->outputVarPtr()->userCount.store(std::numeric_limits::max(), + std::memory_order_relaxed); stack.push(std::make_tuple(loop->body(), true)); break; }