
First subscriptions seem to be dropped when a PUB connects to a SUB #2267

Open
sublee opened this issue Dec 22, 2016 · 18 comments

@sublee

sublee commented Dec 22, 2016

I usually use PUB sockets as clients of SUB sockets. In this topology, the first subscriptions from SUB sockets always seem to be dropped.

My co-worker @kexplo wrote C++ code to reproduce this issue. There is a boolean constant named PUB_CONNECTS. If it is true, the PUB socket connects to the SUB socket, and the SUB socket does not receive the message the PUB socket sent. Otherwise, the SUB socket connects to the PUB socket and everything works fine:

#include <zmq.h>
#include <iostream>
#include <unistd.h>


// When a SUB socket binds to an address then a PUB socket connects to the
// SUB socket, the first subscription seems to be dropped.
const bool PUB_CONNECTS = true;


int main(int argc, char *argv[])
{
  void *context = zmq_ctx_new();
  void *pub = zmq_socket(context, ZMQ_PUB);
  void *sub = zmq_socket(context, ZMQ_SUB);

  // Make one of PUB or SUB sockets be a server.
  void *server = 0;
  void *client = 0;
  if (PUB_CONNECTS)
  {
    server = sub;
    client = pub;
  }
  else
  {
    server = pub;
    client = sub;
  }
  zmq_bind(server, "tcp://127.0.0.1:5932");
  zmq_connect(client, "tcp://127.0.0.1:5932");

  // Start to subscribe "hello".
  zmq_setsockopt(sub, ZMQ_SUBSCRIBE, "hello", 5);
  usleep(1000);

  // Publish and receive a message.
  zmq_send(pub, "hello world", 11, 0);
  std::cout << "SUB is waiting for an incoming message..." << std::endl;
  char recv_buf[40] = { 0 };
  int n_bytes = zmq_recv(sub, recv_buf, 40, 0);
  if (n_bytes > 0)
  {
    std::cout << recv_buf << std::endl;
  }

  // Finalize.
  zmq_close(sub);
  zmq_close(pub);
  zmq_ctx_destroy(context);
  return 0;
}

I tested with ZeroMQ-4.2.0.

@bluca
Member

bluca commented Dec 30, 2016

This is an old and known problem, and this is why usually the PUB binds and the SUB connects.

https://zeromq.jira.com/browse/LIBZMQ-270

Basically, since the connect is asynchronous, there is nothing to send the subscription to at first. The internal commands are processed in the application thread rather than in the I/O thread, so if you look with GDB you will see that the subscription message is actually sent only when the zmq_recv call runs, because only at that point does the SUB socket's state in the application thread see that the PUB is now connected.

A workaround usable in real-world applications is to poll, since polling also takes care of running the internal state machine/commands, as highlighted in the example linked from that Jira ticket:

https://gist.github.com/hintjens/7344533

So in your code, if you poll before and after the zmq_send, it will then work.

@bluca
Member

bluca commented Dec 30, 2016

This should be fixed, but as it's written in the mentioned Jira ticket it would be quite complicated.

@sublee
Author

sublee commented Jan 3, 2017

@bluca Thank you for explaining; I understand the bug now. I think the PUB/SUB documentation should warn other users about it.

Is it okay to leave this issue open until the bug is actually fixed, even though it duplicates an issue on Jira?

@bluca
Member

bluca commented Jan 3, 2017

Yes, the Jira tracker is deprecated and read-only.

@sublee
Author

sublee commented Jan 3, 2017

Okay. Does your team have someone who can fix the issue?

@bluca
Member

bluca commented Jan 3, 2017

I do not have a team :-) I don't think anyone is actively looking into this at the moment.

@WallStProg
Contributor

I'm trying to understand this, and apparently I'm not doing a very good job so far ;-)

First, let me see if I can describe the problem accurately:

Connect direction    Result
sub -> pub           OK
pub -> sub           NG

OK means that the publisher gets the subscription information from the subscriber, so subsequent messages published by the publisher will be delivered to the subscriber.

NG means that the publisher does NOT get the subscription information from the subscriber, so that subsequent messages published by the publisher are NOT delivered to the subscriber.

There are a number of references to this problem, including:

Several of these suggest calling zmq_poll to run the internal commands (I believe via the internal zmq::socket_base_t::process_commands method).

