diff --git a/CMakeLists.txt b/CMakeLists.txt index 85747f6262..68cc847048 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -615,6 +615,8 @@ if(MSVC) zmq_check_cxx_flag_prepend("/analyze") + zmq_check_cxx_flag_prepend("/Zc:__cplusplus") # enables right reporting of __cplusplus, ref. https://devblogs.microsoft.com/cppblog/msvc-now-correctly-reports-__cplusplus/ + # C++11/14/17-specific, but maybe possible via conditional defines zmq_check_cxx_flag_prepend("/wd26440") # Function '...' can be declared 'noexcept' zmq_check_cxx_flag_prepend("/wd26432") # If you define or delete any default operation in the type '...', define or @@ -893,6 +895,7 @@ endif() set(cxx-sources precompiled.cpp address.cpp + allocator_default.cpp channel.cpp client.cpp clock.cpp @@ -993,6 +996,7 @@ set(cxx-sources zmtp_engine.cpp # at least for VS, the header files must also be listed address.hpp + allocator_default.hpp array.hpp atomic_counter.hpp atomic_ptr.hpp diff --git a/Makefile.am b/Makefile.am index 3b1ae7a152..4684f0eebe 100755 --- a/Makefile.am +++ b/Makefile.am @@ -22,6 +22,8 @@ include_HEADERS = \ src_libzmq_la_SOURCES = \ src/address.cpp \ src/address.hpp \ + src/allocator_default.cpp \ + src/allocator_default.hpp \ src/array.hpp \ src/atomic_counter.hpp \ src/atomic_ptr.hpp \ diff --git a/include/zmq.h b/include/zmq.h index bf0f2dfd56..33e84b8815 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -643,6 +643,22 @@ ZMQ_EXPORT void *zmq_threadstart (zmq_thread_fn *func_, void *arg_); ZMQ_EXPORT void zmq_threadclose (void *thread_); +struct zmq_allocator_t +{ + // Allocate a chunk of memory of size len and return the pointer + void *(*allocate_fn) (void *allocator, size_t len); + + // Deallocate the memory chunk pointed to by data_ + void (*deallocate_fn) (void *allocator, void *data_); + + // Return true if this is an allocator and alive, otherwise false + bool (*check_tag_fn) (void *allocator); + + void (*destroy_fn) (void *allocator); + + void *allocator; +}; + /******************************************************************************/ /* These functions are DRAFT and disabled in stable releases, and subject to */ /* change at ANY time until declared stable. */ @@ -699,6 +715,15 @@ ZMQ_EXPORT int zmq_ctx_get_ext (void *context_, void *optval_, size_t *optvallen_); +// ZMQ-provided message-pool implementations. */ +// default allocator using malloc/free +#define ZMQ_MSG_ALLOCATOR_DEFAULT 0 + +ZMQ_EXPORT void *zmq_msg_allocator_new (int type_); +ZMQ_EXPORT int zmq_msg_allocator_destroy (void **allocator_); +ZMQ_EXPORT int +zmq_msg_init_allocator (zmq_msg_t *msg_, size_t size_, void *allocator_); + /* DRAFT Socket methods. */ ZMQ_EXPORT int zmq_join (void *s, const char *group); ZMQ_EXPORT int zmq_leave (void *s, const char *group); diff --git a/perf/remote_thr.cpp b/perf/remote_thr.cpp index 8378950cd9..efd1cfa4bb 100644 --- a/perf/remote_thr.cpp +++ b/perf/remote_thr.cpp @@ -27,6 +27,7 @@ along with this program. If not, see . */ +#include "platform.hpp" #include "../include/zmq.h" #include #include @@ -67,6 +68,11 @@ int main (int argc, char *argv[]) return -1; } +#if (defined ZMQ_BUILD_DRAFT_API) + // EXPERIMENTAL ALLOCATOR FOR MSG_T + void *allocator = zmq_msg_allocator_new (ZMQ_MSG_ALLOCATOR_DEFAULT); +#endif + s = zmq_socket (ctx, ZMQ_PUSH); if (!s) { printf ("error in zmq_socket: %s\n", zmq_strerror (errno)); @@ -105,7 +111,11 @@ int main (int argc, char *argv[]) } for (i = 0; i != message_count; i++) { +#if (defined ZMQ_BUILD_DRAFT_API) + rc = zmq_msg_init_allocator (&msg, message_size, allocator); +#else rc = zmq_msg_init_size (&msg, message_size); +#endif if (rc != 0) { printf ("error in zmq_msg_init_size: %s\n", zmq_strerror (errno)); return -1; @@ -134,5 +144,11 @@ int main (int argc, char *argv[]) return -1; } +#if (defined ZMQ_BUILD_DRAFT_API) + // IMPORTANT: destroy the allocator only after zmq_ctx_term() since otherwise + // some zmq_msg_t may still be "in fly" + zmq_msg_allocator_destroy (&allocator); +#endif + return 0; } diff --git a/src/allocator_default.cpp b/src/allocator_default.cpp new file mode 100644 index 0000000000..75fa8bf275 --- /dev/null +++ b/src/allocator_default.cpp @@ -0,0 +1,60 @@ +/* + Copyright (c) 2019-2020 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "precompiled.hpp" +#include "allocator_default.hpp" + +#include + +zmq::allocator_default_t::allocator_default_t () +{ + _tag = 0xCAFEEBEB; +} + + +zmq::allocator_default_t::~allocator_default_t () +{ + // Mark this instance as dead + _tag = 0xdeadbeef; +} + +void *zmq::allocator_default_t::allocate (size_t len_) +{ + return operator new (len_, std::nothrow); +} + +void zmq::allocator_default_t::deallocate (void *data_) +{ + operator delete (data_); +} + +bool zmq::allocator_default_t::check_tag () const +{ + return _tag == 0xCAFEEBEB; +} diff --git a/src/allocator_default.hpp b/src/allocator_default.hpp new file mode 100644 index 0000000000..514fa2d40f --- /dev/null +++ b/src/allocator_default.hpp @@ -0,0 +1,76 @@ +/* + Copyright (c) 2019-2020 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_I_ALLOCATOR_HPP_INCLUDED__ +#define __ZMQ_I_ALLOCATOR_HPP_INCLUDED__ + +namespace zmq +{ +class allocator_default_t +{ + public: + allocator_default_t (); + + ~allocator_default_t (); + + static void *allocate_fn (void *allocator_, size_t len_) + { + return static_cast (allocator_)->allocate (len_); + } + + static void deallocate_fn (void *allocator_, void *data_) + { + return static_cast (allocator_) + ->deallocate (data_); + } + + static bool check_tag_fn (void *allocator_) + { + return static_cast (allocator_)->check_tag (); + } + + static void destroy_fn (void *allocator_) + { + operator delete (static_cast (allocator_)); + } + + // allocate() typically gets called by the consumer thread: the user app thread(s) + void *allocate (size_t len_); + + void deallocate (void *data_); + + bool check_tag () const; + + private: + // Used to check whether the object is a socket. + uint32_t _tag; +}; +} + +#endif diff --git a/src/ctx.cpp b/src/ctx.cpp index 2b64c5104e..67e0b8f7a5 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -47,6 +47,7 @@ #include "err.hpp" #include "msg.hpp" #include "random.hpp" +#include "allocator_default.hpp" #ifdef ZMQ_HAVE_VMCI #include @@ -311,6 +312,18 @@ int zmq::ctx_t::set (int option_, const void *optval_, size_t optvallen_) } break; + /*case ZMQ_MSG_ALLOCATOR: { + if (optvallen_ == sizeof (zmq::allocator_t)) { + const zmq::allocator_t *all = + static_cast (optval_); + if (all->check_tag ()) { + _allocator = const_cast (all); + return 0; + } + } + break; + }*/ + default: { return thread_ctx_t::set (option_, optval_, optvallen_); } @@ -381,6 +394,9 @@ int zmq::ctx_t::get (int option_, void *optval_, const size_t *optvallen_) return 0; } break; + /* + case ZMQ_MSG_ALLOCATOR: { + } break;*/ default: { return thread_ctx_t::get (option_, optval_, optvallen_); diff --git a/src/ctx.hpp b/src/ctx.hpp index 0ccd68fa5b..fb4abeb5a9 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -35,6 +35,7 @@ #include #include +//#include "allocator_default.hpp" #include "mailbox.hpp" #include "array.hpp" #include "config.hpp" @@ -221,6 +222,9 @@ class ctx_t ZMQ_FINAL : public thread_ctx_t // Synchronisation of access to the list of inproc endpoints. mutex_t _endpoints_sync; + // Allocator for messages + //allocator_t *_allocator; + // Maximum socket ID. static atomic_counter_t max_socket_id; diff --git a/src/msg.cpp b/src/msg.cpp index 2116d1e40c..bc21340036 100644 --- a/src/msg.cpp +++ b/src/msg.cpp @@ -40,6 +40,7 @@ #include "likely.hpp" #include "metadata.hpp" #include "err.hpp" +#include "allocator_default.hpp" // Check whether the sizes of public representation of the message (zmq_msg_t) // and private representation of the message (zmq::msg_t) match. @@ -48,6 +49,7 @@ typedef char zmq_msg_size_check[2 * ((sizeof (zmq::msg_t) == sizeof (zmq_msg_t)) != 0) - 1]; + bool zmq::msg_t::check () const { return _u.base.type >= type_min && _u.base.type <= type_max; @@ -100,6 +102,7 @@ int zmq::msg_t::init_size (size_t size_) _u.lmsg.metadata = NULL; _u.lmsg.type = type_lmsg; _u.lmsg.flags = 0; + _u.lmsg.allocator_was_used = 0; _u.lmsg.group.sgroup.group[0] = '\0'; _u.lmsg.group.type = group_type_short; _u.lmsg.routing_id = 0; @@ -184,6 +187,7 @@ int zmq::msg_t::init_data (void *data_, _u.lmsg.metadata = NULL; _u.lmsg.type = type_lmsg; _u.lmsg.flags = 0; + _u.lmsg.allocator_was_used = 0; _u.lmsg.group.sgroup.group[0] = '\0'; _u.lmsg.group.type = group_type_short; _u.lmsg.routing_id = 0; @@ -203,6 +207,53 @@ int zmq::msg_t::init_data (void *data_, return 0; } +void allocator_free (void *data_, void *hint_) +{ + zmq_allocator_t *allocator = reinterpret_cast (hint_); + allocator->deallocate_fn (allocator->allocator, data_); +} + +int zmq::msg_t::init_from_allocator (size_t size_, zmq_allocator_t *alloc_) +{ + zmq_assert (alloc_ != NULL); + + if (size_ <= max_vsm_size) { + // in case we can fit the message data inside the msg_t itself, this option will always + // be fastest rather than using the allocator! + _u.vsm.metadata = NULL; + _u.vsm.type = type_vsm; + _u.vsm.flags = 0; + _u.vsm.size = static_cast (size_); + _u.vsm.group.sgroup.group[0] = '\0'; + _u.lmsg.group.type = group_type_short; + _u.vsm.routing_id = 0; + } else { + _u.lmsg.metadata = NULL; + _u.lmsg.type = type_lmsg; + _u.lmsg.flags = 0; + _u.lmsg.allocator_was_used = 1; + _u.lmsg.group.sgroup.group[0] = '\0'; + _u.lmsg.group.type = group_type_short; + _u.lmsg.routing_id = 0; + _u.lmsg.content = reinterpret_cast ( + alloc_->allocate_fn (alloc_->allocator, size_ + sizeof (content_t))); + + if (!_u.lmsg.content) { + errno = ENOMEM; + return -1; + } + + _u.lmsg.content->data = _u.lmsg.content + 1; + _u.lmsg.content->size = size_; + _u.lmsg.content->ffn = + reinterpret_cast (&allocator_free); + _u.lmsg.content->hint = alloc_; + new (&_u.lmsg.content->refcnt) zmq::atomic_counter_t (); + } + + return 0; +} + int zmq::msg_t::init_delimiter () { _u.delimiter.metadata = NULL; @@ -283,10 +334,24 @@ int zmq::msg_t::close () // counter so we call the destructor explicitly now. _u.lmsg.content->refcnt.~atomic_counter_t (); - if (_u.lmsg.content->ffn) - _u.lmsg.content->ffn (_u.lmsg.content->data, - _u.lmsg.content->hint); - free (_u.lmsg.content); + if (_u.lmsg.allocator_was_used) { + // take a local copy since we are going to remove (through the user-provided deallocator) + // the whole malloc'ed buffer, including the content_t block itself! + // NOTE: this copy should not be strictly needed but it's here just to help debugging: + content_t content; + content.data = _u.lmsg.content->data; + content.size = _u.lmsg.content->size; + content.ffn = _u.lmsg.content->ffn; + content.hint = _u.lmsg.content->hint; + if (content.ffn) + /* return to the allocator the memory starting from the content_t struct */ + content.ffn (_u.lmsg.content, content.hint); + } else { + if (_u.lmsg.content->ffn) + _u.lmsg.content->ffn (_u.lmsg.content->data, + _u.lmsg.content->hint); + free (_u.lmsg.content); + } } } diff --git a/src/msg.hpp b/src/msg.hpp index d956b2ac81..f9fd3828d7 100644 --- a/src/msg.hpp +++ b/src/msg.hpp @@ -47,6 +47,7 @@ // zmq_free_fn defined in zmq.h. extern "C" { typedef void(msg_free_fn) (void *data_, void *hint_); +struct zmq_allocator_t; } namespace zmq @@ -110,6 +111,7 @@ class msg_t size_t size_, msg_free_fn *ffn_, void *hint_); + int init_from_allocator (size_t size_, zmq_allocator_t *alloc_); int init_delimiter (); int init_join (); int init_leave (); @@ -274,9 +276,10 @@ class msg_t { metadata_t *metadata; content_t *content; + unsigned char allocator_was_used; // boolean flag unsigned char unused[msg_t_size - - (sizeof (metadata_t *) + sizeof (content_t *) + 2 + - (sizeof (metadata_t *) + sizeof (content_t *) + 3 + sizeof (uint32_t) + sizeof (group_t))]; unsigned char type; unsigned char flags; diff --git a/src/zmq.cpp b/src/zmq.cpp index 2c2e3d52f7..0a09642873 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -96,6 +96,7 @@ struct iovec #include "timers.hpp" #include "ip.hpp" #include "address.hpp" +#include "allocator_default.hpp" #if defined ZMQ_HAVE_OPENPGM #define __PGM_WININT_H__ @@ -213,6 +214,48 @@ int zmq_ctx_get_ext (void *ctx_, int option_, void *optval_, size_t *optvallen_) } +// New allocator API + +void *zmq_msg_allocator_new (int type_) +{ + zmq_allocator_t *allocator = new (std::nothrow) zmq_allocator_t; + zmq::allocator_default_t *allocator_default = NULL; + switch (type_) { + case ZMQ_MSG_ALLOCATOR_DEFAULT: + allocator_default = new (std::nothrow) zmq::allocator_default_t; + allocator->allocate_fn = &allocator_default->allocate_fn; + allocator->deallocate_fn = &allocator_default->deallocate_fn; + allocator->check_tag_fn = &allocator_default->check_tag_fn; + allocator->allocator = allocator_default; + allocator->destroy_fn = &allocator_default->destroy_fn; + break; + } + + if (!allocator || !allocator->allocator) { + errno = ENOMEM; + return NULL; + } + return allocator; +} + +int zmq_msg_allocator_destroy (void **allocator_) +{ + if (allocator_) { + zmq_allocator_t *const allocator = + static_cast (*allocator_); + if (allocator && allocator->check_tag_fn (allocator->allocator)) { + allocator->destroy_fn (allocator->allocator); + allocator->allocator = NULL; + delete allocator; + *allocator_ = NULL; + return 0; + } + } + errno = EFAULT; + return -1; +} + + // Stable/legacy context API void *zmq_init (int io_threads_) @@ -616,6 +659,13 @@ int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_) return (reinterpret_cast (msg_))->init_size (size_); } +int zmq_msg_init_allocator (zmq_msg_t *msg_, size_t size_, void *allocator_) +{ + return (reinterpret_cast (msg_)) + ->init_from_allocator (size_, + reinterpret_cast (allocator_)); +} + int zmq_msg_init_buffer (zmq_msg_t *msg_, const void *buf_, size_t size_) { return (reinterpret_cast (msg_))->init_buffer (buf_, size_); diff --git a/src/zmq_draft.h b/src/zmq_draft.h index dc07244780..a89888032a 100644 --- a/src/zmq_draft.h +++ b/src/zmq_draft.h @@ -86,6 +86,13 @@ int zmq_ctx_get_ext (void *context_, void *optval_, size_t *optvallen_); +/* ZMQ-provided message-pool implementations. */ +// default allocator using malloc/free +#define ZMQ_MSG_ALLOCATOR_DEFAULT 0 + +void *zmq_msg_allocator_new (int type_); +int zmq_msg_allocator_destroy (void **allocator_); + /* DRAFT Socket methods. */ int zmq_join (void *s_, const char *group_); int zmq_leave (void *s_, const char *group_); diff --git a/tests/test_msg_init.cpp b/tests/test_msg_init.cpp index 75e781027f..241a8b74eb 100644 --- a/tests/test_msg_init.cpp +++ b/tests/test_msg_init.cpp @@ -74,6 +74,39 @@ void test_msg_init_buffer () TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg2)); } +void test_msg_init_allocator () +{ +#if defined(ZMQ_BUILD_DRAFT_API) + const char *data = "foobar"; + zmq_msg_t msg; + void *allocator = zmq_msg_allocator_new (ZMQ_MSG_ALLOCATOR_DEFAULT); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_allocator (&msg, 6, allocator)); + TEST_ASSERT_EQUAL_INT (6, zmq_msg_size (&msg)); + memcpy (zmq_msg_data (&msg), data, 6); + TEST_ASSERT_EQUAL_STRING_LEN (data, zmq_msg_data (&msg), 6); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg)); + + zmq_msg_t msg2; + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_allocator (&msg2, 0, allocator)); + TEST_ASSERT_EQUAL_INT (0, zmq_msg_size (&msg2)); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg2)); + + void *data3 = malloc (1024); + memset (data3, 1, 1024); + zmq_msg_t msg3; + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_allocator (&msg3, 1024, allocator)); + TEST_ASSERT_EQUAL_INT (1024, zmq_msg_size (&msg3)); + memcpy (zmq_msg_data (&msg3), data3, 1024); + TEST_ASSERT_EQUAL_MEMORY (data3, zmq_msg_data (&msg3), 1024); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg3)); + free (data3); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_allocator_destroy (&allocator)); +#else + TEST_IGNORE_MESSAGE ("libzmq without DRAFT support, ignoring test"); +#endif +} + int main (void) { setup_test_environment (); @@ -82,5 +115,6 @@ int main (void) RUN_TEST (test_msg_init); RUN_TEST (test_msg_init_size); RUN_TEST (test_msg_init_buffer); + RUN_TEST (test_msg_init_allocator); return UNITY_END (); }