In [15]:
import os
import multiprocessing
import logging
from typing import Optional
import sys
import itertools

import geopandas as gpd
import numpy as np
import pandas as pd
import pyproj
import rioxarray
import xarray as xr

import os
import multiprocessing
import logging
from typing import Optional
import sys

import geopandas as gpd
import numpy as np
import pandas as pd
import pyproj
import rioxarray
import xarray as xr

from open_gira.io import bit_pack_dataarray_encoding
from open_gira.wind import (
    estimate_wind_field, interpolate_track, empty_wind_da, WIND_COORDS,
    ENV_PRESSURE
)

from open_gira.wind_plotting import plot_contours, animate_track

In [16]:
# Functions 

def cleanup(output_path: str):
    """
    Bail out early when there is nothing to process (no network or no tracks).

    Writes the object returned by `empty_wind_da()` to `output_path` as netCDF
    and then terminates the interpreter with exit status 0.

    Args:
        output_path: Where to write the placeholder netCDF file.
    """
    placeholder = empty_wind_da()
    placeholder.to_netcdf(output_path)
    sys.exit(0)


def process_track(
    track: pd.DataFrame,
    longitude: np.ndarray,
    latitude: np.ndarray,
    downscaling_factors: np.ndarray,
    plot_max_wind: bool,
    plot_animation: bool,
    plot_dir: Optional[str]
) -> tuple[str, np.ndarray]:
    """
    Interpolate a track, reconstruct the advective and rotational vector wind
    fields, sum them and take the maximum of the wind vector magnitude across
    time. Optionally plot the wind fields and save to disk.

    Args:
        track: Subset of DataFrame describing a single track. Must have a
            temporal index, a point geometry, a single-valued `track_id`
            column, a `basin_id` column and the following fields:
            `min_pressure_hpa`, `max_wind_speed_ms`, `radius_to_max_winds_km`.
        longitude: Longitude values to construct evaluation grid
        latitude: Latitude values to construct evaluation grid
        downscaling_factors: Factors to bring gradient-level winds to surface.
            Multiplied against the (time, y, x) wind field, so must broadcast
            against (len(latitude), len(longitude)).
        plot_max_wind: Whether to plot max wind fields
        plot_animation: Whether to plot wind field evolution
        plot_dir: Where to save optional plots. Only used when one of the
            plot flags is set and no grid dimension has length 1.

    Returns:
        str: Track identifier
        np.ndarray: 2D array of maximum wind speed experienced at each grid
            pixel. All zeros, with shape (len(latitude), len(longitude)), when
            the track has a single point or cannot be interpolated.

    Raises:
        ValueError: If `track.track_id` does not contain exactly one unique
            value (raised by the tuple unpacking below).
    """

    # unpacking asserts the subset really holds exactly one track
    track_id, = set(track.track_id)

    logging.info(track_id)

    grid_shape: tuple[int, int] = (len(latitude), len(longitude))

    # we can't calculate the advective component without at least two points
    if len(track) == 1:
        return track_id, np.zeros(grid_shape)

    # basin of first record for storm track (storm genesis for synthetic tracks)
    basin: str = track.iloc[0, track.columns.get_loc("basin_id")]

    # interpolate track (avoid 'doughnut effect' of wind field from infrequent eye observations)
    try:
        track = interpolate_track(track)
    except AssertionError:
        logging.warning(f"Could not successfully interpolate {track_id}")
        # same empty result as the single-point case above, independent of the
        # shape or dtype of downscaling_factors
        return track_id, np.zeros(grid_shape)

    # forward azimuth angle and distances from track eye to next track eye
    geod_wgs84: pyproj.Geod = pyproj.CRS("epsg:4326").get_geod()
    advection_azimuth_deg, _, eye_step_distance_m = geod_wgs84.inv(
        track.geometry.x.iloc[:-1],
        track.geometry.y.iloc[:-1],
        track.geometry.x.iloc[1:],
        track.geometry.y.iloc[1:],
    )

    # gapfill last period/distance values with penultimate value
    period = track.index[1:] - track.index[:-1]
    period = period.append(period[-1:])
    eye_step_distance_m = np.asarray([*eye_step_distance_m, eye_step_distance_m[-1]])
    track["advection_azimuth_deg"] = [*advection_azimuth_deg, advection_azimuth_deg[-1]]

    # calculate eye speed
    # N.B. use total_seconds(): the `.seconds` attribute is only the seconds
    # *component* of each timedelta (0-86399) and silently drops whole days
    track["eye_speed_ms"] = eye_step_distance_m / period.total_seconds().to_numpy()

    # result array, (time, y, x); complex so each cell can hold a 2D wind vector
    wind_field: np.ndarray = np.zeros((len(track), *grid_shape), dtype=complex)

    for track_i, track_point in enumerate(track.itertuples()):

        try:
            wind_field[track_i, :] = estimate_wind_field(
                longitude,  # degrees
                latitude,  # degrees
                track_point.geometry.x,  # degrees
                track_point.geometry.y,  # degrees
                track_point.radius_to_max_winds_km * 1_000,  # convert to meters
                track_point.max_wind_speed_ms,
                track_point.min_pressure_hpa * 100,  # convert to Pascals
                ENV_PRESSURE[basin] * 100,  # convert to Pascals
                track_point.advection_azimuth_deg,
                track_point.eye_speed_ms,
            )
        except AssertionError:
            # best effort: this timestep keeps its initial zeros
            logging.warning(f"{track_id} failed wind field estimation for {track_i + 1} of {len(track)}, writing zeros")

    # take factors calculated from surface roughness of region and use to downscale speeds
    downscaled_wind_field = downscaling_factors * wind_field

    # find vector magnitude, then take max along timestep axis, giving (y, x)
    # N.B. np.max([np.nan, 1]) = np.nan, so use np.nanmax
    max_wind_speeds: np.ndarray = np.nanmax(np.abs(downscaled_wind_field), axis=0)

    # any dimensions with a single cell will break the plotting routines
    if 1 not in grid_shape:

        if plot_max_wind:
            plot_contours(
                max_wind_speeds,
                f"{track_id} max wind speed",
                "Wind speed [m/s]",
                os.path.join(plot_dir, f"{track_id}_max_contour.png")
            )

        if plot_animation:
            animate_track(
                downscaled_wind_field,
                track,
                os.path.join(plot_dir, f"{track_id}.gif")
            )

    return track_id, max_wind_speeds

