diff --git a/CHANGELOGS.rst b/CHANGELOGS.rst index da18f9eb..1971cfbe 100644 --- a/CHANGELOGS.rst +++ b/CHANGELOGS.rst @@ -4,6 +4,7 @@ Change Logs 0.8.0 +++++ +* :pr:`283`: fix historical aggregation when multiple input sets are used * :pr:`282`: add tools to understand better which functions were patched * :pr:`280`: fixes patches for sdpa_attention_forward for different version of transformers * :pr:`278`: implements ``onnx_generate_with_genai`` diff --git a/_unittests/ut_helpers/test_log_helper.py b/_unittests/ut_helpers/test_log_helper.py index 065e3b4d..38c1ee20 100644 --- a/_unittests/ut_helpers/test_log_helper.py +++ b/_unittests/ut_helpers/test_log_helper.py @@ -614,6 +614,151 @@ def test_cube_sbs_with_time(self): self.assertEqual(sbs_agg.index.names, ["date", "METRICS"]) self.assertEqual(sorted(sbs_agg.columns.names), ["CONF", "exporter"]) + def test_fix_non_consistent_historical_data_no_change(self): + # no change + df = pandas.DataFrame( + [ + dict(date="2025/01/01", time_p=0.51, exporter="E1", model_s="O", model="M"), + dict(date="2025/01/02", time_p=0.51, exporter="E1", model_s="O", model="M"), + dict(date="2025/01/03", time_p=0.53, exporter="E1", model_s="O", model="M"), + ] + ) + cube = CubeLogs( + df, keys=["^model*", "exporter", "opt"], values=["time_p"], time="date" + ).load() + view, _view_def = cube.view( + CubeViewDef(["^model.*"], ["^time_.*"]), return_view_def=True + ) + expected = { + ("time_p", pandas.Timestamp("2025-01-01 00:00:00")): {"ALL": 0.51}, + ("time_p", pandas.Timestamp("2025-01-02 00:00:00")): {"ALL": 0.51}, + ("time_p", pandas.Timestamp("2025-01-03 00:00:00")): {"ALL": 0.53}, + } + self.assertEqual(expected, view.to_dict()) + + # no change + df = pandas.DataFrame( + [ + dict(date="2025/01/01", time_p=0.51, exporter="E1", model_s="O", model="M"), + dict(date="2025/01/02", time_p=0.51, exporter="E1", model_s="O", model="M"), + dict(date="2025/01/03", time_p=0.53, exporter="E1", model_s="O", model="M"), + ] + ) + cube = CubeLogs( + df, keys=["^model*", "exporter", "opt"], values=["time_p"], time="date" + ).load() + view, _view_def = cube.view( + CubeViewDef(["^model.*"], ["^time_.*"], fix_aggregation_change=["model_s"]), + return_view_def=True, + ) + self.assertEqual(expected, view.to_dict()) + + def test_fix_non_consistent_historical_data_mixed_values1(self): + df = pandas.DataFrame( + [ + dict(date="2025/01/01", time_p=0.51, exporter="E1", model_s="O", model="M"), + dict(date="2025/01/02", time_p=0.51, exporter="E1", model_s="O", model="M"), + dict(date="2025/01/03", time_p=0.53, exporter="E1", model_s="A", model="M"), + ] + ) + cube = CubeLogs( + df, keys=["^model*", "exporter", "opt"], values=["time_p"], time="date" + ).load() + view, _view_def = cube.view( + CubeViewDef(["^model.*"], ["^time_.*"], fix_aggregation_change=["model_s"]), + return_view_def=True, + ) + raw = view.to_dict() + self.assertEqual( + { + ("time_p", pandas.Timestamp("2025-01-01 00:00:00")): {"A-O": 0.51}, + ("time_p", pandas.Timestamp("2025-01-02 00:00:00")): {"A-O": 0.51}, + ("time_p", pandas.Timestamp("2025-01-03 00:00:00")): {"A-O": 0.53}, + }, + raw, + ) + + def test_fix_non_consistent_historical_data_mixed_values2(self): + df = pandas.DataFrame( + [ + dict(date="2025/01/01", time_p=0.51, exporter="E1", model_s="O", model="M"), + dict(date="2025/01/02", time_p=0.51, exporter="E1", model_s="O", model="M"), + dict(date="2025/01/03", time_p=0.53, exporter="E1", model_s="A", model="M"), + ] + ) + cube = CubeLogs( + df, keys=["^model*", "exporter", "opt"], values=["time_p"], 
time="date"
+        ).load()
+        view, _view_def = cube.view(
+            CubeViewDef(["^model.*"], ["^time_.*"], fix_aggregation_change=["model_s"]),
+            return_view_def=True,
+        )
+        raw = view.to_dict()
+        self.assertEqual(
+            {
+                ("time_p", pandas.Timestamp("2025-01-01 00:00:00")): {"A-O": 0.51},
+                ("time_p", pandas.Timestamp("2025-01-02 00:00:00")): {"A-O": 0.51},
+                ("time_p", pandas.Timestamp("2025-01-03 00:00:00")): {"A-O": 0.53},
+            },
+            raw,
+        )
+
+    def test_fix_non_consistent_historical_data_mixed_nan(self):
+        df = pandas.DataFrame(
+            [
+                dict(date="2025/01/01", time_p=0.51, exporter="E1", model_s="O", model="M"),
+                dict(date="2025/01/02", time_p=0.51, exporter="E1", model_s="O", model="M"),
+                dict(date="2025/01/03", time_p=0.53, exporter="E1", model="M"),
+            ]
+        )
+        cube = CubeLogs(
+            df, keys=["^model*", "exporter", "opt"], values=["time_p"], time="date"
+        ).load()
+        view, _view_def = cube.view(
+            CubeViewDef(["^model.*"], ["^time_.*"], fix_aggregation_change=["model_s"]),
+            return_view_def=True,
+        )
+        raw = view.to_dict()
+        self.assertEqual(
+            {
+                ("time_p", pandas.Timestamp("2025-01-01 00:00:00")): {"O": 0.51},
+                ("time_p", pandas.Timestamp("2025-01-02 00:00:00")): {"O": 0.51},
+                ("time_p", pandas.Timestamp("2025-01-03 00:00:00")): {"O": 0.53},
+            },
+            raw,
+        )
+
+    def test_fix_non_consistent_historical_data_nan(self):
+        df = pandas.DataFrame(
+            [
+                dict(date="2025/01/01", time_p=0.51, exporter="E1", model_s="O", model="M"),
+                dict(date="2025/01/02", time_p=0.51, exporter="E1", model_s="O", model="M"),
+                dict(date="2025/01/03", time_p=0.53, exporter="E1", model_s="A", model="M"),
+                dict(date="2025/01/01", time_p=0.51, exporter="E2", model="M"),
+                dict(date="2025/01/02", time_p=0.51, exporter="E2", model="M"),
+                dict(date="2025/01/03", time_p=0.53, exporter="E2", model="M"),
+            ]
+        )
+        cube = CubeLogs(
+            df, keys=["^model*", "exporter", "opt"], values=["time_p"], time="date"
+        ).load()
+        view, _view_def = cube.view(
+            CubeViewDef(["^model.*"], ["^time_.*"], fix_aggregation_change=["model_s"]),
+            return_view_def=True,
+        )
+        raw = view.reset_index(drop=True).fillna("NAN").to_dict(orient="list")
+        self.assertEqual(
+            {
+                ("time_p", "E1", pandas.Timestamp("2025-01-01 00:00:00")): ["NAN", 0.51],
+                ("time_p", "E1", pandas.Timestamp("2025-01-02 00:00:00")): ["NAN", 0.51],
+                ("time_p", "E1", pandas.Timestamp("2025-01-03 00:00:00")): ["NAN", 0.53],
+                ("time_p", "E2", pandas.Timestamp("2025-01-01 00:00:00")): [0.51, "NAN"],
+                ("time_p", "E2", pandas.Timestamp("2025-01-02 00:00:00")): [0.51, "NAN"],
+                ("time_p", "E2", pandas.Timestamp("2025-01-03 00:00:00")): [0.53, "NAN"],
+            },
+            raw,
+        )
+

 if __name__ == "__main__":
     unittest.main(verbosity=2)
