In [None]:
from IPython import get_ipython

# Enable autoreload so that edits to imported modules are picked up without
# restarting the kernel.
ipython = get_ipython()
if ipython is not None:
    # reload_ext loads the extension when it is not loaded yet and reloads it
    # otherwise, so re-running this cell does not print the "already loaded"
    # notice that load_ext emits. Going through run_line_magic (instead of a raw
    # %magic line) keeps the cell valid Python and honours the None guard.
    ipython.run_line_magic('reload_ext', 'autoreload')
    ipython.run_line_magic('autoreload', '2')
else:
    print("could not load extension")

In [None]:
import getpass

# OS login name of whoever runs the notebook; used to build the per-user path below.
user = getpass.getuser()
# NOTE(review): this import and the SimulationApp creation sit in their own cell
# ahead of the other omni.* imports — presumably those need the app to be
# running first; confirm before reordering cells.
from omni.isaac.kit import SimulationApp

# Set the path below to your desired Nucleus server.
# Make sure you installed a local Nucleus server before this
# and that Isaac Sim is running.
simulation_app = SimulationApp({"livesync_usd": f"omniverse://localhost/Users/{user}/telecom_test.usd"})

In [None]:
import carb
from omni.physx import get_physx_scene_query_interface  # for raycasting e.g raycast_closest()
from omni.isaac.core import World
from omni.isaac.core.objects import DynamicCuboid, DynamicSphere, DynamicCone, DynamicCylinder
from omni.timeline import get_timeline_interface
from omni.isaac.core.utils.stage import get_current_stage
import omni.isaac.core.utils.prims as prims_utils

import numpy as np
import matplotlib.pyplot as plt
from pprint import pprint
from math import pi as PI


# Antenna class

In [None]:
from scipy.constants import c

C = c  # Speed of light in vacuum, in m/s (scipy.constants.c)


class Antenna(DynamicCylinder):
    """Cylinder-shaped scene object that also stores the radio parameters of an antenna.

    Attributes:
        waveLength: operating wavelength, stored as given (the notebook passes metres).
        G: antenna gain, stored as given.
        e: antenna efficiency, stored as given.
        L: antenna length, taken as half the wavelength.
        frequency: C / waveLength.
        directivity: G / e.
        mode: 0 for emitter, 1 for receiver.
        theta, phi: pointing angles in radians, both 0 until setDirection is called.

    NOTE(review): the notebook describes G as "10dB" while directivity is computed
    as a plain ratio G / e — confirm whether G is meant to be linear or in dB.
    """

    def __init__(self, prim_path, name, waveLength, G, e, position=None, orientation=None, scale=None, color=None, radius=None, height=None, mode=0):
        """Validate the radio parameters, build the cylinder, then store the parameters.

        Args:
            prim_path, name, position, orientation, scale, color, radius, height:
                forwarded unchanged to DynamicCylinder.
            waveLength: operating wavelength; must be strictly positive.
            G: antenna gain.
            e: antenna efficiency; must be strictly positive.
            mode: 0 (emitter, default) or 1 (receiver).

        Raises:
            ValueError: if waveLength or e is not strictly positive, or mode is
                neither 0 nor 1.
        """
        # Validate first so that bad parameters fail before the parent constructor
        # runs, and with a clear message instead of a ZeroDivisionError below.
        if waveLength <= 0:
            raise ValueError("The wavelength must be strictly positive.")
        if e <= 0:
            raise ValueError("The efficiency must be strictly positive.")
        mode = self._checked_mode(mode)

        super().__init__(prim_path=prim_path, name=name, position=position, orientation=orientation, scale=scale, color=color, radius=radius, height=height)
        self.waveLength = waveLength
        self.G = G
        self.e = e
        self.L = waveLength/2
        self.frequency = C/self.waveLength
        self.directivity = G/e
        self.mode = mode
        self.theta = 0
        self.phi = 0

    @staticmethod
    def _checked_mode(mode):
        """Return mode unchanged if it is 0 or 1, otherwise raise ValueError."""
        if mode == 0 or mode == 1:
            return mode
        raise ValueError("The mode must be 0 (emitter) or 1 (receiver).")

    def getLength(self):
        """Return the antenna length (half the wavelength)."""
        return self.L

    def getGain(self):
        """Return the gain G given at construction."""
        return self.G

    def getFrequency(self):
        """Return the operating frequency, C / waveLength."""
        return self.frequency

    def getDirectivity(self):
        """Return the directivity, G / e."""
        return self.directivity

    def setMode(self, mode):
        """Switch between emitter (0) and receiver (1).

        Raises:
            ValueError: if mode is neither 0 nor 1; the current mode is kept.
        """
        self.mode = self._checked_mode(mode)

    def setDirection(self, theta, phi):
        """Set the pointing direction.

        Args:
            theta: angle in [0, pi].
            phi: angle in [0, 2*pi].

        Raises:
            ValueError: if either angle is outside its range (NaN included);
                neither attribute is modified in that case.
        """
        # Chained comparisons are False for NaN, so NaN angles are rejected too.
        if not (0 <= theta <= PI):
            raise ValueError("Theta must have a value between 0 and pi.")
        if not (0 <= phi <= 2*PI):
            raise ValueError("Phi must have a value between 0 and 2*pi.")
        self.theta = theta
        self.phi = phi

