Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Bring in gevent_zeromq as zmq.green #190

Merged
merged 9 commits into from

6 participants

@minrk
Owner

@traviscline's great gevent-enabling Socket subclass is quite small, and it would be useful to have it in pyzmq itself. This makes it available as zmq.green, and makes a few small tweaks to bring it up to speed with recent changes to a couple of methods. A small tweak is made to zmq.Context that improves the ability to subclass it to make different Socket without having to override the socket-creation method itself, which has some more logic that one shouldn't need to duplicate.

Relevant tests have been subclassed, so that they will be run as well if gevent is available. This brings tests run with gevent to 142 from 101.

Now, to use zmq with gevent, simply:

import zmq.green as zmq

and off you go.

@traviscline - if zmq.green is not a good choice of name, I'm happy to change it at your recommendation. I'm not familiar enough with that ecosystem to know what's best.

@ellisonbg
Owner

I had a quick look through everything and this looks great. I think we need to get @traviscline to review in more depth though.

minrk added some commits
@minrk minrk drop in gevent_zeromq as zmq.green
gevent_zeromq authors added to authors list
7e565c7
@minrk minrk match close(linger=None) signature in Green Socket 3365f64
@minrk minrk tweak green import
use `import zmq.green as zmq` instead of `from zmq.green import zmq`
dd15a37
@minrk minrk explicit sections for authors not in git log 826d924
@minrk minrk compile cython zmq.green 1f32b46
@minrk minrk make _socket_class a property, for easier subclasses of Context
Only this property/attr needs to be changed for a Context subclass to create different class of Socket.

It *should* be a class attr, but that doesn't work well with cdef classes (yet).
83f5847
@minrk minrk add green versions of various tests 3704409
@minrk minrk remove no-longer-needed monkey_patch functions from zmq.green 37acec2
@minrk minrk add copyright/license notice to zmq.green files 4c88b0a
@minrk minrk referenced this pull request
Merged

Drop Python 2.5 hacks #191

@whitmo

w00t!

@minrk
Owner

And if need be, here is a direct diff of the changes made to gevent_zeromq when bringing it in.

@tmc
Collaborator

zmq.green is fine as a name and this changeset looks great on initial pass. There's some administriva on my end but full steam ahead! I have a few open issues that should be addressed within a few days that would end up impacting this, FWIW.

@pigmej

@minrk

Does it help with the problem that we discussed like 2 days ago on #irc ?

@minrk
Owner

@pigmej you mean the lack of gevent-friendly device? No, this doesn't add anything to the gevent subclasses, it just brings them as they were into pyzmq itself with a couple minor tweaks for compatibility.

@traviscline Do you want to wait on this until you make the changes you mention, or just apply them here later? Also, do you want to continue to maintain gevent_zeromq as the canonical version, or do further development here, in pyzmq?

@pigmej

the gevent-friendly device is 'not a big problem', because it can be written inside library in Python (as you mention). I mean the 'bugs' that I hit. I tested normal Threaded mode, and it works fine with enormous thread number.

Turning to gevent-zeromq it will crash in like 300-350 messages sent/received, with miscelanous errors.

@tmc
Collaborator

How about I get some of the gevent-zeromq issues closed over the weekend and we can move the work in then. I'm fine with continued development under pyzmq and can make the necessary changes/announcements when that occurs.

@minrk
Owner

No, this should fix no bugs in gevent_zeromq. It's the same code, dropped into the base project.

@minrk
Owner

@traviscline sounds good, just let me know when and I can update this with changes you make over there.

@minrk
Owner

@traviscline how's progress? If it makes more sense, I don't see any problem with merging this as-is, and then taking any fixes you make in gevent_zeromq itself and re-applying them here.

@tmc
Collaborator
tmc commented

@minrk Been unexpectedly swamped in the last few weeks so I haven't closed those issues. I'm fine with merging prior to that.

@minrk
Owner

Okay, I'll go ahead and merge then. Just let me know (or submit a PR) when you make changes that we should pull from upstream.

@minrk minrk merged commit 7ab2050 into from
@minrk
Owner

It's coming up on time to release pyzmq-2.2.0. @traviscline, what is your thinking on the bugs you wanted to fix?

