In [None]:
import os
gpu_num = "" # Use "" to use the CPU
os.environ["CUDA_VISIBLE_DEVICES"] = f"{gpu_num}"
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
import sionna

import tensorflow as tf
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    try:
        tf.config.experimental.set_memory_growth(gpus[0], True)
    except RuntimeError as e:
        print(e)
# Avoid warnings from TensorFlow
tf.get_logger().setLevel('ERROR')

tf.random.set_seed(1) # Set global random seed for reproducibility

import matplotlib.pyplot as plt
import numpy as np
import time
import open3d as o3d
import math



TX_POSITION = [100, 170]
RX_GRID_SPACING = 10; 
RX_HEIGHT = 1.5
# CENTER_X, CENTER_Y, CENTER_Z = (27, 1, -3)
SCENE_NAME = "quarry"

RX_GRID_AREA = [95, 105, 165, 175]; # Meters x1, x2, y1, y2 set to [] to use whole area

# subcarrier_frequencies = sionna.channel.subcarrier_frequencies(128, )






# Allows to exit cell execution in Jupyter
class ExitCell(Exception):
    def _render_traceback_(self):
        pass


# Import Sionna RT components
from sionna.rt import load_scene, Transmitter, Receiver, PlanarArray, Camera

In [None]:
o3dmesh = o3d.io.read_triangle_mesh(f"models/{SCENE_NAME}.ply")
o3dmesh = o3d.t.geometry.TriangleMesh.from_legacy(o3dmesh)
box = o3dmesh.get_axis_aligned_bounding_box()
z = box.max_bound[2].item() + 10 # 10 units above highest point is where rays start


if len(RX_GRID_AREA) == 4:
    min_x = RX_GRID_AREA[0]
    max_x = RX_GRID_AREA[1]
    min_y = RX_GRID_AREA[2]
    max_y = RX_GRID_AREA[3]
else:
    min_x = int(box.min_bound[0].item())
    max_x = int(box.max_bound[0].item())
    min_y = int(box.min_bound[1].item())
    max_y = int(box.max_bound[1].item())

scene = o3d.t.geometry.RaycastingScene()
scene.add_triangles(o3dmesh)
rays = []
ray_locations = []
num_elements_x = (int) ((max_x - min_x) / RX_GRID_SPACING) + 1
num_elements_y = (int) ((max_y - min_y) / RX_GRID_SPACING) + 1
print(num_elements_x * num_elements_y)
for col in range(num_elements_x):
    for row in range(num_elements_y):
        x = min_x + (col * RX_GRID_SPACING)
        y = min_y + (row * RX_GRID_SPACING)
        rays.append([x, y, z, 0, 0, -1])
        ray_locations.append((x, y))

rays = o3d.core.Tensor(rays, dtype=o3d.core.Dtype.Float32)

ans = scene.cast_rays(rays)

vertical_dist = ans['t_hit'].numpy()
points = []
for i, z_offset in enumerate(vertical_dist):
    if np.isfinite(z_offset): # check if ray hit
        x, y = ray_locations[i]
        points.append((x, y, z - z_offset))

tx_ray = o3d.core.Tensor([[TX_POSITION[0], TX_POSITION[1], z, 0, 0, -1]], dtype=o3d.core.Dtype.Float32)
tx_ans = scene.cast_rays(tx_ray)
tx_vertical_dist = tx_ans['t_hit'].numpy()
for i, z_offset in enumerate(tx_vertical_dist):
    if np.isfinite(z_offset): # check if ray hit
        tx_pos = (TX_POSITION[0], TX_POSITION[1], z - z_offset)

In [None]:
scene = load_scene(f"models/{SCENE_NAME}.xml")
scene.frequency = 2.408e9


scene.tx_array = PlanarArray(num_rows=1,
                          num_cols=1,
                          vertical_spacing=.5,
                          horizontal_spacing=.5,
                          pattern="tr38901",
                          polarization="V")

# Configure antenna array for all receivers
scene.rx_array = PlanarArray(num_rows=1,
                             num_cols=1,
                             vertical_spacing=0.5,
                             horizontal_spacing=0.5,
                             pattern="dipole",
                             polarization="cross")

tx = Transmitter(name="tx",
              position=[tx_pos[0], tx_pos[1], tx_pos[2] + RX_HEIGHT],
              orientation=[0,0,0])
scene.add(tx)

# Create a receiver

