Skip to content

Commit

Permalink
Merge pull request #11 from axelriet/tbb/scalable_allocator
Browse files Browse the repository at this point in the history
tbb/scalable_allocator
  • Loading branch information
axelriet committed Dec 16, 2023
2 parents 6adab94 + 0a4efa3 commit ea4fc3d
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 14 deletions.
26 changes: 22 additions & 4 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ option(WITH_VMCI "Build with support for VMware VMCI (host<->guests sockets)" OF
option(WITH_HVSOCKET "Build with support for Microsoft Hyper-V (host<->guests sockets)" OFF)
option(WITH_VSOCK "Build with support for VSOCK (hvsocket, virtio, vmci) (host<->guests sockets)" OFF)
option(WITH_CUSTOM_MESSAGE_ALLOCATOR "Build with support for custom message allocators" OFF)
option(WITH_TBB_SCALABLE_ALLOCATOR "Build with support for Intel's TBB scalable memory allocator" OFF)

if(APPLE)
option(ZMQ_BUILD_FRAMEWORK "Build as OS X framework" OFF)
Expand Down Expand Up @@ -1142,10 +1143,23 @@ set(public_headers include/zmq.h include/zmq_utils.h include/zmq_sal.h)
set(readme-docs AUTHORS LICENSE NEWS)

# -----------------------------------------------------------------------------
# Optional modules
# Optional modules and features

if(WITH_TBB_SCALABLE_ALLOCATOR)
message(STATUS "Building with support for Intel's TBB scalable memory allocator.")
set(ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR 1)
find_path(TBB_INCLUDE_DIRS ./oneapi/tbb.h PATHS "C:/Program Files (x86)/Intel/oneAPI/tbb/latest/include" NO_DEFAULT_PATH)
include_directories(${TBB_INCLUDE_DIRS})
set(TBB_MALLOC_LIBS ${TBB_INCLUDE_DIRS}/../lib/vc_mt/tbb12.lib ${TBB_INCLUDE_DIRS}/../lib/vc_mt/tbbmalloc.lib)
set(TBB_MALLOC_FOUND 1)
endif()

if(WITH_CUSTOM_MESSAGE_ALLOCATOR)
message(STATUS "Building with with support for custom message allocators.")
if(WITH_TBB_SCALABLE_ALLOCATOR)
message(STATUS "Building with support for custom message allocators. Intel's TBB scalable memory allocator used as default.")
else()
message(STATUS "Building with support for custom message allocators.")
endif()
set(ZMQ_HAVE_CUSTOM_ALLOCATOR 1)
endif()

Expand Down Expand Up @@ -1513,11 +1527,15 @@ if(BUILD_SHARED)
endif()

if(norm_FOUND)
target_link_libraries(libzmq norm::norm)
target_link_libraries(libzmq norm::norm)
endif()

if(OPENPGM_FOUND)
target_link_libraries(libzmq ${OPENPGM_LIBRARIES})
target_link_libraries(libzmq ${OPENPGM_LIBRARIES})
endif()

if(TBB_MALLOC_FOUND)
target_link_libraries(libzmq ${TBB_MALLOC_LIBS})
endif()
endif()

Expand Down
1 change: 1 addition & 0 deletions builds/cmake/platform.hpp.in
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
#cmakedefine ZMQ_HAVE_HVSOCKET

#cmakedefine ZMQ_HAVE_CUSTOM_ALLOCATOR
#cmakedefine ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR

#cmakedefine ZMQ_MAKE_VALGRIND_HAPPY

Expand Down
39 changes: 34 additions & 5 deletions perf/remote_thr_ca.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,11 @@ const char *HintToString (_In_ ZMQ_MSG_ALLOC_HINT hint)
// the same lookaside list. 3-4 buckets should be enough, the last one is
// "everything above". You can have a different HWM's for each bucket
// depending on what you learned about frequencies, and how much you want
// to spend for the lookasides. You control the tradeoffs. Have fun!
// to spend for the lookasides. You control the tradeoffs. This may be a
// little bit faster provided there is no contention on the allocator.
//
// Yet another approach is to use the Intel TBB scalable allocator which
// is frigteningly quick even in presence of heavy contention.
//
// Then remember the three rules: measure, measure, measure. The
// chances that you significantly and consistently improve on the
Expand All @@ -97,6 +101,12 @@ const char *HintToString (_In_ ZMQ_MSG_ALLOC_HINT hint)
#include <windows.h>
#include <mutex>

// #define ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR

#if defined(ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR)
#include <tbb/scalable_allocator.h>
#endif

#define USE_HEAPALLOC
#define KEEP_ASIDE_HWM 1000

Expand All @@ -109,6 +119,10 @@ SLIST_HEADER LookasideList;
_Must_inspect_result_ _Ret_opt_bytecap_ (cb) void *ZMQ_CDECL
msg_alloc (_In_ size_t cb, _In_ ZMQ_MSG_ALLOC_HINT hint)
{
#if defined(ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR)
hint; // Unused
return scalable_malloc (cb);
#else
if (hint == ZMQ_MSG_ALLOC_HINT_OUTGOING) {
void *ptr = InterlockedPopEntrySList (&LookasideList);
if (ptr != NULL) {
Expand All @@ -130,11 +144,16 @@ _Must_inspect_result_ _Ret_opt_bytecap_ (cb) void *ZMQ_CDECL
return malloc (cb);
#endif
}
#endif
}

void ZMQ_CDECL msg_free (_Pre_maybenull_ _Post_invalid_ void *ptr_,
_In_ ZMQ_MSG_ALLOC_HINT hint)
{
#if defined(ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR)
hint; // Unused
return scalable_free (ptr_);
#else
if (hint == ZMQ_MSG_ALLOC_HINT_OUTGOING) {
// There is a possibility that we sligthly exceed the HWM in case
// of heavy contention. Not really a problem.
Expand All @@ -154,6 +173,7 @@ void ZMQ_CDECL msg_free (_Pre_maybenull_ _Post_invalid_ void *ptr_,
free (ptr_);
#endif
}
#endif
}
#else
//
Expand Down Expand Up @@ -198,11 +218,13 @@ int ZMQ_CDECL main (int argc, char *argv[])
curve = 1;
}

#if !defined(ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR)
#ifdef USE_HEAPALLOC
hHeap = HeapCreate (HEAP_NO_SERIALIZE, 1024 * 1024, 0);
#endif

InitializeSListHead (&LookasideList);
#endif

(void)zmq_set_custom_msg_allocator (msg_alloc, msg_free);

ctx = zmq_init (1);
Expand Down Expand Up @@ -287,16 +309,23 @@ int ZMQ_CDECL main (int argc, char *argv[])
if (count) {
for (void *ptr = InterlockedPopEntrySList (&LookasideList); ptr != NULL;
ptr = InterlockedPopEntrySList (&LookasideList)) {
#ifdef USE_HEAPALLOC
(void) HeapFree (hHeap, 0, ptr);

#if !defined(ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR)
#ifdef USE_HEAPALLOC
(void) HeapFree (hHeap, 0, ptr);
#else
free (ptr);
#endif
#else
free (ptr);
scalable_free (ptr);
#endif
}
}

#if !defined(ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR)
#ifdef USE_HEAPALLOC
HeapDestroy (hHeap);
#endif
#endif

return 0;
Expand Down
12 changes: 12 additions & 0 deletions src/decoder_allocators.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,12 @@ unsigned char *zmq::shared_message_memory_allocator::allocate ()
#ifndef NDEBUG
_messages_allocated = true;
#endif
#else
#ifdef ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR
_buf = static_cast<unsigned char *> (scalable_malloc (allocationsize));
#else
_buf = static_cast<unsigned char *> (std::malloc (allocationsize));
#endif
#endif
alloc_assert (_buf);

Expand All @@ -85,8 +89,12 @@ void zmq::shared_message_memory_allocator::deallocate ()
c->~atomic_counter_t ();
#ifdef ZMQ_HAVE_CUSTOM_ALLOCATOR
zmq::free (_buf, ZMQ_MSG_ALLOC_HINT_INCOMING);
#else
#ifdef ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR
scalable_free (_buf);
#else
std::free (_buf);
#endif
#endif
}
clear ();
Expand Down Expand Up @@ -122,8 +130,12 @@ void ZMQ_CDECL zmq::shared_message_memory_allocator::call_dec_ref (void *,
c->~atomic_counter_t ();
#ifdef ZMQ_HAVE_CUSTOM_ALLOCATOR
zmq::free (buf, ZMQ_MSG_ALLOC_HINT_INCOMING);
#else
#ifdef ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR
scalable_free (buf);
#else
std::free (buf);
#endif
#endif
buf = NULL;
}
Expand Down
10 changes: 9 additions & 1 deletion src/decoder_allocators.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ class c_single_allocator
_buf (static_cast<unsigned char *> (
zmq::malloc (_buf_size, ZMQ_MSG_ALLOC_HINT_INCOMING)))
#else
_buf (static_cast<unsigned char *> (std::malloc (_buf_size)))
#ifdef ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR
_buf (static_cast<unsigned char *> (scalable_malloc (_buf_size)))
#else
_buf (static_cast<unsigned char *> (std::malloc (_buf_size)))
#endif
#endif
{
#ifdef ZMQ_HAVE_CUSTOM_ALLOCATOR
Expand All @@ -43,8 +47,12 @@ class c_single_allocator
{
#ifdef ZMQ_HAVE_CUSTOM_ALLOCATOR
zmq::free (_buf, ZMQ_MSG_ALLOC_HINT_INCOMING);
#else
#ifdef ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR
scalable_free (_buf);
#else
std::free (_buf);
#endif
#endif
}

Expand Down
34 changes: 30 additions & 4 deletions src/msg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,15 @@ _Must_inspect_result_ _Ret_opt_bytecap_ (
if (hint < ZMQ_MSG_ALLOC_HINT_NONE || hint > ZMQ_MSG_ALLOC_HINT_MAX) {
zmq_assert (false);
}
#else
LIBZMQ_UNUSED (hint);
#endif

#if defined(ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR)
return scalable_malloc (cb);
#else
return std::malloc (cb);
#endif
}

static void ZMQ_CDECL default_msg_free (
Expand All @@ -49,9 +55,15 @@ static void ZMQ_CDECL default_msg_free (
if (hint < ZMQ_MSG_ALLOC_HINT_NONE || hint > ZMQ_MSG_ALLOC_HINT_MAX) {
zmq_assert (false);
}
#else
LIBZMQ_UNUSED (hint);
#endif

#if defined(ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR)
scalable_free (ptr);
#else
std::free (ptr);
#endif
}

#ifndef NDEBUG
Expand Down Expand Up @@ -163,9 +175,14 @@ int zmq::msg_t::init_size (size_t size_)
#ifndef NDEBUG
_messages_allocated = true;
#endif
#else
#ifdef ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR
_u.lmsg.content =
static_cast<content_t *> (scalable_malloc (sizeof (content_t) + size_));
#else
_u.lmsg.content =
static_cast<content_t *> (std::malloc (sizeof (content_t) + size_));
#endif
#endif
if (unlikely (!_u.lmsg.content)) {
errno = ENOMEM;
Expand All @@ -178,8 +195,6 @@ int zmq::msg_t::init_size (size_t size_)
_u.lmsg.content->hint = NULL;
#ifdef ZMQ_HAVE_CUSTOM_ALLOCATOR
_u.lmsg.content->custom_allocation_hint = ZMQ_MSG_ALLOC_HINT_OUTGOING;
#else
_u.lmsg.content->custom_allocation_hint = ZMQ_MSG_ALLOC_HINT_NONE;
#endif
new (&_u.lmsg.content->refcnt) zmq::atomic_counter_t ();
}
Expand Down Expand Up @@ -260,9 +275,14 @@ int zmq::msg_t::init_data (_In_opt_ void *data_,
#ifndef NDEBUG
_messages_allocated = true;
#endif
#else
#ifdef ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR
_u.lmsg.content =
static_cast<content_t *> (scalable_malloc (sizeof (content_t)));
#else
_u.lmsg.content =
static_cast<content_t *> (std::malloc (sizeof (content_t)));
#endif
#endif
if (!_u.lmsg.content) {
errno = ENOMEM;
Expand All @@ -275,8 +295,6 @@ int zmq::msg_t::init_data (_In_opt_ void *data_,
_u.lmsg.content->hint = hint_;
#ifdef ZMQ_HAVE_CUSTOM_ALLOCATOR
_u.lmsg.content->custom_allocation_hint = ZMQ_MSG_ALLOC_HINT_FIXED_SIZE;
#else
_u.lmsg.content->custom_allocation_hint = ZMQ_MSG_ALLOC_HINT_NONE;
#endif
new (&_u.lmsg.content->refcnt) zmq::atomic_counter_t ();
}
Expand Down Expand Up @@ -374,8 +392,12 @@ int zmq::msg_t::close ()
_u.lmsg.content->hint);
#ifdef ZMQ_HAVE_CUSTOM_ALLOCATOR
zmq::free (_u.lmsg.content, _u.lmsg.content->custom_allocation_hint);
#else
#ifdef ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR
scalable_free (_u.lmsg.content);
#else
std::free (_u.lmsg.content);
#endif
#endif
}
}
Expand Down Expand Up @@ -634,8 +656,12 @@ bool zmq::msg_t::rm_refs (int refs_)
}
#ifdef ZMQ_HAVE_CUSTOM_ALLOCATOR
zmq::free (_u.lmsg.content, _u.lmsg.content->custom_allocation_hint);
#else
#ifdef ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR
scalable_free (_u.lmsg.content);
#else
std::free (_u.lmsg.content);
#endif
#endif
return false;
}
Expand Down
4 changes: 4 additions & 0 deletions src/precompiled.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@
#include <gssapi/gssapi_krb5.h>
#endif

#if defined(ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR)
#include <tbb/scalable_allocator.h>
#endif

#include "options.hpp"

#endif // _MSC_VER
Expand Down
7 changes: 7 additions & 0 deletions src/zmq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,14 @@ ZMQ_EXPORT_IMPL (bool)
zmq_set_custom_msg_allocator (_In_ zmq_custom_msg_alloc_fn *malloc_,
_In_ zmq_custom_msg_free_fn *free_)
{
#ifdef ZMQ_HAVE_CUSTOM_ALLOCATOR
return zmq::set_custom_msg_allocator (malloc_, free_);
#else
LIBZMQ_UNUSED (malloc_);
LIBZMQ_UNUSED (free_);
errno = ENOTSUP;
return false;
#endif
}

_At_ (msg_, _Pre_invalid_ _Pre_notnull_ _Post_valid_) ZMQ_EXPORT_IMPL (int)
Expand Down

0 comments on commit ea4fc3d

Please sign in to comment.