From 119d41580461dbed25ca6d3a88b06c7cb765febc Mon Sep 17 00:00:00 2001 From: Andrey Zakharevich Date: Sun, 15 Sep 2019 14:26:33 +0300 Subject: [PATCH 1/5] Add a test for deadlock after sequence worker timeout --- tests/keras/utils/data_utils_test.py | 43 ++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/tests/keras/utils/data_utils_test.py b/tests/keras/utils/data_utils_test.py index ff87efe76575..40a4d5f6cbd2 100644 --- a/tests/keras/utils/data_utils_test.py +++ b/tests/keras/utils/data_utils_test.py @@ -5,6 +5,7 @@ import sys import tarfile import threading +import signal import shutil import zipfile from itertools import cycle @@ -211,6 +212,25 @@ def on_epoch_end(self): pass +class SlowSequence(Sequence): + def __init__(self, shape, value=1.0): + self.shape = shape + self.inner = value + self.wait = True + + def __getitem__(self, item): + if self.wait: + self.wait = False + time.sleep(40) + return np.ones(self.shape, dtype=np.uint32) * item * self.inner + + def __len__(self): + return 10 + + def on_epoch_end(self): + pass + + @threadsafe_generator def create_generator_from_sequence_threads(ds): for i in cycle(range(len(ds))): @@ -335,6 +355,29 @@ def test_ordered_enqueuer_fail_threads(): next(gen_output) +def test_ordered_enqueuer_timeout_threads(): + enqueuer = OrderedEnqueuer(SlowSequence([3, 10, 10, 3]), + use_multiprocessing=False) + + def handler(signum, frame): + raise TimeoutError('Sequence deadlocked') + + old = signal.signal(signal.SIGALRM, handler) + signal.setitimer(signal.ITIMER_REAL, 40) + + enqueuer.start(5, 10) + gen_output = enqueuer.get() + for epoch_num in range(2): + acc = [] + for i in range(10): + acc.append(next(gen_output)[0, 0, 0, 0]) + assert acc == list(range(10)), ('Order was not keep in GeneratorEnqueuer ' + 'with processes') + enqueuer.stop() + signal.setitimer(signal.ITIMER_REAL, 0) + signal.signal(signal.SIGALRM, old) + + @use_spawn def test_on_epoch_end_processes(): enqueuer = OrderedEnqueuer(DummySequence([3, 10, 10, 3]), From 3bdb1f80bfd2ce84f801fb83aee360deb5472efa Mon Sep 17 00:00:00 2001 From: Andrey Zakharevich Date: Sun, 15 Sep 2019 14:28:34 +0300 Subject: [PATCH 2/5] Call task_done even if the task timeouted --- keras/utils/data_utils.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/keras/utils/data_utils.py b/keras/utils/data_utils.py index 179e8861f24a..238daaaa7502 100644 --- a/keras/utils/data_utils.py +++ b/keras/utils/data_utils.py @@ -608,7 +608,6 @@ def get(self): try: future = self.queue.get(block=True) inputs = future.get(timeout=30) - self.queue.task_done() except mp.TimeoutError: idx = future.idx warnings.warn( @@ -616,6 +615,9 @@ def get(self): ' It could be because a worker has died.'.format(idx), UserWarning) inputs = self.sequence[idx] + finally: + self.queue.task_done() + if inputs is not None: yield inputs except Exception: From eefb18cf2cbffb13bc9c270441bac1acb3294aae Mon Sep 17 00:00:00 2001 From: Andrey Zakharevich Date: Sun, 15 Sep 2019 14:49:12 +0300 Subject: [PATCH 3/5] catch dead worker warning --- tests/keras/utils/data_utils_test.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/tests/keras/utils/data_utils_test.py b/tests/keras/utils/data_utils_test.py index 40a4d5f6cbd2..2704ce36bdf8 100644 --- a/tests/keras/utils/data_utils_test.py +++ b/tests/keras/utils/data_utils_test.py @@ -364,16 +364,18 @@ def handler(signum, frame): old = signal.signal(signal.SIGALRM, handler) signal.setitimer(signal.ITIMER_REAL, 40) - - enqueuer.start(5, 10) - gen_output = enqueuer.get() - for epoch_num in range(2): - acc = [] - for i in range(10): - acc.append(next(gen_output)[0, 0, 0, 0]) - assert acc == list(range(10)), ('Order was not keep in GeneratorEnqueuer ' - 'with processes') - enqueuer.stop() + with pytest.warns(UserWarning) as record: + enqueuer.start(5, 10) + gen_output = enqueuer.get() + for epoch_num in range(2): + acc = [] + for i in range(10): + acc.append(next(gen_output)[0, 0, 0, 0]) + assert acc == list(range(10)), 'Order was not keep in OrderedEnqueuer with threads' + enqueuer.stop() + assert len(record) == 1 + assert str(record[0].message) == 'The input 0 could not be retrieved. ' \ + 'It could be because a worker has died.' signal.setitimer(signal.ITIMER_REAL, 0) signal.signal(signal.SIGALRM, old) From 5936b386f749d27d33eaba341dce2848eda9a826 Mon Sep 17 00:00:00 2001 From: Andrey Zakharevich Date: Sun, 15 Sep 2019 15:14:23 +0300 Subject: [PATCH 4/5] fix line length --- tests/keras/utils/data_utils_test.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/keras/utils/data_utils_test.py b/tests/keras/utils/data_utils_test.py index 2704ce36bdf8..8ef9461859e8 100644 --- a/tests/keras/utils/data_utils_test.py +++ b/tests/keras/utils/data_utils_test.py @@ -371,7 +371,8 @@ def handler(signum, frame): acc = [] for i in range(10): acc.append(next(gen_output)[0, 0, 0, 0]) - assert acc == list(range(10)), 'Order was not keep in OrderedEnqueuer with threads' + assert acc == list(range(10)), 'Order was not keep in ' \ + 'OrderedEnqueuer with threads' enqueuer.stop() assert len(record) == 1 assert str(record[0].message) == 'The input 0 could not be retrieved. ' \ From fa27220fbd77f565c5c08b9848e9ae53faa106e8 Mon Sep 17 00:00:00 2001 From: Andrey Zakharevich Date: Sun, 15 Sep 2019 15:47:38 +0300 Subject: [PATCH 5/5] Increase deadlock detection timeout to prevent flakiness --- tests/keras/utils/data_utils_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/keras/utils/data_utils_test.py b/tests/keras/utils/data_utils_test.py index 8ef9461859e8..5dea7fe644b9 100644 --- a/tests/keras/utils/data_utils_test.py +++ b/tests/keras/utils/data_utils_test.py @@ -363,7 +363,7 @@ def handler(signum, frame): raise TimeoutError('Sequence deadlocked') old = signal.signal(signal.SIGALRM, handler) - signal.setitimer(signal.ITIMER_REAL, 40) + signal.setitimer(signal.ITIMER_REAL, 60) with pytest.warns(UserWarning) as record: enqueuer.start(5, 10) gen_output = enqueuer.get()