Skip to content

Commit

Permalink
Simplify and match api of upstream changes
Browse files Browse the repository at this point in the history
  • Loading branch information
tmc committed Jun 30, 2012
1 parent 3753b1e commit 36812d9
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 70 deletions.
15 changes: 10 additions & 5 deletions gevent_zeromq/__init__.py
Expand Up @@ -19,11 +19,14 @@
to trigger needed events.
"""

import gevent_zeromq.core as zmq
from zmq import *
from zmq import devices
zmq.Context = zmq._Context
zmq.Socket = zmq._Socket
import gevent_zeromq.core as zmq

zmq.Socket = zmq.GreenSocket
zmq.Context = zmq.GreenContext
Socket = zmq.GreenSocket
Context = zmq.GreenContext

def monkey_patch():
"""
Expand All @@ -33,5 +36,7 @@ def monkey_patch():
compatibility as well.
"""
ozmq = __import__('zmq')
ozmq.Socket = zmq.Socket
ozmq.Context = zmq.Context
ozmq.Socket = zmq.GreenSocket
ozmq.Context = zmq.GreenContext

__all__ = zmq.__all__ + ['monkey_patch']
58 changes: 27 additions & 31 deletions gevent_zeromq/core.py
Expand Up @@ -2,35 +2,13 @@
"""
import zmq
from zmq import *

# imported with different names as to not have the star import try to to clobber (when building with cython)
from zmq.core.context import Context as _original_Context
from zmq.core.socket import Socket as _original_Socket
from zmq import devices
from zmq.core import *
__all__ = zmq.__all__

from gevent.event import AsyncResult
from gevent.hub import get_hub


class _Context(_original_Context):
"""Replacement for :class:`zmq.core.context.Context`
Ensures that the greened Socket below is used in calls to `socket`.
"""

def socket(self, socket_type):
"""Overridden method to ensure that the green version of socket is used
Behaves the same as :meth:`zmq.core.context.Context.socket`, but ensures
that a :class:`Socket` with all of its send and recv methods set to be
non-blocking is returned
"""
if self.closed:
raise ZMQError(ENOTSUP)
return _Socket(self, socket_type)

class _Socket(_original_Socket):
class GreenSocket(Socket):
"""Green version of :class:`zmq.core.socket.Socket`
The following methods are overridden:
Expand All @@ -52,15 +30,26 @@ class _Socket(_original_Socket):
def __init__(self, context, socket_type):
self.__setup_events()

def close(self):
def __del__(self):
self.close()

def close(self, linger=None):
super(GreenSocket, self).close(linger)
self.__cleanup_events()

def __cleanup_events(self):
# close the _state_event event, keeps the number of active file descriptors down
if not self._closed and getattr(self, '_state_event', None):
if getattr(self, '_state_event', None):
try:
self._state_event.stop()
except AttributeError, e:
# gevent<1.0 compat
self._state_event.cancel()
super(_Socket, self).close()

# if the socket has entered a close state resume any waiting greenlets
if hasattr(self, '__writable'):
self.__writable.set()
self.__readable.set()

def __setup_events(self):
self.__readable = AsyncResult()
Expand Down Expand Up @@ -101,13 +90,13 @@ def _wait_read(self):
def send(self, data, flags=0, copy=True, track=False):
# if we're given the NOBLOCK flag act as normal and let the EAGAIN get raised
if flags & zmq.NOBLOCK:
return super(_Socket, self).send(data, flags, copy, track)
return super(GreenSocket, self).send(data, flags, copy, track)
# ensure the zmq.NOBLOCK flag is part of flags
flags |= zmq.NOBLOCK
while True: # Attempt to complete this operation indefinitely, blocking the current greenlet
try:
# attempt the actual call
return super(_Socket, self).send(data, flags, copy, track)
return super(GreenSocket, self).send(data, flags, copy, track)
except zmq.ZMQError, e:
# if the raised ZMQError is not EAGAIN, reraise
if e.errno != zmq.EAGAIN:
Expand All @@ -117,12 +106,19 @@ def send(self, data, flags=0, copy=True, track=False):

def recv(self, flags=0, copy=True, track=False):
if flags & zmq.NOBLOCK:
return super(_Socket, self).recv(flags, copy, track)
return super(GreenSocket, self).recv(flags, copy, track)
flags |= zmq.NOBLOCK
while True:
try:
return super(_Socket, self).recv(flags, copy, track)
return super(GreenSocket, self).recv(flags, copy, track)
except zmq.ZMQError, e:
if e.errno != zmq.EAGAIN:
raise
self._wait_read()

class GreenContext(Context):
"""Replacement for :class:`zmq.core.context.Context`
Ensures that the greened Socket above is used in calls to `socket`.
"""
_socket_class = GreenSocket
67 changes: 33 additions & 34 deletions gevent_zeromq/core.pyx
Expand Up @@ -2,35 +2,16 @@
"""
import zmq
from zmq import *

