In [21]:
import open3d as o3d
import numpy as np
import tqdm
import pyvista as pv
import pye57
import laspy
import os

In [22]:
# Functions

def preprocess_point_cloud(pcd, voxel_size):
    pcd_down = pcd.voxel_down_sample(voxel_size)

    radius_normal = voxel_size * 2
    pcd_down.estimate_normals(
        o3d.geometry.KDTreeSearchParamHybrid(radius=radius_normal, max_nn=30))

    radius_feature = voxel_size * 5
    pcd_fpfh = o3d.pipelines.registration.compute_fpfh_feature(
        pcd_down,
        o3d.geometry.KDTreeSearchParamHybrid(radius=radius_feature, max_nn=100))
    return pcd_down, pcd_fpfh

def prepare_dataset(source, target,voxel_size):

    trans_init = np.asarray([[0.0, 0.0, 1.0, 0.0], [1.0, 0.0, 0.0, 0.0],
                             [0.0, 1.0, 0.0, 0.0], [0.0, 0.0, 0.0, 1.0]])
    source.transform(trans_init)

    source_down, source_fpfh = preprocess_point_cloud(source, voxel_size)
    target_down, target_fpfh = preprocess_point_cloud(target, voxel_size)
    return source, target, source_down, target_down, source_fpfh, target_fpfh

def execute_global_registration(source_down, target_down, source_fpfh,
                                target_fpfh, voxel_size):
    distance_threshold = voxel_size * 1
    result = o3d.pipelines.registration.registration_ransac_based_on_feature_matching(
        source_down, target_down, source_fpfh, target_fpfh, True,
        distance_threshold,
        o3d.pipelines.registration.TransformationEstimationPointToPoint(False),
        3, [
            o3d.pipelines.registration.CorrespondenceCheckerBasedOnEdgeLength(
                0.9),
            o3d.pipelines.registration.CorrespondenceCheckerBasedOnDistance(
                distance_threshold)
        ], o3d.pipelines.registration.RANSACConvergenceCriteria(100000, 0.999))
    return result

def estimate_std(pointCloud, distance_threshold):
    pcd = np_point_cloud2_pcd(pointCloud)
    _, inliers = pcd.segment_plane(
        distance_threshold=distance_threshold, ransac_n=5, num_iterations=1000
    )
    pcd_inliers = pcd.select_by_index(inliers)
    dists = pcd.compute_point_cloud_distance(pcd_inliers)
    dists = np.asarray(dists)
    return 1 - np.std(dists)

def request_points(data, reference_data, buff):
    # data = data.reshape(-1,6)
    noise_point = 1
    # print(data.shape)
    points = data[
        np.where(
            (data[:, 0] < reference_data[0] + buff)
            & (data[:, 0] > reference_data[0] - buff)
            & (data[:, 1] < reference_data[1] + buff)
            & (data[:, 1] > reference_data[1] - buff)
            & (data[:, 2] < reference_data[2] + buff)
            & (data[:, 2] > reference_data[2] - buff)
        ),
        :,
    ]
    points = points.reshape(-1, 6)
    if points.shape[0] > 10:
        noise_point = estimate_std(points, buff * 0.01)
    return points, noise_point

def compute_rmse(lst_PC,lst_acc,voxel_size):
    PC_list = lst_PC.copy()
    target =  np_point_cloud2_pcd(PC_list.pop(lst_acc.index(min(lst_acc))))
    rmse = []
    lst_pcs = []
    PC_list = lst_PC.copy()
    for pc in PC_list:
        source =  np_point_cloud2_pcd(pc)

        source, target, source_down, target_down, source_fpfh, target_fpfh = prepare_dataset(source, target,voxel_size)
        result_ransac = execute_global_registration(source_down, target_down,
                                                    source_fpfh, target_fpfh,
                                                    voxel_size)
        rmse.append(result_ransac.inlier_rmse)
        lst_pcs.append( pcd2np_point_cloud(source.transform(result_ransac.transformation)))
        # print(result_ransac.inlier_rmse)
    return lst_pcs, np.array(rmse)


