Skip to content

Commit

Permalink
Lots of changes while integrating wth ClientStorage
Browse files Browse the repository at this point in the history
- testZEO tests now pass

- async tests now pass again

  Probably need to write more async tests to reflect changes.
  (Or maybe the ZEO tests that drove tem are enough.)

- dropped heartbeat tests, which were insane. Will add simpler test
  when I add heartbeats to the async implementation.
  • Loading branch information
Jim Fulton committed May 25, 2016
1 parent 3f31236 commit cc33394
Show file tree
Hide file tree
Showing 8 changed files with 199 additions and 371 deletions.
37 changes: 21 additions & 16 deletions src/ZEO/ClientStorage.py
Expand Up @@ -29,6 +29,8 @@
import weakref
from binascii import hexlify

import BTrees.OOBTree

import zc.lockfile
import ZODB
import ZODB.BaseStorage
Expand Down Expand Up @@ -223,7 +225,8 @@ def __init__(self, addr, storage='1', cache_size=20 * MB,

self._oids = [] # List of pre-fetched oids from server

cache = self._cache = open_cache(cache, var, client, cache_size)
cache = self._cache = open_cache(
cache, var, client, storage, cache_size)

# XXX need to check for POSIX-ness here
self.blob_dir = blob_dir
Expand Down Expand Up @@ -257,8 +260,8 @@ def __init__(self, addr, storage='1', cache_size=20 * MB,
addr, self, cache, storage,
ZEO.asyncio.client.Fallback if read_only_fallback else read_only,
wait_timeout or 30,
wait=wait,
)
self._server.start()
self._call = self._server.call
self._async = self._server.async
self._async_iter = self._server.async_iter
Expand Down Expand Up @@ -341,13 +344,6 @@ def notify_connected(self, conn, info):

self._info.update(info)

# for name in self._info.get('extensionMethods', {}).keys():
# if not hasattr(self, name):
# def mklambda(mname):
# return (lambda *args, **kw:
# self._server.rpc.call(mname, *args, **kw))
# setattr(self, name, mklambda(name))

for iface in (
ZODB.interfaces.IStorageRestoreable,
ZODB.interfaces.IStorageIteration,
Expand Down Expand Up @@ -560,7 +556,7 @@ def storeBlob(self, oid, serial, data, blobfilename, version, txn):

def store():
yield ('storeBlobStart', ())
f = open(blobfilename, 'rb')
f = open(target, 'rb')
while 1:
chunk = f.read(59000)
if not chunk:
Expand Down Expand Up @@ -714,6 +710,12 @@ def tpc_begin(self, txn, tid=None, status=' '):

try:
tbuf = txn.data(self)
except AttributeError:
# Gaaaa. This is a recovery transaction. Work around this
# until we can think of something better. XXX
tb = {}
txn.data = tb.__getitem__
txn.set_data = tb.__setitem__
except KeyError:
pass
else:
Expand Down Expand Up @@ -855,9 +857,6 @@ def restore(self, oid, serial, data, version, prev_txn, transaction):
assert not version
self._check_trans(transaction, 'restore')
self._async('restorea', oid, serial, data, prev_txn, id(transaction))
# Don't update the transaction buffer, because current data are
# unaffected.
return self._check_serials()

# Below are methods invoked by the StorageServer

Expand All @@ -871,6 +870,10 @@ def info(self, dict):
"""Server callback to update the info dictionary."""
self._info.update(dict)

def invalidateCache(self):
if self._db is not None:
self._db.invalidateCache()

def invalidateTransaction(self, tid, oids):
"""Server callback: Invalidate objects modified by tid."""
if self._db is not None:
Expand Down Expand Up @@ -1154,14 +1157,16 @@ def _lock_blob(path):
else:
break

def open_cache(cache, var, client, cache_size):
def open_cache(cache, var, client, storage, cache_size):
if isinstance(cache, (None.__class__, str)):
from ZEO.cache import ClientCache
if cache is None:
if client:
cache = os.path.join(var or os.getcwd(), client)
cache = os.path.join(var or os.getcwd(),
"%s-%s.zec" % (client, storage))
else:
return ClientCache(cache, cache_size)
# ephemeral cache
return ClientCache(None, cache_size)

cache = ClientCache(cache, cache_size)

Expand Down
1 change: 1 addition & 0 deletions src/ZEO/TransactionBuffer.py
Expand Up @@ -62,6 +62,7 @@ def store(self, oid, data):
def serial(self, oid, serial):
if isinstance(serial, Exception):
self.exception = serial
self.serials[oid] = None
else:
self.serials[oid] = serial

Expand Down
93 changes: 74 additions & 19 deletions src/ZEO/asyncio/client.py
Expand Up @@ -7,9 +7,13 @@
import random
import threading
import traceback
import ZEO.Exceptions

import ZODB.event
import ZODB.POSException

import ZEO.Exceptions
import ZEO.interfaces

logger = logging.getLogger(__name__)

Fallback = object()
Expand Down Expand Up @@ -272,6 +276,16 @@ def message_received(self, data):
type(args[0]) == self.exception_type_type and
issubclass(args[0], Exception)
):
if not issubclass(
args[0], (
ZODB.POSException.POSKeyError,
ZODB.POSException.ConflictError,)
):
logger.error("%s from server: %s.%s:%s",
self.name,
args[0].__module__,
args[0].__name__,
args[1])
future.set_exception(args[1])
else:
future.set_result(args)
Expand Down Expand Up @@ -307,7 +321,7 @@ def promise(self, method, *args):
'receiveBlobStart', 'receiveBlobChunk', 'receiveBlobStop',
# plus: notify_connected, notify_disconnected
)
client_delegated = client_methods[1:]
client_delegated = client_methods[2:]

