In [51]:
from pathlib import Path

import geopandas
import numpy
import pandas
import snail.damages
import snail.intersection
import snail.io
import snkit

from pyproj import Geod
from scipy.integrate import simpson
from tqdm.auto import tqdm

In [3]:
# Locate the project's data directories. The notebook is assumed to be run
# from the project's scripts directory, so the project root is the parent of
# the current working directory.
project_path = Path().resolve().parent
incoming_data_path = project_path.joinpath("incoming_data")
processed_data_path = project_path.joinpath("processed_data")

In [38]:
# Collect the JRC flood GeoTIFFs for ZAF and derive a short column key for
# each one from its file name: strip the "floodMapGL_" and "y__ZAF" parts of
# the stem and wrap the remainder as "jrc_flood_<...>_depth_m".
jrc_flood_dir = incoming_data_path / "starter-data-kit" / "data" / "ZAF" / "jrc_floods"
hazard_paths = list(jrc_flood_dir.glob("*.tif"))
hazard_keys = [
    "jrc_flood_"
    + raster_path.stem.replace("floodMapGL_", "").replace("y__ZAF", "")
    + "_depth_m"
    for raster_path in hazard_paths
]
hazard_files = pandas.DataFrame({"path": hazard_paths, "key": hazard_keys})

# Extend the table with raster metadata; the call also returns the grids it
# found. Everything downstream uses a single grid, so insist on exactly one.
hazard_files, grids = snail.io.extend_rasters_metadata(hazard_files)
assert len(grids) == 1
grid = grids[0]
hazard_files

Unnamed: 0,path,key,grid_id,bands
0,/home/mert2014/projects/snail-datapkg-demos/za...,jrc_flood_rp20_depth_m,0,"(1,)"
1,/home/mert2014/projects/snail-datapkg-demos/za...,jrc_flood_rp200_depth_m,0,"(1,)"
2,/home/mert2014/projects/snail-datapkg-demos/za...,jrc_flood_rp50_depth_m,0,"(1,)"
3,/home/mert2014/projects/snail-datapkg-demos/za...,jrc_flood_rp500_depth_m,0,"(1,)"
4,/home/mert2014/projects/snail-datapkg-demos/za...,jrc_flood_rp100_depth_m,0,"(1,)"
5,/home/mert2014/projects/snail-datapkg-demos/za...,jrc_flood_rp10_depth_m,0,"(1,)"


In [14]:
# Read the power network from the processed GeoPackage, taking nodes and
# edges from their respective layers.
power_network_path = processed_data_path.joinpath(
    "networks", "power", "power_network.2025-05-01.gpkg"
)
network = snkit.network.read_file(
    power_network_path,
    nodes_layer="power_nodes",
    edges_layer="power_edges",
)

In [17]:
# Prepare the network edges and split them against the hazard raster grid.
prepared = snail.intersection.prepare_linestrings(network.edges)
edge_splits = snail.intersection.split_linestrings(prepared, grid)

# Attach the grid cell indices of each split as columns "i_0" / "j_0".
# NOTE (original author): "push into split_linestrings" - presumably a TODO
# to fold this indexing step into split_linestrings itself.
edge_splits_indexed = snail.intersection.apply_indices(
    edge_splits, grid, index_i="i_0", index_j="j_0"
)

# Associate the hazard rasters with the splits (one column per hazard key).
flood_intersections = snail.io.associate_raster_files(
    edge_splits_indexed, hazard_files
)

# Geodesic length of every split geometry on the WGS84 ellipsoid, stored in
# the "length_m" column.
geod = Geod(ellps="WGS84")
split_lengths = flood_intersections.geometry.apply(geod.geometry_length)
flood_intersections["length_m"] = split_lengths

In [39]:
# Intersect the generation nodes (points, not lines) with the hazard grid.
generation_nodes = network.nodes.query("asset_type == 'generation'")
node_splits = snail.intersection.split_points(generation_nodes, grid)

# Attach the grid cell indices of each node as columns "i_0" / "j_0".
node_splits_indexed = snail.intersection.apply_indices(
    node_splits, grid, index_i="i_0", index_j="j_0"
)

# Associate the hazard rasters with the nodes (one column per hazard key).
node_flood_intersections = snail.io.associate_raster_files(
    node_splits_indexed, hazard_files
)

# Same geodesic length calculation as used for the edges. These geometries
# are points, so "length_m" comes out as 0.0 (see the displayed output); the
# column is kept so the node table has the same columns as the edge table.
geod = Geod(ellps="WGS84")
node_lengths = node_flood_intersections.geometry.apply(geod.geometry_length)
node_flood_intersections["length_m"] = node_lengths

