Skip to content

Commit

Permalink
Merge pull request #599 from pmazzini/lock
Browse files Browse the repository at this point in the history
feat(core): interoperate with Go client
  • Loading branch information
ceache committed Apr 29, 2020
2 parents b20c929 + 75f62a0 commit b880ae6
Show file tree
Hide file tree
Showing 4 changed files with 209 additions and 109 deletions.
2 changes: 2 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[flake8]
ignore = BLK100
1 change: 1 addition & 0 deletions .hound.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
fail_on_violations: true
python:
enabled: true
config_file: .flake8
192 changes: 122 additions & 70 deletions kazoo/recipe/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
and/or the lease has been lost.
"""
import re
import sys

try:
from time import monotonic as now
except ImportError:
Expand All @@ -27,13 +29,13 @@
CancelledError,
KazooException,
LockTimeout,
NoNodeError
NoNodeError,
)
from kazoo.protocol.states import KazooState
from kazoo.retry import (
ForceRetryError,
KazooRetry,
RetryFailedError
RetryFailedError,
)


Expand Down Expand Up @@ -82,22 +84,38 @@ class Lock(object):
# sequence number. Involved in read/write locks.
_EXCLUDE_NAMES = ["__lock__"]

def __init__(self, client, path, identifier=None):
def __init__(self, client, path, identifier=None, extra_lock_patterns=()):
"""Create a Kazoo lock.
:param client: A :class:`~kazoo.client.KazooClient` instance.
:param path: The lock path to use.
:param identifier: Name to use for this lock contender. This
can be useful for querying to see who the
current lock contenders are.
:param identifier: Name to use for this lock contender. This can be
useful for querying to see who the current lock
contenders are.
:param extra_lock_patterns: Strings that will be used to
identify other znode in the path
that should be considered contenders
for this lock.
Use this for cross-implementation
compatibility.
.. versionadded:: 2.7.1
The extra_lock_patterns option.
"""
self.client = client
self.path = path
self._exclude_names = set(
self._EXCLUDE_NAMES + list(extra_lock_patterns)
)
self._contenders_re = re.compile(
r"(?:{patterns})(-?\d{{10}})$".format(
patterns="|".join(self._exclude_names)
)
)

# some data is written to the node. this can be queried via
# contenders() to see who is contending for the lock
self.data = str(identifier or "").encode('utf-8')
self.data = str(identifier or "").encode("utf-8")
self.node = None

self.wake_event = client.handler.event_object()
Expand All @@ -113,8 +131,9 @@ def __init__(self, client, path, identifier=None):
self.is_acquired = False
self.assured_path = False
self.cancelled = False
self._retry = KazooRetry(max_tries=None,
sleep_func=client.handler.sleep_func)
self._retry = KazooRetry(
max_tries=None, sleep_func=client.handler.sleep_func
)
self._lock = client.handler.lock_object()

def _ensure_path(self):
Expand Down Expand Up @@ -171,6 +190,7 @@ def _acquire_lock():
return False
if not locked:
# Lock acquire doesn't take a timeout, so simulate it...
# XXX: This is not true in Py3 >= 3.2
try:
locked = retry(_acquire_lock)
except RetryFailedError:
Expand All @@ -179,9 +199,12 @@ def _acquire_lock():
try:
gotten = False
try:
gotten = retry(self._inner_acquire,
blocking=blocking, timeout=timeout,
ephemeral=ephemeral)
gotten = retry(
self._inner_acquire,
blocking=blocking,
timeout=timeout,
ephemeral=ephemeral,
)
except RetryFailedError:
pass
except KazooException:
Expand Down Expand Up @@ -222,8 +245,9 @@ def _inner_acquire(self, blocking, timeout, ephemeral=True):
self.create_tried = True

if not node:
node = self.client.create(self.create_path, self.data,
ephemeral=ephemeral, sequence=True)
node = self.client.create(
self.create_path, self.data, ephemeral=ephemeral, sequence=True
)
# strip off path to node
node = node[len(self.path) + 1:]

Expand All @@ -236,18 +260,8 @@ def _inner_acquire(self, blocking, timeout, ephemeral=True):
if self.cancelled:
raise CancelledError()

children = self._get_sorted_children()

try:
our_index = children.index(node)
except ValueError: # pragma: nocover
# somehow we aren't in the children -- probably we are
# recovering from a session failure and our ephemeral
# node was removed
raise ForceRetryError()

predecessor = self.predecessor(children, our_index)
if not predecessor:
predecessor = self._get_predecessor(node)
if predecessor is None:
return True

if not blocking:
Expand All @@ -263,40 +277,51 @@ def _inner_acquire(self, blocking, timeout, ephemeral=True):
else:
self.wake_event.wait(timeout)
if not self.wake_event.isSet():
raise LockTimeout("Failed to acquire lock on %s after "
"%s seconds" % (self.path, timeout))
raise LockTimeout(
"Failed to acquire lock on %s after %s seconds"
% (self.path, timeout)
)
finally:
self.client.remove_listener(self._watch_session)

def predecessor(self, children, index):
for c in reversed(children[:index]):
if any(n in c for n in self._EXCLUDE_NAMES):
return c
return None

def _watch_predecessor(self, event):
self.wake_event.set()

def _get_sorted_children(self):
def _get_predecessor(self, node):
"""returns `node`'s predecessor or None
Note: This handle the case where the current lock is not a contender
(e.g. rlock), this and also edge cases where the lock's ephemeral node
is gone.
"""
children = self.client.get_children(self.path)
found_self = False
# Filter out the contenders using the computed regex
contender_matches = []
for child in children:
match = self._contenders_re.search(child)
if match is not None:
contender_matches.append(match)
if child == node:
# Remember the node's match object so we can short circuit
# below.
found_self = match

if found_self is False: # pragma: nocover
# somehow we aren't in the childrens -- probably we are
# recovering from a session failure and our ephemeral
# node was removed.
raise ForceRetryError()

predecessor = None
# Sort the contenders using the sequence number extracted by the regex,
# then extract the original string.
for match in sorted(contender_matches, key=lambda m: m.groups()):
if match is found_self:
break
predecessor = match.string

# Node names are prefixed by a type: strip the prefix first, which may
# be one of multiple values in case of a read-write lock, and return
# only the sequence number (as a string since it is padded and will
# sort correctly anyway).
#
# In some cases, the lock path may contain nodes with other prefixes
# (eg. in case of a lease), just sort them last ('~' sorts after all
# ASCII digits).
def _seq(c):
for name in ["__lock__", "__rlock__"]:
idx = c.find(name)
if idx != -1:
return c[idx + len(name):]
# Sort unknown node names eg. "lease_holder" last.
return '~'
children.sort(key=_seq)
return children
return predecessor

def _find_node(self):
children = self.client.get_children(self.path)
Expand Down Expand Up @@ -347,16 +372,37 @@ def contenders(self):
if not self.assured_path:
self._ensure_path()

children = self._get_sorted_children()

contenders = []
children = self.client.get_children(self.path)
# We want all contenders, including self (this is especially important
# for r/w locks). This is similar to the logic of `_get_predecessor`
# except we include our own pattern.
all_contenders_re = re.compile(
r"(?:{patterns})(-?\d{{10}})$".format(
patterns="|".join(self._exclude_names | {self._NODE_NAME})
)
)
# Filter out the contenders using the computed regex
contender_matches = []
for child in children:
match = all_contenders_re.search(child)
if match is not None:
contender_matches.append(match)
# Sort the contenders using the sequence number extracted by the regex,
# then extract the original string.
contender_nodes = [
match.string
for match in sorted(contender_matches, key=lambda m: m.groups())
]
# Retrieve all the contender nodes data (preserving order).
contenders = []
for node in contender_nodes:
try:
data, stat = self.client.get(self.path + "/" + child)
data, stat = self.client.get(self.path + "/" + node)
if data is not None:
contenders.append(data.decode('utf-8'))
contenders.append(data.decode("utf-8"))
except NoNodeError: # pragma: nocover
pass

return contenders

def __enter__(self):
Expand Down Expand Up @@ -391,6 +437,7 @@ class WriteLock(Lock):
shared lock.
"""

