diff --git a/ActionTree/__init__.py b/ActionTree/__init__.py index be877d9..4fab8bd 100644 --- a/ActionTree/__init__.py +++ b/ActionTree/__init__.py @@ -929,9 +929,7 @@ def _read_to_events(self, action_id, pipe_r): self.events.put((PRINTED, action_id, (datetime.datetime.now(), data))) def _check_picklability(self, stuff): - # This is a way to fail fast if we see a non-picklable object - # because ProcessPoolExecutor freezes forever if we try to transfer - # a non-picklable object through its queues + # Fail fast: don't put a non-picklable object in a multiprocessing.Queue pickle.loads(pickle.dumps(stuff)) def _handle_next_event(self): diff --git a/ActionTree/tests/__init__.py b/ActionTree/tests/__init__.py index 2faeeb6..be33a57 100644 --- a/ActionTree/tests/__init__.py +++ b/ActionTree/tests/__init__.py @@ -5,6 +5,7 @@ from __future__ import division, absolute_import, print_function import ctypes +import multiprocessing import subprocess import sys import tempfile @@ -22,10 +23,31 @@ libc = ctypes.CDLL(None) +# No multiprocessing.Barrier in Python 2, so we build our own +class Barrier: + def __init__(self, expected_processes): + self.__expected_processes = expected_processes + self.__waiting_processes = multiprocessing.Value("i", 0) + self.__barrier = multiprocessing.Semaphore(0) + + def wait(self): + with self.__waiting_processes.get_lock(): + self.__waiting_processes.value += 1 + if self.__waiting_processes.value == self.__expected_processes: + self.__barrier.release() + self.__barrier.acquire() + self.__barrier.release() + + +# multiprocessing.Semaphores are not picklable, so we pickle their ids and retrieve them from this dict. +# We don't deallocate them so we do leak some resources, but we accept that in unit tests. +barriers = {} + + class TestAction(Action): def __init__( self, label, - exception, return_value, delay, + exception, return_value, barrier_id, events_file, end_event, print_on_stdout, print_on_stderr, puts_on_stdout, echo_on_stdout, accept_failed_dependencies, @@ -33,7 +55,7 @@ def __init__( super(TestAction, self).__init__(label=label, accept_failed_dependencies=accept_failed_dependencies) self.__exception = exception self.__return_value = return_value - self.__delay = delay + self.__barrier_id = barrier_id self.__events_file = events_file self.__end_event = end_event self.__print_on_stdout = print_on_stdout @@ -49,11 +71,8 @@ def do_execute(self, dependency_statuses): assert self.accept_failed_dependencies or dependency_statuses[d].status == SUCCESSFUL with open(self.__events_file, "a") as f: f.write("{}\n".format(str(self.label).lower())) - if self.__delay: - delay = self.__delay - if os.environ.get("TRAVIS") == "true": - delay *= 10 - time.sleep(delay) + if self.__barrier_id: + barriers[self.__barrier_id].wait() if self.__end_event: with open(self.__events_file, "a") as f: f.write("{}\n".format(str(self.label).upper())) @@ -88,7 +107,7 @@ def tearDown(self): def _action( self, label, - exception=None, return_value=None, delay=None, + exception=None, return_value=None, barrier=None, end_event=False, print_on_stdout=None, print_on_stderr=None, puts_on_stdout=None, echo_on_stdout=None, accept_failed_dependencies=False, @@ -96,13 +115,19 @@ def _action( ): return TestAction( label, - exception, return_value, delay, + exception, return_value, barrier, self.__events_file, end_event, print_on_stdout, print_on_stderr, puts_on_stdout, echo_on_stdout, accept_failed_dependencies, *args, **kwds ) + def _barrier(self, n): + barrier = Barrier(n) + barrier_id = id(barrier) + barriers[barrier_id] = barrier + return barrier_id + def assertEventsEqual(self, groups): with open(self.__events_file) as f: events = [line.strip() for line in f.readlines()] diff --git a/ActionTree/tests/multi_threaded_execution.py b/ActionTree/tests/multi_threaded_execution.py index 4964e14..7a1c381 100644 --- a/ActionTree/tests/multi_threaded_execution.py +++ b/ActionTree/tests/multi_threaded_execution.py @@ -16,9 +16,10 @@ def test_many_dependencies(self): # b c d a = self._action("a") - b = self._action("b", delay=0.1, end_event=True) - c = self._action("c", delay=0.1, end_event=True) - d = self._action("d", delay=0.1, end_event=True) + barrier = self._barrier(3) + b = self._action("b", barrier=barrier, end_event=True) + c = self._action("c", barrier=barrier, end_event=True) + d = self._action("d", barrier=barrier, end_event=True) a.add_dependency(b) a.add_dependency(c) a.add_dependency(d) @@ -34,9 +35,10 @@ def test_many_dependencies_with_default_cpu_cores(self): # b c d a = self._action("a") - b = self._action("b", delay=0.1, end_event=True) - c = self._action("c", delay=0.1, end_event=True) - d = self._action("d", delay=0.1, end_event=True) + barrier = self._barrier(3) + b = self._action("b", barrier=barrier, end_event=True) + c = self._action("c", barrier=barrier, end_event=True) + d = self._action("d", barrier=barrier, end_event=True) a.add_dependency(b) a.add_dependency(c) a.add_dependency(d) @@ -82,8 +84,9 @@ def test_diamond_dependencies(self): # d a = self._action("a") - b = self._action("b", delay=0.1, end_event=True) - c = self._action("c", delay=0.1, end_event=True) + barrier = self._barrier(2) + b = self._action("b", barrier=barrier, end_event=True) + c = self._action("c", barrier=barrier, end_event=True) d = self._action("d", end_event=True) a.add_dependency(b) a.add_dependency(c) @@ -120,10 +123,12 @@ def test_two_deep_branches(self): # d e a = self._action("a") - b = self._action("b", delay=0.1, end_event=True) - c = self._action("c", delay=0.1, end_event=True) - d = self._action("d", delay=0.1, end_event=True) - e = self._action("e", delay=0.1, end_event=True) + barrier1 = self._barrier(2) + b = self._action("b", barrier=barrier1, end_event=True) + c = self._action("c", barrier=barrier1, end_event=True) + barrier2 = self._barrier(2) + d = self._action("d", barrier=barrier2, end_event=True) + e = self._action("e", barrier=barrier2, end_event=True) a.add_dependency(b) a.add_dependency(c) b.add_dependency(d)