Skip to content

Commit

Permalink
storage add memory tracker
Browse files Browse the repository at this point in the history
  • Loading branch information
codesigner committed Dec 22, 2022
1 parent 751a302 commit e896092
Show file tree
Hide file tree
Showing 9 changed files with 409 additions and 216 deletions.
4 changes: 4 additions & 0 deletions src/interface/common.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,8 @@ enum ErrorCode {
E_QUERY_NOT_FOUND = -2073, // Query not found
E_AGENT_HB_FAILUE = -2074, // Failed to receive heartbeat from agent

E_GRAPH_MEMORY_EXCEEDED = -2600, // Graph memory exceeded

// 3xxx for storaged
E_CONSENSUS_ERROR = -3001, // Consensus cannot be reached during an election
E_KEY_HAS_EXISTS = -3002, // Key already exists
Expand Down Expand Up @@ -511,5 +513,7 @@ enum ErrorCode {
E_RAFT_ATOMIC_OP_FAILED = -3530, // Atomic operation failed
E_LEADER_LEASE_FAILED = -3531, // Leader lease expired

E_STORAGE_MEMORY_EXCEEDED = -3600, // Storage memory exceeded

E_UNKNOWN = -8000, // Unknown error
} (cpp.enum_strict)
26 changes: 26 additions & 0 deletions src/storage/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,31 @@ class BaseProcessor {
delete this;
}

/**
 * @brief Complete the request on a failure path (e.g. an exception escaped
 *        the processing code) and destroy the processor.
 *
 * Records stats and latency, replaces any previously recorded failed parts
 * with a single failure entry for part id 0 whose code is
 * E_STORAGE_MEMORY_EXCEEDED when memoryExceeded_ is set and E_UNKNOWN
 * otherwise, fulfils the promise, and finally deletes `this`.
 *
 * No member may be accessed after this function returns.
 */
virtual void onError() {
  if (counters_) {
    stats::StatsManager::addValue(counters_->numCalls_);
    // The response built below always contains a failed part, so this call is
    // always an error. Checking failed_parts here (before the entry is added)
    // would miss requests whose only failure is the one reported by onError().
    stats::StatsManager::addValue(counters_->numErrors_);
  }

  this->result_.latency_in_us_ref() = this->duration_.elapsedInUSec();
  if (!profileDetail_.empty()) {
    this->result_.latency_detail_us_ref() = std::move(profileDetail_);
  }

  // Report one request-level failure; the code distinguishes an allocation
  // failure from any other error.
  cpp2::PartitionResult thriftRet;
  thriftRet.code_ref() = memoryExceeded_ ? nebula::cpp2::ErrorCode::E_STORAGE_MEMORY_EXCEEDED
                                         : nebula::cpp2::ErrorCode::E_UNKNOWN;
  thriftRet.part_id_ref() = 0;
  this->result_.failed_parts_ref() = {thriftRet};

  this->resp_.result_ref() = std::move(this->result_);
  this->promise_.setValue(std::move(this->resp_));

  // The processor owns itself; it is released once the response is handed off.
  delete this;
}

nebula::cpp2::ErrorCode getSpaceVidLen(GraphSpaceID spaceId) {
auto len = this->env_->schemaMan_->getSpaceVidLen(spaceId);
if (!len.ok()) {
Expand Down Expand Up @@ -130,6 +155,7 @@ class BaseProcessor {
std::map<std::string, int32_t> profileDetail_;
std::mutex profileMut_;
bool profileDetailFlag_{false};
bool memoryExceeded_{false};
};

} // namespace storage
Expand Down
27 changes: 26 additions & 1 deletion src/storage/StorageServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include "clients/storage/InternalStorageClient.h"
#include "common/hdfs/HdfsCommandHelper.h"
#include "common/memory/MemoryUtils.h"
#include "common/meta/ServerBasedIndexManager.h"
#include "common/meta/ServerBasedSchemaManager.h"
#include "common/network/NetworkUtils.h"
Expand Down Expand Up @@ -49,6 +50,7 @@ DECLARE_string(local_ip);
DEFINE_bool(storage_kv_mode, false, "True for kv mode");
DEFINE_int32(num_io_threads, 16, "Number of IO threads");
DEFINE_int32(storage_http_thread_num, 3, "Number of storage daemon's http thread");
DEFINE_int32(check_memory_interval_in_secs, 1, "Memory check interval in seconds");

namespace nebula {
namespace storage {
Expand All @@ -64,6 +66,28 @@ StorageServer::StorageServer(HostAddr localHost,
walPath_(std::move(walPath)),
listenerPath_(std::move(listenerPath)) {}

/**
 * @brief Start the background worker that periodically refreshes
 *        MemoryUtils::kHitMemoryHighWatermark.
 *
 * The watermark check is executed once synchronously before the repeating
 * task is scheduled, so a failure to obtain memory information is returned to
 * the caller instead of only surfacing later on the worker thread.
 *
 * @return Status::OK() on success; an error Status if the worker thread cannot
 *         be started or the initial watermark check fails.
 */
Status StorageServer::setupMemoryMonitorThread() {
  // std::make_unique never yields a null pointer (allocation failure throws),
  // so only the result of start() has to be checked.
  memoryMonitorThread_ = std::make_unique<thread::GenericWorker>();
  if (!memoryMonitorThread_->start("storage-memory-monitor")) {
    return Status::Error("Fail to start storage memory monitor thread.");
  }

  auto updateMemoryWatermark = []() -> Status {
    auto status = MemoryUtils::hitsHighWatermark();
    NG_RETURN_IF_ERROR(status);
    MemoryUtils::kHitMemoryHighWatermark.store(std::move(status).value());
    return Status::OK();
  };

  // Just to test whether to get the right memory info
  NG_RETURN_IF_ERROR(updateMemoryWatermark());

  // Widen before multiplying so a large interval (in seconds) cannot overflow
  // 32-bit int arithmetic when converted to milliseconds.
  const auto intervalMs = FLAGS_check_memory_interval_in_secs * 1000L;
  memoryMonitorThread_->addRepeatTask(intervalMs, updateMemoryWatermark);

  return Status::OK();
}

std::unique_ptr<kvstore::KVStore> StorageServer::getStoreInstance() {
kvstore::KVOptions options;
options.dataPaths_ = dataPaths_;
Expand Down Expand Up @@ -280,7 +304,8 @@ bool StorageServer::start() {
}
serverStatus_ = STATUS_RUNNING;
}
return true;

return setupMemoryMonitorThread().ok();
}

void StorageServer::waitUntilStop() {
Expand Down
4 changes: 4 additions & 0 deletions src/storage/StorageServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ class StorageServer final {
enum ServiceStatus { STATUS_UNINITIALIZED = 0, STATUS_RUNNING = 1, STATUS_STOPPED = 2 };

private:
Status setupMemoryMonitorThread();

std::unique_ptr<kvstore::KVStore> getStoreInstance();

/**
Expand Down Expand Up @@ -126,6 +128,8 @@ class StorageServer final {
ServiceStatus serverStatus_{STATUS_UNINITIALIZED};
std::mutex muStop_;
std::condition_variable cvStop_;

std::unique_ptr<thread::GenericWorker> memoryMonitorThread_;
};

} // namespace storage
Expand Down
86 changes: 56 additions & 30 deletions src/storage/query/GetDstBySrcProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include <robin_hood.h>

#include "common/memory/MemoryTracker.h"
#include "common/thread/GenericThreadPool.h"
#include "storage/exec/EdgeNode.h"
#include "storage/exec/GetDstBySrcNode.h"
Expand All @@ -28,38 +29,48 @@ void GetDstBySrcProcessor::process(const cpp2::GetDstBySrcRequest& req) {
}

// Handles one GetDstBySrc request: optionally enables profiling, resolves the
// space's vid length, builds the plan context and edge contexts, then runs the
// query via runInSingleThread() or runInMultipleThread() depending on
// FLAGS_query_concurrently. On a validation failure every requested part gets
// the error code and the request is completed with onFinished().
//
// NOTE(review): this span is a rendered commit diff whose +/- markers were
// lost, so the pre-change statements and their try-wrapped replacements both
// appear below. It is not compilable as shown; the change itself only wraps
// the existing body in try/catch.
void GetDstBySrcProcessor::doProcess(const cpp2::GetDstBySrcRequest& req) {
if (req.common_ref().has_value() && req.get_common()->profile_detail_ref().value_or(false)) {
profileDetailFlag_ = true;
profileDetail("GetDstBySrcProcessorTotal", 0);
profileDetail("GetDstBySrcProcessorDedup", 0);
}
// Post-change body: everything runs inside this try so that an exception is
// turned into an error response by the catch handlers at the end.
try {
if (req.common_ref().has_value() && req.get_common()->profile_detail_ref().value_or(false)) {
profileDetailFlag_ = true;
profileDetail("GetDstBySrcProcessorTotal", 0);
profileDetail("GetDstBySrcProcessorDedup", 0);
}

spaceId_ = req.get_space_id();
auto retCode = getSpaceVidLen(spaceId_);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
for (auto& p : req.get_parts()) {
pushResultCode(retCode, p.first);
spaceId_ = req.get_space_id();
auto retCode = getSpaceVidLen(spaceId_);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
for (auto& p : req.get_parts()) {
pushResultCode(retCode, p.first);
}
onFinished();
return;
}
onFinished();
return;
}
this->planContext_ = std::make_unique<PlanContext>(
this->env_, spaceId_, this->spaceVidLen_, this->isIntId_, req.common_ref());

// check edgetypes exists
retCode = checkAndBuildContexts(req);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
for (auto& p : req.get_parts()) {
pushResultCode(retCode, p.first);
this->planContext_ = std::make_unique<PlanContext>(
this->env_, spaceId_, this->spaceVidLen_, this->isIntId_, req.common_ref());

// check edgetypes exists
retCode = checkAndBuildContexts(req);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
for (auto& p : req.get_parts()) {
pushResultCode(retCode, p.first);
}
onFinished();
return;
}
onFinished();
return;
}

if (!FLAGS_query_concurrently) {
runInSingleThread(req);
} else {
runInMultipleThread(req);
if (!FLAGS_query_concurrently) {
runInSingleThread(req);
} else {
runInMultipleThread(req);
}
} catch (std::bad_alloc& e) {
// Allocation failure: set the flag that makes onError() report
// E_STORAGE_MEMORY_EXCEEDED instead of E_UNKNOWN.
memoryExceeded_ = true;
onError();
} catch (std::exception& e) {
// Any other standard exception is logged and reported through onError().
LOG(ERROR) << e.what();
onError();
} catch (...) {
// Non-standard exceptions are reported through onError() without logging.
onError();
}
}

Expand Down Expand Up @@ -124,7 +135,10 @@ void GetDstBySrcProcessor::runInMultipleThread(const cpp2::GetDstBySrcRequest& r
// flatResult_.reserve(sum);

for (size_t j = 0; j < tries.size(); j++) {
DCHECK(!tries[j].hasException());
if (tries[j].hasException()) {
onError();
return;
}
const auto& [code, partId] = tries[j].value();
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
handleErrorCode(code, spaceId_, partId);
Expand Down Expand Up @@ -168,7 +182,19 @@ folly::Future<std::pair<nebula::cpp2::ErrorCode, PartitionID>> GetDstBySrcProces
}
}
return std::make_pair(nebula::cpp2::ErrorCode::SUCCEEDED, partId);
});
})
.thenError(folly::tag_t<std::bad_alloc>{},
[this](const std::bad_alloc&) {
memoryExceeded_ = true;
return folly::makeFuture<std::pair<nebula::cpp2::ErrorCode, PartitionID>>(
std::runtime_error("Memory Limit Exceeded, " +
memory::MemoryStats::instance().toString()));
})
.thenError(folly::tag_t<std::exception>{}, [](const std::exception& e) {
LOG(ERROR) << e.what();
return folly::makeFuture<std::pair<nebula::cpp2::ErrorCode, PartitionID>>(
std::runtime_error(e.what()));
});
}

StoragePlan<VertexID> GetDstBySrcProcessor::buildPlan(RuntimeContext* context,
Expand Down
Loading

0 comments on commit e896092

Please sign in to comment.