In [None]:
import os
import copy
import numpy as np
import struct
import open3d as o3d

In [None]:
source = o3d.io.read_point_cloud("cloud_bin_0.pcd")
target = o3d.io.read_point_cloud("cloud_bin_1.pcd")

In [None]:
def draw_registration_result(source, target, transformation):
    source_temp = copy.deepcopy(source)
    target_temp = copy.deepcopy(target)
    source_temp.paint_uniform_color([1, 0, 0])
    target_temp.paint_uniform_color([0, 0, 1])
    source_temp.transform(transformation)
    o3d.visualization.draw_geometries([source_temp,target_temp],
                                      zoom=0.4459,
                                      front=[0.9288, -0.2951, -0.2242],
                                      lookat=[1.6784, 2.0612, 1.4451],
                                      up=[-0.3402, -0.9189, -0.1996])

In [None]:

# trans_init = np.asarray([[0.862, 0.011, -0.507, 0.5],
#                          [-0.139, 0.967, -0.215, 0.7],
#                          [0.487, 0.255, 0.835, -1.4], [0.0, 0.0, 0.0, 1.0]])



draw_registration_result(source, target, trans_init)

In [None]:
print("Apply point-to-point ICP")
threshold = 0.03
reg_p2p = o3d.pipelines.registration.registration_icp(
    source, target, threshold, trans_init,
    o3d.pipelines.registration.TransformationEstimationPointToPoint())
print(reg_p2p)
print("Transformation is:")
print(reg_p2p.transformation)
draw_registration_result(source, target, reg_p2p.transformation)

In [None]:
reg_p2p = o3d.pipelines.registration.registration_icp(
    source, target, threshold, trans_init,
    o3d.pipelines.registration.TransformationEstimationPointToPoint(),
    o3d.pipelines.registration.ICPConvergenceCriteria(max_iteration=2000))
print(reg_p2p)
print("Transformation is:")
print(reg_p2p.transformation)
draw_registration_result(source, target, reg_p2p.transformation)

In [None]:
def solve_icp(P, Q):
    """
    Solve ICP

    Parameters
    ----------
    P: numpy.ndarray
        source point cloud as N-by-3 numpy.ndarray
    Q: numpy.ndarray
        target point cloud as N-by-3 numpy.ndarray

    Returns
    ----------
    T: transform matrix as 4-by-4 numpy.ndarray
        transformation matrix from one-step ICP

    """
    # compute centers:
    up = P.mean(axis = 0)
    uq = Q.mean(axis = 0)

    # move to center:
    P_centered = P - up
    Q_centered = Q - uq

    U, s, V = np.linalg.svd(np.dot(Q_centered.T, P_centered), full_matrices=True, compute_uv=True)
    R = np.dot(U, V)
    t = uq - np.dot(R, up)

    # format as transform:
    T = np.zeros((4, 4))
    T[0:3, 0:3] = R
    T[0:3, 3] = t
    T[3, 3] = 1.0

    return T

In [None]:


def read_bin_velodyne(path):
    # pc_list=[]
    # with open(path,'rb') as f:
    #     content=f.read()
    #     pc_iter=struct.iter_unpack('ffff',content)
    #     for idx,point in enumerate(pc_iter):
    #         pc_list.append([point[0],point[1],point[2]])
    # return np.asarray(pc_list,dtype=np.float32)
    return np.fromfile(path,dtype=np.float32).reshape(-1,4)

def registration():
    source = o3d.io.read_point_cloud("cloud_bin_0.pcd")
    target = o3d.io.read_point_cloud("cloud_bin_1.pcd")
    # threshold = 0.02
    # trans_init = np.asarray([[0.862, 0.011, -0.507, 0.5],
    #                      [-0.139, 0.967, -0.215, 0.7],
    #                      [0.487, 0.255, 0.835, -1.4], [0.0, 0.0, 0.0, 1.0]])
    # draw_registration_result(source, target, trans_init)



