Commit 8b2d884

Merge pull request #1460 from opendatacube/archive_time_buffer_option

Configurable timedelta in archive-less-mature cli option

Ariana-B committed Jun 22, 2023
2 parents 148e296 + 212961c
Showing 7 changed files with 66 additions and 40 deletions.
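With this change the CLI option can be given bare (taking the 500 ms flag value) or with an explicit millisecond delta. A sketched invocation, with a hypothetical dataset document path; the explicit-value form is written with `=` so the number is not read as a path:

    datacube dataset add ds.yaml --archive-less-mature          # bare flag: +/-500 ms leniency window
    datacube dataset add ds.yaml --archive-less-mature=1500     # explicit +/-1500 ms window
    datacube dataset update ds.yaml --archive-less-mature=0     # zero delta: timestamps must match exactly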
26 changes: 14 additions & 12 deletions datacube/index/abstract.py
@@ -827,7 +827,7 @@ def bulk_has(self, ids_: Iterable[DSID]) -> Iterable[bool]:
     @abstractmethod
     def add(self, dataset: Dataset,
             with_lineage: bool = True,
-            archive_less_mature: bool = False,
+            archive_less_mature: Optional[int] = None,
             ) -> Dataset:
         """
         Add ``dataset`` to the index. No-op if it is already present.
@@ -839,9 +839,9 @@ def add(self, dataset: Dataset,
             - ``False`` record lineage relations, but do not attempt
               adding lineage datasets to the db
-        :param archive_less_mature:
-            - ``True`` search for less mature versions of the dataset
-              and archive them
+        :param archive_less_mature: if integer, search for less
+            mature versions of the dataset with the int value as a millisecond
+            delta in timestamp comparison
         :return: Persisted Dataset model
         """
@@ -881,13 +881,13 @@ def can_update(self,
     def update(self,
                dataset: Dataset,
                updates_allowed: Optional[Mapping[Offset, AllowPolicy]] = None,
-               archive_less_mature: bool = False,
+               archive_less_mature: Optional[int] = None,
                ) -> Dataset:
         """
         Update dataset metadata and location

         :param Dataset dataset: Dataset model with unpersisted updates
         :param updates_allowed: Allowed updates
-        :param archive_less_mature: Find and archive less mature datasets
+        :param archive_less_mature: Find and archive less mature datasets with ms delta
         :return: Persisted dataset model
         """
@@ -899,30 +899,32 @@ def archive(self, ids: Iterable[DSID]) -> None:
         :param Iterable[Union[str,UUID]] ids: list of dataset ids to archive
         """

-    def archive_less_mature(self, ds: Dataset) -> None:
+    def archive_less_mature(self, ds: Dataset, delta: int) -> None:
         """
         Archive less mature versions of a dataset

         :param Dataset ds: dataset to search
         """
-        less_mature = self.find_less_mature(ds)
+        less_mature = self.find_less_mature(ds, delta)
         less_mature_ids = map(lambda x: x.id, less_mature)

         self.archive(less_mature_ids)
         for lm_ds in less_mature_ids:
             _LOG.info(f"Archived less mature dataset: {lm_ds}")

-    def find_less_mature(self, ds: Dataset) -> Iterable[Dataset]:
+    def find_less_mature(self, ds: Dataset, delta: int) -> Iterable[Dataset]:
         """
         Find less mature versions of a dataset

         :param Dataset ds: Dataset to search
+        :param int delta: millisecond delta for time range
         :return: Iterable of less mature datasets
         """
         less_mature = []
-        # 'expand' the date range by a millisecond to give a bit more leniency in datetime comparison
-        expanded_time_range = Range(ds.metadata.time.begin - timedelta(milliseconds=500),
-                                    ds.metadata.time.end + timedelta(milliseconds=500))
+        assert delta >= 0
+        # 'expand' the date range by `delta` milliseconds to give a bit more leniency in datetime comparison
+        expanded_time_range = Range(ds.metadata.time.begin - timedelta(milliseconds=delta),
+                                    ds.metadata.time.end + timedelta(milliseconds=delta))
         dupes = self.search(product=ds.product.name,
                             region_code=ds.metadata.region_code,
                             time=expanded_time_range)
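At the core of the abstract-index change, `find_less_mature` now pads the dataset's time range by a caller-supplied number of milliseconds before searching for duplicates, instead of a hard-coded 500 ms. A standalone sketch of that padding arithmetic (a minimal stand-in `Range` tuple is defined here rather than importing datacube's model):

from collections import namedtuple
from datetime import datetime, timedelta

Range = namedtuple("Range", ("begin", "end"))  # stand-in for datacube.model.Range

def expand_time_range(begin: datetime, end: datetime, delta_ms: int) -> Range:
    # Pad both ends of a dataset's time range by delta_ms milliseconds,
    # mirroring the widened duplicate-search window in find_less_mature().
    assert delta_ms >= 0
    pad = timedelta(milliseconds=delta_ms)
    return Range(begin - pad, end + pad)

t = datetime(2023, 6, 22, 3, 14, 15)
print(expand_time_range(t, t, 500))  # +/-500 ms window (the new CLI default)
print(expand_time_range(t, t, 0))    # zero delta: exact timestamp comparison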
20 changes: 10 additions & 10 deletions datacube/index/postgis/_datasets.py
@@ -137,7 +137,7 @@ def bulk_has(self, ids_):
                 map((lambda x: UUID(x) if isinstance(x, str) else x), ids_)]

     def add(self, dataset: Dataset,
-            with_lineage: bool = True, archive_less_mature: bool = False) -> Dataset:
+            with_lineage: bool = True, archive_less_mature: Optional[int] = None) -> Dataset:
         """
         Add ``dataset`` to the index. No-op if it is already present.
@@ -148,9 +148,9 @@ def add(self, dataset: Dataset,
             - ``False`` record lineage relations, but do not attempt
               adding lineage datasets to the db
-        :param archive_less_mature:
-            - ``True`` search for less mature versions of the dataset
-              and archive them
+        :param archive_less_mature: if integer, search for less
+            mature versions of the dataset with the int value as a millisecond
+            delta in timestamp comparison
         :rtype: Dataset
         """
@@ -173,8 +173,8 @@ def add(self, dataset: Dataset,
             # 1c. Store locations
             if dataset.uris is not None:
                 self._ensure_new_locations(dataset, transaction=transaction)
-        if archive_less_mature:
-            self.archive_less_mature(dataset)
+        if archive_less_mature is not None:
+            self.archive_less_mature(dataset, archive_less_mature)

         return dataset

@@ -311,12 +311,12 @@ def can_update(self, dataset, updates_allowed=None):

         return not bad_changes, good_changes, bad_changes

-    def update(self, dataset: Dataset, updates_allowed=None, archive_less_mature=False):
+    def update(self, dataset: Dataset, updates_allowed=None, archive_less_mature=None):
         """
         Update dataset metadata and location
         :param Dataset dataset: Dataset to update
         :param updates_allowed: Allowed updates
-        :param archive_less_mature: Find and archive less mature datasets
+        :param archive_less_mature: Find and archive less mature datasets with ms delta
         :rtype: Dataset
         """
         existing = self.get(dataset.id)
@@ -349,8 +349,8 @@ def update(self, dataset: Dataset, updates_allowed=None, archive_less_mature=False):
                 raise ValueError("Failed to update dataset %s..." % dataset.id)
             transaction.update_spindex(dsids=[dataset.id])
             transaction.update_search_index(dsids=[dataset.id])
-        if archive_less_mature:
-            self._archive_less_mature(dataset)
+        if archive_less_mature is not None:
+            self.archive_less_mature(dataset, archive_less_mature)

         self._ensure_new_locations(dataset, existing)
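One detail that is easy to miss in the driver hunks: the guard changed from `if archive_less_mature:` to `if archive_less_mature is not None:`. The distinction matters because `0` is a legitimate delta (exact timestamp comparison, as the integration tests below use) yet is falsy in Python. A minimal illustration of the difference:

def maturity_check_enabled(archive_less_mature):
    # Mirrors the drivers' guard: only None disables the less-mature search.
    return archive_less_mature is not None

print(maturity_check_enabled(None))  # False - check skipped
print(maturity_check_enabled(500))   # True  - +/-500 ms window
print(maturity_check_enabled(0))     # True  - a bare truthiness test would wrongly skip this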
20 changes: 11 additions & 9 deletions datacube/index/postgres/_datasets.py
@@ -10,7 +10,7 @@
 import warnings
 from collections import namedtuple
 from time import monotonic
-from typing import Iterable, List, Union, Mapping, Any
+from typing import Iterable, List, Union, Mapping, Optional, Any
 from uuid import UUID

 from sqlalchemy import select, func
@@ -132,7 +132,7 @@ def bulk_has(self, ids_):
                 map((lambda x: UUID(x) if isinstance(x, str) else x), ids_)]

     def add(self, dataset: Dataset,
-            with_lineage: bool = True, archive_less_mature: bool = False) -> Dataset:
+            with_lineage: bool = True, archive_less_mature: Optional[int] = None) -> Dataset:
         """
         Add ``dataset`` to the index. No-op if it is already present.
@@ -143,9 +143,9 @@ def add(self, dataset: Dataset,
             - ``False`` record lineage relations, but do not attempt
               adding lineage datasets to the db
-        :param archive_less_mature:
-            - ``True`` search for less mature versions of the dataset
-              and archive them
+        :param archive_less_mature: if integer, search for less
+            mature versions of the dataset with the int value as a millisecond
+            delta in timestamp comparison
         :rtype: Dataset
         """
@@ -191,8 +191,8 @@ def process_bunch(dss, main_ds, transaction):

         with self._db_connection(transaction=True) as transaction:
             process_bunch(dss, dataset, transaction)
-        if archive_less_mature:
-            self.archive_less_mature(dataset)
+        if archive_less_mature is not None:
+            self.archive_less_mature(dataset, archive_less_mature)

         return dataset

@@ -284,12 +284,12 @@ def can_update(self, dataset, updates_allowed=None):

         return not bad_changes, good_changes, bad_changes

-    def update(self, dataset: Dataset, updates_allowed=None, archive_less_mature=False):
+    def update(self, dataset: Dataset, updates_allowed=None, archive_less_mature=None):
         """
         Update dataset metadata and location
         :param Dataset dataset: Dataset to update
         :param updates_allowed: Allowed updates
-        :param archive_less_mature: Find and archive less mature datasets
+        :param archive_less_mature: Find and archive less mature datasets with ms delta
         :rtype: Dataset
         """
         existing = self.get(dataset.id)
@@ -320,6 +320,8 @@ def update(self, dataset: Dataset, updates_allowed=None, archive_less_mature=False):
         with self._db_connection(transaction=True) as transaction:
             if not transaction.update_dataset(dataset.metadata_doc_without_lineage(), dataset.id, product.id):
                 raise ValueError("Failed to update dataset %s..." % dataset.id)
+        if archive_less_mature is not None:
+            self.archive_less_mature(dataset, archive_less_mature)

         self._ensure_new_locations(dataset, existing)
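For orientation, a minimal sketch of driving the updated index API from Python; it assumes a configured datacube environment, and the UUID is a placeholder for an indexed dataset id:

from datacube import Datacube

dc = Datacube()  # assumes a configured datacube environment

# Placeholder id; any indexed dataset UUID would do.
ds = dc.index.datasets.get("10000000-2000-3000-4000-500000000000")
if ds is not None:
    # Re-index with a +/-500 ms maturity window; the default of None
    # skips the less-mature search entirely.
    dc.index.datasets.update(ds, archive_less_mature=500)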
28 changes: 24 additions & 4 deletions datacube/scripts/dataset.py
@@ -158,8 +158,10 @@ def mk_dataset(ds, uri):
 @click.option('--confirm-ignore-lineage',
               help="Pretend that there is no lineage data in the datasets being indexed, without confirmation",
               is_flag=True, default=False)
-@click.option('--archive-less-mature', help='Archive less mature versions of the dataset',
-              is_flag=True, default=False)
+@click.option('--archive-less-mature', is_flag=False, flag_value=500, default=None,
+              help=('Find and archive less mature versions of the dataset; will fail if more mature versions '
+                    'of the dataset already exist. Can also specify a millisecond delta amount to be taken '
+                    'into account when comparing timestamps. Default delta is 500ms.'))
 @click.argument('dataset-paths', type=str, nargs=-1)
 @ui.pass_index()
 def index_cmd(index, product_names,
@@ -200,6 +202,14 @@ def index_cmd(index, product_names,
         _LOG.error(e)
         sys.exit(2)

+    if archive_less_mature is not None:
+        if type(archive_less_mature) is not int:
+            click.echo('Error: millisecond delta value must be an integer')
+            sys.exit(1)
+        if archive_less_mature < 0:
+            click.echo('Error: millisecond delta value must be a non-negative integer')
+            sys.exit(1)
+
     def run_it(dataset_paths):
         doc_stream = ui_path_doc_stream(dataset_paths, logger=_LOG, uri=True)
         doc_stream = remap_uri_from_doc(doc_stream)
@@ -250,8 +260,10 @@ def parse_update_rules(keys_that_can_change):
            - 'keep': keep as alternative location [default]
            - 'archive': mark as archived
            - 'forget': remove from the index'''))
-@click.option('--archive-less-mature', help='Archive less mature versions of the dataset',
-              is_flag=True, default=False)
+@click.option('--archive-less-mature', is_flag=False, flag_value=500, default=None,
+              help=('Find and archive less mature versions of the dataset; will fail if more mature versions '
+                    'of the dataset already exist. Can also specify a millisecond delta amount to be taken '
+                    'into account when comparing timestamps. Default delta is 500ms.'))
 @click.argument('dataset-paths', nargs=-1)
 @ui.pass_index()
 def update_cmd(index, keys_that_can_change, dry_run, location_policy, dataset_paths, archive_less_mature):
@@ -299,6 +311,14 @@ def loc_keep(new_ds, existing_ds):
     doc_stream = ui_path_doc_stream(dataset_paths, logger=_LOG, uri=True)
     doc_stream = remap_uri_from_doc(doc_stream)

+    if archive_less_mature is not None:
+        if type(archive_less_mature) is not int:
+            click.echo('Error: millisecond delta value must be an integer')
+            sys.exit(1)
+        if archive_less_mature < 0:
+            click.echo('Error: millisecond delta value must be a non-negative integer')
+            sys.exit(1)
+
     for dataset, existing_ds in load_datasets_for_update(doc_stream, index):
         _LOG.info('Matched %s', dataset)
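The decorator change uses click's flag-with-optional-value pattern: with `is_flag=False, flag_value=500, default=None`, an omitted option yields `None`, a bare flag yields `500`, and an explicit value is passed through (hence the runtime integer checks above). A self-contained sketch of the same pattern, with a hypothetical command name and an explicit `type=int` added for clarity:

import click

@click.command()
@click.option('--archive-less-mature', is_flag=False, flag_value=500, default=None,
              type=int, help='Optional millisecond delta; a bare flag means 500.')
def demo(archive_less_mature):
    # Hypothetical command, not part of datacube:
    #   demo                              -> None
    #   demo --archive-less-mature        -> 500
    #   demo --archive-less-mature=1500   -> 1500
    click.echo(repr(archive_less_mature))

if __name__ == '__main__':
    demo()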
3 changes: 2 additions & 1 deletion docs/about/whats_new.rst
@@ -12,7 +12,8 @@ v1.8.next
   GDAL. (:pull:`1457`)
 - Fix broken pypi publishing Github action (:pull:`1454`)
 - Documentation improvements (:pull:`1455`)
-- Increase maturity leniency to +-500ms (:pull:`1458`)
+- Increase default maturity leniency to +-500ms (:pull:`1458`)
+- Add option to specify maturity timedelta when using ``--archive-less-mature`` option (:pull:`1460`)


 v1.8.13 (6th June 2023)
8 changes: 4 additions & 4 deletions integration_tests/index/test_index_data.py
@@ -96,9 +96,9 @@ def test_archive_datasets(index, local_config, ls8_eo3_dataset):

 def test_archive_less_mature(index, final_dataset, nrt_dataset):
     # case 1: add nrt then final; nrt should get archived
-    index.datasets.add(nrt_dataset, with_lineage=False, archive_less_mature=True)
+    index.datasets.add(nrt_dataset, with_lineage=False, archive_less_mature=0)
     index.datasets.get(nrt_dataset.id).is_active
-    index.datasets.add(final_dataset, with_lineage=False, archive_less_mature=True)
+    index.datasets.add(final_dataset, with_lineage=False, archive_less_mature=0)
     assert index.datasets.get(nrt_dataset.id).is_archived
     assert index.datasets.get(final_dataset.id).is_active

@@ -107,14 +107,14 @@ def test_archive_less_mature(index, final_dataset, nrt_dataset):
     assert index.datasets.get(nrt_dataset.id) is None
     with pytest.raises(ValueError):
         # should error as more mature version of dataset already exists
-        index.datasets.add(nrt_dataset, with_lineage=False, archive_less_mature=True)
+        index.datasets.add(nrt_dataset, with_lineage=False, archive_less_mature=0)


 def test_archive_less_mature_approx_timestamp(index, ga_s2am_ard3_final, ga_s2am_ard3_interim):
     # test archive_less_mature where there's a slight difference in timestamps
     index.datasets.add(ga_s2am_ard3_interim, with_lineage=False)
     index.datasets.get(ga_s2am_ard3_interim.id).is_active
-    index.datasets.add(ga_s2am_ard3_final, with_lineage=False, archive_less_mature=True)
+    index.datasets.add(ga_s2am_ard3_final, with_lineage=False, archive_less_mature=1)
     assert index.datasets.get(ga_s2am_ard3_interim.id).is_archived
     assert index.datasets.get(ga_s2am_ard3_final.id).is_active
1 change: 1 addition & 0 deletions wordlist.txt
@@ -440,6 +440,7 @@ Terria
 th
 TIF
 tif
+timedelta
 timeslice
 timeslot
 TIRS
