# Systematic Perturbation Analysis

This notebook demonstrates Scene2Sim's systematic perturbation capabilities for counterfactual analysis.

## Learning Objectives
- Apply temporal, speed, and spatial perturbations
- Conduct parameter sensitivity analysis
- Identify critical parameter ranges
- Generate adversarial scenarios for safety testing

In [None]:
%matplotlib inline
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from sklearn.linear_model import LogisticRegression

from Scene2Sim import load_scenario, ADSimulator
from Scene2Sim.core.perturbations import PerturbationEngine

# Global plotting configuration for every figure created below.
# Apply the 'seaborn-v0_8-paper' matplotlib style sheet.
plt.style.use('seaborn-v0_8-paper')
# Make 'husl' the default seaborn colour palette.
sns.set_palette('husl')

In [None]:
# Load the base scenario and build the perturbation engine with a fixed seed
base_scene = load_scenario("scenarios/pedestrian_crossing.json", "crossing_001")
perturb_engine = PerturbationEngine(random_seed=42)

print(f"Base scenario: {base_scene.id}")
agent_ids = list(base_scene.agents.keys())
print(f"Agents: {agent_ids}")

# Reference run on the unperturbed scenario; the sweeps below are compared to it
baseline_log = ADSimulator(base_scene, random_seed=42).run(headless=True)

# A missing 'is_safe' metric is reported as UNSAFE
if baseline_log.metrics.get('is_safe', False):
    baseline_verdict = ' SAFE'
else:
    baseline_verdict = 'UNSAFE'
print(f"\nBaseline safety: {baseline_verdict}")

# A missing 'min_ttc' metric falls back to infinity
baseline_min_ttc = baseline_log.metrics.get('min_ttc', np.inf)
print(f"Baseline TTC: {baseline_min_ttc:.2f}s")

In [None]:
# Temporal perturbation analysis: sweep the shift passed to temporal_shift
# for agent "ped_0" over 31 evenly spaced values in [-3.0, 3.0].
delays = np.linspace(-3.0, 3.0, 31)
time_results = []
print(f"Testing {len(delays)} time delays...")

for n_done, delay in enumerate(delays, start=1):
    # Perturb the base scenario, then simulate with the same seed as the baseline
    shifted_scene = perturb_engine.temporal_shift(base_scene, "ped_0", delay)
    run_metrics = ADSimulator(shifted_scene, random_seed=42).run(headless=True).metrics

    # One record per run; missing metrics fall back to conservative defaults
    record = dict(
        delay=delay,
        is_safe=run_metrics.get('is_safe', False),
        n_collisions=run_metrics.get('n_collisions', 0),
        min_ttc=run_metrics.get('min_ttc', np.inf),
        min_distance=run_metrics.get('min_distance_overall', np.inf),
        n_ttc_events=run_metrics.get('n_ttc_events', 0),
    )
    time_results.append(record)

    # Report progress after every tenth run
    if n_done % 10 == 0:
        print(f"  Completed {n_done}/{len(delays)}")

time_df = pd.DataFrame(time_results)
print(f"\n Temporal analysis complete!")
# Fraction of runs that recorded at least one collision
collided_fraction = time_df['n_collisions'].gt(0).mean()
print(f"Collision rate: {collided_fraction:.1%}")

In [None]:
# Plot temporal sensitivity as a 2x2 panel figure
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
(ax_status, ax_ttc), (ax_dist, ax_risk) = axes

# Top-left: safety status vs delay (1 = no collision, 0 = collision)
safe_runs = time_df[time_df['n_collisions'] == 0]
crash_runs = time_df[time_df['n_collisions'] > 0]
ax_status.scatter(safe_runs['delay'], [1] * len(safe_runs), color='green', label='Safe')
ax_status.scatter(crash_runs['delay'], [0] * len(crash_runs), color='red', label='Collision')
ax_status.set_ylim(-0.1, 1.1)
ax_status.set_xlabel('Time Delay (s)')
ax_status.set_ylabel('Safety Status')
ax_status.legend()
ax_status.grid(True, alpha=0.3)
ax_status.set_title('Safety vs Time Delay')

# Top-right: minimum TTC vs delay, skipping runs whose min_ttc is infinite
valid_ttc = time_df[time_df['min_ttc'] < np.inf]
ax_ttc.scatter(valid_ttc['delay'], valid_ttc['min_ttc'], alpha=0.7)
ax_ttc.axhline(3.0, color='red', linestyle='--', label='Critical TTC')
ax_ttc.set_xlabel('Time Delay (s)')
ax_ttc.set_ylabel('Minimum TTC (s)')
ax_ttc.legend()
ax_ttc.grid(True, alpha=0.3)
ax_ttc.set_title('TTC vs Time Delay')

# Bottom-left: minimum distance vs delay, skipping runs with infinite distance
valid_dist = time_df[time_df['min_distance'] < np.inf]
ax_dist.scatter(valid_dist['delay'], valid_dist['min_distance'], alpha=0.7)
ax_dist.axhline(1.0, color='orange', linestyle='--', label='Safety margin')
ax_dist.set_xlabel('Time Delay (s)')
ax_dist.set_ylabel('Min Distance (m)')
ax_dist.legend()
ax_dist.grid(True, alpha=0.3)
ax_dist.set_title('Distance vs Time Delay')

