Skip to content

Commit

Permalink
Support reaping process on POSIX in lieu of SIGCHLD support.
Browse files Browse the repository at this point in the history
Fixes #1.
  • Loading branch information
itamarst committed Sep 16, 2013
1 parent 1812500 commit 780d331
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 6 deletions.
8 changes: 8 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,14 @@ project `Github page`_.
News
----

**Next release**

* On POSIX platforms, a workaround is installed to ensure processes started by
`reactor.spawnProcess` have their exit noticed. See `Twisted ticket 6378`_
for more details about the underlying issue.

.. _Twisted ticket 6378: http://tm.tl/6738

**0.8.1**

* EventualResult.wait() now raises error if called in the reactor thread.
Expand Down
3 changes: 2 additions & 1 deletion crochet/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@

from twisted.internet import reactor
from twisted.python.log import startLoggingWithObserver
from twisted.internet.process import reapAllProcesses

from ._shutdown import _watchdog, register
from ._eventloop import EventualResult, TimeoutError, EventLoop, _store

_main = EventLoop(reactor, register, startLoggingWithObserver,
_watchdog)
_watchdog, reapAllProcesses)
setup = _main.setup
no_setup = _main.no_setup
run_in_reactor = _main.run_in_reactor
Expand Down
22 changes: 18 additions & 4 deletions crochet/_eventloop.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
from functools import wraps

from twisted.python import threadable
from twisted.python.runtime import platform
from twisted.python.failure import Failure
from twisted.python.log import PythonLoggingObserver, err
from twisted.internet.threads import blockingCallFromThread
from twisted.internet.defer import maybeDeferred
from twisted.internet import reactor
from twisted.internet.task import LoopingCall

from ._util import synchronized
from ._resultstore import ResultStore
Expand Down Expand Up @@ -197,13 +199,23 @@ class EventLoop(object):
"""
def __init__(self, reactor, atexit_register,
startLoggingWithObserver=None,
watchdog_thread=None):
watchdog_thread=None,
reapAllProcesses=None):
self._reactor = reactor
self._atexit_register = atexit_register
self._startLoggingWithObserver = startLoggingWithObserver
self._started = False
self._lock = threading.Lock()
self._watchdog_thread = watchdog_thread
self._reapAllProcesses = reapAllProcesses

def _startReapingProcesses(self):
"""
Start a LoopingCall that calls reapAllProcesses.
"""
lc = LoopingCall(self._reapAllProcesses)
lc.clock = self._reactor
lc.start(0.1, False)

@synchronized
def setup(self):
Expand All @@ -219,16 +231,18 @@ def setup(self):
if self._started:
return
self._started = True
t = threading.Thread(
target=lambda: self._reactor.run(installSignalHandlers=False))
t.start()
if platform.type == "posix":
self._reactor.callFromThread(self._startReapingProcesses)
if self._startLoggingWithObserver:
observer = ThreadLogObserver(PythonLoggingObserver().emit)
self._reactor.callFromThread(
self._startLoggingWithObserver, observer, False)
# We only want to stop the logging thread once the reactor has
# shut down:
self._reactor.addSystemEventTrigger("after", "shutdown", observer.stop)
t = threading.Thread(
target=lambda: self._reactor.run(installSignalHandlers=False))
t.start()
self._atexit_register(self._reactor.callFromThread,
self._reactor.stop)
self._atexit_register(_store.log_errors)
Expand Down
12 changes: 12 additions & 0 deletions crochet/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from twisted.internet.defer import succeed, Deferred, fail, CancelledError
from twisted.python.failure import Failure
from twisted.python import threadable
from twisted.python.runtime import platform
from twisted.internet.process import reapAllProcesses

from .._eventloop import EventLoop, EventualResult, TimeoutError
from .test_setup import FakeReactor
Expand Down Expand Up @@ -393,3 +395,13 @@ def test_retrieve_result(self):
dr = EventualResult(Deferred())
uid = dr.stash()
self.assertIdentical(dr, retrieve_result(uid))

def test_reapAllProcesses(self):
"""
An EventLoop object configured with the real reapAllProcesses on POSIX
plaforms.
"""
self.assertIdentical(_main._reapAllProcesses, reapAllProcesses)
if platform.type != "posix":
test_reapAllProcesses.skip = "Only relevant on POSIX platforms"

41 changes: 40 additions & 1 deletion crochet/tests/test_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@

from twisted.trial.unittest import TestCase
from twisted.python.log import PythonLoggingObserver
from twisted.python.runtime import platform
from twisted.internet.task import Clock

from .._eventloop import EventLoop, ThreadLogObserver, _store


class FakeReactor(object):
class FakeReactor(Clock):
"""
A fake reactor for testing purposes.
"""
Expand All @@ -21,6 +23,7 @@ class FakeReactor(object):
in_call_from_thread = False

def __init__(self):
Clock.__init__(self)
self.started = threading.Event()
self.stopping = False
self.events = []
Expand Down Expand Up @@ -181,6 +184,42 @@ def test_no_setup_after_setup(self):
self.assertRaises(RuntimeError, s.no_setup)


class ProcessSetupTests(TestCase):
"""
setup() enables support for IReactorProcess on POSIX plaforms.
"""
def test_posix(self):
"""
On POSIX systems, setup() installs a LoopingCall that runs
t.i.process.reapAllProcesses() 10 times a second.
"""
reactor = FakeReactor()
reaps = []
s = EventLoop(reactor, lambda f, *g: None,
reapAllProcesses=lambda: reaps.append(1))
s.setup()
reactor.advance(0.1)
self.assertEquals(reaps, [1])
reactor.advance(0.1)
self.assertEquals(reaps, [1, 1])
reactor.advance(0.1)
self.assertEquals(reaps, [1, 1, 1])
if platform.type != "posix":
test_posix.skip = "SIGCHLD is a POSIX-specific issue"

def test_non_posix(self):
"""
On POSIX systems, setup() does not install a LoopingCall.
"""
reactor = FakeReactor()
s = EventLoop(reactor, lambda f, *g: None)
s.setup()
self.assertFalse(reactor.getDelayedCalls())

if platform.type == "posix":
test_non_posix.skip = "SIGCHLD is a POSIX-specific issue"


class ThreadLogObserverTest(TestCase):
"""
Tests for ThreadLogObserver.
Expand Down

0 comments on commit 780d331

Please sign in to comment.