/
hl.py
449 lines (358 loc) · 16.9 KB
/
hl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
# This file is part of the Open Data Cube, see https://opendatacube.org for more information
#
# Copyright (c) 2015-2023 ODC Contributors
# SPDX-License-Identifier: Apache-2.0
"""
High level indexing operations/utilities
"""
import logging
import json
import toolz
from uuid import UUID
from typing import cast, Any, Callable, Optional, Iterable, List, Mapping, Sequence, Tuple, Union, MutableMapping
from datacube.model import Dataset, LineageTree, Product
from datacube.index.abstract import AbstractIndex
from datacube.utils import changes, InvalidDocException, SimpleDocNav, jsonify_document
from datacube.model.utils import BadMatch, dedup_lineage, remap_lineage_doc, flatten_datasets
from datacube.utils.changes import get_doc_changes
from datacube.model import LineageDirection
from .eo3 import prep_eo3, is_doc_eo3, is_doc_geo # type: ignore[attr-defined]
_LOG = logging.getLogger(__name__)
class ProductRule:
    """Pairs a Product with the metadata "signature" document used to decide
    whether a dataset document belongs to that product.

    The signature is simply the product's metadata document; matching is done
    by containment (see ``product_matcher`` below).
    """
    def __init__(self, product: Product, signature: Mapping[str, Any]):
        # Product this rule matches datasets to.
        self.product = product
        # Metadata sub-document a dataset doc must contain to match.
        self.signature = signature
def load_rules_from_types(index: AbstractIndex,
                          product_names: Optional[Iterable[str]] = None,
                          excluding: Optional[Iterable[str]] = None
                          ) -> Union[Tuple[List[ProductRule], None], Tuple[None, str]]:
    """Build product-matching rules from products registered in the index.

    :param index: open index to read product definitions from
    :param product_names: restrict to these product names; all products if falsy
    :param excluding: product names to drop from the result
    :return: (rules, None) on success, or (None, error_message) on failure
    """
    products: List[Product] = []
    if product_names:
        # Explicit list: every named product must exist in the database.
        for name in product_names:
            product = index.products.get_by_name(name)
            if not product:
                return None, 'Supplied product name "%s" not present in the database' % name
            products.append(product)
    else:
        products.extend(index.products.get_all())
    if excluding is not None:
        excluded = set(excluding)
        products = [p for p in products if p.name not in excluded]
    if not products:
        return None, 'Found no matching products in the database'
    return [ProductRule(p, p.metadata_doc) for p in products], None
ProductMatcher = Callable[[Mapping[str, Any]], Product]
def product_matcher(rules: Sequence[ProductRule]) -> ProductMatcher:
    """Given product matching rules return a function mapping a document to a
    matching product.

    :param rules: non-empty sequence of ProductRule (product + signature)
    :return: callable that, given a dataset metadata document, returns the
        single matching Product, or raises BadMatch when the document matches
        no rule or more than one rule.
    """
    assert len(rules) > 0

    def matches(doc: Mapping[str, Any], rule: ProductRule) -> bool:
        # A doc matches a rule when it contains the rule's signature sub-document.
        return changes.contains(doc, rule.signature)

    def single_product_matcher(rule):
        # Fast path for a single rule: no need to collect all matches first.
        # FIX: annotation was `-> bool`, but this returns a Product (or raises).
        def match(doc: Mapping[str, Any]) -> Product:
            if matches(doc, rule):
                return rule.product
            raise BadMatch('Dataset metadata did not match product signature.'
                           '\nDataset definition:\n %s\n'
                           '\nProduct signature:\n %s\n'
                           % (json.dumps(doc, indent=4),
                              json.dumps(rule.signature, indent=4)))
        return match

    if len(rules) == 1:
        return single_product_matcher(rules[0])

    def match(doc: Mapping[str, Any]) -> Product:
        matched = [rule.product for rule in rules if changes.contains(doc, rule.signature)]
        if len(matched) == 1:
            return matched[0]
        doc_id = doc.get('id', '<missing id>')
        if len(matched) == 0:
            raise BadMatch('No matching Product found for dataset %s' % doc_id)
        else:
            raise BadMatch('Auto match failed, dataset %s matches several products:\n %s' % (
                doc_id,
                ','.join(p.name for p in matched)))
    return match
