In [1]:
import open3d as o3d
import numpy as np
from copy import deepcopy
from pathlib import Path
from typing import List, Tuple


def load_capture_sequence(
    dir_path: Path,
    num_captures: int = None,
) -> Tuple[List[o3d.geometry.PointCloud], List[np.ndarray]]:
    pcd_files = sorted(dir_path.glob("*.ply"))
    pose_files = sorted(dir_path.glob("*.txt"))

    if num_captures:
        pcd_files = pcd_files[:num_captures]
        pose_files = pose_files[:num_captures]

    pcds = [o3d.io.read_point_cloud(str(pcd_file)) for pcd_file in pcd_files]
    poses = [np.loadtxt(str(pose_file)) for pose_file in pose_files]

    if len(pcds) == 0:
        raise Exception("No point clouds found")
    if len(poses) == 0:
        raise Exception("No poses found")
    assert len(pcds) == len(poses)

    print(f"Loaded {len(pcds)} point clouds and {len(poses)} poses")

    return pcds, poses


def multiway_pt2pl_registration(
    pcds: List[o3d.geometry.PointCloud],
    reg_max_correspondence_fine: float,
    global_optimization_max_correspondence: float,
    reg_max_correspondence_coarse: float = None,
) -> o3d.pipelines.registration.PoseGraph:
    pose_graph = o3d.pipelines.registration.PoseGraph()
    odometry = np.identity(4)
    pose_graph.nodes.append(o3d.pipelines.registration.PoseGraphNode(odometry))
    n_pcds = len(pcds)

    for source_id in range(n_pcds):
        for target_id in range(source_id + 1, n_pcds):
            if reg_max_correspondence_coarse is not None:
                icp_coarse = o3d.pipelines.registration.registration_icp(
                    source=pcds[source_id],
                    target=pcds[target_id],
                    max_correspondence_distance=reg_max_correspondence_coarse,
                    init=np.identity(4),
                )
            transformation_icp = o3d.pipelines.registration.registration_icp(
                source=pcds[source_id],
                target=pcds[target_id],
                max_correspondence_distance=reg_max_correspondence_fine,
                init=icp_coarse.transformation,
                estimation_method=o3d.pipelines.registration.TransformationEstimationPointToPlane(),
            ).transformation

            information_icp = o3d.pipelines.registration.get_information_matrix_from_point_clouds(
                source=pcds[source_id],
                target=pcds[target_id],
                max_correspondence_distance=reg_max_correspondence_fine,
                transformation=transformation_icp,
            )
            if target_id == source_id + 1:  # odometry case
                odometry = np.dot(transformation_icp, odometry)
                pose_graph.nodes.append(
                    o3d.pipelines.registration.PoseGraphNode(np.linalg.inv(odometry))
                )
                pose_graph.edges.append(
                    o3d.pipelines.registration.PoseGraphEdge(
                        source_id, target_id, transformation_icp, information_icp, uncertain=False
                    )
                )
            else:  # loop closure case
                pose_graph.edges.append(
                    o3d.pipelines.registration.PoseGraphEdge(
                        source_id, target_id, transformation_icp, information_icp, uncertain=True
                    )
                )
    option = o3d.pipelines.registration.GlobalOptimizationOption(
        max_correspondence_distance=global_optimization_max_correspondence,
        edge_prune_threshold=0.25,
        reference_node=0,
    )

    o3d.pipelines.registration.global_optimization(
        pose_graph,
        o3d.pipelines.registration.GlobalOptimizationLevenbergMarquardt(),
        o3d.pipelines.registration.GlobalOptimizationConvergenceCriteria(),
        option,
    )

    return pose_graph


def downsampled_multiway_pt2pl_registration(
    pcds: List[o3d.geometry.PointCloud],
    voxel_size: float = 0.002,
    reg_dist_max_corr_coarse: float = 0.004,
    reg_dist_max_corr_fine: float = 0.002,
    global_optimization_max_correspondence: float = 0.002,
    visualize: bool = False,
) -> List[o3d.geometry.PointCloud]:
    down_pcds = []
    for i in range(len(pcds)):
        down_pcds.append(deepcopy(pcds[i]).voxel_down_sample(voxel_size=voxel_size))

    if visualize:
        o3d.visualization.draw_geometries(down_pcds, window_name="downsampled for registration")

    pose_graph = multiway_pt2pl_registration(
        down_pcds,
        reg_dist_max_corr_coarse,
        reg_dist_max_corr_fine,
        global_optimization_max_correspondence,
    )

    for point_id in range(len(pcds)):
        print(pose_graph.nodes[point_id].pose)
        pcds[point_id].transform(pose_graph.nodes[point_id].pose)

    if visualize:
        o3d.visualization.draw_geometries(pcds, window_name="registration")

    return pcds


