Skip to content

Commit

Permalink
COMPAT/REF: Use s3fs for s3 IO
Browse files Browse the repository at this point in the history
closes #11915

Author: Tom Augspurger <tom.augspurger88@gmail.com>

Closes #13137 from TomAugspurger/s3fs and squashes the following commits:

92ac063 [Tom Augspurger] CI: Update deps, docs
81690b5 [Tom Augspurger] COMPAT/REF: Use s3fs for s3 IO
  • Loading branch information
TomAugspurger authored and jreback committed Dec 19, 2016
1 parent 8c798c0 commit dc4b070
Show file tree
Hide file tree
Showing 14 changed files with 72 additions and 120 deletions.
2 changes: 1 addition & 1 deletion asv_bench/benchmarks/io_bench.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def setup(self, compression, engine):
# The Python 2 C parser can't read bz2 from open files.
raise NotImplementedError
try:
import boto
import s3fs
except ImportError:
# Skip these benchmarks if `s3fs` is not installed.
raise NotImplementedError
Expand Down
2 changes: 1 addition & 1 deletion ci/requirements-2.7-64.run
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ sqlalchemy
lxml=3.2.1
scipy
xlsxwriter
boto
s3fs
bottleneck
html5lib
beautiful-soup
Expand Down
2 changes: 1 addition & 1 deletion ci/requirements-2.7.run
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ sqlalchemy=0.9.6
lxml=3.2.1
scipy
xlsxwriter=0.4.6
boto=2.36.0
s3fs
bottleneck
psycopg2=2.5.2
patsy
Expand Down
2 changes: 1 addition & 1 deletion ci/requirements-2.7_SLOW.run
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ numexpr
pytables
sqlalchemy
lxml
boto
s3fs
bottleneck
psycopg2
pymysql
Expand Down
2 changes: 1 addition & 1 deletion ci/requirements-3.5.run
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ sqlalchemy
pymysql
psycopg2
xarray
boto
s3fs

# incompat with conda ATM
# beautiful-soup
2 changes: 1 addition & 1 deletion ci/requirements-3.5_OSX.run
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ matplotlib
jinja2
bottleneck
xarray
boto
s3fs

# incompat with conda ATM
# beautiful-soup
2 changes: 1 addition & 1 deletion doc/source/install.rst
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ Optional Dependencies
* `XlsxWriter <https://pypi.python.org/pypi/XlsxWriter>`__: Alternative Excel writer

* `Jinja2 <http://jinja.pocoo.org/>`__: Template engine for conditional HTML formatting.
* `boto <https://pypi.python.org/pypi/boto>`__: necessary for Amazon S3 access.
* `s3fs <http://s3fs.readthedocs.io/>`__: necessary for Amazon S3 access (s3fs >= 0.0.7).
* `blosc <https://pypi.python.org/pypi/blosc>`__: for msgpack compression using ``blosc``
* One of `PyQt4
<http://www.riverbankcomputing.com/software/pyqt/download>`__, `PySide
Expand Down
17 changes: 17 additions & 0 deletions doc/source/io.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1487,6 +1487,23 @@ options include:
Specifying any of the above options will produce a ``ParserWarning`` unless the
python engine is selected explicitly using ``engine='python'``.

Reading remote files
''''''''''''''''''''

You can pass in a URL to a CSV file:

.. code-block:: python

   df = pd.read_csv('https://download.bls.gov/pub/time.series/cu/cu.item',
                    sep='\t')

S3 URLs are handled as well:

.. code-block:: python

   df = pd.read_csv('s3://pandas-test/tips.csv')

Writing out Data
''''''''''''''''

Expand Down
13 changes: 10 additions & 3 deletions doc/source/whatsnew/v0.20.0.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,12 @@ Other enhancements

- ``.select_dtypes()`` now allows `datetimetz` to generically select datetimes with tz (:issue:`14910`)


.. _whatsnew_0200.api_breaking:

Backwards incompatible API changes
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~


.. _whatsnew.api_breaking.index_map

Map on Index types now return other Index types
Expand Down Expand Up @@ -182,8 +182,16 @@ Map on Index types now return other Index types

s.map(lambda x: x.hour)

.. _whatsnew_0200.s3:

S3 File Handling
^^^^^^^^^^^^^^^^

.. _whatsnew_0200.api:
pandas now uses `s3fs <http://s3fs.readthedocs.io/>`_ for handling S3 connections. This shouldn't break
any code. However, since s3fs is not a required dependency, you will need to install it separately (like boto
in prior versions of pandas) (:issue:`11915`).

