In [1]:
import os
import numpy as np
import cv2
import pandas as pd
import k3d
import open3d as o3d
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from IPython.display import display, clear_output
import ipywidgets as widgets

In [2]:
# ---------- KITTI Utilities ---------- #
def load_lidar_bin(file_path):
    points = np.fromfile(file_path, dtype=np.float32).reshape(-1, 4)
    return points[:, :3]

def extract_labels(file_path):
    columns = [
        "ObjectType", "Truncation", "Occlusion", "Alpha", 
        "X1", "Y1", "X2", "Y2", "H", "W", "L", "X", "Y", "Z", "Rotation_Y"
    ]
    data = []
    with open(file_path, "r") as file:
        for line in file:
            values = line.strip().split()
            object_data = {
                columns[i]: values[i] if i == 0 else float(values[i])
                for i in range(len(columns))
            }
            data.append(object_data)
    return pd.DataFrame(data)

def extract_matrices(filename):
    global P2, P3, R0_rect, Tr_velo_to_cam, Tr_cam_to_velo
    with open(filename, 'r') as file:
        for line in file:
            if line.strip():
                parts = line.split(':')
                key = parts[0].strip()
                values = np.fromstring(parts[1], sep=' ')
                if key == "P2":
                    P2 = values.reshape(3, 4)
                elif key == "P3":
                    P3 = values.reshape(3, 4)
                elif key == "R0_rect":
                    R0_rect = values.reshape(3, 3)
                elif key == "Tr_velo_to_cam":
                    matrix_3x4 = values.reshape(3, 4)
                    Tr_velo_to_cam = np.vstack([matrix_3x4, np.array([[0, 0, 0, 1]])])
                    Tr_cam_to_velo = np.linalg.inv(Tr_velo_to_cam)

def compute_3D_from_disparity(proj_mat, disparity_img, R0_rect):
    if disparity_img.dtype == np.uint16:
        disparity_img = disparity_img.astype(np.float32) / 256.0
    height, width = disparity_img.shape
    xyz_img = np.zeros((height, width, 3), dtype=np.float32)

    f = proj_mat[0, 0]
    c_x = proj_mat[0, 2]
    c_y = proj_mat[1, 2]
    T_x = proj_mat[0, 3]

    mask = disparity_img > 0
    depth = np.full_like(disparity_img, np.nan, dtype=np.float32)
    depth[mask] = (-T_x) / disparity_img[mask]

    u, v = np.meshgrid(np.arange(width), np.arange(height))
    X = (u - c_x) * depth / f
    Y = (v - c_y) * depth / f
    Z = depth

    xyz_cam = np.stack((X, Y, Z), axis=-1)
    xyz_flat = xyz_cam.reshape(-1, 3)
    xyz_rect = xyz_flat @ R0_rect.T
    xyz_img = xyz_rect.reshape(height, width, 3)
    xyz_img[xyz_img[:, :, 2] > 17.5] = np.nan
    return xyz_img

def transform_lidar_to_camera(lidar_points, Tr_velo_to_cam):
    ones = np.ones((lidar_points.shape[0], 1))
    lidar_hom = np.hstack((lidar_points, ones))
    cam_points = lidar_hom @ Tr_velo_to_cam.T
    return cam_points[:, :3]

def plot_img(left_path, right_path, figsize=(22, 6)):
    img_left = mpimg.imread(left_path)
    img_right = mpimg.imread(right_path)
    combined_img = np.hstack((img_left, img_right))
    plt.figure(figsize=figsize)
    plt.imshow(combined_img)
    plt.axis("off")
    plt.show()

In [3]:
# ---------- KITTI Paths ---------- #

file_path_dataset = os.path.expanduser("~/computer_vision/Dataset")
lidar_relative_path = "/data_object_velodyne/training/velodyne/{:06d}.bin"
left_img_relative_path = "/data_object_image_2/training/image_2/{:06d}.png"
right_img_relative_path = "/data_object_image_3/training/image_3/{:06d}.png"
disparity_relative_path = "/disparity_images/disparity{:06d}.png"
labels_relative_path = "/data_object_label_2/training/label_2/{:06d}.txt"
calib_relative_path = "/data_object_calib/training/calib/{:06d}.txt"

