DM-23074: Make the schema of the output Object parquet files input-independent #343

Merged: 1 commit, Jan 23, 2020
28 changes: 22 additions & 6 deletions python/lsst/pipe/tasks/postprocess.py
@@ -38,10 +38,7 @@ def flattenFilters(df, filterDict, noDupCols=['coord_ra', 'coord_dec'], camelCas
"""
newDf = pd.DataFrame()
for filt, filtShort in filterDict.items():
try:
subdf = df[filt]
except KeyError:
continue
subdf = df[filt]
columnFormat = '{0}{1}' if camelCase else '{0}_{1}'
newColumns = {c: columnFormat.format(filtShort, c)
for c in subdf.columns if c not in noDupCols}
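For reference, a minimal standalone sketch of the renaming logic above (plain pandas with toy values; only the loop body shown in this hunk is reproduced):

import pandas as pd

# A toy multilevel frame shaped like flattenFilters' input.
df = pd.concat({'HSC-R': pd.DataFrame({'Fwhm': [0.6], 'coord_ra': [1.0]})},
               axis=1, names=['filter', 'column'])
for filt, filtShort in {'HSC-R': 'r'}.items():
    subdf = df[filt]  # after this change, a missing filter raises KeyError
    newColumns = {c: '{0}{1}'.format(filtShort, c)
                  for c in subdf.columns if c not in ['coord_ra', 'coord_dec']}
    print(newColumns)  # {'Fwhm': 'rFwhm'}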
@@ -414,6 +411,7 @@ def runDataRef(self, patchRef):
         parq = patchRef.get()
         dataId = patchRef.dataId
         funcs = self.getFunctors()
+        self.log.info("Transforming/standardizing the catalog of %s", dataId)
         df = self.run(parq, funcs=funcs, dataId=dataId)
         self.write(df, patchRef)
         return df
@@ -479,7 +477,9 @@ class TransformObjectCatalogConfig(TransformCatalogBaseConfig):
         keytype=str,
         itemtype=str,
         default={},
-        doc="Dictionary mapping full filter name to short one for column name munging."
+        doc=("Dictionary mapping full filter name to short one for column name munging. "
+             "These filters determine the output columns no matter what filters the "
+             "input data actually contain.")
     )
     camelCase = pexConfig.Field(
         dtype=bool,
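As a hedged illustration of the new semantics (the exact filter names below are assumptions, not part of this diff), a config override might pin the output schema like this:

# Columns for all five bands appear in the output Object table, NaN-filled
# where the input patch lacks data; bands not listed here are dropped.
config.filterMap = {'HSC-G': 'g', 'HSC-R': 'r', 'HSC-I': 'i',
                    'HSC-Z': 'z', 'HSC-Y': 'y'}
config.camelCase = True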
@@ -500,7 +500,7 @@ class TransformObjectCatalogTask(TransformCatalogBaseTask):

     Do the same set of postprocessing calculations on all bands.
 
-    This is identical to `PostprocessTask`, except for that it does the
+    This is identical to `TransformCatalogBaseTask`, except that it does the
     specified functor calculations for all filters present in the
     input `deepCoadd_obj` table. Any specific `"filt"` keywords specified
     by the YAML file will be superseded.
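As the new unit tests below show, functors can also be supplied programmatically; a minimal sketch assuming task is a configured TransformObjectCatalogTask and parq a MultilevelParquetTable, both set up as in the tests:

from lsst.pipe.tasks.functors import HsmFwhm

# One functor is evaluated per band; run() loops over the filters in
# filterMap rather than a single filt, which is why any per-functor
# "filt" keyword is superseded.
funcs = {'Fwhm': HsmFwhm(dataset='meas')}
df = task.run(parq, funcs=funcs, dataId={'tract': 9615, 'patch': '4,4'})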
@@ -522,10 +522,25 @@ def _makeArgumentParser(cls):
     def run(self, parq, funcs=None, dataId=None):
         dfDict = {}
         analysisDict = {}
+        templateDf = pd.DataFrame()
+        # Transform only the filters that exist in parq and are specified
+        # in config.filterMap
         for filt in parq.columnLevelNames['filter']:
+            if filt not in self.config.filterMap:
+                self.log.info("Ignoring %s data in the input", filt)
+                continue
+            self.log.info("Transforming the catalog of filter %s", filt)
             result = self.transform(filt, parq, funcs, dataId)
             dfDict[filt] = result.df
             analysisDict[filt] = result.analysis
+            if templateDf.empty:
+                templateDf = result.df
+
+        # Add NaN-filled columns for any remaining filters requested in filterMap
+        for filt in self.config.filterMap:
+            if filt not in dfDict:
+                self.log.info("Adding empty columns for filter %s", filt)
+                dfDict[filt] = pd.DataFrame().reindex_like(templateDf)
 
         # This makes a multilevel column index, with filter as first level
         df = pd.concat(dfDict, axis=1, names=['filter', 'column'])
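The NaN filling relies on a plain pandas idiom rather than any LSST-specific machinery; a minimal standalone sketch:

import pandas as pd

template = pd.DataFrame({'Fwhm': [0.6, 0.7]}, index=[10, 20])
empty = pd.DataFrame().reindex_like(template)
# empty shares template's index and columns, but every value is NaN, so a
# band absent from the input contributes schema without fabricated values.
assert empty.shape == template.shape
assert empty['Fwhm'].isnull().all()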
@@ -537,6 +552,7 @@ def run(self, parq, funcs=None, dataId=None):
             df = flattenFilters(df, self.config.filterMap, noDupCols=noDupCols,
                                 camelCase=self.config.camelCase)
 
+        self.log.info("Made a table of %d columns and %d rows", len(df.columns), len(df))
         return df
 
 
108 changes: 108 additions & 0 deletions tests/test_transformObject.py
@@ -0,0 +1,108 @@
# This file is part of pipe_tasks.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import os
import unittest
import pandas as pd

import lsst.utils.tests

# TODO: Remove skipUnless and this try block DM-22256
try:
    import pyarrow as pa
    import pyarrow.parquet as pq
    from lsst.pipe.tasks.parquetTable import MultilevelParquetTable
    from lsst.pipe.tasks.functors import HsmFwhm
    from lsst.pipe.tasks.postprocess import TransformObjectCatalogTask, TransformObjectCatalogConfig
    havePyArrow = True
except ImportError:
    havePyArrow = False

ROOT = os.path.abspath(os.path.dirname(__file__))


def setup_module(module):
    lsst.utils.tests.init()


@unittest.skipUnless(havePyArrow, "Requires pyarrow")
class TransformObjectCatalogTestCase(unittest.TestCase):
    def setUp(self):
        # Note that this test input includes HSC-G, HSC-R, and HSC-I data
        df = pd.read_csv(os.path.join(ROOT, 'data', 'test_multilevel_parq.csv.gz'),
                         header=[0, 1, 2], index_col=0)
        with lsst.utils.tests.getTempFilePath('*.parq') as filename:
            table = pa.Table.from_pandas(df)
            pq.write_table(table, filename, compression='none')
            self.parq = MultilevelParquetTable(filename)

        self.dataId = {"tract": 9615, "patch": "4,4"}

    def testNullFilter(self):
        """Test that columns for all filters are created even when some
        filters are missing from the input data.
        """
        config = TransformObjectCatalogConfig()
        # Request y-band columns even though the input data lack them, and
        # exclude g-band columns even though the input data have them.
        filterMap = {"HSC-R": "r", "HSC-I": "i", "HSC-Y": "y"}
        config.filterMap = filterMap
        task = TransformObjectCatalogTask(config=config)
        funcs = {'Fwhm': HsmFwhm(dataset='meas')}
        df = task.run(self.parq, funcs=funcs, dataId=self.dataId)
        self.assertIsInstance(df, pd.DataFrame)
        for column in ('coord_ra', 'coord_dec'):
            self.assertIn(column, df.columns)

        for filt in filterMap.values():
            self.assertIn(filt + 'Fwhm', df.columns)

        self.assertNotIn('gFwhm', df.columns)
        self.assertTrue(df['yFwhm'].isnull().all())
        self.assertTrue(df['iFwhm'].notnull().all())

    def testUnderscoreColumnFormat(self):
        """Test the per-filter column format with an underscore"""
        config = TransformObjectCatalogConfig()
        filterMap = {"HSC-G": "g", "HSC-R": "r", "HSC-I": "i"}
        config.filterMap = filterMap
        config.camelCase = False
        task = TransformObjectCatalogTask(config=config)
        funcs = {'Fwhm': HsmFwhm(dataset='meas')}
        df = task.run(self.parq, funcs=funcs, dataId=self.dataId)
        self.assertIsInstance(df, pd.DataFrame)
        for filt in filterMap.values():
            self.assertIn(filt + '_Fwhm', df.columns)

    def testMultilevelOutput(self):
        """Test the non-flattened result dataframe with a multilevel column index"""
        config = TransformObjectCatalogConfig()
        filterMap = {"HSC-R": "r", "HSC-I": "i"}
        config.filterMap = filterMap
        config.multilevelOutput = True
        task = TransformObjectCatalogTask(config=config)
        funcs = {'Fwhm': HsmFwhm(dataset='meas')}
        df = task.run(self.parq, funcs=funcs, dataId=self.dataId)
        self.assertIsInstance(df, pd.DataFrame)
        self.assertNotIn('HSC-G', df)
        for filt in filterMap:
            self.assertIsInstance(df[filt], pd.DataFrame)
            self.assertIn('Fwhm', df[filt].columns)
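For readers unfamiliar with the multilevel layout these tests assert on, a minimal plain-pandas sketch (toy values, not the LSST API):

import pandas as pd

# 'filter' is the first column level, so df[filt] selects a per-band frame.
df = pd.concat({'HSC-R': pd.DataFrame({'Fwhm': [0.6]}),
                'HSC-I': pd.DataFrame({'Fwhm': [0.7]})},
               axis=1, names=['filter', 'column'])
assert isinstance(df['HSC-R'], pd.DataFrame)
assert 'Fwhm' in df['HSC-R'].columns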