Respond to comments on ingest
mrocklin committed Apr 28, 2016
1 parent 2a8a8cf commit 2950e4a
Showing 14 changed files with 248 additions and 191 deletions.
4 changes: 2 additions & 2 deletions dask/bag/text.py
@@ -19,8 +19,9 @@ def read_text(path, blocksize=None, compression='infer',
Parameters
----------
path: string
path: string or list
Path to data. Can include ``'*'`` or protocol like ``'s3://'``
Can also be a list of filenames
blocksize: None or int
Size to cut up larger files. Streams by default.
compression: string
@@ -77,7 +78,6 @@ def read_text(path, blocksize=None, compression='infer',
compression=compression, **kwargs)
blocks = [delayed(list)(file) for file in files]


else:
_, blocks = read_bytes(path, delimiter=linedelimiter.encode(),
blocksize=blocksize, sample=False, compression=compression,
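
As a quick illustration of the ``path`` forms the updated docstring now covers, here is a hedged usage sketch; the file names and bucket are hypothetical and must exist for the calls to run.

```python
import dask.bag as db

# Globstring, as before
bag1 = db.read_text('data/2015-*-*.json')

# Newly documented: an explicit list of filenames
bag2 = db.read_text(['data/a.json', 'data/b.json'])

# Protocol-prefixed path; dispatches to the matching backend (e.g. s3)
bag3 = db.read_text('s3://bucket/2015-*-*.json', blocksize=2**26)
```
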
4 changes: 2 additions & 2 deletions dask/bytes/__init__.py
@@ -1,7 +1,7 @@
from ..utils import ignoring
from .core import read_bytes, open_files, open_text_files

from . import local

with ignoring(ImportError, SyntaxError):
from . import s3
with ignoring(ImportError):
from . import local
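
The reordering above makes the local backend import unconditional while keeping the s3 backend optional behind ``ignoring``. A minimal sketch of that guard pattern; the body of ``ignoring`` here is an assumption modelled on ``dask.utils.ignoring`` (roughly ``contextlib.suppress``):

```python
from contextlib import contextmanager

@contextmanager
def ignoring(*exceptions):
    # Swallow the listed exception types; modelled on dask.utils.ignoring
    try:
        yield
    except exceptions:
        pass

# Optional backends register themselves on import; if a dependency such as
# s3fs is missing, that protocol simply stays unregistered.
with ignoring(ImportError):
    import s3fs  # noqa: F401  (stand-in for `from . import s3`)
```
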
2 changes: 0 additions & 2 deletions dask/bytes/compression.py
@@ -52,6 +52,4 @@ def noop_file(file, **kwargs):

if sys.version_info[0] >= 3:
import bz2
compress['bz2'] = bz2.compress
decompress['bz2'] = bz2.decompress
files['bz2'] = bz2.BZ2File
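
Two of the bz2 registrations are dropped in this hunk; the underlying pattern is a mapping from a compression name to a module-level helper. A small, self-contained sketch of the file-wrapper registry in use (illustrative names, not dask internals):

```python
import bz2
import io

# compression name -> callable that wraps an open binary file object
files = {'bz2': bz2.BZ2File}

raw = io.BytesIO(bz2.compress(b'{"amount": 100}\n'))
f = files['bz2'](raw)              # BZ2File accepts an existing file object
assert f.read() == b'{"amount": 100}\n'
```
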
32 changes: 21 additions & 11 deletions dask/bytes/core.py
@@ -1,15 +1,17 @@
import io
import sys

from toolz import merge

from .compression import seekable_files, files as cfiles
from .compression import seekable_files, files as compress_files
from .utils import SeekableFile
from ..compatibility import PY2
from ..delayed import delayed
from ..utils import system_encoding

delayed = delayed(pure=True)

# Global registration dictionaries for backend storage functions
# See docstrings to functions below for more information
_read_bytes = dict()
_open_files = dict()
_open_text_files = dict()
@@ -22,9 +24,8 @@ def read_bytes(path, delimiter=None, not_zero=False, blocksize=2**27,
The path may be a filename like ``'2015-01-01.csv'`` or a globstring
like ``'2015-*-*.csv'``.
The path may be preceded by a protocol, like ``s3://`` or ``hdfs://`` and,
if those libraries are installed, the futures will point to those locations
instead.
The path may be preceded by a protocol, like ``s3://`` or ``hdfs://`` if
those libraries are installed.
This cleanly breaks data by a delimiter if given, so that block boundaries
start directly after a delimiter and end on the delimiter.
@@ -55,6 +56,9 @@ def read_bytes(path, delimiter=None, not_zero=False, blocksize=2**27,
10kB sample header and list of ``dask.Delayed`` objects or list of lists of
delayed objects if ``fn`` is a globstring.
"""
if compression is not None and compression not in compress_files:
raise ValueError("Compression type %s not supported" % compression)

if '://' in path:
protocol, path = path.split('://', 1)
try:
@@ -63,7 +67,7 @@ def read_bytes(path, delimiter=None, not_zero=False, blocksize=2**27,
raise NotImplementedError("Unknown protocol %s://%s" %
(protocol, path))
else:
read_bytes = _read_bytes['local']
read_bytes = _read_bytes['file']

return read_bytes(path, delimiter=delimiter, not_zero=not_zero,
blocksize=blocksize, sample=sample, compression=compression,
@@ -92,19 +96,22 @@ def open_files(path, compression=None, **kwargs):
-------
List of ``dask.delayed`` objects that compute to file-like objects
"""
if compression is not None and compression not in compress_files:
raise ValueError("Compression type %s not supported" % compression)

if '://' in path:
protocol, path = path.split('://', 1)
else:
protocol = 'local'
protocol = 'file'

try:
files = _open_files[protocol](path, **kwargs)
except KeyError:
raise NotImplementedError("Unknown protocol %s://%s" %
(protocol, path))
if compression:
decompress = merge(seekable_files, cfiles)[compression]
if sys.version_info[0] < 3:
decompress = merge(seekable_files, compress_files)[compression]
if PY2:
files = [delayed(SeekableFile)(file) for file in files]
files = [delayed(decompress)(file) for file in files]

@@ -136,18 +143,21 @@ def open_text_files(path, encoding=system_encoding, errors='strict',
-------
List of ``dask.delayed`` objects that compute to text file-like objects
"""
if compression is not None and compression not in compress_files:
raise ValueError("Compression type %s not supported" % compression)

original_path = path
if '://' in path:
protocol, path = path.split('://', 1)
else:
protocol = 'local'
protocol = 'file'

if protocol in _open_text_files and compression is None:
return _open_text_files[protocol](path, encoding=encoding,
errors=errors, **kwargs)
elif protocol in _open_files:
files = open_files(original_path, compression=compression, **kwargs)
if sys.version_info[0] < 3:
if PY2:
files = [delayed(SeekableFile)(file) for file in files]
return [delayed(io.TextIOWrapper)(file, encoding=encoding,
errors=errors) for file in files]
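
Taken together, the core.py changes do three things: validate ``compression`` up front in all three entry points, rename the default protocol from ``'local'`` to ``'file'``, and use the ``PY2`` flag instead of checking ``sys.version_info`` inline. A simplified, standalone sketch of the dispatch pattern (not the dask implementation; the registry contents are placeholders):

```python
compress_files = {'gzip': None, 'bz2': None}   # names only, for validation
_read_bytes = {}                               # protocol -> backend function

def read_bytes(path, compression=None, **kwargs):
    if compression is not None and compression not in compress_files:
        raise ValueError("Compression type %s not supported" % compression)
    if '://' in path:
        protocol, path = path.split('://', 1)
    else:
        protocol = 'file'                      # renamed from 'local' in this commit
    try:
        backend = _read_bytes[protocol]
    except KeyError:
        raise NotImplementedError("Unknown protocol %s://%s" % (protocol, path))
    return backend(path, compression=compression, **kwargs)

# Backends register themselves at import time, e.g.:
_read_bytes['file'] = lambda path, **kwargs: ('<sample>', ['<delayed blocks>'])
```
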
19 changes: 9 additions & 10 deletions dask/bytes/local.py
@@ -6,20 +6,18 @@
import sys

from .compression import files as compress_files, seekable_files
from .utils import SeekableFile
from .utils import SeekableFile, read_block
from ..base import tokenize
from ..compatibility import FileNotFoundError
from ..delayed import delayed
from ..utils import read_block, system_encoding
from ..utils import system_encoding

logger = logging.getLogger(__name__)


def read_bytes(fn, delimiter=None, not_zero=False, blocksize=2**27,
sample=True, compression=None):
""" See dask.bytes.core.read_bytes for docstring """
if compression is not None and compression not in compress_files:
raise ValueError("Compression type %s not supported" % compression)
if '*' in fn:
filenames = list(map(os.path.abspath, sorted(glob(fn))))
sample, first = read_bytes(filenames[0], delimiter, not_zero,
@@ -51,7 +49,7 @@ def read_bytes(fn, delimiter=None, not_zero=False, blocksize=2**27,
for offset in offsets]

if sample:
if isinstance(sample, int) and not isinstance(sample, bool):
if sample is not True:
nbytes = sample
else:
nbytes = 10000
@@ -65,8 +63,9 @@ def read_block_from_file(fn, offset, length, delimiter, compression):
if compression:
f = SeekableFile(f)
f = compress_files[compression](f)
result = read_block(f, offset, length, delimiter)
if compression:
try:
result = read_block(f, offset, length, delimiter)
finally:
f.close()
return result

@@ -85,8 +84,8 @@ def open_files(path):


from . import core
core._read_bytes['local'] = read_bytes
core._open_files['local'] = open_files
core._read_bytes['file'] = read_bytes
core._open_files['file'] = open_files


if sys.version_info[0] >= 3:
@@ -104,7 +103,7 @@ def open_text_files(path, encoding=system_encoding, errors='strict'):
tokenize(fn, encoding, errors, os.path.getmtime(fn)))
for fn in filenames]

core._open_text_files['local'] = open_text_files
core._open_text_files['file'] = open_text_files


def getsize(fn, compression=None):
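
Alongside the ``'local'`` to ``'file'`` registration rename and moving ``read_block`` into ``dask.bytes.utils``, the main behavioural change here is wrapping the block read in ``try``/``finally`` so the decompression wrapper is always closed, even if ``read_block`` raises. A self-contained sketch of that pattern, using gzip and a plain seek-and-read in place of dask's ``read_block``:

```python
import gzip
import io

def read_block_sketch(f, offset, length):
    # Stand-in for dask.bytes.utils.read_block (delimiter handling omitted)
    f.seek(offset)
    return f.read(length)

def read_block_from_file_sketch(raw, offset, length, compression=None):
    f = gzip.GzipFile(fileobj=raw) if compression == 'gzip' else raw
    try:
        return read_block_sketch(f, offset, length)
    finally:
        if compression:
            f.close()               # close the wrapper even on error

data = gzip.compress(b'123\n456\n789')
assert read_block_from_file_sketch(io.BytesIO(data), 0, 3, compression='gzip') == b'123'
```
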
13 changes: 5 additions & 8 deletions dask/bytes/s3.py
@@ -5,10 +5,10 @@
from s3fs import S3FileSystem

from .compression import files as compress_files, seekable_files
from .utils import read_block

from ..base import tokenize
from ..delayed import delayed
from ..utils import read_block

logger = logging.getLogger(__name__)

@@ -39,9 +39,6 @@ def read_bytes(path, s3=None, delimiter=None, not_zero=False, blocksize=2**27,
10kB sample header and list of ``dask.Delayed`` objects or list of lists of
delayed objects if ``path`` is a globstring.
"""
if compression is not None and compression not in compress_files:
raise ValueError("Compression type %s not supported" % compression)

if s3 is None:
s3 = S3FileSystem(**s3_params)

@@ -70,8 +67,7 @@ def read_bytes(path, s3=None, delimiter=None, not_zero=False, blocksize=2**27,
logger.debug("Read %d blocks of binary bytes from %s", len(offsets), path)

s3safe_pars = s3_params.copy()
if not s3.anon:
s3safe_pars.update(s3.get_delegated_s3pars())
s3safe_pars.update(s3.get_delegated_s3pars())

values = [delayed(read_block_from_s3)(path, offset, blocksize,
s3safe_pars, delimiter, compression,
@@ -95,8 +91,9 @@ def read_block_from_s3(filename, offset, length, s3_params={}, delimiter=None,
with s3.open(filename, 'rb') as f:
if compression:
f = compress_files[compression](f)
result = read_block(f, offset, length, delimiter)
if compression:
try:
result = read_block(f, offset, length, delimiter)
finally:
f.close()
return result

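Two things change in the S3 backend: delegated credentials are now requested unconditionally rather than only for non-anonymous connections, and the same ``try``/``finally`` close pattern is applied around the block read. A trimmed sketch of the worker-side read, mirroring the calls already present in this hunk (the compression branch is omitted; ``read_block`` lives in ``dask.bytes.utils`` as of this commit):

```python
from s3fs import S3FileSystem
from dask.bytes.utils import read_block     # moved here in this commit

def read_block_from_s3_sketch(filename, offset, length, s3_params, delimiter=None):
    # Re-create the filesystem on the worker from picklable parameters,
    # then guarantee the file handle is closed even if the read fails.
    s3 = S3FileSystem(**s3_params)
    f = s3.open(filename, 'rb')
    try:
        return read_block(f, offset, length, delimiter)
    finally:
        f.close()
```
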
52 changes: 52 additions & 0 deletions dask/bytes/tests/test_bytes_utils.py
@@ -0,0 +1,52 @@
import io

from dask.bytes.utils import read_block, seek_delimiter


def test_read_block():
delimiter = b'\n'
data = delimiter.join([b'123', b'456', b'789'])
f = io.BytesIO(data)

assert read_block(f, 1, 2) == b'23'
assert read_block(f, 0, 1, delimiter=b'\n') == b'123\n'
assert read_block(f, 0, 2, delimiter=b'\n') == b'123\n'
assert read_block(f, 0, 3, delimiter=b'\n') == b'123\n'
assert read_block(f, 0, 5, delimiter=b'\n') == b'123\n456\n'
assert read_block(f, 0, 8, delimiter=b'\n') == b'123\n456\n789'
assert read_block(f, 0, 100, delimiter=b'\n') == b'123\n456\n789'
assert read_block(f, 1, 1, delimiter=b'\n') == b''
assert read_block(f, 1, 5, delimiter=b'\n') == b'456\n'
assert read_block(f, 1, 8, delimiter=b'\n') == b'456\n789'

for ols in [[(0, 3), (3, 3), (6, 3), (9, 2)],
[(0, 4), (4, 4), (8, 4)]]:
out = [read_block(f, o, l, b'\n') for o, l in ols]
assert b"".join(filter(None, out)) == data


def test_seek_delimiter_endline():
f = io.BytesIO(b'123\n456\n789')

# if at zero, stay at zero
seek_delimiter(f, b'\n', 5)
assert f.tell() == 0

# choose the first block
for bs in [1, 5, 100]:
f.seek(1)
seek_delimiter(f, b'\n', blocksize=bs)
assert f.tell() == 4

# handle long delimiters well, even with short blocksizes
f = io.BytesIO(b'123abc456abc789')
for bs in [1, 2, 3, 4, 5, 6, 10]:
f.seek(1)
seek_delimiter(f, b'abc', blocksize=bs)
assert f.tell() == 6

# End at the end
f = io.BytesIO(b'123\n456')
f.seek(5)
seek_delimiter(f, b'\n', 5)
assert f.tell() == 7
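
For readers following these tests, a minimal sketch of the behaviour they pin down for ``seek_delimiter``: advance the file to just past the next delimiter, handling delimiters that straddle block boundaries, but stay put at byte 0. This is an illustrative reimplementation, not dask's own code:

```python
def seek_delimiter_sketch(f, delimiter, blocksize):
    if f.tell() == 0:
        return                                 # "if at zero, stay at zero"
    last = b''
    while True:
        current = f.read(blocksize)
        if not current:                        # EOF: stop at end of file
            return
        full = last + current
        i = full.find(delimiter)
        if i >= 0:
            # Rewind to just after the delimiter and stop
            f.seek(f.tell() - (len(full) - i) + len(delimiter))
            return
        last = full[-len(delimiter):]          # carry a possible partial match
```
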
10 changes: 9 additions & 1 deletion dask/bytes/tests/test_local.py
@@ -14,7 +14,6 @@

compute = partial(compute, get=get)

# These get mirrored on s3://distributed-test/
files = {'.test.accounts.1.json': (b'{"amount": 100, "name": "Alice"}\n'
b'{"amount": 200, "name": "Bob"}\n'
b'{"amount": 300, "name": "Charlie"}\n'
@@ -136,6 +135,7 @@ def test_registered_open_text_files(encoding):
assert list(data) == [files[k].decode(encoding)
for k in sorted(files)]


def test_open_files():
with filetexts(files, mode='b'):
myfiles = open_files('.test.accounts.*')
@@ -170,6 +170,14 @@ def test_getsize(fmt):
assert getsize('.tmp.getsize', fmt) == 10


def test_bad_compression():
from dask.bytes.core import read_bytes, open_files, open_text_files
with filetexts(files, mode='b'):
for func in [read_bytes, open_files, open_text_files]:
with pytest.raises(ValueError):
sample, values = func('.test.accounts.*',
compression='not-found')

def test_not_found():
fn = 'not-a-file'
with pytest.raises(FileNotFoundError) as e:
