In [2]:
from pathlib import Path
import os
import pandas as pd
import geopandas as gpd
import numpy as np
from tqdm import tqdm
import random
import matplotlib.pyplot as plt
from sklearn.ensemble import RandomForestRegressor
import country_converter as coco
from scipy.interpolate import LinearNDInterpolator, NearestNDInterpolator
from matplotlib.colors import ListedColormap, BoundaryNorm
from sovereign.flood import build_basin_curves, BasinLossCurve, risk_data_future_shift, run_simulation, extract_sectoral_losses
from sovereign.macroeconomic import run_flood_sim_for_macro
from itertools import product

import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=FutureWarning)

### User Config and Inputs for ENTIRE run

In [3]:
root = Path.cwd().parent # find project root
risk_basin_path = os.path.join(root, 'outputs', 'flood', 'risk', 'basins', 'risk_basins.csv')
copula_path = os.path.join(root, 'outputs', 'flood', 'dependence', 'copulas')
risk_data = pd.read_csv(risk_basin_path)
future_rp_shifts = pd.read_csv(os.path.join(root, 'outputs', 'flood', 'future', 'basin_rp_shifts.csv'))
copula_random_numbers = pd.read_parquet(os.path.join(copula_path, "copula_random_numbers.gzip"))
macro_presim = pd.read_csv(os.path.join(root, 'outputs', 'macro', 'DIGNAD_presim_n1000_noadapt.csv'))
economic = pd.read_csv(os.path.join(root, 'inputs', 'credit_risk', 'economic.csv')) # df1
SP_data = pd.read_csv(os.path.join(root, 'inputs', 'credit_risk', 'T3.csv'))
PD_data = pd.read_csv(os.path.join(root, 'inputs', 'credit_risk', 'PD_ratings.csv'), header=None)
# Drop first "unnamed column"
risk_data = risk_data.iloc[:, 1:]
# Add AEP column
risk_data['AEP'] = 1 / risk_data['RP']
# Add a column converting current prorection level into AEP
risk_data['Pr_L_AEP'] = np.where(risk_data['Pr_L'] == 0, 0, 1 / risk_data['Pr_L']) # using numpy where avoids zero division errors
risk_data.reset_index(drop=True, inplace=True)
adaptation_aep = 0.01 # 100-year flood protection
n_years = 10000 # number of years to simulate
Thai_GDP = 496e9 # 2022 numbers in USD
# future_hydro = 'jules-w2'
# future_epoch = 2070
# future_scenario = 'ssp585'
# future_stat = 'q90'
# National GVA figures from DOSE
agr_GVA = 42880325598
man_GVA = 162659433017
ser_GVA = 316647741231
# Disaggregate output losses
TRADABLE_SHARES = {
    "Agriculture": 1.0,
    "Manufacturing": 0.7,
    "Service": 0.5,
}
EPOCHS = ['2030', '2040', '2050', '2060', '2070'] # what future epochs are we interested in?
SCENARIOS = ['ssp126', 'ssp370', 'ssp585'] # what climate scenarios are we intersted in?
STATS = ['q05', 'q50', 'q95', 'mean']

#### Prepare credit risk model

In [4]:
gdp_losses = SP_data["GDP_per_capita"] / 100

def polyfit_raw(x, y):
    coeffs = np.polyfit(x, y, 3)
    return coeffs[::-1]   # reverse to match β0 + β1x + β2x² + β3x³

# NGGD
NGGD = np.log(SP_data["NGGD"])
b_NGGD = polyfit_raw(gdp_losses, NGGD)

# GGB (filter < 0)
sub = SP_data[SP_data["GGB"] < 0]
GGB = np.log(-sub["GGB"])
b_GGB = polyfit_raw(sub["GDP_per_capita"]/100, GGB)

# NNED (filter > 0)
sub = SP_data[SP_data["NNED"] > 0]
NNED = np.log(sub["NNED"])
b_NNED = polyfit_raw(sub["GDP_per_capita"]/100, NNED)

# CAB
CAB = np.log(-SP_data["CAB"])
b_CAB = polyfit_raw(gdp_losses, CAB)

