# 0. Test parameters

In [24]:
# Isaac Sim RTX Lidar configuration
# ref: https://docs.omniverse.nvidia.com/isaacsim/latest/features/sensors_simulation/isaac_sim_sensors_rtx_based_lidar.html#lidar-config-files
lidar_config_file = "OS1_REV6_32ch10hz512res"

# number of steps per second in the simulation
levels = [30, 60, 200, 512, 5120]

# Radius of the sphere surrounding the lidar
R = 0.5

# Duration of simulation in seconds
T = 5

# 1. Start Isaac Sim

In [None]:
import sys
import signal

from isaacsim import SimulationApp

CONFIG = {
  'headless': False,
  'display_options': 3286
}

# Example for creating a RTX lidar sensor and publishing PointCloud2 data
simulation_app = SimulationApp(CONFIG)
simulation_app.update()

spinning = True
def sigint_handler(sig, frame):
  global spinning
  spinning = False
signal.signal(signal.SIGINT, sigint_handler)

kernel = get_ipython().kernel
kernel.pre_handler_hook = lambda: None
kernel.post_handler_hook = lambda: None

def spin(update_function, *args, **kwargs):
  global spinning
  print('gui can be used now. interrupt kernel to go back to notebook')
  while spinning:
    update_function(*args, **kwargs)
  spinning = True

def resume_gui():
  spin(lambda: simulation_app.update())
  
if CONFIG['headless']:
  del spin
  del resume_gui

# 2. define a new lidar class that can capture full pointcloud

In [None]:
import os
import glob
import json

import carb
import omni
from pxr import Gf, Sdf
import omni.graph.core as og
import omni.replicator.core as rep
from omni.isaac.core import SimulationContext
from omni.isaac.core.objects import *
from omni.isaac.core.utils import stage
from omni.isaac.core.utils.prims import *
from omni.isaac.core.utils.viewports import *
from omni.isaac.sensor import LidarRtx, IMUSensor
from omni.isaac.core.utils.rotations import euler_angles_to_quat
from omni.isaac.core.utils.xforms import get_world_pose
from omni.isaac.nucleus import get_assets_root_path
from typing import Optional, Sequence, Tuple
import numpy as np

