Respond to comments on ingest
mrocklin committed Apr 27, 2016
1 parent 34a4e12 commit 4d3625c
Showing 9 changed files with 71 additions and 30 deletions.
4 changes: 2 additions & 2 deletions dask/bytes/__init__.py
@@ -1,7 +1,7 @@
from ..utils import ignoring
from .core import read_bytes, open_files, open_text_files

from . import local

with ignoring(ImportError, SyntaxError):
from . import s3
with ignoring(ImportError):
from . import local
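
The `local` import is now guarded the same way as the optional `s3` backend. For readers unfamiliar with `dask.utils.ignoring`, it behaves like `contextlib.suppress`; the following is a minimal sketch of that pattern, not the actual dask implementation:

from contextlib import contextmanager

@contextmanager
def ignoring(*exceptions):
    # Suppress the listed exception types so that optional backends which
    # fail to import (e.g. s3fs missing) are skipped silently.
    try:
        yield
    except exceptions:
        pass

with ignoring(ImportError):
    import some_optional_backend  # hypothetical module name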
2 changes: 0 additions & 2 deletions dask/bytes/compression.py
@@ -52,6 +52,4 @@ def noop_file(file, **kwargs):

if sys.version_info[0] >= 3:
import bz2
compress['bz2'] = bz2.compress
decompress['bz2'] = bz2.decompress
files['bz2'] = bz2.BZ2File
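
For context, `compress`, `decompress`, and `files` are plain name-to-callable registries that the rest of `dask.bytes` looks up by compression name. A hedged sketch of the pattern (gzip entries shown for illustration; the real module differs in detail):

import bz2
import gzip

# Whole-payload codecs: bytes -> bytes
compress = {None: lambda data: data, 'gzip': gzip.compress}
decompress = {None: lambda data: data, 'gzip': gzip.decompress}

# File-object wrappers: wrap an open binary file with a decompressing file
files = {'gzip': lambda f: gzip.GzipFile(fileobj=f),
         'bz2': bz2.BZ2File}

def read_decompressed(path, compression):
    # Hypothetical helper, shown only to illustrate how the dicts are used:
    # stream-decompress a file through the registered wrapper.
    with open(path, 'rb') as raw:
        with files[compression](raw) as f:
            return f.read()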
24 changes: 16 additions & 8 deletions dask/bytes/core.py
@@ -3,7 +3,7 @@

from toolz import merge

from .compression import seekable_files, files as cfiles
from .compression import seekable_files, files as compress_files
from .utils import SeekableFile
from ..delayed import delayed
from ..utils import system_encoding
@@ -22,9 +22,8 @@ def read_bytes(path, delimiter=None, not_zero=False, blocksize=2**27,
The path may be a filename like ``'2015-01-01.csv'`` or a globstring
like ``'2015-*-*.csv'``.
The path may be preceded by a protocol, like ``s3://`` or ``hdfs://`` and,
if those libraries are installed, the futures will point to those locations
instead.
The path may be preceded by a protocol, like ``s3://`` or ``hdfs://`` if
those libraries are installed.
This cleanly breaks data by a delimiter if given, so that block boundaries
start directly after a delimiter and end on the delimiter.
@@ -55,6 +54,9 @@ def read_bytes(path, delimiter=None, not_zero=False, blocksize=2**27,
10kB sample header and list of ``dask.Delayed`` objects or list of lists of
delayed objects if ``fn`` is a globstring.
"""
if compression is not None and compression not in compress_files:
raise ValueError("Compression type %s not supported" % compression)

if '://' in path:
protocol, path = path.split('://', 1)
try:
@@ -63,7 +65,7 @@ def read_bytes(path, delimiter=None, not_zero=False, blocksize=2**27,
raise NotImplementedError("Unknown protocol %s://%s" %
(protocol, path))
else:
read_bytes = _read_bytes['local']
read_bytes = _read_bytes['file']

return read_bytes(path, delimiter=delimiter, not_zero=not_zero,
blocksize=blocksize, sample=sample, compression=compression,
@@ -92,18 +94,21 @@ def open_files(path, compression=None, **kwargs):
-------
List of ``dask.delayed`` objects that compute to file-like objects
"""
if compression is not None and compression not in compress_files:
raise ValueError("Compression type %s not supported" % compression)

if '://' in path:
protocol, path = path.split('://', 1)
else:
protocol = 'local'
protocol = 'file'

try:
files = _open_files[protocol](path, **kwargs)
except KeyError:
raise NotImplementedError("Unknown protocol %s://%s" %
(protocol, path))
if compression:
decompress = merge(seekable_files, cfiles)[compression]
decompress = merge(seekable_files, compress_files)[compression]
if sys.version_info[0] < 3:
files = [delayed(SeekableFile)(file) for file in files]
files = [delayed(decompress)(file) for file in files]
@@ -136,11 +141,14 @@ def open_text_files(path, encoding=system_encoding, errors='strict',
-------
List of ``dask.delayed`` objects that compute to text file-like objects
"""
if compression is not None and compression not in compress_files:
raise ValueError("Compression type %s not supported" % compression)

original_path = path
if '://' in path:
protocol, path = path.split('://', 1)
else:
protocol = 'local'
protocol = 'file'

if protocol in _open_text_files and compression is None:
return _open_text_files[protocol](path, encoding=encoding,
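With the check added above, all three entry points validate the compression name before dispatching on the protocol registries, which are now keyed by 'file' rather than 'local'. A short usage sketch (paths are hypothetical, and exact behaviour depends on the installed backends):

from dask.bytes.core import read_bytes, open_files

# A bare path (no protocol prefix) is routed to the 'file' backend.
sample, blocks = read_bytes('data/2015-*.csv', delimiter=b'\n',
                            blocksize=2**27)

# Unknown compression names now fail fast with ValueError instead of
# surfacing a KeyError from deep inside a worker task.
try:
    open_files('data/2015-*.csv', compression='not-a-codec')
except ValueError as err:
    print(err)  # Compression type not-a-codec not supported
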
15 changes: 7 additions & 8 deletions dask/bytes/local.py
@@ -18,8 +18,6 @@
def read_bytes(fn, delimiter=None, not_zero=False, blocksize=2**27,
sample=True, compression=None):
""" See dask.bytes.core.read_bytes for docstring """
if compression is not None and compression not in compress_files:
raise ValueError("Compression type %s not supported" % compression)
if '*' in fn:
filenames = list(map(os.path.abspath, sorted(glob(fn))))
sample, first = read_bytes(filenames[0], delimiter, not_zero,
@@ -51,7 +49,7 @@ def read_bytes(fn, delimiter=None, not_zero=False, blocksize=2**27,
for offset in offsets]

if sample:
if isinstance(sample, int) and not isinstance(sample, bool):
if sample is not True:
nbytes = sample
else:
nbytes = 10000
@@ -65,8 +63,9 @@ def read_block_from_file(fn, offset, length, delimiter, compression):
if compression:
f = SeekableFile(f)
f = compress_files[compression](f)
result = read_block(f, offset, length, delimiter)
if compression:
try:
result = read_block(f, offset, length, delimiter)
finally:
f.close()
return result

@@ -85,8 +84,8 @@ def open_files(path):


from . import core
core._read_bytes['local'] = read_bytes
core._open_files['local'] = open_files
core._read_bytes['file'] = read_bytes
core._open_files['file'] = open_files


if sys.version_info[0] >= 3:
@@ -104,7 +103,7 @@ def open_text_files(path, encoding=system_encoding, errors='strict'):
tokenize(fn, encoding, errors, os.path.getmtime(fn)))
for fn in filenames]

core._open_text_files['local'] = open_text_files
core._open_text_files['file'] = open_text_files


def getsize(fn, compression=None):
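The 'local' to 'file' rename above is the other half of the change in core.py; a backend participates in dispatch simply by inserting its callables into the shared registries. A minimal sketch of what a hypothetical extra backend's registration would look like (everything except the registry names is invented):

from dask.bytes import core

def read_bytes(path, delimiter=None, not_zero=False, blocksize=2**27,
               sample=True, compression=None):
    # Hypothetical backend: would return a bytes sample plus delayed blocks.
    raise NotImplementedError("sketch only")

def open_files(path):
    # Hypothetical backend: would return delayed file-like objects.
    raise NotImplementedError("sketch only")

# core.read_bytes('myproto://bucket/key') splits off 'myproto' and looks
# it up in these dicts, exactly as the 'file' entries are registered above.
core._read_bytes['myproto'] = read_bytes
core._open_files['myproto'] = open_files
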
11 changes: 4 additions & 7 deletions dask/bytes/s3.py
@@ -39,9 +39,6 @@ def read_bytes(path, s3=None, delimiter=None, not_zero=False, blocksize=2**27,
10kB sample header and list of ``dask.Delayed`` objects or list of lists of
delayed objects if ``path`` is a globstring.
"""
if compression is not None and compression not in compress_files:
raise ValueError("Compression type %s not supported" % compression)

if s3 is None:
s3 = S3FileSystem(**s3_params)

@@ -70,8 +67,7 @@ def read_bytes(path, s3=None, delimiter=None, not_zero=False, blocksize=2**27,
logger.debug("Read %d blocks of binary bytes from %s", len(offsets), path)

s3safe_pars = s3_params.copy()
if not s3.anon:
s3safe_pars.update(s3.get_delegated_s3pars())
s3safe_pars.update(s3.get_delegated_s3pars())

values = [delayed(read_block_from_s3)(path, offset, blocksize,
s3safe_pars, delimiter, compression,
@@ -95,8 +91,9 @@ def read_block_from_s3(filename, offset, length, s3_params={}, delimiter=None,
with s3.open(filename, 'rb') as f:
if compression:
f = compress_files[compression](f)
result = read_block(f, offset, length, delimiter)
if compression:
try:
result = read_block(f, offset, length, delimiter)
finally:
f.close()
return result

8 changes: 7 additions & 1 deletion dask/bytes/tests/test_local.py
@@ -14,7 +14,6 @@

compute = partial(compute, get=get)

# These get mirrored on s3://distributed-test/
files = {'.test.accounts.1.json': (b'{"amount": 100, "name": "Alice"}\n'
b'{"amount": 200, "name": "Bob"}\n'
b'{"amount": 300, "name": "Charlie"}\n'
@@ -169,6 +168,13 @@ def test_getsize(fmt):
with filetexts({'.tmp.getsize': compress(b'1234567890')}, mode = 'b'):
assert getsize('.tmp.getsize', fmt) == 10

def test_bad_compression():
from dask.bytes.core import read_bytes, open_files, open_text_files
with filetexts(files, mode='b'):
for func in [read_bytes, open_files, open_text_files]:
with pytest.raises(ValueError):
sample, values = func('.test.accounts.*',
compression='not-found')

def test_not_found():
fn = 'not-a-file'
32 changes: 32 additions & 0 deletions dask/dataframe/csv.py
@@ -110,6 +110,38 @@ def read_csv_from_bytes(block_lists, header, head, kwargs, collection=True,
def read_csv(filename, blocksize=2**25, chunkbytes=None,
collection=True, lineterminator='\n', compression=None,
enforce_dtypes=True, sample=10000, **kwargs):
""" Read CSV files into a Dask.DataFrame
This parallelizes the ``pandas.read_csv`` function in the following ways:
1. It supports loading many files at once using globstrings as follows:
>>> df = dd.read_csv('myfiles.*.csv') # doctest: +SKIP
2. In some cases it can break up large files as follows:
>>> df = dd.read_csv('largefile.csv', blocksize=25e6) # 25MB chunks # doctest: +SKIP
Internally dd.read_csv uses pandas.read_csv and so supports many of the
same keyword arguments with the same performance guarantees.
See the docstring for ``pandas.read_csv`` for more information on available
keyword arguments.
Parameters
----------
filename: string
Filename or globstring for CSV files. May include protocols like s3://
blocksize: int or None
Number of bytes by which to cut up larger files
collection: boolean
Return a dask.dataframe if True or list of dask.delayed objects if False
sample: int
Number of bytes to use when determining dtypes
**kwargs: dict
Options to pass down to ``pandas.read_csv``
"""
if chunkbytes is not None:
warn("Deprecation warning: chunksize csv keyword renamed to blocksize")
blocksize=chunkbytes
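A brief usage sketch of the parameters documented above (file names are hypothetical); note that ``collection=False`` hands back the underlying ``dask.delayed`` pieces rather than a DataFrame:

import dask.dataframe as dd

# Many files treated as one logical dataset; blocksize controls how
# finely each file is split where the storage format allows it.
df = dd.read_csv('logs/2015-*-*.csv', blocksize=64 * 2**20)

# collection=False returns a list of dask.delayed objects, one per block,
# for use in custom graphs.
parts = dd.read_csv('logs/2015-*-*.csv', collection=False)
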
1 change: 1 addition & 0 deletions dask/dataframe/tests/test_csv.py
@@ -152,6 +152,7 @@ def test_warn_non_seekable_files(capsys):
files2 = valmap(compress['gzip'], files)
with filetexts(files2, mode='b'):
df = read_csv('2014-01-*.csv', compression='gzip')
assert df.npartitions == 3
out, err = capsys.readouterr()
assert 'gzip' in out
assert 'blocksize=None' in out
4 changes: 2 additions & 2 deletions docs/source/dataframe-api.rst
@@ -108,7 +108,7 @@ Rolling Operations
Create DataFrames
~~~~~~~~~~~~~~~~~

.. currentmodule:: dask.dataframe.io
.. currentmodule:: dask.dataframe

.. autosummary::
from_array
@@ -147,7 +147,7 @@ Other functions
.. autofunction:: concat
.. autofunction:: merge

.. currentmodule:: dask.dataframe.io
.. currentmodule:: dask.dataframe

.. autofunction:: read_csv
.. autofunction:: from_array
