In [1]:
# Import all required libraries
from nuscenes.nuscenes import NuScenes
from nuscenes.utils.data_classes import Box
from nuscenes.utils.data_classes import RadarPointCloud
import numpy as np
import matplotlib.pyplot as plt
np.set_printoptions(suppress=True, precision=4)  # suppress scientific notation, 4 decimal digits
import pandas as pd
from nuscenes.can_bus.can_bus_api import NuScenesCanBus
from pyquaternion import Quaternion
from nuscenes.utils.geometry_utils import view_points
import copy
import os
from scipy.spatial import cKDTree
import pandas as pd

In [2]:
#Create the nuscene object to read and traverse the data
nusc = NuScenes(version='v1.0-mini', dataroot='../data/sets/nuscenes', verbose=True)

Loading NuScenes tables for version v1.0-mini...
23 category,
8 attribute,
4 visibility,
911 instance,
12 sensor,
120 calibrated_sensor,
31206 ego_pose,
8 log,
10 scene,
404 sample,
31206 sample_data,
18538 sample_annotation,
4 map,
Done loading in 0.313 seconds.
Reverse indexing ...
Done reverse indexing in 0.1 seconds.


In [3]:
def getCorrespondingCANData(time_stamp):
    nusc_can = NuScenesCanBus(dataroot='../data/sets/nuscenes')
    can_msgs = nusc_can.get_messages(scene['name'], 'vehicle_monitor')
    can_ts = np.array([msg['utime'] for msg in can_msgs])
    idx = np.argmin(np.abs(can_ts - time_stamp))
    can_msg = can_msgs[idx]
    candata = {
        'time_stamp' : time_stamp,
        'brake': can_msg['brake'],
        'brake_switch': can_msg['brake_switch'],
        'rear_left_rpm': can_msg['rear_left_rpm'],
        'rear_right_rpm' : can_msg['rear_right_rpm'],
        'can_utime': can_msg['utime'],
        'vehicle_speed': can_msg['vehicle_speed'],
        'yaw_rate': can_msg['yaw_rate']
        }
    return candata

In [4]:
def get_annotation_by_instance(sample, instance_token, nusc):
    for ann_token in sample['anns']:
        ann = nusc.get('sample_annotation', ann_token)
        if ann['instance_token'] == instance_token:
            return ann
    return None

In [5]:
def extract_box_details(ann):
    return {
        'translation_x': ann['translation'][0],
        'translation_y': ann['translation'][1],
        'translation_z': ann['translation'][2],
        'size_l': ann['size'][0],
        'size_w': ann['size'][1],
        'size_h': ann['size'][2],
        'rotation_w': ann['rotation'][0],
        'rotation_x': ann['rotation'][1],
        'rotation_y': ann['rotation'][2],
        'rotation_z': ann['rotation'][3],
        'num_lidar_pts': ann['num_lidar_pts'],
        'num_radar_pts': ann['num_radar_pts'],
        'category_name': ann['category_name'],
        'visibility_token': ann['visibility_token'],
        'attribute_tokens': ann['attribute_tokens']
    }

In [6]:
def get_transformed_radar_points(radar_data, nusc):
    radar_pc = RadarPointCloud.from_file(os.path.join(nusc.dataroot, radar_data['filename']))
    
    cs_record = nusc.get('calibrated_sensor', radar_data['calibrated_sensor_token'])
    pose_record = nusc.get('ego_pose', radar_data['ego_pose_token'])

    radar_pc.rotate(Quaternion(cs_record['rotation']).rotation_matrix)
    radar_pc.translate(np.array(cs_record['translation']))
    radar_pc.rotate(Quaternion(pose_record['rotation']).rotation_matrix)
    radar_pc.translate(np.array(pose_record['translation']))

    return radar_pc

In [7]:
def transform_box_to_sensor_frame(box, sensor_data, nusc):
    # Get sensor calibration and ego pose
    calib = nusc.get('calibrated_sensor', sensor_data['calibrated_sensor_token'])
    pose = nusc.get('ego_pose', sensor_data['ego_pose_token'])

    # Global → Ego
    translation = np.array(box['translation']) - np.array(pose['translation'])
    rotation = Quaternion(pose['rotation']).inverse * Quaternion(box['rotation'])

    # Ego → Sensor
    translation = Quaternion(calib['rotation']).inverse.rotate(translation - np.array(calib['translation']))
    rotation = Quaternion(calib['rotation']).inverse * rotation

    return {
        'translation_x': translation[0],
        'translation_y': translation[1],
        'translation_z': translation[2],
        'rotation_w': rotation[0],
        'rotation_x': rotation[1],
        'rotation_y': rotation[2],
        'rotation_z': rotation[3],
        'size_l': box['size'][0],
        'size_w': box['size'][1],
        'size_h': box['size'][2]
    }


