In [2]:
# Time packages
import cftime, datetime, time
# Numerical analysis packages
import numpy as np, random, scipy, numba
# Local data storage packages
import functools, os, pickle, collections, sys, importlib
# Data structure packages
import pandas as pd, xarray as xr, nc_time_axis
xr.set_options(keep_attrs=True)
# Visualization tools
import cartopy, cartopy.crs as ccrs, matplotlib, matplotlib.pyplot as plt
# Local imports
import accessor, composite, composite_snapshots, derived, ibtracs, utilities, socket, visualization, tc_analysis, tc_processing, track_TCs, TC_tracker

from multiprocessing import Pool

importlib.reload(TC_tracker);
importlib.reload(track_TCs);
importlib.reload(ibtracs);

#### Notes
- IBTrACS observed storm intensities are generally stronger than the corresponding intensities in ERA5 reanalysis output.

#### Load and format reanalysis data

In [3]:
def access_IBTrACS_tracks(basin_name: str,
                          date_range: tuple[str, str],
                          intensity_parameter: str,
                          intensity_range: tuple[int, int]) -> pd.DataFrame:
    '''
    Load IBTrACS track entries for one basin and intensity window, ordered chronologically.

    Arguments:
    - basin_name: basin identifier forwarded to `ibtracs.main`.
    - date_range: (start, end) date strings.
      NOTE(review): this argument is not forwarded to `ibtracs.main`, so no date filtering
      happens here - confirm whether it should be passed through or applied to the result.
    - intensity_parameter: name of the intensity field used for filtering (e.g. 'min_slp').
    - intensity_range: (lower, upper) bounds applied to `intensity_parameter`.

    Returns:
    - pd.DataFrame of track entries sorted by the 'time' column.
    '''

    filtering_criteria = dict(basin_name=basin_name,
                              intensity_parameter=intensity_parameter,
                              intensity_range=intensity_range)
    unsorted_tracks = ibtracs.main(**filtering_criteria)

    return unsorted_tracks.sort_values(by='time')

In [4]:
def load_reanalysis_data(reanalysis_dirname: str = '/scratch/gpfs/GEOCLIM/gr7610/tiger3/reference/datasets/ERA5'):

    '''
    Lazily open every netCDF file in a reanalysis directory as one dataset with GFDL-style names.

    Assumes that all files in `reanalysis_dirname` have congruent coordinates, like with ERA5.

    Arguments:
    - reanalysis_dirname: directory holding the reanalysis netCDF files. Defaults to the ERA5
      directory that was previously hard-coded here, so existing calls behave the same.

    Returns:
    - xr.Dataset opened with `xr.open_mfdataset`, with surface pressure ('sp') dropped and ERA5
      coordinate/variable names renamed to the GFDL QuickTracks conventions.

    Raises:
    - FileNotFoundError if the directory contains no '.nc' files.
    '''

    # Require the full '.nc' suffix (a bare 'nc' check also matches names such as '*.enc'),
    # and sort so the file list does not depend on directory listing order.
    reanalysis_pathnames = sorted(os.path.join(reanalysis_dirname, filename)
                                  for filename in os.listdir(reanalysis_dirname)
                                  if filename.endswith('.nc'))
    if not reanalysis_pathnames:
        raise FileNotFoundError(f'No netCDF (.nc) files found in {reanalysis_dirname}.')

    reanalysis_data = xr.open_mfdataset(reanalysis_pathnames).drop_vars('sp')

    # Rename coordinates and data variables to adjust to GFDL QuickTracks outputs
    # This naming convention follows ERA5 outputs
    reanalysis_data = reanalysis_data.rename({'valid_time': 'time',
                                              'u10': 'u_ref',
                                              'v10': 'v_ref',
                                              't2m': 't_ref',
                                              'sst': 't_surf',
                                              'msl': 'slp',
                                              'tp': 'precip',
                                              'tcwv': 'WVP'})

    return reanalysis_data

