In [None]:
from pathlib import Path
from tqdm import tqdm
import open3d as o3d
# run_path_folder = Path("/Users/username/Downloads/cl_top_half_24_fps")
# run1_path = Path("/Users/username/Downloads/cl_top_half_24_fps_55_to_65_chunk1")
# run2_path = Path("/Users/username/Downloads/cl_top_half_24_fps_55_to_65_chunk2")
# run_paths = [run1_path, run2_path]
run_paths = []
# num_chunks = 16
# for i in range(0, num_chunks):
#     run_paths.append(Path(f"/Users/username/Downloads/all_cl_24_fps_data/CL_TOP_HALF_24_FPS_CHUNK{i+1}.MP4"))
# run_paths

for i in range(2, 3):
    run_paths.append(Path(f"/Users/username/Downloads/clipped_data/cl_clip_00{i}"))
run_paths

In [None]:
from __future__ import annotations

import dataclasses
import os
from pathlib import Path
from typing import Tuple, cast

import imageio.v3 as iio
import numpy as np
import numpy as onp
import numpy.typing as onpt
import skimage.transform
from scipy.spatial.transform import Rotation

class Record3dLoader_Customized:
    """Helper for loading frames for Record3D captures."""

    def __init__(self, data_dir: Path, conf_threshold: float = 1.0, foreground_conf_threshold: float = 0.1, no_mask: bool = False, xyzw=True, init_conf=False):

        # Read metadata.
        intrinsics_path = data_dir / "pred_intrinsics.txt"
        intrinsics = np.loadtxt(intrinsics_path)

        self.K: onp.ndarray = np.array(intrinsics, np.float32).reshape(-1, 3, 3)
        fps = 30

        self.init_conf = init_conf

        poses_path = data_dir / "pred_traj.txt"
        poses = np.loadtxt(poses_path)
        self.T_world_cameras: onp.ndarray = np.array(poses, np.float32)
        self.T_world_cameras = np.concatenate(
            [
                # Convert TUM pose to SE3 pose
                Rotation.from_quat(self.T_world_cameras[:, 4:]).as_matrix() if not xyzw
                else Rotation.from_quat(np.concatenate([self.T_world_cameras[:, 5:], self.T_world_cameras[:, 4:5]], -1)).as_matrix(),
                self.T_world_cameras[:, 1:4, None],
            ],
            -1,
        )
        self.T_world_cameras = self.T_world_cameras.astype(np.float32)

        # Convert to homogeneous transformation matrices (ensure shape is (N, 4, 4))
        num_frames = self.T_world_cameras.shape[0]
        ones = np.tile(np.array([0, 0, 0, 1], dtype=np.float32), (num_frames, 1, 1))
        self.T_world_cameras = np.concatenate([self.T_world_cameras, ones], axis=1)

        self.fps = fps
        self.conf_threshold = conf_threshold
        self.foreground_conf_threshold = foreground_conf_threshold
        self.no_mask = no_mask

        # Read frames.
        self.rgb_paths = sorted(data_dir.glob("frame_*.png"), key=lambda p: int(p.stem.split("_")[-1]))
        self.depth_paths = sorted(data_dir.glob("frame_*.npy"), key=lambda p: int(p.stem.split("_")[-1]))
        if init_conf:
            self.init_conf_paths = sorted(data_dir.glob("init_conf_*.npy"), key=lambda p: int(p.stem.split("_")[-1]))
        else:
            self.init_conf_paths = []
        self.conf_paths = sorted(data_dir.glob("conf_*.npy"), key=lambda p: int(p.stem.split("_")[-1]))
        self.mask_paths = sorted(data_dir.glob("enlarged_dynamic_mask_*.png"), key=lambda p: int(p.stem.split("_")[-1]))

        # Remove the last frame since it does not have a ground truth dynamic mask
        self.rgb_paths = self.rgb_paths[:-1]

        # Align all camera poses by the first frame
        T0 = self.T_world_cameras[len(self.T_world_cameras) // 2]  # First camera pose (4x4 matrix)
        T0_inv = np.linalg.inv(T0)    # Inverse of the first camera pose

        # Apply T0_inv to all camera poses
        self.T_world_cameras = np.matmul(T0_inv[np.newaxis, :, :], self.T_world_cameras)


    def num_frames(self) -> int:
        return len(self.rgb_paths)

    def get_frame(self, index: int) -> Record3dFrame:

        # Read depth.
        depth = np.load(self.depth_paths[index])
        depth: onp.NDArray[onp.float32] = depth
        
        # Check if conf file exists, otherwise initialize with ones
        if len(self.conf_paths) == 0:
            conf = np.ones_like(depth, dtype=onp.float32)
        else:
            conf_path = self.conf_paths[index]
            if os.path.exists(conf_path):
                conf = np.load(conf_path)
                conf: onpt.NDArray[onp.float32] = conf
                # Clip confidence to avoid negative values
                conf = np.clip(conf, 0.0001, 99999)
            else:
                conf = np.ones_like(depth, dtype=onp.float32)

        # Check if init conf file exists, otherwise initialize with ones
        if len(self.init_conf_paths) == 0:  # If init conf is not available, use conf
            init_conf = conf
        else:
            init_conf_path = self.init_conf_paths[index]
            if os.path.exists(init_conf_path):
                init_conf = np.load(init_conf_path)
                init_conf: onpt.NDArray[onp.float32] = init_conf
                # Clip confidence to avoid negative values
                init_conf = np.clip(init_conf, 0.0001, 99999)
            else:
                init_conf = np.ones_like(depth, dtype=onp.float32)
        
        # Check if mask file exists, otherwise initialize with zeros
        if len(self.mask_paths) == 0:
            mask = np.ones_like(depth, dtype=onp.bool_)
        else:
            mask_path = self.mask_paths[index]
            if os.path.exists(mask_path):
                mask = iio.imread(mask_path) > 0
                mask: onpt.NDArray[onp.bool_] = mask
            else:
                mask = np.ones_like(depth, dtype=onp.bool_)

        if self.no_mask:
            mask = np.ones_like(mask).astype(np.bool_)

        # Read RGB.
        rgb = iio.imread(self.rgb_paths[index])
        # if 4 channels, remove the alpha channel
        if rgb.shape[-1] == 4:
            rgb = rgb[..., :3]

        return Record3dFrame(
            K=self.K[index],
            rgb=rgb,
            depth=depth,
            mask=mask,
            conf=conf,
            init_conf=init_conf,
            T_world_camera=self.T_world_cameras[index],
            conf_threshold=self.conf_threshold,
            foreground_conf_threshold=self.foreground_conf_threshold,
        )


@dataclasses.dataclass
class Record3dFrame:
    """A single frame from a Record3D capture."""

    K: onpt.NDArray[onp.float32]
    rgb: onpt.NDArray[onp.uint8]
    depth: onpt.NDArray[onp.float32]
    mask: onpt.NDArray[onp.bool_]
    conf: onpt.NDArray[onp.float32]
    init_conf: onpt.NDArray[onp.float32]
    T_world_camera: onpt.NDArray[onp.float32]
    conf_threshold: float = 1.0
    foreground_conf_threshold: float = 0.1

    def get_point_cloud(
        self, downsample_factor: int = 1, bg_downsample_factor: int = 1,
    ) -> Tuple[onpt.NDArray[onp.float32], onpt.NDArray[onp.uint8], onpt.NDArray[onp.float32], onpt.NDArray[onp.uint8]]:
        rgb = self.rgb[::downsample_factor, ::downsample_factor]
        depth = skimage.transform.resize(self.depth, rgb.shape[:2], order=0)
        mask = cast(
            onpt.NDArray[onp.bool_],
            skimage.transform.resize(self.mask, rgb.shape[:2], order=0),
        )
        assert depth.shape == rgb.shape[:2]

        K = self.K
        T_world_camera = self.T_world_camera

        img_wh = rgb.shape[:2][::-1]

        grid = (
            np.stack(np.meshgrid(np.arange(img_wh[0]), np.arange(img_wh[1])), 2) + 0.5
        )
        grid = grid * downsample_factor
        conf_mask = self.conf > self.conf_threshold
        if self.init_conf is not None:
            fg_conf_mask = self.init_conf > self.foreground_conf_threshold
        else:
            fg_conf_mask = self.conf > self.foreground_conf_threshold
        # reshape the conf mask to the shape of the depth
        conf_mask = skimage.transform.resize(conf_mask, depth.shape, order=0)
        fg_conf_mask = skimage.transform.resize(fg_conf_mask, depth.shape, order=0)

        # Foreground points
        homo_grid = np.pad(grid[fg_conf_mask & mask], ((0, 0), (0, 1)), constant_values=1)
        local_dirs = np.einsum("ij,bj->bi", np.linalg.inv(K), homo_grid)
        dirs = np.einsum("ij,bj->bi", T_world_camera[:3, :3], local_dirs)
        points = (T_world_camera[:3, 3] + dirs * depth[fg_conf_mask & mask, None]).astype(np.float32)
        point_colors = rgb[fg_conf_mask & mask]

        # Background points
        bg_homo_grid = np.pad(grid[conf_mask & ~mask], ((0, 0), (0, 1)), constant_values=1)
        bg_local_dirs = np.einsum("ij,bj->bi", np.linalg.inv(K), bg_homo_grid)
        bg_dirs = np.einsum("ij,bj->bi", T_world_camera[:3, :3], bg_local_dirs)
        bg_points = (T_world_camera[:3, 3] + bg_dirs * depth[conf_mask & ~mask, None]).astype(np.float32)
        bg_point_colors = rgb[conf_mask & ~mask]

        if bg_downsample_factor > 1 and bg_points.shape[0] > 0:
            indices = np.random.choice(
                bg_points.shape[0],
                size=bg_points.shape[0] // bg_downsample_factor,
                replace=False
            )
            bg_points = bg_points[indices]
            bg_point_colors = bg_point_colors[indices]
        return points, point_colors, bg_points, bg_point_colors


In [None]:
# same parameters as viser src code defaults
downsample_factor = 1
max_frames = 100
conf_threshold: float = 1
foreground_conf_threshold: float = 0.1
point_size: float = 0.001
camera_frustum_scale: float = 0.02
no_mask: bool = False
xyzw: bool = True
axes_scale: float = 0.25
bg_downsample_factor: int = 1
init_conf: bool = True
cam_thickness: float = 1.5

In [None]:
loaders = []
for run_path in run_paths:
    loader = Record3dLoader_Customized(
        data_dir=run_path,
        conf_threshold=conf_threshold,
        foreground_conf_threshold=foreground_conf_threshold,
        no_mask=no_mask,
        xyzw=xyzw,
        init_conf=init_conf
    )
    loaders.append(loader)
    print("Number of frames in this loader:", loader.num_frames())

In [None]:
# loader1, loader2, loader3 = loaders
# align_loader2_to_loader1(loader2, loader3)
# align_loader2_to_loader1(loader1, loader2)

In [None]:
# loaders = [Record]
# loader1 = Record3dLoader_Customized(
#         run1_path,
#         conf_threshold=conf_threshold,
#         foreground_conf_threshold=foreground_conf_threshold,
#         no_mask=no_mask,
#         xyzw=xyzw,
#         init_conf=init_conf,
#     )
# loader2 = Record3dLoader_Customized(
#         run2_path,
#         conf_threshold=conf_threshold,
#         foreground_conf_threshold=foreground_conf_threshold,
#         no_mask=no_mask,
#         xyzw=xyzw,
#         init_conf=init_conf,
#     )

In [None]:
import open3d as o3d
import matplotlib.pyplot as plt

def align_loader2_to_loader1(loader1: Record3dLoader_Customized, loader2: Record3dLoader_Customized):
    T1_last = loader1.T_world_cameras[-1]  # Last pose of loader1
    T2_first = loader2.T_world_cameras[0]  # First pose of loader2

    # Compute alignment transformation
    T_align = T1_last @ np.linalg.inv(T2_first)

    # Apply to all of loader2's poses
    loader2.T_world_cameras = np.einsum('ij,njk->nik', T_align, loader2.T_world_cameras)

In [None]:
import open3d as o3d

def visualize_trajectories(loaders):
    """
    Visualize camera‐center trajectories for a list of loader objects.
    Each loader must have a `T_world_cameras` array of shape (N, 4, 4).
    """
    # a simple palette—will wrap if you have more loaders than colors here
    palette = [
        [1.0, 0.0, 0.0],  # red
        [0.0, 0.0, 1.0],  # blue
        [0.0, 1.0, 0.0],  # green
        [1.0, 1.0, 0.0],  # yellow
        [1.0, 0.0, 1.0],  # magenta
        [0.0, 1.0, 1.0],  # cyan
    ]

    geometries = []
    for idx, loader in enumerate(loaders):
        # Extract the 3D camera centers
        traj = loader.T_world_cameras[:, :3, 3]
        color = palette[idx % len(palette)]

        # build point cloud
        pcd = o3d.geometry.PointCloud()
        pcd.points = o3d.utility.Vector3dVector(traj)
        pcd.paint_uniform_color(color)
        geometries.append(pcd)

        # build line set to connect consecutive centers
        lines = [[i, i + 1] for i in range(len(traj) - 1)]
        colors = [color for _ in lines]
        ls = o3d.geometry.LineSet()
        ls.points = o3d.utility.Vector3dVector(traj)
        ls.lines = o3d.utility.Vector2iVector(lines)
        ls.colors = o3d.utility.Vector3dVector(colors)
        geometries.append(ls)

    o3d.visualization.draw_geometries(
        geometries,
        window_name="Aligned Trajectories",
        width=800,
        height=600
    )

In [None]:
for j in tqdm(range(1)):
    # print("first pass")
    for i in range(len(loaders) - 1):
        # print(f"Aligning loader {i+1} to loader {i+2}")
        align_loader2_to_loader1(loaders[i], loaders[i + 1])
    align_loader2_to_loader1(loaders[len(loaders) - 1], loaders[0])

In [None]:
visualize_trajectories(loaders)

In [None]:
def loader_to_pcd(loader):
    num_frames = loader.num_frames()
    all_positions = []
    all_colors = []

    bg_positions = []
    bg_colors = []
    for i in tqdm(range(num_frames)):
        frame = loader.get_frame(i)
        position, color, bg_position, bg_color = frame.get_point_cloud(downsample_factor, bg_downsample_factor)

        all_positions.append(position)
        all_colors.append(color)

        bg_positions.append(bg_position)
        bg_colors.append(bg_color)

    all_positions = onp.concatenate(all_positions + bg_positions, axis=0)
    all_colors = onp.concatenate(all_colors + bg_colors, axis=0)

    final_point_cloud = {
        "points": all_positions,
        "colors": all_colors,
    }
    
    pcd = o3d.geometry.PointCloud()
    points = final_point_cloud["points"].astype(np.float32)
    colors = final_point_cloud["colors"].astype(np.float32)
    if colors.max() > 1.0:
        colors /= 255.0
    pcd.points = o3d.utility.Vector3dVector(points)
    pcd.colors = o3d.utility.Vector3dVector(colors)
    return pcd

In [None]:
pcds = [loader_to_pcd(loader) for loader in loaders]

In [None]:
o3d.visualization.draw_geometries(pcds, window_name="Aligned Point Clouds", width=2400, height=1720)

In [None]:
def preprocess(pcd, voxel_size):
    # 1) Downsample
    pcd_down = pcd
    pcd_down = pcd.voxel_down_sample(voxel_size)

    # 2) Estimate normals
    pcd_down.estimate_normals(
        search_param=o3d.geometry.KDTreeSearchParamHybrid(
            radius=voxel_size * 2.0, max_nn=30))

    # 3) Compute FPFH features
    fpfh = o3d.pipelines.registration.compute_fpfh_feature(
        pcd_down,
        o3d.geometry.KDTreeSearchParamHybrid(
            radius=voxel_size * 5.0, max_nn=100))
    return pcd_down, fpfh

In [None]:
print(pcd1)
print(pcd2)

In [None]:
voxel_size = 0.05
ransac_dist = voxel_size * 2.0
icp_dist    = voxel_size * 0.4

In [None]:
pcd1, fpfh1 = preprocess(pcd1, voxel_size)
pcd2, fpfh2 = preprocess(pcd2, voxel_size)

pcd1.estimate_normals(
    search_param=o3d.geometry.KDTreeSearchParamHybrid(
        radius=voxel_size * 2.0,  # same radius you used for FPFH normals
        max_nn=30
    )
)
pcd2.estimate_normals(
    search_param=o3d.geometry.KDTreeSearchParamHybrid(
        radius=voxel_size * 2.0,
        max_nn=30
    )
)

In [None]:
print(pcd1)
print(pcd2)

In [None]:
result_ransac = o3d.pipelines.registration.registration_ransac_based_on_feature_matching(
    source=pcd2, target=pcd1,
    source_feature=fpfh2,  target_feature=fpfh1,
    mutual_filter=True,
    max_correspondence_distance=ransac_dist,
    estimation_method=o3d.pipelines.registration.TransformationEstimationPointToPoint(False),
    ransac_n=4,
    checkers=[
        o3d.pipelines.registration.CorrespondenceCheckerBasedOnEdgeLength(0.9),
        o3d.pipelines.registration.CorrespondenceCheckerBasedOnDistance(ransac_dist)
    ],
    criteria=o3d.pipelines.registration.RANSACConvergenceCriteria(4000000, 500)
)
print("RANSAC fitness:", result_ransac.fitness,
      "inlier_rmse:", result_ransac.inlier_rmse)


In [None]:
result_icp = o3d.pipelines.registration.registration_icp(
    source=pcd2,                 # the cloud you want to align
    target=pcd1,                 # the reference cloud
    max_correspondence_distance=icp_dist,
    init=result_ransac.transformation,
    estimation_method=o3d.pipelines.registration.TransformationEstimationPointToPlane()
)

In [None]:
print(f"ICP fitness: {result_icp.fitness:.4f}, RMSE: {result_icp.inlier_rmse:.4f}")

In [None]:
pcd2.transform(result_icp.transformation)
merged = pcd1 + pcd2
o3d.io.write_point_cloud("merged_icp.ply", merged)
o3d.visualization.draw_geometries([merged])

In [None]:
# Apply it
T = np.eye(4, dtype=np.float64)
T[0, 3] = 1.0   # x shift
T[1, 3] = 0.5   # y shift
pcd2_transformed = pcd2.transform(T)

# ——— Visualize to sanity‑check ———
o3d.visualization.draw_geometries(
    [pcd1, pcd2_transformed],
    window_name="Debug: Hard‑coded transform",
    width=800, height=600
)

In [None]:
pcdpcd2.transform(T)
merged = pcd1 + pcd2

In [None]:
o3d.io.write_point_cloud("merged_ransac_only.ply", merged)
print("Saved merged point cloud to merged_ransac_only.ply")

o3d.visualization.draw_geometries(
    [merged],
    window_name="Merged (RANSAC only)",
    width=1024, height=768,
    mesh_show_back_face=True
)