Skip to content

Commit

Permalink
Add CoaddPsf construction in default reducer subtask.
Browse files Browse the repository at this point in the history
- Add tests of newly-generated PSFs
- Fix a few docstrings
- Add some fixes that arose during testing
  • Loading branch information
djreiss committed Mar 28, 2017
1 parent ab7f4cf commit b906e44
Show file tree
Hide file tree
Showing 2 changed files with 202 additions and 49 deletions.
165 changes: 118 additions & 47 deletions python/lsst/ip/diffim/imageMapReduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import lsst.afw.image as afwImage
import lsst.afw.math as afwMath
import lsst.afw.geom as afwGeom
import lsst.afw.table as afwTable
import lsst.meas.algorithms as measAlg
import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase

Expand Down Expand Up @@ -67,7 +69,7 @@
`ImageMapReduceTask.run`, and may return a new, processed sub-exposure
which is to be "stitched" back into a new resulting larger exposure
(depending on the configured `ImageMapReduceTask.mapperSubtask`);
otherwise if it does not return an afw.Exposure, then the results are
otherwise if it does not return an afwImage.Exposure, then the results are
passed back directly to the caller.
`ImageReducerSubtask` will either stitch the `mapperResults` list of
Expand Down Expand Up @@ -97,7 +99,7 @@ class ImageMapperSubtask(with_metaclass(abc.ABCMeta, pipeBase.Task)):
sub-exposure which can be be "stitched" back into a new resulting
larger exposure (depending on the configured
`ImageReducerSubtask`); otherwise if it does not return an
afw.Exposure, then the
afwImage.Exposure, then the
`ImageReducerSubtask.config.reducerSubtask.reduceOperation`
should be set to 'none' and the result will be propagated
as-is.
Expand All @@ -120,16 +122,16 @@ def run(self, subExposure, expandedSubExposure, fullBBox, **kwargs):
This method may return a new, processed sub-exposure which can
be be "stitched" back into a new resulting larger exposure
(depending on the paired, configured `ImageReducerSubtask`);
otherwise if it does not return an afw.Exposure, then the
otherwise if it does not return an afwImage.Exposure, then the
`ImageReducerSubtask.config.mapperSubtask.reduceOperation`
should be set to 'none' and the result will be propagated
as-is.
Parameters
----------
subExposure : afw.Exposure
subExposure : afwImage.Exposure
the sub-exposure upon which to operate
expandedSubExposure : afw.Exposure
expandedSubExposure : afwImage.Exposure
the expanded sub-exposure upon which to operate
fullBBox : afwGeom.BoundingBox
the bounding box of the original exposure
Expand Down Expand Up @@ -201,24 +203,25 @@ def run(self, mapperResults, exposure, **kwargs):
basis for the resulting exposure (if
self.config.mapperSubtask.reduceOperation is not 'none')
kwargs :
additional keyword arguments propagaged from
additional keyword arguments propagated from
`ImageMapReduceTask.run`.
Returns
-------
A `pipeBase.Struct` containing either an `afw.Exposure` (named 'exposure')
A `pipeBase.Struct` containing either an `afwImage.Exposure` (named 'exposure')
or a list (named 'result'), depending on `config.reduceOperation`.
Notes
-----
And currently known issues:
1. This currently should correctly handle overlapping sub-exposures.
1. This currently correctly handles overlapping sub-exposures.
For overlapping sub-exposures, use `config.reduceOperation='average'`.
2. This currently does not correctly handle varying PSFs (in fact,
it just copies over the PSF from the original exposure). To be
investigated in DM-9629.
3. To be done: correct handling of masks as well.
4. This logic currently makes *two* copies of the original exposure
2. This correctly handles varying PSFs, constructing the resulting
exposure's PSF via CoaddPsf (DM-9629).
Known issues
------
1. To be done: correct handling of masks (nearly there)
2. This logic currently makes *two* copies of the original exposure
(one here and one in `mapperSubtask.run()`). Possibly of concern
for large images on memory-constrained systems.
"""
Expand All @@ -231,13 +234,14 @@ def run(self, mapperResults, exposure, **kwargs):

reduceOp = self.config.reduceOperation
if reduceOp == 'copy':
weights = None
newMI.getImage()[:, :] = np.nan
newMI.getVariance()[:, :] = np.nan
else:
newMI.getImage()[:, :] = 0.
newMI.getVariance()[:, :] = 0.
if reduceOp == 'average': # make an array to keep track of weights
weights = afwImage.ImageF(newMI.getBBox()) # must be a float to divide later.
weights = afwImage.ImageI(newMI.getBBox())

for item in mapperResults:
item = item.subExposure # Expected named value in the pipeBase.Struct
Expand All @@ -248,38 +252,78 @@ def run(self, mapperResults, exposure, **kwargs):
subExp = afwImage.ExposureF(newExp, item.getBBox())
subMI = subExp.getMaskedImage()
patchMI = item.getMaskedImage()
isNan = (np.isnan(patchMI.getImage().getArray()) |
np.isnan(patchMI.getVariance().getArray()))
noNans = np.sum(isNan) <= 0
isNotNan = ~(np.isnan(patchMI.getImage().getArray()) |
np.isnan(patchMI.getVariance().getArray()))
if reduceOp == 'copy':
if noNans:
subMI[:, :] = patchMI
else:
isNotNan = ~isNan
subMI.getImage().getArray()[isNotNan] = patchMI.getImage().getArray()[isNotNan]
subMI.getVariance().getArray()[isNotNan] = patchMI.getVariance().getArray()[isNotNan]
subMI.getImage().getArray()[isNotNan] = patchMI.getImage().getArray()[isNotNan]
subMI.getVariance().getArray()[isNotNan] = patchMI.getVariance().getArray()[isNotNan]

if reduceOp == 'sum' or reduceOp == 'average': # much of these two options is the same
if noNans:
subMI += patchMI
if reduceOp == 'average':
# wsubim is a view into the `weights` Image, so here we simply add one to
# the region of `weights` confined by `item.getBBox()`.
wsubim = afwImage.ImageF(weights, item.getBBox())
wsubim += 1.
else:
isNotNan = ~isNan
subMI.getImage().getArray()[isNotNan] += patchMI.getImage().getArray()[isNotNan]
subMI.getVariance().getArray()[isNotNan] += patchMI.getVariance().getArray()[isNotNan]
if reduceOp == 'average':
wsubim = afwImage.ImageF(weights, item.getBBox())
wsubim.getArray()[isNotNan] += 1.
subMI.getImage().getArray()[isNotNan] += patchMI.getImage().getArray()[isNotNan]
subMI.getVariance().getArray()[isNotNan] += patchMI.getVariance().getArray()[isNotNan]
if reduceOp == 'average':
# wtsView is a view into the `weights` Image
wtsView = afwImage.ImageI(weights, item.getBBox())
wtsView.getArray()[isNotNan] += 1

if reduceOp == 'average':
newMI /= weights
wts = weights.getArray().astype(np.float)
newMI.getImage().getArray()[:, :] /= wts
newMI.getVariance().getArray()[:, :] /= wts

# Not sure how to construct a PSF when reduceOp=='copy'...
if reduceOp == 'sum' or reduceOp == 'average':
psf = self._constructPsf(mapperResults, exposure)
newExp.setPsf(psf)

return pipeBase.Struct(exposure=newExp)

def _constructPsf(self, mapperResults, exposure):
"""Construct a CoaddPsf based on PSFs from individual subExposures
Currently uses (and returns) a CoaddPsf. TBD if we want to
create a custom subclass of CoaddPsf to differentiate it.
Parameters
----------
mapperResults : list
list of `pipeBase.Struct` returned by `ImageMapperSubtask.run`.
For this to work, each element of `mapperResults` must contain
a `subExposure` element, from which the component Psfs are
extracted (thus the reducerTask cannot have
`reduceOperation = 'none'`.
exposure : afwImage.Exposure
the original exposure which is used here solely for its
bounding-box and WCS.
Returns
-------
A `measAlg.CoaddPsf` constructed from the PSFs of the individual
subExposures.
"""
schema = afwTable.ExposureTable.makeMinimalSchema()
schema.addField("weight", type="D", doc="Coadd weight")
mycatalog = afwTable.ExposureCatalog(schema)

# We're just using the exposure's WCS (assuming that the subExposures'
# WCSs are the same, which they better be!).
wcsref = exposure.getWcs()
for i, res in enumerate(mapperResults):
subExp = res.subExposure
if subExp.getWcs() != wcsref:
raise Exception('Wcs of subExposure is different from exposure')
record = mycatalog.getTable().makeRecord()
record.setPsf(subExp.getPsf())
record.setWcs(subExp.getWcs())
record.setBBox(subExp.getBBox())
record['weight'] = 1.0
record['id'] = i
mycatalog.append(record)

# create the coaddpsf
psf = measAlg.CoaddPsf(catalog=mycatalog, coaddWcs=wcsref, weightFieldName='weight')
return psf


class ImageMapReduceConfig(pexConfig.Config):
"""Configuration parameters for the ImageMapReduceTask
Expand Down Expand Up @@ -453,7 +497,6 @@ def run(self, exposure, **kwargs):
Returns
-------
output of `reducerSubtask.run()`
"""
mapperResults = self._runMapper(exposure, **kwargs)
result = self._reduceImage(mapperResults, exposure, **kwargs)
Expand All @@ -476,14 +519,15 @@ def _runMapper(self, exposure, doClone=False, **kwargs):
in that case, the sub-exps do not have to be considered as read-only
kwargs :
additional keyword arguments to be passed to
`mapperSubtask.run`
`mapperSubtask.run` and `self._generateGrid`, including
`forceEvenSized`.
Returns
-------
a list of `pipeBase.Struct`s as returned by `mapperSubtask.run`.
"""
if self.boxes0 is None:
self._generateGrid(exposure)
self._generateGrid(exposure, **kwargs) # possibly pass `forceEvenSized`
if len(self.boxes0) != len(self.boxes1):
raise ValueError('Bounding boxes list and expanded bounding boxes list are of different lengths')

Expand Down Expand Up @@ -523,7 +567,7 @@ def _reduceImage(self, mapperResults, exposure, **kwargs):
result = self.reducerSubtask.run(mapperResults, exposure, **kwargs)
return result

def _generateGrid(self, exposure, forceEvenSized=False):
def _generateGrid(self, exposure, forceEvenSized=False, **kwargs):
"""Generate two lists of bounding boxes that evenly grid `exposure`
Unless the config was provided with `centroidCoordsX` and
Expand All @@ -539,12 +583,15 @@ def _generateGrid(self, exposure, forceEvenSized=False):
Parameters
----------
exposure : `afwImage.Exposure`
exposure : afwImage.Exposure
input exposure whose full bounding box is to be evenly gridded.
forceEvenSized : boolean
force grid elements to have even-valued x- and y- dimensions?
(Potentially useful if doing Fourier transform of subExposures.)
"""
# kwargs are ignored, but necessary to enable optional passing of
# `forceEvenSized` from `_runMapper`.

# Extract the config parameters for conciseness.
gridSizeX = self.config.gridSizeX
gridSizeY = self.config.gridSizeY
Expand Down Expand Up @@ -644,8 +691,32 @@ def offsetAndClipBoxes(bbox0, bbox1, xoff, yoff, bbox):
bb1.shift(afwGeom.Extent2I(xoff, yoff))
bb1.clip(bbox)
if forceEvenSized:
bb0.grow(afwGeom.Extent2I(bb0.getWidth() % 2, bb0.getHeight() % 2))
bb1.grow(afwGeom.Extent2I(bb1.getWidth() % 2, bb1.getHeight() % 2))
if bb0.getWidth() % 2 == 1: # grow to the right
bb0.include(afwGeom.Point2I(bb0.getMaxX()+1, bb0.getMaxY())) # Expand by 1 pixel!
bb0.clip(bbox)
if bb0.getWidth() % 2 == 1: # clipped at right -- so grow to the left
bb0.include(afwGeom.Point2I(bb0.getMinX()-1, bb0.getMaxY()))
bb0.clip(bbox)
if bb0.getHeight() % 2 == 1: # grow upwards
bb0.include(afwGeom.Point2I(bb0.getMaxX(), bb0.getMaxY()+1)) # Expand by 1 pixel!
bb0.clip(bbox)
if bb0.getHeight() % 2 == 1: # clipped upwards -- so grow down
bb0.include(afwGeom.Point2I(bb0.getMaxX(), bb0.getMinY()-1))
bb0.clip(bbox)

if bb1.getWidth() % 2 == 1: # grow to the left
bb1.include(afwGeom.Point2I(bb1.getMaxX()+1, bb1.getMaxY())) # Expand by 1 pixel!
bb1.clip(bbox)
if bb1.getWidth() % 2 == 1: # clipped at right -- so grow to the left
bb1.include(afwGeom.Point2I(bb1.getMinX()-1, bb1.getMaxY()))
bb1.clip(bbox)
if bb1.getHeight() % 2 == 1: # grow downwards
bb1.include(afwGeom.Point2I(bb1.getMaxX(), bb1.getMaxY()+1)) # Expand by 1 pixel!
bb1.clip(bbox)
if bb1.getHeight() % 2 == 1: # clipped upwards -- so grow down
bb1.include(afwGeom.Point2I(bb1.getMaxX(), bb1.getMinY()-1))
bb1.clip(bbox)

return bb0, bb1

xoff = 0
Expand All @@ -659,7 +730,7 @@ def offsetAndClipBoxes(bbox0, bbox1, xoff, yoff, bbox):
self.boxes1.append(bb1)
xoff += gridStepX

def _plotBoxes(self, fullBBox, skip=3):
def plotBoxes(self, fullBBox, skip=3):
"""Plot both grids of boxes using matplotlib.
Will compute the grid via `_generateGrid` if
Expand Down

0 comments on commit b906e44

Please sign in to comment.