# Generating Water observations from space (WOfS)

## NOTE: modified for PNG on EASI ASIA, original notebook [**here**](https://github.com/csiro-easi/eocsi-hackathon-2022/blob/fa5bbf8b8170bac72b3d5965dcdaf453ef8672c5/case-studies/water-observations-from-space.ipynb)

The [DEA water observation product](https://www.dea.ga.gov.au/products/dea-water-observation) maps the presense of surface water from Landsat imagery. An analysis through time can map the frequency a pixel is inundated by water, which can be used to infer the temporal and spatial statistics of flood/drought events. This notebook demonstrates how to run the DEA water observation algorithm for a given area of interest

- [DEA product and algorithm details](https://cmi.ga.gov.au/data-products/dea/613/dea-water-observations-landsat)
- [Reference code](https://github.com/GeoscienceAustralia/wofs/blob/master/wofs/virtualproduct.py)

This notebook requires the "WOfS_Environment", which needs to be installed as in the first cell of this notebook (to be run on commandline). Once done, set the environment in the top-right corner. 


**Notes**
- scaling and offset factor applied to col2 of landsat within the function `transform = WOfSClassifier(c2_scaling=True, dsm_path=DEM_PATH)`  

#### Install the WOfS environment using the following on command line


In [1]:
'''
python3 -m venv ~/venvs/WOfS_Environment
source ~/venvs/WOfS_Environment/bin/activate
pip install --upgrade pip
deactivate
realpath /env/lib/python3.10/site-packages > ~/venvs/WOfS_Environment/lib/python3.10/site-packages/base_venv.pth
source ~/venvs/WOfS_Environment/bin/activate
pip install ephem
pip install --index-url https://packages.dea.ga.gov.au/ wofs
python -m ipykernel install --user --name=WOfS_Environment
source ~/venvs/WOfS_Environment/bin/activate
cd /venvs/WOfS_Environment
touch setup.py

##### add in text below to setup.py

from setuptools import setup, find_packages

setup(
    name='WOfS_Environment',
    version='1.0',
    packages=find_packages(),
    install_requires=[
        'ephem',
        'wofs'
    ],
)

#####

python setup.py bdist_egg
'''

'\npython3 -m venv ~/venvs/WOfS_Environment\nsource ~/venvs/WOfS_Environment/bin/activate\npip install --upgrade pip\ndeactivate\nrealpath /env/lib/python3.10/site-packages > ~/venvs/WOfS_Environment/lib/python3.10/site-packages/base_venv.pth\nsource ~/venvs/WOfS_Environment/bin/activate\npip install ephem\npip install --index-url https://packages.dea.ga.gov.au/ wofs\npython -m ipykernel install --user --name=WOfS_Environment\n'

#### Load packages

In [2]:
%matplotlib inline
import os, sys
from pathlib import Path

import numpy as np
import xarray as xr
import rioxarray

from dask_gateway import Gateway

import warnings
warnings.filterwarnings('ignore')

import datacube
from datacube.utils import masking
from datacube.utils.cog import write_cog
from datacube.drivers.netcdf import write_dataset_to_netcdf

dc = datacube.Datacube(app='Generating_WOfS')

from dea_tools.plotting import rgb, display_map

# WOfS classifier
from wofs.virtualproduct import WOfSClassifier
from odc.algo import safe_div, apply_numexpr, keep_good_only

#### Set up a dask cluster

In [4]:
egg_path = '/home/jovyan/venvs/WOfS_Environment/dist/WOfS_Environment-1.0-py3.10.egg'

def init_dask_cluster() -> tuple:
    """Connect to an existing or start a new dask gateway cluster.
    Return (cluster, client)
    """
    gateway = Gateway()
    clusters = gateway.list_clusters()
    if not clusters:
        print('Creating new cluster. Please wait for this to finish.')
        cluster = gateway.new_cluster()
    else:
        print(f'An existing cluster was found. Connecting to: {clusters[0].name}')
        cluster=gateway.connect(clusters[0].name)
    cluster.adapt(minimum=1, maximum=4)  # A default starting point
    client = cluster.get_client()
    client.upload_file(egg_path)

    return (cluster, client)

cluster, client = init_dask_cluster()
display(cluster)


Creating new cluster. Please wait for this to finish.


VBox(children=(HTML(value='<h2>GatewayCluster</h2>'), HBox(children=(HTML(value='\n<div>\n<style scoped>\n    …

#### AWS Access

In [7]:
## Optional: Access AWS "requester-pays" buckets
# This is necessary for Landsat ("landsatN_c2l2_*") and Sentinel-2 ("s2_l2a") products
from datacube.utils.aws import configure_s3_access
configure_s3_access(aws_unsigned=False, requester_pays=True, client=client);

#### display AOI

In [8]:
latitude = (-7.3, -7.4)
longitude = (144.4, 144.5)
display_map(longitude, latitude)

#### Load landsat data and DEM (Copernicus 30m Digital Elevation Model (GLO-30))

In [9]:
# set time range
time = ('2020-01-01', '2020-12-31')

# load data
ds = dc.load(
    product = 'landsat8_c2l2_sr', 
    latitude = latitude,
    longitude = longitude,
    time=time,
    measurements=['blue', 'green', 'red', 'nir08', 'swir16', 'swir22', 'qa_pixel'],
    output_crs="EPSG:32755",  # Target CRS - https://epsg.io/?q=Papua%20New%20Guinea%20kind%3APROJCRS
    resolution=(30, -30),     # Target resolution
    group_by='solar_day',     # Group by time method
    dask_chunks={'time': 1, 'x': 100, 'y': 100}
)

# rename bands, needed for rgb function and for xr_geomedian
ds = ds.rename({
    "blue": "nbart_blue",
    "green": "nbart_green",
    "red": "nbart_red",
    "nir08": "nbart_nir",
    "swir16": "nbart_swir_1",
    "swir22": "nbart_swir_2",
    "qa_pixel": "fmask",
})


In [10]:
# load DEM
dem = dc.load(
    product="copernicus_dem_30", 
    latitude=latitude,
    longitude=longitude,
    output_crs="epsg:4326", 
    resolution=(-1/3600, 1/3600),
)
elevation = dem.elevation

In [11]:
# Where to save the DEM (fetched in WOfS function)?
DEM_PATH = "/home/jovyan/dems/PNG_test.tif"

dem_path = Path(DEM_PATH)
dem_path.parent.mkdir(parents=True, exist_ok=True)
elevation.rio.to_raster(dem_path)

#### Classify WOfS
Run the water observations classifier.

In [12]:
%%time
transform = WOfSClassifier(c2_scaling=True, dsm_path=DEM_PATH)

# Compute the WOFS layer
wofl = transform.compute(ds)
wofl

CPU times: user 4.29 s, sys: 183 ms, total: 4.48 s
Wall time: 2min 10s


Unnamed: 0,Array,Chunk
Bytes,2.63 MiB,9.77 kiB
Shape,"(20, 372, 371)","(1, 100, 100)"
Dask graph,320 chunks in 818 graph layers,320 chunks in 818 graph layers
Data type,uint8 numpy.ndarray,uint8 numpy.ndarray
"Array Chunk Bytes 2.63 MiB 9.77 kiB Shape (20, 372, 371) (1, 100, 100) Dask graph 320 chunks in 818 graph layers Data type uint8 numpy.ndarray",371  372  20,

Unnamed: 0,Array,Chunk
Bytes,2.63 MiB,9.77 kiB
Shape,"(20, 372, 371)","(1, 100, 100)"
Dask graph,320 chunks in 818 graph layers,320 chunks in 818 graph layers
Data type,uint8 numpy.ndarray,uint8 numpy.ndarray


#### Water observations summaries
Water observations summaries, based on [odc-stats/plugins/wofs.py](https://github.com/opendatacube/odc-stats/blob/develop/odc/stats/plugins/wofs.py), consist of:
- `count_clear`: a count of every time a pixel was observed (not obscured by terrain or clouds)
- `count_wet`: a count of every time a pixel was observed and wet
- `frequency`: what fraction of time (wet/clear) was the pixel wet

In [13]:
# Rename dimensions as required
wofl = wofl.rename({"x": "longitude", "y": "latitude"})

In [14]:
wofl["bad"] = (wofl.water & 0b0111_1110) > 0
wofl["some"] = apply_numexpr("((water<<30)>>30)==0", wofl, name="some")
wofl["dry"] = wofl.water == 0
wofl["wet"] = wofl.water == 128
wofl = wofl.drop_vars("water")
for dv in wofl.data_vars.values():
    dv.attrs.pop("nodata", None)

In [15]:
# plot all wet observations (takes a couple minutes)

# Ignore warnings triggered by time slices without data at all
warnings.filterwarnings("ignore", message="divide by zero encountered in true_divide") 
warnings.filterwarnings("ignore", message="invalid value encountered in true_divide") 

# wofl.wet.plot(col="time", col_wrap=5);

In [16]:
# Helper frunction from https://github.com/opendatacube/odc-stats/blob/develop/odc/stats/plugins/wofs.py

def reduce(xx: xr.Dataset) -> xr.Dataset:
    nodata = -999
    count_some = xx.some.sum(axis=0, dtype="int16")
    count_wet = xx.wet.sum(axis=0, dtype="int16")
    count_dry = xx.dry.sum(axis=0, dtype="int16")
    count_clear = count_wet + count_dry
    frequency = safe_div(count_wet, count_clear, dtype="float32")

    count_wet.attrs["nodata"] = nodata
    count_clear.attrs["nodata"] = nodata

    is_ok = count_some > 0
    count_wet = keep_good_only(count_wet, is_ok)
    count_clear = keep_good_only(count_clear, is_ok)

    return xr.Dataset(
        dict(
            count_wet=count_wet,
            count_clear=count_clear,
            frequency=frequency,
        )
    )

summary = reduce(wofl)

In [17]:
%%time
wofs_freq = summary.frequency.compute()

KilledWorker: Attempted to run task ('dc_load_qa_pixel-eq-getitem-stack-a87b794be3685e6327290981bd843d04', 6, 2, 3) on 20 different workers, but all those workers died while running it. The last worker that attempt to run the task was tls://10.0.42.80:34403. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.

In [None]:
wofs_freq.plot(size=10)

In [None]:
# add crs and to_array for cog output
wofs_freq.attrs['crs'] = 'EPSG:32755'

wofs_freq_ds = wofs_freq.to_dataset(name='frequency').squeeze()
# Rename dimensions as required for write_dataset_to_netcdf
wofs_freq_ds = wofs_freq_ds.rename({"longitude": "x", "latitude": "y"})

# output as cog
write_cog(wofs_freq, '../data/wofs_test.tif', overwrite=True)
# export out as .nc to allow load back in easily as xarray
write_dataset_to_netcdf(wofs_freq_ds, '../data/wofs_test.nc')