Skip to content

Commit

Permalink
fix race condition in session init
Browse files Browse the repository at this point in the history
Signed-off-by: Dhammika Pathirana <dhammika@gmail.com>
  • Loading branch information
dhammika authored and sustrik committed Dec 17, 2010
1 parent 27e83cc commit b19ee99
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 20 deletions.
22 changes: 19 additions & 3 deletions src/zmq_engine.cpp
Expand Up @@ -40,6 +40,7 @@ zmq::zmq_engine_t::zmq_engine_t (fd_t fd_, const options_t &options_) :
outsize (0),
encoder (out_batch_size),
inout (NULL),
ephemeral_inout (NULL),
options (options_),
plugged (false)
{
Expand All @@ -57,8 +58,9 @@ void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, i_inout *inout_)
{
zmq_assert (!plugged);
plugged = true;
ephemeral_inout = NULL;

// Conncet to session/init object.
// Connect to session/init object.
zmq_assert (!inout);
zmq_assert (inout_);
encoder.set_inout (inout_);
Expand Down Expand Up @@ -89,6 +91,7 @@ void zmq::zmq_engine_t::unplug ()
// Disconnect from init/session object.
encoder.set_inout (NULL);
decoder.set_inout (NULL);
ephemeral_inout = inout;
inout = NULL;
}

Expand Down Expand Up @@ -139,7 +142,13 @@ void zmq::zmq_engine_t::in_event ()
}

// Flush all messages the decoder may have produced.
inout->flush ();
// If IO handler has unplugged engine, flush transient IO handler.
if (unlikely (!plugged)) {
zmq_assert (ephemeral_inout);
ephemeral_inout->flush ();
} else {
inout->flush ();
}

if (disconnection)
error ();
Expand All @@ -152,7 +161,14 @@ void zmq::zmq_engine_t::out_event ()

outpos = NULL;
encoder.get_data (&outpos, &outsize);


// If IO handler has unplugged engine, flush transient IO handler.
if (unlikely (!plugged)) {
zmq_assert (ephemeral_inout);
ephemeral_inout->flush ();
return;
}

// If there is no data to send, stop polling for output.
if (outsize == 0) {
reset_pollout (handle);
Expand Down
3 changes: 3 additions & 0 deletions src/zmq_engine.hpp
Expand Up @@ -70,6 +70,9 @@ namespace zmq

i_inout *inout;

// Detached transient inout handler.
i_inout *ephemeral_inout;

options_t options;

bool plugged;
Expand Down
44 changes: 27 additions & 17 deletions src/zmq_init.cpp
Expand Up @@ -34,6 +34,7 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_,
socket_base_t *socket_, session_t *session_, fd_t fd_,
const options_t &options_) :
own_t (io_thread_, options_),
ephemeral_engine (NULL),
sent (false),
received (false),
socket (socket_),
Expand Down Expand Up @@ -64,8 +65,7 @@ bool zmq::zmq_init_t::read (::zmq_msg_t *msg_)
options.identity.size ());
sent = true;

// If initialisation is done, pass the engine to the session and
// destroy the init object.
// Try finalize initialization.
finalise_initialisation ();

return true;
Expand All @@ -92,6 +92,9 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)

received = true;

// Try finalize initialization.
finalise_initialisation ();

return true;
}

Expand All @@ -101,9 +104,9 @@ void zmq::zmq_init_t::flush ()
if (!received)
return;

// If initialisation is done, pass the engine to the session and
// destroy the init object.
finalise_initialisation ();
// Initialization is done, dispatch engine.
if (ephemeral_engine)
dispatch_engine ();
}

void zmq::zmq_init_t::detach ()
Expand Down Expand Up @@ -134,18 +137,31 @@ void zmq::zmq_init_t::process_unplug ()
}

void zmq::zmq_init_t::finalise_initialisation ()
{
// Unplug and prepare to dispatch engine.
if (sent && received) {
ephemeral_engine = engine;
engine = NULL;
ephemeral_engine->unplug ();
return;
}
}

void zmq::zmq_init_t::dispatch_engine ()
{
if (sent && received) {

// Engine must be detached.
zmq_assert (!engine);
zmq_assert (ephemeral_engine);

// If we know what session we belong to, it's easy, just send the
// engine to that session and destroy the init object. Note that we
// know about the session only if this object is owned by it. Thus,
// lifetime of this object in contained in the lifetime of the session
// so the pointer cannot become invalid without notice.
if (session) {
engine->unplug ();
send_attach (session, engine, peer_identity, true);
engine = NULL;
send_attach (session, ephemeral_engine, peer_identity, true);
terminate ();
return;
}
Expand All @@ -165,9 +181,7 @@ void zmq::zmq_init_t::finalise_initialisation ()
zmq_assert (session);
session->inc_seqnum ();
launch_sibling (session);
engine->unplug ();
send_attach (session, engine, peer_identity, false);
engine = NULL;
send_attach (session, ephemeral_engine, peer_identity, false);
terminate ();
return;
}
Expand All @@ -178,9 +192,7 @@ void zmq::zmq_init_t::finalise_initialisation ()
// than by send_attach.
session = socket->find_session (peer_identity);
if (session) {
engine->unplug ();
send_attach (session, engine, peer_identity, false);
engine = NULL;
send_attach (session, ephemeral_engine, peer_identity, false);
terminate ();
return;
}
Expand All @@ -194,9 +206,7 @@ void zmq::zmq_init_t::finalise_initialisation ()
zmq_assert (session);
session->inc_seqnum ();
launch_sibling (session);
engine->unplug ();
send_attach (session, engine, peer_identity, false);
engine = NULL;
send_attach (session, ephemeral_engine, peer_identity, false);
terminate ();
return;
}
Expand Down
4 changes: 4 additions & 0 deletions src/zmq_init.hpp
Expand Up @@ -44,6 +44,7 @@ namespace zmq
private:

void finalise_initialisation ();
void dispatch_engine ();

// i_inout interface implementation.
bool read (::zmq_msg_t *msg_);
Expand All @@ -58,6 +59,9 @@ namespace zmq
// Associated wire-protocol engine.
i_engine *engine;

// Detached transient engine.
i_engine *ephemeral_engine;

// True if our own identity was already sent to the peer.
bool sent;

Expand Down

0 comments on commit b19ee99

Please sign in to comment.