diff --git a/nsl/stac/__init__.py b/nsl/stac/__init__.py
index db929a7..1c6b770 100644
--- a/nsl/stac/__init__.py
+++ b/nsl/stac/__init__.py
@@ -36,7 +36,7 @@
 from google.auth.exceptions import DefaultCredentialsError
 from google.cloud import storage as gcp_storage
 from google.oauth2 import service_account
-from retry import retry
+from tenacity import retry, stop_after_delay, wait_fixed
 
 from epl.protobuf.v1 import stac_service_pb2_grpc
 from epl.protobuf.v1.geometry_pb2 import GeometryData, ProjectionData, EnvelopeData
@@ -45,13 +45,12 @@
     LandsatRequest, Mosaic, MosaicRequest, DatetimeRange, View, ViewRequest, Extent, Interval, Provider
 
 __all__ = [
-    'stac_service', 'url_to_channel', 'STAC_SERVICE',
-    'Collection', 'CollectionRequest', 'EoRequest', 'StacRequest', 'LandsatRequest', 'MosaicRequest', 'ViewRequest',
-    'GeometryData', 'ProjectionData', 'EnvelopeData',
-    'FloatFilter', 'TimestampFilter', 'StringFilter', 'UInt32Filter',
-    'StacItem', 'Asset', 'Eo', 'View', 'Mosaic', 'DatetimeRange', 'Extent', 'Interval', 'Provider',
-    'gcs_storage_client',
-    'AUTH0_TENANT', 'API_AUDIENCE', 'ISSUER', 'AuthInfo', 'bearer_auth'
+    'bearer_auth', 'gcs_storage_client', 'stac_service', 'url_to_channel',
+    'CollectionRequest', 'EoRequest', 'StacRequest', 'LandsatRequest', 'MosaicRequest', 'ViewRequest',
+    'Collection', 'Eo', 'StacItem', 'Mosaic', 'View', 'Asset',
+    'GeometryData', 'ProjectionData', 'EnvelopeData', 'FloatFilter', 'TimestampFilter', 'StringFilter', 'UInt32Filter',
+    'DatetimeRange', 'Extent', 'Interval', 'Provider',
+    'AUTH0_TENANT', 'API_AUDIENCE', 'ISSUER', 'STAC_SERVICE', 'AuthInfo',
 ]
 
 CLOUD_PROJECT = os.getenv("CLOUD_PROJECT")
@@ -79,10 +78,11 @@
 MAX_BACKOFF_MS = int(os.getenv('MAX_BACKOFF_MS', 4))
 MULTIPLIER = int(os.getenv('MULTIPLIER', 4))
 
-STAC_SERVICE = os.getenv('STAC_SERVICE', 'api.nearspacelabs.net:9090')
+STAC_SERVICE_HOST = os.getenv('STAC_SERVICE_HOST', 'api.nearspacelabs.net')
+STAC_SERVICE = os.getenv('STAC_SERVICE', f'{STAC_SERVICE_HOST}:9090')
 
 BYTES_IN_MB = 1024 * 1024
-# at this point only allowing 4 MB or smaller messages
-MESSAGE_SIZE_MB = int(os.getenv('MESSAGE_SIZE_MB', 20))
+# at this point only allowing 10 MB or smaller messages
+MESSAGE_SIZE_MB = int(os.getenv('MESSAGE_SIZE_MB', 10))
 GRPC_CHANNEL_OPTIONS = [('grpc.max_message_length', MESSAGE_SIZE_MB * BYTES_IN_MB),
                         ('grpc.max_receive_message_length', MESSAGE_SIZE_MB * BYTES_IN_MB)]
@@ -299,7 +299,7 @@ def __init__(self, nsl_id: str, nsl_secret: str):
         self.nsl_secret = nsl_secret
 
     # this only retries if there's a timeout error
-    @retry(exceptions=requests.Timeout, delay=1, backoff=2, tries=4)
+    @retry(reraise=True, stop=stop_after_delay(3), wait=wait_fixed(0.5))
     def authorize(self):
         if self.skip_authorization:
             return
@@ -388,7 +388,7 @@ def __init__(self, init=False):
     def default_nsl_id(self):
         return self._default_nsl_id
 
-    def auth_header(self, nsl_id: str = None, profile_name: str = None):
+    def auth_header(self, nsl_id: str = None, profile_name: str = None) -> str:
         auth_info = self._get_auth_info(nsl_id, profile_name)
         if not auth_info.skip_authorization and (auth_info.expiry - time.time()) < TOKEN_REFRESH_THRESHOLD:
             print(f'authorizing NSL_ID: `{auth_info.nsl_id}`')
@@ -398,15 +398,19 @@ def auth_header(self, nsl_id: str = None, profile_name: str = None):
             print(f"will attempt re-authorization in {ttl} minutes")
         return f"Bearer {auth_info.token}"
 
-    def get_credentials(self, nsl_id: str = None) -> Optional[AuthInfo]:
+    def get_credentials(self, nsl_id: str = None, profile_name: str = None) -> Optional[AuthInfo]:
+        if profile_name is not None:
+            nsl_id = self._profile_map.get(profile_name, None)
         return self._auth_info_map.get(nsl_id if nsl_id is not None else self.default_nsl_id, None)
 
-    def set_credentials(self, nsl_id: str, nsl_secret: str):
+    def set_credentials(self, nsl_id: str, nsl_secret: str, profile_name: str = None):
         if len(self._auth_info_map) == 0:
             self._default_nsl_id = nsl_id
         self._auth_info_map[nsl_id] = AuthInfo(nsl_id=nsl_id, nsl_secret=nsl_secret)
         self._auth_info_map[nsl_id].authorize()
+        if profile_name is not None:
+            self._profile_map[profile_name] = nsl_id
 
     def unset_credentials(self, profile_name: str):
         nsl_id = self._profile_map.pop(profile_name)
@@ -431,14 +435,14 @@ def loads(self) -> Dict[str, AuthInfo]:
             if not lines[i + 1].startswith('NSL_ID') or not lines[i + 2].startswith('NSL_SECRET'):
                 raise ValueError("credentials should be of the format:\n[named profile]\nNSL_ID={your "
                                  "nsl id}\nNSL_SECRET={your nsl secret}")
-            # for id like 'NSL_ID = all_the_id_text\n', first strip remove front whitespace and newline
+            # for an id like 'NSL_ID = all_the_id_text\n', the first strip removes leading whitespace and the newline, and optionally the leading quote
             # .strip(), now we now [6:] starts after 'NSL_ID' .strip()[6:], strip potential whitespace
             # between NSL_ID and '=' with .strip()[6:].strip(), start one after equal
             # .strip()[6:].strip()[1:], strip potential whitespace
             # after equal .strip()[6:].strip()[1:].strip()
             profile_name = line.strip().lstrip('[').rstrip(']')
-            nsl_id = lines[i + 1].strip()[6:].strip()[1:].strip()
-            nsl_secret = lines[i + 2].strip()[10:].strip()[1:].strip()
+            nsl_id = lines[i + 1].strip()[6:].strip().strip('"')[1:].strip().strip('"')
+            nsl_secret = lines[i + 2].strip()[10:].strip().strip('"')[1:].strip().strip('"')
             output[profile_name] = AuthInfo(nsl_id=nsl_id, nsl_secret=nsl_secret)
         return output
@@ -448,8 +452,8 @@ def dumps(self):
         for profile_name, nsl_id in self._profile_map.items():
             creds = self.get_credentials(nsl_id)
             file_obj.write(f'[{profile_name}]\n')
-            file_obj.write(f'NSL_ID={creds.nsl_id}\n')
-            file_obj.write(f'NSL_SECRET={creds.nsl_secret}\n')
+            file_obj.write(f'NSL_ID="{creds.nsl_id}"\n')
+            file_obj.write(f'NSL_SECRET="{creds.nsl_secret}"\n')
             file_obj.write('\n')
         file_obj.close()
diff --git a/nsl/stac/client.py b/nsl/stac/client.py
index 8f080df..c58b472 100644
--- a/nsl/stac/client.py
+++ b/nsl/stac/client.py
@@ -14,10 +14,11 @@
 #
 # for additional information, contact:
 #     info@nearspacelabs.com
+import uuid
 
 import requests
 
-from typing import Iterator, List
+from typing import Iterator, List, Optional, Tuple
 from warnings import warn
 
 from epl.protobuf.v1 import stac_pb2
@@ -29,7 +30,7 @@
 
 
 class NSLClient:
-    def __init__(self, nsl_only=True):
+    def __init__(self, nsl_only=True, nsl_id=None, profile_name=None):
         """
         Create a client connection to a gRPC STAC service. nsl_only limits all queries to only return data from
         Near Space Labs.
@@ -37,6 +38,10 @@ def __init__(self, nsl_only=True):
         """
         self._stac_service = stac_singleton
         self._nsl_only = nsl_only
+        if profile_name:
+            nsl_id = bearer_auth._get_auth_info(profile_name=profile_name).nsl_id
+        if nsl_id:
+            bearer_auth._default_nsl_id = nsl_id
 
     @property
     def default_nsl_id(self):
@@ -67,7 +72,8 @@ def search_one(self,
                    stac_request: stac_pb2.StacRequest,
                    timeout=15,
                    nsl_id: str = None,
-                   profile_name: str = None) -> stac_pb2.StacItem:
+                   profile_name: str = None,
+                   correlation_id: str = None) -> stac_pb2.StacItem:
         """
         search for one item from the db that matches the stac request
         :param timeout: timeout for request
@@ -77,20 +83,23 @@
                         NSLClient object's set_credentials to set credentials
         :param profile_name: if a ~/.nsl/credentials file exists, you can override the [default] credential usage, by
         using a different profile name
+        :param correlation_id: a unique identifier added to the very first interaction (incoming request) to
+        identify the context; it is passed to all components that are involved in the transaction flow
         :return: StacItem
         """
         # limit to only search Near Space Labs SWIFT data
         if self._nsl_only:
             stac_request.mission_enum = stac_pb2.SWIFT
 
-        metadata = (('authorization', bearer_auth.auth_header(nsl_id=nsl_id, profile_name=profile_name)),)
+        metadata = self._grpc_headers(nsl_id, profile_name, correlation_id)
 
         return self._stac_service.stub.SearchOneItem(stac_request, timeout=timeout, metadata=metadata)
 
     def count(self,
               stac_request: stac_pb2.StacRequest,
               timeout=15,
               nsl_id: str = None,
-              profile_name: str = None) -> int:
+              profile_name: str = None,
+              correlation_id: str = None) -> int:
         """
         count all the items in the database that match the stac request
         :param timeout: timeout for request
@@ -100,13 +109,15 @@
                         NSLClient object's set_credentials to set credentials
         :param profile_name: if a ~/.nsl/credentials file exists, you can override the [default] credential usage, by
         using a different profile name
+        :param correlation_id: a unique identifier added to the very first interaction (incoming request) to
+        identify the context; it is passed to all components that are involved in the transaction flow
         :return: int
         """
         # limit to only search Near Space Labs SWIFT data
         if self._nsl_only:
             stac_request.mission_enum = stac_pb2.SWIFT
 
-        metadata = (('authorization', bearer_auth.auth_header(nsl_id=nsl_id, profile_name=profile_name)),)
+        metadata = self._grpc_headers(nsl_id, profile_name, correlation_id)
 
         db_result = self._stac_service.stub.CountItems(stac_request, timeout=timeout, metadata=metadata)
         if db_result.status:
             # print db_result
@@ -119,7 +130,9 @@ def search(self,
                nsl_id: str = None,
                profile_name: str = None,
                auto_paginate: bool = False,
-               only_accessible: bool = False) -> Iterator[stac_pb2.StacItem]:
+               only_accessible: bool = False,
+               page_size: int = 50,
+               correlation_id: str = None) -> Iterator[stac_pb2.StacItem]:
         """
         search for stac items by using StacRequest. return a stream of StacItems
         :param timeout: timeout for request
@@ -136,13 +149,16 @@
         - If set to `False` (the default), `stac_request.limit` and `stac_request.offset` can be used to manually
         page through StacItems.
         :param only_accessible: limits results to only StacItems downloadable by your level of sample/paid access
+        :param page_size: the number of results to request per page
         :return: stream of StacItems
         """
         for item in self._search_all(stac_request,
                                      timeout,
                                      nsl_id=nsl_id,
                                      profile_name=profile_name,
-                                     auto_paginate=auto_paginate):
+                                     auto_paginate=auto_paginate,
+                                     page_size=page_size,
+                                     correlation_id=correlation_id):
             if not only_accessible or \
                     bearer_auth.is_valid_for(item_region(item), nsl_id=nsl_id, profile_name=profile_name):
                 yield item
@@ -151,8 +167,10 @@ def search_collections(self,
                            collection_request: stac_pb2.CollectionRequest,
                            timeout=15,
                            nsl_id: str = None,
-                           profile_name: str = None) -> Iterator[stac_pb2.Collection]:
-        metadata = (('authorization', bearer_auth.auth_header(nsl_id=nsl_id, profile_name=profile_name)),)
+                           profile_name: str = None,
+                           correlation_id: str = None) -> Iterator[stac_pb2.Collection]:
+
+        metadata = self._grpc_headers(nsl_id, profile_name, correlation_id)
 
         for item in self._stac_service.stub.SearchCollections(collection_request, timeout=timeout, metadata=metadata):
             yield item
@@ -173,7 +191,7 @@ def subscribe(self,
         if self._nsl_only:
             stac_request.mission_enum = stac_pb2.SWIFT
 
         res = requests.post(f'{AUTH0_TENANT}/subscription',
-                            headers=NSLClient._json_headers(nsl_id, profile_name),
+                            headers=self._json_headers(nsl_id, profile_name),
                             json=dict(stac_request=utils.stac_request_to_b64(stac_request),
                                       destination=destination.to_json_str(), is_active=is_active))
@@ -186,7 +204,7 @@ def resubscribe(self, sub_id: str, nsl_id: str = None, profile_name: str = None):
         """Reactivates a subscription with the given `sub_id`."""
         res = requests.put(f'{AUTH0_TENANT}/subscription/{sub_id}',
-                           headers=NSLClient._json_headers(nsl_id, profile_name))
+                           headers=self._json_headers(nsl_id, profile_name))
 
         NSLClient._handle_json_response(res, 200)
         print(f'reactivated subscription with id: {sub_id}')
@@ -195,7 +213,7 @@ def unsubscribe(self, sub_id: str, nsl_id: str = None, profile_name: str = None)
         """Deactivates a subscription with the given `sub_id`."""
         res = requests.delete(f'{AUTH0_TENANT}/subscription/{sub_id}',
-                              headers=NSLClient._json_headers(nsl_id, profile_name))
+                              headers=self._json_headers(nsl_id, profile_name))
 
         NSLClient._handle_json_response(res, 202)
         print(f'deactivated subscription with id: {sub_id}')
@@ -204,7 +222,7 @@ def unsubscribe(self, sub_id: str, nsl_id: str = None, profile_name: str = None)
     def subscriptions(self, nsl_id: str = None, profile_name: str = None) -> List[Subscription]:
         """Fetches all subscriptions."""
         res = requests.get(f'{AUTH0_TENANT}/subscription',
-                           headers=NSLClient._json_headers(nsl_id, profile_name))
+                           headers=self._json_headers(nsl_id, profile_name))
 
         NSLClient._handle_json_response(res, 200)
         return list(Subscription(response_dict) for response_dict in res.json()['results'])
@@ -214,48 +232,63 @@ def _search_all(self,
                     timeout=15,
                     nsl_id: str = None,
                     profile_name: str = None,
-                    auto_paginate: bool = False) -> Iterator[stac_pb2.StacItem]:
+                    auto_paginate: bool = False,
+                    page_size: int = 50,
+                    correlation_id: str = None) -> Iterator[stac_pb2.StacItem]:
         # limit to only search Near Space Labs SWIFT data
         if self._nsl_only:
             stac_request.mission_enum = stac_pb2.SWIFT
 
         if not auto_paginate:
-            metadata = (('authorization', bearer_auth.auth_header(nsl_id=nsl_id, profile_name=profile_name)),)
+            metadata = self._grpc_headers(nsl_id, profile_name, correlation_id)
             for item in self._stac_service.stub.SearchItems(stac_request, timeout=timeout, metadata=metadata):
                 if not item.id:
-                    warn("STAC item missing STAC id; ending search")
+                    warn(f"STAC item missing STAC id: \n{item};\n ending search")
                     return
                 else:
                     yield item
         else:
-            limit = stac_request.limit if stac_request.limit > 0 else None
+            original_limit = stac_request.limit if stac_request.limit > 0 else None
             offset = stac_request.offset
-            page_size = 500
             count = 0
-            stac_request.limit = page_size
-            items = list(self.search(stac_request, timeout=timeout, nsl_id=nsl_id, profile_name=profile_name))
+            stac_request.limit = page_size if original_limit is None else max(original_limit, page_size)
+            items = list(self._search_all(stac_request, timeout=timeout,
+                                          nsl_id=nsl_id, profile_name=profile_name,
+                                          page_size=page_size, correlation_id=correlation_id))
             while len(items) > 0:
                 for item in items:
-                    if limit is None or (limit is not None and count < limit):
+                    if original_limit is None or (original_limit is not None and count < original_limit):
                         yield item
                         count += 1
-                    if limit is not None and count >= limit:
+                    if original_limit is not None and count >= original_limit:
                         break
-                if limit is not None and count >= limit:
+                if original_limit is not None and count >= original_limit:
                     break
-                stac_request.offset += page_size
-                items = list(self.search(stac_request, timeout=timeout, nsl_id=nsl_id, profile_name=profile_name))
+                stac_request.offset += len(items)
+                items = list(self._search_all(stac_request, timeout=timeout,
+                                              nsl_id=nsl_id, profile_name=profile_name,
+                                              page_size=page_size, correlation_id=correlation_id))
             stac_request.offset = offset
-            stac_request.limit = limit if limit is not None else 0
-
-    @staticmethod
-    def _json_headers(nsl_id: str = None, profile_name: str = None) -> dict:
-        return {'content-type': 'application/json',
-                'Authorization': bearer_auth.auth_header(nsl_id=nsl_id, profile_name=profile_name)}
+            stac_request.limit = original_limit if original_limit is not None else 0
+
+    def _json_headers(self,
+                      nsl_id: str = None,
+                      profile_name: str = None,
+                      correlation_id: str = None) -> dict:
+        headers = {k: v for (k, v) in self._grpc_headers(nsl_id, profile_name, correlation_id)}
+        return {'content-type': 'application/json', **headers}
+
+    def _grpc_headers(self,
+                      nsl_id: str = None,
+                      profile_name: str = None,
+                      correlation_id: str = None) -> Tuple[Tuple[str, str], ...]:
+        correlation_id = str(uuid.uuid4()) if correlation_id is None else correlation_id
+        return (('x-correlation-id', correlation_id),
+                ('authorization', bearer_auth.auth_header(nsl_id=nsl_id, profile_name=profile_name)))
 
     @staticmethod
     def _handle_json_response(res, status_code: int):
diff --git a/nsl/stac/enum.py b/nsl/stac/enum.py
index d39c24c..7ff4875 100644
--- a/nsl/stac/enum.py
+++ b/nsl/stac/enum.py
@@ -28,8 +28,8 @@
 from enum import IntFlag
 
-__all__ = ['AssetType', 'CloudPlatform', 'FilterRelationship', 'SortDirection', 'Platform', 'Constellation', 'Band',
-           'Instrument', 'Mission']
+__all__ = ['AssetType', 'Band', 'CloudPlatform', 'Constellation', 'Mission', 'Instrument', 'Platform',
+           'FilterRelationship', 'SortDirection']
 
 
 class AssetType(IntFlag):
@@ -73,12 +73,6 @@ class Band(IntFlag):
     LWIR_2 = _Eo.LWIR_2
 
 
-class SortDirection(IntFlag):
-    NOT_SORTED = _SortDirection.NOT_SORTED
-    DESC = _SortDirection.DESC
-    ASC = _SortDirection.ASC
-
-
 class CloudPlatform(IntFlag):
     UNKNOWN_CLOUD_PLATFORM = _CloudPlatform.UNKNOWN_CLOUD_PLATFORM
     AWS = _CloudPlatform.AWS
@@ -99,21 +93,6 @@ class Mission(IntFlag):
     PNOA = _Mission.PNOA
 
 
-class FilterRelationship(IntFlag):
-    EQ = _FilterRelationship.EQ
-    LTE = _FilterRelationship.LTE
-    GTE = _FilterRelationship.GTE
-    LT = _FilterRelationship.LT
-    GT = _FilterRelationship.GT
-    BETWEEN = _FilterRelationship.BETWEEN
-    NOT_BETWEEN = _FilterRelationship.NOT_BETWEEN
-    NEQ = _FilterRelationship.NEQ
-    IN = _FilterRelationship.IN
-    NOT_IN = _FilterRelationship.NOT_IN
-    LIKE = _FilterRelationship.LIKE
-    NOT_LIKE = _FilterRelationship.NOT_LIKE
-
-
 class Instrument(IntFlag):
     UNKNOWN_INSTRUMENT = _Instrument.UNKNOWN_INSTRUMENT
     OLI = _Instrument.OLI
@@ -141,6 +120,27 @@ class Platform(IntFlag):
     SWIFT_3 = _Platform.SWIFT_3
 
 
+class SortDirection(IntFlag):
+    NOT_SORTED = _SortDirection.NOT_SORTED
+    DESC = _SortDirection.DESC
+    ASC = _SortDirection.ASC
+
+
+class FilterRelationship(IntFlag):
+    EQ = _FilterRelationship.EQ
+    LTE = _FilterRelationship.LTE
+    GTE = _FilterRelationship.GTE
+    LT = _FilterRelationship.LT
+    GT = _FilterRelationship.GT
+    BETWEEN = _FilterRelationship.BETWEEN
+    NOT_BETWEEN = _FilterRelationship.NOT_BETWEEN
+    NEQ = _FilterRelationship.NEQ
+    IN = _FilterRelationship.IN
+    NOT_IN = _FilterRelationship.NOT_IN
+    LIKE = _FilterRelationship.LIKE
+    NOT_LIKE = _FilterRelationship.NOT_LIKE
+
+
 # Final check to make sure that all enums have complete definitions for the associated protobufs
 for enum_class_name in __all__:
     nsl_enum = getattr(sys.modules[__name__], enum_class_name)
diff --git a/nsl/stac/experimental.py b/nsl/stac/experimental.py
index 63e21d9..7479ce7 100644
--- a/nsl/stac/experimental.py
+++ b/nsl/stac/experimental.py
@@ -1,8 +1,11 @@
 import pathlib
 import re
+import typing
+import uuid
+
 from copy import deepcopy
 from datetime import date, datetime, timezone
-from typing import Union, Iterator, List, Optional, Tuple, Dict, BinaryIO, IO, Set
+from typing import BinaryIO, Dict, IO, Iterator, List, Optional, Set, Tuple, Union
 
 import boto3
 import botocore.exceptions
@@ -14,7 +17,7 @@
 from nsl.stac import enum, utils, stac_service as stac_singleton, \
     StacItem, StacRequest, Collection, CollectionRequest, View, ViewRequest, Mosaic, MosaicRequest, Eo, EoRequest, \
     Extent, Interval, Provider, ProjectionData, GeometryData, EnvelopeData, \
-    FloatFilter, Asset, StringFilter, TimestampFilter
+    Asset, FloatFilter, StringFilter, TimestampFilter
 from nsl.stac.client import NSLClient
 from nsl.stac.destinations import BaseDestination
 from nsl.stac.subscription import Subscription
@@ -388,7 +391,7 @@ def _asset_types_match(desired_type: enum.AssetType, asset_type: enum.AssetType,
         return asset_type == desired_type
 
     @staticmethod
-    def _asset_type_details(asset_type: enum.AssetType, b_thumbnail_png: bool = True) -> (str, str):
+    def _asset_type_details(asset_type: enum.AssetType, b_thumbnail_png: bool = True) -> Tuple[str, str]:
         """
         for asset type and bool, get the extension and href type
         :param asset_type:
@@ -423,7 +426,11 @@ def _asset_type_details(asset_type: enum.AssetType, b_thumbnail_png: bool = True
 
 
 class _BaseWrap:
-    def __init__(self, stac_data, properties_func, type_url_prefix="nearspacelabs.com/proto/"):
+    def __init__(self,
+                 stac_data,
+                 properties_func,
+                 type_url_prefix="nearspacelabs.com/proto/",
+                 correlation_id: str = None):
         """
         Whether it's a stac_request or a stac_item, allow for the repack_properties method to work
         :param stac_data:
@@ -433,6 +440,7 @@ def __init__(self, stac_data, properties_func, type_url_prefix="nearspacelabs.co
         self.properties = None
         self._properties_func = properties_func
         self._type_url_prefix = type_url_prefix
+        self._correlation_id = correlation_id if correlation_id is not None else str(uuid.uuid4())
 
         if stac_data is not None and properties_func is not None and stac_data.HasField("properties"):
             self.properties = properties_func()
@@ -447,12 +455,12 @@ def __str__(self):
     def _set_properties(self, properties):
         self._stac_data, self.properties = _set_properties(self._stac_data, properties, self._type_url_prefix)
 
-    def _get_field(self, metadata_key: str, key: str):
+    def _get_field(self, metadata_key: str, key: str) -> Optional[typing.Any]:
         if self.properties.HasField(metadata_key):
             return getattr(getattr(self.properties, metadata_key), key)
         return None
 
-    def _get_wrapped_field(self, metadata_key: str, key: str):
+    def _get_wrapped_field(self, metadata_key: str, key: str) -> Optional[typing.Any]:
         if self.properties.HasField(metadata_key):
             return getattr(getattr(getattr(self.properties, metadata_key), key), "value")
         return None
@@ -475,16 +483,24 @@ def _set_nested_obj(self, metadata_key: str, object_key: str, value_key: str, va
     def _set_nested_field(self, metadata_key: str, object_key: str, value_key: str, value):
         setattr(getattr(getattr(self.properties, metadata_key), object_key), value_key, value)
 
-    def _get_nested_field(self, metadata_key: str, object_key: str, value_key: str):
+    def _get_nested_field(self, metadata_key: str, object_key: str, value_key: str) -> Optional[typing.Any]:
         if self.properties.HasField(metadata_key):
             return getattr(getattr(getattr(self.properties, metadata_key), object_key), value_key)
         return None
 
-    def _get_nested_wrapped_field(self, metadata_key: str, object_key: str, value_key: str):
+    def _get_nested_wrapped_field(self, metadata_key: str, object_key: str, value_key: str) -> Optional[typing.Any]:
        if self.properties.HasField(metadata_key):
             return getattr(getattr(getattr(getattr(self.properties, metadata_key), object_key), value_key), "value")
         return None
 
+    @property
+    def correlation_id(self) -> str:
+        return self._correlation_id
+
+    @correlation_id.setter
+    def correlation_id(self, correlation_id: str):
+        self._correlation_id = correlation_id
+
 
 class StacItemWrap(_BaseWrap):
     """
@@ -497,7 +513,7 @@ def __eq__(self, other):
 
         return self.equals_pb(other.stac_item)
 
-    def __init__(self, stac_item: StacItem = None, properties_constructor=None):
+    def __init__(self, stac_item: StacItem = None, properties_constructor=None, correlation_id: str = None):
         self._assets = {}
         if stac_item is None:
             stac_data = StacItem()
@@ -508,7 +524,7 @@ def __init__(self, stac_item: StacItem = None, properties_constructor=None):
             for asset_key in stac_data.assets:
                 self._assets[asset_key] = AssetWrap(stac_data.assets[asset_key], asset_key=asset_key)
 
-        super().__init__(stac_data, properties_constructor)
+        super().__init__(stac_data=stac_data, properties_func=properties_constructor, correlation_id=correlation_id)
 
         if self.created is None:
             self.created = datetime.now(tz=timezone.utc)
@@ -591,7 +607,7 @@ def created(self, value: Union[datetime, date]):
 
     @property
     def end_observed(self) -> Optional[datetime]:
-        return self.stac_item.end_observed
+        return self.stac_item.end_observation
 
     @end_observed.setter
     def end_observed(self, value: Union[datetime, date]):
@@ -601,7 +617,7 @@ def end_observed(self, value: Union[datetime, date]):
     @property
     def feature(self):
         """
-geojson feature with geometry being only aspect defined
+        geojson feature with geometry being the only aspect defined
         :return:
         """
         return {
@@ -730,15 +746,17 @@ def mosaic_name(self, name: str):
     @property
     def mosaic_quad_key(self) -> Optional[str]:
         """
-If the STAC item is a quad from a mosaic, then it has a quad key that defines the boundaries of the quad. The quad tree
-definition is assumed to be the convention defined by Google Maps, based off of there Pseudo-Web Mercator projection.
+        If the STAC item is a quad from a mosaic, then it has a quad key that defines the boundaries of the quad. The
+        quad tree definition is assumed to be the convention defined by Google Maps, based on their Pseudo-Web
+        Mercator projection.
 
-An example quad key is '02313012030231'. Quads use 2-bit tile interleaved addresses. The first character defines the
-largest quadrant (in this case 0 is upper left), the next character ('2') is the upper right quadrant of that first
-quadrant, the 3rd character ('3') is the lower left quadrant of the previous quadrant and so on.
+        An example quad key is '02313012030231'. Quads use 2-bit tile interleaved addresses. The first character
+        defines the largest quadrant (in this case 0 is upper left), the next character ('2') is the upper right
+        quadrant of that first quadrant, the 3rd character ('3') is the lower left quadrant of the previous quadrant
+        and so on.
 
-For more details on the quad tree tiling for maps use `openstreetmaps docs
-`
+        For more details on the quad tree tiling for maps use `openstreetmaps docs
+        `
         :return:
         """
         if self.stac_item.HasField("mosaic"):
@@ -795,8 +813,8 @@ def platform(self, value: enum.Platform):
     @property
     def provenance_ids(self) -> List[str]:
         """
-The stac_ids that went into creating the current mosaic. They are in the array in the order which they were used in
-the mosaic
+        The stac_ids that went into creating the current mosaic. They are in the array in the order which they were
+        used in the mosaic
         :return:
         """
         return self.stac_item.mosaic.provenance_ids
@@ -804,8 +822,8 @@ def provenance_ids(self) -> List[str]:
     @property
     def proj(self) -> ProjectionData:
         """
-The projection for all assets of this STAC item. If an Asset has its own proj definition,
-then that supersedes this projection definition.
+        The projection for all assets of this STAC item. If an Asset has its own proj definition,
+        then that supersedes this projection definition.
         :return: projection information
         """
         return self.stac_item.proj
@@ -860,7 +878,7 @@ def download_asset(self,
 
     def equals_pb(self, other: StacItem):
         """
-does the StacItemWrap equal a protobuf StacItem
+        does the StacItemWrap equal a protobuf StacItem
         :param other:
         :return:
         """
@@ -872,7 +890,7 @@ def has_asset(self,
                   cloud_platform: enum.CloudPlatform = enum.CloudPlatform.UNKNOWN_CLOUD_PLATFORM,
                   eo_bands: enum.Band = enum.Band.UNKNOWN_BAND,
                   asset_regex: Dict = None,
-                  b_relaxed_types: bool = False):
+                  b_relaxed_types: bool = False) -> bool:
         if asset_key is not None and asset_key in self._assets:
             return True
         elif asset_key is not None and asset_key and asset_key not in self._assets:
@@ -927,13 +945,36 @@ def get_asset(self,
     def check_assets_exist(self, b_raise) -> List[str]:
         return _check_assets_exist(self.stac_item, b_raise=b_raise)
 
+    def append_mosaic_provenance(self, provenance_stac_id: str):
+        """
+        Add a provenance stac_id to a mosaic. The 0th item is the first item added to the mosaic, and is therefore the
+        bottom-most image in the mosaic. The -1 item is the last item added to the mosaic; i.e. the topmost image in the
+        mosaic.
+        :param provenance_stac_id:
+        """
+        if not self.stac_item.HasField("mosaic"):
+            self.stac_item.mosaic.CopyFrom(Mosaic(provenance_ids=[provenance_stac_id]))
+        else:
+            self.stac_item.mosaic.provenance_ids.append(provenance_stac_id)
+
+    def set_mosaic_datetime_range(self, d_start, d_end):
+        datetime_range = utils.datetime_range(d_start=d_start, d_end=d_end)
+        if not self.stac_item.HasField("mosaic"):
+            self.stac_item.mosaic.CopyFrom(Mosaic(observation_range=datetime_range))
+        else:
+            self.stac_item.mosaic.observation_range.CopyFrom(datetime_range)
+
 
 class StacRequestWrap(_BaseWrap):
-    def __init__(self, stac_request: StacRequest = None, properties_constructor=None, id: str = ""):
+    def __init__(self,
+                 stac_request: StacRequest = None,
+                 properties_constructor=None,
+                 id: str = "",
+                 correlation_id: str = None):
         if stac_request is None:
             stac_request = StacRequest(id=id)
 
-        super().__init__(stac_request, properties_constructor)
+        super().__init__(stac_data=stac_request, properties_func=properties_constructor, correlation_id=correlation_id)
 
     @property
     def bbox(self) -> EnvelopeData:
@@ -1130,16 +1171,19 @@ def set_azimuth(self,
         if not self.stac_request.HasField("view"):
             self.stac_request.view.CopyFrom(ViewRequest())
 
-        float_filter = self._float_filter(rel_type, value, start, end, sort_direction)
+        float_filter = StacRequestWrap.new_float_filter(rel_type, value, start, end, sort_direction)
         self.stac_request.view.azimuth.CopyFrom(float_filter)
 
     def set_id_complex(self,
                        value_set: Set[str],
-                       rel_type: enum.FilterRelationship = enum.FilterRelationship.IN):
+                       rel_type: enum.FilterRelationship = enum.FilterRelationship.IN,
+                       sort_direction: enum.SortDirection = enum.SortDirection.NOT_SORTED):
         if len(self.id) > 0:
             self.id = ''
 
-        self.stac_request.id_complex.CopyFrom(StringFilter(set=list(value_set), rel_type=rel_type))
+        self.stac_request.id_complex.CopyFrom(StringFilter(set=list(value_set),
+                                                           rel_type=rel_type,
+                                                           sort_direction=sort_direction))
 
     def set_off_nadir(self,
                       rel_type: enum.FilterRelationship,
@@ -1150,7 +1194,7 @@ def set_off_nadir(self,
         if not self.stac_request.HasField("view"):
             self.stac_request.view.CopyFrom(ViewRequest())
 
-        float_filter = self._float_filter(rel_type, value, start, end, sort_direction)
+        float_filter = StacRequestWrap.new_float_filter(rel_type, value, start, end, sort_direction)
         self.stac_request.view.off_nadir.CopyFrom(float_filter)
 
     def set_sun_azimuth(self,
@@ -1162,7 +1206,7 @@ def set_sun_azimuth(self,
         if not self.stac_request.HasField("view"):
             self.stac_request.view.CopyFrom(ViewRequest())
 
-        float_filter = self._float_filter(rel_type, value, start, end, sort_direction)
+        float_filter = StacRequestWrap.new_float_filter(rel_type, value, start, end, sort_direction)
         self.stac_request.view.sun_azimuth.CopyFrom(float_filter)
 
     def set_sun_elevation(self,
@@ -1174,7 +1218,7 @@ def set_sun_elevation(self,
         if not self.stac_request.HasField("view"):
             self.stac_request.view.CopyFrom(ViewRequest())
 
-        float_filter = self._float_filter(rel_type, value, start, end, sort_direction)
+        float_filter = StacRequestWrap.new_float_filter(rel_type, value, start, end, sort_direction)
         self.stac_request.view.sun_elevation.CopyFrom(float_filter)
 
     def set_cloud_cover(self,
@@ -1186,7 +1230,7 @@ def set_cloud_cover(self,
         if not self.stac_request.HasField("eo"):
             self.stac_request.eo.CopyFrom(EoRequest())
 
-        float_filter = self._float_filter(rel_type, value, start, end, sort_direction)
+        float_filter = StacRequestWrap.new_float_filter(rel_type, value, start, end, sort_direction)
         self.stac_request.eo.cloud_cover.CopyFrom(float_filter)
 
     def set_gsd(self,
@@ -1195,7 +1239,7 @@
                 start: float = None,
                 end: float = None,
                 sort_direction: enum.SortDirection = enum.SortDirection.NOT_SORTED):
-        float_filter = self._float_filter(rel_type, value, start, end, sort_direction)
+        float_filter = StacRequestWrap.new_float_filter(rel_type, value, start, end, sort_direction)
         self.stac_request.gsd.CopyFrom(float_filter)
 
     def set_observed(self,
@@ -1306,12 +1350,12 @@ def merge_updated(self,
                               sort_direction=sort_direction if sort_direction is not None else updated.sort_direction,
                               tzinfo=tzinfo)
 
-    def _float_filter(self,
-                      rel_type: enum.FilterRelationship,
-                      value: float = None,
-                      start: float = None,
-                      end: float = None,
-                      sort_direction: enum.SortDirection = enum.SortDirection.NOT_SORTED):
+    @staticmethod
+    def new_float_filter(rel_type: enum.FilterRelationship,
+                         value: float = None,
+                         start: float = None,
+                         end: float = None,
+                         sort_direction: enum.SortDirection = enum.SortDirection.NOT_SORTED):
         if value is not None:
             if start is not None or end is not None:
                 raise ValueError("if value is defined, start and end cannot be used")
@@ -1334,11 +1378,14 @@ def _float_filter(self,
 
 
 class CollectionRequestWrap(_BaseWrap):
-    def __init__(self, collection: CollectionRequest = None, id: str = ""):
+    def __init__(self,
+                 collection: CollectionRequest = None,
+                 id: str = "",
+                 correlation_id: str = None):
         if collection is None:
             collection = CollectionRequest(id=id)
 
-        super().__init__(collection, None)
+        super().__init__(stac_data=collection, properties_func=None, correlation_id=correlation_id)
 
     @property
     def id(self) -> Optional[str]: return self.inner.id
@@ -1471,13 +1518,13 @@ def inner(self) -> CollectionRequest: return self._stac_data
 
 
 class CollectionWrap(_BaseWrap):
-    def __init__(self, collection: Collection = None):
+    def __init__(self, collection: Collection = None, correlation_id: str = None):
         collection_data = Collection()
         collection_data.extent.CopyFrom(Extent())
         if collection is not None:
             collection_data.CopyFrom(collection)
 
-        super().__init__(collection_data, None)
+        super().__init__(stac_data=collection_data, properties_func=None, correlation_id=correlation_id)
 
     @property
     def id(self) -> str: return self.inner.id
@@ -1594,8 +1641,8 @@ def inner(self) -> Collection: return self._stac_data
 
 
 class NSLClientEx(NSLClient):
-    def __init__(self, nsl_only=False):
-        super().__init__(nsl_only=nsl_only)
+    def __init__(self, nsl_only=False, **kwargs):
+        super().__init__(nsl_only=nsl_only, **kwargs)
         self._internal_stac_service = stac_singleton
 
     def update_service_url(self, stac_service_url):
@@ -1613,13 +1660,16 @@ def search_ex(self,
                   nsl_id: str = None,
                   profile_name: str = None,
                   auto_paginate: bool = False,
-                  only_accessible: bool = False) -> Iterator[StacItemWrap]:
+                  only_accessible: bool = False,
+                  page_size: int = 50) -> Iterator[StacItemWrap]:
         for stac_item in self.search(stac_request_wrapped.stac_request,
                                      timeout=timeout,
                                      nsl_id=nsl_id,
                                      profile_name=profile_name,
                                      auto_paginate=auto_paginate,
-                                     only_accessible=only_accessible):
+                                     only_accessible=only_accessible,
+                                     page_size=page_size,
+                                     correlation_id=stac_request_wrapped.correlation_id):
             yield StacItemWrap(stac_item=stac_item)
 
     def feature_collection_ex(self,
@@ -1647,7 +1697,8 @@ def search_one_ex(self,
                       nsl_id: str = None,
                      profile_name: str = None) -> Optional[StacItemWrap]:
         stac_item = self.search_one(stac_request=stac_request_wrapped.stac_request,
-                                    timeout=timeout, nsl_id=nsl_id, profile_name=profile_name)
+                                    timeout=timeout, nsl_id=nsl_id, profile_name=profile_name,
+                                    correlation_id=stac_request_wrapped.correlation_id)
         if not stac_item.id:
             return None
         return StacItemWrap(stac_item=stac_item)
@@ -1658,7 +1709,8 @@ def count_ex(self,
                  nsl_id: str = None,
                  profile_name: str = None) -> int:
         return self.count(stac_request=stac_request_wrapped.stac_request,
-                          timeout=timeout, nsl_id=nsl_id, profile_name=profile_name)
+                          timeout=timeout, nsl_id=nsl_id, profile_name=profile_name,
+                          correlation_id=stac_request_wrapped.correlation_id)
 
     def search_collections_ex(self,
                               collection_request: CollectionRequestWrap,
@@ -1668,7 +1720,8 @@ def search_collections_ex(self,
         for collection in self.search_collections(collection_request.inner,
                                                   timeout=timeout,
                                                   nsl_id=nsl_id,
-                                                  profile_name=profile_name):
+                                                  profile_name=profile_name,
+                                                  correlation_id=collection_request.correlation_id):
             yield CollectionWrap(collection=collection)
 
     def subscribe_ex(self,
diff --git a/nsl/stac/utils.py b/nsl/stac/utils.py
index 7da5c5d..e1eb8ff 100644
--- a/nsl/stac/utils.py
+++ b/nsl/stac/utils.py
@@ -30,9 +30,11 @@
 import botocore.client
 from google.cloud import storage
 from google.protobuf import timestamp_pb2, duration_pb2
+from tenacity import retry, stop_after_delay, wait_fixed
 
+from epl.protobuf.v1.stac_pb2 import epl_dot_protobuf_dot_v1_dot_query__pb2 as query
 from nsl.stac import gcs_storage_client, bearer_auth, \
-    StacItem, StacRequest, Asset, TimestampFilter, Eo, DatetimeRange, enum
+    StacItem, StacRequest, Asset, TimestampFilter, DatetimeRange, Eo, FloatFilter, enum
 from nsl.stac.enum import Band, CloudPlatform, FilterRelationship, SortDirection, AssetType
 
 DEFAULT_RGB = [Band.RED, Band.GREEN, Band.BLUE, Band.NIR]
@@ -57,6 +59,7 @@ def get_blob_metadata(bucket: str, blob_name: str) -> storage.Blob:
     return bucket.get_blob(blob_name=blob_name.strip('/'))
 
 
+@retry(reraise=True, stop=stop_after_delay(3), wait=wait_fixed(0.5))
 def download_gcs_object(bucket: str,
                         blob_name: str,
                         file_obj: IO[bytes] = None,
@@ -524,7 +527,7 @@ def pb_timestamp(d_utc: Union[datetime.datetime, datetime.date],
 
 
 def datetime_from_pb_timestamp(ts: timestamp_pb2.Timestamp) -> datetime:
-    return datetime.datetime.fromtimestamp(ts.seconds + ts.nanos/1e9)
+    return datetime.datetime.utcfromtimestamp(ts.seconds + ts.nanos/1e9)
 
 
 def timezoned(d_utc: Union[datetime.datetime, datetime.date],
@@ -578,3 +581,36 @@ def stac_request_from_b64(encoded: str) -> StacRequest:
     req = StacRequest()
     req.ParseFromString(base64.b64decode(bytes(encoded, encoding='ascii')))
     return req
+
+
+def stac_item_to_b64(item: StacItem) -> str:
+    return str(base64.b64encode(item.SerializeToString()), encoding='ascii')
+
+
+def stac_item_from_b64(encoded: str) -> StacItem:
+    item = StacItem()
+    item.ParseFromString(base64.b64decode(bytes(encoded, encoding='ascii')))
+    return item
+
+
+def eval_float_filter(float_filter: FloatFilter, val: float) -> bool:
+    rel = float_filter.rel_type
+
+    if rel == enum.FilterRelationship.EQ:
+        return val == float_filter.value
+    elif rel == enum.FilterRelationship.NEQ:
+        return val != float_filter.value
+    elif rel == enum.FilterRelationship.BETWEEN:
+        return float_filter.start < val < float_filter.end
+    elif rel == enum.FilterRelationship.NOT_BETWEEN:
+        return val < float_filter.start or val > float_filter.end
+    elif rel == enum.FilterRelationship.GTE:
+        return val >= float_filter.value
+    elif rel == enum.FilterRelationship.GT:
+        return val > float_filter.value
+    elif rel == enum.FilterRelationship.LTE:
+        return val <= float_filter.value
+    elif rel == enum.FilterRelationship.LT:
+        return val < float_filter.value
+    else:
+        raise ValueError(f"not currently evaluating float filters of type: {query.FilterRelationship.Name(rel)}")
diff --git a/requirements.txt b/requirements.txt
index 59386c5..a130c86 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,4 +4,5 @@ google-cloud-storage>=1.14.0
 grpcio-tools~=1.33.0
 protobuf~=3.19.0
 requests
-retry
+shapely==1.8.5.post1
+tenacity==8.0.1
diff --git a/setup.py b/setup.py
index 36c2ce0..d3025d3 100644
--- a/setup.py
+++ b/setup.py
@@ -34,17 +34,21 @@
     'author': 'David Raleigh',
    'author_email': 'david@nearspacelabs.com',
     'license': 'Apache 2.0',
-    'version': '1.2.6',
+    'version': '1.2.7',
     'python_requires': '>3.6.0',
     'packages': ['nsl.stac', 'nsl.stac.destinations'],
     'install_requires': [
-        'boto3==1.16.10',
-        'epl.protobuf.v1==1.0.4',
-        'google-cloud-storage>=1.14.0',
-        'grpcio-tools~=1.33.0',
+        # local
+        'epl.protobuf.v1',
+        'epl.geometry',
+        # third-party
+        'boto3',
+        'google-cloud-storage',
+        'grpcio-tools==1.33.*',
         'protobuf~=3.19.0',
         'requests',
-        'retry',
+        'shapely',
+        'tenacity',
     ],
     'zip_safe': False
 }
@@ -52,8 +56,9 @@
 clssfrs = [
     'Programming Language :: Python',
     'Programming Language :: Python :: 3',
-    'Programming Language :: Python :: 3.6',
     'Programming Language :: Python :: 3.7',
+    'Programming Language :: Python :: 3.8',
+    'Programming Language :: Python :: 3.9',
 ]
 kwargs['classifiers'] = clssfrs
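A minimal usage sketch (not part of the patch) of the parameters these hunks add to the client — `profile_name` on `NSLClient`, and `page_size`/`correlation_id` on `search`. The profile name and request values below are hypothetical.

import uuid

from nsl.stac import StacRequest
from nsl.stac.client import NSLClient

# assumes a hypothetical [my-profile] entry in ~/.nsl/credentials,
# now written with quoted NSL_ID="..." / NSL_SECRET="..." values as dumps() emits them
client = NSLClient(nsl_only=True, profile_name='my-profile')

# one correlation id reused for the whole transaction; it is forwarded as the x-correlation-id header
correlation_id = str(uuid.uuid4())

request = StacRequest()  # illustrative only; add spatial/temporal filters as needed
request.limit = 10       # cap on the total number of items yielded

for item in client.search(request,
                          auto_paginate=True,            # let the client page through results
                          page_size=50,                  # items requested per page
                          correlation_id=correlation_id):
    print(item.id)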