Skip to content

Commit

Permalink
Issue #20556: Used specific assert methods in threading tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
serhiy-storchaka committed Mar 14, 2016
2 parents 04bc5b9 + 8c0f0c5 commit c877658
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 23 deletions.
16 changes: 8 additions & 8 deletions Lib/test/test_dummy_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ def setUp(self):

def test_initlock(self):
#Make sure locks start locked
self.assertTrue(not self.lock.locked(),
self.assertFalse(self.lock.locked(),
"Lock object is not initialized unlocked.")

def test_release(self):
# Test self.lock.release()
self.lock.acquire()
self.lock.release()
self.assertTrue(not self.lock.locked(),
self.assertFalse(self.lock.locked(),
"Lock object did not release properly.")

def test_improper_release(self):
Expand All @@ -46,7 +46,7 @@ def test_cond_acquire_success(self):
def test_cond_acquire_fail(self):
#Test acquiring locked lock returns False
self.lock.acquire(0)
self.assertTrue(not self.lock.acquire(0),
self.assertFalse(self.lock.acquire(0),
"Conditional acquiring of a locked lock incorrectly "
"succeeded.")

Expand All @@ -58,9 +58,9 @@ def test_uncond_acquire_success(self):

def test_uncond_acquire_return_val(self):
#Make sure that an unconditional locking returns True.
self.assertTrue(self.lock.acquire(1) is True,
self.assertIs(self.lock.acquire(1), True,
"Unconditional locking did not return True.")
self.assertTrue(self.lock.acquire() is True)
self.assertIs(self.lock.acquire(), True)

def test_uncond_acquire_blocking(self):
#Make sure that unconditional acquiring of a locked lock blocks.
Expand All @@ -80,7 +80,7 @@ def delay_unlock(to_unlock, delay):
end_time = int(time.time())
if support.verbose:
print("done")
self.assertTrue((end_time - start_time) >= DELAY,
self.assertGreaterEqual(end_time - start_time, DELAY,
"Blocking by unconditional acquiring failed.")

class MiscTests(unittest.TestCase):
Expand All @@ -94,7 +94,7 @@ def test_ident(self):
#Test sanity of _thread.get_ident()
self.assertIsInstance(_thread.get_ident(), int,
"_thread.get_ident() returned a non-integer")
self.assertTrue(_thread.get_ident() != 0,
self.assertNotEqual(_thread.get_ident(), 0,
"_thread.get_ident() returned 0")

def test_LockType(self):
Expand Down Expand Up @@ -164,7 +164,7 @@ def queue_mark(queue, delay):
time.sleep(DELAY)
if support.verbose:
print('done')
self.assertTrue(testing_queue.qsize() == thread_count,
self.assertEqual(testing_queue.qsize(), thread_count,
"Not all %s threads executed properly after %s sec." %
(thread_count, DELAY))

Expand Down
27 changes: 13 additions & 14 deletions Lib/test/test_threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,15 @@ def run(self):
self.nrunning.inc()
if verbose:
print(self.nrunning.get(), 'tasks are running')
self.testcase.assertTrue(self.nrunning.get() <= 3)
self.testcase.assertLessEqual(self.nrunning.get(), 3)

time.sleep(delay)
if verbose:
print('task', self.name, 'done')

with self.mutex:
self.nrunning.dec()
self.testcase.assertTrue(self.nrunning.get() >= 0)
self.testcase.assertGreaterEqual(self.nrunning.get(), 0)
if verbose:
print('%s is finished. %d tasks are running' %
(self.name, self.nrunning.get()))
Expand Down Expand Up @@ -101,34 +101,33 @@ def test_various_ops(self):
for i in range(NUMTASKS):
t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
threads.append(t)
self.assertEqual(t.ident, None)
self.assertTrue(re.match('<TestThread\(.*, initial\)>', repr(t)))
self.assertIsNone(t.ident)
self.assertRegex(repr(t), r'^<TestThread\(.*, initial\)>$')
t.start()

if verbose:
print('waiting for all tasks to complete')
for t in threads:
t.join()
self.assertTrue(not t.is_alive())
self.assertFalse(t.is_alive())
self.assertNotEqual(t.ident, 0)
self.assertFalse(t.ident is None)
self.assertTrue(re.match('<TestThread\(.*, stopped -?\d+\)>',
repr(t)))
self.assertIsNotNone(t.ident)
self.assertRegex(repr(t), r'^<TestThread\(.*, stopped -?\d+\)>$')
if verbose:
print('all tasks done')
self.assertEqual(numrunning.get(), 0)

def test_ident_of_no_threading_threads(self):
# The ident still must work for the main thread and dummy threads.
self.assertFalse(threading.currentThread().ident is None)
self.assertIsNotNone(threading.currentThread().ident)
def f():
ident.append(threading.currentThread().ident)
done.set()
done = threading.Event()
ident = []
_thread.start_new_thread(f, ())
done.wait()
self.assertFalse(ident[0] is None)
self.assertIsNotNone(ident[0])
# Kill the "immortal" _DummyThread
del threading._active[ident[0]]

Expand Down Expand Up @@ -245,7 +244,7 @@ def run(self):
self.assertTrue(ret)
if verbose:
print(" verifying worker hasn't exited")
self.assertTrue(not t.finished)
self.assertFalse(t.finished)
if verbose:
print(" attempting to raise asynch exception in worker")
result = set_async_exc(ctypes.c_long(t.id), exception)
Expand Down Expand Up @@ -416,9 +415,9 @@ def test_old_threading_api(self):

def test_repr_daemon(self):
t = threading.Thread()
self.assertFalse('daemon' in repr(t))
self.assertNotIn('daemon', repr(t))
t.daemon = True
self.assertTrue('daemon' in repr(t))
self.assertIn('daemon', repr(t))

def test_deamon_param(self):
t = threading.Thread()
Expand Down Expand Up @@ -570,7 +569,7 @@ def f():
tstate_lock.release()
self.assertFalse(t.is_alive())
# And verify the thread disposed of _tstate_lock.
self.assertTrue(t._tstate_lock is None)
self.assertIsNone(t._tstate_lock)

def test_repr_stopped(self):
# Verify that "stopped" shows up in repr(Thread) appropriately.
Expand Down
2 changes: 1 addition & 1 deletion Lib/test/test_threading_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ class X:
wr = weakref.ref(x)
del x
gc.collect()
self.assertIs(wr(), None)
self.assertIsNone(wr())


class ThreadLocalTest(unittest.TestCase, BaseLocalTest):
Expand Down

0 comments on commit c877658

Please sign in to comment.