Permalink
Browse files

allow XSUB/XPUB to send/recv messages unrelated to sub/unsub (LIBZMQ-…

…490)

zmq::xpub_t::xread_activated() – change to process messages without 0
or 1 prefix, but without affecting subscriptions

zmq::xsub_t::xsend() – change to send rather than discard messages
without 0 or 1 prefix, but without affecting subscriptions

Update documentation
  • Loading branch information...
jgm-esseforma committed Jan 8, 2013
1 parent 98a91e8 commit d32e3922785f170ce24159ab5e4b44badc473ec1
Showing with 11 additions and 9 deletions.
  1. +4 −2 doc/zmq_socket.txt
  2. +3 −0 src/xpub.cpp
  3. +4 −7 src/xsub.cpp
View
@@ -224,7 +224,8 @@ ZMQ_XPUB
Same as ZMQ_PUB except that you can receive subscriptions from the peers
in form of incoming messages. Subscription message is a byte 1 (for
subscriptions) or byte 0 (for unsubscriptions) followed by the subscription
-body.
+body. Messages without a sub/unsub prefix are also received, but have no
+effect on subscription status.
[horizontal]
.Summary of ZMQ_XPUB characteristics
@@ -240,7 +241,8 @@ ZMQ_XSUB
^^^^^^^^
Same as ZMQ_SUB except that you subscribe by sending subscription messages to
the socket. Subscription message is a byte 1 (for subscriptions) or byte 0
-(for unsubscriptions) followed by the subscription body.
+(for unsubscriptions) followed by the subscription body. Messages without a
+sub/unsub prefix may also be sent, but have no effect on subscription status.
[horizontal]
.Summary of ZMQ_XSUB characteristics
View
@@ -74,6 +74,9 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
if (options.type == ZMQ_XPUB && (unique || (*data && verbose)))
pending.push_back (blob_t (data, size));
}
+ else /*process message unrelated to sub/unsub*/ {
+ pending.push_back (blob_t (data, size));
+ }
sub.close ();
}
View
@@ -87,12 +87,6 @@ int zmq::xsub_t::xsend (msg_t *msg_)
size_t size = msg_->size ();
unsigned char *data = (unsigned char*) msg_->data ();
- // Malformed subscriptions.
- if (size < 1 || (*data != 0 && *data != 1)) {
- errno = EINVAL;
- return -1;
- }
-
// Process the subscription.
if (*data == 1) {
// this used to filter out duplicate subscriptions,
@@ -102,10 +96,13 @@ int zmq::xsub_t::xsend (msg_t *msg_)
subscriptions.add (data + 1, size - 1);
return dist.send_to_all (msg_);
}
- else {
+ else if (*data == 0) {
if (subscriptions.rm (data + 1, size - 1))
return dist.send_to_all (msg_);
}
+ else /*upstream message unrelated to sub/unsub*/ {
+ return dist.send_to_all (msg_);
+ }
int rc = msg_->close ();
errno_assert (rc == 0);

0 comments on commit d32e392

Please sign in to comment.