In [1]:
import json
from osgeo import gdal
import pdal
import os
from tqdm.notebook import tqdm
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle
from pprint import pprint
import laspy
import time

gdal.UseExceptions()

FULL_BBOXES_FOLDER = "../../data/annotations_full/"
CROPPED_BBOXES_FOLDER = "../../data/annotations_cropped/"
FULL_IMAGES_FOLDER = "../../data/images_full/"
CROPPED_IMAGES_FOLDER = "../../data/images_cropped/"
GEOTILES_LIDAR_FOLDER = "../../data/point_clouds_geotiles/"
GEOTILES_NO_OVERLAP_LIDAR_FOLDER = "../../data/point_clouds_geotiles_no_overlap/"
FULL_LIDAR_FOLDER = "../../data/point_clouds_full/"
CROPPED_LIDAR_FOLDER = "../../data/point_clouds_cropped/"

folder_paths = [
    FULL_BBOXES_FOLDER,
    CROPPED_BBOXES_FOLDER,
    FULL_IMAGES_FOLDER,
    CROPPED_IMAGES_FOLDER,
    GEOTILES_LIDAR_FOLDER,
    GEOTILES_NO_OVERLAP_LIDAR_FOLDER,
    FULL_LIDAR_FOLDER,
    CROPPED_LIDAR_FOLDER,
]

# # Create the output directories if they doesn't exist
# for folder_path in folder_paths:
#     if not os.path.exists(folder_path):
#         os.makedirs(folder_path)

In [2]:
# Define tile size and OVERLAP
# TILE_SIZE = 1920  # Size of each tile
# OVERLAP = 480  # Overlap between tiles
TILE_SIZE = 640  # Size of each tile
OVERLAP = 0  # Overlap between tiles

In [3]:
def measure_execution_time(func):
    def wrapper(*args, **kwargs):
        start_time = time.time()
        print(f"Execution of {func.__name__}({args})...")
        result = func(*args, **kwargs)
        end_time = time.time()
        execution_time = end_time - start_time
        print(f"Done in {round(execution_time, 3)} seconds")
        return result

    return wrapper

### To crop LiDAR point clouds into the size of the RGB images

In [4]:
@measure_execution_time
def merge_crop_las(
    input_las_list: list[str], output_las: str, x_limits: tuple, y_limits: tuple
):
    if x_limits[0] > x_limits[1]:
        raise Exception("You should have x_limits[0] <= x_limits[1]")
    if y_limits[0] > y_limits[1]:
        raise Exception("You should have y_limits[0] <= y_limits[1]")
    bounds = f"([{x_limits[0]},{x_limits[1]}],[{y_limits[0]},{y_limits[1]}])"
    pipeline_list = []
    for index, input_las in enumerate(input_las_list):
        pipeline_list.append(
            {"type": "readers.las", "file_name": input_las, "tag": f"A{index}"}
        )
    pipeline_list.extend(
        [
            {
                "type": "filters.merge",
                "inputs": [f"A{index}" for index in range(len(input_las_list))],
            },
            {"type": "filters.crop", "bounds": bounds},
            {"type": "writers.las", "file_name": output_las},
        ]
    )
    pipeline = pdal.Pipeline(json.dumps(pipeline_list))
    pipeline.execute()


@measure_execution_time
def crop_las(input_las: str, output_las: str, x_limits: tuple, y_limits: tuple):
    if x_limits[0] > x_limits[1]:
        raise Exception("You should have x_limits[0] <= x_limits[1]")
    if y_limits[0] > y_limits[1]:
        raise Exception("You should have y_limits[0] <= y_limits[1]")
    bounds = f"([{x_limits[0]},{x_limits[1]}],[{y_limits[0]},{y_limits[1]}])"
    pipeline_list = [
        {
            "type": "readers.las",
            "file_name": input_las,
        },
        {"type": "filters.crop", "bounds": bounds},
        {"type": "writers.las", "file_name": output_las},
    ]
    pipeline = pdal.Pipeline(json.dumps(pipeline_list))
    pipeline.execute()


def remove_las_overlap_from_geotiles(input_las: str, output_las: str):
    overlap = 20
    with laspy.open(input_las, mode="r") as las_file:
        # Get the bounding box information from the header
        min_x = las_file.header.min[0] + overlap
        max_x = las_file.header.max[0] - overlap
        min_y = las_file.header.min[1] + overlap
        max_y = las_file.header.max[1] - overlap

    crop_las(input_las, output_las, (min_x, max_x), (min_y, max_y))


