Skip to content

Commit

Permalink
When calling loadBefore, collapse outstanding calls.
Browse files Browse the repository at this point in the history
IOW, if there's an outstanding call is made for a given oid and tid,
and another call is made, the second call will use the result of the
outstanding call, rather than making another call to the server.
  • Loading branch information
Jim Fulton committed Jul 14, 2016
1 parent d209579 commit 37c6678
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 21 deletions.
30 changes: 22 additions & 8 deletions src/ZEO/asyncio/client.py
Expand Up @@ -240,6 +240,17 @@ def call(self, future, method, args):
def promise(self, method, *args):
return self.call(Promise(), method, args)

def load_before(self, oid, tid):
# Special-case loadBefore, so we collapse outstanding requests
message_id = (oid, tid)
future = self.futures.get(message_id)
if future is None:
future = asyncio.Future(loop=self.loop)
self.futures[message_id] = future
self._write(
self.encode(message_id, False, 'loadBefore', (oid, tid)))
return future.add_done_callback

# Methods called by the server.
# WARNING WARNING we can't call methods that call back to us
# syncronously, as that would lead to DEADLOCK!
Expand Down Expand Up @@ -519,14 +530,17 @@ def load_before_threadsafe(self, future, oid, tid):
if data is not None:
future.set_result(data)
elif self.ready:
@self.protocol.promise('loadBefore', oid, tid)
def load_before(data):
future.set_result(data)
if data:
data, start, end = data
self.cache.store(oid, start, end, data)

load_before.catch(future.set_exception)

@self.protocol.load_before(oid, tid)
def load_before(load_future):
try:
data = load_future.result()
future.set_result(data)
if data:
data, start, end = data
self.cache.store(oid, start, end, data)
except Exception as exc:
future.set_exception(exc)
else:
self._when_ready(self.load_before_threadsafe, future, oid, tid)

Expand Down
41 changes: 28 additions & 13 deletions src/ZEO/asyncio/tests.py
Expand Up @@ -71,6 +71,8 @@ def pop(self, count=None, parse=True):

class ClientTests(Base, setupstack.TestCase, ClientRunner):

maxDiff = None

def start(self,
addrs=(('127.0.0.1', 8200), ), loop_addrs=None,
read_only=False,
Expand Down Expand Up @@ -191,9 +193,11 @@ def testClientBasics(self):
# Loading objects gets special handling to leverage the cache.
loaded = self.load_before(b'1'*8, m64)

# The data wasn't in the cache, so we make a server call:
self.assertEqual(self.pop(), (5, False, 'loadBefore', (b'1'*8, m64)))
self.respond(5, (b'data', b'a'*8, None))
# The data wasn't in the cache, so we made a server call:
self.assertEqual(self.pop(),
((b'1'*8, m64), False, 'loadBefore', (b'1'*8, m64)))
# Note load_before uses the oid as the message id.
self.respond((b'1'*8, m64), (b'data', b'a'*8, None))
self.assertEqual(loaded.result(), (b'data', b'a'*8, None))

# If we make another request, it will be satisfied from the cache:
Expand All @@ -206,9 +210,16 @@ def testClientBasics(self):

# Now, if we try to load current again, we'll make a server request.
loaded = self.load_before(b'1'*8, m64)
self.assertEqual(self.pop(), (6, False, 'loadBefore', (b'1'*8, m64)))
self.respond(6, (b'data2', b'b'*8, None))

# Note that if we make another request for the same object,
# the requests will be collapsed:
loaded2 = self.load_before(b'1'*8, m64)

self.assertEqual(self.pop(),
((b'1'*8, m64), False, 'loadBefore', (b'1'*8, m64)))
self.respond((b'1'*8, m64), (b'data2', b'b'*8, None))
self.assertEqual(loaded.result(), (b'data2', b'b'*8, None))
self.assertEqual(loaded2.result(), (b'data2', b'b'*8, None))

# Loading non-current data may also be satisfied from cache
loaded = self.load_before(b'1'*8, b'b'*8)
Expand All @@ -219,9 +230,10 @@ def testClientBasics(self):
self.assertFalse(transport.data)
loaded = self.load_before(b'1'*8, b'_'*8)

self.assertEqual(self.pop(),
(7, False, 'loadBefore', (b'1'*8, b'_'*8)))
self.respond(7, (b'data0', b'^'*8, b'_'*8))
self.assertEqual(
self.pop(),
((b'1'*8, b'_'*8), False, 'loadBefore', (b'1'*8, b'_'*8)))
self.respond((b'1'*8, b'_'*8), (b'data0', b'^'*8, b'_'*8))
self.assertEqual(loaded.result(), (b'data0', b'^'*8, b'_'*8))

# When committing transactions, we need to update the cache
Expand All @@ -244,8 +256,8 @@ def finished_cb(tid):
cache.load(b'4'*8))
self.assertEqual(cache.load(b'1'*8), (b'data2', b'b'*8))
self.assertEqual(self.pop(),
(8, False, 'tpc_finish', (b'd'*8,)))
self.respond(8, b'e'*8)
(5, False, 'tpc_finish', (b'd'*8,)))
self.respond(5, b'e'*8)
self.assertEqual(committed.result(), b'e'*8)
self.assertEqual(cache.load(b'1'*8), None)
self.assertEqual(cache.load(b'2'*8), ('committed 2', b'e'*8))
Expand All @@ -257,8 +269,9 @@ def finished_cb(tid):
loaded = self.load_before(b'1'*8, m64)
f1 = self.call('foo', 1, 2)
self.assertFalse(loaded.done() or f1.done())
self.assertEqual(self.pop(), [(9, False, 'loadBefore', (b'1'*8, m64)),
(10, False, 'foo', (1, 2))],
self.assertEqual(self.pop(),
[((b'1'*8, m64), False, 'loadBefore', (b'1'*8, m64)),
(6, False, 'foo', (1, 2))],
)
exc = TypeError(43)

Expand Down Expand Up @@ -720,7 +733,9 @@ def load(self, oid):
def store(self, oid, start_tid, end_tid, data):
assert start_tid is not None
revisions = self.data[oid]
revisions.append((start_tid, end_tid, data))
data = (start_tid, end_tid, data)
if not revisions or data != revisions[-1]:
revisions.append(data)
revisions.sort()

def loadBefore(self, oid, tid):
Expand Down

0 comments on commit 37c6678

Please sign in to comment.