Skip to content

Commit

Permalink
Merge de8d9f5 into bb54986
Browse files Browse the repository at this point in the history
  • Loading branch information
waveform80 committed Sep 24, 2019
2 parents bb54986 + de8d9f5 commit 77a41da
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 1 deletion.
5 changes: 5 additions & 0 deletions docs/reference/main_loop.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,8 @@ AsyncioEventLoop
----------------

.. autoclass:: AsyncioEventLoop

ZMQEventLoop
------------

.. autoclass:: ZMQEventLoop
6 changes: 6 additions & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ deps =
setuptools
tornado<5.0.0
coverage
py27: zmq
py34: zmq
py35: zmq
py36: zmq
py37: zmq
pypy: zmqpy
py27: twisted==16.6.0
py35: twisted
py36: twisted
Expand Down
2 changes: 1 addition & 1 deletion urwid/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
CURSOR_PAGE_UP, CURSOR_PAGE_DOWN, CURSOR_MAX_LEFT, CURSOR_MAX_RIGHT,
ACTIVATE)
from urwid.main_loop import (ExitMainLoop, MainLoop, SelectEventLoop,
GLibEventLoop, TornadoEventLoop, AsyncioEventLoop)
GLibEventLoop, TornadoEventLoop, AsyncioEventLoop, ZMQEventLoop)
try:
from urwid.main_loop import TwistedEventLoop
except ImportError:
Expand Down
191 changes: 191 additions & 0 deletions urwid/main_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import heapq
import select
import os
import io
import signal
from functools import wraps
from itertools import count
Expand Down Expand Up @@ -1492,6 +1493,196 @@ def run(self):
reraise(*exc_info)


class ZMQEventLoop(EventLoop):
"""
This class is an urwid event loop for `ZeroMQ`_ applications. It is very
similar to :class:`SelectEventLoop`, supporting the usual :meth:`alarm`
events and file watching (:meth:`watch_file`) capabilities, but also
incorporates the ability to watch zmq queues for events
(:meth:`watch_queue`).
.. _ZeroMQ: https://zeromq.org/
"""
_alarm_break = count()

def __init__(self):
import zmq
self._did_something = True
self._alarms = []
self._poller = zmq.Poller()
self._queue_callbacks = {}
self._idle_handle = 0
self._idle_callbacks = {}

def alarm(self, seconds, callback):
"""
Call *callback* a given time from now. No parameters are passed to
callback. Returns a handle that may be passed to :meth:`remove_alarm`.
:param float seconds:
floating point time to wait before calling callback.
:param callback:
function to call from event loop.
"""
handle = (time.time() + seconds, next(self._alarm_break), callback)
heapq.heappush(self._alarms, handle)
return handle

def remove_alarm(self, handle):
"""
Remove an alarm. Returns ``True`` if the alarm exists, ``False``
otherwise.
"""
try:
self._alarms.remove(handle)
heapq.heapify(self._alarms)
return True
except ValueError:
return False

def watch_queue(self, queue, callback, flags=1): # flags=zmq.POLLIN
"""
Call *callback* when zmq *queue* has something to read (when *flags* is
set to ``POLLIN``, the default) or is available to write (when *flags*
is set to ``POLLOUT``). No parameters are passed to the callback.
:param queue:
The zmq queue to poll.
:param callback:
The function to call when the poll is successful.
:param int flags:
The condition to monitor on the queue (defaults to ``POLLIN``).
"""
if queue in self._queue_callbacks:
raise ValueError('already watching %r' % queue)
self._poller.register(queue, flags)
self._queue_callbacks[queue] = callback
return queue

def watch_file(self, fd, callback, flags=1): # flags=zmq.POLLIN
"""
Call *callback* when *fd* has some data to read. No parameters are
passed to the callback. The *flags* are as for :meth:`watch_queue`.
:param fd:
The file-like object, or fileno to monitor.
:param callback:
The function to call when the file has data available.
:param int flags:
The condition to monitor on the file (defaults to ``POLLIN``).
"""
if isinstance(fd, int):
# XXX Ideally, in Python 3 this would be a call to os.fdopen but we
# use io.open to support Python 2 easily
fd = io.open(fd, {1: 'rb', 2: 'wb', 3: 'r+b'}[flags], closefd=False)
self._poller.register(fd, flags)
self._queue_callbacks[fd.fileno()] = callback
return fd

def remove_watch_queue(self, handle):
"""
Remove a queue from background polling. Returns ``True`` if the queue
was being monitored, ``False`` otherwise.
"""
try:
try:
self._poller.unregister(handle)
finally:
self._queue_callbacks.pop(handle, None)
return True
except KeyError:
return False

def remove_watch_file(self, handle):
"""
Remove a file from background polling. Returns ``True`` if the file was
being monitored, ``False`` otherwise.
"""
try:
try:
self._poller.unregister(handle)
finally:
self._queue_callbacks.pop(handle.fileno(), None)
return True
except KeyError:
return False

def enter_idle(self, callback):
"""
Add a *callback* to be executed when the event loop detects it is idle.
Returns a handle that may be passed to :meth:`remove_enter_idle`.
"""
self._idle_handle += 1
self._idle_callbacks[self._idle_handle] = callback
return self._idle_handle

def remove_enter_idle(self, handle):
"""
Remove an idle callback. Returns ``True`` if *handle* was removed,
``False`` otherwise.
"""
try:
del self._idle_callbacks[handle]
return True
except KeyError:
return False

def _entering_idle(self):
for callback in list(self._idle_callbacks.values()):
callback()

def run(self):
"""
Start the event loop. Exit the loop when any callback raises an
exception. If :exc:`ExitMainLoop` is raised, exit cleanly.
"""
import zmq
try:
while True:
try:
self._loop()
except zmq.error.ZMQError as exc:
if exc.errno != errno.EINTR:
raise
except ExitMainLoop:
pass

def _loop(self):
"""
A single iteration of the event loop.
"""
if self._alarms or self._did_something:
if self._alarms:
state = 'alarm'
timeout = max(0, self._alarms[0][0] - time.time())
if self._did_something and (not self._alarms or
(self._alarms and timeout > 0)):
state = 'idle'
timeout = 0
ready = dict(self._poller.poll(timeout))
else:
state = 'wait'
ready = dict(self._poller.poll())

if not ready:
if state == 'idle':
self._entering_idle()
self._did_something = False
elif state == 'alarm':
due, tie_break, callback = heapq.heappop(self._alarms)
callback()
self._did_something = True

for queue, _ in ready.items():
self._queue_callbacks[queue]()
self._did_something = True


def _refl(name, rval=None, exit=False):
"""
This function is used to test the main loop classes.
Expand Down
10 changes: 10 additions & 0 deletions urwid/tests/test_event_loops.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,13 @@ def error_coro():

asyncio.ensure_future(error_coro())
self.assertRaises(ZeroDivisionError, evl.run)


try:
import zmq
except ImportError:
pass
else:
class ZMQEventLoopTest(unittest.TestCase, EventLoopTestMixin):
def setUp(self):
self.evl = urwid.ZMQEventLoop()

0 comments on commit 77a41da

Please sign in to comment.