Skip to content

Commit

Permalink
Merge branch 'nearspacelabs:master' into feature/switch-message-type-…
Browse files Browse the repository at this point in the history
…env-var
  • Loading branch information
pantierra committed Feb 20, 2024
2 parents 39abf40 + e3a5695 commit 68092f9
Show file tree
Hide file tree
Showing 7 changed files with 269 additions and 137 deletions.
44 changes: 24 additions & 20 deletions nsl/stac/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from google.auth.exceptions import DefaultCredentialsError
from google.cloud import storage as gcp_storage
from google.oauth2 import service_account
from retry import retry
from tenacity import retry, stop_after_delay, wait_fixed

from epl.protobuf.v1 import stac_service_pb2_grpc
from epl.protobuf.v1.geometry_pb2 import GeometryData, ProjectionData, EnvelopeData
Expand All @@ -45,13 +45,12 @@
LandsatRequest, Mosaic, MosaicRequest, DatetimeRange, View, ViewRequest, Extent, Interval, Provider

__all__ = [
'stac_service', 'url_to_channel', 'STAC_SERVICE',
'Collection', 'CollectionRequest', 'EoRequest', 'StacRequest', 'LandsatRequest', 'MosaicRequest', 'ViewRequest',
'GeometryData', 'ProjectionData', 'EnvelopeData',
'FloatFilter', 'TimestampFilter', 'StringFilter', 'UInt32Filter',
'StacItem', 'Asset', 'Eo', 'View', 'Mosaic', 'DatetimeRange', 'Extent', 'Interval', 'Provider',
'gcs_storage_client',
'AUTH0_TENANT', 'API_AUDIENCE', 'ISSUER', 'AuthInfo', 'bearer_auth'
'bearer_auth', 'gcs_storage_client', 'stac_service', 'url_to_channel',
'CollectionRequest', 'EoRequest', 'StacRequest', 'LandsatRequest', 'MosaicRequest', 'ViewRequest',
'Collection', 'Eo', 'StacItem', 'Mosaic', 'View', 'Asset',
'GeometryData', 'ProjectionData', 'EnvelopeData', 'FloatFilter', 'TimestampFilter', 'StringFilter', 'UInt32Filter',
'DatetimeRange', 'Extent', 'Interval', 'Provider',
'AUTH0_TENANT', 'API_AUDIENCE', 'ISSUER', 'STAC_SERVICE', 'AuthInfo',
]

CLOUD_PROJECT = os.getenv("CLOUD_PROJECT")
Expand Down Expand Up @@ -79,10 +78,11 @@
MAX_BACKOFF_MS = int(os.getenv('MAX_BACKOFF_MS', 4))
MULTIPLIER = int(os.getenv('MULTIPLIER', 4))

STAC_SERVICE = os.getenv('STAC_SERVICE', 'api.nearspacelabs.net:9090')
STAC_SERVICE_HOST = os.getenv('STAC_SERVICE_HOST', 'api.nearspacelabs.net')
STAC_SERVICE = os.getenv('STAC_SERVICE', f'{STAC_SERVICE_HOST}:9090')
BYTES_IN_MB = 1024 * 1024
# at this point only allowing 4 MB or smaller messages
MESSAGE_SIZE_MB = int(os.getenv('MESSAGE_SIZE_MB', 20))
# at this point only allowing 10 MB or smaller messages
MESSAGE_SIZE_MB = int(os.getenv('MESSAGE_SIZE_MB', 10))
GRPC_CHANNEL_OPTIONS = [('grpc.max_message_length', MESSAGE_SIZE_MB * BYTES_IN_MB),
('grpc.max_receive_message_length', MESSAGE_SIZE_MB * BYTES_IN_MB)]

Expand Down Expand Up @@ -299,7 +299,7 @@ def __init__(self, nsl_id: str, nsl_secret: str):
self.nsl_secret = nsl_secret

# this only retries if there's a timeout error
@retry(exceptions=requests.Timeout, delay=1, backoff=2, tries=4)
@retry(reraise=True, stop=stop_after_delay(3), wait=wait_fixed(0.5))
def authorize(self):
if self.skip_authorization:
return
Expand Down Expand Up @@ -388,7 +388,7 @@ def __init__(self, init=False):
def default_nsl_id(self):
return self._default_nsl_id

def auth_header(self, nsl_id: str = None, profile_name: str = None):
def auth_header(self, nsl_id: str = None, profile_name: str = None) -> str:
auth_info = self._get_auth_info(nsl_id, profile_name)
if not auth_info.skip_authorization and (auth_info.expiry - time.time()) < TOKEN_REFRESH_THRESHOLD:
print(f'authorizing NSL_ID: `{auth_info.nsl_id}`')
Expand All @@ -398,15 +398,19 @@ def auth_header(self, nsl_id: str = None, profile_name: str = None):
print(f"will attempt re-authorization in {ttl} minutes")
return f"Bearer {auth_info.token}"

