Fixing issue #84, adding read_until_eof, more unit tests
mpenkov authored and tmylk committed Aug 8, 2016
1 parent 02550cb commit eba107f
Showing 5 changed files with 173 additions and 14 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.rst
@@ -1,3 +1,12 @@
* 1.3.5, 6th August 2016

- Move gzipstream module to smart_open package
- Ensure reader objects never return None
- Ensure read functions never return more bytes than asked for
- Add support for reading gzipped objects until EOF, e.g. read()
- Add missing parameter to read_from_buffer call (https://github.com/RaRe-Technologies/smart_open/issues/84)
- Add unit tests for gzipstream

* 1.3.4, 25th June 2016

- Bundle gzipstream to enable streaming of gzipped content from S3
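
The user-visible effect of the entries above: sized reads are now capped at the requested byte count, and a bare read() decompresses until EOF. A minimal sketch, assuming a hypothetical bucket and key:

import smart_open

# Hypothetical bucket and key, for illustration only.
fin = smart_open.smart_open('s3://mybucket/archive.warc.gz')

head = fin.read(100)  # at most 100 decompressed bytes (the issue #84 fix)
rest = fin.read()     # no size argument: decompress until EOF
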
37 changes: 29 additions & 8 deletions smart_open/gzipstreamfile.py
@@ -9,7 +9,7 @@
import zlib


-class _GzipStreamFile(object):
+class GzipStreamFileInner(object):
def __init__(self, stream):
self.stream = stream
self.decoder = None
@@ -26,20 +26,42 @@ def restart_decoder(self):
self.unused_buffer += self.decoder.decompress(unused_raw)

def read_from_buffer(self, size):
"""Read at most size bytes from buffer."""
part = self.unused_buffer[:size]
self.unused_buffer = self.unused_buffer[size:]
return part

-    def read(self, size):
+    def read_until_eof(self):
+        #
+        # This method is here because boto.s3.Key.read() reads the entire
+        # file, which isn't expected behavior.
+        #
+        # https://github.com/boto/boto/issues/3311
+        #
+        while not self.finished:
+            while self.decoder and self.decoder.unused_data:
+                self.restart_decoder()
+
+            raw = self.stream.read(io.DEFAULT_BUFFER_SIZE)
+            if len(raw) > 0:
+                self.unused_buffer += self.decoder.decompress(raw)
+            else:
+                self.finished = True
+        return self.read_from_buffer(len(self.unused_buffer))
+
+    def read(self, size=None):
+        if not size or size < 0:
+            return self.read_from_buffer(
+                len(self.unused_buffer)) + self.read_until_eof()

# Use unused data first
-        if len(self.unused_buffer) > size:
-            return self.read_from_buffer()
+        if len(self.unused_buffer) >= size:
+            return self.read_from_buffer(size)

# If the stream is finished and no unused raw data, return what we have
if self.stream.closed or self.finished:
self.finished = True
-            buf, self.unused_buffer = self.unused_buffer, b''
-            return buf
+            return self.read_from_buffer(size)

# Otherwise consume new data
while len(self.unused_buffer) < size:
@@ -80,7 +102,7 @@ def _checkReadable(self, msg=None):

class GzipStreamFile(io.BufferedReader):
def __init__(self, stream):
-        self._gzipstream = _GzipStreamFile(stream)
+        self._gzipstream = GzipStreamFileInner(stream)
super(GzipStreamFile, self).__init__(self._gzipstream)

def read(self, *args, **kwargs):
@@ -96,7 +118,6 @@ def read(self, *args, **kwargs):
#
if result is None:
result = ""
-            logger.debug("GzipStreamFile.read: result: %r", result)
return result
except ValueError:
return ''
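
The new GzipStreamFileInner works against any readable byte stream, not just a boto key. A minimal sketch, using an in-memory gzip stream purely for illustration:

import gzip
import io

from smart_open.gzipstreamfile import GzipStreamFileInner

# An in-memory gzip stream stands in for a boto S3 key here.
raw = io.BytesIO()
with gzip.GzipFile(fileobj=raw, mode='wb') as gz:
    gz.write(b'hello ' * 1000)
raw.seek(0)

reader = GzipStreamFileInner(raw)
head = reader.read(64)   # at most 64 decompressed bytes
tail = reader.read()     # read_until_eof drains the rest
assert head + tail == b'hello ' * 1000
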
13 changes: 7 additions & 6 deletions smart_open/smart_open_lib.py
@@ -269,7 +269,7 @@ def is_gzip(name):
return name.endswith(".gz")


-class _S3ReadStream(object):
+class S3ReadStreamInner(object):

def __init__(self, stream):
self.stream = stream
@@ -294,23 +294,24 @@ def read_until_eof(self):
return buf

def read_from_buffer(self, size):
"""Remove at most size bytes from our buffer and return them."""
part = self.unused_buffer[:size]
self.unused_buffer = self.unused_buffer[size:]
return part

def read(self, size=None):
if not size or size < 0:
-            return self.unused_buffer + self.read_until_eof()
+            return self.read_from_buffer(
+                len(self.unused_buffer)) + self.read_until_eof()

# Use unused data first
-        if len(self.unused_buffer) > size:
+        if len(self.unused_buffer) >= size:
return self.read_from_buffer(size)

# If the stream is finished and no unused raw data, return what we have
if self.stream.closed or self.finished:
self.finished = True
-            buf, self.unused_buffer = self.unused_buffer, b''
-            return buf
+            return self.read_from_buffer(size)

# Consume new data in chunks and return it.
while len(self.unused_buffer) < size:
@@ -346,7 +347,7 @@ def _checkReadable(self, msg=None):
class S3ReadStream(io.BufferedReader):

def __init__(self, key):
-        self.stream = _S3ReadStream(key)
+        self.stream = S3ReadStreamInner(key)
super(S3ReadStream, self).__init__(self.stream)

def read(self, *args, **kwargs):
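
The >= comparison and the read_from_buffer(size) call in the EOF branch are what cap the return value: previously, a read that straddled the end of the stream could hand back the reader's entire internal buffer. A minimal sketch of the fixed contract, mirroring the new unit test below, with an in-memory stream standing in for a boto key:

import io

import smart_open

stream = io.BytesIO(b'0' * io.DEFAULT_BUFFER_SIZE * 2)
reader = smart_open.S3ReadStreamInner(stream)

reader.read(io.DEFAULT_BUFFER_SIZE // 2)   # leaves half a buffer unused
ret = reader.read(io.DEFAULT_BUFFER_SIZE)  # straddles the end of the stream
# Before the fix, this read could return 1.5 * io.DEFAULT_BUFFER_SIZE bytes.
assert len(ret) == io.DEFAULT_BUFFER_SIZE
assert len(reader.read()) == io.DEFAULT_BUFFER_SIZE // 2
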
84 changes: 84 additions & 0 deletions smart_open/tests/test_gzipstreamfile.py
@@ -2,6 +2,12 @@
import unittest
import os.path as P
import hashlib
import logging
import io
import mock
import zlib

logger = logging.getLogger(__name__)

CURR_DIR = P.abspath(P.dirname(__file__))

@@ -26,5 +32,83 @@ def test_warc_md5sum(self):
'Failed with read size of {0}'.format(read_size)
f.close()


def mock_decompressobj(*args, **kwargs):
    # Crafting a payload to reproduce the bug is tricky, since both the
    # compressed and decompressed streams must trigger a certain condition
    # with respect to io.DEFAULT_BUFFER_SIZE. Mocking the decoder makes the
    # bug much easier to reproduce.
    decoder = mock.Mock()
    decoder.decompress = lambda x: x
    decoder.unused_data = None
    return decoder


class GzipStreamFileInnerTest(unittest.TestCase):

@mock.patch("zlib.decompressobj", mock_decompressobj)
def test_read_from_internal_buffer(self):
"""The reader should correctly return bytes in its unused buffer."""
stream = io.BytesIO(b"0" * io.DEFAULT_BUFFER_SIZE * 2)
reader = smart_open.gzipstreamfile.GzipStreamFileInner(stream)

ret = reader.read(io.DEFAULT_BUFFER_SIZE // 2)
self.assertEquals(len(ret), io.DEFAULT_BUFFER_SIZE // 2)

ret = reader.read(io.DEFAULT_BUFFER_SIZE // 4)
self.assertEquals(len(ret), io.DEFAULT_BUFFER_SIZE // 4)

ret = reader.read()
        self.assertEquals(len(ret), io.DEFAULT_BUFFER_SIZE * 5 // 4)

@mock.patch("zlib.decompressobj", mock_decompressobj)
def test_read_from_closed_stream(self):
"""The reader should correctly handle reaching the end of the
stream."""
stream = io.BytesIO(b"0" * io.DEFAULT_BUFFER_SIZE)
reader = smart_open.gzipstreamfile.GzipStreamFileInner(stream)

ret = reader.read(io.DEFAULT_BUFFER_SIZE * 2)
self.assertEquals(len(ret), io.DEFAULT_BUFFER_SIZE)

ret = reader.read()
self.assertEquals(len(ret), 0)

@mock.patch("zlib.decompressobj", mock_decompressobj)
def test_buffer_flushed_after_eof(self):
"""The buffer should be empty after we've requested to read until
EOF."""
stream = io.BytesIO(b"0" * io.DEFAULT_BUFFER_SIZE * 2)
reader = smart_open.gzipstreamfile.GzipStreamFileInner(stream)
self.assertEquals(len(reader.read(io.DEFAULT_BUFFER_SIZE)),
io.DEFAULT_BUFFER_SIZE)
self.assertEquals(len(reader.read(io.DEFAULT_BUFFER_SIZE)),
io.DEFAULT_BUFFER_SIZE)
self.assertEquals(len(reader.unused_buffer), 0)
self.assertEquals(len(reader.read()), 0)

def test_read_until_eof(self):
"""The reader should correctly read until EOF."""
fpath = P.join(CURR_DIR, 'test_data/crlf_at_1k_boundary.warc.gz')
with open(fpath, "rb") as fin:
expected = zlib.decompress(fin.read(), 16 + zlib.MAX_WBITS)

#
# Test reading all at once.
#
with open(fpath, "rb") as fin:
reader = smart_open.gzipstreamfile.GzipStreamFileInner(fin)
actual = reader.read()
self.assertEquals(expected, actual)

#
# Test reading in smaller chunks.
#
with open(fpath, "rb") as fin:
reader = smart_open.gzipstreamfile.GzipStreamFileInner(fin)
actual = reader.read(io.DEFAULT_BUFFER_SIZE // 2)
actual += reader.read(io.DEFAULT_BUFFER_SIZE // 4)
actual += reader.read()
self.assertEquals(expected, actual)

if __name__ == '__main__':
unittest.main()
44 changes: 44 additions & 0 deletions smart_open/tests/test_smart_open.py
Expand Up @@ -17,10 +17,14 @@
import mock
from moto import mock_s3
import responses
import io

import smart_open
from smart_open import smart_open_lib

logger = logging.getLogger(__name__)


class ParseUriTest(unittest.TestCase):
"""
Test ParseUri class.
@@ -255,6 +259,46 @@ def test_s3_seek_moto(self):
self.assertEqual(content, smart_open_object.read(-1)) # same thing


class S3ReadStreamInnerTest(unittest.TestCase):

def test_never_return_more_than_asked_for(self):
"""read should never return more bytes than it is asked for."""
stream = io.BytesIO(b"0" * io.DEFAULT_BUFFER_SIZE * 2)
reader = smart_open.S3ReadStreamInner(stream)

logger.debug("reader: %r", reader)

logger.debug("performing partial read")
ret = reader.read(io.DEFAULT_BUFFER_SIZE // 2)
self.assertEquals(len(ret), io.DEFAULT_BUFFER_SIZE // 2)

        #
        # At this point, the reader's buffer is half full relative to
        # DEFAULT_BUFFER_SIZE. Force it to read more bytes from the stream,
        # so that the buffer holds 1.5 * DEFAULT_BUFFER_SIZE bytes and the
        # stream reaches EOF. The bug was that once the reader hit EOF, it
        # always returned the full buffer contents, ignoring how many bytes
        # it was originally asked for.
        #
ret = reader.read(io.DEFAULT_BUFFER_SIZE)
self.assertEquals(len(ret), io.DEFAULT_BUFFER_SIZE)

#
# Consume the rest of the file.
#
self.assertEquals(len(reader.read()), io.DEFAULT_BUFFER_SIZE // 2)

def test_buffer_flushed_after_eof(self):
"""The buffer should be empty after we've requested to read until
EOF."""
stream = io.BytesIO(b"0" * io.DEFAULT_BUFFER_SIZE * 2)
reader = smart_open.S3ReadStreamInner(stream)
self.assertEquals(len(reader.read(io.DEFAULT_BUFFER_SIZE)),
io.DEFAULT_BUFFER_SIZE)
self.assertEquals(len(reader.read()), io.DEFAULT_BUFFER_SIZE)
self.assertEquals(len(reader.unused_buffer), 0)
self.assertEquals(len(reader.read()), 0)


class S3OpenReadTest(unittest.TestCase):

@mock_s3
