In [8]:
# === Cell 1: GPU and environment initialisation ===
import os

# Pick a default GPU only when the caller has not already chosen devices
# through the CUDA_VISIBLE_DEVICES environment variable.
if "CUDA_VISIBLE_DEVICES" not in os.environ:
    gpu_num = 0  # Use "" to use the CPU
    os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_num)

# Lower the verbosity of TensorFlow's native (C++) logging.
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"

# Import Sionna; on Google Colab, install it on the fly when it is missing.
try:
    import sionna.phy
    import sionna.rt
except ImportError as e:
    import sys

    # Outside Colab there is no automatic install: surface the import error.
    if 'google.colab' not in sys.modules:
        raise e

    print("Installing Sionna and restarting the runtime. Please run the cell again.")
    os.system("pip install sionna")
    # Send signal 5 to this process so the runtime restarts with the
    # freshly installed package available.
    os.kill(os.getpid(), 5)

# Configure TensorFlow GPU
import tensorflow as tf
# Ask TensorFlow to grow GPU memory on demand for the first visible GPU
# (if there is one) and keep TensorFlow's Python logger at ERROR level.
gpus = tf.config.list_physical_devices(device_type='GPU')
if len(gpus) > 0:
    try:
        tf.config.experimental.set_memory_growth(device=gpus[0], enable=True)
    except RuntimeError as e:
        # Report the problem and carry on with TensorFlow's default policy.
        print(e)
tf.get_logger().setLevel('ERROR')

import numpy as np

# For link-level simulations
from sionna.phy.channel import OFDMChannel, CIRDataset
from sionna.phy.nr import PUSCHConfig, PUSCHTransmitter, PUSCHReceiver
from sionna.phy.utils import ebnodb2no, PlotBER
from sionna.phy.ofdm import KBestDetector, LinearDetector
from sionna.phy.mimo import StreamManagement

# Import Sionna RT components
from sionna.rt import load_scene, Camera, Transmitter, Receiver, PlanarArray, PathSolver, RadioMapSolver

print("[✔] Sionna-RT environment ready.")


[✔] Sionna-RT environment ready.


In [9]:
# === Cell 2: JSON 監聽設定 ===
import json, time
from IPython.display import clear_output

# Location of the JSON file that is polled for UAV state (relative path).
JSON_PATH = "../../jason/uav_from_sionna.json"   # JSON file to watch
# Polling period in seconds (passed to time.sleep in the listener cell).
# NOTE(review): the listener cell reassigns this to 20.0 before using it.
CHECK_INTERVAL = 2.0                   # refresh every 2 seconds

# Notebook-level state: path of the most recently loaded scene XML and the
# scene object itself. Both are filled in by the scene-selection cell.
last_scene_path = None
scene = None

def read_json(path):
    """Read and parse a UTF-8 JSON file without ever raising.

    Args:
        path: Location of the JSON file.

    Returns:
        The decoded JSON value, or ``None`` when the file cannot be opened,
        read or parsed (the error is printed in that case).
    """
    try:
        with open(path, mode="r", encoding="utf-8") as handle:
            raw_text = handle.read()
        parsed = json.loads(raw_text)
    except Exception as err:
        # Best effort by design: report the problem and let the caller retry.
        print(f"[ERROR] 無法讀取 JSON: {err}")
        return None
    return parsed


In [10]:
# === Cell 3: choose the scene from the JSON (only nycu, nycu_left, nycu_right) ===

data = read_json(JSON_PATH)

# Scene XML for each of the three UAV regions (C = left, A = centre, B = right)
REGION_PATHS = {
    "A": "../blender_xml/nycu/nycu.xml",
    "B": "../blender_xml/nycu_right/nycu_right.xml",
    "C": "../blender_xml/nycu_left/nycu_left.xml"
}

# Read uav.region defensively: the JSON is produced by another process, so
# `data` may be None (read failure), "uav" may be missing or not an object,
# and "region" may be missing or not a string. Every such case selects "A".
uav_info = data.get("uav") if isinstance(data, dict) else None
raw_region = uav_info.get("region") if isinstance(uav_info, dict) else None
region = raw_region.strip().upper() if isinstance(raw_region, str) else "A"

# Unknown region keys also fall back to the centre scene (A).
xml_path = REGION_PATHS.get(region, REGION_PATHS["A"])

print(f"[INFO] 載入場景: {xml_path}")

scene = load_scene(xml_path)
last_scene_path = xml_path


[INFO] 載入場景: ../blender_xml/nycu_right/nycu_right.xml


In [11]:
# === Cell 4: system parameters and antenna setup ===

# NOTE(review): not read by any of the visible cells.
no_preview = True