def check_points_distribution(selected_point, reference_point, threshold, voxel_size):
    selected_point = selected_point.reshape(-1, 6)
    if selected_point.shape[0] > 1:
        return np.linalg.norm(
            np.mean(selected_point, axis=0)[:3] - reference_point[:3]
        ) > (threshold * voxel_size)
    else:
        return False
    

def adjusted_voxel2(ds_sub_reference,ds_reference_pnt,super_voxel_points, g_weights,buff,threshold,point_cont):
    # point_cont = np.zeros(len(super_voxel_points))
    fused_sub_point = np.zeros((1,6))
    sub_voxel_points,_ =  request_points(ds_sub_reference,ds_reference_pnt,buff)
    for i in range(sub_voxel_points.shape[0]):
        sub_vox_pnt = []
        l_weights = []
        for data in range(len(super_voxel_points)):
            points, noise =  request_points(super_voxel_points[data],sub_voxel_points[i],buff*0.5)
            weight_l = points.shape[0]*noise
            l_weights.append(weight_l)
            sub_vox_pnt.append(points)
        weights = np.array(np.array(g_weights)*np.array(l_weights))
        weights = weights/np.sum(weights)
        point = sub_vox_pnt[np.argmax(weights)]
        # print(np.argmax(weights))
        point_cont[np.argmax(weights)] = point_cont[np.argmax(weights)]+1

        if  check_points_distribution(point,sub_voxel_points[i],threshold,buff):
            point = np.vstack(sub_vox_pnt)
            point_cont = point_cont+1
        fused_sub_point = np.concatenate((fused_sub_point,point.reshape(-1,6)),axis=0)
    return fused_sub_point[1:,:],point_cont


def weighted_fusion_filter(data_list,g_weight,reference_pc,sub_reference_pc,voxel_size,k_1,k_2, threshold):
    point_cont = np.zeros(len(data_list))
    fused_points = np.zeros((1,6))
    buff = (voxel_size+voxel_size*0.1)/2
    for i in tqdm.tqdm(range(reference_pc.shape[0])):
    # for i in range(6):
        points = []
        l_weights = []
        for data in range(len(data_list)):
            pnts,noise_point =  request_points(data_list[data],reference_pc[i],buff)
            points.append(pnts)
            weight = (pnts.shape[0]/(voxel_size*10)**3)*noise_point
            l_weights.append(weight)            
        l_weights = l_weights/np.sum(l_weights)
        weights = (k_1* np.array(g_weight)) + (k_2*np.array(l_weights))
        weights = weights/np.sum(weights)  
        point = points[np.argmax(weights)]
        

        if  check_points_distribution(point,reference_pc[i],threshold,voxel_size):
            point, point_cont_ = adjusted_voxel2(sub_reference_pc,reference_pc[i],points,g_weight,buff,0.5*threshold,point_cont)
        else:
            point_cont[np.argmax(weights)] = point_cont[np.argmax(weights)] + 1
    
        # point =  opend3d_outlier(point,option="statistical")
        fused_points = np.concatenate((fused_points,point.reshape(-1,6)),axis=0)
    return fused_points[1:,:],point_cont_

def load_PC_acc():
    pc_path = str(input("Enter point cloud directory: "))
    accuracy = float(input("Enter the accuracy of the sensor system in meters: "))
    print("Loading point cloud")
    np_pointCloud =  load_pointCloud(pc_path)
    print("Done.")
    return np_pointCloud, accuracy,pc_path

def compute_fusion_accuracy(point_cont,lst_acc):
    lst_contrib = point_cont/np.sum(point_cont)
    acc_contrib = lst_contrib * (np.array(lst_acc)**2)
    # acc_contrib = acc_contrib**2
    fusion_acc = np.sqrt(np.sum(acc_contrib))
    return lst_contrib,fusion_acc

def np_point_cloud2_pcd(point_cloud):
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(point_cloud[:, :3])
    pcd.colors = o3d.utility.Vector3dVector(point_cloud[:, 3:6])
    pcd.estimate_normals(fast_normal_computation=False)
    return pcd

def compute_surface_area(pcd, voxel_size):
    radii = [voxel_size, voxel_size * 2, voxel_size * 4, voxel_size * 5]
    rec_mesh = o3d.geometry.TriangleMesh.create_from_point_cloud_ball_pivoting(
        pcd, o3d.utility.DoubleVector(radii)
    )
    return rec_mesh.get_surface_area()

