Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[lock] interoperate with Go client #599

Merged
merged 4 commits into from
Apr 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[flake8]
ignore = BLK100
1 change: 1 addition & 0 deletions .hound.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
fail_on_violations: true
python:
enabled: true
config_file: .flake8
192 changes: 122 additions & 70 deletions kazoo/recipe/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
and/or the lease has been lost.

"""
import re
import sys

try:
from time import monotonic as now
except ImportError:
Expand All @@ -27,13 +29,13 @@
CancelledError,
KazooException,
LockTimeout,
NoNodeError
NoNodeError,
)
from kazoo.protocol.states import KazooState
from kazoo.retry import (
ForceRetryError,
KazooRetry,
RetryFailedError
RetryFailedError,
)


Expand Down Expand Up @@ -82,22 +84,38 @@ class Lock(object):
# sequence number. Involved in read/write locks.
_EXCLUDE_NAMES = ["__lock__"]

def __init__(self, client, path, identifier=None):
def __init__(self, client, path, identifier=None, extra_lock_patterns=()):
"""Create a Kazoo lock.

:param client: A :class:`~kazoo.client.KazooClient` instance.
:param path: The lock path to use.
:param identifier: Name to use for this lock contender. This
can be useful for querying to see who the
current lock contenders are.

:param identifier: Name to use for this lock contender. This can be
useful for querying to see who the current lock
contenders are.
:param extra_lock_patterns: Strings that will be used to
identify other znodes in the path
that should be considered contenders
for this lock.
Use this for cross-implementation
compatibility.

.. versionadded:: 2.7.1
The extra_lock_patterns option.
"""
self.client = client
self.path = path
self._exclude_names = set(
StephenSorriaux marked this conversation as resolved.
Show resolved Hide resolved
self._EXCLUDE_NAMES + list(extra_lock_patterns)
)
self._contenders_re = re.compile(
r"(?:{patterns})(-?\d{{10}})$".format(
patterns="|".join(self._exclude_names)
)
)

# some data is written to the node. this can be queried via
# contenders() to see who is contending for the lock
self.data = str(identifier or "").encode('utf-8')
self.data = str(identifier or "").encode("utf-8")
self.node = None

self.wake_event = client.handler.event_object()
Expand All @@ -113,8 +131,9 @@ def __init__(self, client, path, identifier=None):
self.is_acquired = False
self.assured_path = False
self.cancelled = False
self._retry = KazooRetry(max_tries=None,
sleep_func=client.handler.sleep_func)
self._retry = KazooRetry(
StephenSorriaux marked this conversation as resolved.
Show resolved Hide resolved
max_tries=None, sleep_func=client.handler.sleep_func
)
self._lock = client.handler.lock_object()

def _ensure_path(self):
Expand Down Expand Up @@ -171,6 +190,7 @@ def _acquire_lock():
return False
if not locked:
# Lock acquire doesn't take a timeout, so simulate it...
# XXX: This is not true in Py3 >= 3.2
try:
locked = retry(_acquire_lock)
except RetryFailedError:
Expand All @@ -179,9 +199,12 @@ def _acquire_lock():
try:
gotten = False
try:
gotten = retry(self._inner_acquire,
blocking=blocking, timeout=timeout,
ephemeral=ephemeral)
gotten = retry(
self._inner_acquire,
blocking=blocking,
timeout=timeout,
ephemeral=ephemeral,
)
except RetryFailedError:
pass
except KazooException:
Expand Down Expand Up @@ -222,8 +245,9 @@ def _inner_acquire(self, blocking, timeout, ephemeral=True):
self.create_tried = True

if not node:
node = self.client.create(self.create_path, self.data,
ephemeral=ephemeral, sequence=True)
node = self.client.create(
self.create_path, self.data, ephemeral=ephemeral, sequence=True
)
# strip off path to node
node = node[len(self.path) + 1:]

Expand All @@ -236,18 +260,8 @@ def _inner_acquire(self, blocking, timeout, ephemeral=True):
if self.cancelled:
raise CancelledError()

children = self._get_sorted_children()

try:
our_index = children.index(node)
except ValueError: # pragma: nocover
# somehow we aren't in the children -- probably we are
# recovering from a session failure and our ephemeral
# node was removed
raise ForceRetryError()

predecessor = self.predecessor(children, our_index)
if not predecessor:
predecessor = self._get_predecessor(node)
if predecessor is None:
return True

if not blocking:
Expand All @@ -263,40 +277,51 @@ def _inner_acquire(self, blocking, timeout, ephemeral=True):
else:
self.wake_event.wait(timeout)
if not self.wake_event.isSet():
raise LockTimeout("Failed to acquire lock on %s after "
"%s seconds" % (self.path, timeout))
raise LockTimeout(
"Failed to acquire lock on %s after %s seconds"
% (self.path, timeout)
)
finally:
self.client.remove_listener(self._watch_session)

def predecessor(self, children, index):
    """Scan backwards from ``index`` for the nearest lock contender.

    Walks the entries of ``children`` that precede position ``index``
    in reverse order and returns the first name containing one of the
    recognised lock prefixes, or None when no earlier contender exists.
    """
    markers = self._EXCLUDE_NAMES
    position = index - 1
    while position >= 0:
        candidate = children[position]
        for marker in markers:
            if marker in candidate:
                return candidate
        position -= 1
    return None

def _watch_predecessor(self, event):
    # Watch callback set on the predecessor znode: any event on it
    # (typically its deletion) means the lock may now be available,
    # so wake the blocked acquire loop to re-check.
    self.wake_event.set()

def _get_sorted_children(self):
def _get_predecessor(self, node):
"""returns `node`'s predecessor or None

