Skip to content

Commit

Permalink
Simulate OOM failures in PipelineTask mocking library.
Browse files Browse the repository at this point in the history
  • Loading branch information
TallJimbo committed Mar 27, 2024
1 parent e08bb7a commit 1f62fa5
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 31 deletions.
1 change: 1 addition & 0 deletions doc/changes/DM-43484.feature.md
@@ -0,0 +1 @@
Add support for testing transient error recovery logic to the `PipelineTask` mock system.
95 changes: 64 additions & 31 deletions python/lsst/pipe/base/tests/mocks/_pipeline_task.py
Expand Up @@ -32,6 +32,7 @@
"DynamicConnectionConfig",
"DynamicTestPipelineTask",
"DynamicTestPipelineTaskConfig",
"ForcedFailure",
"MockPipelineTask",
"MockPipelineTaskConfig",
"mock_task_defs",
Expand All @@ -43,6 +44,7 @@
from collections.abc import Collection, Iterable, Mapping
from typing import TYPE_CHECKING, Any, ClassVar, TypeVar

from astropy.units import Quantity
from lsst.daf.butler import DataCoordinate, DatasetRef, DeferredDatasetHandle, SerializedDatasetType
from lsst.pex.config import Config, ConfigDictField, ConfigurableField, Field, ListField
from lsst.utils.doImport import doImportType
Expand All @@ -68,10 +70,36 @@
_T = TypeVar("_T", bound=cT.BaseConnection)


@dataclasses.dataclass
class ForcedFailure:
"""Information about an exception that should be raised by one or more
quanta.
"""

condition: str
"""Butler expression-language string that matches the data IDs that should
raise.
"""

exception_type: type[Exception] | None = None
"""The type of exception to raise."""

memory_required: Quantity | None = None
"""If not `None`, this failure simulates an out-of-memory failure by
raising only if this value exceeds `ExecutionResources.max_mem`.
"""

def set_config(self, config: MockPipelineTaskConfig) -> None:
config.fail_condition = self.condition

Check warning on line 93 in python/lsst/pipe/base/tests/mocks/_pipeline_task.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/tests/mocks/_pipeline_task.py#L93

Added line #L93 was not covered by tests
if self.exception_type:
config.fail_exception = get_full_type_name(self.exception_type)
config.memory_required = self.memory_required

Check warning on line 96 in python/lsst/pipe/base/tests/mocks/_pipeline_task.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/tests/mocks/_pipeline_task.py#L95-L96

Added lines #L95 - L96 were not covered by tests


