Permalink
Browse files

Merge pull request #320 from shripchenko/master

ZMQ BUG FOUND + fixes for zmq_unbind() / zmq_disconnect() usage corner cases
  • Loading branch information...
2 parents 952127d + 057fab0 commit b9fb9198779a6db23aadb1cd803c76edd8dcde5a @hintjens hintjens committed Apr 21, 2012
Showing with 113 additions and 6 deletions.
  1. +1 −1 src/ipc_listener.cpp
  2. +1 −1 src/pipe.cpp
  3. +4 −3 src/session_base.cpp
  4. +6 −0 src/socket_base.cpp
  5. +1 −0 src/tcp_listener.cpp
  6. +3 −1 tests/Makefile.am
  7. +97 −0 tests/test_term_endpoint.cpp
View
@@ -65,6 +65,7 @@ void zmq::ipc_listener_t::process_plug ()
void zmq::ipc_listener_t::process_term (int linger_)
{
rm_fd (handle);
+ close ();
own_t::process_term (linger_);
}
@@ -182,4 +183,3 @@ zmq::fd_t zmq::ipc_listener_t::accept ()
}
#endif
-
View
@@ -359,7 +359,7 @@ void zmq::pipe_t::terminate (bool delay_)
// active state.
else if (state == delimited) {
send_pipe_term (peer);
- state = terminated;
+ state = terminated;
}
// There are no other states.
View
@@ -357,6 +357,7 @@ void zmq::session_base_t::proceed_with_term ()
void zmq::session_base_t::timer_event (int id_)
{
+
// Linger period expired. We can proceed with termination even though
// there are still pending messages to be sent.
zmq_assert (id_ == linger_timer_id);
@@ -376,13 +377,13 @@ void zmq::session_base_t::detached ()
}
// Reconnect.
- if (options.reconnect_ivl != -1)
- start_connecting (true);
+ if (options.reconnect_ivl != -1)
+ start_connecting (true);
// For subscriber sockets we hiccup the inbound pipe, which will cause
// the socket object to resend all the subscriptions.
if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB))
- pipe->hiccup ();
+ pipe->hiccup ();
}
void zmq::session_base_t::start_connecting (bool wait_)
View
@@ -552,6 +552,12 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
return -1;
}
+ // Process pending commands, if any, since there could be pending unprocessed process_own()'s
+ // (from launch_child() for example) we're asked to terminate now.
+ int rc = process_commands (0, false);
+ if (unlikely (rc != 0))
+ return -1;
+
// Find the endpoints range (if any) corresponding to the addr_ string.
std::pair <endpoints_t::iterator, endpoints_t::iterator> range = endpoints.equal_range (std::string (addr_));
if (range.first == range.second)
View
@@ -73,6 +73,7 @@ void zmq::tcp_listener_t::process_plug ()
void zmq::tcp_listener_t::process_term (int linger_)
{
rm_fd (handle);
+ close ();
own_t::process_term (linger_);
}
View
@@ -13,7 +13,8 @@ noinst_PROGRAMS = test_pair_inproc \
test_invalid_rep \
test_msg_flags \
test_connect_resolve \
- test_last_endpoint
+ test_last_endpoint \
+ test_term_endpoint
if !ON_MINGW
noinst_PROGRAMS += test_shutdown_stress \
@@ -33,6 +34,7 @@ test_invalid_rep_SOURCES = test_invalid_rep.cpp
test_msg_flags_SOURCES = test_msg_flags.cpp
test_connect_resolve_SOURCES = test_connect_resolve.cpp
test_last_endpoint_SOURCES = test_last_endpoint.cpp
+test_term_endpoint_SOURCES = test_term_endpoint.cpp
if !ON_MINGW
test_shutdown_stress_SOURCES = test_shutdown_stress.cpp
@@ -0,0 +1,97 @@
+#include <assert.h>
+#include <string.h>
+#include <unistd.h>
+
+#include "../include/zmq.h"
+#include "../include/zmq_utils.h"
+
+
+int main (int argc, char *argv [])
+{
+ int rc;
+ char buf[32];
+ const char *ep = "tcp://127.0.0.1:5560";
+
+ fprintf (stderr, "unbind endpoint test running...\n");
+
+ // Create infrastructure.
+ void *ctx = zmq_init (1);
+ assert (ctx);
+ void *push = zmq_socket (ctx, ZMQ_PUSH);
+ assert (push);
+ rc = zmq_bind (push, ep);
+ assert (rc == 0);
+ void *pull = zmq_socket (ctx, ZMQ_PULL);
+ assert (pull);
+ rc = zmq_connect (pull, ep);
+ assert (rc == 0);
+
+ // Pass one message through to ensure the connection is established.
+ rc = zmq_send (push, "ABC", 3, 0);
+ assert (rc == 3);
+ rc = zmq_recv (pull, buf, sizeof (buf), 0);
+ assert (rc == 3);
+
+ // Unbind the lisnening endpoint
+ rc = zmq_unbind (push, ep);
+ assert (rc == 0);
+
+ // Let events some time
+ zmq_sleep (1);
+
+ // Check that sending would block (there's no outbound connection).
+ rc = zmq_send (push, "ABC", 3, ZMQ_DONTWAIT);
+ assert (rc == -1 && zmq_errno () == EAGAIN);
+
+ // Clean up.
+ rc = zmq_close (pull);
+ assert (rc == 0);
+ rc = zmq_close (push);
+ assert (rc == 0);
+ rc = zmq_term (ctx);
+ assert (rc == 0);
+
+
+ // Now the other way round.
+ fprintf (stderr, "disconnect endpoint test running...\n");
+
+
+ // Create infrastructure.
+ ctx = zmq_init (1);
+ assert (ctx);
+ push = zmq_socket (ctx, ZMQ_PUSH);
+ assert (push);
+ rc = zmq_connect (push, ep);
+ assert (rc == 0);
+ pull = zmq_socket (ctx, ZMQ_PULL);
+ assert (pull);
+ rc = zmq_bind (pull, ep);
+ assert (rc == 0);
+
+ // Pass one message through to ensure the connection is established.
+ rc = zmq_send (push, "ABC", 3, 0);
+ assert (rc == 3);
+ rc = zmq_recv (pull, buf, sizeof (buf), 0);
+ assert (rc == 3);
+
+ // Disconnect the bound endpoint
+ rc = zmq_disconnect (push, ep);
+ assert (rc == 0);
+
+ // Let events some time
+ zmq_sleep (1);
+
+ // Check that sending would block (there's no inbound connections).
+ rc = zmq_send (push, "ABC", 3, ZMQ_DONTWAIT);
+ assert (rc == -1 && zmq_errno () == EAGAIN);
+
+ // Clean up.
+ rc = zmq_close (pull);
+ assert (rc == 0);
+ rc = zmq_close (push);
+ assert (rc == 0);
+ rc = zmq_term (ctx);
+ assert (rc == 0);
+
+ return 0;
+}

0 comments on commit b9fb919

Please sign in to comment.