In [1]:
import open3d as o3d
import numpy as np
import copy
import time


Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.


In [2]:
def draw_registration_result(source, target, transformation):
    source_temp = copy.deepcopy(source)
    target_temp = copy.deepcopy(target)
    source_temp.paint_uniform_color([1, 0.706, 0])
    target_temp.paint_uniform_color([0, 0.651, 0.929])
    source_temp.transform(transformation)
    o3d.visualization.draw_geometries([source_temp, target_temp],
                                      zoom=0.4559,
                                      front=[0.6452, -0.3036, -0.7011],
                                      lookat=[1.9892, 2.0208, 1.8945],
                                      up=[-0.2779, -0.9482, 0.1556])
    
def preprocess_point_cloud(pcd, voxel_size):
    print(":: Downsample with a voxel size %.3f." % voxel_size)
    pcd_down = pcd.voxel_down_sample(voxel_size)

    radius_normal = voxel_size * 2
    print(":: Estimate normal with search radius %.3f." % radius_normal)
    pcd_down.estimate_normals(
        o3d.geometry.KDTreeSearchParamHybrid(radius=radius_normal, max_nn=30))

    radius_feature = voxel_size * 5
    print(":: Compute FPFH feature with search radius %.3f." % radius_feature)
    pcd_fpfh = o3d.pipelines.registration.compute_fpfh_feature(
        pcd_down,
        o3d.geometry.KDTreeSearchParamHybrid(radius=radius_feature, max_nn=100))
    return pcd_down, pcd_fpfh



In [21]:
def move_pointCloud_to_center(pointCloud):

    center = pointCloud.get_center()
    pointCloud.translate(-center, relative = True)

    return pointCloud


def prepare_dataset(voxel_size):
    print(":: Load two point clouds and disturb initial pose.")

    demo_icp_pcds = o3d.data.DemoICPPointClouds()
    #cc_pc = o3d.io.read_point_cloud(demo_icp_pcds.paths[0])
    #lidar_pc = o3d.io.read_point_cloud(demo_icp_pcds.paths[1])

    #cc_pc = o3d.io.read_point_cloud("D:\MASTERPROJEKT\scripts\Testfiles\Masterkueche_GaussianSplat_CloudCompare.ply")
    cc_pc = o3d.io.read_point_cloud("D:\MASTERPROJEKT\scripts\Testfiles\Masterkueche_GaussianSplat_CloudCompare_bereinigt.ply")
    #lidar_pc = o3d.io.read_point_cloud("D:\MASTERPROJEKT\scripts\Testfiles\Masterkueche_LIDAR_CloudCompare_unalligned.ply")
    lidar_pc = o3d.io.read_point_cloud("D:\MASTERPROJEKT\scripts\Testfiles\Masterkueche_LIDAR_CloudCompare_unalligned_bereinigt.ply")

    cc_pc = move_pointCloud_to_center(cc_pc)
    lidar_pc = move_pointCloud_to_center(lidar_pc)
    
    source = cc_pc
    target = lidar_pc
    trans_init = np.asarray([[0.0, 0.0, 1.0, 0.0], [1.0, 0.0, 0.0, 0.0],
                             [0.0, 1.0, 0.0, 0.0], [0.0, 0.0, 0.0, 1.0]])
    source.transform(trans_init)
    draw_registration_result(source, target, np.identity(4))

    source_down, source_fpfh = preprocess_point_cloud(source, voxel_size)
    target_down, target_fpfh = preprocess_point_cloud(target, voxel_size)
    return source, target, source_down, target_down, source_fpfh, target_fpfh



voxel_size = 0.1 # means 20cm for this dataset
source, target, source_down, target_down, source_fpfh, target_fpfh = prepare_dataset(
    voxel_size)
print(len(source.points))
print(len(target.points))

:: Load two point clouds and disturb initial pose.
:: Downsample with a voxel size 0.100.
:: Estimate normal with search radius 0.200.
:: Compute FPFH feature with search radius 0.500.
:: Downsample with a voxel size 0.100.
:: Estimate normal with search radius 0.200.
:: Compute FPFH feature with search radius 0.500.
930773
94565822


