# Notebook for TCWV and VIWV SOM Preprocessing

By: Ty Janoski

Updated 1/2/2026

## Setup

### Imports

In [1]:
# Import Statements
import cartopy.crs as ccrs
import cmweather  # noqa: F401
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scienceplots  # noqa: F401
import xarray as xr
from dask.diagnostics.progress import ProgressBar  # noqa: F401

# Global figure styling: stack the "science", "nature" and "grid" style sheets
# and have matplotlib render all text through LaTeX.
plt.style.use(["science", "nature", "grid"])
plt.rcParams.update({"text.usetex": True})


### CSV

In [2]:
# Read in CSV file with flash flood events
df = pd.read_csv("../data/storm_data_search_results.csv")

# Keep only rows whose EVENT_ID is made up entirely of digits.
# The explicit .copy() makes the filtered frame independent of the frame that
# was read in, so the column assignments below never write into a slice of it
# (this is the pattern that triggers pandas' SettingWithCopyWarning).
df = df[df["EVENT_ID"].astype(str).str.isdigit()].copy()

# Turn BEGIN_TIME and END_TIME into zero-padded 4-character strings.
# Missing times are filled with 0 first, i.e. they end up as "0000".
df["BEGIN_TIME"] = df["BEGIN_TIME"].fillna(0).astype(int).astype(str).str.zfill(4)
df["END_TIME"] = df["END_TIME"].fillna(0).astype(int).astype(str).str.zfill(4)

# Combine DATE and TIME into a single "<date> <time>" string per row
begin_str = df["BEGIN_DATE"] + " " + df["BEGIN_TIME"]  # type: ignore
end_str = df["END_DATE"] + " " + df["END_TIME"]  # type: ignore

# Convert the datetime strings to pandas datetime objects.
# errors="coerce" turns any string that does not match the format into NaT
# instead of raising.
df["BEGIN_DATETIME"] = pd.to_datetime(
    begin_str, format="%m/%d/%Y %H%M", errors="coerce"
)
df["END_DATETIME"] = pd.to_datetime(end_str, format="%m/%d/%Y %H%M", errors="coerce")

# Take only the first row (in file order) for each EPISODE_ID
df_unique = df.drop_duplicates(subset=["EPISODE_ID"], keep="first").copy()

# Create a list of event start times floored to the hour.
# Any NaT produced by the coercion above is carried through unchanged.
event_hours = df_unique["BEGIN_DATETIME"].dt.floor("h").tolist()  # type: ignore

## ERA5 Data

### Total-Column Water Vapor (TCWV)

In [None]:
# The commented-out recipe below rebuilds the combined TCWV file from the
# hourly, sliced ERA5 files. Only run it if the ERA5 data must be reprocessed.
# --- IGNORE ---

# # Read in hourly, sliced total-column water vapor (tcwv) data
# tcwv = xr.open_mfdataset(
#     "/mnt/drive2/ERA5/NC_files/hourly_sliced/era5_pw_*.nc",
#     combine="nested",
#     concat_dim="time",
#     decode_times=True,
#     chunks={"time": 8760},
# ).rename({"var137": "tcwv"})

# # Sort by time
# tcwv = tcwv.sortby("time").tcwv.squeeze()

# # Save out
# # NOTE(review): the result of .rename("tcwv").load() is not assigned to
# # anything here -- confirm the loaded data is what to_netcdf actually writes.
# with ProgressBar():
#     tcwv.rename("tcwv").load()

# tcwv.to_netcdf(
#     "/mnt/drive2/ERA5/NC_files/combined/era5_tcwv_hourly_warm_season_US.nc"
# )

# Normal path: read the already-combined file into memory as a DataArray.
tcwv = xr.load_dataarray(
    "/mnt/drive2/ERA5/NC_files/combined/era5_tcwv_hourly_warm_season_US.nc",
    decode_timedelta=True,
)


In [None]:
# Day-of-year climatology: mean and standard deviation over the time axis
mean_doy = tcwv.groupby("time.dayofyear").mean(dim="time")
std_doy = tcwv.groupby("time.dayofyear").std(dim="time")

# Smooth the day-of-year standard deviation with a centered 14-day rolling
# mean; a window needs at least 7 valid days to produce a value.
std_doy_smooth = std_doy.sortby("dayofyear").rolling(
    dayofyear=14, center=True, min_periods=7
).mean()