In [17]:
# Define inputs (from Snakemake originally)
# Every input and output lives under a single results directory. Set the
# OPEN_GIRA_RESULTS_DIR environment variable to run on another machine without
# editing this cell; the default reproduces the original hard-coded locations.
results_dir = os.environ.get(
    'OPEN_GIRA_RESULTS_DIR',
    '/home/mark/projects/open-gira/results/direct/Bahamas',
)
storm_file_path = os.path.join(results_dir, 'IBTrACS', '0', 'tracks.geoparquet')
wind_grid_path = os.path.join(results_dir, 'wind_grid.tiff')
surface_roughness_path = os.path.join(results_dir, 'surface_roughness_mangroves_test.tif')
# Snakemake parameters not used in this notebook:
# storm_set: set[str] = set(snakemake.params.storm_set)
# plot_max_wind: bool = snakemake.config["plot_wind"]["max_speed"]
# plot_animation: bool = snakemake.config["plot_wind"]["animation"]
n_proc = 1 # normally 4 DEBUG
output_path = os.path.join(results_dir, 'max_wind_field_IBTrACS_BHS_mangroves.nc')
downscale_path = os.path.join(results_dir, 'downscale_factors.npy')
# Boolean arguments (do we want to plot - specify directory if so)
plot_dir_path = os.path.join(results_dir, 'plots_STORM')
plot_max_wind = False
plot_animation = False

