Skip to content

Commit

Permalink
Merge pull request #82 from zopefoundation/handle_serial
Browse files Browse the repository at this point in the history
Switch all storages to the new commit protocol

Thanks!
  • Loading branch information
jimfulton committed Jul 6, 2016
2 parents ae956ad + 4d0512f commit 7856af0
Show file tree
Hide file tree
Showing 13 changed files with 62 additions and 136 deletions.
7 changes: 3 additions & 4 deletions CHANGES.rst
Expand Up @@ -10,13 +10,12 @@ See 4.4.2.
5.0.0a3 (2016-07-01)
====================

Added IMultiCommitStorage to directly represent the changes in the 4.4.0
release and to make complient storages introspectable.
See 4.4.1.

5.0.0a2 (2016-07-01)
====================

See the 4.4.x releases.
See 4.4.0.

5.0.0a1 (2016-06-20)
====================
Expand All @@ -38,7 +37,7 @@ Concurrency Control (MVCC) implementation:
==================

Better support of the new commit protocol. This fixes issues with blobs and
undo. See https://github.com/zopefoundation/ZODB/pull/77
undo. See pull requests #77, #80, #83

4.4.1 (2016-07-01)
==================
Expand Down
7 changes: 6 additions & 1 deletion src/ZODB/BaseStorage.py
Expand Up @@ -106,6 +106,9 @@ def __init__(self, name, base=None):
self._oid = z64
else:
self._oid = oid
# In case that conflicts are resolved during store,
# this collects oids to be returned by tpc_vote.
self._resolved = []

def sortKey(self):
"""Return a string that can be used to sort storage instances.
Expand Down Expand Up @@ -205,6 +208,7 @@ def tpc_begin(self, transaction, tid=None, status=' '):
self._ts = TimeStamp(tid)
self._tid = tid

del self._resolved[:]
self._tstatus = status
self._begin(self._tid, user, desc, ext)

Expand All @@ -226,7 +230,7 @@ def tpc_vote(self, transaction):
def _vote(self):
"""Subclasses should redefine this to supply transaction vote actions.
"""
pass
return self._resolved

def tpc_finish(self, transaction, f=None):
# It's important that the storage calls the function we pass
Expand All @@ -249,6 +253,7 @@ def tpc_finish(self, transaction, f=None):
self._ude = None
self._transaction = None
self._commit_lock.release()
return self._tid

def _finish(self, tid, u, d, e):
"""Subclasses should redefine this to supply transaction finish actions
Expand Down
2 changes: 1 addition & 1 deletion src/ZODB/ConflictResolution.py
Expand Up @@ -28,7 +28,7 @@

logger = logging.getLogger('ZODB.ConflictResolution')

ResolvedSerial = b'rs' # deprecated: store/tpc_finish should just use True
ResolvedSerial = b'rs' # deprecated: see IMultiCommitStorage.tpc_vote

class BadClassName(Exception):
pass
Expand Down
7 changes: 2 additions & 5 deletions src/ZODB/Connection.py
Expand Up @@ -589,18 +589,15 @@ def _store_objects(self, writer, transaction):

self._handle_serial(oid, s)

def _handle_serial(self, oid, serial=True, change=True):
def _handle_serial(self, oid, serial=ResolvedSerial, change=True):

# if we write an object, we don't want to check if it was read
# while current. This is a convenient choke point to do this.
self._readCurrent.pop(oid, None)

if not serial:
return
if serial is True:
serial = ResolvedSerial
elif not isinstance(serial, bytes):
raise serial
assert isinstance(serial, bytes), serial
obj = self._cache.get(oid, None)
if obj is None:
return
Expand Down
25 changes: 17 additions & 8 deletions src/ZODB/DemoStorage.py
Expand Up @@ -32,7 +32,7 @@
import ZODB.utils
import zope.interface

from .ConflictResolution import ConflictResolvingStorage, ResolvedSerial
from .ConflictResolution import ConflictResolvingStorage
from .utils import load_current, maxtid

