In [1]:
import logging
import os

import click
import datacube
import fsspec
import geopandas as gpd
import pandas as pd
from deafrica_waterbodies.cli.logs import logging_setup
from deafrica_waterbodies.io import (
    check_dir_exists,
    check_file_exists,
    check_if_s3_uri,
    find_parquet_files,
)
from deafrica_waterbodies.make_polygons import (
    check_wetness_thresholds,
    get_polygons_from_tile,
    merge_polygons_at_tile_boundaries
)
from deafrica_waterbodies.tiling import (
    filter_tiles,
    get_tiles_ids,
    tile_wofs_ls_summary_alltime,
)

In [2]:
# These are the default AWS configurations for the Analysis Sandbox
# that are set in the environment variables.
aws_default_config = {
    # "AWS_NO_SIGN_REQUEST": "YES",
    "AWS_SECRET_ACCESS_KEY": "fake",
    "AWS_ACCESS_KEY_ID": "fake",
}

# To access a public bucket, these AWS credentials must be removed from the
# environment variables, otherwise the following error will occur:
# PermissionError: The AWS Access Key Id you provided does not exist in our records.
# `os` is imported in the first cell; `pop` with a default is a no-op when a
# variable is not set, so this cell can safely be re-run.
for key in aws_default_config:
    os.environ.pop(key, None)

In [3]:
# Logging verbosity, passed to logging_setup below.
verbose = 1

# Area of interest, tiling and parallelism.
aoi_vector_file = "data/SenegalBasin.geojson"
tile_size_factor = 2
num_workers = 16

# Wetness thresholds and the minimum number of valid observations per pixel
# handed to get_polygons_from_tile.
primary_threshold: float = 0.1
secondary_threshold: float = 0.05
minimum_valid_observations: int = 128

# Where outputs are written; with overwrite=True existing per-tile outputs are regenerated.
output_directory = "s3://deafrica-waterbodies-dev/test_out_dir/0-0-1/shapefile3"
overwrite = True

In [4]:
# Configure logging at the chosen verbosity, then fetch this notebook's logger.
logging_setup(verbose=verbose)
_log = logging.getLogger(name=__name__)

In [5]:
# Support pathlib Paths by normalising the configured locations to strings.
# A missing AOI must stay None: str(None) would produce the literal "None",
# defeating the `is not None` check used when loading the AOI.
if aoi_vector_file is not None:
    aoi_vector_file = str(aoi_vector_file)
output_directory = str(output_directory)

In [6]:
# Dask chunk sizes (elements per chunk along each dimension) passed to
# get_polygons_from_tile when loading datasets.
dask_chunks = dict(x=3200, y=3200, time=1)

In [7]:
# Read the area of interest into a GeoDataFrame; with no AOI file configured,
# aoi_gdf is None. Read failures are logged with a traceback and re-raised.
if aoi_vector_file is None:
    aoi_gdf = None
else:
    try:
        aoi_gdf = gpd.read_file(aoi_vector_file)
    except Exception:
        _log.exception(f"Could not read the file {aoi_vector_file}")
        raise

In [8]:
# Tile the wofs_ls_summary_alltime product using tile_size_factor. The result is
# a mapping of tile id -> tile plus the grid workflow those tiles belong to.
(tiles, grid_workflow) = tile_wofs_ls_summary_alltime(tile_size_factor)

[2023-10-06 21:01:14,843] {tiling.py:113} INFO - New tile size is (192000.0, 192000.0).
[2023-10-06 21:01:17,564] {tiling.py:132} INFO - Number of wofs_ls_summary_alltime tiles: 1188


In [9]:
# Filter the tiles to the area of interest, keeping the original tile order.
filtered_tile_ids = filter_tiles(tiles, aoi_gdf, num_workers)
# Tile ids are dict keys (hashable), so build a set once for O(1) membership
# checks instead of scanning the id sequence for every tile.
filtered_tile_id_set = set(filtered_tile_ids)
filtered_tiles = {k: v for k, v in tiles.items() if k in filtered_tile_id_set}

