Add support for writing out dataframes
kboone committed May 14, 2019
1 parent 21fe659 commit 5056601
Showing 2 changed files with 122 additions and 8 deletions.
56 changes: 48 additions & 8 deletions avocado/dataset.py
@@ -5,7 +5,7 @@
from sklearn.model_selection import StratifiedKFold

from .astronomical_object import AstronomicalObject
from .utils import logger, AvocadoException
from .utils import logger, AvocadoException, write_dataframes
from .settings import settings

class Dataset():
@@ -46,13 +46,31 @@ def __init__(self, name, metadata, observations=None, objects=None):
        for object_id, object_observations in \
                observations.groupby('object_id'):
            meta_index = self.metadata.index.get_loc(object_id)

            # Make sure that every object_id only appears once in the
            # metadata. Otherwise we have a corrupted dataset that we can't
            # handle.
            if type(meta_index) != int:
                raise AvocadoException(
                    "Error: found multiple metadata entries for "
                    "object_id=%s! Can't handle." % object_id
                )

            object_metadata = meta_dicts[meta_index]
            object_metadata['object_id'] = object_id
            new_object = AstronomicalObject(object_metadata,
                                            object_observations)

            self.objects[meta_index] = new_object

    @property
    def path(self):
        """Return the path to where this dataset should lie on disk"""
        data_directory = settings['data_directory']
        data_path = os.path.join(data_directory, self.name + '.h5')

        return data_path

    @classmethod
    def load(cls, name, metadata_only=False, chunk=None, num_chunks=None):
        """Load a dataset that has been saved in HDF5 format in the data
@@ -83,7 +101,6 @@ def load(cls, name, metadata_only=False, chunk=None, num_chunks=None):
            The loaded dataset.
        """
        data_directory = settings['data_directory']

        data_path = os.path.join(data_directory, name + '.h5')

        if not os.path.exists(data_path):
@@ -146,10 +163,6 @@ def load(cls, name, metadata_only=False, chunk=None, num_chunks=None):
        # Create a Dataset object
        dataset = cls(name, metadata, observations)

        # Label folds if we have a full dataset with fold information
        if chunk is None and 'category' in dataset.metadata:
            dataset.label_folds()

        return dataset

    @classmethod
@@ -185,9 +198,10 @@ def label_folds(self):
        The number of folds is set by the `num_folds` settings parameter.

        This needs to happen before augmentation to avoid leakage, so
        augmented datasets and similar datasets should already have the folds
        set. If the dataset is an augmented dataset, we ensure that the
        augmentations of the same object stay in the same fold.
        """
        print("TODO: KEEP AUGMENTS IN SAME FOLD!")
        if 'category' not in self.metadata:
            logger.warning("Dataset %s does not have labeled categories! "
                           "Can't separate into folds." % self.name)
@@ -322,3 +336,29 @@ def update_idx_range(*args):
        interact(self.plot_light_curve, index=idx_widget,
                 category=category_widget, show_gp=True, uncertainties=True,
                 verbose=False, subtract_background=True)

    def write(self, **kwargs):
        """Write the dataset out to disk.

        The dataset will be stored in the data directory using the dataset's
        name.

        Parameters
        ----------
        kwargs : kwargs
            Additional arguments to be passed to `utils.write_dataframes`
        """
        # Pull out the observations from every object
        observations = []
        for obj in self.objects:
            object_observations = obj.observations
            object_observations['object_id'] = obj.metadata['object_id']
            observations.append(object_observations)
        observations = pd.concat(observations, ignore_index=True, sort=False)

        write_dataframes(
            self.path,
            [self.metadata, observations],
            ['metadata', 'observations'],
            **kwargs
        )
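
For context, a minimal usage sketch of the new write() method. The dataset
name 'my_dataset' is hypothetical, and this assumes Dataset is importable
from avocado.dataset as defined in this file:

    from avocado.dataset import Dataset

    # Load an existing dataset from <data_directory>/my_dataset.h5 ...
    dataset = Dataset.load('my_dataset')

    # ... and write it back out. Keyword arguments are forwarded to
    # utils.write_dataframes, so overwrite=True replaces an existing file.
    dataset.write(overwrite=True)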
74 changes: 74 additions & 0 deletions avocado/utils.py
@@ -1,10 +1,84 @@
import logging
import os

from . import settings


# Logging
logger = logging.getLogger('avocado')


# Exceptions
class AvocadoException(Exception):
    pass


def write_dataframes(path, dataframes, keys, overwrite=False, append=False,
                     timeout=5):
    """Write a set of dataframes out to an HDF5 file.

    The append functionality is designed so that multiple independent
    processes running simultaneously can append to the same file. Each
    process will lock the output file while it is writing, and other
    processes will repeatedly try to get the lock until they succeed. With
    this implementation, if the file is locked by other means, the processes
    will hang indefinitely until the lock is released.

    Parameters
    ----------
    path : str
        The output file path.
    dataframes : list
        A list of pandas DataFrame objects to write out.
    keys : list
        A list of keys to use in the HDF5 file for each DataFrame.
    overwrite : bool
        If there is an existing file at the given path, it will be deleted if
        overwrite is True. Otherwise an exception will be raised.
    append : bool
        If True, the dataframes will be appended to the file if a file exists
        at the given path.
    timeout : int
        After failing to write to a file in append mode, wait this amount of
        time in seconds before retrying the write (to allow other processes
        to finish).
    """
    from tables.exceptions import HDF5ExtError
    import time

    if os.path.exists(path):
        if overwrite:
            logger.warning("Overwriting %s..." % path)
            os.remove(path)
        elif append:
            # We are appending to the file, so it is fine to have a file
            # there already.
            pass
        else:
            raise AvocadoException(
                "Dataset %s already exists! Can't write." % path
            )

    for dataframe, key in zip(dataframes, keys):
        while True:
            # When appending, we repeatedly try to write to the file so that
            # many processes can write to the same file at the same time.
            try:
                dataframe.to_hdf(path, key, mode='a', append=append,
                                 format='table', data_columns=['object_id'])
            except HDF5ExtError:
                # Failed to write the file; try again if we are in append
                # mode (otherwise this shouldn't happen).
                if not append:
                    raise

                logger.warning(
                    "Error writing to HDF5 file %s... another process is "
                    "probably using it. Retrying in %d seconds."
                    % (path, timeout)
                )
                time.sleep(timeout)
            else:
                break
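
As a hedged illustration of the append workflow this retry loop enables, the
sketch below has several worker processes, each holding one chunk of a
dataset, append their observations to the same HDF5 file. The helper name
write_chunk and the chunking setup are hypothetical:

    from avocado.utils import write_dataframes

    def write_chunk(chunk_observations, path):
        """Append one chunk's observations (a pandas DataFrame) to a shared
        HDF5 file.

        If another process currently holds the HDF5 lock, write_dataframes
        retries every `timeout` seconds until the write succeeds.
        """
        write_dataframes(path, [chunk_observations], ['observations'],
                         append=True, timeout=5)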
