Skip to content

Commit

Permalink
Adapted to API changes in pyuv
Browse files Browse the repository at this point in the history
  • Loading branch information
saghul committed Jun 10, 2012
1 parent 1e1673a commit b85fecb
Showing 1 changed file with 25 additions and 24 deletions.
49 changes: 25 additions & 24 deletions twisted_pyuv/__init__.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@

from __future__ import absolute_import, division, with_statement
from __future__ import absolute_import, with_statement

import functools
import logging
import threading
import time

import pyuv

from collections import deque
from twisted.internet.base import _SignalReactorMixin
from twisted.internet.posixbase import PosixReactorBase
from twisted.internet.interfaces import IReactorFDSet, IDelayedCall, IReactorTime
from twisted.python import failure, log
from twisted.internet import error

from zope.interface import implements

import pyuv


class UVWaker(object):
def __init__(self, reactor):
self._async = pyuv.Async(reactor._loop, self._cb)
self._async.unref()
def _cb(self, handle):
pass
def wakeUp(self):
Expand Down Expand Up @@ -65,12 +67,6 @@ def reset(self, seconds):
def active(self):
return self._active

class FDWrap(object):
def __init__(self, fd):
self.fd = fd
def fileno(self):
return self.fd

class UVReactor(PosixReactorBase):
"""Twisted reactor built on the Tornado IOLoop.
Expand All @@ -86,11 +82,13 @@ class UVReactor(PosixReactorBase):
def __init__(self):
self._loop = pyuv.Loop()
self._signal_watcher = pyuv.Signal(self._loop)
self._async_handle = pyuv.Async(self._loop, self._async_cb)
self._async_handle_lock = threading.Lock()
self._async_callbacks = deque()
self._readers = {} # map of reader objects to fd
self._writers = {} # map of writer objects to fd
self._fds = {} # map of fd to a (reader, writer) tuple
self._delayedCalls = {}
self._async_handles = set()
self._poll_handles = {}
PosixReactorBase.__init__(self)

Expand Down Expand Up @@ -119,23 +117,25 @@ def _removeDelayedCall(self, dc):
del self._delayedCalls[dc]

def _async_cb(self, handle):
handle.close()
try:
handle.cb()
except:
log.err()
del handle.cb
self._async_handles.remove(handle)
with self._async_handle_lock:
callbacks = self._async_callbacks
self._async_callbacks = deque()
while callbacks:
cb = callbacks.popleft()
try:
cb()
except Exception:
log.err()

# IReactorThreads

def callFromThread(self, f, *args, **kw):
"""See `twisted.internet.interfaces.IReactorThreads.callFromThread`"""
assert callable(f), "%s is not callable" % f
async = pyuv.Async(self._loop, self._async_cb)
async.cb = functools.partial(f, *args, **kw)
self._async_handles.add(async)
async.send()
cb = functools.partial(f, *args, **kw)
with self._async_handle_lock:
self._async_callbacks.append(cb)
self._async_handle.send()

def _handleSignals(self):
"""Bypass installing the child waker"""
Expand Down Expand Up @@ -198,7 +198,7 @@ def addReader(self, reader):
poll_handle.start(pyuv.UV_READABLE | pyuv.UV_WRITABLE, self._invoke_callback)
else:
self._fds[fd] = (reader, None)
poll_handle = pyuv.Poll(self._loop, FDWrap(fd))
poll_handle = pyuv.Poll(self._loop, fd)
poll_handle.start(pyuv.UV_READABLE, self._invoke_callback)
poll_handle.fd = fd
self._poll_handles[fd] = poll_handle
Expand All @@ -219,7 +219,7 @@ def addWriter(self, writer):
poll_handle.start(pyuv.UV_READABLE | pyuv.UV_WRITABLE, self._invoke_callback)
else:
self._fds[fd] = (None, writer)
poll_handle = pyuv.Poll(self._loop, FDWrap(fd))
poll_handle = pyuv.Poll(self._loop, fd)
poll_handle.start(pyuv.UV_WRITABLE, self._invoke_callback)
poll_handle.fd = fd
self._poll_handles[fd] = poll_handle
Expand Down Expand Up @@ -270,6 +270,7 @@ def getWriters(self):
return self._writers.keys()

def stop(self):
self._async_handle.close()
for handle in self._get_loop_handles():
if hasattr(handle, 'stop') and callable(handle.stop):
handle.stop()
Expand Down

0 comments on commit b85fecb

Please sign in to comment.