In [97]:
import rasterio
import numpy as np

In [105]:
# Set dataset paths
flood_path = r"D:\projects\sovereign-risk\Thailand\data\flood\maps\THA_global_pc_h100glob.tif"
exposure_path = r"D:\projects\sovereign-risk\Thailand\data\exposure\GHSL_res_val_THA.tif"
output_path = r"D:\projects\sovereign-risk\Thailand\analysis\flood\risk_maps\risk_test.tif"

In [106]:
flood = rasterio.open(flood_path)
exposure = rasterio.open(exposure_path)

In [113]:
def vectorized_damage(depth, value, heights, damage_percents):
    '''
    Vectorized damage function
    Apply damage function given a flood depth and exposure value.
    Function also needs as input the damage function heights > damage_percents
    '''
    # Use np.interp for vectorized linear interpolation
    damage_percentage = np.interp(depth, heights, damage_percents)
    return damage_percentage * value

def calculate_risk(flood, building_values, heights, damage_percents):
    '''
    Calculate flood risk given a flood map and a map of building values
    '''
    exposure = np.where(flood>0, building_values, 0)
    risk = vectorized_damage(flood, exposure, heights, damage_percents)

    return risk

In [114]:
heights = np.array([0, 0.5, 1, 1.5, 2, 3, 4, 5, 6])
damage_percents = np.array([0, 0.33, 0.49, 0.62, 0.72, 0.87, 0.93, 0.98, 1])

In [116]:
# Run the windowed analysis
profile = flood.meta.copy()
profile.update(dtype=rasterio.float32, compress='lzw', nodata=0)
nodata = flood.nodata

with rasterio.open(output_path, 'w', **profile) as dst:
    i = 0
    for ji, window in flood.block_windows(1):
        i += 1

        affine = rasterio.windows.transform(window, flood.transform)
        height, width = rasterio.windows.shape(window)
        bbox = rasterio.windows.bounds(window, flood.transform)

        profile.update({
            'height': height,
            'width': width,
            'affine': affine
        })

        flood_array = flood.read(1, window=window)
        exposure_array = exposure.read(1, window=window)

        flood_array = np.where(flood_array>0, flood_array, 0) # remove negative values

        flood_array = flood_array/100 # convert to m

        risk = calculate_risk(flood_array, exposure_array, heights, damage_percents)

        dst.write(risk.astype(rasterio.float32), window=window, indexes=1)

In [118]:
# Can we do zonal statistics on this???
from rasterstats import zonal_stats

In [119]:
vector_path_country = r"D:\projects\sovereign-risk\Thailand\data\GADM\gadm36_THA_shp\gadm36_THA_0.shp"
vector_path_analysis = r"D:\projects\sovereign-risk\Thailand\data\flood\basins\analysis_basins.gpkg"

In [120]:
country = gpd.read_file(vector_path_country)
basins = gpd.read_file(vector_path_analysis)

In [121]:
# Perform zonal statistics
stats_country = zonal_stats(country, output_path, stats="sum", band=1, all_touched=False) # only count pixels whos center is within the polgyon
stats_basins = zonal_stats(basins, output_path, stats="sum", band=1, all_touched=False)

In [124]:
country['Risk'] = [stat['sum'] for stat in stats_country]
basins['Risk'] = [stat['sum'] for stat in stats_basins]

In [125]:
country.to_file(r"D:\projects\sovereign-risk\Thailand\analysis\flood\risk_maps\test_country.gpkg")
basins.to_file(r"D:\projects\sovereign-risk\Thailand\analysis\flood\risk_maps\test_basins.gpkg")