Skip to content

Commit

Permalink
Improve S3 read performance. Fix #152 (#157)
Browse files Browse the repository at this point in the history
* Resolve #152: perform a .get only when necessary

The previous implementation performed a .get on every read into the
buffer.  This was wasteful and unnecessary.  A single .get is
sufficient to read from the current position until the end of the file.
Another .get is only necessary when seeking.

* Decrease the default buffer size from 256 kB to 128 kB

This matches the behavior of 1.5.3 more closely, as observed during live
tests against S3.
  • Loading branch information
mpenkov authored and menshikh-iv committed Dec 6, 2017
1 parent 27557bc commit fbc82cc
Showing 1 changed file with 29 additions and 15 deletions.
44 changes: 29 additions & 15 deletions smart_open/s3.py
Expand Up @@ -30,7 +30,7 @@
"""Allowed I/O modes for working with S3."""

BINARY_NEWLINE = b'\n'
DEFAULT_BUFFER_SIZE = 256 * 1024
DEFAULT_BUFFER_SIZE = 128 * 1024


def _range_string(start, stop=None):
Expand Down Expand Up @@ -87,26 +87,39 @@ def read(self, size=-1):


class SeekableRawReader(object):
"""Read an S3 object.
"""Read an S3 object."""

Support seeking around, but is slower than RawReader."""
def __init__(self, s3_object):
self.position = 0
self._object = s3_object
self._content_length = self._object.content_length
self.seek(0)

def seek(self, position):
"""Seek to the specified position (byte offset) in the S3 key.
:param int position: The byte offset from the beginning of the key.
"""
self._position = position
range_string = _range_string(self._position)
logger.debug('content_length: %r range_string: %r', self._content_length, range_string)
if position == self._content_length == 0 or position == self._content_length:
#
# When reading, we can't seek to the first byte of an empty file.
# Similarly, we can't seek past the last byte. Do nothing here.
#
self._body = io.BytesIO()
else:
self._body = self._object.get(Range=range_string)['Body']

def read(self, size=-1):
if self.position == self._content_length:
if self._position >= self._content_length:
return b''
if size <= 0:
end = None
if size == -1:
binary = self._body.read()
else:
end = min(self._content_length, self.position + size)
range_string = _range_string(self.position, stop=end)
logger.debug('range_string: %r', range_string)
body = self._object.get(Range=range_string)['Body'].read()
self.position += len(body)
return body
binary = self._body.read(size)
self._position += len(binary)
return binary


class BufferedInputBase(io.BufferedIOBase):
Expand Down Expand Up @@ -290,9 +303,10 @@ def seek(self, offset, whence=START):
else:
new_position = self._content_length + offset
new_position = _clamp(new_position, 0, self._content_length)
self._current_pos = new_position
self._raw_reader.seek(new_position)
logger.debug('new_position: %r', self._current_pos)

logger.debug('new_position: %r', new_position)
self._current_pos = self._raw_reader.position = new_position
self._buffer = b""
self._eof = self._current_pos == self._content_length
return self._current_pos
Expand Down

0 comments on commit fbc82cc

Please sign in to comment.