Note: This handle the case where the current lock is not a contender
(e.g. rlock), this and also edge cases where the lock's ephemeral node
is gone.
"""
children = self.client.get_children(self.path)
found_self = False
# Filter out the contenders using the computed regex
contender_matches = []
for child in children:
match = self._contenders_re.search(child)
if match is not None:
contender_matches.append(match)
if child == node:
# Remember the node's match object so we can short circuit
# below.
found_self = match

if found_self is False: # pragma: nocover
# somehow we aren't in the childrens -- probably we are
# recovering from a session failure and our ephemeral
# node was removed.
raise ForceRetryError()

predecessor = None
# Sort the contenders using the sequence number extracted by the regex,
# then extract the original string.
for match in sorted(contender_matches, key=lambda m: m.groups()):
if match is found_self:
break
predecessor = match.string

# Node names are prefixed by a type: strip the prefix first, which may
# be one of multiple values in case of a read-write lock, and return
# only the sequence number (as a string since it is padded and will
# sort correctly anyway).
#
# In some cases, the lock path may contain nodes with other prefixes
# (eg. in case of a lease), just sort them last ('~' sorts after all
# ASCII digits).
def _seq(c):
for name in ["__lock__", "__rlock__"]:
idx = c.find(name)
if idx != -1:
return c[idx + len(name):]
# Sort unknown node names eg. "lease_holder" last.
return '~'
children.sort(key=_seq)
return children
return predecessor

def _find_node(self):
children = self.client.get_children(self.path)
Expand Down Expand Up @@ -347,16 +372,37 @@ def contenders(self):
if not self.assured_path:
self._ensure_path()

children = self._get_sorted_children()

contenders = []
children = self.client.get_children(self.path)
# We want all contenders, including self (this is especially important
# for r/w locks). This is similar to the logic of `_get_predecessor`
# except we include our own pattern.
all_contenders_re = re.compile(
r"(?:{patterns})(-?\d{{10}})$".format(
patterns="|".join(self._exclude_names | {self._NODE_NAME})
)
)
# Filter out the contenders using the computed regex
contender_matches = []
for child in children:
match = all_contenders_re.search(child)
if match is not None:
contender_matches.append(match)
# Sort the contenders using the sequence number extracted by the regex,
# then extract the original string.
contender_nodes = [
match.string
for match in sorted(contender_matches, key=lambda m: m.groups())
]
# Retrieve all the contender nodes data (preserving order).
contenders = []
for node in contender_nodes:
try:
data, stat = self.client.get(self.path + "/" + child)
data, stat = self.client.get(self.path + "/" + node)
if data is not None:
contenders.append(data.decode('utf-8'))
contenders.append(data.decode("utf-8"))
except NoNodeError: # pragma: nocover
pass

return contenders

def __enter__(self):
Expand Down Expand Up @@ -391,6 +437,7 @@ class WriteLock(Lock):
shared lock.

"""