In [40]:
# Bound every JRC flood depth column to the range [0, 25]: negative depths
# become zero and anything above 25 is capped at 25.
flood_depth_cols = [
    name for name in node_flood_intersections.columns if "jrc_flood" in name
]
for colname in flood_depth_cols:
    bounded_depth = node_flood_intersections[colname].clip(lower=0, upper=25)
    node_flood_intersections[colname] = bounded_depth

In [41]:
# Load one depth-damage curve per plant size class from the same CSV. Each
# curve is keyed by the first token of its damage column name
# ("small", "medium", "large").
curves_csv = incoming_data_path / "nirandjan-2024-vulnerabilty/power_curves.csv"
damage_curves = {
    curve_col.split("_")[0]: snail.damages.PiecewiseLinearDamageCurve.from_csv(
        curves_csv,
        intensity_col="depth_m",
        damage_col=curve_col,
    )
    for curve_col in (
        "small_plant_lt_100mw_damage",
        "medium_plant_lt_500mw_damage",
        "large_plant_gt_500mw_damage",
    )
}
damage_curves.keys()

dict_keys(['small', 'medium', 'large'])

In [43]:
def classify_generation(capacity):
    """Return the damage-curve size class for a plant of the given capacity.

    The capacity is compared against strict lower bounds: above 500 is
    "large", above 100 is "medium", and everything else is "small". Values
    that fail both comparisons (including exactly 100 and NaN) are "small".
    """
    # Checked from the largest threshold down; the first strict match wins.
    size_thresholds = ((500, "large"), (100, "medium"))
    for lower_bound, size_class in size_thresholds:
        if capacity > lower_bound:
            return size_class
    return "small"
# Tag each generation node with the size class that selects its damage curve,
# then preview the first row.
size_class_by_node = node_flood_intersections["capacity_mw"].apply(classify_generation)
node_flood_intersections["damage_curve_key"] = size_class_by_node
node_flood_intersections.head(1)

Unnamed: 0,generation_type,name,capacity_mw,asset_type,population,id,geometry,i_0,j_0,jrc_flood_rp20_depth_m,jrc_flood_rp200_depth_m,jrc_flood_rp50_depth_m,jrc_flood_rp500_depth_m,jrc_flood_rp100_depth_m,jrc_flood_rp10_depth_m,length_m,damage_curve_key
0,hydropower,Drakensberg hydroelectric plant,1000.0,generation,0.0,node_0,POINT (29.0834 -28.566),1515,773,0.0,0.0,0.0,0.0,0.0,0.0,0.0,large


In [46]:
# Compute a damage fraction for every flood depth column, using the damage
# curve that matches each plant's size class.

# The depth columns are the same for every group, so list them once.
depth_cols = [
    colname for colname in node_flood_intersections.columns if "depth_m" in colname
]

dfs = []
for damage_curve_key, group in node_flood_intersections.groupby("damage_curve_key"):
    damage_curve = damage_curves[damage_curve_key]

    # Work on an explicit copy: adding columns to the object handed out by
    # groupby is the classic SettingWithCopyWarning pattern, and makes it
    # unclear whether the parent frame is affected.
    group = group.copy()

    for colname in depth_cols:
        damage_col = colname.replace("depth_m", "damage_fraction")
        group[damage_col] = damage_curve.damage_fraction(group[colname])
    dfs.append(group)

# Keep only the plants with a positive depth in the rp500 map. The trailing
# .copy() gives an independent frame, so that later cells can add columns to
# damaged_generation without chained-assignment warnings.
damaged_generation = (
    pandas.concat(dfs).query("jrc_flood_rp500_depth_m > 0").copy()
)
damaged_generation

Unnamed: 0,generation_type,name,capacity_mw,asset_type,population,id,geometry,i_0,j_0,jrc_flood_rp20_depth_m,...,jrc_flood_rp100_depth_m,jrc_flood_rp10_depth_m,length_m,damage_curve_key,jrc_flood_rp20_damage_fraction,jrc_flood_rp200_damage_fraction,jrc_flood_rp50_damage_fraction,jrc_flood_rp500_damage_fraction,jrc_flood_rp100_damage_fraction,jrc_flood_rp10_damage_fraction
1,hydropower,Gariep hydroelectric plant,360.0,generation,0.0,node_1,POINT (25.5067 -30.6233),1085,1020,10.89,...,12.56,10.05,0.0,medium,0.3,0.3,0.3,0.3,0.3,0.3
5,hydropower,Vanderkloof hydroelectric plant,240.0,generation,0.0,node_5,POINT (24.7317 -29.9911),992,944,14.0,...,15.73,13.1,0.0,medium,0.3,0.3,0.3,0.3,0.3,0.3
174,solar,Lekwa-Teemane Local Municipality Dr Ruth Segom...,1.2,generation,0.0,node_174,POINT (25.6047 -27.6638),1097,665,0.0,...,0.0,0.0,0.0,small,0.0,0.0096,0.0,0.0322,0.0,0.0
197,solar,Sol Plaatje Local Municipality Frances Baard D...,1.1,generation,0.0,node_197,POINT (24.4768 -28.9918),962,825,1.3,...,1.68,1.11,0.0,small,0.107,0.1504,0.1246,0.1648,0.1374,0.0908