def get_credentials(self, nsl_id: str = None) -> Optional[AuthInfo]:
def get_credentials(self, nsl_id: str = None, profile_name: str = None) -> Optional[AuthInfo]:
if profile_name is not None:
nsl_id = self._profile_map.get(profile_name, None)
return self._auth_info_map.get(nsl_id if nsl_id is not None else self.default_nsl_id, None)

def set_credentials(self, nsl_id: str, nsl_secret: str):
def set_credentials(self, nsl_id: str, nsl_secret: str, profile_name: str = None):
if len(self._auth_info_map) == 0:
self._default_nsl_id = nsl_id

self._auth_info_map[nsl_id] = AuthInfo(nsl_id=nsl_id, nsl_secret=nsl_secret)
self._auth_info_map[nsl_id].authorize()
if profile_name is not None:
self._profile_map[profile_name] = nsl_id

def unset_credentials(self, profile_name: str):
nsl_id = self._profile_map.pop(profile_name)
Expand All @@ -431,14 +435,14 @@ def loads(self) -> Dict[str, AuthInfo]:
if not lines[i + 1].startswith('NSL_ID') or not lines[i + 2].startswith('NSL_SECRET'):
raise ValueError("credentials should be of the format:\n[named profile]\nNSL_ID={your "
"nsl id}\nNSL_SECRET={your nsl secret}")
# for id like 'NSL_ID = all_the_id_text\n', first strip remove front whitespace and newline
# for id like 'NSL_ID = all_the_id_text\n', first strip remove front whitespace and newline, and optionally the leading quote
# .strip(), now we now [6:] starts after 'NSL_ID' .strip()[6:], strip potential whitespace
# between NSL_ID and '=' with .strip()[6:].strip(), start one after equal
# .strip()[6:].strip()[1:], strip potential whitespace
# after equal .strip()[6:].strip()[1:].strip()
profile_name = line.strip().lstrip('[').rstrip(']')
nsl_id = lines[i + 1].strip()[6:].strip()[1:].strip()
nsl_secret = lines[i + 2].strip()[10:].strip()[1:].strip()
nsl_id = lines[i + 1].strip()[6:].strip().strip('"')[1:].strip().strip('"')
nsl_secret = lines[i + 2].strip()[10:].strip().strip('"')[1:].strip().strip('"')

output[profile_name] = AuthInfo(nsl_id=nsl_id, nsl_secret=nsl_secret)
return output
Expand All @@ -448,8 +452,8 @@ def dumps(self):
for profile_name, nsl_id in self._profile_map.items():
creds = self.get_credentials(nsl_id)
file_obj.write(f'[{profile_name}]\n')
file_obj.write(f'NSL_ID={creds.nsl_id}\n')
file_obj.write(f'NSL_SECRET={creds.nsl_secret}\n')
file_obj.write(f'NSL_ID="{creds.nsl_id}"\n')
file_obj.write(f'NSL_SECRET="{creds.nsl_secret}"\n')
file_obj.write('\n')
file_obj.close()

Expand Down
97 changes: 65 additions & 32 deletions nsl/stac/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@
#
# for additional information, contact:
# info@nearspacelabs.com
import uuid

import requests

from typing import Iterator, List
from typing import Iterator, List, Optional, Tuple
from warnings import warn

from epl.protobuf.v1 import stac_pb2
Expand All @@ -29,14 +30,18 @@


class NSLClient:
def __init__(self, nsl_only=True):
def __init__(self, nsl_only=True, nsl_id=None, profile_name=None):
"""
Create a client connection to a gRPC STAC service. nsl_only limits all queries to only return data from Near
Space Labs.
:param nsl_only:
"""
self._stac_service = stac_singleton
self._nsl_only = nsl_only
if profile_name:
nsl_id = bearer_auth._get_auth_info(profile_name=profile_name).nsl_id
if nsl_id:
bearer_auth._default_nsl_id = nsl_id

@property
def default_nsl_id(self):
Expand Down Expand Up @@ -67,7 +72,8 @@ def search_one(self,
stac_request: stac_pb2.StacRequest,
timeout=15,
nsl_id: str = None,
profile_name: str = None) -> stac_pb2.StacItem:
profile_name: str = None,
correlation_id: str = None) -> stac_pb2.StacItem:
"""
search for one item from the db that matches the stac request
:param timeout: timeout for request
Expand All @@ -77,20 +83,23 @@ def search_one(self,
NSLClient object's set_credentials to set credentials
:param profile_name: if a ~/.nsl/credentials file exists, you can override the [default] credential usage, by
using a different profile name
:param correlation_id: is a unique identifier that is added to the very first interaction (incoming request)
to identify the context and is passed to all components that are involved in the transaction flow
:return: StacItem
"""
# limit to only search Near Space Labs SWIFT data
if self._nsl_only:
stac_request.mission_enum = stac_pb2.SWIFT

