diff --git a/setup.py b/setup.py index 86e3b29c3..ad4320116 100644 --- a/setup.py +++ b/setup.py @@ -26,7 +26,7 @@ sys.exit(0) install_requires = [ - 'ZODB >= 5.0.0a1', + 'ZODB >= 5.0.0a5', 'six', 'transaction >= 1.6.0', 'persistent >= 4.1.0', diff --git a/src/ZEO/ClientStorage.py b/src/ZEO/ClientStorage.py index 292364157..b1b77368b 100644 --- a/src/ZEO/ClientStorage.py +++ b/src/ZEO/ClientStorage.py @@ -53,10 +53,7 @@ # max signed 64-bit value ~ infinity :) Signed cuz LBTree and TimeStamp m64 = b'\x7f\xff\xff\xff\xff\xff\xff\xff' -try: - from ZODB.ConflictResolution import ResolvedSerial -except ImportError: - ResolvedSerial = 'rs' +from ZODB.ConflictResolution import ResolvedSerial def tid2time(tid): return str(TimeStamp(tid)) @@ -77,6 +74,7 @@ def get_timestamp(prev_ts=None): MB = 1024**2 +@zope.interface.implementer(ZODB.interfaces.IMultiCommitStorage) class ClientStorage(object): """A storage class that is a network client to a remote storage. @@ -724,18 +722,31 @@ def tpc_vote(self, txn): """ tbuf = self._check_trans(txn, 'tpc_vote') try: - self._call('vote', id(txn)) + for oid in self._call('vote', id(txn)) or (): + tbuf.serial(oid, ResolvedSerial) except POSException.StorageTransactionError: # Hm, we got disconnected and reconnected bwtween # _check_trans and voting. Let's chack the transaction again: - tbuf = self._check_trans(txn, 'tpc_vote') + self._check_trans(txn, 'tpc_vote') + raise + except POSException.ConflictError as err: + oid = getattr(err, 'oid', None) + if oid is not None: + # This is a band-aid to help recover from a situation + # that shouldn't happen. A Client somehow misses some + # invalidations and has out of date data in its + # cache. We need some whay to invalidate the cache + # entry without invalidations. So, if we see a + # (unresolved) conflict error, we assume that the + # cache entry is bad and invalidate it. + self._cache.invalidate(oid, None) raise if tbuf.exception: raise tbuf.exception - if tbuf.serials: - return list(tbuf.serials.items()) + if tbuf.resolved: + return list(tbuf.resolved) else: return None @@ -830,6 +841,8 @@ def tpc_finish(self, txn, f=lambda tid: None): self._update_blob_cache(tbuf, tid) + return tid + def _update_blob_cache(self, tbuf, tid): """Internal helper move blobs updated by a transaction to the cache. """ diff --git a/src/ZEO/StorageServer.py b/src/ZEO/StorageServer.py index 5578376fb..4c807e0c4 100644 --- a/src/ZEO/StorageServer.py +++ b/src/ZEO/StorageServer.py @@ -84,7 +84,7 @@ class ZEOStorage: blob_tempfile = None log_label = 'unconnected' locked = False # Don't have storage lock - verifying = store_failed = 0 + verifying = 0 def __init__(self, server, read_only=0): self.server = server @@ -338,7 +338,6 @@ def tpc_begin(self, id, user, description, ext, tid=None, status=" "): self.blob_log = [] self.tid = tid self.status = status - self.store_failed = 0 self.stats.active_txns += 1 # Assign the transaction attribute last. This is so we don't @@ -426,38 +425,40 @@ def _try_to_vote(self, delay=None): self.storage.tpc_begin(self.transaction) for op, args in self.txnlog: - if not getattr(self, op)(*args): - break - + getattr(self, op)(*args) # Blob support - while self.blob_log and not self.store_failed: + while self.blob_log: oid, oldserial, data, blobfilename = self.blob_log.pop() self._store(oid, oldserial, data, blobfilename) - if not self.store_failed: - # Only call tpc_vote of no store call failed, - # otherwise the serialnos() call will deliver an - # exception that will be handled by the client in - # its tpc_vote() method. - serials = self.storage.tpc_vote(self.transaction) - if serials: - self.serials.extend(serials) + serials = self.storage.tpc_vote(self.transaction) + if serials: + if not isinstance(serials[0], bytes): + serials = (oid for (oid, serial) in serials + if serial == ResolvedSerial) - self.connection.async('serialnos', self.serials) + self.serials.extend(serials) - except Exception: + except Exception as err: self.storage.tpc_abort(self.transaction) self._clear_transaction() + + if isinstance(err, ConflictError): + self.stats.conflicts += 1 + self.log("conflict error %s" % err, BLATHER) + if not isinstance(err, TransactionError): + logger.exception("While voting") + if delay is not None: delay.error(sys.exc_info()) else: raise else: if delay is not None: - delay.reply(None) + delay.reply(self.serials) else: - return None + return self.serials else: return delay @@ -549,120 +550,45 @@ def undoa(self, trans_id, tid): self._check_tid(tid, exc=StorageTransactionError) self.txnlog.undo(trans_id) - def _op_error(self, oid, err, op): - self.store_failed = 1 - if isinstance(err, ConflictError): - self.stats.conflicts += 1 - self.log("conflict error oid=%s msg=%s" % - (oid_repr(oid), str(err)), BLATHER) - if not isinstance(err, TransactionError): - # Unexpected errors are logged and passed to the client - self.log("%s error: %s, %s" % ((op,)+ sys.exc_info()[:2]), - logging.ERROR, exc_info=True) - err = self._marshal_error(err) - # The exception is reported back as newserial for this oid - self.serials.append((oid, err)) - def _delete(self, oid, serial): - err = None - try: - self.storage.deleteObject(oid, serial, self.transaction) - except (SystemExit, KeyboardInterrupt): - raise - except Exception as e: - err = e - self._op_error(oid, err, 'delete') - - return err is None + self.storage.deleteObject(oid, serial, self.transaction) def _checkread(self, oid, serial): - err = None - try: - self.storage.checkCurrentSerialInTransaction( - oid, serial, self.transaction) - except (SystemExit, KeyboardInterrupt): - raise - except Exception as e: - err = e - self._op_error(oid, err, 'checkCurrentSerialInTransaction') - - return err is None + self.storage.checkCurrentSerialInTransaction( + oid, serial, self.transaction) def _store(self, oid, serial, data, blobfile=None): - err = None - try: - if blobfile is None: - newserial = self.storage.store( - oid, serial, data, '', self.transaction) - else: - newserial = self.storage.storeBlob( - oid, serial, data, blobfile, '', self.transaction) - except (SystemExit, KeyboardInterrupt): - raise - except Exception as error: - self._op_error(oid, error, 'store') - err = error + if blobfile is None: + newserial = self.storage.store( + oid, serial, data, '', self.transaction) else: - if serial != b"\0\0\0\0\0\0\0\0": - self.invalidated.append(oid) + newserial = self.storage.storeBlob( + oid, serial, data, blobfile, '', self.transaction) + + if serial != b"\0\0\0\0\0\0\0\0": + self.invalidated.append(oid) + + if newserial: if isinstance(newserial, bytes): newserial = [(oid, newserial)] - for oid, s in newserial or (): + for oid, s in newserial: if s == ResolvedSerial: self.stats.conflicts_resolved += 1 self.log("conflict resolved oid=%s" % oid_repr(oid), BLATHER) - - self.serials.append((oid, s)) - - return err is None + self.serials.append(oid) def _restore(self, oid, serial, data, prev_txn): - err = None - try: - self.storage.restore(oid, serial, data, '', prev_txn, - self.transaction) - except (SystemExit, KeyboardInterrupt): - raise - except Exception as err: - self._op_error(oid, err, 'restore') - - return err is None + self.storage.restore(oid, serial, data, '', prev_txn, + self.transaction) def _undo(self, trans_id): - err = None - try: - tid, oids = self.storage.undo(trans_id, self.transaction) - except (SystemExit, KeyboardInterrupt): - raise - except Exception as e: - err = e - self._op_error(z64, err, 'undo') - else: - self.invalidated.extend(oids) - self.serials.extend((oid, ResolvedSerial) for oid in oids) - - return err is None - - def _marshal_error(self, error): - # Try to pickle the exception. If it can't be pickled, - # the RPC response would fail, so use something that can be pickled. - if PY3: - pickler = Pickler(BytesIO(), 3) - else: - # The pure-python version requires at least one argument (PyPy) - pickler = Pickler(0) - pickler.fast = 1 - try: - pickler.dump(error) - except: - msg = "Couldn't pickle storage exception: %s" % repr(error) - self.log(msg, logging.ERROR) - error = StorageServerError(msg) - return error + tid, oids = self.storage.undo(trans_id, self.transaction) + self.invalidated.extend(oids) + self.serials.extend(oids) # IStorageIteration support @@ -1381,8 +1307,3 @@ class Serving(ServerEvent): class Closed(ServerEvent): pass - -default_cert_authenticate = 'SIGNED' -def ssl_config(section): - from .sslconfig import ssl_config - return ssl_config(section, True) diff --git a/src/ZEO/TransactionBuffer.py b/src/ZEO/TransactionBuffer.py index bb3b50ec6..50471dfe5 100644 --- a/src/ZEO/TransactionBuffer.py +++ b/src/ZEO/TransactionBuffer.py @@ -46,7 +46,7 @@ def __init__(self, connection_generation): # stored are builtin types -- strings or None. self.pickler = Pickler(self.file, 1) self.pickler.fast = 1 - self.serials = {} # processed { oid -> serial } + self.resolved = set() # {oid} self.exception = None def close(self): @@ -61,10 +61,9 @@ def store(self, oid, data): def serial(self, oid, serial): if isinstance(serial, Exception): - self.exception = serial - self.serials[oid] = None - else: - self.serials[oid] = serial + self.exception = serial # This transaction will never be committed + elif serial == ResolvedSerial: + self.resolved.add(oid) def storeBlob(self, oid, blobfilename): self.blobs.append((oid, blobfilename)) @@ -72,7 +71,7 @@ def storeBlob(self, oid, blobfilename): def __iter__(self): self.file.seek(0) unpickler = Unpickler(self.file) - serials = self.serials + resolved = self.resolved # Gaaaa, this is awkward. There can be entries in serials that # aren't in the buffer, because undo. Entries can be repeated @@ -83,9 +82,9 @@ def __iter__(self): for i in range(self.count): oid, data = unpickler.load() seen.add(oid) - yield oid, data, serials.get(oid) == ResolvedSerial + yield oid, data, oid in resolved - # We may have leftover serials because undo - for oid, serial in serials.items(): + # We may have leftover oids because undo + for oid in resolved: if oid not in seen: - yield oid, None, serial == ResolvedSerial + yield oid, None, True diff --git a/src/ZEO/asyncio/server.py b/src/ZEO/asyncio/server.py index 49f499c50..765344003 100644 --- a/src/ZEO/asyncio/server.py +++ b/src/ZEO/asyncio/server.py @@ -23,7 +23,7 @@ class ServerProtocol(base.Protocol): """asyncio low-level ZEO server interface """ - protocols = b'Z4', b'Z5' + protocols = (b'Z5', ) name = 'server protocol' methods = set(('register', )) @@ -169,7 +169,7 @@ def reply(self, obj): def error(self, exc_info): self.sent = 'error' - log("Error raised in delayed method", logging.ERROR, exc_info=exc_info) + logger.error("Error raised in delayed method", exc_info=exc_info) self.protocol.send_error(self.msgid, exc_info[1]) def __repr__(self): @@ -206,7 +206,6 @@ def reply(self, obj): def error(self, exc_info): self.ready.wait() - log("Error raised in delayed method", logging.ERROR, exc_info=exc_info) self.protocol.call_soon_threadsafe(Delay.error, self, exc_info) diff --git a/src/ZEO/asyncio/tests.py b/src/ZEO/asyncio/tests.py index 8b4ddc980..4165ac389 100644 --- a/src/ZEO/asyncio/tests.py +++ b/src/ZEO/asyncio/tests.py @@ -757,7 +757,7 @@ def connect(self, finish=False): self.target = protocol.zeo_storage if finish: self.assertEqual(self.pop(parse=False), best_protocol_version) - protocol.data_received(sized(b'Z4')) + protocol.data_received(sized(b'Z5')) return protocol message_id = 0 @@ -795,9 +795,9 @@ def testServerBasics(self): self.assertEqual(self.pop(parse=False), best_protocol_version) # The client sends it's protocol: - protocol.data_received(sized(b'Z4')) + protocol.data_received(sized(b'Z5')) - self.assertEqual(protocol.protocol_version, b'Z4') + self.assertEqual(protocol.protocol_version, b'Z5') protocol.zeo_storage.notify_connected.assert_called_once_with(protocol) diff --git a/src/ZEO/cache.py b/src/ZEO/cache.py index 9a5b19721..553a8c24d 100644 --- a/src/ZEO/cache.py +++ b/src/ZEO/cache.py @@ -33,7 +33,7 @@ import ZODB.fsIndex import zc.lockfile -from ZODB.utils import p64, u64, z64 +from ZODB.utils import p64, u64, z64, RLock import six from ._compat import PYPY @@ -182,6 +182,8 @@ def __init__(self, path=None, size=200*1024**2, rearrange=.8): # currentofs. self.currentofs = ZEC_HEADER_SIZE + self._lock = RLock() + # self.f is the open file object. # When we're not reusing an existing file, self.f is left None # here -- the scan() method must be called then to open the file @@ -239,9 +241,10 @@ def fc(self): return self def clear(self): - self.f.seek(ZEC_HEADER_SIZE) - self.f.truncate() - self._initfile(ZEC_HEADER_SIZE) + with self._lock: + self.f.seek(ZEC_HEADER_SIZE) + self.f.truncate() + self._initfile(ZEC_HEADER_SIZE) ## # Scan the current contents of the cache file, calling `install` @@ -451,26 +454,28 @@ def _makeroom(self, nbytes): # new tid must be strictly greater than our current idea of the most # recent tid. def setLastTid(self, tid): - if (not tid) or (tid == z64): - return - if (tid <= self.tid) and self._len: - if tid == self.tid: - return # Be a little forgiving - raise ValueError("new last tid (%s) must be greater than " - "previous one (%s)" - % (u64(tid), u64(self.tid))) - assert isinstance(tid, bytes) and len(tid) == 8, tid - self.tid = tid - self.f.seek(len(magic)) - self.f.write(tid) - self.f.flush() + with self._lock: + if (not tid) or (tid == z64): + return + if (tid <= self.tid) and self._len: + if tid == self.tid: + return # Be a little forgiving + raise ValueError("new last tid (%s) must be greater than " + "previous one (%s)" + % (u64(tid), u64(self.tid))) + assert isinstance(tid, bytes) and len(tid) == 8, tid + self.tid = tid + self.f.seek(len(magic)) + self.f.write(tid) + self.f.flush() ## # Return the last transaction seen by the cache. # @return a transaction id # @defreturn string, or 8 nulls if no transaction is yet known def getLastTid(self): - return self.tid + with self._lock: + return self.tid ## # Return the current data record for oid. @@ -479,52 +484,54 @@ def getLastTid(self): # in the cache # @defreturn 3-tuple: (string, string, string) def load(self, oid, before_tid=None): - ofs = self.current.get(oid) - if ofs is None: - self._trace(0x20, oid) - return None - self.f.seek(ofs) - read = self.f.read - status = read(1) - assert status == b'a', (ofs, self.f.tell(), oid) - size, saved_oid, tid, end_tid, lver, ldata = unpack( - ">I8s8s8sHI", read(34)) - assert saved_oid == oid, (ofs, self.f.tell(), oid, saved_oid) - assert end_tid == z64, (ofs, self.f.tell(), oid, tid, end_tid) - assert lver == 0, "Versions aren't supported" - - if before_tid and tid >= before_tid: - return None - - data = read(ldata) - assert len(data) == ldata, (ofs, self.f.tell(), oid, len(data), ldata) - - # WARNING: The following assert changes the file position. - # We must not depend on this below or we'll fail in optimized mode. - assert read(8) == oid, (ofs, self.f.tell(), oid) - - self._n_accesses += 1 - self._trace(0x22, oid, tid, end_tid, ldata) - - ofsofs = self.currentofs - ofs - if ofsofs < 0: - ofsofs += self.maxsize - - if (ofsofs > self.rearrange and - self.maxsize > 10*len(data) and - size > 4): - # The record is far back and might get evicted, but it's - # valuable, so move it forward. - - # Remove fromn old loc: - del self.current[oid] + with self._lock: + ofs = self.current.get(oid) + if ofs is None: + self._trace(0x20, oid) + return None self.f.seek(ofs) - self.f.write(b'f'+pack(">I", size)) + read = self.f.read + status = read(1) + assert status == b'a', (ofs, self.f.tell(), oid) + size, saved_oid, tid, end_tid, lver, ldata = unpack( + ">I8s8s8sHI", read(34)) + assert saved_oid == oid, (ofs, self.f.tell(), oid, saved_oid) + assert end_tid == z64, (ofs, self.f.tell(), oid, tid, end_tid) + assert lver == 0, "Versions aren't supported" + + if before_tid and tid >= before_tid: + return None + + data = read(ldata) + assert len(data) == ldata, ( + ofs, self.f.tell(), oid, len(data), ldata) + + # WARNING: The following assert changes the file position. + # We must not depend on this below or we'll fail in optimized mode. + assert read(8) == oid, (ofs, self.f.tell(), oid) + + self._n_accesses += 1 + self._trace(0x22, oid, tid, end_tid, ldata) - # Write to new location: - self._store(oid, tid, None, data, size) + ofsofs = self.currentofs - ofs + if ofsofs < 0: + ofsofs += self.maxsize - return data, tid + if (ofsofs > self.rearrange and + self.maxsize > 10*len(data) and + size > 4): + # The record is far back and might get evicted, but it's + # valuable, so move it forward. + + # Remove fromn old loc: + del self.current[oid] + self.f.seek(ofs) + self.f.write(b'f'+pack(">I", size)) + + # Write to new location: + self._store(oid, tid, None, data, size) + + return data, tid ## # Return a non-current revision of oid that was current before tid. @@ -533,54 +540,56 @@ def load(self, oid, before_tid=None): # @return data record, serial number, start tid, and end tid # @defreturn 4-tuple: (string, string, string, string) def loadBefore(self, oid, before_tid): - noncurrent_for_oid = self.noncurrent.get(u64(oid)) - if noncurrent_for_oid is None: - result = self.load(oid, before_tid) - if result: - return result[0], result[1], None - else: - self._trace(0x24, oid, "", before_tid) - return result - - items = noncurrent_for_oid.items(None, u64(before_tid)-1) - if not items: - result = self.load(oid, before_tid) - if result: - return result[0], result[1], None - else: - self._trace(0x24, oid, "", before_tid) - return result + with self._lock: + noncurrent_for_oid = self.noncurrent.get(u64(oid)) + if noncurrent_for_oid is None: + result = self.load(oid, before_tid) + if result: + return result[0], result[1], None + else: + self._trace(0x24, oid, "", before_tid) + return result + + items = noncurrent_for_oid.items(None, u64(before_tid)-1) + if not items: + result = self.load(oid, before_tid) + if result: + return result[0], result[1], None + else: + self._trace(0x24, oid, "", before_tid) + return result - tid, ofs = items[-1] + tid, ofs = items[-1] - self.f.seek(ofs) - read = self.f.read - status = read(1) - assert status == b'a', (ofs, self.f.tell(), oid, before_tid) - size, saved_oid, saved_tid, end_tid, lver, ldata = unpack( - ">I8s8s8sHI", read(34)) - assert saved_oid == oid, (ofs, self.f.tell(), oid, saved_oid) - assert saved_tid == p64(tid), (ofs, self.f.tell(), oid, saved_tid, tid) - assert end_tid != z64, (ofs, self.f.tell(), oid) - assert lver == 0, "Versions aren't supported" - data = read(ldata) - assert len(data) == ldata, (ofs, self.f.tell()) - - # WARNING: The following assert changes the file position. - # We must not depend on this below or we'll fail in optimized mode. - assert read(8) == oid, (ofs, self.f.tell(), oid) - - if end_tid < before_tid: - result = self.load(oid, before_tid) - if result: - return result[0], result[1], None - else: - self._trace(0x24, oid, "", before_tid) - return result + self.f.seek(ofs) + read = self.f.read + status = read(1) + assert status == b'a', (ofs, self.f.tell(), oid, before_tid) + size, saved_oid, saved_tid, end_tid, lver, ldata = unpack( + ">I8s8s8sHI", read(34)) + assert saved_oid == oid, (ofs, self.f.tell(), oid, saved_oid) + assert saved_tid == p64(tid), ( + ofs, self.f.tell(), oid, saved_tid, tid) + assert end_tid != z64, (ofs, self.f.tell(), oid) + assert lver == 0, "Versions aren't supported" + data = read(ldata) + assert len(data) == ldata, (ofs, self.f.tell()) + + # WARNING: The following assert changes the file position. + # We must not depend on this below or we'll fail in optimized mode. + assert read(8) == oid, (ofs, self.f.tell(), oid) + + if end_tid < before_tid: + result = self.load(oid, before_tid) + if result: + return result[0], result[1], None + else: + self._trace(0x24, oid, "", before_tid) + return result - self._n_accesses += 1 - self._trace(0x26, oid, "", saved_tid) - return data, saved_tid, end_tid + self._n_accesses += 1 + self._trace(0x26, oid, "", saved_tid) + return data, saved_tid, end_tid ## # Store a new data record in the cache. @@ -591,45 +600,48 @@ def loadBefore(self, oid, before_tid): # current. # @param data the actual data def store(self, oid, start_tid, end_tid, data): - seek = self.f.seek - if end_tid is None: - ofs = self.current.get(oid) - if ofs: - seek(ofs) - read = self.f.read - status = read(1) - assert status == b'a', (ofs, self.f.tell(), oid) - size, saved_oid, saved_tid, end_tid = unpack( - ">I8s8s8s", read(28)) - assert saved_oid == oid, (ofs, self.f.tell(), oid, saved_oid) - assert end_tid == z64, (ofs, self.f.tell(), oid) - if saved_tid == start_tid: + with self._lock: + seek = self.f.seek + if end_tid is None: + ofs = self.current.get(oid) + if ofs: + seek(ofs) + read = self.f.read + status = read(1) + assert status == b'a', (ofs, self.f.tell(), oid) + size, saved_oid, saved_tid, end_tid = unpack( + ">I8s8s8s", read(28)) + assert saved_oid == oid, ( + ofs, self.f.tell(), oid, saved_oid) + assert end_tid == z64, (ofs, self.f.tell(), oid) + if saved_tid == start_tid: + return + raise ValueError("already have current data for oid") + else: + noncurrent_for_oid = self.noncurrent.get(u64(oid)) + if noncurrent_for_oid and ( + u64(start_tid) in noncurrent_for_oid): return - raise ValueError("already have current data for oid") - else: - noncurrent_for_oid = self.noncurrent.get(u64(oid)) - if noncurrent_for_oid and (u64(start_tid) in noncurrent_for_oid): - return - size = allocated_record_overhead + len(data) + size = allocated_record_overhead + len(data) - # A number of cache simulation experiments all concluded that the - # 2nd-level ZEO cache got a much higher hit rate if "very large" - # objects simply weren't cached. For now, we ignore the request - # only if the entire cache file is too small to hold the object. - if size >= min(max_block_size, self.maxsize - ZEC_HEADER_SIZE): - return + # A number of cache simulation experiments all concluded that the + # 2nd-level ZEO cache got a much higher hit rate if "very large" + # objects simply weren't cached. For now, we ignore the request + # only if the entire cache file is too small to hold the object. + if size >= min(max_block_size, self.maxsize - ZEC_HEADER_SIZE): + return - self._n_adds += 1 - self._n_added_bytes += size - self._len += 1 + self._n_adds += 1 + self._n_added_bytes += size + self._len += 1 - self._store(oid, start_tid, end_tid, data, size) + self._store(oid, start_tid, end_tid, data, size) - if end_tid: - self._trace(0x54, oid, start_tid, end_tid, dlen=len(data)) - else: - self._trace(0x52, oid, start_tid, dlen=len(data)) + if end_tid: + self._trace(0x54, oid, start_tid, end_tid, dlen=len(data)) + else: + self._trace(0x52, oid, start_tid, dlen=len(data)) def _store(self, oid, start_tid, end_tid, data, size): # Low-level store used by store and load @@ -696,35 +708,37 @@ def _store(self, oid, start_tid, end_tid, data, size): # - tid the id of the transaction that wrote a new revision of oid, # or None to forget all cached info about oid. def invalidate(self, oid, tid): - ofs = self.current.get(oid) - if ofs is None: - # 0x10 == invalidate (miss) - self._trace(0x10, oid, tid) - return + with self._lock: + ofs = self.current.get(oid) + if ofs is None: + # 0x10 == invalidate (miss) + self._trace(0x10, oid, tid) + return - self.f.seek(ofs) - read = self.f.read - status = read(1) - assert status == b'a', (ofs, self.f.tell(), oid) - size, saved_oid, saved_tid, end_tid = unpack(">I8s8s8s", read(28)) - assert saved_oid == oid, (ofs, self.f.tell(), oid, saved_oid) - assert end_tid == z64, (ofs, self.f.tell(), oid) - del self.current[oid] - if tid is None: self.f.seek(ofs) - self.f.write(b'f'+pack(">I", size)) - # 0x1E = invalidate (hit, discarding current or non-current) - self._trace(0x1E, oid, tid) - self._len -= 1 - else: - if tid == saved_tid: - logger.warning("Ignoring invalidation with same tid as current") - return - self.f.seek(ofs+21) - self.f.write(tid) - self._set_noncurrent(oid, saved_tid, ofs) - # 0x1C = invalidate (hit, saving non-current) - self._trace(0x1C, oid, tid) + read = self.f.read + status = read(1) + assert status == b'a', (ofs, self.f.tell(), oid) + size, saved_oid, saved_tid, end_tid = unpack(">I8s8s8s", read(28)) + assert saved_oid == oid, (ofs, self.f.tell(), oid, saved_oid) + assert end_tid == z64, (ofs, self.f.tell(), oid) + del self.current[oid] + if tid is None: + self.f.seek(ofs) + self.f.write(b'f'+pack(">I", size)) + # 0x1E = invalidate (hit, discarding current or non-current) + self._trace(0x1E, oid, tid) + self._len -= 1 + else: + if tid == saved_tid: + logger.warning( + "Ignoring invalidation with same tid as current") + return + self.f.seek(ofs+21) + self.f.write(tid) + self._set_noncurrent(oid, saved_tid, ofs) + # 0x1C = invalidate (hit, saving non-current) + self._trace(0x1C, oid, tid) ## # Generates (oid, serial) oairs for all objects in the diff --git a/src/ZEO/interfaces.py b/src/ZEO/interfaces.py index 15d1c9e2f..83e48114a 100644 --- a/src/ZEO/interfaces.py +++ b/src/ZEO/interfaces.py @@ -24,8 +24,7 @@ def __init__(self, storage): class IClientCache(zope.interface.Interface): """Client cache interface. - Note that caches need not be thread safe, fpr the most part, - except for getLastTid, which may be called from multiple threads. + Note that caches need to be thread safe. """ def close(): diff --git a/src/ZEO/tests/ConnectionTests.py b/src/ZEO/tests/ConnectionTests.py index 0ca5006e0..6225cb303 100644 --- a/src/ZEO/tests/ConnectionTests.py +++ b/src/ZEO/tests/ConnectionTests.py @@ -1018,90 +1018,6 @@ def checkTimeoutAfterVote(self): # or the server. self.assertRaises(KeyError, storage.load, oid, '') - def checkTimeoutProvokingConflicts(self): - self._storage = storage = self.openClientStorage() - # Assert that the zeo cache is empty. - self.assert_(not list(storage._cache.contents())) - # Create the object - oid = storage.new_oid() - obj = MinPO(7) - # We need to successfully commit an object now so we have something to - # conflict about. - t = Transaction() - storage.tpc_begin(t) - revid1a = storage.store(oid, ZERO, zodb_pickle(obj), '', t) - revid1b = storage.tpc_vote(t) - revid1 = handle_serials(oid, revid1a, revid1b) - storage.tpc_finish(t) - # Now do a store, sleeping before the finish so as to cause a timeout. - obj.value = 8 - t = Transaction() - old_connection_count = storage.connection_count_for_tests - storage.tpc_begin(t) - revid2a = storage.store(oid, revid1, zodb_pickle(obj), '', t) - revid2b = storage.tpc_vote(t) - revid2 = handle_serials(oid, revid2a, revid2b) - - # Now sleep long enough for the storage to time out. - # This used to sleep for 3 seconds, and sometimes (but very rarely) - # failed then. Now we try for a minute. It typically succeeds - # on the second time thru the loop, and, since self.timeout is 1, - # it's typically faster now (2/1.8 ~= 1.11 seconds sleeping instead - # of 3). - deadline = time.time() + 60 # wait up to a minute - while time.time() < deadline: - if (storage.is_connected() and - (storage.connection_count_for_tests == old_connection_count) - ): - time.sleep(self.timeout / 1.8) - else: - break - self.assert_( - (not storage.is_connected()) - or - (storage.connection_count_for_tests > old_connection_count) - ) - storage._wait() - self.assert_(storage.is_connected()) - # We expect finish to fail. - self.assertRaises(ClientDisconnected, storage.tpc_finish, t) - storage.tpc_abort(t) - - # Now we think we've committed the second transaction, but we really - # haven't. A third one should produce a POSKeyError on the server, - # which manifests as a ConflictError on the client. - obj.value = 9 - t = Transaction() - storage.tpc_begin(t) - storage.store(oid, revid2, zodb_pickle(obj), '', t) - self.assertRaises(ConflictError, storage.tpc_vote, t) - # Even aborting won't help. - storage.tpc_abort(t) - self.assertRaises(ZODB.POSException.StorageTransactionError, - storage.tpc_finish, t) - # Try again. - obj.value = 10 - t = Transaction() - storage.tpc_begin(t) - storage.store(oid, revid2, zodb_pickle(obj), '', t) - # Even aborting won't help. - self.assertRaises(ConflictError, storage.tpc_vote, t) - # Abort this one and try a transaction that should succeed. - storage.tpc_abort(t) - - # Now do a store. - obj.value = 11 - t = Transaction() - storage.tpc_begin(t) - revid2a = storage.store(oid, revid1, zodb_pickle(obj), '', t) - revid2b = storage.tpc_vote(t) - revid2 = handle_serials(oid, revid2a, revid2b) - storage.tpc_finish(t) - # Now load the object and verify that it has a value of 11. - data, revid = storage.load(oid, '') - self.assertEqual(zodb_unpickle(data), MinPO(11)) - self.assertEqual(revid, revid2) - class MSTThread(threading.Thread): __super_init = threading.Thread.__init__ diff --git a/src/ZEO/tests/forker.py b/src/ZEO/tests/forker.py index 6ee8ee3cd..353b216d7 100644 --- a/src/ZEO/tests/forker.py +++ b/src/ZEO/tests/forker.py @@ -95,6 +95,10 @@ def runner(config, qin, qout, timeout=None, import ZEO.asyncio.server old_protocol = ZEO.asyncio.server.best_protocol_version ZEO.asyncio.server.best_protocol_version = protocol + old_protocols = ZEO.asyncio.server.ServerProtocol.protocols + ZEO.asyncio.server.ServerProtocol.protocols = tuple(sorted( + set(old_protocols) | set([protocol]) + )) try: import ZEO.runzeo, threading @@ -142,8 +146,8 @@ def runner(config, qin, qout, timeout=None, finally: if old_protocol: - ZEO.asyncio.server.best_protocol_version = protocol - + ZEO.asyncio.server.best_protocol_version = old_protocol + ZEO.asyncio.server.ServerProtocol.protocols = old_protocols def stop_runner(thread, config, qin, qout, stop_timeout=9, pid=None): qin.put('stop') diff --git a/src/ZEO/tests/protocols.test b/src/ZEO/tests/protocols.test index ab54a08c8..419a8b66c 100644 --- a/src/ZEO/tests/protocols.test +++ b/src/ZEO/tests/protocols.test @@ -5,7 +5,7 @@ A full test of all protocols isn't practical. But we'll do a limited test that at least the current and previous protocols are supported in both directions. -Let's start a Z309 server +Let's start a Z4 server >>> storage_conf = ''' ... @@ -94,82 +94,91 @@ A current client should be able to connect to a old server: >>> zope.testing.setupstack.rmtree('blobs') >>> zope.testing.setupstack.rmtree('server-blobs') -And the other way around: - - >>> addr, _ = start_server(storage_conf, dict(invalidation_queue_size=5)) - -Note that we'll have to pull some hijinks: - - >>> import ZEO.asyncio.client - >>> old_protocols = ZEO.asyncio.client.Protocol.protocols - >>> ZEO.asyncio.client.Protocol.protocols = [b'Z4'] - - >>> db = ZEO.DB(addr, client='client', blob_dir='blobs') - >>> str(db.storage.protocol_version.decode('ascii')) - 'Z4' - >>> wait_connected(db.storage) - >>> conn = db.open() - >>> conn.root().x = 0 - >>> transaction.commit() - >>> len(db.history(conn.root()._p_oid, 99)) - 2 - - >>> conn.root()['blob1'] = ZODB.blob.Blob() - >>> with conn.root()['blob1'].open('w') as f: - ... r = f.write(b'blob data 1') - >>> transaction.commit() - - >>> db2 = ZEO.DB(addr, blob_dir='server-blobs', shared_blob_dir=True) - >>> wait_connected(db2.storage) - >>> conn2 = db2.open() - >>> for i in range(5): - ... conn2.root().x += 1 - ... transaction.commit() - >>> conn2.root()['blob2'] = ZODB.blob.Blob() - >>> with conn2.root()['blob2'].open('w') as f: - ... r = f.write(b'blob data 2') - >>> transaction.commit() - - - >>> @wait_until() - ... def x_to_be_5(): - ... conn.sync() - ... return conn.root().x == 5 - - >>> db.close() - - >>> for i in range(2): - ... conn2.root().x += 1 - ... transaction.commit() - - >>> db = ZEO.DB(addr, client='client', blob_dir='blobs') - >>> wait_connected(db.storage) - >>> conn = db.open() - >>> conn.root().x - 7 - - >>> db.close() - - >>> for i in range(10): - ... conn2.root().x += 1 - ... transaction.commit() - - >>> db = ZEO.DB(addr, client='client', blob_dir='blobs') - >>> wait_connected(db.storage) - >>> conn = db.open() - >>> conn.root().x - 17 - - >>> with conn.root()['blob1'].open() as f: - ... f.read() - b'blob data 1' - >>> with conn.root()['blob2'].open() as f: - ... f.read() - b'blob data 2' - - >>> db2.close() - >>> db.close() - -Undo the hijinks: - - >>> ZEO.asyncio.client.Protocol.protocols = old_protocols +############################################################################# +# Note that the ZEO 5.0 server only supports clients that use the Z5 protocol + +# And the other way around: + +# >>> addr, _ = start_server(storage_conf, dict(invalidation_queue_size=5)) + +# Note that we'll have to pull some hijinks: + +# >>> db = ZEO.DB(addr, client='client', blob_dir='blobs') +# >>> str(db.storage.protocol_version.decode('ascii')) +# 'Z4' +# >>> wait_connected(db.storage) +# >>> conn = db.open() +# >>> conn.root().x = 0 +# >>> transaction.commit() +# >>> len(db.history(conn.root()._p_oid, 99)) +# 2 + +# >>> db = ZEO.DB(addr, client='client', blob_dir='blobs') +# >>> db.storage.protocol_version +# b'Z4' +# >>> wait_connected(db.storage) +# >>> conn = db.open() +# >>> conn.root().x = 0 +# >>> transaction.commit() +# >>> len(db.history(conn.root()._p_oid, 99)) +# 2 + +# >>> conn.root()['blob1'] = ZODB.blob.Blob() +# >>> with conn.root()['blob1'].open('w') as f: +# ... r = f.write(b'blob data 1') +# >>> transaction.commit() + +# >>> db2 = ZEO.DB(addr, blob_dir='server-blobs', shared_blob_dir=True) +# >>> wait_connected(db2.storage) +# >>> conn2 = db2.open() +# >>> for i in range(5): +# ... conn2.root().x += 1 +# ... transaction.commit() +# >>> conn2.root()['blob2'] = ZODB.blob.Blob() +# >>> with conn2.root()['blob2'].open('w') as f: +# ... r = f.write(b'blob data 2') +# >>> transaction.commit() + + +# >>> @wait_until() +# ... def x_to_be_5(): +# ... conn.sync() +# ... return conn.root().x == 5 + +# >>> db.close() + +# >>> for i in range(2): +# ... conn2.root().x += 1 +# ... transaction.commit() + +# >>> db = ZEO.DB(addr, client='client', blob_dir='blobs') +# >>> wait_connected(db.storage) +# >>> conn = db.open() +# >>> conn.root().x +# 7 + +# >>> db.close() + +# >>> for i in range(10): +# ... conn2.root().x += 1 +# ... transaction.commit() + +# >>> db = ZEO.DB(addr, client='client', blob_dir='blobs') +# >>> wait_connected(db.storage) +# >>> conn = db.open() +# >>> conn.root().x +# 17 + +# >>> with conn.root()['blob1'].open() as f: +# ... f.read() +# b'blob data 1' +# >>> with conn.root()['blob2'].open() as f: +# ... f.read() +# b'blob data 2' + +# >>> db2.close() +# >>> db.close() + +# Undo the hijinks: + +# >>> ZEO.asyncio.client.Protocol.protocols = old_protocols diff --git a/src/ZEO/tests/testZEO.py b/src/ZEO/tests/testZEO.py index b366a4790..0474760e9 100644 --- a/src/ZEO/tests/testZEO.py +++ b/src/ZEO/tests/testZEO.py @@ -745,24 +745,23 @@ def tpc_begin(self, transaction): self.server.tpc_begin(id(transaction), '', '', {}, None, ' ') def tpc_vote(self, transaction): - vote_result = self.server.vote(id(transaction)) - assert vote_result is None - result = self.server.connection.serials[:] + result = self.server.vote(id(transaction)) + assert result == self.server.connection.serials[:] del self.server.connection.serials[:] return result def store(self, oid, serial, data, version_ignored, transaction): self.server.storea(oid, serial, data, id(transaction)) - def send_reply(self, *args): # Masquerade as conn - pass + def send_reply(self, _, result): # Masquerade as conn + self._result = result def tpc_abort(self, transaction): self.server.tpc_abort(id(transaction)) def tpc_finish(self, transaction, func = lambda: None): self.server.tpc_finish(id(transaction)).set_sender(0, self) - + return self._result def multiple_storages_invalidation_queue_is_not_insane(): """ @@ -929,14 +928,14 @@ def tpc_finish_error(): buffer, sadly, using implementation details: >>> tbuf = t.data(client) - >>> tbuf.serials = None + >>> tbuf.resolved = None tpc_finish will fail: >>> client.tpc_finish(t) # doctest: +ELLIPSIS Traceback (most recent call last): ... - AttributeError: ... + TypeError: ... >>> client.tpc_abort(t) >>> t.abort() diff --git a/src/ZEO/tests/testZEO2.py b/src/ZEO/tests/testZEO2.py index 7b4fad8d8..e0c36626c 100644 --- a/src/ZEO/tests/testZEO2.py +++ b/src/ZEO/tests/testZEO2.py @@ -78,6 +78,8 @@ def proper_handling_of_blob_conflicts(): >>> class Sender: ... def send_reply(self, id, reply): ... print('reply', id, reply) + ... def send_error(self, id, err): + ... print('error', id, err) >>> delay.set_sender(1, Sender()) >>> logger = logging.getLogger('ZEO') @@ -87,13 +89,20 @@ def proper_handling_of_blob_conflicts(): Now, when we abort the transaction for the first client. The second client will be restarted. It will get a conflict error, that is -handled correctly: +raised to the client: >>> zs1.tpc_abort('0') # doctest: +ELLIPSIS - reply 1 None + Error raised in delayed method + Traceback (most recent call last): + ... + ZODB.POSException.ConflictError: ... + error 1 database conflict error ... + +The transaction is aborted by the server: - >>> fs.tpc_transaction() is not None + >>> fs.tpc_transaction() is None True + >>> zs2.connected True @@ -116,7 +125,7 @@ def proper_handling_of_errors_in_restart(): >>> zs1 = ZEO.tests.servertesting.client(server, 1) >>> zs1.tpc_begin('0', '', '', {}) - >>> zs1.storea(ZODB.utils.p64(99), ZODB.utils.z64, 'x', '0') + >>> zs1.storea(ZODB.utils.p64(99), ZODB.utils.z64, b'x', '0') Intentionally break zs1: @@ -135,7 +144,7 @@ def proper_handling_of_errors_in_restart(): >>> zs1 = ZEO.tests.servertesting.client(server, 1) >>> zs1.tpc_begin('1', '', '', {}) - >>> zs1.storea(ZODB.utils.p64(99), ZODB.utils.z64, 'x', '1') + >>> zs1.storea(ZODB.utils.p64(99), ZODB.utils.z64, b'x', '1') >>> _ = zs1.vote('1') # doctest: +ELLIPSIS >>> zs1.tpc_finish('1').set_sender(0, zs1.connection) @@ -220,7 +229,7 @@ def some_basic_locking_tests(): ZEO.asyncio.server INFO received handshake 'Z5' >>> tid1 = start_trans(zs1) - >>> zs1.vote(tid1) # doctest: +ELLIPSIS + >>> resolved1 = zs1.vote(tid1) # doctest: +ELLIPSIS ZEO.StorageServer DEBUG (test-addr-1) ('1') lock: transactions waiting: 0 ZEO.StorageServer BLATHER @@ -477,7 +486,7 @@ def lock_sanity_check(): ZEO.asyncio.server INFO received handshake 'Z5' >>> tid1 = start_trans(zs1) - >>> zs1.vote(tid1) # doctest: +ELLIPSIS + >>> resolved1 = zs1.vote(tid1) # doctest: +ELLIPSIS ZEO.StorageServer DEBUG (test-addr-1) ('1') lock: transactions waiting: 0 ZEO.StorageServer BLATHER @@ -493,7 +502,7 @@ def lock_sanity_check(): ZEO.asyncio.server INFO received handshake 'Z5' >>> tid2 = start_trans(zs2) - >>> zs2.vote(tid2) # doctest: +ELLIPSIS + >>> resolved2 = zs2.vote(tid2) # doctest: +ELLIPSIS ZEO.StorageServer DEBUG (test-addr-2) ('1') lock: transactions waiting: 0 ZEO.StorageServer BLATHER