Resolve issues #12 (gzipped S3) and #13 (readline) (#76)
- Bundle gzipstream to enable streaming of gzipped content from S3
- Update gzipstream to avoid deep recursion
- Implement readline for S3
- Add pip requirements.txt
mpenkov authored and tmylk committed Jun 27, 2016
1 parent ecf6eb6 commit 78c461e
Showing 11 changed files with 287 additions and 59 deletions.
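In practical terms, the two fixes combine as follows: S3 objects can now be read line by line, and keys ending in ".gz" are decompressed on the fly. A minimal usage sketch (the bucket, key and process() below are hypothetical placeholders):

    import smart_open

    # Gzipped S3 object: decompressed transparently because the key ends with ".gz"
    with smart_open.smart_open('s3://my-bucket/logs/2016-06-27.log.gz') as fin:
        header = fin.readline()      # readline() now works for S3 keys (issue #13)
        for line in fin:             # continue iterating over the remaining lines
            process(line)            # hypothetical per-line handler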
4 changes: 4 additions & 0 deletions .gitignore
@@ -52,3 +52,7 @@ docs/_build/

# PyBuilder
target/

# vim
*.swp
*.swo
7 changes: 7 additions & 0 deletions CHANGELOG.rst
@@ -1,3 +1,10 @@
* 1.3.4, 25th June 2016

- Bundle gzipstream to enable streaming of gzipped content from S3
- Update gzipstream to avoid deep recursion
- Implemented readline for S3
- Added pip requirements.txt

* 1.3.3, 16th May 2016

- Accept an instance of boto.s3.key.Key to smart_open (PR #38, @asieira)
5 changes: 3 additions & 2 deletions README.rst
@@ -88,8 +88,8 @@ There are a few optional keyword arguments that are useful only for S3 access.
>>> smart_open.smart_open('s3://', host='s3.amazonaws.com')
>>> smart_open.smart_open('s3://', profile_name='my-profile')
These are both passed to `boto.connect_s3()` as keyword arguments.
The S3 reader supports gzipped content, as long as the key name indicates a gzipped file (i.e. it ends with ".gz").

Why?
----
@@ -102,7 +102,8 @@ There are nasty hidden gotchas when using ``boto``'s multipart upload functional
Installation
------------

The module has no dependencies beyond Python >= 2.6 (or Python >= 3.3) and ``boto``::
The module has no dependencies beyond Python >= 2.6 (or Python >= 3.3),
``boto`` and ``requests``::

pip install smart_open

1 change: 1 addition & 0 deletions gzipstream/__init__.py
@@ -0,0 +1 @@
from .gzipstreamfile import GzipStreamFile
100 changes: 100 additions & 0 deletions gzipstream/gzipstreamfile.py
@@ -0,0 +1,100 @@
#
# Adapted from Stephen Merity's gzipstream
# (https://github.com/Smerity/gzipstream)
#
# Used under the MIT license
# (https://github.com/Smerity/gzipstream/blob/master/LICENSE)
#
import io
import zlib


class _GzipStreamFile(object):
def __init__(self, stream):
self.stream = stream
self.decoder = None
self.restart_decoder()
self.unused_buffer = b''
self.closed = False
self.finished = False

def restart_decoder(self):
unused_raw = self.decoder.unused_data if self.decoder else None
self.decoder = zlib.decompressobj(16 + zlib.MAX_WBITS)
if unused_raw:
self.unused_buffer += self.decoder.decompress(unused_raw)

def read_from_buffer(self, size):
part = self.unused_buffer[:size]
self.unused_buffer = self.unused_buffer[size:]
return part

def read(self, size):
# Use unused data first
if len(self.unused_buffer) > size:
return self.read_from_buffer(size)

# If the stream is finished and no unused raw data, return what we have
if self.stream.closed or self.finished:
self.finished = True
buf, self.unused_buffer = self.unused_buffer, b''
return buf

# Otherwise consume new data
while len(self.unused_buffer) < size:
# TODO: Update this to use unconsumed_tail and a StringIO buffer
# http://docs.python.org/2/library/zlib.html#zlib.Decompress.unconsumed_tail
# Check if we need to start a new decoder
while self.decoder and self.decoder.unused_data:
self.restart_decoder()

raw = self.stream.read(io.DEFAULT_BUFFER_SIZE)
if len(raw):
self.unused_buffer += self.decoder.decompress(raw)
else:
self.finished = True
break

return self.read_from_buffer(size)

def readinto(self, b):
# Read up to len(b) bytes into bytearray b
# Sadly not as efficient as lower level
data = self.read(len(b))
if not data:
return None
b[:len(data)] = data
return len(data)

def readable(self):
# io.BufferedReader needs us to appear readable
return True

def _checkReadable(self, msg=None):
# This is required to satisfy io.BufferedReader on Python 2.6.
# Another way to achieve this is to inherit from io.IOBase, but that
# leads to other problems.
return True


class GzipStreamFile(io.BufferedReader):
def __init__(self, stream):
self._gzipstream = _GzipStreamFile(stream)
super(GzipStreamFile, self).__init__(self._gzipstream)

def read(self, *args, **kwargs):
# Patch read to return '' instead of raising ValueError
try:
result = super(GzipStreamFile, self).read(*args, **kwargs)
return result
except ValueError:
return ''

def readline(self, *args, **kwargs):
# Patch readline to return '' instead of raising ValueError
try:
result = super(GzipStreamFile, self).readline(*args, **kwargs)
return result
except ValueError:
return ''
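For reference, a small self-contained sketch of how the bundled GzipStreamFile behaves: it decompresses incrementally as bytes are requested, and a file-like object exposing read() (such as a boto key) serves as the underlying stream. Here an in-memory BytesIO stands in for the S3 key:

    import gzip
    import io

    from gzipstream import GzipStreamFile

    # Build a small gzipped payload in memory to stand in for an S3 key.
    raw = io.BytesIO()
    with gzip.GzipFile(fileobj=raw, mode='wb') as gz:
        gz.write(b'first line\nsecond line\n')
    raw.seek(0)

    # GzipStreamFile decompresses chunk by chunk as data is requested.
    reader = GzipStreamFile(raw)
    print(reader.readline())  # b'first line\n'
    print(reader.readline())  # b'second line\n'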
3 changes: 3 additions & 0 deletions requirements-test.txt
@@ -0,0 +1,3 @@
mock
moto
responses
2 changes: 2 additions & 0 deletions requirements.txt
@@ -0,0 +1,2 @@
boto
requests
177 changes: 129 additions & 48 deletions smart_open/smart_open_lib.py
@@ -26,6 +26,7 @@
import subprocess
import sys
import requests
import io
if sys.version_info[0] == 2:
import httplib
elif sys.version_info[0] == 3:
@@ -47,6 +48,9 @@
logger.warning("multiprocessing could not be imported and won't be used")
from itertools import imap

import gzipstream


S3_MIN_PART_SIZE = 50 * 1024**2 # minimum part size for S3 multipart uploads
WEBHDFS_MIN_PART_SIZE = 50 * 1024**2 # minimum part size for HDFS multipart uploads

@@ -249,39 +253,145 @@ def __init__(self, uri, default_scheme="file"):
raise NotImplementedError("unknown URI scheme %r in %r" % (self.scheme, uri))


def is_gzip(name):
"""Return True if the name indicates that the file is compressed with
gzip."""
return name.endswith(".gz")


class _S3ReadStream(object):

def __init__(self, stream):
self.stream = stream
self.unused_buffer = b''
self.closed = False
self.finished = False

def read_until_eof(self):
#
# This method is here because boto.s3.Key.read() reads the entire
# file, which isn't expected behavior.
#
# https://github.com/boto/boto/issues/3311
#
buf = b""
while not self.finished:
raw = self.stream.read(io.DEFAULT_BUFFER_SIZE)
if len(raw) > 0:
buf += raw
else:
self.finished = True
return buf

def read_from_buffer(self, size):
part = self.unused_buffer[:size]
self.unused_buffer = self.unused_buffer[size:]
return part

def read(self, size=None):
if not size or size < 0:
return self.unused_buffer + self.read_until_eof()

# Use unused data first
if len(self.unused_buffer) > size:
return self.read_from_buffer(size)

# If the stream is finished and no unused raw data, return what we have
if self.stream.closed or self.finished:
self.finished = True
buf, self.unused_buffer = self.unused_buffer, b''
return buf

# Consume new data in chunks and return it.
while len(self.unused_buffer) < size:
raw = self.stream.read(io.DEFAULT_BUFFER_SIZE)
if len(raw):
self.unused_buffer += raw
else:
self.finished = True
break

return self.read_from_buffer(size)

def readinto(self, b):
# Read up to len(b) bytes into bytearray b
# Sadly not as efficient as lower level
data = self.read(len(b))
if not data:
return None
b[:len(data)] = data
return len(data)

def readable(self):
# io.BufferedReader needs us to appear readable
return True

def _checkReadable(self, msg=None):
# This is required to satisfy io.BufferedReader on Python 2.6.
# Another way to achieve this is to inherit from io.IOBase, but that
# leads to other problems.
return True


class S3ReadStream(io.BufferedReader):

def __init__(self, key):
self.stream = _S3ReadStream(key)
super(S3ReadStream, self).__init__(self.stream)

def read(self, *args, **kwargs):
# Patch read to return '' instead of raising ValueError
try:
return super(S3ReadStream, self).read(*args, **kwargs)
except ValueError:
return ''

def readline(self, *args, **kwargs):
# Patch readline to return '' instead of raising ValueError
try:
result = super(S3ReadStream, self).readline(*args, **kwargs)
return result
except ValueError:
return ''


class S3OpenRead(object):
"""
Implement streamed reader from S3, as an iterable & context manager.
Supports reading from gzip-compressed files. Identifies such files by
their extension.
"""
def __init__(self, read_key):
if not hasattr(read_key, "bucket") or not hasattr(read_key, "name") or not hasattr(read_key, "read") \
or not hasattr(read_key, "close"):
raise TypeError("can only process S3 keys")
self.read_key = read_key
self.line_generator = s3_iter_lines(self.read_key)
self._open_reader()

def _open_reader(self):
if is_gzip(self.read_key.name):
self.reader = gzipstream.GzipStreamFile(self.read_key)
else:
self.reader = S3ReadStream(self.read_key)

def __iter__(self):
key = self.read_key.bucket.get_key(self.read_key.name)
if key is None:
raise KeyError(self.read_key.name)
for line in self.reader:
yield line

return s3_iter_lines(key)
def readline(self):
try:
return next(self.reader)
except StopIteration:
return None

def read(self, size=None):
"""
Read a specified number of bytes from the key.
Note read() and line iteration (`for line in self: ...`) each have their
own file position, so they are independent. Doing a `read` will not affect
the line iteration, and vice versa.
"""
if not size or size < 0:
# For compatibility with standard Python, `read(negative)` = read the rest of the file.
# Otherwise, boto would read *from the start* if given size=-1.
size = 0
return self.read_key.read(size)
return self.reader.read(size)

def seek(self, offset, whence=0):
"""
@@ -293,12 +403,13 @@ def seek(self, offset, whence=0):
if whence != 0 or offset != 0:
raise NotImplementedError("seek other than offset=0 not implemented yet")
self.read_key.close(fast=True)
self._open_reader()

def __enter__(self):
return self

def __exit__(self, type, value, traceback):
self.read_key.close()
self.read_key.close(fast=True)

def __str__(self):
return "%s<key: %s>" % (
@@ -443,6 +554,9 @@ def __init__(self, outkey, min_part_size=S3_MIN_PART_SIZE, **kw):
if not hasattr(outkey, "bucket") and not hasattr(outkey, "name"):
raise TypeError("can only process S3 keys")

if is_gzip(outkey.name):
raise NotImplementedError("streaming write to S3 gzip not supported")

self.outkey = outkey
self.min_part_size = min_part_size

@@ -677,39 +791,6 @@ def s3_iter_bucket(bucket, prefix='', accept_key=lambda key: True, key_limit=Non
logger.info("processed %i keys, total size %i" % (key_no + 1, total_size))


def s3_iter_lines(key):
"""
Stream an object from S3 line by line (generator).
`key` must be a `boto.key.Key` object.
"""
# check valid object on input
if not isinstance(key, boto.s3.key.Key):
raise TypeError("expected boto.key.Key object on input")

buf = b''
# keep reading chunks of bytes into the buffer
for chunk in key:
buf += chunk

start = 0
# process all lines within the current buffer
while True:
end = buf.find(b'\n', start) + 1
if end:
yield buf[start : end]
start = end
else:
# no more newlines => break out to read more data from s3 into the buffer
buf = buf[start : ]
break

# process the last line, too
if buf:
yield buf


class WebHdfsException(Exception):
def __init__(self, msg=str()):
self.msg = msg
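To tie the pieces together, a hedged sketch of how S3OpenRead now dispatches on the key name (the bucket and key names below are hypothetical, and valid AWS credentials are assumed): plain keys are wrapped in S3ReadStream, ".gz" keys in the bundled GzipStreamFile, and both expose readline():

    import boto

    from smart_open.smart_open_lib import S3OpenRead, is_gzip

    bucket = boto.connect_s3().get_bucket('my-bucket')

    for name in ('data/plain.txt', 'data/compressed.txt.gz'):
        key = bucket.get_key(name)
        print(is_gzip(name))       # False for the .txt key, True for the .gz key
        reader = S3OpenRead(key)   # picks S3ReadStream or GzipStreamFile internally
        print(reader.readline())   # first (decompressed) line of the object
        reader.seek(0)             # only seek(0, 0) is supported; it reopens the key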
