## ONCE-3DLanes Data Loader
Created by Jett Penner<br>
December 2025 <br>


Load the dataset and convert it into the readable form by the reusable thesis code. This also converts the 3D ground truth lane positions into 2D-3D ground truth pairs, and organizes all of the data into an easily-accessible format for the main thesis code. <br>

<br><br>
Note: The current code is setup to look in train/val/test splits

In [None]:
import yaml
import os
from dataclasses import dataclass
import open3d as o3d
import numpy as np
import json
from PIL import Image
from tqdm import tqdm
import glob
import warnings

### Dataset Loading
Search for matching sequence ids

In [None]:
# Open config
config_path = "config.yaml"

with open(config_path, 'r') as f:
    config = yaml.safe_load(f)

# Match candidate sequence IDs (between ONCE and ONCE-3DLanes)
once_3dlanes_data_base = config["once_3dlanes_data"]
once_cam_data_base = config["once_data"]["cam_data"]
once_lidar_data_base = config["once_data"]["lidar_data"]
once_annot_base = config["once_data"]["annotation"]

def get_split_df(split_path, split, dict):
    if os.path.exists(split_path):
        seq_ids = [seq for seq in os.listdir(split_path)
                   if os.path.isdir(os.path.join(split_path, seq))]
        for seq in seq_ids:
            dict[seq] = split
    else:
        print(f"Split '{split}' does not exist at path {split_path}")
  
splits = ["train", "val", "test"] # search in these splits
lanes_seq_dict = {}
cam_seq_dict = {}
lidar_seq_dict = {}
annot_seq_dict = {}
for split in splits:
    # ONCE-3DLanes dataset search
    get_split_df(
        os.path.join(once_3dlanes_data_base, split), 
        split, 
        lanes_seq_dict
    )
    # ONCE dataset search
    embedded_split = os.path.join(split, "data") # hardcoded filepath
    get_split_df(
        os.path.join(once_cam_data_base, embedded_split), 
        embedded_split, 
        cam_seq_dict
    )
    get_split_df(
        os.path.join(once_lidar_data_base, embedded_split), 
        embedded_split, 
        lidar_seq_dict
    )
    get_split_df(
        os.path.join(once_annot_base, embedded_split), 
        embedded_split, 
        annot_seq_dict
    )
matching_sequence_ids = sorted(set(lanes_seq_dict) & set(cam_seq_dict) & set(lidar_seq_dict) & set(annot_seq_dict))

# Generate matches, getting sequences and frames
@dataclass
class SequenceMatch:
    sequence_id: str
    match_count: int
    match_ids: any
    sequence_info_path: str
    cam_folder_path: str
    lane_folder_path: str
    lidar_folder_path: str

def count_matching_frames(match):
    lane_ids = {os.path.splitext(f)[0] for f in os.listdir(match.lane_folder_path)
                if f.endswith(".json")}
    cam_ids = {os.path.splitext(f)[0] for f in os.listdir(match.cam_folder_path)
               if f.endswith(".jpg")}
    lidar_ids = {os.path.splitext(f)[0] for f in os.listdir(match.lidar_folder_path)
                 if f.endswith(".bin")}
    
    matching_ids = cam_ids & lane_ids & lidar_ids
    count = len(matching_ids)
    
    return count, matching_ids

matched_sequences = []
for seq_id in matching_sequence_ids:
    lane_folder_path = os.path.join(once_3dlanes_data_base, lanes_seq_dict[seq_id], seq_id, "cam01")
    cam_folder_path = os.path.join(once_cam_data_base, cam_seq_dict[seq_id], seq_id, "cam01")
    lidar_folder_path = os.path.join(once_lidar_data_base, lidar_seq_dict[seq_id], seq_id, "lidar_roof")
    sequence_info_path = os.path.join(once_annot_base, annot_seq_dict[seq_id], seq_id, f"{seq_id}.json")

    missing_files = [f for f in [lane_folder_path, cam_folder_path, lidar_folder_path, sequence_info_path] if not os.path.exists(f)]
    if missing_files:
        print("The following files could not be loaded:")
        for f in missing_files:
            print(f"  -", f)

    match = SequenceMatch(
        sequence_id=seq_id,
        match_count=-1,
        match_ids=None,
        sequence_info_path=sequence_info_path,
        cam_folder_path=cam_folder_path,
        lane_folder_path=lane_folder_path,
        lidar_folder_path=lidar_folder_path
    )
    match_count, match_ids = count_matching_frames(match)
    match.match_count = match_count
    match.match_ids = sorted(match_ids, key=int)
    matched_sequences.append(match)

