Permalink
Browse files

Merge pull request #267 from minrk/fullgreen

Add gevent-compatible device/eventloop

now every blocking API available in zmq.green should only block the greenlet.

closes #261
  • Loading branch information...
2 parents 9f170d2 + a6f6dd0 commit af7a8dd57540df5996ee86ac912962658a81d607 @minrk minrk committed Oct 14, 2012
@@ -131,8 +131,8 @@ starvation.
.. _zmq_green:
-gevent
-======
+PyZMQ and gevent
+================
PyZMQ ≥ 2.2.0.1 ships with a `gevent <http://www.gevent.org/>`_ compatible API as :mod:`zmq.green`.
To use it, simply:
@@ -143,8 +143,17 @@ To use it, simply:
Then write your code as normal.
-Currently, Socket.send/recv methods and zmq.Poller are gevent-aware.
-The tornado-based ZMQStream/IOLoop *are not* compatible with gevent.
+Socket.send/recv and zmq.Poller are gevent-aware.
+
+In PyZMQ ≥ 2.2.0.2, green.device and green.eventloop should be gevent-friendly as well.
+
+.. note::
+
+ The green device does *not* release the GIL, unlike the true device in zmq.core.
+
+zmq.green.eventloop includes minimally patched IOLoop/ZMQStream in order to use the gevent-enabled Poller,
+so you should be able to use the ZMQStream interface in gevent apps as well,
+though using two eventloops simultaneously (tornado + gevent) is not recommended.
.. warning::
@@ -36,3 +36,5 @@
Socket = _Socket
Poller = _Poller
+from zmq.green.device import device
+
View
@@ -123,8 +123,8 @@ def _wait_write(self):
if toc-tic > 0.9 and self.getsockopt(zmq.EVENTS) & zmq.POLLOUT:
print("BUG: gevent missed a libzmq send event on %i!" % self.FD, file=sys.stderr)
finally:
- self.__writable.set()
timeout.cancel()
+ self.__writable.set()
def _wait_read(self):
assert self.__readable.ready(), "Only one greenlet can be waiting on this event"
@@ -147,8 +147,8 @@ def _wait_read(self):
if toc-tic > 0.9 and self.getsockopt(zmq.EVENTS) & zmq.POLLIN:
print("BUG: gevent missed a libzmq recv event on %i!" % self.FD, file=sys.stderr)
finally:
- self.__readable.set()
timeout.cancel()
+ self.__readable.set()
def send(self, data, flags=0, copy=True, track=False):
"""send, which will only block current greenlet
View
@@ -0,0 +1,38 @@
+#-----------------------------------------------------------------------------
+# Copyright (c) 2012 Min Ragan-Kelley
+#
+# This file is part of pyzmq
+#
+# Distributed under the terms of the New BSD License. The full license is in
+# the file COPYING.BSD, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+import zmq
+from zmq.green import Poller
+
+def device(device_type, isocket, osocket):
+ """Start a zeromq device (gevent-compatible).
+
+ Unlike the true zmq.device, this does not release the GIL.
+
+ Parameters
+ ----------
+ device_type : (QUEUE, FORWARDER, STREAMER)
+ The type of device to start (ignored).
+ isocket : Socket
+ The Socket instance for the incoming traffic.
+ osocket : Socket
+ The Socket instance for the outbound traffic.
+ """
+ p = Poller()
+ if osocket == -1:
+ osocket = isocket
+ p.register(isocket, zmq.POLLIN)
+ p.register(osocket, zmq.POLLIN)
+
+ while True:
+ events = dict(p.poll())
+ if isocket in events:
+ osocket.send_multipart(isocket.recv_multipart())
+ if osocket in events:
+ isocket.send_multipart(osocket.recv_multipart())
@@ -0,0 +1,3 @@
+from zmq.green.eventloop.ioloop import IOLoop
+
+__all__ = ['IOLoop']
@@ -0,0 +1,37 @@
+from zmq.green import Poller
+from zmq.eventloop.ioloop import *
+
+RealIOLoop = IOLoop
+RealZMQPoller = ZMQPoller
+
+class IOLoop(RealIOLoop):
+
+ def __init__(self, impl=None):
+ if impl is None:
+ impl = _poll()
+ super(IOLoop, self).__init__(impl=impl)
+
+ # these methods are copied verbatim from the real IOLoop
+ @staticmethod
+ def instance():
+ if not hasattr(IOLoop, "_instance"):
+ with IOLoop._instance_lock:
+ if not hasattr(IOLoop, "_instance"):
+ # New instance after double check
+ IOLoop._instance = IOLoop()
+ return IOLoop._instance
+
+ @staticmethod
+ def initialized():
+ return hasattr(IOLoop, "_instance")
+
+ def install(self):
+ assert not IOLoop.initialized()
+ IOLoop._instance = self
+
+class ZMQPoller(RealZMQPoller):
+ """gevent-compatible version of ioloop.ZMQPoller"""
+ def __init__(self):
+ self._poller = Poller()
+
+_poll = ZMQPoller
@@ -0,0 +1,11 @@
+from zmq.eventloop.zmqstream import *
+
+from zmq.green.eventloop.ioloop import IOLoop
+
+RealZMQStream = ZMQStream
+
+class ZMQStream(RealZMQStream):
+
+ def __init__(self, socket, io_loop=None):
+ io_loop = io_loop or IOLoop.instance()
+ super(ZMQStream, self).__init__(socket, io_loop=io_loop)
@@ -15,7 +15,7 @@
import zmq
from zmq import devices
-from zmq.tests import BaseZMQTestCase, SkipTest
+from zmq.tests import BaseZMQTestCase, SkipTest, have_gevent, GreenTest
from zmq.utils.strtypes import (bytes,unicode,basestring)
#-----------------------------------------------------------------------------
@@ -106,3 +106,24 @@ def test_single_socket_forwarder_bind(self):
del dev
req.close()
+if have_gevent:
+ import gevent
+ import zmq.green
+
+ class TestDeviceGreen(GreenTest, BaseZMQTestCase):
+
+ def test_green_device(self):
+ rep = self.context.socket(zmq.REP)
+ req = self.context.socket(zmq.REQ)
+ self.sockets.extend([req, rep])
+ port = rep.bind_to_random_port('tcp://127.0.0.1')
+ g = gevent.spawn(zmq.green.device, zmq.QUEUE, rep, rep)
+ req.connect('tcp://127.0.0.1:%i' % port)
+ req.send(b'hi')
+ timeout = gevent.Timeout(1)
+ timeout.start()
+ receiver = gevent.spawn(req.recv)
+ self.assertEqual(receiver.get(1), b'hi')
+ timeout.cancel()
+ g.kill()
+

0 comments on commit af7a8dd

Please sign in to comment.