From 07ed672f665622277db6bb65d37645494ea78da8 Mon Sep 17 00:00:00 2001 From: Vasileios Karakasis Date: Mon, 10 Nov 2025 00:39:16 +0100 Subject: [PATCH 1/4] Use polars for performance analytics and add more statistics --- reframe/frontend/printer.py | 3 +- reframe/frontend/reporting/__init__.py | 258 +++++++----------- reframe/frontend/reporting/storage.py | 14 +- reframe/frontend/reporting/utility.py | 348 +++++++++++++++++-------- requirements.txt | 4 + unittests/test_cli.py | 6 +- unittests/test_reporting.py | 106 +++++--- 7 files changed, 421 insertions(+), 318 deletions(-) diff --git a/reframe/frontend/printer.py b/reframe/frontend/printer.py index a26fd7d4b5..87b46990ee 100644 --- a/reframe/frontend/printer.py +++ b/reframe/frontend/printer.py @@ -286,7 +286,8 @@ def table(self, data, **kwargs): table_format = rt.runtime().get_option('general/0/table_format') if table_format == 'csv': - return self._table_as_csv(data) + self._table_as_csv(data) + return # Map our options to tabulate if table_format == 'plain': diff --git a/reframe/frontend/reporting/__init__.py b/reframe/frontend/reporting/__init__.py index 940595f01f..b60c0a48d7 100644 --- a/reframe/frontend/reporting/__init__.py +++ b/reframe/frontend/reporting/__init__.py @@ -11,6 +11,7 @@ import lxml.etree as etree import math import os +import polars as pl import re import socket import time @@ -27,7 +28,7 @@ from reframe.core.warnings import suppress_deprecations from reframe.utility import nodelist_abbrev, OrderedSet from .storage import StorageBackend -from .utility import Aggregator, parse_cmp_spec, parse_query_spec +from .utility import parse_cmp_spec, parse_query_spec # The schema data version # Major version bumps are expected to break the validation of previous schemas @@ -564,54 +565,53 @@ class _TCProxy(UserDict): _required_keys = ['name', 'system', 'partition', 'environ'] def __init__(self, testcase, include_only=None): + # Define the derived attributes + def _basename(): + return testcase['name'].split()[0] + + def _sysenv(): + return _format_sysenv(testcase['system'], + testcase['partition'], + testcase['environ']) + + def _job_nodelist(): + nodelist = testcase['job_nodelist'] + if isinstance(nodelist, str): + return nodelist + else: + return nodelist_abbrev(testcase['job_nodelist']) + if isinstance(testcase, _TCProxy): testcase = testcase.data if include_only is not None: self.data = {} - for k in include_only + self._required_keys: - if k in testcase: - self.data.setdefault(k, testcase[k]) - else: - self.data = testcase + for key in include_only + self._required_keys: + # Computed attributes + if key == 'basename': + val = _basename() + elif key == 'sysenv': + val = _sysenv() + elif key == 'job_nodelist': + val = _job_nodelist() + else: + val = testcase.get(key) - def __getitem__(self, key): - val = super().__getitem__(key) - if key == 'job_nodelist': - val = nodelist_abbrev(val) - - return val - - def __missing__(self, key): - if key == 'basename': - return self.data['name'].split()[0] - elif key == 'sysenv': - return _format_sysenv(self.data['system'], - self.data['partition'], - self.data['environ']) - elif key == 'pdiff': - return None + self.data.setdefault(key, val) else: - raise KeyError(key) - - -def _group_key(groups, testcase: _TCProxy): - key = [] - for grp in groups: - with reraise_as(ReframeError, (KeyError,), 'no such group'): - val = testcase[grp] - if not isinstance(val, Hashable): - val = str(val) - - key.append(val) - - return tuple(key) + # Include the derived attributes too + testcase.update({ + 'basename': _basename(), + 'sysenv': _sysenv(), + 'job_nodelist': _job_nodelist() + }) + self.data = testcase @time_function -def _group_testcases(testcases, groups, columns): - grouped = {} - record_cols = groups + [c for c in columns if c not in groups] +def _create_dataframe(testcases, groups, columns): + record_cols = list(OrderedSet(groups) | OrderedSet(columns)) + data = [] for tc in map(_TCProxy, testcases): for pvar, reftuple in tc['perfvalues'].items(): pvar = pvar.split(':')[-1] @@ -636,127 +636,54 @@ def _group_testcases(testcases, groups, columns): 'punit': punit, 'presult': presult }) - key = _group_key(groups, record) - grouped.setdefault(key, []) - grouped[key].append(record) - - return grouped + data.append(record) - -@time_function -def _aggregate_perf(grouped_testcases, aggr_fn, cols): - # Update delimiter for joining unique values based on the table format - table_format = runtime().get_option('general/0/table_format') - if table_format == 'pretty': - delim = '\n' + if data: + return pl.DataFrame(data) else: - delim = '|' - - other_aggr = Aggregator.create('join_uniq', delim) - count_aggr = Aggregator.create('count') - aggr_data = {} - for key, seq in grouped_testcases.items(): - aggr_data.setdefault(key, {}) - with reraise_as(ReframeError, (KeyError,), 'no such column'): - for c in cols: - if c == 'pval': - fn = aggr_fn - elif c == 'psamples': - fn = count_aggr - else: - fn = other_aggr + return pl.DataFrame(schema=record_cols) - if fn is count_aggr: - aggr_data[key][c] = fn(seq) - else: - aggr_data[key][c] = fn(tc[c] for tc in seq) - return aggr_data +@time_function +def _aggregate_data(testcases, query): + df = _create_dataframe(testcases, query.group_by, query.attributes) + df = df.group_by(query.group_by).agg( + query.aggregation.col_spec(query.aggregated_attributes) + ).sort(query.group_by) + return df @time_function -def compare_testcase_data(base_testcases, target_testcases, base_fn, target_fn, - groups=None, columns=None): - groups = groups or [] - - # Clean up columns and store those for which we want explicitly the A or B - # variants - cols = [] - variants_A = set() - variants_B = set() - for c in columns: - if c.endswith('_A'): - variants_A.add(c[:-2]) - cols.append(c[:-2]) - elif c.endswith('_B'): - variants_B.add(c[:-2]) - cols.append(c[:-2]) - else: - variants_A.add(c) - variants_B.add(c) - cols.append(c) - - grouped_base = _group_testcases(base_testcases, groups, cols) - grouped_target = _group_testcases(target_testcases, groups, cols) - pbase = _aggregate_perf(grouped_base, base_fn, cols) - ptarget = _aggregate_perf(grouped_target, target_fn, cols) - - # For visual purposes if `name` is in `groups`, consider also its - # derivative `basename` to be in, so as to avoid duplicate columns - if 'name' in groups: - groups.append('basename') - - # Build the final table data - extra_cols = set(cols) - set(groups) - {'pdiff'} - - # Header line - header = [] - for c in cols: - if c in extra_cols: - if c in variants_A: - header.append(f'{c}_A') - - if c in variants_B: - header.append(f'{c}_B') - else: - header.append(c) - - data = [header] - for key, aggr_data in pbase.items(): - pdiff = None - line = [] - for c in cols: - base = aggr_data.get(c) - try: - target = ptarget[key][c] - except KeyError: - target = None - - if c == 'pval': - line.append('n/a' if base is None else base) - line.append('n/a' if target is None else target) - - # compute diff for later usage - if base is not None and target is not None: - if base == 0 and target == 0: - pdiff = math.nan - elif target == 0: - pdiff = math.inf - else: - pdiff = (base - target) / target - pdiff = '{:+7.2%}'.format(pdiff) - elif c == 'pdiff': - line.append('n/a' if pdiff is None else pdiff) - elif c in extra_cols: - if c in variants_A: - line.append('n/a' if base is None else base) - - if c in variants_B: - line.append('n/a' if target is None else target) - else: - line.append('n/a' if base is None else base) +def compare_testcase_data(base_testcases, target_testcases, query): + df_base = _aggregate_data(base_testcases, query).with_columns( + pl.col(query.aggregated_columns).name.suffix(query.lhs_column_suffix) + ) + df_target = _aggregate_data(target_testcases, query).with_columns( + pl.col(query.aggregated_columns).name.suffix(query.rhs_column_suffix) + ) + pval = query.aggregation.column_names('pval')[0] + pval_lhs = f'{pval}{query.lhs_column_suffix}' + pval_rhs = f'{pval}{query.rhs_column_suffix}' + cols = OrderedSet(query.group_by) | OrderedSet(query.aggregated_variants) + if not df_base.is_empty() and not df_target.is_empty(): + cols |= {query.diff_column} + df = df_base.join(df_target, on=query.group_by).with_columns( + (100*(pl.col(pval_lhs) - pl.col(pval_rhs)) / pl.col(pval_rhs)) + .round(2).alias(query.diff_column) + ).select(cols) + elif df_base.is_empty(): + df = pl.DataFrame(schema=list(cols)) + else: + # df_target is empty; add an empty col for all `rhs` variants + df = df_base.select( + pl.col(col) + if col in df_base.columns else pl.lit('').alias(col) + for col in cols + ) - data.append(line) + data = [df.columns] + for row in df.iter_rows(): + data.append(row) return data @@ -766,10 +693,10 @@ def performance_compare(cmp, report=None, namepatt=None, filterA=None, filterB=None): with reraise_as(ReframeError, (ValueError,), 'could not parse comparison spec'): - match = parse_cmp_spec(cmp) + query = parse_cmp_spec(cmp) backend = StorageBackend.default() - if match.base is None: + if query.lhs is None: if report is None: raise ValueError('report cannot be `None` ' 'for current run comparisons') @@ -785,11 +712,10 @@ def performance_compare(cmp, report=None, namepatt=None, except IndexError: tcs_base = [] else: - tcs_base = backend.fetch_testcases(match.base, namepatt, filterA) + tcs_base = backend.fetch_testcases(query.lhs, namepatt, filterA) - tcs_target = backend.fetch_testcases(match.target, namepatt, filterB) - return compare_testcase_data(tcs_base, tcs_target, match.aggregator, - match.aggregator, match.groups, match.columns) + tcs_target = backend.fetch_testcases(query.rhs, namepatt, filterB) + return compare_testcase_data(tcs_base, tcs_target, query) @time_function @@ -837,22 +763,20 @@ def session_data(query): def testcase_data(spec, namepatt=None, test_filter=None): with reraise_as(ReframeError, (ValueError,), 'could not parse comparison spec'): - match = parse_cmp_spec(spec, default_extra_cols=['pval']) + query = parse_cmp_spec(spec) - if match.base is not None: + if query.lhs is not None: raise ReframeError('only one time period or session are allowed: ' 'if you want to compare performance, ' 'use the `--performance-compare` option') storage = StorageBackend.default() - testcases = storage.fetch_testcases(match.target, namepatt, test_filter) - aggregated = _aggregate_perf( - _group_testcases(testcases, match.groups, match.columns), - match.aggregator, match.columns + df = _aggregate_data( + storage.fetch_testcases(query.rhs, namepatt, test_filter), query ) - data = [match.columns] - for aggr_data in aggregated.values(): - data.append([aggr_data[c] for c in match.columns]) + data = [df.columns] + for row in df.iter_rows(): + data.append(row) return data diff --git a/reframe/frontend/reporting/storage.py b/reframe/frontend/reporting/storage.py index 7175744691..e40e5a79ce 100644 --- a/reframe/frontend/reporting/storage.py +++ b/reframe/frontend/reporting/storage.py @@ -17,7 +17,7 @@ from reframe.core.logging import getlogger, time_function, getprofiler from reframe.core.runtime import runtime from reframe.utility import nodelist_abbrev -from ..reporting.utility import QuerySelector +from ..reporting.utility import QuerySelectorTestcase class StorageBackend: @@ -41,7 +41,7 @@ def store(self, report, report_file): '''Store the given report''' @abc.abstractmethod - def fetch_testcases(self, selector: QuerySelector, name_patt=None, + def fetch_testcases(self, selector: QuerySelectorTestcase, name_patt=None, test_filter=None): '''Fetch test cases based on the specified query selector. @@ -54,7 +54,7 @@ def fetch_testcases(self, selector: QuerySelector, name_patt=None, ''' @abc.abstractmethod - def fetch_sessions(self, selector: QuerySelector, decode=True): + def fetch_sessions(self, selector: QuerySelectorTestcase, decode=True): '''Fetch sessions based on the specified query selector. :arg selector: an instance of :class:`QuerySelector` that will specify @@ -65,7 +65,7 @@ def fetch_sessions(self, selector: QuerySelector, decode=True): ''' @abc.abstractmethod - def remove_sessions(self, selector: QuerySelector): + def remove_sessions(self, selector: QuerySelectorTestcase): '''Remove sessions based on the specified query selector :arg selector: an instance of :class:`QuerySelector` that will specify @@ -382,7 +382,7 @@ def _fetch_testcases_time_period(self, ts_start, ts_end, name_patt=None, return [*filter(filt_fn, testcases)] @time_function - def fetch_testcases(self, selector: QuerySelector, + def fetch_testcases(self, selector: QuerySelectorTestcase, name_patt=None, test_filter=None): if selector.by_session(): return self._fetch_testcases_from_session( @@ -394,7 +394,7 @@ def fetch_testcases(self, selector: QuerySelector, ) @time_function - def fetch_sessions(self, selector: QuerySelector, decode=True): + def fetch_sessions(self, selector: QuerySelectorTestcase, decode=True): query = 'SELECT uuid, json_blob FROM sessions' if selector.by_time_period(): ts_start, ts_end = selector.time_period @@ -448,7 +448,7 @@ def _do_remove2(self, conn, uuids): return [rec[0] for rec in results] @time_function - def remove_sessions(self, selector: QuerySelector): + def remove_sessions(self, selector: QuerySelectorTestcase): if selector.by_session_uuid(): uuids = [selector.uuid] else: diff --git a/reframe/frontend/reporting/utility.py b/reframe/frontend/reporting/utility.py index 7d127b9409..0c9af7f8f0 100644 --- a/reframe/frontend/reporting/utility.py +++ b/reframe/frontend/reporting/utility.py @@ -3,99 +3,114 @@ # # SPDX-License-Identifier: BSD-3-Clause -import abc +import polars as pl import re -import statistics -import types -from collections import namedtuple from datetime import datetime, timedelta, timezone from numbers import Number +from typing import Dict, List +from reframe.utility import OrderedSet + + +class Aggregation: + '''Represents a user aggregation''' + + OP_REGEX = re.compile(r'(?P\S+)\((?P\S+)\)|(?P\S+)') + OP_VALID = {'min', 'max', 'median', 'mean', 'std', + 'first', 'last', 'stats', 'sum'} + + def __init__(self, agg_spec: str): + '''Create an Aggregation from an aggretion spec''' + self._aggregations: Dict[str, List[str]] = {} + self._agg_names: Dict[str, str] = {} + for agg in agg_spec.split(','): + m = self.OP_REGEX.match(agg) + if m: + op = m.group('op') or m.group('op2') + col = m.group('col') or 'pval' + if op not in self.OP_VALID: + raise ValueError(f'unknown aggregation: {op}') + + if op == 'stats': + agg_ops = ('min', 'p01', 'p05', 'median', 'p95', 'p99', + 'max', 'mean', 'stddev') + else: + agg_ops = [op] + + self._aggregations.setdefault(col, []) + self._aggregations[col] += agg_ops + for op in agg_ops: + self._agg_names[self._fmt_col(col, op)] = col + else: + raise ValueError(f'invalid aggregation spec: {agg}') + + def __repr__(self) -> str: + return f'Aggregation({self._aggregations})' + + def _fmt_col(self, col: str, op: str) -> str: + '''Format the aggregation's column name''' + return f'{col} ({op})' + + def attributes(self) -> List[str]: + '''Return the attributes to be aggregated''' + return list(self._aggregations.keys()) + + def column_names(self, col: str) -> List[str]: + '''Return the aggragation's column names''' + try: + ops = self._aggregations[col] + return [self._fmt_col(col, op) for op in ops] + except KeyError: + return [col] + + def strip_suffix(self, col: str) -> str: + '''Strip aggregation suffix from column''' + return self._agg_names.get(col, col) + + def col_spec(self, extra_cols: List[str]) -> List[pl.Expr]: + '''Return a list of polars expressions for this aggregation''' + def _expr_from_op(col, op): + if op == 'min': + return pl.col(col).min().alias(f'{col} (min)') + elif op == 'max': + return pl.col(col).max().alias(f'{col} (max)') + elif op == 'median': + return pl.col(col).median().alias(f'{col} (median)') + elif op == 'mean': + return pl.col(col).mean().alias(f'{col} (mean)') + elif op == 'std': + return pl.col(col).std().alias(f'{col} (stddev)') + elif op == 'first': + return pl.col(col).first().alias(f'{col} (first)') + elif op == 'last': + return pl.col(col).last().alias(f'{col} (last)') + elif op == 'p01': + return pl.col(col).quantile(0.01).alias(f'{col} (p01)') + elif op == 'p05': + return pl.col(col).quantile(0.01).alias(f'{col} (p05)') + elif op == 'p95': + return pl.col(col).quantile(0.01).alias(f'{col} (p95)') + elif op == 'p99': + return pl.col(col).quantile(0.01).alias(f'{col} (p99)') + elif op == 'stddev': + return pl.col(col).std().alias(f'{col} (stddev)') + elif op == 'sum': + return pl.col(col).sum().alias(f'{col} (sum)') + + specs = [] + for col, ops in self._aggregations.items(): + for op in ops: + specs.append(_expr_from_op(col, op)) + + # Add col specs for the extra columns requested + for col in extra_cols: + if col == 'pval': + continue + elif col == 'psamples': + specs.append(pl.len().alias('psamples')) + else: + specs.append(pl.col(col).unique().str.join('|')) - -class Aggregator: - @classmethod - def create(cls, name, *args, **kwargs): - if name == 'first': - return AggrFirst(*args, **kwargs) - elif name == 'last': - return AggrLast(*args, **kwargs) - elif name == 'mean': - return AggrMean(*args, **kwargs) - elif name == 'median': - return AggrMedian(*args, **kwargs) - elif name == 'min': - return AggrMin(*args, **kwargs) - elif name == 'max': - return AggrMax(*args, **kwargs) - elif name == 'count': - return AggrCount(*args, **kwargs) - elif name == 'join_uniq': - return AggrJoinUniqueValues(*args, **kwargs) - else: - raise ValueError(f'unknown aggregation function: {name!r}') - - @abc.abstractmethod - def __call__(self, iterable): - pass - - -class AggrFirst(Aggregator): - def __call__(self, iterable): - for i, elem in enumerate(iterable): - if i == 0: - return elem - - -class AggrLast(Aggregator): - def __call__(self, iterable): - if not isinstance(iterable, types.GeneratorType): - return iterable[-1] - - for elem in iterable: - pass - - return elem - - -class AggrMean(Aggregator): - def __call__(self, iterable): - return statistics.mean(iterable) - - -class AggrMedian(Aggregator): - def __call__(self, iterable): - return statistics.median(iterable) - - -class AggrMin(Aggregator): - def __call__(self, iterable): - return min(iterable) - - -class AggrMax(Aggregator): - def __call__(self, iterable): - return max(iterable) - - -class AggrJoinUniqueValues(Aggregator): - def __init__(self, delim): - self.__delim = delim - - def __call__(self, iterable): - unique_vals = {str(elem) for elem in iterable} - return self.__delim.join(unique_vals) - - -class AggrCount(Aggregator): - def __call__(self, iterable): - if hasattr(iterable, '__len__'): - return len(iterable) - - count = 0 - for _ in iterable: - count += 1 - - return count + return specs def _parse_timestamp(s): @@ -153,7 +168,7 @@ def is_uuid(s): return _UUID_PATTERN.match(s) is not None -class QuerySelector: +class QuerySelectorTestcase: '''A class for encapsulating the different session and testcase queries. A session or testcase query can be of one of the following kinds: @@ -237,7 +252,8 @@ def _parse_aggregation(s, base_columns=None): except ValueError: raise ValueError(f'invalid aggregate function spec: {s}') from None - return Aggregator.create(op), _parse_columns(group_cols, base_columns) + # return Aggregator.create(op), _parse_columns(group_cols, base_columns) + return Aggregation(op), _parse_columns(group_cols, base_columns) def parse_query_spec(s): @@ -245,29 +261,151 @@ def parse_query_spec(s): return None if is_uuid(s): - return QuerySelector(uuid=s) + return QuerySelectorTestcase(uuid=s) if '?' in s: time_period, sess_filter = s.split('?', maxsplit=1) if time_period: - return QuerySelector(sess_filter=sess_filter, - time_period=parse_time_period(time_period)) + return QuerySelectorTestcase( + sess_filter=sess_filter, + time_period=parse_time_period(time_period) + ) else: - return QuerySelector(sess_filter=sess_filter) + return QuerySelectorTestcase(sess_filter=sess_filter) + + return QuerySelectorTestcase(time_period=parse_time_period(s)) + + +class _QueryMatch: + '''Class to represent the user's query''' + + def __init__(self, + lhs: QuerySelectorTestcase, + rhs: QuerySelectorTestcase, + aggregation: Aggregation, + groups: List[str], + columns: List[str]): + self.__lhs: QuerySelectorTestcase = lhs + self.__rhs: QuerySelectorTestcase = rhs + self.__aggregation: Aggregation = aggregation + self.__tc_group_by: List[str] = groups + self.__tc_attrs: List[str] = [] + self.__col_variants: Dict[str, List[str]] = {} + + if self.is_compare() and 'pval' not in columns: + # Always add `pval` if the query is a performance comparison + columns.append('pval') + + for col in columns: + if self.is_compare(): + # This is a comparison; trim any column suffixes and store + # them for later selection + if col.endswith(self.lhs_select_suffix): + col = col[:-len(self.lhs_select_suffix)] + self.__col_variants.setdefault(col, []) + self.__col_variants[col].append(self.lhs_column_suffix) + elif col.endswith(self.rhs_select_suffix): + col = col[:-len(self.rhs_select_suffix)] + self.__col_variants.setdefault(col, []) + self.__col_variants[col].append(self.rhs_column_suffix) + else: + self.__col_variants.setdefault(col, []) + self.__col_variants[col].append(self.lhs_column_suffix) + self.__col_variants[col].append(self.rhs_column_suffix) + + self.__tc_attrs.append(col) + + self.__tc_attrs_agg: List[str] = (OrderedSet(self.__tc_attrs) - + OrderedSet(self.__tc_group_by)) + self.__aggregated_cols: List[str] = [] + for col in self.__tc_attrs_agg: + self.__aggregated_cols += self.__aggregation.column_names(col) + + self.__col_variants_agg: List[str] = [] + for col in self.__aggregated_cols: + col_stripped = self.aggregation.strip_suffix(col) + if col_stripped in self.__col_variants: + self.__col_variants_agg += [ + f'{col}{variant}' + for variant in self.__col_variants[col_stripped] + ] + else: + self.__col_variants_agg.append(col) + + def is_compare(self): + '''Check if this query is a performance comparison''' + return self.__lhs is not None - return QuerySelector(time_period=parse_time_period(s)) + @property + def lhs_column_suffix(self): + '''The suffix of the lhs column in a comparison''' + return ' (lhs)' + + @property + def lhs_select_suffix(self): + '''The suffix for selecting the lhs column in a comparison''' + return '_L' + + @property + def rhs_column_suffix(self): + '''The suffix of the rhs column in a comparison''' + return ' (rhs)' + + @property + def rhs_select_suffix(self): + '''The suffix for selecting the rhs column in a comparison''' + return '_R' + + @property + def diff_column(self): + '''The name of the performance difference column''' + return 'pdiff (%)' + + @property + def lhs(self) -> QuerySelectorTestcase: + '''The lhs data sub-query''' + return self.__lhs + + @property + def rhs(self) -> QuerySelectorTestcase: + '''The rhs data sub-query''' + return self.__rhs + + @property + def aggregation(self) -> Aggregation: + '''The aggregation of this query''' + return self.__aggregation + + @property + def attributes(self) -> List[str]: + '''Test attributes requested by this query''' + return self.__tc_attrs + + @property + def aggregated_attributes(self) -> List[str]: + '''Test attributes whose values must be aggregated''' + return self.__tc_attrs_agg + + @property + def aggregated_columns(self) -> List[str]: + '''Column names of the aggregated attributes''' + return self.__aggregated_cols + @property + def aggregated_variants(self) -> List[str]: + '''Column names of the aggregated lhs/rhs attributes''' + return self.__col_variants_agg + + @property + def group_by(self) -> List[str]: + '''Test attributes to be grouped''' + return self.__tc_group_by -_Match = namedtuple('_Match', - ['base', 'target', 'aggregator', 'groups', 'columns']) DEFAULT_GROUP_BY = ['name', 'sysenv', 'pvar', 'punit'] -DEFAULT_EXTRA_COLS = ['pval', 'pdiff'] -def parse_cmp_spec(spec, default_group_by=None, default_extra_cols=None): - default_group_by = default_group_by or list(DEFAULT_GROUP_BY) - default_extra_cols = default_extra_cols or list(DEFAULT_EXTRA_COLS) +def parse_cmp_spec(spec): parts = spec.split('/') if len(parts) == 3: base_spec, target_spec, aggr, cols = None, *parts @@ -278,8 +416,8 @@ def parse_cmp_spec(spec, default_group_by=None, default_extra_cols=None): base = parse_query_spec(base_spec) target = parse_query_spec(target_spec) - aggr_fn, group_cols = _parse_aggregation(aggr, default_group_by) + aggr, group_cols = _parse_aggregation(aggr, DEFAULT_GROUP_BY) # Update base columns for listing - columns = _parse_columns(cols, group_cols + default_extra_cols) - return _Match(base, target, aggr_fn, group_cols, columns) + columns = _parse_columns(cols, group_cols + aggr.attributes()) + return _QueryMatch(base, target, aggr, group_cols, columns) diff --git a/requirements.txt b/requirements.txt index 584665e36c..7990f45c75 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,6 +11,10 @@ jsonschema==3.2.0 lxml==5.2.0; python_version < '3.8' and platform_machine == 'aarch64' lxml==5.4.0; python_version < '3.8' and platform_machine != 'aarch64' lxml==6.0.2; python_version >= '3.8' +polars==0.12.5; python_version == '3.6' +polars==0.18.4; python_version == '3.7' +polars==1.8.2; python_version == '3.8' +polars==1.35.1; python_version >= '3.9' pytest==7.0.1; python_version < '3.8' pytest==8.3.5; python_version == '3.8' pytest==8.4.2; python_version >= '3.9' diff --git a/unittests/test_cli.py b/unittests/test_cli.py index 8742dc173b..e0224a243f 100644 --- a/unittests/test_cli.py +++ b/unittests/test_cli.py @@ -634,8 +634,9 @@ def test_timestamp_option_default(run_reframe): assert returncode == 0 matches = re.findall( - r'(stage|output) directory: .*\/(\d{8}T\d{6}\+\d{4})', stdout + r'(stage|output) directory: .*\/(\d{8}T\d{6}(\+|-)\d{4})', stdout ) + print(stdout) assert len(matches) == 2 @@ -1377,6 +1378,7 @@ def table_format(request): def assert_no_crash(returncode, stdout, stderr, exitcode=0): + print(stdout) assert returncode == exitcode assert 'Traceback' not in stdout assert 'Traceback' not in stderr @@ -1528,5 +1530,5 @@ def assert_no_crash(returncode, stdout, stderr, exitcode=0): assert_no_crash( *run_reframe2( action='--performance-compare=now-1m:now/now-1d:now/mean:+foo/+bar' - ), exitcode=1 + ) ) diff --git a/unittests/test_reporting.py b/unittests/test_reporting.py index c83a35ca6b..c9a30cd13a 100644 --- a/unittests/test_reporting.py +++ b/unittests/test_reporting.py @@ -6,6 +6,7 @@ import json import jsonschema import os +import polars as pl import pytest import sys import time @@ -18,14 +19,13 @@ import reframe.frontend.reporting as reporting import reframe.frontend.reporting.storage as report_storage from reframe.frontend.reporting.utility import (parse_cmp_spec, is_uuid, - QuerySelector, - DEFAULT_GROUP_BY, - DEFAULT_EXTRA_COLS) + QuerySelectorTestcase, + DEFAULT_GROUP_BY) from reframe.core.exceptions import ReframeError from reframe.frontend.reporting import RunReport -_DEFAULT_BASE_COLS = DEFAULT_GROUP_BY + DEFAULT_EXTRA_COLS +_DEFAULT_BASE_COLS = DEFAULT_GROUP_BY + ['pval'] # NOTE: We could move this to utility @@ -211,7 +211,7 @@ def test_parse_cmp_spec_period(time_period): spec, duration = time_period duration = int(duration) match = parse_cmp_spec(f'{spec}/{spec}/mean:/') - for query in ('base', 'target'): + for query in ('lhs', 'rhs'): assert getattr(match, query).by_time_period() ts_start, ts_end = getattr(match, query).time_period if 'now' in spec: @@ -223,36 +223,65 @@ def test_parse_cmp_spec_period(time_period): # Check variant without base period match = parse_cmp_spec(f'{spec}/mean:/') - assert match.base is None + assert match.lhs is None @pytest.fixture(params=['first', 'last', 'mean', 'median', - 'min', 'max', 'count']) + 'min', 'max', 'std', 'stats', 'sum']) def aggregator(request): return request.param def test_parse_cmp_spec_aggregations(aggregator): match = parse_cmp_spec(f'now-1m:now/now-1d:now/{aggregator}:/') - data = [1, 2, 3, 4, 5] + num_recs = 10 + nodelist = [f'nid{i}' for i in range(num_recs)] + df = pl.DataFrame({ + 'name': ['test' for i in range(num_recs)], + 'pvar': ['time' for i in range(num_recs)], + 'unit': ['s' for i in range(num_recs)], + 'pval': [1 + i/10 for i in range(num_recs)], + 'node': nodelist + }) + agg = df.group_by('name').agg(match.aggregation.col_spec(['node'])) + assert set(agg['node'][0].split('|')) == set(nodelist) if aggregator == 'first': - match.aggregator(data) == data[0] + assert 'pval (first)' in agg.columns + assert agg['pval (first)'][0] == 1 elif aggregator == 'last': - match.aggregator(data) == data[-1] + assert 'pval (last)' in agg.columns + assert agg['pval (last)'][0] == 1.9 elif aggregator == 'min': - match.aggregator(data) == 1 + assert 'pval (min)' in agg.columns + assert agg['pval (min)'][0] == 1 elif aggregator == 'max': - match.aggregator(data) == 5 + assert 'pval (max)' in agg.columns + assert agg['pval (max)'][0] == 1.9 elif aggregator == 'median': - match.aggregator(data) == 3 + assert 'pval (median)' in agg.columns + assert agg['pval (median)'][0] == 1.45 elif aggregator == 'mean': - match.aggregator(data) == sum(data) / len(data) - elif aggregator == 'count': - match.aggregator(data) == len(data) + assert 'pval (mean)' in agg.columns + assert agg['pval (mean)'][0] == 1.45 + elif aggregator == 'std': + assert 'pval (stddev)' in agg.columns + elif aggregator == 'stats': + assert 'pval (min)' in agg.columns + assert 'pval (p01)' in agg.columns + assert 'pval (p05)' in agg.columns + assert 'pval (median)' in agg.columns + assert 'pval (p95)' in agg.columns + assert 'pval (p99)' in agg.columns + assert 'pval (max)' in agg.columns + assert 'pval (mean)' in agg.columns + assert 'pval (stddev)' in agg.columns + elif aggregator == 'sum': + assert 'pval (sum)' in agg.columns + assert agg['pval (sum)'][0] == 14.5 # Check variant without base period match = parse_cmp_spec(f'now-1d:now/{aggregator}:/') - assert match.base is None + assert match.lhs is None @pytest.fixture(params=[('', DEFAULT_GROUP_BY), @@ -270,11 +299,11 @@ def test_parse_cmp_spec_group_by(group_by_columns): match = parse_cmp_spec( f'now-1m:now/now-1d:now/min:{spec}/' ) - assert match.groups == expected + assert match.group_by == expected # Check variant without base period match = parse_cmp_spec(f'now-1d:now/min:{spec}/') - assert match.base is None + assert match.lhs is None @pytest.fixture(params=[('', _DEFAULT_BASE_COLS), @@ -292,11 +321,17 @@ def test_parse_cmp_spec_extra_cols(columns): match = parse_cmp_spec( f'now-1m:now/now-1d:now/min:/{spec}' ) - assert match.columns == expected + + # `pval` is always added in case of comparisons + if spec == 'col1,col2': + assert match.attributes == expected + ['pval'] + else: + assert match.attributes == expected # Check variant without base period match = parse_cmp_spec(f'now-1d:now/min:/{spec}') - assert match.base is None + assert match.lhs is None + assert match.attributes == expected def test_is_uuid(): @@ -340,11 +375,11 @@ def _uuids(s): match = parse_cmp_spec(uuid_spec) base_uuid, target_uuid = _uuids(uuid_spec) - if match.base.by_session_uuid(): - assert match.base.uuid == base_uuid + if match.lhs.by_session_uuid(): + assert match.lhs.uuid == base_uuid - if match.target.by_session_uuid(): - assert match.target.uuid == target_uuid + if match.rhs.by_session_uuid(): + assert match.rhs.uuid == target_uuid @pytest.fixture(params=[ @@ -358,16 +393,16 @@ def sess_filter(request): def test_parse_cmp_spec_with_filter(sess_filter): match = parse_cmp_spec(sess_filter) - if match.base: - assert match.base.by_session_filter() - assert match.base.sess_filter == 'xyz == "123"' + if match.lhs: + assert match.lhs.by_session_filter() + assert match.lhs.sess_filter == 'xyz == "123"' - assert match.target.by_session_filter() - assert match.target.sess_filter == 'xyz == "789"' + assert match.rhs.by_session_filter() + assert match.rhs.sess_filter == 'xyz == "789"' if sess_filter.startswith('now'): - assert match.target.by_time_period() - ts_start, ts_end = match.target.time_period + assert match.rhs.by_time_period() + ts_start, ts_end = match.rhs.time_period assert int(ts_end - ts_start) == 86400 @@ -423,7 +458,6 @@ def test_parse_cmp_spec_invalid_extra_cols(invalid_col_spec): 'now-1m:now/now-1d:now', 'now-1m:now/now-1d:now/mean', 'now-1m:now/now-1d:now/mean:', - 'now-1m:now/now-1d:now/mean:', '/now-1d:now/mean:/', 'now-1m:now//mean:']) def various_invalid_specs(request): @@ -446,13 +480,13 @@ def _count_failed(testcases): return count def from_time_period(ts_start, ts_end): - return QuerySelector(time_period=(ts_start, ts_end)) + return QuerySelectorTestcase(time_period=(ts_start, ts_end)) def from_session_uuid(x): - return QuerySelector(uuid=x) + return QuerySelectorTestcase(uuid=x) def from_session_filter(filt, ts_start, ts_end): - return QuerySelector(time_period=(ts_start, ts_end), sess_filter=filt) + return QuerySelectorTestcase(time_period=(ts_start, ts_end), sess_filter=filt) monkeypatch.setenv('HOME', str(tmp_path)) uuids = [] From 07ddc4d7406e773749c6e010214106e34dde228f Mon Sep 17 00:00:00 2001 From: Vasileios Karakasis Date: Thu, 20 Nov 2025 11:05:10 -0600 Subject: [PATCH 2/4] Support changing the LHS/RHS terms in performance comparisons --- reframe/frontend/cli.py | 18 +++++++++++++++--- reframe/frontend/reporting/__init__.py | 5 +++-- reframe/frontend/reporting/utility.py | 14 +++++++++----- 3 files changed, 27 insertions(+), 10 deletions(-) diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py index d783e86a40..f431948bca 100644 --- a/reframe/frontend/cli.py +++ b/reframe/frontend/cli.py @@ -702,6 +702,14 @@ def main(): help='The delimiter to use when using `--table-format=csv`', envvar='RFM_TABLE_FORMAT_DELIM', configvar='general/table_format_delim' ) + misc_options.add_argument( + '--term-lhs', action='store', + help='LHS term in performance comparisons' + ) + misc_options.add_argument( + '--term-rhs', action='store', + help='RHS term in performance comparisons' + ) misc_options.add_argument( '-v', '--verbose', action='count', help='Increase verbosity level of output', @@ -1168,8 +1176,10 @@ def restrict_logging(): sys.exit(1) printer.table( - reporting.performance_compare(options.performance_compare, - None, namepatt, *filt) + reporting.performance_compare( + options.performance_compare, None, namepatt, *filt, + options.term_lhs, options.term_rhs + ) ) sys.exit(0) @@ -1769,7 +1779,9 @@ def module_unuse(*paths): try: if rt.get_option('storage/0/enable'): data = reporting.performance_compare( - rt.get_option('general/0/perf_report_spec'), report + rt.get_option('general/0/perf_report_spec'), report, + term_lhs=options.term_lhs, + term_rhs=options.term_rhs ) else: data = report.report_data() diff --git a/reframe/frontend/reporting/__init__.py b/reframe/frontend/reporting/__init__.py index b60c0a48d7..0781900b4f 100644 --- a/reframe/frontend/reporting/__init__.py +++ b/reframe/frontend/reporting/__init__.py @@ -690,10 +690,11 @@ def compare_testcase_data(base_testcases, target_testcases, query): @time_function def performance_compare(cmp, report=None, namepatt=None, - filterA=None, filterB=None): + filterA=None, filterB=None, + term_lhs=None, term_rhs=None): with reraise_as(ReframeError, (ValueError,), 'could not parse comparison spec'): - query = parse_cmp_spec(cmp) + query = parse_cmp_spec(cmp, term_lhs, term_rhs) backend = StorageBackend.default() if query.lhs is None: diff --git a/reframe/frontend/reporting/utility.py b/reframe/frontend/reporting/utility.py index 0c9af7f8f0..4d9fb21489 100644 --- a/reframe/frontend/reporting/utility.py +++ b/reframe/frontend/reporting/utility.py @@ -284,13 +284,16 @@ def __init__(self, rhs: QuerySelectorTestcase, aggregation: Aggregation, groups: List[str], - columns: List[str]): + columns: List[str], + term_lhs: str, term_rhs: str): self.__lhs: QuerySelectorTestcase = lhs self.__rhs: QuerySelectorTestcase = rhs self.__aggregation: Aggregation = aggregation self.__tc_group_by: List[str] = groups self.__tc_attrs: List[str] = [] self.__col_variants: Dict[str, List[str]] = {} + self.__lhs_term: str = term_lhs or 'lhs' + self.__rhs_term: str = term_rhs or 'rhs' if self.is_compare() and 'pval' not in columns: # Always add `pval` if the query is a performance comparison @@ -339,7 +342,7 @@ def is_compare(self): @property def lhs_column_suffix(self): '''The suffix of the lhs column in a comparison''' - return ' (lhs)' + return f' ({self.__lhs_term})' @property def lhs_select_suffix(self): @@ -349,7 +352,7 @@ def lhs_select_suffix(self): @property def rhs_column_suffix(self): '''The suffix of the rhs column in a comparison''' - return ' (rhs)' + return f' ({self.__rhs_term})' @property def rhs_select_suffix(self): @@ -405,7 +408,7 @@ def group_by(self) -> List[str]: DEFAULT_GROUP_BY = ['name', 'sysenv', 'pvar', 'punit'] -def parse_cmp_spec(spec): +def parse_cmp_spec(spec, term_lhs=None, term_rhs=None): parts = spec.split('/') if len(parts) == 3: base_spec, target_spec, aggr, cols = None, *parts @@ -420,4 +423,5 @@ def parse_cmp_spec(spec): # Update base columns for listing columns = _parse_columns(cols, group_cols + aggr.attributes()) - return _QueryMatch(base, target, aggr, group_cols, columns) + return _QueryMatch(base, target, aggr, group_cols, columns, + term_lhs, term_rhs) From 407c9da120356cb723610278abd19f9bdbae68a2 Mon Sep 17 00:00:00 2001 From: Vasileios Karakasis Date: Thu, 20 Nov 2025 13:26:16 -0600 Subject: [PATCH 3/4] Remove unused imports --- reframe/frontend/reporting/__init__.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/reframe/frontend/reporting/__init__.py b/reframe/frontend/reporting/__init__.py index 0781900b4f..26638d6e18 100644 --- a/reframe/frontend/reporting/__init__.py +++ b/reframe/frontend/reporting/__init__.py @@ -17,14 +17,12 @@ import time import uuid from collections import UserDict -from collections.abc import Hashable import reframe as rfm import reframe.utility.jsonext as jsonext import reframe.utility.osext as osext from reframe.core.exceptions import ReframeError, what, is_severe, reraise_as from reframe.core.logging import getlogger, _format_time_rfc3339, time_function -from reframe.core.runtime import runtime from reframe.core.warnings import suppress_deprecations from reframe.utility import nodelist_abbrev, OrderedSet from .storage import StorageBackend From 68ed85983b770a71a0f5a98d620a255a08c7c267 Mon Sep 17 00:00:00 2001 From: Vasileios Karakasis Date: Thu, 20 Nov 2025 13:35:11 -0600 Subject: [PATCH 4/4] Add polars in `setup.cfg` --- setup.cfg | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.cfg b/setup.cfg index a3db8c3f05..5663cf2b04 100644 --- a/setup.cfg +++ b/setup.cfg @@ -39,6 +39,7 @@ install_requires = lxml==5.2.0; python_version < '3.8' and platform_machine == 'aarch64' lxml==5.4.0; python_version < '3.8' and platform_machine != 'aarch64' lxml + polars PyYAML==6.0.1; python_version < '3.8' PyYAML requests