Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/asyncio' into py2
Browse files Browse the repository at this point in the history
Conflicts:
	setup.py
	src/ZEO/tests/protocols.test
  • Loading branch information
Jim Fulton committed Jul 7, 2016
2 parents 8705602 + 1eb086d commit 6fbe47e
Show file tree
Hide file tree
Showing 13 changed files with 375 additions and 493 deletions.
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -26,7 +26,7 @@
sys.exit(0)

install_requires = [
'ZODB >= 5.0.0a1',
'ZODB >= 5.0.0a5',
'six',
'transaction >= 1.6.0',
'persistent >= 4.1.0',
Expand Down
29 changes: 21 additions & 8 deletions src/ZEO/ClientStorage.py
Expand Up @@ -53,10 +53,7 @@
# max signed 64-bit value ~ infinity :) Signed cuz LBTree and TimeStamp
m64 = b'\x7f\xff\xff\xff\xff\xff\xff\xff'

try:
from ZODB.ConflictResolution import ResolvedSerial
except ImportError:
ResolvedSerial = 'rs'
from ZODB.ConflictResolution import ResolvedSerial

def tid2time(tid):
return str(TimeStamp(tid))
Expand All @@ -77,6 +74,7 @@ def get_timestamp(prev_ts=None):

MB = 1024**2

@zope.interface.implementer(ZODB.interfaces.IMultiCommitStorage)
class ClientStorage(object):
"""A storage class that is a network client to a remote storage.
Expand Down Expand Up @@ -724,18 +722,31 @@ def tpc_vote(self, txn):
"""
tbuf = self._check_trans(txn, 'tpc_vote')
try:
self._call('vote', id(txn))
for oid in self._call('vote', id(txn)) or ():
tbuf.serial(oid, ResolvedSerial)
except POSException.StorageTransactionError:
# Hm, we got disconnected and reconnected bwtween
# _check_trans and voting. Let's chack the transaction again:
tbuf = self._check_trans(txn, 'tpc_vote')
self._check_trans(txn, 'tpc_vote')
raise
except POSException.ConflictError as err:
oid = getattr(err, 'oid', None)
if oid is not None:
# This is a band-aid to help recover from a situation
# that shouldn't happen. A Client somehow misses some
# invalidations and has out of date data in its
# cache. We need some whay to invalidate the cache
# entry without invalidations. So, if we see a
# (unresolved) conflict error, we assume that the
# cache entry is bad and invalidate it.
self._cache.invalidate(oid, None)
raise

if tbuf.exception:
raise tbuf.exception

if tbuf.serials:
return list(tbuf.serials.items())
if tbuf.resolved:
return list(tbuf.resolved)
else:
return None

Expand Down Expand Up @@ -830,6 +841,8 @@ def tpc_finish(self, txn, f=lambda tid: None):

self._update_blob_cache(tbuf, tid)

return tid

def _update_blob_cache(self, tbuf, tid):
"""Internal helper move blobs updated by a transaction to the cache.
"""
Expand Down
157 changes: 39 additions & 118 deletions src/ZEO/StorageServer.py
Expand Up @@ -84,7 +84,7 @@ class ZEOStorage:
blob_tempfile = None
log_label = 'unconnected'
locked = False # Don't have storage lock
verifying = store_failed = 0
verifying = 0

def __init__(self, server, read_only=0):
self.server = server
Expand Down Expand Up @@ -338,7 +338,6 @@ def tpc_begin(self, id, user, description, ext, tid=None, status=" "):
self.blob_log = []
self.tid = tid
self.status = status
self.store_failed = 0
self.stats.active_txns += 1

# Assign the transaction attribute last. This is so we don't
Expand Down Expand Up @@ -426,38 +425,40 @@ def _try_to_vote(self, delay=None):
self.storage.tpc_begin(self.transaction)

for op, args in self.txnlog:
if not getattr(self, op)(*args):
break

getattr(self, op)(*args)

# Blob support
while self.blob_log and not self.store_failed:
while self.blob_log:
oid, oldserial, data, blobfilename = self.blob_log.pop()
self._store(oid, oldserial, data, blobfilename)

if not self.store_failed:
# Only call tpc_vote of no store call failed,
# otherwise the serialnos() call will deliver an
# exception that will be handled by the client in
# its tpc_vote() method.
serials = self.storage.tpc_vote(self.transaction)
if serials:
self.serials.extend(serials)
serials = self.storage.tpc_vote(self.transaction)
if serials:
if not isinstance(serials[0], bytes):
serials = (oid for (oid, serial) in serials
if serial == ResolvedSerial)

self.connection.async('serialnos', self.serials)
self.serials.extend(serials)

except Exception:
except Exception as err:
self.storage.tpc_abort(self.transaction)
self._clear_transaction()

if isinstance(err, ConflictError):
self.stats.conflicts += 1
self.log("conflict error %s" % err, BLATHER)
if not isinstance(err, TransactionError):
logger.exception("While voting")

if delay is not None:
delay.error(sys.exc_info())
else:
raise
else:
if delay is not None:
delay.reply(None)
delay.reply(self.serials)
else:
return None
return self.serials

else:
return delay
Expand Down Expand Up @@ -549,120 +550,45 @@ def undoa(self, trans_id, tid):
self._check_tid(tid, exc=StorageTransactionError)
self.txnlog.undo(trans_id)

def _op_error(self, oid, err, op):
self.store_failed = 1
if isinstance(err, ConflictError):
self.stats.conflicts += 1
self.log("conflict error oid=%s msg=%s" %
(oid_repr(oid), str(err)), BLATHER)
if not isinstance(err, TransactionError):
# Unexpected errors are logged and passed to the client
self.log("%s error: %s, %s" % ((op,)+ sys.exc_info()[:2]),
logging.ERROR, exc_info=True)
err = self._marshal_error(err)
# The exception is reported back as newserial for this oid
self.serials.append((oid, err))

def _delete(self, oid, serial):
err = None
try:
self.storage.deleteObject(oid, serial, self.transaction)
except (SystemExit, KeyboardInterrupt):
raise
except Exception as e:
err = e
self._op_error(oid, err, 'delete')

return err is None
self.storage.deleteObject(oid, serial, self.transaction)

def _checkread(self, oid, serial):
err = None
try:
self.storage.checkCurrentSerialInTransaction(
oid, serial, self.transaction)
except (SystemExit, KeyboardInterrupt):
raise
except Exception as e:
err = e
self._op_error(oid, err, 'checkCurrentSerialInTransaction')

return err is None
self.storage.checkCurrentSerialInTransaction(
oid, serial, self.transaction)

def _store(self, oid, serial, data, blobfile=None):
err = None
try:
if blobfile is None:
newserial = self.storage.store(
oid, serial, data, '', self.transaction)
else:
newserial = self.storage.storeBlob(
oid, serial, data, blobfile, '', self.transaction)
except (SystemExit, KeyboardInterrupt):
raise
except Exception as error:
self._op_error(oid, error, 'store')
err = error
if blobfile is None:
newserial = self.storage.store(
oid, serial, data, '', self.transaction)
else:
if serial != b"\0\0\0\0\0\0\0\0":
self.invalidated.append(oid)
newserial = self.storage.storeBlob(
oid, serial, data, blobfile, '', self.transaction)

if serial != b"\0\0\0\0\0\0\0\0":
self.invalidated.append(oid)

if newserial:

if isinstance(newserial, bytes):
newserial = [(oid, newserial)]

for oid, s in newserial or ():
for oid, s in newserial:

if s == ResolvedSerial:
self.stats.conflicts_resolved += 1
self.log("conflict resolved oid=%s"
% oid_repr(oid), BLATHER)

self.serials.append((oid, s))

return err is None
self.serials.append(oid)

def _restore(self, oid, serial, data, prev_txn):
err = None
try:
self.storage.restore(oid, serial, data, '', prev_txn,
self.transaction)
except (SystemExit, KeyboardInterrupt):
raise
except Exception as err:
self._op_error(oid, err, 'restore')

return err is None
self.storage.restore(oid, serial, data, '', prev_txn,
self.transaction)

def _undo(self, trans_id):
err = None
try:
tid, oids = self.storage.undo(trans_id, self.transaction)
except (SystemExit, KeyboardInterrupt):
raise
except Exception as e:
err = e
self._op_error(z64, err, 'undo')
else:
self.invalidated.extend(oids)
self.serials.extend((oid, ResolvedSerial) for oid in oids)

return err is None

def _marshal_error(self, error):
# Try to pickle the exception. If it can't be pickled,
# the RPC response would fail, so use something that can be pickled.
if PY3:
pickler = Pickler(BytesIO(), 3)
else:
# The pure-python version requires at least one argument (PyPy)
pickler = Pickler(0)
pickler.fast = 1
try:
pickler.dump(error)
except:
msg = "Couldn't pickle storage exception: %s" % repr(error)
self.log(msg, logging.ERROR)
error = StorageServerError(msg)
return error
tid, oids = self.storage.undo(trans_id, self.transaction)
self.invalidated.extend(oids)
self.serials.extend(oids)

# IStorageIteration support

Expand Down Expand Up @@ -1381,8 +1307,3 @@ class Serving(ServerEvent):

class Closed(ServerEvent):
pass

default_cert_authenticate = 'SIGNED'
def ssl_config(section):
from .sslconfig import ssl_config
return ssl_config(section, True)
19 changes: 9 additions & 10 deletions src/ZEO/TransactionBuffer.py
Expand Up @@ -46,7 +46,7 @@ def __init__(self, connection_generation):
# stored are builtin types -- strings or None.
self.pickler = Pickler(self.file, 1)
self.pickler.fast = 1
self.serials = {} # processed { oid -> serial }
self.resolved = set() # {oid}
self.exception = None

def close(self):
Expand All @@ -61,18 +61,17 @@ def store(self, oid, data):

def serial(self, oid, serial):
if isinstance(serial, Exception):
self.exception = serial
self.serials[oid] = None
else:
self.serials[oid] = serial
self.exception = serial # This transaction will never be committed
elif serial == ResolvedSerial:
self.resolved.add(oid)

def storeBlob(self, oid, blobfilename):
self.blobs.append((oid, blobfilename))

def __iter__(self):
self.file.seek(0)
unpickler = Unpickler(self.file)
serials = self.serials
resolved = self.resolved

# Gaaaa, this is awkward. There can be entries in serials that
# aren't in the buffer, because undo. Entries can be repeated
Expand All @@ -83,9 +82,9 @@ def __iter__(self):
for i in range(self.count):
oid, data = unpickler.load()
seen.add(oid)
yield oid, data, serials.get(oid) == ResolvedSerial
yield oid, data, oid in resolved

# We may have leftover serials because undo
for oid, serial in serials.items():
# We may have leftover oids because undo
for oid in resolved:
if oid not in seen:
yield oid, None, serial == ResolvedSerial
yield oid, None, True
5 changes: 2 additions & 3 deletions src/ZEO/asyncio/server.py
Expand Up @@ -23,7 +23,7 @@ class ServerProtocol(base.Protocol):
"""asyncio low-level ZEO server interface
"""

protocols = b'Z4', b'Z5'
protocols = (b'Z5', )

name = 'server protocol'
methods = set(('register', ))
Expand Down Expand Up @@ -169,7 +169,7 @@ def reply(self, obj):

def error(self, exc_info):
self.sent = 'error'
log("Error raised in delayed method", logging.ERROR, exc_info=exc_info)
logger.error("Error raised in delayed method", exc_info=exc_info)
self.protocol.send_error(self.msgid, exc_info[1])

def __repr__(self):
Expand Down Expand Up @@ -206,7 +206,6 @@ def reply(self, obj):

def error(self, exc_info):
self.ready.wait()
log("Error raised in delayed method", logging.ERROR, exc_info=exc_info)
self.protocol.call_soon_threadsafe(Delay.error, self, exc_info)


Expand Down
6 changes: 3 additions & 3 deletions src/ZEO/asyncio/tests.py
Expand Up @@ -757,7 +757,7 @@ def connect(self, finish=False):
self.target = protocol.zeo_storage
if finish:
self.assertEqual(self.pop(parse=False), best_protocol_version)
protocol.data_received(sized(b'Z4'))
protocol.data_received(sized(b'Z5'))
return protocol

message_id = 0
Expand Down Expand Up @@ -795,9 +795,9 @@ def testServerBasics(self):
self.assertEqual(self.pop(parse=False), best_protocol_version)

# The client sends it's protocol:
protocol.data_received(sized(b'Z4'))
protocol.data_received(sized(b'Z5'))

self.assertEqual(protocol.protocol_version, b'Z4')
self.assertEqual(protocol.protocol_version, b'Z5')

protocol.zeo_storage.notify_connected.assert_called_once_with(protocol)

Expand Down

0 comments on commit 6fbe47e

Please sign in to comment.