In [1]:
"""
Use DBSCAN to cluster regions of coastal flooding to identify potential defence zones

https://scikit-learn.org/stable/modules/generated/sklearn.cluster.DBSCAN.html

Requires flood raster data here:
    ./data/flood_rasters/RCP85_100/JamaicaJAM001RCP852100_epsg_32618_RP_100.tif
"""

'\nUse DBSCAN to cluster regions of coastal flooding to identify potential defence zones\n\nhttps://scikit-learn.org/stable/modules/generated/sklearn.cluster.DBSCAN.html\n\nRequires flood raster data here:\n    ./data/flood_rasters/RCP85_100/JamaicaJAM001RCP852100_epsg_32618_RP_100.tif\n'

In [2]:
import os
import random

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import sklearn.cluster
import rasterio

In [22]:
def plot_clusters(X: np.ndarray, db: sklearn.cluster._dbscan.DBSCAN, extent: tuple[int], file_path) -> None:
    """
    Plot clusters from sklearn DBSCAN clustering

    Args:
        X: Data to cluster (samples, variables).
        db: Clustering object, post-fit.
        extent: Plot view window extent (xmin, ymin, xmax, ymax). Note that the origin is top-left.
        file_path: Path string to use as title and filename stem.
    """

    labels = db.labels_ 
    xmin, ymin, xmax, ymax = extent
    
    unique_labels = list(set(labels))
    random.shuffle(unique_labels)  # in place
    core_samples_mask = np.zeros_like(labels, dtype=bool)
    core_samples_mask[db.core_sample_indices_] = True
    
    colours = [plt.cm.hsv(each) for each in np.linspace(0, 1, len(unique_labels))]
    label_to_colour = dict(zip(unique_labels, colours))
    f, ax = plt.subplots(figsize=(16,10))
    label_locations = []
    for k, colour in zip(unique_labels, colours):
    
        if k == -1:
            # grey used for 'noisy' pixels
            colour = [0, 0, 0, 0.15]
        class_member_mask = labels == k
        xy = X[class_member_mask & core_samples_mask]
        ax.scatter(xy[:, 1], xy[:, 0], marker="o", facecolor=tuple(colour), edgecolor="none", s=6**2, alpha=0.5)
        xy = X[class_member_mask & ~core_samples_mask]
        ax.scatter(xy[:, 1], xy[:, 0], marker="o", facecolor=tuple(colour), edgecolor="none", s=2**2, alpha=0.2)
    
        if k != -1:
            label_locations.append((k, np.median(xy[:, 1]), np.median(xy[:, 0])))
    
    # label clusters
    cluster_pop = pd.Series(labels).value_counts().to_frame().sort_values("count", ascending=True)
    threshold_to_label = 200  # minimum number of pixels in a cluster to label it
    label_nudge = 30
    top_n = 10
    i = 0
    for label, x, y in label_locations:
        if i == top_n:
            break
        if cluster_pop.loc[label, "count"] > threshold_to_label:    
            i += 1
            text = ax.text(x + label_nudge, y - label_nudge, label, c=label_to_colour[label])
            text.set_bbox(dict(facecolor='white', alpha=0.8, edgecolor='black'))
    
    ax.set_xlim(xmin, xmax)
    ax.set_ylim(ymin, ymax)
    ax.set_xticklabels([])
    ax.set_yticklabels([])
    ax.set_facecolor('w')
    ax.invert_yaxis()  # match the imshow plots which have the origin upper left
    
    left, bottom, width, height = [0.74, 0.71, 0.2, 0.2]
    inset_ax = f.add_axes([left, bottom, width, height])
    cluster_pop = cluster_pop[cluster_pop.index != -1]
    cluster_pop["colour"] = cluster_pop.index.map(label_to_colour)
    pop_to_plot = cluster_pop.iloc[-top_n:]
    pop_to_plot["count"].plot.barh(ax=inset_ax, color=pop_to_plot["colour"])
    inset_ax.set_title("Largest clusters")
    inset_ax.set_ylabel("Label")
    inset_ax.set_xlabel("Number of pixels")
    
    ax.set_title(file_path.replace("/", "\n").replace("__", ", "))
    f.savefig(f"{file_path}.png")
    plt.close(f)

In [23]:
plt.style.use("bmh")

# load data from disk and create arrays of i and j indices
scenario = "RCP85_2100"
return_period = 100
raster_path = f"data/flood_rasters/{scenario}/JamaicaJAM001RCP852100_epsg_32618_RP_{return_period}.tif"
raster = rasterio.open(raster_path)
depth_m: np.ndarray = raster.read(1)
i, j = np.indices(depth_m.shape)

# plotting
ymax, xmax = depth_m.shape
plot_extent = (0, 0, xmax, ymax)
os.makedirs(f"plots/clustering/{scenario}/", exist_ok=True)

for threshold_m in [0.05, 0.1, 0.2, 0.5, 1]:
    
    df = pd.DataFrame(data={"i": i.ravel(), "j": j.ravel(), "depth_m": depth_m.ravel()})
    
    # drop pixels with a score below threshold
    df = df[df["depth_m"] > threshold_m]
    
    # clustering distance parameter
    for eps in [1, 2, 5, 10, 20, 50]:
        
        # create DBSCAN object
        db = sklearn.cluster.DBSCAN(eps=eps, min_samples=5)
        # identify clusters
        X = df.loc[:, ["i", "j"]].to_numpy()
        db.fit(X)
    
        labels = db.labels_
        n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
        if n_clusters < 2:
            continue  # don't plot if we have one megacluster
        
        plot_clusters(X, db, plot_extent, f"plots/clustering/{scenario}/RP_{return_period}__eps_{eps}__thresh_{threshold_m}")

  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)
  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)
  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)
posx and posy should be finite values
posx and posy should be finite values
posx and posy should be finite values
  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)
posx and posy should be finite values
posx and posy should be finite values
posx and posy should be finite values
posx and posy should be finite values
posx and posy should be finite values
posx and posy should be finite values
posx and posy should be finite values
  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)
posx and posy should be finite values
posx and posy should be finite values
posx and posy should be finite values
posx and posy should be finite values
posx and posy should be fini