# =========================================================
# Helper: apply polynomial
# =========================================================

def equa(A, b0, b1, b2, b3):
    return b0 + b1*A + b2*A**2 + b3*A**3

# Real GDP growth (pct change)
economic["S_RealGDPgrowth"] = economic.groupby("CountryName")["S_GDPpercapitaUS"].pct_change()

economic = economic[
    [
        "CountryName","Year","scale20","S_GDPpercapitaUS",
        "S_RealGDPgrowth","S_NetGGdebtGDP","S_GGbalanceGDP",
        "S_NarrownetextdebtCARs","S_CurrentaccountbalanceGDP"
    ]
]

cc = coco.CountryConverter()
economic["ISO2"] = cc.convert(economic["CountryName"], to="ISO3")

# Build baseline data
Baseline = economic.copy()
Baseline["ln_S_GDPpercapitaUS"] = np.log(Baseline["S_GDPpercapitaUS"])

Baseline = Baseline[Baseline["Year"] > 2014]
Baseline = Baseline.dropna()

rating = PD_data.iloc[:, 0]
default = PD_data.iloc[:, 1]
b_PD = polyfit_raw(rating, default)

def implement_PD_equation(r):
    PD = equa(r, *b_PD)
    return np.clip(PD, 0, 100)

def calculate_spreads(C_0, notches):
    C_0 = np.asarray(C_0)
    notches = np.asarray(notches)
    return (-282.51 * notches) + (14.23 * notches * C_0)

def estimate_country_scenario(country_, loss_):

    # extract baseline for 2020
    gdp_per_capita = Baseline[(Baseline["CountryName"] == country_) & (Baseline["Year"] == 2020)].iloc[0]

    gdp_pc_value = gdp_per_capita["S_GDPpercapitaUS"]

    estimation = pd.DataFrame({
        "CountryName": [country_],
        "loss": [loss_]
    })

    estimation["ISO2"] = cc.convert(estimation["CountryName"], to="ISO3")
    estimation["ln_S_GDPpercapitaUS"] = np.log(gdp_pc_value * (1 - loss_))
    estimation["S_RealGDPgrowth"] = -loss_

    # baseline values
    for col in ["S_NetGGdebtGDP","S_GGbalanceGDP","S_NarrownetextdebtCARs","S_CurrentaccountbalanceGDP"]:
        estimation[col] = gdp_per_capita[col]

    A = -loss_

    # apply fitted polynomial adjustments
    estimation["S_NetGGdebtGDP"] += np.exp(equa(A, *b_NGGD))
    estimation["S_GGbalanceGDP"] += -np.exp(equa(A, *b_GGB))
    estimation["S_NarrownetextdebtCARs"] += np.exp(equa(A, *b_NNED))
    estimation["S_CurrentaccountbalanceGDP"] += -np.exp(equa(A, *b_CAB))

    return estimation

features = [
    "ln_S_GDPpercapitaUS",
    "S_RealGDPgrowth",
    "S_NetGGdebtGDP",
    "S_GGbalanceGDP",
    "S_NarrownetextdebtCARs",
    "S_CurrentaccountbalanceGDP"
]

X_train = Baseline[features]
y_train = Baseline["scale20"]

rf = RandomForestRegressor(
    n_estimators=2000,
    random_state=77,
    oob_score=True
)
rf.fit(X_train, y_train)

Congo D.R. not found in regex
Congo D.R. not found in regex
Congo D.R. not found in regex
Congo D.R. not found in regex
Congo D.R. not found in regex
Congo D.R. not found in regex
Congo D.R. not found in regex
Congo D.R. not found in regex
Congo D.R. not found in regex
Congo D.R. not found in regex
Congo D.R. not found in regex
Congo D.R. not found in regex
Congo D.R. not found in regex
Congo D.R. not found in regex
Congo D.R. not found in regex
Congo D.R. not found in regex
Congo D.R. not found in regex
Congo D.R. not found in regex
Congo D.R. not found in regex
Congo D.R. not found in regex
Congo D.R. not found in regex
Ras Al KhaImah not found in regex
Ras Al KhaImah not found in regex
Ras Al KhaImah not found in regex
Ras Al KhaImah not found in regex
Ras Al KhaImah not found in regex
Ras Al KhaImah not found in regex
Ras Al KhaImah not found in regex
Ras Al KhaImah not found in regex
Ras Al KhaImah not found in regex
Ras Al KhaImah not found in regex
Ras Al KhaImah not found in re

