Permalink
Browse files

Initial bulk of the drainers package.

  • Loading branch information...
nvie committed May 18, 2010
1 parent 5c981f2 commit 39938c45803eed29a71f9f4a75752e9fdf82a197
Showing with 367 additions and 0 deletions.
  1. +10 −0 .gitignore
  2. +19 −0 INSTALL
  3. +27 −0 LICENSE
  4. +6 −0 MANIFEST.in
  5. +21 −0 README.rst
  6. +16 −0 drainers/__init__.py
  7. +155 −0 drainers/drainer.py
  8. +67 −0 examples/example.py
  9. +46 −0 setup.py
View
@@ -0,0 +1,10 @@
+# iJunk
+.DS_Store
+
+# PyJunk
+*.pyc
+
+# Egg junk
+*.egg-info
+/build
+/dist
View
19 INSTALL
@@ -0,0 +1,19 @@
+Installing drainers
+===================
+
+You can install drainers either via the Python Package Index
+(PyPI) or from source.
+
+To install using pip::
+
+ $ pip install drainers
+
+To install using easy_install::
+
+ $ easy_install drainers
+
+If you have downloaded a source tarball you can install it by doing
+the following::
+
+ $ python setup.py build
+ $ sudo python setup.py install
View
27 LICENSE
@@ -0,0 +1,27 @@
+Copyright (c) 2009, Vincent Driessen
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright notice,
+ this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in the
+ documentation and/or other materials provided with the distribution.
+
+Neither the name of Vincent Driessen nor the names of its contributors may
+be used to endorse or promote products derived from this software without
+specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGE.
View
@@ -0,0 +1,6 @@
+include README.rst
+include MANIFEST.in
+include LICENSE
+include INSTALL
+recursive-include drainers *.py
+prune drainers/*.pyc
View
@@ -0,0 +1,21 @@
+===================================================
+ drainers - Event-based draining of process output
+===================================================
+
+drainers is an abstraction around subprocess.Popen to read and control process
+output event-wise. It also allows you to abort running processes either
+gracefully or forcefully without having to directly interact with the processes
+or threads themself.
+
+Overview
+========
+
+...
+
+
+Examples
+========
+
+See the `examples
+<http://github.com/nvie/drainers/tree/develop/examples/>`_ directory for
+more detailed examples.
View
@@ -0,0 +1,16 @@
+"""
+drainers -- Event-based process monitoring.
+"""
+
+VERSION = (0, 0, 1)
+
+__version__ = ".".join(map(str, VERSION[0:3])) + "".join(VERSION[3:])
+__author__ = "Vincent Driessen"
+__contact__ = "vincent@datafox.nl"
+__homepage__ = "http://github.com/nvie/drainers/"
+#__docformat__ = "restructuredtext"
+
+from drainer import Drainer
+from drainer import STDIN, STDOUT, STDERR
+
+__all__ = [ 'Drainer', 'STDIN', 'STDOUT', 'STDERR' ]
View
@@ -0,0 +1,155 @@
+import time
+import datetime
+import threading
+import subprocess
+import copy
+
+STDIN = 0
+STDOUT = 1
+STDERR = 2
+
+class Drainer(object):
+ def __init__(self, args, read_event_cb=None, should_abort_cb=None,
+ check_interval=2.0, force_kill_timeout=None, **kwargs):
+ '''Creates a new Drainer.
+
+ A process monitor initializes the given command (from `cmd`). To
+ start draining the pipes and generating the read events, invoke
+ `start()`.
+
+ Popen keyword arguments:
+ args -- Args that can be fed to `subprocess.Popen` (see [1]).
+ **pargs -- Keyword arguments that can be fed to
+ `subprocess.Popen` (see [1]). Beware that
+ `ProcessMonitor` always sets the `stdout` and
+ `stderr` params to `subprocess.PIPE`.
+
+ Keyword arguments:
+ read_event_cb --
+ Callback function that is invoked each time a line of
+ output is read from the subprocess. This function should
+ take two parameters `line` and `is_err`. Its return
+ value is ignored. `is_err` is `True` when the line
+ originated from the stderr stream.
+ should_abort_cb --
+ Callback function that is invoked periodically (see
+ argument `check_interval`) to check whether the process
+ should terminate. Have the callback function return
+ `True` when the process should be aborted, `False`
+ otherwise.
+ check_interval --
+ Time interval in seconds that the `_cbshould_abort_cb`
+ callback function is invoked. Choose lower for more
+ responsiveness, choose higher for better performance.
+ (Default: 2.0)
+ force_kill_timeout --
+ When the streams are read empty, the subprocess will be
+ killed. `Drainer` will try to `terminate()` the process
+ gracefully, but when the process isn't terminated after
+ `force_kill_timeout`, the process will be force-killed.
+ When not set, the process will never be force-killed.
+
+ Some notes:
+
+ * The `read_event_cb` callback function is surrounded by a thread
+ lock. This means that to simultaneous reads (from both stdout and
+ stderr) are delivered sequentially. Therefore, it is unnecessary
+ to provide your own thread locking in your callback function.
+
+ [1] http://docs.python.org/library/subprocess.html#subprocess.Popen
+
+ '''
+ self.check_interval = check_interval
+ self.force_kill_timeout = force_kill_timeout
+ self.should_abort_cb = should_abort_cb # callbacks
+ self.read_event_cb = read_event_cb
+
+ self._lock = threading.RLock()
+ self._cancel_event = threading.Event() # status bits
+ self._popen_args = copy.copy(args)
+ self._popen_kwargs = copy.copy(kwargs)
+
+ def _read_stream(self, stream, is_err):
+ # Process a line at a time, checking whether the _cancel_event is fired
+ while not self._cancel_event.is_set():
+ ln = stream.readline()
+ if ln != '':
+ self._lock.acquire()
+ try:
+ self.read_event_cb(ln, is_err)
+ finally:
+ self._lock.release()
+ else:
+ # Terminate normally
+ break
+
+ def _read_stdout(self, stream):
+ self._read_stream(stream, False)
+
+ def _read_stderr(self, stream):
+ self._read_stream(stream, True)
+
+ def _poll_should_abort(self):
+ while self.process.poll() is None: # as long as the process is alive
+ if self.should_abort_cb(): # callback should return True if cancelled
+ self._cancel_event.set()
+ self.process.terminate()
+ break
+ time.sleep(self.check_interval)
+
+ def _force_kill(self):
+ if not self.process.poll() is None:
+ self.process.kill()
+
+ def start(self):
+ '''Calling this will create the `subprocess.Popen` instance and drain
+ the stdout and stderr pipes of the process. Each time a line is
+ read from those pipes, the monitor calls back into the
+ `read_event_cb` callback function.
+
+ When a `should_abort_cb` callback function is set, a poller thread
+ will be set up to periodically check whether the process should
+ terminate.
+
+ Note that `start()` blocks until the process is finished.
+
+ '''
+ self._popen_kwargs['stdout'] = subprocess.PIPE
+ self._popen_kwargs['stderr'] = subprocess.PIPE
+ self.process = subprocess.Popen(self._popen_args,
+ **self._popen_kwargs)
+
+ rerr = threading.Thread(target=self._read_stderr, args=(self.process.stderr,))
+ rerr.daemon = True
+ rout = threading.Thread(target=self._read_stdout, args=(self.process.stdout,))
+ rout.daemon = True
+
+ if not self.should_abort_cb is None:
+ poller = threading.Thread(target=self._poll_should_abort)
+ poller.daemon = True
+ poller.start()
+
+ # Start draining the pipes
+ rerr.start()
+ rout.start()
+
+ # Wait for stream readers to finish, either normally, or aborted
+ rout.join()
+ rerr.join()
+
+ # Finally, wait for the poller thread to finish, too (worst
+ # case, this can take self.check_interval seconds)
+ if not self.should_abort_cb is None:
+ poller.join()
+
+ if self.force_kill_timeout is not None:
+ kill_timer = threading.Timer(self.force_kill_timeout, self._force_kill)
+ kill_timer.start()
+
+ # in case wait() finishes before force_kill_timeout elapsed, we
+ # may simply cancel the force_kill timer
+ self.process.wait()
+
+ if self.force_kill_timeout is not None:
+ kill_timer.cancel()
+
View
@@ -0,0 +1,67 @@
+import sys
+import threading
+from subprocess import Popen, PIPE
+import drainers
+
+vars = {}
+
+def abort():
+ vars['abort'] = True
+
+def should_abort():
+ abort = vars['abort']
+ sys.stdout.write('\npoll!\n')
+ sys.stdout.flush()
+ return abort
+
+def handle_line(line, is_err):
+ if is_err:
+ vars['err'] += 1
+ else:
+ vars['out'] += 1
+ show_totals()
+
+def show_totals():
+ sys.stdout.write("\rout = %d, err = %d" % (vars['out'], vars['err']))
+ sys.stdout.flush()
+
+vars['err'] = 0
+vars['out'] = 0
+
+test_cmds = [
+ ['find', '/'], # Takes long!
+ ['cat', '/dev/null'], # Super fast
+ ['ls', '-la'], # Regular fast
+ ['yes'], # Runs endlessly
+]
+
+t = None
+
+for cmd in test_cmds:
+ vars['abort'] = False
+ vars['err'] = 0
+ vars['out'] = 0
+
+ # Set off timer
+ t = threading.Timer(5.0, abort)
+ t.start()
+
+ print
+ print '==> Running %s' % ' '.join(cmd)
+ #p = Popen(cmd, bufsize=0, shell=False, stdout=PIPE, stderr=PIPE)
+ d = drainers.Drainer(# Popen args first
+ cmd, bufsize=0, shell=False,
+
+ # Then, the drainer args
+ should_abort_cb=should_abort,
+ read_event_cb=handle_line,
+ check_interval=1.0)
+ d.start()
+
+ print
+ print 'Total line count:'
+ show_totals()
+ print
+
+print
+print 'Done.'
View
@@ -0,0 +1,46 @@
+#!/usr/bin/env python
+import os
+import codecs
+try:
+ from setuptools import setup
+except ImportError:
+ from ez_setup import use_setuptools
+ use_setuptools()
+ from setuptools import setup
+
+import drainers as distmeta
+
+if os.path.exists("README.rst"):
+ long_description = codecs.open('README.rst', "r", "utf-8").read()
+else:
+ long_description = "See http://github.com/nvie/drainers/tree/master"
+
+setup(
+ name="drainers",
+ version=distmeta.__version__,
+ description=distmeta.__doc__,
+ author=distmeta.__author__,
+ author_email=distmeta.__contact__,
+ url=distmeta.__homepage__,
+ platforms=["any"],
+ license="BSD",
+ packages=["drainers"],
+ zip_safe=False,
+ classifiers=[
+ #"Development Status :: 1 - Planning",
+ "Development Status :: 2 - Pre-Alpha",
+ #"Development Status :: 3 - Alpha",
+ #"Development Status :: 4 - Beta",
+ #"Development Status :: 5 - Production/Stable",
+ #"Development Status :: 6 - Mature",
+ #"Development Status :: 7 - Inactive",
+ "Operating System :: OS Independent",
+ "Programming Language :: Python",
+ "Programming Language :: Python :: 2.6",
+ "Intended Audience :: Developers",
+ "License :: OSI Approved :: BSD License",
+ "Operating System :: POSIX",
+ "Topic :: Software Development :: Libraries :: Python Modules",
+ ],
+ long_description=long_description,
+)

0 comments on commit 39938c4

Please sign in to comment.