This notebook allows you to calculate the buildings exposed to coastal flooding.  To do this, the user enters the link to their flood map raster as well as the modeling conditions (RP, year, SSP, and defense level).

# Activation of the necessary functions

In [104]:
import pandas as pd
import geopandas as gpd
import numpy as np
import rasterio    
import sys
sys.path.append('/path/to/gdal')  
import gc
from shapely.geometry import shape
gc.collect()
from shapely.validation import make_valid
import gcsfs
from rasterio import features
from exactextract import exact_extract
import warnings
import requests
warnings.filterwarnings("ignore")
import tempfile
from rasterio.enums import Resampling
from rasterio.warp import calculate_default_transform, reproject, Resampling
from branca.colormap import LinearColormap


# information on the modeling used

## Enter the parameters

In [None]:
pathflood= # Path to the coastal flooding raster file, The file must be in .tif format (GeoTIFF).
year = "2010"# The possible values are 2010 , 2030, 2050, 2100
rp="100"# The possible values are static, 1, 100 or 1000
defense="UNDEFENDED" # The possible values are HIGH_DEFENDED, LOW_DEFENDED or UNDEFENDED
ssp="SSP126" # The possible values are SSP126, SSP245 or SSP585



In [106]:
def validate_inputs(year, rp, defense, ssp):
    valid_years = {"2010", "2030", "2050", "2100"}
    valid_rps = {"static", "1", "100", "1000"}
    valid_defenses = {"HIGH_DEFENDED", "LOW_DEFENDED", "UNDEFENDED"}
    valid_ssps = {"SSP126", "SSP245", "SSP585"}
    errors = []
    if year not in valid_years:
        errors.append(f"Invalid year: {year}. Must be one of {sorted(valid_years)}.")
    if rp not in valid_rps:
        errors.append(f"Invalid return period (rp): {rp}. Must be one of {sorted(valid_rps)}.")
    if defense not in valid_defenses:
        errors.append(f"Invalid defense level: {defense}. Must be one of {sorted(valid_defenses)}.")
    if ssp not in valid_ssps:
        errors.append(f"Invalid SSP scenario: {ssp}. Must be one of {sorted(valid_ssps)}.")
    if errors:
        raise ValueError("\n".join(errors))

validate_inputs(year, rp, defense, ssp)



# CoCliCo connection and data recovery

In [107]:
defense=defense+"_MAPS"

path="https://storage.googleapis.com/coclico-data-public/coclico/be_stats/map_stats/"+defense+"/"+rp+"/"+ssp+"/be_stats_"+defense+"_"+rp+"_"+ssp+"_"+year+".parquet"
local_path = "CoCliCo_data/number.parquet"
response = requests.get(path)
with open(local_path, "wb") as f:
    f.write(response.content)  
column="abs_affected"       
path = "gs://coclico-data-public/coclico/LAU/LAU_2020_NUTS_2021_01M_3035.parquet"
local_path = "CoCliCo_data/LAU.parquet"
fs = gcsfs.GCSFileSystem()
fs.get(path, local_path)


[None]

## list of functions

In [108]:
def modify_raster(input_raster_path): 
    with rasterio.open(input_raster_path) as src:
        raster_data = src.read(1) 
        nodata_value = src.nodata
        modified_data = np.full(raster_data.shape, np.nan, dtype=rasterio.float32)
        if nodata_value is not None:
            valid_mask = raster_data != nodata_value
            modified_data[valid_mask] = 1
        else:
            valid_mask = ~np.isnan(raster_data)
            modified_data[valid_mask] = 1
        return modified_data


def raster_value_to_shapefile(raster_path, target_value=1):
    with rasterio.open(raster_path) as src:
        raster_data = src.read(1)
        transform = src.transform
        crs = src.crs
        nodata = src.nodata
        shapes_and_values = rasterio.features.shapes(raster_data, transform=transform)
        geometries = []
        for geom, value in shapes_and_values:
            if value == target_value:  
                geometries.append({
                    'geometry': shape(geom),
                    'properties': {'value': value}
                })
        
        gdf = gpd.GeoDataFrame(geometries, crs=crs)
        return gdf
    
def reproject_raster(input_raster, target_crs="EPSG:3035"):
    with rasterio.open(input_raster) as src:
        transform, width, height = calculate_default_transform(
            src.crs, target_crs, src.width, src.height, *src.bounds)
        kwargs = src.meta.copy()
        kwargs.update({
            'crs': target_crs,
            'transform': transform,
            'width': width,
            'height': height
        })
        output_raster = input_raster.replace(".tif", "_3035.tif")
        with rasterio.open(output_raster, 'w', **kwargs) as dst:
            for i in range(1, src.count + 1):
                reproject(
                    source=rasterio.band(src, i),
                    destination=rasterio.band(dst, i),
                    src_transform=src.transform,
                    src_crs=src.crs,
                    dst_transform=transform,
                    dst_crs=target_crs,
                    resampling=Resampling.nearest
                )
        return output_raster


