<a href="https://colab.research.google.com/github/jeswitha2003/Autonomous-Vehicle-Safety-Analysis-using-nuScenes-Dataset-TTC-Distance-Collision-Risk-Prediction-/blob/main/Autonomous_Vehicle_Risk_Detection_using_Nuscenes_dataset.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install nuscenes-devkit pyyaml


In [None]:
from google.colab import drive
drive.mount('/content/drive')


In [None]:
from nuscenes.nuscenes import NuScenes

data_path = '/content/drive/MyDrive/datasets/nuscenes'
nusc = NuScenes(version='v1.0-mini', dataroot=data_path, verbose=True)


In [None]:
from nuscenes.nuscenes import NuScenes

data_path = '/content/drive/MyDrive/v1.0-mini (1).tgz'
nusc = NuScenes(version='v1.0-mini (1)', dataroot=data_path, verbose=True)


In [None]:
print("Number of scenes:", len(nusc.scene))


In [None]:
nusc.scene[0]


In [None]:
print("Total sample_data entries:", len(nusc.sample_data))


In [None]:
from collections import Counter
Counter([sd['channel'] for sd in nusc.sample_data])


In [None]:
print("Total object annotations:", len(nusc.sample_annotation))


In [None]:
categories = [c['name'] for c in nusc.category]
categories


In [None]:
!rm -rf /content/drive/MyDrive/datasets/nuscenes


In [None]:
!mkdir -p /content/drive/MyDrive/datasets/nuscenes


In [None]:
!tar -xzf /content/drive/MyDrive/v1.0-mini.tgz -C /content/drive/MyDrive/datasets/nuscenes


In [None]:
!ls /content/drive/MyDrive/datasets/nuscenes


In [None]:
!pip install nuscenes-devkit pyyaml


In [None]:
import pandas as pd


In [None]:
rows = []

for ann in nusc.sample_annotation:
    sample_token = ann['sample_token']
    category = ann['category_name']
    x, y, z = ann['translation']
    rot_x, rot_y, rot_z, rot_w = ann['rotation']
    instance_token = ann['instance_token']
    scene_token = nusc.get('sample', sample_token)['scene_token']

    rows.append({
        'sample_token': sample_token,
        'scene_token': scene_token,
        'instance_token': instance_token,
        'category': category,
        'x': x,
        'y': y,
        'z': z,
        'rot_w': rot_w,
        'rot_x': rot_x,
        'rot_y': rot_y,
        'rot_z': rot_z
    })

df_annotations = pd.DataFrame(rows)
df_annotations.head()


In [None]:
df_annotations['category'].value_counts()


In [None]:
vehicles = df_annotations[df_annotations['category'].str.contains('vehicle')]
vehicles.head()


In [None]:
vehicles_per_scene = vehicles.groupby('scene_token').size().reset_index(name='vehicle_count')
vehicles_per_scene


In [None]:
import matplotlib.pyplot as plt

vehicles_per_scene.sort_values('vehicle_count').plot(
    x='scene_token',
    y='vehicle_count',
    kind='bar',
    figsize=(12,4)
)
plt.title('Vehicle Count per Scene')
plt.show()


In [None]:
trajectories = []

for ann in nusc.sample_annotation:
    instance_token = ann['instance_token']
    sample_token = ann['sample_token']
    category = ann['category_name']

    # Position
    x, y, z = ann['translation']

    # Rotation
    rot_x, rot_y, rot_z, rot_w = ann['rotation']

    # Find scene and timestamp
    sample = nusc.get('sample', sample_token)
    scene_token = sample['scene_token']

    # Get timestamp from a camera (CAM_FRONT) to make sure it's aligned
    cam_token = sample['data']['CAM_FRONT']
    timestamp = nusc.get('sample_data', cam_token)['timestamp']

    trajectories.append({
        'instance_token': instance_token,
        'sample_token': sample_token,
        'scene_token': scene_token,
        'timestamp': timestamp,
        'category': category,
        'x': x,
        'y': y,
        'z': z,
        'rot_x': rot_x,
        'rot_y': rot_y,
        'rot_z': rot_z,
        'rot_w': rot_w
    })

df_traj = pd.DataFrame(trajectories)
df_traj.head()




In [None]:
df_traj = df_traj.sort_values(by=['instance_token', 'timestamp'])


In [None]:
df_traj['delta_x'] = df_traj.groupby('instance_token')['x'].diff()
df_traj['delta_y'] = df_traj.groupby('instance_token')['y'].diff()
df_traj['delta_t'] = df_traj.groupby('instance_token')['timestamp'].diff()

