Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-35155: Create unit tests for diffim detect and measure Task #241

Merged
merged 2 commits into from
Jan 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
198 changes: 197 additions & 1 deletion python/lsst/ip/diffim/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
__all__ = ["DipoleTestImage", "getPsfFwhm"]

import numpy as np

import lsst.geom as geom
import lsst.afw.detection as afwDet
import lsst.afw.display as afwDisplay
Expand All @@ -34,6 +33,7 @@
import lsst.afw.math as afwMath
import lsst.afw.table as afwTable
import lsst.meas.algorithms as measAlg
from lsst.meas.algorithms.testUtils import plantSources
import lsst.meas.base as measBase
from lsst.utils.logging import getLogger
from .dipoleFitTask import DipoleFitAlgorithm
Expand Down Expand Up @@ -1110,3 +1110,199 @@ def sliceWidth(image, threshold, peaks, axis):
return high - low
width = (sliceWidth(image, peak/2., peakLocs, axis=0), sliceWidth(image, peak/2., peakLocs, axis=1))
return np.mean(width) if average else width


def detectTestSources(exposure):
"""Minimal source detection wrapper suitable for unit tests.

Parameters
----------
exposure : `lsst.afw.image.Exposure`
Exposure on which to run detection/measurement
The exposure is modified in place to set the 'DETECTED' mask plane.

Returns
-------
selectSources :
Source catalog containing candidates
"""

schema = afwTable.SourceTable.makeMinimalSchema()
selectDetection = measAlg.SourceDetectionTask(schema=schema)
selectMeasurement = measBase.SingleFrameMeasurementTask(schema=schema)
table = afwTable.SourceTable.make(schema)

detRet = selectDetection.run(
table=table,
exposure=exposure,
sigma=None, # The appropriate sigma is calculated from the PSF
doSmooth=True
)
selectSources = detRet.sources
selectMeasurement.run(measCat=selectSources, exposure=exposure)

return selectSources


def makeFakeWcs():
"""Make a fake, affine Wcs.
"""
crpix = geom.Point2D(123.45, 678.9)
crval = geom.SpherePoint(0.1, 0.1, geom.degrees)
cdMatrix = np.array([[5.19513851e-05, -2.81124812e-07],
[-3.25186974e-07, -5.19112119e-05]])
return afwGeom.makeSkyWcs(crpix, crval, cdMatrix)


def makeTestImage(seed=5, nSrc=20, psfSize=2., noiseLevel=5.,
noiseSeed=6, fluxLevel=500., fluxRange=2.,
kernelSize=32, templateBorderSize=0,
background=None,
xSize=256,
ySize=256,
x0=12345,
y0=67890,
calibration=1.,
doApplyCalibration=False,
):
"""Make a reproduceable PSF-convolved exposure for testing.

Parameters
----------
seed : `int`, optional
Seed value to initialize the random number generator for sources.
nSrc : `int`, optional
Number of sources to simulate.
psfSize : `float`, optional
Width of the PSF of the simulated sources, in pixels.
noiseLevel : `float`, optional
Standard deviation of the noise to add to each pixel.
noiseSeed : `int`, optional
Seed value to initialize the random number generator for noise.
fluxLevel : `float`, optional
Reference flux of the simulated sources.
fluxRange : `float`, optional
Range in flux amplitude of the simulated sources.
kernelSize : `int`, optional
Size in pixels of the kernel for simulating sources.
templateBorderSize : `int`, optional
Size in pixels of the image border used to pad the image.
background : `lsst.afw.math.Chebyshev1Function2D`, optional
Optional background to add to the output image.
xSize, ySize : `int`, optional
Size in pixels of the simulated image.
x0, y0 : `int`, optional
Origin of the image.
calibration : `float`, optional
Conversion factor between instFlux and nJy.
doApplyCalibration : `bool`, optional
Apply the photometric calibration and return the image in nJy?

Returns
-------
modelExposure : `lsst.afw.image.Exposure`
The model image, with the mask and variance planes.
sourceCat : `lsst.afw.table.SourceCatalog`
Catalog of sources detected on the model image.
"""
# Distance from the inner edge of the bounding box to avoid placing test
# sources in the model images.
bufferSize = kernelSize/2 + templateBorderSize + 1