class RtxLidarFullFrame(LidarRtx):
  _CONFIG_DIR = './exts/omni.isaac.sensor/data/lidar_configs/'
  """
  subclass the LidarRtx class. Changes:
  - replace the ""RtxSensorCpuIsaacComputeRTXLidarPointCloud" annotator with
    "RtxSensorCpuIsaacCreateRTXLidarScanBuffer" annotator
  - modify frame keys accordingly
  - initialize the lidar inside __init__ function
  """    
  def __init__(
    self,
    prim_path: str,
    name: str = "lidar_rtx",
    position: Optional[np.ndarray] = None,
    translation: Optional[np.ndarray] = None,
    orientation: Optional[np.ndarray] = None,
    config_file_name: Optional[str] = None,
    **extra_kwargs
  )->None:
    self.extra_kwargs = extra_kwargs
    
    super().__init__(prim_path, name, position, translation, orientation, config_file_name)
    self._attribute_map = {
      "lidar_info": "info",
      "point_cloud_data": "data",
      "range": "distance",
      "intensities_data": "intensity",
      "azimuth": "azimuth",
      "elevation": "elevation",
      "beamId": "beamId",
      "emitterId": "emitterId",
      "index": "index"
    }
    if 'keepOnlyPositiveDistances' in extra_kwargs:
      if extra_kwargs['keepOnlyPositiveDistances']== False:
        del self._attribute_map["index"]
    
    for key in self._attribute_map:
      self._current_frame[key] = []
    
    attrs = dir(LidarRtx)
    remove_funcs = [attr for attr in attrs 
            if (attr.startswith("remove_") and attr.endswith("_from_frame"))
            or (attr.startswith("add_") and attr.endswith("_to_frame"))]
    for func in remove_funcs:
      delattr(LidarRtx, func)
    
    self.initialize()
    
    self.config_copy = {}
    config_file = glob.glob(self._CONFIG_DIR+f'**/{config_file_name}.json', recursive=True)[0]
    with open(config_file, 'r') as f:
      self.config_copy = json.load(f)
    
    return
      
  def _create_point_cloud_graph_node(self):
    self._point_cloud_annotator = rep.AnnotatorRegistry.get_annotator(
      "RtxSensorCpu" + "IsaacCreateRTXLidarScanBuffer"
    )
    self._point_cloud_annotator.initialize(
      **self.extra_kwargs
    )
    self._point_cloud_annotator.attach([self._render_product_path])
    self._point_cloud_node_path = self._point_cloud_annotator.get_node().get_prim_path()
    return
  
  def _data_acquisition_callback(self, event: carb.events.IEvent):
    self._current_frame["rendering_frame"] = (
      og.Controller()
      .node("/Render/PostProcess/SDGPipeline/PostProcessDispatcher")
      .get_attribute("outputs:referenceTimeNumerator")
      .get()
    )

    self._current_frame["rendering_time"] = self._core_nodes_interface.get_sim_time_at_swh_frame(
      self._current_frame["rendering_frame"]
    )
    
    point_cloud_data = self._point_cloud_annotator.get_data()
    
    for key in self._current_frame:
      attribute_name = "".join([word[0].upper() + word[1:] for word in key.split("_")])
      attribute_name = attribute_name[0].lower() + attribute_name[1:]
      if key not in ["rendering_time", "rendering_frame"]:
        if key in self._attribute_map:
          self._current_frame[key] = point_cloud_data[self._attribute_map[key]]
    return
  
  def setKeepOnlyPositiveDistance(self, keepOnlyPositiveDistance: bool):
    node = og.Controller.node(self._point_cloud_node_path)
    node.get_attribute("inputs:keepOnlyPositiveDistance").set(keepOnlyPositiveDistance)
    if keepOnlyPositiveDistance == True:
      self._attribute_map["index"] = "index"
    elif "index" in self._attribute_map:
      del self._attribute_map["index"]

   # lidar_publish_rate = lidar.get_rotation_frequency() # doesn't work until sim started
  # so we add a function to read it from json file =.=
  def get_rotation_frequency(self) -> float:
    return self.config_copy['profile']['scanRateBaseHz']
  

# 3. Test distortion
## 3.1. Convenient functions to setup simulation

In [None]:
stage.create_new_stage()

ctx = SimulationContext(
  physics_dt=1.0/60,
  rendering_dt=1.0/60
)

def make_dimenvue(lidar_config_file):
  # make a small invisible cube as a chassis
  chassis_prim_path = '/chassis'
  chassis = DynamicCuboid(
    chassis_prim_path,
    translation=[0, 0, 0],
    visible=False,
    size=0.01
  )

  lidar_name = 'lidar0'
  lidar_prim_path = chassis_prim_path + '/' + lidar_name
  lidar = RtxLidarFullFrame(
    prim_path = lidar_prim_path,
    name = lidar_name, 
    translation = [0, 0, 0.1], 
    orientation = euler_angles_to_quat([90, 0, 0], degrees=True),
    config_file_name = lidar_config_file
  )
  lidar.initialize()

  imu_name = 'imu0'
  imu_prim_path = chassis_prim_path + '/' + imu_name
  imu_rate = 400
  imu = IMUSensor(
    prim_path = imu_prim_path, 
    name = imu_name, 
    frequency = imu_rate
  )

  dimenvue = {
    'chassis': chassis,
    'lidar': lidar,
    'imu': imu
  }

  return dimenvue

def make_environment():
  sphere = FixedSphere(
    prim_path='/environment',
    position=[0, 0, 0],
    radius = 5.0
  )
  sphere.prim.CreateAttribute('refinementEnableOverride', Sdf.ValueTypeNames.Bool).Set(True)
  sphere.prim.CreateAttribute('refinementLevel', Sdf.ValueTypeNames.Int).Set(5)
  return sphere