Options:

  1. Wait for your fixes before release
  2. Release with zmq.green as-is
  3. Release without zmq.green, and bring it back into master, giving you some more time to work things out.
@tmc
Collaborator

Made some headway on issues but my free time has been close to nonexistent, I'll optimistically have things sorted this week so up to you wrt the options here. Getting it incorporated is a goal but if it'll just generate some issues first thing that's not ideal. I think re-merging post 2.2.0 and getting it out in a point release probably makes the most sense.

@minrk
Owner

I'm also fine marking zmq.green as a 'tech preview' in 2.2.0, so people are aware that it's still a work in progress. How serious are the issues you are working on?

How likely do you think it is that you will have something next week?

@CrackerJackMack CrackerJackMack referenced this pull request in dotcloud/zerorpc-python
Open

Use zmq.green instead of gevent_zeromq #50

@maxekman

I just noticed an issue in the ZeroRPC repo. It would be good if ZeroRPC could use zmq.green instead of their custom solution. But at the moment the ZeroRPC author says there are a couple of differences that still makes the custom solution work better; dotcloud/zerorpc-python#50.

It would be great if anyone with more insight in zmq/ZeroRPC than me could take a look at the differences and maybe have zmq.green be usable in ZeroRPC, as that custom solution shouldn't be necessary.

@minrk
Owner

That's a pretty old Issue, I don't think the gevent bug workaround listed there is a differentiator anymore. I would certainly love for zeroprc to apply any improvements / patches to zmq.green, but they aren't obliged.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Mar 22, 2012
  1. @minrk

    drop in gevent_zeromq as zmq.green

    minrk authored
    gevent_zeromq authors added to authors list
  2. @minrk
  3. @minrk

    tweak green import

    minrk authored
    use `import zmq.green as zmq` instead of `from zmq.green import zmq`
  4. @minrk
  5. @minrk

    compile cython zmq.green

    minrk authored
  6. @minrk

    make _socket_class a property, for easier subclasses of Context

    minrk authored
    Only this property/attr needs to be changed for a Context subclass to create different class of Socket.
    
    It *should* be a class attr, but that doesn't work well with cdef classes (yet).
  7. @minrk
  8. @minrk
  9. @minrk
