Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Started lock and leader election recipes

  • Loading branch information...
commit 3e5aff1938cfcef049a3a3a1ca3ae0c9cce7c1e7 1 parent 7b5096e
@labisso labisso authored
View
12 kazoo/client.py
@@ -5,6 +5,7 @@
import zookeeper
from kazoo.sync import get_sync_strategy
+from kazoo.retry import KazooRetry
ZK_OPEN_ACL_UNSAFE = {"perms": zookeeper.PERM_ALL, "scheme": "world",
"id": "anyone"}
@@ -18,7 +19,7 @@ class ZooKeeperClient(object):
* disconnected state handling
* the rest of the operations
"""
- def __init__(self, hosts, timeout=10000):
+ def __init__(self, hosts, timeout=10000, max_retries=None):
self._hosts = hosts
self._timeout = timeout
@@ -29,6 +30,15 @@ def __init__(self, hosts, timeout=10000):
self._connected_async_result = self._sync.async_result()
self._connection_timed_out = False
+ self._retry = None
+ self._max_retries = max_retries
+
+ @property
+ def retry(self):
+ if not self._retry:
+ self._retry = KazooRetry(self._max_retries)
+ return self._retry
+
@property
def connected(self):
return self._connected
View
2  kazoo/recipe/__init__.py
@@ -0,0 +1,2 @@
+
+
View
14 kazoo/recipe/leader.py
@@ -0,0 +1,14 @@
+from kazoo.recipe.lock import ZooLock
+
+
+class LeaderElection(object):
+ def __init__(self, client, path):
+ self.lock = ZooLock(client, path)
+
+ def run(self, func, *args, **kwargs):
+ if not callable(func):
+ raise ValueError("leader function is not callable")
+
+ with self.lock:
+ func(*args, **kwargs)
+
View
130 kazoo/recipe/lock.py
@@ -0,0 +1,130 @@
+import threading
+import uuid
+
+from kazoo.retry import ForceRetryError
+
+#noinspection PyArgumentList
+class ZooLock(object):
+ _LOCK_NAME = '_lock_'
+
+ def __init__(self, client, path):
+ """
+ @type client ZooKeeperClient
+ """
+ self.client = client
+ self.path = path
+
+ self.condition = threading.Condition()
+
+ # props to Netflix Curator for this trick. It is possible for our
+ # create request to succeed on the server, but for a failure to
+ # prevent us from getting back the full path name. We prefix our
+ # lock name with a uuid and can check for its presence on retry.
+ self.prefix = uuid.uuid4().hex + self._LOCK_NAME
+ self.create_path = self.path + "/" + self.prefix
+
+ self.create_tried = False
+
+ self.is_acquired = False
+
+ def acquire(self):
+ """Acquire the mutex, blocking until it is obtained
+ """
+
+ try:
+ self.client.retry(self._inner_acquire)
+
+ self.is_acquired = True
+
+ except Exception:
+ # if we did ultimately fail, attempt to clean up
+ self._best_effort_cleanup()
+
+ def _inner_acquire(self):
+ node = None
+ if self.create_tried:
+ node = self._find_node()
+ else:
+ self.create_tried = True
+
+ if not node:
+ node = self.client.create(self.create_path, "",
+ ephemeral=True, sequence=True)
+
+ self.node = node
+
+ while True:
+ children = self._get_sorted_children()
+
+ our_index = children.index(node)
+
+ if our_index == -1:
+ # somehow we aren't in the election -- probably we are
+ # recovering from a session failure and our ephemeral
+ # node was removed
+ raise ForceRetryError()
+
+ #noinspection PySimplifyBooleanCheck
+ if our_index == 0:
+ # we are leader
+ return True
+
+ # otherwise we are in the mix. watch predecessor and bide our time
+ predecessor = self.path + "/" + children[our_index-1]
+ with self.condition:
+ if self.client.exists(predecessor, self._watch_predecessor):
+ self.condition.wait()
+
+ def _watch_predecessor(self, type, state, path):
+ with self.condition:
+ self.condition.notify_all()
+
+ def _get_sorted_children(self):
+ children = self.client.get_children(self.path)
+
+ # can't just sort directly: the node names are prefixed by uuids
+ lockname = self._LOCK_NAME
+ children.sort(key=lambda c: c[c.find(lockname) + len(lockname):])
+ return children
+
+ def _find_node(self):
+ children = self.client.get_children(self.path)
+ for child in children:
+ if child.startswith(self.prefix):
+ return child
+ return None
+
+ def _best_effort_cleanup(self):
+ try:
+
+ node = self._find_node()
+ if node:
+ self.client.delete(self.path + "/" + node)
+
+ except Exception:
+ pass
+
+ def release(self):
+ """Release the mutex immediately
+ """
+ return self.client.retry(self._inner_release)
+
+ def _inner_release(self):
+ if not self.is_acquired:
+ return False
+
+ self.client.delete(self.path + "/" + self.node)
+
+ self.is_acquired = False
+ self.node = None
+
+ return True
+
+ def __enter__(self):
+ self.acquire()
+
+ def __exit__(self,exc_type, exc_value, traceback):
+ self.release()
+
+
+
View
2  kazoo/recipe/test/__init__.py
@@ -0,0 +1,2 @@
+
+
View
55 kazoo/recipe/test/test_lock.py
@@ -0,0 +1,55 @@
+import unittest
+import uuid
+import threading
+
+from kazoo.recipe.lock import ZooLock
+from kazoo.test import get_client_or_skip
+
+class ZooLockTests(unittest.TestCase):
+
+ def setUp(self):
+ self._c = get_client_or_skip()
+ self._c.connect()
+ self.lockpath = "/" + uuid.uuid4().hex
+ self._c.create(self.lockpath, "")
+
+ self.semaphore = threading.Semaphore()
+
+ def tearDown(self):
+ if self.lockpath:
+ try:
+ self._c.delete(self.lockpath)
+ except Exception:
+ pass
+
+ def test_lock(self):
+ clients = []
+ locks = []
+
+ for _ in range(5):
+ c = get_client_or_skip()
+ c.connect()
+
+ l = ZooLock(c, self.lockpath)
+
+ clients.append(c)
+ locks.append(l)
+
+ threads = [threading.Thread(target=self._thread_lock_acquire,
+ args=(lock,)) for lock in locks]
+
+ for t in threads:
+ t.start()
+
+ for t in threads:
+ t.join()
+
+ #TODO this test needs work
+
+ def _thread_lock_acquire(self, lock):
+ with lock:
+ self.semaphore.release()
+ print "got lock"
+ self.semaphore.acquire()
+
+
View
33 kazoo/retry.py
@@ -0,0 +1,33 @@
+from zookeeper import ConnectionLossException, OperationTimeoutException, \
+ SessionExpiredException, SessionMovedException
+
+
+class ForceRetryError(Exception):
+ """Raised when some recipe logic wants to force a retry
+ """
+
+class KazooRetry(object):
+ """Helper for retrying a method in the face of specific exceptions
+ """
+
+ ALLOWED_EX = (ConnectionLossException, OperationTimeoutException,
+ SessionMovedException, SessionExpiredException, ForceRetryError)
+
+ def __init__(self, max_tries=None):
+ self.max_tries = max_tries
+
+ def run(self, func, *args, **kwargs):
+ self(func, *args, **kwargs)
+
+ def __call__(self, func, *args, **kwargs):
+ tries = 1
+
+ while True:
+ try:
+ return func(*args, **kwargs)
+
+ except self.ALLOWED_EX:
+ if self.max_tries and tries == self.max_tries:
+ raise
+ tries += 1
+
View
19 kazoo/test/__init__.py
@@ -1,2 +1,19 @@
+import os
+import unittest
-
+from kazoo.client import ZooKeeperClient
+
+# if this env variable is set, ZK client integration tests are run
+# against the specified host list
+ENV_AZK_TEST_HOSTS = "AZK_TEST_HOSTS"
+
+def get_hosts_or_skip():
+ if ENV_AZK_TEST_HOSTS in os.environ:
+ return os.environ[ENV_AZK_TEST_HOSTS]
+ raise unittest.SkipTest("Skipping ZooKeeper test. To run, set "+
+ "%s env to a host list. (ex: localhost:2181)" %
+ ENV_AZK_TEST_HOSTS)
+
+def get_client_or_skip(**kwargs):
+ hosts = get_hosts_or_skip()
+ return ZooKeeperClient(hosts, **kwargs)
View
15 kazoo/test/test_client.py
@@ -1,17 +1,13 @@
import unittest
-import os
import uuid
import threading
from kazoo.client import ZooKeeperClient
-
-# if this env variable is set, ZK client integration tests are run
-# against the specified host list
-ENV_AZK_TEST_HOSTS = "AZK_TEST_HOSTS"
+from kazoo.test import get_hosts_or_skip
class ZooKeeperClientTests(unittest.TestCase):
def setUp(self):
- self.hosts = get_zk_hosts_or_skip()
+ self.hosts = get_hosts_or_skip()
self.zk = None
self.created = []
@@ -104,10 +100,5 @@ def test_create_delete(self):
exists = self.zk.exists(nodepath)
self.assertIsNone(exists)
-def get_zk_hosts_or_skip():
- if ENV_AZK_TEST_HOSTS in os.environ:
- return os.environ[ENV_AZK_TEST_HOSTS]
- raise unittest.SkipTest("Skipping ZooKeeper test. To run, set "+
- "%s env to a host list. (ex: localhost:2181)" %
- ENV_AZK_TEST_HOSTS)
+
Please sign in to comment.
Something went wrong with that request. Please try again.