Skip to content

Commit

Permalink
Storage-server cleanups
Browse files Browse the repository at this point in the history
Especially splitting the method used to send invalidations and info,
since we no-longer send both in the same call.
  • Loading branch information
Jim Fulton committed Jul 28, 2016
1 parent c8f1c52 commit 02943ac
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 76 deletions.
96 changes: 21 additions & 75 deletions src/ZEO/StorageServer.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ def notify_connected(self, conn):
self.connected = True
assert conn.protocol_version is not None
self.log_label = _addr_label(conn.addr)
self.async = conn.async
self.async_threadsafe = conn.async_threadsafe

def notify_disconnected(self):
# When this storage closes, we must ensure that it aborts
Expand Down Expand Up @@ -274,7 +276,7 @@ def _pack_impl(self, time):
self.storage.pack(time, referencesf)
self.log("pack(time=%s) complete" % repr(time))
# Broadcast new size statistics
self.server.invalidate(None, self.storage_id, info=self.get_size_info())
self.server.broadcast_info(self.storage_id, self.get_size_info())

def new_oids(self, n=100):
"""Return a sequence of n new oids, where n defaults to 100"""
Expand Down Expand Up @@ -336,7 +338,7 @@ def tpc_finish(self, id):

self.stats.commits += 1
self.storage.tpc_finish(self.transaction, self._invalidate)
self.connection.async('info', self.get_size_info())
self.async('info', self.get_size_info())
# Note that the tid is still current because we still hold the
# commit lock. We'll relinquish it in _clear_transaction.
tid = self.storage.lastTransaction()
Expand Down Expand Up @@ -801,21 +803,6 @@ def _invalidateCache(self, storage_id):
# This method is called from foreign threads. We have to
# worry about interaction with the main thread.

# 1. We modify self.invq which is read by get_invalidations
# below. This is why get_invalidations makes a copy of
# self.invq.

# 2. We access connections. There are two dangers:
#
# a. We miss a new connection. This is not a problem because
# if a client connects after we get the list of connections,
# then it will have to read the invalidation queue, which
# has already been reset.
#
# b. A connection is closes while we are iterating. This
# doesn't matter, bacause we can call should_close on a closed
# connection.

# Rebuild invq
self._setup_invq(storage_id, self.storages[storage_id])

Expand All @@ -826,72 +813,31 @@ def _invalidateCache(self, storage_id):
for zs in self.zeo_storages_by_storage_id[storage_id][:]:
zs.call_soon_threadsafe(zs.connection.close)

def invalidate(
self, zeo_storage, storage_id, tid=None, invalidated=None, info=None):
"""Internal: broadcast info and invalidations to clients.
def invalidate(self, zeo_storage, storage_id, tid, invalidated):
"""Internal: broadcast invalidations to clients.
This is called from several ZEOStorage methods.
invalidated is a sequence of oids.
This can do three different things:
- If the invalidated argument is non-empty, it broadcasts
invalidateTransaction() messages to all clients of the given
storage except the current client (the zeo_storage argument).
- If the invalidated argument is empty and the info argument
is a non-empty dictionary, it broadcasts info() messages to
all clients of the given storage, including the current
client.
- If both the invalidated argument and the info argument are
non-empty, it broadcasts invalidateTransaction() messages to all
clients except the current, and sends an info() message to
the current client.
"""

# This method can be called from foreign threads. We have to
# worry about interaction with the main thread.

# 1. We modify self.invq which is read by get_invalidations
# below. This is why get_invalidations makes a copy of
# self.invq.

# 2. We access connections. There are two dangers:
#
# a. We miss a new connection. This is not a problem because
# we are called while the storage lock is held. A new
# connection that tries to read data won't read committed
# data without first recieving an invalidation. Also, if a
# client connects after getting the list of connections,
# then it will have to read the invalidation queue, which
# has been updated to reflect the invalidations.
#
# b. A connection is closes while we are iterating. We'll need
# to cactch and ignore Disconnected errors.


if invalidated is not None:
assert tid is not None
invq = self.invq[storage_id]
if len(invq) >= self.invq_bound:
invq.pop()
invq.insert(0, (tid, invalidated))
# serialize invalidation message, so we don't have to to
# it over and over
else:
assert info is not None
invq = self.invq[storage_id]
if len(invq) >= self.invq_bound:
invq.pop()
invq.insert(0, (tid, invalidated))

for zs in self.zeo_storages_by_storage_id[storage_id]:
connection = zs.connection
if invalidated is not None and zs is not zeo_storage:
connection.call_soon_threadsafe(
connection.async, 'invalidateTransaction', tid, invalidated)
elif info is not None:
connection.call_soon_threadsafe(
connection.async, 'info', info)
if zs is not zeo_storage:
zs.async_threadsafe('invalidateTransaction', tid, invalidated)

def broadcast_info(self, storage_id, info):
"""Internal: broadcast info to clients.
"""
for zs in self.zeo_storages_by_storage_id[storage_id]:
zs.async_threadsafe('info', info)

def get_invalidations(self, storage_id, tid):
"""Return a tid and list of all objects invalidation since tid.
Expand Down Expand Up @@ -1032,7 +978,7 @@ def __init__(self, timeout):

def begin(self, client):
# Called from the restart code the "main" thread, whenever the
# storage lock is being acquired. (Serialized by asyncore.)
# storage lock is being acquired.
with self._cond:
assert self._client is None
self._client = client
Expand All @@ -1041,7 +987,7 @@ def begin(self, client):

def end(self, client):
# Called from the "main" thread whenever the storage lock is
# being released. (Serialized by asyncore.)
# being released.
with self._cond:
assert self._client is not None
assert self._client is client
Expand Down Expand Up @@ -1087,7 +1033,7 @@ class SlowMethodThread(threading.Thread):
"""

# Some storage methods can take a long time to complete. If we
# run these methods via a standard asyncore read handler, they
# run these methods in response to an I/O event, they
# will block all other server activity until they complete. To
# avoid blocking, we spawn a separate thread, return an MTDelay()
# object, and have the thread reply() when it finishes.
Expand Down
3 changes: 3 additions & 0 deletions src/ZEO/asyncio/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ def send_error(self, message_id, exc, send_error=False):
def async(self, method, *args):
self.call_async(method, args)

def async_threadsafe(self, method, *args):
self.call_soon_threadsafe(self.call_async, method, args)

best_protocol_version = os.environ.get(
'ZEO_SERVER_PROTOCOL',
ServerProtocol.protocols[-1].decode('utf-8')).encode('utf-8')
Expand Down
1 change: 1 addition & 0 deletions src/ZEO/tests/testConversionSupport.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class FakeConnection:
addr = 'test'

call_soon_threadsafe = lambda f, *a: f(*a)
async = async_threadsafe = None

def test_server_record_iternext():
"""
Expand Down
2 changes: 1 addition & 1 deletion src/ZEO/tests/testZEO.py
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,7 @@ def async(self, method, *args):
if method == 'serialnos':
self.serials.extend(args[0])

call_soon_threadsafe = async
call_soon_threadsafe = async_threadsafe = async

class StorageServerWrapper:

Expand Down
2 changes: 2 additions & 0 deletions src/ZEO/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ def call_soon_threadsafe(self, func, *args):
def async(self, *args):
self.calls.append(args)

async_threadsafe = async

class StorageServer:
"""Create a client interface to a StorageServer.
Expand Down

0 comments on commit 02943ac

Please sign in to comment.