# Standardized anomalies: subtract the (unsmoothed) day-of-year mean, then
# divide by the smoothed day-of-year standard deviation.
tcwv_anoms = tcwv.groupby("time.dayofyear") - mean_doy
tcwv_norm = tcwv_anoms.groupby("time.dayofyear") / std_doy_smooth

# Latitude weighting: multiply every grid point by sqrt(cos(latitude))
weights = np.sqrt(np.cos(np.deg2rad(tcwv_norm["lat"])))
tcwv_norm_weighted = tcwv_norm * weights

In [None]:
# Example figure for the first time step: raw TCWV, its standardized anomalies,
# and the latitude-weighted standardized anomalies, side by side.
fig, axs = plt.subplots(
    1, 3, figsize=(6.5, 3.5), subplot_kw={"projection": ccrs.PlateCarree()}, dpi=600
)

# One entry per panel.
#   extend: which out-of-range side(s) contourf still fills. The raw field is
#           non-negative so only "max" is needed; the anomaly panels use
#           symmetric levels and must extend on both sides, otherwise values
#           below the lowest level are left unfilled.
#   units:  colorbar label, or None for the dimensionless anomaly panels.
configs = [
    {
        "data": tcwv.isel(time=0),
        "lon": tcwv.lon,
        "lat": tcwv.lat,
        "title": "TCWV",
        "cmap": "viridis",
        "levels": np.arange(0, 51, 5),
        "extend": "max",
        # ERA5 total-column water vapour unit.
        # NOTE(review): confirm the combined file has not been rescaled.
        "units": "kg m$^{-2}$",
    },
    {
        "data": tcwv_norm.isel(time=0),
        "lon": tcwv_norm.lon,
        "lat": tcwv_norm.lat,
        "title": "TCWV Standardized Anomalies",
        "cmap": "balance",
        "levels": np.arange(-3, 3.1, 0.5),
        "extend": "both",
        "units": None,
    },
    {
        "data": tcwv_norm_weighted.isel(time=0),
        "lon": tcwv_norm_weighted.lon,
        "lat": tcwv_norm_weighted.lat,
        "title": "TCWV Standardized Anomalies Weighted",
        "cmap": "balance",
        "levels": np.arange(-3, 3.1, 0.5),
        "extend": "both",
        "units": None,
    },
]

# Create plots and colorbars
for ax, config in zip(axs, configs):
    c = ax.contourf(
        config["lon"],
        config["lat"],
        config["data"],
        cmap=config["cmap"],
        levels=config["levels"],
        extend=config["extend"],
    )
    ax.set_title(config["title"], fontsize=8)
    ax.coastlines(resolution="50m", linewidth=0.5)

    cb = fig.colorbar(c, ax=ax, orientation="horizontal", pad=0.03)
    if config["units"] is not None:
        cb.set_label(config["units"], fontsize=6)
    cb.set_ticks(c.levels)
    cb.ax.tick_params(labelsize=6, rotation=45)

plt.tight_layout()
# NOTE(review): this path has no "../" prefix, unlike the CSV read and the IVT
# figure elsewhere in this notebook -- confirm the intended output directory.
plt.savefig(
    "figs/tcwv-SOM/tcwv_standardized_anomalies_example.png",
    dpi=600,
    bbox_inches="tight",
)
# Close this specific figure so it does not linger in memory
plt.close(fig)


In [None]:
# Extract data for event hours
# The event times are NYC local time, so convert them to timezone-naive UTC.
# NOTE(review): this assumes the dataset's "time" coordinate is naive UTC.
#
# Daylight-saving edge cases are handled explicitly instead of raising:
#   - an hour skipped by the spring-forward transition is moved to the next
#     valid local time;
#   - an hour repeated by the fall-back transition cannot be resolved and
#     becomes NaT, which then matches no dataset time below.
times_local = pd.to_datetime(event_hours).tz_localize(
    "US/Eastern", ambiguous="NaT", nonexistent="shift_forward"
)
times_utc = times_local.tz_convert("UTC").tz_convert(None)

