diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index a6772b1c..6792b71a 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -16,7 +16,7 @@ jobs:
       matrix:
         os: [ubuntu-latest]
         python: ['3.10', '3.11', '3.12', '3.13']
-        transformers: ['4.48.3', '4.51.3', '4.52.4', '4.53.0', 'main']
+        transformers: ['4.48.3', '4.51.3', '4.52.4', '4.53.1', 'main']
         torch: ['2.7', 'main']
         exclude:
           - python: '3.10'
@@ -28,7 +28,7 @@ jobs:
           - python: '3.10'
            transformers: 'main'
          - python: '3.11'
-            transformers: '4.53.0'
+            transformers: '4.53.1'
          - python: '3.11'
            transformers: 'main'
          - python: '3.13'
diff --git a/_unittests/ut_helpers/test_log_helper.py b/_unittests/ut_helpers/test_log_helper.py
index 216507c9..9bf0ede4 100644
--- a/_unittests/ut_helpers/test_log_helper.py
+++ b/_unittests/ut_helpers/test_log_helper.py
@@ -14,6 +14,8 @@
     enumerate_csv_files,
     open_dataframe,
     filter_data,
+    mann_kendall,
+    breaking_last_point,
 )
@@ -207,7 +209,7 @@ def test_enumerate_csv_files(self):
         self.assertIn("RAWFILENAME", cube.data.columns)

     def test_cube_logs_performance1(self):
-        output = self.get_dump_file("test_cube_logs_performance.xlsx")
+        output = self.get_dump_file("test_cube_logs_performance1.xlsx")
         filename = os.path.join(os.path.dirname(__file__), "data", "data-agg.zip")
         assert list(enumerate_csv_files(filename))
         dfs = [open_dataframe(df) for df in enumerate_csv_files(filename)]
@@ -232,7 +234,7 @@ def test_cube_logs_performance1(self):
         self.assertExists(output)

     def test_cube_logs_performance2(self):
-        output = self.get_dump_file("test_cube_logs_performance.xlsx")
+        output = self.get_dump_file("test_cube_logs_performance2.xlsx")
         filename = os.path.join(os.path.dirname(__file__), "data", "data-agg.zip")
         assert list(enumerate_csv_files(filename))
         dfs = [open_dataframe(df) for df in enumerate_csv_files(filename)]
@@ -256,6 +258,16 @@ def test_cube_logs_performance2(self):
         )
         self.assertExists(output)

+    def test_cube_logs_performance_cube_time(self):
+        filename = os.path.join(os.path.dirname(__file__), "data", "data-agg.zip")
+        assert list(enumerate_csv_files(filename))
+        dfs = [open_dataframe(df) for df in enumerate_csv_files(filename)]
+        assert dfs, f"{filename!r} empty"
+        cube = CubeLogsPerformance(dfs, keep_last_date=True)
+        cube.load()
+        ct = cube.clone()
+        self.assertEqual((52, 106), ct.shape)
+
     def test_duplicate(self):
         df = pandas.DataFrame(
             [
@@ -402,6 +414,60 @@ def test_filter_data(self):
         df2 = filter_data(df, "", "model_exporter:onnx-dynamo;T", verbose=1)
         self.assertEqualDataFrame(df[df.model_exporter != "onnx-dynamo"], df2)

+    def test_mann_kendall(self):
+        test = mann_kendall(list(range(5)))
+        self.assertEqual((np.float64(1.0), np.float64(0.5196152422706631)), test)
+        test = mann_kendall(list(range(3)))
+        self.assertEqual((0, np.float64(0.24618298195866545)), test)
+        test = mann_kendall(list(range(5, 0, -1)))
+        self.assertEqual((np.float64(-1.0), np.float64(-0.5196152422706631)), test)
+
+    def test_breaking_last_point(self):
+        test = breaking_last_point([1, 1, 1, 2])
+        self.assertEqual((1, np.float64(1.0)), test)
+        test = breaking_last_point([1, 1, 1.1, 2])
+        self.assertEqual((np.float64(1.0), np.float64(20.50609665440986)), test)
+        test = breaking_last_point([-1, -1, -1.1, -2])
+        self.assertEqual((np.float64(-1.0), np.float64(-20.50609665440986)), test)
+        test = breaking_last_point([1, 1, 1.1, 1])
+        self.assertEqual((np.float64(0.0), np.float64(-0.7071067811865491)), test)
+
+    def test_historical_cube_time(self):
+        # case 1
+        df = pandas.DataFrame(
+            [
+                dict(date="2025/01/01", time_p=0.51, exporter="E1", m_name="A", m_cls="CA"),
+                dict(date="2025/01/02", time_p=0.62, exporter="E1", m_name="A", m_cls="CA"),
+                dict(date="2025/01/03", time_p=0.62, exporter="E1", m_name="A", m_cls="CA"),
+                dict(date="2025/01/01", time_p=0.51, exporter="E2", m_name="A", m_cls="CA"),
+                dict(date="2025/01/02", time_p=0.62, exporter="E2", m_name="A", m_cls="CA"),
+                dict(date="2025/01/03", time_p=0.50, exporter="E2", m_name="A", m_cls="CA"),
+            ]
+        )
+        cube = CubeLogs(df, keys=["^m_*", "exporter"], time="date").load()
+        cube_time = cube.cube_time(threshold=1.1)
+        v = cube_time.data["time_p"].tolist()
+        self.assertEqual([0, -1], v)
+
+    @hide_stdout()
+    def test_historical_cube_time_mask(self):
+        output = self.get_dump_file("test_historical_cube_time_mask.xlsx")
+        df = pandas.DataFrame(
+            [
+                dict(date="2025/01/01", time_p=0.51, exporter="E1", m_name="A", m_cls="CA"),
+                dict(date="2025/01/02", time_p=0.62, exporter="E1", m_name="A", m_cls="CA"),
+                dict(date="2025/01/03", time_p=0.62, exporter="E1", m_name="A", m_cls="CA"),
+                dict(date="2025/01/01", time_p=0.51, exporter="E2", m_name="A", m_cls="CA"),
+                dict(date="2025/01/02", time_p=0.62, exporter="E2", m_name="A", m_cls="CA"),
+                dict(date="2025/01/03", time_p=0.50, exporter="E2", m_name="A", m_cls="CA"),
+                dict(date="2025/01/01", time_p=0.71, exporter="E2", m_name="B", m_cls="CA"),
+                dict(date="2025/01/02", time_p=0.72, exporter="E2", m_name="B", m_cls="CA"),
+                dict(date="2025/01/03", time_p=0.70, exporter="E2", m_name="B", m_cls="CA"),
+            ]
+        )
+        cube = CubeLogs(df, keys=["^m_*", "exporter"], time="date").load()
+        cube.to_excel(output, views=["time_p"], time_mask=True, verbose=1)
+

 if __name__ == "__main__":
     unittest.main(verbosity=2)
diff --git a/onnx_diagnostic/_command_lines_parser.py b/onnx_diagnostic/_command_lines_parser.py
index 46a765fe..a2d12723 100644
--- a/onnx_diagnostic/_command_lines_parser.py
+++ b/onnx_diagnostic/_command_lines_parser.py
@@ -657,9 +657,16 @@ def get_parser_agg() -> ArgumentParser:
         ),
         epilog=textwrap.dedent(
             """
-            examples:\n
+            examples:

             python -m onnx_diagnostic agg test_agg.xlsx raw/*.zip -v 1
+
+            python -m onnx_diagnostic agg agg.xlsx raw/*.zip raw/*.csv -v 1 \\
+                --no-raw --keep-last-date --filter-out "exporter:test-exporter"
+
+            Another one creates timeseries:
+
+            python -m onnx_diagnostic agg history.xlsx raw/*.csv -v 1 --no-raw \\
+                --no-recent
             """
         ),
         formatter_class=RawTextHelpFormatter,
@@ -812,6 +819,7 @@ def _cmd_agg(argv: List[Any]):
         verbose=args.verbose,
         csv=args.csv.split(","),
         raw=args.raw,
+        time_mask=True,
     )
     if args.verbose:
         print(f"Wrote {args.output!r}")
diff --git a/onnx_diagnostic/helpers/log_helper.py b/onnx_diagnostic/helpers/log_helper.py
index 510d52f4..5434d888 100644
--- a/onnx_diagnostic/helpers/log_helper.py
+++ b/onnx_diagnostic/helpers/log_helper.py
@@ -21,6 +21,87 @@
 BUCKET_SCALES = BUCKET_SCALES_VALUES / 100 + 1


+def mann_kendall(series: Sequence[float], threshold: float = 0.5):
+    """
+    Computes the Mann-Kendall trend test.
+
+    :param series: series
+    :param threshold: 1.96 is the usual value; with the default 0.5,
+        even a short increasing series such as ``(0, 1, 2, 3, 4)``
+        shows a significant trend
+    :return: trend (-1, 0, +1), test value
+
+    .. math::
+
+        S = \\sum_{i=1}^{n}\\sum_{j=i+1}^{n} sign(x_j - x_i)
+
+    where the function *sign* is:
+
+    .. math::
+
+        sign(x) = \\left\\{\\begin{array}{l} -1 \\text{ if } x < 0 \\\\
+        0 \\text{ if } x = 0 \\\\ +1 \\text{ otherwise} \\end{array}\\right.
+
+    And:
+
+    .. math::
+
+        Var(S) = \\frac{n(n-1)(2n+5) - \\sum_t t(t-1)(2t+5)}{18}
+
+    The implementation below keeps :math:`n(n-1)(2n+5)` as the variance
+    (no ties term, no division by 18), which is why the default *threshold*
+    is lower than the usual 1.96.
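+
+    A usage sketch, mirroring the values checked in ``test_mann_kendall``:
+
+    .. code-block:: python
+
+        from onnx_diagnostic.helpers.log_helper import mann_kendall
+
+        # monotonically increasing series: significant positive trend
+        trend, test = mann_kendall([0, 1, 2, 3, 4])
+        # trend == 1.0, test ~= 0.5196
+
+        # too short to be significant, even with the default threshold
+        trend, test = mann_kendall([0, 1, 2])
+        # trend == 0, test ~= 0.2462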
+    """
+    aseries = np.asarray(series)
+    stat = 0
+    n = len(aseries)
+    var = n * (n - 1) * (2 * n + 5)
+    for i in range(n - 1):
+        stat += np.sign(aseries[i + 1 :] - aseries[i]).sum()
+    var = var**0.5
+    test = (stat + (1 if stat < 0 else (0 if stat == 0 else -1))) / var
+    trend = np.sign(test) if np.abs(test) > threshold else 0
+    return trend, test
+
+
+def breaking_last_point(series: Sequence[float], threshold: float = 1.2):
+    """
+    Assuming a timeseries is constant, checks that the last value
+    is not an outlier.
+
+    :param series: series
+    :param threshold: the change is significant when the test value,
+        the distance between the last point and the mean of the previous ones
+        expressed in standard deviations, exceeds this threshold
+    :return: significant change (-1, 0, +1), test value
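+
+    A usage sketch, mirroring the values checked in ``test_breaking_last_point``:
+
+    .. code-block:: python
+
+        from onnx_diagnostic.helpers.log_helper import breaking_last_point
+
+        # the last point jumps far above the (almost) constant history
+        trend, test = breaking_last_point([1, 1, 1.1, 2])
+        # trend == 1.0, test ~= 20.5
+
+        # the last point stays within the expected bandwidth
+        trend, test = breaking_last_point([1, 1, 1.1, 1])
+        # trend == 0.0, test ~= -0.71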
+    """
+    signal = np.asarray(series)
+    if not np.issubdtype(signal.dtype, np.number):
+        return 0, np.nan
+    assert len(signal.shape) == 1, f"Unexpected signal shape={signal.shape}, signal={signal}"
+    if signal.shape[0] <= 2:
+        return 0, 0
+
+    has_value = ~(np.isnan(signal).all()) and ~(np.isinf(signal).all())
+    if np.isnan(signal[-1]) or np.isinf(signal[-1]):
+        return (-1, np.inf) if has_value else (0, 0)
+
+    try:
+        m = np.mean(signal[:-1])
+    except (TypeError, ValueError):
+        # Not a numerical type
+        return 0, np.nan
+
+    if np.isnan(m) or np.isinf(m):
+        return (1, np.inf) if np.isinf(signal[-2]) or np.isnan(signal[-2]) else (0, 0)
+    v = np.std(signal[:-1])
+    if v == 0:
+        test = signal[-1] - m
+        assert not np.isnan(
+            test
+        ), f"Unexpected test value, test={test}, signal={signal}, m={m}, v={v}"
+        trend = np.sign(test)
+        return trend, trend
+    test = (signal[-1] - m) / v
+    assert not np.isnan(
+        test
+    ), f"Unexpected test value, test={test}, signal={signal}, m={m}, v={v}"
+    trend = np.sign(test) if np.abs(test) > threshold else 0
+    return trend, test
+
+
 def filter_data(
     df: pandas.DataFrame,
     filter_in: Optional[str] = None,
@@ -203,6 +284,42 @@ def open_dataframe(
     raise ValueError(f"Unexpected value for data: {data!r}")


+def align_dataframe_with(
+    df: pandas.DataFrame, baseline: pandas.DataFrame, fill_value: float = 0
+) -> Optional[pandas.DataFrame]:
+    """
+    Aligns the first dataframe *df* with *baseline*: the output gets
+    the exact same rows and columns as *baseline*. Both dataframes must
+    share the same level names on both axes. Missing cells are filled
+    with *fill_value*. Only the numerical columns are kept.
+    The function returns None if the output is empty.
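+
+    A minimal sketch of the expected behaviour (hypothetical values):
+
+    .. code-block:: python
+
+        import pandas
+        from onnx_diagnostic.helpers.log_helper import align_dataframe_with
+
+        baseline = pandas.DataFrame({"a": [1.0, 2.0], "b": [3.0, 4.0]}, index=["x", "y"])
+        df = pandas.DataFrame({"a": [10.0]}, index=["x"])
+
+        aligned = align_dataframe_with(df, baseline)
+        # aligned has the shape of baseline, the known cell ("x", "a")
+        # keeps 10.0 and every other cell is filled with 0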
""" from openpyxl import load_workbook from openpyxl.styles import Alignment from openpyxl.utils import get_column_letter - from openpyxl.styles import Font # , PatternFill, numbers + from openpyxl.styles import Font, PatternFill if isinstance(filename_or_writer, str): workbook = load_workbook(filename_or_writer) @@ -298,6 +456,9 @@ def apply_excel_style( workbook = filename_or_writer.book save = False + mask_low = PatternFill(fgColor="AAAAF0", fill_type="solid") + mask_high = PatternFill(fgColor="F0AAAA", fill_type="solid") + left = Alignment(horizontal="left") left_shrink = Alignment(horizontal="left", shrink_to_fit=True) right = Alignment(horizontal="right") @@ -307,6 +468,14 @@ def apply_excel_style( } for name in workbook.sheetnames: + if time_mask_view and name in time_mask_view: + mask = time_mask_view[name] + with pandas.ExcelWriter(io.BytesIO(), engine="openpyxl") as mask_writer: + mask.to_excel(mask_writer, sheet_name=name) + sheet_mask = mask_writer.sheets[name] + else: + sheet_mask = None + f_highlight = f_highlights.get(name, None) if f_highlights else None sheet = workbook[name] n_rows = sheet.max_row @@ -384,6 +553,16 @@ def apply_excel_style( h = f_highlight(cell.value) if h in font_colors: cell.font = font_colors[h] + + if sheet_mask is not None: + for i in range(1, n_rows + 1): + for j, (cell, cell_mask) in enumerate(zip(sheet[i], sheet_mask[i])): + if j > n_cols: + break + if cell_mask.value not in (1, -1): + continue + cell.fill = mask_low if cell_mask.value < 0 else mask_high + if save: workbook.save(filename_or_writer) @@ -397,6 +576,26 @@ class CubePlot: :param split: draw a graph per line in the dataframe :param timeseries: this assumes the time is one level of the columns, this argument indices the level name + + It defines a graph. Usually *bar* or *barh* is used to + compare experiments for every metric, a subplot by metric. + + .. code-block:: python + + CubePlot(df, kind="barh", orientation="row", split=True) + + *line* is usually used to plot timeseries showing the + evolution of metrics over time. + + .. code-block:: python + + CubePlot( + df, + kind="line", + orientation="row", + split=True, + timeseries="time", + ) """ KINDS = {"bar", "barh", "line"} @@ -607,6 +806,35 @@ def rotate_align(ax, angle=15, align="right"): class CubeLogs: """ Processes logs coming from experiments. + A cube is basically a database with certain columns + playing specific roles. + + * time: only one column, it is not mandatory but it is recommended + to have one + * keys: they are somehow coordinates, they cannot be aggregated, + they are not numbers, more like categories, `(time, *keys)` + identifies an element of the database in an unique way, + there cannot be more than one row sharing the same key and time + values + * values: they are not necessary numerical, but if they are, + they can be aggregated + + Every other columns is ignored. More columns can be added + by using formulas. 
+
+    :param data: the raw data
+    :param time: the time column
+    :param keys: the keys, can include regular expressions
+    :param values: the values, can include regular expressions
+    :param ignored: ignores some columns, acts as negative regular
+        expressions for the other two
+    :param recent: if more than one row shares the same keys,
+        the cube only keeps the most recent one
+    :param formulas: columns to add, defined with formulas
+    :param fill_missing: a dictionary defining the values to use
+        for missing cells in some columns
+    :param keep_last_date: overwrites all the times with the most recent
+        one, it makes timeseries easier to build
     """

     def __init__(
@@ -636,6 +864,20 @@ def __init__(
         self.fill_missing = fill_missing
         self.keep_last_date = keep_last_date

+    def clone(self, data: Optional[pandas.DataFrame] = None) -> "CubeLogs":
+        """
+        Makes a copy of the cube.
+        It copies the processed data, not the original data.
+        """
+        cube = self.__class__(
+            data if data is not None else self.data.copy(),
+            time=self.time,
+            keys=self.keys_no_time,
+            values=self.values,
+        )
+        cube.load()
+        return cube
+
     def post_load_process_piece(
         self, df: pandas.DataFrame, unique: bool = False
     ) -> pandas.DataFrame:
@@ -741,17 +983,13 @@ def load(self, verbose: int = 0):
             print(f"[CubeLogs.load] dropped={self.dropped}")
             print(f"[CubeLogs.load] data.shape={self.data.shape}")

-        shape = self.data.shape
         if verbose:
             print(f"[CubeLogs.load] removed columns, shape={self.data.shape}")
         self._preprocess()
         if verbose:
             print(f"[CubeLogs.load] preprocess, shape={self.data.shape}")
-        assert (
-            self.data.shape[0] > 0
-        ), f"The preprocessing reduced shape {shape} to {self.data.shape}."
         if self.recent and verbose:
             print(f"[CubeLogs.load] keep most recent data.shape={self.data.shape}")

         # Let's apply the formulas
         if self._formulas:
@@ -883,6 +1121,18 @@ def __str__(self) -> str:
         "usual"
         return str(self.data) if hasattr(self, "data") else str(self._data)

+    def make_view_def(self, name: str) -> Optional[CubeViewDef]:
+        """
+        Returns a view definition.
+
+        :param name: name of a value
+        :return: a CubeViewDef or None if *name* does not make sense
+        """
+        assert name in self.values, f"{name!r} is not one of the values {self.values}"
+        keys = sorted(self.keys_no_time)
+        index = len(keys) // 2 + (len(keys) % 2)
+        return CubeViewDef(key_index=keys[:index], values=[name], name=name)
+
     def view(
         self,
         view_def: Union[str, CubeViewDef],
@@ -900,6 +1150,12 @@ def view(
         :param verbose: verbosity level
         :return: dataframe
         """
+        if isinstance(view_def, str):
+            # We automatically create a view for a metric.
+            view_def_ = self.make_view_def(view_def)
+            assert view_def_ is not None, f"Unable to create a view from {view_def!r}"
+            view_def = view_def_
+
         assert isinstance(
             view_def, CubeViewDef
         ), f"view_def should be a CubeViewDef, got {type(view_def)}: {view_def!r} instead"
@@ -1228,9 +1484,10 @@ def to_excel(
         raw: Optional[str] = "raw",
         verbose: int = 0,
         csv: Optional[Sequence[str]] = None,
+        time_mask: bool = False,
     ):
         """
-        Creates an excel file with a list of view.
+        Creates an excel file with a list of views.
@@ -1238,9 +1495,14 @@ def to_excel(
         :param output: output file to create
         :param views: sequence or dictionary of views to append
         :param main: add a page with statistics
         :param raw: add a page with the raw data
         :param csv: views to dump as csv files (same name as outputs + view naw)
         :param verbose: verbosity
+        :param time_mask: color the background of the cells if one
+            of the values for the last date is unexpected,
+            assuming they should remain stable
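+
+        Usage sketch, following ``test_historical_cube_time_mask``:
+
+        .. code-block:: python
+
+            cube = CubeLogs(df, keys=["^m_*", "exporter"], time="date").load()
+            # highlights the cells whose last value breaks the trend
+            cube.to_excel("history.xlsx", views=["time_p"], time_mask=True, verbose=1)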
         """
         if verbose:
             print(f"[CubeLogs.to_excel] create Excel file {output}, shape={self.shape}")
+        time_mask &= len(self.data[self.time].unique()) > 2
+        cube_time = self.cube_time(fill_other_dates=True) if time_mask else None
         views = {k: k for k in views} if not isinstance(views, dict) else views
         f_highlights = {}
         plots = []
@@ -1252,10 +1514,25 @@ def to_excel(
             print(f"[CubeLogs.to_excel] add sheet {main!r} with shape {df.shape}")
             df.to_excel(writer, sheet_name=main, freeze_panes=(1, 1))

+        time_mask_view: Dict[str, pandas.DataFrame] = {}
         for name, view in views.items():
             if view is None:
                 continue
             df, tview = self.view(view, return_view_def=True, verbose=max(verbose - 1, 0))
+            if cube_time is not None:
+                cube_mask = cube_time.view(view)
+                aligned = align_dataframe_with(cube_mask, df)
+                if aligned is not None:
+                    assert aligned.shape == df.shape, (
+                        f"Shape mismatch between the view {df.shape} and the mask "
+                        f"{aligned.shape}"
+                    )
+                    time_mask_view[name] = aligned
+                    if verbose:
+                        print(
+                            f"[CubeLogs.to_excel] compute mask for view {name!r} "
+                            f"with shape {aligned.shape}"
+                        )
             if tview is None:
                 continue
             memory = df.memory_usage(deep=True).sum()
@@ -1366,10 +1643,50 @@ def to_excel(

         if verbose:
             print(f"[CubeLogs.to_excel] applies style to {output!r}")
-        apply_excel_style(writer, f_highlights)  # type: ignore[arg-type]
+        apply_excel_style(writer, f_highlights, time_mask_view=time_mask_view)  # type: ignore[arg-type]
         if verbose:
             print(f"[CubeLogs.to_excel] done with {len(views)} views")

+    def cube_time(self, fill_other_dates: bool = False, threshold: float = 1.2) -> "CubeLogs":
+        """
+        Aggregates the data over time to detect changes in the last value.
+        If *fill_other_dates* is True, all dates are kept and the values
+        for the other dates are filled with 0.
+        *threshold* defines the bandwidth within which the values are
+        expected; it is a factor applied to the standard deviation.
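+
+        A short sketch, following ``test_historical_cube_time``:
+
+        .. code-block:: python
+
+            cube = CubeLogs(df, keys=["^m_*", "exporter"], time="date").load()
+            # one row per key combination, -1/0/+1 per value column
+            cube_time = cube.cube_time(threshold=1.1)
+            # e.g. [0, -1]: the last time_p of exporter E2 is unexpectedly lower
+            print(cube_time.data["time_p"].tolist())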
+ """ + unique_time = self.data[self.time].unique() + assert len(unique_time) > 2, f"Not enough dates to proceed: unique_time={unique_time}" + gr = self.data[[*self.keys_no_time, *self.values]].groupby( + self.keys_no_time, dropna=False + ) + dgr = gr.agg( + lambda series, th=threshold: int(breaking_last_point(series, threshold=th)[0]) + ) + tm = unique_time.max() + assert dgr.shape[0] > 0, ( + f"Unexpected output shape={dgr.shape}, unique_time={unique_time}, " + f"data.shape={self.data.shape}" + ) + dgr[self.time] = tm + if fill_other_dates: + other_df = [] + other_dates = [t for t in unique_time if t != tm] + for t in other_dates: + df = dgr.copy() + df[self.time] = t + for c in df.columns: + if c != self.time: + df[c] = 0 + other_df.append(df) + dgr = pandas.concat([dgr, *other_df], axis=0) + assert dgr.shape[0] > 0, ( + f"Unexpected output shape={dgr.shape}, unique_time={unique_time}, " + f"data.shape={self.data.shape}, " + f"other_df shapes={[df.shape for df in other_df]}" + ) + return self.clone(data=dgr.reset_index(drop=False)) + class CubeLogsPerformance(CubeLogs): """ @@ -1456,6 +1773,21 @@ def __init__( keep_last_date=keep_last_date, ) + def clone(self, data: Optional[pandas.DataFrame] = None) -> "CubeLogs": + """ + Makes a copy of the dataframe. + It copies the processed data not the original one. + """ + cube = self.__class__( + data if data is not None else self.data.copy(), + time=self.time, + keys=self.keys_no_time, + values=self.values, + recent=False, + ) + cube.load() + return cube + def _process_formula( self, formula: Union[str, Callable[[pandas.DataFrame], pandas.Series]] ) -> Callable[[pandas.DataFrame], pandas.Series]: