From d8f4003613270afc3bb56fd702bde85c799b8f54 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Thu, 28 Sep 2023 23:18:54 +0200 Subject: [PATCH] gh-109974: Enhance threading lock_tests * Replace sleeping loops with support.sleeping_retry() which raises an exception after a timeout. * test_set_and_clear(): increase event wait() timeout. Remove sleep after Bunch.wait_for_started(). * BarrierTests.test_repr(): remove sleep after wait_for_started(). Wait until the 2 threads are waiting for the barrier. Use long timeout for Barrier.wait() timeout. * test_thread_leak() no longer needs to count len(threading.enumerate()): Bunch uses threading_helper.wait_threads_exit() internally which does it in wait_for_finished(). * Add BaseLockTests.wait_phase() which implements a timeout. test_reacquire() and test_recursion_count() use wait_phase(). --- Lib/test/lock_tests.py | 240 ++++++++++++++++++++++++++--------------- 1 file changed, 155 insertions(+), 85 deletions(-) diff --git a/Lib/test/lock_tests.py b/Lib/test/lock_tests.py index e53e24b18f2760..325b17a9b2737f 100644 --- a/Lib/test/lock_tests.py +++ b/Lib/test/lock_tests.py @@ -21,19 +21,26 @@ def _wait(): # A crude wait/yield function not relying on synchronization primitives. - time.sleep(0.01) + time.sleep(0.010) + + +def wait_threads_blocked(nthread): + # Arbitrary sleep to wait until N threads are blocked, + # like waiting for a lock. + time.sleep(0.010 * nthread) + class Bunch(object): """ A bunch of threads. """ - def __init__(self, f, n, wait_before_exit=False): + def __init__(self, func, n, wait_before_exit=False): """ - Construct a bunch of `n` threads running the same function `f`. + Construct a bunch of `n` threads running the same function `func`. If `wait_before_exit` is True, the threads won't terminate until do_finish() is called. """ - self.f = f + self.func = func self.n = n self.started = [] self.finished = [] @@ -45,11 +52,12 @@ def task(): tid = threading.get_ident() self.started.append(tid) try: - f() + func() finally: self.finished.append(tid) - while not self._can_exit: - _wait() + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if self._can_exit: + break try: for i in range(n): @@ -59,12 +67,15 @@ def task(): raise def wait_for_started(self): - while len(self.started) < self.n: - _wait() + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(self.started) >= self.n: + break def wait_for_finished(self): - while len(self.finished) < self.n: - _wait() + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(self.finished) >= self.n: + break + # Wait for threads exit self.wait_thread.__exit__(None, None, None) @@ -94,6 +105,12 @@ class BaseLockTests(BaseTestCase): Tests for both recursive and non-recursive locks. """ + def wait_phase(self, phase, expected): + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(phase) >= expected: + break + self.assertEqual(len(phase), expected) + def test_constructor(self): lock = self.locktype() del lock @@ -143,10 +160,13 @@ def f(): lock.acquire() lock.release() + # Threads block on lock.acquire() b = Bunch(f, N) b.wait_for_started() - _wait() + wait_threads_blocked(N) self.assertEqual(len(b.finished), 0) + + # Threads unblocked lock.release() b.wait_for_finished() self.assertEqual(len(b.finished), N) @@ -174,17 +194,10 @@ def test_thread_leak(self): def f(): lock.acquire() lock.release() - n = len(threading.enumerate()) + # We run many threads in the hope that existing threads ids won't # be recycled. Bunch(f, 15).wait_for_finished() - if len(threading.enumerate()) != n: - # There is a small window during which a Thread instance's - # target function has finished running, but the Thread is still - # alive and registered. Avoid spurious failures by waiting a - # bit more (seen on a buildbot). - time.sleep(0.4) - self.assertEqual(n, len(threading.enumerate())) def test_timeout(self): lock = self.locktype() @@ -243,14 +256,10 @@ def f(): with threading_helper.wait_threads_exit(): start_new_thread(f, ()) - while len(phase) == 0: - _wait() - _wait() - self.assertEqual(len(phase), 1) + self.wait_phase(phase, 1) + lock.release() - while len(phase) == 1: - _wait() - self.assertEqual(len(phase), 2) + self.wait_phase(phase, 2) def test_different_thread(self): # Lock can be released from a different thread. @@ -349,21 +358,18 @@ def test_recursion_count(self): def f(): lock.acquire() phase.append(None) - while len(phase) == 1: - _wait() + self.wait_phase(phase, 2) lock.release() phase.append(None) with threading_helper.wait_threads_exit(): start_new_thread(f, ()) - while len(phase) == 0: - _wait() - self.assertEqual(len(phase), 1) + self.wait_phase(phase, 1) + self.assertEqual(0, lock._recursion_count()) phase.append(None) - while len(phase) == 2: - _wait() - self.assertEqual(len(phase), 3) + self.wait_phase(phase, 3) + self.assertEqual(0, lock._recursion_count()) def test_different_thread(self): @@ -421,10 +427,14 @@ def _check_notify(self, evt): def f(): results1.append(evt.wait()) results2.append(evt.wait()) + + # Threads blocked on first evt.wait() b = Bunch(f, N) b.wait_for_started() - _wait() + wait_threads_blocked(N) self.assertEqual(len(results1), 0) + + # Threads unblocked evt.set() b.wait_for_finished() self.assertEqual(results1, [True] * N) @@ -464,17 +474,15 @@ def f(): self.assertTrue(r) def test_set_and_clear(self): - # Issue #13502: check that wait() returns true even when the event is + # gh-57711: check that wait() returns true even when the event is # cleared before the waiting thread is woken up. evt = self.eventtype() results = [] - timeout = 0.250 N = 5 def f(): - results.append(evt.wait(timeout * 4)) + results.append(evt.wait(support.LONG_TIMEOUT)) b = Bunch(f, N) b.wait_for_started() - time.sleep(timeout) evt.set() evt.clear() b.wait_for_finished() @@ -550,56 +558,75 @@ def f(): cond.acquire() ready.append(phase_num) result = cond.wait() + cond.release() results1.append((result, phase_num)) + cond.acquire() ready.append(phase_num) + result = cond.wait() cond.release() results2.append((result, phase_num)) b = Bunch(f, N) + b.wait_for_started() # first wait, to ensure all workers settle into cond.wait() before # we continue. See issues #8799 and #30727. - while len(ready) < 5: - _wait() + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(ready) >= N: + break + ready.clear() self.assertEqual(results1, []) # Notify 3 threads at first cond.acquire() cond.notify(3) _wait() + phase_num = 1 cond.release() - while len(results1) < 3: - _wait() + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(results1) >= 3: + break + self.assertEqual(results1, [(True, 1)] * 3) self.assertEqual(results2, []) + # make sure all awaken workers settle into cond.wait() - while len(ready) < 3: - _wait() + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(ready) >= 3: + break + # Notify 5 threads: they might be in their first or second wait cond.acquire() cond.notify(5) _wait() + phase_num = 2 cond.release() - while len(results1) + len(results2) < 8: - _wait() + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(results1) + len(results2) >= 8: + break self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2) self.assertEqual(results2, [(True, 2)] * 3) - # make sure all workers settle into cond.wait() - while len(ready) < 5: - _wait() + + # Make sure all workers settle into cond.wait() + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(ready) >= N: + break + # Notify all threads: they are all in their second wait cond.acquire() cond.notify_all() _wait() + phase_num = 3 cond.release() - while len(results2) < 5: - _wait() - self.assertEqual(results1, [(True, 1)] * 3 + [(True,2)] * 2) + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(results2) >= N: + break + self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2) self.assertEqual(results2, [(True, 2)] * 3 + [(True, 3)] * 2) b.wait_for_finished() @@ -611,19 +638,22 @@ def test_notify(self): def test_timeout(self): cond = self.condtype() + timeout = 0.5 results = [] - N = 5 def f(): cond.acquire() t1 = time.monotonic() - result = cond.wait(0.5) + result = cond.wait(timeout) t2 = time.monotonic() cond.release() results.append((t2 - t1, result)) + + N = 5 Bunch(f, N).wait_for_finished() self.assertEqual(len(results), N) + for dt, result in results: - self.assertTimeout(dt, 0.5) + self.assertTimeout(dt, timeout) # Note that conceptually (that"s the condition variable protocol) # a wait() may succeed even if no one notifies us and before any # timeout occurs. Spurious wakeups can occur. @@ -636,7 +666,7 @@ def test_waitfor(self): state = 0 def f(): with cond: - result = cond.wait_for(lambda : state==4) + result = cond.wait_for(lambda: state == 4) self.assertTrue(result) self.assertEqual(state, 4) b = Bunch(f, 1) @@ -660,14 +690,16 @@ def f(): self.assertFalse(result) self.assertTimeout(dt, 0.1) success.append(None) + b = Bunch(f, 1) b.wait_for_started() # Only increment 3 times, so state == 4 is never reached. for i in range(3): - time.sleep(0.01) + _wait() with cond: state += 1 cond.notify() + b.wait_for_finished() self.assertEqual(len(success), 1) @@ -709,22 +741,32 @@ def f(): results1.append(phase_num) sem_results.append(sem.acquire()) results2.append(phase_num) - b = Bunch(f, 10) + + # Phase 0 + b = Bunch(f, N) b.wait_for_started() - while len(results1) + len(results2) < 6: - _wait() + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(results1) + len(results2) >= 6: + break + + # Phase 1 self.assertEqual(results1 + results2, [0] * 6) phase_num = 1 for i in range(7): sem.release() - while len(results1) + len(results2) < 13: - _wait() + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(results1) + len(results2) >= 13: + break + + # Phase 2 self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7) phase_num = 2 for i in range(6): sem.release() - while len(results1) + len(results2) < 19: - _wait() + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(results1) + len(results2) >= 19: + break + self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6) # The semaphore is still locked self.assertFalse(sem.acquire(False)) @@ -746,21 +788,30 @@ def f(): results2.append(phase_num) b = Bunch(f, 10) b.wait_for_started() - while len(results1) + len(results2) < 6: - _wait() + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(results1) + len(results2) >= 6: + break + self.assertEqual(results1 + results2, [0] * 6) phase_num = 1 sem.release(7) - while len(results1) + len(results2) < 13: - _wait() + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(results1) + len(results2) >= 13: + break + self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7) + + # Phase 2 phase_num = 2 sem.release(6) - while len(results1) + len(results2) < 19: - _wait() + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(results1) + len(results2) >= 19: + break + self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6) # The semaphore is still locked self.assertFalse(sem.acquire(False)) + # Final release, to let the last thread finish sem.release() b.wait_for_finished() @@ -806,10 +857,14 @@ def test_default_value(self): def f(): sem.acquire() sem.release() + + # Thread blocked on sem.acquire() b = Bunch(f, 1) b.wait_for_started() - _wait() + wait_threads_blocked(1) self.assertFalse(b.finished) + + # Thread unblocked sem.release() b.wait_for_finished() @@ -882,6 +937,7 @@ class BarrierTests(BaseTestCase): def setUp(self): self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout) + def tearDown(self): self.barrier.abort() @@ -979,8 +1035,9 @@ def f(): i = self.barrier.wait() if i == self.N//2: # Wait until the other threads are all in the barrier. - while self.barrier.n_waiting < self.N-1: - time.sleep(0.001) + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if self.barrier.n_waiting >= self.N-1: + break self.barrier.reset() else: try: @@ -1068,16 +1125,29 @@ def test_single_thread(self): b.wait() def test_repr(self): - b = self.barriertype(3) - self.assertRegex(repr(b), r"<\w+\.Barrier at .*: waiters=0/3>") + barrier = self.barriertype(3) + timeout = support.LONG_TIMEOUT + self.assertRegex(repr(barrier), r"<\w+\.Barrier at .*: waiters=0/3>") def f(): - b.wait(3) - bunch = Bunch(f, 2) + barrier.wait(timeout) + + # Threads blocked on barrier.wait() + N = 2 + bunch = Bunch(f, N) bunch.wait_for_started() - time.sleep(0.2) - self.assertRegex(repr(b), r"<\w+\.Barrier at .*: waiters=2/3>") - b.wait(3) + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if barrier.n_waiting >= N: + break + self.assertRegex(repr(barrier), + r"<\w+\.Barrier at .*: waiters=2/3>") + + # Threads unblocked + barrier.wait(timeout) bunch.wait_for_finished() - self.assertRegex(repr(b), r"<\w+\.Barrier at .*: waiters=0/3>") - b.abort() - self.assertRegex(repr(b), r"<\w+\.Barrier at .*: broken>") + self.assertRegex(repr(barrier), + r"<\w+\.Barrier at .*: waiters=0/3>") + + # Abort the barrier + barrier.abort() + self.assertRegex(repr(barrier), + r"<\w+\.Barrier at .*: broken>")