In [32]:
import sys
sys.path.insert(0, 'utils.py')

In [33]:
# /!\ Before running the lab make sure every additional libraries is installed 

# Import local libraries
from utils import *
import time

# Reading of one LiDAR scan

In [34]:
n_frame = 0
actor = 'ego_vehicle'

points = get_point_cloud(n_frame, actor)

pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(points)

flag_display = True
o3d.utility.set_verbosity_level(o3d.utility.VerbosityLevel.Error)

# Voxel Down sampling

To reduce the number of points and accelerate the computation use the function _voxel_down_sample()_ of open3D.

In [35]:
downpcd =  pcd.voxel_down_sample(0.15) # ToDo (Replace this line) 

if flag_display:
    o3d.visualization.draw_geometries([downpcd])

# Normals estimation
We would like to detect opstacles, to do that the first step is to estimate normals to obtain the local orientation of the point cloud.

Use the function _estimate_normals()_ of open3D to estimate this normals.

In [36]:
#ToDo estimate the normals

# Estimating normals
downpcd.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=0.4, max_nn=100))

if flag_display:
    o3d.visualization.draw_geometries([downpcd], point_show_normal=True)

In [37]:
downpcd.colors = o3d.utility.Vector3dVector(np.abs(np.array(downpcd.normals)))

if flag_display:
    o3d.visualization.draw_geometries([downpcd])

# Ground and objects segmentation

Based on the height of each point, the local orientation of the points cloud arround this point and eventually the variation of this orientation filter the ground to obtain a new points cloud whith only the ground points and another one with only the objects on top of this ground.

For both of these new points clouds you can also filter the points corresponding to the roof of the ego vehicle.

In [38]:
# Get the shape of pcd
print("The shape of the point cloud is: ", np.array(downpcd.points).shape)

The shape of the point cloud is:  (16928, 3)


In [39]:
def ground_filtering(pcd):
    normals = np.asarray(pcd.normals)  # Convert to NumPy array
    points = np.asarray(pcd.points)  # Convert to NumPy array
    height = 0.3  # Height threshold


    # Get the norm of x and y of normals    
    norm_x_y = np.linalg.norm(normals[:, :2], axis=1)
    mask = (norm_x_y < 0.4) & (points[:, 2] < height)

    pcd_ground = pcd.select_by_index(np.where(mask)[0])
    return pcd_ground

pcd_ground = ground_filtering(downpcd)

# Check the difference between the original point cloud and the ground point cloud
print("The shape of the original point cloud is: ", np.array(downpcd.points).shape)
print("The shape of the ground point cloud is: ", np.array(pcd_ground.points).shape)

if flag_display:
    o3d.visualization.draw_geometries([pcd_ground])

The shape of the original point cloud is:  (16928, 3)
The shape of the ground point cloud is:  (7154, 3)


In [40]:
def objects_filtering(pcd):
    normals = np.asarray(pcd.normals)  # Convert to NumPy array
    points = np.asarray(pcd.points)  # Convert to NumPy array
    height = 0.3  # Height threshold


    # Get the norm of x and y of normals    
    norm_x_y = np.linalg.norm(normals[:, :1], axis=1)
    mask = (norm_x_y > 0.4) | (points[:, 2] > height)

    pcd_objects = pcd.select_by_index(np.where(mask)[0])

    # To understand the part of car we use that lidar which is at the origin and then we use the norm of the vector to exlude the points that are too close to the origin with a specified threshold
    mask = np.linalg.norm(np.asarray(pcd_objects.points), axis=1) > 3
    # Removing the points that make the mask true
    pcd_objects = pcd_objects.select_by_index(np.where(mask)[0])
    return pcd_objects

pcd_objects = objects_filtering(downpcd)

if flag_display:
    o3d.visualization.draw_geometries([pcd_objects])

# Clustering

Now that the points of the objects are not connected to the ground anymore we can regroup all the points of the same object on one cluster.

To do that, use the function _cluster_dbscan()_ of open3D to obtain the cluster id for each point.

In [41]:
def clustering(pcd_objects):
    
    cluster_ids = np.array(pcd_objects.cluster_dbscan(eps=2, min_points=10, print_progress=True)) # ToDo (Replace this line)
    
    cluster_nb = cluster_ids.max()
    colors = plt.get_cmap("tab20")(cluster_ids / (cluster_nb if cluster_nb > 0 else 1))
    colors[cluster_ids < 0] = 0
    pcd_objects.colors = o3d.utility.Vector3dVector(colors[:, :3])


    return pcd_objects, cluster_ids

pcd_objects, cluster_ids = clustering(pcd_objects)

print("The points cloud has", cluster_ids.max() + 1, "clusters")

if flag_display:
    o3d.visualization.draw_geometries([pcd_objects])



# Bounding Boxes estimation
Estimate for each of these clusters its bounding box. You can use the function _get_axis_aligned_bounding_box()_ or _get_oriented_bounding_box()_ of open3D.

What are the limits of these functions ?

You can keep only the small bounding boxes correponding to the small objects (such as vehicles or pedestrians and not buildings)

In [42]:
def cluster_boxes(pcd_objects, cluster_ids):
    # Define size constraints for car-sized bounding boxes. No need for human since it will be smaller
    car_size = np.array([6., 6., 6.])   # Example dimensions (length, width, height)
    
    # Bounding boxes
    boxes = []

    points = np.asarray(pcd_objects.points)
    
    # Get the number of clusters
    max_label = cluster_ids.max()
    
    # Iterate through each cluster and compute the bounding box
    for i in range(max_label + 1):
        cluster_points = points[cluster_ids == i]
        cluster_pcd = o3d.geometry.PointCloud()
        cluster_pcd.points = o3d.utility.Vector3dVector(cluster_points)

        if len(cluster_points) < 4:
            continue  # Skip this cluster
        
        # Compute the oriented bounding box (OBB)
        obb = cluster_pcd.get_oriented_bounding_box()
        
        # Check if the bounding box is within the size constraints for humans or cars
        # and if the bounding box is placed lower than 1 meter from the ground
        if all(obb.extent <= car_size):
            boxes.append(obb)
            
    return boxes

boxes = cluster_boxes(pcd_objects, cluster_ids)

obj_to_display = boxes
obj_to_display.append(pcd_objects)

flag_display = True
if flag_display:
    o3d.visualization.draw_geometries(obj_to_display)
    

# Objects detection and tracking on LiDAR scans

Applied the previous functions on the different scan to estimate the bounding boxes.


In [43]:
# Apply the same process for different scans  
n_frame = 2
actor = 'ego_vehicle'
points = get_point_cloud(n_frame, actor)

pcd_next = o3d.geometry.PointCloud()
pcd_next.points = o3d.utility.Vector3dVector(points)

downpcd_next = pcd_next.voxel_down_sample(0.1) # ToDo (Replace this line)

downpcd_next.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=0.3, max_nn=100))

downpcd_next.colors = o3d.utility.Vector3dVector(np.abs(np.array(downpcd_next.normals)))

pcd_ground_next = ground_filtering(downpcd_next)

pcd_objects_next = objects_filtering(downpcd_next)

pcd_objects_next, cluster_ids_next = clustering(pcd_objects_next)

boxes_next = cluster_boxes(pcd_objects_next, cluster_ids_next)

obj_to_display_next = list(boxes_next)
obj_to_display_next.extend([pcd_objects_next])

# Print the number of cluster and boxes
print("The points cloud has", cluster_ids_next.max() + 1, "clusters")
print("The number of boxes is: ", len(boxes_next))

if True:
    o3d.visualization.draw_geometries(obj_to_display_next)

The number of boxes is:  29



### Bounding boxes association

Create a function to associate the bounding boxes of the current time to the one of the previous time based the shortest distances of there centers.


