In [1]:
import cv2
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import numpy as np
import math
import sys
import os
from sklearn.cluster import KMeans
import dlib
from imutils import face_utils
import open3d as o3d
import pyrealsense2 as rs
import scipy.ndimage
import copy
#import pptk
from skimage.color import rgb2hsv
from scipy import signal
import colorsys
from squaternion import Quaternion
import random
import time
import xlsxwriter 
from scipy.spatial.transform import Rotation as R
import glob
import matplotlib.image as mpimg
from scipy.io import savemat
import imutils
import pandas as pd  # Import pandas library
import ast
import re

In [1]:
def create_folder(folder_name):
    try:
        os.mkdir(folder_name)
        print(f"Folder '{folder_name}' created successfully.")
    except FileExistsError:
        print(f"Folder '{folder_name}' already exists.")
        

def sort_files_numerically(file):
    # Extract all the numbers from the filename
    numbers = [int(num) for num in re.findall(r'\d+', os.path.basename(file))]
    return numbers

In [None]:
def create_scaling_matrix(scale_factor):
    return np.array([
        [scale_factor, 0, 0, 0],
        [0, scale_factor, 0, 0],
        [0, 0, scale_factor, 0],
        [0, 0, 0, 1]
    ])

In [1]:
def get_camera_intrinsics(serial_number):
    pipeline = rs.pipeline()
    config = rs.config()
    config.enable_device(serial_number)
    config.enable_stream(rs.stream.color, 640, 480, rs.format.rgb8, 30)
    config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
    profile = pipeline.start(config)

    # Get color and depth stream profiles
    color_stream_profile = profile.get_stream(rs.stream.color)
    depth_stream_profile = profile.get_stream(rs.stream.depth)

    # Get color and depth intrinsics
    intrinsics_color = color_stream_profile.as_video_stream_profile().get_intrinsics()
    intrinsics_depth = depth_stream_profile.as_video_stream_profile().get_intrinsics()

    # Stop the pipeline
    pipeline.stop()
    
    print("\nIntrinsics for Color Stream:")
    print("Width:", intrinsics_color.width)
    print("Height:", intrinsics_color.height)
    print("Fx:", intrinsics_color.fx)
    print("Fy:", intrinsics_color.fy)
    print("Cx:", intrinsics_color.ppx)
    print("Cy:", intrinsics_color.ppy)

    print("\nIntrinsics for Depth Stream:")
    print("Width:", intrinsics_depth.width)
    print("Height:", intrinsics_depth.height)
    print("Fx:", intrinsics_depth.fx)
    print("Fy:", intrinsics_depth.fy)
    print("Cx:", intrinsics_depth.ppx)
    print("Cy:", intrinsics_depth.ppy)

    return intrinsics_color, intrinsics_depth

def resize_intrinsics(intrinsics, original_width, original_height, new_width, new_height):
    intrinsics_resized = rs.intrinsics()
    
    # Calculate scaling factors
    scale_x = new_width / original_width
    scale_y = new_height / original_height
    
    # Adjust intrinsic parameters
    intrinsics_resized.width = new_width
    intrinsics_resized.height = new_height
    intrinsics_resized.fx = intrinsics.fx * scale_x
    intrinsics_resized.fy = intrinsics.fy * scale_y
    intrinsics_resized.ppx = intrinsics.ppx * scale_x
    intrinsics_resized.ppy = intrinsics.ppy * scale_y
    
    return intrinsics_resized


def get_depth_to_color_extrinsics(serial_number): #for D435i (L515 no need this transformation)
    # Create a pipeline
    pipeline = rs.pipeline()
    config = rs.config()
    config.enable_device(serial_number)
    config.enable_stream(rs.stream.color, 640, 480, rs.format.rgb8, 30)
    config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
    profile = pipeline.start(config)

    # Get depth sensor extrinsics to color stream
    extrinsics = profile.get_stream(rs.stream.depth).get_extrinsics_to(profile.get_stream(rs.stream.color))

    # Stop the pipeline
    pipeline.stop()

    return extrinsics

def rt_to_T(rotation,translation):
    # Create a 3x3 rotation matrix
    rotation_matrix = np.array(rotation).reshape(3, 3)

    # Create a 4x4 transformation matrix
    transformation_matrix = np.eye(4)
    transformation_matrix[:3, :3] = rotation_matrix
    transformation_matrix[:3, 3] = translation
    return transformation_matrix

