Merge pull request #720 from lsst/tickets/DM-35803
DM-35803: Add DataFrame delegate to enable InMemoryDatasets of DataFrames.
erykoff committed Aug 30, 2022
2 parents 489cae2 + 0b78437 commit 3ee7dd6
Showing 7 changed files with 259 additions and 22 deletions.
1 change: 1 addition & 0 deletions doc/changes/DM-35803.bugfix.rst
@@ -0,0 +1 @@
Fix a bug in the parquet reader where a single string column name would be interpreted as an iterable.
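As a quick illustration of the fixed behaviour (a minimal sketch; the repository path and dataset type name are hypothetical, and the ``ddd`` column follows the test data added in this commit), a bare string passed as the ``columns`` parameter now selects that single column instead of being iterated character by character:

    from lsst.daf.butler import Butler

    # Hypothetical repo and dataset type, for illustration only.
    butler = Butler("/path/to/repo", run="test_run")
    df = butler.get("someDataFrame", dataId={}, parameters={"columns": "ddd"})
    # Previously "ddd" was iterated as "d", "d", "d" and rejected as
    # unrecognized column names; now a single-column DataFrame comes back.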
1 change: 1 addition & 0 deletions doc/changes/DM-35803.feature.rst
@@ -0,0 +1 @@
Added ``DataFrameDelegate`` to allow DataFrames to be used with ``lsst.pipe.base.InMemoryDatasetHandle``.
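A minimal sketch of what this enables, assuming ``lsst.pipe.base.InMemoryDatasetHandle`` accepts a storage class name in its constructor (the DataFrame itself is illustrative):

    import numpy as np
    import pandas as pd
    from lsst.pipe.base import InMemoryDatasetHandle

    df = pd.DataFrame({"a": np.random.randn(5), "b": np.random.randn(5)})
    handle = InMemoryDatasetHandle(df, storageClass="DataFrame")
    # With the delegate registered, parameters are honoured just as for
    # a butler.get() from a datastore.
    subset = handle.get(parameters={"columns": ["a"]})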
3 changes: 2 additions & 1 deletion python/lsst/daf/butler/configs/storageClasses.yaml
@@ -101,7 +101,8 @@ storageClasses:
ExposureCatalog:
pytype: lsst.afw.table.ExposureCatalog
DataFrame:
-    pytype: pandas.DataFrame
+    pytype: pandas.core.frame.DataFrame
+    delegate: lsst.daf.butler.delegates.dataframe.DataFrameDelegate
derivedComponents:
columns: DataFrameIndex
parameters:
Empty file.
133 changes: 133 additions & 0 deletions python/lsst/daf/butler/delegates/dataframe.py
@@ -0,0 +1,133 @@
# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Support for reading DataFrames."""
from __future__ import annotations

import collections.abc
from typing import Any, Mapping, Optional

import pandas
from lsst.daf.butler import StorageClassDelegate
from lsst.utils.introspection import get_full_type_name
from lsst.utils.iteration import ensure_iterable

__all__ = ["DataFrameDelegate"]


class DataFrameDelegate(StorageClassDelegate):
    """Delegate that understands the ``DataFrame`` storage class."""

    def getComponent(self, composite: pandas.DataFrame, componentName: str) -> Any:
"""Get a component from a DataFrame.
Parameters
----------
composite : `~pandas.DataFrame`
``DataFrame`` to access component.
componentName : `str`
Name of component to retrieve.
Returns
-------
component : `object`
The component.
Raises
------
AttributeError
The component can not be found.
"""
if componentName == "columns":
return pandas.Index(self._getAllColumns(composite))
else:
raise AttributeError(
f"Do not know how to retrieve component {componentName} from {get_full_type_name(composite)}"
)

def handleParameters(
self, inMemoryDataset: pandas.DataFrame, parameters: Optional[Mapping[str, Any]] = None
) -> Any:
"""Return possibly new in-memory dataset using the supplied parameters.
Parameters
----------
inMemoryDataset : `object`
Object to modify based on the parameters.
parameters : `dict`, optional
Parameters to apply. Values are specific to the parameter.
Supported parameters are defined in the associated
`StorageClass`. If no relevant parameters are specified the
``inMemoryDataset`` will be return unchanged.
Returns
-------
inMemoryDataset : `object`
Original in-memory dataset, or updated form after parameters
have been used.
"""
if not isinstance(inMemoryDataset, pandas.DataFrame):
raise ValueError(
"handleParameters for a DataFrame must get a DataFrame, "
f"not {get_full_type_name(inMemoryDataset)}."
)

if parameters is None:
return inMemoryDataset

if "columns" in parameters:
allColumns = self._getAllColumns(inMemoryDataset)

if not isinstance(parameters["columns"], collections.abc.Iterable):
raise NotImplementedError(
"InMemoryDataset of a DataFrame only supports list/tuple of string column names"
)

for column in ensure_iterable(parameters["columns"]):
if not isinstance(column, str):
raise NotImplementedError(
"InMemoryDataset of a DataFrame only supports string column names."
)
if column not in allColumns:
raise ValueError(f"Unrecognized column name {column!r}.")

# Exclude index columns from the subset.
readColumns = [
name
for name in ensure_iterable(parameters["columns"])
if name not in inMemoryDataset.index.names
]

return inMemoryDataset[readColumns]
else:
return inMemoryDataset

def _getAllColumns(self, inMemoryDataset: pandas.DataFrame) -> list[str]:
"""Get all columns, including index columns.
Returns
-------
columns : `list` [`str`]
List of all columns.
"""
allColumns = list(inMemoryDataset.columns)
if inMemoryDataset.index.names[0] is not None:
allColumns.extend(inMemoryDataset.index.names)

return allColumns
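For reference, a short usage sketch of the delegate on its own, mirroring the tests added below (the DataFrame is illustrative):

    import numpy as np
    import pandas as pd

    from lsst.daf.butler.delegates.dataframe import DataFrameDelegate

    df = pd.DataFrame({"a": np.random.randn(3), "b": np.random.randn(3)})
    delegate = DataFrameDelegate("DataFrame")

    # The "columns" component lists all columns as a pandas.Index
    # (named index columns included).
    columns = delegate.getComponent(composite=df, componentName="columns")

    # Parameter handling returns a column subset, or the original object
    # when no relevant parameters are given.
    subset = delegate.handleParameters(inMemoryDataset=df, parameters={"columns": ["a"]})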
4 changes: 2 additions & 2 deletions python/lsst/daf/butler/formatters/parquet.py
@@ -158,10 +158,10 @@ def read(
assert isinstance(columns, dict) or isinstance(columns, list)
columns = list(self._standardizeColumnParameter(columns))
else:
-            for column in columns:
+            for column in ensure_iterable(columns):
if column not in self.columns:
raise ValueError(f"Unrecognized column name {column!r}.")
-        return self.file.read(columns=columns, use_pandas_metadata=True).to_pandas()
+        return self.file.read(columns=ensure_iterable(columns), use_pandas_metadata=True).to_pandas()
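The property of ``ensure_iterable`` this change relies on, as a quick sketch: strings are yielded as single items rather than iterated character-wise.

    from lsst.utils.iteration import ensure_iterable

    print(list(ensure_iterable("ddd")))       # ["ddd"]: one column name
    print(list(ensure_iterable(["a", "b"])))  # ["a", "b"]: unchanged
    print(list("ddd"))                        # ["d", "d", "d"]: the old bug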


def _writeParquet(path: str, inMemoryDataset: pd.DataFrame) -> None:
139 changes: 120 additions & 19 deletions tests/test_parquet.py
@@ -30,25 +30,81 @@
try:
import numpy as np
import pandas as pd
except ImportError:
pd = None

try:
import pyarrow.parquet
except ImportError:
pyarrow = None

-from lsst.daf.butler import Butler, DatasetType
+from lsst.daf.butler import Butler, Config, DatasetType, StorageClassConfig, StorageClassFactory
+from lsst.daf.butler.delegates.dataframe import DataFrameDelegate
from lsst.daf.butler.tests.utils import makeTestTempDir, removeTestTempDir

TESTDIR = os.path.abspath(os.path.dirname(__file__))


def _makeSingleIndexDataFrame():
"""Make a single index data frame for testing.
Returns
-------
dataFrame : `~pandas.DataFrame`
The test dataframe.
allColumns : `list` [`str`]
List of all the columns (including index columns).
"""
nrow = 5
data = np.zeros(nrow, dtype=[("index", "i4"), ("a", "f8"), ("b", "f8"), ("c", "f8"), ("ddd", "f8")])
data["index"][:] = np.arange(nrow)
data["a"] = np.random.randn(nrow)
data["b"] = np.random.randn(nrow)
data["c"] = np.random.randn(nrow)
data["ddd"] = np.random.randn(nrow)
df = pd.DataFrame(data)
df = df.set_index("index")
allColumns = df.columns.append(pd.Index(df.index.names))

return df, allColumns


def _makeMultiIndexDataFrame():
"""Make a multi-index data frame for testing.
Returns
-------
dataFrame : `~pandas.DataFrame`
The test dataframe.
"""
columns = pd.MultiIndex.from_tuples(
[
("g", "a"),
("g", "b"),
("g", "c"),
("r", "a"),
("r", "b"),
("r", "c"),
],
names=["filter", "column"],
)
df = pd.DataFrame(np.random.randn(5, 6), index=np.arange(5, dtype=int), columns=columns)

return df


@unittest.skipUnless(pd is not None, "Cannot test ParquetFormatter without pandas.")
@unittest.skipUnless(pyarrow is not None, "Cannot test ParquetFormatter without pyarrow.")
class ParquetFormatterTestCase(unittest.TestCase):
"""Tests for ParquetFormatter, using local file datastore."""

configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")

def setUp(self):
"""Create a new butler root for each test."""
self.root = makeTestTempDir(TESTDIR)
-        Butler.makeRepo(self.root)
-        self.butler = Butler(self.root, run="test_run")
+        config = Config(self.configFile)
+        self.butler = Butler(Butler.makeRepo(self.root, config=config), writeable=True, run="test_run")
# No dimensions in dataset type so we don't have to worry about
# inserting dimension data or defining data IDs.
self.datasetType = DatasetType(
@@ -60,37 +116,31 @@ def tearDown(self):
removeTestTempDir(self.root)

def testSingleIndexDataFrame(self):
-        columns1 = pd.Index(["a", "b", "c"])
-        df1 = pd.DataFrame(np.random.randn(5, 3), index=np.arange(5, dtype=int), columns=columns1)
+        df1, allColumns = _makeSingleIndexDataFrame()

self.butler.put(df1, self.datasetType, dataId={})
# Read the whole DataFrame.
df2 = self.butler.get(self.datasetType, dataId={})
self.assertTrue(df1.equals(df2))
# Read just the column descriptions.
columns2 = self.butler.get(self.datasetType.componentTypeName("columns"), dataId={})
-        self.assertTrue(df1.columns.equals(columns2))
+        self.assertTrue(allColumns.equals(columns2))
# Read just some columns a few different ways.
df3 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["a", "c"]})
self.assertTrue(df1.loc[:, ["a", "c"]].equals(df3))
df4 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": "a"})
self.assertTrue(df1.loc[:, ["a"]].equals(df4))
df5 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["index", "a"]})
self.assertTrue(df1.loc[:, ["a"]].equals(df5))
df6 = self.butler.get(self.datasetType, dataId={}, parameters={"columns": "ddd"})
self.assertTrue(df1.loc[:, ["ddd"]].equals(df6))
# Passing an unrecognized column should be a ValueError.
with self.assertRaises(ValueError):
-            self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["d"]})
+            self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["e"]})

def testMultiIndexDataFrame(self):
-        columns1 = pd.MultiIndex.from_tuples(
-            [
-                ("g", "a"),
-                ("g", "b"),
-                ("g", "c"),
-                ("r", "a"),
-                ("r", "b"),
-                ("r", "c"),
-            ],
-            names=["filter", "column"],
-        )
-        df1 = pd.DataFrame(np.random.randn(5, 6), index=np.arange(5, dtype=int), columns=columns1)
+        df1 = _makeMultiIndexDataFrame()

self.butler.put(df1, self.datasetType, dataId={})
# Read the whole DataFrame.
df2 = self.butler.get(self.datasetType, dataId={})
@@ -113,5 +163,56 @@ def testMultiIndexDataFrame(self):
self.butler.get(self.datasetType, dataId={}, parameters={"columns": ["d"]})


@unittest.skipUnless(pd is not None, "Cannot test parquet InMemoryDatastore without pandas.")
class InMemoryParquetFormatterTestCase(ParquetFormatterTestCase):
"""Tests for InMemoryDatastore, using DataFrameDelegate"""

configFile = os.path.join(TESTDIR, "config/basic/butler-inmemory.yaml")

def testMultiIndexDataFrame(self):
df1 = _makeMultiIndexDataFrame()

delegate = DataFrameDelegate("DataFrame")

# Read the whole DataFrame.
df2 = delegate.handleParameters(inMemoryDataset=df1)
self.assertTrue(df1.equals(df2))
# Read just the column descriptions.
columns2 = delegate.getComponent(composite=df1, componentName="columns")
self.assertTrue(df1.columns.equals(columns2))

# Read just some columns a few different ways.
with self.assertRaises(NotImplementedError) as cm:
delegate.handleParameters(inMemoryDataset=df1, parameters={"columns": {"filter": "g"}})
self.assertIn("only supports string column names", str(cm.exception))
with self.assertRaises(NotImplementedError) as cm:
delegate.handleParameters(
inMemoryDataset=df1, parameters={"columns": {"filter": ["r"], "column": "a"}}
)
self.assertIn("only supports string column names", str(cm.exception))

def testBadInput(self):
delegate = DataFrameDelegate("DataFrame")

with self.assertRaises(ValueError):
delegate.handleParameters(inMemoryDataset="not_a_dataframe")

def testStorageClass(self):
df1, allColumns = _makeSingleIndexDataFrame()

factory = StorageClassFactory()
factory.addFromConfig(StorageClassConfig())

storageClass = factory.findStorageClass(type(df1), compare_types=False)
# Force the name lookup to do name matching
storageClass._pytype = None
self.assertEqual(storageClass.name, "DataFrame")

storageClass = factory.findStorageClass(type(df1), compare_types=True)
# Force the name lookup to do name matching
storageClass._pytype = None
self.assertEqual(storageClass.name, "DataFrame")


if __name__ == "__main__":
unittest.main()
