From 5ba506e7f173370377feb6761b532f86e7681f4d Mon Sep 17 00:00:00 2001 From: Jim Fulton Date: Wed, 3 Aug 2016 21:41:06 +0000 Subject: [PATCH] Fixed a bug handling ZEO4 invalidations during cache verification ZEO4 servers can send invalidations out of order with ``getInvalidations`` results, presumably because ``getInvalidations`` didn't get the commit lock. ZEO4 clients worked around this (maybe not directly) by queuing invalidations during cache verification. ZEO5 servers don't send invalidations out of order with ``getInvalidations`` calls and the ZEO5 client didn't need an invalidation queue, except they do need one to work correctly with ZEO4 servers. :/ --- src/ZEO/asyncio/client.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/ZEO/asyncio/client.py b/src/ZEO/asyncio/client.py index 5f3a4a1fb..90503b14a 100644 --- a/src/ZEO/asyncio/client.py +++ b/src/ZEO/asyncio/client.py @@ -313,6 +313,12 @@ def __init__(self, loop, self.protocols = () self.disconnected(None) + # Work around odd behavior of ZEO4 server. It may send + # invalidations for transactions later than the result of + # getInvalidations. While we support ZEO 4 servers, we'll + # need to keep an invalidation queue. :( + self.verify_invalidation_queue = [] + def new_addrs(self, addrs): self.addrs = addrs if self.trying_to_connect(): @@ -409,6 +415,8 @@ def register_failed(self, protocol, exc): @future_generator def verify(self, server_tid): + self.verify_invalidation_queue = [] # See comment in init :( + protocol = self.protocol if server_tid is None: server_tid = yield protocol.fut('lastTransaction') @@ -465,6 +473,12 @@ def verify(self, server_tid): self.cache.setLastTid(server_tid) self.ready = True + # Gaaaa, ZEO 4 work around. See comment in __init__. :( + for tid, oids in self.verify_invalidation_queue: + if tid > server_tid: + self.invalidateTransaction(tid, oids) + self.verify_invalidation_queue = [] + try: info = yield protocol.fut('get_info') except Exception as exc: @@ -597,6 +611,8 @@ def invalidateTransaction(self, tid, oids): self.cache.invalidate(oid, tid) self.client.invalidateTransaction(tid, oids) self.cache.setLastTid(tid) + else: + self.verify_invalidation_queue.append((tid, oids)) def serialnos(self, serials): # Method called by ZEO4 storage servers.