diff --git a/snappy.py b/snappy.py
index bf22536..94a905a 100644
--- a/snappy.py
+++ b/snappy.py
@@ -258,6 +258,87 @@ def copy(self):
         return copy
 
 
+class HadoopStreamDecompressor(object):
+    """This class implements the decompressor-side of the hadoop framing
+    format.
+
+    Hadoop framing format consists of one or more blocks, each of which is
+    composed of one or more compressed subblocks. The block size is the
+    uncompressed size, while the subblock size is the size of the compressed
+    data.
+    """
+
+    __slots__ = ["_buf", "_block_size", "_block_read", "_subblock_size"]
+
+    def __init__(self):
+        self._buf = b""
+        self._block_size = None
+        self._block_read = 0
+        self._subblock_size = None
+
+    def decompress(self, data):
+        self._buf += data
+        output = b""
+        while True:
+            # _decompress_block will attempt to decompress any subblocks
+            # if it has already read the block size and subblock size.
+            buf = self._decompress_block()
+            if len(buf) > 0:
+                output += buf
+            else:
+                break
+        return output
+
+    def _decompress_block(self):
+        if self._block_size is None:
+            if len(self._buf) < 4:
+                return b""
+            self._block_size = struct.unpack(">i", self._buf[:4])[0]
+            self._buf = self._buf[4:]
+        output = b""
+        while self._block_read < self._block_size:
+            buf = self._decompress_subblock()
+            if len(buf) > 0:
+                output += buf
+            else:
+                # Buffer doesn't contain full subblock
+                break
+        if self._block_read == self._block_size:
+            # We finished reading this block, so reinitialize.
+            self._block_read = 0
+            self._block_size = None
+        return output
+
+    def _decompress_subblock(self):
+        if self._subblock_size is None:
+            if len(self._buf) < 4:
+                return b""
+            self._subblock_size = struct.unpack(">i", self._buf[:4])[0]
+            self._buf = self._buf[4:]
+        # Only attempt to decompress complete subblocks.
+        if len(self._buf) < self._subblock_size:
+            return b""
+        compressed = self._buf[:self._subblock_size]
+        self._buf = self._buf[self._subblock_size:]
+        uncompressed = uncompress(compressed)
+        self._block_read += len(uncompressed)
+        self._subblock_size = None
+        return uncompressed
+
+    def flush(self):
+        if self._buf != b"":
+            raise UncompressError("chunk truncated")
+        return b""
+
+    def copy(self):
+        copy = HadoopStreamDecompressor()
+        copy._buf = self._buf
+        copy._block_size = self._block_size
+        copy._block_read = self._block_read
+        copy._subblock_size = self._subblock_size
+        return copy
+
+
 def stream_compress(src, dst, blocksize=_STREAM_TO_STREAM_BLOCK_SIZE):
     """Takes an incoming file-like object and an outgoing file-like object,
     reads data from src, compresses it, and writes it to dst. 'src' should
@@ -289,16 +370,28 @@ def stream_decompress(src, dst, blocksize=_STREAM_TO_STREAM_BLOCK_SIZE):
     decompressor.flush()  # makes sure the stream ended well
 
 
+def hadoop_decompress(src, dst, blocksize=_STREAM_TO_STREAM_BLOCK_SIZE):
+    decompressor = HadoopStreamDecompressor()
+    while True:
+        buf = src.read(blocksize)
+        if not buf: break
+        buf = decompressor.decompress(buf)
+        if buf: dst.write(buf)
+    decompressor.flush()  # makes sure the stream ended well
+
+
+
 def cmdline_main():
     """This method is what is run when invoking snappy via the commandline.
     Try python -m snappy --help
     """
     import sys
     if (len(sys.argv) < 2 or len(sys.argv) > 4 or "--help" in sys.argv or
-            "-h" in sys.argv or sys.argv[1] not in ("-c", "-d")):
-        print("Usage: python -m snappy <-c/-d> [src [dst]]")
+            "-h" in sys.argv or sys.argv[1] not in ("-c", "-d", "-q")):
+        print("Usage: python -m snappy <-c/-d/-q> [src [dst]]")
         print("    -c      compress")
         print("    -d      decompress")
+        print("    -q      hadoop decompress")
         print("output is stdout if dst is omitted or '-'")
         print("input is stdin if src and dst are omitted or src is '-'.")
         sys.exit(1)
@@ -319,8 +412,10 @@ def cmdline_main():
 
     if sys.argv[1] == "-c":
         method = stream_compress
-    else:
+    elif sys.argv[1] == "-d":
         method = stream_decompress
+    else:
+        method = hadoop_decompress
 
     method(src, dst)