df_traj['speed_m_per_s'] = (
    ((df_traj['delta_x']**2 + df_traj['delta_y']**2)**0.5) / (df_traj['delta_t'] / 1e6)
)


In [None]:
df_vehicle_traj = df_traj[df_traj['category'].str.contains('vehicle')]


In [None]:
unique_vehicles = df_vehicle_traj['instance_token'].unique()
unique_vehicles[:10]


In [None]:
instance_id = unique_vehicles[0]


In [None]:
single_traj = df_vehicle_traj[df_vehicle_traj['instance_token'] == instance_id]
single_traj.head()


In [None]:
import matplotlib.pyplot as plt

plt.figure(figsize=(6,6))

plt.plot(single_traj['x'], single_traj['y'], marker='o', linestyle='-', linewidth=2)
plt.title(f"Trajectory of Vehicle (Instance {instance_id[:6]}...)")
plt.xlabel("X Position (meters)")
plt.ylabel("Y Position (meters)")
plt.grid(True)
plt.axis("equal")

plt.show()


In [None]:
vehicle_positions = df_traj[df_traj['category'].str.contains('vehicle')]


In [None]:
import matplotlib.pyplot as plt

plt.figure(figsize=(7,7))
plt.scatter(vehicle_positions['x'], vehicle_positions['y'], s=5, alpha=0.3)
plt.title('All Vehicle Positions in nuScenes Mini')
plt.xlabel('X Position (meters)')
plt.ylabel('Y Position (meters)')
plt.grid(True)
plt.axis('equal')
plt.show()


In [None]:
import numpy as np
import seaborn as sns

plt.figure(figsize=(8,8))

# Create a 2D histogram (heatmap)
sns.kdeplot(
    x=vehicle_positions['x'],
    y=vehicle_positions['y'],
    fill=True,
    cmap="viridis",
    thresh=0,
    levels=100
)

plt.title("Vehicle Density Heatmap — nuScenes Mini")
plt.xlabel("X Position (m)")
plt.ylabel("Y Position (m)")
plt.axis('equal')
plt.show()


In [None]:
vehicle_positions = df_traj[df_traj['category'].str.contains('vehicle')]

vehicles_per_scene = (
    vehicle_positions.groupby('scene_token')
    .size()
    .reset_index(name='vehicle_count')
)

vehicles_per_scene.head()


In [None]:
scene_info = pd.DataFrame(nusc.scene)
scene_info = scene_info[['token', 'name']]
scene_info.columns = ['scene_token', 'scene_name']

vehicles_scene_named = vehicles_per_scene.merge(scene_info, on='scene_token')
vehicles_scene_named.head()


In [None]:
import matplotlib.pyplot as plt

plt.figure(figsize=(12,4))
plt.plot(vehicles_scene_named['scene_name'], vehicles_scene_named['vehicle_count'], marker='o', linewidth=2)

plt.xticks(rotation=90)
plt.title("Vehicle Count per Scene (Traffic Timeline)")
plt.xlabel("Scene")
plt.ylabel("Number of Vehicles")
plt.grid(True)
plt.show()


In [None]:
df_speed = df_traj.copy()

df_speed = df_speed[(df_speed['speed_m_per_s'].notna()) &
                    (df_speed['speed_m_per_s'] < 20)]   # remove outliers >20 m/s (~72 km/h)


In [None]:
avg_speed_per_vehicle = (
    df_speed.groupby('instance_token')['speed_m_per_s']
    .mean()
    .reset_index(name='avg_speed')
)

avg_speed_per_vehicle.head()


In [None]:
top10_fastest = avg_speed_per_vehicle.sort_values(by='avg_speed', ascending=False).head(10)
top10_fastest


In [None]:
import matplotlib.pyplot as plt

plt.figure(figsize=(7,4))
plt.hist(df_speed['speed_m_per_s'], bins=30, alpha=0.7)
plt.title("Vehicle Speed Distribution (m/s)")
plt.xlabel("Speed (m/s)")
plt.ylabel("Frequency")
plt.grid(True)
plt.show()


In [None]:
df_speed['speed_kmph'] = df_speed['speed_m_per_s'] * 3.6


In [None]:
sample = nusc.sample[0]
sample


In [None]:
!ls /content/drive/MyDrive/datasets/nuscenes/samples/


In [None]:
!ls -lh /content/drive/MyDrive/v1.0-mini\ \(1\).tgz


