Remote lineage (#1401)
* Add new index "supports" flag for external lineage.

* Postgis schema for new Lineage implementation.

* Lineage tree model.

* source and derived tree properties for Dataset model.

* More lineage work - NOT STABLE!

* Fix typo.

* Fix incomplete kwargs to resolve_legacy_lineage.

* Get tests passing.

* Extend support validation tests.

* Update PK for lineage table

* Update lineage API.

* Fix import

* BatchStatus comment to docstring

* Existing tests passing - i.e. mostly typo fixes.

* Some lineage tests. TODO: cyclic dependency detection.

* More fixes and tests - currently UNSTABLE.

* Docstrings and tests.

* flake8 etc.

* More flake8/lintage.

* Update hl after minor lineage API change.

* Rethink hl API based on experience implementing lineage models.

* Fix dataset_resolver()

* Fix hl validation and tests.

* More hl test coverage.

* Lineage API coverage, and fixes to relations_diff()

* flake8age.

* More test coverage.

* flake8age.

* More test coverage.

* Fix bad indentation - thanks flake8.

* I think that must be nearly 100% coverage of lineage.py

* Default (non-implementation) of LineageResource in most drivers.  Stubs for postgis impl.

* Postgis implementation of lineage home api.

* Postgis implementation of lineage home api.

* Add home-reading API, more tests and lint-fixes.

* Flake8 over aesthetics.

* Lineage API implemented - not tested, probably broken.

* Some basic tests. Make the sense of relation pairs consistent (i.e. fix mutually-cancelling-out bugs).

* Tests, cleanup and bugfixes.

* flake8age

* Oops - debugger was still enabled.

* More test coverage.

* More test coverage of postgis driver.

* Don't delete lineage with datasets.

* Implement lineage.remove() and tests.

* Cleanup internal APIs.

* flake8age

* flake8age

* LineageTree serialisation/deserialisation methods.

* flake8age

* docstrings for postgis.drivers._api

* More docstrings and inline comments.

* Added comments to explain bad diamond test.
SpacemanPaul committed Mar 22, 2023
1 parent 31555a0 commit 4f18c07
Showing 18 changed files with 2,339 additions and 167 deletions.
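Before the diff itself, a minimal usage sketch of the new relation API (not part of the commit): it assumes `api` is an open PostgisDbAPI handle with connection setup elided, and the classifier string is arbitrary.

import uuid

from datacube.model.lineage import LineageRelation

src, derived = uuid.uuid4(), uuid.uuid4()

# "derived" was produced from "src"; the classifier labels the relationship.
rel = LineageRelation(classifier="ard", source_id=src, derived_id=derived)

# Insert-only write: with allow_updates=False, a conflicting existing row makes this return 0.
api.write_relations([rel], allow_updates=False)

# Read back every indexed relation involving either dataset.
for found in api.get_all_relations([src, derived]):
    print(found.classifier, found.source_id, found.derived_id)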
202 changes: 194 additions & 8 deletions datacube/drivers/postgis/_api.py
@@ -21,19 +21,21 @@
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy import select, text, and_, or_, func
from sqlalchemy.dialects.postgresql import INTERVAL
-from typing import Iterable, Sequence
+from sqlalchemy.exc import IntegrityError
+from typing import Iterable, Sequence, Optional, Set

from datacube.index.fields import OrExpression
from datacube.model import Range
from datacube.utils.geometry import CRS, Geometry
from datacube.utils.uris import split_uri
from datacube.index.abstract import DSID
+from datacube.model.lineage import LineageRelation, LineageDirection
from . import _core
from ._fields import parse_fields, Expression, PgField, PgExpression # noqa: F401
from ._fields import NativeField, DateDocField, SimpleDocField, UnindexableValue
from ._schema import MetadataType, Product, \
-    Dataset, DatasetSource, DatasetLocation, SelectedDatasetLocation, \
-    search_field_index_map, search_field_indexes
+    Dataset, DatasetLineage, DatasetLocation, SelectedDatasetLocation, \
+    search_field_index_map, search_field_indexes, DatasetHome
from ._spatial import geom_alchemy, generate_dataset_spatial_values, extract_geometry_from_eo3_projection
from .sql import escape_pg_identifier

@@ -501,11 +503,6 @@ def delete_dataset(self, dataset_id):
DatasetLocation.dataset_ref == dataset_id
)
)
-        self._connection.execute(
-            delete(DatasetSource).where(
-                DatasetSource.dataset_ref == dataset_id
-            )
-        )
for table in search_field_indexes.values():
self._connection.execute(
delete(table).where(table.dataset_ref == dataset_id)
@@ -1188,3 +1185,192 @@ def grant_role(self, role, users):
raise ValueError('Unknown user %r' % user)

_core.grant_role(self._connection, pg_role, users)

def insert_home(self, home, ids, allow_updates):
"""
Set home for multiple IDs (but one home value)
:param home: The home value to set
:param ids: The IDs to set it for
:param allow_updates: If False only inserts are allowed
:return: number of database records updated or added.
"""
values = [
{"dataset_ref": id_, "home": home}
for id_ in ids
]
qry = insert(DatasetHome)
if allow_updates:
qry = qry.on_conflict_do_update(
index_elements=["dataset_ref"],
set_={"home": home},
where=(DatasetHome.home != home))
try:
res = self._connection.execute(
qry,
values
)
return res.rowcount
except IntegrityError:
return 0

def delete_home(self, ids):
"""
Delete the home value for the specified IDs
:param ids: The IDs to delete home for
:return: The number of home records deleted from the database.
"""
res = self._connection.execute(
delete(DatasetHome).where(DatasetHome.dataset_ref.in_(ids))
)
return res.rowcount

def select_homes(self, ids):
"""
Find homes for IDs.
:param ids: Iterable of IDs
:return: Mapping of ID to home string for IDs found in database.
"""
results = self._connection.execute(
select(DatasetHome).where(DatasetHome.dataset_ref.in_(ids))
)
return {
row.dataset_ref: row.home
for row in results
}
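Together, insert_home/select_homes/delete_home behave like a small keyed store from dataset ID to home string. A usage sketch under the same assumption of an open PostgisDbAPI handle `api`:

import uuid

ids = [uuid.uuid4(), uuid.uuid4()]

api.insert_home("landsat", ids, allow_updates=False)   # inserts two rows
api.insert_home("sentinel", ids, allow_updates=False)  # conflicts, returns 0, rows unchanged
api.insert_home("sentinel", ids, allow_updates=True)   # upserts, re-points both rows

assert api.select_homes(ids) == {ids[0]: "sentinel", ids[1]: "sentinel"}
api.delete_home(ids)  # returns 2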

def get_all_relations(self, dsids: Iterable[uuid.UUID]) -> Iterable[LineageRelation]:
"""
Fetch all lineage relations in the database involving a set of dataset IDs.
:param dsids: Iterable of dataset IDs
:return: Iterable of LineageRelation objects.
"""
results = self._connection.execute(
select(DatasetLineage).where(or_(
DatasetLineage.derived_dataset_ref.in_(dsids),
DatasetLineage.source_dataset_ref.in_(dsids)
))
)
for rel in results:
yield LineageRelation(classifier=rel["classifier"],
source_id=rel["source_dataset_ref"],
derived_id=rel["derived_dataset_ref"])

def write_relations(self, relations: Iterable[LineageRelation], allow_updates: bool):
"""
Write a set of LineageRelation objects to the database.
:param relations: An Iterable of LineageRelation objects
:param allow_updates: if False, only allow adding new relations, not updating old ones.
:return: Count of database rows affected
"""
if allow_updates:
by_classifier = {}
for rel in relations:
db_repr = {
"derived_dataset_ref": rel.derived_id,
"source_dataset_ref": rel.source_id,
"classifier": rel.classifier
}
if rel.classifier in by_classifier:
by_classifier[rel.classifier].append(db_repr)
else:
by_classifier[rel.classifier] = [db_repr]
updates = 0
for classifier, values in by_classifier.items():
qry = insert(DatasetLineage).on_conflict_do_update(
index_elements=["derived_dataset_ref", "source_dataset_ref"],
set_={"classifier": classifier},
where=(DatasetLineage.classifier != classifier))
res = self._connection.execute(qry, values)
updates += res.rowcount
return updates
else:
values = [
{
"derived_dataset_ref": rel.derived_id,
"source_dataset_ref": rel.source_id,
"classifier": rel.classifier
}
for rel in relations
]
qry = insert(DatasetLineage)
try:
res = self._connection.execute(
qry, values
)
return res.rowcount
except IntegrityError:
return 0

def load_lineage_relations(self,
roots: Iterable[uuid.UUID],
direction: LineageDirection,
depth: int,
ids_so_far: Optional[Set[uuid.UUID]] = None) -> Iterable[LineageRelation]:
"""
Read from the database all indexed LineageRelation objects required to build all LineageTrees with
the given roots, direction and depth.
:param roots: Iterable of root dataset ids
:param direction: tree direction
:param depth: Maximum tree depth - zero indicates unlimited depth.
:param ids_so_far: Used for maintaining state through recursion - expected to be None on initial call
:return: Iterable of LineageRelation objects read from database
"""
# Naive manually-recursive initial implementation.
# TODO: Reimplement using WITH RECURSIVE query
if ids_so_far is None:
ids_so_far = set(roots)
qry = select(DatasetLineage)
if direction == LineageDirection.SOURCES:
qry = qry.where(DatasetLineage.derived_dataset_ref.in_(roots))
else:
qry = qry.where(DatasetLineage.source_dataset_ref.in_(roots))
relations = []
next_lvl_ids = set()
results = self._connection.execute(qry)
for row in results:
rel = LineageRelation(classifier=row["classifier"],
source_id=row["source_dataset_ref"],
derived_id=row["derived_dataset_ref"])
relations.append(rel)
if direction == LineageDirection.SOURCES:
next_id = rel.source_id
else:
next_id = rel.derived_id
if next_id not in ids_so_far:
next_lvl_ids.add(next_id)
ids_so_far.add(next_id)
next_depth = depth - 1
recurse = True
if depth == 0:
next_depth = 0
elif depth == 1:
recurse = False
if recurse and next_lvl_ids:
relations.extend(self.load_lineage_relations(next_lvl_ids, direction, next_depth, ids_so_far))
return relations
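The termination logic is worth spelling out: depth == 1 stops expanding after the current level, depth == 0 means unlimited, and ids_so_far is what breaks cycles. The same traversal over a plain in-memory mapping, as an illustrative sketch rather than the driver code:

from typing import Dict, Iterable, Optional, Set

def walk(graph: Dict[str, Set[str]], roots: Iterable[str], depth: int,
         seen: Optional[Set[str]] = None) -> Set[str]:
    """graph maps a derived id to its direct source ids (the SOURCES direction)."""
    if seen is None:
        seen = set(roots)
    next_lvl = set()
    for root in roots:
        for src in graph.get(root, set()):
            if src not in seen:   # the cycle guard, mirroring ids_so_far
                seen.add(src)
                next_lvl.add(src)
    if next_lvl and depth != 1:
        # depth == 0 recurses without limit; otherwise count down towards 1.
        walk(graph, next_lvl, 0 if depth == 0 else depth - 1, seen)
    return seen

# A cycle (a -> b -> a) still terminates because both ids are already in `seen`.
assert walk({"a": {"b"}, "b": {"a"}}, ["a"], depth=0) == {"a", "b"}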

def remove_lineage_relations(self,
ids: Iterable[DSID],
direction: LineageDirection) -> int:
"""
Remove lineage relations from the provided ids in the specified direction.
Note no depth parameter - depth is effectively always 1.
:param ids: Iterable of IDs to remove lineage information for.
:param direction: Remove the source or derived lineage relation records
:return: Number of relation records deleted.
"""
qry = delete(DatasetLineage)
if direction == LineageDirection.SOURCES:
qry = qry.where(DatasetLineage.derived_dataset_ref.in_(ids))
else:
qry = qry.where(DatasetLineage.source_dataset_ref.in_(ids))
results = self._connection.execute(qry)
return results.rowcount
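A final sketch of the removal semantics, reusing the hypothetical `api` handle and IDs from the first sketch; the non-SOURCES enum member is assumed here to be named LineageDirection.DERIVED:

from datacube.model.lineage import LineageDirection

# Severs the links from "derived" back to its sources (ids match on the derived side).
api.remove_lineage_relations([derived], LineageDirection.SOURCES)

# Severs the links from "src" forward to the datasets derived from it.
api.remove_lineage_relations([src], LineageDirection.DERIVED)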
35 changes: 28 additions & 7 deletions datacube/drivers/postgis/_schema.py
@@ -154,19 +154,22 @@ class DatasetLocation:


@orm_registry.mapped
-class DatasetSource:
+class DatasetLineage:
__tablename__ = "dataset_lineage"
__table_args__ = (
_core.METADATA,
-        PrimaryKeyConstraint('dataset_ref', 'classifier'),
-        UniqueConstraint('source_dataset_ref', 'dataset_ref'),
+        PrimaryKeyConstraint('source_dataset_ref', 'derived_dataset_ref'),
+        Index("ix_lin_derived_classifier", "derived_dataset_ref", "classifier"),
{
"schema": sql.SCHEMA_NAME,
"comment": "Represents a source-lineage relationship between two datasets"
}
)
-    dataset_ref = Column(postgres.UUID(as_uuid=True), nullable=False, index=True,
-                         comment="The downstream derived dataset produced from the upstream source dataset.")
+    derived_dataset_ref = Column(
+        postgres.UUID(as_uuid=True),
+        nullable=False, index=True,
+        comment="The downstream derived dataset produced from the upstream source dataset."
+    )
source_dataset_ref = Column(
postgres.UUID(as_uuid=True), nullable=False, index=True,
comment="An upstream source dataset that the downstream derived dataset was produced from."
@@ -176,6 +179,22 @@ class DatasetSource:
for a time-range summary.""")


@orm_registry.mapped
class DatasetHome:
__tablename__ = "dataset_home"
__table_args__ = (
_core.METADATA,
{
"schema": sql.SCHEMA_NAME,
"comment": "Represents an optional 'home index' for an external datasets"
}
)
dataset_ref = Column(postgres.UUID(as_uuid=True), primary_key=True,
comment="The dataset ID - no referential integrity enforced to dataset table.")
home = Column(Text, nullable=False, comment="""The 'home' index where this dataset can be found.
Not interpreted directly by ODC, provided as a convenience to database administrators.""")


class SpatialIndex:
"""
Base class for dynamically generated SpatialIndex ORM models (See _spatial.py
@@ -309,8 +328,10 @@ class DatasetSearchDateTime:
}

ALL_STATIC_TABLES = [
-    MetadataType.__table__, Product.__table__, Dataset.__table__,
-    DatasetLocation.__table__, DatasetSource.__table__, SpatialIndexRecord.__table__,
+    MetadataType.__table__, Product.__table__,
+    Dataset.__table__, DatasetLocation.__table__,
+    DatasetLineage.__table__, DatasetHome.__table__,
+    SpatialIndexRecord.__table__,
DatasetSearchString.__table__, DatasetSearchNumeric.__table__,
DatasetSearchDateTime.__table__,
]
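Because the two new tables are registered in ALL_STATIC_TABLES, anything that creates the static schema from this list picks them up automatically. A rough sketch of such a consumer, with a hypothetical connection string (the real datacube initialisation path differs):

from sqlalchemy import create_engine

engine = create_engine("postgresql+psycopg2://odc:odc@localhost/datacube")  # hypothetical DSN

# Creates dataset_lineage and dataset_home alongside the other static tables.
# Assumes the target schema (sql.SCHEMA_NAME) already exists in the database.
for table in ALL_STATIC_TABLES:
    table.create(bind=engine, checkfirst=True)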
