# Parallel Scene Optimization

This notebook parallelizes the boresight optimization across multiple scenes, utilizing all 8 of your RTX 6000 Ada GPUs.

## Your Hardware
- **8 × NVIDIA RTX 6000 Ada Generation** (49 GB each)
- **~48 GB free** per GPU
- **Total GPU memory: ~392 GB** (8 × 49 GB)

## Parallelization Strategy

We'll use **Python multiprocessing** to run 8 scenes simultaneously (one per GPU). Here's why:

| Approach | Pros | Cons | Best For |
|----------|------|------|----------|
| **Sequential** | Simple, predictable | Wastes 7 GPUs | Debugging |
| **Threading** | Low overhead | GIL blocks CPU work | I/O-bound tasks |
| **Multiprocessing** ✓ | True parallelism, GPU isolation | Memory overhead | GPU-heavy tasks |
| **Distributed (Ray/Dask)** | Scales to clusters | Complex setup | Multi-machine |

## Key Concepts Explained

### 1. Why Multiprocessing (not Threading)?

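Python's **Global Interpreter Lock (GIL)** prevents threads from executing Python bytecode in parallel. Many GPU operations release the GIL, but scene loading and setup are CPU-bound Python code, so threads would mostly take turns. Separate processes each have their own interpreter and GIL, and each one can be pinned to its own GPU.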

```
Threading:      [Thread 1]----[Thread 2]----[Thread 1]----  (interleaved)
Multiprocessing: [Process 1]========================
                 [Process 2]========================        (truly parallel)
```

### 2. Why `spawn` Start Method?

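CUDA requires the `spawn` start method (or `forkserver`), not `fork`, because:
- The CUDA runtime does not support `fork`: a child forked from a parent that has already initialized CUDA (Cell 1 does so when it queries GPU memory) cannot use the inherited CUDA state, and PyTorch raises `Cannot re-initialize CUDA in forked subprocess`
- `spawn` starts each worker as a fresh Python interpreter, which creates its own CUDA context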
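A long-lived Jupyter kernel may already have a start method set, so rather than relying only on the global `set_start_method` call, you can give the pool its own spawn context. A minimal standard-library sketch (`make_spawn_executor` is a hypothetical helper name, not part of the notebook):

```python
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor


def make_spawn_executor(max_workers: int) -> ProcessPoolExecutor:
    """Return a process pool whose workers are always started with 'spawn'.

    Passing mp_context means the pool does not depend on the global start
    method. Worker processes are only launched once tasks are submitted.
    """
    spawn_ctx = mp.get_context("spawn")  # fresh interpreter per worker, no inherited CUDA state
    return ProcessPoolExecutor(max_workers=max_workers, mp_context=spawn_ctx)
```

It would be used as `with make_spawn_executor(8) as executor: ...`. PyTorch's multiprocessing notes also accept `forkserver` for CUDA; with either method the worker function must be importable by the child.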

### 3. GPU Assignment Strategy

We use **round-robin assignment**: Scene 0 → GPU 0, Scene 1 → GPU 1, ..., Scene 8 → GPU 0, etc.

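This gives every GPU the same number of scenes, give or take one. It does not keep exactly one scene per GPU at all times: the pool hands the next queued scene to whichever worker finishes first, so Scene 8 (GPU 0) can start while Scene 0 is still running on GPU 0 and another GPU sits idle.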
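One alternative to fixing each scene's GPU at submission time is to let each task claim whichever GPU is currently free from a pool of GPU "tokens". This is a sketch of that swapped-in technique, not the notebook's method; `make_gpu_tokens` and `run_on_free_gpu` are hypothetical names, and `work` stands in for the real per-scene pipeline:

```python
import os
import queue
from typing import Any, Callable, Iterable


def make_gpu_tokens(gpu_ids: Iterable[int], queue_factory=queue.Queue):
    """Return a queue holding one token (the GPU id) per GPU."""
    tokens = queue_factory()
    for gpu_id in gpu_ids:
        tokens.put(gpu_id)
    return tokens


def run_on_free_gpu(tokens, scene_name: str, work: Callable[[str], Any]):
    """Claim a free GPU, run work(scene_name) on it, then hand the GPU back.

    With one token per GPU, at most one scene runs on each GPU at a time.
    """
    gpu_id = tokens.get()  # blocks until some GPU is free
    try:
        # CUDA reads this only at initialization, so it must be set before
        # torch/sionna touch the GPU in this process (i.e. in a fresh worker).
        os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
        return gpu_id, work(scene_name)
    finally:
        tokens.put(gpu_id)  # release the GPU even if work() raised
```

In the notebook, a plain `queue.Queue` cannot be sent to worker processes, but a manager queue can (e.g. `manager = mp.Manager()` then `tokens = make_gpu_tokens(GPU_IDS, manager.Queue)`). Both `run_on_free_gpu` and `work` would have to live in an importable module such as `src/parallel_worker.py`, and each scene would be submitted as `executor.submit(run_on_free_gpu, tokens, scene_name, work)`.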

In [None]:
# ============================================================================
# CELL 1: Setup and GPU Detection
# ============================================================================
# IMPORTANT: This cell must run FIRST before any CUDA operations

import os
import sys
import torch

# Set multiprocessing start method (required for CUDA)
# This MUST be done before spawning any processes
import torch.multiprocessing as mp
try:
    mp.set_start_method('spawn', force=True)
    print("✓ Multiprocessing start method set to 'spawn'")
except RuntimeError:
    print("✓ Multiprocessing start method already set")

# Detect available GPUs
print(f"\nGPU Detection:")
print(f"  PyTorch version: {torch.__version__}")
print(f"  CUDA available: {torch.cuda.is_available()}")
print(f"  CUDA version: {torch.version.cuda}")
print(f"  GPUs detected: {torch.cuda.device_count()}")

# Display GPU info
for i in range(torch.cuda.device_count()):
    props = torch.cuda.get_device_properties(i)
    free_mem = torch.cuda.mem_get_info(i)[0] / (1024**3)
    total_mem = props.total_memory / (1024**3)
    print(f"  GPU {i}: {props.name} ({free_mem:.1f} GB free / {total_mem:.1f} GB total)")

In [None]:
# ============================================================================
# CELL 2: Import Libraries
# ============================================================================
# These imports are for the MAIN process only.
# Worker processes will import their own copies.

import numpy as np
import json
import time
import gc
import traceback
from datetime import datetime
from pathlib import Path
from concurrent.futures import ProcessPoolExecutor, as_completed

# Add src to path
sys.path.append(os.path.abspath('../src'))

print("✓ Libraries imported")

In [None]:
# ============================================================================
# CELL 3: Configuration
# ============================================================================
# Adjust these settings based on your needs

# Parallelization settings
NUM_WORKERS = 8          # One worker per GPU (optimal for your setup)
GPU_IDS = [0, 1, 2, 3, 4, 5, 6, 7]  # All 8 GPUs

# Scene settings
PARENT_FOLDER = "../scene/scenes"
MAX_SCENES = 50          # Process first N scenes (set to None for all)

# Output settings
OUTPUT_DIR = "./parallel_results"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Zone validation thresholds (same as original notebook)
VALIDATION_THRESHOLDS = {
    'p10_min_dbm': -270.0,
    'p10_max_dbm': -80.0,
    'p90_min_dbm': -130.0,
    'min_percentile_range_db': 15.0,
}

# Get scene directories
scene_dirs = sorted([
    d for d in os.listdir(PARENT_FOLDER)
    if os.path.isdir(os.path.join(PARENT_FOLDER, d))
])

if MAX_SCENES:
    scene_dirs = scene_dirs[:MAX_SCENES]

print(f"Configuration:")
print(f"  Workers: {NUM_WORKERS}")
print(f"  GPUs: {GPU_IDS}")
print(f"  Scenes to process: {len(scene_dirs)}")
print(f"  Output directory: {OUTPUT_DIR}")
print(f"\nScenes: {scene_dirs[:5]}... (showing first 5)")

## The Worker Function

The worker function is defined in **`src/parallel_worker.py`** (not in this notebook).

### Why a Separate File?

This is a fundamental limitation of **Jupyter + multiprocessing + spawn**:

1. When you define a function in a notebook cell, it lives in `__main__`
2. When multiprocessing spawns a child process, it creates a fresh Python interpreter
3. The child process must import the worker function by name, but its `__main__` is not the notebook kernel, so functions defined in notebook cells are not there
4. **Result:** `AttributeError: Can't get attribute 'process_single_scene' on <module '__main__'>`

**Solution:** Put the worker in `src/parallel_worker.py` where it can be imported.
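The failure comes from how functions are pickled: by reference, as a module name plus a qualified name, which the child then imports. A rough, notebook-oriented check (helper names are hypothetical) could look like this:

```python
import importlib
import json


def pickled_reference(func):
    """Return the (module, qualified name) pair that pickle records for a function."""
    return func.__module__, func.__qualname__


def importable_by_child(func) -> bool:
    """Rough check that a spawned child could resolve `func` by import.

    '__main__' is rejected: in a child spawned from a Jupyter kernel,
    __main__ does not contain functions defined in notebook cells.
    """
    module_name, qualname = pickled_reference(func)
    if module_name == "__main__":
        return False
    obj = importlib.import_module(module_name)
    for part in qualname.split("."):
        obj = getattr(obj, part, None)
        if obj is None:
            return False
    return obj is func


# A stdlib function resolves by import; a function defined in a notebook cell would not.
print(pickled_reference(json.dumps), importable_by_child(json.dumps))
```

Plain scripts run with `python script.py` are a different case: `spawn` re-imports the script in the child, so its top-level functions can be found there.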

### What the Worker Does

```
Main Process                    Worker Process 0 (GPU 0)
    │                               │
    │──── spawn ────────────────────│
    │                               │── set CUDA_VISIBLE_DEVICES=0
    │                               │── from parallel_worker import ...
    │                               │── import torch, sionna, mitsuba
    │                               │── load_scene("new_york")
    │                               │── optimize_boresight()
    │                               │── evaluate_radiomap()
    │◄─── return results ───────────│
    │                               │── exit
```

Each worker:
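1. **Sets `CUDA_VISIBLE_DEVICES`** before importing GPU libraries (CUDA reads it only when it initializes, so setting it in a process that has already used the GPU has no effect)
2. **Imports libraries fresh** to get a clean CUDA context (this holds only in a new process: `ProcessPoolExecutor` reuses worker processes across tasks unless `max_tasks_per_child=1` is set, which requires Python 3.11+)
3. **Processes one scene** completely
4. **Returns JSON-serializable results** (lists, not NumPy arrays) so the main process can pass them straight to `json.dump`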
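The notebook does not show `src/parallel_worker.py`. The sketch below is a hypothetical skeleton that only matches what Cells 5, 7 and 8 read from each result (`status`, `reason`, `gpu_id`, `elapsed_time`, `stats['improvement_mean']`, `zone_power_initial`, `zone_power_optimized`). `_optimize_scene` is a placeholder for the real Sionna/Mitsuba pipeline, and `output_dir` is unpacked only to match the work-item tuple built in Cell 5:

```python
import os
import time
import traceback


def _optimize_scene(scene_path, validation_thresholds):
    """HYPOTHETICAL placeholder for load_scene / optimize_boresight / evaluate_radiomap.

    Assumed to return (zone_power_initial, zone_power_optimized, stats), with
    powers in dBm and numeric stats that include 'improvement_mean'.
    """
    raise NotImplementedError("plug in the real per-scene pipeline here")


def process_single_scene(work_item, optimize=_optimize_scene):
    """Process one scene on one GPU; return (scene_name, JSON-serializable result)."""
    scene_name, gpu_id, validation_thresholds, parent_folder, output_dir = work_item
    # Must be set before torch/sionna/mitsuba initialize CUDA in this process.
    os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
    start = time.time()
    try:
        # In the real worker, the GPU libraries would be imported here.
        initial, optimized, stats = optimize(
            os.path.join(parent_folder, scene_name), validation_thresholds)
        result = {
            "status": "success",
            "gpu_id": gpu_id,
            # Plain floats, not NumPy values, so json.dump works in the parent.
            "zone_power_initial": [float(x) for x in initial],
            "zone_power_optimized": [float(x) for x in optimized],
            "stats": {k: float(v) for k, v in stats.items()},
        }
    except Exception as exc:
        result = {
            "status": "error",
            "gpu_id": gpu_id,
            "reason": f"{type(exc).__name__}: {exc}",
            "traceback": traceback.format_exc(),
        }
    result["elapsed_time"] = time.time() - start
    return scene_name, result
```

Catching exceptions inside the worker means a failing scene comes back as an ordinary result instead of an exception, while Cell 5's own `except` still covers crashes the worker cannot catch.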

In [None]:
# ============================================================================
# CELL 4: Import Worker Function
# ============================================================================
# The worker function is defined in a SEPARATE FILE (src/parallel_worker.py)
# 
# WHY? When Python multiprocessing uses 'spawn' (required for CUDA), it creates
# fresh Python interpreters. These child processes need to IMPORT the worker
# function. But functions defined in notebook cells live in the kernel's `__main__`,
# and a spawned child's `__main__` does not contain them.
#
# By putting the worker in src/parallel_worker.py, child processes can do:
#   from parallel_worker import process_single_scene
#
# This is a fundamental limitation of Jupyter + multiprocessing with spawn.

from parallel_worker import process_single_scene

print("✓ Worker function imported from src/parallel_worker.py")
print("  This allows spawned processes to import the function correctly.")

## The Parallel Executor

We use `ProcessPoolExecutor` from Python's `concurrent.futures` module:

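```python
from concurrent.futures import ProcessPoolExecutor, as_completed

# func must be importable by the workers (module-level); work_items is a list of arguments
with ProcessPoolExecutor(max_workers=8) as executor:
    futures = [executor.submit(func, args) for args in work_items]
    for future in as_completed(futures):
        result = future.result()  # already finished here; re-raises the worker's exception
```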

**Key features:**
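- `max_workers=8` runs at most 8 worker processes and, by default, reuses them for later tasks
- `as_completed()` yields each future as soon as it finishes (completion order, not submission order)
- The `with` block shuts the pool down on exit and waits for all submitted tasks to finish
- Per-task exception handling: an exception raised inside a task is re-raised by that task's `future.result()`. If a worker process dies outright (e.g. a segfault), the pool is marked broken and the pending futures raise `BrokenProcessPool`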
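A minimal sketch of collecting results with per-task error handling and an overall deadline. `collect_results` is a hypothetical helper; it accepts any `concurrent.futures.Executor`, so the quick check below uses threads, while in the notebook you would pass the `ProcessPoolExecutor` and a module-level function:

```python
from concurrent.futures import Executor, ThreadPoolExecutor, as_completed
from concurrent.futures import TimeoutError as FuturesTimeout


def collect_results(executor: Executor, func, items, timeout=None):
    """Submit func(item) for every (hashable) item and gather results as they finish.

    - An exception raised inside func (including BrokenProcessPool) is recorded
      for that item only; the loop keeps collecting the others.
    - `timeout` is an overall deadline in seconds from the call to as_completed();
      unfinished items are marked 'timeout'. Running tasks are not interrupted.
    """
    future_to_item = {executor.submit(func, item): item for item in items}
    results = {}
    try:
        for future in as_completed(future_to_item, timeout=timeout):
            item = future_to_item[future]
            try:
                results[item] = {"status": "success", "value": future.result()}
            except Exception as exc:
                results[item] = {"status": "error", "reason": str(exc)}
    except FuturesTimeout:
        for future, item in future_to_item.items():
            if item not in results:
                future.cancel()  # succeeds only for tasks that have not started yet
                results[item] = {"status": "timeout"}
    return results


if __name__ == "__main__":
    # In-process check with threads; a lambda would not pickle for a process pool.
    with ThreadPoolExecutor(max_workers=2) as pool:
        print(collect_results(pool, lambda x: 10 // x, [1, 2, 0]))
```

With a process pool, a task that outlives the deadline still occupies its worker, and leaving the `with` block still waits for it.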

In [None]:
# ============================================================================
# CELL 5: Run Parallel Optimization
# ============================================================================

def run_parallel_optimization(scene_dirs, parent_folder, output_dir, num_workers, gpu_ids, validation_thresholds):
    """
    Run optimization across multiple scenes in parallel.
    
    This function:
    1. Prepares work items with GPU assignments
    2. Spawns worker processes
    3. Collects results as they complete
    4. Saves incremental progress
    """
    print("=" * 80)
    print("PARALLEL SCENE OPTIMIZATION")
    print("=" * 80)
    print(f"\nScenes: {len(scene_dirs)} | Workers: {num_workers} | GPUs: {gpu_ids}")
    print()
    
    # =========================================================================
    # Prepare work items with ROUND-ROBIN GPU assignment
    # =========================================================================
    # Scene 0 → GPU 0, Scene 1 → GPU 1, ..., Scene 8 → GPU 0, etc.
    work_items = [
        (
            scene_name,
            gpu_ids[i % len(gpu_ids)],  # Round-robin
            validation_thresholds,
            parent_folder,
            output_dir
        )
        for i, scene_name in enumerate(scene_dirs)
    ]
    
    results = {}
    start_time = time.time()
    completed = 0
    
    # =========================================================================
    # Execute in parallel using ProcessPoolExecutor
    # =========================================================================
    print(f"Starting {num_workers} worker processes...")
    print("-" * 80)
    
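    # Pass 'spawn' explicitly so the pool never forks a process that holds CUDA state.
    # max_tasks_per_child=1 (Python 3.11+) gives every scene a fresh worker process, so the
    # worker's CUDA_VISIBLE_DEVICES setting takes effect and its GPU memory is freed on exit.
    with ProcessPoolExecutor(max_workers=num_workers,
                             mp_context=mp.get_context('spawn'),
                             max_tasks_per_child=1) as executor: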
        # Submit all tasks to the pool
        # Each task gets queued and assigned to an available worker
        future_to_scene = {
            executor.submit(process_single_scene, item): item[0]
            for item in work_items
        }
        
        # Process results as they complete (not in submission order)
        # This allows us to update progress as each scene finishes
        for future in as_completed(future_to_scene):
            scene_name = future_to_scene[future]
            completed += 1
            
            try:
                _, result = future.result()
                results[scene_name] = result
                
                # Print status
                status = result.get('status', 'unknown')
                if status == 'success':
                    improvement = result['stats']['improvement_mean']
                    elapsed = result['elapsed_time']
                    print(f"✓ [{completed}/{len(scene_dirs)}] {scene_name}: "
                          f"+{improvement:.2f} dB in {elapsed:.1f}s (GPU {result['gpu_id']})")
                else:
                    reason = result.get('reason', 'Unknown error')[:50]
                    print(f"✗ [{completed}/{len(scene_dirs)}] {scene_name}: {status} - {reason}")
                
                # Save incremental progress
                with open(os.path.join(output_dir, 'results_progress.json'), 'w') as f:
                    json.dump(results, f, indent=2)
                    
            except Exception as e:
                print(f"✗ [{completed}/{len(scene_dirs)}] {scene_name}: Exception - {e}")
                results[scene_name] = {'status': 'error', 'reason': str(e)}
            
            # Progress estimate
            elapsed = time.time() - start_time
            rate = completed / elapsed
            remaining = len(scene_dirs) - completed
            eta = remaining / rate if rate > 0 else 0
            print(f"   Progress: {100*completed/len(scene_dirs):.0f}% | "
                  f"Rate: {rate*60:.1f} scenes/min | ETA: {eta/60:.1f} min")
    
    # =========================================================================
    # Final summary
    # =========================================================================
    total_time = time.time() - start_time
    successful = sum(1 for r in results.values() if r.get('status') == 'success')
    failed = sum(1 for r in results.values() if r.get('status') == 'failed')
    errors = sum(1 for r in results.values() if r.get('status') == 'error')
    
    print("\n" + "=" * 80)
    print("SUMMARY")
    print("=" * 80)
    print(f"Total time:     {total_time/60:.1f} minutes ({total_time:.0f} seconds)")
    print(f"Successful:     {successful}/{len(scene_dirs)}")
    print(f"Failed zones:   {failed}")
    print(f"Errors:         {errors}")
    
    if successful > 0:
        improvements = [
            r['stats']['improvement_mean']
            for r in results.values()
            if r.get('status') == 'success'
        ]
        print(f"\nImprovement Statistics:")
        print(f"  Mean:   {np.mean(improvements):+.2f} dB")
        print(f"  Min:    {np.min(improvements):+.2f} dB")
        print(f"  Max:    {np.max(improvements):+.2f} dB")
    
    # Save final results
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    final_file = os.path.join(output_dir, f'results_final_{timestamp}.json')
    with open(final_file, 'w') as f:
        json.dump(results, f, indent=2)
    print(f"\nResults saved to: {final_file}")
    
    return results

In [None]:
# ============================================================================
# CELL 6: Execute Parallel Optimization
# ============================================================================
# This is the main execution cell - run this to start processing!

results = run_parallel_optimization(
    scene_dirs=scene_dirs,
    parent_folder=PARENT_FOLDER,
    output_dir=OUTPUT_DIR,
    num_workers=NUM_WORKERS,
    gpu_ids=GPU_IDS,
    validation_thresholds=VALIDATION_THRESHOLDS
)

---

## Results Analysis

The following cells analyze the results from parallel processing.

In [None]:
# ============================================================================
# CELL 7: Results Analysis
# ============================================================================

import matplotlib.pyplot as plt

# Filter successful results
successful_results = {k: v for k, v in results.items() if v.get('status') == 'success'}
failed_results = {k: v for k, v in results.items() if v.get('status') != 'success'}

print(f"Successful: {len(successful_results)} / {len(results)}")
print(f"Failed: {len(failed_results)}")

if failed_results:
    print("\nFailed scenes:")
    for name, data in failed_results.items():
        print(f"  - {name}: {data.get('reason', 'Unknown')[:60]}")

if successful_results:
    # Collect all zone power values
    all_initial = np.concatenate([np.array(r['zone_power_initial']) for r in successful_results.values()])
    all_optimized = np.concatenate([np.array(r['zone_power_optimized']) for r in successful_results.values()])
    
    # Per-scene improvements
    improvements = [r['stats']['improvement_mean'] for r in successful_results.values()]
    
    print(f"\n" + "="*60)
    print("AGGREGATE STATISTICS")
    print("="*60)
    print(f"\nInitial Configuration:")
    print(f"  Mean:   {np.mean(all_initial):.2f} dBm")
    print(f"  Median: {np.median(all_initial):.2f} dBm")
    print(f"  P10:    {np.percentile(all_initial, 10):.2f} dBm")
    
    print(f"\nOptimized Configuration:")
    print(f"  Mean:   {np.mean(all_optimized):.2f} dBm")
    print(f"  Median: {np.median(all_optimized):.2f} dBm")
    print(f"  P10:    {np.percentile(all_optimized, 10):.2f} dBm")
    
    print(f"\nImprovement:")
    print(f"  Mean:   {np.mean(all_optimized) - np.mean(all_initial):+.2f} dB")
    print(f"  Median: {np.median(all_optimized) - np.median(all_initial):+.2f} dB")
    print(f"  P10:    {np.percentile(all_optimized, 10) - np.percentile(all_initial, 10):+.2f} dB")

In [None]:
# ============================================================================
# CELL 8: Visualization
# ============================================================================

if successful_results:
    fig, axes = plt.subplots(1, 3, figsize=(18, 5))
    
    # CDF comparison
    ax1 = axes[0]
    sorted_initial = np.sort(all_initial)
    sorted_optimized = np.sort(all_optimized)
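    # Empirical CDF: the i-th smallest value has cumulative probability i/n.
    # Computed per array so the curves stay correct even if the sample counts differ.
    cdf_initial = np.arange(1, len(sorted_initial) + 1) / len(sorted_initial)
    cdf_optimized = np.arange(1, len(sorted_optimized) + 1) / len(sorted_optimized)
    
    ax1.plot(sorted_initial, cdf_initial * 100, linewidth=2, label='Initial', alpha=0.7)
    ax1.plot(sorted_optimized, cdf_optimized * 100, linewidth=2, label='Optimized', alpha=0.7)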
    ax1.axhline(y=50, color='gray', linestyle='--', alpha=0.3)
    ax1.axhline(y=10, color='red', linestyle='--', alpha=0.3)
    ax1.set_xlabel('Signal Strength (dBm)')
    ax1.set_ylabel('Cumulative Probability (%)')
    ax1.set_title(f'CDF: Initial vs Optimized\n({len(successful_results)} scenes)')
    ax1.legend()
    ax1.grid(True, alpha=0.3)
    
    # Improvement histogram
    ax2 = axes[1]
    ax2.hist(improvements, bins=20, edgecolor='black', alpha=0.7)
    ax2.axvline(x=np.mean(improvements), color='red', linestyle='--', 
                label=f'Mean: {np.mean(improvements):+.2f} dB')
    ax2.axvline(x=0, color='gray', linestyle='-', alpha=0.5)
    ax2.set_xlabel('Mean Power Improvement (dB)')
    ax2.set_ylabel('Number of Scenes')
    ax2.set_title('Distribution of Improvements')
    ax2.legend()
    ax2.grid(True, alpha=0.3)
    
    # Processing time by GPU
    ax3 = axes[2]
    gpu_times = {}
    for name, data in successful_results.items():
        gpu_id = data.get('gpu_id', 0)
        if gpu_id not in gpu_times:
            gpu_times[gpu_id] = []
        gpu_times[gpu_id].append(data['elapsed_time'])
    
    gpu_ids_sorted = sorted(gpu_times.keys())
    avg_times = [np.mean(gpu_times[g]) for g in gpu_ids_sorted]
    ax3.bar(gpu_ids_sorted, avg_times, edgecolor='black', alpha=0.7)
    ax3.set_xlabel('GPU ID')
    ax3.set_ylabel('Average Processing Time (s)')
    ax3.set_title('Processing Time by GPU')
    ax3.grid(True, alpha=0.3, axis='y')
    
    plt.tight_layout()
    plt.savefig(os.path.join(OUTPUT_DIR, 'parallel_results_summary.png'), dpi=150)
    plt.show()
    
    print(f"\nPlot saved to: {OUTPUT_DIR}/parallel_results_summary.png")

---

## Troubleshooting

### Common Issues

**1. "Can't get attribute 'process_single_scene' on module '__main__'"**
- This happens when the worker function is defined in the notebook instead of a separate file
- **Solution:** The worker is now in `src/parallel_worker.py` - make sure you import it, not define it
- Restart the kernel and run cells in order

**2. "CUDA out of memory"**
- Reduce `NUM_WORKERS` to run fewer scenes simultaneously
- Each scene uses ~2-8 GB GPU memory

**3. "Cannot re-initialize CUDA in forked subprocess"**
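- The workers were started with `fork` after the parent had already initialized CUDA (Cell 1 does this when it queries GPU memory)
- Make sure `mp.set_start_method('spawn', force=True)` in Cell 1 runs before any worker processes are created, or pass a spawn context to the pool explicitly via `mp_context`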
- Restart the kernel and run cells in order

**4. "Processes hang or freeze"**
- Check `nvidia-smi` for GPU utilization
- One scene may have complex geometry causing long ray tracing
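- To bound the wait, pass an overall deadline to `as_completed(future_to_scene, timeout=3600)`, which raises `TimeoutError` if scenes are still unfinished that many seconds after the loop starts. Calling `future.result(timeout=...)` inside the `as_completed` loop has no effect, because those futures are already done. Neither option stops a stuck worker, and the `with` block still waits for it on exit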

**5. "Results lost on error"**
- Incremental progress is saved to `results_progress.json`
- Load it with: `results = json.load(open('parallel_results/results_progress.json'))`

**6. "ModuleNotFoundError: No module named 'parallel_worker'"**
- Make sure Cell 2 adds `../src` to the path before Cell 4 imports the worker

In [None]:
# ============================================================================
# CELL 9: GPU Monitoring (run while processing)
# ============================================================================
# Cell 6 blocks this notebook's kernel while it runs, so run these commands from a
# separate terminal (without the leading '!') or from another notebook.

# Uncomment to run nvidia-smi:
# !nvidia-smi

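# For continuous monitoring from a terminal: watch -n 1 nvidia-smi  (Ctrl+C to stop)
# 'watch' needs a real terminal, so in a notebook use nvidia-smi's own loop instead
# (interrupt the kernel to stop):
# !nvidia-smi -l 1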
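For monitoring from a second notebook or script in Python rather than reading the raw `nvidia-smi` table, here is a minimal sketch using `nvidia-smi`'s `--query-gpu` CSV mode. The helper names are hypothetical; with `nounits`, memory is reported in MiB, and a field shown as `[N/A]` on some systems would make the `int()` conversion fail.

```python
import subprocess

QUERY_FIELDS = "index,utilization.gpu,memory.used,memory.total"


def parse_gpu_query(csv_text):
    """Parse `nvidia-smi --query-gpu=... --format=csv,noheader,nounits` output."""
    rows = []
    for line in csv_text.strip().splitlines():
        index, util, used, total = (field.strip() for field in line.split(","))
        rows.append({
            "gpu": int(index),
            "util_pct": int(util),
            "mem_used_mib": int(used),
            "mem_total_mib": int(total),
        })
    return rows


def gpu_snapshot():
    """Run nvidia-smi once and return one dict per GPU (needs nvidia-smi on PATH)."""
    out = subprocess.run(
        ["nvidia-smi", f"--query-gpu={QUERY_FIELDS}", "--format=csv,noheader,nounits"],
        capture_output=True, text=True, check=True,
    ).stdout
    return parse_gpu_query(out)
```

Calling `gpu_snapshot()` in a loop with a short sleep shows at a glance whether any GPU is idle or holding two scenes at once.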