def main():
    # data_path = '/home/book/data/kitti/Rawdata/2011_09_26/2011_09_26_drive_0005_sync/'
    pcd=o3d.geometry.PointCloud()
    source = o3d.io.read_point_cloud("cloud_bin_0.pcd")
    target = o3d.io.read_point_cloud("cloud_bin_1.pcd")
    source_ndarray = np.asarray(source.points)
    target_ndarray = np.asarray(target.points)

    src_mean = source_ndarray.mean(axis = 0)
    tar_mean = target_ndarray.mean(axis = 0)

    # move to center:
    print('move to center')
    source.points = o3d.utility.Vector3dVector(source_ndarray - src_mean)
    print(np.asarray(source.points))
    # target.points = target_ndarray - tar_mean

    o3d.visualization.draw_geometries([source],zoom=0.3412,
                                        front=[0.4257, -0.2125, -0.8795],
                                        lookat=[2.6172, 2.0475, 1.532],
                                        up=[-0.0694, -0.9768, 0.2024])
    # o3d.visualization.draw_geometries([target],zoom=0.3412,
    #                                     front=[0.4257, -0.2125, -0.8795],
    #                                     lookat=[2.6172, 2.0475, 1.532],
    #                                     up=[-0.0694, -0.9768, 0.2024])

    
    # print('source')
    # print(source_ndarray.shape)
    # print('\n')
    # print('target')
    # # print(target_ndarray)
    # print(target_ndarray.shape)
# 
    # T = solve_icp(source_ndarray,target_ndarray)
    # print('result')
    # print(T)
    # for frame in range(153):
    #     point_cloud = read_bin_velodyne(os.path.join(data_path,'velodyne_points/data/%010d.bin' % frame))
    #     pcd.points= open3d.open3d.utility.Vector3dVector(point_cloud[:,:3])
    #     open3d.open3d.visualization.draw_geometries([pcd])
    
    # filename=os.listdir(root_dir)
    # file_number=len(filename)

    
    # for i in range(file_number):
        # path=os.path.join(root_dir, filename[i])
        # print(path)
    
        # example=read_bin_velodyne(path)
        # From numpy to Open3D
    

# if __name__=="__main__":
#     main()
    

In [None]:
def read_bin_velodyne(path):
    # pc_list=[]
    # with open(path,'rb') as f:
    #     content=f.read()
    #     pc_iter=struct.iter_unpack('ffff',content)
    #     for idx,point in enumerate(pc_iter):
    #         pc_list.append([point[0],point[1],point[2]])
    # return np.asarray(pc_list,dtype=np.float32)
    return np.fromfile(path,dtype=np.float32).reshape(-1,4)

In [None]:
import numpy as np
import struct
from open3d import *

def convert_kitti_bin_to_pcd(binFilePath):
    size_float = 4
    list_pcd = []
    with open(binFilePath, "rb") as f:
        byte = f.read(size_float * 4)
        while byte:
            x, y, z, intensity = struct.unpack("ffff", byte)
            list_pcd.append([x, y, z])
            byte = f.read(size_float * 4)
    np_pcd = np.asarray(list_pcd)
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(np_pcd)
    return pcd

In [None]:
print("Testing kdtree in Open3D...")
print("Load a point cloud and paint it gray.")
pcd = o3d.io.read_point_cloud("cloud_bin_1.pcd")
pcd.paint_uniform_color([0.5, 0.5, 0.5])
pcd_tree = o3d.geometry.KDTreeFlann(pcd)

In [None]:
print("Paint the 1500th point red.")
pcd.colors[1500] = [1, 0, 0]

In [None]:
print("Find its 200 nearest neighbors, and paint them blue.")
[k, idx, _] = pcd_tree.search_knn_vector_3d(pcd.points[1500], 200)
np.asarray(pcd.colors)[idx[1:], :] = [0, 0, 1]

In [None]:
print("Visualize the point cloud.")
o3d.visualization.draw_geometries([pcd],
                                  zoom=0.5599,
                                  front=[-0.4958, 0.8229, 0.2773],
                                  lookat=[2.1126, 1.0163, -1.8543],
                                  up=[0.1007, -0.2626, 0.9596])

## 1 preprocessing (read point cloud)

In [740]:
trans_init = np.asarray([[1, 0, 0, 0],
                         [0, 1, 0, 0],
                         [0, 0, 1, 0], [0.0, 0.0, 0.0, 1.0]])
data_path = '/home/book/data/kitti/Rawdata/2011_09_26/2011_09_26_drive_0005_sync/'

import numpy as np
import struct
from open3d import *

def convert_kitti_bin_to_pcd(binFilePath):
    size_float = 4
    list_pcd = []
    with open(binFilePath, "rb") as f:
        byte = f.read(size_float * 4)
        while byte:
            x, y, z, intensity = struct.unpack("ffff", byte)
            list_pcd.append([x, y, z])
            byte = f.read(size_float * 4)
    np_pcd = np.asarray(list_pcd)
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(np_pcd)
    return pcd