In [None]:
!rm -rf /content/drive/MyDrive/datasets/nuscenes
!mkdir -p /content/drive/MyDrive/datasets/nuscenes

!tar -xzf "/content/drive/MyDrive/v1.0-mini_meta.tgz"    -C /content/drive/MyDrive/datasets/nuscenes
!tar -xzf "/content/drive/MyDrive/v1.0-mini_samples.tgz" -C /content/drive/MyDrive/datasets/nuscenes
!tar -xzf "/content/drive/MyDrive/v1.0-mini_sweeps.tgz"  -C /content/drive/MyDrive/datasets/nuscenes


In [None]:
!ls -lh /content/drive/MyDrive/


In [None]:
!mkdir -p /content/drive/MyDrive/nuscenes
!tar -xvzf /content/drive/MyDrive/v1.0-mini.tgz -C /content/drive/MyDrive/nuscenes


In [None]:
!ls /content/drive/MyDrive/


In [None]:
!tar -xvzf "/content/drive/MyDrive/v1.0-mini (1).tgz" -C /content/drive/MyDrive/nuscenes


In [None]:
!ls /content/drive/MyDrive/nuscenes


In [None]:
from nuscenes.nuscenes import NuScenes

nusc = NuScenes(version='v1.0-mini',
                dataroot='/content/drive/MyDrive/nuscenes',
                verbose=True)


In [None]:
import random

# pick a random sample token
sample = random.choice(nusc.sample)

# show camera front
nusc.render_sample(sample['token'])


In [None]:
nusc.render_sample_data(sample['data']['LIDAR_TOP'])


In [None]:
# Show all scenes (mini has 10 scenes)
for i, scene in enumerate(nusc.scene):
    print(i, scene['name'])


In [None]:
scene = nusc.scene[1]
scene


In [None]:
first_sample_token = scene['first_sample_token']
sample = nusc.get('sample', first_sample_token)
sample


In [None]:
cameras = ['CAM_FRONT', 'CAM_FRONT_LEFT', 'CAM_FRONT_RIGHT',
           'CAM_BACK', 'CAM_BACK_LEFT', 'CAM_BACK_RIGHT']

for cam in cameras:
    sd_token = sample['data'][cam]
    print(f"Showing {cam} ...")
    nusc.render_sample_data(sd_token)


In [None]:
lidar_token = sample['data']['LIDAR_TOP']
nusc.render_sample_data(lidar_token, nsweeps=1)


In [None]:
for ann in sample['anns'][:10]:  # print first 10 only
    ann_record = nusc.get('sample_annotation', ann)
    print(ann_record['category_name'], ann_record['translation'])


In [None]:
scene = nusc.scene[1]
scene


In [None]:
first_sample_token = scene['first_sample_token']
sample = nusc.get('sample', first_sample_token)
sample


In [None]:
cameras = ['CAM_FRONT', 'CAM_FRONT_LEFT', 'CAM_FRONT_RIGHT',
           'CAM_BACK', 'CAM_BACK_LEFT', 'CAM_BACK_RIGHT']

for cam in cameras:
    sd_token = sample['data'][cam]
    print(f"Showing {cam} ...")
    nusc.render_sample_data(sd_token)


In [None]:
lidar_token = sample['data']['LIDAR_TOP']
nusc.render_sample_data(lidar_token, nsweeps=1)


In [None]:
for ann in sample['anns'][:10]:
    ann_record = nusc.get('sample_annotation', ann)
    print(ann_record['category_name'], ann_record['translation'])


In [None]:
nusc.render_sample_data(sample['data']['CAM_FRONT'], with_anns=True)


In [None]:
nusc.render_sample(sample['token'])


In [None]:
ann_token = sample['anns'][0]
ann = nusc.get('sample_annotation', ann_token)
ann


In [None]:
from pyquaternion import Quaternion
from nuscenes.utils.data_classes import Box

quat = Quaternion(ann['rotation'])   # convert list → Quaternion

box = Box(center=ann['translation'],
          size=ann['size'],
          orientation=quat)

box.corners()


In [None]:
nusc.render_annotation(ann_token)


In [None]:
from nuscenes.utils.data_classes import Box
from nuscenes.utils.geometry_utils import view_points
import matplotlib.pyplot as plt
import numpy as np




In [None]:
from pyquaternion import Quaternion

# Convert rotation
quat = Quaternion(ann['rotation'])

# Build 3D Box correctly
box = Box(center=ann['translation'],
          size=ann['size'],
          orientation=quat)