In [5]:
# ---------- Visualizer Function ---------- #

def visualize_frame(sequence_number):
    clear_output(wait=True)
    print(f"Frame: {sequence_number}")

    paths = {
        "lidar": file_path_dataset + lidar_relative_path.format(sequence_number),
        "left_img": file_path_dataset + left_img_relative_path.format(sequence_number),
        "right_img": file_path_dataset + right_img_relative_path.format(sequence_number),
        "disparity": file_path_dataset + disparity_relative_path.format(sequence_number),
        "labels": file_path_dataset + labels_relative_path.format(sequence_number),
        "calib": file_path_dataset + calib_relative_path.format(sequence_number),
    }

    extract_matrices(paths["calib"])
    lidar_points = load_lidar_bin(paths["lidar"])
    lidar_points_cam = transform_lidar_to_camera(lidar_points, Tr_velo_to_cam)

    disparity = cv2.imread(paths["disparity"], cv2.IMREAD_UNCHANGED).astype(np.float32)
    xyz_img = compute_3D_from_disparity(P2, disparity, R0_rect)
    disp_points = xyz_img.reshape(-1, 3)
    disp_points = disp_points[~np.isnan(disp_points).any(axis=1)]

    # K3D Visualization
    plot = k3d.plot()
    plot += k3d.points(lidar_points_cam, point_size=0.05, color=0x0000ff)
    depth_vals = np.clip(disp_points[:, 2], 0, 17.5)
    color_vals = ((depth_vals / 17.5) * 255).astype(np.uint8)
    color_rgb = np.stack([255 - color_vals, color_vals, np.zeros_like(color_vals)], axis=1)
    plot += k3d.points(positions=disp_points, colors=color_rgb, point_size=0.05)
    display(plot)

    # Open3D ICP + Hausdorff
    pcd_disp = o3d.geometry.PointCloud()
    pcd_disp.points = o3d.utility.Vector3dVector(disp_points)
    pcd_lidar = o3d.geometry.PointCloud()
    pcd_lidar.points = o3d.utility.Vector3dVector(lidar_points_cam)

    pcd_disp, _ = pcd_disp.remove_statistical_outlier(20, 2.0)
    pcd_lidar, _ = pcd_lidar.remove_statistical_outlier(20, 2.0)

    d1 = pcd_disp.compute_point_cloud_distance(pcd_lidar)
    d2 = pcd_lidar.compute_point_cloud_distance(pcd_disp)
    hausdorff_before = max(max(d1), max(d2))
    print(f"Hausdorff BEFORE ICP: {hausdorff_before:.3f} m")

    result_icp = o3d.pipelines.registration.registration_icp(
        pcd_disp, pcd_lidar, 0.5, np.eye(4),
        o3d.pipelines.registration.TransformationEstimationPointToPoint()
    )
    print("ICP converged:", result_icp.is_converged)
    print("ICP RMSE:", result_icp.inlier_rmse)

    pcd_disp.transform(result_icp.transformation)
    d1 = pcd_disp.compute_point_cloud_distance(pcd_lidar)
    d2 = pcd_lidar.compute_point_cloud_distance(pcd_disp)
    hausdorff_after = max(max(d1), max(d2))
    print(f"Hausdorff AFTER ICP: {hausdorff_after:.3f} m")

    o3d.visualization.draw_geometries([pcd_disp, pcd_lidar])
    plot_img(paths["left_img"], paths["right_img"])

In [None]:
# ---------- GUI Slider ---------- #

slider = widgets.IntSlider(value=0, min=0, max=9, step=1, description="Frame")
widgets.interact(visualize_frame, sequence_number=slider)


In [None]:
# ---------- Export Functions ---------- #

def save_pointcloud_bin(filename, points):
    # Save N x 3 or N x 4 numpy array to .bin format
    if points.shape[1] == 3:
        points = np.hstack([points, np.zeros((points.shape[0], 1), dtype=np.float32)])  # add dummy intensity
    points.astype(np.float32).tofile(filename)


