Search returning new args #1557

Merged
merged 15 commits into from Mar 4, 2024
Changes from all commits
1 change: 1 addition & 0 deletions .github/workflows/main.yml
@@ -179,3 +179,4 @@ jobs:
with:
file: ./coverage.xml
fail_ci_if_error: false
token: ${{ secrets.CODECOV_TOKEN }}
27 changes: 15 additions & 12 deletions datacube/drivers/postgis/_api.py
@@ -145,6 +145,15 @@ def get_native_fields():
return fields


def mk_simple_offset_field(field_name, description, offset):
return SimpleDocField(
name=field_name, description=description,
alchemy_column=Dataset.metadata_doc,
indexed=False,
offset=offset
)


def get_dataset_fields(metadata_type_definition):
dataset_section = metadata_type_definition['dataset']

@@ -158,19 +167,13 @@ def get_dataset_fields(metadata_type_definition):
False,
offset=dataset_section.get('creation_dt') or ['creation_dt']
),
format=SimpleDocField(
'format',
'File format (GeoTiff, NetCDF)',
Dataset.metadata_doc,
False,
offset=dataset_section.get('format') or ['format', 'name']
format=mk_simple_offset_field(
'format', 'File format (GeoTiff, NetCDF)',
dataset_section.get('format') or ['format', 'name']
),
label=SimpleDocField(
'label',
'Label',
Dataset.metadata_doc,
False,
offset=dataset_section.get('label') or ['label']
label=mk_simple_offset_field(
'label', 'Label',
dataset_section.get('label') or ['label']
),
))

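The new mk_simple_offset_field helper is shorthand for the repeated SimpleDocField construction it replaces. A hedged equivalence sketch using the label field from the hunk above (imports taken from the postgis driver modules touched in this PR):

from datacube.drivers.postgis._api import mk_simple_offset_field
from datacube.drivers.postgis._fields import SimpleDocField
from datacube.drivers.postgis._schema import Dataset

# The helper call...
label_a = mk_simple_offset_field('label', 'Label', ['label'])

# ...builds the same kind of field as the longhand form it replaces.
label_b = SimpleDocField(
    name='label', description='Label',
    alchemy_column=Dataset.metadata_doc,
    indexed=False,
    offset=['label'],
)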
11 changes: 10 additions & 1 deletion datacube/index/abstract.py
@@ -1751,8 +1751,10 @@ def search_by_product(self,
@abstractmethod
def search_returning(self,
field_names: Iterable[str] | None = None,
custom_offsets: Mapping[str, Offset] | None = None,
limit: int | None = None,
archived: bool | None = False,
order_by: str | Field | None = None,
**query: QueryField
) -> Iterable[tuple]:
"""
@@ -1762,11 +1764,18 @@ def search_returning(self,

It also allows for returning rows other than datasets, such as a row per uri when requesting field 'uri'.

:param field_names: Names of desired fields (default = all known search fields)
:param field_names: Names of desired fields (default = all known search fields, unless custom_offsets is set,
see below)
:param custom_offsets: A dictionary of offsets in the metadata doc for custom fields. Custom offsets
are returned in addition to fields named in field_names. Default is
None (field_names only). If field_names is None and custom_offsets is provided,
only the custom offsets are included, overriding the normal field_names default.
:param limit: Limit number of datasets (None/default = unlimited)
:param archived: False (default): Return active datasets only.
None: Include archived and active datasets.
True: Return archived datasets only.
:param order_by: a field name or field by which to sort output. None is unsorted and may allow faster return
of first result depending on the index driver's implementation.
:param query: search query parameters
:return: Namedtuple of requested fields, for each matching dataset.
"""
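A hedged usage sketch of the extended API described in the docstring above (the product name and metadata-document offset are illustrative, not taken from this PR):

from datacube import Datacube

dc = Datacube()

# Request two normal search fields plus one ad-hoc value pulled straight from
# the dataset metadata document via an offset (EO3-style path shown here).
rows = dc.index.datasets.search_returning(
    field_names=("id", "time"),
    custom_offsets={"cloud_cover": ["properties", "eo:cloud_cover"]},
    product="example_product",
    limit=10,
)
for row in rows:
    # Each row is a namedtuple with one attribute per requested field,
    # including the custom offset.
    print(row.id, row.time, row.cloud_cover)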
33 changes: 27 additions & 6 deletions datacube/index/memory/_datasets.py
@@ -24,6 +24,7 @@
from datacube.index.fields import Field
from datacube.index.memory._fields import build_custom_fields, get_dataset_fields
from datacube.model import Dataset, LineageRelation, Product, Range, ranges_overlap
from datacube.model.fields import SimpleField
from datacube.utils import jsonify_document, _readable_offset
from datacube.utils import changes
from datacube.utils.changes import AllowPolicy, Change, Offset, get_doc_changes
@@ -103,7 +104,7 @@ def add(self, dataset: Dataset,
if dataset.product.name in self._by_product:
self._by_product[dataset.product.name].add(dataset.id)
else:
self._by_product[dataset.product.name] = set([dataset.id])
self._by_product[dataset.product.name] = {dataset.id}
if archive_less_mature is not None:
_LOG.warning("archive-less-mature functionality is not implemented for memory driver")
return cast(Dataset, self.get(dataset.id))
@@ -592,20 +593,40 @@ def search_by_product(self,

def search_returning(self,
field_names: Iterable[str] | None = None,
custom_offsets: Mapping[str, Offset] | None = None,
limit: Optional[int] = None,
archived: bool | None = False,
order_by: str | Field | None = None,
**query: QueryField) -> Iterable[Tuple]:
if field_names is None:
field_names = self._index.products.get_field_names()
# Note that this implementation relies on dictionaries being ordered by insertion - this has been the case
# since Py3.6, and officially guaranteed behaviour since Py3.7.
if order_by:
raise ValueError("order_by argument is not currently supported by the memory index driver.")
if field_names is None and custom_offsets is None:
field_name_d = {f: None for f in self._index.products.get_field_names()}
elif field_names:
field_name_d = {f: None for f in field_names}
else:
field_names = list(field_names)
field_name_d = {}

if custom_offsets:
custom_fields = {
name: SimpleField(offset, lambda x: x, "any", name=name, description="")
for name, offset in custom_offsets.items()
}
for name in custom_fields:
field_name_d[name] = None
else:
custom_fields = {}

# Typing note: mypy can't handle dynamically created namedtuples
result_type = namedtuple('search_result', field_names) # type: ignore[misc]
result_type = namedtuple('search_result', field_name_d.keys()) # type: ignore[misc]
for ds in self.search(limit=limit, archived=archived, **query): # type: ignore[arg-type]
ds_fields = get_dataset_fields(ds.metadata_type.definition)
ds_fields.update(custom_fields)
result_vals = {
fn: ds_fields[fn].extract(ds.metadata_doc) if fn in ds_fields else None
for fn in field_names
for fn in field_name_d.keys()
}
yield result_type(**result_vals)

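A small illustration of the ordered, de-duplicated field list the memory driver builds above (relying on insertion-ordered dicts, guaranteed since Python 3.7; the field names are illustrative):

from collections import namedtuple

field_name_d = {f: None for f in ("id", "time")}   # named search fields first
field_name_d["cloud_cover"] = None                  # custom offsets appended after
result_type = namedtuple("search_result", field_name_d.keys())
print(result_type._fields)                          # ('id', 'time', 'cloud_cover')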
5 changes: 4 additions & 1 deletion datacube/index/null/_datasets.py
@@ -146,7 +146,10 @@ def search(self, limit=None, archived=False, **query):
def search_by_product(self, archived=False, **query):
return []

def search_returning(self, field_names=None, limit=None, archived=False, **query):
def search_returning(self,
field_names=None, custom_offsets=None,
limit=None, archived=False, order_by=None,
**query):
return []

def count(self, archived=False, **query):
49 changes: 40 additions & 9 deletions datacube/index/postgis/_datasets.py
@@ -18,7 +18,7 @@

from datacube.drivers.postgis._fields import SimpleDocField
from datacube.drivers.postgis._schema import Dataset as SQLDataset, search_field_map
from datacube.drivers.postgis._api import non_native_fields, extract_dataset_fields
from datacube.drivers.postgis._api import non_native_fields, extract_dataset_fields, mk_simple_offset_field
from datacube.utils.uris import split_uri
from datacube.drivers.postgis._spatial import generate_dataset_spatial_values, extract_geometry_from_eo3_projection
from datacube.migration import ODC2DeprecationWarning
@@ -28,7 +28,7 @@
from datacube.model import Dataset, Product, Range, LineageTree
from datacube.model.fields import Field
from datacube.utils import jsonify_document, _readable_offset, changes
from datacube.utils.changes import get_doc_changes
from datacube.utils.changes import get_doc_changes, Offset
from odc.geo import CRS, Geometry
from datacube.index import fields

@@ -651,7 +651,13 @@ def search_by_product(self, archived: bool | None = False, **query):
for product, datasets in self._do_search_by_product(query, archived=archived):
yield product, self._make_many(datasets, product)

def search_returning(self, field_names=None, limit=None, archived: bool | None = False, **query):
def search_returning(self,
field_names=None,
custom_offsets: Mapping[str, Offset] | None = None,
limit=None,
archived: bool | None = False,
order_by: str | Field | None = None,
**query):
"""
Perform a search, returning only the specified fields.

@@ -664,21 +670,42 @@ def search_returning(self, field_names=None, limit=None, archived: bool | None =
:param int limit: Limit number of datasets
:returns __generator[tuple]: sequence of results, each result is a namedtuple of your requested fields
"""
if field_names is None:
if order_by:
raise ValueError("order_by argument is not yet supported by the postgis index driver.")
if field_names is None and custom_offsets is None:
field_names = self._index.products.get_field_names()
elif field_names:
field_names = list(field_names)
else:
field_names = []
field_name_set = set(field_names)
if custom_offsets:
custom_fields = {
name: mk_simple_offset_field(name, name, offset)
for name, offset in custom_offsets.items()
}
for name in custom_fields:
if name not in field_name_set:
field_name_set.add(name)
field_names.append(name)
else:
custom_fields = {}

result_type = namedtuple('search_result', field_names)

for _, results in self._do_search_by_product(query,
return_fields=True,
select_field_names=field_names,
additional_fields=custom_fields,
limit=limit,
archived=archived):
for columns in results:
coldict = columns._asdict()
kwargs = {
field: coldict.get(field)
for field in field_names
}

def extract_field(f):
# Custom fields are not type-aware and returned as stringified json.
return json.loads(coldict.get(f)) if f in custom_fields else coldict.get(f)
kwargs = {f: extract_field(f) for f in field_names}
yield result_type(**kwargs)

def count(self, archived: bool | None = False, **query):
@@ -737,7 +764,9 @@ def _get_product_queries(self, query):
yield q, product

# pylint: disable=too-many-locals
def _do_search_by_product(self, query, return_fields=False, select_field_names=None,
def _do_search_by_product(self, query, return_fields=False,
additional_fields: Mapping[str, Field] | None = None,
select_field_names=None,
with_source_ids=False, source_filter=None, limit=None,
archived: bool | None = False):
assert not with_source_ids
@@ -758,6 +787,8 @@ def _do_search_by_product(self, query, return_fields=False, select_field_names=N
assert "lon" not in q

dataset_fields = product.metadata_type.dataset_fields
if additional_fields:
dataset_fields.update(additional_fields)
query_exprs = tuple(fields.to_expressions(dataset_fields.get, **q))
select_fields = None
if return_fields:
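An illustration of the custom-field decoding in extract_field above: values selected via a raw metadata offset come back from the database as JSON strings, so they are decoded before being packed into the result namedtuple (the row values here are made up):

import json

coldict = {"id": "a-dataset-id", "cloud_cover": "23.5"}   # illustrative row
custom_fields = {"cloud_cover"}                           # fields built from custom_offsets

def extract_field(f):
    return json.loads(coldict.get(f)) if f in custom_fields else coldict.get(f)

print(extract_field("id"))           # 'a-dataset-id' (unchanged)
print(extract_field("cloud_cover"))  # 23.5 (decoded to a float)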
72 changes: 54 additions & 18 deletions datacube/index/postgres/_datasets.py
@@ -24,7 +24,7 @@
from datacube.model.fields import Field
from datacube.model.utils import flatten_datasets
from datacube.utils import jsonify_document, _readable_offset, changes
from datacube.utils.changes import get_doc_changes
from datacube.utils.changes import get_doc_changes, Offset
from datacube.index import fields
from datacube.drivers.postgres._api import split_uri
from datacube.migration import ODC2DeprecationWarning
@@ -570,11 +570,14 @@ def _make(self, dataset_res, full_info=False, product=None):
**kwargs
)

def _make_many(self, query_result, product=None):
def _make_many(self, query_result, product=None, fetch_all: bool = False):
"""
:rtype list[Dataset]
"""
return (self._make(dataset, product=product) for dataset in query_result)
if fetch_all:
return [self._make(dataset, product=product) for dataset in query_result]
else:
return (self._make(dataset, product=product) for dataset in query_result)

def search_by_metadata(self, metadata, archived: bool | None = False):
"""
@@ -624,7 +627,13 @@ def search_by_product(self, archived: bool | None = False, **query):
for product, datasets in self._do_search_by_product(query, archived=archived):
yield product, self._make_many(datasets, product)

def search_returning(self, field_names=None, limit=None, archived: bool | None = False, **query):
def search_returning(self,
field_names=None,
custom_offsets: Mapping[str, Offset] | None = None,
limit=None,
archived: bool | None = False,
order_by: str | Field | None = None,
**query):
"""
Perform a search, returning only the specified fields.

@@ -637,21 +646,44 @@ def search_returning(self, field_names=None, limit=None, archived: bool | None =
:param int limit: Limit number of datasets
:returns __generator[tuple]: sequence of results, each result is a namedtuple of your requested fields
"""
if field_names is None:
if order_by:
raise ValueError("order_by argument is not currently supported by the postgres index driver.")
if field_names is None and custom_offsets is None:
field_names = self._index.products.get_field_names()
result_type = namedtuple('search_result', field_names)
elif field_names:
field_names = list(field_names)
else:
field_names = []
field_name_set = set(field_names)
if custom_offsets:
custom_fields = {
name: SimpleDocField(
name=name, description="",
alchemy_column=DATASET.c.metadata, indexed=False,
offset=offset)
for name, offset in custom_offsets.items()
}
for name in custom_fields:
if name not in field_name_set:
field_name_set.add(name)
field_names.append(name)
else:
custom_fields = {}

for _, results in self._do_search_by_product(query,
return_fields=True,
select_field_names=field_names,
limit=limit,
archived=archived):
for columns in results:
result_type = namedtuple('search_result', field_names)
for _, p_results in self._do_search_by_product(query,
return_fields=True,
select_field_names=field_names,
additional_fields=custom_fields,
limit=limit,
archived=archived):
for columns in p_results:
coldict = columns._asdict()
kwargs = {
field: coldict.get(field)
for field in field_names
}

def extract_field(f):
# Custom fields are not type-aware and returned as stringified json.
return json.loads(coldict.get(f)) if f in custom_fields else coldict.get(f)
kwargs = {f: extract_field(f) for f in field_names}
yield result_type(**kwargs)

def count(self, archived: bool | None = False, **query):
@@ -722,7 +754,9 @@ def _get_product_queries(self, query):
yield q, product

# pylint: disable=too-many-locals
def _do_search_by_product(self, query, return_fields=False, select_field_names=None,
def _do_search_by_product(self, query, return_fields=False,
additional_fields: Mapping[str, Field] | None = None,
select_field_names=None,
with_source_ids=False, source_filter=None,
limit=None,
archived: bool | None = False):
@@ -749,7 +783,9 @@ def _do_search_by_product(self, query, return_fields=False, select_field_names=N
raise ValueError(f"No such product: {product}")

for q, product in product_queries:
dataset_fields = product.metadata_type.dataset_fields
dataset_fields = product.metadata_type.dataset_fields.copy()
if additional_fields:
dataset_fields.update(additional_fields)
query_exprs = tuple(fields.to_expressions(dataset_fields.get, **q))
select_fields = None
if return_fields:
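A note on the .copy() in _do_search_by_product above: the per-call custom fields are merged into a copy, which keeps them from leaking into the product's shared dataset_fields mapping. A minimal sketch with stand-in values, not driver code:

base = {"id": "id-field", "time": "time-field"}   # stands in for metadata_type.dataset_fields
merged = base.copy()
merged.update({"cloud_cover": "custom-field"})    # per-call additional_fields

print("cloud_cover" in merged)   # True  - visible to this query only
print("cloud_cover" in base)     # False - shared mapping untouched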
3 changes: 3 additions & 0 deletions datacube/model/fields.py
@@ -14,6 +14,9 @@

# Allowed values for field 'type' (specified in a metadata type document)
_AVAILABLE_TYPE_NAMES = (
# Unrestricted type - handy for dynamically creating fields from offsets, e.g. for search_returning()
'any',

'numeric-range',
'double-range',
'integer-range',
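As the comment above notes, the unrestricted 'any' type is intended for fields created on the fly from raw offsets. A hedged sketch mirroring the memory-driver usage earlier in this PR (the offset and field name are illustrative):

from datacube.model.fields import SimpleField

cloud_cover = SimpleField(
    ["properties", "eo:cloud_cover"],   # raw offset into the metadata document
    lambda x: x,                        # identity converter - no type coercion
    "any",                              # the new unrestricted type name
    name="cloud_cover",
    description="",
)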
1 change: 1 addition & 0 deletions docs/about/whats_new.rst
@@ -35,6 +35,7 @@ v1.9.next
- Deprecate multiple locations. (:pull:`1546`)
- Deprecate search_eager and search_summaries and add `archived` arg to all dataset search/count methods. (:pull:`1550`)
- Migrate away from deprecated Python pkg_resources module (:pull:`1558`)
- Add `custom_offsets` and `order_by` arguments to search_returning() - order_by is not yet implemented. (:pull:`1557`)


v1.8.next