Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 30 additions & 5 deletions kazoo/recipe/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
"""

import sys
try:
from time import monotonic as now
except ImportError:
from time import time as now
import uuid

import six
Expand All @@ -32,6 +36,22 @@
from kazoo.protocol.states import KazooState


class _Watch(object):
def __init__(self, duration=None):
self.duration = duration
self.started_at = None

def start(self):
self.started_at = now()

def leftover(self):
if self.duration is None:
return None
else:
elapsed = now() - self.started_at
return max(0, self.duration - elapsed)


class Lock(object):
"""Kazoo Lock

Expand Down Expand Up @@ -448,7 +468,13 @@ def _inner_acquire(self, blocking, timeout=None):
if self.client.exists(self.create_path):
return True

with self.client.Lock(self.lock_path, self.data):
w = _Watch(duration=timeout)
w.start()
lock = self.client.Lock(self.lock_path, self.data)
gotten = lock.acquire(blocking=blocking, timeout=w.leftover())
if not gotten:
return False
try:
while True:
self.wake_event.clear()

Expand All @@ -459,16 +485,15 @@ def _inner_acquire(self, blocking, timeout=None):
if blocking:
# If blocking, wait until self._watch_lease_change() is
# called before returning
self.wake_event.wait(timeout)
self.wake_event.wait(w.leftover())
if not self.wake_event.isSet():
raise LockTimeout(
"Failed to acquire semaphore on %s "
"after %s seconds" % (self.path, timeout))
else:
# If not blocking, register another watch that will trigger
# self._get_lease() as soon as the children change again.
self.client.get_children(self.path, self._get_lease)
return False
finally:
lock.release()

def _watch_lease_change(self, event):
self.wake_event.set()
Expand Down