Multi location deprecation (#1546)
* Implement spatial_index(product) as per EP13 - should have been in the last PR.

* Close coverage gaps.

* Remove separate metadata resource from product resource - we have whole index.

* Some multi-location deprecation.

* More multi-location deprecation.

* Post-deprecation refactoring, mostly tests, some internal code.

* Post-deprecation refactoring, mostly tests, some internal code.

* Deprecate methods in all index drivers.

* Ensure tests of deprecated methods are clearly commented.

* Lintage and update what's new.

* Fix ensure_new_locations behaviour and incremental coverage.

* Mem driver fix and incremental coverage.

* incremental coverage.

* incremental coverage.

* Fix bug in multi-location index cloning.

* Incremental test coverage.

* Oops, unused import.

* Last bit of test coverage.

* Some clarifying comments.

* Fix ... accidental revert from rebase?

* Ensure locations and cleanup and clarify.

* More minor code cleanup.

* More minor code cleanup.

* Oops - forgot old_uris was a set.
SpacemanPaul committed Feb 14, 2024
1 parent b0946a3 commit 2ee5434
Showing 32 changed files with 786 additions and 287 deletions.
88 changes: 85 additions & 3 deletions datacube/index/abstract.py
@@ -1073,7 +1073,31 @@ class DatasetTuple(NamedTuple):
"""
product: Product
metadata: Mapping[str, Any]
uris: Sequence[str]
uri_: str | Sequence[str]

@property
def is_legacy(self):
if isinstance(self.uri_, str):
return False
return True

@property
def uri(self) -> str:
if self.is_legacy:
            return self.uri_[0]  # first listed location; avoids re-triggering the deprecated uris property
else:
return self.uri_

@property
@deprecat(
reason="Multiple uris are deprecated. Please use the uri field and ensure that datasets only have one location",
version='1.9.0',
category=ODC2DeprecationWarning)
def uris(self) -> Sequence[str]:
if self.is_legacy:
return self.uri_
else:
return [self.uri_]


class AbstractDatasetResource(ABC):
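
Taken together, these DatasetTuple changes keep legacy batch data readable while steering new code to a single string location. A minimal sketch of the intended behaviour, assuming product is any resolved Product and using placeholder URIs:

# Hypothetical illustration of the reworked DatasetTuple above.
modern = DatasetTuple(product, {"id": "..."}, "s3://bucket/ds/odc-metadata.yaml")
legacy = DatasetTuple(product, {"id": "..."}, ["file:///a.yaml", "file:///b.yaml"])

assert not modern.is_legacy and modern.uri == "s3://bucket/ds/odc-metadata.yaml"
assert legacy.is_legacy and legacy.uri == "file:///a.yaml"  # first location wins
locs = legacy.uris  # still works, but emits ODC2DeprecationWarning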
@@ -1408,6 +1432,11 @@ def get_field_names(self, product_name: str | None = None) -> Iterable[str]:
"""
return self._index.products.get_field_names(product_name)

@deprecat(
reason="Multiple locations per dataset are now deprecated. Please use the 'get_location' method.",
version="1.9.0",
category=ODC2DeprecationWarning
)
@abstractmethod
def get_locations(self, id_: DSID) -> Iterable[str]:
"""
@@ -1417,6 +1446,21 @@
:return: Storage locations for the dataset
"""

@abstractmethod
def get_location(self, id_: DSID) -> str | None:
"""
Get (active) storage location for the given dataset id
:param id_: dataset id
:return: Storage location for the dataset - None if no location for the id_, or if id_ not in db.
"""

@deprecat(
reason="Multiple locations per dataset are now deprecated. "
"Archived locations may not be accessible in future releases.",
version="1.9.0",
category=ODC2DeprecationWarning
)
@abstractmethod
def get_archived_locations(self, id_: DSID) -> Iterable[str]:
"""
@@ -1426,6 +1470,12 @@
:return: Archived storage locations for the dataset
"""

@deprecat(
reason="Multiple locations per dataset are now deprecated. "
"Archived locations may not be accessible in future releases.",
version="1.9.0",
category=ODC2DeprecationWarning
)
@abstractmethod
def get_archived_location_times(self,
id_: DSID
@@ -1437,6 +1487,12 @@
:return: Archived storage locations, with archive date.
"""

@deprecat(
reason="Multiple locations per dataset are now deprecated. "
"Dataset location can be set or updated with the update() method.",
version="1.9.0",
category=ODC2DeprecationWarning
)
@abstractmethod
def add_location(self, id_: DSID, uri: str) -> bool:
"""
@@ -1460,6 +1516,12 @@ def get_datasets_for_location(self,
:return: Matching dataset models
"""

@deprecat(
reason="Multiple locations per dataset are now deprecated. "
"Dataset location can be set or updated with the update() method.",
version="1.9.0",
category=ODC2DeprecationWarning
)
@abstractmethod
def remove_location(self,
id_: DSID,
@@ -1473,6 +1535,13 @@ def remove_location(self,
:return: True if location was removed, false if it didn't exist for the database
"""

@deprecat(
reason="Multiple locations per dataset are now deprecated. "
"Archived locations may not be accessible in future releases. "
"Dataset location can be set or updated with the update() method.",
version="1.9.0",
category=ODC2DeprecationWarning
)
@abstractmethod
def archive_location(self,
id_: DSID,
@@ -1486,6 +1555,13 @@ def archive_location(self,
:return: True if location was able to be archived
"""

@deprecat(
reason="Multiple locations per dataset are now deprecated. "
"Archived locations may not be restorable in future releases. "
"Dataset location can be set or updated with the update() method.",
version="1.9.0",
category=ODC2DeprecationWarning
)
@abstractmethod
def restore_location(self,
id_: DSID,
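
All of the location-mutator deprecations above point at the same replacement: rebuild the dataset model with its one intended location and persist it via update(). A rough sketch only - that update() accepts the location change without extra arguments is an assumption, not something shown in this diff:

# Deprecated 1.8 pattern:
dc.index.datasets.add_location(ds_id, "s3://bucket/new/odc-metadata.yaml")
dc.index.datasets.remove_location(ds_id, "s3://bucket/old/odc-metadata.yaml")

# 1.9 pattern (sketch): reconstruct with the new single uri, then update.
old = dc.index.datasets.get(ds_id)
moved = Dataset(product=old.product, metadata_doc=old.metadata_doc,
                uri="s3://bucket/new/odc-metadata.yaml")
dc.index.datasets.update(moved)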
@@ -1530,7 +1606,9 @@ def search(self,

def get_all_docs_for_product(self, product: Product, batch_size: int = 1000) -> Iterable[DatasetTuple]:
for ds in self.search(product=[product.name]):
yield (product, ds.metadata_doc, ds.uris)
yield DatasetTuple(product,
ds.metadata_doc,
ds._uris) # 2.0: ds.uri

def get_all_docs(self, products: Iterable[Product] | None = None,
batch_size: int = 1000) -> Iterable[DatasetTuple]:
@@ -1566,10 +1644,14 @@ def _add_batch(self, batch_ds: Iterable[DatasetTuple], cache: Mapping[str, Any])
b_added = 0
b_started = monotonic()
for ds_tup in batch_ds:
if ds_tup.is_legacy: # 2.0: {'uri': ds_tup.uri}
kwargs = {"uris": ds_tup.uris}
else:
kwargs = {"uri": ds_tup.uri}
try:
ds = Dataset(product=ds_tup.product,
metadata_doc=ds_tup.metadata,
uris=ds_tup.uris)
**kwargs)
self.add(ds, with_lineage=False)
b_added += 1
except DocumentMismatchError as e:
8 changes: 4 additions & 4 deletions datacube/index/hl.py
@@ -169,7 +169,7 @@ def resolve_no_lineage(ds: SimpleDocNav,
except BadMatch as e:
return None, e
check_intended_eo3(ds, product)
return Dataset(product, doc, uris=[uri], sources={}), None
return Dataset(product, doc, uri=uri, sources={}), None


def resolve_with_lineage(doc: SimpleDocNav, uri: str, matcher: ProductMatcher,
@@ -207,7 +207,7 @@ def resolve_with_lineage(doc: SimpleDocNav, uri: str, matcher: ProductMatcher,
return Dataset(product,
doc.doc,
source_tree=source_tree,
uris=[uri]), None
uri=uri), None


def resolve_legacy_lineage(main_ds_doc: SimpleDocNav, uri: str, matcher: ProductMatcher,
@@ -272,7 +272,7 @@ def resolve_ds(ds: SimpleDocNav,
if cached is not None:
return cached

uris = [uri] if ds.id == main_uuid else []
this_uri = uri if ds.id == main_uuid else None

doc = ds.doc

@@ -283,7 +283,7 @@
product = matcher(doc)

check_intended_eo3(ds, product)
return with_cache(Dataset(product, doc, uris=uris, sources=sources), ds.id, cache)
return with_cache(Dataset(product, doc, uri=this_uri, sources=sources), ds.id, cache)
try:
return remap_lineage_doc(main_ds, resolve_ds, cache={}), None
except BadMatch as e:
67 changes: 62 additions & 5 deletions datacube/index/memory/_datasets.py
@@ -13,9 +13,10 @@
Optional, Set, Tuple, Union,
cast)
from uuid import UUID
from deprecat import deprecat

from datacube.migration import ODC2DeprecationWarning
from datacube.index import fields

from datacube.index.abstract import (AbstractDatasetResource, DSID, dsid_to_uuid, BatchStatus,
QueryField, DatasetSpatialMixin, NoLineageResource, AbstractIndex)
from datacube.index.fields import Field
@@ -91,8 +92,8 @@ def add(self, dataset: Dataset,
persistable = self.clone(dataset, for_save=True)
self.by_id[persistable.id] = persistable
self.active_by_id[persistable.id] = persistable
if dataset.uris is not None:
self.locations[persistable.id] = dataset.uris.copy()
if dataset._uris:
self.locations[persistable.id] = dataset._uris.copy()
else:
self.locations[persistable.id] = []
self.archived_locations[persistable.id] = []
@@ -293,18 +294,48 @@ def get_all_dataset_ids(self, archived: bool) -> Iterable[UUID]:
else:
return (id_ for id_ in self.active_by_id.keys())

@deprecat(
reason="Multiple locations per dataset are now deprecated. Please use the 'get_location' method.",
version="1.9.0",
category=ODC2DeprecationWarning
)
def get_locations(self, id_: DSID) -> Iterable[str]:
uuid = dsid_to_uuid(id_)
return (s for s in self.locations[uuid])

    def get_location(self, id_: DSID) -> Optional[str]:
uuid = dsid_to_uuid(id_)
locations = [s for s in self.locations.get(uuid, [])]
if not locations:
return None
return locations[0]

@deprecat(
reason="Multiple locations per dataset are now deprecated. "
"Archived locations may not be accessible in future releases.",
version="1.9.0",
category=ODC2DeprecationWarning
)
def get_archived_locations(self, id_: DSID) -> Iterable[str]:
uuid = dsid_to_uuid(id_)
return (s for s, dt in self.archived_locations[uuid])

@deprecat(
reason="Multiple locations per dataset are now deprecated. "
"Archived locations may not be accessible in future releases.",
version="1.9.0",
category=ODC2DeprecationWarning
)
def get_archived_location_times(self, id_: DSID) -> Iterable[Tuple[str, datetime.datetime]]:
uuid = dsid_to_uuid(id_)
return ((s, dt) for s, dt in self.archived_locations[uuid])

@deprecat(
reason="Multiple locations per dataset are now deprecated. "
"Dataset location can be set or updated with the update() method.",
version="1.9.0",
category=ODC2DeprecationWarning
)
def add_location(self, id_: DSID, uri: str) -> bool:
uuid = dsid_to_uuid(id_)
if uuid not in self.by_id:
@@ -335,6 +366,12 @@ def get_datasets_for_location(self, uri: str, mode: Optional[str] = None) -> Ite
break
return self.bulk_get(ids)
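
Note that get_datasets_for_location carries no deprecation decorator: reverse lookup from a storage location to its datasets remains supported. Typical usage, with dc again assumed to be an open Datacube:

# Find the indexed datasets recorded at a given location:
for ds in dc.index.datasets.get_datasets_for_location("s3://bucket/scene/odc-metadata.yaml"):
    print(ds.id, ds.product.name)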

@deprecat(
reason="Multiple locations per dataset are now deprecated. "
"Dataset location can be set or updated with the update() method.",
version="1.9.0",
category=ODC2DeprecationWarning
)
def remove_location(self, id_: DSID, uri: str) -> bool:
uuid = dsid_to_uuid(id_)
removed = False
@@ -352,6 +389,13 @@ def remove_location(self, id_: DSID, uri: str) -> bool:
removed = True
return removed

@deprecat(
reason="Multiple locations per dataset are now deprecated. "
"Archived locations may not be accessible in future releases. "
"Dataset location can be set or updated with the update() method.",
version="1.9.0",
category=ODC2DeprecationWarning
)
def archive_location(self, id_: DSID, uri: str) -> bool:
uuid = dsid_to_uuid(id_)
if uuid not in self.locations:
@@ -364,6 +408,13 @@ def archive_location(self, id_: DSID, uri: str) -> bool:
self.archived_locations[uuid].append((uri, datetime.datetime.now()))
return True

@deprecat(
reason="Multiple locations per dataset are now deprecated. "
"Archived locations may not be restorable in future releases. "
"Dataset location can be set or updated with the update() method.",
version="1.9.0",
category=ODC2DeprecationWarning
)
def restore_location(self, id_: DSID, uri: str) -> bool:
uuid = dsid_to_uuid(id_)
if uuid not in self.archived_locations:
@@ -708,13 +759,19 @@ def clone(self, orig: Dataset, for_save=False, lookup_locations=True) -> Dataset
uris = orig.uris.copy()
else:
uris = []
if len(uris) == 1:
kwargs = {"uri": uris[0]}
elif len(uris) > 1:
kwargs = {"uris": uris}
else:
kwargs = {}
return Dataset(
product=self._index.products.clone(orig.product),
metadata_doc=jsonify_document(orig.metadata_doc_without_lineage()),
uris=uris,
indexed_by="user" if for_save and orig.indexed_by is None else orig.indexed_by,
indexed_time=datetime.datetime.now() if for_save and orig.indexed_time is None else orig.indexed_time,
archived_time=None if for_save else orig.archived_time
archived_time=None if for_save else orig.archived_time,
**kwargs
)

# Lineage methods need to be implemented on the dataset resource as that is where the relevant indexes
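
Per the commit notes, tests that deliberately exercise deprecated multi-location paths are now clearly marked as such; pytest.warns makes that intent explicit. A sketch, with mem_index and ds_id as assumed fixtures:

import pytest
from datacube.migration import ODC2DeprecationWarning

def test_legacy_location_api(mem_index, ds_id):
    # mem_index / ds_id: assumed fixtures indexing one dataset with one location.
    with pytest.warns(ODC2DeprecationWarning):
        locations = list(mem_index.datasets.get_locations(ds_id))
    assert locations == [mem_index.datasets.get_location(ds_id)]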
