In [6]:
import os
import os.path as osp
import sys

import numpy as np
import open3d as o3d

from tqdm import tqdm
import matplotlib
import matplotlib.pyplot as plt

import utils

In [7]:
pcd_path = "data/kinth/pointcloud/1720509720.141261339.pcd"
pcd = o3d.io.read_point_cloud(pcd_path)
pcd = pcd.remove_radius_outlier(nb_points=25, radius=0.2)[0] # (pcd, new indexed from old)
points = np.asarray(pcd.points)

print(points.shape)

(12234, 3)


In [8]:
def pipe2st(
    points: np.ndarray,
    voxel_size: float,
    stage1_radius: float,
    stage2_border: float,
    stage1_thresh: float=0.7,
    stage2_thresh: float=0.3
):
    '''
    Perform filtration with a two-stage pipeline, one PCA and one statistical.

    Params:
    - points (np.ndarray): [n, 3] xyz coordinates
    - voxel_size (float): voxel size for voxel downsample
    - stage1_radius (float): search radius of sphere neighbour to perform PCA
    - stage2_border (float): search border of square neighbour to perform statistic

    Return:
    - points (np.ndarray): [n, 3] xyz coordinates after clustered
    - labels (np.ndarray): [n,] cluster labels of each point
    '''
    
    # voxel downsample
    points, _, _, _ = utils.voxel_downsample(points, voxel_size, use_avg=False)
    
    # do the stage 1
    stage1_feat_list = []
    search_tree = o3d.geometry.KDTreeFlann(utils.npy2o3d(points))
    for query_idx in tqdm(range(len(points)), total=len(points), ncols=100):
        query = points[query_idx]
        neighbour_num, neighbour_idx_list, _ = search_tree.search_radius_vector_3d(query, stage1_radius)
        if neighbour_num <= 3:
            stage1_feat_list.append([0.0, 0.0])
            continue
        eigvals, eigvecs = utils.pca_k(points[neighbour_idx_list], 3)
        assert eigvals[0] >= eigvals[1] and eigvals[1] >= eigvals[2]
        feat = np.array([
            (eigvals[0] - eigvals[1]) / (eigvals[0] + 1e-9),
            (eigvals[1] - eigvals[2]) / (eigvals[1] + 1e-9)
        ])
        feat = feat / feat.sum()
        stage1_feat_list.append(feat)
    stage1_feat_list = np.array(stage1_feat_list)
    
    # do the stage 2
    stage2_feat_list = []
    for query_idx in tqdm(range(len(points)), total=len(points), ncols=100):
        query = points[query_idx]
        mask = (
            (np.abs(points[:, 0] - query[0]) < stage2_border / 2.0) &
            (np.abs(points[:, 1] - query[1]) < stage2_border / 2.0) &
            (np.abs(points[:, 2] - query[2]) < stage2_border)
        )
        mask[query_idx] = False
        vicinity = points[mask]
        if len(vicinity) < 3:
            stage2_feat_list.append(0.0)
            continue
        
        feat = (1.0 - utils.sin_batch(points[mask] - query, np.array([0, 0, 1]))).mean()
        stage2_feat_list.append(feat)
    stage2_feat_list = np.array(stage2_feat_list)

    # do the filtration
    mask1 = (stage1_feat_list[:, 1] > stage1_thresh)
    mask2 = (stage2_feat_list > stage2_thresh)
    mask_filtered = mask1 & mask2
    points_filtered = points[mask_filtered]
    mask_denoised = utils.radius_filter(points_filtered, 0.2, 20)
    points_denoised = points_filtered[mask_denoised] 

    # do the dbscan cluster
    cluster_label = np.array(utils.npy2o3d(points_denoised).cluster_dbscan(eps=0.20, min_points=30))
    
    return points_denoised, cluster_label


In [12]:
points_clustered, labels_clustered = pipe2st(points, 0.02, 0.15, 0.40)
# do the color map
norm = plt.Normalize(labels_clustered.min(), labels_clustered.max())
cmap = matplotlib.colormaps["hsv"]
colors_clustered = (cmap(norm(labels_clustered))[:, :3] * 255.0).astype(np.int32)

utils.npy2ply(points_clustered, colors_clustered, "data/output/clustered.ply")

  0%|                                                                      | 0/7677 [00:00<?, ?it/s]

100%|████████████████████████████████████████████████████████| 7677/7677 [00:00<00:00, 13360.73it/s]
100%|████████████████████████████████████████████████████████| 7677/7677 [00:00<00:00, 12525.85it/s]


# draw bounding box for each cluster

In [13]:
from plyfile import PlyData, PlyElement

draw_vtx = []
draw_clr = []
draw_seq = []
for label in range(labels_clustered.max() + 1):
    bbox_points, bbox_seq = utils.aabb_draw_meta(points_clustered[labels_clustered == label])
    bbox_colors = (np.ones(bbox_points.shape) * cmap(norm(label))[:3] * 255.0).astype(np.int32)
    draw_seq.append(bbox_seq + len(draw_vtx) * 8 + len(points_clustered)) # need vertex offset
    draw_vtx.append(bbox_points)
    draw_clr.append(bbox_colors)
draw_vtx = np.concatenate(draw_vtx, axis=0)
draw_clr = np.concatenate(draw_clr, axis=0)
draw_seq = np.concatenate(draw_seq, axis=0)

ply_vtx = PlyElement.describe(
    utils.join_struct_arrays([
        np.core.records.fromarrays(
            np.concatenate([points_clustered, draw_vtx], axis=0).transpose(),
            dtype=[('x', 'f4'), ('y', 'f4'), ('z', 'f4')]
        ),
        np.core.records.fromarrays(
            np.concatenate([colors_clustered, draw_clr], axis=0).transpose(),
            dtype=[('red', 'u1'), ('green', 'u1'), ('blue', 'u1')]
        )
    ]),
    name = "vertex"
)

ply_edg = PlyElement.describe(
    data=np.core.records.fromarrays(
        draw_seq.transpose(),
        dtype=[('vertex1', 'u4'), ('vertex2', 'u4')]
    ),
    name="edge"
)

ply_data = PlyData([ply_vtx, ply_edg], text=False)
ply_data.write("data/output/bbox_test.ply")