Import necessary libraries

In [None]:
import cv2  # OpenCV for image processing
import numpy as np  # NumPy for numerical operations
import yaml  # YAML parsing
import matplotlib.pyplot as plt  # For plotting results
import os  # For file system operations
import re  # For regular expressions
from mpl_toolkits.mplot3d import Axes3D  # For 3D plotting
import open3d as o3d  # Open3D for point cloud operations and ICP

Custom YAML constructor for OpenCV matrices (used to read matrices from YAML files)

In [None]:
def opencv_matrix_constructor(loader, node):
    mapping = loader.construct_mapping(node, deep=True)
    rows = mapping['rows']
    cols = mapping['cols']
    data = mapping['data']
    return np.array(data, dtype=np.float32).reshape(rows, cols)

# Register the custom constructor to handle opencv-matrix in YAML files
yaml.add_constructor('tag:yaml.org,2002:opencv-matrix', opencv_matrix_constructor, Loader=yaml.SafeLoader)

Load ground truth data (rotation and translation matrices) from a YAML file

In [None]:
def load_yaml(file_path):
    try:
        with open(file_path, 'r') as file:
            content = file.read()
            # Remove the YAML version directive from the content
            content = '\n'.join([line for line in content.splitlines() if not line.startswith('%YAML:1.0')])
            yaml_content = yaml.safe_load(content)
            
            # Validate if required fields are present in the YAML content
            if not yaml_content or not all(key in yaml_content for key in ["object_rotation_wrt_camera", "object_translation_wrt_camera"]):
                print(f"❌ Error: Invalid YAML file {file_path}")
                return None, None

            # Extract the rotation and translation matrices
            R_gt = yaml_content["object_rotation_wrt_camera"]
            T_gt = np.array(yaml_content["object_translation_wrt_camera"], dtype=np.float32).reshape(3, 1)

            # Validate the rotation matrix
            if np.any(np.isnan(T_gt)) or np.any(np.isinf(T_gt)) or not is_valid_rotation_matrix(R_gt):
                print(f"❌ Error: Invalid ground truth data in {file_path}")
                return None, None

            print(f"✅ Successfully loaded: {file_path}")
            return R_gt, T_gt
    except Exception as e:
        print(f"❌ Error loading ground truth for {file_path}: {str(e)}")
        return None, None

Load 3D object models (in .obj format) from the specified directory

In [None]:
def load_obj_files(obj_model_dir, max_objects=25):
    object_points_all = []
    for obj_file in [f for f in os.listdir(obj_model_dir) if f.endswith('.obj')][:max_objects]:
        try:
            # Read vertices from the .obj file
            vertices = [list(map(float, line.strip().split()[1:4])) for line in open(os.path.join(obj_model_dir, obj_file)) if line.startswith('v ')]
            if vertices:
                object_name = os.path.splitext(obj_file)[0]  # Extract object name without the .obj extension
                object_points_all.append((np.array(vertices, dtype=np.float32), object_name))
                print(f"✅ Loaded {obj_file} with {len(vertices)} vertices as '{object_name}'")
        except Exception as e:
            print(f"❌ Error loading {obj_file}: {str(e)}")
    return object_points_all

Compute descriptors for the 3D object model

In [None]:
def compute_object_descriptors(object_points, camera_matrix, dist_coeffs):
    descriptors_all = []
    for angle in range(0, 360, 30):  # Generate descriptors from multiple viewpoints (every 30 degrees)
        R, _ = cv2.Rodrigues(np.radians(angle) * np.array([0, 1, 0]))  # Rotation around the Y-axis
        image_points, _ = cv2.projectPoints(object_points, R, np.zeros((3, 1)), camera_matrix, dist_coeffs)
        rendered_image = np.zeros((480, 640), dtype=np.uint8)
        for (x, y) in image_points.reshape(-1, 2):
            if 0 <= x < 640 and 0 <= y < 480:
                cv2.circle(rendered_image, (int(x), int(y)), 3, 255, -1)
        sift = cv2.SIFT_create(nfeatures=10000, contrastThreshold=0.01, edgeThreshold=10)
        keypoints, descriptors = sift.detectAndCompute(rendered_image, None)
        if descriptors is not None:
            descriptors_all.extend(descriptors)
    return np.array(descriptors_all)[:len(object_points)]  # Return only as many descriptors as there are object points

Helper function to check if a rotation matrix is valid

In [None]:
def is_valid_rotation_matrix(R):
    if R is None or R.shape != (3, 3):
        return False
    det_valid = np.isclose(np.linalg.det(R), 1.0, atol=1e-6)  # Ensure determinant is close to 1
    ortho_valid = np.linalg.norm(R @ R.T - np.eye(3)) < 1e-6  # Ensure orthogonality
    return det_valid and ortho_valid

