Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-25304: Add task to extract and process bright stars #220

Merged
merged 2 commits into from
Jan 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
272 changes: 215 additions & 57 deletions python/lsst/meas/algorithms/brightStarStamps.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,16 @@
__all__ = ["BrightStarStamp", "BrightStarStamps"]

from dataclasses import dataclass
from enum import Enum, auto
from operator import ior
from functools import reduce
from typing import Optional

from lsst.afw.image import MaskedImage
from lsst.afw.image import MaskedImageF
from lsst.afw import geom as afwGeom
from lsst.afw import math as afwMath
from .stamps import StampsBase, AbstractStamp, readFitsWithOptions


class RadiiEnum(Enum):
INNER_RADIUS = auto()
OUTER_RADIUS = auto()

def __str__(self):
return self.name


@dataclass
class BrightStarStamp(AbstractStamp):
"""Single stamp centered on a bright star, normalized by its
Expand All @@ -52,13 +48,13 @@ class BrightStarStamp(AbstractStamp):
Gaia G magnitude for the object in this stamp
gaiaId : `int`
Gaia object identifier
annularFlux : `float`
annularFlux : `Optional[float]`
Flux in an annulus around the object
"""
stamp_im: MaskedImage
stamp_im: MaskedImageF
gaiaGMag: float
gaiaId: int
annularFlux: float
annularFlux: Optional[float] = None

@classmethod
def factory(cls, stamp_im, metadata, idx):
Expand Down Expand Up @@ -90,24 +86,65 @@ def factory(cls, stamp_im, metadata, idx):
gaiaId=metadata.getArray('GAIA_IDS')[idx],
annularFlux=metadata.getArray('ANNULAR_FLUXES')[idx])

def measureAndNormalize(self, annulus, statsControl=afwMath.StatisticsControl(),
statsFlag=afwMath.stringToStatisticsProperty("MEAN"),
badMaskPlanes=('BAD', 'SAT', 'NO_DATA')):
"""Compute "annularFlux", the integrated flux within an annulus
around an object's center, and normalize it.

Since the center of bright stars are saturated and/or heavily affected
by ghosts, we measure their flux in an annulus with a large enough
inner radius to avoid the most severe ghosts and contain enough
non-saturated pixels.

Parameters
----------
annulus : `lsst.afw.geom.spanSet.SpanSet`
SpanSet containing the annulus to use for normalization.
statsControl : `lsst.afw.math.statistics.StatisticsControl`, optional
StatisticsControl to be used when computing flux over all pixels
within the annulus.
statsFlag : `lsst.afw.math.statistics.Property`, optional
statsFlag to be passed on to ``afwMath.makeStatistics`` to compute
annularFlux. Defaults to a simple MEAN.
badMaskPlanes : `collections.abc.Collection` [`str`]
Collection of mask planes to ignore when computing annularFlux.
"""
stampSize = self.stamp_im.getDimensions()
# create image with the same pixel values within annulus, NO_DATA
# elsewhere
maskPlaneDict = self.stamp_im.mask.getMaskPlaneDict()
annulusImage = MaskedImageF(stampSize, planeDict=maskPlaneDict)
annulusMask = annulusImage.mask
annulusMask.array[:] = 2**maskPlaneDict['NO_DATA']
annulus.copyMaskedImage(self.stamp_im, annulusImage)
# set mask planes to be ignored
andMask = reduce(ior, (annulusMask.getPlaneBitMask(bm) for bm in badMaskPlanes))
statsControl.setAndMask(andMask)
# compute annularFlux
annulusStat = afwMath.makeStatistics(annulusImage, statsFlag, statsControl)
self.annularFlux = annulusStat.getValue()
# normalize stamps
self.stamp_im.image.array /= self.annularFlux
return None


