Permalink
Browse files

Fixed issue #443

  • Loading branch information...
1 parent d6e0ae2 commit f87bf38293a4259ab0d9ad58506981736eb96ae4 @hintjens hintjens committed Oct 7, 2012
Showing with 37 additions and 1 deletion.
  1. +14 −0 doc/zmq_setsockopt.txt
  2. +1 −0 include/zmq.h
  3. +17 −1 src/xpub.cpp
  4. +5 −0 src/xpub.hpp
View
@@ -385,6 +385,20 @@ Default value:: 0
Applicable socket types:: ZMQ_ROUTER
+ZMQ_XPUB_VERBOSE: Set the XPUB socket behavior
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Sets the 'XPUB' socket behavior on new subscriptions. A value of '0' is the default
+and passes only new subscription messages to upstream. A value of '1' passes all
+subscription messages upstream.
+
+[horizontal]
+Option value type:: int
+Option value unit:: 0, 1
+Default value:: 0
+Applicable socket types:: ZMQ_XPUB
+
+
ZMQ_TCP_KEEPALIVE: Override SO_KEEPALIVE socket option
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Override 'SO_KEEPALIVE' socket option(where supported by OS).
View
@@ -249,6 +249,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
#define ZMQ_TCP_KEEPALIVE_INTVL 37
#define ZMQ_TCP_ACCEPT_FILTER 38
#define ZMQ_DELAY_ATTACH_ON_CONNECT 39
+#define ZMQ_XPUB_VERBOSE 40
/* Message options */
#define ZMQ_MORE 1
View
@@ -28,6 +28,7 @@
zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_),
+ verbose(false),
more (false)
{
options.type = ZMQ_XPUB;
@@ -70,7 +71,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
// If the subscription is not a duplicate store it so that it can be
// passed to used on next recv call.
- if (unique && options.type != ZMQ_PUB)
+ if (options.type == ZMQ_XPUB && (unique || verbose))
pending.push_back (blob_t (data, size));
}
@@ -83,6 +84,21 @@ void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
dist.activated (pipe_);
}
+int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ if (option_ != ZMQ_XPUB_VERBOSE) {
+ errno = EINVAL;
+ return -1;
+ }
+ if (optvallen_ != sizeof (int) || *static_cast <const int*> (optval_) < 0) {
+ errno = EINVAL;
+ return -1;
+ }
+ verbose = *static_cast <const int*> (optval_);
+ return 0;
+}
+
void zmq::xpub_t::xterminated (pipe_t *pipe_)
{
// Remove the pipe from the trie. If there are topics that nobody
View
@@ -54,6 +54,7 @@ namespace zmq
bool xhas_in ();
void xread_activated (zmq::pipe_t *pipe_);
void xwrite_activated (zmq::pipe_t *pipe_);
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xterminated (zmq::pipe_t *pipe_);
private:
@@ -72,6 +73,10 @@ namespace zmq
// Distributor of messages holding the list of outbound pipes.
dist_t dist;
+ // If true, send all subscription messages upstream, not just
+ // unique ones
+ bool verbose;
+
// True if we are in the middle of sending a multi-part message.
bool more;

0 comments on commit f87bf38

Please sign in to comment.