diff --git a/src/common/memory/MemoryTracker.cpp b/src/common/memory/MemoryTracker.cpp index e7c18f546e4..d7bc96432a4 100644 --- a/src/common/memory/MemoryTracker.cpp +++ b/src/common/memory/MemoryTracker.cpp @@ -15,6 +15,7 @@ ThreadMemoryStats::~ThreadMemoryStats() { // Return to global any reserved bytes on destruction if (reserved != 0) { MemoryStats::instance().freeGlobal(reserved); + DLOG(INFO) << std::this_thread::get_id() << " return reserved " << reserved; } } diff --git a/src/common/memory/MemoryTracker.h b/src/common/memory/MemoryTracker.h index 4e44e8eef56..0d54205c4d0 100644 --- a/src/common/memory/MemoryTracker.h +++ b/src/common/memory/MemoryTracker.h @@ -11,8 +11,6 @@ namespace nebula { namespace memory { -class MemoryStats; - // Memory stats for each thread. struct ThreadMemoryStats { ThreadMemoryStats(); diff --git a/src/common/memory/MemoryUtils.cpp b/src/common/memory/MemoryUtils.cpp index 92260117944..9bb3fb9a31f 100644 --- a/src/common/memory/MemoryUtils.cpp +++ b/src/common/memory/MemoryUtils.cpp @@ -42,6 +42,7 @@ DEFINE_string(cgroup_v2_memory_current_path, DEFINE_bool(memory_purge_enabled, true, "memory purge enabled, default true"); DEFINE_int32(memory_purge_interval_seconds, 10, "memory purge interval in seconds, default 10"); +DEFINE_bool(memory_stats_detail_log, false, "print memory stats detail log"); using nebula::fs::FileUtils; @@ -120,11 +121,23 @@ StatusOr MemoryUtils::hitsHighWatermark() { } // print system & application level memory stats - DLOG(INFO) << " sys_used: " << static_cast(total - available) << " sys_total: " << total - << " sys_ratio:" << (1 - available / total) - << " usr_used:" << memory::MemoryStats::instance().used() - << " usr_total:" << memory::MemoryStats::instance().getLimit() - << " usr_ratio:" << memory::MemoryStats::instance().usedRatio(); + // sys: read from system environment, varies depends on environment: + // container: controlled by cgroup, + // used: read from memory.current in cgroup path + // total: read from memory.max in cgroup path + // physical machine: judge by system level memory consumption + // used: current used memory of the system + // total: all physical memory installed + // usr: record by current process's MemoryStats + // used: bytes allocated by new operator + // total: sys_total * FLAGS_system_memory_high_watermark_ratio + if (FLAGS_memory_stats_detail_log) { + LOG(INFO) << " sys_used: " << static_cast(total - available) << " sys_total: " << total + << " sys_ratio:" << (1 - available / total) + << " usr_used:" << memory::MemoryStats::instance().used() + << " usr_total:" << memory::MemoryStats::instance().getLimit() + << " usr_ratio:" << memory::MemoryStats::instance().usedRatio(); + } auto hits = (1 - available / total) > FLAGS_system_memory_high_watermark_ratio; LOG_IF_EVERY_N(WARNING, hits, 100) diff --git a/src/graph/executor/Executor.h b/src/graph/executor/Executor.h index 31f70d39c83..4736347b7ba 100644 --- a/src/graph/executor/Executor.h +++ b/src/graph/executor/Executor.h @@ -76,6 +76,11 @@ class Executor : private boost::noncopyable, private cpp::NonMovable { // Throw runtime error to stop whole execution early folly::Future error(Status status) const; + static Status memoryExceededStatus() { + return Status::Error("Graph Error: GRAPH_MEMORY_EXCEEDED(%d)", + static_cast(nebula::cpp2::ErrorCode::E_GRAPH_MEMORY_EXCEEDED)); + } + protected: static Executor *makeExecutor(const PlanNode *node, QueryContext *qctx, diff --git a/src/graph/executor/StorageAccessExecutor.h b/src/graph/executor/StorageAccessExecutor.h index fbd69b14b37..18ddbe83e40 100644 --- a/src/graph/executor/StorageAccessExecutor.h +++ b/src/graph/executor/StorageAccessExecutor.h @@ -131,6 +131,9 @@ class StorageAccessExecutor : public Executor { "Storage Error: Part {} raft buffer is full. Please retry later.", partId)); case nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED: return Status::Error("Storage Error: Atomic operation failed."); + case nebula::cpp2::ErrorCode::E_STORAGE_MEMORY_EXCEEDED: + return Status::Error("Storage Error: STORAGE_MEMORY_EXCEEDED(%d)", + static_cast(code)); default: auto status = Status::Error("Storage Error: part: %d, error: %s(%d).", partId, diff --git a/src/graph/executor/algo/BFSShortestPathExecutor.cpp b/src/graph/executor/algo/BFSShortestPathExecutor.cpp index f17164b4990..b464a58ba74 100644 --- a/src/graph/executor/algo/BFSShortestPathExecutor.cpp +++ b/src/graph/executor/algo/BFSShortestPathExecutor.cpp @@ -47,6 +47,12 @@ folly::Future BFSShortestPathExecutor::execute() { ds.colNames = pathNode_->colNames(); ds.rows.swap(currentDs_.rows); return finish(ResultBuilder().value(Value(std::move(ds))).build()); + }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc&) { return folly::makeFuture(memoryExceededStatus()); }) + .thenError(folly::tag_t{}, [](const std::exception& e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -147,12 +153,20 @@ folly::Future BFSShortestPathExecutor::conjunctPath() { } } - return folly::collect(futures).via(runner()).thenValue([this](auto&& resps) { - for (auto& resp : resps) { - currentDs_.append(std::move(resp)); - } - return Status::OK(); - }); + return folly::collect(futures) + .via(runner()) + .thenValue([this](auto&& resps) { + for (auto& resp : resps) { + currentDs_.append(std::move(resp)); + } + return Status::OK(); + }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc&) { return folly::makeFuture(memoryExceededStatus()); }) + .thenError(folly::tag_t{}, [](const std::exception& e) { + return folly::makeFuture(std::runtime_error(e.what())); + }); } DataSet BFSShortestPathExecutor::doConjunct(const std::vector& meetVids, diff --git a/src/graph/executor/algo/BatchShortestPath.cpp b/src/graph/executor/algo/BatchShortestPath.cpp index 2b0ab6fc8cc..c017f87166c 100644 --- a/src/graph/executor/algo/BatchShortestPath.cpp +++ b/src/graph/executor/algo/BatchShortestPath.cpp @@ -33,6 +33,13 @@ folly::Future BatchShortestPath::execute(const HashSet& startVids, result->append(std::move(ds)); } return Status::OK(); + }) + .thenError(folly::tag_t{}, + [](const std::bad_alloc&) { + return folly::makeFuture(Executor::memoryExceededStatus()); + }) + .thenError(folly::tag_t{}, [](const std::exception& e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -106,6 +113,13 @@ folly::Future BatchShortestPath::shortestPath(size_t rowNum, size_t step } } return handleResponse(rowNum, stepNum); + }) + .thenError(folly::tag_t{}, + [](const std::bad_alloc&) { + return folly::makeFuture(Executor::memoryExceededStatus()); + }) + .thenError(folly::tag_t{}, [](const std::exception& e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -139,6 +153,13 @@ folly::Future BatchShortestPath::getNeighbors(size_t rowNum, size_t step .thenValue([this, rowNum, reverse, stepNum, getNbrTime](auto&& resp) { addStats(resp, stepNum, getNbrTime.elapsedInUSec(), reverse); return buildPath(rowNum, std::move(resp), reverse); + }) + .thenError(folly::tag_t{}, + [](const std::bad_alloc&) { + return folly::makeFuture(Executor::memoryExceededStatus()); + }) + .thenError(folly::tag_t{}, [](const std::exception& e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -294,6 +315,13 @@ folly::Future BatchShortestPath::handleResponse(size_t rowNum, size_t st leftPathMap.clear(); rightPathMap.clear(); return shortestPath(rowNum, stepNum + 1); + }) + .thenError(folly::tag_t{}, + [](const std::bad_alloc&) { + return folly::makeFuture(Executor::memoryExceededStatus()); + }) + .thenError(folly::tag_t{}, [](const std::exception& e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -347,58 +375,63 @@ folly::Future BatchShortestPath::conjunctPath(size_t rowNum, bool oddStep) } auto future = getMeetVids(rowNum, oddStep, meetVids); - return future.via(qctx_->rctx()->runner()).thenValue([this, rowNum, oddStep](auto&& vertices) { - if (vertices.empty()) { - return false; - } - robin_hood::unordered_flat_map> verticesMap; - for (auto& vertex : vertices) { - verticesMap[vertex.getVertex().vid] = std::move(vertex); - } - auto& terminationMap = terminationMaps_[rowNum]; - auto& leftPathMaps = currentLeftPathMaps_[rowNum]; - auto& rightPathMaps = oddStep ? preRightPathMaps_[rowNum] : currentRightPathMaps_[rowNum]; - for (const auto& leftPathMap : leftPathMaps) { - auto findCommonVid = rightPathMaps.find(leftPathMap.first); - if (findCommonVid == rightPathMaps.end()) { - continue; - } - auto findCommonVertex = verticesMap.find(findCommonVid->first); - if (findCommonVertex == verticesMap.end()) { - continue; - } - auto& rightPaths = findCommonVid->second; - for (const auto& srcPaths : leftPathMap.second) { - auto range = terminationMap.equal_range(srcPaths.first); - if (range.first == range.second) { - continue; + return future.via(qctx_->rctx()->runner()) + .thenValue([this, rowNum, oddStep](auto&& vertices) { + if (vertices.empty()) { + return false; + } + robin_hood::unordered_flat_map> verticesMap; + for (auto& vertex : vertices) { + verticesMap[vertex.getVertex().vid] = std::move(vertex); } - for (const auto& dstPaths : rightPaths) { - for (auto found = range.first; found != range.second; ++found) { - if (found->second.first == dstPaths.first) { - if (singleShortest_ && !found->second.second) { - break; + auto& terminationMap = terminationMaps_[rowNum]; + auto& leftPathMaps = currentLeftPathMaps_[rowNum]; + auto& rightPathMaps = oddStep ? preRightPathMaps_[rowNum] : currentRightPathMaps_[rowNum]; + for (const auto& leftPathMap : leftPathMaps) { + auto findCommonVid = rightPathMaps.find(leftPathMap.first); + if (findCommonVid == rightPathMaps.end()) { + continue; + } + auto findCommonVertex = verticesMap.find(findCommonVid->first); + if (findCommonVertex == verticesMap.end()) { + continue; + } + auto& rightPaths = findCommonVid->second; + for (const auto& srcPaths : leftPathMap.second) { + auto range = terminationMap.equal_range(srcPaths.first); + if (range.first == range.second) { + continue; + } + for (const auto& dstPaths : rightPaths) { + for (auto found = range.first; found != range.second; ++found) { + if (found->second.first == dstPaths.first) { + if (singleShortest_ && !found->second.second) { + break; + } + doConjunctPath( + srcPaths.second, dstPaths.second, findCommonVertex->second, rowNum); + found->second.second = false; + } } - doConjunctPath(srcPaths.second, dstPaths.second, findCommonVertex->second, rowNum); - found->second.second = false; } } } - } - } - // update terminationMap - for (auto iter = terminationMap.begin(); iter != terminationMap.end();) { - if (!iter->second.second) { - iter = terminationMap.erase(iter); - } else { - ++iter; - } - } - if (terminationMap.empty()) { - return true; - } - return false; - }); + // update terminationMap + for (auto iter = terminationMap.begin(); iter != terminationMap.end();) { + if (!iter->second.second) { + iter = terminationMap.erase(iter); + } else { + ++iter; + } + } + if (terminationMap.empty()) { + return true; + } + return false; + }) + .thenError(folly::tag_t{}, [](const std::exception& e) { + return folly::makeFuture(std::runtime_error(e.what())); + }); } void BatchShortestPath::doConjunctPath(const std::vector& leftPaths, diff --git a/src/graph/executor/algo/MultiShortestPathExecutor.cpp b/src/graph/executor/algo/MultiShortestPathExecutor.cpp index 3c3b1e4cd18..a7b4f641dc3 100644 --- a/src/graph/executor/algo/MultiShortestPathExecutor.cpp +++ b/src/graph/executor/algo/MultiShortestPathExecutor.cpp @@ -57,6 +57,13 @@ folly::Future MultiShortestPathExecutor::execute() { ds.colNames = pathNode_->colNames(); ds.rows.swap(currentDs_.rows); return finish(ResultBuilder().value(Value(std::move(ds))).build()); + }) + .thenError(folly::tag_t{}, + [](const std::bad_alloc&) { + return folly::makeFuture(Executor::memoryExceededStatus()); + }) + .thenError(folly::tag_t{}, [](const std::exception& e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -290,24 +297,29 @@ folly::Future MultiShortestPathExecutor::conjunctPath(bool oddStep) { futures.emplace_back(std::move(future)); } - return folly::collect(futures).via(runner()).thenValue([this](auto&& resps) { - for (auto& resp : resps) { - currentDs_.append(std::move(resp)); - } + return folly::collect(futures) + .via(runner()) + .thenValue([this](auto&& resps) { + for (auto& resp : resps) { + currentDs_.append(std::move(resp)); + } - for (auto iter = terminationMap_.begin(); iter != terminationMap_.end();) { - if (!iter->second.second) { - iter = terminationMap_.erase(iter); - } else { - ++iter; - } - } - if (terminationMap_.empty()) { - ectx_->setValue(terminationVar_, true); - return true; - } - return false; - }); + for (auto iter = terminationMap_.begin(); iter != terminationMap_.end();) { + if (!iter->second.second) { + iter = terminationMap_.erase(iter); + } else { + ++iter; + } + } + if (terminationMap_.empty()) { + ectx_->setValue(terminationVar_, true); + return true; + } + return false; + }) + .thenError(folly::tag_t{}, [](const std::exception& e) { + return folly::makeFuture(std::runtime_error(e.what())); + }); } void MultiShortestPathExecutor::setNextStepVid(const Interims& paths, const string& var) { diff --git a/src/graph/executor/algo/ProduceAllPathsExecutor.cpp b/src/graph/executor/algo/ProduceAllPathsExecutor.cpp index 6d47c73eced..a66fbd7a900 100644 --- a/src/graph/executor/algo/ProduceAllPathsExecutor.cpp +++ b/src/graph/executor/algo/ProduceAllPathsExecutor.cpp @@ -45,6 +45,13 @@ folly::Future ProduceAllPathsExecutor::execute() { ds.colNames = pathNode_->colNames(); ds.rows.swap(currentDs_.rows); return finish(ResultBuilder().value(Value(std::move(ds))).build()); + }) + .thenError(folly::tag_t{}, + [](const std::bad_alloc&) { + return folly::makeFuture(Executor::memoryExceededStatus()); + }) + .thenError(folly::tag_t{}, [](const std::exception& e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -166,16 +173,24 @@ folly::Future ProduceAllPathsExecutor::conjunctPath() { } } - return folly::collect(futures).via(runner()).thenValue([this](auto&& resps) { - for (auto& resp : resps) { - currentDs_.append(std::move(resp)); - } - preLeftPaths_.swap(leftPaths_); - preRightPaths_.swap(rightPaths_); - leftPaths_.clear(); - rightPaths_.clear(); - return Status::OK(); - }); + return folly::collect(futures) + .via(runner()) + .thenValue([this](auto&& resps) { + for (auto& resp : resps) { + currentDs_.append(std::move(resp)); + } + preLeftPaths_.swap(leftPaths_); + preRightPaths_.swap(rightPaths_); + leftPaths_.clear(); + rightPaths_.clear(); + return Status::OK(); + }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc&) { return folly::makeFuture(memoryExceededStatus()); }) + .thenError(folly::tag_t{}, [](const std::exception& e) { + return folly::makeFuture(std::runtime_error(e.what())); + }); } void ProduceAllPathsExecutor::setNextStepVid(Interims& paths, const string& var) { diff --git a/src/graph/executor/algo/ShortestPathBase.cpp b/src/graph/executor/algo/ShortestPathBase.cpp index 7185ffb3579..b8e678ab57c 100644 --- a/src/graph/executor/algo/ShortestPathBase.cpp +++ b/src/graph/executor/algo/ShortestPathBase.cpp @@ -41,6 +41,9 @@ folly::Future> ShortestPathBase::getMeetVidsProps( .thenValue([this, getPropsTime](PropRpcResponse&& resp) { addStats(resp, getPropsTime.elapsedInUSec()); return handlePropResp(std::move(resp)); + }) + .thenError(folly::tag_t{}, [](const std::exception& e) { + return folly::makeFuture>(std::runtime_error(e.what())); }); } diff --git a/src/graph/executor/algo/ShortestPathBase.h b/src/graph/executor/algo/ShortestPathBase.h index ec94174b2e3..d5f26ea6c74 100644 --- a/src/graph/executor/algo/ShortestPathBase.h +++ b/src/graph/executor/algo/ShortestPathBase.h @@ -7,6 +7,7 @@ #include #include "graph/planner/plan/Algo.h" +#include "graph/executor/Executor.h" using nebula::storage::StorageRpcResponse; using nebula::storage::cpp2::GetNeighborsResponse; diff --git a/src/graph/executor/algo/SingleShortestPath.cpp b/src/graph/executor/algo/SingleShortestPath.cpp index 6b2affb2f40..87f90705890 100644 --- a/src/graph/executor/algo/SingleShortestPath.cpp +++ b/src/graph/executor/algo/SingleShortestPath.cpp @@ -32,6 +32,13 @@ folly::Future SingleShortestPath::execute(const HashSet& startVids, result->append(std::move(ds)); } return Status::OK(); + }) + .thenError(folly::tag_t{}, + [](const std::bad_alloc&) { + return folly::makeFuture(Executor::memoryExceededStatus()); + }) + .thenError(folly::tag_t{}, [](const std::exception& e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -75,6 +82,13 @@ folly::Future SingleShortestPath::shortestPath(size_t rowNum, size_t ste } } return handleResponse(rowNum, stepNum); + }) + .thenError(folly::tag_t{}, + [](const std::bad_alloc&) { + return folly::makeFuture(Executor::memoryExceededStatus()); + }) + .thenError(folly::tag_t{}, [](const std::exception& e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -110,6 +124,13 @@ folly::Future SingleShortestPath::getNeighbors(size_t rowNum, .thenValue([this, rowNum, stepNum, getNbrTime, reverse](auto&& resp) { addStats(resp, stepNum, getNbrTime.elapsedInUSec(), reverse); return buildPath(rowNum, std::move(resp), reverse); + }) + .thenError(folly::tag_t{}, + [](const std::bad_alloc&) { + return folly::makeFuture(Executor::memoryExceededStatus()); + }) + .thenError(folly::tag_t{}, [](const std::exception& e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -189,6 +210,13 @@ folly::Future SingleShortestPath::handleResponse(size_t rowNum, size_t s return folly::makeFuture(Status::OK()); } return shortestPath(rowNum, stepNum + 1); + }) + .thenError(folly::tag_t{}, + [](const std::bad_alloc&) { + return folly::makeFuture(Executor::memoryExceededStatus()); + }) + .thenError(folly::tag_t{}, [](const std::exception& e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -249,33 +277,37 @@ void SingleShortestPath::buildOddPath(size_t rowNum, const std::vector& m folly::Future SingleShortestPath::buildEvenPath(size_t rowNum, const std::vector& meetVids) { auto future = getMeetVidsProps(meetVids); - return future.via(qctx_->rctx()->runner()).thenValue([this, rowNum](auto&& vertices) { - if (vertices.empty()) { - return false; - } - for (auto& meetVertex : vertices) { - if (!meetVertex.isVertex()) { - continue; - } - auto meetVid = meetVertex.getVertex().vid; - auto leftPaths = createLeftPath(rowNum, meetVid); - auto rightPaths = createRightPath(rowNum, meetVid, false); - for (auto& leftPath : leftPaths) { - for (auto& rightPath : rightPaths) { - Row path = leftPath; - auto& steps = path.values.back().mutableList().values; - steps.emplace_back(meetVertex); - steps.insert(steps.end(), rightPath.values.begin(), rightPath.values.end() - 1); - path.emplace_back(rightPath.values.back()); - resultDs_[rowNum].rows.emplace_back(std::move(path)); - if (singleShortest_) { - return true; + return future.via(qctx_->rctx()->runner()) + .thenValue([this, rowNum](auto&& vertices) { + if (vertices.empty()) { + return false; + } + for (auto& meetVertex : vertices) { + if (!meetVertex.isVertex()) { + continue; + } + auto meetVid = meetVertex.getVertex().vid; + auto leftPaths = createLeftPath(rowNum, meetVid); + auto rightPaths = createRightPath(rowNum, meetVid, false); + for (auto& leftPath : leftPaths) { + for (auto& rightPath : rightPaths) { + Row path = leftPath; + auto& steps = path.values.back().mutableList().values; + steps.emplace_back(meetVertex); + steps.insert(steps.end(), rightPath.values.begin(), rightPath.values.end() - 1); + path.emplace_back(rightPath.values.back()); + resultDs_[rowNum].rows.emplace_back(std::move(path)); + if (singleShortest_) { + return true; + } + } } } - } - } - return true; - }); + return true; + }) + .thenError(folly::tag_t{}, [](const std::exception& e) { + return folly::makeFuture(std::runtime_error(e.what())); + }); } std::vector SingleShortestPath::createLeftPath(size_t rowNum, const Value& meetVid) { diff --git a/src/graph/executor/algo/SubgraphExecutor.cpp b/src/graph/executor/algo/SubgraphExecutor.cpp index 57842631f91..0f88d575e5e 100644 --- a/src/graph/executor/algo/SubgraphExecutor.cpp +++ b/src/graph/executor/algo/SubgraphExecutor.cpp @@ -67,11 +67,9 @@ folly::Future SubgraphExecutor::getNeighbors() { vids_.clear(); return handleResponse(std::move(resp)); }) - .thenError(folly::tag_t{}, - [](const std::bad_alloc&) { - return folly::makeFuture(std::runtime_error( - "Memory Limit Exceeded, " + memory::MemoryStats::instance().toString())); - }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc&) { return folly::makeFuture(memoryExceededStatus()); }) .thenError(folly::tag_t{}, [](const std::exception& e) { return folly::makeFuture(std::runtime_error(e.what())); }); diff --git a/src/graph/executor/maintain/EdgeExecutor.cpp b/src/graph/executor/maintain/EdgeExecutor.cpp index ed4f144141d..e42592225a3 100644 --- a/src/graph/executor/maintain/EdgeExecutor.cpp +++ b/src/graph/executor/maintain/EdgeExecutor.cpp @@ -26,6 +26,12 @@ folly::Future CreateEdgeExecutor::execute() { return resp.status(); } return Status::OK(); + }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) + .thenError(folly::tag_t{}, [](const std::exception &e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -54,6 +60,12 @@ folly::Future DescEdgeExecutor::execute() { .value(Value(std::move(ret).value())) .iter(Iterator::Kind::kDefault) .build()); + }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) + .thenError(folly::tag_t{}, [](const std::exception &e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -73,6 +85,12 @@ folly::Future DropEdgeExecutor::execute() { return resp.status(); } return Status::OK(); + }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) + .thenError(folly::tag_t{}, [](const std::exception &e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -80,8 +98,11 @@ folly::Future ShowEdgesExecutor::execute() { SCOPED_TIMER(&execTime_); auto spaceId = qctx()->rctx()->session()->space().id; - return qctx()->getMetaClient()->listEdgeSchemas(spaceId).via(runner()).thenValue( - [this, spaceId](StatusOr> resp) { + return qctx() + ->getMetaClient() + ->listEdgeSchemas(spaceId) + .via(runner()) + .thenValue([this, spaceId](StatusOr> resp) { if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", Show edges failed: " << resp.status(); return resp.status(); @@ -103,6 +124,12 @@ folly::Future ShowEdgesExecutor::execute() { .value(Value(std::move(dataSet))) .iter(Iterator::Kind::kDefault) .build()); + }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) + .thenError(folly::tag_t{}, [](const std::exception &e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -129,6 +156,12 @@ folly::Future ShowCreateEdgeExecutor::execute() { } return finish( ResultBuilder().value(std::move(ret).value()).iter(Iterator::Kind::kDefault).build()); + }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) + .thenError(folly::tag_t{}, [](const std::exception &e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -148,6 +181,12 @@ folly::Future AlterEdgeExecutor::execute() { return resp.status(); } return finish(ResultBuilder().value(Value()).iter(Iterator::Kind::kDefault).build()); + }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) + .thenError(folly::tag_t{}, [](const std::exception &e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } } // namespace graph diff --git a/src/graph/executor/maintain/EdgeIndexExecutor.cpp b/src/graph/executor/maintain/EdgeIndexExecutor.cpp index 818e7ed5f08..ae8175e3a21 100644 --- a/src/graph/executor/maintain/EdgeIndexExecutor.cpp +++ b/src/graph/executor/maintain/EdgeIndexExecutor.cpp @@ -33,6 +33,12 @@ folly::Future CreateEdgeIndexExecutor::execute() { return resp.status(); } return Status::OK(); + }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) + .thenError(folly::tag_t{}, [](const std::exception &e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -52,6 +58,12 @@ folly::Future DropEdgeIndexExecutor::execute() { return resp.status(); } return Status::OK(); + }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) + .thenError(folly::tag_t{}, [](const std::exception &e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -79,6 +91,12 @@ folly::Future DescEdgeIndexExecutor::execute() { } return finish( ResultBuilder().value(std::move(ret).value()).iter(Iterator::Kind::kDefault).build()); + }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) + .thenError(folly::tag_t{}, [](const std::exception &e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -105,6 +123,12 @@ folly::Future ShowCreateEdgeIndexExecutor::execute() { } return finish( ResultBuilder().value(std::move(ret).value()).iter(Iterator::Kind::kDefault).build()); + }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) + .thenError(folly::tag_t{}, [](const std::exception &e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -113,8 +137,11 @@ folly::Future ShowEdgeIndexesExecutor::execute() { auto *iNode = asNode(node()); const auto &bySchema = iNode->name(); auto spaceId = qctx()->rctx()->session()->space().id; - return qctx()->getMetaClient()->listEdgeIndexes(spaceId).via(runner()).thenValue( - [this, spaceId, bySchema](StatusOr> resp) { + return qctx() + ->getMetaClient() + ->listEdgeIndexes(spaceId) + .via(runner()) + .thenValue([this, spaceId, bySchema](StatusOr> resp) { if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", Show edge indexes failed" << resp.status(); return resp.status(); @@ -158,6 +185,12 @@ folly::Future ShowEdgeIndexesExecutor::execute() { .value(Value(std::move(dataSet))) .iter(Iterator::Kind::kDefault) .build()); + }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) + .thenError(folly::tag_t{}, [](const std::exception &e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -165,8 +198,11 @@ folly::Future ShowEdgeIndexStatusExecutor::execute() { SCOPED_TIMER(&execTime_); auto spaceId = qctx()->rctx()->session()->space().id; - return qctx()->getMetaClient()->listEdgeIndexStatus(spaceId).via(runner()).thenValue( - [this, spaceId](StatusOr> resp) { + return qctx() + ->getMetaClient() + ->listEdgeIndexStatus(spaceId) + .via(runner()) + .thenValue([this, spaceId](StatusOr> resp) { if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", Show edge index status failed" << resp.status(); @@ -187,6 +223,12 @@ folly::Future ShowEdgeIndexStatusExecutor::execute() { .value(Value(std::move(dataSet))) .iter(Iterator::Kind::kDefault) .build()); + }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) + .thenError(folly::tag_t{}, [](const std::exception &e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } diff --git a/src/graph/executor/maintain/FTIndexExecutor.cpp b/src/graph/executor/maintain/FTIndexExecutor.cpp index 92d0018ace9..b0345ce2906 100644 --- a/src/graph/executor/maintain/FTIndexExecutor.cpp +++ b/src/graph/executor/maintain/FTIndexExecutor.cpp @@ -25,6 +25,12 @@ folly::Future CreateFTIndexExecutor::execute() { return resp.status(); } return Status::OK(); + }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) + .thenError(folly::tag_t{}, [](const std::exception &e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -51,50 +57,66 @@ folly::Future DropFTIndexExecutor::execute() { LOG(WARNING) << "Drop fulltext index '" << inode->getName() << "' failed: " << ftRet; } return Status::OK(); + }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) + .thenError(folly::tag_t{}, [](const std::exception &e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } folly::Future ShowFTIndexesExecutor::execute() { SCOPED_TIMER(&execTime_); auto spaceId = qctx()->rctx()->session()->space().id; - return qctx()->getMetaClient()->listFTIndexes().via(runner()).thenValue( - [this, spaceId](StatusOr> resp) { - if (!resp.ok()) { - LOG(WARNING) << "SpaceId: " << spaceId << ", Show fulltext indexes failed" - << resp.status(); - return resp.status(); - } + return qctx() + ->getMetaClient() + ->listFTIndexes() + .via(runner()) + .thenValue( + [this, spaceId](StatusOr> resp) { + if (!resp.ok()) { + LOG(WARNING) << "SpaceId: " << spaceId << ", Show fulltext indexes failed" + << resp.status(); + return resp.status(); + } - auto indexes = std::move(resp).value(); - DataSet dataSet; - dataSet.colNames = {"Name", "Schema Type", "Schema Name", "Fields"}; - for (auto &index : indexes) { - if (index.second.get_space_id() != spaceId) { - continue; - } - auto shmId = index.second.get_depend_schema(); - auto isEdge = shmId.getType() == nebula::cpp2::SchemaID::Type::edge_type; - auto shmNameRet = - isEdge ? this->qctx_->schemaMng()->toEdgeName(spaceId, shmId.get_edge_type()) - : this->qctx_->schemaMng()->toTagName(spaceId, shmId.get_tag_id()); - if (!shmNameRet.ok()) { - LOG(WARNING) << "SpaceId: " << spaceId - << ", Get schema name failed: " << shmNameRet.status(); - return shmNameRet.status(); - } - std::string fields; - folly::join(", ", index.second.get_fields(), fields); - Row row; - row.values.emplace_back(index.first); - row.values.emplace_back(isEdge ? "Edge" : "Tag"); - row.values.emplace_back(std::move(shmNameRet).value()); - row.values.emplace_back(std::move(fields)); - dataSet.rows.emplace_back(std::move(row)); - } - return finish(ResultBuilder() - .value(Value(std::move(dataSet))) - .iter(Iterator::Kind::kDefault) - .build()); + auto indexes = std::move(resp).value(); + DataSet dataSet; + dataSet.colNames = {"Name", "Schema Type", "Schema Name", "Fields"}; + for (auto &index : indexes) { + if (index.second.get_space_id() != spaceId) { + continue; + } + auto shmId = index.second.get_depend_schema(); + auto isEdge = shmId.getType() == nebula::cpp2::SchemaID::Type::edge_type; + auto shmNameRet = + isEdge ? this->qctx_->schemaMng()->toEdgeName(spaceId, shmId.get_edge_type()) + : this->qctx_->schemaMng()->toTagName(spaceId, shmId.get_tag_id()); + if (!shmNameRet.ok()) { + LOG(WARNING) << "SpaceId: " << spaceId + << ", Get schema name failed: " << shmNameRet.status(); + return shmNameRet.status(); + } + std::string fields; + folly::join(", ", index.second.get_fields(), fields); + Row row; + row.values.emplace_back(index.first); + row.values.emplace_back(isEdge ? "Edge" : "Tag"); + row.values.emplace_back(std::move(shmNameRet).value()); + row.values.emplace_back(std::move(fields)); + dataSet.rows.emplace_back(std::move(row)); + } + return finish(ResultBuilder() + .value(Value(std::move(dataSet))) + .iter(Iterator::Kind::kDefault) + .build()); + }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) + .thenError(folly::tag_t{}, [](const std::exception &e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } diff --git a/src/graph/executor/maintain/TagExecutor.cpp b/src/graph/executor/maintain/TagExecutor.cpp index c30578388dd..cc17aeb2da0 100644 --- a/src/graph/executor/maintain/TagExecutor.cpp +++ b/src/graph/executor/maintain/TagExecutor.cpp @@ -26,6 +26,12 @@ folly::Future CreateTagExecutor::execute() { return resp.status(); } return Status::OK(); + }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) + .thenError(folly::tag_t{}, [](const std::exception &e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -52,6 +58,12 @@ folly::Future DescTagExecutor::execute() { } return finish( ResultBuilder().value(std::move(ret).value()).iter(Iterator::Kind::kDefault).build()); + }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) + .thenError(folly::tag_t{}, [](const std::exception &e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -71,6 +83,12 @@ folly::Future DropTagExecutor::execute() { return resp.status(); } return Status::OK(); + }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) + .thenError(folly::tag_t{}, [](const std::exception &e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -78,8 +96,11 @@ folly::Future ShowTagsExecutor::execute() { SCOPED_TIMER(&execTime_); auto spaceId = qctx()->rctx()->session()->space().id; - return qctx()->getMetaClient()->listTagSchemas(spaceId).via(runner()).thenValue( - [this, spaceId](StatusOr> resp) { + return qctx() + ->getMetaClient() + ->listTagSchemas(spaceId) + .via(runner()) + .thenValue([this, spaceId](StatusOr> resp) { if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", Show tags failed: " << resp.status(); return resp.status(); @@ -101,6 +122,12 @@ folly::Future ShowTagsExecutor::execute() { .value(Value(std::move(dataSet))) .iter(Iterator::Kind::kDefault) .build()); + }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) + .thenError(folly::tag_t{}, [](const std::exception &e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -127,6 +154,12 @@ folly::Future ShowCreateTagExecutor::execute() { } return finish( ResultBuilder().value(std::move(ret).value()).iter(Iterator::Kind::kDefault).build()); + }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) + .thenError(folly::tag_t{}, [](const std::exception &e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -146,6 +179,12 @@ folly::Future AlterTagExecutor::execute() { return resp.status(); } return Status::OK(); + }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) + .thenError(folly::tag_t{}, [](const std::exception &e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } } // namespace graph diff --git a/src/graph/executor/maintain/TagIndexExecutor.cpp b/src/graph/executor/maintain/TagIndexExecutor.cpp index b9687a895cb..a25779a0c4c 100644 --- a/src/graph/executor/maintain/TagIndexExecutor.cpp +++ b/src/graph/executor/maintain/TagIndexExecutor.cpp @@ -33,6 +33,12 @@ folly::Future CreateTagIndexExecutor::execute() { return resp.status(); } return Status::OK(); + }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) + .thenError(folly::tag_t{}, [](const std::exception &e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -52,6 +58,12 @@ folly::Future DropTagIndexExecutor::execute() { return resp.status(); } return Status::OK(); + }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) + .thenError(folly::tag_t{}, [](const std::exception &e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -79,6 +91,12 @@ folly::Future DescTagIndexExecutor::execute() { } return finish( ResultBuilder().value(std::move(ret).value()).iter(Iterator::Kind::kDefault).build()); + }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) + .thenError(folly::tag_t{}, [](const std::exception &e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -105,6 +123,12 @@ folly::Future ShowCreateTagIndexExecutor::execute() { } return finish( ResultBuilder().value(std::move(ret).value()).iter(Iterator::Kind::kDefault).build()); + }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) + .thenError(folly::tag_t{}, [](const std::exception &e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -113,8 +137,11 @@ folly::Future ShowTagIndexesExecutor::execute() { auto *iNode = asNode(node()); const auto &bySchema = iNode->name(); auto spaceId = qctx()->rctx()->session()->space().id; - return qctx()->getMetaClient()->listTagIndexes(spaceId).via(runner()).thenValue( - [this, spaceId, bySchema](StatusOr> resp) { + return qctx() + ->getMetaClient() + ->listTagIndexes(spaceId) + .via(runner()) + .thenValue([this, spaceId, bySchema](StatusOr> resp) { if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", Show tag indexes failed" << resp.status(); return resp.status(); @@ -159,6 +186,12 @@ folly::Future ShowTagIndexesExecutor::execute() { .value(Value(std::move(dataSet))) .iter(Iterator::Kind::kDefault) .build()); + }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) + .thenError(folly::tag_t{}, [](const std::exception &e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } @@ -166,8 +199,11 @@ folly::Future ShowTagIndexStatusExecutor::execute() { SCOPED_TIMER(&execTime_); auto spaceId = qctx()->rctx()->session()->space().id; - return qctx()->getMetaClient()->listTagIndexStatus(spaceId).via(runner()).thenValue( - [this, spaceId](StatusOr> resp) { + return qctx() + ->getMetaClient() + ->listTagIndexStatus(spaceId) + .via(runner()) + .thenValue([this, spaceId](StatusOr> resp) { if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", Show tag index status failed: " << resp.status(); @@ -188,6 +224,12 @@ folly::Future ShowTagIndexStatusExecutor::execute() { .value(Value(std::move(dataSet))) .iter(Iterator::Kind::kDefault) .build()); + }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) + .thenError(folly::tag_t{}, [](const std::exception &e) { + return folly::makeFuture(std::runtime_error(e.what())); }); } diff --git a/src/graph/executor/mutate/DeleteExecutor.cpp b/src/graph/executor/mutate/DeleteExecutor.cpp index 2d1ad3b6254..3372ddf0351 100644 --- a/src/graph/executor/mutate/DeleteExecutor.cpp +++ b/src/graph/executor/mutate/DeleteExecutor.cpp @@ -73,11 +73,9 @@ folly::Future DeleteVerticesExecutor::deleteVertices() { NG_RETURN_IF_ERROR(handleCompleteness(resp, false)); return Status::OK(); }) - .thenError(folly::tag_t{}, - [](const std::bad_alloc&) { - return folly::makeFuture(std::runtime_error( - "Memory Limit Exceeded, " + memory::MemoryStats::instance().toString())); - }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc&) { return folly::makeFuture(memoryExceededStatus()); }) .thenError(folly::tag_t{}, [](const std::exception& e) { return folly::makeFuture(std::runtime_error(e.what())); }); @@ -136,11 +134,9 @@ folly::Future DeleteTagsExecutor::deleteTags() { NG_RETURN_IF_ERROR(handleCompleteness(resp, false)); return Status::OK(); }) - .thenError(folly::tag_t{}, - [](const std::bad_alloc&) { - return folly::makeFuture(std::runtime_error( - "Memory Limit Exceeded, " + memory::MemoryStats::instance().toString())); - }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc&) { return folly::makeFuture(memoryExceededStatus()); }) .thenError(folly::tag_t{}, [](const std::exception& e) { return folly::makeFuture(std::runtime_error(e.what())); }); @@ -230,11 +226,9 @@ folly::Future DeleteEdgesExecutor::deleteEdges() { NG_RETURN_IF_ERROR(handleCompleteness(resp, false)); return Status::OK(); }) - .thenError(folly::tag_t{}, - [](const std::bad_alloc&) { - return folly::makeFuture(std::runtime_error( - "Memory Limit Exceeded, " + memory::MemoryStats::instance().toString())); - }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc&) { return folly::makeFuture(memoryExceededStatus()); }) .thenError(folly::tag_t{}, [](const std::exception& e) { return folly::makeFuture(std::runtime_error(e.what())); }); diff --git a/src/graph/executor/mutate/InsertExecutor.cpp b/src/graph/executor/mutate/InsertExecutor.cpp index bfcf919b7c1..a87a4a575ad 100644 --- a/src/graph/executor/mutate/InsertExecutor.cpp +++ b/src/graph/executor/mutate/InsertExecutor.cpp @@ -41,11 +41,9 @@ folly::Future InsertVerticesExecutor::insertVertices() { NG_RETURN_IF_ERROR(handleCompleteness(resp, false)); return Status::OK(); }) - .thenError(folly::tag_t{}, - [](const std::bad_alloc &) { - return folly::makeFuture(std::runtime_error( - "Memory Limit Exceeded, " + memory::MemoryStats::instance().toString())); - }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) .thenError(folly::tag_t{}, [](const std::exception &e) { return folly::makeFuture(std::runtime_error(e.what())); }); @@ -79,11 +77,9 @@ folly::Future InsertEdgesExecutor::insertEdges() { NG_RETURN_IF_ERROR(handleCompleteness(resp, false)); return Status::OK(); }) - .thenError(folly::tag_t{}, - [](const std::bad_alloc &) { - return folly::makeFuture(std::runtime_error( - "Memory Limit Exceeded, " + memory::MemoryStats::instance().toString())); - }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) .thenError(folly::tag_t{}, [](const std::exception &e) { return folly::makeFuture(std::runtime_error(e.what())); }); diff --git a/src/graph/executor/mutate/UpdateExecutor.cpp b/src/graph/executor/mutate/UpdateExecutor.cpp index 94ef781d95e..0011ebfce28 100644 --- a/src/graph/executor/mutate/UpdateExecutor.cpp +++ b/src/graph/executor/mutate/UpdateExecutor.cpp @@ -83,11 +83,9 @@ folly::Future UpdateVertexExecutor::execute() { } return Status::OK(); }) - .thenError(folly::tag_t{}, - [](const std::bad_alloc &) { - return folly::makeFuture(std::runtime_error( - "Memory Limit Exceeded, " + memory::MemoryStats::instance().toString())); - }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) .thenError(folly::tag_t{}, [](const std::exception &e) { return folly::makeFuture(std::runtime_error(e.what())); }); @@ -142,11 +140,9 @@ folly::Future UpdateEdgeExecutor::execute() { } return Status::OK(); }) - .thenError(folly::tag_t{}, - [](const std::bad_alloc &) { - return folly::makeFuture(std::runtime_error( - "Memory Limit Exceeded, " + memory::MemoryStats::instance().toString())); - }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) .thenError(folly::tag_t{}, [](const std::exception &e) { return folly::makeFuture(std::runtime_error(e.what())); }); diff --git a/src/graph/executor/query/AppendVerticesExecutor.cpp b/src/graph/executor/query/AppendVerticesExecutor.cpp index 53e19cbe479..0cdfe005402 100644 --- a/src/graph/executor/query/AppendVerticesExecutor.cpp +++ b/src/graph/executor/query/AppendVerticesExecutor.cpp @@ -71,11 +71,9 @@ folly::Future AppendVerticesExecutor::appendVertices() { return handleRespMultiJobs(std::move(rpcResp)); } }) - .thenError(folly::tag_t{}, - [](const std::bad_alloc &) { - return folly::makeFuture(std::runtime_error( - "Memory Limit Exceeded, " + memory::MemoryStats::instance().toString())); - }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) .thenError(folly::tag_t{}, [](const std::exception &e) { return folly::makeFuture(std::runtime_error(e.what())); }); diff --git a/src/graph/executor/query/GetDstBySrcExecutor.cpp b/src/graph/executor/query/GetDstBySrcExecutor.cpp index ccef0639096..bdb1a927804 100644 --- a/src/graph/executor/query/GetDstBySrcExecutor.cpp +++ b/src/graph/executor/query/GetDstBySrcExecutor.cpp @@ -62,12 +62,10 @@ folly::Future GetDstBySrcExecutor::execute() { } return handleResponse(resp, this->gd_->colNames()); }) - .thenError(folly::tag_t{}, - [](const std::bad_alloc&) { - return folly::makeFuture(std::runtime_error( - "Memory Limit Exceeded, " + memory::MemoryStats::instance().toString())); - }) - .thenError(folly::tag_t{}, [](const std::exception& e) { + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) + .thenError(folly::tag_t{}, [](const std::exception &e) { return folly::makeFuture(std::runtime_error(e.what())); }); } diff --git a/src/graph/executor/query/GetEdgesExecutor.cpp b/src/graph/executor/query/GetEdgesExecutor.cpp index 70d20f18f0b..8a8dcf18569 100644 --- a/src/graph/executor/query/GetEdgesExecutor.cpp +++ b/src/graph/executor/query/GetEdgesExecutor.cpp @@ -106,11 +106,9 @@ folly::Future GetEdgesExecutor::getEdges() { addStats(rpcResp, otherStats_); return handleResp(std::move(rpcResp), ge->colNames()); }) - .thenError(folly::tag_t{}, - [](const std::bad_alloc &) { - return folly::makeFuture(std::runtime_error( - "Memory Limit Exceeded, " + memory::MemoryStats::instance().toString())); - }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) .thenError(folly::tag_t{}, [](const std::exception &e) { return folly::makeFuture(std::runtime_error(e.what())); }); diff --git a/src/graph/executor/query/GetNeighborsExecutor.cpp b/src/graph/executor/query/GetNeighborsExecutor.cpp index ea0245b312e..9fdadb5174a 100644 --- a/src/graph/executor/query/GetNeighborsExecutor.cpp +++ b/src/graph/executor/query/GetNeighborsExecutor.cpp @@ -76,12 +76,10 @@ folly::Future GetNeighborsExecutor::execute() { } return handleResponse(resp); }) - .thenError(folly::tag_t{}, - [](const std::bad_alloc&) { - return folly::makeFuture(std::runtime_error( - "Memory Limit Exceeded, " + memory::MemoryStats::instance().toString())); - }) - .thenError(folly::tag_t{}, [](const std::exception& e) { + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) + .thenError(folly::tag_t{}, [](const std::exception &e) { return folly::makeFuture(std::runtime_error(e.what())); }); } diff --git a/src/graph/executor/query/GetVerticesExecutor.cpp b/src/graph/executor/query/GetVerticesExecutor.cpp index e2559c7b087..147a812ea7a 100644 --- a/src/graph/executor/query/GetVerticesExecutor.cpp +++ b/src/graph/executor/query/GetVerticesExecutor.cpp @@ -57,11 +57,9 @@ folly::Future GetVerticesExecutor::getVertices() { addStats(rpcResp, otherStats_); return handleResp(std::move(rpcResp), gv->colNames()); }) - .thenError(folly::tag_t{}, - [](const std::bad_alloc &) { - return folly::makeFuture(std::runtime_error( - "Memory Limit Exceeded, " + memory::MemoryStats::instance().toString())); - }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) .thenError(folly::tag_t{}, [](const std::exception &e) { return folly::makeFuture(std::runtime_error(e.what())); }); diff --git a/src/graph/executor/query/IndexScanExecutor.cpp b/src/graph/executor/query/IndexScanExecutor.cpp index 843bd0fb25b..1744ba1faf4 100644 --- a/src/graph/executor/query/IndexScanExecutor.cpp +++ b/src/graph/executor/query/IndexScanExecutor.cpp @@ -50,11 +50,9 @@ folly::Future IndexScanExecutor::indexScan() { addStats(rpcResp, otherStats_); return handleResp(std::move(rpcResp)); }) - .thenError(folly::tag_t{}, - [](const std::bad_alloc &) { - return folly::makeFuture(std::runtime_error( - "Memory Limit Exceeded, " + memory::MemoryStats::instance().toString())); - }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) .thenError(folly::tag_t{}, [](const std::exception &e) { return folly::makeFuture(std::runtime_error(e.what())); }); diff --git a/src/graph/executor/query/ScanEdgesExecutor.cpp b/src/graph/executor/query/ScanEdgesExecutor.cpp index 1fc8981a784..65b27813ae1 100644 --- a/src/graph/executor/query/ScanEdgesExecutor.cpp +++ b/src/graph/executor/query/ScanEdgesExecutor.cpp @@ -46,11 +46,9 @@ folly::Future ScanEdgesExecutor::scanEdges() { addStats(rpcResp, otherStats_); return handleResp(std::move(rpcResp), {}); }) - .thenError(folly::tag_t{}, - [](const std::bad_alloc &) { - return folly::makeFuture(std::runtime_error( - "Memory Limit Exceeded, " + memory::MemoryStats::instance().toString())); - }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) .thenError(folly::tag_t{}, [](const std::exception &e) { return folly::makeFuture(std::runtime_error(e.what())); }); diff --git a/src/graph/executor/query/ScanVerticesExecutor.cpp b/src/graph/executor/query/ScanVerticesExecutor.cpp index 00d8d3ed216..c97bfbfa53a 100644 --- a/src/graph/executor/query/ScanVerticesExecutor.cpp +++ b/src/graph/executor/query/ScanVerticesExecutor.cpp @@ -47,11 +47,9 @@ folly::Future ScanVerticesExecutor::scanVertices() { addStats(rpcResp, otherStats_); return handleResp(std::move(rpcResp), sv->colNames()); }) - .thenError(folly::tag_t{}, - [](const std::bad_alloc &) { - return folly::makeFuture(std::runtime_error( - "Memory Limit Exceeded, " + memory::MemoryStats::instance().toString())); - }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc &) { return folly::makeFuture(memoryExceededStatus()); }) .thenError(folly::tag_t{}, [](const std::exception &e) { return folly::makeFuture(std::runtime_error(e.what())); }); diff --git a/src/graph/executor/query/TraverseExecutor.cpp b/src/graph/executor/query/TraverseExecutor.cpp index 61ec57e7040..aa37aa45e3d 100644 --- a/src/graph/executor/query/TraverseExecutor.cpp +++ b/src/graph/executor/query/TraverseExecutor.cpp @@ -110,11 +110,9 @@ folly::Future TraverseExecutor::getNeighbors() { addStats(resp, getNbrTime.elapsedInUSec()); return handleResponse(std::move(resp)); }) - .thenError(folly::tag_t{}, - [](const std::bad_alloc&) { - return folly::makeFuture(std::runtime_error( - "Memory Limit Exceeded, " + memory::MemoryStats::instance().toString())); - }) + .thenError( + folly::tag_t{}, + [](const std::bad_alloc&) { return folly::makeFuture(memoryExceededStatus()); }) .thenError(folly::tag_t{}, [](const std::exception& e) { return folly::makeFuture(std::runtime_error(e.what())); }); diff --git a/src/graph/scheduler/AsyncMsgNotifyBasedScheduler.cpp b/src/graph/scheduler/AsyncMsgNotifyBasedScheduler.cpp index 3bb81af59f8..26253ba21ea 100644 --- a/src/graph/scheduler/AsyncMsgNotifyBasedScheduler.cpp +++ b/src/graph/scheduler/AsyncMsgNotifyBasedScheduler.cpp @@ -16,15 +16,23 @@ AsyncMsgNotifyBasedScheduler::AsyncMsgNotifyBasedScheduler(QueryContext* qctx) : } folly::Future AsyncMsgNotifyBasedScheduler::schedule() { - auto root = qctx_->plan()->root(); - if (FLAGS_enable_lifetime_optimize) { - // special for root - root->outputVarPtr()->userCount.store(std::numeric_limits::max(), - std::memory_order_relaxed); - analyzeLifetime(root); + try { + auto root = qctx_->plan()->root(); + if (FLAGS_enable_lifetime_optimize) { + // special for root + root->outputVarPtr()->userCount.store(std::numeric_limits::max(), + std::memory_order_relaxed); + analyzeLifetime(root); + } + auto executor = Executor::create(root, qctx_); + return doSchedule(executor); + } catch (std::bad_alloc& e) { + return folly::makeFuture(Executor::memoryExceededStatus()); + } catch (std::exception& e) { + return folly::makeFuture(std::runtime_error(e.what())); + } catch (...) { + return folly::makeFuture(std::runtime_error("unknown exception")); } - auto executor = Executor::create(root, qctx_); - return doSchedule(executor); } folly::Future AsyncMsgNotifyBasedScheduler::doSchedule(Executor* root) const { @@ -83,6 +91,7 @@ folly::Future AsyncMsgNotifyBasedScheduler::doSchedule(Executor* root) c scheduleExecutor(std::move(currentExeFutures), exe, runner) .thenTry([this, pros = std::move(currentExePromises)](auto&& t) mutable { + // any exception or status not ok handled with notifyError if (t.hasException()) { notifyError(pros, Status::Error(std::move(t).exception().what())); } else {