Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions reframe/frontend/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,14 @@ def main():
help='The delimiter to use when using `--table-format=csv`',
envvar='RFM_TABLE_FORMAT_DELIM', configvar='general/table_format_delim'
)
misc_options.add_argument(
'--term-lhs', action='store',
help='LHS term in performance comparisons'
)
misc_options.add_argument(
'--term-rhs', action='store',
help='RHS term in performance comparisons'
)
misc_options.add_argument(
'-v', '--verbose', action='count',
help='Increase verbosity level of output',
Expand Down Expand Up @@ -1168,8 +1176,10 @@ def restrict_logging():
sys.exit(1)

printer.table(
reporting.performance_compare(options.performance_compare,
None, namepatt, *filt)
reporting.performance_compare(
options.performance_compare, None, namepatt, *filt,
options.term_lhs, options.term_rhs
)
)
sys.exit(0)

Expand Down Expand Up @@ -1769,7 +1779,9 @@ def module_unuse(*paths):
try:
if rt.get_option('storage/0/enable'):
data = reporting.performance_compare(
rt.get_option('general/0/perf_report_spec'), report
rt.get_option('general/0/perf_report_spec'), report,
term_lhs=options.term_lhs,
term_rhs=options.term_rhs
)
else:
data = report.report_data()
Expand Down
3 changes: 2 additions & 1 deletion reframe/frontend/printer.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,8 @@ def table(self, data, **kwargs):

table_format = rt.runtime().get_option('general/0/table_format')
if table_format == 'csv':
return self._table_as_csv(data)
self._table_as_csv(data)
return

# Map our options to tabulate
if table_format == 'plain':
Expand Down
263 changes: 93 additions & 170 deletions reframe/frontend/reporting/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,22 @@
import lxml.etree as etree
import math
import os
import polars as pl
import re
import socket
import time
import uuid
from collections import UserDict
from collections.abc import Hashable

import reframe as rfm
import reframe.utility.jsonext as jsonext
import reframe.utility.osext as osext
from reframe.core.exceptions import ReframeError, what, is_severe, reraise_as
from reframe.core.logging import getlogger, _format_time_rfc3339, time_function
from reframe.core.runtime import runtime
from reframe.core.warnings import suppress_deprecations
from reframe.utility import nodelist_abbrev, OrderedSet
from .storage import StorageBackend
from .utility import Aggregator, parse_cmp_spec, parse_query_spec
from .utility import parse_cmp_spec, parse_query_spec

# The schema data version
# Major version bumps are expected to break the validation of previous schemas
Expand Down Expand Up @@ -564,54 +563,53 @@ class _TCProxy(UserDict):
_required_keys = ['name', 'system', 'partition', 'environ']

def __init__(self, testcase, include_only=None):
# Define the derived attributes
def _basename():
return testcase['name'].split()[0]

def _sysenv():
return _format_sysenv(testcase['system'],
testcase['partition'],
testcase['environ'])

def _job_nodelist():
nodelist = testcase['job_nodelist']
if isinstance(nodelist, str):
return nodelist
else:
return nodelist_abbrev(testcase['job_nodelist'])

if isinstance(testcase, _TCProxy):
testcase = testcase.data

if include_only is not None:
self.data = {}
for k in include_only + self._required_keys:
if k in testcase:
self.data.setdefault(k, testcase[k])
else:
self.data = testcase
for key in include_only + self._required_keys:
# Computed attributes
if key == 'basename':
val = _basename()
elif key == 'sysenv':
val = _sysenv()
elif key == 'job_nodelist':
val = _job_nodelist()
else:
val = testcase.get(key)

def __getitem__(self, key):
val = super().__getitem__(key)
if key == 'job_nodelist':
val = nodelist_abbrev(val)

return val

def __missing__(self, key):
if key == 'basename':
return self.data['name'].split()[0]
elif key == 'sysenv':
return _format_sysenv(self.data['system'],
self.data['partition'],
self.data['environ'])
elif key == 'pdiff':
return None
self.data.setdefault(key, val)
else:
raise KeyError(key)


def _group_key(groups, testcase: _TCProxy):
key = []
for grp in groups:
with reraise_as(ReframeError, (KeyError,), 'no such group'):
val = testcase[grp]
if not isinstance(val, Hashable):
val = str(val)

key.append(val)

return tuple(key)
# Include the derived attributes too
testcase.update({
'basename': _basename(),
'sysenv': _sysenv(),
'job_nodelist': _job_nodelist()
})
self.data = testcase


@time_function
def _group_testcases(testcases, groups, columns):
grouped = {}
record_cols = groups + [c for c in columns if c not in groups]
def _create_dataframe(testcases, groups, columns):
record_cols = list(OrderedSet(groups) | OrderedSet(columns))
data = []
for tc in map(_TCProxy, testcases):
for pvar, reftuple in tc['perfvalues'].items():
pvar = pvar.split(':')[-1]
Expand All @@ -636,140 +634,68 @@ def _group_testcases(testcases, groups, columns):
'punit': punit,
'presult': presult
})
key = _group_key(groups, record)
grouped.setdefault(key, [])
grouped[key].append(record)

