From d163ad35eed6745b3362217a80f877072faae0a7 Mon Sep 17 00:00:00 2001 From: fred3m Date: Fri, 14 Nov 2025 10:03:35 -0800 Subject: [PATCH 1/3] Implement object_parent as an output for coadd deblending --- .../pipe/tasks/deblendCoaddSourcesPipeline.py | 2 +- python/lsst/pipe/tasks/multiBand.py | 2 + python/lsst/pipe/tasks/postprocess.py | 56 +++++++++++++++++++ schemas/Object.yaml | 13 +---- schemas/Parent.yaml | 26 +++++++++ tests/test_isPrimaryFlag.py | 17 +++++- 6 files changed, 101 insertions(+), 15 deletions(-) create mode 100644 schemas/Parent.yaml diff --git a/python/lsst/pipe/tasks/deblendCoaddSourcesPipeline.py b/python/lsst/pipe/tasks/deblendCoaddSourcesPipeline.py index 35d785363..6c8646fd8 100644 --- a/python/lsst/pipe/tasks/deblendCoaddSourcesPipeline.py +++ b/python/lsst/pipe/tasks/deblendCoaddSourcesPipeline.py @@ -123,7 +123,7 @@ class DeblendCoaddSourcesMultiConnections(PipelineTaskConnections, ) objectParents = cT.Output( doc="Parents of the deblended objects", - name="object_parents", + name="object_parent_patch", storageClass="SourceCatalog", dimensions=("tract", "patch", "skymap"), ) diff --git a/python/lsst/pipe/tasks/multiBand.py b/python/lsst/pipe/tasks/multiBand.py index be286d9a6..3b2eef00c 100644 --- a/python/lsst/pipe/tasks/multiBand.py +++ b/python/lsst/pipe/tasks/multiBand.py @@ -909,6 +909,8 @@ def run(self, exposure, sources, skyInfo, exposureId, ccdInputs=None, sources : `lsst.afw.table.SourceCatalog` A catalog built from the results of merged detections, or deblender outputs. + parentCatalog : `lsst.afw.table.SourceCatalog` + Catalog of parent sources corresponding to sources. skyInfo : `lsst.pipe.base.Struct` A struct containing information about the position of the input exposure within a `SkyMap`, the `SkyMap`, its `Wcs`, and its bounding box. diff --git a/python/lsst/pipe/tasks/postprocess.py b/python/lsst/pipe/tasks/postprocess.py index df583f460..12d8c4225 100644 --- a/python/lsst/pipe/tasks/postprocess.py +++ b/python/lsst/pipe/tasks/postprocess.py @@ -1826,3 +1826,59 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs): inputRefs.inputCatalogs[0].datasetType.name) df = pd.concat(inputs["inputCatalogs"]) butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs) + + +class ConsolidateParentTractConnections( + pipeBase.PipelineTaskConnections, + dimensions=("instrument", "tract") +): + inputCatalogs = connectionTypes.Input( + doc="Parents of the deblended objects", + name="object_parent_patch", + storageClass="SourceCatalog", + dimensions=("tract", "patch", "skymap"), + multiple=True, + ) + + outputCatalog = connectionTypes.Output( + doc="Output per-tract concatenation of DataFrame Tables", + name="object_parent", + storageClass="DataFrame", + dimensions=("tract", "skymap"), + ) + + +class ConsolidateParentTractConfig( + pipeBase.PipelineTaskConfig, + pipelineConnections=ConsolidateParentTractConnections, +): + pass + + +class ConsolidateParentTractTask(pipeBase.PipelineTask): + """Concatenate any per-patch, dataframe list into a single + per-tract DataFrame. + """ + _DefaultName = "ConsolidateTract" + ConfigClass = ConsolidateParentTractConfig + + def runQuantum(self, butlerQC, inputRefs, outputRefs): + self.log.info("Concatenating %s per-patch %s Tables", + len(inputRefs.inputCatalogs), + inputRefs.inputCatalogs[0].datasetType.name) + + dataFrames = [] + for ref in inputRefs.inputCatalogs: + catalog = butlerQC.get(ref) + df = catalog.asAstropy().to_pandas() + df.rename(columns={ + "id": "objectId", + "parent": "parentObjectId", + "merge_peak_sky": "sky_object", + }, inplace=True) + df.set_index("objectId", inplace=True) + df["tract"] = ref.dataId["tract"] + df["patch"] = ref.dataId["patch"] + dataFrames.append(df) + df = pd.concat(dataFrames) + butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs) diff --git a/schemas/Object.yaml b/schemas/Object.yaml index 2fd1223d0..918980dbf 100644 --- a/schemas/Object.yaml +++ b/schemas/Object.yaml @@ -1225,22 +1225,13 @@ refFlags: - detect_isDeblendedSource - detect_isDeblendedModelSource - merge_peak_sky - - deblend_nChild - - deblend_nPeaks - - deblend_failed - - deblend_skipped - - deblend_isolatedParent - - deblend_parentTooBig - - deblend_tooManyPeaks - - deblend_masked - - deblend_incompleteData - - deblend_iterations - deblend_peak_center_x - deblend_peak_center_y - - deblend_logL - deblend_chi2 - deblend_blendId - deblend_blendNChild + - deblend_parentNPeaks + - deblend_parentNChild - slot_Shape_flag flags: # flags are columns taken and exploded per-band from the meas tables diff --git a/schemas/Parent.yaml b/schemas/Parent.yaml new file mode 100644 index 000000000..052a5f3a7 --- /dev/null +++ b/schemas/Parent.yaml @@ -0,0 +1,26 @@ +# This file defines the mapping between the columns in a parent table with +# their respective DPDD-style column names, as used by +# `lsst.pipe.tasks.postprocess.TransformObjectCatalogTask`. +funcs: + objectId: # the index of object_parent IS the objectId + functor: Index + parentObjectId: + functor: Column + args: parent + dataset: ref +refFlags: + # refFlags are columns taken without translation from the ref table + - merge_peak_sky + - deblend_nChild + - deblend_nPeaks + - deblend_failed + - deblend_skipped + - deblend_isolatedParent + - deblend_parentTooBig + - deblend_tooManyPeaks + - deblend_masked + - deblend_incompleteData + - deblend_iterations + - deblend_runtime + - deblend_spectrumInitFlag + - deblend_chi2 diff --git a/tests/test_isPrimaryFlag.py b/tests/test_isPrimaryFlag.py index 17e3aa342..ce55e79c7 100755 --- a/tests/test_isPrimaryFlag.py +++ b/tests/test_isPrimaryFlag.py @@ -228,9 +228,11 @@ def testIsScarletPrimaryFlag(self): # We'll customize the configuration of measurement to just run the # minimal number of plugins to make setPrimaryFlags work. + # As of DM-51670 we also include `base_PsfFlux` to ensure that + # the measurement plugins run correctly with the split between + # parent and child catalogs. measureConfig = SingleFrameMeasurementTask.ConfigClass() - measureConfig.plugins.names = ["base_SdssCentroid", "base_SkyCoord"] - measureConfig.slots.psfFlux = None + measureConfig.plugins.names = ["base_SdssCentroid", "base_SkyCoord", "base_PsfFlux"] measureConfig.slots.apFlux = None measureConfig.slots.shape = None measureConfig.slots.modelFlux = None @@ -282,6 +284,10 @@ def testIsScarletPrimaryFlag(self): # since they both have the same blended sources and only differ # over which model to use for the isolated sources. isPseudo = outputCat["merge_peak_sky"] + + # Check that all 5 pseudo-sources were created + self.assertEqual(np.sum(isPseudo), 5) + self.assertEqual( np.sum(outputCat["detect_isDeblendedSource"] & ~isPseudo), np.sum(outputCat["detect_isDeblendedModelSource"])) @@ -300,11 +306,16 @@ def testIsScarletPrimaryFlag(self): self.assertEqual(sum((outputCat["detect_isPrimary"]) & (outputCat["merge_peak_sky"])), 0) # Check that sky objects have not been deblended + # (deblended sources have parent > 0) np.testing.assert_array_equal( isPseudo, - isPseudo & (outputCat["deblend_nChild"] == 0) + isPseudo & (outputCat["parent"] == 0) ) + # Check that measurements were performed on all of the children + self.assertTrue(np.all(outputCat["base_PsfFlux_instFlux"] != 0) and np.all(np.isfinite( + outputCat["base_PsfFlux_instFlux"]))) + class MemoryTester(lsst.utils.tests.MemoryTestCase): pass From 2accfa79f7838a13e2c6fbf50dc958df71b4ae67 Mon Sep 17 00:00:00 2001 From: fred3m Date: Tue, 25 Nov 2025 16:53:23 -0800 Subject: [PATCH 2/3] Remove unnecessary Parent table schema --- schemas/Parent.yaml | 26 -------------------------- 1 file changed, 26 deletions(-) delete mode 100644 schemas/Parent.yaml diff --git a/schemas/Parent.yaml b/schemas/Parent.yaml deleted file mode 100644 index 052a5f3a7..000000000 --- a/schemas/Parent.yaml +++ /dev/null @@ -1,26 +0,0 @@ -# This file defines the mapping between the columns in a parent table with -# their respective DPDD-style column names, as used by -# `lsst.pipe.tasks.postprocess.TransformObjectCatalogTask`. -funcs: - objectId: # the index of object_parent IS the objectId - functor: Index - parentObjectId: - functor: Column - args: parent - dataset: ref -refFlags: - # refFlags are columns taken without translation from the ref table - - merge_peak_sky - - deblend_nChild - - deblend_nPeaks - - deblend_failed - - deblend_skipped - - deblend_isolatedParent - - deblend_parentTooBig - - deblend_tooManyPeaks - - deblend_masked - - deblend_incompleteData - - deblend_iterations - - deblend_runtime - - deblend_spectrumInitFlag - - deblend_chi2 From bbdc7eecd845547585cf19aa04993ac80fc4912c Mon Sep 17 00:00:00 2001 From: fred3m Date: Wed, 26 Nov 2025 12:31:01 -0800 Subject: [PATCH 3/3] Switch to astropy table --- python/lsst/pipe/tasks/postprocess.py | 30 ++++++++++++++------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/python/lsst/pipe/tasks/postprocess.py b/python/lsst/pipe/tasks/postprocess.py index 12d8c4225..8089ac1d4 100644 --- a/python/lsst/pipe/tasks/postprocess.py +++ b/python/lsst/pipe/tasks/postprocess.py @@ -1843,7 +1843,7 @@ class ConsolidateParentTractConnections( outputCatalog = connectionTypes.Output( doc="Output per-tract concatenation of DataFrame Tables", name="object_parent", - storageClass="DataFrame", + storageClass="ArrowAstropy", dimensions=("tract", "skymap"), ) @@ -1867,18 +1867,20 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs): len(inputRefs.inputCatalogs), inputRefs.inputCatalogs[0].datasetType.name) - dataFrames = [] + tables = [] for ref in inputRefs.inputCatalogs: catalog = butlerQC.get(ref) - df = catalog.asAstropy().to_pandas() - df.rename(columns={ - "id": "objectId", - "parent": "parentObjectId", - "merge_peak_sky": "sky_object", - }, inplace=True) - df.set_index("objectId", inplace=True) - df["tract"] = ref.dataId["tract"] - df["patch"] = ref.dataId["patch"] - dataFrames.append(df) - df = pd.concat(dataFrames) - butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs) + table = catalog.asAstropy() + + # Rename columns + table.rename_column("id", "objectId") + table.rename_column("parent", "parentObjectId") + table.rename_column("merge_peak_sky", "sky_object") + + # Add tract and patch columns + table["tract"] = ref.dataId["tract"] + table["patch"] = ref.dataId["patch"] + + tables.append(table) + outputTable = astropy.table.vstack(tables, join_type="exact") + butlerQC.put(pipeBase.Struct(outputCatalog=outputTable), outputRefs)