def remove_las_overlap_from_geotiles_all():
    point_clouds_overlap_folder = "../../data/point_clouds_geotiles"
    point_clouds_no_overlap_folder = "../../data/point_clouds_geotiles_no_overlap"
    if not os.path.exists(point_clouds_no_overlap_folder):
        os.makedirs(point_clouds_no_overlap_folder)
    for file_name in os.listdir(point_clouds_overlap_folder):
        # Check if the file is a regular file (not a directory)
        overlap_file_path = os.path.join(point_clouds_overlap_folder, file_name)
        no_overlap_file_path = os.path.join(point_clouds_no_overlap_folder, file_name)
        if (os.path.isfile(overlap_file_path)) and (
            not os.path.exists(no_overlap_file_path)
        ):
            remove_las_overlap_from_geotiles(overlap_file_path, no_overlap_file_path)


@measure_execution_time
def filter_classification_las(input_las: str, output_las: str):
    pipeline_list = [
        {
            "type": "readers.las",
            "file_name": input_las,
        },
        {
            "type": "filters.range",
            "limits": "Classification[1:5]",  # Keep only unclassified, ground and vegetation
        },
        {"type": "writers.las", "file_name": output_las},
    ]
    pipeline = pdal.Pipeline(json.dumps(pipeline_list))
    pipeline.execute()


# # Open the GeoTIFF file
# tiff_image_test = "../../data/images_full/2023_122000_484000_RGB_hrl.tif"
# ds = gdal.Open(tiff_image_test)
# # Get the geotransform parameters
# gt = ds.GetGeoTransform()
# # Calculate the image coordinates
# width = ds.RasterXSize
# height = ds.RasterYSize
# # Calculate the coordinates of the four corners
# x1 = int(gt[0])
# y1 = int(gt[3])
# x2 = int(gt[0] + (gt[1] * width))
# y2 = int(gt[3] + (gt[5] * height))
# # Close the dataset
# ds = None
# # Crop the point cloud
# geotiles_point_clouds_path = ["../../data/point_clouds_geotiles/25GN1_13.LAZ", "../../data/point_clouds_geotiles/25GN1_18.LAZ"]
# geotiles_no_overlap_point_clouds_path = ["../../data/point_clouds_geotiles_no_overlap/25GN1_13.LAZ", "../../data/point_clouds_geotiles_no_overlap/25GN1_18.LAZ"]
# for overlap_path, no_overlap_path in zip(geotiles_point_clouds_path, geotiles_no_overlap_point_clouds_path):
#     remove_las_overlap_from_geotiles(overlap_path, no_overlap_path)
# full_point_cloud_path = f"../../data/point_clouds_full/{int(x1)}_{int(y1)}.laz"
# crop_las(geotiles_no_overlap_point_clouds_path, full_point_cloud_path, (x1, x2), (y2, y1))
# full_point_filtered_cloud_path = f"../../data/point_clouds_full/{int(x1)}_{int(y1)}_filtered.laz"
# filter_classification_las(full_point_cloud_path, full_point_filtered_cloud_path)

### Crop annotations

In [5]:
bboxes_path = FULL_BBOXES_FOLDER + "3"

