Skip to content

Commit

Permalink
Merge branch 'tickets/DM-31765'
Browse files Browse the repository at this point in the history
  • Loading branch information
yalsayyad committed Sep 26, 2021
2 parents cb1df97 + 58d345e commit 164d271
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 8 deletions.
19 changes: 19 additions & 0 deletions pipelines/DRP.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,25 @@ tasks:
connections.measCat: forced_diff_diaObject
connections.outputSchema: forced_diff_diaObject_schema
connections.exposure: goodSeeingDiff_differenceExp
writeForcedSourceOnDiaObjectTable:
class: lsst.pipe.tasks.postprocess.WriteForcedSourceTableTask
config:
key: diaObjectId
connections.inputCatalogDiff: forced_diff_diaObject
connections.inputCatalog: forced_src_diaObject
connections.outputCatalog: forcedSourceOnDiaObject
transformForcedSourceOnDiaObjectTable:
class: lsst.pipe.tasks.postprocess.TransformForcedSourceTableTask
config:
referenceColumns: []
connections.inputCatalogs: forcedSourceOnDiaObject
connections.outputCatalog: forcedSourceOnDiaObjectTable
connections.referenceCatalog: goodSeeingDiff_fullDiaObjTable
consolidateForcedSourceOnDiaObjectTable:
class: lsst.pipe.tasks.postprocess.ConsolidateForcedSourceTableTask
config:
connections.inputCatalogs: forcedSourceOnDiaObjectTable
connections.outputCatalog: forcedSourceOnDiaObjectTable_tract
subsets:
processCcd:
subset:
Expand Down
7 changes: 5 additions & 2 deletions python/lsst/pipe/tasks/functors.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,11 @@ def __call__(self, data, **kwargs):
df[f.multilevelColumns(data, returnTuple=True, columnIndex=columnIndex)]
)
valDict[k] = f._func(subdf)
except Exception:
valDict[k] = f.fail(subdf)
except Exception as e:
try:
valDict[k] = f.fail(subdf)
except NameError:
raise e

else:
if isinstance(data, DeferredDatasetHandle):
Expand Down
19 changes: 13 additions & 6 deletions python/lsst/pipe/tasks/postprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -1514,7 +1514,11 @@ class WriteForcedSourceTableConnections(pipeBase.PipelineTaskConnections,

class WriteForcedSourceTableConfig(WriteSourceTableConfig,
pipelineConnections=WriteForcedSourceTableConnections):
pass
key = lsst.pex.config.Field(
doc="Column on which to join the two input tables on and make the primary key of the output",
dtype=str,
default="objectId",
)


class WriteForcedSourceTableTask(pipeBase.PipelineTask):
Expand All @@ -1528,14 +1532,13 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs):
# Add ccdVisitId to allow joining with CcdVisitTable
inputs['ccdVisitId'] = butlerQC.quantum.dataId.pack("visit_detector")
inputs['band'] = butlerQC.quantum.dataId.full['band']

outputs = self.run(**inputs)
butlerQC.put(outputs, outputRefs)

def run(self, inputCatalog, inputCatalogDiff, ccdVisitId=None, band=None):
dfs = []
for table, dataset, in zip((inputCatalog, inputCatalogDiff), ('calexp', 'diff')):
df = table.asAstropy().to_pandas().set_index('objectId', drop=False)
df = table.asAstropy().to_pandas().set_index(self.config.key, drop=False)
df = df.reindex(sorted(df.columns), axis=1)
df['ccdVisitId'] = ccdVisitId if ccdVisitId else pd.NA
df['band'] = band if band else pd.NA
Expand Down Expand Up @@ -1579,7 +1582,12 @@ class TransformForcedSourceTableConnections(pipeBase.PipelineTaskConnections,

class TransformForcedSourceTableConfig(TransformCatalogBaseConfig,
pipelineConnections=TransformForcedSourceTableConnections):
pass
referenceColumns = pexConfig.ListField(
dtype=str,
default=["detect_isPrimary", "detect_isTractInner", "detect_isPatchInner"],
optional=True,
doc="Columns to pull from reference catalog",
)


class TransformForcedSourceTableTask(TransformCatalogBaseTask):
Expand Down Expand Up @@ -1614,8 +1622,7 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs):

def run(self, inputCatalogs, referenceCatalog, funcs=None, dataId=None, band=None):
dfs = []
ref = referenceCatalog.get(parameters={"columns": ['detect_isPrimary', 'detect_isTractInner',
'detect_isPatchInner']})
ref = referenceCatalog.get(parameters={"columns": self.config.referenceColumns})
self.log.info("Aggregating %s input catalogs" % (len(inputCatalogs)))
for handle in inputCatalogs:
result = self.transform(None, handle, funcs, dataId)
Expand Down

0 comments on commit 164d271

Please sign in to comment.