Skip to content

Commit

Permalink
Merge branch 'tickets/DM-29147'
Browse files Browse the repository at this point in the history
  • Loading branch information
morriscb committed May 1, 2021
2 parents 4828449 + f14a2c5 commit 83b8c68
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 64 deletions.
106 changes: 91 additions & 15 deletions data/DiaSource.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
# Column names defined in http://ls.st/dpdd with some modification as needed
# by database schema defined in lsst.dax.apdb
funcs:
diaSourceId: # the index of deepCoadd_disSource IS the diaSourceId
functor: Column
args: id
ccdVisitId:
functor: Column
args: ccdVisitId
filterName:
functor: Column
args: filterName
diaObjectId:
functor: Column
args: diaObjectId
Expand Down Expand Up @@ -58,7 +63,11 @@ funcs:
- slot_ApFlux_instFluxErr
- base_LocalPhotoCalib
- base_LocalPhotoCalibErr
# SNR need to make functor. DM-
snr:
functor: Ratio
args:
- slot_ApFlux_instFlux
- slot_ApFlux_instFluxErr
psFlux:
functor: LocalNanojansky
args:
Expand Down Expand Up @@ -86,8 +95,42 @@ funcs:
# trailLnL not implemented
# trailChi2 not implemented
# trailNdata not implemented
# dipMeanFlux needs functor DM-
# dipFluxDiff needs functor DM-
dipMeanFlux:
functor: LocalDipoleMeanFlux
args:
- ip_diffim_NaiveDipoleFlux_pos_instFlux
- ip_diffim_NaiveDipoleFlux_neg_instFlux
- ip_diffim_NaiveDipoleFlux_pos_instFluxErr
- ip_diffim_NaiveDipoleFlux_neg_instFluxErr
- base_LocalPhotoCalib
- base_LocalPhotoCalibErr
dipMeanFluxErr:
functor: LocalDipoleMeanFluxErr
args:
- ip_diffim_NaiveDipoleFlux_pos_instFlux
- ip_diffim_NaiveDipoleFlux_neg_instFlux
- ip_diffim_NaiveDipoleFlux_pos_instFluxErr
- ip_diffim_NaiveDipoleFlux_neg_instFluxErr
- base_LocalPhotoCalib
- base_LocalPhotoCalibErr
dipFluxDiff:
functor: LocalDipoleDiffFlux
args:
- ip_diffim_NaiveDipoleFlux_pos_instFlux
- ip_diffim_NaiveDipoleFlux_neg_instFlux
- ip_diffim_NaiveDipoleFlux_pos_instFluxErr
- ip_diffim_NaiveDipoleFlux_neg_instFluxErr
- base_LocalPhotoCalib
- base_LocalPhotoCalibErr
dipFluxDiffErr:
functor: LocalDipoleDiffFluxErr
args:
- ip_diffim_NaiveDipoleFlux_pos_instFlux
- ip_diffim_NaiveDipoleFlux_neg_instFlux
- ip_diffim_NaiveDipoleFlux_pos_instFluxErr
- ip_diffim_NaiveDipoleFlux_neg_instFluxErr
- base_LocalPhotoCalib
- base_LocalPhotoCalibErr
# dipRa not implemented
# dipDec not implemented
# (this may be redundant with RA/DEC as the default centroid is the
Expand All @@ -108,6 +151,9 @@ funcs:
dipChi2:
functor: Column
args: ip_diffim_DipoleFit_chi2dof
isDipole:
functor: Column
args: ip_diffim_DipoleFit_flag_classification
# dipNdata not implemented
totFlux:
functor: LocalNanojansky
Expand All @@ -131,23 +177,53 @@ funcs:
# These values below work but need a new functor for converting pixel^2
# units to arcsec^2 DM-
Ixx:
functor: Column
args: slot_Shape_xx
functor: ConvertPixelSqToArcsecondsSq
args:
- slot_Shape_xx
- base_LocalWcs_CDMatrix_1_1
- base_LocalWcs_CDMatrix_1_2
- base_LocalWcs_CDMatrix_2_1
- base_LocalWcs_CDMatrix_2_2
Iyy:
functor: Column
args: slot_Shape_yy
functor: ConvertPixelSqToArcsecondsSq
args:
- slot_Shape_yy
- base_LocalWcs_CDMatrix_1_1
- base_LocalWcs_CDMatrix_1_2
- base_LocalWcs_CDMatrix_2_1
- base_LocalWcs_CDMatrix_2_2
Ixy:
functor: Column
args: slot_Shape_xy
functor: ConvertPixelSqToArcsecondsSq
args:
- slot_Shape_xy
- base_LocalWcs_CDMatrix_1_1
- base_LocalWcs_CDMatrix_1_2
- base_LocalWcs_CDMatrix_2_1
- base_LocalWcs_CDMatrix_2_2
# Icov not implemented
IxxPsf:
functor: Column
args: slot_PsfShape_xx
functor: ConvertPixelSqToArcsecondsSq
args:
- slot_PsfShape_xx
- base_LocalWcs_CDMatrix_1_1
- base_LocalWcs_CDMatrix_1_2
- base_LocalWcs_CDMatrix_2_1
- base_LocalWcs_CDMatrix_2_2
IyyPsf:
functor: Column
args: slot_PsfShape_yy
functor: ConvertPixelSqToArcsecondsSq
args:
- slot_PsfShape_xy
- base_LocalWcs_CDMatrix_1_1
- base_LocalWcs_CDMatrix_1_2
- base_LocalWcs_CDMatrix_2_1
- base_LocalWcs_CDMatrix_2_2
IxyPsf:
functor: Column
args: slot_PsfShape_xy
functor: ConvertPixelSqToArcsecondsSq
args:
- slot_PsfShape_xy
- base_LocalWcs_CDMatrix_1_1
- base_LocalWcs_CDMatrix_1_2
- base_LocalWcs_CDMatrix_2_1
- base_LocalWcs_CDMatrix_2_2
# extendedness not implemented
# spuriousness not implemented
1 change: 1 addition & 0 deletions python/lsst/ap/association/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@
from .loadDiaCatalogs import *
from .packageAlerts import *
from .diaPipe import *
from .transformDiaSourceCatalog import *
50 changes: 19 additions & 31 deletions python/lsst/ap/association/diaPipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
AssociationTask,
DiaForcedSourceTask,
LoadDiaCatalogsTask,
MapDiaSourceTask,
make_dia_object_schema,
make_dia_source_schema,
PackageAlertsTask)
Expand All @@ -50,22 +49,16 @@
"DiaPipelineConnections")


