# Use Case 4 – Adaptation to societal risk of geohazards through schools and hospitals in Greece

In [2]:
# HIDE CODE
# Import necessary libraries
import os
import sys
import warnings
import inspect
import pandas as pd
import numpy as np
import geopandas as gpd
import rasterio
from shapely.geometry import Point
from scipy.stats import lognorm
from rasterstats import point_query
from rasterio.warp import calculate_default_transform, reproject, Resampling
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
from matplotlib import rcParams
from matplotlib.lines import Line2D
from matplotlib import font_manager
from matplotlib.offsetbox import OffsetImage, AnnotationBbox
from rasterio.features import shapes
from matplotlib.colors import ListedColormap
import gc 
from pathlib import Path
from shapely import wkt

# Ignore all warnings
warnings.filterwarnings("ignore")


## General Comments

- This notebook follows the MIRACA Use Case template structure to ensure consistency across the MIRACA Python Book.
- The workflow is demonstrated for a subset of assets (Schools or Hospitals) within Central Macedonia (Greece).
- Output files are saved in a structured `Results/` directory to facilitate reproducibility and reuse.

## Introduction

- This Use Case assesses the societal risk of schools and hospitals in Central Macedonia (Greece), considering earthquake and earthquake-triggered landslide hazards.

- The analysis combines hazard, exposure, and vulnerability information to estimate potential physical damage, human losses, and economic impacts for different return periods.

- Relevant stakeholders include: The Civil Protection authorities of the Region of Central Macedonia and the Municipality of Thessaloniki, Greece.

- The results support climate adaptation planning and risk-informed decision-making for critical infrastructures such as educational and healthcare facilities.

## Directory structure

This notebook assumes the following directory structure:

- `data/`  
  Input datasets (exposure, hazard, fragility/vulnerability tables, administrative boundaries).
- `Results/`  
  Output folder automatically created by the notebook.
  - `Results/<asset>/Single-Hazard/`  
    Outputs for earthquake-only calculations.
  - `Results/<asset>/Multi-Hazard/`  
    Outputs for earthquake + landslide calculations.

> Note: `<asset>` is either `Schools` or `Hospitals` depending on the selection below.


Select whether you want to compute the risk assessment for educational buildings (asset = "Schools") or healthcare facilities (asset = "Hospitals").


In [14]:
asset = "Schools"

Define the folder where the data is available and where the results will be saved.

In [15]:

data_path = Path(os.getcwd()).parent / 'data'

savepath = Path(os.getcwd()).parent / 'Results' / asset


The following color variables are defined for the Miraca theme. These colors will be used for visualizations. The font settings for the plots are also adjusted for consistency.

In [16]:
# miraca colors
c1 = '#4069F6' # primary blue 500 
c2 = '#171E37' # black
c3 = '#64F4C0' # accent green 
c4 = '#FFFFFF' # white
c5 = '#ED5861' # red
c6 = '#F8CD48' # yellow
c7 = '#72DA95' # green 500
c8 = '#373D52' # grey 900
c9 = '#8F94A3' # grey 500
c10 = '#EBEDF5' # grey 100
c11 = '#72DA95' # green
c12 = '#373D52' # blue 900
c13 = '#6687F8' # blue 400
c14 = '#EBEDF5' # blue 100
c15 = '#373D52' # 429787 900
c16 = '#9CF8D7' # green 400
c17 = '#E0FDF2' # green 100

# Adjust font settings
mpl.rc('font', family='Calibri')
font = {'family': 'Calibri', 'weight': 'bold'}
rcParams['mathtext.default'] = 'regular'
rcParams['mathtext.rm'] = font['family']

## Hazard characterisation

Data description – Earthquakes

- Seismic hazard information is derived from the European Seismic Hazard Model 2020 (ESHM20), which provides harmonised probabilistic earthquake hazard estimates for the Euro-Mediterranean region. Peak Ground Acceleration (PGA) and spectral accelerations Sa(T) at periods T = 0.3 s, 0.6 s, 1.0 s and 1.5 s are considered for multiple return periods (73, 102, 475, 1000, 2500 and 5000 years). Hazard values are extracted for the Region of Central Macedonia, Greece.

- Ground-surface intensity measures are estimated using the OpenQuake Engine through probabilistic seismic hazard analysis (PSHA), accounting for local site effects. Site amplification is incorporated using the ESRM20 Exposure-to-Site model.

- Single hazard / Multi-hazards considered 
  - Single hazard: earthquake ground shaking (PGA and spectral accelerations).  
  - Multi-hazard: earthquake-triggered landslides, combined with seismic hazard in subsequent sections of the analysis.

In [6]:
""" This dictionary initializes a DataFrame containing seismic return period data. 
The 'No' column represents a sequential identifier for each seismic scenario. 
The 'Return_Period' column specifies the return period in years, 
and the '1/years' column provides the corresponding annual probability of occurrence (i.e., the inverse of the return period). 
The resulting DataFrame, Scenaria, captures these seismic scenario details. """


data = {'No': [0,1,2,3,4,5],
        'Return_Period': [73,102,475,1000,2500,5000],
        '1/years': [0.0137,0.0098,0.0021,0.001,0.0003,0.0001]
}

Scenaria = pd.DataFrame(data)

Load & visualise hazard data

In [None]:
# Load hazard data
csv2_path = data_path / "Hazard_Central_Macedonia.csv"

# Load the CSV file into pandas DataFrames
df_hazard = pd.read_csv(csv2_path)

# Column names for longitude and latitude
longitude_col2 = 'lon'
latitude_col2 = 'lat'

# Convert the DataFrame to GeoDataFrames
gdf_hazard = gpd.GeoDataFrame(df_hazard, geometry=gpd.points_from_xy(df_hazard[longitude_col2], df_hazard[latitude_col2]))

# Set the CRS 
gdf_hazard.set_crs(epsg=4326, inplace=True)

gdf_hazard.explore(
    column="PGA-0.001",
    cmap="OrRd",
    legend=True,
    legend_kwds={
        'caption': 'PGA (T=1000 years)',
        'orientation': 'horizontal'  # optional: 'horizontal' or 'vertical'
    },
    marker_kwds={
        'radius': 4,
        'fill': True
    }
)

## Exposure Assessment

Exposure data correspond to educational and healthcare facilities located in the Region of Central Macedonia, Greece.

- Study area: Region of Central Macedonia, Greece.
- Assets considered: Schools and major healthcare facilities (hospitals).
- Educational facilities achieve high spatial coverage across the study area.
- Healthcare facilities focus on the main hospitals located within the Metropolitan area of Thessaloniki.
- Assets are selected based on the availability of reliable exposure information to ensure robust risk assessment results.

Load & visualise exposure data

In [None]:
# Load exposure data
if asset == "Schools":
    csv1_path = data_path / "Exposure_Riskschools.csv"
elif asset == "Hospitals":
    csv1_path = data_path / "Exposure_Hospitals.csv"
else:
    print("Invalid asset type. Please choose either 'Schools' or 'Hospitals'.")
    sys.exit(1)


# Load the CSV file into pandas DataFrames
df_exposure= pd.read_csv(csv1_path)

# Column names for longitude and latitude
longitude_col1 = 'Longitude'
latitude_col1 = 'Latitude'

# Convert the DataFrame to GeoDataFrames
gdf_exposure = gpd.GeoDataFrame(df_exposure, geometry=gpd.points_from_xy(df_exposure[longitude_col1], df_exposure[latitude_col1]))

