diff --git a/backends/arm/test/ops/test_conv2d.py b/backends/arm/test/ops/test_conv2d.py index 96464c17097..827e6dfffa3 100644 --- a/backends/arm/test/ops/test_conv2d.py +++ b/backends/arm/test/ops/test_conv2d.py @@ -1,5 +1,4 @@ # Copyright 2024-2025 Arm Limited and/or its affiliates. -# All rights reserved. # # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. @@ -371,7 +370,7 @@ def test_conv2d_tosa_BI(test_module): pipeline = TosaPipelineBI[input_t]( test_module, test_module.get_inputs(), aten_op, exir_op ) - pipeline.change_args("run_method_and_compare_outputs", qtol=1) + pipeline.change_args("run_method_and_compare_outputs.0", qtol=1) pipeline.run() diff --git a/backends/arm/test/tester/test_pipeline.py b/backends/arm/test/tester/test_pipeline.py index bc67783ddb1..b25773d604c 100644 --- a/backends/arm/test/tester/test_pipeline.py +++ b/backends/arm/test/tester/test_pipeline.py @@ -46,8 +46,8 @@ class PipelineStage: is_called: keeps track of if the function has been called """ - def __init__(self, func, *args, **kwargs): - self.id: str = func.__name__ + def __init__(self, func: Callable, id: str, *args, **kwargs): + self.id: str = id self.func: Callable = func self.args = args self.kwargs = kwargs @@ -86,48 +86,97 @@ def __init__( self.test_data = test_data self._stages = [] - self.add_stage(-1, self.tester.export) - self.add_stage(-1, self.tester.check, self.aten_ops) + self.add_stage(self.tester.export) + self.add_stage(self.tester.check, self.aten_ops, suffix="aten") if use_to_edge_transform_and_lower: - self.add_stage(-1, self.tester.to_edge_transform_and_lower) - + self.add_stage(self.tester.to_edge_transform_and_lower) else: - self.add_stage(-1, self.tester.to_edge) - self.add_stage(-1, self.tester.check, self.exir_ops) - self.add_stage(-1, self.tester.partition) - self.add_stage(-1, self.tester.check_not, self.exir_ops) + self.add_stage(self.tester.to_edge) + self.add_stage(self.tester.check, self.exir_ops, suffix="exir") + self.add_stage(self.tester.partition) + self.add_stage(self.tester.check_not, self.exir_ops, suffix="exir") self.add_stage( - -1, self.tester.check_count, {"torch.ops.higher_order.executorch_call_delegate": 1}, + suffix="exir", ) - self.add_stage(-1, self.tester.to_executorch) + self.add_stage(self.tester.to_executorch) + + def add_stage(self, func: Callable, *args, **kwargs): + """ + Adds a stage defined by a function with args and kwargs. By default appends to the pipeline. + For stages which may be added multiple times to a pipeline, s.a. checks and debug stages, + a suffix is appended with a dot to make sure every id is unique, e.g. check becomes check.0 - def add_stage(self, pos: int, func: Callable, *args, **kwargs): - """Adds a stage defined by a function with arguments to the pipeline at index pos. Pos wraps around the list for negative values.""" - pipeline_stage = self.PipelineStage(func, *args, **kwargs) + Special kwargs: + pos : specifies position in pipeline to add stage at. + suffix : specifies a custom suffix to identify non unique stages, instead of a number. + """ pipeline_length = len(self._stages) + pos = -1 + if "pos" in kwargs: + pos = kwargs.pop("pos") + if pos < 0: pos = pipeline_length + (pos + 1) - if not -pipeline_length <= pos <= pipeline_length: raise ValueError( f"Pos must be between [-{pipeline_length}, {pipeline_length}]" ) + suffix = None + if "suffix" in kwargs: + suffix = kwargs.pop("suffix") + + stage_id = func.__name__ + unique_stages = [ + "quantize", + "export", + "to_edge_transform_and_lower", + "to_edge", + "partition", + "to_executorch", + "serialize", + ] + id_list = [stage.id for stage in self._stages] + if stage_id in unique_stages: + if stage_id in id_list: + raise RuntimeError(f"Tried adding {stage_id} to pipeline twice.") + else: + if suffix is None: + stages_containing_stage_id = [ + id for id in id_list if stage_id == id.split(".")[0] + ] + + suffix = str(len(stages_containing_stage_id)) + + stage_id = stage_id + "." + suffix + + if stage_id in id_list: + raise ValueError("Suffix must be unique in pipeline") + + pipeline_stage = self.PipelineStage(func, stage_id, *args, **kwargs) self._stages.insert(pos, pipeline_stage) - logger.debug(f"Added stage {func.__name__} to {type(self).__name__}") + logger.debug(f"Added stage {stage_id} to {type(self).__name__}") return self - def pop_stage(self, pos: int): + def pop_stage(self, identifier: int | str): """Removes and returns the stage at postion pos""" - return self._stages.pop(pos) + if isinstance(identifier, int): + stage = self._stages.pop(identifier) + elif isinstance(identifier, str): + pos = self.find_pos(identifier) + stage = self._stages.pop(pos) + + logger.debug(f"Removed stage {stage.id} from {type(self).__name__}") + + return stage def find_pos(self, stage_id: str): - """Returns the position of the stage id. Note that this only finds the first stage with the given id, i.e. it should only be used with unique stages.""" + """Returns the position of the stage id.""" for i, stage in enumerate(self._stages): if stage.id == stage_id: return i @@ -135,23 +184,32 @@ def find_pos(self, stage_id: str): raise Exception(f"Stage id {stage_id} not found in pipeline") def add_stage_after(self, stage_id: str, func: Callable, *args, **kwargs): - """Adds a stage after the given stage id. Note that this only finds the first stage with the given id, i.e. it should only be used with unique stages.""" - pos = self.find_pos(stage_id) - self.add_stage(pos + 1, func, *args, **kwargs) + """Adds a stage after the given stage id.""" + pos = self.find_pos(stage_id) + 1 + kwargs["pos"] = pos + + self.add_stage(func, *args, **kwargs) + return self + + def dump_artifact(self, stage_id: str, suffix: str = None): + """Adds a dump_artifact stage after the given stage id.""" + self.add_stage_after(stage_id, self.tester.dump_artifact, suffix=suffix) return self - def dump_artifact(self, stage_id: str): - """Adds a dump_artifact stage after the given stage id. Note that this only finds the first stage with the given id, i.e. it should only be used with unique stages.""" - self.add_stage_after(stage_id, self.tester.dump_artifact) + def dump_operator_distribution(self, stage_id: str, suffix: str = None): + """Adds a dump_operator_distribution stage after the given stage id.""" + self.add_stage_after( + stage_id, self.tester.dump_operator_distribution, suffix=suffix + ) return self - def dump_operator_distribution(self, stage_id: str): - """Adds a dump_operator_distribution stage after the given stage id. Note that this only finds the first stage with the given id, i.e. it should only be used with unique stages.""" - self.add_stage_after(stage_id, self.tester.dump_operator_distribution) + def visualize(self, stage_id: str, suffix: str = None): + """Adds a dump_operator_distribution stage after the given stage id.""" + self.add_stage_after(stage_id, self.tester.visualize, suffix=suffix) return self def change_args(self, stage_id: str, *args, **kwargs): - """Updates the args to the given stage id. Note that this only finds the first stage with the given id, i.e. it should only be used with unique stages.""" + """Updates the args to the given stage id.""" pos = self.find_pos(stage_id) pipeline_stage = self._stages[pos] pipeline_stage.update(*args, **kwargs) @@ -193,7 +251,7 @@ def __init__( compile_spec, use_to_edge_transform_and_lower, ) - self.add_stage(0, self.tester.quantize) + self.add_stage(self.tester.quantize, pos=0) self.add_stage_after( "quantize", self.tester.check, @@ -201,6 +259,7 @@ def __init__( "torch.ops.quantized_decomposed.dequantize_per_tensor.default", "torch.ops.quantized_decomposed.quantize_per_tensor.default", ], + suffix="quant_nodes", ) remove_quant_nodes_stage = ( @@ -215,10 +274,11 @@ def __init__( "torch.ops.quantized_decomposed.dequantize_per_tensor.default", "torch.ops.quantized_decomposed.quantize_per_tensor.default", ], + suffix="quant_nodes", ) self.add_stage( - -1, self.tester.run_method_and_compare_outputs, inputs=self.test_data + self.tester.run_method_and_compare_outputs, inputs=self.test_data ) @@ -252,10 +312,11 @@ def __init__( "torch.ops.quantized_decomposed.dequantize_per_tensor.default", "torch.ops.quantized_decomposed.quantize_per_tensor.default", ], + suffix="quant_nodes", ) self.add_stage( - -1, self.tester.run_method_and_compare_outputs, inputs=self.test_data + self.tester.run_method_and_compare_outputs, inputs=self.test_data ) @@ -280,7 +341,7 @@ def __init__( compile_spec, use_to_edge_transform_and_lower, ) - self.add_stage(0, self.tester.quantize) + self.add_stage(self.tester.quantize, pos=0) self.add_stage_after( "quantize", self.tester.check, @@ -288,6 +349,7 @@ def __init__( "torch.ops.quantized_decomposed.dequantize_per_tensor.default", "torch.ops.quantized_decomposed.quantize_per_tensor.default", ], + suffix="quant_nodes", ) remove_quant_nodes_stage = ( @@ -302,12 +364,12 @@ def __init__( "torch.ops.quantized_decomposed.dequantize_per_tensor.default", "torch.ops.quantized_decomposed.quantize_per_tensor.default", ], + suffix="quant_nodes", ) if run_on_fvp: - self.add_stage(-1, self.tester.serialize) + self.add_stage(self.tester.serialize) self.add_stage( - -1, self.tester.run_method_and_compare_outputs, qtol=1, inputs=self.test_data, @@ -335,7 +397,7 @@ def __init__( compile_spec, use_to_edge_transform_and_lower, ) - self.add_stage(0, self.tester.quantize) + self.add_stage(self.tester.quantize, pos=0) self.add_stage_after( "quantize", self.tester.check, @@ -343,6 +405,7 @@ def __init__( "torch.ops.quantized_decomposed.dequantize_per_tensor.default", "torch.ops.quantized_decomposed.quantize_per_tensor.default", ], + suffix="quant_nodes", ) remove_quant_nodes_stage = ( @@ -357,12 +420,12 @@ def __init__( "torch.ops.quantized_decomposed.dequantize_per_tensor.default", "torch.ops.quantized_decomposed.quantize_per_tensor.default", ], + suffix="quant_nodes", ) if run_on_fvp: - self.add_stage(-1, self.tester.serialize) + self.add_stage(self.tester.serialize) self.add_stage( - -1, self.tester.run_method_and_compare_outputs, qtol=1, inputs=self.test_data,