diff --git a/src/ZEO/StorageServer.py b/src/ZEO/StorageServer.py index 79790428f..b07a228cb 100644 --- a/src/ZEO/StorageServer.py +++ b/src/ZEO/StorageServer.py @@ -104,6 +104,8 @@ def notify_connected(self, conn): self.connected = True assert conn.protocol_version is not None self.log_label = _addr_label(conn.addr) + self.async = conn.async + self.async_threadsafe = conn.async_threadsafe def notify_disconnected(self): # When this storage closes, we must ensure that it aborts @@ -274,7 +276,7 @@ def _pack_impl(self, time): self.storage.pack(time, referencesf) self.log("pack(time=%s) complete" % repr(time)) # Broadcast new size statistics - self.server.invalidate(None, self.storage_id, info=self.get_size_info()) + self.server.broadcast_info(self.storage_id, self.get_size_info()) def new_oids(self, n=100): """Return a sequence of n new oids, where n defaults to 100""" @@ -336,7 +338,7 @@ def tpc_finish(self, id): self.stats.commits += 1 self.storage.tpc_finish(self.transaction, self._invalidate) - self.connection.async('info', self.get_size_info()) + self.async('info', self.get_size_info()) # Note that the tid is still current because we still hold the # commit lock. We'll relinquish it in _clear_transaction. tid = self.storage.lastTransaction() @@ -801,21 +803,6 @@ def _invalidateCache(self, storage_id): # This method is called from foreign threads. We have to # worry about interaction with the main thread. - # 1. We modify self.invq which is read by get_invalidations - # below. This is why get_invalidations makes a copy of - # self.invq. - - # 2. We access connections. There are two dangers: - # - # a. We miss a new connection. This is not a problem because - # if a client connects after we get the list of connections, - # then it will have to read the invalidation queue, which - # has already been reset. - # - # b. A connection is closes while we are iterating. This - # doesn't matter, bacause we can call should_close on a closed - # connection. - # Rebuild invq self._setup_invq(storage_id, self.storages[storage_id]) @@ -826,72 +813,31 @@ def _invalidateCache(self, storage_id): for zs in self.zeo_storages_by_storage_id[storage_id][:]: zs.call_soon_threadsafe(zs.connection.close) - def invalidate( - self, zeo_storage, storage_id, tid=None, invalidated=None, info=None): - """Internal: broadcast info and invalidations to clients. + def invalidate(self, zeo_storage, storage_id, tid, invalidated): + """Internal: broadcast invalidations to clients. This is called from several ZEOStorage methods. invalidated is a sequence of oids. - - This can do three different things: - - - If the invalidated argument is non-empty, it broadcasts - invalidateTransaction() messages to all clients of the given - storage except the current client (the zeo_storage argument). - - - If the invalidated argument is empty and the info argument - is a non-empty dictionary, it broadcasts info() messages to - all clients of the given storage, including the current - client. - - - If both the invalidated argument and the info argument are - non-empty, it broadcasts invalidateTransaction() messages to all - clients except the current, and sends an info() message to - the current client. - """ # This method can be called from foreign threads. We have to # worry about interaction with the main thread. - # 1. We modify self.invq which is read by get_invalidations - # below. This is why get_invalidations makes a copy of - # self.invq. - - # 2. We access connections. There are two dangers: - # - # a. We miss a new connection. This is not a problem because - # we are called while the storage lock is held. A new - # connection that tries to read data won't read committed - # data without first recieving an invalidation. Also, if a - # client connects after getting the list of connections, - # then it will have to read the invalidation queue, which - # has been updated to reflect the invalidations. - # - # b. A connection is closes while we are iterating. We'll need - # to cactch and ignore Disconnected errors. - - - if invalidated is not None: - assert tid is not None - invq = self.invq[storage_id] - if len(invq) >= self.invq_bound: - invq.pop() - invq.insert(0, (tid, invalidated)) - # serialize invalidation message, so we don't have to to - # it over and over - else: - assert info is not None + invq = self.invq[storage_id] + if len(invq) >= self.invq_bound: + invq.pop() + invq.insert(0, (tid, invalidated)) for zs in self.zeo_storages_by_storage_id[storage_id]: - connection = zs.connection - if invalidated is not None and zs is not zeo_storage: - connection.call_soon_threadsafe( - connection.async, 'invalidateTransaction', tid, invalidated) - elif info is not None: - connection.call_soon_threadsafe( - connection.async, 'info', info) + if zs is not zeo_storage: + zs.async_threadsafe('invalidateTransaction', tid, invalidated) + + def broadcast_info(self, storage_id, info): + """Internal: broadcast info to clients. + """ + for zs in self.zeo_storages_by_storage_id[storage_id]: + zs.async_threadsafe('info', info) def get_invalidations(self, storage_id, tid): """Return a tid and list of all objects invalidation since tid. @@ -1032,7 +978,7 @@ def __init__(self, timeout): def begin(self, client): # Called from the restart code the "main" thread, whenever the - # storage lock is being acquired. (Serialized by asyncore.) + # storage lock is being acquired. with self._cond: assert self._client is None self._client = client @@ -1041,7 +987,7 @@ def begin(self, client): def end(self, client): # Called from the "main" thread whenever the storage lock is - # being released. (Serialized by asyncore.) + # being released. with self._cond: assert self._client is not None assert self._client is client @@ -1087,7 +1033,7 @@ class SlowMethodThread(threading.Thread): """ # Some storage methods can take a long time to complete. If we - # run these methods via a standard asyncore read handler, they + # run these methods in response to an I/O event, they # will block all other server activity until they complete. To # avoid blocking, we spawn a separate thread, return an MTDelay() # object, and have the thread reply() when it finishes. diff --git a/src/ZEO/asyncio/server.py b/src/ZEO/asyncio/server.py index 95d311bbd..66b20b195 100644 --- a/src/ZEO/asyncio/server.py +++ b/src/ZEO/asyncio/server.py @@ -139,6 +139,9 @@ def send_error(self, message_id, exc, send_error=False): def async(self, method, *args): self.call_async(method, args) + def async_threadsafe(self, method, *args): + self.call_soon_threadsafe(self.call_async, method, args) + best_protocol_version = os.environ.get( 'ZEO_SERVER_PROTOCOL', ServerProtocol.protocols[-1].decode('utf-8')).encode('utf-8') diff --git a/src/ZEO/tests/testConversionSupport.py b/src/ZEO/tests/testConversionSupport.py index fa3b9a538..52aff9ec4 100644 --- a/src/ZEO/tests/testConversionSupport.py +++ b/src/ZEO/tests/testConversionSupport.py @@ -60,6 +60,7 @@ class FakeConnection: addr = 'test' call_soon_threadsafe = lambda f, *a: f(*a) + async = async_threadsafe = None def test_server_record_iternext(): """ diff --git a/src/ZEO/tests/testZEO.py b/src/ZEO/tests/testZEO.py index e5bc535c9..3e3ef3755 100644 --- a/src/ZEO/tests/testZEO.py +++ b/src/ZEO/tests/testZEO.py @@ -714,7 +714,7 @@ def async(self, method, *args): if method == 'serialnos': self.serials.extend(args[0]) - call_soon_threadsafe = async + call_soon_threadsafe = async_threadsafe = async class StorageServerWrapper: diff --git a/src/ZEO/tests/utils.py b/src/ZEO/tests/utils.py index 9c27260d6..2f06ecca6 100644 --- a/src/ZEO/tests/utils.py +++ b/src/ZEO/tests/utils.py @@ -28,6 +28,8 @@ def call_soon_threadsafe(self, func, *args): def async(self, *args): self.calls.append(args) + async_threadsafe = async + class StorageServer: """Create a client interface to a StorageServer.