Pieter's workaround (https://gist.github.com/hintjens/7344533) has the sub socket calling zmq_poll after the pub socket connects to it. That works fine in the simple example provided, but in the real world the sub and pub sockets are going to be in separate programs, so:

How does the sub socket know that a publisher has connected to it and it needs to call zmq_poll?

There is another problem that I think I may be running into, which is that my sub socket is ALWAYS in a zmq_poll call, but it seems that the socket only gets one chance to call process_commands, and that is on entry to zmq_poll.

How can a sub socket in a zmq_poll call get interrupted when it has commands that it needs to run?

(Setting a timeout on the zmq_poll call should work, but I don't know that the timeout can be set low enough to be useful, without burning a CPU core).

In an earlier comment in this thread, Luca says:

So in your code, if you poll before and after the zmq_send, it will then work.

  • I assume that this means to poll on the sub socket?
  • If that is so, we're back to my earlier question: how does a sub socket in another process know it needs to call zmq_poll?

At this point, the only general solution I can think of is to use zmq_socket_monitor to, at a minimum, force any running zmq_poll call to return and then be re-executed.

Or is there an easier way? I've been calling zmq_poll after zmq_connect on sockets, but that doesn't appear to help -- it's unnecessary when the SUB connects to the PUB, and it does nothing when the PUB connects to the SUB -- it's the SUB that needs to call zmq_poll, but it's stuck in zmq_poll already, with no obvious way to break out.

Thanks in advance for any advice, suggestions, etc.!

@WallStProg
Contributor

I've put together an exhaustive survey of the inter-thread communications methods here: https://github.com/WallStProg/zmqtests/tree/master/threads.

This includes a sample of using PUB/SUB sockets for inter-thread communication that illustrates the underlying problem, as well as problems with the suggested work-arounds.

Short version is that I've been unable to come up with any workaround that reliably avoids losing initial messages when PUB connects to SUB. Polling on the PUB side helps a bit, but not enough to be reliable -- it looks like the poll needs to happen on the SUB side, but in the most common scenarios I can come up with, the SUB side is already sitting in a zmq_poll call.

One potential solution that occurs to me would be to have the SUB side return EINTR from the zmq_poll call when a connection attempt is made. This would allow the code on the SUB side to re-enter the zmq_poll call, which could trigger the exchange of subscription information. I have no idea whether this would be practical, or even possible. Also, since this would change existing behavior, it would probably need to be explicitly requested via zmq_setsockopt.

If there's a way to resolve this issue that I haven't been able to find, I'd be very grateful to learn it. Thanks in advance!

@mesca

mesca commented Jun 4, 2018

Any progress on this issue?

@WallStProg
Contributor

On my end, I've been running a bunch of tests that I think demonstrate that the "work-around" laid out in https://gist.github.com/hintjens/7344533 is unreliable -- the code will eventually deadlock on the zmq_recv call. (In one test, this happened on iteration #723.)

I haven't yet had time to write this up in a way that is rigorous enough to present, but I hope to be able to do so soon.

@WallStProg
Contributor

It turns out that there's nothing special needed -- simply run the code repeatedly and it will eventually hang on the call to zmq_recv. In the test documented, this was on iteration 17.

So far as I can tell the work-around doesn't...

@WallStProg
Contributor

(Attachments: gdb.txt, screenshot)

@somdoron
Member

Every socket has a file descriptor attached to it for signaling.
So when you add the SUB to a zmq_poll, the poll will be interrupted whenever there is something to process.
zmq_poll will then check the ZMQ_EVENTS socket option, which processes the pending commands.

So, adding SUB to the main loop zmq_poll should allow the SUB to handle all incoming connections.
@WallStProg you are saying that is not the case?

The workaround fails because sometimes the connecting phase takes more than 1ms (the timeout for the zmq_poll).

Specific fix to the workaround can be to have the zmq_poll in a loop and to query the number of connected peers (which is not possible at the moment).
I can make a pull request that will allow checking the number of peers of a socket.
So the workaround will look something like this:

    // Note: ZMQ_PEERS is the socket option proposed above; it does not
    // exist in libzmq yet.
    #include <zmq.h>
    #include <assert.h>
    #include <stdbool.h>
    #include <stdio.h>
    #include <string.h>

    int main (void)
    {
        void *ctx = zmq_ctx_new ();
        assert (ctx);

        void *pub = zmq_socket (ctx, ZMQ_PUB);
        assert (pub);
        int rc;

        void *sub = zmq_socket (ctx, ZMQ_SUB);
        rc = zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "", 0);
        assert (rc == 0);
        rc = zmq_bind (sub, "tcp://127.0.0.1:5555");
        assert (rc >= 0);

        rc = zmq_connect (pub, "tcp://127.0.0.1:5555");
        assert (rc >= 0);

        // Poll in a loop until the SUB reports at least one peer.
        zmq_pollitem_t pollitems [] = { { sub, 0, ZMQ_POLLIN, 0 } };
        while (true) {
            zmq_poll (pollitems, 1, 1);
            int peers;
            size_t peers_size = sizeof (peers);
            rc = zmq_getsockopt (sub, ZMQ_PEERS, &peers, &peers_size);
            if (peers != 0)
                break;
        }

        rc = zmq_send (pub, "TEST", 5, 0);
        assert (rc == 5);
        char buffer [5];

        puts ("Waiting for message");
        rc = zmq_recv (sub, buffer, 5, 0);
        assert (rc == 5);
        assert (strcmp (buffer, "TEST") == 0);

        puts (" OK");
        return 0;
    }
