In [1]:
%reset -f
import numpy as np
import sympy
from sympy import Matrix, pprint, symbols
import plotly.graph_objects as go
import math
import random

sympy.init_printing()

def make_global_symbols(name_string):
    syms = sympy.symbols(name_string)
    names = name_string.replace(',', ' ').split()
    if len(syms) != len(names):
        raise ValueError(f'Could not parse the name list string: {len(syms)} symbols != {len(names)} names')
    for sn, sv in zip(names, syms):
        globals()[sn.replace('\\',"")] = sv

In [2]:
make_global_symbols("dt U_in U_c Udot_in Udot_c t R C")

# State vector.
x = Matrix([
    U_c,
    Udot_c,
])

# Time prediction.
f = Matrix([
    U_c + Udot_c*dt,
    Udot_c,
])
F = f.jacobian(x)
pprint(f)
pprint(F)

# Capacitor voltage measurement prediction.
h_U_c = Matrix([U_c])
H_U_c = h_U_c.jacobian(x)
pprint(H_U_c)

# Full time prediction step
p00, p01, p11 = symbols("p00 p01 p11")
P = Matrix([[p00, p01],
            [p01, p11]])
Q = sympy.diag(*symbols("qt00 qt11")) * dt
pprint(sympy.simplify(F @ P @ F.T + Q))  # P time-predicted (TODO transpose-average)

# Kalman gain for capacitor voltage measurement (for reference).
R = Matrix([symbols("r_U_c")])
K = P * H_U_c.transpose() * (H_U_c * P * H_U_c.transpose() + R).inv()
pprint(x + K @ (sympy.Matrix([symbols("y")]) - h_U_c))  # x measurement corrected
pprint((sympy.eye(2) - K @ H_U_c) @ P)  # P measurement corrected

⎡U_c + U̇_c⋅dt⎤
⎢            ⎥
⎣    U̇_c     ⎦
⎡1  dt⎤
⎢     ⎥
⎣0  1 ⎦
[1  0]
⎡dt⋅p₀₁ + dt⋅qt₀₀ + dt⋅(dt⋅p₁₁ + p₀₁) + p₀₀  dt⋅p₁₁ + p₀₁ ⎤
⎢                                                         ⎥
⎣               dt⋅p₁₁ + p₀₁                 dt⋅qt₁₁ + p₁₁⎦
⎡      p₀₀⋅(-U_c + y)⎤
⎢U_c + ──────────────⎥
⎢       p₀₀ + r_U_c  ⎥
⎢                    ⎥
⎢      p₀₁⋅(-U_c + y)⎥
⎢U̇_c + ──────────────⎥
⎣       p₀₀ + r_U_c  ⎦
⎡    ⎛      p₀₀        ⎞      ⎛      p₀₀        ⎞⎤
⎢p₀₀⋅⎜- ─────────── + 1⎟  p₀₁⋅⎜- ─────────── + 1⎟⎥
⎢    ⎝  p₀₀ + r_U_c    ⎠      ⎝  p₀₀ + r_U_c    ⎠⎥
⎢                                                ⎥
⎢                                    2           ⎥
⎢      p₀₀⋅p₀₁                    p₀₁            ⎥
⎢  - ─────────── + p₀₁      - ─────────── + p₁₁  ⎥
⎣    p₀₀ + r_U_c              p₀₀ + r_U_c        ⎦


In [3]:
class Sim:
    """
    Performs transient (time-domain) modeling of the following circuit:

    u_in <--+----------[C_in]---------| GND
            |
            +---[R_hi]---+---[R_lo]---| GND
                         |
     u_c <---------------+---[C_c]----| GND

    Where R_hi,R_lo,C_c form a conventional resistor divider coupled with an RC filter
    for measuring the voltage at C_in.
    Input current i_in is supplied to charge C_in (the current is not shown in the diagram),
    which causes u_in to rise.

    The initial condition is that both capacitors are discharged and the current is zero.

    In Zubax FluxGrip, C_in represents the large film capacitor, and C_c is the ADC input capacitor.
    For related considerations see https://forum.zubax.com/t/fluxgrip-flyback-converter-tuning/2196/17

    R_lo slightly complicates the analysis but it is easy to eliminate it by multiplying u_c by the gain
    of the resistor divider; in that case, the equivalent circuit is an ordinary RC-filter formed by
    R_hi and C_c:

    u_in <--+----------[C_in]---------| GND
            |
            +---[R_hi]---+----[C_c]---| GND
                         |
                        u_c' = u_c * (R_lo+R_hi)/R_lo
    """
    def __init__(self):
        self.C_in = 5e-6
        self.C_c = 1e-9
        self.R_hi = 4.7e6*5
        self.R_lo = 110e3
        self.q_in = 0.0
        self.q_c = 0.0

    def step(self, dt: float, i_in: float) -> "Sim":
        self.q_in += i_in * dt
        u_div = self.u_in * self.R_lo/(self.R_lo+self.R_hi)
        i_c = (u_div - self.u_c) / self.R_hi
        self.q_c += i_c * dt
        return self

    @property
    def u_in(self) -> float:
        return self.q_in / self.C_in

    @property
    def u_c(self) -> float:
        return self.q_c / self.C_c

    def copy(self) -> "Sim":
        from copy import deepcopy
        return deepcopy(self)


