Skip to content

Commit

Permalink
Enable compressed formats over http. Avoid filehandle leak. Fix #109
Browse files Browse the repository at this point in the history
…and #110. (#112)

* Better support for custom S3 servers.

This patch adds support for custom S3 servers in the connection string.
It also adds explicit support for setting the server port, and whether
or not to use SSL, both as paramaters to the smart_open function as
well as within the connection string.

These changes are neccessary to be able to connect to s3proxy and
other custom s3 servers which don't run on the default port,
or neccessarily use SSL.

* Fix unit tests

* updated README.rst with new s3 mode.

* Added a new unit test for the unsecured calling form

* Updated style and unit test.

* Check that the port argument isnt normally passed.

* Add generic HTTP and HTTPS streaming support.

Adds support for opening vanilla HTTP and HTTPS addresses.
Supports efficient streaming, gzip and bz2 compression,
as well as Kerberos and username/password (basic) http
authentication.

* removed previous merge artifact;

* Raise exception instead of returning it :/

* Raise http exceptions properly

* neccessary import

* python 3 compatibility

* Reverted make_closing -> closing

We still want to maintain Python 2.6 compatibility,
so don't rely on contextlib.closing.

* Refactor the code to get the Python version

* Refactored the GZfile and BZ2File compression wrappers.

* Refactored HttpOpenRead unit tests.

Now they don't require internet access, and will test for
Basic authentication in the HTTP header.

* Clean up http unit tests.

http => https, and remove old versions of the tests.

* Cosmetic changes and doc updates.

* Re-use the open filehandle rather than open a new one.

This allows one to use any filehandle-like object instead of
just local posix. It also avoids unneccessary filesystem syscalls.

* merge artifact

* Add unit tests for compressed httpd reads.

This breaks out the http tests into their own test class.

Also fixed a few behaviors in the HttpReader uncovered by
the new tests (yay).

* fixed import for python3

* removed stray import

* Handle some python3 byte vs unicode incompatibilityes.

Works now on Python 2 as well as Python 3.
  • Loading branch information
robottwo authored and tmylk committed Mar 27, 2017
1 parent 4d6c9b0 commit 7b2c879
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 37 deletions.
33 changes: 23 additions & 10 deletions smart_open/smart_open_lib.py
Expand Up @@ -34,6 +34,7 @@
IS_PY2 = (sys.version_info[0] == 2)

if IS_PY2:
import cStringIO as StringIO
import httplib
if sys.version_info[1] == 6:
import copy_reg
Expand All @@ -46,6 +47,7 @@ def _reduce_partial(p):

copy_reg.pickle(partial, _reduce_partial)
elif sys.version_info[0] == 3:
import io as StringIO
import http.client as httplib

from boto.compat import BytesIO, urlsplit, six
Expand Down Expand Up @@ -635,11 +637,11 @@ def compression_wrapper(file_obj, filename, mode):
from bz2file import BZ2File
else:
from bz2 import BZ2File
return make_closing(BZ2File)(filename, mode)
return make_closing(BZ2File)(file_obj, mode)

elif ext == '.gz':
from gzip import GzipFile
return make_closing(GzipFile)(filename, mode)
return make_closing(GzipFile)(fileobj=file_obj, mode=mode)

else:
return file_obj
Expand Down Expand Up @@ -670,11 +672,6 @@ def __init__(self, url, mode='r', kerberos=False, user=None, password=None):
If none of those are set, will connect unauthenticated.
"""
if IS_PY2:
from urllib2 import urlopen
else:
from urllib.request import urlopen

if kerberos:
import requests_kerberos
auth = requests_kerberos.HTTPKerberosAuth()
Expand All @@ -688,13 +685,18 @@ def __init__(self, url, mode='r', kerberos=False, user=None, password=None):
if not self.response.ok:
self.response.raise_for_status()

self.mode = mode
self._read_buffer = None
self._read_iter = None
self._readline_iter = None

def __iter__(self):
return self.response.iter_lines()

def binary_content(self):
"""Return the content of the request as bytes."""
return self.response.content

def readline(self):
"""
Mimics the readline call to a filehandle object.
Expand All @@ -705,14 +707,18 @@ def readline(self):
try:
return next(self._readline_iter)
except StopIteration:
raise EOFError()
# When readline runs out of data, it just returns an empty string
return ''