return grouped
data.append(record)


@time_function
def _aggregate_perf(grouped_testcases, aggr_fn, cols):
# Update delimiter for joining unique values based on the table format
table_format = runtime().get_option('general/0/table_format')
if table_format == 'pretty':
delim = '\n'
if data:
return pl.DataFrame(data)
else:
delim = '|'

other_aggr = Aggregator.create('join_uniq', delim)
count_aggr = Aggregator.create('count')
aggr_data = {}
for key, seq in grouped_testcases.items():
aggr_data.setdefault(key, {})
with reraise_as(ReframeError, (KeyError,), 'no such column'):
for c in cols:
if c == 'pval':
fn = aggr_fn
elif c == 'psamples':
fn = count_aggr
else:
fn = other_aggr
return pl.DataFrame(schema=record_cols)

if fn is count_aggr:
aggr_data[key][c] = fn(seq)
else:
aggr_data[key][c] = fn(tc[c] for tc in seq)

return aggr_data
@time_function
def _aggregate_data(testcases, query):
df = _create_dataframe(testcases, query.group_by, query.attributes)
df = df.group_by(query.group_by).agg(
query.aggregation.col_spec(query.aggregated_attributes)
).sort(query.group_by)
return df


@time_function
def compare_testcase_data(base_testcases, target_testcases, base_fn, target_fn,
groups=None, columns=None):
groups = groups or []

# Clean up columns and store those for which we want explicitly the A or B
# variants
cols = []
variants_A = set()
variants_B = set()
for c in columns:
if c.endswith('_A'):
variants_A.add(c[:-2])
cols.append(c[:-2])
elif c.endswith('_B'):
variants_B.add(c[:-2])
cols.append(c[:-2])
else:
variants_A.add(c)
variants_B.add(c)
cols.append(c)

grouped_base = _group_testcases(base_testcases, groups, cols)
grouped_target = _group_testcases(target_testcases, groups, cols)
pbase = _aggregate_perf(grouped_base, base_fn, cols)
ptarget = _aggregate_perf(grouped_target, target_fn, cols)

# For visual purposes if `name` is in `groups`, consider also its
# derivative `basename` to be in, so as to avoid duplicate columns
if 'name' in groups:
groups.append('basename')

# Build the final table data
extra_cols = set(cols) - set(groups) - {'pdiff'}

# Header line
header = []
for c in cols:
if c in extra_cols:
if c in variants_A:
header.append(f'{c}_A')

if c in variants_B:
header.append(f'{c}_B')
else:
header.append(c)

data = [header]
for key, aggr_data in pbase.items():
pdiff = None
line = []
for c in cols:
base = aggr_data.get(c)
try:
target = ptarget[key][c]
except KeyError:
target = None

if c == 'pval':
line.append('n/a' if base is None else base)
line.append('n/a' if target is None else target)