# imported with different names as to not have the star import try to to clobber (when building with cython)
from zmq.core.context cimport Context as _original_Context
from zmq.core.socket cimport Socket as _original_Socket
from zmq import devices
__all__ = zmq.__all__

from gevent.event import AsyncResult
from gevent.hub import get_hub

from zmq.core.context cimport Context as _Context
from zmq.core.socket cimport Socket as _Socket

cdef class _Socket(_original_Socket)

cdef class _Context(_original_Context):
"""Replacement for :class:`zmq.core.context.Context`
Ensures that the greened Socket below is used in calls to `socket`.
"""

def socket(self, int socket_type):
"""Overridden method to ensure that the green version of socket is used
Behaves the same as :meth:`zmq.core.context.Context.socket`, but ensures
that a :class:`Socket` with all of its send and recv methods set to be
non-blocking is returned
"""
if self.closed:
raise ZMQError(ENOTSUP)
return _Socket(self, socket_type)

cdef class _Socket(_original_Socket):
cdef class GreenSocket(_Socket):
"""Green version of :class:`zmq.core.socket.Socket`
The following methods are overridden:
Expand All @@ -52,26 +33,37 @@ cdef class _Socket(_original_Socket):
cdef object __writable
cdef public object _state_event

def __init__(self, _Context context, int socket_type):
def __init__(self, context, int socket_type):
self.__setup_events()

def close(self):
def __del__(self):
self.close()

def close(self, linger=None):
super(GreenSocket, self).close(linger)
self.__cleanup_events()

def __cleanup_events(self):
# close the _state_event event, keeps the number of active file descriptors down
if not self._closed and getattr(self, '_state_event', None):
if getattr(self, '_state_event', None):
try:
self._state_event.stop()
except AttributeError, e:
# gevent<1.0 compat
self._state_event.cancel()
super(_Socket, self).close()

cdef __setup_events(self) with gil:
# if the socket has entered a close state resume any waiting greenlets
if hasattr(self, '__writable'):
self.__writable.set()
self.__readable.set()

def __setup_events(self):
self.__readable = AsyncResult()
self.__writable = AsyncResult()
try:
self._state_event = get_hub().loop.io(self.getsockopt(FD), 1) # read state watcher
self._state_event.start(self.__state_changed)
except AttributeError, e:
except AttributeError:
# for gevent<1.0 compatibility
from gevent.core import read_event
self._state_event = read_event(self.getsockopt(FD), self.__state_changed, persist=True)
Expand Down Expand Up @@ -105,13 +97,13 @@ cdef class _Socket(_original_Socket):
cpdef object send(self, object data, int flags=0, copy=True, track=False):
# if we're given the NOBLOCK flag act as normal and let the EAGAIN get raised
if flags & NOBLOCK:
return _original_Socket.send(self, data, flags, copy, track)
return _Socket.send(self, data, flags, copy, track)
# ensure the zmq.NOBLOCK flag is part of flags
flags = flags | NOBLOCK
while True: # Attempt to complete this operation indefinitely, blocking the current greenlet
try:
# attempt the actual call
return _original_Socket.send(self, data, flags, copy, track)
return _Socket.send(self, data, flags, copy, track)
except ZMQError, e:
# if the raised ZMQError is not EAGAIN, reraise
if e.errno != EAGAIN:
Expand All @@ -121,12 +113,19 @@ cdef class _Socket(_original_Socket):

cpdef object recv(self, int flags=0, copy=True, track=False):
if flags & NOBLOCK:
return _original_Socket.recv(self, flags, copy, track)
return _Socket.recv(self, flags, copy, track)
flags = flags | NOBLOCK
while True:
try:
return _original_Socket.recv(self, flags, copy, track)
return _Socket.recv(self, flags, copy, track)
except ZMQError, e:
if e.errno != EAGAIN:
raise
self._wait_read()

class GreenContext(_Context):
"""Replacement for :class:`zmq.core.context.Context`
Ensures that the greened Socket below is used in calls to `socket`.
"""
_socket_class = GreenSocket

0 comments on commit 36812d9

Please sign in to comment.