Skip to content

Commit

Permalink
Merge branch 'master' into msgpack
Browse files Browse the repository at this point in the history
  • Loading branch information
Jim Fulton committed Oct 28, 2016
2 parents 7b56ed3 + 93ec6ce commit 6780c31
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 26 deletions.
5 changes: 4 additions & 1 deletion CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ Changelog
5.0.2 (unreleased)
------------------

- Provide much better performance on Python 2.

- Provide better error messages when pip tries to install ZEO on an
unsupported Python version. See `issue 75 <https://github.com/zopefoundation/ZEO/issues/75>`_.
unsupported Python version. See `issue 75
<https://github.com/zopefoundation/ZEO/issues/75>`_.

5.0.1 (2016-09-06)
------------------
Expand Down
49 changes: 35 additions & 14 deletions src/ZEO/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ def verify(self, server_tid):
def get_peername(self):
return self.protocol.get_peername()

def call_async_threadsafe(self, future, method, args):
def call_async_threadsafe(self, future, wait_ready, method, args):
if self.ready:
self.protocol.call_async(method, args)
future.set_result(None)
Expand All @@ -553,7 +553,7 @@ def call_async_threadsafe(self, future, method, args):
def call_async_from_same_thread(self, method, *args):
return self.protocol.call_async(method, args)

def call_async_iter_threadsafe(self, future, it):
def call_async_iter_threadsafe(self, future, wait_ready, it):
if self.ready:
self.protocol.call_async_iter(it)
future.set_result(None)
Expand All @@ -577,16 +577,19 @@ def done(future):
else:
self._when_ready(func, result_future, *args)

def call_threadsafe(self, future, method, args):
def call_threadsafe(self, future, wait_ready, method, args):
if self.ready:
self.protocol.call(future, method, args)
elif wait_ready:
self._when_ready(
self.call_threadsafe, future, wait_ready, method, args)
else:
self._when_ready(self.call_threadsafe, future, method, args)
future.set_exception(ClientDisconnected())

# Special methods because they update the cache.

@future_generator
def load_before_threadsafe(self, future, oid, tid):
def load_before_threadsafe(self, future, wait_ready, oid, tid):
data = self.cache.loadBefore(oid, tid)
if data is not None:
future.set_result(data)
Expand All @@ -600,8 +603,11 @@ def load_before_threadsafe(self, future, oid, tid):
if data:
data, start, end = data
self.cache.store(oid, start, end, data)
elif wait_ready:
self._when_ready(
self.load_before_threadsafe, future, wait_ready, oid, tid)
else:
self._when_ready(self.load_before_threadsafe, future, oid, tid)
future.set_exception(ClientDisconnected())

@future_generator
def _prefetch(self, oid, tid):
Expand All @@ -613,7 +619,7 @@ def _prefetch(self, oid, tid):
except Exception:
logger.exception("prefetch %r %r" % (oid, tid))

def prefetch(self, future, oids, tid):
def prefetch(self, future, wait_ready, oids, tid):
if self.ready:
for oid in oids:
if self.cache.loadBefore(oid, tid) is None:
Expand All @@ -624,7 +630,7 @@ def prefetch(self, future, oids, tid):
future.set_exception(ClientDisconnected())

@future_generator
def tpc_finish_threadsafe(self, future, tid, updates, f):
def tpc_finish_threadsafe(self, future, wait_ready, tid, updates, f):
if self.ready:
try:
tid = yield self.protocol.fut('tpc_finish', tid)
Expand All @@ -648,7 +654,7 @@ def tpc_finish_threadsafe(self, future, tid, updates, f):
else:
future.set_exception(ClientDisconnected())

def close_threadsafe(self, future):
def close_threadsafe(self, future, _):
self.close()
future.set_result(None)

Expand Down Expand Up @@ -716,15 +722,30 @@ def setup_delegation(self, loop):
def call(meth, *args, **kw):
timeout = kw.pop('timeout', None)
assert not kw

