Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Race condition with received message causes ZMQ_CONNECT_ROUTING_ID to be assigned to wrong socket #3191

Closed
mvilim opened this issue Jul 25, 2018 · 3 comments

Comments

@mvilim
Copy link
Contributor

mvilim commented Jul 25, 2018

Issue description

There is an problem when a ROUTER socket is trying to connect to another socket using ZMQ_CONNECT_ROUTING_ID while receiving messages from a third socket. If a message is received (in the io thread) between setting the socket option and connecting to the socket, the routing id can be applied to the socket from which we received the message instead of the socket to which we are connecting.

I believe this is caused by socket_base_t::connect where pending commands are processed. If we process a command where a received message causes us to bind to new socket, we will use the routing id for that socket rather than the target of the connect call. The same behavior can be triggered by setting ZMQ_CONNECT_ROUTING_ID and then sending or receiving messages on the same socket (as that will also trigger process_commands).

I believe a possible solution is to change identify_peer in the stream and router socket implementations so that they can differentiate between peers being connected to from a local connect call (possibly by passing another argument in) and peers being connected to because of a received message.

I'm open to trying to write a pull request for this, but I first wanted to validate that those more familiar with the library see it as a bug. According to the docs, the option should only apply to sockets connected to via zmq_connect: "The ZMQ_CONNECT_RID option sets the peer id of the next host connected via the zmq_connect() call, and immediately readies that connection for data transfer with the named id". With the current behavior, there is no way to use ZMQ_CONNECT_ROUTING_ID reliably on a socket that is also receiving connections.

Environment

  • libzmq version: commit: 25f47cc
  • OS: Linux version 4.17.4-1-ARCH

Minimal test code / Steps to reproduce the issue

#include "testutil.hpp"

void test_router_2_router_while_receiving ()
{
    void *xbind, *zbind, *yconn;
    int ret;
    char buff[256];
    char msg[] = "hi 1";
    const char *xbindip = "tcp://127.0.0.1:5555";
    const char *zbindip = "tcp://127.0.0.1:5556";
    int zero = 0;
    size_t len = MAX_SOCKET_STRING;
    char x_endpoint[MAX_SOCKET_STRING];
    char z_endpoint[MAX_SOCKET_STRING];
    void *ctx = zmq_ctx_new ();

    //  Create xbind socket.
    xbind = zmq_socket (ctx, ZMQ_ROUTER);
    assert (xbind);
    ret = zmq_setsockopt (xbind, ZMQ_LINGER, &zero, sizeof (zero));
    assert (0 == ret);
    ret = zmq_bind (xbind, xbindip);
    assert (0 == ret);
    ret = zmq_getsockopt (xbind, ZMQ_LAST_ENDPOINT, x_endpoint, &len);
    assert (0 == ret);

    //  Create zbind socket.
    zbind = zmq_socket (ctx, ZMQ_ROUTER);
    assert (zbind);
    ret = zmq_setsockopt (zbind, ZMQ_LINGER, &zero, sizeof (zero));
    assert (0 == ret);
    ret = zmq_bind (zbind, zbindip);
    assert (0 == ret);
    ret = zmq_getsockopt (zbind, ZMQ_LAST_ENDPOINT, z_endpoint, &len);
    assert (0 == ret);

    //  Create yconn socket.
    yconn = zmq_socket (ctx, ZMQ_ROUTER);
    assert (yconn);
    ret = zmq_setsockopt (yconn, ZMQ_LINGER, &zero, sizeof (zero));
    assert (0 == ret);

    // set identites for each socket
    ret = zmq_setsockopt (xbind, ZMQ_ROUTING_ID, "X", 2);
    ret = zmq_setsockopt (yconn, ZMQ_ROUTING_ID, "Y", 2);
    ret = zmq_setsockopt (zbind, ZMQ_ROUTING_ID, "Z", 2);

    //  Make call to connect using a connect_routing_id.
    ret = zmq_setsockopt (yconn, ZMQ_CONNECT_ROUTING_ID, "X", 2);
    assert (0 == ret);
    ret = zmq_connect (yconn, x_endpoint);
    assert (0 == ret);

    //  Send some data.
    ret = zmq_send (yconn, "X", 2, ZMQ_SNDMORE);
    assert (2 == ret);
    ret = zmq_send (yconn, msg, 5, 0);
    assert (5 == ret);

    // wait for the Y->X message to be received (make sure Y is likely to win the race)
    msleep(SETTLE_TIME);

    // Now xbind tries to connect to zbind and send a message
    // this routing id will actually be applied to the Y socket while flushing received commands
    ret = zmq_setsockopt (xbind, ZMQ_CONNECT_ROUTING_ID, "Z", 2);
    assert (0 == ret);
    ret = zmq_connect (xbind, z_endpoint);
    assert (0 == ret);

    //  Try to send some data from x to z.
    ret = zmq_send (xbind, "Z", 2, ZMQ_SNDMORE);
    assert (2 == ret);
    ret = zmq_send (xbind, msg, 5, 0);
    assert (5 == ret);
    
    // wait for the X->Z message to be received (this is necessary for the 
    // test to fail without blocking)
    msleep(SETTLE_TIME);

    // nothing should have been received on the Y socket, but if Y wins the race, these assertions will fail
    ret = zmq_recv (yconn, buff, 256, ZMQ_DONTWAIT);
    assert (ret == -1);
    assert (zmq_errno () == EAGAIN);

    // the message should have been received on the Z socket
    ret = zmq_recv (zbind, buff, 256, 0);
    assert (ret && 'X' == buff[0]);
    ret = zmq_recv (zbind, buff + 128, 128, 0);
    assert (5 == ret && 'h' == buff[128]);

    ret = zmq_unbind (xbind, x_endpoint);
    assert (0 == ret);
    ret = zmq_unbind (zbind, z_endpoint);
    assert (0 == ret);
    ret = zmq_close (yconn);
    assert (0 == ret);
    ret = zmq_close (xbind);
    assert (0 == ret);
    ret = zmq_close (zbind);
    assert (0 == ret);

    zmq_ctx_destroy (ctx);
}

