Skip to content

Commit

Permalink
Ensure empty assoc diaSource and diaObject tables have same schema
Browse files Browse the repository at this point in the history
as when they have data.

- Add config parameter doWriteEmptyTables to control whether to
  write empty tables or raise NoWorkFound when there are no
  overlapping diaSources to associate.
- Raise NoWorkFound by default.
  • Loading branch information
yalsayyad committed Oct 28, 2021
1 parent 387a55d commit 247f05a
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 6 deletions.
23 changes: 20 additions & 3 deletions python/lsst/pipe/tasks/drpAssociationPipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ class DrpAssociationPipeConfig(
doc="Do pull diaObject's average coordinate as coord_ra and coord_dec"
"Duplicates information, but needed for bulk ingest into qserv."
)
doWriteEmptyTables = pexConfig.Field(
dtype=bool,
default=False,
doc="If True, construct and write out empty diaSource and diaObject "
"tables. If False, raise NoWorkFound"
)


class DrpAssociationPipeTask(pipeBase.PipelineTask):
Expand Down Expand Up @@ -183,7 +189,17 @@ def run(self,
cutCat = cat[isInTractPatch]
diaSourceHistory.append(cutCat)

diaSourceHistoryCat = pd.concat(diaSourceHistory)
if diaSourceHistory:
diaSourceHistoryCat = pd.concat(diaSourceHistory)
else:
# No rows to associate
if self.config.doWriteEmptyTables:
self.log.info("Constructing empty table")
# Construct empty table using last table and dropping all the rows
diaSourceHistoryCat = cat.drop(cat.index)
else:
raise pipeBase.NoWorkFound("Found no overlapping DIASources to associate.")

self.log.info("Found %i DiaSources overlapping patch %i, tract %i",
len(diaSourceHistoryCat), patchId, tractId)

Expand All @@ -194,7 +210,7 @@ def run(self,
self.log.info("Associated DiaSources into %i DiaObjects",
len(assocResult.diaObjects))

if self.config.doAddDiaObjectCoords and not assocResult.diaObjects.empty:
if self.config.doAddDiaObjectCoords:
assocResult.assocDiaSources = self._addDiaObjectCoords(assocResult.diaObjects,
assocResult.assocDiaSources)

Expand All @@ -204,7 +220,8 @@ def run(self,

def _addDiaObjectCoords(self, objects, sources):
obj = objects[['ra', 'decl']].rename(columns={"ra": "coord_ra", "decl": "coord_dec"})
df = pd.merge(sources, obj, left_on='diaObjectId', right_index=True, how='inner')
df = pd.merge(sources.reset_index(), obj, left_on='diaObjectId', right_index=True,
how='inner').set_index('diaSourceId')
return df

def _trimToPatch(self, cat, innerPatchBox, wcs):
Expand Down
15 changes: 12 additions & 3 deletions python/lsst/pipe/tasks/simpleAssociation.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,14 @@ def run(self, diaSources, tractPatchId, skymapBits):
(`pandas.DataFrame`).
"""

# The incoming index is expected to be either diaSourceId or an unnamed
# (meaningless) range index. Drop an unnamed index; keep a named one as a column.
doDropIndex = diaSources.index.names[0] is None
diaSources.reset_index(inplace=True, drop=doDropIndex)

# Sort by ccdVisit and diaSourceId to get a reproducible ordering for
# the association.
diaSources.reset_index(inplace=True)
diaSources.set_index(["ccdVisitId", "diaSourceId"], inplace=True)

# Empty lists to store matching and location data.
Expand Down Expand Up @@ -190,8 +195,12 @@ def run(self, diaSources, tractPatchId, skymapBits):
diaSources.reset_index(inplace=True)
diaSources.set_index("diaSourceId", inplace=True, verify_integrity=True)

diaObjects = pd.DataFrame(data=diaObjectCat)
diaSources.reset_index(inplace=True)
objs = diaObjectCat if diaObjectCat else np.array([], dtype=[('diaObjectId', 'int64'),
('ra', 'float64'),
('decl', 'float64'),
('nDiaSources', 'int64')])
diaObjects = pd.DataFrame(data=objs)

if "diaObjectId" in diaObjects.columns:
diaObjects.set_index("diaObjectId", inplace=True, verify_integrity=True)

Expand Down

0 comments on commit 247f05a

Please sign in to comment.