From 824753a591d5317c4c1e4d4f8e8c6754b563be50 Mon Sep 17 00:00:00 2001 From: Brady Catherman Date: Sat, 17 Jul 2010 08:29:54 +0000 Subject: [PATCH] Initial commit of Twitcher for the public repository. Twitcher is a process for running scripts when znodes change in ZooKeeper. It was initially written by Brady Catherman for use inside of Twitter and is being open sourced with Twitter's permission under the Apache 2.0 license. For more information please read the README in the top level directory. --- LICENSE | 202 +++++++++++++++++ README | 186 ++++++++++++++++ bin/twitcher | 155 +++++++++++++ config/example.twc | 10 + init-twitcher | 90 ++++++++ makerpm.sh | 23 ++ twitcher.spec | 60 ++++++ twitcher/__init__.py | 0 twitcher/config.py | 191 +++++++++++++++++ twitcher/core.py | 489 ++++++++++++++++++++++++++++++++++++++++++ twitcher/inotify.py | 181 ++++++++++++++++ twitcher/twitcher.py | 93 ++++++++ twitcher/zkwrapper.py | 215 +++++++++++++++++++ 13 files changed, 1895 insertions(+) create mode 100644 LICENSE create mode 100644 README create mode 100755 bin/twitcher create mode 100644 config/example.twc create mode 100755 init-twitcher create mode 100755 makerpm.sh create mode 100644 twitcher.spec create mode 100644 twitcher/__init__.py create mode 100644 twitcher/config.py create mode 100644 twitcher/core.py create mode 100644 twitcher/inotify.py create mode 100644 twitcher/twitcher.py create mode 100644 twitcher/zkwrapper.py diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..d645695 --- /dev/null +++ b/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/README b/README new file mode 100644 index 0000000..11fe483 --- /dev/null +++ b/README @@ -0,0 +1,186 @@ +================ +=== Twitcher === +================ + +1. License +2. Overview +3. Installation +4. Configuration Language +4.1 Examples +4.2 Things to Avoid +5. Known Issues +6. Future Features +7. Change Log + +1. License +========== + +This software is licensed under the Apache 2.0 license. Please see LICENSE +in the top level of this git repository for more information. + +2. Overview +=========== + +Twitcher is a minimal tool used to run scripts when a znode in ZooKeeper +changes. The intent is to allow various system tasks to take place on a +watched model rather than via polling. + +The best description of Twitcher is to highlight an example that it solves +very well. Lets assume we are facing a problem where we need a single git +repository to be checked out and up to date on 100 machines in a cluster. The +typical solution to this is to add a cron job that performs a git pull every +minute. This can be problematic as your git server has to endure the load of +100 servers performing a pull every minute. You can configure Twitcher on all +machines to watch /git/head on your ZooKeeper instance. When that znode +changes Twitcher can run git pull. Add a post-commit script in your main +git repository that pokes that file and you will be able to fetch updates +within few seconds of a commit on all machines while not requiring constant +fetching when no changes are being made. + +3. Installation +=============== + +The default installation of Twitcher places the twitcher binary (bin/twitcher) +in /usr/sbin and the twitcher module (twicher/*) in the site-packages +directory which is often /usr/lib/python2.6/sites-packages/twitcher. + +The configs are read recursively from /etc/twitcher while twitcher is running. + +Note that Twitcher uses inofity to update when files change so it can +automatically reload them. This is used to allow quick adding and removing +of configs without restarting the server binary. If a config fails to parse +then it will not be updated in the running binary and a log message will be +written. + +By default Twitcher uses syslog under daemon as the default logging method. + +4. Configuration Language +========================= + +The configuration language for Twitcher is actually just python. Each +configuration must register a znode, and an action for each watch it wants. +This is done via a RegisterWatch function which takes several arguments. + +RegisterWatch(): Creates a watch that will run a script when a Zookeeper node + is updated. + znode: This is the znode in ZooKeeper that is to be watched. + action: This is a function or lambda that will perform an action when + the znode is modified. The most common use here is to call Exec() + which is a function documented later. + pipe_stdin: If True then the contents of 'znode' will be piped to the stdin + of the processes running action. Default is True. + run_on_load: If True then the action will be executed when Twitcher starts. + Without this your action may miss updates. + The default is True. + run_mode: This defines how Twitcher will react when 'znode' is updated + while it is running 'action' for a previous update. The optional + modes are: + QUEUE: This will queue the watch until the currently running + 'action' finishes, then run 'action' with the most + recent contents. If several watches are received while + 'action' is running then only the last update will + be executed. + DISCARD: Ignore the update, don't run the script, but + re-register the watch. + PARALLEL: Run the script in parallel for every update received. + The default run mode is QUEUE. + uid: The user id to run the process as (string or int). The default is to + run as root. + gid: The group id to run the process as (string or int). The default is to + run as root. + description: This is a text description of the watch which will be used + for logging. The default is to name watches after the file + they are configured in. + +Exec(): Returns a lambda that will execute a given command when run. + command: If this is a string then the command will be invoked in a + shell interpreter. If its a list then it will be executed + exactly as specified. + +By default the configuration files should be placed in /etc/twitcher and +should use an extension of ".twc". + + +4.1. Examples +============= + +In this example config a node called '/twitcher/example1' will be watched for +updates, and when it changes it will execute +"date >> /tmp/twitcher_example1.out". + +RegisterWatch( + znode='/twitcher/example1', + action=Exec('date >> /tmp/twitcher_example1.out') + ) + +As another example we can run a python function rather than executing a +command. For this example we will execute the function example2() every time +'/twitcher/example2' is updated. + +def example2(): + open('/tmp/twitcher_example2.out', 'w').write('testing') + +RegisterWatch( + znode='/twitcher/example2', + action=example2 + ) + +Keep in mind that the action function is run in a process that has been forked +off from the main Twitcher instance. As such the following example +DOES NOT WORK AS EXPECTED. The output will always be 1 since the increment +happens in the child process which exits after doesnotwork() finishes. + +x = 0 +def doesnotwork(): + global x + x += 1 + open('/tmp/twitcher_badexample.out', 'w').write(x) + +RegisterWatch( + znode='/twitcher/doesnotwork', + action=doesnotwork + ) + +4.2 Things to Avoid +=================== + +Any action run by Twitcher is expected to by idempotent. Since the script may +be executed at any time and can be triggered rapidly it is expected that +each script does basic health and sanity checking before performing expensive +or destructive actions. + +Even if the script is in QUEUE mode there is a possibility that it may be run +twice. If Twitcher is restarted while the script is running it will then +restart the script when Twitcher restarts which leaves two running at the +same time. In order to prevent this your script should use file based sigil +or other method to verify exclusivity. + +5. Known Issues +=============== + +None at the moment. + +6. Future Features +================== + +Timeouts on the child processes: Allow the config to set a timeout after which +the script will be considered failed. + +Failure handling on the child process: Allow the config to specify what should +be done in the case where the script fails (returns non zero). Some examples +include "retry x times" "fail but keep watch" "fail and stop watching" + +7. Change Log +============= + +Release: 1.2, Open source under the Apache 2.0 license. + +2010/7/15: Added uid/gid support + +Release: 1.1 + +2010/6/24: Fixed a bug that allowed child processes to get ignored forever. + +Release 1.0 + +2010/6/19: Initial review. diff --git a/bin/twitcher b/bin/twitcher new file mode 100755 index 0000000..cd6e8d5 --- /dev/null +++ b/bin/twitcher @@ -0,0 +1,155 @@ +#!/usr/bin/python26 + +import fcntl +import logging +import logging.handlers +import syslog +import optparse +import os +import sys +import zookeeper +from twitcher.twitcher import Twitcher + +## this will be the daemon that acts off of zookeeper watches and compiles +## templates into usable config files. Twitcher is UK slang for +## birdwatcher. The US slang (birder) is boring + + +def parse_args(): + """Parses arguments and returns an OptionValues object. + + This is split into a function in order to allow garbage collection of all + the various short term object definitions. + """ + parser = optparse.OptionParser() + parser.add_option('-d', '--debug', action='store_true', dest='debug', + default=None) + parser.add_option('--daemonize', action='store_true', dest='daemonize', + default=None, + help='Daemonize and run in the background.') + parser.add_option('--pidfile', action='store', dest='pidfile', + default=None, + help='Write pid to the given path.') + parser.add_option('--logfile', action='store', dest='logfile', + default=None, + help='Log to a file rather than syslog.') + parser.add_option('-v', '--verbose', action='store_true', dest='verbose', + default=False) + parser.add_option('--log_to_stdout', action='store_true', + dest='log_to_stdout', default=None, + help='Write to stdout rather than syslog.') + parser.add_option('--devel', action='store_true', dest='devel', + default=False) + parser.add_option('--config_path', action='store', dest='config_path', + default=None, + help='Location of the twitcher config files.') + parser.add_option('--zkservers', action='store', dest='zkservers', + default='zookeeper.local.twitter.com:2181', + help='Comma-separated list of host:port pairs.') + (options, args) = parser.parse_args() + parser.destroy() + if args: + print >> sys.stderr, 'Unknown arguments: %s' % ' '.join(args) + parser.get_usage() + sys.exit(1) + + if options.devel: + if options.debug is None: + options.debug = True + if options.log_to_stdout is None: + options.log_to_stdout = True + if options.daemonize is None: + options.daemonize = False + if options.config_path is None: + options.config_path = 'config' + else: + if options.config_path is None: + options.config_path = '/etc/twitcher' + + # Set the logging level to debug/info/warning. + logger = logging.getLogger() + if options.debug: + logger.setLevel(logging.DEBUG) + zookeeper.set_debug_level(zookeeper.LOG_LEVEL_DEBUG) + elif options.verbose: + logger.setLevel(logging.INFO) + zookeeper.set_debug_level(zookeeper.LOG_LEVEL_WARN) + else: + logger.setLevel(logging.WARNING) + zookeeper.set_debug_level(zookeeper.LOG_LEVEL_ERROR) + + # Set the log format and configure where we log too. + if not options.log_to_stdout: + # If we are not logging to stdout we must disable zookeeper logging since + # it will write to stdout regardless which is very annoying. + zookeeper.set_debug_level(0) + formatter = logging.Formatter('%(asctime)s %(process)d %(message)s') + if options.logfile: + handler = logging.FileHandler(options.logfile) + else: + handler = logging.handlers.SysLogHandler( + address='/dev/log', facility=syslog.LOG_DAEMON) + handler.setFormatter(formatter) + logger.addHandler(handler) + else: + formatter = logging.Formatter('%(asctime)s %(filename)s:%(lineno)d: ' + '%(message)s') + stream_handler = logging.StreamHandler() + stream_handler.setFormatter(formatter) + logger.addHandler(stream_handler) + + return options + + +def write_pid(pid_filename): + try: + fdw = open(pid_filename, 'a') + fcntl.lockf(fdw, fcntl.LOCK_NB | fcntl.LOCK_EX) + os.ftruncate(fdw.fileno(), 0) + fdw.write(str(os.getpid())) + fdw.flush() + + # We have to stash the file descriptor away otherwise python will gc + # the object which closes the file and eliminates the lock. + globals()['___pid_file_lock_%s' % fdw.fileno()] = fdw + + # We leave the file descriptor open for a long as the process is running in + # order to maintain the lock on the file. Once the app closes the lock is + # released which allows another instance to start. + # TODO: Figure out why atexit is deleting pids before exiting. + #atexit.register(os.unlink, pid_filename) + except IOError, e: + # This usually means that there is already a running process. + # Get the pid of the other process. + fdr = open(pid_filename, 'r') + pid = fdr.read() + fdr.close() + print >> sys.stderr, 'Unable to obtain lock on %s' % pid_filename + print >> sys.stderr, 'Is another process already running? Perhaps %s' % pid + sys.exit(1) + + +def daemonize(): + logging.info('Daemonizing') + try: + os.chdir('/') + if os.fork() != 0: + os._exit(0) + os.setsid() + if os.fork() != 0: + os._exit(0) + os.umask(0) + + except OSError, e: + logging.error('Unable to daemonize: %s' % e.message()) + + +options = parse_args() + +if options.daemonize: + daemonize() +if options.pidfile: + write_pid(options.pidfile) + +t = Twitcher(options.zkservers.split(','), options.config_path) +t.run() diff --git a/config/example.twc b/config/example.twc new file mode 100644 index 0000000..c4e5740 --- /dev/null +++ b/config/example.twc @@ -0,0 +1,10 @@ +# This command will run the given shell script when /brady/test changes. +RegisterWatch( + znode='/brady/test', + action=Exec('\n'.join([ + 'date > /tmp/twitcher_example.output', + 'echo START >> /tmp/twitcher_example.output', + 'cat >> /tmp/twitcher_example.output', + 'echo END >> /tmp/twitcher_example.output', + 'sleep 5 >> /tmp/twitcher_example.output' + ]))) diff --git a/init-twitcher b/init-twitcher new file mode 100755 index 0000000..2190dc2 --- /dev/null +++ b/init-twitcher @@ -0,0 +1,90 @@ +#!/bin/bash +# +# twitcher This starts and stops twitcher. +# +# chkconfig: 345 56 50 +# description: Twitcher is a zookeeper watch system. +# +# processname: /usr/sbin/twitcher +# pidfile: /var/run/twitcher.pid + +PATH=/sbin:/bin:/usr/bin:/usr/sbin + +# Source function library. +. /etc/init.d/functions + +# Check that we are root ... so non-root users stop here +[ `id -u` = 0 ] || exit 1 + +# Check to see if the binary exists. +[ -f /usr/sbin/twitcher ] || exit 1 + +RETVAL=0 + +prog="twitcher" + +start() { + echo -n $"Starting $prog: " + + unset HOME MAIL USER USERNAME + daemon $prog --pidfile /var/run/twitcher.pid --daemonize --logfile /var/log/twitcher.log + RETVAL=$? + echo + touch /var/lock/subsys/twitcher + return $RETVAL +} + +stop() { + echo -n $"Stopping $prog: " + killproc -p /var/run/twitcher.pid python26 + RETVAL=$? + echo + rm -f /var/lock/subsys/twitcher + return $RETVAL +} + +reload() { + echo -n $"Reloading configuration: " + killproc -p /var/run/twitcher.pid python26 -HUP + RETVAL=$? + echo + return $RETVAL +} + +restart() { + stop + start +} + +condrestart() { + [ -e /var/lock/subsys/twitcher ] && restart + return 0 +} + + +# See how we were called. +case "$1" in + start) + start + ;; + stop) + stop + ;; + status) + status $prog + ;; + restart) + restart + ;; + reload) + reload + ;; + condrestart) + condrestart + ;; + *) + echo $"Usage: $0 {start|stop|status|restart|condrestart|reload}" + RETVAL=1 +esac + +exit $RETVAL diff --git a/makerpm.sh b/makerpm.sh new file mode 100755 index 0000000..296c7d7 --- /dev/null +++ b/makerpm.sh @@ -0,0 +1,23 @@ +#!/bin/bash + +TMPDIR=$(mktemp -d) +trap 'rm -rf $TMPDIR' EXIT + +mkdir -p "$TMPDIR/BUILD" +mkdir -p "$TMPDIR/RPMS" +mkdir -p "$TMPDIR/SOURCES" +mkdir -p "$TMPDIR/SPECS" + +tar -cv --exclude '*.pyc' \ + init-twitcher bin twitcher > "$TMPDIR/SOURCES/twitcher.tar" +cp twitcher.spec "$TMPDIR/SPECS" + +rpmbuild --define "_topdir $TMPDIR" -bb "$TMPDIR/SPECS/twitcher.spec" + +RPMS=$(ls $TMPDIR/RPMS/x86_64) + +mv "$TMPDIR/RPMS/x86_64/"*.rpm /tmp + +echo "Placing the following rpms in /tmp:" +echo "$RPMS" + diff --git a/twitcher.spec b/twitcher.spec new file mode 100644 index 0000000..4af9f98 --- /dev/null +++ b/twitcher.spec @@ -0,0 +1,60 @@ +%define VERSION 1.2 +%define RELEASE 1 + +Name: twitcher +Version: %{VERSION} +Release: %{RELEASE} +Summary: A zookeeper watch daemon. +Group: Tools/Twitcher +License: Internal +Url: http://github.com/liquidgecka/twitcher +Source: twitcher.tar + +BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root + +Requires: python26 +BuildRequires: python26 + +%description +A basic daemon that watches znodes in ZooKeeper and runs scripts on the +system when they change. + +%prep +rm -rf "${RPM_BUILD_DIR}"/twitcher +mkdir -p "${RPM_BUILD_DIR}"/twitcher +tar -C "${RPM_BUILD_DIR}"/twitcher -xvf "${RPM_SOURCE_DIR}"/twitcher.tar + +%install +mkdir -p "$RPM_BUILD_ROOT/usr/sbin" +cp "${RPM_BUILD_DIR}"/twitcher/bin/twitcher "${RPM_BUILD_ROOT}/usr/sbin" + +mkdir -p "$RPM_BUILD_ROOT/etc/init.d" +cp "${RPM_BUILD_DIR}"/twitcher/init-twitcher \ + "${RPM_BUILD_ROOT}/etc/init.d/twitcher" + +mkdir -p "$RPM_BUILD_ROOT/usr/lib/python2.6/site-packages" +cp -rf "${RPM_BUILD_DIR}"/twitcher/twitcher \ + "${RPM_BUILD_ROOT}/usr/lib/python2.6/site-packages" + +mkdir -p "${RPM_BUILD_ROOT}"/etc/twitcher + +# Generate the proper .pyc and .pyo files. +/usr/lib/rpm/brp-python-bytecompile /usr/bin/python26 + +%clean +rm -rf "${RPM_BUILD_ROOT}" + +%post +/sbin/chkconfig twitcher on +/etc/init.d/twitcher start + +%preun +/sbin/chkconfig twitcher off +/etc/init.d/twitcher stop + +%files +%defattr(-,root,root,-) +/etc/twitcher +/etc/init.d/twitcher +/usr/sbin/twitcher +/usr/lib/python2.6/site-packages/twitcher diff --git a/twitcher/__init__.py b/twitcher/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/twitcher/config.py b/twitcher/config.py new file mode 100644 index 0000000..491b48d --- /dev/null +++ b/twitcher/config.py @@ -0,0 +1,191 @@ +#!/usr/bin/python26 + +"""This file contains the classes needed to manage twitcher configs. + +Author: Brady Catherman (brady@twitter.com) +""" + +import logging +import os +import types + +# twitcher module libs +import core +import inotify +import zkwrapper + + +class _NamespaceConfig(object): + """This class is imported into the exec namespace as 'twitcher'. + + This class provides the configuration files access in and out of twitcher. + It is intended that this class will be used as thought it was a module + inside of the twitcher config files namespaces. + + Some examples: + in the config: twitcher.RegisterWatch(...) + in this class: _NamespaceConfig.RegisterWatch(...) + + This allows modules to configure themselves using full python without also + being able to directly access twitcher internals. + + Args: + config_file: The file this object should parse. + """ + def __init__(self, config_file): + self._configurations = [] + self._config_file = config_file + + def RegisterWatch(self, znode=None, action=None, pipe_stdin=None, + run_on_load=None, run_mode=None, description=None, + uid=None, gid=None): + """Registers a watch and action on a given znode. + + This is the main function configuration files are expected to work with. + A configuration file should call this script in order to register a + basic znode watch and set the function that should be called when + the node is updated. + + Args: + znode: The node that will be watched for updates. + action: The function that should be called when the node updates. + pipe_stdin: Pipe the contents of the znode to stdin when the + 'action' function run. + run_on_load: Should the action function be run when the config file + is loaded or reloaded. + run_mode: Defines what should happen if the znode updates while the + script is still running. The options are: + QUEUE: Queue the update and run the action once the current action + finishes. Note that if two watchs trigger while the action + is currently running it will still only be run once. + PARALLEL: Run the action every time it triggers regardless if + an action is already running. + DISCARD: Ignore any watches that fire while the action script + is already running. + description: The string description of this object. This will be used + for logging. + uid: The user id to run the process as. + gid: The group id to run the process as. + + Returns: + Nothing. + """ + assert type(znode) == types.StringType, ( + 'RegisterWatch: znode must be a string.') + assert (type(action) == types.FunctionType or + type(action) == types.UnboundMethodType or + type(action) == types.LambdaType), ( + 'RegisterWatch: action must be a function, method or lambda.') + assert pipe_stdin is None or type(pipe_stdin) == types.BoolType, ( + 'RegisterWatch: pipe_stdin must be one of True or False.') + assert run_on_load is None or type(run_on_load) == types.BoolType, ( + 'RegisterWatch: run_on_load must be one of True of False.') + assert (run_mode is None or run_mode is core.QUEUE or + run_mode is core.PARALLEL or run_mode is core.DISCARD), ( + 'RegisterWatch: run_mode is not one of QUEUE, PARALLEL or DISCARD.') + assert description is None or type(description) == types.StringType, ( + 'RegisterWatch: Description must be a string.') + assert (uid is None or + type(uid) == types.IntType or + type(uid) == types.StringType), ( + 'RegisterWatch: uid must be a number of a string.') + assert (gid is None or + type(gid) == types.IntType or + type(gid) == types.StringType), ( + 'RegisterWatch: gid must be a number of a string.') + + if description is None: + description = '%s-%s' % (self._config_file, len(self._configurations) + 1) + + kwargs = {} + kwargs['description'] = description + if pipe_stdin is not None: + kwargs['pipe_stdin'] = pipe_stdin + if run_on_load is not None: + kwargs['run_on_load'] = run_on_load + if run_mode is not None: + kwargs['run_mode'] = run_mode + if uid is not None: + kwargs['uid'] = uid + if gid is not None: + kwargs['gid'] = gid + + config = core.TwitcherObject(znode, action, **kwargs) + self._configurations.append(config) + + def Exec(self, command): + """Returns a lambda that will execute the given command. + + This function is a simple tool intended to be used in configurations + files. This will return a lambda so the user doesn't have to write + a function that executes anything. + + Args: + command: The command to run. If this is a string then the command will + be run in a bash instance. If its a list then it will be + executed directly. + + Returns: + A Lambda that will run the given command. + """ + if isinstance(command, str): + command = ['/bin/sh', '-c', command] + return (lambda: os.execlp(command[0], *command)) + + def get_configurations(self): + """Returns a list of all configurations registered. + + This returns the list of all configurations created while parsing the + configuration file. A configuration is functionally a + twitcher.core.TwitcherObject which watches a znode and runs a script when + it changes. A single file can contain many configurations. + + Returns: + A list of TwitcherObjects. + """ + return self._configurations + + +class ConfigFile(inotify.WatchClass): + """Manages a single twitcher config file. + + This is a basic wrapper around a specific configuration file which allows + automatic reloaded when it changes on disk (via inotify) as well as status + reporting and other useful functionality. + + Args: + filename: The filename that this config should manage. + """ + def __init__(self, filename): + self._filename = filename + self._config_objects = [] + + def __del__(self): + """Called when this object is garbage collected.""" + if logging: + logging.warning('Unloading configs from: %s', self._filename) + + def reload(self): + """Loads or reloads the config file from disk.""" + logging.warning('Loading configuration from %s', self._filename) + namespace_config = _NamespaceConfig(self._filename) + plugins = {} + exec_globals = { + 'Exec': namespace_config.Exec, + 'RegisterWatch': namespace_config.RegisterWatch, + } + try: + execfile(self._filename, exec_globals, {}) + objects = namespace_config.get_configurations() + for o in objects: + o.init() + except Exception, e: + logging.error('Exception processing %s: %s' % (self._filename, e)) + return + + self._config_objects = objects + logging.warning('Successfully loaded configs from %s', self._filename) + + def get_configurations(self): + """Returns all configurations (TwitcherObjects) from this file.""" + return self._config_objects diff --git a/twitcher/core.py b/twitcher/core.py new file mode 100644 index 0000000..5d8f227 --- /dev/null +++ b/twitcher/core.py @@ -0,0 +1,489 @@ +#!/usr/bin/python26 + +"""Core functionality for Twitcher. + +This module contains the core functionality for twtcher. The main object +here is the TwitcherObject which acts as a simple wrapper for each watch. + +Author: Brady Catherman (brady@twitter.com) +""" + +import grp +import logging +import os +import resource +import threading +import types +import pwd +import sys +import time +import zookeeper + +# Twitcher object +import zkwrapper + + +# The default ZKWrapper object to use when registering watches. +default_zkwrapper = None + +# The default file descriptor to send '\0' to if we span a new process. +# This is used to allow select to restart which will pick up the new +# stdin that needs written too. +default_ping_fd = None + +def set_default_zkwrapper(obj): + """Sets the value of default_zkwrapper.""" + global default_zkwrapper + default_zkwrapper = obj + + +def set_default_ping_fd(fd): + """Sets the value of default_ping_fd.""" + global default_ping_fd + default_ping_fd = fd + + +QUEUE = 1 +PARALLEL = 2 +DISCARD = 3 + + +class UnknownUserError(Exception): + pass + + +class UnknownGroupError(Exception): + pass + + +class MinimalSubprocess(object): + """Wraps much of the functionality from subprocess. + + This is a basic wrapper of subprocess except that it allows us to not + exec. This makes twitcher very unix specific as it uses several unix only + calls. + + The basic idea here is that we fork and then run a python function that can + be anything from an exec to a custom block of code from the config file. + + We fork in order to protect the main twitcher process and so all basic + actions look exactly the same. + + Args: + desc: The string description of this subprocess. + data: The data we should write to stdin of the forked process. + """ + def __init__(self, desc, data): + logging.debug('Creating MinimalSubprocess (%s): %s', self, desc) + self.stdin = None + self.pid = -1 + self.desc = desc + self.data = data + + def __del__(self): + """Verifies that the file descripts all get closed properly.""" + logging.debug('Reaping MinimalSubprocess "%s" (%s)', self.desc, self) + if self.stdin is not None: + try: + os.close(self.stdin) + except OSError: + pass + + def poll(self): + """Tests to see if the process has exited. + + This function mimics subprocess.Popen.poll(). It returns None if the + process has not exited yet and if it has it will set returncode and + return the vaule. + + Once this call has returned a value other than None further calls will + result in a OSError being thrown. + + Returns: + None or a return code. + """ + logging.debug('Polling for process "%s" (%s)', self.desc, self.pid) + r = os.waitpid(self.pid, os.WNOHANG) + if r == (0, 0): + return None + self.returncode = r[1] + return r[1] + + def write_buffer(self): + """Attempts to write data to stdin. + + This will attempt to write the data passed into the constructor to + stdin. If the data is written completely then stdin will be closed, + otherwise it will retain the remainder of data that has not been written + for further calls. + + Throws: + OSError: If called after a non None value is returned. + + Returns: + Nothing. + """ + if self.stdin is not None: + written = os.write(self.stdin, self.data) + self.data = self.data[written:] + if not self.data: + self.data = None + os.close(self.stdin) + self.stdin = None + + def _child_exec(self, stdin_fd, func, uid=None, gid=None): + """Called to setup the child after the fork. + + This function handles all client operations post fork. The main + functionality presented here is to close all file descriptors other than + stdin/stdout/stderr, and to configure those three sockets to have a pipe + back to the parent. + + Args: + stdin_fd: The file descriptor created via pipe() that should become + our stdin. + func: The function we should run once setup properly. + uid: The userid (int) to switch to after forking. + gid: The groupid (int) to switch to after forking. + + Throws: + OSError: Any error during the dup/close cycle. + + Returns: + Nothing. + """ + os.dup2(stdin_fd, 0) + f = os.open('/dev/null', os.O_WRONLY) + os.dup2(f, 1) + os.dup2(f, 2) + + # Close all open file descriptors besides stdin/stdout/stderr + maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1] + if (maxfd == resource.RLIM_INFINITY): + maxfd = MAXFD + for fd in xrange(3, maxfd): + try: + os.close(fd) + except OSError: + pass + + # Switch the userid if needed. + if gid: + os.setgid(gid) + if uid: + os.setuid(uid) + + # Run our function. + func() + + def fork_exec(self, func, uid=None, gid=None): + """Calls fork and mimics exec. + + This function forks and mimics the exec call by calling the function passed + into the constructor for this object. This function must be called before + all other functions in this object are called. + + Args: + func: The function that should be called post fork. + uid: The userid (int or string) to switch to after forking. + gid: The groupid (int or string) to switch to after forking. + + Throws: + OSError: Any kind of error during the fork cycle. + + Returns: + Nothing. + """ + try: + # Convert string based user names and group names into integers. + if type(uid) == types.StringType: + ud = pwd.getpwnam(uid) + if not ud: + raise UnknownUserError() + uid = ud.pw_uid + if type(gid) == types.StringType: + groups = pwd.getpwall() + gd = grp.getgrnam(gid) + if not gd: + raise UnknownGroupError() + gid = gd.gr_gid + + stdin = os.pipe() + pid = os.fork() + if pid < 0: + raise OSError('Unknown problem with fork()') + elif pid == 0: + # Child. + r = self._child_exec(stdin[0], func, uid=uid, gid=gid) + if type(r) == int: + os._exit(r) + elif r is None: + os._exit(0) + os._exit(1) + else: + # Parent + self.pid = pid + self.stdin = stdin[1] + os.close(stdin[0]) + except OSError, e: + logging.error('OSError while forking for "%s": %s', self.desc, e) + raise + + +class TwitcherObject(object): + """Manages a single Twitcher configuration + + This function manages a single Twitcher configuration. There may be + more than one configuration per file so this is not one to one with + files. Instead this is a basic node: script pairing. + + This manages the zookeeper watch and when triggered will execute the + intended action and watch that cycle. Once finished it will reregister + watches and continue working. + + Args: + run_func: The function that should be run in the subprocess once a watch + or initial load (see run_on_load) has initiated the process. + pipe_stdin: Retrieve the contents of the node and pipe them to stdin + on the subprocess. If False then nothing will be piped to + stdin. + run_on_load: Should this script be run when this config is loaded. + If this is True (default) then the script will be started + when twitcher loads and when watches fire. + run_mode: This defines how Twitcher should react when a watch is updated + while the script is currently running. This should be one of + the following: + core.QUEUE: Queue then run once the current running script + finishes. Note that this will run one instance after + the current script finishes regardless of how many + times the watch may have fired in the mean time. + core.PARALLEL: Run scripts in parallel. + core.DISCARD: Discard requests received while the script is + currently running. + uid: The user id the process should run as. + gid: The group id the process should run as. + description: The basic description of this command (ex: command line) + """ + def __init__(self, path, run_func, + pipe_stdin=True, run_on_load=True, + run_mode=QUEUE, uid=None, gid=None, + description='generic object'): + self._path = path + self._run_func = run_func + self._pipe_stdin = pipe_stdin + self._run_on_load = run_on_load + self._run_mode = run_mode + self._processes = [] + self._description = description + self._uid = uid + self._gid = gid + self._unhandled_watch = None + self._lock = threading.Lock() + + def init(self): + """Called to initialize this object. + + This is split out from the __init__ call to allow the msot likely error + conditions to be captured be twitcher rather than the configuration + object. + + Returns: + Nothing. + """ + logging.debug('Initializing %s', self._description) + self._register_watch(handler=self._run_on_load) + + def get_fds(self): + """Returns a list of all file descriptors of subprocesses. + + This is used for select() calls. It should return a list of all child + process file descriptors as a tuple ([r1, r2], [w1, w1]). + + Returns: + A 2 element tuple containing a list of read file descriptors, and + write file descriptors. + """ + r_fds = [] + w_fds = [] + self._lock.acquire() + for p in self._processes: + if p.stdin is not None: + w_fds.append(p.stdin) + self._lock.release() + return (r_fds, w_fds) + + def sigchld(self): + """Called when SIGCHLD is received. + + This should call poll on all subprocesses in order to find out if they + have exited. This is used to clear out child processes that have completed. + + This function shouldn't block. + + Returns: + Nothing. + """ + removed = [] + self._lock.acquire() + for p in self._processes: + r = p.poll() + if r is not None: + logging.warning('Process "%s" (%s) exited with code %s', + p.desc, p.pid, r) + self._processes.remove(p) + removed.append(p) + self._lock.release() + if removed: + self._post_exec() + + def select(self, r, w): + """Called when a socket is read/writable. + + This should check to see if any of the sockets listed in r and w are + attached to any subprocess so we can read/write to them. + + This function shouldn't block. + + Args: + r: A list of file descriptors that can be read from. + w: A list of file descriptors that can be written too. + + Returns: + Nothing. + """ + self._lock.acquire() + for p in self._processes: + if p.stdin in w: + p.write_buffer() + self._lock.release() + + def _register_watch(self, handler=True): + """Called to actually register a watch (and perform a get if needed.) + + This function will wrap the zookeeper calls in order to make it easy to + register a watch and fetch data regardless of the options we have + been configured with. + + Args: + handler: Optional. If true (default) then self._handler will be called + when new data is received, otherwise nothing will be called. + + Returns: + Nothing. + """ + if handler is True: + h = self._handler + else: + h = None + default_zkwrapper.aget(self._path, handler=h, watcher=self._watch) + + def _exec(self, data): + """Starts the registered fuction as a second process + + This will fork and start the registered function on a second process. + This shouldn't block on anything. It mearly starts then returns. + + Args: + data: The data that should be written to stdin on the sub process. + + Returns: + Nothing. + """ + logging.warning('Executing process: %s' % self._description) + try: + p = MinimalSubprocess(self._description, data) + p.fork_exec(self._run_func, self._uid, self._gid) + self._lock.acquire() + self._processes.append(p) + self._lock.release() + if default_ping_fd is not None: + os.write(default_ping_fd, '\0') + except UnknownUserError: + logging.error('%s: Unable to find user %s', self._description, + self._uid) + except UnknownGroupError: + logging.error('%s: Unable to find group %s', self._description, + self._gid) + + def _post_exec(self): + """Run once the script has finished executing. + + This will clean up after a script run. It should reexecute the script if + QUEUE mode is selected and we received a watch update while running the + script. If needed it will also reregister the watch so that we continue + getting update notifications. + + This function may call _watch() but shouldn't be expected to block. + + Returns: + Nothing. + """ + if self._unhandled_watch: + if self._run_mode == DISCARD: + # If we are in discard mode we simply discard the data we will + # get back from zookeeper. + self._register_watch(handler=False) + elif self._run_mode == QUEUE: + # Re run the watch that we missed as though we just received it. We do + # this by passing the arguments back into the mix. + logging.debug('Processing queued watches on %s', + self._unhandled_watch[1]) + args = self._unhandled_watch + self._unhandled_watch = None + self._watch(*args) + else: + # We shouldn't ever get here. + # FIXME(brady): + logging.error('_run_mode is invalid in %s' % self) + + def _watch(self, zh, path): + """Called when a zookeeper node we are watching updates. + + This function is called by zookeeper when a node we have registered a watch + on has changed in some way. Depending on the configuration this function + may call _exec() to start the subprocess or may perform an async get on + zookeeper in order to get the contents of the node. Either way it shouldn't + block. + + Args: + zh: The zookeeper handle that created the watch. + event: The event that triggered the watch. + state: The connection state. + path: The znode that triggered this watch. + + Returns: + Nothing. + """ + logging.info('Received watch notification for %s', path) + if self._processes and self._run_mode != PARALLEL: + logging.warning('Postponing processing of "%s" ' + '(a scipt is already running).', self._description) + self._unhandled_watch = (zh, path) + return + self._register_watch() + if not self._pipe_stdin: + # We don't need to wait for the data to arrive to execute in this mode + self._exec('') + + def _handler(self, zh, rc, data, path): + """Called with the data after an aget() request. + + This function is called by the ZKWrapper object once the data for a + znode has been fetched. In all cases we should exec the script unless + something has gone wrong. + + Args: + zh: The ZKWrapper object that is calling us. + rc: The return code from zookeeper. + data: The contents of the znode. + path: The znode that updated. + + Returns: + Nothing. + """ + if rc == zookeeper.OK: + self._exec(data) + else: + # Failure! + # FIXME(brady) + return diff --git a/twitcher/inotify.py b/twitcher/inotify.py new file mode 100644 index 0000000..7a9b8a6 --- /dev/null +++ b/twitcher/inotify.py @@ -0,0 +1,181 @@ +#!/usr/bin/python26 + +"""Watches a list of directories for file updates. + +The classes in this module will watch a list of subdirectories for file +updates. A class is passed in at object initialization time and is used to +create objects as new files are discovered. If a file is updated then the +reload() function on that class will be called. If the file is removed the +class will be deleted. + +It is important to verify that __init__, __del__, and reload() are all +defined properly. + +A simple example of this module use looks like this: + class watcher(object): + def __init__(self, filename): + self._filename = filename + print 'Init: %s' % filename + def __del__(self): + print 'Del: %s' % self._filename + def reload(self): + print 'reload: %s' % self._filename + + x = inotify.InotifyWatcher(['/tmp/bar'], watcher) + +Only one InotifyWatcher can be registered per process due to the way that +inotify works. + +Author: Brady Catherman (brady@twitter.com) +""" + +import fcntl +import logging +import os +import signal +import stat + + +WATCH_MASK = (fcntl.DN_MODIFY | fcntl.DN_CREATE | fcntl.DN_DELETE | + fcntl.DN_RENAME | fcntl.DN_MULTISHOT) + + +class WatchClass(object): + """Interface class to be passed into InotifyWatcher()""" + def __init__(self, filename): + pass + + def __del__(self): + pass + + def reload(self): + """Called when the file is updated on disk.""" + pass + + +class InotifyWatcher(object): + """Watches a list of directories for updates to the files in them. + + This class will watch the directories in watch_directories and will + automatically make a class of watch_class type when a new one is found. + + Args: + watch_directories: An iterable list of directories to watch for files in. + watch_class: The class that will be used to wrap each file. + file_pattern: An optional function that filters filenames. The basic + footprint takes a single parameter (the filename) and returns + True/False if it should be watched or not. If this is not + given then all files will be watched. + """ + def __init__(self, watch_directories, watch_class, file_pattern=None): + if file_pattern is None: + file_pattern = (lambda x: True) + self._watch_directories = watch_directories + self._watch_class = watch_class + self._file_pattern = file_pattern + self._watch_fds = {} + self._watch_files = {} + signal.signal(signal.SIGIO, self._inotify) + signal.signal(signal.SIGHUP, self._inotify) + self.rescan() + + def _recurse_directory(self): + """Recurses through all self._watch_directories finding files.""" + all_files = set() + dirs = set(self._watch_directories) + all_dirs = set() + while dirs: + dir = dirs.pop() + try: + files = [os.path.join(dir, f) for f in os.listdir(dir)] + all_dirs.add(dir) + all_files.update([f for f in files + if os.path.isfile(f) and self._file_pattern(f)]) + dirs.update([f for f in files if os.path.isdir(f) and f[0] != '.']) + except IOError, e: + logging.warning('Unable to access: %s' % dir) + except OSError, e: + logging.warning('Unable to access: %s' % dir) + return (all_dirs, all_files) + + def _register_inotify(self, dir): + """Registers a watch on the given directory.""" + if dir in self._watch_fds: + return + logging.info('Registering a inotify watch on %s' % dir) + try: + fd = os.open(dir, os.O_RDONLY) + fcntl.fcntl(fd, fcntl.F_NOTIFY, WATCH_MASK) + self._watch_fds[dir] = fd + except IOError, e: + logging.error('Unable to register watch on %s: %s' % (dir, e)) + + def _unregister_inotify(self, dir): + """Unregisters the directory for update notification.""" + if dir not in self._watch_fds: + return + logging.info('Unregistering a inotify watch on %s' % dir) + del self._watch_fds[dir] + + def _inotify(self, signum, frame): + """Called when either SIGHUP or SIGIO (inotify) is received.""" + logging.info('Received SIGHUP or a file update notification.') + signal.signal(signal.SIGIO, self._inotify) + signal.signal(signal.SIGHUP, self._inotify) + self.rescan() + + def _mtime(self, filename): + """Returns the mtime of the given file (in seconds).""" + try: + s = os.stat(filename) + return s[stat.ST_MTIME] + except IOError: + # On error we just return zero.. + # FIXME[brady]: Make this work better. + return 0 + + def files(self): + """Returns a list of all WatchFile objects we are watching. + + This will return a list of all WatchFile objects associated with config + files in the list of directories that we are currently watching. + + Returns: + A list of all WatchConfig objects we are maintaining. + """ + return [w for _, w in self._watch_files.itervalues()] + + def rescan(self): + """Rescans all directories looking for files inside. + + This will walk all the directories listed when this class was created + looking for configuration files. If new config files are found then + a object will be created using the class passed in at init time. If a + file that used to exist was deleted then the config object for it + will also be deleted. + """ + new_dirs, new_files = self._recurse_directory() + + # Old directories, unregister watches. + for dir in set(self._watch_fds.iterkeys()).difference(new_dirs): + self._unregister_inotify(dir) + + # New directories, register watches. + for dir in new_dirs: + self._register_inotify(dir) + + # Walk through all files that no longer exist. + for file in set(self._watch_files).difference(new_files): + logging.info('File deleted (%s): Removing its object.', file) + del self._watch_files[file] + + for file in new_files: + if file not in self._watch_files: + w = self._watch_class(file) + self._watch_files[file] = [None, w] + logging.info('Found new file (%s): Making new object', file) + t = self._watch_files[file] + m = self._mtime(file) + if t and t[0] != m: + t[0] = m + t[1].reload() diff --git a/twitcher/twitcher.py b/twitcher/twitcher.py new file mode 100644 index 0000000..d7224e6 --- /dev/null +++ b/twitcher/twitcher.py @@ -0,0 +1,93 @@ +#!/usr/bin/python26 + +"""Main loop for the twitcher binary. + +This class contains the main "Twitcher" object which is a simple wrapper for +the main loop functionality. It catches SIGCHLD, and manages the select loop. +It also creates the top level zookeeper and config directory watcher objects. + +Author: Brady Catherman (brady@twitter.com) +""" + +import errno +import logging +import os +import select +import signal + +# Twitcher modules +from inotify import InotifyWatcher +from config import ConfigFile +import core +import zkwrapper + + +class Twitcher(object): + """The main operating loop of the twitcher program. + + Args: + zkservers: A common seperated list of zookeeper servers to connect too. + config_path: The path to (recursively) read config files from. + """ + def __init__(self, zkservers, config_path): + self._signal_notifier = os.pipe() + signal.set_wakeup_fd(self._signal_notifier[1]) + signal.signal(signal.SIGCHLD, self._sigchld) + zh = zkwrapper.ZKWrapper(zkservers) + core.set_default_zkwrapper(zh) + core.set_default_ping_fd(self._signal_notifier[1]) + self._inotify_watcher = InotifyWatcher([config_path], ConfigFile, + self._is_config_file) + self._sigchld_received = False + + def _is_config_file(self, filename): + """Returns True if the file name is a twitcher config file.""" + return filename.endswith('.twc') + + def _sigchld(self, sig, frame): + """Called when a SIGCHLD signal has been received.""" + signal.signal(signal.SIGCHLD, self._sigchld) + self._sigchld_received = True + + def _get_all_config_objects(self): + """Returns a list of all config objects loaded.""" + watch_files = self._inotify_watcher.files() + r = [] + for w in watch_files: + r += w.get_configurations() + return r + + def run(self): + """The main running loop of the twitcher process. Doesn't return.""" + while True: + # If we received SIGCHLD then we should allow our config modules + # to reap all children. + if self._sigchld_received: + for c in self._get_all_config_objects(): + c.sigchld() + self._sigchld_received = False + + # We add our notified file descriptor by default so select will exit + # when sigchld is received. + r_fds = [self._signal_notifier[0]] + w_fds = [] + for c in self._get_all_config_objects(): + cr, cw = c.get_fds() + r_fds += cr + w_fds += cw + + try: + iready, oready, e = select.select(r_fds, w_fds, [], 60) + if not iready and not oready and not e: + # On timeouts we check all child processes for safety. This makes + # sure we don't lose one somehow. + _sigchld_received = True + logging.debug('select loop timed out without updates.') + else: + if self._signal_notifier[0] in iready: + os.read(self._signal_notifier[0], 1) + for c in self._get_all_config_objects(): + c.select(iready, oready) + except select.error, v: + if v[0] != errno.EINTR: + raise diff --git a/twitcher/zkwrapper.py b/twitcher/zkwrapper.py new file mode 100644 index 0000000..cf33bda --- /dev/null +++ b/twitcher/zkwrapper.py @@ -0,0 +1,215 @@ +#!/usr/bin/python26 + +"""Wrapper for zookeeper that manages watches. + +This is a simple wrapper for zookeeper that allows us to use a single watch +to handle many watchers. This should vastly reduce zookeeper overhead when +many watches on the same thing are used. + +This class also allows us to unregister watches which is something that +zookeeper doesn't do normally. This is important because the lack of unregister +means that an object can not be garbage collected (by nature of its reference +in the zookeeper module) until its watch fires. To handle this we instead +use this class to receive the watches with the knowledge that it shouldn't +be created and destroyed often. + +Author: Brady Catherman (brady@twitter.com) +""" + +import logging +import os +import socket +import threading +import zookeeper + + +class ZKWrapper(object): + """Wraps all zookeeper functionality into a simple wrapper. + + This is a basic wrapper for a zookeeper handle. The idea is to allow + multiplexing of zookeeper requests so we can reduce the watch counts. This + also allows us to unregister watches which is something the normal zookeeper + library doesn't support. + + Args: + servers: the list of zookeeper servers to connect too. + """ + def __init__(self, servers): + logging.debug('Creating ZKwrapper against %s', ','.join(servers)) + self._servers = [] + for s in servers: + parts = s.split(':') + if len(parts) == 2: + self._servers.append((parts[0], parts[1])) + else: + self._servers.append((s, 2181)) + self._lock = threading.Lock() + self._watcher_lock = threading.Lock() + self._watches = {} + self._handlers = {} + self._zookeeper = None + self._clientid = None + self._connect() + + def _global_watch(self, zh, event, state, path): + """Called when the connection to zookeeper has a state change.""" + if state == zookeeper.EXPIRED_SESSION_STATE: + self._connect() + if state == zookeeper.CONNECTED_STATE: + self._clientid = zookeeper.client_id(self._zookeeper) + + def _connect(self): + """Creates a connection to a zookeeper instance.""" + s = [] + for host, port in self._servers: + try: + _, _, ips = socket.gethostbyname_ex(host) + for ip in ips: + s.append('%s:%s' % (ip, port)) + except socket.gaierror: + logging.error('Hotname not known: %s', host) + except socket.herror: + logging.error('Unable to resolve %s', host) + + if self._clientid is not None: + # Existing connections get registered with the same clientid that was + # used before. + self._zookeeper = zookeeper.init(','.join(s), self._global_watch, None, + clientid) + else: + self._zookeeper = zookeeper.init(','.join(s), self._global_watch) + + def aget(self, path, watcher=None, handler=None): + """A simple wrapper for zookeeper async get function. + + This function wraps the zookeeper aget call which allows the caller + to register a function to handle the data once it is received as well + as a function that will be called once the data has been updated. If + neither is given then this function will do nothing. + + Args: + path: The znode to watch. + watcher: Called when the given znode is updated or changed. the basic + footprint of this function is: + func(zh, path) + zh will be this object, and path will be the znode path. + handler: Called when the data has been fetched from zookeper. The basic + footprint of this function is: + func(zh, path, rc, data) + zh will be this object and path will be the znode path. + rc is the return code from zookeeper. + data is the contents of the znode. + + Returns: + Nothing. + """ + register = False + get = False + self._lock.acquire() + if watcher: + register = path not in self._watches + self._watches.setdefault(path, []).append(watcher) + if handler: + get = path not in self._handlers + self._handlers.setdefault(path, []).append(handler) + self._lock.release() + if register or get: + if register: + w = self._watcher + else: + w = None + # We use a lambda here so we can make sure that the path gets appended + # to the args. This allows us to multiplex the call. + h = (lambda zh, rc, data, stat: self._handler(zh, rc, data, stat, path)) + # FIXME(error handling) + logging.debug('Performing a get against %s', path) + zookeeper.aget(self._zookeeper, path, w, h) + + def unregister(self, path, watcher=None, handler=None): + """Removes an existing watch or handler. + + This unregisters an object's watch and handler callback functions. It + doesn't actually prevent the watch or hanler from triggering but it + does remove all references frmo the object and prevent the functions + from being called. This allows garbage collection of the object. + + Args: + path: The znode being watched. + watcher: The watcher function that should be removed. + handler: The handler function that should be removed. + + Returns: + Nothing. + """ + if watcher: + try: + while True: + self._watches.get(path, []).remove(watcher) + except ValueError: + pass + if handler: + try: + while True: + self._watches.get(path, []).remove(handler) + except ValueError: + pass + + def _watcher(self, zh, event, state, path): + """Internal function called by zookeeper when a node updates. + + This function is called by zookeeper when any of the watched nodes update. + We use this is a simple wrapper so that the zookeeper module doesn't + actually have to have a reference to any object other than this one. This + is important if you want to actually support unregistering of watches + otherwise the unregistered object will retain a reference in the + zookeeper module which prevents gc. + + Args: + zh: The real zookeeper handler object that created the watch. + event: The event that triggered this watch. + state: The state of the connection. + path: The znode that triggered this watch. + + Returns: + Nothing. + """ + logging.info('Received a zookeeper watcher notification for %s', path) + watches = self._watches.pop(path, None) + # We lock this while we call all the registered watchers so they all + # have a chance to call aget() in order to get the data _before_ we + # process the returned data. This allows for better batching of get + # requests so we can reduce load on the zookeeper servers. + self._watcher_lock.acquire() + while watches: + callback = watches.pop() + callback(self, path) + self._watcher_lock.release() + + def _handler(self, zh, rc, data, stat, path): + """Handles zookeeper data calls. + + This function is called once an aget() request completes. It returns + the data in the znode back to the caller. In the _watcher function + above we use a lambda as the real call back in order allow passing of + the znode in which it seems the zookeeper library doesn't do. + + Args: + zh: the zookeeper object the watched was registered against. + rc: The return code from the call. + data: The contents of the znode. + stat: The stat data from the call. + path: The znode that we are getting the contents of. + + Returns: + Nothing. + """ + logging.info('Received znode contents for %s', path) + logging.debug('Contents of %s\n"""%s""".', path, data) + # This lock means that we will not process the handler until all + # watchers have been notified. + self._watcher_lock.acquire() + handlers = self._handlers.pop(path, None) + self._watcher_lock.release() + while handlers: + handler = handlers.pop() + handler(self, rc, data, path)