# Set the CRS  
gdf_exposure.set_crs(epsg=4326, inplace=True)

gdf_exposure.explore(
    marker_kwds={
        "radius": 4,  # Scale marker size (adjust as needed)
        "fill": True
    },
    color=c1,                         # Color map: Orange-Red (you can change it)
    legend=True                          # Show legend
)


The following csv file contains the spatially joined data of exposure and hazard points, with the nearest hazard point coordinates added for each exposure point.

In [20]:
# Perform the spatial join using the nearest neighbor method
gdf_exposure_w_hazard = gpd.sjoin_nearest(gdf_exposure, gdf_hazard, how="left", distance_col="distance")

# Add nearest x and y coordinates from gdf2

gdf_exposure_w_hazard['nearest_x'] = gdf_exposure_w_hazard[longitude_col2]  # Nearest longitude from gdf2
gdf_exposure_w_hazard['nearest_y'] = gdf_exposure_w_hazard[latitude_col2]   # Nearest latitude from gdf2

# Ensure the directory exists before saving
savepath.mkdir(parents=True, exist_ok=True)

# Save the joined GeoDataFrame to CSV
gdf_exposure_w_hazard.to_csv(savepath / f"Joined_nearest_hazard_{asset}.csv", index=False)


gdf_exposure_w_hazard.explore(
    column="PGA-0.001",
    cmap="OrRd",
    legend=True,
    legend_kwds={
        'caption': 'PGA (T=1000 years)',
        'orientation': 'horizontal'  # optional: 'horizontal' or 'vertical'
    },
    marker_kwds={
        'radius': 4,
        'fill': True
    },
    tooltip="PGA-0.001",
)

## Landslide hazard

Landslide hazard is represented using the European Landslide Susceptibility Map (ELSUS v2) for the Region of Central Macedonia, Greece.

- Landslide susceptibility information is derived from the ELSUS v2 dataset.
- Raster susceptibility classes are preprocessed and converted into polygon format.
- A spatial nearest-neighbour join is performed to associate each exposure point with the corresponding landslide susceptibility class.
- The susceptibility index is later used for the estimation of earthquake-triggered landslide impacts within the multi-hazard assessment.

In [21]:
# Load the CSV
csv3_path = data_path / f"Joined_nearest_susceptibility_{asset}.csv"
# there is only data for Schools. Might be interesting to indeed add both datasets.

susc = pd.read_csv(csv3_path)

# Convert WKT string to geometry (assuming the column is named 'geometry')
susc['geometry'] = susc['geometry'].apply(wkt.loads)

# Convert to GeoDataFrame
susc_gdf = gpd.GeoDataFrame(susc, geometry='geometry')

# Set the correct CRS if known (replace EPSG:4326 if different)
susc_gdf.set_crs("EPSG:4326", inplace=True)

Unnamed: 0,Longitude,Latitude,id,Name,Counsil,Material,Lateral Loading system,Code level,Floors,Lateral Coefficiency,Intensity,Typology,Area,geometry,index_right,raster_value,distance_to_raster
0,22.538980,40.511320,72,3ο Νηπιαγωγείο Αιγινίου,ΠΥΔΝΑΣ-ΚΟΛΙΝΔΡΟΥ,Precast,LFINF: infilled frame,CDH,1,,SA(1.5),Precast-CDH,107.60,POINT (22.53898 40.51132),4044155,1.0,0.0
1,22.543110,40.498490,73,1ο Δημοτικό Αιγινίου,ΠΥΔΝΑΣ-ΚΟΛΙΝΔΡΟΥ,CR: reinforced concrete,LFINF: infilled frame,CDH,1,10.0,PGA,CR_LFINF-CDH-10_H1,1008.00,POINT (22.54311 40.49849),4041080,2.0,0.0
2,22.911365,40.734834,82,1ο Νηπιαγωγείο Ωραιοκάστρου,ΩΡΑΙΟΚΑΣΤΡΟΥ,MUR: unreinforced masonry,LWAL: load bearing wall,CDN,1,,PGA,MUR_LWAL-DNO_H1,450.00,POINT (22.91136 40.73483),3972329,3.0,0.0
3,22.919226,40.723864,83,2ο Γυμνάσιο Ωραιοκάστρου,ΩΡΑΙΟΚΑΣΤΡΟΥ,CR: reinforced concrete,LFINF: infilled frame,CDH,3,15.0,SA(0.6),CR_LFINF-CDH-15_H3,3408.00,POINT (22.91923 40.72386),3989309,2.0,0.0
4,22.993852,40.612094,84,3ο Δημοτικό Σχολείο Πυλαίας,ΠΥΛΑΙΑΣ-ΧΟΡΤΙΑΤΗ,CR: reinforced concrete,LDUAL: dual frame-wall system,CDH,2,,PGA,CR_LDUAL-DUH_H2,2010.00,POINT (22.99385 40.61209),4017328,2.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1713,22.456130,40.222693,2024,Λύκειο Κονταριώτισσας - Κτίριο Β,ΔΙΟΥ-ΟΛΥΜΠΟΥ,CR: reinforced concrete,LDUAL: dual frame-wall system,CDH,2,,PGA,CR_LDUAL-DUH_H2,1024.80,POINT (22.45613 40.22269),4097147,3.0,0.0
1714,22.456130,40.222693,2025,Λύκειο Κονταριώτισσας - Κτίριο Γ,ΔΙΟΥ-ΟΛΥΜΠΟΥ,CR: reinforced concrete,LDUAL: dual frame-wall system,CDH,2,,PGA,CR_LDUAL-DUH_H2,1160.90,POINT (22.45613 40.22269),4097147,3.0,0.0
1715,22.456130,40.222693,2026,Λύκειο Κονταριώτισσας - Κτίριο Δ,ΔΙΟΥ-ΟΛΥΜΠΟΥ,CR: reinforced concrete,LDUAL: dual frame-wall system,CDH,2,,PGA,CR_LDUAL-DUH_H2,225.36,POINT (22.45613 40.22269),4097147,3.0,0.0
1716,22.456130,40.222693,2027,Λύκειο Κονταριώτισσας - Κτίριο Ε,ΔΙΟΥ-ΟΛΥΜΠΟΥ,S: steel,LFBR: braced frame,CDH,1,,PGA,S_LFBR-DUH_H1,956.37,POINT (22.45613 40.22269),4097147,3.0,0.0


Visualise susceptibility map

In [22]:
# Step 1: Ensure raster_value is treated as categorical with specific order
susc_gdf["Susceptibility_index"] = pd.Categorical(
    susc_gdf["raster_value"],
    categories=[1, 2, 3, 4, 5],
    ordered=True
)

# Step 2: Define custom colors for each class (adjust as needed)
colors = ['#ffffcc', '#a1dab4', '#41b6c4', '#2c7fb8', 'red']  # Light yellow to dark blue

# Step 3: Plot with .explore using the categorical column and custom colors
susc_gdf.explore(
    column="Susceptibility_index",
    cmap=ListedColormap(colors),
    legend=True,
    legend_kwds={
        'caption': 'Susceptibility Index',
        'orientation': 'horizontal'
    },
    marker_kwds={
        'radius': 4,
        'fill': True,
    },
    text="Susceptibility_index"
)


## Combined exposure and hazard dataset

