
Commit

Modernize docs in ingest task IngestIndexedReferenceTask.
I also took the liberty of changing the data returned from its `run`
method from a `Struct` containing 'result=None' to an empty `Struct`,
since `run` did not, and still does not, return anything useful.
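
For illustration, a minimal sketch of the old and new return conventions (not the task's actual code; ``doReturnResults`` is the standard command-line-task flag):

    import lsst.pipe.base as pipeBase

    # Old convention: a placeholder field that carried no information.
    old_result = pipeBase.Struct(result=None)

    # New convention: an empty Struct, so callers that request results
    # still get a Struct, just without a meaningless payload.
    new_result = pipeBase.Struct()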
r-owen committed Sep 5, 2018
1 parent c02e7f0 commit dac5aba
Showing 1 changed file with 123 additions and 64 deletions.
python/lsst/meas/algorithms/ingestIndexReferenceTask.py
@@ -42,28 +42,34 @@


class IngestReferenceRunner(pipeBase.TaskRunner):
"""!Task runner for the reference catalog ingester
"""Task runner for the reference catalog ingester
Data IDs are ignored so the runner should just run the task on the parsed command.
"""

def run(self, parsedCmd):
"""!Run the task.
"""Run the task.
Several arguments need to be collected to send on to the task methods.
@param[in] parsedCmd Parsed command including command line arguments.
@returns Struct containing the result of the indexing.
Parameters
----------
parsedCmd : `argparse.Namespace`
Parsed command.
Returns
-------
results : `lsst.pipe.base.Struct` or `None`
An empty struct if ``self.doReturnResults`` is set, else `None`.
"""
files = parsedCmd.files
butler = parsedCmd.butler
task = self.TaskClass(config=self.config, log=self.log, butler=butler)
task.writeConfig(parsedCmd.butler, clobber=self.clobberConfig, doBackup=self.doBackup)

result = task.createIndexedCatalog(files)
task.createIndexedCatalog(files)
if self.doReturnResults:
return pipeBase.Struct(
result=result,
)
return pipeBase.Struct()


class DatasetConfig(pexConfig.Config):
@@ -230,12 +236,12 @@ def assertAllOrNone(*names):


class IngestIndexedReferenceTask(pipeBase.CmdLineTask):
"""!Class for both producing indexed reference catalogs and for loading them.
"""Class for producing and loading indexed reference catalogs.
This implements an indexing scheme based on hierarchical triangular mesh (HTM).
The term index really means breaking the catalog into localized chunks called
shards. In this case each shard contains the entries from the catalog in a single
HTM trixel
This implements an indexing scheme based on hierarchical triangular
mesh (HTM). The term index really means breaking the catalog into
localized chunks called shards. In this case each shard contains
the entries from the catalog in a single HTM trixel.
For producing catalogs this task makes the following assumptions
about the input catalogs:
@@ -245,6 +251,11 @@ class IngestIndexedReferenceTask(pipeBase.CmdLineTask):
between RA and Dec, or between PM RA and PM Dec. Gaia is a well
known example of a catalog that has such terms, and thus should not
be ingested with this task.
Parameters
----------
butler : `lsst.daf.persistence.Butler`
Data butler for reading and writing catalogs.
"""
canMultiprocess = False
ConfigClass = IngestIndexedReferenceConfig
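
To make the sharding idea from the class docstring concrete, here is a minimal sketch of grouping catalog rows by HTM trixel ID. The ``htm_id`` callable is a hypothetical stand-in for the task's configured indexer, and the column names are illustrative:

    from collections import defaultdict

    def shard_catalog(rows, htm_id, level=8):
        # Group rows by the trixel that contains them;
        # htm_id(ra_deg, dec_deg, level) is an assumed helper.
        shards = defaultdict(list)
        for row in rows:
            shards[htm_id(row["ra"], row["dec"], level)].append(row)
        return shards  # one shard per occupied trixel: the "index"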
@@ -255,30 +266,30 @@ class IngestIndexedReferenceTask(pipeBase.CmdLineTask):

@classmethod
def _makeArgumentParser(cls):
"""Create an argument parser
"""Create an argument parser.
This overrides the original because we need the file arguments
This returns a standard parser with an extra "files" argument.
"""
parser = pipeBase.InputOnlyArgumentParser(name=cls._DefaultName)
parser.add_argument("files", nargs="+", help="Names of files to index")
return parser

def __init__(self, *args, **kwargs):
"""!Constructor for the HTM indexing engine
@param[in] butler dafPersistence.Butler object for reading and writing catalogs
"""
self.butler = kwargs.pop('butler')
pipeBase.Task.__init__(self, *args, **kwargs)
self.indexer = IndexerRegistry[self.config.dataset_config.indexer.name](
self.config.dataset_config.indexer.active)
self.makeSubtask('file_reader')

def createIndexedCatalog(self, files):
"""!Index a set of files comprising a reference catalog. Outputs are persisted in the
data repository.
"""Index a set of files comprising a reference catalog.
Outputs are persisted in the data repository.
@param[in] files A list of file names to read.
Parameters
----------
files : `list`
A list of file paths to read.
"""
rec_num = 0
first = True
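
Given the extra ``files`` argument added by ``_makeArgumentParser`` above, the task would be driven roughly as follows (a sketch; the repository path and file names are placeholders):

    from lsst.meas.algorithms import IngestIndexedReferenceTask

    # parseAndRun is the standard CmdLineTask entry point; the positional
    # repository path is followed by the catalog files to index.
    IngestIndexedReferenceTask.parseAndRun(
        args=["/path/to/repo", "cat_part1.csv", "cat_part2.csv"])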
@@ -307,26 +318,36 @@ def createIndexedCatalog(self, files):

@staticmethod
def computeCoord(row, ra_name, dec_name):
"""!Create an ICRS SpherePoint from a np.array row
@param[in] row dict like object with ra/dec info in degrees
@param[in] ra_name name of RA key
@param[in] dec_name name of Dec key
@returns ICRS SpherePoint constructed from the RA/Dec values
"""Create an ICRS coord. from a row of a catalog being ingested.
Parameters
----------
row : structured `numpy.array`
Row from catalog being ingested.
ra_name : `str`
Name of RA key in catalog being ingested.
dec_name : `str`
Name of Dec key in catalog being ingested.
Returns
-------
coord : `lsst.geom.SpherePoint`
ICRS coordinate.
"""
return lsst.geom.SpherePoint(row[ra_name], row[dec_name], lsst.geom.degrees)

def _setCoordErr(self, record, row, key_map):
"""Set coordinate error from the input
"""Set coordinate error in a record of an indexed catalog.
The errors are read from the specified columns, and installed
in the appropriate columns of the output.
Parameters
----------
record : `lsst.afw.table.SimpleRecord`
Record to modify.
row : `dict`-like
Row from numpy table.
Row from indexed catalog to modify.
row : structured `numpy.array`
Row from catalog being ingested.
key_map : `dict` mapping `str` to `lsst.afw.table.Key`
Map of catalog keys.
"""
@@ -335,10 +356,16 @@ def _setCoordErr(self, record, row, key_map):
record.set(key_map["coord_decErr"], row[self.config.dec_err_name]*_RAD_PER_DEG)

def _setFlags(self, record, row, key_map):
"""!Set the flags for a record. Relies on the _flags class attribute
@param[in,out] record SimpleCatalog record to modify
@param[in] row dict like object containing flag info
@param[in] key_map Map of catalog keys to use in filling the record
"""Set flags in an output record
Parameters
----------
record : `lsst.afw.table.SimpleRecord`
Row from indexed catalog to modify.
row : structured `numpy.array`
Row from catalog being ingested.
key_map : `dict` mapping `str` to `lsst.afw.table.Key`
Map of catalog keys.
"""
names = record.schema.getNames()
for flag in self._flags:
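
The loop in ``_setFlags`` maps each entry of ``_flags`` to a config field named ``is_<flag>_name``, which in turn names the input column. A standalone sketch of that indirection (the flag list and column names are assumptions for illustration):

    flags = ("photometric", "resolved", "variable")
    config_columns = {
        "is_photometric_name": "phot_ok",
        "is_resolved_name": "resolved_flag",
        "is_variable_name": "var_flag",
    }

    for flag in flags:
        column = config_columns["is_{}_name".format(flag)]
        # then, as in _setFlags:
        #     record.set(key_map[flag], bool(row[column]))
        print(flag, "<-", column)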
@@ -347,10 +374,16 @@ def _setFlags(self, record, row, key_map):
record.set(key_map[flag], bool(row[getattr(self.config, attr_name)]))

def _setMags(self, record, row, key_map):
"""!Set the flux records from the input magnitudes
@param[in,out] record SimpleCatalog record to modify
@param[in] row dict like object containing magnitude values
@param[in] key_map Map of catalog keys to use in filling the record
"""Set flux fields in a record of an indexed catalog.
Parameters
----------
record : `lsst.afw.table.SimpleRecord`
Row from indexed catalog to modify.
row : structured `numpy.array`
Row from catalog being ingested.
key_map : `dict` mapping `str` to `lsst.afw.table.Key`
Map of catalog keys.
"""
for item in self.config.mag_column_list:
record.set(key_map[item+'_flux'], fluxFromABMag(row[item]))
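
``fluxFromABMag``, used just above, converts AB magnitudes to fluxes. A rough numerical check, assuming the AB zero point of 3631 Jy (the units actually returned depend on the afw version in use):

    def flux_jy_from_ab_mag(mag):
        # m_AB = -2.5 * log10(f / 3631 Jy)  =>  f = 3631 * 10**(-0.4 * m)
        return 3631.0 * 10 ** (-0.4 * mag)

    print(flux_jy_from_ab_mag(20.0))  # ~3.63e-05 Jy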
@@ -361,7 +394,7 @@ def _setMags(self, record, row, key_map):
fluxErrFromABMagErr(row[error_col_name], row[err_key]))

def _setProperMotion(self, record, row, key_map):
"""Set the proper motions from the input
"""Set proper motion fields in a record of an indexed catalog.
The proper motions are read from the specified columns,
scaled appropriately, and installed in the appropriate
@@ -370,9 +403,9 @@ def _setProperMotion(self, record, row, key_map):
Parameters
----------
record : `lsst.afw.table.SimpleRecord`
Record to modify.
row : `dict`-like
Row from numpy table.
Row from indexed catalog to modify.
row : structured `numpy.array`
Row from catalog being ingested.
key_map : `dict` mapping `str` to `lsst.afw.table.Key`
Map of catalog keys.
"""
@@ -387,16 +420,22 @@ def _setProperMotion(self, record, row, key_map):
record.set(key_map["pm_decErr"], row[self.config.pm_dec_err_name]*radPerOriginal)

def _epochToMjdTai(self, nativeEpoch):
"""Convert an epoch in native format to TAI MJD (a float)
"""Convert an epoch in native format to TAI MJD (a float).
"""
return astropy.time.Time(nativeEpoch, format=self.config.epoch_format,
scale=self.config.epoch_scale).tai.mjd

def _setExtra(self, record, row, key_map):
"""!Copy the extra column information to the record
@param[in,out] record SimpleCatalog record to modify
@param[in] row dict like object containing the column values
@param[in] key_map Map of catalog keys to use in filling the record
"""Set extra data fields in a record of an indexed catalog.
Parameters
----------
record : `lsst.afw.table.SimpleRecord`
Row from indexed catalog to modify.
row : structured `numpy.array`
Row from catalog being ingested.
key_map : `dict` mapping `str` to `lsst.afw.table.Key`
Map of catalog keys.
"""
for extra_col in self.config.extra_col_names:
value = row[extra_col]
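
The ``_epochToMjdTai`` helper above delegates to astropy. For a Gaia-like epoch of 2015.5 Julian years on the TCB scale (illustrative values):

    import astropy.time

    mjd_tai = astropy.time.Time(2015.5, format="jyear", scale="tcb").tai.mjd
    print(round(mjd_tai, 2))  # approximately 57205.87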
@@ -412,12 +451,18 @@ def _setExtra(self, record, row, key_map):
record.set(key_map[extra_col], value)

def _fillRecord(self, record, row, rec_num, key_map):
"""!Fill a record to put in the persisted indexed catalogs
"""Fill a record in an indexed catalog to be persisted.
@param[in,out] record afwTable.SimpleRecord in a reference catalog to fill.
@param[in] row A row from a numpy array constructed from the input catalogs.
@param[in] rec_num Starting integer to increment for the unique id
@param[in] key_map Map of catalog keys to use in filling the record
Parameters
----------
record : `lsst.afw.table.SimpleRecord`
Row from indexed catalog to modify.
row : structured `numpy.array`
Row from catalog being ingested.
rec_num : `int`
Starting integer to increment for the unique id.
key_map : `dict` mapping `str` to `lsst.afw.table.Key`
Map of catalog keys.
"""
record.setCoord(self.computeCoord(row, self.config.ra_name, self.config.dec_name))
if self.config.id_name:
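
As the ``rec_num`` parameter suggests, ``_fillRecord`` returns the updated counter so that generated ids stay unique across all input files. A hedged sketch of the calling pattern (names are illustrative; ``addNew`` is the standard ``SimpleCatalog`` method):

    def ingest_rows(task, catalog, rows, key_map):
        # catalog: lsst.afw.table.SimpleCatalog; task: the ingest task
        rec_num = 0
        for row in rows:
            record = catalog.addNew()
            rec_num = task._fillRecord(record, row, rec_num, key_map)
        return rec_num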
@@ -434,25 +479,39 @@ def _fillRecord(self, record, row, rec_num, key_map):
return rec_num

def getCatalog(self, dataId, schema):
"""!Get a catalog from the butler or create it if it doesn't exist
"""Get a catalog from the butler or create it if it doesn't exist.
@param[in] dataId Identifier for catalog to retrieve
@param[in] schema Schema to use in catalog creation if the butler can't get it
@returns table (an lsst.afw.table.SimpleCatalog) for the specified identifier
Parameters
----------
dataId : `dict`
Identifier for catalog to retrieve.
schema : `lsst.afw.table.Schema`
Schema to use in catalog creation if the butler can't get it.
Returns
-------
catalog : `lsst.afw.table.SimpleCatalog`
The catalog specified by `dataId`.
"""
if self.butler.datasetExists('ref_cat', dataId=dataId):
return self.butler.get('ref_cat', dataId=dataId)
return afwTable.SimpleCatalog(schema)

def makeSchema(self, dtype):
"""!Make the schema to use in constructing the persisted catalogs.
@param[in] dtype A np.dtype describing the type of each entry in
config.extra_col_names.
"""Make the schema to use in constructing the persisted catalogs.
@returns a pair of items:
- The schema for the output source catalog.
- A map of catalog keys to use in filling the record
Parameters
----------
dtype : `numpy.dtype`
Data type describing each entry in ``config.extra_col_names``
for the catalogs being ingested.
Returns
-------
schemaAndKeyMap : `tuple` of (`lsst.afw.table.Schema`, `dict`)
A tuple containing two items:
- The schema for the output source catalog.
- A map of catalog keys to use in filling the record.
"""
self.config.validate() # just to be sure
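
The get-or-create idiom in ``getCatalog`` is handy beyond this class; a minimal standalone sketch of the same pattern (the dataset name matches the code above, the function name is illustrative):

    import lsst.afw.table as afwTable

    def get_or_create_catalog(butler, dataId, schema):
        # Fetch an existing shard from the butler, or start an empty one.
        if butler.datasetExists("ref_cat", dataId=dataId):
            return butler.get("ref_cat", dataId=dataId)
        return afwTable.SimpleCatalog(schema)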

