### Calculating Beam-ID for all points in PandaSet

In [1]:
import os
import numpy as np
# CONFIGURATION
DATA_PATH = os.path.join('buni','dataset','pandaset')
SCENE_IDX = 3
FRAME_IDX = 70
GROUND_LABELS = np.array([6, 7, 8, 9, 10, 11, 12, 34, 35, 37, 38, 39])
user_home = os.path.expanduser('~') 
dataset_path = os.path.join(user_home,DATA_PATH)

In [None]:
from pandaset import DataSet
import logging

dataset = DataSet(dataset_path)
scenes_with_semantic_labels = sorted(dataset.sequences(with_semseg=True), key=int)
print(f"List of sequences available with semantic segmentation:\n{scenes_with_semantic_labels}")
scene = dataset[scenes_with_semantic_labels[SCENE_IDX]]
print(f"Selected scene/sequence: {scenes_with_semantic_labels[SCENE_IDX]}")
scene.load_lidar()
scene.load_semseg()
logger = logging.getLogger(__name__)
logging.info(f"Loaded scene {SCENE_IDX} with frame {FRAME_IDX}")
lidar_data = scene.lidar.data[FRAME_IDX]
lidar_poses = scene.lidar.poses[FRAME_IDX]
labels = scene.semseg.data[FRAME_IDX]

from general_utils import pandaset_utils as pdutils
from general_utils import gen_utils

gen_utils.check_type(lidar_data,"lidar_data", logger)
gen_utils.check_type(labels,"labels", logger)

lidar_data, lidar_labels = pdutils.cleanup_lidar_data_and_labels(lidar_data, labels, lidar_poses,logger)

logger.info(f"Shape of lidar_data before reshaping: {lidar_data.shape}")
gen_utils.check_type(lidar_data,"lidar_data",logger)
lidar_points = lidar_data.iloc[:,:3].to_numpy()
lidar_points = lidar_points.astype('float64')
logger.info(f"Shape of lidar_data after reshaping: {lidar_points.shape}")
gen_utils.check_type(lidar_points,"lidar_points",logger)



In [None]:
import open3d as o3d
import numpy as np
import pandas as pd

def cartesian_to_spherical(points):
    """ Convert cartesian coordinates to spherical (azimuth and elevation angles) for multiple points """
    x = points[:, 0]
    y = points[:, 1]
    z = points[:, 2]
    
    azimuth = np.arctan2(x, y)  # Azimuth (horizontal angle) in radians
    elevation = np.arctan2(z, np.sqrt(x**2 + y**2))  # Elevation (vertical angle) in radians
    return azimuth, elevation

def find_closest_channel(elevations, correction_table):
    """ Match elevation angle to the closest laser channel for multiple points """
    vertical_angles = correction_table['Elevation'].to_numpy()
    
    # Find the index of the closest channel for each elevation
    idx = np.abs(np.radians(vertical_angles).reshape(-1, 1) - elevations).argmin(axis=0)
    
    # Return channel IDs and the corresponding channel correction data
    return correction_table.iloc[idx]

def correct_azimuth(azimuths, azimuth_offsets):
    """ Correct the azimuth angle using the provided offset for each channel """
    return azimuths - np.radians(azimuth_offsets)

def assign_beam_ids(points, correction_table):
    """ Assign the beam IDs to each point in the point cloud """
    
    # Step 1: Convert Cartesian coordinates to Spherical
    azimuths, elevations = cartesian_to_spherical(points)
    
    # Step 2: Find closest channel (vertical) for each point
    closest_channels = find_closest_channel(elevations, correction_table)
    
    # Step 3: Correct azimuth for each point based on the closest channel's azimuth offset
    corrected_azimuths = correct_azimuth(azimuths, closest_channels['Azimuth'])
    
    # Step 4: Extract channel IDs and return the assigned beam IDs
    beam_ids = closest_channels['Channel'].to_numpy()
    
    return beam_ids, np.degrees(corrected_azimuths), np.degrees(elevations)

import matplotlib.colors as mcolors