def check_dataset_consistent(dataset: Dataset) -> Tuple[bool, Optional[str]]:
    """
    Check that a dataset provides every measurement its product declares.

    :param dataset: dataset to validate against its product definition
    :return: (is_consistent, error_message_or_None)
    """
    expected = set(dataset.product.measurements.keys())
    if not expected:
        # Product declares no measurements — nothing to validate.
        return True, None
    if dataset.measurements is None:
        return False, "No measurements defined for a dataset"
    actual = set(dataset.measurements.keys())
    if expected.issubset(actual):
        return True, None
    # Ignore 3D measurements ("extra_dim") since they are just mappings
    # onto 2D measurements.
    missing = {
        name
        for name in expected - actual
        if "extra_dim" not in dataset.product.measurements.get(name, [])
    }
    if missing:
        msg = "The dataset is not specifying all of the measurements in this product.\n"
        msg += "Missing fields are;\n" + str(missing)
        return False, msg
    return True, None
def check_consistent(a: Mapping[str, Any], b: Mapping[str, Any]) -> Tuple[bool, Optional[str]]:
    """Compare two metadata documents.

    :return: (True, None) when identical, otherwise (False, summary) where
        summary is a comma-separated list of "path: old!=new" differences.
    """
    diffs = get_doc_changes(a, b)
    if not diffs:
        return True, None

    def render_diff(offset, old, new):
        # offset is a tuple path into the document, e.g. ('lineage', 0, 'id').
        return '{}: {!r}!={!r}'.format('.'.join(map(str, offset)), old, new)

    return False, ", ".join(render_diff(offset, old, new) for offset, old, new in diffs)
# Common return type of all resolvers: either (dataset, None) on success,
# or (None, error) on failure, where error is a message string or exception.
DatasetOrError = Union[
    Tuple[Dataset, None],
    Tuple[None, Union[str, Exception]]
]
def check_intended_eo3(ds: SimpleDocNav, product: Product) -> None:
    """Warn when a dataset looks like it was meant to be EO3 but will not be
    recognised as such (its product has an eo3 metadata type, yet the dataset
    document lacks the $schema marker)."""
    if is_doc_eo3(ds.doc):
        return
    if "eo3" not in product.metadata_type.name:
        return
    _LOG.warning(f"Dataset {ds.id} has a product with an eo3 metadata type, "
                 "but the dataset definition does not include the $schema field "
                 "and so will not be recognised as an eo3 dataset.")
def resolve_no_lineage(ds: SimpleDocNav,
                       uri: str,
                       matcher: ProductMatcher,
                       source_tree: Optional[LineageTree] = None) -> DatasetOrError:
    """Dataset resolver that discards any lineage in the supplied document.

    :param ds: dataset document
    :param uri: location uri of the dataset
    :param matcher: Product matcher
    :param source_tree: must be None/falsy — lineage is not supported here
    :return: (Dataset, None) on success, or (None, error) on failure
    """
    if source_tree:
        raise ValueError("source_tree passed to non-lineage resolver")
    metadata = ds.doc_without_lineage_sources
    try:
        matched_product = matcher(metadata)
    except BadMatch as err:
        return None, err
    check_intended_eo3(ds, matched_product)
    return Dataset(matched_product, metadata, uris=[uri], sources={}), None