def merge_pcds(point_clouds: List[o3d.geometry.PointCloud]) -> o3d.geometry.PointCloud:
    points = []
    colors = []
    normals = []

    for pcd in point_clouds:
        points.append(np.asarray(pcd.points))

        if pcd.has_colors():
            colors.append(np.asarray(pcd.colors))

        if pcd.has_normals():
            normals.append(np.asarray(pcd.normals))

    merged_pcd = o3d.geometry.PointCloud()
    merged_pcd.points = o3d.utility.Vector3dVector(np.vstack(points))

    if colors:
        merged_pcd.colors = o3d.utility.Vector3dVector(np.vstack(colors))

    if normals:
        merged_pcd.normals = o3d.utility.Vector3dVector(np.vstack(normals))

    return merged_pcd

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.


In [4]:
# dir_path = Path("/home/v/capture/log2")
dir_path = Path("/home/v/capture/env")

In [5]:
with o3d.utility.VerbosityContextManager(o3d.utility.VerbosityLevel.Debug):
    pcds, tfs = load_capture_sequence(dir_path)

    pcds = [pcd.remove_non_finite_points() for pcd in pcds]
    pcds = [pcd.transform(tf) for pcd, tf in zip(pcds, tfs)]

    marker = o3d.geometry.TriangleMesh.create_coordinate_frame(size=0.1)

    o3d.visualization.draw_geometries(pcds + [marker], window_name="robot coordinate space")

    pcds = [pcd.remove_statistical_outlier(nb_neighbors=20, std_ratio=2.0)[0] for pcd in pcds]

    o3d.visualization.draw_geometries(pcds, window_name="outlier removal")

    for pcd in pcds:
        pcd.estimate_normals()

    voxel_size = 0.01
    pcds = downsampled_multiway_pt2pl_registration(
        pcds,
        voxel_size=voxel_size,
        reg_dist_max_corr_coarse=voxel_size * 2,
        reg_dist_max_corr_fine=voxel_size,
        global_optimization_max_correspondence=voxel_size,
        visualize=True,
    )

    merged_pcd = merge_pcds(pcds).voxel_down_sample(voxel_size=0.005)

    o3d.visualization.draw_geometries([merged_pcd], window_name="merged point cloud")

[Open3D DEBUG] Format auto File /home/v/capture/env/0_2023_09_12__15_21_04_pcd.ply
[Open3D DEBUG] Read geometry::PointCloud: 3145728 vertices.
[Open3D DEBUG] Format auto File /home/v/capture/env/10_2023_09_12__15_24_00_pcd.ply
[Open3D DEBUG] Read geometry::PointCloud: 3145728 vertices.
[Open3D DEBUG] Format auto File /home/v/capture/env/1_2023_09_12__15_21_21_pcd.ply
[Open3D DEBUG] Read geometry::PointCloud: 3145728 vertices.
[Open3D DEBUG] Format auto File /home/v/capture/env/2_2023_09_12__15_21_37_pcd.ply
[Open3D DEBUG] Read geometry::PointCloud: 3145728 vertices.
[Open3D DEBUG] Format auto File /home/v/capture/env/3_2023_09_12__15_21_54_pcd.ply
[Open3D DEBUG] Read geometry::PointCloud: 3145728 vertices.
[Open3D DEBUG] Format auto File /home/v/capture/env/4_2023_09_12__15_22_13_pcd.ply
[Open3D DEBUG] Read geometry::PointCloud: 3145728 vertices.
[Open3D DEBUG] Format auto File /home/v/capture/env/5_2023_09_12__15_22_28_pcd.ply
[Open3D DEBUG] Read geometry::PointCloud: 3145728 vertices