with open(bboxes_path, "r") as file:
    # Load the annotation data
    bboxes_json = json.load(file)

    # Get the path to the full image
    full_image_path = bboxes_json["task"]["data"]["image"].replace(
        "/data/local-files/?d=", "/"
    )
    full_image_path_tif = full_image_path.replace(".png", ".tif")
    full_image_path_tif = "../../data/images_full/2023_122000_484000_RGB_hrl.tif"

    # Create the paths
    output_image_prefix = os.path.splitext(os.path.basename(full_image_path_tif))[0]
    annotation_output_directory = os.path.join(
        CROPPED_BBOXES_FOLDER, output_image_prefix
    )
    if not os.path.exists(annotation_output_directory):
        os.makedirs(annotation_output_directory)

    # Get the dimensions of the full image
    full_image = Image.open(full_image_path_tif)
    full_image_width, full_image_height = full_image.size
    full_image_width_factor, full_image_height_factor = (
        full_image_width / 100.0,
        full_image_height / 100.0,
    )

    # Calculate the number of rows and columns needed
    num_cols = int(np.ceil((full_image_width - OVERLAP) / (TILE_SIZE - OVERLAP)))
    num_rows = int(np.ceil((full_image_height - OVERLAP) / (TILE_SIZE - OVERLAP)))

    # Get the limits of all the cropped images
    cropping_limits_x = np.array(
        [
            [i * (TILE_SIZE - OVERLAP), (i + 1) * (TILE_SIZE - OVERLAP) + OVERLAP]
            for i in range(num_cols)
        ]
    )
    cropping_limits_y = np.array(
        [
            [j * (TILE_SIZE - OVERLAP), (j + 1) * (TILE_SIZE - OVERLAP) + OVERLAP]
            for j in range(num_rows)
        ]
    )

    bboxes_repartition = [[[] for _ in range(num_rows)] for _ in range(num_cols)]
    for index, bbox_info in enumerate(bboxes_json["result"]):
        bbox = bbox_info["value"]
        min_x = int(np.round(bbox["x"] * full_image_width_factor))
        min_y = int(np.round(bbox["y"] * full_image_height_factor))
        max_x = int(np.round((bbox["x"] + bbox["width"]) * full_image_width_factor))
        max_y = int(np.round((bbox["y"] + bbox["height"]) * full_image_height_factor))
        # Find the indices of the cropped images in which the bounding box fits
        i_x_0 = min_x // (TILE_SIZE - OVERLAP)
        i_y_0 = min_y // (TILE_SIZE - OVERLAP)

        found_image = False  # To check if the bounding box fits entirely in at least one cropped image

        # Check the 4 possibilities
        if max_x < (i_x_0 + 1) * (TILE_SIZE - OVERLAP) + OVERLAP:
            # First possible image (bottom right)
            if max_y < (i_y_0 + 1) * (TILE_SIZE - OVERLAP) + OVERLAP:
                bboxes_repartition[i_x_0][i_y_0].append(index)
                found_image = True
            # Second possible image (top right)
            if max_y < (i_y_0) * (TILE_SIZE - OVERLAP) + OVERLAP:
                bboxes_repartition[i_x_0][i_y_0 - 1].append(index)
                found_image = True
        if max_x < (i_x_0) * (TILE_SIZE - OVERLAP) + OVERLAP:
            # Third possible image (bottom left)
            if max_y < (i_y_0 + 1) * (TILE_SIZE - OVERLAP) + OVERLAP:
                bboxes_repartition[i_x_0 - 1][i_y_0].append(index)
                found_image = True
            # Fourth possible image (top left)
            if max_y < (i_y_0) * (TILE_SIZE - OVERLAP) + OVERLAP:
                bboxes_repartition[i_x_0 - 1][i_y_0 - 1].append(index)
                found_image = True

        if not found_image:
            raise Exception(
                f"The bounding box at index {index} doesn't fit entirely in any image."
            )

    # Create and store the cropped annotation files
    for row in tqdm(range(num_rows)):
        for col in tqdm(range(num_cols), leave=False):
            bboxes_dict = {
                "full_image": {
                    "path": full_image_path_tif,
                    "coordinates_of_cropped_image": {
                        "x": col * (TILE_SIZE - OVERLAP),
                        "y": row * (TILE_SIZE - OVERLAP),
                        "width": TILE_SIZE,
                        "height": TILE_SIZE,
                    },
                    "overlap": OVERLAP,
                },
                "col": col,
                "row": row,
                "width": TILE_SIZE,
                "height": TILE_SIZE,
                "bounding_boxes": [
                    {
                        "id": bboxes_json["result"][i]["id"],
                        "index": i,
                        "x": bboxes_json["result"][i]["value"]["x"]
                        * full_image_width_factor
                        - col * (TILE_SIZE - OVERLAP),
                        "y": bboxes_json["result"][i]["value"]["y"]
                        * full_image_width_factor
                        - row * (TILE_SIZE - OVERLAP),
                        "width": bboxes_json["result"][i]["value"]["width"]
                        * full_image_width_factor,
                        "height": bboxes_json["result"][i]["value"]["height"]
                        * full_image_width_factor,
                        "label": bboxes_json["result"][i]["value"]["rectanglelabels"][
                            0
                        ],
                    }
                    for i in bboxes_repartition[col][row]
                ],
            }

            annotation_output_file_name = f"{output_image_prefix}_{row}_{col}.json"
            output_path = os.path.join(
                annotation_output_directory, annotation_output_file_name
            )
            with open(output_path, "w") as outfile:
                json.dump(bboxes_dict, outfile)

FileNotFoundError: [Errno 2] No such file or directory: '../../data/annotations_full/3'

### Crop images

In [None]:
# All this can also be done using gdal_retile.py

