Skip to content

Commit

Permalink
Merge pull request #2 from flowblok/master
Browse files Browse the repository at this point in the history
Support for the Poller class
  • Loading branch information
svpcom committed Oct 7, 2011
2 parents 808c9ba + ac49f28 commit 820c938
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 2 deletions.
1 change: 1 addition & 0 deletions README.rst
Expand Up @@ -36,3 +36,4 @@ The following people have contributed to the project:
* Scott Sadler (github AT mashi DOT org)
* spez (steve AT hipmunk DOT com)
* Thomas Kluyver (takowl AT gmail DOT com)
* Peter Ward (peteraward AT gmail DOT com)
153 changes: 151 additions & 2 deletions zmq/_zmq.py
Expand Up @@ -79,7 +79,7 @@ def __str__(self):
return self.strerror

def _check_nonzero(result, func, arguments):
if result != 0:
if result == -1:
raise ZMQError(get_errno())
return result

Expand Down Expand Up @@ -176,6 +176,17 @@ class zmq_msg_t(Structure):
libzmq.zmq_recv.restype = c_int
libzmq.zmq_recv.argtypes = [c_void_p, POINTER(zmq_msg_t), c_int]

class zmq_pollitem_t(Structure):
_fields_ = [
('socket', c_void_p),
('fd', c_int),
('events', c_short),
('revents', c_short)
]

libzmq.zmq_poll.restype = c_int
libzmq.zmq_poll.argtypes = [POINTER(zmq_pollitem_t), c_int, c_long]

def _default_errcheck():
for symbol in dir(libzmq):
if symbol.startswith('zmq_'):
Expand All @@ -197,7 +208,7 @@ def _shortcuts():
_shortcuts()

# Higher-level interface. Partially copied from pyzmq.

class Context(object):
def __init__(self, io_threads=1):
"""The io_threads argument specifies the size of the ØMQ thread pool to
Expand Down Expand Up @@ -494,5 +505,143 @@ def recv(self, flags=0, copy=True, track=False):
finally:
zmq_msg_close(byref(msg))

def _poll(sockets, timeout=-1):
"""_poll(sockets, timeout=-1)
Poll a set of 0MQ sockets, native file descs. or sockets.
Parameters
----------
sockets : list of tuples of (socket, flags)
Each element of this list is a two-tuple containing a socket
and a flags. The socket may be a 0MQ socket or any object with
a ``fileno()`` method. The flags can be zmq.POLLIN (for detecting
for incoming messages), zmq.POLLOUT (for detecting that send is OK)
or zmq.POLLIN|zmq.POLLOUT for detecting both.
timeout : int
The number of milliseconds to poll for. Negative means no timeout.
"""
if major < 3:
# timeout is us in 2.x, ms in 3.x
# expected input is ms (matches 3.x)
timeout = 1000 * timeout

n_sockets = len(sockets)

array_type = zmq_pollitem_t * n_sockets
pollitems = array_type()

for i, (s, events) in enumerate(sockets):
if isinstance(s, Socket):
pollitems[i].socket = s.handle
pollitems[i].events = events
pollitems[i].revents = 0
elif isinstance(s, int_t):
pollitems[i].socket = NULL
pollitems[i].fd = s
pollitems[i].events = events
pollitems[i].revents = 0
elif hasattr(s, 'fileno'):
try:
fileno = int(s.fileno())
except:
raise ValueError('fileno() must return an valid integer fd')
else:
pollitems[i].socket = NULL
pollitems[i].fd = fileno
pollitems[i].events = events
pollitems[i].revents = 0
else:
raise TypeError(
"Socket must be a 0MQ socket, an integer fd or have "
"a fileno() method: %r" % s
)

zmq_poll(pollitems, n_sockets, timeout)

results = []
for i, (s, _) in enumerate(sockets):
# Return the fd for sockets, for compat. with select.poll.
if hasattr(s, 'fileno'):
s = s.fileno()
revents = pollitems[i].revents
# Only return sockets with non-zero status for compat. with select.poll.
if revents > 0:
results.append((s, revents))

return results

class Poller(object):
"""Poller()
A stateful poll interface that mirrors Python's built-in poll.
"""

def __init__(self):
self.sockets = {}

def register(self, socket, flags=POLLIN|POLLOUT):
"""p.register(socket, flags=POLLIN|POLLOUT)
Register a 0MQ socket or native fd for I/O monitoring.
register(s,0) is equivalent to unregister(s).
Parameters
----------
socket : zmq.Socket or native socket
A zmq.Socket or any Python object having a ``fileno()``
method that returns a valid file descriptor.
flags : int
The events to watch for. Can be POLLIN, POLLOUT or POLLIN|POLLOUT.
If `flags=0`, socket will be unregistered.
"""
if flags:
self.sockets[socket] = flags
elif socket in self.sockets:
# uregister sockets registered with no events
self.unregister(socket)
else:
# ignore new sockets with no events
pass

def modify(self, socket, flags=POLLIN|POLLOUT):
"""p.modify(socket, flags=POLLIN|POLLOUT)
Modify the flags for an already registered 0MQ socket or native fd.
"""
self.register(socket, flags)

def unregister(self, socket):
"""p.unregister(socket)
Remove a 0MQ socket or native fd for I/O monitoring.
Parameters
----------
socket : Socket
The socket instance to stop polling.
"""
del self.sockets[socket]

def poll(self, timeout=None):
"""p.poll(timeout=None)
Poll the registered 0MQ or native fds for I/O.
Parameters
----------
timeout : float, int
The timeout in milliseconds. If None, no `timeout` (infinite). This
is in milliseconds to be compatible with ``select.poll()``. The
underlying zmq_poll uses microseconds and we convert to that in
this function.
"""
if timeout is None:
timeout = -1

timeout = int(timeout)
if timeout < 0:
timeout = -1
return _poll(list(self.sockets.items()), timeout=timeout)

0 comments on commit 820c938

Please sign in to comment.