Skip to content

Commit

Permalink
Merge pull request #38 from zopefoundation/client-side-conflict-resol…
Browse files Browse the repository at this point in the history
…ution

Client side conflict resolution
  • Loading branch information
jimfulton committed Jul 7, 2016
2 parents bff1a14 + 10d7f9f commit e5653d4
Show file tree
Hide file tree
Showing 12 changed files with 374 additions and 75 deletions.
32 changes: 27 additions & 5 deletions src/ZEO/ClientStorage.py
Expand Up @@ -34,6 +34,7 @@
import zc.lockfile
import ZODB
import ZODB.BaseStorage
import ZODB.ConflictResolution
import ZODB.interfaces
import zope.interface
import six
Expand Down Expand Up @@ -75,7 +76,7 @@ def get_timestamp(prev_ts=None):
MB = 1024**2

@zope.interface.implementer(ZODB.interfaces.IMultiCommitStorage)
class ClientStorage(object):
class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage):
"""A storage class that is a network client to a remote storage.
This is a faithful implementation of the Storage API.
Expand Down Expand Up @@ -331,6 +332,7 @@ def registerDB(self, db):
The storage isn't really ready to use until after this call.
"""
super(ClientStorage, self).registerDB(db)
self._db = db

def is_connected(self, test=False):
Expand Down Expand Up @@ -722,13 +724,33 @@ def tpc_vote(self, txn):
"""
tbuf = self._check_trans(txn, 'tpc_vote')
try:
for oid in self._call('vote', id(txn)) or ():
tbuf.serial(oid, ResolvedSerial)

conflicts = True
vote_attempts = 0
while conflicts and vote_attempts < 9: # 9? Mainly avoid inf. loop
conflicts = False
for oid in self._call('vote', id(txn)) or ():
if isinstance(oid, dict):
# Conflict, let's try to resolve it
conflicts = True
conflict = oid
oid = conflict['oid']
committed, read = conflict['serials']
data = self.tryToResolveConflict(
oid, committed, read, conflict['data'])
self._async('storea', oid, committed, data, id(txn))
tbuf.resolve(oid, data)
else:
tbuf.serial(oid, ResolvedSerial)

vote_attempts += 1

except POSException.StorageTransactionError:
# Hm, we got disconnected and reconnected bwtween
# _check_trans and voting. Let's chack the transaction again:
self._check_trans(txn, 'tpc_vote')
raise

except POSException.ConflictError as err:
oid = getattr(err, 'oid', None)
if oid is not None:
Expand All @@ -745,8 +767,8 @@ def tpc_vote(self, txn):
if tbuf.exception:
raise tbuf.exception

if tbuf.resolved:
return list(tbuf.resolved)
if tbuf.server_resolved or tbuf.client_resolved:
return list(tbuf.server_resolved) + list(tbuf.client_resolved)
else:
return None

Expand Down
99 changes: 64 additions & 35 deletions src/ZEO/StorageServer.py
Expand Up @@ -89,6 +89,7 @@ class ZEOStorage:

def __init__(self, server, read_only=0):
self.server = server
self.client_conflict_resolution = server.client_conflict_resolution
# timeout and stats will be initialized in register()
self.read_only = read_only
# The authentication protocol may define extra methods.
Expand Down Expand Up @@ -334,6 +335,7 @@ def tpc_begin(self, id, user, description, ext, tid=None, status=" "):
t._extension = ext

self.serials = []
self.conflicts = {}
self.invalidated = []
self.txnlog = CommitLog()
self.blob_log = []
Expand Down Expand Up @@ -413,6 +415,7 @@ def _try_to_vote(self, delay=None):

self.locked, delay = self.server.lock_storage(self, delay)
if self.locked:
result = None
try:
self.log(
"Preparing to commit transaction: %d objects, %d bytes"
Expand All @@ -433,13 +436,29 @@ def _try_to_vote(self, delay=None):
oid, oldserial, data, blobfilename = self.blob_log.pop()
self._store(oid, oldserial, data, blobfilename)

serials = self.storage.tpc_vote(self.transaction)
if serials:
if not isinstance(serials[0], bytes):
serials = (oid for (oid, serial) in serials
if serial == ResolvedSerial)

self.serials.extend(serials)
if not self.conflicts:
try:
serials = self.storage.tpc_vote(self.transaction)
except ConflictError as err:
if (self.client_conflict_resolution and
err.oid and err.serials and err.data
):
self.conflicts[err.oid] = dict(
oid=err.oid, serials=err.serials, data=err.data)
else:
raise
else:
if serials:
self.serials.extend(serials)
result = self.serials

if self.conflicts:
result = list(self.conflicts.values())
self.storage.tpc_abort(self.transaction)
self.server.unlock_storage(self)
self.locked = False
self.server.stop_waiting(self)

except Exception as err:
self.storage.tpc_abort(self.transaction)
Expand All @@ -457,9 +476,9 @@ def _try_to_vote(self, delay=None):
raise
else:
if delay is not None:
delay.reply(self.serials)
delay.reply(result)
else:
return self.serials
return result

else:
return delay
Expand Down Expand Up @@ -559,28 +578,24 @@ def _checkread(self, oid, serial):
oid, serial, self.transaction)

def _store(self, oid, serial, data, blobfile=None):
if blobfile is None:
newserial = self.storage.store(
oid, serial, data, '', self.transaction)
try:
if blobfile is None:
self.storage.store(oid, serial, data, '', self.transaction)
else:
self.storage.storeBlob(
oid, serial, data, blobfile, '', self.transaction)
except ConflictError as err:
if self.client_conflict_resolution and err.serials:
self.conflicts[oid] = dict(
oid=oid, serials=err.serials, data=data)
else:
raise
else:
newserial = self.storage.storeBlob(
oid, serial, data, blobfile, '', self.transaction)

if serial != b"\0\0\0\0\0\0\0\0":
self.invalidated.append(oid)

if newserial:
if oid in self.conflicts:
del self.conflicts[oid]

if isinstance(newserial, bytes):
newserial = [(oid, newserial)]

for oid, s in newserial:

if s == ResolvedSerial:
self.stats.conflicts_resolved += 1
self.log("conflict resolved oid=%s"
% oid_repr(oid), BLATHER)
self.serials.append(oid)
if serial != b"\0\0\0\0\0\0\0\0":
self.invalidated.append(oid)

def _restore(self, oid, serial, data, prev_txn):
self.storage.restore(oid, serial, data, '', prev_txn,
Expand Down Expand Up @@ -697,6 +712,7 @@ def __init__(self, addr, storages,
invalidation_age=None,
transaction_timeout=None,
ssl=None,
client_conflict_resolution=False,
):
"""StorageServer constructor.
Expand Down Expand Up @@ -767,15 +783,23 @@ def __init__(self, addr, storages,
for name, storage in storages.items():
self._setup_invq(name, storage)
storage.registerDB(StorageServerDB(self, name))
if client_conflict_resolution:
# XXX this may go away later, when storages grow
# configuration for this.
storage.tryToResolveConflict = never_resolve_conflict
self.invalidation_age = invalidation_age
self.zeo_storages_by_storage_id = {} # {storage_id -> [ZEOStorage]}
self.acceptor = Acceptor(self, addr, ssl)
if isinstance(addr, tuple) and addr[0]:
self.addr = self.acceptor.addr
else:
self.addr = addr
self.loop = self.acceptor.loop
ZODB.event.notify(Serving(self, address=self.acceptor.addr))
self.client_conflict_resolution = client_conflict_resolution

if addr is not None:
self.acceptor = Acceptor(self, addr, ssl)
if isinstance(addr, tuple) and addr[0]:
self.addr = self.acceptor.addr
else:
self.addr = addr
self.loop = self.acceptor.loop
ZODB.event.notify(Serving(self, address=self.acceptor.addr))

self.stats = {}
self.timeouts = {}
for name in self.storages.keys():
Expand Down Expand Up @@ -1308,3 +1332,8 @@ class Serving(ServerEvent):

class Closed(ServerEvent):
pass

def never_resolve_conflict(oid, committedSerial, oldSerial, newpickle,
committedData=b''):
raise ConflictError(oid=oid, serials=(committedSerial, oldSerial),
data=newpickle)
21 changes: 15 additions & 6 deletions src/ZEO/TransactionBuffer.py
Expand Up @@ -46,7 +46,8 @@ def __init__(self, connection_generation):
# stored are builtin types -- strings or None.
self.pickler = Pickler(self.file, 1)
self.pickler.fast = 1
self.resolved = set() # {oid}
self.server_resolved = set() # {oid}
self.client_resolved = {} # {oid -> buffer_record_number}
self.exception = None

def close(self):
Expand All @@ -59,19 +60,26 @@ def store(self, oid, data):
# Estimate per-record cache size
self.size = self.size + (data and len(data) or 0) + 31

def resolve(self, oid, data):
"""Record client-resolved data
"""
self.store(oid, data)
self.client_resolved[oid] = self.count - 1

def serial(self, oid, serial):
if isinstance(serial, Exception):
self.exception = serial # This transaction will never be committed
elif serial == ResolvedSerial:
self.resolved.add(oid)
self.server_resolved.add(oid)

def storeBlob(self, oid, blobfilename):
self.blobs.append((oid, blobfilename))

def __iter__(self):
self.file.seek(0)
unpickler = Unpickler(self.file)
resolved = self.resolved
server_resolved = self.server_resolved
client_resolved = self.client_resolved

# Gaaaa, this is awkward. There can be entries in serials that
# aren't in the buffer, because undo. Entries can be repeated
Expand All @@ -81,10 +89,11 @@ def __iter__(self):
seen = set()
for i in range(self.count):
oid, data = unpickler.load()
seen.add(oid)
yield oid, data, oid in resolved
if client_resolved.get(oid, i) == i:
seen.add(oid)
yield oid, data, oid in server_resolved

# We may have leftover oids because undo
for oid in resolved:
for oid in server_resolved:
if oid not in seen:
yield oid, None, True
4 changes: 4 additions & 0 deletions src/ZEO/runzeo.py
Expand Up @@ -98,6 +98,9 @@ def add_zeo_options(self):
self.add("address", "zeo.address.address",
required="no server address specified; use -a or -C")
self.add("read_only", "zeo.read_only", default=0)
self.add("client_conflict_resolution",
"zeo.client_conflict_resolution",
default=0)
self.add("invalidation_queue_size", "zeo.invalidation_queue_size",
default=100)
self.add("invalidation_age", "zeo.invalidation_age")
Expand Down Expand Up @@ -339,6 +342,7 @@ def create_server(storages, options):
options.address,
storages,
read_only = options.read_only,
client_conflict_resolution=options.client_conflict_resolution,
invalidation_queue_size = options.invalidation_queue_size,
invalidation_age = options.invalidation_age,
transaction_timeout = options.transaction_timeout,
Expand Down
8 changes: 8 additions & 0 deletions src/ZEO/server.xml
Expand Up @@ -107,6 +107,14 @@
<metadefault>$INSTANCE/var/ZEO.pid (or $clienthome/ZEO.pid)</metadefault>
</key>

<key name="client-conflict-resolution" datatype="boolean"
required="no" default="false">
<description>
Flag indicating whether the server should return conflict
errors to the client, for resolution there.
</description>
</key>

</sectiontype>

</component>
2 changes: 2 additions & 0 deletions src/ZEO/tests/CommitLockTests.py
Expand Up @@ -30,6 +30,8 @@ class DummyDB:
def invalidate(self, *args, **kwargs):
pass

transform_record_data = untransform_record_data = lambda self, data: data

class WorkerThread(TestThread):

# run the entire test in a thread so that the blocking call for
Expand Down
3 changes: 3 additions & 0 deletions src/ZEO/tests/ConnectionTests.py
Expand Up @@ -59,6 +59,9 @@ def invalidate(self, *args, **kwargs):
def invalidateCache(self):
pass

transform_record_data = untransform_record_data = lambda self, data: data


class CommonSetupTearDown(StorageTestBase):
"""Common boilerplate"""

Expand Down
7 changes: 4 additions & 3 deletions src/ZEO/tests/forker.py
Expand Up @@ -33,7 +33,7 @@
class ZEOConfig:
"""Class to generate ZEO configuration file. """

def __init__(self, addr):
def __init__(self, addr, **options):
if isinstance(addr, str):
self.logpath = addr+'.log'
else:
Expand All @@ -42,6 +42,7 @@ def __init__(self, addr):
self.address = addr
self.read_only = None
self.loglevel = 'INFO'
self.__dict__.update(options)

def dump(self, f):
print("<zeo>", file=f)
Expand All @@ -52,7 +53,7 @@ def dump(self, f):
for name in (
'invalidation_queue_size', 'invalidation_age',
'transaction_timeout', 'pid_filename',
'ssl_certificate', 'ssl_key',
'ssl_certificate', 'ssl_key', 'client_conflict_resolution',
):
v = getattr(self, name, None)
if v:
Expand Down Expand Up @@ -159,7 +160,7 @@ def stop_runner(thread, config, qin, qout, stop_timeout=9, pid=None):
# The runner thread didn't stop. If it was a process,
# give it some time to exit
if hasattr(thread, 'pid') and thread.pid:
os.waitpid(thread.pid)
os.waitpid(thread.pid, 0)
else:
# Gaaaa, force gc in hopes of maybe getting the unclosed
# sockets to get GCed
Expand Down
2 changes: 2 additions & 0 deletions src/ZEO/tests/testConversionSupport.py
Expand Up @@ -52,6 +52,8 @@ class FakeServer:
def register_connection(*args):
return None, None

client_conflict_resolution = False

class FakeConnection:
protocol_version = b'Z4'
addr = 'test'
Expand Down

0 comments on commit e5653d4

Please sign in to comment.