bbox = geom.Box2I(geom.Point2I(x0, y0), geom.Extent2I(xSize, ySize))
if templateBorderSize > 0:
bbox.grow(templateBorderSize)

rng = np.random.RandomState(seed)
rngNoise = np.random.RandomState(noiseSeed)
x0, y0 = bbox.getBegin()
xSize, ySize = bbox.getDimensions()
xLoc = rng.rand(nSrc)*(xSize - 2*bufferSize) + bufferSize + x0
yLoc = rng.rand(nSrc)*(ySize - 2*bufferSize) + bufferSize + y0

flux = (rng.rand(nSrc)*(fluxRange - 1.) + 1.)*fluxLevel
sigmas = [psfSize for src in range(nSrc)]
coordList = list(zip(xLoc, yLoc, flux, sigmas))
skyLevel = 0
# Don't use the built in poisson noise: it modifies the global state of numpy random
modelExposure = plantSources(bbox, kernelSize, skyLevel, coordList, addPoissonNoise=False)
modelExposure.setWcs(makeFakeWcs())
noise = rngNoise.randn(ySize, xSize)*noiseLevel
noise -= np.mean(noise)
modelExposure.variance.array = np.sqrt(np.abs(modelExposure.image.array)) + noiseLevel**2
modelExposure.image.array += noise

# Run source detection to set up the mask plane
sourceCat = detectTestSources(modelExposure)
modelExposure.setPhotoCalib(afwImage.PhotoCalib(calibration, 0., bbox))
if background is not None:
modelExposure.image += background
modelExposure.maskedImage /= calibration
modelExposure.info.setId(seed)
if doApplyCalibration:
modelExposure.maskedImage = modelExposure.photoCalib.calibrateImage(modelExposure.maskedImage)

return modelExposure, sourceCat


def makeStats(badMaskPlanes=None):
"""Create a statistics control for configuring calculations on images.

Parameters
----------
badMaskPlanes : `list` of `str`, optional
List of mask planes to exclude from calculations.

Returns
-------
statsControl : ` lsst.afw.math.StatisticsControl`
Statistics control object for configuring calculations on images.
"""
if badMaskPlanes is None:
badMaskPlanes = ("INTRP", "EDGE", "DETECTED", "SAT", "CR",
"BAD", "NO_DATA", "DETECTED_NEGATIVE")
statsControl = afwMath.StatisticsControl()
statsControl.setNumSigmaClip(3.)
statsControl.setNumIter(3)
statsControl.setAndMask(afwImage.Mask.getPlaneBitMask(badMaskPlanes))
return statsControl


def computeRobustStatistics(image, mask, statsCtrl, statistic=afwMath.MEANCLIP):
"""Calculate a robust mean of the variance plane of an exposure.

Parameters
----------
image : `lsst.afw.image.Image`
Image or variance plane of an exposure to evaluate.
mask : `lsst.afw.image.Mask`
Mask plane to use for excluding pixels.
statsCtrl : `lsst.afw.math.StatisticsControl`
Statistics control object for configuring the calculation.
statistic : `lsst.afw.math.Property`, optional
The type of statistic to compute. Typical values are
``afwMath.MEANCLIP`` or ``afwMath.STDEVCLIP``.

Returns
-------
value : `float`
The result of the statistic calculated from the unflagged pixels.
"""
statObj = afwMath.makeStatistics(image, mask, statistic, statsCtrl)
return statObj.getValue(statistic)


def computePSFNoiseEquivalentArea(psf):
"""Compute the noise equivalent area for an image psf

Parameters
----------
psf : `lsst.afw.detection.Psf`

Returns
-------
nea : `float`
"""
psfImg = psf.computeImage(psf.getAveragePosition())
nea = 1./np.sum(psfImg.array**2)
return nea