Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add generic HTTP and HTTPS streaming support. #107

Merged
merged 20 commits into from Mar 14, 2017
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
162 changes: 150 additions & 12 deletions smart_open/smart_open_lib.py
Expand Up @@ -27,7 +27,11 @@
import sys
import requests
import io
if sys.version_info[0] == 2:


IS_PY2 = (sys.version_info[0] == 2)

if IS_PY2:
import httplib
elif sys.version_info[0] == 3:
import http.client as httplib
Expand Down Expand Up @@ -77,6 +81,11 @@ def smart_open(uri, mode="rb", **kw):

Examples::

>>> # stream lines from http; you can use context managers too:
>>> with smart_open.smart_open('http://www.google.com') as fin:
... for line in fin:
... print line

>>> # stream lines from S3; you can use context managers too:
>>> with smart_open.smart_open('s3://mybucket/mykey.txt') as fin:
... for line in fin:
Expand Down Expand Up @@ -177,6 +186,11 @@ def smart_open(uri, mode="rb", **kw):
return WebHdfsOpenWrite(parsed_uri, **kw)
else:
raise NotImplementedError("file mode %s not supported for %r scheme", mode, parsed_uri.scheme)
elif parsed_uri.scheme.startswith('http'):
if mode in ('r', 'rb'):
return HttpOpenRead(parsed_uri, **kw)
else:
raise NotImplementedError("file mode %s not supported for %r scheme", mode, parsed_uri.scheme)
else:
raise NotImplementedError("scheme %r is not supported", parsed_uri.scheme)
elif isinstance(uri, boto.s3.key.Key):
Expand Down Expand Up @@ -287,6 +301,8 @@ def __init__(self, uri, default_scheme="file"):

if not self.uri_path:
raise RuntimeError("invalid file URI: %s" % uri)
elif self.scheme.startswith('http'):
self.uri_path = uri
else:
raise NotImplementedError("unknown URI scheme %r in %r" % (self.scheme, uri))

Expand Down Expand Up @@ -590,27 +606,149 @@ def make_closing(base, **attrs):
return type('Closing' + base.__name__, (base, object), attrs)


def file_smart_open(fname, mode='rb'):
def compression_wrapper(file_obj, filename, mode):
"""
Stream from/to local filesystem, transparently (de)compressing gzip and bz2
files if necessary.
This function will wrap the file_obj with an appropriate
[de]compression mechanism based on the extension of the filename.

"""
_, ext = os.path.splitext(fname)
file_obj must either be a filehandle object, or a class which behaves
like one.

If the filename extension isn't recognized, will simply return the original
file_obj.
"""
_, ext = os.path.splitext(filename)
if ext == '.bz2':
PY2 = sys.version_info[0] == 2
if PY2:
if IS_PY2:
from bz2file import BZ2File
else:
from bz2 import BZ2File
return make_closing(BZ2File)(fname, mode)
return make_closing(BZ2File)(file_obj, mode)

if ext == '.gz':
elif ext == '.gz':
from gzip import GzipFile
return make_closing(GzipFile)(fname, mode)
return make_closing(GzipFile)(file_obj, mode)

else:
return file_obj


def file_smart_open(fname, mode='rb'):
"""
Stream from/to local filesystem, transparently (de)compressing gzip and bz2
files if necessary.

"""
return compression_wrapper(open(fname, mode), fname, mode)


class HttpReadStream(object):
"""
Implement streamed reader from a web site, as an iterable & context manager.
Supports Kerberos and Basic HTTP authentication.

As long as you don't mix different access patterns (readline vs readlines vs
read(n) vs read() vs iteration) this will load efficiently in memory.

"""
def __init__(self, url, mode='r', kerberos=False, user=None, password=None):
"""
If Kerberos is True, will attempt to use the local Kerberos credentials.
Otherwise, will try to use "basic" HTTP authentication via username/password.

