# Real-Time Network Emulation Visualization

This notebook provides real-time visualization of the running SiNE emulation.

## Prerequisites

1. **Channel server must be running**: `uv run sine channel-server`
2. **Emulation must be deployed**: `sudo $(which uv) run sine deploy <topology.yaml>`

## How It Works

- Polls the channel server's `/api/visualization/state` endpoint
- Displays cached path data from previous channel computations
- Shows wireless channel metrics (delay spread, K-factor, coherence bandwidth)
- Updates every 1 second (configurable)

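**Important**: The channel metrics shown in this notebook are not computed here - they are pre-computed and cached by the channel server during emulation deployment/updates. The one exception is the optional 3D scene preview (Cell 3), which re-runs Sionna's `PathSolver` locally from the cached device positions, because `scene.preview()` needs a Sionna `Paths` object to draw the propagation paths.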

In [1]:
# Cell 1: Setup and Configuration
import asyncio
import httpx
from IPython.display import clear_output, display
from typing import Any

# Configuration
# Base URL of the channel server; fetch_visualization_state() appends
# "/api/visualization/state" to it.
CHANNEL_API = "http://localhost:8000"
# NOTE(review): the two settings below are not referenced by any cell visible
# in this notebook (continuous_monitoring() takes its own interval argument);
# confirm whether they are still needed.
UPDATE_INTERVAL_SEC = 1.0  # Poll interval, in seconds
MAX_RENDER_PATHS = 5       # Limit paths per link for performance

In [2]:
# Cell 2: Helper Functions
async def fetch_visualization_state() -> dict[str, Any]:
    """
    Retrieve the current visualization state from the channel server.

    Issues a single GET request to ``{CHANNEL_API}/api/visualization/state``
    with a 5 second timeout and decodes the JSON body.

    Returns:
        The decoded JSON payload (the rest of this notebook reads keys such
        as ``scene_file``, ``devices`` and ``paths`` from it).

    Raises:
        httpx.HTTPStatusError: If the server answers with an error status.
    """
    state_url = f"{CHANNEL_API}/api/visualization/state"
    async with httpx.AsyncClient(timeout=5.0) as http:
        reply = await http.get(state_url)
        # Turn 4xx/5xx answers into an exception instead of parsing an error body
        reply.raise_for_status()
        payload = reply.json()
        return payload

In [None]:
# Cell 3: Scene Preview with Paths (Option 2: Re-compute paths in notebook)
from sionna.rt import load_scene, Transmitter, Receiver, PlanarArray, PathSolver
import numpy as np
from typing import Optional
from pathlib import Path

def _resolve_scene_path(scene_file: str) -> Path:
    """Return ``scene_file`` as a Path, anchoring relative paths at the project root.

    The project root is the current working directory, or its parent when the
    notebook is run from inside a directory named ``scenes``.
    """
    scene_path = Path(scene_file)
    if scene_path.is_absolute():
        return scene_path
    notebook_dir = Path.cwd()
    project_root = notebook_dir.parent if notebook_dir.name == 'scenes' else notebook_dir
    return project_root / scene_file


def _add_link_devices(scene: Any, links: list[dict[str, Any]]) -> tuple[set[str], set[str]]:
    """Add one Transmitter / Receiver per distinct endpoint name found in ``links``.

    Returns the sets of transmitter and receiver names that were added.
    """
    tx_added: set[str] = set()
    rx_added: set[str] = set()
    # NOTE(review): a node that is the TX of one link and the RX of another is
    # added twice under the same name (once per role). Confirm that the scene
    # accepts duplicate names across roles, or rename per role if it does not.
    for link in links:
        tx_name = link["tx_name"]
        rx_name = link["rx_name"]

        if tx_name not in tx_added:
            scene.add(Transmitter(name=tx_name, position=link["tx_position"]))
            tx_added.add(tx_name)

        if rx_name not in rx_added:
            scene.add(Receiver(name=rx_name, position=link["rx_position"]))
            rx_added.add(rx_name)
    return tx_added, rx_added


def render_scene_with_paths(
    viz_state: dict[str, Any],
    clip_at: Optional[float] = None,
    frequency_hz: float = 5.18e9,
) -> None:
    """
    Render 3D scene with devices and propagation paths using Sionna preview.

    Implementation: Option 2 (Re-compute paths for visualization)
    - Uses cached device positions from channel server
    - Re-runs PathSolver in notebook to get Paths object
    - Small overhead acceptable for snapshot/infrequent visualization

    Any failure while loading or rendering is reported on stdout (with a
    traceback) rather than raised, so the text summary can still be shown.

    Args:
        viz_state: Visualization state from channel server. Reads
            ``scene_file`` and ``paths`` (one entry per link with
            ``tx_name``, ``rx_name``, ``tx_position``, ``rx_position``).
        clip_at: Optional z-coordinate to clip scene (useful for indoor scenes)
        frequency_hz: Carrier frequency assigned to the scene before solving
            (default: 5.18 GHz, WiFi 5)
    """
    scene_file = viz_state.get('scene_file')
    if not scene_file:
        print("No scene file available - text visualization only")
        return

    # Bound before the try block so the error report below can always refer to it
    scene_path: Optional[Path] = None
    try:
        scene_path = _resolve_scene_path(scene_file)
        if not scene_path.exists():
            print(f"Scene file not found: {scene_path}")
            print(f"Current directory: {Path.cwd()}")
            print("Continuing with text-only visualization...")
            return

        # Load scene (merge_shapes=False to keep surfaces separate)
        scene = load_scene(str(scene_path), merge_shapes=False)

        # Configure minimal antenna arrays for visualization
        scene.tx_array = PlanarArray(
            num_rows=1, num_cols=1,
            vertical_spacing=0.5, horizontal_spacing=0.5,
            pattern="iso", polarization="V"
        )
        scene.rx_array = PlanarArray(
            num_rows=1, num_cols=1,
            vertical_spacing=0.5, horizontal_spacing=0.5,
            pattern="iso", polarization="V"
        )

        # Set frequency (important for material properties)
        scene.frequency = frequency_hz

        # A state without cached links is valid; treat it as "no devices yet"
        links = viz_state.get("paths") or []
        tx_added, rx_added = _add_link_devices(scene, links)

        print(f"\n{'='*70}")
        print("3D SCENE PREVIEW WITH PROPAGATION PATHS")
        print(f"{'='*70}")
        print(f"Scene: {scene_path}")
        print(f"Devices: {len(tx_added)} TX, {len(rx_added)} RX")
        print(f"Links: {len(links)}")

        preview_kwargs: dict[str, Any] = {}
        if links:
            print("\nComputing propagation paths for visualization...")
            print("(Note: Paths are re-computed from cached positions)")

            # Re-compute paths using PathSolver
            # This is redundant (already computed for netem), but necessary
            # to get a Sionna Paths object for scene.preview()
            solver = PathSolver(scene=scene)
            preview_kwargs["paths"] = solver()

            print("Paths computed successfully!")
        else:
            # Nothing to solve for: show the geometry on its own
            print("\nNo cached links - previewing scene geometry only")
        print(f"{'='*70}\n")

        # Only pass clip_at when the caller asked for clipping
        if clip_at is not None:
            preview_kwargs["clip_at"] = clip_at
        scene.preview(**preview_kwargs)

    except Exception as e:
        # Best-effort rendering: report the problem and let the caller continue
        print(f"Failed to render scene: {e}")
        import traceback
        traceback.print_exc()
        print(f"Scene path attempted: {scene_path if scene_path is not None else scene_file}")
        print(f"Current directory: {Path.cwd()}")

In [10]:
# Cell 4: Text Display with Wireless Metrics
def display_text_summary(viz_state: dict[str, Any], *, signal_bw_mhz: float = 80.0) -> None:
    """
    Display text summary of visualization state with wireless channel analysis.

    Prints the device positions, then for every link its distance, path
    counts, delay characteristics, a frequency-selectivity assessment, the
    Rician K-factor classification and the individual propagation paths.

    Args:
        viz_state: Visualization state from channel server. Must contain
            ``cache_size``, ``devices`` and ``paths``; the per-link metrics
            ``rms_delay_spread_ns``, ``coherence_bandwidth_hz`` and
            ``k_factor_db`` may be absent or ``None``.
        signal_bw_mhz: Signal bandwidth in MHz that the coherence bandwidth is
            compared against (default: 80 MHz).

    Raises:
        KeyError: If a required key is missing from ``viz_state``.
    """
    print(f"=== Visualization State (Cache: {viz_state['cache_size']} links) ===")
    print(f"Scene: {viz_state.get('scene_file', 'N/A')}\n")

    # Display device positions
    print("Devices:")
    for device in viz_state["devices"]:
        pos = device["position"]
        print(f"  {device['name']}: ({pos['x']:.1f}, {pos['y']:.1f}, {pos['z']:.1f})")

    # Display detailed channel information
    print(f"\n{'='*70}")
    print("WIRELESS CHANNEL ANALYSIS")
    print(f"{'='*70}")

    for link_data in viz_state["paths"]:
        print(f"\nLink: {link_data['tx_name']} → {link_data['rx_name']}")
        print(f"{'-'*70}")

        # Basic link info
        print(f"Distance: {link_data['distance_m']:.1f} m")
        print(f"Paths: {link_data['num_paths_shown']}/{link_data['num_paths_total']} "
              f"({link_data.get('power_coverage_percent', 100):.1f}% power)")

        # Delay spread analysis (ISI characterization).
        # A missing/None metric is shown as N/A rather than as a made-up 0.
        rms_ds_ns = link_data.get('rms_delay_spread_ns')
        bc_hz = link_data.get('coherence_bandwidth_hz')

        print("\nDelay Characteristics:")
        if rms_ds_ns is None:
            print("  RMS Delay Spread (τ_rms): N/A")
        else:
            print(f"  RMS Delay Spread (τ_rms): {rms_ds_ns:.2f} ns")

        if bc_hz is None:
            print("  Coherence Bandwidth (Bc): N/A")
            print("  ? Frequency selectivity unknown (Bc not reported)")
        else:
            bc_mhz = bc_hz / 1e6
            print(f"  Coherence Bandwidth (Bc): {bc_mhz:.1f} MHz")

            # Frequency selectivity assessment against the signal bandwidth
            if bc_mhz > signal_bw_mhz:
                print("  ✓ Frequency-flat channel (Bc > BW)")
            else:
                # This branch covers every Bc up to BW, not only Bc close to BW
                print("  ⚠ Frequency-selective channel (Bc ≤ BW)")
                print("    ISI may be significant - OFDM recommended")

        # LOS/NLOS classification via Rician K-factor
        k_factor = link_data.get('k_factor_db')
        dominant_type = link_data.get('dominant_path_type', 'unknown')

        print("\nChannel Classification:")
        if k_factor is not None:
            print(f"  Rician K-factor: {k_factor:.1f} dB")
            if k_factor > 10:
                print("  → Strong LOS component (K > 10 dB)")
            elif k_factor > 0:
                print("  → Moderate LOS with multipath (0 < K < 10 dB)")
            else:
                print("  → NLOS dominant (K < 0 dB)")
        else:
            # NOTE(review): a missing K-factor is reported as NLOS; confirm the
            # server omits k_factor_db only when there is no direct path.
            print("  Channel Type: NLOS (no direct path)")
            print(f"  Dominant: {dominant_type}")

        # Individual path details
        print(f"\nPropagation Paths (strongest {link_data['num_paths_shown']}):")
        for i, path in enumerate(link_data['paths'], 1):
            los_marker = " [LOS]" if path['is_los'] else ""
            interactions = ", ".join(path['interaction_types']) if path['interaction_types'] else "direct"
            doppler_hz = path.get('doppler_hz')
            doppler = f", Doppler: {doppler_hz:.1f} Hz" if doppler_hz is not None else ""

            print(f"  Path {i}: {path['delay_ns']:.2f} ns, {path['power_db']:.1f} dB{los_marker}")
            print(f"          Interactions: {interactions}{doppler}")

In [11]:
# Cell 5: One-Time Snapshot with 3D Preview
async def render_snapshot(show_3d: bool = True, clip_at: Optional[float] = 2.0) -> None:
    """
    Fetch the visualization state once and display it.

    Intended as the one-shot alternative to the continuous monitoring loop:
    the state is fetched a single time, optionally rendered as a 3D scene
    preview, and then printed as a text summary.

    Args:
        show_3d: Whether to show 3D scene preview (default: True)
        clip_at: Z-coordinate to clip scene at (default: 2.0m for indoor scenes)
    """
    state = await fetch_visualization_state()

    # The 3D preview is optional; the text summary below is always printed
    if show_3d:
        render_scene_with_paths(state, clip_at=clip_at)

    display_text_summary(state)

# Run snapshot (with 3D preview clipped at 2m height).
# Top-level `await` is used here, so this line only works where the cell is
# executed inside a running event loop (as in IPython/Jupyter).
await render_snapshot(show_3d=True, clip_at=2.0)

[0m[31m2026-01-06 17:00:23 WARN  [HDRFilm] Monochrome mode enabled, setting film output pixel format to 'luminance' (was rgb).

3D SCENE PREVIEW
Scene: /home/joshua/Documents/SiNE/scenes/two_rooms.xml
Devices: 1 TX, 1 RX
Links: 1

Note: Path visualization shows cached propagation paths



HBox(children=(Renderer(camera=PerspectiveCamera(aspect=1.31, children=(DirectionalLight(intensity=0.25, posit…

HBox(children=(Label(value='Clipping plane', layout=Layout(flex='2 2 auto', width='auto')), Checkbox(value=Tru…

=== Visualization State (Cache: 1 links) ===
Scene: scenes/two_rooms.xml

Devices:
  node1: (10.0, 10.0, 1.0)
  node2: (30.0, 10.0, 1.0)

WIRELESS CHANNEL ANALYSIS

Link: node1 → node2
----------------------------------------------------------------------
Distance: 20.0 m
Paths: 5/9 (92.4% power)

Delay Characteristics:
  RMS Delay Spread (τ_rms): 61.67 ns
  Coherence Bandwidth (Bc): 3.2 MHz
  ⚠ Frequency-selective channel (Bc ≈ BW)
    ISI may be significant - OFDM recommended

Channel Classification:
  Channel Type: NLOS (no direct path)
  Dominant: los

Propagation Paths (strongest 5):
  Path 1: 0.00 ns, -99.4 dB
          Interactions: refraction, refraction
  Path 2: 145.03 ns, -101.3 dB
          Interactions: specular_reflection, specular_reflection, specular_reflection
  Path 3: 0.33 ns, -104.2 dB
          Interactions: refraction, specular_reflection, refraction
  Path 4: 0.33 ns, -104.2 dB
          Interactions: refraction, specular_reflection, refraction
  Path 5: 1.32 ns,

In [12]:
# Cell 6: Simple Scene Test (Diagnose preview issues)
from sionna.rt import load_scene, Transmitter, Receiver, PlanarArray
from pathlib import Path

# Simple test: just load and preview the scene, without the channel server
# and without computing any propagation paths.
# NOTE(review): the absolute path below is specific to one machine; adjust it
# (or resolve it relative to the project root) before running elsewhere.
scene_path = Path("/home/joshua/Documents/SiNE/scenes/two_rooms.xml")

# Print the path and whether it exists, so a wrong path is obvious before loading
print(f"Loading scene from: {scene_path}")
print(f"File exists: {scene_path.exists()}")

# merge_shapes=False is the same setting render_scene_with_paths() uses
scene = load_scene(str(scene_path), merge_shapes=False)
print(f"Scene loaded successfully with {len(scene.objects)} objects")

# Configure single-element isotropic, vertically polarized arrays and the
# 5.18 GHz carrier frequency
scene.tx_array = PlanarArray(num_rows=1, num_cols=1, pattern="iso", polarization="V")
scene.rx_array = PlanarArray(num_rows=1, num_cols=1, pattern="iso", polarization="V")
scene.frequency = 5.18e9

# Add a simple TX and RX at fixed positions, both 1.0 m above the floor
scene.add(Transmitter(name="tx1", position=[10.0, 10.0, 1.0]))
scene.add(Receiver(name="rx1", position=[30.0, 10.0, 1.0]))

print("\nCalling scene.preview(clip_at=2.0)...")
print("Note: If you don't see a 3D viewer, your Jupyter environment may not support interactive preview.")
print("Try running this notebook in standard Jupyter Notebook (not VS Code) if the preview doesn't appear.")

# This should trigger the preview (geometry and devices only; no paths are passed)
scene.preview(clip_at=2.0)

Loading scene from: /home/joshua/Documents/SiNE/scenes/two_rooms.xml
File exists: True
[0m[31m2026-01-06 17:00:40 WARN  [HDRFilm] Monochrome mode enabled, setting film output pixel format to 'luminance' (was rgb).
Scene loaded successfully with 11 objects

Calling scene.preview(clip_at=2.0)...
Note: If you don't see a 3D viewer, your Jupyter environment may not support interactive preview.
Try running this notebook in standard Jupyter Notebook (not VS Code) if the preview doesn't appear.


HBox(children=(Renderer(camera=PerspectiveCamera(aspect=1.31, children=(DirectionalLight(intensity=0.25, posit…

HBox(children=(Label(value='Clipping plane', layout=Layout(flex='2 2 auto', width='auto')), Checkbox(value=Tru…

In [None]:
# Cell 7: Continuous Auto-Refresh Loop (for mobility scenarios)
import asyncio
from IPython.display import clear_output

async def continuous_monitoring(update_interval_sec: float = 1.0, max_iterations: int = 100) -> None:
    """
    Continuously poll and display visualization state.

    Each iteration clears the cell output, fetches the current state from the
    channel server, prints the text summary and then sleeps for
    ``update_interval_sec``. Errors end the loop and are reported on stdout.

    Args:
        update_interval_sec: How often to refresh (default: 1.0 seconds)
        max_iterations: Maximum number of updates (default: 100, ~1.7 minutes at 1s interval)

    Raises:
        asyncio.CancelledError: Re-raised after printing a notice when the
            coroutine is cancelled.

    Use this for monitoring mobility scenarios where device positions change.
    Press 'Interrupt Kernel' (■ button) to stop.
    """
    print(f"Starting continuous monitoring (updating every {update_interval_sec}s)")
    print("Press 'Interrupt Kernel' (■ button in toolbar) to stop\n")

    # The loop clock is monotonic but has an arbitrary origin, so remember the
    # starting value and report elapsed time relative to it.
    loop = asyncio.get_running_loop()
    started_at = loop.time()

    try:
        for iteration in range(max_iterations):
            clear_output(wait=True)

            # Fetch and display current state
            viz_state = await fetch_visualization_state()

            print(f"=== Update {iteration + 1}/{max_iterations} ===")
            print(f"Timestamp: {loop.time() - started_at:.1f}s since start\n")

            # Display text summary (scene.preview() doesn't auto-refresh well)
            display_text_summary(viz_state)

            print(f"\n{'='*70}")
            print(f"Next update in {update_interval_sec}s... (Ctrl+C or ■ to stop)")
            print(f"{'='*70}")

            await asyncio.sleep(update_interval_sec)

    except asyncio.CancelledError:
        # An interrupt may reach an awaited coroutine as task cancellation
        # rather than KeyboardInterrupt. CancelledError is not an Exception
        # subclass, so it needs its own handler; it is re-raised so that the
        # cancellation still takes effect for whoever requested it.
        print("\n\nMonitoring cancelled")
        raise
    except KeyboardInterrupt:
        print("\n\nMonitoring stopped by user")
    except Exception as e:
        print(f"\n\nMonitoring stopped due to error: {e}")
        import traceback
        traceback.print_exc()

# Example: Monitor for 60 seconds (60 updates at 1s interval)
# Uncomment the line below to start continuous monitoring:
# await continuous_monitoring(update_interval_sec=1.0, max_iterations=60)