class BrightStarStamps(StampsBase):
"""Collection of bright star stamps and associated metadata.

Parameters
----------
starStamps : `collections.abc.Sequence` [`BrightStarStamp`]
Sequence of star stamps.
Sequence of star stamps. Cannot contain both normalized and
unnormalized stamps.
innerRadius : `int`, optional
Inner radius value, in pixels. This and ``outerRadius`` define the
annulus used to compute the ``"annularFlux"`` values within each
``starStamp``. Must be provided if ``"INNER_RADIUS"`` and
``"OUTER_RADIUS"`` are not present in ``metadata``.
``starStamp``. Must be provided if ``normalize`` is True.
outerRadius : `int`, optional
Outer radius value, in pixels. This and ``innerRadius`` define the
annulus used to compute the ``"annularFlux"`` values within each
``starStamp``. Must be provided if ``"INNER_RADIUS"`` and
``"OUTER_RADIUS"`` are not present in ``metadata``.
``starStamp``. Must be provided if ``normalize`` is True.
metadata : `lsst.daf.base.PropertyList`, optional
Metadata associated with the bright stars.
use_mask : `bool`
Expand All @@ -121,36 +158,120 @@ class BrightStarStamps(StampsBase):
Raised if one of the star stamps provided does not contain the
required keys.
AttributeError
Raised if the definition of the annulus used to compute each star's
normalization factor are not provided, that is, if ``"INNER_RADIUS"``
and ``"OUTER_RADIUS"`` are not present in ``metadata`` _and_
``innerRadius`` and ``outerRadius`` are not provided.
Raised if there is a mix-and-match of normalized and unnormalized
stamps, stamps normalized with different annulus definitions, or if
stamps are to be normalized but annular radii were not provided.


Notes
-----
A (gen2) butler can be used to read only a part of the stamps,
specified by a bbox:
A butler can be used to read only a part of the stamps, specified by a
bbox:

>>> starSubregions = butler.get("brightStarStamps_sub", dataId, bbox=bbox)
"""

def __init__(self, starStamps, innerRadius=None, outerRadius=None,
metadata=None, use_mask=True, use_variance=False):
super().__init__(starStamps, metadata, use_mask, use_variance)
# Add inner and outer radii to metadata
self._checkRadius(innerRadius, RadiiEnum.INNER_RADIUS)
self._innerRadius = innerRadius
self._checkRadius(outerRadius, RadiiEnum.OUTER_RADIUS)
self._outerRadius = outerRadius
# Ensure stamps contain a flux measurement if and only if they are
# already expected to be normalized
self._checkNormalization(False, innerRadius, outerRadius)
self._innerRadius, self._outerRadius = innerRadius, outerRadius
if innerRadius is not None and outerRadius is not None:
self.normalized = True
else:
self.normalized = False

@classmethod
def initAndNormalize(cls, starStamps, innerRadius, outerRadius,
metadata=None, use_mask=True, use_variance=False,
imCenter=None,
statsControl=afwMath.StatisticsControl(),
statsFlag=afwMath.stringToStatisticsProperty("MEAN"),
badMaskPlanes=('BAD', 'SAT', 'NO_DATA')):
"""Normalize a set of bright star stamps and initialize a
BrightStarStamps instance.

Since the center of bright stars are saturated and/or heavily affected
by ghosts, we measure their flux in an annulus with a large enough
inner radius to avoid the most severe ghosts and contain enough
non-saturated pixels.

Parameters
----------
starStamps : `collections.abc.Sequence` [`BrightStarStamp`]
Sequence of star stamps. Cannot contain both normalized and
unnormalized stamps.
innerRadius : `int`
Inner radius value, in pixels. This and ``outerRadius`` define the
annulus used to compute the ``"annularFlux"`` values within each
``starStamp``.
outerRadius : `int`
Outer radius value, in pixels. This and ``innerRadius`` define the
annulus used to compute the ``"annularFlux"`` values within each
``starStamp``.
metadata : `lsst.daf.base.PropertyList`, optional
Metadata associated with the bright stars.
use_mask : `bool`
If `True` read and write mask data. Default `True`.
use_variance : `bool`
If ``True`` read and write variance data. Default ``False``.
imCenter : `collections.abc.Sequence`, optional
Center of the object, in pixels. If not provided, the center of the
first stamp's pixel grid will be used.
statsControl : `lsst.afw.math.statistics.StatisticsControl`, optional
StatisticsControl to be used when computing flux over all pixels
within the annulus.
statsFlag : `lsst.afw.math.statistics.Property`, optional
statsFlag to be passed on to ``afwMath.makeStatistics`` to compute
annularFlux. Defaults to a simple MEAN.
badMaskPlanes : `collections.abc.Collection` [`str`]
Collection of mask planes to ignore when computing annularFlux.

Raises
------
ValueError
Raised if one of the star stamps provided does not contain the
required keys.
AttributeError
Raised if there is a mix-and-match of normalized and unnormalized
stamps, stamps normalized with different annulus definitions, or if
stamps are to be normalized but annular radii were not provided.
"""
if imCenter is None:
stampSize = starStamps[0].stamp_im.getDimensions()
imCenter = stampSize[0]//2, stampSize[1]//2
# Create SpanSet of annulus
outerCircle = afwGeom.SpanSet.fromShape(outerRadius, afwGeom.Stencil.CIRCLE, offset=imCenter)
innerCircle = afwGeom.SpanSet.fromShape(innerRadius, afwGeom.Stencil.CIRCLE, offset=imCenter)
annulus = outerCircle.intersectNot(innerCircle)
# Initialize (unnormalized) brightStarStamps instance
bss = cls(starStamps, innerRadius=None, outerRadius=None,
metadata=metadata, use_mask=use_mask,
use_variance=use_variance)
# Ensure no stamps had already been normalized
bss._checkNormalization(True, innerRadius, outerRadius)
bss._innerRadius, bss._outerRadius = innerRadius, outerRadius
# Apply normalization
for stamp in bss._stamps:
stamp.measureAndNormalize(annulus, statsControl=statsControl, statsFlag=statsFlag,
badMaskPlanes=badMaskPlanes)
bss.normalized = True
return bss

