Memory Tracker (#5082)
* memory tracker

* refine code

* refine code

* add check

* fix sanitize

* add check

* fix build

* fix build

* fix sanitize issues

* minor, delete debug log

* storage: add memory tracker

* add more checks & add a memory stats log flag

* fix lint & memory monitor name in storaged

* refine catch in scheduler

* move try/catch up the stack in Storage's processors

* remove unused include

* fix build

* add jemalloc compile flag; MemTracker is used only when jemalloc is available, since it depends on jemalloc to get an accurate free size

* add memory tracking in LookupProcessor.cpp

* refine code

* fix lint

* fix build

Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>
codesigner and Sophie-Xie committed Dec 28, 2022
1 parent 682be67 commit d9787bd
Showing 47 changed files with 1,547 additions and 308 deletions.
1 change: 1 addition & 0 deletions cmake/nebula/ThirdPartyConfig.cmake
@@ -101,6 +101,7 @@ find_package(Glog REQUIRED)
find_package(Googletest REQUIRED)
if(ENABLE_JEMALLOC)
find_package(Jemalloc REQUIRED)
add_definitions(-DENABLE_JEMALLOC)
endif()
find_package(Libevent REQUIRED)
find_package(Proxygen REQUIRED)
7 changes: 4 additions & 3 deletions src/common/memory/CMakeLists.txt
@@ -2,10 +2,11 @@
#
# This source code is licensed under Apache 2.0 License.


nebula_add_library(
-  memory_obj OBJECT
-  MemoryUtils.cpp
+  memory_obj OBJECT
+  MemoryUtils.cpp
+  MemoryTracker.cpp
+  NewDelete.cpp
)

nebula_add_subdirectory(test)
172 changes: 172 additions & 0 deletions src/common/memory/Memory.h
@@ -0,0 +1,172 @@
/* Copyright (c) 2022 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
#pragma once

#if ENABLE_JEMALLOC
#include <jemalloc/jemalloc.h>
#endif

#include <new>

#include "common/base/Base.h"
#include "common/memory/MemoryTracker.h"

namespace nebula {
namespace memory {

inline ALWAYS_INLINE size_t alignToSizeT(std::align_val_t align) noexcept {
return static_cast<size_t>(align);
}

inline ALWAYS_INLINE void* newImpl(std::size_t size) {
void* ptr = malloc(size);

if (LIKELY(ptr != nullptr)) return ptr;

throw std::bad_alloc{};
}

inline ALWAYS_INLINE void* newImpl(std::size_t size, std::align_val_t align) {
void* ptr = aligned_alloc(alignToSizeT(align), size);

if (LIKELY(ptr != nullptr)) return ptr;

throw std::bad_alloc{};
}

inline ALWAYS_INLINE void* newNoException(std::size_t size) noexcept {
return malloc(size);
}

inline ALWAYS_INLINE void* newNoException(std::size_t size, std::align_val_t align) noexcept {
return aligned_alloc(static_cast<size_t>(align), size);
}

inline ALWAYS_INLINE void deleteImpl(void* ptr) noexcept {
free(ptr);
}

#if ENABLE_JEMALLOC
inline ALWAYS_INLINE void deleteSized(void* ptr, std::size_t size) noexcept {
if (UNLIKELY(ptr == nullptr)) return;
sdallocx(ptr, size, 0);
}
#else
inline ALWAYS_INLINE void deleteSized(void* ptr, std::size_t size) noexcept {
UNUSED(size);
free(ptr);
}
#endif

#if ENABLE_JEMALLOC
inline ALWAYS_INLINE void deleteSized(void* ptr,
std::size_t size,
std::align_val_t align) noexcept {
if (UNLIKELY(ptr == nullptr)) return;
sdallocx(ptr, size, MALLOCX_ALIGN(alignToSizeT(align)));
}
#else
inline ALWAYS_INLINE void deleteSized(void* ptr,
std::size_t size,
std::align_val_t align) noexcept {
UNUSED(size);
UNUSED(align);
free(ptr);
}
#endif

inline ALWAYS_INLINE size_t getActualAllocationSize(size_t size) {
size_t actual_size = size;

#if ENABLE_JEMALLOC
// The nallocx() function allocates no memory,
// but it performs the same size computation as the mallocx() function
if (LIKELY(size != 0)) {
actual_size = nallocx(size, 0);
}
#endif
return actual_size;
}

inline ALWAYS_INLINE size_t getActualAllocationSize(size_t size, std::align_val_t align) {
size_t actual_size = size;

#if ENABLE_JEMALLOC
// The nallocx() function allocates no memory,
// but it performs the same size computation as the mallocx() function
if (LIKELY(size != 0)) {
actual_size = nallocx(size, MALLOCX_ALIGN(alignToSizeT(align)));
}
#else
UNUSED(align);
#endif
return actual_size;
}

inline ALWAYS_INLINE void trackMemory(std::size_t size) {
std::size_t actual_size = getActualAllocationSize(size);
MemoryTracker::allocNoThrow(actual_size);
}

inline ALWAYS_INLINE void trackMemory(std::size_t size, std::align_val_t align) {
std::size_t actual_size = getActualAllocationSize(size, align);
MemoryTracker::allocNoThrow(actual_size);
}

inline ALWAYS_INLINE void untrackMemory(void* ptr) noexcept {
try {
#if ENABLE_JEMALLOC
if (LIKELY(ptr != nullptr)) {
MemoryTracker::free(sallocx(ptr, 0));
}
#else
// malloc_usable_size() may return a value greater than or equal to the allocated size.
MemoryTracker::free(malloc_usable_size(ptr));
#endif
} catch (...) {
}
}

inline ALWAYS_INLINE void untrackMemory(void* ptr, std::size_t size) noexcept {
try {
#if ENABLE_JEMALLOC
UNUSED(size);
if (LIKELY(ptr != nullptr)) {
MemoryTracker::free(sallocx(ptr, 0));
}
#else
if (size) {
MemoryTracker::free(size);
} else {
// malloc_usable_size() may return a value greater than or equal to the allocated size.
MemoryTracker::free(malloc_usable_size(ptr));
}
#endif
} catch (...) {
}
}

inline ALWAYS_INLINE void untrackMemory(void* ptr,
std::size_t size,
std::align_val_t align) noexcept {
try {
#if ENABLE_JEMALLOC
UNUSED(size);
if (LIKELY(ptr != nullptr)) {
MemoryTracker::free(sallocx(ptr, MALLOCX_ALIGN(alignToSizeT(align))));
}
#else
UNUSED(align);
if (size) {
MemoryTracker::free(size);
} else {
// malloc_usable_size() may return a value greater than or equal to the allocated size.
MemoryTracker::free(malloc_usable_size(ptr));
}
#endif
} catch (...) {
}
}

} // namespace memory
} // namespace nebula
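
The build file above also adds NewDelete.cpp, whose diff is not shown in this excerpt. As a rough, hypothetical sketch (not the file from this commit), helpers like the ones in Memory.h are typically wired into the global operator new/delete overloads along these lines:

// Hypothetical sketch, NOT the NewDelete.cpp from this commit: routing the global
// allocation operators through the tracking helpers declared in Memory.h.
#include <cstddef>
#include <new>

#include "common/memory/Memory.h"

void* operator new(std::size_t size) {
  nebula::memory::trackMemory(size);     // charge the tracker before allocating
  return nebula::memory::newImpl(size);  // throws std::bad_alloc if malloc fails
}

void operator delete(void* ptr) noexcept {
  nebula::memory::untrackMemory(ptr);  // uncharge using the allocator-reported size
  nebula::memory::deleteImpl(ptr);
}

void operator delete(void* ptr, std::size_t size) noexcept {
  nebula::memory::untrackMemory(ptr, size);
  nebula::memory::deleteSized(ptr, size);
}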
46 changes: 46 additions & 0 deletions src/common/memory/MemoryTracker.cpp
@@ -0,0 +1,46 @@
/* Copyright (c) 2022 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
#include "common/memory/MemoryTracker.h"

namespace nebula {
namespace memory {

thread_local ThreadMemoryStats MemoryStats::threadMemoryStats_;

ThreadMemoryStats::ThreadMemoryStats() : reserved(0) {}

ThreadMemoryStats::~ThreadMemoryStats() {
// Return to global any reserved bytes on destruction
if (reserved != 0) {
MemoryStats::instance().freeGlobal(reserved);
DLOG(INFO) << std::this_thread::get_id() << " return reserved " << reserved;
}
}

void MemoryTracker::alloc(int64_t size) {
bool throw_if_memory_exceeded = true;
allocImpl(size, throw_if_memory_exceeded);
}

void MemoryTracker::allocNoThrow(int64_t size) {
bool throw_if_memory_exceeded = false;
allocImpl(size, throw_if_memory_exceeded);
}

void MemoryTracker::realloc(int64_t old_size, int64_t new_size) {
int64_t addition = new_size - old_size;
addition > 0 ? alloc(addition) : free(-addition);
}

void MemoryTracker::free(int64_t size) {
MemoryStats::instance().free(size);
}

void MemoryTracker::allocImpl(int64_t size, bool) {
MemoryStats::instance().alloc(size);
}

} // namespace memory
} // namespace nebula
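
The commit message mentions moving try/catch up the stack in Storage's processors and adding tracking in LookupProcessor.cpp; those files are outside this excerpt. A minimal, hypothetical sketch of that calling pattern (the function name is illustrative, not from this commit) could look like this:

// Illustrative sketch: guarding an allocation-heavy path, assuming global operator
// new is routed through MemoryTracker as in Memory.h.
#include <cstdint>
#include <iostream>
#include <vector>

#include "common/memory/MemoryTracker.h"

void buildResultSet() {
  std::vector<int64_t> rows;
  try {
    rows.resize(1UL << 20);  // each allocation is charged against MemoryStats
  } catch (const std::bad_alloc&) {
    // Once the configured limit would be exceeded, allocation throws; the caller can
    // turn this into a graceful "memory exceeded" error instead of crashing the process.
    std::cerr << "memory quota exceeded: "
              << nebula::memory::MemoryStats::instance().toString() << std::endl;
  }
}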
143 changes: 143 additions & 0 deletions src/common/memory/MemoryTracker.h
@@ -0,0 +1,143 @@
/* Copyright (c) 2022 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
#pragma once

#include <atomic>

#include "common/base/Base.h"

namespace nebula {
namespace memory {

// Memory stats for each thread.
struct ThreadMemoryStats {
ThreadMemoryStats();
~ThreadMemoryStats();

// number of bytes reserved by the current thread
int64_t reserved;
};

/**
* MemoryStats records memory usage at both thread-local scope and global scope.
* Design:
* It is a singleton, designed for counting the memory quota at thread scope and at
* global scope.
*
* Thread: each thread holds a reserved memory quota obtained from the global counter. On every
* alloc() or free(), the requested size is first served from the local reservation.
* Global: counts the globally used memory; the actual usage may be less than the counted
* usage, since threads hold quota reservations.
*/
class MemoryStats {
public:
static MemoryStats& instance() {
static MemoryStats stats;
return stats;
}

/// Record a memory allocation of the given size
inline ALWAYS_INLINE void alloc(int64_t size) {
int64_t willBe = threadMemoryStats_.reserved - size;

if (UNLIKELY(willBe < 0)) {
// if the local reservation is not enough, calculate how many bytes must be taken from global.
int64_t getFromGlobal = kLocalReservedLimit_;
while (willBe + getFromGlobal <= 0) {
getFromGlobal += kLocalReservedLimit_;
}
// allocGlobal() may throw std::bad_alloc; invoke allocGlobal() only once (all-or-nothing
// semantics)
allocGlobal(getFromGlobal);
willBe += getFromGlobal;
}
// Only update after a successful allocation; failed allocations should not be taken into
// account.
threadMemoryStats_.reserved = willBe;
}

/// Record a memory deallocation of the given size
inline ALWAYS_INLINE void free(int64_t size) {
threadMemoryStats_.reserved += size;
// Return memory to global if the local reservation exceeds its limit
while (threadMemoryStats_.reserved > kLocalReservedLimit_) {
// freeGlobal() never fails and can be invoked multiple times.
freeGlobal(kLocalReservedLimit_);
threadMemoryStats_.reserved -= kLocalReservedLimit_;
}
}

/// Free global memory; two use cases call this function:
/// 1. free()
/// 2. destruction of ThreadMemoryStats, which returns the thread's reserved memory
inline ALWAYS_INLINE void freeGlobal(int64_t bytes) {
used_.fetch_sub(bytes, std::memory_order_relaxed);
}

/// Set limit (maximum usable bytes) of memory
void setLimit(int64_t limit) {
if (this->limit_ != limit) {
this->limit_ = limit;
}
}

/// Get limit (maximum usable bytes) of memory
int64_t getLimit() {
return this->limit_;
}

/// Get current used bytes of memory
int64_t used() {
return used_;
}

/// Calculate used ratio of memory
double usedRatio() {
return used_ / static_cast<double>(limit_);
}

std::string toString() {
return fmt::format("MemoryStats, limit:{}, used:{}", limit_, used_);
}

private:
inline ALWAYS_INLINE void allocGlobal(int64_t size) {
int64_t willBe = size + used_.fetch_add(size, std::memory_order_relaxed);
if (willBe > limit_) {
// revert
used_.fetch_sub(size, std::memory_order_relaxed);
throw std::bad_alloc();
}
}

private:
// Global
int64_t limit_{std::numeric_limits<int64_t>::max()};
std::atomic<int64_t> used_{0};
// Thread Local
static thread_local ThreadMemoryStats threadMemoryStats_;
// Each thread reserves this amount of memory
static constexpr int64_t kLocalReservedLimit_ = 1 * 1024 * 1024;
};

// A global static memory tracker that enables tracking of every memory allocation and
// deallocation. This is not where the real allocation or deallocation happens; it only
// does the accounting.
struct MemoryTracker {
/// Call the following functions before the corresponding operations of the memory
/// allocator.
static void alloc(int64_t size);
static void allocNoThrow(int64_t size);
static void realloc(int64_t old_size, int64_t new_size);

/// This function should be called after memory deallocation.
static void free(int64_t size);

private:
static void allocImpl(int64_t size, bool throw_if_memory_exceeded);
};

} // namespace memory
} // namespace nebula
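
A small usage sketch of the stats singleton itself; the limit value and the wiring below are assumptions for illustration, not part of this diff:

// Illustrative sketch (assumed wiring, not from this diff): set the global limit at
// start-up and inspect the counters.
#include <iostream>
#include <vector>

#include "common/memory/MemoryTracker.h"

int main() {
  auto& stats = nebula::memory::MemoryStats::instance();

  // Cap tracked memory at 8 GiB; an allocation that would push the counter past this
  // limit makes allocGlobal() throw std::bad_alloc.
  stats.setLimit(8LL * 1024 * 1024 * 1024);

  // Charged against the thread/global counters if the operator new/delete overloads
  // from NewDelete.cpp are installed.
  std::vector<char> buf(64UL * 1024 * 1024);

  std::cout << stats.toString() << ", used ratio: " << stats.usedRatio() << std::endl;
  return 0;
}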