In [18]:
# Read tracks
tracks = gpd.read_parquet(storm_file_path)
if tracks.empty:
    # Nothing downstream can run without tracks (unpacking the results would
    # fail on an empty list), so write the empty placeholder output and stop
    # here instead of carrying on.
    print('No tracks found')
    cleanup(output_path)

In [19]:
# keep only storms that make landfall at some point along their track
# boolean per track_id: True when any record of that track has landfall set
landfall_mask = tracks.groupby('track_id')['landfall'].any()
# the track_ids for which the mask is True
landfall_tracks = landfall_mask.loc[landfall_mask].index
# every record belonging to one of those tracks
tracks_filtered = tracks.loc[tracks['track_id'].isin(landfall_tracks)]

In [20]:
# one group per storm, keyed on track_id
grouped_tracks = tracks_filtered.groupby(by='track_id')

In [21]:
# raster defining the evaluation grid for wind speeds; rioxarray exposes the
# midpoints of the raster cells as the x / y dimension coordinates
grid: xr.DataArray = rioxarray.open_rasterio(
    filename=wind_grid_path,
)

In [22]:
# raster of surface roughness, used for downscaling winds
surface_roughness_raster: xr.DataArray = rioxarray.open_rasterio(
    surface_roughness_path
)
# the raster is (1, y, x): a single surface roughness band, so index 0 along
# the leading dimension gives the plain (y, x) array
surface_roughness: np.ndarray = surface_roughness_raster[0].values

In [23]:
# precomputed downscaling factors, stored on disk as a NumPy .npy array
downscaling_factors = np.load(file=downscale_path)

In [24]:
# lazily build one argument tuple per storm for process_track; iterating the
# groupby yields (track_id, subset) pairs and only the subset is passed on
args = (
    (
        track_subset,
        grid.x,
        grid.y,
        downscaling_factors,
        plot_max_wind,
        plot_animation,
        plot_dir_path,
    )
    for _, track_subset in grouped_tracks
)

In [25]:
# Begin wind field estimation: one (track_id, max wind field) pair per storm
print(f'Estimating wind fields for {len(grouped_tracks)} storm tracks')
max_wind_speeds: list[tuple[str, np.ndarray]] = []
if n_proc > 1:
    # fan the tracks out over a pool of worker processes
    with multiprocessing.Pool(processes=n_proc) as pool:
        max_wind_speeds = pool.starmap(process_track, args)
else:
    # serial path, easier to debug
    max_wind_speeds = [process_track(*arg) for arg in args]

Estimating wind fields for 32 storm tracks




In [26]:
# order the (track_id, field) pairs by track_id so the output order is
# reproducible, even after multiprocessing
max_wind_speeds = sorted(
    max_wind_speeds,
    key=lambda result: result[0],
)

In [27]:
# Split the (track_id, field) pairs into two parallel tuples: one of track
# identifiers and one of maximum wind speed arrays. Nothing is written to disk
# in this cell; that happens in the next one.
# N.B. the unpacking raises ValueError if max_wind_speeds is empty.
track_ids, fields = zip(*max_wind_speeds)

In [28]:
# stack the per-event fields into a single labelled array, attach the CRS and
# write the result to disk as zlib-compressed netCDF
da = xr.DataArray(
    data=np.stack(fields),
    dims=list(WIND_COORDS),
    coords=(
        ("event_id", list(track_ids)),
        ("latitude", grid.y.values),
        ("longitude", grid.x.values),
    ),
    attrs={
        "description": "Maximum estimated wind speed during event",
        "units": "m s-1",
    },
    name="max_wind_speed",
).rio.write_crs("EPSG:4326")
encoding = {
    "max_wind_speed": {
        "zlib": True,
        "complevel": 9,
    },
}
da.to_netcdf(output_path, encoding=encoding)