metadata = (('authorization', bearer_auth.auth_header(nsl_id=nsl_id, profile_name=profile_name)),)
metadata = self._grpc_headers(nsl_id, profile_name, correlation_id)
return self._stac_service.stub.SearchOneItem(stac_request, timeout=timeout, metadata=metadata)

def count(self,
stac_request: stac_pb2.StacRequest,
timeout=15,
nsl_id: str = None,
profile_name: str = None) -> int:
profile_name: str = None,
correlation_id: str = None) -> int:
"""
count all the items in the database that match the stac request
:param timeout: timeout for request
Expand All @@ -100,13 +109,15 @@ def count(self,
NSLClient object's set_credentials to set credentials
:param profile_name: if a ~/.nsl/credentials file exists, you can override the [default] credential usage, by
using a different profile name
:param correlation_id: is a unique identifier that is added to the very first interaction (incoming request)
to identify the context and is passed to all components that are involved in the transaction flow
:return: int
"""
# limit to only search Near Space Labs SWIFT data
if self._nsl_only:
stac_request.mission_enum = stac_pb2.SWIFT

metadata = (('authorization', bearer_auth.auth_header(nsl_id=nsl_id, profile_name=profile_name)),)
metadata = self._grpc_headers(nsl_id, profile_name, correlation_id)
db_result = self._stac_service.stub.CountItems(stac_request, timeout=timeout, metadata=metadata)
if db_result.status:
# print db_result
Expand All @@ -119,7 +130,9 @@ def search(self,
nsl_id: str = None,
profile_name: str = None,
auto_paginate: bool = False,
only_accessible: bool = False) -> Iterator[stac_pb2.StacItem]:
only_accessible: bool = False,
page_size: int = 50,
correlation_id: str = None) -> Iterator[stac_pb2.StacItem]:
"""
search for stac items by using StacRequest. return a stream of StacItems
:param timeout: timeout for request
Expand All @@ -136,13 +149,16 @@ def search(self,
- If set to `False` (the default), `stac_request.limit` and `stac_request.offset` can be used to manually
page through StacItems.
:param only_accessible: limits results to only StacItems downloadable by your level of sample/paid access
:param page_size: how many results to page at a time
:return: stream of StacItems
"""
for item in self._search_all(stac_request,
timeout,
nsl_id=nsl_id,
profile_name=profile_name,
auto_paginate=auto_paginate):
auto_paginate=auto_paginate,
page_size=page_size,
correlation_id=correlation_id):
if not only_accessible or \
bearer_auth.is_valid_for(item_region(item), nsl_id=nsl_id, profile_name=profile_name):
yield item
Expand All @@ -151,8 +167,10 @@ def search_collections(self,
collection_request: stac_pb2.CollectionRequest,
timeout=15,
nsl_id: str = None,
profile_name: str = None) -> Iterator[stac_pb2.Collection]:
metadata = (('authorization', bearer_auth.auth_header(nsl_id=nsl_id, profile_name=profile_name)),)
profile_name: str = None,
correlation_id: str = None) -> Iterator[stac_pb2.Collection]:

metadata = self._grpc_headers(nsl_id, profile_name, correlation_id)
for item in self._stac_service.stub.SearchCollections(collection_request, timeout=timeout, metadata=metadata):
yield item

Expand All @@ -173,7 +191,7 @@ def subscribe(self,
if self._nsl_only:
stac_request.mission_enum = stac_pb2.SWIFT
res = requests.post(f'{AUTH0_TENANT}/subscription',
headers=NSLClient._json_headers(nsl_id, profile_name),
headers=self._json_headers(nsl_id, profile_name),
json=dict(stac_request=utils.stac_request_to_b64(stac_request),
destination=destination.to_json_str(),
is_active=is_active))
Expand All @@ -186,7 +204,7 @@ def subscribe(self,
def resubscribe(self, sub_id: str, nsl_id: str = None, profile_name: str = None):
"""Reactivates a subscription with the given `sub_id`."""
res = requests.put(f'{AUTH0_TENANT}/subscription/{sub_id}',
headers=NSLClient._json_headers(nsl_id, profile_name))
headers=self._json_headers(nsl_id, profile_name))

NSLClient._handle_json_response(res, 200)
print(f'reactivated subscription with id: {sub_id}')
Expand All @@ -195,7 +213,7 @@ def resubscribe(self, sub_id: str, nsl_id: str = None, profile_name: str = None)
def unsubscribe(self, sub_id: str, nsl_id: str = None, profile_name: str = None):
"""Deactivates a subscription with the given `sub_id`."""
res = requests.delete(f'{AUTH0_TENANT}/subscription/{sub_id}',
headers=NSLClient._json_headers(nsl_id, profile_name))
headers=self._json_headers(nsl_id, profile_name))

