Skip to content

Commit

Permalink
#832: Restored geo bounding box computation for DISP-S1 SCIFLO PGE jo…
Browse files Browse the repository at this point in the history
…b submission
  • Loading branch information
philipjyoon committed Jun 18, 2024
1 parent ff0e6a2 commit 88d7ca4
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 22 deletions.
23 changes: 12 additions & 11 deletions data_subscriber/asf_cslc_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
from util.job_submitter import try_submit_mozart_job

from data_subscriber.cslc_utils import (localize_disp_frame_burst_hist, split_download_batch_id, get_prev_day_indices,
get_bounding_box_for_frame, parse_cslc_native_id, build_ccslc_m_index)
get_bounding_box_for_frame, parse_cslc_native_id, build_ccslc_m_index,
localize_frame_geo_json)

logger = logging.getLogger(__name__)

Expand All @@ -32,6 +33,7 @@ class AsfDaacCslcDownload(AsfDaacRtcDownload):
def __init__(self, provider):
    """Set up the CSLC downloader: parent initialization, ancillary lookup tables, and credential key.

    provider is passed straight through to the parent class initializer.
    """
    super().__init__(provider)

    # Frame/burst history tables, returned together as a 3-tuple
    burst_hist_maps = localize_disp_frame_burst_hist()
    self.disp_burst_map, self.burst_to_frames, self.datetime_to_frames = burst_hist_maps

    # Per-frame geometry, used later to look up the frame bounding box
    self.frame_geo_map = localize_frame_geo_json()

    self.daac_s3_cred_settings_key = "CSLC_DOWNLOAD"