Exposure points are linked with the corresponding earthquake intensity measures and landslide susceptibility information.  
The resulting dataset combines exposure, seismic hazard, and landslide susceptibility attributes and is used as input for the subsequent vulnerability and risk assessment steps.

In [23]:
# Load Exposure and Hazard Data
df = pd.read_csv(savepath / f"Joined_nearest_hazard_{asset}.csv")
# Load the CSV file with the new column
susc = pd.read_csv(data_path / f"Joined_nearest_susceptibility_{asset}.csv")
df['SAMPLE_1'] = susc['raster_value']

## Vulnerability Assessment

Seismic vulnerability of educational and healthcare facilities is assessed using building-class specific fragility and vulnerability models.

- Vulnerability and fragility functions are primarily derived from the ESRM20 methodology developed within the European seismic risk framework.
- Additional literature-based vulnerability models are adopted for reinforced concrete precast buildings (e.g., Yesilyurt et al., 2021), where required.
- Vulnerability functions relate ground-motion intensity measures (PGA and spectral accelerations) to damage state probabilities.
- Damage states are subsequently converted into damage index (loss ratio) values used in the risk assessment.

Extract values for fragility based on Typology.

In [13]:
# Load ESRM20 data
sheet_name = "Final"
Fragility = pd.read_excel(data_path / "fragility_various_IM_lognormal.xlsx", sheet_name=sheet_name)
Fragility.columns = ['Typology', 'IMT', 'Median_DS1', 'Median_DS2', 'Median_DS3', 'Median_DS4', 'Beta_DS1', "Beta_DS2", "Beta_DS3","Beta_DS4"]

# Initialize lists to store results
IM1 = []
IM2 = []
IM3 = []
IM4 = []
Beta_DS1 = []
Beta_DS2 = []
Beta_DS3 = []
Beta_DS4 = []

# Iterate over each value in 'Typology' column of data
for taxonomy in df['Typology']:
    # Check if any row in 'Typology' column matches current taxonomy
    if (Fragility['Typology'] == taxonomy).any():
        # Find the row where 'Typology' matches and retrieve values
        filtered_data = Fragility.loc[Fragility['Typology'] == taxonomy]
        if not filtered_data.empty:
            IM1.append(filtered_data['Median_DS1'].values[0])
            IM2.append(filtered_data['Median_DS2'].values[0])
            IM3.append(filtered_data['Median_DS3'].values[0])
            IM4.append(filtered_data['Median_DS4'].values[0])
            Beta_DS1.append(filtered_data['Beta_DS1'].values[0])
            Beta_DS2.append(filtered_data['Beta_DS2'].values[0])
            Beta_DS3.append(filtered_data['Beta_DS3'].values[0])
            Beta_DS4.append(filtered_data['Beta_DS4'].values[0])
        else:
            # Handle case where no matching Typology is found in ESRM20
            IM1.append(None)
            IM2.append(None)
            IM3.append(None)
            IM4.append(None)
            Beta_DS1.append(None)
            Beta_DS2.append(None)
            Beta_DS3.append(None)            
            Beta_DS4.append(None)
    else:
        # Handle case where no matching Typology is found in ESRM20
        IM1.append(None)
        IM2.append(None)
        IM3.append(None)
        IM4.append(None)
        Beta_DS1.append(None)
        Beta_DS2.append(None)
        Beta_DS3.append(None)            
        Beta_DS4.append(None)

# Create a new DataFrame to store results
IM = pd.DataFrame({
    'IM1': IM1,
    'IM2': IM2,
    'IM3': IM3,
    'IM4': IM4,
    'Beta_DS1': Beta_DS1,
    'Beta_DS2': Beta_DS2,
    'Beta_DS3': Beta_DS3 ,   
    'Beta_DS4': Beta_DS4
})

IM.head()

Unnamed: 0,IM1,IM2,IM3,IM4,Beta_DS1,Beta_DS2,Beta_DS3,Beta_DS4
0,0.475,0.833,1.355,1.866,0.285,0.283,0.283,0.263
1,1.345492,3.218407,5.030098,6.708921,0.493866,,,
2,0.314759,0.586519,0.783376,0.939881,0.456357,,,
3,0.658211,1.429666,2.174719,2.862426,0.464744,,,
4,0.729419,1.405535,1.856772,2.204783,0.393803,,,


## Risk Assessment

Risk metrics are computed by combining hazard intensity measures with exposure and vulnerability information for each seismic scenario.

Asset-level damage probabilities and loss ratios are estimated for multiple return periods, followed by the computation of annual collapse probability using the stress-based methodology.

Calculate damage index

In [14]:
def calculate_log_normal_distribution(beta, scale):
    """
    Creates a log-normal distribution using the given beta and scale parameters.
    
    Parameters:
    - beta: Beta parameter (shape) for the log-normal distribution.
    - scale: Scale parameter for the log-normal distribution (IM value).
    
    Returns:
    - dist: A log-normal distribution object.
    """
    return lognorm(s=beta, scale=scale)

In [15]:
def get_beta_values(typology, im_row):
    """
    Determines beta values based on typology. Adjusts beta values if the typology contains 'Precast'.
    
    Parameters:
    - typology: The building typology.
    - im_row: The current row from the IM DataFrame.
    
    Returns:
    - Tuple of beta values for DS1, DS2, DS3, and DS4.
    """
    if 'Precast' in typology:
        beta_dist1 = im_row['Beta_DS1'] if pd.notna(im_row['Beta_DS1']) else im_row['Beta_DS1']
        beta_dist2 = im_row['Beta_DS2'] if pd.notna(im_row['Beta_DS2']) else im_row['Beta_DS1']
        beta_dist3 = im_row['Beta_DS3'] if pd.notna(im_row['Beta_DS3']) else im_row['Beta_DS1']
        beta_dist4 = im_row['Beta_DS4'] if pd.notna(im_row['Beta_DS4']) else im_row['Beta_DS1']
    else:
        beta_dist1 = im_row['Beta_DS1']
        beta_dist2 = im_row['Beta_DS1']
        beta_dist3 = im_row['Beta_DS1']
        beta_dist4 = im_row['Beta_DS1']
    
    return beta_dist1, beta_dist2, beta_dist3, beta_dist4

In [16]:
def compute_cdf_values(row, intensity_column_map, im_row):
    """
    Computes CDF values based on the intensity and IM row data.
    
    Parameters:
    - row: A row from the input DataFrame.
    - intensity_column_map: A map that links intensity types to column names.
    - im_row: A row from the IM DataFrame containing beta and IM values.
    
    Returns:
    - A tuple of computed CDF values (cdf1, cdf2, cdf3, cdf4).
    """
    # Get the intensity type and column name for current row
    intensity_type = row["Intensity"]
    column_name = intensity_column_map.get(intensity_type)
    x_value = row[column_name] if column_name else np.nan

    # Calculate log-normal distributions
    beta_dist1, beta_dist2, beta_dist3, beta_dist4 = get_beta_values(row['Typology'], im_row)
    
    dist1 = calculate_log_normal_distribution(beta_dist1, im_row["IM1"])
    dist2 = calculate_log_normal_distribution(beta_dist2, im_row["IM2"])
    dist3 = calculate_log_normal_distribution(beta_dist3, im_row["IM3"])
    dist4 = calculate_log_normal_distribution(beta_dist4, im_row["IM4"])
    
    # Compute CDF values
    return dist1.cdf(x_value), dist2.cdf(x_value), dist3.cdf(x_value), dist4.cdf(x_value)

