Skip to content

Commit

Permalink
More TBB-ification of low-level constructs
Browse files Browse the repository at this point in the history
  • Loading branch information
axelriet committed Dec 23, 2023
1 parent 3cbbf1e commit a418414
Show file tree
Hide file tree
Showing 7 changed files with 245 additions and 13 deletions.
32 changes: 30 additions & 2 deletions src/blob.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
#include <algorithm>
#include <ios>

#ifdef ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR
#include <tbb/scalable_allocator.h>
#endif

#if __cplusplus >= 201103L || defined(_MSC_VER) && _MSC_VER > 1700
#define ZMQ_HAS_MOVE_SEMANTICS
#define ZMQ_MAP_INSERT_OR_EMPLACE(k, v) emplace (k, v)
Expand Down Expand Up @@ -50,7 +54,11 @@ struct blob_t

// Creates a blob_t of a given size, with uninitialized content.
explicit blob_t (const size_t size_) :
_data (static_cast<unsigned char *> (std::malloc (size_))),
#ifdef ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR
_data (static_cast<unsigned char *> (scalable_malloc (size_))),
#else
_data (static_cast<unsigned char *> (std::malloc (size_))),
#endif
_size (size_),
_owned (true)
{
Expand All @@ -60,7 +68,11 @@ struct blob_t
// Creates a blob_t of a given size, an initializes content by copying
// from another buffer.
blob_t (const unsigned char *const data_, const size_t size_) :
_data (static_cast<unsigned char *> (std::malloc (size_))),
#ifdef ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR
_data (static_cast<unsigned char *> (scalable_malloc (size_))),
#else
_data (static_cast<unsigned char *> (std::malloc (size_))),
#endif
_size (size_),
_owned (true)
{
Expand Down Expand Up @@ -100,7 +112,11 @@ struct blob_t
void set_deep_copy (blob_t const &other_)
{
clear ();
#ifdef ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR
_data = static_cast<unsigned char *> (scalable_malloc (other_._size));
#else
_data = static_cast<unsigned char *> (std::malloc (other_._size));
#endif
alloc_assert (!other_._size || _data);
_size = other_._size;
_owned = true;
Expand All @@ -113,7 +129,11 @@ struct blob_t
void set (const unsigned char *const data_, const size_t size_)
{
clear ();
#ifdef ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR
_data = static_cast<unsigned char *> (scalable_malloc (size_));
#else
_data = static_cast<unsigned char *> (std::malloc (size_));
#endif
alloc_assert (!size_ || _data);
_size = size_;
_owned = true;
Expand All @@ -126,7 +146,11 @@ struct blob_t
void clear ()
{
if (_owned) {
#ifdef ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR
scalable_free (_data);
#else
std::free (_data);
#endif
}
_data = 0;
_size = 0;
Expand All @@ -135,7 +159,11 @@ struct blob_t
~blob_t ()
{
if (_owned) {
#ifdef ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR
scalable_free (_data);
#else
std::free (_data);
#endif
}
}

Expand Down
85 changes: 79 additions & 6 deletions src/generic_mtrie_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
#include <algorithm>
#include <list>

#ifdef ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR
#include <tbb/scalable_allocator.h>
#endif

#include "err.hpp"
#include "macros.hpp"
#include "generic_mtrie.hpp"
Expand All @@ -33,7 +37,11 @@ template <typename T> generic_mtrie_t<T>::~generic_mtrie_t ()
for (unsigned short i = 0; i != _count; ++i) {
LIBZMQ_DELETE (_next.table[i]);
}
#ifdef ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR
scalable_free (_next.table);
#else
std::free (_next.table);
#endif
}
}

Expand All @@ -57,7 +65,11 @@ bool generic_mtrie_t<T>::add (prefix_t prefix_, size_t size_, value_t *pipe_)
generic_mtrie_t *oldp = it->_next.node;
it->_count = (it->_min < c ? c - it->_min : it->_min - c) + 1;
it->_next.table = static_cast<generic_mtrie_t **> (
#ifdef ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR
scalable_malloc (sizeof (generic_mtrie_t *) * it->_count));
#else
std::malloc (sizeof (generic_mtrie_t *) * it->_count));
#endif
alloc_assert (it->_next.table);
for (unsigned short i = 0; i != it->_count; ++i)
it->_next.table[i] = 0;
Expand All @@ -67,17 +79,31 @@ bool generic_mtrie_t<T>::add (prefix_t prefix_, size_t size_, value_t *pipe_)
// The new character is above the current character range.
const unsigned short old_count = it->_count;
it->_count = c - it->_min + 1;
it->_next.table = static_cast<generic_mtrie_t **> (realloc (
it->_next.table, sizeof (generic_mtrie_t *) * it->_count));
#ifdef ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR
it->_next.table =
static_cast<generic_mtrie_t **> (scalable_realloc (
it->_next.table, sizeof (generic_mtrie_t *) * it->_count));
#else
it->_next.table =
static_cast<generic_mtrie_t **> (std::realloc (
it->_next.table, sizeof (generic_mtrie_t *) * it->_count));
#endif
alloc_assert (it->_next.table);
for (unsigned short i = old_count; i != it->_count; i++)
it->_next.table[i] = NULL;
} else {
// The new character is below the current character range.
const unsigned short old_count = it->_count;
it->_count = (it->_min + old_count) - c;
it->_next.table = static_cast<generic_mtrie_t **> (realloc (
it->_next.table, sizeof (generic_mtrie_t *) * it->_count));
#ifdef ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR
it->_next.table =
static_cast<generic_mtrie_t **> (scalable_realloc (
it->_next.table, sizeof (generic_mtrie_t *) * it->_count));
#else
it->_next.table =
static_cast<generic_mtrie_t **> (std::realloc (
it->_next.table, sizeof (generic_mtrie_t *) * it->_count));
#endif
alloc_assert (it->_next.table);
memmove (it->_next.table + it->_min - c, it->_next.table,
old_count * sizeof (generic_mtrie_t *));
Expand Down Expand Up @@ -170,7 +196,11 @@ void generic_mtrie_t<T>::rm (value_t *pipe_,
if (it.size >= maxbuffsize) {
maxbuffsize = it.size + 256;
buff =
static_cast<unsigned char *> (realloc (buff, maxbuffsize));
#ifdef ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR
static_cast<unsigned char *> (scalable_realloc (buff, maxbuffsize));
#else
static_cast<unsigned char *> (std::realloc (buff, maxbuffsize));
#endif
alloc_assert (buff);
}

Expand Down Expand Up @@ -305,7 +335,11 @@ void generic_mtrie_t<T>::rm (value_t *pipe_,
// Free the node table if it's no longer used.
switch (it.node->_live_nodes) {
case 0:
#ifdef ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR
scalable_free (it.node->_next.table);
#else
std::free (it.node->_next.table);
#endif
it.node->_next.table = NULL;
it.node->_count = 0;
break;
Expand All @@ -324,7 +358,11 @@ void generic_mtrie_t<T>::rm (value_t *pipe_,
it.node->_next
.table[it.new_min - it.node->_min];
zmq_assert (node);
#ifdef ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR
scalable_free (it.node->_next.table);
#else
std::free (it.node->_next.table);
#endif
it.node->_next.node = node;
}
it.node->_count = 1;
Expand Down Expand Up @@ -354,17 +392,26 @@ void generic_mtrie_t<T>::rm (value_t *pipe_,
it.new_max - it.new_min + 1;
it.node->_next.table =
static_cast<generic_mtrie_t **> (
#ifdef ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR
scalable_malloc (
sizeof (generic_mtrie_t *)
* it.node->_count));
#else
std::malloc (sizeof (generic_mtrie_t *)
* it.node->_count));
#endif
alloc_assert (it.node->_next.table);

memmove (it.node->_next.table,
old_table
+ (it.new_min - it.node->_min),
sizeof (generic_mtrie_t *)
* it.node->_count);
#ifdef ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR
scalable_free (old_table);
#else
std::free (old_table);

#endif
it.node->_min = it.new_min;
}
}
Expand All @@ -373,7 +420,11 @@ void generic_mtrie_t<T>::rm (value_t *pipe_,
}
}

#ifdef ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR
scalable_free (buff);
#else
std::free (buff);
#endif
}

template <typename T>
Expand Down Expand Up @@ -469,7 +520,11 @@ generic_mtrie_t<T>::rm (prefix_t prefix_, size_t size_, value_t *pipe_)
it.node->_min += (unsigned char) i;
it.node->_count = 1;
generic_mtrie_t *oldp = it.node->_next.table[i];
#ifdef ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR
scalable_free (it.node->_next.table);
#else
std::free (it.node->_next.table);
#endif
it.node->_next.table = NULL;
it.node->_next.node = oldp;
} else if (it.current_child == it.node->_min) {
Expand All @@ -484,12 +539,21 @@ generic_mtrie_t<T>::rm (prefix_t prefix_, size_t size_, value_t *pipe_)
it.node->_count -= i;
generic_mtrie_t **old_table = it.node->_next.table;
it.node->_next.table =
#ifdef ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR
static_cast<generic_mtrie_t **> (scalable_malloc (
sizeof (generic_mtrie_t *) * it.node->_count));
#else
static_cast<generic_mtrie_t **> (std::malloc (
sizeof (generic_mtrie_t *) * it.node->_count));
#endif
alloc_assert (it.node->_next.table);
memmove (it.node->_next.table, old_table + i,
sizeof (generic_mtrie_t *) * it.node->_count);
#ifdef ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR
scalable_free (old_table);
#else
std::free (old_table);
#endif
} else if (it.current_child
== it.node->_min + it.node->_count - 1) {
// We can compact the table "from the right"
Expand All @@ -502,12 +566,21 @@ generic_mtrie_t<T>::rm (prefix_t prefix_, size_t size_, value_t *pipe_)
it.node->_count -= i;
generic_mtrie_t **old_table = it.node->_next.table;
it.node->_next.table =
#ifdef ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR
static_cast<generic_mtrie_t **> (scalable_malloc (
sizeof (generic_mtrie_t *) * it.node->_count));
#else
static_cast<generic_mtrie_t **> (std::malloc (
sizeof (generic_mtrie_t *) * it.node->_count));
#endif
alloc_assert (it.node->_next.table);
memmove (it.node->_next.table, old_table,
sizeof (generic_mtrie_t *) * it.node->_count);
#ifdef ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR
scalable_free (old_table);
#else
std::free (old_table);
#endif
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions src/ip.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
#include "config.hpp"
#include "address.hpp"

#ifdef ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR
#include <tbb/scalable_allocator.h>
#endif

#if !defined ZMQ_HAVE_WINDOWS
#include <fcntl.h>
#include <sys/types.h>
Expand Down Expand Up @@ -467,7 +471,11 @@ static int make_fdpair_tcpip (zmq::fd_t *r_, zmq::fd_t *w_)
const size_t dummy_size =
1024 * 1024; // 1M to overload default receive buffer
unsigned char *dummy =
#ifdef ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR
static_cast<unsigned char *> (scalable_malloc (dummy_size));
#else
static_cast<unsigned char *> (std::malloc (dummy_size));
#endif
wsa_assert (dummy);

int still_to_send = static_cast<int> (dummy_size);
Expand All @@ -489,7 +497,11 @@ static int make_fdpair_tcpip (zmq::fd_t *r_, zmq::fd_t *w_)
wsa_assert (nbytes != SOCKET_ERROR);
still_to_recv -= nbytes;
}
#ifdef ZMQ_HAVE_TBB_SCALABLE_ALLOCATOR
scalable_free (dummy);
#else
std::free (dummy);
#endif
}

// Save errno if error occurred in bind/listen/connect/accept.
Expand Down

0 comments on commit a418414

Please sign in to comment.