In [3]:
import sys
sys.path.append("..")  # Add the parent directory to the path

import os 
import numpy as np 
import open3d as o3d 
from utils.colmap_loader import read_pose_matrices
from plyfile import PlyData, PlyElement


### visualize point cloud 

In [6]:
dataset = "hope"
object_id = 1
n_views = 7
path_pcd_up = f"/home/sergio/onboarding_stage/data/{dataset}/obj_{object_id:06d}/up/{n_views}_views/sparse/0/points3D.ply"
path_pcd_down = f"/home/sergio/onboarding_stage/data/{dataset}/obj_{object_id:06d}/down/{n_views}_views/sparse/0/points3D.ply"
point_cloud_up = o3d.io.read_point_cloud(path_pcd_up)
point_cloud_down = o3d.io.read_point_cloud(path_pcd_down)

pcd_aux_up = o3d.geometry.PointCloud()
pcd_aux_up.points = o3d.utility.Vector3dVector(np.asarray(point_cloud_up.points))
pcd_aux_up.colors = o3d.utility.Vector3dVector(np.asarray(point_cloud_up.colors))

pcd_aux_down = o3d.geometry.PointCloud()
pcd_aux_down.points = o3d.utility.Vector3dVector(np.asarray(point_cloud_down.points))
pcd_aux_down.colors = o3d.utility.Vector3dVector(np.asarray(point_cloud_down.colors))

o3d.visualization.draw_geometries([pcd_aux_up, pcd_aux_down])

### visualize point cloud aligned 

In [7]:
dataset = "hope"
object_id = 1
path_pcd_align = f"/home/sergio/onboarding_stage/data/{dataset}/obj_{object_id:06d}/align/sparse/0/points3D.ply"
pcd_align = o3d.io.read_point_cloud(path_pcd_align)

pcd_aux_align = o3d.geometry.PointCloud()
pcd_aux_align.points = o3d.utility.Vector3dVector(np.asarray(pcd_align.points))
pcd_aux_align.colors = o3d.utility.Vector3dVector(np.asarray(pcd_align.colors))

o3d.visualization.draw_geometries([pcd_aux_align])

### test poses 

In [3]:
dataset = "hope"
object_id = 1
path_pcd = f"/home/sergio/onboarding_stage/data/{dataset}/obj_{object_id:06d}/align/sparse/0/points3D.ply"
point_cloud = o3d.io.read_point_cloud(path_pcd)

pcd_aux = o3d.geometry.PointCloud()
pcd_aux.points = o3d.utility.Vector3dVector(np.asarray(point_cloud.points))
pcd_aux.colors = o3d.utility.Vector3dVector(np.asarray(point_cloud.colors))
center_pcd = pcd_aux.get_center()

path_poses = f"/home/sergio/onboarding_stage/data/{dataset}/obj_{object_id:06d}/align/sparse/0/images.txt"
poses = read_pose_matrices(path_poses)
poses_mat = [poses[idx]['pose'] for idx in range(len(poses))]
t_vecs = [pose_mat[:3, 3] for pose_mat in poses_mat]
t_vecs = np.vstack(t_vecs)

# pcd align up
pcd_cams_up = o3d.geometry.PointCloud()
pcd_cams_up.points = o3d.utility.Vector3dVector(t_vecs[:7])
pcd_cams_up.colors = o3d.utility.Vector3dVector(np.array([0, 255, 0] * 7).reshape(7, -1))
# pcd align down 
pcd_cams_down = o3d.geometry.PointCloud()
pcd_cams_down.points = o3d.utility.Vector3dVector(t_vecs[7:])
pcd_cams_down.colors = o3d.utility.Vector3dVector(np.array([0, 0, 255] * 7).reshape(7, -1))


o3d.visualization.draw_geometries([pcd_aux, pcd_cams_up, pcd_cams_down])

### color icp

In [29]:
def align_point_clouds(source_path, target_path, mode, threshold=0.02):
    # Load source and target point clouds
    source = o3d.io.read_point_cloud(source_path)
    target = o3d.io.read_point_cloud(target_path)

    # flip 180 degrees 
    R = np.array([[1,  0,  0],
                  [0, -1,  0],
                  [0,  0, -1]])
    centroid_src = source.get_center()
    source.translate(-centroid_src)
    source.rotate(R, center=(0, 0, 0))
    source.translate(centroid_src)
    # align centroids 
    centroid_trg = target.get_center()
    diff_centers = centroid_trg - centroid_src
    source.translate(diff_centers)

    if mode == "simple":
        result_icp = simple_icp(source, target, threshold)
    elif mode == "color":
        result_icp = multi_scale_color_icp(source, target)
    elif mode == "features":
        result_icp = multi_scale_color_icp_with_fpfh(source, target)
    else:
        return print("no aligning method recognized")

    # Transform source point cloud
    source.transform(result_icp.transformation)

    return source, target, result_icp.transformation, centroid_src, centroid_trg

