Skip to content

Commit

Permalink
provide IMultiCommitStorage
Browse files Browse the repository at this point in the history
  • Loading branch information
Jim Fulton committed Jul 2, 2016
1 parent 2529c25 commit 8b4689d
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 42 deletions.
15 changes: 8 additions & 7 deletions src/ZEO/ClientStorage.py
Expand Up @@ -53,10 +53,7 @@
# max signed 64-bit value ~ infinity :) Signed cuz LBTree and TimeStamp
m64 = b'\x7f\xff\xff\xff\xff\xff\xff\xff'

try:
from ZODB.ConflictResolution import ResolvedSerial
except ImportError:
ResolvedSerial = 'rs'
from ZODB.ConflictResolution import ResolvedSerial

def tid2time(tid):
return str(TimeStamp(tid))
Expand All @@ -77,6 +74,7 @@ def get_timestamp(prev_ts=None):

MB = 1024**2

@zope.interface.implementer(ZODB.interfaces.IMultiCommitStorage)
class ClientStorage(object):
"""A storage class that is a network client to a remote storage.
Expand Down Expand Up @@ -726,7 +724,8 @@ def tpc_vote(self, txn):
"""
tbuf = self._check_trans(txn, 'tpc_vote')
try:
self._call('vote', id(txn))
for oid in self._call('vote', id(txn)) or ():
tbuf.serial(oid, ResolvedSerial)
except POSException.ConflictError as err:
oid = getattr(err, 'oid', None)
if oid is not None:
Expand All @@ -743,8 +742,8 @@ def tpc_vote(self, txn):
if tbuf.exception:
raise tbuf.exception

if tbuf.serials:
return list(tbuf.serials.items())
if tbuf.resolved:
return list(tbuf.resolved)
else:
return None

Expand Down Expand Up @@ -839,6 +838,8 @@ def tpc_finish(self, txn, f=lambda tid: None):

self._update_blob_cache(tbuf, tid)

return tid

def _update_blob_cache(self, tbuf, tid):
"""Internal helper move blobs updated by a transaction to the cache.
"""
Expand Down
30 changes: 16 additions & 14 deletions src/ZEO/StorageServer.py
Expand Up @@ -427,17 +427,18 @@ def _try_to_vote(self, delay=None):
for op, args in self.txnlog:
getattr(self, op)(*args)


# Blob support
while self.blob_log:
oid, oldserial, data, blobfilename = self.blob_log.pop()
self._store(oid, oldserial, data, blobfilename)

serials = self.storage.tpc_vote(self.transaction)
if serials:
self.serials.extend(serials)
if not isinstance(serials[0], bytes):
serials = (oid for (oid, serial) in serials
if serial == ResolvedSerial)

self.connection.async('serialnos', self.serials)
self.serials.extend(serials)

except Exception as err:
self.storage.tpc_abort(self.transaction)
Expand All @@ -455,9 +456,9 @@ def _try_to_vote(self, delay=None):
raise
else:
if delay is not None:
delay.reply(None)
delay.reply(self.serials)
else:
return None
return self.serials

else:
return delay
Expand Down Expand Up @@ -567,17 +568,18 @@ def _store(self, oid, serial, data, blobfile=None):
if serial != b"\0\0\0\0\0\0\0\0":
self.invalidated.append(oid)

if isinstance(newserial, bytes):
newserial = [(oid, newserial)]
if newserial:

for oid, s in newserial or ():
if isinstance(newserial, bytes):
newserial = [(oid, newserial)]

if s == ResolvedSerial:
self.stats.conflicts_resolved += 1
self.log("conflict resolved oid=%s"
% oid_repr(oid), BLATHER)
for oid, s in newserial:

self.serials.append((oid, s))
if s == ResolvedSerial:
self.stats.conflicts_resolved += 1
self.log("conflict resolved oid=%s"
% oid_repr(oid), BLATHER)
self.serials.append(oid)

