<a href="https://colab.research.google.com/github/joekelly211/masfi/blob/main/3_features_topo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Imports, directories and global functions

In [None]:
# Base directory of the project on Google Drive.
#   Personal drive: start the path with '/content/drive/MyDrive/'
#   Shared drive:   start the path with '/gdrive/Shareddrives/' (the drive must be created first)

base_dir = "/gdrive/Shareddrives/masfi"
# base_dir = '/content/drive/MyDrive/masfi'

# Mount Google Drive at the mount point that matches the chosen base_dir
from google.colab import drive
import os
import sys

if base_dir.startswith('/gdrive/Shareddrives/'):
  drive.mount('/gdrive', force_remount=True)
elif base_dir.startswith('/content/drive/MyDrive/'):
  drive.mount('/content/drive', force_remount=True)
  # Only the personal-drive branch creates the base directory
  os.makedirs(base_dir, exist_ok=True)
else:
  print("Create a base_dir beginning with '/gdrive/Shareddrives/' or '/content/drive/MyDrive/'.")

# Make the base directory importable (appended once only)
_path_to_add = os.path.realpath(base_dir)
if _path_to_add not in sys.path:
    sys.path.append(_path_to_add)

Mounted at /gdrive


In [None]:
# Installs
%%capture
!pip install astropy
!pip install geopandas
!pip install whitebox

In [None]:
!# Reload imports, replacing those in the cache
%load_ext autoreload
%autoreload 2
# Imports
from astropy.convolution import convolve, Gaussian2DKernel
import csv
import geopandas as gpd
import glob
from google.colab import runtime
import ipywidgets as widgets
from math import sqrt, cos, radians
import matplotlib.pyplot as plt
from numba import jit
import numpy as np
from os import makedirs
from os.path import exists, join
from osgeo import gdal, ogr
gdal.UseExceptions()
from pathlib import Path
import requests
from scipy.ndimage import maximum_filter, minimum_filter, uniform_filter, distance_transform_edt
from scipy.ndimage import label, sum as ndi_sum
import whitebox
wbt = whitebox.WhiteboxTools()
import zipfile

Downloading WhiteboxTools pre-compiled binary for first time use ...
Downloading WhiteboxTools binary from https://www.whiteboxgeo.com/WBT_Linux/WhiteboxTools_linux_musl.zip
Decompressing WhiteboxTools_linux_musl.zip ...
WhiteboxTools package directory: /usr/local/lib/python3.11/dist-packages/whitebox
Downloading testdata ...


In [None]:
# Top-level project directories, all rooted at base_dir:
# 1_areas, 3_features and 6_scenarios
areas_dir, features_dir, scenario_dir = (
    join(base_dir, subdirectory) for subdirectory in ("1_areas", "3_features", "6_scenarios")
)

# Contents of 1_areas
polygons_dir = join(areas_dir, "polygons")
# NOTE: despite its name, template_dir is the path of the template raster file
template_dir = join(areas_dir, "template.tif")

In [None]:
# Global function: export an array as a .tif
template_tif_path = join(areas_dir, "template.tif")
nodatavalue = -1111111
compress = True
def export_array_as_tif(input_array, output_tif, template=template_tif_path, nodatavalue=nodatavalue, compress=compress, dtype=gdal.GDT_Float32):
    """Write a 2D array to a single-band GeoTIFF that copies a template's grid.

    Args:
        input_array: Array written to band 1 of the output.
        output_tif: Path of the GeoTIFF to create.
        template: Path of the raster whose band-1 size, geotransform and
            projection are copied to the output.
        nodatavalue: Value registered as the band's nodata value.
        compress: If True, write with DEFLATE compression (ZLEVEL=9).
        dtype: GDAL data type of the output band.
    """
    template_ds = gdal.Open(template)
    template_band = template_ds.GetRasterBand(1)
    template_dimensions, template_projection = template_ds.GetGeoTransform(), template_ds.GetProjection()
    if compress:
        # PREDICTOR=3 (floating point predictor) is only valid for float data;
        # other data types use PREDICTOR=2 (horizontal differencing).
        predictor = 3 if dtype in (gdal.GDT_Float32, gdal.GDT_Float64) else 2
        options = ["COMPRESS=DEFLATE", f"PREDICTOR={predictor}", "ZLEVEL=9"]
    else:
        options = []
    output_ds = gdal.GetDriverByName("GTiff").Create(output_tif, template_band.XSize, template_band.YSize, 1, dtype, options=options)
    if output_ds is None:
        # Only reachable when GDAL exceptions are disabled
        raise RuntimeError(f"Could not create {output_tif}")
    output_band = output_ds.GetRasterBand(1)
    output_band.WriteArray(input_array)
    output_band.SetNoDataValue(nodatavalue)
    output_ds.SetGeoTransform(template_dimensions)
    output_ds.SetProjection(template_projection)
    # Release bands before datasets so everything is flushed and closed
    output_band = template_band = None
    output_ds = template_ds = None

# Global function: burn a polygon to raster
def burn_polygon_to_raster(raster_path, polygon_path, fixed=True, fixed_value=1, column_name=None, all_touched=True):
    """Rasterize the layer of a vector file into band 1 of an existing raster.

    Args:
        raster_path: Raster opened in update mode and burned into.
        polygon_path: Vector file whose layer is rasterized.
        fixed: If True burn `fixed_value`; otherwise burn an attribute value.
        fixed_value: Value burned when `fixed` is True.
        column_name: Attribute to burn when `fixed` is False. Falls back to
            the name of the layer's first field when not given.
        all_touched: If True, pass ALL_TOUCHED=TRUE to the rasterizer.

    Raises:
        ValueError: If either input cannot be opened.
    """
    target_ds = source_ds = None
    try:
        target_ds = gdal.Open(raster_path, gdal.GA_Update)
        source_ds = ogr.Open(polygon_path)
        if not (target_ds and source_ds):
            raise ValueError("Cannot open input files")
        source_layer = source_ds.GetLayer()
        rasterize_options = []
        if all_touched:
            rasterize_options.append("ALL_TOUCHED=TRUE")
        if not fixed:
            burn_attribute = column_name or source_layer.GetLayerDefn().GetFieldDefn(0).GetName()
            rasterize_options = rasterize_options + [f"ATTRIBUTE={burn_attribute}"]
            gdal.RasterizeLayer(target_ds, [1], source_layer, options=rasterize_options)
        else:
            gdal.RasterizeLayer(target_ds, [1], source_layer, burn_values=[fixed_value], options=rasterize_options)
    finally:
        # Flush pending writes, then drop the references to close both datasets
        if target_ds:
            target_ds.FlushCache()
        target_ds = source_ds = None

# Define base DEM

