Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
jmuchemb committed Dec 13, 2019
1 parent 8e97bd7 commit ac03883
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 22 deletions.
8 changes: 4 additions & 4 deletions src/ZODB/FileStorage/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,15 @@ def pack_with_repeated_blob_records():
fixed by the time you read this, but there might still be
transactions in the wild that have duplicate records.
>>> fs = ZODB.FileStorage.FileStorage('t', blob_dir='bobs')
>>> db = ZODB.DB(fs)
>>> db = ZODB.DB(ZODB.FileStorage.FileStorage('t', blob_dir='bobs'))
>>> conn = db.open()
>>> conn.root()[1] = ZODB.blob.Blob()
>>> transaction.commit()
>>> tm = transaction.TransactionManager()
>>> oid = conn.root()[1]._p_oid
>>> from ZODB.utils import load_current
>>> blob_record, oldserial = load_current(fs, oid)
>>> fs = db._mvcc_storage.new_instance()
>>> _ = fs.poll_invalidations()
>>> blob_record, oldserial = fs.load(oid)
Now, create a transaction with multiple saves:
Expand Down
31 changes: 23 additions & 8 deletions src/ZODB/mvccadapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def new_instance(self):
instance = MVCCAdapterInstance(self)
with self._lock:
self._instances.add(instance)
instance._lastTransaction()
return instance

def before_instance(self, before=None):
Expand Down Expand Up @@ -77,13 +78,13 @@ def invalidateCache(self):
def invalidate(self, transaction_id, oids):
with self._lock:
for instance in self._instances:
instance._invalidate(oids)
instance._invalidate(transaction_id, oids)

def _invalidate_finish(self, oids, committing_instance):
def _invalidate_finish(self, tid, oids, committing_instance):
with self._lock:
for instance in self._instances:
if instance is not committing_instance:
instance._invalidate(oids)
instance._invalidate(tid, oids)

references = serialize.referencesf
transform_record_data = untransform_record_data = lambda self, data: data
Expand All @@ -98,14 +99,26 @@ class MVCCAdapterInstance(Base):
'checkCurrentSerialInTransaction', 'tpc_abort',
)

_start = None # Transaction start time
_ltid = None # Last storage transaction id

def __init__(self, base):
self._base = base
Base.__init__(self, base._storage)
self._lock = Lock()
self._invalidations = set()
self._start = None # Transaction start time
self._sync = getattr(self._storage, 'sync', lambda : None)

def _lastTransaction(self):
ltid = self._storage.lastTransaction()
# At this precise moment, a transaction may be
# committed and we have already received the new tid.
with self._lock:
# So make sure we won't override with a smaller value.
if self._ltid is None:
# Calling lastTransaction() here could result in a deadlock.
self._ltid = ltid

def release(self):
self._base._release(self)

Expand All @@ -115,8 +128,9 @@ def _invalidateCache(self):
with self._lock:
self._invalidations = None

def _invalidate(self, oids):
def _invalidate(self, tid, oids):
with self._lock:
self._ltid = tid
try:
self._invalidations.update(oids)
except AttributeError:
Expand All @@ -128,8 +142,8 @@ def sync(self, force=True):
self._sync()

def poll_invalidations(self):
self._start = p64(u64(self._storage.lastTransaction()) + 1)
with self._lock:
self._start = p64(u64(self._ltid) + 1)
if self._invalidations is None:
self._invalidations = set()
return None
Expand Down Expand Up @@ -175,7 +189,8 @@ def tpc_finish(self, transaction, func = lambda tid: None):
self._modified = None

def invalidate_finish(tid):
self._base._invalidate_finish(modified, self)
self._base._invalidate_finish(tid, modified, self)
self._ltid = tid
func(tid)

return self._storage.tpc_finish(transaction, invalidate_finish)
Expand Down Expand Up @@ -260,7 +275,7 @@ def tpc_vote(self, transaction):
def tpc_finish(self, transaction, func = lambda tid: None):

def invalidate_finish(tid):
self._base._invalidate_finish(self._undone, None)
self._base._invalidate_finish(tid, self._undone, None)
func(tid)

self._storage.tpc_finish(transaction, invalidate_finish)
8 changes: 4 additions & 4 deletions src/ZODB/tests/testConnection.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,13 +535,13 @@ def doctest_invalidate(self):
>>> mvcc_storage.invalidate(p64(1), {p1._p_oid: 1})
Transaction start times are based on storage's last
transaction. (Previousely, they were based on the first
invalidation seen in a transaction.)
Transaction start times are based on storage's last transaction,
which is known from invalidations. (Previousely, they were
based on the first invalidation seen in a transaction.)
>>> mvcc_instance.poll_invalidations() == [p1._p_oid]
True
>>> mvcc_instance._start == p64(u64(db.storage.lastTransaction()) + 1)
>>> mvcc_instance._start == p64(2)
True
>>> mvcc_storage.invalidate(p64(10), {p2._p_oid: 1, p64(76): 1})
Expand Down
13 changes: 7 additions & 6 deletions src/ZODB/tests/testmvcc.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,14 @@
>>> cn = db.open()
>>> cn._storage._start == p64(u64(st.lastTransaction()) + 1)
>>> ltid = u64(st.lastTransaction())
>>> cn._storage._start == p64(ltid + 1)
True
>>> cn.db()._mvcc_storage.invalidate(100, dict.fromkeys([1, 2]))
>>> cn._storage._start == p64(u64(st.lastTransaction()) + 1)
>>> cn.db()._mvcc_storage.invalidate(p64(ltid+100), dict.fromkeys([1, 2]))
>>> cn._storage._start == p64(ltid + 1)
True
>>> cn.db()._mvcc_storage.invalidate(200, dict.fromkeys([1, 2]))
>>> cn._storage._start == p64(u64(st.lastTransaction()) + 1)
>>> cn.db()._mvcc_storage.invalidate(p64(ltid+200), dict.fromkeys([1, 2]))
>>> cn._storage._start == p64(ltid + 1)
True
A connection's high-water mark is set to the transaction id taken from
Expand All @@ -105,7 +106,7 @@
a transaction and process invalidations.
>>> cn.sync()
>>> cn._storage._start == p64(u64(st.lastTransaction()) + 1)
>>> cn._storage._start == p64(ltid + 201)
True
Basic functionality
Expand Down

0 comments on commit ac03883

Please sign in to comment.