Compute the rotational error and translation error 

In [None]:
#Function to compute the rotational error between 2 rotation matrices 
def rotation_error(R_est, R_gt):
    if R_est is None or R_gt is None or not is_valid_rotation_matrix(R_est) or not is_valid_rotation_matrix(R_gt):
        print("❌ Error: Invalid rotation matrices.")
        return None
    cos_theta = (np.trace(np.dot(R_est, R_gt.T)) - 1) / 2  # Compute cosine of the angle between the two rotation matrices
    return np.degrees(np.arccos(np.clip(cos_theta, -1.0, 1.0)))  # Convert to degrees

# Function to compute the translational error between two translation vectors
def translation_error(T_est, T_gt):
    if T_est is None or T_gt is None or np.any(np.isnan(T_est)) or np.any(np.isinf(T_est)) or np.any(np.isnan(T_gt)) or np.any(np.isinf(T_gt)):
        print("❌ Error: Invalid translation vectors.")
        return None
    return np.linalg.norm(T_est.flatten() - T_gt.flatten())  # Return the L2 norm of the difference

Apply a mask to the image and depth data

In [None]:
def apply_mask_to_depth_and_image(image, depth, mask):
    image_masked = cv2.bitwise_and(image, image, mask=mask)  # Apply mask to the image
    depth_masked = cv2.bitwise_and(depth, depth, mask=mask)  # Apply mask to the depth
    return image_masked, depth_masked

Visualize 3D object points in the image (for debugging)

In [None]:
def visualize_points(image, image_points, object_points_for_pnp):
    object_points_for_pnp = object_points_for_pnp.reshape(-1, 3)  # Flatten the 3D object points to a 2D array (N, 3)
    assert object_points_for_pnp.shape[1] == 3, f"Expected 3D object points, got shape {object_points_for_pnp.shape}"

Preprocess images

In [None]:
#Function to enhance image contrast using CLAHE (Contrast Limited Adaptive Histogram Equalization)
def enhance_contrast(image):
    if len(image.shape) == 3:
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)  # Convert to grayscale if the image is in color
    else:
        gray = image  # Use the image as is if it's already grayscale
    
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))  # Create CLAHE object
    enhanced = clahe.apply(gray)  # Apply CLAHE to enhance contrast
    return enhanced

# Function to reduce noise in the image using Gaussian Blur
def reduce_noise(image):
    blurred = cv2.GaussianBlur(image, (5, 5), 0)  # Apply Gaussian blur to reduce noise
    return blurred

# Function to enhance edges in the image using Canny edge detection
def enhance_edges(image):
    edges = cv2.Canny(image, 100, 200)  # Apply Canny edge detection
    return edges

# Function to sharpen the image using a convolution kernel
def sharpen_image(image):
    kernel = np.array([[0, -1, 0], [-1, 5, -1], [0, -1, 0]])  # Define sharpening kernel
    sharpened = cv2.filter2D(image, -1, kernel)  # Apply the kernel to sharpen the image
    return sharpened

# Function to refine the mask using morphological operations (open and close)
def refine_mask(mask):
    kernel = np.ones((5, 5), np.uint8)  # Define kernel for morphological operations
    mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)  # Remove small holes and noise
    mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)  # Fill gaps in the mask
    return mask

# Function to normalize the depth map
def normalize_depth(depth):
    depth_normalized = cv2.normalize(depth, None, 0, 255, cv2.NORM_MINMAX)  # Normalize depth to range [0, 255]
    return depth_normalized

# Function to resize the image to the given dimensions
def resize_image(image, width, height):
    resized = cv2.resize(image, (width, height), interpolation=cv2.INTER_AREA)  # Resize using area interpolation
    return resized

In [None]:
# Function to preprocess image, mask, and depth for further processing
def preprocess_image(image, mask, depth):
    image = enhance_contrast(image)  # Enhance image contrast
    image = reduce_noise(image)  # Reduce noise in the image
    image = sharpen_image(image)  # Sharpen the image
    mask = refine_mask(mask)  # Refine the mask
    depth = normalize_depth(depth)  # Normalize the depth map
    return image, mask, depth

Estimate object pose (rotation and translation) using the PnP algorithm

