Skip to content

Commit

Permalink
[memtracker] improve throw bad_alloc logic (#5167)
Browse files Browse the repository at this point in the history
  • Loading branch information
codesigner committed Dec 30, 2022
1 parent 4a5c548 commit 3bce910
Show file tree
Hide file tree
Showing 55 changed files with 721 additions and 442 deletions.
12 changes: 12 additions & 0 deletions src/clients/storage/StorageClientBase-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "common/base/Logging.h"
#include "common/base/StatusOr.h"
#include "common/datatypes/HostAddr.h"
#include "common/memory/MemoryTracker.h"
#include "common/ssl/SSLConfig.h"
#include "common/stats/StatsManager.h"
#include "common/thrift/ThriftTypes.h"
Expand Down Expand Up @@ -100,6 +101,7 @@ StorageClientBase<ClientType, ClientManagerType>::collectResponse(
return folly::collectAll(respFutures)
.deferValue([this, requests = std::move(requests), totalLatencies, hosts](
std::vector<folly::Try<StatusOr<Response>>>&& resps) {
memory::MemoryCheckGuard guard;
StorageRpcResponse<Response> rpcResp(resps.size());
for (size_t i = 0; i < resps.size(); i++) {
auto& host = hosts->at(i);
Expand Down Expand Up @@ -158,11 +160,13 @@ folly::Future<StatusOr<Response>> StorageClientBase<ClientType, ClientManagerTyp
auto spaceId = request.get_space_id();
return folly::via(evb)
.thenValue([remoteFunc = std::move(remoteFunc), request, evb, host, this](auto&&) {
memory::MemoryCheckGuard guard;
// NOTE: Create new channel on each thread to avoid TIMEOUT RPC error
auto client = clientsMan_->client(host, evb, false, FLAGS_storage_client_timeout_ms);
return remoteFunc(client.get(), request);
})
.thenValue([spaceId, this](Response&& resp) mutable -> StatusOr<Response> {
memory::MemoryCheckGuard guard;
auto& result = resp.get_result();
for (auto& part : result.get_failed_parts()) {
auto partId = part.get_part_id();
Expand Down Expand Up @@ -192,6 +196,14 @@ folly::Future<StatusOr<Response>> StorageClientBase<ClientType, ClientManagerTyp
}
return std::move(resp);
})
.thenError(folly::tag_t<std::bad_alloc>{},
[](const std::bad_alloc&) {
return folly::makeFuture<StatusOr<Response>>(std::bad_alloc());
})
.thenError(folly::tag_t<std::exception>{},
[](const std::exception& e) {
return folly::makeFuture<StatusOr<Response>>(std::runtime_error(e.what()));
})
.thenError([request, host, spaceId, this](
folly::exception_wrapper&& exWrapper) mutable -> StatusOr<Response> {
stats::StatsManager::addValue(kNumRpcSentToStoragedFailed);
Expand Down
24 changes: 23 additions & 1 deletion src/common/memory/MemoryTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ struct ThreadMemoryStats {

// reserved bytes size in current thread
int64_t reserved;
bool throwOnMemoryExceeded{false};
};

/**
Expand Down Expand Up @@ -127,10 +128,20 @@ class MemoryStats {
return fmt::format("MemoryStats: {}/{}", ReadableSize(limit_), ReadableSize(used_));
}

// Enable the memory-limit check for the calling thread: sets this thread's
// throwOnMemoryExceeded flag to true.
static void turnOnThrow() {
  auto& currentThreadStats = threadMemoryStats_;
  currentThreadStats.throwOnMemoryExceeded = true;
}

// Disable the memory-limit check for the calling thread: resets this thread's
// throwOnMemoryExceeded flag to false.
static void turnOffThrow() {
  auto& currentThreadStats = threadMemoryStats_;
  currentThreadStats.throwOnMemoryExceeded = false;
}

private:
inline ALWAYS_INLINE void allocGlobal(int64_t size) {
int64_t willBe = size + used_.fetch_add(size, std::memory_order_relaxed);
if (willBe > limit_) {
if (threadMemoryStats_.throwOnMemoryExceeded && willBe > limit_) {
// revert
used_.fetch_sub(size, std::memory_order_relaxed);
throw std::bad_alloc();
Expand All @@ -147,6 +158,17 @@ class MemoryStats {
static constexpr int64_t kLocalReservedLimit_ = 1 * MiB;
};

// A guard that enables the memory check (throw when the memory limit is exceeded) on the
// current thread for the duration of its lifetime.
//
// NOTE: the destructor unconditionally turns the check off, so guards do not nest: once an
// inner guard is destroyed, the check stays disabled for the remainder of any enclosing
// guard's scope on the same thread.
struct MemoryCheckGuard {
  // Turns the check on for the current thread.
  MemoryCheckGuard() {
    MemoryStats::turnOnThrow();
  }

  // Non-copyable (and therefore non-movable): destroying a copy would turn the check off
  // while the original guard is still in scope.
  MemoryCheckGuard(const MemoryCheckGuard&) = delete;
  MemoryCheckGuard& operator=(const MemoryCheckGuard&) = delete;

  // Turns the check off for the current thread.
  ~MemoryCheckGuard() {
    MemoryStats::turnOffThrow();
  }
};

// A global static memory tracker enable tracking every memory allocation and deallocation.
// This is not the place where real memory allocation or deallocation happens, only do the
// memory tracking.
Expand Down
1 change: 1 addition & 0 deletions src/graph/executor/Executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ auto Executor::runMultiJobs(ScatterFunc &&scatter, GatherFunc &&gather, Iterator
futures.emplace_back(folly::via(
runner(),
[begin, end, tmpIter = iter->copy(), f = std::move(scatter)]() mutable -> ScatterResult {
memory::MemoryCheckGuard guard;
// Since not all iterators are linear, so iterates to the begin pos
size_t tmp = 0;
for (; tmpIter->valid() && tmp < begin; ++tmp) {
Expand Down
14 changes: 12 additions & 2 deletions src/graph/executor/algo/BFSShortestPathExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,26 @@ folly::Future<Status> BFSShortestPathExecutor::execute() {
}

std::vector<folly::Future<Status>> futures;
auto leftFuture = folly::via(runner(), [this]() { return buildPath(false); });
auto rightFuture = folly::via(runner(), [this]() { return buildPath(true); });
auto leftFuture = folly::via(runner(), [this]() {
memory::MemoryCheckGuard guard;
return buildPath(false);
});
auto rightFuture = folly::via(runner(), [this]() {
memory::MemoryCheckGuard guard;
return buildPath(true);
});
futures.emplace_back(std::move(leftFuture));
futures.emplace_back(std::move(rightFuture));

return folly::collect(futures)
.via(runner())
.thenValue([this](auto&& status) {
memory::MemoryCheckGuard guard;
UNUSED(status);
return conjunctPath();
})
.thenValue([this](auto&& status) {
memory::MemoryCheckGuard guard;
UNUSED(status);
step_++;
DataSet ds;
Expand Down Expand Up @@ -147,6 +155,7 @@ folly::Future<Status> BFSShortestPathExecutor::conjunctPath() {
batchVids.push_back(vid);
if (++i == totalSize || batchVids.size() == batchSize) {
auto future = folly::via(runner(), [this, vids = std::move(batchVids), oddStep]() {
memory::MemoryCheckGuard guard;
return doConjunct(vids, oddStep);
});
futures.emplace_back(std::move(future));
Expand All @@ -156,6 +165,7 @@ folly::Future<Status> BFSShortestPathExecutor::conjunctPath() {
return folly::collect(futures)
.via(runner())
.thenValue([this](auto&& resps) {
memory::MemoryCheckGuard guard;
for (auto& resp : resps) {
currentDs_.append(std::move(resp));
}
Expand Down
5 changes: 5 additions & 0 deletions src/graph/executor/algo/BatchShortestPath.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ folly::Future<Status> BatchShortestPath::execute(const HashSet& startVids,
return folly::collect(futures)
.via(qctx_->rctx()->runner())
.thenValue([this, result](auto&& resps) {
memory::MemoryCheckGuard guard;
for (auto& resp : resps) {
NG_RETURN_IF_ERROR(resp);
}
Expand Down Expand Up @@ -107,6 +108,7 @@ folly::Future<Status> BatchShortestPath::shortestPath(size_t rowNum, size_t step
return folly::collect(futures)
.via(qctx_->rctx()->runner())
.thenValue([this, rowNum, stepNum](auto&& resps) {
memory::MemoryCheckGuard guard;
for (auto& resp : resps) {
if (!resp.ok()) {
return folly::makeFuture<Status>(std::move(resp));
Expand Down Expand Up @@ -151,6 +153,7 @@ folly::Future<Status> BatchShortestPath::getNeighbors(size_t rowNum, size_t step
nullptr)
.via(qctx_->rctx()->runner())
.thenValue([this, rowNum, reverse, stepNum, getNbrTime](auto&& resp) {
memory::MemoryCheckGuard guard;
addStats(resp, stepNum, getNbrTime.elapsedInUSec(), reverse);
return buildPath(rowNum, std::move(resp), reverse);
})
Expand Down Expand Up @@ -280,6 +283,7 @@ folly::Future<Status> BatchShortestPath::handleResponse(size_t rowNum, size_t st
return folly::makeFuture(Status::OK())
.via(qctx_->rctx()->runner())
.thenValue([this, rowNum](auto&& status) {
memory::MemoryCheckGuard guard;
// odd step
UNUSED(status);
return conjunctPath(rowNum, true);
Expand Down Expand Up @@ -377,6 +381,7 @@ folly::Future<bool> BatchShortestPath::conjunctPath(size_t rowNum, bool oddStep)
auto future = getMeetVids(rowNum, oddStep, meetVids);
return future.via(qctx_->rctx()->runner())
.thenValue([this, rowNum, oddStep](auto&& vertices) {
memory::MemoryCheckGuard guard;
if (vertices.empty()) {
return false;
}
Expand Down
31 changes: 23 additions & 8 deletions src/graph/executor/algo/MultiShortestPathExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,27 @@ folly::Future<Status> MultiShortestPathExecutor::execute() {
}

std::vector<folly::Future<Status>> futures;
auto leftFuture = folly::via(runner(), [this]() { return buildPath(false); });
auto rightFuture = folly::via(runner(), [this]() { return buildPath(true); });
auto leftFuture = folly::via(runner(), [this]() {
memory::MemoryCheckGuard guard;
return buildPath(false);
});
auto rightFuture = folly::via(runner(), [this]() {
memory::MemoryCheckGuard guard;
return buildPath(true);
});
futures.emplace_back(std::move(leftFuture));
futures.emplace_back(std::move(rightFuture));

return folly::collect(futures)
.via(runner())
.thenValue([this](auto&& status) {
memory::MemoryCheckGuard guard;
// oddStep
UNUSED(status);
return conjunctPath(true);
})
.thenValue([this](auto&& termination) {
memory::MemoryCheckGuard guard;
// termination is true, all paths has found
if (termination || step_ * 2 > pathNode_->steps()) {
return folly::makeFuture<bool>(true);
Expand Down Expand Up @@ -266,8 +274,10 @@ folly::Future<bool> MultiShortestPathExecutor::conjunctPath(bool oddStep) {
}
pathIters.emplace_back(leftIter, rightIter);
if (++i == batchSize) {
auto future = folly::via(
runner(), [this, iters = std::move(pathIters)]() { return doConjunct(iters); });
auto future = folly::via(runner(), [this, iters = std::move(pathIters)]() {
memory::MemoryCheckGuard guard;
return doConjunct(iters);
});
futures.emplace_back(std::move(future));
pathIters.reserve(batchSize);
i = 0;
Expand All @@ -283,23 +293,28 @@ folly::Future<bool> MultiShortestPathExecutor::conjunctPath(bool oddStep) {
}
pathIters.emplace_back(leftIter, rightIter);
if (++i == batchSize) {
auto future = folly::via(
runner(), [this, iters = std::move(pathIters)]() { return doConjunct(iters); });
auto future = folly::via(runner(), [this, iters = std::move(pathIters)]() {
memory::MemoryCheckGuard guard;
return doConjunct(iters);
});
futures.emplace_back(std::move(future));
pathIters.reserve(batchSize);
i = 0;
}
}
}
if (i != 0) {
auto future =
folly::via(runner(), [this, iters = std::move(pathIters)]() { return doConjunct(iters); });
auto future = folly::via(runner(), [this, iters = std::move(pathIters)]() {
memory::MemoryCheckGuard guard;
return doConjunct(iters);
});
futures.emplace_back(std::move(future));
}

return folly::collect(futures)
.via(runner())
.thenValue([this](auto&& resps) {
memory::MemoryCheckGuard guard;
for (auto& resp : resps) {
currentDs_.append(std::move(resp));
}
Expand Down
32 changes: 24 additions & 8 deletions src/graph/executor/algo/ProduceAllPathsExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,26 @@ folly::Future<Status> ProduceAllPathsExecutor::execute() {
}
}
std::vector<folly::Future<Status>> futures;
auto leftFuture = folly::via(runner(), [this]() { return buildPath(false); });
auto rightFuture = folly::via(runner(), [this]() { return buildPath(true); });
auto leftFuture = folly::via(runner(), [this]() {
memory::MemoryCheckGuard guard;
return buildPath(false);
});
auto rightFuture = folly::via(runner(), [this]() {
memory::MemoryCheckGuard guard;
return buildPath(true);
});
futures.emplace_back(std::move(leftFuture));
futures.emplace_back(std::move(rightFuture));

return folly::collect(futures)
.via(runner())
.thenValue([this](auto&& status) {
memory::MemoryCheckGuard guard;
UNUSED(status);
return conjunctPath();
})
.thenValue([this](auto&& status) {
memory::MemoryCheckGuard guard;
UNUSED(status);
step_++;
DataSet ds;
Expand Down Expand Up @@ -147,11 +155,14 @@ folly::Future<Status> ProduceAllPathsExecutor::conjunctPath() {
if (++i == batchSize) {
auto endIter = leftIter;
endIter++;
auto oddStepFuture = folly::via(
runner(), [this, startIter, endIter]() { return doConjunct(startIter, endIter, true); });
auto oddStepFuture = folly::via(runner(), [this, startIter, endIter]() {
memory::MemoryCheckGuard guard;
return doConjunct(startIter, endIter, true);
});
futures.emplace_back(std::move(oddStepFuture));
if (step_ * 2 <= pathNode_->steps()) {
auto evenStepFuture = folly::via(runner(), [this, startIter, endIter]() {
memory::MemoryCheckGuard guard;
return doConjunct(startIter, endIter, false);
});
futures.emplace_back(std::move(evenStepFuture));
Expand All @@ -163,19 +174,24 @@ folly::Future<Status> ProduceAllPathsExecutor::conjunctPath() {
}
if (i != 0) {
auto endIter = leftPaths_.end();
auto oddStepFuture = folly::via(
runner(), [this, startIter, endIter]() { return doConjunct(startIter, endIter, true); });
auto oddStepFuture = folly::via(runner(), [this, startIter, endIter]() {
memory::MemoryCheckGuard guard;
return doConjunct(startIter, endIter, true);
});
futures.emplace_back(std::move(oddStepFuture));
if (step_ * 2 <= pathNode_->steps()) {
auto evenStepFuture = folly::via(
runner(), [this, startIter, endIter]() { return doConjunct(startIter, endIter, false); });
auto evenStepFuture = folly::via(runner(), [this, startIter, endIter]() {
memory::MemoryCheckGuard guard;
return doConjunct(startIter, endIter, false);
});
futures.emplace_back(std::move(evenStepFuture));
}
}

return folly::collect(futures)
.via(runner())
.thenValue([this](auto&& resps) {
memory::MemoryCheckGuard guard;
for (auto& resp : resps) {
currentDs_.append(std::move(resp));
}
Expand Down
1 change: 1 addition & 0 deletions src/graph/executor/algo/ShortestPathBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ folly::Future<std::vector<Value>> ShortestPathBase::getMeetVidsProps(
nullptr)
.via(qctx_->rctx()->runner())
.thenValue([this, getPropsTime](PropRpcResponse&& resp) {
memory::MemoryCheckGuard guard;
addStats(resp, getPropsTime.elapsedInUSec());
return handlePropResp(std::move(resp));
})
Expand Down
5 changes: 5 additions & 0 deletions src/graph/executor/algo/SingleShortestPath.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ folly::Future<Status> SingleShortestPath::execute(const HashSet& startVids,
return folly::collect(futures)
.via(qctx_->rctx()->runner())
.thenValue([this, result](auto&& resps) {
memory::MemoryCheckGuard guard;
for (auto& resp : resps) {
NG_RETURN_IF_ERROR(resp);
}
Expand Down Expand Up @@ -76,6 +77,7 @@ folly::Future<Status> SingleShortestPath::shortestPath(size_t rowNum, size_t ste
return folly::collect(futures)
.via(qctx_->rctx()->runner())
.thenValue([this, rowNum, stepNum](auto&& resps) {
memory::MemoryCheckGuard guard;
for (auto& resp : resps) {
if (!resp.ok()) {
return folly::makeFuture<Status>(std::move(resp));
Expand Down Expand Up @@ -122,6 +124,7 @@ folly::Future<Status> SingleShortestPath::getNeighbors(size_t rowNum,
nullptr)
.via(qctx_->rctx()->runner())
.thenValue([this, rowNum, stepNum, getNbrTime, reverse](auto&& resp) {
memory::MemoryCheckGuard guard;
addStats(resp, stepNum, getNbrTime.elapsedInUSec(), reverse);
return buildPath(rowNum, std::move(resp), reverse);
})
Expand Down Expand Up @@ -197,6 +200,7 @@ folly::Future<Status> SingleShortestPath::handleResponse(size_t rowNum, size_t s
return folly::makeFuture<Status>(Status::OK())
.via(qctx_->rctx()->runner())
.thenValue([this, rowNum, stepNum](auto&& status) {
memory::MemoryCheckGuard guard;
UNUSED(status);
return conjunctPath(rowNum, stepNum);
})
Expand Down Expand Up @@ -279,6 +283,7 @@ folly::Future<bool> SingleShortestPath::buildEvenPath(size_t rowNum,
auto future = getMeetVidsProps(meetVids);
return future.via(qctx_->rctx()->runner())
.thenValue([this, rowNum](auto&& vertices) {
memory::MemoryCheckGuard guard;
if (vertices.empty()) {
return false;
}
Expand Down
1 change: 1 addition & 0 deletions src/graph/executor/algo/SubgraphExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ folly::Future<Status> SubgraphExecutor::getNeighbors() {
currentStep_ == 1 ? nullptr : subgraph_->tagFilter())
.via(runner())
.thenValue([this, getNbrTime](RpcResponse&& resp) mutable {
memory::MemoryCheckGuard guard;
otherStats_.emplace("total_rpc_time", folly::sformat("{}(us)", getNbrTime.elapsedInUSec()));
auto& hostLatency = resp.hostLatency();
for (size_t i = 0; i < hostLatency.size(); ++i) {
Expand Down
Loading

0 comments on commit 3bce910

Please sign in to comment.