In [36]:
from PIL import Image
from direct.preprocessor import Preprocessor, pose_inv, SceneData
import numpy as np
import open3d as o3d
import json

dir = "experiments/scissor"

demo_head_rgb = np.array(Image.open("../{0}/demo_head_rgb.png".format(dir)))
demo_head_depth = np.array(Image.open("../{0}/demo_head_depth.png".format(dir)))
demo_head_mask = np.array(Image.open("../{0}/demo_head_seg.png".format(dir)))

demo_wrist_rgb = np.array(Image.open("../{0}/live_d415_rgb.png".format(dir)))
demo_wrist_depth = np.array(Image.open("../{0}/live_d415_depth.png".format(dir)))
demo_wrist_mask = np.array(Image.open("../{0}/live_d415_seg.png".format(dir)))

intrinsics_d415 = np.load("../handeye/intrinsics_d415.npy")

data = SceneData(
    image_0=demo_head_rgb,
    image_1=demo_wrist_rgb,
    depth_0=demo_head_depth,
    depth_1=demo_wrist_depth,
    seg_0=demo_head_mask,
    seg_1=demo_wrist_mask,
    intrinsics_0=intrinsics_d415,
    intrinsics_1=intrinsics_d415,
    T_WC=np.eye(4) # cam frame
)

processor = Preprocessor()
data.update(processor(data))

In [11]:
def draw_registration_result(source, target, transformation):
    source_temp = source.transform(transformation)
    o3d.visualization.draw_geometries([source_temp, target])

pcd0 = o3d.geometry.PointCloud()
pcd1 = o3d.geometry.PointCloud()
pcd0.points = o3d.utility.Vector3dVector(data["pc0"][:, :3])
pcd1.points = o3d.utility.Vector3dVector(data["pc1"][:, :3])

draw_registration_result(pcd0, pcd1, mean_transform)



In [37]:
import numpy as np
import open3d as o3d
from scipy.spatial.transform import Rotation as R
import traceback
import time

def draw_registration_result(source, target, transformation):
    source_temp = source.transform(transformation)
    o3d.visualization.draw_geometries([source_temp, target])

def preprocess_point_cloud(pcd, voxel_size):
    try:
        pcd_down = pcd.voxel_down_sample(voxel_size)
        pcd_down.estimate_normals(o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * 2, max_nn=30))
        return pcd_down
    except Exception as e:
        print(f"Error in preprocess_point_cloud: {e}")
        return None

def compute_fpfh_features(pcd, voxel_size):
    try:
        radius_feature = voxel_size * 5
        return o3d.pipelines.registration.compute_fpfh_feature(
            pcd, o3d.geometry.KDTreeSearchParamHybrid(radius=radius_feature, max_nn=100))
    except Exception as e:
        print(f"Error in compute_fpfh_features: {e}")
        return None

def execute_fast_global_registration(source, target, source_fpfh, target_fpfh, voxel_size):
    try:
        distance_threshold = voxel_size * 0.5
        result = o3d.pipelines.registration.registration_fgr_based_on_feature_matching(
            source, target, source_fpfh, target_fpfh,
            o3d.pipelines.registration.FastGlobalRegistrationOption(
                maximum_correspondence_distance=distance_threshold))
        return result
    except Exception as e:
        print(f"Error in execute_fast_global_registration: {e}")
        return None

def extract_translation_rotation(transformation):
    translation = transformation[:3, 3]
    rotation = transformation[:3, :3]
    return translation, rotation

def transformations_to_vector(transformations):
    vectors = []
    for transformation in transformations:
        translation, rotation = extract_translation_rotation(transformation)
        rotation_quat = R.from_matrix(rotation).as_quat()
        vector = np.concatenate((translation, rotation_quat))
        vectors.append(vector)
    return np.array(vectors)

def compute_mean_transformation(transformations):
    vectors = transformations_to_vector(transformations)
    
    # Compute mean translation
    mean_translation = np.mean(vectors[:, :3], axis=0)
    
    # Compute mean rotation using quaternion averaging
    mean_rotation_quat = np.mean(vectors[:, 3:], axis=0)
    mean_rotation_quat /= np.linalg.norm(mean_rotation_quat)
    mean_rotation = R.from_quat(mean_rotation_quat).as_matrix()
    
    mean_transformation = np.eye(4)
    mean_transformation[:3, :3] = mean_rotation
    mean_transformation[:3, 3] = mean_translation
    
    return mean_transformation

def compute_covariance_matrix(transformations):
    vectors = transformations_to_vector(transformations)
    mean_vector = np.mean(vectors, axis=0)
    centered_vectors = vectors - mean_vector
    covariance_matrix = np.cov(centered_vectors, rowvar=False)
    return covariance_matrix

