From 4d3625c93cdcca8289150a9f66c340b18ad30dc0 Mon Sep 17 00:00:00 2001
From: Matthew Rocklin
Date: Wed, 27 Apr 2016 09:52:33 -0700
Subject: [PATCH] Respond to comments on ingest

---
 dask/bytes/__init__.py           |  4 ++--
 dask/bytes/compression.py        |  2 --
 dask/bytes/core.py               | 24 ++++++++++++++++--------
 dask/bytes/local.py              | 15 +++++++--------
 dask/bytes/s3.py                 | 11 ++++-------
 dask/bytes/tests/test_local.py   |  8 +++++++-
 dask/dataframe/csv.py            | 32 ++++++++++++++++++++++++++++++++
 dask/dataframe/tests/test_csv.py |  1 +
 docs/source/dataframe-api.rst    |  4 ++--
 9 files changed, 71 insertions(+), 30 deletions(-)

diff --git a/dask/bytes/__init__.py b/dask/bytes/__init__.py
index 4fed85362124..f11cc865904d 100644
--- a/dask/bytes/__init__.py
+++ b/dask/bytes/__init__.py
@@ -1,7 +1,7 @@
 from ..utils import ignoring
 from .core import read_bytes, open_files, open_text_files
 
+from . import local
+
 with ignoring(ImportError, SyntaxError):
     from . import s3
-with ignoring(ImportError):
-    from . import local
diff --git a/dask/bytes/compression.py b/dask/bytes/compression.py
index b7e24f63c602..6824a260c2fd 100644
--- a/dask/bytes/compression.py
+++ b/dask/bytes/compression.py
@@ -52,6 +52,4 @@ def noop_file(file, **kwargs):
 
 if sys.version_info[0] >= 3:
     import bz2
-    compress['bz2'] = bz2.compress
-    decompress['bz2'] = bz2.decompress
     files['bz2'] = bz2.BZ2File
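
The ``files`` registry kept by this hunk maps a compression name to a
file-wrapping callable (``noop_file`` for uncompressed data, ``bz2.BZ2File``
on Python 3), while the removed ``compress``/``decompress`` entries operated
on whole byte strings.  A minimal sketch of how such a registry wraps a raw
binary handle; ``open_decompressed`` is an illustrative helper, not a dask
function, and only the bz2 entry kept by the patch is registered::

    # Illustrative registry in the style of dask/bytes/compression.py
    # (assumed names; the real module registers more formats such as gzip).
    import bz2
    import io

    def noop_file(file, **kwargs):
        return file

    files = {None: noop_file, 'bz2': bz2.BZ2File}

    def open_decompressed(raw, compression=None):
        """Wrap a binary file-like object with the registered wrapper."""
        if compression is not None and compression not in files:
            raise ValueError("Compression type %s not supported" % compression)
        return files[compression](raw)

    payload = bz2.compress(b'hello world\n')
    with open_decompressed(io.BytesIO(payload), compression='bz2') as f:
        assert f.read() == b'hello world\n'
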
""" + if compression is not None and compression not in compress_files: + raise ValueError("Compression type %s not supported" % compression) + if '://' in path: protocol, path = path.split('://', 1) try: @@ -63,7 +65,7 @@ def read_bytes(path, delimiter=None, not_zero=False, blocksize=2**27, raise NotImplementedError("Unknown protocol %s://%s" % (protocol, path)) else: - read_bytes = _read_bytes['local'] + read_bytes = _read_bytes['file'] return read_bytes(path, delimiter=delimiter, not_zero=not_zero, blocksize=blocksize, sample=sample, compression=compression, @@ -92,10 +94,13 @@ def open_files(path, compression=None, **kwargs): ------- List of ``dask.delayed`` objects that compute to file-like objects """ + if compression is not None and compression not in compress_files: + raise ValueError("Compression type %s not supported" % compression) + if '://' in path: protocol, path = path.split('://', 1) else: - protocol = 'local' + protocol = 'file' try: files = _open_files[protocol](path, **kwargs) @@ -103,7 +108,7 @@ def open_files(path, compression=None, **kwargs): raise NotImplementedError("Unknown protocol %s://%s" % (protocol, path)) if compression: - decompress = merge(seekable_files, cfiles)[compression] + decompress = merge(seekable_files, compress_files)[compression] if sys.version_info[0] < 3: files = [delayed(SeekableFile)(file) for file in files] files = [delayed(decompress)(file) for file in files] @@ -136,11 +141,14 @@ def open_text_files(path, encoding=system_encoding, errors='strict', ------- List of ``dask.delayed`` objects that compute to text file-like objects """ + if compression is not None and compression not in compress_files: + raise ValueError("Compression type %s not supported" % compression) + original_path = path if '://' in path: protocol, path = path.split('://', 1) else: - protocol = 'local' + protocol = 'file' if protocol in _open_text_files and compression is None: return _open_text_files[protocol](path, encoding=encoding, diff --git a/dask/bytes/local.py b/dask/bytes/local.py index 6c5649e052fa..5fd8e70e7585 100644 --- a/dask/bytes/local.py +++ b/dask/bytes/local.py @@ -18,8 +18,6 @@ def read_bytes(fn, delimiter=None, not_zero=False, blocksize=2**27, sample=True, compression=None): """ See dask.bytes.core.read_bytes for docstring """ - if compression is not None and compression not in compress_files: - raise ValueError("Compression type %s not supported" % compression) if '*' in fn: filenames = list(map(os.path.abspath, sorted(glob(fn)))) sample, first = read_bytes(filenames[0], delimiter, not_zero, @@ -51,7 +49,7 @@ def read_bytes(fn, delimiter=None, not_zero=False, blocksize=2**27, for offset in offsets] if sample: - if isinstance(sample, int) and not isinstance(sample, bool): + if sample is not True: nbytes = sample else: nbytes = 10000 @@ -65,8 +63,9 @@ def read_block_from_file(fn, offset, length, delimiter, compression): if compression: f = SeekableFile(f) f = compress_files[compression](f) - result = read_block(f, offset, length, delimiter) - if compression: + try: + result = read_block(f, offset, length, delimiter) + finally: f.close() return result @@ -85,8 +84,8 @@ def open_files(path): from . 
diff --git a/dask/bytes/local.py b/dask/bytes/local.py
index 6c5649e052fa..5fd8e70e7585 100644
--- a/dask/bytes/local.py
+++ b/dask/bytes/local.py
@@ -18,8 +18,6 @@ def read_bytes(fn, delimiter=None, not_zero=False, blocksize=2**27,
                sample=True, compression=None):
     """ See dask.bytes.core.read_bytes for docstring """
 
-    if compression is not None and compression not in compress_files:
-        raise ValueError("Compression type %s not supported" % compression)
     if '*' in fn:
         filenames = list(map(os.path.abspath, sorted(glob(fn))))
         sample, first = read_bytes(filenames[0], delimiter, not_zero,
@@ -51,7 +49,7 @@ def read_bytes(fn, delimiter=None, not_zero=False, blocksize=2**27,
               for offset in offsets]
 
     if sample:
-        if isinstance(sample, int) and not isinstance(sample, bool):
+        if sample is not True:
             nbytes = sample
         else:
             nbytes = 10000
@@ -65,8 +63,9 @@ def read_block_from_file(fn, offset, length, delimiter, compression):
         if compression:
             f = SeekableFile(f)
             f = compress_files[compression](f)
-        result = read_block(f, offset, length, delimiter)
-        if compression:
+        try:
+            result = read_block(f, offset, length, delimiter)
+        finally:
             f.close()
     return result
 
@@ -85,8 +84,8 @@ def open_files(path):
 
 from . import core
 
-core._read_bytes['local'] = read_bytes
-core._open_files['local'] = open_files
+core._read_bytes['file'] = read_bytes
+core._open_files['file'] = open_files
 
 
 if sys.version_info[0] >= 3:
@@ -104,7 +103,7 @@ def open_text_files(path, encoding=system_encoding, errors='strict'):
                            tokenize(fn, encoding, errors, os.path.getmtime(fn)))
                 for fn in filenames]
 
-    core._open_text_files['local'] = open_text_files
+    core._open_text_files['file'] = open_text_files
 
 
 def getsize(fn, compression=None):
diff --git a/dask/bytes/s3.py b/dask/bytes/s3.py
index 611dbb76f8ba..df6a56e95615 100644
--- a/dask/bytes/s3.py
+++ b/dask/bytes/s3.py
@@ -39,9 +39,6 @@ def read_bytes(path, s3=None, delimiter=None, not_zero=False, blocksize=2**27,
     10kB sample header and list of ``dask.Delayed`` objects or list of lists of
     delayed objects if ``path`` is a globstring.
     """
-    if compression is not None and compression not in compress_files:
-        raise ValueError("Compression type %s not supported" % compression)
-
     if s3 is None:
         s3 = S3FileSystem(**s3_params)
 
@@ -70,8 +67,7 @@ def read_bytes(path, s3=None, delimiter=None, not_zero=False, blocksize=2**27,
     logger.debug("Read %d blocks of binary bytes from %s", len(offsets), path)
 
     s3safe_pars = s3_params.copy()
-    if not s3.anon:
-        s3safe_pars.update(s3.get_delegated_s3pars())
+    s3safe_pars.update(s3.get_delegated_s3pars())
 
     values = [delayed(read_block_from_s3)(path, offset, blocksize,
                                           s3safe_pars, delimiter, compression,
@@ -95,8 +91,9 @@ def read_block_from_s3(filename, offset, length, s3_params={}, delimiter=None,
     with s3.open(filename, 'rb') as f:
         if compression:
             f = compress_files[compression](f)
-        result = read_block(f, offset, length, delimiter)
-        if compression:
+        try:
+            result = read_block(f, offset, length, delimiter)
+        finally:
             f.close()
     return result
 
diff --git a/dask/bytes/tests/test_local.py b/dask/bytes/tests/test_local.py
index 4ed1f15cc639..38db3911bfcb 100644
--- a/dask/bytes/tests/test_local.py
+++ b/dask/bytes/tests/test_local.py
@@ -14,7 +14,6 @@
 compute = partial(compute, get=get)
 
 
-# These get mirrored on s3://distributed-test/
 files = {'.test.accounts.1.json':  (b'{"amount": 100, "name": "Alice"}\n'
                                     b'{"amount": 200, "name": "Bob"}\n'
                                     b'{"amount": 300, "name": "Charlie"}\n'
@@ -169,6 +168,13 @@ def test_getsize(fmt):
     with filetexts({'.tmp.getsize': compress(b'1234567890')}, mode = 'b'):
         assert getsize('.tmp.getsize', fmt) == 10
 
+def test_bad_compression():
+    from dask.bytes.core import read_bytes, open_files, open_text_files
+    with filetexts(files, mode='b'):
+        for func in [read_bytes, open_files, open_text_files]:
+            with pytest.raises(ValueError):
+                sample, values = func('.test.accounts.*',
+                                      compression='not-found')
 
 def test_not_found():
     fn = 'not-a-file'
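
Both block readers (local and S3) now close the decompression wrapper in a
``try``/``finally``, so an exception inside ``read_block`` no longer leaks the
wrapped handle.  The sketch below mirrors that shape with a simplified
stand-in for ``read_block`` (the real one in ``dask.bytes.utils`` is
delimiter-aware), and the demo filename is made up for the example::

    import bz2

    def read_block(f, offset, length, delimiter=None):
        # Simplified stand-in: seek and read, ignoring delimiter handling.
        f.seek(offset)
        return f.read(length)

    def read_block_from_file(fn, offset, length, delimiter=None,
                             compression=None):
        with open(fn, 'rb') as f:
            if compression == 'bz2':
                f = bz2.BZ2File(f)       # wrap the raw handle
            try:
                result = read_block(f, offset, length, delimiter)
            finally:
                f.close()                # runs even if read_block raises
        return result

    with open('example.part', 'wb') as f:
        f.write(b'0123456789')
    print(read_block_from_file('example.part', 2, 4))   # b'2345'
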
diff --git a/dask/dataframe/csv.py b/dask/dataframe/csv.py
index 81a216359a18..8464839f22b5 100644
--- a/dask/dataframe/csv.py
+++ b/dask/dataframe/csv.py
@@ -110,6 +110,38 @@ def read_csv_from_bytes(block_lists, header, head, kwargs, collection=True,
 def read_csv(filename, blocksize=2**25, chunkbytes=None, collection=True,
              lineterminator='\n', compression=None, enforce_dtypes=True,
              sample=10000, **kwargs):
+    """ Read CSV files into a Dask.DataFrame
+
+    This parallelizes the ``pandas.read_csv`` function in the following ways:
+
+    1. It supports loading many files at once using globstrings as follows:
+
+        >>> df = dd.read_csv('myfiles.*.csv')  # doctest: +SKIP
+
+    2. In some cases it can break up large files as follows:
+
+        >>> df = dd.read_csv('largefile.csv', blocksize=25e6)  # 25MB chunks # doctest: +SKIP
+
+    Internally dd.read_csv uses pandas.read_csv and so supports many of the
+    same keyword arguments with the same performance guarantees.
+
+    See the docstring for ``pandas.read_csv`` for more information on available
+    keyword arguments.
+
+    Parameters
+    ----------
+
+    filename: string
+        Filename or globstring for CSV files. May include protocols like s3://
+    blocksize: int or None
+        Number of bytes by which to cut up larger files
+    collection: boolean
+        Return a dask.dataframe if True or list of dask.delayed objects if False
+    sample: int
+        Number of bytes to use when determining dtypes
+    **kwargs: dict
+        Options to pass down to ``pandas.read_csv``
+    """
     if chunkbytes is not None:
         warn("Deprecation warning: chunksize csv keyword renamed to blocksize")
         blocksize=chunkbytes
diff --git a/dask/dataframe/tests/test_csv.py b/dask/dataframe/tests/test_csv.py
index 1d830b21c195..7020320b0bf1 100644
--- a/dask/dataframe/tests/test_csv.py
+++ b/dask/dataframe/tests/test_csv.py
@@ -152,6 +152,7 @@ def test_warn_non_seekable_files(capsys):
     files2 = valmap(compress['gzip'], files)
     with filetexts(files2, mode='b'):
         df = read_csv('2014-01-*.csv', compression='gzip')
+        assert df.npartitions == 3
         out, err = capsys.readouterr()
         assert 'gzip' in out
         assert 'blocksize=None' in out
diff --git a/docs/source/dataframe-api.rst b/docs/source/dataframe-api.rst
index c16ad8205c5e..d77635085dd3 100644
--- a/docs/source/dataframe-api.rst
+++ b/docs/source/dataframe-api.rst
@@ -108,7 +108,7 @@ Rolling Operations
 Create DataFrames
 ~~~~~~~~~~~~~~~~~
 
-.. currentmodule:: dask.dataframe.io
+.. currentmodule:: dask.dataframe
 
 .. autosummary::
    from_array
@@ -147,7 +147,7 @@ Other functions
 .. autofunction:: concat
 .. autofunction:: merge
 
-.. currentmodule:: dask.dataframe.io
+.. currentmodule:: dask.dataframe
 
 .. autofunction:: read_csv
 .. autofunction:: from_array
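
For reference, the options documented in the new ``read_csv`` docstring look
like this in use; the sketch writes a few throwaway CSV files first so that it
runs on its own, and the large-file path is hypothetical::

    import pandas as pd
    import dask.dataframe as dd

    # Create three small CSV files to stand in for a real dataset.
    for day in (1, 2, 3):
        pd.DataFrame({'amount': [100 * day], 'name': ['Alice']}).to_csv(
            '2015-01-0%d.csv' % day, index=False)

    # Load many files at once with a globstring.
    df = dd.read_csv('2015-01-*.csv')
    print(df.npartitions)                # one partition per small file
    print(df.amount.sum().compute())     # 600

    # A single large file can instead be cut into blocks, e.g. ~25 MB each:
    # big = dd.read_csv('largefile.csv', blocksize=25e6)
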