[utilities] Allow size limits for command output
Previously, any command output generated by plugins via add_cmd_output
and add_journal was collected in full in memory. For example, if a
journal was 4GB in size, then 4GB would be read into memory and
subsequently written to the final sos archive. This led not only to
potentially large archives, but in some cases to failure to collect
data or produce an archive at all due to memory constraints on the
system.

This patch adds the ability to use a sizelimit option in both
add_cmd_output and add_journal. This limits the collected output from
commands or journals to the given size, bounding both what is read
into memory and what is written to the final archive. If not given,
sizelimit defaults to --log-size. For journal collection, if no
sizelimit is given then the larger of --log-size or 100MB is used.

Resolves: #1120

Signed-off-by: Jake Hunsaker <jhunsake@redhat.com>
Signed-off-by: Bryn M. Reeves <bmr@redhat.com>
TurboTurtle authored and bmr-cymru committed May 29, 2018
1 parent d551281 commit 2fcf5f0
Showing 2 changed files with 83 additions and 17 deletions.
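As a quick illustration of the new option, a plugin can now cap how much
output a given command or journal contributes to the archive. The
following is a hypothetical sketch (the plugin name and commands are
invented for illustration; the API calls match those added by this
commit):

    # Hypothetical plugin demonstrating the sizelimit option.
    from sos.plugins import Plugin, RedHatPlugin

    class ExamplePlugin(Plugin, RedHatPlugin):
        """Example service"""
        plugin_name = 'example'

        def setup(self):
            # Cap this command's collected output at 10MB; if sizelimit
            # is omitted it defaults to --log-size.
            self.add_cmd_output('example-ctl dump --all', sizelimit=10)
            # Cap journal collection at 50MB; with no sizelimit, the
            # larger of --log-size or 100MB is used.
            self.add_journal(units='example', sizelimit=50)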
39 changes: 25 additions & 14 deletions sos/plugins/__init__.py
@@ -600,15 +600,16 @@ def getmtime(path):

     def get_command_output(self, prog, timeout=300, stderr=True,
                            chroot=True, runat=None, env=None,
-                           binary=False):
+                           binary=False, sizelimit=None):
         if chroot or self.commons['cmdlineopts'].chroot == 'always':
             root = self.sysroot
         else:
             root = None

         result = sos_get_command_output(prog, timeout=timeout, stderr=stderr,
                                         chroot=root, chdir=runat,
-                                        env=env, binary=binary)
+                                        env=env, binary=binary,
+                                        sizelimit=sizelimit)

         if result['status'] == 124:
             self._log_warn("command '%s' timed out after %ds"
@@ -646,22 +647,25 @@ def check_ext_prog(self, prog):

     def _add_cmd_output(self, cmd, suggest_filename=None,
                         root_symlink=None, timeout=300, stderr=True,
-                        chroot=True, runat=None, env=None, binary=False):
+                        chroot=True, runat=None, env=None, binary=False,
+                        sizelimit=None):
         """Internal helper to add a single command to the collection list."""
         cmdt = (
             cmd, suggest_filename,
             root_symlink, timeout, stderr,
-            chroot, runat, env, binary
+            chroot, runat, env, binary, sizelimit
         )
-        _tuplefmt = "('%s', '%s', '%s', %s, '%s', '%s', '%s', '%s', '%s')"
+        _tuplefmt = ("('%s', '%s', '%s', %s, '%s', '%s', '%s', '%s', '%s', "
+                     "'%s')")
         _logstr = "packed command tuple: " + _tuplefmt
         self._log_debug(_logstr % cmdt)
         self.collect_cmds.append(cmdt)
         self._log_info("added cmd output '%s'" % cmd)

     def add_cmd_output(self, cmds, suggest_filename=None,
                        root_symlink=None, timeout=300, stderr=True,
-                       chroot=True, runat=None, env=None, binary=False):
+                       chroot=True, runat=None, env=None, binary=False,
+                       sizelimit=None):
         """Run a program or a list of programs and collect the output"""
         if isinstance(cmds, six.string_types):
             cmds = [cmds]
@@ -670,7 +674,7 @@ def add_cmd_output(self, cmds, suggest_filename=None,
         for cmd in cmds:
             self._add_cmd_output(cmd, suggest_filename,
                                  root_symlink, timeout, stderr,
-                                 chroot, runat, env, binary)
+                                 chroot, runat, env, binary, sizelimit)

     def get_cmd_output_path(self, name=None, make=True):
         """Return a path into which this module should store collected
@@ -725,14 +729,15 @@ def add_string_as_file(self, content, filename):
     def get_cmd_output_now(self, exe, suggest_filename=None,
                            root_symlink=False, timeout=300, stderr=True,
                            chroot=True, runat=None, env=None,
-                           binary=False):
+                           binary=False, sizelimit=None):
         """Execute a command and save the output to a file for inclusion in
         the report.
         """
         start = time()
         result = self.get_command_output(exe, timeout=timeout, stderr=stderr,
                                          chroot=chroot, runat=runat,
-                                         env=env, binary=binary)
+                                         env=env, binary=binary,
+                                         sizelimit=sizelimit)
         self._log_debug("collected output of '%s' in %s"
                         % (exe.split()[0], time() - start))

@@ -781,7 +786,7 @@ def add_custom_text(self, text):

     def add_journal(self, units=None, boot=None, since=None, until=None,
                     lines=None, allfields=False, output=None, timeout=None,
-                    identifier=None, catalog=None):
+                    identifier=None, catalog=None, sizelimit=None):
         """Collect journald logs from one or more units.

         Keyword arguments:
@@ -803,6 +808,8 @@ def add_journal(self, units=None, boot=None, since=None, until=None,
         identifier -- an optional message identifier.
         catalog -- If True, augment lines with descriptions from the
                    system catalog.
+        sizelimit -- Limit the size of the output returned, in MB.
+                     Defaults to --log-size.
         """
         journal_cmd = "journalctl --no-pager "
         unit_opt = " --unit %s"
@@ -850,7 +857,9 @@ def add_journal(self, units=None, boot=None, since=None, until=None,
             journal_cmd += output_opt % output

         self._log_debug("collecting journal: %s" % journal_cmd)
-        self._add_cmd_output(journal_cmd, None, None, timeout)
+        self._add_cmd_output(journal_cmd, None, None, timeout,
+                             sizelimit=sizelimit)

     def add_udev_info(self, device, attrs=False):
         """Collect udevadm info output for a given device
@@ -887,16 +896,18 @@ def _collect_cmd_output(self):
              timeout,
              stderr,
              chroot, runat,
-             env, binary
+             env, binary,
+             sizelimit
              ) = progs[0]
             self._log_debug(("unpacked command tuple: " +
                             "('%s', '%s', '%s', %s, '%s', '%s', '%s', '%s'," +
-                            "'%s')") % progs[0])
+                            "'%s', '%s')") % progs[0])
             self._log_info("collecting output of '%s'" % prog)
             self.get_cmd_output_now(prog, suggest_filename=suggest_filename,
                                     root_symlink=root_symlink, timeout=timeout,
                                     stderr=stderr, chroot=chroot, runat=runat,
-                                    env=env, binary=binary)
+                                    env=env, binary=binary,
+                                    sizelimit=sizelimit)

     def _collect_strings(self):
         for string, file_name in self.copy_strings:
61 changes: 58 additions & 3 deletions sos/utilities.py
@@ -17,8 +17,10 @@
 import errno
 import shlex
 import glob
+import threading

 from contextlib import closing
+from collections import deque

 # PYCOMPAT
 import six
@@ -105,7 +107,7 @@ def is_executable(command):

 def sos_get_command_output(command, timeout=300, stderr=False,
                            chroot=None, chdir=None, env=None,
-                           binary=False):
+                           binary=False, sizelimit=None):
     """Execute a command and return a dictionary of status and output,
     optionally changing root or current working directory before
     executing command.
@@ -147,7 +149,11 @@ def _child_prep_fn():
                   stderr=STDOUT if stderr else PIPE,
                   bufsize=-1, env=cmd_env, close_fds=True,
                   preexec_fn=_child_prep_fn)
-        stdout, stderr = p.communicate()
+
+        reader = AsyncReader(p.stdout, sizelimit, binary)
+        stdout = reader.get_contents()
+        p.poll()
vadmium commented on Sep 6, 2018:

Some comments on this code, brought up at https://bugs.python.org/issue34566:

If the child closed its end of the stdout pipe before exiting, the exit
status is not necessarily available. A possible solution is to call wait
rather than poll.

If you passed "stderr=PIPE" and the child writes lots to stderr, it will
fill up the pipe's buffers. Since this change no longer closes nor reads
that pipe, there could be a deadlock with the child writing to stderr and
the parent reading stdout. A possible solution is to use stderr=DEVNULL.

bmr-cymru (Member) replied on Sep 6, 2018:

I think DEVNULL is a good option here: there are two ways for this to be
used. For some commands, we capture merged stdout/stderr, while for others
we dump stderr. For the stderr=False case using DEVNULL rather than PIPE
seems preferable.
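A minimal sketch of the reviewers' suggestion (hypothetical, not part of
this commit): route uncaptured stderr to DEVNULL and use wait() so the
exit status is reliably collected. Variable names follow the surrounding
code except command_args, which stands in for the real argument list;
note DEVNULL requires Python 3.3+ (Python 2 would need an opened
os.devnull file instead).

    # Hypothetical variant of the Popen call per the review comments.
    from subprocess import Popen, PIPE, STDOUT, DEVNULL

    p = Popen(command_args, shell=False, stdout=PIPE,
              # merged capture when stderr=True; otherwise discard it so a
              # chatty child cannot fill the pipe and deadlock the parent
              stderr=STDOUT if stderr else DEVNULL,
              bufsize=-1, env=cmd_env, close_fds=True,
              preexec_fn=_child_prep_fn)
    reader = AsyncReader(p.stdout, sizelimit, binary)
    stdout = reader.get_contents()
    p.wait()  # unlike poll(), blocks until the child has exited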


     except OSError as e:
         if e.errno == errno.ENOENT:
             return {'status': 127, 'output': ""}
@@ -159,7 +165,7 @@ def _child_prep_fn():

     return {
         'status': p.returncode,
-        'output': stdout if binary else stdout.decode('utf-8', 'ignore')
+        'output': stdout
     }


@@ -187,6 +193,55 @@ def shell_out(cmd, timeout=30, chroot=None, runat=None):
                        chroot=chroot, chdir=runat)['output']


+class AsyncReader(threading.Thread):
+    '''Used to limit command output to a given size without deadlocking
+    sos.
+
+    Takes a sizelimit value in MB, and will compile stdout from Popen into
+    a string that is limited to the given sizelimit.
+    '''
+
+    def __init__(self, channel, sizelimit, binary):
+        super(AsyncReader, self).__init__()
+        self.chan = channel
+        self.binary = binary
+        self.chunksize = 2048
+        slots = None
+        if sizelimit:
+            sizelimit = sizelimit * 1048576  # convert to bytes
+            # deque maxlen must be an integer, so use floor division
+            # (plain '/' would yield a float under Python 3)
+            slots = sizelimit // self.chunksize
+        self.deque = deque(maxlen=slots)
+        self.start()
+        self.join()

vadmium commented on Sep 6, 2018:

It should be simpler to call run directly as a subroutine in the main
thread, rather than starting a new thread and then joining it straight
away.

bmr-cymru (Member) replied on Sep 6, 2018:

Thanks for the comments @vadmium. As to the "double threading",
@TurboTurtle can you weigh in on that? Istr you did this to work around a
specific situation you'd seen?

TurboTurtle (Author, Member) replied on Sep 6, 2018:

Yes, though my memory of it is a bit fuzzy. IIRC it was surrounding an
issue in writing to the archive. With all the recent archive work this
may no longer be needed.
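A toy sketch of why the start()/join() pair behaves like a plain method
call (hypothetical illustration, not from the commit):

    import threading

    class Reader(threading.Thread):
        def __init__(self):
            super(Reader, self).__init__()
            # start() immediately followed by join() blocks the caller
            # until run() completes -- functionally equivalent to simply
            # calling self.run() in the caller's own thread.
            self.start()
            self.join()

        def run(self):
            print("reading...")

    Reader()  # prints "reading..." before the constructor returns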


+    def run(self):
+        '''Reads from the channel (pipe) that is the output pipe for a
+        called Popen. As we read from the pipe, the output is appended to
+        a deque; once the deque reaches its maximum size, earlier (older)
+        entries are removed as new ones arrive.
+
+        This means the returned output is chunksize-sensitive, but is not
+        really byte-sensitive.
+        '''
+        try:
+            while True:
+                line = self.chan.read(self.chunksize)
+                if not line:
+                    # pipe can remain open after output has completed
+                    break
+                self.deque.append(line)
+        except (ValueError, IOError):
+            # pipe has closed, meaning command output is done
+            pass
+
+    def get_contents(self):
+        '''Returns the contents of the deque as a string'''
+        if not self.binary:
+            return ''.join(ln.decode('utf-8', 'ignore') for ln in self.deque)
+        else:
+            return b''.join(ln for ln in self.deque)


 class ImporterHelper(object):
     """Provides a list of modules that can be imported in a package.
     Importable modules are located along the module __path__ list and modules
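For reference, a standalone sketch of the deque technique AsyncReader
relies on (illustrative, not from the commit): with maxlen set, appends
beyond the limit silently discard the oldest entries, so only the tail of
the command output is retained.

    from collections import deque

    chunks = deque(maxlen=3)  # keep at most 3 chunks
    for chunk in [b'one', b'two', b'three', b'four']:
        chunks.append(chunk)
    print(b''.join(chunks))  # b'twothreefour' -- b'one' was dropped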