1188it [00:03, 370.18it/s]


In [10]:
# Sub-directory of the output directory that receives the per-tile
# threshold polygon parquet files.
polygons_from_thresholds_dir = os.path.join(
    output_directory,
    "polygons_from_thresholds",
)

In [11]:
# Use the S3 filesystem for s3:// locations and the local filesystem otherwise.
fs = fsspec.filesystem("s3" if check_if_s3_uri(polygons_from_thresholds_dir) else "file")

In [12]:
# Make sure the per-tile polygons directory exists before anything is written to it.
polygons_dir_exists = check_dir_exists(polygons_from_thresholds_dir)
if not polygons_dir_exists:
    fs.mkdirs(polygons_from_thresholds_dir, exist_ok=True)
    _log.info(f"Created directory {polygons_from_thresholds_dir}")

[2023-10-06 21:01:21,159] {credentials.py:620} INFO - Found credentials in shared credentials file: ~/.aws/credentials


In [13]:
# Check the wetness thresholds (ordered secondary, primary). check_wetness_thresholds
# returns a message describing the thresholding mode, which is logged.
minimum_wet_thresholds = [secondary_threshold, primary_threshold]
threshold_summary = check_wetness_thresholds(minimum_wet_thresholds)
_log.info(threshold_summary)

[2023-10-06 21:01:21,380] {3616587049.py:3} INFO - We will be running a hybrid wetness threshold. 
**You have set 0.1 as the primary threshold, which will define the location of the waterbody polygons 
 with 0.05 set as the supplementary threshold, which will define the extent/shape of the waterbody polygons.**


In [14]:
# Generate the primary and secondary threshold polygons for each filtered tile
# and write them to per-tile parquet files. When `overwrite` is False, tiles
# whose two output files already exist are skipped.
for tile in filtered_tiles.items():
    # `tile` is the (tile_id, tile) pair passed as-is to get_polygons_from_tile;
    # tile_id is indexed as a two-part id (presumably e.g. (85, 49) in the logs).
    tile_id = tile[0]
    primary_threshold_polygons_fp = os.path.join(
        polygons_from_thresholds_dir, f"{tile_id[0]}_{tile_id[1]}_primary_threshold_polygons.parquet"
    )
    secondary_threshold_polygons_fp = os.path.join(
        polygons_from_thresholds_dir, f"{tile_id[0]}_{tile_id[1]}_secondary_threshold_polygons.parquet"
    )

    if not overwrite:
        _log.info(f"Checking existence of {primary_threshold_polygons_fp} and {secondary_threshold_polygons_fp}")
        # Skip only when BOTH files exist; a tile with a single output is regenerated.
        outputs_exist = check_file_exists(primary_threshold_polygons_fp) and check_file_exists(
            secondary_threshold_polygons_fp
        )
        if outputs_exist:
            _log.info(f"Polygons for tile {tile_id} already exist, skipping.")
            continue

    # Tile-scoped names keep the last tile's results from being bound to the
    # names later used for the concatenated GeoDataFrames.
    tile_primary_polygons, tile_secondary_polygons = get_polygons_from_tile(
        tile=tile,
        grid_workflow=grid_workflow,
        dask_chunks=dask_chunks,
        min_valid_observations=minimum_valid_observations,
        primary_threshold=primary_threshold,
        secondary_threshold=secondary_threshold,
    )
    # Write the polygons to parquet files.
    tile_primary_polygons.to_parquet(primary_threshold_polygons_fp)
    tile_secondary_polygons.to_parquet(secondary_threshold_polygons_fp)

