From 543043fc895674ebeec947fd051b15078f5eb48c Mon Sep 17 00:00:00 2001 From: Kirill Kouzoubov Date: Tue, 30 May 2017 16:35:34 +1000 Subject: [PATCH] More executor tests somewhat higher coverage for executor classes. --- tests/test_concurrent_executor.py | 52 +++++++++++++++++++++++++++---- 1 file changed, 46 insertions(+), 6 deletions(-) diff --git a/tests/test_concurrent_executor.py b/tests/test_concurrent_executor.py index 6f555e395d..19ea0a13e6 100644 --- a/tests/test_concurrent_executor.py +++ b/tests/test_concurrent_executor.py @@ -3,28 +3,40 @@ """ from datacube.executor import get_executor +from time import sleep import pytest DATA = [1, 2, 3, 4] def _echo(x, please_fail=False): - if please_fail: + if please_fail or x == 'please fail': raise IOError('Fake I/O error, cause you asked') return x -def test_concurrent_executor(): - runner = get_executor(None, 2) - +def run_tests_for_runner(runner, sleep_amount=0.5): + # get_ready: mostly pending futures = runner.map(_echo, DATA) assert len(futures) == len(DATA) - completed, failed, pending = runner.get_ready(futures) - assert len(failed) == 0 assert len(completed) + len(pending) == len(DATA) + # get_ready: processed + failure + data = ['please fail'] + DATA + futures = runner.map(_echo, data) + assert len(futures) == len(data) + sleep(sleep_amount) # give it "enough" time to finish + completed, failed, pending = runner.get_ready(futures) + if sleep_amount: + assert len(failed) == 1 + else: + assert len(failed) in [0, 1] + + assert len(completed) + len(pending) + len(failed) == len(data) + + # test results futures = runner.map(_echo, DATA) results = runner.results(futures) @@ -37,3 +49,31 @@ def test_concurrent_executor(): for ff in runner.as_completed([future]): with pytest.raises(IOError): runner.result(ff) + + # Next completed with data + future = runner.submit(_echo, 'tt') + futures = [future] + result, futures = runner.next_completed(futures, 'default') + assert len(futures) == 0 + print(type(result), result) + + # Next completed with empty list + result, futures = runner.next_completed([], 'default') + assert result == 'default' + assert len(futures) == 0 + + runner.release(future) + + +def test_concurrent_executor(): + runner = get_executor(None, 2) + assert str(runner).find('Multi') >= 0 + + run_tests_for_runner(runner, 0.3) + + +def test_fallback_executor(): + runner = get_executor(None, None) + assert str(runner).find('Serial') >= 0 + + run_tests_for_runner(runner, 0)