def flip_uptodown_matrix(rotation_angle):
    flip_updown = np.array([
        [np.cos(np.radians(rotation_angle)), 0, -np.sin(np.radians(rotation_angle)), 0],
        [0, 1, 0, 0],
        [np.sin(np.radians(rotation_angle)), 0, np.cos(np.radians(rotation_angle)), 0],
        [0, 0, 0, -1]
    ])
    return flip_updown



In [1]:
# #aligb rgb and depth to visualize in images (quite difficilt way selecting depth_scale)
def align_depth_to_color(depth_image, color_image, depth_intrinsics, color_intrinsics, depth_to_color_extrinsics, depthscale):
    aligned_depth = np.zeros_like(np.asarray(color_image))
    
    for y in range(depth_intrinsics.height):  
        for x in range(depth_intrinsics.width):
            depth_value = np.asarray(depth_image)[y, x]* depthscale
            if depth_value > 0:  # Skip invalid depth values
                depth_point = np.array([[(x - depth_intrinsics.ppx) * depth_value / depth_intrinsics.fx],
                                        [(y - depth_intrinsics.ppy) * depth_value / depth_intrinsics.fy],
                                        [depth_value],
                                        [1.0]])
                transformed_point = np.dot(depth_to_color_extrinsics, depth_point)
                projected_point = transformed_point[:3] / transformed_point[2]
                
                color_x = int(projected_point[0] * color_intrinsics.fx / projected_point[2] + color_intrinsics.ppx + 0.5)
                color_y = int(projected_point[1] * color_intrinsics.fy / projected_point[2] + color_intrinsics.ppy + 0.5)
                
                if 0 <= color_x < color_intrinsics.width and 0 <= color_y < color_intrinsics.height:
                    aligned_depth[y, x] = np.asarray(color_image)[color_y, color_x]
                    

    
    return aligned_depth

# for "Arisa_PCRegist_SIFT

**1.Generating a random initial alignment between two point clouds**

In [None]:
#Random initial alignment
def random_initial_alignment(source: o3d.geometry.PointCloud, translation_bound: float, rotation_bound: float) -> o3d.geometry.PointCloud:
    # Create a copy of the source to avoid modifying the original point cloud
    transformed_source = copy.deepcopy(source)

    # Generate random translation within bounds
    translation = np.random.uniform(-translation_bound, translation_bound, 3)

    # Generate random rotation within bounds
    rx = np.random.uniform(-rotation_bound, rotation_bound)
    ry = np.random.uniform(-rotation_bound, rotation_bound)
    rz = np.random.uniform(-rotation_bound, rotation_bound)
    rotation = o3d.geometry.get_rotation_matrix_from_xyz((rx, ry, rz))

    # Compute the transformation matrix
    transformation = np.eye(4)
    transformation[:3, :3] = rotation
    transformation[:3, 3] = translation

    # Apply transformation to the source point cloud
    transformed_source.transform(transformation)

    return transformed_source, transformation

def random_initial(source_point_cloud, target_point_cloud):
    # Calculate bounding boxes of both point clouds
    source_bbox = source_point_cloud.get_axis_aligned_bounding_box()
    target_bbox = target_point_cloud.get_axis_aligned_bounding_box()

    # Manually compute the diagonal length of bounding boxes
    source_diagonal_length = np.linalg.norm(np.asarray(source_bbox.max_bound) - np.asarray(source_bbox.min_bound))
    target_diagonal_length = np.linalg.norm(np.asarray(target_bbox.max_bound) - np.asarray(target_bbox.min_bound))

    # Determine translation bounds
    # Set the translation bounds as a fraction of the diagonal length of bounding boxes
    translation_bound = max(source_diagonal_length, target_diagonal_length) * 0.1  # 10% of the maximum diagonal length

    # Determine rotation bounds
    # Here, we use a small fixed rotation bound (10 degrees converted to radians)
    rotation_bound = np.radians(10)

    randinit_source, randinit_transformation = random_initial_alignment(source_point_cloud, translation_bound, rotation_bound)
#     print(f"translation_bound:{translation_bound}, rotation_bound:{rotation_bound}")
#     print("random initial transformation:\n",randinit_transformation)
    return randinit_source, randinit_transformation