for x, y, z in points:
    rx = Receiver(name=f"rx_{x}_{y}",
           position=[x, y, z + RX_HEIGHT],
           orientation=[0,0,0])
    scene.add(rx)

# TX points towards RX
tx.look_at(rx)

In [None]:
paths = scene.compute_paths(
                            # check_scene=False,
                            # diffraction=True,
                            # scattering=True,
                            reflection = False,
                            )
paths.normalize_delays = False


In [None]:

# Show the coordinates of the starting points of all rays.
# These coincide with the location of the transmitters.
print("Source coordinates: ", paths.sources.numpy())
print("Transmitter coordinates: ", list(scene.transmitters.values())[0].position.numpy())

# Show the coordinates of the endpoints of all rays.
# These coincide with the location of the receivers.
print("Target coordinates: ",paths.targets.numpy())
print("Receiver coordinates: ",list(scene.receivers.values())[0].position.numpy())

# Show the types of all paths:
# 0 - LoS, 1 - Reflected, 2 - Diffracted, 3 - Scattered
# Note that Diffraction and scattering are turned off by default.
print("Path types: ", paths.types.numpy())

path_idx = 0
for i in range(441):
    if paths.a[0,i,0,0,0,path_idx, 0].numpy() != 0:
        print(i)
        break
print(f"\n--- Detailed results for path {path_idx} ---")
print(f"Channel coefficient: {paths.a[0,i,0,0,0,path_idx, 0].numpy()}")
print(f"Propagation delay: {paths.tau[0,i,0,path_idx].numpy()*1e6:.5f} us")
print(f"Zenith angle of departure: {paths.theta_t[0,i,0,path_idx]:.4f} rad")
print(f"Azimuth angle of departure: {paths.phi_t[0,i,0,path_idx]:.4f} rad")
print(f"Zenith angle of arrival: {paths.theta_r[0,i,0,path_idx]:.4f} rad")
print(f"Azimuth angle of arrival: {paths.phi_r[0,i,0,path_idx]:.4f} rad")

In [None]:
import h5py

with h5py.File("tiny_example_data.h5", 'r') as example_file:
    example_csis_mag = example_file['csis_mag'][:]
    example_csis_phase = example_file['csis_phase'][:]
    example_rx_positions = example_file['positions'][:] #done
    example_ray_aoas = example_file['ray_aoas'][:] #done
    example_ray_aods = example_file['ray_aods'][:] #done
    example_ray_delays = example_file['ray_delays'][:] #done
    example_ray_path_losses = example_file['ray_path_losses'][:]

rx_positions = [[x for x, _, _ in paths.targets], [y for _, y, _ in paths.targets], [z for __, _, z in paths.targets]]
ray_aoas = tf.concat([tf.transpose(sionna.channel.rad_2_deg(paths.phi_r), [0, 3, 2, 1])[0], tf.transpose(sionna.channel.rad_2_deg(paths.theta_r) - 90, [0, 3, 2, 1])[0] * -1], 1)
ray_aods = tf.concat([tf.transpose(sionna.channel.rad_2_deg(paths.phi_t), [0, 3, 2, 1])[0], tf.transpose(sionna.channel.rad_2_deg(paths.theta_t) - 90, [0, 3, 2, 1])[0] * -1], 1)
ray_delays = tf.transpose(paths.tau[0, :, 0, :], [1, 0])
# this function is most of what I need for calculating mag and phase I think sionna.channel.cir_to_ofdm_channel


with h5py.File("test_dataset.h5", 'a') as test_file:
    test_file.create_dataset("positions", data=rx_positions)
    test_file.create_dataset("ray_aoas", data=ray_aoas)
    test_file.create_dataset("ray_aods", data=ray_aods)
    test_file.create_dataset("ray_delays", data=ray_delays)
    

with h5py.File("test_dataset.h5", 'r') as test_file:
    test_rx_positions = test_file['positions'][:]
    test_ray_aoas = test_file['ray_aoas'][:]
    test_ray_aods = test_file['ray_aods'][:]
    test_ray_delays = test_file['ray_delays'][:]
    
os.remove("test_dataset.h5")
# print(example_ray_aoas)
# print(example_ray_aods )
# print(test_ray_aoas)
# print(test_ray_aods )


# print(np.shape(example_rx_positions))
print(np.shape(example_csis_mag))
print(np.shape(example_csis_phase))

# print(np.shape(example_ray_delays))
# print(np.shape(example_ray_aoas))
# print(np.shape(example_ray_aods ))
print(np.shape(example_ray_path_losses))