### Single-hazard risk assessment (earthquake)

In [17]:
# Create a dictionary to store DataFrames from each scenario by their years_value
cdf_dict = {}

# Iterate over each seismic scenario
for index_scen, scen_row in Scenaria.iterrows():
    # Variables to store CDF results and intensity values
    cdf1, cdf2, cdf3, cdf4 = [], [], [], []
    
    # Scenario parameters
    years_value = scen_row['1/years']
    scenario_no = int(scen_row['No'])
    return_period = int(scen_row['Return_Period'])

    # Generate intensity column map for the current scenario
    intensity_column_map = {
        "PGA": f"PGA-{years_value}",
        "SA(0.3)": f"SA(0.3)-{years_value}",
        "SA(0.6)": f"SA(0.6)-{years_value}",
        "SA(1.0)": f"SA(1.0)-{years_value}",
        "SA(1.5)": f"SA(1.5)-{years_value}",
    }

    # Iterate over each row in the DataFrame df
    for index, row in df.iterrows():
        im_row = IM.iloc[index]  # Corresponding IM DataFrame row
    
        # Compute CDF values
        cdf1_val, cdf2_val, cdf3_val, cdf4_val = compute_cdf_values(row, intensity_column_map, im_row)
        cdf1.append(cdf1_val)
        cdf2.append(cdf2_val)
        cdf3.append(cdf3_val)
        cdf4.append(cdf4_val)
    
    # Create DataFrame to store CDF values for the current scenario
    CDF = pd.DataFrame({
        f"CDF1-{years_value}": cdf1,
        f"CDF2-{years_value}": cdf2,
        f"CDF3-{years_value}": cdf3,
        f"CDF4-{years_value}": cdf4
    })
    
    # Calculate probabilities of occurrence for each damage state
    CDF[f"P_DS1-{years_value}"] = CDF[f"CDF1-{years_value}"] - CDF[f"CDF2-{years_value}"]
    CDF[f"P_DS2-{years_value}"] = CDF[f"CDF2-{years_value}"] - CDF[f"CDF3-{years_value}"]
    CDF[f"P_DS3-{years_value}"] = CDF[f"CDF3-{years_value}"] - CDF[f"CDF4-{years_value}"]
    CDF[f"P_DS4-{years_value}"] = CDF[f"CDF4-{years_value}"]

    # Initialize the list for Damage Index (DI) values
    DI_values = []

    # Loop over the rows again to calculate the DI for each row
    for index, row in df.iterrows():
        # Check Material type and calculate DI accordingly
        if row['Material'] == 'Precast':
            DI_value = (
                CDF.loc[index, f"P_DS1-{years_value}"] * 0.05 +
                CDF.loc[index, f"P_DS2-{years_value}"] * 0.30 +
                CDF.loc[index, f"P_DS3-{years_value}"] * 0.70 +
                CDF.loc[index, f"P_DS4-{years_value}"] * 1
            )
        else:
            DI_value = (
                CDF.loc[index, f"P_DS1-{years_value}"] * 0.05 +
                CDF.loc[index, f"P_DS2-{years_value}"] * 0.15 +
                CDF.loc[index, f"P_DS3-{years_value}"] * 0.60 +
                CDF.loc[index, f"P_DS4-{years_value}"] * 1
            )
        
        # Append the calculated DI to the list
        DI_values.append(DI_value)

    # Add the calculated DI values to the DataFrame
    CDF[f"DI-{years_value}"] = DI_values
    
    # Reset the index to align all data
    CDF.reset_index(drop=True, inplace=True)
    
    # Store the DataFrame for the current scenario in a dictionary
    cdf_dict[years_value] = CDF

# Concatenate all the DataFrames horizontally (axis=1)
DI_seismic = pd.concat(cdf_dict.values(), axis=1)

# Drop duplicate 'Longitude' and 'Latitude' columns, keeping only the first occurrence
DI_seismic = DI_seismic.loc[:, ~DI_seismic.columns.duplicated()]

Calculate annual collapse probability based on strest methodology

In [18]:
# Define return periods
Periods = [5,73,102,475,1000,2500,5000,10000]

lamda1=[0.9999,0.5,0.39,0.1,0.05,0.02,0.01,0.005]

# Compute λ values
lambda_values = [1 / (-50 / np.log(1 - l)) for l in lamda1]


# Compute P[IM_i] using the given formula
P_IM = []
for t in range(1, len(lambda_values) - 1):  # Avoid first and last index
    P_IM_i = (lambda_values[t - 1] - lambda_values[t + 1]) / 2
    P_IM.append(P_IM_i)

# Create a dictionary mapping return periods to P[IM_i]
P_IM_dict = {data['1/years'][i]: P_IM[i] for i in range(len(P_IM))}

# Multiply each "CDF4-{r}" column by the corresponding P[IM_i]
for r in data['1/years']:  # Exclude first and last periods as they lack P[IM_i]
    column_name = f"CDF4-{r}"
    new_column_name = f"{column_name}*P[IM_i]"

    if column_name in DI_seismic.columns:  
        DI_seismic[new_column_name] = DI_seismic[column_name] * P_IM_dict[r]

# Add a new column that sums all the new columns
new_columns = [f"CDF4-{r}*P[IM_i]" for r in data['1/years'] if f"CDF4-{r}" in DI_seismic.columns]

# Create a new column "Sum_of_new_columns" by summing across the new columns
DI_seismic["Annual Collapse Probability"] = DI_seismic[new_columns].sum(axis=1)

Create and save the final DataFrame containing the damage index (loss ratio) for different return periods along with the annual probability of collapse for each exposure point.

In [None]:
# Concatenate df and IM horizontally, aligning by their index
common_df = pd.concat([df, IM, DI_seismic], axis=1)

# Define the path first
seismic_file_path = savepath / f"Single-Hazard/Earthquakes/DI_Earthquakes_ESRM20_{asset}.csv"

# Ensure the directory exists
seismic_file_path.parent.mkdir(parents=True, exist_ok=True)

# Save the DataFrame to CSV
common_df.to_csv(seismic_file_path, index=False)


Visualise loss ratio map for a return period of 1000 years

In [None]:
common_df['geometry'] = common_df.apply(lambda row: Point(row['Longitude'], row['Latitude']), axis=1)

# Convert to GeoDataFrame
seismic_gdf = gpd.GeoDataFrame(common_df, geometry='geometry', crs="EPSG:4326")

# Create bins for the DI-0.001 column
bins = [0, 0.1, 0.25, 0.4, 1]  # Define the bin ranges
labels = ['0-0.1', '0.1-0.25', '0.25-0.4', '0.4-1']  # Labels for the bins

# Assign bin values based on 'DI-0.001'
seismic_gdf['DI_binned'] = pd.cut(seismic_gdf['DI-0.001'], bins=bins, labels=labels, right=False)


# Interactive map with 4 bins and custom colors
seismic_gdf.explore(
    column="DI_binned",  # Use the binned DI-0.001 column for coloring
    cmap='OrRd',         # Optional colormap, we are using custom colors
    stroke=True,         # Add stroke (outline) around markers
    line_color="black",  # Set the stroke color to black
    legend=True,         # Show the legend
    legend_kwds={
        'caption': 'Loss ratio (T=1000 years)',  # Legend caption
        'orientation': 'horizontal'             # Horizontal legend
    },
    marker_kwds={
        'radius': 5,   # Set marker size (adjust as needed)
        'fill': True,   # Fill the markers with color
    },
    tooltip="DI-0.001",  # Show the DI-0.001 value in the tooltip
)

