In [2]:
import numpy as np
import cv2
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from IPython.display import Image
import glob
import open3d as o3d
from open3d.open3d.geometry import voxel_down_sample


In [3]:
image_files = sorted(glob.glob("data/img/*.png"))
point_files = sorted(glob.glob("data/velodyne/*.pcd"))
label_files = sorted(glob.glob("data/label/*.txt"))
calib_files = sorted(glob.glob("data/calib/*.txt"))

In [4]:
image_files = sorted(glob.glob("data\\data_train\\img\\*.png"))
point_files = sorted(glob.glob("data\\data_train\\velodyne_pcd\\*.pcd"))
label_files = sorted(glob.glob("data\\data_train\\label\\*.txt"))
calib_files = sorted(glob.glob("data\\data_train\\calib\\*.txt"))

In [5]:
class LiDAR2Camera(object):
    def __init__(self, calib_file):
        calibs = self.read_calib_file(calib_file)
        P = calibs["P2"]
        self.P = np.reshape(P, [3, 4])
        V2C = calibs["Tr_velo_cam"]
        self.V2C = np.reshape(V2C, [3, 4])
        R0 = calibs["R_rect"]
        self.R0 = np.reshape(R0, [3, 3])

    def read_calib_file(self, filepath):
        data = {}
        with open(filepath, "r") as f:
            for line in f.readlines():
                line = line.rstrip()
                if len(line) == 0:
                    continue
                key, value = line.split(":", 1)
                try:
                    data[key] = np.array([float(x) for x in value.split()])
                except ValueError:
                    pass
        return data
    
    def cart2hom(self, pts_3d):
        n = pts_3d.shape[0]
        pts_3d_hom = np.hstack((pts_3d, np.ones((n, 1))))
        return pts_3d_hom
    
    def project_velo_to_image(self, pts_3d_velo):
        R0_homo = np.vstack([self.R0, [0, 0, 0]])
        R0_homo_2 = np.column_stack([R0_homo, [0, 0, 0, 1]])
        p_r0 = np.dot(self.P, R0_homo_2) #PxR0
        p_r0_rt =  np.dot(p_r0, np.vstack((self.V2C, [0, 0, 0, 1]))) #PxROxRT
        pts_3d_homo = np.column_stack([pts_3d_velo, np.ones((pts_3d_velo.shape[0],1))])
        p_r0_rt_x = np.dot(p_r0_rt, np.transpose(pts_3d_homo))#PxROxRTxX
        pts_2d = np.transpose(p_r0_rt_x)
        
        pts_2d[:, 0] /= pts_2d[:, 2]
        pts_2d[:, 1] /= pts_2d[:, 2]
        return pts_2d[:, 0:2]
        
    def get_lidar_in_image_fov(self,pc_velo, xmin, ymin, xmax, ymax, return_more=False, clip_distance=2.0):
        pts_2d = self.project_velo_to_image(pc_velo)
        fov_inds = (
            (pts_2d[:, 0] < xmax)
            & (pts_2d[:, 0] >= xmin)
            & (pts_2d[:, 1] < ymax)
            & (pts_2d[:, 1] >= ymin)
        )
        fov_inds = fov_inds & (pc_velo[:, 0] > clip_distance) # We don't want things that are closer to the clip distance (2m)
        imgfov_pc_velo = pc_velo[fov_inds, :]
        if return_more:
            return imgfov_pc_velo, pts_2d, fov_inds
        else:
            return imgfov_pc_velo
        
    def show_lidar_on_image(self, pc_velo, img, debug="False"):
        imgfov_pc_velo, pts_2d, fov_inds = self.get_lidar_in_image_fov(
            pc_velo, 0, 0, img.shape[1], img.shape[0], True
        )
        self.imgfov_pts_2d = pts_2d[fov_inds, :]

        cmap = plt.cm.get_cmap("hsv", 256)
        cmap = np.array([cmap(i) for i in range(256)])[:, :3] * 255
        self.imgfov_pc_velo = imgfov_pc_velo
        
        for i in range(self.imgfov_pts_2d.shape[0]):
            depth = imgfov_pc_velo[i,0]
            color = cmap[int(510.0 / depth), :]
            cv2.circle(
                img,(int(np.round(self.imgfov_pts_2d[i, 0])), int(np.round(self.imgfov_pts_2d[i, 1]))),2,
                color=tuple(color),
                thickness=-1,
            )
        return img

    def pipeline_lidar (self, image, point_cloud):
        img = image.copy()
        lidar_img = self.show_lidar_on_image(point_cloud[:,:3], image)
        return lidar_img
        

