In [1]:
import time
import random
import numpy as np
import pandas as pd
import open3d as o3d

from pathlib import Path
from collections import Counter

np.random.seed(42)
random.seed(42)
o3d.utility.random.seed(42)

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.


In [2]:
import utils
import calibration_utils

import importlib
importlib.reload(utils)
importlib.reload(calibration_utils)

<module 'calibration_utils' from '/home/tom/Documents/UNT/csce6260/projects/kitti-experiments/fusion/calibration_utils.py'>

In [3]:
base = Path.home() / "kitti"
train_dir = base / "training"
train_labels_dir = train_dir / "label_2"
kitti_train_labels = sorted(train_labels_dir.glob("*.txt"))
velo_dir = train_dir / "velodyne"
point_cloud_train_files = sorted(velo_dir.glob("*.bin"))
calib_dir = train_dir / "calib"
calib_train_files = sorted(calib_dir.glob("*.txt"))

In [4]:
def get_point_cloud(points_velo):
    cloud = o3d.geometry.PointCloud()
    cloud.points = o3d.utility.Vector3dVector(points_velo[:, :3])
    return cloud

def downsample(cloud):
    cloud = cloud.voxel_down_sample(voxel_size=0.2)
    return cloud

def segment(cloud):
    plane_model, inliers = cloud.segment_plane(distance_threshold=0.3, ransac_n=3, num_iterations=150)
    outlier_cloud = cloud.select_by_index(inliers, invert=True)
    return outlier_cloud

def get_downsampled_point_cloud(points_velo):
    cloud = get_point_cloud(points_velo)
    cloud = downsample(cloud)
    cloud = segment(cloud)
    return cloud

def depth_filter(cloud):
    cloud_points = np.array(cloud.points)
    mask_in_front = cloud_points[:, 0] > 0
    cloud_points_in_cam_front = cloud_points[mask_in_front]
    return cloud_points_in_cam_front

def get_points_uv(cloud_points_in_cam_front, calib):
    points_uv = calibration_utils.convert_to_img_coords(cloud_points_in_cam_front, calib)
    return points_uv[:, :2]

def get_labels(label_file_path):
    labels = utils.parse_label_file(label_file_path)
    labels = [label for label in labels if label['type'] in ['Car', 'Pedestrian', 'Cyclist']]
    return labels

def get_mask_frustums(points_uv, labels):
    mask_frustums = []
    u, v = points_uv[:, 0], points_uv[:, 1]
    valid_mask = np.isfinite(u) & np.isfinite(v)
    for label in labels:
        xmin, ymin, xmax, ymax = label['bbox_2d']
        mask_frustum = (u >= xmin) & (u <= xmax) & (v >= ymin) & (v <= ymax)
        mask_frustum = mask_frustum & valid_mask
        mask_frustums.append(mask_frustum)
    return mask_frustums

def extract_frustum_points(mask_frustums, cloud_points_in_cam_front):
    frustum_points = []
    for mask_frustum in mask_frustums:
        points = cloud_points_in_cam_front[mask_frustum]
        frustum_points.append(points)
    return frustum_points

def get_frustum(points):
    frustum = o3d.geometry.PointCloud()
    frustum.points = o3d.utility.Vector3dVector(points[:, :3])
    return frustum

def get_cluster_labels(frustum):
    if len(frustum.points) == 0:
        return np.array([])    
    with o3d.utility.VerbosityContextManager(o3d.utility.VerbosityLevel.Warning) as cm:
        cluster_labels = np.array(frustum.cluster_dbscan(eps=0.45, min_points=10, print_progress=False))
    return cluster_labels

def group_cluster_labels(cluster_labels):
    valid_mask = cluster_labels != -1
    return (
        pd.Series(np.arange(len(cluster_labels)))[valid_mask]
        .groupby(cluster_labels[valid_mask], sort=False)
        .apply(list)
        .tolist()
    )

def get_frustum_with_clusters(points):
    frustum = get_frustum(points)
    cluster_labels = get_cluster_labels(frustum)
    return frustum, group_cluster_labels(cluster_labels)    

def get_frustums_with_clusters(frustum_points):
    frustums_with_clusters = []
    for i, points in enumerate(frustum_points):
        frustum, clusters = get_frustum_with_clusters(points)
        frustums_with_clusters.append((frustum, clusters,))
    return frustums_with_clusters

In [5]:
object_stats = {
    'Car': {
        'height': {'mean': 1.526083, 'std': 0.136697},
        'width': {'mean': 1.628590, 'std': 0.102164},
        'length': {'mean': 3.883954, 'std': 0.425924}
    },
    'Pedestrian': {
        'height': {'mean': 1.760706, 'std': 0.113263},
        'width': {'mean': 0.660189, 'std': 0.142667},
        'length': {'mean': 0.842284, 'std': 0.234926}
    },
    'Cyclist': {
        'height': {'mean': 1.737203, 'std': 0.094825},
        'width': {'mean': 0.596773, 'std': 0.124212},
        'length': {'mean': 1.763546, 'std': 0.176663}
    }
}

def get_oriented_bounding_box(proposal):
    center, extent, yaw, corners3d = proposal
    R = np.array([[np.cos(yaw), -np.sin(yaw), 0],
                  [np.sin(yaw),  np.cos(yaw), 0],
                  [0,            0,           1]])
    oriented_bbox = o3d.geometry.OrientedBoundingBox(center, R, extent)
    return oriented_bbox