[2023-10-06 21:01:21,386] {make_polygons.py:502} INFO - Generating water body polygons for tile (85, 49)
[2023-10-06 21:01:26,699] {make_polygons.py:502} INFO - Generating water body polygons for tile (85, 50)
[2023-10-06 21:01:29,594] {make_polygons.py:502} INFO - Generating water body polygons for tile (85, 46)
[2023-10-06 21:01:34,605] {make_polygons.py:502} INFO - Generating water body polygons for tile (85, 48)
[2023-10-06 21:01:39,030] {make_polygons.py:502} INFO - Generating water body polygons for tile (86, 47)
[2023-10-06 21:01:44,092] {make_polygons.py:502} INFO - Generating water body polygons for tile (86, 50)
[2023-10-06 21:01:47,102] {make_polygons.py:502} INFO - Generating water body polygons for tile (86, 48)
[2023-10-06 21:01:50,953] {make_polygons.py:502} INFO - Generating water body polygons for tile (86, 49)
[2023-10-06 21:01:54,239] {make_polygons.py:502} INFO - Generating water body polygons for tile (86, 46)
[2023-10-06 21:02:02,050] {make_polygons.py:502} INFO -

In [15]:
# Build a GeoDataFrame of the filtered tiles' extent geometries in the grid's CRS.
crs = grid_workflow.grid_spec.crs
filtered_tiles_extents_geoms = [tile_obj.geobox.extent.geom for tile_obj in filtered_tiles.values()]
filtered_tiles_extents_gdf = gpd.GeoDataFrame(
    geometry=filtered_tiles_extents_geoms,
    crs=crs,
)

In [16]:
# Collect the per-tile primary threshold polygon parquet files written above.
primary_threshold_polygons_paths = find_parquet_files(
    path=polygons_from_thresholds_dir,
    pattern=".*primary.*",
)
_log.info(
    f"Found {len(primary_threshold_polygons_paths)} parquet files for the primary threshold polygons."
)

[2023-10-06 21:04:28,249] {2643269420.py:3} INFO - Found 23 parquet files for the primary threshold polygons.


In [17]:
# Load all the primary threshold polygons into a single GeoDataFrame.
_log.info("Loading the primary threshold polygons parquet files..")
# pd.concat([]) fails with an opaque "No objects to concatenate"; fail early
# with a message naming the directory that was searched instead.
if not primary_threshold_polygons_paths:
    raise ValueError(
        f"No primary threshold polygon parquet files were found in {polygons_from_thresholds_dir}."
    )
# Read every per-tile file and concatenate once, renumbering the combined rows.
primary_threshold_polygons = pd.concat(
    [gpd.read_parquet(path) for path in primary_threshold_polygons_paths],
    ignore_index=True,
)
_log.info(f"Found {len(primary_threshold_polygons)} primary threshold polygons.")

[2023-10-06 21:04:28,254] {370454188.py:2} INFO - Loading the primary threshold polygons parquet files..
[2023-10-06 21:04:30,616] {370454188.py:9} INFO - Found 58371 primary threshold polygons.


In [18]:
# Merge primary threshold polygons located at tile boundaries, using the filtered tiles' extents.
_log.info("Merging primary threshold waterbody polygons located at tile boundaries...")
primary_threshold_polygons_merged = merge_polygons_at_tile_boundaries(
    primary_threshold_polygons,
    filtered_tiles_extents_gdf,
)
_log.info(
    f"Primary threshold polygons count {len(primary_threshold_polygons_merged)}."
)

[2023-10-06 21:04:30,620] {1380469088.py:1} INFO - Merging primary threshold waterbody polygons located at tile boundaries...
[2023-10-06 21:04:35,788] {1380469088.py:5} INFO - Primary threshold polygons count 58291.


In [19]:
# Persist the merged primary threshold polygons as one parquet file in the output directory.
primary_threshold_polygons_output_fp = os.path.join(
    output_directory,
    "primary_threshold_polygons_merged_at_tile_boundaries.parquet",
)
_log.info("Writing primary threshold polygons merged at tile boundaries to disk..")
primary_threshold_polygons_merged.to_parquet(primary_threshold_polygons_output_fp)
_log.info(f"Polygons written to {primary_threshold_polygons_output_fp}")

