Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-17029: Update LoadReferenceObjectsTask to output fluxes in nanojansky #154

Merged
merged 7 commits into from
Apr 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
.sconsign.dblite
_build.*
config.log
bin/*.py
doc/*.tag
doc/doxygen.conf
doc/html
Expand Down
3 changes: 3 additions & 0 deletions bin.src/SConscript
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# -*- python -*-
from lsst.sconsUtils import scripts
scripts.BasicSConscript.shebang()
138 changes: 138 additions & 0 deletions bin.src/convert_refcat_to_nJy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
#!/usr/bin/env python
# This file is part of meas_algorithms.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Convert old HTM reference catalogs to use nJy for fluxes, instead of Jy.

This will process all .fits files in the given directory, converting the units
of any `_flux`, `_fluxSigma`, and `_fluxErr` fields from Jy->nJy and making
their units field in the schema `nJy`, then overwriting the files with the
updated values. Also replaces `_fluxSigma` with `_fluxErr` in the schema,
per RFC-333. If all flux fields in the refcat schema have units of `'nJy'`,
the files are not modified.

Many of our old reference catalogs have no units for their fluxes: we assume
(as the algorithmic code did) that these are all in Jy units.

By default, only print the files and fields that will be modified, but do not
write the output.

If you are processing a large number of files (e.g. ps1_pv3), we recommend
capturing stdout to a log file, and using the -n8 option to parallelize it.
"""
import os.path
import glob

import concurrent.futures
import itertools

import lsst.afw.table
from lsst.meas.algorithms import DatasetConfig
from lsst.meas.algorithms.loadReferenceObjects import convertToNanojansky, hasNanojanskyFluxUnits
from lsst.meas.algorithms.ingestIndexReferenceTask import addRefCatMetadata
import lsst.log


def is_old_schema(config, filename):
    """Return True if this file's schema has "old-style" (pre-nJy) fluxes.

    A refcat is old-style when its persisted format version is 0 and its
    flux fields do not already carry nanojansky units.
    """
    schema = lsst.afw.table.SimpleCatalog.readFits(filename).schema
    if config.format_version != 0:
        return False
    return not hasNanojanskyFluxUnits(schema)


def process_one(filename, write=False, quiet=False):
    """Convert one file in-place from Jy (or no units) to nJy fluxes.

    Parameters
    ----------
    filename : `str`
        The file to convert.
    write : `bool`, optional
        Write the converted catalog out, overwriting the read in catalog?
    quiet : `bool`, optional
        Do not print messages about files read/written or fields found?
    """
    log = lsst.log.Log()
    if quiet:
        log.setLevel(lsst.log.WARN)

    # Include the filename so the log identifies which file is in work
    # (important when many files are processed in parallel).
    log.info(f"Reading: {filename}")
    catalog = lsst.afw.table.SimpleCatalog.readFits(filename)

    # With write=False this only reports the fields that would change.
    output = convertToNanojansky(catalog, log, doConvert=write)

    if write:
        addRefCatMetadata(output)
        output.writeFits(filename)
        log.info(f"Wrote: {filename}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I love f-strings. So compact.



def main():
    """Parse command-line arguments and convert the refcats in the given path."""
    import argparse
    import sys

    class CustomFormatter(argparse.ArgumentDefaultsHelpFormatter, argparse.RawDescriptionHelpFormatter):
        pass

    parser = argparse.ArgumentParser(description=__doc__,
                                     formatter_class=CustomFormatter)
    parser.add_argument("path",
                        help="Directory (written by IngestIndexedReferenceTask) containing the"
                        " reference catalogs to overwrite."
                        " All files with a `.fits` extension in the directory will be processed,"
                        " including `master_schema.fits`, which must exist.")
    parser.add_argument('-n', '--nprocesses', default=1, type=int,
                        help="Number of processes to use when reading and writing files.")
    parser.add_argument('--write', action="store_true",
                        help="Write the corrected files (default just prints what would have changed).")
    parser.add_argument('--quiet', action="store_true",
                        help="Be less verbose about what files and fields are being converted.")
    args = parser.parse_args()

    # master_schema.fits is both required and the cheapest file to probe for
    # an old-style schema, so check it before touching the shards.
    schema_file = os.path.join(args.path, "master_schema.fits")
    if not os.path.isfile(schema_file):
        print("Error: Cannot find master_schema.fits in supplied path:", args.path)
        sys.exit(-1)
    configPath = os.path.join(args.path, 'config.py')
    config = DatasetConfig()
    config.load(configPath)
    if not is_old_schema(config, schema_file):
        print("Catalog does not contain old-style fluxes; nothing to convert.")
        sys.exit(0)

    files = glob.glob(os.path.join(args.path, "*.fits"))
    with concurrent.futures.ProcessPoolExecutor(max_workers=args.nprocesses) as executor:
        futures = executor.map(process_one, files, itertools.repeat(args.write), itertools.repeat(args.quiet))
        # we have to at least loop over the futures, otherwise exceptions will be lost
        for future in futures:
            pass

    if args.write:
        config.format_version = 1
        # update the docstring to annotate the file.
        msg = "\nUpdated refcat from version 0->1 to have nJy flux units via convert_refcat_to_nJy.py"
        config._fields['format_version'].doc += msg
        config.save(configPath)
        if not args.quiet:
            print("Added `format_version=1` to config.py")


if __name__ == "__main__":
    main()
50 changes: 45 additions & 5 deletions python/lsst/meas/algorithms/ingestIndexReferenceTask.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,41 @@
import math

import astropy.time
import astropy.units as u
import numpy as np

import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
import lsst.geom
import lsst.afw.table as afwTable
from lsst.afw.image import fluxFromABMag, fluxErrFromABMagErr
from lsst.daf.base import PropertyList
from lsst.afw.image import fluxErrFromABMagErr
from .indexerRegistry import IndexerRegistry
from .readTextCatalogTask import ReadTextCatalogTask
from .loadReferenceObjects import LoadReferenceObjectsTask

_RAD_PER_DEG = math.pi / 180
_RAD_PER_MILLIARCSEC = _RAD_PER_DEG/(3600*1000)

# The most recent Indexed Reference Catalog on-disk format version.
LATEST_FORMAT_VERSION = 1


def addRefCatMetadata(catalog):
    """Add metadata to a new (not yet populated) reference catalog.

    Parameters
    ----------
    catalog : `lsst.afw.table.SimpleCatalog`
        Catalog to which metadata should be attached. Will be modified
        in-place.
    """
    # Preserve any metadata the catalog already carries; only create a new
    # PropertyList when none is present.
    metadata = catalog.getMetadata()
    if metadata is None:
        metadata = PropertyList()
    metadata.set("REFCAT_FORMAT_VERSION", LATEST_FORMAT_VERSION)
    catalog.setMetadata(metadata)
Copy link
Contributor Author

@parejkoj parejkoj Apr 1, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@TallJimbo : would this overwrite the rest of the metadata in the catalog? Should I change it to do this first?

    md = catalog.getMetadata()
    if md is None:
        md = PropertyList()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would indeed be safer. I recall thinking that the initial PropertySet was going to be None in the cases that mattered here, but I could easily believe that was only the case in the first place I used this function, and then I forgot about it in the next place.



class IngestReferenceRunner(pipeBase.TaskRunner):
"""Task runner for the reference catalog ingester
Expand Down Expand Up @@ -73,6 +94,16 @@ def run(self, parsedCmd):


class DatasetConfig(pexConfig.Config):
"""The description of the on-disk storage format for the persisted
reference catalog.
"""
format_version = pexConfig.Field(
dtype=int,
doc="Version number of the persisted on-disk storage format."
"\nVersion 0 had Jy as flux units (default 0 for unversioned catalogs)."
"\nVersion 1 had nJy as flux units.",
default=0 # This needs to always be 0, so that unversioned catalogs are interpreted as version 0.
)
ref_dataset_name = pexConfig.Field(
dtype=str,
default='cal_ref_cat',
Expand Down Expand Up @@ -203,6 +234,10 @@ class IngestIndexedReferenceConfig(pexConfig.Config):
doc='Extra columns to add to the reference catalog.'
)

    def setDefaults(self):
        """Set configuration defaults for ingestion.

        Forces ``dataset_config.format_version`` to the latest on-disk
        format version, overriding the field's default of 0 (which exists
        only so that unversioned catalogs read back as version 0).
        """
        # Newly ingested reference catalogs always have the latest format_version.
        self.dataset_config.format_version = LATEST_FORMAT_VERSION

def validate(self):
pexConfig.Config.validate(self)

Expand Down Expand Up @@ -386,12 +421,15 @@ def _setFlux(self, record, row, key_map):
Map of catalog keys.
"""
for item in self.config.mag_column_list:
record.set(key_map[item+'_flux'], fluxFromABMag(row[item]))
record.set(key_map[item+'_flux'], (row[item]*u.ABmag).to_value(u.nJy))
if len(self.config.mag_err_column_map) > 0:
for err_key in self.config.mag_err_column_map.keys():
error_col_name = self.config.mag_err_column_map[err_key]
record.set(key_map[err_key+'_fluxErr'],
fluxErrFromABMagErr(row[error_col_name], row[err_key]))
# TODO: multiply by 1e9 here until we have a replacement (see DM-16903)
fluxErr = fluxErrFromABMagErr(row[error_col_name], row[err_key])
if fluxErr is not None:
fluxErr *= 1e9
record.set(key_map[err_key+'_fluxErr'], fluxErr)

def _setProperMotion(self, record, row, key_map):
"""Set proper motion fields in a record of an indexed catalog.
Expand Down Expand Up @@ -495,7 +533,9 @@ def getCatalog(self, dataId, schema):
"""
if self.butler.datasetExists('ref_cat', dataId=dataId):
return self.butler.get('ref_cat', dataId=dataId)
return afwTable.SimpleCatalog(schema)
catalog = afwTable.SimpleCatalog(schema)
addRefCatMetadata(catalog)
return catalog

def makeSchema(self, dtype):
"""Make the schema to use in constructing the persisted catalogs.
Expand Down
27 changes: 23 additions & 4 deletions python/lsst/meas/algorithms/loadIndexedReferenceObjects.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

__all__ = ["LoadIndexedReferenceObjectsConfig", "LoadIndexedReferenceObjectsTask"]

from .loadReferenceObjects import hasNanojanskyFluxUnits, convertToNanojansky, getFormatVersionFromRefCat
from lsst.meas.algorithms import getRefFluxField, LoadReferenceObjectsTask, LoadReferenceObjectsConfig
import lsst.afw.table as afwTable
import lsst.geom
Expand Down Expand Up @@ -53,8 +54,8 @@ class LoadIndexedReferenceObjectsTask(LoadReferenceObjectsTask):

def __init__(self, butler, *args, **kwargs):
LoadReferenceObjectsTask.__init__(self, *args, **kwargs)
dataset_config = butler.get("ref_cat_config", name=self.config.ref_dataset_name, immediate=True)
self.indexer = IndexerRegistry[dataset_config.indexer.name](dataset_config.indexer.active)
self.dataset_config = butler.get("ref_cat_config", name=self.config.ref_dataset_name, immediate=True)
self.indexer = IndexerRegistry[self.dataset_config.indexer.name](self.dataset_config.indexer.active)
# This needs to come from the loader config, not the dataset_config since directory aliases can
# change the path where the shards are found.
self.ref_dataset_name = self.config.ref_dataset_name
Expand All @@ -67,8 +68,8 @@ def loadSkyCircle(self, ctrCoord, radius, filterName=None, epoch=None):
refCat = self.butler.get('ref_cat',
dataId=self.indexer.makeDataId('master_schema', self.ref_dataset_name),
immediate=True)
self._addFluxAliases(refCat.schema)
fluxField = getRefFluxField(schema=refCat.schema, filterName=filterName)

# load the catalog, one shard at a time
for shard, isOnBoundary in zip(shards, isOnBoundaryList):
if shard is None:
continue
Expand All @@ -77,13 +78,31 @@ def loadSkyCircle(self, ctrCoord, radius, filterName=None, epoch=None):
else:
refCat.extend(shard)

# apply proper motion corrections
if epoch is not None and "pm_ra" in refCat.schema:
# check for a catalog in a non-standard format
if isinstance(refCat.schema["pm_ra"].asKey(), lsst.afw.table.KeyAngle):
self.applyProperMotions(refCat, epoch)
else:
self.log.warn("Catalog pm_ra field is not an Angle; not applying proper motion")

# update version=0 style refcats to have nJy fluxes
if self.dataset_config.format_version == 0 or not hasNanojanskyFluxUnits(refCat.schema):
self.log.warn("Found version 0 reference catalog with old style units in schema.")
self.log.warn("run `meas_algorithms/bin/convert_refcat_to_nJy.py` to convert fluxes to nJy.")
self.log.warn("See RFC-575 for more details.")
refCat = convertToNanojansky(refCat, self.log)
else:
# For version >= 1, the version should be in the catalog header,
# too, and should be consistent with the version in the config.
catVersion = getFormatVersionFromRefCat(refCat)
if catVersion != self.dataset_config.format_version:
raise RuntimeError(f"Format version in reference catalog ({catVersion}) does not match"
f" format_version field in config ({self.dataset_config.format_version})")

self._addFluxAliases(refCat.schema)
fluxField = getRefFluxField(schema=refCat.schema, filterName=filterName)

# add and initialize centroid and hasCentroid fields (these are
# added after loading to avoid wasting space in the saved catalogs)
# the new fields are automatically initialized to (nan, nan) and
Expand Down