def compute_coverage(reference_pcd, pcd, voxel_size):
    ds_ref_pcd = reference_pcd.voxel_down_sample(voxel_size=voxel_size)
    ds_pcd = pcd.voxel_down_sample(voxel_size=voxel_size)
    coverage = compute_surface_area(ds_pcd, voxel_size) / compute_surface_area(
        ds_ref_pcd, voxel_size
    )
    return coverage

def weight_compute(reference_pcd, pcd, accuracy, voxel_size):
    completeness = compute_coverage(reference_pcd, pcd, voxel_size)
    weight = accuracy / completeness
    return weight

def pcd2np_point_cloud(pcd):
    pc_down_point = np.asarray(pcd.points)
    pc_down_color = np.asarray(pcd.colors)
    np_point_Cloud = np.concatenate((pc_down_point, pc_down_color), axis=1)
    return np_point_Cloud

def compute_weights(reference_pc, pc_list, accuracy, voxel_size):
    weights = []
    reference_pcd = np_point_cloud2_pcd(reference_pc)
    for data in range(len(pc_list)):
        data_pcd = np_point_cloud2_pcd(pc_list[data])
        weights.append(
            weight_compute(reference_pcd, data_pcd, accuracy[data], voxel_size * 2)
        )
        data_pcd = []
    if len(weights) > 1:
        weights = np.sum(weights) - weights

    weights_ = weights / np.sum(weights)
    lst_PCs, rmse= compute_rmse(pc_list,accuracy,voxel_size) 
    g_weight = weights/(1-rmse)
    return g_weight


###############
# Point cloid IO

def load_pointCloud(pc_path):
    pc_ext = pc_path.split(".")[-1]
    if pc_ext == "e57":
        np_pointCloud = load_pye57_file(pc_path)
    elif pc_ext == "las" or pc_ext == "laz":
        np_pointCloud = load_las_file(pc_path)
    elif pc_ext == "ply":
        np_pointCloud = load_ply_file(pc_path)
    else:
        print(
            "ERROR: Check the file extension of the file. \n Only accepts *.ply, *.las/*.laz and *.e57En"
        )
    return np_pointCloud

def load_las_file(path, intensity=False):
    with laspy.open(path) as las_path:
        las = las_path.read()

    """ 
    convert laspy.lasdata.LasData into a np array
    """

    points = np.stack((np.asarray(las.x), np.asarray(las.y), np.asarray(las.z)), axis=1)
    try:
        colors = np.stack(
            (np.asarray(las.red), np.asarray(las.green), np.asarray(las.blue)), axis=1
        )
        if intensity:
            intens = np.asarray(las.intensity)
            pc_np = np.concatenate((points, colors, intens), axis=1)
        pc_np = np.concatenate((points, colors), axis=1)
        return pc_np
    except KeyError:
        return points
    

def load_ply_file(path, color=True, estimate_normals=True):
    pcd = o3d.io.read_point_cloud(path)
    points = pcd.points
    point_cloud = np.array(points)
    if color:
        colors = pcd.colors
        point_cloud = np.concatenate((points, colors), axis=1)

    if estimate_normals:
        pcd.estimate_normals(fast_normal_computation=False)
        normals = pcd.normals
        point_cloud = np.concatenate((points, colors, normals), axis=1)
    return point_cloud


def load_pye57_file(path, intensity=False):
    """
    input:
        path (String): directory to file
        intensity (Boolean): intensity measurement from pointcloud

    output:
        pointcloud (numpy array): point cloud in a numpy array
    """
    e57 = pye57.E57(path)
    data = e57.read_scan(
        0, colors=True, intensity=intensity, ignore_missing_fields=True
    )

    points = np.array([data["cartesianX"], data["cartesianY"], data["cartesianZ"]]).T
    colors = np.array([data["colorRed"], data["colorGreen"], data["colorBlue"]]).T / 255

    # return a numpy array
    if intensity:
        intensity = np.array([data["intensity"]]).T
        point_cloud = np.concatenate((points, colors, intensity), axis=1)

    point_cloud = np.concatenate((points, colors), axis=1)
    return point_cloud


