Update for new DeblendCoaddSourcesTask #62

Merged: 2 commits, Aug 24, 2018
Changes from 1 commit
106 changes: 81 additions & 25 deletions python/lsst/pipe/drivers/multiBandDriver.py
@@ -8,6 +8,7 @@
from lsst.pipe.base import ArgumentParser, TaskRunner
from lsst.pipe.tasks.multiBand import (DetectCoaddSourcesTask,
MergeDetectionsTask,
DeblendCoaddSourcesTask,
MeasureMergedCoaddSourcesTask,
MergeMeasurementsTask,)
from lsst.ctrl.pool.parallel import BatchPoolTask
@@ -76,8 +77,9 @@ class MultiBandDriverConfig(Config):
doc="Detect sources on coadd")
mergeCoaddDetections = ConfigurableField(
target=MergeDetectionsTask, doc="Merge detections")
deblendCoaddSources = ConfigurableField(target=DeblendCoaddSourcesTask, doc="Deblend merged detections")
measureCoaddSources = ConfigurableField(target=MeasureMergedCoaddSourcesTask,
doc="Measure merged detections")
doc="Measure merged and (optionally) deblended detections")
mergeCoaddMeasurements = ConfigurableField(
target=MergeMeasurementsTask, doc="Merge measurements")
forcedPhotCoadd = ConfigurableField(target=ForcedPhotCoaddTask,
@@ -96,7 +98,7 @@ def setDefaults(self):
self.forcedPhotCoadd.references.retarget(MultiBandReferencesTask)

def validate(self):
for subtask in ("mergeCoaddDetections", "measureCoaddSources",
for subtask in ("mergeCoaddDetections", "deblendCoaddSources", "measureCoaddSources",
"mergeCoaddMeasurements", "forcedPhotCoadd"):
coaddName = getattr(self, subtask).coaddName
if coaddName != self.coaddName:
@@ -160,7 +162,29 @@ def __init__(self, butler=None, schema=None, refObjLoader=None, reuse=tuple(), *
self.reuse = tuple(reuse)
self.makeSubtask("detectCoaddSources")
self.makeSubtask("mergeCoaddDetections", schema=schema)
self.makeSubtask("measureCoaddSources", schema=afwTable.Schema(self.mergeCoaddDetections.schema),
if self.config.measureCoaddSources.inputCatalog.startswith("deblended"):
# Ensure that the output from deblendCoaddSources matches the input to measureCoaddSources
self.measurementInput = self.config.measureCoaddSources.inputCatalog
self.deblenderOutput = []
if self.config.deblendCoaddSources.simultaneous:
if self.config.deblendCoaddSources.multiBandDeblend.conserveFlux:
self.deblenderOutput.append("deblendedFlux")
if self.config.deblendCoaddSources.multiBandDeblend.saveTemplates:
self.deblenderOutput.append("deblendedModel")
else:
self.deblenderOutput.append("deblendedFlux")
if self.measurementInput not in self.deblenderOutput:
err = "Measurement input '{0}' is not in the list of deblender output catalogs '{1}'"
raise ValueError(err.format(self.measurementInput, self.deblenderOutput))

Review comment:
Do lines 166-178 do more than check whether things are configured correctly? The comment suggested so to me. If so, this whole block could practically be removed, right? I'm a bit uneasy that such checking lives in pipe_drivers; it looks like it introduces additional logic beyond stitching tasks into a workflow.

Contributor Author:
You are correct that they only check whether the output from the deblender is consistent with the input to the measurements. I feel it's important to check that the tasks are consistent before running them; otherwise the user would have to wait for the entire deblending task to complete before crashing, and the error might not be obvious.

Collaborator:
I agree with Fred here. There are many times when people who are developing things run pipe_drivers, either for the data output or to test code. It is always a shame when something you set up to run overnight or over a weekend crashes a few hours in. This test is cheap and really helps keep developers from falling into this trap accidentally.

Member:
Could this validation be moved to MultiBandDriverConfig.validate?
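A minimal sketch of what that suggestion could look like, assuming only the config field names shown in the diff above (measureCoaddSources.inputCatalog, deblendCoaddSources.simultaneous, multiBandDeblend.conserveFlux, multiBandDeblend.saveTemplates); the method structure and the error message are illustrative, not part of this PR:

# Hypothetical sketch only: the deblender-output / measurement-input check from
# __init__, restated as config validation inside MultiBandDriverConfig. The
# config field names are taken from the diff above; everything else is illustrative.
def validate(self):
    Config.validate(self)
    # (the existing coaddName consistency loop from the diff would remain here)
    inputCatalog = self.measureCoaddSources.inputCatalog
    if inputCatalog.startswith("deblended"):
        deblenderOutput = []
        if self.deblendCoaddSources.simultaneous:
            # multi-band deblender: outputs depend on its own configuration
            if self.deblendCoaddSources.multiBandDeblend.conserveFlux:
                deblenderOutput.append("deblendedFlux")
            if self.deblendCoaddSources.multiBandDeblend.saveTemplates:
                deblenderOutput.append("deblendedModel")
        else:
            # single-band deblender: only the flux-conserved catalog is written
            deblenderOutput.append("deblendedFlux")
        if inputCatalog not in deblenderOutput:
            raise ValueError("measureCoaddSources.inputCatalog %r is not among the "
                             "deblender outputs %s" % (inputCatalog, deblenderOutput))

Since the check uses only configuration values (no butler or schema access), it fits naturally in validate() and would fail at config-parsing time rather than at task construction.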


self.makeSubtask("deblendCoaddSources",
schema=afwTable.Schema(self.mergeCoaddDetections.schema),
peakSchema=afwTable.Schema(self.mergeCoaddDetections.merged.getPeakSchema()),
butler=butler)
measureInputSchema = afwTable.Schema(self.deblendCoaddSources.schema)
else:
measureInputSchema = afwTable.Schema(self.mergeCoaddDetections.schema)
hsinfang (Aug 14, 2018):
Does this if/else (lines 165 and 185) exist to make the new multiBandDriver backward-compatible with older data products generated with an older Stack? In other words, for running multiBandDriver.py consistently with a post-DM-15104 Stack, shouldn't self.config.measureCoaddSources.inputCatalog.startswith("deblended") always be true?

hsinfang:
On second look, maybe this part was added to make multiBandDriver more configurable?

Contributor Author:
No, this is not always true. Just as it was previously possible to turn the deblender off with doDeblend, it is now possible to skip deblending by using inputCatalog="mergeDet".
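To make that concrete, a hypothetical config override for multiBandDriver.py (passed via --configfile, for example) could select either path; the field names follow the diff above, while the file name and the alternative values are illustrative assumptions, not part of this PR:

# skipDeblend.py -- hypothetical config override for multiBandDriver.py.
# Measure directly on the merged detection catalog, skipping the deblender,
# much as doDeblend=False used to do:
config.measureCoaddSources.inputCatalog = "mergeDet"

# Alternatively, run the multi-band deblender and measure its flux-conserved
# output (values shown are assumptions for illustration):
# config.deblendCoaddSources.simultaneous = True
# config.deblendCoaddSources.multiBandDeblend.conserveFlux = True
# config.measureCoaddSources.inputCatalog = "deblendedFlux"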

self.makeSubtask("measureCoaddSources", schema=measureInputSchema,
peakSchema=afwTable.Schema(
self.mergeCoaddDetections.merged.getPeakSchema()),
refObjLoader=refObjLoader, butler=butler)
@@ -270,30 +294,30 @@ def runDataRef(self, patchRefList):

pool.map(self.runMergeDetections, patches.values())

# Measure merged detections, and test for reprocessing
# Deblend merged detections, and test for reprocessing
#
# The reprocessing allows us to have multiple attempts at deblending large footprints. Large
# footprints can suck up a lot of memory in the deblender, which means that when we process on a
# cluster, we want to refuse to deblend them (they're flagged "deblend.parent-too-big"). But since
# they may have astronomically interesting data, we want the ability to go back and reprocess them
# with a more permissive configuration when we have more memory or processing time.
#
# self.runMeasureMerged will return whether there are any footprints in that image that required
# self.runDeblendMerged will return whether there are any footprints in that image that required
# reprocessing. We need to convert that list of booleans into a dict mapping the patchId (x,y) to
# a boolean. That tells us whether the merge measurement and forced photometry need to be re-run on
# a particular patch.
#
# This determination of which patches need to be reprocessed exists only in memory (the measurements
# have been written, clobbering the old ones), so if there was an exception we would lose this
# information, leaving things in an inconsistent state (measurements new, but merged measurements and
# information, leaving things in an inconsistent state (measurements, merged measurements and
# forced photometry old). To attempt to preserve this status, we touch a file (dataset named
# "deepCoadd_multibandReprocessing") --- if this file exists, we need to re-run the merge and
# forced photometry.
# "deepCoadd_multibandReprocessing") --- if this file exists, we need to re-run the measurements,
# merge and forced photometry.
#
# This is, hopefully, a temporary workaround until we can improve the
# deblender.
try:
reprocessed = pool.map(self.runMeasureMerged, dataIdList)
reprocessed = pool.map(self.runDeblendMerged, patches.values())
finally:
if self.config.reprocessing:
patchReprocessing = {}
@@ -317,6 +341,8 @@ def runDataRef(self, patchRefList):
patchReprocessing[patchId] = True

# Only process patches that have been identified as needing it
pool.map(self.runMeasurements, [dataId1 for dataId1 in dataIdList if not self.config.reprocessing or
patchReprocessing[dataId1["patch"]]])
pool.map(self.runMergeMeasurements, [idList for patchId, idList in patches.items() if
not self.config.reprocessing or patchReprocessing[patchId]])
pool.map(self.runForcedPhot, [dataId1 for dataId1 in dataIdList if not self.config.reprocessing or
@@ -368,45 +394,75 @@ def runMergeDetections(self, cache, dataIdList):
return
self.mergeCoaddDetections.runDataRef(dataRefList)

def runMeasureMerged(self, cache, dataId):
"""!Run measurement on a patch for a single filter
def runDeblendMerged(self, cache, dataIdList):
"""Run the deblender on a list of dataId's

Only slave nodes execute this method.

@param cache: Pool cache, with butler
@param dataId: Data identifier for patch
@return whether the patch requires reprocessing.
Parameters
----------
cache: Pool cache
Pool cache with butler.
dataIdList: list
Data identifiers for the patch in each band.

Returns
-------
result: bool
whether the patch requires reprocessing.
"""
with self.logOperation("measurement on %s" % (dataId,)):
dataRef = getDataRef(cache.butler, dataId,
self.config.coaddName + "Coadd_calexp")
with self.logOperation("deblending %s" % (dataIdList,)):
dataRefList = [getDataRef(cache.butler, dataId, self.config.coaddName + "Coadd_calexp") for
dataId in dataIdList]
reprocessing = False # Does this patch require reprocessing?
if ("measureCoaddSources" in self.reuse and
dataRef.datasetExists(self.config.coaddName + "Coadd_meas", write=True)):
if ("deblendCoaddSources" in self.reuse and
all(dataRef.datasetExists(self.config.coaddName + "Coadd_" + self.measurementInput, write=True) for dataRef in dataRefList)):
if not self.config.reprocessing:
self.log.info("Skipping measureCoaddSources for %s; output already exists" % dataId)
self.log.info("Skipping deblendCoaddSources for %s; output already exists" % dataIdList)
return False

catalog = dataRef.get(self.config.coaddName + "Coadd_meas")
catalog = dataRefList[0].get(self.config.coaddName + "Coadd_" + self.measurementInput)
bigFlag = catalog["deblend.parent-too-big"]
numOldBig = bigFlag.sum()
if numOldBig == 0:
self.log.info("No large footprints in %s" %
(dataRef.dataId,))
return False
numNewBig = sum((self.measureCoaddSources.deblend.isLargeFootprint(src.getFootprint()) for
numNewBig = sum((self.deblendCoaddSources.isLargeFootprint(src.getFootprint()) for
src in catalog[bigFlag]))
if numNewBig == numOldBig:
self.log.info("All %d formerly large footprints continue to be large in %s" %
(numOldBig, dataRef.dataId,))
(numOldBig, dataRefList[0].dataId,))
return False
self.log.info("Found %d large footprints to be reprocessed in %s" %
(numOldBig - numNewBig, dataRef.dataId))
(numOldBig - numNewBig, [dataRef.dataId for dataRef in dataRefList]))
reprocessing = True

self.measureCoaddSources.runDataRef(dataRef)
self.deblendCoaddSources.runDataRef(dataRefList)
return reprocessing

def runMeasurements(self, cache, dataId):
"""Run measurement on a patch for a single filter

Only slave nodes execute this method.

Parameters
----------
cache: Pool cache
Pool cache, with butler
dataId: dict
Data identifier for patch.
"""
with self.logOperation("measurements on %s" % (dataId,)):
dataRef = getDataRef(cache.butler, dataId,
self.config.coaddName + "Coadd_calexp")
if ("measureCoaddSources" in self.reuse and
not self.config.reprocessing and
dataRef.datasetExists(self.config.coaddName + "Coadd_meas", write=True)):
self.log.info("Skipping measuretCoaddSources for %s; output already exists" % dataId)
return
self.measureCoaddSources.runDataRef(dataRef)

def runMergeMeasurements(self, cache, dataIdList):
"""!Run measurement merging on a patch
