## 2.5. LiDAR to Canopy DSM

In [None]:
import os
from pathlib import Path
import pathlib
import rasterio as rio
from rasterio.merge import merge
from rasterio.warp import calculate_default_transform, reproject, Resampling
from rasterio.io import MemoryFile
import numpy as np

# Raster paths used by the subtraction run in the last cell.
# DSM_A_PATH is passed as the raster to subtract from.
DSM_A_PATH = "/media/remap/NO_HEAT_RB/City_Atlanta/Processed/DSM/atlanta_DSM_reprojected.tif"
# DSM_B_PATH is passed as the raster being subtracted.
# NOTE(review): despite the "DSM" name this file sits under DEM/ and is called
# DEM_merged.tif -- presumably a terrain model; confirm the naming is intended.
DSM_B_PATH = "/media/remap/NO_HEAT_RB/City_Atlanta/Processed/DEM/output/DEM_merged.tif"
# Destination of the difference raster.
OUTPUT_PATH = "/media/remap/NO_HEAT_RB/City_Atlanta/Processed/DSM/normalized_DSM.tif"

In [None]:
def match_raster_to_reference(reference_path: str, source_path: str, output_path: str) -> None:
    """
    Warp band 1 of the source raster onto the grid of the reference raster.

    The output file reuses the reference raster's profile (CRS, transform,
    width, height and driver settings) but stores a single band with the
    source raster's data type. Values are resampled bilinearly.

    Nodata handling: the source raster's nodata value is passed to the warp
    so that nodata cells are not blended into valid neighbours, and the output
    is tagged with that same value. If the source declares no nodata value,
    the reference raster's nodata value is used for the output instead.

    Parameters:
    - reference_path (str): Path to the raster whose spatial properties should be matched.
    - source_path (str): Path to the input raster to be aligned (only band 1 is used).
    - output_path (str): Path where the resampled raster will be saved.
    """
    # Only the profile and nodata value are needed from the reference, so it
    # can be closed before the warp starts.
    with rio.open(reference_path) as ref:
        out_profile = ref.profile.copy()
        ref_nodata = ref.nodata

    with rio.open(source_path) as src:
        # The output carries source values, so the source nodata value is the
        # one that describes them; fall back to the reference's otherwise.
        nodata = src.nodata if src.nodata is not None else ref_nodata
        out_profile.update(dtype=src.dtypes[0], count=1, nodata=nodata)

        with rio.open(output_path, "w", **out_profile) as dst:
            # Warping band-to-band avoids materialising the full source and
            # destination arrays in Python.
            reproject(
                source=rio.band(src, 1),
                destination=rio.band(dst, 1),
                src_transform=src.transform,
                src_crs=src.crs,
                src_nodata=src.nodata,
                dst_transform=dst.transform,
                dst_crs=dst.crs,
                dst_nodata=nodata,
                resampling=Resampling.bilinear,
            )

    print(f"Aligned raster saved to: {output_path}")

In [None]:
def subtract_a_b(
    from_dsm_path, 
    subtract_dsm_path,
    output
) -> None:
    """
    Write max(from_dsm - subtract_dsm, 0) for band 1 to ``output`` as float32.

    If the two rasters differ in width, height or affine transform, the
    raster at ``subtract_dsm_path`` is first warped onto the grid of
    ``from_dsm_path`` with ``match_raster_to_reference``; the aligned copy is
    written next to ``subtract_dsm_path`` as ``<stem>aligned.tif``.

    The difference is computed window by window in float32. Negative
    differences are clamped to 0, and cells that are nodata in either input
    are written as 0, which is also the nodata value declared on the output.

    Parameters:
    - from_dsm_path: Path to the raster to subtract from; it defines the output grid.
    - subtract_dsm_path: Path to the raster whose values are subtracted.
    - output: Path of the single-band float32 raster to write.
    """
    # Inspect both grids first and close the handles again, so every dataset
    # used below is owned by a context manager.
    with rio.open(from_dsm_path) as src1, rio.open(subtract_dsm_path) as src2:
        same_grid = (
            src1.width == src2.width
            and src1.height == src2.height
            and src1.transform == src2.transform
        )

    if same_grid:
        subtrahend_path = subtract_dsm_path
    else:
        subtrahend_path = os.path.join(
            os.path.dirname(subtract_dsm_path),
            pathlib.Path(subtract_dsm_path).stem + "aligned.tif",
        )
        match_raster_to_reference(
            reference_path=from_dsm_path,
            source_path=subtract_dsm_path,
            output_path=subtrahend_path,
        )

    with rio.open(from_dsm_path) as src1, rio.open(subtrahend_path) as src2:
        meta = src1.meta.copy()
        meta.update(dtype='float32', count=1, nodata=0)

        with rio.open(output, 'w', **meta) as dst:
            for _, window in src1.block_windows(1):
                # Masked reads keep nodata cells out of the arithmetic; the
                # float32 cast prevents wrap-around for unsigned integer data.
                top = src1.read(1, window=window, masked=True).astype('float32')
                base = src2.read(1, window=window, masked=True).astype('float32')
                # Cells masked in either input become 0, the output nodata.
                diff = (top - base).filled(0)
                np.maximum(diff, 0, out=diff)
                dst.write(diff, window=window, indexes=1)

    print(f"Saved to {output}")

In [None]:
# Subtract the raster at DSM_B_PATH from the one at DSM_A_PATH and write the
# result to OUTPUT_PATH.
subtract_a_b(
    from_dsm_path=DSM_A_PATH,
    subtract_dsm_path=DSM_B_PATH,
    output=OUTPUT_PATH,
)