## original

In [None]:
image_files = sorted(glob.glob("data\\data_train\\img\\*.png"))
point_files = sorted(glob.glob("data\\data_train\\velodyne_pcd\\*.pcd"))
label_files = sorted(glob.glob("data\\data_train\\label\\*.txt"))
calib_files = sorted(glob.glob("data\\data_train\\calib\\*.txt"))

In [None]:
# --for linux--
# lidar_images = sorted(glob.glob("data/img/*.png"))
# lidar_points = sorted(glob.glob("data/velodyne/*.pcd"))
# calib_files = sorted(glob.glob("data/calib/*.txt"))

# --for window--
lidar_images = sorted(glob.glob("data\\data_train\\img\\*.png"))
lidar_points = sorted(glob.glob("data\\data_train\\velodyne_pcd\\*.pcd"))
calib_files = sorted(glob.glob("data\\data_train\\calib\\*.txt"))

lidar2cam_video = LiDAR2Camera(calib_files[0])

result_lidar_video = []

for idx, img in enumerate(lidar_images):
    img = cv2.cvtColor(cv2.imread(img), cv2.COLOR_BGR2RGB)
    point_cloud = np.asarray(o3d.io.read_point_cloud(lidar_points[idx]).points)
    result_lidar_video.append(lidar2cam_video.pipeline_lidar(img, point_cloud))
    cv2.imshow("result_lidar_video", result_lidar_video[idx])
    cv2.imshow("img", img)
    cv2.waitKey(1)
cv2.destroyAllWindows()


## downsampling

In [None]:
# --for linux--
# lidar_images = sorted(glob.glob("data/img/*.png"))
# lidar_points = sorted(glob.glob("data/velodyne/*.pcd"))
# calib_files = sorted(glob.glob("data/calib/*.txt"))

# --for window--
lidar_images = sorted(glob.glob("data\\data_train\\img\\*.png"))
lidar_points = sorted(glob.glob("data\\data_train\\velodyne_pcd\\*.pcd"))
calib_files = sorted(glob.glob("data\\data_train\\calib\\*.txt"))

lidar2cam_video = LiDAR2Camera(calib_files[0])

result_lidar_video = []

for idx, img in enumerate(lidar_images):
    img = cv2.cvtColor(cv2.imread(img), cv2.COLOR_BGR2RGB)
    point_cloud = o3d.io.read_point_cloud(lidar_points[idx])
    point_cloud = point_cloud.voxel_down_sample(voxel_size=0.1)
    
    point_cloud = np.asarray(point_cloud.points)
    
    
    result_lidar_video.append(lidar2cam_video.pipeline_lidar(img, point_cloud))
    cv2.imshow("result_lidar_video", result_lidar_video[idx])
    cv2.waitKey(1)
cv2.destroyAllWindows()

## ground removal

In [None]:
# --for linux--
# lidar_images = sorted(glob.glob("data/img/*.png"))
# lidar_points = sorted(glob.glob("data/velodyne/*.pcd"))
# calib_files = sorted(glob.glob("data/calib/*.txt"))

# --for window--
lidar_images = sorted(glob.glob("data\\data_train\\img\\*.png"))
lidar_points = sorted(glob.glob("data\\data_train\\velodyne_pcd\\*.pcd"))
calib_files = sorted(glob.glob("data\\data_train\\calib\\*.txt"))

lidar2cam_video = LiDAR2Camera(calib_files[0])

result_lidar_video = []