def resolve_with_lineage(doc: SimpleDocNav, uri: str, matcher: ProductMatcher,
                         source_tree: Optional[LineageTree] = None,
                         home_index: Optional[str] = None) -> DatasetOrError:
    """
    Dataset resolver for the (new) external lineage API.

    API parameters:

    :param doc: Dataset docnav
    :param uri: location uri

    Extra kwargs passed in by Doc2Dataset:

    :param matcher: Product matcher
    :param source_tree: sourcewards LineageTree to use in place of EO3 sources (optional)
    :param home_index: Home for sources (ignored if source_tree is not None)
    :return: (Dataset, None) on success, or (None, error) on failure
    """
    uuid_ = doc.id
    if not uuid_:
        return None, "No id defined in dataset doc"
    try:
        product = matcher(doc.doc)
    except BadMatch as e:
        return None, e
    if source_tree is None:
        # Get sources from EO3 document, use home_index as home of source id's
        source_tree = LineageTree.from_data(uuid_, sources=doc.sources, home=home_index)
    else:
        # An explicit source tree must point "sourcewards", not "derived-wards".
        if source_tree.direction == LineageDirection.DERIVED:
            raise ValueError("source_tree cannot be a derived tree.")
        # Narrow to the subtree rooted at this dataset.  May be None.
        source_tree = source_tree.find_subtree(uuid_)
    check_intended_eo3(doc, product)
    return Dataset(product,
                   doc.doc,
                   source_tree=source_tree,
                   uris=[uri]), None
def resolve_legacy_lineage(main_ds_doc: SimpleDocNav, uri: str, matcher: ProductMatcher,
                           index: AbstractIndex,
                           fail_on_missing_lineage: bool,
                           verify_lineage: bool,
                           source_tree: Optional[LineageTree] = None,
                           ) -> DatasetOrError:
    """
    Dataset resolver for the legacy (embedded-document) lineage API.

    Resolves the main dataset and every source dataset embedded in its
    lineage tree, preferring product/metadata from datasets already in the DB.

    :param main_ds_doc: dataset document with embedded lineage sources
    :param uri: location uri of the main dataset
    :param matcher: Product matcher
    :param index: index used to look up already-indexed lineage datasets
    :param fail_on_missing_lineage: fail if any lineage dataset is absent from the DB
    :param verify_lineage: check embedded lineage docs are consistent with DB versions
    :param source_tree: must be None/falsy — external source trees are not supported here
    :return: (Dataset, None) on success, or (None, error) on failure
    """
    if source_tree:
        raise ValueError("source_tree passed to non-external lineage resolver")
    try:
        # De-duplicate repeated source documents across the lineage tree.
        main_ds = SimpleDocNav(dedup_lineage(main_ds_doc))
    except InvalidDocException as e:
        return None, e
    main_uuid = main_ds.id
    if not main_uuid:
        return None, "No id defined in dataset doc"
    # One representative SimpleDocNav per uuid for the whole lineage tree.
    ds_by_uuid = toolz.valmap(toolz.first, flatten_datasets(main_ds))
    all_uuid = list(ds_by_uuid)
    # Datasets (keyed by uuid) that are already present in the database.
    db_dss = {ds.id: ds for ds in index.datasets.bulk_get(all_uuid)}
    lineage_uuids = set(filter(lambda x: x != main_uuid, all_uuid))
    missing_lineage = lineage_uuids - set(db_dss)
    if missing_lineage and fail_on_missing_lineage:
        return None, "Following lineage datasets are missing from DB: %s" % (
            ','.join(str(m) for m in missing_lineage))
    # Reject documents the current index driver cannot store.
    if not is_doc_eo3(main_ds.doc):
        if is_doc_geo(main_ds.doc, check_eo3=False):
            if not index.supports_legacy:
                return None, "Legacy metadata formats not supported by the current index driver."
        else:
            if not index.supports_nongeo:
                return None, "Non-geospatial metadata formats not supported by the current index driver."
    if verify_lineage:
        # Compare each embedded lineage document against its indexed version.
        bad_lineage = []
        for uuid in lineage_uuids:
            if uuid in db_dss:
                ok, err = check_consistent(jsonify_document(ds_by_uuid[uuid].doc_without_lineage_sources),
                                           db_dss[uuid].metadata_doc)
                if not ok:
                    bad_lineage.append((uuid, err))
        if len(bad_lineage) > 0:
            error_report = '\n'.join('Inconsistent lineage dataset {}:\n> {}'.format(uuid, err)
                                     for uuid, err in bad_lineage)
            return None, error_report

    def with_cache(v: Dataset, k: UUID, cache: MutableMapping[UUID, Dataset]) -> Dataset:
        # Memoise the resolved Dataset so shared sources are resolved once.
        cache[k] = v
        return v

    def resolve_ds(ds: SimpleDocNav,
                   sources: Optional[Mapping[UUID, Dataset]],
                   cache: MutableMapping[UUID, Dataset]) -> Dataset:
        # Resolve one node of the lineage tree; `sources` are already resolved.
        cached = cache.get(ds.id)
        if cached is not None:
            return cached
        # Only the main (top-level) dataset gets the supplied location uri.
        uris = [uri] if ds.id == main_uuid else []
        doc = ds.doc
        db_ds = db_dss.get(ds.id)
        if db_ds:
            # Prefer the product of the already-indexed version of this dataset.
            product = db_ds.product
        else:
            product = matcher(doc)
        check_intended_eo3(ds, product)
        return with_cache(Dataset(product, doc, uris=uris, sources=sources), ds.id, cache)
    try:
        return remap_lineage_doc(main_ds, resolve_ds, cache={}), None
    except BadMatch as e:
        return None, e
