Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-19906: Convert diaForcedSourceTask to take pandas DataFrame as input #47

Merged
merged 3 commits into from
Jul 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 28 additions & 0 deletions data/ppdb-ap-pipe-schema-extra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,34 @@ columns:
description: Mean of the y band flux errors.
unit: nJy

---
# DiaObject needs a special column for time of last seen DiaSource,
# validityEnd should be allowed to have NULL (for +Infinity)
table: DiaForcedSource
columns:
- name: diaForcedSourceId
type: BIGINT
nullable: false
description: Unique id for forced soure.
- name: totFlux
type: FLOAT
nullable: true
description: Point Source model flux measured on the direct image.
ucd: phot.count
unit: nJy
- name: totFluxErr
type: FLOAT
nullable: true
description: Uncertainty of totFlux.
ucd: stat.error;phot.count
unit: nJy
- name: midPointTai
type: DOUBLE
nullable: false
description: Effective mid-exposure time for this diaForcedSource
ucd: time.epoch
unit: d

---
# DiaObjectLast uses the same columns as DiaObject but has different index
table: DiaObjectLast
Expand Down
154 changes: 125 additions & 29 deletions python/lsst/ap/association/diaForcedSource.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@

__all__ = ["DiaForcedSourceTask", "DiaForcedSourcedConfig"]

import lsst.afw.table as afwTable
from lsst.daf.base import DateTime
import lsst.geom as geom
from lsst.meas.base.pluginRegistry import register
from lsst.meas.base import (
ForcedMeasurementTask,
ForcedTransformedCentroidConfig,
ForcedTransformedCentroidPlugin)
import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
import lsst.afw.table as afwTable
from lsst.meas.base import ForcedMeasurementTask

from .afwUtils import make_dia_object_schema, make_dia_forced_source_schema


class ForcedTransformedCentroidFromCoordConfig(ForcedTransformedCentroidConfig):
Expand Down Expand Up @@ -88,18 +88,33 @@ class DiaForcedSourcedConfig(pexConfig.Config):
doc="Subtask to force photometer DiaObjects in the direct and "
"difference images.",
)
dropColumns = pexConfig.ListField(
dtype=str,
doc="Columns produced in forced measurement that can be dropped upon "
"creation and storage of the final pandas data.",
)

def setDefaults(self):
self.forcedMeasurement.plugins = ["ap_assoc_TransformedCentroid",
"base_PsfFlux"]
self.forcedMeasurement.doReplaceWithNoise = False
self.forcedMeasurement.copyColumns = {
"id": "objectId",
"id": "diaObjectId",
"coord_ra": "coord_ra",
"coord_dec": "coord_dec"}
self.forcedMeasurement.slots.centroid = "ap_assoc_TransformedCentroid"
self.forcedMeasurement.slots.psfFlux = "base_PsfFlux"
self.forcedMeasurement.slots.shape = None
self.dropColumns = ['coord_ra', 'coord_dec', 'parent',
'ap_assoc_TransformedCentroid_x',
'ap_assoc_TransformedCentroid_y',
'base_PsfFlux_instFlux',
'base_PsfFlux_instFluxErr', 'base_PsfFlux_area',
'slot_PsfFlux_area', 'base_PsfFlux_flag',
'slot_PsfFlux_flag',
'base_PsfFlux_flag_noGoodPixels',
'slot_PsfFlux_flag_noGoodPixels',
'base_PsfFlux_flag_edge', 'slot_PsfFlux_flag_edge']


class DiaForcedSourceTask(pipeBase.Task):
Expand All @@ -111,17 +126,16 @@ class DiaForcedSourceTask(pipeBase.Task):

def __init__(self, **kwargs):
pipeBase.Task.__init__(self, **kwargs)
self.dia_forced_source_schema = make_dia_forced_source_schema()
self.makeSubtask("forcedMeasurement",
refSchema=make_dia_object_schema())
refSchema=afwTable.SourceTable.makeMinimalSchema())

def run(self, dia_objects, expIdBits, exposure, diffim):
def run(self, dia_objects, expIdBits, exposure, diffim, ppdb=None):
"""Measure forced sources on the direct and different images,
calibrate, and store them in the Ppdb.

Parameters
----------
dia_objects : `lsst.afw.table.SourceCatalog`
dia_objects : `pandas.DataFrame`
Catalog of previously observed and newly created DiaObjects
contained within the difference and direct images.
expIdBits : `int`
Expand All @@ -133,39 +147,121 @@ def run(self, dia_objects, expIdBits, exposure, diffim):

Returns
-------
result : `lsst.pipe.base.Struct`
Results struct with components.

- ``diffForcedSources`` : Psf forced photometry on the difference
image at the DiaObject location (`lsst.afw.table.SourceCatalog`)
- ``directForcedSources`` : Psf forced photometry on the direct
image at the DiaObject location (`lsst.afw.table.SourceCatalog`)
output_forced_sources : `pandas.DataFrame`
Catalog of calibrated forced photometered fluxes on both the
difference and direct images at DiaObject locations.
"""

