# Standard imports

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl

import pandas as pd
from scipy.integrate import odeint, solve_ivp
import copy

# Custom imports that are either local or pulled from github

In [None]:
import sys
import requests
import importlib

def import_local_or_github(package_name, function_name=None, directory=None, giturl=None):
    # Import functions directly from github
    # Important: note that we use raw.githubusercontent.com, not github.com

    try: # to find the file locally
        if directory is not None:
            if directory not in sys.path:
                sys.path.append(directory)

        package = importlib.import_module(package_name)
        if function_name is not None:
            function = getattr(package, function_name)
            return function
        else:
            return package

    except: # get the file from github
        if giturl is None:
            giturl = 'https://raw.githubusercontent.com/florisvb/Nonlinear_and_Data_Driven_Estimation/main/Utility/' + str(package_name) + '.py'

        r = requests.get(giturl)
        print('Fetching from: ')
        print(r)

        # Store the file to the colab working directory
        with open(package_name+'.py', 'w') as f:
            f.write(r.text)
        f.close()

        # import the function we want from that file
        package = importlib.import_module(package_name)
        if function_name is not None:
            function = getattr(package , function_name)
            return function
        else:
            return package

plot_tme = import_local_or_github('plot_utility', 'plot_tme', directory='../Utility')

# Imports related to model predictive control and pybounds

In [None]:
try:
    import math
except:
    !pip install math
    import math
try:
    import casadi
except:
    !pip install casadi
    import casadi

try:
    import do_mpc
except:
    !pip install do_mpc
    import do_mpc

try:
    import pybounds
except:
    #!pip install pybounds
    !pip install git+https://github.com/vanbreugel-lab/pybounds
    import pybounds

# Battery-Drill dynamics.

# Set some constant parameters (the pink symbols above)

In [None]:
params = {
  'Q': 50.0, # Battery Capacity (mAh)
  'R0': 0.05, # Ohmic resistance (ohms)
  'R_rc': 0.5, # RC branch resistance (ohms)
  'C_rc': 50.0, # RC branch capacitance (Farads)
  'm_d': 5.0, # mass (kg)
  'r_d': 0.1, # drill base radius (m)
  'k_F': 2.0, # Force constant for linear actuator
  'k_tau': 4.0, # Torque constant for rotary drill
  'eta_F': 0.6, # Linear actuator battery efficiency
  'eta_tau': 0.5, # Rotary drill battery efficiency
  'eta': 0.5, # Total drill efficiency
  'V_min': 3.0, # Minimum voltage of battery
  'V_max': 12.0, # Maximum voltage of battery
  'k_z': 1.0, # Soil Stiffness parameter
  'c_z': 1.0, # Soil damping parameter
  'k_omega': 2.0, # Soil friction parameter
  'c_omega': 0.1, # Viscous rotational resistance
  'a0': -2.0,
  'a1': 4.0,
  'a2': 7.0,
  'F_load': 1.0, # Resistive linear force of soil (N)
  'tau_load': 1.0, # Resistive torque of soil (N*m)
}

params['a3'] = params["V_min"]
params['Q_sec'] = (params["Q"] * 60.0 * 60.0) / 1000.0 # Battery Capacity (A sec)
params['i_zz'] = round(0.3 * params["m_d"] * (params["r_d"]**2.0), 3) # moment of inertia (kg*m^2)

print(params)

I_list = []
I_list.append(0.0)
u_list = []

# Set some helper functions

In [None]:
# Nonlinear VOC function for battery
def voc_func(s):

    a0 = params["a0"]
    a1 = params["a1"]
    a2 = params["a2"]
    a3 = params["a3"]

    V_oc = a0*s**3.0 + a1*s**2.0 + a2*s + a3

    return V_oc