def dataset_resolver(index: AbstractIndex,
                     match_product: Callable[[Mapping[str, Any]], Product],
                     fail_on_missing_lineage: bool = False,
                     verify_lineage: bool = True,
                     skip_lineage: bool = False,
                     home_index: Optional[str] = None) -> Callable[
                         [SimpleDocNav, str, Optional[LineageTree]],
                         DatasetOrError
                     ]:
    """Select and configure the dataset resolver appropriate for this index.

    :param index: open index, used to decide which lineage API is available
    :param match_product: document -> Product matcher
    :param fail_on_missing_lineage: legacy API only — fail on absent lineage
    :param verify_lineage: legacy API only — verify embedded lineage docs
    :param skip_lineage: ignore lineage entirely
    :param home_index: external lineage API only — default home for sources
    :return: callable (doc, uri, source_tree=None) -> DatasetOrError
    """
    extra_kwargs: MutableMapping[str, Any] = {"matcher": match_product}
    if skip_lineage or not index.supports_lineage:
        # Resolver that ignores lineage entirely.
        resolver = resolve_no_lineage
    elif index.supports_external_lineage:
        # ODCv2 external lineage API resolver.
        resolver = resolve_with_lineage
        extra_kwargs["home_index"] = home_index
    else:
        # Legacy (embedded-document) lineage API resolver.
        resolver = resolve_legacy_lineage
        extra_kwargs["index"] = index
        extra_kwargs["fail_on_missing_lineage"] = fail_on_missing_lineage
        extra_kwargs["verify_lineage"] = verify_lineage

    def resolve(doc: SimpleDocNav, uri: str, source_tree: Optional[LineageTree] = None) -> DatasetOrError:
        return resolver(doc, uri, source_tree=source_tree, **extra_kwargs)
    return resolve
