# Adaptive Cruise Control (ACC) with PPO and Adversarial Attacks

This notebook contains the complete implementation of an Adaptive Cruise Control system using PPO reinforcement learning, along with adversarial attack methods (FGSM and OIA).

## 1. Environment Definition

In [None]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np


class ACCEnv(gym.Env):
    """Adaptive cruise control (car-following) environment with a safety filter.

    The ego vehicle follows a lead vehicle. The agent requests a longitudinal
    acceleration; before it is applied, the request is capped by a
    headway-based safety filter.

    Observation (float32, shape (3,)): ``[dx, dv, v]`` with ``dx`` the gap to
    the lead vehicle (m), ``dv = v_lead - v_ego`` (m/s) and ``v`` the ego
    speed (m/s).

    Action (float32, shape (1,)): requested acceleration in
    ``[A_MIN, A_MAX]`` m/s^2.

    An episode terminates on collision (``dx <= 0``) and is truncated after
    ``max_steps`` steps.
    """

    metadata = {"render_modes": ["human"]}

    # Physical constants
    DT = 0.1    # time step (s)
    TH = 1.5    # desired time headway (s)
    D0 = 5.0    # standstill distance (m)
    V_REF = 15.0    # target speed (m/s)
    A_MIN = -3.5    # braking limit (m/s^2)
    A_MAX = 2.0    # acceleration limit (m/s^2)

    # State space bounds (for Box space definition)
    OBS_LOW = np.array([0.0, -30.0, 0.0], dtype=np.float32)
    OBS_HIGH = np.array([200.0, 30.0, 40.0], dtype=np.float32)

    def __init__(self):
        """Create the observation/action spaces and zero the vehicle state."""
        super().__init__()

        self.observation_space = spaces.Box(
            low=self.OBS_LOW,
            high=self.OBS_HIGH,
            dtype=np.float32
        )
        self.action_space = spaces.Box(
            low=np.array([self.A_MIN], dtype=np.float32),
            high=np.array([self.A_MAX], dtype=np.float32),
            dtype=np.float32
        )

        # Episode settings
        self.max_steps = 400

        # State variables (properly initialized in reset)
        self.x_ego = 0.0
        self.x_lead = 0.0
        self.v_ego = 0.0
        self.v_lead = 0.0
        self.a_ego = 0.0
        self.current_step = 0

        # Lead vehicle deceleration profile (for testing scenarios)
        self._clear_lead_decel_profile()

        # Attack interface: allows external override of observation for safety filter
        self._safety_obs_override = None

    def _clear_lead_decel_profile(self):
        """Disable any scheduled lead-vehicle braking manoeuvre."""
        self.lead_decel_active = False
        self.lead_decel_start = None
        self.lead_decel_duration = None
        self.lead_decel_value = None

    def reset(self, seed=None, options=None):
        """Start a new episode and return ``(observation, info)``.

        The lead-deceleration profile and any pending safety-filter override
        are cleared, so ``activate_lead_deceleration`` has to be called again
        after every reset.
        """
        super().reset(seed=seed)

        # Initialize state with small random variations
        self.x_ego = 0.0
        self.x_lead = 30.0 + self.np_random.uniform(-1.0, 1.0)
        self.v_ego = self.V_REF - 0.5 + self.np_random.uniform(-0.2, 0.2)
        self.v_lead = self.V_REF + self.np_random.uniform(-0.5, 0.5)
        self.a_ego = 0.0
        self.current_step = 0

        self._clear_lead_decel_profile()
        self._safety_obs_override = None

        obs = self._get_obs()
        return obs, {}

    def _get_obs(self):
        """Return current observation [dx, dv, v]"""
        dx = self.x_lead - self.x_ego
        dv = self.v_lead - self.v_ego
        v = self.v_ego
        return np.array([dx, dv, v], dtype=np.float32)

    def set_safety_obs_for_filter(self, obs_adv):
        """Make the safety filter use ``obs_adv`` for the next step only.

        The override is consumed by the next call to ``step``. The filter
        arithmetic uses the physical constants ``TH`` and ``DT``, so
        ``obs_adv`` is expected in the same units as ``_get_obs``.
        """
        self._safety_obs_override = np.array(obs_adv, dtype=np.float32)

    def _compute_safe_action(self, obs_for_filter):
        """Return the largest acceleration the headway constraint allows.

        Uses the gap ``obs_for_filter[0]`` and the ego speed
        ``obs_for_filter[2]``; the result is clipped to ``[A_MIN, A_MAX]``.
        """
        dx, v = obs_for_filter[0], obs_for_filter[2]

        # NOTE(review): the relative-speed term is built from the true lead
        # speed, not from the observed dv (obs_for_filter[1]) -- confirm this
        # is intended when the observation is adversarially overridden.
        numerator = dx - self.TH * v + (self.v_lead - v) * self.DT
        denominator = self.TH * self.DT

        if denominator <= 0:
            return self.A_MIN

        a_max_safe = numerator / denominator
        return float(np.clip(a_max_safe, self.A_MIN, self.A_MAX))

    def _apply_safety_filter(self, action_rl):
        """Cap ``action_rl`` by the safe acceleration and the actuator limits."""
        # Determine which observation to use for safety filter
        if self._safety_obs_override is not None:
            obs_for_filter = self._safety_obs_override
            self._safety_obs_override = None  # consume override
        else:
            obs_for_filter = self._get_obs()

        # Compute maximum safe action and clamp
        a_max_safe = self._compute_safe_action(obs_for_filter)
        a_safe = min(action_rl, a_max_safe)
        a_safe = np.clip(a_safe, self.A_MIN, self.A_MAX)

        return float(a_safe)

    def step(self, action):
        """Advance the simulation by ``DT`` seconds.

        Args:
            action: requested acceleration; an array of any shape, a
                sequence, or a scalar (the first element is used).

        Returns:
            ``(obs, reward, terminated, truncated, info)``. ``terminated`` is
            True on collision, ``truncated`` once ``max_steps`` is reached.
            ``info`` carries the collision flag, the gap, both speeds, and the
            requested and applied accelerations.
        """
        # Extract scalar action from ndarray / list / scalar alike
        a_rl = float(np.asarray(action, dtype=np.float64).reshape(-1)[0])

        # Apply safety filter
        a_safe = self._apply_safety_filter(a_rl)

        # Update ego vehicle (constant acceleration over the step). The speed
        # is clipped at 0, so the displacement is floored at 0 as well: the
        # ego must not roll backwards when braking close to standstill.
        displacement = self.v_ego * self.DT + 0.5 * a_safe * self.DT ** 2
        self.x_ego += max(0.0, displacement)
        self.v_ego = float(np.clip(self.v_ego + a_safe * self.DT, 0.0, 100.0))
        self.a_ego = a_safe

        # Update lead vehicle
        if self.lead_decel_active:
            t = self.current_step * self.DT
            if self.lead_decel_start <= t < self.lead_decel_start + self.lead_decel_duration:
                # Apply deceleration
                self.v_lead = max(0.0, self.v_lead + self.lead_decel_value * self.DT)

        self.x_lead += self.v_lead * self.DT

        # Get new observation
        obs = self._get_obs()

        # Check collision (plain bool, as the Gymnasium step API expects)
        collision = bool(obs[0] <= 0.0)

        # Compute reward: penalize speed error, headway violation and effort
        speed_error = self.v_ego - self.V_REF
        d_safe = self.D0 + self.TH * self.v_ego
        safe_violation = max(0.0, d_safe - obs[0])

        reward = -(
            0.5 * speed_error ** 2 +
            2.0 * safe_violation ** 2 +
            0.01 * a_safe ** 2
        )

        # Update step counter
        self.current_step += 1

        # Check termination
        terminated = collision
        truncated = self.current_step >= self.max_steps

        info = {
            "collision": collision,
            "dx": float(obs[0]),
            "ego_v": float(obs[2]),
            "lead_v": float(self.v_lead),
            "applied_action": float(a_safe),
            "rl_action": float(a_rl),
        }

        return obs, float(reward), terminated, truncated, info

    def render(self):
        """Print a one-line summary of the current state."""
        dx = self.x_lead - self.x_ego
        print(f"Step {self.current_step:3d} | dx={dx:6.2f}m | "
              f"v_ego={self.v_ego:5.2f}m/s | v_lead={self.v_lead:5.2f}m/s | "
              f"a={self.a_ego:5.2f}m/s²")

    def activate_lead_deceleration(self, start_time=5.0, duration=3.0, decel=-2.0):
        """Schedule a lead-vehicle braking manoeuvre for the current episode.

        Args:
            start_time: episode time (s) at which braking starts.
            duration: length of the braking window (s).
            decel: acceleration applied to the lead vehicle in that window
                (m/s^2, negative to brake).
        """
        self.lead_decel_active = True
        self.lead_decel_start = start_time
        self.lead_decel_duration = duration
        self.lead_decel_value = decel

