Skip to content

Commit

Permalink
Don't use time.sleep in multi-threaded test code (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
jacquev6 committed Feb 9, 2018
1 parent 5e07c6a commit 18b67ab
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 24 deletions.
4 changes: 1 addition & 3 deletions ActionTree/__init__.py
Expand Up @@ -929,9 +929,7 @@ def _read_to_events(self, action_id, pipe_r):
self.events.put((PRINTED, action_id, (datetime.datetime.now(), data)))

def _check_picklability(self, stuff):
# This is a way to fail fast if we see a non-picklable object
# because ProcessPoolExecutor freezes forever if we try to transfer
# a non-picklable object through its queues
# Fail fast: don't put a non-picklable object in a multiprocessing.Queue
pickle.loads(pickle.dumps(stuff))

def _handle_next_event(self):
Expand Down
43 changes: 34 additions & 9 deletions ActionTree/tests/__init__.py
Expand Up @@ -5,6 +5,7 @@
from __future__ import division, absolute_import, print_function

import ctypes
import multiprocessing
import subprocess
import sys
import tempfile
Expand All @@ -22,18 +23,39 @@
libc = ctypes.CDLL(None)


# No multiprocessing.Barrier in Python 2, so we build our own
class Barrier:
def __init__(self, expected_processes):
self.__expected_processes = expected_processes
self.__waiting_processes = multiprocessing.Value("i", 0)
self.__barrier = multiprocessing.Semaphore(0)

def wait(self):
with self.__waiting_processes.get_lock():
self.__waiting_processes.value += 1
if self.__waiting_processes.value == self.__expected_processes:
self.__barrier.release()
self.__barrier.acquire()
self.__barrier.release()


# multiprocessing.Semaphores are not picklable, so we pickle their ids and retrieve them from this dict.
# We don't deallocate them so we do leak some resources, but we accept that in unit tests.
barriers = {}


class TestAction(Action):
def __init__(
self, label,
exception, return_value, delay,
exception, return_value, barrier_id,
events_file, end_event,
print_on_stdout, print_on_stderr, puts_on_stdout, echo_on_stdout,
accept_failed_dependencies,
):
super(TestAction, self).__init__(label=label, accept_failed_dependencies=accept_failed_dependencies)
self.__exception = exception
self.__return_value = return_value
self.__delay = delay
self.__barrier_id = barrier_id
self.__events_file = events_file
self.__end_event = end_event
self.__print_on_stdout = print_on_stdout
Expand All @@ -49,11 +71,8 @@ def do_execute(self, dependency_statuses):
assert self.accept_failed_dependencies or dependency_statuses[d].status == SUCCESSFUL
with open(self.__events_file, "a") as f:
f.write("{}\n".format(str(self.label).lower()))
if self.__delay:
delay = self.__delay
if os.environ.get("TRAVIS") == "true":
delay *= 10
time.sleep(delay)
if self.__barrier_id:
barriers[self.__barrier_id].wait()
if self.__end_event:
with open(self.__events_file, "a") as f:
f.write("{}\n".format(str(self.label).upper()))
Expand Down Expand Up @@ -88,21 +107,27 @@ def tearDown(self):

def _action(
self, label,
exception=None, return_value=None, delay=None,
exception=None, return_value=None, barrier=None,
end_event=False,
print_on_stdout=None, print_on_stderr=None, puts_on_stdout=None, echo_on_stdout=None,
accept_failed_dependencies=False,
*args, **kwds
):
return TestAction(
label,
exception, return_value, delay,
exception, return_value, barrier,
self.__events_file, end_event,
print_on_stdout, print_on_stderr, puts_on_stdout, echo_on_stdout,
accept_failed_dependencies,
*args, **kwds
)

def _barrier(self, n):
barrier = Barrier(n)
barrier_id = id(barrier)
barriers[barrier_id] = barrier
return barrier_id

def assertEventsEqual(self, groups):
with open(self.__events_file) as f:
events = [line.strip() for line in f.readlines()]
Expand Down
29 changes: 17 additions & 12 deletions ActionTree/tests/multi_threaded_execution.py
Expand Up @@ -16,9 +16,10 @@ def test_many_dependencies(self):
# b c d

a = self._action("a")
b = self._action("b", delay=0.1, end_event=True)
c = self._action("c", delay=0.1, end_event=True)
d = self._action("d", delay=0.1, end_event=True)
barrier = self._barrier(3)
b = self._action("b", barrier=barrier, end_event=True)
c = self._action("c", barrier=barrier, end_event=True)
d = self._action("d", barrier=barrier, end_event=True)
a.add_dependency(b)
a.add_dependency(c)
a.add_dependency(d)
Expand All @@ -34,9 +35,10 @@ def test_many_dependencies_with_default_cpu_cores(self):
# b c d

a = self._action("a")
b = self._action("b", delay=0.1, end_event=True)
c = self._action("c", delay=0.1, end_event=True)
d = self._action("d", delay=0.1, end_event=True)
barrier = self._barrier(3)
b = self._action("b", barrier=barrier, end_event=True)
c = self._action("c", barrier=barrier, end_event=True)
d = self._action("d", barrier=barrier, end_event=True)
a.add_dependency(b)
a.add_dependency(c)
a.add_dependency(d)
Expand Down Expand Up @@ -82,8 +84,9 @@ def test_diamond_dependencies(self):
# d

a = self._action("a")
b = self._action("b", delay=0.1, end_event=True)
c = self._action("c", delay=0.1, end_event=True)
barrier = self._barrier(2)
b = self._action("b", barrier=barrier, end_event=True)
c = self._action("c", barrier=barrier, end_event=True)
d = self._action("d", end_event=True)
a.add_dependency(b)
a.add_dependency(c)
Expand Down Expand Up @@ -120,10 +123,12 @@ def test_two_deep_branches(self):
# d e

a = self._action("a")
b = self._action("b", delay=0.1, end_event=True)
c = self._action("c", delay=0.1, end_event=True)
d = self._action("d", delay=0.1, end_event=True)
e = self._action("e", delay=0.1, end_event=True)
barrier1 = self._barrier(2)
b = self._action("b", barrier=barrier1, end_event=True)
c = self._action("c", barrier=barrier1, end_event=True)
barrier2 = self._barrier(2)
d = self._action("d", barrier=barrier2, end_event=True)
e = self._action("e", barrier=barrier2, end_event=True)
a.add_dependency(b)
a.add_dependency(c)
b.add_dependency(d)
Expand Down

0 comments on commit 18b67ab

Please sign in to comment.