afw_dia_objects = self._convert_from_pandas(dia_objects)

idFactoryDiff = afwTable.IdFactory.makeSource(
diffim.getInfo().getVisitInfo().getExposureId(),
afwTable.IdFactory.computeReservedFromMaxBits(int(expIdBits)))
idFactoryDirect = afwTable.IdFactory.makeSource(
diffim.getInfo().getVisitInfo().getExposureId(),
afwTable.IdFactory.computeReservedFromMaxBits(int(expIdBits)))

diffForcedSources = self.forcedMeasurement.generateMeasCat(
diffim,
dia_objects,
afw_dia_objects,
diffim.getWcs(),
idFactory=idFactoryDiff)
self.forcedMeasurement.run(
diffForcedSources, diffim, dia_objects, diffim.getWcs())
diffForcedSources, diffim, afw_dia_objects, diffim.getWcs())

directForcedSources = self.forcedMeasurement.generateMeasCat(
exposure,
dia_objects,
exposure.getWcs(),
idFactory=idFactoryDirect)
afw_dia_objects,
exposure.getWcs())
self.forcedMeasurement.run(
directForcedSources, exposure, dia_objects, exposure.getWcs())
directForcedSources, exposure, afw_dia_objects, exposure.getWcs())

output_forced_sources = self._calibrate_and_merge(diffForcedSources,
directForcedSources,
diffim,
exposure)

return output_forced_sources

def _convert_from_pandas(self, input_objects):
"""Create minimal schema SourceCatalog from a pandas DataFrame.

We need a catalog of this type to run within the forced measurement
subtask.

Parameters
----------
input_objects : `pandas.DataFrame`
DiaObjects with locations and ids. ``

Returns
-------
outputCatalog : `lsst.afw.table.SourceTable`
Output catalog with minimal schema.
"""
schema = afwTable.SourceTable.makeMinimalSchema()

outputCatalog = afwTable.SourceCatalog(schema)
outputCatalog.reserve(len(input_objects))

for obj_id, df_row in input_objects.iterrows():
outputRecord = outputCatalog.addNew()
outputRecord.setId(obj_id)
outputRecord.setCoord(
geom.SpherePoint(df_row["ra"],
df_row["decl"],
geom.degrees))
return outputCatalog

def _calibrate_and_merge(self,
diff_sources,
direct_sources,
diff_exp,
direct_exp):
"""Take the two output catalogs from the ForcedMeasurementTasks and
calibrate, combine, and convert them to Pandas.

Parameters
----------
diff_sources : `lsst.afw.table.SourceTable`
Catalog with PsFluxes measured on the difference image.
direct_sources : `lsst.afw.table.SourceTable`
Catalog with PsfFluxes measured on the direct (calexp) image.
diff_exp : `lsst.afw.image.Exposure`
Difference exposure ``diff_sources`` were measured on.
direct_exp : `lsst.afw.image.Exposure`
Direct (calexp) exposure ``direct_sources`` were measured on.

Returns
-------
output_catalog : `pandas.DataFrame`
Catalog calibrated diaForcedSources.
"""
diff_calib = diff_exp.getPhotoCalib()
direct_calib = direct_exp.getPhotoCalib()

diff_fluxes = diff_calib.instFluxToNanojansky(diff_sources,
"slot_PsfFlux")
direct_fluxes = direct_calib.instFluxToNanojansky(direct_sources,
"slot_PsfFlux")

output_catalog = diff_sources.asAstropy().to_pandas()
output_catalog.rename(columns={"id": "diaForcedSourceId",
"slot_PsfFlux_instFlux": "psFlux",
"slot_PsfFlux_instFluxErr": "psFluxErr",
"slot_Centroid_x": "x",
"slot_Centroid_y": "y"},
inplace=True)
output_catalog.loc[:, "psFlux"] = diff_fluxes[:, 0]
output_catalog.loc[:, "psFluxErr"] = diff_fluxes[:, 1]

output_catalog["totFlux"] = direct_fluxes[:, 0]
output_catalog["totFluxErr"] = direct_fluxes[:, 1]