In [5]:
def field_correction(reanalysis_dataset: xr.Dataset) -> xr.Dataset:

    '''
    Modify data to adjust for fields that do not readily provide desired units.
    This is primarily performed to accommodate fields from 'accumulated' datasets in ERA5.

    The accumulated fields represent parameters integrated hourly (J m^-2). Dividing by 3600 s
    converts them to W m^-2, and each field is renamed to its GFDL-style name.

    The input dataset is not modified; a new dataset is returned. This makes repeated calls on the
    same object safe, because the fields are never scaled twice.

    Arguments:
    - reanalysis_dataset: dataset containing every key of `accumulated_fields`.

    Returns:
    - new dataset with the accumulated fields scaled and renamed; other variables untouched.

    Raises:
    - KeyError if any accumulated field is missing from `reanalysis_dataset`.
    '''

    # Define fields termed 'accumulated' from ERA5 - this represents parameters integrated hourly
    accumulated_fields = {'slhf': 'lhflx',
                          'sshf': 'shflx',
                          'ssr': 'swnet_sfc',
                          'ssrd': 'swdn_sfc',
                          'str': 'lwnet_sfc',
                          'strd': 'lwdn_sfc',
                          'tisr': 'swdn_toa',
                          'tsr': 'swnet_toa',
                          'ttr': 'olr'}
    accumulated_factor = 1 / 3600 # converts from J m^-2 to W m^-2

    # Compute the scaled fields from the input and attach them to a *new* dataset via `assign`,
    # then rename all of them at once.
    scaled_fields = {field_name: reanalysis_dataset[field_name] * accumulated_factor
                     for field_name in accumulated_fields}

    return reanalysis_dataset.assign(scaled_fields).rename(accumulated_fields)

In [6]:
def derived_fields(reanalysis_dataset: xr.Dataset) -> xr.Dataset:

    '''
    Derive certain fields for reanalysis data to match GCM output conventions.

    Works on a shallow copy, so the caller's dataset is left unchanged. Repeated calls on the same
    object therefore do not re-apply the unit and sign corrections.

    Modifications:
    - 'slp' is divided by 100 (Pa -> hPa).
    - 'olr', 'lhflx', and 'shflx' have their sign flipped.
    - 'lwup_sfc'   = 'lwnet_sfc' - 'lwdn_sfc'
    - 'swup_sfc'   = 'swnet_sfc' - 'swdn_sfc'
    - 'swup_toa'   = 'swnet_toa' - 'swdn_toa'
    - 'netrad_toa' = 'swnet_toa' - 'olr'  (using the sign-corrected 'olr')

    NOTE(review): the three 'up' fields are computed as net minus downward flux. If the net fields are
    downward-positive (net = down - up), these are the negative of the upward flux. Confirm this
    matches the sign convention expected for GFDL output.

    Raises:
    - KeyError if 'slp', 'olr', 'lhflx', or 'shflx' is missing.
    - AssertionError if an input required for a derived flux is missing.
    '''

    # Shallow copy: variables assigned below do not propagate back to the caller's dataset
    reanalysis_dataset = reanalysis_dataset.copy()

    # Correct pressure units from Pa to hPa
    reanalysis_dataset['slp'] = reanalysis_dataset['slp'] / 100

    # Correct sign conventions
    sign_correction_field_names = ['olr', 'lhflx', 'shflx']
    for sign_correction_field_name in sign_correction_field_names:
        reanalysis_dataset[sign_correction_field_name] = reanalysis_dataset[sign_correction_field_name] * -1

    # Upwards longwave radiative flux at surface
    assert 'lwnet_sfc' in reanalysis_dataset.data_vars and 'lwdn_sfc' in reanalysis_dataset.data_vars, 'Fields lwnet_sfc and lwdn_sfc must be in dataset to compute lwup_sfc.'
    reanalysis_dataset['lwup_sfc'] = reanalysis_dataset['lwnet_sfc'] - reanalysis_dataset['lwdn_sfc']

    # Upwards shortwave radiative flux at surface
    assert 'swnet_sfc' in reanalysis_dataset.data_vars and 'swdn_sfc' in reanalysis_dataset.data_vars, 'Fields swnet_sfc and swdn_sfc must be in dataset to compute swup_sfc.'
    reanalysis_dataset['swup_sfc'] = reanalysis_dataset['swnet_sfc'] - reanalysis_dataset['swdn_sfc']

    # Upwards shortwave radiative flux at TOA - uses the TOA downward flux 'swdn_toa'
    # (the surface field 'swdn_sfc' was previously checked and subtracted here by mistake)
    assert 'swnet_toa' in reanalysis_dataset.data_vars and 'swdn_toa' in reanalysis_dataset.data_vars, 'Fields swnet_toa and swdn_toa must be in dataset to compute swup_toa.'
    reanalysis_dataset['swup_toa'] = reanalysis_dataset['swnet_toa'] - reanalysis_dataset['swdn_toa']

    # Net radiation at TOA
    assert 'swnet_toa' in reanalysis_dataset.data_vars and 'olr' in reanalysis_dataset.data_vars, 'Fields swnet_toa and olr must be in dataset to compute netrad_toa.'
    reanalysis_dataset['netrad_toa'] = reanalysis_dataset['swnet_toa'] - reanalysis_dataset['olr']

    return reanalysis_dataset

