Skip to content

Commit

Permalink
Make Multiprocessing Executor test a bit more robust to timing failures
Browse files Browse the repository at this point in the history
  • Loading branch information
omad committed Dec 12, 2017
1 parent 4eb85f1 commit 8412450
Showing 1 changed file with 32 additions and 26 deletions.
58 changes: 32 additions & 26 deletions tests/test_concurrent_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,76 +7,82 @@
import pytest

DATA = [1, 2, 3, 4]
RETRIES = 5


def _echo(x, please_fail=False):
if please_fail or x == 'please fail':
raise IOError('Fake I/O error, cause you asked')
raise IOError('Fake I/O error, because you asked')
return x


def run_tests_for_runner(runner, sleep_amount=0.5):
def run_executor_tests(executor, sleep_time=1):
# get_ready: mostly pending
futures = runner.map(_echo, DATA)
futures = executor.map(_echo, DATA)
assert len(futures) == len(DATA)
completed, failed, pending = runner.get_ready(futures)
completed, failed, pending = executor.get_ready(futures)
assert len(failed) == 0
assert len(completed) + len(pending) == len(DATA)

# get_ready: processed + failure
data = ['please fail'] + DATA
futures = runner.map(_echo, data)
futures = executor.map(_echo, data)
assert len(futures) == len(data)
sleep(sleep_amount) # give it "enough" time to finish
completed, failed, pending = runner.get_ready(futures)
if sleep_amount:

for _ in range(RETRIES):
sleep(sleep_time) # give it "enough" time to finish
completed, failed, pending = executor.get_ready(futures)
if len(pending) == 0:
break

if sleep_time:
assert len(failed) == 1
else:
assert len(failed) in [0, 1]

assert len(completed) + len(pending) + len(failed) == len(data)

# test results
futures = runner.map(_echo, DATA)
results = runner.results(futures)
futures = executor.map(_echo, DATA)
results = executor.results(futures)

assert len(results) == len(DATA)
assert set(results) == set(DATA)

# Test failure pass-through
future = runner.submit(_echo, "", please_fail=True)
future = executor.submit(_echo, "", please_fail=True)

for ff in runner.as_completed([future]):
for ff in executor.as_completed([future]):
with pytest.raises(IOError):
runner.result(ff)
executor.result(ff)

# Next completed with data
future = runner.submit(_echo, 'tt')
future = executor.submit(_echo, 'tt')
futures = [future]
result, futures = runner.next_completed(futures, 'default')
result, futures = executor.next_completed(futures, 'default')
assert len(futures) == 0
print(type(result), result)

# Next completed with empty list
result, futures = runner.next_completed([], 'default')
result, futures = executor.next_completed([], 'default')
assert result == 'default'
assert len(futures) == 0

runner.release(future)
executor.release(future)


def test_concurrent_executor():
runner = get_executor(None, 2)
assert str(runner).find('Multi') >= 0
run_tests_for_runner(runner, 0.3)
executor = get_executor(None, 2)
assert 'Multiproc' in str(executor)
run_executor_tests(executor)

runner = get_executor(None, 2, use_cloud_pickle=False)
assert str(runner).find('Multi') >= 0
run_tests_for_runner(runner, 0.3)
executor = get_executor(None, 2, use_cloud_pickle=False)
assert 'Multiproc' in str(executor)
run_executor_tests(executor)


def test_fallback_executor():
runner = get_executor(None, None)
assert str(runner).find('Serial') >= 0
executor = get_executor(None, None)
assert 'Serial' in str(executor)

run_tests_for_runner(runner, 0)
run_executor_tests(executor, sleep_time=0)

0 comments on commit 8412450

Please sign in to comment.