# compute diff for later usage
if base is not None and target is not None:
if base == 0 and target == 0:
pdiff = math.nan
elif target == 0:
pdiff = math.inf
else:
pdiff = (base - target) / target
pdiff = '{:+7.2%}'.format(pdiff)
elif c == 'pdiff':
line.append('n/a' if pdiff is None else pdiff)
elif c in extra_cols:
if c in variants_A:
line.append('n/a' if base is None else base)

if c in variants_B:
line.append('n/a' if target is None else target)
else:
line.append('n/a' if base is None else base)
def compare_testcase_data(base_testcases, target_testcases, query):
df_base = _aggregate_data(base_testcases, query).with_columns(
pl.col(query.aggregated_columns).name.suffix(query.lhs_column_suffix)
)
df_target = _aggregate_data(target_testcases, query).with_columns(
pl.col(query.aggregated_columns).name.suffix(query.rhs_column_suffix)
)
pval = query.aggregation.column_names('pval')[0]
pval_lhs = f'{pval}{query.lhs_column_suffix}'
pval_rhs = f'{pval}{query.rhs_column_suffix}'
cols = OrderedSet(query.group_by) | OrderedSet(query.aggregated_variants)
if not df_base.is_empty() and not df_target.is_empty():
cols |= {query.diff_column}
df = df_base.join(df_target, on=query.group_by).with_columns(
(100*(pl.col(pval_lhs) - pl.col(pval_rhs)) / pl.col(pval_rhs))
.round(2).alias(query.diff_column)
).select(cols)
elif df_base.is_empty():
df = pl.DataFrame(schema=list(cols))
else:
# df_target is empty; add an empty col for all `rhs` variants
df = df_base.select(
pl.col(col)
if col in df_base.columns else pl.lit('<no data>').alias(col)
for col in cols
)

data.append(line)
data = [df.columns]
for row in df.iter_rows():
data.append(row)

return data


@time_function
def performance_compare(cmp, report=None, namepatt=None,
filterA=None, filterB=None):
filterA=None, filterB=None,
term_lhs=None, term_rhs=None):
with reraise_as(ReframeError, (ValueError,),
'could not parse comparison spec'):
match = parse_cmp_spec(cmp)
query = parse_cmp_spec(cmp, term_lhs, term_rhs)

backend = StorageBackend.default()
if match.base is None:
if query.lhs is None:
if report is None:
raise ValueError('report cannot be `None` '
'for current run comparisons')
Expand All @@ -785,11 +711,10 @@ def performance_compare(cmp, report=None, namepatt=None,
except IndexError:
tcs_base = []
else:
tcs_base = backend.fetch_testcases(match.base, namepatt, filterA)
tcs_base = backend.fetch_testcases(query.lhs, namepatt, filterA)

tcs_target = backend.fetch_testcases(match.target, namepatt, filterB)
return compare_testcase_data(tcs_base, tcs_target, match.aggregator,
match.aggregator, match.groups, match.columns)
tcs_target = backend.fetch_testcases(query.rhs, namepatt, filterB)
return compare_testcase_data(tcs_base, tcs_target, query)


@time_function
Expand Down Expand Up @@ -837,22 +762,20 @@ def session_data(query):
def testcase_data(spec, namepatt=None, test_filter=None):
with reraise_as(ReframeError, (ValueError,),
'could not parse comparison spec'):
match = parse_cmp_spec(spec, default_extra_cols=['pval'])
query = parse_cmp_spec(spec)

if match.base is not None:
if query.lhs is not None:
raise ReframeError('only one time period or session are allowed: '
'if you want to compare performance, '
'use the `--performance-compare` option')

storage = StorageBackend.default()
testcases = storage.fetch_testcases(match.target, namepatt, test_filter)
aggregated = _aggregate_perf(
_group_testcases(testcases, match.groups, match.columns),
match.aggregator, match.columns
df = _aggregate_data(
storage.fetch_testcases(query.rhs, namepatt, test_filter), query
)
data = [match.columns]
for aggr_data in aggregated.values():
data.append([aggr_data[c] for c in match.columns])
data = [df.columns]
for row in df.iter_rows():
data.append(row)

return data

Expand Down
Loading
Loading