In [None]:
import numpy as np
from numpy.testing import assert_almost_equal
import numba

In [None]:
def f32(f):
    return np.array([f], dtype="float32")[0]

def f32a(fs):
    return np.array(fs, dtype="float32")

In [None]:
damage = f32a([10,10,0]) # RP descending order
rps = f32a([100,50,10]) # RP descending order
rps_r = rps[::-1].copy()
aep = np.pow(rps, -1)
T = f32(66.66666666666667)

In [None]:
def integrate_truncated_risk_initial(risk_curve, T, RPs):
    """
    Integrate the risk curve above a protection threshold T.

    Parameters:
      risk_curve : 1D numpy array of risk values corresponding to each return period in RPs.
      T          : Protection threshold for the cell (e.g., 7, 10, 100, etc.)
      RPs        : 1D numpy array of discrete return periods.

    Returns:
      Integrated risk value after truncating the risk curve at T.
    """
    # If the protection threshold is less than or equal to the smallest RP,
    # no truncation is applied.
    if T <= RPs[0]:
        new_RPs = RPs
        new_risk = risk_curve
    # If the protection threshold is above the highest RP,
    # assume full protection (i.e. no risk).
    elif T >= RPs[-1]:
        return 0.0
    else:
        # Find the first index where the discrete RP is >= T.
        idx = np.searchsorted(RPs, T, side='left')
        # If T exactly matches one of the RPs, use that risk value.
        if T == RPs[idx]:
            risk_T = risk_curve[idx]
        else:
            # Linearly interpolate between the two adjacent RPs.
            risk_T = risk_curve[idx-1] + (risk_curve[idx] - risk_curve[idx-1]) * (T - RPs[idx-1]) / (RPs[idx] - RPs[idx-1])

        # Construct a new risk curve starting at T.
        new_RPs = RPs[idx:]
        new_RPs = np.insert(new_RPs, 0, T)
        new_risk = risk_curve[idx:]
        new_risk = np.insert(new_risk, 0, risk_T)

    # Calculate the annual exceedance probabilities for the new return periods.
    new_aep = 1 / new_RPs
    # Because aep decreases with increasing RP, reverse arrays to get increasing x values.
    sorted_aep = new_aep[::-1]
    sorted_risk = new_risk[::-1]

    # Compute the integrated risk using the trapezoidal rule.
    return np.trapezoid(sorted_risk, x=sorted_aep)

In [None]:
# zero damages
assert_almost_equal(integrate_truncated_risk_initial(f32a([0,0,0]), f32(0.), rps_r), 0)

# truncate all
assert_almost_equal(integrate_truncated_risk_initial(f32a([10,10,10]), f32(100.), rps_r), 0)

# truncate all
assert_almost_equal(integrate_truncated_risk_initial(f32a([10,10,10]), f32(1000.), rps_r), 0)

# truncate none
assert_almost_equal(integrate_truncated_risk_initial(f32a([0,10,10]), f32(0.), rps_r), 0.5)

# truncate up to one - no effect
assert_almost_equal(integrate_truncated_risk_initial(f32a([0,10,10]), f32(10.), rps_r), 0.5)

# truncate up to one - no effect
assert_almost_equal(integrate_truncated_risk_initial(f32a([5,10,10]), f32(10.), rps_r), 0.7)

# truncate up to second
assert_almost_equal(integrate_truncated_risk_initial(f32a([0,10,10]), f32(50.), rps_r), 0.1)

# interpolate and truncate
assert_almost_equal(integrate_truncated_risk_initial(f32a([0,10,10]), f32(30.), rps_r), 0.2)

# interpolate and truncate
assert_almost_equal(integrate_truncated_risk_initial(f32a([0,10,10]), f32(66.66666666666667), rps_r), 0.05)


In [None]:
%%timeit
integrate_truncated_risk_initial(damage[::1], T, rps_r)