In [None]:
from pyquaternion import Quaternion
import matplotlib.pyplot as plt
import numpy as np

# Get sample
sample = nusc.sample[0]

# Camera front
cam_token = sample['data']['CAM_FRONT']
cam_data = nusc.get('sample_data', cam_token)

# Image path
image_path = nusc.get_sample_data_path(cam_token)
img = plt.imread(image_path)

# Annotation
ann_token = sample['anns'][0]
ann = nusc.get('sample_annotation', ann_token)

# FIX: Convert rotation list → Quaternion
quat = Quaternion(ann['rotation'])

# Build Box
box = Box(center=ann['translation'],
          size=ann['size'],
          orientation=quat)

# Calibrated sensor (camera intrinsics)
cs = nusc.get('calibrated_sensor', cam_data['calibrated_sensor_token'])
camera_intrinsic = np.array(cs['camera_intrinsic'])

# 3D → 2D
corners_3d = box.corners()
corners_2d = view_points(corners_3d, camera_intrinsic, normalize=True)

# Plot
plt.figure(figsize=(10,8))
plt.imshow(img)
plt.scatter(corners_2d[0], corners_2d[1], c='red', s=50)
plt.title("3D Box Projected on CAM_FRONT")
plt.show()


In [None]:
nusc.render_sample_data(cam_token, with_anns=True, verbose=True)


In [None]:
from pyquaternion import Quaternion
import numpy as np
import matplotlib.pyplot as plt
from nuscenes.utils.geometry_utils import transform_matrix, view_points
from nuscenes.utils.data_classes import Box

# 1. Get camera sample
cam_token = sample['data']['CAM_FRONT']
cam_data = nusc.get('sample_data', cam_token)

# 2. Load image
img = plt.imread(nusc.get_sample_data_path(cam_token))

# 3. Get annotation
ann_token = sample['anns'][0]
ann = nusc.get('sample_annotation', ann_token)

# 4. Build box in GLOBAL coordinates
box = Box(center=ann['translation'],
          size=ann['size'],
          orientation=Quaternion(ann['rotation']))

# 5. Camera transforms
cs = nusc.get('calibrated_sensor', cam_data['calibrated_sensor_token'])
ego = nusc.get('ego_pose', cam_data['ego_pose_token'])

# Global → Ego
box.translate(-np.array(ego['translation']))
box.rotate(Quaternion(ego['rotation']).inverse)

# Ego → Camera
box.translate(-np.array(cs['translation']))
box.rotate(Quaternion(cs['rotation']).inverse)

# 6. Project to image plane
corners_3d = box.corners()
corners_2d = view_points(corners_3d, np.array(cs['camera_intrinsic']), normalize=True)

# 7. Plot image + points
plt.figure(figsize=(10,8))
plt.imshow(img)
plt.scatter(corners_2d[0], corners_2d[1], c='red', s=40)
plt.title("Correct 3D Box Projection on CAM_FRONT")
plt.show()


In [None]:
from pyquaternion import Quaternion
import numpy as np
import matplotlib.pyplot as plt
from nuscenes.utils.geometry_utils import view_points
from nuscenes.utils.data_classes import Box

# 1️⃣ Get camera sample token
cam_token = sample['data']['CAM_FRONT']
cam_data = nusc.get('sample_data', cam_token)

# 2️⃣ Load image
img_path = nusc.get_sample_data_path(cam_token)
img = plt.imread(img_path)

# 3️⃣ Choose an annotation (1st box)
ann_token = sample['anns'][0]
ann = nusc.get('sample_annotation', ann_token)

# 4️⃣ Create box in GLOBAL coords
box = Box(center=ann['translation'],
          size=ann['size'],
          orientation=Quaternion(ann['rotation']))

# 5️⃣ Camera transforms
cs = nusc.get('calibrated_sensor', cam_data['calibrated_sensor_token'])
ego = nusc.get('ego_pose', cam_data['ego_pose_token'])

# GLOBAL → EGO
box.translate(-np.array(ego['translation']))
box.rotate(Quaternion(ego['rotation']).inverse)

# EGO → CAMERA
box.translate(-np.array(cs['translation']))
box.rotate(Quaternion(cs['rotation']).inverse)

# 6️⃣ Project 3D corners to 2D image
corners_3d = box.corners()
corners_2d = view_points(
    corners_3d,
    np.array(cs['camera_intrinsic']),
    normalize=True
)

# 7️⃣ Plot image + the 3D box
plt.figure(figsize=(10, 8))
plt.imshow(img)

