Skip to content

Commit

Permalink
Merge branch 'multifile-io'
Browse files Browse the repository at this point in the history
  • Loading branch information
jdidion committed Nov 11, 2016
2 parents 6d81943 + d9c50b5 commit b214ba4
Show file tree
Hide file tree
Showing 5 changed files with 622 additions and 47 deletions.
1 change: 1 addition & 0 deletions tests/testprogress.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def __call__(self, itr, desc, size):
class ProgressTests(TestCase):
def setUp(self):
self.root = TempDir()
xphyle.configure(False, False)

def tearDown(self):
self.root.close()
Expand Down
220 changes: 204 additions & 16 deletions tests/testutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@
from . import *
from collections import OrderedDict
import gzip
import bz2
import os
import xphyle
from xphyle.paths import TempDir
from xphyle.utils import *

class UtilsTests(TestCase):
def setUp(self):
self.root = TempDir()
self.system_args = sys.argv
xphyle.configure(False, False)

def tearDown(self):
self.root.close()
Expand Down Expand Up @@ -235,6 +239,16 @@ def test_uncompress_file_compression(self):
with open(path, 'rt') as i:
self.assertEqual(i.read(), 'foo')

def test_transcode(self):
path = self.root.make_file()
gzfile = path + '.gz'
with gzip.open(gzfile, 'wt') as o:
o.write('foo')
bzfile = path + '.bz2'
transcode_file(gzfile, bzfile)
with bz2.open(bzfile, 'rt') as i:
self.assertEqual('foo', i.read())

def test_linecount(self):
self.assertEqual(-1, linecount('foobar', errors=False))
path = self.root.make_file()
Expand All @@ -254,22 +268,34 @@ def test_linecount_empty(self):
self.assertEqual(0, linecount(path))

def test_file_manager(self):
f = FileManager()
paths = self.root.make_empty_files(3)
for p in paths:
f.add(p, mode='wt')
self.assertTrue(p in f)
self.assertFalse(f[p].closed)
path2 = open(self.root.make_file(), 'wt')
f.add(path2)
path3 = self.root.make_file()
f['path3'] = path3
self.assertEqual(len(f), 5)
for key, fh in f.items():
self.assertFalse(fh.closed)
f.close()
self.assertEqual(len(f), 5)
for key, fh in f.items():
paths12 = dict(
path1=self.root.make_empty_files(1)[0],
path2=self.root.make_empty_files(1)[0])
with FileManager(paths12, mode='wt') as f:
paths34 = self.root.make_empty_files(2)
for p in paths34:
f.add(p, mode='wt')
self.assertTrue(p in f)
self.assertFalse(f[p].closed)
path5 = self.root.make_file()
path5_fh = open(path5, 'wt')
f.add(path5_fh)
path6 = self.root.make_file()
f['path6'] = path6
self.assertEqual(path6, f.get_path('path6'))
all_paths = list(paths12.values()) + paths34 + [path5, path6]
self.assertListEqual(all_paths, f.paths)
self.assertEqual(len(f), 6)
for key, fh in f.iter_files():
self.assertFalse(fh.closed)
self.assertIsNotNone(f['path2'])
self.assertIsNotNone(f.get('path2'))
self.assertEqual(f['path6'], f.get(5))
with self.assertRaises(KeyError):
f['foo']
self.assertIsNone(f.get('foo'))
self.assertEqual(len(f), 6)
for key, fh in f.iter_files():
self.assertTrue(fh.closed)

def test_file_manager_dup_files(self):
Expand Down Expand Up @@ -314,3 +340,165 @@ def test_remove_on_close(self):
wrapper.register_listener('close', RemoveOnClose())
wrapper.write('foo')
self.assertFalse(os.path.exists(path))

def test_file_input(self):
file1 = self.root.make_file(suffix='.gz')
with gzip.open(file1, 'wt') as o:
o.write('foo\nbar\n')
with self.assertRaises(ValueError):
fileinput(file1, mode='z')
with fileinput(file1) as i:
lines = list(i)
self.assertListEqual(['foo\n','bar\n'], lines)
file2 = self.root.make_file(suffix='.gz')
with gzip.open(file2, 'wt') as o:
o.write('baz\n')
with fileinput((file1, file2)) as i:
lines = list(i)
self.assertListEqual(['foo\n','bar\n', 'baz\n'], lines)
with fileinput([('key1',file1), ('key2', file2)]) as i:
self.assertEqual(i.filekey, None)
self.assertEqual(i.filename, None)
self.assertEqual(i.lineno, 0)
self.assertEqual(i.filelineno, 0)

