Skip to content

Commit

Permalink
Allow serial to be returned as late as tpc_finish
Browse files Browse the repository at this point in the history
This makes possible for storage to allocate serial inside tpc_finish,
removing the requirement to serialise 2PC's second phase phase (tpc_vote
to tpc_finish/tpc_abort).

Co-Authored-By: Julien Muchembled <jm@nexedi.com>
  • Loading branch information
vpelletier and jmuchemb committed Jun 17, 2016
1 parent 4905bb8 commit f66cb31
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 22 deletions.
13 changes: 12 additions & 1 deletion src/ZODB/Connection.py
Expand Up @@ -808,7 +808,18 @@ def callback(tid):
# to be able to read any updated data until we've had a chance
# to send an invalidation message to all of the other
# connections!
self._storage.tpc_finish(transaction, callback)
serial = self._storage.tpc_finish(transaction, callback)
# TODO: In the future, it will be required for storages to return the
# committed tid.
if serial is not None:
assert type(serial) is bytes, repr(serial)
for oid_iterator in self._modified, self._creating:
for oid in oid_iterator:
obj = self._cache.get(oid)
# Ignore missing objects and don't update ghosts.
if obj is not None and obj._p_changed is not None:
obj._p_changed = 0
obj._p_serial = serial
self._tpc_cleanup()

def sortKey(self):
Expand Down
8 changes: 6 additions & 2 deletions src/ZODB/interfaces.py
Expand Up @@ -776,6 +776,10 @@ def tpc_finish(transaction, func = lambda tid: None):
called while the storage transaction lock is held. It takes
the new transaction id generated by the transaction.
The return value can be either None or a serial giving new
serial for objects whose ids were passed to previous store calls
in the same transaction, and for which no serial was returned
from either store or tpc_vote for objects passed to store.
"""

def tpc_vote(transaction):
Expand All @@ -794,14 +798,14 @@ def tpc_vote(transaction):
The return value can be either None or a sequence of object-id
and serial pairs giving new serials for objects who's ids were
passed to previous store calls in the same transaction.
After the tpc_vote call, new serials must have been returned,
either from tpc_vote or store for objects passed to store.
A serial returned in a sequence of oid/serial pairs, may be
the special value ZODB.ConflictResolution.ResolvedSerial to
indicate that a conflict occured and that the object should be
invalidated.
After the tpc_vote call, all solved conflicts must have been notified,
either from tpc_vote or store for objects passed to store.
"""


Expand Down
4 changes: 3 additions & 1 deletion src/ZODB/tests/BasicStorage.py
Expand Up @@ -69,8 +69,10 @@ def checkSerialIsNoneForInitialRevision(self):
r1 = self._storage.store(oid, None, zodb_pickle(MinPO(11)),
'', txn)
r2 = self._storage.tpc_vote(txn)
self._storage.tpc_finish(txn)
serial = self._storage.tpc_finish(txn)
newrevid = handle_serials(oid, r1, r2)
if newrevid is None and serial is not None:
newrevid = serial
data, revid = self._storage.load(oid, '')
value = zodb_unpickle(data)
eq(value, MinPO(11))
Expand Down
4 changes: 3 additions & 1 deletion src/ZODB/tests/MTStorage.py
Expand Up @@ -152,10 +152,12 @@ def dostore(self, i):
r2 = self.storage.tpc_vote(t)
self.pause()

self.storage.tpc_finish(t)
serial = self.storage.tpc_finish(t)
self.pause()

revid = handle_serials(oid, r1, r2)
if serial is not None and revid is None:
revid = serial
self.oids[oid] = revid

