From 37c667891267415a88eb16fe9490b04f2233fa4d Mon Sep 17 00:00:00 2001 From: Jim Fulton Date: Thu, 14 Jul 2016 15:37:00 -0400 Subject: [PATCH 1/7] When calling loadBefore, collapse outstanding calls. IOW, if there's an outstanding call is made for a given oid and tid, and another call is made, the second call will use the result of the outstanding call, rather than making another call to the server. --- src/ZEO/asyncio/client.py | 30 ++++++++++++++++++++-------- src/ZEO/asyncio/tests.py | 41 ++++++++++++++++++++++++++------------- 2 files changed, 50 insertions(+), 21 deletions(-) diff --git a/src/ZEO/asyncio/client.py b/src/ZEO/asyncio/client.py index ca68ee013..05ba494eb 100644 --- a/src/ZEO/asyncio/client.py +++ b/src/ZEO/asyncio/client.py @@ -240,6 +240,17 @@ def call(self, future, method, args): def promise(self, method, *args): return self.call(Promise(), method, args) + def load_before(self, oid, tid): + # Special-case loadBefore, so we collapse outstanding requests + message_id = (oid, tid) + future = self.futures.get(message_id) + if future is None: + future = asyncio.Future(loop=self.loop) + self.futures[message_id] = future + self._write( + self.encode(message_id, False, 'loadBefore', (oid, tid))) + return future.add_done_callback + # Methods called by the server. # WARNING WARNING we can't call methods that call back to us # syncronously, as that would lead to DEADLOCK! @@ -519,14 +530,17 @@ def load_before_threadsafe(self, future, oid, tid): if data is not None: future.set_result(data) elif self.ready: - @self.protocol.promise('loadBefore', oid, tid) - def load_before(data): - future.set_result(data) - if data: - data, start, end = data - self.cache.store(oid, start, end, data) - - load_before.catch(future.set_exception) + + @self.protocol.load_before(oid, tid) + def load_before(load_future): + try: + data = load_future.result() + future.set_result(data) + if data: + data, start, end = data + self.cache.store(oid, start, end, data) + except Exception as exc: + future.set_exception(exc) else: self._when_ready(self.load_before_threadsafe, future, oid, tid) diff --git a/src/ZEO/asyncio/tests.py b/src/ZEO/asyncio/tests.py index 4165ac389..36b66bc65 100644 --- a/src/ZEO/asyncio/tests.py +++ b/src/ZEO/asyncio/tests.py @@ -71,6 +71,8 @@ def pop(self, count=None, parse=True): class ClientTests(Base, setupstack.TestCase, ClientRunner): + maxDiff = None + def start(self, addrs=(('127.0.0.1', 8200), ), loop_addrs=None, read_only=False, @@ -191,9 +193,11 @@ def testClientBasics(self): # Loading objects gets special handling to leverage the cache. loaded = self.load_before(b'1'*8, m64) - # The data wasn't in the cache, so we make a server call: - self.assertEqual(self.pop(), (5, False, 'loadBefore', (b'1'*8, m64))) - self.respond(5, (b'data', b'a'*8, None)) + # The data wasn't in the cache, so we made a server call: + self.assertEqual(self.pop(), + ((b'1'*8, m64), False, 'loadBefore', (b'1'*8, m64))) + # Note load_before uses the oid as the message id. + self.respond((b'1'*8, m64), (b'data', b'a'*8, None)) self.assertEqual(loaded.result(), (b'data', b'a'*8, None)) # If we make another request, it will be satisfied from the cache: @@ -206,9 +210,16 @@ def testClientBasics(self): # Now, if we try to load current again, we'll make a server request. 
loaded = self.load_before(b'1'*8, m64) - self.assertEqual(self.pop(), (6, False, 'loadBefore', (b'1'*8, m64))) - self.respond(6, (b'data2', b'b'*8, None)) + + # Note that if we make another request for the same object, + # the requests will be collapsed: + loaded2 = self.load_before(b'1'*8, m64) + + self.assertEqual(self.pop(), + ((b'1'*8, m64), False, 'loadBefore', (b'1'*8, m64))) + self.respond((b'1'*8, m64), (b'data2', b'b'*8, None)) self.assertEqual(loaded.result(), (b'data2', b'b'*8, None)) + self.assertEqual(loaded2.result(), (b'data2', b'b'*8, None)) # Loading non-current data may also be satisfied from cache loaded = self.load_before(b'1'*8, b'b'*8) @@ -219,9 +230,10 @@ def testClientBasics(self): self.assertFalse(transport.data) loaded = self.load_before(b'1'*8, b'_'*8) - self.assertEqual(self.pop(), - (7, False, 'loadBefore', (b'1'*8, b'_'*8))) - self.respond(7, (b'data0', b'^'*8, b'_'*8)) + self.assertEqual( + self.pop(), + ((b'1'*8, b'_'*8), False, 'loadBefore', (b'1'*8, b'_'*8))) + self.respond((b'1'*8, b'_'*8), (b'data0', b'^'*8, b'_'*8)) self.assertEqual(loaded.result(), (b'data0', b'^'*8, b'_'*8)) # When committing transactions, we need to update the cache @@ -244,8 +256,8 @@ def finished_cb(tid): cache.load(b'4'*8)) self.assertEqual(cache.load(b'1'*8), (b'data2', b'b'*8)) self.assertEqual(self.pop(), - (8, False, 'tpc_finish', (b'd'*8,))) - self.respond(8, b'e'*8) + (5, False, 'tpc_finish', (b'd'*8,))) + self.respond(5, b'e'*8) self.assertEqual(committed.result(), b'e'*8) self.assertEqual(cache.load(b'1'*8), None) self.assertEqual(cache.load(b'2'*8), ('committed 2', b'e'*8)) @@ -257,8 +269,9 @@ def finished_cb(tid): loaded = self.load_before(b'1'*8, m64) f1 = self.call('foo', 1, 2) self.assertFalse(loaded.done() or f1.done()) - self.assertEqual(self.pop(), [(9, False, 'loadBefore', (b'1'*8, m64)), - (10, False, 'foo', (1, 2))], + self.assertEqual(self.pop(), + [((b'1'*8, m64), False, 'loadBefore', (b'1'*8, m64)), + (6, False, 'foo', (1, 2))], ) exc = TypeError(43) @@ -720,7 +733,9 @@ def load(self, oid): def store(self, oid, start_tid, end_tid, data): assert start_tid is not None revisions = self.data[oid] - revisions.append((start_tid, end_tid, data)) + data = (start_tid, end_tid, data) + if not revisions or data != revisions[-1]: + revisions.append(data) revisions.sort() def loadBefore(self, oid, tid): From 1a38d378c81d0db07f377db2350702332436fb08 Mon Sep 17 00:00:00 2001 From: Jim Fulton Date: Thu, 14 Jul 2016 18:11:49 -0400 Subject: [PATCH 2/7] Added a prefetch method. --- src/ZEO/ClientStorage.py | 7 +++++++ src/ZEO/asyncio/client.py | 24 +++++++++++++++++++++ src/ZEO/tests/testZEO2.py | 44 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 75 insertions(+) diff --git a/src/ZEO/ClientStorage.py b/src/ZEO/ClientStorage.py index 7b02bd15c..ff45706d2 100644 --- a/src/ZEO/ClientStorage.py +++ b/src/ZEO/ClientStorage.py @@ -504,8 +504,15 @@ def load(self, oid, version=''): return result[:2] def loadBefore(self, oid, tid): + result = self._cache.loadBefore(oid, tid) + if result: + return result + return self._server.load_before(oid, tid) + def prefetch(self, oids, tid): + self._server.prefetch(oids, tid) + def new_oid(self): """Storage API: return a new object identifier. 
""" diff --git a/src/ZEO/asyncio/client.py b/src/ZEO/asyncio/client.py index 05ba494eb..40655210c 100644 --- a/src/ZEO/asyncio/client.py +++ b/src/ZEO/asyncio/client.py @@ -544,6 +544,27 @@ def load_before(load_future): else: self._when_ready(self.load_before_threadsafe, future, oid, tid) + def _prefetch(self, oid, tid): + @self.protocol.load_before(oid, tid) + def load_before(load_future): + try: + data = load_future.result() + if data: + data, start, end = data + self.cache.store(oid, start, end, data) + except Exception: + logger.exception("prefetch %r %r" % (oid, tid)) + + def prefetch(self, future, oids, tid): + if self.ready: + for oid in oids: + if self.cache.loadBefore(oid, tid) is None: + self._prefetch(oid, tid) + + future.set_result(None) + else: + future.set_exception(ClientDisconnected()) + def tpc_finish_threadsafe(self, future, tid, updates, f): if self.ready: @self.protocol.promise('tpc_finish', tid) @@ -662,6 +683,9 @@ def async(self, method, *args): def async_iter(self, it): return self.__call(self.client.call_async_iter_threadsafe, it) + def prefetch(self, oids, tid): + return self.__call(self.client.prefetch, oids, tid) + def load_before(self, oid, tid): return self.__call(self.client.load_before_threadsafe, oid, tid) diff --git a/src/ZEO/tests/testZEO2.py b/src/ZEO/tests/testZEO2.py index 160990691..937d05387 100644 --- a/src/ZEO/tests/testZEO2.py +++ b/src/ZEO/tests/testZEO2.py @@ -513,6 +513,50 @@ def lock_sanity_check(): >>> logging.getLogger('ZEO').removeHandler(handler) """ +def test_prefetch(self): + """The client storage prefetch method pre-fetches from the server + + >>> count = 99 + + >>> import ZEO + >>> addr, stop = ZEO.server() + >>> conn = ZEO.connection(addr) + >>> root = conn.root() + >>> cls = root.__class__ + >>> for i in range(count): + ... root[i] = cls() + >>> conn.transaction_manager.commit() + >>> oids = [root[i]._p_oid for i in range(count)] + >>> conn.close() + >>> conn = ZEO.connection(addr) + >>> storage = conn.db().storage + >>> len(storage._cache) + 1 + >>> storage.prefetch(oids, conn._storage._start) + + The prefetch returns before the cache is filled: + + >>> len(storage._cache) < count + True + + But it is filled eventually: + + >>> from zope.testing.wait import wait + >>> wait(lambda : len(storage._cache) > count) + + >>> loads = storage.server_status()['loads'] + + Now if we reload the data, it will be satisfied from the cache: + + >>> for oid in oids: + ... 
_ = conn._storage.load(oid) + + >>> storage.server_status()['loads'] == loads + True + + >>> conn.close() + >>> stop() + """ def test_suite(): return unittest.TestSuite(( From b6df02cbcd80d5af1cd0407bc29b7941a76b7f44 Mon Sep 17 00:00:00 2001 From: Jim Fulton Date: Sat, 16 Jul 2016 16:49:08 -0400 Subject: [PATCH 3/7] make sure I lose a race --- src/ZEO/tests/testZEO2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ZEO/tests/testZEO2.py b/src/ZEO/tests/testZEO2.py index 937d05387..c58d0411a 100644 --- a/src/ZEO/tests/testZEO2.py +++ b/src/ZEO/tests/testZEO2.py @@ -516,7 +516,7 @@ def lock_sanity_check(): def test_prefetch(self): """The client storage prefetch method pre-fetches from the server - >>> count = 99 + >>> count = 999 >>> import ZEO >>> addr, stop = ZEO.server() From db9d3da56572f128325a296859d9316445d24c46 Mon Sep 17 00:00:00 2001 From: Jim Fulton Date: Sat, 16 Jul 2016 16:49:44 -0400 Subject: [PATCH 4/7] removed uneeded import --- src/ZEO/asyncio/tests.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/ZEO/asyncio/tests.py b/src/ZEO/asyncio/tests.py index 36b66bc65..d6897f961 100644 --- a/src/ZEO/asyncio/tests.py +++ b/src/ZEO/asyncio/tests.py @@ -12,7 +12,6 @@ import collections import logging -import pdb import struct import unittest From f96776b396c9ee2af33319318a8d7beee4ca2286 Mon Sep 17 00:00:00 2001 From: Jim Fulton Date: Sat, 16 Jul 2016 16:51:09 -0400 Subject: [PATCH 5/7] Changed load_before and preftech to use generators rather than callbacks --- src/ZEO/asyncio/client.py | 72 +++++++++++++++++++++++++++------------ 1 file changed, 51 insertions(+), 21 deletions(-) diff --git a/src/ZEO/asyncio/client.py b/src/ZEO/asyncio/client.py index 40655210c..86a731fdc 100644 --- a/src/ZEO/asyncio/client.py +++ b/src/ZEO/asyncio/client.py @@ -8,6 +8,7 @@ from ZEO.Exceptions import ClientDisconnected from ZODB.ConflictResolution import ResolvedSerial import concurrent.futures +import functools import logging import random import threading @@ -27,6 +28,37 @@ local_random = random.Random() # use separate generator to facilitate tests +def future_generator(func): + """Decorates a generator that generates futures + """ + + @functools.wraps(func) + def call_generator(*args, **kw): + gen = func(*args, **kw) + try: + f = next(gen) + except StopIteration: + gen.close() + else: + def store(gen, future): + @future.add_done_callback + def _(future): + try: + try: + result = future.result() + except Exception as exc: + f = gen.throw(exc) + else: + f = gen.send(result) + except StopIteration: + gen.close() + else: + store(gen, f) + + store(gen, f) + + return call_generator + class Protocol(base.Protocol): """asyncio low-level ZEO client interface """ @@ -249,7 +281,7 @@ def load_before(self, oid, tid): self.futures[message_id] = future self._write( self.encode(message_id, False, 'loadBefore', (oid, tid))) - return future.add_done_callback + return future # Methods called by the server. # WARNING WARNING we can't call methods that call back to us @@ -525,35 +557,33 @@ def call_threadsafe(self, future, method, args): # Special methods because they update the cache. 
+ @future_generator def load_before_threadsafe(self, future, oid, tid): data = self.cache.loadBefore(oid, tid) if data is not None: future.set_result(data) elif self.ready: - - @self.protocol.load_before(oid, tid) - def load_before(load_future): - try: - data = load_future.result() - future.set_result(data) - if data: - data, start, end = data - self.cache.store(oid, start, end, data) - except Exception as exc: - future.set_exception(exc) - else: - self._when_ready(self.load_before_threadsafe, future, oid, tid) - - def _prefetch(self, oid, tid): - @self.protocol.load_before(oid, tid) - def load_before(load_future): try: - data = load_future.result() + data = yield self.protocol.load_before(oid, tid) + except Exception as exc: + future.set_exception(exc) + else: + future.set_result(data) if data: data, start, end = data self.cache.store(oid, start, end, data) - except Exception: - logger.exception("prefetch %r %r" % (oid, tid)) + else: + self._when_ready(self.load_before_threadsafe, future, oid, tid) + + @future_generator + def _prefetch(self, oid, tid): + try: + data = yield self.protocol.load_before(oid, tid) + if data: + data, start, end = data + self.cache.store(oid, start, end, data) + except Exception: + logger.exception("prefetch %r %r" % (oid, tid)) def prefetch(self, future, oids, tid): if self.ready: From 7cdcc01348bf6066b1dd6b13ee36e3451d4b299f Mon Sep 17 00:00:00 2001 From: Jim Fulton Date: Sun, 17 Jul 2016 10:57:07 -0400 Subject: [PATCH 6/7] Updated connect/verify to use generators --- src/ZEO/asyncio/client.py | 194 ++++++++++++++++++-------------------- src/ZEO/asyncio/tests.py | 77 ++++++--------- 2 files changed, 123 insertions(+), 148 deletions(-) diff --git a/src/ZEO/asyncio/client.py b/src/ZEO/asyncio/client.py index 86a731fdc..034abe07a 100644 --- a/src/ZEO/asyncio/client.py +++ b/src/ZEO/asyncio/client.py @@ -159,6 +159,7 @@ def connection_lost(self, exc): self.closed = True self.client.disconnected(self) + @future_generator def finish_connect(self, protocol_version): # We use a promise model rather than coroutines here because @@ -182,56 +183,29 @@ def finish_connect(self, protocol_version): self.client.register_failed( self, ZEO.Exceptions.ProtocolError(protocol_version)) return - self._write(self.protocol_version) - - register = self.promise( - 'register', self.storage_key, - self.read_only if self.read_only is not Fallback else False, - ) - if self.read_only is not Fallback: - # Get lastTransaction in flight right away to make - # successful connection quicker, but only if we're not - # doing read-only fallback. If we might need to retry, we - # can't send lastTransaction because if the registration - # fails, it will be seen as an invalid message and the - # connection will close. :( It would be a lot better of - # registere returned the last transaction (and info while - # it's at it). - lastTransaction = self.promise('lastTransaction') - else: - lastTransaction = None # to make python happy - @register - def registered(_): - if self.read_only is Fallback: - self.read_only = False - r_lastTransaction = self.promise('lastTransaction') - else: - r_lastTransaction = lastTransaction - self.client.registered(self, r_lastTransaction) - - @register.catch - def register_failed(exc): - if (isinstance(exc, ZODB.POSException.ReadOnlyError) and - self.read_only is Fallback): - # We tried a write connection, degrade to a read-only one - self.read_only = True - logger.info("%s write connection failed. 
Trying read-only", - self) - register = self.promise('register', self.storage_key, True) - # get lastTransaction in flight. - lastTransaction = self.promise('lastTransaction') - - @register - def registered(_): - self.client.registered(self, lastTransaction) - - @register.catch - def register_failed(exc): - self.client.register_failed(self, exc) + self._write(self.protocol_version) + try: + try: + server_tid = yield self.fut( + 'register', self.storage_key, + self.read_only if self.read_only is not Fallback else False, + ) + except ZODB.POSException.ReadOnlyError: + if self.read_only is Fallback: + self.read_only = True + server_tid = yield self.fut( + 'register', self.storage_key, True) + else: + raise else: - self.client.register_failed(self, exc) + if self.read_only is Fallback: + self.read_only = False + except Exception as exc: + self.client.register_failed(self, exc) + else: + self.client.registered(self, server_tid) exception_type_type = type(Exception) def message_received(self, data): @@ -272,6 +246,9 @@ def call(self, future, method, args): def promise(self, method, *args): return self.call(Promise(), method, args) + def fut(self, method, *args): + return self.call(Fut(), method, args) + def load_before(self, oid, tid): # Special-case loadBefore, so we collapse outstanding requests message_id = (oid, tid) @@ -405,18 +382,18 @@ def try_connecting(self): for addr in self.addrs ] - def registered(self, protocol, last_transaction_promise): + def registered(self, protocol, server_tid): if self.protocol is None: self.protocol = protocol if not (self.read_only is Fallback and protocol.read_only): # We're happy with this protocol. Tell the others to # stop trying. self._clear_protocols(protocol) - self.verify(last_transaction_promise) + self.verify(server_tid) elif (self.read_only is Fallback and not protocol.read_only and self.protocol.read_only): self.upgrade(protocol) - self.verify(last_transaction_promise) + self.verify(server_tid) else: protocol.close() # too late, we went home with another @@ -434,11 +411,14 @@ def register_failed(self, protocol, exc): self.try_connecting) verify_result = None # for tests - def verify(self, last_transaction_promise): + + @future_generator + def verify(self, server_tid): protocol = self.protocol + if server_tid is None: + server_tid = yield protocol.fut('lastTransaction') - @last_transaction_promise - def finish_verify(server_tid): + try: cache = self.cache if cache: cache_tid = cache.getLastTid() @@ -447,7 +427,6 @@ def finish_verify(server_tid): logger.error("Non-empty cache w/o tid -- clearing") cache.clear() self.client.invalidateCache() - self.finished_verify(server_tid) elif cache_tid > server_tid: self.verify_result = "Cache newer than server" logger.critical( @@ -456,61 +435,54 @@ def finish_verify(server_tid): server_tid, cache_tid, protocol) elif cache_tid == server_tid: self.verify_result = "Cache up to date" - self.finished_verify(server_tid) else: - @protocol.promise('getInvalidations', cache_tid) - def verify_invalidations(vdata): - if vdata: - self.verify_result = "quick verification" - tid, oids = vdata - for oid in oids: - cache.invalidate(oid, None) - self.client.invalidateTransaction(tid, oids) - return tid - else: - # cache is too old - self.verify_result = "cache too old, clearing" - try: - ZODB.event.notify( - ZEO.interfaces.StaleCache(self.client)) - except Exception: - logger.exception("sending StaleCache event") - logger.critical( - "%s dropping stale cache", - getattr(self.client, '__name__', ''), - ) - self.cache.clear() - 
self.client.invalidateCache() - return server_tid - - verify_invalidations( - self.finished_verify, - self.connected.set_exception, - ) + vdata = yield protocol.fut('getInvalidations', cache_tid) + if vdata: + self.verify_result = "quick verification" + server_tid, oids = vdata + for oid in oids: + cache.invalidate(oid, None) + self.client.invalidateTransaction(server_tid, oids) + else: + # cache is too old + self.verify_result = "cache too old, clearing" + try: + ZODB.event.notify( + ZEO.interfaces.StaleCache(self.client)) + except Exception: + logger.exception("sending StaleCache event") + logger.critical( + "%s dropping stale cache", + getattr(self.client, '__name__', ''), + ) + self.cache.clear() + self.client.invalidateCache() else: self.verify_result = "empty cache" - self.finished_verify(server_tid) - @finish_verify.catch - def verify_failed(exc): + except Exception as exc: del self.protocol self.register_failed(protocol, exc) + else: + # The cache is validated and the last tid we got from the server. + # Set ready so we apply any invalidations that follow. + # We've been ignoring them up to this point. + self.cache.setLastTid(server_tid) + self.ready = True - def finished_verify(self, server_tid): - # The cache is validated and the last tid we got from the server. - # Set ready so we apply any invalidations that follow. - # We've been ignoring them up to this point. - self.cache.setLastTid(server_tid) - self.ready = True + try: + info = yield protocol.fut('get_info') + except Exception as exc: + # This is weird. We were connected and verified our cache, but + # Now we errored getting info. - @self.protocol.promise('get_info') - def got_info(info): - self.client.notify_connected(self, info) - self.connected.set_result(None) + # XXX Need a test fpr this. The lone before is what we + # had, but it's wrong. + self.register_failed(self, exc) - @got_info.catch - def failed_info(exc): - self.register_failed(self, exc) + else: + self.client.notify_connected(self, info) + self.connected.set_result(None) def get_peername(self): return self.protocol.get_peername() @@ -822,6 +794,28 @@ def close(self): if self.exception: raise self.exception +class Fut(object): + """Lightweight future that calls it's callback immediately rather than soon + """ + + def add_done_callback(self, cb): + self.cb = cb + + exc = None + def set_exception(self, exc): + self.exc = exc + self.cb(self) + + def set_result(self, result): + self._result = result + self.cb(self) + + def result(self): + if self.exc: + raise self.exc + else: + return self._result + class Promise(object): """Lightweight future with a partial promise API. 
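
The generator-driving pattern is easier to see outside of a diff. Below is a self-contained sketch of the same idea, not ZEO's actual code: TinyFuture plays the role of the immediate-callback Fut, drive plays the role of future_generator, and the call/sequence names are made up. Details such as error logging, connection state, and writing requests to the transport are left out.

import functools

class TinyFuture(object):
    """Minimal future whose done-callback runs as soon as a result arrives."""

    def __init__(self):
        self._callback = self._result = self._exception = None
        self._done = False

    def add_done_callback(self, callback):
        self._callback = callback
        if self._done:
            callback(self)

    def set_result(self, value):
        self._result, self._done = value, True
        if self._callback is not None:
            self._callback(self)

    def set_exception(self, exc):
        self._exception, self._done = exc, True
        if self._callback is not None:
            self._callback(self)

    def result(self):
        if self._exception is not None:
            raise self._exception
        return self._result

def drive(func):
    """Drive a generator that yields futures, resuming it as each completes.

    Each yielded future's result is sent back into the generator (exceptions
    are thrown in), so the generator body reads like sequential code even
    though every step finishes through a callback.
    """
    @functools.wraps(func)
    def wrapper(*args, **kw):
        gen = func(*args, **kw)

        def step(value=None, exc=None):
            try:
                fut = gen.throw(exc) if exc is not None else gen.send(value)
            except StopIteration:
                return                      # generator finished
            @fut.add_done_callback
            def _(fut):
                try:
                    result = fut.result()
                except Exception as e:
                    step(exc=e)             # resume with the failure
                else:
                    step(value=result)      # resume with the result

        step()                              # start the generator
    return wrapper

# Usage: the generator "awaits" two calls; completing each future resumes it.
calls = []

def call(method):
    fut = TinyFuture()
    calls.append((method, fut))             # pretend we wrote to the server
    return fut

@drive
def sequence():
    data = yield call('loadBefore')
    info = yield call('get_info')
    print('loaded %r with info %r' % (data, info))

sequence()                                  # issues 'loadBefore', then waits
calls[0][1].set_result(b'pickle bytes')     # resumes at the first yield
calls[1][1].set_result({'length': 42})      # resumes at the second and prints

Because these futures run their callbacks synchronously rather than via call_soon, the whole chain executes inside the protocol's message handling, which is what keeps cache updates ordered with respect to invalidation messages that arrive afterwards.
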
diff --git a/src/ZEO/asyncio/tests.py b/src/ZEO/asyncio/tests.py index d6897f961..c18150e69 100644 --- a/src/ZEO/asyncio/tests.py +++ b/src/ZEO/asyncio/tests.py @@ -72,6 +72,10 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner): maxDiff = None + def tearDown(self): + self.client.close() + super(ClientTests, self) + def start(self, addrs=(('127.0.0.1', 8200), ), loop_addrs=None, read_only=False, @@ -96,12 +100,9 @@ def start(self, if finish_start: protocol.data_received(sized(b'Z3101')) self.assertEqual(self.pop(2, False), b'Z3101') - self.assertEqual(self.pop(), - [(1, False, 'register', ('TEST', False)), - (2, False, 'lastTransaction', ()), - ]) self.respond(1, None) self.respond(2, 'a'*8) + self.pop(4) self.assertEqual(self.pop(), (3, False, 'get_info', ())) self.respond(3, dict(length=42)) @@ -135,12 +136,9 @@ def testClientBasics(self): # The client sends back a handshake, and registers the # storage, and requests the last transaction. self.assertEqual(self.pop(2, False), b'Z5') - self.assertEqual(self.pop(), - [(1, False, 'register', ('TEST', False)), - (2, False, 'lastTransaction', ()), - ]) + self.assertEqual(self.pop(), (1, False, 'register', ('TEST', False))) - # Actually, the client isn't connected until it initializes it's cache: + # The client isn't connected until it initializes it's cache: self.assertFalse(client.connected.done() or transport.data) # If we try to make calls while the client is *initially* @@ -163,9 +161,13 @@ def testClientBasics(self): # The wrapper object (ClientStorage) hasn't been notified: self.assertFalse(wrapper.notify_connected.called) - # Let's respond to those first 2 calls: - + # Let's respond to the register call: self.respond(1, None) + + # The client requests the last transaction: + self.assertEqual(self.pop(), (2, False, 'lastTransaction', ())) + + # We respond self.respond(2, 'a'*8) # After verification, the client requests info: @@ -298,15 +300,14 @@ def finished_cb(tid): # protocol: protocol.data_received(sized(b'Z310')) self.assertEqual(self.unsized(transport.pop(2)), b'Z310') - self.assertEqual(self.pop(), - [(1, False, 'register', ('TEST', False)), - (2, False, 'lastTransaction', ()), - ]) + self.assertEqual(self.pop(), (1, False, 'register', ('TEST', False))) self.assertFalse(wrapper.notify_connected.called) - self.respond(1, None) - self.respond(2, b'e'*8) - self.assertEqual(self.pop(), (3, False, 'get_info', ())) - self.respond(3, dict(length=42)) + + # If the register response is a tid, then the client won't + # request lastTransaction + self.respond(1, b'e'*8) + self.assertEqual(self.pop(), (2, False, 'get_info', ())) + self.respond(2, dict(length=42)) # Because the server tid matches the cache tid, we're done connecting wrapper.notify_connected.assert_called_with(client, {'length': 42}) @@ -335,12 +336,9 @@ def test_cache_behind(self): self.assertFalse(client.connected.done() or transport.data) protocol.data_received(sized(b'Z3101')) self.assertEqual(self.unsized(transport.pop(2)), b'Z3101') - self.assertEqual(self.pop(), - [(1, False, 'register', ('TEST', False)), - (2, False, 'lastTransaction', ()), - ]) self.respond(1, None) self.respond(2, b'e'*8) + self.pop(4) # We have to verify the cache, so we're not done connecting: self.assertFalse(client.connected.done()) @@ -373,12 +371,9 @@ def test_cache_way_behind(self): self.assertFalse(client.connected.done() or transport.data) protocol.data_received(sized(b'Z3101')) self.assertEqual(self.unsized(transport.pop(2)), b'Z3101') - self.assertEqual(self.pop(), - [(1, False, 
'register', ('TEST', False)), - (2, False, 'lastTransaction', ()), - ]) self.respond(1, None) self.respond(2, b'e'*8) + self.pop(4) # We have to verify the cache, so we're not done connecting: self.assertFalse(client.connected.done()) @@ -445,12 +440,9 @@ def test_bad_server_tid(self): cache.setLastTid('b'*8) protocol.data_received(sized(b'Z3101')) self.assertEqual(self.unsized(transport.pop(2)), b'Z3101') - self.assertEqual(self.pop(), - [(1, False, 'register', ('TEST', False)), - (2, False, 'lastTransaction', ()), - ]) self.respond(1, None) self.respond(2, 'a'*8) + self.pop() self.assertFalse(client.connected.done() or transport.data) delay, func, args, _ = loop.later.pop(1) # first in later is heartbeat self.assert_(8 < delay < 10) @@ -462,12 +454,9 @@ def test_bad_server_tid(self): transport = loop.transport protocol.data_received(sized(b'Z3101')) self.assertEqual(self.unsized(transport.pop(2)), b'Z3101') - self.assertEqual(self.pop(), - [(1, False, 'register', ('TEST', False)), - (2, False, 'lastTransaction', ()), - ]) self.respond(1, None) self.respond(2, 'b'*8) + self.pop(4) self.assertEqual(self.pop(), (3, False, 'get_info', ())) self.respond(3, dict(length=42)) self.assert_(client.connected.done() and not transport.data) @@ -493,12 +482,10 @@ def test_readonly_fallback(self): self.assertTrue(self.is_read_only()) # The client tries for a read-only connection: - self.assertEqual(self.pop(), - [(2, False, 'register', ('TEST', True)), - (3, False, 'lastTransaction', ()), - ]) + self.assertEqual(self.pop(), (2, False, 'register', ('TEST', True))) # We respond with successfully: self.respond(2, None) + self.pop(2) self.respond(3, 'b'*8) self.assertTrue(self.is_read_only()) @@ -525,12 +512,12 @@ def test_readonly_fallback(self): # We respond and the writable connection succeeds: self.respond(1, None) - self.assertFalse(self.is_read_only()) # at this point, a lastTransaction request is emitted: self.assertEqual(self.parse(loop.transport.pop()), (2, False, 'lastTransaction', ())) + self.assertFalse(self.is_read_only()) # Now, the original protocol is closed, and the client is # no-longer ready: @@ -554,11 +541,8 @@ def test_invalidations_while_verifying(self): wrapper, cache, loop, client, protocol, transport = self.start() protocol.data_received(sized(b'Z3101')) self.assertEqual(self.unsized(transport.pop(2)), b'Z3101') - self.assertEqual(self.pop(), - [(1, False, 'register', ('TEST', False)), - (2, False, 'lastTransaction', ()), - ]) self.respond(1, None) + self.pop(4) self.send('invalidateTransaction', b'b'*8, [b'1'*8], called=False) self.respond(2, b'a'*8) self.send('invalidateTransaction', b'c'*8, [b'1'*8], no_output=False) @@ -575,11 +559,8 @@ def test_invalidations_while_verifying(self): protocol.data_received(sized(b'Z3101')) self.assertEqual(self.unsized(transport.pop(2)), b'Z3101') - self.assertEqual(self.pop(), - [(1, False, 'register', ('TEST', False)), - (2, False, 'lastTransaction', ()), - ]) self.respond(1, None) + self.pop(4) self.send('invalidateTransaction', b'd'*8, [b'1'*8], called=False) self.respond(2, b'c'*8) self.send('invalidateTransaction', b'e'*8, [b'1'*8], no_output=False) From 09d8df62e4e781b293eeedd8d9ecc61a5faa5e51 Mon Sep 17 00:00:00 2001 From: Jim Fulton Date: Sun, 17 Jul 2016 11:08:09 -0400 Subject: [PATCH 7/7] Converted the last promise to a generator Bye bye Promise. Also fixed a comment. 
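
With Promise gone, the loadBefore collapsing from the first patch is carried entirely by the shared futures that Protocol.load_before keys by (oid, tid). Reduced to its essentials, the idea is sketched below with made-up names (Collapser, send_request, reply_received); in the real protocol the shared future lives in the same futures dict used to dispatch server replies, so message_received popping the (oid, tid) key is all the bookkeeping there is.

import asyncio

class Collapser(object):
    """Collapse concurrent loadBefore requests for the same (oid, tid)."""

    def __init__(self, loop, send_request):
        self.loop = loop
        self.send_request = send_request    # writes one message to the server
        self.pending = {}                   # (oid, tid) -> shared future

    def load_before(self, oid, tid):
        key = (oid, tid)
        future = self.pending.get(key)
        if future is None:                  # first caller really asks the server
            future = self.loop.create_future()
            self.pending[key] = future
            self.send_request(key, 'loadBefore', (oid, tid))
        return future                       # later callers share the same future

    def reply_received(self, key, data):
        self.pending.pop(key).set_result(data)   # wakes every waiter at once

# Two requests for the same object revision produce a single server call:
sent = []
loop = asyncio.new_event_loop()
collapser = Collapser(loop, lambda *message: sent.append(message))
f1 = collapser.load_before(b'1' * 8, b'\xff' * 8)
f2 = collapser.load_before(b'1' * 8, b'\xff' * 8)
assert f1 is f2 and len(sent) == 1
collapser.reply_received((b'1' * 8, b'\xff' * 8), (b'data', b'a' * 8, None))
assert f1.result() == (b'data', b'a' * 8, None)
loop.close()
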
--- src/ZEO/asyncio/client.py | 145 ++++++-------------------------------- 1 file changed, 20 insertions(+), 125 deletions(-) diff --git a/src/ZEO/asyncio/client.py b/src/ZEO/asyncio/client.py index 034abe07a..0bc8b1049 100644 --- a/src/ZEO/asyncio/client.py +++ b/src/ZEO/asyncio/client.py @@ -161,14 +161,7 @@ def connection_lost(self, exc): @future_generator def finish_connect(self, protocol_version): - - # We use a promise model rather than coroutines here because - # for the most part, this class is reactive and coroutines - # aren't a good model of it's activities. During - # initialization, however, we use promises to provide an - # imperative flow. - - # The promise(/future) implementation we use differs from + # The future implementation we use differs from # asyncio.Future in that callbacks are called immediately, # rather than using the loops call_soon. We want to avoid a # race between invalidations and cache initialization. In @@ -243,9 +236,6 @@ def call(self, future, method, args): self._write(self.encode(self.message_id, False, method, args)) return future - def promise(self, method, *args): - return self.call(Promise(), method, args) - def fut(self, method, *args): return self.call(Fut(), method, args) @@ -567,30 +557,28 @@ def prefetch(self, future, oids, tid): else: future.set_exception(ClientDisconnected()) + @future_generator def tpc_finish_threadsafe(self, future, tid, updates, f): if self.ready: - @self.protocol.promise('tpc_finish', tid) - def committed(tid): - try: - cache = self.cache - for oid, data, resolved in updates: - cache.invalidate(oid, tid) - if data and not resolved: - cache.store(oid, tid, None, data) - cache.setLastTid(tid) - except Exception as exc: - future.set_exception(exc) - - # At this point, our cache is in an inconsistent - # state. We need to reconnect in hopes of - # recovering to a consistent state. - self.protocol.close() - self.disconnected(self.protocol) - else: - f(tid) - future.set_result(tid) + try: + tid = yield self.protocol.fut('tpc_finish', tid) + cache = self.cache + for oid, data, resolved in updates: + cache.invalidate(oid, tid) + if data and not resolved: + cache.store(oid, tid, None, data) + cache.setLastTid(tid) + except Exception as exc: + future.set_exception(exc) - committed.catch(future.set_exception) + # At this point, our cache is in an inconsistent + # state. We need to reconnect in hopes of + # recovering to a consistent state. + self.protocol.close() + self.disconnected(self.protocol) + else: + f(tid) + future.set_result(tid) else: future.set_exception(ClientDisconnected()) @@ -815,96 +803,3 @@ def result(self): raise self.exc else: return self._result - -class Promise(object): - """Lightweight future with a partial promise API. - - These are lighweight because they call callbacks synchronously - rather than through an event loop, and because they ony support - single callbacks. - """ - - # Note that we can know that they are completed after callbacks - # are set up because they're used to make network requests. - # Requests are made by writing to a transport. Because we're used - # in a single-threaded protocol, we can't get a response and be - # completed if the callbacks are set in the same code that - # created the promise, which they are. 
- - next = success_callback = error_callback = cancelled = None - - def __call__(self, success_callback = None, error_callback = None): - """Set the promises success and error handlers and beget a new promise - - The promise returned provides for promise chaining, providing - a sane imperative flow. Let's call this the "next" promise. - Any results or exceptions generated by the promise or it's - callbacks are passed on to the next promise. - - When the promise completes successfully, if a success callback - isn't set, then the next promise is completed with the - successfull result. If a success callback is provided, it's - called. If the call succeeds, and the result is a promise, - them the result is called with the next promise's set_result - and set_exception methods, chaining the result and next - promise. If the result isn't a promise, then the next promise - is completed with it by calling set_result. If the success - callback fails, then it's exception is passed to - next.set_exception. - - If the promise completes with an error and the error callback - isn't set, then the exception is passed to the next promises - set_exception. If an error handler is provided, it's called - and if it doesn't error, then the original exception is passed - to the next promise's set_exception. If there error handler - errors, then that exception is passed to the next promise's - set_exception. - """ - self.next = self.__class__() - self.success_callback = success_callback - self.error_callback = error_callback - return self.next - - def cancel(self): - self.set_exception(concurrent.futures.CancelledError) - - def catch(self, error_callback): - self.error_callback = error_callback - - def set_exception(self, exc): - self._notify(None, exc) - - def set_result(self, result): - self._notify(result, None) - - def _notify(self, result, exc): - next = self.next - if exc is not None: - if self.error_callback is not None: - try: - result = self.error_callback(exc) - except Exception: - logger.exception("Exception handling error %s", exc) - if next is not None: - next.set_exception(exc) - else: - if next is not None: - next.set_result(result) - elif next is not None: - next.set_exception(exc) - else: - if self.success_callback is not None: - try: - result = self.success_callback(result) - except Exception as exc: - logger.exception("Exception in success callback") - if next is not None: - next.set_exception(exc) - else: - if next is not None: - if isinstance(result, Promise): - result(next.set_result, next.set_exception) - else: - next.set_result(result) - elif next is not None: - next.set_result(result)
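
Taken together, the series gives ClientStorage a prefetch(oids, tid) method built on these collapsed loadBefore calls. A typical application-level use, modelled on the doctest in the second patch, might look like the sketch below; the server address and the root['documents'] container are assumptions made up for illustration.

import ZEO

db = ZEO.DB(('localhost', 8100))       # assumes a ZEO server at this address
conn = db.open()
root = conn.root()

docs = root['documents']               # hypothetical application data
oids = [doc._p_oid for doc in docs]    # ghosts expose _p_oid without loading

# prefetch returns immediately; the client cache fills in the background as
# the (collapsed) loadBefore replies arrive, and failures are only logged.
conn.db().storage.prefetch(oids, conn._storage._start)

for doc in docs:
    doc._p_activate()                  # likely served from the ZEO client cache

db.close()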