print(f"Matching {len(matched_sequences)} sequence IDs and paths:")
for match in matched_sequences:
    print(f"sequence id: {match.sequence_id}\t\t# frames: {match.match_count}")

print(f"Total frames: {sum(m.match_count for m in matched_sequences)}")

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.
Matching 20 sequence IDs and paths:
sequence id: 000027		# frames: 1147
sequence id: 000028		# frames: 485
sequence id: 000034		# frames: 810
sequence id: 000076		# frames: 1028
sequence id: 000077		# frames: 610
sequence id: 000080		# frames: 961
sequence id: 000092		# frames: 583
sequence id: 000104		# frames: 615
sequence id: 000112		# frames: 1225
sequence id: 000113		# frames: 946
sequence id: 000121		# frames: 857
sequence id: 000168		# frames: 771
sequence id: 000200		# frames: 1049
sequence id: 000201		# frames: 468
sequence id: 000273		# frames: 960
sequence id: 000275		# frames: 747
sequence id: 000303		# frames: 923
sequence id: 000318		# frames: 770
sequence id: 000322		# frames: 766
sequence id: 000334		# frames: 524
Total frames: 16245


In [None]:
# Preload variables 
base_folder = config["output"]["base_folder"]
os.makedirs(base_folder, exist_ok=True)
output_images_folder = config["output"]["images"]
output_lidar_folder = config["output"]["lidar"]
output_detections_2d_folder = config["output"]["detections_2d"]
output_detections_3d_folder = config["output"]["detections_3d"]

ransac_a_priori_sensor_height = config["ground_plane_ransac"]["camera_height"]
ransac_prefilter_band_half_width =  config["ground_plane_ransac"]["prefilter_band_half_width"]
snap_curvature_threshold = config["projection_mapping"]["snap_curvature_threshold"]

override_existing_output = config["output"]["override_existing_output"]

### Math Functions

In [None]:
def prefilter_ground_plane(points, sensor_height=1.5, height_range=None, vertical_axis=0):
    """
    Prefilter a point cloud to keep only points near the expected ground plane height.

    Args:
        points (N,3) np array: Input 3D point cloud.
        sensor_height (float): Expected height of the sensor above the ground plane (approximate).
        height_range (float, optional): Acceptable deviation (±) from the expected ground level. Default to half sensor height.
        vertical_axis (int): Axis index representing "up" direction (0=x, 1=y, 2=z).

    Returns:
       (M,3) np array, the filtered point cloud containing only points near ground level.
    """
    if points is None or len(points) == 0:
        return np.empty((0, 3), dtype=np.float32)
    
    if height_range is None:
        height_range = sensor_height/2

    # Compute ground height range relative to sensor
    min_height = -sensor_height - height_range
    max_height = -sensor_height + height_range

    # Select points whose coordinate along the vertical axis is within range
    mask = (points[:, vertical_axis] >= min_height) & (points[:, vertical_axis] <= max_height)
    filtered_points = points[mask]

    return filtered_points.astype(np.float32)



