Skip to content

Commit

Permalink
Merge pull request #530 from lsst/tickets/DM-30649
Browse files Browse the repository at this point in the history
DM-30649: adapt to adjustQuantum API change and propagate NoWorkFound
  • Loading branch information
TallJimbo committed Jun 17, 2021
2 parents 32084cb + a04cea3 commit 89a8187
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 51 deletions.
1 change: 0 additions & 1 deletion pipelines/DRP.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ tasks:
class: lsst.pipe.tasks.makeCoaddTempExp.MakeWarpTask
config:
makePsfMatched: true
doWriteEmptyWarps: true
assembleCoadd:
class: lsst.pipe.tasks.assembleCoadd.CompareWarpAssembleCoaddTask
config:
Expand Down
3 changes: 1 addition & 2 deletions python/lsst/pipe/tasks/assembleCoadd.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,8 +468,7 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs):
self.log.info("Found %d %s", len(inputs.tempExpRefList),
self.getTempExpDatasetName(self.warpType))
if len(inputs.tempExpRefList) == 0:
self.log.warn("No coadd temporary exposures found")
return
raise pipeBase.NoWorkFound("No coadd temporary exposures found")

supplementaryData = self.makeSupplementaryDataGen3(butlerQC, inputRefs, outputRefs)
retStruct = self.run(inputData['skyInfo'], inputs.tempExpRefList, inputs.imageScalerList,
Expand Down
4 changes: 2 additions & 2 deletions python/lsst/pipe/tasks/characterizeImage.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ class CharacterizeImageConnections(pipeBase.PipelineTaskConnections,
storageClass="SourceCatalog",
)

def adjustQuantum(self, datasetRefMap: pipeBase.InputQuantizedConnection):
def adjustQuantum(self, inputs, outputs, label, dataId):
# Docstring inherited from PipelineTaskConnections
try:
return super().adjustQuantum(datasetRefMap)
return super().adjustQuantum(inputs, outputs, label, dataId)
except pipeBase.ScalarError as err:
raise pipeBase.ScalarError(
"CharacterizeImageTask can at present only be run on visits that are associated with "
Expand Down
94 changes: 53 additions & 41 deletions python/lsst/pipe/tasks/fit_multiband.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,65 +111,77 @@ class MultibandFitConnections(
storageClass="SourceCatalog"
)

def adjustQuantum(self, datasetRefMap):
def adjustQuantum(self, inputs, outputs, label, data_id):
"""Validates the `lsst.daf.butler.DatasetRef` bands against the
subtask's list of bands to fit and drops unnecessary bands.
Parameters
----------
datasetRefMap : `NamedKeyDict`
Mapping from dataset type to a `set` of
`lsst.daf.butler.DatasetRef` objects
inputs : `dict`
Dictionary whose keys are an input (regular or prerequisite)
connection name and whose values are a tuple of the connection
instance and a collection of associated `DatasetRef` objects.
The exact type of the nested collections is unspecified; it can be
assumed to be multi-pass iterable and support `len` and ``in``, but
it should not be mutated in place. In contrast, the outer
dictionaries are guaranteed to be temporary copies that are true
`dict` instances, and hence may be modified and even returned; this
is especially useful for delegating to `super` (see notes below).
outputs : `Mapping`
Mapping of output datasets, with the same structure as ``inputs``.
label : `str`
Label for this task in the pipeline (should be used in all
diagnostic messages).
data_id : `lsst.daf.butler.DataCoordinate`
Data ID for this quantum in the pipeline (should be used in all
diagnostic messages).
Returns
-------
datasetRefMap : `NamedKeyDict`
Modified mapping of input with possibly adjusted
`lsst.daf.butler.DatasetRef` objects.
adjusted_inputs : `Mapping`
Mapping of the same form as ``inputs`` with updated containers of
input `DatasetRef` objects. All inputs involving the 'band'
dimension are adjusted to put them in consistent order and remove
unneeded bands.
adjusted_outputs : `Mapping`
Mapping of updated output datasets; always empty for this task.
Raises
------
ValueError
Raised if any of the per-band datasets have an inconsistent band
set, or if the band set to fit is not a subset of the data bands.
lsst.pipe.base.NoWorkFound
Raised if there are not enough of the right bands to run the task
on this quantum.
"""
datasetRefMap = super().adjustQuantum(datasetRefMap)
# Check which bands are going to be fit
bands_fit, bands_read_only = self.config.get_band_sets()
bands_needed = bands_fit.union(bands_read_only)

bands_data = None
bands_extra = set()

for type_d, ref_d in datasetRefMap.items():
adjusted_inputs = {}
for connection_name, (connection, dataset_refs) in inputs.items():
# Datasets without bands in their dimensions should be fine
if 'band' in type_d.dimensions:
bands_set = {dref.dataId['band'] for dref in ref_d}
if bands_data is None:
bands_data = bands_set
if bands_needed != bands_data:
if not bands_needed.issubset(bands_data):
raise ValueError(
f'Datarefs={ref_d} have data with bands in the set={bands_set},'
f'which is not a subset of the required bands={bands_needed} defined by '
f'{self.config.__class__}.fit_multiband='
f'{self.config.fit_multiband._value.__class__}\'s attributes'
f' bands_fit={bands_fit} and bands_read_only()={bands_read_only}.'
f' Add the required bands={bands_needed.difference(bands_data)}.'
)
else:
bands_extra = bands_data.difference(bands_needed)
elif bands_set != bands_data:
raise ValueError(
f'Datarefs={ref_d} have data with bands in the set={bands_set}'
f' which differs from the previous={bands_data}); bandsets must be identical.'
if 'band' in connection.dimensions:
datasets_by_band = {dref.dataId['band']: dref for dref in dataset_refs}
if not bands_needed.issubset(datasets_by_band.keys()):
raise pipeBase.NoWorkFound(
f'DatasetRefs={dataset_refs} have data with bands in the'
f' set={set(datasets_by_band.keys())},'
f' which is not a superset of the required bands={bands_needed} defined by'
f' {self.config.__class__}.fit_multiband='
f'{self.config.fit_multiband._value.__class__}\'s attributes'
f' bands_fit={bands_fit} and bands_read_only()={bands_read_only}.'
f' Add the required bands={bands_needed.difference(datasets_by_band.keys())}.'
)
if bands_extra:
for dref in ref_d:
if dref.dataId['band'] in bands_extra:
ref_d.remove(dref)
return datasetRefMap
# Adjust all datasets with band dimensions to include just
# the needed bands, in consistent order.
adjusted_inputs[connection_name] = (
connection,
[datasets_by_band[band] for band in bands_needed]
)

# Delegate to super for more checks.
inputs.update(adjusted_inputs)
super().adjustQuantum(inputs, outputs, label, data_id)
return adjusted_inputs, {}


class MultibandFitSubConfig(pexConfig.Config):
Expand Down
4 changes: 1 addition & 3 deletions python/lsst/pipe/tasks/imageDifference.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,13 +522,11 @@ def runQuantum(self, butlerQC: pipeBase.ButlerQuantumContext,
)

if templateStruct.area/inputs['exposure'].getBBox().getArea() < self.config.requiredTemplateFraction:
# TO DO DM-30649: when available, use exit code for expected failures
# In the meantime, raise to exit and register as a failure
message = ("Insufficient Template Coverage. (%.1f%% < %.1f%%) Not attempting subtraction. "
"To force subtraction, set config requiredTemplateFraction=0." % (
100*templateStruct.area/inputs['exposure'].getBBox().getArea(),
100*self.config.requiredTemplateFraction))
raise pipeBase.TaskError("Expected Failure: %s" % (message))
raise pipeBase.NoWorkFound(message)
else:
outputs = self.run(exposure=inputs['exposure'],
templateExposure=templateStruct.exposure,
Expand Down
4 changes: 2 additions & 2 deletions python/lsst/pipe/tasks/makeCoaddTempExp.py
Original file line number Diff line number Diff line change
Expand Up @@ -794,9 +794,9 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs):
ccdIdList=[ccdIdList[i] for i in goodIndices],
dataIdList=[dataIdList[i] for i in goodIndices],
skyInfo=skyInfo)
if self.config.makeDirect:
if self.config.makeDirect and results.exposures["direct"] is not None:
butlerQC.put(results.exposures["direct"], outputRefs.direct)
if self.config.makePsfMatched:
if self.config.makePsfMatched and results.exposures["psfMatched"] is not None:
butlerQC.put(results.exposures["psfMatched"], outputRefs.psfMatched)

def filterInputs(self, indices, inputs):
Expand Down

0 comments on commit 89a8187

Please sign in to comment.