In [48]:
# Highly approximate replacement cost assumptions per unit power generation,
# drawn from Nirandjan table D3 cost IDs 28-30, converted to USD and rounded.
# NOTE(review): the value is attached as a flat figure per plant, looked up by
# size class only - it is not scaled by capacity_mw. Confirm this is intended.
cost_assumptions = dict(
    small=200_000_000,
    medium=1_000_000_000,
    large=1_000_000_000,
)
replacement_cost = damaged_generation["damage_curve_key"].map(cost_assumptions)
damaged_generation["cost_usd"] = replacement_cost

In [50]:
# Turn every damage fraction column into a damage cost column by multiplying
# it with the plant's assumed replacement cost.
fraction_cols = [
    name for name in damaged_generation.columns if "damage_fraction" in name
]
for fraction_col in fraction_cols:
    cost_col = fraction_col.replace("damage_fraction", "damage_cost")
    damaged_generation[cost_col] = damaged_generation[fraction_col].mul(
        damaged_generation["cost_usd"]
    )
damaged_generation

Unnamed: 0,generation_type,name,capacity_mw,asset_type,population,id,geometry,i_0,j_0,jrc_flood_rp20_depth_m,...,jrc_flood_rp500_damage_fraction,jrc_flood_rp100_damage_fraction,jrc_flood_rp10_damage_fraction,cost_usd,jrc_flood_rp20_damage_cost,jrc_flood_rp200_damage_cost,jrc_flood_rp50_damage_cost,jrc_flood_rp500_damage_cost,jrc_flood_rp100_damage_cost,jrc_flood_rp10_damage_cost
1,hydropower,Gariep hydroelectric plant,360.0,generation,0.0,node_1,POINT (25.5067 -30.6233),1085,1020,10.89,...,0.3,0.3,0.3,1000000000,300000000.0,300000000.0,300000000.0,300000000.0,300000000.0,300000000.0
5,hydropower,Vanderkloof hydroelectric plant,240.0,generation,0.0,node_5,POINT (24.7317 -29.9911),992,944,14.0,...,0.3,0.3,0.3,1000000000,300000000.0,300000000.0,300000000.0,300000000.0,300000000.0,300000000.0
174,solar,Lekwa-Teemane Local Municipality Dr Ruth Segom...,1.2,generation,0.0,node_174,POINT (25.6047 -27.6638),1097,665,0.0,...,0.0322,0.0,0.0,200000000,0.0,1920000.0,0.0,6440000.0,0.0,0.0
197,solar,Sol Plaatje Local Municipality Frances Baard D...,1.1,generation,0.0,node_197,POINT (24.4768 -28.9918),962,825,1.3,...,0.1648,0.1374,0.0908,200000000,21400000.0,30080000.0,24920000.0,32960000.0,27480000.0,18160000.0


In [60]:
# Names of the per-return-period damage cost columns
rp_cols = list(filter(lambda name: "damage_cost" in name, damaged_generation.columns))
def get_rp(col):
    """Extract the return period from a damage cost column name as an int.

    Removes the "jrc_flood_rp" and "_damage_cost" parts of the name and
    converts whatever is left, e.g. "jrc_flood_rp100_damage_cost" -> 100.
    Raises ValueError if the remainder is not an integer literal.
    """
    rp_text = col.replace("jrc_flood_rp", "")
    rp_text = rp_text.replace("_damage_cost", "")
    return int(rp_text)

# Order the damage cost columns by increasing annual exceedance probability
# (1 / return period), i.e. from the rarest event to the most frequent, so
# that the integration variable below is monotonically increasing.
rp_cols = sorted(rp_cols, key=lambda col: 1 / get_rp(col))
rps = numpy.array([get_rp(col) for col in rp_cols])
probabilities = 1 / rps
rp_damages = damaged_generation[rp_cols]

# Expected annual damage: integrate damage cost over exceedance probability
# with Simpson's rule, giving one value per row (axis=1).
ead = simpson(rp_damages, x=probabilities, axis=1)
damaged_generation["jrc_flood_ead"] = ead

# write to CSV
# to_csv does not create missing directories, so make sure the results folder
# exists first (it may be absent on a fresh checkout).
results_path = project_path / "results"
results_path.mkdir(parents=True, exist_ok=True)
damaged_generation[
    ["generation_type","name","capacity_mw","asset_type","population","id","geometry"]
    + rp_cols
    + ["jrc_flood_ead"]
].to_csv(results_path / "generation_nodes_with_damages.csv", index=False)