Skip to content

Commit

Permalink
Fix 1478: receive unsubscriptions in XPUB when verbose
Browse files Browse the repository at this point in the history
Fixes not receiving unsubscription messages in XPUB socket with
ZMQ_XPUB_VERBOSE and using a XSUB-XPUB proxy in front.

This adds two modifications:

- It adds a new flag, ZMQ_XPUB_VERBOSE_UNSUBSCRIBE, to enable verbose
  unsubscription messages, necessary when using a XSUB/XPUB proxy.

- It adds a boolean switch to zmq::mtrie_t::rm () to control if the
  callback is invoked every time or only in the last removal. Necessary
  when a pipe is terminated and the verbose mode for unsubscriptions is
  enabled.
  • Loading branch information
jimenezrick committed Jul 23, 2015
1 parent 305c075 commit ec5592d
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 23 deletions.
24 changes: 22 additions & 2 deletions doc/zmq_setsockopt.txt
Expand Up @@ -14,8 +14,9 @@ SYNOPSIS

Caution: All options, with the exception of ZMQ_SUBSCRIBE, ZMQ_UNSUBSCRIBE,
ZMQ_LINGER, ZMQ_ROUTER_HANDOVER, ZMQ_ROUTER_MANDATORY, ZMQ_PROBE_ROUTER,
ZMQ_XPUB_VERBOSE, ZMQ_REQ_CORRELATE, ZMQ_REQ_RELAXED, ZMQ_SNDHWM
and ZMQ_RCVHWM, only take effect for subsequent socket bind/connects.
ZMQ_XPUB_VERBOSE, ZMQ_XPUB_VERBOSE_UNSUBSCRIBE, ZMQ_REQ_CORRELATE,
ZMQ_REQ_RELAXED, ZMQ_SNDHWM and ZMQ_RCVHWM, only take effect for
subsequent socket bind/connects.

Specifically, security options take effect for subsequent bind/connect calls,
and can be changed at any time to affect subsequent binds and/or connects.
Expand Down Expand Up @@ -839,6 +840,25 @@ Default value:: 0
Applicable socket types:: ZMQ_XPUB


ZMQ_XPUB_VERBOSE_UNSUBSCRIBE: provide all unsubscription messages on XPUB sockets
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the 'XPUB' socket behaviour on new subscriptions and unsubscriptions.
A value of '0' is the default and passes only the last unsubscription message to
upstream. A value of '1' passes all unsubscription messages upstream.

This behaviour should be enabled in all the intermediary XPUB sockets if
ZMQ_XPUB_VERBOSE is also being used in order to allow the correct forwarding
of all the unsubscription messages.

NOTE: This behaviour only takes effect when ZMQ_XPUB_VERBOSE is also enabled.

[horizontal]
Option value type:: int
Option value unit:: 0, 1
Default value:: 0
Applicable socket types:: ZMQ_XPUB


ZMQ_XPUB_MANUAL: change the subscription handling to manual
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the 'XPUB' socket subscription handling mode manual/automatic.
Expand Down
1 change: 1 addition & 0 deletions include/zmq.h
Expand Up @@ -319,6 +319,7 @@ ZMQ_EXPORT uint32_t zmq_msg_get_routing_id(zmq_msg_t *msg);
#define ZMQ_HEARTBEAT_IVL 75
#define ZMQ_HEARTBEAT_TTL 76
#define ZMQ_HEARTBEAT_TIMEOUT 77
#define ZMQ_XPUB_VERBOSE_UNSUBSCRIBE 78

/* Message options */
#define ZMQ_MORE 1
Expand Down
23 changes: 14 additions & 9 deletions src/mtrie.cpp
Expand Up @@ -159,23 +159,28 @@ bool zmq::mtrie_t::add_helper (unsigned char *prefix_, size_t size_,

void zmq::mtrie_t::rm (pipe_t *pipe_,
void (*func_) (unsigned char *data_, size_t size_, void *arg_),
void *arg_)
void *arg_, bool call_on_uniq_)
{
unsigned char *buff = NULL;
rm_helper (pipe_, &buff, 0, 0, func_, arg_);
rm_helper (pipe_, &buff, 0, 0, func_, arg_, call_on_uniq_);
free (buff);
}

