Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

DM-28555: Add verbosity to ApPipe and DiaPipe DB errors #106

Merged
merged 3 commits into from
Mar 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
49 changes: 41 additions & 8 deletions python/lsst/ap/association/association.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,35 @@ def run(self,

matchResult = self.associate_sources(diaObjects, diaSources)

diaObjects = diaObjects.append(matchResult.new_dia_objects,
sort=True)
# Now that we know the DiaObjects our new DiaSources are associated
# with, we index the new DiaSources the same way as the full history
# and merge the tables.
diaSources.set_index(["diaObjectId", "filterName", "diaSourceId"],
drop=False,
inplace=True)
# Test for DiaSource duplication first. If duplicates are found,
# this likely means this is duplicate data being processed and sent
# to the Apdb.
mergedDiaSourceHistory = diaSourceHistory.append(diaSources, sort=True)
if mergedDiaSourceHistory.index.has_duplicates:
raise RuntimeError(
"Duplicate DiaSources found after association and merging "
"with history. This is likely due to re-running data with an "
"already populated Apdb. If this was not the case then there "
"was an unexpected failure in Association while matching "
"sources to objects, and should be reported. Exiting.")

diaObjects = diaObjects.append(matchResult.new_dia_objects,
sort=True)
# Double check to make sure there are no duplicates in the DiaObject
# table after association.
if diaObjects.index.has_duplicates:
raise RuntimeError(
"Duplicate DiaObjects created after association. This is "
"likely due to re-running data with an already populated "
"Apdb. If this was not the case then there was an unexpected "
"failure in Association while matching and creating new "
"DiaObjects and should be reported. Exiting.")

# Get the current filter being processed.
filterName = diaSources["filterName"].iat[0]
Expand All @@ -149,9 +169,22 @@ def run(self,
matchResult.associated_dia_object_ids,
filterName)

allDiaObjects = updatedResults.diaObjectCat
updatedDiaObjects = updatedResults.updatedDiaObjects
if allDiaObjects.index.has_duplicates:
raise RuntimeError(
"Duplicate DiaObjects (loaded + updated) created after "
"DiaCalculation. This is unexpected behavior and should be "
"reported. Exiting.")
if updatedDiaObjects.index.has_duplicates:
raise RuntimeError(
"Duplicate DiaObjects (updated) created after "
"DiaCalculation. This is unexpected behavior and should be "
"reported. Exiting.")

return pipeBase.Struct(
diaObjects=updatedResults.diaObjectCat,
updatedDiaObjects=updatedResults.updatedDiaObjects,
diaObjects=allDiaObjects,
updatedDiaObjects=updatedDiaObjects,
diaSources=diaSources,
)

Expand Down Expand Up @@ -261,8 +294,8 @@ def score(self, dia_objects, dia_sources, max_dist):
INF, -1, and -1 respectively for unassociated sources.
"""
scores = np.full(len(dia_sources), np.inf, dtype=np.float64)
obj_idxs = np.full(len(dia_sources), -1, dtype=np.int)
obj_ids = np.full(len(dia_sources), -1, dtype=np.int)
obj_idxs = np.full(len(dia_sources), -1, dtype=np.int64)
obj_ids = np.full(len(dia_sources), 0, dtype=np.int64)

if len(dia_objects) == 0:
return pipeBase.Struct(
Expand Down Expand Up @@ -371,8 +404,8 @@ def match(self, dia_objects, dia_sources, score_struct):
"""

n_previous_dia_objects = len(dia_objects)
used_dia_object = np.zeros(n_previous_dia_objects, dtype=np.bool)
used_dia_source = np.zeros(len(dia_sources), dtype=np.bool)
used_dia_object = np.zeros(n_previous_dia_objects, dtype=bool)
used_dia_source = np.zeros(len(dia_sources), dtype=bool)
associated_dia_object_ids = np.zeros(len(dia_sources),
dtype=np.uint64)
new_dia_objects = []
Expand Down
16 changes: 16 additions & 0 deletions python/lsst/ap/association/diaPipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,11 +315,27 @@ def run(self, diaSourceCat, diffIm, exposure, warpedExposure, ccdExposureIdBits)
assocResults.updatedDiaObjects,
exposure.getInfo().getVisitInfo().getDate().toPython())
self.apdb.storeDiaForcedSources(diaForcedSources)

