Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/issue 7 - Remove wait for GITC response #30

Merged
merged 67 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
28d710c
fix type in readme
torimcd May 1, 2024
f398c3a
change task token to uuid in send to gitc
torimcd May 1, 2024
1f9cf71
remove wait for task token & gitc response handler from state machine
torimcd May 1, 2024
c587bb0
change gitc response handler to invoke save cma message
torimcd May 2, 2024
42a8706
fix typo
torimcd May 2, 2024
53c6d1e
fix context parameter
torimcd May 2, 2024
168013e
convert uuid to string
torimcd May 2, 2024
d264ee3
fix json typo
torimcd May 2, 2024
fc48c27
remove uuid lib to use stdlib uuid
torimcd May 2, 2024
f3ddcae
change identifier to image set name instead of uuid
torimcd May 2, 2024
49f0af2
remove uuid import
torimcd May 2, 2024
191f76d
fix json formatting of cma invoke
torimcd May 2, 2024
86492da
add granule concept ID to identifier
torimcd May 2, 2024
ca46a16
update image set name with granule conceptid
torimcd May 13, 2024
5953f73
remove whitespace
torimcd May 14, 2024
2969c68
move get umm json to utils and save gitc outgoing cnm
torimcd May 15, 2024
e84af9f
remove unused import
torimcd May 15, 2024
da12a51
try setting cmr query env based on stage
torimcd May 15, 2024
e4fb9d7
remove task token from tests
torimcd May 15, 2024
921e057
fix granule index
torimcd May 15, 2024
5f62390
change how image set name defined
torimcd May 15, 2024
3865176
add region to ssm client in utils
torimcd May 15, 2024
7f28ba9
add ssm parameters to gitc lambdas
torimcd May 15, 2024
cf90d3d
change region reference
torimcd May 15, 2024
8ee95ee
update how granule name referenced in send to gitc
torimcd May 16, 2024
46a9395
remove unused cmr var
torimcd May 16, 2024
8944b35
fix case
torimcd May 16, 2024
bf86736
fix cnm parsing
torimcd May 16, 2024
b146dcc
reformat save cnm to separate step
torimcd May 28, 2024
ad8ed6e
update tf vars for save cnm
torimcd May 28, 2024
33e2225
fix type in module definition
torimcd May 28, 2024
10d3113
fix cnm in cma
torimcd May 28, 2024
02f4d1b
fix collection reference in save cnm
torimcd May 29, 2024
7b1f4a6
add debugging log statements
torimcd May 29, 2024
8b56b7a
fix input
torimcd May 29, 2024
b02dd73
update save cnm input
torimcd May 29, 2024
eaddd63
debugging
torimcd May 29, 2024
8d1f031
debugging
torimcd May 29, 2024
49191ca
linting
torimcd May 29, 2024
6eded09
debugging
torimcd May 29, 2024
ba5fc2d
debugging
torimcd May 29, 2024
9fdc7fc
linting
torimcd May 29, 2024
bf25f52
fix state machine
torimcd May 29, 2024
03d421c
update state machine
torimcd May 29, 2024
37c5f3b
reorg state machine
torimcd May 29, 2024
74be4dd
missing comma
torimcd May 29, 2024
df784d2
move save cnm into map
torimcd Jun 4, 2024
554a5b6
fix state machine transitions
torimcd Jun 4, 2024
1c833ca
fix sm
torimcd Jun 4, 2024
45c0435
fix boolean
torimcd Jun 4, 2024
e05dd3f
debugging cnm input
torimcd Jun 4, 2024
1dd0b62
fix input
torimcd Jun 4, 2024
0f30ca9
change input
torimcd Jun 4, 2024
27f9615
debug input
torimcd Jun 4, 2024
bcc81e4
add cnm as config parameter
torimcd Jun 4, 2024
741a438
lint
torimcd Jun 4, 2024
9b551e9
fix cnm payload
torimcd Jun 4, 2024
04be710
remove debugging statements
torimcd Jun 4, 2024
9fb5af6
add prefix to cnm path and fix gitc response
torimcd Jun 4, 2024
323b567
increase handle gitc response timeout
torimcd Jun 4, 2024
344f54b
Merge branch 'develop' into feature/issue-7
torimcd Jun 4, 2024
0871338
update changelog
torimcd Jun 4, 2024
0543496
add original shortname as cnm prefix
torimcd Jun 4, 2024
267f59c
revert shortname and change collection ref to save cnm & cnm-r in sam…
torimcd Jun 5, 2024
abe609b
remove save cma lambda no longer used
torimcd Jun 5, 2024
b5398f3
remove EDL env params from sendtogitc
torimcd Jun 5, 2024
04e1396
change parsing of granule concept id
torimcd Jun 5, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
### Deprecated
### Removed
- [issues/7](https://github.com/podaac/bignbit/issues/15): Remove the wait for GITC response
### Fixed
### Security

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ _Visual representation of the bignbit step function state machine:_
## MacOS

1. Install miniconda (or conda) and [poetry](https://python-poetry.org/)
2. Run `conda env create -f conda-environment.yml` to install GDAL
2. Run `conda env create -f conda-environment.yaml` to install GDAL
3. Activate the bignbit conda environment `conda activate bignbit`
4. Install python package and dependencies `poetry install`
5. Verify tests pass `poetry run pytest tests/`
13 changes: 10 additions & 3 deletions bignbit/build_image_sets.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from cumulus_logger import CumulusLogger
from cumulus_process import Process

from bignbit.image_set import from_big_output, IncompleteImageSet
from bignbit.image_set import from_big_output, IncompleteImageSet, ImageSet

CUMULUS_LOGGER = CumulusLogger('build_image_sets')

Expand Down Expand Up @@ -52,11 +52,18 @@ def process(self):
del response_payload['big']
response_payload['pobit'] = []

for image_set in image_sets:
for big_image_set in image_sets:
pobit_image_set = ImageSet(
name=big_image_set.name + '_' + self.input['granules'][0]['cmrConceptId'],
image=big_image_set.image,
image_metadata=big_image_set.image_metadata,
world_file=big_image_set.world_file)

response_payload['pobit'].append({
'image_set': image_set._asdict(),
'image_set': pobit_image_set._asdict(),
'cmr_provider': self.config.get('cmr_provider'),
'collection_name': self.config.get('collection').get('name'),
'granule_ur': self.input['granules'][0]['granuleId']
})

return response_payload
Expand Down
32 changes: 2 additions & 30 deletions bignbit/get_granule_umm_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import logging
import os

import requests
from cumulus_logger import CumulusLogger
from cumulus_process import Process

Expand Down Expand Up @@ -32,39 +31,12 @@ def process(self):

"""
cmr_environment = self.config['cmr_environment']
cmr_link = self.input['granules'][0]['cmrLink']
cmr_concept_id = self.input['granules'][0]['cmrConceptId']

self.input['granule_umm_json'] = download_umm_json(cmr_link, cmr_environment)
self.input['granule_umm_json'] = utils.get_umm_json(cmr_concept_id, cmr_environment)
return self.input


def download_umm_json(cmr_link: str, cmr_environment: str) -> dict:
"""
Retrieve the umm-json document from the given cmr_link

Parameters
----------
cmr_link: str
Link to the umm-g for downloading

cmr_environment: str
CMR environment used to retrieve user token

Returns
-------
dict
The umm-json document
"""
edl_user, edl_pass = utils.get_edl_creds()
token = utils.get_cmr_user_token(edl_user, edl_pass, cmr_environment)

umm_json_response = requests.get(cmr_link, headers={'Authorization': f'Bearer {token}'}, timeout=10)
umm_json_response.raise_for_status()
umm_json = umm_json_response.json()

return umm_json


def lambda_handler(event, context):
"""handler that gets called by aws lambda
Parameters
Expand Down
27 changes: 16 additions & 11 deletions bignbit/handle_gitc_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import logging
import os
from json import loads
import boto3
from botocore.exceptions import ClientError

from bignbit import utils


def handler(event, _):
Expand Down Expand Up @@ -32,13 +32,18 @@ def handler(event, _):

for message in event["Records"]:
message_body = loads(message["body"])
task_token = message_body["identifier"]
client = boto3.client('stepfunctions')
try:
client.send_task_success(taskToken=task_token, output=json.dumps(message_body))
logger.info("Step function triggered for task token %s", task_token)
except ClientError:
logger.warning("Error sending task success for messageId %s task token %s",
message['messageId'], task_token,
exc_info=True)
gitc_id = message_body["identifier"]
collection_name = message_body["collection"]
cmr_env = os.environ['CMR_ENVIRONMENT']

granule_concept_id = gitc_id.rpartition('_')[-1]
umm_json = utils.get_umm_json(granule_concept_id, cmr_env)
granule_ur = umm_json['GranuleUR']

cnm_key_name = os.environ['POBIT_AUDIT_PATH_NAME'] + "/" + collection_name + "/" + granule_ur + "." + message_body['submissionTime'] + "." + "cnm-r.json"

utils.upload_cnm(os.environ['POBIT_AUDIT_BUCKET_NAME'], cnm_key_name, json.dumps(message_body))

logging.debug('CNM-R uploaded to s3 audit bucket for id %s', gitc_id)

return {"statusCode": 200, "body": "All good"}
39 changes: 23 additions & 16 deletions bignbit/save_cma_message.py → bignbit/save_cnm_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
from cumulus_logger import CumulusLogger
from cumulus_process import Process

CUMULUS_LOGGER = CumulusLogger('save_cma_message')
CUMULUS_LOGGER = CumulusLogger('save_cmm_message')


class CMA(Process):
class CNM(Process):
"""
A cumulus message adapter
"""
Expand All @@ -21,7 +21,7 @@ def __init__(self, *args, **kwargs):

def process(self):
"""
Upload CMA message into a s3 bucket
Upload CNM message into a s3 bucket

Returns
-------
Expand All @@ -30,37 +30,44 @@ def process(self):

"""
pobit_audit_bucket = self.config['pobit_audit_bucket']
cma_key_name = self.config['cma_key_name']
pobit_audit_path = self.config['pobit_audit_path']

upload_cma(pobit_audit_bucket, cma_key_name, self.input)
granule_ur = self.config['granule_ur']

cnm_content = self.config['cnm']
collection_name = cnm_content['collection']

cnm_key_name = pobit_audit_path + "/" + collection_name + "/" + granule_ur + "." + cnm_content['submissionTime'] + "." + "cnm.json"

upload_cnm(pobit_audit_bucket, cnm_key_name, cnm_content)

return self.input


def upload_cma(pobit_audit_bucket: str, cma_key_name: str, cma_content: dict):
def upload_cnm(pobit_audit_bucket: str, cnm_key_name: str, cnm_content: dict):
"""
Upload CMA message into a s3 bucket
Upload CNM message into a s3 bucket

Parameters
----------
pobit_audit_bucket: str
Bucket name containing where CMA should be uploaded
Bucket name containing where CNM should be uploaded

cma_key_name: str
cnm_key_name: str
Key to object location in bucket

cma_content: dict
The CMA message to upload
cnm_content: dict
The CNM message to upload

Returns
-------
None
"""
s3_client = boto3.client('s3')
s3_client.put_object(
Body=json.dumps(cma_content, default=str).encode("utf-8"),
Body=json.dumps(cnm_content, default=str).encode("utf-8"),
Bucket=pobit_audit_bucket,
Key=cma_key_name
Key=cnm_key_name
)


Expand All @@ -75,7 +82,7 @@ def lambda_handler(event, context):
Returns
----------
dict
A CMA json message
A CNM json message
"""
# pylint: disable=duplicate-code
levels = {
Expand All @@ -91,8 +98,8 @@ def lambda_handler(event, context):
CUMULUS_LOGGER.logger.level = levels.get(logging_level, 'info')
CUMULUS_LOGGER.setMetadata(event, context)

return CMA.cumulus_handler(event, context=context)
return CNM.cumulus_handler(event, context=context)


if __name__ == "__main__":
CMA()
CNM()
29 changes: 14 additions & 15 deletions bignbit/send_to_gitc.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

from bignbit.image_set import ImageSet, to_cnm_product_dict

REGION_NAME = 'us-west-2'
CUMULUS_LOGGER = CumulusLogger('send_to_gitc')

GIBS_REGION_ENV_NAME = "GIBS_REGION"
Expand Down Expand Up @@ -45,20 +44,19 @@ def process(self):
list of granules
"""

notification_id = ""
token = self.config.get('token')

if self.input is not None:
# Send ImageSet(s) to GITC for processing
collection_name = self.input.get('collection_name')
cmr_provider = self.input.get('cmr_provider')
image_set = ImageSet(**self.input['image_set'])
notification_id = notify_gitc(image_set, cmr_provider, token, collection_name)
gitc_id = image_set.name

cnm_message = notify_gitc(image_set, cmr_provider, gitc_id, collection_name)

return notification_id
return cnm_message


def notify_gitc(image_set: ImageSet, cmr_provider: str, token: str, collection_name: str):
def notify_gitc(image_set: ImageSet, cmr_provider: str, gitc_id: str, collection_name: str):
"""
Builds and sends a CNM message to GITC

Expand All @@ -68,8 +66,8 @@ def notify_gitc(image_set: ImageSet, cmr_provider: str, token: str, collection_n
The image set to send
cmr_provider: str
The provider sent in the CNM message
token: str
The token identifying this particular request to GITC
gitc_id: str
The unique identifier for this particular request to GITC
collection_name: str
Collection that this image set belongs to

Expand All @@ -82,7 +80,7 @@ def notify_gitc(image_set: ImageSet, cmr_provider: str, token: str, collection_n
queue_url = os.environ.get(GIBS_SQS_URL_ENV_NAME)
CUMULUS_LOGGER.info(f'Sending SQS message to GITC for image {image_set.name}')

cnm = construct_cnm(image_set, cmr_provider, token, collection_name)
cnm = construct_cnm(image_set, cmr_provider, gitc_id, collection_name)

cnm_json = json.dumps(cnm)
sqs_message_params = {
Expand All @@ -99,10 +97,11 @@ def notify_gitc(image_set: ImageSet, cmr_provider: str, token: str, collection_n
response = sqs.send_message(**sqs_message_params)

CUMULUS_LOGGER.debug(f'SQS send_message output: {response}')
return cnm['identifier']

return cnm


def construct_cnm(image_set: ImageSet, cmr_provider: str, token: str, collection_name: str):
def construct_cnm(image_set: ImageSet, cmr_provider: str, gitc_id: str, collection_name: str):
"""
Construct the CNM message for GITC

Expand All @@ -112,8 +111,8 @@ def construct_cnm(image_set: ImageSet, cmr_provider: str, token: str, collection
ImageSet for one image to be sent to gibs
cmr_provider: str
The provider sent in the CNM message
token: str
The token identifying this particular request to GITC
gitc_id: str
The unique identifier for this particular request to GITC
collection_name: str
Collection that this image set belongs to

Expand All @@ -131,7 +130,7 @@ def construct_cnm(image_set: ImageSet, cmr_provider: str, token: str, collection
"duplicationid": image_set.name,
"collection": new_collection,
"submissionTime": submission_time,
"identifier": token,
"identifier": gitc_id,
"product": product,
'provider': cmr_provider
}
Expand Down
Loading