Skip to content

Commit

Permalink
Add test.support.busy_retry() (#93770)
Browse files Browse the repository at this point in the history
Add busy_retry() and sleeping_retry() functions to test.support.
  • Loading branch information
vstinner committed Jun 15, 2022
1 parent 4e9fa71 commit 7e9eaad
Show file tree
Hide file tree
Showing 12 changed files with 186 additions and 99 deletions.
45 changes: 45 additions & 0 deletions Doc/library/test.rst
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,51 @@ The :mod:`test.support` module defines the following constants:

The :mod:`test.support` module defines the following functions:

.. function:: busy_retry(timeout, err_msg=None, /, *, error=True)

Run the loop body until ``break`` stops the loop.

After *timeout* seconds, raise an :exc:`AssertionError` if *error* is true,
or just stop the loop if *error* is false.

Example::

for _ in support.busy_retry(support.SHORT_TIMEOUT):
if check():
break

Example of error=False usage::

for _ in support.busy_retry(support.SHORT_TIMEOUT, error=False):
if check():
break
else:
raise RuntimeError('my custom error')

.. function:: sleeping_retry(timeout, err_msg=None, /, *, init_delay=0.010, max_delay=1.0, error=True)

Wait strategy that applies exponential backoff.

Run the loop body until ``break`` stops the loop. Sleep at each loop
iteration, but not at the first iteration. The sleep delay is doubled at
each iteration (up to *max_delay* seconds).

See :func:`busy_retry` documentation for the parameters usage.

Example raising an exception after SHORT_TIMEOUT seconds::

for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
if check():
break

Example of error=False usage::

for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
if check():
break
else:
raise RuntimeError('my custom error')

.. function:: is_resource_enabled(resource)

Return ``True`` if *resource* is enabled and available. The list of
Expand Down
60 changes: 25 additions & 35 deletions Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4313,18 +4313,13 @@ def test_shared_memory_cleaned_after_process_termination(self):
p.terminate()
p.wait()

deadline = time.monotonic() + support.LONG_TIMEOUT
t = 0.1
while time.monotonic() < deadline:
time.sleep(t)
t = min(t*2, 5)
err_msg = ("A SharedMemory segment was leaked after "
"a process was abruptly terminated")
for _ in support.sleeping_retry(support.LONG_TIMEOUT, err_msg):
try:
smm = shared_memory.SharedMemory(name, create=False)
except FileNotFoundError:
break
else:
raise AssertionError("A SharedMemory segment was leaked after"
" a process was abruptly terminated.")

if os.name == 'posix':
# Without this line it was raising warnings like:
Expand Down Expand Up @@ -5334,20 +5329,18 @@ def create_and_register_resource(rtype):
p.terminate()
p.wait()

deadline = time.monotonic() + support.LONG_TIMEOUT
while time.monotonic() < deadline:
time.sleep(.5)
err_msg = (f"A {rtype} resource was leaked after a process was "
f"abruptly terminated")
for _ in support.sleeping_retry(support.SHORT_TIMEOUT,
err_msg):
try:
_resource_unlink(name2, rtype)
except OSError as e:
# docs say it should be ENOENT, but OSX seems to give
# EINVAL
self.assertIn(e.errno, (errno.ENOENT, errno.EINVAL))
break
else:
raise AssertionError(
f"A {rtype} resource was leaked after a process was "
f"abruptly terminated.")

err = p.stderr.read().decode('utf-8')
p.stderr.close()
expected = ('resource_tracker: There appear to be 2 leaked {} '
Expand Down Expand Up @@ -5575,18 +5568,17 @@ def wait_proc_exit(self):
# but this can take a bit on slow machines, so wait a few seconds
# if there are other children too (see #17395).
join_process(self.proc)

start_time = time.monotonic()
t = 0.01
while len(multiprocessing.active_children()) > 1:
time.sleep(t)
t *= 2
dt = time.monotonic() - start_time
if dt >= 5.0:
test.support.environment_altered = True
support.print_warning(f"multiprocessing.Manager still has "
f"{multiprocessing.active_children()} "
f"active children after {dt} seconds")
for _ in support.sleeping_retry(5.0, error=False):
if len(multiprocessing.active_children()) <= 1:
break
else:
dt = time.monotonic() - start_time
support.environment_altered = True
support.print_warning(f"multiprocessing.Manager still has "
f"{multiprocessing.active_children()} "
f"active children after {dt:.1f} seconds")

def run_worker(self, worker, obj):
self.proc = multiprocessing.Process(target=worker, args=(obj, ))
Expand Down Expand Up @@ -5884,17 +5876,15 @@ def tearDownClass(cls):
# but this can take a bit on slow machines, so wait a few seconds
# if there are other children too (see #17395)
start_time = time.monotonic()
t = 0.01
while len(multiprocessing.active_children()) > 1:
time.sleep(t)
t *= 2
dt = time.monotonic() - start_time
if dt >= 5.0:
test.support.environment_altered = True
support.print_warning(f"multiprocessing.Manager still has "
f"{multiprocessing.active_children()} "
f"active children after {dt} seconds")
for _ in support.sleeping_retry(5.0, error=False):
if len(multiprocessing.active_children()) <= 1:
break
else:
dt = time.monotonic() - start_time
support.environment_altered = True
support.print_warning(f"multiprocessing.Manager still has "
f"{multiprocessing.active_children()} "
f"active children after {dt:.1f} seconds")

gc.collect() # do garbage collection
if cls.manager._number_of_objects() != 0:
Expand Down
6 changes: 2 additions & 4 deletions Lib/test/fork_wait.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,8 @@ def test_wait(self):
self.threads.append(thread)

# busy-loop to wait for threads
deadline = time.monotonic() + support.SHORT_TIMEOUT
while len(self.alive) < NUM_THREADS:
time.sleep(0.1)
if deadline < time.monotonic():
for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
if len(self.alive) >= NUM_THREADS:
break

a = sorted(self.alive.keys())
Expand Down
76 changes: 76 additions & 0 deletions Lib/test/support/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2250,3 +2250,79 @@ def atfork_func():
pass
atfork_func.reference = ref_cycle
os.register_at_fork(before=atfork_func)


def busy_retry(timeout, err_msg=None, /, *, error=True):
"""
Run the loop body until "break" stops the loop.
After *timeout* seconds, raise an AssertionError if *error* is true,
or just stop if *error is false.
Example:
for _ in support.busy_retry(support.SHORT_TIMEOUT):
if check():
break
Example of error=False usage:
for _ in support.busy_retry(support.SHORT_TIMEOUT, error=False):
if check():
break
else:
raise RuntimeError('my custom error')
"""
if timeout <= 0:
raise ValueError("timeout must be greater than zero")

start_time = time.monotonic()
deadline = start_time + timeout

while True:
yield

if time.monotonic() >= deadline:
break

if error:
dt = time.monotonic() - start_time
msg = f"timeout ({dt:.1f} seconds)"
if err_msg:
msg = f"{msg}: {err_msg}"
raise AssertionError(msg)


def sleeping_retry(timeout, err_msg=None, /,
*, init_delay=0.010, max_delay=1.0, error=True):
"""
Wait strategy that applies exponential backoff.
Run the loop body until "break" stops the loop. Sleep at each loop
iteration, but not at the first iteration. The sleep delay is doubled at
each iteration (up to *max_delay* seconds).
See busy_retry() documentation for the parameters usage.
Example raising an exception after SHORT_TIMEOUT seconds:
for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
if check():
break
Example of error=False usage:
for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
if check():
break
else:
raise RuntimeError('my custom error')
"""

delay = init_delay
for _ in busy_retry(timeout, err_msg, error=error):
yield

time.sleep(delay)
delay = min(delay * 2, max_delay)
11 changes: 5 additions & 6 deletions Lib/test/test__xxsubinterpreters.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,11 @@ def _wait_for_interp_to_run(interp, timeout=None):
# run subinterpreter eariler than the main thread in multiprocess.
if timeout is None:
timeout = support.SHORT_TIMEOUT
start_time = time.monotonic()
deadline = start_time + timeout
while not interpreters.is_running(interp):
if time.monotonic() > deadline:
raise RuntimeError('interp is not running')
time.sleep(0.010)
for _ in support.sleeping_retry(timeout, error=False):
if interpreters.is_running(interp):
break
else:
raise RuntimeError('interp is not running')


@contextlib.contextmanager
Expand Down
10 changes: 5 additions & 5 deletions Lib/test/test_concurrent_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,12 +256,12 @@ def test_initializer(self):
else:
with self.assertRaises(BrokenExecutor):
future.result()

# At some point, the executor should break
t1 = time.monotonic()
while not self.executor._broken:
if time.monotonic() - t1 > 5:
self.fail("executor not broken after 5 s.")
time.sleep(0.01)
for _ in support.sleeping_retry(5, "executor not broken"):
if self.executor._broken:
break

# ... and from this point submit() is guaranteed to fail
with self.assertRaises(BrokenExecutor):
self.executor.submit(get_init_status)
Expand Down
25 changes: 11 additions & 14 deletions Lib/test/test_multiprocessing_main_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import sys
import time
from multiprocessing import Pool, set_start_method
from test import support
# We use this __main__ defined function in the map call below in order to
# check that multiprocessing in correctly running the unguarded
Expand All @@ -59,13 +60,11 @@ def f(x):
results = []
with Pool(5) as pool:
pool.map_async(f, [1, 2, 3], callback=results.extend)
start_time = time.monotonic()
while not results:
time.sleep(0.05)
# up to 1 min to report the results
dt = time.monotonic() - start_time
if dt > 60.0:
raise RuntimeError("Timed out waiting for results (%.1f sec)" % dt)
# up to 1 min to report the results
for _ in support.sleeping_retry(60, "Timed out waiting for results"):
if results:
break
results.sort()
print(start_method, "->", results)
Expand All @@ -86,19 +85,17 @@ def f(x):
import sys
import time
from multiprocessing import Pool, set_start_method
from test import support
start_method = sys.argv[1]
set_start_method(start_method)
results = []
with Pool(5) as pool:
pool.map_async(int, [1, 4, 9], callback=results.extend)
start_time = time.monotonic()
while not results:
time.sleep(0.05)
# up to 1 min to report the results
dt = time.monotonic() - start_time
if dt > 60.0:
raise RuntimeError("Timed out waiting for results (%.1f sec)" % dt)
# up to 1 min to report the results
for _ in support.sleeping_retry(60, "Timed out waiting for results"):
if results:
break
results.sort()
print(start_method, "->", results)
Expand Down
25 changes: 13 additions & 12 deletions Lib/test/test_signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -812,13 +812,14 @@ def test_itimer_virtual(self):
signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
signal.setitimer(self.itimer, 0.3, 0.2)

start_time = time.monotonic()
while time.monotonic() - start_time < 60.0:
for _ in support.busy_retry(60.0, error=False):
# use up some virtual time by doing real work
_ = pow(12345, 67890, 10000019)
if signal.getitimer(self.itimer) == (0.0, 0.0):
break # sig_vtalrm handler stopped this itimer
else: # Issue 8424
# sig_vtalrm handler stopped this itimer
break
else:
# bpo-8424
self.skipTest("timeout: likely cause: machine too slow or load too "
"high")

Expand All @@ -832,13 +833,14 @@ def test_itimer_prof(self):
signal.signal(signal.SIGPROF, self.sig_prof)
signal.setitimer(self.itimer, 0.2, 0.2)

start_time = time.monotonic()
while time.monotonic() - start_time < 60.0:
for _ in support.busy_retry(60.0, error=False):
# do some work
_ = pow(12345, 67890, 10000019)
if signal.getitimer(self.itimer) == (0.0, 0.0):
break # sig_prof handler stopped this itimer
else: # Issue 8424
# sig_prof handler stopped this itimer
break
else:
# bpo-8424
self.skipTest("timeout: likely cause: machine too slow or load too "
"high")

Expand Down Expand Up @@ -1307,8 +1309,6 @@ def handler(signum, frame):
self.setsig(signal.SIGALRM, handler) # for ITIMER_REAL

expected_sigs = 0
deadline = time.monotonic() + support.SHORT_TIMEOUT

while expected_sigs < N:
# Hopefully the SIGALRM will be received somewhere during
# initial processing of SIGUSR1.
Expand All @@ -1317,8 +1317,9 @@ def handler(signum, frame):

expected_sigs += 2
# Wait for handlers to run to avoid signal coalescing
while len(sigs) < expected_sigs and time.monotonic() < deadline:
time.sleep(1e-5)
for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
if len(sigs) >= expected_sigs:
break

# All ITIMER_REAL signals should have been delivered to the
# Python handler
Expand Down

0 comments on commit 7e9eaad

Please sign in to comment.