def terminal_voltage_quadratic(s, v_rc, P_mech, params):
    """
    Solve the quadratic for terminal voltage V_term.

    Inputs
    ------
    s : float
        Battery state of charge (SOC)
    v_rc : float
        RC branch voltage
    P_mech : float
        Mechanical power load (F_load * z_dot + tau_load * omega)
    params : dict
        Parameter dictionary with at least:
        - "V_min", "V_max", "R0"
    voc_func : callable
        Function returning open-circuit voltage: V_oc = voc_func(s, V_min, V_max)

    Returns
    -------
    V_term : float
        Terminal voltage (positive root of quadratic, with guards)
    """

    # Open-circuit voltage
    V_min = params["V_min"]
    V_max = params["V_max"]
    R0    = params["R0"]
    eta = params["eta"]

    V_oc  = voc_func(s)

    # Quadratic coefficients
    a = 1.0
    b = V_oc - v_rc
    c = (R0 / max(eta, 1e-12)) * P_mech

    # Discriminant
    disc = b**2 - 4*a*c

    if disc < 0:
        if disc > -1e-12:
            # Clamp very small negative due to floating-point
            disc = 0.0
        else:
            # Fallback if no real solution: use safe approx
            V_term = max(V_oc - v_rc, V_min)
            return V_term

    # Positive root
    V_term = (b + np.sqrt(disc)) / (2*a)

    # Ensure physically valid voltage
    if V_term < V_min:
        V_term = V_min
    elif V_term > V_max:
        V_term = V_max

    return V_term

# Define your dynamics function

We will use this same format many times, for evaluating observability and Kalman filters.

<span style="color:blue;">AI tip: try an LLM to convert a picture of the dynamics, or the latex code, to a python function. You may have to correct it!

In [None]:
def battery_drill_dynamics(x, u, params=params):
    """
    Continuous-time dynamics for the drilling and battery system.

    Inputs
    ------
    x : array_like, shape (13,)
        State vector [s, v_rc, v_term, P_mech, z, z_dot, omega, k_F, k_tau, F_load, tau_load, u_F, u_tau]
        s      : battery SOC
        v_rc   : RC branch voltage
        v_term : Terminal voltage
        P_mech : Applied mechanical power
        z      : drill penetration depth
        z_dot  : drill penetration speed
        omega  : drill rotational speed
        k_F    : control force constant
        k_tau  : control torque constant
        F_load : resistive force constant
        tau_load : resistive torque constant
        u_F    : linear actuator force input
        u_tau  : drill motor torque input

    u : array_like, shape (2,)
        Control vector [du_F, du_tau]
        du_F : derivative of u_F
        du_tau : derivative of u_tau

    params : dict
        Dictionary of system parameters:
        Q      : battery capacity (mAh)
        Q_sec  : battery capacity (As)
        R0     : ohmic resistance
        R_rc   : RC resistance
        C_rc   : RC capacitance
        m_d    : drill mass
        i_zz   : drill inertia
        eta_F  : actuator efficiency
        eta_tau : motor efficiency
        V_min : minimum voltage
        V_max : maximum voltage


    Output
    ------
    x_dot : ndarray, shape (13,)
        Time derivative of state vector
    """

    # Unpack state variables
    s = x[0]
    v_rc = x[1]
    v_term = x[2]
    P_mech = x[3]
    z = x[4]
    z_dot = x[5]
    omega = x[6]
    k_F = x[7]
    k_tau = x[8]
    F_load = x[9]
    tau_load = x[10]
    u_F = x[11]
    u_tau = x[12]

    # Unpack controls
    du_F = u[0]
    du_tau = u[1]

    # Unpack parameters
    Q = params["Q"]
    Q_sec = params["Q_sec"]
    R0 = params["R0"]
    R = params["R_rc"]
    C = params["C_rc"]
    m_d = params["m_d"]
    i_zz = params["i_zz"]
    eta = params["eta"]
    V_min = params["V_min"]
    V_max = params["V_max"]
    a0 = params["a0"]
    a1 = params["a1"]
    a2 = params["a2"]

    # Safety protocols for battery values
    # s = s.clip(1e-6, 1.0) # Removed clip for Casadi compatibility
    # v_term = v_term.clip(params["V_min"], params["V_max"]) # Removed clip for Casadi compatibility


    # Mechanical power and its derivative
    P_mech = k_F * u_F * z_dot + k_tau * u_tau * omega
    # P_mech_dot = (k_F**2.0 * u_F**2.0 - k_F * u_F * F_load)/(m_d) \
    #             + (k_tau**2.0 * u_tau**2.0 - k_tau * u_tau * tau_load)/(m_d) \
    #             + k_F * du_F * z_dot + k_tau * du_tau * omega
    P_mech_dot = k_F * u_F * (1/m_d) * (k_F * u_F - F_load) \
                + k_tau * u_tau * (1/i_zz) * (k_tau * u_tau - tau_load) \
                + k_F * du_F * z_dot + k_tau * du_tau * omega

    # Battery SOC
    s_dot = (-eta * P_mech) / (Q * v_term)

    # OCV derivative (polynomial)
    VOC_dot_poly = 3.0*a0*s**2.0 + 2.0*a1*s + a2
    VOC_dot = VOC_dot_poly * s_dot

    # Thevenin capacitor voltage
    v_rc_dot = (-v_rc / (R * C)) + (eta * P_mech / (C * v_term))

    # Terminal voltage derivative (solved explicit form)
    Vterm_num = VOC_dot * v_term**2.0 - R0 * eta * P_mech_dot * v_term - v_rc_dot * v_term**2.0
    Vterm_den = v_term**2.0 - R0 * eta * P_mech
    # if Vterm_den == 0: # This check is not needed with Casadi symbolic variables
    Vterm_den = Vterm_den + 1e-6 # avoid divide by zero
    Vterm_dot = Vterm_num / (Vterm_den)

    # Drill kinematics
    z_ddot = (k_F * u_F - F_load) / m_d
    omega_dot = (k_tau * u_tau - tau_load) / i_zz

    # pack x_dot consistent with your state ordering
    x_dot = np.array([
        s_dot,
        v_rc_dot,
        Vterm_dot,
        P_mech_dot,
        z_dot,
        z_ddot,
        omega_dot,
        0.0,
        0.0,
        0.0,
        0.0,
        du_F,
        du_tau,
    ])

    # combined dynamics
    # x_dot = f0_contribution + f1_contribution + f2_contribution

    # Safety terms
    # x_dot[0] = np.clip(x_dot[0], 1e-3, 1.0) # Removed clip for Casadi compatibility
    # x_dot[1] = np.clip(x_dot[1], V_min, V_max) # Removed clip for Casadi compatibility

    return x_dot