In [20]:
def execute_global_registration(source_down, target_down, source_fpfh,
                                target_fpfh, voxel_size):
    distance_threshold = voxel_size * 0.5
    normal_angle_threshold = 45
    print(":: RANSAC registration on downsampled point clouds.")
    print("   Since the downsampling voxel size is %.3f," % voxel_size)
    print("   we use a liberal distance threshold %.3f." % distance_threshold)
    result = o3d.pipelines.registration.registration_ransac_based_on_feature_matching(
        source_down, target_down, source_fpfh, target_fpfh, True,
        distance_threshold,
        o3d.pipelines.registration.TransformationEstimationPointToPoint(True),
        3, [
            o3d.pipelines.registration.CorrespondenceCheckerBasedOnEdgeLength(
                0.99),
            o3d.pipelines.registration.CorrespondenceCheckerBasedOnDistance(
                distance_threshold),
            o3d.pipelines.registration.CorrespondenceCheckerBasedOnNormal(
                normal_angle_threshold
            )
        ], o3d.pipelines.registration.RANSACConvergenceCriteria(10000, 1.0))
    return result

result_ransac = execute_global_registration(source_down, target_down,
                                            source_fpfh, target_fpfh,
                                            voxel_size)
print(result_ransac)
draw_registration_result(source_down, target_down, result_ransac.transformation)

:: RANSAC registration on downsampled point clouds.
   Since the downsampling voxel size is 0.100,
   we use a liberal distance threshold 0.050.
RegistrationResult with fitness=0.000000e+00, inlier_rmse=0.000000e+00, and correspondence_set size of 0
Access transformation to get result.


In [5]:
def refine_registration(source, target, source_fpfh, target_fpfh, voxel_size):
    source.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * 2, max_nn=30))
    target.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * 2, max_nn=30))

    distance_threshold = voxel_size * 0.4
    print(":: Point-to-plane ICP registration is applied on original point")
    print("   clouds to refine the alignment. This time we use a strict")
    print("   distance threshold %.3f." % distance_threshold)
    result = o3d.pipelines.registration.registration_icp(
        source, target, distance_threshold, result_ransac.transformation,
        o3d.pipelines.registration.TransformationEstimationPointToPlane())
    return result

result_icp = refine_registration(source, target, source_fpfh, target_fpfh,
                                 voxel_size)
print(result_icp)
draw_registration_result(source, target, result_icp.transformation)

:: Point-to-plane ICP registration is applied on original point
   clouds to refine the alignment. This time we use a strict
   distance threshold 0.080.
RegistrationResult with fitness=7.858307e-02, inlier_rmse=4.593532e-02, and correspondence_set size of 73143
Access transformation to get result.


In [22]:

def execute_fast_global_registration(source_down, target_down, source_fpfh,
                                     target_fpfh, voxel_size):
    distance_threshold = voxel_size * 0.5

    print(":: Apply fast global registration with distance threshold %.3f" \
            % distance_threshold)
    result = o3d.pipelines.registration.registration_fgr_based_on_feature_matching(
        source_down, target_down, source_fpfh, target_fpfh,
        o3d.pipelines.registration.FastGlobalRegistrationOption(
            maximum_correspondence_distance=distance_threshold))
    return result

start = time.time()
result_fast = execute_fast_global_registration(source_down, target_down,
                                               source_fpfh, target_fpfh,
                                               voxel_size)
print("Fast global registration took %.3f sec.\n" % (time.time() - start))
print(result_fast)
draw_registration_result(source_down, target_down, result_fast.transformation)

:: Apply fast global registration with distance threshold 0.050
Fast global registration took 12.056 sec.

RegistrationResult with fitness=1.698881e-02, inlier_rmse=3.757945e-02, and correspondence_set size of 973
Access transformation to get result.


In [13]:
result_icp = refine_registration(source_down, target_down, source_fpfh, target_fpfh,
                                 voxel_size)
print(result_icp)
draw_registration_result(source, target, result_icp.transformation)

:: Point-to-plane ICP registration is applied on original point
   clouds to refine the alignment. This time we use a strict
   distance threshold 0.080.
RegistrationResult with fitness=3.544272e-02, inlier_rmse=5.999299e-02, and correspondence_set size of 564
Access transformation to get result.


In [4]:
import cv2
import open3d as o3d
from matplotlib import pyplot as pyplot
import numpy as np
import copy
import scipy
from scipy import spatial 
import random
import sys
import math

