Permalink
Browse files

Merge branch 'release/0.0.1'

  • Loading branch information...
2 parents 5c981f2 + 2a1ddb5 commit 66057f5189208c20aed39b5e38044ecf826682bd @nvie committed May 20, 2010
Showing with 523 additions and 0 deletions.
  1. +10 −0 .gitignore
  2. +19 −0 INSTALL
  3. +27 −0 LICENSE
  4. +8 −0 MANIFEST.in
  5. +1 −0 README
  6. +107 −0 README.rst
  7. +16 −0 drainers/__init__.py
  8. +157 −0 drainers/drainer.py
  9. +23 −0 examples/simple_annotate_date.py
  10. +54 −0 examples/simple_find.py
  11. +37 −0 examples/simple_kill_sleep.py
  12. +18 −0 examples/simple_ls.py
  13. +46 −0 setup.py
View
@@ -0,0 +1,10 @@
+# iJunk
+.DS_Store
+
+# PyJunk
+*.pyc
+
+# Egg junk
+*.egg-info
+/build
+/dist
View
@@ -0,0 +1,19 @@
+Installing drainers
+===================
+
+You can install drainers either via the Python Package Index
+(PyPI) or from source.
+
+To install using pip::
+
+ $ pip install drainers
+
+To install using easy_install::
+
+ $ easy_install drainers
+
+If you have downloaded a source tarball you can install it by doing
+the following::
+
+ $ python setup.py build
+ $ sudo python setup.py install
View
@@ -0,0 +1,27 @@
+Copyright (c) 2009, Vincent Driessen
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright notice,
+ this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in the
+ documentation and/or other materials provided with the distribution.
+
+Neither the name of Vincent Driessen nor the names of its contributors may
+be used to endorse or promote products derived from this software without
+specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGE.
View
@@ -0,0 +1,8 @@
+include README.rst
+include MANIFEST.in
+include LICENSE
+include INSTALL
+recursive-include examples *.py
+recursive-include drainers *.py
+prune examples/*.pyc
+prune drainers/*.pyc
View
View
@@ -0,0 +1,107 @@
+===================================================
+ drainers - Event-based draining of process output
+===================================================
+
+drainers is an abstraction around `subprocess.Popen` to read and control
+process output event-wise. It also allows you to abort running processes
+either gracefully or forcefully without having to directly interact with the
+processes or threads themself.
+
+Overview
+========
+
+Defining a process
+------------------
+A `Drainer` is a factory and controller wrapper around
+`subprocess.Popen` and therefore takes all of the (optional) parameters
+that `subprocess.Popen`'s initializer takes. For example, the minimal
+`Drainer` takes a command array::
+
+ from drainers import Drainer
+
+ def ignore_event(line, is_err):
+ pass
+
+ my_drainer = Drainer(['ls', '-la'], read_event_cb=ignore_event)
+ my_drainer.start()
+
+But, extra arguments are allowed, too::
+
+ my_drainer = Drainer(['echo', '$JAVA_HOME'], shell=True, bufsize=64,
+ read_event_cb=ignore_event)
+ my_drainer.start()
+
+The only two arguments to `Drainer` that are reserved are
+`stdout` and `stderr`. `Drainer` requires them to be
+`subprocess.PIPE` explicitly, and sets them for you accordingly.
+
+Defining a callback
+-------------------
+`Drainer`'s strength lies in the fact that each line that is read from the
+process' standard output or standard error streams leads to a callback
+function being invoked. This allows you to process virtually any process'
+output, as long as it's line-based.
+
+The callback function can be specified using the `read_event_cb` parameter to
+the constructor, as seen in the example above. It is mandatory. The callback
+function specified needs to have a specific signature::
+
+ def my_callback(line, is_err):
+ ...
+
+It should take two parameters: `line` (a string) and `is_err` (a boolean).
+The latter indicates that the line is read from the standard error stream.
+There is nothing more to it. It does not need to return anything: it's return
+value will be ignored. Your callback may be a class method, too, like in the
+following example. Notice that in those cases, you pass `foo.my_method` as
+the value for the `read_event_cb` parameter::
+
+ class MyClass(object):
+
+ def my_method(self, line, is_err):
+ ...
+
+ foo = MyClass()
+ my_drainer = Drainer(['ls'], read_event_cb=foo.my_method)
+ my_drainer.start()
+
+The granularity currently is a single line. If you want to read predefined
+chunks of data, please fork this repo and implement a `Drainer` subclass
+yourself. If you want a callback that isn't invoked after each line read, but
+after an arbitrary time or amount of lines, you have to implement this
+yourself. (It shouldn't be too hard, though. See the `examples` directory
+for inspiration.)
+
+Aborting processes
+------------------
+`Drainer` allows you to abort a running process in the middle of execution,
+forcefully sending the process a `terminate()` message (Python equivalent of a
+Unix `SIGTERM` message) when a certain condition arises. By default, the
+process will never be terminated abnormally. To specify termination criteria,
+implement a callback function that takes no parameters and returns `True` if
+abortion is desired and `False` otherwise. For example, for a long running
+process you might want to terminate it if the disk is getting (almost) full.
+But checking how much space is free can be a lengthy operation, so you might
+want to do it only sparingly::
+
+ def out_of_diskspace():
+ left = handytools.check_disk_free()
+ total = handytools.check_disk_total()
+ return (left / total) < 0.03
+
+ # The following drainer executes the cruncher and checks whether the disk
+ # is (almost) full every 5 seconds. It aborts if free disk space runs
+ # under 3%.
+ my_drainer = Drainer(['/bin/crunch', 'inputfile', 'outputfile'],
+ read_event_cb=ignore_event,
+ should_abort=out_of_diskspace,
+ check_interval=5.0)
+ exitcode = my_drainer.start()
+
+The example is pretty self-explaining. You can check the exitcode to see the
+result of the process.
+
+
+More examples
+=============
+See the `examples` directory for more detailed examples.
View
@@ -0,0 +1,16 @@
+"""
+drainers -- Event-based process monitoring.
+"""
+
+VERSION = (0, 0, 1)
+
+__version__ = ".".join(map(str, VERSION[0:3])) + "".join(VERSION[3:])
+__author__ = "Vincent Driessen"
+__contact__ = "vincent@datafox.nl"
+__homepage__ = "http://github.com/nvie/python-drainers/"
+#__docformat__ = "restructuredtext"
+
+from drainer import Drainer
+from drainer import STDIN, STDOUT, STDERR
+
+__all__ = [ 'Drainer', 'STDIN', 'STDOUT', 'STDERR' ]
View
@@ -0,0 +1,157 @@
+import time
+import threading
+import subprocess
+import copy
+
+STDIN = 0
+STDOUT = 1
+STDERR = 2
+
+class Drainer(object):
+ def __init__(self, args, read_event_cb=None, should_abort_cb=None,
+ check_interval=2.0, force_kill_timeout=None, **pargs):
+ '''Creates a new Drainer.
+
+ A process monitor initializes the given command (from `cmd`). To
+ start draining the pipes and generating the read events, invoke
+ `start()`.
+
+ Popen arguments:
+ args -- Args that can be fed to `subprocess.Popen` (see [1]).
+ **pargs -- Keyword arguments that can be fed to `subprocess.Popen`
+ (see [1]). Beware that `ProcessMonitor` always sets the
+ `stdout` and `stderr` params to `subprocess.PIPE`.
+
+ Keyword arguments:
+ read_event_cb --
+ Callback function that is invoked each time a line of
+ output is read from the subprocess. This function should
+ take two parameters `line` and `is_err`. Its return
+ value is ignored. `is_err` is `True` when the line
+ originated from the stderr stream.
+ should_abort_cb --
+ Callback function that is invoked periodically (see
+ argument `check_interval`) to check whether the process
+ should terminate. Have the callback function return
+ `True` when the process should be aborted, `False`
+ otherwise.
+ check_interval --
+ Time interval in seconds that the `_cbshould_abort_cb`
+ callback function is invoked. Choose lower for more
+ responsiveness, choose higher for better performance.
+ (Default: 2.0)
+ force_kill_timeout --
+ When the streams are read empty, the subprocess will be
+ killed. `Drainer` will try to `terminate()` the process
+ gracefully, but when the process isn't terminated after
+ `force_kill_timeout`, the process will be force-killed.
+ When not set, the process will never be force-killed.
+
+ Some notes:
+
+ * The `read_event_cb` callback function is surrounded by a thread
+ lock. This means that to simultaneous reads (from both stdout and
+ stderr) are delivered sequentially. Therefore, it is unnecessary
+ to provide your own thread locking in your callback function.
+
+ [1] http://docs.python.org/library/subprocess.html#subprocess.Popen
+
+ '''
+ self.check_interval = check_interval
+ self.force_kill_timeout = force_kill_timeout
+ self.should_abort_cb = should_abort_cb
+ self.read_event_cb = read_event_cb
+
+ self._lock = threading.RLock()
+ self._cancel_event = threading.Event()
+ self._popen_args = copy.copy(args)
+ self._popen_kwargs = copy.copy(pargs)
+
+ def _read_stream(self, stream, is_err):
+ # Process a line at a time, checking whether the _cancel_event is fired
+ while not self._cancel_event.is_set():
+ ln = stream.readline()
+ if ln != '':
+ self._lock.acquire()
+ try:
+ self.read_event_cb(ln, is_err)
+ finally:
+ self._lock.release()
+ else:
+ # Terminate normally
+ break
+
+ def _read_stdout(self, stream):
+ self._read_stream(stream, False)
+
+ def _read_stderr(self, stream):
+ self._read_stream(stream, True)
+
+ def _poll_should_abort(self):
+ while self.process.poll() is None: # as long as the process is alive
+ if self.should_abort_cb(): # callback should return True if cancelled
+ self._cancel_event.set()
+ self.process.terminate()
+ break
+ time.sleep(self.check_interval)
+
+ def _force_kill(self):
+ if not self.process.poll() is None:
+ self.process.kill()
+
+ def start(self):
+ '''Calling this will create the `subprocess.Popen` instance and drain
+ the stdout and stderr pipes of the process. Each time a line is
+ read from those pipes, the monitor calls back into the
+ `read_event_cb` callback function.
+
+ When a `should_abort_cb` callback function is set, a poller thread
+ will be set up to periodically check whether the process should
+ terminate.
+
+ Note that `start()` blocks until the process is finished.
+
+ Returns the exitcode of the process.
+
+ '''
+ self._popen_kwargs['stdout'] = subprocess.PIPE
+ self._popen_kwargs['stderr'] = subprocess.PIPE
+ self.process = subprocess.Popen(self._popen_args,
+ **self._popen_kwargs)
+
+ rerr = threading.Thread(target=self._read_stderr, args=(self.process.stderr,))
+ rerr.daemon = True
+ rout = threading.Thread(target=self._read_stdout, args=(self.process.stdout,))
+ rout.daemon = True
+
+ if not self.should_abort_cb is None:
+ poller = threading.Thread(target=self._poll_should_abort)
+ poller.daemon = True
+ poller.start()
+
+ # Start draining the pipes
+ rerr.start()
+ rout.start()
+
+ # Wait for stream readers to finish, either normally, or aborted
+ rout.join()
+ rerr.join()
+
+ # Finally, wait for the poller thread to finish, too (worst
+ # case, this can take self.check_interval seconds)
+ if not self.should_abort_cb is None:
+ poller.join()
+
+ if self.force_kill_timeout is not None:
+ kill_timer = threading.Timer(self.force_kill_timeout, self._force_kill)
+ kill_timer.start()
+
+ # in case wait() finishes before force_kill_timeout elapsed, we
+ # may simply cancel the force_kill timer
+ exitcode = self.process.wait()
+
+ if self.force_kill_timeout is not None:
+ kill_timer.cancel()
+
+ return exitcode
+
@@ -0,0 +1,23 @@
+"""
+Annotate each line coming in with the current date.
+
+Example runs:
+python simple_annotate_date.py find .
+python simple_annotate_date.py cat (and start typing in stdin)
+python simple_annotate_date.py grep 'foobar' *
+"""
+import sys
+import datetime
+import drainers
+
+def annotate(line, is_err):
+ if is_err:
+ stream = sys.stderr
+ else:
+ stream = sys.stdout
+ stream.write('[%s] ' % datetime.datetime.now())
+ stream.write('ERROR: ')
+ stream.write(line)
+
+d = drainers.Drainer(sys.argv[1:], read_event_cb=annotate)
+d.start()
Oops, something went wrong.

0 comments on commit 66057f5

Please sign in to comment.