def save_modified_raster(modified_data, transform, crs, output_path):
    with rasterio.open(output_path, 'w', driver='GTiff', 
                       height=modified_data.shape[0], width=modified_data.shape[1],
                       count=1, dtype=modified_data.dtype, crs=crs, transform=transform) as dst:
        dst.write(modified_data, 1)



## link and download of files

### the data is downloaded and stored in the CoCliCo data folder, (infrastructure, geographical unit and CoCliCo data)

opening files, converting flood maps to vector and detecting the region in which it is located

In [109]:
unitgeo="CoCliCo_data/LAU.parquet"
lau=gpd.read_parquet(unitgeo)
pathflood= reproject_raster(pathflood)
flood = modify_raster(pathflood)#
with tempfile.NamedTemporaryFile(delete=False, suffix=".tif") as tmpfile:
    temp_raster_path = tmpfile.name
    with rasterio.open(pathflood) as src:
        save_modified_raster(flood, src.transform, src.crs, temp_raster_path) 
    floodshp = raster_value_to_shapefile(temp_raster_path)
intersection= gpd.overlay(lau, floodshp, how="intersection")
unique_values = intersection['nuts_2'].unique()


## download infrastructure

In [110]:
for i in range (len(unique_values)):
    path = "gs://coclico-data-public/coclico/ceed/"
    path=path+unique_values[i]+"_CEED.parquet"
    local_path = "CoCliCo_data/CEED/"+unique_values[i]+"_CEED.parquet"
    fs = gcsfs.GCSFileSystem()
    fs.get(path, local_path)

Calculation of the numbers of buildings exposed

In [111]:
stock=gpd.GeoDataFrame()
for i in range (len(unique_values)):
    local_path = "CoCliCo_data/CEED/"+unique_values[i]+"_CEED.parquet"
    exposure=gpd.read_parquet(local_path)
    exposure = exposure.drop_duplicates(subset=['geometry'])
    exposure.reset_index(inplace=True)               
    exposure=exposure[exposure["level_0"]=="buildings"]
    exposure = exposure.drop_duplicates(subset=['geometry'])
    exposure["areacal"]=exposure.area
    if 'level_0' not in exposure.columns:
        exposure.reset_index(drop=False, inplace=True)
    if 'level_0' in exposure.index.names:
        exposure['level_0'] = exposure.index.get_level_values('level_0')
        exposure.reset_index(drop=True, inplace=True) 
    exposure.drop(columns=['level_0'], inplace=True)
    exposure=gpd.sjoin(exposure,intersection,how="inner",predicate="intersects")# only building on flood
    exposure['geometry'] = exposure['geometry'].apply(lambda geom: make_valid(geom) if not geom.is_valid else geom)
    exposure = exposure[exposure['geometry'].is_valid]
    stock=gpd.GeoDataFrame(pd.concat([stock, exposure], ignore_index=True))
stock = stock.drop_duplicates(subset=['geometry'])
stock=stock.groupby('GISCO_ID').size().reset_index(name='nb')   
stock=stock[["GISCO_ID","nb"]]        
      


compare the results with those of the platform

In [112]:

coclico=gpd.read_parquet("CoCliCo_data/number.parquet")
merged = pd.merge(stock, coclico[["GISCO_ID", column]], on="GISCO_ID", how='left')
merged=merged.rename(columns={"abs_affected":"coclico"})
print(merged)


join_column_gdf = "GISCO_ID"
join_column_df ="GISCO_ID"
merged=lau.merge(merged,left_on=join_column_gdf, right_on=join_column_df, how="inner")

vmin =merged["nb"].min()
vmax = merged["nb"].max()
steps = np.linspace(vmin, vmax, 5)
colormap = LinearColormap(
    colors=["white", "red"],
    vmin=vmin,
    vmax=vmax
).to_step(index=steps)
merged.explore(column="nb",
    cmap=colormap,
    legend=True,
    legend_kwds={"position": "bottomright", "orientation": "vertical"},

    tooltip=["LAU_NAME","GISCO_ID", "nb", "coclico"], 
    style_kwds={"fillOpacity": 1.0, "color": "black", "weight": 0.5})


   GISCO_ID     nb  coclico
0  FR_76217  11023    565.0
1  FR_76414    360     27.0
2  FR_76545    374     89.0
3  FR_76565      1      NaN
