
Commit

Merge pull request #380 from zopefoundation/racetest
`racetest` improvement
d-maurer committed Apr 18, 2023
2 parents 989457c + ca41ccd commit 30c861d
Showing 2 changed files with 299 additions and 81 deletions.
262 changes: 181 additions & 81 deletions src/ZODB/tests/racetest.py
@@ -40,6 +40,7 @@
https://github.com/zopefoundation/ZODB/issues/290 and
https://github.com/zopefoundation/ZEO/issues/166.
"""
from __future__ import print_function

import threading
from random import randint
@@ -80,7 +81,7 @@ class T2ObjectsInc:
"""T2ObjectsInc is specification with behaviour where two objects obj1
and obj2 are incremented synchronously.
It is used in tests where bugs can be immedeately observed after the race.
It is used in tests where bugs can be immediately observed after the race.
invariant: obj1 == obj2
"""
@@ -159,10 +160,7 @@ def init():
# Access to half of the objects is organized to always trigger loading
# from zstor. Access to the other half goes through zconn cache and so
# verifies whether the cache is not stale.
failed = threading.Event()
failure = [None]

def verify():
def verify(tg):
transaction.begin()
zconn = db.open()
root = zconn.root()
@@ -176,8 +174,7 @@ def verify():
except AssertionError as e:
msg = "verify: %s\n" % e
msg += _state_details(root)
failure[0] = msg
failed.set()
tg.fail(msg)

# we did not change anything; also fails with commit:
transaction.abort()
@@ -186,7 +183,7 @@ def verify():
# `modify` changes objects in the database by executing "next" step.
#
# Spec invariant should be preserved.
def modify():
def modify(tg):
transaction.begin()
zconn = db.open()

@@ -199,32 +196,21 @@ def modify():

# `xrun` runs f in a loop until either N iterations, or until failed is
# set.
def xrun(f, N):
try:
for i in range(N):
# print('%s.%d' % (f.__name__, i))
f()
if failed.is_set():
break
except: # noqa: E722 do not use bare 'except'
failed.set()
raise
def xrun(tg, tx, f, N):
for i in range(N):
# print('%s.%d' % (f.__name__, i))
f(tg)
if tg.failed():
break

# loop verify and modify concurrently.
init()

N = 500
tverify = Daemon(
name='Tverify', target=xrun, args=(verify, N))
tmodify = Daemon(
name='Tmodify', target=xrun, args=(modify, N))
tverify.start()
tmodify.start()
tverify.join(60)
tmodify.join(60)

if failed.is_set():
self.fail(failure[0])
tg = TestWorkGroup(self)
tg.go(xrun, verify, N, name='Tverify')
tg.go(xrun, modify, N, name='Tmodify')
tg.wait(120)

# client-server storages like ZEO, NEO and RelStorage allow several storage
# clients to be connected to a single storage server.
@@ -285,10 +271,7 @@ def init():
#
# Once in a while T tries to modify the database executing spec "next"
# as test source of changes for other workers.
failed = threading.Event()
failure = [None] * nwork # [tx] is failure from T(tx)

def T(tx, N):
def T(tg, tx, N):
db = self.dbopen()

def t_():
@@ -305,8 +288,7 @@ def t_():
except AssertionError as e:
msg = "T%s: %s\n" % (tx, e)
msg += _state_details(root)
failure[tx] = msg
failed.set()
tg.fail(msg)

# change objects once in a while
if randint(0, 4) == 0:
@@ -326,36 +308,26 @@ def t_():
for i in range(N):
# print('T%s.%d' % (tx, i))
t_()
if failed.is_set():
if tg.failed():
break
except: # noqa: E722 do not use bare 'except'
failed.set()
raise
finally:
db.close()

# run the workers concurrently.
init()

N = 100
tg = []
for x in range(nwork):
t = Daemon(name='T%d' % x, target=T, args=(x, N))
t.start()
tg.append(t)

for t in tg:
t.join(60)

if failed.is_set():
self.fail('\n\n'.join([_ for _ in failure if _]))
tg = TestWorkGroup(self)
for _ in range(nwork):
tg.go(T, N)
tg.wait(120)

# verify storage for race in between client disconnect and external
# invalidations. https://github.com/zopefoundation/ZEO/issues/209
#
# This test is simlar to check_race_load_vs_external_invalidate, but
# This test is similar to check_race_load_vs_external_invalidate, but
# increases the number of workers and also makes every worker repeatedly
# reconnect to the storage, so that the probability of disconection is
# reconnect to the storage, so that the probability of disconnection is
# high. It also uses T2ObjectsInc2Phase instead of T2ObjectsInc because if
# an invalidation is skipped due to the disconnect/invalidation race,
# T2ObjectsInc won't catch the bug as both objects will be either in old
@@ -381,10 +353,7 @@ def init():

# `T` is similar to the T from _check_race_load_vs_external_invalidate
# but reconnects to the database often.
failed = threading.Event()
failure = [None] * nwork # [tx] is failure from T(tx)

def T(tx, N):
def T(tg, tx, N):
def t_():
def work1(db):
transaction.begin()
@@ -400,8 +369,7 @@ def work1(db):
except AssertionError as e:
msg = "T%s: %s\n" % (tx, e)
msg += _state_details(root)
failure[tx] = msg
failed.set()
tg.fail(msg)

zconn.close()
transaction.abort()
@@ -424,37 +392,26 @@ def work1(db):
db = self.dbopen()
try:
for i in range(4):
if failed.is_set():
if tg.failed():
break
work1(db)
finally:
db.close()

try:
for i in range(N):
# print('T%s.%d' % (tx, i))
if failed.is_set():
break
t_()
except: # noqa: E722 do not use bare 'except'
failed.set()
raise
for i in range(N):
# print('T%s.%d' % (tx, i))
if tg.failed():
break
t_()

# run the workers concurrently.
init()

N = 100 // (2*4) # N reduced to save time
tg = []
for x in range(nwork):
t = Daemon(name='T%d' % x, target=T, args=(x, N))
t.start()
tg.append(t)

for t in tg:
t.join(60)

if failed.is_set():
self.fail('\n\n'.join([_ for _ in failure if _]))
tg = TestWorkGroup(self)
for _ in range(nwork):
tg.go(T, N)
tg.wait(120)


# `_state_init` initializes the database according to the spec.
Expand All @@ -468,7 +425,7 @@ def _state_init(db, spec):
zconn.close()


# `_state_invalidate_half1` invalidatates first 50% of database objects, so
# `_state_invalidate_half1` invalidates first 50% of database objects, so
# that the next time they are accessed, they are reloaded from the storage.
def _state_invalidate_half1(root):
keys = list(sorted(root.keys()))
@@ -526,13 +483,156 @@ def load(key):
return txt


class TestWorkGroup(object):
"""TestWorkGroup represents group of threads that run together to verify
something.
- .go() adds test thread to the group.
- .wait() waits for all spawned threads to finish and reports all
collected failures to containing testcase.
- a test should indicate failure by call to .fail(), it
can check for a failure with .failed()
"""

def __init__(self, testcase):
self.testcase = testcase
self.failed_event = threading.Event()
self.fail_mu = threading.Lock()
self.failv = [] # failures registered by .fail
self.threadv = [] # spawned threads
self.waitg = WaitGroup() # to wait for spawned threads

def fail(self, msg):
"""fail adds failure to test result."""
with self.fail_mu:
self.failv.append(msg)
self.failed_event.set()

def failed(self):
"""did the test already fail."""
return self.failed_event.is_set()

def go(self, f, *argv, **kw):
"""go spawns f(self, #thread, *argv, **kw) in new test thread."""
self.waitg.add(1)
tx = len(self.threadv)
tname = kw.pop('name', 'T%d' % tx)
t = Daemon(name=tname, target=self._run, args=(f, tx, argv, kw))
self.threadv.append(t)
t.start()

def _run(self, f, tx, argv, kw):
tname = self.threadv[tx].name
try:
f(self, tx, *argv, **kw)
except Exception as e:
self.fail("Unhandled exception %r in thread %s"
% (e, tname))
raise
finally:
self.waitg.done()

def wait(self, timeout):
"""wait waits for all test threads to complete and reports all
collected failures to containing testcase."""
if not self.waitg.wait(timeout):
self.fail("test did not finish within %s seconds" % timeout)

failed_to_finish = []
for t in self.threadv:
try:
t.join(1)
except AssertionError:
self.failed_event.set()
failed_to_finish.append(t.name)
if failed_to_finish:
self.fail("threads did not finish: %s" % failed_to_finish)
del self.threadv # avoid cyclic garbage

if self.failed():
self.testcase.fail('\n\n'.join(self.failv))
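
For illustration only, not part of this commit: a minimal sketch of how a test could drive TestWorkGroup, assuming a hypothetical unittest case `ExampleRaceTest` with made-up workers `bump` and `check`; the real callers are the `check_race_*` tests above, whose workers are run via `xrun` and `T`.

# Illustrative sketch (not from this commit): worker functions receive the
# group `tg` and their thread index `tx` as the first two arguments, just
# like xrun/T above.
import unittest


class ExampleRaceTest(unittest.TestCase):   # hypothetical test case

    def test_concurrent_counter(self):
        counter = {'n': 0}

        def bump(tg, tx, repeat):
            for _ in range(repeat):
                if tg.failed():             # stop early once anything failed
                    break
                counter['n'] += 1

        def check(tg, tx, repeat):
            for _ in range(repeat):
                if counter['n'] < 0:        # register failure; do not raise
                    tg.fail("T%d: counter went negative" % tx)
                if tg.failed():
                    break

        tg = TestWorkGroup(self)
        tg.go(bump, 1000, name='Tbump')     # runs bump(tg, 0, 1000)
        tg.go(check, 1000)                  # default thread name: 'T1'
        tg.wait(60)                         # join workers, report failures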


class Daemon(threading.Thread):
"""auxiliary class to create daemon threads and fail if not stopped."""
"""auxiliary class to create daemon threads and fail if not stopped.
In addition, the class ensures that reports for uncaught exceptions
are output while holding a lock. This prevents concurrent reports
from getting intermixed and facilitates exception analysis.
"""
def __init__(self, **kw):
super(Daemon, self).__init__(**kw)
self.daemon = True
if hasattr(self, "_invoke_excepthook"):
# Python 3.8+
ori_invoke_excepthook = self._invoke_excepthook

def invoke_excepthook(*args, **kw):
with exc_lock:
return ori_invoke_excepthook(*args, **kw)

self._invoke_excepthook = invoke_excepthook
else:
# old Python
ori_run = self.run

def run():
from threading import _format_exc
from threading import _sys
try:
ori_run()
except SystemExit:
pass
except BaseException:
if _sys and _sys.stderr is not None:
with exc_lock:
print("Exception in thread %s:\n%s" %
(self.name, _format_exc()),
file=_sys.stderr)
else:
raise
finally:
del self.run

self.run = run

def join(self, *args, **kw):
super(Daemon, self).join(*args, **kw)
if self.is_alive():
raise AssertionError("Thread %s did not stop" % self.name)


# lock to ensure that Daemon exception reports are output atomically
exc_lock = threading.Lock()
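
For illustration only, not part of this commit: a small sketch of what the serialized reporting buys. When a Daemon target raises, the traceback is printed while exc_lock is held, so reports from several failing threads are not interleaved. The helper name `_daemon_excepthook_example` and the `boom` target are made up.

def _daemon_excepthook_example():
    # Each thread raises; its traceback is reported atomically by the
    # wrapped excepthook (Python 3.8+) or the wrapped run() (older Pythons).
    def boom(i):
        raise RuntimeError("uncaught error from test thread #%d" % i)

    threads = [Daemon(name='Tboom%d' % i, target=boom, args=(i,))
               for i in range(3)]
    for t in threads:
        t.start()
    for t in threads:
        t.join(1)   # raises AssertionError only if a thread did not stop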


class WaitGroup(object):
"""WaitGroup provides service to wait for spawned workers to be done.
- .add() adds workers
- .done() indicates that one worker is done
- .wait() waits until all workers are done
"""
def __init__(self):
self.n = 0
self.condition = threading.Condition()

def add(self, delta):
with self.condition:
self.n += delta
if self.n < 0:
raise AssertionError("#workers is negative")
if self.n == 0:
self.condition.notify_all()

def done(self):
self.add(-1)

def wait(self, timeout): # -> ok
with self.condition:
if self.n == 0:
return True
ok = self.condition.wait(timeout)
if ok is None: # py2
ok = (self.n == 0)
return ok
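
For illustration only, not part of this commit: a minimal sketch of WaitGroup used on its own, mirroring how TestWorkGroup uses it above; the helper name `_waitgroup_example` and the `worker` function are made up.

def _waitgroup_example():
    wg = WaitGroup()
    results = []

    def worker(i):
        try:
            results.append(i * i)
        finally:
            wg.done()                   # always mark this worker as done

    for i in range(3):
        wg.add(1)                       # register before starting the worker
        Daemon(name='W%d' % i, target=worker, args=(i,)).start()

    assert wg.wait(10)                  # True once all workers called .done()
    assert sorted(results) == [0, 1, 4]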