Commit

Emit more information, but just for the top N.
benjyw committed May 4, 2018
1 parent 2ecab34 commit 15a8879
Showing 1 changed file with 57 additions and 18 deletions.
75 changes: 57 additions & 18 deletions src/python/pants/util/s3_log_aggregator.py
@@ -5,12 +5,26 @@
from __future__ import (absolute_import, division, generators, nested_scopes, print_function,
                        unicode_literals, with_statement)

import os
import sys
from collections import defaultdict

from s3logparse.s3logparse import parse_log_lines


class Measure(object):
  """A single aggregate: the number of requests and the total bytes they transferred."""

  def __init__(self, init_count=0, init_bytes=0):
    self.count = init_count
    self.bytes = init_bytes

  def __add__(self, other):
    return self.__class__(self.count + other.count, self.bytes + other.bytes)

  def __iadd__(self, other):
    self.count += other.count
    self.bytes += other.bytes
    return self
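A note on the pattern above: because Measure's constructor arguments default to zero, defaultdict(Measure) can mint an empty Measure for a previously unseen key, and __iadd__ then accumulates into it in place. A minimal standalone sketch of that interaction (the key and byte counts are made up for illustration):

    from collections import defaultdict

    totals = defaultdict(Measure)
    totals['some/key'] += Measure(1, 512)   # __getitem__ mints Measure(0, 0), __iadd__ folds in
    totals['some/key'] += Measure(1, 1024)
    assert totals['some/key'].count == 2
    assert totals['some/key'].bytes == 1536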


class S3LogAccumulator(object):
  """Aggregates total downloaded bytes per file from S3 logs.
@@ -26,40 +40,65 @@ class S3LogAccumulator(object):
  ./pants run src/python/pants/util/:s3_log_aggregator_bin -- /tmp/s3logs
  """

  def __init__(self):
    self._path_to_measure = defaultdict(Measure)
    self._ip_to_measure = defaultdict(Measure)

  def accumulate(self, logdir):
    for filename in os.listdir(logdir):
      with open(os.path.join(logdir, filename)) as fp:
        for log_entry in parse_log_lines(fp.readlines()):
          # Each request counts once against its S3 key and once against the requesting IP.
          m = Measure(1, log_entry.bytes_sent)
          self._path_to_measure[log_entry.s3_key] += m
          self._ip_to_measure[log_entry.remote_ip] += m

  def print_top_n(self, n=10):
    def do_print(heading, data):
      print()
      print(heading)
      print('=' * len(heading))
      for key, measure in data[0:n]:
        print('{} {} {}'.format(measure.count, self._prettyprint_bytes(measure.bytes), key))

    do_print('Paths by count:', self.get_paths_sorted_by_count())
    do_print('Paths by bytes:', self.get_paths_sorted_by_bytes())
    do_print('IPs by count:', self.get_ips_sorted_by_count())
    do_print('IPs by bytes:', self.get_ips_sorted_by_bytes())
    print()

  def get_paths_sorted_by_bytes(self):
    return self._get_paths(sort_key=lambda m: m.bytes)

  def get_paths_sorted_by_count(self):
    return self._get_paths(sort_key=lambda m: m.count)

  def get_ips_sorted_by_bytes(self):
    return self._get_ips(sort_key=lambda m: m.bytes)

  def get_ips_sorted_by_count(self):
    return self._get_ips(sort_key=lambda m: m.count)

  def _get_paths(self, sort_key):
    return self._get(self._path_to_measure, sort_key)

  def _get_ips(self, sort_key):
    return self._get(self._ip_to_measure, sort_key)
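The public getters return (key, Measure) pairs sorted descending on the chosen dimension, leaving callers free to slice off however many entries they want. A usage sketch (the log directory path is a hypothetical example):

    acc = S3LogAccumulator()
    acc.accumulate('/tmp/s3logs')  # hypothetical directory of downloaded S3 access logs
    for path, measure in acc.get_paths_sorted_by_bytes()[:3]:
      print(path, measure.count, measure.bytes)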

  @staticmethod
  def _get(measures_map, sort_key):
    return sorted(measures_map.items(), key=lambda x: sort_key(x[1]), reverse=True)

  @staticmethod
  def _prettyprint_bytes(x):
    # Scale down by 1024 until the value fits the current unit, falling through to TB.
    for unit in ['B', 'KB', 'MB', 'GB']:
      if abs(x) < 1024.0:
        return '{:3.1f}{}'.format(x, unit)
      x /= 1024.0
    return '{:.1f}TB'.format(x)
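As a quick sanity check of the formatting helper, these input/output pairs follow directly from the loop above:

    S3LogAccumulator._prettyprint_bytes(512)     # '512.0B'
    S3LogAccumulator._prettyprint_bytes(1536)    # '1.5KB'
    S3LogAccumulator._prettyprint_bytes(10**13)  # '9.1TB'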



if __name__ == '__main__':
  accumulator = S3LogAccumulator()
  for logdir in sys.argv[1:]:
    accumulator.accumulate(logdir)
  accumulator.print_top_n()
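Given one or more log directories on the command line, the script now prints four ranked tables (paths and IPs, each by count and by bytes) instead of the old single size-sorted listing. The output takes roughly this shape (keys and numbers are illustrative, not real data):

    Paths by bytes:
    ===============
    1042 1.2GB some/bucket/key/artifact.tar.gz
    311 640.3MB some/bucket/key/other.tar.gz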
