In [None]:
import numpy as np
import geopandas as gpd
from shapely.geometry import box
from rasterstats import zonal_stats
import glob
import os

In [None]:

CELL_SIZE = 1000  # metres
CELL_AREA_M2 = CELL_SIZE ** 2
CELL_AREA_KM2 = CELL_AREA_M2 / 1e6

WATERCOURSE = "data/watercourse/Watercourse.shp"
ELEVATION = "data/elevation.tif"
IMPERVIOUS = "data/impervious.tif"
HISTORIC_FLOOD = "data/historic_flood_map/Historic_Flood_MapPolygon.shp"
ROAD = "data/road/RoadLink.shp"
HOSPITAL = "data/hospital_locations/hospital_locations.shp"
BOUNDARY = "GM_shapefile/CAUTH_MAY_2025_EN_BSC.shp"
FLOOD_LAYERS = "data/flood_risk/*/*.shp"

OUTPUT = "data/grid/grid.shp"

RISK_SCORES = {
    "Very Low": 0.0005, # (0 + 0.1)/2 
    "Low": 0.00055, # (0.1 + 1)/2
    "Medium": 0.0215, # (1 + 3.3)/2
    "High": 0.033 # (3.3 + 3.3)/2
}

FILE_DEPTH = {
    "merged_rofsw_4bandPolygon.shp": 0.0,
    "merged_rofsw_4band_0_2m_depthPolygon.shp": 0.2,
    "merged_rofsw_4band_0_3m_depthPolygon.shp": 0.3,
    "merged_rofsw_4band_0_6m_depthPolygon.shp": 0.6,
    "merged_rofsw_4band_0_9m_depthPolygon.shp": 0.9,
    "merged_rofsw_4band_1_2m_depthPolygon.shp": 1.2
}

BREAKS = [0, 0.2, 0.3, 0.6, 0.9, 1.2]


In [None]:
def build_grid(boundary):
    minx, miny, maxx, maxy = boundary.total_bounds

    xs = np.arange(minx, maxx, CELL_SIZE)
    ys = np.arange(miny, maxy, CELL_SIZE)

    grid = gpd.GeoDataFrame(
        geometry=[box(x, y, x + CELL_SIZE, y + CELL_SIZE) for x in xs for y in ys],
        crs="EPSG:27700"
    )

    grid = grid[grid.geometry.within(boundary.geometry.union_all())]
    grid["cell_id"] = grid.index
    return grid

def line_density(grid, lines, colname, cell_size):
    grid[colname] = 0.0
    cell_area_km2 = (cell_size * cell_size) / 1e6  # km²
    for i, cell in grid.geometry.items():
        inter = lines.geometry.intersection(cell)
        length_m = sum(geom.length for geom in inter if not geom.is_empty)
        grid.at[i, colname] = (length_m / 1000) / cell_area_km2
    return grid


def nearest_distance(grid, targets, colname, km=True):
    centroids = grid.copy()
    centroids["geometry"] = centroids.geometry.centroid

    nearest = gpd.sjoin_nearest(
        centroids,
        targets[["geometry"]],
        how="left",
        distance_col=colname
    )

    dist = nearest.groupby(nearest.index)[colname].first()
    if km:
        dist = dist / 1000

    grid[colname] = dist
    return grid


def zonal_mean(grid, raster_path, colname):
    stats = zonal_stats(
        grid.geometry,
        raster_path,
        stats="mean",
        nodata=0
    )
    grid[colname] = [s["mean"] for s in stats]
    return grid


def zonal_fraction_nonzero(grid, raster_path, colname):
    stats = zonal_stats(
        grid.geometry,
        raster_path,
        stats=["count", "nodata"],
        add_stats={"nonzero": lambda x: np.count_nonzero(x)}
    )
    grid[colname] = [
        s["nonzero"] / s["count"] if s["count"] else 0
        for s in stats
    ]
    return grid


def historic_flood_flag(grid, historic):
    inter = gpd.overlay(
        grid[["cell_id", "geometry"]],
        historic,
        how="intersection"
    )
    inter["area"] = inter.geometry.area
    flooded = inter.groupby("cell_id")["area"].sum()

    grid["historic"] = (
        flooded / CELL_AREA_M2 > 0.5
    ).reindex(grid.cell_id).fillna(False).astype(int)

    return grid


