diff --git a/src/python/pants/util/s3_log_aggregator.py b/src/python/pants/util/s3_log_aggregator.py
index 477dc521d53..782e41e3ee8 100644
--- a/src/python/pants/util/s3_log_aggregator.py
+++ b/src/python/pants/util/s3_log_aggregator.py
@@ -5,12 +5,26 @@ from __future__ import (absolute_import, division, generators, nested_scopes,
                         print_function, unicode_literals, with_statement)
 
-from s3logparse.s3logparse import parse_log_lines
-
 import os
 import sys
 from collections import defaultdict
 
+from s3logparse.s3logparse import parse_log_lines
+
+
+class Measure(object):
+  def __init__(self, init_count=0, init_bytes=0):
+    self.count = init_count
+    self.bytes = init_bytes
+
+  def __add__(self, other):
+    return self.__class__(self.count + other.count, self.bytes + other.bytes)
+
+  def __iadd__(self, other):
+    self.count += other.count
+    self.bytes += other.bytes
+    return self
+
 
 class S3LogAccumulator(object):
   """Aggregates total downloaded bytes per file from S3 logs.
 
@@ -26,29 +40,55 @@ class S3LogAccumulator(object):
   ./pants run src/python/pants/util/:s3_log_aggregator_bin -- /tmp/s3logs
   """
 
-
   def __init__(self):
-    self._file_to_size = defaultdict(int)
-    self._file_to_count = defaultdict(int)
+    self._path_to_measure = defaultdict(Measure)
+    self._ip_to_measure = defaultdict(Measure)
 
   def accumulate(self, logdir):
     for filename in os.listdir(logdir):
       with open(os.path.join(logdir, filename)) as fp:
         for log_entry in parse_log_lines(fp.readlines()):
-          self._file_to_size[log_entry.s3_key] += log_entry.bytes_sent
-          self._file_to_count[log_entry.s3_key] += 1
+          m = Measure(1, log_entry.bytes_sent)
+          self._path_to_measure[log_entry.s3_key] += m
+          self._ip_to_measure[log_entry.remote_ip] += m
 
-  def get_by_size(self):
-    return sorted([(path, self._file_to_count[path], size)
-                   for (path, size) in self._file_to_size.items()],
-                  key=lambda x: x[2], reverse=True)
+  def print_top_n(self, n=10):
+    def do_print(heading, data):
+      print()
+      print(heading)
+      print('=' * len(heading))
+      for key, measure in data[0:n]:
+        print('{} {} {}'.format(measure.count, self._prettyprint_bytes(measure.bytes), key))
+    do_print('Paths by count:', self.get_paths_sorted_by_count())
+    do_print('Paths by bytes:', self.get_paths_sorted_by_bytes())
+    do_print('IPs by count:', self.get_ips_sorted_by_count())
+    do_print('IPs by bytes:', self.get_ips_sorted_by_bytes())
+    print()
 
-  def get_by_size_prettyprinted(self):
-    return [(path, count, self.prettyprint_bytes(size))
-            for (path, count, size) in self.get_by_size()]
+  def get_paths_sorted_by_bytes(self):
+    return self._get_paths(sort_key=lambda m: m.bytes)
+
+  def get_paths_sorted_by_count(self):
+    return self._get_paths(sort_key=lambda m: m.count)
+
+  def get_ips_sorted_by_bytes(self):
+    return self._get_ips(sort_key=lambda m: m.bytes)
+
+  def get_ips_sorted_by_count(self):
+    return self._get_ips(sort_key=lambda m: m.count)
+
+  def _get_paths(self, sort_key):
+    return self._get(self._path_to_measure, sort_key)
+
+  def _get_ips(self, sort_key):
+    return self._get(self._ip_to_measure, sort_key)
 
   @staticmethod
-  def prettyprint_bytes(x):
+  def _get(measures_map, sort_key):
+    return sorted(measures_map.items(), key=lambda x: sort_key(x[1]), reverse=True)
+
+  @staticmethod
+  def _prettyprint_bytes(x):
     for unit in ['B', 'KB', 'MB', 'GB']:
       if abs(x) < 1024.0:
         return '{:3.1f}{}'.format(x, unit)
@@ -56,10 +96,9 @@ def prettyprint_bytes(x):
       x /= 1024.0
     return '{:.1f}TB'.format(x)
 
+
 if __name__ == '__main__':
  accumulator = S3LogAccumulator()
   for logdir in sys.argv[1:]:
     accumulator.accumulate(logdir)
-
-  for path, count, total_bytes in accumulator.get_by_size_prettyprinted():
-    print('{} {} {}'.format(total_bytes, count, path))
+  accumulator.print_top_n()
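
Usage sketch, not part of the diff: the new per-path and per-IP tallies work because a no-arg Measure() is a zero measure and __iadd__ updates it in place, so += is safe on the first touch of a defaultdict(Measure) key. A minimal illustration, using the Measure class from this diff with an invented key and byte counts:

from collections import defaultdict

totals = defaultdict(Measure)                    # Measure as defined above
totals['path/to/artifact'] += Measure(1, 2048)   # first hit creates a zero Measure, then adds
totals['path/to/artifact'] += Measure(1, 1024)
m = totals['path/to/artifact']
print(m.count, m.bytes)                          # -> 2 3072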