This page is out of date. Refresh to see the latest.
View
19 README.rst
@@ -236,9 +236,6 @@ is the primary developer of pyzmq at this time.
The following people have contributed to the project:
-* Eugene Chernyshov (chernyshov DOT eugene AT gmail DOT com)
-* Douglas Creager (dcreager AT dcreager DOT net)
-* Craig Austin (craig DOT austin AT gmail DOT com)
* Andrew Gwozdziewycz (git AT apgwoz DOT com)
* Baptiste Lepilleur (baptiste DOT lepilleur AT gmail DOT com)
@@ -271,3 +268,19 @@ as reported by::
with some adjustments.
+Not in git log
+--------------
+
+* Eugene Chernyshov (chernyshov DOT eugene AT gmail DOT com)
+* Douglas Creager (dcreager AT dcreager DOT net)
+* Craig Austin (craig DOT austin AT gmail DOT com)
+
+
+gevent_zeromq, now zmq.green
+----------------------------
+
+* Travis Cline (travis DOT cline AT gmail DOT com)
+* Ryan Kelly (ryan AT rfk DOT id DOT au)
+* Zachary Voase (z AT zacharyvoase DOT com)
+
+
View
47 examples/gevent/reqrep.py
@@ -0,0 +1,47 @@
+"""
+Complex example which is a combination of the rr* examples from the zguide.
+"""
+from gevent import spawn
+import zmq.green as zmq
+
+# server
+context = zmq.Context()
+socket = context.socket(zmq.REP)
+socket.connect("tcp://localhost:5560")
+
+def serve(socket):
+ while True:
+ message = socket.recv()
+ print "Received request: ", message
+ socket.send("World")
+server = spawn(serve, socket)
+
+
+# client
+context = zmq.Context()
+socket = context.socket(zmq.REQ)
+socket.connect("tcp://localhost:5559")
+
+# Do 10 requests, waiting each time for a response
+def client():
+ for request in range(1,10):
+ socket.send("Hello")
+ message = socket.recv()
+ print "Received reply ", request, "[", message, "]"
+
+
+# broker
+frontend = context.socket(zmq.XREP)
+backend = context.socket(zmq.XREQ);
+frontend.bind("tcp://*:5559")
+backend.bind("tcp://*:5560")
+
+def proxy(socket_from, socket_to):
+ while True:
+ m = socket_from.recv_multipart()
+ socket_to.send_multipart(m)
+
+a = spawn(proxy, frontend, backend)
+b = spawn(proxy, backend, frontend)
+
+spawn(client).join()
View
37 examples/gevent/simple.py
@@ -0,0 +1,37 @@
+from gevent import spawn, spawn_later
+import zmq.green as zmq
+
+# server
+print zmq.Context
+ctx = zmq.Context()
+sock = ctx.socket(zmq.PUSH)
+sock.bind('ipc:///tmp/zmqtest')
+
+spawn(sock.send_pyobj, ('this', 'is', 'a', 'python', 'tuple'))
+spawn_later(1, sock.send_pyobj, {'hi': 1234})
+spawn_later(2, sock.send_pyobj, ({'this': ['is a more complicated object', ':)']}, 42, 42, 42))
+spawn_later(3, sock.send_pyobj, 'foobar')
+spawn_later(4, sock.send_pyobj, 'quit')
+
+
+# client
+ctx = zmq.Context() # create a new context to kick the wheels
+sock = ctx.socket(zmq.PULL)
+sock.connect('ipc:///tmp/zmqtest')
+
+def get_objs(sock):
+ while True:
+ o = sock.recv_pyobj()
+ print 'received python object:', o
+ if o == 'quit':
+ print 'exiting.'
+ break
+
+def print_every(s, t=None):
+ print s
+ if t:
+ spawn_later(t, print_every, s, t)
+
+print_every('printing every half second', 0.5)
+spawn(get_objs, sock).join()
+
View
3  setup.py
@@ -538,6 +538,9 @@ def dotc(subdir, name):
utils = {
'initthreads':[libzmq],
'rebuffer':[buffers],
+ },
+ green = {
+ 'core' : [libzmq, context, socket],
}
)
View
10 zmq/core/context.pyx
@@ -186,6 +186,12 @@ cdef class Context:
self._sockets[0] = self._sockets[self.n_sockets]
self.term()
+ @property
+ def _socket_class(self):
+ # import here to prevent circular import
+ from zmq.core.socket import Socket
+ return Socket
+
def socket(self, int socket_type):
"""ctx.socket(socket_type)
@@ -197,11 +203,9 @@ cdef class Context:
The socket type, which can be any of the 0MQ socket types:
REQ, REP, PUB, SUB, PAIR, DEALER, ROUTER, PULL, PUSH, XSUB, XPUB.
"""
- # import here to prevent circular import
- from zmq.core.socket import Socket
if self.closed:
raise ZMQError(ENOTSUP)
- s = Socket(self, socket_type)
+ s = self._socket_class(self, socket_type)
for opt, value in self.sockopts.iteritems():
try:
s.setsockopt(opt, value)
View
36 zmq/green/__init__.py
@@ -0,0 +1,36 @@
+# -*- coding: utf-8 -*-
+#-----------------------------------------------------------------------------
+# Copyright (c) 2011-2012 Travis Cline
+#
+# This file is part of pyzmq
+# It is adapted from upstream project zeromq_gevent under the New BSD License
+#
+# Distributed under the terms of the New BSD License. The full license is in
+# the file COPYING.BSD, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+"""zmq.green - gevent compatibility with zeromq.
+
+Usage
+-----
+
+Instead of importing zmq directly, do so in the following manner:
+
+..
+
+ import zmq.green as zmq
+
+
+Any calls that would have blocked the current thread will now only block the
+current green thread.
+
+This compatibility is accomplished by ensuring the nonblocking flag is set
+before any blocking operation and the ØMQ file descriptor is polled internally
+to trigger needed events.
+"""
+
+from zmq import *
+from zmq.green.core import _Context, _Socket
+Context = _Context
+Socket = _Socket
+
View
128 zmq/green/core.py
@@ -0,0 +1,128 @@
+#-----------------------------------------------------------------------------
+# Copyright (c) 2011-2012 Travis Cline
+#
+# This file is part of pyzmq
+# It is adapted from upstream project zeromq_gevent under the New BSD License
+#
+# Distributed under the terms of the New BSD License. The full license is in
+# the file COPYING.BSD, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+"""This module wraps the :class:`Socket` and :class:`Context` found in :mod:`pyzmq <zmq>` to be non blocking
+"""
+
+import zmq
+from zmq import *
+
+# imported with different names as to not have the star import try to to clobber (when building with cython)
+from zmq.core.context import Context as _original_Context
+from zmq.core.socket import Socket as _original_Socket
+
+from gevent.event import AsyncResult
+from gevent.hub import get_hub
+
+
+class _Context(_original_Context):
+ """Replacement for :class:`zmq.core.context.Context`
+
+ Ensures that the greened Socket below is used in calls to `socket`.
+ """
+ _socket_class = _Socket
+
+
+class _Socket(_original_Socket):
+ """Green version of :class:`zmq.core.socket.Socket`
+
+ The following methods are overridden:
+
+ * send
+ * recv
+
+ To ensure that the ``zmq.NOBLOCK`` flag is set and that sending or recieving
+ is deferred to the hub if a ``zmq.EAGAIN`` (retry) error is raised.
+
+ The `__state_changed` method is triggered when the zmq.FD for the socket is
+ marked as readable and triggers the necessary read and write events (which
+ are waited for in the recv and send methods).
+
+ Some double underscore prefixes are used to minimize pollution of
+ :class:`zmq.core.socket.Socket`'s namespace.
+ """
+
+ def __init__(self, context, socket_type):
+ self.__setup_events()
+
+ def close(self, linger=None):
+ # close the _state_event event, keeps the number of active file descriptors down
+ if not self._closed and getattr(self, '_state_event', None):
+ try:
+ self._state_event.stop()
+ except AttributeError, e:
+ # gevent<1.0 compat
+ self._state_event.cancel()
+ super(_Socket, self).close(linger)
+
+ def __setup_events(self):
+ self.__readable = AsyncResult()
+ self.__writable = AsyncResult()
+ try:
+ self._state_event = get_hub().loop.io(self.getsockopt(FD), 1) # read state watcher
+ self._state_event.start(self.__state_changed)
+ except AttributeError:
+ # for gevent<1.0 compatibility
+ from gevent.core import read_event
+ self._state_event = read_event(self.getsockopt(FD), self.__state_changed, persist=True)
+
+ def __state_changed(self, event=None, _evtype=None):
+ try:
+ if self.closed:
+ # if the socket has entered a close state resume any waiting greenlets
+ self.__writable.set()
+ self.__readable.set()
+ return
+ events = self.getsockopt(zmq.EVENTS)
+ except ZMQError, exc:
+ self.__writable.set_exception(exc)
+ self.__readable.set_exception(exc)
+ else:
+ if events & zmq.POLLOUT:
+ self.__writable.set()
+ if events & zmq.POLLIN:
+ self.__readable.set()
+
+ def _wait_write(self):
+ self.__writable = AsyncResult()
+ self.__writable.get()
+
+ def _wait_read(self):
+ self.__readable = AsyncResult()
+ self.__readable.get()
+
+ def send(self, data, flags=0, copy=True, track=False):
+ # if we're given the NOBLOCK flag act as normal and let the EAGAIN get raised
+ if flags & zmq.NOBLOCK:
+ return super(_Socket, self).send(data, flags, copy, track)
+ # ensure the zmq.NOBLOCK flag is part of flags
+ flags |= zmq.NOBLOCK
+ while True: # Attempt to complete this operation indefinitely, blocking the current greenlet
+ try:
+ # attempt the actual call
+ return super(_Socket, self).send(data, flags, copy, track)
+ except zmq.ZMQError, e:
+ # if the raised ZMQError is not EAGAIN, reraise
+ if e.errno != zmq.EAGAIN:
+ raise
+ # defer to the event loop until we're notified the socket is writable
+ self._wait_write()
+
+ def recv(self, flags=0, copy=True, track=False):
+ if flags & zmq.NOBLOCK:
+ return super(_Socket, self).recv(flags, copy, track)
+ flags |= zmq.NOBLOCK
+ while True:
+ try:
+ return super(_Socket, self).recv(flags, copy, track)
+ except zmq.ZMQError, e:
+ if e.errno != zmq.EAGAIN:
+ raise
+ self._wait_read()
View
136 zmq/green/core.pyx
@@ -0,0 +1,136 @@
+#-----------------------------------------------------------------------------
+# Copyright (c) 2011-2012 Travis Cline
+#
+# This file is part of pyzmq
+# It is adapted from upstream project zeromq_gevent under the New BSD License
+#
+# Distributed under the terms of the New BSD License. The full license is in
+# the file COPYING.BSD, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+"""This module wraps the :class:`Socket` and :class:`Context` found in :mod:`pyzmq <zmq>` to be non blocking
+"""
+
+import zmq
+from zmq import *
+
+# imported with different names as to not have the star import try to to clobber (when building with cython)
+from zmq.core.context cimport Context as _original_Context
+from zmq.core.socket cimport Socket as _original_Socket
+
+from gevent.event import AsyncResult
+from gevent.hub import get_hub
+
+
+cdef class _Socket(_original_Socket)
+
+cdef class _Context(_original_Context):
+ """Replacement for :class:`zmq.core.context.Context`
+
+ Ensures that the greened Socket below is used in calls to `socket`.
+ """
+ @property
+ def _socket_class(self):
+ """overridden to ensure green Sockets are created by this Context"""
+ return _Socket
+
+cdef class _Socket(_original_Socket):
+ """Green version of :class:`zmq.core.socket.Socket`
+
+ The following methods are overridden:
+
+ * send
+ * recv
+
+ To ensure that the ``zmq.NOBLOCK`` flag is set and that sending or recieving
+ is deferred to the hub if a ``zmq.EAGAIN`` (retry) error is raised.
+
+ The `__state_changed` method is triggered when the zmq.FD for the socket is
+ marked as readable and triggers the necessary read and write events (which
+ are waited for in the recv and send methods).
+
+ Some double underscore prefixes are used to minimize pollution of
+ :class:`zmq.core.socket.Socket`'s namespace.
+ """
+ cdef object __readable
+ cdef object __writable
+ cdef public object _state_event
+
+ def __init__(self, _Context context, int socket_type):
+ self.__setup_events()
+
+ def close(self, linger=None):
+ # close the _state_event event, keeps the number of active file descriptors down
+ if not self._closed and getattr(self, '_state_event', None):
+ try:
+ self._state_event.stop()
+ except AttributeError, e:
+ # gevent<1.0 compat
+ self._state_event.cancel()
+ super(_Socket, self).close(linger)
+
+ cdef __setup_events(self) with gil:
+ self.__readable = AsyncResult()
+ self.__writable = AsyncResult()
+ try:
+ self._state_event = get_hub().loop.io(self.getsockopt(FD), 1) # read state watcher
+ self._state_event.start(self.__state_changed)
+ except AttributeError, e:
+ # for gevent<1.0 compatibility
+ from gevent.core import read_event
+ self._state_event = read_event(self.getsockopt(FD), self.__state_changed, persist=True)
+
+ def __state_changed(self, event=None, _evtype=None):
+ cdef int events
+ try:
+ if self.closed:
+ # if the socket has entered a close state resume any waiting greenlets
+ self.__writable.set()
+ self.__readable.set()
+ return
+ events = self.getsockopt(EVENTS)
+ except ZMQError, exc:
+ self.__writable.set_exception(exc)
+ self.__readable.set_exception(exc)
+ else:
+ if events & POLLOUT:
+ self.__writable.set()
+ if events & POLLIN:
+ self.__readable.set()
+
+ cdef _wait_write(self) with gil:
+ self.__writable = AsyncResult()
+ self.__writable.get()
+
+ cdef _wait_read(self) with gil:
+ self.__readable = AsyncResult()
+ self.__readable.get()
+
+ cpdef object send(self, object data, int flags=0, copy=True, track=False):
+ # if we're given the NOBLOCK flag act as normal and let the EAGAIN get raised
+ if flags & NOBLOCK:
+ return _original_Socket.send(self, data, flags, copy, track)
+ # ensure the zmq.NOBLOCK flag is part of flags
+ flags = flags | NOBLOCK
+ while True: # Attempt to complete this operation indefinitely, blocking the current greenlet
+ try:
+ # attempt the actual call
+ return _original_Socket.send(self, data, flags, copy, track)
+ except ZMQError, e:
+ # if the raised ZMQError is not EAGAIN, reraise
+ if e.errno != EAGAIN:
+ raise
+ # defer to the event loop until we're notified the socket is writable
+ self._wait_write()
+
+ cpdef object recv(self, int flags=0, copy=True, track=False):
+ if flags & NOBLOCK:
+ return _original_Socket.recv(self, flags, copy, track)
+ flags = flags | NOBLOCK
+ while True:
+ try:
+ return _original_Socket.recv(self, flags, copy, track)
+ except ZMQError, e:
+ if e.errno != EAGAIN:
+ raise
+ self._wait_read()
View
62 zmq/tests/__init__.py
@@ -21,6 +21,13 @@
from zmq.utils import jsonapi
try:
+ import gevent
+ from zmq import green as gzmq
+ have_gevent = True
+except ImportError:
+ have_gevent = False
+
+try:
from unittest import SkipTest
except ImportError:
try:
@@ -37,9 +44,19 @@ class SkipTest(Exception):
zmq.NOBLOCK = zmq.DONTWAIT
class BaseZMQTestCase(TestCase):
-
+ green = False
+
+ @property
+ def Context(self):
+ if self.green:
+ return gzmq.Context
+ else:
+ return zmq.Context
+
def setUp(self):
- self.context = zmq.Context.instance()
+ if self.green and not have_gevent:
+ raise SkipTest("requires gevent")
+ self.context = self.Context.instance()
self.sockets = []
def tearDown(self):
@@ -62,10 +79,10 @@ def tearDown(self):
def create_bound_pair(self, type1=zmq.PAIR, type2=zmq.PAIR, interface='tcp://127.0.0.1'):
"""Create a bound socket pair using a random port."""
- s1 = zmq.Socket(self.context, type1)
+ s1 = self.context.socket(type1)
s1.setsockopt(zmq.LINGER, 0)
port = s1.bind_to_random_port(interface)
- s2 = zmq.Socket(self.context, type2)
+ s2 = self.context.socket(type2)
s2.setsockopt(zmq.LINGER, 0)
s2.connect('%s:%s' % (interface, port))
self.sockets.extend([s1,s2])
@@ -131,3 +148,40 @@ def recv_multipart(self, socket, **kwargs):
class PollZMQTestCase(BaseZMQTestCase):
pass
+class GreenTest:
+ """Mixin for making green versions of test classes"""
+ green = True
+
+ def assertRaisesErrno(self, errno, func, *args, **kwargs):
+ if errno == zmq.EAGAIN:
+ raise SkipTest("Skipping because we're green.")
+ try:
+ func(*args, **kwargs)
+ except zmq.ZMQError:
+ e = sys.exc_info()[1]
+ self.assertEqual(e.errno, errno, "wrong error raised, expected '%s' \
+got '%s'" % (zmq.ZMQError(errno), zmq.ZMQError(e.errno)))
+ else:
+ self.fail("Function did not raise any error")
+
+ def tearDown(self):
+ contexts = set([self.context])
+ while self.sockets:
+ sock = self.sockets.pop()
+ contexts.add(sock.context) # in case additional contexts are created
+ sock.close()
+ try:
+ gevent.joinall([gevent.spawn(ctx.term) for ctx in contexts], timeout=2, raise_error=True)
+ except gevent.Timeout:
+ raise RuntimeError("context could not terminate, open sockets likely remain in test")
+
+ def skip_green(self):
+ raise SkipTest("Skipping because we are green")
+
+def skip_green(f):
+ def skipping_test(self, *args, **kwargs):
+ if self.green:
+ raise SkipTest("Skipping because we are green")
+ else:
+ return f(self, *args, **kwargs)
+ return skipping_test
View
48 zmq/tests/test_context.py
@@ -17,7 +17,7 @@
import zmq
from zmq.utils.strtypes import asbytes, b
-from zmq.tests import BaseZMQTestCase
+from zmq.tests import BaseZMQTestCase, have_gevent, GreenTest, skip_green
#-----------------------------------------------------------------------------
@@ -28,23 +28,23 @@
class TestContext(BaseZMQTestCase):
def test_init(self):
- c1 = zmq.Context()
- self.assert_(isinstance(c1, zmq.Context))
+ c1 = self.Context()
+ self.assert_(isinstance(c1, self.Context))
del c1
- c2 = zmq.Context()
- self.assert_(isinstance(c2, zmq.Context))
+ c2 = self.Context()
+ self.assert_(isinstance(c2, self.Context))
del c2
- c3 = zmq.Context()
- self.assert_(isinstance(c3, zmq.Context))
+ c3 = self.Context()
+ self.assert_(isinstance(c3, self.Context))
del c3
def test_term(self):
- c = zmq.Context()
+ c = self.Context()
c.term()
self.assert_(c.closed)
def test_fail_init(self):
- self.assertRaisesErrno(zmq.EINVAL, zmq.Context, 0)
+ self.assertRaisesErrno(zmq.EINVAL, self.Context, 0)
def test_term_hang(self):
rep,req = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
@@ -55,19 +55,19 @@ def test_term_hang(self):
self.context.term()
def test_instance(self):
- ctx = zmq.Context.instance()
- c2 = zmq.Context.instance(io_threads=2)
+ ctx = self.Context.instance()
+ c2 = self.Context.instance(io_threads=2)
self.assertTrue(c2 is ctx)
c2.term()
- c3 = zmq.Context.instance()
- c4 = zmq.Context.instance()
+ c3 = self.Context.instance()
+ c4 = self.Context.instance()
self.assertFalse(c3 is c2)
self.assertFalse(c3.closed)
self.assertTrue(c3 is c4)
def test_many_sockets(self):
"""opening and closing many sockets shouldn't cause problems"""
- ctx = zmq.Context()
+ ctx = self.Context()
for i in range(16):
sockets = [ ctx.socket(zmq.REP) for i in range(65) ]
[ s.close() for s in sockets ]
@@ -77,7 +77,7 @@ def test_many_sockets(self):
def test_sockopts(self):
"""setting socket options with ctx attributes"""
- ctx = zmq.Context()
+ ctx = self.Context()
ctx.linger = 5
self.assertEquals(ctx.linger, 5)
s = ctx.socket(zmq.REQ)
@@ -94,7 +94,7 @@ def test_sockopts(self):
def test_destroy(self):
"""Context.destroy should close sockets"""
- ctx = zmq.Context()
+ ctx = self.Context()
sockets = [ ctx.socket(zmq.REP) for i in range(65) ]
# close half of the sockets
@@ -119,7 +119,7 @@ def test_destroy_linger(self):
def test_term_noclose(self):
"""Context.term won't close sockets"""
- ctx = zmq.Context()
+ ctx = self.Context()
s = ctx.socket(zmq.REQ)
self.assertFalse(s.closed)
t = Thread(target=ctx.term)
@@ -136,7 +136,7 @@ def test_gc(self):
"""test close&term by garbage collection alone"""
# test credit @dln (GH #137):
def gc():
- ctx = zmq.Context()
+ ctx = self.Context()
s = ctx.socket(zmq.PUSH)
t = Thread(target=gc)
t.start()
@@ -157,7 +157,7 @@ def crash(self, sock):
self.child = CyclicReference(self)
def crash_zmq():
- ctx = zmq.Context()
+ ctx = self.Context()
sock = ctx.socket(zmq.PULL)
c = CyclicReference()
c.crash(sock)
@@ -167,9 +167,10 @@ def crash_zmq():
def test_term_thread(self):
"""ctx.term should not crash active threads (#139)"""
- ctx = zmq.Context()
+ ctx = self.Context()
evt = Event()
evt.clear()
+
def block():
s = ctx.socket(zmq.REP)
s.bind_to_random_port('tcp://127.0.0.1')
@@ -192,3 +193,10 @@ def block():
t.join(timeout=1)
self.assertFalse(t.is_alive(), "term should have interrupted s.recv()")
+
+if have_gevent:
+ class TestContextGreen(GreenTest, TestContext):
+ """gevent subclass of context tests"""
+ # skip tests that use real threads:
+ test_gc = GreenTest.skip_green
+ test_term_thread = GreenTest.skip_green
View
5 zmq/tests/test_multipart.py
@@ -14,7 +14,7 @@
import zmq
from zmq.utils.strtypes import asbytes
-from zmq.tests import BaseZMQTestCase, SkipTest
+from zmq.tests import BaseZMQTestCase, SkipTest, have_gevent, GreenTest
#-----------------------------------------------------------------------------
# Tests
@@ -42,3 +42,6 @@ def test_basic_multipart(self):
recvd = b.recv_multipart()
self.assertEquals(msg, recvd)
+if have_gevent:
+ class TestMultipartGreen(GreenTest, TestMultipart):
+ pass
View
6 zmq/tests/test_pair.py
@@ -14,7 +14,7 @@
import zmq
from zmq.utils.strtypes import asbytes
-from zmq.tests import BaseZMQTestCase
+from zmq.tests import BaseZMQTestCase, have_gevent, GreenTest
#-----------------------------------------------------------------------------
# Tests
@@ -59,3 +59,7 @@ def test_pyobj(self):
o = dict(a=10,b=range(10))
o2 = self.ping_pong_pyobj(s1, s2, o)
+if have_gevent:
+ class TestReqRepGreen(GreenTest, TestPair):
+ pass
+
View
5 zmq/tests/test_pubsub.py
@@ -16,7 +16,7 @@
import zmq
from zmq.utils.strtypes import asbytes
-from zmq.tests import BaseZMQTestCase
+from zmq.tests import BaseZMQTestCase, have_gevent, GreenTest
#-----------------------------------------------------------------------------
# Tests
@@ -48,3 +48,6 @@ def test_topic(self):
msg2 = s2.recv()
self.assertEquals(msg1, msg2)
+if have_gevent:
+ class TestPubSubGreen(GreenTest, TestPubSub):
+ pass
View
5 zmq/tests/test_reqrep.py
@@ -15,7 +15,7 @@
import zmq
from zmq.utils.strtypes import asbytes
-from zmq.tests import BaseZMQTestCase
+from zmq.tests import BaseZMQTestCase, have_gevent, GreenTest
#-----------------------------------------------------------------------------
# Tests
@@ -70,3 +70,6 @@ def test_large_msg(self):
msg2 = self.ping_pong(s1, s2, msg1)
self.assertEquals(msg1, msg2)
+if have_gevent:
+ class TestReqRepGreen(GreenTest, TestReqRep):
+ pass
View
14 zmq/tests/test_socket.py
@@ -16,7 +16,7 @@
import time
import zmq
-from zmq.tests import BaseZMQTestCase, SkipTest
+from zmq.tests import BaseZMQTestCase, SkipTest, have_gevent, GreenTest
from zmq.utils.strtypes import bytes, unicode, asbytes
try:
from queue import Queue
@@ -30,7 +30,7 @@
class TestSocket(BaseZMQTestCase):
def test_create(self):
- ctx = zmq.Context()
+ ctx = self.Context()
s = ctx.socket(zmq.PUB)
# Superluminal protocol not yet implemented
self.assertRaisesErrno(zmq.EPROTONOSUPPORT, s.bind, 'ftl://a')
@@ -221,7 +221,7 @@ def test_tracker(self):
def test_close(self):
- ctx = zmq.Context()
+ ctx = self.Context()
s = ctx.socket(zmq.PUB)
s.close()
self.assertRaises(zmq.ZMQError, s.bind, asbytes(''))
@@ -281,7 +281,7 @@ def test_recv_multipart(self):
def test_close_after_destroy(self):
"""s.close() after ctx.destroy() should be fine"""
- ctx = zmq.Context()
+ ctx = self.Context()
s = ctx.socket(zmq.REP)
ctx.destroy()
# reaper is not instantaneous
@@ -304,6 +304,10 @@ def test_poll(self):
evt = b.poll(50)
self.assertEquals(evt, 0)
self.assertEquals(msg2, msg)
-
+
+
+if have_gevent:
+ class TestSocketGreen(GreenTest, TestSocket):
+ test_bad_attr = GreenTest.skip_green
Something went wrong with that request. Please try again.