In [None]:
import open3d as o3d
import numpy as np
import copy
from os import listdir
import os
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime

from winsound import Beep


In [None]:
def draw_point_clouds(point_clouds, black_point_cloud = None):
    np.random.seed(0)
    pc_copy = []
    colors = [[1, 0.706, 0], [0, 0.651, 0.929], [0.3, 0.351, 0.529]]
    for idx, x in enumerate(point_clouds):
        temp = copy.deepcopy(x)
        if idx < len(colors):
            temp.paint_uniform_color(colors[idx])
        else:
            temp.paint_uniform_color(np.random.rand(3))
        pc_copy.append(temp)
    
    if black_point_cloud is not None:
        temp = copy.deepcopy(black_point_cloud)
        temp.paint_uniform_color([249/256,1/256,0])
        pc_copy.append(temp)
   
    o3d.visualization.draw_geometries(pc_copy)
    

In [None]:
source_folder = "C:\\Users\\amoff\\Documents\\Meine Textdokumente\\Masterarbeit\\Daten\\rotated_and_translated\\100000000\\cropped_frames\\"

# target_path = "C:\\Users\\amoff\\Documents\\Meine Textdokumente\\Masterarbeit\\LiDAR_Punktwolke-RevA-201455_Vermessungsdaten\\Cleaned Export\\intersection_dirty\\intersection_dirty2_xyzi_down_transformed_compressed_ground_removed.pcd"
target_path = "C:\\Users\\amoff\\Documents\\Meine Textdokumente\\Masterarbeit\\LiDAR_Punktwolke-RevA-201455_Vermessungsdaten\\Cleaned Export\\intersection_dirty\\intersection_dirty2_xyzi_down_transformed_compressed_cropped.pcd"
target = o3d.io.read_point_cloud(target_path)
voxel_size = 1
target = target.voxel_down_sample(voxel_size)
target.estimate_normals()

filenames = listdir(source_folder)[:]
frames = [target]
for file in filenames:
    frames.append(o3d.io.read_point_cloud(source_folder + file))

draw_point_clouds(frames)






In [None]:
draw_point_clouds(frames)

In [None]:
def compute_registration(source, target):
    max_correspondence_distance = 0.5
    evaluation = o3d.pipelines.registration.evaluate_registration( source, target, max_correspondence_distance)
    print(evaluation.fitness, evaluation.inlier_rmse, evaluation)

def transform(point_cloud, transformation):
    temp = copy.deepcopy(point_cloud) 
    return temp.transform(transformation)

def compute_icp_p2plane(source, target):
    max_correspondence_distance = 10
    return o3d.pipelines.registration.registration_icp( source, target, max_correspondence_distance, np.identity(4), o3d.pipelines.registration.TransformationEstimationPointToPlane())

def compute_icp_p2p(source, target):
    max_correspondence_distance = 10
    # return o3d.pipelines.registration.registration_icp( source, target, threshold, np.identity(4), o3d.pipelines.registration.TransformationEstimationPointToPoint(),  o3d.pipelines.registration.ICPConvergenceCriteria(relative_fitness=1.000000e-07, relative_rmse=1.000000e-07,))#max_iteration = 2000,
    return o3d.pipelines.registration.registration_icp( source, target, max_correspondence_distance, np.identity(4), o3d.pipelines.registration.TransformationEstimationPointToPoint())

def show_transformation(target, source, transformation):
    draw_point_clouds([target, transform(source, transformation)], source)
    # draw_point_clouds([target, ], source)

def rotate(point_cloud, x_degrees,y_degrees = 0, z_degrees = 0):
    R = point_cloud.get_rotation_matrix_from_xyz(np.pi/180 * np.array([z_degrees, y_degrees, x_degrees]))
    point_cloud.rotate(R)
    return R

def relative_translation_error(translation_vector1, translation_vector2):
    return np.linalg.norm(translation_vector1-translation_vector2)

def relative_rotation_error(rotation, ground_truth_rotation):
    return np.arccos((np.trace(np.matmul(rotation, ground_truth_rotation))-1)/2)

def decompose_transformation_matrix(transformation):
    translation = transformation[0:3, 3]
    rotation = transformation[0:3, 0:3]
    return translation, rotation

Global registration

In [None]:
def preprocess_point_cloud(pcd, voxel_size):
    pcd_down = pcd.voxel_down_sample(voxel_size)

    radius_normal = voxel_size * 2
    pcd_down.estimate_normals(o3d.geometry.KDTreeSearchParamHybrid(radius=radius_normal, max_nn=30))

    radius_feature = voxel_size * 5
    pcd_fpfh = o3d.pipelines.registration.compute_fpfh_feature( pcd_down, o3d.geometry.KDTreeSearchParamHybrid(radius=radius_feature, max_nn=100))
    return pcd_down, pcd_fpfh

def compute_global_registration_no_threshold(source_down, target_down, source_fpfh, target_fpfh, voxel_size):
    distance_threshold = 10 #voxel_size * 1.5
    return o3d.pipelines.registration.registration_ransac_based_on_feature_matching( source_down, target_down, source_fpfh, target_fpfh, True, distance_threshold)

