Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[for discussion] asyncio.client: move cache access out of ClientIO into Protocol #219

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 34 additions & 20 deletions src/ZEO/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,18 +328,32 @@ def load_before(self, oid, tid):
future = self.futures.get(message_id)
if future is None:
future = Future(loop=self.loop)
self.call_sync('loadBefore', message_id, message_id, future)

@future.add_done_callback
def _(future):
try:
# Check whether the cache contains the information.
# I am not sure whether the cache lookup is really
# necessary at this place (it has already been
# done in ``ClientStorage.loadBefore`` and therefore
# will likely fail here).
# The lookup at this place, however, guarantees
# (together with the folding of identical requests above)
# that we will not store data already in the cache.
                    # Maybe this is important.
cache = self.client.cache
data = cache.loadBefore(oid, tid)
if data:
future.set_result(data)
else:
# data not in the cache
# ensure the cache gets updated when the answer arrives
@future.add_done_callback
def store(future):
if future.cancelled() or future.exception() is not None:
return
data = future.result()
except Exception:
return
if data:
data, start, end = data
self.client.cache.store(oid, start, end, data)
if data:
data, start, end = data
self.client.cache.store(oid, start, end, data)

self.call_sync('loadBefore', message_id, message_id, future)
return future

# Methods called by the server.
Expand Down Expand Up @@ -753,10 +767,15 @@ def close_co(self):

@coroutine
def load_before_co(self, oid, tid, timeout):
data = self.cache.loadBefore(oid, tid)
if data is not None:
return_(data)
if not self.operational:
            # The following cache lookup is usually redundant:
            # ``ClientStorage.loadBefore`` has already performed it
            # (and failed), so with high probability this lookup
            # will fail as well.
            # However, a test relies on this cache lookup being here.
data = self.cache.loadBefore(oid, tid)
if data:
return_(data)
yield self.await_operational_co(timeout)
# Race condition potential
# -- see comment in ``call_sync_co``
Expand All @@ -774,13 +793,8 @@ def _prefetch_co(self, oid, tid):
def prefetch_co(self, oids, tid):
if not self.operational:
raise ClientDisconnected()
oids_tofetch = []
for oid in oids:
if self.cache.loadBefore(oid, tid) is None:
oids_tofetch.append(oid)
if oids_tofetch:
yield asyncio.gather(*(Task(self._prefetch_co(oid, tid), loop=self.loop)
for oid in oids_tofetch))
yield asyncio.gather(*(Task(self._prefetch_co(oid, tid), loop=self.loop)
for oid in oids))

@coroutine
def tpc_finish_co(self, tid, updates, f):
Expand Down