In [8]:
def get_nearby_radar_points(box_center, radar_trans_pc, radar_frame_pc, radar_token, threshold=2.5):
    radar_trans_xyz = radar_trans_pc.points[:3, :].T
    radar_frame_xyz = radar_frame_pc.points[:3, :].T
    radar_attrs = radar_frame_pc.points[3:, :].T

    tree = cKDTree(radar_trans_xyz)
    idxs = tree.query_ball_point(box_center, r=threshold)

    # If no points within threshold, fall back to closest
    if not idxs:
        dist, idx = tree.query(box_center, k=1)
        idxs = [idx]

    radar_dicts = []
    for idx in idxs:
        radar_trans_point = radar_trans_xyz[idx]
        radar_point = radar_frame_xyz[idx]
        radar_attr = radar_attrs[idx]
        dist = np.linalg.norm(radar_trans_point - box_center)

        radar_dict = {
            'radar_token': radar_token,
            'radar_x': radar_point[0],
            'radar_y': radar_point[1],
            'radar_z': radar_point[2],
            'radar_trans_x': radar_trans_point[0],
            'radar_trans_y': radar_trans_point[1],
            'radar_trans_z': radar_trans_point[2],
            'dyn_prop': radar_attr[0],
            'cluster_id': radar_attr[1],
            'rcs': radar_attr[2],
            'vx': radar_attr[3],
            'vy': radar_attr[4],
            'vx_comp': radar_attr[5],
            'vy_comp': radar_attr[6],
            'is_quality_valid': radar_attr[7],
            'ambig_state': radar_attr[8],
            'x_rms': radar_attr[9],
            'y_rms': radar_attr[10],
            'invalid_state': radar_attr[11],
            'pdh0': radar_attr[12],
            'vx_rms': radar_attr[13],
            'vy_rms': radar_attr[14],
            'radar_distance': dist
        }
        radar_dicts.append(radar_dict)

    return radar_dicts

In [9]:
def get_closest_cluster_radar_points(box_center, radar_trans_pc, radar_frame_pc, radar_token, threshold=2.5):
    radar_trans_xyz = radar_trans_pc.points[:3, :].T
    radar_frame_xyz = radar_frame_pc.points[:3, :].T
    radar_attrs = radar_frame_pc.points[3:, :].T

    tree = cKDTree(radar_trans_xyz)
    idxs = tree.query_ball_point(box_center, r=threshold)

    # Fallback to closest point if none within threshold
    if not idxs:
        _, idx = tree.query(box_center, k=1)
        idxs = [idx]

    # Use the first point (closest) to get its cluster ID
    closest_idx = idxs[0]
    target_cluster_id = radar_attrs[closest_idx][1]

    # Find all points with the same cluster ID
    cluster_mask = radar_attrs[:, 1] == target_cluster_id
    cluster_indices = np.where(cluster_mask)[0]

    radar_dicts = []
    for idx in cluster_indices:
        radar_trans_point = radar_trans_xyz[idx]
        radar_point = radar_frame_xyz[idx]
        radar_attr = radar_attrs[idx]
        dist = np.linalg.norm(radar_trans_point - box_center)

        radar_dict = {
            'radar_token': radar_token,
            'radar_x': radar_point[0],
            'radar_y': radar_point[1],
            'radar_z': radar_point[2],
            'radar_trans_x': radar_trans_point[0],
            'radar_trans_y': radar_trans_point[1],
            'radar_trans_z': radar_trans_point[2],
            'dyn_prop': radar_attr[0],
            'cluster_id': radar_attr[1],
            'rcs': radar_attr[2],
            'vx': radar_attr[3],
            'vy': radar_attr[4],
            'vx_comp': radar_attr[5],
            'vy_comp': radar_attr[6],
            'is_quality_valid': radar_attr[7],
            'ambig_state': radar_attr[8],
            'x_rms': radar_attr[9],
            'y_rms': radar_attr[10],
            'invalid_state': radar_attr[11],
            'pdh0': radar_attr[12],
            'vx_rms': radar_attr[13],
            'vy_rms': radar_attr[14],
            'radar_distance': dist
        }
        radar_dicts.append(radar_dict)

    return radar_dicts

In [10]:
import numpy as np
import cv2
from pyquaternion import Quaternion

