Skip to content

Commit

Permalink
remove special-cased ZMQIOLoop and friends
Browse files Browse the repository at this point in the history
leave deprecations in their place
  • Loading branch information
minrk committed Jun 1, 2017
1 parent 850f23c commit f24a264
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 183 deletions.
163 changes: 18 additions & 145 deletions zmq/eventloop/ioloop.py
Expand Up @@ -14,15 +14,9 @@

from __future__ import absolute_import, division, with_statement

import os
import time
import warnings

from zmq import (
Poller,
POLLIN, POLLOUT, POLLERR,
ZMQError, ETERM,
)

try:
import tornado
Expand All @@ -31,11 +25,15 @@
tornado_version = ()

try:
# tornado ≥ 3
from tornado.ioloop import PollIOLoop, PeriodicCallback
# tornado ≥ 4
from tornado import ioloop
if not hasattr(ioloop.IOLoop, 'configurable_default'):
raise ImportError("Tornado too old")
from tornado.ioloop import PeriodicCallback
from tornado.log import gen_log
except ImportError:
from .minitornado.ioloop import PollIOLoop, PeriodicCallback
from .minitornado import ioloop
from .minitornado.ioloop import PeriodicCallback
from .minitornado.log import gen_log


Expand Down Expand Up @@ -71,149 +69,24 @@ def _run(self):
gen_log.error("Error in delayed callback", exc_info=True)


class ZMQPoller(object):
"""A poller that can be used in the tornado IOLoop.
This simply wraps a regular zmq.Poller, scaling the timeout
by 1000, so that it is in seconds rather than milliseconds.
"""

def __init__(self):
self._poller = Poller()

@staticmethod
def _map_events(events):
"""translate IOLoop.READ/WRITE/ERROR event masks into zmq.POLLIN/OUT/ERR"""
z_events = 0
if events & IOLoop.READ:
z_events |= POLLIN
if events & IOLoop.WRITE:
z_events |= POLLOUT
if events & IOLoop.ERROR:
z_events |= POLLERR
return z_events

@staticmethod
def _remap_events(z_events):
"""translate zmq.POLLIN/OUT/ERR event masks into IOLoop.READ/WRITE/ERROR"""
events = 0
if z_events & POLLIN:
events |= IOLoop.READ
if z_events & POLLOUT:
events |= IOLoop.WRITE
if z_events & POLLERR:
events |= IOLoop.ERROR
return events

def register(self, fd, events):
return self._poller.register(fd, self._map_events(events))

def modify(self, fd, events):
return self._poller.modify(fd, self._map_events(events))

def unregister(self, fd):
return self._poller.unregister(fd)

def poll(self, timeout):
"""poll in seconds rather than milliseconds.
Event masks will be IOLoop.READ/WRITE/ERROR
"""
z_events = self._poller.poll(1000*timeout)
return [ (fd,self._remap_events(evt)) for (fd,evt) in z_events ]

def close(self):
pass
class ZMQIOLoop(ioloop.IOLoop.configurable_default()):
"""DEPRECATED: No longer needed as of pyzmq-17"""


class ZMQIOLoop(PollIOLoop):
"""ZMQ subclass of tornado's IOLoop
Minor modifications, so that .current/.instance return self
"""

_zmq_impl = ZMQPoller

def initialize(self, impl=None, **kwargs):
impl = self._zmq_impl() if impl is None else impl
super(ZMQIOLoop, self).initialize(impl=impl, **kwargs)

@classmethod
def instance(cls, *args, **kwargs):
"""Returns a global `IOLoop` instance.
Most applications have a single, global `IOLoop` running on the
main thread. Use this method to get this instance from
another thread. To get the current thread's `IOLoop`, use `current()`.
"""
# install ZMQIOLoop as the active IOLoop implementation
# when using tornado 3
if tornado_version >= (3,):
PollIOLoop.configure(cls)
loop = PollIOLoop.instance(*args, **kwargs)
if not isinstance(loop, cls):
warnings.warn("IOLoop.current expected instance of %r, got %r" % (cls, loop),
RuntimeWarning, stacklevel=2,
)
return loop

@classmethod
def current(cls, *args, **kwargs):
"""Returns the current thread’s IOLoop.
"""
# install ZMQIOLoop as the active IOLoop implementation
# when using tornado 3
if tornado_version >= (3,):
PollIOLoop.configure(cls)
loop = PollIOLoop.current(*args, **kwargs)
if not isinstance(loop, cls):
warnings.warn("IOLoop.current expected instance of %r, got %r" % (cls, loop),
RuntimeWarning, stacklevel=2,
)
return loop

