Merge pull request #92 from shendric/production/awi-v2p5
AWI CryoSat-2 + Sentinel-3 update with Level-1 pre-processor bugfix
shendric committed Aug 15, 2022
2 parents b27db57 + 8bbd54e commit 4983bb0
Showing 27 changed files with 3,160 additions and 184 deletions.
1 change: 1 addition & 0 deletions pysiral/clocks.py
@@ -26,6 +26,7 @@ def reset(self):

def start(self):
self.t0 = time.time()
return self

def stop(self):
self.t1 = time.time()
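The single added line makes start() return the StopWatch instance, so a timer can be created and started in one expression (used later in the pre-processor as timer = StopWatch().start()). A minimal sketch of the chaining pattern with a simplified class, not the full pysiral implementation:

    import time

    class StopWatch:
        def __init__(self):
            self.t0, self.t1 = None, None

        def start(self):
            self.t0 = time.time()
            return self  # returning self enables one-line instantiation + start

        def stop(self):
            self.t1 = time.time()
            return self

        def get_seconds(self):
            return self.t1 - self.t0

    timer = StopWatch().start()  # previously required two separate statements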
23 changes: 8 additions & 15 deletions pysiral/cryosat2/l1_adapter.py
@@ -180,10 +180,14 @@ def _set_input_file_metadata(self):
info.set_attribute("timeliness", cs2_procstage2timeliness(metadata["processing_stage"]))

# Time-Orbit Metadata
tcs_tai = parse_datetime_str(metadata["first_record_time"][4:])
tce_tai = parse_datetime_str(metadata["last_record_time"][4:])
tcs_utc, tce_utc = Time([tcs_tai, tce_tai], scale="tai").utc.datetime

lats = [float(metadata["first_record_lat"])*1e-6, float(metadata["last_record_lat"])*1e-6]
lons = [float(metadata["first_record_lon"])*1e-6, float(metadata["last_record_lon"])*1e-6]
info.set_attribute("start_time", parse_datetime_str(metadata["first_record_time"][4:])) # TAI=....
info.set_attribute("stop_time", parse_datetime_str(metadata["last_record_time"][4:])) # TAI=....
info.set_attribute("start_time", tcs_utc)
info.set_attribute("stop_time", tce_utc)
info.set_attribute("lat_min", np.amin(lats))
info.set_attribute("lat_max", np.amax(lats))
info.set_attribute("lon_min", np.amin(lons))
@@ -365,34 +369,23 @@ def _set_classifier_group(self):
"""
Transfer the classifiers defined in the l1p config file to the Level-1 object.
NOTE: It is assumed that all classifiers are 20Hz
In addition, a few legacy parameters are computed based on the waveform counts, which are
only available at this stage. Computation of other parameters such as sigma_0,
leading_edge_width, ... has been moved to the post-processing.
:return: None
"""
# Loop over all classifier variables defined in the processor definition file
for key in self.cfg.classifier_targets.keys():
variable_20hz = getattr(self.nc, self.cfg.classifier_targets[key])
self.l1.classifier.add(variable_20hz, key)

# Calculate Parameters from waveform counts
# XXX: This is a legacy of the CS2AWI IDL processor
# Threshold defined for waveform counts, not power in dB
wfm_counts = self.nc.pwr_waveform_20_ku.values

# Calculate the OCOG Parameter (CryoSat-2 notation)
ocog = CS2OCOGParameter(self.l1.waveform.power)
self.l1.classifier.add(ocog.width, "ocog_width")
self.l1.classifier.add(ocog.amplitude, "ocog_amplitude")

# Calculate the Peakiness (CryoSat-2 notation)
pulse = CS2PulsePeakiness(wfm_counts)
self.l1.classifier.add(pulse.peakiness, "peakiness")
self.l1.classifier.add(pulse.peakiness_normed, "peakiness_normed")
self.l1.classifier.add(pulse.noise_floor, "peakiness_noise_floor")
self.l1.classifier.add(pulse.peakiness_r, "peakiness_r")
self.l1.classifier.add(pulse.peakiness_l, "peakiness_l")

# Get satellite velocity vector (classifier needs to be vector -> manual extraction needed)
satellite_velocity_vector = self.nc.sat_vel_vec_20_ku.values
self.l1.classifier.add(satellite_velocity_vector[:, 0], "satellite_velocity_x")
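CS2OCOGParameter and CS2PulsePeakiness are defined elsewhere in pysiral. As a rough guide to what these classifiers measure, here is a hedged sketch using the textbook OCOG definitions and one common peakiness definition (peak power over mean power); this is illustrative only, not the pysiral implementation:

    import numpy as np

    def ocog_parameters(wfm: np.ndarray):
        # wfm: waveform power counts, shape (n_waveforms, n_range_bins)
        p2 = np.nansum(wfm ** 2.0, axis=1)
        p4 = np.nansum(wfm ** 4.0, axis=1)
        amplitude = np.sqrt(p4 / p2)  # OCOG amplitude
        width = p2 ** 2.0 / p4        # OCOG width in range bins
        return width, amplitude

    def pulse_peakiness(wfm: np.ndarray) -> np.ndarray:
        # One common definition: ratio of peak power to mean power per waveform
        return np.nanmax(wfm, axis=1) / np.nanmean(wfm, axis=1)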
6 changes: 6 additions & 0 deletions pysiral/l1bdata.py
@@ -259,6 +259,12 @@ def update_data_limit_attributes(self):

info = self.info

# Check if timestamp is monotonically increasing
tdelta_dt = self.time_orbit.timestamp[1:]-self.time_orbit.timestamp[:-1]
tdelta_secs = np.array([t.total_seconds() for t in tdelta_dt])
if np.any(tdelta_secs < 0.0):
logger.warning("- Found anomaly (negative time step)")

# time orbit group infos
info.set_attribute("lat_min", np.nanmin(self.time_orbit.latitude))
info.set_attribute("lat_max", np.nanmax(self.time_orbit.latitude))
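The new sanity check flags backward time steps, the symptom addressed by the pre-processor bugfix in this release. An equivalent vectorized sketch with hypothetical numpy datetime64 input:

    import numpy as np

    # Hypothetical 20Hz timestamps with one backward step
    timestamps = np.array(["2022-08-15T00:00:00", "2022-08-15T00:00:01",
                           "2022-08-15T00:00:00"], dtype="datetime64[s]")
    tdelta_secs = np.diff(timestamps).astype(float)  # timedelta64[s] -> float seconds
    if np.any(tdelta_secs < 0.0):
        print("- Found anomaly (negative time step)")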
223 changes: 130 additions & 93 deletions pysiral/l1preproc/__init__.py
@@ -11,8 +11,14 @@
from attrdict import AttrDict
from pathlib import Path
from operator import attrgetter
from geopy import distance
from datetime import timedelta
from typing import Union, List, TypeVar, Dict, Tuple

import matplotlib.pyplot as plt
import cartopy.crs as ccrs
import cartopy.feature as cfeature


from dateperiods import DatePeriod, PeriodIterator

@@ -186,8 +192,8 @@ def __init__(self,
self.processor_item_dict = {}
self._init_processor_items()


def _init_processor_items(self) -> None:
"""
@@ -247,6 +253,13 @@ def process_input_files(self, input_file_list: List[Union[Path, str]]) -> None:
# content of the current file
polar_ocean_check = L1PreProcPolarOceanCheck(self.__class__.__name__, self.polar_ocean_props)

# The stack of connected l1 segments is a list of l1 objects that together form a
# continuous trajectory over polar oceans. This stack will be emptied and its content
# exported to an l1p netCDF file if the next segment is not connected to the stack.
# This stack is necessary because the polar ocean segments from the next file may
# be connected to polar ocean segments from the previous file.
l1_connected_stack = []

# Orbit segments may or may not be connected, therefore the list of input files
# needs to be processed sequentially.
for i, input_file in enumerate(input_file_list):
@@ -261,6 +274,7 @@ def process_input_files(self, input_file_list: List[Union[Path, str]]) -> None:
if l1 is None:
logger.info("- No polar ocean data for curent job -> skip file")
continue
# l1p_debug_map([l1], title="Source File")

# Step 2: Apply processor items on source data
# for the sake of computational efficiency.
@@ -270,30 +284,36 @@ def process_input_files(self, input_file_list: List[Union[Path, str]]) -> None:
# Step 3: Extract polar ocean segments
# The input files may contain unwanted data (low latitude/land segments).
# It is the job of the L1PreProc child classes to return only the relevant
# segments over polar oceans as a list of l1 objects.
l1_po_segments = self.extract_polar_ocean_segments(l1)
# l1p_debug_map(l1_po_segments, title="Polar Ocean Segments")

self.l1_apply_processor_items(l1_po_segments, "post_ocean_segment_extraction")

# Step 4: Merge orbit segments
# Add the list of polar ocean segments to the stack of connected segments and
# merge those that are connected (e.g. two half orbits connected at the pole)
# into a single l1 object. Segments that are not connected to the rest of the
# stack will be returned for export to netCDF files.
l1_export_list, l1_connected_stack = self.l1_get_output_segments(l1_connected_stack, l1_po_segments)

# l1p_debug_map(l1_connected_stack, title="Polar Ocean Segments - Stack")
if not l1_export_list:
continue
# l1p_debug_map(l1_export_list, title="Polar Ocean Segments - Export")

# Step 5: Processor items post merge
# Computationally expensive post-processing (e.g. computation of waveform shape parameters)
# can now be executed, as the Level-1 segments are cropped to their minimal length.
self.l1_apply_processor_items(l1_export_list, "post_merge")

# Step 6: Export
for l1_export in l1_export_list:
self.l1_export_to_netcdf(l1_export)

# Step 7: Export the last item in the stack
# After the final input file, exactly one (merged) segment must remain in the stack.
if len(l1_connected_stack) != 1:
raise ValueError(f"Expected exactly one segment in connected stack, got {len(l1_connected_stack)}")
self.l1_export_to_netcdf(l1_connected_stack[-1])

def extract_polar_ocean_segments(self, l1: "Level1bData") -> List["Level1bData"]:
"""
@@ -348,97 +368,89 @@ def _l1_apply_proc_item(self,
:return:
"""
timer = StopWatch().start()
for procitem, label in self.processor_item_dict.get(stage_name, []):
procitem.apply(l1_item)
timer.stop()
msg = f"- L1 processing items applied in {timer.get_seconds():.3f} seconds"
logger.debug(msg)

def l1_get_output_segments(self,
l1_connected_stack: List["Level1bData"],
l1_po_segments: List["Level1bData"]
) -> Tuple[List["Level1bData"], List["Level1bData"]]:
"""
Add the input Level-1 segments to the l1 stack and
This method sorts the stack of connected l1 segments and the polar ocean segments
of the most recent file and sorts the segments into (connected) output and
the new stack, defined by the last (connected) item.
:param l1_segments: List of L1 data sets
:param l1_connected_stack: List of connected L1 polar ocean segments (from previous file(s))
:param l1_po_segments: List of L1 polar ocean segments (from current file)
:return: None or merged stack if
:return: L1 segments for the ouput, New l1 stack of connected items
"""

# Create a list of all currently available l1 segments. This includes the
# elements from the stack and the polar ocean segments
all_l1_po_segments = [*l1_connected_stack, *l1_po_segments]

# logger.debug(f"l1_po_segments = {len(l1_po_segments)} elements")
# logger.debug(f"l1_connected_stack = {len(l1_connected_stack)} elements")
# logger.debug(f"all_l1_po_segments = {len(all_l1_po_segments)} elements")

# There are a number of cases to be caught here.
# Case 1: The list of polar ocean segments might be empty
if not l1_po_segments:
return l1_connected_stack, l1_po_segments

# Case 2: If there is only one element, all data goes to the stack
# and no data needs to be exported.
# (because the next file could be connected to it)
if len(all_l1_po_segments) == 1:
return [], l1_po_segments

# Check if segment is connected to the next one
are_connected = [
self.l1_are_connected(l1_0, l1_1)
for l1_0, l1_1 in zip(all_l1_po_segments[:-1], all_l1_po_segments[1:])
]

# Create a list of (piece-wise) merged elements.
# The last element of this list will be transferred to the
# next iteration of the file list
merged_l1_list = [copy.deepcopy(all_l1_po_segments[0])]
for idx, is_connected in enumerate(are_connected):
target_l1 = all_l1_po_segments[idx+1]
if is_connected:
merged_l1_list[-1].append(target_l1)
else:
merged_l1_list.append(copy.deepcopy(target_l1))

# Return (elements marked for export, l1_connected stack)
return merged_l1_list[:-1], [merged_l1_list[-1]]
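A hypothetical illustration of the piece-wise merge above, with integers standing in for Level1bData segments and "connected" meaning two consecutive values differ by at most one:

    segments = [1, 2, 5, 6]  # stack + new polar ocean segments, in order
    are_connected = [b - a <= 1 for a, b in zip(segments[:-1], segments[1:])]
    groups = [[segments[0]]]
    for idx, is_connected in enumerate(are_connected):
        if is_connected:
            groups[-1].append(segments[idx + 1])
        else:
            groups.append([segments[idx + 1]])
    # groups == [[1, 2], [5, 6]]: [1, 2] would be exported,
    # [5, 6] becomes the new connected stack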

def l1_are_connected(self, l1_0: "Level1bData", l1_1: "Level1bData") -> bool:
"""
Check if the start time of file i and the stop time if file i-1 indicate neighbouring orbit segments.
(e.g. due to radar mode change, or two half-orbits)
Check if the start time of l1 segment 1 and the stop time of l1 segment 0
indicate neighbouring orbit segments.
-> Assumes explicetly that l1_0 comes before l1_1
:param l1:
:param l1_0:
:param l1_1:
:return: Flag if l1 is connected (True of False)
:return: Flag if l1 segments are connected (True of False)
"""

# Alternate connectivity check based on along-track distance (currently logged for debugging only)
l1_0_last_latlon = l1_0.time_orbit.latitude[-1], l1_0.time_orbit.longitude[-1]
l1_1_first_latlon = l1_1.time_orbit.latitude[0], l1_1.time_orbit.longitude[0]
distance_km = distance.distance(l1_0_last_latlon, l1_1_first_latlon).km
logger.debug(f"- {distance_km=}")

# Test if segments are adjacent based on time gap between them
tdelta = l1_1.info.start_time - l1_0.info.stop_time
threshold = self.cfg.orbit_segment_connectivity.max_connected_segment_timedelta_seconds
return tdelta.total_seconds() <= threshold
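The change from tdelta.seconds to tdelta.total_seconds() appears to be the Level-1 pre-processor bugfix referenced in the commit title: .seconds is always in [0, 86400) and ignores the days component, so large gaps could look small and slight overlaps looked huge. A short demonstration:

    from datetime import timedelta

    gap = timedelta(days=1, seconds=2)
    print(gap.seconds)              # 2 -> a gap of more than a day passes a small threshold
    print(gap.total_seconds())      # 86402.0 -> correctly rejected

    overlap = timedelta(seconds=-5)
    print(overlap.seconds)          # 86395 -> slightly overlapping segments look disconnected
    print(overlap.total_seconds())  # -5.0 -> correctly treated as connected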

def l1_export_to_netcdf(self, l1: "Level1bData") -> None:
"""
@@ -454,7 +466,7 @@ def l1_export_to_netcdf(self, l1: "Level1bData") -> None:
self.output_handler.export_to_netcdf(l1)
logger.info(f"- Written l1p product: {self.output_handler.last_written_file}")
else:
logger.info("- Orbit segment below minimum size (%g), skipping" % l1.n_records)
logger.warning("- Orbit segment below minimum size (%g), skipping" % l1.n_records)

def trim_single_hemisphere_segment_to_polar_region(self, l1: "Level1bData") -> "Level1bData":
"""
@@ -737,14 +749,6 @@ def orbit_segment_connectivity_props(self) -> Union[Dict, AttrDict]:
self.error.raise_on_error()
return self.cfg.orbit_segment_connectivity



class L1PreProcCustomOrbitSegment(L1PreProcBase):
""" A Pre-Processor for input files with arbitrary segment lenght (e.g. CryoSat-2) """
@@ -1299,3 +1303,36 @@ def get_preproc(preproc_type: str,

# Return the initialized class
return cls(input_adapter, output_handler, cfg)


def l1p_debug_map(l1p_list: List["Level1bData"],
title: str = None
) -> None:
"""
Create an interactive debug map of l1p segments
:param l1p_list:
:param title:
:return:
"""

title = title if title is not None else ""
proj = ccrs.PlateCarree(central_longitude=0.0)

plt.figure(dpi=150)
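# NOTE: maximizing the window (below) assumes a Qt-based matplotlib backend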
fig_manager = plt.get_current_fig_manager()
fig_manager.window.showMaximized()
ax = plt.axes(projection=proj)
ax.set_global()
ax.set_title(title)
ax.coastlines(resolution='50m', color="0.25", linewidth=0.25, zorder=201)
ax.add_feature(cfeature.OCEAN, color="#D0CFD4", zorder=150)
ax.add_feature(cfeature.LAND, color="#EAEAEA", zorder=200)

for l1p in l1p_list:
ax.scatter(l1p.time_orbit.longitude, l1p.time_orbit.latitude,
s=1, zorder=300, linewidths=0.0,
transform=proj
)
plt.show()
