In [28]:
# Cell 1: imports & constants
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import math
import json
import os
from typing import Optional, Dict, Any


# Environment physical / control constants
DT = 0.1 # time step (s)
TH = 1.5 # desired time headway (s)
D0 = 5.0 # standstill distance (m)
V_REF = 15.0 # target lead/desired speed (m/s)
A_MIN = -3.5 # braking limit (m/s^2)
A_MAX = 2.0 # acceleration limit (m/s^2)


# Observation scaling ranges (used when normalize_obs=False; if using VecNormalize, wrapper will handle scaling)
OBS_LOW = np.array([0.0, -30.0, 0.0], dtype=np.float32) # [dx, dv, v]
OBS_HIGH = np.array([200.0, 30.0, 40.0], dtype=np.float32)


# Useful for deterministic reproducibility
DEFAULT_SEED = 2025

In [None]:
# Cell 2: ACCEnv implementation


class ACCEnv(gym.Env):
"""
Simple 1D Adaptive Cruise Control (ACC) environment with Control Barrier Function (CBF) safety filter.


State: [dx, dv, v]
- dx = x_lead - x_ego (headway distance, m)
- dv = v_lead - v_ego (relative speed, m/s)
- v = v_ego (ego speed, m/s)


Action: scalar acceleration 'a' in [A_MIN, A_MAX]


Important API methods provided for attacks:
- set_safety_obs_for_filter(perturbed_obs): if evaluation code wants the CBF to evaluate safety using
the perturbed observation (so clamp uses the attacked view), call this method before stepping.
- clear_safety_obs_for_filter(): resets that temporary perturbed observation so baseline evaluations
use the true state.


Notes about normalization:
- If you plan to use Stable-Baselines3 VecNormalize, wrap the env with VecNormalize before
training/evaluation. This class supports two modes: normalize_obs=True (env returns normalized observations)
or False (raw physical values returned). When using VecNormalize you should set normalize_obs=False
here and let the wrapper normalize states consistently. However, to avoid confusion the env contains
`normalize_obs` toggle to optionally perform simple min-max scaling when desired.
"""


metadata = {"render_modes": ["human"], "render_fps": int(1/DT)}


def __init__(self, normalize_obs: bool = False, seed: Optional[int] = None):
super().__init__()
self.normalize_obs = normalize_obs
self.seed_val = seed if seed is not None else DEFAULT_SEED
self.np_random, _ = gym.utils.seeding.np_random(self.seed_val)


# Observation: dx, dv, v (headway, relative speed, ego speed)
self.observation_space = spaces.Box(low=OBS_LOW, high=OBS_HIGH, dtype=np.float32)
# Action: single continuous acceleration
self.action_space = spaces.Box(low=np.array([A_MIN], dtype=np.float32),
high=np.array([A_MAX], dtype=np.float32), dtype=np.float32)


# Internal state
self.x_ego = 0.0
self.x_lead = 20.0 # initial lead position
self.v_ego = V_REF - 0.5
self.v_lead = V_REF
self.a_ego = 0.0


# Safety filter support
self._safety_obs_override = None # if set, safety filter uses this perturbed obs for clamp calc


# Logging
self.current_step = 0
self.max_steps = 400
self.collision = False


self.reset()


# --------------------- Normalization helpers ---------------------
def _obs_to_array(self):
dx = float(self.x_lead - self.x_ego)
dv = float(self.v_lead - self.v_ego)
v = float(self.v_ego)
arr = np.array([dx, dv, v], dtype=np.float32)
return arr

def _normalize(self, obs: np.ndarray) -> np.ndarray:
# Simple min-max normalization to [-1, 1] (if using wrapper, don't use)
low = OBS_LOW
high = OBS_HIGH
scaled = 2.0 * (obs - low) / (high - low) - 1.0
return scaled.astype(np.float32)


def _denormalize(self, obs_norm: np.ndarray) -> np.ndarray:
low = OBS_LOW
high = OBS_HIGH
return (((obs_norm + 1.0) / 2.0) * (high - low) + low).astype(np.float32)


# --------------------- Safety filter / CBF ---------------------
def _compute_amax_safe(self, obs_for_filter: np.ndarray) -> float:
"""Compute the maximum allowed acceleration to ensure h(st+1) >= 0 (linearized CBF constraint).
obs_for_filter is expected to be the raw physical values [dx, dv, v] (not normalized). If a VecNormalize
wrapper is used, you must denormalize prior to calling this function.
"""