In [None]:
@numba.njit
def integrate_truncated_risk_v2(risk_curve, T, RPs):
    """
    Integrate the risk curve above a protection threshold T.

    Parameters:
      risk_curve : 1D numpy array of risk values corresponding to each return period in RPs.
      T          : Protection threshold for the cell (e.g., 7, 10, 100, etc.)
      RPs        : 1D numpy array of discrete return periods.

    Returns:
      Integrated risk value after truncating the risk curve at T.
    """
    # TODO avoid branching? mask and zero out; insert interpolated value?

    # If the protection threshold is less than or equal to the smallest RP,
    # no truncation is applied.
    if T <= RPs[-1]:
        new_RPs = RPs
        protected_risk = risk_curve
    # If the protection threshold is above the highest RP,
    # assume full protection (i.e. no risk).
    elif T >= RPs[0]:
        return 0.0
    else:
        # Find the first index where the discrete RP is >= T.
        ridx = np.searchsorted(RPs[::-1], T, side='left')
        idx = (RPs.size - ridx)
        # print(T, "in", RPs, "at", idx)
        # If T exactly matches one of the RPs, use that risk value.
        if T == RPs[idx]:
            risk_T = risk_curve[idx]
        else:
            # Linearly interpolate between the two adjacent RPs.
            risk_T = risk_curve[idx-1] + (risk_curve[idx] - risk_curve[idx-1]) * ((T - RPs[idx-1]) / (RPs[idx] - RPs[idx-1]))
            # print("between", risk_curve[idx-1], risk_curve[idx], "gives", risk_T)
        # Construct a new risk curve starting at T.
        new_RPs = RPs[:idx]
        new_RPs = np.append(new_RPs, T)
        protected_risk = risk_curve[:idx]
        protected_risk = np.append(protected_risk, risk_T)

    # Calculate the annual exceedance probabilities for the new return periods.
    protected_aep = np.pow(new_RPs, -1)
    # print(protected_risk, new_RPs)

    # Compute the integrated risk using the trapezoidal rule.
    return np.trapezoid(protected_risk, x=protected_aep, dx=None)

In [None]:
# zero damages
assert_almost_equal(integrate_truncated_risk_v2(f32a([0,0,0]), f32(0), rps), 0)

# truncate all
assert_almost_equal(integrate_truncated_risk_v2(f32a([10,10,10]), f32(100), rps), 0)

# truncate all
assert_almost_equal(integrate_truncated_risk_v2(f32a([10,10,10]), f32(1000), rps), 0)

# truncate none
assert_almost_equal(integrate_truncated_risk_v2(f32a([10,10,0]), f32(0), rps), 0.5)

# truncate up to one - no effect
assert_almost_equal(integrate_truncated_risk_v2(f32a([10,10,0]), f32(10), rps), 0.5)

# truncate up to one - no effect
assert_almost_equal(integrate_truncated_risk_v2(f32a([10,10,5]), f32(10), rps), 0.7)

# truncate up to second
assert_almost_equal(integrate_truncated_risk_v2(f32a([10,10,0]), f32(50), rps), 0.1)

# # interpolate and truncate
assert_almost_equal(integrate_truncated_risk_v2(f32a([10,10,0]), f32(30), rps), 0.2)

# interpolate and truncate
assert_almost_equal(integrate_truncated_risk_v2(f32a([10,10,0]), f32(66.66666666666667), rps), 0.05)


In [None]:
%%timeit
integrate_truncated_risk_v2(damage, T, rps)