# Setup Test World Programmatically


## Case 1: Flat Plane (lunar surface) + simple objects

In [None]:
# Create the simulation world (1 stage unit = 1 metre), empty it, and add the
# default ground plane used as the flat surface for this case.
world = World(stage_units_in_meters=1.0)
world.clear()
world.scene.add_default_ground_plane()
# A render, step or update call is needed to reflect the changes to the opened USD in the Isaac Sim GUI.
# Note: avoid pressing play / pause or stop in the GUI in this workflow.
world.render()

In [None]:
# Add three simple dynamic shapes (blue cube, green sphere, red cone) to the
# scene; the returned handles are kept in module-level names for later cells.
fancy_cube = world.scene.add(
    DynamicCuboid(
        prim_path="/World/random_cube",
        name="fancy_cube",
        position=np.array([0, 0, 1]),
        scale=np.array([1, 1, 1]),
        size=1.,
        color=np.array([0, 0, 1]),
    )
)

fancy_sphere = world.scene.add(
    DynamicSphere(
        prim_path="/World/random_sphere",
        name="fancy_sphere",
        position=np.array([2, 0, 1]),
        radius=1.,
        color=np.array([0, 1, 0]),
    )
)

fancy_cone = world.scene.add(
    DynamicCone(
        prim_path="/World/random_cone",
        name="fancy_cone",
        position=np.array([-2.5, -0.2, 1]),
        height=2.,
        radius=1.,
        color=np.array([1, 0, 0]),
    )
)

# Render once so the new objects show up in the GUI.
world.render()

## Case 2: Lunar environment + two antennas

In [None]:
# Start case 2 from an empty world (1 stage unit = 1 metre).
# NOTE(review): clear() runs after the case 1 objects were added, so the
# fancy_* handles from case 1 presumably no longer refer to live scene objects — confirm.
world = World(stage_units_in_meters=1.0)
world.clear()

In [None]:
from omni.isaac.core.utils.stage import get_current_stage

# Reference the moon environment USD from the Nucleus server as the ground prim
# under /World/Lunar_Base.
# NOTE(review): the Nucleus user folder is hard-coded to "ubuntu" here, whereas
# the livesync path uses the current login name — confirm which is intended.
usd_path = "omniverse://localhost/Users/ubuntu/moon_environnement.usd"
prim_path = "/World/Lunar_Base"
ground_path = f"{prim_path}/Ground"

# Create the prim, then render so it appears in the GUI.
prim_ground = prims_utils.create_prim(prim_path=ground_path, usd_path=usd_path)
world.render()

In [None]:
### First antenna: gain = 10dB, waveLength = 1e-3m, efficiency = 1