self.assertEqual(next(i), 'foo\n')
self.assertEqual(i.filekey, 'key1')
self.assertEqual(i.filename, file1)
self.assertEqual(i.lineno, 1)
self.assertEqual(i.filelineno, 1)

self.assertEqual(next(i), 'bar\n')
self.assertEqual(i.filekey, 'key1')
self.assertEqual(i.filename, file1)
self.assertEqual(i.lineno, 2)
self.assertEqual(i.filelineno, 2)

self.assertEqual(next(i), 'baz\n')
self.assertEqual(i.filekey, 'key2')
self.assertEqual(i.filename, file2)
self.assertEqual(i.lineno, 3)
self.assertEqual(i.filelineno, 1)

def test_pending(self):
file1 = self.root.make_file(suffix='.gz')
with gzip.open(file1, 'wt') as o:
o.write('foo\nbar\n')
f = FileInput()
self.assertTrue(f._pending)
f.add(file1)
l = list(f)
self.assertTrue(f.finished)
self.assertFalse(f._pending)
file2 = self.root.make_file(suffix='.gz')
with gzip.open(file2, 'wt') as o:
o.write('baz\n')
f.add(file2)
self.assertTrue(f._pending)
self.assertFalse(f.finished)
self.assertEqual('baz\n', f.readline())
self.assertEqual('', f.readline())
with self.assertRaises(StopIteration):
next(f)
self.assertTrue(f.finished)
self.assertFalse(f._pending)

def test_file_input_stdin(self):
path = self.root.make_file()
with open(path, 'wt') as o:
o.write('foo\nbar\n')
sys.argv = [self.system_args[0], path]
self.assertEqual(
['foo\n','bar\n'],
list(fileinput()))
sys.argv = []
with intercept_stdin(b'foo\nbar\n', is_bytes=True) as i:
self.assertEqual(
[b'foo\n',b'bar\n'],
list(fileinput()))

def test_tee_file_output(self):
file1 = self.root.make_file(suffix='.gz')
file2 = self.root.make_file()
with self.assertRaises(ValueError):
fileoutput(file1, mode='z')
with fileoutput((file1,file2), file_output_type=TeeFileOutput) as o:
o.writelines(('foo','bar','baz'))
with gzip.open(file1, 'rt') as i:
self.assertEqual('foo\nbar\nbaz\n', i.read())
with open(file2, 'rt') as i:
self.assertEqual('foo\nbar\nbaz\n', i.read())

def test_tee_file_output_binary(self):
file1 = self.root.make_file(suffix='.gz')
file2 = self.root.make_file()
with fileoutput((file1,file2), mode='b',
file_output_type=TeeFileOutput) as o:
o.writelines((b'foo',b'bar',b'baz'))
with gzip.open(file1, 'rb') as i:
self.assertEqual(b'foo\nbar\nbaz\n', i.read())
with open(file2, 'rb') as i:
self.assertEqual(b'foo\nbar\nbaz\n', i.read())

with fileoutput((file1,file2), mode='t',
file_output_type=TeeFileOutput) as o:
o.writelines((b'foo',b'bar',b'baz'))
with gzip.open(file1, 'rt') as i:
self.assertEqual('foo\nbar\nbaz\n', i.read())
with open(file2, 'rt') as i:
self.assertEqual('foo\nbar\nbaz\n', i.read())

with fileoutput((file1,file2), mode='b',
file_output_type=TeeFileOutput) as o:
o.writelines(('foo',b'bar',b'baz'))
with gzip.open(file1, 'rb') as i:
self.assertEqual(b'foo\nbar\nbaz\n', i.read())
with open(file2, 'rb') as i:
self.assertEqual(b'foo\nbar\nbaz\n', i.read())