class ExtStorageClientThread(StorageClientThread):
Expand Down
4 changes: 3 additions & 1 deletion src/ZODB/tests/RevisionStorage.py
Expand Up @@ -150,10 +150,12 @@ def helper(tid, revid, x):
# Finish the transaction
r2 = self._storage.tpc_vote(t)
newrevid = handle_serials(oid, r1, r2)
self._storage.tpc_finish(t)
serial = self._storage.tpc_finish(t)
except:
self._storage.tpc_abort(t)
raise
if serial is not None and newrevid is None:
newrevid = serial
return newrevid
revid1 = helper(1, None, 1)
revid2 = helper(2, revid1, 2)
Expand Down
10 changes: 6 additions & 4 deletions src/ZODB/tests/StorageTestBase.py
Expand Up @@ -132,7 +132,7 @@ def handle_serials(oid, *args):
A helper for function _handle_all_serials().
"""
return handle_all_serials(oid, *args)[oid]
return handle_all_serials(oid, *args).get(oid)

def import_helper(name):
__import__(name)
Expand Down Expand Up @@ -189,7 +189,9 @@ def _dostore(self, oid=None, revid=None, data=None,
# Finish the transaction
r2 = self._storage.tpc_vote(t)
revid = handle_serials(oid, r1, r2)
self._storage.tpc_finish(t)
serial = self._storage.tpc_finish(t)
if serial is not None and revid is None:
revid = serial
except:
self._storage.tpc_abort(t)
raise
Expand All @@ -209,8 +211,8 @@ def _undo(self, tid, expected_oids=None, note=None):
self._storage.tpc_begin(t)
undo_result = self._storage.undo(tid, t)
vote_result = self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
if expected_oids is not None:
serial = self._storage.tpc_finish(t)
if expected_oids is not None and serial is None:
oids = list(undo_result[1]) if undo_result else []
oids.extend(oid for (oid, _) in vote_result or ())
self.assertEqual(len(oids), len(expected_oids), repr(oids))
Expand Down
32 changes: 20 additions & 12 deletions src/ZODB/tests/TransactionalUndoStorage.py
Expand Up @@ -73,6 +73,12 @@ def _transaction_vote(self, trans):
def _transaction_newserial(self, oid):
return self.__serials[oid]

def _transaction_finish(self, t, oid_list):
tid = self._storage.tpc_finish(t)
if tid is not None:
for oid in oid_list:
self.__serials[oid] = tid

def _multi_obj_transaction(self, objs):
newrevs = {}
t = Transaction()
Expand All @@ -82,7 +88,7 @@ def _multi_obj_transaction(self, objs):
self._transaction_store(oid, rev, data, '', t)
newrevs[oid] = None
self._transaction_vote(t)
self._storage.tpc_finish(t)
self._transaction_finish(t, [x[0] for x in objs])
for oid in newrevs.keys():
newrevs[oid] = self._transaction_newserial(oid)
return newrevs
Expand Down Expand Up @@ -219,9 +225,9 @@ def checkTwoObjectUndo(self):
self._transaction_store(oid2, revid2, p51, '', t)
# Finish the transaction
self._transaction_vote(t)
self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
self._storage.tpc_finish(t)
eq(revid1, revid2)
# Update those same two objects
t = Transaction()
Expand All @@ -231,9 +237,9 @@ def checkTwoObjectUndo(self):
self._transaction_store(oid2, revid2, p52, '', t)
# Finish the transaction
self._transaction_vote(t)
self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
self._storage.tpc_finish(t)
eq(revid1, revid2)
# Make sure the objects have the current value
data, revid1 = self._storage.load(oid1, '')
Expand Down Expand Up @@ -289,10 +295,11 @@ def checkTwoObjectUndoAtOnce(self):
tid1 = info[1]['id']
t = Transaction()
oids = self._begin_undos_vote(t, tid, tid1)
self._storage.tpc_finish(t)
serial = self._storage.tpc_finish(t)
# We may get the finalization stuff called an extra time,
# depending on the implementation.
self.assertEqual(set(oids), set((oid1, oid2)))
if serial is None:
self.assertEqual(set(oids), {oid1, oid2})
data, revid1 = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(30))
data, revid2 = self._storage.load(oid2, '')
Expand Down Expand Up @@ -326,7 +333,7 @@ def checkTwoObjectUndoAgain(self):
self._transaction_store(oid2, revid2, p52, '', t)
# Finish the transaction
self._transaction_vote(t)
self._storage.tpc_finish(t)
self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
eq(revid1, revid2)
Expand All @@ -346,7 +353,7 @@ def checkTwoObjectUndoAgain(self):
self._transaction_store(oid2, revid2, p53, '', t)
# Finish the transaction
self._transaction_vote(t)
self._storage.tpc_finish(t)
self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
eq(revid1, revid2)
Expand All @@ -358,10 +365,11 @@ def checkTwoObjectUndoAgain(self):
tid = info[1]['id']
t = Transaction()
oids = self._begin_undos_vote(t, tid)
self._storage.tpc_finish(t)
eq(len(oids), 1)
self.assertTrue(oid1 in oids)
self.assertTrue(not oid2 in oids)
serial = self._storage.tpc_finish(t)
if serial is None:
eq(len(oids), 1)
self.assertTrue(oid1 in oids)
self.assertTrue(not oid2 in oids)
data, revid1 = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(33))
data, revid2 = self._storage.load(oid2, '')
Expand Down Expand Up @@ -398,7 +406,7 @@ def checkNotUndoable(self):
self._transaction_store(oid1, revid1, p81, '', t)
self._transaction_store(oid2, revid2, p91, '', t)
self._transaction_vote(t)
self._storage.tpc_finish(t)
self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
eq(revid1, revid2)
Expand Down
28 changes: 28 additions & 0 deletions src/ZODB/tests/testDemoStorage.py
Expand Up @@ -39,6 +39,34 @@
import ZODB.utils
from zope.testing import renormalizing

# The following monkey-patch makes DemoStorage use the future API
# to update _p_changed/_p_serial status of committed oids.

from ZODB.ConflictResolution import ResolvedSerial

class DemoStorage(ZODB.DemoStorage.DemoStorage):

def store(self, *args):
s = super(DemoStorage, self).store(*args)
if s == ResolvedSerial:
return s

def tpc_vote(self, transaction):
s = super(DemoStorage, self).tpc_vote(transaction)
if s:
return [(oid, serial) for oid, serial in s
if serial == ResolvedSerial]

def tpc_finish(self, transaction, func = lambda tid: None):
r = []
def callback(tid):
func(tid)
r.append(tid)
tid = super(DemoStorage, self).tpc_finish(transaction, callback)
assert tid is None, tid
return r[0]

ZODB.DemoStorage.DemoStorage = DemoStorage

class DemoStorageTests(
StorageTestBase.StorageTestBase,
Expand Down

0 comments on commit f66cb31

Please sign in to comment.