Skip to content

Commit

Permalink
mvccadapter: fix race with invalidations when starting a new transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
jmuchemb committed Dec 19, 2019
1 parent 8e97bd7 commit eb64a2b
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 22 deletions.
8 changes: 4 additions & 4 deletions src/ZODB/FileStorage/tests.py
Expand Up @@ -113,15 +113,15 @@ def pack_with_repeated_blob_records():
fixed by the time you read this, but there might still be
transactions in the wild that have duplicate records.
>>> fs = ZODB.FileStorage.FileStorage('t', blob_dir='bobs')
>>> db = ZODB.DB(fs)
>>> db = ZODB.DB(ZODB.FileStorage.FileStorage('t', blob_dir='bobs'))
>>> conn = db.open()
>>> conn.root()[1] = ZODB.blob.Blob()
>>> transaction.commit()
>>> tm = transaction.TransactionManager()
>>> oid = conn.root()[1]._p_oid
>>> from ZODB.utils import load_current
>>> blob_record, oldserial = load_current(fs, oid)
>>> fs = db._mvcc_storage.new_instance()
>>> _ = fs.poll_invalidations()
>>> blob_record, oldserial = fs.load(oid)
Now, create a transaction with multiple saves:
Expand Down
31 changes: 23 additions & 8 deletions src/ZODB/mvccadapter.py
Expand Up @@ -49,6 +49,7 @@ def new_instance(self):
instance = MVCCAdapterInstance(self)
with self._lock:
self._instances.add(instance)
instance._lastTransaction()
return instance

def before_instance(self, before=None):
Expand Down Expand Up @@ -77,13 +78,13 @@ def invalidateCache(self):
def invalidate(self, transaction_id, oids):
with self._lock:
for instance in self._instances:
instance._invalidate(oids)
instance._invalidate(transaction_id, oids)

def _invalidate_finish(self, oids, committing_instance):
def _invalidate_finish(self, tid, oids, committing_instance):
with self._lock:
for instance in self._instances:
if instance is not committing_instance:
instance._invalidate(oids)
instance._invalidate(tid, oids)

references = serialize.referencesf
transform_record_data = untransform_record_data = lambda self, data: data
Expand All @@ -98,14 +99,26 @@ class MVCCAdapterInstance(Base):
'checkCurrentSerialInTransaction', 'tpc_abort',
)

_start = None # Transaction start time
_ltid = None # Last storage transaction id

def __init__(self, base):
self._base = base
Base.__init__(self, base._storage)
self._lock = Lock()
self._invalidations = set()
self._start = None # Transaction start time
self._sync = getattr(self._storage, 'sync', lambda : None)

def _lastTransaction(self):
ltid = self._storage.lastTransaction()
# At this precise moment, a transaction may be
# committed and we have already received the new tid.
with self._lock:
# So make sure we won't override with a smaller value.
if self._ltid is None:
# Calling lastTransaction() here could result in a deadlock.
self._ltid = ltid

def release(self):
self._base._release(self)

Expand All @@ -115,8 +128,9 @@ def _invalidateCache(self):
with self._lock:
self._invalidations = None

def _invalidate(self, oids):
def _invalidate(self, tid, oids):
with self._lock:
self._ltid = tid
try:
self._invalidations.update(oids)
except AttributeError:
Expand All @@ -128,8 +142,8 @@ def sync(self, force=True):
self._sync()

def poll_invalidations(self):
self._start = p64(u64(self._storage.lastTransaction()) + 1)
with self._lock:
self._start = p64(u64(self._ltid) + 1)
if self._invalidations is None:
self._invalidations = set()
return None
Expand Down Expand Up @@ -175,7 +189,8 @@ def tpc_finish(self, transaction, func = lambda tid: None):
self._modified = None

def invalidate_finish(tid):
self._base._invalidate_finish(modified, self)
self._base._invalidate_finish(tid, modified, self)
self._ltid = tid
func(tid)

return self._storage.tpc_finish(transaction, invalidate_finish)
Expand Down Expand Up @@ -260,7 +275,7 @@ def tpc_vote(self, transaction):
def tpc_finish(self, transaction, func = lambda tid: None):

