# Transform

In [1]:
from typing import Tuple, List

import numpy as np
import pandas as pd
import pywsra
import scipy
import xarray as xr
import littlebuoybigwaves as buoy
from configure import read_stored_variable

## Setup

In [2]:
%run 'nb0-datasets.ipynb'
%run -i configure.py

earl_ds = read_stored_variable('earl_ds')
fiona_ds = read_stored_variable('fiona_ds')
ian_ds = read_stored_variable('ian_ds')
julia_ds = read_stored_variable('julia_ds')
idalia_ds = read_stored_variable('idalia_ds')
lee_ds = read_stored_variable('lee_ds')
atomic_ds = read_stored_variable('atomic_ds')

earl_drifter_df = read_stored_variable('earl_drifter_df')
fiona_drifter_df = read_stored_variable('fiona_drifter_df')
ian_drifter_df = read_stored_variable('ian_drifter_df')
idalia_drifter_df = read_stored_variable('idalia_drifter_df')
lee_drifter_df = read_stored_variable('lee_drifter_df')
atomic_swift_ds = read_stored_variable('atomic_swift_ds')

ian_ibtracs_df = read_stored_variable('ian_ibtracs_df')
idalia_ibtracs_df = read_stored_variable('idalia_ibtracs_df')

  values.append(resample_method(met_in_window[var].values))
  values.append(resample_method(met_in_window[var].values))
  values.append(resample_method(met_in_window[var].values))
  values.append(resample_method(met_in_window[var].values))
  values.append(resample_method(met_in_window[var].values))
  values.append(resample_method(met_in_window[var].values))
  values.append(resample_method(met_in_window[var].values))
  return np.nanmean(a, axis, out=out, keepdims=keepdims)
  values.append(resample_method(met_in_window[var].values))


## IBTrACS

In [3]:
def ibtracs_column_to_float(ibtracs_da):
    ibtracs_da_as_float = (ibtracs_da
                           .replace(' ', np.nan)
                           .apply(float))
    return ibtracs_da_as_float


def convert_ibtracs_columns_to_floats(ibtracs_df):
    ibtracs_df['USA_WIND'] = ibtracs_column_to_float(ibtracs_df['USA_WIND'])
    ibtracs_df['USA_PRES'] = ibtracs_column_to_float(ibtracs_df['USA_PRES'])
    ibtracs_df['USA_RMW'] = ibtracs_column_to_float(ibtracs_df['USA_RMW'])
    ibtracs_df['STORM_SPEED'] = ibtracs_column_to_float(ibtracs_df['STORM_SPEED'])
    ibtracs_df['STORM_DIR'] = ibtracs_column_to_float(ibtracs_df['STORM_DIR'])
    ibtracs_df['LON'] = ibtracs_column_to_float(ibtracs_df['LON'])
    ibtracs_df['LAT'] = ibtracs_column_to_float(ibtracs_df['LAT'])
    return ibtracs_df


In [4]:
idalia_ibtracs_df = convert_ibtracs_columns_to_floats(idalia_ibtracs_df)
ian_ibtracs_df = convert_ibtracs_columns_to_floats(ian_ibtracs_df)

## Drifters

### Drifter type

In [5]:
def is_spot_id(id_index: pd.Index) -> np.ndarray[bool]:
    """ Return boolean array where index contains Spotters """
    return id_index.str.contains('SPOT')

def is_microswift_id(id_index: pd.Index) -> np.ndarray[bool]:
    """ Return boolean array where index contains microSWIFTs """
    return id_index.str.match(r'^\d{3}$')  # e.g. 043

def is_dwsd_id(id_index: pd.Index) -> np.ndarray[bool]:
    """ Return boolean array where index contains DWSDs """
    return id_index.str.match(r'^\d{15}$')

def assign_drifter_type(drifter_df):
    id_index = drifter_df.index.get_level_values(level='id')
    is_spot = is_spot_id(id_index)
    is_microswift = is_microswift_id(id_index)
    is_dwsd = is_dwsd_id(id_index)
    drifter_types = np.empty((drifter_df.shape[0],), dtype=object)
    drifter_types[is_spot] = 'spotter'
    drifter_types[is_microswift] = 'microswift'
    drifter_types[is_dwsd] = 'dwsd'
    return drifter_df.assign(drifter_type=drifter_types)



