In [1]:
from dotenv import load_dotenv

# Path to env file containing the waterbodies database credentials
# Only necessary on the Sandbox.
dotenv_path = "/home/jovyan/.env"
load_dotenv(dotenv_path=dotenv_path, verbose=True, override=True)

True

# Turn water observations into waterbody polygons

* **Products used:** 
[wofs_ls_summary_alltime](https://explorer.digitalearth.africa/products/wofs_ls_summary_alltime)
* **Prerequisites:** 
    * The Hydrosheds version 1.1 Land Mask split into [WaterBodiesGrid tiles](../HydroSHEDSv1.1LandMask/SplitHydroSHEDSv1.1LandMaskIntoTiles.ipynb) and saved into a single directory.
        * Variable name: `land_sea_mask_rasters_directory`

## Background

Water is among one the most precious natural resources and is essential for the survival of life on Earth. For many countries in Africa, the scarcity of water is both an economic and social issue. Water is required not only for consumption but for industries and environmental ecosystems to function and flourish. 

With the demand for water increasing, there is a need to better understand our water availability to ensure we are managing our water resources effectively and efficiently.  

Digital Earth Africa (DE Africa)'s [Water Observations from Space (WOfS) dataset](https://docs.digitalearthafrica.org/en/latest/data_specs/Landsat_WOfS_specs.html), provides a water classified image of Africa approximately every 16 days. These individual water observations have been combined into a [WOfS All-Time Summary](https://explorer.digitalearth.africa/products/wofs_ls_summary_alltime) product, which calculates the frequency of wet observations (compared against all clear observations of that pixel), over the full 30-plus years satellite archive. 

The WOfS All-Time Summary product provides valuable insights into the persistence of water across the African landscape on a pixel by pixel basis. While knowing the wet history of a single pixel within a waterbody is useful, it is more useful to be able to map the whole waterbody as a single object. 

This notebook demonstrates a workflow for mapping waterbodies across Africa as polygon objects. This workflow has been used to produce **DE Africa Waterbodies**. 

## Description
This code follows the following workflow:

* Load the required python packages
* Set your chosen analysis parameters:
    * set up the analysis region
    * set up some file names for the inputs and outputs
    * wetness threshold/s
    * minimum number of valid observations
    * min/max waterbody size
    * read in a land/sea mask
* Generate waterbody polygons for each tile:
  * For each tile:
    * Load the WOfS All Time Summary Dataset
    * Mask the dataset using the land/sea mask 
    * Keep only pixels observed at least x times
    * Keep only pixels identified as wet at least x% of the time
        * Here the code can take in two wetness thresholds, to produce two initial temporary polygon files.
    * Convert the raster data into polygons
    * Write the polygons to disk
* Remove artificial polygon borders created at tile boundaries by merging polygons that intersect across tile boundaries
* Filter the combined polygon dataset (note that this step happens after the merging of tile boundary polygons to ensure that artifacts are not created by part of a polygon being filtered out, while the remainder of the polygon that sits on a separate tile is treated differently).
    * Add the area and perimeter and length of each polygon as attributes
    * Filter the polygons based on area / size and length
    * Add unique IDs to the polygons
* Save out the final polygon set to a shapefile or write to the database

## Load python packages

In [2]:
import json
import logging
import os
import subprocess
from itertools import chain

import click
import geohash as gh
import geopandas as gpd
import numpy as np
import pandas as pd
from datacube import Datacube
from geopandas import gpd
from odc.geo.geom import Geometry
from shapely.ops import unary_union
from waterbodies.db import get_waterbodies_engine
from waterbodies.grid import WaterbodiesGrid
from waterbodies.historical_extent import (
    add_waterbodies_polygons_to_db,
    get_polygon_length,
    get_waterbodies,
)
from waterbodies.hopper import create_tasks_from_datasets
from waterbodies.io import (
    check_directory_exists,
    check_file_exists,
    find_parquet_files,
    get_filesystem,
)
from waterbodies.logs import logging_setup
from waterbodies.text import format_task, get_tile_index_str_from_tuple

## Define Analysis Parameters

The following section walks you through the analysis parameters you will need to set for this workflow. Each section describes the parameter, how it is used, and what value was used for the DE Africa Waterbodies product.

### Set the analysis region
If you would like to perform the analysis for all of Africa, using the published WOfS All-time Summary, set `all_of_africa = True`. If you set the flag `all_of_africa` to `False`, you will need to provide a path to the shapefile / GeoJSON defining the area of interest.

In [3]:
all_of_africa = False
if all_of_africa:
    tile_index_filter = None
else:
    aoi_file = "https://deafrica-waterbodies-dev.s3.af-south-1.amazonaws.com/waterbodies/v0.0.2/senegal_basin/senegal_basin_boundary.geojson"
    gridspec = WaterbodiesGrid().gridspec
    aoi_gdf = gpd.read_file(aoi_file).to_crs(gridspec.crs)
    aoi_tiles = list(gridspec.tiles_from_geopolygon(geopolygon=Geometry(geom=aoi_gdf.geometry.iloc[0], crs=aoi_gdf.crs)))
    tile_index_filter = [tile[0] for tile in aoi_tiles]

In [7]:
import json

# Parse the GeoJSON data
geojson_data = {
  "type": "FeatureCollection",
  "features": [
    {
      "type": "Feature",
      "properties": {},
      "geometry": {
        "coordinates": [
          [
            [
              -1.5381331338635675,
              6.602134444266824
            ],
            [
              -1.5381331338635675,
              6.373530335543478
            ],
            [
              -1.267246532216319,
              6.373530335543478
            ],
            [
              -1.267246532216319,
              6.602134444266824
            ],
            [
              -1.5381331338635675,
              6.602134444266824
            ]
          ]
        ],
        "type": "Polygon"
      }
    }
  ]
}
features = json.dumps(geojson_data)['features']

TypeError: string indices must be integers

### Set up the directory to save the waterbodies for each tile

When overwrite is set to `False` if a waterbodies polygons file exists in the `output_directory` for a specific tile, the tile is skipped.

In [4]:
output_directory = "s3://deafrica-waterbodies-dev/waterbodies/v0.0.2/intermediate_outputs"
overwrite = True

<a id='wetnessthreshold'></a>
### How frequently wet does a pixel need to be to be included?
The values set here will be the minimum frequency (as a decimal between 0 and 1) that you want water to be detected across all analysis years before it is included. 

E.g. If this was set to 0.10, any pixels that are wet *at least* 10% of the time across all valid observations will be included.

Polygons identified by the extent threshold that intersect with the polygons generated by the detection threshold will be extracted, and included in the final polygon dataset. This means that the **location** of polygons is set by the detection threshold, but the **shape/extent** of these polygons is set by the extent threshold.

In [None]:
detection_threshold = 0.1
extent_threshold = 0.05

<a id='size'></a>

### How big/small should the polygons be?
This filtering step can remove very small waterbodies and very large waterbodies will be segmented. The size listed here is in pixels.
> Note: A single pixel in Landsat data is 30 m x 30 m = 900 m<sup>2</sup>. 

**MinSize**

E.g. A minimum size of 6 pixels means that polygons 5 pixels or less will be excluded. If you don't want to use this filter, set this value to 0.

**MaxSize**

E.g. A maximum size of 1000 pixels means that if a polygon is larger than 1000 pixels the polygon will segmented using watershed segmentation. If you don't want to use this filter, set this number to `math.inf`. 

In [None]:
min_polygon_size = 6
max_polygon_size = 1000

### Filter results based on number of valid observations

The total number of valid WOfS observations for each pixel varies depending on the frequency of clouds and cloud shadow, the proximity to high slope and terrain shadow, and the seasonal change in solar angle. 

The `count_clear` parameter within the [`wofs_ls_summary_alltime`](https://explorer.digitalearth.africa/products/wofs_ls_summary_alltime) data provides a count of the number of valid observations each pixel recorded over the analysis period. We can use this parameter to mask out pixels that were infrequently observed. 
If this mask is not applied, pixels that were observed only once could be included if that observation was wet (i.e. a single wet observation means the calculation of the frequency statistic would be (1 wet observation) / (1 total observation) = 100% frequency of wet observations).

Note that this parameter does not specify the timing of these observations, but rather just the **total number of valid observations** (observed at any time of the year, in any year).

In [None]:
min_valid_observations = 60

<a id='coastline'></a>
### Read in a land/sea mask

You can choose which land/sea mask you would like to use to mask out ocean polygons, depending on how much coastal water you would like in the final product. We use the [HydroSHEDS version 1.1 Land Mask](https://www.hydrosheds.org/hydrosheds-core-downloads). Any WOfS All Time Summary product pixels with a value of 0 in the land/sea mask are filtered out. 

To use a different product, supply a directory path to `land_sea_mask_rasters_directory` which contains raster tiles of the land/sea mask covering all the WOfS All Time Summary product [regions](https://explorer.digitalearth.africa/api/regions/wofs_ls_summary_alltime). See the [Split HydroSHEDS v1.1 Land Mask](../HydroSHEDSv1.1LandMask/SplitHydroSHEDSv1.1LandMaskIntoTiles.ipynb) notebook on how to split your raster product into tiles and the naming convention to use when saving the raster tiles. Ensure that for your product, pixels with the value 0 are ocean pixels and pixels with the value 1 are land pixels.

In [None]:
land_sea_mask_rasters_directory = "s3://deafrica-waterbodies-dev/waterbodies/v0.0.2/hydrosheds_v1_1_land_mask/"

## Generate the waterbody polygons for each tile

In [None]:
# Set up logging.
logging_setup(3)
_log = logging.getLogger(__name__)

In [None]:
# Connect to the datacube
dc = Datacube(app="GeneratePolygons")

### Group all the WOfS All Time Summary product datasets into tiles

In an Argo workflow, this is done by the `waterbodies historical-extent generate-tasks` cli.

In [None]:
# Find all the WOfS All Time Summaries datasets
dc_query = dict(product="wofs_ls_summary_alltime")
datasets = dc.find_datasets(**dc_query)
_log.info(f"Found {len(datasets)} datasets matching the query {dc_query}")

In [None]:
# Each task contains a tile index and the dataset UUID of the WOfS All Time Summary dataset
# that covers the tile.
tasks = create_tasks_from_datasets(datasets=datasets, tile_index_filter=tile_index_filter, bin_solar_day=False)
tasks = [format_task(task) for task in tasks]
sorted_tasks = sorted(tasks, key=lambda x: x["tile_index_x"])
_log.info(f"Total number of tasks: {len(sorted_tasks)}")

In [None]:
# Write the tasks to a text file.
max_parallel_steps = 1
task_chunks = np.array_split(np.array(sorted_tasks), max_parallel_steps)
task_chunks = [chunk.tolist() for chunk in task_chunks]
task_chunks = list(filter(None, task_chunks))
task_chunks_count = str(len(task_chunks))
_log.info(f"{len(sorted_tasks)} tasks chunked into {task_chunks_count} chunks")
task_chunks_json_array = json.dumps(task_chunks)

tasks_directory = "/tmp/"
tasks_output_file = os.path.join(tasks_directory, "tasks_chunks")
tasks_count_file = os.path.join(tasks_directory, "tasks_chunks_count")

fs = get_filesystem(path=tasks_directory, anon=False)

if not check_directory_exists(path=tasks_directory):
    fs.mkdirs(path=tasks_directory, exist_ok=True)
    _log.info(f"Created directory {tasks_directory}")

with fs.open(tasks_output_file, "w") as file:
    file.write(task_chunks_json_array)
_log.info(f"Tasks chunks written to {tasks_output_file}")

with fs.open(tasks_count_file, "w") as file:
    file.write(task_chunks_count)
_log.info(f"Tasks chunks count written to {tasks_count_file}")

### Loop over each tile and generate the waterbodies for the tile

In an Argo workflow, this is done by `waterbodies historical-extent process-tasks` cli.

In [None]:
tasks_list_file = "/tmp/tasks_chunks"

In [None]:
if not check_directory_exists(path=land_sea_mask_rasters_directory):
    e = FileNotFoundError(f"Directory {land_sea_mask_rasters_directory} does not exist!")
    _log.error(e)
    raise e

In [None]:
# Get the list of tiles we generated in the previous step.
fs = get_filesystem(path=tasks_list_file, anon=True)
with fs.open(tasks_list_file) as file:
    content = file.read()
    decoded_content = content.decode()
    tasks = json.loads(decoded_content)

# In case file contains list of lists
if all(isinstance(item, list) for item in tasks):
    tasks = list(chain(*tasks))
else:
    pass
_log.info(f"Found {len(tasks)} tasks")

In [None]:
if not check_directory_exists(path=output_directory):
    fs = get_filesystem(output_directory, anon=False)
    fs.mkdirs(output_directory)
    _log.info(f"Created the directory {output_directory}")

The next cell takes about 12 hours to run for the entire continent. Therefore for processing for all of Africa please run the `waterbodies historical-extent process-tasks` cli in Argo using parallel pods.

In [None]:
failed_tasks = []
for idx, task in enumerate(tasks):
    _log.info(f"Processing task: {task}   {idx+1}/{len(tasks)}")
    tile_index_x = task["tile_index_x"]
    tile_index_y = task["tile_index_y"]
    task_datasets_ids = task["task_datasets_ids"]

    task_id_tuple = (tile_index_x, tile_index_y)
    task_id_str = get_tile_index_str_from_tuple(task_id_tuple)
    output_file_name = os.path.join(output_directory, f"waterbodies_{task_id_str}.parquet")
    try:
        if not overwrite:
            exists = check_file_exists(output_file_name)

        if overwrite or not exists:
            waterbody_polygons = get_waterbodies(
                tile_index_x=tile_index_x,
                tile_index_y=tile_index_y,
                task_datasets_ids=task_datasets_ids,
                dc=dc,
                land_sea_mask_rasters_directory=land_sea_mask_rasters_directory,
                detection_threshold=detection_threshold,
                extent_threshold=extent_threshold,
                min_valid_observations=min_valid_observations,
                min_polygon_size=min_polygon_size,
                max_polygon_size=max_polygon_size
            )
            if waterbody_polygons.empty:
                _log.info(f"Task {task_id_str} has no waterbody polygons")
            else:
                _log.info(
                    f"Task {task_id_str} has {len(waterbody_polygons)} waterbody polygons"
                )
                waterbody_polygons.to_parquet(output_file_name)
                _log.info(f"Waterbodies written to {output_file_name}")
        else:
            _log.info(f"Task {task_id_str} already exists, skipping")
    except Exception as error:
        _log.exception(error)
        _log.error(f"Failed to process task {task}")
        failed_tasks.append(task)

In [None]:
if failed_tasks:
    _log.info(f"The following tasks failed: {failed_tasks}")
    failed_tasks_json_array = json.dumps(failed_tasks)

    tasks_directory = "/tmp/"
    failed_tasks_output_file = os.path.join(tasks_directory, "failed_tasks")

    fs = get_filesystem(path=tasks_directory, anon=False)

    if not check_directory_exists(path=tasks_directory):
        fs.mkdirs(path=tasks_directory, exist_ok=True)
        _log.info(f"Created directory {tasks_directory}")

    with fs.open(failed_tasks_output_file, "a") as file:
        file.write(failed_tasks_json_array + "\n")
    _log.info(f"Failed tasks written to {failed_tasks_output_file}")

## Merge polygons that have an edge at a tile boundary

Now that we have all of the polygons across our whole region of interest, we need to check for artifacts in the data caused by tile boundaries.

We have created a GeoDataFrame `buffered_30m_tiles`, that consists of the tile boundaries, plus a 1 pixel (30 m) buffer. This GeoDataFrame will help us to find any polygons that have a boundary at the edge of a tile. We can then find where polygons touch across this boundary, and join them up.

In an Argo workflow, this is done by `waterbodies historical-extent process-polygons` cli.

In [None]:
polygons_directory = output_directory
polygons_directory

In [None]:
files = find_parquet_files(directory_path=polygons_directory, file_name_pattern=r".*")
_log.info(f"Found {len(files)} files containing waterbodies.")

In [None]:
%%time
# Load all the waterbodies files into a single GeoDataFrame.
gridspec = WaterbodiesGrid().gridspec

waterbodies_list = []
for file in files:
    gdf = gpd.read_file(file).to_crs(gridspec.crs)
    waterbodies_list.append(gdf)

waterbodies = pd.concat(waterbodies_list, ignore_index=True)
_log.info(f"Loaded {len(waterbodies)} waterbodies.")

In [None]:
# Get all the tiles used to generate the waterbodies.
datasets = dc.find_datasets(product="wofs_ls_summary_alltime")
tasks = create_tasks_from_datasets(
    datasets=datasets, tile_index_filter=None, bin_solar_day=False
)
tile_indices = [k for task in tasks for k, v in task.items()]
buffered_tile_boundaries = [
    gridspec.tile_geobox(tile_index=tile_index).extent.geom.boundary.buffer(
        30, cap_style="flat", join_style="mitre"
    )
    for tile_index in tile_indices
]
buffered_tile_boundaries_gdf = gpd.GeoDataFrame(
    data={"tile_index": tile_indices, "geometry": buffered_tile_boundaries}, crs=gridspec.crs
)
buffered_tile_boundaries_gdf.set_index("tile_index", inplace=True)
_log.info(f"Found {len(buffered_tile_boundaries_gdf)} tiles")

In [None]:
%%time
_log.info("Merging waterbodies at tile boundaries...")
joined = gpd.sjoin(
    waterbodies, buffered_tile_boundaries_gdf, how="inner", predicate="intersects"
)
if joined.empty:
    pass
else:
    tile_boundary_waterbodies = waterbodies[waterbodies.index.isin(joined.index)]
    not_tile_boundary_waterbodies = waterbodies[~waterbodies.index.isin(joined.index)]
    tile_boundary_waterbodies_merged = (
        gpd.GeoDataFrame(
            crs=gridspec.crs, geometry=[unary_union(tile_boundary_waterbodies.geometry)]
        )
        .explode(index_parts=True)
        .reset_index(drop=True)
    )
    waterbodies = pd.concat(
        [not_tile_boundary_waterbodies, tile_boundary_waterbodies_merged],
        ignore_index=True,
        sort=True,
    )
_log.info(f"Waterbodies count after merging waterbodies at tile boundaries: {len(waterbodies)}")

## Add waterbodies attributes

Add the perimeter, area and length attributes to each waterbody.

Remove waterbodies that meet the following criteria:
- Waterbodies whose area is less than 4500 m<sup>2</sup>
- Waterbodies whose length is greater than 150km


In an Argo workflow, this is included in the `waterbodies historical-extent process-polygons` cli.

In [None]:
waterbodies["area_m2"] = waterbodies.geometry.area
waterbodies = waterbodies[waterbodies.area_m2 > 4500]
_log.info(
    f"Waterbodies count after filtering out waterbodies smaller than 4500m2: {len(waterbodies)}"
)

In [None]:
waterbodies["length_m"] = waterbodies.geometry.apply(get_polygon_length)
waterbodies = waterbodies[waterbodies.length_m <= (150 * 1000)]
_log.info(
    f"Waterbodies count after filtering out waterbodies longer than than 150km: {len(waterbodies)}"
)

In [None]:
waterbodies["perim_m"] = waterbodies.geometry.length
waterbodies = waterbodies.to_crs("EPSG:4326")

## Generate a unique ID for each polygon

A unique identifier is required for every polygon to allow it to be referenced. The naming convention for generating unique IDs here is the [geohash](geohash.org).

A Geohash is a geocoding system used to generate short unique identifiers based on latitude/longitude coordinates. It is a short combination of letters and numbers, with the length of the string a function of the precision of the location. The methods for generating a geohash are outlined [here - yes, the official documentation is a wikipedia article](https://en.wikipedia.org/wiki/Geohash).

Here we use the python package `python-geohash` to generate a geohash unique identifier for each polygon. We use `precision = 10` geohash characters, which represents an on the ground accuracy of <20 metres. This ensures that the precision is high enough to differentiate between waterbodies located next to each other.

In an Argo workflow, this is included in the `waterbodies historical-extent process-polygons` cli.

In [None]:
%%time
waterbodies["uid"] = waterbodies.geometry.apply(
    lambda x: gh.encode(x.centroid.y, x.centroid.x, precision=10)
)
assert waterbodies["uid"].is_unique
waterbodies.sort_values(by=["uid"], inplace=True)
waterbodies.reset_index(inplace=True, drop=True)
waterbodies["wb_id"] = waterbodies.index + 1
assert waterbodies["wb_id"].min() > 0

In [None]:
_log.info(f"Final waterbodies count: {len(waterbodies)}")

## Write the waterbodies to the database

In an Argo workflow, this is included in the `waterbodies historical-extent process-polygons` cli.

In [None]:
%%time
engine = get_waterbodies_engine()
add_waterbodies_polygons_to_db(
    waterbodies_polygons=waterbodies, engine=engine, update_rows=True
)