Skip to content

Commit

Permalink
Added xphyle.utils.uncompressed_size()
Browse files Browse the repository at this point in the history
  • Loading branch information
jdidion committed Nov 19, 2017
1 parent c54a921 commit 465f5e2
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 12 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ xphyle.egg-info/
docs/_build/
.vscode*
.mypy_cache/
.idea/
6 changes: 4 additions & 2 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
v3.1.2 (dev)
-------------
v3.1.2 (2017.11.18)
-------------------

* Added `xphyle.utils.uncompressed_size()`.

v3.1.1 (2017.10.13)
-------------------
Expand Down
13 changes: 12 additions & 1 deletion tests/test_formats.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def test_gzip(self):
self.assertEqual(
gz.get_command('d', 'foo.gz'),
[gz_path, '-d', '-c', 'foo.gz'])

@skipIf(no_pigz, "'pigz' not available")
def test_pigz(self):
THREADS.update(2)
Expand Down Expand Up @@ -454,3 +454,14 @@ def test_compress_iterable(self):
decompressed = fmt.decompress_string(compressed)
self.assertListEqual(strings, decompressed.split('|'))

class UncompressedSizeTests(TestCase):
    """Tests for the ``uncompressed_size()`` method of compression formats."""

    @skipIf(gz_path is None, "'gzip' not available")
    def test_get_uncompressed_size(self):
        """Compress 1000 characters of random text and check that each
        format reports an uncompressed size of 1000 bytes.
        """
        # NOTE(review): the skip condition only checks for gzip, but '.xz'
        # is exercised as well -- confirm xz is always available here.
        for ext in ('.gz', '.xz'):
            with self.subTest(ext=ext):
                with TempDir() as temp:
                    raw = temp.make_file(contents=random_text(1000))
                    compressed = temp.make_file(suffix=ext)
                    fmt = get_format(ext)
                    fmt.compress_file(raw, compressed)
                    # assertEquals is a deprecated alias (removed in
                    # Python 3.12); assertEqual is the supported spelling.
                    self.assertEqual(
                        1000, fmt.uncompressed_size(compressed))
10 changes: 9 additions & 1 deletion tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,15 @@ def test_transcode(self):
transcode_file(gzfile, bzfile)
with bz2.open(bzfile, 'rt') as i:
self.assertEqual('foo', i.read())


def test_uncompressed_size(self):
    """Compress 1000 characters of random text with each format and check
    that ``uncompressed_size()`` reports 1000 bytes.
    """
    for ext in ('.gz', '.xz'):
        # Pass the extension as a keyword so it is reported as a named
        # sub-test parameter.
        with self.subTest(ext=ext):
            raw = self.root.make_file(contents=random_text(1000))
            compressed = self.root.make_file(suffix=ext)
            compress_file(raw, compressed)
            # assertEquals is a deprecated alias (removed in Python 3.12).
            self.assertEqual(1000, uncompressed_size(compressed))

def test_exec_process(self):
inp = self.root.make_file(suffix='.gz')
with gzip.open(inp, 'wt') as o:
Expand Down
81 changes: 74 additions & 7 deletions xphyle/formats.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from importlib import import_module
import io
import os
from subprocess import Popen, PIPE, CalledProcessError
import re
from subprocess import Popen, PIPE, CalledProcessError, check_output

