Permalink
Browse files

Partial rewrite to use Kazoo instead of zc.zk.

  • Loading branch information...
Vincent Bernat
Vincent Bernat committed Dec 20, 2012
1 parent 95bbad4 commit 5149e8f1b3853d00f8a38a85e3ea6fc5df46bb85
Showing with 79 additions and 54 deletions.
  1. +5 −8 bin/zkfarmer
  2. +2 −2 example/export.py
  3. +1 −2 requirements.txt
  4. +62 −32 zkfarmer/watcher.py
  5. +9 −10 zkfarmer/zkfarmer.py
View
@@ -14,12 +14,11 @@ from zkfarmer.conf import Conf
from zkfarmer.utils import create_filter, dict_filter, ColorizingStreamHandler
from zkfarmer import ZkFarmer
import zookeeper
import logging
from kazoo.client import KazooClient
import logging
def main():
import zc.zk
import argparse
from signal import signal, SIGTERM, SIGINT
@@ -131,13 +130,10 @@ def main():
if args.verbose:
level = logging.DEBUG
zookeeper.set_debug_level(zookeeper.LOG_LEVEL_DEBUG)
elif args.quiet:
level = logging.WARN
zookeeper.set_debug_level(zookeeper.LOG_LEVEL_WARN)
else:
level = logging.WARN
zookeeper.set_debug_level(zookeeper.LOG_LEVEL_WARN)
# Setup a nice logging output
logger = logging.getLogger()
@@ -160,10 +156,11 @@ def main():
parser.error(e)
exit(1)
zkconn = zc.zk.ZooKeeper(args.host, wait=True)
zkconn = KazooClient(args.host, max_retries=-1)
zkconn.start()
def sighandler(sig, frame):
zkconn.close()
zkconn.stop()
exit()
signal(SIGTERM, sighandler)
View
@@ -2,14 +2,14 @@
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + '/..')
import zc.zk
from zkfarmer.conf import ConfBase
from zkfarmer import ZkFarmer
from kazoo.client import KazooClient
class Farm(ConfBase):
def write(self, nodes):
print nodes
zkconn = zc.zk.ZooKeeper('localhost:2181')
zkconn = KazooClient('localhost:2181')
farmer = ZkFarmer(zkconn)
farmer.export('/services/test', Farm())
View
@@ -1,5 +1,4 @@
argparse>=1.2.1
watchdog>=0.5.4
zc.zk>=0.7.0
zkpython>=0.4
PyYAML>=3.10
kazoo>=0.8
View
@@ -7,19 +7,29 @@
import logging
import threading
import time
from socket import socket, gethostname, AF_INET, SOCK_DGRAM
from zookeeper import EPHEMERAL, NoNodeException, ConnectionLossException
import zc.zk
from watchdog.observers import Observer
from .utils import serialize, unserialize
from kazoo.exceptions import NoNodeError, NodeExistsError, ZookeeperError
from kazoo.client import KazooState, OPEN_ACL_UNSAFE
class ZkFarmWatcher(object):
def __init__(self):
def __init__(self, zkconn):
self.cv = threading.Condition()
self.handled = False
zkconn.add_listener(self.zkchange)
def zkchange(self, state):
if state == KazooState.CONNECTED:
logging.info("Handle reconnection to ZooKeeper")
try:
self.notify()
except RuntimeError:
# Not blocked
pass
def wait(self):
while True:
@@ -35,32 +45,32 @@ def notify(self):
class ZkFarmExporter(ZkFarmWatcher):
def __init__(self, zkconn, root_node_path, conf, updated_handler=None, filter_handler=None):
super(ZkFarmExporter, self).__init__()
super(ZkFarmExporter, self).__init__(zkconn)
self.watched_paths = {}
while True:
with self.cv:
try:
node_names = zkconn.get_children(root_node_path, self.get_watcher(root_node_path))
except NoNodeException:
zkconn.create_recursive(root_node_path, '', zc.zk.OPEN_ACL_UNSAFE)
continue
except ConnectionLossException:
self.wait()
node_names = zkconn.retry(zkconn.get_children, root_node_path,
watch=self.get_watcher(root_node_path))
except NoNodeError:
zkconn.retry(zkconn.ensure_path, root_node_path, acl=OPEN_ACL_UNSAFE)
continue
new_conf = {}
for name in node_names:
subnode_path = '%s/%s' % (root_node_path, name)
info = unserialize(zkconn.get(subnode_path, self.get_watcher(subnode_path))[0])
info = unserialize(zkconn.retry(zkconn.get, subnode_path,
watch=self.get_watcher(subnode_path))[0])
if not filter_handler or filter_handler(info):
new_conf[name] = info
conf.write(new_conf)
if updated_handler:
updated_handler()
self.wait()
def watcher(self, handle, type, state, path):
def watcher(self, watched):
with self.cv:
path = watched.path
if path in self.watched_paths:
del self.watched_paths[path]
self.notify()
@@ -73,7 +83,7 @@ def get_watcher(self, path):
class ZkFarmJoiner(ZkFarmWatcher):
def __init__(self, zkconn, root_node_path, conf):
super(ZkFarmJoiner, self).__init__()
super(ZkFarmJoiner, self).__init__(zkconn)
self.update_remote_timer = None
self.update_local_timer = None
@@ -86,38 +96,58 @@ def __init__(self, zkconn, root_node_path, conf):
info['hostname'] = gethostname()
conf.write(info)
zkconn.create(self.node_path, serialize(conf.read()), zc.zk.OPEN_ACL_UNSAFE, EPHEMERAL)
zkconn.retry(zkconn.create,
self.node_path, serialize(conf.read()),
acl=OPEN_ACL_UNSAFE, ephemeral=True)
observer = Observer()
observer.schedule(self, path=conf.file_path, recursive=True)
observer.start()
zkconn.get(self.node_path, self.node_watcher)
zkconn.retry(zkconn.get, self.node_path, self.node_watcher)
while True:
with self.cv:
self.wait()
try:
zkconn.retry(zkconn.create,
self.node_path, serialize(conf.read()),
acl=OPEN_ACL_UNSAFE, ephemeral=True)
zkconn.retry(zkconn.get, self.node_path, self.node_watcher)
logging.info("register again %s" % self.node_path)
self.notify()
except NodeExistsError:
pass
def dispatch(self, event):
with self.cv:
try:
current_conf = unserialize(self.zkconn.get(self.node_path)[0])
new_conf = self.conf.read()
if current_conf != new_conf:
logging.info('Local conf changed')
self.zkconn.set(self.node_path, serialize(new_conf))
except ConnectionLossException:
pass
self.notify()
while True:
with self.cv:
try:
current_conf = unserialize(self.zkconn.retry(self.zkconn.get, self.node_path)[0])
new_conf = self.conf.read()
if current_conf != new_conf:
logging.info('Local conf changed')
self.zkconn.retry(self.zkconn.set, self.node_path, serialize(new_conf))
except ZookeeperError, e:
logging.exception("A Zookeeper error happended when dispatching local changes")
time.sleep(5)
continue
self.notify()
break
def node_watcher(self, handle, type, state, path):
def node_watcher(self, watched):
with self.cv:
path = watched.path
current_conf = self.conf.read()
new_conf = unserialize(self.zkconn.get(self.node_path, self.node_watcher)[0])
if current_conf != new_conf:
logging.info('Remote conf changed')
self.conf.write(new_conf)
self.notify()
try:
new_conf = unserialize(self.zkconn.retry(self.zkconn.get, self.node_path,
self.node_watcher)[0])
if current_conf != new_conf:
logging.info('Remote conf changed')
self.conf.write(new_conf)
self.notify()
except NoNodeError:
logging.warn("not able to watch for node %s: not exist anymore" % self.node_path)
def myip(self):
# Try to find default IP
View
@@ -5,12 +5,11 @@
# For the full copyright and license information, please view the LICENSE
# file that was distributed with this source code.
from zookeeper import BadVersionException, NoNodeException
import zc.zk
from .utils import serialize, unserialize, dict_set_path, dict_filter, create_filter
from .watcher import ZkFarmJoiner, ZkFarmExporter
from kazoo.client import OPEN_ACL_UNSAFE
from kazoo.exceptions import NoNodeError, BadVersionError
class ZkFarmer(object):
STATUS_OK = 0
@@ -23,7 +22,7 @@ def __init__(self, zkconn):
def join(self, zknode, conf):
# Create farms ZkNode if doesn't already exists
self.zkconn.create_recursive(zknode, '', zc.zk.OPEN_ACL_UNSAFE)
self.zkconn.retry(self.zkconn.ensure_path, zknode, acl=OPEN_ACL_UNSAFE)
# If we are going to enlarged the farm max seen size, store it
current_size = len(self.list(zknode)) + 1
if current_size > self.get(zknode, 'size'):
@@ -36,14 +35,14 @@ def export(self, zknode, conf, updated_handler=None, filters=None):
def list(self, zknode):
try:
return self.zkconn.get_children(zknode)
except NoNodeException:
return self.zkconn.retry(self.zkconn.get_children, zknode)
except NoNodeError:
return []
def get(self, zknode, field_or_fields=None):
try:
data = self.zkconn.get(zknode)[0]
except NoNodeException:
data = self.zkconn.retry(self.zkconn.get, zknode)[0]
except NoNodeError:
return {'size': 0}
return dict_filter(unserialize(data), field_or_fields)
@@ -54,9 +53,9 @@ def set(self, zknode, field, value):
info = unserialize(data[0])
dict_set_path(info, field, value)
try:
self.zkconn.set(zknode, serialize(info), data[1]['version'])
self.zkconn.retry(self.zkconn.set, zknode, serialize(info), data[1]['version'])
break
except BadVersionException:
except BadVersionError:
# remove value changed since I get it, retry with fresh value
retry = retry - 1
pass

0 comments on commit 5149e8f

Please sign in to comment.