[2023-10-06 21:04:35,792] {3770163632.py:1} INFO - Writing primary threshold polygons merged at tile boundaries to disk..
[2023-10-06 21:04:36,189] {3770163632.py:7} INFO - Polygons written to s3://deafrica-waterbodies-dev/test_out_dir/0-0-1/shapefile3/primary_threshold_polygons_merged_at_tile_boundaries.parquet


In [20]:
# Collect the per-tile secondary threshold polygon parquet files written above.
secondary_threshold_polygons_paths = find_parquet_files(
    path=polygons_from_thresholds_dir,
    pattern=".*secondary.*",
)
_log.info(
    f"Found {len(secondary_threshold_polygons_paths)} parquet files for the secondary threshold polygons."
)

[2023-10-06 21:04:36,241] {1989755468.py:3} INFO - Found 23 parquet files for the secondary threshold polygons.


In [21]:
# Load all the secondary threshold polygons into a single GeoDataFrame.
_log.info("Loading the secondary threshold polygons parquet files...")
# pd.concat([]) fails with an opaque "No objects to concatenate"; fail early
# with a message naming the directory that was searched instead.
if not secondary_threshold_polygons_paths:
    raise ValueError(
        f"No secondary threshold polygon parquet files were found in {polygons_from_thresholds_dir}."
    )
# Read every per-tile file and concatenate once, renumbering the combined rows.
secondary_threshold_polygons = pd.concat(
    [gpd.read_parquet(path) for path in secondary_threshold_polygons_paths],
    ignore_index=True,
)
_log.info(f"Found {len(secondary_threshold_polygons)} secondary threshold polygons.")

[2023-10-06 21:04:36,247] {4278796675.py:2} INFO - Loading the secondary threshold polygons parquet files...
[2023-10-06 21:04:38,997] {4278796675.py:9} INFO - Found 113853 secondary threshold polygons.


In [22]:
# Merge secondary threshold polygons located at tile boundaries. The boundaries
# come from the filtered tiles' extents (not dataset/scene boundaries), so the
# log message says "tile boundaries" to match what is actually done.
_log.info("Merging secondary threshold waterbody polygons located at tile boundaries...")
secondary_threshold_polygons_merged = merge_polygons_at_tile_boundaries(
    secondary_threshold_polygons, filtered_tiles_extents_gdf
)
_log.info(f"Secondary threshold polygons count {len(secondary_threshold_polygons_merged)}.")

[2023-10-06 21:04:39,002] {2540286757.py:1} INFO - Merging secondary threshold waterbody polygons located at dataset/scene boundaries...
[2023-10-06 21:04:52,572] {2540286757.py:5} INFO - Secondary threshold polygons count 113723.


In [23]:
# Persist the merged secondary threshold polygons as one parquet file in the output directory.
# NOTE(review): unlike the primary output ("..._merged_at_tile_boundaries.parquet"), this
# file is named "..._merged_at_ds_boundaries.parquet" although the merge is done at tile
# boundaries. Left unchanged because downstream readers may depend on this name.
secondary_threshold_polygons_output_fp = os.path.join(
    output_directory,
    "secondary_threshold_polygons_merged_at_ds_boundaries.parquet",
)
_log.info("Writing secondary threshold polygons merged at tile boundaries to disk..")
secondary_threshold_polygons_merged.to_parquet(secondary_threshold_polygons_output_fp)
_log.info(f"Polygons written to {secondary_threshold_polygons_output_fp}")

[2023-10-06 21:04:52,577] {1287271369.py:1} INFO - Writing secondary threshold polygons merged at tile boundaries to disk..
[2023-10-06 21:04:53,180] {1287271369.py:8} INFO - Polygons written to s3://deafrica-waterbodies-dev/test_out_dir/0-0-1/shapefile3/secondary_threshold_polygons_merged_at_ds_boundaries.parquet