# Draw lines between corners
edges = [
    [0,1], [1,2], [2,3], [3,0],   # bottom rectangle
    [4,5], [5,6], [6,7], [7,4],   # top rectangle
    [0,4], [1,5], [2,6], [3,7]    # vertical edges
]

for e1, e2 in edges:
    x = [corners_2d[0, e1], corners_2d[0, e2]]
    y = [corners_2d[1, e1], corners_2d[1, e2]]
    plt.plot(x, y, 'r-', linewidth=2)

plt.title("Full 3D Bounding Box on CAM_FRONT")
plt.axis("off")
plt.show()


In [None]:
# Choose a scene
scene = nusc.scene[0]   # scene-0001

# Get all sample tokens (each sample is a moment in time)
sample_tokens = []
current = scene['first_sample_token']

while current != "":
    sample_tokens.append(current)
    current = nusc.get('sample', current)['next']

len(sample_tokens)


In [None]:
ego_positions = []
ego_timestamps = []

for token in sample_tokens:
    sample = nusc.get('sample', token)
    sd = nusc.get('sample_data', sample['data']['LIDAR_TOP'])
    ego_pose = nusc.get('ego_pose', sd['ego_pose_token'])

    ego_positions.append(np.array(ego_pose['translation']))
    ego_timestamps.append(sd['timestamp'])   # in microseconds



In [None]:
ego_speeds = [0]   # first frame speed = 0

for i in range(1, len(ego_positions)):
    p_prev = ego_positions[i-1]
    p_curr = ego_positions[i]

    # distance moved between frames
    dist = np.linalg.norm(p_curr - p_prev)  # in meters

    # time difference
    dt = (ego_timestamps[i] - ego_timestamps[i-1]) / 1e6  # convert µs to seconds

    speed = dist / dt  # m/s
    ego_speeds.append(speed)


In [None]:
ego_speeds[:10]


In [None]:
import numpy as np
import matplotlib.pyplot as plt

# --- 1. EXTRACT EGO POSITIONS + TIMESTAMPS ---
ego_positions = []
ego_timestamps = []

for token in sample_tokens:
    sample = nusc.get('sample', token)
    sd = nusc.get('sample_data', sample['data']['LIDAR_TOP'])
    pose = nusc.get('ego_pose', sd['ego_pose_token'])

    ego_positions.append(np.array(pose['translation']))
    ego_timestamps.append(sd['timestamp'])   # microseconds


# --- 2. COMPUTE EGO SPEED (m/s) ---
ego_speeds = [0]  # first frame speed = 0

for i in range(1, len(ego_positions)):
    p_prev = ego_positions[i-1]
    p_curr = ego_positions[i]

    dist = np.linalg.norm(p_curr - p_prev)
    dt = (ego_timestamps[i] - ego_timestamps[i-1]) / 1e6  # seconds

    speed = dist / dt
    ego_speeds.append(speed)


# --- 3. COMPUTE EGO ACCELERATION (m/s²) ---
ego_accels = [0]  # first frame accel = 0

for i in range(1, len(ego_speeds)):
    dv = ego_speeds[i] - ego_speeds[i-1]
    dt = (ego_timestamps[i] - ego_timestamps[i-1]) / 1e6

    accel = dv / dt
    ego_accels.append(accel)


# --- 4. CREATE RELATIVE TIME ARRAY ---
relative_time = (np.array(ego_timestamps) - ego_timestamps[0]) / 1e6


# --- 5. PLOT SPEED ---
plt.figure(figsize=(12,5))
plt.plot(relative_time, ego_speeds, marker='o')
plt.xlabel("Time (s)")
plt.ylabel("Speed (m/s)")
plt.title("Ego Vehicle Speed Over Time")
plt.grid(True)
plt.show()

# --- 6. PLOT ACCELERATION ---
plt.figure(figsize=(12,5))
plt.plot(relative_time, ego_accels, marker='x', color='orange')
plt.xlabel("Time (s)")
plt.ylabel("Acceleration (m/s²)")
plt.title("Ego Vehicle Acceleration Over Time")
plt.grid(True)
plt.show()


In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Convert timestamps to seconds relative to first frame
relative_time = (np.array(ego_timestamps) - ego_timestamps[0]) / 1e6

plt.figure(figsize=(10,5))
plt.plot(relative_time, ego_speeds, marker='o')
plt.xlabel("Time (seconds)")
plt.ylabel("Ego Speed (m/s)")
plt.title("Ego Vehicle Speed Over Time — Scene 1")
plt.grid(True)
plt.show()