antenna1 = world.scene.add(
    Antenna(
        prim_path="/World/antenna1",
        name="antenna1",
        waveLength=1e-3,
        G=10,
        e=1,
        position=np.array([1, 1, 2]),
        scale=np.array([0.1, 0.1, 4]),
        height=1.0,
        color=np.array([0.9, 0.9, 0.9]),
    )
)
# Render once so the antenna shows up in the GUI.
world.render()

In [None]:
### Second antenna: same radio parameters as the first one, placed at (6, 6, 2)

antenna2 = world.scene.add(
    Antenna(
        prim_path="/World/antenna2",
        name="antenna2",
        waveLength=1e-3,
        G=10,
        e=1,
        position=np.array([6, 6, 2]),
        scale=np.array([0.1, 0.1, 4]),
        height=1.0,
        color=np.array([0.9, 0.9, 0.9]),
    )
)
# Render once so the antenna shows up in the GUI.
world.render()

In [None]:
### Alternative: create the antennas (and a body) from external USD files

stage = get_current_stage()

# Source assets on the Nucleus server.
usd_path_antenna1 = "omniverse://localhost/Users/ubuntu/antenna1.usdc"
usd_path_antenna2 = "omniverse://localhost/Users/ubuntu/antenna2.usdc"
usd_path_body = "omniverse://localhost/Users/ubuntu/body.usdc"

# Destination prims, all placed under prim_path.
antenna_path1 = f"{prim_path}/Antenna1"
antenna_path2 = f"{prim_path}/Antenna2"
body_path = f"{prim_path}/Body"

# Each prim is rendered right after it is created so it appears in the GUI.
prim_antenna1 = prims_utils.create_prim(
    prim_path=antenna_path1,
    usd_path=usd_path_antenna1,
    position=[5, -15, 15],
)
world.render()

prim_antenna2 = prims_utils.create_prim(
    prim_path=antenna_path2,
    usd_path=usd_path_antenna2,
    position=[-10, 10, 30],
)
world.render()

prim_body = prims_utils.create_prim(
    prim_path=body_path,
    usd_path=usd_path_body,
    position=[20, 0, 30],
)
world.render()

## Position of the Sun

In [None]:
from omni.isaac.core.utils.stage import get_current_stage
# Handle on the current USD stage; passed to adjust_light and perform_raycast in later cells.
stage = get_current_stage()

def adjust_light(stage, position, intensity=3e6):
    """Move the first SphereLight found on the stage and set its intensity.

    Args:
        stage: stage to search; prims are visited in stage.Traverse() order.
        position: value written to the light's "xformOp:translate" attribute.
        intensity: value written to the light's "intensity" attribute
            (defaults to 3e6, the value this notebook has always used).

    Returns:
        None. Nothing is changed when the stage holds no SphereLight.
    """
    # Stop at the first match instead of collecting every light on the stage.
    light = next(
        (prim for prim in stage.Traverse() if prim.GetTypeName() == "SphereLight"),
        None,
    )
    if light is None:
        return

    # The traversal already yields the prim, so its attributes are used directly.
    light.GetAttribute("xformOp:translate").Set(position)
    light.GetAttribute("intensity").Set(intensity)

In [None]:
# Place the sun (the stage's first SphereLight) at sun_coord, then render so
# the change is visible in the GUI.
sun_coord = (5, -5, 5)
adjust_light(stage, sun_coord)
world.render()

In [None]:
# Run the application for multiple frames so the simulation state is up to date,
# then pause again.
timeline = get_timeline_interface()
timeline.play()
for _ in range(10):
    simulation_app.update()
timeline.pause()

# NOTE(review): this cell used to read ground truth from `rgb`, `depth` and
# `semantic_segmentation`, but none of these is defined anywhere in the visible
# notebook and the results were never used, so the cell always raised NameError.
# Create the corresponding sensors first if that data is needed.

In [None]:
# Grid resolution for a 10 x 10 area.
cell_size = 0.025  # meters