Compute human and economic losses based on ESRM20 methodology

In [21]:

# Load your reference data (Excel with probabilities)
reference = pd.read_excel(data_path / "fatality_damage_model_ESRM20.xlsx")

# Create empty lists to store the values
P_lethal_DS4 = []
Collapse_factor = []
P_entrap_day = []
P_entrap_night = []
P_loss_life = []

# For each row in common_df, look up values based on Typology
for taxonomy in common_df['Typology']:
    match = reference[reference['Typology'] == taxonomy]
    if not match.empty:
        P_lethal_DS4.append(match['P_lethal-building | DS4'].values[0])
        Collapse_factor.append(match['Collapse_factor'].values[0])
        P_entrap_day.append(match['P_entrapment_day'].values[0])
        P_entrap_night.append(match['P_entrapment_night'].values[0])
        P_loss_life.append(match['P_loss-life | entrapment'].values[0])
    else:
        # If not found, append None or a default
        P_lethal_DS4.append(None)
        Collapse_factor.append(None)
        P_entrap_day.append(None)
        P_entrap_night.append(None)
        P_loss_life.append(None)

# Add the new columns to your DataFrame
common_df['P_lethal-building | DS4'] = P_lethal_DS4
common_df['Collapse_factor'] = Collapse_factor
common_df['P_entrapment_day'] = P_entrap_day
common_df['P_entrapment_night'] = P_entrap_night
common_df['P_loss-life | entrapment'] = P_loss_life

# Iterate through each return period in the Scenaria DataFrame
for index, row in Scenaria.iterrows():
    return_period = row['1/years']  # Get the '1/years' value

    # Create the new column name dynamically, e.g., "new_column_0.0137" for the first return period
    new_column_name = f"Fatality_damage_{return_period}"

    # Calculate the new column for each row in common_df
    common_df[new_column_name] = (
        common_df[f"P_DS4-{return_period}"] * common_df['P_lethal-building | DS4'] * 
        common_df['Collapse_factor'] * common_df['P_loss-life | entrapment'] * common_df['Area'] / 3
    )

    common_df[new_column_name] = common_df[new_column_name].round(0)

# Iterate through each return period in the Scenaria DataFrame
for index, row in Scenaria.iterrows():
    return_period = row['1/years']  # Get the '1/years' value

    # Create the new column name dynamically, e.g., "new_column_0.0137" for the first return period
    new_column_name = f"Economic_Losses_{return_period}"

    # Calculate the new column for each row in common_df
    common_df[new_column_name] = (common_df['Area'] * 2000 * common_df[f"DI-{return_period}"])

# Save the updated common_df with the new columns
common_df.to_csv(seismic_file_path, index=False)

Visualise economic and human losses for a return period of 1000 years aggragated per municipality

In [22]:
common_df['geometry'] = common_df.apply(lambda row: Point(row['Longitude'], row['Latitude']), axis=1)

# Convert to GeoDataFrame
seismic_gdf = gpd.GeoDataFrame(common_df, geometry='geometry', crs="EPSG:4326")


municipalities_gdf = gpd.read_file(data_path / "oria_dimon.shp")


# Ensure both datasets are in the same CRS
if seismic_gdf.crs != municipalities_gdf.crs:
    seismic_gdf = seismic_gdf.to_crs(municipalities_gdf.crs)

# Remove conflicting columns if they exist
for col in ['index_left', 'index_right']:
    if col in seismic_gdf.columns:
        seismic_gdf = seismic_gdf.drop(columns=col)
    if col in municipalities_gdf.columns:
        municipalities_gdf = municipalities_gdf.drop(columns=col)

# Step 2: Perform spatial join — assign each point in seismic_gdf to a municipality
joined = gpd.sjoin(seismic_gdf, municipalities_gdf, how="left", predicate="within")

# Step 3: Aggregate — Sum Economic Losses and Fatality (Human) Losses per municipality
agg = joined.groupby('index_right').agg({
    'Economic_Losses_0.001': 'sum',  # Sum the Economic Losses for each municipality
    'Fatality_damage_0.001': 'sum'   # Sum Human Losses for each municipality
}).rename(columns={
    'Economic_Losses_0.001': 'Economic_Losses',
    'Fatality_damage_0.001': 'Human Losses'
})

# Step 4: Join the aggregated data back to the municipalities
municipalities_gdf = municipalities_gdf.join(agg)

# Step 5: Apply binning for **Economic Losses**
economic_bins = [0, 400000, 3430000, 6000000, 8098000, 124540000]  # Define custom bins
economic_labels = ['0-400K', '400K-3.4M', '3.4M-6M', '6M-8M', '8M+']  # Labels for the bins

# Create a new column with binned values for Economic Losses
municipalities_gdf['Economic_Losses_binned'] = pd.cut(municipalities_gdf['Economic_Losses'], bins=economic_bins, labels=economic_labels, right=False)

# Step 6: Apply binning for **Human Losses** with explicit inclusion of zero in the bins
human_bins = [0, 1, 2, 4, 7, np.inf]  # Bins for Human Losses (0-1, 1-2, etc.)
human_labels = ['0-1', '1-2', '2-4', '4-7', '7+']  # Labels for the bins

# Create a new column with binned values for Human Losses (ensure '0' is included in the first bin)
municipalities_gdf['Fatality_binned'] = pd.cut(municipalities_gdf['Human Losses'], bins=human_bins, labels=human_labels, right=False)

In [None]:
# Plot Economic Losses
municipalities_gdf.explore(
    column='Economic_Losses_binned',
    cmap='Spectral_r',
    legend=True,
    legend_kwds={
        'caption': 'Economic Losses € (T=1000 years)',
        'orientation': 'horizontal'
    },
    tooltip='Economic_Losses'  # Display the binned categories in the polygons
)

In [None]:
# Plot Human Losses
municipalities_gdf.explore(
    column='Fatality_binned',
    cmap='Spectral_r',
    legend=True,
    legend_kwds={
        'caption': 'Human Losses (T=1000 years)',
        'orientation': 'horizontal'
    },
    tooltip='Human Losses'  # Display the binned categories in the polygons
)

### Multi-hazard risk assessment (earthquake-triggered landslides)

This section extends the single-hazard seismic risk assessment by considering the effects of earthquake-triggered landslides.

- Ground-motion intensity measures (PGA) derived from ESHM20 and estimated at ground surface using OpenQuake are used as triggering parameters for landslide occurrence.
- Landslide susceptibility is characterised using the European Landslide Susceptibility Map (ELSUSv2), which is linked to yield acceleration ratio (ky) values based on FEMA (2022) recommendations and expert judgement.
- Permanent Ground Displacement (PGD) caused by earthquake-induced landslides is estimated using analytical models relating PGD with PGA and yield acceleration (e.g., Fotopoulou and Pitilakis, 2015; Rathje and Antonakos, 2011).
- Landslide-induced damage is evaluated using appropriate fragility relationships and combined with seismic damage probabilities.
- Compound damage probabilities are computed by combining ground-shaking and landslide damage state exceedance probabilities, assuming conditional landslide occurrence given the earthquake event.
- Final loss ratios are estimated considering both seismic intensity (PGA) and ground failure effects (PGD).