for idx, img in enumerate(lidar_images):
    img = cv2.cvtColor(cv2.imread(img), cv2.COLOR_BGR2RGB)
    point_cloud = o3d.io.read_point_cloud(lidar_points[idx])
    _, inliers = point_cloud.segment_plane(distance_threshold = 0.23, ransac_n = 8, num_iterations = 500)
    
    point_cloud.points = o3d.utility.Vector3dVector(np.delete(np.asarray(point_cloud.points), inliers, axis=0))
    point_cloud = np.asarray(point_cloud.points)
    
    
    result_lidar_video.append(lidar2cam_video.pipeline_lidar(img, point_cloud))
    cv2.imshow("result_lidar_video", result_lidar_video[idx])
    cv2.waitKey(1)
cv2.destroyAllWindows()

In [None]:
image = cv2.imread("data/img/000000.png")

out = cv2.VideoWriter('output/out_lidar_data_ground_remove.mp4',cv2.VideoWriter_fourcc(*'MP4V'), 15, (image.shape[1],image.shape[0]))
for i in range(len(result_lidar_video)):
    out.write(cv2.cvtColor(result_lidar_video[i], cv2.COLOR_BGR2RGB))
out.release()

## remove ground & downsample

In [None]:
lidar_images = sorted(glob.glob("data/img/*.png"))
lidar_points = sorted(glob.glob("data/velodyne/*.pcd"))
calib_files = sorted(glob.glob("data/calib/*.txt"))

lidar2cam_video = LiDAR2Camera(calib_files[0])

result_lidar_video = []

for idx, img in enumerate(lidar_images):
    img = cv2.cvtColor(cv2.imread(img), cv2.COLOR_BGR2RGB)
    point_cloud = o3d.io.read_point_cloud(lidar_points[idx])
    point_cloud = point_cloud.voxel_down_sample(voxel_size=0.03)
    _, inliers = point_cloud.segment_plane(distance_threshold = 0.23, ransac_n = 8, num_iterations = 500)
    
    point_cloud.points = o3d.utility.Vector3dVector(np.delete(np.asarray(point_cloud.points), inliers, axis=0))
    point_cloud = np.asarray(point_cloud.points)
    
    
    result_lidar_video.append(lidar2cam_video.pipeline_lidar(img, point_cloud))
    cv2.imshow("result_lidar_video", result_lidar_video[idx])
    cv2.waitKey(1)
cv2.destroyAllWindows()

## store video

In [None]:
image = cv2.imread("data/img/000000.png")

out = cv2.VideoWriter('output/out_ground_remove_and_downsample.mp4',cv2.VideoWriter_fourcc(*'MP4V'), 15, (image.shape[1],image.shape[0]))
for i in range(len(result_lidar_video)):
    out.write(cv2.cvtColor(result_lidar_video[i], cv2.COLOR_BGR2RGB))
out.release()

## cluster_dbscan

In [None]:
def remove_large_clusters(point_cloud, labels, cluster_size_threshold):
       
    # Count points in each cluster
    unique, counts = np.unique(labels, return_counts=True)
    cluster_sizes = dict(zip(unique, counts))
    
    # Filter out clusters with size greater than threshold
    large_clusters = [cluster_idx for cluster_idx in cluster_sizes if cluster_sizes[cluster_idx] > cluster_size_threshold]
    filtered_labels = np.where(np.isin(labels, large_clusters), -1, labels)
    
    # Remove points in filtered clusters
    filtered_point_cloud = point_cloud.select_by_index(np.where(filtered_labels > -1)[0])
    
    return filtered_point_cloud

import pandas as pd
import time

lidar_images = sorted(glob.glob("data\\data_train\\img\\*.png"))
lidar_points = sorted(glob.glob("data\\data_train\\velodyne_pcd\\*.pcd"))
calib_files = sorted(glob.glob("data\\calib\\*.txt"))

lidar2cam_video = LiDAR2Camera(calib_files[0])

result_lidar_video = []