async def run_download(self, args, token, es_conn, netloc, username, password, cmr, job_id, rm_downloads_dir=True):
Expand All @@ -46,6 +48,9 @@ async def run_download(self, args, token, es_conn, netloc, username, password, c
latest_acq_cycle_index = 0
burst_id_set = set()

# All batches should have the same frame_id so we pick the first one
frame_id, _ = split_download_batch_id(args.batch_ids[0])

new_args = copy.deepcopy(args)

for batch_id in args.batch_ids:
Expand Down Expand Up @@ -169,13 +174,11 @@ async def run_download(self, args, token, es_conn, netloc, username, password, c
for iono_file in ionosphere_paths:
os.remove(iono_file)

# Determine M Compressed CSLCs by querying compressed cslc GRQ ES
# Determine M Compressed CSLCs by querying compressed cslc GRQ ES -------------->
# Uses ccslc_m_index field which looks like T100-213459-IW3_417 (burst_id_acquisition-cycle-index)
k, m = es_conn.get_k_and_m(args.batch_ids[0])
logger.info(f"{k=}, {m=}")

frame_id, _ = split_download_batch_id(args.batch_ids[0])

# Search for all previous M compressed CSLCs
prev_day_indices = await get_prev_day_indices(latest_acq_cycle_index, frame_id, self.disp_burst_map, args, token, cmr, settings)
for mm in range(m-1): # m parameter is inclusive of the current frame at hand
Expand All @@ -194,14 +197,12 @@ async def run_download(self, args, token, es_conn, netloc, username, password, c

for ccslc in ccslcs:
c_cslc_s3paths.extend(ccslc["_source"]["metadata"]["product_s3_paths"])
# <------------------------- Compressed CSLC look up

# Compute bounding box for frame. All batches should have the same frame_id so we pick the first one
#TODO: Renable this when we get epsg, xmin/max, and ymin/max values for the frame.
frame = self.disp_burst_map[int(frame_id)]
#bounding_box = get_bounding_box_for_frame(frame)
#print(f'{bounding_box=}')
# Look up bounding box for frame
bounding_box = get_bounding_box_for_frame(int(frame_id), self.frame_geo_map)
print(f'{bounding_box=}')

# TODO: This code differs from data_subscriber/rtc/rtc_job_submitter.py. Ideally both should be refactored into a common function
# Now submit DISP-S1 SCIFLO job
logger.info(f"Submitting DISP-S1 SCIFLO job")

Expand All @@ -226,7 +227,7 @@ async def run_download(self, args, token, es_conn, netloc, username, password, c
},
"FileName": product_id,
"id": product_id,
#"bounding_box": bounding_box, TODO: enable this when available
"bounding_box": bounding_box,
"save_compressed_slcs": save_compressed_slcs,
"Files": [
{
Expand Down
33 changes: 23 additions & 10 deletions data_subscriber/cslc_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@
from collections import defaultdict
from datetime import datetime, timedelta
import dateutil

import boto3
from pyproj import Transformer

from data_subscriber.cmr import async_query_cmr, CMR_TIME_FORMAT, DateTimeRange
from util import datasets_json_util
from util.conf_util import SettingsConf

DISP_FRAME_BURST_MAP_HIST = 'opera-disp-s1-consistent-burst-ids-with-datetimes.json'
FRAME_GEO_SIMPLE_JSON = 'frame-geometries-simple.geojson'

class _HistBursts(object):
def __init__(self):
Expand All @@ -36,6 +35,10 @@ def localize_disp_frame_burst_hist(file = DISP_FRAME_BURST_MAP_HIST):
localize_anc_json(file)
return process_disp_frame_burst_hist(file)

def localize_frame_geo_json(file = FRAME_GEO_SIMPLE_JSON):
    '''Localize the frame geometry geojson file and return it processed into a frame geo map.

    The file is first handed to localize_anc_json() (presumably this fetches the ancillary
    file so it is available locally -- confirm against that helper) and then parsed by
    process_frame_geo_json().
    '''
    localize_anc_json(file)
    frame_geo_map = process_frame_geo_json(file)
    return frame_geo_map

def _calculate_sensing_time_day_index(sensing_time, first_frame_time):
''' Return the day index of the sensing time relative to the first sensing time of the frame'''

Expand All @@ -60,7 +63,7 @@ def sensing_time_day_index(sensing_time, frame_number, frame_to_bursts):
return (_calculate_sensing_time_day_index(sensing_time, frame.sensing_datetimes[0]))

def process_disp_frame_burst_hist(file = DISP_FRAME_BURST_MAP_HIST):
'''Process the disp frame burst map json file intended for historical processing only and return the data as a dictionary'''
'''Process the disp frame burst map json file and return 3 dictionaries: frame_to_bursts, burst_to_frames, datetime_to_frames'''

j = json.load(open(file))
frame_to_bursts = defaultdict(_HistBursts)
Expand Down Expand Up @@ -92,6 +95,16 @@ def process_disp_frame_burst_hist(file = DISP_FRAME_BURST_MAP_HIST):

return frame_to_bursts, burst_to_frames, datetime_to_frames

def process_frame_geo_json(file = FRAME_GEO_SIMPLE_JSON):
    '''Process the frame-geometries-simple.geojson file into a dictionary used for determining frame bounding box

    file: path of the geojson file to read.

    Returns a dict keyed by each feature's "id". Each value is the first element of that
    feature's geometry "coordinates" (for a Polygon geometry that is its first ring of
    [x, y] positions).

    Raises KeyError if a feature lacks "id" or "geometry"/"coordinates", and whatever
    open() / json.load() raise for an unreadable or malformed file.
    '''

    # Use a context manager so the file handle is closed deterministically instead of
    # being left for the garbage collector.
    with open(file) as f:
        j = json.load(f)

    return {feature["id"]: feature["geometry"]["coordinates"][0] for feature in j["features"]}

def _parse_cslc_file_name(native_id):
dataset_json = datasets_json_util.DatasetsJson()
cslc_granule_regex = dataset_json.get("L2_CSLC_S1")["match_pattern"]
Expand Down Expand Up @@ -226,14 +239,14 @@ def split_download_batch_id(download_batch_id):
frame_id = download_batch_id.split("_")[-1]
return int(frame_id), None

def get_bounding_box_for_frame(frame_id, frame_geo_map):
    """Returns a bounding box for a given frame in the format of [xmin, ymin, xmax, ymax]

    frame_id: key identifying the frame in frame_geo_map.
    frame_geo_map: dict mapping a frame id to a sequence of coordinate positions, such as
        the one built by process_frame_geo_json().

    No reprojection is performed: the box is in whatever coordinate system the positions in
    frame_geo_map use (expected to be EPSG:4326 -- confirm against the geojson source).

    Raises KeyError if frame_id is not in frame_geo_map and ValueError if the frame has no
    positions.
    """

    coords = frame_geo_map[frame_id]

    # Index each position instead of unpacking it as a pair so that positions carrying an
    # optional third member (e.g. elevation) are still accepted.
    xs = [position[0] for position in coords]
    ys = [position[1] for position in coords]

    # NOTE(review): a plain min/max is not meaningful for a frame that crosses the
    # antimeridian; confirm whether such frames can occur in the input.
    return [min(xs), min(ys), max(xs), max(ys)]

11 changes: 10 additions & 1 deletion tests/data_subscriber/test_cslc_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,13 @@ def test_determine_k_cycle():
k_cycle = cslc_utils.determine_k_cycle("2017-05-10T23:05:49Z", 832, disp_burst_map_hist, 10, args, token, cmr, settings)
assert k_cycle == 0


def test_frame_geo_map():
    """Test that the frame geo simple map is correctly constructed"""
    expected_ring = [
        [-101.239536, 20.325197],
        [-100.942045, 21.860135],
        [-98.526059, 21.55014],
        [-98.845633, 20.021978],
        [-101.239536, 20.325197],
    ]
    geo_map = cslc_utils.process_frame_geo_json()
    assert geo_map[10859] == expected_ring

def test_frame_bounds():
    """Test that the bounding box of a frame is correctly derived from the frame geo simple map"""
    geo_map = cslc_utils.process_frame_geo_json()
    expected_bounds = [-101.239536, 20.021978, -98.526059, 21.860135]
    assert cslc_utils.get_bounding_box_for_frame(10859, geo_map) == expected_bounds

0 comments on commit 88d7ca4

Please sign in to comment.