In [None]:
# Once GEDI elevation has been predicted into a GEDI DTM (Digital Terrain Model),
# set this to True to build DTM features instead of DSM (Digital Surface Model) features.
# Without a GEDI DTM the notebook falls back to the DSM.
enable_gedi_dtm = True

# List candidate prediction rasters to use as the GEDI DTM.
# Use an 'unmasked' version so area calculations (e.g. hydrography) are calculated correctly.
if enable_gedi_dtm:
  gedi_dtm_exists = False
  for subdir, dirs, files in os.walk(scenario_dir):
    for raster in files:
      # Only .tif files with 'gedi_elevation' in their name are candidates
      if not raster.endswith('.tif') or 'gedi_elevation' not in raster:
        continue
      print('# Remember to modify montane transition values to be specific to study area.')
      print(f'gedi_elevation_path = "{subdir}/{raster}"')
      gedi_dtm_exists = True
  if not gedi_dtm_exists:
    print("No GEDI DTM found in scenarios folder. Defaulting to DSM.")
else:
  print(f"Using the DSM in {areas_dir}.")

# Remember to modify montane transition values to be specific to study area.
gedi_elevation_path = "/gdrive/Shareddrives/masfi/6_scenarios/gedi_elevation_tekai_250622_103110/scenario_predictions_unmasked/2015__gedi_elevation_tekai_250622_103110_unmasked.tif"
# Remember to modify montane transition values to be specific to study area.
gedi_elevation_path = "/gdrive/Shareddrives/masfi/6_scenarios/gedi_elevation_tekai_250622_103110/scenario_predictions/2015__gedi_elevation_tekai_250622_103110.tif"


In [None]:
# This code block is only relevant (and otherwise does nothing) if using a GEDI DTM.
# Remember to modify montane transition values to be specific to study area (see below)
gedi_elevation_path = "/gdrive/Shareddrives/masfi/6_scenarios/gedi_elevation_tekai_250622_103110/scenario_predictions_unmasked/2015__gedi_elevation_tekai_250622_103110_unmasked.tif"

base_dsm_path = join(areas_dir, "base_dem_dsm.tif")
base_dtm_path = join(areas_dir, "base_dem_dtm.tif")
# Post-process the GEDI DTM and save it to the areas directory
if enable_gedi_dtm and gedi_dtm_exists:
  if not exists(base_dtm_path):
    # Predictions below sea level are post-processed back to 0 m.
    # At ~sea level, the original DEM was likely the true terrain height.

    # Define a low transition zone between original and DTM.
    # This delineates a transition from 0 to 100 % DTM values.
    low_transition_lower_limit = 0
    low_transition_upper_limit = 5

    # Higher elevations are poorly predicted in some study areas due to low sample size.
    # Original values should be used to avoid erroneous topographic metrics.
    # Vegetation at these high elevations tends not to change much between disturbance scenarios.

    # Define a montane transition zone between original and DTM.
    # This delineates a transition from 0 to 100 % original DEM values.
    # E.g. >1,500 is typically scrub in Peninsular Malaysia.
    montane_transition_lower_limit = 1500
    montane_transition_upper_limit = 1800

    # Read original base DEM
    base_dsm_array = gdal.Open(base_dsm_path).ReadAsArray()

    # Low zone: ratio (0 - 1) of DTM values, rising from 0 at the lower limit
    # to 1 at the upper limit. The lower limit is subtracted so that the ratio
    # stays correct if it is changed from 0.
    base_dsm_array_low_ratio = np.clip(base_dsm_array, low_transition_lower_limit, low_transition_upper_limit)
    base_dsm_array_low_ratio = (base_dsm_array_low_ratio - low_transition_lower_limit) / (low_transition_upper_limit - low_transition_lower_limit)

    # Montane zone: ratio (0 - 1) of DTM values, falling from 1 at the lower limit
    # to 0 at the upper limit (i.e. 100 % original DEM values above it).
    base_dsm_array_montane_ratio = np.clip(base_dsm_array, montane_transition_lower_limit, montane_transition_upper_limit)
    base_dsm_array_montane_ratio = (montane_transition_upper_limit - base_dsm_array_montane_ratio) / (montane_transition_upper_limit - montane_transition_lower_limit)

    # Use original DEM values for surface water.
    # The Copernicus DEM rounds all surface water values to 1 or 0 decimal places.
    # This is used to differentiate them from land values, creating a 'land binary'.
    base_dsm_array_land_binary = np.floor(base_dsm_array * 10) / 10 # Round DOWN 1 decimal place
    base_dsm_array_land_binary = base_dsm_array - base_dsm_array_land_binary
    base_dsm_array_land_binary[base_dsm_array_land_binary > 0] = 1
    # Invert the binary array to target 0 values for sieving single water pixels (usually erroneous)
    base_dsm_array_land_binary_inverted = np.logical_not(base_dsm_array_land_binary)
    # Sieve water patches smaller than 2 pixels, using 8-connectedness (3, 3).
    # NOTE(review): the original comment described this as '0.5 ha' - confirm the intended threshold.
    lb_array_min_patch_pixels = 2
    lb_array_labelled, lb_array_features = label(base_dsm_array_land_binary_inverted, structure=np.ones((3, 3)))
    # Determine the size (pixel count) of each patch
    lb_array_sizes = ndi_sum(base_dsm_array_land_binary_inverted, lb_array_labelled, range(lb_array_features + 1))
    # Create a mask to remove patches smaller than the threshold
    lb_array_mask_sizes = lb_array_sizes >= lb_array_min_patch_pixels
    lb_array_mask_sizes[0] = 0 # Ensure non-target values (label 0) are excluded
    lb_array_mask = lb_array_mask_sizes[lb_array_labelled]
    # Apply the mask to the inverted binary array
    lb_array_sieved_inverted = base_dsm_array_land_binary_inverted * lb_array_mask
    # Invert the array back to original representation
    base_dsm_array_land_binary = np.logical_not(lb_array_sieved_inverted)

    # Read the GEDI DTM and create the final modifier
    gedi_elevation_array = gdal.Open(gedi_elevation_path).ReadAsArray()
    base_dtm_array_modifier = gedi_elevation_array.copy()
    # Change all DTM values < sea level to 0 (most are erroneous)
    base_dtm_array_modifier[base_dtm_array_modifier < 0] = 0
    # Subtract DTM from the DSM as the modifier
    base_dtm_array_modifier = base_dsm_array - base_dtm_array_modifier
    # Multiply the DTM modifier by low ratio, montane ratio and land binary
    base_dtm_array_modifier = base_dtm_array_modifier * base_dsm_array_low_ratio * base_dsm_array_montane_ratio * base_dsm_array_land_binary

    # Apply the modifier
    base_dtm_array = base_dsm_array - base_dtm_array_modifier

    # Export uncompressed for further topographic metrics
    export_array_as_tif(base_dtm_array, base_dtm_path, compress=False)
    print(f"GEDI DTM has been postprocessed and uncompressed to: {base_dtm_path}")

  else: print(f"A base DTM already exists, first remove from {areas_dir} for replacement.")