In [None]:
@numba.njit
def integrate_truncated_risk_v3(risk_curve, T, RPs, RPs_r, aep):
    """
    Integrate the risk curve above a protection threshold T.

    Parameters:
      risk_curve : 1D numpy array of risk values corresponding to each return period in RPs.
      T          : Protection threshold for the cell (e.g., 7, 10, 100, etc.)
      RPs        : 1D numpy array of discrete return periods.

    Returns:
      Integrated risk value after truncating the risk curve at T.
    """
    # If the protection threshold is above the highest RP,
    # assume full protection (i.e. no risk).
    if T >= RPs[0]:
        return 0.0
    # Or if the highest RP risk value is zero, assume no risk
    if risk_curve[0] <= 0:
        return 0.0

    # If the protection threshold is less than or equal to the smallest RP,
    # no truncation is applied.
    if T <= RPs[-1]:
        protected_risk = risk_curve
        protected_aep = aep
    else:
        # Find the first index where the discrete RP is >= T.
        ridx = np.searchsorted(RPs_r, T, side='left')
        idx = (RPs.size - ridx)
        # If T exactly matches one of the RPs, use that risk value.
        if T == RPs[idx]:
            risk_T = risk_curve[idx]
        else:
            # Linearly interpolate between the two adjacent RPs.
            risk_T = risk_curve[idx-1] + (risk_curve[idx] - risk_curve[idx-1]) * (T - RPs[idx-1]) / (RPs[idx] - RPs[idx-1])
        # Construct a new risk curve starting at T.
        protected_risk = risk_curve[:idx+1].copy()
        protected_risk[idx] = risk_T
        # Calculate the annual exceedance probabilities for the new return periods.
        protected_aep = aep[:idx+1].copy()
        protected_aep[idx] = 1 / T

    # Compute the integrated risk using the trapezoidal rule.
    return np.trapezoid(protected_risk, x=protected_aep, dx=None)


In [None]:
# zero damages
assert_almost_equal(integrate_truncated_risk_v3(f32a([0,0,0]), f32(0), rps, rps_r, aep), 0)

# truncate all
assert_almost_equal(integrate_truncated_risk_v3(f32a([10,10,10]), f32(100), rps, rps_r, aep), 0)

# truncate all
assert_almost_equal(integrate_truncated_risk_v3(f32a([10,10,10]), f32(1000), rps, rps_r, aep), 0)

# truncate none
assert_almost_equal(integrate_truncated_risk_v3(f32a([10,10,0]), f32(0), rps, rps_r, aep), 0.5)

# truncate up to one - no effect
assert_almost_equal(integrate_truncated_risk_v3(f32a([10,10,0]), f32(10), rps, rps_r, aep), 0.5)

# truncate up to one - no effect
assert_almost_equal(integrate_truncated_risk_v3(f32a([10,10,5]), f32(10), rps, rps_r, aep), 0.7)

# truncate up to second
assert_almost_equal(integrate_truncated_risk_v3(f32a([10,10,0]), f32(50), rps, rps_r, aep), 0.1)

# # interpolate and truncate
assert_almost_equal(integrate_truncated_risk_v3(f32a([10,10,0]), f32(30), rps, rps_r, aep), 0.2)

# interpolate and truncate
assert_almost_equal(integrate_truncated_risk_v3(f32a([10,10,0]), f32(66.66666666666667), rps, rps_r, aep), 0.05)


In [None]:
%%timeit
integrate_truncated_risk_v3(damage, T, rps, rps_r, aep)

In [None]:
from jrc_average_annual_risk_protected import integrate_truncated_risk

In [None]:
# zero damages
assert_almost_equal(integrate_truncated_risk(f32a([0,0,0]), f32(0), rps, rps_r, aep), 0)

# truncate all
assert_almost_equal(integrate_truncated_risk(f32a([10,10,10]), f32(100), rps, rps_r, aep), 0)

# truncate all
assert_almost_equal(integrate_truncated_risk(f32a([10,10,10]), f32(1000), rps, rps_r, aep), 0)

# truncate none
assert_almost_equal(integrate_truncated_risk(f32a([10,10,0]), f32(0), rps, rps_r, aep), 0.5)

# truncate up to one - no effect
assert_almost_equal(integrate_truncated_risk(f32a([10,10,0]), f32(10), rps, rps_r, aep), 0.5)

# truncate up to one - no effect
assert_almost_equal(integrate_truncated_risk(f32a([10,10,5]), f32(10), rps, rps_r, aep), 0.7)

# truncate up to second
assert_almost_equal(integrate_truncated_risk(f32a([10,10,0]), f32(50), rps, rps_r, aep), 0.1)

# # interpolate and truncate
assert_almost_equal(integrate_truncated_risk(f32a([10,10,0]), f32(30), rps, rps_r, aep), 0.2)

# interpolate and truncate
assert_almost_equal(integrate_truncated_risk(f32a([10,10,0]), f32(66.66666666666667), rps, rps_r, aep), 0.05)
