Skip to content

Commit

Permalink
aggregators: Delegate formatting of runner output to loggers
Browse files Browse the repository at this point in the history
  • Loading branch information
tohojo committed Jan 6, 2017
1 parent c12b455 commit 3cb6556
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 31 deletions.
33 changes: 7 additions & 26 deletions flent/aggregators.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from __future__ import absolute_import, division, print_function, unicode_literals

import collections
import io
import math
import pprint
import signal
Expand All @@ -37,6 +36,7 @@

logger = get_logger(__name__)


def new(settings):
cname = classname(settings.AGGREGATOR, "Aggregator")
if cname not in globals():
Expand Down Expand Up @@ -67,10 +67,6 @@ def __init__(self, settings):
self.settings = settings
self.failed_runners = 0
self.runner_counter = 0
if self.settings.LOG_FILE is None:
self.logfile = None
else:
self.logfile = io.open(self.settings.LOG_FILE, "at")

self.postprocessors = []

Expand Down Expand Up @@ -123,8 +119,6 @@ def collect(self):
"""Create a ProcessRunner thread for each instance and start them. Wait
for the threads to exit, then collect the results."""

if self.logfile:
self.logfile.write("Start run at %s\n" % datetime.now())
signal.signal(signal.SIGUSR1, handle_usr1)

result = {}
Expand Down Expand Up @@ -160,7 +154,9 @@ def collect(self):
sys.stderr.write(
"Already initiated graceful shutdown. "
"Patience, please...\n")
self._log(n, t)

logger.debug("Runner %s finished", n,
extra={'runner': t})

metadata['series'][n] = t.metadata
if 'transformers' in self.instances[n]:
Expand Down Expand Up @@ -205,12 +201,9 @@ def collect(self):
self.kill_runners()
raise

if self.logfile is not None:
self.logfile.write("Raw aggregated data:\n")
formatted = pprint.pformat(result)
if hasattr(formatted, 'decode'):
formatted = formatted.decode()
self.logfile.write(formatted)
logger.debug("Runner aggregation finished",
extra={'output': pprint.pformat(result)})

signal.signal(signal.SIGUSR1, signal.SIG_DFL)
return result, metadata, raw_values

Expand All @@ -223,18 +216,6 @@ def postprocess(self, result):
result = p(result)
return result

def _log(self, name, runner):
if self.logfile is None:
return
self.logfile.write("Runner: %s - %s\n" % (name,
runner.__class__.__name__))
self.logfile.write("Command: %s\nReturncode: %d\n" % (runner.command,
runner.returncode))
self.logfile.write("Program stdout:\n")
self.logfile.write(" " + "\n ".join(runner.out.splitlines()) + "\n")
self.logfile.write("Program stderr:\n")
self.logfile.write(" " + "\n ".join(runner.err.splitlines()) + "\n")


class IterationAggregator(Aggregator):
"""Iteration aggregator. Runs the jobs multiple times and aggregates the
Expand Down
38 changes: 33 additions & 5 deletions flent/loggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
from logging import StreamHandler, FileHandler, Formatter

err_handler = out_handler = None
START_MARKER = "-- OUTPUT START -->"
END_MARKER = "<-- OUTPUT END --"


class MaxFilter(object):
Expand All @@ -41,17 +43,42 @@ def filter(self, record):


class LogFormatter(Formatter):
def __init__(self, *args, **kwargs):
def __init__(self, fmt=None, datefmt=None, format_output=False):
self.format_exceptions = True
super(LogFormatter, self).__init__(*args, **kwargs)
self.format_output = format_output
super(LogFormatter, self).__init__(fmt, datefmt)

def disable_exceptions(self):
self.format_exceptions = False

def formatException(self, ei):
if self.format_exceptions:
return super(LogFormatter, self).formatException(ei)
return ""

def disable_exceptions(self):
self.format_exceptions = False
def format(self, record):
s = super(LogFormatter, self).format(record)

if not self.format_output:
return s

if hasattr(record, 'output'):
if s[-1:] != "\n":
s = s + "\n"
s = s + START_MARKER + record.output + END_MARKER

elif hasattr(record, 'runner'):
if s[-1:] != "\n":
s = s + "\n"

s = s + "Runner class: %s\n" % record.runner.__class__.__name__
s = s + "Command: %s\n" % record.runner.command
s = s + "Return code: %s\n" % record.runner.returncode
s = s + "Stdout: " + START_MARKER + record.runner.out + \
END_MARKER + "\n"
s = s + "Stderr: " + START_MARKER + record.runner.err + END_MARKER

return s


def get_logger(name):
Expand Down Expand Up @@ -95,7 +122,8 @@ def setup_file(filename):
handler = FileHandler(filename, encoding='utf-8')
handler.setLevel(logging.DEBUG)
fmt = LogFormatter(
fmt="%(asctime)s [%(name)s] %(levelname)s: %(message)s")
fmt="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
format_output=True)
handler.setFormatter(fmt)
logger.addHandler(handler)

Expand Down

0 comments on commit 3cb6556

Please sign in to comment.