DM-5877: Use Afterburners to clean up aperture correction logic #42

Merged: 5 commits (Jul 5, 2016). Changes from all commits.
1 change: 1 addition & 0 deletions python/lsst/meas/base/__init__.py
@@ -4,6 +4,7 @@
from .pluginsBase import *
from .sfm import *
from .plugins import *
from .classification import *
from .baseLib import *
from .noiseReplacer import *
from .baseMeasurement import *
36 changes: 21 additions & 15 deletions python/lsst/meas/base/afterburner.py
@@ -1,26 +1,26 @@
from collections import namedtuple
import sys
import traceback

import lsst.pipe.base
import lsst.pex.config
import lsst.daf.base

from .pluginsBase import BasePlugin, BasePluginConfig
from .pluginRegistry import PluginRegistry
from .pluginRegistry import PluginRegistry, PluginMap
from .baseLib import FatalAlgorithmError, MeasurementError

# Exceptions that the measurement tasks should always propagate up to their callers
FATAL_EXCEPTIONS = (MemoryError, FatalAlgorithmError)

__all__ = ("AfterburnerPluginConfig", "AfterburnerPlugin", "AfterburnerConfig", "AfterburnerTask")


class AfterburnerPluginConfig(BasePluginConfig):
'''
Default configuration class for afterburner plugins
'''
pass


class AfterburnerPlugin(BasePlugin):
'''
Base class for afterburner plugins
@@ -34,7 +34,7 @@ class AfterburnerPlugin(BasePlugin):

plugType = 'single'

def __init__(self,config, name, schema, metadata):
def __init__(self, config, name, schema, metadata):
"""!
Initialize the afterburner plugin

@@ -67,12 +67,13 @@ def burn(self, cat, **kwargs):
"""
raise NotImplementedError()


class AbContext(object):
'''
Context manager to handle catching errors that may have been thrown in an afterburner plugin
@param[in] plugin The plugin that is to be run
@param[in] cat Either a catalog or a source record entry of a catalog, depending on the plugin type,
i.e. either working on a whole catalog, or a single record.
i.e. either working on a whole catalog, or a single record.
@param[in] log The log to write to; this will most likely be the log (self.log) of the object
in which the context manager is used.
'''
@@ -95,21 +96,24 @@ def __exit__(self, exc_type, exc_value, traceback):
self.log.warn("Error in {}.burn: {}".format(self.plugin.name, exc_value))
return True


class AfterburnerConfig(lsst.pex.config.Config):
'''
Default AfterburnerConfig. Currently this is an empty list, meaning that there are no default plugins run.
The config object for each plugin must use this variable to specify the names of all plugins to be run.
'''
plugins = AfterburnerPlugin.registry.makeField(multi=True, default=[], doc="Plugins to be run and their "
"configuration")
pass
plugins = AfterburnerPlugin.registry.makeField(
multi=True,
default=["base_ClassificationExtendedness"],
doc="Plugins to be run and their configuration")


class AfterburnerTask(lsst.pipe.base.Task):
'''
This task facilitates running plugins which will operate on a source catalog. These plugins may do things
such as classifying an object based on source record entries inserted during a measurement task.

Plugins may either take an entire catalog to work on at a time, or
Plugins may either take an entire catalog to work on at a time, or
'''
ConfigClass = AfterburnerConfig
_DefaultName = "afterburner"
@@ -123,11 +127,12 @@ def __init__(self, schema, plugMetadata=None, **kwargs):
will be created.
@param[in] **kwargs Additional arguments passed to lsst.pipe.base.Task.__init__.
"""
lsst.pipe.base.Task.__init__(self,**kwargs)
lsst.pipe.base.Task.__init__(self, **kwargs)
self.schema = schema
if plugMetadata is None:
plugMetadata = lsst.daf.base.PropertyList()
self.plugMetadata = plugMetadata
self.plugins = PluginMap()

self.initializePlugins()

@@ -136,7 +141,7 @@ def initializePlugins(self):
Initialize the plugins according to the configuration.
'''

pluginType = namedtuple('pluginType','single multi')
pluginType = namedtuple('pluginType', 'single multi')
self.executionDict = {}
# Read the properties for each plugin. Allocate a dictionary entry for each run level. Verify that
# the plugins are above the minimum run level for an afterburner plugin. For each run level, the
@@ -146,14 +151,15 @@
self.executionDict[executionOrder] = pluginType(single=[], multi=[])
if PluginClass.getExecutionOrder() >= BasePlugin.DEFAULT_AFTERBURNER:
plug = PluginClass(config, name, self.schema, metadata=self.plugMetadata)
self.plugins[name] = plug
if plug.plugType == 'single':
self.executionDict[executionOrder].single.append(plug)
elif plug.plugType == 'multi':
self.executionDict[executionOrder].multi.append(plug)
else:
raise ValueError("{} has an execution order less than the minimum for an afterburner plugin."
"Value {} : Minimum {}".format(PluginClass, PluginClass.getExecutionOrder(),
BasePlugin.DEFAULT_AFTERBURNER))
"Value {} : Minimum {}".format(PluginClass, PluginClass.getExecutionOrder(),
BasePlugin.DEFAULT_AFTERBURNER))

def run(self, measCat):
'''
@@ -170,10 +176,10 @@ def callCompute(self, catalog):
for runlevel in sorted(self.executionDict):
# Run all of the plugins which take a whole catalog first
for plug in self.executionDict[runlevel].multi:
with AbContext(plug, catalog, self.log) as cm:
with AbContext(plug, catalog, self.log):
plug.burn(catalog)
# Run all the plugins which take single catalog entries
for measRecord in catalog:
for plug in self.executionDict[runlevel].single:
with AbContext(plug, measRecord, self.log) as cm:
with AbContext(plug, measRecord, self.log):
plug.burn(measRecord)
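
For orientation, here is a minimal sketch of how the plugin API above fits together: a registered single-record plugin plus the task configuration that selects it. Everything named MagLimitFlag/example_MagLimitFlag is hypothetical; only AfterburnerPlugin, AfterburnerPluginConfig, AfterburnerTask, register, and the plugType/getExecutionOrder/burn hooks come from this diff.

```python
import lsst.pex.config
import lsst.afw.table
from lsst.meas.base.afterburner import (AfterburnerPlugin,
                                        AfterburnerPluginConfig,
                                        AfterburnerTask)
from lsst.meas.base.pluginRegistry import register


class MagLimitFlagConfig(AfterburnerPluginConfig):
    fluxMin = lsst.pex.config.Field(dtype=float, default=100.0,
                                    doc="Flag sources with psfFlux below this value")


@register("example_MagLimitFlag")  # adds the plugin to AfterburnerPlugin.registry
class MagLimitFlagPlugin(AfterburnerPlugin):
    """Hypothetical plugin that flags faint sources, one record at a time."""
    ConfigClass = MagLimitFlagConfig
    plugType = 'single'  # burn() receives a single source record, not a catalog

    @classmethod
    def getExecutionOrder(cls):
        # Must be >= DEFAULT_AFTERBURNER, or initializePlugins() raises ValueError
        return cls.DEFAULT_AFTERBURNER

    def __init__(self, config, name, schema, metadata):
        AfterburnerPlugin.__init__(self, config, name, schema, metadata)
        self.key = schema.addField(name + "_flag", type="Flag",
                                   doc="Source is fainter than fluxMin")

    def burn(self, measRecord):
        # Exceptions raised here are caught and logged by AbContext in callCompute
        measRecord.set(self.key, measRecord.getPsfFlux() < self.config.fluxMin)


# Selecting and running plugins; the input schema must already define any
# slots the plugins read (here, PsfFlux).
config = AfterburnerTask.ConfigClass()
config.plugins.names = ["example_MagLimitFlag"]
schema = lsst.afw.table.SourceTable.makeMinimalSchema()
task = AfterburnerTask(schema=schema, config=config)
# task.run(measCat)  # measCat: a SourceCatalog produced by a measurement task
```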
64 changes: 0 additions & 64 deletions python/lsst/meas/base/baseMeasurement.py
@@ -27,7 +27,6 @@
import lsst.pipe.base
import lsst.pex.config

from .applyApCorr import ApplyApCorrTask
from .pluginRegistry import PluginMap
from .baseLib import FatalAlgorithmError, MeasurementError
from .pluginsBase import BasePluginConfig, BasePlugin
@@ -122,25 +121,6 @@ class BaseMeasurementConfig(lsst.pex.config.Config):
doc="configuration that sets how to replace neighboring sources with noise"
)

doApplyApCorr = lsst.pex.config.ChoiceField(
dtype = str,
doc = "Apply aperture corrections? Silently ignored if endOrder <= lsst.meas.base.APCORR_ORDER"
" when calling run",
default = "noButWarn",
allowed = {
"yes": "apply aperture corrections; fail if data not available",
"yesOrWarn": "apply aperture corrections if data available, else warn",
"noButWarn": "do not apply aperture corrections, but warn if data available"
" (since aperture corrections could have been applied)",
"no": "do not apply aperture corrections",
},
)

applyApCorr = lsst.pex.config.ConfigurableField(
target = ApplyApCorrTask,
doc = "subtask to apply aperture corrections",
)

def validate(self):
lsst.pex.config.Config.validate(self)
if self.slots.centroid is not None and self.slots.centroid not in self.plugins.names:
@@ -168,11 +148,6 @@ class BaseMeasurementTask(lsst.pipe.base.Task):

This base class for SingleFrameMeasurementTask and ForcedMeasurementTask mostly exists to share
code between the two, and generally should not be used directly.

@note Tasks that use this task should usually set the default value of config parameter doApplyApCorr
to "yes" or "no", depending if aperture corrections are wanted. The default value of "noButWarn"
is intended to alert users who forget, and is appropriate for unit tests and temporary scripts
that do not need aperture corrections.
"""

ConfigClass = BaseMeasurementConfig
@@ -307,42 +282,3 @@ def callMeasureN(self, measCat, *args, **kwds):
plugin.fail(measRecord)
self.log.warn("Error in %s.measureN on records %s-%s: %s"
% (plugin.name, measCat[0].getId(), measCat[-1].getId(), error))

def _applyApCorrIfWanted(self, sources, apCorrMap, endOrder):
"""!Apply aperture corrections to a catalog, if wanted

This method is intended to be called at the end of every subclass's run method or other
measurement sequence. This is a thin wrapper around self.applyApCorr.run.

@param[in,out] sources catalog of sources to which to apply aperture corrections
@param[in] apCorrMap aperture correction map (lsst.afw.image.ApCorrMap) or None;
typically found in an lsst.afw.image.ExposureInfo
if provided then it must contain two entries for each flux field:
- flux field (e.g. base_PsfFlux_flux): 2d model
- flux sigma field (e.g. base_PsfFlux_fluxSigma): 2d model of error
@param[in] endOrder ending execution order, or None; if provided then aperture corrections
are only wanted if endOrder > lsst.meas.base.BasePlugin.APCORR_ORDER
@return the results from applyApCorr if run, else None

@throw lsst.pipe.base.TaskError if aperture corrections are wanted and the exposure does not contain
an aperture correction map.
"""
if endOrder is not None and endOrder <= BasePlugin.APCORR_ORDER:
# it is not appropriate to apply aperture corrections
return

if self.config.doApplyApCorr.startswith("yes"):
if apCorrMap is not None:
self.applyApCorr.run(catalog=sources, apCorrMap=apCorrMap)
else:
errMsg = "Cannot apply aperture corrections; apCorrMap is None"
if self.config.doApplyApCorr == "yesOrWarn":
self.log.warn(errMsg)
else:
raise lsst.pipe.base.TaskError(errMsg)
elif self.config.doApplyApCorr == "noButWarn":
if apCorrMap is not None:
self.log.warn("Aperture corrections are disabled but the data to apply them is available;"
" change doApplyApCorr to suppress this warning")


98 changes: 98 additions & 0 deletions python/lsst/meas/base/classification.py
@@ -0,0 +1,98 @@
#!/usr/bin/env python
#
# LSST Data Management System
# Copyright 2008-2016 AURA/LSST.
#
# This product includes software developed by the
# LSST Project (http://www.lsst.org/).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the LSST License Statement and
# the GNU General Public License along with this program. If not,
# see <http://www.lsstcorp.org/LegalNotices/>.
#
"""
Definition and registration of classification plugins
"""

import numpy

import lsst.pex.config
from .afterburner import AfterburnerPluginConfig, AfterburnerPlugin
from .pluginRegistry import register

__all__ = (
"AfterburnerClassificationConfig", "AfterburnerClassificationPlugin",
)


class AfterburnerClassificationConfig(AfterburnerPluginConfig):
fluxRatio = lsst.pex.config.Field(dtype=float, default=.925, optional=True,
doc="critical ratio of model to psf flux")
modelErrFactor = lsst.pex.config.Field(dtype=float, default=0.0, optional=True,
doc="correction factor for modelFlux error")
psfErrFactor = lsst.pex.config.Field(dtype=float, default=0.0, optional=True,
doc="correction factor for psfFlux error")


@register("base_ClassificationExtendedness")
class AfterburnerClassificationPlugin(AfterburnerPlugin):
"""
A binary measure of the extendedness of a source, based a simple cut on the ratio of the
Review comment (Contributor Author): based on

Reply (Contributor): check
PSF flux to the model flux.

Because the fluxes on which this algorithm is based are slot measurements, they can be provided
by different algorithms, and the "fluxRatio" threshold used by this algorithm should generally
be set differently for different algorithms. To do this, plot the difference between the PSF
magnitude and the model magnitude vs. the PSF magnitude, and look for where the cloud of galaxies
begins.
Review comment (Contributor Author): Presumably you also have to convert that magnitude difference to a ratio.

Reply (Contributor): This was copied from the existing plugin as is, and seems to be correct to my reading.
"""

ConfigClass = AfterburnerClassificationConfig

@classmethod
def getExecutionOrder(cls):
return cls.DEFAULT_AFTERBURNER

def __init__(self, config, name, schema, metadata):
AfterburnerPlugin.__init__(self, config, name, schema, metadata)
self.keyProbability = schema.addField(name + "_value", type="D",
Review comment (Contributor Author): Please don't call this a Probability, since this classifier only does a binary classification. So then is there any reason (besides history) not to make this a flag?

Reply (Member): I'm not seeing the total context, but if this is the value (formerly?) known as extendedness, it is intended to be a float in the range [0,1]. More a likelihood than a probability, but definitely not a flag.

Reply (Contributor Author): That may be the intent, but that is not (nor has it ever been) what it is. I suggest we either need to change the algorithm to supply a likelihood or change the type to reflect what it is.

Reply (Member): Well, for a while in SDSS we did explore how to make this a real float, and José Garmilla has code to convert these psf-model numbers into probabilities now (but we're not using it for LSST). For now, let's keep it a float and describe it as a probability or likelihood. @TallJimbo and I are not in favour of making it a bool/flag now, as that really is an interpretation of a probability (i.e. involves priors). If someone wants to start an RFC/c.l.o discussion of this (including adding a slot) that'd be fine, but I think it needs that level of discussion before we start changing things.

Reply (Contributor): keeping as is for now
doc="Set to 1 for extended sources, 0 for point sources.")
self.keyFlag = schema.addField(name + "_flag", type="Flag", doc="Set to 1 for any fatal failure.")

def burn(self, measRecord):
modelFlux = measRecord.getModelFlux()
psfFlux = measRecord.getPsfFlux()
modelFluxFlag = (measRecord.getModelFluxFlag()
if measRecord.table.getModelFluxFlagKey().isValid()
else False)
Review comment (Contributor Author): I don't think it's in the coding standards, but I generally advise putting line breaks after an operator (in this case, if and else are the operators) rather than before, because it makes it clearer that there's something missing if a line is accidentally deleted.

Reply (Contributor): This was moved as is from the previous file, and as this is actually a form of comprehension, I think it is reasonable as is.
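
For readers following this thread, a minimal illustration of the two wrapping styles under discussion (stand-in names, not code from the patch):

```python
def key_is_valid():  # stand-in for measRecord.table.getModelFluxFlagKey().isValid()
    return True

def get_flag():      # stand-in for measRecord.getModelFluxFlag()
    return False

# Style in the patch: break before the "if"/"else" operators.
flag = (get_flag()
        if key_is_valid()
        else False)

# Style the reviewer suggests: break after the operators, leaving a visibly
# dangling "if"/"else" when a continuation line goes missing.
flag = (get_flag() if
        key_is_valid() else
        False)
```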

psfFluxFlag = (measRecord.getPsfFluxFlag()
if measRecord.table.getPsfFluxFlagKey().isValid()
else False)
flux1 = self.config.fluxRatio*modelFlux
if self.config.modelErrFactor != 0:
flux1 += self.config.modelErrFactor*measRecord.getModelFluxErr()
flux2 = psfFlux
if not self.config.psfErrFactor == 0:
flux2 += self.config.psfErrFactor*measRecord.getPsfFluxErr()

# A generic failure occurs when either FluxFlag is set to True
# A generic failure also occurs if either calculated flux value is NAN:
# this can occur if the Flux field itself is NAN,
# or the ErrFactor != 0 and the FluxErr is NAN
if numpy.isnan(flux1) or numpy.isnan(flux2) or modelFluxFlag or psfFluxFlag:
self.fail(measRecord)
else:
measRecord.set(self.keyProbability, 0.0 if flux1 < flux2 else 1.0)

def fail(self, measRecord, error=None):
measRecord.set(self.keyFlag, True)
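
To make the cut concrete, the sketch below restates burn() as a standalone function (flag and NaN handling omitted) and works out how a threshold read off the magnitude-difference plot described in the docstring converts to a fluxRatio, which is the conversion raised in the review thread above. The helper function is illustrative; its defaults mirror AfterburnerClassificationConfig.

```python
import math

def classify_extendedness(modelFlux, psfFlux, modelFluxErr=0.0, psfFluxErr=0.0,
                          fluxRatio=0.925, modelErrFactor=0.0, psfErrFactor=0.0):
    """Standalone restatement of AfterburnerClassificationPlugin.burn."""
    flux1 = fluxRatio * modelFlux + modelErrFactor * modelFluxErr
    flux2 = psfFlux + psfErrFactor * psfFluxErr
    return 0.0 if flux1 < flux2 else 1.0  # 1.0 = extended, 0.0 = point source

# With the default error factors of zero, "extended" means
#   fluxRatio * modelFlux >= psfFlux   <=>   psfFlux / modelFlux <= fluxRatio
# and, in magnitudes,
#   m_psf - m_model >= -2.5 * log10(fluxRatio)
# so the default fluxRatio = 0.925 places the cut at
print(-2.5 * math.log10(0.925))  # ~0.085 mag
```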
11 changes: 1 addition & 10 deletions python/lsst/meas/base/forcedMeasurement.py
@@ -236,11 +236,9 @@ def __init__(self, refSchema, algMetadata=None, **kwds):
self.config.slots.setupSchema(self.mapper.editOutputSchema())
self.initializePlugins(schemaMapper=self.mapper)
self.schema = self.mapper.getOutputSchema()
self.makeSubtask("applyApCorr", schema=self.schema)
self.schema.checkUnits(parse_strict=self.config.checkUnitsParseStrict)

def run(self, measCat, exposure, refCat, refWcs, exposureId=None, beginOrder=None, endOrder=None,
allowApCorr=True):
def run(self, measCat, exposure, refCat, refWcs, exposureId=None, beginOrder=None, endOrder=None):
"""!
Perform forced measurement.

@@ -257,7 +255,6 @@ def run(self, measCat, exposure, refCat, refWcs, exposureId=None, beginOrder=None,
executionOrder < beginOrder are not executed. None for no limit.
@param[in] endOrder ending execution order (exclusive): measurements with
executionOrder >= endOrder are not executed. None for no limit.
@param[in] allowApCorr allow application of aperture correction?

Fills the initial empty SourceCatalog with forced measurement results. Two steps must occur
before run() can be called:
@@ -335,12 +332,6 @@ def run(self, measCat, exposure, refCat, refWcs, exposureId=None, beginOrder=None,
noiseReplacer.removeSource(refParentRecord.getId())
noiseReplacer.end()

if allowApCorr:
self._applyApCorrIfWanted(
sources = measCat,
apCorrMap = exposure.getInfo().getApCorrMap(),
endOrder = endOrder,
)

def generateMeasCat(self, exposure, refCat, refWcs, idFactory=None):
"""!Initialize an output SourceCatalog using information from the reference catalog.
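
Taken together with the baseMeasurement.py changes above, the calling pattern implied by this PR is that aperture correction and afterburners now run after measurement rather than inside it. Below is a sketch of one plausible sequence, assuming the caller holds configured measurement, applyApCorr (lsst.meas.base.ApplyApCorrTask), and afterburner subtasks; this diff shows only the removal side, so the exact re-wiring is an assumption.

```python
# Hypothetical driver code; the subtask handles are illustrative.
measurement.run(measCat, exposure)  # measurement no longer applies ap corr itself

apCorrMap = exposure.getInfo().getApCorrMap()
if apCorrMap is not None:
    # The same call the removed _applyApCorrIfWanted made internally.
    applyApCorr.run(catalog=measCat, apCorrMap=apCorrMap)

afterburner.run(measCat)  # e.g. base_ClassificationExtendedness from this PR
```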