In [99]:
import requests
from io import BytesIO
import gzip
import os
import pandas as pd
import geopandas as gpd
import numpy as np
import rasterio
from shapely.geometry import mapping
from rasterio.features import geometry_mask
from scipy.interpolate import NearestNDInterpolator
import startinpy
from rasterio import Affine
from shapely.geometry import shape
from rasterio.crs import CRS
import pyproj
from shapely.geometry import box

import laspy
import time
from tqdm import tqdm
from scipy.spatial import cKDTree
from scipy.ndimage import median_filter
from pathlib import Path
import matplotlib.pyplot as plt

In [109]:
pyproj_crs = pyproj.CRS.from_epsg(28992)  
wkt_string = pyproj_crs.to_wkt()
crs = CRS.from_wkt(wkt_string)

In [23]:
with rasterio.open("output/R5_27CN1.TIF", "r") as f:
    print(f.crs)

LOCAL_CS["Amersfoort / RD New"]


In [47]:
def fetch_ahn_wcs(bbox, output_file="output/dtm.tif", coverage="dtm_05m", resolution=0.5):
    # Calculate width and height from bbox and resolution
    width = int((bbox[2] - bbox[0]) / resolution)
    height = int((bbox[3] - bbox[1]) / resolution)

    # WCS Service URL
    WCS_URL = "https://service.pdok.nl/rws/ahn/wcs/v1_0"

    # Construct query parameters
    params = {
        "SERVICE": "WCS",
        "VERSION": "1.0.0",
        "REQUEST": "GetCoverage",
        "FORMAT": "GEOTIFF",
        "COVERAGE": coverage,
        "BBOX": f"{bbox[0]},{bbox[1]},{bbox[2]},{bbox[3]}",
        "CRS": "EPSG:28992",
        "RESPONSE_CRS": "EPSG:28992",
        "WIDTH": str(width),
        "HEIGHT": str(height)
    }

    # Send GET request to fetch the data
    response = requests.get(WCS_URL, params=params, headers={"User-Agent": "Mozilla/5.0"})
    
    # getting correct crs data (using rasterio directly for crs fails on my laptop...)
    pyproj_crs = pyproj.CRS.from_epsg(28992)  
    wkt_string = pyproj_crs.to_wkt()
    crs = CRS.from_wkt(wkt_string)

    if response.status_code == 200:
        with open("temp.tif", "wb") as f:
            f.write(response.content)

        with rasterio.open("temp.tif", "r") as f:
            print(f.crs)
        # Step 1: Clean the file using GDAL
        gdal_translate_command = f"gdal_translate -of GTiff -a_srs EPSG:28992 temp.tif {output_file}"
        os.system(gdal_translate_command)  # This runs the GDAL command

        # Step 2: Open the cleaned raster and modify NoData value
        try:
            with rasterio.open(output_file) as dataset:
                # Read the array
                array = dataset.read(1)
                   
                # Get the current NoData value
                old_nodata = dataset.nodata

                # Set the new NoData value
                new_nodata = -9999

                # Replace old NoData values with the new NoData value
                array[array == old_nodata] = new_nodata

                # Step 3: Save the modified raster with the new NoData value
                # Open the file in write mode and update the NoData value
                with rasterio.open(output_file, 'r+') as dst:
                    dst.write(array, 1)
                    dst.nodata = new_nodata

        except Exception as e:
            print(f"Error reading or modifying raster: {e}")
            return None
        
        # Step 4: Delete the temporary file after use
        if os.path.exists("temp.tif"):
            os.remove("temp.tif")
            print("Temporary file 'temp.tif' has been deleted.")
        
        return dst, array

    else:
        print(f"Failed to fetch AHN data: HTTP {response.status_code}")
        return None

In [48]:
bbox = (94500, 469500, 95000, 470000)  # (minx, miny, maxx, maxy)
dtm, dtm_array = fetch_ahn_wcs(bbox)
dsm, dsm_array = fetch_ahn_wcs(bbox, "output/dsm.tif", "dsm_05m")

LOCAL_CS["Amersfoort / RD New"]
Temporary file 'temp.tif' has been deleted.
LOCAL_CS["Amersfoort / RD New"]
Temporary file 'temp.tif' has been deleted.


