diff --git a/data_subscriber/asf_cslc_download.py b/data_subscriber/asf_cslc_download.py
index f3ec94f1..d5888244 100644
--- a/data_subscriber/asf_cslc_download.py
+++ b/data_subscriber/asf_cslc_download.py
@@ -20,7 +20,7 @@
 from util.conf_util import SettingsConf
 from util.job_submitter import try_submit_mozart_job
 
-from data_subscriber.cslc_utils import (localize_disp_frame_burst_json, split_download_batch_id,
+from data_subscriber.cslc_utils import (localize_disp_frame_burst_hist, split_download_batch_id,
                                         get_bounding_box_for_frame, parse_cslc_native_id, build_ccslc_m_index)
 
 logger = logging.getLogger(__name__)
@@ -31,7 +31,7 @@ class AsfDaacCslcDownload(AsfDaacRtcDownload):
 
     def __init__(self, provider):
         super().__init__(provider)
-        self.disp_burst_map, self.burst_to_frame, metadata, version = localize_disp_frame_burst_json()
+        self.disp_burst_map, self.burst_to_frame, metadata, version = localize_disp_frame_burst_hist()
         self.daac_s3_cred_settings_key = "CSLC_DOWNLOAD"
 
     async def run_download(self, args, token, es_conn, netloc, username, password, job_id, rm_downloads_dir=True):
@@ -173,6 +173,8 @@ async def run_download(self, args, token, es_conn, netloc, username, password, j
             # Uses ccslc_m_index field which looks like T100-213459-IW3_417 (burst_id_acquisition-cycle-index)
             k, m = es_conn.get_k_and_m(args.batch_ids[0])
             logger.info(f"{k=}, {m=}")
+
+            #TODO: Change this logic to query ES for the latest m compressed CSLCs, holding burst_id constant.
             for mm in range(m-1): # m parameter is inclusive of the current frame at hand
                 acq_cycle_index = latest_acq_cycle_index - mm - 1
                 for burst_id in burst_id_set:
@@ -217,6 +219,7 @@ async def run_download(self, args, token, es_conn, netloc, username, password, j
                 "FileName": product_id,
                 "id": product_id,
                 "bounding_box": bounding_box,
+                "save_compressed_slcs": save_compressed_slcs,
                 "Files": [
                     {
                         "FileName": PurePath(s3path).name,
@@ -231,7 +234,6 @@ async def run_download(self, args, token, es_conn, netloc, username, password, j
                 }
             }
 
-            # TODO: get rid of this print
             #print(f"{product=}")
 
             proc_mode_suffix = ""
diff --git a/data_subscriber/cmr.py b/data_subscriber/cmr.py
index f9bbd3df..8bcce5ed 100644
--- a/data_subscriber/cmr.py
+++ b/data_subscriber/cmr.py
@@ -5,10 +5,13 @@
 from datetime import datetime, timedelta
 from enum import Enum
 from typing import Iterable
+from collections import namedtuple
+import netrc
 
 import dateutil.parser
 from more_itertools import first_true
 
+from data_subscriber.aws_token import supply_token
 from data_subscriber.rtc import mgrs_bursts_collection_db_client as mbc_client
 from rtc_utils import rtc_granule_regex
 from tools.ops.cmr_audit import cmr_client
@@ -17,6 +20,8 @@
 logger = logging.getLogger(__name__)
 
 MAX_CHARS_PER_LINE = 250000 #This is the maximum number of characters per line you can display in cloudwatch logs
 
+DateTimeRange = namedtuple("DateTimeRange", ["start_date", "end_date"])
+
 class Collection(str, Enum):
     HLSL30 = "HLSL30"
     HLSS30 = "HLSS30"
@@ -88,6 +93,14 @@ class ProductType(str, Enum):
     "DEFAULT": ["tif", "h5"]
 }
 
+def get_cmr_token(endpoint, settings):
+
+    cmr = settings["DAAC_ENVIRONMENTS"][endpoint]["BASE_URL"]
+    edl = settings["DAAC_ENVIRONMENTS"][endpoint]["EARTHDATA_LOGIN"]
+    username, _, password = netrc.netrc().authenticators(edl)
+    token = supply_token(edl, username, password)
+
+    return cmr, token
 
 async def async_query_cmr(args, token, cmr, settings, timerange, now: datetime, silent=False) -> list:
     request_url = f"https://{cmr}/search/granules.umm_json"
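Note: `get_cmr_token()` centralizes the netrc/EDL token handshake that `daac_data_subscriber.py` previously performed inline, and `DateTimeRange` moves into `cmr.py` so `cslc_utils.py` can import it without pulling in `query.py`. A minimal sketch of how the two fit together, assuming a populated `settings.yaml` and EDL credentials in `~/.netrc` (the `"OPS"` endpoint key is an assumption; any key under `settings["DAAC_ENVIRONMENTS"]` works):

```python
# Sketch only: exercises the new helpers from data_subscriber.cmr.
from data_subscriber.cmr import DateTimeRange, get_cmr_token
from util.conf_util import SettingsConf

settings = SettingsConf().cfg
cmr, token = get_cmr_token("OPS", settings)  # "OPS" is a hypothetical endpoint key

# DateTimeRange is a plain namedtuple; the dates mirror the unit-test arguments
query_timerange = DateTimeRange("2021-01-24T23:00:00Z", "2021-01-25T00:00:00Z")
```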
diff --git a/data_subscriber/cslc_utils.py b/data_subscriber/cslc_utils.py
index 69b39193..87037693 100644
--- a/data_subscriber/cslc_utils.py
+++ b/data_subscriber/cslc_utils.py
@@ -1,13 +1,13 @@
 import json
 import re
 from collections import defaultdict
-from types import SimpleNamespace
+from datetime import datetime, timedelta
 
 import dateutil
 import boto3
 from pyproj import Transformer
 
-from data_subscriber.url import determine_acquisition_cycle
+from data_subscriber.cmr import async_query_cmr, CMR_TIME_FORMAT, DateTimeRange
 from util import datasets_json_util
 from util.conf_util import SettingsConf
 
@@ -16,7 +16,7 @@ class _HistBursts(object):
 
     def __init__(self):
         self.frame_number = None
-        self.burst_ids = []  # Burst id string
+        self.burst_ids = set()  # Burst ids as strings in a set
         self.sensing_datetimes = []  # Sensing datetimes as datetime object, sorted
         self.sensing_seconds_since_first = []  # Sensing time in seconds since the first sensing time
         self.sensing_datetime_days_index = []  # Sensing time in days since the first sensing time, rounded to the nearest day
@@ -36,24 +36,29 @@ def localize_disp_frame_burst_hist(file = DISP_FRAME_BURST_MAP_HIST):
         localize_anc_json(file)
     return process_disp_frame_burst_hist(file)
 
-def sensing_time_day_index(sensing_time, frame_number, frame_to_bursts):
-    ''' Return the day index of the sensing time relative to the first sensing time of the frame AND
-    seconds since the first sensing time of the frame'''
+def _calculate_sensing_time_day_index(sensing_time, first_frame_time):
+    ''' Return the day index of the sensing time relative to the first sensing time of the frame'''
 
-    frame = frame_to_bursts[frame_number]
-    delta = sensing_time - frame.sensing_datetimes[0]
+    delta = sensing_time - first_frame_time
     seconds = int(delta.total_seconds())
     day_index_high_precision = seconds / (24 * 3600)
 
-    # Sanity check of the day index, 1 minute tolerance 1 / 24 / 60 = 0.00069444444 ~= 0.0007
+    # Sanity check of the day index, 10 minute tolerance 10 / 24 / 60 = 0.0069444444 ~= 0.007
     remainder = day_index_high_precision - int(day_index_high_precision)
-    assert not (remainder > 0.49993 and remainder < 0.50007), \
-        f"Potential ambiguous day index grouping: {frame=} {day_index_high_precision=}"
+    assert not (remainder > 0.493 and remainder < 0.507), \
+        f"Potential ambiguous day index grouping: {day_index_high_precision=}"
 
     day_index = int(round(day_index_high_precision))
 
     return day_index, seconds
 
+def sensing_time_day_index(sensing_time, frame_number, frame_to_bursts):
+    ''' Return the day index of the sensing time relative to the first sensing time of the frame AND
+    seconds since the first sensing time of the frame'''
+
+    frame = frame_to_bursts[frame_number]
+    return _calculate_sensing_time_day_index(sensing_time, frame.sensing_datetimes[0])
+
 def process_disp_frame_burst_hist(file = DISP_FRAME_BURST_MAP_HIST):
     '''Process the disp frame burst map json file intended for historical processing only and return the data as a dictionary'''
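The refactor above isolates the day-index arithmetic in `_calculate_sensing_time_day_index()` so it can be computed without a frame lookup. A worked example of the math using only the stdlib (dates are made up):

```python
# Day index = seconds since the frame's first sensing time / 86400, rounded.
# The widened assert (0.5 +/- 0.007, i.e. a ~10-minute tolerance) rejects values
# that sit near the rounding boundary and could be grouped ambiguously.
from datetime import datetime

first_frame_time = datetime(2019, 11, 14, 16, 51, 6)   # frame's first sensing time
sensing_time = datetime(2019, 12, 8, 16, 51, 18)       # acquisition 24 days later

seconds = int((sensing_time - first_frame_time).total_seconds())
day_index_high_precision = seconds / (24 * 3600)       # ~24.00014
remainder = day_index_high_precision - int(day_index_high_precision)
assert not (0.493 < remainder < 0.507)                 # far from the .5 boundary
day_index = int(round(day_index_high_precision))       # -> 24
```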
@@ -68,7 +73,7 @@ def process_disp_frame_burst_hist(file = DISP_FRAME_BURST_MAP_HIST):
         b = frame_to_bursts[int(frame)].burst_ids
         for burst in j[frame]["burst_id_list"]:
             burst = burst.upper().replace("_", "-")
-            b.append(burst)
+            b.add(burst)
 
             # Map from burst id to the frames
             burst_to_frames[burst].append(int(frame))
@@ -76,6 +81,7 @@ def process_disp_frame_burst_hist(file = DISP_FRAME_BURST_MAP_HIST):
         frame_to_bursts[int(frame)].sensing_datetimes =\
             sorted([dateutil.parser.isoparse(t) for t in j[frame]["sensing_time_list"]])
 
+
         for sensing_time in frame_to_bursts[int(frame)].sensing_datetimes:
             day_index, seconds = sensing_time_day_index(sensing_time, int(frame), frame_to_bursts)
             frame_to_bursts[int(frame)].sensing_seconds_since_first.append(seconds)
@@ -102,6 +108,35 @@ def determine_acquisition_cycle_cslc(acquisition_dts, frame_number, frame_to_bur
 
     return day_index
 
+async def determine_k_cycle(acquisition_dts, frame_number, frame_to_bursts, k, args, token, cmr, settings):
+    '''Return where in the k-cycle this acquisition falls for the frame_number.
+    Returns an integer between 0 and k-1 where 0 means that it's at the start of the cycle'''
+
+    frame = frame_to_bursts[frame_number]
+
+    day_index = determine_acquisition_cycle_cslc(acquisition_dts, frame_number, frame_to_bursts)
+
+    # If the day index is within the historical database it's much simpler.
+    # ASSUMPTION: This is a slow linear search, but there will never be more than a couple hundred entries here, so it doesn't matter.
+    # If we somehow end up with something like 1000 entries, revisit this.
+    try:
+        index_number = frame.sensing_datetime_days_index.index(day_index)  # note "index" is an overloaded term here
+        return index_number % k
+    except ValueError:
+        # If not, we have to query CMR for all records for this frame, filter out the ones that don't match the burst pattern,
+        # and then determine the k-cycle index
+        start_date = frame.sensing_datetimes[-1] + timedelta(minutes=30)  # Make sure we are not counting this last sensing time cycle
+        end_date = dateutil.parser.isoparse(acquisition_dts[:-1])
+
+        # Add native-id condition in args
+        l, native_id = build_cslc_native_ids(frame_number, frame_to_bursts)
+        args.native_id = native_id
+
+        query_timerange = DateTimeRange(start_date, end_date)
+        granules = await async_query_cmr(args, token, cmr, settings, query_timerange, datetime.utcnow(), silent=True)
+
+        return -1  # The CMR-based path is not implemented yet; -1 signals that the k-cycle cannot be determined
+
 def parse_cslc_native_id(native_id, burst_to_frames, frame_to_bursts):
 
     match_product_id = _parse_cslc_file_name(native_id)
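For the common case where the acquisition already appears in the historical database, `determine_k_cycle()` reduces to a positional lookup modulo k. A self-contained sketch of that path with fabricated frame data:

```python
# sensing_datetime_days_index holds the day index of every historical acquisition
# for the frame; the k-cycle is the acquisition's position in that list, modulo k.
sensing_datetime_days_index = [0, 12, 24, 36, 48, 60]  # hypothetical frame history
k = 4

day_index = 48
index_number = sensing_datetime_days_index.index(day_index)  # position 4
print(index_number % k)  # -> 0, i.e. this acquisition starts a new k-cycle
```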
@@ -128,10 +163,10 @@ def parse_cslc_burst_id(native_id):
 
 def build_cslc_native_ids(frame, disp_burst_map):
     """Builds the native_id string for a given frame.
     The native_id string is used in the CMR query."""
 
-    native_ids = disp_burst_map[frame].burst_ids
+    native_ids = list(disp_burst_map[frame].burst_ids)
+    native_ids = sorted(native_ids)  # Sort to just enforce consistency
 
     return len(native_ids), "OPERA_L2_CSLC-S1_" + "*&native-id[]=OPERA_L2_CSLC-S1_".join(native_ids) + "*"
-
 def build_cslc_static_native_ids(burst_ids):
     """
     Builds the native_id string used with a CMR query for CSLC-S1 Static Layer
diff --git a/data_subscriber/daac_data_subscriber.py b/data_subscriber/daac_data_subscriber.py
index 48a84d8b..211330ea 100644
--- a/data_subscriber/daac_data_subscriber.py
+++ b/data_subscriber/daac_data_subscriber.py
@@ -3,7 +3,6 @@
 import asyncio
 import boto3
 import logging
-import netrc
 import re
 import sys
 import uuid
@@ -19,9 +18,8 @@
 from data_subscriber.asf_cslc_download import AsfDaacCslcDownload
 from data_subscriber.asf_rtc_download import AsfDaacRtcDownload
 from data_subscriber.asf_slc_download import AsfDaacSlcDownload
-from data_subscriber.aws_token import supply_token
 from data_subscriber.cmr import (ProductType,
-                                 Provider,
+                                 Provider, get_cmr_token,
                                  COLLECTION_TO_PROVIDER_TYPE_MAP,
                                  COLLECTION_TO_PRODUCT_TYPE_MAP)
 from data_subscriber.cslc.cslc_catalog import CSLCProductCatalog
@@ -89,11 +87,7 @@ async def run(argv: list[str]):
     logger.info(f"{job_id=}")
 
     settings = SettingsConf().cfg
-    cmr = settings["DAAC_ENVIRONMENTS"][args.endpoint]["BASE_URL"]
-
-    edl = settings["DAAC_ENVIRONMENTS"][args.endpoint]["EARTHDATA_LOGIN"]
-    username, _, password = netrc.netrc().authenticators(edl)
-    token = supply_token(edl, username, password)
+    cmr, token = get_cmr_token(args.endpoint, settings)
 
     results = {}
diff --git a/data_subscriber/query.py b/data_subscriber/query.py
index 50913ad4..dc5875f6 100644
--- a/data_subscriber/query.py
+++ b/data_subscriber/query.py
@@ -1,7 +1,7 @@
 import asyncio
 import logging
 import uuid
-from collections import namedtuple, defaultdict
+from collections import defaultdict
 from datetime import datetime, timedelta
 from functools import partial
 from pathlib import Path
@@ -11,7 +11,7 @@
 from more_itertools import chunked
 
 from data_subscriber.cmr import (async_query_cmr,
-                                 ProductType,
+                                 ProductType, DateTimeRange,
                                  COLLECTION_TO_PRODUCT_TYPE_MAP,
                                  COLLECTION_TO_PROVIDER_TYPE_MAP)
 from data_subscriber.geojson_utils import (localize_include_exclude,
@@ -25,9 +25,6 @@
 logger = logging.getLogger(__name__)
 
-DateTimeRange = namedtuple("DateTimeRange", ["start_date", "end_date"])
-
-
 class CmrQuery:
     def __init__(self, args, token, es_conn, cmr, job_id, settings):
         self.args = args
diff --git a/product2dataset/product2dataset.py b/product2dataset/product2dataset.py
index 3803ab80..83a3243a 100644
--- a/product2dataset/product2dataset.py
+++ b/product2dataset/product2dataset.py
@@ -218,6 +218,7 @@ def convert(
                 # last_date_time looks like this: "2024-04-18T00:00:00.000000Z"
                 acquisition_cycle = determine_acquisition_cycle_cslc(
                     ccslc_file["burst_id"], ccslc_file["last_date_time"], dataset_met_json["id"])
+                dataset_met_json["burst_id"] = ccslc_file["burst_id"]
                 dataset_met_json["acquisition_cycle"] = acquisition_cycle
                 dataset_met_json["ccslc_m_index"] = build_ccslc_m_index(ccslc_file["burst_id"], str(acquisition_cycle))
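The `product2dataset` hunk above records the burst id alongside the acquisition cycle so downstream Elasticsearch queries can match on `ccslc_m_index`. A sketch of the resulting metadata; the exact output of `build_ccslc_m_index()` is inferred from the `T100-213459-IW3_417` example in the `asf_cslc_download.py` comment and may differ:

```python
# Illustrative values only; the index format (burst-id_acquisition-cycle-index)
# follows the comment in asf_cslc_download.py.
ccslc_file = {"burst_id": "T100-213459-IW3",
              "last_date_time": "2024-04-18T00:00:00.000000Z"}
acquisition_cycle = 417  # as returned by determine_acquisition_cycle_cslc()

dataset_met_json = {
    "burst_id": ccslc_file["burst_id"],
    "acquisition_cycle": acquisition_cycle,
    "ccslc_m_index": f"{ccslc_file['burst_id']}_{acquisition_cycle}",  # "T100-213459-IW3_417"
}
```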
diff --git a/tests/data_subscriber/test_cslc_query.py b/tests/data_subscriber/test_cslc_query.py
index c615bacd..cee505b3 100644
--- a/tests/data_subscriber/test_cslc_query.py
+++ b/tests/data_subscriber/test_cslc_query.py
@@ -7,7 +7,7 @@
 from data_subscriber.parser import create_parser
 from data_subscriber.cslc import cslc_query
 from datetime import datetime
-from data_subscriber.query import DateTimeRange
+from data_subscriber.cmr import DateTimeRange
 
 forward_arguments = ["query", "-c", "OPERA_L2_CSLC-S1_V1", "--processing-mode=forward", "--start-date=2021-01-24T23:00:00Z",\
                      "--end-date=2021-01-25T00:00:00Z", "--grace-mins=60", "--k=4", "--m=4"]
diff --git a/tests/data_subscriber/test_cslc_util.py b/tests/data_subscriber/test_cslc_util.py
index 037f9dba..16078e9f 100644
--- a/tests/data_subscriber/test_cslc_util.py
+++ b/tests/data_subscriber/test_cslc_util.py
@@ -7,7 +7,9 @@
 from data_subscriber.parser import create_parser
 import dateutil
 from datetime import datetime
-from data_subscriber.query import DateTimeRange
+import asyncio
+from data_subscriber.cmr import DateTimeRange, get_cmr_token
+from util.conf_util import SettingsConf
 
 hist_arguments = ["query", "-c", "OPERA_L2_CSLC-S1_V1", "--processing-mode=historical", "--start-date=2021-01-24T23:00:00Z",\
                   "--end-date=2021-01-24T23:00:00Z", "--frame-range=100,101"]
@@ -18,9 +19,12 @@
 #TODO: So we may want to create different versions of this unit test, one for each version of the database json
 def test_burst_map():
     assert len(disp_burst_map_hist.keys()) == 1433
-    assert disp_burst_map_hist[46800].burst_ids == ["T175-374393-IW1","T175-374393-IW2","T175-374393-IW3","T175-374394-IW1",\
-                                                    "T175-374394-IW2","T175-374394-IW3","T175-374395-IW1","T175-374395-IW2",\
-                                                    "T175-374395-IW3"]
+    burst_set = set()
+    for burst in ["T175-374393-IW1", "T175-374393-IW2", "T175-374393-IW3", "T175-374394-IW1",
+                  "T175-374394-IW2", "T175-374394-IW3", "T175-374395-IW1", "T175-374395-IW2",
+                  "T175-374395-IW3"]:
+        burst_set.add(burst)
+    assert disp_burst_map_hist[46800].burst_ids.difference(burst_set) == set()
     assert disp_burst_map_hist[46800].sensing_datetimes[0] == dateutil.parser.isoparse("2019-11-14T16:51:06")
 
     assert len(disp_burst_map_hist[46799].burst_ids) == 15
@@ -102,3 +106,22 @@ def test_determine_acquisition_cycle_cslc():
     acquisition_cycle = cslc_utils.determine_acquisition_cycle_cslc("2017-02-03T23:05:47", 832, disp_burst_map_hist)
     assert acquisition_cycle == 216
 
+def test_determine_k_cycle():
+    """Test that the k cycle is correctly determined"""
+
+    args = create_parser().parse_args(["query", "-c", "OPERA_L2_CSLC-S1_V1", "--processing-mode=forward",
+                                       "--start-date=2021-01-24T23:00:00Z", "--end-date=2021-01-24T23:00:00Z"])
+
+    settings = SettingsConf().cfg
+    cmr, token = get_cmr_token(args.endpoint, settings)
+
+    # determine_k_cycle is a coroutine, so drive it with asyncio.run()
+    k_cycle = asyncio.run(cslc_utils.determine_k_cycle("2017-02-27T23:05:24Z", 831, disp_burst_map_hist, 10, args, token, cmr, settings))
+    assert k_cycle == 1
+
+    k_cycle = asyncio.run(cslc_utils.determine_k_cycle("2016-07-02T23:05:46Z", 832, disp_burst_map_hist, 10, args, token, cmr, settings))
+    assert k_cycle == 0
+
+    k_cycle = asyncio.run(cslc_utils.determine_k_cycle("2017-05-10T23:05:49Z", 832, disp_burst_map_hist, 10, args, token, cmr, settings))
+    assert k_cycle == 0
+
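Because `determine_k_cycle()` is a coroutine, synchronous callers like the unit test above have to drive it through an event loop, which is why each call is wrapped in `asyncio.run()`. A minimal sketch (the names come from the test module's setup):

```python
# Sketch: running the coroutine from synchronous code; args, token, cmr,
# settings and disp_burst_map_hist are set up as in test_cslc_util.py.
import asyncio

k_cycle = asyncio.run(cslc_utils.determine_k_cycle(
    "2017-02-27T23:05:24Z", 831, disp_burst_map_hist, 10, args, token, cmr, settings))
assert k_cycle == 1
```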
diff --git a/tools/disp_s1_burst_db_tool.py b/tools/disp_s1_burst_db_tool.py
index 3fdce288..5ccb4ef0 100644
--- a/tools/disp_s1_burst_db_tool.py
+++ b/tools/disp_s1_burst_db_tool.py
@@ -1,12 +1,20 @@
 #!/usr/bin/env python3
 
+import asyncio
+import logging
 from data_subscriber import cslc_utils
 from datetime import datetime
 import argparse
+from util.conf_util import SettingsConf
+from data_subscriber.cmr import get_cmr_token
+from data_subscriber.parser import create_parser
 
 ''' Tool to query the DISP S1 burst database
 The burst database file must be in the same directory as this script'''
 
+logging.basicConfig(level="INFO")
+logger = logging.getLogger(__name__)
+
 parser = argparse.ArgumentParser()
 subparsers = parser.add_subparsers(dest="subparser_name", required=True)
 
@@ -16,6 +24,7 @@
 server_parser = subparsers.add_parser("native_id", help="Print information based on native_id")
 server_parser.add_argument("id", help="The CSLC native id from CMR")
+server_parser.add_argument("--k", dest="k", help="If the k parameter is provided, the k-cycle of this granule is computed", required=False)
 
 server_parser = subparsers.add_parser("frame", help="Print information based on frame")
 server_parser.add_argument("number", help="The frame number")
@@ -38,6 +47,17 @@
 disp_burst_map, burst_to_frames, day_indices_to_frames = cslc_utils.process_disp_frame_burst_hist(cslc_utils.DISP_FRAME_BURST_MAP_HIST)
 
+async def get_k_cycle(acquisition_dts, frame_id, disp_burst_map, k):
+
+    subs_args = create_parser().parse_args(["query", "-c", "OPERA_L2_CSLC-S1_V1", "--processing-mode=forward"])
+
+    settings = SettingsConf().cfg
+    cmr, token = get_cmr_token(subs_args.endpoint, settings)
+
+    k_cycle = await cslc_utils.determine_k_cycle(acquisition_dts, frame_id, disp_burst_map, k, subs_args, token, cmr, settings)
+
+    return k_cycle
+
 if args.subparser_name == "list":
     l = list(disp_burst_map.keys())
     print("Frame numbers (%d): \n" % len(l), l)
@@ -67,6 +87,15 @@
     print("Acquisition cycles: ", acquisition_cycles)
     print("Frame ids: ", frame_ids)
 
+    if args.k:
+        k = int(args.k)
+
+        k_cycle = asyncio.run(get_k_cycle(acquisition_dts, frame_ids[0], disp_burst_map, k))
+        if k_cycle >= 0:
+            print(f"K-cycle: {k_cycle} out of {k}")
+        else:
+            print("K-cycle cannot be computed")
+
 elif args.subparser_name == "frame":
     frame_number = int(args.number)
     if frame_number not in disp_burst_map.keys():
@@ -104,4 +133,3 @@
     print("\tSensing datetime: ", t.isoformat())
     print("\tBurst ids (%d):" % len(disp_burst_map[frame_number].burst_ids))
     print("\t", disp_burst_map[frame_number].burst_ids)
-
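With the new `--k` option wired into the `native_id` subcommand, the tool can report the k-cycle directly. A hypothetical invocation (the native id is shortened for readability):

```
python tools/disp_s1_burst_db_tool.py native_id OPERA_L2_CSLC-S1_T175-374393-IW1_... --k 10
# K-cycle: 1 out of 10   (or "K-cycle cannot be computed" if the acquisition
# falls outside the historical database)
```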