"""
API for easily writing an ODC Dataset
"""
import shutil
import tempfile
import uuid
import warnings
from copy import deepcopy
from enum import Enum, auto
from pathlib import Path, PosixPath, PurePath
from textwrap import dedent
from typing import Any, Dict, Generator, Iterable, List, Optional, Tuple, Union
from urllib.parse import urlsplit
import numpy
import rasterio
import xarray
from rasterio import DatasetReader
from rasterio.crs import CRS
from rasterio.enums import Resampling
from shapely.geometry.base import BaseGeometry
import eodatasets3
from eodatasets3 import documents, images, serialise, validate
from eodatasets3.documents import find_and_read_documents
from eodatasets3.images import FileWrite, GridSpec, MeasurementBundler, ValidDataMethod
from eodatasets3.model import AccessoryDoc, DatasetDoc, Location, ProductDoc
from eodatasets3.names import NamingConventions, dc_uris, namer, resolve_location
from eodatasets3.properties import Eo3Dict, Eo3Interface
from eodatasets3.validate import Level, ValidationMessage
from eodatasets3.verify import PackageChecksum


class IfExists(Enum):
"""
Enum: what to do when output already exists?
"""
#: Skip the dataset
Skip = auto()
#: Overwrite the existing dataset
Overwrite = auto()
#: Throw an error
ThrowError = auto()


class AssemblyError(Exception):
pass


class IncompleteDatasetError(Exception):
"""
Raised when a dataset is missing essential things and so cannot be written.
(such as mandatory metadata)
"""
def __init__(self, validation: ValidationMessage) -> None:
self.validation = validation


class IncompleteDatasetWarning(UserWarning):
"""A non-critical warning for invalid or incomplete metadata"""
def __init__(self, validation: ValidationMessage) -> None:
self.validation = validation
def __str__(self) -> str:
return str(self.validation)


def _validate_property_name(name: str):
"""
>>> _validate_property_name('eo:gsd')
>>> _validate_property_name('thumbnail:full_resolution')
>>> _validate_property_name('full resolution')
Traceback (most recent call last):
...
ValueError: Not a valid property name 'full resolution' (must be alphanumeric with colons or underscores)
>>> _validate_property_name('Mr Sprinkles')
Traceback (most recent call last):
...
ValueError: Not a valid property name 'Mr Sprinkles' (must be alphanumeric with colons or underscores)
"""
if not name.replace(":", "").isidentifier():
raise ValueError(
f"Not a valid property name {name!r} "
"(must be alphanumeric with colons or underscores)"
)


def _default_metadata_path(dataset_url: str):
"""
The default metadata path for a given dataset location url.
By default, we put a sibling file with extension 'odc-metadata.yaml':
>>> _default_metadata_path('file:///tmp/ls7_nbar_20120403_c1/esri-scene.stac-item.json')
'file:///tmp/ls7_nbar_20120403_c1/esri-scene.odc-metadata.yaml'
>>> _default_metadata_path('s3://deafrica-data/jaxa/alos_palsar_mosaic/2017/N05E040/N05E040_2017.tif')
's3://deafrica-data/jaxa/alos_palsar_mosaic/2017/N05E040/N05E040_2017.odc-metadata.yaml'
>>> _default_metadata_path('file:///tmp/ls7_nbar_20120403_c1/my-dataset.tar.gz')
'file:///tmp/ls7_nbar_20120403_c1/my-dataset.odc-metadata.yaml'
Or, if a directory, we place one inside:
>>> _default_metadata_path('file:///tmp/ls7_nbar_20120403_c1/')
'file:///tmp/ls7_nbar_20120403_c1/odc-metadata.yaml'
If a tar/zip file, place it alongside.
>>> _default_metadata_path('tar:///g/data/v10/somewhere/my-dataset.tar!/')
'file:///g/data/v10/somewhere/my-dataset.odc-metadata.yaml'
>>> _default_metadata_path('zip:///g/data/v10/landsat-dataset.zip!')
'file:///g/data/v10/landsat-dataset.odc-metadata.yaml'
Unless it's already a metadata path:
>>> _default_metadata_path('file:///tmp/ls7_nbar_20120403_c1/odc-metadata.yaml')
'file:///tmp/ls7_nbar_20120403_c1/odc-metadata.yaml'
"""
# Already a metadata url?
if dataset_url.endswith("odc-metadata.yaml"):
return dataset_url
    # If a tar/zip URL, convert to a file URL before proceeding.
u = urlsplit(dataset_url)
path = PosixPath(u.path)
if u.scheme in ("tar", "zip"):
dataset_url = f"file://{path.as_posix()}"
# A directory, place a default name inside.
if dataset_url.endswith("/"):
return f"{dataset_url}odc-metadata.yaml"
# Otherwise a sibling file to the dataset file.
base_url, file_name = dataset_url.rsplit("/", maxsplit=1)
file_stem = file_name.split(".")[0]
return dc_uris.uri_resolve(dataset_url, f"{base_url}/{file_stem}.odc-metadata.yaml")


class DatasetPrepare(Eo3Interface):
"""
Prepare dataset metadata
"""
#: The properties that will automatically be inherited from a source dataset
#: when :meth:`auto_inherit_properties=True <.add_source_path>`
#:
#: These are fields that are inherent to the underlying observation, and so will
#: still be relevant after most 1:1 processing.
INHERITABLE_PROPERTIES = {
"datetime",
"dtr:end_datetime",
"dtr:start_datetime",
"eo:cloud_cover",
"eo:constellation",
"eo:gsd",
"eo:instrument",
"eo:platform",
"eo:sun_azimuth",
"eo:sun_elevation",
"fmask:clear",
"fmask:cloud",
"fmask:cloud_shadow",
"fmask:snow",
"fmask:water",
"gqa:abs_iterative_mean_x",
"gqa:abs_iterative_mean_xy",
"gqa:abs_iterative_mean_y",
"gqa:abs_x",
"gqa:abs_xy",
"gqa:abs_y",
"gqa:cep90",
"gqa:iterative_mean_x",
"gqa:iterative_mean_xy",
"gqa:iterative_mean_y",
"gqa:iterative_stddev_x",
"gqa:iterative_stddev_xy",
"gqa:iterative_stddev_y",
"gqa:mean_x",
"gqa:mean_xy",
"gqa:mean_y",
"gqa:stddev_x",
"gqa:stddev_xy",
"gqa:stddev_y",
"landsat:collection_category",
"landsat:collection_number",
"landsat:landsat_product_id",
"landsat:landsat_scene_id",
"landsat:scene_id",
"landsat:wrs_path",
"landsat:wrs_row",
"landsat:rmse",
"landsat:rmse_x",
"landsat:rmse_y",
"mission",
"odc:region_code",
"sat:absolute_orbit",
"sat:anx_datetime",
"sat:orbit_state",
"sat:platform_international_designator",
"sat:relative_orbit",
"sentinel:datastrip_id",
"sentinel:datatake_start_datetime",
"sentinel:grid_square",
"sentinel:latitude_band",
"sentinel:sentinel_tile_id",
"sentinel:utm_zone",
}
def __init__(
self,
collection_location: Optional[Location] = None,
*,
dataset_location: Optional[Location] = None,
metadata_path: Optional[Location] = None,
dataset_id: Optional[uuid.UUID] = None,
allow_absolute_paths: bool = False,
naming_conventions: Optional[str] = None,
names: Optional[NamingConventions] = None,
dataset: Optional[DatasetDoc] = None,
) -> None:
"""
Build an EO3 metadata document, with functions for reading information from imagery
and calculating names and paths.
        In addition to the methods documented below, metadata fields can be read and set using
        :class:`Eo3Interface's <eodatasets3.properties.Eo3Interface>` fields.

        There are three optional paths, and at least one of them must be specified: a collection,
        dataset, or metadata path.
- A ``collection_path`` is the root folder where datasets will live (in sub-[sub]-folders).
- Each dataset has its own ``dataset_location``, as stored in an Open Data Cube index.
All paths inside the metadata document are relative to this location.
        - An output ``metadata_path``: the location of the metadata document itself.
If you're writing data, you typically only need to specify the collection path, and the others
will be automatically generated using the naming conventions.
If you're only writing a metadata file (for existing data), you only need to specify a metadata path.
        If you're storing data using an exotic URI scheme, such as a ``tar://`` URL, you will need
        to specify this as your dataset location.
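
        A minimal sketch of each mode (the locations here are hypothetical)::

            # Writing data: give a collection root; the other paths are
            # generated from metadata and the naming conventions.
            p = DatasetPrepare(collection_location=Path("/data/collections"))

            # Writing only a metadata file for existing data:
            p = DatasetPrepare(metadata_path=Path("/data/scene/my.odc-metadata.yaml"))

            # An exotic dataset location, specified explicitly:
            p = DatasetPrepare(dataset_location="tar:///data/v10/scene.tar!/")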
:param collection_location:
            Optional base directory where the collection of datasets should live. Subfolders will be
            created according to the naming convention.
:param dataset_location:
Optional location for this dataset.
Otherwise it will be generated according to the collection path and naming conventions.
(this is as indexed into ODC -- ie. a file name).
:param metadata_path:
Optional metadata document output path. Otherwise it will be generated according to the collection path
and naming conventions.
:param dataset_id:
Optional UUID for this dataset, otherwise a random one will be created. Use this if you have a stable
way of generating your own IDs.
:param allow_absolute_paths:
            Allow metadata paths to refer to files outside the dataset location. This means they will
            have to be absolute paths, and will not be portable. (default: False)
:param naming_conventions:
            Naming conventions to use. Supports `default` or `dea`. The latter has stricter metadata
            requirements (try it and see -- it will tell you what's missing).
"""
if (
(names is None)
and not collection_location
and not dataset_location
and not metadata_path
):
raise ValueError(
"Must specify either a collection folder, dataset location or a single metadata file"
)
if isinstance(collection_location, Path) and not collection_location.exists():
raise ValueError(
f"Provided collection location doesn't exist: {collection_location}"
)
#: What method to use to calculate the valid data geometry?
#:
#: Defaults to :attr:`eodatasets3.ValidDataMethod.thorough`
#:
#: You may change this property before finishing your package.
#:
#: Eg::
#:
#: p.valid_data_method = ValidDataMethod.filled
#:
self.valid_data_method: ValidDataMethod = ValidDataMethod.thorough
if not dataset:
dataset = DatasetDoc()
if not dataset.id:
dataset.id = dataset_id or uuid.uuid4()
self._dataset = dataset
self._measurements = MeasurementBundler()
self._accessories: Dict[str, Location] = {}
self._allow_absolute_paths = allow_absolute_paths
#: Valid-data polygon, in the same CRS as the measurements.
#:
#: This must cover all valid pixels to be valid in ODC
#: (it's allowed to be larger than the valid pixel area, but not
#: smaller).
#:
#: It will be computed automatically from measurements if not set
#: manually. You can also inherit it from source datasets in the
#: ``add_source_*()`` methods.
self.geometry: Optional[BaseGeometry] = None
no_naming_specified = (
(names is None)
and naming_conventions is None
and collection_location is None
)
if names is None:
names: NamingConventions = namer(
dataset.properties, conventions=naming_conventions or "default"
)
else:
# Our properties should come from the given names instance.
dataset.properties = names.metadata.properties
#: The name generator (an instance of :class:`NamingConventions <eodatasets3.NamingConventions>`)
#:
#: By default, all names will be generated based on metadata
#: fields and the chosen naming conventions.
#:
#: But you can set your own names here manually to avoid the magic.
#:
#: (for the devious among you, this can also avoid metadata field requirements
#: for name generation).
#:
#: Examples:
#:
#: Set a product name::
#:
#: p.names.product_name = 'my_product_name'
#:
#: Manually set the abbreviations used in name generation
#:
#: (By default, for example, landsat-7 will be abbreviated to "ls7". But maybe
#: you want "ls" in all your datasets)::
#:
#: p.names.platform_abbreviated = "ls"
#: # Other abbreviations:
#: p.names.instrument_abbreviated = "e"
#: p.names.producer_abbreviated = "usgs"
#:
#: Set your own label
#: (the human identifier for the dataset, and the default prefix of filenames)::
#:
#: p.names.dataset_label = "landsat-observations-12th-may-2021"
#:
#: Customise the dataset's folder offset::
#:
#: >>> p.names.dataset_folder
#: 'ga_ls8c_ones_3/090/084/2016/01/21'
#:
#: ... to use a custom time hierarchy::
#:
#: >>> p.names.time_folder = p.datetime.strftime("years/%Y")
#: >>> p.names.dataset_folder
#: 'ga_ls8c_ones_3/090/084/years/2016'
#:
#: ... or a custom region format::
#:
#: >>> p.names.region_folder = 'x04y23'
#: >>> p.names.dataset_folder
#: 'ga_ls8c_ones_3/x04y23/years/2016'
#:
#: ... or replace it altogether::
#:
#: p.names.dataset_folder = "datasets/january/2021"
#:
#: Configure the pattern used for generating filenames::
#:
#: p.names.filename_pattern = "my-file.{file_id}.{suffix}"
#:
#: .. note::
#:
#: All filenames are given a ``{file_id}`` (eg. ``"odc-metadata"`` or ``""``)
#: and ``{suffix}`` (eg. ``"yaml"``) variable to distinguish themselves.
#:
#: (Patterns can also contain folder separators. It will be relative to the dataset
#: folder)
#:
#: The path to the EO3 metadata doc (relative path to the dataset location)::
#:
#: p.names.metadata_file = "my-metadata.odc-metadata.yaml"
#:
#: The URI for the product::
#:
#: p.names.product_uri = "https://collections.earth.test.example/product/my-product"
#:
#: A full list of fields can be seen on :class:`eodatasets3.NamingConventions`
self.names: NamingConventions = names
if collection_location:
self.names.collection_prefix = resolve_location(collection_location)
if dataset_location:
self.names.dataset_location = resolve_location(dataset_location)
if metadata_path:
self.names.metadata_file = resolve_location(metadata_path)
has_collection_location = self.names.collection_prefix is not None
try:
has_dataset_location = self.names.dataset_location is not None
except ValueError:
# "Not enough fields to fill naming conventions"
has_dataset_location = False
try:
has_metadata_path = self.names.metadata_file is not None
except ValueError:
# "Not enough fields to fill naming conventions"
has_metadata_path = False
# We must always have a metadata path and dataset location.
# If they only gave a metadata path, it will be the dataset_location too.
if (
(not has_dataset_location)
and has_metadata_path
and (not has_collection_location)
):
self.names.dataset_location = resolve_location(self.names.metadata_file)
# If they only gave a dataset location, and don't have naming conventions, make metadata file a sibling.
if (not has_metadata_path) and no_naming_specified and has_dataset_location:
self.names.metadata_file = _default_metadata_path(
self.names.dataset_location
)
self._is_completed = False
self._finished_init_ = True
    # Our with-blocks don't do anything, as there's nothing to clean up, but we want this to
    # be a drop-in replacement for DatasetAssembler, so we let users use them.
    # (a clear block can also make code more readable)
def __enter__(self) -> "DatasetPrepare":
return self
def __exit__(self, exc_type, exc_val, exc_tb):
...
@property
def collection_location(self) -> Path:
# Backward compat method. No docstring to avoid sphinx visibility.
return self.names.collection_path
@collection_location.setter
def collection_location(self, val: Path):
# Backward compat method. No docstring to avoid sphinx visibility.
# Previously, people could set the collection using this property, and it was a Path
self.names.collection_prefix = resolve_location(val)
@property
def dataset_id(self) -> uuid.UUID:
return self._dataset.id
@dataset_id.setter
def dataset_id(self, val: Union[uuid.UUID, str]):
if isinstance(val, str):
val = uuid.UUID(val)
self._dataset.id = val
@property
def properties(self) -> Eo3Dict:
return self._dataset.properties
@property
def measurements(self) -> Dict[str, Tuple[GridSpec, Path]]:
return {
name: (grid, path) for grid, name, path in self._measurements.iter_paths()
}
@property
def label(self) -> Optional[str]:
"""
An optional displayable string to identify this dataset.
        These are often used when presenting a list of datasets, such as in search results or a filesystem folder.
They are unstructured, but should be more humane than showing a list of UUIDs.
By convention they have no spaces, due to their usage in filenames.
Eg. ``ga_ls5t_ard_3-0-0_092084_2009-12-17_final`` or USGS's ``LT05_L1TP_092084_20091217_20161017_01_T1``
A label will be auto-generated using the naming-conventions, but you can manually override it by
setting this property.
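
        Eg. to set it manually (a hypothetical label)::

            p.label = "landsat-observations-12th-may-2021"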
"""
return self._dataset.label or self.names.dataset_label
@label.setter
def label(self, val: str):
self._dataset.label = val
def __setattr__(self, name: str, value: Any) -> None:
"""
Prevent the accident of setting new properties on the assembler (it has happened multiple times).
"""
if (
name != "label"
and hasattr(self, "_finished_init_")
and not hasattr(self, name)
):
raise TypeError(
f"Cannot set new field '{name}' on an assembler. "
f"(Perhaps you meant to set it on the .properties?)"
)
super().__setattr__(name, value)
def add_source_path(
self,
*paths: Path,
        classifier: Optional[str] = None,
auto_inherit_properties: bool = False,
inherit_geometry: bool = False,
):
"""
Record a source dataset using the path to its metadata document.
:param paths: Filesystem path(s) to source metadata documents
        :param classifier: How to classify the kind of source dataset. This will automatically
                           be filled with the family of dataset if available (eg. "level1").
You want to set this if you have two datasets of the same type that
are used for different purposes. Such as having a second level1 dataset
that was used for QA (but is not this same scene).
:param auto_inherit_properties: Whether to copy any common properties from the dataset
:param inherit_geometry: Instead of re-calculating the valid bounds geometry based on the
data, which can be very computationally expensive e.g. Landsat 7
striped data, use the valid data geometry from this source dataset.
See also :meth:`.add_source_dataset`
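
        A short sketch (the path is illustrative)::

            p.add_source_path(
                Path("/data/level1/usgs/LE70900842012094/odc-metadata.yaml"),
                auto_inherit_properties=True,
            )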
"""
for _, doc in find_and_read_documents(*paths):
# Newer documents declare a schema.
if "$schema" in doc:
self.add_source_dataset(
serialise.from_doc(doc),
classifier=classifier,
auto_inherit_properties=auto_inherit_properties,
inherit_geometry=inherit_geometry,
)
else:
if auto_inherit_properties or inherit_geometry:
raise NotImplementedError(
"Can't (yet) inherit properties from old-style metadata"
)
classifier = classifier or doc.get("product_type")
if not classifier:
# TODO: This rule is a little obscure to force people to know.
# We could somehow figure out from the product?
raise ValueError(
"Source dataset (of old-style eo) doesn't have a 'product_type' property (eg. 'level1', 'fc'), "
"you must specify a classifier for the kind of source dataset."
)
_validate_property_name(classifier)
self._dataset.lineage.setdefault(classifier, []).append(doc["id"])
def add_source_dataset(
self,
dataset: DatasetDoc,
classifier: Optional[str] = None,
auto_inherit_properties: bool = False,
inherit_geometry: bool = False,
        inherit_skip_properties: Optional[List[str]] = None,
):
"""
Record a source dataset using its metadata document.
        It can optionally copy common properties from the source dataset (platform, instrument etc.).
(see :py:obj:`.INHERITABLE_PROPERTIES` for the list of fields that are inheritable)
:param dataset:
:param auto_inherit_properties: Whether to copy any common properties from the dataset
        :param classifier: How to classify the kind of source dataset. This will automatically
                           be filled with the family of dataset if available (eg. "level1").
You want to set this if you have two datasets of the same type that
are used for different purposes. Such as having a second level1 dataset
that was used for QA (but is not this same scene).
:param inherit_geometry: Instead of re-calculating the valid bounds geometry based on the
data, which can be very computationally expensive e.g. Landsat 7
striped data, use the valid data geometry from this source dataset.
:param inherit_skip_properties: An extra list of property names that should not be copied.
This is useful when generating summaries which combine multiple
input source datasets.
See :meth:`.add_source_path` if you have a filepath reference instead of a document.
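
        A short sketch, reading the document yourself first (the path is illustrative)::

            for _, doc in find_and_read_documents(Path("source/odc-metadata.yaml")):
                p.add_source_dataset(
                    serialise.from_doc(doc),
                    classifier="level1",
                    auto_inherit_properties=True,
                )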
"""
if not classifier:
classifier = dataset.properties.get("odc:product_family")
if not classifier:
# TODO: This rule is a little obscure to force people to know.
# We could somehow figure out the product family from the product?
raise ValueError(
"Source dataset doesn't have a 'odc:product_family' property (eg. 'level1', 'fc'), "
"you must specify a classifier for the kind of source dataset."
)
_validate_property_name(classifier)
self._dataset.lineage.setdefault(classifier, []).append(dataset.id)
if auto_inherit_properties:
self._inherit_properties_from(dataset, inherit_skip_properties)
if inherit_geometry:
if self.geometry and self.geometry != dataset.geometry:
warnings.warn("Overriding existing geometry from source dataset")
self.geometry = dataset.geometry
def note_source_datasets(
self,
classifier: str,
*dataset_ids: Union[str, uuid.UUID],
):
"""
Expand the lineage with raw source dataset ids.
.. note::
If you have direct access to the datasets, you probably want to use :func:`add_source_path`
or :func:`add_source_dataset`, so that fields can be inherited from them automatically.
:param classifier:
How to classify the source dataset.
By convention, this is usually the family of the source dataset
(properties->odc:product_family). Such as "level1".
A classifier is used to distinguish source datasets of the same product
that are used differently.
Such as a normal source level1 dataset (classifier: "level1"), and a
second source level1 that was used only for QA (classifier: "qa").
:param dataset_ids: The UUIDs of the source datasets
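
        Eg. (with a made-up source id)::

            p.note_source_datasets(
                "level1",
                "5b2f2c50-e618-4bef-ba0f-3d7dd30f4e65",
            )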
"""
for dataset_id in dataset_ids:
if not isinstance(dataset_id, uuid.UUID):
try:
dataset_id = uuid.UUID(dataset_id)
except ValueError as v:
# The default parse error doesn't tell you anything useful to track down which one broke.
raise ValueError(
f"Not a valid UUID for source {classifier!r} dataset: {dataset_id!r}"
) from v
self._dataset.lineage.setdefault(classifier, []).append(dataset_id)
def _inherit_properties_from(
self,
source_dataset: DatasetDoc,
inherit_skip_properties: Optional[List[str]] = None,
):
if not inherit_skip_properties:
            # Normalise None to an empty list, to make the membership check below simpler.
inherit_skip_properties = []
for name in self.INHERITABLE_PROPERTIES:
if name in inherit_skip_properties:
# if we plan to skip this property, skip it immediately.
continue
if name not in source_dataset.properties:
continue
new_value = source_dataset.properties[name]
try:
self.properties.normalise_and_set(
name,
new_value,
# If already set, do nothing.
allow_override=False,
)
except KeyError as k:
                warnings.warn(
                    f"Inheritable property {name!r} differs from the value already set: {k.args}"
                )
def note_measurement(
self,
name,
path: Location,
expand_valid_data=True,
relative_to_dataset_location=False,
grid: GridSpec = None,
pixels: numpy.ndarray = None,
nodata: Optional[Union[float, int]] = None,
):
"""
Reference a measurement from its existing path. It may be a Path or any URL
resolvable by rasterio.
By default, a relative path is relative to your current directory. You may want
to specify ``relative_to_dataset_location=True``.
        The path will be opened to read geo and pixel information, unless you specify that
        information yourself (grid, pixels, nodata). (The latter two are only needed when
        ``expand_valid_data=True``.)
:param name: measurement name
:param path: path to measurement
:param expand_valid_data: Expand the valid data bounds with this measurement's valid data.
:param relative_to_dataset_location: Should this be read relative to the dataset location?
(requires a computed dataset location)
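
        Eg. (a hypothetical band file within the dataset)::

            p.note_measurement(
                "blue",
                Path("blue.tif"),
                relative_to_dataset_location=True,
            )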
"""
_validate_property_name(name)
# If we have a polygon already, there's no need to compute valid data.
if self.geometry:
expand_valid_data = False
# If they didn't give us grid information, read it from the input.
if not grid:
read_location = path
if relative_to_dataset_location:
read_location = self.names.resolve_file(path)
with rasterio.open(read_location) as ds:
ds: DatasetReader
grid = images.GridSpec.from_rio(ds)
nodata = ds.nodata
if expand_valid_data:
if not pixels:
if ds.count != 1:
raise NotImplementedError(
"TODO: Only single-band files currently supported"
)
pixels = ds.read(1)
self._measurements.record_image(
name,
grid,
path,
pixels,
nodata=nodata,
expand_valid_data=expand_valid_data,
)
def _target_metadata_path(self) -> Path:
return self.names.resolve_path(self.names.metadata_file)
def write_eo3(
self,
        path: Optional[Path] = None,
        embed_location: Optional[bool] = None,
validate_correctness: bool = True,
sort_measurements: bool = True,
) -> Tuple[uuid.UUID, Path]:
"""Write the prepared metadata document to the given output path."""
metadata_path = path or self._target_metadata_path()
dataset_location = self.names.dataset_location
# Default behaviour:
# If the metadata path is not the dataset location, then record the location.
if embed_location is None:
embed_location = dataset_location != metadata_path.as_uri()
doc = serialise.to_formatted_doc(
self.to_dataset_doc(
embed_location=embed_location,
validate_correctness=validate_correctness,
sort_measurements=sort_measurements,
)
)
# It passed validation etc. Ensure output folder exists.
metadata_path.parent.mkdir(parents=True, exist_ok=True)
documents.make_paths_relative(
doc, metadata_path.parent, allow_paths_outside_base=False
)
serialise.dump_yaml(metadata_path, doc)
return self._dataset.id, metadata_path
def done(
self,
validate_correctness: bool = True,
sort_measurements: bool = True,
embed_location: Optional[bool] = False,
) -> Tuple[uuid.UUID, Path]:
"""Write the prepared metadata document to the given output path."""
return self.write_eo3(
validate_correctness=validate_correctness,
sort_measurements=sort_measurements,
embed_location=embed_location,
)
def to_dataset_doc(
self,
dataset_location: Optional[str] = None,
embed_location: bool = False,
validate_correctness: bool = True,
sort_measurements: bool = True,
expect_geometry: bool = True,
) -> DatasetDoc:
"""
Create the metadata doc as an in-memory :class:`eodatasets3.DatasetDoc` instance.
(You can manually write this out using :func:`serialise.to_path(): <eodatasets3.serialise.to_path>`
or :func:`serialise.to_stream() <eodatasets3.serialise.to_stream>`)
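
        Eg. (``output_path`` being wherever you want the document written)::

            dataset_doc = p.to_dataset_doc()
            serialise.to_path(output_path, dataset_doc)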
"""
dataset_location = dataset_location or self.names.dataset_location
def rel_location(p: Location) -> str:
if isinstance(p, PurePath):
if p.is_absolute():
p = p.as_uri()
else:
p = p.as_posix()
# Is it an (absolute) URL
if dc_uris.is_url(p):
return relative_url(
dataset_location,
p,
allow_absolute=self._allow_absolute_paths,
)
# Otherwise, already relative.
return p
if not dataset_location:
raise ValueError("No location available: cannot calculate relative paths")
dataset = self._dataset
if not dataset.product:
dataset.product = ProductDoc()
dataset.product.name = dataset.product.name or self.names.product_name
dataset.product.href = dataset.product.href or self.names.product_uri
dataset.label = dataset.label or self.names.dataset_label
if embed_location:
dataset.locations = [dataset_location]
else:
dataset.locations = None
crs, grid_docs, measurement_docs = self._measurements.as_geo_docs()
valid_data = self.geometry or self._measurements.consume_and_get_valid_data(
valid_data_method=self.valid_data_method
)
# Avoid the messiness of different empty collection types.
# (to have a non-null geometry we'd also need non-null grids and crses)
if valid_data.is_empty:
valid_data = None
new_crs = self._crs_str(crs) if crs is not None else None
if dataset.crs and dataset.crs != new_crs:
raise AssemblyError(
f"New measurements have a different CRS to the underlying dataset. "
f"Old: {dataset.crs!r}, New: {new_crs!r}"
)
dataset.crs = dataset.crs or new_crs
if valid_data:
if dataset.geometry:
dataset.geometry = dataset.geometry.union(valid_data)
else:
dataset.geometry = valid_data
# TODO: this could be made smarter, as we could merge with existing grids.
# for now we just throw an error if any of our generated grid names
# clash with existing ones.
if grid_docs:
if dataset.grids is None:
dataset.grids = {}
for name, doc in grid_docs.items():
if name in dataset.grids:
raise NotImplementedError(
f"Recorded grid name already exists in the underlying dataset: {name!r},"
f"and we don't yet support merging of grids."
)
dataset.grids[name] = doc
if measurement_docs:
if dataset.measurements is None:
dataset.measurements = {}
for name, doc in measurement_docs.items():
if name in dataset.measurements:
raise AssemblyError(
f"Recorded measurement already exists in the underlying dataset: {name!r}"
)
doc.path = rel_location(doc.path)
dataset.measurements[name] = doc
for name, path in self._accessories.items():
if name in dataset.accessories:
raise AssemblyError(
f"Recorded accessory already exists in the underlying dataset: {name!r}"
)
dataset.accessories[name] = AccessoryDoc(rel_location(path), name=name)
if dataset.measurements and sort_measurements:
# noinspection PyTypeChecker
dataset.measurements = dict(sorted(dataset.measurements.items()))
if validate_correctness:
doc = serialise.to_doc(dataset)
for m in validate.validate_dataset(doc, expect_geometry=expect_geometry):
if m.level in (Level.info, Level.warning):
warnings.warn(IncompleteDatasetWarning(m))
elif m.level == Level.error:
raise IncompleteDatasetError(m)
else:
raise RuntimeError(
f"Internal error: Unhandled type of message level: {m.level}"
)
return dataset
def _crs_str(self, crs: CRS) -> str:
# TODO: We should support more authorities here.
# if rasterio>=1.1.7, can use crs.to_authority(), but almost
# everyone is currently on 1.1.6
return f"epsg:{crs.to_epsg()}" if crs.is_epsg_code else crs.to_wkt()
def note_accessory_file(self, name: str, path: Location):
"""
Record a reference to an additional file that's included in the dataset, but is
not a band/measurement.
Such as non-ODC metadata, thumbnails, checksums, etc. Any included file that
is not recorded in the measurements.
        By convention, the name should be prefixed with its category, such as
        ``metadata:`` or ``thumbnail:``.
eg. ``metadata:landsat_processor``, ``checksum:sha1``, ``thumbnail:full``.
:param name: identifying name, eg ``metadata:mtl``
:param path: local path to file.
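
        Eg. (a hypothetical sibling checksum file)::

            p.note_accessory_file("checksum:sha1", Path("package.sha1"))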
"""
_validate_property_name(name)
existing_path = self._accessories.get(name)
if existing_path is not None and existing_path != path:
raise ValueError(
f"Duplicate accessory name {name!r}. "
f"New: {path!r}, previous: {existing_path!r}"
)
self._accessories[name] = path
def note_thumbnail(self, thumb_path: Path, kind: str = None):
"""
Record a reference to a thumbnail path.
Optionally specify the "kind" of thumbnail if there are multiple
to distinguish between. eg. 'full'
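
        Eg::

            p.note_thumbnail(Path("thumbnail.jpg"), kind="full")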
"""
accessory_name = "thumbnail"
if kind:
accessory_name += f":{kind}"
self.note_accessory_file(accessory_name, thumb_path)
def iter_measurement_paths(
self,
) -> Generator[Tuple[GridSpec, str, Path], None, None]:
"""
.. warning::
*not recommended* for use - will likely change soon.
Iterate through the list of measurement names that have been written, and their current (temporary) paths.
TODO: Perhaps we want to return a real measurement structure here as it's not very extensible.
"""
return self._measurements.iter_paths()
def __str__(self):
status = "written" if self._is_completed else "unfinished"
try:
output_location = self._target_metadata_path()
except ValueError:
output_location = "(not yet computable)"
measurements = list(self._measurements.iter_names())
properties = list(self.properties.keys())
product_name = None
try:
product_name = self.names.product_name
except ValueError:
...
def format_list(items: List, max_len=60):
s = ", ".join(sorted(items))
if len(s) > max_len:
return f"{s[:max_len]}..."