#### Prepare basin curves

In [8]:
# Parameters to loop over
future_hydro = 'jules-w2'
SCENARIOS = ["ssp126", "ssp370", "ssp585"]
EPOCHS    = [2030, 2040, 2050, 2060, 2070]   # or whatever you use
STATS     = ["q05", "q50", "q95", "mean"]
# Adjust baseline risk to future risk 
future_risk = {}  # (scenario, epoch, stat) -> adjusted risk_data
for scenario, epoch, stat in product(SCENARIOS, EPOCHS, STATS):
    future_risk[(scenario, epoch, stat)] = risk_data_future_shift(
        risk_data,
        future_rp_shifts,
        future_hydro,
        scenario,
        epoch,
        stat,
        degrade_protection=True
    )
# Build baseline curves
baseline_curves: dict[int, BasinLossCurve] = build_basin_curves(risk_data)
future_curves = {}
# Build future curves
for key, future_risk_data in future_risk.items():
    future_curves[key] = build_basin_curves(future_risk_data)

#### Run full flood simulation

In [14]:
# DIGNAD INTERPOLATOR
# Create interpolators
X = macro_presim[['dY_T', 'dY_N', 'dK_priv', 'dK_pub']].values
y = macro_presim['gdp_avg'].values

linear_interp = LinearNDInterpolator(X, y)
nearest_interp = NearestNDInterpolator(X, y)

# Interpolation function for monte carlo analysis
def interpolate_gdp(params):
    gdp = linear_interp(params)
    if np.isnan(gdp): # if outside convex hull
        gdp = nearest_interp(params)
    if np.all(params == 0): # no disaster
        gdp = 0.0
    return gdp

In [15]:
# Functions
def flood_aggregates_one_year(basin_curves, random_ns_row):
    """
    Returns annual monetary losses aggregated across basins for the 5 sectors needed downstream.
    """
    ag = man = serv = priv = pub = 0.0

    for basin_id, curve in basin_curves.items():
        basin_str = str(int(basin_id))
        if basin_str not in random_ns_row:
            continue

        aep_event = 1 - random_ns_row[basin_str]

        ag   += curve.loss_at_event_aep(aep_event, scenario="baseline", sector="Agriculture")
        man  += curve.loss_at_event_aep(aep_event, scenario="baseline", sector="Manufacturing")
        serv += curve.loss_at_event_aep(aep_event, scenario="baseline", sector="Service")
        priv += curve.loss_at_event_aep(aep_event, scenario="baseline", sector="Private")
        pub  += curve.loss_at_event_aep(aep_event, scenario="baseline", sector="Public")

    return ag, man, serv, priv, pub