In [7]:
def get_reanalysis_timestamps(TC_track_dataset: pd.DataFrame,
                              reanalysis_data: xr.Dataset) -> list:

    '''
    Return the storm-track timestamps that also appear on the reanalysis time axis.

    Arguments:
    - TC_track_dataset: track data with a 'time' column.
    - reanalysis_data: dataset with a 'time' coordinate.

    Returns:
    - list of track timestamps found in `reanalysis_data.time`, in track order.
      Empty if either time axis is empty.

    Raises:
    - AssertionError if the two time axes hold different timestamp types.
    '''

    storm_timestamps = TC_track_dataset.time.values
    reanalysis_timestamps = reanalysis_data.time.values

    # Nothing can match when either axis is empty (a random pick from an empty axis used to raise IndexError)
    if len(storm_timestamps) == 0 or len(reanalysis_timestamps) == 0:
        return []

    # Ensure timestamps are identically-typed. All timestamps within a dataset are assumed to share
    # one type, so comparing the first entries is enough. Unlike a random pick, this is deterministic
    # and does not consume the global `random` state.
    check_timestamp_formats = type(storm_timestamps[0]) == type(reanalysis_timestamps[0])
    assert check_timestamp_formats, 'Timestamp types between IBTrACS and reanalysis require alignment.'

    # Vectorized equivalent of `storm_timestamp in reanalysis_timestamps` for every track timestamp
    in_reanalysis = np.isin(storm_timestamps, reanalysis_timestamps)

    return list(storm_timestamps[in_reanalysis])

In [8]:
def get_storm_coordinates(TC_track_dataset: pd.DataFrame,
                          reanalysis_data: xr.Dataset,
                          reanalysis_storm_timestamps: list,
                          reanalysis_resolution: float) -> dict:

    '''
    Map each requested timestamp to the storm center, snapped to the reanalysis grid spacing.

    Arguments:
    - TC_track_dataset: track data with 'time', 'center_lon', and 'center_lat' columns.
    - reanalysis_data: not used by this function; kept for interface compatibility.
    - reanalysis_storm_timestamps: timestamps to look up; each must match exactly one track row.
    - reanalysis_resolution: grid spacing used to round the center coordinates.

    Returns:
    - dict of {timestamp: {'lon': rounded longitude, 'lat': rounded latitude}}.

    Raises:
    - ValueError (from `.item()`) if a timestamp matches zero or several track rows.

    NOTE(review): longitudes are rounded as given. Confirm the track and reanalysis longitude
    conventions (e.g. -180..180 vs 0..360) agree.
    '''

    def snap_to_grid(value):
        # Nearest multiple of the grid spacing (Python's `round` breaks .5 ties to even)
        return reanalysis_resolution * round(value / reanalysis_resolution)

    storm_track_coordinates = {}
    for timestamp in reanalysis_storm_timestamps:
        is_current_time = TC_track_dataset['time'] == timestamp
        center_lon = TC_track_dataset['center_lon'].loc[is_current_time].item()
        center_lat = TC_track_dataset['center_lat'].loc[is_current_time].item()
        storm_track_coordinates[timestamp] = {'lon': snap_to_grid(center_lon),
                                              'lat': snap_to_grid(center_lat)}

    return storm_track_coordinates

In [9]:
def reanalysis_grid_redefinition(storm_track_coordinates: dict,
                                 reanalysis_resolution: float,
                                 coarsen_factor: int,
                                 storm_reanalysis_window_size: int|float):

    '''
    Build one shared longitude/latitude grid covering the whole storm track plus a margin, so every
    timestamp can be interpolated onto the same grid.

    Arguments:
    - storm_track_coordinates: {timestamp: {'lon': ..., 'lat': ...}} storm centers.
    - reanalysis_resolution: native grid spacing.
    - coarsen_factor: multiplier applied to the native spacing to get the output spacing.
    - storm_reanalysis_window_size: margin added on each side of the track extent.

    Returns:
    - (zonal_basis_vector, meridional_basis_vector): 1-D arrays running from (min - window) up to,
      but excluding, (max + window), spaced `reanalysis_resolution * coarsen_factor` apart.
    '''

    grid_spacing = reanalysis_resolution * coarsen_factor

    track_longitudes = [center['lon'] for center in storm_track_coordinates.values()]
    track_latitudes = [center['lat'] for center in storm_track_coordinates.values()]

    def padded_axis(track_values):
        # Pad the track extent by the window size on both sides (np.arange excludes the stop value)
        return np.arange(np.min(track_values) - storm_reanalysis_window_size,
                         np.max(track_values) + storm_reanalysis_window_size,
                         grid_spacing)

    return padded_axis(track_longitudes), padded_axis(track_latitudes)

