Skip to content

Commit

Permalink
Add execReadlines to executils.
Browse files Browse the repository at this point in the history
Returns output in realtime instead of buffering it.
  • Loading branch information
bcl committed Jul 30, 2015
1 parent 11b8eb8 commit cfe4777
Showing 1 changed file with 105 additions and 5 deletions.
110 changes: 105 additions & 5 deletions src/pylorax/executils.py
Expand Up @@ -20,8 +20,8 @@

import os
import subprocess
from subprocess import TimeoutExpired
import signal
from time import sleep

import logging
log = logging.getLogger("pylorax")
Expand Down Expand Up @@ -76,6 +76,7 @@ def startProgram(argv, root='/', stdin=None, stdout=subprocess.PIPE, stderr=subp
:param reset_lang: whether to set the locale of the child process to C
:param kwargs: Additional parameters to pass to subprocess.Popen
:return: A Popen object for the running command.
:keyword preexec_fn: A function to run before execution starts.
"""
if env_prune is None:
env_prune = []
Expand Down Expand Up @@ -136,6 +137,9 @@ def _run_program(argv, root='/', stdin=None, stdout=None, env_prune=None, log_ou
:param filter_stderr: whether to exclude the contents of stderr from the returned output
:param raise_err: whether to raise a CalledProcessError if the returncode is non-zero
:param callback: method to call while waiting for process to finish, passed Popen object
:param env_add: environment variables to add before execution
:param reset_handlers: whether to reset to SIG_DFL any signal handlers set to SIG_IGN
:param reset_lang: whether to set the locale of the child process to C
:return: The return code of the command and the output
:raises: OSError or CalledProcessError
"""
Expand All @@ -151,9 +155,13 @@ def _run_program(argv, root='/', stdin=None, stdout=None, env_prune=None, log_ou

if callback:
while callback(proc) and proc.poll() is None:
sleep(1)

(output_string, err_string) = proc.communicate()
try:
(output_string, err_string) = proc.communicate(timeout=1)
break
except TimeoutExpired:
pass
else:
(output_string, err_string) = proc.communicate()
if output_string:
if binary_output:
output_lines = [output_string]
Expand Down Expand Up @@ -206,6 +214,9 @@ def execWithRedirect(command, argv, stdin=None, stdout=None, root='/', env_prune
:param binary_output: whether to treat the output of command as binary data
:param raise_err: whether to raise a CalledProcessError if the returncode is non-zero
:param callback: method to call while waiting for process to finish, passed Popen object
:param env_add: environment variables to add before execution
:param reset_handlers: whether to reset to SIG_DFL any signal handlers set to SIG_IGN
:param reset_lang: whether to set the locale of the child process to C
:return: The return code of the command
"""
argv = [command] + list(argv)
Expand All @@ -223,14 +234,103 @@ def execWithCapture(command, argv, stdin=None, root='/', log_output=True, filter
:param root: The directory to chroot to before running command.
:param log_output: Whether to log the output of command
:param filter_stderr: Whether stderr should be excluded from the returned output
:param raise_err: whether to raise a CalledProcessError if the returncode is non-zero
:param callback: method to call while waiting for process to finish, passed Popen object
:param env_add: environment variables to add before execution
:param reset_handlers: whether to reset to SIG_DFL any signal handlers set to SIG_IGN
:param reset_lang: whether to set the locale of the child process to C
:return: The output of the command
"""
argv = [command] + list(argv)
return _run_program(argv, stdin=stdin, root=root, log_output=log_output, filter_stderr=filter_stderr,
raise_err=raise_err, callback=callback, env_add=env_add,
reset_handlers=reset_handlers, reset_lang=reset_lang)[1]

def execReadlines(command, argv, stdin=None, root='/', env_prune=None, filter_stderr=False,
callback=lambda x: True, env_add=None, reset_handlers=True, reset_lang=True):
""" Execute an external command and return the line output of the command
in real-time.
This method assumes that there is a reasonably low delay between the
end of output and the process exiting. If the child process closes
stdout and then keeps on truckin' there will be problems.
NOTE/WARNING: UnicodeDecodeError will be raised if the output of the
external command can't be decoded as UTF-8.
:param command: The command to run
:param argv: The argument list
:param stdin: The file object to read stdin from.
:param stdout: Optional file object to redirect stdout and stderr to.
:param root: The directory to chroot to before running command.
:param env_prune: environment variable to remove before execution
:param filter_stderr: Whether stderr should be excluded from the returned output
:param callback: method to call while waiting for process to finish, passed Popen object
:param env_add: environment variables to add before execution
:param reset_handlers: whether to reset to SIG_DFL any signal handlers set to SIG_IGN
:param reset_lang: whether to set the locale of the child process to C
:return: Iterator of the lines from the command
Output from the file is not logged to program.log
This returns an iterator with the lines from the command until it has finished
"""

class ExecLineReader(object):
"""Iterator class for returning lines from a process and cleaning
up the process when the output is no longer needed.
"""

def __init__(self, proc, argv, callback):
self._proc = proc
self._argv = argv
self._callback = callback

def __iter__(self):
return self

def __del__(self):
# See if the process is still running
if self._proc.poll() is None:
# Stop the process and ignore any problems that might arise
try:
self._proc.terminate()
except OSError:
pass

def __next__(self):
# Read the next line, blocking if a line is not yet available
line = self._proc.stdout.readline().decode("utf-8")
if line == '' or not self._callback(self._proc):
# Output finished, wait for the process to end
self._proc.communicate()

# Check for successful exit
if self._proc.returncode < 0:
raise OSError("process '%s' was killed by signal %s" %
(self._argv, -self._proc.returncode))
elif self._proc.returncode > 0:
raise OSError("process '%s' exited with status %s" %
(self._argv, self._proc.returncode))
raise StopIteration

return line.strip()

argv = [command] + argv

if filter_stderr:
stderr = subprocess.DEVNULL
else:
stderr = subprocess.STDOUT

try:
proc = startProgram(argv, root=root, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr, bufsize=1,
env_prune=env_prune, env_add=env_add, reset_handlers=reset_handlers, reset_lang=reset_lang)
except OSError as e:
with program_log_lock:
program_log.error("Error running %s: %s", argv[0], e.strerror)
raise

return ExecLineReader(proc, argv, callback)

def runcmd(cmd, **kwargs):
""" run execWithRedirect with raise_err=True
"""
Expand Down

0 comments on commit cfe4777

Please sign in to comment.