# Robust 3D Alignment: RANSAC + ICP

## The Problem
Standard ICP fails when rotation differences are large (>30 degrees). It gets stuck in local minima, resulting in massive translation errors and poor fitness scores.

## The Solution
We implement a two-step pipeline:
1.  **Global Registration (RANSAC):** Uses FPFH (Fast Point Feature Histograms) to match geometric features independent of rotation. This brings the clouds roughly together.
2.  **Local Refinement (Point-to-Plane ICP):** Once roughly aligned, ICP snaps the surfaces together perfectly.

---
## Error Calculation
We verify the result by comparing the inverse of our calculated transformation ($T_{calc}^{-1}$) with the Blender Ground Truth ($T_{gt}$).

In [39]:
import open3d as o3d
import pandas as pd
import numpy as np
import os
import time
import math
import copy

print(f"Open3D version: {o3d.__version__}")

Open3D version: 0.19.0


In [None]:
# --- CONFIGURATION ---
# DATA_ROOT = r"C:\Users\RobinSchool\Stichting Hogeschool Utrecht\MNLE Imagine Project - Documents\DataProgram\Blender_Generated_Data"
DATA_ROOT = "../Blender_Generated_Data"
SCANS_DIR = os.path.join(DATA_ROOT, "scans")
INDEX_FILE = os.path.join(DATA_ROOT, "dataset_index.csv")

# Voxel size determines the 'resolution' for the coarse RANSAC step.
# If your object is 2x2x2 meters, 0.05 (5cm) is good.
# If your object is very small (10cm), set this to 0.005.
VOXEL_SIZE = 0.01 

## Helper Functions (Feature Extraction)

In [41]:
def preprocess_point_cloud(pcd, voxel_size):
    """
    1. Downsample (for performance in RANSAC)
    2. Estimate Normals
    3. Compute FPFH Features (Geometric fingerprints)
    """
    # Downsample
    pcd_down = pcd.voxel_down_sample(voxel_size)

    # Estimate Normals
    pcd_down.estimate_normals(
        o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * 2, max_nn=30))

    # Compute FPFH Features
    pcd_fpfh = o3d.pipelines.registration.compute_fpfh_feature(
        pcd_down,
        o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * 5, max_nn=100))
    
    return pcd_down, pcd_fpfh

def execute_global_registration(source_down, target_down, source_fpfh, target_fpfh, voxel_size):
    """RANSAC: Rough Alignment based on features."""
    distance_threshold = voxel_size * 1.5
    
    result = o3d.pipelines.registration.registration_ransac_based_on_feature_matching(
        source_down, target_down, source_fpfh, target_fpfh, True,
        distance_threshold,
        o3d.pipelines.registration.TransformationEstimationPointToPoint(False),
        3, # Take 3 points to propose a match
        [o3d.pipelines.registration.CorrespondenceCheckerBasedOnEdgeLength(0.9),
         o3d.pipelines.registration.CorrespondenceCheckerBasedOnDistance(distance_threshold)],
        o3d.pipelines.registration.RANSACConvergenceCriteria(100000, 0.999)
    )
    return result

def refine_registration(source, target, initial_transform, voxel_size):
    """ICP: Fine Alignment using Point-to-Plane."""
    distance_threshold = voxel_size * 0.4
    
    # Ensure normals are computed on original high-res clouds
    source.estimate_normals(o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size*2, max_nn=30))
    target.estimate_normals(o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size*2, max_nn=30))
    
    result = o3d.pipelines.registration.registration_icp(
        source, target, distance_threshold, initial_transform,
        o3d.pipelines.registration.TransformationEstimationPointToPlane()
    )
    return result

# --- MATH HELPERS ---
def get_rotation_error(R_est, R_gt):
    R_diff = np.dot(R_est, R_gt.T)
    tr = np.clip(np.trace(R_diff), -1, 3)
    return math.degrees(math.acos((tr - 1) / 2))

def get_translation_error(t_est, t_gt):
    return np.linalg.norm(t_est - t_gt)

def parse_ground_truth_matrix(row):
    cols = [f"m{r}{c}" for r in range(4) for c in range(4)]
    flat = row[cols].values.astype(float)
    matrix = flat.reshape(4, 4)
    # Normalize rotation columns
    for col in range(3):
        norm = np.linalg.norm(matrix[:3, col])
        if norm > 0: matrix[:3, col] /= norm
    return matrix

def load_csv_pcd(filepath):
    try:
        df = pd.read_csv(filepath)
        pcd = o3d.geometry.PointCloud()
        pcd.points = o3d.utility.Vector3dVector(df[['X', 'Y', 'Z']].values)
        return pcd
    except: return None

## Main Pipeline Loop

