Skip to content

Commit

Permalink
use localThreadPtr to manage memory
Browse files Browse the repository at this point in the history
  • Loading branch information
nevermore3 committed Apr 27, 2023
1 parent ea1996b commit 5d77e30
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 75 deletions.
109 changes: 48 additions & 61 deletions src/graph/executor/algo/AllPathsExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ DEFINE_uint32(
100,
"the number of vids to expand, when this threshold is exceeded, use heuristic expansion");
DEFINE_uint32(path_threshold_ratio, 2, "threshold for heuristics expansion");
DEFINE_uint32(path_batch_size, 5000, "number of paths constructed by each thread");
DEFINE_uint32(path_batch_size, 50000, "number of paths constructed by each thread");

namespace nebula {
namespace graph {
Expand Down Expand Up @@ -156,13 +156,16 @@ folly::Future<Status> AllPathsExecutor::getNeighbors(bool reverse) {
}
auto listVal = std::make_shared<Value>(std::move(list));
auto iter = std::make_unique<GetNeighborsIter>(listVal);
time::Duration buildAdjTime;
auto key = folly::sformat("buildAdjTime {}step[{}]", reverse ? "reverse " : "", step);
if (reverse) {
rightNextStepVids_.clear();
expandFromRight(iter.get());
} else {
leftNextStepVids_.clear();
expandFromLeft(iter.get());
}
otherStats_.emplace(key, folly::sformat("{}(us)", buildAdjTime.elapsedInUSec()));
return Status::OK();
});
}
Expand Down Expand Up @@ -261,6 +264,7 @@ folly::Future<Status> AllPathsExecutor::buildResult() {
// if key exists, discard the right adjacency's key & values
// because the right adjacency list may have fewer edges
// a->c->o, a->b, c->f, f->o
time::Duration mergeAdjTime;
for (auto& rAdj : rightAdjList_) {
auto& src = rAdj.first;
auto iter = leftAdjList_.find(src);
Expand Down Expand Up @@ -292,6 +296,7 @@ folly::Future<Status> AllPathsExecutor::buildResult() {
}
}
}
otherStats_.emplace("merge_adj_time", folly::sformat("{}(us)", mergeAdjTime.elapsedInUSec()));
time::Duration buildPathTime;
auto future = buildPathMultiJobs();
return future.via(runner())
Expand All @@ -312,39 +317,43 @@ folly::Future<Status> AllPathsExecutor::buildResult() {
folly::Future<Status> AllPathsExecutor::buildPathMultiJobs() {
auto pathsPtr = std::make_shared<std::vector<NPath*>>();
if (threadLocalPtr_.get() == nullptr) {
threadLocalPtr_.reset(new std::vector<NPath*>());
threadLocalPtr_->reserve(64);
threadLocalPtr_.reset(new std::deque<NPath>());
}
for (auto& vid : leftInitVids_) {
auto vidIter = leftAdjList_.find(vid);
if (vidIter == leftAdjList_.end()) {
continue;
}
auto src = vidIter->first;
auto& src = vidIter->first;
auto& adjEdges = vidIter->second;
if (adjEdges.empty()) {
continue;
}
pathsPtr->reserve(adjEdges.size() + pathsPtr->size());
for (auto& edge : adjEdges) {
NPath* newPath = new NPath(src, edge);
threadLocalPtr_->push_back(newPath);
pathsPtr->emplace_back(newPath);
threadLocalPtr_->emplace_back(NPath(src, edge));
pathsPtr->emplace_back(&threadLocalPtr_->back());
}
}
size_t step = 2;
auto future = doBuildPath(step, 0, pathsPtr->size(), pathsPtr);
return future.via(runner()).thenValue([this](std::vector<Row>&& paths) {
return future.via(runner()).thenValue([this](std::vector<std::pair<NPath*, Value>>&& paths) {
memory::MemoryCheckGuard guard;

if (!paths.empty()) {
result_.rows.swap(paths);
time::Duration convertPathTime;
for (auto& path : paths) {
result_.rows.emplace_back(convertNPath2Row(path.first, path.second));
}
otherStats_.emplace("convert_path_time",
folly::sformat("{}(us)", convertPathTime.elapsedInUSec()));
}
return Status::OK();
});
}

// construct ROW[src1, [e1, v2, e2]]
Row AllPathsExecutor::convertNPath2Row(NPath* path) {
// construct ROW[src1, [e1, v2, e2], v3]
Row AllPathsExecutor::convertNPath2Row(NPath* path, Value dst) {
std::vector<Value> list;
NPath* head = path;
while (head != nullptr) {
Expand All @@ -359,37 +368,34 @@ Row AllPathsExecutor::convertNPath2Row(NPath* path) {
std::reverse(list.begin(), list.end());
List edgeList(std::move(list));
row.values.emplace_back(std::move(edgeList));
row.values.emplace_back(std::move(dst));
return row;
}

folly::Future<std::vector<Row>> AllPathsExecutor::doBuildPath(
size_t step,
size_t start,
size_t end,
std::shared_ptr<std::vector<NPath*>> pathsPtr) {
folly::Future<std::vector<std::pair<AllPathsExecutor::NPath*, Value>>>
AllPathsExecutor::doBuildPath(size_t step,
size_t start,
size_t end,
std::shared_ptr<std::vector<NPath*>> pathsPtr) {
if (cnt_.load(std::memory_order_relaxed) >= limit_) {
return folly::makeFuture<std::vector<Row>>(std::vector<Row>());
return folly::makeFuture<std::vector<std::pair<NPath*, Value>>>(
std::vector<std::pair<NPath*, Value>>());
}
if (threadLocalPtr_.get() == nullptr) {
threadLocalPtr_.reset(new std::vector<NPath*>());
threadLocalPtr_->reserve(64);
threadLocalPtr_.reset(new std::deque<NPath>());
}
auto& adjList = leftAdjList_;
auto currentPathPtr = std::make_unique<std::vector<Row>>();
auto currentStepResult = std::make_unique<std::vector<std::pair<NPath*, Value>>>();
auto newPathsPtr = std::make_shared<std::vector<NPath*>>();


for (auto i = start; i < end; ++i) {
auto path = (*pathsPtr)[i];
auto& edgeValue = path->edge;
DCHECK(edgeValue.isEdge());
auto& dst = edgeValue.getEdge().dst;
auto dstIter = rightInitVids_.find(dst);
if (dstIter != rightInitVids_.end()) {
auto row = convertNPath2Row(path);
// add dst
row.values.emplace_back(*dstIter);
currentPathPtr->emplace_back(std::move(row));
currentStepResult->emplace_back(std::make_pair(path, *dstIter));
++cnt_;
if (cnt_.load(std::memory_order_relaxed) >= limit_) {
break;
Expand All @@ -404,26 +410,25 @@ folly::Future<std::vector<Row>> AllPathsExecutor::doBuildPath(
auto& adjedges = adjIter->second;
for (auto& edge : adjedges) {
if (noLoop_) {
if (hasSameV(path, edge.getEdge())) {
if (hasSameVertices(path, edge.getEdge())) {
continue;
}
} else {
if (hasSameE(path, edge.getEdge())) {
if (hasSameEdge(path, edge.getEdge())) {
continue;
}
}
NPath* newPath = new NPath(path, adjIter->first, edge);
threadLocalPtr_->push_back(newPath);
newPathsPtr->emplace_back(newPath);
threadLocalPtr_->emplace_back(NPath(path, adjIter->first, edge));
newPathsPtr->emplace_back(&threadLocalPtr_->back());
}
}
}

auto newPathsSize = newPathsPtr->size();
if (step > maxStep_ || newPathsSize == 0) {
return folly::makeFuture<std::vector<Row>>(std::move(*currentPathPtr));
return folly::makeFuture<std::vector<std::pair<NPath*, Value>>>(std::move(*currentStepResult));
}
std::vector<folly::Future<std::vector<Row>>> futures;
std::vector<folly::Future<std::vector<std::pair<NPath*, Value>>>> futures;
if (newPathsSize < FLAGS_path_batch_size) {
futures.emplace_back(folly::via(runner(), [this, step, newPathsSize, newPathsPtr]() {
return doBuildPath(step + 1, 0, newPathsSize, newPathsPtr);
Expand All @@ -438,9 +443,10 @@ folly::Future<std::vector<Row>> AllPathsExecutor::doBuildPath(
}
}
return folly::collect(futures).via(runner()).thenValue(
[pathPtr = std::move(currentPathPtr)](std::vector<std::vector<Row>>&& paths) {
[pathPtr = std::move(currentStepResult)](
std::vector<std::vector<std::pair<NPath*, Value>>>&& paths) {
memory::MemoryCheckGuard guard;
std::vector<Row> result = std::move(*pathPtr);
std::vector<std::pair<NPath*, Value>> result = std::move(*pathPtr);
for (auto& path : paths) {
if (path.empty()) {
continue;
Expand Down Expand Up @@ -472,7 +478,7 @@ folly::Future<Status> AllPathsExecutor::getPathProps() {
});
}

bool AllPathsExecutor::hasSameE(NPath* path, const Edge& edge) {
bool AllPathsExecutor::hasSameEdge(NPath* path, const Edge& edge) {
NPath* head = path;
while (head != nullptr) {
if (edge == head->edge) {
Expand All @@ -483,40 +489,21 @@ bool AllPathsExecutor::hasSameE(NPath* path, const Edge& edge) {
return false;
}

bool AllPathsExecutor::hasSameV(NPath* path, const Edge& edge) {
if (edge.src == edge.dst) {
return true;
}
UNUSED(path);
return false;
}

bool AllPathsExecutor::hasSameVertices(const std::vector<Value>& edgeList, const Edge& edge) {
bool AllPathsExecutor::hasSameVertices(NPath* path, const Edge& edge) {
if (edge.src == edge.dst) {
return true;
}
auto& vid = edge.dst;
auto iter = edgeList.begin() + 1;
for (; iter != edgeList.end(); iter++) {
if (iter->isEdge()) {
auto& edgeVal = iter->getEdge();
if (edgeVal.src == vid) {
return true;
}
NPath* head = path;
while (head != nullptr) {
auto& vertex = head->vertex;
if (vertex.getVertex().vid == vid) {
return true;
}
head = head->p;
}
return false;
}

Status AllPathsExecutor::close() {
auto accessor = threadLocalPtr_.accessAllThreads();
for (auto iter = accessor.begin(); iter != accessor.end(); ++iter) {
for (auto& ptr : *iter) {
delete ptr;
}
}
return Executor::close();
}

} // namespace graph
} // namespace nebula
22 changes: 9 additions & 13 deletions src/graph/executor/algo/AllPathsExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
// when expanding, if the vid has already been visited, do not visit again
// leftAdjList_ save result of forward expansion
// rightAdjList_ save result of backward expansion

namespace nebula {
namespace graph {
class AllPaths;
struct NPath;
class AllPathsExecutor final : public StorageAccessExecutor {
public:
AllPathsExecutor(const PlanNode* node, QueryContext* qctx)
Expand All @@ -49,8 +49,8 @@ class AllPathsExecutor final : public StorageAccessExecutor {

struct NPath {
NPath* p{nullptr};
Value vertex;
Value edge;
const Value& vertex;
const Value& edge;
NPath(const Value& v, const Value& e) : vertex(v), edge(e) {}
NPath(NPath* path, const Value& v, const Value& e) : p(path), vertex(v), edge(e) {}
NPath(NPath&& v) noexcept : p(v.p), vertex(std::move(v.vertex)), edge(std::move(v.edge)) {}
Expand All @@ -71,24 +71,20 @@ class AllPathsExecutor final : public StorageAccessExecutor {

void expandFromRight(GetNeighborsIter* iter);

Row convertNPath2Row(NPath* path);
Row convertNPath2Row(NPath* path, Value dst);

folly::Future<std::vector<Row>> doBuildPath(size_t step,
size_t start,
size_t end,
std::shared_ptr<std::vector<NPath*>> paths);
folly::Future<std::vector<std::pair<NPath*, Value>>> doBuildPath(
size_t step, size_t start, size_t end, std::shared_ptr<std::vector<NPath*>> paths);

folly::Future<Status> getPathProps();

folly::Future<Status> buildPathMultiJobs();

folly::Future<Status> buildResult();

bool hasSameVertices(const std::vector<Value>& edgeList, const Edge& edge);
bool hasSameV(NPath* path, const Edge& edge);
bool hasSameE(NPath* Path, const Edge& edge);
bool hasSameEdge(NPath* path, const Edge& edge);

Status close() override;
bool hasSameVertices(NPath* path, const Edge& edge);

private:
const AllPaths* pathNode_{nullptr};
Expand All @@ -112,7 +108,7 @@ class AllPathsExecutor final : public StorageAccessExecutor {
DataSet result_;
std::vector<Value> emptyPropVids_;
class NewTag {};
folly::ThreadLocalPtr<std::vector<NPath*>, NewTag> threadLocalPtr_;
folly::ThreadLocalPtr<std::deque<NPath>, NewTag> threadLocalPtr_;
};
} // namespace graph
} // namespace nebula
Expand Down
1 change: 0 additions & 1 deletion tests/tck/features/path/AllPath.feature
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# Copyright (c) 2020 vesoft inc. All rights reserved.
#
# This source code is licensed under Apache 2.0 License.
@jmq
Feature: All Path

Background:
Expand Down

0 comments on commit 5d77e30

Please sign in to comment.