diff --git a/.flake8 b/.flake8
new file mode 100644
index 00000000..20d6db4d
--- /dev/null
+++ b/.flake8
@@ -0,0 +1,2 @@
+[flake8]
+ignore = BLK100
diff --git a/.hound.yml b/.hound.yml
index 5de48c83..5f00754a 100644
--- a/.hound.yml
+++ b/.hound.yml
@@ -1,3 +1,4 @@
 fail_on_violations: true
 python:
   enabled: true
+  config_file: .flake8
diff --git a/kazoo/recipe/lock.py b/kazoo/recipe/lock.py
index 9bd5dc05..7722a978 100644
--- a/kazoo/recipe/lock.py
+++ b/kazoo/recipe/lock.py
@@ -14,7 +14,9 @@ and/or the lease has been lost.
 
 """
+import re
 import sys
+
 try:
     from time import monotonic as now
 except ImportError:
@@ -27,13 +29,13 @@
     CancelledError,
     KazooException,
     LockTimeout,
-    NoNodeError
+    NoNodeError,
 )
 from kazoo.protocol.states import KazooState
 from kazoo.retry import (
     ForceRetryError,
     KazooRetry,
-    RetryFailedError
+    RetryFailedError,
 )
 
 
@@ -82,22 +84,38 @@ class Lock(object):
     # sequence number. Involved in read/write locks.
     _EXCLUDE_NAMES = ["__lock__"]
 
-    def __init__(self, client, path, identifier=None):
+    def __init__(self, client, path, identifier=None, extra_lock_patterns=()):
         """Create a Kazoo lock.
 
         :param client: A :class:`~kazoo.client.KazooClient` instance.
         :param path: The lock path to use.