In [10]:
def load_reanalysis_storm_timestamp(storm_track_coordinates: dict,
                                    storm_reanalysis_window_size: int | float,
                                    reanalysis_resolution: float,
                                    reanalysis_data: xr.Dataset,
                                    zonal_basis_vector: np.array,
                                    meridional_basis_vector: np.array,
                                    storm_timestamp):

    '''
    Link track data and reanalysis data for a single timestamp.
    This is compartmentalized to allow for straightforward parallelization.

    Selects the reanalysis snapshot at `storm_timestamp`, trims it to a window of native-resolution
    grid labels around the storm center, and interpolates it onto the shared basis vectors.

    Window labels run from (center - window_size) up to, but excluding, (center + window_size) in
    steps of `reanalysis_resolution`. Selection is by exact label, so every label must exist in
    `reanalysis_data`; a window extending past the grid edge raises a KeyError from `.sel`.

    Returns:
    - the trimmed, interpolated dataset for this timestamp. Basis-vector points outside the window
      are presumably NaN after interpolation; TODO confirm.
    '''

    storm_center = storm_track_coordinates[storm_timestamp]

    def window_labels(center):
        return np.arange(center - storm_reanalysis_window_size,
                         center + storm_reanalysis_window_size,
                         reanalysis_resolution)

    longitude_window = window_labels(storm_center['lon'])
    latitude_window = window_labels(storm_center['lat'])

    # Snapshot at this timestamp, trimmed first zonally then meridionally
    storm_snapshot = (reanalysis_data
                      .sel(time=storm_timestamp)
                      .sel({'longitude': longitude_window})
                      .sel({'latitude': latitude_window}))

    # Regrid onto the shared (coarser) grid used for every timestamp of this storm
    return (storm_snapshot
            .interp(longitude=zonal_basis_vector)
            .interp(latitude=meridional_basis_vector))

In [11]:
def load_reanalysis_storm(storm_track_coordinates: dict,
                          storm_timestamps: list,
                          reanalysis_data: xr.Dataset,
                          reanalysis_resolution: float,
                          target_resolution: float,
                          storm_reanalysis_window_size: int|float,
                          parallel: bool,
                          diagnostic: bool=False):

    '''
    Load reanalysis data for a given storm given its track coordinates, track timestamps, and reanalysis data.

    Arguments:
    - storm_track_coordinates: {timestamp: {'lon': ..., 'lat': ...}} grid-snapped storm centers.
    - storm_timestamps: timestamps to load; each must be a key of `storm_track_coordinates`.
    - reanalysis_data: reanalysis dataset with 'time', 'longitude', and 'latitude' coordinates.
    - reanalysis_resolution: native grid spacing.
    - target_resolution: desired output grid spacing, rounded to a whole multiple (at least 1)
      of `reanalysis_resolution`.
    - storm_reanalysis_window_size: half-width of the window around each storm center.
    - parallel: if True, load timestamps in a `multiprocessing.Pool`; otherwise load serially.
    - diagnostic: if True, print the total and per-timestamp loading time.

    Returns:
    - xr.Dataset with one entry per timestamp, concatenated along 'time' and sorted by 'time'.
    '''

    # Factor by which reanalysis data will be coarsened to match the target resolution.
    # Clamped to at least 1: a target finer than about half the native spacing would otherwise
    # round to 0 and give a zero grid step, which `np.arange` cannot use.
    coarsen_factor = max(1, int(np.round(target_resolution / reanalysis_resolution)))
    zonal_basis_vector, meridional_basis_vector = reanalysis_grid_redefinition(storm_track_coordinates,
                                                                               reanalysis_resolution,
                                                                               coarsen_factor,
                                                                               storm_reanalysis_window_size)

    # Only the timestamp varies between calls, so bind every other argument once.
    # In the parallel branch this partial (including `reanalysis_data`) is pickled to the workers.
    partial_load_timestamp_reanalysis_entry = functools.partial(load_reanalysis_storm_timestamp,
                                                                 storm_track_coordinates,
                                                                 storm_reanalysis_window_size,
                                                                 reanalysis_resolution,
                                                                 reanalysis_data,
                                                                 zonal_basis_vector,
                                                                 meridional_basis_vector)
    # Keep time for profiling
    start_time = time.time()
    if parallel:
        # One task per timestamp. Leaving the `with` block terminates the pool, which is safe here
        # because `pool.map` has already returned every result.
        with Pool() as pool:
            storm_reanalysis_entries = pool.map(partial_load_timestamp_reanalysis_entry, storm_timestamps)
    else:
        storm_reanalysis_entries = [partial_load_timestamp_reanalysis_entry(storm_timestamp)
                                    for storm_timestamp in storm_timestamps]

    # Both branches yield one Dataset per timestamp in input order; combine and order chronologically
    storm_reanalysis_data = xr.concat(storm_reanalysis_entries, dim='time').sortby('time')

    if diagnostic:
        # Measure once so the total and per-timestamp figures are consistent
        elapsed_time = time.time() - start_time
        print(f'Elapsed time to load reanalysis storm: {elapsed_time:.2f} s.')
        print(f'\t per timestamp: {(elapsed_time / len(storm_timestamps)):.2f} s.')

    return storm_reanalysis_data