def _refresh_metadata(self):
"""Refresh the metadata. Should be called before writing this object out.
"""Refresh the metadata. Should be called before writing this object
out.
"""
# add full list of Gaia magnitudes, IDs and annularFlxes to shared
# metadata
self._metadata["G_MAGS"] = self.getMagnitudes()
self._metadata["GAIA_IDS"] = self.getGaiaIds()
self._metadata["ANNULAR_FLUXES"] = self.getAnnularFluxes()
self._metadata["NORMALIZED"] = self.normalized
self._metadata["INNER_RADIUS"] = self._innerRadius
self._metadata["OUTER_RADIUS"] = self._outerRadius
return None

@classmethod
Expand All @@ -176,29 +297,38 @@ def readFitsWithOptions(cls, filename, options):
Collection of metadata parameters
"""
stamps, metadata = readFitsWithOptions(filename, BrightStarStamp.factory, options)
return cls(stamps, metadata=metadata, use_mask=metadata['HAS_MASK'],
use_variance=metadata['HAS_VARIANCE'])
if metadata["NORMALIZED"]:
return cls(stamps,
innerRadius=metadata["INNER_RADIUS"], outerRadius=metadata["OUTER_RADIUS"],
metadata=metadata, use_mask=metadata['HAS_MASK'],
use_variance=metadata['HAS_VARIANCE'])
else:
return cls(stamps, metadata=metadata, use_mask=metadata['HAS_MASK'],
use_variance=metadata['HAS_VARIANCE'])

def append(self, item, innerRadius, outerRadius):
def append(self, item, innerRadius=None, outerRadius=None):
"""Add an additional bright star stamp.

Parameters
----------
item : `BrightStarStamp`
Bright star stamp to append.
innerRadius : `int`
innerRadius : `int`, optional
Inner radius value, in pixels. This and ``outerRadius`` define the
annulus used to compute the ``"annularFlux"`` values within each
``starStamp``.
``BrightStarStamp``.
outerRadius : `int`, optional
Outer radius value, in pixels. This and ``innerRadius`` define the
annulus used to compute the ``"annularFlux"`` values within each
``starStamp``.
``BrightStarStamp``.
"""
if not isinstance(item, BrightStarStamp):
raise ValueError(f"Can only add instances of BrightStarStamp, got {type(item)}.")
self._checkRadius(innerRadius, RadiiEnum.INNER_RADIUS)
self._checkRadius(outerRadius, RadiiEnum.OUTER_RADIUS)
if (item.annularFlux is None) == self.normalized:
raise AttributeError("Trying to append an unnormalized stamp to a normalized BrightStarStamps "
"instance, or vice-versa.")
else:
self._checkRadius(innerRadius, outerRadius)
self._stamps.append(item)
return None