def compute_fast_global_registration(source_down, target_down, source_fpfh, target_fpfh, voxel_size):
    distance_threshold = 10 #voxel_size * 0.5
    # result = o3d.pipelines.registration.registration_fgr_based_on_feature_matching( source_down, target_down, source_fpfh, target_fpfh, o3d.pipelines.registration.FastGlobalRegistrationOption( maximum_correspondence_distance=distance_threshold))
    return o3d.pipelines.registration.registration_fgr_based_on_feature_matching( source_down, target_down, source_fpfh, target_fpfh)

In [None]:
target_down, target_fpfh = preprocess_point_cloud(target, voxel_size)
print("Target fpfh computation done")

In [None]:
def compute_icp_errors(source_folder, target, method, suffix = "", frame_number = None, translation_number = None, show_tran = False):
    voxel_size = 1
    suffix = method + suffix


    filenames = listdir(source_folder)
    

    frames = []
    frames_fpfh = []
    if frame_number is not None:
        print(f"Frame number: {frame_number}, method: {method}")
        filenames = [filenames[frame_number]]

    for file in filenames:
        point_cloud = o3d.io.read_point_cloud(source_folder + file)
        source_down, source_fpfh = preprocess_point_cloud(point_cloud, voxel_size)
        frames.append(source_down)
        frames_fpfh.append(source_fpfh)
        
    df_translation = pd.DataFrame({"frame_number":filenames})
    df_rotation = pd.DataFrame({"frame_number":filenames})

    now = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")


    rotations = [0,1,2]
    translations = [[0,0,0],[1,0,0],[2,0,0],[3,0,0],[3,3,0],[5,5,0],[6,6,0],[7,7,0],[10,10,0]]
    if translation_number is not None:
        translations = [translations[translation_number]]

    # translations = [[0,0,0],[1,1,0],[5,5,0],[10,10,0]]
    for translation in translations:
        print(translation)
        i = 1
        translation_error = []
        rotation_error = []
        for idx, frame in enumerate(frames):
            print(str(idx) + " / " + str(len(frames)) + " ")

            temp_frame = copy.deepcopy(frame)
            temp_frame.translate(translation)
            ground_truth_rotation = rotate(temp_frame, 0,0,0)

            if method == "p2p":
                evaluation = compute_icp_p2p(temp_frame, target)
            if method == "p2plane":
                evaluation = compute_icp_p2plane(temp_frame, target)
            if method == "global":
                evaluation = compute_global_registration_no_threshold(temp_frame, target_down, frames_fpfh[idx], target_fpfh, voxel_size)
            if method == "fast":
                evaluation = compute_fast_global_registration(temp_frame, target_down, frames_fpfh[idx], target_fpfh, voxel_size)

            

            # draw_point_clouds([target, temp_frame], transform(temp_frame, evaluation.transformation))
            translation_vector, rotation_matrix = decompose_transformation_matrix(evaluation.transformation)
            t_error = relative_translation_error(-np.array(translation), translation_vector)
            r_error = relative_rotation_error(rotation_matrix,ground_truth_rotation.T)
            translation_error.append(t_error)
            rotation_error.append(r_error)

            if show_tran:
                print(f"Translation error {t_error}, rotation error {r_error}")
                show_transformation(target, temp_frame, evaluation.transformation)


        df_translation[str(translation)] = translation_error
        df_rotation[str(translation)] = rotation_error
        df_translation.to_csv("C:\\Users\\amoff\\Documents\\Meine Textdokumente\\Masterarbeit\\Daten\\rotated_and_translated\\100000000\\csv_results\\cropped\\" + now + "_translation_error_" + suffix + ".csv")
        df_rotation.to_csv("C:\\Users\\amoff\\Documents\\Meine Textdokumente\\Masterarbeit\\Daten\\rotated_and_translated\\100000000\\csv_results\\cropped\\" + now + "_rotation_error_" + suffix + ".csv")



    # Beep(300, 200)
    # Beep(300, 200)
    # Beep(300, 700)

In [None]:
compute_icp_errors(source_folder, target, "p2p")
compute_icp_errors(source_folder, target, "p2plane")
compute_icp_errors(source_folder, target, "global")
compute_icp_errors(source_folder, target, "fast")

Point 2 Point

In [None]:
folder = "C:\\Users\\amoff\\Documents\\Meine Textdokumente\\Masterarbeit\\Daten\\rotated_and_translated\\100000000\\csv_results\\cropped\\"
filename = "2023_08_01_14_02_40_rotation_error__fast.csv"

filenames = [f for f in os.listdir(folder) if os.path.isfile(os.path.join(folder, f))]

def plot_errors(folder, filename):
    df = pd.read_csv(folder+filename)
    df = df.iloc[: , 1:]
    ax = df.plot()

    arr = filename.split("_")
    method = arr[8].split(".")[0]
    ax.set_ylabel(arr[6] + " " + arr[7])
    ax.set_xlabel('Frame')
    ax.set_title(method + " " + arr[6] + " " + arr[7])
    

for filename in filenames:
    plot_errors(folder, filename)

In [None]:
compute_icp_errors(source_folder, target, "global", frame_number=3, translation_number=8, show_tran=True)