def write_las_file(pc_np, las_path):
    header = laspy.LasHeader(point_format=2)
    xmin = np.floor(np.min(pc_np[:, 0]))
    ymin = np.floor(np.min(pc_np[:, 1]))
    zmin = np.floor(np.min(pc_np[:, 2]))
    header.offset = [xmin, ymin, zmin]
    header.scale = [0.001, 0.001, 0.001]

    las = laspy.LasData(header)
    las.x = pc_np[:, 0]
    las.y = pc_np[:, 1]
    las.z = pc_np[:, 2]
    las.red = pc_np[:, 3] * 255
    las.green = pc_np[:, 4] * 255
    las.blue = pc_np[:, 5] * 255

    if pc_np.shape[1] == 7:
        las.intensity = pc_np[:, 6]

    las.write(las_path)


In [23]:
def viz_pyvista(list_of_pointcloud:list,save_img:bool =False,**kwargs):
    filename = kwargs.get("filename","filename.png")
    # point_size = kwargs.get("point_size","filename.png")

    point_size =1

    pl = pv.Plotter()
    pl.background_color = 'w' 

    sargs = dict(
            title_font_size=20,
            label_font_size=16,
            shadow=True,
            n_labels=3,
            italic=True,
            fmt="%.1f",
            font_family="arial",
            height=0.3, vertical=True, position_x=0.75, position_y=0.2
        )

    for pc in list_of_pointcloud:
        points = pc[:,:3]
        if pc.shape[1]>4:
            # rgba = pc[:,3:]/np.max(pc[:,3:],axis=0)
            rgba = pc[:,3:]
            pl.add_points(points,scalars=rgba,rgba=True,point_size=point_size,render_points_as_spheres=True,smooth_shading=True,)

        elif pc.shape[1]==4:
            scalars = pc[:, -1]
            pl.add_points(points, cmap="Spectral", scalars=scalars, point_size=5, scalar_bar_args=sargs)
            # pl.add_scalar_bar("Coverage Score",fmt='%10.5f',label_font_size=30,)
        else:
            rgba = np.ones(pc[:,:3].shape) 
            point_size = 5
            pl.add_points(points,scalars=rgba,rgba=True,point_size=point_size,render_points_as_spheres=True,smooth_shading=True,)


    if save_img:
        # pl.camera.zoom(1.5)
        pl.screenshot(filename)

In [27]:
# Mian Paramss
voxel_size = 0.05 
k_1 = 1
k_2 = 1
threshold = 0.01

In [None]:
# Load Point Clouds and prepocess data 
PC1 = "bun000_Cloud.las"
PC2 = "bun045_Cloud.las"
output_path = ""

lst_PC = [load_pointCloud(PC1),load_pointCloud(PC2)]
lst_acc = [0.01,0.01]
reference_pc = np.vstack(lst_PC)

ds_reference = np_point_cloud2_pcd(reference_pc).voxel_down_sample(voxel_size = voxel_size)
ds_sub_reference = np_point_cloud2_pcd(reference_pc).voxel_down_sample(voxel_size = voxel_size/2)
ds_reference = pcd2np_point_cloud(ds_reference)
ds_sub_reference =  pcd2np_point_cloud(ds_sub_reference)
print("Number of points in search space (voxels): ", ds_reference.shape[0])

Number of points in search space (voxels):  25


In [None]:
# Main Code

weights = compute_weights(reference_pc,lst_PC,lst_acc,voxel_size=voxel_size)
print("Computing point cloud weights .... Done")

fused_PC_1,point_contrib = weighted_fusion_filter(lst_PC,weights,ds_reference,ds_sub_reference,voxel_size,k_1,k_2, threshold)
fused_PC_1 = np.unique(fused_PC_1,axis=0)
write_las_file(fused_PC_1,os.path.join(output_path,"weighted_fused_filtered_{}_cm_vox.las".format(int(voxel_size*100))))
fused_PC_1 = []
point_contrib = []

Computing point cloud weights .... Done


100%|██████████| 25/25 [00:13<00:00,  1.84it/s]
