
Out-of-order message and slow performance when closing and reconnecting socket #3057

Open
will133 opened this issue Apr 26, 2018 · 8 comments


@will133

will133 commented Apr 26, 2018


Issue description

I have posted this on pyzmq, but it may be an issue in the library itself (zeromq/pyzmq#1171). Essentially, my application connects, sends a message, and closes the socket. It may then reconnect, send another message, and close the socket again. I've seen messages dropped occasionally, even with linger set to what I think is a reasonable value.

Environment

  • libzmq version (commit hash if unreleased): 4.2.5 (from conda-forge) with pyzmq 17.0.0
  • OS: Linux
  • Python: 3.6.5

Minimal test code / Steps to reproduce the issue

import threading

import zmq

_NUM_MESSAGE = 1000


class ReconnectThread(threading.Thread):
    """Sends every message on a brand-new socket: connect, send, close."""

    def __init__(self, ctx, server_url):
        super(ReconnectThread, self).__init__()
        self.ctx = ctx
        self.server_url = server_url

    def _get_new_socket(self):
        socket = self.ctx.socket(zmq.DEALER)
        socket.linger = 1000  # keep unsent messages for up to 1 s after close()
        socket.connect(self.server_url)
        return socket

    def run(self):
        for i in range(_NUM_MESSAGE):
            sock = self._get_new_socket()
            sock.send_multipart([('blah_%s' % i).encode('utf8')])
            sock.close()  # returns immediately; the message may still be queued


if __name__ == "__main__":
    ctx = zmq.Context()
    try:
        server_socket = ctx.socket(zmq.DEALER)
        server_url = 'tcp://127.0.0.1:7384'
        server_socket.bind(server_url)
        thread = ReconnectThread(ctx, server_url)
        thread.start()
        for i in range(_NUM_MESSAGE):
            print(server_socket.recv_multipart())
    finally:
        ctx.destroy()

What's the actual result? (include assertion message & call stack if applicable)

I found two problems with the results. First, messages can arrive out of order:

[b'blah_0']
[b'blah_1']
[b'blah_3']
[b'blah_10']
[b'blah_7']
[b'blah_8']
...

According to @minrk, this can happen because close() is asynchronous (it doesn't actually close the socket right away), so multiple sockets can be connected at the same time. I find this behavior somewhat confusing. Is there some way to force close() and the linger period to complete synchronously?

I should emphasize that in my actual code there shouldn't be that many connections (with close() calls happening in parallel); it's typically only a few. The problem is just very difficult to reproduce, so I had to write this simple case.

The second problem, which is more serious, is that the code can hang. It seems a linger of 1000 (1 second) is not sufficient, so messages are dropped and the main thread (recv_multipart()) never receives the expected number of messages. If I raise the linger to 5000, the run completes without hanging, but the output sometimes stalls for a few seconds, which I didn't expect.

What's the expected result?

I'd expect a more predictable close(), or an alternative that guarantees the linger completes before close() returns. I understand that a finite linger can drop messages, but since the alternative (unlimited linger) can hang when the other endpoint goes away, I always find myself trying to choose an appropriate linger. With close() behaving this unpredictably, though, it's hard to pick the right value.

@bluca
Member

bluca commented Apr 26, 2018

zmq_close is asynchronous.

You cannot expect ordering between different connections - it just doesn't make sense. Use the same socket if you require ordering between messages, or implement reordering in your application using a monotonically increasing sequence number.
If you don't want to drop messages, set linger to unlimited.
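The application-level reordering suggested above could look roughly like the following pure-Python sketch. It assumes (hypothetically) that the sender tags every message with an integer sequence number, for example as an extra first frame of the multipart message; `Reorderer` is an illustrative name, not part of pyzmq or libzmq.

```python
class Reorderer:
    """Release messages in sequence order, holding back any that arrive early.

    Hypothetical helper: assumes the sender tags every message with a
    monotonically increasing integer sequence number, starting at first_seq.
    """

    def __init__(self, first_seq=0):
        self.next_seq = first_seq  # the sequence number we are waiting for
        self._pending = {}         # early arrivals, keyed by sequence number

    def push(self, seq, payload):
        """Accept one message and return the payloads that are now in order."""
        if seq < self.next_seq:
            return []  # duplicate or already-delivered message: ignore it
        self._pending[seq] = payload
        ready = []
        # Drain every consecutive message starting at next_seq.
        while self.next_seq in self._pending:
            ready.append(self._pending.pop(self.next_seq))
            self.next_seq += 1
        return ready


if __name__ == "__main__":
    r = Reorderer()
    for seq in [0, 1, 3, 2]:
        print(seq, r.push(seq, "blah_%d" % seq))
```

Feeding 0, 1, 3, 2 holds `blah_3` back until `blah_2` arrives, then releases both. Note that this only fixes ordering: a gap left by a message that was actually dropped would stall delivery indefinitely, so it does not replace an unlimited linger or some form of acknowledgement.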

@will133
Author

will133 commented Apr 26, 2018

I think I'm OK with the async ordering (it's unexpected, but I'll accept it). My question is really about the close() call. Can you tell me what actually happens when you call close()? Since the connect/send/close calls are all asynchronous, is it possible for close() to take effect before the connection is established, so that the send is lost?

@bluca
Member

bluca commented Apr 26, 2018

Yes, depending on the linger value (EDIT: and of course on external contingencies like the network stack, kernel, thread scheduling, etc.). To test it, simply connect to a non-existent endpoint and close the socket.
If you don't want that, you can set the linger to unlimited.
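A minimal pyzmq sketch of that test follows, assuming nothing is listening on port 5999 of the local machine (the port and the helper name `time_close_unreachable` are illustrative). It shows that `close()` returns at once, and that it is `Context.term()` which blocks until the queued message is flushed or the linger period runs out, after which the message is silently discarded.

```python
import time

import zmq


def time_close_unreachable(linger_ms, endpoint="tcp://127.0.0.1:5999"):
    """Queue one message for an endpoint with no listener, then close the socket.

    Returns how many seconds ctx.term() blocked. Assumption: nothing is
    listening on `endpoint`; pick another port if something is.
    """
    ctx = zmq.Context()  # private context, so term() only waits for this socket
    sock = ctx.socket(zmq.DEALER)
    sock.linger = linger_ms
    sock.connect(endpoint)  # returns at once; the TCP connect happens in the background
    sock.send(b"lost if no peer appears within linger_ms")  # queued locally
    sock.close()            # also returns at once: zmq_close is asynchronous
    start = time.monotonic()
    ctx.term()              # blocks until the queue is flushed or the linger expires
    return time.monotonic() - start


if __name__ == "__main__":
    for linger in (0, 200, 1000):
        print("linger=%4d ms -> term() blocked %.3f s" % (linger, time_close_unreachable(linger)))
```

Because `term()` on a dedicated context waits for its sockets, this is also one way to make "close plus linger" observable and bounded without an arbitrary sleep, at the cost of a context per connection.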

@bluca
Member

bluca commented Apr 26, 2018

In general, though, constantly creating and destroying sockets is an anti-pattern: these are configuration code paths, so performance will be terrible, and they are generally asynchronous operations.
What you want is long lived threads and sockets - see the actor pattern in CZMQ https://github.com/zeromq/czmq#zactor---simple-actor-framework
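Applied to the reproduction above, the long-lived-socket half of this advice amounts to connecting once and reusing the socket. The following is a minimal sketch of the reporter's test rewritten that way; `bind_to_random_port` and the receive timeout are additions (assumptions) to keep it self-contained and to fail rather than hang. Since all messages travel over a single connection, they arrive in send order.

```python
import threading

import zmq

NUM_MESSAGES = 1000


def sender(ctx, url):
    # One long-lived socket for the whole session instead of one per message.
    sock = ctx.socket(zmq.DEALER)
    sock.linger = 1000
    sock.connect(url)
    for i in range(NUM_MESSAGES):
        sock.send_multipart([("blah_%s" % i).encode("utf8")])
    sock.close()


def main():
    ctx = zmq.Context()
    server = ctx.socket(zmq.DEALER)
    server.rcvtimeo = 5000  # raise zmq.Again instead of hanging if a message is lost
    port = server.bind_to_random_port("tcp://127.0.0.1")
    thread = threading.Thread(target=sender, args=(ctx, "tcp://127.0.0.1:%d" % port))
    thread.start()
    received = [server.recv_multipart()[0] for _ in range(NUM_MESSAGES)]
    thread.join()
    server.close()
    ctx.term()
    return received


if __name__ == "__main__":
    msgs = main()
    print(msgs[:3], "...", msgs[-1])
```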

@will133
Author

will133 commented Apr 30, 2018

I don't usually disconnect and reconnect the socket as in my test case. I was debugging why a message would get dropped after a single reconnect. The sequence is really:

socket = _get_new_socket()
socket.connect(...)
# error case: tell the peer (via the dealer socket) that something is wrong
# and that we need to disconnect
socket.send_multipart(...)
socket.close()

# would attempt to reconnect with the same url:
socket = _get_new_socket()
socket.connect(...)
# Say we are ok again, send a message saying we are ok.
socket.send_multipart(...)

In some very hard-to-reproduce cases, one of the two send_multipart() messages would occasionally get dropped. This happens even with the socket's linger set to 5000 (5 seconds), which I figured would be enough time to send before the socket is closed. The server should not be busy, so I'd expect it to at least make a good attempt to deliver; that seems odd to me.

I only peeked briefly at the code, and it seems zmq_close() only sets a flag and does not wait for anything. Do you know what happens to pending send_multipart() messages once that flag is set, even with linger set? Since I'm reconnecting to the same URL, is it possible that the earlier or later message gets dropped?

I ended up putting a sleep after socket.close(), and my code is now more reliable. I just want to understand what's going on, since I don't like putting arbitrary sleeps everywhere this could be an issue.

@bluca
Member

bluca commented Apr 30, 2018

Multipart messages are atomic - either all parts are sent or none are.
I don't understand why you set a limited linger and a random sleep, rather than simply setting the linger to -1 given it seems you don't want to drop.

@will133
Author

will133 commented Apr 30, 2018

The problem is that the server I'm connecting to can be down, so setting a linger of -1 would hang the process forever.

@bluca
Member

bluca commented Apr 30, 2018

Then change your application protocol to send a receipt message back instead of relying on random sleeps.
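A minimal pyzmq sketch of such a receipt scheme, under stated assumptions: the message layout `[msg_id, payload]`, the `b"ACK"` reply frame, and the helper names `ack_server`, `send_with_receipt` and `demo` are all hypothetical, not part of pyzmq or libzmq. The client waits a bounded time for each receipt and retries a few times, so a dead server yields `False` instead of a hang. Once a receipt has arrived the message is known to be delivered, so the socket can be closed with `linger=0` and no sleep is needed.

```python
import threading

import zmq


def ack_server(sock, n):
    """Hypothetical server loop: acknowledge n messages by echoing their ids."""
    for _ in range(n):
        identity, msg_id, _payload = sock.recv_multipart()  # ROUTER prepends the peer identity
        sock.send_multipart([identity, b"ACK", msg_id])
    sock.close()


def send_with_receipt(sock, msg_id, payload, timeout_ms=2000, retries=3):
    """Send [msg_id, payload] and wait for a matching receipt.

    Returns True once acknowledged, or False after `retries` timeouts, so a
    dead server makes the caller give up instead of hanging forever.
    """
    poller = zmq.Poller()
    poller.register(sock, zmq.POLLIN)
    for _ in range(retries):
        sock.send_multipart([msg_id, payload])
        if poller.poll(timeout_ms):  # list of (socket, event) pairs; empty on timeout
            kind, acked_id = sock.recv_multipart()
            if kind == b"ACK" and acked_id == msg_id:
                return True
    return False


def demo():
    ctx = zmq.Context()
    server = ctx.socket(zmq.ROUTER)
    port = server.bind_to_random_port("tcp://127.0.0.1")
    # The server socket is handed to the thread and used only there from now on.
    thread = threading.Thread(target=ack_server, args=(server, 2))
    thread.start()
    client = ctx.socket(zmq.DEALER)
    client.connect("tcp://127.0.0.1:%d" % port)
    results = [
        send_with_receipt(client, b"1", b"something is wrong, disconnecting"),
        send_with_receipt(client, b"2", b"ok again"),
    ]
    thread.join()
    client.close(linger=0)  # nothing left worth waiting for: each message was acked or given up
    ctx.term()
    return results


if __name__ == "__main__":
    print(demo())
```

Retries mean the same message can reach the server twice (for example once it comes back up), so a real server would need to deduplicate by `msg_id`.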