dimenvue = make_dimenvue(lidar_config_file)
sphere = make_environment()

simulation_app.update()

## 3.2. Generate some trajectories for tests

In [None]:
import rerun as rr
import noise
import numpy as np

rr.init("rerun_example_notebook")

In [None]:
def make_path_on_a_sphere(stepsPerSecond, duration, radius):
  t = np.linspace(0, duration, duration * stepsPerSecond)
  yaw = np.array([noise.pnoise1(ti, octaves=2, base=-1) for ti in t])
  pitch = np.array([noise.pnoise1(ti, octaves=2, base=10) for ti in t])
  roll = np.array([noise.pnoise1(ti, octaves=2, base=20) for ti in t])
  # scale from [min, max] to [-2pi, 2pi]
  yaw = ((yaw - min(yaw)) / (max(yaw) - min(yaw)) - 0.5) * 4* np.pi
  pitch = ((pitch - min(pitch)) / (max(pitch) - min(pitch)) - 0.5) * 4* np.pi
  roll = ((roll - min(roll)) / (max(roll) - min(roll)) - 0.5) * 4* np.pi
  euler_xyz = np.array([roll, pitch, yaw]).T
  quats = np.array([euler_angles_to_quat(angles, degrees=False, extrinsic=True) for angles in euler_xyz])
  w, i, j, k = quats[:, 0], quats[:, 1], quats[:, 2], quats[:, 3]
    
  R = radius
  x = R * np.cos(yaw) * np.cos(pitch)
  y = R * np.sin(yaw) * np.cos(pitch)
  z = R * np.sin(pitch)
  
  return np.array([t, x, y, z, w, i, j, k]).T

In [None]:

paths = {}
for level in levels:
  paths[level] = make_path_on_a_sphere(level, T, R)

## 3.3. Define test

In [None]:
import numpy as np
def deep_copy(data):
  data_copy = {}
  for key in data:
    if isinstance(data[key], np.ndarray):
      data_copy[key] = data[key].copy()
    elif isinstance(data[key], dict):
      data_copy[key] = deep_copy(data[key])
    else:
      data_copy[key] = data[key]
  return data_copy

"""
Args:
  path: np.array, each row is t,x,y,z,w,i,j,k 
  dt: float
"""
def test_distort_scan(path: np.array, dt:float):
  stage.create_new_stage()

  ctx = SimulationContext(physics_dt=dt, rendering_dt=dt)
  
  dimenvue = make_dimenvue(lidar_config_file)
  sphere = make_environment()

  dimenvue_chassis = dimenvue['chassis']
  imu = dimenvue['imu']
  lidar = dimenvue['lidar']

  imu_publish_rate = imu.get_frequency()
  imu_publish_dt = 1.0 / imu_publish_rate
  lidar_publish_rate = lidar.get_rotation_frequency()
  lidar_publish_dt = 1.0 / lidar_publish_rate
  time_publish_rate = max([imu_publish_rate, lidar_publish_rate])
  time_publish_dt = 1 / time_publish_rate
  tf_publish_rate = time_publish_rate
  tf_publish_dt = 1 / tf_publish_rate

  next_imu_publish_t = imu_publish_dt
  next_lidar_publish_t = lidar_publish_dt
  next_time_publish_t = time_publish_dt
  next_tf_publish_t = tf_publish_dt

  data = {
    'lidar': {},
    'imu': {},
    'tf': {}
  }
    
  def check_publish():
    nonlocal next_lidar_publish_t
    nonlocal next_imu_publish_t
    nonlocal next_tf_publish_t
    
    t = ctx.current_time
    if t > next_lidar_publish_t:
      next_lidar_publish_t += lidar_publish_dt
      lidar_data = lidar.get_current_frame()
      data['lidar'][t] = deep_copy(lidar_data)
    if t > next_imu_publish_t:
      next_imu_publish_t += imu_publish_dt
      imu_data = imu.get_current_frame()
      data['imu'][t] = deep_copy(imu_data)
    if t > next_tf_publish_t:
      next_tf_publish_t += tf_publish_dt
      tf_data = {}
      tf_data['lidar'] = get_world_pose(lidar.prim_path)
      tf_data['chassis'] = get_world_pose(dimenvue_chassis.prim_path)
      tf_data['imu'] = get_world_pose(imu.prim_path)
      data['tf'][t] = tf_data

  ctx.play()
    
  for point in path:
    _, x, y, z, w, i, j, k = point
    dimenvue_chassis.set_world_pose([x, y, z], [w, i, j, k])
    ctx.step()
    check_publish()
      
  ctx.stop()
  
  return data
    