def estimate_registration_uncertainty(source, target, transformation, num_iterations=1000):
    try:
        transformations = []
        for _ in range(num_iterations):
            source_sample = source.random_down_sample(0.8)
            target_sample = target.random_down_sample(0.8)
            
            result = o3d.pipelines.registration.registration_icp(
                source_sample, target_sample, 0.01, transformation,
                o3d.pipelines.registration.TransformationEstimationPointToPoint())
            
            transformations.append(result.transformation)
        
        transformations = np.array(transformations)

        mean_transformation = compute_mean_transformation(transformations)
        covariance_matrix = compute_covariance_matrix(transformations)
        
        return mean_transformation, covariance_matrix
    
    except Exception as e:
        print(f"Error in estimate_registration_uncertainty: {e}")
        return None, None

def decompose_covariance_matrix(covariance_matrix):
    # Ensure the covariance matrix is of the expected size (7x7)
    if covariance_matrix.shape != (7, 7):
        raise ValueError("Covariance matrix should be of shape (7, 7)")

    # Extract the translation and quaternion covariance submatrices
    translation_covariance = covariance_matrix[:3, :3]
    quaternion_covariance = covariance_matrix[3:, 3:]

    # Compute standard deviations (square roots of diagonal elements)
    translation_std = np.sqrt(np.diag(translation_covariance))
    quaternion_std = np.sqrt(np.diag(quaternion_covariance))

    return translation_std, quaternion_std

def main(data, confidence_threshold=0.7):
    try:
        pcd0 = o3d.geometry.PointCloud()
        pcd1 = o3d.geometry.PointCloud()
        pcd0.points = o3d.utility.Vector3dVector(data["pc0"][:, :3])
        pcd1.points = o3d.utility.Vector3dVector(data["pc1"][:, :3])

        voxel_size = 0.005
        source_down = preprocess_point_cloud(pcd0, voxel_size)
        target_down = preprocess_point_cloud(pcd1, voxel_size)

        if source_down is None or target_down is None:
            raise ValueError("Preprocessing failed")

        source_fpfh = compute_fpfh_features(source_down, voxel_size)
        target_fpfh = compute_fpfh_features(target_down, voxel_size)

        if source_fpfh is None or target_fpfh is None:
            raise ValueError("Feature computation failed")

        result_fgr = execute_fast_global_registration(source_down, target_down, source_fpfh, target_fpfh, voxel_size)

        if result_fgr is None:
            raise ValueError("Fast Global Registration failed")

        result_icp = o3d.pipelines.registration.registration_icp(
            source_down, target_down, voxel_size, result_fgr.transformation,
            o3d.pipelines.registration.TransformationEstimationPointToPlane())

        mean_transformation, covariance_matrix = estimate_registration_uncertainty(
            source_down, target_down, result_icp.transformation)

        if mean_transformation is None or covariance_matrix is None:
            raise ValueError("Uncertainty estimation failed")

        return mean_transformation, covariance_matrix

    except Exception as e:
        print(f"An error occurred in main: {e}")
        traceback.print_exc()
        return None, None

if __name__ == "__main__":
    try:
        start = time.time()
        result = main(data)
        print(time.time() - start)
        if result[0] is not None:
            mean_transformation, covariance_matrix = result
            print("Mean Transformation Matrix:\n", mean_transformation)
            translation_std, quaternion_std = decompose_covariance_matrix(covariance_matrix)
            print("Translation Standard Deviation:", translation_std)
            print("Quaternion Standard Deviation:", quaternion_std)
            pcd0 = o3d.geometry.PointCloud()
            pcd1 = o3d.geometry.PointCloud()
            pcd0.points = o3d.utility.Vector3dVector(data["pc0"][:, :3])
            pcd1.points = o3d.utility.Vector3dVector(data["pc1"][:, :3])

            draw_registration_result(pcd0, pcd1, mean_transformation)
        else:
            print("Registration failed. Unable to estimate pose and uncertainties.")
    except Exception as e:
        print(f"An error occurred: {e}")
        traceback.print_exc()

5.809966087341309
Mean Transformation Matrix:
 [[ 0.96500478 -0.1454336   0.21820828 -0.26366997]
 [ 0.14357181  0.98933779  0.02445126 -0.02056536]
 [-0.21943773  0.00773298  0.97559586  0.02839094]
 [ 0.          0.          0.          1.        ]]
Translation Standard Deviation: [0.00465381 0.00413093 0.00096257]
Quaternion Standard Deviation: [0.00305934 0.00321561 0.00105461 0.00038684]
