Postgis cleanup #1368

Closed · wants to merge 7 commits
60 changes: 14 additions & 46 deletions datacube/index/postgis/_datasets.py
@@ -20,7 +20,6 @@
from datacube.index.postgis._transaction import IndexResourceAddIn
from datacube.model import Dataset, Product
from datacube.model.fields import Field
from datacube.model.utils import flatten_datasets
from datacube.utils import jsonify_document, _readable_offset, changes
from datacube.utils.changes import get_doc_changes
from datacube.utils.geometry import CRS, Geometry
@@ -149,56 +148,25 @@ def add(self, dataset: Dataset,
:rtype: Dataset
"""

sp_crses = self._db.spatial_indexes()

def process_bunch(dss, main_ds, transaction):
edges = []
dsids_for_spatial_indexing = []
# 1: Loop over datasets
for ds in dss:
# 1a. insert (if not already exists)
is_new = transaction.insert_dataset(ds.metadata_doc_without_lineage(), ds.id, ds.type.id)
sources = ds.sources
# 1b. Build edge graph for new datasets
if is_new and sources is not None:
edges.extend((name, ds.id, src.id)
for name, src in sources.items())
# 1c. Prepare spatial index extents
if is_new:
dsids_for_spatial_indexing.append(ds.id)
# 2: insert lineage graph edges
for ee in edges:
transaction.insert_dataset_source(*ee)
# 3: insert spatial indexes
transaction.update_spindex(dsids=dsids_for_spatial_indexing)
# 4: insert search fields
transaction.update_search_index(dsids=dsids_for_spatial_indexing)
# Finally update location for top-level dataset only
if main_ds.uris is not None:
self._ensure_new_locations(main_ds, transaction=transaction)

_LOG.info('Indexing %s', dataset.id)

if with_lineage:
ds_by_uuid = flatten_datasets(dataset)
all_uuids = list(ds_by_uuid)

present = {k: v for k, v in zip(all_uuids, self.bulk_has(all_uuids))}
raise ValueError("Lineage is not yet supported by the postgis driver")

if present[dataset.id]:
_LOG.warning('Dataset %s is already in the database', dataset.id)
return dataset

dss = [ds for ds in [dss[0] for dss in ds_by_uuid.values()] if not present[ds.id]]
else:
if self.has(dataset.id):
_LOG.warning('Dataset %s is already in the database', dataset.id)
return dataset
_LOG.info('Indexing %s', dataset.id)

dss = [dataset]
if self.has(dataset.id):
_LOG.warning('Dataset %s is already in the database', dataset.id)
return dataset

with self._db_connection(transaction=True) as transaction:
process_bunch(dss, dataset, transaction)
# 1a. insert (if not already exists)
is_new = transaction.insert_dataset(dataset.metadata_doc_without_lineage(), dataset.id, dataset.type.id)
if is_new:
# 1b. Prepare spatial index extents
transaction.update_spindex(dsids=[dataset.id])
transaction.update_search_index(dsids=[dataset.id])
# 1c. Store locations
if dataset.uris is not None:
self._ensure_new_locations(dataset, transaction=transaction)

return dataset
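
With this change the postgis driver's add() indexes one dataset at a time, and passing with_lineage=True raises a ValueError rather than pretending to index a lineage tree the driver cannot store. A minimal usage sketch (not part of the diff; it assumes a postgis-backed environment named "datacube" and an EO3 metadata document dataset_doc plus its location uri already in hand):

    from datacube import Datacube
    from datacube.index.hl import Doc2Dataset

    dc = Datacube(env="datacube")          # assumed environment name
    resolver = Doc2Dataset(dc.index, verify_lineage=False)
    ds, err = resolver(dataset_doc, uri)   # dataset_doc / uri are placeholders
    assert err is None and ds is not None

    # Lineage is not supported by this driver, so disable it explicitly:
    dc.index.datasets.add(ds, with_lineage=False)

    # dc.index.datasets.add(ds, with_lineage=True) would now raise:
    #   ValueError: Lineage is not yet supported by the postgis driver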

2 changes: 1 addition & 1 deletion datacube/scripts/dataset.py
@@ -202,7 +202,7 @@ def run_it(dataset_paths):
dss = dataset_stream(doc_stream, ds_resolve)
index_datasets(dss,
index,
auto_add_lineage=auto_add_lineage,
auto_add_lineage=auto_add_lineage and not confirm_ignore_lineage,
dry_run=dry_run)

# If outputting directly to terminal, show a progress bar.
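The one-line change above means the CLI only attempts lineage auto-matching when the user has not passed --confirm-ignore-lineage, which is why the integration tests below add that flag when indexing against the postgis driver. A sketch of the effective flag logic (the helper function is illustrative, not part of the codebase):

    def effective_auto_add_lineage(auto_add_lineage: bool, confirm_ignore_lineage: bool) -> bool:
        # Lineage is auto-added only when it was requested AND the user has not
        # explicitly confirmed that lineage should be ignored.
        return auto_add_lineage and not confirm_ignore_lineage

    assert effective_auto_add_lineage(True, False) is True
    assert effective_auto_add_lineage(True, True) is False
    assert effective_auto_add_lineage(False, True) is False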
2 changes: 2 additions & 0 deletions docs/about/whats_new.rst
@@ -15,7 +15,9 @@ v1.8.next
- Fix database relationship diagram instruction for docker (:pull:`1362`)
- Document ``group_by`` for ``dataset.load`` (:pull:`1364`)
- Add search_by_metadata facility for products (:pull:`1366`)
- Postgis driver cleanup - remove faux support for lineage (:pull:`1368`)
- Add support for nested database transactions (:pull:`1369`)
- Fix Github doc lint action (:pull:`1370`)

v1.8.9 (17 November 2022)
=========================
3 changes: 2 additions & 1 deletion integration_tests/conftest.py
@@ -169,7 +169,7 @@ def doc_to_ds(index, product_name, ds_doc, ds_path):
resolver = Doc2Dataset(index, products=[product_name], verify_lineage=False)
ds, err = resolver(ds_doc, ds_path)
assert err is None and ds is not None
index.datasets.add(ds)
index.datasets.add(ds, with_lineage=False)
return index.datasets.get(ds.id)


@@ -663,5 +663,6 @@ def dataset_add_configs():
datasets_bad1=str(B / 'datasets_bad1.yml'),
datasets_no_id=str(B / 'datasets_no_id.yml'),
datasets_eo3=str(B / 'datasets_eo3.yml'),
datasets_eo3_updated=str(B / 'datasets_eo3_updated.yml'),
datasets=str(B / 'datasets.yml'),
empty_file=str(B / 'empty_file.yml'))
46 changes: 46 additions & 0 deletions integration_tests/data/dataset_add/datasets_eo3_updated.yml
@@ -0,0 +1,46 @@
---
# change capture time
$schema: https://schemas.opendatacube.org/dataset
id: 7d41a4d0-2ab3-4da1-a010-ef48662ae8ef
product:
  name: eo3_test

location: "http://example.com/a.yml"

crs: "epsg:3857"
properties:
  datetime: 2020-04-20 00:27:43Z
  odc:processing_datetime: 2020-05-16 10:56:18Z

grids:
  default:
    shape: [100, 200]
    transform: [10, 0, 100000, 0, -10, 200000, 0, 0, 1]
lineage:
  a: [f80c30a5-1036-5607-a62f-fde5e3fec985]
  bc:
    - fb077e47-f62e-5869-9bd1-03584c2d7380
    - 13d3d75a-1d90-5ec0-8b86-e8be78275660

---
# No changes
$schema: https://schemas.opendatacube.org/dataset
id: f884df9b-4458-47fd-a9d2-1a52a2db8a1a
product:
  name: eo3_test

crs: "epsg:32660"
properties:
  datetime: 2020-04-20 00:26:43Z
  odc:processing_datetime: 2020-05-16 10:56:18Z

grids:
  default:
    shape: [7811, 7691]
    transform: [30, 0, 618285, 0, -30, -1642485, 0, 0, 1]
  pan:
    shape: [15621, 15381]
    transform: [15, 0, 618292.5, 0, -15, -1642492.5, 0, 0, 1]
lineage: {}

...
1 change: 1 addition & 0 deletions integration_tests/index/test_config_docs.py
@@ -183,6 +183,7 @@ def test_idempotent_add_dataset_type(index, ls5_telem_type, ls5_telem_doc):
# TODO: Support for adding/changing search fields?


@pytest.mark.parametrize('datacube_env_name', ('datacube', ))
def test_update_dataset(index, ls5_telem_doc, example_ls5_nbar_metadata_doc):
"""
:type index: datacube.index.Index
18 changes: 9 additions & 9 deletions integration_tests/index/test_index_data.py
@@ -284,18 +284,18 @@ def test_transactions_api_ctx_mgr(index,
with pytest.raises(Exception) as e:
with index.transaction() as trans:
assert index.datasets.get(ds1.id) is None
index.datasets.add(ds1)
index.datasets.add(ds1, False)
assert index.datasets.get(ds1.id) is not None
raise Exception("Rollback!")
assert "Rollback!" in str(e.value)
assert index.datasets.get(ds1.id) is None
with index.transaction() as trans:
assert index.datasets.get(ds1.id) is None
index.datasets.add(ds1)
index.datasets.add(ds1, False)
assert index.datasets.get(ds1.id) is not None
assert index.datasets.get(ds1.id) is not None
with index.transaction() as trans:
index.datasets.add(ds2)
index.datasets.add(ds2, False)
assert index.datasets.get(ds2.id) is not None
raise trans.rollback_exception("Rollback")
assert index.datasets.get(ds1.id) is not None
@@ -345,17 +345,17 @@ def test_transactions_api_manual(index,
ds1, err = resolver(*eo3_ls8_dataset_doc)
ds2, err = resolver(*eo3_ls8_dataset2_doc)
trans = index.transaction()
index.datasets.add(ds1)
index.datasets.add(ds1, False)
assert index.datasets.get(ds1.id) is not None
trans.begin()
index.datasets.add(ds2)
index.datasets.add(ds2, False)
assert index.datasets.get(ds1.id) is not None
assert index.datasets.get(ds2.id) is not None
trans.rollback()
assert index.datasets.get(ds1.id) is not None
assert index.datasets.get(ds2.id) is None
trans.begin()
index.datasets.add(ds2)
index.datasets.add(ds2, False)
trans.commit()
assert index.datasets.get(ds1.id) is not None
assert index.datasets.get(ds2.id) is not None
@@ -372,18 +372,18 @@ def test_transactions_api_hybrid(index,
ds2, err = resolver(*eo3_ls8_dataset2_doc)
with index.transaction() as trans:
assert index.datasets.get(ds1.id) is None
index.datasets.add(ds1)
index.datasets.add(ds1, False)
assert index.datasets.get(ds1.id) is not None
trans.rollback()
assert index.datasets.get(ds1.id) is None
trans.begin()
assert index.datasets.get(ds1.id) is None
index.datasets.add(ds1)
index.datasets.add(ds1, False)
assert index.datasets.get(ds1.id) is not None
trans.commit()
assert index.datasets.get(ds1.id) is not None
trans.begin()
index.datasets.add(ds2)
index.datasets.add(ds2, False)
assert index.datasets.get(ds2.id) is not None
trans.rollback()
assert index.datasets.get(ds1.id) is not None
2 changes: 1 addition & 1 deletion integration_tests/index/test_postgis_index.py
@@ -32,7 +32,7 @@ def test_spatial_index_maintain(index: Index, ls8_eo3_product, eo3_ls8_dataset_d
resolver = Doc2Dataset(index, products=[ls8_eo3_product.name], verify_lineage=False)
ds, err = resolver(*eo3_ls8_dataset_doc)
assert err is None and ds is not None
ds = index.datasets.add(ds)
ds = index.datasets.add(ds, False)
assert ds
index.datasets.archive([ds.id])
index.datasets.purge([ds.id])
7 changes: 2 additions & 5 deletions integration_tests/test_3d.py
@@ -260,9 +260,6 @@ def test_indexing(clirunner, index, product_def):
clirunner(["-v", "dataset", "add", "--dry-run", str(index_yaml)])

# - do actual indexing
clirunner(["-v", "dataset", "add", str(index_yaml)])

# - this will be no-op but with ignore lineage
clirunner(
[
"-v",
@@ -318,7 +315,7 @@ def test_indexing_with_spectral_map(clirunner, index, dataset_types):
clirunner(["-v", "product", "add", str(dataset_types)])

# Index the Dataset
clirunner(["-v", "dataset", "add", str(index_yaml)])
clirunner(["-v", "dataset", "add", '--confirm-ignore-lineage', str(index_yaml)])
dc = Datacube(index=index)
check_open_with_dc_simple(dc, product_def, [product_id], measurement)

@@ -338,7 +335,7 @@ def test_end_to_end_multitime(clirunner, index, product_def, original_data):
measurement=measurement,
)
# Index the Datasets
clirunner(["-v", "dataset", "add", str(index_yaml)])
clirunner(["-v", "dataset", "add", '--confirm-ignore-lineage', str(index_yaml)])

if idx == 0: # Full check for the first measurement only
# Check data for all product IDs
17 changes: 11 additions & 6 deletions integration_tests/test_cli_output.py
@@ -75,7 +75,7 @@ def test_cli_dataset_subcommand(index_empty, clirunner, dataset_add_configs):
# Expect to fail with legacy datasets
clirunner(['dataset', 'add', dataset_add_configs.datasets])
# Use EO3 datasets to allow subsequent tests to run.
clirunner(['dataset', 'add', dataset_add_configs.datasets_eo3])
result = clirunner(['dataset', 'add', "--confirm-ignore-lineage", dataset_add_configs.datasets_eo3])

runner = clirunner(['dataset', 'archive'], verbose_flag=False, expect_success=False)
assert "Completed dataset archival." not in runner.output
@@ -93,7 +93,6 @@ def test_cli_dataset_subcommand(index_empty, clirunner, dataset_add_configs):
assert "Usage: [OPTIONS] [IDS]" in runner.output
assert "Restore datasets" in runner.output
assert runner.exit_code == 1

runner = clirunner(['dataset', 'restore', "--all"], verbose_flag=False)
assert "restoring" in runner.output
assert "Usage: [OPTIONS] [IDS]" not in runner.output
@@ -129,10 +128,16 @@ def test_readd_and_update_metadata_product_dataset_command(index_empty, clirunne
update = clirunner(['product', 'update', dataset_add_configs.products])
assert "WARNING No changes detected for product" in update.output

clirunner(['dataset', 'add', dataset_add_configs.datasets_eo3])
rerun_add = clirunner(['dataset', 'add', dataset_add_configs.datasets_eo3])
# Update before add
update = clirunner(['dataset', 'update', dataset_add_configs.datasets_eo3])
assert "No such dataset in the database" in update.output
assert "Failure while processing" in update.output

clirunner(['dataset', 'add', '--confirm-ignore-lineage', dataset_add_configs.datasets_eo3])
rerun_add = clirunner(['dataset', 'add', '--confirm-ignore-lineage', dataset_add_configs.datasets_eo3])
assert "WARNING Dataset" in rerun_add.output
assert "is already in the database" in rerun_add.output

update = clirunner(['dataset', 'update', dataset_add_configs.datasets_eo3])
assert "1 successful, 0 failed" in update.output
update = clirunner(['dataset', 'update', '--allow-any', 'properties.datetime',
dataset_add_configs.datasets_eo3_updated])
assert "2 successful, 0 failed" in update.output
27 changes: 27 additions & 0 deletions tests/index/test_postgis_fields.py
@@ -0,0 +1,27 @@
import datetime
from decimal import Decimal

from datacube.drivers.postgis._fields import NumericDocField, IntDocField, DoubleDocField, DateDocField
from datacube.drivers.postgis._schema import Dataset


def test_numeric_parse():
    fld = NumericDocField("test_fld", "field for testing", Dataset.metadata_doc, True)
    assert isinstance(fld.parse_value("55.88"), Decimal)


def test_int_parse():
    fld = IntDocField("test_fld", "field for testing", Dataset.metadata_doc, True)
    assert fld.parse_value("55") == 55


def test_float_parse():
    fld = DoubleDocField("test_fld", "field for testing", Dataset.metadata_doc, True)
    assert isinstance(fld.parse_value("55.88"), float)


def test_date_parse():
    fld = DateDocField("test_fld", "field for testing", Dataset.metadata_doc, True)
    assert fld.parse_value("2020-07-22T14:45:22.452434+0000") == datetime.datetime(
        2020, 7, 22, 14, 45, 22, 452434, tzinfo=datetime.timezone.utc
    )