# Measurements

In [None]:
measurement_flags = {
                     's': False,
                     'v_rc': True,
                     'v_term': True,
                     'P_mech': True,
                     'z': True,
                     'z_dot': False,
                     'omega': True,
                     'k_F': False,
                     'k_tau': False,
                     'F_load': False,
                     'tau_load': False,
                     'u_F': True,
                     'u_tau': True,
                    }

measurement_noise_stds = {
                          's': 0.1,
                          'v_rc': 0.2,
                          'v_term': 0.2,
                          'P_mech': 0.2,
                          'z': 0.2,
                          'z_dot': 0.2,
                          'omega': 0.2,
                          'k_F': 0.1,
                          'k_tau': 0.1,
                          'F_load': 0.1,
                          'tau_load': 0.1,
                          'u_F': 0.01,
                          'u_tau': 0.01,
                         }

# Get measurement names:
measurement_names = []
for key in measurement_flags.keys():
  if measurement_flags[key]:
    measurement_names.append(key)

print(measurement_names)

In [None]:
def h(x, u):
    flags = measurement_flags

    # Unpack controls
    du_F = u[0]
    du_tau = u[1]

    # Measurements
    y_vec = []
    i = 0
    for key in flags.keys():
        if flags[key]:
            y_vec.append(x[i])
        i += 1

    # Return measurement
    return y_vec

# Simulate with pybounds / MPC

In [None]:
import pybounds

In [None]:
state_names = ['s', 'v_rc', 'v_term', 'P_mech', 'z', 'z_dot', 'omega','k_F','k_tau', 'F_load', 'tau_load', 'u_F', 'u_tau', ]
input_names = ['du_F', 'du_tau']
# input_names = ['u_F', 'u_tau']
# measurement_names = ['s', 'v_rc', 'v_term', 'P_mech', 'z', 'omega', 'k_F', 'k_tau', 'F_load', 'tau_load']

In [None]:
dt = 0.1  # [s]

In [None]:
simulator = pybounds.Simulator(battery_drill_dynamics, h, dt=dt, state_names=state_names,
                               input_names=input_names, measurement_names=measurement_names, mpc_horizon=10)