# Define output paths
image_output_directory = os.path.join(CROPPED_IMAGES_FOLDER, output_image_prefix)
if not os.path.exists(image_output_directory):
    os.makedirs(image_output_directory)

# Iterate over rows and columns to create tiles
for row in tqdm(range(num_rows)):
    for col in tqdm(range(num_cols), leave=False):
        # Calculate the pixel offsets for the tile
        x_offset = col * (TILE_SIZE - OVERLAP)
        y_offset = row * (TILE_SIZE - OVERLAP)

        # Create output file_name
        output_file_name = f"{output_image_prefix}_{row}_{col}.tif"
        output_path = os.path.join(image_output_directory, output_file_name)

        # Define the subset area to read from the input image
        window = (x_offset, y_offset, TILE_SIZE, TILE_SIZE)

        gdal.Translate(output_path, full_image_path_tif, srcWin=window)

VBox(children=(  0%|          | 0/9 [00:00<?, ?it/s],))

### Crop point clouds

In [None]:
remove_las_overlap_from_geotiles_all()


def get_coordinates_from_image_file_name(file_name: str):
    return (int(file_name[5:11]), int(file_name[12:18]))


output_coordinates = get_coordinates_from_image_file_name(output_image_prefix)
output_coordinates_prefix = f"{output_coordinates[0]}_{output_coordinates[1]}"

# Define output paths
point_cloud_output_directory = os.path.join(
    CROPPED_LIDAR_FOLDER, output_coordinates_prefix
)
if not os.path.exists(point_cloud_output_directory):
    os.makedirs(point_cloud_output_directory)

full_point_cloud_path = (
    f"../../data/point_clouds_full/{output_coordinates_prefix}_filtered.laz"
)

# Iterate over rows and columns to create tiles
for row in tqdm(range(num_rows)):
    for col in tqdm(range(num_cols), leave=False):
        # Create output file_name
        output_point_cloud_file_name = f"{output_coordinates_prefix}_{row}_{col}.laz"
        output_point_cloud_path = os.path.join(
            point_cloud_output_directory, output_point_cloud_file_name
        )
        output_point_cloud_filtered_file_name = (
            f"{output_coordinates_prefix}_{row}_{col}_filtered.laz"
        )
        output_point_cloud_filtered_path = os.path.join(
            point_cloud_output_directory, output_point_cloud_filtered_file_name
        )

        if os.path.exists(output_point_cloud_path) and os.path.exists(
            output_point_cloud_filtered_path
        ):
            continue

        # Get the corresponding image
        corresponding_image_file_name = f"{output_image_prefix}_{row}_{col}.tif"
        corresponding_image_path = os.path.join(
            image_output_directory, corresponding_image_file_name
        )
        print(f"{corresponding_image_path = }")

        ### Get the coordinates
        ds = gdal.Open(corresponding_image_path)
        # Get the geotransform parameters
        gt = ds.GetGeoTransform()
        # Calculate the image coordinates
        width = ds.RasterXSize
        height = ds.RasterYSize
        # Calculate the coordinates of the four corners
        x1 = round(gt[0], 3)
        y1 = round(gt[3], 3)
        x2 = round(gt[0] + (gt[1] * width), 3)
        y2 = round(gt[3] + (gt[5] * height), 3)
        # Close the dataset
        ds = None

        print(f"{(x1, x2, y1, y2) = }")

        crop_las(full_point_cloud_path, output_point_cloud_path, (x1, x2), (y2, y1))
        filter_classification_las(
            output_point_cloud_path, output_point_cloud_filtered_path
        )

Execution of crop_las(('../../data/point_clouds_geotiles/25GN1_13.LAZ', '../../data/point_clouds_geotiles_no_overlap/25GN1_13.LAZ', (122000.0, 122999.999), (483750.0, 484999.999)))...
Done in 47.111 seconds
Execution of crop_las(('../../data/point_clouds_geotiles/25GN1_18.LAZ', '../../data/point_clouds_geotiles_no_overlap/25GN1_18.LAZ', (122000.0, 122999.999), (482500.0, 483749.999)))...
Done in 49.171 seconds


VBox(children=(  0%|          | 0/9 [00:00<?, ?it/s],))

