Memory Tracker #5082

Merged 32 commits into master from mem on Dec 28, 2022

Changes from all commits (32 commits):
8734fa0  memory tracker (codesigner, Dec 18, 2022)
e72ab21  refine code (codesigner, Dec 20, 2022)
d1d7645  refine code (codesigner, Dec 21, 2022)
04642d8  add check (codesigner, Dec 21, 2022)
e179139  fix sanitize (codesigner, Dec 21, 2022)
8a3556d  add check (codesigner, Dec 21, 2022)
8999d49  fix build (codesigner, Dec 21, 2022)
9b5a11d  fix build (codesigner, Dec 21, 2022)
1e44036  fix sanitize issues (codesigner, Dec 21, 2022)
e935b10  minor, delete debug log (codesigner, Dec 21, 2022)
5168db2  storage add memmory tracker (codesigner, Dec 22, 2022)
b6a743c  add more check & add memory stats log flag (codesigner, Dec 23, 2022)
e8748a4  fix lint & memory monitor name in storaged (codesigner, Dec 23, 2022)
cd124bf  refine catch in scheduler (codesigner, Dec 23, 2022)
7068cac  move try catch on stack up in Storage's processors (codesigner, Dec 23, 2022)
ae64b0a  remove unused include (codesigner, Dec 23, 2022)
c07ec37  fix build (codesigner, Dec 23, 2022)
0bc1366  Merge branch 'master' into mem (codesigner, Dec 26, 2022)
a15ddc8  Merge branch 'master' into mem (codesigner, Dec 26, 2022)
797fa05  add jemalloc compile flag & MemTracker is use only when jemalloc is a… (codesigner, Dec 26, 2022)
3da2b0a  Merge branch 'master' into mem (codesigner, Dec 26, 2022)
4a1c285  add memory track in LookupProcessor.cpp (codesigner, Dec 26, 2022)
0a686d8  refine code (codesigner, Dec 27, 2022)
081f2e1  Merge branch 'master' into mem (codesigner, Dec 27, 2022)
b9b67d3  fix lint (codesigner, Dec 27, 2022)
d5559ec  fix build (codesigner, Dec 27, 2022)
3cf3c84  Merge branch 'master' into mem (codesigner, Dec 27, 2022)
e99b5be  Merge branch 'master' into mem (codesigner, Dec 27, 2022)
1e70e59  Merge branch 'master' into mem (Sophie-Xie, Dec 27, 2022)
2b082d0  Merge branch 'master' into mem (codesigner, Dec 27, 2022)
7f660c2  Merge branch 'master' into mem (codesigner, Dec 27, 2022)
19df08a  Merge branch 'master' into mem (Sophie-Xie, Dec 27, 2022)
cmake/nebula/ThirdPartyConfig.cmake (1 addition, 0 deletions)

@@ -101,6 +101,7 @@ find_package(Glog REQUIRED)
 find_package(Googletest REQUIRED)
 if(ENABLE_JEMALLOC)
   find_package(Jemalloc REQUIRED)
+  add_definitions(-DENABLE_JEMALLOC)
 endif()
 find_package(Libevent REQUIRED)
 find_package(Proxygen REQUIRED)
src/common/memory/CMakeLists.txt (4 additions, 3 deletions)

@@ -2,10 +2,11 @@
 #
 # This source code is licensed under Apache 2.0 License.
 
-
 nebula_add_library(
-memory_obj OBJECT
-MemoryUtils.cpp
+    memory_obj OBJECT
+    MemoryUtils.cpp
+    MemoryTracker.cpp
+    NewDelete.cpp
 )
 
 nebula_add_subdirectory(test)
src/common/memory/Memory.h (new file, 172 additions)

@@ -0,0 +1,172 @@
/* Copyright (c) 2022 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
#pragma once

#if ENABLE_JEMALLOC
#include <jemalloc/jemalloc.h>
#endif

#include <new>

#include "common/base/Base.h"
#include "common/memory/MemoryTracker.h"

namespace nebula {
namespace memory {

inline ALWAYS_INLINE size_t alignToSizeT(std::align_val_t align) noexcept {
return static_cast<size_t>(align);
}

inline ALWAYS_INLINE void* newImpl(std::size_t size) {
void* ptr = malloc(size);

if (LIKELY(ptr != nullptr)) return ptr;

throw std::bad_alloc{};
Contributor (review comment):
It would be better to use our own exception type here, to distinguish it from the standard library's.

Contributor Author (@codesigner, Dec 27, 2022):
I will leave this for a future improvement. It is a little tricky: if the custom exception itself allocates with new when it is constructed or thrown, that allocation can recursively trigger the same exception.

Contributor:
curious
}
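Following up on the review thread above, a minimal sketch (not part of this diff) of what a custom, non-allocating exception type could look like; the name MemoryExceededError is hypothetical and not something this PR introduces:

```cpp
#include <new>

namespace nebula {
namespace memory {

// Hypothetical type (not in this PR): derives from std::bad_alloc so existing
// catch sites keep working, and holds no dynamically allocated state, so
// constructing or throwing it never calls operator new and cannot recurse
// back into the memory tracker.
class MemoryExceededError : public std::bad_alloc {
 public:
  const char* what() const noexcept override {
    return "nebula::memory: memory limit exceeded";
  }
};

}  // namespace memory
}  // namespace nebula
```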

inline ALWAYS_INLINE void* newImpl(std::size_t size, std::align_val_t align) {
void* ptr = aligned_alloc(alignToSizeT(align), size);

if (LIKELY(ptr != nullptr)) return ptr;

throw std::bad_alloc{};
}

inline ALWAYS_INLINE void* newNoException(std::size_t size) noexcept {
return malloc(size);
}

inline ALWAYS_INLINE void* newNoException(std::size_t size, std::align_val_t align) noexcept {
return aligned_alloc(static_cast<size_t>(align), size);
}

inline ALWAYS_INLINE void deleteImpl(void* ptr) noexcept {
free(ptr);
}

#if ENABLE_JEMALLOC
inline ALWAYS_INLINE void deleteSized(void* ptr, std::size_t size) noexcept {
if (UNLIKELY(ptr == nullptr)) return;
sdallocx(ptr, size, 0);
}
#else
inline ALWAYS_INLINE void deleteSized(void* ptr, std::size_t size) noexcept {
UNUSED(size);
free(ptr);
}
#endif

#if ENABLE_JEMALLOC
inline ALWAYS_INLINE void deleteSized(void* ptr,
std::size_t size,
std::align_val_t align) noexcept {
if (UNLIKELY(ptr == nullptr)) return;
sdallocx(ptr, size, MALLOCX_ALIGN(alignToSizeT(align)));
}
#else
inline ALWAYS_INLINE void deleteSized(void* ptr,
std::size_t size,
std::align_val_t align) noexcept {
UNUSED(size);
UNUSED(align);
free(ptr);
}
#endif

inline ALWAYS_INLINE size_t getActualAllocationSize(size_t size) {
size_t actual_size = size;

#if ENABLE_JEMALLOC
// The nallocx() function allocates no memory,
// but it performs the same size computation as the mallocx() function
if (LIKELY(size != 0)) {
actual_size = nallocx(size, 0);
}
#endif
return actual_size;
}
inline ALWAYS_INLINE size_t getActualAllocationSize(size_t size, std::align_val_t align) {
size_t actual_size = size;

#if ENABLE_JEMALLOC
// The nallocx() function allocates no memory,
// but it performs the same size computation as the mallocx() function
if (LIKELY(size != 0)) {
actual_size = nallocx(size, MALLOCX_ALIGN(alignToSizeT(align)));
}
#else
UNUSED(align);
#endif
return actual_size;
}

inline ALWAYS_INLINE void trackMemory(std::size_t size) {
std::size_t actual_size = getActualAllocationSize(size);
MemoryTracker::allocNoThrow(actual_size);
}

inline ALWAYS_INLINE void trackMemory(std::size_t size, std::align_val_t align) {
std::size_t actual_size = getActualAllocationSize(size, align);
MemoryTracker::allocNoThrow(actual_size);
}

inline ALWAYS_INLINE void untrackMemory(void* ptr) noexcept {
try {
#if ENABLE_JEMALLOC
if (LIKELY(ptr != nullptr)) {
MemoryTracker::free(sallocx(ptr, 0));
}
#else
// malloc_usable_size() result may be greater than or equal to the allocated size.
MemoryTracker::free(malloc_usable_size(ptr));
#endif
} catch (...) {
}
}

inline ALWAYS_INLINE void untrackMemory(void* ptr, std::size_t size) noexcept {
try {
#if ENABLE_JEMALLOC
UNUSED(size);
if (LIKELY(ptr != nullptr)) {
MemoryTracker::free(sallocx(ptr, 0));
}
#else
if (size) {
MemoryTracker::free(size);
} else {
// malloc_usable_size() result may be greater than or equal to the allocated size.
MemoryTracker::free(malloc_usable_size(ptr));
}
#endif
} catch (...) {
}
}

inline ALWAYS_INLINE void untrackMemory(void* ptr,
std::size_t size,
std::align_val_t align) noexcept {
try {
#if ENABLE_JEMALLOC
UNUSED(size);
if (LIKELY(ptr != nullptr)) {
MemoryTracker::free(sallocx(ptr, MALLOCX_ALIGN(alignToSizeT(align))));
}
#else
UNUSED(align);
if (size) {
MemoryTracker::free(size);
} else {
// malloc_usable_size() result may be greater than or equal to the allocated size.
MemoryTracker::free(malloc_usable_size(ptr));
}
#endif
} catch (...) {
}
}

} // namespace memory
} // namespace nebula
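Not part of the diff: a minimal sketch of how these helpers are typically wired into the global allocation operators. The PR adds NewDelete.cpp (see the CMakeLists.txt change above), but that file is not shown in this excerpt, so treat the following as an illustration under that assumption rather than the PR's exact code.

```cpp
#include <new>

#include "common/memory/Memory.h"

// Sketch only: global overrides that record each allocation in MemoryTracker
// before delegating to the helpers defined in Memory.h.
void* operator new(std::size_t size) {
  nebula::memory::trackMemory(size);     // count the bytes about to be allocated
  return nebula::memory::newImpl(size);  // may throw std::bad_alloc
}

void operator delete(void* ptr) noexcept {
  nebula::memory::untrackMemory(ptr);    // uncount using the allocator-reported size
  nebula::memory::deleteImpl(ptr);
}
```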
src/common/memory/MemoryTracker.cpp (new file, 46 additions)

@@ -0,0 +1,46 @@
/* Copyright (c) 2022 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
#include "common/memory/MemoryTracker.h"

namespace nebula {
namespace memory {

thread_local ThreadMemoryStats MemoryStats::threadMemoryStats_;

ThreadMemoryStats::ThreadMemoryStats() : reserved(0) {}

ThreadMemoryStats::~ThreadMemoryStats() {
// Return to global any reserved bytes on destruction
if (reserved != 0) {
MemoryStats::instance().freeGlobal(reserved);
DLOG(INFO) << std::this_thread::get_id() << " return reserved " << reserved;
}
}

void MemoryTracker::alloc(int64_t size) {
bool throw_if_memory_exceeded = true;
allocImpl(size, throw_if_memory_exceeded);
}

void MemoryTracker::allocNoThrow(int64_t size) {
bool throw_if_memory_exceeded = false;
allocImpl(size, throw_if_memory_exceeded);
}

void MemoryTracker::realloc(int64_t old_size, int64_t new_size) {
int64_t addition = new_size - old_size;
addition > 0 ? alloc(addition) : free(-addition);
}

void MemoryTracker::free(int64_t size) {
MemoryStats::instance().free(size);
}

void MemoryTracker::allocImpl(int64_t size, bool) {
MemoryStats::instance().alloc(size);
}

} // namespace memory
} // namespace nebula
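A small usage sketch (illustrative, not from the PR; the resizeBuffer helper is hypothetical) showing how realloc tracking records only the size delta: alloc(new - old) when growing, free(old - new) when shrinking.

```cpp
#include <cstdint>
#include <cstdlib>

#include "common/memory/MemoryTracker.h"

// Hypothetical helper: resize a raw buffer and keep the tracker in sync.
void* resizeBuffer(void* buf, std::size_t oldSize, std::size_t newSize) {
  void* newBuf = std::realloc(buf, newSize);
  if (newBuf != nullptr) {
    // Records only the delta between the old and new allocation sizes.
    nebula::memory::MemoryTracker::realloc(static_cast<int64_t>(oldSize),
                                           static_cast<int64_t>(newSize));
  }
  return newBuf;
}
```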
src/common/memory/MemoryTracker.h (new file, 143 additions)

@@ -0,0 +1,143 @@
/* Copyright (c) 2022 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
#pragma once

#include <atomic>

#include "common/base/Base.h"

namespace nebula {
namespace memory {

// Memory stats for each thread.
struct ThreadMemoryStats {
ThreadMemoryStats();
~ThreadMemoryStats();

// Reserved bytes in the current thread.
int64_t reserved;
};

/**
* MemoryStats records memory usage at both thread-local scope and global scope.
* Design:
* It is a singleton instance, designed for thread-scope and global-scope
* memory quota counting.
*
* Thread: each thread holds a reserved memory quota obtained from the global counter. Each time
* alloc() or free() occurs, it first tries to satisfy the request from its local reservation.
* Global: counts the globally used memory; the actual memory in use may be less than the counted
* usage, since threads hold quota reservations.
*/
class MemoryStats {
public:
static MemoryStats& instance() {
static MemoryStats stats;
return stats;
}

/// Record a memory allocation of `size` bytes
inline ALWAYS_INLINE void alloc(int64_t size) {
int64_t willBe = threadMemoryStats_.reserved - size;

if (UNLIKELY(willBe < 0)) {
// If the local reservation is not enough, calculate how many bytes need to be taken from global.
int64_t getFromGlobal = kLocalReservedLimit_;
while (willBe + getFromGlobal <= 0) {
getFromGlobal += kLocalReservedLimit_;
}
// allocGlobal() may throw std::bad_alloc, so invoke it only once (all-or-nothing semantics).
allocGlobal(getFromGlobal);
willBe += getFromGlobal;
}
// Only update after successful allocations; failed allocations should not be taken into account.
threadMemoryStats_.reserved = willBe;
}

/// Record a memory deallocation of `size` bytes
inline ALWAYS_INLINE void free(int64_t size) {
threadMemoryStats_.reserved += size;
// Return memory to global if the local reservation exceeds the limit.
while (threadMemoryStats_.reserved > kLocalReservedLimit_) {
// freeGlobal() never fails and can be invoked multiple times.
freeGlobal(kLocalReservedLimit_);
threadMemoryStats_.reserved -= kLocalReservedLimit_;
}
}

/// Free global memory; two use cases call this function:
/// 1. free()
/// 2. the destructor of ThreadMemoryStats, which returns the thread's reserved memory
inline ALWAYS_INLINE void freeGlobal(int64_t bytes) {
used_.fetch_sub(bytes, std::memory_order_relaxed);
}

/// Set limit (maximum usable bytes) of memory
void setLimit(int64_t limit) {
if (this->limit_ != limit) {
this->limit_ = limit;
}
}

/// Get limit (maximum usable bytes) of memory
int64_t getLimit() {
return this->limit_;
}

/// Get current used bytes of memory
int64_t used() {
return used_;
}

/// Calculate used ratio of memory
double usedRatio() {
return used_ / static_cast<double>(limit_);
}

std::string toString() {
return fmt::format("MemoryStats, limit:{}, used:{}", limit_, used_);
}

private:
inline ALWAYS_INLINE void allocGlobal(int64_t size) {
int64_t willBe = size + used_.fetch_add(size, std::memory_order_relaxed);
if (willBe > limit_) {
// revert
used_.fetch_sub(size, std::memory_order_relaxed);
throw std::bad_alloc();
}
}

private:
// Global
int64_t limit_{std::numeric_limits<int64_t>::max()};
std::atomic<int64_t> used_{0};
// Thread Local
static thread_local ThreadMemoryStats threadMemoryStats_;
// Each thread reserves this amount of memory
static constexpr int64_t kLocalReservedLimit_ = 1 * 1024 * 1024;
};

// A global static memory tracker that enables tracking of every memory allocation and deallocation.
// Real memory allocation and deallocation do not happen here; this only performs the tracking.
struct MemoryTracker {
/// Call the following functions before the corresponding operations of the memory allocator.
static void alloc(int64_t size);
static void allocNoThrow(int64_t size);
static void realloc(int64_t old_size, int64_t new_size);

/// This function should be called after memory deallocation.
static void free(int64_t size);

private:
static void allocImpl(int64_t size, bool throw_if_memory_exceeded);
};

} // namespace memory
} // namespace nebula
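Not part of the diff: a minimal usage sketch, under the assumption that a service configures the limit at startup while the tracking calls normally come from the operator new/delete overrides rather than explicit calls. The 8 GiB figure and the function names below are illustrative only.

```cpp
#include <cstdint>

#include "common/memory/MemoryTracker.h"

// Hypothetical startup hook: cap tracked memory at 8 GiB. Beyond the limit,
// MemoryStats::allocGlobal() throws std::bad_alloc.
void configureMemoryLimit() {
  nebula::memory::MemoryStats::instance().setLimit(8LL * 1024 * 1024 * 1024);
}

// Hypothetical explicit use of the tracker: alloc draws from the thread-local
// 1 MiB reservation first and only touches the global atomic counter when
// that reservation is exhausted.
void trackScratchBuffer(int64_t bytes) {
  nebula::memory::MemoryTracker::allocNoThrow(bytes);
  // ... use the buffer ...
  nebula::memory::MemoryTracker::free(bytes);
}
```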