@zope.interface.implementer(
Expand Down Expand Up @@ -73,6 +73,7 @@ def __init__(self, name=None, base=None, changes=None,

self._issued_oids = set()
self._stored_oids = set()
self._resolved = []

self._commit_lock = ZODB.utils.Lock()
self._transaction = None
Expand Down Expand Up @@ -116,7 +117,7 @@ def _copy_methods_from_changes(self, changes):
for meth in (
'_lock',
'getSize', 'isReadOnly',
'sortKey', 'tpc_transaction', 'tpc_vote',
'sortKey', 'tpc_transaction',
):
setattr(self, meth, getattr(changes, meth))

Expand Down Expand Up @@ -309,9 +310,9 @@ def store(self, oid, serial, data, version, transaction):
if old != serial:
rdata = self.tryToResolveConflict(oid, old, serial, data)
self.changes.store(oid, old, rdata, '', transaction)
return ResolvedSerial

return self.changes.store(oid, serial, data, '', transaction)
self._resolved.append(oid)
else:
self.changes.store(oid, serial, data, '', transaction)

def storeBlob(self, oid, oldserial, data, blobfilename, version,
transaction):
Expand All @@ -324,11 +325,11 @@ def storeBlob(self, oid, oldserial, data, blobfilename, version,
self._stored_oids.add(oid)

try:
return self.changes.storeBlob(
self.changes.storeBlob(
oid, oldserial, data, blobfilename, '', transaction)
except AttributeError:
if self._blobify():
return self.changes.storeBlob(
self.changes.storeBlob(
oid, oldserial, data, blobfilename, '', transaction)
raise

Expand Down Expand Up @@ -365,6 +366,13 @@ def tpc_begin(self, transaction, *a, **k):
self.changes.tpc_begin(transaction, *a, **k)
self._transaction = transaction
self._stored_oids = set()
del self._resolved[:]

def tpc_vote(self, *a, **k):
if self.changes.tpc_vote(*a, **k):
raise ZODB.POSException.StorageTransactionError(
"Unexpected resolved conflicts")
return self._resolved

def tpc_finish(self, transaction, func = lambda tid: None):
with self._lock:
Expand All @@ -374,8 +382,9 @@ def tpc_finish(self, transaction, func = lambda tid: None):
self._issued_oids.difference_update(self._stored_oids)
self._stored_oids = set()
self._transaction = None
self.changes.tpc_finish(transaction, func)
tid = self.changes.tpc_finish(transaction, func)
self._commit_lock.release()
return tid

_temporary_blobdirs = {}
def cleanup_temporary_blobdir(
Expand Down
15 changes: 6 additions & 9 deletions src/ZODB/FileStorage/FileStorage.py
Expand Up @@ -40,7 +40,6 @@
from ZODB.BaseStorage import DataRecord as _DataRecord
from ZODB.BaseStorage import TransactionRecord as _TransactionRecord
from ZODB.ConflictResolution import ConflictResolvingStorage
from ZODB.ConflictResolution import ResolvedSerial
from ZODB.FileStorage.format import CorruptedDataError
from ZODB.FileStorage.format import CorruptedError
from ZODB.FileStorage.format import DATA_HDR
Expand Down Expand Up @@ -521,6 +520,7 @@ def store(self, oid, oldserial, data, version, transaction):
if oldserial != committed_tid:
data = self.tryToResolveConflict(oid, committed_tid,
oldserial, data)
self._resolved.append(oid)

pos = self._pos
here = pos + self._tfile.tell() + self._thl
Expand All @@ -535,11 +535,6 @@ def store(self, oid, oldserial, data, version, transaction):
raise FileStorageQuotaError(
"The storage quota has been exceeded.")

if old and oldserial != committed_tid:
return ResolvedSerial
else:
return self._tid

def deleteObject(self, oid, oldserial, transaction):
if self._is_read_only:
raise ReadOnlyError()
Expand Down Expand Up @@ -731,6 +726,7 @@ def tpc_vote(self, transaction):
self._files.flush()
raise
self._nextpos = self._pos + (tl + 8)
return self._resolved

def tpc_finish(self, transaction, f=None):
with self._files.write_lock():
Expand All @@ -739,15 +735,16 @@ def tpc_finish(self, transaction, f=None):
raise StorageTransactionError(
"tpc_finish called with wrong transaction")
try:
tid = self._tid
if f is not None:
f(self._tid)
u, d, e = self._ude
self._finish(self._tid, u, d, e)
f(tid)
self._finish(tid, *self._ude)
self._clear_temp()
finally:
self._ude = None
self._transaction = None
self._commit_lock.release()
return tid

def _finish(self, tid, u, d, e):
# If self._nextpos is 0, then the transaction didn't write any
Expand Down
14 changes: 7 additions & 7 deletions src/ZODB/FileStorage/tests.py
Expand Up @@ -128,10 +128,10 @@ def pack_with_repeated_blob_records():
>>> fs.tpc_begin(trans)
>>> with open('ablob', 'w') as file:
... _ = file.write('some data')
>>> _ = fs.store(oid, oldserial, blob_record, '', trans)
>>> _ = fs.storeBlob(oid, oldserial, blob_record, 'ablob', '', trans)
>>> fs.tpc_vote(trans)
>>> fs.tpc_finish(trans)
>>> fs.store(oid, oldserial, blob_record, '', trans)
>>> fs.storeBlob(oid, oldserial, blob_record, 'ablob', '', trans)
>>> _ = fs.tpc_vote(trans)
>>> _ = fs.tpc_finish(trans)
>>> time.sleep(.01)
>>> db.pack()
Expand All @@ -156,9 +156,9 @@ def _save_index():
>>> oid = 0
>>> for i in range(5000):
... oid += (1<<16)
... _ = fs.store(ZODB.utils.p64(oid), ZODB.utils.z64, b'x', '', t)
>>> fs.tpc_vote(t)
>>> fs.tpc_finish(t)
... fs.store(ZODB.utils.p64(oid), ZODB.utils.z64, b'x', '', t)
>>> _ = fs.tpc_vote(t)
>>> _ = fs.tpc_finish(t)
>>> import sys
>>> old_limit = sys.getrecursionlimit()
Expand Down
3 changes: 1 addition & 2 deletions src/ZODB/MappingStorage.py
Expand Up @@ -247,8 +247,6 @@ def store(self, oid, serial, data, version, transaction):

self._tdata[oid] = data

return self._tid

checkCurrentSerialInTransaction = (
ZODB.BaseStorage.checkCurrentSerialInTransaction)

Expand Down Expand Up @@ -307,6 +305,7 @@ def tpc_finish(self, transaction, func = lambda tid: None):
self._transaction = None
del self._tdata
self._commit_lock.release()
return tid

# ZEO.interfaces.IServeable
@ZODB.utils.locked(opened)
Expand Down
7 changes: 4 additions & 3 deletions src/ZODB/tests/IExternalGC.test
Expand Up @@ -45,9 +45,10 @@ transaction ourselves.
>>> storage.tpc_begin(txn)
>>> storage.deleteObject(oid0, s0, txn)
>>> storage.deleteObject(oid1, s1, txn)
>>> storage.tpc_vote(txn)
>>> storage.tpc_finish(txn)
>>> tid = storage.lastTransaction()
>>> _ = storage.tpc_vote(txn)
>>> tid = storage.tpc_finish(txn)
>>> tid == storage.lastTransaction()
True

Now if we try to load data for the objects, we get a POSKeyError:

Expand Down
4 changes: 1 addition & 3 deletions src/ZODB/tests/StorageTestBase.py
Expand Up @@ -118,9 +118,7 @@ def handle_all_serials(oid, *args):
for arg in args:
if isinstance(arg, bytes):
d[oid] = arg
elif arg is None:
pass
else:
elif arg:
for oid, serial in arg:
if not isinstance(serial, bytes):
raise serial # error from ZEO server
Expand Down
9 changes: 4 additions & 5 deletions src/ZODB/tests/blob_transaction.txt
Expand Up @@ -381,16 +381,15 @@ stored are discarded.
>>> blob_storage.tpc_begin(t)
>>> with open('blobfile', 'wb') as file:
... _ = file.write(b'This data should go away')
>>> s1 = blob_storage.storeBlob(blob._p_oid, oldserial, olddata, 'blobfile',
>>> blob_storage.storeBlob(blob._p_oid, oldserial, olddata, 'blobfile',
... '', t)
>>> new_oid = blob_storage.new_oid()
>>> with open('blobfile2', 'wb') as file:
... _ = file.write(b'This data should go away too')
>>> s2 = blob_storage.storeBlob(new_oid, '\0'*8, olddata, 'blobfile2',
>>> blob_storage.storeBlob(new_oid, '\0'*8, olddata, 'blobfile2',
... '', t)

>>> serials = blob_storage.tpc_vote(t)

>>> bool(blob_storage.tpc_vote(t))
False
>>> blob_storage.tpc_abort(t)

Now, the serial for the existing blob should be the same:
Expand Down
45 changes: 0 additions & 45 deletions src/ZODB/tests/testDemoStorage.py
Expand Up @@ -42,46 +42,6 @@

from zope.testing import renormalizing

# With the following monkey-patch, we can test the different ways
# to update _p_changed/_p_serial status of committed oids.

from ZODB.ConflictResolution import ResolvedSerial

class DemoStorage(ZODB.DemoStorage.DemoStorage):

delayed_store = False

def tpc_begin(self, *args):
super(DemoStorage, self).tpc_begin(*args)
self.__stored = []

def store(self, oid, *args):
s = super(DemoStorage, self).store(oid, *args)
if s != ResolvedSerial:
assert type(s) is bytes, s
return
if not self.delayed_store:
return True
self.__stored.append(oid)

tpc_vote = property(lambda self: self._tpc_vote, lambda *_: None)

def _tpc_vote(self, transaction):
s = self.changes.tpc_vote(transaction)
assert s is None, s
return self.__stored if self.delayed_store else s

def tpc_finish(self, transaction, func = lambda tid: None):
r = []
def callback(tid):
func(tid)
r.append(tid)
tid = super(DemoStorage, self).tpc_finish(transaction, callback)
assert tid is None, tid
return r[0]

ZODB.DemoStorage.DemoStorage = DemoStorage

class DemoStorageTests(
StorageTestBase.StorageTestBase,
BasicStorage.BasicStorage,
Expand Down Expand Up @@ -146,11 +106,6 @@ def base_and_changes():
self._checkHistory(base_and_changes())
self._storage = self._storage.pop()

def checkResolveLate(self):
self._storage.delayed_store = True
self.checkResolve()


class DemoStorageHexTests(DemoStorageTests):

def setUp(self):
Expand Down

0 comments on commit 7856af0

Please sign in to comment.