In [None]:
# Desired linear/angular speeds
speed_coeff_F = 1.0 #m/s
speed_coeff_omega = 10.0 #rad/s

In [None]:
# First define the set-point(s) to follow
end_t = 30.0
tsim = np.arange(0, end_t, step=dt)
NA = np.zeros_like(tsim)

traj_type = 'sinusoidal'

if traj_type == 'constant':
  z_dot_traj = speed_coeff_F*np.ones_like(tsim)
  omega_traj = speed_coeff_omega*np.ones_like(tsim)
elif traj_type == 'sinusoidal':
  # z_dot_traj = (speed_coeff_F/2.0*np.sin(2.0*np.pi*tsim - np.pi / 2.0) + speed_coeff_F/2.0) + 0.1
  # omega_traj = (speed_coeff_omega/2.0*np.sin(2.0*np.pi*tsim - np.pi / 2.0) + speed_coeff_omega/2.0) + 0.1
  z_dot_traj = speed_coeff_F*(np.sin(tsim)+1.0)
  omega_traj = speed_coeff_omega*(np.sin(tsim)+1.0)
elif traj_type == 'const_increasing':
  z_dot_traj = speed_coeff_F*tsim*np.ones_like(tsim)
  omega_traj = speed_coeff_omega*tsim*np.ones_like(tsim)

setpoint = {'s': NA,
            'v_rc': NA,
            'v_term': NA,
            'P_mech': NA,
            'z': NA,
            'z_dot': z_dot_traj,
            'omega': omega_traj,
            'k_F': params['k_F'] * np.ones_like(tsim),
            'k_tau': params['k_tau'] * np.ones_like(tsim),
            'F_load': params['F_load'] * np.ones_like(tsim),
            'tau_load': params['tau_load'] * np.ones_like(tsim),
            'u_F': NA,
            'u_tau': NA,
           }

In [None]:
# Update the simulator set-point
simulator.update_dict(setpoint, name='setpoint')

In [None]:
# Define MPC cost function: penalize the squared error between the setpoint for g and the true g
cost_z = (simulator.model.x['z_dot'] - simulator.model.tvp['z_dot_set']) ** 2
cost_omega = (simulator.model.x['omega'] - simulator.model.tvp['omega_set']) ** 2
# cost_kF = (simulator.model.x['k_F'] - simulator.model.tvp['k_F_set']) ** 2
# cost_ktau = (simulator.model.x['k_tau'] - simulator.model.tvp['k_tau_set']) ** 2
cost = cost_z + cost_omega #+ cost_k

In [None]:
# Set cost function
simulator.mpc.set_objective(mterm=cost, lterm=cost)  # objective function

# Set input penalty: make this small for accurate state tracking
# simulator.mpc.set_rterm(j1=1e-4, j2=1e-4)

In [None]:
# State bounds
simulator.mpc.bounds['lower', '_x', 's'] = 1e-3
simulator.mpc.bounds['upper', '_x', 's'] = 1.0
simulator.mpc.bounds['lower', '_x', 'v_term'] = params['V_min']
simulator.mpc.bounds['upper', '_x', 'v_term'] = params['V_max']
simulator.mpc.bounds['lower', '_x', 'P_mech'] = 1e-3
simulator.mpc.bounds['upper', '_x', 'P_mech'] = 1e6
simulator.mpc.bounds['lower', '_x', 'z'] = 1e-3
simulator.mpc.bounds['lower', '_x', 'z_dot'] = 0.0 # Modified lower bound
simulator.mpc.bounds['upper', '_x', 'z_dot'] = 1e6
simulator.mpc.bounds['lower', '_x', 'omega'] = 0.0 # Modified lower bound
simulator.mpc.bounds['upper', '_x', 'omega'] = 1e6

# # Input bounds
# simulator.mpc.bounds['lower', '_x', 'u_F'] = -100.0
# simulator.mpc.bounds['upper', '_x', 'u_F'] = 100.0
# simulator.mpc.bounds['lower', '_x', 'u_tau'] = -100.0
# simulator.mpc.bounds['upper', '_x', 'u_tau'] = 100.0