Results are provided in terms of asset-level loss estimates and spatial loss maps for multiple return periods.

Calculate yield acceleration ratio (ky)

In [25]:
# Define a function to apply the logic
def calculate_value(sample):
    if sample == 1:
        return 0.4
    elif sample == 2:
        return 0.3
    elif sample == 3:
        return 0.2
    elif sample == 4:
        return 0.15
    elif sample == 5:
        return 0.1
    else:
        return 0  # default value when CW is not 1, 2, 3, 4, or 5

# Apply the calculation and create a new dataframe
ky_new = {
    'ky': df['SAMPLE_1'].apply(lambda x: calculate_value(x)),
}

ky = pd.DataFrame(ky_new)

Calculate Permanent ground displacement (PGD) of (earthquake induced) landsliding by using an analytical expression that relates the PGD with PGA: 
$$PGD = \exp(a + b \cdot \ln(\text{PGA}) - c \cdot k_y + 0.535 \cdot d)$$

In [None]:
def calculate_PGD(df, PGA_column, return_period):
    """
    This function calculates Permanent Ground Displacement (PGD) for a given return period using
    the specified Peak Ground Acceleration (PGA) data.
    
    Parameters:
    df (pd.DataFrame): The input DataFrame containing PGA values and other relevant data.
    PGA_column (str): The name of the column in the DataFrame that contains the PGA values.
    return_period (int): The return period in years, which determines the value of constant 'd' used in the calculation.

    Returns:
    pd.Series: A Series containing the calculated PGD values for the specified return period.
    """
    # Constants for the calculation
    a = -2.965
    b = 2.127
    c = 6.583
    # this would be nicer defined per intervals, not just strict defined values
    if return_period == 73 or return_period == 102:
        d = 5
    elif return_period == 475:
        d = 6
    else:
        d = 7

    # Calculate PGD
    PGD = np.exp(a + b * np.log(df[PGA_column]) - c * ky["ky"] + 0.535 * d)

    return PGD  # Return as a Series instead of a DataFrame

# Define return periods and corresponding PGA columns
return_periods = [73, 102, 475, 1000, 2500, 5000]
PGA_columns = ['PGA-0.0137', 'PGA-0.0098', 'PGA-0.0021', 'PGA-0.001', 'PGA-0.0003', 'PGA-0.0001']

# Create a DataFrame to store all PGD results
PGD = pd.DataFrame()

# Loop through each return period and PGA column
for index, row in Scenaria.iterrows():
    return_period = row['Return_Period']
    PGA_column = f'PGA-{row["1/years"]}'  # Construct the corresponding PGA column name
    PGD_value = calculate_PGD(df, PGA_column, return_period)
    PGD[f'PGD-{row["1/years"]}'] = PGD_value  # Add the PGD values to the DataFrame

PGD.head(5)

Unnamed: 0,PGD-0.0137,PGD-0.0098,PGD-0.0021,PGD-0.001,PGD-0.0003,PGD-0.0001
0,0.000634,0.001019,0.009195,0.028004,0.053822,0.085483
1,0.001225,0.001969,0.01776,0.054091,0.103957,0.165111
2,0.002366,0.003751,0.033449,0.105478,0.212258,0.354169
3,0.001634,0.00258,0.021517,0.068424,0.147869,0.25178
4,0.001024,0.001586,0.012463,0.041079,0.091581,0.158642


Calculate the probability of the occurrence of the earthquake induced landslide 

In [28]:
# Extract years_values from the Scenaria DataFrame
years_values = Scenaria['1/years'].tolist()  # Use the annual probabilities as years values

# Create a list to store Planaslide DataFrames for each years_value
plandslide_df_list = []

# Function to assign Planaslide values based on PGA and Sample
def assign_plandslide_values(sample):
    if sample == 1:
        return 0.000
    elif sample == 2:
        return 0.010
    elif sample == 3:
        return 0.050
    elif sample == 4:
        return 0.100
    elif sample == 5:
        return 0.400

# Initialize a list to store the DataFrames for each year
plandslide_df_list = []

# Iterate over each years_value
for years_value in years_values:
    # List to store Planaslide data for this specific years_value
    plandslide_data = []

    # Iterate over rows in the existing df DataFrame
    for index, row in df.iterrows():
        pga_value = row[f"PGA-{years_value}"]  # Access the PGA value using dynamic column name
        sample_value = row['SAMPLE_1']  # Get the Sample value
        plandslide1 = assign_plandslide_values(sample_value)  # Get Planaslide value

        # Store results in a dictionary
        plandslide_data.append({
            f'Plandslide-{years_value}': f"{plandslide1:.3f}",  # Unique column for each year
        })

    # Create a new DataFrame for this specific years_value
    plandslide_df = pd.DataFrame(plandslide_data)


    plandslide_df_list.append(plandslide_df)

# Concatenate all DataFrames into one common DataFrame along columns (axis=1)
Plandslide_df = pd.concat(plandslide_df_list, axis=1)

Plandslide_df.head()

Unnamed: 0,Plandslide-0.0137,Plandslide-0.0098,Plandslide-0.0021,Plandslide-0.001,Plandslide-0.0003,Plandslide-0.0001
0,0.0,0.0,0.0,0.0,0.0,0.0
1,0.01,0.01,0.01,0.01,0.01,0.01
2,0.05,0.05,0.05,0.05,0.05,0.05
3,0.01,0.01,0.01,0.01,0.01,0.01
4,0.01,0.01,0.01,0.01,0.01,0.01


In [29]:
def compute_landcdf_values(row, years_value):
    """
    Computes CDF values based on the PGD values.
    
    Parameters:
    - row: A row from the input DataFrame containing PGD values.
    - years_value: The years value to access the correct PGD column.
    
    Returns:
    - A tuple of computed CDF values (cdf1, cdf2, cdf3, cdf4).
    """
    # Access the PGD value from the dynamic column
    pga_column_name = f"PGD-{years_value}"  # Create the column name based on years_value
    pgd_value = row[pga_column_name]  # Access the PGD value

    # Fixed beta and scale values
    beta = 1.2
    scale_values = [0.254] * 5  # Four IM values, all set to 0.254

    # Calculate log-normal distributions with fixed values
    dist1 = calculate_log_normal_distribution(beta, scale_values[0])
    dist2 = calculate_log_normal_distribution(beta, scale_values[1])
    dist3 = calculate_log_normal_distribution(beta, scale_values[2])
    dist4 = calculate_log_normal_distribution(beta, scale_values[3])

    # Compute CDF values using PGD as the x_value
    landcdf1 = dist1.cdf(pgd_value) if pd.notnull(pgd_value) else np.nan
    landcdf2 = dist2.cdf(pgd_value) if pd.notnull(pgd_value) else np.nan
    landcdf3 = dist3.cdf(pgd_value) if pd.notnull(pgd_value) else np.nan
    landcdf4 = dist4.cdf(pgd_value) if pd.notnull(pgd_value) else np.nan

    return landcdf1, landcdf2, landcdf3, landcdf4

In [30]:
# Create a DataFrame to store the final results
Landcdf = pd.DataFrame()

