Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Docstrings, better ZK connection handling

  • Loading branch information...
commit 6c9424723145449b6dbab32ceea482c98f37aad5 1 parent e9f5e7c
@labisso labisso authored
View
129 azookeeper/client.py
@@ -6,6 +6,8 @@
from azookeeper.sync import get_sync_strategy
+ZK_OPEN_ACL_UNSAFE = {"perms": zookeeper.PERM_ALL, "scheme": "world",
+ "id": "anyone"}
class ZookeeperClient(object):
"""A gevent-friendly wrapper of the Apache Zookeeper zkpython client
@@ -16,7 +18,7 @@ class ZookeeperClient(object):
* tests
* the rest of the operations
"""
- def __init__(self, hosts, timeout):
+ def __init__(self, hosts, timeout=10000):
self._hosts = hosts
self._timeout = timeout
@@ -24,6 +26,7 @@ def __init__(self, hosts, timeout):
self._connected = False
self._connected_async_result = self._sync.async_result()
+ self._connection_timed_out = False
@property
def connected(self):
@@ -32,20 +35,45 @@ def connected(self):
def _wrap_callback(self, func):
return partial(self._sync.dispatch_callback, func)
- def _session_watcher(self, handle, type, state, path):
- #TODO fo real
- self._connected = True
+ def _session_callback(self, handle, type, state, path):
+ if state == zookeeper.CONNECTED_STATE:
+ self._connected = True
+ elif state == zookeeper.CONNECTING_STATE:
+ self._connected = False
+
if not self._connected_async_result.ready():
- self._connected_async_result.set()
+ #close the connection if we already timed out
+ if self._connection_timed_out and self._connected:
+ self.close()
+ else:
+ self._connected_async_result.set()
- def connect(self):
- #TODO connect timeout? async version?
+ def connect_async(self):
+ """Asynchronously initiate connection to ZK
- cb = self._wrap_callback(self._session_watcher)
+ @return: AsyncResult object set on connection success
+ @rtype AsyncResult
+ """
+
+ cb = self._wrap_callback(self._session_callback)
self._handle = zookeeper.init(self._hosts, cb, self._timeout)
- self._connected_async_result.wait()
+ return self._connected_async_result
+
+ def connect(self, timeout=None):
+ """Initiate connection to ZK
+
+ @param timeout: time in seconds to wait for connection to succeed
+ """
+ async_result = self.connect_async()
+ try:
+ async_result.wait(timeout=timeout)
+ except self._sync.timeout_error:
+ self._connection_timed_out = True
+ raise
def close(self):
+ """Disconnect from ZooKeeper
+ """
if self._connected:
code = zookeeper.close(self._handle)
self._handle = None
@@ -54,6 +82,13 @@ def close(self):
raise err_to_exception(code)
def add_auth_async(self, scheme, credential):
+ """Asynchronously send credentials to server
+
+ @param scheme: authentication scheme (default supported: "digest")
+ @param credential: the credential -- value depends on scheme
+ @return: AsyncResult object set on completion
+ @rtype AsyncResult
+ """
async_result = self._sync.async_result()
callback = partial(_generic_callback, async_result)
@@ -61,9 +96,25 @@ def add_auth_async(self, scheme, credential):
return async_result
def add_auth(self, scheme, credential):
+ """Send credentials to server
+
+ @param scheme: authentication scheme (default supported: "digest")
+ @param credential: the credential -- value depends on scheme
+ """
return self.add_auth_async(scheme, credential).get()
- def create_async(self, path, value, acl, ephemeral=False, sequence=False):
+ def create_async(self, path, value, acl=(ZK_OPEN_ACL_UNSAFE,),
+ ephemeral=False, sequence=False):
+ """Asynchronously create a ZNode
+
+ @param path: path of node
+ @param value: initial value of node
+ @param acl: permissions for node
+ @param ephemeral: boolean indicating whether node is ephemeral (tied to this session)
+ @param sequence: boolean indicating whether path is suffixed with a unique index
+ @return: AsyncResult object set on completion with the real path of the new node
+ @rtype AsyncResult
+ """
flags = 0
if ephemeral:
flags |= zookeeper.EPHEMERAL
@@ -77,10 +128,26 @@ def create_async(self, path, value, acl, ephemeral=False, sequence=False):
return async_result
def create(self, path, value, acl, ephemeral=False, sequence=False):
+ """Create a ZNode
+
+ @param path: path of node
+ @param value: initial value of node
+ @param acl: permissions for node
+ @param ephemeral: boolean indicating whether node is ephemeral (tied to this session)
+ @param sequence: boolean indicating whether path is suffixed with a unique index
+ @return: real path of the new node
+ """
return self.create_async(path, value, acl, ephemeral, sequence).get()
def get_async(self, path, watcher=None):
+ """Asynchronously get the value of a node
+
+ @param path: path of node
+ @param watcher: optional watch callback to set for future changes to this path
+ @return AsyncResult set with tuple (value, stat) of node on success
+ @rtype AsyncResult
+ """
async_result = self._sync.async_result()
callback = partial(_generic_callback, async_result)
watcher_callback = self._wrap_callback(watcher)
@@ -89,9 +156,22 @@ def get_async(self, path, watcher=None):
return async_result
def get(self, path, watcher=None):
+ """Get the value of a node
+
+ @param path: path of node
+ @param watcher: optional watch callback to set for future changes to this path
+ @return tuple (value, stat) of node
+ """
return self.get_async(path, watcher).get()
def get_children_async(self, path, watcher=None):
+ """Asynchronously get a list of child nodes of a path
+
+ @param path: path of node to list
+ @param watcher: optional watch callback to set for future changes to this path
+ @return: AsyncResult set with list of child node names on success
+ @rtype: AsyncResult
+ """
async_result = self._sync.async_result()
callback = partial(_generic_callback, async_result)
watcher_callback = self._wrap_callback(watcher)
@@ -100,9 +180,27 @@ def get_children_async(self, path, watcher=None):
return async_result
def get_children(self, path, watcher=None):
+ """Get a list of child nodes of a path
+
+ @param path: path of node to list
+ @param watcher: optional watch callback to set for future changes to this path
+ @return: list of child node names
+ """
return self.get_children_async(path, watcher).get()
def set_async(self, path, data, version=-1):
+ """Set the value of a node
+
+ If the version of the node being updated is newer than the supplied
+ version (and the supplied version is not -1), a BadVersionException
+ will be raised.
+
+ @param path: path of node to set
+ @param data: new data value
+ @param version: version of node being updated, or -1
+ @return: AsyncResult set with new node stat on success
+ @rtype AsyncResult
+ """
async_result = self._sync.async_result()
callback = partial(_generic_callback, async_result)
@@ -110,6 +208,17 @@ def set_async(self, path, data, version=-1):
return async_result
def set(self, path, data, version=-1):
+ """Set the value of a node
+
+ If the version of the node being updated is newer than the supplied
+ version (and the supplied version is not -1), a BadVersionException
+ will be raised.
+
+ @param path: path of node to set
+ @param data: new data value
+ @param version: version of node being updated, or -1
+ @return: updated node stat
+ """
return self.set_async(path, data, version).get()
View
2  azookeeper/sync/sync_gevent.py
@@ -3,6 +3,7 @@
import gevent
import gevent.event
+from gevent.timeout import Timeout
# get the unpatched thread module
import azookeeper.sync.util
@@ -54,6 +55,7 @@ def set(self):
class GeventSyncStrategy(object):
name = "gevent"
+ timeout_error = Timeout
def __init__(self):
self._pipe_read, self._pipe_write = _pipe()
View
1  azookeeper/sync/sync_threading.py
@@ -79,6 +79,7 @@ def get_nowait(self):
class ThreadingSyncStrategy(object):
name = "threading"
+ timeout_error = TimeoutError
def async_result(self):
return _AsyncResult()
Please sign in to comment.
Something went wrong with that request. Please try again.