Skip to content

Commit

Permalink
refine MemTracker log & adjust flag & fix build with gcc (#5155)
Browse files Browse the repository at this point in the history
  • Loading branch information
codesigner committed Dec 29, 2022
1 parent c0e835b commit e287363
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 20 deletions.
22 changes: 20 additions & 2 deletions src/common/memory/MemoryTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,22 @@
namespace nebula {
namespace memory {

constexpr int64_t KiB = 1024;
constexpr int64_t MiB = 1024 * KiB;
constexpr int64_t GiB = 1024 * MiB;

static std::string ReadableSize(double size) {
if (size < KiB) {
return fmt::format("{}B", size);
} else if (size < MiB) {
return fmt::format("{:.3f}KiB", size / KiB);
} else if (size < GiB) {
return fmt::format("{:.3f}MiB", size / MiB);
} else {
return fmt::format("{:.3f}GiB", size / GiB);
}
}

constexpr size_t
#if defined(__cpp_lib_hardware_interference_size)
CACHE_LINE_SIZE = hardware_destructive_interference_size;
Expand Down Expand Up @@ -86,6 +102,8 @@ class MemoryStats {
/// Set limit (maximum usable bytes) of memory
void setLimit(int64_t limit) {
if (this->limit_ != limit) {
LOG(INFO) << fmt::format(
"MemoryTracker update limit {} -> {}", ReadableSize(this->limit_), ReadableSize(limit));
this->limit_ = limit;
}
}
Expand All @@ -106,7 +124,7 @@ class MemoryStats {
}

std::string toString() {
return fmt::format("MemoryStats, limit:{}, used:{}", limit_, used_);
return fmt::format("MemoryStats: {}/{}", ReadableSize(limit_), ReadableSize(used_));
}

private:
Expand All @@ -126,7 +144,7 @@ class MemoryStats {
// Thread Local
static thread_local ThreadMemoryStats threadMemoryStats_;
// Each thread reserves this amount of memory
static constexpr int64_t kLocalReservedLimit_ = 1 * 1024 * 1024;
static constexpr int64_t kLocalReservedLimit_ = 1 * MiB;
};

// A global static memory tracker enable tracking every memory allocation and deallocation.
Expand Down
46 changes: 36 additions & 10 deletions src/common/memory/MemoryUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,23 @@ DEFINE_string(cgroup_v2_memory_current_path,

DEFINE_bool(memory_purge_enabled, true, "memory purge enabled, default true");
DEFINE_int32(memory_purge_interval_seconds, 10, "memory purge interval in seconds, default 10");
DEFINE_bool(memory_tracker_detail_log, false, "print memory stats detail log");
DEFINE_bool(memory_tracker_detail_log, true, "print memory stats detail log");
DEFINE_int32(memory_tracker_detail_log_interval_ms,
60000,
"print memory stats detail log interval in ms");

DEFINE_double(memory_tracker_untracked_reserved_memory_mb,
50,
"memory tacker tracks memory of new/delete, this flag defined reserved untracked "
"memory (direct call malloc/free)");
DEFINE_double(memory_tracker_limit_ratio, 1, "memory tacker usable memory ratio to total limit");
DEFINE_double(memory_tracker_limit_ratio,
0.8,
"memory tacker usable memory ratio to (total_available - untracked_reserved_memory)");

using nebula::fs::FileUtils;

namespace nebula {
namespace memory {

static const std::regex reMemAvailable(
R"(^Mem(Available|Free|Total):\s+(\d+)\skB$)"); // when can't use MemAvailable, use MemFree
Expand All @@ -62,6 +69,7 @@ static const std::regex reTotalCache(R"(^total_(cache|inactive_file)\s+(\d+)$)")

std::atomic_bool MemoryUtils::kHitMemoryHighWatermark{false};
int64_t MemoryUtils::kLastPurge_{0};
int64_t MemoryUtils::kLastPrintMemoryTrackerStats_{0};

StatusOr<bool> MemoryUtils::hitsHighWatermark() {
if (FLAGS_system_memory_high_watermark_ratio >= 1.0) {
Expand Down Expand Up @@ -113,19 +121,28 @@ StatusOr<bool> MemoryUtils::hitsHighWatermark() {
// MemoryStats depends on jemalloc
#if ENABLE_JEMALLOC
// set MemoryStats limit (MemoryTracker track-able memory)
memory::MemoryStats::instance().setLimit(
(total - FLAGS_memory_tracker_untracked_reserved_memory_mb * 1024 * 1024) *
FLAGS_memory_tracker_limit_ratio);
int64_t trackable = total - FLAGS_memory_tracker_untracked_reserved_memory_mb * MiB;
if (trackable > 0) {
MemoryStats::instance().setLimit(trackable * FLAGS_memory_tracker_limit_ratio);
} else {
// Do not set limit, keep previous set limit or default limit
LOG(ERROR) << "Total available memory less than "
<< FLAGS_memory_tracker_untracked_reserved_memory_mb << " Mib";
}

// purge if enabled
if (FLAGS_memory_purge_enabled) {
int64_t now = time::WallClock::fastNowInSec();
if (now - kLastPurge_ > FLAGS_memory_purge_interval_seconds) {
// mallctl seems has issue with address_sanitizer, do purge only when address_sanitizer is off
#if defined(__clang)
#if defined(__has_feature)
#if not __has_feature(address_sanitizer)
mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", nullptr, nullptr, nullptr, 0);
#endif
#endif
#else // gcc
mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", nullptr, nullptr, nullptr, 0);
#endif
kLastPurge_ = now;
}
Expand All @@ -142,13 +159,21 @@ StatusOr<bool> MemoryUtils::hitsHighWatermark() {
// usr: record by current process's MemoryStats
// used: bytes allocated by new operator
// total: sys_total * FLAGS_system_memory_high_watermark_ratio
int64_t now = time::WallClock::fastNowInMilliSec();
if (FLAGS_memory_tracker_detail_log) {
LOG(INFO) << " sys_used: " << static_cast<int64_t>(total - available) << " sys_total: " << total
<< " sys_ratio:" << (1 - available / total)
<< " usr_used:" << memory::MemoryStats::instance().used()
<< " usr_total:" << memory::MemoryStats::instance().getLimit()
<< " usr_ratio:" << memory::MemoryStats::instance().usedRatio();
if (now - kLastPrintMemoryTrackerStats_ >= FLAGS_memory_tracker_detail_log_interval_ms) {
LOG(INFO) << fmt::format("sys:{}/{} {:.2f}%",
ReadableSize(static_cast<int64_t>(total - available)),
ReadableSize(total),
(1 - available / total) * 100)
<< fmt::format(" usr:{}/{} {:.2f}%",
ReadableSize(MemoryStats::instance().used()),
ReadableSize(MemoryStats::instance().getLimit()),
MemoryStats::instance().usedRatio() * 100);
kLastPrintMemoryTrackerStats_ = now;
}
}

#endif

auto hits = (1 - available / total) > FLAGS_system_memory_high_watermark_ratio;
Expand All @@ -168,4 +193,5 @@ StatusOr<uint64_t> MemoryUtils::readSysContents(const std::string& path) {
return value;
}

} // namespace memory
} // namespace nebula
3 changes: 3 additions & 0 deletions src/common/memory/MemoryUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "common/base/StatusOr.h"

namespace nebula {
namespace memory {

/**
* MemoryUtils will compute the memory consumption of containerization and physical machine
Expand All @@ -34,7 +35,9 @@ class MemoryUtils final {

private:
static int64_t kLastPurge_;
static int64_t kLastPrintMemoryTrackerStats_;
};

} // namespace memory
} // namespace nebula
#endif
12 changes: 10 additions & 2 deletions src/common/memory/NewDelete.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,19 @@
/// 2. address_sanitizer is off
/// sanitizer has already override the new/delete operator,
/// only override new/delete operator only when address_sanitizer is off
#if defined(__clang)
#if defined(__has_feature)
#if not __has_feature(address_sanitizer)
#define ENABLE_MEMORY_TRACKER
#endif
#endif

#else // gcc
#define ENABLE_MEMORY_TRACKER
#endif
#endif

#if defined(ENABLE_MEMORY_TRACKER)
/// new
void *operator new(std::size_t size) {
nebula::memory::trackMemory(size);
Expand Down Expand Up @@ -101,5 +111,3 @@ void operator delete[](void *ptr, std::size_t size, std::align_val_t align) noex
}

#endif
#endif
#endif
2 changes: 2 additions & 0 deletions src/common/memory/test/MemoryUtilsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ DECLARE_bool(containerized);
DECLARE_double(system_memory_high_watermark_ratio);

namespace nebula {
namespace memory {

TEST(MemoryHighWatermarkTest, TestHitsHighWatermarkInHost) {
FLAGS_containerized = false;
Expand Down Expand Up @@ -45,4 +46,5 @@ TEST(MemoryHighWatermarkTest, DISABLED_TestNotHitsHighWatermarkInContainer) {
ASSERT_FALSE(std::move(status).value());
}

} // namespace memory
} // namespace nebula
2 changes: 1 addition & 1 deletion src/graph/context/Iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ bool Iterator::hitsSysMemoryHighWatermark() const {
numRowsModN_ -= FLAGS_num_rows_to_check_memory;
}
if (UNLIKELY(numRowsModN_ == 0)) {
if (MemoryUtils::kHitMemoryHighWatermark.load()) {
if (memory::MemoryUtils::kHitMemoryHighWatermark.load()) {
throw std::runtime_error(
folly::sformat("Used memory hits the high watermark({}) of total system memory.",
FLAGS_system_memory_high_watermark_ratio));
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ Status Executor::close() {
}

Status Executor::checkMemoryWatermark() {
if (node_->isQueryNode() && MemoryUtils::kHitMemoryHighWatermark.load()) {
if (node_->isQueryNode() && memory::MemoryUtils::kHitMemoryHighWatermark.load()) {
stats::StatsManager::addValue(kNumQueriesHitMemoryWatermark);
auto &spaceName = qctx()->rctx() ? qctx()->rctx()->session()->spaceName() : "";
if (FLAGS_enable_space_level_metrics && spaceName != "") {
Expand Down
4 changes: 2 additions & 2 deletions src/graph/service/QueryEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ Status QueryEngine::setupMemoryMonitorThread() {
}

auto updateMemoryWatermark = []() -> Status {
auto status = MemoryUtils::hitsHighWatermark();
auto status = memory::MemoryUtils::hitsHighWatermark();
NG_RETURN_IF_ERROR(status);
MemoryUtils::kHitMemoryHighWatermark.store(std::move(status).value());
memory::MemoryUtils::kHitMemoryHighWatermark.store(std::move(status).value());
return Status::OK();
};

Expand Down
4 changes: 2 additions & 2 deletions src/storage/StorageServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ Status StorageServer::setupMemoryMonitorThread() {
}

auto updateMemoryWatermark = []() -> Status {
auto status = MemoryUtils::hitsHighWatermark();
auto status = memory::MemoryUtils::hitsHighWatermark();
NG_RETURN_IF_ERROR(status);
MemoryUtils::kHitMemoryHighWatermark.store(std::move(status).value());
memory::MemoryUtils::kHitMemoryHighWatermark.store(std::move(status).value());
return Status::OK();
};

Expand Down

0 comments on commit e287363

Please sign in to comment.