diff --git a/Lib/test/lock_tests.py b/Lib/test/lock_tests.py index e53e24b18f27609..5e9b1e826fe91e1 100644 --- a/Lib/test/lock_tests.py +++ b/Lib/test/lock_tests.py @@ -19,22 +19,24 @@ "(no _at_fork_reinit method)") -def _wait(): - # A crude wait/yield function not relying on synchronization primitives. - time.sleep(0.01) +def wait_threads_blocked(nthread): + # Arbitrary sleep to wait until N threads are blocked, + # like waiting for a lock. + time.sleep(0.010 * nthread) + class Bunch(object): """ A bunch of threads. """ - def __init__(self, f, n, wait_before_exit=False): + def __init__(self, func, nthread, wait_before_exit=False): """ - Construct a bunch of `n` threads running the same function `f`. + Construct a bunch of `nthread` threads running the same function `func`. If `wait_before_exit` is True, the threads won't terminate until do_finish() is called. """ - self.f = f - self.n = n + self.func = func + self.nthread = nthread self.started = [] self.finished = [] self._can_exit = not wait_before_exit @@ -45,26 +47,30 @@ def task(): tid = threading.get_ident() self.started.append(tid) try: - f() + func() finally: self.finished.append(tid) - while not self._can_exit: - _wait() + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if self._can_exit: + break try: - for i in range(n): + for i in range(nthread): start_new_thread(task, ()) except: self._can_exit = True raise def wait_for_started(self): - while len(self.started) < self.n: - _wait() + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(self.started) >= self.nthread: + break def wait_for_finished(self): - while len(self.finished) < self.n: - _wait() + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(self.finished) >= self.nthread: + break + # Wait for threads exit self.wait_thread.__exit__(None, None, None) @@ -94,6 +100,12 @@ class BaseLockTests(BaseTestCase): Tests for both recursive and non-recursive locks. """ + def wait_phase(self, phase, expected): + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(phase) >= expected: + break + self.assertEqual(len(phase), expected) + def test_constructor(self): lock = self.locktype() del lock @@ -138,15 +150,18 @@ def f(): def test_acquire_contended(self): lock = self.locktype() lock.acquire() - N = 5 def f(): lock.acquire() lock.release() + # Threads block on lock.acquire() + N = 5 b = Bunch(f, N) b.wait_for_started() - _wait() + wait_threads_blocked(N) self.assertEqual(len(b.finished), 0) + + # Threads unblocked lock.release() b.wait_for_finished() self.assertEqual(len(b.finished), N) @@ -174,17 +189,10 @@ def test_thread_leak(self): def f(): lock.acquire() lock.release() - n = len(threading.enumerate()) + # We run many threads in the hope that existing threads ids won't # be recycled. Bunch(f, 15).wait_for_finished() - if len(threading.enumerate()) != n: - # There is a small window during which a Thread instance's - # target function has finished running, but the Thread is still - # alive and registered. Avoid spurious failures by waiting a - # bit more (seen on a buildbot). - time.sleep(0.4) - self.assertEqual(n, len(threading.enumerate())) def test_timeout(self): lock = self.locktype() @@ -242,15 +250,13 @@ def f(): phase.append(None) with threading_helper.wait_threads_exit(): + # Thread blocked on lock.acquire() start_new_thread(f, ()) - while len(phase) == 0: - _wait() - _wait() - self.assertEqual(len(phase), 1) + self.wait_phase(phase, 1) + + # Thread unblocked lock.release() - while len(phase) == 1: - _wait() - self.assertEqual(len(phase), 2) + self.wait_phase(phase, 2) def test_different_thread(self): # Lock can be released from a different thread. @@ -349,21 +355,20 @@ def test_recursion_count(self): def f(): lock.acquire() phase.append(None) - while len(phase) == 1: - _wait() + + self.wait_phase(phase, 2) lock.release() phase.append(None) with threading_helper.wait_threads_exit(): + # Thread blocked on lock.acquire() start_new_thread(f, ()) - while len(phase) == 0: - _wait() - self.assertEqual(len(phase), 1) + self.wait_phase(phase, 1) self.assertEqual(0, lock._recursion_count()) + + # Thread unblocked phase.append(None) - while len(phase) == 2: - _wait() - self.assertEqual(len(phase), 3) + self.wait_phase(phase, 3) self.assertEqual(0, lock._recursion_count()) def test_different_thread(self): @@ -421,10 +426,14 @@ def _check_notify(self, evt): def f(): results1.append(evt.wait()) results2.append(evt.wait()) + + # Threads blocked on first evt.wait() b = Bunch(f, N) b.wait_for_started() - _wait() + wait_threads_blocked(N) self.assertEqual(len(results1), 0) + + # Threads unblocked evt.set() b.wait_for_finished() self.assertEqual(results1, [True] * N) @@ -464,19 +473,22 @@ def f(): self.assertTrue(r) def test_set_and_clear(self): - # Issue #13502: check that wait() returns true even when the event is + # gh-57711: check that wait() returns true even when the event is # cleared before the waiting thread is woken up. - evt = self.eventtype() + event = self.eventtype() results = [] - timeout = 0.250 - N = 5 def f(): - results.append(evt.wait(timeout * 4)) + results.append(event.wait(support.LONG_TIMEOUT)) + + # Threads blocked on event.wait() + N = 5 b = Bunch(f, N) b.wait_for_started() - time.sleep(timeout) - evt.set() - evt.clear() + wait_threads_blocked(N) + + # Threads unblocked + event.set() + event.clear() b.wait_for_finished() self.assertEqual(results, [True] * N) @@ -533,8 +545,8 @@ def _check_notify(self, cond): # Note that this test is sensitive to timing. If the worker threads # don't execute in a timely fashion, the main thread may think they # are further along then they are. The main thread therefore issues - # _wait() statements to try to make sure that it doesn't race ahead - # of the workers. + # wait_threads_blocked() statements to try to make sure that it doesn't + # race ahead of the workers. # Secondly, this test assumes that condition variables are not subject # to spurious wakeups. The absence of spurious wakeups is an implementation # detail of Condition Variables in current CPython, but in general, not @@ -550,56 +562,77 @@ def f(): cond.acquire() ready.append(phase_num) result = cond.wait() + cond.release() results1.append((result, phase_num)) + cond.acquire() ready.append(phase_num) + result = cond.wait() cond.release() results2.append((result, phase_num)) b = Bunch(f, N) + b.wait_for_started() # first wait, to ensure all workers settle into cond.wait() before # we continue. See issues #8799 and #30727. - while len(ready) < 5: - _wait() + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(ready) >= N: + break + ready.clear() self.assertEqual(results1, []) + # Notify 3 threads at first + notified = 3 cond.acquire() - cond.notify(3) - _wait() + cond.notify(notified) + wait_threads_blocked(notified) + phase_num = 1 cond.release() - while len(results1) < 3: - _wait() - self.assertEqual(results1, [(True, 1)] * 3) + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(results1) >= notified: + break + + self.assertEqual(results1, [(True, 1)] * notified) self.assertEqual(results2, []) + # make sure all awaken workers settle into cond.wait() - while len(ready) < 3: - _wait() + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(ready) >= 3: + break + # Notify 5 threads: they might be in their first or second wait cond.acquire() cond.notify(5) - _wait() + wait_threads_blocked(N) + phase_num = 2 cond.release() - while len(results1) + len(results2) < 8: - _wait() - self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2) - self.assertEqual(results2, [(True, 2)] * 3) - # make sure all workers settle into cond.wait() - while len(ready) < 5: - _wait() + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(results1) + len(results2) >= (N + notified): + break + self.assertEqual(results1, [(True, 1)] * notified + [(True, 2)] * 2) + self.assertEqual(results2, [(True, 2)] * notified) + + # Make sure all workers settle into cond.wait() + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(ready) >= N: + break + # Notify all threads: they are all in their second wait cond.acquire() cond.notify_all() - _wait() + wait_threads_blocked(N) + phase_num = 3 cond.release() - while len(results2) < 5: - _wait() - self.assertEqual(results1, [(True, 1)] * 3 + [(True,2)] * 2) + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(results2) >= N: + break + self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2) self.assertEqual(results2, [(True, 2)] * 3 + [(True, 3)] * 2) b.wait_for_finished() @@ -611,19 +644,22 @@ def test_notify(self): def test_timeout(self): cond = self.condtype() + timeout = 0.5 results = [] - N = 5 def f(): cond.acquire() t1 = time.monotonic() - result = cond.wait(0.5) + result = cond.wait(timeout) t2 = time.monotonic() cond.release() results.append((t2 - t1, result)) + + N = 5 Bunch(f, N).wait_for_finished() self.assertEqual(len(results), N) + for dt, result in results: - self.assertTimeout(dt, 0.5) + self.assertTimeout(dt, timeout) # Note that conceptually (that"s the condition variable protocol) # a wait() may succeed even if no one notifies us and before any # timeout occurs. Spurious wakeups can occur. @@ -636,13 +672,13 @@ def test_waitfor(self): state = 0 def f(): with cond: - result = cond.wait_for(lambda : state==4) + result = cond.wait_for(lambda: state == 4) self.assertTrue(result) self.assertEqual(state, 4) b = Bunch(f, 1) b.wait_for_started() for i in range(4): - time.sleep(0.01) + time.sleep(0.010) with cond: state += 1 cond.notify() @@ -660,14 +696,16 @@ def f(): self.assertFalse(result) self.assertTimeout(dt, 0.1) success.append(None) + b = Bunch(f, 1) b.wait_for_started() # Only increment 3 times, so state == 4 is never reached. for i in range(3): - time.sleep(0.01) + time.sleep(0.010) with cond: state += 1 cond.notify() + b.wait_for_finished() self.assertEqual(len(success), 1) @@ -697,70 +735,115 @@ def test_acquire_destroy(self): del sem def test_acquire_contended(self): - sem = self.semtype(7) + sem_value = 7 + sem = self.semtype(sem_value) sem.acquire() - N = 10 + sem_results = [] results1 = [] results2 = [] phase_num = 0 - def f(): + + def func(): sem_results.append(sem.acquire()) results1.append(phase_num) + sem_results.append(sem.acquire()) results2.append(phase_num) - b = Bunch(f, 10) + + # Phase 0 + N = 10 + b = Bunch(func, N) b.wait_for_started() - while len(results1) + len(results2) < 6: - _wait() - self.assertEqual(results1 + results2, [0] * 6) + count1 = sem_value - 1 + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(results1) + len(results2) >= count1: + break + + self.assertEqual(results1 + results2, [0] * count1) + + # Phase 1 phase_num = 1 - for i in range(7): + for i in range(sem_value): sem.release() - while len(results1) + len(results2) < 13: - _wait() - self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7) + count2 = sem_value + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(results1) + len(results2) >= (count1 + count2): + break + + self.assertEqual(sorted(results1 + results2), + [0] * count1 + [1] * count2) + + # Phase 2 phase_num = 2 - for i in range(6): + count3 = (sem_value - 1) + for i in range(count3): sem.release() - while len(results1) + len(results2) < 19: - _wait() - self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6) + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(results1) + len(results2) >= (count1 + count2 + count3): + break + + self.assertEqual(sorted(results1 + results2), + [0] * count1 + [1] * count2 + [2] * count3) # The semaphore is still locked self.assertFalse(sem.acquire(False)) + # Final release, to let the last thread finish + count4 = 1 sem.release() b.wait_for_finished() - self.assertEqual(sem_results, [True] * (6 + 7 + 6 + 1)) + self.assertEqual(sem_results, + [True] * (count1 + count2 + count3 + count4)) def test_multirelease(self): - sem = self.semtype(7) + sem_value = 7 + sem = self.semtype(sem_value) sem.acquire() + results1 = [] results2 = [] phase_num = 0 - def f(): + def func(): sem.acquire() results1.append(phase_num) + sem.acquire() results2.append(phase_num) - b = Bunch(f, 10) + + # Phase 0 + b = Bunch(func, 10) b.wait_for_started() - while len(results1) + len(results2) < 6: - _wait() - self.assertEqual(results1 + results2, [0] * 6) + count1 = sem_value - 1 + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(results1) + len(results2) >= count1: + break + + self.assertEqual(results1 + results2, [0] * count1) + + # Phase 1 phase_num = 1 - sem.release(7) - while len(results1) + len(results2) < 13: - _wait() - self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7) + count2 = sem_value + sem.release(count2) + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(results1) + len(results2) >= (count1 + count2): + break + + self.assertEqual(sorted(results1 + results2), + [0] * count1 + [1] * count2) + + # Phase 2 phase_num = 2 - sem.release(6) - while len(results1) + len(results2) < 19: - _wait() - self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6) + count3 = sem_value - 1 + sem.release(count3) + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if len(results1) + len(results2) >= (count1 + count2 + count3): + break + + self.assertEqual(sorted(results1 + results2), + [0] * count1 + [1] * count2 + [2] * count3) # The semaphore is still locked self.assertFalse(sem.acquire(False)) + # Final release, to let the last thread finish sem.release() b.wait_for_finished() @@ -806,10 +889,14 @@ def test_default_value(self): def f(): sem.acquire() sem.release() + + # Thread blocked on sem.acquire() b = Bunch(f, 1) b.wait_for_started() - _wait() + wait_threads_blocked(1) self.assertFalse(b.finished) + + # Thread unblocked sem.release() b.wait_for_finished() @@ -882,6 +969,7 @@ class BarrierTests(BaseTestCase): def setUp(self): self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout) + def tearDown(self): self.barrier.abort() @@ -979,8 +1067,9 @@ def f(): i = self.barrier.wait() if i == self.N//2: # Wait until the other threads are all in the barrier. - while self.barrier.n_waiting < self.N-1: - time.sleep(0.001) + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if self.barrier.n_waiting >= (self.N - 1): + break self.barrier.reset() else: try: @@ -1068,16 +1157,29 @@ def test_single_thread(self): b.wait() def test_repr(self): - b = self.barriertype(3) - self.assertRegex(repr(b), r"<\w+\.Barrier at .*: waiters=0/3>") + barrier = self.barriertype(3) + timeout = support.LONG_TIMEOUT + self.assertRegex(repr(barrier), r"<\w+\.Barrier at .*: waiters=0/3>") def f(): - b.wait(3) - bunch = Bunch(f, 2) + barrier.wait(timeout) + + # Threads blocked on barrier.wait() + N = 2 + bunch = Bunch(f, N) bunch.wait_for_started() - time.sleep(0.2) - self.assertRegex(repr(b), r"<\w+\.Barrier at .*: waiters=2/3>") - b.wait(3) + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if barrier.n_waiting >= N: + break + self.assertRegex(repr(barrier), + r"<\w+\.Barrier at .*: waiters=2/3>") + + # Threads unblocked + barrier.wait(timeout) bunch.wait_for_finished() - self.assertRegex(repr(b), r"<\w+\.Barrier at .*: waiters=0/3>") - b.abort() - self.assertRegex(repr(b), r"<\w+\.Barrier at .*: broken>") + self.assertRegex(repr(barrier), + r"<\w+\.Barrier at .*: waiters=0/3>") + + # Abort the barrier + barrier.abort() + self.assertRegex(repr(barrier), + r"<\w+\.Barrier at .*: broken>") diff --git a/Misc/NEWS.d/next/Tests/2023-09-29-00-19-21.gh-issue-109974.Sh_g-r.rst b/Misc/NEWS.d/next/Tests/2023-09-29-00-19-21.gh-issue-109974.Sh_g-r.rst new file mode 100644 index 000000000000000..a130cf690a57cb5 --- /dev/null +++ b/Misc/NEWS.d/next/Tests/2023-09-29-00-19-21.gh-issue-109974.Sh_g-r.rst @@ -0,0 +1,3 @@ +Fix race conditions in test_threading lock tests. Wait until a condition is met +rather than using :func:`time.sleep` with a hardcoded number of seconds. Patch +by Victor Stinner.