---
title: Constructing an AR Dataset

author: Jimmy Butler
subtitle: A notebook to demonstrate workflows to ascribe characteristics and impacts to landfalling ARs in a catalog of AR storm events.
---

## Setup

First, we'll load up any packages and modules we might need.

In [1]:
# load external packages

import os
import pandas as pd
import xarray as xr
from pathlib import Path
import earthaccess
import ray
from tqdm import tqdm

In [2]:
# load internal helper modules

from utils.loading_utils import load_ais, load_cell_areas, EarthdataGatekeeper
from utils.attribute_utils import *
from utils.compute_attributes_streaming import *

Next, let's load up the catalog. We'll be using a subset of the full version for now (the first 250 landfalling storms, out of ~3000 total).

In [3]:
# load the dataset

total_storms = pd.read_hdf('catalog/subset_storms.h5')
#landfalling_storms = total_storms[total_storms.is_landfalling]

ImportError: Missing optional dependency 'pytables'.  Use pip or conda to install pytables.

## Catalog Overview

Before we get started with the masking workflow, we can first take a quick look at the catalog. The catalog is a clusterig of AR pixels identified in the [Wille (2021) catalog](https://doi.org/10.5281/zenodo.4009663), which takes an Eulerian approach to identifying atmoshperic rivers.

In [4]:
num_cpus = 8
miniters = num_cpus

We're going to be processing the data sequentially in chunks, with each chunk being processed in parallel. This is to reduce any memory pressure that we might incur by processing the whole list of storms in the same parallel instance/ray cluster. When I attempted to loop through the whole list of storms in parallel, I noticed that some iterations of the loop were taking extremely long to process (much longer than if I had executed the loop on a smaller chunk of storms), and then the server outright crashed. I suspect this is due to memory pressure, and so we start up a ray instance to process each chunk of the dataset individually, clearing the ray instance for the next chunk.

In [5]:
indices_list = np.arange(landfalling_storms.shape[0])
chunk_size = 500
chunk_inds_lst = [indices_list[i:i + chunk_size] for i in range(0, len(indices_list), chunk_size)]

In [6]:
data_dois = {'climatology': '10.5067/5ESKGQTZG7FO',
             'T2M_TQV_SLP': '10.5067/3Z173KIE2TPD',
             'VFLXQV_PRECIP': '10.5067/Q5GVUVUIVGO7',
             'V850': '10.5067/VJAFPLI1CSIV',
             'OMEGA': '10.5067/QBZ6MG944HW0'}

Now, let's load up the auxiliary datasets that will help us compute our relevant AR characteristics and impacts.

In [7]:
ais_mask = loading_utils.load_ais()
ais_mask = ais_mask.assign_coords(lat=ais_mask.lat.round(5), 
                                  lon=ais_mask.lon.round(5))

cell_areas = loading_utils.load_cell_areas()
cell_areas = cell_areas.assign_coords(lat=cell_areas.lat.round(5), 
                                      lon=cell_areas.lon.round(5)) # this is to avoid -0 not matching 0

Now, let's compute our climatologies, which we will later use to compute temperature and IWV anomalies. We'll just compute the climatology dataset and store it memory, partially because it's not that huge, and also because if we leave it to be executed until it's time to compute anomalies for the individual ARs, we may be repeating our anomaly computations multiple times. This will help us save on time.

:::{attention}
Either we can run the next cell to create the climatology dataset, which should take about 15 minutes to do. Or, you could run this just once, save it, and just load it up from disk for future runs.
:::

In [8]:
climatology_ds = xr.open_dataset('climatology.nc4')

Now, we write a function to setup our Ray instance to do our parallel processing.

In [9]:
def setup_ray(func_vars_dict):
    '''
    Function that serves to stop the previous chunk's ray cluster, clear its memory,
        and start a new one for a new chunk. Also initializes the gatekeeper for
        making open requests to Earthdata, and stores bigger objects into Ray's
        object store for more efficient sharing of these objects across clusters.

    Inputs:
        func_vars_dict (dict of dict): outer layer of dictionary keys are names of variables in
            MERRA-2 datasets, inner layer are names of desired aggregates of those variables as keys,
            along with their aggregation functions as values.
    '''
    
    # shutdown and restart the ray cluster, to clear memory
    if ray.is_initialized():
        ray.shutdown()
    
    ray.init(num_cpus=num_cpus, logging_level='ERROR', 
         _metrics_export_port=None, include_dashboard=False, 
         log_to_driver=False, runtime_env={'py_modules': [attribute_utils, loading_utils, st_dbscan]})

    climatology_ref = ray.put(climatology_ds)
    cell_areas_ref = ray.put(cell_areas)
    func_vars_ref = ray.put(func_vars_dict)

    return climatology_ref, cell_areas_ref, func_vars_ref
    

## Masking Quantities from `10.5067/3Z173KIE2TPD`

This includes IWV, IWV anomaly, 2m-temperature, 2m-temperature anomaly, and sea level pressure quantities.

We arrange our `func_vars_dict` variable in the following way so that, if there are multiple quantities and aggregations we wish to compute on the same variable DataArray, we compute all of them on the same streamed object instead of throwing it away and restreaming for each additional quantity.

In [10]:
func_vars_dict1 = {'T2M': {'max_T2m_ais': lambda storm_da, var_da, area_da: compute_max_intensity(storm_da, var_da, area_da, ais_mask),
                          'max_T2M_anomaly_ais': lambda storm_da, var_da, area_da: compute_max_intensity(storm_da, var_da, area_da, ais_mask)},
                  'TQV': {'avg_IWV_ais': lambda storm_da, var_da, area_da: compute_average(storm_da, var_da, area_da, ais_mask),
                          'max_IWV_ais': lambda storm_da, var_da, area_da: compute_max_intensity(storm_da, var_da, area_da, ais_mask),
                          'max_IWV_anomaly_ais': lambda storm_da, var_da, area_da: compute_max_intensity(storm_da, var_da, area_da, ais_mask)},
                  'SLP': {'max_ocean_SLP_gradient': lambda storm_da, var_da, area_da: compute_max_SLPgrad(storm_da, var_da, area_da, ais_mask)}
                 }

data_doi = data_dois['T2M_TQV_SLP']
progress_log_path = 'T2M_TQV_SLP_logs'

missing_days1 = []
parallel_results1 = []

for i, chunk_inds in enumerate(chunk_inds_lst):

    storms_chunk = landfalling_storms.iloc[chunk_inds]
    climatology_ref, cell_areas_ref, func_vars_ref = setup_ray(func_vars_dict1)
    gatekeeper = EarthdataGatekeeper.remote()

    result_refs = []
    for index, row in storms_chunk.iterrows():
        result_refs.append(compute_summaries.remote(
            row.data_array,
            func_vars_ref,
            cell_areas_ref,
            data_doi,
            gatekeeper=gatekeeper,
            climatology_ds=climatology_ref))


    with open(f'{progress_log_path}/streaming_progress{i+1}.log', "w") as log_file:
        parallel_results = []
        missing_days = []
        for ref in tqdm(result_refs, total=len(result_refs), file=log_file, miniters=miniters):
            results = ray.get(ref)
            parallel_results.append(results[0])
            missing_days.append(results[1])

    parallel_results1 = parallel_results1 + parallel_results
    missing_days1 = missing_days1 + missing_days

    ray.shutdown()

[2025-12-05 05:11:27,397 E 97 441] core_worker_process.cc:825: Failed to establish connection to the metrics exporter agent. Metrics will not be exported. Exporter agent status: RpcError: Running out of retries to initialize the metrics agent. rpc_code: 14
[2025-12-05 05:20:09,315 E 97 5430] core_worker_process.cc:825: Failed to establish connection to the metrics exporter agent. Metrics will not be exported. Exporter agent status: RpcError: Running out of retries to initialize the metrics agent. rpc_code: 14
[2025-12-05 05:28:46,962 E 97 10900] core_worker_process.cc:825: Failed to establish connection to the metrics exporter agent. Metrics will not be exported. Exporter agent status: RpcError: Running out of retries to initialize the metrics agent. rpc_code: 14
[2025-12-05 05:37:17,762 E 97 15830] core_worker_process.cc:825: Failed to establish connection to the metrics exporter agent. Metrics will not be exported. Exporter agent status: RpcError: Running out of retries to initialize

In [11]:
labels1 = [key for quantity_dict in func_vars_dict1.values() for key in quantity_dict.keys()]

In [12]:
df1 = pd.DataFrame(parallel_results1, columns=labels1)
df1['T2M_TQV_SLP_missing'] = missing_days1
df1.to_csv('streamed_datasets/streamed_T2M_TQV_SLP.csv', index=False)

## Masking Quantities from `10.5067/Q5GVUVUIVGO7`

This includes IVT and precipitation quantities (rainfall, snowfall). Starting with IVT.

In [15]:
func_vars_dict2 = {'VFLXQV': {'avg_vIVT_ais': lambda storm_da, var_da, area_da: compute_average(storm_da, -var_da, area_da, ais_mask),
                             'max_vIVT_ais': lambda storm_da, var_da, area_da: compute_max_intensity(storm_da, -var_da, area_da, ais_mask),
                             'avg_vIVT': lambda storm_da, var_da, area_da: compute_average(storm_da, -var_da, area_da),
                             'max_vIVT': lambda storm_da, var_da, area_da: compute_max_intensity(storm_da, -var_da, area_da)}}
data_doi = data_dois['VFLXQV_PRECIP']
progress_log_path = 'VFLXQV_logs'

In [16]:
missing_days2 = []
parallel_results2 = []

for i, chunk_inds in enumerate(chunk_inds_lst):

    storms_chunk = landfalling_storms.iloc[chunk_inds]
    climatology_ref, cell_areas_ref, func_vars_ref = setup_ray(func_vars_dict2)
    gatekeeper = EarthdataGatekeeper.remote()

    result_refs = []
    for index, row in storms_chunk.iterrows():
        result_refs.append(compute_summaries.remote(
            row.data_array,
            func_vars_ref,
            cell_areas_ref,
            data_doi,
            gatekeeper=gatekeeper,
            half_hour=True))


    with open(f'{progress_log_path}/streaming_progress{i+1}.log', "w") as log_file:
        parallel_results = []
        missing_days = []
        for ref in tqdm(result_refs, total=len(result_refs), file=log_file, miniters=miniters):
            results = ray.get(ref)
            parallel_results.append(results[0])
            missing_days.append(results[1])

    parallel_results2 = parallel_results2 + parallel_results
    missing_days2 = missing_days2 + missing_days

    ray.shutdown()

%%capture output2

[2025-12-05 06:11:06,023 E 97 36130] core_worker_process.cc:825: Failed to establish connection to the metrics exporter agent. Metrics will not be exported. Exporter agent status: RpcError: Running out of retries to initialize the metrics agent. rpc_code: 14


KeyboardInterrupt: 

In [None]:
labels2 = [key for quantity_dict in func_vars_dict2.values() for key in quantity_dict.keys()]

In [None]:
df2 = pd.DataFrame(parallel_results2, columns=labels2)
df2['VFLXQV_missing'] = missing_days2
df2.to_csv('streamed_datasets/streamed_VFLXQV.csv', index=False)

Now moving on to precipitation.

In [None]:
data_doi = data_dois['VFLXQV_PRECIP']
progress_log_path = 'PRECIP_logs'

def precip_func(storm_da, var_da, area_da):
    return compute_cumulative(storm_da, var_da, area_da, ais_mask)

In [None]:
missing_days3 = []
parallel_results3 = []

for i, chunk_inds in enumerate(chunk_inds_lst):

    storms_chunk = landfalling_storms.iloc[chunk_inds]
    climatology_ref, cell_areas_ref, precip_func_ref = setup_ray(precip_func)
    gatekeeper = EarthdataGatekeeper.remote()

    result_refs = []
    for index, row in storms_chunk.iterrows():
        result_refs.append(compute_precip_summaries.remote(
            row.data_array, 
            cell_areas_ref, 
            precip_func_ref,
            data_doi,
            gatekeeper=gatekeeper))


    with open(f'{progress_log_path}/streaming_progress{i+1}.log', "w") as log_file:
        parallel_results = []
        missing_days = []
        for ref in tqdm(result_refs, total=len(result_refs), file=log_file, miniters=miniters):
            results = ray.get(ref)
            parallel_results.append(results[0])
            missing_days.append(results[1])

    parallel_results3 = parallel_results3 + parallel_results
    missing_days3 = missing_days3 + missing_days

    ray.shutdown()

%%capture output3

In [None]:
labels3 = ['cumulative_rainfall_ais', 'cumulative_snowfall_ais']

## Masking Quantities from `10.5067/VJAFPLI1CSIV`

This is for poleward 850 hPa wind.

In [None]:
func_vars_dict4 = {'V850': {'max_landfalling_v850hPa': lambda storm_da, var_da, area_da: compute_max_landfalling_wind(storm_da, -var_da, area_da, ais_mask),
                           'avg_landfalling_v850hPa': lambda storm_da, var_da, area_da: compute_avg_landfalling_wind(storm_da, -var_da, area_da, ais_mask)}}
data_doi = data_dois['V850']
progress_log_path = 'V850_logs'

In [None]:
missing_days4 = []
parallel_results4 = []

for i, chunk_inds in enumerate(chunk_inds_lst):

    storms_chunk = landfalling_storms.iloc[chunk_inds]
    climatology_ref, cell_areas_ref, func_vars_ref = setup_ray(func_vars_dict4)
    gatekeeper = EarthdataGatekeeper.remote()

    result_refs = []
    for index, row in storms_chunk.iterrows():
        result_refs.append(compute_summaries.remote(
            row.data_array,
            func_vars_ref,
            cell_areas_ref,
            data_doi,
            gatekeeper=gatekeeper,
            half_hour=True))


    with open(f'{progress_log_path}/streaming_progress{i+1}.log', "w") as log_file:
        parallel_results = []
        missing_days = []
        for ref in tqdm(result_refs, total=len(result_refs), file=log_file, miniters=miniters):
            results = ray.get(ref)
            parallel_results.append(results[0])
            missing_days.append(results[1])

    parallel_results4 = parallel_results4 + parallel_results
    missing_days4 = missing_days4 + missing_days

    ray.shutdown()

%%capture output4

In [None]:
labels4 = [key for quantity_dict in func_vars_dict4.values() for key in quantity_dict.keys()]

## Masking Quantities from `10.5067/QBZ6MG944HW0`

This is for omega.

In [None]:
func_vars_dict5 = {'OMEGA': {'avg_landfalling_minomega': lambda storm_da, var_da, area_da: compute_avg_landfalling_minomega(storm_da, var_da, area_da, ais_mask)}}
data_doi = data_dois['OMEGA']
progress_log_path = 'OMEGA_logs'

In [None]:
missing_days5 = []
parallel_results5 = []

for i, chunk_inds in enumerate(chunk_inds_lst):

    storms_chunk = landfalling_storms.iloc[chunk_inds]
    climatology_ref, cell_areas_ref, func_vars_ref = setup_ray(func_vars_dict5)
    gatekeeper = EarthdataGatekeeper.remote()

    result_refs = []
    for index, row in storms_chunk.iterrows():
        result_refs.append(compute_summaries.remote(
            row.data_array,
            func_vars_ref,
            cell_areas_ref,
            data_doi,
            gatekeeper=gatekeeper))


    with open(f'{progress_log_path}/streaming_progress{i+1}.log', "w") as log_file:
        parallel_results = []
        missing_days = []
        for ref in tqdm(result_refs, total=len(result_refs), file=log_file, miniters=miniters):
            results = ray.get(ref)
            parallel_results.append(results[0])
            missing_days.append(results[1])

    parallel_results5 = parallel_results5 + parallel_results
    missing_days5 = missing_days5 + missing_days

    ray.shutdown()

%%capture output5

In [None]:
labels5 = [key for quantity_dict in func_vars_dict5.values() for key in quantity_dict.keys()]

## Remaining Quantities

Computing remaining quantities that don't involve streaming any MERRA-2 data, just for completeness.

In [None]:
landfalling_storms['max_area'] = landfalling_storms['data_array'].apply(lambda x: compute_max_area(x, cell_areas))
landfalling_storms['mean_area'] = landfalling_storms['data_array'].apply(lambda x: compute_mean_area(x, cell_areas))
landfalling_storms['mean_landfalling_area'] = landfalling_storms['data_array'].apply(lambda x: compute_mean_area(x, cell_areas, ais_mask))
landfalling_storms['cumulative_landfalling_area'] = landfalling_storms['data_array'].apply(lambda x: compute_cumulative_spacetime(x, cell_areas, ais_mask))
landfalling_storms['duration'] = landfalling_storms['data_array'].apply(compute_duration)
landfalling_storms['start_date'] = landfalling_storms['data_array'].apply(add_start_date)
landfalling_storms['end_date'] = landfalling_storms['data_array'].apply(add_end_date)
landfalling_storms['max_south_extent'] = landfalling_storms['data_array'].apply(compute_max_southward_extent)
landfalling_storms['max_elevation_grad'] = landfalling_storms['data_array'].apply(lambda x: compute_max_elevation_grad(x, elevation))

region_defs_coarser = {'West': [-150, -30], 
               'East 1': [-30, 75],
               'East 2': [75, -150]}

region_masks_coarser = find_region_masks(region_defs_coarser, ais_mask)

landfalling_storms['coarser_region'] = landfalling_storms['data_array'].apply(lambda x: find_landfalling_region(x, cell_areas, region_masks_coarser))

landfalling_storms['trajectory'] = landfalling_storms['data_array'].apply(extract_trajectory)

# add in the merra2 aggregates to the landfalling dataframe
landfalling_storms[labels1] = parallel_results1
landfalling_storms[labels2] = parallel_results2
landfalling_storms[labels3] = parallel_results3
landfalling_storms[labels4] = parallel_results4
landfalling_storms[labels5] = parallel_results5

# add in the missing days
landfalling_storms['T2M_TQV_SLP_missing'] = missing_days1
landfalling_storms['VFLXQV_missing'] = missing_days2
landfalling_storms['PRECIP_missing'] = missing_days3
landfalling_storms['V850_missing'] = missing_days4
landfalling_storms['OMEGA_missing'] = missing_days5

# save the dataframe
landfalling_storms.to_hdf(home_dir/'project/dataset/datasets/streamed_landfalling_storm_quantities_df.h5', key='df')