From 6a48cab0888c783f917b32bfa05bf9e224bd6782 Mon Sep 17 00:00:00 2001 From: Adrian Lundell Date: Mon, 10 Feb 2025 09:05:06 +0100 Subject: [PATCH] Make stage ids unique in the Arm TestPipeline Some stages in the TestPipeline may be added multiple times, such as .check(). To be able to target these by id, give them an unique suffix -> 'id.suffix' Refering to stages in terms of id instead of an index is more self documenting and future proof. This change modifies the add/pop_stage interface: - pos arg in add_stage is now an optional kwarg, appending to the pipline as default - suffix is added to add_stage as an optional_kwarg. If a suffix is not given to a non unique stage, a number is added instead. - pop_stage now allows to use ids for referring to stages. Additionally adds .visualize(stage) for quickly adding visualizing stages to the pipeline. Change-Id: If649a19096ddee6b2eca2c8aa735b54ca7eea3e8 --- backends/arm/test/ops/test_conv2d.py | 3 +- backends/arm/test/tester/test_pipeline.py | 141 ++++++++++++++++------ 2 files changed, 103 insertions(+), 41 deletions(-) diff --git a/backends/arm/test/ops/test_conv2d.py b/backends/arm/test/ops/test_conv2d.py index 96464c17097..827e6dfffa3 100644 --- a/backends/arm/test/ops/test_conv2d.py +++ b/backends/arm/test/ops/test_conv2d.py @@ -1,5 +1,4 @@ # Copyright 2024-2025 Arm Limited and/or its affiliates. -# All rights reserved. # # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. @@ -371,7 +370,7 @@ def test_conv2d_tosa_BI(test_module): pipeline = TosaPipelineBI[input_t]( test_module, test_module.get_inputs(), aten_op, exir_op ) - pipeline.change_args("run_method_and_compare_outputs", qtol=1) + pipeline.change_args("run_method_and_compare_outputs.0", qtol=1) pipeline.run() diff --git a/backends/arm/test/tester/test_pipeline.py b/backends/arm/test/tester/test_pipeline.py index bc67783ddb1..b25773d604c 100644 --- a/backends/arm/test/tester/test_pipeline.py +++ b/backends/arm/test/tester/test_pipeline.py @@ -46,8 +46,8 @@ class PipelineStage: is_called: keeps track of if the function has been called """ - def __init__(self, func, *args, **kwargs): - self.id: str = func.__name__ + def __init__(self, func: Callable, id: str, *args, **kwargs): + self.id: str = id self.func: Callable = func self.args = args self.kwargs = kwargs @@ -86,48 +86,97 @@ def __init__( self.test_data = test_data self._stages = [] - self.add_stage(-1, self.tester.export) - self.add_stage(-1, self.tester.check, self.aten_ops) + self.add_stage(self.tester.export) + self.add_stage(self.tester.check, self.aten_ops, suffix="aten") if use_to_edge_transform_and_lower: - self.add_stage(-1, self.tester.to_edge_transform_and_lower) - + self.add_stage(self.tester.to_edge_transform_and_lower) else: - self.add_stage(-1, self.tester.to_edge) - self.add_stage(-1, self.tester.check, self.exir_ops) - self.add_stage(-1, self.tester.partition) - self.add_stage(-1, self.tester.check_not, self.exir_ops) + self.add_stage(self.tester.to_edge) + self.add_stage(self.tester.check, self.exir_ops, suffix="exir") + self.add_stage(self.tester.partition) + self.add_stage(self.tester.check_not, self.exir_ops, suffix="exir") self.add_stage( - -1, self.tester.check_count, {"torch.ops.higher_order.executorch_call_delegate": 1}, + suffix="exir", ) - self.add_stage(-1, self.tester.to_executorch) + self.add_stage(self.tester.to_executorch) + + def add_stage(self, func: Callable, *args, **kwargs): + """ + Adds a stage defined by a function with args and kwargs. By default appends to the pipeline. + For stages which may be added multiple times to a pipeline, s.a. checks and debug stages, + a suffix is appended with a dot to make sure every id is unique, e.g. check becomes check.0 - def add_stage(self, pos: int, func: Callable, *args, **kwargs): - """Adds a stage defined by a function with arguments to the pipeline at index pos. Pos wraps around the list for negative values.""" - pipeline_stage = self.PipelineStage(func, *args, **kwargs) + Special kwargs: + pos : specifies position in pipeline to add stage at. + suffix : specifies a custom suffix to identify non unique stages, instead of a number. + """ pipeline_length = len(self._stages) + pos = -1 + if "pos" in kwargs: + pos = kwargs.pop("pos") + if pos < 0: pos = pipeline_length + (pos + 1) - if not -pipeline_length <= pos <= pipeline_length: raise ValueError( f"Pos must be between [-{pipeline_length}, {pipeline_length}]" ) + suffix = None + if "suffix" in kwargs: + suffix = kwargs.pop("suffix") + + stage_id = func.__name__ + unique_stages = [ + "quantize", + "export", + "to_edge_transform_and_lower", + "to_edge", + "partition", + "to_executorch", + "serialize", + ] + id_list = [stage.id for stage in self._stages] + if stage_id in unique_stages: + if stage_id in id_list: + raise RuntimeError(f"Tried adding {stage_id} to pipeline twice.") + else: + if suffix is None: + stages_containing_stage_id = [ + id for id in id_list if stage_id == id.split(".")[0] + ] + + suffix = str(len(stages_containing_stage_id)) + + stage_id = stage_id + "." + suffix + + if stage_id in id_list: + raise ValueError("Suffix must be unique in pipeline") + + pipeline_stage = self.PipelineStage(func, stage_id, *args, **kwargs) self._stages.insert(pos, pipeline_stage) - logger.debug(f"Added stage {func.__name__} to {type(self).__name__}") + logger.debug(f"Added stage {stage_id} to {type(self).__name__}") return self - def pop_stage(self, pos: int): + def pop_stage(self, identifier: int | str): """Removes and returns the stage at postion pos""" - return self._stages.pop(pos) + if isinstance(identifier, int): + stage = self._stages.pop(identifier) + elif isinstance(identifier, str): + pos = self.find_pos(identifier) + stage = self._stages.pop(pos) + + logger.debug(f"Removed stage {stage.id} from {type(self).__name__}") + + return stage def find_pos(self, stage_id: str): - """Returns the position of the stage id. Note that this only finds the first stage with the given id, i.e. it should only be used with unique stages.""" + """Returns the position of the stage id.""" for i, stage in enumerate(self._stages): if stage.id == stage_id: return i @@ -135,23 +184,32 @@ def find_pos(self, stage_id: str): raise Exception(f"Stage id {stage_id} not found in pipeline") def add_stage_after(self, stage_id: str, func: Callable, *args, **kwargs): - """Adds a stage after the given stage id. Note that this only finds the first stage with the given id, i.e. it should only be used with unique stages.""" - pos = self.find_pos(stage_id) - self.add_stage(pos + 1, func, *args, **kwargs) + """Adds a stage after the given stage id.""" + pos = self.find_pos(stage_id) + 1 + kwargs["pos"] = pos + + self.add_stage(func, *args, **kwargs) + return self + + def dump_artifact(self, stage_id: str, suffix: str = None): + """Adds a dump_artifact stage after the given stage id.""" + self.add_stage_after(stage_id, self.tester.dump_artifact, suffix=suffix) return self - def dump_artifact(self, stage_id: str): - """Adds a dump_artifact stage after the given stage id. Note that this only finds the first stage with the given id, i.e. it should only be used with unique stages.""" - self.add_stage_after(stage_id, self.tester.dump_artifact) + def dump_operator_distribution(self, stage_id: str, suffix: str = None): + """Adds a dump_operator_distribution stage after the given stage id.""" + self.add_stage_after( + stage_id, self.tester.dump_operator_distribution, suffix=suffix + ) return self - def dump_operator_distribution(self, stage_id: str): - """Adds a dump_operator_distribution stage after the given stage id. Note that this only finds the first stage with the given id, i.e. it should only be used with unique stages.""" - self.add_stage_after(stage_id, self.tester.dump_operator_distribution) + def visualize(self, stage_id: str, suffix: str = None): + """Adds a dump_operator_distribution stage after the given stage id.""" + self.add_stage_after(stage_id, self.tester.visualize, suffix=suffix) return self def change_args(self, stage_id: str, *args, **kwargs): - """Updates the args to the given stage id. Note that this only finds the first stage with the given id, i.e. it should only be used with unique stages.""" + """Updates the args to the given stage id.""" pos = self.find_pos(stage_id) pipeline_stage = self._stages[pos] pipeline_stage.update(*args, **kwargs) @@ -193,7 +251,7 @@ def __init__( compile_spec, use_to_edge_transform_and_lower, ) - self.add_stage(0, self.tester.quantize) + self.add_stage(self.tester.quantize, pos=0) self.add_stage_after( "quantize", self.tester.check, @@ -201,6 +259,7 @@ def __init__( "torch.ops.quantized_decomposed.dequantize_per_tensor.default", "torch.ops.quantized_decomposed.quantize_per_tensor.default", ], + suffix="quant_nodes", ) remove_quant_nodes_stage = ( @@ -215,10 +274,11 @@ def __init__( "torch.ops.quantized_decomposed.dequantize_per_tensor.default", "torch.ops.quantized_decomposed.quantize_per_tensor.default", ], + suffix="quant_nodes", ) self.add_stage( - -1, self.tester.run_method_and_compare_outputs, inputs=self.test_data + self.tester.run_method_and_compare_outputs, inputs=self.test_data ) @@ -252,10 +312,11 @@ def __init__( "torch.ops.quantized_decomposed.dequantize_per_tensor.default", "torch.ops.quantized_decomposed.quantize_per_tensor.default", ], + suffix="quant_nodes", ) self.add_stage( - -1, self.tester.run_method_and_compare_outputs, inputs=self.test_data + self.tester.run_method_and_compare_outputs, inputs=self.test_data ) @@ -280,7 +341,7 @@ def __init__( compile_spec, use_to_edge_transform_and_lower, ) - self.add_stage(0, self.tester.quantize) + self.add_stage(self.tester.quantize, pos=0) self.add_stage_after( "quantize", self.tester.check, @@ -288,6 +349,7 @@ def __init__( "torch.ops.quantized_decomposed.dequantize_per_tensor.default", "torch.ops.quantized_decomposed.quantize_per_tensor.default", ], + suffix="quant_nodes", ) remove_quant_nodes_stage = ( @@ -302,12 +364,12 @@ def __init__( "torch.ops.quantized_decomposed.dequantize_per_tensor.default", "torch.ops.quantized_decomposed.quantize_per_tensor.default", ], + suffix="quant_nodes", ) if run_on_fvp: - self.add_stage(-1, self.tester.serialize) + self.add_stage(self.tester.serialize) self.add_stage( - -1, self.tester.run_method_and_compare_outputs, qtol=1, inputs=self.test_data, @@ -335,7 +397,7 @@ def __init__( compile_spec, use_to_edge_transform_and_lower, ) - self.add_stage(0, self.tester.quantize) + self.add_stage(self.tester.quantize, pos=0) self.add_stage_after( "quantize", self.tester.check, @@ -343,6 +405,7 @@ def __init__( "torch.ops.quantized_decomposed.dequantize_per_tensor.default", "torch.ops.quantized_decomposed.quantize_per_tensor.default", ], + suffix="quant_nodes", ) remove_quant_nodes_stage = ( @@ -357,12 +420,12 @@ def __init__( "torch.ops.quantized_decomposed.dequantize_per_tensor.default", "torch.ops.quantized_decomposed.quantize_per_tensor.default", ], + suffix="quant_nodes", ) if run_on_fvp: - self.add_stage(-1, self.tester.serialize) + self.add_stage(self.tester.serialize) self.add_stage( - -1, self.tester.run_method_and_compare_outputs, qtol=1, inputs=self.test_data,