# Number of cells along one side (e.g. a 100x100 grid for a 0.1 cell size).
grid_size = int(10 / cell_size)

# Half of the side length actually covered by the grid.
half_side_size = (grid_size / 2) * cell_size

# Total number of cells, shown as the cell output.
grid_size ** 2

# Raycasting

In [None]:
from pxr import UsdGeom, Gf, Vt

def perform_raycast(stage, object1, object2):
    """Cast a ray from object1 towards object2 and report the closest hit.

    The prim that was hit is recoloured yellow and the module-level `world` is
    rendered so the hit is visible in the GUI.

    Args:
        stage: stage used to look up the prim that was hit.
        object1: ray origin; either an object exposing get_world_pose() or a raw
            (x, y, z) position such as the tuple returned by move_origin_raycast.
        object2: ray target; same accepted forms as object1.

    Returns:
        (path, distance): path string of the hit prim and the hit distance, or
        (None, 10000.0) when nothing is hit or both endpoints coincide.
    """
    # Accept scene objects as well as raw positions for both endpoints.
    start, target = (
        np.asarray(obj.get_world_pose()[0] if hasattr(obj, "get_world_pose") else obj)
        for obj in (object1, object2)
    )

    # Direction from object1 to object2 and the distance between them.
    offset = target - start
    distance = float(np.linalg.norm(offset))
    if distance == 0.0:
        # Coincident endpoints give no direction to cast along; report "no hit"
        # rather than sending a NaN direction to the physics query.
        return None, 10000.0
    unit = offset / distance

    origin = carb.Float3(float(start[0]), float(start[1]), float(start[2]))
    rayDir = carb.Float3(float(unit[0]), float(unit[1]), float(unit[2]))

    # The ray is limited to the distance between the two endpoints.
    hit = get_physx_scene_query_interface().raycast_closest(origin, rayDir, distance)

    if hit["hit"]:
        # Highlight the hit prim in yellow.
        usdGeom = UsdGeom.Mesh.Get(stage, hit["rigidBody"])
        hitColor = Vt.Vec3fArray([Gf.Vec3f(1., 1., 0.0)])
        usdGeom.GetDisplayColorAttr().Set(hitColor)
        world.render()

        objects_distance = hit["distance"]
        return usdGeom.GetPath().pathString, objects_distance

    # No hit, return None and a large distance value
    return None, 10000.0

def move_origin_raycast(object1, object2, increment):
    """Return object1's position shifted by `increment` along the line towards object2.

    Args:
        object1: start point; either an object exposing get_world_pose() or a
            raw (x, y, z) position.
        object2: point to move towards; same accepted forms as object1.
        increment: distance to move along the unit direction object1 -> object2.

    Returns:
        The shifted position as a tuple (x, y, z).

    Raises:
        ValueError: if both positions coincide, since no direction exists.
    """
    # Accept scene objects as well as raw positions for both endpoints.
    start, target = (
        np.asarray(obj.get_world_pose()[0] if hasattr(obj, "get_world_pose") else obj)
        for obj in (object1, object2)
    )

    offset = target - start
    length = np.linalg.norm(offset)
    if length == 0:
        # Dividing by a zero length would silently produce NaN coordinates.
        raise ValueError("object1 and object2 are at the same position; the direction is undefined.")

    direction_normalized = offset / length
    new_origin = start + direction_normalized * increment

    return tuple(new_origin)

def check_raycast_coherence(output_ray, path2):
    """Tell whether the raycast hit the expected prim.

    Only the last '/'-separated component of each path is compared.

    Args:
        output_ray: sequence whose first item is the hit prim path, or None.
        path2: path of the expected prim, or None.

    Returns:
        1 if both paths are given and end with the same name, otherwise 0.
    """
    hit_path = output_ray[0]
    if hit_path is None or path2 is None:
        return 0

    hit_name = hit_path.rsplit('/', 1)[-1]
    expected_name = path2.rsplit('/', 1)[-1]
    return 1 if hit_name == expected_name else 0

