From 3cb655669340b645a4c5adb3858a151f9eb9d252 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toke=20H=C3=B8iland-J=C3=B8rgensen?= Date: Fri, 6 Jan 2017 18:13:11 +0100 Subject: [PATCH] aggregators: Delegate formatting of runner output to loggers --- flent/aggregators.py | 33 +++++++-------------------------- flent/loggers.py | 38 +++++++++++++++++++++++++++++++++----- 2 files changed, 40 insertions(+), 31 deletions(-) diff --git a/flent/aggregators.py b/flent/aggregators.py index f1c04321..2dea1bf8 100644 --- a/flent/aggregators.py +++ b/flent/aggregators.py @@ -22,7 +22,6 @@ from __future__ import absolute_import, division, print_function, unicode_literals import collections -import io import math import pprint import signal @@ -37,6 +36,7 @@ logger = get_logger(__name__) + def new(settings): cname = classname(settings.AGGREGATOR, "Aggregator") if cname not in globals(): @@ -67,10 +67,6 @@ def __init__(self, settings): self.settings = settings self.failed_runners = 0 self.runner_counter = 0 - if self.settings.LOG_FILE is None: - self.logfile = None - else: - self.logfile = io.open(self.settings.LOG_FILE, "at") self.postprocessors = [] @@ -123,8 +119,6 @@ def collect(self): """Create a ProcessRunner thread for each instance and start them. Wait for the threads to exit, then collect the results.""" - if self.logfile: - self.logfile.write("Start run at %s\n" % datetime.now()) signal.signal(signal.SIGUSR1, handle_usr1) result = {} @@ -160,7 +154,9 @@ def collect(self): sys.stderr.write( "Already initiated graceful shutdown. " "Patience, please...\n") - self._log(n, t) + + logger.debug("Runner %s finished", n, + extra={'runner': t}) metadata['series'][n] = t.metadata if 'transformers' in self.instances[n]: @@ -205,12 +201,9 @@ def collect(self): self.kill_runners() raise - if self.logfile is not None: - self.logfile.write("Raw aggregated data:\n") - formatted = pprint.pformat(result) - if hasattr(formatted, 'decode'): - formatted = formatted.decode() - self.logfile.write(formatted) + logger.debug("Runner aggregation finished", + extra={'output': pprint.pformat(result)}) + signal.signal(signal.SIGUSR1, signal.SIG_DFL) return result, metadata, raw_values @@ -223,18 +216,6 @@ def postprocess(self, result): result = p(result) return result - def _log(self, name, runner): - if self.logfile is None: - return - self.logfile.write("Runner: %s - %s\n" % (name, - runner.__class__.__name__)) - self.logfile.write("Command: %s\nReturncode: %d\n" % (runner.command, - runner.returncode)) - self.logfile.write("Program stdout:\n") - self.logfile.write(" " + "\n ".join(runner.out.splitlines()) + "\n") - self.logfile.write("Program stderr:\n") - self.logfile.write(" " + "\n ".join(runner.err.splitlines()) + "\n") - class IterationAggregator(Aggregator): """Iteration aggregator. Runs the jobs multiple times and aggregates the diff --git a/flent/loggers.py b/flent/loggers.py index 39dc2106..dcd01b98 100644 --- a/flent/loggers.py +++ b/flent/loggers.py @@ -27,6 +27,8 @@ from logging import StreamHandler, FileHandler, Formatter err_handler = out_handler = None +START_MARKER = "-- OUTPUT START -->" +END_MARKER = "<-- OUTPUT END --" class MaxFilter(object): @@ -41,17 +43,42 @@ def filter(self, record): class LogFormatter(Formatter): - def __init__(self, *args, **kwargs): + def __init__(self, fmt=None, datefmt=None, format_output=False): self.format_exceptions = True - super(LogFormatter, self).__init__(*args, **kwargs) + self.format_output = format_output + super(LogFormatter, self).__init__(fmt, datefmt) + + def disable_exceptions(self): + self.format_exceptions = False def formatException(self, ei): if self.format_exceptions: return super(LogFormatter, self).formatException(ei) return "" - def disable_exceptions(self): - self.format_exceptions = False + def format(self, record): + s = super(LogFormatter, self).format(record) + + if not self.format_output: + return s + + if hasattr(record, 'output'): + if s[-1:] != "\n": + s = s + "\n" + s = s + START_MARKER + record.output + END_MARKER + + elif hasattr(record, 'runner'): + if s[-1:] != "\n": + s = s + "\n" + + s = s + "Runner class: %s\n" % record.runner.__class__.__name__ + s = s + "Command: %s\n" % record.runner.command + s = s + "Return code: %s\n" % record.runner.returncode + s = s + "Stdout: " + START_MARKER + record.runner.out + \ + END_MARKER + "\n" + s = s + "Stderr: " + START_MARKER + record.runner.err + END_MARKER + + return s def get_logger(name): @@ -95,7 +122,8 @@ def setup_file(filename): handler = FileHandler(filename, encoding='utf-8') handler.setLevel(logging.DEBUG) fmt = LogFormatter( - fmt="%(asctime)s [%(name)s] %(levelname)s: %(message)s") + fmt="%(asctime)s [%(name)s] %(levelname)s: %(message)s", + format_output=True) handler.setFormatter(fmt) logger.addHandler(handler)