-
Notifications
You must be signed in to change notification settings - Fork 18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
tickets/DM-17390 Update CalibrateTask to be a PipelineTask #264
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,14 +24,16 @@ | |
from lsstDebug import getDebugFrame | ||
import lsst.pex.config as pexConfig | ||
import lsst.pipe.base as pipeBase | ||
from lsst.pipe.base import (InitInputDatasetField, InitOutputDatasetField, InputDatasetField, | ||
OutputDatasetField, PipelineTaskConfig, PipelineTask) | ||
import lsst.afw.table as afwTable | ||
from lsst.meas.astrom import AstrometryTask, displayAstrometry, denormalizeMatches | ||
from lsst.meas.extensions.astrometryNet import LoadAstrometryNetObjectsTask | ||
from lsst.obs.base import ExposureIdInfo | ||
import lsst.daf.base as dafBase | ||
from lsst.afw.math import BackgroundList | ||
from lsst.afw.table import IdFactory, SourceTable | ||
from lsst.meas.algorithms import SourceDetectionTask | ||
from lsst.meas.algorithms import SourceDetectionTask, ReferenceObjectLoader | ||
from lsst.meas.base import (SingleFrameMeasurementTask, ApplyApCorrTask, | ||
CatalogCalculationTask) | ||
from lsst.meas.deblender import SourceDeblendTask | ||
|
@@ -41,7 +43,7 @@ | |
__all__ = ["CalibrateConfig", "CalibrateTask"] | ||
|
||
|
||
class CalibrateConfig(pexConfig.Config): | ||
class CalibrateConfig(PipelineTaskConfig): | ||
"""Config for CalibrateTask""" | ||
doWrite = pexConfig.Field( | ||
dtype=bool, | ||
|
@@ -166,9 +168,90 @@ class CalibrateConfig(pexConfig.Config): | |
doc="Injection of fake sources for testing purposes (must be " | ||
"retargeted)" | ||
) | ||
icSourceSchema = InitInputDatasetField( | ||
doc="Schema produced by characterize image task, used to initialize this task", | ||
name="icSrc_schema", | ||
storageClass="SourceCatalog", | ||
) | ||
outputSchema = InitOutputDatasetField( | ||
doc="Schema after CalibrateTask has been initialized", | ||
name="src_schema", | ||
storageClass="SourceCatalog", | ||
) | ||
exposure = InputDatasetField( | ||
doc="Input image to calibrate", | ||
name="icExp", | ||
storageClass="ExposureF", | ||
dimensions=("Instrument", "Visit", "Detector"), | ||
scalar=True | ||
) | ||
background = InputDatasetField( | ||
doc="Backgrounds determined by characterize task", | ||
name="icExpBackground", | ||
storageClass="Background", | ||
dimensions=("Instrument", "Visit", "Detector"), | ||
scalar=True | ||
) | ||
icSourceCat = InputDatasetField( | ||
doc="Source catalog created by characterize task", | ||
name="icSrc", | ||
storageClass="SourceCatalog", | ||
dimensions=("Instrument", "Visit", "Detector"), | ||
scalar=True | ||
) | ||
astromRefCat = InputDatasetField( | ||
doc="Reference catalog to use for astrometry", | ||
name="ref_cat", | ||
storageClass="SimpleCatalog", | ||
dimensions=("SkyPix",), | ||
manualLoad=True | ||
) | ||
photoRefCat = InputDatasetField( | ||
doc="Reference catalog to use for photometric calibration", | ||
name="ref_cat", | ||
storageClass="SimpleCatalog", | ||
dimensions=("SkyPix",), | ||
manualLoad=True | ||
) | ||
outputExposure = OutputDatasetField( | ||
doc="Exposure after running calibration task", | ||
name="calexp", | ||
storageClass="ExposureF", | ||
dimensions=("Instrument", "Visit", "Detector"), | ||
scalar=True | ||
) | ||
outputCat = OutputDatasetField( | ||
doc="Source catalog produced in calibrate task", | ||
name="src", | ||
storageClass="SourceCatalog", | ||
dimensions=("Instrument", "Visit", "Detector"), | ||
scalar=True | ||
) | ||
outputBackground = OutputDatasetField( | ||
doc="Background models estimated in calibration task", | ||
name="calexpBackground", | ||
storageClass="Background", | ||
dimensions=("Instrument", "Visit", "Detector"), | ||
scalar=True | ||
) | ||
matches = OutputDatasetField( | ||
doc="Source/refObj matches from the astrometry solver", | ||
name="srcMatch", | ||
storageClass="Catalog", | ||
dimensions=("Instrument", "Visit", "Detector"), | ||
scalar=True | ||
) | ||
matchesDenormalized = OutputDatasetField( | ||
doc="Denormalized matches from astrometry solver", | ||
name="srcMatchFull", | ||
storageClass="Catalog", | ||
dimensions=("Instrument", "Visit", "Detector"), | ||
scalar=True | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It'd be good to have more consistency between the match dataset field names and the config options that control whether to produce them, too; maybe just "matches" and "matchesDenormalized"? |
||
|
||
def setDefaults(self): | ||
pexConfig.Config.setDefaults(self) | ||
super().setDefaults() | ||
self.quantum.dimensions = ("Instrument", "Visit", "Detector") | ||
# aperture correction should already be measured | ||
|
||
|
||
|
@@ -179,7 +262,7 @@ def setDefaults(self): | |
## \copybrief CalibrateTask | ||
## \} | ||
|
||
class CalibrateTask(pipeBase.CmdLineTask): | ||
class CalibrateTask(PipelineTask, pipeBase.CmdLineTask): | ||
r"""!Calibrate an exposure: measure sources and perform astrometric and | ||
photometric calibration | ||
|
||
|
@@ -277,7 +360,8 @@ def DebugInfo(name): | |
RunnerClass = pipeBase.ButlerInitializedTaskRunner | ||
|
||
def __init__(self, butler=None, astromRefObjLoader=None, | ||
photoRefObjLoader=None, icSourceSchema=None, **kwargs): | ||
photoRefObjLoader=None, icSourceSchema=None, | ||
initInputs=None, **kwargs): | ||
"""!Construct a CalibrateTask | ||
|
||
@param[in] butler The butler is passed to the refObjLoader constructor | ||
|
@@ -300,12 +384,15 @@ def __init__(self, butler=None, astromRefObjLoader=None, | |
@param[in,out] kwargs other keyword arguments for | ||
lsst.pipe.base.CmdLineTask | ||
""" | ||
pipeBase.CmdLineTask.__init__(self, **kwargs) | ||
super().__init__(**kwargs) | ||
|
||
if icSourceSchema is None and butler is not None: | ||
# Use butler to read icSourceSchema from disk. | ||
icSourceSchema = butler.get("icSrc_schema", immediate=True).schema | ||
|
||
if icSourceSchema is None and butler is None and initInputs is not None: | ||
icSourceSchema = initInputs['icSourceSchema'].schema | ||
|
||
if icSourceSchema is not None: | ||
# use a schema mapper to avoid copying each field separately | ||
self.schemaMapper = afwTable.SchemaMapper(icSourceSchema) | ||
|
@@ -360,17 +447,25 @@ def __init__(self, butler=None, astromRefObjLoader=None, | |
self.makeSubtask('catalogCalculation', schema=self.schema) | ||
|
||
if self.config.doAstrometry: | ||
if astromRefObjLoader is None: | ||
if astromRefObjLoader is None and butler is not None: | ||
self.makeSubtask('astromRefObjLoader', butler=butler) | ||
astromRefObjLoader = self.astromRefObjLoader | ||
self.pixelMargin = astromRefObjLoader.config.pixelMargin | ||
self.pixelMargin = astromRefObjLoader.config.pixelMargin | ||
self.makeSubtask("astrometry", refObjLoader=astromRefObjLoader, | ||
schema=self.schema) | ||
if self.config.doPhotoCal: | ||
if photoRefObjLoader is None: | ||
if photoRefObjLoader is None and butler is not None: | ||
self.makeSubtask('photoRefObjLoader', butler=butler) | ||
photoRefObjLoader = self.photoRefObjLoader | ||
self.pixelMargin = photoRefObjLoader.config.pixelMargin | ||
self.pixelMargin = photoRefObjLoader.config.pixelMargin | ||
# If this is gen 3 don't allow LoadAstometryNet loader. If this is a gen3 run | ||
# then initInputs will be populated with data used to initialize the task from | ||
# a pipeline activator. This verifies that LoadAstrometryNetObjects task is not | ||
# used for photo calibration, and that the reference object loader is actually | ||
# None in gen3 world. It will be added in adaptArgsAndRun as a refObjLoader class | ||
# object. | ||
if initInputs is not None and photoRefObjLoader is LoadAstrometryNetObjectsTask: | ||
raise RuntimeError("Astrometry Net tasks are not compatible with gen 3 middleware") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would probably be good to add a few more comments to the logic here to note which code paths are expected for PipelineTask execution vs. CmdLineTask execution. We'll certainly appreciate that later when we come back to remove the latter. |
||
self.makeSubtask("photoCal", refObjLoader=photoRefObjLoader, | ||
schema=self.schema) | ||
|
||
|
@@ -379,6 +474,20 @@ def __init__(self, butler=None, astromRefObjLoader=None, | |
self.schema = self.schemaMapper.getOutputSchema() | ||
self.schema.checkUnits(parse_strict=self.config.checkUnitsParseStrict) | ||
|
||
def getInitOutputDatasets(self): | ||
sourceCatSchema = afwTable.SourceCatalog(self.schema) | ||
sourceCatSchema.getTable().setMetadata(self.algMetadata) | ||
return {'outputSchema': sourceCatSchema} | ||
|
||
@classmethod | ||
def getOutputDatasetTypes(cls, config): | ||
outputTypesDict = super().getOutputDatasetTypes(config) | ||
if config.doWriteMatches is False: | ||
outputTypesDict.pop("matches") | ||
if config.doWriteMatchesDenormalized is False: | ||
outputTypesDict.pop("matchesDenormalized") | ||
return outputTypesDict | ||
|
||
@pipeBase.timeMethod | ||
def runDataRef(self, dataRef, exposure=None, background=None, icSourceCat=None, | ||
doUnpersist=True): | ||
|
@@ -407,7 +516,6 @@ def runDataRef(self, dataRef, exposure=None, background=None, icSourceCat=None, | |
- if False the exposure must be provided; background and | ||
icSourceCat are optional. True is intended for running as a | ||
command-line task, False for running as a subtask | ||
|
||
@return same data as the calibrate method | ||
""" | ||
self.log.info("Processing %s" % (dataRef.dataId)) | ||
|
@@ -444,6 +552,39 @@ def runDataRef(self, dataRef, exposure=None, background=None, icSourceCat=None, | |
|
||
return calRes | ||
|
||
def adaptArgsAndRun(self, inputData, inputDataIds, outputDataIds, butler): | ||
expId, expBits = butler.registry.packDataId("VisitDetector", | ||
inputDataIds['exposure'], | ||
returnMaxBits=True) | ||
inputData['exposureIdInfo'] = ExposureIdInfo(expId, expBits) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably a topic to discuss after the demo, but I wonder if it would be more readable to instead just pass extra keyword arguments (especially those that don't represent datasets) to |
||
|
||
if self.config.doAstrometry: | ||
refObjLoader = ReferenceObjectLoader(dataIds=inputDataIds['astromRefCat'], | ||
butler=butler, | ||
config=self.config.astromRefObjLoader, | ||
log=self.log) | ||
self.pixelMargin = refObjLoader.config.pixelMargin | ||
self.astrometry.setRefObjLoader(refObjLoader) | ||
|
||
if self.config.doPhotoCal: | ||
photoRefObjLoader = ReferenceObjectLoader(inputDataIds['photoRefCat'], | ||
butler, | ||
self.config.photoRefObjLoader, | ||
self.log) | ||
self.pixelMargin = photoRefObjLoader.config.pixelMargin | ||
self.photoCal.match.setRefObjLoader(photoRefObjLoader) | ||
|
||
results = self.run(**inputData) | ||
|
||
if self.config.doWriteMatches: | ||
normalizedMatches = afwTable.packMatches(results.astromMatches) | ||
normalizedMatches.table.setMetadata(results.matchMeta) | ||
if self.config.doWriteMatchesDenormalized: | ||
denormMatches = denormalizeMatches(results.astromMatches, results.matchMeta) | ||
results.matchesDenormalized = denormMatches | ||
results.matches = normalizedMatches | ||
return results | ||
|
||
def run(self, exposure, exposureIdInfo=None, background=None, | ||
icSourceCat=None): | ||
"""!Calibrate an exposure (science image or coadd) | ||
|
@@ -598,6 +739,11 @@ def run(self, exposure, exposureIdInfo=None, background=None, | |
sourceCat=sourceCat, | ||
astromMatches=astromMatches, | ||
matchMeta=matchMeta, | ||
# These are duplicate entries with different names for use with | ||
# gen3 middleware | ||
outputExposure=exposure, | ||
outputCat=sourceCat, | ||
outputBackground=background, | ||
) | ||
|
||
def writeOutputs(self, dataRef, exposure, background, sourceCat, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you think about calling this
inputSourceSchema
(and similarlyinputSourceCatalog
), below, for more consistency with the output dataset field names?