Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions backends/arm/test/ops/test_conv2d.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# Copyright 2024-2025 Arm Limited and/or its affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
Expand Down Expand Up @@ -371,7 +370,7 @@ def test_conv2d_tosa_BI(test_module):
pipeline = TosaPipelineBI[input_t](
test_module, test_module.get_inputs(), aten_op, exir_op
)
pipeline.change_args("run_method_and_compare_outputs", qtol=1)
pipeline.change_args("run_method_and_compare_outputs.0", qtol=1)
pipeline.run()


Expand Down
141 changes: 102 additions & 39 deletions backends/arm/test/tester/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ class PipelineStage:
is_called: keeps track of if the function has been called
"""

def __init__(self, func, *args, **kwargs):
self.id: str = func.__name__
def __init__(self, func: Callable, id: str, *args, **kwargs):
self.id: str = id
self.func: Callable = func
self.args = args
self.kwargs = kwargs
Expand Down Expand Up @@ -86,72 +86,130 @@ def __init__(
self.test_data = test_data
self._stages = []

self.add_stage(-1, self.tester.export)
self.add_stage(-1, self.tester.check, self.aten_ops)
self.add_stage(self.tester.export)
self.add_stage(self.tester.check, self.aten_ops, suffix="aten")
if use_to_edge_transform_and_lower:
self.add_stage(-1, self.tester.to_edge_transform_and_lower)

self.add_stage(self.tester.to_edge_transform_and_lower)
else:
self.add_stage(-1, self.tester.to_edge)
self.add_stage(-1, self.tester.check, self.exir_ops)
self.add_stage(-1, self.tester.partition)
self.add_stage(-1, self.tester.check_not, self.exir_ops)
self.add_stage(self.tester.to_edge)
self.add_stage(self.tester.check, self.exir_ops, suffix="exir")
self.add_stage(self.tester.partition)
self.add_stage(self.tester.check_not, self.exir_ops, suffix="exir")
self.add_stage(
-1,
self.tester.check_count,
{"torch.ops.higher_order.executorch_call_delegate": 1},
suffix="exir",
)
self.add_stage(-1, self.tester.to_executorch)
self.add_stage(self.tester.to_executorch)

def add_stage(self, func: Callable, *args, **kwargs):
"""
Adds a stage defined by a function with args and kwargs. By default appends to the pipeline.
For stages which may be added multiple times to a pipeline, s.a. checks and debug stages,
a suffix is appended with a dot to make sure every id is unique, e.g. check becomes check.0