## 3.4. Run tests

In [None]:
data = {}
for level in paths:
  dt = 1.0 / level
  data[level] = test_distort_scan(paths[level], dt)

In [None]:
"""
ground truth is the point cloud captured in static position.
"""
def make_groundtruth_sphere():
  stage.create_new_stage()

  dt = 1.0/60
  ctx = SimulationContext(physics_dt=dt, rendering_dt=dt)
  
  lidar = RtxLidarFullFrame(
    prim_path = '/lidar0',
    name = 'lidar0', 
    position = [0, 0, 0], 
    config_file_name = lidar_config_file,
    transformPoints = True
  )
  lidar.initialize()
  sphere = make_environment()
  
  ctx.play()
  data = []
  rx = np.pi/5 #fov~40deg
  for i in range(10):
    wijk = euler_angles_to_quat([rx*i, 0, 0])
    set_prim_attribute_value(lidar.prim_path, 'xformOp:orient', wijk)
    #wait a few step
    for _ in range(10):
      ctx.step()
    pcd = lidar.get_current_frame()['point_cloud_data']
    print(pcd.shape)
    data.append(pcd)
  ctx.stop()
  
  pcd = [p for p in data if p.shape[0] > 0] #remove empty cloud?? 
  return np.vstack(pcd)

ground_truth = make_groundtruth_sphere()

# 4. Undistort data
## 4.1. Save sim data for later

Simulation with high quality time step is very slow.

It took about 15min to simulate 5 seconds. So it's better to save data and load it for experiments

Rather than running the simulation over and over again (in case kernel crash, restart computer, etc..)

In [None]:
import pickle

data['ground_truth'] = ground_truth
data['paths'] = paths

with open('data.pkl', 'wb') as f:
  pickle.dump(data, f)

# release the simulation
simulation_app.close()

## 4.2. Reload data

In [1]:
import pickle
import numpy as np
import rerun as rr

rr.init("rerun_example_notebook")

with open('data.pkl', 'rb') as f:
  data = pickle.load(f)

In [2]:
# Thanks ChatGPT
import sys
from PyQt5.QtWidgets import QApplication, QMainWindow, QTreeWidget, QTreeWidgetItem, QVBoxLayout, QWidget

app = QApplication(sys.argv)

In [3]:
class TreeApp(QMainWindow):
  def __init__(self):
    super().__init__()

    # Set up the main window
    self.setWindowTitle("Data browser")
    self.setGeometry(100, 100, 600, 400)

    # Set up the central widget and layout
    self.central_widget = QWidget()
    self.setCentralWidget(self.central_widget)
    self.layout = QVBoxLayout(self.central_widget)

    # Set up the tree widget
    self.tree_widget = QTreeWidget()
    self.tree_widget.setHeaderLabels(['KeyName', 'ValueType', 'Size'])
    self.layout.addWidget(self.tree_widget)

  def set_tree_item_text(self, item: QTreeWidgetItem, key, value):
    item.setText(0, f'{key}')
    item.setText(1, f'{type(value)}')
    if isinstance(value, dict) or isinstance(value, list) or isinstance(value, tuple):
      item.setText(2, f'{len(value)}')
    elif isinstance(value, np.ndarray):
      item.setText(2, f'{value.shape}')
  
  def set_data(self, data_dict: dict, parent:QTreeWidgetItem=None):
    if parent is None:
      root = QTreeWidgetItem(self.tree_widget, ['root', str(type(data_dict))])
    else:
      root = parent
    
    for key in data_dict:
      child = QTreeWidgetItem(root)
      self.set_tree_item_text(child, key, data_dict[key])
      if isinstance(data_dict[key], dict):
        self.set_data(data_dict[key], child)