if self.config.doPackageAlerts:
if len(loaderResult.diaForcedSources) > 1:
diaForcedSources = diaForcedSources.append(
loaderResult.diaForcedSources,
sort=True)
if diaForcedSources.index.has_duplicates:
self.log.warn(
"Duplicate DiaForcedSources created after merge with "
"history and new sources. This may cause downstream "
"problems. Dropping duplicates.")
# Drop duplicates via index and keep the first appearance.
# Reset due to the index shape being slightly different than
# expected.
diaForcedSources = diaForcedSources.groupby(
diaForcedSources.index).first()
diaForcedSources.reset_index(drop=True, inplace=True)
diaForcedSources.set_index(
["diaObjectId", "diaForcedSourceId"],
drop=False,
inplace=True)
self.alertPackager.run(assocResults.diaSources,
assocResults.diaObjects,
loaderResult.diaSources,
Expand Down
45 changes: 43 additions & 2 deletions python/lsst/ap/association/loadDiaCatalogs.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"""
import numpy as np
import pandas as pd
from sqlalchemy.exc import OperationalError, ProgrammingError

import lsst.geom as geom
import lsst.pex.config as pexConfig
Expand Down Expand Up @@ -97,11 +98,19 @@ def run(self, exposure, apdb):
``diaObjectId``, ``filterName``, ``diaSourceId`` columns.
(`pandas.DataFrame`)
"""
visiInfo = exposure.getInfo().getVisitInfo()
pixelRanges = self._getPixelRanges(exposure)

diaObjects = self.loadDiaObjects(pixelRanges, apdb)
# This is the first database query
try:
diaObjects = self.loadDiaObjects(pixelRanges, apdb)
except (OperationalError, ProgrammingError) as e:
raise RuntimeError(
"Database query failed to load DiaObjects; did you call "
"make_apdb.py first? If you did, some other error occurred "
"during database access of the DiaObject table.") from e

dateTime = exposure.getInfo().getVisitInfo().getDate().toPython()
dateTime = visiInfo.getDate().toPython()

diaSources = self.loadDiaSources(diaObjects,
dateTime,
Expand Down Expand Up @@ -140,7 +149,15 @@ def loadDiaObjects(self, pixelRanges, apdb):
diaObjects = pd.DataFrame(columns=["diaObjectId"])
else:
diaObjects = apdb.getDiaObjects(pixelRanges, return_pandas=True)

diaObjects.set_index("diaObjectId", drop=False, inplace=True)
if diaObjects.index.has_duplicates:
self.log.warn(
"Duplicate DiaObjects loaded from the Apdb. This may cause "
"downstream pipeline issues. Dropping duplicated rows")
# Drop duplicates via index and keep the first appearance.
diaObjects = diaObjects.groupby(diaObjects.index).first()

return diaObjects.replace(to_replace=[None], value=np.nan)

@pipeBase.timeMethod
Expand Down Expand Up @@ -195,6 +212,17 @@ def loadDiaSources(self, diaObjects, pixelRanges, dateTime, apdb):
diaSources.set_index(["diaObjectId", "filterName", "diaSourceId"],
drop=False,
inplace=True)
if diaSources.index.has_duplicates:
self.log.warn(
"Duplicate DiaSources loaded from the Apdb. This may cause "
"downstream pipeline issues. Dropping duplicated rows")
# Drop duplicates via index and keep the first appearance. Reset
# due to the index shape being slightly different than expected.
diaSources = diaSources.groupby(diaSources.index).first().reset_index(drop=True)
diaSources.set_index(["diaObjectId", "filterName", "diaSourceId"],
drop=False,
inplace=True)

return diaSources.replace(to_replace=[None], value=np.nan)

@pipeBase.timeMethod
Expand Down Expand Up @@ -226,9 +254,22 @@ def loadDiaForcedSources(self, diaObjects, dateTime, apdb):
diaObjects.loc[:, "diaObjectId"],
dateTime,
return_pandas=True)

diaForcedSources.set_index(["diaObjectId", "diaForcedSourceId"],
drop=False,
inplace=True)
if diaForcedSources.index.has_duplicates:
self.log.warn(
"Duplicate DiaForcedSources loaded from the Apdb. This may "
"cause downstream pipeline issues. Dropping duplicated rows.")
# Drop duplicates via index and keep the first appearance. Reset
# due to the index shape being slightly different than expected.
diaForcedSources = diaForcedSources.groupby(diaForcedSources.index).first()
diaForcedSources.reset_index(drop=True, inplace=True)
diaForcedSources.set_index(["diaObjectId", "diaForcedSourceId"],
drop=False,
inplace=True)

return diaForcedSources.replace(to_replace=[None], value=np.nan)

@pipeBase.timeMethod
Expand Down
2 changes: 1 addition & 1 deletion python/lsst/ap/association/mapApData.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ def __init__(self, flag_map_file, table_name):
for column in self.bit_pack_columns:
names = []
for bit in column["bitList"]:
names.append((bit["name"], np.bool))
names.append((bit["name"], bool))
self.output_flag_columns[column["columnName"]] = names

def unpack(self, input_flag_values, flag_name):
Expand Down
35 changes: 34 additions & 1 deletion tests/test_association_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,14 +270,41 @@ def test_run_no_existing_objects(self):
self.assertEqual(output_dia_object['gPSFluxNdata'], 1)
self.assertEqual(df_idx, obj_idx + 10)

def _run_association_and_retrieve_objects(self, create_objects=False):
def test_run_dup_diaSources(self):
"""Test that duplicate sources being run through association throw the
correct error.
"""
with self.assertRaises(RuntimeError):
self._run_association_and_retrieve_objects(create_objects=True,
dupDiaSources=True,
dupDiaObjects=False)

def test_run_dup_diaObjects(self):
"""Test that duplicate objects being run through association throw the
correct error.
"""
with self.assertRaises(RuntimeError):
self._run_association_and_retrieve_objects(create_objects=True,
dupDiaSources=False,
dupDiaObjects=True)

def _run_association_and_retrieve_objects(self,
create_objects=False,
dupDiaSources=False,
dupDiaObjects=False):
"""Convenience method for testing the Association run method.

Parameters
----------
create_objects : `bool`
Boolean specifying if seed DIAObjects and DIASources should be
inserted into the database before association.
dupDiaSources : `bool`
Add duplicate diaSources into processing to force an error. Must
be used with ``create_objects`` equal to True.
dupDiaObjects : `bool`
Add duplicate diaObjects into processing to force an error. Must
be used with ``create_objects`` equal to True.

Return
------
Expand Down Expand Up @@ -328,6 +355,9 @@ def _run_association_and_retrieve_objects(self, create_objects=False):
inplace=True)
diaSources["ra"] = np.degrees(diaSources["ra"])
diaSources["decl"] = np.degrees(diaSources["decl"])
if dupDiaSources:
diaSources = diaSources.append(diaSourceHistory.iloc[[0, -1]],
ignore_index=True)

if len(diaObjects) == 0:
diaSourceHistory = pd.DataFrame(columns=["diaObjectId",
Expand All @@ -337,6 +367,9 @@ def _run_association_and_retrieve_objects(self, create_objects=False):
["diaObjectId", "filterName", "diaSourceId"],
drop=False,
inplace=True)
if dupDiaObjects:
diaObjects = diaObjects.append(diaObjects.iloc[[0, -1]],
ignore_index=True)

results = assoc_task.run(diaSources,
diaObjects,
Expand Down
4 changes: 2 additions & 2 deletions tests/test_dia_calculation.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def setUp(self):
self.diaSources = pd.DataFrame(data=diaSources)

self.updatedDiaObjectIds = np.array([0, 1, 2, self.newDiaObjectId],
dtype=np.int)
dtype=np.int64)

conf = DiaObjectCalculationConfig()
conf.plugins = ["testDiaPlugin",
Expand Down Expand Up @@ -224,7 +224,7 @@ def testRunUnindexed(self):
self.diaObjects.reset_index()
results = diaObjectCalTask.run(self.diaObjects,
unindexedDiaSources,
np.array([0], dtype=np.int),
np.array([0], dtype=np.int64),
"g")
updatedDiaObjects = results.updatedDiaObjects
self.assertEqual(updatedDiaObjects.at[0, "count"],
Expand Down