
Commit 8d2eb0a
Merge pull request #771 from lsst/tickets/DM-38689
DM-38689: Remove gen2 compatibility code from functors and deprecate pipe.tasks.parquetTable
erykoff committed Apr 19, 2023
2 parents 06169a7 + 2e6f6d8 commit 8d2eb0a
Showing 10 changed files with 284 additions and 321 deletions.
2 changes: 1 addition & 1 deletion python/lsst/pipe/tasks/dataFrameActions/_actions.py
@@ -10,7 +10,7 @@
import pandas as pd
from astropy import units

from ..configurableActions import ConfigurableActionStructField, ConfigurableActionField
from lsst.pex.config.configurableActions import ConfigurableActionStructField, ConfigurableActionField
from ._baseDataFrameActions import DataFrameAction
from ._evalColumnExpression import makeColumnExpressionAction

2 changes: 1 addition & 1 deletion python/lsst/pipe/tasks/dataFrameActions/_baseDataFrameActions.py
@@ -5,7 +5,7 @@
from lsst.pex.config import Field, ListField
from typing import Iterable, Any, Mapping

from ..configurableActions import ConfigurableAction
from lsst.pex.config.configurableActions import ConfigurableAction


class DataFrameAction(ConfigurableAction):
2 changes: 1 addition & 1 deletion python/lsst/pipe/tasks/dataFrameActions/_evalColumnExpression.py
@@ -32,7 +32,7 @@
from numpy import (cos, sin, cosh, sinh)
import pandas as pd

from ..configurableActions import ConfigurableActionField
from lsst.pex.config.configurableActions import ConfigurableActionField
from ._baseDataFrameActions import DataFrameAction


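All three dataFrameActions hunks make the same one-line change: `configurableActions` moved out of `pipe_tasks`, so the old relative import is replaced with the absolute path under `lsst.pex.config`. A minimal sketch of the updated import for downstream code (assuming a stack in which `lsst.pex.config` provides `configurableActions`):

    # configurableActions now lives in lsst.pex.config, not lsst.pipe.tasks.
    # Old (removed): from ..configurableActions import ConfigurableActionField
    from lsst.pex.config.configurableActions import (
        ConfigurableAction,
        ConfigurableActionField,
        ConfigurableActionStructField,
    )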
166 changes: 60 additions & 106 deletions python/lsst/pipe/tasks/functors.py
@@ -50,8 +50,6 @@
import lsst.geom as geom
import lsst.sphgeom as sphgeom

from .parquetTable import ParquetTable, MultilevelParquetTable


def init_fromDict(initDict, basePath='lsst.pipe.tasks.functors',
typeKey='functor', name=None):
@@ -93,13 +91,13 @@ def init_fromDict(initDict, basePath='lsst.pipe.tasks.functors',


class Functor(object):
"""Define and execute a calculation on a ParquetTable
"""Define and execute a calculation on a DataFrame or Handle holding a DataFrame.
The `__call__` method accepts either a `ParquetTable` object or a
The `__call__` method accepts either a `DataFrame` object or a
`DeferredDatasetHandle` or `InMemoryDatasetHandle`, and returns the
result of the calculation as a single column. Each functor defines what
columns are needed for the calculation, and only these columns are read
from the `ParquetTable`.
from the dataset handle.
The action of `__call__` consists of two steps: first, loading the
necessary columns from disk into memory as a `pandas.DataFrame` object;
@@ -118,7 +116,7 @@ class Functor(object):
On initialization, a `Functor` should declare what band (`filt` kwarg)
and dataset (e.g. `'ref'`, `'meas'`, `'forced_src'`) it is intended to be
applied to. This enables the `_get_data` method to extract the proper
columns from the parquet file. If not specified, the dataset will fall back
columns from the underlying data. If not specified, the dataset will fall back
on the `_defaultDataset` attribute. If band is not specified and `dataset`
is anything other than `'ref'`, then an error will be raised when trying to
perform the calculation.
@@ -130,7 +128,7 @@ class Functor(object):
It has since been generalized to apply to dataframes without multi-level
indices and multi-level indices with just `dataset` and `column` levels.
In addition, the `_get_data` method that reads
the dataframe from the `ParquetTable` will return a dataframe with column
the columns from the underlying data will return a dataframe with column
index levels defined by the `_dfLevels` attribute; by default, this is
`column`.
@@ -149,7 +147,6 @@ class Functor(object):
dataset : str
Dataset upon which to do the calculation
(e.g., 'ref', 'meas', 'forced_src').
"""

_defaultDataset = 'ref'
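To make the interface above concrete, here is a hypothetical single-column functor run against a plain `pandas.DataFrame` wrapped in an `InMemoryDatasetHandle`; the class, column name, and magnitude formula are illustrative assumptions, not part of this commit:

    import numpy as np
    import pandas as pd

    from lsst.pipe.base import InMemoryDatasetHandle
    from lsst.pipe.tasks.functors import Functor


    class InstMag(Functor):
        """Hypothetical functor: instrumental magnitude from a flux column."""

        def __init__(self, col, **kwargs):
            self.col = col
            super().__init__(**kwargs)

        @property
        def columns(self):
            # Only this column is read from the underlying data.
            return [self.col]

        def _func(self, df, dropna=True):
            return -2.5*np.log10(df[self.col])


    df = pd.DataFrame({"base_PsfFlux_instFlux": [100.0, 1000.0]})
    handle = InMemoryDatasetHandle(df, storageClass="DataFrame")
    mags = InstMag("base_PsfFlux_instFlux")(handle)  # pandas.Series of magnitudes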
@@ -181,56 +178,35 @@ def _get_data_columnLevels(self, data, columnIndex=None):
"""Gets the names of the column index levels
This should only be called in the context of a multilevel table.
The logic here is to enable this to work both with the gen2 `MultilevelParquetTable`
and with the gen3 `DeferredDatasetHandle`.
Parameters
----------
data : various
The data to be read, can be a `MultilevelParquetTable`,
`DeferredDatasetHandle`, or `InMemoryDatasetHandle`.
The data to be read, can be a `DeferredDatasetHandle` or
`InMemoryDatasetHandle`.
columnIndex (optional): pandas `Index` object
If not passed, then it is read from the `DeferredDatasetHandle`
or `InMemoryDatasetHandle`.
"""
if isinstance(data, (DeferredDatasetHandle, InMemoryDatasetHandle)):
if columnIndex is None:
columnIndex = data.get(component="columns")
if columnIndex is not None:
return columnIndex.names
if isinstance(data, MultilevelParquetTable):
return data.columnLevels
else:
raise TypeError(f"Unknown type for data: {type(data)}!")
if columnIndex is None:
columnIndex = data.get(component="columns")
return columnIndex.names

def _get_data_columnLevelNames(self, data, columnIndex=None):
"""Gets the content of each of the column levels for a multilevel table
Similar to `_get_data_columnLevels`, this enables backward
compatibility with gen2.
Mirrors original gen2 implementation within
`pipe.tasks.parquetTable.MultilevelParquetTable`
"""Gets the content of each of the column levels for a multilevel table.
"""
if isinstance(data, (DeferredDatasetHandle, InMemoryDatasetHandle)):
if columnIndex is None:
columnIndex = data.get(component="columns")
if columnIndex is not None:
columnLevels = columnIndex.names
columnLevelNames = {
level: list(np.unique(np.array([c for c in columnIndex])[:, i]))
for i, level in enumerate(columnLevels)
}
return columnLevelNames
if isinstance(data, MultilevelParquetTable):
return data.columnLevelNames
else:
raise TypeError(f"Unknown type for data: {type(data)}!")
if columnIndex is None:
columnIndex = data.get(component="columns")

columnLevels = columnIndex.names
columnLevelNames = {
level: list(np.unique(np.array([c for c in columnIndex])[:, i]))
for i, level in enumerate(columnLevels)
}
return columnLevelNames
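For reference, the multilevel tables these helpers inspect carry a `pandas.MultiIndex` on their columns; a small sketch of that shape (level and column names are illustrative):

    import pandas as pd

    # Object tables use a multilevel column index, e.g. (dataset, band, column).
    columns = pd.MultiIndex.from_tuples(
        [("meas", "g", "base_PsfFlux_instFlux"),
         ("meas", "r", "base_PsfFlux_instFlux")],
        names=["dataset", "band", "column"],
    )
    df = pd.DataFrame([[1.0, 2.0]], columns=columns)

    df.columns.names  # ['dataset', 'band', 'column'], what _get_data_columnLevels returns
    # _get_data_columnLevelNames collects the unique values per level:
    # {'dataset': ['meas'], 'band': ['g', 'r'], 'column': ['base_PsfFlux_instFlux']}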

def _colsFromDict(self, colDict, columnIndex=None):
"""Converts dictionary column specficiation to a list of columns
This mirrors the original gen2 implementation within `pipe.tasks.parquetTable.MultilevelParquetTable`
"""
new_colDict = {}
columnLevels = self._get_data_columnLevels(None, columnIndex=columnIndex)
@@ -252,15 +228,14 @@ def _colsFromDict(self, colDict, columnIndex=None):
def multilevelColumns(self, data, columnIndex=None, returnTuple=False):
"""Returns columns needed by functor from multilevel dataset
To access tables with multilevel column structure, the `MultilevelParquetTable`
or `DeferredDatasetHandle` need to be passed either a list of tuples or a
To access tables with multilevel column structure, the `DeferredDatasetHandle`
or `InMemoryDatasetHandle` needs to be passed either a list of tuples or a
dictionary.
Parameters
----------
data : various
The data as either `MultilevelParquetTable`,
`DeferredDatasetHandle`, or `InMemoryDatasetHandle`.
The data as either a `DeferredDatasetHandle` or an `InMemoryDatasetHandle`.
columnIndex (optional): pandas `Index` object
either passed or read in from `DeferredDatasetHandle`.
`returnTuple` : `bool`
@@ -269,7 +244,10 @@ def multilevelColumns(self, data, columnIndex=None, returnTuple=False):
combine columns from the various component functors.
"""
if isinstance(data, (DeferredDatasetHandle, InMemoryDatasetHandle)) and columnIndex is None:
if not isinstance(data, (DeferredDatasetHandle, InMemoryDatasetHandle)):
raise RuntimeError(f"Unexpected data type. Got {get_full_type_name(data)}.")

if columnIndex is None:
columnIndex = data.get(component="columns")

# Confirm that the dataset has the column levels the functor is expecting it to have.
Expand All @@ -285,20 +263,16 @@ def multilevelColumns(self, data, columnIndex=None, returnTuple=False):
else:
raise ValueError(f"'filt' not set for functor {self.name}"
f"(dataset {self.dataset}) "
"and ParquetTable "
"and DataFrame "
"contains multiple filters in column index. "
"Set 'filt' or set 'dataset' to 'ref'.")
else:
columnDict['band'] = self.filt

if isinstance(data, MultilevelParquetTable):
return data._colsFromDict(columnDict)
elif isinstance(data, (DeferredDatasetHandle, InMemoryDatasetHandle)):
if returnTuple:
return self._colsFromDict(columnDict, columnIndex=columnIndex)
else:
return columnDict
raise RuntimeError(f"Unexpected data type. Got {get_full_type_name}.")
if returnTuple:
return self._colsFromDict(columnDict, columnIndex=columnIndex)
else:
return columnDict
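That is, the dictionary form keys on the column-index level names, and `returnTuple=True` (used when a `CompositeFunctor` combines columns) expands it into the explicit per-column tuples a `pandas.MultiIndex` accepts; a sketch with illustrative names:

    # Dictionary specification, keyed by column-index level:
    columnDict = {
        "dataset": "meas",
        "band": "g",
        "column": ["base_PsfFlux_instFlux", "base_PsfFlux_instFluxErr"],
    }
    # Tuple form produced via _colsFromDict when returnTuple=True:
    # [("meas", "g", "base_PsfFlux_instFlux"),
    #  ("meas", "g", "base_PsfFlux_instFluxErr")]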

def _func(self, df, dropna=True):
raise NotImplementedError('Must define calculation on dataframe')
@@ -315,39 +289,32 @@ def _get_columnIndex(self, data):
def _get_data(self, data):
"""Retrieve dataframe necessary for calculation.
The data argument can be a DataFrame, a ParquetTable instance, or a gen3 DeferredDatasetHandle
The data argument can be a `DataFrame`, a `DeferredDatasetHandle`, or an
`InMemoryDatasetHandle`.
Returns dataframe upon which `self._func` can act.
N.B. while passing a raw pandas `DataFrame` *should* work here, it has not been tested.
"""
# We wrap a dataframe in a handle here to take advantage of the
# DataFrame delegate's column-wrangling abilities.
if isinstance(data, pd.DataFrame):
return data
_data = InMemoryDatasetHandle(data, storageClass="DataFrame")
elif isinstance(data, (DeferredDatasetHandle, InMemoryDatasetHandle)):
_data = data
else:
raise RuntimeError(f"Unexpected type provided for data. Got {get_full_type_name(data)}.")

# First, check whether the data source has a multilevel column index.
columnIndex = self._get_columnIndex(data)
is_multiLevel = isinstance(data, MultilevelParquetTable) or isinstance(columnIndex, pd.MultiIndex)

# Simple single-level parquet table, gen2
if isinstance(data, ParquetTable) and not is_multiLevel:
columns = self.columns
df = data.toDataFrame(columns=columns)
return df
columnIndex = self._get_columnIndex(_data)
is_multiLevel = isinstance(columnIndex, pd.MultiIndex)

# Get the proper column specification for this functor
if is_multiLevel:
columns = self.multilevelColumns(data, columnIndex=columnIndex)
columns = self.multilevelColumns(_data, columnIndex=columnIndex)
else:
columns = self.columns

if isinstance(data, MultilevelParquetTable):
# Load in-memory dataframe with appropriate columns the gen2 way
df = data.toDataFrame(columns=columns, droplevels=False)
elif isinstance(data, (DeferredDatasetHandle, InMemoryDatasetHandle)):
# Load in-memory dataframe with appropriate columns the gen3 way
df = data.get(parameters={"columns": columns})
else:
raise RuntimeError(f"Unexpected type provided for data. Got {get_full_type_name(data)}.")
# Load in-memory dataframe with appropriate columns the gen3 way
df = _data.get(parameters={"columns": columns})

# Drop unnecessary column levels
if is_multiLevel:
@@ -376,7 +343,7 @@ def __call__(self, data, dropna=False):
return vals

def difference(self, data1, data2, **kwargs):
"""Computes difference between functor called on two different ParquetTable objects
"""Computes difference between functor called on two different DataFrame/Handle objects
"""
return self(data1, **kwargs) - self(data2, **kwargs)

@@ -397,7 +364,7 @@ def shortname(self):


class CompositeFunctor(Functor):
"""Perform multiple calculations at once on a catalog
"""Perform multiple calculations at once on a catalog.
The role of a `CompositeFunctor` is to group together computations from
multiple functors. Instead of returning `pandas.Series` a
@@ -486,34 +453,29 @@ def __call__(self, data, **kwargs):
----------
data : various
The data represented as `lsst.daf.butler.DeferredDatasetHandle`,
`lsst.pipe.tasks.parquetTable.MultilevelParquetTable`,
`lsst.pipe.tasks.parquetTable.ParquetTable`,
`lsst.pipe.base.InMemoryDatasetHandle`,
or `pandas.DataFrame`.
The table or a pointer to a table on disk from which columns can
be accessed.
"""
columnIndex = self._get_columnIndex(data)

# First, determine whether data has a multilevel index (either gen2 or gen3)
is_multiLevel = isinstance(data, MultilevelParquetTable) or isinstance(columnIndex, pd.MultiIndex)
if isinstance(data, pd.DataFrame):
_data = InMemoryDatasetHandle(data, storageClass="DataFrame")
elif isinstance(data, (DeferredDatasetHandle, InMemoryDatasetHandle)):
_data = data
else:
raise RuntimeError(f"Unexpected type provided for data. Got {get_full_type_name(data)}.")

# Multilevel index, gen2 or gen3
if is_multiLevel:
columns = self.multilevelColumns(data, columnIndex=columnIndex)
columnIndex = self._get_columnIndex(_data)

if isinstance(data, MultilevelParquetTable):
# Read data into memory the gen2 way
df = data.toDataFrame(columns=columns, droplevels=False)
elif isinstance(data, (DeferredDatasetHandle, InMemoryDatasetHandle)):
# Read data into memory the gen3 way
df = data.get(parameters={"columns": columns})
if isinstance(columnIndex, pd.MultiIndex):
columns = self.multilevelColumns(_data, columnIndex=columnIndex)
df = _data.get(parameters={"columns": columns})

valDict = {}
for k, f in self.funcDict.items():
try:
subdf = f._setLevels(
df[f.multilevelColumns(data, returnTuple=True, columnIndex=columnIndex)]
df[f.multilevelColumns(_data, returnTuple=True, columnIndex=columnIndex)]
)
valDict[k] = f._func(subdf)
except Exception as e:
Expand All @@ -524,15 +486,7 @@ def __call__(self, data, **kwargs):
raise e

else:
if isinstance(data, (DeferredDatasetHandle, InMemoryDatasetHandle)):
# input if Gen3 deferLoad=True
df = data.get(parameters={"columns": self.columns})
elif isinstance(data, pd.DataFrame):
# input if Gen3 deferLoad=False
df = data
else:
# Original Gen2 input is type ParquetTable and the fallback
df = data.toDataFrame(columns=self.columns)
df = _data.get(parameters={"columns": self.columns})

valDict = {k: f._func(df) for k, f in self.funcDict.items()}

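With the gen2 branches gone, `CompositeFunctor.__call__` has a single code path: wrap a bare `DataFrame` in an `InMemoryDatasetHandle`, then `get` the needed columns. A hypothetical end-to-end sketch (`Column` is an existing pipe_tasks functor; the column names are assumptions):

    import pandas as pd

    from lsst.pipe.base import InMemoryDatasetHandle
    from lsst.pipe.tasks.functors import Column, CompositeFunctor

    funcs = CompositeFunctor({"ra": Column("coord_ra"),
                              "dec": Column("coord_dec")})

    df = pd.DataFrame({"coord_ra": [1.2], "coord_dec": [-0.5]})
    handle = InMemoryDatasetHandle(df, storageClass="DataFrame")

    out = funcs(handle)  # pandas.DataFrame with columns "ra" and "dec"
    same = funcs(df)     # the bare DataFrame is wrapped internally and works too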
6 changes: 4 additions & 2 deletions python/lsst/pipe/tasks/healSparseMapping.py
@@ -200,8 +200,10 @@ def build_ccd_input_map(self, bbox, wcs, ccds):
bbox_afw_poly.convexHull().getVertices())
bbox_poly = hsp.Polygon(ra=bbox_poly_radec[: -1, 0], dec=bbox_poly_radec[: -1, 1],
value=np.arange(self.ccd_input_map.wide_mask_maxbits))
bbox_poly_map = bbox_poly.get_map_like(self.ccd_input_map)
self.ccd_input_map = hsp.and_intersection([self.ccd_input_map, bbox_poly_map])
with warnings.catch_warnings():
warnings.simplefilter("ignore")
bbox_poly_map = bbox_poly.get_map_like(self.ccd_input_map)
self.ccd_input_map = hsp.and_intersection([self.ccd_input_map, bbox_poly_map])
self.ccd_input_map.metadata = metadata

# Create a temporary map to hold the count of bad pixels in each healpix pixel
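The only behavioral change in this file is to silence warnings raised inside the two healsparse calls without disturbing filters elsewhere; the pattern in isolation (with a hypothetical stand-in for the warning call):

    import warnings


    def noisy_library_call():
        # Hypothetical stand-in for a call that emits a warning.
        warnings.warn("deprecated", FutureWarning)
        return 42


    with warnings.catch_warnings():
        warnings.simplefilter("ignore")  # applies only inside this block
        result = noisy_library_call()
    # The previous warning filters are restored on exit.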
6 changes: 6 additions & 0 deletions python/lsst/pipe/tasks/parquetTable.py
@@ -33,7 +33,11 @@
import numpy as np
import pandas as pd

from deprecated.sphinx import deprecated


@deprecated(reason="The ParquetTable interface is from Gen2 i/o and will be removed after v26.",
version="v25", category=FutureWarning)
class ParquetTable(object):
"""Thin wrapper to pyarrow's ParquetFile object
@@ -148,6 +152,8 @@ def toDataFrame(self, columns=None):
return df


@deprecated(reason="The MultilevelParquetTable interface is from Gen2 i/o and will be removed after v26.",
version="v25", category=FutureWarning)
class MultilevelParquetTable(ParquetTable):
"""Wrapper to access dataframe with multi-level column index from Parquet
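For reference, the `deprecated.sphinx` decorator wraps the class so that instantiating it emits the configured warning (and notes the deprecation in the rendered docs); a minimal sketch of the same pattern:

    from deprecated.sphinx import deprecated


    @deprecated(reason="Example: this class will be removed after v26.",
                version="v25", category=FutureWarning)
    class Legacy:
        """A stand-in deprecated class."""


    Legacy()  # emits FutureWarning: "Call to deprecated class Legacy. (Example: ...)"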
