diff --git a/kazoo/recipe/lock.py b/kazoo/recipe/lock.py index c4ecb6cf..c34a8bbf 100644 --- a/kazoo/recipe/lock.py +++ b/kazoo/recipe/lock.py @@ -16,6 +16,10 @@ """ import sys +try: + from time import monotonic as now +except ImportError: + from time import time as now import uuid import six @@ -32,6 +36,22 @@ from kazoo.protocol.states import KazooState +class _Watch(object): + def __init__(self, duration=None): + self.duration = duration + self.started_at = None + + def start(self): + self.started_at = now() + + def leftover(self): + if self.duration is None: + return None + else: + elapsed = now() - self.started_at + return max(0, self.duration - elapsed) + + class Lock(object): """Kazoo Lock @@ -448,7 +468,13 @@ def _inner_acquire(self, blocking, timeout=None): if self.client.exists(self.create_path): return True - with self.client.Lock(self.lock_path, self.data): + w = _Watch(duration=timeout) + w.start() + lock = self.client.Lock(self.lock_path, self.data) + gotten = lock.acquire(blocking=blocking, timeout=w.leftover()) + if not gotten: + return False + try: while True: self.wake_event.clear() @@ -459,16 +485,15 @@ def _inner_acquire(self, blocking, timeout=None): if blocking: # If blocking, wait until self._watch_lease_change() is # called before returning - self.wake_event.wait(timeout) + self.wake_event.wait(w.leftover()) if not self.wake_event.isSet(): raise LockTimeout( "Failed to acquire semaphore on %s " "after %s seconds" % (self.path, timeout)) else: - # If not blocking, register another watch that will trigger - # self._get_lease() as soon as the children change again. - self.client.get_children(self.path, self._get_lease) return False + finally: + lock.release() def _watch_lease_change(self, event): self.wake_event.set()