# NOTE(review): carrier_frequency is defined here but is not assigned to the
# scene anywhere in the visible cells — confirm the scene uses the intended
# frequency.
subcarrier_spacing = 30e3 # Hz
num_time_steps = 14 # Total number of ofdm symbols per slot
carrier_frequency = 3.5e9 # Hz

num_tx = 2 # Number of users
num_rx = 1 # Only one receiver considered
num_tx_ant = 1 # Antennas per user (the old comment said 4; the value is 1)
num_rx_ant = 2 # Antennas at the receiver (the old comment said 16; the value is 2)

batch_size_cir = 6

# Antenna array shared by all scene transmitters: one row with
# num_rx_ant//2 columns, cross-polarised, using the "tr38901" pattern.
# NOTE(review): sized from num_rx_ant although it is the *transmit* array —
# presumably the base station acts as the transmitter in the ray tracer and
# cross polarisation doubles the element count; confirm.
scene.tx_array = PlanarArray(
    num_rows=1,
    num_cols=num_rx_ant//2,
    vertical_spacing=0.5,
    horizontal_spacing=0.5,
    pattern="tr38901",
    polarization="cross"
)

# Create the two transmitters; both look at the same point.
tx1 = Transmitter(name="tx1", position=[30, 20, 50],
                  look_at=[30, 0, 10], power_dbm=23, display_radius=8.)
tx2 = Transmitter(name="tx2", position=[30, 46, 22],
                  look_at=[30, 0, 10], power_dbm=23, display_radius=8.)

scene.add(tx1)
scene.add(tx2)

# Camera placed high above the scene.
# NOTE(review): bird_cam is not used by any of the visible cells.
bird_cam = Camera(position=[0, 80, 500],
                  orientation=np.array([0, np.pi/2, -np.pi/2]))

# Radio-map solver reused by the listener loop on every refresh.
rm_solver = RadioMapSolver()
print("[✔] Transmitters and camera initialized.")


[✔] Transmitters and camera initialized.


In [14]:
# === Cell 5: 初始化與工具函式 ===
import os, time
from IPython.display import clear_output

def safe_float(val):
    """Convert ``val`` to a built-in ``float`` without raising.

    Objects exposing an ``item()`` method (for example NumPy scalars or
    one-element arrays) are unwrapped through it first.

    Args:
        val: Value to convert.

    Returns:
        The converted float, or ``0.0`` when the conversion fails for any
        reason.
    """
    try:
        scalar = val.item() if hasattr(val, "item") else val
        return float(scalar)
    except Exception:
        # Deliberate best effort: unusable input collapses to 0.0.
        return 0.0

# === Centre position of each region ===
# update_scene_from_json() subtracts the entry for the current region from
# the UAV's x coordinate before moving tx1 (x_adj = x - region centre).
# NOTE(review): presumably this maps a global x into the loaded scene's own
# coordinate frame; the units are not visible here — confirm.
REGION_CENTERS = {
    "A": 0.0,
    "B": 1400.0,   # B is on the right
    "C": -1400.0,  # C is on the left
}

# === Global state ===
# Resetting these discards whatever earlier cells stored under the same
# names. Because last_scene_path is None, the first call to
# update_scene_from_json() always sees a "changed" path and reloads the scene
# and its transmitters.
last_scene_path = None
tx1 = None
tx2 = None

def _region_from_json(data):
    """Return the region key requested by ``data``, falling back to "A".

    "A" is returned when ``data`` is not a dict, when "uav" is missing or not
    a dict, when "region" is missing or not a string, or when the
    (stripped, upper-cased) region is not a key of REGION_PATHS.
    """
    uav = data.get("uav") if isinstance(data, dict) else None
    raw = uav.get("region") if isinstance(uav, dict) else None
    region = raw.strip().upper() if isinstance(raw, str) else "A"
    return region if region in REGION_PATHS else "A"