def is_annotation_visible_in_front_camera(nusc, ann_token, sample):
    """
    Checks if the annotation is visible in the CAM_FRONT image.
    
    Parameters:
        nusc: NuScenes instance
        ann_token: Token of the annotation to check
        sample: Sample dictionary containing 'data' field
    
    Returns:
        bool: True if annotation is visible in CAM_FRONT image, False otherwise
    """
    try:
        # Get camera data
        cam_token = sample['data']['CAM_FRONT']
        cam_data = nusc.get('sample_data', cam_token)
        cam_cs = nusc.get('calibrated_sensor', cam_data['calibrated_sensor_token'])
        cam_pose = nusc.get('ego_pose', cam_data['ego_pose_token'])
        cam_intrinsic = np.array(cam_cs['camera_intrinsic'])

        # Get box and transform to camera frame
        box = nusc.get_box(ann_token)
        box.translate(-np.array(cam_pose['translation']))
        box.rotate(Quaternion(cam_pose['rotation']).inverse)
        box.translate(-np.array(cam_cs['translation']))
        box.rotate(Quaternion(cam_cs['rotation']).inverse)

        # Project box corners
        corners = box.corners()
        corners_cam = cam_intrinsic @ corners
        depths = corners_cam[2, :]

        # Check if behind camera
        if np.any(depths <= 0):
            return False

        # Project to 2D
        corners_2d = corners_cam[:2, :] / depths
        corners_2d = corners_2d.T.astype(np.int32)

        # Load image and check bounds
        image_path = os.path.join(nusc.dataroot, cam_data['filename'])
        image = cv2.imread(image_path)
        if image is None:
            return False

        H, W = image.shape[:2]
        if np.any(corners_2d[:, 0] < 0) or np.any(corners_2d[:, 0] >= W) or \
           np.any(corners_2d[:, 1] < 0) or np.any(corners_2d[:, 1] >= H):
            return False

        return True

    except Exception as e:
        print(f"Visibility check failed: {e}")
        return False

In [11]:
rows = []

for scene in nusc.scene:
    scene_token = scene['token']
    scene_name = scene['name']
    
    sample_token = scene['first_sample_token']
    
    while sample_token:
        sample = nusc.get('sample', sample_token)
        next_token = sample['next']
        
        cam_token = sample['data']['CAM_FRONT']
        cam_data = nusc.get('sample_data', cam_token)
        camera_pose = nusc.get('ego_pose', cam_data['ego_pose_token'])

        radar_token = sample['data']['RADAR_FRONT']
        radar_data = nusc.get('sample_data', radar_token)

        radar_pc = RadarPointCloud.from_file(os.path.join(nusc.dataroot, radar_data['filename']))
        radar_trans_pc = get_transformed_radar_points(radar_data, nusc)

        can_msg = getCorrespondingCANData(radar_data['timestamp'])

        for ann_token in sample['anns']:
            ann = nusc.get('sample_annotation', ann_token)

            if not is_annotation_visible_in_front_camera(nusc, ann_token, sample):
                continue

            box_radar = transform_box_to_sensor_frame(ann, radar_data, nusc)

            #Tried with nearby points with better results. Even the RADAR is doing approximation with cluster ID
            radar_rows = get_nearby_radar_points(ann['translation'], radar_trans_pc, radar_pc, radar_token)
            #This function gets all the points in the same cluster. This result in the dyn_model2 was not as good as closest points
            #radar_rows = get_closest_cluster_radar_points(ann['translation'], radar_trans_pc, radar_pc, radar_token)

            for radar_row in radar_rows:
                if next_token:
                    row = {
                        'scene_token': scene_token,
                        'scene_name': scene_name,
                        'sample_token': sample_token,
                        'next_sample_token': next_token,
                        'instance_token': ann['instance_token'],
                        'camera_pose_x': camera_pose['translation'][0],
                        'camera_pose_y': camera_pose['translation'][1],
                        'camera_pose_z': camera_pose['translation'][2],
                    }
    
                    row.update({f'box_{k}': v for k, v in extract_box_details(ann).items()})
                    row.update({f'box_rad_{k}': v for k, v in box_radar.items()})
                    row.update({f'radar_{k}': v for k, v in radar_row.items()})
                    row.update({f'can_{k}': v for k, v in can_msg.items()})
    
                    rows.append(row)

        sample_token = next_token  # Move to next sample

df_single_instance = pd.DataFrame(rows)
df_single_instance = df_single_instance.drop(['box_num_lidar_pts', 'box_num_radar_pts', 'box_visibility_token', 'box_attribute_tokens'], axis=1)

In [12]:
df_single_instance.shape

(6250, 60)

In [13]:
df_single_instance.isna().sum().sum()

np.int64(0)

In [14]:
df_single_instance = df_single_instance.drop_duplicates()
df_single_instance.shape

(6250, 60)

In [15]:
df_single_instance.to_excel('Box_PCD_Association.xlsx', index=False)

In [16]:
# Create a renamed copy of the original DataFrame to act as "B"
df_a = df_single_instance.copy()
df_b = df_single_instance.copy()

df_a = df_a.add_prefix("a_")  # So B.sample_token becomes next_sample_token
df_b = df_b.add_prefix("b_")  # So B.sample_token becomes next_sample_token

# Perform the join: A.next_sample_token == B.sample_token AND A.instance_token == B.instance_token
df_merged = pd.merge(
    df_a,         # A
    df_b,         # B
    left_on=["a_next_sample_token", "a_instance_token"],
    right_on=["b_sample_token", "b_instance_token"],
    how="inner"
)


In [17]:
df_merged.shape

(9482, 120)

In [18]:
df_merged = df_merged.drop_duplicates()
df_merged.shape

(9482, 120)

In [19]:
df_merged.to_excel('BoxAB_PCD_Association.xlsx')