# Keep only the event hours that actually exist on the dataset's time axis
intersect = pd.Index(times_utc).intersection(tcwv_norm_weighted.indexes["time"])
tcwv_norm_weighted_ffe = tcwv_norm_weighted.sel(time=intersect)
tcwv_norm_ffe = tcwv_norm.sel(time=intersect)
tcwv_ffe = tcwv.sel(time=intersect)

# Save out the extracted data
tcwv_norm_weighted_ffe.to_netcdf(
    "/mnt/drive2/SOM_intermediate_files/era5_tcwv_norm_weighted_ffe.nc"
)
tcwv_norm_ffe.to_netcdf("/mnt/drive2/SOM_intermediate_files/era5_tcwv_norm_ffe.nc")
tcwv_ffe.to_netcdf("/mnt/drive2/SOM_intermediate_files/era5_tcwv_ffe.nc")


### Vertically-Integrated Water Vapor Transport (VIWVN and VIWVE)

In [3]:
# The commented-out recipe below rebuilds the combined water vapor transport
# file from the hourly, sliced ERA5 files. Only run it if the ERA5 data must be
# reprocessed.
# --- IGNORE ---

# # Read in hourly, sliced vertically integrated water vapor transport
# viwv = xr.open_mfdataset(
#     "/mnt/drive2/ERA5/NC_files/hourly_sliced/era5_viwv_*.nc",
#     combine="nested",
#     concat_dim="valid_time",
#     decode_times=True,
#     chunks={"valid_time": 8760},
# )

# # Load into memory
# with ProgressBar():
#     viwv = viwv.load()

# # Save out
# viwv.to_netcdf(
#     "/mnt/drive2/ERA5/NC_files/combined/era5_viwv_hourly_warm_season_US.nc"
# )

# Normal path: read the already-combined file fully into memory as a Dataset.
# The cells below use its "viwve" and "viwvn" variables and its "valid_time",
# "latitude" and "longitude" coordinates.
ds = xr.load_dataset(
    "/mnt/drive2/ERA5/NC_files/combined/era5_viwv_hourly_warm_season_US.nc",
    decode_timedelta=True,
)


In [4]:
# Add the transport magnitude, sqrt(viwve^2 + viwvn^2), as variable "ivt"
ds = ds.assign(ivt=np.sqrt(ds["viwve"] ** 2 + ds["viwvn"] ** 2))

# Day-of-year climatology (mean and standard deviation) of every variable
mean_doy = ds.groupby("valid_time.dayofyear").mean(dim="valid_time")
std_doy = ds.groupby("valid_time.dayofyear").std(dim="valid_time")

# Smooth the day-of-year standard deviation with a centered 14-day rolling
# mean; a window needs at least 7 valid days to produce a value.
std_doy_smooth = (
    std_doy
    .sortby("dayofyear")
    .rolling(dayofyear=14, center=True, min_periods=7)
    .mean()
)

# Standardized anomalies: remove the day-of-year mean, then scale by the
# smoothed day-of-year standard deviation.
anoms = ds.groupby("valid_time.dayofyear") - mean_doy
norm = anoms.groupby("valid_time.dayofyear") / std_doy_smooth

# Latitude weighting: multiply every grid point by sqrt(cos(latitude))
weights = np.sqrt(np.cos(np.deg2rad(norm["latitude"])))
norm_weighted = norm * weights


In [16]:
# Example figure for the first time step. Rows are the three variables (IVT
# magnitude, eastward and northward components); columns are the raw field,
# its standardized anomalies, and the latitude-weighted standardized anomalies.
fig, axs = plt.subplots(
    3, 3, figsize=(6.5, 6),
    subplot_kw={"projection": ccrs.PlateCarree()},
    dpi=600,
)

# (variable name, title label, raw-field levels, anomaly levels,
#  contourf `extend` for the raw-field panel)
components = [
    ("ivt", "IVT$_{mag}$", np.arange(0, 701, 50), np.arange(-6, 6.1, 1.0), "max"),
    ("viwve", "IVT$_{x}$", np.arange(-700, 701, 100), np.arange(-6, 6.1, 1.0), "both"),
    ("viwvn", "IVT$_{y}$", np.arange(-700, 701, 100), np.arange(-6, 6.1, 1.0), "both"),
]