## 2. Adversarial Attacks

In [None]:
import torch
import numpy as np
import gymnasium as gym
from gymnasium import spaces


class AttackBase:
    """Single-step signed-gradient observation attack against an SB3 policy.

    Subclasses provide ``_objective``; ``perturb`` moves the observation by
    ``epsilon`` in the direction that increases that objective.

    Args:
        model: object exposing an actor-critic ``policy`` (e.g. an SB3 PPO).
        epsilon: per-component perturbation size.
        clip_low, clip_high: bounds the perturbed observation is clipped to.
            The defaults keep the original ``[-1, 1]`` range; pass ``None``
            for both to disable clipping.
    """

    def __init__(self, model, epsilon=0.01, clip_low=-1.0, clip_high=1.0):
        self.model = model
        self.epsilon = epsilon
        self.clip_low = clip_low
        self.clip_high = clip_high
        self.device = next(model.policy.parameters()).device

    def _objective(self, obs_tensor):
        """Return the scalar tensor whose gradient drives the attack."""
        raise NotImplementedError

    def _policy_features(self, obs_tensor, branch):
        """Extract policy features for the ``"actor"`` or ``"critic"`` branch."""
        features = self.model.policy.extract_features(obs_tensor)
        # SB3 returns a (pi_features, vf_features) pair when the actor and
        # critic do not share a features extractor.
        if isinstance(features, tuple):
            pi_features, vf_features = features
            return pi_features if branch == "actor" else vf_features
        return features

    def perturb(self, obs):
        """Return an adversarial copy of ``obs``.

        Args:
            obs: a single observation or a 2-D batch of observations.

        Returns:
            float32 array with the same layout as the input (single
            observations come back 1-D).
        """
        # Handle both single obs and batched obs
        obs_np = np.array(obs, dtype=np.float32)
        is_batched = len(obs_np.shape) == 2

        if not is_batched:
            obs_np = obs_np.reshape(1, -1)

        obs_tensor = torch.tensor(
            obs_np, dtype=torch.float32, device=self.device, requires_grad=True
        )

        with torch.enable_grad():
            objective = self._objective(obs_tensor)
            # autograd.grad differentiates w.r.t. the observation only, so no
            # gradients are accumulated into the policy's parameters.
            (grad_tensor,) = torch.autograd.grad(objective, obs_tensor)

        grad = grad_tensor.cpu().numpy()
        perturbation = self.epsilon * np.sign(grad)

        obs_adv = obs_np + perturbation
        if self.clip_low is not None or self.clip_high is not None:
            obs_adv = np.clip(obs_adv, self.clip_low, self.clip_high)

        # Return in original shape
        if not is_batched:
            obs_adv = obs_adv.squeeze(0)

        return obs_adv


class FGSM(AttackBase):
    """Signed-gradient attack on the policy's mean action."""

    def _objective(self, obs_tensor):
        """Sum of the actor's mean actions for ``obs_tensor``."""
        policy = self.model.policy
        features = self._policy_features(obs_tensor, "actor")
        latent_pi = policy.mlp_extractor.forward_actor(features)
        return policy.action_net(latent_pi).sum()


class OIA(AttackBase):
    """Signed-gradient attack on the critic's value estimate."""

    def _objective(self, obs_tensor):
        """Sum of the critic's value estimates for ``obs_tensor``."""
        policy = self.model.policy
        features = self._policy_features(obs_tensor, "critic")
        latent_vf = policy.mlp_extractor.forward_critic(features)
        return policy.value_net(latent_vf).sum()


def create_attack(attack_type, model, epsilon=0.01):
    """Build the attack named by ``attack_type`` (case-insensitive).

    Args:
        attack_type: ``'fgsm'`` or ``'oia'``.
        model: policy model handed to the attack constructor.
        epsilon: perturbation size handed to the attack constructor.

    Raises:
        ValueError: if ``attack_type`` is not a known attack name.
    """
    kind = attack_type.lower()
    if kind not in ('fgsm', 'oia'):
        raise ValueError(f"Unknown attack type: {attack_type}")

    attack_cls = FGSM if kind == 'fgsm' else OIA
    return attack_cls(model, epsilon)


class AttackWrapper(gym.Wrapper):
    """Environment wrapper that feeds adversarially perturbed observations.

    Every observation returned by ``reset`` and ``step`` is replaced by
    ``attack.perturb(obs)``. The per-step RMSE between the clean and the
    perturbed observation is recorded for the current episode.

    Args:
        env: environment to wrap.
        attack: object with a ``perturb(obs)`` method.
        apply_to_safety_filter: when True and the wrapped env offers
            ``set_safety_obs_for_filter``, the perturbed observation is also
            handed to the env's safety filter.
    """

    def __init__(self, env, attack, apply_to_safety_filter=True):
        super().__init__(env)
        self.attack = attack
        self.apply_to_safety_filter = apply_to_safety_filter

        # Track original and perturbed observations for RMSE computation
        self.original_obs = None
        self.perturbed_obs = None
        self.step_rmse_values = []
        self.episode_rmse = 0.0

    def _attack_observation(self, obs):
        """Perturb ``obs``, update the RMSE bookkeeping and return the step RMSE."""
        self.original_obs = obs.copy()
        self.perturbed_obs = self.attack.perturb(obs)

        step_rmse = float(np.sqrt(np.mean((self.perturbed_obs - self.original_obs) ** 2)))
        self.step_rmse_values.append(step_rmse)
        # Keep the running episode mean current on every step, not only at reset.
        self.episode_rmse = float(np.mean(self.step_rmse_values))

        # Apply to safety filter if enabled
        if self.apply_to_safety_filter and hasattr(self.env, 'set_safety_obs_for_filter'):
            self.env.set_safety_obs_for_filter(self.perturbed_obs)

        return step_rmse

    def reset(self, **kwargs):
        """Reset the wrapped env and return ``(perturbed_obs, info)``."""
        obs, info = self.env.reset(**kwargs)

        # A new episode starts a fresh RMSE history.
        self.step_rmse_values = []
        self._attack_observation(obs)

        return self.perturbed_obs, info

    def step(self, action):
        """Step the wrapped env and return the perturbed observation.

        ``info`` is extended with ``'step_rmse'`` and ``'episode_rmse_mean'``.
        """
        obs, reward, terminated, truncated, info = self.env.step(action)
        step_rmse = self._attack_observation(obs)

        # Add RMSE to info
        info['step_rmse'] = step_rmse
        info['episode_rmse_mean'] = self.episode_rmse

        return self.perturbed_obs, reward, terminated, truncated, info

    def get_episode_rmse(self):
        """Return the mean step RMSE of the current episode (0.0 if empty)."""
        return np.mean(self.step_rmse_values) if self.step_rmse_values else 0.0