#Kabsch Algorithm
def compute_transformation(source,target):
    #Normalization
    number = len(source)
    #the centroid of source points
    cs = np.zeros((3,1))
    #the centroid of target points
    ct = copy.deepcopy(cs)
    cs[0] = np.mean(source[:][0]);cs[1]=np.mean(source[:][1]);cs[2]=np.mean(source[:][2])
    ct[0] = np.mean(target[:][0]);cs[1]=np.mean(target[:][1]);cs[2]=np.mean(target[:][2])
    #covariance matrix
    cov = np.zeros((3,3))
    #translate the centroids of both models to the origin of the coordinate system (0,0,0)
    #subtract from each point coordinates the coordinates of its corresponding centroid
    for i in range(number):
        sources = source[i].reshape(-1,1)-cs
        targets = target[i].reshape(-1,1)-ct
        cov = cov + np.dot(sources,np.transpose(targets))
    #SVD (singular values decomposition)
    u,w,v = np.linalg.svd(cov)
    #rotation matrix
    R = np.dot(u,np.transpose(v))
    #Transformation vector
    T = ct - np.dot(R,cs)
    return R, T

#compute the transformed points from source to target based on the R/T found in Kabsch Algorithm
def _transform(source,R,T):
    points = []
    for point in source:
        points.append(np.dot(R,point.reshape(-1,1)+T))
    return points

#compute the root mean square error between source and target
def compute_rmse(source,target,R,T):
    rmse = 0
    number = len(target)
    points = _transform(source,R,T)
    for i in range(number):
        error = target[i].reshape(-1,1)-points[i]
        rmse = rmse + math.sqrt(error[0]**2+error[1]**2+error[2]**2)
    return rmse

def draw_registrations(source, target, transformation = None, recolor = False):
    source_temp = copy.deepcopy(source)
    target_temp = copy.deepcopy(target)
    if(recolor): # recolor the points
        source_temp.paint_uniform_color([1, 0.706, 0])
        target_temp.paint_uniform_color([0, 0.651, 0.929])
    if(transformation is not None): # transforma source to targets
        source_temp.transform(transformation)
    o3d.visualization.draw_geometries([source_temp, target_temp])

def pc2array(pointcloud):
    return np.asarray(pointcloud.points)

def registration_RANSAC(source,target,source_feature,target_feature,ransac_n=3,max_iteration=100000,max_validation=100):
    #the intention of RANSAC is to get the optimal transformation between the source and target point cloud
    s = pc2array(source) #(4760,3)
    t = pc2array(target)
    #source features (33,4760)
    sf = np.transpose(source_feature.data)
    tf = np.transpose(target_feature.data)
    #create a KD tree
    tree = spatial.KDTree(tf)
    corres_stock = tree.query(sf)[1]
    for i in range(max_iteration):
        #take ransac_n points randomly
        idx = [random.randint(0,s.shape[0]-1) for j in range(ransac_n)]
        corres_idx = corres_stock[idx]
        source_point = s[idx,...]
        target_point = t[corres_idx,...]
        #estimate transformation
        #use Kabsch Algorithm
        R, T = compute_transformation(source_point,target_point)
        #calculate rmse for all points
        source_point = s
        target_point = t[corres_stock,...]
        rmse = compute_rmse(source_point,target_point,R,T)
        #compare rmse and optimal rmse and then store the smaller one as optimal values
        if not i:
            opt_rmse = rmse
            opt_R = R
            opt_T = T
        else:
            if rmse < opt_rmse:
                opt_rmse = rmse
                opt_R = R
                opt_T = T
    return opt_R, opt_T

#used for downsampling
voxel_size = 0.05
#this is to get the fpfh features, just call the library
def get_fpfh(cp):
    cp = cp.voxel_down_sample(voxel_size)
    cp.estimate_normals()
    return cp, o3d.pipelines.registration.compute_fpfh_feature(cp, o3d.geometry.KDTreeSearchParamHybrid(radius=5, max_nn=100))

In [5]:
source = o3d.io.read_point_cloud("D:\MASTERPROJEKT\scripts\Testfiles\Masterkueche_GaussianSplat_CloudCompare.ply")
target = o3d.io.read_point_cloud("D:\MASTERPROJEKT\scripts\Testfiles\Masterkueche_LIDAR_CloudCompare_unalligned.ply")
#if we want to use RANSAC registration, get_fpfh features should be acquired firstly                 
r1, f1 = get_fpfh(source)
r2, f2 = get_fpfh(target)
R, T = registration_RANSAC(r1,r2,f1,f2)
#transformation matrix is formed by R, T based on np.hstack and np.vstack(corporate two matrices by rows)
#Notice we need add the last row [0 0 0 1] to make it homogeneous 
transformation = np.vstack((np.hstack((np.float64(R), np.float64(T))), np.array([0,0,0,1])))
draw_registrations(r1, r2, transformation, True)