def estimate_ground_plane(
    points,
    config,
    vertical_axis=0,
    random_state=None
):
    '''
    Estimate the ground plane from 3D points using a band filter + RANSAC + least-squares refit.

    Args:
        points (N,3) np array: Input 3D point cloud.
        config (dict): Configuration file.
        vertical_axis (int, optional): Axis index representing "up" direction. Default is 0 (x-axis).
        random_state (int, optional): Seed for random generator. Default is seedless.

    Returns:
        (3,) np array, plane normal unit vector.
        (float), plane offset d in n·x + d = 0.
        (N,3) np array, mask of inlier points.
    '''

    sensor_height = config["ground_plane_ransac"]["camera_height"]
    band_half_width = config["ground_plane_ransac"]["band_half_width"]
    ransac_iters = config["ground_plane_ransac"]["ransac_iters"]
    inlier_threshold = config["ground_plane_ransac"]["inlier_threshold"]
    min_inliers_for_accept = config["ground_plane_ransac"]["min_inliers_for_accept"]
    inlier_fallback_range_mult = config["ground_plane_ransac"]["inlier_fallback_range_mult"]
    plane_norm_degenerate_threshold = float(config["ground_plane_ransac"]["plane_norm_degenerate_threshold"])

    if random_state is None:
        rng = np.random.default_rng()
    else:
        rng = np.random.default_rng(random_state)

    pts = np.asarray(points)

    ground_coord = -sensor_height
    use_fallback_plane=False

    if pts.size == 0:
        use_fallback_plane=True
        print("[Error]: No points in lidar ransac around ground coordinate after prefunction filter")
    else:

        # Pre-filter: keep points within a vertical band around expected ground
        vert_vals = pts[:, vertical_axis]
        band_mask = np.logical_and(vert_vals >= (ground_coord - band_half_width),
                                vert_vals <= (ground_coord + band_half_width))

        candidate_idx = np.nonzero(band_mask)[0]
        if candidate_idx.size < min_inliers_for_accept:
            # too few candidates - relax band slightly (quick fallback)
            band_mask = np.logical_and(vert_vals >= (ground_coord - inlier_fallback_range_mult * band_half_width),
                                    vert_vals <= (ground_coord + inlier_fallback_range_mult * band_half_width))
            candidate_idx = np.nonzero(band_mask)[0]

        
        if candidate_idx.size < 3:
            # not enough points to fit a plane
            use_fallback_plane=True
            print(f"[Error]: Not enough candidate points for lidar ransac around ground coordinate after infunction filter: need 3, got {candidate_idx.size}")

    if not use_fallback_plane:
        cand_pts = pts[candidate_idx]

        best_inliers = None
        best_count = 0

        # RANSAC loop: sample 3 points, form plane, count inliers
        M = cand_pts.shape[0]
        for _ in range(ransac_iters):
            # pick 3 distinct random indices from candidates
            ids = rng.choice(M, size=3, replace=False)
            p0, p1, p2 = cand_pts[ids]

            # compute plane normal via cross product
            v1 = p1 - p0
            v2 = p2 - p0
            n = np.cross(v1, v2)
            norm_n = np.linalg.norm(n)
            if norm_n < plane_norm_degenerate_threshold:
                continue  # degenerate sample, skip

            n = n / norm_n
            d = -np.dot(n, p0)

            # point-to-plane distances for candidate points
            distances = np.abs(cand_pts.dot(n) + d)

            # count inliers
            mask_in = distances <= inlier_threshold
            count = int(mask_in.sum())

            if count > best_count:
                best_count = count
                best_inliers = candidate_idx[mask_in]

    if use_fallback_plane or best_count < min_inliers_for_accept: # failure case or not enough inliers

        # print appropriate error
        if best_count < min_inliers_for_accept:
            print(f"[Warning]: Not enough points on best ground plane to surpass the min_inliers threshold: needed {min_inliers_for_accept}, got {best_count}")
        print("[Warning]: Using fallback constant-height plane")
        
        # not confident, return fallback: use median around band as ground level with vertical normal
        # create default flat plane normal pointing up
        fallback_n = np.zeros(3)
        fallback_n[vertical_axis] = 1.0
        fallback_d = -ground_coord
        if pts.size == 0:
            inlier_mask = np.zeros(0, dtype=bool)
        else:
            inlier_mask = band_mask  # treat band as inliers
        return fallback_n, fallback_d, inlier_mask


    # non-degenerate: refit plane with all best inliers using LLS SVD
    inlier_pts = pts[best_inliers]
    centroid = inlier_pts.mean(axis=0)
    cov = inlier_pts - centroid
    _, _, vh = np.linalg.svd(cov, full_matrices=False)
    normal = vh.T[:, -1]
    # enforce normal to point up (positive along vertical axis), structure prior
    if normal[vertical_axis] < 0:
        normal = -normal
    d = -np.dot(normal, centroid)

    # compute final inlier mask (on the whole cloud) using threshold
    distances_all = np.abs(pts.dot(normal) + d)
    inlier_mask = distances_all <= inlier_threshold

    if d > 0:
        normal = -normal
        d = -d

    return normal, d, inlier_mask