corresponding_image_path = '../../data/images_cropped/2023_122000_484000_RGB_hrl/2023_122000_484000_RGB_hrl_0_0.tif'
(x1, x2, y1, y2) = (122000.0, 122153.6, 484000.0, 483846.4)
Execution of crop_las(('../../data/point_clouds_full/122000_484000_filtered.laz', '../../data/point_clouds_cropped/122000_484000/122000_484000_0_0.laz', (122000.0, 122153.6), (483846.4, 484000.0)))...
Done in 10.468 seconds
Execution of filter_classification_las(('../../data/point_clouds_cropped/122000_484000/122000_484000_0_0.laz', '../../data/point_clouds_cropped/122000_484000/122000_484000_0_0_filtered.laz'))...
Done in 0.479 seconds
corresponding_image_path = '../../data/images_cropped/2023_122000_484000_RGB_hrl/2023_122000_484000_RGB_hrl_0_1.tif'
(x1, x2, y1, y2) = (122115.2, 122268.8, 484000.0, 483846.4)
Execution of crop_las(('../../data/point_clouds_full/122000_484000_filtered.laz', '../../data/point_clouds_cropped/122000_484000/122000_484000_0_1.laz', (122115.2, 122268.8), (483846.4, 484000.0)))...
Done

### Display bounding boxes