for idx in range(len(lidar_images)):
    # if idx > 130:
    img = cv2.cvtColor(cv2.imread(lidar_images[idx]), cv2.COLOR_BGR2RGB)
    point_cloud = o3d.io.read_point_cloud(lidar_points[idx])

    # down sampling
    downpcd = point_cloud.voxel_down_sample(voxel_size=0.05)

    # remove ground
    _, inliers = downpcd.segment_plane(distance_threshold = 0.23, ransac_n = 8, num_iterations = 200)

    # delete ground
    downpcd.points = o3d.utility.Vector3dVector(np.delete(np.asarray(downpcd.points), inliers, axis=0))

    # dbscan
    labels = np.array(downpcd.cluster_dbscan(eps=0.28, min_points=10, print_progress=False))
    max_label = labels.max()
    print(f"point cloud has {max_label + 1} clusters")
    
    downpcd = remove_large_clusters(downpcd, labels, 4000)
    downpcd = downpcd.select_by_index(np.where(labels >= 0)[0])

    final_points = np.asarray(downpcd.points)
    final_aa = lidar2cam_video.pipeline_lidar(img, final_points)
    result_lidar_video.append(final_aa)
    cv2.imshow("final_aa", final_aa)
    if cv2.waitKey(0) == ord('q'):
        break
cv2.destroyAllWindows()


In [None]:
image = cv2.imread("data/img/000000.png")

out = cv2.VideoWriter('output/out_ground_remove_and_downsample.mp4',cv2.VideoWriter_fourcc(*'MP4V'), 15, (image.shape[1],image.shape[0]))
for i in range(len(result_lidar_video)):
    out.write(cv2.cvtColor(result_lidar_video[i], cv2.COLOR_BGR2RGB))
out.release()

## DBSCAN clustering video

In [6]:
import numpy as np
import cv2
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from IPython.display import Image
import glob
import open3d as o3d
from open3d.open3d.geometry import voxel_down_sample

class LiDAR2Camera(object):
    def __init__(self, calib_file):
        calibs = self.read_calib_file(calib_file)
        P = calibs["P2"]
        self.P = np.reshape(P, [3, 4])
        V2C = calibs["Tr_velo_cam"]
        self.V2C = np.reshape(V2C, [3, 4])
        R0 = calibs["R_rect"]
        self.R0 = np.reshape(R0, [3, 3])

    def read_calib_file(self, filepath):
        data = {}
        with open(filepath, "r") as f:
            for line in f.readlines():
                line = line.rstrip()
                if len(line) == 0:
                    continue
                key, value = line.split(":", 1)
                try:
                    data[key] = np.array([float(x) for x in value.split()])
                except ValueError:
                    pass
        return data
    
    def cart2hom(self, pts_3d):
        n = pts_3d.shape[0]
        pts_3d_hom = np.hstack((pts_3d, np.ones((n, 1))))
        return pts_3d_hom
    
    def project_velo_to_image(self, pts_3d_velo):
        R0_homo = np.vstack([self.R0, [0, 0, 0]])
        R0_homo_2 = np.column_stack([R0_homo, [0, 0, 0, 1]])
        p_r0 = np.dot(self.P, R0_homo_2) #PxR0
        p_r0_rt =  np.dot(p_r0, np.vstack((self.V2C, [0, 0, 0, 1]))) #PxROxRT
        pts_3d_homo = np.column_stack([pts_3d_velo, np.ones((pts_3d_velo.shape[0],1))])
        p_r0_rt_x = np.dot(p_r0_rt, np.transpose(pts_3d_homo))#PxROxRTxX
        pts_2d = np.transpose(p_r0_rt_x)
        
        pts_2d[:, 0] /= pts_2d[:, 2]
        pts_2d[:, 1] /= pts_2d[:, 2]
        return pts_2d[:, 0:2]
        
    def get_lidar_in_image_fov(self,pc_velo, xmin, ymin, xmax, ymax, return_more=False, clip_distance=2.0):
        pts_2d = self.project_velo_to_image(pc_velo)
        fov_inds = (
            (pts_2d[:, 0] < xmax)
            & (pts_2d[:, 0] >= xmin)
            & (pts_2d[:, 1] < ymax)
            & (pts_2d[:, 1] >= ymin)
        )
        fov_inds = fov_inds & (pc_velo[:, 0] > clip_distance) # We don't want things that are closer to the clip distance (2m)
        imgfov_pc_velo = pc_velo[fov_inds, :]
        if return_more:
            return imgfov_pc_velo, pts_2d, fov_inds
        else:
            return imgfov_pc_velo
        
    def show_lidar_on_image(self, pc_velo, img, debug="False"):
        imgfov_pc_velo, pts_2d, fov_inds = self.get_lidar_in_image_fov(
            pc_velo, 0, 0, img.shape[1], img.shape[0], True
        )
        self.imgfov_pts_2d = pts_2d[fov_inds, :]

        cmap = plt.cm.get_cmap("hsv", 256)
        cmap = np.array([cmap(i) for i in range(256)])[:, :3] * 255
        self.imgfov_pc_velo = imgfov_pc_velo
        
        for i in range(self.imgfov_pts_2d.shape[0]):
            depth = imgfov_pc_velo[i,0]
            color = cmap[int(510.0 / depth), :]
            cv2.circle(
                img,(int(np.round(self.imgfov_pts_2d[i, 0])), int(np.round(self.imgfov_pts_2d[i, 1]))),2,
                color=tuple(color),
                thickness=-1,
            )
        return img

    def pipeline_lidar (self, image, point_cloud):
        img = image.copy()
        lidar_img = self.show_lidar_on_image(point_cloud[:,:3], image)
        return lidar_img
    