In [49]:
def download_wfs_data(bbox, gpkg_name, output_folder, output_name):
    """
    Download data from a WFS server in batches and save it to a GeoPackage.
    -----------------------------------------------------
    Input:
    -   wfs_url (str): URL of the WFS service.
    -   layer_name (str): The layer name to download.
    -   bbox (tuple): Bounding box as (minx, miny, maxx, maxy).
    -   gpkg_name (str): Name for the output GeoPackage file.
    -   tile_name (str): Layer name for saving in the GeoPackage.
    Output:
    -   None: saves a GeoPackage file to the given {output_gpkg} at layer {tile_name}.
    """
    # Initialize variables for feature collection, max requestable amount from server is 10000
    all_features = []
    start_index = 0
    count = 10000

    wfs_url = "https://data.3dbag.nl/api/BAG3D/wfs"
    layer_name = "BAG3D:lod13"

    while True:
        params = {
            "SERVICE": "WFS",
            "REQUEST": "GetFeature",
            "VERSION": "2.0.0",
            "TYPENAMES": layer_name,
            "SRSNAME": "urn:ogc:def:crs:EPSG::28992",
            "BBOX": f"{bbox[0]},{bbox[1]},{bbox[2]},{bbox[3]},urn:ogc:def:crs:EPSG::28992",
            "COUNT": count,
            "STARTINDEX": start_index
        }

        # Mimicking a QGIS request
        headers = {
            "User-Agent": "Mozilla/5.0 QGIS/33411/Windows 11 Version 2009"
        }

        response = requests.get(wfs_url, params=params, headers=headers)

        # Check if the request was successful & download data
        if response.status_code == 200:
            if response.headers.get('Content-Encoding', '').lower() == 'gzip' and response.content[:2] == b'\x1f\x8b':
                data = gzip.decompress(response.content)
            else:
                data = response.content

            with BytesIO(data) as f:
                gdf = gpd.read_file(f)

            all_features.append(gdf)

            # Check if the number of features retrieved is less than the requested count: then we can stop
            if len(gdf) < count:
                break

                # Start index for next request
            start_index += count

        else:
            print(f"Failed to download WFS data. Status code: {response.status_code}")
            print(f"Error message: {response.text}")
            break

            # Concatenate all features into a single GeoDataFrame
    if all_features:
        full_gdf = gpd.GeoDataFrame(pd.concat(all_features, ignore_index=True))

        os.makedirs(output_folder, exist_ok=True)
        output_gpkg = os.path.join(output_folder, f"{gpkg_name}.gpkg")

        full_gdf.to_file(output_gpkg, layer=output_name, driver="GPKG")
    else:
        print("No features were downloaded.")



In [50]:
download_wfs_data(bbox, "buildtest", "output", "test")

In [51]:
def load_buildings(buildings_path, layer):
    """
    Load in the building shapes from a geopackage file.
    ----
    Input:
    - buildings_path (string):   path to the geopackage file.
    - layer (string):            (Tile) name of the layer of buildings to be used

    Output:
    - List of dictionaries: A list of dictionaries containing:
      - "geometry": building geometry in GeoJSON-like format.
      - "parcel_id": corresponding parcel ID.
    """
    buildings_gdf = gpd.read_file(buildings_path, layer=layer)

    if 'identificatie' not in buildings_gdf.columns:
        raise ValueError("Column 'identificatie' not found in the dataset")

    return [{"geometry": mapping(geom), "parcel_id": identificatie} for geom, identificatie in zip(buildings_gdf.geometry, buildings_gdf["identificatie"])]

In [52]:
buildings = load_buildings("output/buildtest.gpkg", "test")

In [8]:
print(buildings)

