diff --git a/dask/bag/text.py b/dask/bag/text.py index c07c07a7f4c..9eb7fa2b0f4 100644 --- a/dask/bag/text.py +++ b/dask/bag/text.py @@ -19,8 +19,9 @@ def read_text(path, blocksize=None, compression='infer', Parameters ---------- - path: string + path: string or list Path to data. Can include ``'*'`` or protocol like ``'s3://'`` + Can also be a list of filenames blocksize: None or int Size to cut up larger files. Streams by default. compression: string @@ -77,7 +78,6 @@ def read_text(path, blocksize=None, compression='infer', compression=compression, **kwargs) blocks = [delayed(list)(file) for file in files] - else: _, blocks = read_bytes(path, delimiter=linedelimiter.encode(), blocksize=blocksize, sample=False, compression=compression, diff --git a/dask/bytes/__init__.py b/dask/bytes/__init__.py index 4fed8536212..f11cc865904 100644 --- a/dask/bytes/__init__.py +++ b/dask/bytes/__init__.py @@ -1,7 +1,7 @@ from ..utils import ignoring from .core import read_bytes, open_files, open_text_files +from . import local + with ignoring(ImportError, SyntaxError): from . import s3 -with ignoring(ImportError): - from . import local diff --git a/dask/bytes/compression.py b/dask/bytes/compression.py index b7e24f63c60..6824a260c2f 100644 --- a/dask/bytes/compression.py +++ b/dask/bytes/compression.py @@ -52,6 +52,4 @@ def noop_file(file, **kwargs): if sys.version_info[0] >= 3: import bz2 - compress['bz2'] = bz2.compress - decompress['bz2'] = bz2.decompress files['bz2'] = bz2.BZ2File diff --git a/dask/bytes/core.py b/dask/bytes/core.py index c412a655e3f..66997a5d9a9 100644 --- a/dask/bytes/core.py +++ b/dask/bytes/core.py @@ -1,15 +1,17 @@ import io -import sys from toolz import merge -from .compression import seekable_files, files as cfiles +from .compression import seekable_files, files as compress_files from .utils import SeekableFile +from ..compatibility import PY2 from ..delayed import delayed from ..utils import system_encoding delayed = delayed(pure=True) +# Global registration dictionaries for backend storage functions +# See docstrings to functions below for more information _read_bytes = dict() _open_files = dict() _open_text_files = dict() @@ -22,9 +24,8 @@ def read_bytes(path, delimiter=None, not_zero=False, blocksize=2**27, The path may be a filename like ``'2015-01-01.csv'`` or a globstring like ``'2015-*-*.csv'``. - The path may be preceeded by a protocol, like ``s3://`` or ``hdfs://`` and, - if those libraries are installed, the futures will point to those locations - instead. + The path may be preceeded by a protocol, like ``s3://`` or ``hdfs://`` if + those libraries are installed. This cleanly breaks data by a delimiter if given, so that block boundaries start directly after a delimiter and end on the delimiter. @@ -55,6 +56,9 @@ def read_bytes(path, delimiter=None, not_zero=False, blocksize=2**27, 10kB sample header and list of ``dask.Delayed`` objects or list of lists of delayed objects if ``fn`` is a globstring. 
""" + if compression is not None and compression not in compress_files: + raise ValueError("Compression type %s not supported" % compression) + if '://' in path: protocol, path = path.split('://', 1) try: @@ -63,7 +67,7 @@ def read_bytes(path, delimiter=None, not_zero=False, blocksize=2**27, raise NotImplementedError("Unknown protocol %s://%s" % (protocol, path)) else: - read_bytes = _read_bytes['local'] + read_bytes = _read_bytes['file'] return read_bytes(path, delimiter=delimiter, not_zero=not_zero, blocksize=blocksize, sample=sample, compression=compression, @@ -92,10 +96,13 @@ def open_files(path, compression=None, **kwargs): ------- List of ``dask.delayed`` objects that compute to file-like objects """ + if compression is not None and compression not in compress_files: + raise ValueError("Compression type %s not supported" % compression) + if '://' in path: protocol, path = path.split('://', 1) else: - protocol = 'local' + protocol = 'file' try: files = _open_files[protocol](path, **kwargs) @@ -103,8 +110,8 @@ def open_files(path, compression=None, **kwargs): raise NotImplementedError("Unknown protocol %s://%s" % (protocol, path)) if compression: - decompress = merge(seekable_files, cfiles)[compression] - if sys.version_info[0] < 3: + decompress = merge(seekable_files, compress_files)[compression] + if PY2: files = [delayed(SeekableFile)(file) for file in files] files = [delayed(decompress)(file) for file in files] @@ -136,18 +143,21 @@ def open_text_files(path, encoding=system_encoding, errors='strict', ------- List of ``dask.delayed`` objects that compute to text file-like objects """ + if compression is not None and compression not in compress_files: + raise ValueError("Compression type %s not supported" % compression) + original_path = path if '://' in path: protocol, path = path.split('://', 1) else: - protocol = 'local' + protocol = 'file' if protocol in _open_text_files and compression is None: return _open_text_files[protocol](path, encoding=encoding, errors=errors, **kwargs) elif protocol in _open_files: files = open_files(original_path, compression=compression, **kwargs) - if sys.version_info[0] < 3: + if PY2: files = [delayed(SeekableFile)(file) for file in files] return [delayed(io.TextIOWrapper)(file, encoding=encoding, errors=errors) for file in files] diff --git a/dask/bytes/local.py b/dask/bytes/local.py index 6c5649e052f..7b65b632935 100644 --- a/dask/bytes/local.py +++ b/dask/bytes/local.py @@ -6,11 +6,11 @@ import sys from .compression import files as compress_files, seekable_files -from .utils import SeekableFile +from .utils import SeekableFile, read_block from ..base import tokenize from ..compatibility import FileNotFoundError from ..delayed import delayed -from ..utils import read_block, system_encoding +from ..utils import system_encoding logger = logging.getLogger(__name__) @@ -18,8 +18,6 @@ def read_bytes(fn, delimiter=None, not_zero=False, blocksize=2**27, sample=True, compression=None): """ See dask.bytes.core.read_bytes for docstring """ - if compression is not None and compression not in compress_files: - raise ValueError("Compression type %s not supported" % compression) if '*' in fn: filenames = list(map(os.path.abspath, sorted(glob(fn)))) sample, first = read_bytes(filenames[0], delimiter, not_zero, @@ -51,7 +49,7 @@ def read_bytes(fn, delimiter=None, not_zero=False, blocksize=2**27, for offset in offsets] if sample: - if isinstance(sample, int) and not isinstance(sample, bool): + if sample is not True: nbytes = sample else: nbytes = 10000 @@ -65,8 
+63,9 @@ def read_block_from_file(fn, offset, length, delimiter, compression): if compression: f = SeekableFile(f) f = compress_files[compression](f) - result = read_block(f, offset, length, delimiter) - if compression: + try: + result = read_block(f, offset, length, delimiter) + finally: f.close() return result @@ -85,8 +84,8 @@ def open_files(path): from . import core -core._read_bytes['local'] = read_bytes -core._open_files['local'] = open_files +core._read_bytes['file'] = read_bytes +core._open_files['file'] = open_files if sys.version_info[0] >= 3: @@ -104,7 +103,7 @@ def open_text_files(path, encoding=system_encoding, errors='strict'): tokenize(fn, encoding, errors, os.path.getmtime(fn))) for fn in filenames] - core._open_text_files['local'] = open_text_files + core._open_text_files['file'] = open_text_files def getsize(fn, compression=None): diff --git a/dask/bytes/s3.py b/dask/bytes/s3.py index 611dbb76f8b..398e3f1cc18 100644 --- a/dask/bytes/s3.py +++ b/dask/bytes/s3.py @@ -5,10 +5,10 @@ from s3fs import S3FileSystem from .compression import files as compress_files, seekable_files +from .utils import read_block from ..base import tokenize from ..delayed import delayed -from ..utils import read_block logger = logging.getLogger(__name__) @@ -39,9 +39,6 @@ def read_bytes(path, s3=None, delimiter=None, not_zero=False, blocksize=2**27, 10kB sample header and list of ``dask.Delayed`` objects or list of lists of delayed objects if ``path`` is a globstring. """ - if compression is not None and compression not in compress_files: - raise ValueError("Compression type %s not supported" % compression) - if s3 is None: s3 = S3FileSystem(**s3_params) @@ -70,8 +67,7 @@ def read_bytes(path, s3=None, delimiter=None, not_zero=False, blocksize=2**27, logger.debug("Read %d blocks of binary bytes from %s", len(offsets), path) s3safe_pars = s3_params.copy() - if not s3.anon: - s3safe_pars.update(s3.get_delegated_s3pars()) + s3safe_pars.update(s3.get_delegated_s3pars()) values = [delayed(read_block_from_s3)(path, offset, blocksize, s3safe_pars, delimiter, compression, @@ -95,8 +91,9 @@ def read_block_from_s3(filename, offset, length, s3_params={}, delimiter=None, with s3.open(filename, 'rb') as f: if compression: f = compress_files[compression](f) - result = read_block(f, offset, length, delimiter) - if compression: + try: + result = read_block(f, offset, length, delimiter) + finally: f.close() return result diff --git a/dask/bytes/tests/test_bytes_utils.py b/dask/bytes/tests/test_bytes_utils.py new file mode 100644 index 00000000000..dbd63ae8c4a --- /dev/null +++ b/dask/bytes/tests/test_bytes_utils.py @@ -0,0 +1,52 @@ +import io + +from dask.bytes.utils import read_block, seek_delimiter + + +def test_read_block(): + delimiter = b'\n' + data = delimiter.join([b'123', b'456', b'789']) + f = io.BytesIO(data) + + assert read_block(f, 1, 2) == b'23' + assert read_block(f, 0, 1, delimiter=b'\n') == b'123\n' + assert read_block(f, 0, 2, delimiter=b'\n') == b'123\n' + assert read_block(f, 0, 3, delimiter=b'\n') == b'123\n' + assert read_block(f, 0, 5, delimiter=b'\n') == b'123\n456\n' + assert read_block(f, 0, 8, delimiter=b'\n') == b'123\n456\n789' + assert read_block(f, 0, 100, delimiter=b'\n') == b'123\n456\n789' + assert read_block(f, 1, 1, delimiter=b'\n') == b'' + assert read_block(f, 1, 5, delimiter=b'\n') == b'456\n' + assert read_block(f, 1, 8, delimiter=b'\n') == b'456\n789' + + for ols in [[(0, 3), (3, 3), (6, 3), (9, 2)], + [(0, 4), (4, 4), (8, 4)]]: + out = [read_block(f, o, l, b'\n') for o, l in 
ols] + assert b"".join(filter(None, out)) == data + + +def test_seek_delimiter_endline(): + f = io.BytesIO(b'123\n456\n789') + + # if at zero, stay at zero + seek_delimiter(f, b'\n', 5) + assert f.tell() == 0 + + # choose the first block + for bs in [1, 5, 100]: + f.seek(1) + seek_delimiter(f, b'\n', blocksize=bs) + assert f.tell() == 4 + + # handle long delimiters well, even with short blocksizes + f = io.BytesIO(b'123abc456abc789') + for bs in [1, 2, 3, 4, 5, 6, 10]: + f.seek(1) + seek_delimiter(f, b'abc', blocksize=bs) + assert f.tell() == 6 + + # End at the end + f = io.BytesIO(b'123\n456') + f.seek(5) + seek_delimiter(f, b'\n', 5) + assert f.tell() == 7 diff --git a/dask/bytes/tests/test_local.py b/dask/bytes/tests/test_local.py index 4ed1f15cc63..83dab025d21 100644 --- a/dask/bytes/tests/test_local.py +++ b/dask/bytes/tests/test_local.py @@ -14,7 +14,6 @@ compute = partial(compute, get=get) -# These get mirrored on s3://distributed-test/ files = {'.test.accounts.1.json': (b'{"amount": 100, "name": "Alice"}\n' b'{"amount": 200, "name": "Bob"}\n' b'{"amount": 300, "name": "Charlie"}\n' @@ -136,6 +135,7 @@ def test_registered_open_text_files(encoding): assert list(data) == [files[k].decode(encoding) for k in sorted(files)] + def test_open_files(): with filetexts(files, mode='b'): myfiles = open_files('.test.accounts.*') @@ -170,6 +170,14 @@ def test_getsize(fmt): assert getsize('.tmp.getsize', fmt) == 10 +def test_bad_compression(): + from dask.bytes.core import read_bytes, open_files, open_text_files + with filetexts(files, mode='b'): + for func in [read_bytes, open_files, open_text_files]: + with pytest.raises(ValueError): + sample, values = func('.test.accounts.*', + compression='not-found') + def test_not_found(): fn = 'not-a-file' with pytest.raises(FileNotFoundError) as e: diff --git a/dask/bytes/utils.py b/dask/bytes/utils.py index b8971f9bc3d..3ea7f85858e 100644 --- a/dask/bytes/utils.py +++ b/dask/bytes/utils.py @@ -33,3 +33,91 @@ def __getattr__(self, key): SeekableFile = identity +def seek_delimiter(file, delimiter, blocksize): + """ Seek current file to next byte after a delimiter bytestring + + This seeks the file to the next byte following the delimiter. It does + not return anything. Use ``file.tell()`` to see location afterwards. + + Parameters + ---------- + file: a file + delimiter: bytes + a delimiter like ``b'\n'`` or message sentinel + blocksize: int + Number of bytes to read from the file at once. + """ + + if file.tell() == 0: + return + + last = b'' + while True: + current = file.read(blocksize) + if not current: + return + full = last + current + try: + i = full.index(delimiter) + file.seek(file.tell() - (len(full) - i) + len(delimiter)) + return + except ValueError: + pass + last = full[-len(delimiter):] + + +def read_block(f, offset, length, delimiter=None): + """ Read a block of bytes from a file + + Parameters + ---------- + f: File + offset: int + Byte offset to start read + length: int + Number of bytes to read + delimiter: bytes (optional) + Ensure reading starts and stops at delimiter bytestring + + If using the ``delimiter=`` keyword argument we ensure that the read + starts and stops at delimiter boundaries that follow the locations + ``offset`` and ``offset + length``. If ``offset`` is zero then we + start at zero. The bytestring returned WILL include the + terminating delimiter string. 
+ + Examples + -------- + + >>> from io import BytesIO # doctest: +SKIP + >>> f = BytesIO(b'Alice, 100\\nBob, 200\\nCharlie, 300') # doctest: +SKIP + >>> read_block(f, 0, 13) # doctest: +SKIP + b'Alice, 100\\nBo' + + >>> read_block(f, 0, 13, delimiter=b'\\n') # doctest: +SKIP + b'Alice, 100\\nBob, 200\\n' + + >>> read_block(f, 10, 10, delimiter=b'\\n') # doctest: +SKIP + b'Bob, 200\\nCharlie, 300' + """ + if offset != f.tell(): # commonly both zero + f.seek(offset) + + if not offset and length is None and f.tell() == 0: + return f.read() + + if delimiter: + seek_delimiter(f, delimiter, 2**16) + start = f.tell() + length -= start - offset + + f.seek(start + length) + seek_delimiter(f, delimiter, 2**16) + end = f.tell() + eof = not f.read(1) + + offset = start + length = end - start + + f.seek(offset) + + return f.read(length) diff --git a/dask/dataframe/csv.py b/dask/dataframe/csv.py index 81a216359a1..a123620b4c9 100644 --- a/dask/dataframe/csv.py +++ b/dask/dataframe/csv.py @@ -96,32 +96,71 @@ def read_csv_from_bytes(block_lists, header, head, kwargs, collection=True, """ dtypes = head.dtypes.to_dict() columns = list(head.columns) - dfs1 = [[delayed(bytes_read_csv)(b, header, kwargs, dtypes, columns) + func = delayed(bytes_read_csv) + dfs = [func(b, header, kwargs, dtypes, columns) + for blocks in block_lists for b in blocks] - for blocks in block_lists] - dfs2 = sum(dfs1, []) if collection: - return from_delayed(dfs2, head) + return from_delayed(dfs, head) else: - return dfs2 + return dfs def read_csv(filename, blocksize=2**25, chunkbytes=None, collection=True, lineterminator='\n', compression=None, enforce_dtypes=True, sample=10000, **kwargs): + """ Read CSV files into a Dask.DataFrame + + This parallelizes the ``pandas.read_csv`` file in the following ways: + + 1. It supports loading many files at once using globstrings as follows: + + >>> df = dd.read_csv('myfiles.*.csv') # doctest: +SKIP + + 2. In some cases it can break up large files as follows: + + >>> df = dd.read_csv('largefile.csv', blocksize=25e6) # 25MB chunks # doctest: +SKIP + + Internally dd.read_csv uses pandas.read_csv and so supports many of the + same keyword arguments with the same performance guarantees. + + See the docstring for ``pandas.read_csv`` for more information on available + keyword arguments. + + Parameters + ---------- + + filename: string + Filename or globstring for CSV files. 
May include protocols like s3:// + blocksize: int or None + Number of bytes by which to cut up larger files + collection: boolean + Return a dask.dataframe if True or list of dask.delayed objects if False + sample: int + Number of bytes to use when determining dtypes + **kwargs: dict + Options to pass down to ``pandas.read_csv`` + """ if chunkbytes is not None: warn("Deprecation warning: chunksize csv keyword renamed to blocksize") blocksize=chunkbytes if 'index' in kwargs or 'index_col' in kwargs: raise ValueError("Keyword 'index' not supported " "dd.read_csv(...).set_index('my-index') instead") + for kw in ['iterator', 'chunksize']: + if kw in kwargs: + raise ValueError("%s not supported for dd.read_csv" % kw) + if isinstance(kwargs.get('skiprows'), list): + raise TypeError("List of skiprows not supported for dd.read_csv") + if isinstance(kwargs.get('header'), list): + raise TypeError("List of header rows not supported for dd.read_csv") if blocksize and compression not in seekable_files: - print("Warning %s compression does not support breaking apart files\n" - "Please ensure that each individiaul file can fit in memory and\n" - "use the keyword ``blocksize=None to remove this message``\n" - "Setting ``blocksize=None``" % compression) + warn("Warning %s compression does not support breaking apart files\n" + "Please ensure that each individiaul file can fit in memory and\n" + "use the keyword ``blocksize=None to remove this message``\n" + "Setting ``blocksize=None``" % compression) blocksize = None if compression not in seekable_files and compression not in cfiles: raise NotImplementedError("Compression format %s not installed" % @@ -133,10 +172,15 @@ def read_csv(filename, blocksize=2**25, chunkbytes=None, sample=sample, compression=compression) if not isinstance(values[0], (tuple, list)): values = [values] + + if 'nrows' in kwargs: + values = [[values[0][0]]] + header = sample.split(b_lineterminator)[0] + b_lineterminator head = pd.read_csv(BytesIO(sample), **kwargs) df = read_csv_from_bytes(values, header, head, kwargs, collection=collection, enforce_dtypes=enforce_dtypes) + return df diff --git a/dask/dataframe/tests/test_csv.py b/dask/dataframe/tests/test_csv.py index 1d830b21c19..3d8152ee9be 100644 --- a/dask/dataframe/tests/test_csv.py +++ b/dask/dataframe/tests/test_csv.py @@ -152,13 +152,14 @@ def test_warn_non_seekable_files(capsys): files2 = valmap(compress['gzip'], files) with filetexts(files2, mode='b'): df = read_csv('2014-01-*.csv', compression='gzip') + assert df.npartitions == 3 out, err = capsys.readouterr() - assert 'gzip' in out - assert 'blocksize=None' in out + assert 'gzip' in err + assert 'blocksize=None' in err df = read_csv('2014-01-*.csv', compression='gzip', blocksize=None) out, err = capsys.readouterr() - assert not out + assert not err and not out with pytest.raises(NotImplementedError): df = read_csv('2014-01-*.csv', compression='foo') diff --git a/dask/tests/test_utils.py b/dask/tests/test_utils.py index 02cb1bb3645..cfe8924376d 100644 --- a/dask/tests/test_utils.py +++ b/dask/tests/test_utils.py @@ -6,8 +6,7 @@ from dask.compatibility import BZ2File, GzipFile, LZMAFile, LZMA_AVAILABLE from dask.utils import (textblock, filetext, takes_multiple_arguments, - Dispatch, tmpfile, different_seeds, file_size, - read_block, seek_delimiter) + Dispatch, tmpfile, different_seeds, file_size) SKIP_XZ = pytest.mark.skipif(not LZMA_AVAILABLE, reason="no lzma library") @@ -121,52 +120,3 @@ def test_different_seeds(): # Should be sorted smallseeds = different_seeds(10, 
1234) assert smallseeds == sorted(smallseeds) - - -def test_read_block(): - delimiter = b'\n' - data = delimiter.join([b'123', b'456', b'789']) - f = io.BytesIO(data) - - assert read_block(f, 1, 2) == b'23' - assert read_block(f, 0, 1, delimiter=b'\n') == b'123\n' - assert read_block(f, 0, 2, delimiter=b'\n') == b'123\n' - assert read_block(f, 0, 3, delimiter=b'\n') == b'123\n' - assert read_block(f, 0, 5, delimiter=b'\n') == b'123\n456\n' - assert read_block(f, 0, 8, delimiter=b'\n') == b'123\n456\n789' - assert read_block(f, 0, 100, delimiter=b'\n') == b'123\n456\n789' - assert read_block(f, 1, 1, delimiter=b'\n') == b'' - assert read_block(f, 1, 5, delimiter=b'\n') == b'456\n' - assert read_block(f, 1, 8, delimiter=b'\n') == b'456\n789' - - for ols in [[(0, 3), (3, 3), (6, 3), (9, 2)], - [(0, 4), (4, 4), (8, 4)]]: - out = [read_block(f, o, l, b'\n') for o, l in ols] - assert b"".join(filter(None, out)) == data - - -def test_seek_delimiter_endline(): - f = io.BytesIO(b'123\n456\n789') - - # if at zero, stay at zero - seek_delimiter(f, b'\n', 5) - assert f.tell() == 0 - - # choose the first block - for bs in [1, 5, 100]: - f.seek(1) - seek_delimiter(f, b'\n', blocksize=bs) - assert f.tell() == 4 - - # handle long delimiters well, even with short blocksizes - f = io.BytesIO(b'123abc456abc789') - for bs in [1, 2, 3, 4, 5, 6, 10]: - f.seek(1) - seek_delimiter(f, b'abc', blocksize=bs) - assert f.tell() == 6 - - # End at the end - f = io.BytesIO(b'123\n456') - f.seek(5) - seek_delimiter(f, b'\n', 5) - assert f.tell() == 7 diff --git a/dask/utils.py b/dask/utils.py index 23b88ddb95e..c96acb86985 100644 --- a/dask/utils.py +++ b/dask/utils.py @@ -568,96 +568,6 @@ def funcname(func, full=False): return str(func) -def seek_delimiter(file, delimiter, blocksize): - """ Seek current file to next byte after a delimiter bytestring - - This seeks the file to the next byte following the delimiter. It does - not return anything. Use ``file.tell()`` to see location afterwards. - - Parameters - ---------- - file: a file - delimiter: bytes - a delimiter like ``b'\n'`` or message sentinel - blocksize: int - Number of bytes to read from the file at once. - """ - - if file.tell() == 0: - return - - last = b'' - while True: - current = file.read(blocksize) - if not current: - return - full = last + current - try: - i = full.index(delimiter) - file.seek(file.tell() - (len(full) - i) + len(delimiter)) - return - except ValueError: - pass - last = full[-len(delimiter):] - - -def read_block(f, offset, length, delimiter=None): - """ Read a block of bytes from a file - - Parameters - ---------- - f: File - offset: int - Byte offset to start read - length: int - Number of bytes to read - delimiter: bytes (optional) - Ensure reading starts and stops at delimiter bytestring - - If using the ``delimiter=`` keyword argument we ensure that the read - starts and stops at delimiter boundaries that follow the locations - ``offset`` and ``offset + length``. If ``offset`` is zero then we - start at zero. The bytestring returned WILL include the - terminating delimiter string. 
- - Examples - -------- - - >>> from io import BytesIO # doctest: +SKIP - >>> f = BytesIO(b'Alice, 100\\nBob, 200\\nCharlie, 300') # doctest: +SKIP - >>> read_block(f, 0, 13) # doctest: +SKIP - b'Alice, 100\\nBo' - - >>> read_block(f, 0, 13, delimiter=b'\\n') # doctest: +SKIP - b'Alice, 100\\nBob, 200\\n' - - >>> read_block(f, 10, 10, delimiter=b'\\n') # doctest: +SKIP - b'Bob, 200\\nCharlie, 300' - """ - if offset != f.tell(): # commonly both zero - f.seek(offset) - - if not offset and length is None and f.tell() == 0: - return f.read() - - if delimiter: - seek_delimiter(f, delimiter, 2**16) - start = f.tell() - length -= start - offset - - f.seek(start + length) - seek_delimiter(f, delimiter, 2**16) - end = f.tell() - eof = not f.read(1) - - offset = start - length = end - start - - f.seek(offset) - - return f.read(length) - - def ensure_bytes(s): """ Turn string or bytes to bytes diff --git a/docs/source/dataframe-api.rst b/docs/source/dataframe-api.rst index c16ad8205c5..d77635085dd 100644 --- a/docs/source/dataframe-api.rst +++ b/docs/source/dataframe-api.rst @@ -108,7 +108,7 @@ Rolling Operations Create DataFrames ~~~~~~~~~~~~~~~~~ -.. currentmodule:: dask.dataframe.io +.. currentmodule:: dask.dataframe .. autosummary:: from_array @@ -147,7 +147,7 @@ Other functions .. autofunction:: concat .. autofunction:: merge -.. currentmodule:: dask.dataframe.io +.. currentmodule:: dask.dataframe .. autofunction:: read_csv .. autofunction:: from_array
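
The patch renames the local backend's registry key from ``'local'`` to ``'file'`` and documents the three registration dictionaries in ``dask/bytes/core.py``. Below is a minimal sketch of how a third-party backend could plug into that registry; the ``myproto`` key and the ``my_read_bytes`` stub are hypothetical and only illustrate the dispatch pattern, they are not part of this patch.

```python
# Illustrative only: a hypothetical backend registering itself the same way
# dask/bytes/local.py now registers under 'file' (previously 'local').
from dask.bytes import core


def my_read_bytes(path, delimiter=None, not_zero=False, blocksize=2**27,
                  sample=True, compression=None):
    """Stub with the signature expected by dask.bytes.core.read_bytes."""
    raise NotImplementedError("backend-specific logic would go here")


core._read_bytes['myproto'] = my_read_bytes

# dask.bytes.core.read_bytes('myproto://bucket/key') would now dispatch to
# my_read_bytes; unregistered protocols still raise NotImplementedError.
```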
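
``seek_delimiter`` and ``read_block`` move from ``dask.utils`` to ``dask.bytes.utils``, and their tests move to ``test_bytes_utils.py`` accordingly. A short usage sketch, mirroring the docstring examples and tests above:

```python
import io

from dask.bytes.utils import read_block, seek_delimiter

f = io.BytesIO(b'Alice, 100\nBob, 200\nCharlie, 300')

# Read roughly 13 bytes, but start and stop on newline boundaries.
read_block(f, 0, 13, delimiter=b'\n')   # b'Alice, 100\nBob, 200\n'

# Advance to just past the next delimiter, scanning 5 bytes at a time.
f.seek(3)
seek_delimiter(f, b'\n', blocksize=5)
f.tell()                                # 11
```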
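
Compression validation now happens once in ``dask.bytes.core`` rather than in each backend, so an unsupported codec fails fast with ``ValueError`` before any files are opened (see ``test_bad_compression`` above). A sketch of the new failure mode; the glob is the one used in the tests, but any path would behave the same because the check runs before any file access:

```python
from dask.bytes.core import read_bytes

try:
    # The path is never touched: the compression check runs first.
    read_bytes('.test.accounts.*', compression='not-found')
except ValueError as err:
    print(err)   # Compression type not-found not supported
```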
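
Finally, the ``dask.bag.read_text`` docstring now advertises that ``path`` may be a list of filenames as well as a globstring. A sketch of that call, with hypothetical filenames:

```python
import dask.bag as db

# Hypothetical filenames; a globstring such as 'logs-2016-01-*.txt' works too.
b = db.read_text(['logs-2016-01-01.txt', 'logs-2016-01-02.txt'])
```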