Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tickets/DM-37655: Add convienence interfaces to Pipeline class #301

Merged
merged 7 commits into from
Jan 26, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions doc/changes/DM-37655.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Added convince methods to the python api for Pipelines. These methods allow merging pipelines, adding labels
to / removing labels from subsets, and finding subsets containing a specified label.
85 changes: 85 additions & 0 deletions python/lsst/pipe/base/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,91 @@ def fromPipeline(cls, pipeline: Pipeline) -> Pipeline:
def __str__(self) -> str:
return str(self._pipelineIR)

def mergePipeline(self, pipeline: Pipeline) -> None:
"""Merge another in-memory `Pipeline` object into this one.

This merges another pipeline into this object, as if it were declared
in the import block of the yaml definition of this pipeline. This
modifies this pipeline in place.

Parameters
----------
pipeline : `Pipeline`
The `Pipeline` object that is to be merged into this object
natelust marked this conversation as resolved.
Show resolved Hide resolved
"""
self._pipelineIR.merge_pipelines((pipeline._pipelineIR,))

def addLabelToSubset(self, subset: str, label: str) -> None:
"""Add a task label from the specified subset.

Parameters
----------
subset : `str`
The labeled subset to modify
label : `str`
The task label to remove from the specified subset
natelust marked this conversation as resolved.
Show resolved Hide resolved

Raises
------
ValueError : Raised if the specified subset does not exist within the
pipeline.
Raised if the specified label does not exist within the
pipeline.
"""
if label not in self._pipelineIR.tasks:
raise ValueError(f"Label {label} does not appear within the pipeline")
if subset not in self._pipelineIR.labeled_subsets:
raise ValueError(f"Subset {label} does not appear within the pipeline")
natelust marked this conversation as resolved.
Show resolved Hide resolved
self._pipelineIR.labeled_subsets[subset].subset.add(label)

def removeLabelFromSubset(self, subset: str, label: str) -> None:
"""Remove a task label from the specified subset.

Parameters
----------
subset : `str`
The labeled subset to modify
label : `str`
The task label to remove from the specified subset

Raises
------
ValueError : Raised if the specified subset does not exist in the
pipeline.
Raised if the specified label does not exist within the
specified subset.
"""
if subset not in self._pipelineIR.labeled_subsets:
raise ValueError(f"Subset {label} does not appear within the pipeline")
natelust marked this conversation as resolved.
Show resolved Hide resolved
if label not in self._pipelineIR.labeled_subsets[subset].subset:
raise ValueError(f"Label {label} does not appear within the pipeline")
self._pipelineIR.labeled_subsets[subset].subset.remove(label)

def findLabelInSubsets(self, label: str) -> set[str]:
natelust marked this conversation as resolved.
Show resolved Hide resolved
"""Find any subsets which may contain the specified label.

This function returns the name of subsets which return the specified
label. May return an empty set if there are no subsets, or no subsets
containing the specified label.

Parameters
----------
label : `str`
The task label to use in membership check

natelust marked this conversation as resolved.
Show resolved Hide resolved
Raises
------
ValueError : Raised if the specified label does not exist within the
this pipeline.
"""
results = set()
if label not in self._pipelineIR.tasks:
raise ValueError(f"Subset {label} does not appear within the pipeline")
natelust marked this conversation as resolved.
Show resolved Hide resolved
for subset in self._pipelineIR.labeled_subsets.values():
if label in subset.subset:
results.add(subset.label)
return results

def addInstrument(self, instrument: Union[Instrument, str]) -> None:
"""Add an instrument to the pipeline, or replace an instrument that is
already defined.
Expand Down
27 changes: 24 additions & 3 deletions python/lsst/pipe/base/pipelineIR.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
Dict,
Generator,
Hashable,
Iterable,
List,
Literal,
Mapping,
Expand Down Expand Up @@ -664,18 +665,38 @@ def process_args(argument: Union[str, dict]) -> dict:
else:
self.imports = [ImportIR(**process_args(tmp_import))]

self.merge_pipelines([fragment.toPipelineIR() for fragment in self.imports])

def merge_pipelines(self, pipelines: Iterable[PipelineIR]) -> None:
"""Merge one or more other `PipelineIR` objects into this object.

