Skip to content

Commit

Permalink
Couple of bugs in XREP handling of identities fixed.
Browse files Browse the repository at this point in the history
wq:
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
  • Loading branch information
sustrik committed Nov 13, 2011
1 parent f8b0055 commit 1c23970
Showing 1 changed file with 22 additions and 15 deletions.
37 changes: 22 additions & 15 deletions src/xrep.cpp
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -189,14 +189,18 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
return 0; return 0;
} }


// Get next message part. pipe_t *pipe = NULL;
pipe_t *pipe; while (true) {
int rc = fq.recvpipe (msg_, flags_, &pipe);
if (rc != 0) // Get next message part.
return -1; int rc = fq.recvpipe (msg_, flags_, &pipe);

if (rc != 0)
// If identity is received, change the key assigned to the pipe. return -1;
if (unlikely (msg_->flags () & msg_t::identity)) {
// If identity is received, change the key assigned to the pipe.
if (likely (!(msg_->flags () & msg_t::identity)))
break;

zmq_assert (!more_in); zmq_assert (!more_in);


// Empty identity means we can preserve the auto-generated identity. // Empty identity means we can preserve the auto-generated identity.
Expand All @@ -219,11 +223,6 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
} }
zmq_assert (it != outpipes.end ()); zmq_assert (it != outpipes.end ());
} }

// After processing the identity, try to get the next message.
rc = fq.recvpipe (msg_, flags_, &pipe);
if (rc != 0)
return -1;
} }


// If we are in the middle of reading a message, just return the next part. // If we are in the middle of reading a message, just return the next part.
Expand All @@ -234,7 +233,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)


// We are at the beginning of a new message. Move the message part we // We are at the beginning of a new message. Move the message part we
// have to the prefetched and return the ID of the peer instead. // have to the prefetched and return the ID of the peer instead.
rc = prefetched_msg.move (*msg_); int rc = prefetched_msg.move (*msg_);
errno_assert (rc == 0); errno_assert (rc == 0);
prefetched = true; prefetched = true;
rc = msg_->close (); rc = msg_->close ();
Expand All @@ -260,9 +259,17 @@ int zmq::xrep_t::rollback (void)


bool zmq::xrep_t::xhas_in () bool zmq::xrep_t::xhas_in ()
{ {
// We may already have a message pre-fetched.
if (prefetched) if (prefetched)
return true; return true;
return fq.has_in ();
// Try to read the next message to the pre-fetch buffer.
int rc = xrecv (&prefetched_msg, ZMQ_DONTWAIT);
if (rc != 0 && errno == EAGAIN)
return false;
zmq_assert (rc == 0);
prefetched = true;
return true;
} }


bool zmq::xrep_t::xhas_out () bool zmq::xrep_t::xhas_out ()
Expand Down

0 comments on commit 1c23970

Please sign in to comment.