Skip to content

Commit

Permalink
add TestGroup tests; minor cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
d-maurer committed Apr 17, 2023
1 parent b90d90c commit a0867aa
Showing 1 changed file with 19 additions and 12 deletions.
31 changes: 19 additions & 12 deletions src/ZODB/tests/racetest.py
Expand Up @@ -200,7 +200,7 @@ def xrun(tg, tx, f, N):
for i in range(N):
# print('%s.%d' % (f.__name__, i))
f(tg)
if tg.failed.is_set():
if tg.failed():
break

# loop verify and modify concurrently.
Expand Down Expand Up @@ -308,7 +308,7 @@ def t_():
for i in range(N):
# print('T%s.%d' % (tx, i))
t_()
if tg.failed.is_set():
if tg.failed():
break
finally:
db.close()
Expand Down Expand Up @@ -392,15 +392,15 @@ def work1(db):
db = self.dbopen()
try:
for i in range(4):
if tg.failed.is_set():
if tg.failed():
break
work1(db)
finally:
db.close()

for i in range(N):
# print('T%s.%d' % (tx, i))
if tg.failed.is_set():
if tg.failed():
break
t_()

Expand Down Expand Up @@ -490,12 +490,13 @@ class TestGroup(object):
- .go() adds test thread to the group.
- .wait() waits for all spawned threads to finish and reports all
collected failures to containing testcase.
- a test should indicate failure by call to .fail()
- a test should indicate failure by call to .fail(), it
can check for a failure with .failed()
"""

def __init__(self, testcase):
self.testcase = testcase
self.failed = threading.Event()
self.failed_event = threading.Event()
self.fail_mu = threading.Lock()
self.failv = [] # failures registerd by .fail
self.threadv = [] # spawned threads
Expand All @@ -505,7 +506,11 @@ def fail(self, msg):
"""fail adds failure to test result."""
with self.fail_mu:
self.failv.append(msg)
self.failed.set()
self.failed_event.set()

def failed(self):
"""did the thest already fail."""
return self.failed_event.is_set()

def go(self, f, *argv, **kw):
"""go spawns f(self, #thread, *argv, **kw) in new test thread."""
Expand All @@ -517,11 +522,12 @@ def go(self, f, *argv, **kw):
t.start()

def _run(self, f, tx, argv, kw):
tname = self.threadv[tx].name
try:
f(self, tx, *argv, **kw)
except Exception as e:
self.fail("Unhandled exception %r in thread %s"
% (e, self.threadv[tx].name))
% (e, tname))
raise
finally:
self.waitg.done()
Expand All @@ -537,13 +543,14 @@ def wait(self, timeout):
try:
t.join(1)
except AssertionError:
self.failed.set()
self.failed_event.set()
failed_to_finish.append(t.name)
if failed_to_finish:
self.fail("threads did not finish: %s" % failed_to_finish)
del self.threadv # avoid cyclic garbage

if self.failed.is_set():
self.testcase.fail('\n\n'.join([_ for _ in self.failv if _]))
if self.failed():
self.testcase.fail('\n\n'.join(self.failv))


class Daemon(threading.Thread):
Expand Down Expand Up @@ -616,7 +623,7 @@ def add(self, delta):
if self.n < 0:
raise AssertionError("#workers is negative")
if self.n == 0:
self.condition.notify()
self.condition.notify_all()

def done(self):
self.add(-1)
Expand Down

0 comments on commit a0867aa

Please sign in to comment.