# (dataset, title suffix) for each column
datasets = [
    (ds, ""),
    (norm, " Standardized Anomalies"),
    (norm_weighted, " Standardized Anomalies Weighted"),
]

for row, (var, label, raw_levels, std_levels, raw_extend) in enumerate(components):
    for col, (dset, suffix) in enumerate(datasets):

        ax = axs[row, col]
        data = dset[var].isel(valid_time=0)

        is_raw = col == 0
        levels = raw_levels if is_raw else std_levels
        # Anomalies are signed for every variable (including the magnitude),
        # so their panels must extend on both sides; only the raw panel uses
        # the per-variable setting.
        extend = raw_extend if is_raw else "both"
        # Sequential colormap for the raw magnitude only; diverging otherwise
        cmap = "viridis" if (row == 0 and is_raw) else "balance"

        c = ax.contourf(
            data.longitude,
            data.latitude,
            data,
            cmap=cmap,
            levels=levels,
            extend=extend,
        )

        ax.set_title(f"{label}{suffix}", fontsize=8)
        ax.coastlines(resolution="50m", linewidth=0.5)

        cb = fig.colorbar(c, ax=ax, orientation="horizontal", pad=0.03)
        # Only the raw-field column carries physical units
        if is_raw:
            cb.set_label("kg m$^{-1}$ s$^{-1}$", fontsize=6)

        cb.set_ticks(levels)
        cb.ax.tick_params(labelsize=6, rotation=45)

plt.tight_layout()
plt.savefig(
    "../figs/Z500-and-ivtxy-SOM/ivt_standardized_anomalies_example.png",
    dpi=600,
    bbox_inches="tight",
)
# Close this specific figure so it does not linger in memory
plt.close(fig)


In [17]:
# Extract data for event hours
# The event times are NYC local time, so convert them to timezone-naive UTC.
# NOTE(review): this assumes the dataset's "valid_time" coordinate is naive UTC.
#
# Daylight-saving edge cases are handled explicitly instead of raising:
#   - an hour skipped by the spring-forward transition is moved to the next
#     valid local time;
#   - an hour repeated by the fall-back transition cannot be resolved and
#     becomes NaT, which then matches no dataset time below.
times_local = pd.to_datetime(event_hours).tz_localize(
    "US/Eastern", ambiguous="NaT", nonexistent="shift_forward"
)
times_utc = times_local.tz_convert("UTC").tz_convert(None)

# Keep only the event hours that actually exist on the dataset's time axis
intersect = pd.Index(times_utc).intersection(norm_weighted.indexes["valid_time"])
norm_weighted_ffe = norm_weighted.sel(valid_time=intersect)
norm_ffe = norm.sel(valid_time=intersect)
ffe = ds.sel(valid_time=intersect)

# Save out the extracted data
norm_weighted_ffe.to_netcdf(
    "/mnt/drive2/SOM_intermediate_files/era5_ivt_norm_weighted_ffe.nc"
)
norm_ffe.to_netcdf("/mnt/drive2/SOM_intermediate_files/era5_ivt_norm_ffe.nc")
ffe.to_netcdf("/mnt/drive2/SOM_intermediate_files/era5_ivt_ffe.nc")

In [5]:
# Save out daily averages of the raw water vapor transport fields and of their
# standardized anomalies (both unweighted and latitude-weighted).


def _daily_mean(hourly):
    """Average ``hourly`` into daily means along ``valid_time``.

    Each day is labelled by, and closed on, its left edge. Days for which no
    value is available at all are dropped from the result (presumably the days
    outside the warm season -- verify against the input file).
    """
    daily = hourly.resample(valid_time="1D", label="left", closed="left").mean()
    return daily.dropna(dim="valid_time", how="all")


ds_daily = _daily_mean(ds)
norm_daily = _daily_mean(norm)
norm_weighted_daily = _daily_mean(norm_weighted)

ds_daily.to_netcdf("/mnt/drive2/SOM_intermediate_files/era5_ivt_daily.nc")
norm_daily.to_netcdf("/mnt/drive2/SOM_intermediate_files/era5_ivt_norm_daily.nc")
norm_weighted_daily.to_netcdf(
    "/mnt/drive2/SOM_intermediate_files/era5_ivt_norm_weighted_daily.nc"
)