from xphyle.paths import (
STDIN, EXECUTABLE_CACHE, check_readable_file, check_writable_file,
Expand Down Expand Up @@ -284,7 +285,7 @@ def decompress_path(self) -> PathLike:
"""The path of the decompression program.
"""
raise NotImplementedError()

@property
def compress_name(self) -> str:
"""The name of the compression program.
Expand Down Expand Up @@ -351,7 +352,7 @@ def can_use_system_decompression(self) -> bool:
resolves to an existing, executable file.
"""
return self.decompress_path is not None

def compress(self, raw_bytes: bytes, **kwargs) -> bytes:
"""Compress bytes.
Expand Down Expand Up @@ -464,7 +465,7 @@ def handle_command_return(
cpe = CalledProcessError(returncode, " ".join(cmd))
cpe.stderr = stderr
raise IOError from cpe

def open_file(
self, path: str, mode: ModeArg, use_system: bool = True,
**kwargs) -> FileLike:
Expand Down Expand Up @@ -703,6 +704,48 @@ def decompress_file(

return dest_name

def get_list_command(self, path: str) -> Union[List[str], None]:
    """Get the command to list contents of a compressed file.

    This implementation always returns None, i.e. it reports that no list
    command is available.

    Args:
        path: Path to the compressed file.

    Returns:
        List of command arguments, or None if the uncompressed size
        cannot be determined (without actually decompressing the file).
    """
    # The annotation includes None because that is what is returned here.
    return None

def parse_file_listing(self, listing: str) -> Tuple[int, int, float]:
    """Turn the raw output of the list command into size information.

    This implementation is a placeholder that always raises.

    Args:
        listing: Text produced by executing the list command.

    Returns:
        A three-element tuple: compressed size in bytes, uncompressed size
        in bytes, and compression ratio.

    Raises:
        NotImplementedError: Always, in this implementation.
    """
    raise NotImplementedError()

def uncompressed_size(self, path: str) -> Union[int, None]:
    """Report how large a compressed file would be once decompressed.

    The size is obtained by running the command returned by
    ``get_list_command()`` and handing its text output to
    ``parse_file_listing()``.

    Args:
        path: Path to the compressed file.

    Returns:
        The uncompressed size in bytes, or None when no list command is
        available (``get_list_command()`` returned None).
    """
    cmd = self.get_list_command(path)
    if cmd is not None:
        # universal_newlines=True makes check_output return text, not bytes.
        output = check_output(cmd, universal_newlines=True)
        _compressed, size, _ratio = self.parse_file_listing(output)
        return size
    return None

class SingleExeCompressionFormat(CompressionFormat): # pylint: disable=abstract-method
"""Base class form ``CompressionFormat``s that use the same executable for
compressing and decompressing.
Expand Down Expand Up @@ -766,7 +809,6 @@ def open_file_python(
compressed_file = io.BufferedWriter(compressed_file)
return compressed_file


class Gzip(GzipBase):
"""Implementation of CompressionFormat for gzip files.
"""
Expand Down Expand Up @@ -827,7 +869,7 @@ def get_command(
if src != STDIN:
cmd.append(src)
return cmd

def handle_command_return(
self, returncode: int, cmd: List[str], stderr: bytes = None
) -> None:
Expand All @@ -838,6 +880,19 @@ def handle_command_return(
returncode = 1
super().handle_command_return(returncode, cmd, stderr)

def get_list_command(self, path: str) -> List[str]:
    """Build the argument list that runs this format's executable with
    the ``-l`` option on a compressed file.

    Args:
        path: Path to the compressed file.

    Returns:
        The command as a list: executable path, ``-l``, then ``path``.
    """
    executable = str(self.executable_path)
    return [executable, '-l', path]

def parse_file_listing(self, listing: str) -> Tuple[int, int, float]:
    """Parse the output of the ``-l`` list command.

    The second line of the listing (the first is the column header) is
    split on whitespace. The ratio column is located by its trailing
    ``%``; the two fields immediately before it are the compressed and
    uncompressed sizes. Locating the columns this way works both for the
    plain four-column layout and for layouts that put extra columns
    (e.g. method, CRC, date, time) in front of the sizes.

    Args:
        listing: The output of executing the list command.

    Returns:
        A tuple (<compressed size in bytes>, <uncompressed size in bytes>,
        <compression ratio>), where the ratio is the listed percentage
        divided by 100.

    Raises:
        ValueError: If the listing has no data line, or no ratio column
            preceded by two size fields.
    """
    rows = listing.splitlines(keepends=False)
    if len(rows) < 2:
        raise ValueError(
            "Unrecognized file listing: {!r}".format(listing))
    # str.split() with no argument also discards leading whitespace, so
    # no empty first field has to be dropped.
    fields = rows[1].split()
    for idx, field in enumerate(fields):
        # The ratio must have two size fields before it.
        if idx >= 2 and field.endswith('%'):
            break
    else:
        raise ValueError(
            "Unrecognized file listing: {!r}".format(listing))
    ratio = float(fields[idx][:-1]) / 100
    return (int(fields[idx - 2]), int(fields[idx - 1]), ratio)


class BGzip(GzipBase):
"""bgzip is block gzip. bgzip files are compatible with gzip. Typically,
this format is only used when specifically requested, or when a bgzip
Expand Down Expand Up @@ -1012,7 +1067,19 @@ def get_command(
if src != STDIN:
cmd.append(src)
return cmd


def get_list_command(self, path: str) -> List[str]:
    """Build the argument list that runs this format's executable with
    the ``-lv`` option on a compressed file.

    Args:
        path: Path to the compressed file.

    Returns:
        The command as a list: executable path, ``-lv``, then ``path``.
    """
    executable = str(self.executable_path)
    return [executable, '-lv', path]

def parse_file_listing(self, listing: str) -> Tuple[int, int, float]:
    """Parse the output of the ``-lv`` list command.

    The sizes are read from the fourth and fifth lines of the listing and
    the ratio from the sixth. Each size line ends in either ``<n> B`` or
    ``<value> <unit> (<n> B)``; in both cases the exact byte count
    ``<n>`` is used.

    Args:
        listing: The output of executing the list command.

    Returns:
        A tuple (<compressed size in bytes>, <uncompressed size in bytes>,
        <compression ratio>).

    Raises:
        ValueError: If a size line does not end in a byte count.
    """
    rows = listing.splitlines(keepends=False)
    sizes = []
    for row in rows[3:5]:
        # Capture the whole number in front of the trailing " B",
        # including any thousands separators (e.g. "1,049,088").
        match = re.search(r'(\d[^()]*?) B\)?\s*$', row)
        if match is None:
            raise ValueError(
                "Unrecognized size in file listing: {!r}".format(row))
        # Drop the separators so that "1,000" parses as 1000, not 0.
        sizes.append(int(re.sub(r'\D', '', match.group(1))))
    compressed, uncompressed = sizes
    # The ratio is whatever follows the label's colon.
    ratio = float(rows[5].split(':', 1)[1])
    return (compressed, uncompressed, ratio)

def compress(self, raw_bytes, **kwargs) -> bytes:
kwargs = dict(
(k, kwargs[k])
Expand Down
23 changes: 22 additions & 1 deletion xphyle/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ def decompress_file(
compressed_file: PathOrFile, dest_file: PathOrFile = None,
compression: CompressionArg = None, keep: bool = True,
use_system: bool = True, **kwargs) -> PathLike:
"""decompress an existing file, either in-place or to a separate file.
"""Decompress an existing file, either in-place or to a separate file.
Args:
compressed_file: Path or file-like object to decompress.
Expand Down Expand Up @@ -436,6 +436,27 @@ def transcode_file(
for chunk in iter_file_chunked(src):
dst.write(chunk)

def uncompressed_size(
        path: PathLike, compression: CompressionArg = None
        ) -> 'Union[int, None]':
    """Get the uncompressed size of the compressed file.

    Args:
        path: The path to the compressed file.
        compression: None or True, to guess compression format from the file
            name, or the name of any supported compression format.

    Returns:
        The uncompressed size of the file in bytes, or None if the
        uncompressed size could not be determined (without actually
        decompressing the file).

    Raises:
        ValueError if the compression format is not supported.
    """
    # The return annotation is quoted so that it is not evaluated when the
    # module is imported. The value returned is an int or None, not a tuple.
    if not isinstance(compression, str):
        # Anything other than a format name means "guess from the path".
        compression = FORMATS.guess_compression_format(path)
    fmt = FORMATS.get_compression_format(compression)
    return fmt.uncompressed_size(path)

# EventListeners

class CompressOnClose(EventListener[FileWrapper]):
Expand Down

0 comments on commit 465f5e2

Please sign in to comment.