# Distinguish 'enabled but not found' from 'disabled' so the message is accurate
elif enable_gedi_dtm: print("A GEDI DTM does not exist in the scenarios directory. Proceeding with DSM.")
else: print("GEDI DTM is disabled. Proceeding with DSM.")

A base DTM already exists, first remove from /gdrive/Shareddrives/masfi/1_areas for replacement.


In [None]:
# Choose the base DEM (post-processed GEDI DTM if enabled and present, else the DSM)
# and the matching temporary / final topography directories.
use_gedi_dtm = False
if enable_gedi_dtm:
  print("Post-processed GEDI DTM enabled.")
  use_gedi_dtm = exists(base_dtm_path)
  if not use_gedi_dtm:
    print("A post-processed GEDI DTM does not exist. Defaulting to the original DSM.")
else:
  print("Post-processed GEDI DTM disabled. Using the original DSM.")

if use_gedi_dtm:
  base_dem_path, topo_dir_prefix = base_dtm_path, 'topo_dtm'
else:
  base_dem_path, topo_dir_prefix = base_dsm_path, 'topo_dsm'

base_dem = gdal.Open(base_dem_path)
topo_temp_dir = join(features_dir, f'{topo_dir_prefix}_temp')
topo_final_dir = join(features_dir, f'{topo_dir_prefix}_final')
for topo_dir in (topo_temp_dir, topo_final_dir):
  makedirs(topo_dir, exist_ok=True)

