Skip to content

Commit

Permalink
add more check & add memory stats log flag
Browse files Browse the repository at this point in the history
  • Loading branch information
codesigner committed Dec 23, 2022
1 parent e896092 commit 6c4995a
Show file tree
Hide file tree
Showing 31 changed files with 545 additions and 256 deletions.
1 change: 1 addition & 0 deletions src/common/memory/MemoryTracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ ThreadMemoryStats::~ThreadMemoryStats() {
// Return to global any reserved bytes on destruction
if (reserved != 0) {
MemoryStats::instance().freeGlobal(reserved);
DLOG(INFO) << std::this_thread::get_id() << " return reserved " << reserved;
}
}

Expand Down
2 changes: 0 additions & 2 deletions src/common/memory/MemoryTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
namespace nebula {
namespace memory {

class MemoryStats;

// Memory stats for each thread.
struct ThreadMemoryStats {
ThreadMemoryStats();
Expand Down
23 changes: 18 additions & 5 deletions src/common/memory/MemoryUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ DEFINE_string(cgroup_v2_memory_current_path,

DEFINE_bool(memory_purge_enabled, true, "memory purge enabled, default true");
DEFINE_int32(memory_purge_interval_seconds, 10, "memory purge interval in seconds, default 10");
DEFINE_bool(memory_stats_detail_log, false, "print memory stats detail log");

using nebula::fs::FileUtils;

Expand Down Expand Up @@ -120,11 +121,23 @@ StatusOr<bool> MemoryUtils::hitsHighWatermark() {
}

// print system & application level memory stats
DLOG(INFO) << " sys_used: " << static_cast<int64_t>(total - available) << " sys_total: " << total
<< " sys_ratio:" << (1 - available / total)
<< " usr_used:" << memory::MemoryStats::instance().used()
<< " usr_total:" << memory::MemoryStats::instance().getLimit()
<< " usr_ratio:" << memory::MemoryStats::instance().usedRatio();
// sys: read from system environment, varies depends on environment:
// container: controlled by cgroup,
// used: read from memory.current in cgroup path
// total: read from memory.max in cgroup path
// physical machine: judge by system level memory consumption
// used: current used memory of the system
// total: all physical memory installed
// usr: record by current process's MemoryStats
// used: bytes allocated by new operator
// total: sys_total * FLAGS_system_memory_high_watermark_ratio
if (FLAGS_memory_stats_detail_log) {
LOG(INFO) << " sys_used: " << static_cast<int64_t>(total - available) << " sys_total: " << total
<< " sys_ratio:" << (1 - available / total)
<< " usr_used:" << memory::MemoryStats::instance().used()
<< " usr_total:" << memory::MemoryStats::instance().getLimit()
<< " usr_ratio:" << memory::MemoryStats::instance().usedRatio();
}

auto hits = (1 - available / total) > FLAGS_system_memory_high_watermark_ratio;
LOG_IF_EVERY_N(WARNING, hits, 100)
Expand Down
5 changes: 5 additions & 0 deletions src/graph/executor/Executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ class Executor : private boost::noncopyable, private cpp::NonMovable {
// Throw runtime error to stop whole execution early
folly::Future<Status> error(Status status) const;

static Status memoryExceededStatus() {
return Status::Error("Graph Error: GRAPH_MEMORY_EXCEEDED(%d)",
static_cast<int32_t>(nebula::cpp2::ErrorCode::E_GRAPH_MEMORY_EXCEEDED));
}

protected:
static Executor *makeExecutor(const PlanNode *node,
QueryContext *qctx,
Expand Down
3 changes: 3 additions & 0 deletions src/graph/executor/StorageAccessExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ class StorageAccessExecutor : public Executor {
"Storage Error: Part {} raft buffer is full. Please retry later.", partId));
case nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED:
return Status::Error("Storage Error: Atomic operation failed.");
case nebula::cpp2::ErrorCode::E_STORAGE_MEMORY_EXCEEDED:
return Status::Error("Storage Error: STORAGE_MEMORY_EXCEEDED(%d)",
static_cast<int32_t>(code));
default:
auto status = Status::Error("Storage Error: part: %d, error: %s(%d).",
partId,
Expand Down
26 changes: 20 additions & 6 deletions src/graph/executor/algo/BFSShortestPathExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ folly::Future<Status> BFSShortestPathExecutor::execute() {
ds.colNames = pathNode_->colNames();
ds.rows.swap(currentDs_.rows);
return finish(ResultBuilder().value(Value(std::move(ds))).build());
})
.thenError(
folly::tag_t<std::bad_alloc>{},
[](const std::bad_alloc&) { return folly::makeFuture<Status>(memoryExceededStatus()); })
.thenError(folly::tag_t<std::exception>{}, [](const std::exception& e) {
return folly::makeFuture<Status>(std::runtime_error(e.what()));
});
}

Expand Down Expand Up @@ -147,12 +153,20 @@ folly::Future<Status> BFSShortestPathExecutor::conjunctPath() {
}
}

return folly::collect(futures).via(runner()).thenValue([this](auto&& resps) {
for (auto& resp : resps) {
currentDs_.append(std::move(resp));
}
return Status::OK();
});
return folly::collect(futures)
.via(runner())
.thenValue([this](auto&& resps) {
for (auto& resp : resps) {
currentDs_.append(std::move(resp));
}
return Status::OK();
})
.thenError(
folly::tag_t<std::bad_alloc>{},
[](const std::bad_alloc&) { return folly::makeFuture<Status>(memoryExceededStatus()); })
.thenError(folly::tag_t<std::exception>{}, [](const std::exception& e) {
return folly::makeFuture<Status>(std::runtime_error(e.what()));
});
}

DataSet BFSShortestPathExecutor::doConjunct(const std::vector<Value>& meetVids,
Expand Down
127 changes: 80 additions & 47 deletions src/graph/executor/algo/BatchShortestPath.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ folly::Future<Status> BatchShortestPath::execute(const HashSet& startVids,
result->append(std::move(ds));
}
return Status::OK();
})
.thenError(folly::tag_t<std::bad_alloc>{},
[](const std::bad_alloc&) {
return folly::makeFuture<Status>(Executor::memoryExceededStatus());
})
.thenError(folly::tag_t<std::exception>{}, [](const std::exception& e) {
return folly::makeFuture<Status>(std::runtime_error(e.what()));
});
}

