New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DM-14720: Implement forced photometry on PVIs in AP pipe. #220
Conversation
@@ -189,6 +196,12 @@ def setDefaults(self): | |||
# after the user has set doPreConvolve. | |||
self.measurement.algorithms.names.add('base_PeakLikelihoodFlux') | |||
|
|||
self.forcedMeasurement.plugins = ["base_TransformedCentroid", "base_PsfFlux"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to verify these are the only plugins you want right? I assume it is doing what you want since you must have tested it, but sometimes people get bitten by =, |=, +=, etc for this datatype so I wanted to double check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct, I only want these pluggins to be run for simplicity.
@@ -249,6 +262,12 @@ def __init__(self, butler=None, **kwargs): | |||
if self.config.doMeasurement: | |||
self.makeSubtask("measurement", schema=self.schema, | |||
algMetadata=self.algMetadata) | |||
if self.config.doForcedMeasurement: | |||
self.totFluxKey = self.schema.addField( | |||
"totFlux", "D", "Forced flux measured on the PVI") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the name of these keys is not very descriptive, and does not seem to match the documentation exactly. I assume tot means total? How does this "total" relate to the forced flux measured? Does the forced flux give a total in some way that is clearer if you know about PVI (it may just be my ignorance). Consider changing the name to something more descriptive (or just like pviFlux etc), or changing the documentation string to clarify. I would say add a comment here, which may not be a bad idea, but that will not be documentation to someone who later just sees the schema and not what produced it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
# Copy the measured flux and error into the diaSource. | ||
forcedSources = self.forcedMeasurement.generateMeasCat( | ||
exposure, diaSources, subtractedExposure.getWcs()) | ||
self.forcedMeasurement.run(forcedSources, exposure, diaSources, subtractedExposure.getWcs()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this code is run in a performance critical section of code, consider doing the copying of records with a schema mapper instead of a loop. For this review it does not really matter to me either way, but wanted to bring it up in case it made a difference.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've tried working with schema mappers in the past and found them obtuse and unintuitive. If you can provide an example of this type of operation using a schema mapper that would be great. This may be asking too much but if you could provide a code snippet to use a schema mapper in this case I would be all for it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @morriscb, does this help?
import random
import lsst.afw.table as afwTable
import lsst.afw.geom as afwGeom
N_SOURCES = 1
def make_catalogs(num_sources):
# Create some placeholder catalogs with random data.
diaSourceSchema = afwTable.SourceTable.makeMinimalSchema()
totFluxKey = diaSourceSchema.addField("totFlux", doc="xx", type="F")
totFluxErrKey = diaSourceSchema.addField("totFluxErr", doc="xx", type="F")
diaSources = afwTable.SourceCatalog(diaSourceSchema)
diaSources.table.preallocate(num_sources) # required for columnar operations
for i in range(num_sources):
rec = diaSources.addNew()
rec.set("coord_ra", random.random() * afwGeom.degrees)
rec.set("coord_dec", random.random() * afwGeom.degrees)
forcedSourceSchema = afwTable.SourceTable.makeMinimalSchema()
base_PsfFlux_fluxKey = forcedSourceSchema.addField("base_PsfFlux_flux", doc="xx", type="F")
base_PsfFlux_fluxErrKey = forcedSourceSchema.addField("base_PsfFlux_fluxErr", doc="xx", type="F")
forcedSources = afwTable.SourceCatalog(forcedSourceSchema)
forcedSources.table.preallocate(num_sources) # required for columnar operations
for i in range(num_sources):
rec = forcedSources.addNew()
rec.set(base_PsfFlux_fluxKey, random.random())
rec.set(base_PsfFlux_fluxErrKey, random.random())
return diaSources, forcedSources
def copy_mapper(diaSources, forcedSources):
# Copy PsfFlux from forceSources to diaSources with a mapper
mapper = afwTable.SchemaMapper(forcedSources.schema, diaSources.schema)
mapper.addMapping(forcedSources.schema.find("base_PsfFlux_flux")[0], "totFlux", True)
mapper.addMapping(forcedSources.schema.find("base_PsfFlux_fluxErr")[0], "totFluxErr", True)
for forced, dia in zip(forcedSources, diaSources):
dia.assign(forced, mapper)
return diaSources
def copy_brute(diaSources, forcedSources):
# Copy PsfFlux from forceSources to diaSources with "brute force" assignment
for forced, dia in zip(forcedSources, diaSources):
dia.set("totFlux", forced["base_PsfFlux_flux"])
dia.set("totFluxErr", forced["base_PsfFlux_fluxErr"])
return diaSources
def copy_column(diaSources, forcedSources):
# Copy PsfFlux from forceSources to diaSources as a column
# NB requires that the tables be contiguous in memory (ie, the preallocate step, above).
diaSources["totFlux"] = forcedSources["base_PsfFlux_flux"]
diaSources["totFluxErr"] = forcedSources["base_PsfFlux_fluxErr"]
return diaSources
if __name__ == "__main__":
# Just to check the copies worked: all entries should have both positions & fluxes.
print(copy_mapper(*make_catalogs(N_SOURCES)))
print(copy_brute(*make_catalogs(N_SOURCES)))
print(copy_column(*make_catalogs(N_SOURCES)))
For what it's worth, using the mapper seems to be a bit faster than doing things the “brute force” way, although columnar operations are faster still... but require your tables to be contiguous in memory. I don't know if that's guaranteed in your situation.
In [1]: import mapper
In [2]: %timeit mapper.copy_column(*mapper.make_catalogs(100000))
2.55 s ± 74 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
In [3]: %timeit mapper.copy_mapper(*mapper.make_catalogs(100000))
4.05 s ± 80.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
In [4]: %timeit mapper.copy_brute(*mapper.make_catalogs(100000))
6.85 s ± 648 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
I also give no guarantees that a loop with assign
is actually the most efficient way to use the SchemaMapper
; it's just what I stumbled upon first.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome, thanks John. From my previous experience the assign
function was what I was missing. I'll write this in and test it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Committed and tested the new schema mapper code. @natelust if you could please take one last look and let me know if there's anything else needed. Thanks!
forcedSources = self.forcedMeasurement.generateMeasCat( | ||
exposure, diaSources, subtractedExposure.getWcs()) | ||
self.forcedMeasurement.run(forcedSources, exposure, diaSources, subtractedExposure.getWcs()) | ||
mapper = afwTable.SchemaMapper(forcedSources.getSchema(), diaSources.getSchema()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you would pythonify this line a little bit by using forcedSources.schema
and the same for diaSources that would be nice. If it is a big inconvenience it is not strictly necessary, but would make things look a bit nicer and more consistent.
Add nessecary config settings and code measure forced fluxes at diaSource locations in the PVI. Still need to copy value into final diaSource outputs. Removed ForcedMeasurementConfig Minimize number of plugins run for ForcedMeasurement.
Fix naming bug in column assignment for loop. Change totFlux assigments to a schema mapper. Change getSchema to schema
db4579b
to
11efe10
Compare
No description provided.