# Some explanation of the code below.
# Timeouts on Python 2 are expensive, so we try to avoid
# them if we're connected. The 3rd argument below is a
# wait flag. If false, and we're disconnected, we fail
# immediately. If that happens, then we try again with the
# wait flag set to True and wait with the default timeout.
result = Future()
call_soon_threadsafe(meth, result, *args)
return self.wait_for_result(result, timeout)
call_soon_threadsafe(meth, result, timeout is not None, *args)
try:
return self.wait_for_result(result, timeout)
except ClientDisconnected:
if timeout is None:
result = Future()
call_soon_threadsafe(meth, result, True, *args)
return self.wait_for_result(result, self.timeout)
else:
raise

self.__call = call

def wait_for_result(self, future, timeout):
try:
return future.result(self.timeout if timeout is None else timeout)
return future.result(timeout)
except concurrent.futures.TimeoutError:
if not self.client.ready:
raise ClientDisconnected("timed out waiting for connection")
Expand All @@ -738,7 +759,7 @@ def call_future(self, method, *args):
# for tests
result = concurrent.futures.Future()
self.loop.call_soon_threadsafe(
self.call_threadsafe, result, method, args)
self.call_threadsafe, result, True, method, args)
return result

def async(self, method, *args):
Expand Down Expand Up @@ -779,7 +800,7 @@ def call_closed(*a, **k):

self.__call = call_closed

def apply_threadsafe(self, future, func, *args):
def apply_threadsafe(self, future, wait_ready, func, *args):
try:
future.set_result(func(*args))
except Exception as exc:
Expand Down
8 changes: 4 additions & 4 deletions src/ZEO/asyncio/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ def respond(self, message_id, result):
sized(self.encode(message_id, 'R', result)))

def wait_for_result(self, future, timeout):
if future.done() and future.exception() is not None:
raise future.exception()
return future

def testClientBasics(self):
Expand Down Expand Up @@ -145,8 +147,7 @@ def testClientBasics(self):
# connecting, we get an error. This is because some dufus
# decided to create a client storage without waiting for it to
# connect.
f1 = self.call('foo', 1, 2)
self.assertTrue(isinstance(f1.exception(), ClientDisconnected))
self.assertRaises(ClientDisconnected, self.call, 'foo', 1, 2)

# When the client is reconnecting, it's ready flag is set to False and
# it queues calls:
Expand All @@ -155,8 +156,7 @@ def testClientBasics(self):
self.assertFalse(f1.done())

# If we try to make an async call, we get an immediate error:
f2 = self.async('bar', 3, 4)
self.assert_(isinstance(f2.exception(), ClientDisconnected))
self.assertRaises(ClientDisconnected, self.async, 'bar', 3, 4)

# The wrapper object (ClientStorage) hasn't been notified:
self.assertFalse(wrapper.notify_connected.called)
Expand Down
7 changes: 2 additions & 5 deletions src/ZEO/tests/drop_cache_rather_than_verify.txt
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,6 @@ If we access the root object, it'll be loaded from the server:
>>> conn.root()[1].x
6

>>> len(db.storage._cache)
2

Similarly, if we simply disconnect the client, and write data from
another client:

Expand All @@ -138,8 +135,8 @@ another client:
- Drops or clears it's client cache. (The end result is that the cache
is working but empty.)

>>> len(db.storage._cache)
1
>>> len(db.storage._cache) <= 1
True

(When a database is created, it checks to make sure the root object is
in the database, which is why we get 1, rather than 0 objects in the cache.)
Expand Down
4 changes: 2 additions & 2 deletions src/ZEO/tests/testZEO.py
Original file line number Diff line number Diff line change
Expand Up @@ -971,8 +971,8 @@ def test_prefetch(self):
>>> conn.close()
>>> conn = ZEO.connection(addr)
>>> storage = conn.db().storage
>>> len(storage._cache)
1
>>> len(storage._cache) <= 1
True
>>> storage.prefetch(oids, conn._storage._start)
The prefetch returns before the cache is filled:
Expand Down

0 comments on commit 6780c31

Please sign in to comment.