Permalink
Browse files

Fix polling on XREP socket

When polling on XREP socket in incoming message part was prefetched,
but not the identity of sender. The problem is fixed by this patch.

Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
  • Loading branch information...
1 parent f9eb763 commit 91fdedf25c4d76b0ec0aeb5d1d9f1c9a1a769447 @sustrik sustrik committed Dec 17, 2011
Showing with 42 additions and 11 deletions.
  1. +36 −9 src/xrep.cpp
  2. +6 −2 src/xrep.hpp
View
@@ -29,7 +29,7 @@
zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_),
- prefetched (false),
+ prefetched (0),
more_in (false),
current_out (NULL),
more_out (false),
@@ -180,12 +180,23 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
{
+ // if there is a prefetched identity, return it.
+ if (prefetched == 2)
+ {
+ int rc = msg_->init_size (prefetched_id.size ());
+ errno_assert (rc == 0);
+ memcpy (msg_->data (), prefetched_id.data (), prefetched_id.size ());
+ msg_->set_flags (msg_t::more);
+ prefetched = 1;
+ return 0;
+ }
+
// If there is a prefetched message, return it.
- if (prefetched) {
+ if (prefetched == 1) {
int rc = msg_->move (prefetched_msg);
errno_assert (rc == 0);
more_in = msg_->flags () & msg_t::more ? true : false;
- prefetched = false;
+ prefetched = 0;
return 0;
}
@@ -235,7 +246,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
// have to the prefetched and return the ID of the peer instead.
int rc = prefetched_msg.move (*msg_);
errno_assert (rc == 0);
- prefetched = true;
+ prefetched = 1;
rc = msg_->close ();
errno_assert (rc == 0);
@@ -259,16 +270,32 @@ int zmq::xrep_t::rollback (void)
bool zmq::xrep_t::xhas_in ()
{
+ // If we are in the middle of reading the messages, there are
+ // definitely more parts available.
+ if (more_in)
+ return true;
+
// We may already have a message pre-fetched.
- if (prefetched)
+ if (prefetched > 0)
return true;
- // Try to read the next message to the pre-fetch buffer.
- int rc = xrep_t::xrecv (&prefetched_msg, ZMQ_DONTWAIT);
- if (rc != 0 && errno == EAGAIN)
+ // Try to read the next message to the pre-fetch buffer. If anything,
+ // it will be identity of the peer sending the message.
+ msg_t id;
+ id.init ();
+ int rc = xrep_t::xrecv (&id, ZMQ_DONTWAIT);
+ if (rc != 0 && errno == EAGAIN) {
+ id.close ();
return false;
+ }
zmq_assert (rc == 0);
- prefetched = true;
+
+ // We have first part of the message prefetched now. We will store the
+ // prefetched identity as well.
+ prefetched_id.assign ((unsigned char*) id.data (), id.size ());
+ id.close ();
+ prefetched = 2;
+
return true;
}
View
@@ -67,8 +67,12 @@ namespace zmq
// Fair queueing object for inbound pipes.
fq_t fq;
- // Have we prefetched a message.
- bool prefetched;
+ // This value is either 0 (nothing is prefetched), 1 (only message body
+ // is prefetched) or 2 (both identity and message body are prefetched).
+ int prefetched;
+
+ // Holds the prefetched identity.
+ blob_t prefetched_id;
// Holds the prefetched message.
msg_t prefetched_msg;

0 comments on commit 91fdedf

Please sign in to comment.