# randinit_source, randinit_transformation = random_initial(source_point_cloud, target_point_cloud)
# o3d.visualization.draw_geometries([source_point_cloud, target_point_cloud], width=800, height=600)
# o3d.visualization.draw_geometries([randinit_source, target_point_cloud], width=800, height=600)

**2.Point cloud registration with SIFT**

In [None]:
#1. Keypoints extraction
#1.1 When point cloud has no colors then use FPFH features
def check_pc_has_color(pcd):
    # Check if the point cloud has colors
    if len(pcd.colors) > 0:
        # print("The point cloud has color information.")
        return 'Yes'
    else:
       #  print("The point cloud does not have color information.")
        return 'No'

def compute_fpfh_features(pcd, search_radius=0.25):
    #Extract non-colors point clouds
    pcd.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=search_radius, max_nn=100))
    fpfh = o3d.registration.compute_fpfh_feature(pcd, o3d.geometry.KDTreeSearchParamHybrid(radius=search_radius, max_nn=100))
    return fpfh

#1.2 When point cloud has colors then use SIFT features
def point_cloud_to_color_and_point_map(pcd, img_dim=200):  # 200 #How dim should be set?!
    # Convert point cloud data to numpy array
    points = np.asarray(pcd.points)
    colors = np.asarray(pcd.colors) * 255  # Scaling to 0-255 range

    # Normalize x, y to image dimensions
    normalized_points = (((points - points.min(axis=0)) / (points.max(axis=0) - points.min(axis=0))) * (img_dim - 1)).astype(int)
    img_coords = normalized_points[:, :2].astype(int)

    # Create color map and count map for averaging
    color_map = np.zeros((img_dim, img_dim, 3))
    count_map = np.zeros((img_dim, img_dim, 3))

    # Create point_map to store 3D coordinates
    point_map = np.zeros((img_dim, img_dim, 3))

    for img_coord, color, point in zip(img_coords, colors, points):
        # Check if the indices are within the valid range
        if 0 <= img_coord[1] < img_dim and 0 <= img_coord[0] < img_dim:
            color_map[img_coord[1], img_coord[0]] += color  # accumulate color values
            count_map[img_coord[1], img_coord[0]] += 1  # count the number of points per bin
            # For point map, just store the 3D point for now without averaging
            # This could be changed based on specific requirements
            point_map[img_coord[1], img_coord[0]] = point

    # Average the accumulated color values
    avg_mask = count_map > 0
    color_map[avg_mask] /= count_map[avg_mask]

    return color_map.astype(np.uint8), point_map


def extract_sift_feat(img):
    gray = cv2.cvtColor(img.astype(np.uint8), cv2.COLOR_BGR2GRAY)
    sift = cv2.xfeatures2d.SIFT_create(contrastThreshold=0.08, edgeThreshold=0.2)
    keypoints, descriptors = sift.detectAndCompute(gray, None)
    return keypoints, descriptors

def draw_keypoints(img, keypoints, title='Keypoints'):
    img_with_keypoints = cv2.drawKeypoints(img, keypoints, outImage=np.array([]), color=(0, 255, 0))
    cv2.imshow(title, img_with_keypoints)

def project_keypoints_to_3D(keypoints, descriptors, point_map):
    keypoints_3d = []
    descriptors_3d = []
    for i, keypoint in enumerate(keypoints):
        x, y = np.round(keypoint.pt).astype(int)
        keypoints_3d.append(point_map[y, x])
        descriptors_3d.append(descriptors[i])
    return keypoints_3d, np.array(descriptors_3d)

def visualize_keypoints(point_cloud, keypoints_3d):
    # Convert keypoints to Open3D format
    keypoints_o3d = o3d.geometry.PointCloud()
    keypoints_o3d.points = o3d.utility.Vector3dVector(np.array(keypoints_3d))
    # Set the color of the keypoints to red
    keypoints_o3d.paint_uniform_color([1, 0, 0])
    # Visualize the point cloud and keypoints
#     o3d.visualization.draw_geometries([point_cloud, keypoints_o3d], width=800, height=600)

