diff --git a/src/ZODB/FileStorage/tests.py b/src/ZODB/FileStorage/tests.py index 64d08287f..fb564d416 100644 --- a/src/ZODB/FileStorage/tests.py +++ b/src/ZODB/FileStorage/tests.py @@ -113,15 +113,15 @@ def pack_with_repeated_blob_records(): fixed by the time you read this, but there might still be transactions in the wild that have duplicate records. - >>> fs = ZODB.FileStorage.FileStorage('t', blob_dir='bobs') - >>> db = ZODB.DB(fs) + >>> db = ZODB.DB(ZODB.FileStorage.FileStorage('t', blob_dir='bobs')) >>> conn = db.open() >>> conn.root()[1] = ZODB.blob.Blob() >>> transaction.commit() >>> tm = transaction.TransactionManager() >>> oid = conn.root()[1]._p_oid - >>> from ZODB.utils import load_current - >>> blob_record, oldserial = load_current(fs, oid) + >>> fs = db._mvcc_storage.new_instance() + >>> _ = fs.poll_invalidations() + >>> blob_record, oldserial = fs.load(oid) Now, create a transaction with multiple saves: diff --git a/src/ZODB/mvccadapter.py b/src/ZODB/mvccadapter.py index 121e579bd..c625c1414 100644 --- a/src/ZODB/mvccadapter.py +++ b/src/ZODB/mvccadapter.py @@ -49,6 +49,7 @@ def new_instance(self): instance = MVCCAdapterInstance(self) with self._lock: self._instances.add(instance) + instance._lastTransaction() return instance def before_instance(self, before=None): @@ -77,13 +78,13 @@ def invalidateCache(self): def invalidate(self, transaction_id, oids): with self._lock: for instance in self._instances: - instance._invalidate(oids) + instance._invalidate(transaction_id, oids) - def _invalidate_finish(self, oids, committing_instance): + def _invalidate_finish(self, tid, oids, committing_instance): with self._lock: for instance in self._instances: if instance is not committing_instance: - instance._invalidate(oids) + instance._invalidate(tid, oids) references = serialize.referencesf transform_record_data = untransform_record_data = lambda self, data: data @@ -98,14 +99,26 @@ class MVCCAdapterInstance(Base): 'checkCurrentSerialInTransaction', 'tpc_abort', ) + _start = None # Transaction start time + _ltid = None # Last storage transaction id + def __init__(self, base): self._base = base Base.__init__(self, base._storage) self._lock = Lock() self._invalidations = set() - self._start = None # Transaction start time self._sync = getattr(self._storage, 'sync', lambda : None) + def _lastTransaction(self): + ltid = self._storage.lastTransaction() + # At this precise moment, a transaction may be + # committed and we have already received the new tid. + with self._lock: + # So make sure we won't override with a smaller value. + if self._ltid is None: + # Calling lastTransaction() here could result in a deadlock. + self._ltid = ltid + def release(self): self._base._release(self) @@ -115,8 +128,9 @@ def _invalidateCache(self): with self._lock: self._invalidations = None - def _invalidate(self, oids): + def _invalidate(self, tid, oids): with self._lock: + self._ltid = tid try: self._invalidations.update(oids) except AttributeError: @@ -128,8 +142,8 @@ def sync(self, force=True): self._sync() def poll_invalidations(self): - self._start = p64(u64(self._storage.lastTransaction()) + 1) with self._lock: + self._start = p64(u64(self._ltid) + 1) if self._invalidations is None: self._invalidations = set() return None @@ -175,7 +189,8 @@ def tpc_finish(self, transaction, func = lambda tid: None): self._modified = None def invalidate_finish(tid): - self._base._invalidate_finish(modified, self) + self._base._invalidate_finish(tid, modified, self) + self._ltid = tid func(tid) return self._storage.tpc_finish(transaction, invalidate_finish) @@ -260,7 +275,7 @@ def tpc_vote(self, transaction): def tpc_finish(self, transaction, func = lambda tid: None): def invalidate_finish(tid): - self._base._invalidate_finish(self._undone, None) + self._base._invalidate_finish(tid, self._undone, None) func(tid) self._storage.tpc_finish(transaction, invalidate_finish) diff --git a/src/ZODB/tests/testConnection.py b/src/ZODB/tests/testConnection.py index 88276b9c9..2fe003eb5 100644 --- a/src/ZODB/tests/testConnection.py +++ b/src/ZODB/tests/testConnection.py @@ -14,6 +14,7 @@ """Unit tests for the Connection class.""" from __future__ import print_function +from contextlib import contextmanager import doctest import re import six @@ -535,13 +536,13 @@ def doctest_invalidate(self): >>> mvcc_storage.invalidate(p64(1), {p1._p_oid: 1}) - Transaction start times are based on storage's last - transaction. (Previousely, they were based on the first - invalidation seen in a transaction.) + Transaction start times are based on storage's last transaction, + which is known from invalidations. (Previousely, they were + based on the first invalidation seen in a transaction.) >>> mvcc_instance.poll_invalidations() == [p1._p_oid] True - >>> mvcc_instance._start == p64(u64(db.storage.lastTransaction()) + 1) + >>> mvcc_instance._start == p64(2) True >>> mvcc_storage.invalidate(p64(10), {p2._p_oid: 1, p64(76): 1}) @@ -570,6 +571,36 @@ def doctest_invalidate(self): >>> db.close() """ + def test_mvccadapterNewTransactionVsInvalidations(self): + """ + Check that polled invalidations are consistent with the TID at which + the transaction operates. Otherwise, it's like we miss invalidations. + """ + db = databaseFromString("\n\n") + try: + t1 = transaction.TransactionManager() + c1 = db.open(t1) + r1 = c1.root() + r1['a'] = 1 + t1.commit() + t2 = transaction.TransactionManager() + c2 = db.open(t2) + c2.root()['b'] = 1 + s1 = c1._storage + l1 = s1._lock + @contextmanager + def beforeLock1(): + s1._lock = l1 + t2.commit() + with l1: + yield + s1._lock = beforeLock1() + t1.begin() + self.assertIs(s1._lock, l1) + self.assertIn('b', r1) + finally: + db.close() + def doctest_invalidateCache(): """The invalidateCache method invalidates a connection's cache. @@ -1395,4 +1426,5 @@ def test_suite(): s.addTest(doctest.DocTestSuite(checker=checker)) s.addTest(unittest.makeSuite(TestConnection)) s.addTest(unittest.makeSuite(EstimatedSizeTests)) + s.addTest(unittest.makeSuite(InvalidationTests)) return s diff --git a/src/ZODB/tests/testmvcc.py b/src/ZODB/tests/testmvcc.py index ff52af5c0..4316e0e5d 100644 --- a/src/ZODB/tests/testmvcc.py +++ b/src/ZODB/tests/testmvcc.py @@ -85,13 +85,14 @@ >>> cn = db.open() ->>> cn._storage._start == p64(u64(st.lastTransaction()) + 1) +>>> ltid = u64(st.lastTransaction()) +>>> cn._storage._start == p64(ltid + 1) True ->>> cn.db()._mvcc_storage.invalidate(100, dict.fromkeys([1, 2])) ->>> cn._storage._start == p64(u64(st.lastTransaction()) + 1) +>>> cn.db()._mvcc_storage.invalidate(p64(ltid+100), dict.fromkeys([1, 2])) +>>> cn._storage._start == p64(ltid + 1) True ->>> cn.db()._mvcc_storage.invalidate(200, dict.fromkeys([1, 2])) ->>> cn._storage._start == p64(u64(st.lastTransaction()) + 1) +>>> cn.db()._mvcc_storage.invalidate(p64(ltid+200), dict.fromkeys([1, 2])) +>>> cn._storage._start == p64(ltid + 1) True A connection's high-water mark is set to the transaction id taken from @@ -105,7 +106,7 @@ a transaction and process invalidations. >>> cn.sync() ->>> cn._storage._start == p64(u64(st.lastTransaction()) + 1) +>>> cn._storage._start == p64(ltid + 201) True Basic functionality