dx, dv, v = float(obs_for_filter[0]), float(obs_for_filter[1]), float(obs_for_filter[2])
# Using worst-case lead acceleration a_lead = 0 for conservativeness as in the paper
num = dx - TH * v + (self.v_lead - v) * DT
denom = TH * DT
if denom <= 0:
return A_MIN
pass

IndentationError: expected an indented block after class definition on line 4 (3953157976.py, line 5)

In [None]:
# Cell 3: quick sanity run
if __name__ == '__main__':
env = ACCEnv(normalize_obs=False, seed=42)
obs, _ = env.reset()
print('init obs:', obs)
done = False
total_r = 0.0
while not done:
# simple policy: try to track target speed with a P-controller
v = obs[2]
a_cmd = 0.5 * (V_REF - v)
a_cmd = float(np.clip(a_cmd, A_MIN, A_MAX))
obs, r, done, trunc, info = env.step([a_cmd])
total_r += r
if env.current_step % 50 == 0:
env.render()
print('episode return:', total_r, 'collision:', info.get('collision', False))

In [None]:
# Cell 4: logging helpers


def run_and_log_one_episode(env, policy_fn, attack_fn=None, eps=0.01, capture_trace=True, seed=None):

"""Runs one episode with given policy function (callable obs->action) and optional attack_fn(obs)->adv_obs.
Returns: dict containing per-step traces and episode metadata.
"""

if seed is not None:
env.reset(seed=seed)
else:
env.reset()


obs, _ = env.reset() if isinstance(env.reset(), tuple) else env.reset()
traces = {'t': [], 'dx': [], 'dv': [], 'v': [], 'action': [], 'applied_action': [], 'lead_v': []}
ep_return = 0.0
ep_collided = False


while True:
obs_for_policy = obs
# if attack, create perturbed obs in *normalized* space or raw depending on env
if attack_fn is not None:
adv_obs = attack_fn(obs, eps)
# inform safety filter to use adv obs (so clamp is computed on the attacked view)
if hasattr(env, 'set_safety_obs_for_filter'):
env.set_safety_obs_for_filter(adv_obs)
obs_for_policy = adv_obs
else:
if hasattr(env, 'clear_safety_obs_for_filter'):
env.clear_safety_obs_for_filter()


action = policy_fn(obs_for_policy)
next_obs, r, done, trunc, info = env.step(action)
ep_return += r
ep_collided = ep_collided or bool(info.get('collision', False))


if capture_trace:
# Denormalize if env returns normalized values
if env.normalize_obs:
# If obs was normalized, convert to raw for trace clarity
raw_obs = env._denormalize(obs if attack_fn is None else adv_obs)
else:
raw_obs = obs.copy()
traces['t'].append(env.current_step * DT)
traces['dx'].append(float(raw_obs[0]))
traces['dv'].append(float(raw_obs[1]))
traces['v'].append(float(raw_obs[2]))
traces['action'].append(float(action[0] if isinstance(action, (list, tuple, np.ndarray)) else action))
traces['applied_action'].append(float(info.get('applied_action', np.nan)))
traces['lead_v'].append(float(info.get('lead_v', np.nan)))


obs = next_obs
if done:
break


return {
'return': float(ep_return),
'collision': bool(ep_collided),
'traces': traces,
}


# Example policy function for testing (simple PD/P-controller)
def simple_policy(obs):
# obs is either normalized or raw depending on env. Keep generic: if normalized, denorm for decision
if isinstance(obs, np.ndarray) and obs.shape[-1] == 3 and (np.max(np.abs(obs)) <= 1.1):
# assume normalized -> denormalize for control logic
raw = env._denormalize(obs)
else:
raw = obs
v = float(raw[2])
a_cmd = 1.0 * (V_REF - v)
return np.array([float(np.clip(a_cmd, A_MIN, A_MAX))], dtype=np.float32)

# Cell 6: save helper


def save_traces_and_metrics(out_dir: str, traces_list: list, metrics: Dict[str, Any]):
os.makedirs(out_dir, exist_ok=True)
# metrics.json
with open(os.path.join(out_dir, 'metrics.json'), 'w') as f:
json.dump(metrics, f, indent=2)


# per-episode traces: save as npz per episode
for i, tr in enumerate(traces_list):
np.savez_compressed(os.path.join(out_dir, f'trace_ep_{i:03d}.npz'), **tr['traces'])


print('Saved metrics and traces to', out_dir)