Skip to content

Commit

Permalink
Correct functor logic to work with single-level datarefs, and add tests
Browse files Browse the repository at this point in the history
Previous updates to Functor to allow for passing a DeferredDatasetHandle
did not have appropriate logic to allow for the handle to point to a dataframe
with only a single-level column index.  This corrects that, and adds corresponding
tests, which were also missing before (which is why this bug was not caught).
  • Loading branch information
timothydmorton committed Feb 22, 2021
1 parent d3a6870 commit b8e4b60
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 3 deletions.
8 changes: 6 additions & 2 deletions python/lsst/pipe/tasks/functors.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,11 @@ def _get_data(self, data):
df = data.toDataFrame(columns=columns)
return df

# Get proper multi-level columns specification for this functor
# Get proper columns specification for this functor
if is_multiLevel:
columns = self.multilevelColumns(data, columnIndex=columnIndex)
else:
columns = self.columns

if isinstance(data, MultilevelParquetTable):
# Load in-memory dataframe with appropriate columns the gen2 way
Expand All @@ -304,7 +306,9 @@ def _get_data(self, data):
df = data.get(parameters={"columns": columns})

# Drop unnecessary column levels
df = self._setLevels(df)
if is_multiLevel:
df = self._setLevels(df)

return df

def _setLevels(self, df):
Expand Down
45 changes: 44 additions & 1 deletion tests/test_functors.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from lsst.sphgeom import HtmPixelization
import lsst.meas.base as measBase
import lsst.utils.tests
from lsst.pipe.tasks.parquetTable import MultilevelParquetTable
from lsst.pipe.tasks.parquetTable import MultilevelParquetTable, ParquetTable
from lsst.daf.butler import Butler, DatasetType
from lsst.pipe.tasks.functors import (CompositeFunctor, CustomFunctor, Column, RAColumn,
DecColumn, Mag, MagDiff, Color, StarGalaxyLabeller,
Expand Down Expand Up @@ -70,6 +70,10 @@ def simulateMultiParquet(self, dataDict):

return MultilevelParquetTable(dataFrame=df)

def simulateParquet(self, dataDict):
df = pd.DataFrame(dataDict)
return ParquetTable(dataFrame=df)

def getDatasetHandle(self, parq):
df = parq._df
lo, hi = HtmPixelization(7).universe().ranges()[0]
Expand Down Expand Up @@ -149,6 +153,10 @@ def testColumn(self):
func = Column('base_FootprintArea_value', filt='g')
self._funcVal(func, parq)

parq = self.simulateParquet(self.dataDict)
func = Column('base_FootprintArea_value')
self._funcVal(func, parq)

def testCustom(self):
self.columns.append("base_FootprintArea_value")
self.dataDict["base_FootprintArea_value"] = \
Expand All @@ -161,18 +169,36 @@ def testCustom(self):

np.allclose(val.values, 2*func2(parq).values, atol=1e-13, rtol=0)

parq = self.simulateParquet(self.dataDict)
func = CustomFunctor('2 * base_FootprintArea_value')
val = self._funcVal(func, parq)
func2 = Column('base_FootprintArea_value')

np.allclose(val.values, 2*func2(parq).values, atol=1e-13, rtol=0)

def testCoords(self):
parq = self.simulateMultiParquet(self.dataDict)
ra = self._funcVal(RAColumn(), parq)
dec = self._funcVal(DecColumn(), parq)

columnDict = {'dataset': 'ref', 'band': 'g',
'column': ['coord_ra', 'coord_dec']}

coords = parq.toDataFrame(columns=columnDict, droplevels=True) / np.pi * 180.

self.assertTrue(np.allclose(ra, coords[('ref', 'g', 'coord_ra')], atol=1e-13, rtol=0))
self.assertTrue(np.allclose(dec, coords[('ref', 'g', 'coord_dec')], atol=1e-13, rtol=0))

# single-level column index table
parq = self.simulateParquet(self.dataDict)
ra = self._funcVal(RAColumn(), parq)
dec = self._funcVal(DecColumn(), parq)

coords = parq.toDataFrame(columns=['coord_ra', 'coord_dec']) / np.pi * 180.

self.assertTrue(np.allclose(ra, coords['coord_ra'], atol=1e-13, rtol=0))
self.assertTrue(np.allclose(dec, coords['coord_dec'], atol=1e-13, rtol=0))

def testMag(self):
self.columns.extend(["base_PsfFlux_instFlux", "base_PsfFlux_instFluxErr"])
self.dataDict["base_PsfFlux_instFlux"] = np.full(self.nRecords, 1000)
Expand Down Expand Up @@ -330,6 +356,7 @@ def testComposite(self):
self.columns.extend(["modelfit_CModel_instFlux", "base_PsfFlux_instFlux"])
self.dataDict["modelfit_CModel_instFlux"] = np.full(self.nRecords, 1)
self.dataDict["base_PsfFlux_instFlux"] = np.full(self.nRecords, 1)

parq = self.simulateMultiParquet(self.dataDict)
# Modify r band value slightly.
parq._df[("meas", "r", "base_PsfFlux_instFlux")] -= 0.1
Expand Down Expand Up @@ -370,6 +397,22 @@ def testComposite(self):

df = self._compositeFuncVal(CompositeFunctor(funcs), parq)

def testCompositeSimple(self):
"""Test single-level composite functor for functionality
"""
self.columns.extend(["modelfit_CModel_instFlux", "base_PsfFlux_instFlux"])
self.dataDict["modelfit_CModel_instFlux"] = np.full(self.nRecords, 1)
self.dataDict["base_PsfFlux_instFlux"] = np.full(self.nRecords, 1)

parq = self.simulateParquet(self.dataDict)
funcDict = {'ra': RAColumn(),
'dec': DecColumn(),
'psfMag': Mag('base_PsfFlux'),
'cmodel_magDiff': MagDiff('base_PsfFlux',
'modelfit_CModel')}
func = CompositeFunctor(funcDict)
df = self._compositeFuncVal(func, parq) # noqa

def testCompositeColor(self):
self.dataDict["base_PsfFlux_instFlux"] = np.full(self.nRecords, 1000)
self.dataDict["base_PsfFlux_instFluxErr"] = np.full(self.nRecords, 10)
Expand Down

0 comments on commit b8e4b60

Please sign in to comment.