In [None]:
def power_losses(distance):
    """Return the inverse-square factor 1 / distance**2 for the given distance."""
    squared_distance = distance ** 2
    return 1 / squared_distance

def check_power_feasibility(distance, minimum_power):
    """Return 1 if power_losses(distance) is at least minimum_power, otherwise 0."""
    return 1 if power_losses(distance) >= minimum_power else 0

In [None]:
### example

origin = fancy_sphere
pointingObject = fancy_cone
pointingObjectPath = pointingObject.prim_path
minimum_power = 10

# Shift the ray origin 2 * 1.0 along the line towards the target.
# NOTE(review): presumably this moves the origin outside the sphere's own
# collider (radius 1.) so the ray does not hit the sphere itself — confirm.
# NOTE(review): the shifted origin is a plain (x, y, z) tuple, so
# perform_raycast has to accept raw positions for its origin argument — confirm.
origin = move_origin_raycast(origin, pointingObject, 2*1.)
out_raycast = perform_raycast(stage, origin, pointingObject)

# Call the check; referencing the bare function object is always truthy.
is_coherent = check_raycast_coherence(out_raycast, pointingObjectPath)
print(is_coherent)

if is_coherent:
    print(check_power_feasibility(out_raycast[1], minimum_power))

# Example of use

In [None]:
# Rebuild a simple test scene in the existing `world`: ground plane plus three
# shapes placed far apart.
world.clear()
world.scene.add_default_ground_plane()
# A render, step or update call is needed to reflect the changes to the opened USD in the Isaac Sim GUI.
# Note: avoid pressing play / pause or stop in the GUI in this workflow.
world.render()

fancy_cube = world.scene.add(
    DynamicCuboid(
        prim_path="/World/random_cube",
        name="fancy_cube",
        position=np.array([20, 20, 1.5]),
        scale=np.array([1, 1, 1]),
        size=3.,
        color=np.array([0, 0, 1]),
    )
)

fancy_sphere = world.scene.add(
    DynamicSphere(
        prim_path="/World/random_sphere",
        name="fancy_sphere",
        position=np.array([40, -15, 2]),
        radius=2.,
        color=np.array([0, 1, 0]),
    )
)

fancy_cone = world.scene.add(
    DynamicCone(
        prim_path="/World/random_cone",
        name="fancy_cone",
        position=np.array([-35, -0.2, 1]),
        height=2.,
        radius=1.,
        color=np.array([1, 0, 0]),
    )
)

# Run the application for multiple frames so the simulation state is up to date,
# then pause again.
timeline = get_timeline_interface()
timeline.play()
for _ in range(10):
    simulation_app.update()
timeline.pause()

# NOTE(review): this cell used to read ground truth from `rgb`, `depth` and
# `semantic_segmentation`, but none of these is defined anywhere in the visible
# notebook and the results were never used, so the cell always raised NameError.
# Create the corresponding sensors first if that data is needed.

In [None]:
size_cube = 3.
size_sphere = 2.

# World positions of the three shapes, kept for inspection.
fancy_sphere_pos, _ = fancy_sphere.get_world_pose()
fancy_cone_pos, _ = fancy_cone.get_world_pose()
fancy_cube_pos, _ = fancy_cube.get_world_pose()

# Ray from the cube towards the sphere. The scene objects themselves are passed
# on, because move_origin_raycast queries get_world_pose() on its arguments.
origin = fancy_cube
pointingObject = fancy_sphere
pointingObjectPath = pointingObject.prim_path

# Shift the origin by 1.1 cube sizes towards the target.
# NOTE(review): presumably so the ray starts outside the cube's own collider — confirm.
# NOTE(review): the shifted origin is a plain (x, y, z) tuple, so
# perform_raycast has to accept raw positions for its origin argument — confirm.
origin = move_origin_raycast(origin, pointingObject, 1.1 * size_cube)

out_raycast = perform_raycast(stage, origin, pointingObject)
print(out_raycast)

print(check_raycast_coherence(out_raycast, pointingObjectPath))


print(check_power_feasibility(out_raycast[1], 10))