Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-13876: Implement and register ParquetStorage as a butler storage type #90

Merged
merged 3 commits into from
May 1, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
69 changes: 67 additions & 2 deletions python/lsst/daf/persistence/posixStorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,10 +288,12 @@ def read(self, butlerLocation):
raise(RuntimeError("No formatter for location:{}".format(butlerLocation)))

def butlerLocationExists(self, location):
"""Implementaion of PosixStorage.exists for ButlerLocation objects."""
"""Implementation of PosixStorage.exists for ButlerLocation objects.
"""
storageName = location.getStorageName()
if storageName not in ('BoostStorage', 'FitsStorage', 'PafStorage',
'PickleStorage', 'ConfigStorage', 'FitsCatalogStorage'):
'PickleStorage', 'ConfigStorage', 'FitsCatalogStorage',
'ParquetStorage'):
self.log.warn("butlerLocationExists for non-supported storage %s" % location)
return False
for locationString in location.getLocations():
Expand Down Expand Up @@ -610,6 +612,68 @@ def writeFitsStorage(butlerLocation, obj):
persistence.persist(obj, storageList, butlerLocation.getAdditionalData())


def readParquetStorage(butlerLocation):
"""Read from a butlerLocation.

The object returned by this is expected to be a subtype
of `ParquetTable`, which is a thin wrapper to `pyarrow.ParquetFile`
that allows for lazy loading of the data.

Parameters
----------
butlerLocation : ButlerLocation
The location & formatting for the object(s) to be read.

Returns
-------
A list of objects as described by the butler location. One item for
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original purpose of the list of locations was to allow a single dataset to be assembled from multiple physical serializations. If you're not actually expecting/using the list, I would just use the initial entry (and perhaps even warn if there is more than one).

each location in butlerLocation.getLocations()
"""
results = []
additionalData = butlerLocation.getAdditionalData()

for locationString in butlerLocation.getLocations():
locStringWithRoot = os.path.join(butlerLocation.getStorage().root, locationString)
logLoc = LogicalLocation(locStringWithRoot, additionalData)
if not os.path.exists(logLoc.locString()):
raise RuntimeError("No such parquet file: " + logLoc.locString())

pythonType = butlerLocation.getPythonType()
if pythonType is not None:
if isinstance(pythonType, basestring):
pythonType = doImport(pythonType)

filename = logLoc.locString()

# pythonType will be ParquetTable (or perhaps MultilevelParquetTable)
# filename should be the first kwarg, but being explicit here.
results.append(pythonType(filename=filename))

if len(results) > 1:
Log.getLogger("daf.persistence.butler").warning('Not using multiple locations!')

return results[0]


def writeParquetStorage(butlerLocation, obj):
"""Writes pandas dataframe to parquet file

Parameters
----------
butlerLocation : ButlerLocation
The location & formatting for the object(s) to be read.
obj : `lsst.qa.explorer.parquetTable.ParquetTable`
Wrapped DataFrame to write.

"""
additionalData = butlerLocation.getAdditionalData()
locations = butlerLocation.getLocations()
with SafeFilename(os.path.join(butlerLocation.getStorage().root, locations[0])) as locationString:
logLoc = LogicalLocation(locationString, additionalData)
filename = logLoc.locString()
obj.write(filename)


def readPickleStorage(butlerLocation):
"""Read from a butlerLocation.

Expand Down Expand Up @@ -796,6 +860,7 @@ def writeBoostStorage(butlerLocation, obj):


PosixStorage.registerFormatters("FitsStorage", readFitsStorage, writeFitsStorage)
PosixStorage.registerFormatters("ParquetStorage", readParquetStorage, writeParquetStorage)
PosixStorage.registerFormatters("ConfigStorage", readConfigStorage, writeConfigStorage)
PosixStorage.registerFormatters("PickleStorage", readPickleStorage, writePickleStorage)
PosixStorage.registerFormatters("FitsCatalogStorage", readFitsCatalogStorage, writeFitsCatalogStorage)
Expand Down