-
Notifications
You must be signed in to change notification settings - Fork 2
/
core.py
164 lines (135 loc) · 5.53 KB
/
core.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import io
import sys
from toolz import merge
from .compression import seekable_files, files as compress_files
from .utils import SeekableFile
from ..delayed import delayed
from ..utils import system_encoding
# Rebind ``delayed`` so every value produced in this module is pure
# (deterministic task keys for identical inputs).
delayed = delayed(pure=True)
# Protocol -> implementation registries.  Backends ('file', 's3', 'hdfs', ...)
# register their callables into these dicts from other modules.
_read_bytes = dict()
_open_files = dict()
_open_text_files = dict()
def read_bytes(path, delimiter=None, not_zero=False, blocksize=2**27,
               sample=True, compression=None, **kwargs):
    """ Convert path to a list of delayed values

    The path may be a filename like ``'2015-01-01.csv'`` or a globstring
    like ``'2015-*-*.csv'``.

    The path may be preceded by a protocol, like ``s3://`` or ``hdfs://`` if
    those libraries are installed.

    This cleanly breaks data by a delimiter if given, so that block boundaries
    start directly after a delimiter and end on the delimiter.

    Parameters
    ----------
    path: string
    delimiter: bytes
        An optional delimiter, like ``b'\n'`` on which to split blocks of
        bytes
    not_zero: force seek of start-of-file delimiter, discarding header
    blocksize: int (=128MB)
        Chunk size
    compression: string or None
        String like 'gzip' or 'xz'.  Must support efficient random access.
    sample: bool, int
        Whether or not to return a sample from the first 10k bytes
    **kwargs: dict
        Options to send down to backend.  Includes authentication information
        for systems like S3 or HDFS

    Examples
    --------
    >>> sample, blocks = read_bytes('2015-*-*.csv', delimiter=b'\\n')  # doctest: +SKIP
    >>> sample, blocks = read_bytes('s3://2015-*-*.csv', delimiter=b'\\n')  # doctest: +SKIP

    Returns
    -------
    10kB sample header and list of ``dask.Delayed`` objects or list of lists
    of delayed objects if ``fn`` is a globstring.

    Raises
    ------
    ValueError
        If ``compression`` is given but not a supported type.
    NotImplementedError
        If the path's protocol has no registered backend.
    """
    if compression is not None and compression not in compress_files:
        raise ValueError("Compression type %s not supported" % compression)
    if '://' in path:
        protocol, path = path.split('://', 1)
        try:
            # Use a distinct local name: the original rebinding of
            # ``read_bytes`` shadowed this function inside its own body.
            reader = _read_bytes[protocol]
        except KeyError:
            raise NotImplementedError("Unknown protocol %s://%s" %
                                      (protocol, path))
    else:
        # No protocol prefix: treat the path as a local file.
        reader = _read_bytes['file']
    return reader(path, delimiter=delimiter, not_zero=not_zero,
                  blocksize=blocksize, sample=sample, compression=compression,
                  **kwargs)
def open_files(path, compression=None, **kwargs):
    """ Given path return dask.delayed file-like objects

    Parameters
    ----------
    path: string
        Filename or globstring
    compression: string
        Compression to use.  See ``dask.bytes.compression.files`` for options.
    **kwargs: dict
        Options to pass to storage backend.  Often used to pass authentication
        information to S3 or HDFS.

    Examples
    --------
    >>> files = open_files('2015-*-*.csv')  # doctest: +SKIP
    >>> files = open_files('s3://2015-*-*.csv.gz', compression='gzip')  # doctest: +SKIP

    Returns
    -------
    List of ``dask.delayed`` objects that compute to file-like objects

    Raises
    ------
    ValueError
        If ``compression`` is given but not a supported type.
    NotImplementedError
        If the path's protocol has no registered backend.
    """
    if compression is not None and compression not in compress_files:
        raise ValueError("Compression type %s not supported" % compression)
    if '://' in path:
        protocol, path = path.split('://', 1)
    else:
        protocol = 'file'
    try:
        # Keep only the registry lookup inside the ``try``: previously the
        # backend call was also wrapped, so a KeyError raised *inside* the
        # backend was misreported as an unknown protocol.
        opener = _open_files[protocol]
    except KeyError:
        raise NotImplementedError("Unknown protocol %s://%s" %
                                  (protocol, path))
    files = opener(path, **kwargs)
    if compression:
        decompress = merge(seekable_files, compress_files)[compression]
        if sys.version_info[0] < 3:
            # Python 2 file objects lack the io seekable interface expected
            # by the decompressors; wrap them first.
            files = [delayed(SeekableFile)(file) for file in files]
        files = [delayed(decompress)(file) for file in files]
    return files
def open_text_files(path, encoding=system_encoding, errors='strict',
                    compression=None, **kwargs):
    """ Given path return dask.delayed file-like objects in text mode

    Parameters
    ----------
    path: string
        Filename or globstring
    encoding: string
    errors: string
    compression: string
        Compression to use.  See ``dask.bytes.compression.files`` for options.
    **kwargs: dict
        Options to pass to storage backend.  Often used to pass authentication
        information to S3 or HDFS.

    Examples
    --------
    >>> files = open_text_files('2015-*-*.csv', encoding='utf-8')  # doctest: +SKIP
    >>> files = open_text_files('s3://2015-*-*.csv')  # doctest: +SKIP

    Returns
    -------
    List of ``dask.delayed`` objects that compute to text file-like objects
    """
    if compression is not None and compression not in compress_files:
        raise ValueError("Compression type %s not supported" % compression)

    # Remember the full path (including any protocol prefix) for the
    # binary fallback, which re-parses it itself.
    full_path = path
    protocol = 'file'
    if '://' in path:
        protocol, path = path.split('://', 1)

    # Fast path: the backend opens text natively and no decompression is
    # required.
    if compression is None and protocol in _open_text_files:
        return _open_text_files[protocol](path, encoding=encoding,
                                          errors=errors, **kwargs)

    if protocol not in _open_files:
        raise NotImplementedError("Unknown protocol %s://%s" %
                                  (protocol, path))

    # Fallback: open in binary mode, then layer text decoding on top.
    byte_files = open_files(full_path, compression=compression, **kwargs)
    if sys.version_info[0] < 3:
        # Python 2 file objects need wrapping before io.TextIOWrapper
        # will accept them.
        byte_files = [delayed(SeekableFile)(f) for f in byte_files]
    return [delayed(io.TextIOWrapper)(f, encoding=encoding, errors=errors)
            for f in byte_files]