In [None]:
dist_car = []
dist_ped = []
dist_min = []

for token in sample_tokens:
    sample = nusc.get('sample', token)

    # Ego position (using LiDAR_TOP reference)
    sd = nusc.get('sample_data', sample['data']['LIDAR_TOP'])
    ego_pose = nusc.get('ego_pose', sd['ego_pose_token'])
    ego_pos = np.array(ego_pose['translation'])

    nearest_car = float('inf')
    nearest_ped = float('inf')

    for ann_token in sample['anns']:
        ann = nusc.get('sample_annotation', ann_token)
        category = ann['category_name']
        obj_pos = np.array(ann['translation'])

        dist = np.linalg.norm(obj_pos - ego_pos)

        # Cars
        if "vehicle." in category:
            nearest_car = min(nearest_car, dist)

        # Pedestrians
        if "human.pedestrian" in category:
            nearest_ped = min(nearest_ped, dist)

    dist_car.append(nearest_car)
    dist_ped.append(nearest_ped)
    dist_min.append(min(nearest_car, nearest_ped))


In [None]:
plt.figure(figsize=(12,6))
plt.plot(relative_time, dist_car, marker='o', label='Distance to nearest CAR')
plt.plot(relative_time, dist_ped, marker='s', label='Distance to nearest PEDESTRIAN')
plt.xlabel("Time (s)")
plt.ylabel("Distance (m)")
plt.title("Distance to Nearest Car and Pedestrian — Scene 1")
plt.legend()
plt.grid(True)
plt.show()


In [None]:
plt.figure(figsize=(12,6))
plt.plot(relative_time, dist_min, marker='x', color='green', label='Minimum distance')

plt.axhline(5, color='red', linestyle='--', label='DANGER (<5m)')
plt.axhline(10, color='orange', linestyle='--', label='CAUTION (5–10m)')

plt.xlabel("Time (s)")
plt.ylabel("Minimum Distance (m)")
plt.title("Minimum Safety Distance Over Time — Scene 1")
plt.legend()
plt.grid(True)
plt.show()


In [None]:
# For storing TTC values
ttc_values = []

# Loop through frames (except last)
for i in range(len(sample_tokens)-1):

    # current + next sample
    curr_token = sample_tokens[i]
    next_token = sample_tokens[i+1]

    curr_sample = nusc.get('sample', curr_token)
    next_sample = nusc.get('sample', next_token)

    # Ego position
    sd_curr = nusc.get('sample_data', curr_sample['data']['LIDAR_TOP'])
    sd_next = nusc.get('sample_data', next_sample['data']['LIDAR_TOP'])

    ego_curr = np.array(nusc.get('ego_pose', sd_curr['ego_pose_token'])['translation'])
    ego_next = np.array(nusc.get('ego_pose', sd_next['ego_pose_token'])['translation'])

    dt = (sd_next['timestamp'] - sd_curr['timestamp']) / 1e6  # seconds

    # Ego speed
    ego_speed = np.linalg.norm(ego_next - ego_curr) / dt

    # Find nearest object (car or pedestrian)
    min_dist = float('inf')
    closest_ann_curr = None

    for ann_token in curr_sample['anns']:
        ann = nusc.get('sample_annotation', ann_token)
        category = ann['category_name']

        if "vehicle." in category or "human.pedestrian" in category:
            dist = np.linalg.norm(np.array(ann['translation']) - ego_curr)
            if dist < min_dist:
                min_dist = dist
                closest_ann_curr = ann_token

    # No objects in scene
    if closest_ann_curr is None:
        ttc_values.append(np.inf)
        continue

    # object current and next positions
    ann_curr = nusc.get('sample_annotation', closest_ann_curr)

    # match same instance in next frame
    ann_next_token = None
    for ann_token_nxt in next_sample['anns']:
        ann_next = nusc.get('sample_annotation', ann_token_nxt)
        if ann_next['instance_token'] == ann_curr['instance_token']:
            ann_next_token = ann_token_nxt
            break

    if ann_next_token is None:
        ttc_values.append(np.inf)
        continue

    ann_next = nusc.get('sample_annotation', ann_next_token)

    obj_curr = np.array(ann_curr['translation'])
    obj_next = np.array(ann_next['translation'])

    obj_speed = np.linalg.norm(obj_next - obj_curr) / dt

    # relative speed (ego → object)
    relative_speed = ego_speed - obj_speed

    # TTC
    if relative_speed > 0:   # closing in
        ttc = min_dist / relative_speed
    else:
        ttc = np.inf          # object moving away

    ttc_values.append(ttc)


