Skip to content

Commit

Permalink
add zookeeper module and state
Browse files Browse the repository at this point in the history
Also use __context__ in zk_conncurrency
  • Loading branch information
gtmanfred committed Apr 20, 2017
1 parent 10764b7 commit 0b70e94
Show file tree
Hide file tree
Showing 6 changed files with 1,020 additions and 64 deletions.
6 changes: 6 additions & 0 deletions doc/ref/modules/all/salt.modules.zookeeper.rst
@@ -0,0 +1,6 @@
======================
salt.modules.zookeeper
======================

.. automodule:: salt.modules.zookeeper
:members:
6 changes: 6 additions & 0 deletions doc/ref/states/all/salt.states.zookeeper.rst
@@ -0,0 +1,6 @@
=====================
salt.states.zookeeper
=====================

.. automodule:: salt.states.zookeeper
:members:
157 changes: 102 additions & 55 deletions salt/modules/zk_concurrency.py
@@ -1,5 +1,8 @@
# -*- coding: utf-8 -*-
'''
:depends: kazoo
:configuration: See :py:mod:`salt.modules.zookeeper` for setup instructions.
Concurrency controls in zookeeper
=========================================================================
Expand All @@ -13,7 +16,7 @@
import sys

try:
from kazoo.client import KazooClient
import kazoo.client

from kazoo.retry import (
ForceRetryError
Expand Down Expand Up @@ -90,8 +93,6 @@ def _get_lease(self, data=None):
except ImportError:
HAS_DEPS = False

ZK_CONNECTION = None
SEMAPHORE_MAP = {}

__virtualname__ = 'zk_concurrency'

Expand All @@ -100,33 +101,67 @@ def __virtual__():
if not HAS_DEPS:
return (False, "Module zk_concurrency: dependencies failed")

__context__['semaphore_map'] = {}

return __virtualname__


def _get_zk_conn(hosts):
global ZK_CONNECTION
if ZK_CONNECTION is None:
ZK_CONNECTION = KazooClient(hosts=hosts)
ZK_CONNECTION.start()
def _get_zk_conn(profile=None, **connection_args):
if profile:
prefix = 'zookeeper:' + profile
else:
prefix = 'zookeeper'

def get(key, default=None):
'''
look in connection_args first, then default to config file
'''
return connection_args.get(key) or __salt__['config.get'](':'.join([prefix, key]), default)

hosts = get('hosts', '127.0.0.1:2181')
scheme = get('scheme', None)
username = get('username', None)
password = get('password', None)
default_acl = get('default_acl', None)

return ZK_CONNECTION
if isinstance(hosts, list):
hosts = ','.join(hosts)

if username is not None and password is not None and scheme is None:
scheme = 'digest'

def _close_zk_conn():
global ZK_CONNECTION
if ZK_CONNECTION is None:
return
auth_data = None
if scheme and username and password:
auth_data = [(scheme, ':'.join([username, password]))]

ZK_CONNECTION.stop()
ZK_CONNECTION = None
if default_acl is not None:
if isinstance(default_acl, list):
default_acl = [__salt__['zookeeper.make_digest_acl'](**acl) for acl in default_acl]
else:
default_acl = [__salt__['zookeeper.make_digest_acl'](**default_acl)]

__context__.setdefault('zkconnection', {}).setdefault(profile or hosts,
kazoo.client.KazooClient(hosts=hosts,
default_acl=default_acl,
auth_data=auth_data))

if not __context__['zkconnection'][profile or hosts].connected:
__context__['zkconnection'][profile or hosts].start()

return __context__['zkconnection'][profile or hosts]


def lock_holders(path,
zk_hosts,
zk_hosts=None,
identifier=None,
max_concurrency=1,
timeout=None,
ephemeral_lease=False):
ephemeral_lease=False,
profile=None,
scheme=None,
username=None,
password=None,
default_acl=None):
'''
Return an un-ordered list of lock holders
Expand Down Expand Up @@ -154,24 +189,27 @@ def lock_holders(path,
salt minion zk_concurrency.lock_holders /lock/path host1:1234,host2:1234
'''

zk = _get_zk_conn(zk_hosts)
if path not in SEMAPHORE_MAP:
SEMAPHORE_MAP[path] = _Semaphore(zk,
path,
identifier,
max_leases=max_concurrency,
ephemeral_lease=ephemeral_lease)
return SEMAPHORE_MAP[path].lease_holders()
zk = _get_zk_conn(profile=profile, hosts=zk_hosts, scheme=scheme,
username=username, password=password, default_acl=default_acl)
if path not in __context__['semaphore_map']:
__context__['semaphore_map'][path] = _Semaphore(zk, path, identifier,
max_leases=max_concurrency,
ephemeral_lease=ephemeral_lease)
return __context__['semaphore_map'][path].lease_holders()


def lock(path,
zk_hosts,
zk_hosts=None,
identifier=None,
max_concurrency=1,
timeout=None,
ephemeral_lease=False,
force=False, # foricble get the lock regardless of open slots
profile=None,
scheme=None,
username=None,
password=None,
default_acl=None,
):
'''
Get lock (with optional timeout)
Expand Down Expand Up @@ -203,35 +241,39 @@ def lock(path,
salt minion zk_concurrency.lock /lock/path host1:1234,host2:1234
'''
zk = _get_zk_conn(zk_hosts)
if path not in SEMAPHORE_MAP:
SEMAPHORE_MAP[path] = _Semaphore(zk,
path,
identifier,
max_leases=max_concurrency,
ephemeral_lease=ephemeral_lease)
zk = _get_zk_conn(profile=profile, hosts=zk_hosts, scheme=scheme,
username=username, password=password, default_acl=default_acl)
if path not in __context__['semaphore_map']:
__context__['semaphore_map'][path] = _Semaphore(zk, path, identifier,
max_leases=max_concurrency,
ephemeral_lease=ephemeral_lease)

# forcibly get the lock regardless of max_concurrency
if force:
SEMAPHORE_MAP[path].assured_path = True
SEMAPHORE_MAP[path].max_leases = sys.maxint
__context__['semaphore_map'][path].assured_path = True
__context__['semaphore_map'][path].max_leases = sys.maxint

# block waiting for lock acquisition
if timeout:
logging.info('Acquiring lock {0} with timeout={1}'.format(path, timeout))
SEMAPHORE_MAP[path].acquire(timeout=timeout)
__context__['semaphore_map'][path].acquire(timeout=timeout)
else:
logging.info('Acquiring lock {0} with no timeout'.format(path))
SEMAPHORE_MAP[path].acquire()
__context__['semaphore_map'][path].acquire()

return SEMAPHORE_MAP[path].is_acquired
return __context__['semaphore_map'][path].is_acquired


def unlock(path,
zk_hosts=None, # in case you need to unlock without having run lock (failed execution for example)
identifier=None,
max_concurrency=1,
ephemeral_lease=False
ephemeral_lease=False,
scheme=None,
profile=None,
username=None,
password=None,
default_acl=None
):
'''
Remove lease from semaphore
Expand Down Expand Up @@ -260,29 +302,33 @@ def unlock(path,
salt minion zk_concurrency.unlock /lock/path host1:1234,host2:1234
'''
# if someone passed in zk_hosts, and the path isn't in SEMAPHORE_MAP, lets
# if someone passed in zk_hosts, and the path isn't in __context__['semaphore_map'], lets
# see if we can find it
if zk_hosts is not None and path not in SEMAPHORE_MAP:
zk = _get_zk_conn(zk_hosts)
SEMAPHORE_MAP[path] = _Semaphore(zk,
path,
identifier,
max_leases=max_concurrency,
ephemeral_lease=ephemeral_lease)

if path in SEMAPHORE_MAP:
SEMAPHORE_MAP[path].release()
del SEMAPHORE_MAP[path]
zk = _get_zk_conn(profile=profile, hosts=zk_hosts, scheme=scheme,
username=username, password=password, default_acl=default_acl)
if path not in __context__['semaphore_map']:
__context__['semaphore_map'][path] = _Semaphore(zk, path, identifier,
max_leases=max_concurrency,
ephemeral_lease=ephemeral_lease)

if path in __context__['semaphore_map']:
__context__['semaphore_map'][path].release()
del __context__['semaphore_map'][path]
return True
else:
logging.error('Unable to find lease for path {0}'.format(path))
return False


def party_members(path,
zk_hosts,
zk_hosts=None,
min_nodes=1,
blocking=False
blocking=False,
profile=None,
scheme=None,
username=None,
password=None,
default_acl=None,
):
'''
Get the List of identifiers in a particular party, optionally waiting for the
Expand All @@ -307,7 +353,8 @@ def party_members(path,
salt minion zk_concurrency.party_members /lock/path host1:1234,host2:1234
salt minion zk_concurrency.party_members /lock/path host1:1234,host2:1234 min_nodes=3 blocking=True
'''
zk = _get_zk_conn(zk_hosts)
zk = _get_zk_conn(profile=profile, hosts=zk_hosts, scheme=scheme,
username=username, password=password, default_acl=default_acl)
party = kazoo.recipe.party.ShallowParty(zk, path)
if blocking:
barrier = kazoo.recipe.barrier.DoubleBarrier(zk, path, min_nodes)
Expand Down

0 comments on commit 0b70e94

Please sign in to comment.