
DEALER socket losing messages #3

Closed · wants to merge 1 commit

Conversation


@wehriam wehriam commented Oct 18, 2011

Hi all,

I am seeing strange behavior in an application using DEALER and ROUTER sockets.

Requests made by the DEALER are always received by the ROUTER, but the responses made by the ROUTER are sometimes lost. I've added a test to demonstrate this behavior. It is the only change in this pull request.

https://github.com/wehriam/txZMQ/blob/master/txZMQ/test/test_async_dealer_router.py

In the application (though not demonstrated in the test), after a varying number of requests, responses eventually stop arriving entirely.

I was unsuccessful in resolving this issue and hope you'll have better luck in determining the underlying problem.

OS X Lion
Python 2.7.1
ZMQ 2.1.9

Twisted==11.0.0
pep8==0.6.1
pyflakes==0.5.0
pyzmq==2.1.10
txZMQ==0.3
wsgiref==0.1.2
zope.interface==3.8.0

@fangpenlin

Hi,

I encountered the same problem. I noticed it is caused by the send operation completing before we enter reactor.run, i.e. the select. For example:

send a message
time.sleep(3)
reactor.run()

It appears the fd is notified only at the moment the send operation completes; that is when the POLLIN event gets set on the socket. As a result, reactor.run never gets a chance to pick up the read event from the fd.

send a message
time.sleep(3)
# the fd notification fires here and is consumed,
# and the POLLIN event is set on the socket;
# the reactor never gets a chance to catch the read event from the fd
reactor.run()
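The race can be modeled without ZeroMQ at all. ZeroMQ's notification fd is edge-triggered: it signals a state *change*, and reading it clears the signal. A toy sketch of that behavior (illustrative names only, not the pyzmq API):

```python
class EdgeTriggeredFD(object):
    """Toy model of ZeroMQ's notification fd: it signals a state change,
    not the presence of readable data, and reading it clears the signal."""

    def __init__(self):
        self._signalled = False

    def raise_edge(self):
        # a state change happened (e.g. a reply became readable)
        self._signalled = True

    def poll(self):
        # returns True at most once per edge, like reading the fd
        was = self._signalled
        self._signalled = False
        return was


fd = EdgeTriggeredFD()

# send() completes and the reply arrives before the reactor reaches
# select(): the edge fires now...
fd.raise_edge()

# ...and something consumes it before the reactor polls the fd:
consumed_early = fd.poll()

# When the reactor finally selects on the fd, the edge is gone, so the
# pending POLLIN event is never delivered:
seen_by_reactor = fd.poll()
```

Once the edge has been consumed, only another state change will make the fd readable again, which matches the symptom of replies silently never arriving.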

To solve the problem, I think you can call self.doRead() at the end of the send method:

    def send(self, message):
        """
        Send message via ZeroMQ.

        @param message: message data
        """
        if not hasattr(message, '__iter__'):
            self.queue.append((0, message))
        else:
            self.queue.extend([(constants.SNDMORE, m) for m in message[:-1]])
            self.queue.append((0, message[-1]))

        # this is crazy hack: if we make such call, zeromq happily signals
        # available events on other connections
        self.socket.getsockopt(constants.EVENTS)

        self._startWriting()
        # sometimes _startWriting completes before we enter the select
        # inside reactor.run; read any pending data here to cover that case
        self.doRead()

It works fine for me, but I'm not sure whether this ZeroMQ behavior is correct.

@fangpenlin

A workaround

class _BugFixMixin(object):

    def send(self, message):
        txZMQ.ZmqConnection.send(self, message)
        # XXX: sometimes the send operation completes and data becomes
        # available for reading before we enter the reactor's select;
        # in that case, select can't catch a read event from the fd.
        # To solve the problem, we check for readable data here; this
        # clears the socket's POLLIN event, so the fd will be notified
        # again when a later event arrives
        reactor.callFromThread(txZMQ.ZmqConnection.doRead, self)

example usage:

class ZmqXREQConnection(_BugFixMixin, txZMQ.ZmqXREQConnection):
    pass

@minmax

minmax commented Mar 22, 2012

Code for reproducing the bug:

https://gist.github.com/2160998

@erikkaplun

The first workaround proposed by @victorlin worked. I preferred to avoid the second one as it uses threads.

@minmax

minmax commented Mar 28, 2012

> The first workaround proposed by @victorlin worked. I preferred to avoid the second one as it uses threads.

ZeroMQ sockets are not thread-safe, so a socket must be used from the same thread where it was created.
txzmq has long relied on this: sockets are supposed to be created and used in the reactor's main thread.

As a result:

  1. do not wrap "doRead" in callFromThread (or else change the rest of the code to be thread-aware).
  2. wrap "doRead" in callLater(0, ...) instead, to avoid stack overflow.

@fangpenlin

I don't understand what you mean. As far as I know, reactor.callFromThread is the same as callLater(0, ...).

That is, reactor.callFromThread adds the function call to the reactor, so it gets called on the next iteration. Likewise, callLater(0, ...) schedules a call for 0 seconds later, which is also triggered on the next iteration. Both run inside reactor.run (the main thread), so what's the difference? Stack overflow?

@minmax

minmax commented Mar 28, 2012

callFromThread: "Use this method when you want to run a function in the reactor's thread from another thread."
txzmq is not thread-safe, so we do not need to use this method here.
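The practical distinction can be sketched with a toy reactor (an illustration of the two scheduling paths, not Twisted's actual implementation): both calls fire on the next iteration, but callFromThread takes a lock and wakes the reactor so it is safe from any thread, which is overhead we don't need here.

```python
import threading


class ToyReactor(object):
    """Minimal model of the two scheduling paths (not Twisted's real code)."""

    def __init__(self):
        self._pending = []
        self._lock = threading.Lock()

    def callLater(self, _delay, fn):
        # no locking: assumes we're already in the reactor thread
        self._pending.append(fn)

    def callFromThread(self, fn):
        # locks (and, in real Twisted, writes to a waker pipe so a blocked
        # select() returns); safe to call from any thread
        with self._lock:
            self._pending.append(fn)

    def run_one_iteration(self):
        pending, self._pending = self._pending, []
        for fn in pending:
            fn()


reactor = ToyReactor()
calls = []
reactor.callLater(0, lambda: calls.append("later"))
reactor.callFromThread(lambda: calls.append("fromThread"))
reactor.run_one_iteration()
# both fire on the next iteration; only the scheduling path differs
```

Since doRead is only ever scheduled from the reactor thread itself, the lock-and-wake machinery of callFromThread buys nothing here.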

@erikkaplun

I had:

reactor.callLater(0, self.doRead)

but started running into trouble when calling .shutdown() on my ZmqConnection instances (in unit tests, twisted.trial requires shutting down your connections before exiting). The problem is that after .shutdown() the callLater(0, ...) is still pending, so when doRead fires one last time, self.socket is already None, thus:

 File "/Users/erik/work/txzmq/txzmq/connection.py", line 174, in doRead
    events = self.socket.getsockopt(constants.EVENTS)
exceptions.AttributeError: 'NoneType' object has no attribute 'getsockopt'

The workaround/fix is:

reactor.callLater(0, lambda: self.doRead() if self.socket else None)
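Expanded, the guard behaves like this (a standalone sketch with stand-in classes, not txzmq's actual ZmqConnection):

```python
class Connection(object):
    def __init__(self):
        self.socket = object()  # stands in for the ZeroMQ socket
        self.reads = 0

    def doRead(self):
        # in the real code this would call self.socket.getsockopt(EVENTS)
        self.reads += 1

    def shutdown(self):
        self.socket = None


def safe_read(conn):
    # the guard from the workaround: skip the deferred doRead once
    # shutdown() has already cleared the socket
    if conn.socket is not None:
        conn.doRead()


conn = Connection()
safe_read(conn)   # socket still alive: doRead runs
conn.shutdown()
safe_read(conn)   # socket gone: doRead is skipped, no AttributeError
```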

I will make this fix available in my fork of txzmq.

However, does anybody know if it would be cleaner or more efficient to store the IDelayedCall object returned by reactor.callLater and call .cancel() on that instead?

@smira
Owner

smira commented Mar 29, 2012

Currently testing the code with the fix... I'll try to come up with an implementation that compromises on the performance overhead of calling doRead on every send operation.

Thanks for the reports!

@erikkaplun

For some reason, when storing the IDelayedCall and then .cancel()-ing it, I still get the same error, so I'm using this now:

reactor.callLater(0, lambda: self.doRead() if self.socket else None)

This solution is also much simpler: the hack-fix is contained in a single place/line of code.

@smira
Owner

smira commented Mar 30, 2012

Thanks to everybody who submitted patches, testcases and filed bugs!

I've committed a "fix" based on your ideas and suggestions. Could you please give the current code a try?

I will be testing it in my environment for some time and then push a new version to PyPI if nothing scary comes out.

@smira
Owner

smira commented May 18, 2012

The fix "works for me", so I'm closing this pull request, as there were no further comments.

I'll release a new version to PyPI ASAP.

@smira smira closed this May 18, 2012
@erikkaplun

Yes, the fix seems to have worked for us so far as well!


5 participants