Permalink
Browse files

Add lock cancelling support

  • Loading branch information...
labisso committed Feb 28, 2012
1 parent 14afabf commit f3b940667ac2cd90e45b116bb5e4607aca3acd8f
Showing with 84 additions and 16 deletions.
  1. +4 −1 kazoo/exceptions.py
  2. +9 −2 kazoo/recipe/leader.py
  3. +16 −2 kazoo/recipe/lock.py
  4. +55 −11 kazoo/recipe/test/test_lock.py
View
@@ -5,4 +5,7 @@
BadVersionException,NoChildrenForEphemeralsException,NodeExistsException,\
InvalidACLException, AuthFailedException, NotEmptyException,\
SessionExpiredException, InvalidCallbackException
-
+
+class CancelledError(Exception):
+ """Raised when a process is cancelled by another thread
+ """
View
@@ -1,4 +1,5 @@
from kazoo.recipe.lock import ZooLock
+from kazoo.exceptions import CancelledError
class LeaderElection(object):
@@ -9,6 +10,12 @@ def run(self, func, *args, **kwargs):
if not callable(func):
raise ValueError("leader function is not callable")
- with self.lock:
- func(*args, **kwargs)
+ try:
+ with self.lock:
+ func(*args, **kwargs)
+ except CancelledError:
+ pass
+
+ def cancel(self):
+ self.lock.cancel()
View
@@ -3,8 +3,8 @@
from kazoo.retry import ForceRetryError
from zookeeper import NoNodeException
+from kazoo.exceptions import CancelledError
-#noinspection PyArgumentList
class ZooLock(object):
_LOCK_NAME = '_lock_'
@@ -34,10 +34,18 @@ def __init__(self, client, path, contender_name=None):
self.assured_path = False
+ self.cancelled = False
+
+ def cancel(self):
+ """Cancel a pending lock acquire
+ """
+ with self.condition:
+ self.cancelled = True
+ self.condition.notify_all()
+
def acquire(self):
"""Acquire the mutex, blocking until it is obtained
"""
-
try:
self.client.retry(self._inner_acquire)
@@ -46,6 +54,7 @@ def acquire(self):
except Exception:
# if we did ultimately fail, attempt to clean up
self._best_effort_cleanup()
+ self.cancelled = False
raise
def _inner_acquire(self):
@@ -69,6 +78,11 @@ def _inner_acquire(self):
self.node = node
while True:
+
+ # bail out with an exception if cancellation has been requested
+ if self.cancelled:
+ raise CancelledError()
+
children = self._get_sorted_children()
try:
@@ -4,6 +4,7 @@
import time
from kazoo.recipe.lock import ZooLock
+from kazoo.exceptions import CancelledError
from kazoo.test import get_client_or_skip, until_timeout
class ZooLockTests(unittest.TestCase):
@@ -15,6 +16,7 @@ def setUp(self):
self.condition = threading.Condition()
self.active_thread = None
+ self.cancelled_threads = []
def tearDown(self):
if self.lockpath:
@@ -116,20 +118,62 @@ def test_lock(self):
thread.join()
+ def test_lock_cancel(self):
+
+ client1 = get_client_or_skip()
+ client1.connect()
+ event1 = threading.Event()
+ lock1 = ZooLock(client1, self.lockpath, "one")
+ thread1 = threading.Thread(target=self._thread_lock_acquire_til_event,
+ args=("one", lock1, event1))
+ thread1.start()
+
+ # wait for this thread to acquire the lock
+ with self.condition:
+ if not self.active_thread:
+ self.condition.wait(5)
+ self.assertEqual(self.active_thread, "one")
+
+ client2 = get_client_or_skip()
+ client2.connect()
+ event2 = threading.Event()
+ lock2 = ZooLock(client2, self.lockpath, "two")
+ thread2 = threading.Thread(target=self._thread_lock_acquire_til_event,
+ args=("two", lock2, event2))
+ thread2.start()
+
+ # this one should block in acquire. check that it is a contender
+ self.assertEqual(lock2.get_contenders(), ["one", "two"])
+
+ lock2.cancel()
+ with self.condition:
+ if not "two" in self.cancelled_threads:
+ self.condition.wait()
+ self.assertIn("two", self.cancelled_threads)
+
+ self.assertEqual(lock2.get_contenders(), ["one"])
+
+ thread2.join()
+ event1.set()
+ thread1.join()
+
def _thread_lock_acquire_til_event(self, name, lock, event):
- with lock:
- #print "%s enter lock" % name
- with self.condition:
- self.assertIsNone(self.active_thread)
- self.active_thread = name
- self.condition.notify_all()
+ try:
+ with lock:
+ with self.condition:
+ self.assertIsNone(self.active_thread)
+ self.active_thread = name
+ self.condition.notify_all()
+
+ event.wait()
- event.wait()
+ with self.condition:
+ self.assertEqual(self.active_thread, name)
+ self.active_thread = None
+ self.condition.notify_all()
+ except CancelledError:
with self.condition:
- self.assertEqual(self.active_thread, name)
- self.active_thread = None
+ self.cancelled_threads.append(name)
self.condition.notify_all()
- #print "%s exit lock" % name
-

0 comments on commit f3b9406

Please sign in to comment.