def get_oriented_bounding_boxes(proposals):
    oriented_bboxes = []
    for proposal in proposals:
        oriented_bbox = get_oriented_bounding_box(proposal)
        oriented_bboxes.append(oriented_bbox)
    return oriented_bboxes

def score_proposal(extent, object_type):
    z_threshold = stats.norm.ppf(0.995)
    dim_names = ['length', 'width', 'height']
    scores = []
    for value, dim_name in zip(extent, dim_names):
        mean = object_stats[object_type][dim_name]['mean']
        std = object_stats[object_type][dim_name]['std']
        within_threshold = np.abs(value - mean) <= z_threshold * std
        scores.append(within_threshold)
    return all(scores)

def score_proposal(extent, object_type, z_threshold=2.0):
    dim_names = ['length', 'width', 'height']
    scores = []
    for value, dim_name in zip(extent, dim_names):
        mean = object_stats[object_type][dim_name]['mean']
        std = object_stats[object_type][dim_name]['std']
        z = np.abs(value - mean) / std
        dim_score = max(0, 1 - z / z_threshold)
        scores.append(dim_score)
    return np.mean(scores)

def get_oriented_bounding_box_proposals(frustum_with_clusters, object_type):
    proposals = []
    frustum, clusters = frustum_with_clusters
    for cluster_idx in clusters:
        cluster = frustum.select_by_index(cluster_idx)
        points = np.asarray(cluster.points)
        center, extent, yaw, corners3d = utils.fit_bev_oriented_bounding_box(points)
        if score_proposal(extent, object_type) > 0:
            proposals.append((center, extent, yaw, corners3d,))
    return proposals

def get_oriented_bounding_box_predictions(frustums_with_clusters, labels):
    oriented_bbox_predictions = []
    for i, frustum_with_clusters in enumerate(frustums_with_clusters):
        object_type = labels[i]['type']
        proposals = get_oriented_bounding_box_proposals(frustum_with_clusters, object_type)
        oriented_bboxes = get_oriented_bounding_boxes(proposals)
        oriented_bbox_predictions.extend(oriented_bboxes)
    return oriented_bbox_predictions

In [6]:
def get_cam_to_velo_transform(calib):
    Tr_velo_to_cam = calib['Tr_velo_to_cam']
    Tr_velo_to_cam = np.vstack((Tr_velo_to_cam, [0, 0, 0, 1]))
    Tr_cam_to_velo = calibration_utils.inverse_rigid_transform(Tr_velo_to_cam)
    return Tr_cam_to_velo

def get_oriented_bounding_box_labels(labels, Tr_cam_to_velo):
    oriented_bbox_labels = []
    for label in labels:
        corners_3d_velo = calibration_utils.compute_box_3d(label, Tr_cam_to_velo)
        cloud = o3d.geometry.PointCloud()
        cloud.points = o3d.utility.Vector3dVector(corners_3d_velo)
        oriented_bbox = cloud.get_oriented_bounding_box()
        oriented_bbox_labels.append(oriented_bbox)
    return oriented_bbox_labels

def evaluate(oriented_bbox_bev_labels, oriented_bbox_bev_predictions):
    results = utils.evaluate_bev_metrics(
        oriented_bbox_bev_labels, 
        oriented_bbox_bev_predictions, 
        iou_threshold=0.2
    )
    return results

In [7]:
results = Counter()
start_time = time.time()

num_samples = 100
random_indices = random.sample(range(len(kitti_train_labels)), num_samples)

for analysis_file_index in random_indices:
    bin_path = point_cloud_train_files[analysis_file_index]
    points_velo = utils.read_velodyne_bin(bin_path)
    cloud = get_downsampled_point_cloud(points_velo)
    cloud_points_in_cam_front = depth_filter(cloud)
    calib = utils.parse_calib_file(calib_train_files[analysis_file_index])
    points_uv = get_points_uv(cloud_points_in_cam_front, calib)
    labels = get_labels(kitti_train_labels[analysis_file_index])
    mask_frustums = get_mask_frustums(points_uv, labels)
    frustum_points = extract_frustum_points(mask_frustums, cloud_points_in_cam_front)
    frustums_with_clusters = get_frustums_with_clusters(frustum_points)
    oriented_bbox_predictions = get_oriented_bounding_box_predictions(frustums_with_clusters, labels)
    Tr_cam_to_velo = get_cam_to_velo_transform(calib)
    oriented_bbox_labels = get_oriented_bounding_box_labels(labels, Tr_cam_to_velo)
    oriented_bbox_bev_labels = [utils.convert_to_bev(label) for label in oriented_bbox_labels]
    oriented_bbox_bev_predictions = [utils.convert_to_bev(pred) for pred in oriented_bbox_predictions]
    cloud_results = evaluate(oriented_bbox_bev_labels, oriented_bbox_bev_predictions)
    results.update(cloud_results)
    
elapsed = time.time() - start_time

In [8]:
print(f"Total evaluation time: {elapsed:.2f} seconds")

Total evaluation time: 11.62 seconds


In [9]:
def calculate_precision_and_recall(results):
    precision, recall = utils.calculate_precision_recall(results)
    return precision, recall

def calculate_f1_score(precision, recall):
    f1_score = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
    return f1_score

precision, recall = calculate_precision_and_recall(results)
f1_score = calculate_f1_score(precision, recall)
print(f"Precision={precision:.3f}, Recall={recall:.3f}, F1={f1_score:.3f}")
print(f"Raw Results: {dict(results)}")

Precision=0.423, Recall=0.232, F1=0.300
Raw Results: {'FN': 338, 'FP': 139, 'TP': 102}