def readlines(self):
"""
Mimics the readlines call to a filehandle object.
"""
return list(self.response.iter_lines())

def seek(self):
raise NotImplementedError('seek() is not implemented')

def read(self, size=None):
"""
Mimics the read call to a filehandle object.
Expand All @@ -732,7 +738,8 @@ def read(self, size=None):
retval = self._read_buffer
self._read_buffer = ''
if len(retval) == 0:
raise EOFError()
# When read runs out of data, it just returns empty
return ''
else:
return retval

Expand Down Expand Up @@ -760,7 +767,13 @@ def HttpOpenRead(parsed_uri, mode='r', **kwargs):
response = HttpReadStream(url, **kwargs)

fname = url.split('/')[-1]
return compression_wrapper(response, fname, mode)

if fname.endswith('.gz'):
# Gzip needs a seek-able filehandle, so we need to buffer it.
buffer = make_closing(io.BytesIO)(response.binary_content())
return compression_wrapper(buffer, fname, mode)
else:
return compression_wrapper(response, fname, mode)


class S3OpenWrite(object):
Expand Down
100 changes: 73 additions & 27 deletions smart_open/tests/test_smart_open.py
Expand Up @@ -96,6 +96,76 @@ def test_webhdfs_uri(self):
self.assertEqual(parsed_uri.scheme, "webhdfs")
self.assertEqual(parsed_uri.uri_path, "host:port/webhdfs/v1/path/file?query_part_1&query_part2")


class SmartOpenHttpTest(unittest.TestCase):
"""
Test reading from HTTP connections in various ways.
"""
@responses.activate
def test_http_read(self):
"""Does http read method work correctly"""
responses.add(responses.GET, "http://127.0.0.1/index.html", body='line1\nline2')
smart_open_object = smart_open.HttpOpenRead(smart_open.ParseUri("http://127.0.0.1/index.html"))
self.assertEqual(smart_open_object.read().decode("utf-8"), "line1\nline2")

@responses.activate
def test_https_readline(self):
"""Does https readline method work correctly"""
responses.add(responses.GET, "https://127.0.0.1/index.html", body='line1\nline2')
smart_open_object = smart_open.HttpOpenRead(smart_open.ParseUri("https://127.0.0.1/index.html"))
self.assertEqual(smart_open_object.readline().decode("utf-8"), "line1")

@responses.activate
def test_http_pass(self):
"""Does http authentication work correctly"""
responses.add(responses.GET, "http://127.0.0.1/index.html", body='line1\nline2')
_ = smart_open.HttpOpenRead(smart_open.ParseUri("http://127.0.0.1/index.html"), user='me', password='pass')
self.assertEquals(len(responses.calls), 1)
actual_request = responses.calls[0].request
self.assert_('Authorization' in actual_request.headers)
self.assert_(actual_request.headers['Authorization'].startswith('Basic '))

@responses.activate
def test_http_gz(self):
"""Can open gzip via http?"""
fpath = os.path.join(CURR_DIR, 'test_data/crlf_at_1k_boundary.warc.gz')
data = open(fpath, 'rb').read()

responses.add(responses.GET, "http://127.0.0.1/data.gz",
body=data)
smart_open_object = smart_open.HttpOpenRead(
smart_open.ParseUri("http://127.0.0.1/data.gz"))

m = hashlib.md5(smart_open_object.read())
# decompress the gzip and get the same md5 hash
self.assertEqual(m.hexdigest(),'18473e60f8c7c98d29d65bf805736a0d')

@responses.activate
def test_http_bz2(self):
"""Can open bz2 via http?"""
test_string = b'Hello World Compressed.'
test_file = tempfile.NamedTemporaryFile('wb', suffix='.bz2',
delete=False).name

with smart_open.smart_open(test_file, 'wb') as outfile:
outfile.write(test_string)

with open(test_file, 'rb') as infile:
compressed_data = infile.read()

if os.path.isfile(test_file):
os.unlink(test_file)

responses.add(responses.GET, "http://127.0.0.1/data.bz2",
body=compressed_data)
smart_open_object = smart_open.HttpOpenRead(
smart_open.ParseUri("http://127.0.0.1/data.bz2"))

# decompress the gzip and get the same md5 hash
self.assertEqual(smart_open_object.read(), test_string)


class SmartOpenReadTest(unittest.TestCase):
"""
Test reading from files under various schemes.
Expand Down Expand Up @@ -169,30 +239,6 @@ def test_webhdfs_read(self):
smart_open_object = smart_open.WebHdfsOpenRead(smart_open.ParseUri("webhdfs://127.0.0.1:8440/path/file"))
self.assertEqual(smart_open_object.read().decode("utf-8"), "line1\nline2")

