Skip to content

Commit

Permalink
Make CommandPool.run() terminate commands before aborting
Browse files Browse the repository at this point in the history
This bumps the major version number because the change isn't
backwards compatible and version numbers are cheap :-)
  • Loading branch information
xolox committed Oct 5, 2015
1 parent c3c5d83 commit f50fb66
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 23 deletions.
2 changes: 1 addition & 1 deletion executor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
unicode = str

# Semi-standard module versioning.
__version__ = '5.1'
__version__ = '6.0'

# Initialize a logger.
logger = logging.getLogger(__name__)
Expand Down
50 changes: 30 additions & 20 deletions executor/concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import os

# External dependencies.
from humanfriendly import pluralize, Spinner, Timer
from humanfriendly import format, pluralize, Spinner, Timer
from property_manager import mutable_property

# Initialize a logger.
Expand Down Expand Up @@ -153,24 +153,34 @@ def run(self):
This method calls :func:`spawn()` and :func:`collect()` in a loop until
all commands registered using :func:`add()` have run and finished. If
you're writing code where you want to own the main loop then consider
calling :func:`spawn()` and :func:`collect()` directly instead of using
:func:`run()`.
:func:`collect()` raises an exception any running commands are
terminated before the exception is propagated to the caller.
If you're writing code where you want to own the main loop then
consider calling :func:`spawn()` and :func:`collect()` directly instead
of using :func:`run()`.
"""
# Start spawning processes to execute the commands.
timer = Timer()
logger.debug("Preparing to run %s with a concurrency of %i ..",
pluralize(self.num_commands, "command"),
self.concurrency)
with Spinner(timer=timer) as spinner:
while not self.is_finished:
self.spawn()
self.collect()
label_format = "Waiting for %i/%i %s"
waiting_for = self.num_commands - self.num_finished
commands_pluralized = "command" if self.num_commands == 1 else "commands"
spinner.step(label=label_format % (waiting_for, self.num_commands, commands_pluralized))
spinner.sleep()
try:
with Spinner(timer=timer) as spinner:
while not self.is_finished:
self.spawn()
self.collect()
spinner.step(label=format(
"Waiting for %i/%i %s",
self.num_commands - self.num_finished, self.num_commands,
"command" if self.num_commands == 1 else "commands",
))
spinner.sleep()
except Exception:
# Terminate commands that are still running.
self.terminate()
# Re-raise the exception to the caller.
raise
# Collect the output and return code of any commands not yet collected.
self.collect()
logger.debug("Finished running %s in %s.",
Expand All @@ -193,7 +203,7 @@ def spawn(self):
cmd.start()
num_started += 1
if num_started > 0:
logger.debug("Spawned %i external commands ..", num_started)
logger.debug("Spawned %s ..", pluralize(num_started, "external command"))
return num_started

def collect(self):
Expand All @@ -207,10 +217,10 @@ def collect(self):
pool have :attr:`~.ExternalCommand.check` set to
:data:`True`.
.. warning:: If an exception is raised then commands that are still
running will not be aborted! If this concerns you then
consider calling :func:`terminate()` from a
:keyword:`finally` block.
.. warning:: If an exception is raised, commands that are still running
will not be terminated! If this concerns you then consider
calling :func:`terminate()` from a :keyword:`finally`
block (this is what :func:`run()` does).
"""
num_collected = 0
for identifier, command in self.commands:
Expand All @@ -223,7 +233,7 @@ def collect(self):
command.wait()
num_collected += 1
if num_collected > 0:
logger.debug("Collected %i external commands ..", num_collected)
logger.debug("Collected %s ..", pluralize(num_collected, "external command"))
return num_collected

def terminate(self):
Expand All @@ -242,5 +252,5 @@ def terminate(self):
if command.terminate():
num_terminated += 1
if num_terminated > 0:
logger.debug("Terminated %i external commands ..", num_terminated)
logger.warning("Terminated %s ..", pluralize(num_terminated, "external command"))
return num_terminated
2 changes: 0 additions & 2 deletions executor/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,6 @@ def test_command_pool_termination(self):
assert False, "Assumed CommandPool.run() to raise ExternalCommandFailed!"
except ExternalCommandFailed:
pass
finally:
pool.terminate()
# Make sure the sleep command was terminated.
assert sleep_cmd.is_terminated

Expand Down

0 comments on commit f50fb66

Please sign in to comment.