Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hadoop uses a different framing format #35

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
99 changes: 97 additions & 2 deletions snappy.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,87 @@ def copy(self):
return copy


class HadoopStreamDecompressor(object):
"""This class implements the decompressor-side of the hadoop framing
format.

Hadoop fraiming format consists of one or more blocks, each of which is
composed of one or more compressed subblocks. The block size is the
uncompressed size, while the subblock size is the size of the compressed
data.
"""

__slots__ = ["_buf", "_block_size", "_block_read", "_subblock_size"]

def __init__(self):
self._buf = b""
self._block_size = None
self._block_read = 0
self._subblock_size = None

def decompress(self, data):
self._buf += data
output = b""
while True:
# decompress block will attempt to decompress any subblocks if it
# has already read the block size and subblock size.
buf = self._decompress_block()
if len(buf) > 0:
output += buf
else:
break
return output

def _decompress_block(self):
if self._block_size is None:
if len(self._buf) <= 4:
return b""
self._block_size = struct.unpack(">i", self._buf[:4])[0]
self._buf = self._buf[4:]
output = b""
while self._block_read < self._block_size:
buf = self._decompress_subblock()
if len(buf) > 0:
output += buf
else:
# Buffer doesn't contain full subblock
break
if self._block_read == self._block_size:
# We finished reading this block, so reinitialize.
self._block_read = 0
self._block_size = None
return output

def _decompress_subblock(self):
if self._subblock_size is None:
if len(self._buf) <= 4:
return b""
self._subblock_size = struct.unpack(">i", self._buf[:4])[0]
self._buf = self._buf[4:]
# Only attempt to decompress complete subblocks.
if len(self._buf) < self._subblock_size:
return b""
compressed = self._buf[:self._subblock_size]
self._buf = self._buf[self._subblock_size:]
uncompressed = uncompress(compressed)
self._block_read += len(uncompressed)
self._subblock_size = None
return uncompressed

def flush(self):
if self._buf != b"":
raise UncompressError("chunk truncated")
return b""

def copy(self):
copy = HadoopStreamDecompressor()
copy._buf = self._buf
copy._block_size = self._block_size
copy._block_read = self._block_read
copy._subblock_size = self._subblock_size
return copy


def stream_compress(src, dst, blocksize=_STREAM_TO_STREAM_BLOCK_SIZE):
"""Takes an incoming file-like object and an outgoing file-like object,
reads data from src, compresses it, and writes it to dst. 'src' should
Expand Down Expand Up @@ -289,16 +370,28 @@ def stream_decompress(src, dst, blocksize=_STREAM_TO_STREAM_BLOCK_SIZE):
decompressor.flush() # makes sure the stream ended well


def hadoop_decompress(src, dst, blocksize=_STREAM_TO_STREAM_BLOCK_SIZE):
decompressor = HadoopStreamDecompressor()
while True:
buf = src.read(blocksize)
if not buf: break
buf = decompressor.decompress(buf)
if buf: dst.write(buf)
decompressor.flush() # makes sure the stream ended well



def cmdline_main():
"""This method is what is run when invoking snappy via the commandline.
Try python -m snappy --help
"""
import sys
if (len(sys.argv) < 2 or len(sys.argv) > 4 or "--help" in sys.argv or
"-h" in sys.argv or sys.argv[1] not in ("-c", "-d")):
"-h" in sys.argv or sys.argv[1] not in ("-c", "-d", "-q")):
print("Usage: python -m snappy <-c/-d> [src [dst]]")
print(" -c compress")
print(" -d decompress")
print(" -q hadoop decompress")
print("output is stdout if dst is omitted or '-'")
print("input is stdin if src and dst are omitted or src is '-'.")
sys.exit(1)
Expand All @@ -319,8 +412,10 @@ def cmdline_main():

if sys.argv[1] == "-c":
method = stream_compress
else:
elif sys.argv[1] == '-d':
method = stream_decompress
else:
method = hadoop_decompress

method(src, dst)

Expand Down