DM-40582: Add support for serialization/deserialization of Arrow schemas to Parquet. #887

Merged (4 commits, Sep 8, 2023)

Changes from 3 commits
1 change: 1 addition & 0 deletions doc/changes/DM-40582.feature.md
@@ -0,0 +1 @@
Add support for serialization and deserialization of Arrow schemas via Parquet, and add support for translation of ``doc`` and ``units`` to/from arrow/astropy schemas.
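
At the Butler level, the new storage class lets a bare `pyarrow.Schema` be put and got like any other dataset. Below is a minimal sketch of the intended usage, mirroring the tests added later in this PR; the repository path and dataset type name are placeholders, and the repository is assumed to already exist.

```python
# Sketch only: repo path and dataset type name are placeholders.
import pyarrow as pa
from lsst.daf.butler import Butler, DatasetType

butler = Butler("/path/to/repo", writeable=True, run="test_run")

dataset_type = DatasetType(
    "schema_dataset", dimensions=(), storageClass="ArrowSchema", universe=butler.dimensions
)
butler.registry.registerDatasetType(dataset_type)

schema = pa.schema([pa.field("flux", pa.float64(), metadata={"doc": "Flux", "units": "nJy"})])

# The ParquetFormatter writes this as a metadata-only Parquet file.
butler.put(schema, dataset_type, dataId={})

# Read back natively, or converted to one of the other schema storage classes.
schema_back = butler.get(dataset_type, dataId={})
df_schema = butler.get(dataset_type, dataId={}, storageClass="DataFrameSchema")
```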
1 change: 1 addition & 0 deletions python/lsst/daf/butler/configs/datastores/formatters.yaml
@@ -76,6 +76,7 @@ ArrowAstropy: lsst.daf.butler.formatters.parquet.ParquetFormatter
ArrowNumpy: lsst.daf.butler.formatters.parquet.ParquetFormatter
ArrowNumpyDict: lsst.daf.butler.formatters.parquet.ParquetFormatter
ArrowTable: lsst.daf.butler.formatters.parquet.ParquetFormatter
ArrowSchema: lsst.daf.butler.formatters.parquet.ParquetFormatter
ExtendedPsf: lsst.obs.base.formatters.fitsGeneric.FitsGenericFormatter
HealSparseMap: lsst.pipe.tasks.healSparseMapping.HealSparseMapFormatter
ButlerLogRecords: lsst.daf.butler.formatters.logs.ButlerLogRecordsFormatter
57 changes: 44 additions & 13 deletions python/lsst/daf/butler/formatters/parquet.py
Expand Up @@ -78,7 +78,9 @@ def read(self, component: str | None = None) -> Any:
# Docstring inherited from Formatter.read.
schema = pq.read_schema(self.fileDescriptor.location.path)

if component in ("columns", "schema"):
schema_names = ["ArrowSchema", "DataFrameSchema", "ArrowAstropySchema", "ArrowNumpySchema"]

if component in ("columns", "schema") or self.fileDescriptor.readStorageClass.name in schema_names:
# The schema will be translated to column format
# depending on the input type.
return schema
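
Because `pq.read_schema` reads only the Parquet file footer, asking for a schema component or a schema-typed storage class never touches the row data. A standalone illustration (the file path is a placeholder):

```python
# Illustration only: reads the Arrow schema from the Parquet footer,
# without loading any row groups.
import pyarrow.parquet as pq

schema = pq.read_schema("/path/to/table.parq")
print(schema.names)     # column names
print(schema.metadata)  # schema-level metadata (bytes keys/values), or None
```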
@@ -141,6 +143,8 @@ def write(self, inMemoryDataset: Any) -> None:
import numpy as np
from astropy.table import Table as astropyTable

location = self.makeUpdatedLocation(self.fileDescriptor.location)

arrow_table = None
if isinstance(inMemoryDataset, pa.Table):
# This will be the most likely match.
@@ -156,6 +160,9 @@ def write(self, inMemoryDataset: Any) -> None:
raise ValueError(
"Input dict for inMemoryDataset does not appear to be a dict of numpy arrays."
) from e
elif isinstance(inMemoryDataset, pa.Schema):
pq.write_metadata(inMemoryDataset, location.path)
return
else:
if hasattr(inMemoryDataset, "to_parquet"):
# This may be a pandas DataFrame
@@ -175,8 +182,6 @@ def write(self, inMemoryDataset: Any) -> None:

row_group_size = compute_row_group_size(arrow_table.schema)

location = self.makeUpdatedLocation(self.fileDescriptor.location)

pq.write_table(arrow_table, location.path, row_group_size=row_group_size)
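
The new `pa.Schema` branch above relies on `pq.write_metadata`, which writes a Parquet file containing the schema but no row groups. A minimal round-trip sketch outside the butler machinery (the file name is a placeholder):

```python
# Round-trip sketch: write a schema-only Parquet file and read it back.
import pyarrow as pa
import pyarrow.parquet as pq

schema = pa.schema([("a", pa.int64()), ("b", pa.float64())])
pq.write_metadata(schema, "schema_only.parq")  # no data, only metadata
assert pq.read_schema("schema_only.parq").equals(schema)
```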


@@ -217,9 +222,7 @@ def arrow_to_astropy(arrow_table: pa.Table) -> atable.Table:

astropy_table = Table(arrow_to_numpy_dict(arrow_table))

metadata = arrow_table.schema.metadata if arrow_table.schema.metadata is not None else {}

_apply_astropy_metadata(astropy_table, metadata)
_apply_astropy_metadata(astropy_table, arrow_table.schema)

return astropy_table

@@ -450,7 +453,23 @@ def astropy_to_arrow(astropy_table: atable.Table) -> pa.Table:
meta_yaml_str = "\n".join(meta_yaml)
md[b"table_meta_yaml"] = meta_yaml_str

schema = pa.schema(type_list, metadata=md)
# Convert type list to fields with metadata.
fields = []
for name, pa_type in type_list:
field_metadata = {}
if description := astropy_table[name].description:
field_metadata["doc"] = description
if units := astropy_table[name].unit:
field_metadata["units"] = str(units)
Review comment (Member):
This is probably the point where we decide whether we want this to be "units" or "unit". I may have put "units" in the drp_tasks PR that spawned this, but if astropy uses "unit" maybe we should, too.

Reply (Contributor Author):
This is an interesting question. Right now, our afw Fields use doc/units, which is what I thought you were going for here. Astropy tables use description/unit, which I'm using as exact convertibles. And right here we're defining the precedent for what should go into an arrow schema. So I think we should go with either the afw convention or the astropy convention.

Reply (Member):
I vote for astropy conventions, then.

fields.append(
pa.field(
name,
pa_type,
metadata=field_metadata,
)
)

schema = pa.schema(fields, metadata=md)

arrays = _numpy_style_arrays_to_arrow_arrays(
astropy_table.dtype,
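
As the review thread above notes, the choice of per-field keys ("doc"/"units" versus astropy's description/unit) sets a precedent here. Whatever the final spelling, Arrow stores field metadata as a bytes-to-bytes mapping, which is why the read-back code decodes `b"doc"` and `b"units"`. A standalone pyarrow sketch (column name and values are illustrative):

```python
# Field metadata round-trips as bytes keys and values.
import pyarrow as pa

field = pa.field("flux", pa.float64(), metadata={"doc": "PSF flux", "units": "nJy"})
schema = pa.schema([field])

md = schema.field("flux").metadata
assert md[b"doc"].decode("UTF-8") == "PSF flux"
assert md[b"units"].decode("UTF-8") == "nJy"
```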
@@ -729,9 +748,7 @@ def from_arrow(cls, schema: pa.Schema) -> ArrowAstropySchema:
data = np.zeros(0, dtype=dtype)
astropy_table = Table(data=data)

metadata = schema.metadata if schema.metadata is not None else {}

_apply_astropy_metadata(astropy_table, metadata)
_apply_astropy_metadata(astropy_table, schema)

return cls(astropy_table)

@@ -971,18 +988,21 @@ def _standardize_multi_index_columns(
return names


def _apply_astropy_metadata(astropy_table: atable.Table, metadata: dict) -> None:
def _apply_astropy_metadata(astropy_table: atable.Table, arrow_schema: pa.Schema) -> None:
"""Apply any astropy metadata from the schema metadata.

Parameters
----------
astropy_table : `astropy.table.Table`
Table to apply metadata.
metadata : `dict` [`bytes`]
Metadata dict.
arrow_schema : `pyarrow.Schema`
Arrow schema with metadata.
"""
from astropy.table import meta

metadata = arrow_schema.metadata if arrow_schema.metadata is not None else {}

# Check if we have a special astropy metadata header yaml.
meta_yaml = metadata.get(b"table_meta_yaml", None)
if meta_yaml:
meta_yaml = meta_yaml.decode("UTF8").split("\n")
@@ -998,6 +1018,17 @@ def _apply_astropy_metadata(astropy_table: atable.Table, metadata: dict) -> None

if "meta" in meta_hdr:
astropy_table.meta.update(meta_hdr["meta"])
else:
# If we don't have astropy header data, we may have arrow field
# metadata.
for name in arrow_schema.names:
field_metadata = arrow_schema.field(name).metadata
if field_metadata is None:
continue
if b"doc" in field_metadata and (doc := field_metadata[b"doc"].decode("UTF-8")) != "":
astropy_table[name].description = doc
if b"units" in field_metadata and (units := field_metadata[b"units"].decode("UTF-8")) != "":
astropy_table[name].unit = units
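
Taken together, `astropy_to_arrow` and `arrow_to_astropy` (both defined in this module) are intended to carry column units and descriptions through the conversion, either via the astropy meta header or, failing that, via the per-field metadata handled above. A hedged round-trip sketch, assuming `lsst.daf.butler` is importable and using illustrative column values:

```python
# Sketch of the intended round trip; column name, values, and unit are illustrative.
import astropy.units as u
from astropy.table import Table
from lsst.daf.butler.formatters.parquet import arrow_to_astropy, astropy_to_arrow

table = Table({"a": [1.0, 2.0]})
table["a"].unit = u.degree
table["a"].description = "Description of column a"

arrow_table = astropy_to_arrow(table)
table2 = arrow_to_astropy(arrow_table)

# Unit and description are expected to survive the round trip.
assert table2["a"].unit == u.degree
assert table2["a"].description == "Description of column a"
```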


def _arrow_string_to_numpy_dtype(
178 changes: 178 additions & 0 deletions tests/test_parquet.py
@@ -258,7 +258,9 @@
# Add a couple of units.
table = atable.Table(data)
table["a"].unit = units.degree
table["a"].description = "Description of column a"
table["b"].unit = units.meter
table["b"].description = "Description of column b"

# Add some masked columns.
if include_masked:
@@ -1503,6 +1505,17 @@
)
self.assertEqual(schema2, schema)

# Check the schema conversions and units.
arrow_schema = schema.to_arrow_schema()
for name in arrow_schema.names:
field_metadata = arrow_schema.field(name).metadata
if b"doc" in field_metadata and (doc := field_metadata[b"doc"].decode("UTF-8")) != "":
self.assertEqual(schema2.schema[name].description, doc)
else:
self.assertIsNone(schema2.schema[name].description)
if b"units" in field_metadata and (units_str := field_metadata[b"units"].decode("UTF-8")) != "":
self.assertEqual(schema2.schema[name].unit, units.Unit(units_str))

@unittest.skipUnless(np is not None, "Cannot test reading as numpy without numpy.")
def testWriteArrowTableReadAsNumpyTable(self):
tab1 = _makeSimpleNumpyTable(include_multidim=True)
@@ -1761,6 +1774,171 @@
pass


@unittest.skipUnless(pa is not None, "Cannot test ArrowSchema without pyarrow.")
class ParquetFormatterArrowSchemaTestCase(unittest.TestCase):
"""Tests for ParquetFormatter, ArrowSchema, using local file datastore."""

configFile = os.path.join(TESTDIR, "config/basic/butler.yaml")

def setUp(self):
"""Create a new butler root for each test."""
self.root = makeTestTempDir(TESTDIR)
config = Config(self.configFile)
self.butler = Butler(Butler.makeRepo(self.root, config=config), writeable=True, run="test_run")
# No dimensions in dataset type so we don't have to worry about
# inserting dimension data or defining data IDs.
self.datasetType = DatasetType(
"data", dimensions=(), storageClass="ArrowSchema", universe=self.butler.dimensions
)
self.butler.registry.registerDatasetType(self.datasetType)

def tearDown(self):
removeTestTempDir(self.root)

def _makeTestSchema(self):
schema = pa.schema(
[
pa.field(
"int32",
pa.int32(),
nullable=False,
metadata={
"doc": "32-bit integer",
"units": "",
},
),
pa.field(
"int64",
pa.int64(),
nullable=False,
metadata={
"doc": "64-bit integer",
"units": "",
},
),
pa.field(
"uint64",
pa.uint64(),
nullable=False,
metadata={
"doc": "64-bit unsigned integer",
"units": "",
},
),
pa.field(
"float32",
pa.float32(),
nullable=False,
metadata={
"doc": "32-bit float",
"units": "count",
},
),
pa.field(
"float64",
pa.float64(),
nullable=False,
metadata={
"doc": "64-bit float",
"units": "nJy",
},
),
pa.field(
"fixed_size_list",
pa.list_(pa.float64(), list_size=10),
nullable=False,
metadata={
"doc": "Fixed size list of 64-bit floats.",
"units": "nJy",
},
),
pa.field(
"variable_size_list",
pa.list_(pa.float64()),
nullable=False,
metadata={
"doc": "Variable size list of 64-bit floats.",
"units": "nJy",
},
),
pa.field(
"string",
pa.string(),
nullable=False,
metadata={
"doc": "String",
"units": "",
},
),
pa.field(
"binary",
pa.binary(),
nullable=False,
metadata={
"doc": "Binary",
"units": "",
},
),
]
)

return schema

def testArrowSchema(self):
schema1 = self._makeTestSchema()
self.butler.put(schema1, self.datasetType, dataId={})

schema2 = self.butler.get(self.datasetType, dataId={})
self.assertEqual(schema2, schema1)

@unittest.skipUnless(pd is not None, "Cannot test reading as a dataframe schema without pandas.")
def testWriteArrowSchemaReadAsDataFrameSchema(self):
schema1 = self._makeTestSchema()
self.butler.put(schema1, self.datasetType, dataId={})

df_schema1 = DataFrameSchema.from_arrow(schema1)

df_schema2 = self.butler.get(self.datasetType, dataId={}, storageClass="DataFrameSchema")
self.assertEqual(df_schema2, df_schema1)

@unittest.skipUnless(atable is not None, "Cannot test reading as an astropy schema without astropy.")
def testWriteArrowSchemaReadAsArrowAstropySchema(self):
schema1 = self._makeTestSchema()
self.butler.put(schema1, self.datasetType, dataId={})

ap_schema1 = ArrowAstropySchema.from_arrow(schema1)

ap_schema2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowAstropySchema")
self.assertEqual(ap_schema2, ap_schema1)

# Confirm that the ap_schema2 has the units/description we expect.
for name in schema1.names:
field_metadata = schema1.field(name).metadata
if b"doc" in field_metadata and (doc := field_metadata[b"doc"].decode("UTF-8")) != "":
self.assertEqual(ap_schema2.schema[name].description, doc)
else:
self.assertIsNone(ap_schema2.schema[name].description)

(Codecov warning on tests/test_parquet.py#L1920: added line was not covered by tests.)
if b"units" in field_metadata and (units_str := field_metadata[b"units"].decode("UTF-8")) != "":
self.assertEqual(ap_schema2.schema[name].unit, units.Unit(units_str))

@unittest.skipUnless(np is not None, "Cannot test reading as a numpy schema without numpy.")
def testWriteArrowSchemaReadAsArrowNumpySchema(self):
schema1 = self._makeTestSchema()
self.butler.put(schema1, self.datasetType, dataId={})

np_schema1 = ArrowNumpySchema.from_arrow(schema1)

np_schema2 = self.butler.get(self.datasetType, dataId={}, storageClass="ArrowNumpySchema")
self.assertEqual(np_schema2, np_schema1)


@unittest.skipUnless(pa is not None, "Cannot test InMemoryArrowSchemaDelegate without pyarrow.")
class InMemoryArrowSchemaDelegateTestCase(ParquetFormatterArrowSchemaTestCase):
"""Tests for InMemoryDatastore and ArrowSchema."""

configFile = os.path.join(TESTDIR, "config/basic/butler-inmemory.yaml")


@unittest.skipUnless(np is not None, "Cannot test compute_row_group_size without numpy.")
@unittest.skipUnless(pa is not None, "Cannot test compute_row_group_size without pyarrow.")
class ComputeRowGroupSizeTestCase(unittest.TestCase):