@responses.activate
def test_http_read(self):
"""Does http read method work correctly"""
responses.add(responses.GET, "http://127.0.0.1/index.html", body='line1\nline2')
smart_open_object = smart_open.HttpOpenRead(smart_open.ParseUri("http://127.0.0.1/index.html"))
self.assertEqual(smart_open_object.read().decode("utf-8"), "line1\nline2")

@responses.activate
def test_https_readline(self):
"""Does https readline method work correctly"""
responses.add(responses.GET, "https://127.0.0.1/index.html", body='line1\nline2')
smart_open_object = smart_open.HttpOpenRead(smart_open.ParseUri("https://127.0.0.1/index.html"))
self.assertEqual(smart_open_object.readline().decode("utf-8"), "line1")

@responses.activate
def test_http_pass(self):
"""Does http authentication work correctly"""
responses.add(responses.GET, "http://127.0.0.1/index.html", body='line1\nline2')
_ = smart_open.HttpOpenRead(smart_open.ParseUri("http://127.0.0.1/index.html"), user='me', password='pass')
self.assertEquals(len(responses.calls), 1)
actual_request = responses.calls[0].request
self.assert_('Authorization' in actual_request.headers)
self.assert_(actual_request.headers['Authorization'].startswith('Basic '))

@mock.patch('smart_open.smart_open_lib.boto')
@mock.patch('smart_open.smart_open_lib.S3OpenRead')
def test_s3_boto(self, mock_s3_open_read, mock_boto):
Expand Down Expand Up @@ -900,13 +946,13 @@ def test_s3_iter_bucket_with_SSLError_moto(self):


PY2 = sys.version_info[0] == 2

CURR_DIR = os.path.abspath(os.path.dirname(__file__))

class CompressionFormatTest(unittest.TestCase):
"""
Test that compression
"""
CURR_DIR = os.path.abspath(os.path.dirname(__file__))

TEXT = 'Hello'

def write_read_assertion(self, test_file):
Expand All @@ -921,7 +967,7 @@ def write_read_assertion(self, test_file):

def test_open_gz(self):
"""Can open gzip?"""
fpath = os.path.join(self.CURR_DIR, 'test_data/crlf_at_1k_boundary.warc.gz')
fpath = os.path.join(CURR_DIR, 'test_data/crlf_at_1k_boundary.warc.gz')
data = smart_open.smart_open(fpath).read()
m = hashlib.md5(data)
assert m.hexdigest() == '18473e60f8c7c98d29d65bf805736a0d', \
Expand Down

0 comments on commit 7b2c879

Please sign in to comment.