# Iterate over each return period in the Scenaria DataFrame
for index, row in Scenaria.iterrows():
    years_value = row['1/years']  # Get the years value from the current row
    cdf_results = []

    # Compute CDF values for each row in the PGD DataFrame
    for _, pgd_row in PGD.iterrows():
        cdf_values = compute_landcdf_values(pgd_row, years_value)
        cdf_results.append(cdf_values)

    # Convert the results to a DataFrame
    cdf_df = pd.DataFrame(cdf_results, columns=[f'LandDS1-{years_value}', f'LandDS2-{years_value}', f'LandDS3-{years_value}', f'LandDS4-{years_value}'])
    
    # Concatenate the results to the main landcdf DataFrame
    Landcdf = pd.concat([Landcdf, cdf_df], axis=1)

#### PL = 1.0

Calculate the damage index (loss ratio) assuming that the probability of an earthquake-induced landslide is equal to the probability of the earthquake itself (i.e., $P_L = 1.0$), meaning the landslide will certainly occur given that the earthquake occurs.

In [31]:
# Convert relevant columns to numeric if they are not already
DI_seismic = DI_seismic.apply(pd.to_numeric, errors='coerce')
Landcdf = Landcdf.apply(pd.to_numeric, errors='coerce')

# Extract years_values from the Scenaria DataFrame
years_values = Scenaria['1/years'].tolist()  # Use the annual probabilities as years values

# Initialize a new DataFrame for combined results
DI_combined_P1 = pd.DataFrame({
})

# List of dataset names (DS1, DS2, DS3, DS4.ds5)
datasets = [1,2,3,4]

# Loop through each years_value and perform the calculations for each dataset
for years_value in years_values:
    for dataset in datasets:
        combined_value = (
            DI_seismic[f"CDF{dataset}-{years_value}"] + 
            1 * Landcdf[f"LandDS{dataset}-{years_value}"] -
            DI_seismic[f"CDF{dataset}-{years_value}"] * 1 * Landcdf[f"LandDS{dataset}-{years_value}"]
        )
        
        # Store the results in the new DataFrame
        DI_combined_P1[f"CombDS{dataset}-{years_value}"] = combined_value


In [32]:
# Loop through each years_value and perform the calculations for each dataset
for years_value in years_values:
    # Calculate probabilities of occurrence for each damage state
    DI_combined_P1[f"P_DS1-{years_value}"] = DI_combined_P1[f"CombDS1-{years_value}"] - DI_combined_P1[f"CombDS2-{years_value}"]
    DI_combined_P1[f"P_DS2-{years_value}"] = DI_combined_P1[f"CombDS2-{years_value}"] - DI_combined_P1[f"CombDS3-{years_value}"]
    DI_combined_P1[f"P_DS3-{years_value}"] = DI_combined_P1[f"CombDS3-{years_value}"] - DI_combined_P1[f"CombDS4-{years_value}"]
    DI_combined_P1[f"P_DS4-{years_value}"] = DI_combined_P1[f"CombDS4-{years_value}"]

    # Calculate damage index for the scenario based on Typology
    for index in range(len(DI_combined_P1)):
        if df['Material'].iloc[index] == 'Precast':
            DI_combined_P1.at[index, f"DI-{years_value}"] = (
                DI_combined_P1.at[index, f"P_DS1-{years_value}"] * 0.05 +
                DI_combined_P1.at[index, f"P_DS2-{years_value}"] * 0.30 +
                DI_combined_P1.at[index, f"P_DS3-{years_value}"] * 0.70 +
                DI_combined_P1.at[index, f"P_DS4-{years_value}"] * 1
            )
        else:
            DI_combined_P1.at[index, f"DI-{years_value}"] = (
                DI_combined_P1.at[index, f"P_DS1-{years_value}"] * 0.05 +
                DI_combined_P1.at[index, f"P_DS2-{years_value}"] * 0.15 +
                DI_combined_P1.at[index, f"P_DS3-{years_value}"] * 0.60 +
                DI_combined_P1.at[index, f"P_DS4-{years_value}"] * 1
            )

Calculate annual collapse probability based on strest methodology

In [33]:
# Define return periods
Periods = [5,73,102,475,1000,2500,5000,10000]

lamda1=[0.9999,0.5,0.39,0.1,0.05,0.02,0.01,0.005]

# Compute λ values
lambda_values = [1 / (-50 / np.log(1 - l)) for l in lamda1]

# Compute P[IM_i] using the given formula
P_IM = []
for t in range(1, len(lambda_values) - 1):  # Avoid first and last index
    P_IM_i = (lambda_values[t - 1] - lambda_values[t + 1]) / 2
    P_IM.append(P_IM_i)


# Create a dictionary mapping return periods to P[IM_i]
P_IM_dict = {data['1/years'][i]: P_IM[i] for i in range(len(P_IM))}

# Multiply each "CDF4-{r}" column by the corresponding P[IM_i]
for r in data['1/years']:  # Exclude first and last periods as they lack P[IM_i]
    column_name = f"CombDS4-{r}"
    new_column_name = f"{column_name}*P[IM_i]"

    if column_name in DI_combined_P1.columns:  
        DI_combined_P1[new_column_name] = DI_combined_P1[column_name] * P_IM_dict[r]

# Add a new column that sums all the new columns
new_columns = [f"CombDS4-{r}*P[IM_i]" for r in data['1/years'] if f"CombDS4-{r}" in DI_combined_P1.columns]

# Create a new column "Sum_of_new_columns" by summing across the new columns
DI_combined_P1["Annual Collapse Probability"] = DI_combined_P1[new_columns].sum(axis=1)

Create and save the final DataFrame containing the damage index (loss ratio) for different return periods along with the annual probability of collapse for each exposure point.

In [34]:
# Concatenate df and IM horizontally, aligning by their index
common_df_combined_P1 = pd.concat([df, IM, ky, PGD, Landcdf, DI_combined_P1], axis=1)

# Define the path first
seimsic_landslide_P1_file_path = savepath / f"Multi-Hazard/Earthquakes & Landslides/DI_Earthquakes_&_Landslides_P1_ESRM20_{asset}.csv"

# Ensure the directory exists
seimsic_landslide_P1_file_path.parent.mkdir(parents=True, exist_ok=True)

# Save the DataFrame to CSV
common_df_combined_P1.to_csv(seimsic_landslide_P1_file_path, index=False)

Visualise loss ratio map for a return period of 1000 years

In [None]:
common_df_combined_P1['geometry'] = common_df_combined_P1.apply(lambda row: Point(row['Longitude'], row['Latitude']), axis=1)

# Convert to GeoDataFrame
combined_P1_gdf = gpd.GeoDataFrame(common_df_combined_P1, geometry='geometry', crs="EPSG:4326")

# Create bins for the DI-0.001 column
bins = [0, 0.1, 0.25, 0.4, 1]  # Define the bin ranges
labels = ['0-0.1', '0.1-0.25', '0.25-0.4', '0.4-1']  # Labels for the bins

# Assign bin values based on 'DI-0.001'
combined_P1_gdf['DI_binned'] = pd.cut(combined_P1_gdf['DI-0.001'], bins=bins, labels=labels, right=False)


# Interactive map with 4 bins and custom colors
combined_P1_gdf.explore(
    column="DI_binned",  # Use the binned DI-0.001 column for coloring
    cmap='OrRd',         # Optional colormap, we are using custom colors
    stroke=True,         # Add stroke (outline) around markers
    line_color="black",  # Set the stroke color to black
    legend=True,         # Show the legend
    legend_kwds={
        'caption': 'Loss ratio (T=1000 years)',  # Legend caption
        'orientation': 'horizontal'             # Horizontal legend
    },
    marker_kwds={
        'radius': 5,   # Set marker size (adjust as needed)
        'fill': True,   # Fill the markers with color
    },
    tooltip="DI-0.001",  # Show the DI-0.001 value in the tooltip
)

