Skip to content

Commit

Permalink
Support for start_event & finish_event callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
xolox committed Jun 3, 2016
1 parent b484912 commit 62df2d2
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 14 deletions.
56 changes: 52 additions & 4 deletions executor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Programmer friendly subprocess wrapper.
#
# Author: Peter Odding <peter@peterodding.com>
# Last Change: June 1, 2016
# Last Change: June 3, 2016
# URL: https://executor.readthedocs.org

"""
Expand Down Expand Up @@ -68,7 +68,7 @@
unicode = str

# Semi-standard module versioning.
__version__ = '10.0'
__version__ = '10.1'

# Initialize a logger.
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -560,13 +560,27 @@ def fakeroot(self):
If this option is :data:`True` (not the default) and the current
process doesn't have `superuser privileges`_ the external command is
run with ``fakeroot``. If the ``fakeroot`` program is not installed a
fall back to ``sudo`` is performed.
run with ``fakeroot``. If the ``fakeroot`` program is not installed the
external command will fail.
.. _superuser privileges: http://en.wikipedia.org/wiki/Superuser#Unix_and_Unix-like
"""
return False

@mutable_property
def finish_event(self):
"""
Optional callback that's called just after the command finishes.
The :attr:`start_event` and :attr:`finish_event` properties were
created for use in command pools, for example to report to the operator
when specific commands are started and when they finish. The invocation
of the :attr:`start_event` and :attr:`finish_event` callbacks is
performed inside the :class:`ExternalCommand` class though, so you're
free to repurpose these callbacks outside the context of command
pools.
"""

@property
def have_superuser_privileges(self):
"""
Expand Down Expand Up @@ -727,6 +741,20 @@ def silent(self):
"""
return False

@mutable_property
def start_event(self):
"""
Optional callback that's called just before the command is started.
The :attr:`start_event` and :attr:`finish_event` properties were
created for use in command pools, for example to report to the operator
when specific commands are started and when they finish. The invocation
of the :attr:`start_event` and :attr:`finish_event` callbacks is
performed inside the :class:`ExternalCommand` class though, so you're
free to repurpose these callbacks outside the context of command
pools.
"""

@property
def stderr(self):
"""
Expand Down Expand Up @@ -1023,6 +1051,8 @@ def start(self):
# Lightweight reset of internal state.
for name in 'error_type', 'pid', 'returncode', 'subprocess':
delattr(self, name)
# Invoke the start event callback?
self.invoke_event_callback('start_event')
# Remember that we called subprocess.Popen() regardless of whether it
# is about to raise an exception or not.
self.was_started = True
Expand Down Expand Up @@ -1051,6 +1081,11 @@ def start(self):
self.stdout_stream.finalize(stdout)
self.stderr_stream.finalize(stderr)
self.wait()
finally:
# Invoke the finish event callback? (only applies when the command
# is synchronous or the subprocess module raised an exception)
if not self.is_running:
self.invoke_event_callback('finish_event')

def wait(self, check=None, **kw):
"""
Expand Down Expand Up @@ -1170,6 +1205,8 @@ def cleanup(self):
# so we don't lose track of it once we allow the subprocess.Popen
# object to be garbage collected.
self.returncode = self.subprocess.wait()
# Invoke the finish event callback?
self.invoke_event_callback('finish_event')
else:
# Override the computed value of the `returncode' property
# because computing it again after we destroy our reference
Expand Down Expand Up @@ -1210,6 +1247,17 @@ def check_errors(self, check=None):
if (check if check is not None else self.check) and self.error_type is not None:
raise self.error_type(self)

def invoke_event_callback(self, name):
"""
Invoke one of the event callbacks.
:param name: The name of the callback (a string).
"""
callback = getattr(self, name)
if callback is not None:
logger.debug("Invoking %s callback ..", name)
callback(self)

