Permalink
Browse files

set DONTWAIT on self.recv test methods, in case select is not trustwo…

…rthy

This is due to libzmq bug in 3.1 (LIBZMQ-280), and there are a couple of
other workarounds for the bug as well.
  • Loading branch information...
1 parent 2888c8c commit 2a38d3de1fa2e57acb0e6fda507e4b51f4f90b5d @minrk committed Nov 8, 2011
Showing with 28 additions and 7 deletions.
  1. +19 −7 zmq/tests/__init__.py
  2. +9 −0 zmq/tests/test_monqueue.py
View
@@ -22,6 +22,7 @@
#-----------------------------------------------------------------------------
import sys
+import time
from threading import Thread
from unittest import TestCase
@@ -110,17 +111,28 @@ def assertRaisesErrno(self, errno, func, *args, **kwargs):
else:
self.fail("Function did not raise any error")
- def recv(self, socket, *args, **kwargs):
- """call recv in a way that raises if there is nothing to receive"""
+ def _select_recv(self, multipart, socket, **kwargs):
+ """call recv[_multipart] in a way that raises if there is nothing to receive"""
+ if zmq.zmq_version_info() >= (3,1,0):
+ # zmq 3.1 has a bug, where poll can return false positives,
+ # so we wait a little bit just in case
+ # See LIBZMQ-280 on JIRA
+ time.sleep(0.1)
+
r,w,x = zmq.select([socket], [], [], timeout=5)
assert len(r) > 0, "Should have received a message"
- return socket.recv(*args, **kwargs)
+ kwargs['flags'] = zmq.DONTWAIT | kwargs.get('flags', 0)
+
+ recv = socket.recv_multipart if multipart else socket.recv
+ return recv(**kwargs)
+
+ def recv(self, socket, **kwargs):
+ """call recv in a way that raises if there is nothing to receive"""
+ return self._select_recv(False, socket, **kwargs)
- def recv_multipart(self, socket, *args, **kwargs):
+ def recv_multipart(self, socket, **kwargs):
"""call recv_multipart in a way that raises if there is nothing to receive"""
- r,w,x = zmq.select([socket], [], [], timeout=5)
- assert len(r) > 0, "Should have received a message"
- return socket.recv_multipart(*args, **kwargs)
+ return self._select_recv(True, socket, **kwargs)
class PollZMQTestCase(BaseZMQTestCase):
@@ -203,6 +203,15 @@ def test_router_router(self):
dev.bind_out('tcp://127.0.0.1:%i'%portb)
dev.start()
time.sleep(0.2)
+ if zmq.zmq_version_info() >= (3,1,0):
+ # flush erroneous poll state, due to LIBZMQ-280
+ ping_msg = [ asbytes('ping'), asbytes('pong') ]
+ for s in (a,b):
+ s.send_multipart(ping_msg)
+ try:
+ s.recv(zmq.NOBLOCK)
+ except zmq.ZMQError:
+ pass
msg = [ asbytes(m) for m in ('hello', 'there')]
a.send_multipart([asbytes('b')]+msg)
bmsg = self.recv_multipart(b)

0 comments on commit 2a38d3d

Please sign in to comment.