If none of those are set, will connect unauthenticated.
"""
if IS_PY2:
from urllib2 import urlopen
else:
from urllib.request import urlopen

if kerberos:
import requests_kerberos
auth = requests_kerberos.HTTPKerberosAuth()
elif user is not None and password is not None:
auth = (user, password)
else:
auth = None

self.response = requests.get(url, auth=auth, stream=True)

if not self.response.ok:
self.response.raise_for_status()

self._read_buffer = None
self._read_iter = None
self._readline_iter = None

def __iter__(self):
return self.response.iter_lines()

def readline(self):
"""
Mimics the readline call to a filehandle object.
"""
if self._readline_iter is None:
self._readline_iter = self.response.iter_lines()

try:
return next(self._readline_iter)
except StopIteration:
raise EOFError()

def readlines(self):
"""
Mimics the readlines call to a filehandle object.
"""
return list(self.response.iter_lines())

def read(self, size=None):
"""
Mimics the read call to a filehandle object.
"""
if size is None:
return self.response.content
else:
if self._read_iter is None:
self._read_iter = self.response.iter_content(size)
self._read_buffer = next(self._read_iter)

while len(self._read_buffer) < size:
try:
self._read_buffer += next(self._read_iter)
except StopIteration:
# Oops, ran out of data early.
retval = self._read_buffer
self._read_buffer = ''
if len(retval) == 0:
raise EOFError()
else:
return retval

# If we got here, it means we have enough data in the buffer
# to return to the caller.
retval = self._read_buffer[:size]
self._read_buffer = self._read_buffer[size:]
return retval

def __enter__(self, *args, **kwargs):
return self

def __exit__(self, *args, **kwargs):
self.response.close()


def HttpOpenRead(parsed_uri, mode='r', **kwargs):
if parsed_uri.scheme not in ('http', 'https'):
raise TypeError("can only process http/https urls")
if mode not in ('r', 'rb'):
raise NotImplementedError('Streaming write to http not supported')

url = parsed_uri.uri_path

response = HttpReadStream(url, **kwargs)

return open(fname, mode)
fname = url.split('/')[-1]
return compression_wrapper(response, fname, mode)


class S3OpenWrite(object):
Expand Down
29 changes: 27 additions & 2 deletions smart_open/tests/test_smart_open.py
Expand Up @@ -34,12 +34,12 @@ class ParseUriTest(unittest.TestCase):
def test_scheme(self):
"""Do URIs schemes parse correctly?"""
# supported schemes
for scheme in ("s3", "s3n", "hdfs", "file"):
for scheme in ("s3", "s3n", "hdfs", "file", "http", "https"):
parsed_uri = smart_open.ParseUri(scheme + "://mybucket/mykey")
self.assertEqual(parsed_uri.scheme, scheme)

# unsupported scheme => NotImplementedError
self.assertRaises(NotImplementedError, smart_open.ParseUri, "http://mybucket/mykey")
self.assertRaises(NotImplementedError, smart_open.ParseUri, "foobar://mybucket/mykey")

# unknown scheme => default_scheme
parsed_uri = smart_open.ParseUri("blah blah")
Expand Down Expand Up @@ -168,6 +168,31 @@ def test_webhdfs_read(self):
smart_open_object = smart_open.WebHdfsOpenRead(smart_open.ParseUri("webhdfs://127.0.0.1:8440/path/file"))
self.assertEqual(smart_open_object.read().decode("utf-8"), "line1\nline2")

@responses.activate
def test_http_read(self):
"""Does webhdfs read method work correctly"""
Copy link
Contributor

@tmylk tmylk Mar 13, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a typo in the docstring ""Does http read method work correctly"""

responses.add(responses.GET, "http://127.0.0.1/index.html", body='line1\nline2')
smart_open_object = smart_open.HttpOpenRead(smart_open.ParseUri("http://127.0.0.1/index.html"))
self.assertEqual(smart_open_object.read().decode("utf-8"), "line1\nline2")

@responses.activate
def test_https_readline(self):
"""Does webhdfs read method work correctly"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in docstring as well

responses.add(responses.GET, "https://127.0.0.1/index.html", body='line1\nline2')
smart_open_object = smart_open.HttpOpenRead(smart_open.ParseUri("https://127.0.0.1/index.html"))
self.assertEqual(smart_open_object.readline().decode("utf-8"), "line1")

@responses.activate
def test_http_pass(self):
"""Does http authentication work correctly"""
responses.add(responses.GET, "http://127.0.0.1/index.html", body='line1\nline2')
smart_open_object = smart_open.HttpOpenRead(smart_open.ParseUri("http://127.0.0.1/index.html"),
user='me', password='pass')
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: we use hanging (not vertical) indent.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, fixed

self.assertEquals(len(responses.calls), 1)
actual_request = responses.calls[0].request
self.assert_('Authorization' in actual_request.headers)
self.assert_(actual_request.headers['Authorization'].startswith('Basic '))

@mock.patch('smart_open.smart_open_lib.boto')
@mock.patch('smart_open.smart_open_lib.S3OpenRead')
def test_s3_boto(self, mock_s3_open_read, mock_boto):
Expand Down