def mock_task_defs(
originals: Iterable[TaskDef],
unmocked_dataset_types: Iterable[str] = (),
force_failures: Mapping[str, tuple[str, type[Exception] | None]] | None = None,
force_failures: Mapping[str, ForcedFailure] | None = None,
) -> list[TaskDef]:
"""Create mocks for an iterable of TaskDefs.
Expand All @@ -82,15 +110,9 @@ def mock_task_defs(
unmocked_dataset_types : `~collections.abc.Iterable` [ `str` ], optional
Names of overall-input dataset types that should not be replaced with
mocks.
force_failures : `~collections.abc.Mapping` [ `str`, `tuple` [ `str`, \
`type` [ `Exception` ] or `None` ] ]
Mapping from original task label to a 2-tuple indicating that some
quanta should raise an exception when executed. The first entry is a
data ID match using the butler expression language (i.e. a string of
the sort passed ass the ``where`` argument to butler query methods),
while the second is the type of exception to raise when the quantum
data ID matches the expression. An exception type of `None` uses
the default, `ValueError`.
force_failures : `~collections.abc.Mapping` [ `str`, `ForcedFailure` ]
Mapping from original task label to information about an exception one
or more quanta for this task should raise.
Returns
-------
Expand All @@ -108,10 +130,7 @@ def mock_task_defs(
config.original = original_task_def.config
config.unmocked_dataset_types.extend(unmocked_dataset_types)
if original_task_def.label in force_failures:
condition, exception_type = force_failures[original_task_def.label]
config.fail_condition = condition
if exception_type is not None:
config.fail_exception = get_full_type_name(exception_type)
force_failures[original_task_def.label].set_config(config)

Check warning on line 133 in python/lsst/pipe/base/tests/mocks/_pipeline_task.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/tests/mocks/_pipeline_task.py#L133

Added line #L133 was not covered by tests
mock_task_def = TaskDef(
config=config, taskClass=MockPipelineTask, label=get_mock_name(original_task_def.label)
)
Expand All @@ -122,7 +141,7 @@ def mock_task_defs(
def mock_pipeline_graph(
original_graph: PipelineGraph,
unmocked_dataset_types: Iterable[str] = (),
force_failures: Mapping[str, tuple[str, type[Exception] | None]] | None = None,
force_failures: Mapping[str, ForcedFailure] | None = None,
) -> PipelineGraph:
"""Create mocks for a full pipeline graph.
Expand All @@ -133,15 +152,9 @@ def mock_pipeline_graph(
unmocked_dataset_types : `~collections.abc.Iterable` [ `str` ], optional
Names of overall-input dataset types that should not be replaced with
mocks.
force_failures : `~collections.abc.Mapping` [ `str`, `tuple` [ `str`, \
`type` [ `Exception` ] or `None` ] ]
Mapping from original task label to a 2-tuple indicating that some
quanta should raise an exception when executed. The first entry is a
data ID match using the butler expression language (i.e. a string of
the sort passed as the ``where`` argument to butler query methods),
while the second is the type of exception to raise when the quantum
data ID matches the expression. An exception type of `None` uses
the default, `ValueError`.
force_failures : `~collections.abc.Mapping` [ `str`, `ForcedFailure` ]
Mapping from original task label to information about an exception one
or more quanta for this task should raise.
Returns
-------
Expand All @@ -159,10 +172,7 @@ def mock_pipeline_graph(
config.original = original_task_node.config
config.unmocked_dataset_types.extend(unmocked_dataset_types)
if original_task_node.label in force_failures:
condition, exception_type = force_failures[original_task_node.label]
config.fail_condition = condition
if exception_type is not None:
config.fail_exception = get_full_type_name(exception_type)
force_failures[original_task_node.label].set_config(config)

Check warning on line 175 in python/lsst/pipe/base/tests/mocks/_pipeline_task.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/tests/mocks/_pipeline_task.py#L175

Added line #L175 was not covered by tests
result.add_task(get_mock_name(original_task_node.label), MockPipelineTask, config=config)
return result

Expand Down Expand Up @@ -190,6 +200,17 @@ class BaseTestPipelineTaskConfig(PipelineTaskConfig, pipelineConnections=BaseTes
),
)

memory_required = Field[str](
dtype=str,
default=None,
optional=True,
doc=(
"If not None, simulate an out-of-memory failure by raising only if ExecutionResource.max_mem "
"exceeds this value. This string should include units as parsed by astropy.units.Quantity "
"(e.g. '4GB')."
),
)

def data_id_match(self) -> DataIdMatch | None:
if not self.fail_condition:
return None
Expand Down Expand Up @@ -240,6 +261,9 @@ def __init__(
self.data_id_match = self.config.data_id_match()
if self.data_id_match:
self.fail_exception = doImportType(self.config.fail_exception)
self.memory_required = (

Check warning on line 264 in python/lsst/pipe/base/tests/mocks/_pipeline_task.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/tests/mocks/_pipeline_task.py#L264

Added line #L264 was not covered by tests
Quantity(self.config.memory_required) if self.config.memory_required is not None else None
)
# Look for, check, and record init-inputs.
task_connections = self.ConfigClass.ConnectionsClass(config=config)
mock_dataset_quantum = MockDatasetQuantum(task_label=self.getName(), data_id={}, inputs={})
Expand Down Expand Up @@ -299,10 +323,19 @@ def runQuantum(

# Possibly raise an exception.
if self.data_id_match is not None and self.data_id_match.match(quantum.dataId):
_LOG.info("Simulating failure of task '%s' on quantum %s", self.getName(), quantum.dataId)
message = f"Simulated failure: task={self.getName()} dataId={quantum.dataId}"
assert self.fail_exception is not None, "Exception type must be defined"
raise self.fail_exception(message)
message = f"Simulated failure: task={self.getName()} dataId={quantum.dataId}"

Check warning on line 327 in python/lsst/pipe/base/tests/mocks/_pipeline_task.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/tests/mocks/_pipeline_task.py#L327

Added line #L327 was not covered by tests
if self.memory_required is not None:
if butlerQC.resources.max_mem < self.memory_required:
_LOG.info(

Check warning on line 330 in python/lsst/pipe/base/tests/mocks/_pipeline_task.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/tests/mocks/_pipeline_task.py#L330

Added line #L330 was not covered by tests
"Simulating out-of-memory failure for task '%s' on quantum %s",
self.getName(),
quantum.dataId,
)
raise self.fail_exception(message)

Check warning on line 335 in python/lsst/pipe/base/tests/mocks/_pipeline_task.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/tests/mocks/_pipeline_task.py#L335

Added line #L335 was not covered by tests
else:
_LOG.info("Simulating failure of task '%s' on quantum %s", self.getName(), quantum.dataId)
raise self.fail_exception(message)

Check warning on line 338 in python/lsst/pipe/base/tests/mocks/_pipeline_task.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/tests/mocks/_pipeline_task.py#L337-L338

Added lines #L337 - L338 were not covered by tests

# Populate the bit of provenance we store in all outputs.
_LOG.info("Reading input data for task '%s' on quantum %s", self.getName(), quantum.dataId)
Expand Down

0 comments on commit 1f62fa5

Please sign in to comment.