# Bottom-right: per-delay fraction of runs with at least one collision
collision_rate = time_df.groupby('delay')['n_collisions'].apply(
    lambda counts: (counts > 0).mean()
)
ax_risk.plot(collision_rate.index, collision_rate.values, color='red', linewidth=3)
ax_risk.fill_between(collision_rate.index, collision_rate.values, alpha=0.3, color='red')
ax_risk.set_xlabel('Time Delay (s)')
ax_risk.set_ylabel('Collision Probability')
ax_risk.set_title('Risk Profile')
ax_risk.grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig('temporal_sensitivity_analysis.png', dpi=150, bbox_inches='tight')
plt.show()

# Critical delay range: smallest and largest tested delay that produced a collision
unsafe_delays = time_df.loc[time_df['n_collisions'] > 0, 'delay']
if unsafe_delays.empty:
    print("No collisions in tested range")
else:
    print(f"Critical delay range: {unsafe_delays.min():.1f}s to {unsafe_delays.max():.1f}s")

In [None]:
# Speed perturbation analysis: sweep the factor passed to speed_scaling for 'ped_0'.
# logspace(-0.4, 0.4, 25) yields 25 factors from 10**-0.4 (~0.40) to 10**0.4 (~2.51),
# evenly spaced in log space so factors below and above 1.0 are sampled symmetrically.
speed_scales = np.logspace(-0.4, 0.4, 25)
speed_results = []
# Iterate over the factors directly; this sweep does not need a positional index.
for scale in speed_scales:
    # Perturb the base scenario, then simulate with the same seed as the baseline
    perturbed_scene = perturb_engine.speed_scaling(base_scene, 'ped_0', scale)
    log = ADSimulator(perturbed_scene, random_seed=42).run(headless=True)
    metrics = log.metrics
    # One record per run; missing metrics fall back to conservative defaults
    speed_results.append({
        'speed_scale': scale,
        'is_safe': metrics.get('is_safe', False),
        'n_collisions': metrics.get('n_collisions', 0),
        'min_ttc': metrics.get('min_ttc', np.inf),
        'min_distance': metrics.get('min_distance_overall', np.inf)
    })
speed_df = pd.DataFrame(speed_results)
# Collision rate = fraction of runs that recorded at least one collision
print(f"Speed analysis complete! Collision rate: {(speed_df['n_collisions']>0).mean():.1%}")

In [None]:
# Multi-dimensional 2D parameter space: every (delay, speed scale) pair on an
# 11 x 11 grid, with both perturbations applied to 'ped_0'.
delay_range = np.linspace(-2, 2, 11)
speed_range = np.linspace(0.7, 1.5, 11)
combined_results = []
total = len(delay_range) * len(speed_range)
progress = 0
for delay_value in delay_range:
    for speed_value in speed_range:
        # Temporal shift first, then speed scaling on the shifted scene
        shifted_scene = perturb_engine.temporal_shift(base_scene, 'ped_0', delay_value)
        scaled_scene = perturb_engine.speed_scaling(shifted_scene, 'ped_0', speed_value)
        run_metrics = ADSimulator(scaled_scene, random_seed=42).run(headless=True).metrics

        # One record per grid point; missing metrics fall back to defaults
        grid_record = dict(
            delay=delay_value,
            speed_scale=speed_value,
            is_safe=run_metrics.get('is_safe', False),
            n_collisions=run_metrics.get('n_collisions', 0),
            min_ttc=run_metrics.get('min_ttc', np.inf),
        )
        combined_results.append(grid_record)

        # Number of completed runs equals the number of stored records
        progress = len(combined_results)
        if progress % 20 == 0:
            print(f"Progress: {progress}/{total}")
combined_df = pd.DataFrame(combined_results)
print('Combined analysis complete!')

In [None]:
# Heatmaps of the 2D (delay x speed scale) sweep: collision count on the left,
# minimum TTC on the right.
fig, axes = plt.subplots(1, 2, figsize=(15, 6))

# Shared image extent as [left, right, bottom, top]. The pivot rows are sorted by
# ascending delay and imshow draws row 0 at the top by default, so the smallest
# delay is placed at the top of the y-axis.
heatmap_extent = [speed_range.min(), speed_range.max(), delay_range.max(), delay_range.min()]

# Left panel: total collision count per grid cell
collision_pivot = combined_df.pivot_table(index='delay', columns='speed_scale', values='n_collisions', aggfunc='sum')
im1 = axes[0].imshow(collision_pivot.values, cmap='Reds', aspect='auto', extent=heatmap_extent)
axes[0].set_title('Collision Count Heatmap')
axes[0].set_xlabel('Speed Scale')
axes[0].set_ylabel('Time Delay (s)')
plt.colorbar(im1, ax=axes[0])

# Right panel: minimum TTC per grid cell. Runs with infinite min_ttc are dropped,
# which can remove whole rows or columns from the pivot; reindexing onto the full
# grid keeps the array aligned with the fixed extent (missing cells become NaN).
ttc_pivot = (
    combined_df[combined_df['min_ttc'] < np.inf]
    .pivot_table(index='delay', columns='speed_scale', values='min_ttc', aggfunc='min')
    .reindex(index=delay_range, columns=speed_range)
)
# Draw on axes[1]: the previous `axes[1-1]` evaluated to axes[0], so the TTC image
# was painted over the collision heatmap and the right panel stayed empty.
im2 = axes[1].imshow(ttc_pivot.to_numpy(dtype=float), cmap='viridis', aspect='auto', extent=heatmap_extent)
axes[1].set_title('Min TTC Heatmap')
axes[1].set_xlabel('Speed Scale')
axes[1].set_ylabel('Time Delay (s)')
plt.colorbar(im2, ax=axes[1])

plt.tight_layout()
plt.savefig('combined_heatmaps.png', dpi=150, bbox_inches='tight')
plt.show()