In [6]:
earl_drifter_df = assign_drifter_type(earl_drifter_df)
fiona_drifter_df = assign_drifter_type(fiona_drifter_df)
ian_drifter_df = assign_drifter_type(ian_drifter_df)
idalia_drifter_df = assign_drifter_type(idalia_drifter_df)
lee_drifter_df = assign_drifter_type(lee_drifter_df)

### Frequencies

In [7]:
def unify_microswift_frequencies(drifter_df):
    is_microswift = drifter_df['drifter_type'] == 'microswift'
    frequencies = drifter_df.loc[is_microswift, 'frequency']
    drifter_df.loc[is_microswift, 'frequency'] \
                                = (drifter_df.loc[is_microswift, 'frequency']
                                   .apply(lambda f: frequencies.iloc[0]))
    return drifter_df

def trim_spotter_spectral_vars(drifter_df):
    is_spotter = drifter_df['drifter_type'] == 'spotter'
    drifter_df.loc[is_spotter] = (drifter_df.loc[is_spotter]
                                  .apply(trim_spectral_vars, axis=1))
    return drifter_df

def trim_spectral_vars(drifter_df):
    valid_frequencies = drifter_df['frequency'] < 0.5
    spectral_vars = ['energy_density', 'a1', 'a2', 'b1', 'b2', 'frequency']
    for var in spectral_vars:
        drifter_df[var] = drifter_df[var][valid_frequencies]
    return drifter_df

In [8]:
ian_drifter_df = unify_microswift_frequencies(ian_drifter_df)
idalia_drifter_df = unify_microswift_frequencies(idalia_drifter_df)
lee_drifter_df = unify_microswift_frequencies(lee_drifter_df)

In [9]:
earl_drifter_df = trim_spotter_spectral_vars(earl_drifter_df)
fiona_drifter_df = trim_spotter_spectral_vars(fiona_drifter_df)
ian_drifter_df = trim_spotter_spectral_vars(ian_drifter_df)
idalia_drifter_df = trim_spotter_spectral_vars(idalia_drifter_df)
lee_drifter_df = trim_spotter_spectral_vars(lee_drifter_df)

### Mean square slope

In [10]:
earl_drifter_df = earl_drifter_df.buoy.mean_square_slope(freq_range='total')
fiona_drifter_df = fiona_drifter_df.buoy.mean_square_slope(freq_range='total')
ian_drifter_df = ian_drifter_df.buoy.mean_square_slope(freq_range='total')
idalia_drifter_df = idalia_drifter_df.buoy.mean_square_slope(freq_range='total')
lee_drifter_df = lee_drifter_df.buoy.mean_square_slope(freq_range='total')

### Renaming

In [11]:
atomic_swift_ds = atomic_swift_ds.rename({'energy': 'energy_density', 'freq': 'frequency'})

In [12]:
energy_density = atomic_swift_ds['energy_density'].values.squeeze().T
energy_density = np.nan_to_num(energy_density)
frequency = atomic_swift_ds['frequency'].values
frequency = np.tile(frequency, (energy_density.shape[0], 1))
mss = buoy.waves.mean_square_slope(energy=energy_density, freq=frequency, freq_range='total')[0]
atomic_swift_ds['mean_square_slope'] = (('swift_id', 'time'), mss[None, :])

## WSRA

### Quality control metrics

Compute the standard deviation of the mean square slope observations.  These (5) observations are independent measures of mean square slope offset by -20, -10, 0, +10, and +20 seconds from the reported time.

In [13]:
def mean_square_slope_std(wsra_ds: xr.Dataset) -> xr.DataArray:
    """ Compute standard deviation of WSRA mean square slopes. """
    return wsra_ds['sea_surface_mean_square_slope'].std(axis=1)

In [14]:
earl_ds['sea_surface_mean_square_slope_std'] = mean_square_slope_std(earl_ds)
fiona_ds['sea_surface_mean_square_slope_std'] = mean_square_slope_std(fiona_ds)
ian_ds['sea_surface_mean_square_slope_std'] = mean_square_slope_std(ian_ds)
julia_ds['sea_surface_mean_square_slope_std'] = mean_square_slope_std(julia_ds)
idalia_ds['sea_surface_mean_square_slope_std'] = mean_square_slope_std(idalia_ds)
lee_ds['sea_surface_mean_square_slope_std'] = mean_square_slope_std(lee_ds)
atomic_ds['sea_surface_mean_square_slope_std'] = mean_square_slope_std(atomic_ds)

### Masking

Mask the WSRA observations based on flight metadata and quality control metrics.