In [12]:
def reanalysis_GFDL_compatibility_adjustments(storm_reanalysis_dataset: xr.Dataset) -> xr.Dataset:

    '''
    Adapt reanalysis data so it uses the same conventions as GFDL output data.

    - Renames the 'longitude'/'latitude' dimensions to 'grid_xt'/'grid_yt'.
    - Replaces the time axis with `cftime.datetime` objects in the 'julian' calendar, keeping the
      year, month, day, and hour of each timestamp (minutes and seconds are dropped).

    Returns a deep copy; the input dataset is not modified.

    NOTE(review): the year/month/day/hour fields are reused unchanged under the 'julian' calendar.
    This relabels the dates rather than converting them; confirm it matches the intended GFDL calendar.

    Raises:
    - AssertionError if the resulting time values are not cftime objects.
    '''

    renamed_dataset = storm_reanalysis_dataset.rename({'longitude': 'grid_xt', 'latitude': 'grid_yt'})
    # Deep copy so modifying the time coordinate cannot touch the caller's data
    storm_reanalysis_dataset_reformatted = renamed_dataset.copy(deep=True)

    def to_julian_cftime(pandas_timestamp):
        return cftime.datetime(year=pandas_timestamp.year,
                               month=pandas_timestamp.month,
                               day=pandas_timestamp.day,
                               hour=pandas_timestamp.hour,
                               calendar='julian')

    # Convert through pandas for convenient year/month/day/hour access
    storm_reanalysis_dataset_reformatted['time'] = [to_julian_cftime(pandas_timestamp)
                                                    for pandas_timestamp in pd.to_datetime(renamed_dataset.time)]
    assert 'cftime' in str(type(storm_reanalysis_dataset_reformatted['time'].values[0])), f'[reanalysis_GFDL_compatibility_adjustments()] Timestamp is not a cftime object.' 

    return storm_reanalysis_dataset_reformatted