Parameters
----------
pipelines : `Iterable` of `PipelineIR` objects
An `Iterable` that contains one or more `PipelineIR` objects to
merge into this object.

Raises
------
ValueError : Raised if there is a conflict in instrument specifications
natelust marked this conversation as resolved.
Show resolved Hide resolved
Raised if a task label appears in more than one of the
input `PipelineIR` objects which are to be merged.
Raise if a labeled subset appears in more than one of the
natelust marked this conversation as resolved.
Show resolved Hide resolved
input `PipelineIR` objects which are to be merged, and
with any subset existing in this object.
"""
# integrate any imported pipelines
accumulate_tasks: Dict[str, TaskIR] = {}
accumulate_labeled_subsets: Dict[str, LabeledSubset] = {}
accumulated_parameters = ParametersIR({})
for other_pipeline in self.imports:
tmp_IR = other_pipeline.toPipelineIR()

for tmp_IR in pipelines:
if self.instrument is None:
self.instrument = tmp_IR.instrument
elif self.instrument != tmp_IR.instrument and tmp_IR.instrument is not None:
msg = (
"Only one instrument can be declared in a pipeline or its imports. "
f"Top level pipeline defines {self.instrument} but {other_pipeline.location} "
f"Top level pipeline defines {self.instrument} but pipeline to merge "
f"defines {tmp_IR.instrument}."
)
raise ValueError(msg)
Expand Down
44 changes: 44 additions & 0 deletions tests/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import lsst.utils.tests
from lsst.pipe.base import Pipeline, PipelineDatasetTypes, TaskDef
from lsst.pipe.base.pipelineIR import LabeledSubset
from lsst.pipe.base.tests.simpleQGraph import AddTask, makeSimplePipeline


Expand Down Expand Up @@ -61,6 +62,49 @@ def testInitial(self):
self.assertEqual(expandedPipeline[0].label, "task0")
self.assertEqual(expandedPipeline[1].label, "task1")

def testModifySubset(self):
pipeline = makeSimplePipeline(2)

# Test adding labels.
with self.assertRaises(ValueError):
pipeline.addLabelToSubset("test", "new_label")
pipeline._pipelineIR.labeled_subsets["test"] = LabeledSubset("test", set(), None)
with self.assertRaises(ValueError):
pipeline.addLabelToSubset("test", "missing_label")
pipeline.addLabelToSubset("test", "task0")
self.assertEqual(pipeline._pipelineIR.labeled_subsets["test"].subset, set(("task0",)))

# Test removing labels.
with self.assertRaises(ValueError):
pipeline.addLabelToSubset("missing_subset", "task0")
with self.assertRaises(ValueError):
pipeline.addLabelToSubset("test", "missing_label")
pipeline.removeLabelFromSubset("test", "task0")
self.assertEqual(pipeline._pipelineIR.labeled_subsets["test"].subset, set())

def testMergingPipelines(self):
pipeline1 = makeSimplePipeline(2)
pipeline2 = makeSimplePipeline(4)
pipeline2.removeTask("task0")
pipeline2.removeTask("task1")

pipeline1.mergePipeline(pipeline2)
self.assertEqual(pipeline1._pipelineIR.tasks.keys(), set(("task0", "task1", "task2", "task3")))

def testFindingSubset(self):
pipeline = makeSimplePipeline(2)
pipeline._pipelineIR.labeled_subsets["test1"] = LabeledSubset("test1", set(), None)
pipeline._pipelineIR.labeled_subsets["test2"] = LabeledSubset("test2", set(), None)
pipeline._pipelineIR.labeled_subsets["test3"] = LabeledSubset("test3", set(), None)

pipeline.addLabelToSubset("test1", "task0")
pipeline.addLabelToSubset("test3", "task0")

with self.assertRaises(ValueError):
pipeline.findLabelInSubsets("missing_label")

self.assertEqual(pipeline.findLabelInSubsets("task0"), set(("test1", "test3")))

def testParameters(self):
"""Test that parameters can be set and used to format"""
pipeline_str = textwrap.dedent(
Expand Down