visit_info = direct_exp.getInfo().getVisitInfo()
ccdVisitId = visit_info.getExposureId()
midPointTaiMJD = visit_info.getDate().get(system=DateTime.MJD)
output_catalog["ccdVisitId"] = ccdVisitId
output_catalog["midPointTai"] = midPointTaiMJD

# Drop superfluous columns from output DataFrame.
output_catalog.drop(columns=self.config.dropColumns, inplace=True)

return pipeBase.Struct(
diffForcedSources=diffForcedSources,
directForcedSources=directForcedSources,
)
return output_catalog
90 changes: 61 additions & 29 deletions tests/test_diaForcedSource.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,41 +156,73 @@ def setUp(self):
self.diffim.setPsf(psf)

self.testDiaObjects = create_test_dia_objects(5, self.wcs)
self.expected_n_columns = 10

def testRun(self):
"""Test that forced source catalogs are successfully created and have
sensible values.
"""

test_objects = self._convert_to_pandas(self.testDiaObjects)

dfs = DiaForcedSourceTask()
result = dfs.run(
self.testDiaObjects, self.expIdBits, self.exposure, self.diffim)

direct_values = [19.98544842, 16.00974072, 8.2299179, 2.71486044, 0.57469884]
direct_var = 7.52143925
diff_values = [39.97089683, 32.01948144, 16.45983579, 5.42972089, 1.14939768]
diff_var = 10.6369214

self.assertEqual(len(result.directForcedSources), len(self.testDiaObjects))
self.assertEqual(len(result.diffForcedSources), len(self.testDiaObjects))

for dirRec, diffRec, testObj, dirVal, diffVal in zip(result.directForcedSources,
result.diffForcedSources,
self.testDiaObjects,
direct_values,
diff_values):
self.assertEqual(dirRec["id"], diffRec["id"])
self.assertEqual(dirRec["coord_ra"], testObj["coord_ra"])
self.assertEqual(dirRec["coord_dec"], testObj["coord_dec"])
self.assertEqual(diffRec["coord_ra"], testObj["coord_ra"])
self.assertEqual(diffRec["coord_dec"], testObj["coord_dec"])

self.assertAlmostEqual(dirRec["slot_PsfFlux_instFlux"], dirVal)
self.assertAlmostEqual(dirRec["slot_PsfFlux_instFluxErr"],
direct_var)

self.assertAlmostEqual(diffRec["slot_PsfFlux_instFlux"], diffVal)
self.assertAlmostEqual(diffRec["slot_PsfFlux_instFluxErr"],
diff_var)
dia_forced_sources = dfs.run(
test_objects, self.expIdBits, self.exposure, self.diffim)

direct_values = [199854.48417094944, 160097.40719241602,
82299.17897267535, 27148.604434624354,
5746.988388215507]
direct_var = [75240.939811168, 75231.42933749466,
75218.89495113207, 75214.88248249644,
75214.41447602339]
diff_values = [399708.9683418989, 320194.81438483205,
164598.3579453507, 54297.20886924871,
11493.976776431015]
diff_var = [106444.28782374493, 106417.39592887461,
106381.94840437356, 106370.59980584883,
106369.27608815048]

self.assertEqual(len(dia_forced_sources), len(self.testDiaObjects))
self.assertEqual(len(dia_forced_sources.columns),
self.expected_n_columns)

for (diaFS_id, diaFS), testObj, dirVal, diffVal, dirVar, diffVar in zip(dia_forced_sources.iterrows(),
self.testDiaObjects,
direct_values,
diff_values,
direct_var,
diff_var):
self.assertAlmostEqual(diaFS["psFlux"] / diffVal, 1.)
self.assertAlmostEqual(diaFS["psFluxErr"] / diffVar, 1.)

self.assertAlmostEqual(diaFS["totFlux"] / dirVal, 1.)
self.assertAlmostEqual(diaFS["totFluxErr"] / dirVar, 1.)

self.assertEqual(diaFS["ccdVisitId"], self.exposureId)

def _convert_to_pandas(self, dia_objects):
"""Convert input afw table to pandas.

Parameters
----------
dia_objects : `lsst.afw.table.SourceCatalog`
Catalog to convert

Returns
-------
output_catalog : `pandas.DataFrame`
Converted catalog
"""
output_catalog = dia_objects.asAstropy().to_pandas()
output_catalog.rename(columns={"id": "diaObjectId",
"coord_ra": "ra",
"coord_dec": "decl"},
inplace=True)

output_catalog.loc[:, "ra"] = np.degrees(output_catalog["ra"])
output_catalog.loc[:, "decl"] = np.degrees(output_catalog["decl"])

return output_catalog


class MemoryTester(lsst.utils.tests.MemoryTestCase):
Expand Down