class Filter:
    def __init__(
        self,
        initial_u: float = 0,
        initial_du_dt: float = 0,
        u_propagation_uncertainty: float = 1000,  # [V^2]
        du_dt_propagation_uncertainty: float = 1e9,  # [(V/s)^2]
        initial_u_error_variance: float = 1e9,
        initial_du_dt_error_variance: float = 1e9,
    ):
        """
        The main configurable is u_propagation_uncertainty; the others should normally be kept at 1e9.
        Higher values enable faster response but increase noise.
        A well-predicted initial du/dt goes a long way to improving the accuracy at startup;
        however, if the duty cycle ramp is used, it may not be as critical.
        """
        self._x = np.array([
            [initial_u],
            [initial_du_dt],
        ])
        self._P = np.array(
            [
                [initial_u_error_variance, 0.0],
                [0.0, initial_du_dt_error_variance],
            ]
        )
        self._Q_t_diag = np.array(
            [u_propagation_uncertainty, du_dt_propagation_uncertainty]
        )

    @property
    def u(self) -> float:
        return float(self._x[0,0])

    @property
    def du_dt(self) -> float:
        return float(self._x[1,0])

    def update_time(self, dt: float) -> None:
        dt = float(dt)
        if not dt > 0 or not np.isfinite(dt):
            raise ValueError(f"{dt=}")
        # New state vector.
        u, du_dt = self._x.flatten()
        x = np.array(
            [
                [u + dt*du_dt],
                [du_dt],
            ]
        )
        # State transition matrix F.
        F = np.array(
            [
                [1.0, dt],
                [0.0, 1.0],
            ]
        )
        # Propagation uncertainty matrix Q.
        Q = np.diag(self._Q_t_diag) * dt
        assert Q.shape == (2, 2)
        # New state uncertainty matrix P.
        P = F @ self._P @ F.T + Q
        # Update the state.
        self._x = x
        self._P = (P + P.T) * 0.5

    def update_measurement(self, measurement: float, uncertainty: float) -> None:
        measurement, uncertainty = float(measurement), float(uncertainty)
        if not np.isfinite(measurement) or not np.isfinite(uncertainty) or not uncertainty > 0:
            raise ValueError(f"{measurement=}, {uncertainty=}")
        R = np.array([[float(uncertainty)]])
        # Measurement prediction.
        h = self._x.flatten()[0]
        # Observation matrix H.
        H = np.array([[1.0, 0.0]])
        # The Kalman gain.
        S = H @ self._P @ H.T + R
        K = self._P @ H.T @ np.linalg.inv(S)
        # Measurement residual.
        residual = np.array([[measurement - h]])
        # New state vector.
        x = self._x + K @ residual
        # New state uncertainty matrix P.
        P = (np.eye(2) - K @ H) @ self._P
        # Update the state.
        self._x = x
        self._P = (P + P.T) * 0.5

    def copy(self) -> "Sim":
        from copy import deepcopy
        return deepcopy(self)

In [4]:
P_in = 2.5
i_in_lim = 1.0
sims = [(0.0, Sim())]
dt = 10e-6
# Charge at constant power.
for _ in range(50_000):
    _, s = sims[-1]
    s = s.copy()
    t = dt*len(sims)
    i_in = min(P_in / max(s.u_in, 1e-12), i_in_lim)
    s.step(dt, i_in)
    sims.append((t, s))
# Stop charging and wait for the circuit to stabilize.
for _ in range(5_000):
    _, s = sims[-1]
    s = s.copy()
    t = dt*len(sims)
    s.step(dt, 0.0)
    sims.append((t, s))

flts = []
flt = Filter()
R_lo, R_hi = 110e3, 4.7e6*5
gain = ((R_lo+R_hi)/R_lo)
prev_t = sims[0][0]
for t, s in sims[::100]:
    dt = t - prev_t
    prev_t = t
    if dt > 1e-9:
        flt.update_time(dt)
    # u_meas represents what we get from the CapVoltageProbe:
    # the ADC input voltage multiplied by the gain of the resistor divider.
    # The pre-multiplication by the gain allows us to eliminate R_lo from further analysis,
    # treating the circuit as if C_c is charged to the maximum voltage, as if there was no
    # resistor divider at all.
    u_meas = gain * (s.u_c + (random.random()-0.5)*1e-3)
    flt.update_measurement(u_meas, 1e-6)
    flts.append((t, flt.copy()))

In [5]:
def predict_output(u, du_dt, R_hi, C):
    return u + R_hi*C*du_dt

predicted = [predict_output(f.u, f.du_dt, 4.7e6*5, 1e-9) for _, f in flts]

fig = go.Figure()
fig.update_layout(
    height=400,
    hovermode='x',
    yaxis2=dict(title='u_c [V]', overlaying='y', side='right'),
)
fig.add_trace(go.Scatter(x=[t for t,_ in sims], y=[s.u_in for t, s in sims], name="u_in"))
fig.add_trace(go.Scatter(x=[t for t,_ in flts], y=predicted, name="u_in_est"))
fig.add_trace(go.Scatter(x=[t for t,_ in sims], y=[s.u_c for t, s in sims], name="u_c", yaxis='y2'))
fig.add_trace(go.Scatter(x=[t for t,_ in flts], y=[f.u for t, f in flts], name="u_c_est", yaxis='y2'))
fig.add_trace(go.Scatter(x=[t for t,_ in flts], y=[f.du_dt for t, f in flts], name="du_dt", yaxis='y2'))