Skip to content

Commit

Permalink
Merge branch 'tickets/DM-37655'
Browse files Browse the repository at this point in the history
  • Loading branch information
natelust committed Jan 26, 2023
2 parents be262eb + 6856567 commit 269f7bd
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 3 deletions.
2 changes: 2 additions & 0 deletions doc/changes/DM-37655.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Added convenience methods to the Python API for Pipelines. These methods allow merging pipelines, adding labels
to / removing labels from subsets, and finding subsets containing a specified label.
90 changes: 90 additions & 0 deletions python/lsst/pipe/base/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,96 @@ def fromPipeline(cls, pipeline: Pipeline) -> Pipeline:
def __str__(self) -> str:
return str(self._pipelineIR)

def mergePipeline(self, pipeline: Pipeline) -> None:
    """Fold another in-memory `Pipeline` into this one, in place.

    The effect is the same as if ``pipeline`` had been declared in the
    import block of the yaml definition of this pipeline.

    Parameters
    ----------
    pipeline : `Pipeline`
        The `Pipeline` object whose contents are merged into this object.
    """
    merge = self._pipelineIR.merge_pipelines
    # ``merge_pipelines`` accepts an iterable of IR objects, so the single
    # IR being merged is wrapped in a one-element tuple.
    merge((pipeline._pipelineIR,))

def addLabelToSubset(self, subset: str, label: str) -> None:
    """Add a task label to the specified subset.

    Parameters
    ----------
    subset : `str`
        Name of the labeled subset to modify.
    label : `str`
        The task label to add to the specified subset.

    Raises
    ------
    ValueError
        Raised if the specified label does not exist within the pipeline.
        Raised if the specified subset does not exist within the pipeline.
    """
    ir = self._pipelineIR
    # The label is validated before the subset, so an unknown label is the
    # error reported when both arguments are invalid.
    if label not in ir.tasks:
        raise ValueError(f"Label {label} does not appear within the pipeline")
    known_subsets = ir.labeled_subsets
    if subset not in known_subsets:
        raise ValueError(f"Subset {subset} does not appear within the pipeline")
    members = known_subsets[subset].subset
    members.add(label)

def removeLabelFromSubset(self, subset: str, label: str) -> None:
    """Remove a task label from the specified subset.

    Parameters
    ----------
    subset : `str`
        Name of the labeled subset to modify.
    label : `str`
        The task label to remove from the specified subset.

    Raises
    ------
    ValueError
        Raised if the specified subset does not exist in the pipeline.
        Raised if the specified label does not exist within the specified
        subset.
    """
    labeled_subsets = self._pipelineIR.labeled_subsets
    if subset not in labeled_subsets:
        raise ValueError(f"Subset {subset} does not appear within the pipeline")
    members = labeled_subsets[subset].subset
    if label not in members:
        # Membership is checked against the subset, not the pipeline's task
        # list, so the message names the subset to avoid implying the label
        # is unknown to the whole pipeline.
        raise ValueError(f"Label {label} does not appear within subset {subset}")
    members.remove(label)

def findSubsetsWithLabel(self, label: str) -> set[str]:
    """Find any subsets which contain the specified label.

    This function returns the names of the subsets which contain the
    specified label. The result may be an empty set if there are no
    subsets, or no subsets containing the specified label.

    Parameters
    ----------
    label : `str`
        The task label to use in the membership check.

    Returns
    -------
    subsets : `set` of `str`
        A set (possibly empty) of the names of the subsets which contain
        the specified label.

    Raises
    ------
    ValueError
        Raised if the specified label does not exist within this pipeline.
    """
    ir = self._pipelineIR
    if label not in ir.tasks:
        raise ValueError(f"Label {label} does not appear within the pipeline")
    # Names come from each subset's own ``label`` attribute rather than from
    # the key it is stored under in the mapping.
    return {entry.label for entry in ir.labeled_subsets.values() if label in entry.subset}

