Permalink
Browse files

rework gevent backend so it works with DispatcherConnection

Still far from ideal, but it's a start...
  • Loading branch information...
1 parent bd5e548 commit bdc66433c44269a13860f9f7a9f9ab85d78285c0 @rfk committed Mar 3, 2011
Showing with 68 additions and 52 deletions.
  1. +0 −5 m2wsgi/__init__.py
  2. +68 −47 m2wsgi/io/gevent.py
View
@@ -101,7 +101,6 @@
Devices
-------
-
This module also provides a number of pre-built "devices" - stand-alone
executables designed to perform a specific common task. Currently availble
devices are:
@@ -153,10 +152,6 @@
* Needs tests something fierce! I just have to find the patience to
write the necessary setup and teardown cruft.
- * gevent IO module doesn't work with DispatcherConnection, it will
- serve a few requests and then just freeze up. No idea what's going
- wrong.
-
* It would be great to grab connection details straight from the
mongrel2 config database. Perhaps a Connection.from_config method
with keywords to select the connection by handler id, host, route etc.
View
@@ -32,9 +32,11 @@
import gevent.core
import gevent.hub
-import zmq.core.poll as zmq_poll
+import gevent_zeromq
from gevent_zeromq import zmq
+import zmq.core.poll as zmq_poll
+
def monkey_patch():
"""Hook to monkey-patch the interpreter for this IO module.
@@ -43,6 +45,43 @@ def monkey_patch():
it's not called by default unless you're running from the command line.
"""
gevent.monkey.patch_all()
+ gevent_zeromq.monkey_patch()
+
+
+
+# The BaseConnection recv logic is based on polling, but I can't get
+# gevent polling only multiple sockets to work correctly.
+# Instead, we simulate polling on each socket individually by reading an item
+# and keeping it in a local buffer.
+#
+# Ideally I would juse use the _wait_read() method on gevent-zmq sockets,
+# but this seems to cause hangs for me. Still investigating.
+
+class _Context(zmq._Context):
+ def socket(self,socket_type):
+ if self.closed:
+ raise zmq.ZMQError(zmq.ENOTSUP)
+ return _Socket(self,socket_type)
+
+class _Socket(zmq._Socket):
+ _polled_recv = None
+ # This blockingly-reads a message from the socket, but stores
+ # it in a buffer rather than returning it.
+ def _recv_poll(self,flags=0,copy=True,track=False):
+ if self._polled_recv is None:
+ self._polled_recv = super(_Socket,self).recv(flags,copy,track)
+ # This uses the buffered result if available, or polls otherwise.
+ def recv(self,flags=0,copy=True,track=False):
+ v = self._polled_recv
+ while v is None:
+ self._recv_poll(flags,copy=copy,track=track)
+ v = self._polled_recv
+ self._polled_recv = None
+ return v
+
+zmq.Context = _Context
+zmq.Socket = _Socket
+
class Client(base.Client):
@@ -61,73 +100,53 @@ class ConnectionBase(base.ConnectionBase):
"""
ZMQ_CTX = zmq.Context()
- # a blocking zmq.core.poll doesn't play nice with gevent. We have to
- # poll based on ZMQ_FD. Each polled socket gets a persistent gevent
- # read_event and an Event object to signal on when ready; calls
- # to interrupt() just signal the events early.
+ # A blocking zmq.core.poll doesn't play nice with gevent.
+ # Instead we read from each socket in a separate greenthread, and keep
+ # the results in a local buffer so they don't get lost. An interrupt
+ # then just kills all the currently-running threads.
def __init__(self):
super(ConnectionBase,self).__init__()
- self.poll_events = {}
+ self.poll_threads = []
def _poll(self,sockets,timeout=None):
- # Polling based on ZMQ_FD is edge-triggered, so before we do that
- # we need to make sure there's no data currently available.
+ # If there's anything available non-blockingly, just use it.
(ready,_,error) = zmq_poll.select(sockets,[],sockets,timeout=0)
if ready:
return ready
if error:
return []
if timeout == 0:
return []
- # Now we can do the ZMQ_FD polling.
- # First make sure each socket has an event and signal set up.
- # Then spawn a greenthread to wait on each signal.
+ # Spawn a greenthread to poll-recv from each socket.
+ ready = []
threads = []
res = gevent.event.Event()
for sock in sockets:
- fd = sock.getsockopt(zmq.FD)
- try:
- (evt,sig) = self.poll_events[fd]
- sig.clear()
- except KeyError:
- sig = gevent.event.Event()
- def on_ready(evt,what,sig=sig):
- sig.set()
- try:
- read_event = gevent.hub.get_hub().reactor.read_event
- evt = read_event(fd,persist=True)
- evt.add(None,on_ready)
- except AttributeError:
- evt = gevent.core.read_event(fd,on_ready,persist=True)
- self.poll_events[fd] = (evt,sig)
- def wait_for_signal(sig=sig,res=res,sock=sock):
- sig.wait()
- res.set()
- threads.append(gevent.spawn(wait_for_signal))
- # Wait for the 'res' event to be triggered by some socket.
+ threads.append(gevent.spawn(self._do_poll,sock,ready,res,timeout))
+ self.poll_threads.append((res,threads))
+ # Wait for one of them to return, or for an interrupt.
try:
- if timeout is None:
- res.wait()
- else:
- with gevent.Timeout(timeout,False):
- res.wait()
+ res.wait()
finally:
gevent.killall(threads)
gevent.joinall(threads)
- # The peristent poll events just signal us when *something* is
- # ready, they don't tell us what it was. Nevermind, just ask zmq.
- (ready,_,_) = zmq_poll.select(sockets,[],[],timeout=0)
return ready
- def _interrupt(self):
- for (evt,sig) in self.poll_events.values():
- sig.set()
+ def _do_poll(self,sock,ready,res,timeout):
+ if timeout is None:
+ sock._recv_poll()
+ else:
+ with gevent.Timeout(timeout,False):
+ sock._recv_poll()
+ ready.append(sock)
+ if not res.is_set():
+ res.set()
- def close(self):
- for (evt,sig) in self.poll_events.values():
- evt.cancel()
- sig.set()
- super(ConnectionBase,self).close()
+ def _interrupt(self):
+ for (res,threads) in self.poll_threads:
+ gevent.killall(threads)
+ if not res.is_set():
+ res.set()
@@ -170,6 +189,8 @@ class Handler(base.Handler):
def __init__(self,*args,**kwds):
super(Handler,self).__init__(*args,**kwds)
+ # We need to count the number of inflight requests, so the
+ # main thread can wait for them to complete when shutting down.
self._num_inflight_requests = 0
self._all_requests_complete = gevent.event.Event()

0 comments on commit bdc6643

Please sign in to comment.