-
Notifications
You must be signed in to change notification settings - Fork 18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DM-17028: Implement MakeWarp as PipelineTask #263
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,11 +28,11 @@ | |
import lsst.pipe.base as pipeBase | ||
import lsst.log as log | ||
from lsst.meas.algorithms import CoaddPsf, CoaddPsfConfig | ||
from .coaddBase import CoaddBaseTask | ||
from .coaddBase import CoaddBaseTask, makeSkyInfo | ||
from .warpAndPsfMatch import WarpAndPsfMatchTask | ||
from .coaddHelpers import groupPatchExposures, getGroupDataRef | ||
|
||
__all__ = ["MakeCoaddTempExpTask"] | ||
__all__ = ["MakeCoaddTempExpTask", "MakeWarpTask", "MakeWarpConfig"] | ||
|
||
|
||
class MissingExposureError(Exception): | ||
|
@@ -314,7 +314,36 @@ def runDataRef(self, patchRef, selectDataList=[]): | |
except (KeyError, ValueError): | ||
visitId = i | ||
|
||
exps = self.run(calexpRefList, skyInfo, visitId).exposures | ||
calExpList = [] | ||
ccdIdList = [] | ||
dataIdList = [] | ||
|
||
for calExpInd, calExpRef in enumerate(calexpRefList): | ||
self.log.info("Reading calexp %s of %s for Warp id=%s", calExpInd+1, len(calexpRefList), | ||
calExpRef.dataId) | ||
try: | ||
ccdId = calExpRef.get("ccdExposureId", immediate=True) | ||
except Exception: | ||
ccdId = calExpInd | ||
try: | ||
# We augment the dataRef here with the tract, which is harmless for loading things | ||
# like calexps that don't need the tract, and necessary for meas_mosaic outputs, | ||
# which do. | ||
calExpRef = calExpRef.butlerSubset.butler.dataRef("calexp", dataId=calExpRef.dataId, | ||
tract=skyInfo.tractInfo.getId()) | ||
calExp = self.getCalibratedExposure(calExpRef, bgSubtracted=self.config.bgSubtracted) | ||
except Exception as e: | ||
self.log.warn("Calexp %s not found; skipping it: %s", calExpRef.dataId, e) | ||
continue | ||
|
||
if self.config.doApplySkyCorr: | ||
self.applySkyCorr(calExpRef, calExp) | ||
|
||
calExpList.append(calExp) | ||
ccdIdList.append(ccdId) | ||
dataIdList.append(calExpRef.dataId) | ||
|
||
exps = self.run(calExpList, ccdIdList, skyInfo, visitId, dataIdList).exposures | ||
|
||
if any(exps.values()): | ||
dataRefList.append(tempExpRef) | ||
|
@@ -329,7 +358,7 @@ def runDataRef(self, patchRef, selectDataList=[]): | |
|
||
return dataRefList | ||
|
||
def run(self, calexpRefList, skyInfo, visitId=0): | ||
def run(self, calExpList, ccdIdList, skyInfo, visitId=0, dataIdList=None, **kwargs): | ||
"""Create a Warp from inputs | ||
|
||
We iterate over the multiple calexps in a single exposure to construct | ||
|
@@ -356,38 +385,24 @@ def run(self, calexpRefList, skyInfo, visitId=0): | |
totGoodPix = {warpType: 0 for warpType in warpTypeList} | ||
didSetMetadata = {warpType: False for warpType in warpTypeList} | ||
coaddTempExps = {warpType: self._prepareEmptyExposure(skyInfo) for warpType in warpTypeList} | ||
inputRecorder = {warpType: self.inputRecorder.makeCoaddTempExpRecorder(visitId, len(calexpRefList)) | ||
inputRecorder = {warpType: self.inputRecorder.makeCoaddTempExpRecorder(visitId, len(calExpList)) | ||
for warpType in warpTypeList} | ||
|
||
modelPsf = self.config.modelPsf.apply() if self.config.makePsfMatched else None | ||
for calExpInd, calExpRef in enumerate(calexpRefList): | ||
self.log.info("Processing calexp %d of %d for this Warp: id=%s", | ||
calExpInd+1, len(calexpRefList), calExpRef.dataId) | ||
try: | ||
ccdId = calExpRef.get("ccdExposureId", immediate=True) | ||
except Exception: | ||
ccdId = calExpInd | ||
try: | ||
# We augment the dataRef here with the tract, which is harmless for loading things | ||
# like calexps that don't need the tract, and necessary for meas_mosaic outputs, | ||
# which do. | ||
calExpRef = calExpRef.butlerSubset.butler.dataRef("calexp", dataId=calExpRef.dataId, | ||
tract=skyInfo.tractInfo.getId()) | ||
calExp = self.getCalibratedExposure(calExpRef, bgSubtracted=self.config.bgSubtracted) | ||
except Exception as e: | ||
self.log.warn("Calexp %s not found; skipping it: %s", calExpRef.dataId, e) | ||
continue | ||
if dataIdList is None: | ||
dataIdList = ccdIdList | ||
|
||
if self.config.doApplySkyCorr: | ||
self.applySkyCorr(calExpRef, calExp) | ||
for calExpInd, (calExp, ccdId, dataId) in enumerate(zip(calExpList, ccdIdList, dataIdList)): | ||
self.log.info("Processing calexp %d of %d for this Warp: id=%s", | ||
calExpInd+1, len(calExpList), dataId) | ||
|
||
try: | ||
warpedAndMatched = self.warpAndPsfMatch.run(calExp, modelPsf=modelPsf, | ||
wcs=skyInfo.wcs, maxBBox=skyInfo.bbox, | ||
makeDirect=self.config.makeDirect, | ||
makePsfMatched=self.config.makePsfMatched) | ||
except Exception as e: | ||
self.log.warn("WarpAndPsfMatch failed for calexp %s; skipping it: %s", calExpRef.dataId, e) | ||
self.log.warn("WarpAndPsfMatch failed for calexp %s; skipping it: %s", dataId, e) | ||
continue | ||
try: | ||
numGoodPix = {warpType: 0 for warpType in warpTypeList} | ||
|
@@ -405,7 +420,7 @@ def run(self, calexpRefList, skyInfo, visitId=0): | |
coaddTempExp.getMaskedImage(), exposure.getMaskedImage(), self.getBadPixelMask()) | ||
totGoodPix[warpType] += numGoodPix[warpType] | ||
self.log.debug("Calexp %s has %d good pixels in this patch (%.1f%%) for %s", | ||
calExpRef.dataId, numGoodPix[warpType], | ||
dataId, numGoodPix[warpType], | ||
100.0*numGoodPix[warpType]/skyInfo.bbox.getArea(), warpType) | ||
if numGoodPix[warpType] > 0 and not didSetMetadata[warpType]: | ||
coaddTempExp.setCalib(exposure.getCalib()) | ||
|
@@ -419,7 +434,7 @@ def run(self, calexpRefList, skyInfo, visitId=0): | |
inputRecorder[warpType].addCalExp(calExp, ccdId, numGoodPix[warpType]) | ||
|
||
except Exception as e: | ||
self.log.warn("Error processing calexp %s; skipping it: %s", calExpRef.dataId, e) | ||
self.log.warn("Error processing calexp %s; skipping it: %s", dataId, e) | ||
continue | ||
|
||
for warpType in warpTypeList: | ||
|
@@ -538,3 +553,179 @@ def applySkyCorr(self, dataRef, calexp): | |
if isinstance(calexp, afwImage.Exposure): | ||
calexp = calexp.getMaskedImage() | ||
calexp -= bg.getImage() | ||
|
||
|
||
class MakeWarpConfig(pipeBase.PipelineTaskConfig, MakeCoaddTempExpConfig): | ||
calExpList = pipeBase.InputDatasetField( | ||
doc="Input exposures to be resampled and optionally PSF-matched onto a SkyMap projection/patch", | ||
name="calexp", | ||
storageClass="ExposureF", | ||
dimensions=("Visit", "Detector") | ||
) | ||
backgroundList = pipeBase.InputDatasetField( | ||
doc="Input backgrounds to be added back into the calexp if bgSubtracted=False", | ||
name="calexpBackground", | ||
storageClass="Background", | ||
dimensions=("Visit", "Detector") | ||
) | ||
skyCorrList = pipeBase.InputDatasetField( | ||
doc="SkyCorr", | ||
name="Input Sky Correction to be subtracted from the calexp if doApplySkyCorr=True", | ||
storageClass="Background", | ||
dimensions=("Visit", "Detector") | ||
) | ||
skyMap = pipeBase.InputDatasetField( | ||
doc="Input definition of geometry/bbox and projection/wcs for warped exposures", | ||
nameTemplate="{coaddName}Coadd_skyMap", | ||
storageClass="SkyMap", | ||
dimensions=("SkyMap",), | ||
scalar=True | ||
) | ||
direct = pipeBase.OutputDatasetField( | ||
doc=("Output direct warped exposure (previously called CoaddTempExp), produced by resampling ", | ||
"calexps onto the skyMap patch geometry."), | ||
nameTemplate="{coaddName}Coadd_directWarp", | ||
storageClass="ExposureF", | ||
dimensions=("Tract", "Patch", "SkyMap", "Visit"), | ||
scalar=True | ||
) | ||
psfMatched = pipeBase.OutputDatasetField( | ||
doc=("Output PSF-Matched warped exposure (previously called CoaddTempExp), produced by resampling ", | ||
"calexps onto the skyMap patch geometry and PSF-matching to a model PSF."), | ||
nameTemplate="{coaddName}Coadd_psfMatchedWarp", | ||
storageClass="ExposureF", | ||
dimensions=("Tract", "Patch", "SkyMap", "Visit"), | ||
scalar=True | ||
) | ||
|
||
def setDefaults(self): | ||
super().setDefaults() | ||
self.formatTemplateNames({"coaddName": "deep"}) | ||
self.quantum.dimensions = ("Tract", "Patch", "SkyMap", "Visit") | ||
|
||
def validate(self): | ||
super().validate() | ||
# TODO: Remove this constraint after DM-17062 | ||
if self.doApplyUberCal: | ||
raise RuntimeError("Gen3 MakeWarpTask cannot apply meas_mosaic or jointcal results." | ||
"Please set doApplyUbercal=False.") | ||
|
||
|
||
class MakeWarpTask(MakeCoaddTempExpTask, pipeBase.PipelineTask): | ||
"""Warp and optionally PSF-Match calexps onto an a common projection | ||
|
||
First Draft of a Gen3 compatible MakeWarpTask which | ||
currently does not handle doApplyUberCal=True. | ||
""" | ||
ConfigClass = MakeWarpConfig | ||
_DefaultName = "makeWarp" | ||
|
||
@classmethod | ||
def getInputDatasetTypes(cls, config): | ||
"""Return input dataset type descriptors | ||
|
||
Remove input dataset types not used by the Task | ||
""" | ||
inputTypeDict = super().getInputDatasetTypes(config) | ||
if config.bgSubtracted: | ||
inputTypeDict.pop("backgroundList", None) | ||
if not config.doApplySkyCorr: | ||
inputTypeDict.pop("skyCorr", None) | ||
return inputTypeDict | ||
|
||
@classmethod | ||
def getOutputDatasetTypes(cls, config): | ||
"""Return output dataset type descriptors | ||
|
||
Remove output dataset types not produced by the Task | ||
""" | ||
outputTypeDict = super().getOutputDatasetTypes(config) | ||
if not config.makeDirect: | ||
outputTypeDict.pop("direct", None) | ||
if not config.makePsfMatched: | ||
outputTypeDict.pop("psfMatched", None) | ||
return outputTypeDict | ||
|
||
def adaptArgsAndRun(self, inputData, inputDataIds, outputDataIds, butler): | ||
"""Construct warps for requested warp type for single epoch | ||
|
||
PipelineTask (Gen3) entry point to warp and optionally PSF-match | ||
calexps. This method is analogous to `runDataRef`, it prepares all | ||
the data products to be passed to `run`. | ||
Return a Struct with only requested warpTypes controlled by the configs | ||
makePsfMatched and makeDirect. | ||
|
||
Parameters | ||
---------- | ||
inputData : `dict` | ||
Keys are the names of the configs describing input dataset types. | ||
Values are input Python-domain data objects (or lists of objects) | ||
retrieved from data butler. | ||
inputDataIds : `dict` | ||
Keys are the names of the configs describing input dataset types. | ||
Values are DataIds (or lists of DataIds) that task consumes for | ||
corresponding dataset type. | ||
outputDataIds : `dict` | ||
Keys are the names of the configs describing input dataset types. | ||
Values are DataIds (or lists of DataIds) that task is to produce | ||
for corresponding dataset type. | ||
butler : `lsst.daf.butler.Butler` | ||
Gen3 Butler object for fetching additional data products before | ||
running the Task | ||
|
||
Returns | ||
------- | ||
result : `lsst.pipe.base.Struct` | ||
Result struct with components: | ||
|
||
- ``direct`` : (optional) direct Warp Exposure | ||
(``lsst.afw.image.Exposure``) | ||
- ``psfMatched``: (optional) PSF-Matched Warp Exposure | ||
(``lsst.afw.image.Exposure``) | ||
""" | ||
# Construct skyInfo expected by `run` | ||
skyMap = inputData["skyMap"] | ||
outputDataId = next(iter(outputDataIds.values())) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If (as it seems) this is guaranteed to be a single-element dict, my favorite pattern for this is:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's not a single-element dict. It returns two data products |
||
inputData['skyInfo'] = makeSkyInfo(skyMap, | ||
tractId=outputDataId['tract'], | ||
patchId=outputDataId['patch']) | ||
|
||
# Construct list of DataIds expected by `run` | ||
dataIdList = inputDataIds['calExpList'] | ||
inputData['dataIdList'] = dataIdList | ||
|
||
# Construct list of ccdExposureIds expected by `run` | ||
inputData['ccdIdList'] = [butler.registry.packDataId("VisitDetector", dataId) | ||
for dataId in dataIdList] | ||
|
||
# Extract integer visitId requested by `run` | ||
visits = [dataId['visit'] for dataId in dataIdList] | ||
assert(all(visits[0] == visit for visit in visits)) | ||
inputData["visitId"] = visits[0] | ||
|
||
self.prepareCalibratedExposures(**inputData) | ||
results = self.run(**inputData) | ||
return pipeBase.Struct(**results.exposures) | ||
|
||
def prepareCalibratedExposures(self, calExpList, backgroundList=None, skyCorrList=None, **kwargs): | ||
"""Calibrate and add backgrounds to input calExpList in place | ||
|
||
TODO DM-17062: apply jointcal/meas_mosaic here | ||
|
||
Parameters | ||
---------- | ||
calExpList : `list` of `lsst.afw.image.Exposure` | ||
Sequence of calexps to be modified in place | ||
backgroundList : `list` of `lsst.afw.math.backgroundList` | ||
Sequence of backgrounds to be added back in if bgSubtracted=False | ||
skyCorrList : `list` of `lsst.afw.math.backgroundList` | ||
Sequence of background corrections to be subtracted if doApplySkyCorr=True | ||
""" | ||
backgroundList = len(calExpList)*[None] if backgroundList is None else backgroundList | ||
skyCorrList = len(calExpList)*[None] if skyCorrList is None else skyCorrList | ||
for calexp, background, skyCorr in zip(calExpList, backgroundList, skyCorrList): | ||
mi = calexp.maskedImage | ||
if not self.config.bgSubtracted: | ||
mi += background.getImage() | ||
if self.config.doApplySkyCorr: | ||
mi -= skyCorr.getImage() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should remember to go back and expand these docstrings later if we're not doing them now. If you don't have a pre-existing list for such things (ticket or otherwise) I can start one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had entirely intended to go back and write complete docstrings for these on this ticket.