Skip to content

Commit

Permalink
object-level lock manager
Browse files Browse the repository at this point in the history
  • Loading branch information
Jim Fulton committed Aug 8, 2016
1 parent d22ec76 commit cf87144
Show file tree
Hide file tree
Showing 2 changed files with 425 additions and 69 deletions.
213 changes: 144 additions & 69 deletions src/ZEO/StorageServer.py
Expand Up @@ -1135,31 +1135,73 @@ def __init__(self, storage_id, stats, timeout):
self.storage_id = storage_id
self.stats = stats
self.timeout = timeout
self.locked = None
self.waiting = {} # {ZEOStorage -> (func, delay)}
self._lock = RLock()

def lock(self, zs, func):
"""Call the given function with the commit lock.
self.oid_locks = {} # {oid -> ZEOStorage}
self.oid_waiting = {} # {oid -> [ZEOStorage]}
self.locked = [] # [ZEOStorage]
self.working = {} # {ZEOStorage -> (want, got, func, delay)}

If we can get the lock right away, return the result of
calling the function.
def lock(self, zs, oids, func):
"""Call the given function with the commit locks for the given oids.
If we can't get the lock right away, return a delay
If we can get the locks right away, return the result of
calling the given function.
If we can't get the locks right away, return a delay. The
delay will be completed later with the result of calling the
function when the locks can be acquired.
Arguments:
zs
An object that wants the locks. It is expected to have:
locked
An attribute indicating whether it desires/has locks
connected
An attribute indicating whether it's alive. If an object
has locks, and isn't connected, the locks will be
released.
call_soon_thread_safe(func, *args)
Method to queue a function call, typically for execution
in the thread that executes zs.
log(message, level)
Log a given message at the given level.
oids
Object ids to be locked.
Any iterable will do. It will be iterated once to make a copy.
func
A function to be called when the locks are obtained.
The function must set ``locked`` on the zeo-storage to
indicate that the zeo-storage should be locked. Otherwise,
the lock isn't held pas the call.
The function must set ``locked`` on the zeo-storage to
indicate that the zeo-storage should be locked. Otherwise,
the lock isn't held pas the call.
"""
want = sorted(oids)
got = []
with self._lock:
if self._can_lock(zs):
if zs in self.working:
raise StorageTransactionError(
"Already voting ({})".format(
'waiting' if self.working[zs][0] else 'locked'
))

if self._can_lock(zs, want, got):
self.working[zs] = want, got
self._locked(zs)
else:
if any(w for w in self.waiting if w is zs):
raise StorageTransactionError("Already voting (waiting)")

delay = Delay()
self.waiting[zs] = (func, delay)
self.working[zs] = want, got, func, delay
self._oid_wait(want[0], zs)

self._log_waiting(
zs, "(%r) queue lock: transactions waiting: %s")

Expand All @@ -1176,83 +1218,116 @@ def lock(self, zs, func):
return result

def _lock_waiting(self, zs):
waiting = None
with self._lock:
if self.locked is zs:
assert zs.locked
waiting = self.working.get(zs)
if not waiting:
return

if self._can_lock(zs):
waiting = self.waiting.pop(zs, None)
if waiting:
self._locked(zs)

if waiting:
func, delay = waiting
try:
result = func()
except Exception:
delay.error(sys.exc_info())
self.release(zs)
want, got, func, delay = waiting
if not want:
return # already locked

self._oid_unwait(want[0], zs)
if self._can_lock(zs, want, got):
self._locked(zs)
else:
delay.reply(result)
if not zs.locked:
self.release(zs)
self._oid_wait(want[0], zs)
return

try:
result = func()
except Exception:
delay.error(sys.exc_info())
self.release(zs)
else:
delay.reply(result)
if not zs.locked:
self.release(zs)

def release(self, zs):
with self._lock:
locked = self.locked
if locked is zs:
self._unlocked(zs)
w = self.working.pop(zs)
if not w:
return
want, got = w[:2]

for zs in list(self.waiting):
zs.call_soon_threadsafe(self._lock_waiting, zs)
oid_locks = self.oid_locks
oid_waiting = self.oid_waiting
retry = set()
for oid in got:
del oid_locks[oid]
retry.update(oid_waiting.get(oid, ()))

if want:
self._oid_unwait(want[0], zs)
self._log_waiting(
zs, "(%r) dequeue lock: transactions waiting: %s")
else:
if self.waiting.pop(zs, None):
self._log_waiting(
zs, "(%r) dequeue lock: transactions waiting: %s")
self._unlocked(zs)

for wzs in retry:
wzs.call_soon_threadsafe(self._lock_waiting, wzs)

def _log_waiting(self, zs, message):
l = len(self.waiting)
l = len(self.working) - len(self.locked)
zs.log(message % (self.storage_id, l),
logging.CRITICAL if l > 9 else (
logging.WARNING if l > 3 else logging.DEBUG)
)

def _can_lock(self, zs):
locked = self.locked

if locked is zs:
raise StorageTransactionError("Already voting (locked)")

if locked is not None:
if not locked.connected:
locked.log("Still locked after disconnected. Unlocking.",
logging.CRITICAL)
if locked.transaction:
locked.storage.tpc_abort(locked.transaction)

self._unlocked(locked)
locked = None
def _can_lock(self, zs, want, got):
oid_locks = self.oid_locks
while want:
locked = oid_locks.get(want[0])
if locked is not None:
if not locked.connected:
locked.log("Still locked after disconnected. Unlocking.",
logging.CRITICAL)
if locked.transaction:
locked.storage.tpc_abort(locked.transaction)

self._unlocked(locked)
else:
return False

# Note that locked.locked may not be true here, because
# .lock may be set in the lock callback, but may not have
# been set yet. This aspect of the API may need more
# thought. :/
oid = want.pop(0)
got.append(oid)
oid_locks[oid] = zs

return locked is None
return True

def _locked(self, zs):
self.locked = zs
self.stats.lock_time = time.time()
self.locked.append(zs)
self._log_waiting(zs, "(%r) lock: transactions waiting: %s")
self.timeout.begin(zs)
return True
if len(self.locked) == 1:
self.stats.lock_time = time.time()
self.timeout.begin(zs)

def _unlocked(self, zs):
assert self.locked is zs
self.timeout.end(zs)
self.locked = self.stats.lock_time = None
locked = self.locked
if locked[0] is zs:
del locked[0]
self.timeout.end(zs)
if locked:
self.stats.lock_time = time.time()
self.timeout.begin(locked[0])
else:
self.stats.lock_time = None
else:
locked.remove(zs)

zs.locked = False
self._log_waiting(zs, "(%r) unlock: transactions waiting: %s")

def _oid_wait(self, oid, zs):
waiting = self.oid_waiting.get(oid)
if waiting is None:
self.oid_waiting[oid] = [zs]
else:
waiting.append(zs)

def _oid_unwait(self, oid, zs):
waiting = self.oid_waiting[oid]
waiting.remove(zs)
if not waiting:
del self.oid_waiting[oid]

0 comments on commit cf87144

Please sign in to comment.