def __enter__(self):
"""
Start the external command if it hasn't already been started.
Expand Down
34 changes: 25 additions & 9 deletions executor/concurrent.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Programmer friendly subprocess wrapper.
#
# Author: Peter Odding <peter@peterodding.com>
# Last Change: May 29, 2016
# Last Change: June 3, 2016
# URL: https://executor.readthedocs.org

"""
Expand Down Expand Up @@ -62,6 +62,11 @@ def concurrency(self):
This is a positive integer number. It defaults to the return value of
:func:`multiprocessing.cpu_count()` (which may not make much sense if
your commands are I/O bound instead of CPU bound).
Setting :attr:`concurrency` to one is a supported use case intended to
make it easier for users of the :mod:`executor.concurrent` module to
reuse the code they've built on top of command pools (if only for
debugging, but there are lots of use cases :-).
"""
return multiprocessing.cpu_count()

Expand Down Expand Up @@ -139,9 +144,9 @@ def results(self):
A mapping of identifiers to external command objects.
This is a dictionary with external command identifiers as keys (refer
to :func:`add()`) and :class:`~executor.ExternalCommand` objects as
values. The :class:`~executor.ExternalCommand` objects provide access
to the return codes and/or output of the finished commands.
to :func:`add()`) and :class:`.ExternalCommand` objects as values. The
:class:`.ExternalCommand` objects provide access to the return codes
and/or output of the finished commands.
"""
return dict(self.commands)

Expand Down Expand Up @@ -171,7 +176,7 @@ def add(self, command, identifier=None, log_file=None):
Add an external command to the pool of commands.
:param command: The external command to add to the pool (an
:class:`~executor.ExternalCommand` object).
:class:`.ExternalCommand` object).
:param identifier: A unique identifier for the external command (any
value). When this parameter is not provided the
identifier is set to the number of commands in the
Expand All @@ -183,8 +188,7 @@ def add(self, command, identifier=None, log_file=None):
The :attr:`~executor.ExternalCommand.async` property of command objects
is automatically set to :data:`True` when they're added to a
:class:`CommandPool`. If you really want the commands to execute with a
concurrency of one (1) then you can set :attr:`concurrency` to one
(I'm not sure why you'd want to do that though :-).
concurrency of one then you can set :attr:`concurrency` to one.
"""
command.async = True
if command.logger == parent_logger:
Expand Down Expand Up @@ -218,6 +222,10 @@ def run(self):
If you're writing code where you want to own the main loop then
consider calling :func:`spawn()` and :func:`collect()` directly instead
of using :func:`run()`.
When :attr:`concurrency` is set to one, specific care is taken to make
sure that the callbacks configured by the :attr:`.start_event` and
:attr:`.finish_event` are called in the expected (intuitive) order.
"""
# Start spawning processes to execute the commands.
timer = Timer()
Expand All @@ -226,9 +234,17 @@ def run(self):
self.concurrency)
try:
with Spinner(interactive=self.spinner, timer=timer) as spinner:
num_started = 0
num_collected = 0
while not self.is_finished:
self.spawn()
self.collect()
# When concurrency is set to one (I know, initially it
# sounds like a silly use case, bear with me) I want the
# start_event and finish_event callbacks of external
# commands to fire in the right order. The following
# conditional is intended to accomplish this goal.
if self.concurrency > (num_started - num_collected):
num_started += self.spawn()
num_collected += self.collect()
spinner.step(label=format(
"Waiting for %i/%i %s",
self.num_commands - self.num_finished, self.num_commands,
Expand Down
20 changes: 19 additions & 1 deletion executor/tests.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Automated tests for the `executor' module.
#
# Author: Peter Odding <peter@peterodding.com>
# Last Change: June 1, 2016
# Last Change: June 3, 2016
# URL: https://executor.readthedocs.org

"""
Expand Down Expand Up @@ -482,6 +482,24 @@ def coerce_timestamp(self, cmd):
"""Callback for :func:`test_callback_evaluation()`."""
return datetime.datetime.fromtimestamp(float(cmd.output))

def test_event_callbacks(self):
"""Make sure the ``start_event`` and ``finish_event`` callbacks are actually invoked."""
for async in True, False:
results = []
cmd = ExternalCommand(
'sleep', '0.1',
async=async,
start_event=lambda cmd: results.append(('started', time.time())),
finish_event=lambda cmd: results.append(('finished', time.time())),
)
cmd.start()
mapping = dict(results)
assert 'started' in mapping
cmd.wait()
mapping = dict(results)
assert 'finished' in mapping
assert mapping['finished'] > mapping['started']

def test_repr(self):
"""Make sure that repr() on external commands gives sane output."""
cmd = ExternalCommand('echo 42',
Expand Down

0 comments on commit 62df2d2

Please sign in to comment.