window = TreeApp()
window.set_data(data)
window.show()
app.exec()

0

## 4.3. Log to rerun for visualization

In [2]:
# log trajectories to rerun
Axes = rr.Arrows3D(
    vectors=[[1, 0, 0], [0, 1, 0], [0, 0, 1]],
    colors=[[255,0,0], [0,255,0], [0,0,255]],
)
rr.log('/world', Axes, static=True)

paths = data['paths']
for level in paths:
    t = paths[level][:, 0]
    xyz = paths[level][:, 1:4]
    wijk = paths[level][:, 4:]
    ijkw = np.roll(wijk, -1, axis=1)
    rr.log(f'/{level}/trajectory', rr.Points3D(xyz), static=True)

ground_truth = data['ground_truth']
rr.log('/world/gt', rr.Points3D(ground_truth), static=True)

R = np.mean(np.sqrt(np.sum(ground_truth**2, axis=1)))

In [3]:
rr.notebook_show(width=800, height=800)

Viewer()

In [3]:
import cv2 

"""
compute 4x4 transformation matrix from translation and quaternion
"""
def transform(translation, rot_quaternion)->np.array:
    x, y, z = translation
    w, i, j, k = rot_quaternion
    return np.array([
        [1-2*j**2-2*k**2, 2*i*j-2*w*k, 2*i*k+2*w*j, x],
        [2*i*j+2*w*k, 1-2*i**2-2*k**2, 2*j*k-2*w*i, y],
        [2*i*k-2*w*j, 2*j*k+2*w*i, 1-2*i**2-2*j**2, z],
        [0, 0, 0, 1]
    ])

def colormap(x, cmap=cv2.COLORMAP_JET):
    # normalize x to [0, 1]
    x = (x - np.min(x)) / (np.max(x) - np.min(x))
    x = (x * 255).astype(np.uint8)
    return cv2.applyColorMap(x, cmap)
    

for level in data['paths']:
    data_5120 = data[level]
    lidar_data = data_5120['lidar']
    imu_data = data_5120['imu']
    tf_data = data_5120['tf']
    tf_times = sorted(tf_data.keys())
    lidar_times = sorted(lidar_data.keys())
    imu_times = sorted(imu_data.keys())
    next_tf_t = tf_times[0]
    current_tf_t = next_tf_t
    
    for lidar_t in lidar_times:
        lidar_frame = lidar_data[lidar_t]
        xyz = lidar_frame['point_cloud_data']
        if xyz.shape[0] == 0:
            continue
        # log piont cloud in lidar coordinate	
        rr.set_time_seconds('sim_time', lidar_t)
        rr.log(f'/{level}/lidar', rr.Points3D(xyz))
        while lidar_t > next_tf_t:
            current_tf_t = next_tf_t
            next_tf_t = tf_times.pop(0)
        current_tf = tf_data[current_tf_t]
        #log point cloud in world coordinate
        lidar_pos, lidar_ori = current_tf['lidar']
        T = transform(lidar_pos, lidar_ori)
        xyz_homo = np.hstack([xyz, np.ones((xyz.shape[0], 1))]).T
        xyz_world = (T @ xyz_homo)[:3,:].T
        ranges = np.sqrt(np.sum(xyz_world**2, axis=1))
        # colorize base on differences with real radius
        colors = colormap(ranges - R)
        
        rr.log(
            entity_path=f'/world/{level}/lidar', 
            entity=rr.Points3D(
                positions=xyz_world,
                colors=colors
            )
        )

