Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-31033: Move DiaCalculation from a subtask of AssociationTask to a subtask of DiaPipe. #120

Merged
merged 3 commits into from
Jul 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
82 changes: 6 additions & 76 deletions python/lsst/ap/association/association.py
Expand Up @@ -32,8 +32,6 @@
import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase

from lsst.meas.base import DiaObjectCalculationTask

# Enforce an error for unsafe column/array value setting in pandas.
pd.options.mode.chained_assignment = 'raise'

Expand All @@ -47,34 +45,6 @@ class AssociationConfig(pexConfig.Config):
'match to a DIAObject.',
default=1.0,
)
diaCalculation = pexConfig.ConfigurableField(
target=DiaObjectCalculationTask,
doc="Task to compute summary statistics for DiaObjects.",
)

def setDefaults(self):
self.diaCalculation.plugins = ["ap_meanPosition",
"ap_HTMIndex",
"ap_nDiaSources",
"ap_diaObjectFlag",
"ap_meanFlux",
"ap_percentileFlux",
"ap_sigmaFlux",
"ap_chi2Flux",
"ap_madFlux",
"ap_skewFlux",
"ap_minMaxFlux",
"ap_maxSlopeFlux",
"ap_meanErrFlux",
"ap_linearFit",
"ap_stetsonJ",
"ap_meanTotFlux",
"ap_sigmaTotFlux"]

def validate(self):
if "ap_HTMIndex" not in self.diaCalculation.plugins:
raise ValueError("AssociationTask requires the ap_HTMIndex plugin "
"be enabled for proper insertion into the Apdb.")


class AssociationTask(pipeBase.Task):
Expand All @@ -89,10 +59,6 @@ class AssociationTask(pipeBase.Task):
ConfigClass = AssociationConfig
_DefaultName = "association"

def __init__(self, **kwargs):
pipeBase.Task.__init__(self, **kwargs)
self.makeSubtask("diaCalculation")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deleting the entire init function won't break anything else, will it? Certainly deleting the makeSubtask call makes sense, but you're killing the entire init function.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first line is calling the parent class init (written before super() was common) so if you are removing the second line the first line is not doing anything on its own.


