From 153a4abc5fc96788bb1965db7d41761b6b6be1cd Mon Sep 17 00:00:00 2001 From: Clare Saunders Date: Wed, 3 Dec 2025 14:02:51 -0800 Subject: [PATCH] Turn on correcting for proper motion/parallax --- .../tasks/associatedSourcesTractAnalysis.py | 134 ++++++------------ .../tools/tasks/sourceObjectTableAnalysis.py | 97 ++++++++----- 2 files changed, 108 insertions(+), 123 deletions(-) diff --git a/python/lsst/analysis/tools/tasks/associatedSourcesTractAnalysis.py b/python/lsst/analysis/tools/tasks/associatedSourcesTractAnalysis.py index 8452c4085..7abce9583 100644 --- a/python/lsst/analysis/tools/tasks/associatedSourcesTractAnalysis.py +++ b/python/lsst/analysis/tools/tasks/associatedSourcesTractAnalysis.py @@ -26,15 +26,13 @@ import astropy.units as u import lsst.pex.config as pexConfig import numpy as np -import pandas as pd -from astropy.table import Table, join, vstack +from astropy.table import join, vstack from lsst.daf.butler import DatasetProvenance from lsst.drp.tasks.gbdesAstrometricFit import calculate_apparent_motion from lsst.geom import Box2D from lsst.pipe.base import NoWorkFound from lsst.pipe.base import connectionTypes as ct from lsst.skymap import BaseSkyMap -from smatch import Matcher from ..interfaces import AnalysisBaseConfig, AnalysisBaseConnections, AnalysisPipelineTask @@ -87,14 +85,12 @@ class AssociatedSourcesTractAnalysisConnections( dimensions=("instrument",), isCalibration=True, ) - - astrometricCorrectionCatalogs = ct.Input( - doc="Catalog containing proper motion and parallax.", - name="gbdesAstrometricFit_starCatalog", - storageClass="ArrowNumpyDict", - dimensions=("instrument", "skymap", "tract", "physical_filter"), - multiple=True, + astrometricCorrectionCatalog = ct.Input( + doc="Catalog with proper motion and parallax information.", + name="isolated_star_stellar_motions", + storageClass="ArrowAstropy", deferLoad=True, + dimensions=("instrument", "skymap", "tract"), ) visitTable = ct.Input( @@ -108,7 +104,7 @@ def __init__(self, *, config=None): super().__init__(config=config) if not config.applyAstrometricCorrections: - self.inputs.remove("astrometricCorrectionCatalogs") + self.inputs.remove("astrometricCorrectionCatalog") self.inputs.remove("visitTable") @@ -120,23 +116,16 @@ class AssociatedSourcesTractAnalysisConfig( default=True, doc="Apply proper motion and parallax corrections to source positions.", ) - matchingRadius = pexConfig.Field( - dtype=float, - default=0.2, - doc=( - "Radius in mas with which to match the mean positions of the sources with the positions in the" - " astrometricCorrectionCatalogs." - ), - ) astrometricCorrectionParameters = pexConfig.DictField( keytype=str, itemtype=str, default={ "ra": "ra", "dec": "dec", - "pmRA": "pm_ra", - "pmDec": "pm_dec", + "pmRA": "raPM", + "pmDec": "decPM", "parallax": "parallax", + "isolated_star_id": "isolated_star_id", }, doc="Column names for position and motion parameters in the astrometric correction catalogs.", ) @@ -162,7 +151,7 @@ def callback(self, inputs, dataId): inputs["sourceCatalogs"], inputs["associatedSources"], inputs["associatedSourceIds"], - inputs["astrometricCorrectionCatalogs"], + inputs["astrometricCorrectionCatalog"], inputs["visitTable"], ) @@ -173,7 +162,7 @@ def prepareAssociatedSources( sourceCatalogs, associatedSources, associatedSourceIds, - astrometricCorrectionCatalogs=None, + astrometricCorrectionCatalog=None, visitTable=None, ): """Concatenate source catalogs and join on associated source IDs.""" @@ -194,8 +183,8 @@ def prepareAssociatedSources( sourceCatalogStack = vstack(sourceCatalogs, join_type="exact") dataJoined = join(sourceCatalogStack, associatedSources, keys="sourceId", join_type="inner") - if astrometricCorrectionCatalogs is not None: - self.applyAstrometricCorrections(dataJoined, astrometricCorrectionCatalogs, visitTable) + if astrometricCorrectionCatalog is not None: + self.applyAstrometricCorrections(dataJoined, astrometricCorrectionCatalog, visitTable) # Determine which sources are contained in tract ra = np.radians(dataJoined["coord_ra"]) @@ -211,7 +200,7 @@ def prepareAssociatedSources( return dataFiltered - def applyAstrometricCorrections(self, dataJoined, astrometricCorrectionCatalogs, visitTable): + def applyAstrometricCorrections(self, dataJoined, astrometricCorrectionCatalog, visitTable): """Use proper motion/parallax catalogs to shift positions to median epoch of the visits. @@ -219,8 +208,8 @@ def applyAstrometricCorrections(self, dataJoined, astrometricCorrectionCatalogs, ---------- dataJoined : `astropy.table.Table` Table containing source positions, which will be modified in place. - astrometricCorrectionCatalogs: `dict` [`pd.DataFrame`] - Dictionary keyed by band with proper motion and parallax catalogs. + astrometricCorrectionCatalog : `astropy.table.Table` + Proper motion and parallax catalog. visitTable : `pd.DataFrame` Table containing the MJDs of the visits. """ @@ -229,63 +218,33 @@ def applyAstrometricCorrections(self, dataJoined, astrometricCorrectionCatalogs, # the table was written originally as a DataFrame or something else # Parquet-friendly. visitTable.set_index("visitId", inplace=True) - for band in np.unique(dataJoined["band"]): - bandInd = dataJoined["band"] == band - bandSources = dataJoined[bandInd] - # Add key for sorting below. - bandSources["__index__"] = np.arange(len(bandSources)) - bandSourcesDf = bandSources.to_pandas() - meanRAs = bandSourcesDf.groupby("obj_index")["coord_ra"].aggregate("mean") - meanDecs = bandSourcesDf.groupby("obj_index")["coord_dec"].aggregate("mean") - - bandPMs = astrometricCorrectionCatalogs[band] - with Matcher(meanRAs, meanDecs) as m: - idx, i1, i2, d = m.query_radius( - bandPMs[self.config.astrometricCorrectionParameters["ra"]], - bandPMs[self.config.astrometricCorrectionParameters["dec"]], - (self.config.matchingRadius * u.mas).to(u.degree), - return_indices=True, - ) - - catRAs = np.zeros_like(meanRAs) - catDecs = np.zeros_like(meanRAs) - pmRAs = np.zeros_like(meanRAs) - pmDecs = np.zeros_like(meanRAs) - parallaxes = np.zeros(len(meanRAs)) - catRAs[i1] = bandPMs[self.config.astrometricCorrectionParameters["ra"]][i2] - catDecs[i1] = bandPMs[self.config.astrometricCorrectionParameters["dec"]][i2] - pmRAs[i1] = bandPMs[self.config.astrometricCorrectionParameters["pmRA"]][i2] - pmDecs[i1] = bandPMs[self.config.astrometricCorrectionParameters["pmDec"]][i2] - parallaxes[i1] = bandPMs[self.config.astrometricCorrectionParameters["parallax"]][i2] - - pmDf = Table( - { - "ra": catRAs * u.degree, - "dec": catDecs * u.degree, - "pmRA": pmRAs * u.mas / u.yr, - "pmDec": pmDecs * u.mas / u.yr, - "parallax": parallaxes * u.mas, - "obj_index": meanRAs.index, - } - ) - dataWithPM = join(bandSources, pmDf, keys="obj_index", join_type="left") + # Get the stellar motion catalog into the right format: + for key, value in self.config.astrometricCorrectionParameters.items(): + astrometricCorrectionCatalog.rename_column(value, key) + astrometricCorrectionCatalog["ra"] *= u.degree + astrometricCorrectionCatalog["dec"] *= u.degree + astrometricCorrectionCatalog["pmRA"] *= u.mas / u.yr + astrometricCorrectionCatalog["pmDec"] *= u.mas / u.yr + astrometricCorrectionCatalog["parallax"] *= u.mas + + dataWithPM = join( + dataJoined, + astrometricCorrectionCatalog, + keys="isolated_star_id", + join_type="left", + keep_order=True, + ) - visits = bandSourcesDf["visit"].unique() - mjds = [visitTable.loc[visit]["expMidptMJD"] for visit in visits] - mjdTable = Table( - [astropy.time.Time(mjds, format="mjd", scale="tai"), visits], names=["MJD", "visit"] - ) - dataWithMJD = join(dataWithPM, mjdTable, keys="visit", join_type="left") - # After astropy 7.0, it should be possible to use "keep_order=True" - # in the join and avoid sorting. - dataWithMJD.sort("__index__") - medianMJD = astropy.time.Time(np.median(mjds), format="mjd", scale="tai") + mjds = visitTable.loc[dataWithPM["visit"]]["expMidptMJD"] + times = astropy.time.Time(mjds, format="mjd", scale="tai") + dataWithPM["MJD"] = times + medianMJD = astropy.time.Time(np.median(mjds), format="mjd", scale="tai") - raCorrection, decCorrection = calculate_apparent_motion(dataWithMJD, medianMJD) + raCorrection, decCorrection = calculate_apparent_motion(dataWithPM, medianMJD) - dataJoined["coord_ra"][bandInd] = dataWithMJD["coord_ra"] - raCorrection.value - dataJoined["coord_dec"][bandInd] = dataWithMJD["coord_dec"] - decCorrection.value + dataJoined["coord_ra"] = dataWithPM["coord_ra"] - raCorrection.value + dataJoined["coord_dec"] = dataWithPM["coord_dec"] - decCorrection.value def runQuantum(self, butlerQC, inputRefs, outputRefs): inputs = butlerQC.get(inputRefs) @@ -303,15 +262,12 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs): inputs["sourceCatalogs"] = sourceCatalogs if self.config.applyAstrometricCorrections: - astrometricCorrections = {} - for pmCatRef in inputs["astrometricCorrectionCatalogs"]: - pmCat = pmCatRef.get( - parameters={"columns": self.config.astrometricCorrectionParameters.values()} - ) - astrometricCorrections[pmCatRef.dataId["band"]] = pd.DataFrame(pmCat) - inputs["astrometricCorrectionCatalogs"] = astrometricCorrections + astrometricCorrections = inputs["astrometricCorrectionCatalog"].get( + parameters={"columns": self.config.astrometricCorrectionParameters.values()} + ) + inputs["astrometricCorrectionCatalog"] = astrometricCorrections else: - inputs["astrometricCorrectionCatalogs"] = None + inputs["astrometricCorrectionCatalog"] = None inputs["visitTable"] = None dataId = butlerQC.quantum.dataId diff --git a/python/lsst/analysis/tools/tasks/sourceObjectTableAnalysis.py b/python/lsst/analysis/tools/tasks/sourceObjectTableAnalysis.py index 2fad4edf1..c6c622b40 100644 --- a/python/lsst/analysis/tools/tasks/sourceObjectTableAnalysis.py +++ b/python/lsst/analysis/tools/tasks/sourceObjectTableAnalysis.py @@ -33,7 +33,7 @@ import lsst.pipe.base as pipeBase import numpy as np import pandas as pd -from astropy.table import Table, vstack +from astropy.table import Table, join, vstack from lsst.drp.tasks.gbdesAstrometricFit import calculate_apparent_motion from lsst.pipe.base import AlgorithmError from lsst.pipe.base import connectionTypes as ct @@ -165,6 +165,7 @@ class SourceObjectTableAnalysisConnections( "inputName": "sourceTable_visit", "inputCoaddName": "deep", "associatedSourcesInputName": "isolated_star_presources", + "associatedSourceIdsInputName": "isolated_star_presource_associations", "outputName": "sourceObjectTable", }, ): @@ -186,6 +187,16 @@ class SourceObjectTableAnalysisConnections( deferGraphConstraint=True, ) + associatedSourceIds = ct.Input( + doc="Table containing unique ids for the associated sources", + name="{associatedSourceIdsInputName}", + storageClass="ArrowAstropy", + deferLoad=True, + multiple=True, + dimensions=("instrument", "skymap", "tract"), + deferGraphConstraint=True, + ) + refCat = ct.Input( doc="Catalog of positions to use as reference.", name="objectTable", @@ -197,9 +208,9 @@ class SourceObjectTableAnalysisConnections( ) astrometricCorrectionCatalog = ct.Input( doc="Catalog containing proper motions and parallaxes.", - name="gbdesAstrometricFit_starCatalog", - storageClass="ArrowNumpyDict", - dimensions=("instrument", "skymap", "tract", "physical_filter"), + name="isolated_star_stellar_motions", + storageClass="ArrowAstropy", + dimensions=("instrument", "skymap", "tract"), multiple=True, deferLoad=True, ) @@ -289,9 +300,10 @@ class SourceObjectTableAnalysisConfig( default={ "ra": "ra", "dec": "dec", - "pmRA": "pm_ra", - "pmDec": "pm_dec", + "pmRA": "raPM", + "pmDec": "decPM", "parallax": "parallax", + "isolated_star_id": "isolated_star_id", }, doc="Column names for position and motion parameters in the astrometric correction catalogs.", ) @@ -313,6 +325,7 @@ def callback(self, inputs, dataId): dataId["visit"], inputs["data"], inputs["associatedSources"], + inputs["associatedSourceIds"], inputs["refCat"], inputs["visitTable"], inputs["astrometricCorrectionCatalog"], @@ -329,7 +342,7 @@ def applyAstrometricCorrections( isolatedSources : `astropy.table.Table` Catalog of sources which will be modified in place with the astrometric corrections. - astrometricCorrectionCatalog : `pd.DataFrame` + astrometricCorrectionCatalog : `astropy.table.Table` Catalog with proper motion and parallax information. visitTable : `pd.DataFrame` Catalog containing the epoch for the visit corresponding to the @@ -351,35 +364,41 @@ def applyAstrometricCorrections( # edge. targetEpochs[~np.isfinite(targetEpochs)] = sourceMjd - with Matcher(isolatedSources["coord_ra"], isolatedSources["coord_dec"]) as m: - idx, i1, i2, d = m.query_radius( - astrometricCorrectionCatalog[self.config.astrometricCorrectionParameters["ra"]].to_numpy(), - astrometricCorrectionCatalog[self.config.astrometricCorrectionParameters["dec"]].to_numpy(), - self.config.correctionsMatchingRadius / 3600, - return_indices=True, - ) - - params = self.config.astrometricCorrectionParameters - pmDf = Table( - { - "ra": astrometricCorrectionCatalog[params["ra"]].to_numpy() * u.degree, - "dec": astrometricCorrectionCatalog[params["dec"]].to_numpy() * u.degree, - "pmRA": astrometricCorrectionCatalog[params["pmRA"]].to_numpy() * u.mas / u.yr, - "pmDec": astrometricCorrectionCatalog[params["pmDec"]].to_numpy() * u.mas / u.yr, - "parallax": astrometricCorrectionCatalog[params["parallax"]].to_numpy() * u.mas, - } + # Get the stellar motion catalog into the right format: + for key, value in self.config.astrometricCorrectionParameters.items(): + astrometricCorrectionCatalog.rename_column(value, key) + astrometricCorrectionCatalog["ra"] *= u.degree + astrometricCorrectionCatalog["dec"] *= u.degree + astrometricCorrectionCatalog["pmRA"] *= u.mas / u.yr + astrometricCorrectionCatalog["pmDec"] *= u.mas / u.yr + astrometricCorrectionCatalog["parallax"] *= u.mas + + joinedData = join( + isolatedSources[["isolated_star_id"]], + astrometricCorrectionCatalog, + keys="isolated_star_id", + join_type="left", + keep_order=True, + metadata_conflicts="silent", ) + joinedData["MJD"] = astropy.time.Time(sourceMjd, format="mjd", scale="tai") - pmDf["MJD"] = astropy.time.Time(sourceMjd, format="mjd", scale="tai") raCorrection, decCorrection = calculate_apparent_motion( - pmDf[i2], astropy.time.Time(targetEpochs[i1], format="mjd", scale="tai") + joinedData, astropy.time.Time(targetEpochs, format="mjd", scale="tai") ) - isolatedSources["coord_ra"][i1] -= raCorrection.value - isolatedSources["coord_dec"][i1] -= decCorrection.value + isolatedSources["coord_ra"] -= raCorrection.value + isolatedSources["coord_dec"] -= decCorrection.value def prepareAssociatedSources( - self, visit, data, associatedSourceRefs, refCats, visitTable, astrometricCorrectionCatalog + self, + visit, + data, + associatedSourceRefs, + associatedSourceIdRefs, + refCats, + visitTable, + astrometricCorrectionCatalog, ): """Match isolated sources with reference objects and shift the sources to the object epochs if `self.config.applyAstrometricCorrections` is @@ -399,17 +418,27 @@ def prepareAssociatedSources( visitTable : `pd.DataFrame` Catalog containing the epoch for the visit corresponding to the isolatedSources. - astrometricCorrectionCatalog : `pd.DataFrame` + astrometricCorrectionCatalog : `astropy.table.Table` Catalog with proper motion and parallax information. """ isolatedSources = [] + associatedSourceIds = { + ref.dataId["tract"]: ref.get(parameters={"columns": ["isolated_star_id"]}) + for ref in associatedSourceIdRefs + } for associatedSourceRef in associatedSourceRefs: + tract = associatedSourceRef.dataId["tract"] associatedSources = associatedSourceRef.get( - parameters={"columns": ["visit", "source_row", "sourceId"]} + parameters={"columns": ["visit", "sourceId", "obj_index"]} ) + index = associatedSources["obj_index"] + associatedSources["isolated_star_id"] = associatedSourceIds[tract]["isolated_star_id"][index] + visit_sources = associatedSources[associatedSources["visit"] == visit] try: - isolatedSources.append(data.loc[visit_sources["sourceId"]]) + visitData = data.loc[visit_sources["sourceId"]] + visitData["isolated_star_id"] = visit_sources["isolated_star_id"] + isolatedSources.append(visitData) except KeyError: raise IndexMismatchError() isolatedSources = vstack(isolatedSources) @@ -496,8 +525,8 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs): pmCat = astrometricCorrectionCatalogRef.get( parameters={"columns": self.config.astrometricCorrectionParameters.values()} ) - pmCats.append(pd.DataFrame(pmCat)) - inputs["astrometricCorrectionCatalog"] = pd.concat(pmCats) + pmCats.append(pmCat) + inputs["astrometricCorrectionCatalog"] = vstack(pmCats, metadata_conflicts="silent") else: inputs["astrometricCorrectionCatalog"] = None inputs["visitTable"] = None