Skip to content

Commit

Permalink
Re-write FileWrapper to implement an event model, wrap files opened v…
Browse files Browse the repository at this point in the history
…ia xopen
  • Loading branch information
Didion, John (NIH/NHGRI) [F] committed Nov 2, 2016
1 parent 1802b94 commit dcacd92
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 121 deletions.
78 changes: 40 additions & 38 deletions tests/testutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,40 +227,6 @@ def test_uncompress_file_compression(self):
with open(path, 'rt') as i:
self.assertEqual(i.read(), 'foo')

def test_compress_on_close(self):
path = self.root.make_file()
fh = compress_on_close(path, compression='gz')
fh.write('foo')
gzfile = fh.close()
self.assertEqual(gzfile, path + '.gz')
self.assertTrue(os.path.exists(gzfile))
with gzip.open(gzfile, 'rt') as i:
self.assertEqual(i.read(), 'foo')

def test_move_on_close(self):
path = self.root.make_file()
dest = self.root.make_file()
fh = move_on_close(path, dest)
fh.write('foo')
fh.close()
self.assertFalse(os.path.exists(path))
self.assertTrue(os.path.exists(dest))
with open(dest, 'rt') as i:
self.assertEqual(i.read(), 'foo')

def test_remove_on_close(self):
path = self.root.make_file()
fh = remove_on_close(path)
fh.write('foo')
fh.close()
self.assertFalse(os.path.exists(path))

path = self.root.make_file()
fh = remove_on_close(open(path, 'wt'))
fh.write('foo')
fh.close()
self.assertFalse(os.path.exists(path))

def test_linecount(self):
path = self.root.make_file()
with open(path, 'wt') as o:
Expand All @@ -274,8 +240,8 @@ def test_linecount_empty(self):
path = self.root.make_empty_files(1)[0]
self.assertEqual(0, linecount(path))

def test_file_closer(self):
f = FileCloser()
def test_file_manager(self):
f = FileManager()
paths = self.root.make_empty_files(3)
for p in paths:
f.add(p, mode='wt')
Expand All @@ -293,9 +259,45 @@ def test_file_closer(self):
for key, fh in f.items():
self.assertTrue(fh.closed)

def test_file_closer_dup_files(self):
f = FileCloser()
def test_file_manager_dup_files(self):
f = FileManager()
path = self.root.make_file()
f.add(path)
with self.assertRaises(ValueError):
f.add(path)

def test_compress_on_close(self):
path = self.root.make_file()
compressor = CompressOnClose(compression='gz')
with FileWrapper(path, 'wt') as wrapper:
wrapper.register_listener('close', compressor)
wrapper.write('foo')
gzfile = path + '.gz'
self.assertEqual(gzfile, compressor.compressed_path)
self.assertTrue(os.path.exists(gzfile))
with gzip.open(gzfile, 'rt') as i:
self.assertEqual(i.read(), 'foo')

def test_move_on_close(self):
path = self.root.make_file()
dest = self.root.make_file()
with FileWrapper(path, 'wt') as wrapper:
wrapper.register_listener('close', MoveOnClose(dest))
wrapper.write('foo')
self.assertFalse(os.path.exists(path))
self.assertTrue(os.path.exists(dest))
with open(dest, 'rt') as i:
self.assertEqual(i.read(), 'foo')

def test_remove_on_close(self):
path = self.root.make_file()
with FileWrapper(path, 'wt') as wrapper:
wrapper.register_listener('close', RemoveOnClose())
wrapper.write('foo')
self.assertFalse(os.path.exists(path))

