From 780d33146fe5351bce15ac4e738d423dbe274869 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Mon, 16 Sep 2013 16:20:22 -0400 Subject: [PATCH] Support reaping process on POSIX in lieu of SIGCHLD support. Fixes #1. --- README.rst | 8 ++++++++ crochet/__init__.py | 3 ++- crochet/_eventloop.py | 22 ++++++++++++++++---- crochet/tests/test_api.py | 12 +++++++++++ crochet/tests/test_setup.py | 41 ++++++++++++++++++++++++++++++++++++- 5 files changed, 80 insertions(+), 6 deletions(-) diff --git a/README.rst b/README.rst index f5981da..8c0735a 100644 --- a/README.rst +++ b/README.rst @@ -153,6 +153,14 @@ project `Github page`_. News ---- +**Next release** + +* On POSIX platforms, a workaround is installed to ensure processes started by + `reactor.spawnProcess` have their exit noticed. See `Twisted ticket 6378`_ + for more details about the underlying issue. + +.. _Twisted ticket 6378: http://tm.tl/6738 + **0.8.1** * EventualResult.wait() now raises error if called in the reactor thread. diff --git a/crochet/__init__.py b/crochet/__init__.py index 9a1fcca..da95701 100644 --- a/crochet/__init__.py +++ b/crochet/__init__.py @@ -6,12 +6,13 @@ from twisted.internet import reactor from twisted.python.log import startLoggingWithObserver +from twisted.internet.process import reapAllProcesses from ._shutdown import _watchdog, register from ._eventloop import EventualResult, TimeoutError, EventLoop, _store _main = EventLoop(reactor, register, startLoggingWithObserver, - _watchdog) + _watchdog, reapAllProcesses) setup = _main.setup no_setup = _main.no_setup run_in_reactor = _main.run_in_reactor diff --git a/crochet/_eventloop.py b/crochet/_eventloop.py index 378eaa2..3084d3c 100644 --- a/crochet/_eventloop.py +++ b/crochet/_eventloop.py @@ -13,11 +13,13 @@ from functools import wraps from twisted.python import threadable +from twisted.python.runtime import platform from twisted.python.failure import Failure from twisted.python.log import PythonLoggingObserver, err from twisted.internet.threads import blockingCallFromThread from twisted.internet.defer import maybeDeferred from twisted.internet import reactor +from twisted.internet.task import LoopingCall from ._util import synchronized from ._resultstore import ResultStore @@ -197,13 +199,23 @@ class EventLoop(object): """ def __init__(self, reactor, atexit_register, startLoggingWithObserver=None, - watchdog_thread=None): + watchdog_thread=None, + reapAllProcesses=None): self._reactor = reactor self._atexit_register = atexit_register self._startLoggingWithObserver = startLoggingWithObserver self._started = False self._lock = threading.Lock() self._watchdog_thread = watchdog_thread + self._reapAllProcesses = reapAllProcesses + + def _startReapingProcesses(self): + """ + Start a LoopingCall that calls reapAllProcesses. + """ + lc = LoopingCall(self._reapAllProcesses) + lc.clock = self._reactor + lc.start(0.1, False) @synchronized def setup(self): @@ -219,9 +231,8 @@ def setup(self): if self._started: return self._started = True - t = threading.Thread( - target=lambda: self._reactor.run(installSignalHandlers=False)) - t.start() + if platform.type == "posix": + self._reactor.callFromThread(self._startReapingProcesses) if self._startLoggingWithObserver: observer = ThreadLogObserver(PythonLoggingObserver().emit) self._reactor.callFromThread( @@ -229,6 +240,9 @@ def setup(self): # We only want to stop the logging thread once the reactor has # shut down: self._reactor.addSystemEventTrigger("after", "shutdown", observer.stop) + t = threading.Thread( + target=lambda: self._reactor.run(installSignalHandlers=False)) + t.start() self._atexit_register(self._reactor.callFromThread, self._reactor.stop) self._atexit_register(_store.log_errors) diff --git a/crochet/tests/test_api.py b/crochet/tests/test_api.py index a43921d..e8139f2 100644 --- a/crochet/tests/test_api.py +++ b/crochet/tests/test_api.py @@ -14,6 +14,8 @@ from twisted.internet.defer import succeed, Deferred, fail, CancelledError from twisted.python.failure import Failure from twisted.python import threadable +from twisted.python.runtime import platform +from twisted.internet.process import reapAllProcesses from .._eventloop import EventLoop, EventualResult, TimeoutError from .test_setup import FakeReactor @@ -393,3 +395,13 @@ def test_retrieve_result(self): dr = EventualResult(Deferred()) uid = dr.stash() self.assertIdentical(dr, retrieve_result(uid)) + + def test_reapAllProcesses(self): + """ + An EventLoop object configured with the real reapAllProcesses on POSIX + plaforms. + """ + self.assertIdentical(_main._reapAllProcesses, reapAllProcesses) + if platform.type != "posix": + test_reapAllProcesses.skip = "Only relevant on POSIX platforms" + diff --git a/crochet/tests/test_setup.py b/crochet/tests/test_setup.py index 30b8bfd..8c358df 100644 --- a/crochet/tests/test_setup.py +++ b/crochet/tests/test_setup.py @@ -8,11 +8,13 @@ from twisted.trial.unittest import TestCase from twisted.python.log import PythonLoggingObserver +from twisted.python.runtime import platform +from twisted.internet.task import Clock from .._eventloop import EventLoop, ThreadLogObserver, _store -class FakeReactor(object): +class FakeReactor(Clock): """ A fake reactor for testing purposes. """ @@ -21,6 +23,7 @@ class FakeReactor(object): in_call_from_thread = False def __init__(self): + Clock.__init__(self) self.started = threading.Event() self.stopping = False self.events = [] @@ -181,6 +184,42 @@ def test_no_setup_after_setup(self): self.assertRaises(RuntimeError, s.no_setup) +class ProcessSetupTests(TestCase): + """ + setup() enables support for IReactorProcess on POSIX plaforms. + """ + def test_posix(self): + """ + On POSIX systems, setup() installs a LoopingCall that runs + t.i.process.reapAllProcesses() 10 times a second. + """ + reactor = FakeReactor() + reaps = [] + s = EventLoop(reactor, lambda f, *g: None, + reapAllProcesses=lambda: reaps.append(1)) + s.setup() + reactor.advance(0.1) + self.assertEquals(reaps, [1]) + reactor.advance(0.1) + self.assertEquals(reaps, [1, 1]) + reactor.advance(0.1) + self.assertEquals(reaps, [1, 1, 1]) + if platform.type != "posix": + test_posix.skip = "SIGCHLD is a POSIX-specific issue" + + def test_non_posix(self): + """ + On POSIX systems, setup() does not install a LoopingCall. + """ + reactor = FakeReactor() + s = EventLoop(reactor, lambda f, *g: None) + s.setup() + self.assertFalse(reactor.getDelayedCalls()) + + if platform.type == "posix": + test_non_posix.skip = "SIGCHLD is a POSIX-specific issue" + + class ThreadLogObserverTest(TestCase): """ Tests for ThreadLogObserver.