# source = o3d.geometry.PointCloud()
# src_pts_ndarray = read_bin_velodyne(os.path.join(data_path,'velodyne_points/data/%010d.bin' % 0))
source = convert_kitti_bin_to_pcd(os.path.join(data_path,'velodyne_points/data/%010d.bin' % 10))
target = convert_kitti_bin_to_pcd(os.path.join(data_path,'velodyne_points/data/%010d.bin' % 12))
source.paint_uniform_color([1,0,0])
target.paint_uniform_color([0,0,1])
np_source = np.asarray(source.points)
np_target = np.asarray(target.points)
#source.points = o3d.utility.Vector3dVector(src_pts_ndarray[:,:3])
# np.asarray(source.colors)[:,:] = [1, 0, 0]

# target = o3d.geometry.PointCloud()
# tar_pts_ndarray = read_bin_velodyne(os.path.join(data_path,'velodyne_points/data/%010d.bin' % 1))
# target.points = o3d.utility.Vector3dVector(tar_pts_ndarray[:,:3])

# o3d.visualization.draw_geometries([source],zoom=0.3412,
#                                         front=[0.4257, -0.2125, -0.8795],
#                                         lookat=[2.6172, 2.0475, 1.532],
#                                         up=[-0.0694, -0.9768, 0.2024])

# o3d.visualization.draw_geometries([source,target],
#                                   zoom=0.5599,
#                                   front=[-0.4958, 0.8229, 0.2773],
#                                   lookat=[2.1126, 1.0163, -1.8543],
#                                   up=[0.1007, -0.2626, 0.9596])


threshold = 0.1
evaluation = o3d.pipelines.registration.evaluate_registration(source, target, threshold, trans_init)
draw_registration_result(source,target,trans_init)
print(evaluation)





RegistrationResult with fitness=4.321677e-01, inlier_rmse=5.736704e-02, and correspondence_set size of 53253
Access transformation to get result.


## 2 get correspondences(kdtree)

In [747]:
np_source = np.asarray(source.points)
np_target = np.asarray(target.points)
pcd_tree = o3d.geometry.KDTreeFlann(target)
choice = np.random.choice(np_source.shape[0],50000,replace = False)
# print(choice)

matches = []
for i in choice:
    [_, idx, _] = pcd_tree.search_knn_vector_3d(source.points[i], 1)
    matches.append([i,idx[0]])
matches = np.asarray(matches)

def isValidMatch(match):
    dist = np.linalg.norm(target.points[match[1]]-source.points[match[0]])
    return  dist>1 and dist < 2 ####################### need to modify according to frame frequency

matches = np.asarray(list(filter(isValidMatch,matches)))
print(len(matches))
# print(matches)

match_source_pcd = o3d.geometry.PointCloud()
match_target_pcd = o3d.geometry.PointCloud()
match_source_pcd.paint_uniform_color([1,0,0])
match_source_pcd.paint_uniform_color([0,0,1])
match_source_pcd.points = o3d.utility.Vector3dVector(np.asarray(source.points)[matches[:,0]])
match_target_pcd.points = o3d.utility.Vector3dVector(np.asarray(target.points)[matches[:,1]])
draw_registration_result(match_source_pcd,match_target_pcd,trans_init)



1566


## 3 remove outliers in correspondences and get initial transfomation matrix T (ransac)

In [745]:
inlier_ratio = 0
cur_inlier_ratio = 0
deviation = np.linalg.norm(match_tar_pts - match_src_pts)

print(deviation)
for i in range(50):
    # get corresponding points: 
    # print(len(matches))  
    match_src_pts = np.asarray(source.points)[matches[:,0]]
    match_tar_pts = np.asarray(target.points)[matches[:,1]]
    matches_rac = matches[[np.random.choice(len(matches),50,replace = False)]]
    rac_src_pts = np.asarray(source.points)[matches_rac[:,0]]
    rac_tar_pts = np.asarray(target.points)[matches_rac[:,1]]
    cur_T = solve_icp(rac_src_pts,rac_tar_pts)
    cur_R, cur_t = cur_T[0:3, 0:3], cur_T[0:3, 3]
    cur_deviation = np.linalg.norm(match_tar_pts - np.dot(match_src_pts, cur_R.T) - cur_t)

    if cur_deviation < deviation:
        # print(cur_deviation)
        deviation = cur_deviation
        T = cur_T
        R = cur_R
        t = cur_t

print(deviation)
# draw_registration_result(source,target,T)
# # # print(T)

evaluation_before = o3d.pipelines.registration.evaluate_registration(source, target, threshold, trans_init)
print(evaluation_before)

