# Flood mapping with dynamic harmonic parameters

In [None]:
import os

import hvplot.xarray  # noqa
import numpy as np
import pystac_client
import xarray as xr
from dask.distributed import Client, wait
from odc import stac as odc_stac

In notebook 3, we could quickly get flooded extents because we had a precomputed set of harmonic parameters available to us. However, if this were not available (e.g. flood mapping on a different SAR product), we would need to compute those on-the-fly.

The workflow looks largely the same, starting with spinning up a Dask client and setting up chunking.


In [None]:
client = Client(processes=False, threads_per_worker=2, n_workers=3, memory_limit="12GB")
client

In [None]:
chunks = {"time": 1, "latitude": 1300, "longitude": 1300}

## Cube Definitions

The following generic specifications are used for presenting the data.


In [None]:
# Coordinate Reference System - World Geodetic System 1984 (WGS84) in this case
crs = "EPSG:4326"
res = 0.00018  # 20 meter in degree

## Northern Germany Flood

We will again use the study area around Zingst, Germany during Storm Babet  [Wikipedia](https://en.wikipedia.org/wiki/Storm_Babet).

However, this time we will need a lot of extra data. At TU Wien we typically use about three years of data to generate robust harmonic parameters. To make sure we have the most up-to-date parameters possible, we'll get Sentinel-1 SIG0 data from the EODC STAC Catalogue for the three years prior to Storm Babet.


In [None]:
parameters_time_range = "2020-10-25/2023-10-25"
flood_time_range = "2023-10-11/2023-10-25"
minlon, maxlon = 12.3, 13.1
minlat, maxlat = 54.3, 54.6
bounding_box = [minlon, minlat, maxlon, maxlat]

In [None]:
eodc_catalog = pystac_client.Client.open("https://stac.eodc.eu/api/v1")
search = eodc_catalog.search(
    collections="SENTINEL1_SIG0_20M",
    bbox=bounding_box,
    datetime=parameters_time_range,
)

items_sig0 = search.item_collection()
items_sig0

Define helper functions as before:

In [None]:
def extract_orbit_names(items):
    return np.array(
        [
            items[i].properties["sat:orbit_state"][0].upper()
            + str(items[i].properties["sat:relative_orbit"])
            for i in range(len(items))
        ]
    )

def post_process_eodc_cube(dc: xr.Dataset, items, bands):
    if not isinstance(bands, tuple):
        bands = tuple([bands])
    for i in bands:
        dc[i] = post_process_eodc_cube_(
            dc[i], items, i
        )  # https://github.com/TUW-GEO/dask-flood-mapper.git
    return dc


def post_process_eodc_cube_(dc: xr.Dataset, items, band):
    scale = items[0].assets[band].extra_fields.get("raster:bands")[0]["scale"]
    nodata = items[0].assets[band].extra_fields.get("raster:bands")[0]["nodata"]
    return dc.where(dc != nodata) / scale

Lazily load data for VV polarization:

In [None]:
bands = "VV"
sig0_dc = odc_stac.load(
    items_sig0,
    bands=bands,
    crs=crs,
    chunks=chunks,
    resolution=res,
    bbox=bounding_box,
    resampling="bilinear",
    groupby=None,
)

Rescaling, filling nodata values with np.nan, and adding orbit names, with the helper functions:


In [None]:
sig0_dc = (
    post_process_eodc_cube(sig0_dc, items_sig0, bands)
    .rename_vars({"VV": "sig0"})
    .assign_coords(orbit=("time", extract_orbit_names(items_sig0)))
    .dropna(dim="time", how="all")
    .sortby("time")
)

Converting time dimension to a relative orbits dimension:


In [None]:
__, indices = np.unique(sig0_dc.time, return_index=True)
indices.sort()
orbit_sig0 = sig0_dc.orbit[indices].data
sig0_dc = sig0_dc.groupby("time").mean(skipna=True)
sig0_dc = sig0_dc.assign_coords(orbit=("time", orbit_sig0))
sig0_dc = sig0_dc.persist()
wait(sig0_dc)
sig0_dc

## Harmonic Parameters

The so-called likelihoods of $P(\sigma^0|flood)$ and $P(\sigma^0|nonflood)$ can be calculated from past backscattering information. To be able to this we can model the expected variations in land back scattering based on seasonal changes in vegetation. The procedure is similar to the backscattering routine.

Now, instead of just loading the result of this calculation, we'll do it ourselves.


In [None]:
from dask_flood_mapper.processing import reduce_to_harmonic_parameters

We calculate parameters for each relative orbit and then combine them into a single dataset.

To do that, we group our $\sigma^0$ dataset by its `"orbit"` variable, and for each group, map the included `reduce_to_harmonic_parameters` function over the spatial chunks of the sub-dataset.


In [None]:
harm_pars_list = []
for orbit, orbit_ds in sig0_dc.groupby("orbit"):
    orbit_ds = orbit_ds.chunk({"time": -1}).persist()
    wait(orbit_ds)
    dtimes = orbit_ds["time.dayofyear"].compute()
    harm_pars = xr.map_blocks(func=reduce_to_harmonic_parameters,
                              obj=orbit_ds["sig0"],
                              kwargs={"dtimes": dtimes,
                                      "k": 3,
                                      "x_var_name": "longitude",
                                      "y_var_name": "latitude"}).persist()
    harm_pars_list.append((orbit, harm_pars))
wait(harm_pars_list)

Now we have a dataset of parameters for each orbit, and concatenate them into a single dataset along a new `"orbit"` dimension with corresponding coordinates. We also nan-out all pixels where there are fewer than 32 observations underlying the fit (fewer than this tends to give a bad fit).

In [None]:
hpar_dc = xr.concat([harm_pars[1] for harm_pars in harm_pars_list], dim="orbit")
hpar_dc["orbit"] = [harm_pars[0] for harm_pars in harm_pars_list]
hpar_dc = hpar_dc.where(hpar_dc.sel(param="NOBS") >= 32).drop_sel(param="NOBS")
hpar_dc

A very important step here is to now filter the sigma-nought data down to the period of interest for flood mapping. We then grab the ID of the relative orbit corresponding to each time step.

In [None]:
sig0_dc = sig0_dc.sel(time=slice(*(flood_time_range.split("/"))))
orbit_sig0 = sig0_dc.orbit.data

We expand the harmonic parameters dataset along the orbits of sigma nought to be able to calculate the correct land reference backscatter signatures.


In [None]:
hpar_dc = hpar_dc.to_dataset(dim="param").sel(orbit=orbit_sig0)
hpar_dc = hpar_dc.persist()
wait(hpar_dc)
hpar_dc

The rest of the workflow proceeds exactly as in notebook 3.

## Local Incidence Angles

In [None]:
search = eodc_catalog.search(collections="SENTINEL1_MPLIA", bbox=bounding_box)

items_plia = search.item_collection()

In [None]:
bands = "MPLIA"
plia_dc = odc_stac.load(
    items_plia,
    bands=bands,
    crs=crs,
    chunks=chunks,
    resolution=res,
    bbox=bounding_box,
    groupby=None,
)

plia_dc = post_process_eodc_cube(plia_dc, items_plia, bands).rename({"time": "orbit"})
plia_dc["orbit"] = extract_orbit_names(items_plia)
plia_dc = plia_dc.groupby("orbit").mean(skipna=True)

In [None]:
plia_dc = plia_dc.sel(orbit=orbit_sig0)
plia_dc = plia_dc.persist()
wait(plia_dc)
plia_dc

## ESA World Cover from Terrascope

In [None]:
os.environ["AWS_NO_SIGN_REQUEST"] = "YES"
wcover_catalog = pystac_client.Client.open("https://services.terrascope.be/stac/")

In [None]:
search = wcover_catalog.search(
    collections="urn:eop:VITO:ESA_WorldCover_10m_2021_AWS_V2", bbox=bounding_box
)

items_wcover = search.item_collection()

In [None]:
wcover_dc = (
    odc_stac.load(
        items_wcover,
        crs=crs,
        chunks=chunks,
        resolution=res,
        bbox=bounding_box,
    )
    .squeeze("time")
    .drop_vars("time")
    .rename_vars({"ESA_WORLDCOVER_10M_MAP": "wcover"})
)
wcover_dc = wcover_dc.persist()
wait(wcover_dc)
wcover_dc

## Fuse cube

In [None]:
flood_dc = xr.merge([sig0_dc, plia_dc, hpar_dc, wcover_dc])
flood_dc = flood_dc.where(flood_dc.wcover != 80)
flood_dc = (
    flood_dc.reset_index("orbit", drop=True)
    .rename({"orbit": "time"})
    .dropna(dim="time", how="all", subset=["sig0"])
)
flood_dc = flood_dc.persist()
wait(flood_dc)
flood_dc

## Likelihoods

Now we are ready to calculate the likelihoods of micorwave backscattering given flooding (or non flooding).

### Water


In [None]:
def calc_water_likelihood(dc):
    return dc.MPLIA * -0.394181 + -4.142015

In [None]:
flood_dc["wbsc"] = calc_water_likelihood(flood_dc)

### Land

In [None]:
def harmonic_expected_backscatter(dc):
    w = np.pi * 2 / 365

    t = dc.time.dt.dayofyear
    wt = w * t

    M0 = dc.M0
    S1 = dc.S1
    S2 = dc.S2
    S3 = dc.S3
    C1 = dc.C1
    C2 = dc.C2
    C3 = dc.C3
    hm_c1 = (M0 + S1 * np.sin(wt)) + (C1 * np.cos(wt))
    hm_c2 = (hm_c1 + S2 * np.sin(2 * wt)) + C2 * np.cos(2 * wt)
    hm_c3 = (hm_c2 + S3 * np.sin(3 * wt)) + C3 * np.cos(3 * wt)
    return hm_c3

In [None]:
flood_dc["hbsc"] = harmonic_expected_backscatter(flood_dc)

## Flood mapping

Having calculated the likelihoods, we can now move on to calculate the probability of (non-)flooding given a pixel's $\sigma^0$. These so-called *posteriors* need one more piece of information, as can be seen in the equation above. We need the probability that a pixel is flooded $P(F)$ or not flooded $P(NF)$. Of course, these are the figures we've been trying to find this whole time. We don't actually have them yet, so what can we do? In Bayesian statistics, we can just start with our best guess. These guesses are called our "priors", because they are the beliefs we hold *prior* to looking at the data. This subjective prior belief is the foundation Bayesian statistics, and we use the likelihoods we just calculated to update our belief in this particular hypothesis. This updated belief is called the "posterior".

Let's say that our best estimate for the chance of flooding versus non-flooding of a pixel is 50-50: a coin flip.  We now can also calculate the probability of backscattering $P(\sigma^0)$, as the weighted average of the water and land likelihoods, ensuring that our posteriors range between 0 to 1.

The following code block shows how we calculate the priors which allow use to predict whether it is likely if a land pixel became flooded.


In [None]:
def bayesian_flood_decision(dc):
    nf_std = 2.754041
    sig0 = dc.sig0
    std = dc.STD
    wbsc = dc.wbsc
    hbsc = dc.hbsc

    f_prob = (1.0 / (std * np.sqrt(2 * np.pi))) * np.exp(
        -0.5 * (((sig0 - wbsc) / nf_std) ** 2)
    )
    nf_prob = (1.0 / (nf_std * np.sqrt(2 * np.pi))) * np.exp(
        -0.5 * (((sig0 - hbsc) / nf_std) ** 2)
    )

    evidence = (nf_prob * 0.5) + (f_prob * 0.5)
    nf_post_prob = (nf_prob * 0.5) / evidence
    f_post_prob = (f_prob * 0.5) / evidence
    decision = xr.where(
        np.isnan(f_post_prob) | np.isnan(nf_post_prob),
        np.nan,
        np.greater(f_post_prob, nf_post_prob),
    )
    return nf_post_prob, f_post_prob, decision

In [None]:
flood_dc[["nf_post_prob", "f_post_prob", "decision"]] = bayesian_flood_decision(
    flood_dc
)

## Postprocessing

We continue by improving our flood map by filtering out observations that we expect to have low sensitivity to flooding based on a predefined set of criteria.

These criteria include:
* Masking of Exceeding Incidence Angles
* Identification of Conflicting Distributions
* Removal of Measurement Outliers
* Denial of High Uncertainty on Decision


In [None]:
def post_processing(dc):
    dc = dc * np.logical_and(dc.MPLIA >= 27, dc.MPLIA <= 48)
    dc = dc * (dc.hbsc > (dc.wbsc + 0.5 * 2.754041))
    land_bsc_lower = dc.hbsc - 3 * dc.STD
    land_bsc_upper = dc.hbsc + 3 * dc.STD
    water_bsc_upper = dc.wbsc + 3 * 2.754041
    mask_land_outliers = np.logical_and(
        dc.sig0 > land_bsc_lower, dc.sig0 < land_bsc_upper
    )
    mask_water_outliers = dc.sig0 < water_bsc_upper
    dc = dc * (mask_land_outliers | mask_water_outliers)
    return (dc * (dc.f_post_prob > 0.8)).decision

In [None]:
flood_output = post_processing(flood_dc)

## Removal of Speckles

The following step is designed to further improve the clarity of the floodmaps. These filters do not directly relate to prior knowledge on backscattering, but consists of contextual evidence that supports, or oppose, a flood classification. This mainly targets so-called speckles. These speckles are areas of one or a few pixels, and which are likely the result of the diversity of scattering surfaces at a sub-pixel level. In this approach it is argued that small, solitary flood surfaces are unlikely. Hence speckles are removed by applying a smoothing filter which consists of a rolling window median along the x and y-axis simultaneously.


In [None]:
flood_output = (
    flood_output.rolling({"longitude": 5, "latitude": 5}, center=True)
    .median(skipna=True)
    .persist()
)
wait(flood_output)
flood_output

## Results

In the following graphic we superimpose the data on a map and we can move the slider to see which areas become flooded over time.

In [None]:
flood_output.hvplot.image(
    x="longitude",
    y="latitude",
    rasterize=True,
    geo=True,
    tiles=True,
    project=True,
    cmap=["rgba(0, 0, 1, 0.1)", "darkred"],
    cticks=[(0, "non-flood"), (1, "flood")],
    frame_height=400,
)

In [None]:
client.close()