In [None]:
def estimate_pose_with_pnp(image, object_points, camera_matrix, dist_coeffs, mask, depth=None):
    image, mask, depth = preprocess_image(image, mask, depth)  # Preprocess the image, mask, and depth
    image_masked, depth_masked = apply_mask_to_depth_and_image(image, depth, mask)  # Apply the mask to image and depth

    image_points = []
    object_points_for_pnp = []
    good_matches = []

    # Use SIFT to extract keypoints and descriptors
    sift = cv2.SIFT_create(nfeatures=10000, contrastThreshold=0.01, edgeThreshold=10)
    keypoints_image, descriptors_image = sift.detectAndCompute(image_masked, None)

    if descriptors_image is None or len(keypoints_image) == 0:
        print(f"❌ No keypoints found in the image. {len(keypoints_image)}")
        return None, None

    # Compute object descriptors (projecting object points to 2D from multiple viewpoints)
    object_descriptors = compute_object_descriptors(object_points, camera_matrix, dist_coeffs)
    if object_descriptors is None:
        print("❌ Error: Failed to compute object descriptors.")
        return None, None

    # Perform FLANN-based matching of keypoints between the image and object descriptors
    index_params = dict(algorithm=1, trees=10)
    search_params = dict(checks=100)
    flann = cv2.FlannBasedMatcher(index_params, search_params)

    matches = flann.knnMatch(descriptors_image, object_descriptors, k=2)
    good_matches = [m for m, n in matches if m.distance < 0.95 * n.distance]

    # Ensure there are enough good matches for pose estimation
    if len(good_matches) < 6:
        print(f"❌ Not enough valid matches found: {len(good_matches)} (need at least 6)")
        return None, None

    # Extract image points and corresponding object points for PnP
    for match in good_matches:
        if match.trainIdx < len(object_points):  # Ensure that the index is within bounds
            x, y = keypoints_image[match.queryIdx].pt
            if mask[int(y), int(x)] > 0:  # Ensure the keypoint is within the masked region
                image_points.append([x, y])
                object_points_for_pnp.append(object_points[match.trainIdx])
        else:
            print(f"❌ Invalid match: trainIdx {match.trainIdx} is out of bounds (max index {len(object_points) - 1})")

    # If there are fewer than 4 valid points, pose estimation cannot proceed
    if len(image_points) < 4 or len(object_points_for_pnp) < 4:
        print(f"❌ Not enough valid points for PnP: {len(image_points)}")
        return None, None

    image_points = np.array(image_points)
    object_points_for_pnp = np.array(object_points_for_pnp)

    # Perform PnP to estimate object pose
    retval, rvec, tvec = cv2.solvePnP(object_points_for_pnp, image_points, camera_matrix, dist_coeffs)

    if not retval:
        print("❌ PnP pose estimation failed.")
        return None, None

    R_est, _ = cv2.Rodrigues(rvec)  # Convert the rotation vector to a rotation matrix

    return R_est, tvec

Function to find matching mask and depth files based on the base name of the query image

In [None]:
def find_matching_files(base_name, directory):
    # List all files in the directory that start with the base name and contain 'mask' and 'depth' respectively
    mask_files = [f for f in os.listdir(directory) if f.startswith(base_name) and 'mask' in f]
    depth_files = [f for f in os.listdir(directory) if f.startswith(base_name) and 'depth' in f]
    return mask_files, depth_files

Main function to process images and compute pose estimation errors