def test_tee_file_output_no_newline(self):
file1 = self.root.make_file(suffix='.gz')
file2 = self.root.make_file()
with fileoutput((file1,file2), file_output_type=TeeFileOutput) as o:
o.writeline('foo', False)
o.writeline(newline=True)
o.writeline('bar', True)
self.assertEqual(3, o.num_lines)
with gzip.open(file1, 'rb') as i:
self.assertEqual(b'foo\nbar\n', i.read())
with open(file2, 'rb') as i:
self.assertEqual(b'foo\nbar\n', i.read())

def test_file_output_stdout(self):
path = self.root.make_file()
sys.argv = [self.system_args, path]
with fileoutput() as o:
o.writelines(('foo','bar','baz'))
with open(path, 'rt') as i:
self.assertEqual('foo\nbar\nbaz\n', i.read())
sys.argv = []
outbuf = BytesIO()
with intercept_stdout(TextIOWrapper(outbuf)):
with fileoutput(mode='b') as o:
o.writelines((b'foo',b'bar',b'baz'))
self.assertEqual(b'foo\nbar\nbaz\n', outbuf.getvalue())

def test_cycle_file_output(self):
file1 = self.root.make_file(suffix='.gz')
file2 = self.root.make_file()
with fileoutput((file1,file2), file_output_type=CycleFileOutput) as o:
o.writelines(('foo','bar','baz'))
with gzip.open(file1, 'rt') as i:
self.assertEqual('foo\nbaz\n', i.read())
with open(file2, 'rt') as i:
self.assertEqual('bar\n', i.read())

def test_ncycle_file_output(self):
file1 = self.root.make_file(suffix='.gz')
file2 = self.root.make_file()
with fileoutput((file1,file2), n=2, file_output_type=NCycleFileOutput) as o:
o.writelines(('foo','bar','baz','blorf','bing'))
with gzip.open(file1, 'rt') as i:
self.assertEqual('foo\nbar\nbing\n', i.read())
with open(file2, 'rt') as i:
self.assertEqual('baz\nblorf\n', i.read())
5 changes: 4 additions & 1 deletion xphyle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,17 +184,20 @@ def xopen(path : 'str', mode : 'str' = 'r', compression : 'bool|str' = None,
fh = sys.stderr
else:
fh = sys.stdin if 'r' in mode else sys.stdout
wrapped = True
if compression is not False:
fh = fh.buffer
wrapped = False
if 'r' in mode and (validate or guess_format):
if not hasattr(fh, 'peek'):
fh = io.BufferedReader(fh)
wrapped = True
guess = guess_format_from_buffer(fh)
else:
validate = False
if not (compression or guess):
is_stream = True
if 'b' in mode:
if 'b' in mode and wrapped:
fh = fh.buffer

else:
Expand Down
20 changes: 17 additions & 3 deletions xphyle/formats.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def seekable(self): return False
def flush(self): pass

def close(self):
self.close = True
self.closed = True
retcode = self.process.poll()
if retcode is None:
# still running
Expand Down Expand Up @@ -286,7 +286,10 @@ def guess_format_from_header_bytes(b : 'bytes') -> 'str':
return None

def get_format_for_mime_type(mime_type : 'str') -> 'str':
pass
"""Returns the file format associated with a MIME type, or None if no
format is associated with the mime type.
"""
return mime_types.get(mime_type, None)

class CompressionFormat(FileFormat):
"""Base class for classes that provide access to system-level and
Expand Down Expand Up @@ -476,6 +479,10 @@ def compress_file(self, source, dest=None, keep : 'bool' = True,
source_path = source
else:
source_path = source.name
try: # pragma: no cover
source.fileno()
except:
use_system = False

in_place = False
if dest is None:
Expand Down Expand Up @@ -556,7 +563,14 @@ def uncompress_file(self, source, dest=None, keep : 'bool' = True,
dest_is_path = isinstance(dest, str)
if dest_is_path:
check_writeable_file(dest)
dest_file = open(dest, 'wb') if dest_is_path else dest
if dest_is_path:
dest_file = open(dest, 'wb')
else:
dest_file = dest
try: # pragma: no cover
dest_file.fileno()
except:
use_system = False

try:
if use_system and self.can_use_system_uncompression:
Expand Down

0 comments on commit b214ba4

Please sign in to comment.