_NODE_NAME = "__lock__"
_EXCLUDE_NAMES = ["__lock__", "__rlock__"]

Expand Down Expand Up @@ -420,6 +467,7 @@ class ReadLock(Lock):
shared lock.

"""

_NODE_NAME = "__rlock__"
_EXCLUDE_NAMES = ["__lock__"]

Expand Down Expand Up @@ -458,6 +506,7 @@ class Semaphore(object):
The max_leases check.

"""

def __init__(self, client, path, identifier=None, max_leases=1):
"""Create a Kazoo Lock

Expand All @@ -483,12 +532,12 @@ def __init__(self, client, path, identifier=None, max_leases=1):

# some data is written to the node. this can be queried via
# contenders() to see who is contending for the lock
self.data = str(identifier or "").encode('utf-8')
self.data = str(identifier or "").encode("utf-8")
self.max_leases = max_leases
self.wake_event = client.handler.event_object()

self.create_path = self.path + "/" + uuid.uuid4().hex
self.lock_path = path + '-' + '__lock__'
self.lock_path = path + "-" + "__lock__"
self.is_acquired = False
self.assured_path = False
self.cancelled = False
Expand All @@ -501,19 +550,19 @@ def _ensure_path(self):
# node did already exist
data, _ = self.client.get(self.path)
try:
leases = int(data.decode('utf-8'))
leases = int(data.decode("utf-8"))
except (ValueError, TypeError):
# ignore non-numeric data, maybe the node data is used
# for other purposes
pass
else:
if leases != self.max_leases:
raise ValueError(
"Inconsistent max leases: %s, expected: %s" %
(leases, self.max_leases)
"Inconsistent max leases: %s, expected: %s"
% (leases, self.max_leases)
)
else:
self.client.set(self.path, str(self.max_leases).encode('utf-8'))
self.client.set(self.path, str(self.max_leases).encode("utf-8"))

def cancel(self):
"""Cancel a pending semaphore acquire."""
Expand Down Expand Up @@ -548,7 +597,8 @@ def acquire(self, blocking=True, timeout=None):

try:
self.is_acquired = self.client.retry(
self._inner_acquire, blocking=blocking, timeout=timeout)
self._inner_acquire, blocking=blocking, timeout=timeout
)
except KazooException:
# if we did ultimately fail, attempt to clean up
self._best_effort_cleanup()
Expand Down Expand Up @@ -590,8 +640,9 @@ def _inner_acquire(self, blocking, timeout=None):
self.wake_event.wait(w.leftover())
if not self.wake_event.isSet():
raise LockTimeout(
"Failed to acquire semaphore on %s "
"after %s seconds" % (self.path, timeout))
"Failed to acquire semaphore on %s"
" after %s seconds" % (self.path, timeout)
)
else:
return False
finally:
Expand All @@ -612,8 +663,9 @@ def _get_lease(self, data=None):
# Get a list of the current potential lock holders. If they change,
# notify our wake_event object. This is used to unblock a blocking
# self._inner_acquire call.
children = self.client.get_children(self.path,
self._watch_lease_change)
children = self.client.get_children(
self.path, self._watch_lease_change
)

# If there are leases available, acquire one
if len(children) < self.max_leases:
Expand Down Expand Up @@ -674,7 +726,7 @@ def lease_holders(self):
for child in children:
try:
data, stat = self.client.get(self.path + "/" + child)
lease_holders.append(data.decode('utf-8'))
lease_holders.append(data.decode("utf-8"))
except NoNodeError: # pragma: nocover
pass
return lease_holders
Expand Down
Loading