Skip to content

Commit

Permalink
Support capturing command pool output to logs directory
Browse files Browse the repository at this point in the history
  • Loading branch information
xolox committed Oct 2, 2015
1 parent 60461a6 commit b635f4e
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 12 deletions.
2 changes: 1 addition & 1 deletion executor/__init__.py
Expand Up @@ -57,7 +57,7 @@
)

# Semi-standard module versioning.
__version__ = '4.5'
__version__ = '4.6'

# Initialize a logger.
logger = logging.getLogger(__name__)
Expand Down
39 changes: 36 additions & 3 deletions executor/concurrent.py
@@ -1,7 +1,7 @@
# Programmer friendly subprocess wrapper.
#
# Author: Peter Odding <peter@peterodding.com>
# Last Change: October 1, 2015
# Last Change: October 2, 2015
# URL: https://executor.readthedocs.org

"""
Expand All @@ -16,6 +16,7 @@
# Standard library modules.
import logging
import multiprocessing
import os

# Modules included in our package.
from executor.property_manager import mutable_property
Expand All @@ -37,16 +38,18 @@ class CommandPool(object):
:func:`run()`.
"""

def __init__(self, concurrency=None):
def __init__(self, concurrency=None, logs_directory=None):
"""
Construct a :class:`CommandPool` object.
:param concurrency: Override the value of :attr:`concurrency`.
:param logs_directory: Override the value of :attr:`logs_directory`.
"""
self.collected = set()
self.commands = []
if concurrency:
self.concurrency = concurrency
self.logs_directory = logs_directory

@mutable_property
def concurrency(self):
Expand All @@ -59,6 +62,23 @@ def concurrency(self):
"""
return multiprocessing.cpu_count()

@mutable_property
def logs_directory(self):
"""
The pathname of a directory where captured output is stored (a string).
If this property is set to the pathname of a directory (before any
external commands have been started) the merged output of each external
command is captured and stored in a log file in this directory. The
directory will be created if it doesn't exist yet.
Output will start appearing in the log files before the external
commands are finished, this enables `tail -f`_ to inspect the progress
of commands that are still running and emitting output.
.. _tail -f: https://en.wikipedia.org/wiki/Tail_(Unix)#File_monitoring
"""

@property
def is_finished(self):
""":data:`True` if all commands in the pool have finished, :data:`False` otherwise."""
Expand Down Expand Up @@ -91,7 +111,7 @@ def results(self):
"""
return dict(self.commands)

def add(self, command, identifier=None):
def add(self, command, identifier=None, log_file=None):
"""
Add an external command to the pool of commands.
Expand All @@ -101,6 +121,9 @@ def add(self, command, identifier=None):
value). When this parameter is not provided the
identifier is set to the number of commands in the
pool plus one (i.e. the first command gets id 1).
:param log_file: Override the default log file name for the command
(the identifier with ``.log`` appended) in case
:attr:`logs_directory` is set.
The :attr:`~executor.ExternalCommand.async` property of command objects
is automatically set to :data:`True` when they're added to a
Expand All @@ -111,6 +134,16 @@ def add(self, command, identifier=None):
command.async = True
if identifier is None:
identifier = len(self.commands) + 1
if self.logs_directory:
if not log_file:
log_file = '%s.log' % identifier
pathname = os.path.join(self.logs_directory, log_file)
directory = os.path.dirname(pathname)
if not os.path.isdir(directory):
os.makedirs(directory)
handle = open(pathname, 'wb')
command.stdout_file = handle
command.stderr_file = handle
self.commands.append((identifier, command))

def run(self):
Expand Down
36 changes: 29 additions & 7 deletions executor/tests.py
Expand Up @@ -11,6 +11,7 @@
import os
import random
import shlex
import shutil
import socket
import tempfile
import time
Expand Down Expand Up @@ -335,6 +336,24 @@ def test_command_pool(self):
assert all(cmd.returncode == 0 for cmd in results.values())
assert timer.elapsed_time < (num_commands * sleep_time)

def test_command_pool_logs_directory(self):
"""Make sure command pools can log output of commands in a directory."""
directory = tempfile.mkdtemp()
identifiers = [1, 2, 3, 4, 5]
try:
pool = CommandPool(concurrency=5, logs_directory=directory)
for i in identifiers:
pool.add(identifier=i, command=ExternalCommand('echo %i' % i))
pool.run()
files = os.listdir(directory)
assert sorted(files) == sorted(['%s.log' % i for i in identifiers])
for filename in files:
with open(os.path.join(directory, filename)) as handle:
contents = handle.read()
assert filename == ('%s.log' % contents.strip())
finally:
shutil.rmtree(directory)

def test_ssh_command_lines(self):
"""Make sure SSH client command lines are correctly generated."""
# Construct a remote command using as much defaults as possible and
Expand Down Expand Up @@ -396,13 +415,16 @@ def test_remote_working_directory(self):
"""Make sure remote working directories can be set."""
with SSHServer(async=True) as server:
some_random_directory = tempfile.mkdtemp()
cmd = RemoteCommand('127.0.0.1',
'pwd',
capture=True,
directory=some_random_directory,
**server.client_options)
cmd.start()
assert cmd.output == some_random_directory
try:
cmd = RemoteCommand('127.0.0.1',
'pwd',
capture=True,
directory=some_random_directory,
**server.client_options)
cmd.start()
assert cmd.output == some_random_directory
finally:
shutil.rmtree(some_random_directory)

def test_remote_error_handling(self):
"""Make sure remote commands preserve exit codes."""
Expand Down
2 changes: 1 addition & 1 deletion tox.ini
Expand Up @@ -10,7 +10,7 @@ envlist = py26, py27, py34, pypy
deps =
coloredlogs
pytest
commands = py.test --capture=no --exitfirst executor/tests.py
commands = py.test {posargs}

[flake8]
exclude = .tox
Expand Down

0 comments on commit b635f4e

Please sign in to comment.