Merge branch 'release/0.0.2'
nvie committed May 25, 2010
2 parents 66057f5 + 8529afc commit e37ce28
Showing 9 changed files with 552 additions and 10 deletions.
7 changes: 2 additions & 5 deletions README.rst
@@ -66,11 +66,8 @@ the value for the `read_event_cb` parameter::
    my_drainer.start()

The granularity currently is a single line. If you want to read predefined
-chunks of data, please fork this repo and implement a `Drainer` subclass
-yourself. If you want a callback that isn't invoked after each line read, but
-after an arbitrary time or amount of lines, you have to implement this
-yourself. (It shouldn't be too hard, though. See the `examples` directory
-for inspiration.)
+chunks (lines) of data, use `BufferedDrainer` instead. See
+examples/buffer_results.py for an example.

Aborting processes
------------------
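To make the new README guidance concrete, here is a minimal usage sketch (not part of this commit's diff); the command, callback name, and chunk size are illustrative assumptions:

import drainers

def my_buffered_callback(lines):
    # Receives a list of (line, is_err) tuples, one chunk at a time
    for line, is_err in lines:
        print line.strip()

d = drainers.BufferedDrainer(['find', '.', '-type', 'd'],
                             read_event_cb=my_buffered_callback,
                             chunk_size=5)
d.start()
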
8 changes: 4 additions & 4 deletions drainers/__init__.py
@@ -2,15 +2,15 @@
drainers -- Event-based process monitoring.
"""

-VERSION = (0, 0, 1)
+VERSION = (0, 0, 2)

__version__ = ".".join(map(str, VERSION[0:3])) + "".join(VERSION[3:])
__author__ = "Vincent Driessen"
__contact__ = "vincent@datafox.nl"
__homepage__ = "http://github.com/nvie/python-drainers/"
#__docformat__ = "restructuredtext"

-from drainer import Drainer
-from drainer import STDIN, STDOUT, STDERR
+from base import Drainer
+from buffered import BufferedDrainer

-__all__ = [ 'Drainer', 'STDIN', 'STDOUT', 'STDERR' ]
+__all__ = [ 'Drainer', 'BufferedDrainer' ]
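As a quick check of the new public surface (illustrative, not part of the commit), the package root's `__all__` now lists exactly these two names:

from drainers import Drainer, BufferedDrainer  # STDIN/STDOUT/STDERR are no longer re-exported from the package root
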
6 changes: 5 additions & 1 deletion drainers/drainer.py → drainers/base.py
@@ -7,7 +7,8 @@
STDOUT = 1
STDERR = 2

-class Drainer(object):
+class _BaseDrainer(object):

    def __init__(self, args, read_event_cb=None, should_abort_cb=None,
                 check_interval=2.0, force_kill_timeout=None, **pargs):
        '''Creates a new Drainer.
@@ -155,3 +156,6 @@ def start(self):

        return exitcode

+class Drainer(_BaseDrainer):
+    pass
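For contrast with the buffered variant, the per-line `Drainer` callback keeps its original two-argument form; a minimal sketch, with the command and callback name chosen for illustration:

from drainers import Drainer

def on_line(line, is_err):
    # Called once per line; is_err distinguishes stderr from stdout
    print '%s: %s' % ('ERR' if is_err else 'OUT', line.rstrip())

Drainer(['echo', 'hello'], read_event_cb=on_line).start()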

132 changes: 132 additions & 0 deletions drainers/buffered.py
@@ -0,0 +1,132 @@
import threading
from base import _BaseDrainer

class BufferedDrainer(_BaseDrainer):

    def __init__(self, args, read_event_cb=None, should_abort_cb=None,
                 check_interval=2.0, force_kill_timeout=None,
                 chunk_size=0,
                 flush_timeout=None,
                 **pargs):
        '''Creates a new BufferedDrainer.

        For initialization parameters, see the documentation of
        `_BaseDrainer`.

        `BufferedDrainer` adds options to limit the number of times the
        `read_event_cb` callback function is invoked, by buffering lines
        until a fixed chunk size is reached and/or a fixed timeout
        elapses.

        Note:
        This implementation expects `read_event_cb` to take a single
        **list** as its first parameter, instead of separate string and
        boolean parameters. The list contains `(line, is_err)` tuples
        for each line read. For example:

            def my_buffered_callback(lines):
                for line, is_err in lines:
                    # Use familiar `line` and `is_err` variables here
                    ...

        `BufferedDrainer` adds the following arguments to the
        constructor:

        chunk_size    -- The size of the buffer (in lines). Chunks of
                         `chunk_size` lines will be passed to
                         `read_event_cb` at once. This reduces the
                         number of calls to `read_event_cb`.
                         (Default: disabled, 0)
        flush_timeout -- A timeout value (in seconds, floating point) to
                         time-limit buffering. When `flush_timeout`
                         elapses, the buffer is flushed by calling
                         `read_event_cb` with all the lines read so far.
                         (Default: None)

        If both `chunk_size` and `flush_timeout` are specified, the
        buffer is flushed when either condition is met. When such a
        flush occurs, the flush timer is reset.
        '''
        super(BufferedDrainer, self).__init__(
            args,
            read_event_cb=self._wrapper,
            should_abort_cb=should_abort_cb,
            check_interval=check_interval,
            force_kill_timeout=force_kill_timeout,
            **pargs)
        self._orig_read_event_cb = read_event_cb
        self.chunk_size = chunk_size
        self.flush_timeout = flush_timeout
        self._buffer = []
        self._timer = None

    @property
    def buffer(self):
        return self._buffer

    def _should_flush(self):
        # If neither chunk_size nor flush_timeout is set, behave like a
        # regular Drainer
        if self.chunk_size == 0 and self.flush_timeout is None:
            return True

        if self.chunk_size <= 0:
            # Use timer-based flushing instead
            return False
        else:
            return len(self.buffer) >= self.chunk_size

    def _wrapper(self, line, is_err):
        self.buffer.append((line, is_err))

        if self._should_flush():
            self._flush()

    def _empty_buffer(self):
        '''Empty the buffer and return a copy of it.'''
        bufcopy = []
        while len(self.buffer) > 0:
            bufcopy.append(self.buffer.pop(0))
        return bufcopy

    def _flush(self):
        if len(self.buffer) > 0:
            bufcopy = self._empty_buffer()
            self._orig_read_event_cb(bufcopy)

    def _create_timer(self):
        if self.flush_timeout is not None:
            self._timer = threading.Timer(self.flush_timeout,
                                          self._flush_and_reset)
            self._timer.daemon = True
            self._timer.start()

    def _destroy_timer(self):
        if self._timer is not None:
            self._timer.cancel()

    def _reset_timer(self):
        if self._timer is not None:
            self._destroy_timer()
            self._create_timer()

    def _flush_and_reset(self):
        self._flush()
        self._create_timer()

    def start(self):
        # 1. Start the timer upfront
        # 2. Invoke super(BufferedDrainer, self).start()
        # 3. Stop/kill the timer
        # 4. Flush the remaining buffer
        # 5. Return the return value from step 2

        self._create_timer()
        result = super(BufferedDrainer, self).start()
        self._destroy_timer()
        self._flush()
        return result
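A sketch (not part of the diff) of the default behaviour encoded in `_should_flush`: with neither `chunk_size` nor `flush_timeout` set, every line is flushed immediately, so the callback receives one-element lists and `BufferedDrainer` behaves like a regular `Drainer`. The command here is an arbitrary example:

from drainers import BufferedDrainer

def show(lines):
    # With the defaults (chunk_size=0, flush_timeout=None) each call
    # carries exactly one (line, is_err) tuple
    for line, is_err in lines:
        print line.rstrip()

BufferedDrainer(['ls', '-la'], read_event_cb=show).start()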

35 changes: 35 additions & 0 deletions examples/buffer_results.py
@@ -0,0 +1,35 @@
"""
Process 20 lines of output at a time.
Example runs:
python buffer_results.py
"""
import sys
import time
import drainers

# fake this
def setup_cruncher():
    time.sleep(1)

def do_something_expensive(file):
    time.sleep(0.005)

def destroy_cruncher():
    time.sleep(0.8)

files = []

def crunch(lines):
    print 'Setting up cruncher...'
    setup_cruncher()
    for line, is_err in lines:
        if is_err:  # ignore errors
            continue
        print '- Crunching file %s...' % line.strip()
        do_something_expensive(line)
    print 'Releasing cruncher...'
    destroy_cruncher()

d = drainers.BufferedDrainer(['find', '.', '-type', 'f'], read_event_cb=crunch, chunk_size=20)
d.start()
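A hypothetical variation on the example above (not in the commit), reusing the same `crunch` callback and import: adding `flush_timeout` flushes any buffered lines every five seconds, even if fewer than 20 have accumulated by then.

d = drainers.BufferedDrainer(['find', '.', '-type', 'f'],
                             read_event_cb=crunch,
                             chunk_size=20,
                             flush_timeout=5.0)
d.start()
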
2 changes: 2 additions & 0 deletions setup.cfg
@@ -0,0 +1,2 @@
[nosetests]
where = tests
9 changes: 9 additions & 0 deletions tests/fixtures/counter.sh
@@ -0,0 +1,9 @@
#!/bin/sh
i=0
while [ $i -lt 30 ]; do
    if [ $(($i % 10)) -eq 0 ]; then
        sleep 1
    fi
    echo $(($i+1))
    i=$(($i+1))
done
