Skip to content

Commit

Permalink
Merge pull request #444 from lsst/revert-427-tickets/DM-27008
Browse files Browse the repository at this point in the history
Revert DM-27008 due to ci_hsc breakage.
  • Loading branch information
ktlim committed Dec 18, 2020
2 parents 57095b0 + d5e2eea commit b83d212
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 257 deletions.
247 changes: 39 additions & 208 deletions python/lsst/pipe/tasks/functors.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import yaml
import re
from itertools import product

import pandas as pd
import numpy as np
import astropy.units as u

from lsst.daf.persistence import doImport
from lsst.daf.butler import DeferredDatasetHandle
from .parquetTable import ParquetTable, MultilevelParquetTable
from .parquetTable import MultilevelParquetTable


def init_fromDict(initDict, basePath='lsst.pipe.tasks.functors',
Expand Down Expand Up @@ -53,8 +51,7 @@ def init_fromDict(initDict, basePath='lsst.pipe.tasks.functors',
class Functor(object):
"""Define and execute a calculation on a ParquetTable
The `__call__` method accepts either a `ParquetTable` object or a
`DeferredDatasetHandle`, and returns the
The `__call__` method accepts a `ParquetTable` object, and returns the
result of the calculation as a single column. Each functor defines what
columns are needed for the calculation, and only these columns are read
from the `ParquetTable`.
Expand All @@ -75,19 +72,18 @@ class Functor(object):
On initialization, a `Functor` should declare what filter (`filt` kwarg)
and dataset (e.g. `'ref'`, `'meas'`, `'forced_src'`) it is intended to be
applied to. This enables the `_get_data` method to extract the proper
applied to. This enables the `_get_cols` method to extract the proper
columns from the parquet file. If not specified, the dataset will fall back
on the `_defaultDataset`attribute. If filter is not specified and `dataset`
is anything other than `'ref'`, then an error will be raised when trying to
perform the calculation.
As currently implemented, `Functor` is only set up to expect a
dataset of the format of the `deepCoadd_obj` dataset; that is, a
dataframe with a multi-level column index,
with the levels of the column index being `filter`,
`ParquetTable` of the format of the `deepCoadd_obj` dataset; that is, a
`MultilevelParquetTable` with the levels of the column index being `filter`,
`dataset`, and `column`. This is defined in the `_columnLevels` attribute,
as well as being implicit in the role of the `filt` and `dataset` attributes
defined at initialization. In addition, the `_get_data` method that reads
defined at initialization. In addition, the `_get_cols` method that reads
the dataframe from the `ParquetTable` will return a dataframe with column
index levels defined by the `_dfLevels` attribute; by default, this is
`column`.
Expand Down Expand Up @@ -135,112 +131,17 @@ def columns(self):
raise NotImplementedError('Must define columns property or _columns attribute')
return self._columns

def _get_data_columnLevels(self, data, columnIndex=None):
    """Return the names of the column index levels of a multilevel table.

    Supports both the gen2 `MultilevelParquetTable` and the gen3
    `DeferredDatasetHandle`. An explicit pandas ``columnIndex`` (passed in,
    or read from the handle) takes precedence over querying ``data``.

    Parameters
    ----------
    data : `MultilevelParquetTable` or `DeferredDatasetHandle`
    columnIndex : pandas `Index`, optional
        If not passed, it is read from the `DeferredDatasetHandle`.
    """
    # A gen3 handle can supply its column index lazily via the component read.
    if columnIndex is None and isinstance(data, DeferredDatasetHandle):
        columnIndex = data.get(component="columns")
    if columnIndex is not None:
        return columnIndex.names
    if isinstance(data, MultilevelParquetTable):
        return data.columnLevels
    raise TypeError(f"Unknown type for data: {type(data)}!")

def _get_data_columnLevelNames(self, data, columnIndex=None):
    """Return the distinct values present in each column-index level.

    Similar to `_get_data_columnLevels`, this enables backward compatibility
    with gen2; it mirrors the gen2 implementation within
    `pipe.tasks.parquetTable.MultilevelParquetTable`.

    Parameters
    ----------
    data : `MultilevelParquetTable` or `DeferredDatasetHandle`
    columnIndex : pandas `Index`, optional
        If not passed, it is read from the `DeferredDatasetHandle`.
    """
    if columnIndex is None and isinstance(data, DeferredDatasetHandle):
        columnIndex = data.get(component="columns")
    if columnIndex is not None:
        # Each entry of the index is a tuple; column i of the stacked array
        # holds every value seen at level i.
        stacked = np.array([c for c in columnIndex])
        return {
            level: list(np.unique(stacked[:, i]))
            for i, level in enumerate(columnIndex.names)
        }
    if isinstance(data, MultilevelParquetTable):
        return data.columnLevelNames
    raise TypeError(f"Unknown type for data: {type(data)}!")

def _colsFromDict(self, colDict, columnIndex=None):
    """Convert a dictionary column specification into a list of column tuples.

    This mirrors the original gen2 implementation within
    `pipe.tasks.parquetTable.MultilevelParquetTable`.

    Parameters
    ----------
    colDict : dict
        Maps a column-level name to either a single string or a list of
        values wanted at that level.
    columnIndex : pandas `MultiIndex`, optional
        Supplies the level names and, for unspecified levels, every value.
    """
    levels = self._get_data_columnLevels(None, columnIndex=columnIndex)

    perLevel = []
    for i, level in enumerate(levels):
        if level not in colDict:
            # Level not constrained: take every value present in the index.
            perLevel.append(columnIndex.levels[i])
        else:
            spec = colDict[level]
            # A bare string means a single value at this level.
            perLevel.append([spec] if isinstance(spec, str) else spec)

    # Cartesian product over the levels yields full column tuples.
    return list(product(*perLevel))

def multilevelColumns(self, data, columnIndex=None, returnTuple=False):
"""Returns columns needed by functor from multilevel dataset
To access tables with multilevel column structure, the `MultilevelParquetTable`
or `DeferredDatasetHandle` need to be passed either a list of tuples or a
dictionary.
Parameters
----------
data : `MultilevelParquetTable` or `DeferredDatasetHandle`
columnIndex (optional): pandas `Index` object
either passed or read in from `DeferredDatasetHandle`.
`returnTuple` : bool
If true, then return a list of tuples rather than the column dictionary
specification. This is set to `True` by `CompositeFunctor` in order to be able to
combine columns from the various component functors.
"""
if isinstance(data, DeferredDatasetHandle) and columnIndex is None:
columnIndex = data.get(component="columns")

# Confirm that the dataset has the column levels the functor is expecting it to have.
columnLevels = self._get_data_columnLevels(data, columnIndex)

if not set(columnLevels) == set(self._columnLevels):
raise ValueError(
"ParquetTable does not have the expected column levels. "
f"Got {columnLevels}; expected {self._columnLevels}."
)
def multilevelColumns(self, parq):
if not set(parq.columnLevels) == set(self._columnLevels):
raise ValueError('ParquetTable does not have the expected column levels. '
f'Got {parq.columnLevels}; expected {self._columnLevels}.')

columnDict = {'column': self.columns,
'dataset': self.dataset}
if self.filt is None:
columnLevelNames = self._get_data_columnLevelNames(data, columnIndex)
if "filter" in columnLevels:
if self.dataset == "ref":
columnDict["filter"] = columnLevelNames["filter"][0]
if 'filter' in parq.columnLevels:
if self.dataset == 'ref':
columnDict['filter'] = parq.columnLevelNames['filter'][0]
else:
raise ValueError(f"'filt' not set for functor {self.name}"
f"(dataset {self.dataset}) "
Expand All @@ -250,61 +151,24 @@ def multilevelColumns(self, data, columnIndex=None, returnTuple=False):
else:
columnDict['filter'] = self.filt

if isinstance(data, MultilevelParquetTable):
return data._colsFromDict(columnDict)
elif isinstance(data, DeferredDatasetHandle):
if returnTuple:
return self._colsFromDict(columnDict, columnIndex=columnIndex)
else:
return columnDict
return parq._colsFromDict(columnDict)

def _func(self, df, dropna=True):
    """Compute this functor's result from ``df``; subclasses must override.

    NOTE(review): ``dropna`` is accepted here but never consulted by this
    base implementation — confirm its use against subclasses.
    """
    raise NotImplementedError('Must define calculation on dataframe')

def _get_columnIndex(self, data):
    """Return the column index of a gen3 handle, or `None` for anything else."""
    if not isinstance(data, DeferredDatasetHandle):
        return None
    # Read only the "columns" component rather than the whole dataframe.
    return data.get(component="columns")

def _get_data(self, data):
def _get_cols(self, parq):
"""Retrieve dataframe necessary for calculation.
The data argument can be a DataFrame, a ParquetTable instance, or a gen3 DeferredDatasetHandle
Returns dataframe upon which `self._func` can act.
N.B. while passing a raw pandas `DataFrame` *should* work here, it has not been tested.
"""
if isinstance(data, pd.DataFrame):
return data

# First thing to do: check to see if the data source has a multilevel column index or not.
columnIndex = self._get_columnIndex(data)
is_multiLevel = isinstance(data, MultilevelParquetTable) or isinstance(columnIndex, pd.MultiIndex)

# Simple single-level parquet table, gen2
if isinstance(data, ParquetTable) and not is_multiLevel:
if isinstance(parq, MultilevelParquetTable):
columns = self.multilevelColumns(parq)
df = parq.toDataFrame(columns=columns, droplevels=False)
df = self._setLevels(df)
else:
columns = self.columns
df = data.toDataFrame(columns=columns)
return df

# Get proper multi-level columns specification for this functor
if is_multiLevel:
columns = self.multilevelColumns(data, columnIndex=columnIndex)

if isinstance(data, MultilevelParquetTable):
# Load in-memory dataframe with appropriate columns the gen2 way
df = data.toDataFrame(columns=columns, droplevels=False)
elif isinstance(data, DeferredDatasetHandle):
# Load in-memory dataframe with appropriate columns the gen3 way
df = data.get(parameters={"columns": columns})

# Drop unnecessary column levels
df = self._setLevels(df)
df = parq.toDataFrame(columns=columns)

return df

def _setLevels(self, df):
Expand All @@ -315,9 +179,9 @@ def _setLevels(self, df):
def _dropna(self, vals):
    # Delegate to pandas: drop NaN/None entries from the computed values.
    return vals.dropna()

def __call__(self, data, dropna=False):
def __call__(self, parq, dropna=False):
try:
df = self._get_data(data)
df = self._get_cols(parq)
vals = self._func(df)
except Exception:
vals = self.fail(df)
Expand All @@ -326,10 +190,10 @@ def __call__(self, data, dropna=False):

return vals

def difference(self, data1, data2, **kwargs):
def difference(self, parq1, parq2, **kwargs):
"""Computes difference between functor called on two different ParquetTable objects
"""
return self(data1, **kwargs) - self(data2, **kwargs)
return self(parq1, **kwargs) - self(parq2, **kwargs)

def fail(self, df):
    """Return an all-NaN float Series aligned to ``df``'s index.

    Used as the fallback result when the functor's calculation raises.
    """
    # A scalar NaN broadcasts to every row of the supplied index.
    return pd.Series(np.nan, index=df.index, dtype=float)
Expand Down Expand Up @@ -416,58 +280,24 @@ def update(self, new):
def columns(self):
    """Union of the columns required by all component functors."""
    needed = set()
    for functor in self.funcDict.values():
        needed.update(functor.columns)
    return list(needed)

def multilevelColumns(self, data, **kwargs):
# Get the union of columns for all component functors. Note the need to have `returnTuple=True` here.
return list(
set(
[
x
for y in [
f.multilevelColumns(data, returnTuple=True, **kwargs) for f in self.funcDict.values()
]
for x in y
]
)
)

def __call__(self, data, **kwargs):
columnIndex = self._get_columnIndex(data)

# First, determine whether data has a multilevel index (either gen2 or gen3)
is_multiLevel = isinstance(data, MultilevelParquetTable) or isinstance(columnIndex, pd.MultiIndex)

# Simple single-level column index, gen2
if isinstance(data, ParquetTable) and not is_multiLevel:
columns = self.columns
df = data.toDataFrame(columns=columns)
valDict = {k: f._func(df) for k, f in self.funcDict.items()}

# Multilevel index, gen2 or gen3
if is_multiLevel:
columns = self.multilevelColumns(data, columnIndex=columnIndex)

if isinstance(data, MultilevelParquetTable):
# Read data into memory the gen2 way
df = data.toDataFrame(columns=columns, droplevels=False)
elif isinstance(data, DeferredDatasetHandle):
# Read data into memory the gen3 way
df = data.get(parameters={"columns": columns})
def multilevelColumns(self, parq):
    """Union of the multilevel columns needed by all component functors."""
    needed = set()
    for functor in self.funcDict.values():
        needed.update(functor.multilevelColumns(parq))
    return list(needed)

def __call__(self, parq, **kwargs):
if isinstance(parq, MultilevelParquetTable):
columns = self.multilevelColumns(parq)
df = parq.toDataFrame(columns=columns, droplevels=False)
valDict = {}
for k, f in self.funcDict.items():
try:
subdf = f._setLevels(
df[f.multilevelColumns(data, returnTuple=True, columnIndex=columnIndex)]
)
subdf = f._setLevels(df[f.multilevelColumns(parq)])
valDict[k] = f._func(subdf)
except Exception:
raise
valDict[k] = f.fail(subdf)

# non-multilevel, gen3 (TODO: this should work, but this case is not tested in test_functors.py)
elif isinstance(data, DeferredDatasetHandle):
else:
columns = self.columns
df = data.get(parameters={"columns": columns})
df = parq.toDataFrame(columns=columns)
valDict = {k: f._func(df) for k, f in self.funcDict.items()}

try:
Expand Down Expand Up @@ -863,8 +693,9 @@ def _func(self, df):
def columns(self):
    """The underlying column of each of the two component magnitude functors."""
    return [mag.col for mag in (self.mag1, self.mag2)]

def multilevelColumns(self, parq, **kwargs):
return [(self.dataset, self.filt1, self.col), (self.dataset, self.filt2, self.col)]
def multilevelColumns(self, parq):
    """One (dataset, filter, column) tuple per filter being differenced."""
    return [(self.dataset, band, self.col) for band in (self.filt1, self.filt2)]

@property
def name(self):
Expand Down

0 comments on commit b83d212

Please sign in to comment.