evaluation_after = o3d.pipelines.registration.evaluate_registration(source, target, threshold, T)
print(evaluation_after)

draw_registration_result(source,target,T)


52.02981380391993
11.421337501435541
RegistrationResult with fitness=4.321677e-01, inlier_rmse=5.736704e-02, and correspondence_set size of 53253
Access transformation to get result.
RegistrationResult with fitness=4.585021e-01, inlier_rmse=6.590223e-02, and correspondence_set size of 56498
Access transformation to get result.


In [333]:
#compute the transformed points from source to target based on the R/T found in Kabsch Algorithm
def _transform(np_source,R,t):
    points = []
    for point in np_source:
        points.append(np.dot(R,point)+t)
    return points

#compute the root mean square error between source and target
def compute_rmse(np_source,np_target,R,t):
    rmse = 0
    number = len(np_target)
    points = _transform(np_source,R,t)
    for i in range(number):
        error = np_target[i]-points[i]
        rmse = rmse + math.sqrt(error[0]**2+error[1]**2+error[2]**2)
    return rmse



In [None]:
R = np.array([[1,0,0],[0,1,0],[0,0,1]])
res = np.dot(R,np.array([1,2,3]))
res = res - [1,2,3]
print(res)
# b = np.zeros((3,1))
# a = []
# a.append(res)
# a.append(b)
# print(a)

# t = res.reshape(-1,1) - np.array([1,2,3])
# print(t)
# print(nparray[0].reshape(-1,1)-[1,1,1])


## 4 calculate transformation and translation for refinement

In [None]:
# my calculation

P = np.asarray(source.points)[matches[:,0]]
Q = np.asarray(target.points)[matches[:,1]]
T = solve_icp(P, Q)
print(T)

threshold = 0.05
evaluation = o3d.pipelines.registration.evaluate_registration(source, target, threshold, T)
draw_registration_result(source,target,T)
print(evaluation)

In [None]:
def solve_icp(P, Q):
    """
    Solve ICP

    Parameters
    ----------
    P: numpy.ndarray
        source point cloud as N-by-3 numpy.ndarray
    Q: numpy.ndarray
        target point cloud as N-by-3 numpy.ndarray

    Returns
    ----------
    T: transform matrix as 4-by-4 numpy.ndarray
        transformation matrix from one-step ICP

    """
    # compute centers:
    up = P.mean(axis = 0)
    uq = Q.mean(axis = 0)

    # move to center:
    P_centered = P - up
    Q_centered = Q - uq

    U, s, V = np.linalg.svd(np.dot(Q_centered.T, P_centered), full_matrices=True, compute_uv=True)
    R = np.dot(U, V)
    t = uq - np.dot(R, up)

    # format as transform:
    T = np.zeros((4, 4))
    T[0:3, 0:3] = R
    T[0:3, 3] = t
    T[3, 3] = 1.0

    return T

In [None]:
def draw_registration_result(source, target, transformation):
    source_temp = copy.deepcopy(source)
    target_temp = copy.deepcopy(target)
    source_temp.paint_uniform_color([1, 0, 0])
    target_temp.paint_uniform_color([0, 0, 1])
    source_temp.transform(transformation)
    o3d.visualization.draw_geometries([source_temp,target_temp],
                                      zoom=0.4459,
                                      front=[0.9288, -0.2951, -0.2242],
                                      lookat=[1.6784, 2.0612, 1.4451],
                                      up=[-0.3402, -0.9189, -0.1996])

In [None]:
## for comparison

reg_p2p = o3d.pipelines.registration.registration_icp(
    source, target, threshold, trans_init,
    o3d.pipelines.registration.TransformationEstimationPointToPoint(),
    o3d.pipelines.registration.ICPConvergenceCriteria(max_iteration=2000))
print(reg_p2p)
print("Transformation is:")
print(reg_p2p.transformation)
draw_registration_result(source, target, reg_p2p.transformation)

In [None]:
# for comparision

trans_init = np.asarray([[1, 0, 0, 0],
                         [0, 1, 0, 0],
                         [0, 0, 1, 0], [0.0, 0.0, 0.0, 1.0]])

print("Apply point-to-point ICP")
threshold = 0.02
reg_p2p = o3d.pipelines.registration.registration_icp(
    source, target, threshold, trans_init,
    o3d.pipelines.registration.TransformationEstimationPointToPoint())
print(reg_p2p)
print("Transformation is:")
print(reg_p2p.transformation)
draw_registration_result(source, target, reg_p2p.transformation)