## Nonlinear Model Import
Load a nonlinear Modelica model, compute polytopic linearization bounds along a reference trajectory, and solve time-varying LMIs to certify reachable sets under disturbances.

In [40]:
import sys
from pathlib import Path
import numpy as np

repo_root = None
for path in [Path.cwd().resolve()] + list(Path.cwd().resolve().parents):
    if (path / "cp_reach").exists():
        repo_root = path
        break
if repo_root is None:
    raise RuntimeError("Could not locate repo root")
if str(repo_root) not in sys.path:
    sys.path.insert(0, str(repo_root))

from cp_reach.reachability import (
    sympy_load,
    casadi_load,
    simulate_dist,
    plot_grouped,
    plot_flowpipe,
)
from cp_reach.dynamics import classification
import sympy as sp
import cvxpy as cp
import matplotlib.pyplot as plt
from cyecca.backends.sympy import SympyBackend
from cp_reach.dynamics.state_space import SymbolicStateSpace, extract_symbolic_statespace

In [2]:
%load_ext autoreload
%autoreload 2

### Load symbolic model
Build the SymPy backend and classify the dynamics. For nonlinear models, we'll compute polytopic Jacobian bounds.

In [64]:
# Load SymPy backend (with outputs for error dynamics if available)
model_path = "modelica_models/pendulumPID2.mo"
model_sympy = sympy_load(model_path, output_names=["e_theta", "e_omega"])
states = model_sympy.states
inputs = model_sympy.inputs
print("states:", states)
print("inputs:", inputs)

model_type = classification.classify_dynamics(model_sympy)
error_type = classification.classify_error_dynamics(model_sympy)
print("model_type:", model_type)
print("error_type:", error_type)

states: ['plant.omega', 'plant.theta', 'ctrl.xi']
inputs: ['theta_ref', 'dtheta_ref', 'ddtheta_ref', 'd']
model_type: DynamicsClass.NONLINEAR
error_type: DynamicsClass.NONLINEAR


In [59]:
model_path = "modelica_models/pendulumPID2.mo"
backend = SympyBackend.from_file(model_path)
ss = extract_symbolic_statespace(backend, output_names=["e_theta", "e_omega"], simplify=True)

### Simulate reference trajectory
Generate a nominal trajectory with feedforward inputs and no disturbances.

In [None]:
# Load CasADi backend for simulation
model_path = "modelica_models/pendulumPID2.mo"
model_casadi = casadi_load(model_path)

# Reference trajectory parameters (modify for your model)
# For pendulum: reference follows a smooth path
t_final = 5.0
t_grid = np.linspace(0.0, t_final, 501)

# Define reference trajectory function (example: sinusoidal)
def ref_trajectory(t):
    """Return reference state values at time t"""
    theta_ref = 0.5 * np.sin(2 * np.pi * t / t_final)
    dtheta_ref = 0.5 * (2 * np.pi / t_final) * np.cos(2 * np.pi * t / t_final)
    ddtheta_ref = -0.5 * (2 * np.pi / t_final)**2 * np.sin(2 * np.pi * t / t_final)
    return theta_ref, dtheta_ref, ddtheta_ref

# Initial conditions (start at reference)
theta_0, dtheta_0, _ = ref_trajectory(0.0)
x0 = {
    "theta": theta_0,
    "omega": dtheta_0,
    "xi": 0.0,  # integral error starts at zero
}

# Input function (feedforward + reference)
def input_func(t):
    theta_ref, dtheta_ref, ddtheta_ref = ref_trajectory(t)
    return np.array([theta_ref, dtheta_ref, ddtheta_ref, 0.0])  # d=0 for nominal

# Simulate nominal trajectory
dist_bound = 0.0  # No disturbance for reference
nom_res, _ = simulate_dist(
    model_casadi,
    x0=x0,
    params=None,
    dist_bound=dist_bound,
    dist_input=[],
    num_sims=0,
    input_fun=input_func,
)

print(f"Reference trajectory computed: {len(nom_res.t)} time steps")

Unexpected exception formatting exception. Falling back to standard exception