def multi_scale_color_icp(source, target):
    voxel_radius = [2, 1, 0.5]
    max_iter = [1500, 1000, 250]
    current_transformation = np.identity(4)
    print("Colored point cloud registration")
    for scale in range(3):
        iter = max_iter[scale]
        radius = voxel_radius[scale]
        print([iter, radius, scale])

        print("Downsample with a voxel size %.2f" % radius)
        # source_down = source.voxel_down_sample(radius)
        # target_down = target.voxel_down_sample(radius)
        source_down = source
        target_down = target

        print("Estimate normal.")
        source_down.estimate_normals(
            o3d.geometry.KDTreeSearchParamHybrid(radius=radius * 2, max_nn=30))
        target_down.estimate_normals(
            o3d.geometry.KDTreeSearchParamHybrid(radius=radius * 2, max_nn=30))

        print("Applying colored point cloud registration")
        result_icp = o3d.pipelines.registration.registration_colored_icp(
            source_down, target_down, radius, current_transformation,
            o3d.pipelines.registration.TransformationEstimationForColoredICP(),
            o3d.pipelines.registration.ICPConvergenceCriteria(relative_fitness=1e-7,
                                                            relative_rmse=1e-7,
                                                            max_iteration=iter))
        current_transformation = result_icp.transformation
        print(result_icp)

    return  result_icp

def simple_icp(source, target, threshold):
    # Perform ICP registration
    initial_transform = np.eye(4)
    result_icp = o3d.pipelines.registration.registration_icp(
        source,
        target,
        threshold,
        initial_transform,
        o3d.pipelines.registration.TransformationEstimationPointToPoint()
    )
    print(result_icp)
    return result_icp

def concatenate_point_clouds(source, target):
    # Load vertices and colors from source and target
    source_xyz = np.asarray(source.points)
    target_xyz = np.asarray(target.points)

    source_rgb = np.asarray(source.colors) * 255  # Convert to 0-255 scale
    target_rgb = np.asarray(target.colors) * 255

    source_normals = np.asarray(source.normals) if source.has_normals() else np.zeros_like(source_xyz)
    target_normals = np.asarray(target.normals) if target.has_normals() else np.zeros_like(target_xyz)

    # Concatenate points, colors, and normals
    combined_xyz = np.vstack((source_xyz, target_xyz))
    combined_rgb = np.vstack((source_rgb, target_rgb)).astype(np.uint8)
    combined_normals = np.vstack((source_normals, target_normals))

    return combined_xyz, combined_rgb, combined_normals

def save_combined_point_cloud(combined_xyz, combined_rgb, combined_normals, output_path):
    dtype = [('x', 'f4'), ('y', 'f4'), ('z', 'f4'),
             ('nx', 'f4'), ('ny', 'f4'), ('nz', 'f4'),
             ('red', 'u1'), ('green', 'u1'), ('blue', 'u1')]

    elements = np.empty(combined_xyz.shape[0], dtype=dtype)
    attributes = np.concatenate((combined_xyz, combined_normals, combined_rgb), axis=1)
    elements[:] = list(map(tuple, attributes))

    vertex_element = PlyElement.describe(elements, 'vertex')
    ply_data = PlyData([vertex_element])
    ply_data.write(output_path)

###################################################

