Permalink
Browse files

joiner: add ability to execute a command on remote change

This is the same ability offered by the exporter. Since the joiner is
both an importer and an exporter, it makes sense for the joiner to also
be able to execute commands on configuration change.
  • Loading branch information...
vincentbernat committed Aug 2, 2014
1 parent 9138713 commit b3e67b53ec23b6214015d1f655446c71467df73a
Showing with 53 additions and 3 deletions.
  1. +2 −0 README.md
  2. +6 −1 bin/zkfarmer
  3. +32 −0 tests/test_joiner.py
  4. +10 −0 zkfarmer/watcher.py
  5. +3 −2 zkfarmer/zkfarmer.py
View
@@ -62,6 +62,8 @@ Usage for the `zkfarmer join` command:
-h, --help show this help message and exit
-f {json,yaml,php,dir}, --format {json,yaml,php,dir}
set the configuration format
--changed-cmd CMD a command to be executed each time the configuration
change
-c, --common use a common zookeeper node instead of a dedicated node
Syncing Farm Configuration
View
@@ -49,6 +49,8 @@ def main():
subparser.add_argument('conf', help='Path to the node configuration')
subparser.add_argument('-f', '--format', dest='format', choices=['json', 'yaml', 'php', 'dir'],
help='set the configuration format')
subparser.add_argument('--changed-cmd', dest='changed_cmd', metavar='CMD',
help='a command to be executed each time the configuration change')
subparser.add_argument('-c', '--common', dest='common', action='store_true',
help='use a common zookeeper node instead of a dedicated node')
@@ -201,7 +203,10 @@ def main():
farmer.export(args.zknode, conf, updated_handler, args.filters)
elif args.command == 'join':
farmer.join(args.zknode, conf, args.common)
def updated_handler():
if args.changed_cmd:
os.system(args.changed_cmd)
farmer.join(args.zknode, conf, args.common, updated_handler)
elif args.command == 'import':
farmer.importer(args.zknode, conf, args.common)
View
@@ -273,6 +273,21 @@ def test_zookeeper_modification(self):
self.conf.write.assert_called_once_with({"enabled": "0",
"hostname": self.NAME})
def test_updated_handler_called(self):
"""Test the appropriate handler is called on modification"""
self.conf.read.return_value = {"enabled": "1",
"hostname": self.NAME}
handler = Mock()
z = ZkFarmJoiner(self.client, "/services/db", self.conf,
updated_handler=handler)
z.loop(3, timeout=self.TIMEOUT)
handler.reset_mock()
self.client.set("/services/db/%s" % self.IP,
json.dumps({"enabled": "0",
"hostname": self.NAME}))
z.loop(2, timeout=self.TIMEOUT)
handler.assert_called_once_with()
def test_no_write_when_no_modification(self):
"""Check we don't write modification if not needed"""
self.conf.read.return_value = {"enabled": "1",
@@ -288,6 +303,23 @@ def test_no_write_when_no_modification(self):
z.loop(2, timeout=self.TIMEOUT)
self.assertFalse(self.conf.write.called)
def test_no_update_handler_when_no_modification(self):
"""Check we don't call handler if not needed"""
self.conf.read.return_value = {"enabled": "1",
"hostname": self.NAME}
handler = Mock()
z = ZkFarmJoiner(self.client, "/services/db", self.conf,
updated_handler=handler)
z.loop(3, timeout=self.TIMEOUT)
handler.reset_mock()
self.conf.read.return_value = {"enabled": "0",
"hostname": self.NAME}
self.client.set("/services/db/%s" % self.IP,
json.dumps({"enabled": "0",
"hostname": self.NAME}))
z.loop(2, timeout=self.TIMEOUT)
self.assertFalse(handler.called)
def test_disconnect_and_remote_modification(self):
"""Test we handle disconnect and remote modification after reconnect"""
z = self.test_disconnect()
View
@@ -289,6 +289,12 @@ def dispatch(self, event):
class ZkFarmJoiner(ZkFarmImporter):
def __init__(self, zkconn, root_node_path, conf, common=False,
updated_handler=None):
self.updated_handler = updated_handler
super(ZkFarmJoiner, self).__init__(zkconn, root_node_path,
conf, common)
def watch_node(self, what):
self.event("znode modified")
@@ -299,6 +305,8 @@ def exec_initial_setup(self):
if not self.common:
info['hostname'] = gethostname()
self.conf.write(info)
if self.updated_handler:
self.updated_handler()
super(ZkFarmJoiner, self).exec_initial_setup()
@@ -331,5 +339,7 @@ def exec_znode_modified_from_idle(self):
logger.debug('Previous conf: %r' % current_conf)
logger.debug('New conf: %r' % new_conf)
self.conf.write(new_conf)
if self.updated_handler:
self.updated_handler()
except NoNodeError:
logger.warn("not able to watch for node %s: not exist anymore" % self.node_path)
View
@@ -20,7 +20,7 @@ class ZkFarmer(object):
def __init__(self, zkconn):
self.zkconn = zkconn
def join(self, zknode, conf, common=False):
def join(self, zknode, conf, common=False, updated_handler=None):
# Create farms ZkNode if doesn't already exists
self.zkconn.retry(self.zkconn.ensure_path, zknode, acl=OPEN_ACL_UNSAFE)
# If we are going to enlarged the farm max seen size, store it
@@ -29,7 +29,8 @@ def join(self, zknode, conf, common=False):
if current_size > self.get(zknode, 'size'):
self.set(zknode, 'size', current_size)
# Join the farm
ZkFarmJoiner(self.zkconn, zknode, conf, common).loop(ignore_unknown_transitions=True)
ZkFarmJoiner(self.zkconn, zknode, conf, common,
updated_handler).loop(ignore_unknown_transitions=True)
def importer(self, zknode, conf, common=False):
ZkFarmImporter(self.zkconn, zknode, conf, common).loop(ignore_unknown_transitions=True)

0 comments on commit b3e67b5

Please sign in to comment.