#### PL < 1.0

Calculate the damage index (loss ratio) assuming that the probability of an earthquake-induced landslide is equal to the probability of the earthquake itself (i.e., $P_L = 1.0$), meaning the landslide will certainly occur given that the earthquake occurs.

In [36]:
# Convert relevant columns to numeric if they are not already
DI_seismic = DI_seismic.apply(pd.to_numeric, errors='coerce')
Landcdf = Landcdf.apply(pd.to_numeric, errors='coerce')
Plandslide_df = Plandslide_df.apply(pd.to_numeric, errors='coerce')

# Extract years_values from the Scenaria DataFrame
years_values = Scenaria['1/years'].tolist()  # Use the annual probabilities as years values


# Initialize a new DataFrame for combined results
DI_combined = pd.DataFrame({
})

# List of dataset names (DS1, DS2, DS3, DS4)
datasets = [1,2,3,4]

# Loop through each years_value and perform the calculations for each dataset
for years_value in years_values:
    for dataset in datasets:
        combined_value = (
            DI_seismic[f"CDF{dataset}-{years_value}"] + 
            Plandslide_df[f'Plandslide-{years_value}'] * Landcdf[f"LandDS{dataset}-{years_value}"] -
            DI_seismic[f"CDF{dataset}-{years_value}"] * Plandslide_df[f'Plandslide-{years_value}'] * Landcdf[f"LandDS{dataset}-{years_value}"]
        )
        
        # Store the results in the new DataFrame
        DI_combined[f"CombDS{dataset}-{years_value}"] = combined_value

In [37]:
# Loop through each years_value and perform the calculations for each dataset
for years_value in years_values:

    # Calculate probabilities of occurrence for each damage state
    DI_combined[f"P_DS1-{years_value}"] = DI_combined[f"CombDS1-{years_value}"] - DI_combined[f"CombDS2-{years_value}"]
    DI_combined[f"P_DS2-{years_value}"] = DI_combined[f"CombDS2-{years_value}"] - DI_combined[f"CombDS3-{years_value}"]
    DI_combined[f"P_DS3-{years_value}"] = DI_combined[f"CombDS3-{years_value}"] - DI_combined[f"CombDS4-{years_value}"]
    DI_combined[f"P_DS4-{years_value}"] = DI_combined[f"CombDS4-{years_value}"]

    # Calculate damage index for the scenario based on Material
    # This assumes you want to check the Material for each row
    for index in range(len(DI_combined)):
        if df['Material'].iloc[index] == 'Precast':
            DI_combined.at[index, f"DI-{years_value}"] = (
                DI_combined.at[index, f"P_DS1-{years_value}"] * 0.05 +
                DI_combined.at[index, f"P_DS2-{years_value}"] * 0.30 +
                DI_combined.at[index, f"P_DS3-{years_value}"] * 0.70 +
                DI_combined.at[index, f"P_DS4-{years_value}"] * 1
            )
        else:
            DI_combined.at[index, f"DI-{years_value}"] = (
                DI_combined.at[index, f"P_DS1-{years_value}"] * 0.05 +
                DI_combined.at[index, f"P_DS2-{years_value}"] * 0.15 +
                DI_combined.at[index, f"P_DS3-{years_value}"] * 0.60 +
                DI_combined.at[index, f"P_DS4-{years_value}"] * 1
            )

Calculate annual collapse probability based on strest methodology

In [38]:
# Define return periods
Periods = [5,73,102,475,1000,2500,5000,10000]

lamda1=[0.9999,0.5,0.39,0.1,0.05,0.02,0.01,0.005]

# Compute λ values
lambda_values = [1 / (-50 / np.log(1 - l)) for l in lamda1]

# Compute P[IM_i] using the given formula
P_IM = []
for t in range(1, len(lambda_values) - 1):  # Avoid first and last index
    P_IM_i = (lambda_values[t - 1] - lambda_values[t + 1]) / 2
    P_IM.append(P_IM_i)


# Create a dictionary mapping return periods to P[IM_i]
P_IM_dict = {data['1/years'][i]: P_IM[i] for i in range(len(P_IM))}

# Multiply each "CDF4-{r}" column by the corresponding P[IM_i]
for r in data['1/years']:  # Exclude first and last periods as they lack P[IM_i]
    column_name = f"CombDS4-{r}"
    new_column_name = f"{column_name}*P[IM_i]"

    if column_name in DI_combined.columns:  
        DI_combined[new_column_name] = DI_combined[column_name] * P_IM_dict[r]

# Add a new column that sums all the new columns
new_columns = [f"CombDS4-{r}*P[IM_i]" for r in data['1/years'] if f"CombDS4-{r}" in DI_combined.columns]

# Create a new column "Sum_of_new_columns" by summing across the new columns
DI_combined["Annual Collapse Probability"] = DI_combined[new_columns].sum(axis=1)

Create and save the final DataFrame containing the damage index (loss ratio) for different return periods along with the annual probability of collapse for each exposure point.

In [39]:
# Concatenate df and IM horizontally, aligning by their index
common_df_combined = pd.concat([df, IM, ky, PGD, Landcdf, DI_combined], axis=1)

# Define the path first
seimsic_landslide_file_path = savepath / f"Multi-Hazard/Earthquakes & Landslides/DI_Earthquakes_&_Landslides_Plandslide_ESRM20_{asset}.csv"

# Ensure the directory exists
seimsic_landslide_file_path.parent.mkdir(parents=True, exist_ok=True)

# Save the DataFrame to CSV
common_df_combined.to_csv(seimsic_landslide_file_path, index=False)

Visualise loss ratio map for a return period of 1000 years

In [None]:
common_df_combined['geometry'] = common_df_combined.apply(lambda row: Point(row['Longitude'], row['Latitude']), axis=1)

# Convert to GeoDataFrame
combined_gdf = gpd.GeoDataFrame(common_df_combined, geometry='geometry', crs="EPSG:4326")

# Create bins for the DI-0.001 column
bins = [0, 0.1, 0.25, 0.4, 1]  # Define the bin ranges
labels = ['0-0.1', '0.1-0.25', '0.25-0.4', '0.4-1']  # Labels for the bins

# Assign bin values based on 'DI-0.001'
combined_gdf['DI_binned'] = pd.cut(combined_gdf['DI-0.001'], bins=bins, labels=labels, right=False)


# Interactive map with 4 bins and custom colors
combined_gdf.explore(
    column="DI_binned",  # Use the binned DI-0.001 column for coloring
    cmap='OrRd',         # Optional colormap, we are using custom colors
    stroke=True,         # Add stroke (outline) around markers
    line_color="black",  # Set the stroke color to black
    legend=True,         # Show the legend
    legend_kwds={
        'caption': 'Loss ratio (T=1000 years)',  # Legend caption
        'orientation': 'horizontal'             # Horizontal legend
    },
    marker_kwds={
        'radius': 5,   # Set marker size (adjust as needed)
        'fill': True,   # Fill the markers with color
    },
    tooltip="DI-0.001",  # Show the DI-0.001 value in the tooltip
)