In [None]:
from pathlib import Path

import numpy as np
import rasterio

In [None]:
def read_rp_map(fname: Path) -> np.ndarray:
    """Read band 1 of a flood map tiff to ndarray, setting nodata cells to 0

    Parameters
    ----------
    fname
        Raster file to open with rasterio.

    Returns
    -------
    np.ndarray
        The array returned by ``dataset.read(1)`` with every nodata cell
        replaced by 0. The array is returned untouched when the dataset
        declares no nodata value.
    """
    with rasterio.open(fname) as dataset:
        data = dataset.read(1)
        nodata = dataset.nodata

    if nodata is None:
        # No nodata value declared, so there is nothing to mask.
        return data

    if np.isnan(nodata):
        # NaN never compares equal to itself, so `data == nodata` would match
        # nothing and the NaN cells would leak into later arithmetic.
        data[np.isnan(data)] = 0
    else:
        data[data == nodata] = 0

    return data

def read_metadata(fname):
    """Return the (crs, width, height, transform) of a raster file
    """
    with rasterio.open(fname) as src:
        # Attributes are read while the dataset is still open.
        return src.crs, src.width, src.height, src.transform

def save_to_tif(data, fname, crs, transform):
    """Write an array to fname as a single-band, LZW-compressed GeoTIFF

    Height and width are taken from ``data.shape[0]`` and ``data.shape[1]``;
    the raster dtype is the array's own dtype.
    """
    n_rows, n_cols = data.shape[0], data.shape[1]
    profile = {
        "driver": "GTiff",
        "height": n_rows,
        "width": n_cols,
        "count": 1,
        "dtype": data.dtype,
        "crs": crs,
        "transform": transform,
        "compress": "lzw",
    }
    with rasterio.open(fname, "w", **profile) as dst:
        # Band indexes are 1-based; the array goes into the only band.
        dst.write(data, 1)

def read_rp_maps(rp_maps: dict[float, Path]) -> dict[float, np.ndarray]:
    """Load each flood map tiff in rp_maps, returning ndarrays under the same keys
    """
    return {rp: read_rp_map(path) for rp, path in rp_maps.items()}

def interpolate_depth(rp: float, rp_l: float, depth_l: np.ndarray, rp_u: float, depth_u: np.ndarray) -> np.ndarray:
    """Interpolate a flood map at rp from the maps at rp_l and rp_u

    The blend weight is linear in log(rp): 0 at rp_l and 1 at rp_u.
    """
    log_lower = np.log(rp_l)
    weight = (np.log(rp) - log_lower) / (np.log(rp_u) - log_lower)
    return depth_l + ((depth_u - depth_l) * weight)

def pick_upper_lower_rps(rp: float, rps: list[float]) -> tuple[float, float]:
    """Pick the two neighbouring entries of rps that bracket rp

    Parameters
    ----------
    rp
        Value to bracket.
    rps
        Candidate values, sorted ascending (required by np.searchsorted),
        with at least two entries.

    Returns
    -------
    tuple[float, float]
        ``(rp_l, rp_u)`` with ``rp_l < rp <= rp_u``, except that
        ``rp == rps[0]`` returns the first bin ``(rps[0], rps[1])``.

    Raises
    ------
    IndexError
        If rps has fewer than two entries or rp lies outside
        ``[rps[0], rps[-1]]``. IndexError is used because values above the
        range already failed that way.
    """
    if len(rps) < 2:
        raise IndexError("need at least two values in rps to bracket rp")
    if rp < rps[0] or rp > rps[-1]:
        raise IndexError(
            f"rp={rp} is outside the available range [{rps[0]}, {rps[-1]}]"
        )

    # searchsorted returns 0 when rp == rps[0]; without the clamp,
    # rps[bin_index - 1] would wrap around to the last element.
    bin_index = max(int(np.searchsorted(rps, rp, side="left")), 1)
    return rps[bin_index - 1], rps[bin_index]

