Cherry picked PRs from develop (1.8) (#1524)
* Update whats_new.rst for 1.8.17 release. (#1510)

* [pre-commit.ci] pre-commit autoupdate

updates:
- [github.com/adrienverge/yamllint.git: v1.32.0 → v1.33.0](https://github.com/adrienverge/yamllint.git/compare/v1.32.0...v1.33.0)

* Bump conda-incubator/setup-miniconda from 2 to 3

Bumps [conda-incubator/setup-miniconda](https://github.com/conda-incubator/setup-miniconda) from 2 to 3.
- [Release notes](https://github.com/conda-incubator/setup-miniconda/releases)
- [Changelog](https://github.com/conda-incubator/setup-miniconda/blob/main/CHANGELOG.md)
- [Commits](conda-incubator/setup-miniconda@v2...v3)

---
updated-dependencies:
- dependency-name: conda-incubator/setup-miniconda
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>

* Dataset CLI tool to find duplicates (#1517)

* add dataset cli tool to find duplicates

* convert timestamps to UTC

* update whats_new

* update memory driver search duplicates implementation

---------

Co-authored-by: Ariana Barzinpour <ariana.barzinpour@ga.gov.au>

* Make solar_date() timezone aware. (#1521)

* Warn if non-eo3 dataset has eo3 metadata type (#1523)

* warn if non-eo3 dataset has eo3 metadata type

* fix str contains

* add test

---------

Co-authored-by: Ariana Barzinpour <ariana.barzinpour@ga.gov.au>

* Fix merge oops

* Resolve some merge issues arising from differences between SQLAlchemy 1.4 and 2.0.

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Ariana-B <40238244+ariana-b@users.noreply.github.com>
Co-authored-by: Ariana Barzinpour <ariana.barzinpour@ga.gov.au>
5 people committed Dec 20, 2023
1 parent 9e31d17 commit 0122fd4
Showing 18 changed files with 316 additions and 32 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test-conda-build.yml
@@ -30,7 +30,7 @@ jobs:
${{ runner.os }}-conda-${{ env.CACHE_NUMBER }}-${{
hashFiles('conda-environment.yml') }}

- uses: conda-incubator/setup-miniconda@v2
- uses: conda-incubator/setup-miniconda@v3
with:
environment-file: conda-environment.yml
auto-update-conda: true
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -1,6 +1,6 @@
repos:
- repo: https://github.com/adrienverge/yamllint.git
rev: v1.32.0
rev: v1.33.0
hooks:
- id: yamllint
- repo: https://github.com/pre-commit/pre-commit-hooks
2 changes: 1 addition & 1 deletion datacube/api/query.py
@@ -389,7 +389,7 @@ def solar_day(dataset: Dataset, longitude: Optional[float] = None) -> np.datetim
:param longitude: If supplied correct timestamp for this longitude,
rather than mid-point of the Dataset's footprint
"""
utc = dataset.center_time
utc = dataset.center_time.astimezone(datetime.timezone.utc)

if longitude is None:
_lon = _ds_mid_longitude(dataset)
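For context, a small standalone sketch (not part of the diff) of what the `astimezone(datetime.timezone.utc)` call changes: the centre time is normalised to UTC before the longitude-based solar offset is applied. The timestamp below is made up.

```python
import datetime

# Hypothetical dataset centre time recorded with a +10:00 offset.
center_time = datetime.datetime(
    2023, 12, 20, 9, 30,
    tzinfo=datetime.timezone(datetime.timedelta(hours=10)),
)

# Normalise to UTC, as solar_day() now does, so the solar-day calculation
# always starts from a consistent reference time.
utc = center_time.astimezone(datetime.timezone.utc)
print(utc)  # 2023-12-19 23:30:00+00:00
```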
55 changes: 54 additions & 1 deletion datacube/drivers/postgis/_api.py
@@ -740,11 +740,14 @@ def search_unique_datasets(self, expressions, select_fields=None, limit=None):

def get_duplicates(self, match_fields: Sequence[PgField], expressions: Sequence[PgExpression]) -> Iterable[tuple]:
# TODO
if "time" in [f.name for f in match_fields]:
return self.get_duplicates_with_time(match_fields, expressions)

group_expressions = tuple(f.alchemy_expression for f in match_fields)
join_tables = PostgisDbAPI._join_tables(expressions, match_fields)

query = select(
func.array_agg(Dataset.id),
func.array_agg(Dataset.id).label("ids"),
*group_expressions
).select_from(Dataset)
for joins in join_tables:
@@ -759,6 +762,56 @@ def get_duplicates(self, match_fields: Sequence[PgField], expressions: Sequence[
)
return self._connection.execute(query)

def get_duplicates_with_time(
self, match_fields: Sequence[PgField], expressions: Sequence[PgExpression]
) -> Iterable[tuple]:
fields = []
for f in match_fields:
if f.name == "time":
time_field = f.expression_with_leniency
else:
fields.append(f.alchemy_expression)

join_tables = PostgisDbAPI._join_tables(expressions, match_fields)

cols = [Dataset.id, time_field.label('time'), *fields]
query = select(
*cols
).select_from(Dataset)
for joins in join_tables:
query = query.join(*joins)

query = query.where(
and_(Dataset.archived == None, *(PostgisDbAPI._alchemify_expressions(expressions)))
)

t1 = query.alias("t1")
t2 = query.alias("t2")

time_overlap = select(
t1.c.id,
text("t1.time * t2.time as time_intersect"),
*fields
).select_from(
t1.join(
t2,
and_(t1.c.time.overlaps(t2.c.time), t1.c.id != t2.c.id)
)
)

query = select(
func.array_agg(func.distinct(time_overlap.c.id)).label("ids"),
*fields,
text("(lower(time_intersect) at time zone 'UTC', upper(time_intersect) at time zone 'UTC') as time")
).select_from(
time_overlap
).group_by(
*fields, text("time_intersect")
).having(
func.count(time_overlap.c.id) > 1
)
return self._connection.execute(query)

def count_datasets(self, expressions):
"""
:type expressions: tuple[datacube.drivers.postgis._fields.PgExpression]
15 changes: 13 additions & 2 deletions datacube/drivers/postgis/_fields.py
@@ -16,6 +16,7 @@
from sqlalchemy import cast, func, and_
from sqlalchemy.dialects import postgresql as postgres
from sqlalchemy.dialects.postgresql import NUMRANGE, TSTZRANGE
from sqlalchemy.dialects.postgresql import INTERVAL
from sqlalchemy.sql import ColumnElement
from sqlalchemy.orm import aliased

@@ -223,7 +224,7 @@ def __init__(self, name, description, alchemy_column, indexed, offset=None, sele

@property
def alchemy_expression(self):
return self._alchemy_offset_value(self.offset, self.aggregation.pg_calc)
return self._alchemy_offset_value(self.offset, self.aggregation.pg_calc).label(self.name)

def __eq__(self, value):
"""
@@ -362,7 +363,7 @@ def value_to_alchemy(self, value):

@property
def alchemy_expression(self):
return self.value_to_alchemy((self.lower.alchemy_expression, self.greater.alchemy_expression))
return self.value_to_alchemy((self.lower.alchemy_expression, self.greater.alchemy_expression)).label(self.name)

def __eq__(self, value):
"""
@@ -441,6 +442,16 @@ def between(self, low, high):
raise ValueError("Unknown comparison type for date range: "
"expecting datetimes, got: (%r, %r)" % (low, high))

@property
def expression_with_leniency(self):
return func.tstzrange(
self.lower.alchemy_expression - cast('500 milliseconds', INTERVAL),
self.greater.alchemy_expression + cast('500 milliseconds', INTERVAL),
# Inclusive on both sides.
'[]',
type_=TSTZRANGE,
)


def _number_implies_year(v: Union[int, datetime]) -> datetime:
"""
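For intuition, a plain-Python sketch (not SQLAlchemy, and not part of the diff) of what the 500 ms leniency buys: two datasets whose recorded times differ by a few hundred milliseconds do not overlap as exact ranges, but do once each side is padded. All timestamps are invented.

```python
from datetime import datetime, timedelta, timezone

LENIENCY = timedelta(milliseconds=500)

def lenient(begin, end):
    # Rough equivalent of tstzrange(lower - '500 milliseconds',
    #                               upper + '500 milliseconds', '[]').
    return begin - LENIENCY, end + LENIENCY

def overlaps(a, b):
    # Inclusive range overlap check.
    return a[0] <= b[1] and b[0] <= a[1]

t = datetime(2023, 12, 20, tzinfo=timezone.utc)
range_a = (t, t)                                      # instantaneous timestamp
range_b = (t + timedelta(milliseconds=300),) * 2      # same scene, 300 ms later

print(overlaps(range_a, range_b))                      # False: exact ranges miss
print(overlaps(lenient(*range_a), lenient(*range_b)))  # True: lenient ranges overlap
```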
60 changes: 59 additions & 1 deletion datacube/drivers/postgres/_api.py
@@ -775,10 +775,13 @@ def search_unique_datasets(self, expressions, select_fields=None, limit=None):
return self._connection.execute(select_query)

def get_duplicates(self, match_fields: Iterable[PgField], expressions: Iterable[PgExpression]) -> Iterable[Tuple]:
if "time" in [f.name for f in match_fields]:
return self.get_duplicates_with_time(match_fields, expressions)

group_expressions = tuple(f.alchemy_expression for f in match_fields)

select_query = select(
func.array_agg(DATASET.c.id),
func.array_agg(DATASET.c.id).label('ids'),
*group_expressions
).select_from(
PostgresDbAPI._from_expression(DATASET, expressions, match_fields)
@@ -791,6 +794,61 @@ def get_duplicates(self, match_fields: Iterable[PgField], expressions: Iterable[
)
return self._connection.execute(select_query)

def get_duplicates_with_time(
self, match_fields: Iterable[PgField], expressions: Iterable[PgExpression]
) -> Iterable[Tuple]:
"""
If considering time when searching for duplicates, we need to grant some amount of leniency
in case timestamps are not exactly the same.
From the set of datasets that are active and have the correct product (candidates),
find all those whose extended timestamp range overlap (overlapping),
then group them by the other fields.
"""
fields = []
for f in match_fields:
if f.name == "time":
time_field = f.expression_with_leniency
else:
fields.append(f.alchemy_expression)

candidates_table = select(
DATASET.c.id,
time_field.label('time'),
*fields
).select_from(
PostgresDbAPI._from_expression(DATASET, expressions, match_fields)
).where(
and_(DATASET.c.archived == None, *(PostgresDbAPI._alchemify_expressions(expressions)))
)

t1 = candidates_table.alias("t1")
t2 = candidates_table.alias("t2")

overlapping = select(
t1.c.id,
text("t1.time * t2.time as time_intersect"),
*fields
).select_from(
t1.join(
t2,
and_(t1.c.time.overlaps(t2.c.time), t1.c.id != t2.c.id)
)
)

final_query = select(
func.array_agg(func.distinct(overlapping.c.id)).label("ids"),
*fields,
text("(lower(time_intersect) at time zone 'UTC', upper(time_intersect) at time zone 'UTC') as time")
).select_from(
overlapping
).group_by(
*fields, text("time_intersect")
).having(
func.count(overlapping.c.id) > 1
)

return self._connection.execute(final_query)

def count_datasets(self, expressions):
"""
:type expressions: tuple[datacube.drivers.postgres._fields.PgExpression]
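To make the query's intent concrete, here is a rough in-memory analogue (a sketch, not the committed code): pair up candidate rows, keep pairs whose padded time ranges overlap and whose other match values agree, and collect the ids per group. The real implementation pushes all of this into a single PostgreSQL query and also groups on the exact time intersection; the row shape below is assumed.

```python
from collections import defaultdict
from datetime import timedelta
from itertools import combinations

LENIENCY = timedelta(milliseconds=500)

def ranges_overlap(a, b):
    # Inclusive overlap after padding each range by the leniency on both sides.
    return a[0] - LENIENCY <= b[1] + LENIENCY and b[0] - LENIENCY <= a[1] + LENIENCY

def find_duplicates(candidates):
    """candidates: iterable of (dataset_id, (time_begin, time_end), *other_match_values)."""
    groups = defaultdict(set)
    for (id_a, time_a, *rest_a), (id_b, time_b, *rest_b) in combinations(candidates, 2):
        if rest_a == rest_b and ranges_overlap(time_a, time_b):
            groups[tuple(rest_a)].update({id_a, id_b})
    # Keep only groups where more than one dataset shares the values.
    return {values: ids for values, ids in groups.items() if len(ids) > 1}
```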
15 changes: 13 additions & 2 deletions datacube/drivers/postgres/_fields.py
@@ -15,6 +15,7 @@
from sqlalchemy.dialects import postgresql as postgres
from sqlalchemy.dialects.postgresql import INT4RANGE
from sqlalchemy.dialects.postgresql import NUMRANGE, TSTZRANGE
from sqlalchemy.dialects.postgresql import INTERVAL
from sqlalchemy.sql import ColumnElement

from datacube import utils
@@ -193,7 +194,7 @@ def __init__(self, name, description, alchemy_column, indexed, offset=None, sele

@property
def alchemy_expression(self):
return self._alchemy_offset_value(self.offset, self.aggregation.pg_calc)
return self._alchemy_offset_value(self.offset, self.aggregation.pg_calc).label(self.name)

def __eq__(self, value):
"""
@@ -322,7 +323,7 @@ def postgres_index_type(self):

@property
def alchemy_expression(self):
return self.value_to_alchemy((self.lower.alchemy_expression, self.greater.alchemy_expression))
return self.value_to_alchemy((self.lower.alchemy_expression, self.greater.alchemy_expression)).label(self.name)

def __eq__(self, value):
"""
@@ -431,6 +432,16 @@ def between(self, low, high):
raise ValueError("Unknown comparison type for date range: "
"expecting datetimes, got: (%r, %r)" % (low, high))

@property
def expression_with_leniency(self):
return func.tstzrange(
self.lower.alchemy_expression - cast('500 milliseconds', INTERVAL),
self.greater.alchemy_expression + cast('500 milliseconds', INTERVAL),
# Inclusive on both sides.
'[]',
type_=TSTZRANGE,
)


def _number_implies_year(v: Union[int, datetime]) -> datetime:
"""
15 changes: 15 additions & 0 deletions datacube/index/hl.py
@@ -5,6 +5,8 @@
"""
High level indexing operations/utilities
"""
import logging

import json
import toolz
from uuid import UUID
@@ -18,6 +20,8 @@
from datacube.model import LineageDirection
from .eo3 import prep_eo3, is_doc_eo3, is_doc_geo # type: ignore[attr-defined]

_LOG = logging.getLogger(__name__)


class ProductRule:
def __init__(self, product: Product, signature: Mapping[str, Any]):
@@ -145,6 +149,14 @@ def render_diff(offset, a, b):
]


def check_intended_eo3(ds: SimpleDocNav, product: Product) -> None:
# warn if it looks like dataset was meant to be eo3 but is not
if not is_doc_eo3(ds.doc) and ("eo3" in product.metadata_type.name):
_LOG.warning(f"Dataset {ds.id} has a product with an eo3 metadata type, "
"but the dataset definition does not include the $schema field "
"and so will not be recognised as an eo3 dataset.")


def resolve_no_lineage(ds: SimpleDocNav,
uri: str,
matcher: ProductMatcher,
@@ -156,6 +168,7 @@ def resolve_no_lineage(ds: SimpleDocNav,
product = matcher(doc)
except BadMatch as e:
return None, e
check_intended_eo3(ds, product)
return Dataset(product, doc, uris=[uri], sources={}), None


@@ -190,6 +203,7 @@ def resolve_with_lineage(doc: SimpleDocNav, uri: str, matcher: ProductMatcher,
if source_tree.direction == LineageDirection.DERIVED:
raise ValueError("source_tree cannot be a derived tree.")
source_tree = source_tree.find_subtree(uuid_)
check_intended_eo3(doc, product)
return Dataset(product,
doc.doc,
source_tree=source_tree,
@@ -268,6 +282,7 @@ def resolve_ds(ds: SimpleDocNav,
else:
product = matcher(doc)

check_intended_eo3(ds, product)
return with_cache(Dataset(product, doc, uris=uris, sources=sources), ds.id, cache)
try:
return remap_lineage_doc(main_ds, resolve_ds, cache={}), None
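For reference, an abbreviated, made-up dataset document showing the field the new warning is about: `is_doc_eo3()` keys off the top-level `$schema` entry (the URI shown is the ODC dataset schema identifier that eo3 documents carry), so a dataset indexed against an eo3 metadata type should include it.

```python
# Trimmed, hypothetical eo3 dataset document; all values are illustrative.
eo3_doc = {
    # Without this $schema entry the dataset is not recognised as eo3,
    # and check_intended_eo3() now logs a warning if the product's
    # metadata type name contains "eo3".
    "$schema": "https://schemas.opendatacube.org/dataset",
    "id": "5b2f2c50-e3f1-4f9a-9d3d-000000000000",
    "product": {"name": "example_product"},
    "crs": "EPSG:32655",
    "properties": {"datetime": "2023-12-20T00:00:00Z"},
}
```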
20 changes: 12 additions & 8 deletions datacube/index/memory/_datasets.py
@@ -133,34 +133,38 @@ def search_product_duplicates(self,
product: Product,
*args: Union[str, Field]
) -> Iterable[Tuple[Tuple, Iterable[UUID]]]:
field_names: List[str] = [arg.name if isinstance(arg, Field) else arg for arg in args]
# Typing note: mypy cannot handle dynamically created namedtuples
GroupedVals = namedtuple('search_result', field_names) # type: ignore[misc]

"""
Find dataset ids of a given product that have duplicates of the given set of field names.
Returns each set of those field values and the datasets that have them.
Note that this implementation does not account for slight timestamp discrepancies.
"""
def to_field(f: Union[str, Field]) -> Field:
if isinstance(f, str):
f = product.metadata_type.dataset_fields[f]
assert isinstance(f, Field), "Not a field: %r" % (f,)
return f

fields = [to_field(f) for f in args]
# Typing note: mypy cannot handle dynamically created namedtuples
GroupedVals = namedtuple('search_result', list(f.name for f in fields)) # type: ignore[misc]

def values(ds: Dataset) -> GroupedVals:
vals = []
for field in fields:
vals.append(field.extract(ds.metadata_doc)) # type: ignore[attr-defined]
return GroupedVals(*vals)

dups: Dict[Tuple, List[UUID]] = {}
dups: Dict[Tuple, set[UUID]] = {}
for ds in self.active_by_id.values():
if ds.product.name != product.name:
continue
vals = values(ds)
if vals in dups:
dups[vals].append(ds.id)
dups[vals].add(ds.id)
else:
dups[vals] = [ds.id]
return list(dups.items())
dups[vals] = set([ds.id]) # avoid duplicate entries
# only return entries with more than one dataset
return list({k: v for k, v in dups.items() if len(v) > 1})

def can_update(self,
dataset: Dataset,
9 changes: 4 additions & 5 deletions datacube/index/postgis/_datasets.py
@@ -270,15 +270,14 @@ def load_field(f: Union[str, fields.Field]) -> fields.Field:
return f

group_fields: List[fields.Field] = [load_field(f) for f in args]
result_type = namedtuple('search_result', list(f.name for f in group_fields)) # type: ignore

expressions = [product.metadata_type.dataset_fields.get('product') == product.name]

with self._db_connection() as connection:
for record in connection.get_duplicates(group_fields, expressions):
dataset_ids = set(record[0])
grouped_fields = tuple(record[1:])
yield result_type(*grouped_fields), dataset_ids
as_dict = record._asdict()
if 'ids' in as_dict.keys():
ids = as_dict.pop('ids')
yield namedtuple('search_result', as_dict.keys())(**as_dict), set(ids)

def can_update(self, dataset, updates_allowed=None):
"""
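As a usage sketch (the product and field names below are invented), duplicate search is exercised through the index API; each yielded pair is a named tuple of the matched field values plus the set of dataset ids that share them.

```python
import datacube

dc = datacube.Datacube()
product = dc.index.products.get_by_name("example_product")  # hypothetical product name

# Group datasets of this product that share region_code and have
# (leniently) overlapping time ranges.
for group_values, dataset_ids in dc.index.datasets.search_product_duplicates(
        product, "region_code", "time"):
    print(group_values, sorted(dataset_ids))
```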