def remove_large_clusters(point_cloud, labels, cluster_size_threshold):
       
    # Count points in each cluster
    unique, counts = np.unique(labels, return_counts=True)
    cluster_sizes = dict(zip(unique, counts))
    
    # Filter out clusters with size greater than threshold
    large_clusters = [cluster_idx for cluster_idx in cluster_sizes if cluster_sizes[cluster_idx] > cluster_size_threshold]
    filtered_labels = np.where(np.isin(labels, large_clusters), -1, labels)
    
    # Remove points in filtered clusters
    point_cloud = point_cloud.select_by_index(np.where(filtered_labels > -1)[0])
    
    return point_cloud


lidar_images = sorted(glob.glob("data\\data_train\\img\\*.png"))
lidar_points = sorted(glob.glob("data\\data_train\\velodyne_pcd\\*.pcd"))
calib_files = sorted(glob.glob("data\\calib\\*.txt"))

lidar2cam_video = LiDAR2Camera(calib_files[0])


img = cv2.cvtColor(cv2.imread(lidar_images[0]), cv2.COLOR_BGR2RGB)
point_cloud = o3d.io.read_point_cloud(lidar_points[0])

# # down sampling
# point_cloud = point_cloud.voxel_down_sample(voxel_size=0.01)

# # remove ground
# _, inliers = point_cloud.segment_plane(distance_threshold = 0.23, ransac_n = 8, num_iterations = 200)

# # delete ground
# point_cloud.points = o3d.utility.Vector3dVector(np.delete(np.asarray(point_cloud.points), inliers, axis=0))

# # dbscan
# labels = np.array(point_cloud.cluster_dbscan(eps=0.25, min_points=10, print_progress=True))

# point_cloud = remove_large_clusters(point_cloud, labels, 1000)
# max_label = labels.max()

# # colors = plt.get_cmap("tab20")(labels / (max_label if max_label > 0 else 1))
# # colors[labels < 0] = 0
# # downpcd.colors = o3d.utility.Vector3dVector(colors[:, :3])
# point_cloud = point_cloud.select_by_index(np.where(labels >= 0)[0])

final_points = np.asarray(point_cloud.points)
final_aa = lidar2cam_video.pipeline_lidar(img, final_points)
cv2.imshow("final_aa", final_aa)
cv2.waitKey(0)
cv2.destroyAllWindows()


In [None]:
o3d.visualization.draw_geometries([point_cloud])

In [2]:
cv2.imshow("final_aa", final_aa)
cv2.imwrite("cluster_off.png", final_aa)
cv2.waitKey(0)
cv2.destroyAllWindows()

In [None]:
max_label = labels.max()

# colors = plt.get_cmap("tab20")(labels / (max_label if max_label > 0 else 1))
# colors[labels < 0] = 0
# downpcd.colors = o3d.utility.Vector3dVector(colors[:, :3])
pcd = downpcd.select_by_index(np.where(labels >= 0)[0])
time.sleep(0.1)

final_points = np.asarray(pcd.points)
final_aa = lidar2cam_video.pipeline_lidar(img, final_points)

