Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/lsst/pipe/tasks/deblendCoaddSourcesPipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class DeblendCoaddSourcesMultiConnections(PipelineTaskConnections,
)
objectParents = cT.Output(
doc="Parents of the deblended objects",
name="object_parents",
name="object_parent_patch",
storageClass="SourceCatalog",
dimensions=("tract", "patch", "skymap"),
)
Expand Down
2 changes: 2 additions & 0 deletions python/lsst/pipe/tasks/multiBand.py
Original file line number Diff line number Diff line change
Expand Up @@ -909,6 +909,8 @@ def run(self, exposure, sources, skyInfo, exposureId, ccdInputs=None,
sources : `lsst.afw.table.SourceCatalog`
A catalog built from the results of merged detections, or
deblender outputs.
parentCatalog : `lsst.afw.table.SourceCatalog`
    Catalog of the parent sources corresponding to ``sources``.
skyInfo : `lsst.pipe.base.Struct`
A struct containing information about the position of the input exposure within
a `SkyMap`, the `SkyMap`, its `Wcs`, and its bounding box.
Expand Down
58 changes: 58 additions & 0 deletions python/lsst/pipe/tasks/postprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -1826,3 +1826,61 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs):
inputRefs.inputCatalogs[0].datasetType.name)
df = pd.concat(inputs["inputCatalogs"])
butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)


class ConsolidateParentTractConnections(
    pipeBase.PipelineTaskConnections,
    dimensions=("instrument", "tract")
):
    """Connections for ConsolidateParentTractTask.

    Gathers the per-patch parent ``SourceCatalog`` datasets produced by
    the deblender and declares a single per-tract stacked table as output.
    """
    # Per-patch catalogs of deblender parents; `multiple=True` because one
    # tract quantum consumes every patch in the tract.
    inputCatalogs = connectionTypes.Input(
        doc="Parents of the deblended objects",
        name="object_parent_patch",
        storageClass="SourceCatalog",
        dimensions=("tract", "patch", "skymap"),
        multiple=True,
    )

    # Single tract-level concatenation, persisted as an astropy table
    # (ArrowAstropy storage class).
    outputCatalog = connectionTypes.Output(
        doc="Output per-tract concatenation of DataFrame Tables",
        name="object_parent",
        storageClass="ArrowAstropy",
        dimensions=("tract", "skymap"),
    )


class ConsolidateParentTractConfig(
    pipeBase.PipelineTaskConfig,
    pipelineConnections=ConsolidateParentTractConnections,
):
    """Configuration for ConsolidateParentTractTask.

    No configuration options beyond the pipeline connections.
    """
    pass


class ConsolidateParentTractTask(pipeBase.PipelineTask):
    """Concatenate the per-patch parent source catalogs into a single
    per-tract astropy table.

    Each per-patch ``object_parent_patch`` ``SourceCatalog`` is converted
    to an astropy table, annotated with its tract and patch IDs, and
    stacked into one ``object_parent`` table for the tract.
    """
    # NOTE(review): the original default name "ConsolidateTract" duplicated
    # ConsolidateTractTask._DefaultName in this module; use a unique,
    # descriptive default label instead.
    _DefaultName = "consolidateParentTract"
    ConfigClass = ConsolidateParentTractConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        """Load, annotate, and concatenate the per-patch parent catalogs.

        Parameters
        ----------
        butlerQC : `lsst.pipe.base.QuantumContext`
            Butler interface used to fetch the inputs and store the output.
        inputRefs : `lsst.pipe.base.InputQuantizedConnection`
            References to the per-patch input catalogs.
        outputRefs : `lsst.pipe.base.OutputQuantizedConnection`
            Reference for the per-tract output table.
        """
        self.log.info("Concatenating %s per-patch %s Tables",
                      len(inputRefs.inputCatalogs),
                      inputRefs.inputCatalogs[0].datasetType.name)

        tables = []
        for ref in inputRefs.inputCatalogs:
            catalog = butlerQC.get(ref)
            table = catalog.asAstropy()

            # Rename columns to match the Object table naming convention.
            table.rename_column("id", "objectId")
            table.rename_column("parent", "parentObjectId")
            table.rename_column("merge_peak_sky", "sky_object")

            # Record provenance so rows stay traceable after stacking.
            table["tract"] = ref.dataId["tract"]
            table["patch"] = ref.dataId["patch"]

            tables.append(table)

        # join_type="exact" raises if any patch has a divergent schema,
        # rather than silently masking or dropping columns.
        outputTable = astropy.table.vstack(tables, join_type="exact")
        butlerQC.put(pipeBase.Struct(outputCatalog=outputTable), outputRefs)
13 changes: 2 additions & 11 deletions schemas/Object.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1225,22 +1225,13 @@ refFlags:
- detect_isDeblendedSource
- detect_isDeblendedModelSource
- merge_peak_sky
- deblend_nChild
- deblend_nPeaks
- deblend_failed
- deblend_skipped
- deblend_isolatedParent
- deblend_parentTooBig
- deblend_tooManyPeaks
- deblend_masked
- deblend_incompleteData
- deblend_iterations
- deblend_peak_center_x
- deblend_peak_center_y
- deblend_logL
- deblend_chi2
- deblend_blendId
- deblend_blendNChild
- deblend_parentNPeaks
- deblend_parentNChild
- slot_Shape_flag
flags:
# flags are columns taken and exploded per-band from the meas tables
Expand Down
17 changes: 14 additions & 3 deletions tests/test_isPrimaryFlag.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,11 @@ def testIsScarletPrimaryFlag(self):

# We'll customize the configuration of measurement to just run the
# minimal number of plugins to make setPrimaryFlags work.
# As of DM-51670 we also include `base_PsfFlux` to ensure that
# the measurement plugins run correctly with the split between
# parent and child catalogs.
measureConfig = SingleFrameMeasurementTask.ConfigClass()
measureConfig.plugins.names = ["base_SdssCentroid", "base_SkyCoord"]
measureConfig.slots.psfFlux = None
measureConfig.plugins.names = ["base_SdssCentroid", "base_SkyCoord", "base_PsfFlux"]
measureConfig.slots.apFlux = None
measureConfig.slots.shape = None
measureConfig.slots.modelFlux = None
Expand Down Expand Up @@ -282,6 +284,10 @@ def testIsScarletPrimaryFlag(self):
# since they both have the same blended sources and only differ
# over which model to use for the isolated sources.
isPseudo = outputCat["merge_peak_sky"]

# Check that all 5 pseudo-sources were created
self.assertEqual(np.sum(isPseudo), 5)

self.assertEqual(
np.sum(outputCat["detect_isDeblendedSource"] & ~isPseudo),
np.sum(outputCat["detect_isDeblendedModelSource"]))
Expand All @@ -300,11 +306,16 @@ def testIsScarletPrimaryFlag(self):
self.assertEqual(sum((outputCat["detect_isPrimary"]) & (outputCat["merge_peak_sky"])), 0)

# Check that sky objects have not been deblended
# (deblended sources have parent > 0)
np.testing.assert_array_equal(
isPseudo,
isPseudo & (outputCat["deblend_nChild"] == 0)
isPseudo & (outputCat["parent"] == 0)
)

# Check that measurements were performed on all of the children
self.assertTrue(np.all(outputCat["base_PsfFlux_instFlux"] != 0) and np.all(np.isfinite(
outputCat["base_PsfFlux_instFlux"])))


class MemoryTester(lsst.utils.tests.MemoryTestCase):
pass
Expand Down