Skip to content

Commit

Permalink
Add tests for MakePsfCandidates and cleanup existing psfCandidate tests
Browse files Browse the repository at this point in the history
Pull out the catalog and false source creation so I can use it in both tests.

Refactor `CandidateMaskingTestCase.createCandidate()` to call
`makePsfCandidate` instead of the raw constructor, since that is what is
most commonly used elsewhere.

Better name for self.exp in CandidateMaskingTestCase
  • Loading branch information
parejkoj committed May 4, 2018
1 parent ecc3375 commit d55ac03
Showing 1 changed file with 118 additions and 27 deletions.
145 changes: 118 additions & 27 deletions tests/test_psfCandidate.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#
import unittest

import numpy as np

import lsst.afw.detection as afwDet
import lsst.afw.image as afwImage
Expand All @@ -37,46 +38,96 @@
display = False


def makeEmptyCatalog(psfCandidateField=''):
"""Return an empty catalog with a useful schema for psfCandidate testing.
Parameters
----------
psfCandidateField : `str`
The name of a flag field to add to the schema.
Returns
-------
catalog : `lsst.afw.table.SourceCatalog`
The created (empty) catalog.
"""
schema = afwTable.SourceTable.makeMinimalSchema()
lsst.afw.table.Point2DKey.addFields(schema, "centroid", "centroid", "pixels")
if psfCandidateField != '':
schema.addField(psfCandidateField, type="Flag", doc="Is a psfCandidate?")
catalog = afwTable.SourceCatalog(schema)
catalog.defineCentroid('centroid')

return catalog


def createFakeSource(x, y, catalog, exposure, threshold=0.1):
"""Create a fake source at the given x/y centroid location.
Parameters
----------
x,y : `int`
The x and y centroid coordinates to place the image at.
catalog : `lsst.afw.table.SourceCatalog`
The catalog to add the new source to.
exposure : `lsst.afw.image.Exposure`
The exposure to add the source to.
threshold : `float`, optional
The footprint threshold for identifying the source.
Returns
-------
source : `lsst.afw.table.SourceRecord`
The created source record that was added to ``catalog``.
"""
source = catalog.addNew()
source['centroid_x'] = x
source['centroid_y'] = y

exposure.getMaskedImage().getImage()[x, y] = 1.0
fpSet = afwDet.FootprintSet(exposure.getMaskedImage(), afwDet.Threshold(threshold), "DETECTED")
if display:
ds9.mtv(exposure, frame=1)
for fp in fpSet.getFootprints():
for peak in fp.getPeaks():
ds9.dot("x", peak.getIx(), peak.getIy(), frame=1)

# There might be multiple footprints; only the one around x,y should go in the source
found = False
for fp in fpSet.getFootprints():
if fp.contains(afwGeom.Point2I(x, y)):
found = True
print(fp.getBBox())
break
assert found, "Unable to find central peak in footprint: faulty test"

source.setFootprint(fp)

return source


class CandidateMaskingTestCase(lsst.utils.tests.TestCase):
"""Testing masking around PSF candidates."""

def setUp(self):
self.x, self.y = 123, 45
self.exp = afwImage.ExposureF(256, 256)
self.exp.getMaskedImage().getImage()[self.x, self.y] = 1.0
self.exp.getMaskedImage().getVariance().set(0.01)
self.catalog = makeEmptyCatalog()

schema = afwTable.SourceTable.makeMinimalSchema()
self.catalog = afwTable.SourceCatalog(schema)
self.x, self.y = 123, 45
self.exposure = afwImage.ExposureF(256, 256)
self.exposure.getMaskedImage().getVariance().set(0.01)

def tearDown(self):
del self.exp
del self.exposure
del self.catalog

def createCandidate(self, threshold=0.1):
"""Create a PSF candidate from self.exp
"""Create a PSF candidate from self.exposure
@param threshold: Threshold for creating footprints on image
"""
source = createFakeSource(self.x, self.y, self.catalog, self.exposure, threshold)

source = self.catalog.addNew()
fpSet = afwDet.FootprintSet(self.exp.getMaskedImage(), afwDet.Threshold(threshold), "DETECTED")
if display:
ds9.mtv(self.exp, frame=1)
for fp in fpSet.getFootprints():
for peak in fp.getPeaks():
ds9.dot("x", peak.getIx(), peak.getIy(), frame=1)

# There might be multiple footprints; only the one around self.x,self.y should go in the source
found = False
for fp in fpSet.getFootprints():
if fp.contains(afwGeom.Point2I(self.x, self.y)):
found = True
break
self.assertTrue(found, "Unable to find central peak in footprint: faulty test")

source.setFootprint(fp)
return measAlg.PsfCandidateF(source, self.exp, self.x, self.y)
return measAlg.makePsfCandidate(source, self.exposure)

def checkCandidateMasking(self, badPixels, extraPixels=[], size=25, threshold=0.1, pixelThreshold=0.0):
"""Check that candidates are masked properly
Expand All @@ -89,7 +140,7 @@ def checkCandidateMasking(self, badPixels, extraPixels=[], size=25, threshold=0.
@param threshold: Threshold for creating footprints on image
@param pixelThreshold: Threshold for masking pixels on candidate
"""
image = self.exp.getMaskedImage().getImage()
image = self.exposure.getMaskedImage().getImage()
for x, y, f in badPixels + extraPixels:
image[x, y] = f
cand = self.createCandidate(threshold=threshold)
Expand Down Expand Up @@ -139,6 +190,46 @@ def testFaintNeighborMasking(self):
self.checkCandidateMasking([(self.x+5, self.y, 0.5)], threshold=0.9, pixelThreshold=1.0)


class MakePsfCandiatesTaskTest(lsst.utils.tests.TestCase):
def setUp(self):
self.psfCandidateField = "psfCandidate"
self.catalog = makeEmptyCatalog(self.psfCandidateField)

# id=0 is bad because it's on the edge, so fails with a WARN: LengthError.
self.badIds = [1, ]
self.goodIds = [2, 3]
# x and y coordinate: keep these in sync with the above good/bad list.
self.xx = [0, 100, 200]
self.yy = [0, 100, 20]
self.exposure = afwImage.ExposureF(256, 256)
self.exposure.getMaskedImage().getVariance().set(0.01)
for x, y in zip(self.xx, self.yy):
createFakeSource(x, y, self.catalog, self.exposure, 0.1)

self.makePsfCandidates = measAlg.MakePsfCandidatesTask()

def testMakePsfCandidates(self):
result = self.makePsfCandidates.run(self.catalog, self.exposure)
self.assertEqual(len(result.psfCandidates), len(self.goodIds))

for goodId in self.goodIds:
self.assertIn(goodId, result.goodStarCat['id'])

for badId in self.badIds:
self.assertNotIn(badId, result.goodStarCat['id'])

def testMakePsfCandidatesStarSelectedField(self):
"""Test MakePsfCandidatesTask setting a selected field."""
result = self.makePsfCandidates.run(self.catalog, self.exposure, psfCandidateField=self.psfCandidateField)
self.assertEqual(len(result.psfCandidates), len(self.goodIds))

for goodId in self.goodIds:
self.assertTrue(self.catalog.find(goodId).get(self.psfCandidateField))

for badId in self.badIds:
self.assertFalse(self.catalog.find(badId).get(self.psfCandidateField))


class TestMemory(lsst.utils.tests.MemoryTestCase):
pass

Expand Down

0 comments on commit d55ac03

Please sign in to comment.