Traceback (most recent call last):
  File "/home/micah/miniconda3/envs/reach/lib/python3.11/site-packages/IPython/core/interactiveshell.py", line 3508, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_47122/3757020788.py", line 2, in <module>
    model_casadi = casadi_load(model_path)
                   ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/micah/Research/development/cp_reach/cp_reach/reachability/workflows.py", line 331, in casadi_load
    backend = CasadiBackend.from_file(model_path)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<@beartype(cyecca.backends.base.Backend.from_file) at 0x76e3eaf2e160>", line 51, in from_file
  File "/home/micah/Research/dev2/rumoca-test/cyecca/cyecca/backends/base.py", line 162, in from_file
    backend.compile()
  File "<@beartype(cyecca.backends.casadi.CasadiBackend.compile) at 0x76e3eaf2f9c0>", line 12, in compile
  File "/home/micah/Research/dev2/rumoca-test/cyecca/cyecca/backends/casadi.py", line 3

### Compute Jacobian and disturbance matrix
Extract the symbolic Jacobian ∂f/∂x and disturbance input matrix B.

In [None]:
# Extract symbolic dynamics from SymPy backend
sym_ss = model_sympy.symbolic

# Get state and input symbols
x_syms = [sp.Symbol(s) for s in states]
u_syms = [sp.Symbol(u) for u in inputs]

# Get the dynamics (xdot = f(x, u))
# Access the backend's ODE right-hand side
ode_rhs = model_sympy.backend.model.ode

# Compute Jacobian w.r.t. states
J_sym = sp.Matrix(ode_rhs).jacobian(x_syms)
print("Jacobian shape:", J_sym.shape)

# Extract disturbance input matrix (column corresponding to disturbance input 'd')
# Assuming 'd' is the 4th input (index 3)
d_idx = inputs.index('d')
B_sym = sp.Matrix(ode_rhs).jacobian([u_syms[d_idx]])
B = np.array(B_sym.subs(model_sympy.parameters), dtype=float)
print("Disturbance matrix B shape:", B.shape)
print("B =", B.flatten())

### Compute polytopic Jacobian bounds
Sample Jacobian at ±δ around each reference state to create polytopic overapproximation.

In [None]:
def get_polytopic_jacobians(J_sym, x_syms, traj_states, state_errors, params):
    """
    Compute polytopic Jacobian bounds along a trajectory.
    
    Parameters:
    -----------
    J_sym : sympy.Matrix
        Symbolic Jacobian matrix
    x_syms : list
        List of state symbols
    traj_states : dict
        Dictionary mapping state names to arrays of trajectory values
    state_errors : dict
        Dictionary mapping state names to error bounds (±δ)
    params : dict
        Parameter values
    
    Returns:
    --------
    list of lists: For each time step, list of vertex Jacobian matrices
    """
    n_steps = len(next(iter(traj_states.values())))
    polytope_jacobians = []
    
    # Substitute parameters
    J_with_params = J_sym.subs(params)
    
    for k in range(n_steps):
        # Get reference state at this time
        ref_state = {name: traj_states[name][k] for name in traj_states}
        
        # Generate vertices of the polytope (±δ for each state with nonzero error)
        # For simplicity, we use axis-aligned vertices
        vertices = []
        state_names_with_error = [s for s in state_errors if state_errors[s] > 0]
        
        if not state_names_with_error:
            # No error bounds, just use reference
            J_k = np.array(J_with_params.subs(ref_state), dtype=float)
            vertices.append(J_k)
        else:
            # Generate 2^d vertices for d states with error bounds
            # For computational efficiency, use only ±δ on critical states
            for sign_combo in [+1, -1]:  # Simplified: just ± on first critical state
                perturbed_state = ref_state.copy()
                for state_name in state_names_with_error:
                    idx = states.index(state_name)
                    perturbed_state[state_name] = ref_state[state_name] + sign_combo * state_errors[state_name]
                
                J_k = np.array(J_with_params.subs(perturbed_state), dtype=float)
                vertices.append(J_k)
        
        polytope_jacobians.append(vertices)
    
    return polytope_jacobians


# Define error bounds for states (which states to perturb and by how much)
# For pendulum: mainly concerned about angle error
state_errors = {
    "theta": 0.1,  # ±0.1 rad around reference
    "omega": 0.0,  # Don't perturb angular velocity for simplicity
    "xi": 0.0,     # Don't perturb integral term
}

