Skip to content

Commit

Permalink
use NPath
Browse files Browse the repository at this point in the history
  • Loading branch information
nevermore3 committed Apr 26, 2023
1 parent a76eab7 commit ec7b687
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 50 deletions.
103 changes: 64 additions & 39 deletions src/graph/executor/algo/AllPathsExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,9 @@ folly::Future<Status> AllPathsExecutor::buildResult() {
}

folly::Future<Status> AllPathsExecutor::buildPathMultiJobs() {
auto pathsPtr = std::make_shared<std::vector<std::vector<Value>>>();
auto pathsPtr = std::make_shared<std::vector<NPath*>>();
threadLocalPtr_.reset(new std::vector<NPath*>());
threadLocalPtr_->reserve(64);
for (auto& vid : leftInitVids_) {
auto vidIter = leftAdjList_.find(vid);
if (vidIter == leftAdjList_.end()) {
Expand All @@ -323,7 +325,9 @@ folly::Future<Status> AllPathsExecutor::buildPathMultiJobs() {
}
pathsPtr->reserve(adjEdges.size() + pathsPtr->size());
for (auto& edge : adjEdges) {
pathsPtr->emplace_back(std::vector<Value>({src, edge}));
NPath* newPath = new NPath(src, edge);
threadLocalPtr_->push_back(newPath);
pathsPtr->emplace_back(newPath);
}
}
size_t step = 2;
Expand All @@ -337,55 +341,49 @@ folly::Future<Status> AllPathsExecutor::buildPathMultiJobs() {
});
}