-        :param identifier: Name to use for this lock contender. This
-                           can be useful for querying to see who the
-                           current lock contenders are.
-
+        :param identifier: Name to use for this lock contender. This can be
+                           useful for querying to see who the current lock
+                           contenders are.
+        :param extra_lock_patterns: Strings that will be used to
+                                    identify other znodes in the path
+                                    that should be considered contenders
+                                    for this lock.
+                                    Use this for cross-implementation
+                                    compatibility.
+
+        .. versionadded:: 2.7.1
+            The extra_lock_patterns option.
         """
         self.client = client
         self.path = path
+        self._exclude_names = set(
+            self._EXCLUDE_NAMES + list(extra_lock_patterns)
+        )
+        self._contenders_re = re.compile(
+            r"(?:{patterns})(-?\d{{10}})$".format(
+                patterns="|".join(self._exclude_names)
+            )
+        )
 
         # some data is written to the node. this can be queried via
         # contenders() to see who is contending for the lock
-        self.data = str(identifier or "").encode('utf-8')
+        self.data = str(identifier or "").encode("utf-8")
         self.node = None
 
         self.wake_event = client.handler.event_object()
@@ -113,8 +131,9 @@ def __init__(self, client, path, identifier=None):
         self.is_acquired = False
         self.assured_path = False
         self.cancelled = False
-        self._retry = KazooRetry(max_tries=None,
-                                 sleep_func=client.handler.sleep_func)
+        self._retry = KazooRetry(
+            max_tries=None, sleep_func=client.handler.sleep_func
+        )
         self._lock = client.handler.lock_object()
 
     def _ensure_path(self):
@@ -171,6 +190,7 @@ def _acquire_lock():
             return False
         if not locked:
             # Lock acquire doesn't take a timeout, so simulate it...
+            # XXX: This is not true in Py3 >= 3.2
             try:
                 locked = retry(_acquire_lock)
             except RetryFailedError:
@@ -179,9 +199,12 @@ def _acquire_lock():
         try:
             gotten = False
             try:
-                gotten = retry(self._inner_acquire,
-                               blocking=blocking, timeout=timeout,
-                               ephemeral=ephemeral)
+                gotten = retry(
+                    self._inner_acquire,
+                    blocking=blocking,
+                    timeout=timeout,
+                    ephemeral=ephemeral,
+                )
             except RetryFailedError:
                 pass
         except KazooException:
@@ -222,8 +245,9 @@ def _inner_acquire(self, blocking, timeout, ephemeral=True):
             self.create_tried = True
 
         if not node:
-            node = self.client.create(self.create_path, self.data,
-                                      ephemeral=ephemeral, sequence=True)
+            node = self.client.create(
+                self.create_path, self.data, ephemeral=ephemeral, sequence=True
+            )
             # strip off path to node
             node = node[len(self.path) + 1:]
@@ -236,18 +260,8 @@ def _inner_acquire(self, blocking, timeout, ephemeral=True):
             if self.cancelled:
                 raise CancelledError()
 
-            children = self._get_sorted_children()
-
-            try:
-                our_index = children.index(node)
-            except ValueError:  # pragma: nocover
-                # somehow we aren't in the children -- probably we are
-                # recovering from a session failure and our ephemeral
-                # node was removed
-                raise ForceRetryError()
-
-            predecessor = self.predecessor(children, our_index)
-            if not predecessor:
+            predecessor = self._get_predecessor(node)
+            if predecessor is None:
                 return True
 
             if not blocking:
@@ -263,40 +277,51 @@ def _inner_acquire(self, blocking, timeout, ephemeral=True):
             else:
                 self.wake_event.wait(timeout)
                 if not self.wake_event.isSet():
-                    raise LockTimeout("Failed to acquire lock on %s after "
-                                      "%s seconds" % (self.path, timeout))
+                    raise LockTimeout(
+                        "Failed to acquire lock on %s after %s seconds"
+                        % (self.path, timeout)
+                    )
             finally:
                 self.client.remove_listener(self._watch_session)
 
-    def predecessor(self, children, index):
-        for c in reversed(children[:index]):
-            if any(n in c for n in self._EXCLUDE_NAMES):
-                return c
-        return None
-
     def _watch_predecessor(self, event):
         self.wake_event.set()
 
-    def _get_sorted_children(self):
+    def _get_predecessor(self, node):
+        """Returns `node`'s predecessor, or None.
+
+        Note: This handles the case where the current lock is not a contender
+        (e.g. rlock), as well as edge cases where the lock's ephemeral node
+        is gone.
+        """
         children = self.client.get_children(self.path)
+        found_self = False
+        # Filter out the non-contenders using the computed regex
+        contender_matches = []
+        for child in children:
+            match = self._contenders_re.search(child)
+            if match is not None:
+                contender_matches.append(match)
+            if child == node:
+                # Remember the node's match object so we can short-circuit
+                # below.
+                found_self = match
+
+        if found_self is False:  # pragma: nocover
+            # somehow we aren't in the children -- probably we are
+            # recovering from a session failure and our ephemeral
+            # node was removed.
+            raise ForceRetryError()
+
+        predecessor = None
+        # Sort the contenders using the sequence number extracted by the regex,
+        # then extract the original string.
+        for match in sorted(contender_matches, key=lambda m: m.groups()):
+            if match is found_self:
+                break
+            predecessor = match.string
 
-        # Node names are prefixed by a type: strip the prefix first, which may
-        # be one of multiple values in case of a read-write lock, and return
-        # only the sequence number (as a string since it is padded and will
-        # sort correctly anyway).
-        #
-        # In some cases, the lock path may contain nodes with other prefixes
-        # (eg. in case of a lease), just sort them last ('~' sorts after all
-        # ASCII digits).
-        def _seq(c):
-            for name in ["__lock__", "__rlock__"]:
-                idx = c.find(name)
-                if idx != -1:
-                    return c[idx + len(name):]
-            # Sort unknown node names eg. "lease_holder" last.
-            return '~'
-        children.sort(key=_seq)
-        return children
+        return predecessor
 
     def _find_node(self):
         children = self.client.get_children(self.path)
@@ -347,16 +372,37 @@ def contenders(self):
         if not self.assured_path:
             self._ensure_path()
 
-        children = self._get_sorted_children()
-
-        contenders = []
+        children = self.client.get_children(self.path)
+        # We want all contenders, including self (this is especially important
+        # for r/w locks). This is similar to the logic of `_get_predecessor`
+        # except we include our own pattern.
+        all_contenders_re = re.compile(
+            r"(?:{patterns})(-?\d{{10}})$".format(
+                patterns="|".join(self._exclude_names | {self._NODE_NAME})
+            )
+        )
+        # Filter out the non-contenders using the computed regex
+        contender_matches = []
         for child in children:
+            match = all_contenders_re.search(child)
+            if match is not None:
+                contender_matches.append(match)
+        # Sort the contenders using the sequence number extracted by the regex,
+        # then extract the original string.
+        contender_nodes = [
+            match.string
+            for match in sorted(contender_matches, key=lambda m: m.groups())
+        ]
+        # Retrieve all the contender nodes' data (preserving order).
+        contenders = []
+        for node in contender_nodes:
             try:
-                data, stat = self.client.get(self.path + "/" + child)
+                data, stat = self.client.get(self.path + "/" + node)
                 if data is not None:
-                    contenders.append(data.decode('utf-8'))
+                    contenders.append(data.decode("utf-8"))
             except NoNodeError:  # pragma: nocover
                 pass
+
         return contenders
 
     def __enter__(self):
@@ -391,6 +437,7 @@ class WriteLock(Lock):
     shared lock.
 
     """
 