path = self.root.make_file()
with FileWrapper(open(path, 'wt')) as wrapper:
wrapper.register_listener('close', RemoveOnClose())
wrapper.write('foo')
self.assertFalse(os.path.exists(path))
122 changes: 105 additions & 17 deletions xphyle/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
"""The main xphyle methods -- open_ and xopen.
"""
from collections import defaultdict
from contextlib import contextmanager
import os
import sys
Expand Down Expand Up @@ -60,12 +61,12 @@ def open_(f, mode : 'str' = 'r', **kwargs):
A file-like object
Examples:
with open_('myfile') as infile:
print(next(infile))
with open_('myfile') as infile:
print(next(infile))
fh = open('myfile')
with open_(fh) as infile:
print(next(infile))
fh = open('myfile')
with open_(fh) as infile:
print(next(infile))
"""
if isinstance(f, str):
kwargs['context_wrapper'] = True
Expand All @@ -76,7 +77,7 @@ def open_(f, mode : 'str' = 'r', **kwargs):

def xopen(path : 'str', mode : 'str' = 'r', compression : 'bool|str' = None,
use_system : 'bool' = True, context_wrapper : 'bool' = False,
**kwargs):
**kwargs) -> 'file':
"""
Replacement for the `open` function that automatically handles
compressed files. If `use_system==True` and the file is compressed,
Expand Down Expand Up @@ -149,15 +150,8 @@ def xopen(path : 'str', mode : 'str' = 'r', compression : 'bool|str' = None,
fh = fh.buffer
fmt = get_compression_format(compression)
fh = fmt.open_file_python(fh, mode, **kwargs)
elif context_wrapper:
class StdWrapper(object):
def __init__(self, fh):
self.fh = fh
def __enter__(self):
return self.fh
def __exit__(self, exception_type, exception_value, traceback):
pass
fh = StdWrapper(fh)
if context_wrapper:
fh = StreamWrapper(fh)
return fh

if 'r' in mode:
Expand All @@ -177,6 +171,100 @@ def __exit__(self, exception_type, exception_value, traceback):

if compression:
fmt = get_compression_format(compression)
return fmt.open_file(path, mode, use_system=use_system, **kwargs)
fh = fmt.open_file(path, mode, use_system=use_system, **kwargs)
else:
fh = open(path, mode, **kwargs)

if context_wrapper:
fh = FileWrapper(fh)

return fh

# File wrapper

class FileWrapper(object):
"""Wrapper around a file object that adds two features:
1. An event system by which registered listeners can respond to file events.
Currently, 'close' is the only supported event.
2. Wraps a file iterator in a progress bar (if configured)
Args:
source: Path or file object
mode: File open mode
kwargs: Additional arguments to pass to xopen
"""
__slots__ = ['_file', '_path', '_listeners']

def __init__(self, source, mode='w', **kwargs):
if isinstance(source, str):
path = source
source = xopen(source, mode=mode, **kwargs)
else:
path = source.name
object.__setattr__(self, '_file', source)
object.__setattr__(self, '_path', path)
object.__setattr__(self, '_listeners', defaultdict(lambda: []))

def __getattr__(self, name):
return getattr(self._file, name)

def __iter__(self):
return iter(xphyle.progress.wrap(self._file))

def __enter__(self):
if self.closed:
raise ValueError("I/O operation on closed file.")
return self

def __exit__(self, exception_type, exception_value, traceback):
self.close()

def register_listener(self, event : 'str', listener):
"""Register an event listener.
Args:
event: Event name (currently, only 'close' is recognized)
listener: A listener object, which must be callable with a
single argument -- this file wrapper.
"""
self._listeners[event].append(listener)

def close(self):
self._file.close()
if 'close' in self._listeners:
for listener in self._listeners['close']:
listener(self)

class FileEventListener(object):
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs

def __call__(self, file_wrapper):
self.execute(file_wrapper._path, *self.args, **self.kwargs)

class StreamWrapper(object):
"""Wrapper around a stream (such as stdout) that implements the
ContextManager operations and wraps iterators in a progress bar
(if configured).
Args:
stream: The stream to wrap
"""
__slots__ = ['_stream']

def __init__(self, stream):
object.__setattr__(self, '_stream', stream)

def __getattr__(self, name):
return getattr(self._stream, name)

def __iter__(self):
return iter(xphyle.progress.wrap(self._stream))

def __enter__(self):
return self

return open(path, mode, **kwargs)
def __exit__(self, exception_type, exception_value, traceback):
pass
6 changes: 3 additions & 3 deletions xphyle/formats.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,18 +404,18 @@ def open_file(self, filename : 'str', mode : 'str',

return self.open_file_python(filename, mode, **kwargs)

def open_file_python(self, filename : 'str', mode : 'str', **kwargs):
def open_file_python(self, f, mode : 'str', **kwargs):
"""Open a file using the python library.
Args:
filename: The file to open
f: The file to open -- a path or open file object
mode: The file open mode
kwargs: Additional arguments to pass to the open method
Returns:
A file-like object
"""
return self.lib.open(filename, mode, **kwargs)
return self.lib.open(f, mode, **kwargs)

def compress_file(self, source, dest=None, keep : 'bool' = True,
compresslevel : 'int' = None,
Expand Down
85 changes: 22 additions & 63 deletions xphyle/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,72 +330,26 @@ def uncompress_file(compressed_file, dest_file=None,
# else:
# c.writestr(content, name)

# File wrappers that perform an action when the file is closed.
# Some useful FileEventListeners

class FileWrapper(object):
"""Base class for file wrappers.
"""
__slots__ = ['_file']

def __init__(self, source, mode='w', **kwargs):
if isinstance(source, str):
path = source
source = xopen(source, mode=mode, **kwargs)
else:
path = source.name
object.__setattr__(self, '_file', source)
object.__setattr__(self, '_path', path)

def __getattr__(self, name):
return getattr(self._file, name)
class CompressOnClose(FileEventListener):
"""Compress a file."""
def execute(self, path, *args, **kwargs):
self.compressed_path = compress_file(path, *args, **kwargs)

def compress_on_close(source, *args, **kwargs) -> 'FileWrapper':
"""Compress the file when it is closed.
Args:
args, kwargs: arguments passed through to ``compress_file``
Returns:
File-like object
"""
class FileCompressor(FileWrapper):
def close(self):
self._file.close()
return compress_file(self._path, *args, **kwargs)
return FileCompressor(source)
class MoveOnClose(FileEventListener):
"""Move a file."""
def execute(self, path, dest):
shutil.move(path, dest)

def move_on_close(source, dest : 'str') -> 'FileWrapper':
"""Move the file to a new location when it is closed.
Args:
source: Path or file object
dest: Destination path
Returns:
File-like object
"""
class FileMover(FileWrapper):
def close(self):
self._file.close()
shutil.move(self._path, dest)
return FileMover(source)
class RemoveOnClose(FileEventListener):
"""Remove a file."""
def execute(self, path):
os.remove(path)

def remove_on_close(source) -> 'FileWrapper':
"""Delete the file when it is closed.
Args:
source: Path or file object
Returns:
File-like object
"""
class FileDeleter(FileWrapper):
def close(self):
self._file.close()
os.remove(self._path)
return FileDeleter(source)
# Misc

class FileCloser(object):
class FileManager(object):
"""Dict-like container for files. Has a ``close`` method that closes
all open files.
"""
Expand Down Expand Up @@ -446,6 +400,8 @@ def add(self, f, key : 'str' = None, **kwargs):
return f

def items(self):
"""Returns a list of all (key, file) pairs.
"""
return self.files.items()

def close(self):
Expand All @@ -454,8 +410,6 @@ def close(self):
for fh in self.files.values():
fh.close()

# Misc

def linecount(f, linesep : 'str' = None, buf_size : 'int' = 1024 * 1024) -> 'int':
"""Fastest pythonic way to count the lines in a file.
Expand All @@ -482,4 +436,9 @@ def linecount(f, linesep : 'str' = None, buf_size : 'int' = 1024 * 1024) -> 'int
return lines

def is_iterable(x):
"""Returns True if ``x`` is a non-string Iterable.
Args:
x: The object to test
"""
return isinstance(x, Iterable) and not isinstance(x, str)

0 comments on commit dcacd92

Please sign in to comment.