In [None]:
def process_images(image_dir, obj_model_dir, camera_matrix, dist_coeffs):
    # Load 3D object points from .obj files (up to 10 objects)
    object_points_all = load_obj_files(obj_model_dir, max_objects=10)
    if not object_points_all:
        print("❌ No valid OBJ files found.")
        return

    # List query images (only '.png' files with 'image' in their name)
    query_images = [f for f in os.listdir(image_dir) if f.endswith('.png') and 'image' in f]
    if not query_images:
        print(f"❌ No query images found in {image_dir}.")
        return

    # List YAML files that contain ground truth data for pose estimation
    yml_files = [f for f in os.listdir(image_dir) if f.endswith('.yml')]
    if not yml_files:
        print(f"❌ No YAML files found in {image_dir}.")
        return

    # Initialize storage for errors (translation, rotation, and clutter levels) for each object
    object_errors = {i: {"trans_errors": [], "rot_errors": [], "clutter_levels": []} for i in range(len(object_points_all))}

    # Process each query image one by one
    for query_image_file in query_images:
        query_image_path = os.path.join(image_dir, query_image_file)
        query_image = cv2.imread(query_image_path)

        # Check if the query image was successfully read
        if query_image is None:
            print(f"❌ Error reading query image: {query_image_file}")
            continue

        # Extract base name of the query image for matching mask, depth, and ground truth
        base_name = re.sub(r'-image-.*', '', query_image_file)
        object_name = base_name

        # Find corresponding object points based on the base name (match object model)
        for idx, (points, name) in enumerate(object_points_all):
            if name == object_name:
                object_points = points
                obj_idx = idx
                break
        else:
            print(f"❌ No matching object model found for {object_name}")
            continue

        # Find corresponding mask and depth files based on the base name
        mask_files, depth_files = find_matching_files(base_name, image_dir)
        if not mask_files or not depth_files:
            print(f"❌ No matching mask or depth files found for {query_image_file}")
            continue

        # Load the mask and depth images
        mask_image_path = os.path.join(image_dir, mask_files[0])
        depth_image_path = os.path.join(image_dir, depth_files[0])
        mask = cv2.imread(mask_image_path, cv2.IMREAD_GRAYSCALE)
        depth = cv2.imread(depth_image_path, cv2.IMREAD_ANYDEPTH)

        # Check if mask and depth images were successfully loaded
        if mask is None or depth is None:
            print(f"❌ Error reading mask or depth image for {query_image_file}")
            continue

        # Find the corresponding YAML file for the query image
        matching_yml_file = next((yml_file for yml_file in yml_files if base_name in yml_file), None)
        if not matching_yml_file:
            print(f"❌ No matching YML file found for {query_image_file}")
            continue

        # Load the ground truth pose (rotation and translation) from the YAML file
        R_gt, T_gt = load_yaml(os.path.join(image_dir, matching_yml_file))
        if R_gt is None or T_gt is None:
            print(f"❌ Error loading ground truth for {query_image_file}")
            continue

        # Extract clutter level from the filename (index 4 corresponds to the clutter level in the filename)
        clutter_level = int(query_image_file.split('-')[4])
        if clutter_level == 1:
            clutter_description = "No Clutter"
        elif clutter_level == 2:
            clutter_description = "1 Clutter Item"
        elif clutter_level == 3:
            clutter_description = "2 Clutter Items"
        else:
            print(f"❌ Invalid clutter level in filename: {query_image_file}")
            continue

        # Estimate the object pose using the query image, object points, camera matrix, and depth information
        R_est, T_est = estimate_pose_with_pnp(query_image, object_points, camera_matrix, dist_coeffs, mask, depth)
        if R_est is None or T_est is None:
            print(f"❌ Pose estimation failed for {query_image_file}")
            continue

        # Calculate the translation and rotational errors between estimated and ground truth poses
        l2_distance = translation_error(T_est, T_gt)
        rot_error = rotation_error(R_est, R_gt)
        if l2_distance is None or rot_error is None:
            print(f"❌ Error calculation failed for {query_image_file}")
            continue

        # Skip the current image if the translational error exceeds 2 meters
        if l2_distance > 2.0:
            print(f"❌ Skipping {query_image_file} due to high translational error: {l2_distance} meters")
            continue

        # Store the calculated errors and clutter level for the corresponding object
        object_errors[obj_idx]["trans_errors"].append(l2_distance)
        object_errors[obj_idx]["rot_errors"].append(rot_error)
        object_errors[obj_idx]["clutter_levels"].append(clutter_level)

        # Output the result for the current image
        print(f"✅ Pose estimation for {query_image_file} with object {object_name} (Clutter Level: {clutter_description})")
        print(f"L2 Translation Error (meters): {l2_distance}")
        print(f"Rotational Error (degrees): {rot_error}")
        print("-" * 40)

    # Plot pose estimation errors for each object
    for obj_idx, (object_points, object_name) in enumerate(object_points_all):
        errors = object_errors[obj_idx]
        if not errors["trans_errors"]:
            continue

        # Create a scatter plot of translational and rotational errors for different clutter levels
        plt.figure(figsize=(10, 7))
        markers = ['s', 'x', 'o']
        colors = ['blue', 'red', 'green']
        labels = ['No Clutter', '1 Clutter Item', '2 Clutter Items']

        # Plot the data points based on clutter levels
        for clutter_level in range(1, 4):
            indices = [i for i, cl in enumerate(errors["clutter_levels"]) if cl == clutter_level]
            plt.scatter([errors["trans_errors"][i] for i in indices], [errors["rot_errors"][i] for i in indices],
                        color=colors[clutter_level - 1], marker=markers[clutter_level - 1], label=labels[clutter_level - 1], s=50, alpha=0.8)

        # Add labels and title to the plot
        plt.xlabel("Translational Error (meters)", fontsize=12)
        plt.ylabel("Rotational Error (degrees)", fontsize=12)
        plt.title(f"Pose Estimation Errors for {object_name}", fontsize=14)
        plt.legend(title="Clutter Conditions")
        plt.grid(True)
        plt.tight_layout()
        plt.show()

Use above code now in an example

In [None]:
image_dir = r"rutgers_apc_dataset\all_data" # Directory that has all the dataset 
obj_model_dir = r"rutgers_apc_models\apc_main\object_models\tarball" # Directory that has all the .obj files
camera_matrix = np.array([[575.8157, 0, 319.5], [0, 575.8157, 239.5], [0, 0, 1]], dtype=np.float64)
dist_coeffs = np.zeros((4, 1), dtype=np.float64)

# Run the process_images function to process the dataset and visualize errors
process_images(image_dir, obj_model_dir, camera_matrix, dist_coeffs)