
Commit

#832: Working towards using the new database form and dynamically calculating k-cycle for historical and reprocessing modes
philipjyoon committed Jun 12, 2024
1 parent a876819 commit 075a150
Showing 9 changed files with 129 additions and 36 deletions.
8 changes: 5 additions & 3 deletions data_subscriber/asf_cslc_download.py
@@ -20,7 +20,7 @@
from util.conf_util import SettingsConf
from util.job_submitter import try_submit_mozart_job

from data_subscriber.cslc_utils import (localize_disp_frame_burst_json, split_download_batch_id,
from data_subscriber.cslc_utils import (localize_disp_frame_burst_hist, split_download_batch_id,
get_bounding_box_for_frame, parse_cslc_native_id, build_ccslc_m_index)

logger = logging.getLogger(__name__)
@@ -31,7 +31,7 @@ class AsfDaacCslcDownload(AsfDaacRtcDownload):

def __init__(self, provider):
super().__init__(provider)
self.disp_burst_map, self.burst_to_frame, metadata, version = localize_disp_frame_burst_json()
self.disp_burst_map, self.burst_to_frame, metadata, version = localize_disp_frame_burst_hist()
self.daac_s3_cred_settings_key = "CSLC_DOWNLOAD"

async def run_download(self, args, token, es_conn, netloc, username, password, job_id, rm_downloads_dir=True):
@@ -173,6 +173,8 @@ async def run_download(self, args, token, es_conn, netloc, username, password, j
# Uses ccslc_m_index field which looks like T100-213459-IW3_417 (burst_id_acquisition-cycle-index)
k, m = es_conn.get_k_and_m(args.batch_ids[0])
logger.info(f"{k=}, {m=}")

#TODO: Change this logic to query ES for the latest m compressed CSLCs, holding burst_id constant.
for mm in range(m-1): # m parameter is inclusive of the current frame at hand
acq_cycle_index = latest_acq_cycle_index - mm - 1
for burst_id in burst_id_set:
@@ -217,6 +219,7 @@ async def run_download(self, args, token, es_conn, netloc, username, password, j
"FileName": product_id,
"id": product_id,
"bounding_box": bounding_box,
"save_compressed_slcs": save_compressed_slcs,
"Files": [
{
"FileName": PurePath(s3path).name,
@@ -231,7 +234,6 @@ async def run_download(self, args, token, es_conn, netloc, username, password, j
}
}

# TODO: get rid of this print
#print(f"{product=}")

proc_mode_suffix = ""
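A minimal sketch (not part of the commit) of the lookback arithmetic in the hunk above. Only the range(m-1) loop and the "burst_id_acquisition-cycle-index" format of ccslc_m_index come from the diff; the helper names below are illustrative.

def ccslc_m_index(burst_id: str, acq_cycle_index: int) -> str:
    # Mirrors the format noted in the diff, e.g. "T100-213459-IW3_417"
    return f"{burst_id}_{acq_cycle_index}"

def lookback_indices(latest_acq_cycle_index: int, m: int) -> list[int]:
    # m is inclusive of the current frame, so only m-1 earlier cycles are visited
    return [latest_acq_cycle_index - mm - 1 for mm in range(m - 1)]

# With m=3 and the current cycle at index 417, cycles 416 and 415 are checked:
for idx in lookback_indices(417, 3):
    print(ccslc_m_index("T100-213459-IW3", idx))  # T100-213459-IW3_416, then _415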
13 changes: 13 additions & 0 deletions data_subscriber/cmr.py
@@ -5,10 +5,13 @@
from datetime import datetime, timedelta
from enum import Enum
from typing import Iterable
from collections import namedtuple
import netrc

import dateutil.parser
from more_itertools import first_true

from data_subscriber.aws_token import supply_token
from data_subscriber.rtc import mgrs_bursts_collection_db_client as mbc_client
from rtc_utils import rtc_granule_regex
from tools.ops.cmr_audit import cmr_client
@@ -17,6 +20,8 @@
logger = logging.getLogger(__name__)
MAX_CHARS_PER_LINE = 250000 #This is the maximum number of characters per line you can display in cloudwatch logs

DateTimeRange = namedtuple("DateTimeRange", ["start_date", "end_date"])

class Collection(str, Enum):
HLSL30 = "HLSL30"
HLSS30 = "HLSS30"
@@ -88,6 +93,14 @@ class ProductType(str, Enum):
"DEFAULT": ["tif", "h5"]
}

def get_cmr_token(endpoint, settings):

cmr = settings["DAAC_ENVIRONMENTS"][endpoint]["BASE_URL"]
edl = settings["DAAC_ENVIRONMENTS"][endpoint]["EARTHDATA_LOGIN"]
username, _, password = netrc.netrc().authenticators(edl)
token = supply_token(edl, username, password)

return cmr, token

async def async_query_cmr(args, token, cmr, settings, timerange, now: datetime, silent=False) -> list:
request_url = f"https://{cmr}/search/granules.umm_json"
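A hedged usage sketch of the two additions above, DateTimeRange and get_cmr_token(). The endpoint key "OPS" and the ISO-8601 strings are assumptions for illustration; get_cmr_token reads credentials from ~/.netrc for the configured Earthdata Login host.

from util.conf_util import SettingsConf
from data_subscriber.cmr import DateTimeRange, get_cmr_token

settings = SettingsConf().cfg
cmr, token = get_cmr_token("OPS", settings)  # "OPS" is an assumed endpoint name

timerange = DateTimeRange(start_date="2021-01-24T23:00:00Z", end_date="2021-01-25T00:00:00Z")
print(cmr, timerange.start_date, timerange.end_date)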
63 changes: 49 additions & 14 deletions data_subscriber/cslc_utils.py
@@ -1,13 +1,13 @@
import json
import re
from collections import defaultdict
from types import SimpleNamespace
from datetime import datetime, timedelta
import dateutil

import boto3
from pyproj import Transformer

from data_subscriber.url import determine_acquisition_cycle
from data_subscriber.cmr import async_query_cmr, CMR_TIME_FORMAT, DateTimeRange
from util import datasets_json_util
from util.conf_util import SettingsConf

@@ -16,7 +16,7 @@
class _HistBursts(object):
def __init__(self):
self.frame_number = None
self.burst_ids = [] # Burst id string
self.burst_ids = set() # Burst ids as strings in a set
self.sensing_datetimes = [] # Sensing datetimes as datetime object, sorted
self.sensing_seconds_since_first = [] # Sensing time in seconds since the first sensing time
self.sensing_datetime_days_index = [] # Sensing time in days since the first sensing time, rounded to the nearest day
@@ -36,24 +36,29 @@ def localize_disp_frame_burst_hist(file = DISP_FRAME_BURST_MAP_HIST):
localize_anc_json(file)
return process_disp_frame_burst_hist(file)

def sensing_time_day_index(sensing_time, frame_number, frame_to_bursts):
''' Return the day index of the sensing time relative to the first sensing time of the frame AND
seconds since the first sensing time of the frame'''
def _calculate_sensing_time_day_index(sensing_time, first_frame_time):
''' Return the day index of the sensing time relative to the first sensing time of the frame'''

frame = frame_to_bursts[frame_number]
delta = sensing_time - frame.sensing_datetimes[0]
delta = sensing_time - first_frame_time
seconds = int(delta.total_seconds())
day_index_high_precision = seconds / (24 * 3600)

# Sanity check of the day index, 1 minute tolerance 1 / 24 / 60 = 0.00069444444 ~= 0.0007
# Sanity check of the day index, 10 minute tolerance 10 / 24 / 60 = 0.0069444444 ~= 0.007
remainder = day_index_high_precision - int(day_index_high_precision)
assert not (remainder > 0.49993 and remainder < 0.50007), \
f"Potential ambiguous day index grouping: {frame=} {day_index_high_precision=}"
assert not (remainder > 0.493 and remainder < 0.507), \
f"Potential ambiguous day index grouping: {day_index_high_precision=}"

day_index = int(round(day_index_high_precision))

return day_index, seconds

def sensing_time_day_index(sensing_time, frame_number, frame_to_bursts):
''' Return the day index of the sensing time relative to the first sensing time of the frame AND
seconds since the first sensing time of the frame'''

frame = frame_to_bursts[frame_number]
return (_calculate_sensing_time_day_index(sensing_time, frame.sensing_datetimes[0]))

def process_disp_frame_burst_hist(file = DISP_FRAME_BURST_MAP_HIST):
'''Process the disp frame burst map json file intended for historical processing only and return the data as a dictionary'''

@@ -68,14 +73,15 @@ def process_disp_frame_burst_hist(file = DISP_FRAME_BURST_MAP_HIST):
b = frame_to_bursts[int(frame)].burst_ids
for burst in j[frame]["burst_id_list"]:
burst = burst.upper().replace("_", "-")
b.append(burst)
b.add(burst)

# Map from burst id to the frames
burst_to_frames[burst].append(int(frame))
assert len(burst_to_frames[burst]) <= 2 # A burst can belong to at most two frames

frame_to_bursts[int(frame)].sensing_datetimes =\
sorted([dateutil.parser.isoparse(t) for t in j[frame]["sensing_time_list"]])

for sensing_time in frame_to_bursts[int(frame)].sensing_datetimes:
day_index, seconds = sensing_time_day_index(sensing_time, int(frame), frame_to_bursts)
frame_to_bursts[int(frame)].sensing_seconds_since_first.append(seconds)
@@ -102,6 +108,35 @@ def determine_acquisition_cycle_cslc(acquisition_dts, frame_number, frame_to_bur

return day_index

async def determine_k_cycle(acquisition_dts, frame_number, frame_to_bursts, k, args, token, cmr, settings):
'''Return where in the k-cycle this acquisition falls for the frame_number
Returns integer between 0 and k-1 where 0 means that it's at the start of the cycle'''

frame = frame_to_bursts[frame_number]

day_index = determine_acquisition_cycle_cslc(acquisition_dts, frame_number, frame_to_bursts)

# If the day index is within the historical database, the lookup is simple.
# ASSUMPTION: this is a slow linear search, but there will never be more than a couple hundred entries here, so it doesn't matter.
# If we somehow end up with on the order of 1000 entries, this should be revisited.
try:
index_number = frame.sensing_datetime_days_index.index(day_index) # note "index" is overloaded term here
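# e.g., if day_index is the 14th sensing time in the database (list index 13) and k=10, this returns 3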
return index_number % k
except ValueError:
# If not, we have to query CMR for all records for this frame, filter out ones that don't match the burst pattern,
# and then determine the k-cycle index
start_date = frame.sensing_datetimes[-1] + timedelta(minutes=30) # Make sure we are not counting this last sensing time cycle
end_date = dateutil.parser.isoparse(acquisition_dts[:-1])

# Add native-id condition in args
l, native_id = build_cslc_native_ids(frame_number, frame_to_bursts)
args.native_id = native_id

query_timerange = DateTimeRange(start_date, end_date)
granules = await async_query_cmr(args, token, cmr, settings, query_timerange, datetime.utcnow(), silent=True)
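# NOTE (WIP): deriving the k-cycle position from the granules returned above appears not yet implemented; the -1 below reads as a placeholder.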

return -1

def parse_cslc_native_id(native_id, burst_to_frames, frame_to_bursts):
match_product_id = _parse_cslc_file_name(native_id)

@@ -128,10 +163,10 @@ def parse_cslc_burst_id(native_id):
def build_cslc_native_ids(frame, disp_burst_map):
"""Builds the native_id string for a given frame. The native_id string is used in the CMR query."""

native_ids = disp_burst_map[frame].burst_ids
native_ids = list(disp_burst_map[frame].burst_ids)
native_ids = sorted(native_ids) # Sort to just enforce consistency
return len(native_ids), "OPERA_L2_CSLC-S1_" + "*&native-id[]=OPERA_L2_CSLC-S1_".join(native_ids) + "*"


def build_cslc_static_native_ids(burst_ids):
"""
Builds the native_id string used with a CMR query for CSLC-S1 Static Layer
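A worked sketch (not part of the commit) of the day-index arithmetic introduced above, using an invented sensing time; the real first sensing time comes from the frame database json (the 2019-11-14T16:51:06 value appears in the tests below).

from datetime import datetime, timedelta

first = datetime(2019, 11, 14, 16, 51, 6)         # frame's first sensing time
sensing = first + timedelta(days=36, minutes=3)   # 3 minutes off the 12-day grid

seconds = int((sensing - first).total_seconds())
day_index_high_precision = seconds / (24 * 3600)  # ~36.002
remainder = day_index_high_precision - int(day_index_high_precision)
assert not (0.493 < remainder < 0.507)            # the 10-minute ambiguity guard
day_index = int(round(day_index_high_precision))  # 36

# If day_index sits at list position 3 of sensing_datetime_days_index and k=10,
# determine_k_cycle would report k-cycle position 3 % 10 == 3.
print(day_index)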
10 changes: 2 additions & 8 deletions data_subscriber/daac_data_subscriber.py
@@ -3,7 +3,6 @@
import asyncio
import boto3
import logging
import netrc
import re
import sys
import uuid
@@ -19,9 +18,8 @@
from data_subscriber.asf_cslc_download import AsfDaacCslcDownload
from data_subscriber.asf_rtc_download import AsfDaacRtcDownload
from data_subscriber.asf_slc_download import AsfDaacSlcDownload
from data_subscriber.aws_token import supply_token
from data_subscriber.cmr import (ProductType,
Provider,
Provider, get_cmr_token,
COLLECTION_TO_PROVIDER_TYPE_MAP,
COLLECTION_TO_PRODUCT_TYPE_MAP)
from data_subscriber.cslc.cslc_catalog import CSLCProductCatalog
@@ -89,11 +87,7 @@ async def run(argv: list[str]):
logger.info(f"{job_id=}")

settings = SettingsConf().cfg
cmr = settings["DAAC_ENVIRONMENTS"][args.endpoint]["BASE_URL"]

edl = settings["DAAC_ENVIRONMENTS"][args.endpoint]["EARTHDATA_LOGIN"]
username, _, password = netrc.netrc().authenticators(edl)
token = supply_token(edl, username, password)
cmr, token = get_cmr_token(args.endpoint, settings)

results = {}

7 changes: 2 additions & 5 deletions data_subscriber/query.py
@@ -1,7 +1,7 @@
import asyncio
import logging
import uuid
from collections import namedtuple, defaultdict
from collections import defaultdict
from datetime import datetime, timedelta
from functools import partial
from pathlib import Path
@@ -11,7 +11,7 @@
from more_itertools import chunked

from data_subscriber.cmr import (async_query_cmr,
ProductType,
ProductType, DateTimeRange,
COLLECTION_TO_PRODUCT_TYPE_MAP,
COLLECTION_TO_PROVIDER_TYPE_MAP)
from data_subscriber.geojson_utils import (localize_include_exclude,
@@ -25,9 +25,6 @@

logger = logging.getLogger(__name__)

DateTimeRange = namedtuple("DateTimeRange", ["start_date", "end_date"])


class CmrQuery:
def __init__(self, args, token, es_conn, cmr, job_id, settings):
self.args = args
1 change: 1 addition & 0 deletions product2dataset/product2dataset.py
@@ -218,6 +218,7 @@ def convert(
# last_date_time looks like this: "2024-04-18T00:00:00.000000Z"
acquisition_cycle = determine_acquisition_cycle_cslc(
ccslc_file["burst_id"], ccslc_file["last_date_time"], dataset_met_json["id"])
dataset_met_json["burst_id"] = ccslc_file["burst_id"]
dataset_met_json["acquisition_cycle"] = acquisition_cycle
dataset_met_json["ccslc_m_index"] = build_ccslc_m_index(ccslc_file["burst_id"], str(acquisition_cycle))

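Illustrative only: the met-json fields the one-line addition above populates, with invented values; the ccslc_m_index format ("T100-213459-IW3_417") is taken from the comment in asf_cslc_download.py, not from this file.

ccslc_file = {"burst_id": "T100-213459-IW3", "last_date_time": "2024-04-18T00:00:00.000000Z"}
acquisition_cycle = 417  # as computed by determine_acquisition_cycle_cslc

dataset_met_json = {
    "burst_id": ccslc_file["burst_id"],
    "acquisition_cycle": acquisition_cycle,
    "ccslc_m_index": f'{ccslc_file["burst_id"]}_{acquisition_cycle}',  # assumed format
}
print(dataset_met_json)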
2 changes: 1 addition & 1 deletion tests/data_subscriber/test_cslc_query.py
@@ -7,7 +7,7 @@
from data_subscriber.parser import create_parser
from data_subscriber.cslc import cslc_query
from datetime import datetime
from data_subscriber.query import DateTimeRange
from data_subscriber.cmr import DateTimeRange

forward_arguments = ["query", "-c", "OPERA_L2_CSLC-S1_V1", "--processing-mode=forward", "--start-date=2021-01-24T23:00:00Z",\
"--end-date=2021-01-25T00:00:00Z", "--grace-mins=60", "--k=4", "--m=4"]
31 changes: 27 additions & 4 deletions tests/data_subscriber/test_cslc_util.py
@@ -7,7 +7,8 @@
from data_subscriber.parser import create_parser
import dateutil
from datetime import datetime
from data_subscriber.query import DateTimeRange
from data_subscriber.cmr import DateTimeRange, get_cmr_token
from util.conf_util import SettingsConf

hist_arguments = ["query", "-c", "OPERA_L2_CSLC-S1_V1", "--processing-mode=historical", "--start-date=2021-01-24T23:00:00Z",\
"--end-date=2021-01-24T23:00:00Z", "--frame-range=100,101"]
@@ -18,9 +19,12 @@
#TODO: So we may want to create different versions of this unit test, one for each version of the database json
def test_burst_map():
assert len(disp_burst_map_hist.keys()) == 1433
assert disp_burst_map_hist[46800].burst_ids == ["T175-374393-IW1","T175-374393-IW2","T175-374393-IW3","T175-374394-IW1",\
"T175-374394-IW2","T175-374394-IW3","T175-374395-IW1","T175-374395-IW2",\
"T175-374395-IW3"]
burst_set = set()
for burst in ["T175-374393-IW1", "T175-374393-IW2", "T175-374393-IW3", "T175-374394-IW1", \
"T175-374394-IW2", "T175-374394-IW3", "T175-374395-IW1", "T175-374395-IW2", \
"T175-374395-IW3"]:
burst_set.add(burst)
assert disp_burst_map_hist[46800].burst_ids.difference(burst_set) == set()
assert disp_burst_map_hist[46800].sensing_datetimes[0] == dateutil.parser.isoparse("2019-11-14T16:51:06")

assert len(disp_burst_map_hist[46799].burst_ids) == 15
@@ -102,3 +106,22 @@ def test_determine_acquisition_cycle_cslc():
acquisition_cycle = cslc_utils.determine_acquisition_cycle_cslc("2017-02-03T23:05:47", 832, disp_burst_map_hist)
assert acquisition_cycle == 216

def test_determine_k_cycle():
"""Test that the k cycle is correctly determined"""

args = create_parser().parse_args(["query", "-c", "OPERA_L2_CSLC-S1_V1", "--processing-mode=forward",
"--start-date=2021-01-24T23:00:00Z", "--end-date=2021-01-24T23:00:00Z"])

settings = SettingsConf().cfg
cmr, token = get_cmr_token(args.endpoint, settings)
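# NOTE: determine_k_cycle is declared async; as written these calls would presumably need to be awaited (e.g. via asyncio.run) to yield integers.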

k_cycle = cslc_utils.determine_k_cycle("2017-02-27T23:05:24Z", 831, disp_burst_map_hist, 10, args, token, cmr, settings)
assert k_cycle == 1

k_cycle = cslc_utils.determine_k_cycle("2016-07-02T23:05:46Z", 832, disp_burst_map_hist, 10, args, token, cmr, settings)
assert k_cycle == 0

k_cycle = cslc_utils.determine_k_cycle("2017-05-10T23:05:49Z", 832, disp_burst_map_hist, 10, args, token, cmr, settings)
assert k_cycle == 0