In [None]:
plt.figure(figsize=(12,6))
plt.plot(relative_time[:-1], ttc_values, marker='o', color='purple')
plt.axhline(3, color='orange', linestyle='--', label='Warning (<3s)')
plt.axhline(2, color='red', linestyle='--', label='Danger (<2s)')
plt.xlabel("Time (s)")
plt.ylabel("TTC (seconds)")
plt.title("Time-To-Collision Timeline")
plt.legend()
plt.grid(True)
plt.show()


In [None]:
import numpy as np
import matplotlib.pyplot as plt

# 1️⃣ Convert lists to numpy arrays
dist_min_arr = np.array(dist_min)
ego_accels_arr = np.array(ego_accels)
ttc_arr = np.array(ttc_values)

# 2️⃣ Make all arrays same length
n = min(len(dist_min_arr), len(ego_accels_arr), len(ttc_arr))
dist_min_arr = dist_min_arr[:n]
ego_accels_arr = ego_accels_arr[:n]
ttc_arr = ttc_arr[:n]
time_arr = relative_time[:n]

# 3️⃣ Define weights (NOW w1, w2, w3 ARE DEFINED ✅)
w1 = 0.6   # TTC weight
w2 = 0.3   # Distance weight
w3 = 0.1   # Acceleration weight

# 4️⃣ Avoid division by zero / infinity in TTC & distance
safe_ttc = np.copy(ttc_arr)
safe_ttc[~np.isfinite(safe_ttc)] = 9999  # if inf, treat as very safe

safe_dist = np.copy(dist_min_arr)
safe_dist[safe_dist <= 0] = 0.1  # avoid divide by zero

# 5️⃣ Compute simple risk score
risk_scores = (
    w1 * (1.0 / safe_ttc) +      # higher when TTC small
    w2 * (1.0 / safe_dist) +     # higher when distance small
    w3 * np.maximum(0, -ego_accels_arr)  # braking → positive risk
)

# 6️⃣ Plot risk over time
plt.figure(figsize=(12,6))
plt.plot(time_arr, risk_scores, marker='o', color='crimson')
plt.xlabel("Time (s)")
plt.ylabel("Risk Score (arbitrary units)")
plt.title("Overall Safety Risk Over Time")
plt.grid(True)
plt.show()


In [None]:
# Make sure arrays align with risk_scores length
n = len(risk_scores)
time_used = time_arr[:n]
dist_used = dist_min_arr[:n]
ttc_used = ttc_arr[:n]

# If you have ego_speeds defined:
ego_speeds_used = np.array(ego_speeds[:n])

# Get indices sorted by risk (highest first)
top_idx = np.argsort(-risk_scores)  # minus sign = descending

top_k = 3  # top 3 dangerous frames

print("Top dangerous frames in this scene:\n")
for rank in range(top_k):
    i = int(top_idx[rank])
    print(f"Rank {rank+1}:")
    print(f"  Time          : {time_used[i]:.2f} s")
    print(f"  Risk score    : {risk_scores[i]:.3f}")
    print(f"  Min distance  : {dist_used[i]:.2f} m")
    print(f"  TTC           : {ttc_used[i]:.2f} s")
    print(f"  Ego speed     : {ego_speeds_used[i]:.2f} m/s")
    print("-"*40)


In [None]:
for rank in range(top_k):
    i = int(top_idx[rank])
    token = sample_tokens[i]

    print(f"\n=== Visualizing dangerous frame Rank {rank+1} (t = {time_used[i]:.2f} s) ===")
    nusc.render_sample(token)


In [None]:
for rank in range(top_k):
    i = int(top_idx[rank])
    token = sample_tokens[i]
    sample = nusc.get('sample', token)
    cam_token = sample['data']['CAM_FRONT']

    print(f"\n=== CAM_FRONT for dangerous frame Rank {rank+1} (t = {time_used[i]:.2f} s) ===")
    nusc.render_sample_data(cam_token, with_anns=True)


In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Make sure all arrays are same length
n = min(len(relative_time), len(ego_speeds), len(ego_accels),
        len(dist_car), len(dist_ped), len(dist_min), len(ttc_values), len(risk_scores))

time = np.array(relative_time[:n])
speed = np.array(ego_speeds[:n])
accel = np.array(ego_accels[:n])
car_dist = np.array(dist_car[:n])
ped_dist = np.array(dist_ped[:n])
min_dist = np.array(dist_min[:n])
ttc = np.array(ttc_values[:n])
risk = np.array(risk_scores[:n])