In [None]:
def display_image_with_boxes(image_path: str, boxes: list | None = None) -> None:
    reduction_ratio = 3

    # Load image
    image = Image.open(image_path)
    image_smaller = Image.open(image_path)
    image_smaller.thumbnail(
        (TILE_SIZE // reduction_ratio, TILE_SIZE // reduction_ratio)
    )

    # Create figure and axis
    fig, axs = plt.subplots(2, 1, figsize=(20, 40))

    # Display image
    axs[0].imshow(image)
    axs[1].imshow(image_smaller)

    # Annotation colors
    colors = {
        "Tree": "#9effb1",
        "Tree_unsure": "#ffd79e",
        "Tree_disappeared": "#9eaeff",
        "Tree_replaced": "#ff5a52",
        "Tree_new": "#fb6ae1",
    }

    # Add bounding boxes if provided
    if boxes:
        for box in boxes:
            # Extract box coordinates
            x, y, width, height, label = box
            # Create a Rectangle patch
            rect = Rectangle(
                (x, y),
                width,
                height,
                linewidth=1,
                edgecolor=colors[label],
                facecolor="none",
            )
            # Add the patch to the Axes
            axs[0].add_patch(rect)
            # Create a Rectangle patch
            rect = Rectangle(
                (x // reduction_ratio, y // reduction_ratio),
                width // reduction_ratio,
                height // reduction_ratio,
                linewidth=1,
                edgecolor=colors[label],
                facecolor="none",
            )
            # Add the patch to the Axes
            axs[1].add_patch(rect)

    # Add each rectangle to the legend individually
    for label, color in colors.items():
        axs[0].add_patch(Rectangle((0, 0), 0, 0, color=color, label=label))
        axs[1].add_patch(Rectangle((0, 0), 0, 0, color=color, label=label))

    axs[0].set_axis_off()
    axs[0].legend()
    axs[1].set_axis_off()
    axs[1].legend()

    # Show plot
    plt.show()


def get_bounding_boxes(bboxes_path: str) -> list:
    with open(bboxes_path, "r") as file:
        # Load the annotation data
        bboxes_json = json.load(file)

        # Get every bounding box
        bboxes = []
        for bbox in bboxes_json["bounding_boxes"]:
            bboxes.append(
                (bbox["x"], bbox["y"], bbox["width"], bbox["height"], bbox["label"])
            )

    return bboxes


image_path = "../../data/images_cropped/2023_122000_484000_RGB_hrl/2023_122000_484000_RGB_hrl_1_3.tif"
bboxes_path = "../../data/annotations_cropped/2023_122000_484000_RGB_hrl/2023_122000_484000_RGB_hrl_1_3.json"

# bboxes = get_bounding_boxes(bboxes_path)
# display_image_with_boxes(image_path, bboxes)

### Duplicate points removal (post-processing)

In [None]:
def remove_duplicate_points(input_las_file, output_las_file):
    # Open the input LAS file
    in_las = laspy.read(input_las_file)
    # Convert coordinates and all dimensions to a NumPy array
    points = np.column_stack(
        [getattr(in_las, dim) for dim in in_las.point_format.dimension_names]
    )
    # Find unique points
    unique_points, idx = np.unique(points, axis=0, return_index=True)
    # Extract unique coordinates
    num_points = len(unique_points)
    # Create a new LAS file
    out_header = in_las.header
    out_header.point_count = num_points
    out_las = laspy.LasData(out_header)
    # Set all dimensions in the output LAS file
    for dim_idx, dim_name in enumerate(in_las.point_format.dimension_names):
        dim_values = unique_points[:, dim_idx]
        # If the dimension is not used, don't set it
        if np.all(dim_values == dim_values[0]):
            print(dim_name)
            continue
        setattr(out_las, dim_name, dim_values)
    out_las.write(output_las_file)


def las_to_laz(input_las_path: str):
    input_las_path_no_extension, initial_extension = os.path.splitext(input_las_path)
    if initial_extension not in [".las", ".LAS"]:
        raise Exception("The input must be a LAS file.")
    output_laz_path = input_las_path_no_extension + ".laz"
    pipeline_list = [
        {
            "type": "readers.las",
            "file_name": input_las_path,
        },
        {"type": "writers.las", "file_name": output_laz_path},
    ]
    pprint(json.dumps(pipeline_list))
    pipeline = pdal.Pipeline(json.dumps(pipeline_list))
    pipeline.execute()


# # Usage
# input_las_file = "../../data/point_clouds_full/122000_484000_with_duplicates_2.las"
# output_las_file = "../../data/point_clouds_full/122000_484000_2.laz"
# output_las_file_2 = "../../data/point_clouds_full/122000_484000_3_2.las"
# las_to_laz(input_las_file)
# remove_duplicate_points(input_las_file, output_las_file)
# remove_duplicate_points(input_las_file, output_las_file_2)
# las_to_laz(output_las_file_2)

### Coordinates verifications for files

In [None]:
### Get the coordinates
ds = gdal.Open(
    "../../data/images_cropped/2023_122000_484000_RGB_hrl/2023_122000_484000_RGB_hrl_0_0.tif"
)
# Get the geotransform parameters
gt = ds.GetGeoTransform()
# Calculate the image coordinates
width = ds.RasterXSize
height = ds.RasterYSize
# Calculate the coordinates of the four corners
x1 = round(gt[0], 3)
y1 = round(gt[3], 3)
x2 = round(gt[0] + (gt[1] * width), 3)
y2 = round(gt[3] + (gt[5] * height), 3)
# Close the dataset
ds = None

print("Minimum X:", x1)
print("Maximum X:", x2)
print("Minimum Y:", y1)
print("Maximum Y:", y2)
print(f"{width = }")
print(f"{height = }")

with laspy.open(
    "../../data/point_clouds_cropped/122000_484000/122000_484000_0_0_filtered.laz",
    mode="r",
) as las_file:
    # Get the bounding box information from the header
    min_x = las_file.header.min[0]
    max_x = las_file.header.max[0]
    min_y = las_file.header.min[1]
    max_y = las_file.header.max[1]
    min_z = las_file.header.min[2]
    max_z = las_file.header.max[2]

# Print out the bounds
print("Minimum X:", min_x)
print("Maximum X:", max_x)
print("Minimum Y:", min_y)
print("Maximum Y:", max_y)
print("Width:", max_x - min_x)
print("Height:", max_y - min_y)

# with laspy.open(f"../../data/point_clouds_test/122000_484000_filtered_1.laz", mode="r") as las_file:
#     # Get the bounding box information from the header
#     min_x = las_file.header.min[0]
#     max_x = las_file.header.max[0]
#     min_y = las_file.header.min[1]
#     max_y = las_file.header.max[1]
#     min_z = las_file.header.min[2]
#     max_z = las_file.header.max[2]

# # Print out the bounds
# print("Minimum X:", min_x)
# print("Maximum X:", max_x)
# print("Minimum Y:", min_y)
# print("Maximum Y:", max_y)
# print("Width:", max_x - min_x)
# print("Height:", max_y - min_y)

print(1920 / 153.6)
print(1920 * 153.6 / 1920)
print(480 * 153.6 / 1920)
print((1920 - 480) * 153.6 / 1920)

Minimum X: 122000.0
Maximum X: 122153.6
Minimum Y: 484000.0
Maximum Y: 483846.4
width = 1920
height = 1920
Minimum X: 122000.0
Maximum X: 122153.6
Minimum Y: 483846.4
Maximum Y: 484000.0
Width: 153.60000000000582
Height: 153.59999999997672
12.5
153.6
38.4
115.2


### Crop point clouds in one PDAL pipeline (not working yet)

In [None]:
def crop_point_cloud(input_file, output_folder):
    # Construct the PDAL pipeline
    pipeline_list = [
        {
            "type": "readers.las",
            "file_name": input_file,
        },
        {
            "type": "filters.splitter",
            "length": "50",
            "buffer": "20",
        },
        {
            "type": "writers.las",
            "file_name": os.path.join(
                output_folder,
                f"{os.path.basename(os.path.splitext(input_file)[0])}_#.laz",
            ),
        },
    ]

    # Execute the pipeline
    pipeline = pdal.Pipeline(json.dumps(pipeline_list))
    pipeline.execute()


# Specify the input LAS/LAZ file
input_file = "../../data/point_clouds_full/122000_484000_filtered.laz"

# Specify the output folder
point_cloud_test_output_folder = "../../data/point_clouds_test"
if not os.path.exists(point_cloud_test_output_folder):
    os.makedirs(point_cloud_test_output_folder)

# Crop the point cloud into tiles
crop_point_cloud(input_file, point_cloud_test_output_folder)

### Download the point clouds corresponding to an image

In [None]:
from osgeo import gdal, ogr
from shapely.geometry import box
from shapely.wkt import dumps
import requests


def download_file(url, save_path):
    # Send a GET request to the URL
    response = requests.get(url)

    # Check if the request was successful (status code 200)
    if response.status_code == 200:
        # Open the file in binary write mode and write the content of the response
        print(f"Downloading {url}...", end=" ", flush=True)
        with open(save_path, "wb") as f:
            f.write(response.content)
        print(f"Saved as '{save_path}'")
    else:
        print(
            f"Failed to download file from '{url}'. Status code: {response.status_code}"
        )


# Path to your TIF image
tif_file = full_image_path_tif

# Path to your Shapefile
shapefile = (
    "../../data/point_clouds_geotiles/TOP-AHN_subunit_compat/TOP-AHN_subunit_compat.shp"
)

# Open the TIF image
ds = gdal.Open(tif_file)

# Get the geotransform and projection
gt = ds.GetGeoTransform()
proj = ds.GetProjection()

# Get the extent of the TIF image
min_x = gt[0]
max_y = gt[3]
max_x = min_x + gt[1] * ds.RasterXSize
min_y = max_y + gt[5] * ds.RasterYSize

# Create a box geometry representing the extent of the TIF image
overlap = 20
bbox = box(min_x + overlap, min_y + overlap, max_x - overlap, max_y - overlap)

# Convert the Shapely geometry to an ogr.Geometry object
bbox_ogr = ogr.CreateGeometryFromWkt(dumps(bbox))

# Open the Shapefile
driver = ogr.GetDriverByName("ESRI Shapefile")
shp_ds = driver.Open(shapefile, 0)
layer = shp_ds.GetLayer()

# Get the intersection between TIF image and Shapefile
intersection_file_names = []
for feature in layer:
    geom = feature.GetGeometryRef()
    if geom.Intersects(bbox_ogr):
        intersection_file_names.append(
            feature.GetField("AHN")
        )  # Replace "file_name_column" with the actual column name

# Close the Shapefile
shp_ds = None


intersection_file_paths = []
for file_name in intersection_file_names:
    url = f"https://geotiles.citg.tudelft.nl/AHN4_T/{file_name}.LAZ"
    # Create the paths
    geotiles_with_overlap_path = os.path.join(GEOTILES_LIDAR_FOLDER, f"{file_name}.LAZ")
    geotiles_without_overlap_path = os.path.join(
        GEOTILES_NO_OVERLAP_LIDAR_FOLDER, f"{file_name}.LAZ"
    )
    intersection_file_paths.append(geotiles_without_overlap_path)
    # Download the point clouds
    if not os.path.exists(geotiles_with_overlap_path):
        download_file(url, geotiles_with_overlap_path)
    # Remove the overlap from the point clouds
    if not os.path.exists(geotiles_without_overlap_path):
        remove_las_overlap_from_geotiles(
            geotiles_with_overlap_path, geotiles_without_overlap_path
        )

# Crop the point clouds into the area of the full image
full_point_cloud_path = f"../../data/point_clouds_full/{int(min_x)}_{int(max_y)}.laz"
if not os.path.exists(full_point_cloud_path):
    merge_crop_las(
        intersection_file_paths, full_point_cloud_path, (min_x, max_x), (min_y, max_y)
    )

# Filter the full point cloud to remove buildings
full_point_filtered_cloud_path = (
    f"../../data/point_clouds_full/{int(min_x)}_{int(max_y)}_filtered.laz"
)
if not os.path.exists(full_point_filtered_cloud_path):
    filter_classification_las(full_point_cloud_path, full_point_filtered_cloud_path)

Downloading https://geotiles.citg.tudelft.nl/AHN4_T/25GN1_13.LAZ... Saved as '../../data/point_clouds_geotiles/25GN1_13.LAZ'
Execution of crop_las(('../../data/point_clouds_geotiles/25GN1_13.LAZ', '../../data/point_clouds_geotiles_no_overlap/25GN1_13.LAZ', (122000.0, 122999.999), (483750.0, 484999.999)))...
Done in 47.519 seconds
Downloading https://geotiles.citg.tudelft.nl/AHN4_T/25GN1_18.LAZ... Saved as '../../data/point_clouds_geotiles/25GN1_18.LAZ'
Execution of crop_las(('../../data/point_clouds_geotiles/25GN1_18.LAZ', '../../data/point_clouds_geotiles_no_overlap/25GN1_18.LAZ', (122000.0, 122999.999), (482500.0, 483749.999)))...
Done in 47.76 seconds
Execution of merge_crop_las((['../../data/point_clouds_geotiles_no_overlap/25GN1_13.LAZ', '../../data/point_clouds_geotiles_no_overlap/25GN1_18.LAZ'], '../../data/point_clouds_full/122000_484000.laz', (122000.0, 123000.0), (483000.0, 484000.0)))...
Done in 50.813 seconds
Execution of filter_classification_las(('../../data/point_clouds_

### Transform QGIS bounding boxes to Label Studio format

In [None]:
def bboxes_qgis_to_label_studio(
    input_folder: str, output_json_path: str, image_path: str, task_id: int
) -> None:
    bboxes_dict = {
        "id": task_id,
        "annotations": [{"result": []}],
        "data": {"image": f"/data/local-files/?d={image_path}"},
    }

    ds = gdal.Open(image_path)
    gt = ds.GetGeoTransform()
    width = ds.RasterXSize
    height = ds.RasterYSize
    xmin = round(gt[0], 3)
    ymax = round(gt[3], 3)
    xmax = round(gt[0] + (gt[1] * width), 3)
    ymin = round(gt[3] + (gt[5] * height), 3)
    width_factor = 100 / (xmax - xmin)
    height_factor = 100 / (ymax - ymin)
    ds = None

    for file_name in os.listdir(input_folder):
        # Check if the file is a regular file (not a directory)
        geojson_file_path = os.path.join(input_folder, file_name)
        if os.path.splitext(geojson_file_path)[1] == ".geojson":
            with open(geojson_file_path, "r") as file:
                bbox_json = json.load(file)
                coordinates = bbox_json["features"][0]["geometry"]["coordinates"][0]
                x0 = coordinates[0][0]
                y0 = coordinates[0][1]
                x1 = coordinates[2][0]
                y1 = coordinates[2][1]
                bboxes_dict["annotations"][0]["result"].append(
                    {
                        "original_width": 12500,
                        "original_height": 12500,
                        "value": {
                            "x": (x0 - xmin) * width_factor,
                            "y": (ymax - y1 + 1) * height_factor,
                            "width": (x1 - x0) * width_factor,
                            "height": (y1 - y0) * height_factor,
                            "rotation": 0,
                            "rectanglelabels": ["Tree"],
                        },
                        "from_name": "label",
                        "to_name": "image",
                        "type": "rectanglelabels",
                    }
                )

    with open(output_json_path, "w") as outfile:
        json.dump([bboxes_dict], outfile)


input_folder = "../../data/QGIS_bounding_boxes"
output_json = "../../data/images_full/label_studio_pre_annotations.json"
image_path = "/home/alexandre/Documents/tree-segmentation/data/images/full/2023_122000_484000_RGB_hrl.png"
task_id = 1

bboxes_qgis_to_label_studio(input_folder, output_json, image_path, task_id)

In [None]:
def bboxes_label_studio_to_label_studio(
    input_json_path: str, output_json_path: str, image_path: str, task_id: int
):
    bboxes_dict = {
        # "id": task_id,
        "data": {"image": f"/data/local-files/?d={image_path}"},
        "annotations": [{"result": []}],
    }

    with open(input_json_path, "r") as file:
        bbox_json = json.load(file)
        bboxes_dict["annotations"][0]["result"] = bbox_json["result"]

    with open(output_json_path, "w") as outfile:
        json.dump(bboxes_dict, outfile)


input_file = "../../data/annotations/full/6"
output_json = "../../data/label_studio/medium_trees.json"
image_path = "home/alexandre/Documents/tree-segmentation/data/images/full/2023_122000_484000_RGB_hrl.png"
task_id = 1

bboxes_label_studio_to_label_studio(input_file, output_json, image_path, task_id)