def calculate_rp_maps(rps_to_calculate, rps_input):
    """Interpolate flood maps at new return periods (rp) and save them as tiffs

    Parameters
    ----------
    rps_to_calculate
        Mapping of rp -> output file name. One raster is written per entry.
    rps_input
        Mapping of rp -> input file name of an existing flood map. The crs,
        size and transform of the first entry are used for every output.

    Notes
    -----
    Synthetic anchor maps are added so that rps outside the input range can
    still be bracketed: all-zero maps at rp 1e-3 and rp 2, and a copy of the
    highest input map at rp 1e6. An anchor is only added when it lies
    strictly outside the input range, so it can never replace a map that was
    read from disk or break the ascending order needed for the lookup.
    """
    # Read all data
    depths = read_rp_maps(rps_input)
    input_rps = list(depths.keys())
    # Read metadata
    crs, width, height, transform = read_metadata(rps_input[input_rps[0]])

    lowest_rp, highest_rp = min(input_rps), max(input_rps)

    # Zero-depth anchors below the input range. The array is shared between
    # both anchors; it is only ever read.
    no_flood = np.zeros((height, width))
    for anchor_rp in (1e-3, 2):
        if anchor_rp < lowest_rp:
            depths[anchor_rp] = no_flood

    # Above the input range the depth is held at the highest available map.
    if highest_rp < 1e6:
        depths[1e6] = depths[highest_rp]

    # Sort after the anchors are in, so the list is ascending and free of
    # duplicates whatever the input rps are.
    baseline_rps = sorted(depths)

    for rp, output_fname in rps_to_calculate.items():
        rp_l, rp_u = pick_upper_lower_rps(rp, baseline_rps)
        print(rp_l, rp_u)
        depth = interpolate_depth(rp, rp_l, depths[rp_l], rp_u, depths[rp_u])
        save_to_tif(depth, output_fname, crs, transform)

In [None]:
# First pass: derive the rp 2, 5 and 10 maps from the rp 20-500 maps.
# Both dicts map rp -> raster file name: rps_to_calculate lists the files that
# will be written, rps_input the files that will be read.
# Note: rp 2 coincides with the all-zero anchor map that calculate_rp_maps
# inserts at rp 2, so the Q2 output is an all-zero raster.
rps_to_calculate = {
    2: "JM_FLRF_UD_Q2_RD_02-aligned.tif",
    5: "JM_FLRF_UD_Q5_RD_02-aligned.tif",
    10: "JM_FLRF_UD_Q10_RD_02-aligned.tif"
}
rps_input = {
    20: "JM_FLRF_UD_Q20_RD_02-aligned.tif",
    50: "JM_FLRF_UD_Q50_RD_02-aligned.tif",
    100: "JM_FLRF_UD_Q100_RD_02-aligned.tif",
    200: "JM_FLRF_UD_Q200_RD_02-aligned.tif",
    500: "JM_FLRF_UD_Q500_RD_02-aligned.tif",
}
calculate_rp_maps(rps_to_calculate, rps_input)

In [None]:
# Second pass: writes the "flow0.8" maps for rp 10, 20, 50 and 100.
# The Q2 and Q10 "-aligned" inputs are the files written by the previous
# cell, so that cell must have run first.
# Each input map is keyed by an rp different from the one in its file name
# (Q2 -> 4.8, Q10 -> 33, Q50 -> 173, Q100 -> 346).
# NOTE(review): presumably these are the rps the existing maps correspond to
# under the "flow0.8" scenario named in the outputs - confirm the source of
# these numbers.
rps_to_calculate = {
    10: "JM_FLRF_UD_Q10_RD_02_flow0.8.tif",
    20: "JM_FLRF_UD_Q20_RD_02_flow0.8.tif",
    50: "JM_FLRF_UD_Q50_RD_02_flow0.8.tif",
    100: "JM_FLRF_UD_Q100_RD_02_flow0.8.tif",
}
rps_input = {
    4.8: "JM_FLRF_UD_Q2_RD_02-aligned.tif",
    33: "JM_FLRF_UD_Q10_RD_02-aligned.tif",
    173: "JM_FLRF_UD_Q50_RD_02-aligned.tif",
    346: "JM_FLRF_UD_Q100_RD_02-aligned.tif",
}
calculate_rp_maps(rps_to_calculate, rps_input)