Skip to content

Commit

Permalink
Bug fix: Make CommandPool.collect() resumable after failing commands
Browse files Browse the repository at this point in the history
  • Loading branch information
xolox committed Oct 5, 2015
1 parent f43d63e commit 9db7a06
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 11 deletions.
2 changes: 1 addition & 1 deletion executor/__init__.py
Expand Up @@ -61,7 +61,7 @@
unicode = str

# Semi-standard module versioning.
__version__ = '5.0'
__version__ = '5.0.1'

# Initialize a logger.
logger = logging.getLogger(__name__)
Expand Down
28 changes: 20 additions & 8 deletions executor/concurrent.py
Expand Up @@ -148,7 +148,8 @@ def run(self):
"""
Keep spawning commands and collecting results until all commands have run.
:return: The value of :attr:`results`.
:returns: The value of :attr:`results`.
:raises: Any exceptions raised by :func:`collect()`.
This method calls :func:`spawn()` and :func:`collect()` in a loop until
all commands registered using :func:`add()` have run and finished. If
Expand Down Expand Up @@ -199,15 +200,26 @@ def collect(self):
"""
Collect the exit codes and output of finished commands.
:returns: The number of external commands that was collected by this
:returns: The number of external commands that were collected by this
invocation of :func:`collect()` (an integer).
:raises: :exc:`.ExternalCommandFailed`, :exc:`.RemoteCommandFailed` and
:exc:`.RemoteConnectFailed` can be raised if commands in the
pool have :attr:`~.ExternalCommand.check` set to
:data:`True`.
.. warning:: If an exception is raised then commands that are still
running will not be aborted!
"""
num_finished = 0
num_collected = 0
for identifier, command in self.commands:
if identifier not in self.collected and command.is_finished:
command.wait()
num_finished += 1
# Update our bookkeeping before wait() gets a chance to
# raise an exception in case an external command failed.
self.collected.add(identifier)
if num_finished > 0:
logger.debug("Collected %i external commands ..", num_finished)
return num_finished
# Load the command output and cleanup temporary resources.
# If an exception is raised we propagate it to the caller.
command.wait()
num_collected += 1
if num_collected > 0:
logger.debug("Collected %i external commands ..", num_collected)
return num_collected
33 changes: 31 additions & 2 deletions executor/tests.py
@@ -1,7 +1,7 @@
# Automated tests for the `executor' module.
#
# Author: Peter Odding <peter@peterodding.com>
# Last Change: October 4, 2015
# Last Change: October 5, 2015
# URL: https://executor.readthedocs.org

"""Automated tests for the `executor` package."""
Expand Down Expand Up @@ -328,6 +328,26 @@ def test_command_pool(self):
assert all(cmd.returncode == 0 for cmd in results.values())
assert timer.elapsed_time < (num_commands * sleep_time)

def test_command_pool_resumable(self):
"""Make sure command pools can be resumed after raising exceptions."""
pool = CommandPool()
# Prepare two commands that will both raise an exception.
c1 = ExternalCommand('exit 1', check=True)
c2 = ExternalCommand('exit 42', check=True)
# Add the commands to the pool and start them.
pool.add(c1)
pool.add(c2)
pool.spawn()
# Wait for both commands to finish.
while not pool.is_finished:
time.sleep(0.1)
# The first call to collect() should raise an exception about `exit 1'.
e1 = intercept(pool.collect)
assert e1.command is c1
# The second call to collect() should raise an exception about `exit 42'.
e2 = intercept(pool.collect)
assert e2.command is c2

def test_command_pool_logs_directory(self):
"""Make sure command pools can log output of commands in a directory."""
directory = tempfile.mkdtemp()
Expand All @@ -351,7 +371,6 @@ def test_ssh_command_lines(self):
# Construct a remote command using as much defaults as possible and
# validate the resulting SSH client program command line.
cmd = RemoteCommand('localhost', 'true', ssh_user='some-random-user')
cmd.logger.debug("Command line: %s", cmd.command_line)
for token in (
'ssh', '-o', 'BatchMode=yes',
'-o', 'ConnectTimeout=%i' % DEFAULT_CONNECT_TIMEOUT,
Expand Down Expand Up @@ -480,6 +499,16 @@ def check_context(self, context):
assert context.capture('hostname') == socket.gethostname()


def intercept(func, *args, **kw):
"""Intercept and return a raised exception."""
try:
func(*args, **kw)
except Exception as e:
return e
else:
assert False, "Expected exception to be raised, but nothing happened! :-s"


def tokenize_command_line(cmd):
"""Tokenize a command line string into a list of strings."""
return sum(map(shlex.split, cmd.command_line), [])
Expand Down

0 comments on commit 9db7a06

Please sign in to comment.