Skip to content

Commit

Permalink
Merge branch 'validate-format'
Browse files Browse the repository at this point in the history
  • Loading branch information
Didion, John (NIH/NHGRI) [F] committed Nov 7, 2016
2 parents 2bd41ec + 3bec901 commit 59fd450
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 45 deletions.
6 changes: 3 additions & 3 deletions tests/testformats.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ def read_file(fmt, path, ext, use_system, mode='rt'):

class CompressionTests(TestCase):
def test_guess_format(self):
self.assertEqual('gz', guess_compression_format('gz'))
self.assertEqual('gz', guess_compression_format('.gz'))
self.assertEqual('gz', guess_compression_format('foo.gz'))
self.assertEqual('gzip', guess_compression_format('gz'))
self.assertEqual('gzip', guess_compression_format('.gz'))
self.assertEqual('gzip', guess_compression_format('foo.gz'))

def test_invalid_format(self):
self.assertIsNone(guess_compression_format('foo'))
Expand Down
2 changes: 1 addition & 1 deletion tests/testprogress.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def test_iter_stream(self):
progress = MockProgress()
xphyle.configure(progress)
with intercept_stdin('foo\nbar\nbaz'):
with xopen(STDIN, 'rt', context_wrapper=True) as o:
with xopen(STDIN, 'rt', context_wrapper=True, compression=False) as o:
lines = list(o)
self.assertListEqual(['foo\n','bar\n','baz\n'], lines)
self.assertEquals(3, progress.count)
23 changes: 15 additions & 8 deletions tests/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def test_guess_format(self):
path = self.root.make_file(suffix='.gz')
with gzip.open(path, 'wt') as o:
o.write('foo')
self.assertEqual(guess_file_format(path), 'gz')
self.assertEqual(guess_file_format(path), 'gzip')
path = self.root.make_file()
with gzip.open(path, 'wt') as o:
o.write('foo')
Expand Down Expand Up @@ -84,34 +84,34 @@ def test_xopen_invalid(self):
def test_xopen_std(self):
# Try stdin
with intercept_stdin('foo\n'):
with xopen(STDIN, 'r', context_wrapper=True) as i:
with xopen(STDIN, 'r', context_wrapper=True, compression=False) as i:
content = i.read()
self.assertEqual(content, 'foo\n')
# Try stdout
i = StringIO()
with intercept_stdout(i):
with xopen(STDOUT, 'w', context_wrapper=True) as o:
with xopen(STDOUT, 'w', context_wrapper=True, compression=False) as o:
o.write('foo')
self.assertEqual(i.getvalue(), 'foo')
# Try stderr
i = StringIO()
with intercept_stderr(i):
with xopen(STDERR, 'w', context_wrapper=True) as o:
with xopen(STDERR, 'w', context_wrapper=True, compression=False) as o:
o.write('foo')
self.assertEqual(i.getvalue(), 'foo')

# Try binary
i = BytesIO()
with intercept_stdout(TextIOWrapper(i)):
with xopen(STDOUT, 'wb', context_wrapper=True) as o:
with xopen(STDOUT, 'wb', context_wrapper=True, compression=False) as o:
o.write(b'foo')
self.assertEqual(i.getvalue(), b'foo')

# Try compressed
i = BytesIO()
with intercept_stdout(TextIOWrapper(i)):
with xopen(STDOUT, 'wt', compression='gz') as o:
self.assertEqual(o.compression, 'gz')
self.assertEqual(o.compression, 'gzip')
o.write('foo')
self.assertEqual(gzip.decompress(i.getvalue()), b'foo')

Expand All @@ -127,15 +127,22 @@ def test_xopen_file(self):
xopen('foobar', 'r')
path = self.root.make_file(suffix='.gz')
with xopen(path, 'w', compression=True) as o:
self.assertEqual(o.compression, 'gz')
self.assertEqual(o.compression, 'gzip')
o.write('foo')
with gzip.open(path, 'rt') as i:
self.assertEqual(i.read(), 'foo')
with self.assertRaises(ValueError):
with xopen(path, 'rt', compression='bz2', validate=True):
pass

@skipIf(no_internet(), "No internet connection")
def test_xopen_url(self):
badurl = 'http://blorf.blurp'
with self.assertRaises(ValueError):
xopen(badurl)
url = 'https://github.com/jdidion/xphyle/blob/master/tests/foo.gz?raw=True'
with self.assertRaises(ValueError):
xopen(url, 'w')
with open_(url, 'rt') as i:
self.assertEqual(i.read(), 'foo\n')
self.assertEqual('gzip', i.compression)
self.assertEqual('foo\n', i.read())
File renamed without changes.
81 changes: 50 additions & 31 deletions xphyle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def open_(f, mode : 'str' = 'r', errors : 'bool' = True, **kwargs):