@WallStProg @mesca @sublee will that help?

@WallStProg
Contributor

Hi Doron:

Thanks for the update.

@WallStProg you are saying that is not the case?

What I'm saying is that the canonical "work-around" for this issue, originally posted by Pieter (calling zmq_poll to trigger process_commands), is not at all reliable. Unfortunately, newbies keep getting pointed to it, but the fact is that it just flat doesn't work -- sometimes (most of the time, in fact), you "get lucky", but that's completely non-deterministic.

The workaround fails because sometimes the connecting phase takes more than 1ms (the timeout for the zmq_poll).

OK, finally an explanation that makes sense!

I can make a pull request that will allow checking the number of peers of a socket.

I'm sure that would be very helpful for a lot of people, but in my case it's not -- my network is dynamic, so clients don't know how many peers they are supposed to have.

@skrap

skrap commented Apr 4, 2019

Out of curiosity, do push/pull sockets suffer from this issue as well? I'm wondering if a better topology (for my use case) might be:

                      +----+
             +----+   |PUSH|   +----+
             |PUSH|   +-+--+   |PUSH|
             +----+     |      +----+
                  \--\  |  /---/
                     |  |  |
                    +------+
                    | PULL |
                    |   |  |
                    | XPUB |
                    +------+
             +-------/ | \------+
             |         |        |
          +--+--+   +--+--+  +--+--+
          | SUB |   | SUB |  | SUB |
          +-----+   +-----+  +-----+

...with the broker in the middle republishing the messages received from the PULL socket. Does this get around the issue? I know this has the disadvantage of every "published" message going through the broker, but in my case I don't care — that's happening anyhow.

@WallStProg
Contributor

WallStProg commented Apr 4, 2019 via email

@skrap

skrap commented Apr 5, 2019

Hello Bill!
Thanks for the detailed and thoughtful response!
I'm actually using ZMQ as an IPC message bus in an embedded linux context, so there are a few assumptions which I've made which simplify things quite a bit:

  • I control process initialization order — the broker comes up first, and if it ever goes down, I shut down everything and bring it back up in order. The broker is very simple, though, and I've never seen it crash.
  • Some processes are very transient, and like to come up, fire a few messages into the bus, and go away. While I could implement a welcome message as you indicated, it seems like the push channel is a better fit. Also because the effort involved is not much more than s/PUB/PUSH/,s/XSUB/PULL/ ;-)
  • As it happens, I don't depend on any SUBs being serviced immediately. I suppose if I was [ab]using the bus as sort of an RPC mechanism, I would need a strategy for the case when either side was not fully attached to the bus yet, but fortunately that's not my issue right now!

Many thanks again!
Jonah

@stale

stale bot commented Apr 4, 2020

This issue has been automatically marked as stale because it has not had activity for 365 days. It will be closed if no further activity occurs within 56 days. Thank you for your contributions.

@stale stale bot added the stale label Apr 4, 2020
@stale stale bot closed this as completed May 30, 2020