s3.py
from __future__ import print_function, division, absolute_import

import logging

from s3fs import S3FileSystem

from .compression import files as compress_files, seekable_files
from ..base import tokenize
from ..delayed import delayed
from ..utils import read_block

logger = logging.getLogger(__name__)


def read_bytes(path, s3=None, delimiter=None, not_zero=False, blocksize=2**27,
               sample=True, compression=None, **s3_params):
    """ Convert a location in S3 into a list of delayed values

    Parameters
    ----------
    path: string
        Location in S3
    s3: S3FileSystem (optional)
        An open S3 connection; a fresh one is created from ``s3_params``
        if not given
    delimiter: bytes
        An optional delimiter, like ``b'\\n'``, on which to split blocks of
        bytes
    not_zero: bool
        Force a seek past the start-of-file delimiter, discarding the header
    blocksize: int (default ``2**27``, i.e. 128 MiB)
        Chunk size
    sample: bool or int
        Whether to return a sample from the first 10 kB of the file, or the
        number of bytes to sample
    compression: string or None
        String like ``'gzip'`` or ``'xz'``. Must support efficient random
        access.
    **s3_params: keyword arguments
        Extra keywords to send to the boto3 session (anon, key, secret, ...)

    Returns
    -------
    A sample header and a list of ``dask.Delayed`` objects, or a list of
    lists of delayed objects if ``path`` is a globstring.
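
    Examples
    --------
    A minimal usage sketch; the bucket and key pattern are placeholders:

    >>> sample, blocks = read_bytes('mybucket/2015-*.csv',
    ...                             delimiter=b'\\n')  # doctest: +SKIP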
"""
if s3 is None:
s3 = S3FileSystem(**s3_params)
if '*' in path:
filenames = sorted(s3.glob(path))
sample, first = read_bytes(filenames[0], s3, delimiter, not_zero,
blocksize, sample=True,
compression=compression, **s3_params)
rest = [read_bytes(f, s3, delimiter, not_zero, blocksize,
sample=False, compression=compression, **s3_params)[1]
for f in filenames[1:]]
return sample, [first] + rest
else:
if blocksize is None:
offsets = [0]
else:
size = getsize(path, compression, s3)
offsets = list(range(0, size, blocksize))
if not_zero:
offsets[0] = 1
info = s3.ls(path, detail=True)[0]
token = tokenize(info['ETag'], delimiter, blocksize, not_zero, compression)
logger.debug("Read %d blocks of binary bytes from %s", len(offsets), path)
s3safe_pars = s3_params.copy()
s3safe_pars.update(s3.get_delegated_s3pars())
values = [delayed(read_block_from_s3)(path, offset, blocksize,
s3safe_pars, delimiter, compression,
dask_key_name='read-block-s3-%s-%d' % (token, offset))
for offset in offsets]
if sample:
if isinstance(sample, int) and not isinstance(sample, bool):
nbytes = sample
else:
nbytes = 10000
sample = read_block_from_s3(path, 0, nbytes, s3safe_pars, delimiter,
compression)
return sample, values


def read_block_from_s3(filename, offset, length, s3_params=None,
                       delimiter=None, compression=None):
    """ Read one block of bytes from an S3 key, reconnecting from the given
    parameters so the function is safe to run on a worker. """
    s3 = S3FileSystem(**(s3_params or {}))
    with s3.open(filename, 'rb') as f:
        if compression:
            f = compress_files[compression](f)
        try:
            result = read_block(f, offset, length, delimiter)
        finally:
            f.close()
    return result
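
# A rough sketch of reading one block directly, outside the task graph; the
# bucket/key and the ``anon`` credential flag are placeholders, not values
# taken from this module:
#
#     block = read_block_from_s3('mybucket/data.csv', offset=0, length=1000,
#                                s3_params={'anon': True}, delimiter=b'\n')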


def s3_open_file(path, s3_params):
    """ Open a single S3 key for reading, creating a fresh connection. """
    s3 = S3FileSystem(**s3_params)
    return s3.open(path, mode='rb')


def open_files(path, s3=None, **s3_params):
    """ Open many files under a glob path. Return delayed objects.

    See Also
    --------
    dask.bytes.core.open_files: User function
    """
    s3 = s3 or S3FileSystem(**s3_params)
    filenames = sorted(s3.glob(path))
    myopen = delayed(s3_open_file)
    s3_params = s3_params.copy()
    s3_params.update(s3.get_delegated_s3pars())
    return [myopen(path, s3_params,
                   dask_key_name='s3-open-file-%s' % s3.info(path)['ETag'])
            for path in filenames]
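
# A minimal usage sketch; the glob pattern is a placeholder, and the files
# are only opened once the delayed values are computed:
#
#     from dask import compute
#     files = open_files('mybucket/logs/*.json', anon=True)
#     handles = compute(*files)  # tuple of open file-like objects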


def getsize(path, compression, s3):
    """ Number of uncompressed bytes stored under one S3 key. """
    if compression is None:
        return s3.info(path)['Size']
    else:
        with s3.open(path, 'rb') as f:
            g = seekable_files[compression](f)
            g.seek(0, 2)  # seek to the end to measure the decompressed length
            result = g.tell()
            g.close()
        return result


# Register the S3 implementations in the dispatch tables of
# ``dask.bytes.core`` so that ``s3://`` paths route to this module.
from . import core
core._read_bytes['s3'] = read_bytes
core._open_files['s3'] = open_files
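
# With the registrations above, the generic entry point picks this module up
# automatically. A rough sketch, assuming the user-facing ``read_bytes`` of
# the same dask version (bucket and pattern are placeholders):
#
#     from dask.bytes.core import read_bytes
#     sample, blocks = read_bytes('s3://mybucket/2015-*.csv', delimiter=b'\n')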