from typing import Tuple, Dict, List, Union, Iterator, Optional
from uuid import UUID
import toolz
from datacube.model import Dataset, GridSpec, DatasetType
from datacube.utils.geometry import CRS
from . import _jsoncache as base
from odc.io.text import split_and_check
ProductCollection = Union[Iterator[DatasetType],
List[DatasetType],
Dict[str, DatasetType]]
def ds2doc(ds):
return (ds.id, dict(uris=ds.uris,
product=ds.type.name,
metadata=ds.metadata_doc))
def doc2ds(doc, products):
if doc is None:
return None
p = products.get(doc['product'], None)
if p is None:
raise ValueError('No product named: %s' % doc['product'])
return Dataset(p, doc['metadata'], uris=doc['uris'])
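# Round-trip sketch (illustrative only: assumes `ds` is a `datacube.model.Dataset`
# and `products` is a `{name: DatasetType}` mapping, both hypothetical here):
#
#     _id, doc = ds2doc(ds)          # (uuid, {'uris': [...], 'product': ..., 'metadata': {...}})
#     ds_again = doc2ds(doc, products)
#
# `doc2ds` needs the products mapping to resolve the stored product name back
# to a real `DatasetType` object.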
def gs2doc(gs: GridSpec):
return dict(crs=str(gs.crs),
tile_size=list(gs.tile_size),
resolution=list(gs.resolution),
origin=list(gs.origin))
def doc2gs(doc):
return GridSpec(crs=CRS(doc['crs']),
tile_size=tuple(doc['tile_size']),
resolution=tuple(doc['resolution']),
origin=tuple(doc['origin']))
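# Grid-spec round-trip sketch (a minimal example; the EPSG code, tile size and
# resolution below are illustrative values only):
#
#     from datacube.model import GridSpec
#     from datacube.utils.geometry import CRS
#
#     gs = GridSpec(crs=CRS('EPSG:3577'),
#                   tile_size=(96000.0, 96000.0),
#                   resolution=(-30, 30),
#                   origin=(0.0, 0.0))
#     doc = gs2doc(gs)        # -> json-friendly dict: {'crs': 'EPSG:3577', ...}
#     gs_again = doc2gs(doc)  # reconstruct the GridSpec from that dict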
def build_dc_product_map(metadata_json, products_json):
from datacube.model import metadata_from_doc, DatasetType
mm = toolz.valmap(metadata_from_doc, metadata_json)
def mk_product(doc, name):
mt = doc.get('metadata_type')
if mt is None:
raise ValueError('Missing metadata_type key in product definition')
metadata = mm.get(mt)
if metadata is None:
raise ValueError('No such metadata %s for product %s' % (mt, name))
return DatasetType(metadata, doc)
return mm, {k: mk_product(doc, k) for k, doc in products_json.items()}
def _metadata_from_products(products):
mm = {}
for p in products.values():
m = p.metadata_type
if m.name not in mm:
mm[m.name] = m
return mm
def train_dictionary(dss, dict_sz=8*1024):
""" Given a finite sequence of Datasets train zstandard compression dictionary of a given size.
Accepts both `Dataset` as well as "raw" datasets.
Will return None if input sequence is empty.
"""
docs = map(ds2doc, dss)
return base.train_dictionary(docs, dict_sz=dict_sz)
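# Usage sketch (hypothetical: `dss` is some iterable of `Dataset` objects, for
# example obtained from `datacube.Datacube.find_datasets(...)`):
#
#     zdict = train_dictionary(dss, dict_sz=8 * 1024)
#     cache = create_cache('datasets.db', zdict=zdict)  # see create_cache below
#
# A pre-trained dictionary typically improves compression of the many small,
# similar json documents stored per dataset.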
def mk_group_name(idx: Tuple[int, int], name: str = "unnamed_grid") -> str:
return f"{name}/{idx[0]:+05d}/{idx[1]:+05d}"
def parse_group_name(group_name: str) -> Tuple[Tuple[int, int], str]:
""" Return an ((int, int), prefix:str) tuple from group name.
Expects group to be in the form {prefix}/{x}/{y}
raises ValueError if group_name is not in the expected format.
"""
try:
prefix, x, y = split_and_check(group_name, '/', 3)
x, y = map(int, (x, y))
except ValueError:
raise ValueError('Bad group name: ' + group_name)
return (x, y), prefix
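# Example: `mk_group_name` and `parse_group_name` are inverses of each other
# (the grid name "albers" is arbitrary):
#
#     mk_group_name((3, -4), "albers")        # -> 'albers/+0003/-0004'
#     parse_group_name('albers/+0003/-0004')  # -> ((3, -4), 'albers')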
class DatasetCache(object):
"""
info:
version: 4-bytes
zdict: pre-trained compression dictionary, optional
product/{name}: json
metadata/{name}: json
grid/{name}: json
groups:
Each group is named list of uuids
udata:
arbitrary user data (TODO)
ds:
uuid: compressed(json({product: str,
uris: [str],
metadata: object}))
"""
def __init__(self, db: base.JsonBlobCache,
products: Optional[ProductCollection] = None):
""" Don't use this directly, use create_cache or open_(rw|ro).
"""
if products is None:
metadata, products = build_dc_product_map(db.get_info_dict('metadata/'),
db.get_info_dict('product/'))
else:
if not isinstance(products, dict):
products = {p.name: p for p in products}
metadata = _metadata_from_products(products)
self._db = db
self._products = products
self._metadata = metadata
def close(self):
"""Write any outstanding product/metadata definitions (if in write mode) and
close database file.
"""
self._db.close()
@property
def readonly(self):
return self._db.readonly
@property
def count(self):
return self._db.count
def put_group(self, name, uuids):
""" Group is a named list of uuids
"""
self._db.put_group(name, uuids)
def get_group(self, name):
""" Group is a named list of uuids
"""
return self._db.get_group(name)
def groups(self, raw=False, prefix=None):
"""Get list of tuples (group_name, group_size).
        :raw bool: Names are returned as strings by default; supply raw=True to
        get bytes instead. This is needed if you are using group names that are
        not strings, such as integers or tuples of basic types.
        :prefix str|bytes: Only report groups whose name starts with prefix.
"""
return self._db.groups(raw=raw, prefix=prefix)
@property
def products(self):
return self._products
@property
def metadata(self):
return self._metadata
def _add_metadata(self, metadata, transaction):
self._metadata[metadata.name] = metadata
self._db.append_info_dict('metadata/', {metadata.name: metadata.definition}, transaction)
def _add_product(self, product, transaction):
if product.metadata_type.name not in self._metadata:
self._add_metadata(product.metadata_type, transaction)
self._products[product.name] = product
self._db.append_info_dict('product/', {product.name: product.definition}, transaction)
def _ds2doc(self, ds):
if ds.type.name not in self._products:
self._add_product(ds.type, self._db.current_transaction)
return ds2doc(ds)
def bulk_save(self, dss):
docs = (self._ds2doc(ds) for ds in dss)
return self._db.bulk_save(docs)
def tee(self, dss, max_transaction_size=10000):
"""Given a lazy stream of datasets persist them to disk and then pass through
for further processing.
:dss: stream of datasets
:max_transaction_size int: How often to commit results to disk
"""
return self._db.tee(dss, max_transaction_size=max_transaction_size, transform=self._ds2doc)
def get(self, uuid):
"""Extract single dataset with a given uuid, or return None if not found"""
return doc2ds(self._db.get(uuid), self._products)
def get_all(self):
for _, v in self._db.get_all():
yield doc2ds(v, self._products)
def stream_group(self, group_name):
for _, v in self._db.stream_group(group_name):
yield doc2ds(v, self._products)
@property
def grids(self):
"""Grids defined for this dataset cache"""
return {key: doc2gs(value)
for key, value in self._db.get_info_dict("grid/").items()}
def add_grid(self, gs: GridSpec, name: str):
"""Register a grid"""
self._db.append_info_dict("grid/", {name: gs2doc(gs)})
def add_grid_tile(self, grid: str, idx, dss):
"""Add list of dataset UUIDs to a tile"""
key = mk_group_name(idx, grid)
self._db.put_group(key, dss)
def add_grid_tiles(self, grid: str, tiles: Dict[Tuple[int, int], List[UUID]]):
"""Add multiple tiles to a grid"""
for idx, dss in tiles.items():
self.add_grid_tile(grid, idx, dss)
def tiles(self, grid: str) -> List[Tuple[Tuple[int, int], int]]:
"""Return tile indexes and dataset counts"""
def tile_index(group_name):
idx, prefix = parse_group_name(group_name)
assert prefix == grid
return idx
return [(tile_index(group_name), count)
for group_name, count in self.groups(prefix=grid + '/')]
def stream_grid_tile(self, idx: Tuple[int, int], grid: str):
"""Iterate over datasets in a given tile"""
return self.stream_group(mk_group_name(idx, grid))
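# Grid/tile workflow sketch (illustrative; `cache` is a writable DatasetCache,
# `gs` a GridSpec and `tiles` a {(x, y): [UUID, ...]} mapping produced by some
# external binning step -- all hypothetical names):
#
#     cache.add_grid(gs, 'albers')
#     cache.add_grid_tiles('albers', tiles)
#     for idx, count in cache.tiles('albers'):
#         for ds in cache.stream_grid_tile(idx, 'albers'):
#             ...  # process the datasets of one tile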
def open_ro(path: str,
products: Optional[ProductCollection] = None,
lock: bool = False) -> DatasetCache:
"""Open existing database in readonly mode.
.. note::
default mode assumes db file is static (not being modified
externally), if this is not the case, supply `lock=True` parameter.
:path str: Path to the db could be folder or actual file
:products: Override product dictionary with compatible products loaded from
the datacube database, this is generally only needed if you intend to add
datasets to the datacube index directly (i.e. without product matching
metadata documents).
:lock bool: Supply True if external process is changing DB concurrently.
"""
db = base.open_ro(path, lock=lock)
return DatasetCache(db, products=products)
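# Read-side sketch (assumes 'datasets.db' was produced earlier with this module):
#
#     cache = open_ro('datasets.db')
#     print(cache.count, 'datasets, grids:', list(cache.grids))
#     for ds in cache.get_all():
#         ...  # each `ds` is a reconstructed datacube Dataset
#     cache.close()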
def open_rw(path,
products=None,
max_db_sz=None,
complevel=6):
"""Open existing database in append mode.
:path str: Path to the db could be folder or actual file
:products: Override product dictionary with compatible products loaded from
the datacube database, this is generally only needed if you intend to add
datasets to the datacube index directly (i.e. without product matching
metadata documents).
:max_db_sz int: Maximum size in bytes database file is allowed to grow to, defaults to 10Gb
:complevel: Compression level (Zstandard) to use when storing datasets, 1
fastest, 6 good and still fast, 20+ best but slower.
"""
db = base.open_rw(path, max_db_sz=max_db_sz, complevel=complevel)
return DatasetCache(db, products)
def create_cache(path,
complevel=6,
zdict=None,
max_db_sz=None,
truncate=False):
"""Create new file database or open existing one.
:path str: Path where to create new database (this will be a directory with 2 files in it)
:complevel int: Level of compressions to apply per dataset, bigger is slower but better compression.
:zdict: Optional pre-trained compression dictionary
:max_db_sz int: Maximum size in bytes (defaults to 10GiB)
:truncate bool: If True wipe out any existing database and create new empty one.
"""
db = base.create_cache(path,
complevel=complevel,
zdict=zdict,
max_db_sz=max_db_sz,
truncate=truncate)
return DatasetCache(db)
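# Write-side sketch (hypothetical: `dc` is a `datacube.Datacube` connection and
# 'ls8_example_product' is a product name in that index):
#
#     cache = create_cache('datasets.db', truncate=True)
#     dss = dc.find_datasets(product='ls8_example_product')
#     cache.bulk_save(dss)
#     cache.close()
#
#     # later, fetch a single dataset back by uuid
#     cache = open_ro('datasets.db')
#     ds = cache.get(dss[0].id)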