In [13]:
def reanalysis_storm_generator(reanalysis_track_dataset: pd.DataFrame,
                               reanalysis_dataset: xr.Dataset,
                               reanalysis_resolution: float,
                               target_resolution: float,
                               storm_reanalysis_window_size: int,
                               parallel: bool=True,
                               storm_ID: str|None=None):

    '''
    Build and save the reanalysis dataset for one TC by binding its track data to reanalysis fields.

    Pipeline (numbered steps continue from `main`):
    4. select the storm's track entries, ordered by time;
    5. keep the track timestamps that also exist in the reanalysis;
    6. snap the storm centers at those timestamps to the reanalysis grid;
    7. extract and regrid reanalysis windows around each center;
    8. attach the track information to the reanalysis output;
    9. derive fields present in GCM output but not provided directly by ERA5;
    10. convert to GFDL output conventions;
    11. save via `TC_tracker.save_storm_netcdf` (model 'ERA5', experiment 'reanalysis').

    Returns None; the result is persisted by `TC_tracker.save_storm_netcdf`.
    '''

    print(f'[storm_generator] Processing storm ID {storm_ID}...')

    # Steps 4-6: the storm's track, the timestamps shared with the reanalysis, and grid-snapped centers
    storm_track_dataset = (TC_tracker
                           .pick_storm(reanalysis_track_dataset, selection_method='storm_number', storm_ID=storm_ID)
                           .sort_values('time'))
    matched_timestamps = get_reanalysis_timestamps(storm_track_dataset, reanalysis_dataset)
    snapped_centers = get_storm_coordinates(storm_track_dataset, reanalysis_dataset,
                                            matched_timestamps, reanalysis_resolution)

    # Step 7: regridded reanalysis windows around the storm, one per matched timestamp
    storm_fields = load_reanalysis_storm(snapped_centers,
                                         matched_timestamps,
                                         reanalysis_dataset,
                                         reanalysis_resolution,
                                         target_resolution,
                                         storm_reanalysis_window_size,
                                         parallel=parallel)

    # Steps 8-10: attach track data, derive GCM-equivalent fields, adopt GFDL conventions
    storm_fields = TC_tracker.join_track_GCM_data(storm_track_data=storm_track_dataset,
                                                  storm_gcm_data=storm_fields,
                                                  storm_time_variable='time')
    storm_fields = reanalysis_GFDL_compatibility_adjustments(derived_fields(storm_fields))

    # Step 11: persist
    TC_tracker.save_storm_netcdf(storm_fields, model_name='ERA5', experiment_name='reanalysis')

In [14]:
def main(date_range: tuple[str, str],
         basin_name: str='global',
         intensity_parameter: str='min_slp',
         intensity_range: tuple[int | float, int | float]=(980, 1000),
         number_of_storms: int=1,
         reanalysis_resolution: float=0.25,
         target_resolution: float=0.5,
         storm_reanalysis_window_size: int|float=12,
         parallel: bool=True):

    # 1. Pull track data for a given date range, basin, and intensity range
    reanalysis_track_dataset = access_IBTrACS_tracks(date_range, basin_name, intensity_parameter, intensity_range)
    # 2. Access reanalysis dataset. Ensure this is done lazily to avoid excessive memory usage.
    reanalysis_dataset = load_reanalysis_data()
    # 2a. Perform dataset field correction
    reanalysis_dataset = field_correction(reanalysis_dataset)
    # 3. Obtain N randomized storm IDs from the filtered track data, where 'N' is `number_of_storms`
    storm_IDs = TC_tracker.pick_storm_IDs(reanalysis_track_dataset, number_of_storms)
    
    # Define partial function to allow for using Pool.map since `track_data` is equivalent for all subprocesses
    preloaded_reanalysis_storm_generator = functools.partial(reanalysis_storm_generator, 
                                                             reanalysis_track_dataset, 
                                                             reanalysis_dataset,
                                                             reanalysis_resolution,
                                                             target_resolution,
                                                             storm_reanalysis_window_size,
                                                             parallel)
    
    for storm_ID in storm_IDs:
        preloaded_reanalysis_storm_generator(storm_ID)

In [None]:
date_range = ('2010-01-01', '2011-01-01')
basin_name = 'global'
intensity_parameter = 'min_slp'
intensity_range = (0, 970)

number_of_storms = -1

main(basin_name, date_range, intensity_parameter, intensity_range, number_of_storms=number_of_storms)

%reset

[storm_generator] Processing storm ID 2010-040S11185...
[save_storm_netcdf] Loading data for TC.model-ERA5.experiment-reanalysis.storm_ID-2010-040S11185.max_wind-44.min_slp-945.basin-SP.nc
Elapsed loading time for TC.model-ERA5.experiment-reanalysis.storm_ID-2010-040S11185.max_wind-44.min_slp-945.basin-SP.nc: 17.96 s
File size for TC.model-ERA5.experiment-reanalysis.storm_ID-2010-040S11185.max_wind-44.min_slp-945.basin-SP.nc: 49.33 MB

[storm_generator] Processing storm ID 2010-240N15142...
[save_storm_netcdf] Loading data for TC.model-ERA5.experiment-reanalysis.storm_ID-2010-240N15142.max_wind-41.min_slp-960.basin-WP.nc
Elapsed loading time for TC.model-ERA5.experiment-reanalysis.storm_ID-2010-240N15142.max_wind-41.min_slp-960.basin-WP.nc: 9.95 s
File size for TC.model-ERA5.experiment-reanalysis.storm_ID-2010-240N15142.max_wind-41.min_slp-960.basin-WP.nc: 13.98 MB

[storm_generator] Processing storm ID 2010-151N14065...
