Skip to content

Commit

Permalink
mvccadapter: check if the last TID changed without invalidation
Browse files Browse the repository at this point in the history
Since commit b5895a5 ("mvccadapter:
fix race with invalidations when starting a new transaction"),
a ZEO test fails as follows:

    File "src/ZEO/tests/drop_cache_rather_than_verify.txt", line 114, in drop_cache_rather_than_verify.txt
    Failed example:
        conn.root()[1].x
    Expected:
        6
    Got:
        1

Earlier in the test, the ZEO server is restarted and then another
client commits. When disconnected, the first client does not receive
invalidations anymore and the connection gets stuck in the past until
there's a new commit after it reconnected. It was possible to make the
test pass with the following patch:

--- a/src/ZEO/ClientStorage.py
+++ b/src/ZEO/ClientStorage.py
@@ -357,6 +357,7 @@ def notify_connected(self, conn, info):

         # invalidate our db cache
         if self._db is not None:
+            self._db.invalidate(self.lastTransaction(), ())
             self._db.invalidateCache()

         logger.info("%s %s to storage: %s",

Other implementations like NEO are probably affected the same way.

Rather than changing interfaces in a backward-incompatible way,
this commit revert to the original behaviour, and all the changes
that were done in existing tests are reverted.

However, the interfaces are clarified about the fact that storage
implementations must update at a precise moment the value that is
returned by lastTransaction(): just after invalidate() or
tpc_finish callback.
  • Loading branch information
jmuchemb committed Jun 9, 2020
1 parent 5ce50c3 commit 4a6b028
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 28 deletions.
8 changes: 4 additions & 4 deletions src/ZODB/FileStorage/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,15 @@ def pack_with_repeated_blob_records():
fixed by the time you read this, but there might still be
transactions in the wild that have duplicate records.
>>> db = ZODB.DB(ZODB.FileStorage.FileStorage('t', blob_dir='bobs'))
>>> fs = ZODB.FileStorage.FileStorage('t', blob_dir='bobs')
>>> db = ZODB.DB(fs)
>>> conn = db.open()
>>> conn.root()[1] = ZODB.blob.Blob()
>>> transaction.commit()
>>> tm = transaction.TransactionManager()
>>> oid = conn.root()[1]._p_oid
>>> fs = db._mvcc_storage.new_instance()
>>> _ = fs.poll_invalidations()
>>> blob_record, oldserial = fs.load(oid)
>>> from ZODB.utils import load_current
>>> blob_record, oldserial = load_current(fs, oid)
Now, create a transaction with multiple saves:
Expand Down
8 changes: 8 additions & 0 deletions src/ZODB/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,14 @@ def isReadOnly():
def lastTransaction():
"""Return the id of the last committed transaction.
For proper MVCC operation, the return value is the id of the last
transaction for which invalidation notifications are completed.
In particular for client-server implementations, lastTransaction
should return a cached value (rather than querying the server).
A preliminary call to sync() can be done to get the actual last
TID at the wanted time.
If no transactions have been committed, return a string of 8
null (0) characters.
"""
Expand Down
22 changes: 9 additions & 13 deletions src/ZODB/mvccadapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ def new_instance(self):
instance = MVCCAdapterInstance(self)
with self._lock:
self._instances.add(instance)
instance._lastTransaction()
return instance

def before_instance(self, before=None):
Expand Down Expand Up @@ -100,7 +99,7 @@ class MVCCAdapterInstance(Base):
)

_start = None # Transaction start time
_ltid = None # Last storage transaction id
_ltid = b'' # Last storage transaction id

def __init__(self, base):
self._base = base
Expand All @@ -109,16 +108,6 @@ def __init__(self, base):
self._invalidations = set()
self._sync = getattr(self._storage, 'sync', lambda : None)

def _lastTransaction(self):
ltid = self._storage.lastTransaction()
# At this precise moment, a transaction may be
# committed and we have already received the new tid.
with self._lock:
# So make sure we won't override with a smaller value.
if self._ltid is None:
# Calling lastTransaction() here could result in a deadlock.
self._ltid = ltid

def release(self):
self._base._release(self)

Expand All @@ -142,8 +131,15 @@ def sync(self, force=True):
self._sync()

def poll_invalidations(self):
# Storage implementations don't always call invalidate() when
# the last TID changes, e.g. after network reconnection,
# so we still have to poll.
ltid = self._storage.lastTransaction()
# But at this precise moment, a transaction may be committed and
# we have already received the new tid, along with invalidations.
with self._lock:
self._start = p64(u64(self._ltid) + 1)
# So we must pick the greatest value.
self._start = p64(u64(max(ltid, self._ltid)) + 1)
if self._invalidations is None:
self._invalidations = set()
return None
Expand Down
8 changes: 4 additions & 4 deletions src/ZODB/tests/testConnection.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,13 +536,13 @@ def doctest_invalidate(self):
>>> mvcc_storage.invalidate(p64(1), {p1._p_oid: 1})
Transaction start times are based on storage's last transaction,
which is known from invalidations. (Previousely, they were
based on the first invalidation seen in a transaction.)
Transaction start times are based on storage's last
transaction. (Previousely, they were based on the first
invalidation seen in a transaction.)
>>> mvcc_instance.poll_invalidations() == [p1._p_oid]
True
>>> mvcc_instance._start == p64(2)
>>> mvcc_instance._start == p64(u64(db.storage.lastTransaction()) + 1)
True
>>> mvcc_storage.invalidate(p64(10), {p2._p_oid: 1, p64(76): 1})
Expand Down
13 changes: 6 additions & 7 deletions src/ZODB/tests/testmvcc.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,13 @@
>>> cn = db.open()
>>> ltid = u64(st.lastTransaction())
>>> cn._storage._start == p64(ltid + 1)
>>> cn._storage._start == p64(u64(st.lastTransaction()) + 1)
True
>>> cn.db()._mvcc_storage.invalidate(p64(ltid+100), dict.fromkeys([1, 2]))
>>> cn._storage._start == p64(ltid + 1)
>>> cn.db()._mvcc_storage.invalidate(p64(100), dict.fromkeys([1, 2]))
>>> cn._storage._start == p64(u64(st.lastTransaction()) + 1)
True
>>> cn.db()._mvcc_storage.invalidate(p64(ltid+200), dict.fromkeys([1, 2]))
>>> cn._storage._start == p64(ltid + 1)
>>> cn.db()._mvcc_storage.invalidate(p64(200), dict.fromkeys([1, 2]))
>>> cn._storage._start == p64(u64(st.lastTransaction()) + 1)
True
A connection's high-water mark is set to the transaction id taken from
Expand All @@ -106,7 +105,7 @@
a transaction and process invalidations.
>>> cn.sync()
>>> cn._storage._start == p64(ltid + 201)
>>> cn._storage._start == p64(u64(st.lastTransaction()) + 1)
True
Basic functionality
Expand Down

0 comments on commit 4a6b028

Please sign in to comment.