class Doc2Dataset:
    """Used for constructing `Dataset` objects from plain metadata documents.

    This requires a database connection to perform the automatic matching against
    available products.

    There are options for including and excluding the products to match against,
    as well as how to deal with source lineage.

    Once constructed, call with a dictionary object and location URI, eg::

        resolver = Doc2Dataset(index)
        dataset = resolver(dataset_dictionary, 'file:///tmp/test-dataset.json')
        index.dataset.add(dataset)

    :param index: an open Database connection
    :param list products: List of product names against which to match datasets
                          (including lineage datasets). If not supplied we will
                          consider all products.
    :param list exclude_products: List of products to exclude from matching
    :param fail_on_missing_lineage: If True fail resolve if any lineage
                                    datasets are missing from the DB.
                                    Only False is supported if index.supports_external_lineage is True.
    :param verify_lineage: If True check that lineage datasets in the
                           supplied document are identical to DB versions.
                           Ignored for EO3 documents. Will be dropped in ODCv2 as only eo3 documents
                           will be supported.
    :param skip_lineage: If True ignore lineage sub-tree in the supplied
                         document and construct dataset without lineage datasets.
    :param eo3: 'auto'/True/False. By default auto-detect EO3 datasets and pre-process them.
                Cannot be False if index.supports_legacy is False.
                Will be dropped in ODCv2 as only eo3 documents will be supported.
    :param home_index: Ignored if index.supports_external_lineage is False. Defaults to None.
                       Optional string labelling the "home index" for lineage datasets.
                       home_index is ignored if an explicit source_tree is passed to the resolver.
    """
    def __init__(self,
                 index: AbstractIndex,
                 products: Optional[Sequence[str]] = None,
                 exclude_products: Optional[Sequence[str]] = None,
                 fail_on_missing_lineage: bool = False,
                 verify_lineage: bool = True,
                 skip_lineage: bool = False,
                 eo3: Union[bool, str] = 'auto',
                 home_index: Optional[str] = None):
        if not index.supports_lineage:
            # Lineage-incapable index: force all lineage handling off.
            skip_lineage = True
            verify_lineage = False
            fail_on_missing_lineage = False
            home_index = None
        else:
            if not index.supports_legacy and not index.supports_nongeo:
                # Index only understands EO3 documents.
                if not eo3:
                    raise ValueError("EO3 cannot be set to False for a non-legacy geo-only index.")
                eo3 = True
            if index.supports_external_lineage and fail_on_missing_lineage:
                raise ValueError("fail_on_missing_lineage is not supported for this index driver.")
            if home_index and skip_lineage:
                raise ValueError("Cannot provide a default home_index when skip_lineage is set.")
        # Build product-matching rules up front; fail fast on bad product names.
        rules, err_msg = load_rules_from_types(index,
                                               product_names=products,
                                               excluding=exclude_products)
        if rules is None:
            raise ValueError(err_msg)
        self.index = index
        self._eo3 = eo3
        matcher = product_matcher(rules)
        # Resolver is chosen once, based on the index's lineage capabilities.
        self._ds_resolve = dataset_resolver(index,
                                            matcher,
                                            fail_on_missing_lineage=fail_on_missing_lineage,
                                            verify_lineage=verify_lineage,
                                            skip_lineage=skip_lineage,
                                            home_index=home_index)

    def __call__(self, doc_in: Union[SimpleDocNav, Mapping[str, Any]], uri: str,
                 source_tree: Optional[LineageTree] = None) -> DatasetOrError:
        """Attempt to construct dataset from metadata document and a uri.

        :param doc_in: Dictionary or SimpleDocNav object
        :param uri: String "location" property of the Dataset
        :param source_tree: optional explicit lineage tree (external lineage API only)

        :return: (dataset, None) if successful,
        :return: (None, ErrorMessage) on failure
        """
        if isinstance(doc_in, SimpleDocNav):
            doc: SimpleDocNav = doc_in
        else:
            doc = SimpleDocNav(doc_in)
        if self._eo3:
            auto_skip = self._eo3 == 'auto'
            # Pre-process EO3 documents (skipped for non-EO3 docs when auto_skip).
            doc = SimpleDocNav(
                prep_eo3(
                    doc.doc,
                    auto_skip=auto_skip,
                    remap_lineage=not self.index.supports_external_lineage
                )
            )
        dataset, err = self._ds_resolve(doc, uri, source_tree=source_tree)
        if dataset is None:
            return None, cast(Union[str, Exception], err)
        # Resolved dataset must still declare all product measurements.
        is_consistent, reason = check_dataset_consistent(dataset)
        if not is_consistent:
            return None, cast(Union[str, Exception], reason)
        return dataset, None