# # Bounds on the rate of change of control inputs
# simulator.mpc.bounds['lower', '_u', 'du_F'] = -100.0 # Adjusted bounds
# simulator.mpc.bounds['upper', '_u', 'du_F'] = 100.0 # Adjusted bounds
# simulator.mpc.bounds['lower', '_u', 'du_tau'] = -100.0 # Adjusted bounds
# simulator.mpc.bounds['upper', '_u', 'du_tau'] = 100.0 # Adjusted bounds

# simulator.mpc.set_rterm(u_F=1e-2, u_tau=10.0, du_F=100.0, du_tau=100.0)
# simulator.mpc.set_rterm(du_F=1e-4, du_tau=1e-4)
simulator.mpc.set_rterm(du_F=1e-4, du_tau=1.0)

In [None]:
# initial condition
# Calculate initial terminal voltage using the quadratic solver for consistency
s0 = 0.99
v0_rc = 0.0
P0_mech = 0.0  # Assuming no initial mechanical load
v0_term = terminal_voltage_quadratic(s0, v0_rc, P0_mech, params)
z0_dot = speed_coeff_F
omega0 = speed_coeff_omega

# if traj_type == 'sinusoidal':
#   uF_0 = (params["k_F"] * speed_coeff_F*np.cos(0.0) - params["F_load"]) / params["m_d"]
#   utau_0 = (params["k_tau"] * speed_coeff_omega*np.cos(0.0) - params["tau_load"]) / params["i_zz"]

uF_0 = 0.0
utau_0 = 0.0


x0 = np.array([s0, v0_rc, v0_term, P0_mech, 0.0, z0_dot, omega0, params["k_F"], params["k_tau"], params["F_load"], params["tau_load"], uF_0, utau_0])

In [None]:
# Run simulation using MPC
t_sim, x_sim, u_sim, y_sim = simulator.simulate(x0=x0, u=None, mpc=True, return_full_output=True)

# Plot Controls and Trajectory

In [None]:
plt.plot(t_sim, u_sim['du_F'], label='du_F', color='red')
plt.plot(t_sim, u_sim['du_tau'], label='du_tau', color='yellow')
plt.title('Controls')
simulator.plot('setpoint')
plt.show()

In [None]:
u_sim_df = pd.DataFrame(u_sim)
x_sim_df = pd.DataFrame(x_sim)

# Observability analysis

In [None]:
w = 12  # window size, set to None to use entire time-series as one window

### This is the computationally heavy step

Here we calculate the observability matrix for each sliding window, all the states, and measurements. Later we can subselect from this to choose specific time steps, measurements, states, etc. to consider.

In [None]:
# Construct O in sliding windows
SEOM = pybounds.SlidingEmpiricalObservabilityMatrix(simulator, t_sim, x_sim, u_sim, w=w, eps=1e-4)

# Get O's
O_sliding = SEOM.get_observability_matrix()

In [None]:
n_window = len(O_sliding)
print(n_window, 'windows')
O_sliding[0]

In [None]:
max_rank = 0
for i, O_matrix in enumerate(O_sliding):
  rank = np.linalg.matrix_rank(O_matrix)
  if rank > max_rank:
    max_rank = rank
  # print(f"Rank of O_sliding[{i}]: {rank}")

print(f"Maximum rank of O_sliding: {max_rank}")

### Convert a single observability matrix into fisher information

$F = \mathcal{O}_w^T R_w^{-1} \mathcal{O}_w$

In [None]:
# measurement_noise_vars = {key: val**2 for key, val in measurement_noise_stds.items()}
measurement_noise_vars = {}
for name in measurement_names:
  measurement_noise_vars[name] = measurement_noise_stds[name]**2.0

print(measurement_noise_vars.keys())

# Compute the Fisher information & Chernoff inverse
FO = pybounds.FisherObservability(SEOM.O_df_sliding[0], measurement_noise_vars, lam=1e-8)

# Get the Fisher information, inverse, and R matrix
F, F_inv, R = FO.get_fisher_information()
F_inv

### Efficiently repeat the above process for a specific set of sensors, states, and time points

In [None]:
# from os import O_SHORT_LIVED
# Choose sensors to use from O -- you can select a subset from the available measurements
o_sensors = measurement_names

