Handful of flake8 and doc updates
laurenam committed May 25, 2022
1 parent 6934c25 commit 47ad3e3
Showing 1 changed file with 88 additions and 74 deletions.
162 changes: 88 additions & 74 deletions python/lsst/pipe/tasks/postprocess.py
@@ -48,7 +48,7 @@


def flattenFilters(df, noDupCols=['coord_ra', 'coord_dec'], camelCase=False, inputBands=None):
"""Flattens a dataframe with multilevel column index
"""Flattens a dataframe with multilevel column index.
"""
newDf = pd.DataFrame()
# band is the level 0 index
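
For readers new to the multilevel layout, here is a minimal pandas sketch of the flattening idea; the column names are illustrative, not the actual schema, and this is not the task's implementation:

import pandas as pd

# Build a toy dataframe with band as the level-0 column index, then fold
# the band labels into flat column names like "g_psfFlux".
columns = pd.MultiIndex.from_product([["g", "r"], ["psfFlux", "psfFluxErr"]])
df = pd.DataFrame([[1.0, 0.1, 2.0, 0.2]], columns=columns)
flat = df.copy()
flat.columns = [f"{band}_{col}" for band, col in df.columns]
print(list(flat.columns))  # ['g_psfFlux', 'g_psfFluxErr', 'r_psfFlux', 'r_psfFluxErr']
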
@@ -130,15 +130,16 @@ class WriteObjectTableTask(CmdLineTask, pipeBase.PipelineTask):
outputDataset = 'obj'

def __init__(self, butler=None, schema=None, **kwargs):
# It is a shame that this class can't use the default init for CmdLineTask
# But to do so would require its own special task runner, which is many
# more lines of specialization, so this is how it is for now
# It is a shame that this class can't use the default init for
# CmdLineTask, but to do so would require its own special task
# runner, which is many more lines of specialization, so this is
# how it is for now.
super().__init__(**kwargs)

def runDataRef(self, patchRefList):
"""!
@brief Merge coadd sources from multiple bands. Calls @ref `run` which must be defined in
subclasses that inherit from MergeSourcesTask.
@brief Merge coadd sources from multiple bands. Calls @ref `run` which
must be defined in subclasses that inherit from MergeSourcesTask.
@param[in] patchRefList list of data references for each filter
"""
catalogs = dict(self.readCatalog(patchRef) for patchRef in patchRefList)
Expand Down Expand Up @@ -186,12 +187,12 @@ def readCatalog(self, patchRef):
Parameters
----------
patchRef : `lsst.daf.persistence.ButlerDataRef`
Data reference for patch
Data reference for patch.
Returns
-------
Tuple consisting of band name and a dict of catalogs, keyed by
dataset name
dataset name.
"""
band = patchRef.get(self.config.coaddName + "Coadd_filterLabel", immediate=True).bandLabel
catalogDict = {}
@@ -210,14 +211,14 @@ def run(self, catalogs, tract, patch):
catalogs : `dict`
Mapping from filter names to dict of catalogs.
tract : int
tractId to use for the tractId column
tractId to use for the tractId column.
patch : str
patchId to use for the patchId column
patchId to use for the patchId column.
Returns
-------
catalog : `pandas.DataFrame`
Merged dataframe
Merged dataframe.
"""

dfs = []
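
A rough sketch of the merge pattern this docstring describes, simplified so each band maps to a single dataframe (names and values are illustrative):

import pandas as pd

def merge_band_catalogs(catalogs, tract, patch):
    # Tag each per-band catalog with its tract/patch, promote its columns
    # to a (band, column) MultiIndex, then join the bands side by side.
    dfs = []
    for band, df in catalogs.items():
        df = df.copy()
        df["tractId"] = tract
        df["patchId"] = patch
        df.columns = pd.MultiIndex.from_product([[band], df.columns])
        dfs.append(df)
    return pd.concat(dfs, axis=1)

merged = merge_band_catalogs(
    {"g": pd.DataFrame({"flux": [1.0]}), "r": pd.DataFrame({"flux": [2.0]})},
    tract=9813, patch="4,4",
)
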
@@ -245,19 +246,21 @@ def write(self, patchRef, catalog):
Parameters
----------
catalog : `ParquetTable`
Catalog to write
Catalog to write.
patchRef : `lsst.daf.persistence.ButlerDataRef`
Data reference for patch
Data reference for patch.
"""
patchRef.put(catalog, self.config.coaddName + "Coadd_" + self.outputDataset)
# since the filter isn't actually part of the data ID for the dataset we're saving,
# it's confusing to see it in the log message, even if the butler simply ignores it.
# since the filter isn't actually part of the data ID for the dataset
# we're saving, it's confusing to see it in the log message, even if
# the butler simply ignores it.
mergeDataId = patchRef.dataId.copy()
del mergeDataId["filter"]
self.log.info("Wrote merged catalog: %s", mergeDataId)

def writeMetadata(self, dataRefList):
"""No metadata to write, and not sure how to write it for a list of dataRefs.
"""No metadata to write, and not sure how to write it for a list of
dataRefs.
"""
pass

@@ -287,7 +290,7 @@ class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,


class WriteSourceTableTask(CmdLineTask, pipeBase.PipelineTask):
"""Write source table to parquet
"""Write source table to parquet.
"""
_DefaultName = "writeSourceTable"
ConfigClass = WriteSourceTableConfig
@@ -676,37 +679,38 @@ def addCalibColumns(self, catalog, exposure, exposureIdInfo, **kwargs):


class PostprocessAnalysis(object):
"""Calculate columns from ParquetTable
"""Calculate columns from ParquetTable.
This object manages and organizes an arbitrary set of computations
on a catalog. The catalog is defined by a
`lsst.pipe.tasks.parquetTable.ParquetTable` object (or list thereof), such as a
`deepCoadd_obj` dataset, and the computations are defined by a collection
of `lsst.pipe.tasks.functor.Functor` objects (or, equivalently,
`lsst.pipe.tasks.parquetTable.ParquetTable` object (or list thereof), such
as a `deepCoadd_obj` dataset, and the computations are defined by a
collection of `lsst.pipe.tasks.functor.Functor` objects (or, equivalently,
a `CompositeFunctor`).
After the object is initialized, accessing the `.df` attribute (which
holds the `pandas.DataFrame` containing the results of the calculations) triggers
computation of said dataframe.
holds the `pandas.DataFrame` containing the results of the calculations)
triggers computation of said dataframe.
One of the conveniences of using this object is the ability to define a desired common
filter for all functors. This enables the same functor collection to be passed to
several different `PostprocessAnalysis` objects without having to change the original
functor collection, since the `filt` keyword argument of this object triggers an
overwrite of the `filt` property for all functors in the collection.
One of the conveniences of using this object is the ability to define a
desired common filter for all functors. This enables the same functor
collection to be passed to several different `PostprocessAnalysis` objects
without having to change the original functor collection, since the `filt`
keyword argument of this object triggers an overwrite of the `filt`
property for all functors in the collection.
This object also allows a list of refFlags to be passed, and defines a set of default
refFlags that are always included even if not requested.
This object also allows a list of refFlags to be passed, and defines a set
of default refFlags that are always included even if not requested.
If a list of `ParquetTable` object is passed, rather than a single one, then the
calculations will be mapped over all the input catalogs. In principle, it should
be straightforward to parallelize this activity, but initial tests have failed
(see TODO in code comments).
If a list of `ParquetTable` objects is passed, rather than a single one,
then the calculations will be mapped over all the input catalogs. In
principle, it should be straightforward to parallelize this activity, but
initial tests have failed (see TODO in code comments).
Parameters
----------
parq : `lsst.pipe.tasks.ParquetTable` (or list of such)
Source catalog(s) for computation
Source catalog(s) for computation.
functors : `list`, `dict`, or `lsst.pipe.tasks.functors.CompositeFunctor`
Computations to do (functors that act on `parq`).
Expand All @@ -715,19 +719,19 @@ class PostprocessAnalysis(object):
If a list, the column keys will come from the
`.shortname` attribute of each functor.
filt : `str` (optional)
filt : `str`, optional
Filter in which to calculate. If provided,
this will overwrite any existing `.filt` attribute
of the provided functors.
flags : `list` (optional)
flags : `list`, optional
List of flags (per-band) to include in output table.
Taken from the `meas` dataset if applied to a multilevel Object Table.
refFlags : `list` (optional)
refFlags : `list`, optional
List of refFlags (only reference band) to include in output table.
forcedFlags : `list` (optional)
forcedFlags : `list`, optional
List of flags (per-band) to include in output table.
Taken from the ``forced_src`` dataset if applied to a
multilevel Object Table. Intended for flags from measurement plugins
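
A hypothetical usage sketch, assuming a `ParquetTable` already in hand as `parq`; the functor import path and column names are assumptions, not taken from this diff:

from lsst.pipe.tasks.functors import Column, CompositeFunctor

funcs = CompositeFunctor({"ra": Column("coord_ra"), "dec": Column("coord_dec")})
analysis = PostprocessAnalysis(parq, funcs, filt="r",
                               refFlags=["detect_isPrimary"])
df = analysis.df  # accessing .df is what triggers the computation
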
@@ -787,7 +791,8 @@ def compute(self, dropna=False, pool=None):
if pool is None:
dflist = [self.func(parq, dropna=dropna) for parq in self.parq]
else:
# TODO: Figure out why this doesn't work (pyarrow pickling issues?)
# TODO: Figure out why this doesn't work (pyarrow pickling
# issues?)
dflist = pool.map(functools.partial(self.func, dropna=dropna), self.parq)
self._df = pd.concat(dflist)
else:
@@ -888,11 +893,12 @@ class TransformCatalogBaseTask(pipeBase.PipelineTask):
- base_PixelFlags_flag_inexact_psfCenter
- detect_isPrimary
The names for each entry under "func" will become the names of columns in the
output dataset. All the functors referenced are defined in `lsst.pipe.tasks.functors`.
Positional arguments to be passed to each functor are in the `args` list,
and any additional entries for each column other than "functor" or "args" (e.g., `'filt'`,
`'dataset'`) are treated as keyword arguments to be passed to the functor initialization.
The names for each entry under "func" will become the names of columns in
the output dataset. All the functors referenced are defined in
`lsst.pipe.tasks.functors`. Positional arguments to be passed to each
functor are in the `args` list, and any additional entries for each column
other than "functor" or "args" (e.g., `'filt'`, `'dataset'`) are treated as
keyword arguments to be passed to the functor initialization.
The "flags" entry is the default shortcut for `Column` functors.
All columns listed under "flags" will be copied to the output table
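
As a rough illustration of that mapping (the `Mag` functor and its signature are assumptions about `lsst.pipe.tasks.functors`, not taken from this diff), a config entry might translate into functor construction like so:

from lsst.pipe.tasks.functors import Mag

# A "func" entry such as
#   psfMag:
#     functor: Mag
#     args:
#       - base_PsfFlux
#     filt: r
# corresponds to the named functor called with `args` as positional
# arguments and the leftover keys as keyword arguments:
psfMag = Mag("base_PsfFlux", filt="r")
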
@@ -912,7 +918,6 @@ class TransformCatalogBaseTask(pipeBase.PipelineTask):
This task uses the `lsst.pipe.tasks.postprocess.PostprocessAnalysis` object
to organize and execute the calculations.
"""
@property
def _DefaultName(self):
@@ -969,8 +974,7 @@ def run(self, parq, funcs=None, dataId=None, band=None):
Returns
-------
`pandas.DataFrame`
df : `pandas.DataFrame`
"""
self.log.info("Transforming/standardizing the source table dataId: %s", dataId)

@@ -1108,10 +1112,10 @@ class TransformObjectCatalogTask(TransformCatalogBaseTask):
"""Produce a flattened Object Table to match the format specified in
sdm_schemas.
Do the same set of postprocessing calculations on all bands
Do the same set of postprocessing calculations on all bands.
This is identical to `TransformCatalogBaseTask`, except for that it does the
specified functor calculations for all filters present in the
This is identical to `TransformCatalogBaseTask`, except that it does
the specified functor calculations for all filters present in the
input `deepCoadd_obj` table. Any specific `"filt"` keywords specified
by the YAML file will be superseded.
"""
@@ -1252,9 +1256,9 @@ class ConsolidateObjectTableConfig(pipeBase.PipelineTaskConfig,


class ConsolidateObjectTableTask(CmdLineTask, pipeBase.PipelineTask):
"""Write patch-merged source tables to a tract-level parquet file
"""Write patch-merged source tables to a tract-level parquet file.
Concatenates `objectTable` list into a per-visit `objectTable_tract`
Concatenates the per-patch `objectTable` list into a per-tract `objectTable_tract`.
"""
_DefaultName = "consolidateObjectTable"
ConfigClass = ConsolidateObjectTableConfig
@@ -1567,7 +1571,7 @@ def _makeVisitSummarySchema(self):


class VisitDataIdContainer(DataIdContainer):
"""DataIdContainer that groups sensor-level id's by visit
"""DataIdContainer that groups sensor-level ids by visit.
"""

def makeDataRefList(self, namespace):
@@ -1578,7 +1582,8 @@ def makeDataRefList(self, namespace):
Parameters
----------
namespace : `argparse.Namespace`
Namespace used by `lsst.pipe.base.CmdLineTask` to parse command line arguments
Namespace used by `lsst.pipe.base.CmdLineTask` to parse command
line arguments.
"""
# Group by visits
visitRefs = defaultdict(list)
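
The grouping step amounts to bucketing references by visit; a stand-alone sketch where the dataRefs are simple stand-ins (only the dataId mapping matters here):

from collections import defaultdict
from types import SimpleNamespace

# Stand-ins for per-detector dataRefs.
sensorRefs = [SimpleNamespace(dataId={"visit": 1, "detector": d}) for d in (0, 1)]

visitRefs = defaultdict(list)
for ref in sensorRefs:
    visitRefs[ref.dataId["visit"]].append(ref)  # bucket by visit id
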
@@ -1698,18 +1703,21 @@ class MakeCcdVisitTableTask(CmdLineTask, pipeBase.PipelineTask):
ConfigClass = MakeCcdVisitTableConfig

def run(self, visitSummaryRefs):
""" Make a table of ccd information from the `visitSummary` catalogs.
"""Make a table of ccd information from the `visitSummary` catalogs.
Parameters
----------
visitSummaryRefs : `list` of `lsst.daf.butler.DeferredDatasetHandle`
List of DeferredDatasetHandles pointing to exposure catalogs with
per-detector summary information.
Returns
-------
result : `lsst.pipe.base.Struct`
Results struct with attribute:
- `outputCatalog`
Catalog of ccd and visit information.
Results struct with attribute:
``outputCatalog``
Catalog of ccd and visit information.
"""
ccdEntries = []
for visitSummaryRef in visitSummaryRefs:
@@ -1726,9 +1734,10 @@ def run(self, visitSummaryRefs):
'psfStarDeltaSizeMedian', 'psfStarDeltaSizeScatter',
'psfStarScaledDeltaSizeScatter']
ccdEntry = summaryTable[selectColumns].to_pandas().set_index('id')
# 'visit' is the human readible visit number
# 'visitId' is the key to the visitId table. They are the same
# Technically you should join to get the visit from the visit table
# 'visit' is the human readable visit number.
# 'visitId' is the key to the visitId table. They are the same.
# Technically you should join to get the visit from the visit
# table.
ccdEntry = ccdEntry.rename(columns={"visit": "visitId"})
dataIds = [DataCoordinate.standardize(visitSummaryRef.dataId, detector=id) for id in
summaryTable['id']]
@@ -1758,8 +1767,9 @@ def run(self, visitSummaryRefs):
ccdEntry['urcdec'] = summaryTable['decCorners'][:, 2]
ccdEntry['lrcra'] = summaryTable['raCorners'][:, 3]
ccdEntry['lrcdec'] = summaryTable['decCorners'][:, 3]
# TODO: DM-30618, Add raftName, nExposures, ccdTemp, binX, binY, and flags,
# and decide if WCS, and llcx, llcy, ulcx, ulcy, etc. values are actually wanted.
# TODO: DM-30618, Add raftName, nExposures, ccdTemp, binX, binY,
# and flags, and decide if WCS, and llcx, llcy, ulcx, ulcy, etc.
# values are actually wanted.
ccdEntries.append(ccdEntry)

outputCatalog = pd.concat(ccdEntries)
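
A toy version of the assembly pattern above, with stand-in summary tables and only a small subset of columns:

import pandas as pd

# Stand-ins for the per-visit summary tables.
summary_tables = [
    pd.DataFrame({"id": [0, 1], "visit": [101, 101], "zeroPoint": [27.0, 27.1]}),
    pd.DataFrame({"id": [0, 1], "visit": [102, 102], "zeroPoint": [26.9, 27.0]}),
]

ccd_entries = []
for table in summary_tables:
    # Index by detector id and rename the human-readable visit number to
    # the visitId join key, mirroring the rename in the task above.
    entry = table.set_index("id").rename(columns={"visit": "visitId"})
    ccd_entries.append(entry)
ccd_visit_table = pd.concat(ccd_entries)
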
@@ -1798,18 +1808,19 @@ class MakeVisitTableTask(CmdLineTask, pipeBase.PipelineTask):
ConfigClass = MakeVisitTableConfig

def run(self, visitSummaries):
""" Make a table of visit information from the `visitSummary` catalogs
"""Make a table of visit information from the `visitSummary` catalogs.
Parameters
----------
visitSummaries : list of `lsst.afw.table.ExposureCatalog`
visitSummaries : `list` of `lsst.afw.table.ExposureCatalog`
List of exposure catalogs with per-detector summary information.
Returns
-------
result : `lsst.pipe.base.Struct`
Results struct with attribute:
``outputCatalog``
Catalog of visit information.
``outputCatalog``
Catalog of visit information.
"""
visitEntries = []
for visitSummary in visitSummaries:
@@ -1840,8 +1851,9 @@ def run(self, visitSummaries):
visitEntry["obsStartMJD"] = visitEntry["expMidptMJD"] - 0.5 * expTime_days
visitEntries.append(visitEntry)

# TODO: DM-30623, Add programId, exposureType, cameraTemp, mirror1Temp, mirror2Temp,
# mirror3Temp, domeTemp, externalTemp, dimmSeeing, pwvGPS, pwvMW, flags, nExposures
# TODO: DM-30623, Add programId, exposureType, cameraTemp,
# mirror1Temp, mirror2Temp, mirror3Temp, domeTemp, externalTemp,
# dimmSeeing, pwvGPS, pwvMW, flags, nExposures.

outputCatalog = pd.DataFrame(data=visitEntries)
outputCatalog.set_index('visitId', inplace=True, verify_integrity=True)
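
The start-time arithmetic in the loop above, spelled out as a small helper (assuming the exposure time is in seconds and the midpoint is in MJD):

def obs_start_mjd(exp_midpt_mjd, exp_time_s):
    # Convert the exposure time to days and step back half an exposure
    # from the midpoint to get the start time.
    return exp_midpt_mjd - 0.5 * exp_time_s / 86400.0

print(obs_start_mjd(59724.5, 30.0))  # ~59724.4998264
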
@@ -1883,7 +1895,7 @@ class WriteForcedSourceTableConfig(pipeBase.PipelineTaskConfig,


class WriteForcedSourceTableTask(pipeBase.PipelineTask):
"""Merge and convert per-detector forced source catalogs to parquet
"""Merge and convert per-detector forced source catalogs to parquet.
Because the predecessor ForcedPhotCcdTask operates per-detector,
per-tract (i.e., it has tract in its dimensions), detectors
@@ -2020,7 +2032,8 @@ def run(self, inputCatalogs, referenceCatalog, funcs=None, dataId=None, band=Non
outputCatalog.index.rename(self.config.keyRef, inplace=True)
# Add config.keyRef to the column list
outputCatalog.reset_index(inplace=True)
# set the forcedSourceId to the index. This is specified in the ForcedSource.yaml
# Set the forcedSourceId to the index. This is specified in the
# ForcedSource.yaml
outputCatalog.set_index("forcedSourceId", inplace=True, verify_integrity=True)
# Rename it to the config.key
outputCatalog.index.rename(self.config.key, inplace=True)
@@ -2056,14 +2069,15 @@ class ConsolidateTractConfig(pipeBase.PipelineTaskConfig,

class ConsolidateTractTask(CmdLineTask, pipeBase.PipelineTask):
"""Concatenate any per-patch dataframe list into a single
per-tract DataFrame
per-tract DataFrame.
"""
_DefaultName = 'ConsolidateTract'
ConfigClass = ConsolidateTractConfig

def runQuantum(self, butlerQC, inputRefs, outputRefs):
inputs = butlerQC.get(inputRefs)
# Not checking at least one inputCatalog exists because that'd be an empty QG
# Not checking at least one inputCatalog exists because that'd be an
# empty QG.
self.log.info("Concatenating %s per-patch %s Tables",
len(inputs['inputCatalogs']),
inputRefs.inputCatalogs[0].datasetType.name)
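
The consolidation itself is essentially a concatenation over the patch-level inputs; a toy illustration with stand-in dataframes:

import pandas as pd

per_patch = [pd.DataFrame({"flux": [1.0, 2.0]}), pd.DataFrame({"flux": [3.0]})]
tract_df = pd.concat(per_patch)  # one per-tract dataframe from many patches
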
