Skip to content

Commit

Permalink
add a shutdown_condition to setup.
Browse files Browse the repository at this point in the history
  • Loading branch information
dnephin committed Apr 7, 2015
1 parent fbcceb0 commit 97f18b3
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 61 deletions.
7 changes: 3 additions & 4 deletions crochet/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,18 @@
# waitpid() is only necessary on POSIX:
reapAllProcesses = lambda: None

from ._shutdown import _watchdog, register
from ._shutdown import registry, register
from ._eventloop import (EventualResult, TimeoutError, EventLoop, _store,
ReactorStopped)
from ._version import get_versions
__version__ = get_versions()['version']
del get_versions


def _importReactor():
from twisted.internet import reactor
return reactor
_main = EventLoop(_importReactor, register, startLoggingWithObserver,
_watchdog, reapAllProcesses)
_main = EventLoop(_importReactor, registry, startLoggingWithObserver,
reapAllProcesses)
setup = _main.setup
no_setup = _main.no_setup
run_in_reactor = _main.run_in_reactor
Expand Down
26 changes: 15 additions & 11 deletions crochet/_eventloop.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from ._util import synchronized
from ._resultstore import ResultStore
from ._shutdown import Watchdog, default_shutdown_condition

_store = ResultStore()

Expand Down Expand Up @@ -305,25 +306,23 @@ class EventLoop(object):
"""
Initialization infrastructure for running a reactor in a thread.
"""
def __init__(self, reactorFactory, atexit_register,
def __init__(self, reactorFactory, atexit_registry,
startLoggingWithObserver=None,
watchdog_thread=None,
reapAllProcesses=None):
"""
reactorFactory: Zero-argument callable that returns a reactor.
atexit_register: atexit.register, or look-alike.
startLoggingWithObserver: Either None, or
twisted.python.log.startLoggingWithObserver or lookalike.
watchdog_thread: crochet._shutdown.Watchdog instance, or None.
reapAllProcesses: twisted.internet.process.reapAllProcesses or
lookalike.
"""
self._reactorFactory = reactorFactory
self._atexit_register = atexit_register
self._atexit_registry = atexit_registry
self._startLoggingWithObserver = startLoggingWithObserver
self._started = False
self._lock = threading.Lock()
self._watchdog_thread = watchdog_thread
self._watchdog_thread = None
self._reapAllProcesses = reapAllProcesses

def _startReapingProcesses(self):
Expand All @@ -347,7 +346,7 @@ def _common_setup(self):
"before", "shutdown", self._registry.stop)

@synchronized
def setup(self):
def setup(self, shutdown_condition=None):
"""
Initialize the crochet library.
Expand Down Expand Up @@ -382,11 +381,16 @@ def start():
target=lambda: self._reactor.run(installSignalHandlers=False),
name="CrochetReactor")
t.start()
self._atexit_register(self._reactor.callFromThread,
self._reactor.stop)
self._atexit_register(_store.log_errors)
if self._watchdog_thread is not None:
self._watchdog_thread.start()
self._atexit_registry.register(
self._reactor.callFromThread,
self._reactor.stop)
self._atexit_registry.register(_store.log_errors)
if shutdown_condition is None:
shutdown_condition = default_shutdown_condition()
self._watchdog_thread = Watchdog(
shutdown_condition,
self._atexit_registry.run)
self._watchdog_thread.start()

@synchronized
def no_setup(self):
Expand Down
26 changes: 17 additions & 9 deletions crochet/_shutdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@

class Watchdog(threading.Thread):
"""
Watch a given thread, call a list of functions when that thread exits.
Poll for a shutdown condition to be True, call a shutdown function
when it becomes True.
"""

def __init__(self, canary, shutdown_function):
def __init__(self, shutdown_condition, shutdown_function):
threading.Thread.__init__(self, name="CrochetShutdownWatchdog")
self._canary = canary
self._shutdown_condition = shutdown_condition
self._shutdown_function = shutdown_function

def run(self):
while self._canary.is_alive():
while not self._shutdown_condition():
time.sleep(0.1)
self._shutdown_function()

Expand Down Expand Up @@ -53,8 +54,15 @@ def run(self):
log.err()


# This is... fragile. Not sure how else to do it though.
_registry = FunctionRegistry()
_watchdog = Watchdog([t for t in threading.enumerate()
if t.name == "MainThread"][0], _registry.run)
register = _registry.register
def default_shutdown_condition():
main_thread = [t for t in threading.enumerate()
if t.name == "MainThread"][0]

def shutdown_condition():
return not main_thread.is_alive()

return shutdown_condition


registry = FunctionRegistry()
register = registry.register
5 changes: 2 additions & 3 deletions crochet/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,7 @@ def test_no_sideeffects(self):
call any methods on the objects it is created with.
"""
c = EventLoop(lambda: None, lambda f, g: 1/0, lambda *args: 1/0,
watchdog_thread=object(), reapAllProcesses=lambda: 1/0)
reapAllProcesses=lambda: 1/0)
del c