# Extract trajectory states
traj_states = {name: nom_res.x[:, i] for i, name in enumerate(model_casadi.state_names)}

# Compute polytopic bounds
print("Computing polytopic Jacobian bounds...")
polytope_jacobians = get_polytopic_jacobians(
    J_sym,
    x_syms,
    traj_states,
    state_errors,
    model_sympy.parameters
)
print(f"Computed {len(polytope_jacobians)} polytopic bounds with {len(polytope_jacobians[0])} vertices each")

### Build time-varying LMI problem
Formulate the polynomial metric LMI with polytopic Jacobian constraints at each time sample.

In [None]:
def build_timevarying_lmi(
    polytope_jacobians,
    B,
    times,
    poly_degree=2,
    eps=1e-6,
):
    """
    Build polynomial-metric LMI for time-varying contraction.
    
    Parameters:
    -----------
    polytope_jacobians : list of lists
        Polytopic Jacobian vertices at each time sample
    B : ndarray
        Disturbance input matrix
    times : ndarray
        Time samples
    poly_degree : int
        Degree of polynomial metric M(t) = sum M_r t^r
    eps : float
        Minimum eigenvalue bound for positive definiteness
    
    Returns:
    --------
    prob : cvxpy.Problem
    alpha : cvxpy.Parameter (contraction rate)
    mu : cvxpy.Variable (disturbance gain)
    M_coeffs : list of cvxpy.Variable (metric polynomial coefficients)
    """
    # Normalize time to [0, 1]
    t = np.asarray(times, dtype=float)
    T = t[-1] - t[0]
    t_norm = (t - t[0]) / T
    B_scaled = T * np.asarray(B, dtype=float)
    
    n = polytope_jacobians[0][0].shape[0]
    m = B_scaled.shape[1]
    I_n = np.eye(n)
    I_m = np.eye(m)
    
    # Decision variables
    M_coeffs = [cp.Variable((n, n), symmetric=True, name=f"M_{r}") for r in range(poly_degree + 1)]
    mu = cp.Variable(nonneg=True, name="mu")
    alpha = cp.Parameter(nonneg=True, name="alpha")
    
    # Constraints
    constraints = []
    
    # For each time sample
    for k, t_k in enumerate(t_norm):
        # Evaluate M(t_k) and Mdot(t_k)
        M_k = sum(M_coeffs[r] * (t_k ** r) for r in range(poly_degree + 1))
        Mdot_k = sum(r * M_coeffs[r] * (t_k ** (r - 1)) for r in range(1, poly_degree + 1))
        
        # Positive definiteness: M_k >= eps * I
        constraints.append(M_k >> eps * I_n)
        
        # For each vertex of the polytopic Jacobian
        for J_vertex in polytope_jacobians[k]:
            J_k = np.asarray(J_vertex, dtype=float)
            
            # Build block LMI:
            # [ M_k*J + J.T*M_k + Mdot_k + alpha*M_k    M_k*B     ] << 0
            # [         B.T*M_k                      -alpha*mu*I ]
            
            S = M_k @ J_k + J_k.T @ M_k + Mdot_k + alpha * M_k
            MB = M_k @ B_scaled
            
            block = cp.bmat([
                [S, MB],
                [MB.T, -alpha * mu * I_m]
            ])
            
            constraints.append(block << 0)
    
    # Objective: minimize mu (disturbance gain)
    problem = cp.Problem(cp.Minimize(mu), constraints)
    
    return problem, alpha, mu, M_coeffs


# Sample time points for LMI (downsample for computational efficiency)
num_samples = 50
sample_indices = np.linspace(0, len(nom_res.t) - 1, num_samples, dtype=int)
sample_times = nom_res.t[sample_indices]
sample_jacobians = [polytope_jacobians[i] for i in sample_indices]

print(f"Building LMI problem with {num_samples} time samples...")
prob, alpha_param, mu_var, M_vars = build_timevarying_lmi(
    sample_jacobians,
    B,
    sample_times,
    poly_degree=2,
)
print(f"Problem built with {prob.size_metrics} size metrics")

### Solve LMI for optimal contraction rate
Search over contraction rates α to minimize disturbance gain μ.

In [None]:
def solve_for_alpha(alpha_value):
    """Solve LMI for a given alpha value"""
    alpha_param.value = float(alpha_value)
    
    prob.solve(solver=cp.SCS, warm_start=True, eps=1e-6, max_iters=8000, verbose=False)
    
    if prob.status in ("optimal", "optimal_inaccurate"):
        print(f"α={alpha_value:.4f}: μ={mu_var.value:.6f}, status={prob.status}")
        return float(mu_var.value)
    else:
        print(f"α={alpha_value:.4f}: INFEASIBLE")
        return 1e6  # Large penalty


# Coarse grid search
print("Searching for feasible contraction rate...")
alpha_candidates = np.logspace(-1, 1, 15)
mu_values = np.array([solve_for_alpha(a) for a in alpha_candidates])

# Find best feasible α
feasible = mu_values < 1e5
if not feasible.any():
    raise RuntimeError("No feasible α found in search range")

best_idx = np.argmin(mu_values[feasible])
best_alpha = alpha_candidates[feasible][best_idx]
best_mu = mu_values[feasible][best_idx]

print(f"\nBest solution: α={best_alpha:.4f}, μ={best_mu:.6f}")

# Re-solve at best α to get final solution
alpha_param.value = best_alpha
prob.solve(solver=cp.SCS, eps=1e-6, max_iters=8000, verbose=True)

print(f"\nFinal: α={best_alpha:.4f}, μ={mu_var.value:.6f}")
M_solution = [M.value for M in M_vars]
print(f"M(t) coefficients computed: {[f'M_{i}' for i in range(len(M_solution))]}")

### Compute reachable set bounds
Evaluate M(t) along the trajectory and compute ellipsoidal bounds: e^T M(t) e ≤ μ·d_max^2.

In [None]:
def eval_metric_polynomial(M_coeffs, t):
    """Evaluate M(t) = sum M_r t^r"""
    M_t = sum(M_coeffs[r] * (t ** r) for r in range(len(M_coeffs)))
    return M_t


# Normalize time for polynomial evaluation
T = nom_res.t[-1] - nom_res.t[0]
t_norm = (nom_res.t - nom_res.t[0]) / T

# Disturbance bound
d_max = 1.0  # Set based on your problem

# Compute state-space bounds from ellipsoid e^T M(t) e <= mu * d_max^2
# For each time, compute axis-aligned bounds
n = len(model_casadi.state_names)
bounds_upper = np.zeros((len(nom_res.t), n))
bounds_lower = np.zeros((len(nom_res.t), n))

for k, t_k in enumerate(t_norm):
    M_k = eval_metric_polynomial(M_solution, t_k)
    M_k_inv = np.linalg.inv(M_k)
    
    # Radius squared: e^T M e <= mu * d_max^2
    radius_sq = mu_var.value * (d_max ** 2)
    
    # Axis-aligned bounds: for each state i, e_i = ±sqrt(radius_sq * M_inv[i,i])
    for i in range(n):
        bound = np.sqrt(radius_sq * M_k_inv[i, i])
        bounds_upper[k, i] = bound
        bounds_lower[k, i] = -bound

print(f"Computed reachable set bounds for {len(nom_res.t)} time steps")
print(f"Max bounds: {np.max(bounds_upper, axis=0)}")

### Simulate with disturbances
Run Monte Carlo simulations with bounded disturbances to verify the reachable set.

In [None]:
# Simulate with disturbances
print("Running Monte Carlo simulations with disturbances...")
_, monte_carlo_res = simulate_dist(
    model_casadi,
    x0=x0,
    params=None,
    dist_bound=d_max,
    dist_input=["d"],
    num_sims=100,
    input_fun=input_func,
)
print(f"Completed {len(monte_carlo_res)} Monte Carlo trials")

### Plot results with flowpipe
Visualize the nominal trajectory, Monte Carlo samples, and computed reachable set bounds.

In [None]:
# Create error function that returns time-varying bounds
def error_bounds(t):
    # Find nearest time index
    idx = np.argmin(np.abs(nom_res.t - t))
    return bounds_upper[idx, :]

# Plot flowpipe for all states
plot_flowpipe(
    nom_res,
    monte_carlo_res,
    groups=[[s] for s in model_casadi.state_names],  # One plot per state
    state_names=model_casadi.state_names,
    error_fn=error_bounds,
)
plt.suptitle(f"Nonlinear Reachable Set (α={best_alpha:.3f}, μ={mu_var.value:.3f})")
plt.show()

### Metric-space visualization (optional)
Transform errors into metric space where bounds become circles: z(t) = H(t)·e(t), where M(t) = H(t)^T H(t).

In [None]:
# Compute metric-space errors for 2D visualization (if n=2 or we pick 2 states)
if n >= 2:
    # Pick first two states for visualization
    state_idx = [0, 1]
    
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6))
    
    # Plot in state space
    ax1.set_title("State Space (Euclidean)")
    ax1.set_xlabel(model_casadi.state_names[state_idx[0]])
    ax1.set_ylabel(model_casadi.state_names[state_idx[1]])
    
    for trial in monte_carlo_res:
        ax1.plot(trial.x[:, state_idx[0]], trial.x[:, state_idx[1]], 
                 'b-', alpha=0.2, linewidth=0.5)
    ax1.plot(nom_res.x[:, state_idx[0]], nom_res.x[:, state_idx[1]], 
             'r-', linewidth=2, label='Reference')
    
    # Plot ellipses at selected times
    time_snapshots = np.linspace(0, len(nom_res.t) - 1, 10, dtype=int)
    for k in time_snapshots:
        t_k = t_norm[k]
        M_k = eval_metric_polynomial(M_solution, t_k)
        M_k_2d = M_k[np.ix_(state_idx, state_idx)]
        M_inv_2d = np.linalg.inv(M_k_2d)
        
        # Eigendecomposition for ellipse
        eigvals, eigvecs = np.linalg.eigh(M_inv_2d)
        angle = np.degrees(np.arctan2(eigvecs[1, 0], eigvecs[0, 0]))
        width = 2 * np.sqrt(mu_var.value * d_max**2 * eigvals[0])
        height = 2 * np.sqrt(mu_var.value * d_max**2 * eigvals[1])
        
        ellipse = Ellipse(
            xy=(nom_res.x[k, state_idx[0]], nom_res.x[k, state_idx[1]]),
            width=width,
            height=height,
            angle=angle,
            facecolor='red',
            alpha=0.1,
            edgecolor='red',
            linewidth=1.5
        )
        ax1.add_patch(ellipse)
    
    ax1.legend()
    ax1.grid(True, alpha=0.3)
    ax1.axis('equal')
    
    # Plot in metric space
    ax2.set_title("Metric Space (z = H·e)")
    ax2.set_xlabel(f"z[{state_idx[0]}]")
    ax2.set_ylabel(f"z[{state_idx[1]}]")
    
    for trial in monte_carlo_res:
        z_traj = []
        for k in range(len(trial.t)):
            t_k = (trial.t[k] - nom_res.t[0]) / T
            M_k = eval_metric_polynomial(M_solution, t_k)
            
            # Cholesky factorization: M = H^T H
            try:
                H_k = np.linalg.cholesky(M_k).T
            except:
                H_k = np.eye(n)  # Fallback
            
            e_k = trial.x[k, :] - nom_res.x[k, :]
            z_k = H_k @ e_k
            z_traj.append(z_k[state_idx])
        
        z_traj = np.array(z_traj)
        ax2.plot(z_traj[:, 0], z_traj[:, 1], 'b-', alpha=0.2, linewidth=0.5)
    
    # In metric space, bound is a circle with radius sqrt(mu * d_max^2)
    circle = plt.Circle((0, 0), np.sqrt(mu_var.value * d_max**2), 
                        color='red', fill=False, linewidth=2, label='Bound')
    ax2.add_patch(circle)
    ax2.plot(0, 0, 'r*', markersize=10, label='Reference')
    
    ax2.legend()
    ax2.grid(True, alpha=0.3)
    ax2.axis('equal')
    
    plt.tight_layout()
    plt.show()
else:
    print("Skipping 2D visualization (need at least 2 states)")