Expand Down Expand Up @@ -106,6 +113,13 @@ folly::Future<Status> BatchShortestPath::shortestPath(size_t rowNum, size_t step
}
}
return handleResponse(rowNum, stepNum);
})
.thenError(folly::tag_t<std::bad_alloc>{},
[](const std::bad_alloc&) {
return folly::makeFuture<Status>(Executor::memoryExceededStatus());
})
.thenError(folly::tag_t<std::exception>{}, [](const std::exception& e) {
return folly::makeFuture<Status>(std::runtime_error(e.what()));
});
}

Expand Down Expand Up @@ -139,6 +153,13 @@ folly::Future<Status> BatchShortestPath::getNeighbors(size_t rowNum, size_t step
.thenValue([this, rowNum, reverse, stepNum, getNbrTime](auto&& resp) {
addStats(resp, stepNum, getNbrTime.elapsedInUSec(), reverse);
return buildPath(rowNum, std::move(resp), reverse);
})
.thenError(folly::tag_t<std::bad_alloc>{},
[](const std::bad_alloc&) {
return folly::makeFuture<Status>(Executor::memoryExceededStatus());
})
.thenError(folly::tag_t<std::exception>{}, [](const std::exception& e) {
return folly::makeFuture<Status>(std::runtime_error(e.what()));
});
}

Expand Down Expand Up @@ -294,6 +315,13 @@ folly::Future<Status> BatchShortestPath::handleResponse(size_t rowNum, size_t st
leftPathMap.clear();
rightPathMap.clear();
return shortestPath(rowNum, stepNum + 1);
})
.thenError(folly::tag_t<std::bad_alloc>{},
[](const std::bad_alloc&) {
return folly::makeFuture<Status>(Executor::memoryExceededStatus());
})
.thenError(folly::tag_t<std::exception>{}, [](const std::exception& e) {
return folly::makeFuture<Status>(std::runtime_error(e.what()));
});
}

