Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-30024: Implement matching in DRP DiaAssociation task #526

Merged
merged 4 commits into from
Jun 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions pipelines/DRP.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ subsets:
- imageDifference
- transformDiaSourceCat
- consolidateDiaSourceTable
- drpAssociation
- forcedPhotDiffim
description: Subset for running image differencing branch of the DRP pipeline

Expand Down
81 changes: 81 additions & 0 deletions python/lsst/pipe/tasks/associationUtils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import healpy as hp
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This set functions was copied over from the LSSTDESC/dia_pipe directory. Didn't want worry about getting this up to spec for now. I can make a ticket to document it up to standards.

import numpy as np

"""Utilities for interfacing with healpy. Originally implemented in
http://github.com/LSSTDESC/dia_pipe
"""

# Will update docs and the like in DM-30673


def toIndex(nside, ra, dec):
"""Return healpix index given ra,dec in degrees"""
return hp.pixelfunc.ang2pix(nside, np.radians(-dec+90.), np.radians(ra))


def toRaDec(nside, index):
"""Convert from healpix index to ra,dec in degrees"""
vec = hp.pix2ang(nside, index)
dec = np.rad2deg(-vec[0])+90
ra = np.rad2deg(vec[1])
return np.dstack((ra, dec))[0]


def eq2xyz(ra, dec):
"""Convert from equatorial ra,dec in degrees to x,y,z on unit sphere"""
phi = np.deg2rad(ra)
theta = np.pi/2 - np.deg2rad(dec)
sintheta = np.sin(theta)
x = sintheta * np.cos(phi)
y = sintheta * np.sin(phi)
z = np.cos(theta)
return np.array([x, y, z])


def eq2vec(ra, dec):
"""Convert equatorial ra,dec in degrees to x,y,z on the unit sphere parameters"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't have been able to guess just by the name that this is the vectorized version of eq2xyz. Maybe eq2xyzVec or eqVec2xyz

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above.

ra = np.array(ra, dtype='f8', ndmin=1, copy=False)
dec = np.array(dec, dtype='f8', ndmin=1, copy=False)
if ra.size != dec.size:
raise ValueError("ra,dec not same size: %s,%s" % (ra.size, dec.size))

vec = eq2xyz(ra, dec)

return vec


def convert_spherical(ra, dec):
"""Convert from ra,dec to spherical"""

return np.dstack([np.cos(dec*np.pi/180.)*np.cos(ra*np.pi/180.),
np.cos(dec*np.pi/180.)*np.sin(ra*np.pi/180.),
np.sin(dec*np.pi/180.)])[0]


def convert_spherical_array(array):
"""Convert from ra,dec to spherical from array"""
ra = array[:, 0]
dec = array[:, 1]
return convert_spherical(ra, dec)


def query_disc(nside, ra, dec, max_rad, min_rad=0):
"""
Get the list of healpix indices within max_rad,min_rad given in radians
around ra,dec given in degrees
"""
if np.isscalar(ra):
ra = np.array([ra])
dec = np.array([dec])

pixels = np.unique([hp.query_disc(nside, eq2vec(a, b), max_rad) for (a, b) in zip(ra, dec)])

if min_rad > 0 and len(pixels) > 0:
vec0 = convert_spherical(ra, dec)
min_rad2 = min_rad**2
vecs = convert_spherical_array(toRaDec(nside, pixels))
dsq = np.sum((vecs-vec0)**2, axis=1)
match = dsq > min_rad2
pixels = pixels[match]

return pixels
40 changes: 32 additions & 8 deletions python/lsst/pipe/tasks/drpAssociationPipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
import pandas as pd

import lsst.geom as geom
import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
from lsst.skymap import BaseSkyMap

from .coaddBase import makeSkyInfo
from .simpleAssociation import SimpleAssociationTask

__all__ = ["DrpAssociationPipeTask",
"DrpAssociationPipeConfig",
Expand Down Expand Up @@ -76,7 +78,10 @@ class DrpAssociationPipeConnections(pipeBase.PipelineTaskConnections,
class DrpAssociationPipeConfig(
pipeBase.PipelineTaskConfig,
pipelineConnections=DrpAssociationPipeConnections):
pass
associator = pexConfig.ConfigurableField(
target=SimpleAssociationTask,
doc="Task used to associate DiaSources with DiaObjects.",
)


class DrpAssociationPipeTask(pipeBase.PipelineTask):
Expand All @@ -86,16 +91,31 @@ class DrpAssociationPipeTask(pipeBase.PipelineTask):
ConfigClass = DrpAssociationPipeConfig
_DefaultName = "drpAssociation"

def __init__(self, **kwargs):
super().__init__(**kwargs)
self.makeSubtask('associator')

def runQuantum(self, butlerQC, inputRefs, outputRefs):
inputs = butlerQC.get(inputRefs)

inputs["tractId"] = butlerQC.quantum.dataId["tract"]
inputs["patchId"] = butlerQC.quantum.dataId["patch"]
tractPatchId, skymapBits = butlerQC.quantum.dataId.pack(
"tract_patch",
returnMaxBits=True)
inputs["tractPatchId"] = tractPatchId
inputs["skymapBits"] = skymapBits

outputs = self.run(**inputs)
butlerQC.put(outputs, outputRefs)

def run(self, diaSourceTables, skyMap, tractId, patchId):
def run(self,
diaSourceTables,
skyMap,
tractId,
patchId,
tractPatchId,
skymapBits):
"""Trim DiaSources to the current Patch and run association.

Takes in the set of DiaSource catalogs that covers the current patch,
Expand Down Expand Up @@ -158,16 +178,20 @@ def run(self, diaSourceTables, skyMap, tractId, patchId):
cutCat = cat[isInTractPatch]
diaSourceHistory.append(cutCat)

# self.associator.addCatalog()

diaSourceHistoryCat = pd.concat(diaSourceHistory)
self.log.info("Found %i DiaSources overlaping patch %i, tract %i"
self.log.info("Found %i DiaSources overlapping patch %i, tract %i"
% (len(diaSourceHistoryCat), patchId, tractId))

assocResult = self.associator.run(diaSourceHistoryCat,
tractPatchId,
skymapBits)

self.log.info("Associated DiaSources into %i DiaObjects" %
len(assocResult.diaObjects))

return pipeBase.Struct(
diaObjectTable=pd.DataFrame(columns=["diaObjectId",
"nDiaSources"]),
assocDiaSourceTable=diaSourceHistoryCat)
diaObjectTable=assocResult.diaObjects,
assocDiaSourceTable=assocResult.assocDiaSources)

def _trimToPatch(self, cat, innerPatchBox, wcs):
"""Create generator testing if a set of DiaSources are in the
Expand Down