From cd46df5829c2a89541e2ca7836c328ed5e75e9f7 Mon Sep 17 00:00:00 2001 From: jimingquan Date: Thu, 27 Apr 2023 13:49:34 +0800 Subject: [PATCH] use localThreadPtr to manage memory --- src/graph/executor/algo/AllPathsExecutor.cpp | 109 ++++++++----------- src/graph/executor/algo/AllPathsExecutor.h | 22 ++-- tests/tck/features/path/AllPath.feature | 1 - 3 files changed, 57 insertions(+), 75 deletions(-) diff --git a/src/graph/executor/algo/AllPathsExecutor.cpp b/src/graph/executor/algo/AllPathsExecutor.cpp index 4930b77caa0..51d794dd5e5 100644 --- a/src/graph/executor/algo/AllPathsExecutor.cpp +++ b/src/graph/executor/algo/AllPathsExecutor.cpp @@ -12,7 +12,7 @@ DEFINE_uint32( 100, "the number of vids to expand, when this threshold is exceeded, use heuristic expansion"); DEFINE_uint32(path_threshold_ratio, 2, "threshold for heuristics expansion"); -DEFINE_uint32(path_batch_size, 5000, "number of paths constructed by each thread"); +DEFINE_uint32(path_batch_size, 50000, "number of paths constructed by each thread"); namespace nebula { namespace graph { @@ -156,6 +156,8 @@ folly::Future AllPathsExecutor::getNeighbors(bool reverse) { } auto listVal = std::make_shared(std::move(list)); auto iter = std::make_unique(listVal); + time::Duration buildAdjTime; + auto key = folly::sformat("buildAdjTime {}step[{}]", reverse ? "reverse " : "", step); if (reverse) { rightNextStepVids_.clear(); expandFromRight(iter.get()); @@ -163,6 +165,7 @@ folly::Future AllPathsExecutor::getNeighbors(bool reverse) { leftNextStepVids_.clear(); expandFromLeft(iter.get()); } + otherStats_.emplace(key, folly::sformat("{}(us)", buildAdjTime.elapsedInUSec())); return Status::OK(); }); } @@ -261,6 +264,7 @@ folly::Future AllPathsExecutor::buildResult() { // if key exists, discard the right adjacency's key & values // because the right adjacency list may have fewer edges // a->c->o, a->b, c->f, f->o + time::Duration mergeAdjTime; for (auto& rAdj : rightAdjList_) { auto& src = rAdj.first; auto iter = leftAdjList_.find(src); @@ -292,6 +296,7 @@ folly::Future AllPathsExecutor::buildResult() { } } } + otherStats_.emplace("merge_adj_time", folly::sformat("{}(us)", mergeAdjTime.elapsedInUSec())); time::Duration buildPathTime; auto future = buildPathMultiJobs(); return future.via(runner()) @@ -312,39 +317,43 @@ folly::Future AllPathsExecutor::buildResult() { folly::Future AllPathsExecutor::buildPathMultiJobs() { auto pathsPtr = std::make_shared>(); if (threadLocalPtr_.get() == nullptr) { - threadLocalPtr_.reset(new std::vector()); - threadLocalPtr_->reserve(64); + threadLocalPtr_.reset(new std::deque()); } for (auto& vid : leftInitVids_) { auto vidIter = leftAdjList_.find(vid); if (vidIter == leftAdjList_.end()) { continue; } - auto src = vidIter->first; + auto& src = vidIter->first; auto& adjEdges = vidIter->second; if (adjEdges.empty()) { continue; } pathsPtr->reserve(adjEdges.size() + pathsPtr->size()); for (auto& edge : adjEdges) { - NPath* newPath = new NPath(src, edge); - threadLocalPtr_->push_back(newPath); - pathsPtr->emplace_back(newPath); + threadLocalPtr_->emplace_back(NPath(src, edge)); + pathsPtr->emplace_back(&threadLocalPtr_->back()); } } size_t step = 2; auto future = doBuildPath(step, 0, pathsPtr->size(), pathsPtr); - return future.via(runner()).thenValue([this](std::vector&& paths) { + return future.via(runner()).thenValue([this](std::vector>&& paths) { memory::MemoryCheckGuard guard; + if (!paths.empty()) { - result_.rows.swap(paths); + time::Duration convertPathTime; + for (auto& path : paths) { + result_.rows.emplace_back(convertNPath2Row(path.first, path.second)); + } + otherStats_.emplace("convert_path_time", + folly::sformat("{}(us)", convertPathTime.elapsedInUSec())); } return Status::OK(); }); } -// construct ROW[src1, [e1, v2, e2]] -Row AllPathsExecutor::convertNPath2Row(NPath* path) { +// construct ROW[src1, [e1, v2, e2], v3] +Row AllPathsExecutor::convertNPath2Row(NPath* path, Value dst) { std::vector list; NPath* head = path; while (head != nullptr) { @@ -359,26 +368,26 @@ Row AllPathsExecutor::convertNPath2Row(NPath* path) { std::reverse(list.begin(), list.end()); List edgeList(std::move(list)); row.values.emplace_back(std::move(edgeList)); + row.values.emplace_back(std::move(dst)); return row; } -folly::Future> AllPathsExecutor::doBuildPath( - size_t step, - size_t start, - size_t end, - std::shared_ptr> pathsPtr) { +folly::Future>> +AllPathsExecutor::doBuildPath(size_t step, + size_t start, + size_t end, + std::shared_ptr> pathsPtr) { if (cnt_.load(std::memory_order_relaxed) >= limit_) { - return folly::makeFuture>(std::vector()); + return folly::makeFuture>>( + std::vector>()); } if (threadLocalPtr_.get() == nullptr) { - threadLocalPtr_.reset(new std::vector()); - threadLocalPtr_->reserve(64); + threadLocalPtr_.reset(new std::deque()); } auto& adjList = leftAdjList_; - auto currentPathPtr = std::make_unique>(); + auto currentStepResult = std::make_unique>>(); auto newPathsPtr = std::make_shared>(); - for (auto i = start; i < end; ++i) { auto path = (*pathsPtr)[i]; auto& edgeValue = path->edge; @@ -386,10 +395,7 @@ folly::Future> AllPathsExecutor::doBuildPath( auto& dst = edgeValue.getEdge().dst; auto dstIter = rightInitVids_.find(dst); if (dstIter != rightInitVids_.end()) { - auto row = convertNPath2Row(path); - // add dst - row.values.emplace_back(*dstIter); - currentPathPtr->emplace_back(std::move(row)); + currentStepResult->emplace_back(std::make_pair(path, *dstIter)); ++cnt_; if (cnt_.load(std::memory_order_relaxed) >= limit_) { break; @@ -404,26 +410,25 @@ folly::Future> AllPathsExecutor::doBuildPath( auto& adjedges = adjIter->second; for (auto& edge : adjedges) { if (noLoop_) { - if (hasSameV(path, edge.getEdge())) { + if (hasSameVertices(path, edge.getEdge())) { continue; } } else { - if (hasSameE(path, edge.getEdge())) { + if (hasSameEdge(path, edge.getEdge())) { continue; } } - NPath* newPath = new NPath(path, adjIter->first, edge); - threadLocalPtr_->push_back(newPath); - newPathsPtr->emplace_back(newPath); + threadLocalPtr_->emplace_back(NPath(path, adjIter->first, edge)); + newPathsPtr->emplace_back(&threadLocalPtr_->back()); } } } auto newPathsSize = newPathsPtr->size(); if (step > maxStep_ || newPathsSize == 0) { - return folly::makeFuture>(std::move(*currentPathPtr)); + return folly::makeFuture>>(std::move(*currentStepResult)); } - std::vector>> futures; + std::vector>>> futures; if (newPathsSize < FLAGS_path_batch_size) { futures.emplace_back(folly::via(runner(), [this, step, newPathsSize, newPathsPtr]() { return doBuildPath(step + 1, 0, newPathsSize, newPathsPtr); @@ -438,9 +443,10 @@ folly::Future> AllPathsExecutor::doBuildPath( } } return folly::collect(futures).via(runner()).thenValue( - [pathPtr = std::move(currentPathPtr)](std::vector>&& paths) { + [pathPtr = std::move(currentStepResult)]( + std::vector>>&& paths) { memory::MemoryCheckGuard guard; - std::vector result = std::move(*pathPtr); + std::vector> result = std::move(*pathPtr); for (auto& path : paths) { if (path.empty()) { continue; @@ -472,7 +478,7 @@ folly::Future AllPathsExecutor::getPathProps() { }); } -bool AllPathsExecutor::hasSameE(NPath* path, const Edge& edge) { +bool AllPathsExecutor::hasSameEdge(NPath* path, const Edge& edge) { NPath* head = path; while (head != nullptr) { if (edge == head->edge) { @@ -483,40 +489,21 @@ bool AllPathsExecutor::hasSameE(NPath* path, const Edge& edge) { return false; } -bool AllPathsExecutor::hasSameV(NPath* path, const Edge& edge) { - if (edge.src == edge.dst) { - return true; - } - UNUSED(path); - return false; -} - -bool AllPathsExecutor::hasSameVertices(const std::vector& edgeList, const Edge& edge) { +bool AllPathsExecutor::hasSameVertices(NPath* path, const Edge& edge) { if (edge.src == edge.dst) { return true; } auto& vid = edge.dst; - auto iter = edgeList.begin() + 1; - for (; iter != edgeList.end(); iter++) { - if (iter->isEdge()) { - auto& edgeVal = iter->getEdge(); - if (edgeVal.src == vid) { - return true; - } + NPath* head = path; + while (head != nullptr) { + auto& vertex = head->vertex; + if (vertex.getVertex().vid == vid) { + return true; } + head = head->p; } return false; } -Status AllPathsExecutor::close() { - auto accessor = threadLocalPtr_.accessAllThreads(); - for (auto iter = accessor.begin(); iter != accessor.end(); ++iter) { - for (auto& ptr : *iter) { - delete ptr; - } - } - return Executor::close(); -} - } // namespace graph } // namespace nebula diff --git a/src/graph/executor/algo/AllPathsExecutor.h b/src/graph/executor/algo/AllPathsExecutor.h index 25038c18e44..9282d77408e 100644 --- a/src/graph/executor/algo/AllPathsExecutor.h +++ b/src/graph/executor/algo/AllPathsExecutor.h @@ -27,10 +27,10 @@ // when expanding, if the vid has already been visited, do not visit again // leftAdjList_ save result of forward expansion // rightAdjList_ save result of backward expansion - namespace nebula { namespace graph { class AllPaths; +struct NPath; class AllPathsExecutor final : public StorageAccessExecutor { public: AllPathsExecutor(const PlanNode* node, QueryContext* qctx) @@ -49,8 +49,8 @@ class AllPathsExecutor final : public StorageAccessExecutor { struct NPath { NPath* p{nullptr}; - Value vertex; - Value edge; + const Value& vertex; + const Value& edge; NPath(const Value& v, const Value& e) : vertex(v), edge(e) {} NPath(NPath* path, const Value& v, const Value& e) : p(path), vertex(v), edge(e) {} NPath(NPath&& v) noexcept : p(v.p), vertex(std::move(v.vertex)), edge(std::move(v.edge)) {} @@ -71,12 +71,10 @@ class AllPathsExecutor final : public StorageAccessExecutor { void expandFromRight(GetNeighborsIter* iter); - Row convertNPath2Row(NPath* path); + Row convertNPath2Row(NPath* path, Value dst); - folly::Future> doBuildPath(size_t step, - size_t start, - size_t end, - std::shared_ptr> paths); + folly::Future>> doBuildPath( + size_t step, size_t start, size_t end, std::shared_ptr> paths); folly::Future getPathProps(); @@ -84,11 +82,9 @@ class AllPathsExecutor final : public StorageAccessExecutor { folly::Future buildResult(); - bool hasSameVertices(const std::vector& edgeList, const Edge& edge); - bool hasSameV(NPath* path, const Edge& edge); - bool hasSameE(NPath* Path, const Edge& edge); + bool hasSameEdge(NPath* path, const Edge& edge); - Status close() override; + bool hasSameVertices(NPath* path, const Edge& edge); private: const AllPaths* pathNode_{nullptr}; @@ -112,7 +108,7 @@ class AllPathsExecutor final : public StorageAccessExecutor { DataSet result_; std::vector emptyPropVids_; class NewTag {}; - folly::ThreadLocalPtr, NewTag> threadLocalPtr_; + folly::ThreadLocalPtr, NewTag> threadLocalPtr_; }; } // namespace graph } // namespace nebula diff --git a/tests/tck/features/path/AllPath.feature b/tests/tck/features/path/AllPath.feature index 55c296b3575..91afd8c9342 100644 --- a/tests/tck/features/path/AllPath.feature +++ b/tests/tck/features/path/AllPath.feature @@ -1,7 +1,6 @@ # Copyright (c) 2020 vesoft inc. All rights reserved. # # This source code is licensed under Apache 2.0 License. -@jmq Feature: All Path Background: