In [1]:
import os
import gc
import cv2
import json
import pdal

import whitebox
import rasterio
import traceback
import subprocess

import numpy as np
import geopandas as gpd

from tqdm import tqdm

from shapely.geometry import Point, Polygon
from rasterio.transform import xy
from multiprocessing import Pool, cpu_count

from sklearn.decomposition import PCA
from skimage.restoration import denoise_tv_chambolle

# whitebox.download_wbt(linux_musl=True, reset=True)

wbt = whitebox.WhiteboxTools()
wbt.verbose = False

In [None]:
raw_lidar_dir = "../data/point_cloud/merged_point_cloud"
ground_lidar_dir = "../data/point_cloud/ground_lidar"

intensity_tif_dir = "../data/point_cloud/intensity_tif"
elevation_tif_dir = "../data/point_cloud/elevation_tif"

os.makedirs(ground_lidar_dir, exist_ok=True)
os.makedirs(intensity_tif_dir, exist_ok=True)
os.makedirs(elevation_tif_dir, exist_ok=True)

# Generate Ground LiDAR (PDAL CSF)

In [None]:
raw_lidar_files = [f for f in os.listdir(raw_lidar_dir) if f.endswith('.laz') or f.endswith('.las')]

In [None]:
pipeline_template = {
    "pipeline": [
        {
        "type" : "readers.las",
        "filename" : "input.las"
        },
        {
            "type": "filters.range",
            "limits": "Classification[0:18]"
        },
        {
            "type": "filters.csf"
        },
        {
            "type": "filters.range",
            "limits": "Classification[2:2]"
        },
        {
        "type" : "writers.las",
        "filename" : "output.las"
        }
    ]
}


def process_file(raw_lidar):
    input_path = os.path.join(raw_lidar_dir, raw_lidar)
    output_filename = f"ground_{raw_lidar.replace('.laz', '.las')}"
    output_path = os.path.join(ground_lidar_dir, output_filename)

    if os.path.exists(output_path):
        return f"Skipped: {raw_lidar}"

    pipeline = json.loads(json.dumps(pipeline_template))  # deep copy
    pipeline["pipeline"][0]['filename'] = input_path
    pipeline["pipeline"][-1]['filename'] = output_path

    try:
        pipeline_obj = pdal.Pipeline(json.dumps(pipeline))
        pipeline_obj.execute()
        return f"Processed: {raw_lidar}"
    except RuntimeError as e:
        return f"Failed: {raw_lidar} with error {e}"

In [None]:
if __name__ == "__main__":
    with Pool(processes=cpu_count()) as pool:
        results = list(tqdm(pool.imap_unordered(process_file, raw_lidar_files), total=len(raw_lidar_files)))
    
    for res in results:
        print(res)

In [None]:
def las_folder_to_bbox_gpkg(folder_path, output_gpkg):
    polygons = []
    filenames = []
    num_pts = []

    for filename in tqdm(os.listdir(folder_path)):
        if filename.lower().endswith('.las') or filename.lower().endswith('.laz'):
            filepath = os.path.join(folder_path, filename)
            # Get bbox using pdal info
            result = subprocess.run(['pdal', 'info', filepath], capture_output=True, text=True)
            info = json.loads(result.stdout)
            try:
                bounds = info["stats"]['bbox']['native']['bbox']
                points = info["stats"]['statistic'][0]['count']
                num_pts.append(points)
                
                min_x, max_x = bounds['minx'], bounds['maxx']
                min_y, max_y = bounds['miny'], bounds['maxy']
                # We don't use Z here for 2D polygons

                # Create a polygon from bbox corners
                poly = Polygon([
                    (min_x, min_y),
                    (max_x, min_y),
                    (max_x, max_y),
                    (min_x, max_y),
                    (min_x, min_y)  # Close the polygon
                ])

                polygons.append(poly)
                filenames.append(filename)

            except Exception:
                num_pts.append(0)
                polygons.append(None)
                filenames.append(filename)

    # Create a GeoDataFrame
    gdf = gpd.GeoDataFrame({'filename': filenames, 'num_pts': num_pts, 'geometry': polygons}, crs="EPSG:2056")  # You can set your real CRS here!

    # Save to GPKG
    gdf.to_file(output_gpkg, driver="GPKG")

# Example usage:
output_gpkg = 'ground_lidar_bbox.gpkg'
las_folder_to_bbox_gpkg(ground_lidar_dir, output_gpkg)

print("✅ Saved bounding boxes to GPKG successfully.")


# Interpolate raster

In [7]:
las_files = [f for f in os.listdir(ground_lidar_dir) if f.startswith('ground_') and f.endswith('.las')]

def process_las_file(las_file):
    input_path = os.path.join(ground_lidar_dir, las_file)
    elevation_raster = os.path.join(elevation_tif_dir, f"elevation_{las_file.lstrip('ground_').replace('.las', '.tif')}")
    intensity_raster = os.path.join(intensity_tif_dir, f"intensity_{las_file.lstrip('ground_').replace('.las', '.tif')}")

    if not os.path.exists(intensity_raster):
        wbt.lidar_idw_interpolation(
            input_path,
            intensity_raster,
            parameter="intensity",
            returns="first",
            resolution=0.02,
            radius=0.1,
            weight=2.0
        )


    if not os.path.exists(elevation_raster):
        wbt.lidar_idw_interpolation(
            input_path,
            elevation_raster,
            parameter="elevation",
            returns="first",
            resolution=0.02,
            radius=0.1,
            weight=2.0
        )
    
    return None

In [None]:
with Pool(processes=int(cpu_count()/2)) as pool:
    results = list(tqdm(pool.imap_unordered(process_las_file, las_files), total=len(las_files)))

for r in results:
    print(r)

# 2D GT (Circle detection)

In [None]:
def preprocess(img, nodata_value):
    """
    Preprocess the image for Hough Circle detection.
    Args:
        img: Input image (2D NumPy array).
        nodata_value: Value representing no data in the image.
    Returns:
        Preprocessed image (2D NumPy array).
    """
    img[img == nodata_value] = np.median(img[img>=0])
    img = np.clip(img, 25000, 45000)
    img = cv2.medianBlur(img, 5)

    img = cv2.normalize(img, None, 0, 255, cv2.NORM_MINMAX)
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5))
    img = cv2.morphologyEx(img, cv2.MORPH_OPEN, kernel)
    img = cv2.morphologyEx(img, cv2.MORPH_CLOSE, kernel)

    img = img.astype(np.uint8)
    img = denoise_tv_chambolle(img, weight=0.2, channel_axis=None) * 255
    img = img.astype(np.uint8)

    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(64,64))
    img = clahe.apply(img)
    return img

In [None]:
def preprocess(img):
    """
    Preprocess the image for Hough Circle detection with contrast enhancement using quantile clipping.
    Args:
        img: Input image (2D NumPy array).
        nodata_value: Value representing no data in the image.
        low_q: Lower quantile for contrast clipping (default 2%).
        high_q: Upper quantile for contrast clipping (default 98%).
    Returns:
        Preprocessed image (2D NumPy array).
    """
    valid_mask = img >= 0
    median_val = np.median(img[valid_mask])
    img[~valid_mask] = median_val

    # Compute quantiles and clip
    img = np.clip(img, 0, 1200)

    img = cv2.medianBlur(img, 5)

    # Normalize to 0–255 range
    img = cv2.normalize(img, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8)
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5))
    img = cv2.morphologyEx(img, cv2.MORPH_OPEN, kernel)
    img = cv2.morphologyEx(img, cv2.MORPH_CLOSE, kernel)

    # img = img.astype(np.uint8)
    # img = denoise_tv_chambolle(img, weight=0.2, channel_axis=None) * 255
    img = img.astype(np.uint8)

    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(64, 64))
    img = clahe.apply(img)
    
    return img


def is_circle_flat(points, epsilon=0.15):
    """
    Fits a plane to 3D points and checks if the points lie approximately on that plane.
    
    Args:
        points: (N, 3) array-like list of 3D points.
        epsilon: Threshold for maximum allowed deviation from the plane.
        
    Returns:
        is_flat (bool): True if all points lie within epsilon of the plane.
        normal (np.ndarray): The normal vector of the fitted plane.
        max_distance (float): Maximum distance from points to the plane.
    """

    # Input validation
    if len(points) < 3:
        return False
    
    points = np.asarray(points, dtype=np.float64)
    centroid = points.mean(axis=0)
    centered = points - centroid

    # Check rank deficiency
    if np.linalg.matrix_rank(centered) < 2:
        return False
    
    try:
        # Regularized SVD approach
        _, _, vh = np.linalg.svd(centered + 1e-10*np.random.randn(*centered.shape), 
                                full_matrices=False)
        normal = vh[-1]
        
        # Safe normalization
        norm = np.linalg.norm(normal)
        if norm < 1e-10:
            return False, None
        
    except Exception:
        # Final fallback to PCA if everything else fails

        pca = PCA(n_components=3)
        pca.fit(points)
        normal = pca.components_[2]
        norm = np.linalg.norm(normal)
        if norm < 1e-10:
            return False, None
        print(f"Using PCA for normal vector.")

    normal /= norm

    # Compute distances to plane
    distances = np.dot(centered, normal)
    max_distance = np.abs(np.max(distances) - np.min(distances))

    return int(max_distance < epsilon), max_distance


def to_gdf(circles, transform, crs, elevation_raster):
    """
    Converts detected circles in image coordinates to a GeoDataFrame with 3D polygons.

    Parameters:
    - circles: List of detected circles with each entry as [x_pix, y_pix, r_pix].
    - transform: Affine transform of the raster.
    - crs: Coordinate Reference System of the raster.
    - elevation: 2D NumPy array representing the elevation raster.

    Returns:
    - GeoDataFrame with 3D circle polygons.
    """
    geometries = []
    data = []

    with rasterio.open(elevation_raster) as src_elevation:
        elevation = src_elevation.read(1)
        elev_nodata = src_elevation.profile['nodata']

    for x_pix, y_pix, r_pix in circles:
        # Create a point and buffer it to get a circle polygon
        # adding 2 pixels to the radius to fully cover the circle
        r_pix = r_pix + 1
        center_pix = Point(x_pix, y_pix)
        circle_pix = center_pix.buffer(r_pix)

        img_coords = np.array(circle_pix.exterior.coords).round().astype(int)
        img_coords[:, 1] = np.clip(img_coords[:, 1], 0, elevation.shape[0] - 1)
        img_coords[:, 0] = np.clip(img_coords[:, 0], 0, elevation.shape[1] - 1)
        z_map = elevation[img_coords[:, 1], img_coords[:, 0]]


        if sum(z_map == elev_nodata) > 0 and sum(z_map != elev_nodata) > 0:
            # Replace nodata values with the median of the valid elevation values
            z_map[z_map == elev_nodata] = np.median(z_map[z_map != elev_nodata])
        
        elif sum(z_map == elev_nodata) == len(z_map):
            # If all values are nodata, skip this circle
            print(f"All elevation values are nodata for circle at ({x_pix}, {y_pix})")
            z_map = np.zeros_like(z_map)

        # Calculate center coordinates in map space
        x_center, y_center = xy(transform, y_pix, x_pix)

        # Pixel size (assuming square pixels)
        pixel_size = (transform.a + abs(transform.e)) / 2.0
        r_map = r_pix * pixel_size

        # Create a point and buffer it to get a circle polygon
        center_point = Point(x_center, y_center)
        circle_polygon = center_point.buffer(r_map)

        # Convert to 3D polygon with Z elevation
        coords_3d = [(x, y, z) for x, y, z in np.concatenate([np.array(circle_polygon.exterior.coords), z_map[:, None]], axis=1)]

        try:
            # Check if the circle is flat
            is_flat, max_distance = is_circle_flat(coords_3d, epsilon=0.15)

        except Exception as e:
            print(f"Error checking flatness: {e}")
            continue
        
        circle_polygon = Polygon(coords_3d)

        # Append to lists
        geometries.append(circle_polygon)
        data.append({
            'x_center': x_center,
            'y_center': y_center,
            'radius_pix': r_pix,
            'radius_map': r_map,
            'z_center': np.median(z_map),
            'is_flat': is_flat,
            'max_distance': max_distance
        })

    return gpd.GeoDataFrame(data, geometry=geometries, crs=crs)


def save_empty_gpkg(output_path, crs="EPSG:2056"):  # Using Swiss CRS as example
    # Create empty GeoDataFrame with correct schema
    empty_gdf = gpd.GeoDataFrame(columns=[
        'x_center', 
        'y_center', 
        'radius_pix', 
        'radius_map', 
        'z_center', 
        'is_flat',
        'max_distance',
        'geometry'
    ], crs=crs)
    
    # Save to file
    empty_gdf.to_file(output_path, driver="GPKG")


def process_file(file_name):
    """
    Process a single file to detect manholes and save results to a GeoPackage.
    Args:
        file_name: Name of the input intensity raster file.
    Returns:
        Number of detected manholes or None if no circles were found.
    """
    
    # Define directories (consistant to previous definition)
    intensity_tif_dir = "path_to_intensity_tif_dir"
    elevation_tif_dir = "path_to_elevation_tif_dir"
    gt_3d_dir = 'path_to_3d_ground_truth_dir'

    if os.path.exists(os.path.join(gt_3d_dir, file_name.replace('.tif', '.gpkg'))):
        return None  
          
    elevation_raster = os.path.join(elevation_tif_dir, file_name.replace('intensity', 'elevation'))
    intensity_raster = os.path.join(intensity_tif_dir, file_name)

    try:
        # Load intensity first and immediately process/release
        with rasterio.open(intensity_raster) as src_intensity:
            intensity = src_intensity.read(1)
            inten_nodata = src_intensity.profile['nodata']
            transform = src_intensity.transform
            crs = src_intensity.crs
        
        # Process and immediately clear intermediate arrays
        intensity = preprocess(intensity)
        processed_img = intensity.copy()
        del intensity  # Explicitly free memory

        circles = cv2.HoughCircles(
            processed_img,
            cv2.HOUGH_GRADIENT,
            dp=1.0,
            minDist=25,     # minimum distance between circle centers
            param1=80,      # higher threshold for Canny edge detector
            param2=20,      # accumulator threshold (lower is more sensitive)
            minRadius=10,
            maxRadius=25
        )

        if circles is not None:
            # print(f'{len(detected_circles)} manholes detected in {file_name}')
            gdf = to_gdf(circles[0], transform, crs, elevation_raster)
            # return gdf
            gdf.to_file(os.path.join(gt_3d_dir, file_name.replace('.tif', '.gpkg')), driver="GPKG")
            return len(circles)
        else:
            # Create empty file when no circles found
            output_path = os.path.join(gt_3d_dir, file_name.replace('.tif', '.gpkg'))
            print(f"No circles found in {file_name}. Saving empty GeoPackage.")
            save_empty_gpkg(output_path, crs)
            return None
        
    except MemoryError:
        print(f"Memory error processing {file_name}")
        return None
    except cv2.error as e:
        print(f"OpenCV error in {file_name}: {e}")
        return None
    except rasterio.errors.RasterioError as e:
        print(f"Rasterio error in {file_name}: {e}")
        return None
    except Exception as e:
        print(f"Unexpected error in {file_name}: {traceback.format_exc()}")
        return None

    finally:
        # Clean up
        gc.collect()


In [None]:
file_list = [f for f in os.listdir(intensity_tif_dir) if f.endswith('.tif')]

with Pool(processes=4) as pool:
    results = list(tqdm(pool.imap_unordered(process_file, file_list), total=len(file_list)))

  0%|          | 0/58 [00:00<?, ?it/s]

 33%|███▎      | 19/58 [01:13<01:51,  2.87s/it]

No circles found in intensity_merged_2682600.0_1247600.0.tif. Saving empty GeoPackage.


100%|██████████| 58/58 [03:12<00:00,  3.32s/it]


In [None]:
# merge all gpkg to a single file
gt_3d_dir = 'path_to_3d_ground_truth_dir'
file_name = 'init_3d_proposal.gpkg'
cmd = f'ogrmerge.py -f GPKG -o ../{file_name} -single *.gpkg'

# Change to the directory
os.chdir(gt_3d_dir)

# Run the command
subprocess.run(cmd, shell=True, check=True)

gdf_gt = gpd.read_file(f'../{file_name}')
gdf_gt = gdf_gt.set_crs(epsg=2056, allow_override=True)
gdf_gt.to_file(f'../{file_name}', driver='GPKG')

# Transform to EPSG:32632
# gdf_transformed = gdf_gt.to_crs(epsg=32632)
# # Replace 'transformed_file.gpkg' with your desired output file name
# gdf_transformed.to_file('../init_3d_proposal_32632.gpkg', driver='GPKG')

# 3D GT - Smooth elevation

As point cloud has limited density and coverage, the elevation raster might have missing values. Also, above-ground objects are sometimes kept as a drawback of CSF algorithmn. Therefore, it is necessary to smooth the Z value from elevation raster. Here, we remove outliers further than two times of standard deviation and fit 3D polygon to a ellipse plane.

In [None]:
import numpy as np
import geopandas as gpd
from shapely.geometry import Polygon


def smooth_gpkg_polygons(input_gpkg, layer, output_gpkg=None):

    def fit_plane_to_points(points):
        # Fit plane z = ax + by + c
        X = np.c_[points[:,0], points[:,1], np.ones(points.shape[0])]
        y = points[:,2]
        coeffs, _, _, _ = np.linalg.lstsq(X, y, rcond=None)
        a, b, c = coeffs
        return a, b, c

    def project_points_to_plane(points, a, b, c):
        # For each (x, y), compute z = a*x + b*y + c
        x = points[:,0]
        y = points[:,1]
        z = a * x + b * y + c
        return np.stack([x, y, z], axis=1)

    def fit_ellipse_2d(xy):
        # Fit ellipse to 2D points (x, y)
        # Direct least squares method (Fitzgibbon et al. 1999)
        x = xy[:,0][:,np.newaxis]
        y = xy[:,1][:,np.newaxis]
        D = np.hstack((x*x, x*y, y*y, x, y, np.ones_like(x)))
        S = np.dot(D.T, D)
        C = np.zeros([6,6])
        C[0,2] = C[2,0] = 2
        C[1,1] = -1
        # Solve generalized eigenvalue problem
        try:
            import scipy.linalg
            eigvals, eigvecs = scipy.linalg.eig(S, C)
            cond = np.isreal(eigvals)
            eigvals = eigvals[cond]
            eigvecs = eigvecs[:,cond]
            a = eigvecs[:, np.argmax(np.abs(eigvals))]
            a = np.real(a)
        except Exception:
            # fallback: return mean/var ellipse
            mean = xy.mean(axis=0)
            cov = np.cov(xy.T)
            vals, vecs = np.linalg.eigh(cov)
            order = vals.argsort()[::-1]
            vals = vals[order]
            vecs = vecs[:,order]
            a = np.array([vals[0], 0, vals[1], 0, 0, -vals[0]*vals[1]])
            return mean, vecs, np.sqrt(vals)
        return a

    def ellipse_to_polygon(a, num_points=32):
        # Convert ellipse parameters to polygon points
        # a: [A, B, C, D, E, F] for Ax^2 + Bxy + Cy^2 + Dx + Ey + F = 0
        # We'll sample points on the ellipse
        # Use center, axes, angle extraction
        A,B,C,D,E,F = a
        # Center
        denom = B*B - 4*A*C
        if denom == 0:
            return None
        x0 = (2*C*D - B*E) / denom
        y0 = (2*A*E - B*D) / denom
        # Angle
        if B == 0:
            if A < C:
                angle = 0
            else:
                angle = np.pi/2
        else:
            angle = 0.5 * np.arctan2(B, A - C)
        # Axes
        up = 2*(A*E*E + C*D*D + F*B*B - 2*B*D*E - A*C*F)
        down1 = (B*B - 4*A*C)*( (C-A)*np.sqrt(1 + 4*B*B/((A-C)*(A-C))) - (C+A) )
        down2 = (B*B - 4*A*C)*( -(C-A)*np.sqrt(1 + 4*B*B/((A-C)*(A-C))) - (C+A) )
        if down1 == 0 or down2 == 0:
            return None
        a_len = np.sqrt(np.abs(up/down1))
        b_len = np.sqrt(np.abs(up/down2))
        # Sample points
        t = np.linspace(0, 2*np.pi, num_points, endpoint=False)
        pts = np.stack([a_len*np.cos(t), b_len*np.sin(t)], axis=1)
        # Rotate and shift
        R = np.array([[np.cos(angle), -np.sin(angle)],
                      [np.sin(angle),  np.cos(angle)]])
        pts = pts @ R.T
        pts += np.array([x0, y0])
        return pts

    def fit_polygon_to_plane_and_ellipse(geom):
        if not isinstance(geom, Polygon):
            return geom
        coords = np.array(geom.exterior.coords)
        if coords.shape[1] < 3:
            return geom
        # Filter out points with z greater than 2*std from mean
        z = coords[:,2]
        z_mean = np.mean(z)
        z_std = np.std(z)
        mask = np.abs(z - z_mean) <= 2 * z_std
        filtered_coords = coords[mask]
        # If too few points remain, fallback to original
        if filtered_coords.shape[0] < 3:
            filtered_coords = coords
        # Fit plane
        a, b, c = fit_plane_to_points(filtered_coords)
        # Project all points to plane
        new_coords = project_points_to_plane(coords, a, b, c)
        # Fit ellipse to (x, y)
        ellipse_params = fit_ellipse_2d(new_coords[:,:2])
        ellipse_xy = ellipse_to_polygon(ellipse_params, num_points=65)
        if ellipse_xy is None:
            # fallback: use projected polygon
            return Polygon([tuple(pt) for pt in new_coords])
        # Recompute z for each (x, y) on ellipse
        z = a * ellipse_xy[:,0] + b * ellipse_xy[:,1] + c
        ellipse_xyz = np.column_stack([ellipse_xy, z])
        return Polygon([tuple(pt) for pt in ellipse_xyz])

    # Read input
    gdf_gt = gpd.read_file(input_gpkg, layer=layer)
    gdf_gt['geometry'] = gdf_gt['geometry'].apply(fit_polygon_to_plane_and_ellipse)
    if output_gpkg is None:
        output_gpkg = input_gpkg
    gdf_gt.to_file(output_gpkg, driver='GPKG', layer=layer, overwrite=True)
    return gdf_gt


In [None]:
# Example usage:
smooth_gpkg_polygons('../../data/neuchatel/NE_GT_3D.gpkg', 'NE_GT_3D')
smooth_gpkg_polygons('../../data/zurich/zh_gt_3d.gpkg', 'zh_gt_3d')