+    _NODE_NAME = "__lock__"
     _EXCLUDE_NAMES = ["__lock__", "__rlock__"]
 
@@ -420,6 +467,7 @@ class ReadLock(Lock):
     shared lock.
 
     """
 
+    _NODE_NAME = "__rlock__"
     _EXCLUDE_NAMES = ["__lock__"]
 
@@ -458,6 +506,7 @@ class Semaphore(object):
     The max_leases check.
 
     """
+
     def __init__(self, client, path, identifier=None, max_leases=1):
         """Create a Kazoo Lock
 
@@ -483,12 +532,12 @@ def __init__(self, client, path, identifier=None, max_leases=1):
         # some data is written to the node. this can be queried via
         # contenders() to see who is contending for the lock
-        self.data = str(identifier or "").encode('utf-8')
+        self.data = str(identifier or "").encode("utf-8")
         self.max_leases = max_leases
         self.wake_event = client.handler.event_object()
 
         self.create_path = self.path + "/" + uuid.uuid4().hex
-        self.lock_path = path + '-' + '__lock__'
+        self.lock_path = path + "-" + "__lock__"
         self.is_acquired = False
         self.assured_path = False
         self.cancelled = False
@@ -501,7 +550,7 @@ def _ensure_path(self):
             # node did already exist
             data, _ = self.client.get(self.path)
             try:
-                leases = int(data.decode('utf-8'))
+                leases = int(data.decode("utf-8"))
             except (ValueError, TypeError):
                 # ignore non-numeric data, maybe the node data is used
                 # for other purposes
@@ -509,11 +558,11 @@ def _ensure_path(self):
             else:
                 if leases != self.max_leases:
                     raise ValueError(
-                        "Inconsistent max leases: %s, expected: %s" %
-                        (leases, self.max_leases)
+                        "Inconsistent max leases: %s, expected: %s"
+                        % (leases, self.max_leases)
                     )
         else:
-            self.client.set(self.path, str(self.max_leases).encode('utf-8'))
+            self.client.set(self.path, str(self.max_leases).encode("utf-8"))
 
     def cancel(self):
         """Cancel a pending semaphore acquire."""
@@ -548,7 +597,8 @@ def acquire(self, blocking=True, timeout=None):
 
         try:
             self.is_acquired = self.client.retry(
-                self._inner_acquire, blocking=blocking, timeout=timeout)
+                self._inner_acquire, blocking=blocking, timeout=timeout
+            )
         except KazooException:
             # if we did ultimately fail, attempt to clean up
             self._best_effort_cleanup()
@@ -590,8 +640,9 @@ def _inner_acquire(self, blocking, timeout=None):
                     self.wake_event.wait(w.leftover())
                     if not self.wake_event.isSet():
                         raise LockTimeout(
-                            "Failed to acquire semaphore on %s "
-                            "after %s seconds" % (self.path, timeout))
+                            "Failed to acquire semaphore on %s"
+                            " after %s seconds" % (self.path, timeout)
+                        )
                 else:
                     return False
             finally:
@@ -612,8 +663,9 @@ def _get_lease(self, data=None):
         # Get a list of the current potential lock holders. If they change,
         # notify our wake_event object. This is used to unblock a blocking
         # self._inner_acquire call.
- children = self.client.get_children(self.path, - self._watch_lease_change) + children = self.client.get_children( + self.path, self._watch_lease_change + ) # If there are leases available, acquire one if len(children) < self.max_leases: @@ -674,7 +726,7 @@ def lease_holders(self): for child in children: try: data, stat = self.client.get(self.path + "/" + child) - lease_holders.append(data.decode('utf-8')) + lease_holders.append(data.decode("utf-8")) except NoNodeError: # pragma: nocover pass return lease_holders diff --git a/kazoo/tests/test_lock.py b/kazoo/tests/test_lock.py index ee0ff3d5..0e16949e 100644 --- a/kazoo/tests/test_lock.py +++ b/kazoo/tests/test_lock.py @@ -1,5 +1,7 @@ import collections +import mock import threading +import unittest import uuid import pytest @@ -7,6 +9,7 @@ from kazoo.exceptions import CancelledError from kazoo.exceptions import LockTimeout from kazoo.exceptions import NoNodeError +from kazoo.recipe.lock import Lock from kazoo.testing import KazooTestCase from kazoo.tests import util as test_util @@ -97,8 +100,10 @@ def test_lock_one(self): lock_name = uuid.uuid4().hex lock = self.client.Lock(self.lockpath, lock_name) event = self.make_event() - thread = self.make_thread(target=self._thread_lock_acquire_til_event, - args=(lock_name, lock, event)) + thread = self.make_thread( + target=self._thread_lock_acquire_til_event, + args=(lock_name, lock, event), + ) thread.start() lock2_name = uuid.uuid4().hex @@ -131,8 +136,9 @@ def test_lock(self): for name in names: e = self.make_event() l = self.client.Lock(self.lockpath, name) - t = self.make_thread(target=self._thread_lock_acquire_til_event, - args=(name, l, e)) + t = self.make_thread( + target=self._thread_lock_acquire_til_event, args=(name, l, e) + ) contender_bits[name] = (t, e) threads.append(t) @@ -176,9 +182,11 @@ def test_lock(self): def test_lock_reconnect(self): event = self.make_event() - other_lock = self.client.Lock(self.lockpath, 'contender') - thread = self.make_thread(target=self._thread_lock_acquire_til_event, - args=('contender', other_lock, event)) + other_lock = self.client.Lock(self.lockpath, "contender") + thread = self.make_thread( + target=self._thread_lock_acquire_til_event, + args=("contender", other_lock, event), + ) # acquire the lock ourselves first to make the contender line up lock = self.client.Lock(self.lockpath, "test") @@ -188,7 +196,7 @@ def test_lock_reconnect(self): # wait for the contender to line up on the lock wait = self.make_wait() wait(lambda: len(lock.contenders()) == 2) - assert lock.contenders() == ['test', 'contender'] + assert lock.contenders() == ["test", "contender"] self.expire_session(self.make_event) @@ -197,7 +205,7 @@ def test_lock_reconnect(self): with self.condition: while not self.active_thread: self.condition.wait() - assert self.active_thread == 'contender' + assert self.active_thread == "contender" event.set() thread.join() @@ -207,8 +215,10 @@ def test_lock_non_blocking(self): lock = self.client.Lock(self.lockpath, lock_name) event = self.make_event() - thread = self.make_thread(target=self._thread_lock_acquire_til_event, - args=(lock_name, lock, event)) + thread = self.make_thread( + target=self._thread_lock_acquire_til_event, + args=(lock_name, lock, event), + ) thread.start() lock1 = self.client.Lock(self.lockpath, lock_name) @@ -227,8 +237,10 @@ def test_lock_non_blocking(self): def test_lock_fail_first_call(self): event1 = self.make_event() lock1 = self.client.Lock(self.lockpath, "one") - thread1 = 
self.make_thread(target=self._thread_lock_acquire_til_event, - args=("one", lock1, event1)) + thread1 = self.make_thread( + target=self._thread_lock_acquire_til_event, + args=("one", lock1, event1), + ) thread1.start() # wait for this thread to acquire the lock @@ -243,8 +255,10 @@ def test_lock_fail_first_call(self): def test_lock_cancel(self): event1 = self.make_event() lock1 = self.client.Lock(self.lockpath, "one") - thread1 = self.make_thread(target=self._thread_lock_acquire_til_event, - args=("one", lock1, event1)) + thread1 = self.make_thread( + target=self._thread_lock_acquire_til_event, + args=("one", lock1, event1), + ) thread1.start() # wait for this thread to acquire the lock @@ -257,8 +271,10 @@ def test_lock_cancel(self): client2.start() event2 = self.make_event() lock2 = client2.Lock(self.lockpath, "two") - thread2 = self.make_thread(target=self._thread_lock_acquire_til_event, - args=("two", lock2, event2)) + thread2 = self.make_thread( + target=self._thread_lock_acquire_til_event, + args=("two", lock2, event2), + ) thread2.start() # this one should block in acquire. check that it is a contender @@ -361,7 +377,7 @@ def test_lock_ephemeral(self): client1.start() lock = client1.Lock(self.lockpath, "ephemeral") lock.acquire(ephemeral=False) - znode = self.lockpath + '/' + lock.node + znode = self.lockpath + "/" + lock.node client1.stop() try: self.client.get(znode) @@ -418,7 +434,6 @@ def test_read_lock(self): # and that it's still not reentrant. gotten = lock.acquire(blocking=False) assert gotten is False - # Test that a second client we can share the same read lock client2 = self._get_client() client2.start() @@ -428,7 +443,6 @@ def test_read_lock(self): assert lock2.is_acquired is True gotten = lock2.acquire(blocking=False) assert gotten is False - # Test that a writer is unable to share it client3 = self._get_client() client3.start() @@ -518,28 +532,33 @@ def sema_one(): def test_non_blocking(self): sem1 = self.client.Semaphore( - self.lockpath, identifier='sem1', max_leases=2) + self.lockpath, identifier="sem1", max_leases=2 + ) sem2 = self.client.Semaphore( - self.lockpath, identifier='sem2', max_leases=2) + self.lockpath, identifier="sem2", max_leases=2 + ) sem3 = self.client.Semaphore( - self.lockpath, identifier='sem3', max_leases=2) + self.lockpath, identifier="sem3", max_leases=2 + ) sem1.acquire() sem2.acquire() assert not sem3.acquire(blocking=False) - assert set(sem1.lease_holders()) == set(['sem1', 'sem2']) + assert set(sem1.lease_holders()) == set(["sem1", "sem2"]) sem2.release() # the next line isn't required, but avoids timing issues in tests sem3.acquire() - assert set(sem1.lease_holders()) == set(['sem1', 'sem3']) + assert set(sem1.lease_holders()) == set(["sem1", "sem3"]) sem1.release() sem3.release() def test_non_blocking_release(self): sem1 = self.client.Semaphore( - self.lockpath, identifier='sem1', max_leases=1) + self.lockpath, identifier="sem1", max_leases=1 + ) sem2 = self.client.Semaphore( - self.lockpath, identifier='sem2', max_leases=1) + self.lockpath, identifier="sem2", max_leases=1 + ) sem1.acquire() sem2.acquire(blocking=False) @@ -552,7 +571,7 @@ def test_holders(self): event = self.make_event() def sema_one(): - with self.client.Semaphore(self.lockpath, 'fred', max_leases=1): + with self.client.Semaphore(self.lockpath, "fred", max_leases=1): started.set() event.wait() @@ -561,13 +580,13 @@ def sema_one(): started.wait() sem1 = self.client.Semaphore(self.lockpath) holders = sem1.lease_holders() - assert holders == ['fred'] + assert holders == 
["fred"] event.set() thread.join() def test_semaphore_cancel(self): - sem1 = self.client.Semaphore(self.lockpath, 'fred', max_leases=1) - sem2 = self.client.Semaphore(self.lockpath, 'george', max_leases=1) + sem1 = self.client.Semaphore(self.lockpath, "fred", max_leases=1) + sem2 = self.client.Semaphore(self.lockpath, "george", max_leases=1) sem1.acquire() started = self.make_event() event = self.make_event() @@ -583,7 +602,7 @@ def sema_one(): thread = self.make_thread(target=sema_one, args=()) thread.start() started.wait() - assert sem1.lease_holders() == ['fred'] + assert sem1.lease_holders() == ["fred"] assert not event.is_set() sem2.cancel() event.wait() @@ -591,7 +610,7 @@ def sema_one(): thread.join() def test_multiple_acquire_and_release(self): - sem1 = self.client.Semaphore(self.lockpath, 'fred', max_leases=1) + sem1 = self.client.Semaphore(self.lockpath, "fred", max_leases=1) sem1.acquire() sem1.acquire() @@ -599,12 +618,13 @@ def test_multiple_acquire_and_release(self): assert not sem1.release() def test_handle_session_loss(self): - expire_semaphore = self.client.Semaphore(self.lockpath, 'fred', - max_leases=1) + expire_semaphore = self.client.Semaphore( + self.lockpath, "fred", max_leases=1 + ) client = self._get_client() client.start() - lh_semaphore = client.Semaphore(self.lockpath, 'george', max_leases=1) + lh_semaphore = client.Semaphore(self.lockpath, "george", max_leases=1) lh_semaphore.acquire() started = self.make_event() @@ -621,7 +641,7 @@ def sema_one(): thread1.start() started.wait() - assert lh_semaphore.lease_holders() == ['george'] + assert lh_semaphore.lease_holders() == ["george"] # Fired in a separate thread to make sure we can see the effect expired = self.make_event() @@ -639,7 +659,7 @@ def expire(): client.stop() event.wait(15) - assert expire_semaphore.lease_holders() == ['fred'] + assert expire_semaphore.lease_holders() == ["fred"] event2.set() for t in (thread1, thread2): @@ -658,7 +678,7 @@ def test_inconsistent_max_leases_other_data(self): sem2 = self.client.Semaphore(self.lockpath, max_leases=2) self.client.ensure_path(self.lockpath) - self.client.set(self.lockpath, b'a$') + self.client.set(self.lockpath, b"a$") sem1.acquire() # sem2 thinks it's ok to have two lease holders @@ -716,3 +736,28 @@ def _thread(sem, event, timeout): # Cleanup t.join() client2.stop() + + +class TestSequence(unittest.TestCase): + def test_get_predecessor(self): + """Validate selection of predecessors. + """ + goLock = "_c_8eb60557ba51e0da67eefc47467d3f34-lock-0000000031" + pyLock = "514e5a831836450cb1a56c741e990fd8__lock__0000000032" + children = ["hello", goLock, "world", pyLock] + client = mock.MagicMock() + client.get_children.return_value = children + lock = Lock(client, "test") + assert lock._get_predecessor(pyLock) is None + + def test_get_predecessor_go(self): + """Test selection of predecessor when instructed to consider go-zk + locks. + """ + goLock = "_c_8eb60557ba51e0da67eefc47467d3f34-lock-0000000031" + pyLock = "514e5a831836450cb1a56c741e990fd8__lock__0000000032" + children = ["hello", goLock, "world", pyLock] + client = mock.MagicMock() + client.get_children.return_value = children + lock = Lock(client, "test", extra_lock_patterns=["-lock-"]) + assert lock._get_predecessor(pyLock) == goLock