_NODE_NAME = "__lock__"
_EXCLUDE_NAMES = ["__lock__", "__rlock__"]

Expand Down Expand Up @@ -420,6 +467,7 @@ class ReadLock(Lock):
shared lock.
"""

_NODE_NAME = "__rlock__"
_EXCLUDE_NAMES = ["__lock__"]

Expand Down Expand Up @@ -458,6 +506,7 @@ class Semaphore(object):
The max_leases check.
"""

def __init__(self, client, path, identifier=None, max_leases=1):
"""Create a Kazoo Lock
Expand All @@ -483,12 +532,12 @@ def __init__(self, client, path, identifier=None, max_leases=1):

# some data is written to the node. this can be queried via
# contenders() to see who is contending for the lock
self.data = str(identifier or "").encode('utf-8')
self.data = str(identifier or "").encode("utf-8")
self.max_leases = max_leases
self.wake_event = client.handler.event_object()

self.create_path = self.path + "/" + uuid.uuid4().hex
self.lock_path = path + '-' + '__lock__'
self.lock_path = path + "-" + "__lock__"
self.is_acquired = False
self.assured_path = False
self.cancelled = False
Expand All @@ -501,19 +550,19 @@ def _ensure_path(self):
# node did already exist
data, _ = self.client.get(self.path)
try:
leases = int(data.decode('utf-8'))
leases = int(data.decode("utf-8"))
except (ValueError, TypeError):
# ignore non-numeric data, maybe the node data is used
# for other purposes
pass
else:
if leases != self.max_leases:
raise ValueError(
"Inconsistent max leases: %s, expected: %s" %
(leases, self.max_leases)
"Inconsistent max leases: %s, expected: %s"
% (leases, self.max_leases)
)
else:
self.client.set(self.path, str(self.max_leases).encode('utf-8'))
self.client.set(self.path, str(self.max_leases).encode("utf-8"))

def cancel(self):
"""Cancel a pending semaphore acquire."""
Expand Down Expand Up @@ -548,7 +597,8 @@ def acquire(self, blocking=True, timeout=None):

try:
self.is_acquired = self.client.retry(
self._inner_acquire, blocking=blocking, timeout=timeout)
self._inner_acquire, blocking=blocking, timeout=timeout
)
except KazooException:
# if we did ultimately fail, attempt to clean up
self._best_effort_cleanup()
Expand Down Expand Up @@ -590,8 +640,9 @@ def _inner_acquire(self, blocking, timeout=None):
self.wake_event.wait(w.leftover())
if not self.wake_event.isSet():
raise LockTimeout(
"Failed to acquire semaphore on %s "
"after %s seconds" % (self.path, timeout))
"Failed to acquire semaphore on %s"
" after %s seconds" % (self.path, timeout)
)
else:
return False
finally:
Expand All @@ -612,8 +663,9 @@ def _get_lease(self, data=None):
# Get a list of the current potential lock holders. If they change,
# notify our wake_event object. This is used to unblock a blocking
# self._inner_acquire call.
children = self.client.get_children(self.path,
self._watch_lease_change)
children = self.client.get_children(
self.path, self._watch_lease_change
)

# If there are leases available, acquire one
if len(children) < self.max_leases:
Expand Down Expand Up @@ -674,7 +726,7 @@ def lease_holders(self):
for child in children:
try:
data, stat = self.client.get(self.path + "/" + child)
lease_holders.append(data.decode('utf-8'))
lease_holders.append(data.decode("utf-8"))
except NoNodeError: # pragma: nocover
pass
return lease_holders
Expand Down
Loading

0 comments on commit b880ae6

Please sign in to comment.