In [None]:
cv2.imshow("final_aa", final_aa)
cv2.waitKey(0)
cv2.destroyAllWindows()

In [None]:
# remove ground
_, inliers = downpcd.segment_plane(distance_threshold = 0.23, ransac_n = 8, num_iterations = 500)
downpcd.points = o3d.utility.Vector3dVector(np.delete(np.asarray(downpcd.points), inliers, axis=0)) # delete ground

In [None]:
o3d.visualization.draw_geometries([downpcd])

In [None]:
# dbscan
labels = np.array(downpcd.cluster_dbscan(eps=0.2, min_points=20, print_progress=True))
max_label = labels.max()
print(f"point cloud has {max_label + 1} clusters")

In [None]:
colors = plt.get_cmap("tab20")(labels / (max_label if max_label > 0 else 1))
colors[labels < 0] = 0
downpcd.colors = o3d.utility.Vector3dVector(colors[:, :3])
pcd = downpcd.select_by_index(np.where(labels >= 0)[0])

In [None]:
# downpcd.colors = o3d.utility.Vector3dVector(colors[:, :3])
# # pcd.colors = o3d.utility.Vector3dVector(colors[:, :3])

o3d.visualization.draw_geometries([pcd])
# o3d.visualization.draw_geometries([downpcd])

In [None]:
img = cv2.cvtColor(cv2.imread(lidar_images[0]), cv2.COLOR_BGR2RGB)
point_cloud = np.asarray(o3d.io.read_point_cloud(lidar_points[0]).points)
final_points = np.asarray(pcd.points)
final_aa = lidar2cam_video.pipeline_lidar(img, final_points)
cv2.imshow("final_aa", final_aa)
cv2.waitKey(0)
cv2.destroyAllWindows()

In [None]:
# Bounding Boxes
bounding_boxes = []
inds =  pd.Series(range(len(labels))).groupby(labels, sort = False).apply(list).tolist()

for i in range(len(inds)):
    cluster = pcd.select_by_index(inds[i])
    bb = cluster.get_axis_aligned_bounding_box()
    bb.color = (1,0,0)
    bounding_boxes.append(bb)

visuals = []
visuals.append(pcd)
visuals.extend(bounding_boxes)
o3d.visualization.draw_geometries(visuals)

In [None]:



# downpcd_points = downpcd.points
# clustered_points = downpcd_points[cluster_indices, :]
downpcd = np.asarray(downpcd.points)
clustered_points = downpcd[cluster_indices, :]

pcd_final = o3d.geometry.PointCloud()
pcd_final.points = o3d.utility.Vector3dVector(clustered_points)

max_label = labels.max()
print(f"point cloud has {max_label + 1} clusters")

colors = plt.get_cmap("tab20")(labels / (max_label if max_label > 0 else 1))
colors[labels < 0] = 0
point_cloud.colors = o3d.utility.Vector3dVector(colors[:, :3])

pcd_final.colors = o3d.utility.Vector3dVector(colors[:, :3])


# o3d.visualization.draw_geometries([point_cloud],
#                                   zoom=0.455,
#                                   front=[-0.4999, -0.1659, -0.8499],
#                                   lookat=[2.1813, 2.0619, 2.0999],
#                                   up=[0.1204, -0.9852, 0.1215])

# o3d.visualization.draw_geometries([point_cloud])

# Bounding Boxes
bounding_boxes = []
inds =  pd.Series(range(len(labels))).groupby(labels, sort = False).apply(list).tolist()

for i in range(len(inds)):
    cluster = pcd_final.select_by_index(inds[i])
    bb = cluster.get_axis_aligned_bounding_box()
    bb.color = (0,1,0)
    bounding_boxes.append(bb)

visuals = []
visuals.append(pcd_final)
visuals.extend(bounding_boxes)
o3d.visualization.draw_geometries(visuals)

img_final = lidar2cam_video.pipeline_lidar(img, clustered_points)

cv2.imshow("img_final", img_final)
cv2.waitKey(0)
cv2.destroyAllWindows()

In [None]:
cluster_indices

In [None]:
len(labels)

In [None]:
cluster_indices