def invalidate_finish(tid):
self._base._invalidate_finish(self._undone, None)
self._base._invalidate_finish(tid, self._undone, None)
func(tid)

self._storage.tpc_finish(transaction, invalidate_finish)
40 changes: 36 additions & 4 deletions src/ZODB/tests/testConnection.py
Expand Up @@ -14,6 +14,7 @@
"""Unit tests for the Connection class."""
from __future__ import print_function

from contextlib import contextmanager
import doctest
import re
import six
Expand Down Expand Up @@ -535,13 +536,13 @@ def doctest_invalidate(self):
>>> mvcc_storage.invalidate(p64(1), {p1._p_oid: 1})
Transaction start times are based on storage's last
transaction. (Previousely, they were based on the first
invalidation seen in a transaction.)
Transaction start times are based on storage's last transaction,
which is known from invalidations. (Previousely, they were
based on the first invalidation seen in a transaction.)
>>> mvcc_instance.poll_invalidations() == [p1._p_oid]
True
>>> mvcc_instance._start == p64(u64(db.storage.lastTransaction()) + 1)
>>> mvcc_instance._start == p64(2)
True
>>> mvcc_storage.invalidate(p64(10), {p2._p_oid: 1, p64(76): 1})
Expand Down Expand Up @@ -570,6 +571,36 @@ def doctest_invalidate(self):
>>> db.close()
"""

def test_mvccadapterNewTransactionVsInvalidations(self):
"""
Check that polled invalidations are consistent with the TID at which
the transaction operates. Otherwise, it's like we miss invalidations.
"""
db = databaseFromString("<zodb>\n<mappingstorage/>\n</zodb>")
try:
t1 = transaction.TransactionManager()
c1 = db.open(t1)
r1 = c1.root()
r1['a'] = 1
t1.commit()
t2 = transaction.TransactionManager()
c2 = db.open(t2)
c2.root()['b'] = 1
s1 = c1._storage
l1 = s1._lock
@contextmanager
def beforeLock1():
s1._lock = l1
t2.commit()
with l1:
yield
s1._lock = beforeLock1()
t1.begin()
self.assertIs(s1._lock, l1)
self.assertIn('b', r1)
finally:
db.close()

def doctest_invalidateCache():
"""The invalidateCache method invalidates a connection's cache.
Expand Down Expand Up @@ -1395,4 +1426,5 @@ def test_suite():
s.addTest(doctest.DocTestSuite(checker=checker))
s.addTest(unittest.makeSuite(TestConnection))
s.addTest(unittest.makeSuite(EstimatedSizeTests))
s.addTest(unittest.makeSuite(InvalidationTests))
return s
13 changes: 7 additions & 6 deletions src/ZODB/tests/testmvcc.py
Expand Up @@ -85,13 +85,14 @@
>>> cn = db.open()
>>> cn._storage._start == p64(u64(st.lastTransaction()) + 1)
>>> ltid = u64(st.lastTransaction())
>>> cn._storage._start == p64(ltid + 1)
True
>>> cn.db()._mvcc_storage.invalidate(100, dict.fromkeys([1, 2]))
>>> cn._storage._start == p64(u64(st.lastTransaction()) + 1)
>>> cn.db()._mvcc_storage.invalidate(p64(ltid+100), dict.fromkeys([1, 2]))
>>> cn._storage._start == p64(ltid + 1)
True
>>> cn.db()._mvcc_storage.invalidate(200, dict.fromkeys([1, 2]))
>>> cn._storage._start == p64(u64(st.lastTransaction()) + 1)
>>> cn.db()._mvcc_storage.invalidate(p64(ltid+200), dict.fromkeys([1, 2]))
>>> cn._storage._start == p64(ltid + 1)
True
A connection's high-water mark is set to the transaction id taken from
Expand All @@ -105,7 +106,7 @@
a transaction and process invalidations.
>>> cn.sync()
>>> cn._storage._start == p64(u64(st.lastTransaction()) + 1)
>>> cn._storage._start == p64(ltid + 201)
True
Basic functionality
Expand Down

0 comments on commit eb64a2b

Please sign in to comment.