def flood_depth_probs(grid):
    grid["cell_area"] = grid.geometry.area

    def dstr(d):
        return "0p0" if d == 0 else str(d).replace(".", "p")

    # prepare output columns
    for i in range(len(BREAKS) - 1):
        a, b = BREAKS[i], BREAKS[i + 1]
        grid[f"p_{dstr(a)}_{dstr(b)}"] = 0.0

    grid["p_0"] = 0.0
    grid["p_gt_1p2"] = 0.0

    exceedance = {}

    # process flood layers
    for shp_path in glob.glob(FLOOD_LAYERS):
        layer = gpd.read_file(shp_path).to_crs(grid.crs)
        depth = FILE_DEPTH[os.path.basename(shp_path)]

        # precompute weighted area
        layer["weighted_area"] = layer.geometry.area * layer["risk_band"].map(RISK_SCORES)

        # spatial join with grid
        joined = gpd.sjoin(
            layer[["geometry", "weighted_area"]],
            grid[["cell_id", "geometry"]],
            predicate="intersects",
            how="inner"
        )

        # sum weighted areas per cell → exceedance probability
        ex = joined.groupby("cell_id")["weighted_area"].sum().div(grid.set_index("cell_id")["cell_area"])
        exceedance[depth] = ex

    # write exceedance probabilities to grid
    for depth, series in exceedance.items():
        grid[f"ex_{dstr(depth)}"] = grid["cell_id"].map(series).fillna(0.0)

    # compute depth-range probabilities
    grid["p_0"] = 1 - grid["ex_0p0"]

    for i in range(len(BREAKS) - 1):
        a, b = BREAKS[i], BREAKS[i + 1]
        grid[f"p_{dstr(a)}_{dstr(b)}"] = grid[f"ex_{dstr(a)}"] - grid[f"ex_{dstr(b)}"]

    grid["p_gt_1p2"] = grid[f"ex_{dstr(1.2)}"]

    # cleanup intermediate columns
    grid.drop(columns=[c for c in grid.columns if c.startswith("ex_")], inplace=True)

    return grid

In [None]:
# LOAD DATA
print("Loading data...")

water = gpd.read_file(WATERCOURSE).set_crs(epsg=27700, allow_override=True)
historic = gpd.read_file(HISTORIC_FLOOD).set_crs(epsg=27700, allow_override=True)
road = gpd.read_file(ROAD).set_crs(epsg=27700, allow_override=True)
hospital = gpd.read_file(HOSPITAL).set_crs(epsg=27700, allow_override=True)
boundary = gpd.read_file(BOUNDARY).set_crs(epsg=27700, allow_override=True)

In [None]:
# GRID
print("Building grid...")
grid = build_grid(boundary)

In [None]:
# LINE DENSITIES
print("Calculating watercourse density...")
grid = line_density(grid, water, "water_dens", CELL_SIZE)

print("Calculating road density...")
grid = line_density(grid, road, "road_dens", CELL_SIZE)

In [None]:
# NEAREST DISTANCES
print("Calculating nearest watercourse distance...")
grid = nearest_distance(grid, water, "water_dist", km=False)

print("Calculating nearest hospital distance...")
grid = nearest_distance(grid, hospital, "hospital")

print("Calculating nearest major road distance...")
major_roads = road[road["function"].isin(["A Road", "Motorway"])]
grid = nearest_distance(grid, major_roads, "road_dist")


In [None]:
# RASTER FEATURES
print("Calculating elevation...")
grid = zonal_mean(grid, ELEVATION, "elevation")

print("Calculating impervious fraction...")
grid = zonal_fraction_nonzero(grid, IMPERVIOUS, "impervious")

In [None]:
# HISTORIC FLOOD
print("Calculating historic flood flag...")
grid = historic_flood_flag(grid, historic)

In [None]:
# FLOOD DEPTH PROBABILITIES
print("Calculating flood depth probabilities...")
grid = flood_depth_probs(grid)

In [None]:
# SAVE OUTPUT
cols = [
    "water_dens", "water_dist", "elevation",
    "impervious", "historic", "road_dens", "road_dist", "hospital", 
    "p_0", "p_0p0_0p2", "p_0p2_0p3", "p_0p3_0p6",
    "p_0p6_0p9", "p_0p9_1p2", "p_gt_1p2",
    "geometry"
]

grid[cols].to_file(OUTPUT)