def visualize_beam_ids(lidar_points, beam_ids):

    # Create a point cloud in Open3D and assign colors based on beam IDs
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(lidar_points)

    # Use a discrete colormap with enough unique colors
    unique_beam_ids = np.unique(beam_ids)
    n_unique_beam_ids = len(unique_beam_ids)

    # Generate a colormap with N unique colors
    colors_list = list(mcolors.CSS4_COLORS.values())  # Get a large list of CSS colors
    if n_unique_beam_ids > len(colors_list):
        raise ValueError("Not enough unique colors available. Increase the color list or choose another colormap.")

    # Create a color dictionary that maps each beam ID to a unique color
    beam_id_to_color = {beam_id: colors_list[i % len(colors_list)] for i, beam_id in enumerate(unique_beam_ids)}

    # Map the beam IDs to the corresponding colors
    colors = np.array([mcolors.to_rgb(beam_id_to_color[beam_id]) for beam_id in beam_ids])

    # Set the colors to the point cloud
    pcd.colors = o3d.utility.Vector3dVector(colors)

    # Visualize the point cloud
    o3d.visualization.draw_geometries([pcd])

In [4]:
# Load angle correction file (replace with actual filename)
corrections_file = os.path.join(os.getcwd(),'Pandar64_Angle_Correction_File.csv')
corrections = pd.read_csv(corrections_file)
beam_ids, _, _ = assign_beam_ids(lidar_points, corrections)

visualize_beam_ids(lidar_points,beam_ids)

lidar_points_with_beam_ids = np.hstack((lidar_points, beam_ids.reshape(-1, 1)))


### Ray-Dropping

+ `beam_ids` is a numpy array. 
+ Joint `beam_ids` with `lidar_points` to get `lidar_points_with_beam_ids`

In [None]:
import numpy as np

def random_beam_drop(points):
    # Randomly select a beam drop ratio from [1, 2, 3]
    beam_drop_ratio = np.random.choice([1, 2, 3])
    # beam_drop_ratio = 3
    print(f"Beam drop ratio: {beam_drop_ratio}")
    # Randomly select a starting beam index
    start_index = np.random.randint(0, beam_drop_ratio)

    # Apply the ray-dropping condition
    mask = (points[:, 3] - start_index) % beam_drop_ratio == 0
    return points[mask]


def spherical_coordinates_conversion(points):
    # Convert to spherical coordinates
    x, y, z = points[:, 0], points[:, 1], points[:, 2]
    radial_dist = np.sqrt(x**2 + y**2 + z**2)
    theta = np.arctan2(y, x)  # azimuth
    phi = np.arcsin(z / radial_dist)  # elevation

    # Filter out points with radial distance < 0.1
    valid_mask = radial_dist > 0.1

    return np.vstack((theta[valid_mask], phi[valid_mask], radial_dist[valid_mask])).T, valid_mask

def random_spherical_drop(points, valid_mask):
    # Sample spherical resolutions for theta and phi
    spherical_resolutions = np.random.choice([600, 900, 1200, 1500])
    
    # Convert theta and phi to grid cells
    spherical_coords, valid_mask = spherical_coordinates_conversion(points)
    theta_grid = (spherical_coords[:, 0] * spherical_resolutions).astype(int)
    phi_grid = (spherical_coords[:, 1] * spherical_resolutions).astype(int)
    
    # Randomly sample spherical drop ratio
    spherical_drop_ratio = np.random.choice([1, 2])
    # spherical_drop_ratio = 2
    print(f"Spherical drop ratio: {spherical_drop_ratio}")
    # Apply the ray-dropping condition in the spherical coordinates
    theta_mask = (theta_grid % spherical_drop_ratio == 0)
    phi_mask = (phi_grid % spherical_drop_ratio == 0)
    
    # Combine the valid_mask from earlier with spherical mask
    combined_mask = valid_mask & theta_mask & phi_mask
    return points[combined_mask]


def ray_dropping_augmentation(points):
    # Step 1: Random beam drop
    points_after_beam_drop = random_beam_drop(points)
    
    # Step 2: Spherical drop
    points_after_spherical_drop = random_spherical_drop(points_after_beam_drop, np.ones(points_after_beam_drop.shape[0], dtype=bool))
    
    return points_after_spherical_drop

# Example usage
augmented_points_final = ray_dropping_augmentation(lidar_points_with_beam_ids)
visualize_beam_ids(augmented_points_final[:,:3], augmented_points_final[:,3])