@pipeBase.timeMethod
def run(self,
diaSources,
Expand Down Expand Up @@ -120,8 +86,8 @@ def run(self,
diaObjects. (`pandas.DataFrame`)
- ``updatedDiaObjects`` : Subset of DiaObjects that were updated
or created during processing. (`pandas.DataFrame`)
- ``diaSources`` : DiaSources detected in this ccdVisit with
associated diaObjectIds. (`pandas.DataFrame`)
- ``matchedDiaObjectIds`` : Ids of DiaObjects that were matched to
  DiaSources detected in this ccdVisit. (`numpy.ndarray`)
"""
diaSources = self.check_dia_source_radec(diaSources)

Expand All @@ -133,18 +99,7 @@ def run(self,
diaSources.set_index(["diaObjectId", "filterName", "diaSourceId"],
drop=False,
inplace=True)
# Test for DiaSource duplication first. If duplicates are found,
# this likely means this is duplicate data being processed and sent
# to the Apdb.
mergedDiaSourceHistory = diaSourceHistory.append(diaSources, sort=True)
if mergedDiaSourceHistory.index.has_duplicates:
raise RuntimeError(
"Duplicate DiaSources found after association and merging "
"with history. This is likely due to re-running data with an "
"already populated Apdb. If this was not the case then there "
"was an unexpected failure in Association while matching "
"sources to objects, and should be reported. Exiting.")

# Append the newly created DiaObjects.
diaObjects = diaObjects.append(matchResult.new_dia_objects,
sort=True)
# Double check to make sure there are no duplicates in the DiaObject
Expand All @@ -155,37 +110,12 @@ def run(self,
"likely due to re-running data with an already populated "
"Apdb. If this was not the case then there was an unexpected "
"failure in Association while matching and creating new "
"DiaObjectsand should be reported. Exiting.")

# Get the current filter being processed.
filterName = diaSources["filterName"].iat[0]

# Update previously existing DIAObjects with the information from their
# newly association DIASources and create new DIAObjects from
# unassociated sources.
updatedResults = self.diaCalculation.run(
diaObjects,
mergedDiaSourceHistory,
matchResult.associated_dia_object_ids,
[filterName])

allDiaObjects = updatedResults.diaObjectCat
updatedDiaObjects = updatedResults.updatedDiaObjects
if allDiaObjects.index.has_duplicates:
raise RuntimeError(
"Duplicate DiaObjects (loaded + updated) created after "
"DiaCalculation. This is unexpected behavior and should be "
"reported. Existing.")
if updatedDiaObjects.index.has_duplicates:
raise RuntimeError(
"Duplicate DiaObjects (updated) created after "
"DiaCalculation. This is unexpected behavior and should be "
"reported. Existing.")
"DiaObjects and should be reported. Exiting.")

return pipeBase.Struct(
diaObjects=allDiaObjects,
updatedDiaObjects=updatedDiaObjects,
diaObjects=diaObjects,
diaSources=diaSources,
matchedDiaObjectIds=matchResult.associated_dia_object_ids,
)

def check_dia_source_radec(self, dia_sources):
Expand Down
90 changes: 83 additions & 7 deletions python/lsst/ap/association/diaPipe.py
Expand Up @@ -31,6 +31,7 @@
import os

import lsst.dax.apdb as daxApdb
from lsst.meas.base import DiaObjectCalculationTask
import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
import lsst.pipe.base.connectionTypes as connTypes
Expand Down Expand Up @@ -199,6 +200,10 @@ class DiaPipelineConfig(pipeBase.PipelineTaskConfig,
target=AssociationTask,
doc="Task used to associate DiaSources with DiaObjects.",
)
diaCalculation = pexConfig.ConfigurableField(
target=DiaObjectCalculationTask,
doc="Task to compute summary statistics for DiaObjects.",
)
diaForcedSource = pexConfig.ConfigurableField(
target=DiaForcedSourceTask,
doc="Task used for force photometer DiaObject locations in direct and "
Expand Down Expand Up @@ -227,14 +232,34 @@ def setDefaults(self):
"${AP_ASSOCIATION_DIR}",
"data",
"apdb-ap-pipe-schema-extra.yaml")
self.diaCalculation.plugins = ["ap_meanPosition",
"ap_HTMIndex",
"ap_nDiaSources",
"ap_diaObjectFlag",
"ap_meanFlux",
"ap_percentileFlux",
"ap_sigmaFlux",
"ap_chi2Flux",
"ap_madFlux",
"ap_skewFlux",
"ap_minMaxFlux",
"ap_maxSlopeFlux",
"ap_meanErrFlux",
"ap_linearFit",
"ap_stetsonJ",
"ap_meanTotFlux",
"ap_sigmaTotFlux"]

def validate(self):
pexConfig.Config.validate(self)
if self.diaCatalogLoader.htmLevel != \
self.associator.diaCalculation.plugins["ap_HTMIndex"].htmLevel:
self.diaCalculation.plugins["ap_HTMIndex"].htmLevel:
raise ValueError("HTM index level in LoadDiaCatalogsTask must be "
"equal to HTMIndexDiaCalculationPlugin index "
"level.")
if "ap_HTMIndex" not in self.diaCalculation.plugins:
raise ValueError("DiaPipe requires the ap_HTMIndex plugin "
"be enabled for proper insertion into the Apdb.")


class DiaPipelineTask(pipeBase.PipelineTask):
Expand All @@ -252,6 +277,7 @@ def __init__(self, initInputs=None, **kwargs):
DiaSource=make_dia_source_schema()))
self.makeSubtask("diaCatalogLoader")
self.makeSubtask("associator")
self.makeSubtask("diaCalculation")
self.makeSubtask("diaForcedSource")
if self.config.doPackageAlerts:
self.makeSubtask("alertPackager")
Expand All @@ -261,6 +287,7 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs):
expId, expBits = butlerQC.quantum.dataId.pack("visit_detector",
returnMaxBits=True)
inputs["ccdExposureIdBits"] = expBits
inputs["band"] = butlerQC.quantum.dataId["band"]

outputs = self.run(**inputs)

Expand All @@ -272,7 +299,8 @@ def run(self,
diffIm,
exposure,
warpedExposure,
ccdExposureIdBits):
ccdExposureIdBits,
band):
"""Process DiaSources and DiaObjects.

Load previous DiaObjects and their DiaSource history. Calibrate the
Expand Down Expand Up @@ -317,19 +345,49 @@ def run(self,
loaderResult.diaObjects,
loaderResult.diaSources)

mergedDiaSourceHistory = loaderResult.diaSources.append(
assocResults.diaSources,
sort=True)
# Test for DiaSource duplication first. If duplicates are found,
# this likely means this is duplicate data being processed and sent
# to the Apdb.
if self.testDataFrameIndex(mergedDiaSourceHistory):
raise RuntimeError(
"Duplicate DiaSources found after association and merging "
"with history. This is likely due to re-running data with an "
"already populated Apdb. If this was not the case then there "
"was an unexpected failure in Association while matching "
"sources to objects, and should be reported. Exiting.")

diaCalResult = self.diaCalculation.run(
assocResults.diaObjects,
mergedDiaSourceHistory,
assocResults.matchedDiaObjectIds,
[band])
if self.testDataFrameIndex(diaCalResult.diaObjectCat):
raise RuntimeError(
"Duplicate DiaObjects (loaded + updated) created after "
"DiaCalculation. This is unexpected behavior and should be "
"reported. Exiting.")
if self.testDataFrameIndex(diaCalResult.updatedDiaObjects):
raise RuntimeError(
"Duplicate DiaObjects (updated) created after "
"DiaCalculation. This is unexpected behavior and should be "
"reported. Exiting.")

# Force photometer on the Difference and Calibrated exposures using
# the new and updated DiaObject locations.
diaForcedSources = self.diaForcedSource.run(
assocResults.diaObjects,
assocResults.updatedDiaObjects.loc[:, "diaObjectId"].to_numpy(),
diaCalResult.diaObjectCat,
diaCalResult.updatedDiaObjects.loc[:, "diaObjectId"].to_numpy(),
ccdExposureIdBits,
exposure,
diffIm)

# Store DiaSources and updated DiaObjects in the Apdb.
self.apdb.storeDiaSources(assocResults.diaSources)
self.apdb.storeDiaObjects(
assocResults.updatedDiaObjects,
diaCalResult.updatedDiaObjects,
exposure.getInfo().getVisitInfo().getDate().toPython())
self.apdb.storeDiaForcedSources(diaForcedSources)

Expand All @@ -338,7 +396,7 @@ def run(self,
diaForcedSources = diaForcedSources.append(
loaderResult.diaForcedSources,
sort=True)
if diaForcedSources.index.has_duplicates:
if self.testDataFrameIndex(diaForcedSources):
self.log.warn(
"Duplicate DiaForcedSources created after merge with "
"history and new sources. This may cause downstream "
Expand All @@ -354,7 +412,7 @@ def run(self,
drop=False,
inplace=True)
self.alertPackager.run(assocResults.diaSources,
assocResults.diaObjects,
diaCalResult.diaObjectCat,
loaderResult.diaSources,
diaForcedSources,
diffIm,
Expand All @@ -363,3 +421,21 @@ def run(self,

return pipeBase.Struct(apdbMarker=self.config.apdb.value,
associatedDiaSources=assocResults.diaSources)

def testDataFrameIndex(self, df):
"""Test the sorted DataFrame index for duplicates.

Wrapped as a separate function to allow for mocking of the this task
in unittesting. Default of a mock return for this test is True.

Parameters
----------
df : `pandas.DataFrame`
DataFrame to text.

Returns
-------
`bool`
True if DataFrame contains duplicate rows.
"""
return df.index.has_duplicates
29 changes: 3 additions & 26 deletions tests/test_association_task.py
Expand Up @@ -242,21 +242,13 @@ def test_run(self):
if df_idx == not_updated_idx:
# Test the DIAObject we expect to not be associated with any
# new DIASources.
self.assertEqual(dia_object['gPSFluxNdata'], 1)
self.assertEqual(dia_object['rPSFluxNdata'], 1)
self.assertEqual(dia_object['nDiaSources'], 2)
self.assertEqual(df_idx, obj_idx)
elif updated_idx_start <= df_idx < new_idx_start:
# Test that associating to the existing DIAObjects went
# as planned and test that the IDs of the newly associated
# DIASources is correct.
self.assertEqual(dia_object['gPSFluxNdata'], 2)
self.assertEqual(dia_object['rPSFluxNdata'], 1)
self.assertEqual(dia_object['nDiaSources'], 3)
self.assertEqual(df_idx, obj_idx)
else:
self.assertEqual(dia_object['gPSFluxNdata'], 1)
self.assertEqual(dia_object['nDiaSources'], 1)
self.assertEqual(df_idx, obj_idx + 4 + 5)

def test_run_no_existing_objects(self):
Expand All @@ -267,30 +259,18 @@ def test_run_no_existing_objects(self):
self.assertEqual(len(dia_objects),
total_expected_dia_objects)
for obj_idx, (df_idx, output_dia_object) in enumerate(dia_objects.iterrows()):
self.assertEqual(output_dia_object['gPSFluxNdata'], 1)
self.assertEqual(df_idx, obj_idx + 10)

def test_run_dup_diaSources(self):
"""Test that duplicate sources being run through association throw the
correct error.
"""
with self.assertRaises(RuntimeError):
self._run_association_and_retrieve_objects(create_objects=True,
dupDiaSources=True,
dupDiaObjects=False)

def test_run_dup_diaObjects(self):
"""Test that duplicate objects being run through association throw the
correct error.
"""
with self.assertRaises(RuntimeError):
self._run_association_and_retrieve_objects(create_objects=True,
dupDiaSources=False,
dupDiaObjects=True)

def _run_association_and_retrieve_objects(self,
create_objects=False,
dupDiaSources=False,
dupDiaObjects=False):
"""Convenience method for testing the Association run method.

Expand All @@ -299,9 +279,6 @@ def _run_association_and_retrieve_objects(self,
create_objects : `bool`
Boolean specifying if seed DIAObjects and DIASources should be
inserted into the database before association.
dupDiaSources : `bool`
Add duplicate diaSources into processing to force an error. Must
be used with ``create_objects`` equal to True.
dupDiaObjects : `bool`
Add duplicate diaObjects into processing to force an error. Must
be used with ``create_objects`` equal to True.
Expand Down Expand Up @@ -355,9 +332,6 @@ def _run_association_and_retrieve_objects(self,
inplace=True)
diaSources["ra"] = np.degrees(diaSources["ra"])
diaSources["decl"] = np.degrees(diaSources["decl"])
if dupDiaSources:
diaSources = diaSources.append(diaSourceHistory.iloc[[0, -1]],
ignore_index=True)

if len(diaObjects) == 0:
diaSourceHistory = pd.DataFrame(columns=["diaObjectId",
Expand All @@ -370,6 +344,9 @@ def _run_association_and_retrieve_objects(self,
if dupDiaObjects:
diaObjects = diaObjects.append(diaObjects.iloc[[0, -1]],
ignore_index=True)
diaObjects.set_index("diaObjectId",
inplace=True,
drop=False)

results = assoc_task.run(diaSources,
diaObjects,
Expand Down