diff --git a/onnx_diagnostic/helpers/log_helper.py b/onnx_diagnostic/helpers/log_helper.py
index 15e57bce..612c3466 100644
--- a/onnx_diagnostic/helpers/log_helper.py
+++ b/onnx_diagnostic/helpers/log_helper.py
@@ -42,6 +42,8 @@ class CubeViewDef:
     :param name: name of the view, used mostly to debug
     :param plots: adds plot to the Excel sheet
     :param no_index: remove the index (but keeps the columns)
+    :param fix_aggregation_change: list of key columns whose values changed over time;
+        their historical values are merged before aggregation

     Some examples of views. First example is an aggregated view for many metrics.
@@ -106,6 +108,7 @@ def __init__(
         name: Optional[str] = None,
         no_index: bool = False,
         plots: bool = False,
+        fix_aggregation_change: Optional[List[str]] = None,
     ):
         self.key_index = key_index
         self.values = values
@@ -123,6 +126,7 @@ def __init__(
         self.name = name
         self.no_index = no_index
         self.plots = plots
+        self.fix_aggregation_change = fix_aggregation_change

     def __repr__(self) -> str:
         "usual"
@@ -750,6 +754,17 @@ def view(
             f"values={sorted(self.values)}"
         )

+        if view_def.fix_aggregation_change and (
+            set(view_def.fix_aggregation_change) & set(self.keys_no_time)
+        ):
+            # before aggregation, let's fix some keys whose values changed over time
+            data_to_process = self._fix_aggregation_change(
+                self.data,
+                list(set(view_def.fix_aggregation_change) & set(self.keys_no_time)),
+            )
+        else:
+            data_to_process = self.data
+
         # aggregation
         if key_agg:
             final_stack = True
@@ -763,7 +778,7 @@ def view(
                 print(f"[CubeLogs.view] aggregation of {set_key_agg}")
                 print(f"[CubeLogs.view] groupby {keys_no_agg}")

-            data_red = self.data[[*keys_no_agg, *values]]
+            data_red = data_to_process[[*keys_no_agg, *values]]
             assert set(key_index) <= set(data_red.columns), (
                 f"view_def.name={view_def.name!r}, "
                 f"nnable to find {set(key_index) - set(data_red.columns)}, "
@@ -792,7 +807,7 @@ def view(
             key_index = self._filter_column(view_def.key_index, self.keys_time)
             if verbose:
                 print(f"[CubeLogs.view] no aggregation, index={key_index}")
-            data = self.data[[*self.keys_time, *values]]
+            data = data_to_process[[*self.keys_time, *values]]
             set_all_keys = set(self.keys_time)
             final_stack = False

@@ -829,7 +844,7 @@ def view(
         key_columns = sorted(set_key_columns)

         unique = set()
-        _md = lambda s: {k: v for k, v in self.values_for_key.items() if k in s}  # noqa: E731
+        # md = lambda s: {k: v for k, v in self.values_for_key.items() if k in s}  # noqa: E731
         all_cols = set(key_columns) | set(key_index) | set(key_agg) | unique
         assert all_cols == set(self.keys_time), (
             f"view_def.name={view_def.name!r}, "
@@ -892,7 +907,7 @@ def view(
                 f"key={sorted(key_columns)}, key_agg={key_agg}, values={sorted(values)}, "
                 f"columns={sorted(data.columns)}, ignored={view_def.ignore_columns}, "
                 f"not unique={set(data.columns) - unique}"
-                f"\n--\n{not_unique.head()}"
+                f"\n--\n{not_unique.head(10)}"
             )

         # pivot
@@ -961,6 +976,70 @@ def view(
             print(f"[CubeLogs.view] -- done view {view_def.name!r}")
         return (piv, view_def) if return_view_def else piv

+    def _fix_aggregation_change(
+        self,
+        data: pandas.DataFrame,
+        columns_to_fix: Union[str, List[str]],
+        overwrite_or_merge: bool = True,
+    ) -> pandas.DataFrame:
+        """
+        Fixes columns used to aggregate values because their meaning changed over time.
+
+        :param data: data to fix
+        :param columns_to_fix: column or list of columns to fix
+        :param overwrite_or_merge: if True, overwrites all values with the
+            concatenation of all existing values; if False, merges the existing
+            values found after grouping by the other keys
+        :return: fixed data
+        """
+        if not isinstance(columns_to_fix, str):
+            for c in columns_to_fix:
+                data = self._fix_aggregation_change(data, c)
+            return data
+        # Let's process one column.
+        keys = set(self.keys_time) - {columns_to_fix}
+        select = data[self.keys_time]
+        select_agg = select.groupby(list(keys)).count()
+        assert select_agg[columns_to_fix].max() <= 1, (
+            f"Column {columns_to_fix!r} has at least two distinct values for at least one date\n"
+            f"{select_agg[select_agg[columns_to_fix] > 1]}"
+        )
+
+        # unique value (to fill NaN)
+        unique = "-".join(sorted(set(data[columns_to_fix].dropna())))
+
+        keys = set(self.keys_no_time) - {columns_to_fix}
+        select = data[self.keys_no_time]
+        select_agg = select.groupby(list(keys), as_index=True).apply(
+            lambda x: "-".join(sorted(set(x[columns_to_fix].dropna()))), include_groups=False
+        )
+        select_agg = select_agg.to_frame(name=columns_to_fix)
+        res = pandas.merge(
+            data.drop([columns_to_fix], axis=1),
+            select_agg,
+            how="left",
+            left_on=list(keys),
+            right_index=True,
+        )
+        val = f"?{unique}?"
+        res[columns_to_fix] = res[columns_to_fix].fillna(val).replace("", val)
+        assert (
+            data.shape == res.shape
+            and sorted(data.columns) == sorted(res.columns)
+            and sorted(data.index) == sorted(res.index)
+        ), (
+            f"Shape should match, data.shape={data.shape}, res.shape={res.shape}, "
+            f"lost={set(data.columns) - set(res.columns)}, "
+            f"added={set(res.columns) - set(data.columns)}"
+        )
+        res = res[data.columns]
+        assert data.columns.equals(res.columns) and data.index.equals(res.index), (
+            f"Columns or index mismatch: "
+            f"data.columns.equals(res.columns)={data.columns.equals(res.columns)}, "
+            f"data.index.equals(res.index)={data.index.equals(res.index)}, "
+        )
+        return res
+
     def _dropna(
         self,
         data: pandas.DataFrame,
@@ -1886,6 +1965,7 @@ def make_view_def(self, name: str) -> Optional[CubeViewDef]:
         * **cmd:** command lines
         * **raw-short:** raw data without all the unused columns
         """
+        fix_aggregation_change = ["model_speedup_input_set", "model_test_with"]
         fs = ["suite", "model_suite", "task", "model_name", "model_task"]
         index_cols = self._filter_column(fs, self.keys_time)
         assert index_cols, (
@@ -1984,6 +2064,7 @@ def mean_geo(gr):
                 keep_columns_in_index=["suite"],
                 name="agg-suite",
                 order=order,
+                fix_aggregation_change=fix_aggregation_change,
             ),
             "agg-all": lambda: CubeViewDef(
                 key_index=index_cols,
@@ -2014,6 +2095,7 @@ def mean_geo(gr):
                 name="agg-all",
                 order=order,
                 plots=True,
+                fix_aggregation_change=fix_aggregation_change,
             ),
             "disc": lambda: CubeViewDef(
                 key_index=index_cols,
@@ -2023,6 +2105,7 @@ def mean_geo(gr):
                 f_highlight=f_disc,
                 name="disc",
                 order=order,
+                fix_aggregation_change=fix_aggregation_change,
             ),
             "speedup": lambda: CubeViewDef(
                 key_index=index_cols,
@@ -2032,6 +2115,7 @@ def mean_geo(gr):
                 f_highlight=f_speedup,
                 name="speedup",
                 order=order,
+                fix_aggregation_change=fix_aggregation_change,
             ),
             "counts": lambda: CubeViewDef(
                 key_index=index_cols,
@@ -2048,6 +2132,7 @@ def mean_geo(gr):
                 keep_columns_in_index=["suite"],
                 name="peak-gpu",
                 order=order,
+                fix_aggregation_change=fix_aggregation_change,
             ),
             "time": lambda: CubeViewDef(
                 key_index=index_cols,
@@ -2058,6 +2143,7 @@ def mean_geo(gr):
                 keep_columns_in_index=["suite"],
                 name="time",
                 order=order,
+                fix_aggregation_change=fix_aggregation_change,
             ),
             "time_export": lambda: CubeViewDef(
                 key_index=index_cols,
@@ -2066,6 +2152,7 @@ def mean_geo(gr):
                 keep_columns_in_index=["suite"],
                 name="time_export",
                 order=order,
+                fix_aggregation_change=fix_aggregation_change,
             ),
             "err": lambda: CubeViewDef(
                 key_index=index_cols,
@@ -2076,6 +2163,7 @@ def mean_geo(gr):
                 keep_columns_in_index=["suite"],
                 name="err",
                 order=order,
+                fix_aggregation_change=fix_aggregation_change,
            ),
"bucket-speedup": lambda: CubeViewDef( key_index=index_cols, @@ -2085,6 +2173,7 @@ def mean_geo(gr): name="bucket-speedup", f_highlight=f_bucket, order=order, + fix_aggregation_change=fix_aggregation_change, ), "onnx": lambda: CubeViewDef( key_index=index_cols, @@ -2103,6 +2192,7 @@ def mean_geo(gr): keep_columns_in_index=["suite"], name="onnx", order=order, + fix_aggregation_change=fix_aggregation_change, ), "raw-short": lambda: CubeViewDef( key_index=self.keys_time, @@ -2111,6 +2201,7 @@ def mean_geo(gr): keep_columns_in_index=["suite"], name="raw-short", no_index=True, + fix_aggregation_change=fix_aggregation_change, ), } @@ -2123,6 +2214,7 @@ def mean_geo(gr): keep_columns_in_index=["suite"], name="cmd", order=order, + fix_aggregation_change=fix_aggregation_change, ) assert name in implemented_views or name in {"cmd"}, (