More fixes to writing chunks
kboone committed May 17, 2019
1 parent d886820 commit bc5bc63
Showing 2 changed files with 21 additions and 31 deletions.
43 changes: 19 additions & 24 deletions avocado/utils.py
@@ -201,23 +201,23 @@ def read_dataframes(path, keys, chunk=None, num_chunks=None,
     return dataframes
 
 
-def write_dataframe(path, dataframe, key, overwrite=False, append=False,
+def write_dataframe(path, dataframe, key, overwrite=False, append=None,
                     timeout=5, chunk=None, num_chunks=None,
                     chunk_column='object_id', index_chunk_column=True):
     """Write a dataframe out to an HDF5 file
 
-    The append functionality is designed so that multiple independent processes
-    running simultaneously can append to the same file. Each process will lock
-    the output file while it is writing, and other processes will repeatedly
-    try to get the lock until they succeed. With this implementation, if the
-    file is locked by other means, the processes will hang endlessly until the
-    lock is released.
+    The append functionality is designed so that multiple independent
+    processes, potentially running simultaneously, can append to the same file.
+    Each process will lock the output file while it is writing, and other
+    processes will repeatedly try to get the lock until they succeed. With this
+    implementation, if the file is locked by other means, the processes will
+    hang endlessly until the lock is released.
 
-    Typically, the append functionality is used when the writing process loaded
-    an input file with multiple chunks. If "chunk" and "num_chunks" are passed,
-    then this writer keeps track of which chunks have been processed and which
-    haven't. `read_dataframes` will then be able to tell if it is reading a
-    complete file or not.
+    Typically, the append functionality is used when the writing process is
+    operating on an input file with multiple chunks. If "chunk" and
+    "num_chunks" are passed, then this writer keeps track of which chunks have
+    been processed and which haven't. `read_dataframes` will then be able to
+    tell if it is reading a complete file or not.
 
     Parameters
     ----------
@@ -233,7 +233,7 @@ def write_dataframe(path, dataframe, key, overwrite=False, append=False,
         not supported if using the append functionality.
     append : bool
         If True, the dataframe will be appended to the file if a file exists at
-        the given path.
+        the given path. Default: True if chunk is set, False otherwise.
     timeout : int
         After failing to write to a file in append mode, wait this amount of
         time in seconds before retrying the write (to allow other processes to
@@ -256,6 +256,12 @@ def write_dataframe(path, dataframe, key, overwrite=False, append=False,
     from tables.exceptions import HDF5ExtError
     import time
 
+    if append is None:
+        if chunk is not None:
+            append = True
+        else:
+            append = False
+
     # Make the containing directory if it doesn't exist yet.
     directory = os.path.dirname(path)
     os.makedirs(directory, exist_ok=True)
@@ -275,7 +281,6 @@ def write_dataframe(path, dataframe, key, overwrite=False, append=False,
         )
 
     # Get a lock on the HDF5 file.
-    num_fails = 0
     while True:
         try:
             store = pd.HDFStore(path, 'a')
@@ -292,16 +297,6 @@ def write_dataframe(path, dataframe, key, overwrite=False, append=False,
                 % (path, timeout)
             )
 
-            num_fails += 1
-
-            if num_fails % 40 == 0:
-                logger.warning(
-                    "Failed %d times to get lock for HDF5 file %s... will "
-                    "keep trying (there are probably a lot of processes "
-                    "trying to write to it)."
-                    % (num_fails, path)
-                )
-
             time.sleep(timeout)
         else:
             # Got the lock successfully.
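The locking behavior described in the docstring above reduces to a small retry loop. Below is a minimal standalone sketch of that pattern; append_with_lock is a hypothetical helper written for illustration, not a function in avocado, and it assumes pandas with the PyTables backend installed:

import os
import time

import pandas as pd
from tables.exceptions import HDF5ExtError


def append_with_lock(path, dataframe, key, timeout=5):
    """Append a dataframe to an HDF5 file, retrying until the lock is free.

    Opening a locked HDF5 file raises HDF5ExtError, so we sleep for
    `timeout` seconds and try again. As the docstring above warns, a lock
    held by other means will make this loop hang until it is released.
    """
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)

    while True:
        try:
            store = pd.HDFStore(path, "a")
        except HDF5ExtError:
            # Another process holds the lock; wait and retry.
            time.sleep(timeout)
        else:
            break

    try:
        # 'append' adds rows to an existing table instead of replacing it.
        store.append(key, dataframe)
    finally:
        store.close()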
9 changes: 2 additions & 7 deletions scripts/avocado_augment
@@ -44,7 +44,7 @@ if __name__ == "__main__":
 
     # Load the reference dataset
     print("Loading reference dataset...")
-    dataset = avocado.Dataset.load(
+    dataset = avocado.load(
         args.reference_dataset,
         chunk=args.chunk,
         num_chunks=args.num_chunks,
@@ -65,11 +65,6 @@ if __name__ == "__main__":
 
     # Save the augmented dataset
     print("Saving the augmented dataset...")
-    if args.chunk is None:
-        augmented_dataset.write()
-    else:
-        # If we are parsing chunks, append the results of each chunk to the
-        # same files.
-        augmented_dataset.write(append=True)
+    augmented_dataset.write()
 
     print("Done!")

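The simplification in avocado_augment works because of the new append=None default in write_dataframe: once a chunk is being written, appending is implied, so the script no longer needs to special-case chunked runs. A standalone sketch of that default resolution follows; resolve_append is a hypothetical helper, written only to make the rule testable:

def resolve_append(append, chunk):
    """Mirror write_dataframe's new default: append when writing a chunk."""
    if append is None:
        append = chunk is not None
    return append


# Chunked writes append by default; a lone write does not.
assert resolve_append(None, chunk=3) is True
assert resolve_append(None, chunk=0) is True    # chunk 0 is still a chunk
assert resolve_append(None, chunk=None) is False
# An explicit value always wins over the default.
assert resolve_append(False, chunk=3) is False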