def snap_points_to_plane_and_fit_line(points, plane_normal, plane_offset, curvature_threshold=0.05):
    """
    Projects 3D points onto a plane, fits a line (or a quadratic curve if curvature is high),
    and returns the closest points on the fitted model to each input point.

    Args:
        points (N,3) np array: input 3D points.
        plane_normal (3,) np array: plane normal.
        plane_offset (float): d in plane equation n·x + d = 0.
        curvature_threshold (float): RMS distance above which to switch to quadratic model (in m).

    Returns:
        (N,3) np array, points on best-fit line or curve.
        (str), model type, either "linear" or "quadratic".
        (dict), model coefficients and direction vectors.
    """

    # project points on plane
    pts = np.asarray(points)
    n = plane_normal / np.linalg.norm(plane_normal)
    distances = (pts @ n + plane_offset)
    projected_pts = pts - np.outer(distances, n)

    # make local 2D coordinate system on the plane 
    arbitrary = np.array([1, 0, 0]) if abs(n[0]) < 0.9 else np.array([0, 1, 0])
    plane_x = np.cross(n, arbitrary)
    plane_x /= np.linalg.norm(plane_x)
    plane_y = np.cross(n, plane_x)

    origin = projected_pts.mean(axis=0)
    local_2d = np.stack([
        (projected_pts - origin) @ plane_x,
        (projected_pts - origin) @ plane_y
    ], axis=1)
    x, y = local_2d[:, 0], local_2d[:, 1]

    # try linear model y = a*x + b
    A = np.vstack([x, np.ones_like(x)]).T
    a, b = np.linalg.lstsq(A, y, rcond=None)[0]
    dir_2d = np.array([1, a])
    dir_2d /= np.linalg.norm(dir_2d)

    # project points to linear model
    snapped_linear = []
    for p in local_2d:
        p_vec = p - np.array([0, b])
        t = np.dot(p_vec, dir_2d)
        proj_2d = np.array([t, a * t + b])
        proj_3d = origin + proj_2d[0]*plane_x + proj_2d[1]*plane_y
        snapped_linear.append(proj_3d)
    snapped_linear = np.vstack(snapped_linear)

    #  RMS residual error for linear model
    linear_rms_error = np.sqrt(np.mean(np.sum((projected_pts - snapped_linear)**2, axis=1)))

    # large linear error, use quadratic model y=a*(x^2) + b*x + c
    if linear_rms_error > curvature_threshold:
        with warnings.catch_warnings():
            warnings.simplefilter('ignore', np.RankWarning)
            coeffs = np.polyfit(x, y, 2)
        a2, b2, c2 = coeffs

        ts = np.linspace(x.min(), x.max(), 400)
        curve_pts = np.stack([ts, a2*ts**2 + b2*ts + c2], axis=1)
        snapped_quadratic = []
        for p in local_2d:
            idx = np.argmin(np.sum((curve_pts - p)**2, axis=1))
            proj_2d = curve_pts[idx]
            proj_3d = origin + proj_2d[0]*plane_x + proj_2d[1]*plane_y
            snapped_quadratic.append(proj_3d)
        snapped_quadratic = np.vstack(snapped_quadratic)
        quadratic_rms_error = np.sqrt(np.mean(np.sum((projected_pts - snapped_quadratic)**2, axis=1)))

        model_type = "quadratic"
        model_params = {
            "coeffs": (a2, b2, c2),
            "plane_x": plane_x,
            "plane_y": plane_y,
            "plane_origin": origin,
            "linear_rms_error": linear_rms_error,
            "quadratic_rms_error": quadratic_rms_error
        }
        return snapped_quadratic, model_type, model_params

    else:
        model_type = "linear"
        line_dir = dir_2d[0]*plane_x + dir_2d[1]*plane_y
        line_dir /= np.linalg.norm(line_dir)
        model_params = {
            "a": a,
            "b": b,
            "line_origin": origin + b*plane_y,
            "line_dir": line_dir,
            "linear_rms_error": linear_rms_error,
        }
        return snapped_linear, model_type, model_params