Expand All @@ -214,8 +344,7 @@ def extend(self, bss):
if not isinstance(bss, BrightStarStamps):
raise ValueError('Can only extend with a BrightStarStamps object. '
f'Got {type(bss)}.')
self._checkRadius(bss._innerRadius, RadiiEnum.INNER_RADIUS)
self._checkRadius(bss._outerRadius, RadiiEnum.OUTER_RADIUS)
self._checkRadius(bss._innerRadius, bss._outerRadius)
self._stamps += bss._stamps

def getMagnitudes(self):
Expand Down Expand Up @@ -266,26 +395,55 @@ def selectByMag(self, magMin=None, magMax=None):
and (magMax is None or stamp.gaiaGMag < magMax)]
# This is an optimization to save looping over the init argument when
# it is already guaranteed to be the correct type
instance = BrightStarStamps((), metadata=self._metadata)
instance = BrightStarStamps((),
innerRadius=self._innerRadius, outerRadius=self._outerRadius,
metadata=self._metadata)
instance._stamps = subset
return instance

def _checkRadius(self, radiusValue, metadataEnum):
"""Ensure provided annulus radius is consistent with that present
in metadata. If metadata does not contain annulus radius, add it.
def _checkRadius(self, innerRadius, outerRadius):
"""Ensure provided annulus radius is consistent with that already
present in the instance, or with arguments passed on at initialization.
"""
if innerRadius != self._innerRadius or outerRadius != self._outerRadius:
raise AttributeError("Trying to mix stamps normalized with annulus radii "
f"{innerRadius, outerRadius} with those of BrightStarStamp instance\n"
f"(computed with annular radii {self._innerRadius, self._outerRadius}).")

def _checkNormalization(self, normalize, innerRadius, outerRadius):
"""Ensure there is no mixing of normalized and unnormalized stars, and
that, if requested, normalization can be performed.
"""
# if a radius value is already present in metadata, ensure it matches
# the one given
metadataName = str(metadataEnum)
if self._metadata.exists(metadataName):
if radiusValue is not None:
if self._metadata[metadataName] != radiusValue:
raise AttributeError("BrightStarStamps instance already contains different annulus radii "
+ f"values ({metadataName}).")
# if not already in metadata, a value must be provided
elif radiusValue is None:
raise AttributeError("No radius value provided for the AnnularFlux measurement "
+ f"({metadataName}), and none present in metadata.")
noneFluxCount = self.getAnnularFluxes().count(None)
nStamps = len(self)
nFluxVals = nStamps - noneFluxCount
if noneFluxCount and noneFluxCount < nStamps:
# at least one stamp contains an annularFlux value (i.e. has been
# normalized), but not all of them do
raise AttributeError(f"Only {nFluxVals} stamps contain an annularFlux value.\nAll stamps in a "
"BrightStarStamps instance must either be normalized with the same annulus "
"definition, or none of them can contain an annularFlux value.")
elif normalize:
# stamps are to be normalized; ensure annular radii are specified
# and they have no annularFlux
if innerRadius is None or outerRadius is None:
raise AttributeError("For stamps to be normalized (normalize=True), please provide a valid "
"value (in pixels) for both innerRadius and outerRadius.")
elif noneFluxCount < nStamps:
raise AttributeError(f"{nFluxVals} stamps already contain an annularFlux value. For stamps to"
" be normalized, all their annularFlux must be None.")
elif innerRadius is not None and outerRadius is not None:
# Radii provided, but normalize=False; check that stamps
# already contain annularFluxes
if noneFluxCount:
raise AttributeError(f"{noneFluxCount} stamps contain no annularFlux, but annular radius "
"values were provided and normalize=False.\nTo normalize stamps, set "
"normalize to True.")
else:
self._metadata[metadataName] = radiusValue
# At least one radius value is missing; ensure no stamps have
# already been normalized
if nFluxVals:
raise AttributeError(f"{nFluxVals} stamps contain an annularFlux value. If stamps have "
"been normalized, the innerRadius and outerRadius values used must "
"be provided.")
return None