Skip to content

Commit

Permalink
Merge branch 'tickets/DM-28922'
Browse files Browse the repository at this point in the history
  • Loading branch information
timothydmorton committed Feb 25, 2021
2 parents 0a6ae79 + 85c0230 commit 5afaedd
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 3 deletions.
8 changes: 6 additions & 2 deletions python/lsst/pipe/tasks/functors.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,11 @@ def _get_data(self, data):
df = data.toDataFrame(columns=columns)
return df

# Get proper multi-level columns specification for this functor
# Get proper columns specification for this functor
if is_multiLevel:
columns = self.multilevelColumns(data, columnIndex=columnIndex)
else:
columns = self.columns

if isinstance(data, MultilevelParquetTable):
# Load in-memory dataframe with appropriate columns the gen2 way
Expand All @@ -304,7 +306,9 @@ def _get_data(self, data):
df = data.get(parameters={"columns": columns})

# Drop unnecessary column levels
df = self._setLevels(df)
if is_multiLevel:
df = self._setLevels(df)

return df

def _setLevels(self, df):
Expand Down
45 changes: 44 additions & 1 deletion tests/test_functors.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from lsst.sphgeom import HtmPixelization
import lsst.meas.base as measBase
import lsst.utils.tests
from lsst.pipe.tasks.parquetTable import MultilevelParquetTable
from lsst.pipe.tasks.parquetTable import MultilevelParquetTable, ParquetTable
from lsst.daf.butler import Butler, DatasetType
from lsst.pipe.tasks.functors import (CompositeFunctor, CustomFunctor, Column, RAColumn,
DecColumn, Mag, MagDiff, Color, StarGalaxyLabeller,
Expand Down Expand Up @@ -70,6 +70,10 @@ def simulateMultiParquet(self, dataDict):

return MultilevelParquetTable(dataFrame=df)

def simulateParquet(self, dataDict):
df = pd.DataFrame(dataDict)
return ParquetTable(dataFrame=df)

def getDatasetHandle(self, parq):
df = parq._df
lo, hi = HtmPixelization(7).universe().ranges()[0]
Expand Down Expand Up @@ -149,6 +153,10 @@ def testColumn(self):
func = Column('base_FootprintArea_value', filt='g')
self._funcVal(func, parq)

parq = self.simulateParquet(self.dataDict)
func = Column('base_FootprintArea_value')
self._funcVal(func, parq)

def testCustom(self):
self.columns.append("base_FootprintArea_value")
self.dataDict["base_FootprintArea_value"] = \
Expand All @@ -161,18 +169,36 @@ def testCustom(self):

np.allclose(val.values, 2*func2(parq).values, atol=1e-13, rtol=0)

parq = self.simulateParquet(self.dataDict)
func = CustomFunctor('2 * base_FootprintArea_value')
val = self._funcVal(func, parq)
func2 = Column('base_FootprintArea_value')

np.allclose(val.values, 2*func2(parq).values, atol=1e-13, rtol=0)

def testCoords(self):
parq = self.simulateMultiParquet(self.dataDict)
ra = self._funcVal(RAColumn(), parq)
dec = self._funcVal(DecColumn(), parq)

columnDict = {'dataset': 'ref', 'band': 'g',
'column': ['coord_ra', 'coord_dec']}

coords = parq.toDataFrame(columns=columnDict, droplevels=True) / np.pi * 180.

self.assertTrue(np.allclose(ra, coords[('ref', 'g', 'coord_ra')], atol=1e-13, rtol=0))
self.assertTrue(np.allclose(dec, coords[('ref', 'g', 'coord_dec')], atol=1e-13, rtol=0))

# single-level column index table
parq = self.simulateParquet(self.dataDict)
ra = self._funcVal(RAColumn(), parq)
dec = self._funcVal(DecColumn(), parq)

coords = parq.toDataFrame(columns=['coord_ra', 'coord_dec']) / np.pi * 180.

self.assertTrue(np.allclose(ra, coords['coord_ra'], atol=1e-13, rtol=0))
self.assertTrue(np.allclose(dec, coords['coord_dec'], atol=1e-13, rtol=0))

def testMag(self):
self.columns.extend(["base_PsfFlux_instFlux", "base_PsfFlux_instFluxErr"])
self.dataDict["base_PsfFlux_instFlux"] = np.full(self.nRecords, 1000)
Expand Down Expand Up @@ -330,6 +356,7 @@ def testComposite(self):
self.columns.extend(["modelfit_CModel_instFlux", "base_PsfFlux_instFlux"])
self.dataDict["modelfit_CModel_instFlux"] = np.full(self.nRecords, 1)
self.dataDict["base_PsfFlux_instFlux"] = np.full(self.nRecords, 1)

parq = self.simulateMultiParquet(self.dataDict)
# Modify r band value slightly.
parq._df[("meas", "r", "base_PsfFlux_instFlux")] -= 0.1
Expand Down Expand Up @@ -370,6 +397,22 @@ def testComposite(self):

df = self._compositeFuncVal(CompositeFunctor(funcs), parq)

def testCompositeSimple(self):
"""Test single-level composite functor for functionality
"""
self.columns.extend(["modelfit_CModel_instFlux", "base_PsfFlux_instFlux"])
self.dataDict["modelfit_CModel_instFlux"] = np.full(self.nRecords, 1)
self.dataDict["base_PsfFlux_instFlux"] = np.full(self.nRecords, 1)

parq = self.simulateParquet(self.dataDict)
funcDict = {'ra': RAColumn(),
'dec': DecColumn(),
'psfMag': Mag('base_PsfFlux'),
'cmodel_magDiff': MagDiff('base_PsfFlux',
'modelfit_CModel')}
func = CompositeFunctor(funcDict)
df = self._compositeFuncVal(func, parq) # noqa

def testCompositeColor(self):
self.dataDict["base_PsfFlux_instFlux"] = np.full(self.nRecords, 1000)
self.dataDict["base_PsfFlux_instFluxErr"] = np.full(self.nRecords, 10)
Expand Down

0 comments on commit 5afaedd

Please sign in to comment.