In [42]:
if os.path.exists(INDEX_FILE):
    df_index = pd.read_csv(INDEX_FILE)
    
    # 1. Load Reference (Target)
    ref_row = df_index.iloc[0]
    ref_path = os.path.join(SCANS_DIR, ref_row['filename'])
    pcd_target = load_csv_pcd(ref_path)
    
    # Preprocess Target for RANSAC
    target_down, target_fpfh = preprocess_point_cloud(pcd_target, VOXEL_SIZE)
    
    results = []
    print("Starting Robust Alignment (RANSAC + ICP)...")

    # Loop
    for idx, row in df_index.iloc[1:].iterrows():
        filepath = os.path.join(SCANS_DIR, row['filename'])
        pcd_source = load_csv_pcd(filepath)
        if pcd_source is None: continue

        # --- STEP 1: Global Registration (RANSAC) ---
        # We need downsampled features to match rotational variants
        source_down, source_fpfh = preprocess_point_cloud(pcd_source, VOXEL_SIZE)
        
        ransac_result = execute_global_registration(
            source_down, target_down, source_fpfh, target_fpfh, VOXEL_SIZE)
        
        # --- STEP 2: Local Refinement (ICP) ---
        # Use RANSAC result as starting point
        icp_result = refine_registration(
            pcd_source, pcd_target, ransac_result.transformation, VOXEL_SIZE)
        
        # --- STEP 3: Validate against Ground Truth ---
        T_icp = icp_result.transformation
        T_gt = parse_ground_truth_matrix(row)
        
        # Invert ICP result to compare with GT
        try:
            T_est = np.linalg.inv(T_icp)
            
            err_rot = get_rotation_error(T_est[:3, :3], T_gt[:3, :3])
            err_trans = get_translation_error(T_est[:3, 3], T_gt[:3, 3])
            
            results.append({
                'id': row['id'],
                'fitness': icp_result.fitness,
                'rmse': icp_result.inlier_rmse,
                'rot_error': err_rot,
                'trans_error': err_trans,
                'matrix_icp': T_icp
            })
            
            if idx % 1 == 0:
                print(f"Sample {idx}: Fit={icp_result.fitness:.2f}, RotErr={err_rot:.2f} deg, TransErr={err_trans:.4f} m")
                
        except Exception as e:
            print(f"Matrix Error Sample {idx}: {e}")

    # --- REPORT ---
    df_res = pd.DataFrame(results)
    print("\n=== FINAL REPORT ===")
    print(f"Mean Fitness:         {df_res['fitness'].mean():.5f}")
    print(f"Mean Rotation Error:  {df_res['rot_error'].mean():.5f} deg")
    print(f"Mean Trans Error:     {df_res['trans_error'].mean():.5f} m")
    
else:
    print("Index file not found.")

Starting Robust Alignment (RANSAC + ICP)...
Sample 1: Fit=1.00, RotErr=0.00 deg, TransErr=0.0000 m
Sample 2: Fit=1.00, RotErr=0.00 deg, TransErr=0.0000 m
Sample 3: Fit=1.00, RotErr=0.00 deg, TransErr=0.0000 m
Sample 4: Fit=1.00, RotErr=0.00 deg, TransErr=0.0001 m
Sample 5: Fit=1.00, RotErr=0.00 deg, TransErr=0.0000 m
Sample 6: Fit=1.00, RotErr=0.00 deg, TransErr=0.0000 m
Sample 7: Fit=1.00, RotErr=0.00 deg, TransErr=0.0000 m
Sample 8: Fit=1.00, RotErr=0.00 deg, TransErr=0.0000 m
Sample 9: Fit=1.00, RotErr=0.00 deg, TransErr=0.0000 m
Sample 10: Fit=1.00, RotErr=0.00 deg, TransErr=0.0000 m
Sample 11: Fit=1.00, RotErr=0.00 deg, TransErr=0.0000 m
Sample 12: Fit=1.00, RotErr=0.00 deg, TransErr=0.0000 m
Sample 13: Fit=1.00, RotErr=90.00 deg, TransErr=0.0000 m
Sample 14: Fit=1.00, RotErr=0.01 deg, TransErr=0.0000 m
Sample 15: Fit=1.00, RotErr=0.00 deg, TransErr=0.0000 m
Sample 16: Fit=1.00, RotErr=0.00 deg, TransErr=0.0000 m
Sample 17: Fit=1.00, RotErr=0.00 deg, TransErr=0.0000 m
Sample 18: F

In [43]:
def visualize_result(sample_id):
    row = df_res[df_res['id'] == sample_id]
    if row.empty: return
    
    filename = df_index[df_index['id'] == sample_id].iloc[0]['filename']
    icp_matrix = row.iloc[0]['matrix_icp']
    
    # Load
    pcd_src = load_csv_pcd(os.path.join(SCANS_DIR, filename))
    pcd_src.estimate_normals()
    
    # Apply Transform
    pcd_src.transform(icp_matrix)
    
    # Colors: Grey=Ref, Red=Aligned
    pcd_src.paint_uniform_color([1, 0, 0])
    pcd_target.paint_uniform_color([0.5, 0.5, 0.5])
    
    o3d.visualization.draw_geometries([pcd_target, pcd_src], window_name=f"Sample {sample_id}")

# Check a sample
visualize_result(5)

In [44]:
visualize_result(12)

In [None]:
visualize_result(13)

In [46]:
visualize_result(46)