In [44]:
def associate_bounding(previous_boxes, current_boxes):
    # Associate bounding boxes between the two frames
    # For each bounding box in the first frame, find the closest bounding box in the second frame
    # If the bounding boxes are close enough, consider them as the same object
    
    # Define a threshold for the distance between bounding boxes
    distance_threshold = 5
    
    # List to store the associated bounding boxes
    associated_boxes = []
    
    # Iterate through each bounding box in the previous frame
    for prev_box in previous_boxes:
        min_distance = distance_threshold
        associated_box = None
        
        # Iterate through each bounding box in the current frame
        for curr_box in current_boxes:
            # Compute the distance between the centers of the bounding boxes
            distance = np.linalg.norm(prev_box.get_center() - curr_box.get_center())
            
            # Check if the distance is smaller than the threshold
            if distance < min_distance:
                min_distance = distance
                associated_box = curr_box
        
        # If an associated bounding box was found, add it to the list
        if associated_box is not None:
            associated_boxes.append(associated_box)
        

associate_bounding(boxes, boxes_next)



### Bounding boxes orientation estimation
Based on these association estimate the orientation of there motion to correct the orientation of the bounding boxes.



### Points cloud mapping
Build a map by accumulating the different points cloud and removing the moving objects (keep the cluster id and add a new one for the ground points)



### Iterative Closest Point tracking
To better associate the clusters you can also use the function _icp()_ of open3D on each cluster

In [None]:
# Function to create a coordinate frame
def create_coordinate_frame(size=1.0, origin=[0, 0, 0]):
    return o3d.geometry.TriangleMesh.create_coordinate_frame(size=size, origin=origin)

# Initialize Open3D Visualizer
flag_display = True
if flag_display:
    vis = o3d.visualization.Visualizer()
    vis.create_window()

    # Add the origin coordinate frame to the visualizer
    origin_frame = create_coordinate_frame(size=6., origin=[0, 0, 0])
    vis.add_geometry(origin_frame)

# Initialize an empty list of previous boxes
previous_boxes = []

for n_frame in range(nb_frames):
    
    # Read point cloud
    points = get_point_cloud(n_frame, actor)
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(points)
    
    # Voxel down sampling
    downpcd = pcd.voxel_down_sample(0.1) # ToDo (Replace this line)
    
    # Transform points cloud from the LiDAR to the world frame
    tf = get_tf_lidar2world(actor, n_frame)
    # ToDo applied this transformation to the points cloud
    pcd.transform(tf)

    # Estimate normals
    #ToDo estimate the normals
    downpcd.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=0.4, max_nn=100))
    downpcd.colors = o3d.utility.Vector3dVector(np.abs(np.array(downpcd.normals)))
    

    # Filter ground points
    pcd_ground = ground_filtering(downpcd)
    
    # Filter objects points
    pcd_objects = objects_filtering(downpcd)
    
    # Clustering
    pcd_objects, cluster_ids = clustering(pcd_objects)

    # Get bounding boxes for this frame
    current_boxes = cluster_boxes(pcd_objects, cluster_ids)
    
    # Associate them with bounding boxes from the previous frame (if any)
    if len(previous_boxes) > 0:
        associate_bounding(previous_boxes, current_boxes)
    
    # Update previous_boxes
    previous_boxes = current_boxes
    
    if flag_display:
        # Clear previous geometries and add new point cloud to visualizer
        vis.clear_geometries()
        vis.add_geometry(pcd_objects)

        # Add the bounding boxes
        for box in current_boxes:
            vis.add_geometry(box)
        
        # Re-add the origin coordinate frame to the visualizer
        vis.add_geometry(origin_frame)

        # Update the visualizer to show the new point cloud
        vis.poll_events()
        vis.update_renderer()

        # Wait before updating the points cloud (adjust time as needed)
        time.sleep(0.1)

if flag_display:
    # Close the visualizer
    vis.destroy_window()



KeyboardInterrupt: 

: 