def xopen(path : 'str', mode : 'str' = 'r', compression : 'bool|str' = None,
use_system : 'bool' = True, context_wrapper : 'bool' = True,
**kwargs) -> 'file':
validate : 'bool' = True, **kwargs) -> 'file':
"""
Replacement for the `open` function that automatically handles
compressed files. If `use_system==True` and the file is compressed,
Expand All @@ -117,22 +117,24 @@ def xopen(path : 'str', mode : 'str' = 'r', compression : 'bool|str' = None,
Args:
path: A relative or absolute path. Must be a string. If
you have a situation you want to automatically handle either
a path or a file object, use the ``open_`` wrapper instead.
you have a situation you want to automatically handle either
a path or a file object, use the ``open_`` wrapper instead.
mode: Some combination of the open mode ('r', 'w', 'a', or 'x')
and the format ('b' or 't'). If the latter is not given, 't'
is used by default.
and the format ('b' or 't'). If the latter is not given, 't'
is used by default.
compression: If None or True, compression type (if any) will be
determined automatically. If False, no attempt will be made to
determine compression type. Otherwise this must specify the
compression type (e.g. 'gz'). See `xphyle.compression` for
details. Note that compression will *not* be guessed for
'-' (stdin).
determined automatically. If False, no attempt will be made to
determine compression type. Otherwise this must specify the
compression type (e.g. 'gz'). See `xphyle.compression` for
details. Note that compression will *not* be guessed for
'-' (stdin).
use_system: Whether to attempt to use system-level compression
programs.
programs.
context_wrapper: If True and ``path`` == '-' or '_', returns
a ContextManager (i.e. usable with ``with``) that wraps the
system stream and is no-op on close.
a ContextManager (i.e. usable with ``with``) that wraps the
system stream and is no-op on close.
validate: Whether to validate that a file is actually of the format
specified by ``compression``.
kwargs: Additional keyword arguments to pass to ``open``.
Returns:
Expand All @@ -143,6 +145,8 @@ def xopen(path : 'str', mode : 'str' = 'r', compression : 'bool|str' = None,
* ``compression==True`` and compression format cannot be
determined
* the specified compression format is invalid
* ``validate==True`` and the specified compression format is not the
actual format of the file
* the path or mode are invalid
"""
if not isinstance(path, str):
Expand All @@ -161,6 +165,10 @@ def xopen(path : 'str', mode : 'str' = 'r', compression : 'bool|str' = None,

# The file handle we will open
fh = None
# Whether to try and guess file format
guess_format = compression in (None, True)
# Whether to validate that the actual compression format matches the expected one
validate = validate and compression and not guess_format
# Guessed compression type, if compression in (None, True)
guess = None
# Whether the file object is a stream (e.g. stdout or URL)
Expand All @@ -176,12 +184,12 @@ def xopen(path : 'str', mode : 'str' = 'r', compression : 'bool|str' = None,
fh = sys.stderr
else:
fh = sys.stdin if 'r' in mode else sys.stdout
if compression:
if compression is not False:
fh = fh.buffer
if compression in (None, True) and 'r' in mode:
if not hasattr(fh, 'peek'):
fh = io.BufferedReader(fh)
guess = guess_format_from_buffer(fh)
if 'r' in mode and (validate or guess_format):
if not hasattr(fh, 'peek'):
fh = io.BufferedReader(fh)
guess = guess_format_from_buffer(fh)
if not (compression or guess):
is_stream = True
if 'b' in mode:
Expand All @@ -195,41 +203,52 @@ def xopen(path : 'str', mode : 'str' = 'r', compression : 'bool|str' = None,
raise ValueError("URLs can only be opened in read mode")

fh = open_url(path)
if not fh: # pragma: no cover
if not fh:
raise ValueError("Could not open URL {}".format(path))

is_stream = True
name = get_url_file_name(fh, url_parts)
use_system = False

# Get compression format if not specified
if compression in (None, True):
# Check if the MIME type indicates that the file is compressed
mime = get_url_mime_type(fh)
if mime:
guess = get_format_for_mime_type(mime)
# Try to guess from the file name
if not guess and name:
guess = guess_file_format(name)
if validate or guess_format:
guess = guess_format_from_buffer(fh)
# The following code is never used, unless there is some
# scenario in which the file type cannot be guessed from
# the header bytes.
# if guess is None and guess_format:
# # Check if the MIME type indicates that the file is
# # compressed
# mime = get_url_mime_type(fh)
# if mime:
# guess = get_format_for_mime_type(mime)
# # Try to guess from the file name
# if not guess and name:
# guess = guess_file_format(name)

# Local file handling
else:
if 'r' in mode:
path = check_readable_file(path)
if validate or guess_format:
guess = guess_format_from_file_header(path)
else:
path = check_writeable_file(path)

if compression in (None, True):
guess = guess_file_format(path)
if guess_format:
guess = guess_compression_format(path)

if guess:
if validate and guess != compression:
raise ValueError("Acutal compression format {} does not match expected "
"format {}".format(guess, compression))
elif guess:
compression = guess
elif compression is True:
raise ValueError(
"Could not guess compression format from {}".format(path))

if compression:
fmt = get_compression_format(compression)
compression = fmt.name
fh = fmt.open_file(fh or path, mode, use_system=use_system, **kwargs)
elif not fh:
fh = open(path, mode, **kwargs)
Expand Down
4 changes: 2 additions & 2 deletions xphyle/formats.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,12 +232,12 @@ def guess_compression_format(name : 'str') -> 'str':
"""Guess the compression format by name or file extension.
"""
if name in compression_formats:
return name
return compression_formats[name].name
i = name.rfind(os.extsep)
if i >= 0:
ext = name[(i+1):]
if ext in compression_formats:
return ext
return compression_formats[ext].name
return None

def guess_format_from_file_header(path : 'str') -> 'str':
Expand Down

0 comments on commit 59fd450

Please sign in to comment.