def _restore(self, oid, serial, data, prev_txn):
self.storage.restore(oid, serial, data, '', prev_txn,
Expand All @@ -586,7 +588,7 @@ def _restore(self, oid, serial, data, prev_txn):
def _undo(self, trans_id):
tid, oids = self.storage.undo(trans_id, self.transaction)
self.invalidated.extend(oids)
self.serials.extend((oid, ResolvedSerial) for oid in oids)
self.serials.extend(oids)

# IStorageIteration support

Expand Down
19 changes: 9 additions & 10 deletions src/ZEO/TransactionBuffer.py
Expand Up @@ -46,7 +46,7 @@ def __init__(self, connection_generation):
# stored are builtin types -- strings or None.
self.pickler = Pickler(self.file, 1)
self.pickler.fast = 1
self.serials = {} # processed { oid -> serial }
self.resolved = set() # {oid}
self.exception = None

def close(self):
Expand All @@ -61,18 +61,17 @@ def store(self, oid, data):

def serial(self, oid, serial):
if isinstance(serial, Exception):
self.exception = serial
self.serials[oid] = None
else:
self.serials[oid] = serial
self.exception = serial # This transaction will never be committed
elif serial == ResolvedSerial:
self.resolved.add(oid)

def storeBlob(self, oid, blobfilename):
self.blobs.append((oid, blobfilename))

def __iter__(self):
self.file.seek(0)
unpickler = Unpickler(self.file)
serials = self.serials
resolved = self.resolved

# Gaaaa, this is awkward. There can be entries in serials that
# aren't in the buffer, because undo. Entries can be repeated
Expand All @@ -83,9 +82,9 @@ def __iter__(self):
for i in range(self.count):
oid, data = unpickler.load()
seen.add(oid)
yield oid, data, serials.get(oid) == ResolvedSerial
yield oid, data, oid in resolved

# We may have leftover serials because undo
for oid, serial in serials.items():
# We may have leftover oids because undo
for oid in resolved:
if oid not in seen:
yield oid, None, serial == ResolvedSerial
yield oid, None, True
15 changes: 7 additions & 8 deletions src/ZEO/tests/testZEO.py
Expand Up @@ -731,24 +731,23 @@ def tpc_begin(self, transaction):
self.server.tpc_begin(id(transaction), '', '', {}, None, ' ')

def tpc_vote(self, transaction):
vote_result = self.server.vote(id(transaction))
assert vote_result is None
result = self.server.connection.serials[:]
result = self.server.vote(id(transaction))
assert result == self.server.connection.serials[:]
del self.server.connection.serials[:]
return result

def store(self, oid, serial, data, version_ignored, transaction):
self.server.storea(oid, serial, data, id(transaction))

def send_reply(self, *args): # Masquerade as conn
pass
def send_reply(self, _, result): # Masquerade as conn
self._result = result

def tpc_abort(self, transaction):
self.server.tpc_abort(id(transaction))

def tpc_finish(self, transaction, func = lambda: None):
self.server.tpc_finish(id(transaction)).set_sender(0, self)

return self._result

def multiple_storages_invalidation_queue_is_not_insane():
"""
Expand Down Expand Up @@ -915,14 +914,14 @@ def tpc_finish_error():
buffer, sadly, using implementation details:
>>> tbuf = t.data(client)
>>> tbuf.serials = None
>>> tbuf.resolved = None
tpc_finish will fail:
>>> client.tpc_finish(t) # doctest: +ELLIPSIS
Traceback (most recent call last):
...
AttributeError: ...
TypeError: ...
>>> client.tpc_abort(t)
>>> t.abort()
Expand Down
6 changes: 3 additions & 3 deletions src/ZEO/tests/testZEO2.py
Expand Up @@ -229,7 +229,7 @@ def some_basic_locking_tests():
ZEO.asyncio.server INFO
received handshake b'Z5'
>>> tid1 = start_trans(zs1)
>>> zs1.vote(tid1) # doctest: +ELLIPSIS
>>> resolved1 = zs1.vote(tid1) # doctest: +ELLIPSIS
ZEO.StorageServer DEBUG
(test-addr-1) ('1') lock: transactions waiting: 0
ZEO.StorageServer BLATHER
Expand Down Expand Up @@ -486,7 +486,7 @@ def lock_sanity_check():
ZEO.asyncio.server INFO
received handshake b'Z5'
>>> tid1 = start_trans(zs1)
>>> zs1.vote(tid1) # doctest: +ELLIPSIS
>>> resolved1 = zs1.vote(tid1) # doctest: +ELLIPSIS
ZEO.StorageServer DEBUG
(test-addr-1) ('1') lock: transactions waiting: 0
ZEO.StorageServer BLATHER
Expand All @@ -502,7 +502,7 @@ def lock_sanity_check():
ZEO.asyncio.server INFO
received handshake b'Z5'
>>> tid2 = start_trans(zs2)
>>> zs2.vote(tid2) # doctest: +ELLIPSIS
>>> resolved2 = zs2.vote(tid2) # doctest: +ELLIPSIS
ZEO.StorageServer DEBUG
(test-addr-2) ('1') lock: transactions waiting: 0
ZEO.StorageServer BLATHER
Expand Down

0 comments on commit 8b4689d

Please sign in to comment.