forked from intake/intake
-
Notifications
You must be signed in to change notification settings - Fork 0
/
semistructured.py
104 lines (87 loc) · 3.42 KB
/
semistructured.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#-----------------------------------------------------------------------------
# Copyright (c) 2012 - 2018, Anaconda, Inc. and Intake contributors
# All rights reserved.
#
# The full license is in the LICENSE file, distributed with this software.
#-----------------------------------------------------------------------------
import datetime
from intake.container.base import RemoteSource, get_partition
from intake.source.base import Schema
class RemoteSequenceSource(RemoteSource):
    """Sequence-of-things source on an Intake server.

    Represents a generic python-sequence ("python" container) data source
    living on a remote Intake server. Partitions are fetched lazily over
    the wire via :func:`get_partition` and assembled into a dask bag.
    """
    name = 'remote_sequence'
    container = 'python'

    def __init__(self, url, headers, **kwargs):
        """
        Parameters
        ----------
        url : str
            Address of the remote Intake server.
        headers : dict
            HTTP headers to send with every request (e.g., authentication).
        kwargs :
            May include ``npartitions`` (number of data partitions, default
            1) and ``metadata`` (arbitrary dict attached to the source).
        """
        self.url = url
        # BUG FIX: the original read only the misspelled key 'npartition',
        # so a caller passing the conventional 'npartitions' was silently
        # ignored (note Schema below uses the correct spelling). Accept the
        # correct key first, but keep the legacy key for back-compat.
        self.npartitions = kwargs.get('npartitions',
                                      kwargs.get('npartition', 1))
        self.partition_access = self.npartitions > 1
        self.headers = headers
        self.metadata = kwargs.get('metadata', {})
        self._schema = Schema(npartitions=self.npartitions,
                              extra_metadata=self.metadata)
        # Lazily-built dask bag over the remote partitions; see _load_metadata
        self.bag = None
        super(RemoteSequenceSource, self).__init__(url, headers, **kwargs)

    def _load_metadata(self):
        """Build the dask bag of remote partitions (once) and return schema."""
        import dask.bag as db
        import dask
        if self.bag is None:
            # One delayed fetch per partition; nothing is transferred until
            # compute() is called on the bag or a part.
            self.parts = [dask.delayed(get_partition)(
                self.url, self.headers, self._source_id, self.container, i
            )
                for i in range(self.npartitions)]
            self.bag = db.from_delayed(self.parts)
        return self._schema

    def _get_partition(self, i):
        """Fetch and materialize partition ``i`` from the server."""
        self._load_metadata()
        return self.parts[i].compute()

    def read(self):
        """Materialize the whole remote sequence locally."""
        self._load_metadata()
        return self.bag.compute()

    def to_dask(self):
        """Return the (lazy) dask bag over all partitions."""
        self._load_metadata()
        return self.bag

    def _close(self):
        # Drop the bag so a subsequent use rebuilds the delayed graph
        self.bag = None

    @staticmethod
    def _persist(source, path, encoder=None, **kwargs):
        """Save the contents of ``source`` to files under ``path``.

        Parameters
        ----------
        source : DataSource
            The source whose data is to be written out.
        path : str
            Directory (local or remote URL) to write part files into.
        encoder : None or one of str|json|pickle
            How to serialize each element; None is equivalent to str.

        Raises
        ------
        KeyError
            If ``encoder`` is not one of the recognized names.
        """
        import pickle
        import json
        encoder = {None: str, 'str': str, 'json': json.dumps,
                   'pickle': pickle.dumps}[encoder]
        try:
            b = source.to_dask()
        except NotImplementedError:
            # Source has no dask representation; fall back to an in-memory
            # sequence, which _data_to_source wraps in a one-partition bag.
            b = source.read()
        return RemoteSequenceSource._data_to_source(b, path, encoder, **kwargs)

    @staticmethod
    def _data_to_source(b, path, encoder=None, storage_options=None, **kwargs):
        """Write bag/sequence ``b`` as text part-files; return a source for them.

        Parameters
        ----------
        b : dask.bag.Bag or sequence
            Data to write; a plain sequence is wrapped in a one-partition bag.
        path : str
            Directory to write ``part.*`` files into.
        encoder : str, None or callable
            A name (str/json/pickle/None) or a callable; any value not in
            the name table is assumed to already be a callable.
        storage_options : dict, optional
            Passed through to ``fsspec.open_files`` for the target filesystem.

        Returns
        -------
        TextFilesSource pointing at the written ``part.*`` files.
        """
        import dask.bag as db
        import posixpath
        from fsspec import open_files
        import dask
        import pickle
        import json
        from intake.source.textfiles import TextFilesSource
        encoder = {None: str, 'str': str, 'json': json.dumps,
                   'pickle': pickle.dumps}.get(encoder, encoder)
        if not hasattr(b, 'to_textfiles'):
            try:
                b = db.from_sequence(b, npartitions=1)
            except TypeError:
                raise NotImplementedError
        # NOTE(review): files are opened in text mode ('wt'), so the
        # 'pickle' encoder (which yields bytes) presumably fails here —
        # confirm intended behavior before changing the mode.
        files = open_files(posixpath.join(path, 'part.*'), mode='wt',
                           num=b.npartitions, **(storage_options or {}))
        dwrite = dask.delayed(write_file)
        # One delayed write per (partition, file) pair, executed together
        out = [dwrite(part, f, encoder)
               for part, f in zip(b.to_delayed(), files)]
        dask.compute(out)
        s = TextFilesSource(posixpath.join(path, 'part.*'), storage_options=storage_options)
        return s
def write_file(data, fo, encoder):
    """Serialize each item of ``data`` with ``encoder`` and write it to ``fo``.

    ``fo`` is a context manager (e.g. an ``fsspec`` OpenFile) yielding a
    writable file object; it is entered and closed here. No separators are
    added between items beyond what ``encoder`` itself produces.
    """
    with fo as stream:
        stream.writelines(encoder(item) for item in data)