def add_stage(self, pos: int, func: Callable, *args, **kwargs):
"""Adds a stage defined by a function with arguments to the pipeline at index pos. Pos wraps around the list for negative values."""
pipeline_stage = self.PipelineStage(func, *args, **kwargs)
Special kwargs:
pos : specifies position in pipeline to add stage at.
suffix : specifies a custom suffix to identify non unique stages, instead of a number.
"""
pipeline_length = len(self._stages)

pos = -1
if "pos" in kwargs:
pos = kwargs.pop("pos")

if pos < 0:
pos = pipeline_length + (pos + 1)

if not -pipeline_length <= pos <= pipeline_length:
raise ValueError(
f"Pos must be between [-{pipeline_length}, {pipeline_length}]"
)

suffix = None
if "suffix" in kwargs:
suffix = kwargs.pop("suffix")

stage_id = func.__name__
unique_stages = [
"quantize",
"export",
"to_edge_transform_and_lower",
"to_edge",
"partition",
"to_executorch",
"serialize",
]
id_list = [stage.id for stage in self._stages]
if stage_id in unique_stages:
if stage_id in id_list:
raise RuntimeError(f"Tried adding {stage_id} to pipeline twice.")
else:
if suffix is None:
stages_containing_stage_id = [
id for id in id_list if stage_id == id.split(".")[0]
]

suffix = str(len(stages_containing_stage_id))

stage_id = stage_id + "." + suffix

if stage_id in id_list:
raise ValueError("Suffix must be unique in pipeline")

pipeline_stage = self.PipelineStage(func, stage_id, *args, **kwargs)
self._stages.insert(pos, pipeline_stage)

logger.debug(f"Added stage {func.__name__} to {type(self).__name__}")
logger.debug(f"Added stage {stage_id} to {type(self).__name__}")

return self

def pop_stage(self, pos: int):
def pop_stage(self, identifier: int | str):
"""Removes and returns the stage at postion pos"""
return self._stages.pop(pos)
if isinstance(identifier, int):
stage = self._stages.pop(identifier)
elif isinstance(identifier, str):
pos = self.find_pos(identifier)
stage = self._stages.pop(pos)

logger.debug(f"Removed stage {stage.id} from {type(self).__name__}")

return stage

def find_pos(self, stage_id: str):
"""Returns the position of the stage id. Note that this only finds the first stage with the given id, i.e. it should only be used with unique stages."""
"""Returns the position of the stage id."""
for i, stage in enumerate(self._stages):
if stage.id == stage_id:
return i

raise Exception(f"Stage id {stage_id} not found in pipeline")

def add_stage_after(self, stage_id: str, func: Callable, *args, **kwargs):
"""Adds a stage after the given stage id. Note that this only finds the first stage with the given id, i.e. it should only be used with unique stages."""
pos = self.find_pos(stage_id)
self.add_stage(pos + 1, func, *args, **kwargs)
"""Adds a stage after the given stage id."""
pos = self.find_pos(stage_id) + 1
kwargs["pos"] = pos

self.add_stage(func, *args, **kwargs)
return self

def dump_artifact(self, stage_id: str, suffix: str = None):
"""Adds a dump_artifact stage after the given stage id."""
self.add_stage_after(stage_id, self.tester.dump_artifact, suffix=suffix)
return self

def dump_artifact(self, stage_id: str):
"""Adds a dump_artifact stage after the given stage id. Note that this only finds the first stage with the given id, i.e. it should only be used with unique stages."""
self.add_stage_after(stage_id, self.tester.dump_artifact)
def dump_operator_distribution(self, stage_id: str, suffix: str = None):
"""Adds a dump_operator_distribution stage after the given stage id."""
self.add_stage_after(
stage_id, self.tester.dump_operator_distribution, suffix=suffix
)
return self

def dump_operator_distribution(self, stage_id: str):
"""Adds a dump_operator_distribution stage after the given stage id. Note that this only finds the first stage with the given id, i.e. it should only be used with unique stages."""
self.add_stage_after(stage_id, self.tester.dump_operator_distribution)
def visualize(self, stage_id: str, suffix: str = None):
"""Adds a dump_operator_distribution stage after the given stage id."""
self.add_stage_after(stage_id, self.tester.visualize, suffix=suffix)
return self

def change_args(self, stage_id: str, *args, **kwargs):
"""Updates the args to the given stage id. Note that this only finds the first stage with the given id, i.e. it should only be used with unique stages."""
"""Updates the args to the given stage id."""
pos = self.find_pos(stage_id)
pipeline_stage = self._stages[pos]
pipeline_stage.update(*args, **kwargs)
Expand Down Expand Up @@ -193,14 +251,15 @@ def __init__(
compile_spec,
use_to_edge_transform_and_lower,
)
self.add_stage(0, self.tester.quantize)
self.add_stage(self.tester.quantize, pos=0)
self.add_stage_after(
"quantize",
self.tester.check,
[
"torch.ops.quantized_decomposed.dequantize_per_tensor.default",
"torch.ops.quantized_decomposed.quantize_per_tensor.default",
],
suffix="quant_nodes",
)

remove_quant_nodes_stage = (
Expand All @@ -215,10 +274,11 @@ def __init__(
"torch.ops.quantized_decomposed.dequantize_per_tensor.default",
"torch.ops.quantized_decomposed.quantize_per_tensor.default",
],
suffix="quant_nodes",
)

self.add_stage(
-1, self.tester.run_method_and_compare_outputs, inputs=self.test_data
self.tester.run_method_and_compare_outputs, inputs=self.test_data
)


Expand Down Expand Up @@ -252,10 +312,11 @@ def __init__(
"torch.ops.quantized_decomposed.dequantize_per_tensor.default",
"torch.ops.quantized_decomposed.quantize_per_tensor.default",
],
suffix="quant_nodes",
)

self.add_stage(
-1, self.tester.run_method_and_compare_outputs, inputs=self.test_data
self.tester.run_method_and_compare_outputs, inputs=self.test_data
)


Expand All @@ -280,14 +341,15 @@ def __init__(
compile_spec,
use_to_edge_transform_and_lower,
)
self.add_stage(0, self.tester.quantize)
self.add_stage(self.tester.quantize, pos=0)
self.add_stage_after(
"quantize",
self.tester.check,
[
"torch.ops.quantized_decomposed.dequantize_per_tensor.default",
"torch.ops.quantized_decomposed.quantize_per_tensor.default",
],
suffix="quant_nodes",
)

remove_quant_nodes_stage = (
Expand All @@ -302,12 +364,12 @@ def __init__(
"torch.ops.quantized_decomposed.dequantize_per_tensor.default",
"torch.ops.quantized_decomposed.quantize_per_tensor.default",
],
suffix="quant_nodes",
)

if run_on_fvp:
self.add_stage(-1, self.tester.serialize)
self.add_stage(self.tester.serialize)
self.add_stage(
-1,
self.tester.run_method_and_compare_outputs,
qtol=1,
inputs=self.test_data,
Expand Down Expand Up @@ -335,14 +397,15 @@ def __init__(
compile_spec,
use_to_edge_transform_and_lower,
)
self.add_stage(0, self.tester.quantize)
self.add_stage(self.tester.quantize, pos=0)
self.add_stage_after(
"quantize",
self.tester.check,
[
"torch.ops.quantized_decomposed.dequantize_per_tensor.default",
"torch.ops.quantized_decomposed.quantize_per_tensor.default",
],
suffix="quant_nodes",
)

remove_quant_nodes_stage = (
Expand All @@ -357,12 +420,12 @@ def __init__(
"torch.ops.quantized_decomposed.dequantize_per_tensor.default",
"torch.ops.quantized_decomposed.quantize_per_tensor.default",
],
suffix="quant_nodes",
)

if run_on_fvp:
self.add_stage(-1, self.tester.serialize)
self.add_stage(self.tester.serialize)
self.add_stage(
-1,
self.tester.run_method_and_compare_outputs,
qtol=1,
inputs=self.test_data,
Expand Down
Loading