Commit

Merge branch 'develop' into 832_disp_s1_trigger_major_design_change
philipjyoon committed Jun 11, 2024
2 parents 805c238 + c51748a commit a876819
Showing 18 changed files with 263 additions and 102 deletions.
2 changes: 1 addition & 1 deletion cluster_provisioning/dev-e2e/variables.tf
@@ -389,7 +389,7 @@ variable "pge_releases" {
"cslc_s1" = "2.1.1"
"rtc_s1" = "2.1.1"
"dswx_s1" = "3.0.0-rc.2.1"
"disp_s1" = "3.0.0-rc.2.1"
"disp_s1" = "3.0.0-rc.2.2"
}
}

2 changes: 1 addition & 1 deletion cluster_provisioning/dev-int/override.tf
@@ -176,7 +176,7 @@ variable "pge_releases" {
"cslc_s1" = "2.1.1"
"rtc_s1" = "2.1.1"
"dswx_s1" = "3.0.0-rc.2.1"
"disp_s1" = "3.0.0-rc.2.1"
"disp_s1" = "3.0.0-rc.2.2"
}
}

2 changes: 1 addition & 1 deletion cluster_provisioning/dev/variables.tf
@@ -390,7 +390,7 @@ variable "pge_releases" {
"cslc_s1" = "2.1.1"
"rtc_s1" = "2.1.1"
"dswx_s1" = "3.0.0-rc.2.1"
"disp_s1" = "3.0.0-rc.2.1"
"disp_s1" = "3.0.0-rc.2.2"
}
}

2 changes: 1 addition & 1 deletion cluster_provisioning/ebs-snapshot/variables.tf
@@ -34,7 +34,7 @@ variable "pge_releases" {
"cslc_s1" = "2.1.1"
"rtc_s1" = "2.1.1"
"dswx_s1" = "3.0.0-rc.2.1"
"disp_s1" = "3.0.0-rc.2.1"
"disp_s1" = "3.0.0-rc.2.2"
}
}

2 changes: 1 addition & 1 deletion cluster_provisioning/modules/common/variables.tf
@@ -554,7 +554,7 @@ variable "pge_releases" {
"cslc_s1" = "2.1.1"
"rtc_s1" = "2.1.1"
"dswx_s1" = "3.0.0-rc.2.1"
"disp_s1" = "3.0.0-rc.2.1"
"disp_s1" = "3.0.0-rc.2.2"
}
}

102 changes: 75 additions & 27 deletions data_subscriber/asf_cslc_download.py
@@ -1,11 +1,12 @@

import copy
import logging
import os

from os.path import basename
from pathlib import PurePath, Path
import urllib.parse
from datetime import datetime, timezone
import boto3

from data_subscriber import ionosphere_download, es_conn_util
from data_subscriber.cmr import Collection
@@ -55,24 +56,56 @@ async def run_download(self, args, token, es_conn, netloc, username, password, j
latest_acq_cycle_index = max(latest_acq_cycle_index, int(acq_cycle_index))

new_args.batch_ids = [batch_id]
granule_sizes = []

# First, download the files from ASF
cslc_products_to_filepaths: dict[str, set[Path]] = await super().run_download(
new_args, token, es_conn, netloc, username, password, job_id, rm_downloads_dir=False
)

logger.info(f"Uploading CSLC input files to S3")
cslc_files_to_upload = [fp for fp_set in cslc_products_to_filepaths.values() for fp in fp_set]
#TODO: uncomment this
cslc_s3paths.extend(concurrent_s3_client_try_upload_file(bucket=settings["DATASET_BUCKET"],
key_prefix=f"tmp/disp_s1/{batch_id}",
files=cslc_files_to_upload))
# Download the files from ASF only if the transfer protocol is HTTPS
if args.transfer_protocol == "https":
cslc_products_to_filepaths: dict[str, set[Path]] = await super().run_download(
new_args, token, es_conn, netloc, username, password, job_id, rm_downloads_dir=False
)
logger.info(f"Uploading CSLC input files to S3")
cslc_files_to_upload = [fp for fp_set in cslc_products_to_filepaths.values() for fp in fp_set]
cslc_s3paths.extend(concurrent_s3_client_try_upload_file(bucket=settings["DATASET_BUCKET"],
key_prefix=f"tmp/disp_s1/{batch_id}",
files=cslc_files_to_upload))

for granule_id, fp_set in cslc_products_to_filepaths.items():
filepath = list(fp_set)[0]
file_size = os.path.getsize(filepath)
granule_sizes.append((granule_id, file_size))

# For s3 we can use the files directly so simply copy over the paths
else: # s3 or auto
logger.info("Skipping download CSLC bursts and instead using ASF S3 paths for direct SCIFLO PGE ingestion")
downloads = self.get_downloads(args, es_conn)
cslc_s3paths = [download["s3_url"] for download in downloads]
if len(cslc_s3paths) == 0:
raise Exception(f"No s3_path found for {batch_id}. You probably should specify https transfer protocol.")

for p in cslc_s3paths:
# Split the following into bucket name and key
# 's3://asf-cumulus-prod-opera-products/OPERA_L2_CSLC-S1/OPERA_L2_CSLC-S1_T122-260026-IW3_20231214T011435Z_20231215T075814Z_S1A_VV_v1.0/OPERA_L2_CSLC-S1_T122-260026-IW3_20231214T011435Z_20231215T075814Z_S1A_VV_v1.0.h5'
parsed_url = urllib.parse.urlparse(p)
bucket = parsed_url.netloc
key = parsed_url.path[1:]
granule_id = p.split("/")[-1]

try:
head_object = boto3.client("s3").head_object(Bucket=bucket, Key=key)
except Exception as e:
logger.error("Failed when accessing the S3 object:" + p)
raise e
file_size = int(head_object["ContentLength"])

granule_sizes.append((granule_id, file_size))

cslc_files_to_upload = [Path(p) for p in cslc_s3paths] # Need this for querying static CSLCs

cslc_products_to_filepaths = {} # Dummy when trying to delete files later in this function

# Mark the CSLC files as downloaded in the CSLC ES with the file size
# While at it also build up burst_id set for compressed CSLC query
for granule_id, fp_set in cslc_products_to_filepaths.items():
filepath = list(fp_set)[0]
file_size = os.path.getsize(filepath)
for granule_id, file_size in granule_sizes:
native_id = granule_id.split(".h5")[0] # remove file extension and revision id
burst_id, _, _, _ = parse_cslc_native_id(native_id, self.burst_to_frame)
unique_id = cslc_unique_id(batch_id, burst_id)
@@ -84,20 +117,35 @@ async def run_download(self, args, token, es_conn, netloc, username, password, j
cslc_files_to_upload, args, token, job_id, settings
)

logger.info(f"Downloading CSLC Static Layer products for {batch_id}")
cslc_static_products_to_filepaths: dict[str, set[Path]] = await self.download_cslc_static_files_for_cslc_batch(
cslc_static_granules, args, token, netloc,
username, password, job_id
)
# Download the files from ASF only if the transfer protocol is HTTPS
if args.transfer_protocol == "https":
logger.info(f"Downloading CSLC Static Layer products for {batch_id}")
cslc_static_products_to_filepaths: dict[str, set[Path]] = await self.download_cslc_static_files_for_cslc_batch(
cslc_static_granules, args, token, netloc,
username, password, job_id
)

logger.info("Uploading CSLC Static input files to S3")
cslc_static_files_to_upload = [fp for fp_set in cslc_static_products_to_filepaths.values() for fp in fp_set]
cslc_static_s3paths.extend(concurrent_s3_client_try_upload_file(bucket=settings["DATASET_BUCKET"],
key_prefix=f"tmp/disp_s1/{batch_id}",
files=cslc_static_files_to_upload))
# For s3 we can use the files directly so simply copy over the paths
else: # s3 or auto
logger.info("Skipping download CSLC static files and instead using ASF S3 paths for direct SCIFLO PGE ingestion")

cslc_static_products_to_filepaths = {} # Dummy when trying to delete files later in this function

for cslc_static_granule in cslc_static_granules:
for url in cslc_static_granule["filtered_urls"]:
if url.startswith("s3"):
cslc_static_s3paths.append(url)

logger.info("Uploading CSLC Static input files to S3")
cslc_static_files_to_upload = [fp for fp_set in cslc_static_products_to_filepaths.values() for fp in fp_set]
cslc_static_s3paths.extend(concurrent_s3_client_try_upload_file(bucket=settings["DATASET_BUCKET"],
key_prefix=f"tmp/disp_s1/{batch_id}",
files=cslc_static_files_to_upload))
if len(cslc_static_s3paths) == 0:
raise Exception(f"No s3_path found for static files for {batch_id}. You probably should specify https transfer protocol.")

# Download all Ionosphere files corresponding to the dates covered by the
# input CSLC set
# Download all Ionosphere files corresponding to the dates covered by the input CSLC set
# We always download ionosphere files, there is no direct S3 ingestion option
logger.info(f"Downloading Ionosphere files for {batch_id}")
ionosphere_paths = self.download_ionosphere_files_for_cslc_batch(cslc_files_to_upload,
self.downloads_dir)
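The S3 branch added in this diff boils down to one pattern: parse the s3:// URL into bucket and key, then issue a HEAD request to read the object size without downloading it. The following is a minimal standalone sketch of that pattern, not code from the module; s3_object_size is a hypothetical helper name, and it assumes AWS credentials with read access to the bucket are already configured.

import urllib.parse

import boto3


def s3_object_size(s3_url: str) -> tuple[str, int]:
    """Return (granule_id, size in bytes) for an s3:// URL via a HEAD request."""
    parsed = urllib.parse.urlparse(s3_url)             # s3://<bucket>/<key>
    bucket = parsed.netloc
    key = parsed.path.lstrip("/")
    granule_id = s3_url.split("/")[-1]                 # file name component
    head = boto3.client("s3").head_object(Bucket=bucket, Key=key)
    return granule_id, int(head["ContentLength"])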
20 changes: 18 additions & 2 deletions data_subscriber/cmr.py
@@ -15,6 +15,7 @@
from tools.ops.cmr_audit.cmr_client import cmr_requests_get, async_cmr_posts

logger = logging.getLogger(__name__)
MAX_CHARS_PER_LINE = 250000 #This is the maximum number of characters per line you can display in cloudwatch logs

class Collection(str, Enum):
HLSL30 = "HLSL30"
@@ -160,7 +161,18 @@ async def async_query_cmr(args, token, cmr, settings, timerange, now: datetime,
product_granules = await _async_request_search_cmr_granules(args, request_url, [params])
search_results_count = len(product_granules)
logger.info(f"QUERY RESULTS: Found {search_results_count} granules")
logger.info(f'QUERY RESULTS: {[(granule["granule_id"], "revision " + str(granule["revision_id"])) for granule in product_granules]}')

products_per_line = 1000 # Default but this would never be used because we calculate dynamically below. Just here in case code moves around and we want a reasonable default
if search_results_count > 0:
# Print out all the query results but limit the number of characters per line
one_logout = f'{(product_granules[0]["granule_id"], "revision " + str(product_granules[0]["revision_id"]))}'
chars_per_line = len(one_logout) + 6 # 6 is a fudge factor
products_per_line = MAX_CHARS_PER_LINE // chars_per_line
for i in range(0, search_results_count, products_per_line):
end_range = i + products_per_line
if end_range > search_results_count:
end_range = search_results_count
logger.info(f'QUERY RESULTS {i+1} to {end_range} of {search_results_count}: {[(granule["granule_id"], "revision " + str(granule["revision_id"])) for granule in product_granules[i:end_range]]}')

# Filter out granules with revision-id greater than max allowed
least_revised_granules = []
@@ -186,7 +198,11 @@ async def async_query_cmr(args, token, cmr, settings, timerange, now: datetime,

if len(product_granules) != search_results_count:
logger.info(f"Filtered to {len(product_granules)} total granules after shortname filter check")
logger.info([(granule["granule_id"], "revision " + str(granule["revision_id"])) for granule in product_granules])
for i in range(0, len(product_granules), products_per_line):
end_range = i + products_per_line
if end_range > len(product_granules):
end_range = len(product_granules)
logger.info(f'FILTERED RESULTS {i+1} to {end_range} of {len(product_granules)}: {[(granule["granule_id"], "revision " + str(granule["revision_id"])) for granule in product_granules[i:end_range]]}')

for granule in product_granules:
granule["filtered_urls"] = _filter_granules(granule, args)
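The logging change in this file can be read as a small generic routine: estimate how many entries fit under the CloudWatch per-line limit from the length of the first entry, then emit the list in slices. A hedged sketch of that idea follows; log_in_chunks is a hypothetical helper, not an actual function in cmr.py.

import logging

logger = logging.getLogger(__name__)
MAX_CHARS_PER_LINE = 250000  # rough per-line display limit used for CloudWatch logs


def log_in_chunks(label: str, items: list, max_chars: int = MAX_CHARS_PER_LINE) -> None:
    """Log a long list over several lines so no single line exceeds max_chars."""
    if not items:
        return
    chars_per_item = len(f"{items[0]}") + 6            # same fudge factor as the code above
    per_line = max(1, max_chars // chars_per_item)
    for i in range(0, len(items), per_line):
        end = min(i + per_line, len(items))
        logger.info(f"{label} {i + 1} to {end} of {len(items)}: {items[i:end]}")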
39 changes: 23 additions & 16 deletions data_subscriber/cslc/cslc_query.py
@@ -309,21 +309,26 @@ async def query_cmr_by_native_id (self, args, token, cmr, settings, now, native_

return granules

async def query_cmr_by_frame_and_dates(self, args, token, cmr, settings, now, timerange):
new_args = copy.deepcopy(args)
all_granules = []
frame_start, frame_end = self.args.frame_range.split(",")
for frame in range(int(frame_start), int(frame_end) + 1):
count, native_id = build_cslc_native_ids(frame, self.disp_burst_map_hist)
if count == 0:
continue
new_args.native_id = native_id
new_granules = await async_query_cmr(new_args, token, cmr, settings, timerange, now)
self.extend_additional_records(new_granules, no_duplicate=True, force_frame_id=frame)
all_granules.extend(new_granules)

return all_granules

async def query_cmr(self, args, token, cmr, settings, timerange, now):

# If we are in historical mode, we will query one frame worth at a time
if self.proc_mode == "historical":

all_granules = []
frame_start, frame_end = self.args.frame_range.split(",")
for frame in range(int(frame_start), int(frame_end) + 1):
count, native_id = build_cslc_native_ids(frame, self.disp_burst_map_hist)
if count == 0:
continue
args.native_id = native_id # Note that the native_id is overwritten here. It doesn't get used after this point so this should be ok.
new_granules = await async_query_cmr(args, token, cmr, settings, timerange, now)
self.extend_additional_records(new_granules, no_duplicate=True, force_frame_id=frame)
all_granules.extend(new_granules)
all_granules = await self.query_cmr_by_frame_and_dates(args, token, cmr, settings, now, timerange)

# Reprocessing can be done by specifying either a native_id or a date range
# native_id search takes precedence over date range if both are specified
@@ -332,7 +337,9 @@ async def query_cmr(self, args, token, cmr, settings, timerange, now):
if args.native_id is not None:
all_granules = await self.query_cmr_by_native_id(args, token, cmr, settings, now, args.native_id)

# TODO: query by frame id
# Query by frame range and date range. Both must exist.
elif self.args.frame_range is not None and args.start_date is not None and args.end_date is not None:
all_granules = await self.query_cmr_by_frame_and_dates(args, token, cmr, settings, now, timerange)

# Reprocessing by date range is a two-step process:
# 1) Query CMR for all CSLC files in the date range specified and create list of granules with unique frame_ids
@@ -353,7 +360,7 @@ async def query_cmr(self, args, token, cmr, settings, timerange, now):
new_granules = await self.query_cmr_by_native_id(args, token, cmr, settings, now, native_id)
all_granules.extend(new_granules)
else:
raise Exception("Reprocessing mode requires either a native_id or a date range to be specified.")
raise Exception("Reprocessing mode requires 1) a native_id 2) frame range and date range or 3) a date range to be specified.")

else:
all_granules = await async_query_cmr(args, token, cmr, settings, timerange, now)
@@ -383,9 +390,9 @@ def get_download_chunks(self, batch_id_to_urls_map):
frame_id, _ = split_download_batch_id(batch_chunk[0])
chunk_map[frame_id].append(batch_chunk)
if (len(chunk_map[frame_id]) > self.args.k):
raise AssertionError("Number of download batches is greater than K. This should not be possible!")
#print("[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[")
#print(frame_id, batch_chunk[0])
logger.error([chunk for chunk, data in chunk_map[frame_id]])
err_str = f"Number of download batches {len(chunk_map[frame_id])} for frame {frame_id} is greater than K {self.args.k}."
raise AssertionError(err_str)
return chunk_map.values()

async def refresh_index(self):
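The refactor in this file pulls the per-frame loop into query_cmr_by_frame_and_dates so both historical and reprocessing modes can reuse it. The frame-range handling it relies on is simply an inclusive "start,end" expansion; the sketch below uses a hypothetical helper name and arbitrary example frame IDs.

def expand_frame_range(frame_range: str) -> list[int]:
    """Expand an inclusive 'start,end' frame-range string into individual frame IDs."""
    start, end = frame_range.split(",")
    return list(range(int(start), int(end) + 1))


# Example with arbitrary frame IDs: expand_frame_range("8881,8884") -> [8881, 8882, 8883, 8884]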
12 changes: 5 additions & 7 deletions data_subscriber/download.py
@@ -55,7 +55,11 @@ def __init__(self, provider):
self.daac_s3_cred_settings_key = None
self.cfg = SettingsConf().cfg # has metadata extractor config

self.downloads_dir = None
logger.info("Creating directories to process products")

# house all file downloads
self.downloads_dir = Path("downloads")
self.downloads_dir.mkdir(exist_ok=True)

async def run_download(self, args, token, es_conn, netloc, username, password,
job_id, rm_downloads_dir=True):
@@ -72,12 +76,6 @@ async def run_download(self, args, token, es_conn, netloc, username, password,

session = SessionWithHeaderRedirection(username, password, netloc)

logger.info("Creating directories to process products")

# house all file downloads
self.downloads_dir = Path("downloads")
self.downloads_dir.mkdir(exist_ok=True)

product_to_product_filepaths_map = self.perform_download(
session, es_conn, downloads, args, token, job_id
)
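This change moves creation of the downloads directory from run_download into the constructor, so the directory exists as soon as the downloader object is built. A minimal sketch of the resulting shape; the class name here is illustrative, not the actual one in download.py.

from pathlib import Path


class DownloaderSketch:
    def __init__(self, provider):
        self.provider = provider
        # house all file downloads; exist_ok=True makes repeated construction safe
        self.downloads_dir = Path("downloads")
        self.downloads_dir.mkdir(exist_ok=True)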
4 changes: 2 additions & 2 deletions data_subscriber/ionosphere_download.py
@@ -4,7 +4,7 @@
import shutil
import sys
from collections import namedtuple, defaultdict
from datetime import datetime
from datetime import datetime, timezone
from functools import partial
from pathlib import Path, PurePath

@@ -132,7 +132,7 @@ def generate_ionosphere_metadata(output_ionosphere_filepath, ionosphere_url, s3_
"job_id": job_util.supply_job_id(),
"s3_url": f"s3://{s3_bucket}/{s3_key}/{output_ionosphere_filepath.name}",
"source_url": ionosphere_url,
"download_datetime": datetime.now()
"download_datetime": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
}
}
# DEV: compare to CoreMetExtractor.py
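The metadata change here swaps a naive datetime.now() for a timezone-aware UTC timestamp serialized with a trailing "Z". A minimal sketch of that expression; utc_now_z is a hypothetical helper name.

from datetime import datetime, timezone


def utc_now_z() -> str:
    """Current UTC time as an ISO 8601 string ending in 'Z', e.g. '2024-06-11T01:23:45.678901Z'."""
    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")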
4 changes: 2 additions & 2 deletions docker/job-spec.json.SCIFLO_L3_DISP_S1
@@ -10,8 +10,8 @@
},
"dependency_images": [
{
"container_image_name": "opera_pge/disp_s1:3.0.0-rc.2.1",
"container_image_url": "$CODE_BUCKET_URL/opera_pge-disp_s1-3.0.0-rc.2.1.tar.gz",
"container_image_name": "opera_pge/disp_s1:3.0.0-rc.2.2",
"container_image_url": "$CODE_BUCKET_URL/opera_pge-disp_s1-3.0.0-rc.2.2.tar.gz",
"container_mappings": {
"$HOME/.netrc": ["/root/.netrc"],
"$HOME/.aws": ["/root/.aws", "ro"]
4 changes: 2 additions & 2 deletions docker/job-spec.json.SCIFLO_L3_DISP_S1_hist
@@ -10,8 +10,8 @@
},
"dependency_images": [
{
"container_image_name": "opera_pge/disp_s1:3.0.0-rc.2.1",
"container_image_url": "$CODE_BUCKET_URL/opera_pge-disp_s1-3.0.0-rc.2.1.tar.gz",
"container_image_name": "opera_pge/disp_s1:3.0.0-rc.2.2",
"container_image_url": "$CODE_BUCKET_URL/opera_pge-disp_s1-3.0.0-rc.2.2.tar.gz",
"container_mappings": {
"$HOME/.netrc": ["/root/.netrc"],
"$HOME/.aws": ["/root/.aws", "ro"]

0 comments on commit a876819