What's the actual result? (include assertion message & call stack if applicable)

The Y socket receives the message sent from X to identity Z. The assertion on line 82 of the test above will fail.

What's the expected result?

The Z socket should receive the message. You can change the likely winner of the race by removing the first call to msleep. This will cause the X socket to (likely) connect to Z and apply the routing id before it receives the message from Y.

@bluca
Copy link
Member

bluca commented Jul 25, 2018

As the zmq_setsockopt manpage says, you must set the options before connecting/binding apart from a restricted set of options.

@mvilim
Copy link
Contributor Author

mvilim commented Jul 25, 2018

In the test I provided the option is set before connecting:

// Now xbind tries to connect to zbind and send a message
// this routing id will actually be applied to the Y socket while flushing received commands
ret = zmq_setsockopt (xbind, ZMQ_CONNECT_ROUTING_ID, "Z", 2);
assert (0 == ret);
ret = zmq_connect (xbind, z_endpoint);
assert (0 == ret);

While in the zmq_connect call, a received message from a different socket (yconn) is processed, leading to the misasssignment of the "Z" id to the Y socket.

mvilim pushed a commit to mvilim/libzmq that referenced this issue Jul 25, 2018
…nnection (Issue zeromq#3191)

Solution: Add an identifier parameter for local attach to zmq::socket_base_t::attach_pipe
mvilim pushed a commit to mvilim/libzmq that referenced this issue Jul 26, 2018
…nnection (Issue zeromq#3191)

Solution: Add an identifier parameter for local attach to zmq::socket_base_t::attach_pipe
@bluca
Copy link
Member

bluca commented Jul 27, 2018

Fixed by #3193

@bluca bluca closed this as completed Jul 27, 2018
MohammadAlTurany pushed a commit to FairRootGroup/libzmq that referenced this issue Jan 28, 2019
…nnection (Issue zeromq#3191)

Solution: Add an identifier parameter for local attach to zmq::socket_base_t::attach_pipe
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants