Skip to content

Commit

Permalink
Merge pull request #461 from hurtonm/code_cleanup
Browse files Browse the repository at this point in the history
session_base: code cleanup
  • Loading branch information
hintjens committed Nov 7, 2012
2 parents 5da9712 + e51a1f0 commit 4e028ec
Showing 1 changed file with 13 additions and 17 deletions.
30 changes: 13 additions & 17 deletions src/session_base.cpp
Expand Up @@ -157,8 +157,7 @@ void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
int zmq::session_base_t::pull_msg (msg_t *msg_)
{
// First message to send is identity
if (!identity_sent) {
zmq_assert (!(msg_->flags () & msg_t::more));
if (unlikely (!identity_sent)) {
int rc = msg_->init_size (options.identity_size);
errno_assert (rc == 0);
memcpy (msg_->data (), options.identity, options.identity_size);
Expand All @@ -179,7 +178,7 @@ int zmq::session_base_t::pull_msg (msg_t *msg_)
int zmq::session_base_t::push_msg (msg_t *msg_)
{
// First message to receive is identity
if (!identity_received) {
if (unlikely (!identity_received)) {
msg_->set_flags (msg_t::identity);
identity_received = true;
if (!options.recv_identity) {
Expand Down Expand Up @@ -228,10 +227,8 @@ void zmq::session_base_t::clean_pipes ()
msg_t msg;
int rc = msg.init ();
errno_assert (rc == 0);
if (pull_msg (&msg) != 0) {
zmq_assert (!incomplete_in);
break;
}
rc = pull_msg (&msg);
errno_assert (rc == 0);
rc = msg.close ();
errno_assert (rc == 0);
}
Expand All @@ -258,11 +255,10 @@ void zmq::session_base_t::terminated (pipe_t *pipe_)
terminate ();
}


// If we are waiting for pending messages to be sent, at this point
// we are sure that there will be no more messages and we can proceed
// with termination safely.
if (pending && !pipe && terminating_pipes.size () == 0)
if (pending && !pipe && terminating_pipes.empty ())
proceed_with_term ();
}

Expand Down Expand Up @@ -470,21 +466,24 @@ void zmq::session_base_t::start_connecting (bool wait_)
}
#endif

#if defined ZMQ_HAVE_OPENPGM
#ifdef ZMQ_HAVE_OPENPGM

// Both PGM and EPGM transports are using the same infrastructure.
if (addr->protocol == "pgm" || addr->protocol == "epgm") {

zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB
|| options.type == ZMQ_SUB || options.type == ZMQ_XSUB);

// For EPGM transport with UDP encapsulation of PGM is used.
bool udp_encapsulation = (addr->protocol == "epgm");
bool const udp_encapsulation = addr->protocol == "epgm";

// At this point we'll create message pipes to the session straight
// away. There's no point in delaying it as no concept of 'connect'
// exists with PGM anyway.
if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {

// PGM sender.
pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t (
pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t (
io_thread, options);
alloc_assert (pgm_sender);

Expand All @@ -493,11 +492,10 @@ void zmq::session_base_t::start_connecting (bool wait_)

send_attach (this, pgm_sender);
}
else
if (options.type == ZMQ_SUB || options.type == ZMQ_XSUB) {
else {

// PGM receiver.
pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t (
pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t (
io_thread, options);
alloc_assert (pgm_receiver);

Expand All @@ -506,8 +504,6 @@ void zmq::session_base_t::start_connecting (bool wait_)

send_attach (this, pgm_receiver);
}
else
zmq_assert (false);

return;
}
Expand Down

0 comments on commit 4e028ec

Please sign in to comment.