Expand Down Expand Up @@ -347,58 +375,63 @@ folly::Future<bool> BatchShortestPath::conjunctPath(size_t rowNum, bool oddStep)
}

auto future = getMeetVids(rowNum, oddStep, meetVids);
return future.via(qctx_->rctx()->runner()).thenValue([this, rowNum, oddStep](auto&& vertices) {
if (vertices.empty()) {
return false;
}
robin_hood::unordered_flat_map<Value, Value, std::hash<Value>> verticesMap;
for (auto& vertex : vertices) {
verticesMap[vertex.getVertex().vid] = std::move(vertex);
}
auto& terminationMap = terminationMaps_[rowNum];
auto& leftPathMaps = currentLeftPathMaps_[rowNum];
auto& rightPathMaps = oddStep ? preRightPathMaps_[rowNum] : currentRightPathMaps_[rowNum];
for (const auto& leftPathMap : leftPathMaps) {
auto findCommonVid = rightPathMaps.find(leftPathMap.first);
if (findCommonVid == rightPathMaps.end()) {
continue;
}
auto findCommonVertex = verticesMap.find(findCommonVid->first);
if (findCommonVertex == verticesMap.end()) {
continue;
}
auto& rightPaths = findCommonVid->second;
for (const auto& srcPaths : leftPathMap.second) {
auto range = terminationMap.equal_range(srcPaths.first);
if (range.first == range.second) {
continue;
return future.via(qctx_->rctx()->runner())
.thenValue([this, rowNum, oddStep](auto&& vertices) {
if (vertices.empty()) {
return false;
}
robin_hood::unordered_flat_map<Value, Value, std::hash<Value>> verticesMap;
for (auto& vertex : vertices) {
verticesMap[vertex.getVertex().vid] = std::move(vertex);
}
for (const auto& dstPaths : rightPaths) {
for (auto found = range.first; found != range.second; ++found) {
if (found->second.first == dstPaths.first) {
if (singleShortest_ && !found->second.second) {
break;
auto& terminationMap = terminationMaps_[rowNum];
auto& leftPathMaps = currentLeftPathMaps_[rowNum];
auto& rightPathMaps = oddStep ? preRightPathMaps_[rowNum] : currentRightPathMaps_[rowNum];
for (const auto& leftPathMap : leftPathMaps) {
auto findCommonVid = rightPathMaps.find(leftPathMap.first);
if (findCommonVid == rightPathMaps.end()) {
continue;
}
auto findCommonVertex = verticesMap.find(findCommonVid->first);
if (findCommonVertex == verticesMap.end()) {
continue;
}
auto& rightPaths = findCommonVid->second;
for (const auto& srcPaths : leftPathMap.second) {
auto range = terminationMap.equal_range(srcPaths.first);
if (range.first == range.second) {
continue;
}
for (const auto& dstPaths : rightPaths) {
for (auto found = range.first; found != range.second; ++found) {
if (found->second.first == dstPaths.first) {
if (singleShortest_ && !found->second.second) {
break;
}
doConjunctPath(
srcPaths.second, dstPaths.second, findCommonVertex->second, rowNum);
found->second.second = false;
}
}
doConjunctPath(srcPaths.second, dstPaths.second, findCommonVertex->second, rowNum);
found->second.second = false;
}
}
}
}
}
// update terminationMap
for (auto iter = terminationMap.begin(); iter != terminationMap.end();) {
if (!iter->second.second) {
iter = terminationMap.erase(iter);
} else {
++iter;
}
}
if (terminationMap.empty()) {
return true;
}
return false;
});
// update terminationMap
for (auto iter = terminationMap.begin(); iter != terminationMap.end();) {
if (!iter->second.second) {
iter = terminationMap.erase(iter);
} else {
++iter;
}
}
if (terminationMap.empty()) {
return true;
}
return false;
})
.thenError(folly::tag_t<std::exception>{}, [](const std::exception& e) {
return folly::makeFuture<bool>(std::runtime_error(e.what()));
});
}

void BatchShortestPath::doConjunctPath(const std::vector<CustomPath>& leftPaths,
Expand Down
46 changes: 29 additions & 17 deletions src/graph/executor/algo/MultiShortestPathExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ folly::Future<Status> MultiShortestPathExecutor::execute() {
ds.colNames = pathNode_->colNames();
ds.rows.swap(currentDs_.rows);
return finish(ResultBuilder().value(Value(std::move(ds))).build());
})
.thenError(folly::tag_t<std::bad_alloc>{},
[](const std::bad_alloc&) {
return folly::makeFuture<Status>(Executor::memoryExceededStatus());
})
.thenError(folly::tag_t<std::exception>{}, [](const std::exception& e) {
return folly::makeFuture<Status>(std::runtime_error(e.what()));
});
}

Expand Down Expand Up @@ -290,24 +297,29 @@ folly::Future<bool> MultiShortestPathExecutor::conjunctPath(bool oddStep) {
futures.emplace_back(std::move(future));
}

return folly::collect(futures).via(runner()).thenValue([this](auto&& resps) {
for (auto& resp : resps) {
currentDs_.append(std::move(resp));
}
return folly::collect(futures)
.via(runner())
.thenValue([this](auto&& resps) {
for (auto& resp : resps) {
currentDs_.append(std::move(resp));
}

for (auto iter = terminationMap_.begin(); iter != terminationMap_.end();) {
if (!iter->second.second) {
iter = terminationMap_.erase(iter);
} else {
++iter;
}
}
if (terminationMap_.empty()) {
ectx_->setValue(terminationVar_, true);
return true;
}
return false;
});
for (auto iter = terminationMap_.begin(); iter != terminationMap_.end();) {
if (!iter->second.second) {
iter = terminationMap_.erase(iter);
} else {
++iter;
}
}
if (terminationMap_.empty()) {
ectx_->setValue(terminationVar_, true);
return true;
}
return false;
})
.thenError(folly::tag_t<std::exception>{}, [](const std::exception& e) {
return folly::makeFuture<bool>(std::runtime_error(e.what()));
});
}

void MultiShortestPathExecutor::setNextStepVid(const Interims& paths, const string& var) {
Expand Down
35 changes: 25 additions & 10 deletions src/graph/executor/algo/ProduceAllPathsExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ folly::Future<Status> ProduceAllPathsExecutor::execute() {
ds.colNames = pathNode_->colNames();
ds.rows.swap(currentDs_.rows);
return finish(ResultBuilder().value(Value(std::move(ds))).build());
})
.thenError(folly::tag_t<std::bad_alloc>{},
[](const std::bad_alloc&) {
return folly::makeFuture<Status>(Executor::memoryExceededStatus());
})
.thenError(folly::tag_t<std::exception>{}, [](const std::exception& e) {
return folly::makeFuture<Status>(std::runtime_error(e.what()));
});
}

Expand Down Expand Up @@ -166,16 +173,24 @@ folly::Future<Status> ProduceAllPathsExecutor::conjunctPath() {
}
}

return folly::collect(futures).via(runner()).thenValue([this](auto&& resps) {
for (auto& resp : resps) {
currentDs_.append(std::move(resp));
}
preLeftPaths_.swap(leftPaths_);
preRightPaths_.swap(rightPaths_);
leftPaths_.clear();
rightPaths_.clear();
return Status::OK();
});
return folly::collect(futures)
.via(runner())
.thenValue([this](auto&& resps) {
for (auto& resp : resps) {
currentDs_.append(std::move(resp));
}
preLeftPaths_.swap(leftPaths_);
preRightPaths_.swap(rightPaths_);
leftPaths_.clear();
rightPaths_.clear();
return Status::OK();
})
.thenError(
folly::tag_t<std::bad_alloc>{},
[](const std::bad_alloc&) { return folly::makeFuture<Status>(memoryExceededStatus()); })
.thenError(folly::tag_t<std::exception>{}, [](const std::exception& e) {
return folly::makeFuture<Status>(std::runtime_error(e.what()));
});
}

void ProduceAllPathsExecutor::setNextStepVid(Interims& paths, const string& var) {
Expand Down
Loading

0 comments on commit 6c4995a

Please sign in to comment.