NSLClient._handle_json_response(res, 202)
print(f'deactivated subscription with id: {sub_id}')
Expand All @@ -204,7 +222,7 @@ def unsubscribe(self, sub_id: str, nsl_id: str = None, profile_name: str = None)
def subscriptions(self, nsl_id: str = None, profile_name: str = None) -> List[Subscription]:
"""Fetches all subscriptions."""
res = requests.get(f'{AUTH0_TENANT}/subscription',
headers=NSLClient._json_headers(nsl_id, profile_name))
headers=self._json_headers(nsl_id, profile_name))

NSLClient._handle_json_response(res, 200)
return list(Subscription(response_dict) for response_dict in res.json()['results'])
Expand All @@ -214,48 +232,63 @@ def _search_all(self,
timeout=15,
nsl_id: str = None,
profile_name: str = None,
auto_paginate: bool = False) -> Iterator[stac_pb2.StacItem]:
auto_paginate: bool = False,
page_size: int = 50,
correlation_id: str = None) -> Iterator[stac_pb2.StacItem]:
# limit to only search Near Space Labs SWIFT data
if self._nsl_only:
stac_request.mission_enum = stac_pb2.SWIFT

if not auto_paginate:
metadata = (('authorization', bearer_auth.auth_header(nsl_id=nsl_id, profile_name=profile_name)),)
metadata = self._grpc_headers(nsl_id, profile_name, correlation_id)
for item in self._stac_service.stub.SearchItems(stac_request, timeout=timeout, metadata=metadata):
if not item.id:
warn("STAC item missing STAC id; ending search")
warn(f"STAC item missing STAC id: \n{item};\n ending search")
return
else:
yield item
else:
limit = stac_request.limit if stac_request.limit > 0 else None
original_limit = stac_request.limit if stac_request.limit > 0 else None
offset = stac_request.offset
page_size = 500
count = 0

stac_request.limit = page_size
items = list(self.search(stac_request, timeout=timeout, nsl_id=nsl_id, profile_name=profile_name))
stac_request.limit = page_size if original_limit is None else max(original_limit, page_size)
items = list(self._search_all(stac_request, timeout=timeout,
nsl_id=nsl_id, profile_name=profile_name,
page_size=page_size, correlation_id=correlation_id))
while len(items) > 0:
for item in items:
if limit is None or (limit is not None and count < limit):
if original_limit is None or (original_limit is not None and count < original_limit):
yield item
count += 1
if limit is not None and count >= limit:
if original_limit is not None and count >= original_limit:
break

if limit is not None and count >= limit:
if original_limit is not None and count >= original_limit:
break

stac_request.offset += page_size
items = list(self.search(stac_request, timeout=timeout, nsl_id=nsl_id, profile_name=profile_name))
stac_request.offset += len(items)
items = list(self._search_all(stac_request, timeout=timeout,
nsl_id=nsl_id, profile_name=profile_name,
page_size=page_size, correlation_id=correlation_id))

stac_request.offset = offset
stac_request.limit = limit if limit is not None else 0

@staticmethod
def _json_headers(nsl_id: str = None, profile_name: str = None) -> dict:
return {'content-type': 'application/json',
'Authorization': bearer_auth.auth_header(nsl_id=nsl_id, profile_name=profile_name)}
stac_request.limit = original_limit if original_limit is not None else 0

def _json_headers(self,
nsl_id: str = None,
profile_name: str = None,
correlation_id: str = None) -> dict:
headers = {k: v for (k, v) in self._grpc_headers(nsl_id, profile_name, correlation_id)}
return {'content-type': 'application/json', **headers}

def _grpc_headers(self,
nsl_id: str = None,
profile_name: str = None,
correlation_id: str = None) -> Tuple[Tuple[str, str], ...]:
correlation_id = str(uuid.uuid4()) if correlation_id is None else correlation_id
return (('x-correlation-id', correlation_id),
('authorization', bearer_auth.auth_header(nsl_id=nsl_id, profile_name=profile_name)))

@staticmethod
def _handle_json_response(res, status_code: int):
Expand Down
Loading

0 comments on commit 68092f9

Please sign in to comment.