In [15]:
def mask_wsra(wsra_ds: xr.Dataset, mask_dict: dict) -> xr.Dataset:
    """ Mask WSRA observations. """
    wsra_masked_ds = (wsra_ds
                      .wsra.create_trajectory_mask(mask_dict)
                      .wsra.mask(drop=True)
                      .drop_duplicates(dim='time'))  #TODO: added 01-30
    num_masked_values = wsra_masked_ds['time_mask'].attrs['num_masked_values']
    perc_masked_values = 100 * num_masked_values / wsra_ds['time_mask'].size
    print(
        f"{wsra_ds.attrs['storm_name']}: "
        f"{num_masked_values} masked values ({perc_masked_values.round(1)}%)."
    )
    return wsra_masked_ds


In [16]:
mask_dict = {
    'wsra_computed_roll': (-2.5, 2.5),
    'platform_radar_altitude': (1000, 4000),
    # 'peak_spectral_variance': (),
    'platform_speed_wrt_ground': (80, 250),
    'met_sfmr_rain_rate': (0, 25),
    'rainfall_rate_median': (0, 25),
    'sea_surface_mean_square_slope_std': (0, 0.1),
}

In [17]:
earl_masked_ds = mask_wsra(earl_ds, mask_dict)
fiona_masked_ds = mask_wsra(fiona_ds, mask_dict)
ian_masked_ds = mask_wsra(ian_ds, mask_dict)
julia_masked_ds = mask_wsra(julia_ds, mask_dict)
idalia_masked_ds = mask_wsra(idalia_ds, mask_dict)
lee_masked_ds = mask_wsra(lee_ds, mask_dict)
atomic_masked_ds = mask_wsra(atomic_ds, mask_dict)



earl: 1818 masked values (64.3%).
fiona: 366 masked values (25.3%).
ian: 491 masked values (34.1%).
julia: 64 masked values (11.6%).
idalia: 108 masked values (13.8%).
lee: 907 masked values (38.9%).
atomic: 1154 masked values (47.4%).




### Frequency spectra

In [18]:
def wsra_wn_spectrum_to_fq_spectrum(wsra_ds: xr.Dataset) -> xr.Dataset:
    """ Convert WSRA wavenumber spectra to frequency-direction spectra. """
    new_ds = wsra_ds = (wsra_ds
                        .wsra.wn_spectrum_to_fq_dir_spectrum(regrid=True)
                        .wsra.fq_dir_spectrum_to_fq_spectrum())
    return new_ds