size_t AllPathsExecutor::getPathsSize(size_t start,
size_t end,
std::shared_ptr<std::vector<std::vector<Value>>> pathsPtr,
const VertexMap<Value>& adjList) {
size_t size = 0;
for (auto i = start; i < end; ++i) {
auto& path = (*pathsPtr)[i];
auto& edgeValue = path.back();
DCHECK(edgeValue.isEdge());
auto& dst = edgeValue.getEdge().dst;

auto adjIter = adjList.find(dst);
if (adjIter == adjList.end()) {
continue;
}
auto& adjEdges = adjIter->second;
size += adjEdges.size();
// construct ROW[src1, [e1, v2, e2]]
Row AllPathsExecutor::convertNPath2Row(NPath* path) {
std::vector<Value> list;
NPath* head = path;
while (head != nullptr) {
list.emplace_back(head->edge);
list.emplace_back(head->vertex);
head = head->p;
}
return size;
Row row;
// add src;
row.values.emplace_back(list.back());
list.pop_back();
std::reverse(list.begin(), list.end());
List edgeList(std::move(list));
row.values.emplace_back(std::move(edgeList));
return row;
}

folly::Future<std::vector<Row>> AllPathsExecutor::doBuildPath(
size_t step,
size_t start,
size_t end,
std::shared_ptr<std::vector<std::vector<Value>>> pathsPtr) {
std::shared_ptr<std::vector<NPath*>> pathsPtr) {
if (cnt_.load(std::memory_order_relaxed) >= limit_) {
return folly::makeFuture<std::vector<Row>>(std::vector<Row>());
}

threadLocalPtr_.reset(new std::vector<NPath*>());
threadLocalPtr_->reserve(64);
auto& adjList = leftAdjList_;
auto currentPathPtr = std::make_unique<std::vector<Row>>();
auto newPathsPtr = std::make_shared<std::vector<std::vector<Value>>>();
auto newPathsPtr = std::make_shared<std::vector<NPath*>>();

if (step <= maxStep_) {
newPathsPtr->reserve(getPathsSize(start, end, pathsPtr, adjList));
}

for (auto i = start; i < end; ++i) {
auto& path = (*pathsPtr)[i];
auto& edgeValue = path.back();
auto path = (*pathsPtr)[i];
auto& edgeValue = path->edge;
DCHECK(edgeValue.isEdge());
auto& dst = edgeValue.getEdge().dst;
auto dstIter = rightInitVids_.find(dst);
if (dstIter != rightInitVids_.end()) {
Row row;
row.values.emplace_back(path.front());
List edgeList(std::vector<Value>(path.begin() + 1, path.end()));
row.values.emplace_back(std::move(edgeList));
auto row = convertNPath2Row(path);
// add dst
row.values.emplace_back(*dstIter);
currentPathPtr->emplace_back(std::move(row));
++cnt_;
Expand All @@ -402,19 +400,17 @@ folly::Future<std::vector<Row>> AllPathsExecutor::doBuildPath(
auto& adjedges = adjIter->second;
for (auto& edge : adjedges) {
if (noLoop_) {
if (hasSameVertices(path, edge.getEdge())) {
if (hasSameV(path, edge.getEdge())) {
continue;
}
} else {
if (hasSameEdge(path, edge.getEdge())) {
if (hasSameE(path, edge.getEdge())) {
continue;
}
}
// copy
auto newPath = path;
newPath.emplace_back(adjIter->first);
newPath.emplace_back(edge);
newPathsPtr->emplace_back(std::move(newPath));
NPath* newPath = new NPath(path, adjIter->first, edge);
threadLocalPtr_->push_back(newPath);
newPathsPtr->emplace_back(newPath);
}
}
}
Expand Down Expand Up @@ -472,6 +468,25 @@ folly::Future<Status> AllPathsExecutor::getPathProps() {
});
}

bool AllPathsExecutor::hasSameE(NPath* path, const Edge& edge) {
NPath* head = path;
while (head != nullptr) {
if (edge == head->edge) {
return true;
}
head = head->p;
}
return false;
}

bool AllPathsExecutor::hasSameV(NPath* path, const Edge& edge) {
if (edge.src == edge.dst) {
return true;
}
UNUSED(path);
return false;
}

bool AllPathsExecutor::hasSameVertices(const std::vector<Value>& edgeList, const Edge& edge) {
if (edge.src == edge.dst) {
return true;
Expand All @@ -489,5 +504,15 @@ bool AllPathsExecutor::hasSameVertices(const std::vector<Value>& edgeList, const
return false;
}

Status AllPathsExecutor::close() {
auto accessor = threadLocalPtr_.accessAllThreads();
for (auto iter = accessor.begin(); iter != accessor.end(); ++iter) {
for (auto& ptr : *iter) {
ptr->~NPath();
}
}
return Executor::close();
}

} // namespace graph
} // namespace nebula
41 changes: 30 additions & 11 deletions src/graph/executor/algo/AllPathsExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#define GRAPH_EXECUTOR_ALGO_ALLPATHSEXECUTOR_H_

#include "graph/executor/StorageAccessExecutor.h"
#include "folly/ThreadLocal.h"

// Using the two-way BFS algorithm, a heuristic algorithm is used in the expansion process
// when the number of vid to be expanded on the left and right
Expand Down Expand Up @@ -48,8 +49,18 @@ class AllPathsExecutor final : public StorageAccessExecutor {

struct NPath {
NPath* p{nullptr};
Value v;
Value e;
Value vertex;
Value edge;
NPath(const Value& v, const Value& e) : vertex(v), edge(e) {}
NPath(NPath* path, const Value& v, const Value& e) : p(path), vertex(v), edge(e) {}
NPath(NPath&& v) noexcept : p(v.p), vertex(std::move(v.vertex)), edge(std::move(v.edge)) {}
NPath(const NPath& v) : p(v.p), vertex(v.vertex), edge(v.edge) {}
~NPath() {
if (p != nullptr) {
delete p;
p = nullptr;
}
}
};

private:
Expand All @@ -65,11 +76,18 @@ class AllPathsExecutor final : public StorageAccessExecutor {

void expandFromRight(GetNeighborsIter* iter);

folly::Future<std::vector<Row>> doBuildPath(
size_t step,
size_t start,
size_t end,
std::shared_ptr<std::vector<std::vector<Value>>> edgeLists);
Row convertNPath2Row(NPath* path);

// folly::Future<std::vector<Row>> doBuildPath(
// size_t step,
// size_t start,
// size_t end,
// std::shared_ptr<std::vector<std::vector<Value>>> edgeLists);

folly::Future<std::vector<Row>> doBuildPath(size_t step,
size_t start,
size_t end,
std::shared_ptr<std::vector<NPath*>> paths);

folly::Future<Status> getPathProps();

Expand All @@ -78,11 +96,10 @@ class AllPathsExecutor final : public StorageAccessExecutor {
folly::Future<Status> buildResult();

bool hasSameVertices(const std::vector<Value>& edgeList, const Edge& edge);
bool hasSameV(NPath* path, const Edge& edge);
bool hasSameE(NPath* Path, const Edge& edge);

size_t getPathsSize(size_t start,
size_t end,
std::shared_ptr<std::vector<std::vector<Value>>> pathsPtr,
const VertexMap<Value>& adjList);
Status close() override;

private:
const AllPaths* pathNode_{nullptr};
Expand All @@ -105,6 +122,8 @@ class AllPathsExecutor final : public StorageAccessExecutor {

DataSet result_;
std::vector<Value> emptyPropVids_;
class NewTag {};
folly::ThreadLocalPtr<std::vector<NPath*>, NewTag> threadLocalPtr_;
};
} // namespace graph
} // namespace nebula
Expand Down
1 change: 1 addition & 0 deletions tests/tck/features/path/AllPath.feature
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright (c) 2020 vesoft inc. All rights reserved.
#
# This source code is licensed under Apache 2.0 License.
@jmq
Feature: All Path

Background:
Expand Down

0 comments on commit ec7b687

Please sign in to comment.