In [None]:

# ---------- Batch Processing ---------- #

from tqdm import tqdm

def process_all_frames(start=0, end=7481, export_dir="output/pointclouds"):
    os.makedirs(export_dir, exist_ok=True)
    hausdorff_log = []

    for sequence_number in tqdm(range(start, end), desc="Processing KITTI frames"):
        try:
            paths = {
                "lidar": file_path_dataset + lidar_relative_path.format(sequence_number),
                "disparity": file_path_dataset + disparity_relative_path.format(sequence_number),
                "calib": file_path_dataset + calib_relative_path.format(sequence_number),
            }

            extract_matrices(paths["calib"])
            lidar_points = load_lidar_bin(paths["lidar"])
            lidar_cam = transform_lidar_to_camera(lidar_points, Tr_velo_to_cam)

            disparity = cv2.imread(paths["disparity"], cv2.IMREAD_UNCHANGED).astype(np.float32)
            xyz_img = compute_3D_from_disparity(P2, disparity, R0_rect)
            disp_pc = xyz_img.reshape(-1, 3)
            disp_pc = disp_pc[~np.isnan(disp_pc).any(axis=1)]

            # Open3D ICP
            pcd_disp = o3d.geometry.PointCloud()
            pcd_disp.points = o3d.utility.Vector3dVector(disp_pc)
            pcd_lidar = o3d.geometry.PointCloud()
            pcd_lidar.points = o3d.utility.Vector3dVector(lidar_cam)
            pcd_disp, _ = pcd_disp.remove_statistical_outlier(20, 2.0)
            pcd_lidar, _ = pcd_lidar.remove_statistical_outlier(20, 2.0)

            result_icp = o3d.pipelines.registration.registration_icp(
                pcd_disp, pcd_lidar, 0.5, np.eye(4),
                o3d.pipelines.registration.TransformationEstimationPointToPoint()
            )
            pcd_disp.transform(result_icp.transformation)
            aligned_disp_pc = np.asarray(pcd_disp.points)

            # Save aligned point clouds
            save_pointcloud_bin(os.path.join(export_dir, f"{sequence_number:06d}_lidar.bin"), lidar_cam)
            save_pointcloud_bin(os.path.join(export_dir, f"{sequence_number:06d}_disp.bin"), aligned_disp_pc)

            # Log distances
            d1 = pcd_disp.compute_point_cloud_distance(pcd_lidar)
            d2 = pcd_lidar.compute_point_cloud_distance(pcd_disp)
            hausdorff = max(max(d1), max(d2))
            hausdorff_log.append((sequence_number, hausdorff))

        except Exception as e:
            print(f"Error at frame {sequence_number}: {e}")

    # Save log
    with open(os.path.join(export_dir, "hausdorff_log.csv"), "w") as f:
        f.write("frame,hausdorff")
        for frame, dist in hausdorff_log:
            f.write(f"{frame},{dist:.4f}")


In [None]:
# ---------- PointPillars Inference Placeholder ---------- #

def run_pointpillars_inference(bin_file):
    """Placeholder for PointPillars inference."""
    # Simulate loading PointPillars model
    print(f"Running PointPillars on {bin_file} ...")
    # Example: simulate result with dummy box
    dummy_boxes = [
        [10, 0, 0, 1.6, 3.9, 1.5, 0]  # [x, y, z, w, l, h, yaw]
    ]
    return dummy_boxes

def visualize_inference(bin_file):
    points = np.fromfile(bin_file, dtype=np.float32).reshape(-1, 4)[:, :3]
    bboxes = run_pointpillars_inference(bin_file)

    plot = k3d.plot()
    plot += k3d.points(points.astype(np.float32), point_size=0.05, color=0x888888)

    for bbox in bboxes:
        x, y, z, w, l, h, heading = bbox
        box = k3d.bounding_box(bounds=[x-w/2, y-l/2, z-h/2, x+w/2, y+l/2, z+h/2])
        plot += box

    plot.display()