def find_good_matches(src_descriptors, tar_descriptors):
    try:
        # Check if descriptors are empty
        if src_descriptors is None or tar_descriptors is None:
            print("Descriptors are empty.")
            return []
        
        # Ensure descriptors are of type float32
        src_descriptors = src_descriptors.astype(np.float32)
        tar_descriptors = tar_descriptors.astype(np.float32)
        
        # BFMatcher with default params
        bf = cv2.BFMatcher()
        matches = bf.knnMatch(src_descriptors, tar_descriptors, k=2)
        
        # Apply ratio test
        good_matches = []
        for match in matches:
            if len(match) >= 2:  # Ensure there are at least two matches to unpack
                m, n = match
                if m.distance < 0.75 * n.distance:
                    good_matches.append(m)
            else:
                print("Not enough matches found to apply ratio test.")
                
        return good_matches
        
    except cv2.error as e:
        print(f"An OpenCV error occurred: {e}")
        return []
    
def extract_matched_keypoints_descriptors(matches, src_keypoints_3d, tar_keypoints_3d, src_descriptors, tar_descriptors):
    matched_src_keypoints = [src_keypoints_3d[m.queryIdx] for m in matches]
    matched_tar_keypoints = [tar_keypoints_3d[m.trainIdx] for m in matches]
    
    matched_src_descriptors = [src_descriptors[m.queryIdx] for m in matches]
    matched_tar_descriptors = [tar_descriptors[m.trainIdx] for m in matches]
    
    # Converting lists of keypoints to numpy arrays
    matched_src_keypoints = np.array(matched_src_keypoints)
    matched_tar_keypoints = np.array(matched_tar_keypoints)
    
    return matched_src_keypoints, matched_tar_keypoints, matched_src_descriptors, matched_tar_descriptors

#2. Matching Keypoints
def apply_ransac(src_keypoints_3d, tar_keypoints_3d, src_descriptors, tar_descriptors):
    try:
        if len(src_keypoints_3d) == 0:
            raise ValueError("src_keypoints_3d is empty.")

        
        src_pcd = o3d.geometry.PointCloud()
        src_pcd.points = o3d.utility.Vector3dVector(np.array(src_keypoints_3d))

        tar_pcd = o3d.geometry.PointCloud()
        tar_pcd.points = o3d.utility.Vector3dVector(np.array(tar_keypoints_3d))

        # Ensuring the descriptors are numpy arrays
        src_descriptors = np.array(src_descriptors)
        tar_descriptors = np.array(tar_descriptors)

        # Creating Open3D Feature objects from the descriptors
        src_feature = o3d.registration.Feature()
        src_feature.data = src_descriptors.T  # Transpose due to Open3D's [dim, num_points] format

        tar_feature = o3d.registration.Feature()
        tar_feature.data = tar_descriptors.T

        # Applying RANSAC based on feature matching
        result = o3d.registration.registration_ransac_based_on_feature_matching(
            source=src_pcd,
            target=tar_pcd,
            source_feature=src_feature,
            target_feature=tar_feature,
            max_correspondence_distance=0.03,
            estimation_method=o3d.registration.TransformationEstimationPointToPoint(False)
        )
        return result.transformation
    except ValueError as e:
        print(f"An error occurred: {e}")
        

**3.Evaluation**

In [None]:
def compute_rmse(transformed_src, tar):
    distances = transformed_src.compute_point_cloud_distance(tar)
    rmse = np.sqrt(np.mean(np.square(distances)))
    return rmse

def compute_differences(transformed_src, tar):
    # Find nearest neighbors
    pcd_tree = o3d.geometry.KDTreeFlann(tar)
    src_colors = np.asarray(transformed_src.colors)
    
    color_differences = []
    position_differences = []
    
    for i, point in enumerate(transformed_src.points):
        _, idx, _ = pcd_tree.search_knn_vector_3d(point, 1)
        
        # Calculate the color differences
        src_color = np.asarray(transformed_src.colors)[i]
        tar_color = np.asarray(tar.colors)[idx[0]]
        color_difference = np.linalg.norm(src_color - tar_color)
        color_differences.append(color_difference)
        
        # Calculate the position differences
        tar_point = np.asarray(tar.points)[idx[0]]
        position_difference = np.linalg.norm(np.array(point) - np.array(tar_point))
        position_differences.append(position_difference)
    
    return np.mean(color_differences), np.mean(position_differences)