def test_eventloop_api(self):
Expand All @@ -1005,10 +1005,9 @@ def test_eventloop_api(self):
self.assertEqual(_main.run_in_reactor, run_in_reactor)
self.assertEqual(_main.wait_for_reactor, wait_for_reactor)
self.assertEqual(_main.wait_for, wait_for)
self.assertIdentical(_main._atexit_register, _shutdown.register)
self.assertIdentical(_main._atexit_registry, _shutdown.registry)
self.assertIdentical(_main._startLoggingWithObserver,
startLoggingWithObserver)
self.assertIdentical(_main._watchdog_thread, _shutdown._watchdog)

def test_eventloop_api_reactor(self):
"""
Expand Down
57 changes: 33 additions & 24 deletions crochet/tests/test_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,19 @@ def start(self):
self.started = True


class FakeFunctionRegistry(object):

def __init__(self):
self.atexit = []
self.has_run = False

def register(self, func, *args):
self.atexit.append((func, args))

def run(self):
self.has_run = True


class SetupTests(TestCase):
"""
Tests for setup().
Expand All @@ -69,7 +82,7 @@ def test_first_runs_reactor(self):
With it first call, setup() runs the reactor in a thread.
"""
reactor = FakeReactor()
EventLoop(lambda: reactor, lambda f, *g: None).setup()
EventLoop(lambda: reactor, FakeFunctionRegistry()).setup()
reactor.started.wait(5)
self.assertNotEqual(reactor.thread_id, None)
self.assertNotEqual(reactor.thread_id, threading.current_thread().ident)
Expand All @@ -80,7 +93,7 @@ def test_second_does_nothing(self):
The second call to setup() does nothing.
"""
reactor = FakeReactor()
s = EventLoop(lambda: reactor, lambda f, *g: None)
s = EventLoop(lambda: reactor, FakeFunctionRegistry())
s.setup()
s.setup()
reactor.started.wait(5)
Expand All @@ -91,18 +104,18 @@ def test_stop_on_exit(self):
setup() registers an exit handler that stops the reactor, and an exit
handler that logs stashed EventualResults.
"""
atexit = []
reactor = FakeReactor()
s = EventLoop(lambda: reactor, lambda f, *args: atexit.append((f, args)))
registry = FakeFunctionRegistry()
s = EventLoop(lambda: reactor, registry)
s.setup()
self.assertEqual(len(atexit), 2)
self.assertEqual(len(registry.atexit), 2)
self.assertFalse(reactor.stopping)
f, args = atexit[0]
f, args = registry.atexit[0]
self.assertEqual(f, reactor.callFromThread)
self.assertEqual(args, (reactor.stop,))
f(*args)
self.assertTrue(reactor.stopping)
f, args = atexit[1]
f, args = registry.atexit[1]
self.assertEqual(f, _store.log_errors)
self.assertEqual(args, ())
f(*args) # make sure it doesn't throw an exception
Expand Down Expand Up @@ -132,7 +145,7 @@ def fakeStartLoggingWithObserver(observer, setStdout=1):
logging.append(observer)

reactor = FakeReactor()
loop = EventLoop(lambda: reactor, lambda f, *g: None,
loop = EventLoop(lambda: reactor, FakeFunctionRegistry(),
fakeStartLoggingWithObserver)
loop.setup()
self.assertTrue(logging)
Expand All @@ -144,7 +157,7 @@ def test_stop_logging_on_exit(self):
"""
observers = []
reactor = FakeReactor()
s = EventLoop(lambda: reactor, lambda f, *arg: None,
s = EventLoop(lambda: reactor, FakeFunctionRegistry(),
lambda observer, setStdout=1: observers.append(observer))
s.setup()
self.addCleanup(observers[0].stop)
Expand All @@ -160,7 +173,7 @@ def fakeStartLoggingWithObserver(observer, setStdout=1):
self.addCleanup(observer.stop)
original = warnings.showwarning
reactor = FakeReactor()
loop = EventLoop(lambda: reactor, lambda f, *g: None,
loop = EventLoop(lambda: reactor, FakeFunctionRegistry(),
fakeStartLoggingWithObserver)
loop.setup()
self.assertIdentical(warnings.showwarning, original)
Expand All @@ -169,39 +182,35 @@ def test_start_watchdog_thread(self):
"""
setup() starts the shutdown watchdog thread.
"""
thread = FakeThread()
reactor = FakeReactor()
loop = EventLoop(lambda: reactor, lambda *args: None,
watchdog_thread=thread)
loop = EventLoop(lambda: reactor, FakeFunctionRegistry())
loop.setup()
self.assertTrue(thread.started)
self.assertTrue(loop._watchdog_thread.is_alive)

def test_no_setup(self):
"""
If called first, no_setup() makes subsequent calls to setup() do
nothing.
"""
observers = []
atexit = []
thread = FakeThread()
registry = FakeFunctionRegistry()
reactor = FakeReactor()
loop = EventLoop(lambda: reactor, lambda f, *arg: atexit.append(f),
lambda observer, *a, **kw: observers.append(observer),
watchdog_thread=thread)
loop = EventLoop(lambda: reactor,
registry,
lambda observer, *a, **kw: observers.append(observer))

loop.no_setup()
loop.setup()
self.assertFalse(observers)
self.assertFalse(atexit)
self.assertFalse(registry.atexit)
self.assertFalse(reactor.runs)
self.assertFalse(thread.started)

def test_no_setup_after_setup(self):
"""
If called after setup(), no_setup() throws an exception.
"""
reactor = FakeReactor()
s = EventLoop(lambda: reactor, lambda f, *g: None)
s = EventLoop(lambda: reactor, FakeFunctionRegistry())
s.setup()
self.assertRaises(RuntimeError, s.no_setup)

Expand All @@ -211,7 +220,7 @@ def test_setup_registry_shutdown(self):
setup().
"""
reactor = FakeReactor()
s = EventLoop(lambda: reactor, lambda f, *g: None)
s = EventLoop(lambda: reactor, FakeFunctionRegistry())
s.setup()
self.assertEqual(reactor.events,
[("before", "shutdown", s._registry.stop)])
Expand Down Expand Up @@ -240,7 +249,7 @@ def test_posix(self):
"""
reactor = FakeReactor()
reaps = []
s = EventLoop(lambda: reactor, lambda f, *g: None,
s = EventLoop(lambda: reactor, FakeFunctionRegistry(),
reapAllProcesses=lambda: reaps.append(1))
s.setup()
reactor.advance(0.1)
Expand Down
21 changes: 11 additions & 10 deletions crochet/tests/test_shutdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@

from twisted.trial.unittest import TestCase

from crochet._shutdown import (Watchdog, FunctionRegistry, _watchdog, register,
_registry)
from crochet._shutdown import Watchdog, FunctionRegistry, registry
from ..tests import crochet_directory


Expand All @@ -27,8 +26,8 @@ def test_shutdown(self):
program = """\
import threading, sys
from crochet._shutdown import register, _watchdog
_watchdog.start()
from crochet._shutdown import registry, Watchdog, default_shutdown_condition
Watchdog(default_shutdown_condition(), registry.run).start()
end = False
Expand All @@ -46,7 +45,7 @@ def stop(x, y):
end = True
threading.Thread(target=thread).start()
register(stop, 1, y=2)
registry.register(stop, 1, y=2)
sys.exit()
"""
Expand All @@ -69,7 +68,12 @@ class FakeThread:
def is_alive(self):
return alive

w = Watchdog(FakeThread(), lambda: done.append(True))
thread = FakeThread()

def condition():
return not thread.is_alive()

w = Watchdog(condition, lambda: done.append(True))
w.start()
time.sleep(0.2)
self.assertTrue(w.is_alive())
Expand All @@ -84,10 +88,7 @@ def test_api(self):
The module exposes a shutdown thread that will call a global
registry's run(), and a register function tied to the global registry.
"""
self.assertIsInstance(_registry, FunctionRegistry)
self.assertEqual(register, _registry.register)
self.assertIsInstance(_watchdog, Watchdog)
self.assertEqual(_watchdog._shutdown_function, _registry.run)
self.assertIsInstance(registry, FunctionRegistry)


class FunctionRegistryTests(TestCase):
Expand Down

0 comments on commit 97f18b3

Please sign in to comment.