Large change to datasets.add
- Removed the verification step; it is not part of `add` any more #489
- Deprecated `sources_policy`; use `with_lineage=True|False` instead, which is
  equivalent to `ensure|skip` in the old scheme (see the sketch below). `verify` is gone
  and will be done as a separate, higher-level step #451
- Addition of lineage sources is now done in one transaction #475
- Fixed warnings for duplicate lineage datasets #477
- Duplicate lineage datasets are now processed only once #478
- Low-level `insert_dataset` no longer raises `DuplicateRecordError`; it returns False instead
- Added method to check presence of several datasets in the DB
- Don't attempt insert for datasets present in DB
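
For downstream callers the migration is mechanical: the old `sources_policy` keyword maps onto the new boolean flag. A minimal sketch of that mapping (the helper name is made up; the deprecation shim inside `DatasetResource.add` in this commit does the equivalent):

```python
# Mapping from the deprecated keyword to the new flag:
#   'ensure' -> with_lineage=True   (add lineage datasets that are missing)
#   'skip'   -> with_lineage=False  (lineage is assumed to be indexed already)
#   'verify' -> verification is no longer done inside add(); it becomes a
#               separate, higher-level step.

def add_with_old_keyword(index, dataset, sources_policy=None):
    """Hypothetical helper showing how an old call site translates to the new API."""
    if sources_policy is None:
        return index.datasets.add(dataset)  # defaults to with_lineage=True
    return index.datasets.add(dataset, with_lineage=(sources_policy != 'skip'))
```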
Kirill888 committed Jun 20, 2018
1 parent b2f566f commit 48c0502
Showing 7 changed files with 123 additions and 115 deletions.
55 changes: 31 additions & 24 deletions datacube/drivers/postgres/_api.py
@@ -18,7 +18,7 @@
from sqlalchemy import delete
from sqlalchemy import select, text, bindparam, and_, or_, func, literal, distinct
from sqlalchemy.dialects.postgresql import INTERVAL
-from sqlalchemy.dialects.postgresql import JSONB
+from sqlalchemy.dialects.postgresql import JSONB, insert
from sqlalchemy.exc import IntegrityError

from datacube.index.exceptions import DuplicateRecordError, MissingRecordError
@@ -212,30 +212,27 @@ def insert_dataset(self, metadata_doc, dataset_id, dataset_type_id):
:return: whether it was inserted
:rtype: bool
"""
-        try:
-            dataset_type_ref = bindparam('dataset_type_ref')
-            ret = self._connection.execute(
-                DATASET.insert().from_select(
-                    ['id', 'dataset_type_ref', 'metadata_type_ref', 'metadata'],
-                    select([
-                        bindparam('id'), dataset_type_ref,
-                        select([
-                            DATASET_TYPE.c.metadata_type_ref
-                        ]).where(
-                            DATASET_TYPE.c.id == dataset_type_ref
-                        ).label('metadata_type_ref'),
-                        bindparam('metadata', type_=JSONB)
-                    ])
-                ),
-                id=dataset_id,
-                dataset_type_ref=dataset_type_id,
-                metadata=metadata_doc
-            )
-            return ret.rowcount > 0
-        except IntegrityError as e:
-            if e.orig.pgcode == PGCODE_UNIQUE_CONSTRAINT:
-                raise DuplicateRecordError('Duplicate dataset, not inserting: %s' % dataset_id)
-            raise
+        dataset_type_ref = bindparam('dataset_type_ref')
+        ret = self._connection.execute(
+            insert(DATASET).from_select(
+                ['id', 'dataset_type_ref', 'metadata_type_ref', 'metadata'],
+                select([
+                    bindparam('id'), dataset_type_ref,
+                    select([
+                        DATASET_TYPE.c.metadata_type_ref
+                    ]).where(
+                        DATASET_TYPE.c.id == dataset_type_ref
+                    ).label('metadata_type_ref'),
+                    bindparam('metadata', type_=JSONB)
+                ])
+            ).on_conflict_do_nothing(
+                index_elements=['id']
+            ),
+            id=dataset_id,
+            dataset_type_ref=dataset_type_id,
+            metadata=metadata_doc
+        )
+        return ret.rowcount > 0

def update_dataset(self, metadata_doc, dataset_id, dataset_type_id):
"""
@@ -291,6 +288,16 @@ def contains_dataset(self, dataset_id):
).fetchone()
)

def datasets_intersection(self, dataset_ids):
""" Compute set intersection: db_dataset_ids & dataset_ids
"""
return [r[0]
for r in self._connection.execute(select(
[DATASET.c.id]
).where(
DATASET.c.id.in_(dataset_ids)
)).fetchall()]

def get_datasets_for_location(self, uri, mode=None):
scheme, body = _split_uri(uri)

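The change to `insert_dataset` above swaps exception handling for PostgreSQL's `INSERT ... ON CONFLICT DO NOTHING`: inserting an existing `id` is silently skipped and shows up as a rowcount of 0, which is exactly what the `ret.rowcount > 0` return value reports. A standalone sketch of the same pattern, with a toy table and an assumed connection URL (SQLAlchemy 1.x style, matching the code of the period):

```python
from sqlalchemy import Column, MetaData, String, Table, create_engine
from sqlalchemy.dialects.postgresql import insert

metadata = MetaData()
# Toy table standing in for the real dataset table.
thing = Table('thing', metadata, Column('id', String, primary_key=True))

engine = create_engine('postgresql:///example')  # assumed local database
metadata.create_all(engine)

with engine.connect() as conn:
    stmt = insert(thing).values(id='a').on_conflict_do_nothing(index_elements=['id'])
    first = conn.execute(stmt)
    second = conn.execute(stmt)             # same id again: skipped, no IntegrityError
    print(first.rowcount, second.rowcount)  # 1 0
```

Because duplicates no longer raise, `DuplicateRecordError` handling moves out of the low-level API, which is why the callers in `_datasets.py` below only check the boolean return value.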
10 changes: 5 additions & 5 deletions datacube/drivers/s3aio_index/index.py
@@ -73,15 +73,15 @@ class DatasetResource(BaseDatasetResource):
additional s3 information to specific tables.
"""

-def add(self, dataset, sources_policy='verify', **kwargs):
-    saved_dataset = super(DatasetResource, self).add(dataset, sources_policy, **kwargs)
+def add(self, dataset, with_lineage=None, **kwargs):
+    saved_dataset = super(DatasetResource, self).add(dataset, with_lineage=with_lineage, **kwargs)

if dataset.format == FORMAT:
storage_metadata = kwargs['storage_metadata'] # It's an error to not include this
self.add_datasets_to_s3_tables([dataset.id], storage_metadata)
return saved_dataset

-def add_multiple(self, datasets, sources_policy='verify'):
+def add_multiple(self, datasets, with_lineage=None):
"""Index several datasets.
Perform the normal indexing, followed by the s3 specific
@@ -91,7 +91,7 @@ def add_multiple(self, datasets, sources_policy='verify'):
:param datasets: The datasets to be indexed. It must contain
an attribute named `storage_metadata` otherwise a ValueError
is raised.
-:param str sources_policy: The sources policy.
+:param bool with_lineage: Whether to recursively add lineage, default: yes
:return: The number of datasets indexed.
:rtype: int
@@ -104,7 +104,7 @@ def add_multiple(self, datasets, sources_policy='verify'):
# dataset_refs = []
# n = 0
# for dataset in datasets.values:
-# self.add(dataset, sources_policy=sources_policy)
+# self.add(dataset, with_lineage=with_lineage)
# dataset_refs.append(dataset.id)
# n += 1
# if n == len(datasets):
123 changes: 63 additions & 60 deletions datacube/index/_datasets.py
@@ -11,6 +11,7 @@

from datacube import compat
from datacube.model import Dataset, DatasetType
from datacube.model.utils import flatten_datasets
from datacube.utils import jsonify_document, changes
from datacube.utils.changes import get_doc_changes, check_doc_unchanged
from . import fields
@@ -109,36 +110,79 @@ def has(self, id_):
with self._db.connect() as connection:
return connection.contains_dataset(id_)

-    def add(self, dataset, sources_policy='verify', **kwargs):
-        """
-        Add ``dataset`` to the index. No-op if it is already present.
-
-        :param Dataset dataset: dataset to add
-        :param str sources_policy: how should source datasets included in this dataset be handled:
-
-                ``verify``
-                    Verify that each source exists in the index, and that they are identical.
-
-                ``ensure``
-                    Add source datasets to the index if they doesn't exist.
-
-                ``skip``
-                    don't attempt to index source datasets (use when sources are already indexed)
-
-        :rtype: Dataset
-        """
-        self._add_sources(dataset, sources_policy)
-
-        _LOG.info('Indexing %s', dataset.id)
-
-        if not self._try_add(dataset):
-            existing = self.get(dataset.id)
-            if existing:
-                check_doc_unchanged(
-                    existing.metadata_doc_without_lineage(),
-                    jsonify_document(dataset.metadata_doc_without_lineage()),
-                    'Dataset {}'.format(dataset.id)
-                )
-
-        return dataset
+    def has_many(self, ids_):
+        """
+        Like `has` but operates on a list of ids.
+
+        For every supplied id check if database contains a dataset with that id.
+
+        :param [typing.Union[UUID, str]] ids_: list of dataset ids
+
+        :rtype: [bool]
+        """
+        with self._db.connect() as connection:
+            existing = set(connection.datasets_intersection(ids_))
+
+        return [x in existing for x in ids_]
+
+    def add(self, dataset, with_lineage=None, **kwargs):
+        """
+        Add ``dataset`` to the index. No-op if it is already present.
+
+        :param Dataset dataset: dataset to add
+        :param bool with_lineage: True -- attempt adding lineage if it's missing, False don't
+
+        :rtype: Dataset
+        """
+
+        if with_lineage is None:
+            policy = kwargs.pop('sources_policy', None)
+            if policy is not None:
+                _LOG.debug('Use of sources_policy is deprecated')
+                with_lineage = (policy != "skip")
+                if policy == 'verify':
+                    _LOG.debug('Verify is no longer done inside add')
+            else:
+                with_lineage = True
+
+        def add_one(ds, transaction):
+            if not transaction.insert_dataset(ds.metadata_doc_without_lineage(),
+                                              ds.id,
+                                              ds.type.id):
+                # duplicate
+                return False
+
+            for classifier, source_dataset in ds.sources.items():
+                transaction.insert_dataset_source(classifier, ds.id, source_dataset.id)
+
+            if ds.uris:
+                self._ensure_new_locations(ds, transaction=transaction)
+
+            return True
+
+        _LOG.info('Indexing %s', dataset.id)
+
+        if with_lineage:
+            ds_by_uuid, ds_by_depth = flatten_datasets(dataset, with_depth_grouping=True)
+            all_uuids = list(ds_by_uuid)
+
+            present = {k: v for k, v in zip(all_uuids, self.has_many(all_uuids))}
+
+            if present[dataset.id]:
+                _LOG.warning('Dataset %s is already in the database', dataset.id)
+                return dataset
+
+            added = set()
+
+            with self._db.begin() as transaction:
+                for dss in ds_by_depth[::-1]:
+                    dss = [ds for ds in dss if present[ds.id] is False and ds.id not in added]
+                    for ds in dss:
+                        add_one(ds, transaction)
+                        added.add(ds.id)
+        else:
+            with self._db.begin() as transaction:
+                if add_one(dataset, transaction) is False:
+                    _LOG.warning('Dataset %s is already in the database', dataset.id)
+
+        return dataset

@@ -170,23 +214,6 @@ def load_field(f):
grouped_fields = tuple(record[1:])
yield result_type(*grouped_fields), dataset_ids

-    def _add_sources(self, dataset, sources_policy='verify'):
-        if dataset.sources is None:
-            raise ValueError('Dataset has missing (None) sources. Was this loaded without include_sources=True?\n'
-                             'Note that: \n'
-                             '  sources=None means "not loaded", '
-                             '  sources={} means there are no sources (eg. raw telemetry data)')
-
-        if sources_policy == 'ensure':
-            for source in dataset.sources.values():
-                if not self.has(source.id):
-                    self.add(source, sources_policy=sources_policy)
-        elif sources_policy == 'verify':
-            for source in dataset.sources.values():
-                self.add(source, sources_policy=sources_policy)
-        elif sources_policy != 'skip':
-            raise ValueError('sources_policy must be one of ("verify", "ensure", "skip")')

def can_update(self, dataset, updates_allowed=None):
"""
Check if dataset can be updated. Return bool,safe_changes,unsafe_changes
@@ -558,30 +585,6 @@ def count_product_through_time(self, period, **query):
"""
return next(self._do_time_count(period, query, ensure_single=True))[1]

-    def _try_add(self, dataset):
-        was_inserted = False
-
-        if dataset.sources is None:
-            raise ValueError("Dataset has missing (None) sources. Was this loaded without include_sources=True?")
-
-        with self._db.begin() as transaction:
-            try:
-                was_inserted = transaction.insert_dataset(dataset.metadata_doc_without_lineage(),
-                                                          dataset.id,
-                                                          dataset.type.id)
-
-                for classifier, source_dataset in dataset.sources.items():
-                    transaction.insert_dataset_source(classifier, dataset.id, source_dataset.id)
-
-                # try to update location in the same transaction as insertion.
-                # if insertion fails we'll try updating location later
-                # if insertion succeeds the location bit can't possibly fail
-                if dataset.uris:
-                    self._ensure_new_locations(dataset, transaction=transaction)
-            except DuplicateRecordError as e:
-                _LOG.warning(str(e))
-        return was_inserted

def _get_dataset_types(self, q):
types = set()
if 'product' in q.keys():
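Taken together, the rewritten `add` above flattens the full provenance tree once, asks the database which of those datasets already exist via a single `has_many` call, and then inserts the missing ones deepest-first inside one transaction, so sources are always present before the datasets derived from them. A toy sketch of the depth-ordered traversal, using a stand-in dataset type rather than `datacube.model.Dataset` (the real `flatten_datasets` also de-duplicates shared lineage, which this sketch ignores):

```python
from collections import namedtuple

# Stand-in for datacube.model.Dataset: just an id and its source datasets.
ToyDataset = namedtuple('ToyDataset', ['id', 'sources'])

def flatten_by_depth(dataset):
    """Group a dataset and its lineage by depth, root at depth 0
    (a simplified sketch of flatten_datasets(..., with_depth_grouping=True))."""
    by_depth = []

    def visit(ds, depth):
        if depth == len(by_depth):
            by_depth.append([])
        by_depth[depth].append(ds)
        for src in ds.sources.values():
            visit(src, depth + 1)

    visit(dataset, 0)
    return by_depth

grandparent = ToyDataset('gp', {})
parent = ToyDataset('p', {'source': grandparent})
child = ToyDataset('c', {'source': parent})

# Deepest-first ordering, mirroring `for dss in ds_by_depth[::-1]` above:
for level in flatten_by_depth(child)[::-1]:
    for ds in level:
        print('insert', ds.id)  # gp, then p, then c -- sources before derived datasets
```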
2 changes: 1 addition & 1 deletion datacube/scripts/ingest.py
@@ -266,7 +266,7 @@ def _index_datasets(index, results):
extra_args['storage_metadata'] = datasets.attrs['storage_metadata']

for dataset in datasets.values:
-index.datasets.add(dataset, sources_policy='skip', **extra_args)
+index.datasets.add(dataset, with_lineage=False, **extra_args)
n += 1
return n

2 changes: 1 addition & 1 deletion datacube/ui/task_app.py
@@ -255,7 +255,7 @@ def check_existing_files(paths):

def add_dataset_to_db(index, datasets):
for dataset in datasets.values:
-index.datasets.add(dataset, sources_policy='skip')
+index.datasets.add(dataset, with_lineage=False)
_LOG.info('Dataset added')


38 changes: 19 additions & 19 deletions integration_tests/index/test_index_data.py
@@ -143,14 +143,14 @@ def test_index_duplicate_dataset(index, initialised_postgres_db, local_config, d
assert index.datasets.has(_telemetry_uuid)

# Insert again.
-    with pytest.raises(DuplicateRecordError):
-        with initialised_postgres_db.connect() as connection:
-            was_inserted = connection.insert_dataset(
-                _telemetry_dataset,
-                _telemetry_uuid,
-                dataset_type.id
-            )
-            assert not was_inserted
+    with initialised_postgres_db.connect() as connection:
+        was_inserted = connection.insert_dataset(
+            _telemetry_dataset,
+            _telemetry_uuid,
+            dataset_type.id
+        )
+        assert was_inserted is False

assert index.datasets.has(_telemetry_uuid)


@@ -170,6 +170,8 @@ def test_get_dataset(index, telemetry_dataset):
assert index.datasets.has(_telemetry_uuid)
assert index.datasets.has(str(_telemetry_uuid))

assert index.datasets.has_many([_telemetry_uuid, 'f226a278-e422-11e6-b501-185e0f80a5c0']) == [True, False]

for tr in (lambda x: x, str):
ds = index.datasets.get(tr(_telemetry_uuid))
assert ds.id == _telemetry_uuid
@@ -238,26 +240,24 @@ def test_index_dataset_with_sources(index, default_metadata_type):
child = Dataset(type_, child_doc, local_uri=None, sources={'source': parent})

with pytest.raises(MissingRecordError):
-    index.datasets.add(child, sources_policy='skip')
+    index.datasets.add(child, with_lineage=False)

-index.datasets.add(child, sources_policy='ensure')
+index.datasets.add(child)
assert index.datasets.get(parent.id)
assert index.datasets.get(child.id)

assert len(index.datasets.get_many([parent.id, child.id])) == 2

-index.datasets.add(child, sources_policy='skip')
-index.datasets.add(child, sources_policy='ensure')
-index.datasets.add(child, sources_policy='verify')
+# Deprecated property, but it should still work until we remove it completely.
+index.datasets.add(child, sources_policy='skip')
+index.datasets.add(child, with_lineage=False)
+index.datasets.add(child, with_lineage=True)

parent_doc['platform'] = {'code': 'LANDSAT_9'}
-index.datasets.add(child, sources_policy='ensure')
-index.datasets.add(child, sources_policy='skip')
+index.datasets.add(child, with_lineage=True)
+index.datasets.add(child, with_lineage=False)

-with pytest.raises(DocumentMismatchError):
-    index.datasets.add(child, sources_policy='verify')
+# backwards compatibility code path checks, don't use this in normal code
+for p in ('skip', 'ensure', 'verify'):
+    index.datasets.add(child, sources_policy=p)


# Make sure that both normal and s3aio index can handle normal data locations correctly
8 changes: 3 additions & 5 deletions tests/index/test_api_index_dataset.py
@@ -184,6 +184,9 @@ def get_dataset(self, id):
def get_locations(self, dataset):
return ['file:xxx']

def datasets_intersection(self, ids):
return [k for k in ids if k in self.dataset]

def insert_dataset_location(self, *args, **kwargs):
return

@@ -236,11 +239,6 @@ def test_index_dataset():
assert len(mock_db.dataset) == 3
assert len(mock_db.dataset_source) == 2

-ds2 = deepcopy(_EXAMPLE_NBAR_DATASET)
-ds2.metadata_doc['product_type'] = 'zzzz'
-with pytest.raises(DocumentMismatchError):
-    dataset = datasets.add(ds2)


def test_index_already_ingested_source_dataset():
mock_db = MockDb()