class DiaPipelineConnections(pipeBase.PipelineTaskConnections,
dimensions=("instrument", "visit", "detector"),
defaultTemplates={"coaddName": "deep", "fakesType": ""}):
class DiaPipelineConnections(
pipeBase.PipelineTaskConnections,
dimensions=("instrument", "visit", "detector"),
defaultTemplates={"coaddName": "deep", "fakesType": ""}):
"""Butler connections for DiaPipelineTask.
"""
diaSourceSchema = connTypes.InitInput(
doc="Schema of the DiaSource catalog produced during image "
"differencing",
name="{fakesType}{coaddName}Diff_diaSrc_schema",
storageClass="SourceCatalog",
multiple=True
)
diaSourceCat = connTypes.Input(
doc="Catalog of DiaSources produced during image differencing.",
name="{fakesType}{coaddName}Diff_diaSrc",
storageClass="SourceCatalog",
diaSourceTable = connTypes.Input(
doc="Catalog of calibrated DiaSources.",
name="{fakesType}{coaddName}Diff_diaSrcTable",
storageClass="DataFrame",
dimensions=("instrument", "visit", "detector"),
)
diffIm = connTypes.Input(
Expand Down Expand Up @@ -140,9 +133,9 @@ def adjustQuantum(self, datasetRefMap: pipeBase.InputQuantizedConnection):
if ref.dataId["band"] not in self.config.validBands:
raise ValueError(
f"Requested '{ref.dataId['band']}' not in "
"DiaPipelineConfig.validBands. To process bands not in the "
"standard Rubin set (ugrizy) you must add the band to the "
"validBands list in DiaPipelineConfig and add the "
"DiaPipelineConfig.validBands. To process bands not in "
"the standard Rubin set (ugrizy) you must add the band to "
"the validBands list in DiaPipelineConfig and add the "
"appropriate columns to the Apdb schema.")
return super().adjustQuantum(datasetRefMap)

Expand All @@ -169,11 +162,6 @@ class DiaPipelineConfig(pipeBase.PipelineTaskConfig,
"band not on this list, the appropriate band specific columns "
"must be added to the Apdb schema in dax_apdb.",
)
diaSourceDpddifier = pexConfig.ConfigurableField(
target=MapDiaSourceTask,
doc="Task for assigning columns from the raw output of ip_diffim into "
"a schema that more closely resembles the DPDD.",
)
diaCatalogLoader = pexConfig.ConfigurableField(
target=LoadDiaCatalogsTask,
doc="Task to load DiaObjects and DiaSources from the Apdb.",
Expand Down Expand Up @@ -233,8 +221,6 @@ def __init__(self, initInputs=None, **kwargs):
self.apdb = self.config.apdb.apply(
afw_schemas=dict(DiaObject=make_dia_object_schema(),
DiaSource=make_dia_source_schema()))
self.makeSubtask("diaSourceDpddifier",
inputSchema=initInputs["diaSourceSchema"].schema)
self.makeSubtask("diaCatalogLoader")
self.makeSubtask("associator")
self.makeSubtask("diaForcedSource")
Expand All @@ -252,7 +238,12 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs):
butlerQC.put(outputs, outputRefs)

@pipeBase.timeMethod
def run(self, diaSourceCat, diffIm, exposure, warpedExposure, ccdExposureIdBits):
def run(self,
diaSourceTable,
diffIm,
exposure,
warpedExposure,
ccdExposureIdBits):
"""Process DiaSources and DiaObjects.
Load previous DiaObjects and their DiaSource history. Calibrate the
Expand All @@ -262,7 +253,7 @@ def run(self, diaSourceCat, diffIm, exposure, warpedExposure, ccdExposureIdBits)
Parameters
----------
diaSourceCat : `lsst.afw.table.SourceCatalog`
diaSourceTable : `pandas.DataFrame`
Newly detected DiaSources.
diffIm : `lsst.afw.image.ExposureF`
Difference image exposure in which the sources in ``diaSourceCat``
Expand All @@ -287,16 +278,13 @@ def run(self, diaSourceCat, diffIm, exposure, warpedExposure, ccdExposureIdBits)
self.log.info("Running DiaPipeline...")
# Put the SciencePipelines through a SDMification step and return
# calibrated columns with the expect output database names.
diaSources = self.diaSourceDpddifier.run(diaSourceCat,
diffIm,
return_pandas=True)

# Load the DiaObjects and DiaSource history.
loaderResult = self.diaCatalogLoader.run(diffIm, self.apdb)

# Associate new DiaSources with existing DiaObjects and update
# DiaObject summary statistics using the full DiaSource history.
assocResults = self.associator.run(diaSources,
assocResults = self.associator.run(diaSourceTable,
loaderResult.diaObjects,
loaderResult.diaSources)

Expand Down
8 changes: 6 additions & 2 deletions python/lsst/ap/association/transformDiaSourceCatalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@

__all__ = ("TransformDiaSourceCatalogConnections",
"TransformDiaSourceCatalogConfig",
"TransformDiaSourceCatalogTask")
"TransformDiaSourceCatalogTask",
"UnpackApdbFlags")

import numpy as np
import os
Expand Down Expand Up @@ -182,8 +183,11 @@ def run(self,
ParquetTable(dataFrame=diaSourceDf),
self.funcs,
dataId=None).df
# The Ra/DecColumn functors preserve the coord_ra/dec original columns.
# Since we don't need these and keeping them causes a DB insert crash
# we drop them from the DataFrame before returning output catalog.
return pipeBase.Struct(
diaSourceTable=df
diaSourceTable=df.drop(columns=["coord_ra", "coord_dec"]),
)

def computeBBoxSizes(self, inputCatalog):
Expand Down
19 changes: 3 additions & 16 deletions tests/test_diaPipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import os
import unittest
import pandas as pd

import lsst.afw.image as afwImage
import lsst.afw.table as afwTable
import lsst.pipe.base as pipeBase
from lsst.utils import getPackageDir
import lsst.utils.tests
from unittest.mock import patch, Mock, DEFAULT

Expand All @@ -39,15 +38,6 @@ def _makeDefaultConfig(cls, doPackageAlerts=False):
config = DiaPipelineTask.ConfigClass()
config.apdb.db_url = "sqlite://"
config.apdb.isolation_level = "READ_UNCOMMITTED"
config.diaSourceDpddifier.copyColumns = {"id": "id",
"parent": "parent",
"coord_ra": "coord_ra",
"coord_dec": "coord_dec"}
config.diaSourceDpddifier.flagMap = os.path.join(
getPackageDir("ap_association"),
"tests",
"data",
"test-flag-map.yaml")
config.doPackageAlerts = doPackageAlerts
return config

Expand Down Expand Up @@ -78,21 +68,18 @@ def _testRun(self, doPackageAlerts=False):
"""Test the normal workflow of each ap_pipe step.
"""
config = self._makeDefaultConfig(doPackageAlerts=doPackageAlerts)
task = DiaPipelineTask(
config=config,
initInputs={"diaSourceSchema": self.srcSchema})
task = DiaPipelineTask(config=config)
diffIm = Mock(spec=afwImage.ExposureF)
exposure = Mock(spec=afwImage.ExposureF)
template = Mock(spec=afwImage.ExposureF)
diaSrc = Mock(sepc=afwTable.SourceCatalog)
diaSrc = Mock(sepc=pd.DataFrame)
ccdExposureIdBits = 32

# Each of these subtasks should be called once during diaPipe
# execution. We use mocks here to check they are being executed
# appropriately.
subtasksToMock = [
"diaCatalogLoader",
"diaSourceDpddifier",
"associator",
"diaForcedSource",
]
Expand Down

0 comments on commit 83b8c68

Please sign in to comment.