Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a shutdown_condition to setup #81

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions crochet/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,18 @@
# waitpid() is only necessary on POSIX:
reapAllProcesses = lambda: None

from ._shutdown import _watchdog, register
from ._shutdown import registry, register
from ._eventloop import (EventualResult, TimeoutError, EventLoop, _store,
ReactorStopped)
from ._version import get_versions
__version__ = get_versions()['version']
del get_versions


def _importReactor():
from twisted.internet import reactor
return reactor
_main = EventLoop(_importReactor, register, startLoggingWithObserver,
_watchdog, reapAllProcesses)
_main = EventLoop(_importReactor, registry, startLoggingWithObserver,
reapAllProcesses)
setup = _main.setup
no_setup = _main.no_setup
run_in_reactor = _main.run_in_reactor
Expand Down
26 changes: 15 additions & 11 deletions crochet/_eventloop.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from ._util import synchronized
from ._resultstore import ResultStore
from ._shutdown import Watchdog, default_shutdown_condition

_store = ResultStore()

Expand Down Expand Up @@ -305,25 +306,23 @@ class EventLoop(object):
"""
Initialization infrastructure for running a reactor in a thread.
"""
def __init__(self, reactorFactory, atexit_register,
def __init__(self, reactorFactory, atexit_registry,
startLoggingWithObserver=None,
watchdog_thread=None,
reapAllProcesses=None):
"""
reactorFactory: Zero-argument callable that returns a reactor.
atexit_register: atexit.register, or look-alike.
startLoggingWithObserver: Either None, or
twisted.python.log.startLoggingWithObserver or lookalike.
watchdog_thread: crochet._shutdown.Watchdog instance, or None.
reapAllProcesses: twisted.internet.process.reapAllProcesses or
lookalike.
"""
self._reactorFactory = reactorFactory
self._atexit_register = atexit_register
self._atexit_registry = atexit_registry
self._startLoggingWithObserver = startLoggingWithObserver
self._started = False
self._lock = threading.Lock()
self._watchdog_thread = watchdog_thread
self._watchdog_thread = None
self._reapAllProcesses = reapAllProcesses

def _startReapingProcesses(self):
Expand All @@ -347,7 +346,7 @@ def _common_setup(self):
"before", "shutdown", self._registry.stop)

@synchronized
def setup(self):
def setup(self, shutdown_condition=None):
"""
Initialize the crochet library.

Expand Down Expand Up @@ -382,11 +381,16 @@ def start():
target=lambda: self._reactor.run(installSignalHandlers=False),
name="CrochetReactor")
t.start()
self._atexit_register(self._reactor.callFromThread,
self._reactor.stop)
self._atexit_register(_store.log_errors)
if self._watchdog_thread is not None:
self._watchdog_thread.start()
self._atexit_registry.register(
self._reactor.callFromThread,
self._reactor.stop)
self._atexit_registry.register(_store.log_errors)
if shutdown_condition is None:
shutdown_condition = default_shutdown_condition()
self._watchdog_thread = Watchdog(
shutdown_condition,
self._atexit_registry.run)
self._watchdog_thread.start()

@synchronized
def no_setup(self):
Expand Down
26 changes: 17 additions & 9 deletions crochet/_shutdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@

class Watchdog(threading.Thread):
"""
Watch a given thread, call a list of functions when that thread exits.
Poll for a shutdown condition to be True, call a shutdown function
when it becomes True.
"""

def __init__(self, canary, shutdown_function):
def __init__(self, shutdown_condition, shutdown_function):
threading.Thread.__init__(self, name="CrochetShutdownWatchdog")
self._canary = canary
self._shutdown_condition = shutdown_condition
self._shutdown_function = shutdown_function

def run(self):
while self._canary.is_alive():
while not self._shutdown_condition():
time.sleep(0.1)
self._shutdown_function()

Expand Down Expand Up @@ -53,8 +54,15 @@ def run(self):
log.err()


# This is... fragile. Not sure how else to do it though.
_registry = FunctionRegistry()
_watchdog = Watchdog([t for t in threading.enumerate()
if t.name == "MainThread"][0], _registry.run)
register = _registry.register
def default_shutdown_condition():
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In retrospect it's not clear to me why this is module-level state at all. Seems like all of the state could live inside EventLoop, and then there's no need to pass a FunctionRegistry instance to EventLoop.__init__, it would just create one.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may be forgetting why this was necessary though, so if you think it's too big of a change for you (or this branch) it's not necessary to do this.

main_thread = [t for t in threading.enumerate()
if t.name == "MainThread"][0]

def shutdown_condition():
return not main_thread.is_alive()

return shutdown_condition


registry = FunctionRegistry()
register = registry.register
5 changes: 2 additions & 3 deletions crochet/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,7 @@ def test_no_sideeffects(self):
call any methods on the objects it is created with.
"""
c = EventLoop(lambda: None, lambda f, g: 1/0, lambda *args: 1/0,
watchdog_thread=object(), reapAllProcesses=lambda: 1/0)
reapAllProcesses=lambda: 1/0)
del c