def project_detections_onto_image(projection, lane):
    """
    Project a 3D lane to 2D image pixels using a 3x4 projection matrix.
    
    Args:
        projection (3,4) np array: projection matrix (P = K [I|0])
        lane (N,3) np array: 3D points in camera coordinates (in m).
    
    Returns:
        (N,2) np array, pixel coordinates (u,v).
        (N,) np array, depth values (Z in camera frame)
    """

    lane_h = np.hstack([lane, np.ones((lane.shape[0], 1))])

    proj = (projection @ lane_h.T).T
    u = proj[:, 0] / proj[:, 2]
    v = proj[:, 1] / proj[:, 2]
    pixels = np.stack([u, v], axis=1)
    
    depths = lane[:, 2]
    
    return pixels, depths


### Main Processing Pipeline

Loop through all matched sequence ids and generate necessary data.

Preserves current state through override flag (can stop and return if no overriding). 

In [None]:
total_ids = sum(len(match.match_ids) for match in matched_sequences)
with tqdm(total=total_ids, desc="Processing match_ids") as inner_bar:
    for match in matched_sequences:

        # generate output folders
        seq_output_images_folder = os.path.join(base_folder, match.sequence_id, output_images_folder)
        seq_output_lidar_folder = os.path.join(base_folder, match.sequence_id, output_lidar_folder)
        seq_output_detections_2d_folder = os.path.join(base_folder, match.sequence_id, output_detections_2d_folder)
        seq_output_detections_3d_folder = os.path.join(base_folder, match.sequence_id, output_detections_3d_folder)
        for folder in [seq_output_images_folder, seq_output_lidar_folder, seq_output_detections_2d_folder, seq_output_detections_3d_folder]:
            os.makedirs(folder, exist_ok=True)

        # extract sequence calibration parameters
        with open(match.sequence_info_path, "r") as f:
            seq_info = json.load(f)
        image_size = seq_info["meta_info"]["image_size"]
        img_width, img_height = image_size[0], image_size[1]
        cam01_calib = seq_info["calib"]["cam01"]
        T_camera_lidar = np.array(cam01_calib["cam_to_velo"], dtype=np.float64)
        cam_intrinsic = np.array(cam01_calib["cam_intrinsic"], dtype=np.float64)
        fx = cam_intrinsic[0, 0]
        fy = cam_intrinsic[1, 1]
        cx = cam_intrinsic[0, 2]
        cy = cam_intrinsic[1, 2]
        distortion = np.array(cam01_calib["distortion"], dtype=np.float64)
        weather = seq_info["meta_info"]["weather"]
        tod = seq_info["meta_info"]["period"]


        # create calib file 
        T_applanix_lidar = np.eye(4, dtype=float)
        calibration_data = {
            "T_camera_lidar": T_camera_lidar.tolist(),
            "T_applanix_lidar": T_applanix_lidar.tolist(),
            "camera_intrinsics": {
                "fx": fx,
                "fy": fy,
                "cx": cx,
                "cy": cy,
                "distortion": distortion.tolist() if isinstance(distortion, np.ndarray) else distortion
            },
            "image_params": {
                "width": img_width,
                "height": img_height
            },
            "weather": weather,
            "period": tod
        }
        with open(os.path.join(base_folder, match.sequence_id, "calibration.json"), 'w') as f:
            json.dump(calibration_data, f, indent=4)


        for id in match.match_ids:
            # load data
            lidar_out_path = os.path.join(seq_output_lidar_folder, f"{id}.bin")
            img_out_path = os.path.join(seq_output_images_folder, f"{id}.png")
            detections_2d_path = os.path.join(seq_output_detections_2d_folder, f"{id}.txt")
            detections_3d_files = glob.glob(os.path.join(seq_output_detections_3d_folder, f"{id}_*.pcd"))
            if not override_existing_output:
                # skip existing output files
                existing_files = [lidar_out_path, img_out_path, detections_2d_path] + detections_3d_files
                if all(os.path.exists(f) for f in existing_files):
                    inner_bar.update(1)
                    continue

            # copy lidar to the output file, make 6 column for input formatting
            lidar_path = os.path.join(match.lidar_folder_path, f"{id}.bin")
            if not os.path.exists(lidar_path):
                print(f"Error: lidar path is not found: {lidar_path}")
            points = np.fromfile(lidar_path, dtype=np.float32).reshape(-1, 4)
            N = points.shape[0]
            ring_col = np.zeros((N, 1), dtype=np.float32)
            time_col = np.zeros((N, 1), dtype=np.float32)
            points_6d = np.hstack([points, ring_col, time_col])
            points_6d.tofile(lidar_out_path)


            # recreate images in output as png
            img_path = os.path.join(match.cam_folder_path, f"{id}.jpg")
            if not os.path.exists(img_path):
                print(f"Error: image path is not found {img_path}")
            img = Image.open(img_path)
            img.save(img_out_path, "PNG")


            # 2D inputs + 3D ground truths
                # data loading
            detections_path = os.path.join(match.lane_folder_path, f"{id}.json")
            if not os.path.exists(detections_path):
                print(f"Error: detections path is not found {detections_path}")
            
            with open(detections_path, 'r') as f:
                det = json.load(f)
            projection = np.array(det["calibration"], dtype=np.float64)

                # RANSAC (for correction)
            points_lidar_hom = np.hstack((points[:,:3], np.ones((points[:,:3].shape[0], 1))))
            points_lidar_camera = (T_camera_lidar @ points_lidar_hom.T).T[:,:3]
            perfiltered_points = prefilter_ground_plane(
                points_lidar_camera,
                -ransac_a_priori_sensor_height,
                ransac_prefilter_band_half_width,
                vertical_axis=1
            )
            plane_normal, plane_offset, ground_plane_mask = estimate_ground_plane(
                perfiltered_points,
                config,
                vertical_axis=0,
            )

            with open(detections_2d_path, 'w') as f:
                filepath_idx = 0
                for lane_points in det["lanes"]:

                    # fit to plane and line of best fit
                    lane = np.array(lane_points, dtype=np.float64)
                    lane_snapped, model_type, model_params = snap_points_to_plane_and_fit_line(
                        lane,
                        plane_normal,
                        plane_offset,
                        curvature_threshold=snap_curvature_threshold
                    )

                    # filter incorrect pixels (and corresponding 3d points)
                    pixels, _ = project_detections_onto_image(projection, lane_snapped)
                    mask = (pixels[:, 0] >= 0) & (pixels[:, 0] <= img_width) & (pixels[:, 1] >= 0) & (pixels[:, 1] <= img_height)
                    filtered_pixels = pixels[mask]
                    filtered_points = lane_snapped[mask]

                    # need at least 2 pixels
                    if filtered_pixels.shape[0] < 2:
                        # empty 2D file, no 3D file
                        continue

                    # write 3D
                    pcd_filename = os.path.join(seq_output_detections_3d_folder, f"{id}_{filepath_idx}.pcd")
                    pcd = o3d.geometry.PointCloud()
                    pcd.points = o3d.utility.Vector3dVector(filtered_points)
                    o3d.io.write_point_cloud(pcd_filename, pcd, write_ascii=True)

                    # write 2D
                    lane_str = ' '.join([f"{x:.2f},{y:.2f}" for x, y in filtered_pixels])
                    f.write(f"{lane_str} confidence: 0.9999\n")

                    filepath_idx += 1


            inner_bar.update(1)

    

Processing match_ids: 100%|██████████| 16245/16245 [1:23:50<00:00,  3.23it/s]
