diff --git a/tests/test_concurrent_executor.py b/tests/test_concurrent_executor.py index 667e6c3eaf..138bd84925 100644 --- a/tests/test_concurrent_executor.py +++ b/tests/test_concurrent_executor.py @@ -7,29 +7,35 @@ import pytest DATA = [1, 2, 3, 4] +RETRIES = 5 def _echo(x, please_fail=False): if please_fail or x == 'please fail': - raise IOError('Fake I/O error, cause you asked') + raise IOError('Fake I/O error, because you asked') return x -def run_tests_for_runner(runner, sleep_amount=0.5): +def run_executor_tests(executor, sleep_time=1): # get_ready: mostly pending - futures = runner.map(_echo, DATA) + futures = executor.map(_echo, DATA) assert len(futures) == len(DATA) - completed, failed, pending = runner.get_ready(futures) + completed, failed, pending = executor.get_ready(futures) assert len(failed) == 0 assert len(completed) + len(pending) == len(DATA) # get_ready: processed + failure data = ['please fail'] + DATA - futures = runner.map(_echo, data) + futures = executor.map(_echo, data) assert len(futures) == len(data) - sleep(sleep_amount) # give it "enough" time to finish - completed, failed, pending = runner.get_ready(futures) - if sleep_amount: + + for _ in range(RETRIES): + sleep(sleep_time) # give it "enough" time to finish + completed, failed, pending = executor.get_ready(futures) + if len(pending) == 0: + break + + if sleep_time: assert len(failed) == 1 else: assert len(failed) in [0, 1] @@ -37,46 +43,46 @@ def run_tests_for_runner(runner, sleep_amount=0.5): assert len(completed) + len(pending) + len(failed) == len(data) # test results - futures = runner.map(_echo, DATA) - results = runner.results(futures) + futures = executor.map(_echo, DATA) + results = executor.results(futures) assert len(results) == len(DATA) assert set(results) == set(DATA) # Test failure pass-through - future = runner.submit(_echo, "", please_fail=True) + future = executor.submit(_echo, "", please_fail=True) - for ff in runner.as_completed([future]): + for ff in executor.as_completed([future]): with pytest.raises(IOError): - runner.result(ff) + executor.result(ff) # Next completed with data - future = runner.submit(_echo, 'tt') + future = executor.submit(_echo, 'tt') futures = [future] - result, futures = runner.next_completed(futures, 'default') + result, futures = executor.next_completed(futures, 'default') assert len(futures) == 0 print(type(result), result) # Next completed with empty list - result, futures = runner.next_completed([], 'default') + result, futures = executor.next_completed([], 'default') assert result == 'default' assert len(futures) == 0 - runner.release(future) + executor.release(future) def test_concurrent_executor(): - runner = get_executor(None, 2) - assert str(runner).find('Multi') >= 0 - run_tests_for_runner(runner, 0.3) + executor = get_executor(None, 2) + assert 'Multiproc' in str(executor) + run_executor_tests(executor) - runner = get_executor(None, 2, use_cloud_pickle=False) - assert str(runner).find('Multi') >= 0 - run_tests_for_runner(runner, 0.3) + executor = get_executor(None, 2, use_cloud_pickle=False) + assert 'Multiproc' in str(executor) + run_executor_tests(executor) def test_fallback_executor(): - runner = get_executor(None, None) - assert str(runner).find('Serial') >= 0 + executor = get_executor(None, None) + assert 'Serial' in str(executor) - run_tests_for_runner(runner, 0) + run_executor_tests(executor, sleep_time=0)