def addInstrument(self, instrument: Union[Instrument, str]) -> None:
"""Add an instrument to the pipeline, or replace an instrument that is
already defined.
Expand Down
28 changes: 25 additions & 3 deletions python/lsst/pipe/base/pipelineIR.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
Dict,
Generator,
Hashable,
Iterable,
List,
Literal,
Mapping,
Expand Down Expand Up @@ -664,18 +665,39 @@ def process_args(argument: Union[str, dict]) -> dict:
else:
self.imports = [ImportIR(**process_args(tmp_import))]

self.merge_pipelines([fragment.toPipelineIR() for fragment in self.imports])

def merge_pipelines(self, pipelines: Iterable[PipelineIR]) -> None:
"""Merge one or more other `PipelineIR` objects into this object.
Parameters
----------
pipelines : `Iterable` of `PipelineIR` objects
An `Iterable` that contains one or more `PipelineIR` objects to
merge into this object.
Raises
------
ValueError
Raised if there is a conflict in instrument specifications.
Raised if a task label appears in more than one of the input
`PipelineIR` objects which are to be merged.
Raised if a labeled subset appears in more than one of the input
`PipelineIR` objects which are to be merged, and with any subset
existing in this object.
"""
# integrate any imported pipelines
accumulate_tasks: Dict[str, TaskIR] = {}
accumulate_labeled_subsets: Dict[str, LabeledSubset] = {}
accumulated_parameters = ParametersIR({})
for other_pipeline in self.imports:
tmp_IR = other_pipeline.toPipelineIR()

for tmp_IR in pipelines:
if self.instrument is None:
self.instrument = tmp_IR.instrument
elif self.instrument != tmp_IR.instrument and tmp_IR.instrument is not None:
msg = (
"Only one instrument can be declared in a pipeline or its imports. "
f"Top level pipeline defines {self.instrument} but {other_pipeline.location} "
f"Top level pipeline defines {self.instrument} but pipeline to merge "
f"defines {tmp_IR.instrument}."
)
raise ValueError(msg)
Expand Down
44 changes: 44 additions & 0 deletions tests/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import lsst.utils.tests
from lsst.pipe.base import Pipeline, PipelineDatasetTypes, TaskDef
from lsst.pipe.base.pipelineIR import LabeledSubset
from lsst.pipe.base.tests.simpleQGraph import AddTask, makeSimplePipeline


Expand Down Expand Up @@ -61,6 +62,49 @@ def testInitial(self):
self.assertEqual(expandedPipeline[0].label, "task0")
self.assertEqual(expandedPipeline[1].label, "task1")

def testModifySubset(self):
    """Test adding labels to and removing labels from a labeled subset,
    including the error paths of both operations.
    """
    pipeline = makeSimplePipeline(2)

    # Test adding labels.
    # The "test" subset has not been created yet, so this must fail.
    with self.assertRaises(ValueError):
        pipeline.addLabelToSubset("test", "new_label")
    pipeline._pipelineIR.labeled_subsets["test"] = LabeledSubset("test", set(), None)
    # The subset now exists, but the label is not a task in the pipeline.
    with self.assertRaises(ValueError):
        pipeline.addLabelToSubset("test", "missing_label")
    pipeline.addLabelToSubset("test", "task0")
    self.assertEqual(pipeline._pipelineIR.labeled_subsets["test"].subset, {"task0"})

    # Test removing labels.
    # These checks must go through removeLabelFromSubset; calling the add
    # method here would leave the removal error paths untested.
    with self.assertRaises(ValueError):
        pipeline.removeLabelFromSubset("missing_subset", "task0")
    with self.assertRaises(ValueError):
        pipeline.removeLabelFromSubset("test", "missing_label")
    pipeline.removeLabelFromSubset("test", "task0")
    self.assertEqual(pipeline._pipelineIR.labeled_subsets["test"].subset, set())

def testMergingPipelines(self):
    """Test that merging one pipeline into another combines their task
    labels.
    """
    target = makeSimplePipeline(2)
    donor = makeSimplePipeline(4)
    # Remove the labels the donor would otherwise share with the target
    # before merging.
    for shared_label in ("task0", "task1"):
        donor.removeTask(shared_label)

    target.mergePipeline(donor)
    expected_labels = {"task0", "task1", "task2", "task3"}
    self.assertEqual(target._pipelineIR.tasks.keys(), expected_labels)

def testFindingSubset(self):
    """Test looking up the subsets that contain a given task label."""
    pipeline = makeSimplePipeline(2)
    labeled_subsets = pipeline._pipelineIR.labeled_subsets
    # Start from three empty subsets; only two of them receive the label.
    for name in ("test1", "test2", "test3"):
        labeled_subsets[name] = LabeledSubset(name, set(), None)

    for name in ("test1", "test3"):
        pipeline.addLabelToSubset(name, "task0")

    # A label that is not a task in the pipeline is rejected outright.
    with self.assertRaises(ValueError):
        pipeline.findSubsetsWithLabel("missing_label")

    found = pipeline.findSubsetsWithLabel("task0")
    self.assertEqual(found, {"test1", "test3"})

def testParameters(self):
"""Test that parameters can be set and used to format"""
pipeline_str = textwrap.dedent(
Expand Down

0 comments on commit 269f7bd

Please sign in to comment.