# Chose states to use from O
# o_states = ['s', 'v_rc', 'v_term', 'P_mech', 'z', 'z_dot', 'omega', 'k_F', 'k_tau', 'F_load', 'tau_load', 'u_F', 'u_tau']
o_states = ['s', 'v_rc', 'v_term', 'P_mech', 'z', 'z_dot', 'omega']

# Choose time-steps to use from O
window_size = 6 # this cannot be larger than what was defined above
o_time_steps = np.arange(0, window_size, step=1)

# Redefine R -- if you remove a sensor you need to change R
o_measurement_noise_vars = {key: measurement_noise_vars[key] for key in o_sensors}

print(measurement_names)
print(o_measurement_noise_vars)

In [None]:
# Compute the Fisher information & inverse for each window and store the minimum error variance
SFO = pybounds.SlidingFisherObservability(SEOM.O_df_sliding, time=SEOM.t_sim, lam=1e-8, R=o_measurement_noise_vars,
                                 states=o_states, sensors=o_sensors, time_steps=o_time_steps, w=None)

# If you want to manually inspect one of the fisher info matrices:
# SFO.FO[1].O

In [None]:
# Pull out minimum error variance, 'time' column is the time vector shifted forward by w/2 and 'time_initial' is the original time
EV_aligned = SFO.get_minimum_error_variance()

In [None]:
EV_no_nan = EV_aligned.fillna(method='bfill').fillna(method='ffill')
# EV_no_nan = EV_aligned.fillna(0)

# Plot observability over time

In [None]:
states = list(SFO.FO[0].O.columns)
n_state = len(states)

fig, ax = plt.subplots(n_state, 2, figsize=(6, n_state*2), dpi=150, sharex=True)
ax = np.atleast_2d(ax)

cmap = 'inferno_r'

min_ev = np.min(EV_no_nan.iloc[:, 2:].values)
max_ev = np.max(EV_no_nan.iloc[:, 2:].values)

log_tick_high = int(np.ceil(np.log10(max_ev)))
log_tick_low = int(np.floor(np.log10(min_ev)))
cnorm = mpl.colors.LogNorm(10**log_tick_low, 10**log_tick_high)

for n, state_name in enumerate(states):
    pybounds.colorline(t_sim, x_sim[state_name], EV_no_nan[state_name].values, ax=ax[n, 0], cmap=cmap, norm=cnorm)
    pybounds.colorline(t_sim, EV_no_nan[state_name].values, EV_no_nan[state_name].values, ax=ax[n, 1], cmap=cmap, norm=cnorm)

    # Colorbar
    cax = ax[n, -1].inset_axes([1.03, 0.0, 0.04, 1.0])
    cbar = fig.colorbar(mpl.cm.ScalarMappable(norm=cnorm, cmap=cmap), cax=cax,
                        ticks=np.logspace(log_tick_low, log_tick_high, log_tick_high-log_tick_low + 1))
    cbar.set_label('min. EV: ' + state_name, rotation=270, fontsize=7, labelpad=8)
    cbar.ax.tick_params(labelsize=6)

    x_max = np.max(x_sim[state_name])
    x_min = np.min(x_sim[state_name])

    ax[n, 0].set_ylim(x_min - 0.1, x_max + 0.1)
    ax[n, 0].set_ylabel('state: ' + state_name, fontsize=7)

    ax[n, 1].set_ylim(10**log_tick_low, 10**log_tick_high)
    ax[n, 1].set_yscale('log')
    ax[n, 1].set_ylabel('min. EV: ' + state_name, fontsize=7)
    ax[n, 1].set_yticks(np.logspace(log_tick_low, log_tick_high, log_tick_high-log_tick_low + 1))

for a in ax.flat:
    a.tick_params(axis='both', labelsize=6)
    a.set_xlabel('time (s)', fontsize=7)
    offset = t_sim[-1] * 0.05
    a.set_xlim(-offset, t_sim[-1] + offset)

fig.subplots_adjust(left=None, bottom=None, right=None, top=None, wspace=0.3, hspace=0.4)

# Plot controls

In [None]:
plt.plot(t_sim, u_sim['du_F'], label='du_F', color='red')
plt.plot(t_sim, u_sim['du_tau'], label='du_tau', color='yellow')
plt.title('Controls')
# simulator.plot('setpoint')
plt.legend()
plt.show()