def start(self):
try:
super(ZMQIOLoop, self).start()
except ZMQError as e:
if e.errno == ETERM:
# quietly return on ETERM
pass
else:
raise


if (3, 0) <= tornado_version < (3, 1):
def backport_close(self, all_fds=False):
"""backport IOLoop.close to 3.0 from 3.1 (supports fd.close() method)"""
from zmq.eventloop.minitornado.ioloop import PollIOLoop as mini_loop
return mini_loop.close.__get__(self)(all_fds)
ZMQIOLoop.close = backport_close
def __init__(self, *args, **kwargs):
warnings.warn("ZMQLoop is deprecated in pyzmq 17. No special eventloop integration is needed.", DeprecationWarning)
return super(ZMQIOLoop, self).__init__(*args, **kwargs)


# public API name
IOLoop = ZMQIOLoop


def install():
"""set the tornado IOLoop instance with the pyzmq IOLoop.
After calling this function, tornado's IOLoop.instance() and pyzmq's
IOLoop.instance() will return the same object.
"""DEPRECATED
An assertion error will be raised if tornado's IOLoop has been initialized
prior to calling this function.
pyzmq 17 no longer needs any special integration for tornado.
"""
from tornado import ioloop
# check if tornado's IOLoop is already initialized to something other
# than the pyzmq IOLoop instance:
assert (not ioloop.IOLoop.initialized()) or \
ioloop.IOLoop.instance() is IOLoop.instance(), "tornado IOLoop already initialized"

if tornado_version >= (3,):
# tornado 3 has an official API for registering new defaults, yay!
ioloop.IOLoop.configure(ZMQIOLoop)
else:
# we have to set the global instance explicitly
ioloop.IOLoop._instance = IOLoop.instance()

import warnings
warnings.warn("zmq.eventloop.install is deprecated in pyzmq-17.0. "
"No special integration is needed.", DeprecationWarning)
return
14 changes: 0 additions & 14 deletions zmq/green/eventloop/ioloop.py
@@ -1,15 +1 @@
from zmq.eventloop.ioloop import *
from zmq.green import Poller

RealIOLoop = IOLoop
RealZMQPoller = ZMQPoller

class ZMQPoller(RealZMQPoller):
"""gevent-compatible version of ioloop.ZMQPoller"""
def __init__(self):
self._poller = Poller()

class IOLoop(RealIOLoop):
"""gevent-and-zmq-aware tornado IOLoop implementation"""
_zmq_impl = ZMQPoller

26 changes: 2 additions & 24 deletions zmq/tests/test_ioloop.py
Expand Up @@ -66,39 +66,17 @@ def test_simple(self):
else:
self.fail("IOLoop failed to exit")

def test_poller_events(self):
"""Tornado poller implementation maps events correctly"""
req,rep = self.create_bound_pair(zmq.REQ, zmq.REP)
poller = ioloop.ZMQPoller()
poller.register(req, ioloop.IOLoop.READ)
poller.register(rep, ioloop.IOLoop.READ)
events = dict(poller.poll(0))
self.assertEqual(events.get(rep), None)
self.assertEqual(events.get(req), None)

poller.register(req, ioloop.IOLoop.WRITE)
poller.register(rep, ioloop.IOLoop.WRITE)
events = dict(poller.poll(1))
self.assertEqual(events.get(req), ioloop.IOLoop.WRITE)
self.assertEqual(events.get(rep), None)

poller.register(rep, ioloop.IOLoop.READ)
req.send(b'hi')
events = dict(poller.poll(1))
self.assertEqual(events.get(rep), ioloop.IOLoop.READ)
self.assertEqual(events.get(req), None)

def test_instance(self):
"""Green IOLoop.instance returns the right object"""
loop = ioloop.IOLoop.instance()
assert isinstance(loop, ioloop.IOLoop)
assert isinstance(loop, BaseIOLoop)
base_loop = BaseIOLoop.instance()
assert base_loop is loop

def test_current(self):
"""Green IOLoop.current returns the right object"""
loop = ioloop.IOLoop.current()
assert isinstance(loop, ioloop.IOLoop)
assert isinstance(loop, BaseIOLoop)
base_loop = BaseIOLoop.current()
assert base_loop is loop

Expand Down

0 comments on commit f24a264

Please sign in to comment.