Add Tasks to write, transform, and consolidate ForcedSources
This series of tasks produces the ForcedSource table
as specified in the DPDD.
yalsayyad committed Sep 15, 2021
1 parent 917719b commit cf146b2
Showing 3 changed files with 188 additions and 0 deletions.
4 changes: 4 additions & 0 deletions pipelines/DRP.yaml
@@ -93,6 +93,9 @@ tasks:
      connections.measCat: forced_diff
      connections.outputSchema: forced_diff_schema
      connections.exposure: goodSeeingDiff_differenceExp
  writeForcedSourceTable: lsst.pipe.tasks.postprocess.WriteForcedSourceTableTask
  transformForcedSourceTable: lsst.pipe.tasks.postprocess.TransformForcedSourceTableTask
  consolidateForcedSourceTable: lsst.pipe.tasks.postprocess.ConsolidateForcedSourceTableTask
  forcedPhotCcdOnDiaObjects:
    class: lsst.meas.base.ForcedPhotCcdFromDataFrameTask
  forcedPhotDiffOnDiaObjects:
@@ -208,3 +211,4 @@ contracts:
    - transformDiaSourceCat.connections.diaSourceTable == drpAssociation.connections.diaSourceTables
    - drpAssociation.connections.assocDiaSourceTable == drpDiaCalculation.connections.assocDiaSourceTable
    - drpAssociation.connections.diaObjectTable == drpDiaCalculation.connections.diaObjectTable
    - forcedPhotDiffim.connections.refCat == forcedPhotCcd.connections.refCat
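
The new labels use the pipeline's abbreviated task-declaration form (label: fully qualified task class), and the added contract asserts that the two forced-photometry tasks are seeded from the same reference catalog. A quick parse check, as a sketch assuming the lsst.pipe.base Pipeline API of this era (Pipeline.fromFile and toExpandedPipeline are the assumed entry points):

    from lsst.pipe.base import Pipeline

    # Load the pipeline definition and list every label with its task class;
    # this also exercises the three new labels added above.
    pipeline = Pipeline.fromFile("pipelines/DRP.yaml")
    for taskDef in pipeline.toExpandedPipeline():
        print(taskDef.label, "->", taskDef.taskName)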
4 changes: 4 additions & 0 deletions python/lsst/pipe/tasks/functors.py
@@ -540,6 +540,10 @@ def from_yaml(cls, translationDefinition, **kwargs):
        else:
            renameRules = None

        if 'calexpFlags' in translationDefinition:
            for flag in translationDefinition['calexpFlags']:
                funcs[cls.renameCol(flag, renameRules)] = Column(flag, dataset='calexp')

        if 'refFlags' in translationDefinition:
            for flag in translationDefinition['refFlags']:
                funcs[cls.renameCol(flag, renameRules)] = Column(flag, dataset='ref')
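
The new 'calexpFlags' section behaves exactly like the existing 'refFlags' one, but binds each listed column to the 'calexp' dataset of the merged table. A minimal sketch of what the branch produces (the flag name is a hypothetical placeholder, and rename rules are omitted):

    from lsst.pipe.tasks.functors import Column

    translationDefinition = {'calexpFlags': ['base_PixelFlags_flag_edge']}

    # One Column functor per listed flag, reading from the 'calexp' dataset.
    funcs = {}
    for flag in translationDefinition.get('calexpFlags', []):
        funcs[flag] = Column(flag, dataset='calexp')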
180 changes: 180 additions & 0 deletions python/lsst/pipe/tasks/postprocess.py
@@ -1485,3 +1485,183 @@ def run(self, visitSummaries):

        outputCatalog = pd.DataFrame(data=visitEntries)
        return pipeBase.Struct(outputCatalog=outputCatalog)


class WriteForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
                                        dimensions=("instrument", "visit", "detector", "skymap", "tract")):

    inputCatalog = connectionTypes.Input(
        doc="Primary per-detector, single-epoch forced-photometry catalog. "
            "By default, it is the output of ForcedPhotCcdTask on calexps.",
        name="forced_src",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector", "skymap", "tract")
    )
    inputCatalogDiff = connectionTypes.Input(
        doc="Secondary multi-epoch, per-detector, forced-photometry catalog. "
            "By default, it is the output of ForcedPhotCcdTask run on image differences.",
        name="forced_diff",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector", "skymap", "tract")
    )
    outputCatalog = connectionTypes.Output(
        doc="Input catalogs horizontally joined on `objectId`, in Parquet format",
        name="forcedSource",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector")
    )


class WriteForcedSourceTableConfig(WriteSourceTableConfig,
                                   pipelineConnections=WriteForcedSourceTableConnections):
    pass


class WriteForcedSourceTableTask(pipeBase.PipelineTask):
    """Merge and convert per-detector forced source catalogs to parquet.
    """
    _DefaultName = "writeForcedSourceTable"
    ConfigClass = WriteForcedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        # Add ccdVisitId to allow joining with CcdVisitTable
        inputs['ccdVisitId'] = butlerQC.quantum.dataId.pack("visit_detector")
        inputs['band'] = butlerQC.quantum.dataId.full['band']

        outputs = self.run(**inputs)
        butlerQC.put(outputs, outputRefs)

    def run(self, inputCatalog, inputCatalogDiff, ccdVisitId=None, band=None):
        dfs = []
        for table, dataset in zip((inputCatalog, inputCatalogDiff), ('calexp', 'diff')):
            df = table.asAstropy().to_pandas().set_index('objectId', drop=False)
            df = df.reindex(sorted(df.columns), axis=1)
            df['ccdVisitId'] = ccdVisitId if ccdVisitId else pd.NA
            df['band'] = band if band else pd.NA
            df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns],
                                                   names=('dataset', 'column'))

            dfs.append(df)

        outputCatalog = functools.reduce(lambda d1, d2: d1.join(d2), dfs)
        return pipeBase.Struct(outputCatalog=outputCatalog)

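For readers unfamiliar with the two-level column scheme, here is a self-contained pandas sketch (toy data, hypothetical column names) of the horizontal join run() performs: both catalogs are indexed on objectId, given a ('dataset', 'column') MultiIndex, and joined side by side so the calexp and diff measurements of each object share a row.

    import pandas as pd

    calexp = pd.DataFrame({'objectId': [1, 2], 'flux': [10.0, 20.0]}).set_index('objectId', drop=False)
    diff = pd.DataFrame({'objectId': [1, 2], 'flux': [0.5, -0.3]}).set_index('objectId', drop=False)

    dfs = []
    for df, dataset in ((calexp, 'calexp'), (diff, 'diff')):
        # Prefix every column with its dataset of origin, as in run() above.
        df.columns = pd.MultiIndex.from_tuples([(dataset, c) for c in df.columns],
                                               names=('dataset', 'column'))
        dfs.append(df)

    joined = dfs[0].join(dfs[1])
    print(joined[('calexp', 'flux')], joined[('diff', 'flux')])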

class TransformForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
                                            dimensions=("instrument", "skymap", "patch", "tract")):

    inputCatalogs = connectionTypes.Input(
        doc="Parquet table of merged ForcedSources produced by WriteForcedSourceTableTask",
        name="forcedSource",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector"),
        multiple=True,
        deferLoad=True
    )
    referenceCatalog = connectionTypes.Input(
        doc="Reference catalog which was used to seed the forced photometry. Columns "
            "objectId, detect_isPrimary, detect_isTractInner, detect_isPatchInner "
            "are expected.",
        name="objectTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        deferLoad=True
    )
    outputCatalog = connectionTypes.Output(
        doc="Narrower, temporally aggregated, per-patch ForcedSource Table, transformed and "
            "converted per a specified set of functors",
        name="ForcedSourceTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap")
    )


class TransformForcedSourceTableConfig(TransformCatalogBaseConfig,
                                       pipelineConnections=TransformForcedSourceTableConnections):
    pass


class TransformForcedSourceTableTask(TransformCatalogBaseTask):
    """Transform/standardize a ForcedSource catalog.

    Transforms each wide, per-detector forcedSource parquet table per the
    specification file (per-camera defaults found in ForcedSource.yaml).
    All epochs that overlap the patch are aggregated into one per-patch
    narrow-parquet file.

    No de-duplication of rows is performed. Duplicate-resolution flags
    (`detect_isPrimary`, `detect_isTractInner`, `detect_isPatchInner`) are
    pulled in from the referenceCatalog, so that the user may de-duplicate
    for analysis or compare duplicates for QA.

    The resulting table includes multiple bands. Epochs (MJDs) and other
    useful per-visit data can be retrieved by joining with the CcdVisitTable
    on ccdVisitId.
    """
    _DefaultName = "transformForcedSourceTable"
    ConfigClass = TransformForcedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        outputs = self.run(inputs['inputCatalogs'], inputs['referenceCatalog'], funcs=self.funcs,
                           dataId=outputRefs.outputCatalog.dataId.full)

        butlerQC.put(outputs, outputRefs)

    def run(self, inputCatalogs, referenceCatalog, funcs=None, dataId=None, band=None):
        dfs = []
        ref = referenceCatalog.get(parameters={"columns": ['detect_isPrimary', 'detect_isTractInner',
                                                           'detect_isPatchInner']})
        self.log.info("Aggregating %d input catalogs", len(inputCatalogs))
        for handle in inputCatalogs:
            result = self.transform(None, handle, funcs, dataId)
            # Filter for only rows that were detected on (overlap) the patch
            dfs.append(ref.join(result.df, how='inner'))

        outputCatalog = pd.concat(dfs)
        self.log.info("Made a table of %d columns and %d rows",
                      len(outputCatalog.columns), len(outputCatalog))
        return pipeBase.Struct(outputCatalog=outputCatalog)

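The inner join against the reference catalog does double duty: it drops epochs of objects absent from the patch's objectTable and attaches the de-duplication flags to every surviving epoch. A toy pandas sketch with hypothetical values:

    import pandas as pd

    ref = pd.DataFrame({'detect_isPrimary': [True, False]},
                       index=pd.Index([1, 2], name='objectId'))
    epochs = pd.DataFrame({'psfFlux': [1.0, 1.1, 2.0, 3.0]},
                          index=pd.Index([1, 1, 2, 3], name='objectId'))

    # objectId 3 is absent from ref, so its epoch is dropped; both epochs of
    # objectId 1 come through, each carrying the detect_isPrimary flag.
    kept = ref.join(epochs, how='inner')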

class ConsolidateForcedSourceTableConnections(pipeBase.PipelineTaskConnections,
                                              defaultTemplates={"catalogType": ""},
                                              dimensions=("instrument", "tract")):
    inputCatalogs = connectionTypes.Input(
        doc="Input per-patch ForcedSource Tables",
        name="{catalogType}ForcedSourceTable",
        storageClass="DataFrame",
        dimensions=("tract", "patch", "skymap"),
        multiple=True,
    )

    outputCatalog = connectionTypes.Output(
        doc="Output per-tract concatenation of ForcedSource Tables",
        name="{catalogType}ForcedSourceTable_tract",
        storageClass="DataFrame",
        dimensions=("tract", "skymap"),
    )


class ConsolidateForcedSourceTableConfig(pipeBase.PipelineTaskConfig,
                                         pipelineConnections=ConsolidateForcedSourceTableConnections):
    pass


class ConsolidateForcedSourceTableTask(CmdLineTask, pipeBase.PipelineTask):
    """Concatenate a per-patch `ForcedSourceTable` list into a single
    per-tract `ForcedSourceTable_tract`.
    """
    _DefaultName = 'consolidateForcedSourceTable'
    ConfigClass = ConsolidateForcedSourceTableConfig

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        self.log.info("Concatenating %d per-patch ForcedSource Tables",
                      len(inputs['inputCatalogs']))
        df = pd.concat(inputs['inputCatalogs'])
        butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)
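
Once the pipeline has run, the consolidated product can be fetched like any other DataFrame dataset. A usage sketch (the repo path, collection, tract, and skymap values are placeholders):

    from lsst.daf.butler import Butler

    # Read the per-tract table back as a pandas DataFrame.
    butler = Butler('/path/to/repo', collections='u/someone/DRP')
    forced = butler.get('ForcedSourceTable_tract', tract=9813, skymap='hsc_rings_v1')
    print(len(forced), 'forced-source rows')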