class Client:
"""asyncio low-level ZEO client interface
Expand Down Expand Up @@ -432,6 +446,8 @@ def finish_verify(server_tid):
self.client.invalidateCache()
self.finished_verify(server_tid)
elif cache_tid > server_tid:
logger.critical(
'Client has seen newer transactions than server!')
raise AssertionError("Server behind client, %r < %r, %s",
server_tid, cache_tid, protocol)
elif cache_tid == server_tid:
Expand All @@ -447,7 +463,15 @@ def verify_invalidations(vdata):
return tid
else:
# cache is too old
logger.info("cache too old %s", protocol)
try:
ZODB.event.notify(
ZEO.interfaces.StaleCache(self.client))
except Exception:
logger.exception("sending StaleCache event")
logger.critical(
"%s dropping stale cache",
getattr(self.client, '__name__', ''),
)
self.cache.clear()
self.client.invalidateCache()
return server_tid
Expand Down Expand Up @@ -561,14 +585,24 @@ def tpc_finish_threadsafe(self, future, tid, updates, f):
if self.ready:
@self.protocol.promise('tpc_finish', tid)
def committed(tid):
cache = self.cache
for oid, data, resolved in updates:
cache.invalidate(oid, tid)
if data and not resolved:
cache.store(oid, tid, None, data)
cache.setLastTid(tid)
f(tid)
future.set_result(tid)
try:
cache = self.cache
for oid, data, resolved in updates:
cache.invalidate(oid, tid)
if data and not resolved:
cache.store(oid, tid, None, data)
cache.setLastTid(tid)
except Exception as exc:
future.set_exception(exc)

# At this point, our cache is in an inconsistent
# state. We need to reconnect in hopes of
# recovering to a consistent state.
self.protocol.close()
self.disconnected(self.protocol)
else:
f(tid)
future.set_result(tid)

committed.catch(future.set_exception)
else:
Expand All @@ -585,6 +619,18 @@ def invalidateTransaction(self, tid, oids):
self.cache.setLastTid(tid)
self.client.invalidateTransaction(tid, oids)

def serialnos(self, serials):
# Before delegating, check for errors (likely ConflictErrors)
# and invalidate the oids they're associated with. In the
# past, this was done by the client, but now we control the
# cache and this is our last chance, as the client won't call
# back into us when there's an error.
for oid, serial in serials:
if isinstance(serial, Exception):
self.cache.invalidate(oid, None)

self.client.serialnos(serials)

@property
def protocol_version(self):
return self.protocol.protocol_version
Expand Down Expand Up @@ -699,19 +745,15 @@ class ClientThread(ClientRunner):

def __init__(self, addrs, client, cache,
storage_key='1', read_only=False, timeout=30,
disconnect_poll=1, wait=True):
disconnect_poll=1):
self.set_options(addrs, client, cache, storage_key, read_only,
timeout, disconnect_poll)
self.thread = threading.Thread(
target=self.run,
name='zeo_client_'+storage_key,
name="%s zeo client networking thread" % client.__name__,
daemon=True,
)
self.started = threading.Event()
self.thread.start()
self.started.wait()
if wait:
self.connected.result(timeout)

exception = None
def run(self):
Expand All @@ -724,11 +766,24 @@ def run(self):
except Exception as exc:
logger.exception("Client thread")
self.exception = exc
raise
else:
finally:
if not self.closed:
if self.client.ready:
self.closed = True
self.client.ready = False
self.client.client.notify_disconnected()
logger.critical("Client loop stopped unexpectedly")
loop.close()
logger.debug('Stopping client thread')

def start(self, wait=True):
self.thread.start()
self.started.wait()
if self.exception:
raise self.exception
if wait:
self.connected.result(self.timeout)

closed = False
def close(self):
if not self.closed:
Expand Down
13 changes: 11 additions & 2 deletions src/ZEO/asyncio/tests.py
Expand Up @@ -96,7 +96,16 @@ def testBasics(self):
# Actually, the client isn't connected until it initializes it's cache:
self.assertFalse(client.connected.done() or transport.data)

# If we try to make calls while the client is connecting, they're queued
# If we try to make calls while the client is *initially*
# connecting, we get an error. This is because some dufus
# decided to create a client storage without waiting for it to
# connect.
f1 = self.call('foo', 1, 2)
self.assertTrue(isinstance(f1.exception(), ClientDisconnected))

# When the client is reconnecting, it's ready flag is set to False and
# it queues calls:
client.ready = False
f1 = self.call('foo', 1, 2)
self.assertFalse(f1.done())

Expand Down Expand Up @@ -195,7 +204,7 @@ def finished_cb(tid):
self.assertEqual(parse(transport.pop()),
(8, False, 'tpc_finish', (b'd'*8,)))
respond(8, b'e'*8)
self.assertEqual(committed.result(), None)
self.assertEqual(committed.result(), b'e'*8)
self.assertEqual(cache.load(b'1'*8), None)
self.assertEqual(cache.load(b'2'*8), ('committed 2', b'e'*8))
self.assertEqual(cache.load(b'4'*8), ('committed 4', b'e'*8))
Expand Down

0 comments on commit cc33394

Please sign in to comment.