.. _whatsnew_0200.api:

- ``CParserError`` has been renamed to ``ParserError`` in ``pd.read_csv`` and will be removed in the future (:issue:`12665`)
- ``SparseArray.cumsum()`` and ``SparseSeries.cumsum()`` will now always return ``SparseArray`` and ``SparseSeries`` respectively (:issue:`12855`)
Expand All @@ -194,7 +202,6 @@ Map on Index types now return other Index types
Other API Changes
^^^^^^^^^^^^^^^^^


.. _whatsnew_0200.deprecations:

Deprecations
Expand Down
16 changes: 11 additions & 5 deletions pandas/io/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
from pandas.core.common import AbstractMethodError
from pandas.types.common import is_number

try:
from s3fs import S3File
need_text_wrapping = (BytesIO, S3File)
except ImportError:
need_text_wrapping = (BytesIO,)

# common NA values
# no longer excluding inf representations
# '1.#INF','-1.#INF', '1.#INF000000',
Expand Down Expand Up @@ -212,10 +218,10 @@ def get_filepath_or_buffer(filepath_or_buffer, encoding=None,
return reader, encoding, compression

if _is_s3_url(filepath_or_buffer):
from pandas.io.s3 import get_filepath_or_buffer
return get_filepath_or_buffer(filepath_or_buffer,
encoding=encoding,
compression=compression)
from pandas.io import s3
return s3.get_filepath_or_buffer(filepath_or_buffer,
encoding=encoding,
compression=compression)

# It is a pathlib.Path/py.path.local or string
filepath_or_buffer = _stringify_path(filepath_or_buffer)
Expand Down Expand Up @@ -391,7 +397,7 @@ def _get_handle(path_or_buf, mode, encoding=None, compression=None,
handles.append(f)

# in Python 3, convert BytesIO or fileobjects passed with an encoding
if compat.PY3 and (compression or isinstance(f, compat.BytesIO)):
if compat.PY3 and (compression or isinstance(f, need_text_wrapping)):
from io import TextIOWrapper
f = TextIOWrapper(f, encoding=encoding)
handles.append(f)
Expand Down
111 changes: 18 additions & 93 deletions pandas/io/s3.py
Original file line number Diff line number Diff line change
@@ -1,110 +1,35 @@
""" s3 support for remote file interactivity """

import os
from pandas import compat
from pandas.compat import BytesIO

try:
import boto
from boto.s3 import key
import s3fs
from botocore.exceptions import NoCredentialsError
except:
raise ImportError("boto is required to handle s3 files")
raise ImportError("The s3fs library is required to handle s3 files")

if compat.PY3:
from urllib.parse import urlparse as parse_url
else:
from urlparse import urlparse as parse_url


class BotoFileLikeReader(key.Key):
"""boto Key modified to be more file-like
This modification of the boto Key will read through a supplied
S3 key once, then stop. The unmodified boto Key object will repeatedly
cycle through a file in S3: after reaching the end of the file,
boto will close the file. Then the next call to `read` or `next` will
re-open the file and start reading from the beginning.
Also adds a `readline` function which will split the returned
values by the `\n` character.
"""

def __init__(self, *args, **kwargs):
encoding = kwargs.pop("encoding", None) # Python 2 compat
super(BotoFileLikeReader, self).__init__(*args, **kwargs)
# Add a flag to mark the end of the read.
self.finished_read = False
self.buffer = ""
self.lines = []
if encoding is None and compat.PY3:
encoding = "utf-8"
self.encoding = encoding
self.lines = []

def next(self):
return self.readline()

__next__ = next

def read(self, *args, **kwargs):
if self.finished_read:
return b'' if compat.PY3 else ''
return super(BotoFileLikeReader, self).read(*args, **kwargs)

def close(self, *args, **kwargs):
self.finished_read = True
return super(BotoFileLikeReader, self).close(*args, **kwargs)

def seekable(self):
"""Needed for reading by bz2"""
return False

def readline(self):
"""Split the contents of the Key by '\n' characters."""
if self.lines:
retval = self.lines[0]
self.lines = self.lines[1:]
return retval
if self.finished_read:
if self.buffer:
retval, self.buffer = self.buffer, ""
return retval
else:
raise StopIteration

if self.encoding:
self.buffer = "{}{}".format(
self.buffer, self.read(8192).decode(self.encoding))
else:
self.buffer = "{}{}".format(self.buffer, self.read(8192))

split_buffer = self.buffer.split("\n")
self.lines.extend(split_buffer[:-1])
self.buffer = split_buffer[-1]

return self.readline()
def _strip_schema(url):
"""Returns the url without the s3:// part"""
result = parse_url(url)
return result.netloc + result.path


def get_filepath_or_buffer(filepath_or_buffer, encoding=None,
                           compression=None):
    """Open an ``s3://`` URL with s3fs and return a file-like object.

    Parameters
    ----------
    filepath_or_buffer : str
        An ``s3://bucket/key`` style URL.
    encoding : str, optional
        Unused here; returned to the caller unchanged as part of the
        common ``get_filepath_or_buffer`` contract (always ``None``).
    compression : str, optional
        Passed through untouched so the caller can handle decompression.

    Returns
    -------
    tuple of (file-like, None, compression)

    Notes
    -----
    Credentials are resolved by s3fs/botocore (environment variables,
    config files, instance metadata).  If a credentialed open fails, the
    open is retried anonymously so that public buckets remain readable.
    """
    fs = s3fs.S3FileSystem(anon=False)
    try:
        filepath_or_buffer = fs.open(_strip_schema(filepath_or_buffer))
    except (OSError, NoCredentialsError):
        # boto3 raises OSError when credentials exist but are not valid
        # for this bucket, and NoCredentialsError when there are no
        # credentials at all; retry the open anonymously in both cases.
        fs = s3fs.S3FileSystem(anon=True)
        filepath_or_buffer = fs.open(_strip_schema(filepath_or_buffer))
    return filepath_or_buffer, None, compression
11 changes: 4 additions & 7 deletions pandas/io/tests/parser/test_network.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ class TestS3(tm.TestCase):

def setUp(self):
try:
import boto # noqa
import s3fs # noqa
except ImportError:
raise nose.SkipTest("boto not installed")
raise nose.SkipTest("s3fs not installed")

@tm.network
def test_parse_public_s3_bucket(self):
Expand Down Expand Up @@ -174,15 +174,12 @@ def test_parse_public_s3_bucket_nrows_python(self):

@tm.network
def test_s3_fails(self):
import boto
with tm.assertRaisesRegexp(boto.exception.S3ResponseError,
'S3ResponseError: 404 Not Found'):
with tm.assertRaises(IOError):
read_csv('s3://nyqpug/asdf.csv')

# Receive a permission error when trying to read a private bucket.
# It's irrelevant here that this isn't actually a table.
with tm.assertRaisesRegexp(boto.exception.S3ResponseError,
'S3ResponseError: 403 Forbidden'):
with tm.assertRaises(IOError):
read_csv('s3://cant_get_it/')

if __name__ == '__main__':
Expand Down
8 changes: 4 additions & 4 deletions pandas/io/tests/test_excel.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ def _skip_if_no_excelsuite():
_skip_if_no_openpyxl()


def _skip_if_no_boto():
def _skip_if_no_s3fs():
try:
import boto # NOQA
import s3fs # noqa
except ImportError:
raise nose.SkipTest('boto not installed, skipping')
raise nose.SkipTest('s3fs not installed, skipping')


_seriesd = tm.getSeriesData()
Expand Down Expand Up @@ -582,7 +582,7 @@ def test_read_from_http_url(self):

@tm.network(check_before_test=True)
def test_read_from_s3_url(self):
_skip_if_no_boto()
_skip_if_no_s3fs()

url = ('s3://pandas-test/test1' + self.ext)
url_table = read_excel(url)
Expand Down
2 changes: 1 addition & 1 deletion pandas/util/print_versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def show_versions(as_json=False):
("pymysql", lambda mod: mod.__version__),
("psycopg2", lambda mod: mod.__version__),
("jinja2", lambda mod: mod.__version__),
("boto", lambda mod: mod.__version__),
("s3fs", lambda mod: mod.__version__),
("pandas_datareader", lambda mod: mod.__version__)
]

Expand Down

0 comments on commit dc4b070

Please sign in to comment.