Skip to content

Commit

Permalink
naming cleanup, more doc strings
Browse files Browse the repository at this point in the history
  • Loading branch information
bbangert committed May 9, 2012
1 parent 85f6d86 commit 2051bb5
Showing 1 changed file with 79 additions and 24 deletions.
103 changes: 79 additions & 24 deletions zktools/locking.py
Expand Up @@ -73,14 +73,46 @@ class ZkAsyncLock(object):
This Lock can be established asynchronously in the background.
Example non-blocking use::
lock = ZkAsyncLock(zk, '/Mylocks/resourceB')
lock.acquire()
# Do some stuff that doesn't care if the lock is
# established yet, then wait for the lock to acquire
lock.wait_for_acquire()
# Do stuff with lock
lock.release()
# Wait for the lock to be released (release is also async)
lock.wait_for_release()
Example blocking use::
lock = ZkAsyncLock(zk, '/Mylocks/resourceB')
with lock:
# Won't execute until the lock is acquired
do_stuff()
# lock is released
do_more_stuff()
"""
def __init__(self, connection, lock_path):
"""Create an Asynchronous Zookeeper Lock
:param connection: Zookeeper connection object
:type connection: zc.zk Zookeeper instance
:param lock_path: Path to the lock node that should be used
:type lock_root: string
"""
self._zk = connection
self._lock_path = lock_path
self._lock_event = threading.Event()
self._acquired = False
self._candidate_path = None
self._wait_timeout = None
self._acquire_func = self._release_func = None
self.errors = []

try:
Expand All @@ -90,42 +122,59 @@ def __init__(self, connection, lock_path):
pass

def __enter__(self):
"""Context manager blocking interface"""
self.acquire()
self._lock_event.wait(self._wait_timeout)
self._lock_event.wait()

def __exit__(self, exc_type, exc_value, traceback):
"""Context manager blocking interface"""
self.release()
self._lock_event.wait()
self._wait_timeout = None

def wait_for(self, timeout=None):
self._wait_timeout = timeout
return self

def wait_for_acquire(self, timeout=None):
"""Waits for lock acquisition
:param timeout: How long to wait for the lock, defaults to waiting
forever
"""
if not self._node_prefix:
raise Exception("Lock acquisition is not in process")
self._lock_event.wait(timeout)
return self.acquired

def wait_for_release(self, timeout=None):
"""Waits for lock release
:param timeout: How long to wait for the lock to release, defaults to
waiting forever
"""
self._lock_event.wait(timeout)
return not self.acquired

@property
def acquired(self):
return self._acquired

def acquire(self):
def acquire(self, func=None):
if self.acquired:
raise Exception("Lock already acquired")

self._lock_event.clear()
self._acquire_func = func
self._node_prefix = uuid.uuid4().hex
self._create_candidate()
return False

def release(self):
def release(self, func=None):
if not self.acquired:
raise Exception("Lock not acquired")
self._release_func = func
self._lock_event.clear()
self._delete_candidate()
return False

def _delete_candidate(self):
self._zk.adelete(self._candidate_path, -1, self._delete_callback)
Expand All @@ -134,8 +183,11 @@ def _delete_candidate(self):
def _delete_callback(self, p, return_code):
if return_code in (zookeeper.OK, zookeeper.NONODE):
self._candidate_path = self._node_prefix = None
self._acquire_func = None
self._acquired = False
self._lock_event.set()
if self._release_func:
self._release_func()
elif retryable(return_code):
time.sleep(0.1)
return self._delete_candidate()
Expand All @@ -146,22 +198,26 @@ def _create_candidate(self):
self._zk.create(self._lock_path + "/%s-lock-" % self._node_prefix,
"0", [ZOO_OPEN_ACL_UNSAFE],
zookeeper.EPHEMERAL | zookeeper.SEQUENCE,
self._candidate_creation)
self._candidate_creation_callback)

def _acquire(self):
self._zk.aget_children(self._lock_path, None,
self._check_candidate_nodes_callback)

def _candidate_creation(self, p, return_code, value):
def _candidate_creation_callback(self, p, return_code, value):
"""Callback for after the node creation runs"""
if return_code == zookeeper.OK:
self._candidate_path = value
return self._acquire()
elif retryable(return_code):
self._zk.aget_children(self._lock_path, None,
self._check_children_for_prefix)
self._check_children_for_prefix_callback)
else:
self.errors.append((return_code, 'Candidate creation'))
self._lock_event.set()

@threaded
def _check_children_for_prefix(self, p, return_code, children):
def _check_children_for_prefix_callback(self, p, return_code, children):
"""Checks to see during candidate creation errors if the node
was actually created"""
if return_code == zookeeper.OK:
Expand All @@ -174,17 +230,13 @@ def _check_children_for_prefix(self, p, return_code, children):
elif retryable(return_code): # Small sleep to avoid CPU hit
time.sleep(0.1)
self._zk.aget_children(self._lock_path, None,
self._check_children_for_prefix)
self._check_children_for_prefix_callback)
else:
self.errors.append((return_code, 'Check children for prefix'))
self._lock_event.set()

def _acquire(self):
self._zk.aget_children(self._lock_path, None,
self._check_candidate_nodes)

@threaded
def _check_candidate_nodes(self, p, return_code, children):
def _check_candidate_nodes_callback(self, p, return_code, children):
if retryable(return_code): # Small sleep to avoid CPU hit
time.sleep(0.1)
return self._acquire()
Expand All @@ -203,20 +255,23 @@ def _check_candidate_nodes(self, p, return_code, children):

if index == 0: # We're first, lock acquired
self._acquired = True
return self._lock_event.set()
self._lock_event.set()
if self._acquire_func:
self._acquire_func(self)
return

# We're not first, watch the next in line
prior_node = '/'.join([self._lock_path, children[index - 1]])
self._zk.aget(prior_node, self._prior_node_watch,
self._prior_node_exists)
self._zk.aget(prior_node, self._prior_node_watcher,
self._prior_node_get_callback)

def _prior_node_exists(self, p, return_code, value, stat):
def _prior_node_get_callback(self, p, return_code, value, stat):
if return_code == zookeeper.NONODE:
# No node? Check candidates again
return self._acquire()
self._acquire()
# Node still exists, wait for the watcher and ignore here

def _prior_node_watch(self, handle, type, state, path):
def _prior_node_watcher(self, handle, type, state, path):
if type != zookeeper.SESSION_EVENT:
# Retrigger our children check
self._acquire()
Expand Down

0 comments on commit 2051bb5

Please sign in to comment.