In [19]:
earl_masked_ds = wsra_wn_spectrum_to_fq_spectrum(earl_masked_ds)
fiona_masked_ds = wsra_wn_spectrum_to_fq_spectrum(fiona_masked_ds)
ian_masked_ds = wsra_wn_spectrum_to_fq_spectrum(ian_masked_ds)
julia_masked_ds = wsra_wn_spectrum_to_fq_spectrum(julia_masked_ds)
idalia_masked_ds = wsra_wn_spectrum_to_fq_spectrum(idalia_masked_ds)
lee_masked_ds = wsra_wn_spectrum_to_fq_spectrum(lee_masked_ds)
atomic_masked_ds = wsra_wn_spectrum_to_fq_spectrum(atomic_masked_ds)

  return np.abs(estimated_var - actual_var) / np.abs(actual_var) * 100
  warn(
  return np.abs(estimated_var - actual_var) / np.abs(actual_var) * 100
  warn(
  return np.abs(estimated_var - actual_var) / np.abs(actual_var) * 100
  return np.abs(estimated_var - actual_var) / np.abs(actual_var) * 100
  warn(
  return np.abs(estimated_var - actual_var) / np.abs(actual_var) * 100
  warn(
  return np.abs(estimated_var - actual_var) / np.abs(actual_var) * 100
  warn(
  return np.abs(estimated_var - actual_var) / np.abs(actual_var) * 100
  warn(


### IBTrACs data

In [20]:
def get_time_interpolation(interpolator, query_times):
    #TODO:
    numeric_query_times = pd.to_numeric(query_times)
    return interpolator(numeric_query_times)


def storm_distances(distance, bearing_deg):
    #TODO:
    bearing_rad = np.deg2rad(bearing_deg)
    distance_east = distance * np.sin(bearing_rad)
    distance_north = distance * np.cos(bearing_rad)
    return distance_east, distance_north


In [21]:
def ibtracs_interpolation(wsra_ds, ibtracs_df):

    coords = ibtracs_df[['LON', 'LAT']]
    direction = ibtracs_df['STORM_DIR']
    direction_unwrap = np.unwrap(direction, period=360)
    storm_speed = ibtracs_df['STORM_SPEED'] # kt
    radius_max_winds = ibtracs_df['USA_RMW'] # nm
    max_wind_speed = ibtracs_df['USA_WIND']# kt

    times = pd.to_numeric(ibtracs_df.index.to_numpy())
    coords_interpolator = scipy.interpolate.interp1d(times, coords, fill_value=np.nan, axis=0)
    direction_interpolator = scipy.interpolate.interp1d(times, direction_unwrap, fill_value=np.nan, axis=0)
    storm_speed_interpolator = scipy.interpolate.interp1d(times, storm_speed, fill_value=np.nan, axis=0)
    radius_max_winds_interpolator = scipy.interpolate.interp1d(times, radius_max_winds, fill_value=np.nan, axis=0)
    max_wind_speed_interpolator = scipy.interpolate.interp1d(times, max_wind_speed, fill_value=np.nan, axis=0)

    storm_positions = get_time_interpolation(coords_interpolator, wsra_ds['time'].values).T
    wsra_ds['storm_longitude'] = ('time', storm_positions[0])
    wsra_ds['storm_latitude'] = ('time', storm_positions[1])

    storm_headings = get_time_interpolation(direction_interpolator, wsra_ds['time'].values).T
    wsra_ds['storm_heading'] = ('time', storm_headings)

    storm_speeds = get_time_interpolation(storm_speed_interpolator, wsra_ds['time'].values).T
    wsra_ds['storm_speed'] = ('time', storm_speeds)

    storm_radius_max_winds = get_time_interpolation(radius_max_winds_interpolator, wsra_ds['time'].values).T
    wsra_ds['storm_radius_max_wind'] = ('time', storm_radius_max_winds)

    storm_max_wind_speeds = get_time_interpolation(max_wind_speed_interpolator, wsra_ds['time'].values).T
    wsra_ds['storm_max_wind_speed'] = ('time', storm_max_wind_speeds)

    return wsra_ds

In [22]:
idalia_masked_ds = ibtracs_interpolation(idalia_masked_ds, idalia_ibtracs_df)
ian_masked_ds = ibtracs_interpolation(ian_masked_ds, ian_ibtracs_df)

## Colocation

In [23]:
def is_spot_id(id_index: pd.Index) -> np.ndarray[bool]:
    """ Return boolean array where index contains Spotters """
    return id_index.str.contains('SPOT')

def is_microswift_id(id_index: pd.Index) -> np.ndarray[bool]:
    """ Return boolean array where index contains microSWIFTs """
    return id_index.str.match(r'^\d{3}$')  # e.g. 043

def is_dwsd_id(id_index: pd.Index) -> np.ndarray[bool]:
    """ Return boolean array where index contains DWSDs """
    return id_index.str.contains('X')  # 30023  #TODO: intentially null

def add_colocated_ds_id_dim(
    colocated_ds: xr.Dataset,
    path_coords: Tuple,
    path_vars: List,
    drifter_label: str,
    drifter_id: str,
) -> xr.Dataset:
    """ Expand colocated Dataset drifter DataArrays with an `id` dimension. """
    # Update all coordinates and variables except for `time` (path_coords[0]).
    vars_to_update = (list(path_coords[1:])
                      + path_vars
                      + ['time_difference', 'distance'])
    # Reassign each variable with an expanded `id` dim.
    for var in vars_to_update:
        prefix = drifter_label + '_'
        var_name = prefix + var
        dim_name = prefix + 'id'
        colocated_ds[var_name] = (colocated_ds[var_name]
                                  .expand_dims(dim={dim_name:[drifter_id]}))
    return colocated_ds

def colocate_wsra_and_drifters(
        wsra_ds: xr.Dataset,
        drifter_df: pd.DataFrame
) -> xr.Dataset:
    """
    Colocate observations in a WSRA Dataset with those in a drifter DataFrame,
    merge the results back into a copy of the WSRA Dataset, and return.
    """
    # Separate drifters by type (each type has a different # of frequencies)
    id_index = drifter_df.index.get_level_values(level='id')
    is_spot = is_spot_id(id_index)
    is_microswift = is_microswift_id(id_index)
    is_dwsd = is_dwsd_id(id_index)
    labels = ['spotter', 'microswift', 'dwsd']

    # For each type in the DataFrame (if any), subset the DataFrame and
    # colocate with the WSRA Dataset by drifter id.  Collect all colocated
    # Datasets into a list for later merging.
    ds_list = []
    for label, bool_index in zip(labels, [is_spot, is_microswift, is_dwsd]):
        if bool_index.sum() > 0:
            drifter_subset_df = (drifter_df
                                .loc[bool_index]
                                .sort_index())

            drifter_ds = drifter_subset_df.buoy.to_xarray()  #TODO: reorder time to first

            drifter_ids = drifter_subset_df.index.get_level_values(level='id').unique()

            path_coords = ('time', 'longitude', 'latitude')
            path_vars =  ['energy_density', 'a1', 'b1', 'a2', 'b2',
                          'significant_height', 'mean_square_slope']

            # For each drifter of this type, colocate with the WSRA Dataset.
            # This is done individually so that we can collect drifters under
            # an `id` coordinate.
            for drifter_id in drifter_ids:
                wsra_colocated_ds = wsra_ds.wsra.colocate_with_path_ds(
                    path_ds=drifter_ds.sel(id=drifter_id, drop=True),
                    path_coords=path_coords,
                    path_vars=path_vars,
                    temporal_tolerance=np.timedelta64(90, 'm'),
                    spatial_tolerance=100,  # km,
                    prefix=label,
                )
                wsra_colocated_ds = add_colocated_ds_id_dim(
                    colocated_ds=wsra_colocated_ds,
                    path_coords=path_coords,
                    path_vars=path_vars,
                    drifter_label=label,
                    drifter_id=drifter_id,
                )
                # Xarray needs timedelta units to assigned.
                time_difference_name = label + '_' + 'time_difference'
                wsra_colocated_ds[time_difference_name] \
                    = wsra_colocated_ds[time_difference_name].astype('timedelta64[s]')

                ds_list.append(wsra_colocated_ds)

    wsra_colocated_ds = xr.merge(ds_list)
    return wsra_colocated_ds

In [24]:
earl_merged_ds = colocate_wsra_and_drifters(earl_masked_ds, earl_drifter_df)
fiona_merged_ds = colocate_wsra_and_drifters(fiona_masked_ds, fiona_drifter_df)
ian_merged_ds = colocate_wsra_and_drifters(ian_masked_ds, ian_drifter_df)
# julia_masked_ds = colocate_wsra_and_drifters(julia_masked_ds, julia_drifter_ds)
idalia_merged_ds = colocate_wsra_and_drifters(idalia_masked_ds, idalia_drifter_df,)
lee_merged_ds = colocate_wsra_and_drifters(lee_masked_ds, lee_drifter_df)

atomic_merged_ds = atomic_masked_ds.wsra.colocate_with_path_ds(
    path_ds = atomic_swift_ds.sel(swift_id='SWIFT16'),  #TODO: should be all
    path_coords = ('time', 'lon', 'lat'),
    path_vars =  ['energy_density', 'sea_surface_wave_significant_height', 'mean_square_slope', 'wind_speed'],
    temporal_tolerance = np.timedelta64(30, 'm'),
    spatial_tolerance = 25,  # km,
    prefix='swift',
)

  size_df = self._obj.applymap(np.size, na_action='ignore')
  size_df = self._obj.applymap(np.size, na_action='ignore')
  size_df = self._obj.applymap(np.size, na_action='ignore')
  path_subset_ds['time_difference'] = path_subset_ds['time_difference'].astype('timedelta64[s]')
  = wsra_colocated_ds[time_difference_name].astype('timedelta64[s]')
  path_subset_ds['time_difference'] = path_subset_ds['time_difference'].astype('timedelta64[s]')
  = wsra_colocated_ds[time_difference_name].astype('timedelta64[s]')
  size_df = self._obj.applymap(np.size, na_action='ignore')
  size_df = self._obj.applymap(np.size, na_action='ignore')
  size_df = self._obj.applymap(np.size, na_action='ignore')
  path_subset_ds['time_difference'] = path_subset_ds['time_difference'].astype('timedelta64[s]')
  = wsra_colocated_ds[time_difference_name].astype('timedelta64[s]')
  path_subset_ds['time_difference'] = path_subset_ds['time_difference'].astype('timedelta64[s]')
  = wsra_colocated_ds[time_difference_name].a

### Store

In [None]:
%%capture

%store earl_merged_ds
%store fiona_merged_ds
%store ian_merged_ds
# %store julia_merged_ds
%store idalia_merged_ds
%store lee_merged_ds
%store atomic_merged_ds

%store all_wsra_df
%store atomic_df