Merge PR #103: Reduce memory usage and calculate statistics online/incrementally
smarr committed Jul 29, 2018
2 parents 4b105d2 + 20c48b6 commit 43a1d15
Showing 16 changed files with 233 additions and 333 deletions.
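
The central change: instead of keeping every data point in memory and computing statistics over the full list at reporting time, a StatisticProperties accumulator on each run is updated incrementally as samples arrive. The diff below only shows the accumulator being used (constructed with no arguments, fed via add_sample, read via num_samples and mean); its implementation lives in rebench/statistics.py and is not part of this diff, so the following is a minimal sketch of how such an online accumulator can work, assuming Welford-style updates rather than the project's actual code:

# Minimal sketch of an online statistics accumulator matching the interface
# used in this diff (add_sample, num_samples, mean). Welford's algorithm is
# an assumption here; rebench's real StatisticProperties may differ.
class OnlineStats(object):
    def __init__(self):
        self.num_samples = 0
        self.mean = 0.0
        self._m2 = 0.0  # running sum of squared deviations from the mean

    def add_sample(self, value):
        self.num_samples += 1
        delta = value - self.mean
        self.mean += delta / self.num_samples
        self._m2 += delta * (value - self.mean)

    @property
    def variance(self):
        # sample variance; undefined for fewer than two samples
        if self.num_samples < 2:
            return 0.0
        return self._m2 / (self.num_samples - 1)

Memory use per run stays constant no matter how many measurements arrive, which is what lets the diff below drop the stored _data_points list (and, judging by the .travis.yml change, the scipy dependency on CI).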
7 changes: 0 additions & 7 deletions .travis.yml
@@ -3,13 +3,6 @@ language: python
matrix:
include:
- python: "2.7"
virtualenv:
system_site_packages: true
addons:
apt:
packages:
- python-scipy

- python: "3.6"
# PyPy versions
- python: pypy
15 changes: 5 additions & 10 deletions rebench/executor.py
@@ -35,7 +35,6 @@

from . import subprocess_with_timeout as subprocess_timeout
from .interop.adapter import ExecutionDeliveredNoResults
from .statistics import StatisticProperties, mean


class FailedBuilding(Exception):
@@ -83,14 +82,10 @@ def _indicate_progress(self, completed_task, run):
if not self._ui.spinner_initialized():
return

totals = run.get_total_values()
if completed_task:
self._runs_completed += 1

if totals:
art_mean = mean(run.get_total_values())
else:
art_mean = 0
art_mean = run.get_mean_of_totals()

hour, minute, sec = self._estimate_time_left()

@@ -419,15 +414,15 @@ def execute_run(self, run_id):
terminate = self._generate_data_point(cmdline, gauge_adapter,
run_id, termination_check)

stats = StatisticProperties(run_id.get_total_values())
mean_of_totals = run_id.get_mean_of_totals()
if terminate:
run_id.report_run_completed(stats, cmdline)
run_id.report_run_completed(cmdline)
if (not run_id.is_failed() and run_id.min_iteration_time
and stats.mean < run_id.min_iteration_time):
and mean_of_totals < run_id.min_iteration_time):
self._ui.warning(
("{ind}Warning: Low mean run time.\n"
+ "{ind}{ind}The mean (%.1f) is lower than min_iteration_time (%d)\n")
% (stats.mean, run_id.min_iteration_time), run_id, cmdline)
% (mean_of_totals, run_id.min_iteration_time), run_id, cmdline)

return terminate

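Note on the executor changes: _indicate_progress runs on every spinner update, and execute_run previously built a fresh StatisticProperties over all totals just to compare one mean against min_iteration_time. Both now read the mean the run maintains itself. Condensed from the hunks above, the cost contrast is:

# Before: each call materialized the full list of totals, O(n) per tick.
art_mean = mean(run.get_total_values())

# After: constant-time read of the accumulator's running mean.
art_mean = run.get_mean_of_totals()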
4 changes: 4 additions & 0 deletions rebench/model/data_point.py
@@ -27,6 +27,10 @@ def __init__(self, run_id):
self._total = None
self._invocation = -1

@property
def run_id(self):
return self._run_id

@property
def invocation(self):
return self._invocation
60 changes: 34 additions & 26 deletions rebench/model/run_id.py
@@ -21,6 +21,7 @@

from .benchmark import Benchmark
from .termination_check import TerminationCheck
from ..statistics import StatisticProperties
from ..ui import UIError


@@ -34,7 +35,8 @@ def __init__(self, benchmark, cores, input_size, var_value):

self._reporters = set()
self._persistence = set()
self._data_points = []
self._statistics = StatisticProperties()
self._total_unit = None

self._termination_check = None
self._cmdline = None
@@ -136,9 +138,9 @@ def report_run_failed(self, cmdline, return_code, output):
for reporter in self._reporters:
reporter.run_failed(self, cmdline, return_code, output)

def report_run_completed(self, statistics, cmdline):
def report_run_completed(self, cmdline):
for reporter in self._reporters:
reporter.run_completed(self, statistics, cmdline)
reporter.run_completed(self, self._statistics, cmdline)

def report_job_completed(self, run_ids):
for reporter in self._reporters:
@@ -152,42 +154,44 @@ def report_start_run(self):
for reporter in self._reporters:
reporter.start_run(self)

def is_persisted_by(self, persistence):
return persistence in self._persistence

def add_persistence(self, persistence):
self._persistence.add(persistence)

def close_files(self):
for persistence in self._persistence:
persistence.close()

def loaded_data_point(self, data_point):
def _new_data_point(self, data_point):
self._max_invocation = max(self._max_invocation, data_point.invocation)
self._data_points.append(data_point)
if self._total_unit is None:
self._total_unit = data_point.get_total_unit()

def loaded_data_point(self, data_point):
self._new_data_point(data_point)
self._statistics.add_sample(data_point.get_total_value())

def add_data_point(self, data_point, warmup):
self._max_invocation = max(self._max_invocation, data_point.invocation)
self._new_data_point(data_point)

if not warmup:
self._data_points.append(data_point)
self._statistics.add_sample(data_point.get_total_value())
for persistence in self._persistence:
persistence.persist_data_point(data_point)

def get_number_of_data_points(self):
return len(self._data_points)
return self._statistics.num_samples

def get_data_points(self):
return self._data_points
def get_mean_of_totals(self):
return self._statistics.mean

def discard_data_points(self):
self._data_points = []
self._max_invocation = 0

def get_total_values(self):
return [dp.get_total_value() for dp in self._data_points]
def get_statistics(self):
return self._statistics

def get_total_unit(self):
if not self._data_points:
return None
return self._data_points[0].get_total_unit()
return self._total_unit

def get_termination_check(self, ui):
if self._termination_check is None:
@@ -202,7 +206,7 @@ def is_completed(self, ui):
def run_failed(self):
return (self._termination_check.fails_consecutively() or
self._termination_check.has_too_many_failures(
len(self._data_points)))
self.get_number_of_data_points()))

def __hash__(self):
return hash(self.cmdline())
@@ -236,18 +240,22 @@ def _expand_vars(self, string):
def cmdline(self):
if self._cmdline:
return self._cmdline
return self._construct_cmdline()

def _construct_cmdline(self):
cmdline = ""
if self._benchmark.suite.executor.path:
cmdline = "%s/" % (self._benchmark.suite.executor.path, )
cmdline = self._benchmark.suite.executor.path + "/"

cmdline += self._benchmark.suite.executor.executable

cmdline += "%s %s" % (self._benchmark.suite.executor.executable,
self._benchmark.suite.executor.args or '')
if self._benchmark.suite.executor.args:
cmdline += " " + str(self._benchmark.suite.executor.args)

cmdline += self._benchmark.suite.command
cmdline += " " + self._benchmark.suite.command

if self._benchmark.extra_args is not None:
cmdline += " %s" % self._benchmark.extra_args
if self._benchmark.extra_args:
cmdline += " " + str(self._benchmark.extra_args)

cmdline = self._expand_vars(cmdline)

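Taken together, RunId now carries a constant-size summary instead of the list of data points it used to accumulate. A hypothetical call sequence against the new interface (point stands in for a parsed DataPoint):

# Hypothetical usage; names match the diff, values are illustrative.
run_id.add_data_point(point, warmup=False)  # updates stats, persists the point
run_id.add_data_point(point, warmup=True)   # tracked for max invocation and
                                            # total unit only, not statistics
run_id.get_number_of_data_points()          # now self._statistics.num_samples
run_id.get_mean_of_totals()                 # running mean, no list traversal
run_id.get_total_unit()                     # cached from the first data point

The get_data_points/discard_data_points pair disappears because there is no longer a list to hand out; consumers that need summary values go through get_statistics() instead.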
70 changes: 29 additions & 41 deletions rebench/persistence.py
@@ -18,9 +18,11 @@
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
import os
import shutil
import subprocess
import sys
from datetime import datetime
from tempfile import NamedTemporaryFile
from threading import Lock

from .model.data_point import DataPoint
@@ -36,9 +38,9 @@ def __init__(self, ui):
self._bench_cfgs = {}
self._ui = ui

def load_data(self):
def load_data(self, runs, discard_run_data):
for persistence in list(self._files.values()):
persistence._load_data()
persistence.load_data(runs, discard_run_data)

def get(self, filename, discard_old_data):
if filename not in self._files:
@@ -81,41 +83,6 @@ def register_config(self, cfg):
self._bench_cfgs[key] = cfg
return cfg

@classmethod
def get_by_file(cls, runs):
by_file = {}
for run in runs:
points = run.get_data_points()
run.discard_data_points()
for point in points:
measurements = point.get_measurements()
for measure in measurements:
if measure.filename in by_file:
by_file[measure.filename].append(measure)
else:
by_file[measure.filename] = [measure]
return by_file

@classmethod
def discard_data_of_runs(cls, runs, ui):
by_file = cls.get_by_file(runs)
for filename, measures in by_file.items():
try:
with open(filename, 'r') as data_file:
lines = data_file.readlines()
except IOError:
ui.debug_error_info(
"Tried to discard old data, but file does not seem to exist: %s\n" % filename)
continue

for measure in measures:
lines[measure.line_number] = None

lines = filter(None, lines)

with open(filename, 'w') as data_file:
data_file.writelines(lines)


class _DataPointPersistence(object):

@@ -141,18 +108,30 @@ def _truncate_file(filename):
with open(filename, 'w'):
pass

def _load_data(self):
def load_data(self, runs, discard_run_data):
"""
Loads the data from the configured data file
"""
if discard_run_data:
current_runs = {run for run in runs if run.is_persisted_by(self)}
else:
current_runs = None

try:
with open(self._data_filename, 'r') as data_file:
self._process_lines(data_file)
if current_runs:
with NamedTemporaryFile('w', delete=False) as target:
with open(self._data_filename, 'r') as data_file:
self._process_lines(data_file, current_runs, target)
os.unlink(self._data_filename)
shutil.move(target.name, self._data_filename)
else:
with open(self._data_filename, 'r') as data_file:
self._process_lines(data_file, current_runs, None)
except IOError:
self._ui.debug_error_info("No data loaded, since %s does not exist.\n"
% self._data_filename)

def _process_lines(self, data_file):
def _process_lines(self, data_file, runs, filtered_data_file):
"""
The most important assumption we make here is that the total
measurement is always the last one serialized for a data point.
@@ -165,6 +144,8 @@
for line in data_file:
if line.startswith('#'): # skip comments, and shebang lines
line_number += 1
if filtered_data_file:
filtered_data_file.write(line)
continue

try:
@@ -173,6 +154,13 @@
line_number, self._data_filename)

run_id = measurement.run_id
if filtered_data_file and runs and run_id in runs:
continue

# these are all the measurements that are not filtered out
if filtered_data_file:
filtered_data_file.write(line)

if previous_run_id is not run_id:
data_point = DataPoint(run_id)
previous_run_id = run_id
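The persistence rewrite replaces the old read-all-lines/null-out/rewrite approach of the deleted discard_data_of_runs with a single streaming pass: while loading, lines belonging to runs that will be re-executed are skipped, everything else is copied to a temporary file, and the temporary file then replaces the original. A distilled sketch of that pattern, with an illustrative predicate standing in for the run-membership check done in _process_lines:

import os
import shutil
from tempfile import NamedTemporaryFile

def rewrite_without(filename, should_discard):
    # Stream the data file once, keeping only lines the predicate accepts.
    with NamedTemporaryFile('w', delete=False) as target:
        with open(filename, 'r') as source:
            for line in source:
                if not should_discard(line):
                    target.write(line)
    os.unlink(filename)
    shutil.move(target.name, filename)

shutil.move rather than os.rename is the safe choice here, since the temporary file may be created on a different filesystem than the data file.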
14 changes: 4 additions & 10 deletions rebench/rebench.py
@@ -203,22 +203,16 @@ def run(self, argv=None):
except ConfigurationError as exc:
raise UIError(exc.message + "\n", exc)

data_store.load_data()
return self.execute_experiment()
runs = self._config.get_runs()
data_store.load_data(runs, self._config.options.do_rerun)
return self.execute_experiment(runs)

def execute_experiment(self):
def execute_experiment(self, runs):
self._ui.verbose_output_info("Execute experiment: " + self._config.experiment_name + "\n")

# first load old data if available
if self._config.options.clean:
pass

scheduler_class = {'batch': BatchScheduler,
'round-robin': RoundRobinScheduler,
'random': RandomScheduler}.get(self._config.options.scheduler)
runs = self._config.get_runs()
if self._config.options.do_rerun:
DataStore.discard_data_of_runs(runs, self._ui)

executor = Executor(runs, self._config.use_nice, self._config.do_builds,
self._ui,
14 changes: 6 additions & 8 deletions rebench/reporter.py
@@ -36,8 +36,6 @@
from urllib import urlencode # pylint: disable=ungrouped-imports
from urllib2 import urlopen

from .statistics import StatisticProperties


class Reporter(object):

@@ -80,13 +78,14 @@ def _generate_all_output(run_ids):
rows = []

for run_id in run_ids:
stats = StatisticProperties(run_id.get_total_values())
mean = run_id.get_mean_of_totals()
num_samples = run_id.get_number_of_data_points()
out = run_id.as_str_list()
out.append(stats.num_samples)
if stats.num_samples == 0:
out.append(num_samples)
if num_samples == 0:
out.append("Failed")
else:
out.append(int(round(stats.mean, 0)))
out.append(int(round(mean, 0)))
rows.append(out)

return sorted(rows, key=itemgetter(2, 1, 3, 4, 5, 6, 7))
@@ -248,8 +247,7 @@ def _send_to_codespeed(self, results, run_id):
+ "{ind}{ind}" + msg + "\n", run_id)

def _prepare_result(self, run_id):
stats = StatisticProperties(run_id.get_total_values())
return self._format_for_codespeed(run_id, stats)
return self._format_for_codespeed(run_id, run_id.get_statistics())

def report_job_completed(self, run_ids):
if self._incremental_report:
