In [None]:
# HRET Empirical Validation with Monte Carlo Re-Entry Dispersion

## 1. Setup Environment

In [None]:
!apt update && apt install -y curl build-essential pkg-config libssl-dev
!curl https://sh.rustup.rs -sSf | sh -s -- -y
import os
os.environ['PATH'] += ":/root/.cargo/bin"
!pip install maturin seaborn ipywidgets scipy
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from ipywidgets import interact, FloatSlider
from scipy.integrate import solve_ivp
from multiprocessing import Pool, cpu_count
sns.set(style="whitegrid")


In [None]:
## 2. Clone Repo and Build/Install Crate

In [None]:
!git clone https://github.com/infinityabundance/dsfb.git
%cd dsfb/crates/dsfb-hret
!maturin develop --release
import dsfb_hret

In [None]:
## 3. Toy Validation: Constant Velocity with Correlated Faults
Test HRET on simple system (eq. 1: x[n+1] = x[n] + dt*v).

In [None]:
def toy_sim(N=500, dt=0.1, v=1.0, D_k=0.1, correlated_group=0, dist_start=200, dist_mag=5.0):
    m, g = 10, 2
    group_map = [0]*5 + [1]*5
    rho, rho_g = 0.95, [0.9, 0.9]
    beta_k, beta_g = [1.0]*m, [1.0]*g
    k_k = [[0.5]*m]  # p=1, simple gains
    observer = dsfb_hret.HretObserver(m, g, group_map, rho, rho_g, beta_k, beta_g, k_k)
    true_x = np.cumsum(np.ones(N)*dt*v)
    hat_x = np.zeros(N); hat_x[0] = true_x[0] + np.random.randn()
    errors, weights_log = [], np.zeros((N, m))
    for n in range(1, N):
        d_k = np.random.uniform(-D_k, D_k, m)
        if dist_start <= n < dist_start + 100:
            d_k[group_map == correlated_group] += dist_mag if np.random.rand() > 0.5 else -dist_mag
        y_k = true_x[n] + d_k
        r_k = y_k - hat_x[n-1]
        delta_x, weights, s_k, s_g = observer.update(r_k.tolist())
        hat_x[n] = hat_x[n-1] + dt*v + delta_x[0]
        errors.append(hat_x[n] - true_x[n])
        weights_log[n] = weights
    return true_x, hat_x, np.array(errors), weights_log

true_x, hat_x, errors, weights = toy_sim()
fig, axs = plt.subplots(2, 1, figsize=(10, 8))
axs[0].plot(true_x, label='True'); axs[0].plot(hat_x, label='Estimate'); axs[0].legend()
axs[0].set_title(f'State Estimation (RMSE: {np.sqrt(np.mean(errors**2)):.3f})')
axs[1].imshow(weights.T, aspect='auto', cmap='viridis'); axs[1].set_title('Weights Evolution')
plt.show()

In [None]:
## 4. Re-Entry Model and HRET Fusion
3-DoF: [x, y, vx, vy, theta]. Exponential atmosphere, drag/gravity.

In [None]:
def reentry_dynamics(t, state, params):
    rho0, H, g0, R, m, A, Cd = params['rho0'], params['H'], params['g0'], params['R'], params['m'], params['A'], params['Cd']
    x, y, vx, vy, theta = state
    h = y
    g = g0 * (R / (R + h))**2 if h > 0 else g0
    v = np.sqrt(vx**2 + vy**2)
    rho = rho0 * np.exp(-h / H) if h > 0 else rho0
    Fd = 0.5 * rho * v**2 * Cd * A
    ax = -(Fd / m) * (vx / v)
    ay = -g - (Fd / m) * (vy / v)
    dtheta_dt = - (g / v) * np.cos(theta) + (v / (R + h)) * np.cos(theta)  # Coriolis-like
    return [vx, vy, ax, ay, dtheta_dt]

def fuse_hret(observer, true_state, params, n, plasma_start=100, plasma_end=200):
    M, p = params['M'], 5  # State dim
    D_k = params['D_k']
    h_k = np.eye(M)[:p, :p]  # Measure positions/velocities (simplified)
    d_k = np.random.uniform(-D_k, D_k, M)
    if plasma_start <= n < plasma_end and params['correlated_group'] is not None:
        phi_g = np.random.uniform(-params['dist_mag'], params['dist_mag'])
        group_mask = np.array(observer.group_mapping) == params['correlated_group']
        d_k[group_mask] += phi_g
    y_k = h_k @ true_state + d_k[:p]
    r_k = np.zeros(M); r_k[:p] = y_k - h_k @ true_state  # Pad if M > p
    delta_x, weights, s_k, s_g = observer.update(r_k.tolist())
    return np.array(delta_x[:p]), weights, s_k, s_g  # Trim to p

def simulate_reentry(params, observer_class, perturbations=None):
    pert_params = params.copy()
    if perturbations:
        pert_params['initial_state'] += np.random.uniform(-perturbations, perturbations, 5)
    sol = solve_ivp(reentry_dynamics, (0, 600), pert_params['initial_state'], args=(pert_params,), rtol=1e-6, events=lambda t, y: y[1])  # Stop at h=0
    t, states = sol.t, sol.y
    hat_states = np.zeros_like(states); hat_states[:, 0] = states[:, 0] + np.random.uniform(-100, 100, 5)  # Initial error
    errors = []
    for i in range(1, len(t)):
        observer = observer_class(params['M'], params['G'], params['group_map'], params['rho'], params['rho_g'], params['beta_k'], params['beta_g'], params['k_k'])
        delta_x, _, _, _ = fuse_hret(observer, states[:, i], pert_params, i)
        hat_states[:, i] = hat_states[:, i-1] + delta_x  # Simplified update (add full dynamics)
        errors.append(np.linalg.norm(hat_states[:, i] - states[:, i]))
    impact = hat_states[:2, -1] / 1000  # km
    return np.sqrt(np.mean(errors**2)), impact

In [None]:
## 5. Monte Carlo Dispersion Simulation
1000 trials with 360-degree perturbations (entry angle, velocity, density).

In [None]:
def mc_trial(_):
    params = {
        'rho0': 1.225, 'H': 11000, 'g0': 9.81, 'R': 6371000, 'm': 3000, 'A': 20, 'Cd': 1.0,
        'initial_state': [0, 120000, 7800, -500, -np.pi/90],  # x, h=120km, vx=7.8km/s, vy, theta=-2deg
        'M': 10, 'G': 2, 'group_map': [0]*5 + [1]*5, 'rho': 0.95, 'rho_g': [0.9, 0.9],
        'beta_k': [1.0]*10, 'beta_g': [1.0]*2, 'k_k': [[0.5]*10]*5,  # p=5
        'D_k': 100, 'correlated_group': 1, 'dist_mag': 500,  # Plasma on group 1
        'perturbations': [1000, 1000, 100, 100, np.pi/36]  # x360: angle ±5deg (pi/36 rad ~5deg)
    }
    observer_class = dsfb_hret.HretObserver
    rmse, impact = simulate_reentry(params, observer_class, params['perturbations'])
    return rmse, impact

def run_mc(N=1000):
    with Pool(cpu_count()) as p:
        results = p.map(mc_trial, range(N))
    rmses, impacts = [r[0] for r in results], np.array([r[1] for r in results])
    return rmses, impacts

rmses, impacts = run_mc()
cep = np.median(np.linalg.norm(impacts - np.mean(impacts, axis=0), axis=1))
print(f"Mean RMSE: {np.mean(rmses):.2f} m, Std: {np.std(rmses):.2f} m, CEP: {cep:.2f} km")

fig, axs = plt.subplots(1, 2, figsize=(12, 5))
sns.histplot(rmses, kde=True, ax=axs[0]); axs[0].set_title('RMSE Distribution')
axs[1].scatter(impacts[:,0], impacts[:,1], alpha=0.5); axs[1].set_title('Impact Dispersion')
plt.show()

In [None]:
## 6. Baselines and Sensitivity
Compare to DSFB (singleton groups) and EKF. Sweep ρ.

In [None]:
class SimpleEKF:
    def __init__(self, p):
        self.P = np.eye(p)  # Covariance
    def update(self, r, H, R=100):
        K = self.P @ H.T @ np.linalg.inv(H @ self.P @ H.T + R)
        delta = K @ r
        self.P = (np.eye(len(self.P)) - K @ H) @ self.P
        return delta

# Run MC with DSFB (singleton) or EKF
# ... (Adapt simulate_reentry with baseline)
@interact(rho=FloatSlider(min=0.8, max=0.99, step=0.01, value=0.95))
def sensitivity(rho):
    # Rerun MC with new rho, plot
    pass