def multi_scale_color_icp_with_fpfh(source, target):
    """
    Perform multi-scale color ICP with feature-based matching using FPFH features.

    Parameters:
        source: open3d.geometry.PointCloud - Source point cloud
        target: open3d.geometry.PointCloud - Target point cloud

    Returns:
        result_icp: RegistrationResult - The final registration result
    """
    voxel_radius = [1.5, 1, 0.8]
    max_iter = [600, 200, 100]
    current_transformation = np.identity(4)
    
    print("Extracting FPFH features...")
    source_fpfh = compute_fpfh_features(source, voxel_radius[0])
    target_fpfh = compute_fpfh_features(target, voxel_radius[0])

    print("Performing feature-based global registration...")
    distance_threshold = voxel_radius[0] * 1.5
    global_registration = o3d.pipelines.registration.registration_ransac_based_on_feature_matching(
        source, target, source_fpfh, target_fpfh, False, distance_threshold,
        o3d.pipelines.registration.TransformationEstimationPointToPoint(),
        4,  # Number of RANSAC iterations
        [o3d.pipelines.registration.CorrespondenceCheckerBasedOnEdgeLength(0.9),
         o3d.pipelines.registration.CorrespondenceCheckerBasedOnDistance(distance_threshold)],
        o3d.pipelines.registration.RANSACConvergenceCriteria(4000000, 500)
    )

    current_transformation = global_registration.transformation
    # print("Initial transformation from global registration:")
    # print(current_transformation)

    print("Performing multi-scale colored ICP...")
    for scale in range(len(voxel_radius)):
        iter = max_iter[scale]
        radius = voxel_radius[scale]
        print(f"Scale {scale}: voxel size = {radius}, max iterations = {iter}")
        
        print("Downsampling point clouds...")
        source_down = source.voxel_down_sample(radius)
        target_down = target.voxel_down_sample(radius)

        print("Estimating normals...")
        source_down.estimate_normals(
            o3d.geometry.KDTreeSearchParamHybrid(radius=radius * 2, max_nn=30))
        target_down.estimate_normals(
            o3d.geometry.KDTreeSearchParamHybrid(radius=radius * 2, max_nn=30))

        print("Refining registration with colored ICP...")
        result_icp = o3d.pipelines.registration.registration_colored_icp(
            source_down, target_down, radius, current_transformation,
            o3d.pipelines.registration.TransformationEstimationForColoredICP(),
            o3d.pipelines.registration.ICPConvergenceCriteria(relative_fitness=1e-6,
                                                                relative_rmse=1e-6,
                                                                max_iteration=iter))
        current_transformation = result_icp.transformation

    # print("Final transformation:")
    # print(current_transformation)
    return result_icp

def compute_fpfh_features(point_cloud, voxel_size):
    """
    Compute FPFH features for a point cloud.
    
    Parameters:
        point_cloud: open3d.geometry.PointCloud - The input point cloud
        voxel_size: float - The voxel size for downsampling and feature extraction
    
    Returns:
        fpfh: open3d.pipelines.registration.Feature - The computed FPFH features
    """
    print("Downsampling for FPFH computation...")
    pcd_down = point_cloud.voxel_down_sample(voxel_size)

    print("Estimating normals...")
    pcd_down.estimate_normals(o3d.geometry.KDTreeSearchParamHybrid(
        radius=voxel_size * 2, max_nn=30))

    print("Computing FPFH features...")
    fpfh = o3d.pipelines.registration.compute_fpfh_feature(
        pcd_down,
        o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * 5, max_nn=100))
    return fpfh

source_path = "/home/sergio/onboarding_stage/data/hope/obj_000001/down/20_views/sparse/0/points3D.ply"
target_path = "/home/sergio/onboarding_stage/data/hope/obj_000001/up/20_views/sparse/0/points3D.ply"

source, target, transformation, centroid_source, centroid_target = align_point_clouds(
    source_path,
    target_path, mode="color" 
)

# Concatenate point clouds
combined_xyz, combined_rgb, combined_normals = concatenate_point_clouds(source, target)
save_combined_point_cloud(combined_xyz, combined_rgb, combined_normals, "./points3D.ply")

Colored point cloud registration
[1500, 2, 0]
Downsample with a voxel size 2.00
Estimate normal.
Applying colored point cloud registration
RegistrationResult with fitness=1.000000e+00, inlier_rmse=3.104685e-03, and correspondence_set size of 87687
Access transformation to get result.
[1000, 1, 1]
Downsample with a voxel size 1.00
Estimate normal.
Applying colored point cloud registration
RegistrationResult with fitness=1.000000e+00, inlier_rmse=3.104694e-03, and correspondence_set size of 87687
Access transformation to get result.
[250, 0.5, 2]
Downsample with a voxel size 0.50
Estimate normal.
Applying colored point cloud registration
RegistrationResult with fitness=1.000000e+00, inlier_rmse=3.105831e-03, and correspondence_set size of 87687
Access transformation to get result.