# === Main function: update the scene ===
def update_scene_from_json(data):
    """Reload the scene and/or move tx1 according to the JSON payload.

    Args:
        data: Decoded JSON (expected shape ``{"uav": {"region", "x", "y",
            "z"}}``) or ``None`` when the file could not be read.

    Returns:
        A tuple ``(scene, region, moved)``:
          * ``scene``  - the newly loaded scene, or ``None`` when the scene
            file did not change (the caller keeps its current scene);
          * ``region`` - the region key actually in effect ("A" whenever the
            JSON gives no usable or known region);
          * ``moved``  - ``True`` when something needs re-rendering: the scene
            was reloaded or tx1 was moved by more than 1e-3 on any axis.

    Side effects:
        Updates the globals ``last_scene_path``, ``tx1`` and ``tx2`` when a
        scene is loaded, and assigns ``tx1.position`` when the UAV moved.
        Only tx1 follows the UAV; tx2 keeps its initial position.
    """
    global last_scene_path, tx1, tx2

    moved = False  # set when the caller should re-render

    # === Region and XML path (parsed once, tolerant of malformed JSON) ===
    region = _region_from_json(data)
    xml_path = REGION_PATHS[region]

    # === Scene changed: load it and rebuild the transmitters ===
    if xml_path != last_scene_path:
        print(f"[INFO] 載入場景: {xml_path}")
        scene = load_scene(xml_path)

        scene.tx_array = PlanarArray(
            num_rows=1, num_cols=num_rx_ant // 2,
            vertical_spacing=0.5, horizontal_spacing=0.5,
            pattern="tr38901", polarization="cross"
        )

        tx_radius = 8.0
        new_tx1 = Transmitter(name="tx1", position=[30, 20, 50],
                              look_at=[30, 0, 10], power_dbm=23,
                              display_radius=tx_radius, color=[1, 0, 0])
        scene.add(new_tx1)

        new_tx2 = Transmitter(name="tx2", position=[30, 46, 22],
                              look_at=[30, 0, 10], power_dbm=23,
                              display_radius=tx_radius, color=[1, 0, 0])
        scene.add(new_tx2)

        # Commit the globals only once the scene is fully built, so a failure
        # above leaves the previous state intact and the load is retried on
        # the next call.
        tx1, tx2 = new_tx1, new_tx2
        last_scene_path = xml_path

        # A freshly loaded scene must be rendered even if the UAV position
        # below turns out to be unchanged or absent.
        moved = True
    else:
        scene = None  # keep the existing scene

    # === UAV position update ===
    uav = data.get("uav") if isinstance(data, dict) else None
    if isinstance(uav, dict) and tx1 is not None:
        x = safe_float(uav.get("x", 0))
        y = safe_float(uav.get("y", 0))
        z = safe_float(uav.get("z", 0))

        # Shift x by the centre of the region in effect.
        x_adj = x - REGION_CENTERS.get(region, 0.0)

        old_pos = list(tx1.position)
        new_pos = [x_adj, y, z]
        if any(abs(a - b) > 1e-3 for a, b in zip(old_pos, new_pos)):
            tx1.position = new_pos
            moved = True
            print(f"[TX-UPDATE] tx1 → ({x_adj:.1f}, {y:.1f}, {z:.1f})  region={region}")

    return scene, region, moved


print("[INIT] 已定義 safe_float() 與 update_scene_from_json()。請執行下一格以開始監聽。")


[INIT] 已定義 safe_float() 與 update_scene_from_json()。請執行下一格以開始監聽。


In [17]:
# === Cell 6: main polling loop ===
# Polls JSON_PATH every CHECK_INTERVAL seconds, lets update_scene_from_json()
# reload the scene / move the transmitter, and re-renders the radio map when
# something changed. Stop with Ctrl + C (kernel interrupt).
CHECK_INTERVAL = 20.0
current_scene = None  # the scene currently shown; survives iterations

print(f"[SYSTEM] 啟動 JSON 監聽器，每 {CHECK_INTERVAL:g} 秒更新一次（Ctrl + C 停止）")

# The KeyboardInterrupt handler wraps the whole loop so that an interrupt is
# handled cleanly wherever it arrives, including while sleeping after an error.
try:
    while True:
        try:
            data = read_json(JSON_PATH)

            # read_json() returns None when the file is missing or cannot be
            # parsed (e.g. caught mid-write). Skip this cycle instead of
            # letting the missing data fall back to region A and reset the
            # scene.
            if data is not None:
                new_scene, region, moved = update_scene_from_json(data)

                # A non-None scene means the scene file was (re)loaded.
                scene_changed = new_scene is not None
                if scene_changed:
                    current_scene = new_scene

                # Re-render when the UAV moved OR the scene changed.
                if current_scene is not None and (moved or scene_changed):
                    rm = rm_solver(current_scene, max_depth=8,
                                   cell_size=(2., 2.), samples_per_tx=10**5)
                    clear_output(wait=True)
                    print(f"[UPDATE] {time.strftime('%H:%M:%S')} - region = {region}")
                    current_scene.preview(radio_map=rm, rm_vmin=-110, clip_at=None)

        except Exception as e:
            # Keep the listener alive on per-iteration failures.
            print(f"[ERROR] {e}")

        time.sleep(CHECK_INTERVAL)

except KeyboardInterrupt:
    print("\n[STOPPED] JSON 監聽結束。")


[UPDATE] 14:57:00 - region = A


HBox(children=(Renderer(camera=PerspectiveCamera(aspect=1.31, children=(DirectionalLight(intensity=0.25, posit…


[STOPPED] JSON 監聽結束。
