diff --git a/src/ZODB/tests/BasicStorage.py b/src/ZODB/tests/BasicStorage.py index 98052c390..12ca04aa7 100644 --- a/src/ZODB/tests/BasicStorage.py +++ b/src/ZODB/tests/BasicStorage.py @@ -18,7 +18,8 @@ All storages should be able to pass these tests. """ -from ZODB import POSException +import transaction +from ZODB import DB, POSException from ZODB.Connection import TransactionMetaData from ZODB.tests.MinPO import MinPO from ZODB.tests.StorageTestBase import zodb_unpickle, zodb_pickle @@ -385,3 +386,97 @@ def lastInvalidations(): self.assertEqual(results.pop('lastTransaction'), tids[1]) for m, tid in results.items(): self.assertEqual(tid, tids[1]) + + + # verify storage/Connection for race in between load/open and local invalidations. + # https://github.com/zopefoundation/ZEO/issues/166 + # https://github.com/zopefoundation/ZODB/issues/290 + def check_race_loadopen_vs_local_invalidate(self): + db = DB(self._storage) + + # init initializes the database with two integer objects - obj1/obj2 + # that are set to 0. + def init(): + transaction.begin() + zconn = db.open() + + root = zconn.root() + root['obj1'] = MinPO(0) + root['obj2'] = MinPO(0) + + transaction.commit() + zconn.close() + + # verify accesses obj1/obj2 and verifies that obj1.value == obj2.value + # + # access to obj1 is organized to always trigger loading from zstor. + # access to obj2 goes through zconn cache and so verifies whether the + # cache is not stale. + failed = threading.Event() + failure = [None] + def verify(): + transaction.begin() + zconn = db.open() + + root = zconn.root() + obj1 = root['obj1'] + obj2 = root['obj2'] + + # obj1 - reload it from zstor + # obj2 - get it from zconn cache + obj1._p_invalidate() + + # both objects must have the same values + v1 = obj1.value + v2 = obj2.value + if v1 != v2: + failure[0] = "verify: obj1.value (%d) != obj2.value (%d)" % (v1, v2) + failed.set() + + transaction.abort() # we did not changed anything; also fails with commit + zconn.close() + + # modify changes obj1/obj2 by doing `objX.value += 1`. + # + # Since both objects start from 0, the invariant that + # `obj1.value == obj2.value` is always preserved. + def modify(): + transaction.begin() + zconn = db.open() + + root = zconn.root() + obj1 = root['obj1'] + obj2 = root['obj2'] + obj1.value += 1 + obj2.value += 1 + assert obj1.value == obj2.value + + transaction.commit() + zconn.close() + + # xrun runs f in a loop until either N iterations, or until failed is set. + def xrun(f, N): + try: + for i in range(N): + #print('%s.%d' % (f.__name__, i)) + f() + if failed.is_set(): + break + except: + failed.set() + raise + + + # loop verify and modify concurrently. + init() + + N = 500 + tverify = threading.Thread(name='Tverify', target=xrun, args=(verify, N)) + tmodify = threading.Thread(name='Tmodify', target=xrun, args=(modify, N)) + tverify.start() + tmodify.start() + tverify.join(60) + tmodify.join(60) + + if failed.is_set(): + self.fail(failure[0]) diff --git a/src/ZODB/tests/MVCCMappingStorage.py b/src/ZODB/tests/MVCCMappingStorage.py index e87b0be80..ea8384ee1 100644 --- a/src/ZODB/tests/MVCCMappingStorage.py +++ b/src/ZODB/tests/MVCCMappingStorage.py @@ -112,11 +112,13 @@ def poll_invalidations(self): def tpc_finish(self, transaction, func = lambda tid: None): self._data_snapshot = None - return MappingStorage.tpc_finish(self, transaction, func) + with self._main_lock: + return MappingStorage.tpc_finish(self, transaction, func) def tpc_abort(self, transaction): self._data_snapshot = None - MappingStorage.tpc_abort(self, transaction) + with self._main_lock: + MappingStorage.tpc_abort(self, transaction) def pack(self, t, referencesf, gc=True): # prevent all concurrent commits during packing