Skip to content

Commit

Permalink
problem: no thread-safe alternative for ZMQ_PAIR
Browse files Browse the repository at this point in the history
Solution: create ZMQ_CHANNEL, the thread safe alternative
  • Loading branch information
somdoron committed May 9, 2020
1 parent 28cb820 commit 3da84c6
Show file tree
Hide file tree
Showing 14 changed files with 392 additions and 14 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Expand Up @@ -844,6 +844,7 @@ endif()
set(cxx-sources
precompiled.cpp
address.cpp
channel.cpp
client.cpp
clock.cpp
ctx.cpp
Expand Down Expand Up @@ -947,6 +948,7 @@ set(cxx-sources
atomic_counter.hpp
atomic_ptr.hpp
blob.hpp
channel.hpp
client.hpp
clock.hpp
command.hpp
Expand Down
9 changes: 8 additions & 1 deletion Makefile.am
Expand Up @@ -26,6 +26,8 @@ src_libzmq_la_SOURCES = \
src/atomic_counter.hpp \
src/atomic_ptr.hpp \
src/blob.hpp \
src/channel.cpp \
src/channel.hpp \
src/client.cpp \
src/client.hpp \
src/clock.cpp \
Expand Down Expand Up @@ -1047,7 +1049,8 @@ test_apps += tests/test_poller \
tests/test_reconnect_options \
tests/test_msg_init \
tests/test_hello_msg \
tests/test_disconnect_msg
tests/test_disconnect_msg \
tests/test_channel

tests_test_poller_SOURCES = tests/test_poller.cpp
tests_test_poller_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
Expand Down Expand Up @@ -1108,6 +1111,10 @@ tests_test_hello_msg_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
tests_test_disconnect_msg_SOURCES = tests/test_disconnect_msg.cpp
tests_test_disconnect_msg_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
tests_test_disconnect_msg_CPPFLAGS = ${TESTUTIL_CPPFLAGS}

tests_test_channel_SOURCES = tests/test_channel.cpp
tests_test_channel_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
tests_test_channel_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
endif

if FUZZING_ENGINE_LIB
Expand Down
2 changes: 1 addition & 1 deletion doc/zmq_bind.txt
Expand Up @@ -30,7 +30,7 @@ The 'endpoint' is a string consisting of a 'transport'`://` followed by an
'vmci':: virtual machine communications interface (VMCI), see linkzmq:zmq_vmci[7]
'udp':: unreliable unicast and multicast using UDP, see linkzmq:zmq_udp[7]

Every 0MQ socket type except 'ZMQ_PAIR' supports one-to-many and many-to-one
Every 0MQ socket type except 'ZMQ_PAIR' and 'ZMQ_CHANNEL' supports one-to-many and many-to-one
semantics. The precise semantics depend on the socket type and are defined in
linkzmq:zmq_socket[3].

Expand Down
4 changes: 2 additions & 2 deletions doc/zmq_connect.txt
Expand Up @@ -30,7 +30,7 @@ The 'endpoint' is a string consisting of a 'transport'`://` followed by an
'vmci':: virtual machine communications interface (VMCI), see linkzmq:zmq_vmci[7]
'udp':: unreliable unicast and multicast using UDP, see linkzmq:zmq_udp[7]

Every 0MQ socket type except 'ZMQ_PAIR' supports one-to-many and many-to-one
Every 0MQ socket type except 'ZMQ_PAIR' and 'ZMQ_CHANNEL' supports one-to-many and many-to-one
semantics. The precise semantics depend on the socket type and are defined in
linkzmq:zmq_socket[3].

Expand All @@ -39,7 +39,7 @@ immediately but as needed by 0MQ. Thus a successful call to _zmq_connect()_
does not mean that the connection was or could actually be established.
Because of this, for most transports and socket types the order in which
a 'server' socket is bound and a 'client' socket is connected to it does not
matter. The _ZMQ_PAIR_ sockets are an exception, as they do not automatically
matter. The _ZMQ_PAIR_ and _ZMQ_CHANNEL_ sockets are an exception, as they do not automatically
reconnect to endpoints.

NOTE: following a _zmq_connect()_, for socket types except for ZMQ_ROUTER,
Expand Down
44 changes: 43 additions & 1 deletion doc/zmq_socket.txt
Expand Up @@ -41,7 +41,7 @@ the event that a peer is unavailable to receive them.

Conventional sockets allow only strict one-to-one (two peers), many-to-one
(many clients, one server), or in some cases one-to-many (multicast)
relationships. With the exception of 'ZMQ_PAIR', 0MQ sockets may be connected
relationships. With the exception of 'ZMQ_PAIR' and 'ZMQ_CHANNEL', 0MQ sockets may be connected
*to multiple endpoints* using _zmq_connect()_, while simultaneously accepting
incoming connections *from multiple endpoints* bound to the socket using
_zmq_bind()_, thus allowing many-to-many relationships.
Expand All @@ -60,6 +60,7 @@ Following are the thread safe sockets:
* ZMQ_SCATTER
* ZMQ_GATHER
* ZMQ_PEER
* ZMQ_CHANNEL

.Socket types
The following sections present the socket types defined by 0MQ, grouped by the
Expand Down Expand Up @@ -476,6 +477,47 @@ Outgoing routing strategy:: See text
Incoming routing strategy:: Fair-queued
Action in mute state:: Return EAGAIN

Channel pattern
~~~~~~~~~~~~~~~~~~~~~~
The channel pattern is the thread-safe version of the exclusive pair pattern.
The channel pattern is used to connect a peer to precisely one other
peer. This pattern is used for inter-thread communication across the inproc
transport.

NOTE: Channel is still in draft phase.

ZMQ_CHANNEL
^^^^^^^^
A socket of type 'ZMQ_CHANNEL' can only be connected to a single peer at any one
time. No message routing or filtering is performed on messages sent over a
'ZMQ_CHANNEL' socket.

When a 'ZMQ_CHANNEL' socket enters the 'mute' state due to having reached the
high water mark for the connected peer, or, for connection-oriented transports,
if the ZMQ_IMMEDIATE option is set and there is no connected peer, then
any linkzmq:zmq_send[3] operations on the socket shall block until the peer
becomes available for sending; messages are not discarded.

While 'ZMQ_CHANNEL' sockets can be used over transports other than linkzmq:zmq_inproc[7],
their inability to auto-reconnect coupled with the fact new incoming connections will
be terminated while any previous connections (including ones in a closing state)
exist makes them unsuitable for TCP in most cases.

NOTE: 'ZMQ_CHANNEL' sockets are designed for inter-thread communication across
the linkzmq:zmq_inproc[7] transport and do not implement functionality such
as auto-reconnection.

NOTE: 'ZMQ_CHANNEL' sockets are threadsafe. They do not accept ZMQ_RCVMORE on receives.
This limits them to single part data.

[horizontal]
.Summary of ZMQ_CHANNEL characteristics
Compatible peer sockets:: 'ZMQ_CHANNEL'
Direction:: Bidirectional
Send/receive pattern:: Unrestricted
Incoming routing strategy:: N/A
Outgoing routing strategy:: N/A
Action in mute state:: Block

Native Pattern
~~~~~~~~~~~~~~
Expand Down
1 change: 1 addition & 0 deletions include/zmq.h
Expand Up @@ -659,6 +659,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_);
#define ZMQ_SCATTER 17
#define ZMQ_DGRAM 18
#define ZMQ_PEER 19
#define ZMQ_CHANNEL 20

/* DRAFT Socket options. */
#define ZMQ_ZAP_ENFORCE_DOMAIN 93
Expand Down
160 changes: 160 additions & 0 deletions src/channel.cpp
@@ -0,0 +1,160 @@
/*
Copyright (c) 2007-2020 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

#include "precompiled.hpp"
#include "macros.hpp"
#include "channel.hpp"
#include "err.hpp"
#include "pipe.hpp"
#include "msg.hpp"

zmq::channel_t::channel_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_, true),
_pipe (NULL)
{
options.type = ZMQ_CHANNEL;
}

zmq::channel_t::~channel_t ()
{
zmq_assert (!_pipe);
}

void zmq::channel_t::xattach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_)
{
LIBZMQ_UNUSED (subscribe_to_all_);
LIBZMQ_UNUSED (locally_initiated_);

zmq_assert (pipe_ != NULL);

// ZMQ_PAIR socket can only be connected to a single peer.
// The socket rejects any further connection requests.
if (_pipe == NULL)
_pipe = pipe_;
else
pipe_->terminate (false);
}

void zmq::channel_t::xpipe_terminated (pipe_t *pipe_)
{
if (pipe_ == _pipe)
_pipe = NULL;
}

void zmq::channel_t::xread_activated (pipe_t *)
{
// There's just one pipe. No lists of active and inactive pipes.
// There's nothing to do here.
}

void zmq::channel_t::xwrite_activated (pipe_t *)
{
// There's just one pipe. No lists of active and inactive pipes.
// There's nothing to do here.
}

int zmq::channel_t::xsend (msg_t *msg_)
{
// CHANNEL sockets do not allow multipart data (ZMQ_SNDMORE)
if (msg_->flags () & msg_t::more) {
errno = EINVAL;
return -1;
}

if (!_pipe || !_pipe->write (msg_)) {
errno = EAGAIN;
return -1;
}

_pipe->flush ();

// Detach the original message from the data buffer.
const int rc = msg_->init ();
errno_assert (rc == 0);

return 0;
}

int zmq::channel_t::xrecv (msg_t *msg_)
{
// Deallocate old content of the message.
int rc = msg_->close ();
errno_assert (rc == 0);

if (!_pipe) {
// Initialise the output parameter to be a 0-byte message.
rc = msg_->init ();
errno_assert (rc == 0);

errno = EAGAIN;
return -1;
}

// Drop any messages with more flag
bool read = _pipe->read (msg_);
while (read && msg_->flags () & msg_t::more) {
// drop all frames of the current multi-frame message
read = _pipe->read (msg_);
while (read && msg_->flags () & msg_t::more)
read = _pipe->read (msg_);

// get the new message
if (read)
read = _pipe->read (msg_);
}

if (!read) {
// Initialise the output parameter to be a 0-byte message.
rc = msg_->init ();
errno_assert (rc == 0);

errno = EAGAIN;
return -1;
}

return 0;
}

bool zmq::channel_t::xhas_in ()
{
if (!_pipe)
return false;

return _pipe->check_read ();
}

bool zmq::channel_t::xhas_out ()
{
if (!_pipe)
return false;

return _pipe->check_write ();
}
69 changes: 69 additions & 0 deletions src/channel.hpp
@@ -0,0 +1,69 @@
/*
Copyright (c) 2007-2020 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

#ifndef __ZMQ_CHANNEL_HPP_INCLUDED__
#define __ZMQ_CHANNEL_HPP_INCLUDED__

#include "blob.hpp"
#include "socket_base.hpp"
#include "session_base.hpp"

namespace zmq
{
class ctx_t;
class msg_t;
class pipe_t;
class io_thread_t;

class channel_t ZMQ_FINAL : public socket_base_t
{
public:
channel_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
~channel_t ();

// Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
bool xhas_out ();
void xread_activated (zmq::pipe_t *pipe_);
void xwrite_activated (zmq::pipe_t *pipe_);
void xpipe_terminated (zmq::pipe_t *pipe_);

private:
zmq::pipe_t *_pipe;

ZMQ_NON_COPYABLE_NOR_MOVABLE (channel_t)
};
}

#endif

0 comments on commit 3da84c6

Please sign in to comment.