def run_integrated_yearwise(
    country_: str,
    baseline_curves: dict,
    future_curves: dict,              # (scenario, epoch, stat) -> basin_curves
    copula_random_ns: pd.DataFrame,
    n_years: int,
    # mapping constants
    agr_GVA: float,
    man_GVA: float,
    ser_GVA: float,
    tradable_shares: dict,
    thai_gdp: float,
):
    # Combine curve sets: baseline + futures
    curve_sets = {"baseline": baseline_curves, **future_curves}

    # Precompute denominators for macro mapping
    tradable_output_baseline = (
        agr_GVA * tradable_shares["Agriculture"] +
        man_GVA * tradable_shares["Manufacturing"] +
        ser_GVA * tradable_shares["Service"]
    )
    nontrad_output_baseline = (
        agr_GVA * (1 - tradable_shares["Agriculture"]) +
        man_GVA * (1 - tradable_shares["Manufacturing"]) +
        ser_GVA * (1 - tradable_shares["Service"])
    )

    # Pull baseline rating once (like your function does)
    original_rating = float(
        Baseline.loc[
            (Baseline["CountryName"] == country_) &
            (Baseline["Year"] == 2020),
            "scale20"
        ].iloc[0]
    )
    original_pd = implement_PD_equation(original_rating)

    rows = []

    for t in tqdm(range(n_years), desc="Simulating years"):
        random_ns_row = copula_random_ns.loc[t]

        for key, basin_curves in curve_sets.items():
            # --- 1) FLOOD ---
            ag, man, serv, priv, pub = flood_aggregates_one_year(basin_curves, random_ns_row)

            # --- 2) MAP TO MACRO PARAMS (what interpolate_gdp expects) ---
            # shocks as *fractions* (matching your DIGNAD-ish mapping)
            trad_out = (
                ag  * tradable_shares["Agriculture"] +
                man * tradable_shares["Manufacturing"] +
                serv* tradable_shares["Service"]
            )
            nontrad_out = (
                ag  * (1 - tradable_shares["Agriculture"]) +
                man * (1 - tradable_shares["Manufacturing"]) +
                serv* (1 - tradable_shares["Service"])
            )

            dY_T   = trad_out / tradable_output_baseline
            dY_N   = nontrad_out / nontrad_output_baseline
            dK_priv= priv / thai_gdp
            dK_pub = pub  / thai_gdp

            # Build params vector in the order your interpolator expects.
            # You’ll need to align this with how linear_interp() was trained.
            params = np.array([dY_T, dY_N, dK_priv, dK_pub], dtype=float)

            # --- 3) MACRO (GDP loss) ---
            gdp_loss = float(interpolate_gdp(params))

            # --- 4) CREDIT ---
            # Your estimate_country_scenario expects a "loss_" scalar. Here we pass gdp_loss.
            estimation = estimate_country_scenario(country_, (gdp_loss * -1 / 100))
            predicted_rating = float(rf.predict(estimation[features])[0])
            predicted_pd = float(implement_PD_equation(predicted_rating))
            spread_delta = float(calculate_spreads(original_rating, (predicted_rating - original_rating)))

            # --- store ---
            if key == "baseline":
                scenario = "baseline"
                epoch = stat = None
            else:
                scenario, epoch, stat = key  # key is (ssp, epoch, stat)

            rows.append({
                "year_index": t,
                "scenario": scenario,
                "epoch": epoch,
                "stat": stat,
                # flood outputs (optional, but useful)
                "AGR_loss": ag,
                "MAN_loss": man,
                "SER_loss": serv,
                "PRI_dam": priv,
                "PUB_dam": pub,
                # macro shocks
                "dY_T": dY_T,
                "dY_N": dY_N,
                "dK_priv": dK_priv,
                "dK_pub": dK_pub,
                "gdp_loss": gdp_loss,
                # credit outputs
                "original_rating": original_rating,
                "predicted_rating": predicted_rating,
                "original_pd": original_pd,
                "predicted_pd": predicted_pd,
                "spread_delta": spread_delta,
            })

    return pd.DataFrame(rows)

In [21]:
full_sim = run_integrated_yearwise(
    country_="Thailand",
    baseline_curves=baseline_curves,
    future_curves=future_curves,
    copula_random_ns=copula_random_numbers,
    n_years=100,
    agr_GVA=agr_GVA,
    man_GVA=man_GVA,
    ser_GVA=ser_GVA,
    tradable_shares=TRADABLE_SHARES,
    thai_gdp=Thai_GDP
)

Simulating years: 100%|██████████████████████████████████████████████████████████████| 100/100 [15:27<00:00,  9.27s/it]


In [23]:
full_sim.to_csv('test_file.csv')

In [39]:
full_sim.to_csv('full_sim_n1000.csv')

In [28]:
gdp_loss = -1.0
estimation = estimate_country_scenario('Thailand',gdp_loss)
predicted_rating = float(rf.predict(estimation[features])[0])
print(predicted_rating)

13.396
