Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Add support for pyzmq's Poller class.

  • Loading branch information...
commit 2c513d84ac31a1cde78283cfb57a7c79d2a86ea2 1 parent 808c9ba
@flowblok flowblok authored
Showing with 151 additions and 2 deletions.
  1. +151 −2 zmq/_zmq.py
View
153 zmq/_zmq.py
@@ -79,7 +79,7 @@ def __str__(self):
return self.strerror
def _check_nonzero(result, func, arguments):
- if result != 0:
+ if result == -1:
raise ZMQError(get_errno())
return result
@@ -176,6 +176,17 @@ class zmq_msg_t(Structure):
libzmq.zmq_recv.restype = c_int
libzmq.zmq_recv.argtypes = [c_void_p, POINTER(zmq_msg_t), c_int]
+class zmq_pollitem_t(Structure):
+ _fields_ = [
+ ('socket', c_void_p),
+ ('fd', c_int),
+ ('events', c_short),
+ ('revents', c_short)
+ ]
+
+libzmq.zmq_poll.restype = c_int
+libzmq.zmq_poll.argtypes = [POINTER(zmq_pollitem_t), c_int, c_long]
+
def _default_errcheck():
for symbol in dir(libzmq):
if symbol.startswith('zmq_'):
@@ -197,7 +208,7 @@ def _shortcuts():
_shortcuts()
# Higher-level interface. Partially copied from pyzmq.
-
+
class Context(object):
def __init__(self, io_threads=1):
"""The io_threads argument specifies the size of the ØMQ thread pool to
@@ -494,5 +505,143 @@ def recv(self, flags=0, copy=True, track=False):
finally:
zmq_msg_close(byref(msg))
+def _poll(sockets, timeout=-1):
+ """_poll(sockets, timeout=-1)
+
+ Poll a set of 0MQ sockets, native file descs. or sockets.
+
+ Parameters
+ ----------
+ sockets : list of tuples of (socket, flags)
+ Each element of this list is a two-tuple containing a socket
+ and a flags. The socket may be a 0MQ socket or any object with
+ a ``fileno()`` method. The flags can be zmq.POLLIN (for detecting
+ for incoming messages), zmq.POLLOUT (for detecting that send is OK)
+ or zmq.POLLIN|zmq.POLLOUT for detecting both.
+ timeout : int
+ The number of milliseconds to poll for. Negative means no timeout.
+ """
+ if major < 3:
+ # timeout is us in 2.x, ms in 3.x
+ # expected input is ms (matches 3.x)
+ timeout = 1000 * timeout
+
+ n_sockets = len(sockets)
+
+ array_type = zmq_pollitem_t * n_sockets
+ pollitems = array_type()
+
+ for i, (s, events) in enumerate(sockets):
+ if isinstance(s, Socket):
+ pollitems[i].socket = s.handle
+ pollitems[i].events = events
+ pollitems[i].revents = 0
+ elif isinstance(s, int_t):
+ pollitems[i].socket = NULL
+ pollitems[i].fd = s
+ pollitems[i].events = events
+ pollitems[i].revents = 0
+ elif hasattr(s, 'fileno'):
+ try:
+ fileno = int(s.fileno())
+ except:
+ raise ValueError('fileno() must return an valid integer fd')
+ else:
+ pollitems[i].socket = NULL
+ pollitems[i].fd = fileno
+ pollitems[i].events = events
+ pollitems[i].revents = 0
+ else:
+ raise TypeError(
+ "Socket must be a 0MQ socket, an integer fd or have "
+ "a fileno() method: %r" % s
+ )
+
+ zmq_poll(pollitems, n_sockets, timeout)
+
+ results = []
+ for i, (s, _) in enumerate(sockets):
+ # Return the fd for sockets, for compat. with select.poll.
+ if hasattr(s, 'fileno'):
+ s = s.fileno()
+ revents = pollitems[i].revents
+ # Only return sockets with non-zero status for compat. with select.poll.
+ if revents > 0:
+ results.append((s, revents))
+
+ return results
+
+class Poller(object):
+ """Poller()
+
+ A stateful poll interface that mirrors Python's built-in poll.
+ """
+
+ def __init__(self):
+ self.sockets = {}
+
+ def register(self, socket, flags=POLLIN|POLLOUT):
+ """p.register(socket, flags=POLLIN|POLLOUT)
+
+ Register a 0MQ socket or native fd for I/O monitoring.
+
+ register(s,0) is equivalent to unregister(s).
+
+ Parameters
+ ----------
+ socket : zmq.Socket or native socket
+ A zmq.Socket or any Python object having a ``fileno()``
+ method that returns a valid file descriptor.
+ flags : int
+ The events to watch for. Can be POLLIN, POLLOUT or POLLIN|POLLOUT.
+ If `flags=0`, socket will be unregistered.
+ """
+ if flags:
+ self.sockets[socket] = flags
+ elif socket in self.sockets:
+ # uregister sockets registered with no events
+ self.unregister(socket)
+ else:
+ # ignore new sockets with no events
+ pass
+
+ def modify(self, socket, flags=POLLIN|POLLOUT):
+ """p.modify(socket, flags=POLLIN|POLLOUT)
+
+ Modify the flags for an already registered 0MQ socket or native fd.
+ """
+ self.register(socket, flags)
+
+ def unregister(self, socket):
+ """p.unregister(socket)
+
+ Remove a 0MQ socket or native fd for I/O monitoring.
+
+ Parameters
+ ----------
+ socket : Socket
+ The socket instance to stop polling.
+ """
+ del self.sockets[socket]
+
+ def poll(self, timeout=None):
+ """p.poll(timeout=None)
+
+ Poll the registered 0MQ or native fds for I/O.
+
+ Parameters
+ ----------
+ timeout : float, int
+ The timeout in milliseconds. If None, no `timeout` (infinite). This
+ is in milliseconds to be compatible with ``select.poll()``. The
+ underlying zmq_poll uses microseconds and we convert to that in
+ this function.
+ """
+ if timeout is None:
+ timeout = -1
+ timeout = int(timeout)
+ if timeout < 0:
+ timeout = -1
+ return _poll(list(self.sockets.items()), timeout=timeout)
Please sign in to comment.
Something went wrong with that request. Please try again.