void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_,
size_t buffsize_, size_t maxbuffsize_,
void (*func_) (unsigned char *data_, size_t size_, void *arg_),
void *arg_)
void *arg_, bool call_on_uniq_)
{
// Remove the subscription from this node.
if (pipes && pipes->erase (pipe_) && pipes->empty ()) {
func_ (*buff_, buffsize_, arg_);
delete pipes;
pipes = 0;
if (pipes && pipes->erase (pipe_)) {
if (!call_on_uniq_ || pipes->empty ()) {
func_ (*buff_, buffsize_, arg_);
}

if (pipes->empty ()) {
delete pipes;
pipes = 0;
}
}

// Adjust the buffer.
Expand All @@ -194,7 +199,7 @@ void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_,
(*buff_) [buffsize_] = min;
buffsize_++;
next.node->rm_helper (pipe_, buff_, buffsize_, maxbuffsize_,
func_, arg_);
func_, arg_, call_on_uniq_);

// Prune the node if it was made redundant by the removal
if (next.node->is_redundant ()) {
Expand All @@ -217,7 +222,7 @@ void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_,
(*buff_) [buffsize_] = min + c;
if (next.table [c]) {
next.table [c]->rm_helper (pipe_, buff_, buffsize_ + 1,
maxbuffsize_, func_, arg_);
maxbuffsize_, func_, arg_, call_on_uniq_);

// Prune redundant nodes from the mtrie
if (next.table [c]->is_redundant ()) {
Expand Down
9 changes: 5 additions & 4 deletions src/mtrie.hpp
Expand Up @@ -54,11 +54,12 @@ namespace zmq
bool add (unsigned char *prefix_, size_t size_, zmq::pipe_t *pipe_);

// Remove all subscriptions for a specific peer from the trie.
// If there are no subscriptions left on some topics, invoke the
// supplied callback function.
// The call_on_uniq_ flag controls if the callback is invoked
// when there are no subscriptions left on some topics or on
// every removal.
void rm (zmq::pipe_t *pipe_,
void (*func_) (unsigned char *data_, size_t size_, void *arg_),
void *arg_);
void *arg_, bool call_on_uniq_);

// Remove specific subscription from the trie. Return true is it was
// actually removed rather than de-duplicated.
Expand All @@ -75,7 +76,7 @@ namespace zmq
void rm_helper (zmq::pipe_t *pipe_, unsigned char **buff_,
size_t buffsize_, size_t maxbuffsize_,
void (*func_) (unsigned char *data_, size_t size_, void *arg_),
void *arg_);
void *arg_, bool call_on_uniq_);
bool rm_helper (unsigned char *prefix_, size_t size_,
zmq::pipe_t *pipe_);
bool is_redundant () const;
Expand Down
21 changes: 14 additions & 7 deletions src/xpub.cpp
Expand Up @@ -36,7 +36,8 @@

zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_),
verbose (false),
verbose_subs (false),
verbose_unsubs (false),
more (false),
lossy (true),
manual(false),
Expand Down Expand Up @@ -101,9 +102,11 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
else
unique = subscriptions.add(data + 1, size - 1, pipe_);

// If the subscription is not a duplicate store it so that it can be
// passed to used on next recv call. (Unsubscribe is not verbose.)
if (options.type == ZMQ_XPUB && (unique || (*data && verbose))) {
// If the (un)subscription is not a duplicate store it so that it can be
// passed to the user on next recv call unless verbose mode is enabled
// which makes to pass always these messages.
if (options.type == ZMQ_XPUB && (unique || (*data == 1 && verbose_subs) ||
(*data == 0 && verbose_unsubs && verbose_subs))) {
pending_data.push_back(blob_t(data, size));
pending_flags.push_back(0);
}
Expand All @@ -126,15 +129,19 @@ void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
if (option_ == ZMQ_XPUB_VERBOSE || option_ == ZMQ_XPUB_NODROP || option_ == ZMQ_XPUB_MANUAL)
if (option_ == ZMQ_XPUB_VERBOSE || option_ == ZMQ_XPUB_VERBOSE_UNSUBSCRIBE ||
option_ == ZMQ_XPUB_NODROP || option_ == ZMQ_XPUB_MANUAL)
{
if (optvallen_ != sizeof(int) || *static_cast <const int*> (optval_) < 0) {
errno = EINVAL;
return -1;
}

if (option_ == ZMQ_XPUB_VERBOSE)
verbose = (*static_cast <const int*> (optval_) != 0);
verbose_subs = (*static_cast <const int*> (optval_) != 0);
else
if (option_ == ZMQ_XPUB_VERBOSE_UNSUBSCRIBE)
verbose_unsubs = (*static_cast <const int*> (optval_) != 0);
else
if (option_ == ZMQ_XPUB_NODROP)
lossy = (*static_cast <const int*> (optval_) == 0);
Expand Down Expand Up @@ -173,7 +180,7 @@ void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_)
// Remove the pipe from the trie. If there are topics that nobody
// is interested in anymore, send corresponding unsubscriptions
// upstream.
subscriptions.rm (pipe_, send_unsubscription, this);
subscriptions.rm (pipe_, send_unsubscription, this, !verbose_unsubs);

dist.pipe_terminated (pipe_);
}
Expand Down
6 changes: 5 additions & 1 deletion src/xpub.hpp
Expand Up @@ -84,7 +84,11 @@ namespace zmq

// If true, send all subscription messages upstream, not just
// unique ones
bool verbose;
bool verbose_subs;

// If true, send all unsubscription messages upstream, not just
// unique ones
bool verbose_unsubs;

// True if we are in the middle of sending a multi-part message.
bool more;
Expand Down

0 comments on commit ec5592d

Please sign in to comment.