[{'geometry': {'type': 'Polygon', 'coordinates': (((94591.5234, 469858.7188), (94592.4922, 469857.7188), (94592.3828, 469857.625), (94594.3203, 469855.625), (94600.7734, 469861.9375), (94595.9063, 469866.9063), (94589.5078, 469860.6563), (94589.5156, 469860.4688), (94589.375, 469860.3438), (94591.5234, 469858.7188)),)}, 'parcel_id': 'NL.IMBAG.Pand.1525100000003981'}, {'geometry': {'type': 'Polygon', 'coordinates': (((94592.4922, 469857.7188), (94591.5234, 469858.7188), (94589.375, 469860.3438), (94587.0469, 469858.0625), (94588.9297, 469856.1563), (94591.4297, 469858.5938), (94592.3828, 469857.625), (94592.4922, 469857.7188)),)}, 'parcel_id': 'NL.IMBAG.Pand.1525100000003981'}, {'geometry': {'type': 'Polygon', 'coordinates': (((94500.0078, 469800.2188), (94503.8125, 469803.5), (94494.5078, 469814.625), (94494.7109, 469814.8125), (94494.2422, 469815.375), (94490.0313, 469811.7188), (94500.0078, 469800.2188)),)}, 'parcel_id': 'NL.IMBAG.Pand.1525100000013697'}, {'geometry': {'type': 'Polyg

In [87]:
def extract_center_cells(geo_array, no_data=-9999):
    """
    Extract the values of each cell in the input data and save these with the x and y (row and col)
    indices. Thereby, make sure that the corners of the dataset are filled for a full coverage triangulation
    in the next step.
    ----
    Input:
    - (2d numpy array): raster data.
    - no_data (int, optional): no_data value to replace source no data value with.

    Output:
    - xyz_filled (list): list containing x, y and z coordinates of the cells.
    """
    # Get the indices of the rows and columns
    rows, cols = np.indices(geo_array.shape)

    # Identify corner coordinates
    corners = {
        "top_left": (0, 0),
        "top_right": (0, geo_array.shape[1] - 1),
        "bottom_left": (geo_array.shape[0] - 1, 0),
        "bottom_right": (geo_array.shape[0] - 1, geo_array.shape[1] - 1)
    }

    # Mask for valid center cells (non-no_data)
    valid_center_cells = (geo_array != no_data)

    # Extract x, y, z values for valid cells
    x_valid = cols[valid_center_cells]
    y_valid = rows[valid_center_cells]
    z_valid = geo_array[valid_center_cells]

    # Create interpolator from valid points
    interpolator = NearestNDInterpolator(list(zip(x_valid, y_valid)), z_valid)

    # Check each corner for no data and interpolate if necessary
    for corner_name, (row, col) in corners.items():
        if geo_array[row, col] == no_data:
            # Interpolate the nearest valid value
           geo_array[row, col] = interpolator((col, row))

    # Extract non-no_data and center cells again after filling corners
    valid_center_cells = (geo_array != no_data)

    # Extract final x, y, z values after filling corners
    x_filled = cols[valid_center_cells]
    y_filled = rows[valid_center_cells]
    z_filled = geo_array[valid_center_cells]

    # Prepare final list of [x, y, z]
    xyz_filled = []
    for x_i, y_i, z_i in zip(x_filled, y_filled, z_filled):
        xyz_filled.append([x_i, y_i, z_i])

    return xyz_filled


def fill_raster(geo_array, nodata_value, transform):
    """
    Fill the no data values of a given raster using Laplace interpolation.
    ----
    Input:
    - geo_array (2d numpy array): cropped raster data.
    - nodata_value (int): nodata value to replace NAN after interplation with.
    - transform (rasterio transform): affine transform matrix.

    Output:
    - new_data[0, 1:-1, 1:-1] (2d numpy array): filled raster data with first and last rows and columns remove to ensure
                                                there are no nodata values.
    - new_transform (rasterio transform): affine transform matrix reflecting the one column one row removal shift.
    """

    # creating delaunay
    points = extract_center_cells(geo_array, no_data=nodata_value)
    dt = startinpy.DT()
    dt.insert(points, "BBox")

    # now interpolation
    new_data = np.copy(geo_array)

    # for interpolation, grid of all column and row positions, excluding the first and last rows/cols
    cols, rows = np.meshgrid(
        np.arange(1, geo_array.shape[1] - 1),
        np.arange(1, geo_array.shape[0] - 1)

    )

    # flatten the grid to get a list of all (col, row) locations
    locs = np.column_stack((cols.ravel(), rows.ravel()))
    interpolated_values = dt.interpolate({"method": "Laplace"}, locs)

    # reshape interpolated grid back to original
    interpolated_grid = np.reshape(interpolated_values, (geo_array.shape[0] - 2, geo_array.shape[1] - 2))

    # fill new_data with interpolated values
    new_data[1:-1, 1:-1] = interpolated_grid
    new_data = np.where(np.isnan(new_data), nodata_value, new_data)

    new_transform = transform * Affine.translation(1, 1)

    return new_data[1:-1, 1:-1], new_transform


def chm_finish(chm_array, dtm_array, transform, min_height=2, max_height=40):
    """
    Finish the CHM file by first removing the ground height. Then remove vegetation height
    below and above a certain range to ensure effective shade and remove noise.
    ----
    Input:
    - chm_array (2d numpy array):       cropped raster array of the CHM.
    - dtm_array (2d numpy array):       cropped raster array of the filled DSM.
    - transform (rasterio transform):   affine transform matrix.
    - min_height (float, optional):     minimal height for vegetation to be included.
    - max_height (float, optional):     maximum height for vegetation to be included.

    Output:
    - result_array (2d numpy array):    Array of the CHM with normalized height and min and max heights removed.
    - new_transform (rasterio transform): affine transform matrix reflecting the one column one row removal shift.
    """

    result_array = chm_array[1:-1, 1:-1] - dtm_array
    result_array[(result_array < min_height) | (result_array > max_height)] = 0
    result_array[np.isnan(result_array)] = 0

    new_transform = transform * Affine.translation(1, 1)

    return result_array, new_transform


def replace_buildings(filled_dtm, dsm_buildings, buildings_geometries, transform):
    """
    Replace the values of the filled dtm with the values of the filled dsm, if there is a building.
    ----
    Input:
    - filled_dtm (2d np array):         filled array of the cropped AHN dtm.
    - dsm_buildings (2d np array):      Filled array of the cropped AHN dsm.
    - building_geometries (list):       A list of the building geometries
    - transform (rasterio transform):   affine transform matrix.

    Output:
    - final_dsm (2d numpy array):   a np array representing the final dsm, containing only ground and building
                                    heights.

    """
    geometries = [shape(building['geometry']) for building in buildings_geometries]
    
    # Ensure mask has same shape as filled_dtm
    building_mask = geometry_mask(geometries, transform=transform, invert=False, out_shape=filled_dtm.shape)

    # Get shape differences
    dtm_shape = filled_dtm.shape
    dsm_shape = dsm_buildings.shape

    if dtm_shape != dsm_shape:
        # Compute the cropping offsets
        row_diff = dsm_shape[0] - dtm_shape[0]
        col_diff = dsm_shape[1] - dtm_shape[1]

        # Ensure even cropping from all sides (center alignment)
        row_start = row_diff // 2
        col_start = col_diff // 2
        row_end = row_start + dtm_shape[0]
        col_end = col_start + dtm_shape[1]

        # Crop dsm_buildings to match filled_dtm
        dsm_buildings = dsm_buildings[row_start:row_end, col_start:col_end]

    # Apply the mask
    final_dsm = np.where(building_mask, filled_dtm, dsm_buildings)

    return final_dsm


def load_buildings(buildings_path, layer):
    """
    Load in the building shapes from a geopackage file.
    ----
    Input:
    - buildings_path (string):   path to the geopackage file.
    - layer (string):            (Tile) name of the layer of buildings to be used

    Output:
    - List of dictionaries: A list of geometries in GeoJSON-like dictionary format.
      Each dictionary represents a building geometry with its spatial coordinates.
    """
    buildings_gdf = gpd.read_file(buildings_path, layer=layer)
    return [mapping(geom) for geom in buildings_gdf.geometry]


In [95]:
def write_output(dataset, crs, output, transform, name, change_nodata=False):
    """
    Write grid to .tiff file.
    ----
    Input:
    - dataset: Can be either a rasterio dataset (for rasters) or laspy dataset (for point clouds)
    - output (Array): the output grid, a numpy grid.
    - name (String): the name of the output file.
    - transform:
      a user defined rasterio Affine object, used for the transforming the pixel coordinates
      to spatial coordinates.
    - change_nodata (Boolean): true: use a no data value of -9999, false: use the datasets no data value
    """
    output_file = name    

    print(type(output))
    output = np.squeeze(output)
    print(output.shape)
    print(output.dtype)
    
    # Set the nodata value: use -9999 if nodata_value is True or dataset does not have nodata.
    if change_nodata:
        nodata_value = -9999
    else:
        try:
            nodata_value = dataset.nodata
            if nodata_value is None:
                raise AttributeError("No no data value found in dataset.")
        except AttributeError as e:
            print(f"Warning: {e}. Defaulting to -9999.")
            nodata_value = -9999
    
    
    # output the dataset
    with rasterio.open(output_file, 'w',
                       driver='GTiff',
                       height=output.shape[0],  # Assuming output is (rows, cols)
                       width=output.shape[1],
                       count=1,
                       dtype=np.float32,
                       crs=crs,
                       nodata=nodata_value,
                       transform=transform) as dst:
        dst.write(output, 1)
    print("File written to '%s'" % output_file)

In [89]:
def fill_dems(buildings, dsm, dsm_array, dtm, dtm_array):
    transform = dtm.transform
    filled_dtm, _ = fill_raster(dtm_array, dtm.nodata, transform)
    filled_dsm, new_transform = fill_raster(dsm_array, dsm.nodata, transform)
    final_dsm = replace_buildings(filled_dtm, filled_dsm, buildings, new_transform)
    print(type(final_dsm))
    
    pyproj_crs = pyproj.CRS.from_epsg(28992)  
    wkt_string = pyproj_crs.to_wkt()
    crs = CRS.from_wkt(wkt_string)
    
    
    write_output(dsm, crs, filled_dsm, new_transform, "output/filled_final_dsm.tif")
    write_output(dsm, crs, final_dsm, new_transform, "output/final_dsm.tif")
    write_output(dtm, crs, filled_dtm, new_transform, "output/final_dtm.tif")  
    
    

In [90]:
fill_dems(buildings, dsm, dsm_array, dtm, dtm_array)

<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
(998, 998)
float32
File written to 'output/filled_final_dsm.tif'
<class 'numpy.ndarray'>
(998, 998)
float32
File written to 'output/final_dsm.tif'
<class 'numpy.ndarray'>
(998, 998)
float32
File written to 'output/final_dtm.tif'


# CHM

In [3]:
# Load the shapefile
shapefile_path = "geotiles\AHN_lookup.geojson"
gdf = gpd.read_file(shapefile_path)

In [17]:
def find_tiles(x_min, y_min, x_max, y_max):
    query_geom = box(x_min, y_min, x_max, y_max) 
    matches = gdf.sindex.query(query_geom) # ,  predicate="overlaps": tricky i want to still get something if it is all contained in one
    return gdf.iloc[matches]["GT_AHNSUB"].tolist()

# Example query
search_bounds = (94500, 469500, 95000, 470000)  # Search area
matching_tiles = find_tiles(*search_bounds)

print("Tiles covering the area:", matching_tiles)

Tiles covering the area: ['30FN1_20', '30FN1_25', '30FN2_21', '30FN2_16']


In [18]:
def download_las_tiles(tile_names, output_folder):
    """
    Download LAZ files for each GeoTiles subtile from the found tiles.
    ------
    Input:
    - tile_names (list): List of subtiles to download.
    - output_folder (str): Path to the folder where the downloaded files should be saved.
    Output:
    - none:  The function writes output files directly to the specified `output_folder{subtile}`.
    """

    base_url = "https://geotiles.citg.tudelft.nl/AHN5_T"
    os.makedirs(output_folder, exist_ok=True)

    for full_tile_name in tile_names:
        # Extract tile name and sub-tile number
        if '_' in full_tile_name:
            tile_name, sub_tile = full_tile_name.split('_')
        else:
            print(f"Skipping invalid tile entry: {full_tile_name}")
            continue

        # Construct the URL and file path
        sub_tile_str = f"_{int(sub_tile):02}"  # Convert sub_tile to zero-padded two-digit number
        url = f"{base_url}/{tile_name}{sub_tile_str}.LAZ"
        filename = f"{tile_name}{sub_tile_str}.LAZ"
        file_path = os.path.join(output_folder, filename)

        # Check if the file already exists to avoid re-downloading
        if os.path.exists(file_path):
            print(f"File {file_path} already exists, skipping download.")
            continue

        # Attempt to download the file
        try:
            response = requests.get(url)
            response.raise_for_status()  # Ensure the request was successful
            with open(file_path, 'wb') as f:
                f.write(response.content)
            print(f"Downloaded and saved {file_path}")
        except requests.exceptions.RequestException as e:
            print(f"Failed to download {url}: {e}")

In [20]:
download_las_tiles(matching_tiles, "temp")

Downloaded and saved temp\30FN1_20.LAZ
Downloaded and saved temp\30FN1_25.LAZ
Downloaded and saved temp\30FN2_21.LAZ
Downloaded and saved temp\30FN2_16.LAZ


In [21]:
def raster_center_coords(min_x, max_x, min_y, max_y, resolution):
    """
    Compute the center xy coordinates of a grid.
    ----
    Input:
    - min_x, max_x, min_y, max_y(float): Minimum and maximum x and y coordinates of the grid.
    - resolution (float): The length of each cell, function can only be used for square cells.

    Output:
    - grid_center_x: a grid where each cell contains the value of its center point's x coordinates.
    - grid_center_y: a grid where each cell contains the value of its center point's y coordinates.
    """
    # create coordinates for the x and y border of every cell.
    x_coords = np.arange(min_x, max_x, resolution)  # x coordinates expand from left to right.
    y_coords = np.arange(max_y, min_y, -resolution)  # y coordinates reduce from top to bottom.

    # create center point coordinates for evey cell.
    grid_x, grid_y = np.meshgrid(x_coords, y_coords)
    grid_center_x = grid_x + resolution / 2
    grid_center_y = grid_y - resolution / 2
    return grid_center_x, grid_center_y


In [110]:
def median_filter_chm(chm_array, nodata_value=-9999, size=3):
    """
    Apply a median filter to a CHM, handling NoData values.
    -----
    Parameters:
    - chm_array (np.ndarray): The array representing the height values of the CHM.
    - nodata_value (float): Value representing NoData in the input raster.
    - size (int): Size of the median filter. It defines the footprint of the filter.

    Returns:
    - smoothed_chm (np.ndarray): The smoothed CHM array.
    """
    # Create a mask for valid data
    valid_mask = chm_array != nodata_value

    # Pad the data with nodata_value
    pad_width = size // 2
    padded_chm = np.pad(chm_array, pad_width, mode='constant', constant_values=nodata_value)

    # Apply median filter to padded data
    filtered_padded = median_filter(padded_chm.astype(np.float32), size=size)

    # Remove padding
    smoothed_chm = filtered_padded[pad_width:-pad_width, pad_width:-pad_width]

    # Only keep valid data in smoothed result
    smoothed_chm[~valid_mask] = nodata_value

    return smoothed_chm

def extract_vegetation_points(LasData, ndvi_threshold=0.1, pre_filter=False):
    """
    Extract vegetation points based on classification and NDVI threshold.
    ------
    Input:
    - LasData (laspy.LasData): Input point cloud data in LAS format.
    - ndvi_threshold (float): The NDVI threshold for identifying vegetation points.
                              NDVI values greater than this threshold are considered vegetation.
    - pre_filter (bool): If True, applies an additional filter to remove vegetation points below a certain height
                         threshold (1.5 meters above the lowest vegetation point).
    Output:
    - laspy.LasData: A new LasData object containing only the filtered vegetation points based on the specified criteria.
    """


    # Filter points based on classification (vegetation-related classes), note: vegetation classes are empty in AHN4
    possible_vegetation_points = LasData[(LasData.classification == 1) |  # Unclassified
                                         (LasData.classification == 3) |  # Low vegetation
                                         (LasData.classification == 4) |  # Medium vegetation
                                         (LasData.classification == 5)]  # High vegetation

    # Calculate NDVI
    red = possible_vegetation_points.red
    nir = possible_vegetation_points.nir
    ndvi = (nir.astype(float) - red) / (nir + red)

    # Filter the points whose NDVI is greater than the threshold
    veg_points = possible_vegetation_points[ndvi > ndvi_threshold]

    # Option: already filter away the points with a height below 1.5m from the lowest veg point, introduced because
    # of one very large tile (25GN2_24.LAZ)
    if pre_filter:
        heights =veg_points.z
        min_height = heights.min()

        # Filter out points with heights between the minimum height and 1.5 meters
        filtered_veg_points = veg_points[(heights <= min_height) | (heights > 1.5)]
        return filtered_veg_points
    return veg_points


def chm_creation(LasData, vegetation_data, output_filename, resolution=0.5, smooth=False, nodata_value=-9999, filter_size=3):
    """
    Create a CHM from LiDAR vegetation data and save it as a raster.
    -------
    Input:
    - LasData (laspy.LasData):      Input LiDAR point cloud data used for metadata and output CRS.
    - vegetation_data (tuple):      A tuple containing:
                        - veg_raster (numpy.ndarray): The array representing the height values of vegetation.
                        - grid_centers (tuple of numpy.ndarrays): Contains two arrays (x, y) with the coordinates
                          of the center points of each grid cell.
    - output_filename (str): The name of the output .tif file for saving the CHM.
    - resolution (float, optional): The spatial resolution of the output raster in the same units as the input data
                                    (default: 0.5).
    - smooth (bool, optional): If True, applies a median filter to smooth the CHM.
    - nodata_value (float, optional): The value for NoData pixels (default: -9999).
    - filter_size (int, optional): Size of the median filter (default: 3).

    Output:
    - None: The function saves the CHM as a raster file (.tif) to the specified output path.
    """
    veg_raster = vegetation_data[0]
    grid_centers = vegetation_data[1]
    top_left_x = grid_centers[0][0, 0] - resolution / 2
    top_left_y = grid_centers[1][0, 0] + resolution / 2

    transform =Affine.translation(top_left_x, top_left_y) * Affine.scale(resolution, -resolution)

    if smooth:
        veg_raster = median_filter_chm(veg_raster, nodata_value=nodata_value, size=filter_size)
    
    print(veg_raster)

    write_output(LasData, crs, veg_raster, transform, output_filename, True)


def interpolation_vegetation(LasData, veg_points, resolution, no_data_value=-9999):
    """
    Create a vegetation raster using Laplace interpolation.

    InpurL
    - LasData (laspy.LasData):          Input LiDAR point cloud data.
    - veg_points (laspy.LasData):       Vegetation points to be interpolated.
    - resolution (float):               Resolution of the raster.
    - no_data_value (int, optional):    Value for no data

    Returns:
    - interpolated_grid (np.ndarray): Generated raster for vegetation.
    - grid_center_xy (tuple): Grid of x, y center coordinates for each raster cell.
    """

    # Extents of the pc
    min_x, max_x = round(LasData.x.min()), round(LasData.x.max())
    min_y, max_y = round(LasData.y.min()), round(LasData.y.max())

    # Define size of the region
    x_length = max_x - min_x
    y_length = max_y - min_y

    # Number of rows and columns
    cols = round(x_length / resolution)
    rows = round(y_length / resolution)

    # Initialize raster grid
    vege_raster = np.full((rows, cols), no_data_value, dtype=np.float32)

    # Calculate center coords for each grid cell
    grid_center_xy = raster_center_coords(min_x, max_x, min_y, max_y, resolution)

    if veg_points.x.shape[0] == 0:
        print("There are no vegetation points in the current area.")
        vege_raster = np.full((rows, cols), -200, dtype=np.float32)
        return vege_raster, grid_center_xy

    # create the delaunay triangulation
    dt = startinpy.DT()
    dt.insert(veg_points.xyz, "BBox")

    # Flatten the grid to get a list of all center coords
    locs = np.column_stack((grid_center_xy[0].ravel(), grid_center_xy[1].ravel()))

    vegetation_points = np.column_stack((veg_points.x, veg_points.y))
    tree = cKDTree(vegetation_points)

    # Find the distance to the nearest vegetation point for each grid cell
    distances, _ = tree.query(locs, k=1)

    distance_threshold = 1
    # masking cells that exceed threshold
    within_threshold_mask = distances <= distance_threshold
    # Interpolation only for those near
    valid_locs = locs[within_threshold_mask]

    # laplace interpolation
    interpolated_values = dt.interpolate({"method": "Laplace"}, valid_locs)

    # reshape interpolated grid back to og
    interpolated_grid = np.full_like(vege_raster, no_data_value, dtype=np.float32)  # Start with no_data
    interpolated_grid.ravel()[within_threshold_mask] = interpolated_values

    return interpolated_grid, grid_center_xy



In [120]:
def dem_creation(LasData, data, output_filename, resolution=0.5, smooth=False, nodata_value=-9999, filter_size=3):
    raster = data[0]
    grid_centers = data[1]
    top_left_x = grid_centers[0][0, 0] - resolution / 2
    top_left_y = grid_centers[1][0, 0] + resolution / 2

    transform =Affine.translation(top_left_x, top_left_y) * Affine.scale(resolution, -resolution)


    write_output(LasData, crs, raster, transform, output_filename, True)


def interpolation(LasData, points, resolution, no_data_value=-9999):
    """
    Create a vegetation raster using Laplace interpolation.

    InpurL
    - LasData (laspy.LasData):          Input LiDAR point cloud data.
    - veg_points (laspy.LasData):       Vegetation points to be interpolated.
    - resolution (float):               Resolution of the raster.
    - no_data_value (int, optional):    Value for no data

    Returns:
    - interpolated_grid (np.ndarray): Generated raster for vegetation.
    - grid_center_xy (tuple): Grid of x, y center coordinates for each raster cell.
    """

    # Extents of the pc
    min_x, max_x = round(LasData.x.min()), round(LasData.x.max())
    min_y, max_y = round(LasData.y.min()), round(LasData.y.max())

    # Define size of the region
    x_length = max_x - min_x
    y_length = max_y - min_y

    # Number of rows and columns
    cols = round(x_length / resolution)
    rows = round(y_length / resolution)

    # Initialize raster grid
    vege_raster = np.full((rows, cols), no_data_value, dtype=np.float32)

    # Calculate center coords for each grid cell
    grid_center_xy = raster_center_coords(min_x, max_x, min_y, max_y, resolution)

    if points.x.shape[0] == 0:
        print("There are no vegetation points in the current area.")
        vege_raster = np.full((rows, cols), -200, dtype=np.float32)
        return vege_raster, grid_center_xy

    # create the delaunay triangulation
    dt = startinpy.DT()
    dt.insert(points.xyz, "BBox")

    # Flatten the grid to get a list of all center coords
    locs = np.column_stack((grid_center_xy[0].ravel(), grid_center_xy[1].ravel()))


    # laplace interpolation
    interpolated_values = dt.interpolate({"method": "Laplace"}, locs)

    # reshape interpolated grid back to og
    interpolated_grid = np.full_like(vege_raster, no_data_value, dtype=np.float32)  # Start with no_data
    interpolated_grid.ravel()[:] = interpolated_values

    return interpolated_grid, grid_center_xy



In [92]:
def filter_points_within_bounds(las_data, bounds):
    """Filter points within the given bounding box."""
    x_min, y_min, x_max, y_max = bounds
    mask = (
        (las_data.x >= x_min) & (las_data.x <= x_max) &
        (las_data.y >= y_min) & (las_data.y <= y_max)
    )
    return las_data[mask]

def merge_las_files(laz_files, bounds, merged_output):
    """Efficiently merge multiple LAZ files into a single LAS dataset, correcting positions."""
    merged_output = Path(merged_output)
    
    las_merged = None 
    all_points = [] 
    merged_scales = None
    merged_offset = None
    
    if merged_output.exists():
        with laspy.open(merged_output) as las:
            las_merged = las.read()
        return las_merged
    


    for file in laz_files:
        with laspy.open(file) as las:
            las_data = las.read()
            cropped_las = filter_points_within_bounds(las_data, bounds)

            if las_merged is None:
                # Initialize merged LAS file using the first input file
                las_merged = laspy.LasData(las_data.header)
                las_merged.points = cropped_las.points
                
                merged_scales =  las_merged.header.scales
                merged_offset = las_merged.header.offset
            else:
                
                scale = las_data.header.scales
                offset = las_data.header.offsets
                # Convert integer coordinates to real-world values & Transform into merged coordinate system
                new_x = ((cropped_las.X * scale[0] + offset[0]) - merged_offset[0]) / merged_scales[0]
                new_y = ((cropped_las.Y * scale[1] + offset[1]) - merged_offset[1]) / merged_scales[1]
                new_z = ((cropped_las.Z * scale[2] + offset[2]) - merged_offset[2]) / merged_scales[2]

                # Copy points and update X, Y, Z
                new_points = cropped_las.points
                new_points["X"] = new_x.astype(np.int32)  
                new_points["Y"] = new_y.astype(np.int32)
                new_points["Z"] = new_z.astype(np.int32)

                all_points.append(new_points.array) 

    # Final merge step 
    if las_merged is not None:
        if all_points:
            all_points.append(las_merged.points.array) 
            
            merged_array = np.concatenate(all_points, axis=0)
            las_merged.points = laspy.ScaleAwarePointRecord(merged_array, las_merged.header.point_format, las_merged.header.scales, las_merged.header.offsets)

        las_merged.write(str(merged_output))

    return las_merged

In [121]:
def process_laz_files(input_folder, output_folder, search_bounds, merged_output="output/pointcloud.las", ndvi_threshold=0.0, resolution=0.5, remove=False, smooth_chm=True,
                      filter_size=3, pre_filter=False, run=0):
    """
    Process a folder of LAZ files to extract vegetation points and generate Canopy Height Models (CHMs).
    -------
    Input:
    - input_folder (str):       The folder containing folders with the input .LAZ files.
    - output_folder (str):      The folder where the output CHM .tif files will be saved.
    - ndvi_threshold (float):   The NDVI threshold for classifying vegetation points.
    - resolution (float):       The resolution of the output CHM rasters, defining the size of each pixel (default: 0.5).
    - remove (bool):            If True, deletes the original .LAZ files after processing (default: False).
    - smooth_chm (bool):        If True, applies smoothing to the CHM using a median filter (default: False).
    - filter_size (int):        Size of the median filter to use if smoothing (default: 3).
    - pre_filter (boolean):     If True, deletes points 1.5 m above lowest potential vegetation point before NDVI
                                filtering (default: False).

    Output: 
    - None: The function processes each .LAZ file, creates corresponding CHM .tif files, and saves them to the output folder.
            Optionally deletes the original .LAZ files if `remove` is set to True.
    """

    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    laz_files = [os.path.join(input_folder, file) for file in os.listdir(input_folder) if file.endswith(".LAZ")]

    if not laz_files:
        print("No LAZ files found in the input folder or its subfolders.")
        return
    
    las_data = merge_las_files(laz_files, search_bounds, merged_output)

    if las_data is None:
        print("No valid points found in the given boundary.")
        return

    # Extract vegetation points
    # veg_points = extract_vegetation_points(las_data, ndvi_threshold=ndvi_threshold, pre_filter=pre_filter)
    # 
    # vegetation_data = interpolation_vegetation(las_data, veg_points, 0.5)
    # output_filename = os.path.join(output_folder, f"CHM_{run}.TIF")
    # 
    # # Create the CHM and save it
    # chm_creation(las_data, vegetation_data, output_filename, resolution=resolution, smooth=smooth_chm, nodata_value=-9999,
    #              filter_size=filter_size)
    # 
    # DTM AND DSM CREATION

    building_ground_points = las_data[(las_data.classification == 2) | (las_data.classification == 6)]
    ground_points =  building_ground_points[(building_ground_points.classification == 2)]
    all_data = interpolation(las_data, building_ground_points, 0.5)
    dem_creation(las_data, all_data, "testing.tif")

    # if remove:
    #     os.remove(file_path)
    #     print(f"Deleted file: {file_path}")


In [122]:
input = "temp"
output = "output"
new_bounds =(94500, 469500, 96000, 470500)
process_laz_files(input, output, search_bounds)


<class 'numpy.ndarray'>
(1000, 1000)
float32
File written to 'testing.tif'