# Infinite TTC replaced for plotting
ttc_plot = np.copy(ttc)
ttc_plot[~np.isfinite(ttc_plot)] = np.nan   # show gaps where TTC is infinite

# Create a mega-dashboard
fig, axs = plt.subplots(4, 2, figsize=(18, 20))
axs = axs.ravel()

# 1. Speed
axs[0].plot(time, speed, marker='o')
axs[0].set_title("Ego Speed (m/s)")
axs[0].grid(True)

# 2. Acceleration
axs[1].plot(time, accel, marker='x', color='orange')
axs[1].set_title("Ego Acceleration (m/s²)")
axs[1].grid(True)

# 3. Distance to Cars
axs[2].plot(time, car_dist, marker='o', color='blue')
axs[2].set_title("Distance to Nearest Car (m)")
axs[2].grid(True)

# 4. Distance to Pedestrians
axs[3].plot(time, ped_dist, marker='s', color='purple')
axs[3].set_title("Distance to Nearest Pedestrian (m)")
axs[3].grid(True)

# 5. Minimum Distance + danger bands
axs[4].plot(time, min_dist, marker='x', color='green')
axs[4].axhline(5, color='red', linestyle='--', label='Danger (<5m)')
axs[4].axhline(10, color='orange', linestyle='--', label='Caution (5–10m)')
axs[4].set_title("Minimum Safety Distance (m)")
axs[4].legend()
axs[4].grid(True)

# 6. TTC
axs[5].plot(time, ttc_plot, marker='o', color='crimson')
axs[5].axhline(3, color='orange', linestyle='--', label='Warning (<3s)')
axs[5].axhline(2, color='red', linestyle='--', label='Critical (<2s)')
axs[5].set_title("Time-To-Collision (seconds)")
axs[5].legend()
axs[5].grid(True)

# 7. Risk Score
axs[6].plot(time, risk, marker='o', color='black')
axs[6].set_title("Combined Safety Risk Score")
axs[6].grid(True)

# 8. Empty slot (we can use later)
axs[7].axis('off')

plt.tight_layout()
plt.show()


In [None]:
import pandas as pd
import numpy as np

# Make sure all arrays are same length
n = min(len(time_arr), len(dist_min_arr), len(ttc_arr), len(risk_scores))
time_used = time_arr[:n]
dist_used = dist_min_arr[:n]
ttc_used = ttc_arr[:n]
risk_used = risk_scores[:n]

# Sort by highest risk (descending)
top_idx = np.argsort(-risk_used)

top_k = 5

rows = []

for rank in range(top_k):
    i = int(top_idx[rank])

    # Label based on safety
    if dist_used[i] < 5 or (np.isfinite(ttc_used[i]) and ttc_used[i] < 2):
        level = "CRITICAL"
    elif dist_used[i] < 10 or (np.isfinite(ttc_used[i]) and ttc_used[i] < 3):
        level = "WARNING"
    else:
        level = "SAFE"

    rows.append({
        "Rank": rank + 1,
        "Time (s)": round(time_used[i], 2),
        "Min Distance (m)": round(dist_used[i], 2),
        "TTC (s)": round(ttc_used[i], 2) if np.isfinite(ttc_used[i]) else "∞",
        "Risk Score": round(risk_used[i], 3),
        "Safety Level": level
    })

# Convert to DataFrame
danger_df = pd.DataFrame(rows)

danger_df


In [None]:
top_k = 5  # how many dangerous frames to visualize

# Rebuild aligned arrays (just to be safe)
n = min(len(sample_tokens), len(risk_scores))
risk_used = np.array(risk_scores[:n])

# Get indices of top-k highest risk frames
top_idx = np.argsort(-risk_used)[:top_k]

for rank, i in enumerate(top_idx, start=1):
    sample_token = sample_tokens[i]
    sample = nusc.get('sample', sample_token)
    cam_token = sample['data']['CAM_FRONT']

    print(f"\n=== Dangerous frame RANK {rank} (index {i}) ===")
    print(f"Time   : {time_arr[i]:.2f} s")
    print(f"Risk   : {risk_used[i]:.3f}")
    print(f"Min dist: {dist_min_arr[i]:.2f} m")
    print(f"TTC    : {ttc_arr[i]:.2f} s" if np.isfinite(ttc_arr[i]) else "TTC    : ∞ s")

    # Render CAM_FRONT with annotations
    nusc.render_sample_data(cam_token, with_anns=True)