def test_eventloop_api(self):
Expand All @@ -1005,10 +1005,9 @@ def test_eventloop_api(self):
self.assertEqual(_main.run_in_reactor, run_in_reactor)
self.assertEqual(_main.wait_for_reactor, wait_for_reactor)
self.assertEqual(_main.wait_for, wait_for)
self.assertIdentical(_main._atexit_register, _shutdown.register)
self.assertIdentical(_main._atexit_registry, _shutdown.registry)
self.assertIdentical(_main._startLoggingWithObserver,
startLoggingWithObserver)
self.assertIdentical(_main._watchdog_thread, _shutdown._watchdog)

def test_eventloop_api_reactor(self):
"""
Expand Down
57 changes: 33 additions & 24 deletions crochet/tests/test_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,19 @@ def start(self):
self.started = True


class FakeFunctionRegistry(object):
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for a fake, you can use the real class.


def __init__(self):
self.atexit = []
self.has_run = False

def register(self, func, *args):
self.atexit.append((func, args))

def run(self):
self.has_run = True


class SetupTests(TestCase):
"""
Tests for setup().
Expand All @@ -69,7 +82,7 @@ def test_first_runs_reactor(self):
With it first call, setup() runs the reactor in a thread.
"""
reactor = FakeReactor()
EventLoop(lambda: reactor, lambda f, *g: None).setup()
EventLoop(lambda: reactor, FakeFunctionRegistry()).setup()
reactor.started.wait(5)
self.assertNotEqual(reactor.thread_id, None)
self.assertNotEqual(reactor.thread_id, threading.current_thread().ident)
Expand All @@ -80,7 +93,7 @@ def test_second_does_nothing(self):
The second call to setup() does nothing.
"""
reactor = FakeReactor()
s = EventLoop(lambda: reactor, lambda f, *g: None)
s = EventLoop(lambda: reactor, FakeFunctionRegistry())
s.setup()
s.setup()
reactor.started.wait(5)
Expand All @@ -91,18 +104,18 @@ def test_stop_on_exit(self):
setup() registers an exit handler that stops the reactor, and an exit
handler that logs stashed EventualResults.
"""
atexit = []
reactor = FakeReactor()
s = EventLoop(lambda: reactor, lambda f, *args: atexit.append((f, args)))
registry = FakeFunctionRegistry()
s = EventLoop(lambda: reactor, registry)
s.setup()
self.assertEqual(len(atexit), 2)
self.assertEqual(len(registry.atexit), 2)
self.assertFalse(reactor.stopping)
f, args = atexit[0]
f, args = registry.atexit[0]
self.assertEqual(f, reactor.callFromThread)
self.assertEqual(args, (reactor.stop,))
f(*args)
self.assertTrue(reactor.stopping)
f, args = atexit[1]
f, args = registry.atexit[1]
self.assertEqual(f, _store.log_errors)
self.assertEqual(args, ())
f(*args) # make sure it doesn't throw an exception
Expand Down Expand Up @@ -132,7 +145,7 @@ def fakeStartLoggingWithObserver(observer, setStdout=1):
logging.append(observer)

reactor = FakeReactor()
loop = EventLoop(lambda: reactor, lambda f, *g: None,
loop = EventLoop(lambda: reactor, FakeFunctionRegistry(),
fakeStartLoggingWithObserver)
loop.setup()
self.assertTrue(logging)
Expand All @@ -144,7 +157,7 @@ def test_stop_logging_on_exit(self):
"""
observers = []
reactor = FakeReactor()
s = EventLoop(lambda: reactor, lambda f, *arg: None,
s = EventLoop(lambda: reactor, FakeFunctionRegistry(),
lambda observer, setStdout=1: observers.append(observer))
s.setup()
self.addCleanup(observers[0].stop)
Expand All @@ -160,7 +173,7 @@ def fakeStartLoggingWithObserver(observer, setStdout=1):
self.addCleanup(observer.stop)
original = warnings.showwarning
reactor = FakeReactor()
loop = EventLoop(lambda: reactor, lambda f, *g: None,
loop = EventLoop(lambda: reactor, FakeFunctionRegistry(),
fakeStartLoggingWithObserver)
loop.setup()
self.assertIdentical(warnings.showwarning, original)
Expand All @@ -169,39 +182,35 @@ def test_start_watchdog_thread(self):
"""
setup() starts the shutdown watchdog thread.
"""
thread = FakeThread()
reactor = FakeReactor()
loop = EventLoop(lambda: reactor, lambda *args: None,
watchdog_thread=thread)
loop = EventLoop(lambda: reactor, FakeFunctionRegistry())
loop.setup()
self.assertTrue(thread.started)
self.assertTrue(loop._watchdog_thread.is_alive)

def test_no_setup(self):
"""
If called first, no_setup() makes subsequent calls to setup() do
nothing.
"""
observers = []
atexit = []
thread = FakeThread()
registry = FakeFunctionRegistry()
reactor = FakeReactor()
loop = EventLoop(lambda: reactor, lambda f, *arg: atexit.append(f),
lambda observer, *a, **kw: observers.append(observer),
watchdog_thread=thread)
loop = EventLoop(lambda: reactor,
registry,
lambda observer, *a, **kw: observers.append(observer))

loop.no_setup()
loop.setup()
self.assertFalse(observers)
self.assertFalse(atexit)
self.assertFalse(registry.atexit)
self.assertFalse(reactor.runs)
self.assertFalse(thread.started)

def test_no_setup_after_setup(self):
"""
If called after setup(), no_setup() throws an exception.
"""
reactor = FakeReactor()
s = EventLoop(lambda: reactor, lambda f, *g: None)
s = EventLoop(lambda: reactor, FakeFunctionRegistry())
s.setup()
self.assertRaises(RuntimeError, s.no_setup)

Expand All @@ -211,7 +220,7 @@ def test_setup_registry_shutdown(self):
setup().
"""
reactor = FakeReactor()
s = EventLoop(lambda: reactor, lambda f, *g: None)
s = EventLoop(lambda: reactor, FakeFunctionRegistry())
s.setup()
self.assertEqual(reactor.events,
[("before", "shutdown", s._registry.stop)])
Expand Down Expand Up @@ -240,7 +249,7 @@ def test_posix(self):
"""
reactor = FakeReactor()
reaps = []
s = EventLoop(lambda: reactor, lambda f, *g: None,
s = EventLoop(lambda: reactor, FakeFunctionRegistry(),
reapAllProcesses=lambda: reaps.append(1))
s.setup()
reactor.advance(0.1)
Expand Down
21 changes: 11 additions & 10 deletions crochet/tests/test_shutdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@

from twisted.trial.unittest import TestCase

from crochet._shutdown import (Watchdog, FunctionRegistry, _watchdog, register,
_registry)
from crochet._shutdown import Watchdog, FunctionRegistry, registry
from ..tests import crochet_directory


Expand All @@ -27,8 +26,8 @@ def test_shutdown(self):
program = """\
import threading, sys

from crochet._shutdown import register, _watchdog
_watchdog.start()
from crochet._shutdown import registry, Watchdog, default_shutdown_condition
Watchdog(default_shutdown_condition(), registry.run).start()

end = False

Expand All @@ -46,7 +45,7 @@ def stop(x, y):
end = True

threading.Thread(target=thread).start()
register(stop, 1, y=2)
registry.register(stop, 1, y=2)

sys.exit()
"""
Expand All @@ -69,7 +68,12 @@ class FakeThread:
def is_alive(self):
return alive

w = Watchdog(FakeThread(), lambda: done.append(True))
thread = FakeThread()

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like this test could be simplified to not even have a thread, and just have a flag that gets sets to True... so correspondingly the docstring should change.

def condition():
return not thread.is_alive()

w = Watchdog(condition, lambda: done.append(True))
w.start()
time.sleep(0.2)
self.assertTrue(w.is_alive())
Expand All @@ -84,10 +88,7 @@ def test_api(self):
The module exposes a shutdown thread that will call a global
registry's run(), and a register function tied to the global registry.
"""
self.assertIsInstance(_registry, FunctionRegistry)
self.assertEqual(register, _registry.register)
self.assertIsInstance(_watchdog, Watchdog)
self.assertEqual(_watchdog._shutdown_function, _registry.run)
self.assertIsInstance(registry, FunctionRegistry)


class FunctionRegistryTests(TestCase):
Expand Down