# Aqua Monitor using OpenEO

This notebook implements the [Aqua Monitor](https://aqua-monitor.appspot.com/) algorithm using [OpenEO](https://openeo.org/), an open source processing platfrom for earth observation data.

The goal of aquamonitor is to provide a planetary-scale view on changes in land and water occurrence over time. In other words, it shows how land has changed to surface water and vice-versa. Applications like this give insight in both man-made interventions, as well as changes due to natural variability and climate change.

In this notebook, a global view (although you could put it in the input parameters) is not the goal. For a given area of interest (aoi), the notebook builds a set of instructions using the [OpenEO python client library](https://open-eo.github.io/openeo-python-client/) and sends it to the [OpenEO Platfrom](https://openeo.cloud/) backend for processing. Then the results from the analysis are downloaded and interactively plotted.

## Imports
This notebook has a few dependencies. The main ones being the OpenEO python client library. We also use xarray to analyse downloaded datacubes.

In [None]:
! pip install --upgrade "shapely<1.8.0" "openeo==0.11.0" geoviews bokeh

In [None]:
# imports
import math
from datetime import datetime

from dateutil.relativedelta import relativedelta
from openeo import connect, Connection
from openeo.rest.datacube import DataCube
from typing import List, Dict, Tuple, Union
from pathlib import Path
import xarray as xr

from utils import get_files_from_dc

In [None]:
# Visualisation imports
import cartopy.crs as ccrs

import matplotlib.pyplot as plt
import geoviews as gv
import holoviews as hv
import numpy as np

from holoviews import opts, streams
from holoviews.element.tiles import OSM

gv.extension("bokeh","matplotlib")

In [None]:
%matplotlib inline

## Connect to the backend
OpenEO features a generic API that can be implemented using many different backends using different implementations. To use the a certain backend, you need to use a backend url and provide credentials. Here we log in with one of those backends using OIDC, using egi as a provider.

In [None]:
# Connect to VITO backend
vito_url: str = "https://openeo.vito.be/openeo/1.0"
openeo_platform_url: str = "openeo.cloud"

backend_url: str = vito_url
con: Connection = connect(backend_url)
con.authenticate_oidc(provider_id="egi")

out_dir = Path("output")
out_dir.mkdir(parents=True, exist_ok=True)

local_cache_file: Path = out_dir / "job_cache.json"
debug = True

collection_id = "TERRASCOPE_S2_TOC_V2"

## Explore collections and processes
You can interactively explore different collections in the notebook using the `Connection` object.

In [None]:
# con.list_collections()
con.describe_collection(collection_id)

You can also list the processes available in the backend use the `list_processes` method

In [None]:
# con.list_processes()

# Load the available imagery
Processing a `DataCube` starts with loading a collection or multiple collections that the backend has available.
Here we download just the Sentinel2 imagery, filtering on the spatial and temporal extent that we are interested in. As processing for large areas takes time, we calculate for smaller areas if we are debugging or showing off the script (using `debug = True` ).
As the backend that we use loads the data in UTM projection, we calculate the UTM zone number based on the bounding box, so we can plot the results on a map later on.
Finally, we also specify `plot_resolution` and `smoothing_resolution` here. `plot_resolution` is used later on. This is mainly relevant when calculating for larger areas, as plotting too many pixels is not useful and very tough on the local hardware. `smoothing_resolution` is explained in the next step.

In [None]:
def get_utm_zone(lon: float) -> int:
    return math.ceil((180 + lon) / 6)

# Define constraints for loading
embalse_de_barbate_bbox: Dict[str, Union[float, str]] = {
    "west": -5.707740783691406,
    "east": -5.6764984130859375,
    "south": 36.40221798067486,
    "north": 36.4315034892636,
    "crs": "EPSG:4326"
}
sw_spain_bbox: Dict[str, Union[float, str]] = {
    "west": -7.682155041704681,
    "east": -5.083888440142181,
    "south": 36.18203953636458,
    "north": 38.620982842287496,
    "crs": "EPSG:4326"
}

if debug:
    spatial_extent: Dict[str, Union[float, str]] = embalse_de_barbate_bbox
    utm_zone = get_utm_zone(embalse_de_barbate_bbox["west"])
else:
    spatial_extent: Dict[str, Union[float, str]] = sw_spain_bbox
    utm_zone = get_utm_zone(sw_spain_bbox["west"])

smoothing_resolution: float = 30.0  # meters
if debug:
    plot_resolution: float = 10.0 # meters
else:
    plot_resolution: float = 1000.0  # meters

if debug:
    t_end = datetime.utcnow()
    t_start = t_end - relativedelta(years=2)
    temporal_extent: List[str] = [str(t_start), str(t_end)]
else:
    temporal_extent: List[str] = ["2019-01-01", "2022-01-01"]

band_names = ["swir", "nir", "green"]
bands = ["B11", "B08", "B03"]
        
dc: DataCube = con.load_collection(
    collection_id=collection_id,
    spatial_extent=spatial_extent,
    temporal_extent=temporal_extent,
    bands=bands
    ).add_dimension(name="source_name", label=collection_id, type="other") \
    .rename_labels(dimension="bands", source=bands, target=band_names)

### Download Source Data
To explore the source data, it is possible to download this data. There are two ways of doing this. Synchronously with the `.download` method on the `DataCube` object, or asynchronously by starting a batch job. For production or large jobs, it is better to do this using a batch job, as synchronous jobs are tied to a timeout. The processing speed of a synchronous job also depends on the load on the server, while batch jobs are queued up and always executed when there is space on the server.

In [None]:
# Synchronous download
# dc.download(out_dir / "raw.nc", format="netcdf")

In [None]:
# Asynchronous download

from utils import get_files_from_dc
files = get_files_from_dc(
    dc,
    out_directory=out_dir / "raw",
    job_name="get_collection",
    local_cache_file=out_dir / "job_cache.json",
    recalculate=False
)

In [None]:
data_set_raw: xr.Dataset = xr.open_dataset(out_dir / "raw" / "openEO.nc")

In [None]:
data_set_raw

### Visualize Raw data

Here we use [Open Street Map](https://www.openstreetmap.org) as well as the [holoviews](https://holoviews.org/) package, and its extension, [geoviews](https://geoviews.org/). Other standard tools such as bokeh (plotting backend) and cartopy (crs transforms) are used here as well.
In other plots, we use matplotlib functionality.

In [None]:
kdims = ["x", "y", "t"]
vdims = ["swir", "nir", "green"]

data_set_raw_fixed = data_set_raw.drop("crs")
hv.Dimension.type_formatters[np.datetime64] = '%Y-%m-%d'  # readable time format
gv_raw = gv.Dataset(data_set_raw_fixed, kdims=kdims, vdims=vdims, crs=ccrs.UTM(utm_zone)).redim(x="lon", y="lat")

print(repr(gv_raw))

In [None]:
dmap = gv_raw.to(gv.Image, ["lon", "lat"], "green", group="raw_data", label="raw", datatype=["xarray"], dynamic=True)
overlay = OSM() * dmap
overlay.opts(
    opts.Image(cmap="turbo", colorbar=True, clim=(0, 7000), height=500, width=500, tools=["hover"]),
    opts.Tiles(height=500, width=500))

overlay

# Smoothen images
In order for the algorithm to function better, images are slightly smoothed spatially, using the `.resample_spatial` method. Note that the spatial resolution of the Sentinel-2 RGB bands is 10m.

In [None]:
smoothed_dc = dc.resample_spatial(method="cubic", resolution=smoothing_resolution)

## Divide the data into time-bands
Filtering clouds and other atmospheric effects is notoriously difficult, especially if the algorithm has to work globally. Luckily, we are interested on long-term changes, which means that we can make use of composite images that use multiple years of data to achieve a good cloud-free image. Here we assume that over the time period chosen, changes such as seasonal effects do not skew our composite image by a large degree.
The smoothed input data is divided in bands of a number of years (`year_interval`). The data is sorted and the n-th percentile of every time-band (`percentile`) is used for further analysis. The percentile is based on the MODIS cloud cover percentage dataset.

In [None]:
from pathlib import Path
from datetime import timedelta
import pandas as pd

year_interval: int = 1
percentile: float = 0.2  # fractions [0 - 1]
bucketed_netcdf_filename: str = "bucketed.nc"

if debug:
    dr: pd.DatetimeIndex = pd.date_range(start=temporal_extent[0], end=temporal_extent[1], periods=3)
else:
    dr: pd.DatetimeIndex = pd.date_range(start=temporal_extent[0], end=temporal_extent[1], freq=f"{year_interval}YS")

In [None]:
from openeo import processes

if debug:
    image_count_limit: int = 5  # Will exclude some images
else:
    image_count_limit: int = 10

t_intervals = [[str(d), str(dr[i+1])] for i, d in enumerate(dr[:-1])]

# Create bucketed DC based on percentile of images
t_bucketed_dc: DataCube = smoothed_dc \
    .aggregate_temporal(
        intervals=t_intervals,
        reducer=lambda data: processes.quantiles(data, probabilities=[percentile]),
        labels=[t_int[0] for t_int in t_intervals]
    )

In [None]:
t_intervals

## Create a cache in the OpenEO backend
Because we are severely cutting down on the size of the total dataset in the previous timebanding step, caching the data here helps quick development and saves many cloud resources.

In [None]:
from utils import get_or_create_cached_cube
if not debug:
    t_bucketed_dc = get_or_create_cached_cube(
        dc=t_bucketed_dc,
        local_cache_file=local_cache_file,
        temporal_extent=tuple(temporal_extent),
        spatial_extent={
            "x": (spatial_extent["west"], spatial_extent["east"]),
            "y": (spatial_extent["north"], spatial_extent["south"])
        },
        job_name="t_bucketing_aquamonitor",
        result_format="gtiff"
    )

## Count the number of images included in each interval
To check whether there are enough images available for a meaningful composite image, we filter for a certain number of images present in each timestep.

In [None]:
# Workaround for now is to set images to 1 until count works
count_dc: DataCube = smoothed_dc.band("green") \
    .apply(lambda x: x * 0 + 1) \
    .aggregate_temporal(
        intervals=t_intervals,
        reducer=lambda data: processes.sum(data),
        labels=[t_int[0] for t_int in t_intervals]
    )

## Create the time-bucketed NDWI cube
To detect surface-to-water changes and vice-versa, we rely on the NDWI: $$NDWI = \frac{\rho_{green} - \rho_{nir}}{\rho_{green} + \rho_{nir}}$$
and on the MNDWI: $$MNDWI = \frac{\rho_{green} - \rho_{swir}}{\rho_{green} + \rho_{swir}}$$

In [None]:
# Create products
from typing import Tuple

green: DataCube = t_bucketed_dc.band("green")
nir: DataCube = t_bucketed_dc.band("nir")
swir: DataCube = t_bucketed_dc.band("swir")
ndwi: DataCube = (green - nir) / (green + nir)
mndwi: DataCube = (green - swir) / (green + swir)

# Rescale
def rescale(dc: DataCube, factors: Tuple[float]) -> DataCube:
    return dc.subtract(factors[0]).divide(factors[1] - factors[0])

rescaled_mndwi = rescale(mndwi, factors=(-0.6, 0.6))

### Download the MNDWI results
For the interactive plot later in the notebook, we need to get the MNDWI results in as a Cloud Optimized GeoTiff. We can accomplish this by saving to the geotiff format and using a batch format. For now, the results of a batch operation stay reachable in the openEO backend for 3 months.
Using the Cloud Optimized Geotiff, we can query coordinates in the GeoTiff without downloading it, or opening it fully. This is further explained at: https://www.cogeo.org/

In [None]:
from utils import get_urls_from_dc
mndwi_urls = get_urls_from_dc(
    rescaled_mndwi.resample_spatial(method="cubic", resolution=plot_resolution),
    'mndwi_calculation',
    result_format="gtiff",
    local_cache_file=local_cache_file,
    recalculate=False
)

In [None]:
mndwi_urls

### Plot the mndwi results
Here we use the same plotting libraries as before to visually explore the MNDWI dataset.

In [None]:
ds = xr.open_dataset(mndwi_urls[0], engine="rasterio")

In [None]:
ds

In [None]:
dim = xr.Variable(dims="t", data=dr[:-1])
data_set_mndwi = xr.concat([xr.open_dataset(url, engine="rasterio") for url in mndwi_urls], dim=dim)
da_mndwi = data_set_mndwi.rename(band_data="mndwi").drop("band")["mndwi"]
da_mndwi

In [None]:
kdims = ["x", "y", "t"]
vdims = ["mndwi"]

hv.Dimension.type_formatters[np.datetime64] = '%Y-%m-%d'  # readable time format
lon = hv.Dimension("lon", label="longitude", unit="deg Northing")
lat = hv.Dimension("lat", label="latitude", unit="deg Easting")
gv_mndwi = gv.Dataset(da_mndwi, kdims=kdims, vdims=vdims, crs=ccrs.UTM(utm_zone)).redim(x=lon, y=lat)

img = gv_mndwi.to(gv.Image, ["lon", "lat"], "mndwi", group="mndwi", label="mndwi", datatype=["xarray"], dynamic=True)
img = img.redim(x=lon, y=lat)
overlay = OSM() * img
overlay.opts(
    opts.Image(cmap="turbo", colorbar=True, clim=(-1.2, 1.2), height=500, width=500, alpha=0.8),
    opts.Tiles(height=500, width=500))

## Perform a linear regression on each band
The aqua monitor algorithm uses the ndwi as the wetness indicator. 
The linear regression for the ndwi is used used to detect changes. The slope of the linear regression will be the indicator water-to-surface changes or vice-versa.
As a linear regression is defined as a process in the backend yet, OpenEO allows us to execute python code on the DataCube using a UDF (User Defined Function).
Here we load the UDF from this repository and apply it to the DataCube using `reduce_temporal_udf`, as we will lose the `t` dimension.

In [None]:
def load_udf(udf_path: Union[str, Path]):
    with open(udf_path, "r+") as f:
        return f.read()

linear_regression_udf_path: Path = Path(Path.cwd().parent / "udfs" / "linear_regression.py")
linear_regression_udf: str = load_udf(linear_regression_udf_path)

In [None]:
lin_reg: DataCube = rescaled_mndwi.reduce_temporal_udf(code=linear_regression_udf, runtime="Python")

### Try out the linear regression locally
It is possible to try out UDFs locally. Therefore we need to download the input data locally and run the udf using `execute_local_udf`. This is useful when debugging your UDF.

In [None]:
from utils import get_files_from_dc

mndwi_dir = out_dir / "mndwi_cache"

mndwi_files = get_files_from_dc(  # reuse the job for the mndwi urls
    rescaled_mndwi.resample_spatial(method="cubic", resolution=plot_resolution),
    mndwi_dir,
    'mndwi_calculation',
    result_format="gtiff",
    local_cache_file=local_cache_file,
    recalculate=False
)

Go from geotiffs to a xarray dataset

In [None]:
from datetime import date
import re

import rioxarray

mndwi_paths = list(mndwi_dir.glob("*.tif"))
mndwi_pd = [
    (
        date(
            *map(lambda g: int(g), re.match(r".+(\d{4})-(\d{2})-(\d{2}).+", path.name).groups())
        ),
        path
    ) for path in mndwi_paths
]

t = xr.Variable(dims="t", data=dr[:-1])
das = []
for d, p in mndwi_pd:
    da = rioxarray.open_rasterio(p)
    coords = da.coords
    coords.update({"t": d})
    da = da.assign_coords(coords)
    das.append(da)
combined: xr.DataArray = xr.concat(das, dim=t)

ds: xr.Dataset = combined.to_dataset('band').rename({1: "var"})
ds

save dataset as a file for use in the udf

In [None]:
netcdf_file = Path(out_dir / "mndwi_cache.nc")
netcdf_file.unlink(True)
ds.to_netcdf(out_dir / "mndwi_cache.nc", mode="w")

In [None]:
from openeo.udf import execute_local_udf
from openeo.udf.udf_data import UdfData
result: UdfData = execute_local_udf(linear_regression_udf, out_dir / "mndwi_cache.nc", fmt='netcdf')

Obtain the results using the `datacube_list` property on the result object `UdfData`

In [None]:
result_da = result.datacube_list[0].get_array()
result_da

Plot the result of the local udf execution

In [None]:
plt.imshow(result_da.sel(bands="var"))
plt.colorbar()

significant changes are based on a threshold, as calculated in the next step. For now, let's see if we have significant changes

In [None]:
plt.imshow(abs(result_da.sel(bands="var")) > 0.12497148722627738)  # this number comes from the next step
plt.colorbar()

## Masking
In order to create an easier to read result image, a couple of masks are combined to create the resulting image:
Masks are created for areas that:
- have too few images
- have no clear changes from land-to-water or vice versa
- areas which maximum mndwi value is still clearly land
- areas which minimum mndwi value is still clearly water
pixels with value 1 are values that should be left out of the end result.

To combine masks, we invert the masks so that pixels with value 0 should be left out, and use the product of the masks to create an overlay. We then invert the result again.

In [None]:
from openeo import processes
from dateutil.parser import parse
from datetime import datetime

ndwi_min_water: float = -0.05 # minimum value of NDWI to assume as water
ndwi_max_land: float = 0.5
slope_threshold: float = 0.025
start: datetime = parse(temporal_extent[0])
stop: datetime = parse(temporal_extent[1])

if debug:
    min_num_images: int = 5
else:
    min_num_images: int = 20

# 15 / (stop_year - start_year)
slope_threshold_ratio: float = 15 * 365.25 / float(stop.__sub__(start).days)
print(f"slope threshold: {slope_threshold * slope_threshold_ratio}")

mndwi_min: DataCube = mndwi.reduce_dimension(dimension="t", reducer="min")
mndwi_max: DataCube = mndwi.reduce_dimension(dimension="t", reducer="max")
# Should always be water
min_water_mask: DataCube = mndwi_min.apply(lambda val: processes.lt(x=val, y=ndwi_max_land))  # invert mask, should be processes.gte
# should always be land
max_land_mask: DataCube = mndwi_max.apply(lambda val: processes.gt(x=val, y=ndwi_min_water))  # invert mask, should be processes.lte

# Create bucketed DC based on image count
num_images_mask: DataCube = smoothed_dc.band("green") \
    .apply(lambda data: processes.add(x=processes.multiply(x=data, y=0), y=1)) \
    .aggregate_temporal(
        intervals=t_intervals,
        reducer=lambda data: processes.sum(data),
        labels=[t_int[0] for t_int in t_intervals]
    ) \
    .apply(lambda val: processes.gte(x=val, y=image_count_limit)) \
    .apply(lambda val: processes.multiply(x=val, y=1.)) \
    .reduce_dimension(
        dimension="t",
        reducer=lambda data: processes.product(data)
    )

lin_reg_mask: DataCube = lin_reg.apply(processes.absolute).apply(lambda val: processes.gte(x=val, y=slope_threshold * slope_threshold_ratio))

mask: DataCube = lin_reg_mask.multiply(1.) \
    .multiply(min_water_mask.multiply(1.)) \
    .multiply(max_land_mask.multiply(1.)) \
    .multiply(num_images_mask.multiply(1.)) \
    .apply(lambda data: processes.eq(x=data, y=0)) \
    .multiply(1.)  # after multiplying masks, invert

lin_reg = lin_reg.mask(mask)

### Download the Linear Regression results

In [None]:
lin_reg_files = get_files_from_dc(
    lin_reg,
    out_directory=out_dir / "lin_reg",
    job_name="lin_reg",
    local_cache_file=out_dir / "job_cache.json",
    result_format="GTiff",
    recalculate=False
)

Or download files from a recent successful job:

### Plot the results of the Linear Regression

In [None]:
data_set_lin_reg: xr.Dataset = xr.open_dataset(out_dir / "lin_reg" / "openEO.tif").rename(band_data="lin_reg")
data_set_lin_reg

Or we can take more time and plo the results on a map projection with OpenStreetMap in the background. We also want to have some interactivity by checking where the linear regression data comes from. We use the source data from the rescaled MNDWI to create an interactive timeseries when a pixel is selected on the Map. We do not want to download the full dataset locally, so we choose to use the COG (cloud optimized geotiffs).

In [None]:
import cartopy.crs as ccrs

import geoviews as gv
import holoviews as hv
import numpy as np
import panel as pn

from holoviews import opts, streams
from holoviews.element.tiles import OSM

gv.extension("bokeh","matplotlib")

#### Clip the image in case of a large area:
If taking the entirety of SW spain, we still have a problem plotting an image this large in jupyterlab, so we take an interesting area:

In [None]:
if not debug:
    from rasterio.crs import CRS
    from rasterio.windows import from_bounds
    from pyproj import Transformer

    bounds: Dict[str, float] = {
        "top": -6.5,
        "right": 36.5,
        "bottom": -5.5,
        "left": 37.5
    }

    crs_source = CRS.from_string("epsg:4326")
    crs_dest = rasterio.open(urls[0]).crs

    transformer = Transformer.from_crs(crs_source, crs_dest)
    bounds["left"], bounds["top"] = transformer.transform(bounds["left"], bounds["top"])
    bounds["right"], bounds["bottom"] = transformer.transform(bounds["right"], bounds["bottom"])

    window = from_bounds(**bounds, transform=rasterio.open(urls[0]).transform)

    data_set_lin_reg.rio.write_crs(crs_dest, inplace=True)
    data_set_lin_reg_clipped: xr.Dataset = data_set_lin_reg.rio.clip_box(maxx=bounds["right"], minx=bounds["left"], maxy=bounds["top"], miny=bounds["bottom"])
    data_set_lin_reg_clipped

#### Dynamic plot using the Cloud Optimized Geotiffs stored in the backend

In [None]:
kdims = ["x", "y"]
vdims = ["lin_reg"]

hv.Dimension.type_formatters[np.datetime64] = '%Y-%m-%d'  # readable time format
lon = hv.Dimension("lon", label="longitude", unit="deg Northing")
lat = hv.Dimension("lat", label="latitude", unit="deg Easting")
gv_lin_reg = gv.Dataset(data_set_lin_reg, kdims=kdims, vdims=vdims, crs=ccrs.UTM(zone=utm_zone, southern_hemisphere=False)).redim(x=lon, y=lat)

print(repr(gv_lin_reg))

In [None]:
from rasterio.transform import rowcol
from rasterio.windows import Window
import rasterio

img = gv_lin_reg.to(gv.Image, ["lon", "lat"], "lin_reg", group="lin_reg", label="linear regression", datatype=["xarray"])
img = img.redim(x=lon, y=lat)
overlay = OSM() * img

def get_mndwi_ts(x, y):
    """
    reads from mndwi COG based on coordinates and returns timeseries values
    """
    time = hv.Dimension("t", label="time")
    mndwi = hv.Dimension("norm mndwi", label="norm MNDWI")
    ts = []
    for url in mndwi_urls:
        with rasterio.open(url) as src:
            xs, ys = rowcol(src.transform, x, y)
            with open(out_dir / "log.txt", "w+") as f:
                f.write(f"""
                    x={x}, y={y}
                    xs={xs}, ys={ys}
                """)
            ts.append(src.read(window=Window(ys, xs, 1, 1)).flatten()[0])
    return hv.Curve(ts, time, mndwi)
    # return hv.Curve(da_mndwi.sel(x=x, y=y, method="nearest"), time, mndwi)
    
clicker = streams.Tap(name="location", source=img, x=data_set_lin_reg["x"].values[0], y=data_set_lin_reg["y"].values[0])
timeseries = hv.DynamicMap(callback=get_mndwi_ts, streams=[clicker])

layout = overlay + timeseries
layout.opts(
    opts.Image(cmap="turbo", colorbar=True, clim=(-1, 1), height=500, width=500, tools=["hover"], alpha=0.8),
    opts.Tiles(height=500, width=500),
    opts.Curve(height=200, width=300, ylim=(0, 2)))