class FGSMAttackWrapper(AttackWrapper):
    """``AttackWrapper`` preconfigured with an ``FGSM`` attack.

    The perturbed observation is always forwarded to the safety filter.
    """

    def __init__(self, env, model, epsilon=0.01):
        super().__init__(
            env,
            FGSM(model, epsilon),
            apply_to_safety_filter=True,
        )


class OIAAttackWrapper(AttackWrapper):
    """``AttackWrapper`` preconfigured with an ``OIA`` attack.

    The perturbed observation is always forwarded to the safety filter.
    """

    def __init__(self, env, model, epsilon=0.01):
        super().__init__(
            env,
            OIA(model, epsilon),
            apply_to_safety_filter=True,
        )

## 3. Training

In [None]:
import os
import numpy as np
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, VecNormalize
from stable_baselines3.common.callbacks import CheckpointCallback
# from acc_env import ACCEnv  # Already defined above


def make_env():
    """Factory returning one fresh ``ACCEnv`` (used to build vectorized envs)."""
    env = ACCEnv()
    return env


def train_ppo(
    total_timesteps=200000,
    n_envs=8,
    save_dir="models",
    save_freq=50000
):
    """
    Train PPO agent with vectorized environments and observation normalization.

    Args:
        total_timesteps: total number of environment steps to train for.
        n_envs: number of parallel ``ACCEnv`` copies.
        save_dir: directory for checkpoints, the final model and the
            ``VecNormalize`` statistics (created if missing).
        save_freq: checkpoint interval, in total environment steps.

    Returns:
        ``(model, vec_env)``: the trained PPO model and the ``VecNormalize``
        wrapper holding the observation statistics.
    """
    os.makedirs(save_dir, exist_ok=True)

    print("Creating vectorized environments...")
    # Create vectorized environment
    vec_env = DummyVecEnv([make_env for _ in range(n_envs)])
    vec_env = VecNormalize(
        vec_env,
        norm_obs=True,
        norm_reward=False,
        clip_obs=10.0,
        gamma=0.99
    )

    print("Creating PPO model...")
    # PPO hyperparameters (from assignment suggestion)
    model = PPO(
        "MlpPolicy",
        vec_env,
        learning_rate=3e-4,
        n_steps=128,
        batch_size=128,
        n_epochs=10,
        gamma=0.99,
        gae_lambda=0.95,
        clip_range=0.2,
        ent_coef=0.0,
        vf_coef=0.5,
        max_grad_norm=0.5,
        verbose=1,
        tensorboard_log="./logs"
    )

    # Setup checkpoint callback. The callback counts vectorized steps, so the
    # interval is divided by n_envs; it is floored at 1 because an interval
    # of 0 (save_freq < n_envs) would make the callback divide by zero.
    checkpoint_callback = CheckpointCallback(
        save_freq=max(save_freq // n_envs, 1),
        save_path=save_dir,
        name_prefix="ppo_acc"
    )

    print(f"Training for {total_timesteps} timesteps...")
    model.learn(
        total_timesteps=total_timesteps,
        callback=checkpoint_callback,
        progress_bar=True
    )

    # Save final model and normalization stats
    model.save(os.path.join(save_dir, "ppo_acc_final"))
    vec_env.save(os.path.join(save_dir, "vec_normalize.pkl"))

    print(f"\nTraining complete. Model saved to {save_dir}")
    print(f"Observation mean: {vec_env.obs_rms.mean}")
    print(f"Observation var: {vec_env.obs_rms.var}")

    return model, vec_env

### Run Training



In [None]:
# Train with the default settings. This writes periodic checkpoints, the
# final model and the VecNormalize statistics under "models/", and keeps the
# trained model and normalization wrapper in `model` / `vec_env`.
model, vec_env = train_ppo(
    total_timesteps=200000,
    n_envs=8,
    save_dir="models",
    save_freq=50000
)
print("\nTraining finished successfully!")

## 4. Evaluation

In [None]:
import os
import json
import numpy as np
import argparse
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, VecNormalize
# from acc_env import ACCEnv  # Already defined above
# from attacks import create_attack  # Already defined above


def evaluate_single_condition(vec_env, base_env, model, attack, n_episodes,
                              scenario_config, verbose=True):
    """
    Evaluate agent for one condition (baseline or attack).

    Args:
        vec_env: single-env vectorized environment the policy was trained on.
        base_env: the underlying environment inside ``vec_env``; must offer
            ``DT``, ``activate_lead_deceleration`` and
            ``set_safety_obs_for_filter``.
        model: policy with a ``predict(obs, deterministic=True)`` method.
        attack: object with ``perturb(obs)``, or ``None`` for the baseline.
        n_episodes: number of episodes to run.
        scenario_config: dict; ``'use_lead_decel'`` enables lead braking,
            tuned by ``'decel_start'``, ``'decel_duration'``, ``'decel_value'``.
        verbose: print running statistics every 20 episodes.

    Returns:
        dict with collision rate, return / RMSE / minimum-gap statistics,
        ``'mean_jerk'`` (mean absolute step-to-step change of the applied
        acceleration, computed on the stored trajectories only) and
        ``'episodes'`` (trajectories of the first three episodes).
    """
    collisions = []
    returns = []
    min_dx_values = []
    episodes_data = []
    rmse_values = []  # mean step RMSE of each episode

    # The policy sees vec_env's (possibly normalized) observations, while the
    # safety filter of base_env works in physical units. When vec_env can
    # undo its normalization, do so before handing an observation to the filter.
    unnormalize = getattr(vec_env, "unnormalize_obs", None)
    dt = base_env.DT
    use_lead_decel = scenario_config.get('use_lead_decel', False)

    for ep in range(n_episodes):
        obs = vec_env.reset()

        # Configure scenario (reset clears it, so re-arm every episode)
        if use_lead_decel:
            base_env.activate_lead_deceleration(
                start_time=scenario_config.get('decel_start', 5.0),
                duration=scenario_config.get('decel_duration', 3.0),
                decel=scenario_config.get('decel_value', -2.0)
            )

        episode_return = 0.0
        episode_collision = False
        min_dx = float('inf')
        episode_rmse_steps = []  # RMSE for each step in episode

        # Store trajectory for first 3 episodes
        if ep < 3:
            trajectory = {
                't': [], 'dx': [], 'dv': [], 'v': [],
                'rl_action': [], 'applied_action': [], 'lead_v': [],
                'step_rmse': []
            }
        else:
            trajectory = None

        done = False
        step = 0

        while not done:
            obs_for_policy = obs
            step_rmse = 0.0  # No perturbation for baseline

            # Apply attack if present
            if attack is not None:
                obs_adv = attack.perturb(obs[0])
                if unnormalize is None:
                    filter_obs = obs_adv
                else:
                    filter_obs = unnormalize(obs_adv)
                base_env.set_safety_obs_for_filter(filter_obs)
                obs_for_policy = obs_adv.reshape(1, -1)

                # Step RMSE between original and perturbed observations,
                # measured in the space the policy sees
                step_rmse = np.sqrt(np.mean((obs_adv - obs[0]) ** 2))
                episode_rmse_steps.append(step_rmse)

            # Get action from policy
            action, _ = model.predict(obs_for_policy, deterministic=True)

            # Step environment (VecEnv API: the third value is the done flag)
            obs, reward, dones, info = vec_env.step(action)
            step_info = info[0]

            episode_return += reward[0]
            min_dx = min(min_dx, step_info["dx"])

            if step_info.get("collision", False):
                episode_collision = True

            # Record trajectory
            if trajectory is not None:
                trajectory['t'].append(step * dt)
                trajectory['dx'].append(step_info["dx"])
                trajectory['dv'].append(step_info["lead_v"] - step_info["ego_v"])
                trajectory['v'].append(step_info["ego_v"])
                trajectory['rl_action'].append(step_info["rl_action"])
                trajectory['applied_action'].append(step_info["applied_action"])
                trajectory['lead_v'].append(step_info["lead_v"])
                trajectory['step_rmse'].append(step_rmse)

            done = dones[0] or step_info.get("TimeLimit.truncated", False)
            step += 1

        collisions.append(episode_collision)
        returns.append(episode_return)
        min_dx_values.append(min_dx)

        # Compute episode mean RMSE
        if episode_rmse_steps:
            episode_mean_rmse = np.mean(episode_rmse_steps)
        else:
            episode_mean_rmse = 0.0
        rmse_values.append(episode_mean_rmse)

        if trajectory is not None:
            episodes_data.append(trajectory)

        if verbose and (ep + 1) % 20 == 0:
            curr_coll = sum(collisions) / len(collisions)
            curr_ret = np.mean(returns)
            curr_rmse = np.mean(rmse_values)
            print(f"  Episode {ep+1}/{n_episodes}: "
                  f"Collision={curr_coll:.3f}, Return={curr_ret:.2f}, RMSE={curr_rmse:.4f}")

    # Compute jerk
    jerks = []
    for traj in episodes_data:
        actions = np.array(traj["applied_action"])
        if len(actions) > 1:
            jerks.append(np.mean(np.abs(np.diff(actions))))

    def _mean(values):
        # Empty input (no episodes) yields 0.0 instead of NaN plus a warning.
        return float(np.mean(values)) if len(values) else 0.0

    def _std(values):
        return float(np.std(values)) if len(values) else 0.0

    return {
        "collision_rate": float(sum(collisions) / len(collisions)) if collisions else 0.0,
        "mean_return": _mean(returns),
        "std_return": _std(returns),
        "mean_jerk": _mean(jerks),
        "mean_rmse": _mean(rmse_values),
        "std_rmse": _std(rmse_values),
        "min_dx_mean": _mean(min_dx_values),
        "min_dx_std": _std(min_dx_values),
        "episodes": episodes_data
    }


def _print_condition_summary(heading, condition_results, rmse_note):
    """Print the four headline metrics of one evaluated condition."""
    print(f"\n{heading}:")
    print(f"  Collision Rate: {condition_results['collision_rate']:.3f}")
    print(f"  Mean Return: {condition_results['mean_return']:.2f} ± {condition_results['std_return']:.2f}")
    print(f"  Mean Jerk: {condition_results['mean_jerk']:.4f}")
    print(f"  Mean RMSE: {condition_results['mean_rmse']:.4f} ({rmse_note})")


def _condition_summary(condition_results, epsilon=None):
    """Return the JSON-serializable metrics of one condition (no trajectories)."""
    metric_names = ('collision_rate', 'mean_return', 'std_return',
                    'mean_jerk', 'mean_rmse', 'std_rmse')
    entry = {name: condition_results[name] for name in metric_names}
    if epsilon is not None:
        entry['epsilon'] = float(epsilon)
    return entry


def run_evaluation(
    model_path="models/ppo_acc_final.zip",
    vec_normalize_path="models/vec_normalize.pkl",
    n_episodes=100,
    epsilon=0.01,
    scenario="normal",
    output_dir="results"
):
    """
    Run complete evaluation with baseline, FGSM, and OIA.

    Args:
        model_path: path of the saved PPO model.
        vec_normalize_path: path of the saved ``VecNormalize`` statistics.
        n_episodes: episodes per condition.
        epsilon: perturbation size used by both attacks.
        scenario: ``'normal'``, ``'challenging'`` or ``'gentle'``; any other
            value falls back to ``'normal'`` with a printed warning.
        output_dir: directory receiving ``summary.json`` and the sample
            trajectories (created if missing).

    Returns:
        The summary dict that is also written to ``summary.json``.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Configure scenario
    scenario_configs = {
        'normal': {
            'use_lead_decel': False,
            'description': 'Normal driving (no lead braking)'
        },
        'challenging': {
            'use_lead_decel': True,
            'decel_value': -2.0,
            'description': 'Challenging (lead brakes at -2.0 m/s²)'
        },
        'gentle': {
            'use_lead_decel': True,
            'decel_value': -1.5,
            'description': 'Gentle challenge (lead brakes at -1.5 m/s²)'
        }
    }

    if scenario not in scenario_configs:
        # Keep the historical fallback, but do not apply it silently.
        print(f"WARNING: unknown scenario '{scenario}'; falling back to 'normal'")
    scenario_config = scenario_configs.get(scenario, scenario_configs['normal'])

    print("=" * 70)
    print(f"EVALUATION: {scenario_config['description']}")
    print("=" * 70)
    print(f"Episodes per condition: {n_episodes}")
    print(f"Attack epsilon: {epsilon}")
    print()

    # Load model and environment
    print("Loading model and environment...")
    vec_env = DummyVecEnv([lambda: ACCEnv()])
    vec_env = VecNormalize.load(vec_normalize_path, vec_env)
    vec_env.training = False
    vec_env.norm_reward = False

    # (result key, banner text, attack type or None for the baseline)
    conditions = (
        ('baseline', "BASELINE", None),
        ('fgsm', f"FGSM ATTACK (ε={epsilon})", "fgsm"),
        ('oia', f"OIA ATTACK (ε={epsilon})", "oia"),
    )

    # Run evaluations
    results = {}
    try:
        model = PPO.load(model_path, env=vec_env)
        base_env = vec_env.envs[0]

        for key, banner, attack_type in conditions:
            print("\n" + "=" * 70)
            print(banner)
            print("=" * 70)
            attack = None
            if attack_type is not None:
                attack = create_attack(attack_type, model, epsilon)
            results[key] = evaluate_single_condition(
                vec_env, base_env, model, attack, n_episodes, scenario_config
            )
    finally:
        # Release the environment even if loading or evaluation fails.
        vec_env.close()

    baseline_results = results['baseline']
    fgsm_results = results['fgsm']
    oia_results = results['oia']

    # Print summary
    print("\n" + "=" * 70)
    print("SUMMARY")
    print("=" * 70)
    _print_condition_summary("Baseline", baseline_results, "no attack")
    _print_condition_summary(f"FGSM (ε={epsilon})", fgsm_results, "stealth")
    _print_condition_summary(f"OIA (ε={epsilon})", oia_results, "stealth")

    # Compute degradation
    fgsm_deg = baseline_results['mean_return'] - fgsm_results['mean_return']
    oia_deg = baseline_results['mean_return'] - oia_results['mean_return']

    print(f"\nPerformance Degradation:")
    print(f"  FGSM: {fgsm_deg:.2f}")
    print(f"  OIA: {oia_deg:.2f}")
    if oia_deg > 0 and fgsm_deg > 0:
        ratio = oia_deg / fgsm_deg
        print(f"  OIA is {ratio:.2f}x more damaging than FGSM")

    if oia_results['collision_rate'] > fgsm_results['collision_rate']:
        coll_ratio = oia_results['collision_rate'] / max(fgsm_results['collision_rate'], 0.01)
        print(f"  OIA causes {coll_ratio:.2f}x more collisions than FGSM")

    # Stealth comparison
    fgsm_rmse = fgsm_results['mean_rmse']
    oia_rmse = oia_results['mean_rmse']
    print(f"\nStealth (RMSE - Lower is Stealthier):")
    print(f"  FGSM: {fgsm_rmse:.4f}")
    print(f"  OIA: {oia_rmse:.4f}")
    if fgsm_rmse > oia_rmse:
        stealth_ratio = fgsm_rmse / max(oia_rmse, 1e-6)
        print(f"  OIA is {stealth_ratio:.2f}x more stealthy than FGSM")
    elif oia_rmse > fgsm_rmse:
        stealth_ratio = oia_rmse / max(fgsm_rmse, 1e-6)
        print(f"  FGSM is {stealth_ratio:.2f}x more stealthy than OIA")
    else:
        print(f"  Similar stealth levels")

    # Save results
    summary = {
        'scenario': scenario,
        'epsilon': float(epsilon),
        'n_episodes': n_episodes,
        'baseline': _condition_summary(baseline_results),
        'fgsm': _condition_summary(fgsm_results, epsilon),
        'oia': _condition_summary(oia_results, epsilon),
    }

    summary_path = os.path.join(output_dir, 'summary.json')
    with open(summary_path, 'w') as f:
        json.dump(summary, f, indent=2)

    print(f"\nResults saved to {summary_path}")

    # Save sample trajectories
    for condition_name, condition_data in results.items():
        for i, ep_data in enumerate(condition_data['episodes']):
            traj_path = os.path.join(output_dir, f'trajectory_{condition_name}_ep{i}.npz')
            np.savez(traj_path, **ep_data)

    print(f"Sample trajectories saved to {output_dir}")
    print("=" * 70)

    return summary


def run_multi_epsilon_evaluation(
    model_path="models/ppo_acc_final.zip",
    vec_normalize_path="models/vec_normalize.pkl",
    n_episodes=100,
    epsilons=None,
    scenario="challenging",
    output_dir="results_multi_epsilon"
):
    """
    Run evaluation across multiple epsilon values.

    Args:
        model_path: path of the saved PPO model.
        vec_normalize_path: path of the saved ``VecNormalize`` statistics.
        n_episodes: episodes per condition and epsilon.
        epsilons: perturbation sizes to test; ``None`` selects
            ``[0.005, 0.01, 0.015, 0.02]``.
        scenario: scenario name forwarded to ``run_evaluation``.
        output_dir: directory for the per-epsilon results and the combined
            ``multi_epsilon_summary.json`` (created if missing).

    Returns:
        dict mapping ``"eps_<value>"`` to the summary of that run.
    """
    # None sentinel instead of a list default shared between calls.
    if epsilons is None:
        epsilons = [0.005, 0.01, 0.015, 0.02]

    os.makedirs(output_dir, exist_ok=True)

    all_results = {}

    for eps in epsilons:
        print(f"\n{'=' * 70}")
        print(f"TESTING EPSILON = {eps}")
        print('=' * 70)

        result = run_evaluation(
            model_path=model_path,
            vec_normalize_path=vec_normalize_path,
            n_episodes=n_episodes,
            epsilon=eps,
            scenario=scenario,
            output_dir=os.path.join(output_dir, f"eps_{eps}")
        )

        all_results[f"eps_{eps}"] = result

    # Save combined results
    combined_path = os.path.join(output_dir, 'multi_epsilon_summary.json')
    with open(combined_path, 'w') as f:
        json.dump(all_results, f, indent=2)

    print(f"\n{'=' * 70}")
    print("MULTI-EPSILON SUMMARY")
    print('=' * 70)

    for eps_key, result in all_results.items():
        eps_val = float(eps_key.split('_')[1])
        print(f"\nε = {eps_val}:")
        print(f"  Baseline:  Coll={result['baseline']['collision_rate']:.1%}, "
              f"Return={result['baseline']['mean_return']:.1f}")
        print(f"  FGSM:      Coll={result['fgsm']['collision_rate']:.1%}, "
              f"Return={result['fgsm']['mean_return']:.1f}")
        print(f"  OIA:       Coll={result['oia']['collision_rate']:.1%}, "
              f"Return={result['oia']['mean_return']:.1f}")

    print(f"\nCombined results saved to {combined_path}")
    return all_results

### Run Evaluation



In [None]:
# Single evaluation run: loads the trained model and normalization stats from
# "models/", evaluates baseline, FGSM and OIA at one epsilon, and writes
# summary.json plus sample trajectories to "results/".
summary = run_evaluation(
    model_path="models/ppo_acc_final.zip",
    vec_normalize_path="models/vec_normalize.pkl",
    n_episodes=100,
    epsilon=0.01,
    scenario="normal",  # Options: 'normal', 'challenging', 'gentle'
    output_dir="results"
)

In [None]:
# Multi-epsilon evaluation: repeats the full baseline/FGSM/OIA evaluation for
# each epsilon in the list and writes per-epsilon results plus a combined
# summary under "results_multi_epsilon/".
all_results = run_multi_epsilon_evaluation(
    model_path="models/ppo_acc_final.zip",
    vec_normalize_path="models/vec_normalize.pkl",
    n_episodes=100,
    epsilons=[0.005, 0.01, 0.015, 0.02],
    scenario="challenging",
    output_dir="results_multi_epsilon"
)

## 5. Analysis

In [None]:
import json
import numpy as np


def _relative_change_text(delta, reference):
    """Return *delta* as a percentage of ``abs(reference)``, or ``"n/a"`` when *reference* is 0."""
    if reference == 0:
        return "n/a"
    return f"{(delta / abs(reference)) * 100:.1f}%"


def analyze_results(summary_path="results/summary.json"):
    """
    Analyze and interpret evaluation results.

    Loads the JSON summary at ``summary_path`` and prints a six-part report
    comparing the ``baseline``, ``fgsm`` and ``oia`` conditions: collision
    rate, episode return, jerk, stealth (RMSE), key findings, and a fixed
    interpretation text.

    Args:
        summary_path: Path to a JSON file with top-level keys ``baseline``,
            ``fgsm`` and ``oia``. Each entry must provide ``collision_rate``,
            ``mean_return``, ``std_return`` and ``mean_jerk``; ``mean_rmse``
            is optional and treated as 0.0 when absent.

    Returns:
        None. The report is written to standard output.

    Raises:
        OSError: If ``summary_path`` cannot be opened.
        KeyError: If a required condition or metric is missing.
    """
    with open(summary_path, 'r') as f:
        summary = json.load(f)

    print("=" * 70)
    print("EVALUATION RESULTS ANALYSIS")
    print("=" * 70)

    baseline = summary["baseline"]
    fgsm = summary["fgsm"]
    oia = summary["oia"]

    base_coll = baseline['collision_rate']
    fgsm_coll = fgsm['collision_rate']
    oia_coll = oia['collision_rate']
    base_return = baseline['mean_return']

    print("\n1. COLLISION RATE ANALYSIS")
    print("-" * 70)
    print(f"   Baseline:  {base_coll:.1%}")
    # The '+' format flag emits the correct sign; a literal "+" prefix would
    # render negative deltas as "+-x.x pp".
    print(f"   FGSM:      {fgsm_coll:.1%}  "
          f"({(fgsm_coll - base_coll)*100:+.1f} pp)")
    print(f"   OIA:       {oia_coll:.1%}  "
          f"({(oia_coll - base_coll)*100:+.1f} pp)")

    if oia_coll > fgsm_coll:
        if fgsm_coll > 0:
            print(f"OIA causes {oia_coll / fgsm_coll:.1f}x more collisions than FGSM")
        else:
            # A ratio against zero collisions is meaningless; say so instead.
            print("OIA causes collisions while FGSM causes none")
    elif oia_coll == fgsm_coll == base_coll == 0:
        print(f"No collisions in any condition - safety filter too strong or scenarios too easy")

    print("\n2. EPISODE RETURN ANALYSIS (Higher is Better)")
    print("-" * 70)
    print(f"   Baseline:  {base_return:7.2f} ± {baseline['std_return']:.2f}")
    print(f"   FGSM:      {fgsm['mean_return']:7.2f} ± {fgsm['std_return']:.2f}  "
          f"({fgsm['mean_return'] - base_return:+.2f})")
    print(f"   OIA:       {oia['mean_return']:7.2f} ± {oia['std_return']:.2f}  "
          f"({oia['mean_return'] - base_return:+.2f})")

    # Positive degradation means the attack lowered the mean return.
    fgsm_degradation = base_return - fgsm['mean_return']
    oia_degradation = base_return - oia['mean_return']
    oia_more_effective = oia_degradation > fgsm_degradation

    print(f"\n   Performance Degradation:")
    # Percentages are relative to |baseline return|; "n/a" when that is zero.
    print(f"   - FGSM: {fgsm_degradation:.2f} ({_relative_change_text(fgsm_degradation, base_return)})")
    print(f"   - OIA:  {oia_degradation:.2f} ({_relative_change_text(oia_degradation, base_return)})")

    if oia_more_effective:
        if fgsm_degradation > 0:
            ratio = oia_degradation / fgsm_degradation
            print(f"OIA degrades performance {ratio:.1f}x more than FGSM")
        else:
            # FGSM did not lower the return, so a multiplier cannot be stated.
            print("OIA yields a lower return than FGSM (FGSM did not degrade performance)")
    else:
        print(f"OIA should degrade performance more than FGSM")

    print("\n3. JERK ANALYSIS (Lower is Smoother)")
    print("-" * 70)
    print(f"   Baseline:  {baseline['mean_jerk']:.4f} m/s³")
    print(f"   FGSM:      {fgsm['mean_jerk']:.4f} m/s³  "
          f"({fgsm['mean_jerk'] - baseline['mean_jerk']:+.4f})")
    print(f"   OIA:       {oia['mean_jerk']:.4f} m/s³  "
          f"({oia['mean_jerk'] - baseline['mean_jerk']:+.4f})")

    if fgsm['mean_jerk'] > baseline['mean_jerk'] or oia['mean_jerk'] > baseline['mean_jerk']:
        print(f"Attacks cause less smooth control (higher jerk)")

    print("\n4. STEALTH ANALYSIS (RMSE - Lower is Stealthier)")
    print("-" * 70)
    baseline_rmse = baseline.get('mean_rmse', 0.0)
    fgsm_rmse = fgsm.get('mean_rmse', 0.0)
    oia_rmse = oia.get('mean_rmse', 0.0)
    # Stealth is only compared when both attacks report a positive RMSE.
    rmse_comparable = fgsm_rmse > 0 and oia_rmse > 0

    print(f"   Baseline:  {baseline_rmse:.4f} (no perturbation)")
    print(f"   FGSM:      {fgsm_rmse:.4f}")
    print(f"   OIA:       {oia_rmse:.4f}")

    if rmse_comparable:
        if oia_rmse < fgsm_rmse:
            stealth_ratio = fgsm_rmse / oia_rmse
            print(f"OIA is {stealth_ratio:.2f}x more stealthy than FGSM")
            print(f"     (Lower RMSE means smaller perturbations)")
        elif fgsm_rmse < oia_rmse:
            stealth_ratio = oia_rmse / fgsm_rmse
            print(f"FGSM is {stealth_ratio:.2f}x more stealthy than OIA")
            print(f"     (Lower RMSE means smaller perturbations)")
        else:
            print(f"   ≈ Similar stealth levels")

    print("\n5. KEY FINDINGS")
    print("-" * 70)

    findings = []

    # Check if OIA is more effective. "Significantly" means OIA exceeds FGSM's
    # degradation by more than 20% of FGSM's magnitude; using abs() keeps the
    # margin meaningful when FGSM's degradation is negative.
    if oia_degradation - fgsm_degradation > 0.2 * abs(fgsm_degradation):
        findings.append("OIA is significantly more effective than FGSM at degrading performance")
    elif oia_more_effective:
        findings.append("OIA is more effective than FGSM (as expected)")
    else:
        findings.append("OIA should be more effective than FGSM - consider increasing epsilon or using lead deceleration")

    # Check collision rates
    if base_coll == 0 and oia_coll == 0:
        findings.append("No collisions observed - safety filter is working but scenarios may be too easy")
        findings.append("  Recommendation: Enable lead vehicle deceleration with use_lead_decel=True")
    elif oia_coll > 0:
        findings.append(f"OIA successfully causes collisions ({oia_coll:.1%} rate)")

    # Check stealth; the follow-up line only claims effectiveness when the
    # return data actually supports it.
    if rmse_comparable:
        if oia_rmse < fgsm_rmse:
            findings.append(f"OIA is more stealthy than FGSM (RMSE: {oia_rmse:.4f} vs {fgsm_rmse:.4f})")
            if oia_more_effective:
                findings.append("  → OIA is BOTH more effective AND more stealthy!")
            else:
                findings.append("  → OIA is more stealthy but not more effective")
        elif fgsm_rmse < oia_rmse:
            findings.append(f"FGSM is more stealthy than OIA (RMSE: {fgsm_rmse:.4f} vs {oia_rmse:.4f})")
            if oia_more_effective:
                findings.append("  → OIA is more effective but less stealthy")
            else:
                findings.append("  → OIA is neither more effective nor more stealthy")
        else:
            findings.append(f"≈ Similar stealth levels (RMSE: {oia_rmse:.4f})")

    # Check return degradation magnitude (only a real decrease counts).
    if oia_degradation > 10:
        findings.append(f"OIA causes substantial performance degradation ({oia_degradation:.1f} return decrease)")

    # Check jerk increase
    if oia['mean_jerk'] > baseline['mean_jerk'] * 2:
        findings.append("OIA causes significantly less smooth control behavior")

    for finding in findings:
        print(f"   {finding}")

    print("\n6. INTERPRETATION")
    print("-" * 70)
    print("""
   The results show that both FGSM and OIA adversarial attacks successfully
   degrade the PPO agent's performance compared to baseline:
   
   - FGSM perturbs observations to change the policy's immediate action output
   - OIA perturbs observations to inflate the value estimate, making the agent
     overly optimistic about safety
   
   Key difference: OIA's optimism causes DELAYED reactions to threats because
   the agent believes it's safer than it actually is. This makes OIA more
   dangerous in safety-critical scenarios.
   
   The safety filter (CBF) prevents many collisions, but attacks can still
   manipulate it by feeding adversarial observations. When the filter receives
   false safety information, it may allow unsafe actions.
   """)

    if base_coll == 0:
        print("""
   NOTE: Zero collisions suggest the safety filter is very effective in the
   default scenarios. To see more dramatic effects:
   
   1. Enable lead vehicle deceleration: use_lead_decel=True
   2. Increase attack strength: epsilon=0.02 or 0.03
   3. Use more challenging initial conditions
   """)

    print("=" * 70)

### Run Analysis (Optional)

Run this cell after the evaluation has written `results/summary.json`:

In [None]:
import os
results_path = "results/summary.json"
# Analyze only when the evaluation summary exists; otherwise report what is missing.
if os.path.exists(results_path):
    analyze_results(results_path)
else:
    print(f"Error: {results_path} not found. Run evaluate.py first.")

## 6. Visualization

In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt


def plot_single_trajectory(traj_path, save_path=None):
    """
    Plot one recorded trajectory as three stacked panels sharing the time axis.

    The panels show distance headway (with a dashed line at 0 labelled as the
    collision threshold), ego and lead velocity, and the RL action next to
    the applied action.

    Args:
        traj_path: Path to an ``.npz`` archive containing the arrays ``t``,
            ``dx``, ``v``, ``lead_v``, ``rl_action`` and ``applied_action``.
        save_path: If truthy, the figure is saved there at 150 dpi; otherwise
            it is shown with ``plt.show()``.

    Raises:
        OSError: If ``traj_path`` cannot be opened.
        KeyError: If one of the required arrays is missing from the archive.
    """
    # np.load returns a lazily-read NpzFile that keeps the archive open; read
    # the needed arrays up front and close it so no file handle is leaked.
    # Loading before creating the figure also avoids orphaning a figure when
    # the file is unreadable.
    with np.load(traj_path) as archive:
        data = {
            key: archive[key]
            for key in ("t", "dx", "v", "lead_v", "rl_action", "applied_action")
        }

    fig, axes = plt.subplots(3, 1, figsize=(10, 8), sharex=True)

    # Plot 1: Distance headway
    axes[0].plot(data["t"], data["dx"], 'b-', linewidth=1.5, label="Distance headway")
    axes[0].axhline(y=0, color='r', linestyle='--', linewidth=1, label="Collision threshold")
    axes[0].set_ylabel("dx (m)", fontsize=11)
    axes[0].grid(True, alpha=0.3)
    axes[0].legend(fontsize=9)
    axes[0].set_title("Trajectory Analysis", fontsize=12, fontweight='bold')

    # Plot 2: Velocities
    axes[1].plot(data["t"], data["v"], 'b-', linewidth=1.5, label="Ego velocity")
    axes[1].plot(data["t"], data["lead_v"], 'g--', linewidth=1.5, label="Lead velocity")
    axes[1].set_ylabel("Velocity (m/s)", fontsize=11)
    axes[1].grid(True, alpha=0.3)
    axes[1].legend(fontsize=9)

    # Plot 3: Actions
    axes[2].plot(data["t"], data["rl_action"], 'orange', linewidth=1, alpha=0.7, label="RL action")
    axes[2].plot(data["t"], data["applied_action"], 'b-', linewidth=1.5, label="Applied action (after safety filter)")
    axes[2].axhline(y=0, color='gray', linestyle='-', linewidth=0.5, alpha=0.5)
    axes[2].set_ylabel("Acceleration (m/s²)", fontsize=11)
    axes[2].set_xlabel("Time (s)", fontsize=11)
    axes[2].grid(True, alpha=0.3)
    axes[2].legend(fontsize=9)

    plt.tight_layout()

    if save_path:
        plt.savefig(save_path, dpi=150, bbox_inches='tight')
        print(f"Saved figure to {save_path}")
    else:
        plt.show()

    # Close this specific figure rather than whichever one happens to be current.
    plt.close(fig)


def plot_comparison(results_dir="results", output_path="results/comparison.png"):
    """
    Overlay the first recorded episode of each condition in one figure.

    For each of ``baseline``, ``fgsm`` and ``oia`` the file
    ``trajectory_<condition>_ep0.npz`` is looked up in ``results_dir``;
    missing files are reported with a warning and skipped. Three stacked
    panels show distance headway, ego velocity and applied acceleration.

    Args:
        results_dir: Directory containing the trajectory archives.
        output_path: Where the figure is saved (150 dpi).
    """
    # Load first episode from each condition
    conditions = ["baseline", "fgsm", "oia"]
    colors = {"baseline": "blue", "fgsm": "orange", "oia": "red"}

    fig, axes = plt.subplots(3, 1, figsize=(12, 10), sharex=True)
    plotted_any = False

    for condition in conditions:
        traj_path = os.path.join(results_dir, f"trajectory_{condition}_ep0.npz")

        if not os.path.exists(traj_path):
            print(f"Warning: {traj_path} not found")
            continue

        color = colors[condition]
        label = condition.upper()

        # The NpzFile keeps the archive open until closed; the context manager
        # releases the file handle as soon as this condition is plotted.
        with np.load(traj_path) as data:
            t = data["t"]

            # Plot distance headway
            axes[0].plot(t, data["dx"], color=color, linewidth=1.5,
                         label=label, alpha=0.8)

            # Plot ego velocity
            axes[1].plot(t, data["v"], color=color, linewidth=1.5,
                         label=label, alpha=0.8)

            # Plot applied action
            axes[2].plot(t, data["applied_action"], color=color,
                         linewidth=1.5, label=label, alpha=0.8)

        plotted_any = True

    # Format plots
    axes[0].axhline(y=0, color='black', linestyle='--', linewidth=1, alpha=0.5)
    axes[0].set_ylabel("Distance Headway (m)", fontsize=12)
    axes[0].set_title("Comparison: Baseline vs FGSM vs OIA", fontsize=14, fontweight='bold')

    axes[1].set_ylabel("Ego Velocity (m/s)", fontsize=12)

    axes[2].axhline(y=0, color='black', linestyle='-', linewidth=0.5, alpha=0.3)
    axes[2].set_ylabel("Applied Acceleration (m/s²)", fontsize=12)
    axes[2].set_xlabel("Time (s)", fontsize=12)

    for ax in axes:
        ax.grid(True, alpha=0.3)
        # A legend without labelled lines only triggers a matplotlib warning.
        if plotted_any:
            ax.legend(fontsize=10, loc='best')

    plt.tight_layout()
    plt.savefig(output_path, dpi=150, bbox_inches='tight')
    print(f"Saved comparison figure to {output_path}")
    # Close this specific figure rather than whichever one happens to be current.
    plt.close(fig)


def plot_summary_metrics(summary_path="results/summary.json", output_path="results/metrics.png"):
    """
    Draw a 2x2 grid of bar charts from the evaluation summary and save it.

    Panels: collision rate, mean episode return (with std error bars), mean
    jerk, and mean RMSE, each with one bar per condition (``baseline``,
    ``fgsm``, ``oia``). A missing ``mean_rmse`` is plotted as 0.0.

    Args:
        summary_path: JSON summary file to read.
        output_path: Where the figure is saved (150 dpi).
    """
    import json

    with open(summary_path, 'r') as handle:
        summary = json.load(handle)

    conditions = ["baseline", "fgsm", "oia"]
    bar_colors = ['blue', 'orange', 'red']
    entries = [summary[name] for name in conditions]

    collision_rates = [entry["collision_rate"] for entry in entries]
    mean_returns = [entry["mean_return"] for entry in entries]
    std_returns = [entry["std_return"] for entry in entries]
    jerks = [entry["mean_jerk"] for entry in entries]
    rmse_values = [entry.get("mean_rmse", 0.0) for entry in entries]

    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    ax_collision, ax_return = axes[0, 0], axes[0, 1]
    ax_jerk, ax_rmse = axes[1, 0], axes[1, 1]

    # Collision rate: fall back to a [0, 1] axis when no collisions occurred.
    highest_rate = max(collision_rates)
    ax_collision.bar(conditions, collision_rates, color=bar_colors, alpha=0.7)
    ax_collision.set_ylabel("Collision Rate", fontsize=12)
    ax_collision.set_title("Collision Rate by Condition", fontsize=12, fontweight='bold')
    ax_collision.set_ylim([0, highest_rate * 1.2 if highest_rate > 0 else 1.0])

    # Mean return with standard-deviation error bars
    ax_return.bar(conditions, mean_returns, yerr=std_returns,
                  color=bar_colors, alpha=0.7, capsize=5)
    ax_return.set_ylabel("Episode Return", fontsize=12)
    ax_return.set_title("Mean Episode Return", fontsize=12, fontweight='bold')

    # Jerk
    ax_jerk.bar(conditions, jerks, color=bar_colors, alpha=0.7)
    ax_jerk.set_ylabel("Mean Jerk (m/s³)", fontsize=12)
    ax_jerk.set_title("Mean Jerk (Control Smoothness)", fontsize=12, fontweight='bold')

    # RMSE (Stealth) - Lower is more stealthy
    ax_rmse.bar(conditions, rmse_values, color=bar_colors, alpha=0.7)
    ax_rmse.set_ylabel("Mean RMSE (Lower = Stealthier)", fontsize=12)
    ax_rmse.set_title("Attack Stealth (RMSE)", fontsize=12, fontweight='bold')

    # Horizontal grid lines on every panel
    for panel in (ax_collision, ax_return, ax_jerk, ax_rmse):
        panel.grid(True, alpha=0.3, axis='y')

    # Value labels above the non-zero RMSE bars
    label_offset = max(rmse_values) * 0.02
    for position, value in enumerate(rmse_values):
        if value > 0:
            ax_rmse.text(position, value + label_offset, f'{value:.4f}',
                         ha='center', va='bottom', fontsize=10)

    plt.tight_layout()
    plt.savefig(output_path, dpi=150, bbox_inches='tight')
    print(f"Saved metrics figure to {output_path}")
    plt.close()


def plot_stealth_comparison(summary_path="results/summary.json", output_path="results/stealth_comparison.png"):
    """
    Plot the mean RMSE of the FGSM and OIA attacks as a two-bar chart.

    Bars carry ``std_rmse`` error bars and value labels, and a text box
    states which attack has the lower RMSE. Missing ``mean_rmse`` /
    ``std_rmse`` entries are treated as 0.0; when either mean RMSE is not
    positive the text box says the comparison is unavailable instead of
    declaring the attacks similar.

    Args:
        summary_path: JSON summary file with ``fgsm`` and ``oia`` entries.
        output_path: Where the figure is saved (150 dpi).
    """
    import json

    with open(summary_path, 'r') as f:
        summary = json.load(f)

    # Extract RMSE values (excluding baseline which should be 0)
    fgsm_rmse = summary['fgsm'].get('mean_rmse', 0.0)
    oia_rmse = summary['oia'].get('mean_rmse', 0.0)
    fgsm_std = summary['fgsm'].get('std_rmse', 0.0)
    oia_std = summary['oia'].get('std_rmse', 0.0)

    fig, ax = plt.subplots(1, 1, figsize=(8, 6))

    attacks = ['FGSM', 'OIA']
    rmse_means = [fgsm_rmse, oia_rmse]
    rmse_stds = [fgsm_std, oia_std]
    colors = ['orange', 'red']

    ax.bar(attacks, rmse_means, yerr=rmse_stds,
           color=colors, alpha=0.7, capsize=10, width=0.6)

    ax.set_ylabel("RMSE (Root Mean Square Error)", fontsize=14)
    ax.set_title("Attack Stealth Comparison\n(Lower RMSE = More Stealthy)",
                 fontsize=14, fontweight='bold')
    ax.grid(True, alpha=0.3, axis='y')

    # Add value labels on bars
    label_offset = max(rmse_means) * 0.02
    for i, (v, std) in enumerate(zip(rmse_means, rmse_stds)):
        if v > 0:
            ax.text(i, v + std + label_offset,
                    f'{v:.4f}\n±{std:.4f}',
                    ha='center', va='bottom', fontsize=12, fontweight='bold')

    # Add interpretation text. A comparison is only meaningful when both
    # attacks report a positive RMSE; a zero/missing value must not be
    # reported as "similar".
    if fgsm_rmse <= 0 or oia_rmse <= 0:
        textstr = 'Stealth comparison unavailable\n(missing or zero RMSE)'
    elif oia_rmse < fgsm_rmse:
        ratio = fgsm_rmse / oia_rmse
        textstr = f'OIA is {ratio:.2f}x more stealthy\n(Lower perturbation magnitude)'
    elif fgsm_rmse < oia_rmse:
        ratio = oia_rmse / fgsm_rmse
        textstr = f'FGSM is {ratio:.2f}x more stealthy\n(Lower perturbation magnitude)'
    else:
        textstr = 'Similar stealth levels'

    ax.text(0.5, 0.95, textstr, transform=ax.transAxes,
            fontsize=11, verticalalignment='top', horizontalalignment='center',
            bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))

    plt.tight_layout()
    plt.savefig(output_path, dpi=150, bbox_inches='tight')
    print(f"Saved stealth comparison to {output_path}")
    # Close this specific figure rather than whichever one happens to be current.
    plt.close(fig)

### Generate All Visualizations



In [None]:
import sys
# Directory that holds the evaluation outputs and receives the generated plots.
results_dir = "results"

# Generate all plots
print("Generating visualization plots...")

# Individual trajectories: one figure per condition, built from the first
# episode's archive; conditions whose archive is missing are skipped silently.
for condition in ["baseline", "fgsm", "oia"]:
    traj_path = os.path.join(results_dir, f"trajectory_{condition}_ep0.npz")
    if os.path.exists(traj_path):
        save_path = os.path.join(results_dir, f"plot_{condition}.png")
        plot_single_trajectory(traj_path, save_path)

# Comparison plot: called unconditionally; plot_comparison itself prints a
# warning for each trajectory file it cannot find.
plot_comparison(results_dir, os.path.join(results_dir, "comparison.png"))

# Summary metrics (now includes RMSE): both figures are produced only when
# summary.json exists in results_dir.
summary_path = os.path.join(results_dir, "summary.json")
if os.path.exists(summary_path):
    plot_summary_metrics(summary_path, os.path.join(results_dir, "metrics.png"))
    plot_stealth_comparison(summary_path, os.path.join(results_dir, "stealth_comparison.png"))

# NOTE: this message is printed even when some of the inputs above were missing
# and the corresponding plots were skipped.
print("\nAll plots generated successfully!")