# Base DEM attributes: array, geotransform and the latitude of the middle row
base_dem_array = base_dem.ReadAsArray()
dem_dimensions = base_dem.GetGeoTransform()
y_origin = dem_dimensions[3]
pixel_height = dem_dimensions[5]
raster_height = len(base_dem_array)
dem_central_latitude = y_origin + (raster_height // 2) * pixel_height

Post-processed GEDI DTM enabled.


# Topography metrics

In [None]:
%%capture
# Calculates a total of 24 topographic metrics using either Whitebox or custom functions.
# https://www.whiteboxgeo.com/manual/wbt_book/preface.html
# These are later finalised with an automatic reduction in precision (rounding)
# for faster modelling, and the creation of 'unsmooth' and 'smooth' versions.
# Smoothed versions allow the model to account for geolocation inaccuracies,
# and adjacent topography types not captured in the various metrics.
# Every metric is written to topo_temp_dir and is skipped if its file already exists.

# Clear the temporary directory and recalculate topographic metrics if issues.
# Setting this to True deletes every file under topo_temp_dir (recursively).
clear_temp_directory = False
if clear_temp_directory:
  for raster in Path(topo_temp_dir).glob("**/*"):
    if raster.is_file(): raster.unlink()

# Elevation: the base DEM array, exported uncompressed as the input for all other metrics
elevation_path_temp = join(topo_temp_dir, "elevation.tif")
if not exists(elevation_path_temp):
  elevation = base_dem_array
  export_array_as_tif(elevation, elevation_path_temp, compress=False)

# Slope (in degrees)
slope_path_temp = join(topo_temp_dir, "slope.tif")
if not exists(slope_path_temp):
  wbt.slope(elevation_path_temp, slope_path_temp, units = "degrees")

# Aspect
aspect_path_temp = join(topo_temp_dir, "aspect.tif")
if not exists(aspect_path_temp):
  wbt.aspect(elevation_path_temp, aspect_path_temp)

# Profile Curvature
profile_curvature_path_temp = join(topo_temp_dir, "profile_curvature.tif")
if not exists(profile_curvature_path_temp):
  wbt.profile_curvature(elevation_path_temp, profile_curvature_path_temp, log=False)

# Tangential Curvature
tangential_curvature_path_temp = join(topo_temp_dir, "tangential_curvature.tif")
if not exists(tangential_curvature_path_temp):
  wbt.tangential_curvature(elevation_path_temp, tangential_curvature_path_temp, log=False)

# Topographic Ruggedness Index
topographic_ruggedness_index_path_temp = join(topo_temp_dir, "topographic_ruggedness_index.tif")
if not exists(topographic_ruggedness_index_path_temp):
  wbt.ruggedness_index(elevation_path_temp, topographic_ruggedness_index_path_temp)

# Deviation from Mean Elevation, one raster per square kernel size
# (the size is zero-padded to two digits in the file name)
dev_kernel_sizes = [3, 7, 11]
for kernel_size in dev_kernel_sizes:
  deviation_mean_elevation_path_temp = join(topo_temp_dir, f"deviation_mean_elevation_{str(kernel_size).rjust(2, '0')}.tif")
  if not exists(deviation_mean_elevation_path_temp):
    wbt.dev_from_mean_elev(elevation_path_temp, deviation_mean_elevation_path_temp, filterx=kernel_size, filtery=kernel_size)

# Circular Variance of Aspect, one raster per kernel size
cva_kernel_sizes = [3, 7, 11]
for kernel_size in cva_kernel_sizes:
  circular_variance_aspect_path_temp = join(topo_temp_dir, f"circular_variance_aspect_{str(kernel_size).rjust(2, '0')}.tif")
  if not exists(circular_variance_aspect_path_temp):
    wbt.circular_variance_of_aspect(elevation_path_temp, circular_variance_aspect_path_temp, filter=kernel_size)

# Fill Single Cell Pits for Breach Depressions
dem_fill_single_cell_pits_path_temp = join(topo_temp_dir, "dem_fill_single_cell_pits.tif")
if not exists(dem_fill_single_cell_pits_path_temp):
  wbt.fill_single_cell_pits(elevation_path_temp, dem_fill_single_cell_pits_path_temp)
  # Raw output doesn't work, needs to be saved again.
  # The file is read back and re-exported in place through export_array_as_tif.
  dem_fill_single_cell_pits = gdal.Open(dem_fill_single_cell_pits_path_temp).ReadAsArray()
  export_array_as_tif(dem_fill_single_cell_pits, dem_fill_single_cell_pits_path_temp, compress=False)

# Breach Depressions for Specific Contributing Area (input: the pit-filled DEM)
max_search_dist = 2 # Maximum search distance for breach paths in cells (pixels)
dem_breach_depressions_path_temp = join(topo_temp_dir, "dem_breach_depressions.tif")
if not exists(dem_breach_depressions_path_temp):
  wbt.breach_depressions_least_cost(dem_fill_single_cell_pits_path_temp, dem_breach_depressions_path_temp, dist=max_search_dist)

# Specific Contributing Area (Qin) (for TWI and SPI) (input: the breached DEM)
specific_contributing_area_qin_path_temp = join(topo_temp_dir, "specific_contributing_area_qin.tif")
if not exists(specific_contributing_area_qin_path_temp):
  wbt.qin_flow_accumulation(dem_breach_depressions_path_temp, specific_contributing_area_qin_path_temp, out_type="specific contributing area")

# Topographic Wetness Index (TWI) (inputs: specific contributing area and slope)
topographic_wetness_index_path_temp = join(topo_temp_dir, "topographic_wetness_index.tif")
if not exists(topographic_wetness_index_path_temp):
  wbt.wetness_index(specific_contributing_area_qin_path_temp, slope_path_temp, topographic_wetness_index_path_temp)

# Stream Power Index (SPI) (inputs: specific contributing area and slope)
exponent = 1.0
stream_power_index_path_temp = join(topo_temp_dir, "stream_power_index.tif")
if not exists(stream_power_index_path_temp):
  wbt.stream_power_index(specific_contributing_area_qin_path_temp, slope_path_temp, stream_power_index_path_temp, exponent=exponent)

# The whitebox algorithm 'wbt.surface_area_ratio' is not currently working correctly.
# This SAR function below is based on the Whitebox source code:
# https://github.com/jblindsay/whitebox-tools/blob/master/whitebox-tools-app/src/tools/terrain_analysis/surface_area_ratio.rs
# 'jit' makes it orders of magnitude faster.

surface_area_ratio_path_temp = join(topo_temp_dir, "surface_area_ratio.tif")

if not exists(surface_area_ratio_path_temp):
  elevation_raster = gdal.Open(elevation_path_temp)
  transform = elevation_raster.GetGeoTransform()
  elevation_array = elevation_raster.ReadAsArray()
  @jit(nopython=True)
  def calculate_surface_area_ratio(dem, transform, nodata):
      """Return the ratio of 3D surface area to planar area for each interior cell.

      Each cell's 3x3 neighbourhood is split into eight triangles that meet at
      the centre cell. Triangle sides are half the 3D distance between the two
      cells they join, and triangle areas come from Heron's formula. A triangle
      that touches a nodata cell is skipped and the planar area is reduced by
      one eighth per skipped triangle.

      Args:
          dem: 2D elevation array.
          transform: GDAL-style geotransform tuple. Pixel sizes are converted
              to metres with 111,111 m per unit and the cosine of the row
              latitude, i.e. a geographic (degree) grid is assumed.
          nodata: Value marking cells without data. It also fills the output
              border rows/columns and every cell that cannot be computed.

      Returns:
          float32 array with the same shape as `dem`.
      """
      output = np.full(dem.shape, nodata, dtype=np.float32)
      resx, resy = abs(transform[1]), abs(transform[5])
      # The 3x3 window is flattened row-major (0..8, centre = 4).
      # The 16 edges are: 6 horizontal, 6 vertical, then 4 diagonals to the centre.
      edge_start = np.array([0, 1, 3, 4, 6, 7, 0, 1, 2, 3, 4, 5, 0, 2, 6, 8])
      edge_end = np.array([1, 2, 4, 5, 7, 8, 3, 4, 5, 6, 7, 8, 4, 4, 4, 4])
      # Each triangle is described by the indices of its three edges
      triangle_edge_a = np.array([0, 1, 2, 3, 2, 3, 4, 5])
      triangle_edge_b = np.array([7, 7, 6, 8, 9, 11, 10, 10])
      triangle_edge_c = np.array([12, 13, 12, 13, 14, 15, 14, 15])
      # Work buffers, allocated once and reused for every cell
      zvals = np.empty(9, dtype=np.float64)
      planar = np.empty(16, dtype=np.float64)
      half_edges = np.zeros(16, dtype=np.float64)
      edge_valid = np.zeros(16, dtype=np.bool_)
      for row in range(1, dem.shape[0] - 1):
          # Pixel size in metres at this row's latitude
          mid_lat = transform[3] + row * transform[5]
          resx_adjusted = resx * 111_111.0 * cos(radians(mid_lat))
          resy_adjusted = resy * 111_111.0
          res_diag = sqrt(resx_adjusted**2 + resy_adjusted**2)
          eighth_area = resx_adjusted * resy_adjusted / 8.0
          planar[0:6] = resx_adjusted
          planar[6:12] = resy_adjusted
          planar[12:16] = res_diag
          for col in range(1, dem.shape[1] - 1):
              if dem[row, col] == nodata:
                  continue
              for k in range(9):
                  zvals[k] = dem[row - 1 + k // 3, col - 1 + k % 3]
              # Half of the 3D length of every edge. Validity is tracked per
              # edge so that edge indices never shift when a neighbour is nodata.
              for k in range(16):
                  z_start = zvals[edge_start[k]]
                  z_end = zvals[edge_end[k]]
                  edge_valid[k] = z_start != nodata and z_end != nodata
                  if edge_valid[k]:
                      half_edges[k] = sqrt(planar[k]**2 + (z_start - z_end)**2) / 2.0
              area = 0.0
              valid_triangles = 0
              for t in range(8):
                  a = triangle_edge_a[t]
                  b = triangle_edge_b[t]
                  c = triangle_edge_c[t]
                  if edge_valid[a] and edge_valid[b] and edge_valid[c]:
                      side_a, side_b, side_c = half_edges[a], half_edges[b], half_edges[c]
                      s = (side_a + side_b + side_c) / 2.0
                      # Clamp at 0: rounding can make Heron's product slightly negative
                      area += sqrt(max(s * (s - side_a) * (s - side_b) * (s - side_c), 0.0))
                      valid_triangles += 1
              if valid_triangles > 0:
                  output[row, col] = area / (eighth_area * valid_triangles)
      return output

  # Run the jitted function on the entire array
  surface_area_ratio_array = calculate_surface_area_ratio(elevation_array, transform, nodatavalue)
  export_array_as_tif(surface_area_ratio_array, surface_area_ratio_path_temp, template=elevation_path_temp, compress=False)

# Aspect sine: sine of the aspect angle (degrees converted to radians)
aspect_sine_path_temp = join(topo_temp_dir,"aspect_sine.tif")
if not exists(aspect_sine_path_temp):
  aspect = gdal.Open(aspect_path_temp).ReadAsArray()
  aspect_sine = np.sin(np.deg2rad(aspect))
  export_array_as_tif(aspect_sine, aspect_sine_path_temp, compress=False)

# Aspect cosine: cosine of the aspect angle
aspect_cosine_path_temp = join(topo_temp_dir,"aspect_cosine.tif")
if not exists(aspect_cosine_path_temp):
  aspect = gdal.Open(aspect_path_temp).ReadAsArray()
  aspect_cosine = np.cos(np.deg2rad(aspect))
  export_array_as_tif(aspect_cosine, aspect_cosine_path_temp, compress=False)

# Eastness: aspect sine weighted by the sine of the slope
eastness_path_temp = join(topo_temp_dir,"eastness.tif")
if not exists(eastness_path_temp):
  slope = gdal.Open(slope_path_temp).ReadAsArray()
  aspect_sine = gdal.Open(aspect_sine_path_temp).ReadAsArray()
  slope_sine = np.sin(np.deg2rad(slope))
  eastness = aspect_sine * slope_sine
  export_array_as_tif(eastness, eastness_path_temp, compress=False)

# Northness: aspect cosine weighted by the sine of the slope
northness_temp = join(topo_temp_dir,"northness.tif")
if not exists(northness_temp):
  slope = gdal.Open(slope_path_temp).ReadAsArray()
  aspect_cosine = gdal.Open(aspect_cosine_path_temp).ReadAsArray()
  slope_sine = np.sin(np.deg2rad(slope))
  northness = aspect_cosine * slope_sine
  export_array_as_tif(northness, northness_temp, compress=False)

# Elevation is re-read from file for the moving-window metrics below
elevation = gdal.Open(elevation_path_temp).ReadAsArray()

# Roughness: local maximum minus local minimum elevation, per kernel size
roughness_kernel_sizes = [3, 7, 11]
for kernel_size in roughness_kernel_sizes:
  roughness_path_temp = join(topo_temp_dir, f"roughness_{kernel_size:02d}.tif")
  if exists(roughness_path_temp):
    continue
  elevation_local_max = maximum_filter(elevation, size=kernel_size)
  elevation_local_min = minimum_filter(elevation, size=kernel_size)
  roughness = elevation_local_max - elevation_local_min
  export_array_as_tif(roughness, roughness_path_temp, compress=False)

# Topographic Position Index (TPI): elevation minus its local mean, per kernel size
tpi_kernel_sizes = [3, 7, 11]
for kernel_size in tpi_kernel_sizes:
  tpi_path_temp = join(topo_temp_dir, f"topographic_position_index_{kernel_size:02d}.tif")
  if exists(tpi_path_temp):
    continue
  elevation_local_mean = uniform_filter(elevation, size=kernel_size)
  topographic_position_index = elevation - elevation_local_mean
  export_array_as_tif(topographic_position_index, tpi_path_temp, compress=False)

# Stream Power Index (SPI) log10
spi_log10_path_temp = join(topo_temp_dir,"stream_power_index_log10.tif")
if not exists(spi_log10_path_temp):
  stream_power_index = gdal.Open(stream_power_index_path_temp).ReadAsArray()
  # Replace 0, negative or 'nodata' values by a tiny positive number so log10 is defined
  spi_not_positive = stream_power_index <= 0
  stream_power_index[spi_not_positive] = 1.0e-30
  stream_power_index_log10 = np.log10(stream_power_index)
  export_array_as_tif(stream_power_index_log10, spi_log10_path_temp, compress=False)

In [None]:
# Optional visual check of every temporary raster
# (e.g. after an update of the Whitebox algorithms)
visualise_topo_temp = False

if visualise_topo_temp:
  tif_names = filter(lambda name: name.endswith('.tif'), os.listdir(topo_temp_dir))
  raster_files = [os.path.join(topo_temp_dir, name) for name in tif_names]
  for raster_file in raster_files:
      ds = gdal.Open(raster_file)
      if ds is None:
          print('Could not open ' + raster_file)
          continue
      raster_data = ds.GetRasterBand(1).ReadAsArray()
      ds = None
      # Stretch the colour scale between the 2% and 98% percentiles
      p2, p98 = np.percentile(raster_data, [2, 98])
      plt.figure()
      plt.imshow(raster_data, cmap='viridis', vmin=p2, vmax=p98)
      plt.colorbar()
      plt.title(os.path.basename(raster_file))
      plt.show()

# Round and smooth

In [None]:
# Sets up a dictionary of optimal precision based on the number of desired unique values.
# Limiting the number of unique values avoids overfitting and reduces training time.
# As a rule of thumb, set this to at least 256 and above the elevation range in metres.
# E.g. project area is 200 - 2900 m, set to at least 2,700.

override_max_unique_values = False
max_unique_values = 5000 # Should be >=10

if not override_max_unique_values:
  # Without an override, use the elevation range of the base DEM
  max_unique_values = int(np.ptp(base_dem_array))
topo_precision_dict = dict()

# Print the sorted content of the temporary directory as a list literal,
# ready to paste and edit as the list of topography metrics to finalise
topography_list = sorted(str(temp_tif) for temp_tif in os.listdir(topo_temp_dir))
print("topography_final_list = [")
print("".join(f"'{topography}',\n" for topography in topography_list), end="")
print("]")

topography_final_list = [
'aspect.tif',
'aspect_cosine.tif',
'aspect_sine.tif',
'circular_variance_aspect_03.tif',
'circular_variance_aspect_07.tif',
'circular_variance_aspect_11.tif',
'dem_breach_depressions.tif',
'dem_fill_single_cell_pits.tif',
'deviation_mean_elevation_03.tif',
'deviation_mean_elevation_07.tif',
'deviation_mean_elevation_11.tif',
'eastness.tif',
'elevation.tif',
'northness.tif',
'profile_curvature.tif',
'roughness_03.tif',
'roughness_07.tif',
'roughness_11.tif',
'rounding_dictionary.csv',
'slope.tif',
'specific_contributing_area_qin.tif',
'stream_power_index.tif',
'stream_power_index_log10.tif',
'surface_area_ratio.tif',
'tangential_curvature.tif',
'topographic_position_index_03.tif',
'topographic_position_index_07.tif',
'topographic_position_index_11.tif',
'topographic_ruggedness_index.tif',
'topographic_wetness_index.tif',
]


In [None]:
topography_final_list = [
# 'aspect.tif',
'aspect_cosine.tif',
'aspect_sine.tif',
'circular_variance_aspect_03.tif',
'circular_variance_aspect_07.tif',
'circular_variance_aspect_11.tif',
# 'dem_breach_depressions.tif',
# 'dem_fill_single_cell_pits.tif',
'deviation_mean_elevation_03.tif',
'deviation_mean_elevation_07.tif',
'deviation_mean_elevation_11.tif',
'eastness.tif',
'elevation.tif',
'northness.tif',
'profile_curvature.tif',
'roughness_03.tif',
'roughness_07.tif',
'roughness_11.tif',
'slope.tif',
# 'specific_contributing_area_qin.tif',
# 'stream_power_index.tif',
'stream_power_index_log10.tif',
'surface_area_ratio.tif',
'tangential_curvature.tif',
'topographic_position_index_03.tif',
'topographic_position_index_07.tif',
'topographic_position_index_11.tif',
'topographic_ruggedness_index.tif',
'topographic_wetness_index.tif',
]

In [None]:
# NOTE(review): create_precision_dict is not read anywhere in this cell - confirm whether it should gate the block below.
create_precision_dict = True
overwrite_existing_precision_dict = False

# For every selected raster, find the highest rounding precision (number of decimals)
# that keeps the number of unique values at or below max_unique_values,
# then store the result in a CSV in the temporary directory.
precision_dict_csv_path = join(topo_temp_dir, 'rounding_dictionary.csv')
if not exists(precision_dict_csv_path) or overwrite_existing_precision_dict:
  # Start from an empty dictionary so entries from an earlier run are not written again
  topo_precision_dict = {}
  for topography_final in topography_final_list:
    print(f"Reading {topography_final}...")
    # Read raster as array
    topography_raster_path = join(topo_temp_dir, topography_final)
    topography_raster_array = gdal.Open(topography_raster_path).ReadAsArray()
    # Keep valid values only ('nodata' and non-finite values are dropped)
    valid_values = topography_raster_array[topography_raster_array != nodatavalue]
    valid_values = valid_values[np.isfinite(valid_values)]
    # Count unique values in raster
    unique_values = len(np.unique(valid_values))
    print(f"There are {unique_values} unique values in {topography_final}")
    # Generate histogram from (up to) 100,000 random valid points
    sample_size = min(100_000, valid_values.size)
    if sample_size:
      random_selection = np.random.choice(valid_values, size = sample_size, replace = False)
      _ = plt.hist(random_selection, bins='auto')  # arguments are passed to np.histogram
      plt.title(f"{topography_final}")
      plt.show()
    # Remove 0 values for log10
    nonzero_values = valid_values[valid_values != 0]
    # Set before the loop so a skipped or exhausted search is always reported
    optimal_precision = None
    if nonzero_values.size:
      # Create log10 array for determining positions for rounding
      array_log10 = np.log10(np.abs(nonzero_values))
      place_value_decimal = int(abs(np.min(array_log10)))
      place_value_integer = int(0 - np.max(array_log10))
      # Iterate down precision levels to determine optimal number of unique values
      min_starting_precision = len(str(max_unique_values))
      for precision in reversed(range(place_value_integer, max(min_starting_precision, place_value_decimal +1))):
        rounded_array = np.round(valid_values, decimals=precision)
        round_unique_values = len(np.unique(rounded_array))
        if round_unique_values <= max_unique_values:
          optimal_precision = precision
          print(f"The optimal precison for {topography_final} is {optimal_precision}, with {round_unique_values} unique values.")
          topo_precision_dict.update({f'{topography_final}':f'{optimal_precision}'})
          break
    if optimal_precision is None: print("There's a problem with setting precision.")
    print("___________________\n")

  print("Dictionary for optimal rounding values:")
  print(topo_precision_dict)

  # Save rounding dictionary to CSV (first row: file names, second row: precisions)
  with open(precision_dict_csv_path, 'w', newline='') as precision_dict_csv:
      writer = csv.writer(precision_dict_csv)
      writer.writerow(topo_precision_dict.keys())
      writer.writerow(topo_precision_dict.values())

# Open rounding dictionary and verify
precision_dict_csv_path = join(topo_temp_dir, 'rounding_dictionary.csv')
with open(precision_dict_csv_path, 'r') as file:
    keys, values = list(csv.reader(file))
    topo_precision_dict = dict(zip(keys, values))

# Verify precision and correct if necessary
# (printed as a dictionary literal, ready to paste and edit)
print("topo_precision_dict = {")
for key, value in topo_precision_dict.items():
    print(f'"{key}": {value},')
print("}")

topo_precision_dict = {
"aspect_cosine.tif": 3,
"aspect_sine.tif": 3,
"circular_variance_aspect_03.tif": 3,
"circular_variance_aspect_07.tif": 3,
"circular_variance_aspect_11.tif": 3,
"deviation_mean_elevation_03.tif": 2,
"deviation_mean_elevation_07.tif": 2,
"deviation_mean_elevation_11.tif": 2,
"eastness.tif": 3,
"elevation.tif": 0,
"northness.tif": 3,
"profile_curvature.tif": 4,
"roughness_03.tif": 1,
"roughness_07.tif": 0,
"roughness_11.tif": 0,
"slope.tif": 1,
"stream_power_index_log10.tif": 2,
"surface_area_ratio.tif": 3,
"tangential_curvature.tif": 4,
"topographic_position_index_03.tif": 1,
"topographic_position_index_07.tif": 1,
"topographic_position_index_11.tif": 1,
"topographic_ruggedness_index.tif": 1,
"topographic_wetness_index.tif": 1,
}


In [None]:
topo_precision_dict = {
"aspect_cosine.tif": 2,
"aspect_sine.tif": 2,
"circular_variance_aspect_03.tif": 3,
"circular_variance_aspect_07.tif": 3,
"circular_variance_aspect_11.tif": 3,
"deviation_mean_elevation_03.tif": 2,
"deviation_mean_elevation_07.tif": 2,
"deviation_mean_elevation_11.tif": 2,
"eastness.tif": 2,
"elevation.tif": 0,
"northness.tif": 2,
"profile_curvature.tif": 4,
"roughness_03.tif": 1,
"roughness_07.tif": 0,
"roughness_11.tif": 0,
"slope.tif": 1,
"stream_power_index_log10.tif": 1,
"surface_area_ratio.tif": 3,
"tangential_curvature.tif": 4,
"topographic_position_index_03.tif": 1,
"topographic_position_index_07.tif": 1,
"topographic_position_index_11.tif": 1,
"topographic_ruggedness_index.tif": 1,
"topographic_wetness_index.tif": 1,
}

In [None]:
# Exports a rounded 'unsmooth' and a rounded 'smooth' version of every selected raster.
# Smoothed versions allow the model to account for geolocation inaccuracies,
# and adjacent topography types not captured in the various metrics.

# Gaussian smoothing kernel (standard deviation of 1 in x and y)
kernel = Gaussian2DKernel(x_stddev=1, y_stddev=1)

# Progress label
topography_progress_index = 0
topography_progress_total = len(topo_precision_dict)
topography_progress_label = widgets.Label(f"Topography progress: {topography_progress_index}/{topography_progress_total}")
display(topography_progress_label)

# Output file names carry 'dtm' or 'dsm' depending on the temporary directory in use
topo_surface_tag = "dtm" if topo_temp_dir.endswith("dtm_temp") else "dsm"

# Iterate over selected topography rasters
for topography, precision in topo_precision_dict.items():
  decimals = int(precision)
  topo_raster_temp_path = join(topo_temp_dir, topography)
  topo_raster_temp_array = gdal.Open(topo_raster_temp_path).ReadAsArray()
  # Convert nodata values to 0
  topo_raster_temp_array[topo_raster_temp_array == nodatavalue] = 0

  # Unsmoothed version: round and export, unless it already exists
  topo_raster_unsmoothed_filename = f"topo_{topo_surface_tag}_unsmooth_{topography}"
  topo_raster_unsmoothed_path = join(topo_final_dir, topo_raster_unsmoothed_filename)
  if not exists(topo_raster_unsmoothed_path):
    topo_raster_unsmoothed_rounded = np.round(topo_raster_temp_array, decimals=decimals)
    export_array_as_tif(topo_raster_unsmoothed_rounded, topo_raster_unsmoothed_path)

  # Smoothed version: 2D spatial convolution, then round and export, unless it already exists
  topo_raster_smoothed_filename = f"topo_{topo_surface_tag}_smooth_{topography}"
  topo_raster_smoothed_path = join(topo_final_dir, topo_raster_smoothed_filename)
  if not exists(topo_raster_smoothed_path):
    topo_raster_smoothed = convolve(topo_raster_temp_array, kernel, boundary='extend')
    topo_raster_smoothed_rounded = np.round(topo_raster_smoothed, decimals=decimals)
    export_array_as_tif(topo_raster_smoothed_rounded, topo_raster_smoothed_path)

  # Update topography progress
  topography_progress_index += 1
  topography_progress_label.value = f"Topography progress: {topography_progress_index}/{topography_progress_total}"

Label(value='Topography progress: 0/24')

# Distance from coast

In [None]:
# Creates a feature which accounts for coastal / continental effects.
# Requires a polygon of a landmass or 'coastline extent', e.g. Peninsular Malaysia.
# This must include the nearest coastline to all parts of the project area,
# even coasts outside the project area (if they're still the nearest).

# Define and create directory
coast_dir = join(features_dir, 'coast')
makedirs(coast_dir, exist_ok=True)

# Download global coast data from https://osmdata.openstreetmap.de/data/coastlines.html
coastlines_url = 'https://osmdata.openstreetmap.de/download/coastlines-split-4326.zip'
coastlines_global_file_path = join(coast_dir, 'coastlines-split-4326.zip')
if not exists(coastlines_global_file_path):
  # Stream to a '.part' file and rename only after a complete download, so an HTTP
  # error or an interrupted transfer never leaves a broken zip that would later be
  # mistaken for a finished download.
  coastlines_partial_file_path = coastlines_global_file_path + '.part'
  with requests.get(coastlines_url, allow_redirects=True, stream=True, timeout=60) as request:
    request.raise_for_status()
    with open(coastlines_partial_file_path, 'wb') as coastlines_file:
      # Write in 1 MiB chunks instead of holding the whole archive in memory
      for chunk in request.iter_content(chunk_size=1024 * 1024):
        coastlines_file.write(chunk)
  os.replace(coastlines_partial_file_path, coastlines_global_file_path)

# Extract the archive into the coast directory (skipped if already extracted)
coastlines_global_dir = join(coast_dir, 'coastlines-split-4326')
if not exists(coastlines_global_dir):
  with zipfile.ZipFile(coastlines_global_file_path, 'r') as zip_ref:
      zip_ref.extractall(coast_dir)

# Upload and select a polygon with full coastline extent.
# It must be the template.gpkg polygon OR a polygon that entirely contains template.gpkg.
# Candidate polygons are printed as ready-to-paste assignments.
polygons_to_exclude = ['project_area.gpkg', 'project_area_buffered_bbox.gpkg', 'gedi_area.gpkg']
for polygon in os.listdir(polygons_dir):
  if polygon not in polygons_to_exclude:
    print(f"coastline_extent_polygon = '{polygon}'")

In [None]:
coastline_extent_polygon = 'peninsular_malaysia.gpkg'

# Read the polygon and unpack its bounding box, ordered (min_x, min_y, max_x, max_y)
coastline_extent_polygon_path = join(polygons_dir, coastline_extent_polygon)
coastline_extent_bounds = gpd.read_file(coastline_extent_polygon_path).total_bounds
coastline_min_x, coastline_min_y, coastline_max_x, coastline_max_y = coastline_extent_bounds

# Number of decimal places the distance (km) is rounded to;
# it is passed as the decimals argument of np.round further down.
precision = 1

# Open the template raster
template = gdal.Open(template_dir)

# Output path of the coast raster (the coastline is rasterized into this file)
rasterized_coast_path = join(coast_dir, 'rasterized_coast.tif')

# Create a new empty raster based on the coastline extent polygon.
# The raster keeps the template's resolution and pixel alignment so that the template
# window can later be cut out of it with plain pixel offsets.
if coastline_extent_polygon != 'template.gpkg':
  # Get dimensions of template
  template_band = template.GetRasterBand(1)
  template_dimensions, template_projection = template.GetGeoTransform(), template.GetProjection()
  template_size_x, template_size_y = template_band.XSize, template_band.YSize
  # Geotransform index 5 is negated to get a positive y resolution
  template_res_x, template_res_y = template_dimensions[1], -template_dimensions[5]

  # Calculate the minimum x and maximum y (top-left corner) of the template
  template_min_x = template_dimensions[0]
  template_max_y = template_dimensions[3]

  # Calculate the centre x and y of the template
  template_centre_x = template_min_x + ((template_size_x / 2) * template_res_x)
  template_centre_y = template_max_y - ((template_size_y / 2) * template_res_y)

  # Calculate the size (in pixels) difference between the polygon minimum and template minimum.
  # Rounding up keeps the new raster on the template's pixel grid while covering the polygon.
  coastline_min_diff_x = template_min_x - coastline_min_x
  coastline_max_diff_y = coastline_max_y - template_max_y
  coastline_min_diff_x_size = int(np.ceil(coastline_min_diff_x / template_res_x))
  coastline_max_diff_y_size = int(np.ceil(coastline_max_diff_y / template_res_y))

  # Calculate when the coastline raster should start while maintaining template resolution and position
  coastline_start_x = template_min_x - (coastline_min_diff_x_size * template_res_x)
  coastline_start_y = template_max_y + (coastline_max_diff_y_size * template_res_y)

  # Calculate the size of the coastline raster
  coastline_size_x = int(np.ceil((coastline_max_x - coastline_start_x)/template_res_x))
  coastline_size_y = int(np.ceil((coastline_start_y - coastline_min_y)/template_res_y))

  # The template window is later cut out of this raster using the pixel offsets above.
  # Negative offsets or a raster smaller than offset + template size would make that
  # slice silently return a wrong or empty window, so fail here with a clear message.
  if (coastline_min_diff_x_size < 0 or coastline_max_diff_y_size < 0
      or coastline_size_x < coastline_min_diff_x_size + template_size_x
      or coastline_size_y < coastline_max_diff_y_size + template_size_y):
    raise ValueError(f"'{coastline_extent_polygon}' does not entirely contain the template raster extent. "
                     "Select a coastline extent polygon that covers the whole template.")

  if not exists(rasterized_coast_path):
    # Create coast raster dataset (note: 'driver' holds the created dataset, not the GDAL driver)
    driver = gdal.GetDriverByName("GTiff").Create(rasterized_coast_path, coastline_size_x, coastline_size_y, bands=1, eType=gdal.GDT_Float32,
                                                    options=["COMPRESS=DEFLATE","PREDICTOR=2","ZLEVEL=9"])
    driver.SetProjection(template_projection)
    driver.SetGeoTransform((coastline_start_x, template_res_x, 0, coastline_start_y, 0, -template_res_y))

    #  Create and write array (all pixels with value 1)
    raster_data = np.ones((coastline_size_y, coastline_size_x), dtype=np.float32)
    driver.GetRasterBand(1).WriteArray(raster_data)

    # Close coast raster dataset
    driver.FlushCache()
    driver = None
    print("A blank raster at the extent of coastlines polygon has been generated, ready for rasterization.")
  else: print("A rasterization raster already exists.")

else: # If just using the template area, copy the template.
  if not exists(rasterized_coast_path):
    template_array = template.ReadAsArray()
    export_array_as_tif(template_array, rasterized_coast_path)
  else: print("A rasterization raster already exists.")

# Cut the global coastlines down to the bounding box of the coastline extent polygon.
# A smaller vector file speeds up the rasterization that follows.
coastlines_clipped_path = join(coast_dir, 'clipped_coastlines.gpkg')
if exists(coastlines_clipped_path):
  print(f"Coastlines have already been clipped to the polygon: {coastlines_clipped_path}")
else:
  # Read the extracted global coastline shapefile
  coastlines_shp_path = join(coastlines_global_dir, 'lines.shp')
  coastlines_shp_df = gpd.read_file(coastlines_shp_path)
  # Clip to the extent bounds and store the result as a GeoPackage
  coastlines_clipped_df = gpd.clip(coastlines_shp_df, coastline_extent_bounds)
  coastlines_clipped_df.to_file(coastlines_clipped_path, driver='GPKG')
  print(f"Coastlines clipped to the polygon: {coastlines_clipped_path}")

# Burn the clipped coastlines into the coast raster with the value 2.
# Any existing pixel equal to 2 is taken to mean this step has already run.
rasterized_coast_array = gdal.Open(rasterized_coast_path).ReadAsArray()
coast_already_rasterized = np.any(rasterized_coast_array == 2)
if coast_already_rasterized:
  print(f"Coastlines have already been rasterized: {rasterized_coast_path}")
else:
  burn_polygon_to_raster(rasterized_coast_path, coastlines_clipped_path, fixed_value=2)
  print(f"Coastlines rasterized: {rasterized_coast_path}")

# Calculate proximity to the coast, measured in pixels
coast_proximity_pixels_path = join(coast_dir, "coast_proximity_pixels.tif")
if not exists(coast_proximity_pixels_path):
  # Re-read the raster so that the freshly burned coastline pixels are included
  rasterized_coast_array = gdal.Open(rasterized_coast_path).ReadAsArray()
  # The distance transform measures the distance to the nearest coastal '2' pixel.
  # With no such pixel its output is not a distance to any coastline, so stop
  # instead of exporting a meaningless raster.
  if not np.any(rasterized_coast_array == 2):
    raise ValueError(f"No coastline pixels (value 2) were found in {rasterized_coast_path}. "
                     "Check that the coastline extent polygon includes a coastline.")
  # If the rasterized coastline is different dimensions from the template
  if coastline_extent_polygon != 'template.gpkg':
    coast_proximity_pixels_unclipped_path = join(coast_dir, "coast_proximity_pixels_unclipped.tif")
    if not exists(coast_proximity_pixels_unclipped_path):
      coast_proximity_pixels_unclipped = distance_transform_edt(rasterized_coast_array != 2) # Target the coastal '2' pixels
      export_array_as_tif(coast_proximity_pixels_unclipped, coast_proximity_pixels_unclipped_path, rasterized_coast_path)
    coast_proximity_pixels_unclipped = gdal.Open(coast_proximity_pixels_unclipped_path).ReadAsArray()
    # Cut the template window out of the larger raster using the pixel offsets
    # between the coastline raster origin and the template origin
    clip_start_x, clip_start_y = coastline_min_diff_x_size, coastline_max_diff_y_size
    clip_size_x, clip_size_y = template_size_x, template_size_y
    coast_proximity_pixels = coast_proximity_pixels_unclipped[clip_start_y:clip_start_y + clip_size_y,
                                                            clip_start_x:clip_start_x + clip_size_x]
  # If the rasterized coastline is the same dimensions as the template
  else: coast_proximity_pixels = distance_transform_edt(rasterized_coast_array != 2) # Target the coastal '2' pixels
  # Export coast proximity in pixels
  export_array_as_tif(coast_proximity_pixels, coast_proximity_pixels_path)
  print(f"A proximity (pixel number) raster has been generated at: {coast_proximity_pixels_path}")
else: print(f"A proximity (pixel number) raster already exists at: {coast_proximity_pixels_path}")

# Turn the pixel proximity into a distance from the coast in km
coast_distance_path = join(coast_dir, "coast_proximity_km.tif")
if exists(coast_distance_path):
  print(f"A distance from coast (km) raster already exists at: {coast_distance_path}")
else:
  coast_proximity_array = gdal.Open(coast_proximity_pixels_path).ReadAsArray()
  cell_size_x = gdal.Open(join(areas_dir, 'cell_size_x.tif')).ReadAsArray()
  cell_size_y = gdal.Open(join(areas_dir, 'cell_size_y.tif')).ReadAsArray()
  # One scale factor for the whole raster: the average of the mean x and mean y cell sizes,
  # divided by 1000 (presumably metres to km; verify the units of the cell size rasters).
  cell_size_mean_km = ((cell_size_x.mean() + cell_size_y.mean()) / 2) / 1000
  coast_proximity_km = coast_proximity_array * cell_size_mean_km
  # 'precision' is the number of decimal places kept
  coast_proximity_round = np.round(coast_proximity_km, decimals=precision)
  export_array_as_tif(coast_proximity_round, coast_distance_path)
  print(f"A distance from coast (km) raster has been generated at: {coast_distance_path}")

# Disconnect runtime

In [None]:
# Useful for stopping background execution.
# Unassigns the Colab runtime (google.colab.runtime) once every cell above has finished.
runtime.unassign()